2015-09-02 63 views
4
from celery import Celery 

app = Celery('tasks', backend='amqp://[email protected]//', broker='amqp://[email protected]//') 

a_num = 0 

@app.task 
def addone(): 
    global a_num 
    a_num = a_num + 1 
    return a_num 

这是我用来测试芹菜的代码。 我希望每次使用addone()时都会增加返回值。 但它总是1 为什么?芹菜不适用于全局变量

结果

python 
>> from tasks import addone 
>> r = addone.delay() 
>> r.get() 
    1 
>> r = addone.delay() 
>> r.get() 
    1 
>> r = addone.delay() 
>> r.get() 
    1 

回答

12

默认情况下,当一个工作器启动时,Celery以并发4启动它,这意味着它有4个进程开始处理任务请求。 (加上控制其他进程的进程)。我不知道使用什么算法将任务请求分配给为工作人员启动的进程,但最终,如果执行addone.delay().get()就够了,您会看到数字大于1 。会发生什么情况是每个进程(并非每个任务)都获得其自己的副本a_num。当我在这里尝试时,我的第五次执行addone.delay().get()返回2

您可以通过启动您的工作人员使用单个进程处理请求来强制每次增加数量。 (例如,celery -A tasks worker -c1)但是,如果您曾经重启过您的工作人员,则编号将重置为0.此外,我不会设计仅在处理请求的进程数量为1时才起作用的代码。同事决定多个进程应该处理这些任务的请求,然后事情就会中断。 (代码中注释中的大胖警告只能做很多事情。)

在一天结束时,这种状态应该在缓存中共享,如Redis或用作缓存的数据库,为你的问题中的代码。

然而,在评论你写道:

让我们看看我想用一个任务来送东西。我不想每次都连接任务,我想分享一个全球连接。

将连接存储在缓存中将不起作用。我强烈建议让Celery开始的每个流程都使用自己的连接,而不是尝试在流程之间共享。 每个新的任务请求都不需要重新打开连接。每个进程打开一次,然后该进程提供的每个任务请求重用连接。

在很多情况下,尝试在进程之间共享相同的连接(例如通过共享虚拟内存(例如,通过fork))将无法继续工作。连接通常带有状态(例如,数据库连接是否处于自动提交模式)。如果代码的两部分期望连接处于不同的状态,则代码将不一致地运行。

-2

的任务将每次运行异步所以它会启动新任务a_num将被设置为0。它们运行作为单独的实例。

如果你想使用值,我建议一个价值商店或某种数据库。

+0

我该如何分享价值?让我们看看我想用任务发送一些东西。我不想每次都连接任务,我想分享一个全球连接。所以任务可以使用相同的连接。 – xren

+0

我的猜测是你不能那样做,但可能会有一些我不知道的魔法。 – olofom

+0

它与异步有什么关系? – spacediver