在Rx中有一个行为合约,其中流只能是OnNext*(OnError|OnCompleted)
。换言之,零或多个OnNext
以及最后只有一个OnError
或OnCompleted
。
所以,不,你不能配置Rx。如果你这样做,它将不再是Rx。
但是,您可以做的是编写一个可以重试源代码的查询。
如果你写你这样的代码:
async Task<int> ActionAsync(int i)
{
if (i == 2)
throw new Exception();
return i;
}
void Main()
{
var sb = new Subject<int>();
sb
.SelectMany(ActionAsync)
.Do(_ => { }, ex => ex.Dump())
.Retry()
.Subscribe(_ => _.Dump());
sb.OnNext(1);
sb.OnNext(2);
sb.OnNext(3);
}
然后你就可以获得:
1
Exception of type 'System.Exception' was thrown.
3
根据您的意见询问性能问题,不存在任何性能问题在使用.Retry()
,但有一个行为问题。
如果源是冷 - 像var sb = new [] { 1, 2, 3 }.ToObservable();
- 那么.Retry()
将重新开始与整个观察到的序列,造成的无限序列:
1
Exception of type 'System.Exception' was thrown.
1
Exception of type 'System.Exception' was thrown.
1
Exception of type 'System.Exception' was thrown.
1
Exception of type 'System.Exception' was thrown.
1
Exception of type 'System.Exception' was thrown.
1
Exception of type 'System.Exception' was thrown.
...
在你的代码的情况下,可观察到的是一个炎热的观察到这样这不会发生。
如果你想在冷的可观察点上做到这一点,你需要通过.Publish(...)
使它变热。像这样:
var sb = new[] { 1, 2, 3 }.ToObservable();
sb
.Publish(sbp =>
sbp
.SelectMany(ActionAsync)
.Do(_ => { }, ex => ex.Dump())
.Retry())
.Subscribe(_ => _.Dump());
然后预期的行为返回。
没有Rx你不能。:-) –