错误处理

// Example: a pipeline with a 1-second timeout and an error fallback, driven by hand.
// Values are pushed into the broadcaster explicitly with onNext() further down.
Broadcaster<String> broadcaster = Broadcaster.create();

// Build the pipeline and gather its output into a Promise holding a List<String>.
Promise<List<String>> promise =
    broadcaster
        // Timeout of 1 second. The third argument is a stream that fails with
        // Exception("another one!").
        // NOTE(review): presumably this is the stream the pipeline switches to once the
        // timeout fires - confirm against the timeout() Javadoc.
        .timeout(1, TimeUnit.SECONDS, Streams.fail(new Exception("another one!"))) 
        // Error fallback: a stream holding the single value "Alternative Message".
        // The final assertion below expects this value as the last collected element.
        .onErrorResumeNext(Streams.just("Alternative Message")) 
        .toList();

// Two values are sent immediately, before any waiting.
broadcaster.onNext("test1");
broadcaster.onNext("test2");
// Sleep for 1.5 s, which is longer than the 1 s timeout configured above.
Thread.sleep(1500);

// "test3" is sent only after the sleep; the assertions below expect it NOT to appear
// in the collected list. A CancelException here is deliberately ignored.
try {
  broadcaster.onNext("test3");
} catch (CancelException ce) {
  //Broadcaster has no subscriber, timeout disconnected the pipeline
}

// Wait on the promise before reading its value.
promise.await();

// Expected result: the two values sent before the sleep, followed by the fallback message.
assertEquals(promise.get().get(0), "test1");
assertEquals(promise.get().get(1), "test2");
assertEquals(promise.get().get(2), "Alternative Message");
Stream<T> API 作用