2015-09-01 38 views
1

我正在使用BlockingCollection创建一个队列,其中多个线程可以将项目添加到单独的线程处理的项目中。添加的项目还包含一个回调函数(委托),该项目在处理项目后调用。要添加项目,我使用TryAdd方法。等待添加到BlockingCollection

这一切都工作正常,但知道我想知道是否有任何方式来运行TryAdd后处理await。 我想要的是,添加的线程等待处理完成,然后继续。

我希望我能很好地描述我的问题,这样任何人都可以给我一个提示。

我的目标框架是Mono/.Net4.5。

+6

只是出于好奇:你为什么要等待?如果你需要同步处理一个项目,为什么你使用'BlockingCollection'?您可以在呼叫站点处理该项目,而不是将其添加到集合中。也许我在这里错过了一些东西。 – xxbbcc

+0

我怀疑这是[将基于回调的异步方法转换为等待任务]的重复(http://stackoverflow.com/questions/11879967/best-way-to-convert-callback-based-async-method-to-awaitable -任务)。 –

+0

@AlexeiLevenkov:这个'TryAdd'回调是基于什么? – olydis

回答

1

唯一的解决方案是copeperate - 处理方必须告知您该项目已处理。的实现是非常简单 - 一个方法是这样的:

public struct SignalizableItem<T> 
{ 
    private readonly T _value; 
    private readonly TaskCompletionSource<object> _signaller; 

    public SignalizableItem(T value, TaskCompletionSource<object> signaller) 
    { 
    _value = value; 
    _signaller = signaller; 
    } 

    public void Process(Action<T> action) 
    { 
    try 
    { 
     action(_value); 
     _signaller.SetResult(default(object)); 
    } 
    catch (Exception ex) 
    { 
     _signaller.SetException(ex); 
    } 
    } 
} 

public static class BlockingCollectionExtensions 
{ 
    public static Task QueueAndWaitAsync<T> 
    (this BlockingCollection<SignalizableItem<T>> @this, T value) 
    { 
    var tcs = new TaskCompletionSource<object>(); 
    @this.Add(new SignalizableItem<T>(value, tcs)); 
    return tcs.Task; 
    } 
} 

用法很简单 - 在制片方,你根本就

await collection.QueueAndWaitAsync(value); 

在消费者方面,你会解开价值和信号准备时:

var item = collection.Take(); 

item.Process 
(
    data => 
    { 
    // Your processing 
    ... 
    } 
); 

,当然还有,收集将BlockingCollection<SignalizableItem<YourType>>,而不是BlockingCollection<YourType>

你可以进一步通过添加另一个扩展方法简化处理:

public static void Process<T> 
    (this BlockingCollection<SignalizableItem<T>> @this, Action<T> action) 
{ 
    @this.Take().Process(action); 
} 

它也可能是实现消除(简单CancellationToken应该能正常运行)或关机的另一种形式是个好主意。

东西实际可用可与

public static void ProcessAll<T> 
    (this BlockingCollection<SignalizableItem<T>> @this, Action<T> action, 
    CancellationToken cancellationToken) 
{ 
    SignalizableItem<T> val; 
    while (@this.TryTake(out val, -1, cancellationToken)) val.Process(action); 
} 

抽象掉整个处理机制,并暴露只是简单的动作代表结束。

+0

我在哪个命名空间中找到'Unit'类型? – Sebastian

+1

@塞巴斯蒂安哦,我的坏 - 我通常在不想通过某种类型时使用'Unit',但我试图将它从样本中移除;只需用'object'替换它即可。 – Luaan