2013-01-22 32 views
1

使用TPL,如何从多个IO源(“无线”任务)收集结果并将它们合并为序列各自的源不会产生每个源的基于线程的任务来监视它们?从一个线程轮询来源是否安全?从单个基于线程的任务收集来自多个基于IO的任务的数据

while (true) 
{ 
    try 
    { 
     IEnumerable<UdpClient> readyChannels = 
      from channel in channels 
      where channel.Available > 0 
      select channel; 

     foreach(UdpClient channel in readyChannels) 
     { 
      var result = await channel.ReceiveAsync(); 
      //do something with result like post to dataflow block. 
     } 
    } 
    catch (Exception e) 
    { 
     throw (e); 
    } 
    ... 

这样的事情呢?

+0

while循环真的需要在这里?只是好奇而已 –

+0

@CongLong - 我想继续调查和阅读我的udp资源。 – Eric

+0

可以用Task.WhenAny或类似的结构来做这种事情,但我认为更好的匹配是为每个UdpClient暴露一个IObservable或ISourceBlock。 Rx或TPL Dataflow可以更好地匹配这样的“推送”事件。 –

回答

1

我在这里看到几个选项:

如果你想火起来的呼叫ReceiveAsync(),设置它们做一些与结果(例如发送到数据流块,像你说的),然后忘记他们,你可以使用ContinueWith():这个

foreach (var channel in readyChannels) 
{ 
    channel.ReceiveAsync().ContinueWith(task => 
    { 
     var result = task.Result; 
     //do something with result like post to dataflow block. 
    } 
} 

一个缺点是,你需要处理的每个连续的异常。

可能更好的方法是使用Stephen Cleary的AsyncEx的OrderByCompletion()。这样一来,你就可以开始所有读取一次并处理他们,因为他们完成:

var tasks = readyChannels.Select(c => c.ReceiveAsync()).OrderByCompletion(); 

foreach (var task in tasks) 
{ 
    var result = await task; 
    //do something with result like post to dataflow block. 
} 

另一个选择,例如有用的,如果你想限制的并行,是使用TransformBlock

var receiveBlock = new TransformBlock<UdpClient, UdpReceiveResult>(
    c => c.ReceiveAsync(), 
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = degreeOfParallelism }); 
foreach (var channel in readyChannels) 
    receiveBlock.Post(channel); 
receiveBlock.Complete(); 

// set up processing here 

await receiveBlock.Completion; 

如果你想将结果发送到另一个块,然后在评论中提及了上述处理,由简单地连接在一起:

receiveBlock.LinkTo(anotherBlock); 

在上述所有情况中,永远不会有线程阻塞来监视任何事情。但是拨打ReceiveAsync()然后处理结果的代码必须执行某处

+0

有没有简单的方法来删除UdpClients或阻止他们接收? – Jesse