Analysis

Broadcaster<Integer> source = Broadcaster.<Integer> create(Environment.get());
long avgTime = 50L;

Promise<Long> result = source
    .throttle(avgTime)  (1)
    .elapsed()  (2)
    .nest()     (3)
    .flatMap(self ->
            BiStreams.reduceByKey(self, (prev, next) -> prev + 1)    (4)
    )
    .sort((a,b) -> a.t1.compareTo(b.t1))    (5)
    .log("elapsed")
    .reduce(-1L, (acc, next) ->
            acc > 0L ? ((next.t1 + acc) / 2) : next.t1  (6)
    )
    .next();    (7)

for (int i = 0; i < 10; i++) {
  source.onNext(1);
}
source.onComplete();

Output:
03:14:42.013 [main] INFO  elapsed - subscribe: ScanAction
03:14:42.021 [main] INFO  elapsed - onSubscribe: {push}
03:14:42.022 [main] INFO  elapsed - request: 9223372036854775807
03:14:42.517 [hash-wheel-timer-run-3] INFO  elapsed - onNext: 44,1
03:14:42.518 [hash-wheel-timer-run-3] INFO  elapsed - onNext: 48,1
03:14:42.518 [hash-wheel-timer-run-3] INFO  elapsed - onNext: 49,2
03:14:42.518 [hash-wheel-timer-run-3] INFO  elapsed - onNext: 50,3
03:14:42.518 [hash-wheel-timer-run-3] INFO  elapsed - onNext: 51,3
03:14:42.519 [hash-wheel-timer-run-3] INFO  elapsed - complete: SortAction
03:14:42.520 [hash-wheel-timer-run-3] INFO  elapsed - cancel: SortAction
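
The fold at callout (6) can be traced by hand in plain Java, without Reactor. The sketch below is only an illustration: the class and method names are hypothetical, and it assumes that the first component of each logged pair (44, 48, 49, 50, 51) is the elapsed time in milliseconds, in the order the sort at (5) emits them.

```java
// Hypothetical stand-alone sketch; not part of the Reactor API.
final class RunningAverageSketch {

    // Same rule as the reduce lambda at callout (6):
    // the first value replaces the -1 seed, each later value is
    // averaged with the accumulator using integer division.
    static long fold(long[] elapsed) {
        long acc = -1L;
        for (long next : elapsed) {
            acc = acc > 0L ? (next + acc) / 2 : next;
        }
        return acc;
    }

    public static void main(String[] args) {
        // Assumed input: first components of the logged onNext pairs.
        long[] logged = {44L, 48L, 49L, 50L, 51L};
        // Steps: 44, (48+44)/2=46, (49+46)/2=47, (50+47)/2=48, (51+48)/2=49
        System.out.println(fold(logged)); // prints 49
    }
}
```

Note that this fold is not an arithmetic mean: each step halves the weight of everything seen so far, so later values dominate. For these five inputs it yields 49, while the arithmetic mean is 48.4.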
Stream<T> API or factory function | Output value | Role