2013-06-19 34 views
0

我正在尝试使队列管理器在特定文件夹中创建文件时获取作业。 我使用AnyEvent创建了我的代码,所以它是异步的。 我的问题是,我试图从子程序add_route,和del_route,使用回调提供返回值,但AE ::定时器不会停止,并且回调得到的值将不会保存在变量$ return_code。 我哪里错了?perl异步代码中的回调

#!/usr/bin/perl 
use strict; 
use warnings; 

use AnyEvent; 
use AnyEvent::Filesys::Notify; 
use Const::Fast; 
use DDP; 
use File::Basename; 
use File::Copy; 
use File::Slurp; 
use FindBin '$Bin'; 
use List::Util qw(first); 
use Regexp::Common qw(net); 
use v5.10.1; 

const my $true => 1; 
const my $false => 0; 

my $cv = AE::cv; 
my $jobs_folder_path = $Bin . '/jobs'; 
my $interval = 5; 
my $after = 10; 

my %jobs_folders  = (
    "new"  => "$jobs_folder_path/new", 
    "progress" => "$jobs_folder_path/progress", 
    "failed" => "$jobs_folder_path/failed", 
); 

my $notifier = AnyEvent::Filesys::Notify->new(
    dirs  => [ $jobs_folders{'new'} ], 
    interval => $interval, 
    cb   => sub { 
     my (@events) = @_; 

     for my $event (@events) { 
      if ($event->is_created) { 
       process_new_job($event->path); 
      } 
     } 
    } 
); 

my $timer = AE::timer $after, $interval, sub { 
    my @files = read_dir($jobs_folders{'progress'}, prefix => $true); 

    if (@files) { 
     foreach my $file (@files) { 
      my $file_name = basename($file); 
      my $line  = read_file($file); 

      for ($file_name) { 
       when (/add/) { 
        my ($ip_address, $next_hop) = split(/ /, $line); 
        my $return_code; 
        my $cb = sub { 
         my $ret_val = shift; 

         $return_code = $ret_val; 
        }; 

        add_route($ip_address, $next_hop, $cb); 

        print $return_code; 
        #post_job_process($return_code, $file_name); 
       } 
       when (/del/) { 
        my ($ip_address) = $line; 
        my $return_code; 
        my $cb = sub { 
         my $ret_val = shift; 

         $return_code = $ret_val; 
        }; 

        del_route($ip_address, $cb); 

        print $return_code, "\n"; 

        #post_job_process($return_code, $file_name); 
       } 
      } 
     } 
    } 
}; 

$cv->recv; 

sub process_new_job { 
    my ($new_job) = shift; 

    my $file_name = basename($new_job); 
    move("$jobs_folders{'new'}/$file_name", "$jobs_folders{'progress'}/$file_name"); 
} 

sub post_job_process { 
    my ($return_code, $file_name) = @_; 

    if ($return_code == $false) { 
     move("$jobs_folders{'progress'}/$file_name", "$jobs_folders{'failed'}/$file_name"); 
     send_email(); 
    } 

} 

sub send_email { 
    print "Sending Email...\n"; 
} 

sub add_route { 
    my ($ip_address, $next_hop, $cb) = @_; 

    my $attempt = 0; 
    my $sleep = 10; 

    my $add_timer; $add_timer = AE::timer 0, $sleep, sub { 

     if ($attempt++ >= 3) { 
      undef $add_timer; 
      $cb->($false); 
     } 

     print "$attempt. Adding Route $ip_address via $next_hop\n"; 

     my @addresses = get_routing_table(); 

     my ($comparable_ip) = $ip_address =~ /($RE{net}{IPv4})\/32$/; 
     my $is_in_routing_table = first { $_->{'ip_address'} eq $comparable_ip } @addresses; 

     if ($is_in_routing_table) { 
      undef $add_timer; 
      $cb->($true); 
     } 
    }; 
} 

sub del_route { 
    my ($ip_address, $cb) = @_; 

    my $attempt = 0; 
    my $sleep = 10; 

    my $delete_timer; $delete_timer = AE::timer 0, $sleep, sub { 

     if ($attempt++ >= 3) { 
      undef $delete_timer; 
      $cb->($false); 
     } 

     print "$attempt. Deleting Route $ip_address\n"; 

     my @addresses = get_routing_table(); 

     my ($comparable_ip) = $ip_address =~ /^($RE{net}{IPv4})\/32/; 
     my $is_in_routing_table = first { $_->{'ip_address'} eq $comparable_ip } @addresses; 

     if (not $is_in_routing_table) { 
      undef $delete_timer; 
      $cb->($true); 
     } 
    }; 
} 

sub get_routing_table { 
    #my @routing_table = `ip ro`; 
    my @routing_table = (
     '127.0.0.0/8 dev lo proto kernel scope link src 127.0.0.1', 
     '127.0.0.11 via 10.0.0.11 dev eth0 proto baba', 
    ); 
    my @ret_val; 

    foreach my $line (@routing_table) { 
     my ($ip_address, $next_hop) = $line =~ /^($RE{net}{IPv4}) via ($RE{net}{IPv4}) .*proto baba$/; 
     if (defined ($ip_address) and defined ($next_hop)) { 
      push @ret_val, { ip_address => $ip_address, next_hop => $next_hop }; 
     } 
    } 

    return @ret_val; 
} 

跨张贴在PerlMonks

+0

这是一段很长的代码。尽量减少你的问题的要点,使阅读更容易。 – brice

回答

1

你没有什么等待异步代码回调来完成,实际上设置的值,你继续前进并打印返回代码之前。例如,在创建add_route和打印等待完成的返回代码之间需要一个condvar。沿(虽然未经测试)线的东西:

  when (/add/) { 
       my ($ip_address, $next_hop) = split(/ /, $line); 
       my $done = AE:cv; 
       my $cb = sub { 
        my $ret_val = shift; 

        $done->send($ret_val); 
       }; 

       add_route($ip_address, $next_hop, $cb); 

       my $return_code = $done->recv; ### Wait for the callback to finish... 

       print $return_code; 
       #post_job_process($return_code, $file_name); 
      } 

编辑:使用对象::活动作为基类真的帮了我满脑子都在盘算着如何整合实验,当我得知AnyEvent。