2013-03-05 78 views
6

我需要通过ftp下载文件,将其更改并上传回去。我使用的芹菜要做到这一点,但我遇到了问题,尝试使用链接的时候,我在哪里得到:顺序连续芹菜链接任务

TypeError: upload_ftp_image() takes exactly 5 arguments (6 given)

而且,我可以使用链和放心,步骤将是连续的?如果不是什么替代方案?

res = chain(download_ftp_image.s(server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/"), upload_ftp_image.s(server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/")).apply_async() 
print res.get() 

任务:

@task() 
def download_ftp_image(ftp_server, username , password , filename, directory): 
    try: 
     ftp = FTP(ftp_server) 
     ftp.login(username, password) 
     if not os.path.exists(directory): 
      os.makedirs(directory) 
      ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write) 
     else: 
      ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write) 
     ftp.quit() 
    except error_perm, resp: 
     raise download_ftp_image.retry(countdown=15) 

    return "SUCCESS: " 

@task() 
def upload_ftp_image(ftp_server, username , password , file , directory): 
    try: 
     ftp = FTP(ftp_server) 
     ftp.login(username, password) 
     new_file= file.replace(directory, "") 
     directory = directory.replace("tmp","") 
     try: 
      ftp.storbinary("STOR " + directory + new_file , open(file, "rb")) 
     except: 
      ftp.mkd(directory) 
      ftp.storbinary("STOR " + directory + new_file, open(file, "rb")) 
     ftp.quit() 
    except error_perm, resp: 
     raise upload_ftp_image.retry(countdown=15) 

    return "SUCCESS: " 

,是这样的好的或坏的做法对我的具体情况? :

result = download_ftp_image.apply_async((server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data') 
result.get() 
result = upload_ftp_image.apply_async((server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data') 
#result.get() 

回答

13

A链总是通过以前结果作为第一个参数。从chains documentation

The linked task will be applied with the result of its parent task as the first argument, which in the above case will result in mul(4, 16) since the result is 4.

upload_ftp_image任务不接受这种额外的参数,因此失败。

你有一个很好的用例链接;第二项任务是保证被称为第一项任务完成(否则结果无法传递)。

只需添加一个参数从以前的任务的结果:

def upload_ftp_image(download_result, ftp_server, username , password , file , directory): 

你可以做一些使用该结果值;也许使下载方法返回下载文件的路径,以便上传方法知道要上传什么?

+0

我应该怎么做呢? – psychok7 2013-03-05 13:16:47

+0

@ psychok7:展开一点。 – 2013-03-05 13:20:04

+0

似乎我得到它的工作:) ..谢谢你帮忙 – psychok7 2013-03-05 13:31:45

17

如果您不希望将前一个任务的返回值用作参数,则另一种选择是使用“不变性”。

http://docs.celeryproject.org/en/latest/userguide/canvas.html#immutability

而不是定义您的子任务为:

download_ftp_image.s(...) and upload_ftp_image.s(...) 

它们定义为:

download_ftp_image.si(...) and upload_ftp_image.si(...) 

而且你可以在一个链与通常数量的参数现在使用的任务。