我正在尝试对许多进程进行一些并行化(任务发送/在许多(比如说数百个)节点上执行)。我遇到了这个解决方案: https://unix.stackexchange.com/a/216475
# initialize a semaphore with a given number of tokens
open_sem(){
mkfifo pipe-$$
exec 3<>pipe-$$
rm pipe-$$
local i=$1
for((;i>0;i--)); do
printf %s 000 >&3
done
}
# run the given command asynchronously and pop/push tokens
run_with_lock(){
local x
# this read waits until there is something to read
read -u 3 -n 3 x && ((0==x)) || exit $x
(
( "$@"; )
# push the return code of the command to the semaphore
printf '%.3d' $? >&3
)&
}
N=4
open_sem $N
for thing in {a..g}; do
run_with_lock task $thing
done
我在这里需要一些解释:
open_sem()
- 做什么
exec 3<>pipe-$$
? - 为什么后来被删除了?
run_with_lock()
- 这
&& ((0==x)) || exit $x
部分是什么意思? ( "$@"; )
- 据我所知,这是所有传递的参数的列表......但它在这里做什么?
这些是我理解这个过程的主要障碍,但请随意解释整个流程:)
PS:我只是想在那篇文章下发表评论,但我刚刚注册,没有这样做的声誉。它也可能对其他人有用。谢谢! J。
答案1
先说总体思路
当您从 fifo 读取时,只有当有数据写入 fifo 时,读取命令才会完成。 fifo 上的任何读取命令都会导致脚本“挂起”,直到其他进程写入同一 fifo。
您可以将其用作信号量
- 仅在读命令完成后执行一些随机命令 X,并且
- 当命令完成时,您写入一次以允许另一个进程使用信号量完成其读取命令。
现在,为了允许 N 个初始进程启动,您可以使用 N 个写入 fifo 的命令来设置信号量。
因此,通过使用 N 个写入 fifo 的命令(在本例中绑定到文件描述符 3)来初始化“信号量”,您可以得到长度为 N 的队列。
重复:获取任何命令 X 并将其包装在一个函数中,f()
在命令 X 之前的行上进行读取,在其之后进行写入,并将包装的命令作为后台作业发送 ( f X &
)。因此,如果已经设置了 4 个写入命令,则第 4 个后台作业将执行,第 5 个将根据其read
命令停止。因此,如果您在命令 X 之后的包装器中添加一个写入命令,则read
只要前 4 个命令中的任何一个完成并执行它的包装,第五个命令 X 中的包装将能够完成(并在下一行执行 X)写命令。
open_sem()
exec
可用于重定向进程的输入和输出或替换当前进程,在这种情况下,它将文件描述符 3 重定向到(写入)和从(读取)fifo。<
用于输入和>
输出。如果你给出exec
一个命令参数(这里没有完成),那么 bash shell 就会被破坏,它的 PID 会被命令替换。如果跳过此步骤,则读取和写入文件描述符 3 将不会创建 fifo 所具有的读取和写入锁定。- 删除它不是必要的,而只是一种清理方法。
run_with_lock()
- 该
read
命令将写入 FIFO 的内容读取到变量中x
。在示例中,写入 fifo 的命令会写入命令 X 的退出状态号,因此可用于检查前一个 X 命令是否成功退出(代码 0)。如果x
变量不为 0,则退出,退出状态保存在变量 x 中。 - 它假定第一个参数
run_with_lock()
是命令,其余参数是该命令的参数。因此,("$@")&
在发送到后台的子 shell 中执行带有参数的命令。
答案2
我认为有更简单的代码可以做到这一点。我部分基于提问者给出的示例,部分基于此处给出的代码https://www.mlo.io/blog/2012/06/13/parallel-processes-in-bash/,以及部分@methuselah-0 的解释。
# make a fifo pipe
mkfifo pipe
exec 3<>pipe
rm -f pipe
# fill with some number of values (will correspond to the number of spawned processes)
for i in `seq 7`; do
{ sleep $i; echo >&3; } &
done
# if a read from the queue succeeds, start a subprocess, otherwise wait
# for another to finish and restock the queue with a readable entry
for f in `ls /Users/pavelkomarov/data_cache`; do
read <&3
{ python3 /Users/pavelkomarov/my_heavy_script.py -t $f; echo >&3; } &
done