我正在使用producer/consumer pattern实现数据链接层。数据链路层有自己的线程和状态机,用于通过线路(以太网,RS-232 ...)传输数据链路协议。物理层的接口表示为System.IO.Stream。另一个线程将消息写入数据链接对象并从中读取消息。C#等待生产者/消费者中的多个事件
数据链路对象具有空闲状态必须等待的四个条件之一:
- 一个字节被接收
- 消息是可从网络螺纹
- 保活定时器已过期
- 所有通信都被
我有困难的时候figu网络层取消在没有将通信拆分为读/写线程的情况下(这会显着增加复杂性),最好的方法是实现这一点。以下是我能得到3开出4:
// Read a byte from 'stream'. Timeout after 10 sec. Monitor the cancellation token.
stream.ReadTimeout = 10000;
await stream.ReadAsync(buf, 0, 1, cts.Token);
或
BlockingCollection<byte[]> SendQueue = new ...;
...
// Check for a message from network layer. Timeout after 10 seconds.
// Monitor cancellation token.
SendQueue.TryTake(out msg, 10000, cts.Token);
我应该怎么做才能阻止线程,等待所有四个条件?所有建议都欢迎。我没有设置任何架构或数据结构。
编辑:********感谢大家的帮助。这是我的解决方案********
首先我不认为有一个生产者/消费者队列的异步实现。所以我实施了类似this stackoverflow post的东西。
我需要一个外部和内部的取消源来停止消费者线程并分别取消中间任务,similar to this article。
byte[] buf = new byte[1];
using (CancellationTokenSource internalTokenSource = new CancellationTokenSource())
{
CancellationToken internalToken = internalTokenSource.Token;
CancellationToken stopToken = stopTokenSource.Token;
using (CancellationTokenSource linkedCts =
CancellationTokenSource.CreateLinkedTokenSource(stopToken, internalToken))
{
CancellationToken ct = linkedCts.Token;
Task<int> readTask = m_stream.ReadAsync(buf, 0, 1, ct);
Task<byte[]> msgTask = m_sendQueue.DequeueAsync(ct);
Task keepAliveTask = Task.Delay(m_keepAliveTime, ct);
// Wait for at least one task to complete
await Task.WhenAny(readTask, msgTask, keepAliveTask);
// Next cancel the other tasks
internalTokenSource.Cancel();
try {
await Task.WhenAll(readTask, msgTask, keepAliveTask);
} catch (OperationCanceledException e) {
if (e.CancellationToken == stopToken)
throw;
}
if (msgTask.IsCompleted)
// Send the network layer message
else if (readTask.IsCompleted)
// Process the byte from the physical layer
else
Contract.Assert(keepAliveTask.IsCompleted);
// Send a keep alive message
}
}
['await Task.WhenAny(...)'](https://msdn.microsoft.com/en-us/library/system.threading.tasks.task.whenany%28v=vs.110%29.aspx)可能有帮助。 –