2010-12-22 49 views
2

如果您在使用RX如呼叫的长链:无扩展清理

var responses = collectionOfHttpRequests.ToObservable() 
.FromAsyncPattern(req.BeginGetResponse, req.EndGetResponse) 
.Select(res => res.GetResponseBodyString()) // Extension method to get the body of the request 
.Subscribe(); 

,然后在操作完成,你叫处置之前,将HTTP请求被取消,关闭和处置正确或我必须以某种方式从方法链中选择httprequests并分别处理它们?

我有一件事情,一个人可以同时发生多个http请求,我需要能够取消(而不是忽略)某些/所有这些以节省网络流量。

+1

你的代码似乎不正确:在`IObservable <>`上没有可调用的FromAsyncPattern扩展方法。该方法只是静态的。不扩展。如果你花时间想出真正的代码,问题和解决方案将变得明显。 – 2010-12-24 04:47:20

回答

2

当序列完成,错误或订阅处置时,Rx运算符链将自行清理。但是,每个操作员只会清理他们预计要清理的内容。例如,FromEvent将取消订阅该事件。

对于您的情况,Begin/End asynchronous pattern不支持取消,因此Rx没有任何取消操作。但是,您可以使用Finally致电HttpWebRequest.Abort

var observableRequests = collectionOfHttpRequests.ToObservable(); 

var responses = observableRequests 
    .SelectMany(req => 
     Observable.FromAsyncPattern(req.BeginGetResponse, req.EndGetResponse)() 
    ) 
    .Select(resp => resp.GetResponseBodyString()) 
    .Finally(() => 
    { 
     observableRequests 
      .Subscribe(req => req.Abort()); 
    }) 
    .Subscribe(); 
2

我不能承认Richard Szalay的解决方案是可以接受的。如果您启动100个请求,并且第二个请求由于服务器不可用错误而失败(例如),则剩余的98个请求将中止。第二个问题是UI如何对这种可观察性做出反应?太伤心。

请牢记章节4.3的Rx Design Guidelines我希望通过Observable.Using()运算符表示WebRequest observable。但WebRequest不是一次性的!所以我定义DisposableWebRequest:

public class DisposableWebRequest : WebRequest, IDisposable 
{ 
    private static int _Counter = 0; 

    private readonly WebRequest _request; 
    private readonly int _index; 

    private volatile bool _disposed = false; 

    public DisposableWebRequest (string url) 
    { 
     this._request = WebRequest.Create(url); 
     this._index = ++DisposableWebRequest._Counter; 
    } 

    public override IAsyncResult BeginGetResponse(AsyncCallback callback, object state) 
    { 
     return this._request.BeginGetResponse(callback, state); 
    } 

    public override WebResponse EndGetResponse(IAsyncResult asyncResult) 
    { 
     Trace.WriteLine(string.Format("EndGetResponse index {0} in thread {1}", this._index, Thread.CurrentThread.ManagedThreadId)); 
     Trace.Flush(); 
     if (!this._disposed) 
     { 
      return this._request.EndGetResponse(asyncResult); 
     } 
     else 
     { 
      return null; 
     } 
    } 

    public override WebResponse GetResponse() 
    { 
     return this._request.GetResponse(); 
    } 

    public override void Abort() 
    { 
     this._request.Abort(); 
    } 

    public void Dispose() 
    { 
     if(!this._disposed) 
     { 
      this._disposed = true; 

      Trace.WriteLine(string.Format("Disposed index {0} in thread {1}", this._index, Thread.CurrentThread.ManagedThreadId)); 
      Trace.Flush(); 
      this.Abort(); 
     } 
    } 
} 

然后,我创建WPF窗口,并把两个按钮就可以了(启动和停止)。

因此,让我们创建合适的请求可观察集合。 首先,定义URL的可观察到的创建功能:

 Func<IObservable<string>> createUrlObservable =() => 
      Observable 
       .Return("http://yahoo.com") 
       .Repeat(100) 
       .OnStartup(() => 
       { 
        this._failed = 0; 
        this._successed = 0; 
       }); 

在每一个URL,我们应该创建WebRequest的obervable,所以:

 Func<string, IObservable<WebResponse>> createRequestObservable = 
      url => 
      Observable.Using(
       () => new DisposableWebRequest("http://yahoo.com"), 
       r => 
       { 
        Trace.WriteLine("Queued " + url); 
        Trace.Flush(); 
        return Observable 
         .FromAsyncPattern<WebResponse>(r.BeginGetResponse, r.EndGetResponse)(); 
       }); 

另外定义两个事件可观察其反应上的按钮“开始” /”停止”点击:

 var startMouseDown = Observable.FromEvent<RoutedEventArgs>(this.StartButton, "Click"); 
     var stopMouseDown = Observable.FromEvent<RoutedEventArgs>(this.StopButton, "Click"); 

所以砖是准备好了,时间到撰写他们(我这样做是鉴于构造刚过的InitializeComponent()):

 startMouseDown 
      .SelectMany(down => 
       createUrlObservable() 
        .SelectMany(url => createRequestObservable(url) 
         .TakeUntil(stopMouseDown) 
         .Select(r => r.GetResponseStream()) 
         .Do(s => 
          { 
           using (var sr = new StreamReader(s)) 
           { 
            Trace.WriteLine(sr.ReadLine()); 
            Trace.Flush(); 
           } 

          }) 
         .Do(r => this._successed++) 
         .HandleError(e => this._failed++)) 
         .ObserveOnDispatcher() 
         .Do(_ => this.RefresLabels(), 
          e => { }, 
          () => this.RefresLabels()) 

         ) 
      .Subscribe(); 

您可能想知道函数“HandleError()”。如果在EndGetResponse()调用中发生异常(我关闭了网络连接来重现它),并且在请求observable中没有捕获 - 它会使startMouseDown observable崩溃。的HandleError捕获异常默默地,perfom提供行动和替代呼叫的OnError明年观测它调用OnCompleted:

public static class ObservableExtensions 
{ 
    public static IObservable<TSource> HandleError<TSource>(this IObservable<TSource> source, Action<Exception> errorHandler) 
    { 
     return Observable.CreateWithDisposable<TSource>(observer => 
      { 
       return source.Subscribe(observer.OnNext, 
        e => 
        { 
         errorHandler(e); 
         //observer.OnError(e); 
         observer.OnCompleted(); 
        }, 
        observer.OnCompleted); 
      }); 
    } 
} 

最后莫名其妙的地方是方法RefreshLabels,这将更新UI控件:

private void RefresLabels() 
    { 
     this.SuccessedLabel.Content = string.Format("Successed {0}", this._successed); 
     this.FailedLabel.Content = string.Format("Failed {0}", this._failed); 
    }