Reading from multiple fifos in parallel, non-blocking

I sometimes deal with a bunch of output fifos from programs that run in parallel, and I would like to merge these fifos. The naive solution is:

cat fifo* > output

But this requires the first fifo to finish before the first byte can be read from the second fifo, and that blocks the programs running in parallel.

Another way is:

(cat fifo1 & cat fifo2 & ... ) > output

But that may mix the outputs, so you get half-lines in the output.

When reading from multiple fifos, there must be some rule for merging the files. Doing it on a line-by-line basis is typically sufficient for me, so I am looking for something that does:

parallel_non_blocking_cat fifo* > output

which will read from all fifos simultaneously and merge the output one full line at a time.

I can see it is not hard to write such a program. All you need to do is:

  1. open all the fifos
  2. do a blocking select on all of them
  3. read non-blocking from a fifo that has data, into the buffer for that fifo
  4. if the buffer contains a full line (or record): print out the line
  5. if all fifos are closed/at eof: exit
  6. goto 2
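
The loop above can be sketched in a few lines of Python (an illustration of the steps, not an existing tool; shown here with ordinary `os.pipe` descriptors so it is self-contained, but fds from `os.open(path, os.O_RDONLY | os.O_NONBLOCK)` on real FIFOs behave the same way):

```python
import os
import selectors

def merge_lines(fds, out, delim=b"\n"):
    """Merge complete lines from several non-blocking fds onto `out`."""
    sel = selectors.DefaultSelector()
    bufs = {}
    for fd in fds:                           # 1. open/register all inputs
        os.set_blocking(fd, False)
        sel.register(fd, selectors.EVENT_READ)
        bufs[fd] = b""
    while bufs:                              # 5./6. loop until every fd hit EOF
        for key, _ in sel.select():          # 2. blocking select
            fd = key.fd
            try:
                chunk = os.read(fd, 4096)    # 3. non-blocking read into buffer
            except BlockingIOError:
                continue
            if chunk:
                bufs[fd] += chunk
                nl = bufs[fd].rfind(delim)   # 4. flush only whole lines
                if nl >= 0:
                    out.write(bufs[fd][:nl + 1])
                    bufs[fd] = bufs[fd][nl + 1:]
            else:                            # EOF on this fd
                if bufs[fd]:
                    out.write(bufs[fd])      # unterminated tail, flush as-is
                sel.unregister(fd)
                os.close(fd)
                del bufs[fd]
```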

So my question is not: can it be done?

My question is: has it been done already? Can I just install a tool that does this?

Answer 1

This solution only works if the number of fifos is smaller than the number of jobs GNU parallel can run in parallel (which is limited by file handles and the number of processes):

parallel -j0 --line-buffer cat ::: fifo*

It seems to be able to move around 500 MB/s:

window1$ mkfifo {1..100}
window1$ parallel -j0 --line-buffer cat ::: {1..100} | pv >/dev/null

window2$ parallel -j0 'cat bigfile > ' ::: *

And it does not mix half-lines:

window1$ mkfifo {1..100}
window1$ parallel -j0 --line-buffer cat ::: {1..100} &

window2$ parallel -j0 'traceroute {}.1.1.1 > {}' ::: *

It reads the jobs in parallel (it does not read one job completely before reading the next):

window1$ mkfifo {1..100}
window1$ parallel -j0 --line-buffer cat ::: * > >(tr -s ABCabc)

window2$ long_lines_with_pause() {
    perl -e 'print STDOUT "a"x30000_000," "'
    perl -e 'print STDOUT "b"x30000_000," "'
    perl -e 'print STDOUT "c"x30000_000," "'
    echo "$1"
    sleep 2
    perl -e 'print STDOUT "A"x30000_000," "'
    perl -e 'print STDOUT "B"x30000_000," "'
    perl -e 'print STDOUT "C"x30000_000," "'
    echo "$1"
}
window2$ export -f long_lines_with_pause
window2$ parallel -j0 'long_lines_with_pause {} > {}' ::: *

Here, masses of "ab c" (the first half of each job) will be printed before the "AB C" (the second half of each job).

Answer 2

So,

tail -q -n+1 -f --pid=stop-tail-when-this-is-gone fifo1 fifo2 fifo3

almost works (as mentioned in the initial comments on an earlier version of this answer), though you may need a `for f in fifo*; do cat < /dev/null > "$f" & done` beforehand to ensure all the FIFOs are open for writing, since coreutils tail opens them O_RDONLY with no O_NONBLOCK.

Unfortunately, there is a bug: tail is only careful about line/record endings for input coming from a pipe on stdin, not for named pipes/FIFOs passed as arguments. Someday someone may fix coreutils tail.

In the meantime, to get a true, line-ending-respecting queue for many writers and one reader, you can use a simple C program of about 100 lines that I call tailpipes.c:

#define _GNU_SOURCE    //Expose memrchr on glibc
#include <stdio.h>
#include <stdlib.h>
#include <string.h>    //TODO: Find&document build environments lacking memrchr
#include <unistd.h>
#include <fcntl.h>
#include <time.h>
#include <errno.h>
#include <signal.h>
#include <sys/types.h>
#include <sys/select.h> //pselect, fd_set
#include <sys/stat.h>
#define errstr strerror(errno)

char const * const Use = "%s: %s\n\nUsage:\n\n"
"  %s [-p PID] [-o OPEN_MODE(RW)] [-d DLM(\\n)] [-s SEC(.01)] PATH1 PATH2..\n\n"
"Read delimited records (lines by default) from all input paths, writing only\n"
"complete records to stdout and changing to a stop-at-EOF mode upon receiving\n"
"SIGHUP (unlike \"tail -fqn+1\" which just dies) OR when we first notice that\n"
"PID does not exist (if PID is given).  Since by default fifos are opened RW,\n"
"signal/PID termination is needed to not loop forever, but said FIFOs may be\n"
"closed & reopened by other processes as often as is convenient. For one-shot\n"
"writing style, ending input reads at the first EOF, use \"-oRO\".  Also, DLM\n"
"adjusts the record delimiter byte from the default newline, and SEC adjusts\n"
"max select sleep time.  Any improperly terminated final records are sent to\n"
"stderr at the end of execution (with a label and bracketing).\n";

int writer_done;
void sig(int signum) { writer_done = 1; }

int main(int N, char *V[]) {
    signed char     ch;
    char           *buf[N-1], delim = '\n', *V0 = V[0], *eol;
    int             len[N-1], fds[N-1], nBf[N-1], i, fdMx = 0, nS = 0, nF = 0,
                    oFlags = O_RDWR;
    pid_t           pid = 0;
    ssize_t         nR, nW;
    struct timespec tmOut = { 0, 10000000 }; //10 ms select time out
    fd_set          fdRdMaster, fdRd;
    //If we get signaled before here, this program dies and data may be lost.
    //If possible use -p PID option w/pre-extant PID of appropriate lifetime.
    signal(SIGHUP, sig);                    //Install sig() for SIGHUP
    memset((void *)fds, 0, sizeof fds);
    memset((void *)len, 0, sizeof len);
    FD_ZERO(&fdRdMaster);
    fdRd = fdRdMaster;
    while ((ch = getopt(N, V, "d:p:s:o:")) != -1)
        switch (ch) {                       //For \0 do '' as a sep CLI arg
            double tO;
            case 'd': delim  = optarg ? *optarg : '\n';   break;
            case 'p': pid    = optarg ? atoi(optarg) : 0; break;
            case 's': tO = optarg ? atof(optarg) : .01;
                      tmOut.tv_sec = (long)tO;
                      tmOut.tv_nsec = 1e9 * (tO - tmOut.tv_sec);
                      break;
            case 'o': oFlags = (optarg && strcasecmp(optarg, "ro") == 0) ?
                                 O_RDONLY | O_NONBLOCK : O_RDWR;
                      break;
            default: return fprintf(stderr, Use, V0, "bad option", V0), 1;
        }
    V += optind; N -= optind;               //Shift off option args
    if (N < 1)
        return fprintf(stderr, Use, V0, "too few arguments", V0), 2;
    setvbuf(stdout, NULL, _IONBF, 65536);   //Full pipe on Linux
    for (i = 0; i < N; i++)                 //Check for any available V[]
        if ((fds[i] = open(V[i], oFlags)) != -1) {
            struct stat st;
            fstat(fds[i], &st);
            if (!S_ISFIFO(st.st_mode))
                return fprintf(stderr,"%s: %s not a named pipe\n", V0, V[i]), 3;
            nF++;
            FD_SET(fds[i], &fdRdMaster);    //Add fd to master copy for pselect
            buf[i] = malloc(nBf[i] = 4096);
            if (fds[i] > fdMx)
                fdMx = fds[i];
        } else if (errno == EINTR) {        //We may get signaled to finish up..
            i--; continue;                  //..before we even this far.
        } else
            return fprintf(stderr, "%s: open(%s): %s\n", V0, V[i], errstr), 3;
    fdMx++;
    fdRd = fdRdMaster;
    while (nF && (nS = pselect(fdMx, &fdRd, NULL, NULL, &tmOut, NULL)) != -99) {
        if (pid && kill(pid, 0) != 0 && errno != EPERM) //Given pid didn't exist
            writer_done = 1;
        if (nS == 0 && writer_done)                     //No input & no writers
            break;
        else if (nS == -1) {                            //Some select error:
            if (errno != EINTR && errno != EAGAIN)      //..fatal, else retry
                return fprintf(stderr, "%s: select: %s\n", V0, errstr), 4;
            continue;
        }
        for (i = 0; nS > 0 && i < N; i++) {             //For all fds..
            if (fds[i] < 0 || !FD_ISSET(fds[i], &fdRd)) //with readable data
                continue;
            if ((nR = read(fds[i], buf[i]+len[i], nBf[i] - len[i])) < 0) {
                if (errno != EAGAIN && errno != EINTR)
                    fprintf(stderr, "%s: read: %s\n", V0, errstr);
                continue;
            } else if (oFlags == (O_RDONLY | O_NONBLOCK) && nR == 0) {
                FD_CLR(fds[i], &fdRdMaster);//EOF in one-shot mode: drop fd..
                close(fds[i]);
                fds[i] = -1;                //..but keep buf[i] so any final..
                nF--;                       //..unterminated record survives.
                continue;
            }
            len[i] += nR;                               //Update Re: read data
            if ((eol = memrchr(buf[i], delim, len[i]))) {
                nW = eol - buf[i] + 1;                  //Only to last delim
                if (fwrite(buf[i], nW, 1, stdout) == 1) {
                    memmove(buf[i], buf[i] + nW, len[i] - nW);
                    len[i] -= nW;                       //Residual buffer shift
                } else
                    return fprintf(stderr, "%s: %d bytes->stdout failed: %s\n",
                                   V0, len[i], errstr), 5;
            } else if (len[i] == nBf[i]) {              //NoDelim&FullBuf=>GROW
                void *tmp;
                if (nBf[i] >= 1 << 30)
                    return fprintf(stderr, "%s: record > 1 GiB\n", V0), 6;
                nBf[i] *= 2;
                if (!(tmp = realloc(buf[i], nBf[i])))
                    return fprintf(stderr,"%s: out of memory\n", V0), 7;
                buf[i] = tmp;
            }
        }
        fdRd = fdRdMaster;
    }
    for (i = 0; i < N; i++)                     //Ensure any residual data is..
        if (len[i] > 0) {                       //..labeled,bracketed,=>stderr.
            fprintf(stderr, "%s: %s: final unterminated record: {", V0, V[i]);
            fwrite(buf[i], len[i], 1, stderr);
            fputs("}\n", stderr);
        }
    return 0;
}

Installation is cut & paste & `cc -Owhatever tailpipes.c -o somewhere-in-$PATH/tailpipes`. Tested on Linux & FreeBSD. I get about 2500e6 bytes/sec of output, but memory may be faster on that box than on the 500e6 bytes/sec one above.

The algorithm is roughly as suggested, but more general: O_NONBLOCK is used only together with O_RDONLY, and there are some ease-of-use options, such as opening the FIFOs O_RDWR by default so that writers can close and re-open them as often as is convenient, and -p PID tracking for a race-free termination protocol. You can pass -oRO to rely on EOF instead, if you want. tailpipes also handles incomplete lines at program termination, sending them labeled and bracketed to stderr, in case easy post-processing can make the records whole, or in case their logging is useful for debugging.
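
The effect of the default O_RDWR open can be shown in a few lines of Python (a standalone sketch, not part of tailpipes itself): because the reading process holds a write end of its own, a writer closing the FIFO never produces EOF; an empty FIFO reports EAGAIN ("no data right now") instead of a 0-byte read ("no more writers"):

```python
import errno
import os
import tempfile

d = tempfile.mkdtemp()
path = os.path.join(d, "fifo")
os.mkfifo(path)

# O_RDWR: the open succeeds with no writer present, and this process itself
# counts as a writer, so read() can never return the 0-byte EOF condition.
fd = os.open(path, os.O_RDWR | os.O_NONBLOCK)

w = os.open(path, os.O_WRONLY)        # a producer connects..
os.write(w, b"one\n")
os.close(w)                           # ..and disconnects again

data = os.read(fd, 4096)              # the queued record is still there
try:
    os.read(fd, 4096)
    got_eof = True                    # a 0-byte read: what O_RDONLY would give
except BlockingIOError:
    got_eof = False                   # EAGAIN: empty, but writers may return
print(data, got_eof)

os.close(fd)
os.remove(path)
os.rmdir(d)
```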

Example usage. GNU xargs can be the fan-out part of a map-reduce-ish parallel pipeline, with tailpipes operating as the record-boundary-respecting fan-in part, all without using any disk space for temporary files:

export MYTEMP=$(mktemp -d /tmp/MYPROG.XXXXX)
FIFOs=`n=0; while [ $n -lt 8 ]; do echo $MYTEMP/$n; n=$((n+1)); done`
mkfifo $FIFOs
sleep 2147483647 & p=$!       #Cannot know xargs pid is good for long
( find . -print0 | xargs -0 -P8 --process-slot-var=MYSLOT MYPROGRAM
  kill $p ) &                 #Inform tailpipes writers are done
tailpipes -p$p $FIFOs | CONSUMING-PIPELINE
rm -rf $MYTEMP
wait                          #Wait for xargs subshell to finish

In the above, it is important that A) `n` goes from 0 up to the appropriate upper limit, since that is the scheme xargs uses for MYSLOT, and B) MYPROGRAM directs its output to the freshly assigned $MYSLOT-keyed file such as $MYTEMP/$MYSLOT, e.g. with an `exec > $MYTEMP/$MYSLOT` if it is a shell script. The shell/program wrapper could be eliminated in many cases if xargs had a hypothetical --process-slot-out to set its children's stdouts.

Answer 3

A more elegant answer, which does not buffer a useless copy of the data on disk:

#!/usr/bin/perl                                                                                                       

use threads;
use threads::shared;
use Thread::Queue;

my $done :shared = 0;

my $DataQueue = Thread::Queue->new();

my @producers;
for (@ARGV) {
    push @producers, threads->create('producer', $_);
}

while($done <= $#ARGV) {
    # This blocks until $DataQueue->pending > 0                                                                       
    print $DataQueue->dequeue();
}

for (@producers) {
    $_->join();
}


sub producer {
    open(my $fh, "<", shift) || die;
    while(<$fh>) {
        $DataQueue->enqueue($_);
    }
    # Closing $fh blocks                                                                                              
    # close $fh;                                                                                                      
    { lock($done); $done++; }  # Atomic update of the shared counter
    # Guard against race condition                                                                                    
    $DataQueue->enqueue("");
}
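
For comparison, the same structure can be sketched in Python (an illustrative sketch, not part of the answer's code; the function name `merge` is made up here). `queue.Queue` plays the role of Thread::Queue, and one `None` sentinel per producer replaces the shared `$done` counter, sidestepping the increment race entirely:

```python
import queue
import sys
import threading

def merge(paths, out=sys.stdout):
    """Merge whole lines from several inputs using one consumer thread."""
    q = queue.Queue()

    def producer(path):
        # open() on a FIFO blocks until a writer appears, like Perl's open "<"
        with open(path) as fh:
            for line in fh:            # iteration yields whole lines only
                q.put(line)
        q.put(None)                    # sentinel: this producer is finished

    threads = [threading.Thread(target=producer, args=(p,)) for p in paths]
    for t in threads:
        t.start()
    done = 0
    while done < len(threads):
        item = q.get()                 # blocks until something is pending
        if item is None:
            done += 1
        else:
            out.write(item)
    for t in threads:
        t.join()
```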
