Inter-process communication over TCP 127.0.0.1 on the same node is very slow

TCP communication over 127.0.0.1 or the eth IP (e.g. 10.10.253.12) on the same host is very slow. The server listens on 0.0.0.0:2000 and the client connects to 127.0.0.1:2000 or the local eth IP 10.10.253.12:2000, yet the client-server transfer rate is only about 100 KB per second. A C program using libevent and a Java program using Netty behave the same way. The programs work as follows:

  1. The server accepts connections and echoes back everything it receives.
  2. The client sends an arbitrary 128-byte payload, and sends another 128 bytes whenever the socket becomes writable; whatever it receives is read and discarded.

This client/server pair works fine when run on different machines, reaching about 30 MB per second.

However, a ZeroMQ pair communicating over 127.0.0.1 has no such problem.
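
For reference, a loopback comparison with ZeroMQ can be as small as the sketch below. The ZMQ_PAIR pattern, port 2001, and the synchronous echo loop are assumptions for illustration, not the original test program:

---zeromq pair comparison (sketch)
#include <zmq.h>
#include <string.h>

int main(void) {
    /* One process, two ZMQ_PAIR sockets talking over loopback TCP;
     * the port and message count are arbitrary. */
    void *ctx = zmq_ctx_new();
    void *srv = zmq_socket(ctx, ZMQ_PAIR);
    void *cli = zmq_socket(ctx, ZMQ_PAIR);
    char buf[128];
    memset(buf, 'x', sizeof(buf));

    zmq_bind(srv, "tcp://127.0.0.1:2001");
    zmq_connect(cli, "tcp://127.0.0.1:2001");

    for (long i = 0; i < 1000000; i++) {
        zmq_send(cli, buf, sizeof(buf), 0);   /* client sends 128 bytes    */
        zmq_recv(srv, buf, sizeof(buf), 0);   /* server receives them ...  */
        zmq_send(srv, buf, sizeof(buf), 0);   /* ... and echoes them back  */
        zmq_recv(cli, buf, sizeof(buf), 0);   /* client reads and discards */
    }

    zmq_close(cli);
    zmq_close(srv);
    zmq_ctx_destroy(ctx);
    return 0;
}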

The server-side code is:

---start listener
struct evconnlistener *listener = evconnlistener_new_bind(leader,
    listener_cb, NULL,
    LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, s_backlog, &addr,
    addrlen);
if (!listener) {
    logit("Could not create a listener!");
    return 1;
}
int fd = evconnlistener_get_fd(listener);
int keepAlive = 0; // a non-zero value enables SO_KEEPALIVE; 0 leaves it disabled
setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepAlive, sizeof(keepAlive));
do{
    if (event_base_loop(leader, EVLOOP_NO_EXIT_ON_EMPTY) < 0){
        break;
    }
}while(!event_base_got_exit(leader));

---connect processing
static void listener_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sa, int socklen, void *user_data) {
    if (s_rcvbufsize > 0){
        setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *)&s_rcvbufsize, sizeof(s_rcvbufsize));
    }
    if (s_sndbufsize > 0){
        setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *)&s_sndbufsize, sizeof(s_sndbufsize));
    }
    setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char*)&s_tcpnodelay, sizeof(s_tcpnodelay));
    int keepAlive = 0;    // a non-zero value enables SO_KEEPALIVE; 0 leaves it disabled
    setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepAlive, sizeof(keepAlive));

    struct bufferevent *bev = bufferevent_socket_new(s_worker, fd, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_THREADSAFE);
    if (!bev) {
        logit("Error constructing bufferevent!");
        evutil_closesocket(fd);
        return;
    }
    bufferevent_setcb(bev, conn_readcb, conn_writecb, conn_eventcb, NULL);
    bufferevent_enable(bev, EV_READ);
}

---read/write processing
static void conn_writecb(struct bufferevent *bev, void *user_data) {
}
static void conn_readcb(struct bufferevent *bev, void *user_data) {
    struct evbuffer *input = bufferevent_get_input(bev);
    int len = evbuffer_get_length(input);

    struct evbuffer *output = bufferevent_get_output(bev);
    evbuffer_add_buffer(output, input);
}

The client-side code is:

---init connection
struct bufferevent* bev= bufferevent_socket_new(s_event_base, -1, BEV_OPT_CLOSE_ON_FREE|BEV_OPT_THREADSAFE);
if (!bev){
    return 1;
}
struct timeval tv;
tv.tv_sec = 30; //connect timeout
tv.tv_usec = 0;
bufferevent_set_timeouts(bev, NULL, &tv);
bufferevent_setcb(bev, NULL, NULL, connect_eventcb, (void*)s_event_base);
int flag = bufferevent_socket_connect(bev, &s_target_sockaddr, s_target_socklen);
if (-1 == flag ){
    bufferevent_free(bev);
    return 1;
}

---connected processing
static void connect_eventcb(struct bufferevent *bev, short events, void *user_data) {
    if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)){
        bufferevent_free(bev);
    }else if (events & BEV_EVENT_CONNECTED) {
        int fd = bufferevent_getfd(bev);
        if (s_sorcvbufsize > 0){
            setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *)&s_sorcvbufsize, sizeof(s_sorcvbufsize));
        }
        if (s_sosndbufsize > 0){
            setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *)&s_sosndbufsize, sizeof(s_sosndbufsize));
        }
        setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char*)&s_tcpnodelay, sizeof(s_tcpnodelay));
        int keepAlive = 0;    // a non-zero value enables SO_KEEPALIVE; 0 leaves it disabled
        setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&keepAlive, sizeof(keepAlive));

        bufferevent_setwatermark(bev, EV_WRITE, s_snd_wmark_l, s_snd_wmark_h);
        bufferevent_setcb(bev, conn_readcb, conn_writecb, conn_eventcb, NULL);

        bufferevent_enable(bev, EV_READ|EV_WRITE);
        bufferevent_trigger(bev, EV_WRITE, BEV_TRIG_IGNORE_WATERMARKS|BEV_TRIG_DEFER_CALLBACKS);
    }
}

---read/write processing
static void conn_writecb(struct bufferevent *bev, void *user_data) {
    struct evbuffer *output = bufferevent_get_output(bev);
    for (int len = evbuffer_get_length(output); len < s_snd_wmark_h; len += s_sendsize){
        if (0 != bufferevent_write(bev, s_send_buf, s_sendsize)){
            break;
        }
    }
}
static void conn_readcb(struct bufferevent *bev, void *user_data) {
    struct evbuffer *input = bufferevent_get_input(bev);
    evbuffer_drain(input, 0x7FFFFFFF);
}

A tshark capture shows many keep-alive probes (KeepAliveReq) no matter how SO_KEEPALIVE is set:

[tshark capture 1]

[tshark capture 2]

Answer 1

I have now tested this and solved it: the main cause was that the server-side send buffer was quite small (8 KB), and the receive buffer likewise, which caused the server's sends to back up.

When I increased both buffer sizes to 32 KB, the problem disappeared.
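
Sketched against the code above, the fix amounts to setting both socket buffers to 32 KB on each connection's fd before data starts flowing; in the question's code this is equivalent to setting s_sndbufsize / s_rcvbufsize (and s_sosndbufsize / s_sorcvbufsize on the client) to 32768. The helper name below is illustrative:

---buffer size fix (sketch)
#include <sys/socket.h>

/* Apply the 32 KB send/receive buffers reported to fix the stall. */
static void set_32k_buffers(int fd) {
    int bufsize = 32 * 1024;
    setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *)&bufsize, sizeof(bufsize));
    setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *)&bufsize, sizeof(bufsize));
}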
