2014-01-14 132 views
1

我有这个类的解释我的问题:实现Rx处理程序的最佳实践是什么?

public class DataObserver: IDisposable 
{ 
    private readonly List<IDisposable> _subscriptions = new List<IDisposable>(); 
    private readonly SomeBusinessLogicServer _server; 

    public DataObserver(SomeBusinessLogicServer server, IObservable<SomeData> data) 
    { 
     _server = server; 
     _subscriptions.Add(data.Subscribe(TryHandle)); 
    } 

    private void TryHandle(SomeData data) 
    { 
     try 
     { 
      _server.MakeApiCallAsync(data).Wait(); 
     } 
     catch (Exception) 
     { 
      // Handle exceptions somehow! 
     } 
    } 

    public void Dispose() 
    { 
     _subscriptions.ForEach(s => s.Dispose()); 
     _subscriptions.Clear(); 
    } 

}

A)我怎样才能避免TryHandle()函数中阻塞? B)你将如何发布在该函数内捕获的异常以正确处理它们?

回答

3

在RX设计准则提供很多有用的建议写自己的Rx运营商时:

http://go.microsoft.com/fwlink/?LinkID=205219

我敢肯定,我会被骂得狗血淋头链接到外部文章,但这种联系已经好几年,它的太在SO上重新发布。

+1

顺便提一句,我建议将此文档添加到源代码库 - 如果发生这种情况,我会回复。 –

+1

感谢James给Rx操作员提示。很有帮助。 :-) –

2

首先,来看看CompositeDisposable而不是自己重新实现它。

除此之外,你的问题有很多答案。我发现,我在与Rx合作时所获得的最佳见解是意识到,大多数您想要订阅的案例实际上只是您正在构建的可观察项目中的更多链接,而您并不是真的想要订阅,而是想要应用对即将到来的可观察数据的又一次转换。而让一些代码,是进一步“对系统的边缘”,并对于如何处理错误做实际订阅

在这个例子中,你已经提出了更多的知识:

A)不要阻塞只是将IObservable<SomeData>转换为IObservable<Task>(这实际上更好地表示为IObservable<IObservable<Unit>>)。 B)通过仅以错误结束observable来发布异常,或者如果您不希望异常结束observable,则会暴露IObservable<Exception>

下面是我重新写你的例子,假设你不想流结束的错误,而是自顾自地报告错误后运行:

public static class DataObserver 
{ 
    public static IObservable<Exception> ApplyLogic(this IObservable<SomeData> source, SomeBusinessLogicServer server) 
    { 
     return source 
      .Select(data => 
       { 
        // execute the async method as an observable<Unit> 
        // ignore its results, but capture its error (if any) and yield it. 
        return Observable 
         .FromAsync(() => server.MakeApiCallAsync(data)) 
         .IgnoreElements() 
         .Select(_ => (Exception)null) // to cast from IObservable<Unit> to IObservable<Exception> 
         .Catch((Exception e) => Observable.Return(e)); 
       }) 
      // runs the Api calls sequentially (so they will not run concurrently) 
      // If you prefer to let the calls run in parallel, then use 
      // .Merge() instead of .Concat() 
      .Concat() ; 
    } 
} 


// Usage (in Main() perhaps) 
IObservable<SomeData> dataStream = ...; 
var subscription = dataStream.ApplyLogic(server).Subscribe(error => 
{ 
    Console.WriteLine("An error occurred processing a dataItem: {0}", error); 
}, fatalError => 
{ 
    Console.WriteLine("A fatal error occurred retrieving data from the dataStream: {0}", fatalError); 
});