制作一个简单的文件流

 Publisher<String> fileStream = new Publisher<String>() { 
          @Override
          public void subscribe(final Subscriber<? super String> subscriber) {
                  final File file = new File("settings.gradle"); 
  
                  try {
                          final BufferedReader is = new BufferedReader(new FileReader(file)); 
  
                          subscriber.onSubscribe(new Subscription() {
  
                                  final AtomicBoolean terminated = new AtomicBoolean(false);
  
                                  @Override
                                  public void request(long n) {
                                          long requestCursor = 0l;
                                          try {
                                                  String line;
                                                  while ((requestCursor++ < n || n == Long.MAX_VALUE) 
                                                                  && !terminated.get()) { 
  
                                                          line = is.readLine();
                                                          if (line != null) {
                                                                  subscriber.onNext(line);
                                                          } else {
                                                                  if (terminate()) {
                                                                          subscriber.onComplete(); 
                                                                  }
                                                                  return;
                                                          }
                                                  }
                                          } catch (IOException e) {
                                                  if (terminate()) {
                                                          subscriber.onError(e); 
                                                  }
                                          }
                                  }
  
                                  @Override
                                  public void cancel() {
                                          terminate();
                                  }
  
                                  private boolean terminate() {
                                          if (terminated.compareAndSet(false, true)) {
                                                  try {
                                                          is.close(); 
                                                  } catch (Exception t) {
                                                          subscriber.onError(t);
                                                  }
                                                  return true;
                                          }
                                          return false;
                                  }
                          });
  
                  } catch (FileNotFoundException e) {
                          Streams.<String, FileNotFoundException> fail(e)
                                  .subscribe(subscriber); 
                  }
          }
  };
  
  Streams.wrap(fileStream)
          .capacity(4L) 
          .consumeOn( 
                  Environment.sharedDispatcher(),
                  System.out::println,
                  Throwable::printStackTrace,
                  nothing -> System.out.println("## EOF ##")
  );
final String filename = "settings.gradle";
Publisher<String> fileStream = PublisherFactory.create(
        (n, sub) -> { 
                String line;
                final BufferedReader inputStream = sub.context() 
                long requestCursor = 0l;
                while ((requestCursor++ < n || n == Long.MAX_VALUE) && !sub.isCancelled()) { 

                        try {
                                line = inputStream.readLine();
                                if (line != null) {
                                        sub.onNext(line);
                                } else {
                                        sub.onComplete(); 
                                        return;
                                }
                        }
                        catch (IOException exc) {
                                sub.onError(exc);
                        }
                }
        },
        sub -> new BufferedReader(new FileReader(filename)), 
        inputStream -> inputStream.close() 
);

Streams
        .wrap(fileStream)
        .process(RingBufferProcessor.create())
        .capacity(4L)
        .consume(
                System.out::println,
                Throwable::printStackTrace,
                nothing -> System.out.println("## EOF ##")
);