2015-04-24 35 views
1

我正在从Rx向网络写入数据。当订阅结束时,我自然使用Finally来关闭我的流。这在OnError()OnComplete()上都干净地工作。 Rx将依次运行OnNext() ... OnNext(),OnComplete(),Finally()当订阅在无效扩展中结束时关闭非托管资源

但是,有时我想尽早终止序列,这样做我使用Dispose()。不知怎的,Finally()现在与最后的OnNext()调用并行运行,导致在仍写入OnNext()中的流时出现异常,以及写入不完整。

我的订阅看起来大致是这样的:

NetworkStream stm = client.GetStream(); 
IDisposable disp = obs 
    .Finally(() => { 
     client.Close(); 
    }) 
    .Subscribe(d => { 
     client.GetStream().Write(d.a, 0, d.a.Lenght); 
     client.GetStream().Write(d.b, 0, d.b.Lenght); 
    }() => { 
     client.GetStream().Write(something(), 0, 1); 
    }); 
Thread.sleep(1000); 
disp.Dispose(); 

我也试过替代,CancellationToken

如何正确取消我的订阅?我不介意它是否跳过OnComplete(),只要Finally()仍在运行。但是,并行运行Finally()存在问题。

我也有这样的感觉,应该有更好的方法来管理资源,将声明移到序列中,这将是更好的解决方案。

编辑:下面的代码更清楚地显示问题。我希望它总能打印真实,相反,它会提供更多的错误信息,指示Dispose在最后的OnNext之前结束。

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Net.Sockets; 
using System.Reactive; 
using System.Reactive.Disposables; 
using System.Reactive.Linq; 
using System.Text; 
using System.Threading; 
using System.Threading.Tasks; 

namespace ConsoleApplication1 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      Console.WriteLine("Try finally"); 
      for (int i = 0; i < 10; i++) 
      { 
       Finally(); 
      } 
      Console.WriteLine("Try using"); 
      for (int i = 0; i < 10; i++) 
      { 
       Using(); 
      } 
      Console.WriteLine("Try using2"); 
      for (int i = 0; i < 10; i++) 
      { 
       Using2(); 
      } 
      Console.ReadKey(); 
     } 

     private static void Using2() 
     { 
      bool b = true, c = true, d; 
      var dis = Disposable.Create(() => c = b); 
      IDisposable obDis = Observable.Using(
       () => dis, 
       _ => Observable.Create<Unit>(obs=> 
        Observable.Generate(0, 
        i => i < 1000, 
        i => i + 1, 
        i => i, 
        i => TimeSpan.FromMilliseconds(1) 
       ).Subscribe(__ => { b = false; Thread.Sleep(100); b = true; }))) 
       .Subscribe(); 
      Thread.Sleep(15); 
      obDis.Dispose(); 
      d = b; 
      Thread.Sleep(101); 
      Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d); 
     } 

     private static void Using() 
     { 
      bool b = true, c = true, d; 
      var dis = Disposable.Create(() => c = b); 
      IDisposable obDis = Observable.Using(
       () => dis, 
       _ => Observable.Generate(0, 
        i => i < 1000, 
        i => i + 1, 
        i => i, 
        i => TimeSpan.FromMilliseconds(1) 
       )).Subscribe(_ => { b = false; Thread.Sleep(100); b = true; }); 
      Thread.Sleep(15); 
      obDis.Dispose(); 
      d = b; 
      Thread.Sleep(101); 
      Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d); 
     } 

     private static void Finally() 
     { 
      bool b = true, c = true, d; 
      IDisposable obDis = Observable.Generate(0, 
       i => i < 1000, 
       i => i + 1, 
       i => i, 
       _ => DateTime.Now.AddMilliseconds(1) 
       ) 
       .Finally(() => c = b) 
       .Subscribe(_ => { b = false; Thread.Sleep(100); b = true; }); 
      Thread.Sleep(15); 
      obDis.Dispose(); 
      d = b; 
      Thread.Sleep(101); 
      Console.WriteLine("OnDispose: {1,5} After: {2,5} Sleep: {0,5}", b, c, d); 
     } 
    } 
} 

回答

5

Finally很可能不是你想要的。当您取消订阅时,它不会处理您的资源。相反,它的行为将类似于C#中的正常finally块,也就是说,它将保证执行某些代码,而不管其对应的try -block中的代码是否引发异常。此外,由于this question on MSDN,您的Finally中的代码甚至可能不会在任何情况下执行,因为您的预订未指定错误处理程序。

你可能想要的是Using

IDisposable disp = Observable 
    .Using(
     () => Disposable.Create(() => client.Close), 
     _ => obs) 
    .Subscribe(....); 

Using照顾,每当观察到终止或已取消订阅的资源妥善处理。

假设client是TcpClient的,它变得更简单:

IDisposable disp = Observable 
    .Using(
     () => client), 
     _ => obs) 
    .Subscribe(....); 

我期望这OnNext通话不会关闭客户端,甚至早unsubsribing时重叠,但我没有测试过这一点。

最后一件事:在您的示例中,要小心关闭外部变量,如stm。始终与当地人合作更安全。完全重写,因为我想尝试它是这样的:

IDisposable disp = Observable.Using(
    () => client, 
    _ => Observable.Using(
     () => client.GetStream(), 
     stream => Observable.Create<Unit>(observer => obs 
      .Subscribe(
       d => { 
        stream.Write(d.a, 0, d.a.Lenght); 
        stream.Write(d.b, 0, d.b.Lenght); 
       }, 
       () => { 
        stream.Write(something(), 0, 1); 
       })))) 
    .Subscribe(); 
+0

其实'Finally'运行时会出现错误,我认为这个错误自2012年以来一直保持不变。无论如何,重置您的答案看起来很有希望,我有'Observable.Using'应该适合,但我无法确定现在还没有。 'client'确实是一个'TcpClient',所以我要放弃它。 – Dorus

+0

是'使用'是要走的路。 – Brandon

+0

顺便说一句,你的最后一个例子不会编译。第二行有一个括号错误,并且我无法修复'Observable.Create'错误。 – Dorus

0

我认为你已经只是对如何NetworkStream操作的不正确的假设。

NetworkStream.WriteTcpClient.Close不一定等待客户端读取数据。 (此外,NetworkStream.Flush不做任何事情)。

当您拨打Close时,您可能在客户端读取所有内容之前关闭套接字。

看一看此相关的问题:NetworkStream doesn't always send data unless I Thread.Sleep() before closing the NetworkStream. What am I doing wrong?

该网页不同提到的东西喜欢使用的Close接受超时超载,或指定一个LingerOption - 但最好是要么发送Shutdown或具有更高在客户端通过自己的回复来确认您的消息时进行高级消息传递抽象。

相关问题