编程文汇

Reactor3 自定义Publisher

介绍使用Reactor的文章很多, 介绍如何利用Reactor封装现有异步库的少, 找了半天, 找到一篇文章, 然后自己写了个简单的例子, 运行这个例子 可以看出来执行流程:

public class Test {
  @SneakyThrows
  public static void main(String[] args) {
    Flux.from(new Publisher<String>() {
      @Override
      public void subscribe(Subscriber<? super String> s) {
        s.onSubscribe(new Subscription() {
          int got = 10;
          boolean canceled = false;

          @Override
          public void request(long n) {
            System.out.println(n);
            for (var i = 0; i < n && got >= 0 && !canceled; i++) {
              System.out.println("send");
              s.onNext(String.valueOf(got--));
            }
            if (got < 0 || canceled)
              s.onComplete();
          }

          @Override
          public void cancel() {
            canceled = true;
          }
        });
      }
    }).onErrorStop().doOnCancel(() -> {
      System.out.println("canceled.");
    }).doOnTerminate(() -> {
      System.out.println("done");
    }).subscribe( new Subscriber<String>() {
       private Subscription s;
      @Override
      public void onSubscribe(Subscription s) {
        this.s = s;
        System.out.println(s);
        s.request(5);
      }

      @Override
      public void onNext(String t) {
        System.out.println(t);
        s.cancel();
      }

      @Override
      public void onError(Throwable t) {
        System.out.println("onError");
      }

      @Override
      public void onComplete() {
        System.out.println("onComplete");
      }
    });
  }
}

总结下来, 写一个Publisher, 要注意响应订阅, 以及发布信息的流程:

  1. 调用onSubscribe, 传递过去一个Subscription
  2. 发送消息是调用onNext
  3. 正常结束时调用onComplete
  4. 异常结束, 调用onError
    后三步调用 , 一般在Subscription中完成.

Mastering own Reactive-Streams implementation. Part 1 - Publisher.