我正在使用Reactive Extensions创建匿名Observable来执行Web请求,后处理它并返回一些值。这种技术帮助我将价值获取的机制与使用这些价值的其他类别分开。 Web请求经常执行,所以有一个计时器使用这种技术构造和调用请求。我建立Observable如下:Rx订阅()回调问题
Observable.FromAsyncPattern<WebResponse>(getRequest.BeginGetResponse, getRequest.EndGetResponse)
实际上,端点并不总是可用的。对于在Subscribe()
中指定的Web请求超时回调,永远不会调用方法。我正在使用Timeout()
Rx扩展来处理这种情况。此外,我正在做请求结果的后处理。下面是一段我的仿真代码,我实现来解决这个问题:
var request = HttpWebRequest.Create("http://10.5.5.137/unreachable_endpoint");
var obs = Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)();
var disposable = obs.Select(res => res.ContentLength).Timeout(TimeSpan.FromSeconds(2)).Subscribe(...)
然后我实现处置Subscribe()
方法的返回值退订从可观察的回调委托。其实我有一个三元组的集合<IObservable obs, IDisposable disp, bool RequestComplete>
。 Subscribe()
设置方法回调RequestComplete = true
并且随着时间的推移,如果不再需要所有一次性物品,即所有一次性物品都已被调用,即回调已被调用。一切似乎都很好,但我注意到这个系列不断增长,我不知道为什么。
所以我实现了这种情况的模拟。此代码没有布置部件 - 我用简单的请求计数器取代了它:
public static class RequestCounter
{
private static object syncRoot = new object();
private static int count = 0;
public static void Increment()
{
lock (syncRoot)
{
count++;
}
}
public static void Decrement()
{
lock (syncRoot)
{
count--;
}
}
public static int RequestCount
{
get { return count; }
}
}
而这里的测试本身:
public void RunTest()
{
while (true)
{
var request = HttpWebRequest.Create("http://10.5.5.137/unreachable_endpoint");
var obs = Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)();
RequestCounter.Increment();
var disposable = obs.Select(res => res.ContentLength).Timeout(TimeSpan.FromSeconds(2)).Subscribe(
res =>
{
RequestCounter.Decrement();
Console.WriteLine("Pending requests count: {0}", RequestCounter.RequestCount);
},
ex =>
{
RequestCounter.Decrement();
Console.WriteLine("Pending requests count: {0}", RequestCounter.RequestCount);
},
() =>
{
});
}
}
Web请求计数器被递增之前。请求超时后,它会减少。但请求柜台增长无限。你能解释我错过了什么吗?
它证明了线程池没有自由线程来处理递减 – EvAlex