2

我描述了我的问题在一个简单的例子,然后描述更贴近问题的并行执行。假设我们在box1中有n项[i1,i2,i3,i4,...,in],并且我们有一个可以处理m个项目的box2(m通常远小于n)。每个项目所需的时间是不同的。我希望在所有项目都进行之前,一直都在做m个工作项目。在组的任务

更加紧密的问题是,例如,您有一个包含n个字符串(URL地址)的文件列表1,我们希望系统具有m个文件并发下载(例如,通过httpclient.getAsync()方法) 。的m个项目之一,每当完成下载,从列表1另一剩余项目必须尽快取代,这必须countinued直到所有的列表1项进行。

(n和m的数量通过在运行时用户输入指定的)如何这可以做什么?并行

回答

1

工艺项目,限制并发作业的数量:

string[] strings = GetStrings(); // Items to process. 
const int m = 2; // Max simultaneous jobs. 

Parallel.ForEach(strings, new ParallelOptions {MaxDegreeOfParallelism = m}, s => 
{ 
    DoWork(s); 
}); 
+2

他DoWork的是异步,和Parallel.ForEach不支持异步。 –

+0

此方法不适用于我的问题。因为你不能使用Parallel.ForEach和异步方法。在使用Parallel.ForEach和异步方法的情况下,所有任务都会立即触发(不等待完成异步任务)。我使用的是一个异步方法HttpClient.getAsync。 –

6

你应该看看到TPL DataflowSystem.Threading.Tasks.Dataflow NuGet包添加到您的项目,那么你想要的东西很简单,只要

private static HttpClient _client = new HttpClient(); 
public async Task<List<MyClass>> ProcessDownloads(IEnumerable<string> uris, 
                int concurrentDownloads) 
{ 
    var result = new List<MyClass>(); 

    var downloadData = new TransformBlock<string, string>(async uri => 
    { 
     return await _client.GetStringAsync(uri); //GetStringAsync is a thread safe method. 
    }, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = concurrentDownloads}); 

    var processData = new TransformBlock<string, MyClass>(
      json => JsonConvert.DeserializeObject<MyClass>(json), 
      new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded}); 

    var collectData = new ActionBlock<MyClass>(
      data => result.Add(data)); //When you don't specifiy options dataflow processes items one at a time. 

    //Set up the chain of blocks, have it call `.Complete()` on the next block when the current block finishes processing it's last item. 
    downloadData.LinkTo(processData, new DataflowLinkOptions {PropagateCompletion = true}); 
    processData.LinkTo(collectData, new DataflowLinkOptions {PropagateCompletion = true}); 

    //Load the data in to the first transform block to start off the process. 
    foreach (var uri in uris) 
    { 
     await downloadData.SendAsync(uri).ConfigureAwait(false); 
    } 
    downloadData.Complete(); //Signal you are done adding data. 

    //Wait for the last object to be added to the list. 
    await collectData.Completion.ConfigureAwait(false); 

    return result; 
} 

在上面的代码唯一concurrentDownloads数HttpClients的将被激活,在任何给定时间,无限线程将被处理接收到的字符串和在物体转动它们,和一个单独的线程将采取茨艾伦e对象并将它们添加到列表中。

更新:这是一个简单的例子,只有做你问对问题

private static HttpClient _client = new HttpClient(); 
public void ProcessDownloads(IEnumerable<string> uris, int concurrentDownloads) 
{ 
    var downloadData = new ActionBlock<string>(async uri => 
    { 
     var response = await _client.GetAsync(uri); //GetAsync is a thread safe method. 
     //do something with response here. 
    }, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = concurrentDownloads}); 


    foreach (var uri in uris) 
    { 
     downloadData.Post(uri); 
    } 
    downloadData.Complete(); 

    downloadData.Completion.Wait(); 
} 
+0

谢谢兄弟。我听到并看到很多关于TPL或Reactive Extension的文章,这些文章都是针对我的问题寻找答案的,但它对我来说有些复杂,不知道如何使用它们。是不是有一个更简单的解决方案来做到这一点? :) –

+0

数据流很容易做到,一旦你意识到你只是在管道中设置步骤。我让我的例子过于复杂,所以我可以向您展示TPL DataFlow的所有功能,我已经更新了一个只是您的需求的例子。 –

+0

'HttpClient'被设计为可以重复使用多个请求,甚至是并发的。创建一个实例并使用它,不要每次创建一个新实例 –

6

这里是你可以使用一个通用的方法。

当你调用这个锡将字符串(URL地址)和asyncProcessor将是你的异步方法,它的URL地址作为输入并返回任务。

的SlimSemaphore由该方法中使用是要尽快允许实时并发异步I/O请求,只有n个作为一个完成其他请求将执行。像滑动窗口模式。

public static Task ForEachAsync<TIn>(
      IEnumerable<TIn> inputEnumerable, 
      Func<TIn, Task> asyncProcessor, 
      int? maxDegreeOfParallelism = null) 
     { 
      int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism; 
      SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount); 

      IEnumerable<Task> tasks = inputEnumerable.Select(async input => 
      { 
       await throttler.WaitAsync().ConfigureAwait(false); 
       try 
       { 
        await asyncProcessor(input).ConfigureAwait(false); 
       } 
       finally 
       { 
        throttler.Release(); 
       } 
      }); 

      return Task.WhenAll(tasks); 
     } 
+0

谢谢。看起来很可爱。我必须测试它并报告它的工作。 –

+0

对不起,我有一个问题。它是否立即创建所有任务,并等待每个任务的顺序成为时间线,或者在任何时候和必要时创建任务? –

+0

'Task.WhenAll'在内部为所有任务创建一个列表,所以我认为它会立即创建它们全部 –

2

一个简单的节流解决方案是SemaphoreSlim
编辑
略有改动后的代码现在创建的任务都需要

var client = new HttpClient(); 
SemaphoreSlim semaphore = new SemaphoreSlim(m, m); //set the max here 
var tasks = new List<Task>(); 

foreach(var url in urls) 
{ 
    // moving the wait here throttles the foreach loop 
    await semaphore.WaitAsync(); 
    tasks.Add(((Func<Task>)(async() => 
    { 
     //await semaphore.WaitAsync(); 
     var response = await client.GetAsync(url); // possibly ConfigureAwait(false) here 
     // do something with response 
     semaphore.Release(); 
    }))()); 
} 

await Task.WhenAll(tasks); 

时,他们这是另一种方式来做到这一点

var client = new HttpClient(); 
var tasks = new HashSet<Task>(); 

foreach(var url in urls) 
{ 
    if(tasks.Count == m) 
    { 
     tasks.Remove(await Task.WhenAny(tasks));    
    } 

    tasks.Add(((Func<Task>)(async() => 
    { 
     var response = await client.GetAsync(url); // possibly ConfigureAwait(false) here 
     // do something with response    
    }))()); 
} 

await Task.WhenAll(tasks); 
+0

它似乎在做这项工作,它模拟地下载到移动设备上,但有问题。例如,如果您拥有一百万个网址列表,它会在短时间内创建100万个任务,然后等待每个任务的顺序变为。我错了吗? –

+0

你是对的,它在短时间内创建所有任务。它也可以在同一个线程上执行所有操作,但是可以使用'ConfigureAwait(false)'修改它,或者在线程池上运行它们。我会用更多的信息更新答案 –

+0

我的网址列表可能会很长,可能是几百万,如果所有这些数百万任务被创建,它可能会导致内存不足,其他异常或错误:)我正在寻找一个解决方案每个部分只要需要,内存占用少 –