2013-01-22 16 views
5

我在这里挣扎。通常我会读一本书,但目前还没有。我发现使用RX读取流的各种事情的无数例子,但我很难找到我的头。什么是正确的方式来创建一个读取流到最后的Observable

我知道我可以使用Observable.FromAsyncPattern创建的流的的BeginRead/EndRead或BeginReadLine/EndReadLine方法的包装。

但是这只会读取一次 - 当第一个观察者订阅。

我想要一个Observable,它会继续读取并且抽取OnNext直到流错误或者结束。

除此之外,我还想知道如何将多个订阅者共享该可观察项目,以便他们都可以获取这些项目。

+3

伟大的文章,但大量过时。这引用了旧的API。本章*将采用这些概念并使其适用于更新版本的Rx。 * http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#CreatingYourOwnIterator –

+0

哦,从来不知道那个链接@LeeCampbell - 非常好! – JerKimball

+0

这个问题到底发生了什么?以前的答案在哪里? –

回答

1

的解决方案是使用Observable.Create

这里是可以者进行调整读取任何种类流的例子

public static IConnectableObservable<Command> GetReadObservable(this CommandReader reader) 
    { 

     return Observable.Create<Command>(async (subject, token) => 
     { 


      try 
      { 

       while (true) 
       { 

        if (token.IsCancellationRequested) 
        { 
         subject.OnCompleted(); 
         return; 
        } 

        //this part here can be changed to something like this 
        //int received = await Task.Factory.FromAsync<int>(innerSocket.BeginReceive(data, offset, size, SocketFlags.None, null, null), innerSocket.EndReceive); 

        Command cmd = await reader.ReadCommandAsync(); 

        subject.OnNext(cmd); 

       } 

      } 

      catch (Exception ex) 
      { 
       try 
       { 
        subject.OnError(ex); 
       } 
       catch (Exception) 
       { 
        Debug.WriteLine("An exception was thrown while trying to call OnError on the observable subject -- means you're not catching exceptions everywhere"); 
        throw; 
       } 
      } 

     }).Publish(); 

    } 

不要忘记调用Connect()的返回IConnectableObservable

3

添加到李某的回答,用rxx

using (new FileStream(@"filename.txt", FileMode.Open) 
     .ReadToEndObservable() 
     .Subscribe(x => Console.WriteLine(x.Length))) 
{ 
    Console.ReadKey(); 
} 

读缓冲区的长度将被输出。

+0

李的答案在哪里? –

+0

由于某种原因而消失 –

+0

他的回答被删除,因为它实际上并未提供问题的答案 - 只是一本[他]书的链接 –

1

嘿 - 要重用我在这里等一个答案(当然,它的一部分,反正):

编号:Reading from NetworkStream corrupts the buffer

在这方面,我有一个扩展方法是这样的:

public static class Ext 
{   
    public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize) 
    {   
     // to hold read data 
     var buffer = new byte[bufferSize]; 
     // Step 1: async signature => observable factory 
     var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
      stream.BeginRead, 
      stream.EndRead); 
     return Observable.While(
      // while there is data to be read 
      () => stream.CanRead, 
      // iteratively invoke the observable factory, which will 
      // "recreate" it such that it will start from the current 
      // stream position - hence "0" for offset 
      Observable.Defer(() => asyncRead(buffer, 0, bufferSize)) 
       .Select(readBytes => buffer.Take(readBytes).ToArray())); 
    } 
} 

你或许可以利用这个书面的形式,像这样:

// Note: ToEnumerable works here because your filestream 
// has a finite length - don't do this with infinite streams! 
var blobboData = stream 
    .ReadObservable(bufferSize) 
    // take while we're still reading data 
    .TakeWhile(returnBuffer => returnBuffer.Length > 0) 
    .ToEnumerable() 
    // mash them all together 
    .SelectMany(buffer => buffer) 
    .ToArray(); 
+0

您好JerKimball,感谢您的解答。我一直在使用它,然后很难处理错误条件。当你必须关闭连接时,你如何处理?我已经打开了问题http://stackoverflow.com/questions/32079292/reading-from-stream-using-observable-through-fromasyncpattern-how-to-close-canc关于这一点,如果你可以检查出来。 –

+0

Rxx似乎做得非常不同。任何人都可以详述吗? https://github.com/RxDave/Rxx/blob/master/Main/Source/Rxx.Bindings-Net451/System/IO/StreamExtensions.cs –

+0

这个实现有一个问题。不同的用户将获得流数据的不同份额!必须附加一个'.Publish()。Connect()'以确保所有订阅者获得所有数据。 –

1

你可以使用Repeat以保持读取行直到流结束,并且PublishReplay为了控制跨多个读取器的共享。

用于从任何流读取行,直到端部将是一个简单的,完全的Rx溶液的一个例子:

public static IObservable<string> ReadLines(Stream stream) 
{ 
    return Observable.Using(
     () => new StreamReader(stream), 
     reader => Observable.FromAsync(reader.ReadLineAsync) 
          .Repeat() 
          .TakeWhile(line => line != null)); 
} 

该解决方案还利用了以下事实的优点在于ReadLine返回null当流的末尾到达了。

相关问题