5

我有一大堆的数据行的,我想用Parallel.ForEach计算在这样的每一行一定的价值...多个Parallel.ForEach调用,MemoryBarrier?

class DataRow 
{ 
    public double A { get; internal set; } 
    public double B { get; internal set; } 
    public double C { get; internal set; } 

    public DataRow() 
    { 
     A = double.NaN; 
     B = double.NaN; 
     C = double.NaN; 
    } 
} 

class Program 
{ 
    static void ParallelForEachToyExample() 
    { 
     var rnd = new Random(); 
     var df = new List<DataRow>(); 

     for (int i = 0; i < 10000000; i++) 
     { 
      var dr = new DataRow {A = rnd.NextDouble()}; 
      df.Add(dr); 
     } 

     // Ever Needed? (I) 
     //Thread.MemoryBarrier(); 

     // Parallel For Each (II) 
     Parallel.ForEach(df, dr => 
     { 
      dr.B = 2.0*dr.A; 
     }); 

     // Ever Needed? (III) 
     //Thread.MemoryBarrier(); 

     // Parallel For Each 2 (IV) 
     Parallel.ForEach(df, dr => 
     { 
      dr.C = 2.0 * dr.B; 
     }); 
    } 
} 

(在这个例子中,没有必要进行并行化,如果有是的,它可以全部放在一个Parallel.ForEach里面,但是这意味着它是一些代码的简化版本,它可以像这样设置它)。

是否有可能在这里重新排序读取,以便最终得到一个数据行,其中B!= 2A或C!= 2B?

说出第一个Parallel.ForEach(II)指派工作线程42工作在数据行0上。第二个Parallel.ForEach(IV)指派工作线程43工作在数据行0上(只要第一个并行.ForEach完成)。是否有机会读取dr.B在线程43上的第0行返回double.NaN,因为它没有看到来自线程42的写入呢?

如果是这样,插入一个内存障碍在III帮助吗?这是否会迫使第一个Parallel.ForEach的更新在第二个Parallel.ForEach启动之前对所有线程都可见?

+0

总之..我不认为你需要的外显记忆障碍..一个猜测将是并行的,落实。ForEach在结束循环时有某种同步/在调用ForEach之前返回 –

+0

如果您对实际代码有更好的了解,我可能会给您一个更好的答案,而不是“不要担心它。 “ :) – jdphenix

+0

也许如果我说第二个并行循环(IV)的每一行中的计算取决于某些只能在第一个循环(II)完成后才知道的值,那么分离的原因会更清楚一些。假设我们需要所有行的dr.B值的中位数,然后才能计算每行dr.C的值。 –

回答

4

Parallel.ForEach()开始的工作将在返回之前完成。对于每次迭代,在内部,ForEach()会产生Task,并且每个都会调用Wait()。因此,您不需要在ForEach()调用之间同步访问。

需要记住这一点对于单个任务与允许您访问循环状态ForEach()过载,聚集来自任务等。例如结果,其中总结了1 ≤ x ≤ 100这个简单的例子中,Action传递给的Parallel.For()localFinally必须关心同步问题,

var total = 0; 

Parallel.For(0, 101,() => 0, // <-- localInit 
(i, state, localTotal) => { // <-- body 
    localTotal += i; 
    return localTotal; 
}, localTotal => { <-- localFinally 
    Interlocked.Add(ref total, localTotal); // Note the use of an `Interlocked` static method 
}); 

// Work of previous `For()` call is guaranteed to be done here 

Console.WriteLine(total); 

在您的例子,它是没有必要插入ForEach()电话之间的内存屏障。具体来说,循环IV可以取决于II正在完成的结果,而Parallel.ForEach()已经为您插入III

片段来自来源:Parallel Framework and avoiding false sharing

+0

谢谢。当我查看几个级别的Parallel.ForEach代码时,它看起来像“私有静态ParallelLoopResult ForWorker ”正在完成大部分工作。对我来说有点难,但看起来好像有个叫“rootTask.Wait()”的电话。在继续之前等待所有工作线程完成。但即使我的主线程正在等待工作人员完成,但并不能保证工作线程在所有其他处理器之间传播,当他们读取值时,它们一定会看到最近的写入,是吗? –

+0

这是正确的,我会编辑我的答案,也许会更清楚一点。由* same *'ForEach()'调用产生的任务需要知道并发问题 - 通常需要关注的地点在您传递给'localFinally'的'Action'中。然而,不同的'ForEach()'调用可以安全地依赖于之前的'ForEach()'调用的结果。 – jdphenix

+0

我想我的问题是与此有关... http://stackoverflow.com/questions/6581848/memory-barrier-generators。我只想确保Parallel.ForEach的结尾落入其中一个桶中。所以它有自己的MemoryBarrier(有效),并保证在下一个Parallel.ForEach开始之前,所有内容都被完全写入。 –

0

由于多个线程将访问相同的变量“dr.B”,你需要确保你的C#代码是线程安全的。

尝试使用 “锁定” 轮每个操作 https://msdn.microsoft.com/en-us/library/c5kehkcz.aspx

例如

private Object thisLock1 = new Object(); 
... 
lock(thisLock1) 
{ 
    dr.C = 2.0 * dr.B; 
} 

... 
lock(thisLock1) 
{ 
    dr.B = 2.0*dr.A; 
} 

但是,这样做会打败并行处理。因为每个线程都必须等到下一个线程完成。

请务必阅读与并行处理潜在的缺陷: https://msdn.microsoft.com/en-us/library/dd997403%28v=vs.110%29.aspx

+0

在OP的使用'Parallel.ForEach()'的具体例子中,每个'ForEach()'调用已经处理同步,特别是确保调用产生的任何并行操作在返回之前完成。 – jdphenix

+0

@jdphenix - 你能提供一个参考吗(对我的教育)?注意Microsoft MSDN显示:如何:编写具有线程局部变量的Parallel.ForEach循环https://msdn.microsoft.com/zh-cn/library/dd460703%28v=vs.110%29.aspx使用( finalResult)=> Interlocked.Add(ref total,finalResult) –

+0

指出一个单独的'ForEach()'确实需要考虑线程安全性,因此'ForEach()'提供了允许指定线程的重载本地和终结者,因为你已经链接。就'Wait()'内部调用ForEach()而言,我必须查看参考源以确认。 – jdphenix