2017-09-06 142 views
1

我试图将一条消息的副本从ActionBlock<int>发送给多个使用者,这些使用者也是ActionBlock<int>。这很好,但是如果其中一个目标块引发异常,看起来这不会传播到源块。在这里,我怎么尝试处理异常,但它从未进入到catch部分:TPL DataFlow无法处理ActionBlock中的异常

static void Main(string[] args) 
{ 
    var t1 = new ActionBlock<int>(async i => 
    { 
     await Task.Delay(2000); 
     Trace.TraceInformation($"target 1 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}"); 
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 }); 

    var t2 = new ActionBlock<int>(async i => 
    { 
     await Task.Delay(1000); 
     Trace.TraceInformation($"target 2 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}"); 
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 }); 

    var t3 = new ActionBlock<int>(async i => 
    { 
     await Task.Delay(100); 
     Trace.TraceInformation($"target 3 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}"); 
     if (i > 5) 
      throw new Exception("Too big number"); 
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 }); 

    var targets = new [] { t1, t2, t3}; 

    var broadcaster = new ActionBlock<int>(
     async item => 
     { 
      var processingTasks = targets.Select(async t => 
      { 
       try 
       { 
        await t.SendAsync(item); 
       } 
       catch 
       { 
        Trace.TraceInformation("handled in select"); // never goes here 
       } 
      }); 

      try 
      { 
       await Task.WhenAll(processingTasks); 
      } 
      catch 
      { 
       Trace.TraceInformation("handled"); // never goes here 
      } 
     }); 

    for (var i = 1; i <= 10; i++) 
     broadcaster.Post(i); 
} 

我不知道什么,我在这里失踪,但我希望能够以检索异常和目标块已经发生了故障。

+0

你只从'SendAsync''等待'Task',它只表示该项目是否被目标接受。如果任何一个目标抛出异常将被附加到该目标的“完成”任务的异常。为了观察这个异常,你需要“等待”那个任务,即“等待t3.Completion”。 – JSteward

+0

一个简单的解决方案可能是用'if(!await t.SendAsync(item))'替代'await t.SendAsync(item);'等待t.Completion;'这会将异常传播到最内层'try/catch'。然后您可以再次抛出或将信息添加到新的例外中,例如哪个块发生故障。然后你需要处理错误的“广播电台”,但你明白了。 – JSteward

+0

@JSteward谢谢!我用'if(!await t.SendAsync(item))'代替t.Completion;'现在一切正常。发布它作为答案,以便我可以接受它。 –

回答

1

如果一个块进入故障状态,它将不再接受新的项目,并且它投掷的Exception将被附加到它的Completion任务和/或如果在管道中被链接完成传播它。若要观察Exception,您可以await完成,如果该块拒绝更多的项目。

var processingTasks = targets.Select(async t => 
{ 
    try 
    { 
     if(!await t.SendAsync(item)) 
      await t.Completion; 
    } 
    catch 
    { 
     Trace.TraceInformation("handled in select"); // never goes here 
    } 
}); 
+0

虽然有一个问题。在我看来,即使't.SendAsync(item)'发送消息以便t之后的下一个块按顺序接收它们。我虽然DataFlow waranted消息处理的顺序? –

+0

您的每个目标都将按照发送到您的“广播公司”的顺序接收消息。如果最多只有一个目标失败,广播公司将被拒绝并拒绝新消息。使用默认块选项“等待Task.WhenAll”可确保所有目标在处理下一个目标之前已接受单个消息。你能否提供一个不按顺序显示消息的例子或用例? – JSteward

+0

这是我的错。我以'ActionBlock '的身份作为参数传递为'Action '而不是'Func ' –