我想要实现使用持续运行类似问题here和代码here的BufferBlock消费者/生产模式。TPL BufferBlock食用方法不会被调用
我试图使用类似于OP的ActionBlock,但是如果bufferblock已满并且新消息位于其队列中,则新消息永远不会添加到ConcurrentDictionary _queue中。
在ConsumeAsync方法时,一个新的消息添加到与该呼叫的bufferblock不会被调用下面的代码:_messageBufferBlock.SendAsync(message)
我怎样才能纠正下面的代码,以便ConsumeAsync方法被调用每一个新的时间消息是使用_messageBufferBlock.SendAsync(message)
添加的?
public class PriorityMessageQueue
{
private volatile ConcurrentDictionary<int,MyMessage> _queue = new ConcurrentDictionary<int,MyMessage>();
private volatile BufferBlock<MyMessage> _messageBufferBlock;
private readonly Task<bool> _initializingTask; // not used but allows for calling async method from constructor
private int _dictionaryKey;
public PriorityMessageQueue()
{
_initializingTask = Init();
}
public async Task<bool> EnqueueAsync(MyMessage message)
{
return await _messageBufferBlock.SendAsync(message);
}
private async Task<bool> ConsumeAsync()
{
try
{
// This code does not fire when a new message is added to the buffereblock
while (await _messageBufferBlock.OutputAvailableAsync())
{
// A message object is never received from the bufferblock
var message = await _messageBufferBlock.ReceiveAsync();
}
return true;
}
catch (Exception ex)
{
return false;
}
}
private async Task<bool> Init()
{
var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = Environment.ProcessorCount,
BoundedCapacity = 50
};
var prioritizeMessageBlock = new ActionBlock<MyMessage>(msg =>
{
SetMessagePriority(msg);
}, executionDataflowBlockOptions);
_messageBufferBlock = new BufferBlock<MyMessage>();
_messageBufferBlock.LinkTo(prioritizeMessageBlock, new DataflowLinkOptions { PropagateCompletion = true, MaxMessages = 50});
return await ConsumeAsync();
}
}
编辑 我已经删除了所有额外的代码并添加注释。
你可以做非工作代码的最小例子吗?这太难以得到答案。 – VMAtm
@VMAtm请看看更新后的代码 – Que
您不需要调用'OutputAvailableAsync'或'ReceiveAsync',你的块已经链接并且会在你发送消息的时候传播消息。当前代码中没有任何内容实际上将消息发送到管道。你可以直接删除'ConsumeAsync',这里没有任何用处。 – JSteward