2014-03-06 160 views
2

我在TcpClient的顶部使用了SslStream。不幸的是`SslStream``不支持同时写入或读取多个线程。这就是为什么我写的周围是我自己的包装:线程安全缓冲区封装流

private ConcurrentQueue<byte> sendQueue; 

private volatile bool oSending; 

private readonly object writeLock; 


public async void Write(byte[] buffer, int offset, int count) 
{ 
    if (oSending) 
    { 
    lock (writeLock) 
    { 
     foreach (var b in buffer) 
     { 
     sendQueue.Enqueue(b); 
     } 
    } 
    } 
    else 
    { 
    oSending = true; 
    await stream.WriteAsync(buffer, offset, count); 
    oSending = false; 

    lock (writeLock) 
    { 
     if (sendQueue.Count > 0) 
     { 
     Write(sendQueue.ToArray(), 0, sendQueue.Count); 
     sendQueue = new ConcurrentQueue<byte>(); 
     } 
    } 
    } 
} 

背后的意图是:

  1. 如果流是免费的,立即写入流。
  2. 如果流正忙,请写入缓冲区。
  3. 如果流从发送返回,请检查队列中是否有数据并递归发送。

到目前为止,我已经尝试了几种解决方案,但似乎每次发送的数据太多。

P.S .:我知道按字节顺序填充队列并不好,但这只是快速和肮脏。

更新:我已根据Dirk的评论添加了队列的删除。

+3

您是否曾经从队列中移除项目? – Dirk

回答

1

更新

使用TPL Dataflow

using System.Threading.Tasks.Dataflow; 

public class DataflowStreamWriter 
{ 
    private readonly MemoryStream _stream = new MemoryStream(); 
    private readonly ActionBlock<byte[]> _block; 

    public DataflowStreamWriter() 
    { 
     _block = new ActionBlock<byte[]>(
         bytes => _stream.Write(bytes, 0, bytes.Length)); 
    } 

    public void Write(byte[] data) 
    { 
     _block.Post(data); 
    } 
} 

这里有一个更好的生产者 - 消费者的做法。

每当有人向您的ConcurrentStreamWriter实例写入数据时,该数据将被添加到缓冲区中。此方法是线程安全的,并且多个线程可能一次写入数据。这些是您的生产商

然后,您有一个单一的消费者 - 将数据从缓冲区中消耗并写入流中。

A BlockingCollection<T>用于生产者和消费者之间的沟通。这样,如果没有人生产,消费者就会闲置。每当生产者开始写入和写入缓冲区时,消费者都会醒来。

消费者被懒惰地初始化 - 只有当某些数据第一次可用时才会创建它。

public class ConcurrentStreamWriter : IDisposable 
{ 
    private readonly MemoryStream _stream = new MemoryStream(); 
    private readonly BlockingCollection<byte> _buffer = new BlockingCollection<byte>(new ConcurrentQueue<byte>()); 

    private readonly object _writeBufferLock = new object(); 
    private Task _flusher; 
    private volatile bool _disposed; 

    private void FlushBuffer() 
    { 
     //keep writing to the stream, and block when the buffer is empty 
     while (!_disposed) 
      _stream.WriteByte(_buffer.Take()); 

     //when this instance has been disposed, flush any residue left in the ConcurrentStreamWriter and exit 
     byte b; 
     while (_buffer.TryTake(out b)) 
      _stream.WriteByte(b); 
    } 

    public void Write(byte[] data) 
    { 
     if (_disposed) 
      throw new ObjectDisposedException("ConcurrentStreamWriter"); 

     lock (_writeBufferLock) 
      foreach (var b in data) 
       _buffer.Add(b); 

     InitFlusher(); 
    } 

    public void InitFlusher() 
    { 
     //safely create a new flusher task if one hasn't been created yet 
     if (_flusher == null) 
     { 
      Task newFlusher = new Task(FlushBuffer); 
      if (Interlocked.CompareExchange(ref _flusher, newFlusher, null) == null) 
       newFlusher.Start(); 
     } 
    } 

    public void Dispose() 
    { 
     _disposed = true; 
     if (_flusher != null) 
      _flusher.Wait(); 

     _buffer.Dispose(); 
    } 
} 
+0

感谢您的支持!基于TPL Dataflow的解决方案效果很好。这样做会更好吗?_block = new ActionBlock (async bytes => await stream.WriteAsync(bytes,0,bytes.Length));我想避免为每一个打开的连接阻塞一个线程。 – sqeez3r

+0

@ sqeez3r是的,它会:) – dcastro

1
  1. 你锁定访问ConcurrentQueue<T> - 你不需要的是,队列是线程安全已经
  2. if(oSending) {} else {oSending = true}不是线程安全的。两个线程可能会将oSending读为false,输入else块并将其设置为true。现在你有两个线程写入流。
  3. 正如Dirk指出的那样,您并未从队列中移除项目。

我的修改:

  1. 而不是使用一个布尔标志的,使用Monitor.TryEnter而尝试访问流。如果流正在写入,则该调用将立即返回 - 并继续写入缓冲区。

  2. 执行IDisposable并确保Dispose刷新缓冲区。

  3. 写它的时候,从async void保持字节顺序
  4. 更改签名async Task只锁定队列。


private readonly ConcurrentQueue<byte> _bufferQueue = new ConcurrentQueue<byte>(); 

private readonly object _bufferLock = new object(); 
private readonly object _streamLock = new object(); 
private readonly MemoryStream stream = new MemoryStream(); 

public async Task Write(byte[] data, int offset, int count) 
{ 
    bool streamLockTaken = false; 

    try 
    { 
     //attempt to acquire the lock - if lock is currently taken, return immediately 
     Monitor.TryEnter(_streamLock, ref streamLockTaken); 

     if (streamLockTaken) //write to stream 
     { 
      //write data to stream and flush the buffer 
      await stream.WriteAsync(data, offset, count); 
      await FlushBuffer(); 

     } 
     else //write to buffer 
     { 
      lock (_bufferLock) 
       foreach (var b in data) 
        _bufferQueue.Enqueue(b); 
     } 
    } 
    finally 
    { 
     if (streamLockTaken) 
      Monitor.Exit(_streamLock); 
    } 
} 

private async Task FlushBuffer() 
{ 
    List<byte> bufferedData = new List<byte>(); 
    byte b; 
    while (_bufferQueue.TryDequeue(out b)) 
     bufferedData.Add(b); 

    await stream.WriteAsync(bufferedData.ToArray(), 0, bufferedData.Count); 
} 

public void Dispose() 
{ 
    lock(_streamLock) 
     FlushBuffer().Wait(); 
} 
+0

1.我正在使用锁,因为我不想丢失字节的顺序。只要我一次添加所有字节范围,我就可以删除它。 2.我该如何纠正?当我从一种可能的解决方案转变为另一种解决方案时,我忘了保留这一点。 (我开始使用'BeginWrite'和一个回调方法 – sqeez3r

+0

@ sqeez3r我已经用一个例子更新了我的文章 - 尝试一下。关于锁,足够公平,但你只需要锁来写入队列,而不是从它读。 – dcastro

+0

感谢您更新您的文章和感谢的例子。我已经尝试过,但我遇到SynchronizationLockException。我目前正试图了解为什么发生这种情况。 – sqeez3r

0

你就不能锁定底层流?我相信它可以这样简单:

private readonly object writeLock = new Object(); 

public async void Write(byte[] buffer, int offset, int count) 
{ 
    lock (writeLock) 
    { 
     await stream.WriteAsync(buffer, offset, count); 
    } 
} 

另外,与你的队列实现,我认为这是一个变化,一个写可排队从未写入流机器人。例如,在另一个线程退出队列之后排队,但在该线程释放其锁之前。

+1

只需锁定流将导致许多线程等待锁定。该应用程序将处理多达3000个连接,我无法承受过多的锁。 – sqeez3r

+0

@ sqeez3r在这种情况下,dcastro的代码看起来不错。请记住,尽管大多数线程将快速释放(缓冲)锁,但会有线程最终收取从其他线程排队的所有数据的费用。 – MarkO