你应该看看到TPL Dataflow的System.Threading.Tasks.Dataflow NuGet包添加到您的项目,那么你想要的东西很简单,只要
private static HttpClient _client = new HttpClient();
public async Task<List<MyClass>> ProcessDownloads(IEnumerable<string> uris,
int concurrentDownloads)
{
var result = new List<MyClass>();
var downloadData = new TransformBlock<string, string>(async uri =>
{
return await _client.GetStringAsync(uri); //GetStringAsync is a thread safe method.
}, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = concurrentDownloads});
var processData = new TransformBlock<string, MyClass>(
json => JsonConvert.DeserializeObject<MyClass>(json),
new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded});
var collectData = new ActionBlock<MyClass>(
data => result.Add(data)); //When you don't specifiy options dataflow processes items one at a time.
//Set up the chain of blocks, have it call `.Complete()` on the next block when the current block finishes processing it's last item.
downloadData.LinkTo(processData, new DataflowLinkOptions {PropagateCompletion = true});
processData.LinkTo(collectData, new DataflowLinkOptions {PropagateCompletion = true});
//Load the data in to the first transform block to start off the process.
foreach (var uri in uris)
{
await downloadData.SendAsync(uri).ConfigureAwait(false);
}
downloadData.Complete(); //Signal you are done adding data.
//Wait for the last object to be added to the list.
await collectData.Completion.ConfigureAwait(false);
return result;
}
在上面的代码唯一concurrentDownloads
数HttpClients的将被激活,在任何给定时间,无限线程将被处理接收到的字符串和在物体转动它们,和一个单独的线程将采取茨艾伦e对象并将它们添加到列表中。
更新:这是一个简单的例子,只有做你问对问题
private static HttpClient _client = new HttpClient();
public void ProcessDownloads(IEnumerable<string> uris, int concurrentDownloads)
{
var downloadData = new ActionBlock<string>(async uri =>
{
var response = await _client.GetAsync(uri); //GetAsync is a thread safe method.
//do something with response here.
}, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = concurrentDownloads});
foreach (var uri in uris)
{
downloadData.Post(uri);
}
downloadData.Complete();
downloadData.Completion.Wait();
}
他DoWork的是异步,和Parallel.ForEach不支持异步。 –
此方法不适用于我的问题。因为你不能使用Parallel.ForEach和异步方法。在使用Parallel.ForEach和异步方法的情况下,所有任务都会立即触发(不等待完成异步任务)。我使用的是一个异步方法HttpClient.getAsync。 –