2014-11-04 139 views
3

我正在写一个函数来获取目录中的所有文件,但通过将每个子目录添加到线程池中并行执行。我认为这将意味着每个目录将被并行遍历,并且由于有许多子目录,它会比顺序执行要快得多。我的代码如下:线程池是否按顺序运行?

private object addlock = new object(); 
    private void addFiles(string[] newFiles) 
    { 
     lock (addlock) { 
      files.AddRange(newFiles); 
      Console.WriteLine("Added {0} files", newFiles.Length); 
     } 
    } 

    private void getFilesParallel(string dir) 
    { 
     if (!Directory.Exists(dir)) { 
      return; 
     } 

     string[] dirs = Directory.GetDirectories(dir, "*", SearchOption.TopDirectoryOnly); 
     ManualResetEvent mre = new ManualResetEvent(false); 

     ThreadPool.QueueUserWorkItem((object obj) => 
     { 
      addFiles(Directory.GetFiles(dir, "*", SearchOption.TopDirectoryOnly)); 
      mre.Set(); 
     }); 

     Process currentProcess = Process.GetCurrentProcess(); 
     long memorySize = currentProcess.PrivateMemorySize64; 

     Console.WriteLine("Used {0}", memorySize); 

     foreach (string str in dirs) { 
      getFilesParallel(str); 
     } 

     mre.WaitOne(); 
    } 

的问题是,我得到这样的输出:

Added 34510 files 
Used 301420544 
Added 41051 files 
Used 313937920 
Added 39093 files 
Used 322764800 
Added 44426 files 
Used 342536192 
Added 30772 files 
Used 350728192 
Added 36262 files 
Used 360329216 
Added 31686 files 
Used 368685056 
Added 33194 files 
Used 374894592 
Added 34486 files 
Used 384057344 
Added 37298 files 
Used 393998336 

这表明我的代码运行顺序,因为我本来希望能够找到每个语句因为它们在不同的线程上运行。我使用不同的文件夹多次运行它,结果总是相同的。为什么这个顺序运行?

+0

尝试使用一些ramdisk软件的多个虚拟磁盘驱动器。 6Gb/s是我的猜测。 – 2014-11-04 20:11:11

回答

8

您只有一个物理磁盘驱动器。磁盘的磁头一次只能放在一个地方。你在同一时间要求它提供两条信息,不允许它实际上同时在两个地方。

在你的程序中有少量的CPU绑定工作实际上可以并行化,但这不是主要的瓶颈。

如果你有多个物理磁盘驱动器和数据每个驱动器上,那么你可以访问每一个并行数据与实际有问题的工作并行进行。

0

它很难准确地进行基准测试,因为如果您有足够的内存,第一次运行将缓存数据,并且相同文件夹的后续枚举可能会在根本不访问磁盘的情况下运行。

它也值得考虑,如果你有SSD,它将从并行操作中受益更多,因为它支持更多的IOPS,因为它没有任何移动部件可以等待。

此代码显示了平行可高达2 - 针对SSD上运行时,或者当数据已经缓存3倍的速度在我的四核i5比单线程。

它演示了Parallel.ForEach的使用,它可以消除任务并行中的很多痛苦。

using System; 
using System.Collections.Generic; 
using System.Diagnostics; 
using System.IO; 
using System.Threading.Tasks; 

namespace FilesReader 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      string path = args[0]; 
      RunTrial(path, false); 
      RunTrial(path, true); 
     } 

     private static void RunTrial(string path, bool useParallel) 
     { 
      Console.WriteLine("Parallel: {0}", useParallel); 

      Stopwatch stopwatch = new Stopwatch(); 
      stopwatch.Start(); 
      FileListing listing = new FileListing(path, useParallel); 
      stopwatch.Stop(); 

      Console.WriteLine("Added {0} files in {1} ms ({2} file/second)", 
       listing.Files.Count, stopwatch.ElapsedMilliseconds, 
       (listing.Files.Count * 1000/stopwatch.ElapsedMilliseconds)); 
     } 
    } 

    class FileListing 
    { 
     private ConcurrentList<string> _files; 
     private bool _parallelExecution; 

     public FileListing(string path, bool parallelExecution) 
     { 
      _parallelExecution = parallelExecution; 
      _files = new ConcurrentList<string>(); 
      BuildListing(path); 
     } 

     public ConcurrentList<string> Files 
     { 
      get { return _files; } 
     } 

     private void BuildListing(string path) 
     { 
      string[] dirs = null; 
      string[] files = null; 
      bool success = false; 

      try 
      { 
       dirs = Directory.GetDirectories(path, "*", SearchOption.TopDirectoryOnly); 
       files = Directory.GetFiles(path); 
       success = true; 
      } 
      catch (SystemException) { /* Suppress security exceptions etc*/ } 

      if (success) 
      { 
       Files.AddRange(files); 

       if (_parallelExecution) 
       { 
        Parallel.ForEach(dirs, d => BuildListing(d)); 
       } 
       else 
       { 
        foreach (string dir in dirs) 
        { 
         BuildListing(dir); 
        } 
       } 
      } 
     } 
    } 

    class ConcurrentList<T> 
    { 
     object lockObject = new object(); 
     List<T> list; 

     public ConcurrentList() 
     { 
      list = new List<T>(); 
     } 

     public void Add(T item) 
     { 
      lock (lockObject) list.Add(item); 

     } 
     public void AddRange(IEnumerable<T> collection) 
     { 
      lock (lockObject) list.AddRange(collection); 
     } 

     public long Count 
     { 
      get { lock (lockObject) return list.Count; } 
     } 
    } 
} 

我考虑使用Concurrent集合而不是滚动线程安全列表实现,但他们证明慢了大约5%。