设置:我有一个 bash 系统,其中有一个从后台进程管道中生成的文件。在管道的第一阶段,有多个进程写入单个命名管道。在管道的下一阶段,有一个读取器进程,它读取命名管道并将数据传递给其他工作进程以对数据进行操作。 (参见下面的示例脚本)。如果通过 https 与远程资源通信确定数据尚未准备好,这些工作人员可以将数据返回到 stage2 管道。
问题:该设置在不平凡的负载下工作得很好,直到我在管道第二阶段的工作进程内添加了 ssh 调用。一旦我这样做了,通过命名管道的大量数据就开始消失。即使我将负载显着减少到只有之前工作负载的 2%,也会发生这种情况。
我做了一些阅读,发现了一些关于 ssh 客户端销毁 fds 的模糊引用,所以我不确定这是否相关。
环境:我目前在 Ubuntu 20.04 上使用 bash 5.0.17。我还在 Ubuntu 22.04 系统上进行了测试,并看到了相同的行为。
简化的脚本
#!/bin/bash
function stage1_worker()
{
# Do work including network calls via a 3rd party program which uses python
flock --exclusive $QUEUE -c "print \"%s\n\" $DATA > $QUEUE"
}
function stage2_reader_v1()
{
local QUEUE_DATA=""
while true ;
do
# logic
if read QUEUE_DATA ; then
stage2_worker $QUEUE_DATA &
fi
# logic
done < $QUEUE
}
function stage2_worker_v1()
{
local QUEUE_DATA=$1
local retry_needed=false
# logic
# Add back to the stage 2 queue if retry needed (reader rate limits to stage 2 worker to keep process count down)
if $retry_needed ; then
flock --exclusive $QUEUE -c "print \"%s\n\" $DATA > $QUEUE"
fi
}
QUEUE=$(mktemp -u)
mkfifo $QUEUE
trap "rm -f $QUEUE" EXIT
# Launch stage 2 reader in background process
stage2_reader &
# Launch several stage 1 workers...
stage1_worker $ARG &
# wait for all background processes to complete
wait
当我进行此更改时,突然我的队列开始丢失数据并且项目没有得到处理
function stage2_worker_v2()
{
local QUEUE_DATA=$1
local retry_needed=false
# logic
local IP_FROM_QUEUE_DATA=... #logic
ssh -o ConnectTimeout=1 -o BatchMode=yes user@$IP_FROM_QUEUE_DATA '<remote command' >/dev/null 2>&1
if [[ $? -ne 0 ]] ; then
#handle failure
fi
# Add back to the stage 2 queue if retry needed (reader rate limits to stage 2 worker to keep process count down)
if $retry_needed ; then
flock --exclusive $QUEUE -c "print \"%s\n\" $DATA > $QUEUE"
fi
}
在我寻求限制永久打开的 fds 时,我将阅读器更改为这样,这提高了吞吐量,但我仍然缺少数据
function stage2_reader_v1()
{
local QUEUE_DATA=""
while true ;
do
# logic
if read -t 1 QUEUE_DATA <> $QUEUE ; then
stage2_worker $QUEUE_DATA &
fi
# logic
done
}
从 stage2_worker_v2 中删除 ssh 调用会导致所有数据按预期流经管道。
如果您了解为什么会发生这种情况以及如何减轻这种情况,我将不胜感激。
我快速编写了这个简化的脚本,可能存在我的真实代码中不存在的小语法/命名错误。
答案1
它不应该干扰命名管道,但它确实会干扰标准输入,即尝试从中读取数据。当您调用 时stage2_worker
,标准输入将从管道重定向到进程ssh
。
while true; do
if read QUEUE_DATA ; then
stage2_worker $QUEUE_DATA &
fi
done < $QUEUE
function stage2_worker_v2()
{
...
ssh -o ConnectTimeout=1 -o BatchMode=yes user@$IP
ssh
要么从其他地方重定向标准输入ssh ... < /dev/null
(或者只是使用ssh -n
本质上相同的功能),要么在 while 循环中进行重定向,这样它就不会干扰循环内内容的标准输入。有一些变化:
告诉read
从 fd 3 读取,并将输入重定向到那里(Bash):
while true; do
if read QUEUE_DATA -u 3; then
stage2_worker $QUEUE_DATA &
fi
done 3< $QUEUE
相同,但有另一个重定向:
while true; do
if read QUEUE_DATA <&3; then
stage2_worker $QUEUE_DATA &
fi
done 3< $QUEUE
或者仅从管道进行重定向read
:
while true; do
if read QUEUE_DATA < $QUEUE; then
stage2_worker $QUEUE_DATA &
fi
done
在任何情况下,您可能需要考虑引用这些变量扩展,以防万一您的任何值包含空格或通配符。另外,声明函数的标准方法是funcname()
while ,function funcname()
它是 ksh 风格和标准方法的混合,并且仅在 Bash (AFAIR) 中受支持。