2016-04-08 47 views
1

在下面的简化代码中,我产生了200个任务。每项任务都需要经过一个由锁保护的关键区域。内部锁是一个.AsParallel()语句。当我运行程序时,没有任何反应。该程序无限期挂起,没有任何打印。为什么.AsParallel()在任务内运行时挂起?

private static object lockObject = new object(); 

static void Main(string[] args) 
{ 
    RunTasks(); 
} 

private static void RunTasks() 
{ 
    List<Task> tasks = new List<Task>(); 
    for (int i = 0; i < 200; i++) 
    { 
     tasks.Add(Task.Factory.StartNew(PerformComputations)); 
    } 

    Task.WaitAll(tasks.ToArray()); 
} 

private static void PerformComputations() 
{ 
    // Computations 

    lock (lockObject) 
    { 
     // The actual operations performed here are irrelevant. The key is that they use .AsParallel() 
     foreach (int i in Enumerable.Range(0, 500).AsParallel().Select(i => i)) 
     { 
      Console.WriteLine(i); 
     } 
    } 

    // Additional computations 
} 

然而,一切都运行正常(虽然缓慢)如果RunTasks是像这样实现的:

Parallel.For(0, 200, i => 
{ 
    PerformComputations(); 
}); 

一切都同样适用,如果我从PerformComputations删除.AsParallel()语句。

问题:

  1. 为什么原来的代码锁住?
    • 我最好的猜测是RunTasks产生200个任务,这比我的机器上的物理内核数量多。 PerformComputations中的锁定语句确保除一个任务外的所有其他任务都被阻止。当未被阻止的线程运行并行查询时,它将排队另一个任务。但是,活动任务的最大数量已经处于活动状态,因此新任务永远处于队列中空闲状态。
    • 这是准确的吗?任何人都可以指向我的文档来证实这一点或更详细地解释吗?
  2. 为什么修改后的RunTasks工作?
    • 是不是只有Parallel.For队列少于最大活动任务数?
  3. 有没有办法以这样的方式编写PerformComputations,它将使用原始的RunTasks方法,但仍然并行运行?
+0

部分猜想,500转储到控制台中看到排队真的很快,所以,做一个for循环,有效地在几毫秒即整个performcomputations做,它只是需要更长的时间控制台滚动..而因为你在修改过的东西中开始执行任务似乎更加随机,因为有些任务实际上是同时开始的,而不是一个接一个地开始,当他们有可能在下一次开始之前有时间完成时才完成? – BugFinder

+0

在原始代码中,没有任何线程获取到Console.WriteLine语句。如果我在该行上放置一个断点,它永远不会被触发,应用程序只会继续无限期地运行而不会占用CPU。修改后的代码并非如此。 – Kvothe

+1

你应该完全避免在任务中阻塞代码,而不是使用Task.WaitAll你应该使用Task。WhenAll'或新的async/await语法来指定完成所有这些子任务时应该发生的情况。 –

回答

0

对问题#1和#2的回答绝对正确。

要回答#3:您可以在创建任务时指定TaskCreationOptions.LongRunning。根据https://msdn.microsoft.com/en-us/library/system.threading.tasks.taskcreationoptions(v=vs.110).aspx的文档,这将向任务调度程序提供提示,该任务可能需要额外的线程,以便它不会阻止本地线程池队列上的其他线程或工作项的转发进度。

实际上,这会使任务系统忽略ThreadPool,并为您的任务提供一个新的专用线程。

+0

锁定强制所有版本同步工作。改变选项不会改变这一点。 –

+0

不,但它将允许来自线程的AsParallel调用,该线程具有锁定,可以在线程池上运行工作,因为它没有充满来自任务工厂的请求。 –

+0

PLINQ和Parallel都使用ThreadPool。两者都产生任务。这两个任务都将从核心数量开始,对数据进行分区并将其提供给它们 - 无需生成*更多*任务,而不是执行任务。这里假定的差异是由于Task.Start一次创建200个任务而导致的,因此必须执行顺序。虽然并行中断了批量2000次迭代并将它们分配给任务,但仍然需要按顺序执行。 –

0

Parallel.For和.ForEach方法以及System.Collection.Concurrent命名空间确实让您的生活更轻松地处理这类问题。调度处理,根据处理优先级,系统工作负载,核心数量,等你的线程管理...平行轻松:

static void Main(string[] args) 
    { 
     RunTasks(); 
    } 

    // This sets up the parallel scheduler to use UP TO 16 simultaneous threads. In reality the thread 
    // workload is managed by the CLR according to how many logical threads you have available on your 
    // processor. 
    private static readonly ParallelOptions _po = new ParallelOptions() { MaxDegreeOfParallelism = 16 }; 

    private static void RunTasks() 
    { 
     // Run 200 instances of PerformComputations in parallel. 
     Parallel.For(0, 200, _po, i => PerformComputations()); 
    } 

    private static void PerformComputations() 
    { 
     // If you want to run the 500 iterations in parallel (sequence is not important), 
     // use a concurrent collection. This needs absolutely no lock, the collection is 
     // partitioned internally to avoid having to lock. Same goes if you need to share 
     // data between multiple runs of PerformComputations(), declare a static bag at 
     // class level. 
     var theBag = new ConcurrentBag<int>(Enumerable.Range(0, 500)); 
     Parallel.ForEach(theBag, _po, i => 
     { 
      Console.WriteLine(i.ToString()); 
     }); 

     // Otherwise you don't need a lock at all anyway since each element here is treated 
     // one at a time in sequence. 
     var theList = Enumerable.Range(0, 500).ToList(); 
     foreach (var i in theList) 
     { 
      Console.WriteLine(i.ToString()); 
     } 
    } 
0

是的,你是正确的 - 原代码的并行部分锁死PerformComputations

LongRunning强制创建一个全新的非线程池线程(通知调度程序为该任务创建一个新线程)。注意:您可能会创建许多线程,导致内存开销和交换开销等问题。

private static void RunTasks() 
{ 
    List<Task> tasks = new List<Task>(); 
    for (int i = 0; i < maxLoops; i++) 
    { 
     tasks.Add(Task.Factory.StartNew(PerformComputations, TaskCreationOptions.LongRunning)); 
    } 

    Task.WaitAll(tasks.ToArray()); 
} 

有趣的阅读:Parallelism in .NET

要回答问题3:如果你不介意VS相结合的结果(AsParallel().Select)创建多个线程(使用Parallel.For)。

private static void PerformComputations() 
{ 
    lock (lockObject) 
    { 
     Parallel.For(0, 500, i => 
     { 
      Console.WriteLine(i); 
     }); 
    } 
} 
+0

PerformComputations中的锁定将强制所有方法按顺序执行。唯一的区别是每个人如何造成阻塞。 –

0

首先,我不明白你为什么试图使用AsParallel()。如果你有200个主要独立的Task s,那应该足够充分利用你的CPU。这是特别令人困惑的,因为AsParallel()并行执行的唯一操作是无用的Select()

现在,实际上回答您的问题:

我最好的猜测是,RunTasks产卵200个任务,这比我的机器上的物理内核的数量更多。

核心的数目是不是相关的。可用的线程数量更重要。 TPL使用ThreadPool,它对每秒创建的线程数量也有限制,这也是线程总数的硬限制。如果达到第一个限制,您的代码可能会慢下来(并且出现不会执行任何操作)。如果达到第二个限制,您的代码实际上会死锁并停止工作。

第一个限制是不可配置或有据可查,the second limit is

在任何情况下,无论是深远的这些限制表明,当涉及到并行代码设计很糟糕。

为什么DOE RunTasks的修改版本的工作?这是否仅仅是Parallel.For队列少于活动任务的最大数量?

是的,Parallel.For使用较小数量的Task s,因为这样更有效。

有没有办法以这样的方式编写PerformComputations,它可以使用原始的RunTasks方法,但仍能并行运行?

我不明白你为什么会想这样做。就像我之前说过的,我认为并行运行Select()是没有意义的。我的