制作一个快速断路器

final Broadcaster<String> closeCircuit = Broadcaster.create(); 
final Stream<String> openCircuit = Streams.just("Alternative Message"); 

final Action<Publisher<? extends String>, String> circuitSwitcher = Streams.switchOnNext(); 

final AtomicInteger successes = new AtomicInteger(); 
final AtomicInteger failures = new AtomicInteger();

final int maxErrors = 3;

Promise<List<String>> promise = 
                circuitSwitcher 
                        .observe(d -> successes.incrementAndGet()) 
                        .when(Throwable.class, error -> failures.incrementAndGet())
                        .observeStart(s -> { 

                                System.out.println("failures: " + failures +
                                         " successes:" + successes);

                                if (failures.compareAndSet(maxErrors, 0)) {
                                        circuitSwitcher.onNext(openCircuit); 
                                        successes.set(0);

                                        Streams
                                                .timer(1)  
                                                .consume(ignore -> circuitSwitcher.onNext(closeCircuit));
                                }
                        })
                        .retry() 
                        .toList(); 

circuitSwitcher.onNext(closeCircuit); 

closeCircuit.onNext("test1");
closeCircuit.onNext("test2");
closeCircuit.onNext("test3");
closeCircuit.onError(new Exception("test4"));
closeCircuit.onError(new Exception("test5"));
closeCircuit.onError(new Exception("test6"));
Thread.sleep(1500); 
closeCircuit.onNext("test7");
closeCircuit.onNext("test8");
closeCircuit.onComplete();  
circuitSwitcher.onComplete();

System.out.println(promise.await());
Assert.assertEquals(promise.get().get(0), "test1");
Assert.assertEquals(promise.get().get(1), "test2");
Assert.assertEquals(promise.get().get(2), "test3");
Assert.assertEquals(promise.get().get(3), "Alternative Message");
Assert.assertEquals(promise.get().get(4), "test7");
Assert.assertEquals(promise.get().get(5), "test8");