我正在实现分布式cronjob执行系统(所谓的cron计算集群)。当动作时间在那里时,Cronjobs应该排队进入消息队列(RabbitMQ)。另一方面(集群的节点/工作者)是一个Perl守护进程,它利用AnyEvent::RabbitMQ
从消息队列中完全接收一个cronjob /任务/消息,处理任务并从消息队列请求另一个恰好一个cronjob /任务/消息等等。Howto如何处理Perl中的AnyEvent,RabbitMQ(心跳)和长时间运行的作业?
我使用RabbitMQ的心跳功能,该功能与AnyEvent::RabbitMQ
一起实现,以帮助RabbitMQ识别断开的连接。
没关系心跳间隔的实际值!我也有很长时间的运行工作需要几天时间。因此,将时间间隔设置为最长的cronjob所需的时间并不是一种选择。
请参阅下面的代码片段来执行Perl守护进程worker中的实际cronjob。它在'AnyEvent-> timer'内实现,不会对RabbitMQ做消息处理。由于RabbitMQ的consume
被禁止(由管理层)使用此方法。
sub _timer_tick {
$rabbitmq_channel->get(
queue => 'job_queue',
on_success => sub {
my ($amqp_method) = @_;
if (not $amqp_method->{empty}) {
pause_timer();
progress_job($amqp_method);
resume_timer();
}
},
on_failure => sub { $quit_programm->send('RABBITMQ_ERROR', @_) },
);
return;
}
progress_job()
是消息被解析并执行作业的地方。 pause_timer()
和resume_timer()
控制触发_timer_tick()
的AnyEvent->timer
。
use Capture::Tiny 'capture';
sub progress_job {
my ($amqp_method) = @_;
my $job = decode_json($amqp_method->{body}->to_raw_payload());
my ($stdout, $stderr, $exit) = capture {
system $job->{execute};
};
return;
}
第一个长期运行的工作中去,系统“崩溃”与各种错误消息。有时会抛出'未知频道ID:1',有时会抛出'频道已关闭'。所以我做了'愚蠢的调试'(试图弄乱配置),发现当heartbeat
的间隔比progress_job()
所用的时间短时,将会抛出这些错误。经过一番考虑后才有意义。 progress_job()
是一个阻塞子程序,AnyEvent无法继续向RabbitMQ发送心跳包。
我首先想到的解决阻塞问题的方法是在子进程中分叉并执行progress_job()
。 AnyEvents documentation on FORK指出,如果孩子内没有对事件系统的访问(例如通过AnyEvent),则使用fork
保存。 接下来想到:好吧,没有访问事件系统,所以我可以做叉子。 但是:定时器应该恢复(resume_timer()
)后progress_job()
已返回。理论上resume_timer()
将在fork()
之后被调用,而不是在progress_job()
之后返回。所以我停止了我的实施。
我的问题:如何解决最后一点? progress_job()
(或换句话说,分叉的孩子)返回后如何resume_timer()
? 由于分叉和事件系统不是线程安全的,我不能将resume_timer()
放在孩子的内部。
'$ run_cmd_cv-> recv'给了我'256'而不是预期的'1'。命令'perl -E'exit 1'; echo $?'也回声'1'。如何接收执行命令的实际错误代码? – burnersk
bash的'$?'不同于Perl的'$?'。参见['system'](http://perldoc.perl.org/functions/system.html)。 – ikegami
谢谢你指出。我添加了'$ exit = $ exit >> 8,如果$ exit && $ exit> 255;'现在它可以工作。 – burnersk