2012-05-23 55 views
0

只是试图让周围的Rx轮询使用的Rx

我的头

我使用的Rx轮询网站每2秒

var results = new List<MyDTO>(); 
var cx = new WebserviceAPI(...); 
var callback = cx.GetDataAsync().Subscribe(rs => { results.AddRange(rs); }); 
var poller = Observable.Interval(TimeSpan.FromSeconds(2)).Subscribe(_ => { cx.StartGetDataAsync(); }); 

(web服务API暴露了getItemsAsync/getItemsCompleted事件处理程序类型网站从中创建可观察的机制)。

当网站返回,我拆包的响应“的商业部分”到一个IEnumerable的DTO的

public IObservable<IEnumerable<MyDTO>> GetDataAsync() 
{ 
    var o = Observable.FromEventPattern<getItemsCompletedEventHandler,getItemsCompletedEventArgs>(
     h => _webService.getItemsCompleted += h, 
     h => _webService.getItemsCompleted -= h); 

    return o.Select(c=> from itm in c.EventArgs.Result.ItemList 
         select new MyDTO() 
         { 
          ... 
         }); 
} 

我的理由在于考虑到所有的数据都只是在那里的字符串,它有意义的只是把它包装到一个IEnumerable中......但现在我不确定这是否正确!

如果网站的响应时间超过2秒我发现MSTest崩溃了。在调试时,所产生的误差是

“有异步处理期间发生错误。独特的状态 对象所需的多个异步同时操作 是优秀”

与内部异常

“Item has been added。键入词典:'System.Object'键 正在被添加:'System.Object'”

我认为问题是重新进入的问题之一,因为在前一个调用完成数据填充之前,下一个调用正在启动并返回数据。

所以我不知道我是否

  1. 已经把这件事情一起完全正确
  2. 我应该节流以某种方式连接,以避免重入。
  3. 我应该使用不同的中间数据结构(或机构) 代替的IEnumerable

我将不胜感激一些指导。

编辑1: 所以我已经改变了网络电话,包括一个独特的状态对象

public void StartGetDataAsync() 
{ 
    ... 
    // was: _webService.getItemsAsync(request); 
    _webService.getItemsAsync(request, Guid.NewGuid()); 
} 

,并使其正常工作。但我仍然不确定这是否是正确的方法

编辑2 - Web服务sigs: 我正在使用webServiceApi类包装的soap web服务。创建references.cs包含以下方法

public void getItemsAsync(GetItemsReq request, object userState) 
{ 
    if ((this.getItemsOperationCompleted == null)) 
    { 
     this.getItemsOperationCompleted = new System.Threading.SendOrPostCallback(this.OngetItemsOperationCompleted); 
    } 
    this.InvokeAsync("getItems", new object[] { 
        request}, this.getItemsOperationCompleted, userState); 
} 

private System.Threading.SendOrPostCallback getItemsOperationCompleted; 

public event getItemsCompletedEventHandler getItemsCompleted; 

public delegate void getItemsCompletedEventHandler(object sender, getItemsCompletedEventArgs e); 

public partial class getItemsCompletedEventArgs : System.ComponentModel.AsyncCompletedEventArgs 
{ 
    ... 
} 

private void OngetItemsOperationCompleted(object arg) 
{ 
    if ((this.getItemsCompleted != null)) 
    { 
     System.Web.Services.Protocols.InvokeCompletedEventArgs invokeArgs = ((System.Web.Services.Protocols.InvokeCompletedEventArgs)(arg)); 
     this.getItemsCompleted(this, new getItemsCompletedEventArgs(invokeArgs.Results, invokeArgs.Error, invokeArgs.Cancelled, invokeArgs.UserState)); 
    } 
} 

可能给你太多(或错过了什么)!

Thx

+0

您可以发布Web服务的方法/事件签名吗? – Enigmativity

回答

1

我想我对你来说有一个体面的起点。

基本上我认为你需要抽象出Web服务的复杂性并创建一个很好的清理函数来获得你的结果。

尝试这样:

Func<GetItemsReq, IObservable<getItemsCompletedEventArgs>> fetch = 
    rq => 
     Observable.Create<getItemsCompletedEventArgs>(o => 
     { 
      var cx = new WebserviceAPI(/* ... */); 
      var state = new object(); 
      var res = 
       Observable 
        .FromEventPattern< 
         getItemsCompletedEventHandler, 
         getItemsCompletedEventArgs>(
         h => cx.getItemsCompleted += h, 
         h => cx.getItemsCompleted -= h) 
        .Where(x => x.EventArgs.UserState == state) 
        .Take(1) 
        .Select(x => x.EventArgs); 
      var subscription = res.Subscribe(o); 
      cx.getItemsAsync(rq, state); 
      return subscription; 
     }); 

个人而言,我会走一步,并定义返回类型,说GetItemsReq,不包括用户状态对象,但基本上是一样的getItemsCompletedEventArgs

然后,您应该能够使用Observable.Interval创建您需要的轮询。

如果您的Web服务实现了IDisposable,那么您应该将Observable.Using调用添加到上述函数中,以便在完成时正确处理Web服务。

让我知道这是否有帮助。

+0

刚刚看到你的回复。恐怕我需要一点时间来消化它,才能让我的头脑完整!尽管如此,许多thx。 S –

+0

好的。认为我已经...很好。 Thx再次。 S –

+0

仅供参考...其他补充答案在这里... http://social.msdn.microsoft.com/Forums/en-US/rx/thread/52e3744e-cb18-4745-951f-51596a0f3b2d –

1

Enigmativity有一个很好的解决方案。

  var state = new object(); 

      cx.getItemsAsync(rq, state); 

这些陈述是关键,解决错误。

每次调用Async方法时,都需要传递一个唯一的对象(除非当然只有一个Async方法正在运行,这极不可能)。我在这个问题上摸索了好几个小时,直到我发现可以传递一个对象参数。如果没有参数,它会看到多个方法作为相同的调用,并给你“Item已添加。键入字典:'System.Object'添加的键:'System.Object'”消息。