我正在使用 Linux 的 tty 子系统来模拟串行端口。模拟串行端口由需要物理串行端口的应用程序使用。就我而言,应用程序在 Docker 下运行,但我认为 Docker 与这个问题无关。但是,我遇到一个问题,管理模拟串行端口的应用程序被锁定;我相信它正在阻止对 的调用write
。我想知道是否有一种方法可以使调用不被阻止,或者检测它何时会被阻止。
我有一个Python应用程序,用于管理模拟串行端口并在它们之间执行广播(即,如果一个应用程序写入其串行端口,则网络中的所有其他串行端口都会收到写入的消息)。该应用程序看起来像这样:
#!/usr/bin/env python3
import os
import pty
import selectors
import signal
import tty
# read the configuration
all_apps = ...
selector = selectors.DefaultSelector()
# create the emulated serial ports
master_fds = { }
for app in all_apps:
app_master, app_slave = pty.openpty()
tty.setraw(app_master)
master_fds[app] = app_master
selector.register(app_master, selectors.EVENT_READ, app_master)
# make the symlink that gets picked up by the application
os.symlink(os.ttyname(app_slave), "/run/my-app/{}/ttyS0".format(app))
# exit on SIGINT or SIGTERM
run_flag = True
def signal_handler(sig, stack):
global run_flag
run_flag = False
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# main loop
while run_flag:
events = selector.select(timeout = 1)
for key,mask in events:
fd = key.data
# read up to 1024 bytes of data from the app that sent the message
data = os.read(fd, 1024)
for app_fd in master_fds.values():
# don't broadcast to the application that sent the message
if fd == app_fd:
continue
# send the message to each application
os.write(app_fd, data)
# cleanup
for key in master_fds.keys():
os.remove("/run/my-app/{}/ttyS0".format(key))
os.close(master_fds[key])
一般来说,这个脚本运行良好 - 当应用程序启动时,它们将 pty 从属设备作为串行端口,并在应用程序之间广播消息。然而,在某些时候,脚本可能会挂起 - 我相信它是在调用os.write
,只有在将应用程序发送到 pty master 之一时才会被调用。在这种状态下,脚本不会响应信号(因为 run_flag 仅在循环运行时检查,但os.write
会阻塞循环)。
对我来说唯一有意义的解释是至少有一个应用程序没有正确从 pty 的从属端读取数据。如果这是真的,那么我可以想象内核中支持 pty 的某些缓冲区正在变满,并且由于对 的调用write
会溢出缓冲区,因此调用会阻塞,直到缓冲区被充分耗尽(这永远不会发生)。
我发现有时可以通过cat
在端口的从属端运行来耗尽模拟串行端口。但是,我需要应用程序足够可靠,以便在调用os.write
通常会阻塞时不会阻塞,而无需人工干预。
有没有办法测量(例如,在我的 Python 脚本中)支持 pty 的缓冲区的剩余容量,以避免write
对 pty master 的调用阻塞?
答案1
我想我找到了一种解决方案,尽管我不知道它是否一定是理想的。
我们可以编辑脚本将 pty master 设置为非阻塞模式:
import fcntl
...
for app in all_apps:
app_master, app_slave = pty.openpty()
flags = fcntl.fcntl(app_master, fcntl.F_GETFL)
flags |= os.O_NONBLOCK
fnctl.fcntl(app_master, fcntl.F_SETFL, flags)
然后,每当我们向主机写入时,如果写入通常会阻塞,我们可以捕获BlockingIOError
以继续:
# (try to) send the message to each application
try:
os.write(app_fd, data)
except BlockingIOError:
print("Caught BlockingIOError")
这并不能解决根本问题,即应用程序可能行为不当并且无法读取其串行端口,但通过此解决方案,脚本至少可以继续完成其工作。代价是一些数据可能会丢失(广播到不耗尽其端口的应用程序)。