处理套接字的更传统方法是每个连接都有一个缓冲区,并且一旦套接字变得可读,就增量读取尽可能多的字节。
对于通过 TCP 运行的框架协议,更有效的方法是利用内核已经为每个连接缓冲数据的事实。 Linux 提供了 SO_RCVLOWAT 标志来防止 poll/select/epoll 将套接字标记为可读,直到获得所需的字节数为止。这可以与 FIONREAD ioctl 结合使用,以读取有多少字节可立即用于消耗,以防止处理循环中的部分读取。在这种结构下,可以立即将整个帧读入单个共享缓冲区(大小为最大帧大小)并就地处理,准备好被下一帧覆盖。
但是,我的实现中断了。即使 FIONREAD 报告的值小于 SO_RCVLOWAT,Epoll 也会开始触发 READ 事件。使用 MSG_PEEK 调用 recv() 返回一个与 FIONREAD 匹配的值,因此它们一致认为套接字确实不是至少有 SO_RCVLOWAT 字节可立即读取,因此不应被视为可由 epoll 读取。
这是我的实现,以及一些重现该问题的示例服务器/客户端代码:https://github.com/MrSonicMaster/broken
尤其:
static void handle_reads(proto_state *s) {
uint32_t bav;
ioctl(s->fd, FIONREAD, &bav);
uint32_t lowat;
getsockopt(s->fd, SOL_SOCKET, SO_RCVLOWAT, &lowat,
&(socklen_t){sizeof lowat});
printf("EPOLL FIRED READ EVENT FIONREAD=%d SO_RCVLOWAT=%d\n", bav, lowat);
if (bav < lowat) {
/* debug code */
#define CAP (1 << 15)
uint8_t *largebuf = alloca(CAP);
ssize_t recvd = recv(s->fd, largebuf, CAP, MSG_PEEK);
printf(
"is FIONREAD lying? actual bav via recv with MSG_PEEK = %ld (cap %d)\n",
recvd, CAP);
}
printf("BAV %d NEED %d\n", bav, s->hdr.len);
msghdr hdr = s->hdr;
while (bav >= hdr.len) {
ssize_t read_bytes = read(s->fd, rbuffer, hdr.len);
if (read_bytes != hdr.len) {
fprintf(stderr, "WTF HOW? BROKE!\n");
break;
}
bav -= read_bytes;
if (s->need_hdr) {
hdr = *(msghdr *)rbuffer;
if (hdr.len > 16384) {
fprintf(stderr, "msg too large %d\n", hdr.len);
close(s->fd);
handle_close(s);
return;
}
// printf("READ HEADER, CODE %d LEN %d (udat %d)\n", hdr.code, hdr.len,
// hdr.udat);
if (hdr.len == 0) {
/* handle zero-length message */
s->frame_cb(s, hdr, NULL);
hdr.len = sizeof(msghdr);
continue;
}
s->need_hdr = 0;
} else {
// printf("READ FRAME, CODE %d LEN %d\n", hdr.code, hdr.len);
s->frame_cb(s, hdr, rbuffer);
hdr.len = sizeof(msghdr);
s->need_hdr = 1;
}
}
s->hdr = hdr;
out_setlowat:
if (hdr.len != s->lowat) {
setsockopt(s->fd, SOL_SOCKET, SO_RCVLOWAT, &hdr.len, sizeof hdr.len);
s->lowat = hdr.len;
printf("SET lowat %d\n", s->lowat);
}
}
...
void proto_loop() {
int nevents = epoll_wait(ep, events, MAX_EVENTS, 0);
if (nevents == -1) {
perror("proto_loop() epoll_wait()");
return;
}
for (int i = 0; i < nevents; i++) {
struct epoll_event event = events[i];
void *ptr = (void *)(((uintptr_t)event.data.ptr) & ~1);
// printf("EVENT %d %p\n", event.events, ptr);
if (ptr != event.data.ptr) {
listen_desc *d = ptr;
handle_accept(d->fd, d->cb);
return;
}
if (event.events & EPOLLERR || event.events & EPOLLHUP ||
event.events & EPOLLRDHUP)
handle_close(ptr);
else {
if (event.events & EPOLLIN)
handle_reads(ptr);
if (event.events & EPOLLOUT)
handle_writes(ptr);
}
}
}
在我尝试过的所有配置下,最终都会卡住:
EPOLL FIRED READ EVENT FIONREAD=29103 SO_RCVLOWAT=14764
BAV 29103 NEED 14764
got frame with code 0 len 14764
got frame with code 0 len 5232
SET lowat 9647
EPOLL FIRED READ EVENT FIONREAD=9083 SO_RCVLOWAT=9647
is FIONREAD lying? actual bav via recv with MSG_PEEK = 9083 (cap 32768)
EPOLL FIRED READ EVENT FIONREAD=9083 SO_RCVLOWAT=9647
is FIONREAD lying? actual bav via recv with MSG_PEEK = 9083 (cap 32768)
EPOLL FIRED READ EVENT FIONREAD=9083 SO_RCVLOWAT=9647
is FIONREAD lying? actual bav via recv with MSG_PEEK = 9083 (cap 32768)
EPOLL FIRED READ EVENT FIONREAD=9083 SO_RCVLOWAT=9647
is FIONREAD lying? actual bav via recv with MSG_PEEK = 9083 (cap 32768)
...
我使用级别触发的 epoll,并通过强制缓冲区为静态大小来禁用内核缓冲区自动调整。我将它们设置的大小似乎并不重要(只要它足够大以容纳整个帧而不会完全关闭拥塞窗口,例如> 2x最大帧大小)
另一个值得注意的事情是,关闭服务器不会在客户端上生成 EPOLLRDHUP 事件。
不幸的是,我似乎没有找到其他人这样做的例子,所以我不知道它是否会起作用。
答案1
所以,我实际上不久前就解决了这个问题,但没有抽出时间发布答案。
当非阻塞 Linux 套接字耗尽缓冲区空间时,它将违反 SO_RCVLOWAT 设置,并且无论如何都会被标记为可读。此行为没有在任何地方记录。唯一提到的是对 Linux 内核源代码树的一些提交。我最初并不认为是这种情况,因为显示问题的代码不应该遇到这个问题 - 我相信这是由在 UNIX 域套接字上进行的额外未记录的优化引起的,导致发送者 SNDBUF 通过管道传输到接收者 RCVBUF一种将批量发送发送到比一次可以容纳的更大的块中的方式(大于单个发送应有的大小)。我可以确认这是问题所在,因为当违反 SO_RCVLOWAT 时,使用 SO_ERROR 调用 getsockopt() 会返回 ENOBUFS。
在 UNIX 域套接字上实现此操作的解决方案是确保接收方 RCVBUF 足够大,能够容纳比整个发送方 SNDBUF 多的内容,这样 Linux 内部进行的任何优化/批处理都不会破坏程序逻辑。