2014-06-24 215 views
1

我有这样的文件数组。为什么parallel.Invoke在这种情况下不工作

string [] unZippedFiles; 这个想法是我想要并行解析这些文件。当他们被解析时,一个记录被放置在一个concurrentbag上。随着记录越来越多,我想踢更新功能。

以下是我在我的Main()现在做的:

foreach(var file in unZippedFiles) 
    { Parallel.Invoke 
      ( 
           () => ImportFiles(file), 
           () => UpdateTest() 


          ); 
      } 

这是更新loooks的代码等。

static void UpdateTest() 
    { 
     Console.WriteLine("Updating/Inserting merchant information."); 

     while (!merchCollection.IsEmpty || producingRecords) 
     { 
      merchant x; 
      if (merchCollection.TryTake(out x)) 
      { 

       UPDATE_MERCHANT(x.m_id, x.mInfo, x.month, x.year); 
      } 
     } 



    } 

这就是导入代码的样子。这几乎是一个巨大的字符串解析器。

 System.IO.StreamReader SR = new System.IO.StreamReader(fileName); 
      long COUNTER = 0; 
      StringBuilder contents = new StringBuilder(); 
      string M_ID = ""; 

      string BOF_DELIMITER = "%%MS_SKEY_0000_000_PDF:"; 
      string EOF_DELIMITER = "%%EOF"; 

      try 
      { 
       record_count = 0; 
       producingRecords = true; 
       for (COUNTER = 0; COUNTER <= SR.BaseStream.Length - 1; COUNTER++) 
       { 
        if (SR.EndOfStream) 
        { 
         break; 
        } 
        contents.AppendLine(Strings.Trim(SR.ReadLine())); 
        contents.AppendLine(System.Environment.NewLine); 
        //contents += Strings.Trim(SR.ReadLine()); 
        //contents += Strings.Chr(10); 
        if (contents.ToString().IndexOf((EOF_DELIMITER)) > -1) 
        { 
         if (contents.ToString().StartsWith(BOF_DELIMITER) & contents.ToString().IndexOf(EOF_DELIMITER) > -1) 
         { 
          string data = contents.ToString(); 
          M_ID = data.Substring(data.IndexOf("_M") + 2, data.Substring(data.IndexOf("_M") + 2).IndexOf("_")); 
          Console.WriteLine("Merchant: " + M_ID); 
          merchant newmerch; 
          newmerch.m_id = M_ID; 
          newmerch.mInfo = data.Substring(0, (data.IndexOf(EOF_DELIMITER) + 5)); 
          newmerch.month = DateTime.Now.AddMonths(-1).Month; 
          newmerch.year = DateTime.Now.AddMonths(-1).Year; 
          //Update(newmerch); 
          merchCollection.Add(newmerch); 
         } 
         contents.Clear(); 
         //GC.Collect(); 
        } 
       } 

       SR.Close(); 
       // UpdateTest(); 

      } 
      catch (Exception ex) 
      { 
       producingRecords = false; 

      } 
      finally 
      { 
       producingRecords = false; 
      } 
     } 

我遇到的问题是更新运行一次,然后导入文件函数只是接管并不会产生更新功能。关于我做错什么的想法会有很大的帮助。

+0

看起来像是一个计时问题。您的'UpdateTest'在'ImportFiles'有机会将一个项目放入'metchCollection'或将'produceRecords'设置为true之前完成。没有看到代码的其余部分,这是在黑暗中的野生刺伤。 –

+0

我同意这是一个计时问题。但我想我所问的是为什么ImportFile不会屈服。我也将添加导入代码。 – Waddaulookingat

+0

下面介绍如何修补它:在'foreach'范围内声明'merchCollection'作为'BlockingCollection '*;完全抛弃'produceRecords'变量 - 它不适合线程同步;改变你的更新,通过'foreach(var m in merchCollection.GetConsumingEnumerable())''遍历阻塞集合。在你的'ImportFiles''finally'块中调用'merchCollection.CompleteAdding()',这样你的更新就完成了。瞧! –

回答

3

这是我的刺修复你的线程同步。请注意,我没有从功能角度改变任何代码(除了取出catch - 这通常是一个坏主意;异常需要传播)。

原谅如果有东西不编译 - 我写这个基于不完整的片段。

主要

foreach(var file in unZippedFiles) 
{ 
    using (var merchCollection = new BlockingCollection<merchant>()) 
    { 
     Parallel.Invoke 
     ( 
      () => ImportFiles(file, merchCollection), 
      () => UpdateTest(merchCollection) 
     ); 
    } 
} 

更新

private void UpdateTest(BlockingCollection<merchant> merchCollection) 
{ 
    Console.WriteLine("Updating/Inserting merchant information."); 

    foreach (merchant x in merchCollection.GetConsumingEnumerable()) 
    { 
     UPDATE_MERCHANT(x.m_id, x.mInfo, x.month, x.year); 
    } 
} 

进口

不要忘记在merchCollection以作为参数传递 - 它不应该是一成不变的。

  System.IO.StreamReader SR = new System.IO.StreamReader(fileName); 
     long COUNTER = 0; 
     StringBuilder contents = new StringBuilder(); 
     string M_ID = ""; 

     string BOF_DELIMITER = "%%MS_SKEY_0000_000_PDF:"; 
     string EOF_DELIMITER = "%%EOF"; 

     try 
     { 
      record_count = 0; 

      for (COUNTER = 0; COUNTER <= SR.BaseStream.Length - 1; COUNTER++) 
      { 
       if (SR.EndOfStream) 
       { 
        break; 
       } 
       contents.AppendLine(Strings.Trim(SR.ReadLine())); 
       contents.AppendLine(System.Environment.NewLine); 
       //contents += Strings.Trim(SR.ReadLine()); 
       //contents += Strings.Chr(10); 
       if (contents.ToString().IndexOf((EOF_DELIMITER)) > -1) 
       { 
        if (contents.ToString().StartsWith(BOF_DELIMITER) & contents.ToString().IndexOf(EOF_DELIMITER) > -1) 
        { 
         string data = contents.ToString(); 
         M_ID = data.Substring(data.IndexOf("_M") + 2, data.Substring(data.IndexOf("_M") + 2).IndexOf("_")); 
         Console.WriteLine("Merchant: " + M_ID); 
         merchant newmerch; 
         newmerch.m_id = M_ID; 
         newmerch.mInfo = data.Substring(0, (data.IndexOf(EOF_DELIMITER) + 5)); 
         newmerch.month = DateTime.Now.AddMonths(-1).Month; 
         newmerch.year = DateTime.Now.AddMonths(-1).Year; 
         //Update(newmerch); 
         merchCollection.Add(newmerch); 
        } 
        contents.Clear(); 
        //GC.Collect(); 
       } 
      } 

      SR.Close(); 
      // UpdateTest(); 

     } 
     finally 
     { 
      merchCollection.CompleteAdding(); 
     } 
    } 
+0

谢谢基里尔。这看起来有希望。我在尝试“加速这些代码”的过程中做了大量的阅读和大量的学习。我不知道BlockingCollection。我从来没有想过通过ConcurrentBag – Waddaulookingat

+0

@Waddaulookingat,'BlockingCollection '的美妙之处在于它包含了开箱即用的生产者和消费者同步的方法(通过'CompleteAdding' /'GetConsumingEnumerable'),所以你不用'不用担心拥有自己的线程同步结构 - 你可以免费获得它们。祝你好运。 –

相关问题