4

我想这是一种代码审查,但这里是我的生产者/消费者模式的实现。我想知道的是会有这种情况,ReceivingThread()SendingThread()方法中的while循环可能会停止执行。请注意,从多个不同的线程调用EnqueueSend(DataSendEnqeueInfo info),我可能无法在这里使用任务,因为我必须在单独的线程中使用命令。生产者/消费者模式使用线程和EventWaitHandle

private Thread mReceivingThread; 
private Thread mSendingThread; 
private Queue<DataRecievedEnqeueInfo> mReceivingThreadQueue; 
private Queue<DataSendEnqeueInfo> mSendingThreadQueue; 
private readonly object mReceivingQueueLock = new object(); 
private readonly object mSendingQueueLock = new object(); 
private bool mIsRunning; 
EventWaitHandle mRcWaitHandle; 
EventWaitHandle mSeWaitHandle; 

private void ReceivingThread() 
{ 
    while (mIsRunning) 
    { 
     mRcWaitHandle.WaitOne(); 
     DataRecievedEnqeueInfo item = null; 
     while (mReceivingThreadQueue.Count > 0) 
     { 
      lock (mReceivingQueueLock) 
      { 
       item = mReceivingThreadQueue.Dequeue(); 
      } 
      ProcessReceivingItem(item); 
     } 
     mRcWaitHandle.Reset(); 
    } 
} 

private void SendingThread() 
{ 
    while (mIsRunning) 
    { 
     mSeWaitHandle.WaitOne(); 
     while (mSendingThreadQueue.Count > 0) 
     { 
      DataSendEnqeueInfo item = null; 
      lock (mSendingQueueLock) 
      { 
       item = mSendingThreadQueue.Dequeue(); 
      } 
      ProcessSendingItem(item); 
     } 
     mSeWaitHandle.Reset(); 
    } 
} 

internal void EnqueueRecevingData(DataRecievedEnqeueInfo info) 
{ 
    lock (mReceivingQueueLock) 
    { 
     mReceivingThreadQueue.Enqueue(info); 
     mRcWaitHandle.Set(); 
    } 
} 

public void EnqueueSend(DataSendEnqeueInfo info) 
{ 
    lock (mSendingQueueLock) 
    { 
     mSendingThreadQueue.Enqueue(info); 
     mSeWaitHandle.Set(); 
    } 
} 

P.S这里的想法是,我使用WaitHandle s到放线时,队列为空睡觉,而它们的信号新项目时排队开始。

UPDATE 我只想离开这个https://blogs.msdn.microsoft.com/benwilli/2015/09/10/tasks-are-still-not-threads-and-async-is-not-parallel/,为人们谁可能是试图实现使用TPL或任务生产者/消费者模式。

+1

您最好使用[BlockingCollection](https://msdn.microsoft.com/en-us/library/dd267312(v = vs.110).aspx),它可以为您处理所有同步逻辑。 –

+0

阻止收集是一个不错的选择,但是,您也可以尝试[TPL Dataflow](https://msdn.microsoft.com/zh-cn/library/hh228601.aspx)。如果需要,我可以提供一些示例逻辑。 – VMAtm

+0

@VMAtm我一直想绕过TPL数据流。如果你能给我提供一些示例逻辑,当然,只有在它没有太多的麻烦的情况下,这将是很有用的。 –

回答

3

就个人而言,简单的生产者 - 消费者的问题,我只想用BlockingCollection。不需要手动编写自己的同步逻辑。如果队列中没有任何项目,消费线程也会阻塞。

这里是你的代码看起来,如果你使用这个类,如:

private BlockingCollection<DataRecievedEnqeueInfo> mReceivingThreadQueue = new BlockingCollection<DataRecievedEnqeueInfo>(); 
private BlockingCollection<DataSendEnqeueInfo> mSendingThreadQueue = new BlockingCollection<DataSendEnqeueInfo>(); 

public void Stop() 
{ 
    // No need for mIsRunning. Makes the enumerables in the GetConsumingEnumerable() calls 
    // below to complete. 
    mReceivingThreadQueue.CompleteAdding(); 
    mSendingThreadQueue.CompleteAdding(); 
} 

private void ReceivingThread() 
{ 
    foreach (DataRecievedEnqeueInfo item in mReceivingThreadQueue.GetConsumingEnumerable()) 
    { 
     ProcessReceivingItem(item); 
    } 
} 

private void SendingThread() 
{ 
    foreach (DataSendEnqeueInfo item in mSendingThreadQueue.GetConsumingEnumerable()) 
    { 
     ProcessSendingItem(item); 
    } 
} 

internal void EnqueueRecevingData(DataRecievedEnqeueInfo info) 
{ 
    // You can also use TryAdd() if there is a possibility that you 
    // can add items after you have stopped. Otherwise, this can throw an 
    // an exception after CompleteAdding() has been called. 
    mReceivingThreadQueue.Add(info); 
} 

public void EnqueueSend(DataSendEnqeueInfo info) 
{ 
    mSendingThreadQueue.Add(info); 
} 
+0

如果我没记错的话,BlockingCollection.GetConsumingEnumerable()并不一定按照排列顺序返回集合,这对我来说是个问题。另外,我希望线程在收集为空时进入睡眠状态,这就是为什么使用Waithandles,那么内部处理也是如此?我也要对此进行基准测试,但在此之前,您知道BlockingCollections的性能是好还是差吗? –

+0

'BlockingCollection'将使用你给它的任何'IProducerConsumerCollection '。默认情况下,它将使用'ConcurrentQueue ',因此它将按照它的入队顺序进行处理。与您对解决方案进行编码的方式相同。性能比较明智?只有你可以分析它。但是我敢肯定,相比于你(或我)手中的任何东西,它至少会相同或更快。 –

+0

哇! BlockingCollection.GetConsumingEnumerable()会自动阻塞线程..我不知道为什么总是害怕使用更新的API在并发和/或线程方面。 -_-。谢谢你的时间,我学到了一些新东西。 –

2

使用BlockingCollection代替排队,和的EventWaitHandle对象锁定:

public class DataInfo { } 

private Thread mReceivingThread; 
private Thread mSendingThread; 

private BlockingCollection<DataInfo> queue; 

private CancellationTokenSource receivingCts = new CancellationTokenSource(); 

private void ReceivingThread() 
{ 
    try 
    { 
     while (!receivingCts.IsCancellationRequested) 
     { 
      // This will block until an item is added to the queue or the cancellation token is cancelled 
      DataInfo item = queue.Take(receivingCts.Token); 

      ProcessReceivingItem(item); 
     } 
    } 
    catch (OperationCanceledException) 
    { 

    } 
} 

internal void EnqueueRecevingData(DataInfo info) 
{ 
    // When a new item is produced, just add it to the queue 
    queue.Add(info); 
} 

// To cancel the receiving thread, cancel the token 
private void CancelReceivingThread() 
{ 
    receivingCts.Cancel(); 
} 
1

正如评论所说,你也可以给一个尝试到TPL Dataflow块。

据我所看到的,你有两个类似的管道,用于接收和发送,所以我假定你的类层次结构是这样的:

class EnqueueInfo { } 
class DataRecievedEnqeueInfo : EnqueueInfo { } 
class DataSendEnqeueInfo : EnqueueInfo { } 

我们可以组装一个抽象类,这将封装创建管道,并提供该接口用于处理项目,像这样的逻辑:

abstract class EnqueueInfoProcessor<T> 
    where T : EnqueueInfo 
{ 
    // here we will store all the messages received before the handling 
    private readonly BufferBlock<T> _buffer; 
    // simple action block for actual handling the items 
    private ActionBlock<T> _action; 

    // cancellation token to cancel the pipeline 
    public EnqueueInfoProcessor(CancellationToken token) 
    { 
     _buffer = new BufferBlock<T>(new DataflowBlockOptions { CancellationToken = token }); 
     _action = new ActionBlock<T>(item => ProcessItem(item), new ExecutionDataflowBlockOptions 
     { 
      MaxDegreeOfParallelism = Environment.ProcessorCount, 
      CancellationToken = token 
     }); 

     // we are linking two blocks so all the items from buffer 
     // will flow down to action block in order they've been received 
     _buffer.LinkTo(_action, new DataflowLinkOptions { PropagateCompletion = true }); 
    } 

    public void PostItem(T item) 
    { 
     // synchronously wait for posting to complete 
     _buffer.Post(item); 
    } 

    public async Task SendItemAsync(T item) 
    { 
     // asynchronously wait for message to be posted 
     await _buffer.SendAsync(item); 
    } 

    // abstract method to implement 
    protected abstract void ProcessItem(T item); 
} 

请注意,您还可以通过使用Encapsulate<TInput, TOutput>方法封装两个街区之间的联系,但在这种情况下,你要好好地处理Completion缓冲区块,如果你正在使用它。

在此之后,我们只需要实现接收和发送处理逻辑两种方法:

public class SendEnqueueInfoProcessor : EnqueueInfoProcessor<DataSendEnqeueInfo> 
{ 
    SendEnqueueInfoProcessor(CancellationToken token) 
     : base(token) 
    { 

    } 
    protected override void ProcessItem(DataSendEnqeueInfo item) 
    { 
     // send logic here 
    } 
} 

public class RecievedEnqueueInfoProcessor : EnqueueInfoProcessor<DataRecievedEnqeueInfo> 
{ 
    RecievedEnqueueInfoProcessor(CancellationToken token) 
     : base(token) 
    { 

    } 
    protected override void ProcessItem(DataRecievedEnqeueInfo item) 
    { 
     // recieve logic here 
    } 
} 

您还可以TransformBlock<DataRecievedEnqeueInfo, DataSendEnqeueInfo>创建更复杂的管道,如果你的信息流是关于ReceiveInfo消息成为SendInfo

+0

哇!非常感谢你为这样一个详细的例子。我已经阅读了关于TransformBlock几天的MSDN,它看起来很有趣。我要实现这个代码,看看它是如何工作的。 –

+0

@SushantPoojary祝你好运 – VMAtm