唯一的解决方案是copeperate - 处理方必须告知您该项目已处理。的实现是非常简单 - 一个方法是这样的:
public struct SignalizableItem<T>
{
private readonly T _value;
private readonly TaskCompletionSource<object> _signaller;
public SignalizableItem(T value, TaskCompletionSource<object> signaller)
{
_value = value;
_signaller = signaller;
}
public void Process(Action<T> action)
{
try
{
action(_value);
_signaller.SetResult(default(object));
}
catch (Exception ex)
{
_signaller.SetException(ex);
}
}
}
public static class BlockingCollectionExtensions
{
public static Task QueueAndWaitAsync<T>
(this BlockingCollection<SignalizableItem<T>> @this, T value)
{
var tcs = new TaskCompletionSource<object>();
@this.Add(new SignalizableItem<T>(value, tcs));
return tcs.Task;
}
}
用法很简单 - 在制片方,你根本就
await collection.QueueAndWaitAsync(value);
在消费者方面,你会解开价值和信号准备时:
var item = collection.Take();
item.Process
(
data =>
{
// Your processing
...
}
);
,当然还有,收集将BlockingCollection<SignalizableItem<YourType>>
,而不是BlockingCollection<YourType>
。
你可以进一步通过添加另一个扩展方法简化处理:
public static void Process<T>
(this BlockingCollection<SignalizableItem<T>> @this, Action<T> action)
{
@this.Take().Process(action);
}
它也可能是实现消除(简单CancellationToken
应该能正常运行)或关机的另一种形式是个好主意。
东西实际可用可与
public static void ProcessAll<T>
(this BlockingCollection<SignalizableItem<T>> @this, Action<T> action,
CancellationToken cancellationToken)
{
SignalizableItem<T> val;
while (@this.TryTake(out val, -1, cancellationToken)) val.Process(action);
}
抽象掉整个处理机制,并暴露只是简单的动作代表结束。
只是出于好奇:你为什么要等待?如果你需要同步处理一个项目,为什么你使用'BlockingCollection'?您可以在呼叫站点处理该项目,而不是将其添加到集合中。也许我在这里错过了一些东西。 – xxbbcc
我怀疑这是[将基于回调的异步方法转换为等待任务]的重复(http://stackoverflow.com/questions/11879967/best-way-to-convert-callback-based-async-method-to-awaitable -任务)。 –
@AlexeiLevenkov:这个'TryAdd'回调是基于什么? – olydis