2016-03-02 52 views
6

我正在使用producer/consumer pattern实现数据链接层。数据链路层有自己的线程和状态机,用于通过线路(以太网,RS-232 ...)传输数据链路协议。物理层的接口表示为System.IO.Stream。另一个线程将消息写入数据链接对象并从中读取消息。C#等待生产者/消费者中的多个事件

数据链路对象具有空闲状态必须等待的四个条件之一:

  1. 一个字节被接收
  2. 消息是可从网络螺纹
  3. 保活定时器已过期
  4. 所有通信都被

我有困难的时候figu网络层取消在没有将通信拆分为读/写线程的情况下(这会显着增加复杂性),最好的方法是实现这一点。以下是我能得到3开出4:

// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token. 
stream.ReadTimeout = 10000; 
await stream.ReadAsync(buf, 0, 1, cts.Token); 

BlockingCollection<byte[]> SendQueue = new ...; 
... 
// Check for a message from network layer. Timeout after 10 seconds. 
// Monitor cancellation token. 
SendQueue.TryTake(out msg, 10000, cts.Token); 

我应该怎么做才能阻止线程,等待所有四个条件?所有建议都欢迎。我没有设置任何架构或数据结构。

编辑:********感谢大家的帮助。这是我的解决方案********

首先我不认为有一个生产者/消费者队列的异步实现。所以我实施了类似this stackoverflow post的东西。

我需要一个外部和内部的取消源来停止消费者线程并分别取消中间任务,similar to this article

byte[] buf = new byte[1]; 
using (CancellationTokenSource internalTokenSource = new CancellationTokenSource()) 
{ 
    CancellationToken internalToken = internalTokenSource.Token; 
    CancellationToken stopToken = stopTokenSource.Token; 
    using (CancellationTokenSource linkedCts = 
     CancellationTokenSource.CreateLinkedTokenSource(stopToken, internalToken)) 
    { 
     CancellationToken ct = linkedCts.Token; 
     Task<int> readTask = m_stream.ReadAsync(buf, 0, 1, ct); 
     Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct); 
     Task keepAliveTask = Task.Delay(m_keepAliveTime, ct); 

     // Wait for at least one task to complete 
     await Task.WhenAny(readTask, msgTask, keepAliveTask); 

     // Next cancel the other tasks 
     internalTokenSource.Cancel(); 
     try { 
      await Task.WhenAll(readTask, msgTask, keepAliveTask); 
     } catch (OperationCanceledException e) { 
      if (e.CancellationToken == stopToken) 
       throw; 
     } 

     if (msgTask.IsCompleted) 
      // Send the network layer message 
     else if (readTask.IsCompleted) 
      // Process the byte from the physical layer 
     else 
      Contract.Assert(keepAliveTask.IsCompleted); 
      // Send a keep alive message 
    } 
} 
+3

['await Task.WhenAny(...)'](https://msdn.microsoft.com/en-us/library/system.threading.tasks.task.whenany%28v=vs.110%29.aspx)可能有帮助。 –

回答

2

在这种情况下,我只会用取消标记注销。像保活计时器一样的重复超时可以更好地表示为计时器。

所以,我会把它建模为三个可取消任务。首先,将取消标记:

的所有通信是由网络层取消

CancellationToken token = ...; 

然后,三个并发操作:

一个字节被接收

var readByteTask = stream.ReadAsync(buf, 0, 1, token); 

保活定时器超时

var keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token); 

消息是从网线

这一个是有点棘手。您当前的代码使用BlockingCollection<T>,它不是异步兼容的。我建议切换到TPL Dataflow's BufferBlock<T>my own AsyncProducerConsumerQueue<T>,它们中的任何一个都可以用作异步兼容的生产者/消费者队列(意味着生产者可以是同步或异步的,消费者可以是同步的或异步的)。

BufferBlock<byte[]> SendQueue = new ...; 
... 
var messageTask = SendQueue.ReceiveAsync(token); 

然后你可以使用Task.WhenAny,以确定这些任务的完成:

var completedTask = await Task.WhenAny(readByteTask, keepAliveTimerTask, messageTask); 

现在,你可以通过比较completedTask别人和await检索结果荷兰国际集团他们:

if (completedTask == readByteTask) 
{ 
    // Throw an exception if there was a read error or cancellation. 
    await readByteTask; 
    var byte = buf[0]; 
    ... 
    // Continue reading 
    readByteTask = stream.ReadAsync(buf, 0, 1, token); 
} 
else if (completedTask == keepAliveTimerTask) 
{ 
    // Throw an exception if there was a cancellation. 
    await keepAliveTimerTask; 
    ... 
    // Restart keepalive timer. 
    keepAliveTimerTask = Task.Delay(TimeSpan.FromSeconds(10), token); 
} 
else if (completedTask == messageTask) 
{ 
    // Throw an exception if there was a cancellation (or the SendQueue was marked as completed) 
    byte[] message = await messageTask; 
    ... 
    // Continue reading 
    messageTask = SendQueue.ReceiveAsync(token); 
} 
+0

希望我之前看到过这个,因为我去了并实现了自己的AsyncQueue,虽然不像您的一般。你为什么要等待每个人的任务?它不应该完成吗? –

+0

@BrianHeilig:有必要检索结果。这些任务已经完成,但他们的结果没有被观察到。结果可以是数据,例如'messageTask',但结果也可以是例外。 'keepAliveTimerTask'可以有一个异常,如果它被取消,'readByteTask'可以有一个异常,如果它被取消或者有一些I/O读取错误。如果存在,“等待”将(重新)提出这些例外。 –

1

取消读取不会让您知道数据是否被读取。取消和阅读不是相对于原子的。只有在取消后关闭流,该方法才有效。

队列方法更好。您可以创建链接的CancellationTokenSource,该链接在您需要时即被取消。而不是通过cts.Token你传递一个你控制的令牌。

然后,您可以根据时间,另一个标记以及您喜欢的任何其他事件发出该标记的信号。如果使用内置超时,则队列将在内部执行相同的操作,以将传入的令牌与超时链接起来。

+0

对不起,我的细节有点不足。 cts是由数据链路层拥有的CancellationTokenSource。取消也由数据链路层控制;网络线程调用停止,取消所有通信并等待线程完成。我们的目标是平稳而快速地停止数据链路层。 –

+1

为什么这个答案不起作用?你可以在任何你喜欢的条件下使TryTake中止。 – usr

+0

我想我明白了。你建议我在网络线程取消或一个字节可用时发出取消令牌的信号,对吗?我想我需要另一个线程同步从流中读取,然后在读取完成时发出取消标记。我需要一个变量来确定什么被取消(读取或停止)。我还需要在取消消息后取消读取线程,对吗?或者我错过了什么? –

3

我会与您的选择两个,等待任何4种情况发生。假设你已经有4个任务已经awaitable方法:

var task1 = WaitForByteReceivedAsync(); 
var task2 = WaitForMessageAvailableAsync(); 
var task3 = WaitForKeepAliveTimerAsync(); 
var task4 = WaitForCommunicationCancelledAsync(); 

// now gather them 
IEnumerable<Task<bool>> theTasks = new List<IEnumerable<Task<bool>>>{ 
task1, task2, task3, task4 
}; 

// Wait for any of the things to complete 
var result = await Task.WhenAny(theTasks); 

上面的代码将立即恢复第一个任务完成后,而忽视了其他3。

注:

the documentation for WhenAny,它说:

返回的任务将始终与它的结果的RanToCompletion状态设置为第一任务完成结束。即使完成的第一项任务以“已取消”或“已故”状态结束,也是如此。

相信发生了什么事之前,所以一定要做到这一点最后的检查:

if(result.Result.Result == true) 
... // First Result is the task, the second is the bool that the task returns 
+1

我认为你的意思是WhenAny() –

+1

不。WhenAny返回任何任务返回的时刻。当所有的任务都等待完成,并返回一个任务本身(而不是WaitAll() –

+1

这是行不通的,因为字节读取的结果可能会被丢弃,我认为他关心的数据不会丢失。 – usr