2013-01-24 39 views
1

在我的Django网络应用程序中,我有一个工作程序,它是速率受限的API的客户端,负责处理来自服务器线程的对该API的所有请求。我使用我的数据库来存储任务队列。任务可以大批量进行,或根本不会。我正在使用事件循环来轮询队列并管理任务之间的延迟,以防万一超过速率限制(限制是动态的)。这一切都可以正常工作,但是我唯一想做的其他事情是让工作人员在队列变干时停止点击数据库,并且让我的Django应用程序向工作人员发出队列不再存在的信号再次干燥。Django应用程序的工作进程的异步事件

示意性地,在伪Python的它看起来像这样:

state = NORMAL 
delay_time = NORMAL_DELAY 

while True: 
    sleep(delay_time) 

    if state == DORMANT: 
     continue 

    task = get_next_task() # hits database 
    if task is None: 
     state = DORMANT 
     delay_time = NORMAL_TIME 

    try: 
     execute(task) 
    except RateExceeded: 
     delay_time = backoff(delay_time) 
    else: 
     delay_time = NORMAL_DELAY 

# Triggered by web layer 
def asynchronous_event(): 
    state = NORMAL 

我要么需要一个异步事件从可设置状态恢复到正常(这将在sleep执行)网络层或一些触发其他轻量级检查,不会增加不必要的循环DB查询。

在单机设置中,我只能使用信号,但显然这在多机设置中不起作用。我试图不必为了这个信号的目的而运行单独的消息队列服务器。我使用Dotcloud进行托管,以便可以使用基于网络的解决方案。理想情况下,与信号处理程序一样易于实现。我研究过ZeroRPC,但是我不确定如何将它合并到我的事件循环中。

任何想法?

编辑

我期待到ZeroMQ来解决这个问题,但我可以使用一些帮助。棘手的部分是将有多个并发的webserver实例,并且在重新部署时,我需要从一个工作者到其后继者的平稳过渡。因此,请耐心等待,因为我的术语可能不正确,在我看来,最好的做法是让每个工作人员异步地将其作为邮箱进行绑定,并在主循环中将其从休眠模式中唤醒。每名工人在其IP的数据库中创建一个记录,并用创建日期加盖时间戳。提交请求时,Web服务器向所有工作人员发布消息。当工作人员收到一条消息时,它会检查它是否具有最新创建日期的工作人员:如果是,则会处理该消息,如果没有,则会自行终止。

这看起来很麻烦,但我想要正确,因为我可能会在我的应用程序的其他地方使用这种模式。

+0

如果您已经在使用Redis,则可以将其用作轻量级队列。这会将它从数据库中一起移除,而且速度非常快。 –

+0

听起来不错,但我还没有使用Redis。如果这是唯一不错的选择,我可能只需要把它弄糟。 – acjay

+0

如果您正在查看ZeroMQ并且您正在使用Python,则应该查看ZeroRPC。它有助于抽象出很多ZeroMQ的复杂性。 –

回答

0

事实证明,我决定只是每次循环都敲击数据库。但是我也决定,如果我想要努力实现这个目标,ZeroMQ就是一条非常有效的途径。以下是它的工作原理:

每个工作人员绑定一个ZeroMQ用户套接字,并将其自己注册到工作人员的数据库中,其中包含套接字的IP地址和端口。 Web线程将DO_TASK消息发布给最近注册的工作人员,并将QUIT消息发布给可能正在工作的任何其他人。

我在部署Dotcloud,他们的支持说使用custom service环境变量和构建选项可以让我打开必要的端口并获取工作任务实例的IP。

0

如果数据库中没有任务,如何将指数退避应用到工作作业的delay_time?这可能会使您足够减少数据库负载,而无需将消息从Web应用程序传输到工作程序的额外复杂性。例如:

delay_time = NORMAL_DELAY 

while True: 
    sleep(delay_time) 
    task = get_next_task() # hits database 

    if task: 
     try: 
      execute(task) 
     except RateExceeded: 
      pass 
     else: 
      delay_time = NORMAL_DELAY 
      continue 

    delay_time = backoff(delay_time) 
+0

问题在于当任务进入时我需要快速响应 – acjay

相关问题