我试图将一条消息的副本从ActionBlock<int>
发送给多个使用者,这些使用者也是ActionBlock<int>
。这很好,但是如果其中一个目标块引发异常,看起来这不会传播到源块。在这里,我怎么尝试处理异常,但它从未进入到catch
部分:TPL DataFlow无法处理ActionBlock中的异常
static void Main(string[] args)
{
var t1 = new ActionBlock<int>(async i =>
{
await Task.Delay(2000);
Trace.TraceInformation($"target 1 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });
var t2 = new ActionBlock<int>(async i =>
{
await Task.Delay(1000);
Trace.TraceInformation($"target 2 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });
var t3 = new ActionBlock<int>(async i =>
{
await Task.Delay(100);
Trace.TraceInformation($"target 3 | Thread {System.Threading.Thread.CurrentThread.ManagedThreadId} | message {i}");
if (i > 5)
throw new Exception("Too big number");
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });
var targets = new [] { t1, t2, t3};
var broadcaster = new ActionBlock<int>(
async item =>
{
var processingTasks = targets.Select(async t =>
{
try
{
await t.SendAsync(item);
}
catch
{
Trace.TraceInformation("handled in select"); // never goes here
}
});
try
{
await Task.WhenAll(processingTasks);
}
catch
{
Trace.TraceInformation("handled"); // never goes here
}
});
for (var i = 1; i <= 10; i++)
broadcaster.Post(i);
}
我不知道什么,我在这里失踪,但我希望能够以检索异常和目标块已经发生了故障。
你只从'SendAsync''等待'Task',它只表示该项目是否被目标接受。如果任何一个目标抛出异常将被附加到该目标的“完成”任务的异常。为了观察这个异常,你需要“等待”那个任务,即“等待t3.Completion”。 – JSteward
一个简单的解决方案可能是用'if(!await t.SendAsync(item))'替代'await t.SendAsync(item);'等待t.Completion;'这会将异常传播到最内层'try/catch'。然后您可以再次抛出或将信息添加到新的例外中,例如哪个块发生故障。然后你需要处理错误的“广播电台”,但你明白了。 – JSteward
@JSteward谢谢!我用'if(!await t.SendAsync(item))'代替t.Completion;'现在一切正常。发布它作为答案,以便我可以接受它。 –