2016-11-04 44 views
0

但是当我刚离开我的电脑打开并运行一夜之间每隔50秒运行一次任务的芹菜时,我看到一些跳过了1小时。它实际上执行很好,除了意外的跳过。这是为什么发生?如何解决这个问题?芹菜跳过一小时的任务

这里的跳过登录我的工人-l信息

2016-11-03 10:13:36,264: INFO/MainProcess] Task core.tasks.sample[8efcedc5-1e08-41c4-80b9-1f82a9ddbaad] succeeded in 1.062010367s: None 
[2016-11-03 11:14:19,751: INFO/MainProcess] Received task: core.tasks.sample[ca9d6ef4-2cdc-4546-a9fb-c413541a80ee] 

下面的例子年代跳过日志中我拍-l信息的一个例子

[2016-11-03 10:13:35,199: INFO/MainProcess] Scheduler: Sending due task core.tasks.sample (core.tasks.sample) 
[2016-11-03 11:14:19,748: INFO/MainProcess] Scheduler: Sending due task core.tasks.sample (core.tasks.sample) 

这是我的任务的代码:

# 50 seconds 
@periodic_task(run_every=timedelta(**settings.XXX_XML_PERIODIC_TASK)) 
def sample(): 
    global GLOBAL_CURRENT_DATE 
    if cache.get('XXX_xml_today_saved_data') is None: 
     cache.set('XXX_xml_today_saved_data', []) 
    saved_data = cache.get('XXX_xml_today_saved_data') 
    ftp = FTP('xxxxx') 
    ftp.login(user='xxxxx', passwd='xxxxx') 
    ftp.cwd('XXX') 
    date_dir = GLOBAL_CURRENT_DATE.replace("-", "") 
    try: 
     ftp.cwd(date_dir) 
    except: 
     ftp.cwd(str(int(date_dir) - 1)) 
    _str = StringIO() 
    files = ftp.nlst() 
    if (GLOBAL_CURRENT_DATE != datetime.now().strftime("%Y-%m-%d") and 
      files == saved_data): 
     GLOBAL_CURRENT_DATE = datetime.now().strftime("%Y-%m-%d") 
     cache.delete('XXX_xml_today_saved_data') 
     return 
    print files 
    print "-----" 
    print saved_data 
    unsaved = list(set(files) - set(saved_data)) 
    print "-----" 
    print unsaved 
    if unsaved: 
     file = min(unsaved) 
     # modified_time = ftp.sendcmd('MDTM '+ file) 
     print file 
     ftp.retrbinary('RETR ' + file, _str.write) 
     xml = '<root>' 
     xml += _str.getvalue() 
     xml += '</root>' 
     if cache.get('XXX_provider_id') is None: 
      cache.set('XXX_provider_id', Provider.objects.get(code="XXX").id) 
     _id = cache.get('XXX_provider_id') 
     _dict = xmltodict.parse(xml, process_namespaces=True, 
           dict_constructor=dict, attr_prefix="") 
     row = _dict['root']['row'] 
     if type(_dict['root']['row']) == dict: 
      _dict['root']['row'] = [] 
      _dict['root']['row'].append(row) 
      row = _dict['root']['row'] 
     for x in row: 
      if cache.get('XXX_data_type_' + x['dataType']) is None: 
       obj, created = DataType.objects.get_or_create(code=x['dataType']) 
       obj, created = ProviderDataType.objects.get_or_create(provider_id=_id, data_type=obj) 
       if created: 
        cache.set('XXX_data_type_' + x['dataType'], obj.id) 
      _id = cache.get('XXX_data_type_' + x['dataType']) 
      obj, created = Transaction.objects.get_or_create(data=x, file_name=file, 
             provider_data_type_id=_id) 
      if created: 
       if x['dataType'] == "BR": 
        print "Transact" 
        br_transfer(**x) 
      else: 
       print "Not transacting" 

     saved_data.append(file) 
     cache.set('XXX_xml_today_saved_data', saved_data) 
    ftp.close() 

这里是我的芹菜CONFIGS在settings.py:

BROKER_URL = 'redis://localhost:6379' 
CELERY_RESULT_BACKEND = 'redis://localhost:6379' 
CELERY_ACCEPT_CONTENT = ['application/json'] 
CELERY_TASK_SERIALIZER = 'json' 
CELERY_RESULT_SERIALIZER = 'json' 
CELERY_TIMEZONE = 'Africa/Nairobi' 
XXX_XML_PERIODIC_TASK = {'seconds': 50} 

CACHES = { 
    'default': { 
     'BACKEND': 'redis_cache.RedisCache', 
     'LOCATION': 'localhost:6379', 
     'TIMEOUT': None, 
    }, 
} 

任何解释或建议?

我使用python 2.7.10和Django的1.10

+0

您是否尝试过增加更多的工人呢?如果您的任务触发时没有任何可用,则必须等到有一个可用时为止。 – rrauenza

+0

如何添加工人?我是新来的这个任务运行后台 –

+0

http://docs.celeryproject.org/en/latest/userguide/workers.html - 第一次尝试 - 并发性 – rrauenza

回答

1

可能有几个问题。当你的任务被触发时,最可能的是你的工作人员很忙。你可以通过让更多的工人来阻止这种情况docs解释单个工作人员的--concurrency选项,以及运行多个工作进程的选项。

您也可以将不同的工作人员连接到不同的项目,以便将某些任务分配给某些项目。即对于某些任务的专用队列:Starting worker with dynamic routing_key?

我还看到,工作人员可以预取任务并保留它们 - 但是如果它当前正在运行的任务运行过倒计时,那么您的任务可能会延迟。

你想在CELERYD_PREFETCH_MULTIPLIER读了起来:

+0

非常感谢!我必须去并行,通过我们的互联网断断续续的方式,有时会失败的任务,这也可能是一个原因,我是对的吗? –

+0

添加更多日志并找出? – rrauenza

+0

我实际上使用芹菜日志 –

1

芹菜工人弹出时,他们已经准备好从队列中的任务,但如果该任务有一个倒计时,将在平均时间弹出其他任务和等待时间通过做其他事情到期。它并不能保证任务将在当时运行,至少在那个时候或之后运行。

+0

那么我的解决方案是什么?是否有办法始终遵循设定的期间任务时间? –

+0

使用cron获得更好的保证 – theWanderer4865

+0

对不起,我对这些东西并不是很熟悉?你能提出一个替代我的代码吗? –