10.12.1. 自定义 Publisher / Subscriber
package cn.netkiller.test;
import java.util.concurrent.Flow;
/**
 * Minimal hand-written {@link Flow.Publisher} / {@link Flow.Subscriber} pair.
 *
 * <p>The publisher emits the integers 1..5 to each subscriber. Items are only
 * delivered in response to {@link Flow.Subscription#request(long)}, so the
 * subscriber controls the pace (backpressure) and may stop early with
 * {@link Flow.Subscription#cancel()}.
 */
public class Test {
    /**
     * Wires one {@link IntSubscriber} to one {@link IntPublisher}; everything
     * runs synchronously on the calling thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        IntPublisher intPublisher = new IntPublisher();
        IntSubscriber intSubscriber = new IntSubscriber();
        intPublisher.subscribe(intSubscriber);
    }

    /** Publishes the fixed sequence 1..5 to every subscriber, on demand. */
    public static class IntPublisher implements Flow.Publisher<Integer> {
        private static final int FIRST = 1;
        private static final int LAST = 5;

        /**
         * Hands the subscriber a fresh subscription. Nothing is emitted until
         * the subscriber requests items through that subscription.
         *
         * @param subscriber the receiver of the items; must not be null
         * @throws NullPointerException if {@code subscriber} is null
         */
        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            if (subscriber == null) {
                throw new NullPointerException("subscriber");
            }
            // The Flow contract requires onSubscribe to be the first signal.
            subscriber.onSubscribe(new RangeSubscription(subscriber));
        }

        /**
         * Per-subscriber emission state. Not thread-safe: it is meant to be
         * driven from a single thread, as this example does.
         */
        private static final class RangeSubscription implements Flow.Subscription {
            private final Flow.Subscriber<? super Integer> subscriber;
            private int next = FIRST;
            private long demand;
            private boolean emitting;
            private boolean done;

            RangeSubscription(Flow.Subscriber<? super Integer> subscriber) {
                this.subscriber = subscriber;
            }

            @Override
            public void request(long n) {
                if (done) {
                    return;
                }
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("non-positive request: " + n));
                    return;
                }
                long total = demand + n;
                // Saturate instead of overflowing when demand is effectively unbounded.
                demand = total < 0 ? Long.MAX_VALUE : total;
                if (emitting) {
                    // Re-entrant call from inside onNext: the outer loop picks up the new demand.
                    return;
                }
                emitting = true;
                try {
                    while (!done && demand > 0 && next <= LAST) {
                        int value = next++;
                        demand--;
                        System.out.println("Publishing = " + value);
                        // Deliver the item to the subscriber.
                        subscriber.onNext(value);
                    }
                    if (!done && next > LAST) {
                        done = true;
                        // Signal completion once the sequence is exhausted.
                        subscriber.onComplete();
                    }
                } finally {
                    emitting = false;
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        }
    }

    /** Prints every signal it receives and requests items one at a time. */
    public static class IntSubscriber implements Flow.Subscriber<Integer> {
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            System.out.println("onSubscribe");
            // Ask for the first item; without a request nothing is delivered.
            subscription.request(1);
        }

        @Override
        public void onNext(Integer item) {
            System.out.println("onNext: " + item);
            // Ask for the next item only after this one has been handled.
            subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("onError:" + throwable);
        }

        @Override
        public void onComplete() {
            System.out.println("onComplete");
        }
    }
}
10.12.2. SubmissionPublisher
package cn.netkiller.test;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.stream.IntStream;
/**
 * Demonstrates {@link SubmissionPublisher}: one publisher, one subscriber that
 * pulls items one at a time, and asynchronous delivery on the publisher's
 * executor.
 */
public class SubmissionPublisherTest {
    /**
     * Publishes one URL plus eleven numbered messages, closes the publisher and
     * waits until the subscriber has seen the terminal signal.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while
     *     waiting for the subscriber to finish
     */
    public static void main(String[] args) throws InterruptedException {
        System.out.println(Thread.currentThread().getName());
        // Released by the subscriber's terminal signal, so main waits exactly as
        // long as delivery takes instead of sleeping for a fixed period.
        // (Fully qualified so no extra import is needed.)
        final java.util.concurrent.CountDownLatch finished = new java.util.concurrent.CountDownLatch(1);
        // 1. Create a publisher of String items; SubmissionPublisher implements Flow.Publisher.
        //    try-with-resources closes it even if publishing fails part-way.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            // 2. Create a subscriber that receives the publisher's messages.
            Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    System.out.println("onSubscribe");
                    this.subscription.request(1);
                }

                @Override
                public void onNext(String item) {
                    System.out.println("onNext: " + item);
                    this.subscription.request(1);
                }

                @Override
                public void onError(Throwable throwable) {
                    // onError is terminal: the subscription is already finished,
                    // so there is nothing left to cancel.
                    System.out.println("Throwable: " + throwable.getMessage());
                    finished.countDown();
                }

                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                    finished.countDown();
                }
            };
            // 3. Connect the subscriber to the publisher.
            publisher.subscribe(subscriber);
            publisher.submit("https://www.netkiller.cn");
            // 4. Publish the data: submit each message, then echo it on the main thread.
            IntStream.range(0, 11)
                    .mapToObj(i -> "publisher -----> message " + i)
                    .forEach(message -> {
                        publisher.submit(message);
                        System.out.println(message);
                    });
        }
        // 5. The publisher is closed at this point; delivery still happens on another
        //    thread, so wait for the subscriber's terminal signal before main returns.
        if (!finished.await(10, java.util.concurrent.TimeUnit.SECONDS)) {
            System.err.println("Timed out waiting for the subscriber to finish");
        }
    }
}
main
onSubscribe
onNext: https://www.netkiller.cn
publisher -----> message 0
publisher -----> message 1
publisher -----> message 2
publisher -----> message 3
publisher -----> message 4
onNext: publisher -----> message 0
onNext: publisher -----> message 1
onNext: publisher -----> message 2
onNext: publisher -----> message 3
onNext: publisher -----> message 4
onNext: publisher -----> message 5
publisher -----> message 5
publisher -----> message 6
publisher -----> message 7
publisher -----> message 8
publisher -----> message 9
publisher -----> message 10
onNext: publisher -----> message 6
onNext: publisher -----> message 7
onNext: publisher -----> message 8
onNext: publisher -----> message 9
onNext: publisher -----> message 10
onComplete
package cn.netkiller.test;
import lombok.SneakyThrows;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
/**
 * Demonstrates a {@link Flow.Processor} placed between a publisher and a
 * subscriber: publisher -> {@link UpperCaseProcessor} -> subscriber.
 */
public class Test {
    /**
     * Publishes four strings through the upper-casing processor and waits until
     * the final subscriber has received the terminal signal.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Released by the final subscriber's terminal signal, so main waits for the
        // pipeline to drain instead of sleeping for a fixed period.
        // (Fully qualified so no extra import is needed.)
        final var finished = new java.util.concurrent.CountDownLatch(1);
        try (final var publisher = new SubmissionPublisher<String>()) {
            final var processor = new UpperCaseProcessor();
            publisher.subscribe(processor);
            processor.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    System.out.println("Subscriber.onSubscribe");
                    subscription.request(1);
                }

                @Override
                public void onNext(String item) {
                    System.out.println("Subscriber.onNext: " + item);
                    this.subscription.request(1);
                }

                @Override
                public void onError(Throwable throwable) {
                    System.out.println("Subscriber.onError: " + throwable);
                    finished.countDown();
                }

                @Override
                public void onComplete() {
                    System.out.println("Subscriber.onComplete");
                    finished.countDown();
                }
            });
            publisher.submit("abc");
            publisher.submit("xyz");
            publisher.submit("neo");
            publisher.submit("netkiller");
        }
        // Closing the publisher completes the processor, which in turn closes itself
        // and completes the subscriber - all on other threads, hence the wait.
        try {
            if (!finished.await(10, java.util.concurrent.TimeUnit.SECONDS)) {
                System.err.println("Timed out waiting for the pipeline to finish");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Subscribes to an upstream publisher of strings and republishes every item
     * in upper case to its own subscribers.
     */
    static class UpperCaseProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            System.out.println("Processor.onSubscribe");
            subscription.request(1);
        }

        @Override
        public void onNext(String item) {
            System.out.println("Processor.onNext: " + item);
            // Locale.ROOT keeps the result independent of the JVM's default locale
            // (for example, a Turkish locale would turn 'i' into a dotted capital).
            submit(item.toUpperCase(java.util.Locale.ROOT));
            this.subscription.request(1);
        }

        @Override
        public void onError(Throwable throwable) {
            System.out.println("Processor.onError: " + throwable.getMessage());
            // Propagate the failure to downstream subscribers.
            closeExceptionally(throwable);
        }

        @Override
        public void onComplete() {
            System.out.println("Processor.onComplete");
            // Upstream is finished, so complete downstream subscribers as well.
            close();
        }
    }
}
Processor.onSubscribe
Subscriber.onSubscribe
Processor.onNext: abc
Processor.onNext: xyz
Processor.onNext: neo
Processor.onNext: netkiller
Processor.onComplete
Subscriber.onNext: ABC
Subscriber.onNext: XYZ
Subscriber.onNext: NEO
Subscriber.onNext: NETKILLER
Subscriber.onComplete