我有一个Python脚本或多或少可以做这个
current_tasks = TaskManager()
MAXPROCS = 8
while len(outstanding_tasks) > 0:
if len(current_tasks.running) < MAXPROCS:
current_tasks.addTask(outstanding_tasks.next())
else:
current_tasks.wait_for_one_finish()
Outstanding_tasks.next() 基本上是这样的:
p = subprocess.Popen([task], stdout=OUTFILE, stderr=subprocess.PIPE)
和current_tasks.wait_for_one_finish()
:
waiting = True
while waiting:
for t in tasks:
ret = t.poll()
if ret not None:
handle_stderr(t)
waiting = False
break
相当简单 - 按需生成任务,直到我们一次运行 8 个任务,然后阻塞,直到它们一次完成一个任务,然后再生成更多任务。
问题是这样的:
stderr=subprocess.PIPE
每个子进程都将 stderr 写入管道。如果它崩溃并且想要向管道写入大日志消息或其他任何内容,并且该消息超出了管道缓冲区的大小,则 write() 将阻塞。该过程不会完成,因此我的控制过程将永远不会看到其返回值poll()
并从其 stderr 中读取。
显然有一些方法可以解决这个问题:
- 将 stderr 从我的子进程重定向到临时文件
- 生成一个 Python 线程,该线程从所有正在运行的任务的 stderr 文件描述符中读取并将它们缓冲在内存中
- 在我的小临时事件循环中有一个 select() 或其他东西
但所有这些都是我必须在应用程序代码中处理的内容。我想知道的是:是否有某种方法可以获取管道的行为,但具有良好的大弹性缓冲区,以便子进程始终可以对其 stderr 执行成功的 write() 然后退出,而无需我看看它直到完成?
答案1
简短的回答是:没有。
您已经强调了处理通过子进程管道发送的大数据所需的解决方法。 “漂亮的大弹性缓冲”管道不存在。这被称为子流程管理 Python 文档作为死锁的潜在来源,您可以调用添加的解决方案proc.communicate()
来从 stderr 读取。您的情况的问题是您无法communicate()
同时调用所有进程,并且该方法会阻塞,直到读取所有数据。
如果是我,我可能会select()
在所有stderr
进程上使用调用而不是proc.poll()
循环。 select()
可以阻塞直到任何进程执行某些操作,并且当进程退出时,它将关闭 stderr 管道,因此一石二鸟(知道数据何时写入 stderr 并知道进程何时死亡)。