2013-07-25 28 views
0

我有类似是否正确扔OperationCanceledException内Observable.FromAsync

observable.Select(x => Observable.FromAsync(token => Task.Run(() => 
{ 
    // ... do some work ... 

    token.ThrowIfCancellationRequested(); 

    // ... do some more work ... 

    return 7 * x; 
}, token))) 
.Switch() 
.Subscribe(x => 
{ 
    // Whatever 
}); 

Task内可以发生一个需要一段时间工作的一些代码,所以如果Switch已经转移到另一个任务(因为新的价值抵达observable)我想放弃做这项工作。

拨打ThrowIfCancellationRequested()是否正确?即一旦从FromAsync返回IObservable已被处置(并取消令牌),我的OperationCanceledException将被忽略 - 或者它会导致其他地方的问题?

我在一个测试应用程序尝试了这一点,似乎就ok了工作 - 但是,这并不意味着它是正确的:)

回答

1

在底层,task.ToObservable()最终调用。 ToObservable增加了一个延续到最后也做这个任务:这意味着可观察会失败,TaskCanceledException

switch (task.Status) 
{ 
case TaskStatus.RanToCompletion: 
    subject.OnNext(Unit.Default); 
    subject.OnCompleted(); 
    return; 
case TaskStatus.Canceled: 
    subject.OnError(new TaskCanceledException(task)); 
    return; 
case TaskStatus.Faulted: 
    subject.OnError(task.Exception.InnerException); 
    return; 
default: 
    return; 
} 

。但在你的代码发生这种情况时,没有观察者观察到AsyncSubject,所以没有任何反应。所以它似乎没问题。