2016-11-09 14 views
0

我试图实现的内容 编写一个调度程序,它使用数据库在不同的时间安排类似的任务。使用芹菜节拍在多个时间点(使用不同参数)调度任务

,因为我是用芹菜节拍相同,下面的代码片段会给一个想法

try: 
    reader = MongoReader() 
except: 
    raise 
try: 
    tasks = reader.get_scheduled_tasks() 
except: 
    raise 
celerybeat_schedule = dict() 
for task in tasks: 
    celerybeat_schedule[task["task_id"]] =dict() 
    celerybeat_schedule[task["task_id"]]["task"] = task["task_name"] 
    celerybeat_schedule[task["task_id"]]["args"] = (task,) 
    celerybeat_schedule[task["task_id"]]["schedule"] = get_task_schedule(task) 

app.conf.update(BROKER_URL=rabbit_mq_endpoint, CELERY_TASK_SERIALIZER='json', CELERY_ACCEPT_CONTENT=['json'], CELERYBEAT_SCHEDULE=celerybeat_schedule) 

所以这些都是三个步骤 - 创建一个字典,芹菜调度是 - 从数据存储 阅读所有任务通过具有特性,TASK_NAME(将运行方法),参数的所有任务填充(数据传递给该方法),(当运行存储)时间表 - 芹菜配置

预期场景 给出的所有ENTR更新此IES运行,只是打印,具有相同的时间表来运行每5分钟,具有不同的参数指定要打印的内容相同芹菜任务名称,可以说DB有

task name  , parameter , schedule 
regular_print , Hi  , {"minutes" : 5} 
regular_print , Hello  , {"minutes" : 5} 
regular_print , Bye  , {"minutes" : 5} 

我希望,这些将每5分钟打印打印所有三个

发生 只有喜的一个什么,你好,再见打印(可能随机,肯定不是按顺序)

请帮帮忙, 感谢很多提前:)

回答

0

w ^如能使用芹菜4版本解决这个问题。样品类似于我工作..也可以进行版本找到文档由芹菜4

#taking address and user-pass from environment(you can mention direct values) 
    ex_host_queue = os.environ["EX_HOST_QUEUE"] 
    ex_port_queue = os.environ["EX_PORT_QUEUE"] 
    ex_user_queue = os.environ["EX_USERID_QUEUE"] 
    ex_pass_queue = os.environ["EX_PASSWORD_QUEUE"] 
    broker= "amqp://"+ex_user_queue+":"+ex_pass_queue+"@"+ex_host_queue+":"+ex_port_queue+"//" 

    #celery initialization 
    app = Celery(__name__,backend=broker, broker=broker) 
    app.conf.task_default_queue = 'scheduler_queue' 
    app.conf.update(
     task_serializer='json', 
     accept_content=['json'], # Ignore other content 
     result_serializer='json' 
    ) 
task = {"task_id":1,"a":10,"b":20} 
##method to update scheduler 
def add_scheduled_task(task): 
    print("scheduling task") 
    del task["_id"] 
    print("adding task_id") 
    name = task["task_name"] 
    app.add_periodic_task(timedelta(minutes=1),add.s(task), name = task["task_id"])  

@app.task(name='scheduler_task') 
def scheduler_task(data): 
    print(str(data["a"]+data["b"]))