2011-09-26 47 views
1

我有一个采用“URI”并返回图像(作为Stream)的异步WCF服务。如何使用Rx通过异步WCF服务轮询图像

想我想要做的是:

  • 确保有效的WCF通道存在,如果没有创建它
  • 使异步服务调用
  • 如果成功,将图像保存一个成员变量
  • 如果出现异常,请关闭通道
  • 无论是失败还是成功,请等待200ms再重新开始(永久循环或直到取消)

到目前为止,我想出了这个畸形:

private void PollImage(string imageUri) 
    { 
     const int pollingHertz = 1; 
     const int millisecondsTimeout = 1000/pollingHertz; 
     Thread.Sleep(millisecondsTimeout); 

     if (_channel == null) 
     { 
      _channel = _channelFactory.CreateChannel(); 
     } 

     var getImageFunc = Observable.FromAsyncPattern<string, Stream> 
            (_channel.BeginGetImage, _channel.EndGetImage); 

     getImageFunc(imageUri) 
      .Finally(() => PollImage(imageUri)) 
      .Subscribe(
       stream => UpdateImageStream(imageUri, stream), 
       ex => 
        { 
         Trace.TraceError(ex.ToString()); 
         ((ICommunicationObject) _channel).CloseOrAbort(); 
         _channel = null; 
        }); 
    } 

我真的想学习的Rx但每次我尝试一次,我被甩在抓我的头。

任何人都在意给我一些指针呢?谢谢

+0

问题是什么? – svick

+0

如何使用Rx优雅地解决问题点 – Schneider

回答

8

我有一个解决方案,但我会建议改变你的PollImage方法,使其更接近Rx。

签名应该是这样的:

IObservable<Image> PollImage(string imageUri, TimeSpan gapInterval) 

你应该考虑PollImage是可观察到的工厂,它实际上不会轮询图像,直到您订阅返回观测。这种方法的优点是它可以取消订阅(最后一个要点是要求这样做),并且干净地将轮询图像的代码和更新局部变量的代码分开。

因此,调用PollImage则是这样的:

PollImage(imageUri, TimeSpan.FromMilliseconds(200.0)) 
    .Subscribe(image => 
    { 
     /* do save/update images here */ 
    }); 

和实现是这样的:

private IObservable<Image> PollImage(string imageUri, TimeSpan gapInterval) 
{ 
    Func<Stream, Image> getImageFromStream = st => 
    { 
     /* read image from stream here */ 
    }; 

    return Observable.Create<Image>(o => 
    { 
     if (_channel == null) 
     { 
      _channel = _channelFactory.CreateChannel(); 
     } 

     var getImageFunc = 
      Observable 
       .FromAsyncPattern<string, Stream>(
        _channel.BeginGetImage, 
        _channel.EndGetImage); 

     var query = 
      from ts in Observable.Timer(gapInterval) 
      from stream in getImageFunc(imageUri) 
      from img in Observable.Using(
       () => stream, 
       st => Observable.Start(
        () => getImageFromStream(st))) 
      select img; 

     return query.Do(img => { }, ex => 
     { 
      Trace.TraceError(ex.ToString()); 
      ((ICommunicationObject)_channel).CloseOrAbort(); 
      _channel = null; 
     }).Repeat().Retry().Subscribe(o);     
    }); 
} 

query可观察等待,直到gapInterval完成,然后调用WCF函数返回流,然后将流转换为图像。

内部return声明做了很多事情。

首先它使用Do运算符来捕获发生的任何异常,并像以前一样进行跟踪和通道重置。

接下来它调用.Repeat()使查询有效地重新运行,使其在等待gapInterval再次调用webservice之前。我本来可以使用Observable.Interval而不是Observable.Timer而不是query,并将呼叫放到.Repeat(),但这意味着每次调用web服务时都要启动gapInterval,而不是在上次完成后等待很长时间。

接下来它调用.Retry()它有效地重新启动observable,如果它遇到异常,以便用户永远不会看到异常。 Do运算符捕获错误,所以这是正确的。

最后,它订阅观察者并返回IDisposable,允许调用代码取消订阅。

除了实现getImageFromStream函数外,就是这样。

现在谨慎一句。很多人误解如何订阅可观察的作品,这会导致很难发现错误。

拿这个作为一个例子:

var xs = Observable.Interval(TimeSpan.FromSeconds(1.0)); 

var s1 = xs.Subscribe(x => { }); 
var s2 = xs.Subscribe(x => { }); 

两个s1 & s2订阅xs,但不是分享他们各自创建一个定时器一个计时器。您创建了两个内部工作实例Observable.Interval,而不是一个实例。

现在,这是观察者的正确行为。如果其中一个失败,那么另一个不会因为它们不共享任何内部组件而彼此隔离。

但是,在您的代码(和我的事)中,您有一个潜在的线程问题,因为您通过多次调用PollImage共享_channel。如果一个呼叫失败,它将重置信道,这会导致并发呼叫,从而导致失败。

我的建议是,您为每次调用创建一个新通道以防止并发问题。

我希望这会有所帮助。

+0

很好的答案和谢谢。我喜欢你使用查询表达式。我现在也摆脱了共享频道。我会尽快发布我的最新尝试。 – Schneider

0

这就是我想出来的(有一些帮助!)...仍然不“完美”,但似乎工作。

因为@Enigma说我现在已经摆脱了共享_channel并用一个捕获的本地var替换它。它的工作原理,但我不明白Rx enuf知道这是否很差/错误的方法。我怀疑至少有一个更清洁的方式。

除此之外,我的主要反对意见是Do(),我称之为EnsureChannel ...似乎有点臭。但是...它的工作原理...

哦,我必须在SelectMany中有_(下划线),否则不会再次调用GetImage。

private IDisposable PollImage(string imageUri) 
    { 
     ICameraServiceAsync channel = _channelFactory.CreateChannel(); 
     return Observable 
      .Timer(TimeSpan.FromSeconds(0.2)) 
      .Do(_ => { channel = EnsureChannel(channel); }) 
      .SelectMany(_ => 
       Observable 
       .FromAsyncPattern<string, Stream>(channel.BeginGetImage, channel.EndGetImage)(imageUri)) 
      .Retry() 
      .Repeat() 
      .Subscribe(stream => UpdateImageStream(imageUri, stream)); 
    } 

    private ICameraServiceAsync EnsureChannel(ICameraServiceAsync channel) 
    { 
     var icc = channel as ICommunicationObject; 
     if (icc != null) 
     { 
      var communicationState = icc.State; // Copy local for debug inspection 
      if (communicationState == CommunicationState.Faulted) 
      { 
       icc.CloseOrAbort(); 
       channel = null; 
      } 
     } 
     return channel ?? _channelFactory.CreateChannel(); 
    } 
+2

你所做的相当不错 - 确保频道良好的“做”。但是,你并没有处理你的流。你可以在我的解决方案中看到我使用了'Observable.Using'方法调用。你现在也没有处理异常。而现在唯一有气味的是你正在创建observable并在一个函数中订阅它 - 这样做不是可组合的 - 你不能截取图像流,并且你创建的代码很困难由于强耦合而进行测试。 – Enigmativity

+0

好的意见。我现在把它制成了可组合的。我在别处处理流(另一个线程),所以我不能在这里处理它。你如何看待本地频道变量?有没有更好的Rx方式来做到这一点?我尝试开始()然后传递它在管道中,但有一些问题。 – Schneider

+1

确保您使用'Observable.Create'而不是直接进入'Observable.Timer',现在您已经可以组装了。至于'EnsureChannel'方法,怎么样才能做到这一点:'ICameraServiceAsync CreateChannel()'?我不喜欢你在两个地方创建频道,当它可以在一个地方。 – Enigmativity

-1

如果你真的想用那么人们已经回答了你的问题,如果你正在寻找其他方法,那么我会建议看看TPL(任务等对象),可让您从创建任务目标一个Async方法模式(您的Web服务调用),然后使用取消令牌启动任务,以便在一段时间后,如果任务未完成,您可以通过调用令牌取消方法来取消它。

+0

我通过了TPL,因为它似乎没有Rx – Schneider

+0

这样的高级组合能力似乎人们在没有发表评论的情况下投票(不想分享他们的知识):) – Ankur