使用并行处理唯一的输入文件到唯一的输出文件

使用并行处理唯一的输入文件到唯一的输出文件

我有一个 shell 脚本问题,给定一个充满输入文件的目录(每个文件包含许多输入行),我需要单独处理它们,将它们的每个输出重定向到一个唯一的文件(又名 file_1.input 需要在 file_1.output 中捕获,依此类推)。

预并行,我只会迭代目录中的每个文件并执行我的命令,同时执行某种计时器/计数技术以免压垮处理器(假设每个进程都有恒定的运行时间)。但是,我知道情况并非总是如此,因此使用“并行”之类的解决方案似乎是无需编写自定义代码即可获得 shell 脚本多线程的最佳方法。

虽然我已经想到了一些方法来并行处理每个文件(并允许我有效地管理我的核心),但它们看起来都很老套。我有一个我认为非常简单的用例,所以希望尽可能保持它的干净(并且并行示例中的任何内容似乎都不是我的问题。

任何帮助,将不胜感激!

输入目录示例:

> ls -l input_files/
total 13355
location1.txt
location2.txt
location3.txt
location4.txt
location5.txt

脚本:

> cat proces_script.sh
#!/bin/sh

customScript -c 33 -I -file [inputFile] -a -v 55 > [outputFile]

更新:阅读下面 Ole 的答案后,我能够将缺失的部分放在一起以实现我自己的并行实现。虽然他的回答很好,但以下是我的补充研究和笔记:

我没有运行完整的流程,而是从概念验证命令开始,在我的环境中证明他的解决方案。请参阅我的两种不同的实现(和注释):

find /home/me/input_files -type f -name *.txt | parallel cat /home/me/input_files/{} '>' /home/me/output_files/{.}.out

使用 find (不是 ls,这可能会导致问题)查找输入文件目录中的所有适用文件,然后将其内容重定向到单独的目录和文件。我上面的问题是读取和重定向(实际脚本很简单),因此用 cat 替换脚本是一个很好的概念证明。

parallel cat '>' /home/me/output_files/{.}.out :::  /home/me/input_files/*

第二种解决方案使用并行的输入变量范例来读取文件,但是对于新手来说,这更加令人困惑。对我来说,使用 find a and pipeline 很好地满足了我的需求。

答案1

GNU Parallel 专为此类任务而设计:

parallel customScript -c 33 -I -file {} -a -v 55 '>' {.}.output ::: *.input

或者:

ls | parallel customScript -c 33 -I -file {} -a -v 55 '>' {.}.output

每个 CPU 核心将运行一个作业。

您可以简单地通过以下方式安装 GNU Parallel:

wget https://git.savannah.gnu.org/cgit/parallel.git/plain/src/parallel
chmod 755 parallel
cp parallel sem

观看 GNU Parallel 的介绍视频以了解更多信息: https://www.youtube.com/playlist?list=PL284C9FF2488BC6D1

答案2

执行此操作的标准方法是设置一个队列并生成任意数量的知道如何从队列中提取内容并处理它的工作人员。您可以使用 fifo(也称为命名管道)在这些进程之间进行通信。

下面是一个简单的例子来演示这个概念。

一个简单的队列脚本:

#!/bin/sh
mkfifo /tmp/location-queue
for i in inputfiles/*; do
  echo $i > /tmp/location-queue
done
rm /tmp/location-queue

还有一个工人:

#!/bin/sh
while read file < /tmp/location-queue; do
  process_file "$file"
done

process_file可以在你的工作线程中的某个地方定义,它可以做你需要它做的任何事情。

一旦有了这两部分,您就可以拥有一个简单的监视器来启动队列进程和任意数量的工作进程。

监控脚本:

#!/bin/sh
queue.sh &
num_workers="$1"
i=0
while [ $i < $num_workers ]; do
  worker.sh &
  echo $! >> /tmp/worker.pids
  i=$((i+1))
done
monitor_workers

你有它。如果您确实这样做,最好在监视器中设置 fifo,并将路径传递给队列和工作线程,这样它们就不会耦合,也不会粘在 fifo 的特定位置。我在答案中专门以这种方式设置它,以便您在阅读时清楚地了解您正在使用的内容。

答案3

另一个例子:

ls *.txt | parallel 'sort {} > {.}.sorted.txt'

我发现其他示例不必要地复杂,而在大多数情况下,上述内容可能是您一直在寻找的内容。

答案4

这是对当前目录中的一大组文件执行相同的命令:

#!/bin/sh
trap 'worker=`expr $worker - 1`' USR1  # free up a worker
worker=0  # current worker
num_workers=10  # maximum number of workers
for file in *.txt; do
    if [ $worker -lt $num_workers ]; then
        {   customScript -c 33 -I -file $file -a -v 55 > `basename $file .txt`.outtxt 
            kill -USR1 $$ 2>/dev/null  # signal parent that we're free
        } &
        echo $worker/$num_worker $! $file  # feedback to caller
        worker=`expr $worker + 1`
    else
        wait # for a worker to finish
    fi
done

这会customScript在每个txt文件上运行,将输出放入outtxt文件中。根据需要进行更改。使其工作的关键是信号处理,使用 SIGUSR1,以便子进程可以让父进程知道它已完成。使用 SIGCHLD 不起作用,因为脚本中的大多数语句都会向 shell 脚本生成 SIGCHLD 信号。我尝试将您的命令替换为sleep 1,该程序使用了 0.28s 的用户 cpu 和 0.14s 的系统 cpu ;这仅涉及大约 400 个文件。

相关内容