2016-09-07 56 views
2

我有独立事件流,它们与反应扩展异步处理。该处理程序可能因任何原因失败,但该流仍在继续。C#Rx - 忽略错误

但是,在Rx中,发生错误后,它会自动取消订阅。这在某种程度上是可配置的吗?

实施例:

async Task<Unit> ActionAsync(int i) 
{ 
    if (i > 1) 
     throw new Exception(); 

    i.Dump();  
    return Unit.Default; 
} 

void Main() 
{ 
    var sb = new Subject<int>(); 

    sb.SelectMany(ActionAsync).Subscribe(
     _ => { }, 
     ex => 
     { 
      ex.Dump(); 
     } 
    ); 


    sb.OnNext(1); 
    sb.OnNext(2); 
    sb.OnNext(3); 
} 

我想有以下输出:

  • 异常

我可以实现这一点没有尝试/赶上ActionAsync

+0

没有Rx你不能。:-) –

回答

6

在Rx中有一个行为合约,其中流只能是OnNext*(OnError|OnCompleted)。换言之,零或多个OnNext以及最后只有一个OnErrorOnCompleted

所以,不,你不能配置Rx。如果你这样做,它将不再是Rx。

但是,您可以做的是编写一个可以重试源代码的查询。

如果你写你这样的代码:

async Task<int> ActionAsync(int i) 
{ 
    if (i == 2) 
     throw new Exception(); 

    return i; 
} 

void Main() 
{ 
    var sb = new Subject<int>(); 

    sb 
     .SelectMany(ActionAsync) 
     .Do(_ => { }, ex => ex.Dump()) 
     .Retry() 
     .Subscribe(_ => _.Dump()); 

    sb.OnNext(1); 
    sb.OnNext(2); 
    sb.OnNext(3); 
} 

然后你就可以获得:

 
1 
Exception of type 'System.Exception' was thrown. 
3 

根据您的意见询问性能问题,不存在任何性能问题在使用.Retry(),但有一个行为问题。

如果源是冷 - 像var sb = new [] { 1, 2, 3 }.ToObservable(); - 那么.Retry()将重新开始与整个观察到的序列,造成的无限序列:

 
1 
Exception of type 'System.Exception' was thrown. 
1 
Exception of type 'System.Exception' was thrown. 
1 
Exception of type 'System.Exception' was thrown. 
1 
Exception of type 'System.Exception' was thrown. 
1 
Exception of type 'System.Exception' was thrown. 
1 
Exception of type 'System.Exception' was thrown. 
... 

在你的代码的情况下,可观察到的是一个炎热的观察到这样这不会发生。

如果你想在冷的可观察点上做到这一点,你需要通过.Publish(...)使它变热。像这样:

var sb = new[] { 1, 2, 3 }.ToObservable(); 

sb 
    .Publish(sbp => 
     sbp 
      .SelectMany(ActionAsync) 
      .Do(_ => { }, ex => ex.Dump()) 
      .Retry()) 
    .Subscribe(_ => _.Dump()); 

然后预期的行为返回。

+0

这是什么性能影响? Retry()调用是否昂贵? – nothrow

+0

@Yossarian - 没有任何性能问题,但是有行为问题。我会附上答案。 – Enigmativity

0

使用Materialize

async Task<Unit> ActionAsync(int i) 
    { 
     if (i > 1) 
      throw new Exception(); 

     i.Dump(); 
     return Unit.Default; 
    } 

    void Main() 
    { 
     var sb = new Subject<int>(); 
     sb.SelectMany(i => Observable.FromAsync(() => ActionAsync(i)).Materialize()) 
      .Subscribe(item => 
      { 
       if (item.Kind == NotificationKind.OnError) 
       { 
        item.Exception.Dump(); 
       } 
       //else if (item.Kind == NotificationKind.OnNext) 
       //{ 
       // var value = item.Value; 
       //} 
       //else if (item.Kind == NotificationKind.OnCompleted) 
       //{ 
       //} 
      } 
     ); 
     sb.OnNext(1); 
     sb.OnNext(2); 
     sb.OnNext(3); 
    } 

注意,如果核心逻辑保持不变,你将永远不会输出“3”既然你有例外(I> 1),其中包括3你可能想改变(i> 1)到(i == 2)以获得问题中指定的输出。