我的问题类似于“根据同一字段中的排序值合并两个排序文件“但将其扩展到命名管道。
假设我有两个带有排序整数的文本文件,我想合并它们。我可以用来sort -nm file1.txt file2.txt > merged.txt
进行一次性、非阻塞合并。
现在,假设这些文件实际上是我正在制作然后从 python 中填充的命名管道 (FIFO)。只要我交替写入一个管道,然后写入下一个管道,我就可以很好地做到这一点。此代码用于生成两个有序的整数列表,将它们写入由sort
子进程读取的命名管道,子进程将合并结果输出到单个文件中:
import tempfile
import subprocess
import os
import sys
# Make temporary fifos
tempdir = tempfile.mkdtemp()
tempdir = "/tmp/tmph1ilvegn" # hard-code tempdir for repeated runs
fifo_path1 = os.path.join(tempdir, "fifo1")
fifo_path2 = os.path.join(tempdir, "fifo2")
pos_fifo = os.mkfifo(fifo_path1)
neg_fifo = os.mkfifo(fifo_path2)
# Output will be a sorted merge from 2 inlines2ut streams.
outfile = "sorted_merge.txt"
sortProcess = subprocess.Popen('sort -snm ' + fifo_path1 + " " + fifo_path2 + " > " +
outfile, shell=True)
fifo_writer1 = open(fifo_path1, 'w')
fifo_writer2 = open(fifo_path2, 'w')
nlines1 = 0
nlines2 = 0
# Simulate 2 sorted lists by just going iterating through a sorted list and
# printing some numbers to one list and some to the other.
for i in range(1,100000):
print("i: {}; n1: {}; n2: {}; imbalance:{}".format(i, nlines1, nlines2, nlines1-nlines2))
line_to_write = (str(i) + "\n")
if i % 2:
nlines1 +=1
fifo_writer2.write(line_to_write)
else:
nlines2 +=1
fifo_writer1.write(line_to_write)
# clean up fifos:
fifo_writer1.close()
fifo_writer2.close()
os.remove(fifo_path1)
os.remove(fifo_path2)
sortProcess.communicate()
我得到一个排序结果。但是 - 现在让我们通过将 更改i % 2
为来使列表不平衡i % 3
。在原来的版本中,这只是打印到 fifo1,然后是 fifo2,然后是 fifo1,然后是 fifo2,等等。在修改后的版本中,它向两个管道之一打印两倍的行数。
运行它i % 3
我得到以下输出:
...
i: 16182; n1: 10788; n2: 5393; imbalance:5395
i: 16183; n1: 10788; n2: 5394; imbalance:5394
i: 16184; n1: 10789; n2: 5394; imbalance:5395
i: 16185; n1: 10790; n2: 5394; imbalance:5396
i: 16186; n1: 10790; n2: 5395; imbalance:5395
i: 16187; n1: 10791; n2: 5395; imbalance:5396
i: 16188; n1: 10792; n2: 5395; imbalance:5397
i: 16189; n1: 10792; n2: 5396; imbalance:5396
i: 16190; n1: 10793; n2: 5396; imbalance:5397
i: 16191; n1: 10794; n2: 5396; imbalance:5398
i: 16192; n1: 10794; n2: 5397; imbalance:5397
i: 16193; n1: 10795; n2: 5397; imbalance:5398
它总是停在同一个地方。使用 strace 我可以看到:
write
python 进程在调用点 4 时挂起:write(4, "9\n15170\n15172\n15173\n15175\n15176\n"..., 4100
但该进程在调用点 3
sort
时挂起:read
read(3,
查看进程lsof -n -p
的输出sort
表明它正在等待值到达fifo1
,而write
进程正在等待写入值fifo2
:
sort 23330 nsheff txt REG 259,2 110040 10769142 /usr/bin/sort
sort 23330 nsheff mem REG 259,2 2981280 10752335 /usr/lib/locale/locale-archive
sort 23330 nsheff mem REG 259,2 1868984 6031544 /lib/x86_64-linux-gnu/libc-2.23.so
sort 23330 nsheff mem REG 259,2 138696 6031518 /lib/x86_64-linux-gnu/libpthread-2.23.so
sort 23330 nsheff mem REG 259,2 162632 6031516 /lib/x86_64-linux-gnu/ld-2.23.so
sort 23330 nsheff 0u CHR 136,1 0t0 4 /dev/pts/1
sort 23330 nsheff 1w REG 259,2 0 4719615 /home/nsheff/code/bamSitesToWig/sorted_merge.txt
sort 23330 nsheff 2u CHR 136,1 0t0 4 /dev/pts/1
sort 23330 nsheff 3r FIFO 259,2 0t0 786463 /tmp/tmph1ilvegn/fifo1
sort 23330 nsheff 4r FIFO 259,2 0t0 786465 /tmp/tmph1ilvegn/fifo2
因此由于某种原因,sort
进程 * 停止监听fifo2
,导致进程挂起。
fifo2
现在,如果我通过发出...来放置一个单独的侦听器cat fifo2
,该过程将再次开始并继续数千次迭代,直到...它现在在另一个随机点停止(迭代 53733)。
我认为一定有一些我不明白的事情发生在缓冲管道上,以及sort
从一个流的读取到下一个流的读取是如何变化的。对我来说奇怪的是,它是确定性的,在完全相同的位置失败,并且只有当列表不平衡时才会失败。
我有什么办法可以解决这个问题吗?
答案1
显然,当您向两个命名管道写入不同数量的数据时,您的程序会产生死锁。您的程序在write
for 1 fifo2 上阻塞(缓冲区已满),而sort
进程在 for fifo1 上阻塞read
(缓冲区为空)。
你不知道如何sort
实现。它可能希望以更大的块读取文件,然后处理内存中的数据以提高效率。如果使用读取数据的sort
函数,缓冲甚至可能会自动发生。stdio.h
命名(和未命名)管道使用数据缓冲区。
如果缓冲区已满,写入进程将阻塞,直到读取进程读取了一些数据或关闭其末尾。
如果缓冲区为空,则读取进程将阻塞,直到写入进程写入一些数据或结束为止。
如果在每个周期中向 fifo1 写入一行,向 fifo2 写入两行,则会填满 fifo2 的缓冲区,而 fifo1 的缓冲区仅填满一半。
根据您的程序向 fifo 写入的数据量以及sort
想要读取的数据量,这显然会导致这样的情况:您sort
想要从 fifo1 读取某些内容,而该程序只有一个空缓冲区,而您的程序想要写入带有完整缓冲区的 fifo2 。
结果是确定性的,因为管道缓冲区具有固定大小,并且您的程序也可能具有固定大小并sort
使用固定缓冲区大小来读取或写入数据。
您可以查看 GNU 的源代码sort
:
https://github.com/wertarbyte/coreutils/blob/master/src/sort.c
一开始,它尝试使用 function 在所有文件的循环中填充所有输入文件的输入缓冲区fillbuf
。
稍后在某些情况下它会fillbuf
再次调用输入文件。
函数中fillbuf
有一条注释
/* Read as many bytes as possible, but do not read so many
bytes that there might not be enough room for the
corresponding line array. The worst case is when the
rest of the input file consists entirely of newlines,
except that the last byte is not a newline. */
显然sort
选择一个输入文件并需要一定量的数据。如果读取阻塞,它不会切换输入文件。
该实现对于普通文件效果很好,因为read
操作会在一段时间后返回一些数据或 EOF,因此它不会永久阻塞。
如果有不止一件事可以在两个进程/线程之间阻塞,那么总是很难避免死锁。在您的情况下,您应该只使用一根管道。如果您总是有数据要写入 fifo1(如果 fifo2 会阻塞),则使用非阻塞操作可能会有所帮助,反之亦然。
如果您使用两个单独的线程/进程写入管道,则使用两个管道可能会起作用,但前提是线程/进程彼此独立工作。如果应该写入 pipeline1 的线程 A 以某种方式等待线程 B(该线程 B 只在写入 pipeline2 时阻塞),这将无济于事。