我有一个处理标准输入上的文件列表的任务。程序的启动时间很长,每个文件所花费的时间差异很大。我想产生大量这样的进程,然后将工作分派给不忙的进程。有几种不同的命令行工具几乎可以满足我的要求,我将其范围缩小到两个几乎可以工作的选项:
find . -type f | split -n r/24 -u --filter="myjob"
find . -type f | parallel --pipe -u -l 1 myjob
问题在于,它split
是纯粹的循环,因此其中一个进程落后并停留在后面,从而延迟了整个操作的完成;虽然parallel
想要每 N 行或 N 字节的输入生成一个进程,但我最终在启动开销上花费了太多时间。
是否有类似的东西可以重用进程并将线路提供给具有未阻塞标准输入的进程?
答案1
对于 GNU Parallel,您可以使用 --block 设置块大小。但是,它确实要求您有足够的内存来为每个正在运行的进程在内存中保留 1 个块。
我知道这并不完全是您正在寻找的,但目前它可能是一个可以接受的解决方法。
如果您的任务平均花费相同的时间,那么您也许可以使用 mbuffer:
find . -type f | split -n r/24 -u --filter="mbuffer -m 2G | myjob"
答案2
在这种一般情况下,这看起来是不可能的。这意味着每个进程都有一个缓冲区,您可以从外部观察缓冲区来决定将下一个条目放在哪里(调度)...当然您可以写一些东西(或使用像 slurm 这样的批处理系统)
但根据流程的不同,您也许能够对输入进行预处理。例如,如果您想下载文件、从数据库更新条目或类似的内容,但其中 50% 最终将被跳过(因此根据输入您有很大的处理差异),那么只需设置一个预处理器验证哪些条目将花费很长时间(文件存在、数据已更改等),因此无论来自另一方的内容都保证花费相当相等的时间。即使启发式并不完美,您最终也可能会取得相当大的进步。您可以将其他文件转储到文件中,然后以相同的方式进行处理。
但这取决于您的用例。
答案3
不,没有通用的解决方案。您的调度程序需要知道每个程序何时准备好读取另一行,但据我所知,没有标准允许这样做。你所能做的就是在 STDOUT 上放一行并等待有东西消耗它;对于管道上的生产者来说,实际上没有一个好方法来判断下一个消费者是否准备好。
答案4
尝试这个:
mkfifo
对于每个过程。
然后挂tail -f | myjob
在每个 fifo 上。
例如设置工人(myjob 流程)
mkdir /tmp/jobs
for X in 1 2 3 4
do
mkfifo pipe$X
tail -f pipe$X | myjob &
jobs -l| awk '/pipe'$X'/ {print $2, "'pipe$X'"}' >> pipe-job-mapping
done
根据您的应用程序(myjob),您可能能够使用 jobs -s 来查找已停止的作业。否则列出按 CPU 排序的进程并选择消耗资源最少的进程。拥有作业报告本身,例如,当需要更多工作时,通过在文件系统中设置一个标志。
假设作业在等待输入时停止,请使用
jobs -sl
例如,找出已停止作业的 pid 并为其分配工作
grep "^$STOPPED_PID" pipe-to-job-mapping | while read PID PIPE
do
cat workset > $PIPE
done
我测试了这个
garfield:~$ cd /tmp
garfield:/tmp$ mkfifo f1
garfield:/tmp$ mkfifo f2
garfield:/tmp$ tail -f f1 | sed 's/^/1 /' &
[1] 21056
garfield:/tmp$ tail -f f2 | sed 's/^/2 /' &
[2] 21058
garfield:/tmp$ echo hello > f1
1 hello
garfield:/tmp$ echo what > f2
2 what
garfield:/tmp$ echo yes > f1
1 yes
我必须承认这只是炮制出来的,所以ymmv。