基于 FIFO 的信号量解释

基于 FIFO 的信号量解释

我正在尝试对许多进程进行一些并行化(任务发送/在许多(比如说数百个)节点上执行)。我遇到了这个解决方案: https://unix.stackexchange.com/a/216475

    # initialize a semaphore with a given number of tokens
    open_sem(){
        mkfifo pipe-$$
        exec 3<>pipe-$$
        rm pipe-$$
        local i=$1
        for((;i>0;i--)); do
            printf %s 000 >&3
        done
    }
    
    # run the given command asynchronously and pop/push tokens
    run_with_lock(){
        local x
        # this read waits until there is something to read
        read -u 3 -n 3 x && ((0==x)) || exit $x
        (
         ( "$@"; )
        # push the return code of the command to the semaphore
        printf '%.3d' $? >&3
        )&
    }
    
    N=4
    open_sem $N
    for thing in {a..g}; do
        run_with_lock task $thing
    done 

我在这里需要一些解释:

open_sem()

  1. 做什么exec 3<>pipe-$$
  2. 为什么后来被删除了?

run_with_lock()

  1. && ((0==x)) || exit $x部分是什么意思?
  2. ( "$@"; )- 据我所知,这是所有传递的参数的列表......但它在这里做什么?

这些是我理解这个过程的主要障碍,但请随意解释整个流程:)

PS:我只是想在那篇文章下发表评论,但我刚刚注册,没有这样做的声誉。它也可能对其他人有用。谢谢! J。

答案1

先说总体思路

当您从 fifo 读取时,只有当有数据写入 fifo 时,读取命令才会完成。 fifo 上的任何读取命令都会导致脚本“挂起”,直到其他进程写入同一 fifo。

您可以将其用作信号量

  1. 仅在读命令完成后执行一些随机命令 X,并且
  2. 当命令完成时,您写入一次以允许另一个进程使用信号量完成其读取命令。

现在,为了允许 N 个初始进程启动,您可以使用 N 个写入 fifo 的命令来设置信号量。

因此,通过使用 N 个写入 fifo 的命令(在本例中绑定到文件描述符 3)来初始化“信号量”,您可以得到长度为 N 的队列。

重复:获取任何命令 X 并将其包装在一个函数中,f()在命令 X 之前的行上进行读取,在其之后进行写入,并将包装的命令作为后台作业发送 ( f X &)。因此,如果已经设置了 4 个写入命令,则第 4 个后台作业将执行,第 5 个将根据其read命令停止。因此,如果您在命令 X 之后的包装器中添加一个写入命令,则read只要前 4 个命令中的任何一个完成并执行它的包装,第五个命令 X 中的包装将能够完成(并在下一行执行 X)写命令。

open_sem()

  1. exec可用于重定向进程的输入和输出或替换当前进程,在这种情况下,它将文件描述符 3 重定向到(写入)和从(读取)fifo。<用于输入和>输出。如果你给出exec一个命令参数(这里没有完成),那么 bash shell 就会被破坏,它的 PID 会被命令替换。如果跳过此步骤,则读取和写入文件描述符 3 将不会创建 fifo 所具有的读取和写入锁定。
  2. 删除它不是必要的,而只是一种清理方法。

run_with_lock()

  1. read命令将写入 FIFO 的内容读取到变量中x。在示例中,写入 fifo 的命令会写入命令 X 的退出状态号,因此可用于检查前一个 X 命令是否成功退出(代码 0)。如果x变量不为 0,则退出,退出状态保存在变量 x 中。
  2. 它假定第一个参数run_with_lock()是命令,其余参数是该命令的参数。因此,("$@")&在发送到后台的子 shell 中执行带有参数的命令。

答案2

我认为有更简单的代码可以做到这一点。我部分基于提问者给出的示例,部分基于此处给出的代码https://www.mlo.io/blog/2012/06/13/parallel-processes-in-bash/,以及部分@methuselah-0 的解释。

# make a fifo pipe
mkfifo pipe
exec 3<>pipe
rm -f pipe

# fill with some number of values (will correspond to the number of spawned processes)
for i in `seq 7`; do
    { sleep $i; echo >&3; } &
done

# if a read from the queue succeeds, start a subprocess, otherwise wait
# for another to finish and restock the queue with a readable entry
for f in `ls /Users/pavelkomarov/data_cache`; do
    read <&3
    { python3 /Users/pavelkomarov/my_heavy_script.py -t $f; echo >&3; } &
done

相关内容