-1

我有一个需要并行处理的很多项目的IEnumerable。这些项目不是CPU密集型的。 理想情况下,这些项目应该同时执行100个或更多的线程。C# - 并行Foreach缓慢创建线程

我试着用Parallel.ForEach()来做到这一点。这有效,但问题是新线程产生得太慢。在Parallel.Foreach()达到100个线程之前需要很长时间。我知道有一个MaxDegreeOfParallelism属性,但这是最大值,而不是最小值。

有没有办法在100个线程上立即执行foreach? ThreadPool.SetMinThreads是我们宁愿避免的,因为它对整个过程有影响。

是否有自定义分区程序可能的解决方案?

+1

嘛'Parallel.ForEach'使用线程池等剧,但它的规则,所以'SetMinThreads'似乎是唯一的选择(如果您想使用'Parallel.ForEach'明确)。 – Evk

+0

您能否提供一个示例来演示您的_Processing_ intails。它是IO绑定/ CPU绑定,它是多步骤的过程..等等? – JSteward

+0

但是,如果您的任务不是CPU密集型的(因此 - 主要是IO密集型) - 您可以使用async \ await与SemaphoreSlim结合来限制并发性,例如http://stackoverflow.com/a/10810730/5311735(但不要像那样使用Task.Run(等待...)。 – Evk

回答

-2

我一直在使用线程池,而不是并行取得了成功:

public static void ThreadForEach<T>(this IEnumerable<T> items, Action<T> action) 
{ 
    var mres = new List<ManualResetEvent>(); 

    foreach (var item in items) 
    { 
     var mre = new ManualResetEvent(false); 

     ThreadPool.QueueUserWorkItem((i) => 
     { 
      action((T)i); 
      mre.Set(); 
     }, item); 

     mres.Add(mre); 
    } 

    mres.ForEach(mre => mre.WaitOne()); 
} 

在我不得不使用此情况下,它跑得比使用Parallel.ForEach尝试更快。我只能推测这是因为它试图使用已经存在的线程(而不是花费开销来创建新线程)。

+0

您可以使用简单的PLINQ查询来完全避免对线程的讨论,并完全针对foreach。但是,如果OP的处理以任何方式限制IO,那么需要转向同步解决方案。 – JSteward

+0

并行也使用线程池。并行不会中断。无论如何,你的代码有很多问题。首先,它使用'List'而不是并发集合。其次,这完全是多余的。 'await Task.WaitAll(items.Select(it => Task.Run(action(it))。ToArray())'会做同样的事情 –

+0

最后,'Parallel'意思是* data * parallelism。你需要处理大量的数据,而不是大量的任务,通常你不需要更多的任务,而不是核心来处理每个数据的一个分区,如果这个动作被阻塞了,那么就错误地使用了“Parallel” –

0

我用5秒的超时时间ping了很多设备。你如何尽可能快地做到这一点,只有4个线程(4个核心)?

我打算假设你的ping设备在局域网上,每一个都可以通过IP地址进行识别和访问。

namespace PingManyDevices { 

    public class DeviceChecker {     

     public async Task<PingReply[]> CheckAllDevices(IEnumerable<IPAddress> devices) { 
      var pings = devices.Select(address => new Ping().SendPingAsync(address, 5000)); 
      return await Task.WhenAll(pings); 
     } 
     /*** 
     * Maybe push it a little further 
     ***/ 
     public async Task<PingReply[]> CheckAllDevices(IEnumerable<IPAddress> devices) { 
      var pings = devices.AsParallel().Select(address => new Ping().SendPingAsync(address, 5000)); 
      return await Task.WhenAll(pings); 
     }   
    } 
}