2009-01-21 90 views
42

我有一个200个命令的列表/队列,我需要在Linux服务器的shell中运行。从Linux上的命令队列并行处理(bash,python,ruby ... whatever)

我只希望一次运行最多10个进程(从队列中)。一些过程需要几秒钟才能完成,其他过程需要更长的时间。

当一个进程完成时,我希望下一个命令从队列中“弹出”并执行。

有没有人有解决这个问题的代码?

进一步阐述:

有200件作品需要做的,在某种队列。我最多只想做10件作品。当一个线程完成一项工作时,它应该向队列请求下一项工作。如果队列中没有更多的工作,线程就会死掉。当所有线程都已经死亡时,意味着所有的工作都已完成。

我试图解决的实际问题是使用imapsync将200个邮箱从旧邮件服务器同步到新邮件服务器。某些用户拥有大量邮箱并需要很长时间才能同步,其他用户的邮箱非常小,并且可以快速同步。

回答

38

我想你可以使用make和make -j xx命令来做到这一点。

也许这样

all : usera userb userc.... 

usera: 
     imapsync usera 
userb: 
     imapsync userb 
.... 

化妆-j 10 -f生成文件生成文件

+1

这个工作正如我所希望的那样。我写了一些代码来生成Makefile。它结束了超过1000行。谢谢! – mlambie 2009-01-21 07:01:36

+1

+1这确实很聪明! – progo 2011-05-15 17:56:58

-3

您能详细说明您的意思吗并行?这听起来像你需要在队列中实现某种锁定,所以你的输入不会被选中两次,等等,这些命令只运行一次。

大多数队列系统作弊 - 他们只是写一个巨大的待办事项列表,然后选择例如十个项目,工作,并选择下十个项目。没有并行化。

如果您提供更多详细信息,我相信我们可以帮助您。

7

GNU make(也可能是其他的实现)具有-j参数,它控制一次运行多少个作业。当一份工作完成时,make会开始另一个。

4

好吧,如果他们在很大程度上是相互独立的,我倒是觉得在以下方面:

Initialize an array of jobs pending (queue, ...) - 200 entries 
Initialize an array of jobs running - empty 

while (jobs still pending and queue of jobs running still has space) 
    take a job off the pending queue 
    launch it in background 
    if (queue of jobs running is full) 
     wait for a job to finish 
     remove from jobs running queue 
while (queue of jobs is not empty) 
    wait for job to finish 
    remove from jobs running queue 

注意的是,在主回路尾测试意味着,如果“工作运行队列”有空间while循环迭代时 - 防止循环的提前终止。我认为这个逻辑是正确的。

我可以很容易地看到如何在C中做到这一点 - 它在Perl中也不会那么困难(因此在其他脚本语言(Python,Ruby,Tcl等)中并不太难)。我完全不确定我是否想要在shell中执行它 - shell中的wait命令会等待所有孩子终止,而不是让某个孩子终止。

+0

您也可以对特定的子进程使用wait命令。可以给出任意数量的参数,其中每个参数都可以是一个PID或作业ID。 – 2013-10-01 14:56:55

+1

@BrianMinton:你说得对,你可以用`wait`列出特定的PID,但你仍然可以得到'所有人都死了'的行为,而不是'第一个死亡'这是这个代码真正需要的。 – 2013-10-01 15:10:30

42

在shell上,可以使用xargs对并行命令进行排队处理。例如,对于具有3个总是睡并联,睡眠对每个1秒,和在总执行10点睡觉做

echo {1..10} | xargs -d ' ' -n1 -P3 sh -c 'sleep 1s' _ 

而且,它还将睡总共4秒。如果您有名称的列表,并想通过名称来执行,再执行并行3个指令命令,请

cat names | xargs -n1 -P3 process_name 

将执行命令process_name aliceprocess_name bob等。

+0

哇,我一直使用`xargs`,从来没有料到它会有这个选项! – Warrick 2013-07-19 12:00:33

+0

对于第二个示例,您如何修改命令以使`process_name`可以使用多个参数?我想要做这样的事情:`cat commands.txt | xargs -n1 -P3 eval`其中`commands.txt`中有一堆命令(每行一个,每个都有多个参数)。问题是`eval`不起作用,因为它是一个shell内置命令 – Eddy 2015-07-23 10:12:33

3

在Python中,你可以尝试:

import Queue, os, threading 

# synchronised queue 
queue = Queue.Queue(0) # 0 means no maximum size 

# do stuff to initialise queue with strings 
# representing os commands 
queue.put('sleep 10') 
queue.put('echo Sleeping..') 
# etc 
# or use python to generate commands, e.g. 
# for username in ['joe', 'bob', 'fred']: 
# queue.put('imapsync %s' % username) 

def go(): 
    while True: 
    try: 
     # False here means no blocking: raise exception if queue empty 
     command = queue.get(False) 
     # Run command. python also has subprocess module which is more 
     # featureful but I am not very familiar with it. 
     # os.system is easy :-) 
     os.system(command) 
    except Queue.Empty: 
     return 

for i in range(10): # change this to run more/fewer threads 
    threading.Thread(target=go).start() 

未经测试...

(当然,python本身是单线程的,你仍然应该从wa中获得多线程的好处尽管如此,IO也是如此。)

1

Python的multiprocessing module似乎很适合您的问题。这是一个支持按流程进行线程化的高级包。

13

对于这种工作PPSS是这样写的:并行处理shell脚本。谷歌这个名字,你会发现它,我不会linkpam。

24

Parallel是为此目的而精心制作的。

cat userlist | parallel imapsync 

一个相对于其他的解决方案Parallel美女的是,它可以确保输出不混合。在Paralleltraceroute例如正常工作:

(echo foss.org.my; echo www.debian.org; echo www.freenetproject.org) | parallel traceroute 
0

zsh中的简单函数用于在不超过4个子壳中并行化作业,在/ tmp中使用锁定文件。

唯一的非平凡部分是在第一测试中的glob国旗:

  • #q:使文件名在测试
  • [4]通配符:返回第四结果仅
  • N:忽略错误空结果

应该很容易将其转换为posix,尽管它会稍微冗长些。

不要忘记在\"的工作中逃避任何报价。

#!/bin/zsh 

setopt extendedglob 

para() { 
    lock=/tmp/para_$$_$((paracnt++)) 
    # sleep as long as the 4th lock file exists 
    until [[ -z /tmp/para_$$_*(#q[4]N) ]] { sleep 0.1 } 
    # Launch the job in a subshell 
    (touch $lock ; eval $* ; rm $lock) & 
    # Wait for subshell start and lock creation 
    until [[ -f $lock ]] { sleep 0.001 } 
} 

para "print A0; sleep 1; print Z0" 
para "print A1; sleep 2; print Z1" 
para "print A2; sleep 3; print Z2" 
para "print A3; sleep 4; print Z3" 
para "print A4; sleep 3; print Z4" 
para "print A5; sleep 2; print Z5" 

# wait for all subshells to terminate 
wait