我有一个解决方案,但我会建议改变你的PollImage
方法,使其更接近Rx。
签名应该是这样的:
IObservable<Image> PollImage(string imageUri, TimeSpan gapInterval)
你应该考虑PollImage
是可观察到的工厂,它实际上不会轮询图像,直到您订阅返回观测。这种方法的优点是它可以取消订阅(最后一个要点是要求这样做),并且干净地将轮询图像的代码和更新局部变量的代码分开。
因此,调用PollImage
则是这样的:
PollImage(imageUri, TimeSpan.FromMilliseconds(200.0))
.Subscribe(image =>
{
/* do save/update images here */
});
和实现是这样的:
private IObservable<Image> PollImage(string imageUri, TimeSpan gapInterval)
{
Func<Stream, Image> getImageFromStream = st =>
{
/* read image from stream here */
};
return Observable.Create<Image>(o =>
{
if (_channel == null)
{
_channel = _channelFactory.CreateChannel();
}
var getImageFunc =
Observable
.FromAsyncPattern<string, Stream>(
_channel.BeginGetImage,
_channel.EndGetImage);
var query =
from ts in Observable.Timer(gapInterval)
from stream in getImageFunc(imageUri)
from img in Observable.Using(
() => stream,
st => Observable.Start(
() => getImageFromStream(st)))
select img;
return query.Do(img => { }, ex =>
{
Trace.TraceError(ex.ToString());
((ICommunicationObject)_channel).CloseOrAbort();
_channel = null;
}).Repeat().Retry().Subscribe(o);
});
}
的query
可观察等待,直到gapInterval
完成,然后调用WCF函数返回流,然后将流转换为图像。
内部return
声明做了很多事情。
首先它使用Do
运算符来捕获发生的任何异常,并像以前一样进行跟踪和通道重置。
接下来它调用.Repeat()
使查询有效地重新运行,使其在等待gapInterval
再次调用webservice之前。我本来可以使用Observable.Interval
而不是Observable.Timer
而不是query
,并将呼叫放到.Repeat()
,但这意味着每次调用web服务时都要启动gapInterval
,而不是在上次完成后等待很长时间。
接下来它调用.Retry()
它有效地重新启动observable,如果它遇到异常,以便用户永远不会看到异常。 Do
运算符捕获错误,所以这是正确的。
最后,它订阅观察者并返回IDisposable
,允许调用代码取消订阅。
除了实现getImageFromStream
函数外,就是这样。
现在谨慎一句。很多人误解如何订阅可观察的作品,这会导致很难发现错误。
拿这个作为一个例子:
var xs = Observable.Interval(TimeSpan.FromSeconds(1.0));
var s1 = xs.Subscribe(x => { });
var s2 = xs.Subscribe(x => { });
两个s1
& s2
订阅xs
,但不是分享他们各自创建一个定时器一个计时器。您创建了两个内部工作实例Observable.Interval
,而不是一个实例。
现在,这是观察者的正确行为。如果其中一个失败,那么另一个不会因为它们不共享任何内部组件而彼此隔离。
但是,在您的代码(和我的事)中,您有一个潜在的线程问题,因为您通过多次调用PollImage
共享_channel
。如果一个呼叫失败,它将重置信道,这会导致并发呼叫,从而导致失败。
我的建议是,您为每次调用创建一个新通道以防止并发问题。
我希望这会有所帮助。
问题是什么? – svick
如何使用Rx优雅地解决问题点 – Schneider