嘿 - 要重用我在这里等一个答案(当然,它的一部分,反正):
编号:Reading from NetworkStream corrupts the buffer
在这方面,我有一个扩展方法是这样的:
public static class Ext
{
public static IObservable<byte[]> ReadObservable(this Stream stream, int bufferSize)
{
// to hold read data
var buffer = new byte[bufferSize];
// Step 1: async signature => observable factory
var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
stream.BeginRead,
stream.EndRead);
return Observable.While(
// while there is data to be read
() => stream.CanRead,
// iteratively invoke the observable factory, which will
// "recreate" it such that it will start from the current
// stream position - hence "0" for offset
Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
.Select(readBytes => buffer.Take(readBytes).ToArray()));
}
}
你或许可以利用这个书面的形式,像这样:
// Note: ToEnumerable works here because your filestream
// has a finite length - don't do this with infinite streams!
var blobboData = stream
.ReadObservable(bufferSize)
// take while we're still reading data
.TakeWhile(returnBuffer => returnBuffer.Length > 0)
.ToEnumerable()
// mash them all together
.SelectMany(buffer => buffer)
.ToArray();
伟大的文章,但大量过时。这引用了旧的API。本章*将采用这些概念并使其适用于更新版本的Rx。 * http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#CreatingYourOwnIterator –
哦,从来不知道那个链接@LeeCampbell - 非常好! – JerKimball
这个问题到底发生了什么?以前的答案在哪里? –