2011-11-22 41 views
13

是否有类任何.NET数据结构/组合允许字节数据附加到一个缓冲区的末尾,但所有偷窥和读取是从一开始,缩短了缓冲时我读?FIFO /队列缓冲器专业字节流

MemoryStream类似乎做了这个的一部分,但我需要维护单独的位置进行读取和写入,并且它不会在读取之后在开始时自动丢弃数据。

答案已发布在回复this question这基本上是我想要做的,但我更喜欢的东西我可以在同一过程的不同组件上做异步I/O,就像一个普通的管道甚至是网络流(我需要先过滤/处理数据)。

+1

在读取缓冲区内跳来跳去有什么问题吗? – Ryan

+0

只有我说的,不得不持续跟踪,而不是读的风格的NetworkStream,读,读等 – Deanna

+0

你需要读写不同大小的阵列? 'byte []'的队列不会对你有用吗? – svick

回答

10

我会后一些逻辑,我在工作中写了一个项目,一次的剥离出来的副本。这个版本的优点是它可以与缓冲数据的链表一起工作,因此在阅读时你不必缓存大量的内存和/或复制内存。此外,它的线程安全并且表现得像一个网络流,即:当没有可用数据时读取:等到有数据可用或超时。另外,当读取x个字节并且只有y个字节时,在读取所有字节后返回。我希望这有帮助!

public class SlidingStream : Stream 
{ 
    #region Other stream member implementations 

    ... 

    #endregion Other stream member implementations 

    public SlidingStream() 
    { 
     ReadTimeout = -1; 
    } 

    private readonly object _writeSyncRoot = new object(); 
    private readonly object _readSyncRoot = new object(); 
    private readonly LinkedList<ArraySegment<byte>> _pendingSegments = new LinkedList<ArraySegment<byte>>(); 
    private readonly ManualResetEventSlim _dataAvailableResetEvent = new ManualResetEventSlim(); 

    public int ReadTimeout { get; set; } 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     if (_dataAvailableResetEvent.Wait(ReadTimeout)) 
      throw new TimeoutException("No data available"); 

     lock (_readSyncRoot) 
     { 
      int currentCount = 0; 
      int currentOffset = 0; 

      while (currentCount != count) 
      { 
       ArraySegment<byte> segment = _pendingSegments.First.Value; 
       _pendingSegments.RemoveFirst(); 

       int index = segment.Offset; 
       for (; index < segment.Count; index++) 
       { 
        if (currentOffset < offset) 
        { 
         currentOffset++; 
        } 
        else 
        { 
         buffer[currentCount] = segment.Array[index]; 
         currentCount++; 
        } 
       } 

       if (currentCount == count) 
       { 
        if (index < segment.Offset + segment.Count) 
        { 
         _pendingSegments.AddFirst(new ArraySegment<byte>(segment.Array, index, segment.Offset + segment.Count - index)); 
        } 
       } 

       if (_pendingSegments.Count == 0) 
       { 
        _dataAvailableResetEvent.Reset(); 

        return currentCount; 
       } 
      } 

      return currentCount; 
     } 
    } 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     lock (_writeSyncRoot) 
     { 
      byte[] copy = new byte[count]; 
      Array.Copy(buffer, offset, copy, 0, count); 

      _pendingSegments.AddLast(new ArraySegment<byte>(copy)); 

      _dataAvailableResetEvent.Set(); 
     } 
    } 
} 
+1

看起来不错,而且是我一直在走的方式。我今晚会尝试。 – Deanna

+0

在我看来,如果您尝试在没有可用的情况下读取数据,则会崩溃。 – svick

+0

@svick - 绝对正确,它只是一个草稿,没有参数验证等。manualResetEvent就是出于那个唯一的原因,我忘记了在读取方法的开始处等待它。现在修复。谢谢你的提问 – Polity

1

该代码可以比接受的答案更简单。没有必要使用for循环。:

/// <summary> 
/// This class is a very fast and threadsafe FIFO buffer 
/// </summary> 
public class FastFifo 
{ 
    private List<Byte> mi_FifoData = new List<Byte>(); 

    /// <summary> 
    /// Get the count of bytes in the Fifo buffer 
    /// </summary> 
    public int Count 
    { 
     get 
     { 
      lock (mi_FifoData) 
      { 
       return mi_FifoData.Count; 
      } 
     } 
    } 

    /// <summary> 
    /// Clears the Fifo buffer 
    /// </summary> 
    public void Clear() 
    { 
     lock (mi_FifoData) 
     { 
      mi_FifoData.Clear(); 
     } 
    } 

    /// <summary> 
    /// Append data to the end of the fifo 
    /// </summary> 
    public void Push(Byte[] u8_Data) 
    { 
     lock (mi_FifoData) 
     { 
      // Internally the .NET framework uses Array.Copy() which is extremely fast 
      mi_FifoData.AddRange(u8_Data); 
     } 
    } 

    /// <summary> 
    /// Get data from the beginning of the fifo. 
    /// returns null if s32_Count bytes are not yet available. 
    /// </summary> 
    public Byte[] Pop(int s32_Count) 
    { 
     lock (mi_FifoData) 
     { 
      if (mi_FifoData.Count < s32_Count) 
       return null; 

      // Internally the .NET framework uses Array.Copy() which is extremely fast 
      Byte[] u8_PopData = new Byte[s32_Count]; 
      mi_FifoData.CopyTo(0, u8_PopData, 0, s32_Count); 
      mi_FifoData.RemoveRange(0, s32_Count); 
      return u8_PopData; 
     } 
    } 

    /// <summary> 
    /// Gets a byte without removing it from the Fifo buffer 
    /// returns -1 if the index is invalid 
    /// </summary> 
    public int PeekAt(int s32_Index) 
    { 
     lock (mi_FifoData) 
     { 
      if (s32_Index < 0 || s32_Index >= mi_FifoData.Count) 
       return -1; 

      return mi_FifoData[s32_Index]; 
     } 
    } 
} 
+0

这基本上与链接的问题相同,它不履行异步或阻止的愿望。不过谢谢。 – Deanna

+0

好的,但该代码不是那么优雅,它不是线程安全的。你可以用6行来完成,而不需要16行。 – Elmue