2016-07-15 85 views
1

我正在从一个服务器端控制台应用程序接收来自多个WCF服务的数据,做了一些工作,然后通过使用SignalR的单个连接转发结果到IIS服务器。等待消费者使用BlockingCollection作为队列的异步方法

我试图用生产者消费者模式实现这一点,其中WCF服务是生产者,使用SignalR发送数据的类是消费者。对于队列我使用BlockingCollection

但是,当使用await/async发送消费者while循环中的数据时,直到所有其他线程完成将数据添加到队列为止。

出于测试目的,我用Task.Delay(1000).Wait();await Task.Delay(1000);替代了实际发送数据的代码,两者都被卡住。 一个简单的Thread.Sleep(1000);似乎工作得很好,导致我认为异步代码是问题。

所以我的问题是:是否有东西阻止在while循环中完成异步代码?我错过了什么?

我开始消费者线程这样的:

new Thread(Worker).Start(); 

和消费者代码:

private void Worker() 
{ 
    while (!_queue.IsCompleted) 
    { 
     IMobileMessage msg = null; 
     try 
     { 
      msg = _queue.Take(); 
     } 
     catch (InvalidOperationException) 
     { 
     } 

     if (msg != null) 
     { 
      try 
      { 
       Trace.TraceInformation("Sending: {0}", msg.Name); 
       Thread.Sleep(1000); // <-- works 
       //Task.Delay(1000).Wait(); // <-- doesn't work 
       msg.SentTime = DateTime.UtcNow; 
       Trace.TraceInformation("X sent at {1}: {0}", msg.Name, msg.SentTime); 
      } 
      catch (Exception e) 
      { 
       TraceException(e); 
      } 
     } 
    } 
} 
+2

阻止和异步不是朋友。如果你将'async'与'BlockingCollection '混合在一起,你应该强烈考虑抛开'BlockingCollection '并看看TPL Dataflow。 'BufferBlock '是一个很好的起点,大约相当于'BlockingCollection ',但数据流还有更多可供生产者/消费者使用的场景。花点时间来了解它。这很值得。 – spender

+0

不错,非常感谢,我会研究它。 –

回答

2

至于花钱正确地指出,BlockingCollection(顾名思义)仅用于与阻止代码一起使用,并且对于异步代码而言效果不佳。

有异步兼容的生产者/消费者队列,如BufferBlock<T>。在这种情况下,我想ActionBlock<T>甚至会更好:

private ActionBlock<IMobileMsg> _block = new ActionBlock<IMobileMsg>(async msg => 
{ 
    try 
    { 
    Trace.TraceInformation("Sending: {0}", msg.Name); 
    await Task.Delay(1000); 
    msg.SentTime = DateTime.UtcNow; 
    Trace.TraceInformation("X sent at {1}: {0}", msg.Name, msg.SentTime); 
    } 
    catch (Exception e) 
    { 
    TraceException(e); 
    } 
}); 

这取代你的整个消费线和主循环。

+0

我添加了您建议替换我的消费者和队列的代码,并且还可以在您的博客上阅读关于TPL数据流的优秀帖子,它看起来像是有效,但我无法确认,因为现在我的制作人计时器每次在2次滴答后停止。但我想这是新鲜事。我会尽快接受你的回答,我可以确认它的运作!谢谢! –

+0

出于某种原因,我似乎无法得到它的工作......我的生产者检查是否有每秒从WCF服务的新消息(使用'Task.Delay(1000);'在循环中并将它们张贴到ActionBlock(如果有的话)根据我在动作块的延迟和生产者延迟中使用的次数,一切正常,完全停止,或者只发送消息到块缓冲区,但不再处理它们。在生产者中使用async while循环的动作块? –

+0

@ J.Neijt:不,异步生产者工作正常。如果你的'TraceException'抛出一个异常,那将会阻止这个块工作。 –