2016-01-18 79 views
3

我正在实现分布式cronjob执行系统(所谓的cron计算集群)。当动作时间在那里时,Cronjobs应该排队进入消息队列(RabbitMQ)。另一方面(集群的节点/工作者)是一个Perl守护进程,它利用AnyEvent::RabbitMQ从消息队列中完全接收一个cronjob /任务/消息,处理任务并从消息队列请求另一个恰好一个cronjob /任务/消息等等。Howto如何处理Perl中的AnyEvent,RabbitMQ(心跳)和长时间运行的作业?

我使用RabbitMQ的心跳功能,该功能与AnyEvent::RabbitMQ一起实现,以帮助RabbitMQ识别断开的连接。

没关系心跳间隔的实际值!我也有很长时间的运行工作需要几天时间。因此,将时间间隔设置为最长的cronjob所需的时间并不是一种选择。

请参阅下面的代码片段来执行Perl守护进程worker中的实际cronjob。它在'AnyEvent-> timer'内实现,不会对RabbitMQ做消息处理。由于RabbitMQ的consume被禁止(由管理层)使用此方法。

sub _timer_tick { 

    $rabbitmq_channel->get(
    queue  => 'job_queue', 
    on_success => sub { 
     my ($amqp_method) = @_; 
     if (not $amqp_method->{empty}) { 
     pause_timer(); 
     progress_job($amqp_method); 
     resume_timer(); 
     } 
    }, 
    on_failure => sub { $quit_programm->send('RABBITMQ_ERROR', @_) }, 
); 

    return; 
} 

progress_job()是消息被解析并执行作业的地方。 pause_timer()resume_timer()控制触发_timer_tick()AnyEvent->timer

use Capture::Tiny 'capture'; 
sub progress_job { 
    my ($amqp_method) = @_; 
    my $job = decode_json($amqp_method->{body}->to_raw_payload()); 
    my ($stdout, $stderr, $exit) = capture { 
    system $job->{execute}; 
    }; 
    return; 
} 

第一个长期运行的工作中去,系统“崩溃”与各种错误消息。有时会抛出'未知频道ID:1',有时会抛出'频道已关闭'。所以我做了'愚蠢的调试'(试图弄乱配置),发现当heartbeat的间隔比progress_job()所用的时间短时,将会抛出这些错误。经过一番考虑后才有意义。 progress_job()是一个阻塞子程序,AnyEvent无法继续向RabbitMQ发送心跳包。

我首先想到的解决阻塞问题的方法是在子进程中分叉并执行progress_job()AnyEvents documentation on FORK指出,如果孩子内没有对事件系统的访问(例如通过AnyEvent),则使用fork保存。 接下来想到:好吧,没有访问事件系统,所以我可以做叉子。 但是:定时器应该恢复(resume_timer())后progress_job()已返回。理论上resume_timer()将在fork()之后被调用,而不是在progress_job()之后返回。所以我停止了我的实施。

我的问题:如何解决最后一点? progress_job()(或换句话说,分叉的孩子)返回后如何resume_timer()? 由于分叉和事件系统不是线程安全的,我不能将resume_timer()放在孩子的内部。

回答

3

AE不能处理事件,除非程序使用AE感知呼叫阻塞。 system不是AE感知的。改为使用AnyEvent::Utilrun_cmd

+0

'$ run_cmd_cv-> recv'给了我'256'而不是预期的'1'。命令'perl -E'exit 1'; echo $?'也回声'1'。如何接收执行命令的实际错误代码? – burnersk

+1

bash的'$?'不同于Perl的'$?'。参见['system'](http://perldoc.perl.org/functions/system.html)。 – ikegami

+0

谢谢你指出。我添加了'$ exit = $ exit >> 8,如果$ exit && $ exit> 255;'现在它可以工作。 – burnersk

相关问题