2014-01-16 139 views
5

我需要从ushort arrayB中相应长度的索引值中快速减去ushort arrayA中的每个值。如何从C#中快速减去另一个ushort数组?

另外,如果差值是负值,我需要存储一个零值,而不是负值差值。

(长度= 327680是准确的,因为我从另一幅相同尺寸的图像中减去640x512图像)。

下面的代码目前正在使用〜20ms,如果可能的话,我想在〜5ms以内。不安全的代码是可以的,但请提供一个例子,因为我在编写不安全的代码方面没有超强的技巧。

谢谢!

public ushort[] Buffer { get; set; } 

public void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer) 
{ 
    System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch(); 
    sw.Start(); 

    int bufferLength = Buffer.Length; 

    for (int index = 0; index < bufferLength; index++) 
    { 
     int difference = Buffer[index] - backgroundBuffer[index]; 

     if (difference >= 0) 
      Buffer[index] = (ushort)difference; 
     else 
      Buffer[index] = 0; 
    } 

    Debug.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2")); 
} 

UPDATE:虽然它不是严格的C#,别人谁读这个的好处,我终于结束了加入了C++ CLR类库我用下面的代码的解决方案。它运行在〜3.1ms。如果使用非托管C++库,它运行在〜2.2ms。由于时差很小,我决定去托管的图书馆。

// SpeedCode.h 
#pragma once 
using namespace System; 

namespace SpeedCode 
{ 
    public ref class SpeedClass 
    { 
     public: 
      static void SpeedSubtractBackgroundFromBuffer(array<UInt16>^buffer, array<UInt16>^backgroundBuffer, int bufferLength); 
    }; 
} 

// SpeedCode.cpp 
// This is the main DLL file. 
#include "stdafx.h" 
#include "SpeedCode.h" 

namespace SpeedCode 
{ 
    void SpeedClass::SpeedSubtractBackgroundFromBuffer(array<UInt16>^buffer, array<UInt16>^backgroundBuffer, int bufferLength) 
    { 
     for (int index = 0; index < bufferLength; index++) 
     { 
      buffer[index] = (UInt16)((buffer[index] - backgroundBuffer[index]) * (buffer[index] > backgroundBuffer[index])); 
     } 
    } 
} 

然后我把它称为是这样的:

public void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer) 
    { 
     System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch(); 
     sw.Start(); 

     SpeedCode.SpeedClass.SpeedSubtractBackgroundFromBuffer(Buffer, backgroundBuffer, Buffer.Length); 

     Debug.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2")); 
    } 
+0

〜20ms的声音很慢(也许你的机器是低规格?)。 *以防万一*,您是否正在运行发布版本而不进行调试? – Ergwun

+0

p /调用并使用PSUBW? – Yaur

+0

出于兴趣,但你在灰度图像上操作? –

回答

4

一些基准测试。

  1. SubtractBackgroundFromBuffer:这是从问题的原始方法。
  2. SubtractBackgroundFromBufferWithCalcOpt:这是TTat提高计算速度的原创方法。
  3. SubtractBackgroundFromBufferParallelFor:来自Selman22的答案的解决方案。
  4. SubtractBackgroundFromBufferBlockParallelFor:我的回答。与3.类似,但将处理分成4096个值。
  5. SubtractBackgroundFromBufferPartitionedParallelForEach:杰夫的第一个答案。
  6. SubtractBackgroundFromBufferPartitionedParallelForEachHack:杰夫的第二个答案。

更新

有趣的是,我可以通过使用(由布鲁诺科斯塔所建议的)

Buffer[i] = (ushort)Math.Max(difference, 0); 

代替

得到一个小的速度增加(〜6%)为 SubtractBackgroundFromBufferBlockParallelFor
if (difference >= 0) 
    Buffer[i] = (ushort)difference; 
else 
    Buffer[i] = 0; 

结果

请注意,这是每次运行1000次迭代的总时间。

SubtractBackgroundFromBuffer(ms):         2,062.23 
SubtractBackgroundFromBufferWithCalcOpt(ms):      2,245.42 
SubtractBackgroundFromBufferParallelFor(ms):      4,021.58 
SubtractBackgroundFromBufferBlockParallelFor(ms):     769.74 
SubtractBackgroundFromBufferPartitionedParallelForEach(ms):   827.48 
SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms):  539.60 

所以从最好的方法结合了计算优化的一个小的收获这些结果似乎和利用的Parallel.For对图像的块进行操作。您的里程当然会有所不同,并行代码的性能对您正在运行的CPU非常敏感。

测试工具

我跑这在发行模式中的每个方法。我以这种方式开始并停止Stopwatch以确保只测量处理时间。

System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch(); 
ushort[] bgImg = GenerateRandomBuffer(327680, 818687447); 

for (int i = 0; i < 1000; i++) 
{ 
    Buffer = GenerateRandomBuffer(327680, 128011992);     

    sw.Start(); 
    SubtractBackgroundFromBuffer(bgImg); 
    sw.Stop(); 
} 

Console.WriteLine("SubtractBackgroundFromBuffer(ms): " + sw.Elapsed.TotalMilliseconds.ToString("N2")); 


public static ushort[] GenerateRandomBuffer(int size, int randomSeed) 
{ 
    ushort[] buffer = new ushort[327680]; 
    Random random = new Random(randomSeed); 

    for (int i = 0; i < size; i++) 
    { 
     buffer[i] = (ushort)random.Next(ushort.MinValue, ushort.MaxValue); 
    } 

    return buffer; 
} 

方法

public static void SubtractBackgroundFromBuffer(ushort[] backgroundBuffer) 
{ 
    int bufferLength = Buffer.Length; 

    for (int index = 0; index < bufferLength; index++) 
    { 
     int difference = Buffer[index] - backgroundBuffer[index]; 

     if (difference >= 0) 
      Buffer[index] = (ushort)difference; 
     else 
      Buffer[index] = 0; 
    } 
} 

public static void SubtractBackgroundFromBufferWithCalcOpt(ushort[] backgroundBuffer) 
{ 
    int bufferLength = Buffer.Length; 

    for (int index = 0; index < bufferLength; index++) 
    { 
     if (Buffer[index] < backgroundBuffer[index]) 
     { 
      Buffer[index] = 0; 
     } 
     else 
     { 
      Buffer[index] -= backgroundBuffer[index]; 
     } 
    } 
} 

public static void SubtractBackgroundFromBufferParallelFor(ushort[] backgroundBuffer) 
{ 
    Parallel.For(0, Buffer.Length, (i) => 
    { 
     int difference = Buffer[i] - backgroundBuffer[i]; 
     if (difference >= 0) 
      Buffer[i] = (ushort)difference; 
     else 
      Buffer[i] = 0; 
    }); 
}   

public static void SubtractBackgroundFromBufferBlockParallelFor(ushort[] backgroundBuffer) 
{ 
    int blockSize = 4096; 

    Parallel.For(0, (int)Math.Ceiling(Buffer.Length/(double)blockSize), (j) => 
    { 
     for (int i = j * blockSize; i < (j + 1) * blockSize; i++) 
     { 
      int difference = Buffer[i] - backgroundBuffer[i]; 

      Buffer[i] = (ushort)Math.Max(difference, 0);      
     } 
    }); 
} 

public static void SubtractBackgroundFromBufferPartitionedParallelForEach(ushort[] backgroundBuffer) 
{ 
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range => 
     { 
      for (int i = range.Item1; i < range.Item2; ++i) 
      { 
       if (Buffer[i] < backgroundBuffer[i]) 
       { 
        Buffer[i] = 0; 
       } 
       else 
       { 
        Buffer[i] -= backgroundBuffer[i]; 
       } 
      } 
     }); 
} 

public static void SubtractBackgroundFromBufferPartitionedParallelForEachHack(ushort[] backgroundBuffer) 
{ 
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range => 
    { 
     for (int i = range.Item1; i < range.Item2; ++i) 
     { 
      unsafe 
      { 
       var nonNegative = Buffer[i] > backgroundBuffer[i]; 
       Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) * 
        *((int*)(&nonNegative))); 
      } 
     } 
    }); 
} 
+0

@BrunoCosta 1.我不确定我是否明白你的意思。 “再划分”是什么意思? 2.什么让你觉得这不是在整个阵列上运行? Blocksize是一个有点武断的选择,可能值得进一步的基准。 –

+0

我完全错过了发现的代码...但我还是相信,Parallel.Foreach会使它成为一个自我分区。通过分区我的意思是可以分配许多线程来处理您的4096块。但也许我错了.. –

+0

@BrunoCosta别担心,我们都做了几次;) –

1

您可以尝试Parallel.For

Parallel.For(0, Buffer.Length, (i) => 
{ 
    int difference = Buffer[i] - backgroundBuffer[i]; 
    if (difference >= 0) 
      Buffer[i] = (ushort) difference; 
    else 
     Buffer[i] = 0; 
}); 

更新:我已经尝试过了,我看有你的情况最小差,但当阵列变大时差异变得更大

enter image description here

+0

@elgonzo'Parallel.For'不会为每次迭代创建新任务:[是否Parallel.For每次迭代使用一个任务?](http://blogs.msdn.com/b/pfxteam/archive/2009/05/ 26/9641563.aspx?Redirected = true) – MarcinJuraszek

+0

对。哎呀,我的坏... – elgonzo

+0

这可能会节省几个减和铸造周期: if(Buffer [i] <= backgroundBuffer [i]){Buffer [i] = 0; } else {Buffer [i] - = backgroundBuffer [i]; } – TTat

1

在实际执行减法之前,您可能会首先检查结果是否为负值,从而可能会获得较小的性能提升。这样,如果结果为负,则不需要执行减法。示例:

if (Buffer[index] > backgroundBuffer[index]) 
    Buffer[index] = (ushort)(Buffer[index] - backgroundBuffer[index]); 
else 
    Buffer[index] = 0; 
+0

这取决于抖动如何编译代码到程序集。即使节省了速度也不会超过几微秒。 –

4

这是一个有趣的问题。

只有在测试结果不会为负时执行减法(如TTat和Maximum Cookie所示)影响可以忽略不计,因为此优化可能已由JIT编译器执行。

并行化任务(如Selman22建议)是一个好主意,但是当环路一样快,因为它是在这种情况下,开销最终outwaying,所以实际上Selman22's implementation在我的测试运行速度较慢的收益。我怀疑nick_w's benchmarks是用附加的调试器生成的,隐藏了这个事实。

并行化较大的块(由nick_w建议)任务处理与开销的问题,实际上可以产生更快的性能,但你不必自己计算块 - 您可以使用Partitioner为你做这个:

public static void SubtractBackgroundFromBufferPartitionedParallelForEach(
    ushort[] backgroundBuffer) 
{ 
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range => 
     { 
      for (int i = range.Item1; i < range.Item2; ++i) 
      { 
       if (Buffer[i] < backgroundBuffer[i]) 
       { 
        Buffer[i] = 0; 
       } 
       else 
       { 
        Buffer[i] -= backgroundBuffer[i]; 
       } 
      } 
     }); 
} 

上述方法的性能一直优于nick_w's手卷在我的测试组块。

但是等等!除此之外还有更多。

放慢代码的真正罪魁祸首不是赋值或算术。这是if声明。它对性能的影响将主要受到您正在处理的数据的性质的影响。

nick_w's基准测试会为两个缓冲区生成相同大小的随机数据。但是,我怀疑它很可能实际上在后台缓冲区中具有较低的平均幅度数据。由于分支预测(如this classic SO answer中所述),此详细信息可能很重要。

当后台缓冲区中的值通常小于缓冲区中的值时,JIT编译器会注意到这一点,并相应地对该分支进行优化。当每个缓冲区中的数据来自相同的随机总体时,无法以超过50%的准确度猜测结果的if声明。正是后一种情况下,nick_w是基准测试,在这些条件下,我们可以通过使用不安全的代码将bool转换为整数并避免分支,进一步优化您的方法。 (请注意,下面的代码依赖于bool在内存中的表示方式的实现细节,并且它适用于.NET 4.5中的场景,但这不一定是个好主意,并且在此处显示用于说明目的。)

public static void SubtractBackgroundFromBufferPartitionedParallelForEachHack(
    ushort[] backgroundBuffer) 
{ 
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range => 
     { 
      for (int i = range.Item1; i < range.Item2; ++i) 
      { 
       unsafe 
       { 
        var nonNegative = Buffer[i] > backgroundBuffer[i]; 
        Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) * 
         *((int*)(&nonNegative))); 
       } 
      } 
     }); 
} 

如果你真的希望关闭刮胡子多一点的时间,那么你可以通过切换语言C++/CLI遵循一个更安全的方式这种做法,因为这将让你使用一个布尔值在算术表达式,而不诉诸不安全代码:

UInt16 MyCppLib::Maths::SafeSubtraction(UInt16 minuend, UInt16 subtrahend) 
{ 
    return (UInt16)((minuend - subtrahend) * (minuend > subtrahend)); 
} 

您可以使用C++/CLI露出上述静态方法纯粹管理DLL,一个然后在你的C#代码中使用它:

public static void SubtractBackgroundFromBufferPartitionedParallelForEachCpp(
    ushort[] backgroundBuffer) 
{ 
    Parallel.ForEach(Partitioner.Create(0, Buffer.Length), range => 
    { 
     for (int i = range.Item1; i < range.Item2; ++i) 
     { 
      Buffer[i] = 
       MyCppLib.Maths.SafeSubtraction(Buffer[i], backgroundBuffer[i]); 
     } 
    }); 
} 

这超过了hacky不安全的C#代码上面。实际上,它的速度非常快,您可以使用C++/CLI编写整个方法来忘记并行化,并且它仍然会胜过其他技术。

使用nick_w's test harness,上述方法将胜过迄今发布在此处的任何其他建议。下面是结果我得到(1-4是他尝试的情况下,5-7是在这个答案中概述的):

1. SubtractBackgroundFromBuffer(ms):        2,021.37 
2. SubtractBackgroundFromBufferWithCalcOpt(ms):     2,125.80 
3. SubtractBackgroundFromBufferParallelFor(ms):     3,431.58 
4. SubtractBackgroundFromBufferBlockParallelFor(ms):    1,401.36 
5. SubtractBackgroundFromBufferPartitionedParallelForEach(ms):  1,197.76 
6. SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms): 742.72 
7. SubtractBackgroundFromBufferPartitionedParallelForEachCpp(ms): 499.27 

然而,在情景我希望你确实有,哪里背景值通常较小,成功的分支预测提高全线的结果,和“黑客”,以避免if声明实际上是慢:

这里是我开始使用nick_w's test harness结果时,我在后台缓冲限制值范围0-6500(约10%缓冲区):

1. SubtractBackgroundFromBuffer(ms):         773.50 
2. SubtractBackgroundFromBufferWithCalcOpt(ms):      915.91 
3. SubtractBackgroundFromBufferParallelFor(ms):     2,458.36 
4. SubtractBackgroundFromBufferBlockParallelFor(ms):     663.76 
5. SubtractBackgroundFromBufferPartitionedParallelForEach(ms):  658.05 
6. SubtractBackgroundFromBufferPartitionedParallelForEachHack(ms): 762.11 
7. SubtractBackgroundFromBufferPartitionedParallelForEachCpp(ms): 494.12 

您可以看到结果1-5显着改善,因为它们现在从更好的分支预测中受益。结果6 & 7没有太大变化,因为它们避免了分支。

数据的这种改变已经完全改变了事情。在这种情况下,即使是最快的所有C#解决方案现在只比原始代码快15%。

底线:一定要测试你挑代表性的数据,或者您的结果将是没有意义的任何方法。

+0

你正在将一个bool *转换为一个int *(无效),并且正在假设一个真正的bool的数值(无效 - 不保证是一个)。虽然我喜欢避免分支的一般想法。 – usr

+0

@usr是的,该代码确实有效,但你说得对,依靠这个实现细节并不是一个好主意 - 我会澄清的。正如我在答案中所说的那样,我怀疑这个黑客实际上会随着OP的数据而变慢。 – Ergwun

+0

@GeoffBattye:感谢您的基准测试和精彩评论!我希望我也可以将您的答案标记为解决方案!顺便说一句,backgroundBuffer几乎总是会有比Buffer小的值,所以希望JIT编译器能够正确地注意和优化,正如你所说的那样。 – nb1forxp

0

这是一个使用Zip()一个解决方案:

Buffer = Buffer.Zip<ushort, ushort, ushort>(backgroundBuffer, (x, y) => 
{ 
    return (ushort)Math.Max(0, x - y); 
}).ToArray(); 

它不执行,以及其他的答案,但它肯定是最短的解决方案。

0

怎么样,

Enumerable.Range(0, Buffer.Length).AsParalell().ForAll(i => 
    { 
     unsafe 
     { 
      var nonNegative = Buffer[i] > backgroundBuffer[i]; 
      Buffer[i] = (ushort)((Buffer[i] - backgroundBuffer[i]) * 
       *((int*)(&nonNegative))); 
     } 
    }); 
+0

这个出来比'Partitioner'的'Parallel.Foreach'大约慢10倍。令人惊讶的是,它远远落后。 – Ergwun

相关问题