2012-10-05 97 views
0

我正在使用Reactive Extensions创建匿名Observable来执行Web请求,后处理它并返回一些值。这种技术帮助我将价值获取的机制与使用这些价值的其他类别分开。 Web请求经常执行,所以有一个计时器使用这种技术构造和调用请求。我建立Observable如下:Rx订阅()回调问题

Observable.FromAsyncPattern<WebResponse>(getRequest.BeginGetResponse, getRequest.EndGetResponse) 

实际上,端点并不总是可用的。对于在Subscribe()中指定的Web请求超时回调,永远不会调用方法。我正在使用Timeout() Rx扩展来处理这种情况。此外,我正在做请求结果的后处理。下面是一段我的仿真代码,我实现来解决这个问题:

var request = HttpWebRequest.Create("http://10.5.5.137/unreachable_endpoint"); 
var obs = Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)(); 
var disposable = obs.Select(res => res.ContentLength).Timeout(TimeSpan.FromSeconds(2)).Subscribe(...) 

然后我实现处置Subscribe()方法的返回值退订从可观察的回调委托。其实我有一个三元组的集合<IObservable obs, IDisposable disp, bool RequestComplete>Subscribe()设置方法回调RequestComplete = true并且随着时间的推移,如果不再需要所有一次性物品,即所有一次性物品都已被调用,即回调已被调用。一切似乎都很好,但我注意到这个系列不断增长,我不知道为什么。

所以我实现了这种情况的模拟。此代码没有布置部件 - 我用简单的请求计数器取代了它:

public static class RequestCounter 
{ 
    private static object syncRoot = new object(); 

    private static int count = 0; 

    public static void Increment() 
    { 
     lock (syncRoot) 
     { 
      count++; 
     } 
    } 

    public static void Decrement() 
    { 
     lock (syncRoot) 
     { 
      count--; 
     } 
    } 

    public static int RequestCount 
    { 
     get { return count; } 
    } 
} 

而这里的测试本身:

public void RunTest() 
    { 
     while (true) 
     { 
      var request = HttpWebRequest.Create("http://10.5.5.137/unreachable_endpoint"); 
      var obs = Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)(); 
      RequestCounter.Increment(); 
      var disposable = obs.Select(res => res.ContentLength).Timeout(TimeSpan.FromSeconds(2)).Subscribe(
       res => 
       { 
        RequestCounter.Decrement(); 
        Console.WriteLine("Pending requests count: {0}", RequestCounter.RequestCount); 
       }, 
       ex => 
       { 
        RequestCounter.Decrement(); 
        Console.WriteLine("Pending requests count: {0}", RequestCounter.RequestCount); 
       }, 
       () => 
       { 
       }); 
     } 
    } 

Web请求计数器被递增之前。请求超时后,它会减少。但请求柜台增长无限。你能解释我错过了什么吗?

+0

它证明了线程池没有自由线程来处理递减 – EvAlex

回答

1

对可观察序列的订阅是异步的。 while while循环会尽可能快地继续。增量将超过延迟的减量。

如果您使用的Rx 2.0与.NET 4.5,你可以使用伺机来运行你的循环,然后继续下面的迭代时,先前的请求完成:

while (true) 
{ 
    var obs = ...; 

    RequestCounter.Increment(); 

    await obs.Select(...).Timeout(...); 

    RequestCounter.Decrement(); 

    Console.WriteLine(...); 
} 

更大的问题是,为什么如果请求已完成,您是否尝试处理订阅?当序列达到终端状态(OnError,OnCompleted)时,Rx会自行清理,因此您可以简单地将IDisposable对象放在地板上,而不用担心这些。

+0

哦,我明白了,谢谢!它会自动取消订阅这些代表吗? – EvAlex

+0

是的,除非您想提前取消计算(即在序列到达终端状态之前),否则无需担心消费者。 –