2017-01-30 148 views
4

我有一个API,它返回其他API的列表。执行其他芹菜任务不工作的芹菜周期性任务

我需要访问这些API每隔15分钟,并把返回到数据库中的数据。

下面是我用芹菜和Redis的在celery_worker.py文件中写道。但是所有的任务都没有开始。

list_of_APIs = requests.get(the_api_that_returns_list_of_APIs).json() 

CELERYBEAT_SCHEDULE = { 
    'every-15-minute': { 
     'task': 'fetch_data_of_all_APIs', 
     'schedule': timedelta(minutes=15), 
    }, 
} 

@celery.task 
def access_one_API(one_API): 
    return requests.get(one_API).json() 

@celery.task(name='fetch_data_of_all_APIs') 
def fetch_data_of_all_APIs(): 
    for one_API in list_of_APIs: 
      task = access_one_API.delay(one_API) 
      # some codes to put all task.id into a list_of_task_id 

    for task_id in list_of_task_id: 
      # some codes to get the results of all tasks 
      # some codes to put all the results into a database 

fetch_data_of_all_APIs功能应每15分钟,这是应该使用多个工人跑access_one_API功能

芹菜服务器启动成功的终端但既不fetch_data_of_all_APIs也不access_one_API开始运行。

如果我提取fetch_data_of_all_APIs函数中的代码,access_one_API可以启动并由多个芹菜工作人员执行。但只要我将这些代码放在一个函数中并用@celery.task来修饰它,那么这两个函数都不会启动。

所以我相信它一定与芹菜有关。

非常感谢提前。

+0

请注意,您需要'@ celery.task()'装饰器。另外,您需要检查'celery-beat'配置参数,因为当前的芹菜版本使用小写设置。 –

回答

0

这里例如如何配置周期性任务与子任务芹菜(我设置20秒示范)。 tasks.py:

import celery 
from celery.canvas import subtask 
from celery.result import AsyncResult 
# just for example list of integer values 
list_of_APIs = [1, 2, 3, 4] 


@celery.task(name='access_one_API') 
def access_one_API(api): 
    """ 
    Sum of subtask for demonstration 
    :param int api: 
    :return: int 
    """ 
    return api + api 


@celery.task(name='fetch_data_of_all_APIs') 
def fetch_data_of_all_APIs(list_of_APIs): 
    list_task_ids = [] 

    for api in list_of_APIs: 
     # run of celery subtask and collect id's of subtasks 
     task_id = subtask('access_one_API', args=(api,)).apply_async().id 
     list_task_ids.append(task_id) 

    result_sub_tasks = {} 

    for task_id in list_task_ids: 
     while True: 
      task_result = AsyncResult(task_id) 
      if task_result.status == 'SUCCESS': 
       # if subtask is finish add result and check result of next subtask 
       result_sub_tasks[task_id] = task_result.result 

       break 

    print result_sub_tasks 
    # do something with results of subtasks here... 


app = celery.Celery(
    'tasks', 
    broker='redis://localhost:6379/0', 
    backend='redis://localhost:6379/0', 
) 


app.conf.beat_schedule = { 
    'add-every-20-seconds': { 
     'task': 'fetch_data_of_all_APIs', 
     'schedule': 20.0, 
     # args for fetch_data_of_all_APIs 
     'args': (list_of_APIs,) 
    }, 
} 

运行芹菜:从终端celery worker -A tasks.app --loglevel=info --beat

跟踪:

[2017-03-14 10:31:36,361: WARNING/PoolWorker-3] {'929996b3-fc86-4274-b3c3-06c38a6d4edd': 6, 'f44456b4-df93-4a78-9f1d-b2c2d2b05322': 4, '4e44fd57-fbbc-43cd-8616-1eafef559417': 8, '6d943f35-0d74-4319-aa02-30a266aa3cd9': 2} 

希望这有助于。