0

我想要实现使用持续运行类似问题here和代码here的BufferBlock消费者/生产模式。TPL BufferBlock食用方法不会被调用

我试图使用类似于OP的ActionBlock,但是如果bufferblock已满并且新消息位于其队列中,则新消息永远不会添加到ConcurrentDictionary _queue中。

在ConsumeAsync方法时,一个新的消息添加到与该呼叫的bufferblock不会被调用下面的代码:_messageBufferBlock.SendAsync(message)

我怎样才能纠正下面的代码,以便ConsumeAsync方法被调用每一个新的时间消息是使用_messageBufferBlock.SendAsync(message)添加的?

public class PriorityMessageQueue 
    { 
     private volatile ConcurrentDictionary<int,MyMessage> _queue = new ConcurrentDictionary<int,MyMessage>(); 
     private volatile BufferBlock<MyMessage> _messageBufferBlock; 
     private readonly Task<bool> _initializingTask; // not used but allows for calling async method from constructor 
     private int _dictionaryKey; 

     public PriorityMessageQueue() 
     { 
      _initializingTask = Init(); 
     } 

     public async Task<bool> EnqueueAsync(MyMessage message) 
     { 
      return await _messageBufferBlock.SendAsync(message); 
     } 

     private async Task<bool> ConsumeAsync() 
     { 
      try 
      { 
       // This code does not fire when a new message is added to the buffereblock 
       while (await _messageBufferBlock.OutputAvailableAsync()) 
       { 
        // A message object is never received from the bufferblock 
        var message = await _messageBufferBlock.ReceiveAsync(); 


       } 

       return true; 
      } 
      catch (Exception ex) 
      { 
       return false; 
      } 
     } 

     private async Task<bool> Init() 
     { 
      var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions 
      { 
       MaxDegreeOfParallelism = Environment.ProcessorCount, 
       BoundedCapacity = 50 
      }; 

      var prioritizeMessageBlock = new ActionBlock<MyMessage>(msg => 
      { 
       SetMessagePriority(msg); 
      }, executionDataflowBlockOptions); 

      _messageBufferBlock = new BufferBlock<MyMessage>(); 
      _messageBufferBlock.LinkTo(prioritizeMessageBlock, new DataflowLinkOptions { PropagateCompletion = true, MaxMessages = 50}); 

      return await ConsumeAsync(); 
     } 
    } 

编辑 我已经删除了所有额外的代码并添加注释。

+0

你可以做非工作代码的最小例子吗?这太难以得到答案。 – VMAtm

+0

@VMAtm请看看更新后的代码 – Que

+0

您不需要调用'OutputAvailableAsync'或'ReceiveAsync',你的块已经链接并且会在你发送消息的时候传播消息。当前代码中没有任何内容实际上将消息发送到管道。你可以直接删除'ConsumeAsync',这里没有任何用处。 – JSteward

回答

2

我还没有完全确定你想要完成什么,但我会尽力指出你朝着正确的方向。该示例中的大部分代码并非严格必要。

我需要知道当有新邮件到达时

如果这是你唯一的要求,然后我会假设你只需要运行每当一个新的消息在通过一些任意代码。最简单的在数据流中这样做的方法是使用TransformBlock并将该块设置为管道中的初始接收器。每个块都有它自己的缓冲区,所以除非你需要另一个缓冲区,否则你可以不用。

public class PriorityMessageQueue {   
    private TransformBlock<MyMessage, MyMessage> _messageReciever; 

    public PriorityMessageQueue() { 
     var executionDataflowBlockOptions = new ExecutionDataflowBlockOptions { 
      MaxDegreeOfParallelism = Environment.ProcessorCount, 
      BoundedCapacity = 50 
     }; 

     var prioritizeMessageBlock = new ActionBlock<MyMessage>(msg => { 
      SetMessagePriority(msg); 
     }, executionDataflowBlockOptions); 

     _messageReciever = new TransformBlock<MyMessage, MyMessage>(msg => NewMessageRecieved(msg), executionDataflowBlockOptions); 
     _messageReciever.LinkTo(prioritizeMessageBlock, new DataflowLinkOptions { PropagateCompletion = true }); 
    } 

    public async Task<bool> EnqueueAsync(MyMessage message) { 
     return await _messageReciever.SendAsync(message); 
    } 

    private MyMessage NewMessageRecieved(MyMessage message) { 
     //do something when a new message arrives 

     //pass the message along in the pipeline 
     return message; 
    } 

    private void SetMessagePriority(MyMessage message) { 
     //Handle a message 
    } 
} 

当然,你必须将尽一切可能是另一种选择,你需要立即到内EnqueAsyncSendAsync返回任务之前,但TransformBlock为您提供额外的灵活性。

+0

感谢您的帮助。最初,我提出了很多代码来问我的问题。然后有人评论说,它仍然是太多的代码,我删除了更多的代码,所以最终我猜这就是很难理解的原因。你让我走上了正轨,我很欣赏它 – Que