2012-10-22 27 views
0

我是perl中的线程的新手。使用信号量,而在perl中调用文件

我有一个项目列表(每个项目是在一个单独的行)的文件,我想要并行构建这些项目。 目前,每个线程:

  1. 打开文件为“已读”模式
  2. 节省一些项目的列表(=一些文件行)
  3. 关闭
  4. 再次 - 打开文件作为文件“写”模式
  5. 重写它没有以确保每个线程访问的F唯一一个选择

,该线ile,即时尝试使用信号量。

出于某种原因,线程collisons发生,我无法弄清楚我做错了什么。

我可以看到(在我的“报告”中也得到了每个版本的当前时间) 不同的线程从“共享”文件中选择相同的项目(它只会在一段时间内发生,但仍然.. )

我甚至不确定如果我的$信号量decleration是合法的“我的”变量。

任何帮助将得到真正的赞赏!

谢谢。


这里是我的代码的一部分:

my $semaphore = Thread::semaphore->new() ; 

sub build_from_targets_list{ 

    #... 
    open(REPORT, "+>$REPORT_tmp"); # Open for output 
    #.... 
    @threads =(); 


    for ($i = 0; $i < $number_of_cores; $i++){ 
     my $thr = threads->new(\&compile_process, $i,*REPORT); 
     push @threads, $thr; 
    } 

    $_->join for @threads; 
    close (REPORT); 
} 
### Some stuff.. 


sub compile_process{ 

    *REPORT = shift(@_); 
    #... 

    while (1){ 
     $semaphore->down(); 
     open (DATA_FILE, $shared_file); 
     flock(DATA_FILE, 2); 
     while ($record = <DATA_FILE>) { 
      chomp($record); 
      push(@temp_target_list,$record);  
     } 


     # ... choose some lines (=projects)... 
     # remove the projects that should be built by this thread: 
     for ($k = 0; $k < $num_of_targets_in_project; $k++){    
      shift(@temp_target_list); 

     } 

     close(DATA_FILE);  
     open (REWRITE,">$shared_file"); 

     flock(REWRITE, 2); 

     seek(REWRITE, 0, 0); 
     foreach $temp_target (@temp_target_list){ 

      print REWRITE "$temp_target\n"; 

     } 

     close (REWRITE); 

     ## ... BUILD selected projects... 

     $semaphore->up(); 
     } 
} 

回答

1

首先,你是如何处理的文件的一些基本的清理工作。如果它是一个简单的文件问题,那么在尝试调试线程问题时没有意义。

必须检查任何文件命令(打开,关闭,聚集,寻找等)是否成功。可以在那里贴一些or dieuse autodie

其次是使用硬编码常数的鸡群。这些是依赖于系统的,并且很难记住哪种模式2。 Fcntl提供了常量。

您正在打开数据文件以使用独占锁(2通常是独占锁)进行读取。这应该可能是一个共享锁。这不会导致问题,但它会导致您的线程不必要地阻塞。

最后,使用词法文件句柄代替全局范围的glob。这减少了机会

作为一个附注,seek $fh, 0, 0打开文件写入后是不必要的。查找常量和群集一样,使用Fcntl获取常量。

一个额外的错误是,你传递$i, *REPORTcompile_process认为*REPORT是第一个参数。再次使用全局文件句柄意味着传递它是多余的,使用词法文件句柄。

现在已经不存在了,您的基本算法似乎有缺陷。 compile_process每个线程读取整个数据文件到线程本地数组@temp_target_list,将某些本地数组移出并写出其余部分。因为@temp_target_list是每个线程,所以没有协调。除非$num_of_targets_in_project被共享,并进行某种屏幕外协调,但没有显示。

基于文件的锁定总是会成为地狱的一小部分。线程有更好的协调机制。有一个更简单的方法来做到这一点。

假设文件不是太大,请将每行读入共享数组。然后让每个线程从该数组中处理项目。该数组是共享的,因此每个元素都被删除,数组将更新所有线程。喜欢的东西...

use strict; 
use warnings; 
use autodie; 

use threads; 
use threads::shared; 

my $Max_Threads = 5; 
my @Todo : shared; 

open my $fh, "<", $work_file; 
@Todo = <$fh>; 
close $fh; 

my @threads; 
for (1..$Max_Threads) { 
    push @threads, threads->new(\&compile_process); 
} 

$_->join for @threads; 

sub compile_process { 
    while(my $work = shift @Todo) { 
     ...do whatever with $work... 
    } 
} 

如果文件过大在内存中保存,可以使用Thread::Queue建设的工作项队列,并动态地添加到它。

+0

谢谢!我会试试这个。 – user1625723