笔记:这是关于制片人检测消费者断开连接,而不是消费者检测到生产者已断开连接(又名 EOF)。生产者可能长时间不写入任何数据,但希望快速发现消费者已断开连接。
事件的一般顺序:
- 消费者(Wireshark)创建一个命名管道(使用
mkfifo
)并打开其读取端。 - 生产者(所谓的 Wireshark 外部捕获程序,又名 extcap)启动并传递命名管道的名称/路径。
- 生产者将写入端打开为 O_WRONLY 并首先将一些数据写入管道。
- 蟋蟀
- 用户按下 Wireshark 中的“停止”按钮,Wireshark 然后关闭其管道的读取端。
- ???尝试在生产者中检测消费者是否已与命名管道断开连接,即使生产者没有数据要发送???
Linux
在 Linux 上的生产者中,使用select
带有 fd 的系统调用作为读取 fd 集中命名管道的写入端,将返回写入端 fd 在消费者断开连接时变得可读。
苹果系统
然而,在macos上,命名管道的写端fd每当生产者写入数据时就变得可读(原文如此!)。确实如此不是在消费者断开连接后变得可读。
编辑:添加错误 fd 集不会改变情况;永远不会有错误 fd 设置。 /编辑
关于如何检测消费者与 macos 上的命名管道断开连接的任何想法,没有 SIGPIPE,因为可能很长一段时间没有写入,但用户已经停止了 Wireshark 记录?
生产者命名管道上的消费者断开连接检测
https://github.com/siemens/cshargextcap/blob/macos/pipe/checker_notwin.go
package pipe
import (
"os"
"golang.org/x/sys/unix"
log "github.com/sirupsen/logrus"
)
// WaitTillBreak continuously checks a fifo/pipe to see when it breaks. When
// called, WaitTillBreak blocks until the fifo/pipe finally has broken.
//
// This implementation leverages [syscall.Select].
func WaitTillBreak(fifo *os.File) {
log.Debug("constantly monitoring packet capture fifo status...")
fds := unix.FdSet{}
for {
// Check the fifo becomming readable, which signals that it has been
// closed. In this case, ex-termi-nate ;) Oh, and remember to correctly
// initialize the fdset each time before calling select() ... well, just
// because that's a good idea to do. :(
fds.Set(int(fifo.Fd()))
n, err := unix.Select(
int(fifo.Fd())+1, // highest fd is our file descriptor.
&fds, nil, nil, // only watch readable.
nil, // no timeout, ever.
)
if n != 0 || err != nil {
// Either the pipe was broken by Wireshark, or we did break it on
// purpose in the piping process. Anyway, we're done.
log.Debug("capture fifo broken, stopped monitoring.")
return
}
}
}
在 MacOS 上产生错误行为的单元测试
https://github.com/siemens/cshargextcap/blob/macos/pipe/checker_notwin_test.goWaitTillBreak
--在我们实际关闭命名管道的消费者端之前不得返回的断言失败。
package pipe
import (
"io"
"os"
"time"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
. "github.com/thediveo/success"
"golang.org/x/sys/unix"
)
var _ = Describe("pipes", func() {
It("detects on the write end when a pipe breaks", func() {
// As Wireshark uses a named pipe it passes an extcap its name (path)
// and then expects the extcap to open this named pipe for writing
// packet capture data into it. For this test we simulate Wireshark
// closing its reading end and we must properly detect this situation on
// our writing end of the pipe.
By("creating a temporary named pipe/fifo and opening its ends")
tmpfifodir := Successful(os.MkdirTemp("", "test-fifo-*"))
defer os.RemoveAll(tmpfifodir)
fifoname := tmpfifodir + "/fifo"
unix.Mkfifo(fifoname, 0660)
wch := make(chan *os.File)
go func() {
defer GinkgoRecover()
wch <- Successful(os.OpenFile(fifoname, os.O_WRONLY, os.ModeNamedPipe))
}()
rch := make(chan *os.File)
go func() {
defer GinkgoRecover()
rch <- Successful(os.OpenFile(fifoname, os.O_RDONLY, os.ModeNamedPipe))
}()
var r, w *os.File
Eventually(rch).Should(Receive(&r))
Eventually(wch).Should(Receive(&w))
defer w.Close()
go func() {
defer GinkgoRecover()
By("continously draining the read end of the pipe into /dev/null")
null := Successful(os.OpenFile("/dev/null", os.O_WRONLY, 0))
defer null.Close()
io.Copy(null, r)
By("pipe draining done")
}()
go func() {
defer GinkgoRecover()
time.Sleep(2 * time.Second)
By("closing read end of pipe")
Expect(r.Close()).To(Succeed())
}()
go func() {
defer GinkgoRecover()
time.Sleep(300 * time.Microsecond)
By("writing some data into the pipe")
w.WriteString("Wireshark rulez")
}()
By("waiting for pipe to break")
start := time.Now()
WaitTillBreak(w)
Expect(time.Since(start).Milliseconds()).To(
BeNumerically(">", 1900), "pipe wasn't broken yet")
})
})
基于民意调查的版本
这在 macOS 上也不起作用,永远不会返回任何POLLERR
.不过,它在 Linux 上确实可以正常工作。
package pipe
import (
"os"
"golang.org/x/sys/unix"
log "github.com/sirupsen/logrus"
)
// WaitTillBreak continuously checks a fifo/pipe to see when it breaks. When
// called, WaitTillBreak blocks until the fifo/pipe finally has broken.
//
// This implementation leverages [unix.Poll].
func WaitTillBreak(fifo *os.File) {
log.Debug("constantly monitoring packet capture fifo status...")
fds := []unix.PollFd{
{
Fd: int32(fifo.Fd()),
Events: 0,
},
}
for {
// Check the fifo becomming readable, which signals that it has been
// closed. In this case, ex-termi-nate ;) Oh, and remember to correctly
// initialize the fdset each time before calling select() ... well, just
// because that's a good idea to do. :(
n, err := unix.Poll(fds, 1000 /*ms*/)
if err != nil {
if err == unix.EINTR {
continue
}
log.Debugf("capture fifo broken, reason: %s", err.Error())
return
}
if n <= 0 {
continue
}
log.Debugf("poll: %+v", fds)
if fds[0].Revents&unix.POLLERR != 0 {
// Either the pipe was broken by Wireshark, or we did break it on
// purpose in the piping process. Anyway, we're done.
log.Debug("capture fifo broken, stopped monitoring.")
return
}
}
}