函数式组合

Stream<String> st;

// Three callbacks are attached ahead of the final consumer. The trailing //n
// markers appear to be callout numbers for the surrounding documentation.
// 1: logs each received value together with the current thread.
// 2: logs a fixed "Stream is complete" message.
// 3: registered for Throwable.class; logs both arguments handed to the callback.
// 4: passes each value to service.doWork.
st.observe(s -> LOG.info("Got input [{}] on thread [{}]", s, Thread.currentThread())) //1
  .observeComplete(v -> LOG.info("Stream is complete")) //2
  .observeError(Throwable.class, (o, t) -> LOG.error("{} caused an error: {}", o, t)) //3
  .consume(s -> service.doWork(s)); //4
Stream<String> st;

// The predicate tests whether a string starts with "Hello"; the consumer at the
// end of the chain passes each string it receives to service.doWork.
st.filter(text -> text.startsWith("Hello")) //1
  .consume(accepted -> service.doWork(accepted)); //2
// range(1, 100) followed by take(50), then three callbacks handed to consume:
// print each value, print the stack trace of a Throwable, and print a
// "--complete--" marker (presumably on completion -- confirm against the API docs).
Streams
  .range(1, 100)
  .take(50) //1
  .consume(
    System.out::println,
    failure -> failure.printStackTrace(),
    ignored -> System.out.println("--complete--")
  );
// Each value produced by range(1, 100) is converted to its String form before
// being printed.
Streams
  .range(1, 100)
  .map(value -> String.valueOf(value)) //1
  .consume(System.out::println);
// For every value of the outer range, the mapper builds an inner range from 1 to
// that value and calls subscribeOn with Environment.workDispatcher() on it.
// The callbacks passed to consume print each value, print the stack trace of a
// Throwable, and print a "--complete--" marker.
Streams
  .range(1, 100)
  .flatMap(upper -> Streams.range(1, upper).subscribeOn(Environment.workDispatcher())) //1
  .consume(
    System.out::println, //2
    failure -> failure.printStackTrace(),
    ignored -> System.out.println("--complete--")
  );
// toList() on the range yields a Promise of the collected list.
Promise<List<Long>> result = Streams
  .range(1, 100)
  .subscribeOn(Environment.workDispatcher())
  .toList(); //1

// Obtain the value through await() and print it ...
List<Long> collected = result.await(); //2
System.out.println(collected);
// ... and also register a callback that prints the value handed to onSuccess.
result.onSuccess(values -> System.out.println(values)); //3
函数式API或者工厂方法 作用