2014-04-28 141 views
1

Rx有很好的功能Observable.Buffer。但在现实生活中存在问题。如何重试热观察?

场景:应用程序向数据库发送一个事件流。逐个插入事件很昂贵,所以我们需要对它进行批处理。我想为此使用Observable.Buffer。但是插入到DB中的失败几率很小(死锁,超时,停机等)。

我可以添加一些重试逻辑到批处理函数本身,但它会反对Rx的可实现性的想法。 Observable.Retry不会削减它,因为它会重新订阅“热门”来源,这意味着失败的批次将会丢失。

是否有函数,我可以编写来实现所需的效果,还是我需要实现自己的扩展?我想是这样的:

_inputBuffer = new BufferBlock<int>(); 
_inputBuffer.AsObservable(). 
    Buffer(TimeSpan.FromSeconds(10), 1000). 
    Do(batch => SqlSaveBatch(batch)). 
    {Retry???}. 
    Subscribe() 

做到尽善尽美,我希望能够克服时的onComplete被称为控制局面,而重试缓冲器有不完整的批次,并能够执行某些操作(发送错误电子邮件,将数据保存到本地文件系统等)

回答

3

当保存到数据库失败并需要重试时,它不是真正的流或错误的事件,它是针对事件采取的操作。

我会组织你的代码更是这样的:

IDisposable subscription = 
    _inputBuffer.AsObservable(). 
    Buffer(TimeSpan.FromSeconds(10), 1000). 
    Subscribe(
     batch => SqlSaveBatchWithRetryLogic(batch), 
     () => YourOnCompleteAction); 
  • 可以提供内部SqlSaveBatchWithRetryLogic()
  • 事件
  • 手柄的onComplete内YourOnCompleteAction()
  • 您可以选择配置的重试逻辑如果您未能保存批次,请在SqlSaveBatchWithRetryLogic()之内订阅。
  • 这也消除Do副作用。

虽然我会小心这种方法 - 您需要观察重试逻辑。你没有背压(减慢输入的方式)。因此,如果您有任何退避/重试,您将冒着备份和填充内存的风险。如果您开始在计数限制内始终看到批次,那么您可能会遇到麻烦!您可能想要实施一个计数器来监视未完成的项目。

+0

在完美的解决方案中,我将能够重用此重试逻辑。否则,我将不得不拿出SqlSaveBatchWithRetryLogic,RabbitMQWithRetryLogic,MemcachedWithRetryLogic等。 –

+0

您可以将该抽象放入处理程序中,不是吗? –

+0

我无法看到“YourOnCompleteAction”如何等待SqlSaveBatchWithRetryLogic内部缓冲区泄漏。我的意思是我可以看到如何实现它,但它会涉及这两个函数之间共享的状态变量,这会涉及更多的代码,而不是我想要的。 –