在我的Django网络应用程序中,我有一个工作程序,它是速率受限的API的客户端,负责处理来自服务器线程的对该API的所有请求。我使用我的数据库来存储任务队列。任务可以大批量进行,或根本不会。我正在使用事件循环来轮询队列并管理任务之间的延迟,以防万一超过速率限制(限制是动态的)。这一切都可以正常工作,但是我唯一想做的其他事情是让工作人员在队列变干时停止点击数据库,并且让我的Django应用程序向工作人员发出队列不再存在的信号再次干燥。Django应用程序的工作进程的异步事件
示意性地,在伪Python的它看起来像这样:
state = NORMAL
delay_time = NORMAL_DELAY
while True:
sleep(delay_time)
if state == DORMANT:
continue
task = get_next_task() # hits database
if task is None:
state = DORMANT
delay_time = NORMAL_TIME
try:
execute(task)
except RateExceeded:
delay_time = backoff(delay_time)
else:
delay_time = NORMAL_DELAY
# Triggered by web layer
def asynchronous_event():
state = NORMAL
我要么需要一个异步事件从可设置状态恢复到正常(这将在sleep
执行)网络层或一些触发其他轻量级检查,不会增加不必要的循环DB查询。
在单机设置中,我只能使用信号,但显然这在多机设置中不起作用。我试图不必为了这个信号的目的而运行单独的消息队列服务器。我使用Dotcloud进行托管,以便可以使用基于网络的解决方案。理想情况下,与信号处理程序一样易于实现。我研究过ZeroRPC,但是我不确定如何将它合并到我的事件循环中。
任何想法?
编辑
我期待到ZeroMQ来解决这个问题,但我可以使用一些帮助。棘手的部分是将有多个并发的webserver实例,并且在重新部署时,我需要从一个工作者到其后继者的平稳过渡。因此,请耐心等待,因为我的术语可能不正确,在我看来,最好的做法是让每个工作人员异步地将其作为邮箱进行绑定,并在主循环中将其从休眠模式中唤醒。每名工人在其IP的数据库中创建一个记录,并用创建日期加盖时间戳。提交请求时,Web服务器向所有工作人员发布消息。当工作人员收到一条消息时,它会检查它是否具有最新创建日期的工作人员:如果是,则会处理该消息,如果没有,则会自行终止。
这看起来很麻烦,但我想要正确,因为我可能会在我的应用程序的其他地方使用这种模式。
如果您已经在使用Redis,则可以将其用作轻量级队列。这会将它从数据库中一起移除,而且速度非常快。 –
听起来不错,但我还没有使用Redis。如果这是唯一不错的选择,我可能只需要把它弄糟。 – acjay
如果您正在查看ZeroMQ并且您正在使用Python,则应该查看ZeroRPC。它有助于抽象出很多ZeroMQ的复杂性。 –