2010-09-27 39 views
4

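Using Rx to simplify an asynchronous Silverlight web service request

I have written a simplified Silverlight client library for my WCF web service using Rx, but I have noticed that I sometimes miss the completed event.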

public IObservable<XElement> GetReport(string reportName) 
{ 
    return from client in Observable.Return(new WebServiceClient()) 
      from request in Observable.ToAsync<string>(client.GetReportDataAsync)(reportName) 
      from result in Observable.FromEvent<GetReportDataCompletedEventArgs>(client, "GetReportDataCompleted").Take(1) 
      from close in this.CloseClient(client) 
      select result.EventArgs.Result; 
} 

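I believe the problem is that the web service call is started, and can return, before the subscription to the completed event is in place. I can't figure out how to make Rx subscribe to the event before the asynchronous call is made. I tried StartWith, but that requires the input and output types to be the same. Any ideas?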
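The suspected ordering problem can be reproduced without Rx or WCF. The sketch below is only an illustration: `FakeReportClient`, `ReportCompletedEventArgs`, and `SubscriptionOrderDemo` are hypothetical stand-ins, and the fake client raises its event synchronously so that the outcome is deterministic, whereas a real proxy completes later and loses the event only intermittently.

```csharp
using System;

// Hypothetical stand-in for a generated service client; not a real WCF proxy.
public sealed class FakeReportClient
{
    public event EventHandler<ReportCompletedEventArgs> GetReportDataCompleted;

    public void GetReportDataAsync(string reportName)
    {
        // For a deterministic demo the event fires before this method returns.
        // A real client completes later, which makes the race intermittent.
        var handler = GetReportDataCompleted;
        if (handler != null)
        {
            handler(this, new ReportCompletedEventArgs("<report name=\"" + reportName + "\" />"));
        }
    }
}

// Hypothetical event args type carrying the result of the call.
public sealed class ReportCompletedEventArgs : EventArgs
{
    public ReportCompletedEventArgs(string result) { Result = result; }
    public string Result { get; private set; }
}

public static class SubscriptionOrderDemo
{
    // Start the call first, attach the handler second: the completion is missed.
    public static string CallThenSubscribe(string reportName)
    {
        var client = new FakeReportClient();
        string received = null;
        client.GetReportDataAsync(reportName);
        client.GetReportDataCompleted += (sender, e) => received = e.Result;
        return received;
    }

    // Attach the handler first, then start the call: the completion is observed.
    public static string SubscribeThenCall(string reportName)
    {
        var client = new FakeReportClient();
        string received = null;
        client.GetReportDataCompleted += (sender, e) => received = e.Result;
        client.GetReportDataAsync(reportName);
        return received;
    }
}
```

In this sketch `CallThenSubscribe` never sees the result, while `SubscribeThenCall` does; the only difference between them is the order of the two statements.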

Doesn't look very simplified to me? – AnthonyWJones 2010-09-27 12:24:08

Trust me, it is when you use it. Calling it is just service.GetReport("MyReport1").Subscribe(this.LoadResults); – 2010-09-27 23:55:26

Answers

7

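It seems the best answer is to use Observable.CreateWithDisposable(), which lets you subscribe to the completed event before starting the call.

For example: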

public IObservable<XElement> GetReport(string reportName) 
{ 
    return from client in Observable.Return(new WebServiceClient()) 
      from completed in Observable.CreateWithDisposable<GetReportDataCompletedEventArgs>(observer => 
       { 
        var subscription = Observable.FromEvent<GetReportDataCompletedEventArgs>(client, "GetReportDataCompleted") 
         .Take(1) 
         .Select(e => e.EventArgs) 
         .Subscribe(observer); 
        client.GetReportDataAsync(reportName); 
        return subscription; 
       }) 
      from close in this.CloseClient(client) 
      select completed.Result; 
} 

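To make this easier to work with, I refactored the CreateWithDisposable call into a common function that I can use for all my web service calls. It includes an overload that determines the event name automatically from the event args type: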

private IObservable<T> CallService<T>(ICommunicationObject serviceClient, Action start) where T : AsyncCompletedEventArgs 
{ 
    if (typeof(T) == typeof(AsyncCompletedEventArgs)) 
    { 
     throw new InvalidOperationException("Event arguments type cannot be used to determine event name, use event name overload instead."); 
    } 

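    // Derive the event name by removing the "EventArgs" suffix, e.g.
    // GetReportDataCompletedEventArgs -> GetReportDataCompleted.
    // string.TrimEnd takes a set of characters, not a suffix, so strip it explicitly.
    const string suffix = "EventArgs";
    string typeName = typeof(T).Name;
    string completedEventName = typeName.EndsWith(suffix, StringComparison.Ordinal)
        ? typeName.Substring(0, typeName.Length - suffix.Length)
        : typeName;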
    return CallService<T>(serviceClient, start, completedEventName); 
} 

private IObservable<T> CallService<T>(ICommunicationObject serviceClient, Action start, string completedEventName) where T : AsyncCompletedEventArgs 
{ 
    return Observable.CreateWithDisposable<T>(observer => 
    { 
     var subscription = Observable.FromEvent<T>(serviceClient, completedEventName).Take(1).Select(e => e.EventArgs).Subscribe(observer); 
     start(); 
     return subscription; 
    }); 
} 

// Example usage: 
public IObservable<XElement> GetReport(string reportName) 
{ 
    return from client in Observable.Return(new WebServiceClient()) 
      from completed in this.CallService<GetReportDataCompletedEventArgs>(client,() => client.GetReportDataAsync(reportName)) 
      from close in this.CloseClient(client) 
      select completed.Result; 
} 

/// <summary> 
/// Asynchronously closes the web service client 
/// </summary> 
/// <param name="client">The web service client to be closed.</param> 
/// <returns>Returns a cold observable sequence that yields the CloseCompleted event arguments once.</returns> 
private IObservable<AsyncCompletedEventArgs> CloseClient(WebServiceClient client) 
{ 
    return this.CallService<AsyncCompletedEventArgs>(client, client.CloseAsync, "CloseCompleted"); 
} 
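The naming convention that the first CallService overload relies on (the event args type name minus its "EventArgs" suffix) can be checked on its own. This is a minimal sketch, not part of the library above: `EventNameHelper`, `RemoveSuffix`, and `CompletedEventNameFor` are hypothetical names, and the `GetReportDataCompletedEventArgs` class here is a local stand-in for the generated proxy type.

```csharp
using System;
using System.ComponentModel;

public static class EventNameHelper
{
    // Removes 'suffix' from the end of 'value' if present; otherwise returns 'value' unchanged.
    public static string RemoveSuffix(string value, string suffix)
    {
        return value.EndsWith(suffix, StringComparison.Ordinal)
            ? value.Substring(0, value.Length - suffix.Length)
            : value;
    }

    // Maps an event args type to the assumed event name,
    // e.g. GetReportDataCompletedEventArgs -> "GetReportDataCompleted".
    public static string CompletedEventNameFor<T>() where T : AsyncCompletedEventArgs
    {
        if (typeof(T) == typeof(AsyncCompletedEventArgs))
        {
            // The base type carries no operation name, so nothing can be derived from it.
            throw new InvalidOperationException("Use the overload that takes an event name.");
        }

        return RemoveSuffix(typeof(T).Name, "EventArgs");
    }
}

// Local stand-in for the proxy-generated type, declared only so the sketch compiles.
public class GetReportDataCompletedEventArgs : AsyncCompletedEventArgs
{
    public GetReportDataCompletedEventArgs(Exception error, bool cancelled, object userState)
        : base(error, cancelled, userState)
    {
    }
}
```

Removing the suffix as a whole string, rather than trimming individual characters, keeps names that happen to end in letters from "EventArgs" intact.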

Hope this helps someone!

Is there any benefit to creating and closing the client every time? Also, do you have an example of the CloseClient method? – Jordan 2010-11-12 20:25:40

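I close and reopen the client each time to make sure there are no threading issues. The underlying channel is cached by WCF anyway, so doing this costs almost nothing. Sorry, I forgot to include CloseClient; I'll add it now. – 2010-11-15 01:21:17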

1

I needed to use the general-purpose WebClient.DownloadStringAsync, so here is my version.

First, wrap the event:

public static IObservable<IEvent<DownloadStringCompletedEventArgs>> 
    GetDownloadStringObservableEvent(this WebClient wc) 
{ 
    return Observable.FromEvent<DownloadStringCompletedEventArgs>(
     wc, "DownloadStringCompleted"); 
} 

Then create the extension method:

public static IObservable<string> GetDownloadString(this WebClient wc, Uri uri) 
{ 
    return Observable.CreateWithDisposable<string>(
     observer => { 
      // Several downloads may be going on simultaneously. The token allows 
      // us to establish that we're retrieving the right one. 
      Guid token = Guid.NewGuid(); 
      var stringDownloaded = wc.GetDownloadStringObservableEvent() 
        .Where(evt => ((Guid)evt.EventArgs.UserState) == token) 
        .Take(1);  //implicitly unhooks handler after event is received 
      bool errorOccurred = false; 
      IDisposable unsubscribe = 
       stringDownloaded.Subscribe(
        // OnNext action 
        ev => { 
         // Propagate the exception if one is reported. 
         if (ev.EventArgs.Error != null) { 
          errorOccurred = true; 
          observer.OnError(ev.EventArgs.Error); 
         } else if (!ev.EventArgs.Cancelled) { 
          observer.OnNext(ev.EventArgs.Result); 
         } 
        }, 
        // OnError action (propagate exception) 
        ex => observer.OnError(ex), 
        // OnCompleted action 
        () => { 
         if (!errorOccurred) { 
          observer.OnCompleted(); 
         } 
        }); 
      try { 
       wc.DownloadStringAsync(uri, token); 
      } catch (Exception ex) { 
       observer.OnError(ex); 
      } 
      return unsubscribe; 
     } 
    ); 
} 

Usage is simple:

wc.GetDownloadString(new Uri("http://myservice")) 
    .Subscribe(resultCallback , errorCallback);
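The token check in GetDownloadString can be illustrated without Rx or a network. The sketch below uses plain events and hypothetical types (`FakeDownloader`, `FakeCompletedEventArgs`, `TokenCorrelationDemo`) rather than WebClient; the fake completes queued requests in reverse order to imitate responses arriving out of order. Each caller subscribes before starting its request, ignores completions that carry another request's token, and unhooks itself after the first match, which is the role Where(...).Take(1) plays above.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical completion args carrying the caller-supplied token, like UserState on WebClient events.
public sealed class FakeCompletedEventArgs : EventArgs
{
    public FakeCompletedEventArgs(string result, object userState)
    {
        Result = result;
        UserState = userState;
    }

    public string Result { get; private set; }
    public object UserState { get; private set; }
}

// Hypothetical downloader that queues requests and completes them on demand.
public sealed class FakeDownloader
{
    private readonly List<KeyValuePair<string, object>> pending =
        new List<KeyValuePair<string, object>>();

    public event EventHandler<FakeCompletedEventArgs> DownloadCompleted;

    public void DownloadAsync(string url, object userState)
    {
        pending.Add(new KeyValuePair<string, object>(url, userState));
    }

    // Completes queued requests in reverse order to mimic out-of-order responses.
    public void CompleteAll()
    {
        for (int i = pending.Count - 1; i >= 0; i--)
        {
            var handler = DownloadCompleted;
            if (handler != null)
            {
                handler(this, new FakeCompletedEventArgs("body of " + pending[i].Key, pending[i].Value));
            }
        }

        pending.Clear();
    }
}

public static class TokenCorrelationDemo
{
    public static void Download(FakeDownloader downloader, string url, Action<string> onResult)
    {
        Guid token = Guid.NewGuid();
        EventHandler<FakeCompletedEventArgs> handler = null;
        handler = (sender, e) =>
        {
            if (!token.Equals(e.UserState)) return;  // completion of a different request
            downloader.DownloadCompleted -= handler; // unhook after the first match
            onResult(e.Result);
        };

        downloader.DownloadCompleted += handler;     // subscribe before starting the request
        downloader.DownloadAsync(url, token);
    }
}
```

Every handler attached to the shared event sees every completion, so without the token comparison the first completion to arrive would be delivered to all waiting callers.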