2016-03-04 47 views
0

我正在构建一个依赖于芹菜来处理一些长时间运行的任务的Flask应用程序。一旦完成处理,每个任务将基本上将一个字典追加到共享列表中 - 该列表由芹菜工作者和Flask应用程序的路由共享。 Flask组件基本上由一组路径组成,用于检索共享列表的内容并修改元素的顺序。芹菜工和共享列表使用python的多处理

我瘦我已经成功地使用Python的多处理模块中的管理器共享芹菜工人之间的列表。但是,Flask应用程序无法看到对此列表所做的更改。下面是一个最小的应用程序,它说明了问题:

import os 
import json 

from flask import Flask 
from multiprocessing import Manager 
from celery import Celery 

application = Flask(__name__) 

redis_url = os.environ.get('REDIS_URL') 
if redis_url is None: 
    redis_url = 'redis://localhost:6379/0' 

# Set the secret key to enable cookies 
application.secret_key = 'some secret key' 
application.config['SESSION_TYPE'] = 'filesystem' 

# Redis and Celery configuration 
application.config['BROKER_URL'] = redis_url 
application.config['CELERY_RESULT_BACKEND'] = redis_url 

celery = Celery(application.name, broker=redis_url) 
celery.conf.update(BROKER_URL=redis_url, 
       CELERY_RESULT_BACKEND=redis_url) 

manager = Manager() 
shared_queue = manager.list() # THIS IS THE SHARED LIST 

@application.route("/submit", methods=['GET']) 
def submit_song(): 
    add_song_to_queue.delay() 
    return 'Added a song to the queue' 

@application.route("/playlist", methods=['GET', 'POST']) 
def get_playlist(): 
    playlist = [] 
    i = 0 
    queue_size = len(shared_queue) 
    while i < queue_size: 
     print(shared_queue[i]) 
     playlist.append(shared_queue[i]) 
    return json.dumps(playlist) 

@celery.task 
def add_song_to_queue(): 
    shared_queue.append({'some':'data!'}) 
    print(len(shared_queue)) 

if __name__ == "__main__": 
    application.run(host='0.0.0.0', debug=True) 

在芹菜日志,我可以清楚地看到字典被添加到列表中,而列表中的大小增加。但是,当我在浏览器上访问/播放列表路由时,我总是会得到一个空列表。

有谁知道我可以如何让列表在所有工作人员和Flask应用程序之间共享?

+2

我相信你有两个Python解释运行,所以你不能。您可以将播放列表存储在数据库中。这将是最好的选择任何方式。 –

+0

@JoeDoherty我从来没有考虑过我使用两个单独的解释器。我真的想避免使用数据库。您是否认为将Celery for Process(target = add_song_to_queue)调用换出可能有效?这就意味着我只会使用一位口译员。 – macalaca

+1

我想它会工作,但我不能推荐它。 Python不太适合多线程应用程序。我们通常采用更多流程进行扩展你对应用有多少流量?您可以使用像Redis这样的简单KV商店,而不是完整的RDBMS。 –

回答

1

我发现了一个解决方案,通过从Celery移开,并使用multiprocessing.Pool作为任务队列和通过Manager共享内存,如示例代码中所示。此链接有如何的解决方案可以与瓶集成一个很好的例子:http://gouthamanbalaraman.com/blog/python-multiprocessing-as-a-task-queue.html

from multiprocessing import Pool 
from flask import Flask 

app = Flask(__name__) 
_pool = None 

def expensive_function(x): 
     # import packages that is used in this function 
     # do your expensive time consuming process 
     return x*x 

@app.route('/expensive_calc/<int:x>') 
def route_expcalc(x): 
     f = _pool.apply_async(expensive_function,[x]) 
     r = f.get(timeout=2) 
     return 'Result is %d'%r 

if __name__=='__main__': 
     _pool = Pool(processes=4) 
     try: 
       # insert production server deployment code 
       app.run() 
     except KeyboardInterrupt: 
       _pool.close() 
       _pool.join()