2017-09-13 117 views
0

我被这个问题困住了。运行并行任务SciLuigi

我正在使用SciLuigi框架进行任务管理,但我无法运行并行任务。

class Workflow(sl.WorkflowTask): 

    def workflow(self): 
     task1 = self.new_task('task 1', Task1) 
     task2 = self.new_task('task 2', Task2) 
     next_task = sl.new_task('next task', NextTask) 
     next_task.in_foo = task1.out_foo 
     next_task.in_foo = task2.out_foo 
     return next_task 

class Task1(sl.Task): 
    # No inputs.. just define outputs 

    def out_foo(self): 
     return sl.TargetInfo(self, 'foo1.txt')) 

    def run(self): 
     ... 

class Task2(sl.Task): 
    # No inputs.. just define outputs 

    def out_foo(self): 
     return sl.TargetInfo(self, 'foo2.txt')) 

    def run(self): 
     ... 

class NextTask(sl.Task): 
    # Input 
    in_foo = None 

    def out_foo(self): 
     return sl.TargetInfo(self, 'foo3.txt')) 

    def run(self): 
     ... 

sl.run(main_task_cls=Workflow, cmdline_args=['--workers=2']) 

将不胜感激。

干杯, 迭戈

回答

0

好,我找到了解决方案。

为了在平行我必须使用在NextTask两个不同的输入运行任务1和任务2,如下所示:

class Workflow(sl.WorkflowTask): 

    def workflow(self): 
     ... 
     next_task.in_foo1 = task1.out_foo 
     next_task.in_foo2 = task2.out_foo 
     ... 


    class NextTask(sl.Task): 
     # Input 
     in_foo1 = None 
     in_foo2 = None