2013-02-15 55 views
10

我有一个异步谓词方法是这样的:如何使用“Where”与异步谓词?

​​

说我有Uri个集合:

var addresses = new[] 
{ 
    new Uri("http://www.google.com/"), 
    new Uri("http://www.stackoverflow.com/") //etc. 
}; 

我想用MeetsCriteria过滤addresses。我想要异步执行此操作;我想要对谓词进行多次调用以异步运行,然后我想等待所有这些调用完成并生成过滤结果集。不幸的是,LINQ似乎并不支持异步谓词,所以像这样工作:

var filteredAddresses = addresses.Where(MeetsCriteria); 

是否有同样方便的方式来做到这一点?

+2

如果支持这项功能,您会发生什么?特别是当迭代'filteredAddresses'时,实际调用'MeetsCriteria'。 – 2013-02-15 07:50:04

+0

@DanielHilgarth:谢谢;那是个很好的观点。这似乎并不适合LINQ。 – Sam 2013-02-17 22:27:53

回答

6

我认为原因没有这样一个框架中的是,有很多可能的变化,每个选择会在某些情况下是正确的:

  • 应该谓词并行执行,或串联?
    • 如果它们并行执行,它们是否应该全部执行,还是应该限制并行度?
    • 如果它们并行执行,结果应该与原始集合的顺序相同,按完成顺序还是未定义顺序?
      • 如果他们应该按照完成顺序返回,是否应该有某种方式(异步)在完成时获取结果? (这将需要返回类型从Task<IEnumerable<T>>到别的变化。)

你说你想要的谓词并行执行。在这种情况下,最简单的选择是在一次执行所有这些,在完成的顺序返回:

static async Task<IEnumerable<T>> Where<T>(
    this IEnumerable<T> source, Func<T, Task<bool>> predicate) 
{ 
    var results = new ConcurrentQueue<T>(); 
    var tasks = source.Select(
     async x => 
     { 
      if (await predicate(x)) 
       results.Enqueue(x); 
     }); 
    await Task.WhenAll(tasks); 
    return results; 
} 

然后,您可以使用这样的:

var filteredAddresses = await addresses.Where(MeetsCriteria); 
+1

我会使用不同的方法名称,所以不同的语义(特别是重新排序)变得清晰。 – CodesInChaos 2013-02-15 13:02:49

+0

@CodesInChaos可能,但是我不确定什么是好名字。 'AsyncParallelWhereOrderedByCompletion()'会描述这个方法的作用,但这是一个糟糕的名字。 – svick 2013-02-15 13:08:30

+0

也许像'ConcurrentlyFilterAsync'这样的名字是合适的。 – Sam 2013-02-17 22:30:11

5

第一种方法:问题的所有先后请求预先请求,然后等待所有请求返回,然后过滤结果。 (svick的代码也是这样做的,但在这里我没有使用ConcurrentQueue)。

// First approach: massive fan-out 
var tasks = addresses.Select(async a => new { A = a, C = await MeetsCriteriaAsync(a) }); 
var addressesAndCriteria = await Task.WhenAll(tasks); 
var filteredAddresses = addressAndCriteria.Where(ac => ac.C).Select(ac => ac.A); 

第二种方法:一个接一个地执行请求。这将需要更长的时间,但它会确保不与请求的巨大冲击锤的web服务(假设MeetsCriteriaAsync出去一个web服务...)

// Second approach: one by one 
var filteredAddresses = new List<Uri>(); 
foreach (var a in filteredAddresses) 
{ 
    if (await MeetsCriteriaAsync(a)) filteredAddresses.Add(a); 
} 

第三种方法:对于第二,但使用一个假想的C#8特性“异步流”。 C#8还没有出来,异步流还没有设计,但我们可以做梦! IAsyncEnumerable类型已经存在于RX中,并且希望它们会为它添加更多的组合器。关于IAsyncEnumerable的好处在于,我们可以在开始使用前几个filteredAddresses时立即开始消费,而不是等待所有要先过滤的东西。

// Third approach: ??? 
IEnumerable<Uri> addresses = {...}; 
IAsyncEnumerable<Uri> filteredAddresses = addresses.WhereAsync(MeetsCriteriaAsync); 

第四种方法:也许我们不想一下子锤所有请求的web服务,但我们很乐意向在同一时间超过一个请求。也许我们做了实验,发现“一次三个”是一个快乐的媒介。注意:此代码假设单线程执行上下文,如在UI编程或ASP.NET中。如果它在多线程执行上下文中运行,那么它需要一个ConcurrentQueue和ConcurrentList。

// Fourth approach: throttle to three-at-a-time requests 
var addresses = new Queue<Uri>(...); 
var filteredAddresses = new List<Uri>(); 
var worker1 = FilterAsync(addresses, filteredAddresses); 
var worker2 = FilterAsync(addresses, filteredAddresses); 
var worker3 = FilterAsync(addresses, filteredAddresses); 
await Task.WhenAll(worker1, worker2, worker3); 

async Task FilterAsync(Queue<Uri> q, List<Uri> r) 
{ 
    while (q.Count > 0) 
    { 
    var item = q.Dequeue(); 
    if (await MeetsCriteriaAsync(item)) r.Add(item); 
    } 
} 

对于使用TPL数据流库的第四种方法也有一些办法。