从并行生成的其他三个流中创建单个输出流

从并行生成的其他三个流中创建单个输出流

我有三种不同格式的数据;对于每种数据类型,都有一个 Python 脚本将其转换为单一的统一格式。

这个 Python 脚本速度很慢并且受 CPU 限制(对于多核机器上的单个核心),因此我想运行它的三个实例 - 每种数据类型一个 - 并将它们的输出组合起来以将其传递到sort.基本上相当于这样:

{ ./handle_1.py; ./handle_2.py; ./handle_3.py } | sort -n

但三个脚本并行运行。

我发现这个问题其中 GNUsplit用于在处理流的脚本的 n 个实例之间循环某些标准输出流。

从分割手册页:

-n, --number=CHUNKS
          generate CHUNKS output files.  See below
CHUNKS  may be:
 N       split into N files based on size of input
 K/N     output Kth of N to stdout
 l/N     split into N files without splitting lines
 l/K/N   output Kth of N to stdout without splitting lines
 r/N     like 'l'  but  use  round  robin  distributio

所以该r/N命令意味着“没有分割线”。

基于此,似乎以下解决方案应该是可行的:

split -n r/3 -u --filter="./choose_script" << EOF
> 1
> 2
> 3
> EOF

这是在哪里choose_script

#!/bin/bash
{ read x; ./handle_$x.py; }

不幸的是,我看到一些行的混合 - 以及许多不应该出现的换行符。

例如,如果我用一些简单的 bash 脚本替换我的 Python 脚本,这些脚本可以执行以下操作:

#!/bin/bash
# ./handle_1.sh
while true; echo "1-$RANDOM"; done;

#!/bin/bash
# ./handle_2.sh
while true; echo "2-$RANDOM"; done;

#!/bin/bash
# ./handle_3.sh
while true; echo "3-$RANDOM"; done;

我看到这个输出:

1-8394

2-11238
2-22757
1-723
2-6669
3-3690
2-892
2-312511-24152
2-9317
3-5981

这很烦人 - 根据我上面粘贴的手册页摘录,它应该保持行完整性。

显然,如果我删除该-u参数,它会起作用,但随后它会被缓冲,并且我会耗尽内存,因为它会缓冲除其中一个脚本之外的所有脚本的输出。

如果有人在这里有一些见解,我们将不胜感激。我已经超出了我的深度。

答案1

尝试使用 GNU 并行的 -u 选项。

echo "1\n2\n3" | parallel -u -IX ./handle_X.sh

这会并行运行它们,而不缓冲整个进程。

答案2

尝试:

parallel ::: ./handle_1.py ./handle_2.py ./handle_3.py

如果handle_1.py采用文件名:

parallel ::: ./handle_1.py ./handle_2.py ./handle_3.py ::: files*

您不希望输出混合,因此不要使用 -u。

如果您想保持顺序(因此所有handle_1输出都在handle_2之前,因此您可能能够避免排序):

parallel -k  ::: ./handle_1.py ./handle_2.py ./handle_3.py ::: files*

如果您仍然希望对其进行排序,您可以并行排序并利用sort -m

parallel --files "./handle_{1}.py {2} | sort -n"  ::: 1 2 3 ::: files* | parallel -j1 -X sort -m

将 $TMPDIR 设置为足够大以容纳输出的目录。

答案3

也许我错过了一些东西,但你不能这样做:

(./handle_1.py & ./handle_2.py & ./handle_3.py) | sort -n

如果您希望每个进程中的行不被交错,那么更容易的方法可能是确保进程本身将它们完全写入,并可能禁用输出缓冲,因为write只要它们不大于管道,就保证管道的原子性PIPE_BUF。例如,您可以确保使用输出缓冲 à lastdio并在写入一行或几行后调用fflush或任何等效的内容。python

如果你无法修改 python 脚本,你可以这样做:

lb() { grep --line-buffered '^'; }

(使用 GNU grep)或:

lb() while IFS= read -r l; do printf '%s\n' "$l"; done

(如果命令输出不是文本,请参阅下面注释中的注释)

并做:

(./handle_1.py | lb & ./handle_2.py | lb & ./handle_3.py | lb) | sort -n

避免这 3 个lb进程的另一种选择是对一个命令使用三个管道,该命令使用select/poll来查看来自哪里的输出并将其提供给sort基于行的输出,但这需要一些编程。

答案4

Flowbok 的答案是正确的解决方案。奇怪的是,如果 GNU 的输出parallel直接输出到文件,则它会被破坏,但如果它输出到 tty,则不会。

幸运的是,script -c可以模仿tty。

仍然是三个脚本:

#!/bin/bash
# handle_1.sh
while true; do echo "1-$RANDOM$RANDOM$RANDOM$RANDOM"; done

#!/bin/bash
# handle_2.sh
while true; do echo "2-$RANDOM$RANDOM$RANDOM$RANDOM"; done

#!/bin/bash
# handle_3.sh
while true; do echo "3-$RANDOM$RANDOM$RANDOM$RANDOM"; done

然后有一个文件封装了对并行的调用:

#!/bin/bash
# run_parallel.sh
parallel -u -I N ./handle_N.sh ::: "1" "2" "3"

然后我这样称呼它:

script -c ./run_parallel > output

输出中的行在不同脚本的输出之间逐行混合,但它们不会在给定行上被破坏或交错。

奇怪的行为parallel- 我可能会提交错误报告。

相关内容