命名管道(mkfifo):如何在macos上检测读取端(消费者)已断开连接,而消费者没有写入写入端?

命名管道(mkfifo):如何在macos上检测读取端(消费者)已断开连接,而消费者没有写入写入端?

笔记:这是关于制片人检测消费者断开连接,而不是消费者检测到生产者已断开连接(又名 EOF)。生产者可能长时间不写入任何数据,但希望快速发现消费者已断开连接。

事件的一般顺序:

  1. 消费者(Wireshark)创建一个命名管道(使用mkfifo)并打开其读取端。
  2. 生产者(所谓的 Wireshark 外部捕获程序,又名 extcap)启动并传递命名管道的名称/路径。
  3. 生产者将写入端打开为 O_WRONLY 并首先将一些数据写入管道。
  4. 蟋蟀
  5. 用户按下 Wireshark 中的“停止”按钮,Wireshark 然后关闭其管道的读取端。
  6. ???尝试在生产者中检测消费者是否已与命名管道断开连接,即使生产者没有数据要发送???

Linux

在 Linux 上的生产者中,使用select带有 fd 的系统调用作为读取 fd 集中命名管道的写入端,将返回写入端 fd 在消费者断开连接时变得可读。

苹果系统

然而,在macos上,命名管道的写端fd每当生产者写入数据时就变得可读(原文如此!)。确实如此不是在消费者断开连接后变得可读。

编辑:添加错误 fd 集不会改变情况;永远不会有错误 fd 设置。 /编辑

关于如何检测消费者与 macos 上的命名管道断开连接的任何想法,没有 SIGPIPE,因为可能很长一段时间没有写入,但用户已经停止了 Wireshark 记录?

生产者命名管道上的消费者断开连接检测

https://github.com/siemens/cshargextcap/blob/macos/pipe/checker_notwin.go

package pipe

import (
    "os"

    "golang.org/x/sys/unix"

    log "github.com/sirupsen/logrus"
)

// WaitTillBreak continuously checks a fifo/pipe to see when it breaks. When
// called, WaitTillBreak blocks until the fifo/pipe finally has broken.
//
// This implementation leverages [syscall.Select].
func WaitTillBreak(fifo *os.File) {
    log.Debug("constantly monitoring packet capture fifo status...")
    fds := unix.FdSet{}
    for {
        // Check the fifo becomming readable, which signals that it has been
        // closed. In this case, ex-termi-nate ;) Oh, and remember to correctly
        // initialize the fdset each time before calling select() ... well, just
        // because that's a good idea to do. :(
        fds.Set(int(fifo.Fd()))
        n, err := unix.Select(
            int(fifo.Fd())+1, // highest fd is our file descriptor.
            &fds, nil, nil,   // only watch readable.
            nil, // no timeout, ever.
        )
        if n != 0 || err != nil {
            // Either the pipe was broken by Wireshark, or we did break it on
            // purpose in the piping process. Anyway, we're done.
            log.Debug("capture fifo broken, stopped monitoring.")
            return
        }
    }
}

在 MacOS 上产生错误行为的单元测试

https://github.com/siemens/cshargextcap/blob/macos/pipe/checker_notwin_test.goWaitTillBreak--在我们实际关闭命名管道的消费者端之前不得返回的断言失败。

package pipe

import (
    "io"
    "os"
    "time"

    . "github.com/onsi/ginkgo/v2"
    . "github.com/onsi/gomega"
    . "github.com/thediveo/success"
    "golang.org/x/sys/unix"
)

var _ = Describe("pipes", func() {

    It("detects on the write end when a pipe breaks", func() {
        // As Wireshark uses a named pipe it passes an extcap its name (path)
        // and then expects the extcap to open this named pipe for writing
        // packet capture data into it. For this test we simulate Wireshark
        // closing its reading end and we must properly detect this situation on
        // our writing end of the pipe.
        By("creating a temporary named pipe/fifo and opening its ends")
        tmpfifodir := Successful(os.MkdirTemp("", "test-fifo-*"))
        defer os.RemoveAll(tmpfifodir)

        fifoname := tmpfifodir + "/fifo"
        unix.Mkfifo(fifoname, 0660)
        wch := make(chan *os.File)
        go func() {
            defer GinkgoRecover()
            wch <- Successful(os.OpenFile(fifoname, os.O_WRONLY, os.ModeNamedPipe))
        }()

        rch := make(chan *os.File)
        go func() {
            defer GinkgoRecover()
            rch <- Successful(os.OpenFile(fifoname, os.O_RDONLY, os.ModeNamedPipe))
        }()

        var r, w *os.File
        Eventually(rch).Should(Receive(&r))
        Eventually(wch).Should(Receive(&w))
        defer w.Close()

        go func() {
            defer GinkgoRecover()
            By("continously draining the read end of the pipe into /dev/null")
            null := Successful(os.OpenFile("/dev/null", os.O_WRONLY, 0))
            defer null.Close()
            io.Copy(null, r)
            By("pipe draining done")
        }()

        go func() {
            defer GinkgoRecover()
            time.Sleep(2 * time.Second)
            By("closing read end of pipe")
            Expect(r.Close()).To(Succeed())
        }()

        go func() {
            defer GinkgoRecover()
            time.Sleep(300 * time.Microsecond)
            By("writing some data into the pipe")
            w.WriteString("Wireshark rulez")
        }()

        By("waiting for pipe to break")
        start := time.Now()
        WaitTillBreak(w)
        Expect(time.Since(start).Milliseconds()).To(
            BeNumerically(">", 1900), "pipe wasn't broken yet")
    })

})

基于民意调查的版本

这在 macOS 上也不起作用,永远不会返回任何POLLERR.不过,它在 Linux 上确实可以正常工作。

package pipe

import (
    "os"

    "golang.org/x/sys/unix"

    log "github.com/sirupsen/logrus"
)

// WaitTillBreak continuously checks a fifo/pipe to see when it breaks. When
// called, WaitTillBreak blocks until the fifo/pipe finally has broken.
//
// This implementation leverages [unix.Poll].
func WaitTillBreak(fifo *os.File) {
    log.Debug("constantly monitoring packet capture fifo status...")
    fds := []unix.PollFd{
        {
            Fd:     int32(fifo.Fd()),
            Events: 0,
        },
    }
    for {
        // Check the fifo becomming readable, which signals that it has been
        // closed. In this case, ex-termi-nate ;) Oh, and remember to correctly
        // initialize the fdset each time before calling select() ... well, just
        // because that's a good idea to do. :(
        n, err := unix.Poll(fds, 1000 /*ms*/)
        if err != nil {
            if err == unix.EINTR {
                continue
            }
            log.Debugf("capture fifo broken, reason: %s", err.Error())
            return
        }
        if n <= 0 {
            continue
        }
        log.Debugf("poll: %+v", fds)
        if fds[0].Revents&unix.POLLERR != 0 {
            // Either the pipe was broken by Wireshark, or we did break it on
            // purpose in the piping process. Anyway, we're done.
            log.Debug("capture fifo broken, stopped monitoring.")
            return
        }
    }
}

相关内容