绑定一个 Stream

import static reactor.Environment.*;
import reactor.rx.Streams;
import reactor.rx.Stream;
//...

// Source: a fixed sequence of eight single-letter strings.
Stream<String> stream = Streams.just("a","b","c","d","e","f","g","h");

// Two separate pipelines are declared directly on the same source stream.

// Pipeline 1: upper-case every element, then keep only those equal to "C".
Stream<String> actionChain1 = stream
    .map(String::toUpperCase)
    .filter(w -> w.equals("C"));

// Pipeline 2: dispatch on the shared dispatcher, take 5 elements, then count them.
Stream<Long> actionChain2 = stream
    .dispatchOn(sharedDispatcher())
    .take(5)
    .count();

// Nothing runs until a consumer is attached: each consume() call starts its chain.
actionChain1.consume(System.out::println);
// Keep the returned Control handle so that chain 2 can be cancelled later.
Control c = actionChain2.consume(System.out::println);
//...
// Cancelling forces this consumer to stop receiving data.
c.cancel();
import static reactor.Environment.*;
import reactor.rx.Streams;
import reactor.rx.Stream;
//...

// Source: a fixed sequence of eight single-letter strings.
Stream<String> stream = Streams.just("a","b","c","d","e","f","g","h");

// Shared upstream pipeline: print each element via observe(), then broadcast()
// so that both downstream chains below are built on this one stream.
Stream<String> sharedStream = stream
    .observe(System.out::println)
    .broadcast();

// Two separate downstream pipelines are declared on the shared stream.

// Pipeline 1: upper-case every element, then keep only those equal to "C".
Stream<String> actionChain1 = sharedStream
    .map(String::toUpperCase)
    .filter(w -> w.equals("C"));

// Pipeline 2: take 5 elements, then count them.
Stream<Long> actionChain2 = sharedStream
    .take(5)
    .count();

// Attaching a consumer starts the corresponding chain.
actionChain1.consume(System.out::println);
actionChain2.consume(System.out::println);
Stream<T> 方法 返回类型 作用