2015-10-18 99 views
1

首先我应该提及,我没有内部的Stream对象可用。取而代之的是,我有这个对象:实现自定义流

public interface IChannel 
{ 
    void Send(byte[] data); 
    event EventHandler<byte[]> Receive; 
} 

我想实现一个流类,一个是这样的:

public class ChannelStream : Stream 
{ 
    private readonly IChannel _channel; 

    public ChannelStream(IChannel channel) 
    { 
     this._channel = channel; 
    } 

    // TODO: Implement Stream class 
} 

我需要的功能非常相似,NetworkStream
写入字节到我的流应该将这些字节添加到缓冲区并调用_channel.Send一次Flush()被称为。
流还将监听_channel.Receive事件并将字节添加到另一个内部缓冲区,直到从流中读取它们。如果流没有可用的数据,它应该阻塞直到新数据可用。

但是,我正在努力实施。我在内部使用两个MemoryStream进行了实验,但这导致缓冲区继续吃越来越多的内存。

我可以用什么样的收集/流来实现我的流?

+1

'BlockingCollection'可能是一个有用的技巧。 – SimpleVar

回答

2

考虑你需要什么从收集和去那里。

下面是当你需要某种形式的集合,你应该考虑几个问题:

  1. 你需要在集合中的项随机访问?

  2. 集合是否将被多个线程访问?

  3. 读取后需要保留集合中的数据吗?

  4. 排序重要吗?如果是这样,什么顺序 - 添加订单,反向添加订单,通过一些比较项目排序?

对于在这种情况下,答案是否定的,是的,没有,是的输出缓冲器:添加顺序。这几乎挑出了ConcurrentQueue类。这允许您添加一个或多个源代码中的对象,这些对象不需要与正在读取它们的代码位于相同的线程中。它不会让你任意索引收集(好吧,不是直接),你不需要。

我会使用相同类型的输入缓冲区,以“当前块”缓冲区来保存最近读缓冲区,包裹在一些简单的对象锁定机制,以处理任何线程的问题。

输出部分看起来是这样的:

// Output buffer 
private readonly ConcurrentQueue<byte[]> _outputBuffer = new ConcurrentQueue<byte[]>(); 

public override void Write(byte[] buffer, int offset, int count) 
{ 
    // Copy written data to new buffer and add to output queue 
    byte[] data = new byte[count]; 
    Buffer.BlockCopy(buffer, offset, data, 0, count); 
    _outputBuffer.Enqueue(data); 
} 

public override void Flush() 
{ 
    // pull everything out of the queue and send to wherever it is going 
    byte[] curr; 
    while (_outputBuffer.TryDequeue(out curr)) 
     internalSendData(curr); 
} 

internalSendData方法是数据会再出去到网络。

读缓冲是稍微复杂一些:

// collection to hold unread input data 
private readonly ConcurrentQueue<byte[]> _inputBuffer = new ConcurrentQueue<byte[]>(); 
// current data block being read from 
private byte[] _inputCurrent = null; 
// read offset in current block 
private short _inputPos = 0; 
// object for locking access to the above. 
private readonly object _inputLock = new object(); 

public override int Read(byte[] buffer, int offset, int count) 
{ 
    int readCount = 0; 
    lock(_inputLock) 
    { 
     while (count > 0) 
     { 
      if (_inputCurrent == null || _inputCurrent.Length <= _inputPos) 
      { 
       // read next block from input buffer 
       if (!_inputBuffer.TryDequeue(out _inputCurrent)) 
        break; 

       _inputPos = 0; 
      } 

      // copy bytes to destination 
      int nBytes = Math.Min(count, _inputCurrent.Length - _inputPos); 
      Buffer.BlockCopy(_inputCurrent, _inputPos, buffer, offset, nBytes); 

      // adjust all the offsets and counters 
      readCount += nBytes; 
      offset += nBytes; 
      count -= nBytes; 
      _inputPos += (short)nBytes; 
     } 
    } 
    return readCount; 
} 

希望这是有道理的。

使用这种软缓冲的队列意味着数据只在内存中保存,只要它们被延迟发送或读取。一旦你调用Flush输出缓冲区的内存被释放以进行垃圾收集,所以你不必担心内存爆炸,除非你试图发送比实际传输机制更快的处理速度。但是,如果你每秒钟通过ADSL连接排队数兆字节的数据,没有什么可以为你节省:P

我想在上面添加一些改进,就像一些检查以确保一旦缓冲区处于合理的水平,即可自动调用Flush