2016-07-29 20 views
1

简而言之,是否有任何解决方案可以解决RxJava中的背压问题,而无需借助丢弃物品,序列化操作或无限制缓冲?处理背压而不丢弃物品或在RxJava中序列化

考虑以下任务作为何时可能有用的示例。从磁盘

  1. 将数据读入存储器
  2. 压缩数据
  3. 通过网络

的直接的方法是做在单个后台线程的所有任务顺序地传输压缩数据,作为在:

observeBlocksOfFileContents(file). 
    .subscribeOn(backgroundScheduler) 
    .map(compressBlock) 
    .subscribe(transmitBlock); 

虽然这工作没有问题,从性能Ë角度看,它是次优的运行时间是所有三个业务的总和,因为它在并行运行的,而不是最大的人:

observeBlocksOfFileContents(file). 
    .subscribeOn(diskScheduler) 
    .observeOn(cpuScheduler) 
    .map(compressBlock) 
    .observeOn(networkScheduler) 
    .subscribe(transmitBlock); 

如果数据是从读这可以但是会由于背压磁盘比它能被压缩和传输的速度快。通常的反压解决方案是不可取的,原因如下:

  1. 掉落物品:文件必须完全无缺件
  2. 序列化在单个线程传输:流水线性能的改善丢失
  3. 调用栈阻塞:not supported in RxJava
  4. 增加observeOn缓冲区:内存消耗可能会成为几十倍的文件大小
  5. 重新实现observeOn没有MissingBackpressureException:大量的工作,打破流畅API

还有其他解决方案吗?或者这是从根本上不适合ReactiveX可观察模型的东西?

+0

我不明白如果从'physicalFileContents'发出'compress'的数据来源是从物理的角度来看的话。此操作无法并行运行。 – Divers

+0

我应该指出,该文件被观察为许多内容块,而不是一块,在这种情况下,是的,它不能并行运行。我已经更新了这个问题来反映这一点。 –

回答

1

6)实现observeBlocksOfFileContents,使其支持背压。

文件系统已经拉式(InputStream.read()发生在你想让它和你不抛出),所以认为合理的块大小和读取,在每个请求:

Observable.create(SyncOnSubscribe.createStateful(
    () -> new FileInputStream("file.dat") 
    (in, out) -> { 
     byte[] buf = new byte[4096]; 
     int r = in.read(buf); 
     if (r < 0) { 
      out.onCompleted(); 
     } else { 
      if (r == buf.length) { 
       out.onNext(buf); 
      } else { 
       byte[] buf2 = new byte[r]; 
       System.arraycopy(buf, 0, buf2, 0, r); 
       out.onNext(buf2); 
      } 
     } 

    }, 
    in -> in.close() 
)); 

(为简洁起见,省略了try-catch。)