我有大量大文件(数百个文件,每个文件数百 MB),我需要通过多个程序来过滤和转换它们。我利用了多个 CPU 核心,因此我在每个文件上运行了同一管道的多个实例(最多可以有一百个核心,并且可以使用 ssh 作为管道的一部分,以防对答案产生任何影响)。我想监控每个管道,并且我正在使用pv
它。这是我拥有的一个简单的例子:
$ pv file-001.gz | gunzip | xz > file-001.xz
1.58GB 0:00:02 [ 713MB/s] [=================================>] 100%
实际上,我还在管道中做了其他几件事,包括通过 ssh 将数据传递到其他机器,并通过这些机器上的过滤器进行管道传输,但管道总是以重定向到主机上的新文件而结束。此外,管道中的任何阶段都不需要整个数据集;它们可以逐行或逐块地进行操作。
目前,我需要为管道的每个实例设置一个单独的终端窗口。我想要做的是在单个终端/shell 中启动管道的 n 个并行实例,并在其自己的一行上获取每个 pv 实例的输出。如下所示:
1.48GB 0:00:54 [ 704MB/s] [===============================> ] 97% ETA 00:00:06
1.58GB 0:01:00 [ 713MB/s] [=================================>] 100%
0.75GB 0:00:31 [ 709MB/s] [================> ] 50% ETA 00:00:29
n 的值是我可以在终端窗口中容纳的行数,比如 3-50 左右。进度报告的确切格式并不重要,只要它包含速度、完成百分比、已用时间和估计剩余时间即可。我是否使用也不重要,pv
使用其他程序也没问题,只要我可以轻松安装它或只是普通的 shell(最好是 bash)。但重要的是,如果管道的某个部分由于某种原因崩溃,该方法可以处理偶尔发生的管道断裂。我还想在每次作业完成(成功或失败)并且仍然有未处理的文件时启动新作业。
关于如何做到这一点有什么想法吗?
请注意,我已经尝试过GNU 并行但是它的 ssh 功能似乎假设每个输入文件首先被传输到远程主机,然后进行处理,然后将结果传回,由于涉及的数据量和每个处理节点上的空间有限,我想避免这种情况。
答案1
关于如何做到这一点有什么想法吗?
不。
pv 有 -c 和 -N 选项,可以让你做你想做的事
$ pv -cN source access.log | gzip | pv -cN gzip > access.log.gz
source: 760MB 0:00:15 [37.4MB/s] [=> ] 19% ETA 0:01:02
gzip: 34.5MB 0:00:15 [1.74MB/s] [ <=> ]
但我不知道如何将该功能应用于多个管道
然而,如果你查看 pv 的手册页,你会看到这个
(tar cf - . \
| pv -n -s $(du -sb . | awk '{print $1}') \
| gzip -9 > out.tgz) 2>&1 \
| dialog --gauge 'Progress' 7 70
因此,您可以扩展此功能以并行运行多个任务,只要可以在一组小窗口中查看进度即可。我会尝试 Xdialog。
目前,我需要为管道的每个实例提供一个单独的终端窗口
我的主要观点是,您没有必要以交互方式打开大量终端窗口,您可以让一个脚本本身打开大量对话框。
答案2
您看过--pipe
GNU Parallel 吗?
cat bigfiles* | pv | parallel --pipe -S server1,server2 'cat | process_pipe'
(为了强调,加上了猫)
其默认块大小为 1 MB,可以使用 --block 进行调整。
-- 编辑为 1-1 对应关系 --
基于上述内容,您可以获得如下的 1-1 对应关系:
parallel --eta "cat {} | parallel --pipe -S server1,server2 'cat | process_pipe' > {}.out" ::: bigfiles*
(为了强调,加上了猫)
它不是最理想的,因为内部并行不知道其兄弟节点,因此可能在服务器 1 上比在服务器 2 上产生更多。避免这种情况的一种方法是在外部并行上使用 -j1,但如果内部并行只有足够第一台服务器使用的块,那么这不是最佳选择。换句话说:为了完美平衡您的工作负载,您可能需要对此进行一些调整 - 甚至可能使用 --load 100% 或类似选项。
--- 编辑:处理崩溃 ---
如果process_pipe
返回错误,则应再重试该命令 2 次:
parallel --retries 3 --eta "cat {} | parallel --pipe -S server1,server2 'cat | process_pipe' > {}.out" ::: bigfiles*