从自定义 Reactive Publisher 处创建

final Stream<String> stream1 = Streams.create(new Publisher<String>() {
  @Override
  public void subscribe(Subscriber<? super String> sub) {
    sub.onSubscribe(new Subscription() { //1
      @Override
      public void request(long demand) {
        if(demand == 2L){
          sub.onNext("1");
          sub.onNext("2");
          sub.onComplete();
        }
      }

      @Override
      public void cancel() {
        System.out.println("Cancelled!");
      }
    });
  }
});

final Stream<String> stream2 = Streams.create(sub -> {
  sub.onNext("3"); //2
  sub.onNext("4");
  sub.onComplete();
});

final AtomicInteger counterSubscriber = new AtomicInteger();

Stream<String> deferred = Streams.defer(() -> {
  if (counterSubscriber.incrementAndGet() == 1) { //3
    return stream1;
  }
  else {
     return stream2;
  }
});

deferred
  .consume(s -> System.out.printf("%s First subscription = %s%n", Thread.currentThread(), s));
deferred
  .consume(s -> System.out.printf("%s Second subscription = %s%n", Thread.currentThread(), s));
final Stream<String> stream = Streams.createWith(
  (demand, sub) -> { //1
      sub.context(); //2
      if (demand >= 2L && !sub.isCancelled()) {
          sub.onNext("1");
          sub.onNext("2");
          sub.onComplete();
      }
  },
  sub -> 0, //3
  sub -> System.out.println("Cancelled!") //4
);

stream.consume(s -> System.out.printf("%s greeting = %s%n", Thread.currentThread(), s));