Rx有很好的功能Observable.Buffer。但在现实生活中存在问题。如何重试热观察?
场景:应用程序向数据库发送一个事件流。逐个插入事件很昂贵,所以我们需要对它进行批处理。我想为此使用Observable.Buffer。但是插入到DB中的失败几率很小(死锁,超时,停机等)。
我可以添加一些重试逻辑到批处理函数本身,但它会反对Rx的可实现性的想法。 Observable.Retry不会削减它,因为它会重新订阅“热门”来源,这意味着失败的批次将会丢失。
是否有函数,我可以编写来实现所需的效果,还是我需要实现自己的扩展?我想是这样的:
_inputBuffer = new BufferBlock<int>();
_inputBuffer.AsObservable().
Buffer(TimeSpan.FromSeconds(10), 1000).
Do(batch => SqlSaveBatch(batch)).
{Retry???}.
Subscribe()
做到尽善尽美,我希望能够克服时的onComplete被称为控制局面,而重试缓冲器有不完整的批次,并能够执行某些操作(发送错误电子邮件,将数据保存到本地文件系统等)
在完美的解决方案中,我将能够重用此重试逻辑。否则,我将不得不拿出SqlSaveBatchWithRetryLogic,RabbitMQWithRetryLogic,MemcachedWithRetryLogic等。 –
您可以将该抽象放入处理程序中,不是吗? –
我无法看到“YourOnCompleteAction”如何等待SqlSaveBatchWithRetryLogic内部缓冲区泄漏。我的意思是我可以看到如何实现它,但它会涉及这两个函数之间共享的状态变量,这会涉及更多的代码,而不是我想要的。 –