2011-10-19 62 views
1

如何构造下面的代码,以便我可以读取块中的所有流。 目前它一直返回相同的数据。不能前进。从流中异步反复读取?

public static IObservable<byte[]> AsyncRead(this Stream stream, int bufferSize) 
{ 
    var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(stream.BeginRead, stream.EndRead); 

    var buffer = new byte[bufferSize]; 

    return asyncRead(buffer, 0, bufferSize) 
     .Select(cbRead => 
        { 
         var dataChunk = new byte[cbRead]; 

         Buffer.BlockCopy(buffer, 0, dataChunk, 0, cbRead); 

         return dataChunk; 
        }) 
     .Repeat() 
     .TakeWhile(dataChunk => dataChunk.Length > 0); 
} 

回答

1

陷入同样的​​陷阱。异步流的行为类似于可重放的行为。虽然它只包含那一个项目。为了解决这个问题,将其包装在Defer这样的

Observable.Defer(() => asyncRead(buffer, 0, bufferSize)) 
...