我正在从Rx向网络写入数据。当订阅结束时,我自然使用Finally
来关闭我的流。这在OnError()
和OnComplete()
上都干净地工作。 Rx将依次运行OnNext()
... OnNext()
,OnComplete()
,Finally()
。当订阅在无效扩展中结束时关闭非托管资源
但是,有时我想尽早终止序列,这样做我使用Dispose()
。不知怎的,Finally()
现在与最后的OnNext()
调用并行运行,导致在仍写入OnNext()
中的流时出现异常,以及写入不完整。
我的订阅看起来大致是这样的:
NetworkStream stm = client.GetStream();
IDisposable disp = obs
.Finally(() => {
client.Close();
})
.Subscribe(d => {
client.GetStream().Write(d.a, 0, d.a.Lenght);
client.GetStream().Write(d.b, 0, d.b.Lenght);
}() => {
client.GetStream().Write(something(), 0, 1);
});
Thread.sleep(1000);
disp.Dispose();
我也试过替代,CancellationToken
。
如何正确取消我的订阅?我不介意它是否跳过OnComplete()
,只要Finally()
仍在运行。但是,并行运行Finally()
存在问题。
我也有这样的感觉,应该有更好的方法来管理资源,将声明移到序列中,这将是更好的解决方案。
编辑:下面的代码更清楚地显示问题。我希望它总能打印真实,相反,它会提供更多的错误信息,指示Dispose在最后的OnNext
之前结束。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Sockets;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication1
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Try finally");
for (int i = 0; i < 10; i++)
{
Finally();
}
Console.WriteLine("Try using");
for (int i = 0; i < 10; i++)
{
Using();
}
Console.WriteLine("Try using2");
for (int i = 0; i < 10; i++)
{
Using2();
}
Console.ReadKey();
}
private static void Using2()
{
bool b = true, c = true, d;
var dis = Disposable.Create(() => c = b);
IDisposable obDis = Observable.Using(
() => dis,
_ => Observable.Create<Unit>(obs=>
Observable.Generate(0,
i => i < 1000,
i => i + 1,
i => i,
i => TimeSpan.FromMilliseconds(1)
).Subscribe(__ => { b = false; Thread.Sleep(100); b = true; })))
.Subscribe();
Thread.Sleep(15);
obDis.Dispose();
d = b;
Thread.Sleep(101);
Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d);
}
private static void Using()
{
bool b = true, c = true, d;
var dis = Disposable.Create(() => c = b);
IDisposable obDis = Observable.Using(
() => dis,
_ => Observable.Generate(0,
i => i < 1000,
i => i + 1,
i => i,
i => TimeSpan.FromMilliseconds(1)
)).Subscribe(_ => { b = false; Thread.Sleep(100); b = true; });
Thread.Sleep(15);
obDis.Dispose();
d = b;
Thread.Sleep(101);
Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d);
}
private static void Finally()
{
bool b = true, c = true, d;
IDisposable obDis = Observable.Generate(0,
i => i < 1000,
i => i + 1,
i => i,
_ => DateTime.Now.AddMilliseconds(1)
)
.Finally(() => c = b)
.Subscribe(_ => { b = false; Thread.Sleep(100); b = true; });
Thread.Sleep(15);
obDis.Dispose();
d = b;
Thread.Sleep(101);
Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d);
}
}
}
其实'Finally'运行时会出现错误,我认为这个错误自2012年以来一直保持不变。无论如何,重置您的答案看起来很有希望,我有'Observable.Using'应该适合,但我无法确定现在还没有。 'client'确实是一个'TcpClient',所以我要放弃它。 – Dorus
是'使用'是要走的路。 – Brandon
顺便说一句,你的最后一个例子不会编译。第二行有一个括号错误,并且我无法修复'Observable.Create'错误。 – Dorus