2010-11-27 52 views
14

在线搜索了一些内容,找到了简单的'tutorials'来使用命名管道。但是,当我做任何背景工作时,我似乎失去了大量的数据。在bash中使用命名管道 - 数据丢失问题

[[编辑:找到一个更简单的解决方案,请参阅回复帖子。所以我提出的问题现在是学术 - 如果有人可能需要一个工作服务器]]

使用Ubuntu 10.04与Linux 2.6.32-25-generic#45-Ubuntu SMP Sat Oct 16 19:52:42 UTC 2010 x86_64的GNU/Linux的

GNU的bash,版本4.1.5(1)-release下(x86_64-PC-Linux的GNU)。

我的bash的功能是:

function jqs 
{ 
    pipe=/tmp/__job_control_manager__ 
    trap "rm -f $pipe; exit" EXIT SIGKILL 

    if [[ ! -p "$pipe" ]]; then 
     mkfifo "$pipe" 
    fi 

    while true 
    do 
    if read txt <"$pipe" 
    then 
     echo "$(date +'%Y'): new text is [[$txt]]" 

     if [[ "$txt" == 'quit' ]] 
     then 
    break 
     fi 
    fi 
    done 
} 

我在后台运行,这样的:

> jqs& 
[1] 5336 

现在我喂它:

for i in 1 2 3 4 5 6 7 8 
do 
    (echo aaa$i > /tmp/__job_control_manager__ && echo success$i &) 
done 

输出是不一致的。 我经常没有得到所有成功的回声。 我最多可以获得与成功回声一样多的新文本回声,有时甚至更少。

如果我从“饲料”去除“&”,似乎工作,但我阻止,直到输出被读取。因此,我想让子流程被阻止,但不是主流程。

其目的是写一个简单的作业控制脚本,这样我可以运行在说最并联10个就业机会,并排队等待稍后处理的休息,但可靠地知道,他们运行。

全部工作如下经理:

function jq_manage 
{ 
    export __gn__="$1" 

    pipe=/tmp/__job_control_manager_"$__gn__"__ 
    trap "rm -f $pipe" EXIT 
    trap "break"  SIGKILL 

    if [[ ! -p "$pipe" ]]; then 
     mkfifo "$pipe" 
    fi 

    while true 
    do 
    date 
    jobs 
    if (($(jobs | egrep "Running.*echo '%#_Group_#%_$__gn__'" | wc -l) < $__jN__)) 
    then 
     echo "Waiting for new job" 
     if read new_job <"$pipe" 
     then 
    echo "new job is [[$new_job]]" 

    if [[ "$new_job" == 'quit' ]] 
    then 
     break 
    fi 

    echo "In group $__gn__, starting job $new_job" 
    eval "(echo '%#_Group_#%_$__gn__' > /dev/null; $new_job) &" 
     fi 
    else 
     sleep 3 
    fi 
    done 
} 

function jq 
{ 
    # __gn__ = first parameter to this function, the job group name (the pool within which to allocate __jN__ jobs) 
    # __jN__ = second parameter to this function, the maximum of job numbers to run concurrently 

    export __gn__="$1" 
    shift 
    export __jN__="$1" 
    shift 

    export __jq__=$(jobs | egrep "Running.*echo '%#_GroupQueue_#%_$__gn__'" | wc -l) 
    if (($__jq__ '<' 1)) 
    then 
    eval "(echo '%#_GroupQueue_#%_$__gn__' > /dev/null; jq_manage $__gn__) &" 
    fi 

    pipe=/tmp/__job_control_manager_"$__gn__"__ 

    echo [email protected] >$pipe 
} 

调用

jq <name> <max processes> <command> 
jq abc 2 sleep 20 

将启动一个进程。 这部分工作正常。开始第二个,很好。手工一个接一个似乎很好。 但是,在循环中启动10似乎会丢失系统,如上面更简单的示例。

任何提示,以我能做些什么来解决IPC数据的这种明显的损失,将不胜感激。

问候, 阿兰。

+0

查看[second 2018编辑为*如何设置一个变量来自命令输出*](https://stackoverflow.com/a/41236640/1765658)或[GitHub.com:Connector-的bash](https://github.com/F-Hauri/Connector-bash)。我*连接* subprocess hoding *工具*到我目前的shell会话。 – 2018-02-12 15:59:24

回答

26

你的问题是下面if声明:

while true 
do 
    if read txt <"$pipe" 
    .... 
done 

正在发生的事情是,你作业队列服务器开启,每次关闭管道周围循环。这意味着某些客户在尝试写入管道时会出现“断开管道”的错误 - 也就是说,在编写者打开管道后,管道的读者会消失。

为了解决这个问题,改变你在服务器循环打开管道一旦整个循环:

while true 
do 
    if read txt 
    .... 
done < "$pipe" 

使用这种方式时,管道被打开一次,并保持开放。

您将需要小心在循环中运行的内容,因为循环内的所有处理都将stdin附加到命名管道。你会希望确保你从其他地方重定向循环内部的所有进程的stdin,否则它们可能会从管道中消耗数据。

编辑:现在的问题是,当最后一个客户端关闭管道时,读取EOF时,可以使用jilles方法来复制文件描述符,或者您也可以确保您是客户端,保持管道的开启的侧写:

while true 
do 
    if read txt 
    .... 
done < "$pipe" 3> "$pipe" 

这将持有的管道写侧FD 3.打开相同的原则同样适用与此文件描述符:标准输入。您将需要关闭它,以便任何子进程不会继承它。这可能比stdin更不重要,但它会更干净。

+0

哇,很好的答案。说得通。谢谢。马上试试。 – asoundmove 2010-11-27 17:16:42

0

一方面这个问题比我想象的要糟糕: 现在我的更复杂的例子(jq_manage)似乎有一个例子,其中相同的数据从管道一遍又一遍地读取(即使没有新的数据正在写入它)。

在另一方面,我发现了一个简单的解决方案(编辑以下丹尼斯的评论):

function jqn # compute the number of jobs running in that group 
{ 
    __jqty__=$(jobs | egrep "Running.*echo '%#_Group_#%_$__groupn__'" | wc -l) 
} 

function jq 
{ 
    __groupn__="$1"; shift # job group name (the pool within which to allocate $__jmax__ jobs) 
    __jmax__="$1"; shift # maximum of job numbers to run concurrently 

    jqn 
    while (($__jqty__ '>=' $__jmax__)) 
    do 
    sleep 1 
    jqn 
    done 

    eval "(echo '%#_Group_#%_$__groupn__' > /dev/null; [email protected]) &" 
} 

就像一个魅力。 不涉及插座或管道。 简单。

+1

没有理由导出`__jqty__`(或任何原始导出)。你为什么直接回应`/ dev/null`?为什么使用`eval`?为什么不只是做`$ @&`?没有必要引用`> =`。我同意camh的回答。 – 2010-11-27 15:32:24

+0

这一切归结为读取和过滤ps的输出。回声/ dev/null,因为我实际上不需要输出,我只想在'ps'的输出中输入正确的字符串。同eval一样,否则ps显示变量名称,而不是扩展变量,eval执行扩展。我以前从未使用((...)),所以感谢您指出我不需要引号,我只是在某个地方阅读了一个示例,并且也感谢导出,这是以前的遗留问题更复杂的脚本具有子流程并且需要导出。 – asoundmove 2010-11-27 17:14:13

+0

对不起,我的意思是'工作',而不是'ps' – asoundmove 2010-11-27 17:45:48

1

像camh &丹尼斯威廉姆森说不要打破管道。

现在我有更小的例子,直接在命令行上:

服务器:

(
    for i in {0,1,2,3,4}{0,1,2,3,4,5,6,7,8,9}; 
    do 
    if read s; 
     then echo ">>$i--$s//"; 
    else 
     echo "<<$i"; 
    fi; 
    done < tst-fifo 
)& 

客户:

(echo "Test-$i" > tst-fifo&); 

(
    for i in {%a,#b}{1,2}{0,1}; 
    do 
    echo "Test-$i" > tst-fifo; 
    done 
)& 

可以用替换键行

所有客户发送到管道的数据会被读取,但使用客户端的选项2可能需要在读取所有数据之前启动服务器几次。

虽然读取等待数据在管道中开始,一旦数据被压入,它将永远读取空字符串。

任何方法来阻止它?

再次感谢您的任何见解。

6

正如其他答案中所说的,您需要始终保持fifo打开状态以避免丢失数据。

但是,一旦所有编写者在fifo打开后(所以有一个编写者)都离开了,读取立即返回(并且poll()返回POLLHUP)。清除这个状态的唯一方法是重新开放fifo。

POSIX并没有提供解决方案,但至少Linux和FreeBSD是这样做的:如果读取失败,再次打开fifo,同时保持原始描述符处于打开状态。这是有效的,因为在Linux和FreeBSD中,“挂断”状态对于特定的打开文件描述是本地的,而在POSIX中对于fifo是全局的。

这可以在一个shell脚本来完成这样的:

while :; do 
    exec 3<tmp/testfifo 
    exec 4<&- 
    while read x; do 
     echo "input: $x" 
    done <&3 
    exec 4<&3 
    exec 3<&- 
done 
1

只是对于那些可能会感兴趣,[重新编辑]由CAMH和jilles以下意见,这里有两个新版本测试服务器脚本。

这两个版本现在的工作原理完全一样。

CAMH的版本管道管理:

function jqs # Job queue manager 
{ 
    pipe=/tmp/__job_control_manager__ 
    trap "rm -f $pipe; exit" EXIT TERM 

    if [[ ! -p "$pipe" ]]; then 
     mkfifo "$pipe" 
    fi 

    while true 
    do 
    if read -u 3 txt 
    then 
     echo "$(date +'%Y'): new text is [[$txt]]" 

     if [[ "$txt" == 'quit' ]] 
     then 
    break 
     else 
     sleep 1 
     # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand 
     fi 
    fi 
    done 3< "$pipe" 4> "$pipe" # 4 is just to keep the pipe opened so any real client does not end up causing read to return EOF 
} 

JILLE的版本管道管理:

function jqs # Job queue manager 
{ 
    pipe=/tmp/__job_control_manager__ 
    trap "rm -f $pipe; exit" EXIT TERM 

    if [[ ! -p "$pipe" ]]; then 
     mkfifo "$pipe" 
    fi 

    exec 3< "$pipe" 
    exec 4<&- 

    while true 
    do 
    if read -u 3 txt 
    then 
     echo "$(date +'%Y'): new text is [[$txt]]" 

     if [[ "$txt" == 'quit' ]] 
     then 
    break 
     else 
     sleep 1 
     # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand 
     fi 
    else 
     # Close the pipe and reconnect it so that the next read does not end up returning EOF 
     exec 4<&3 
     exec 3<&- 
     exec 3< "$pipe" 
     exec 4<&- 
    fi 
    done 
} 

感谢所有您的帮助。

0

运行说最多并联10个就业机会,并排队等待稍后处理的休息,但可靠地知道,他们跑

你可以用GNU并行做到这一点。你不需要这个脚本。

http://www.gnu.org/software/parallel/man.html#options

您可以设置最大,特效“jobslots的数量。并行运行多达N个就业机会。”有一个选项可以设置要使用的CPU内核数量。您可以将已执行作业的列表保存到日志文件中,但这是一个测试版功能。