我有几台运行 Ubuntu 14.04 的 Linux 服务器,并将它们变成了 Spark 集群。此时,我想以毫秒为单位测量服务器之间的网络流量,以便执行一些推理和分析任务,用于研究目的。我尝试了以下操作,但没有成功:
1) 我从脚本中反复调用“iptables -nvx -L”并获取其输出,其中包含我想要的信息(数据包和字节数)。但是,调用它需要超过 1ms 的时间,因为我在 Python(使用子进程模块)和 Bash 脚本中都实现了它。
Python:
args = ['iptables','-nvx','-L','日志']
raw_traffic = sp.Popen('iptables -nvx -L Log', shell = True, stdout = sp.PIPE).stdout
(上述代码运行耗时1.2ms)
重击:
开始时间=$(日期+%s%N)
std_output=$(iptables -nvx -L 日志)
回显“(($(date + %s %N)-$start_time)/1000000)”| bc -l
输出为:
$ sudo sh foo.sh
1.17057700000000000000
2) 我尝试让 iptables 记录来自服务器 IP 地址的每个数据包。它确实达到了目的,因为它可以最详细地测量所有内容。但是,它会减慢流量并产生很大的开销。服务器之间的链接是 10G,因此全速时每个 IP 地址每秒会生成大约 1M 行日志,这是不可行的。我此时需要的信息是服务器之间每毫秒的数据包和字节数。
有没有 1ms 规模监控的变通方法?我想应该可以用 iptables 来实现,也许需要一些技巧。任何建议都非常感谢。提前致谢!
答案1
您实际上可以这样做,但您需要使用 NF_LOG iptables 目标并编写一个专门满足您需求的程序。
NF_LOG 将会发送数据包到接收应用程序,该应用程序(考虑到您完成操作的速度)将以每毫秒的级别对数据包进行计数。
您可以避免写出每个数据包并截断数据以确保您可以尝试获取所需的 100 万 pps。
然而问题是不存在这样的接收程序——您需要编写一个。
之前我对 NF_LOG 做过一些事情,而且您说 LOG 会减慢流量,我怀疑是将数据包写入磁盘导致了流量减少。
我仍然认为每秒一百万个数据包仍然是一个很难达到的标准。
另一种选择(非实时)是使用 tcpdump 来捕获数据包,然后作为后续步骤使用程序读取 pcap 文件并进行适当的毫秒计算。
答案2
不管你如何处理这些信息,你可能需要使用 C 等低级语言来提高性能。你甚至可能需要使用多线程。
我还没有尝试过,但是网络过滤器图书馆libnetfilter_log
看起来很有希望。iptables本质上netfilter 命令行界面和netfilter 是 Linux 内核中的包过滤器。所以这将是你的数据包过滤器的直接库。听起来很高效。
http://www.netfilter.org/projects/libnetfilter_log/index.html
什么是 libnetfilter_log?
libnetfilter_log 是一个用户空间库,为内核数据包过滤器记录的数据包提供接口。它是弃用旧版基于 syslog/dmesg 的数据包日志记录的系统的一部分。此库以前称为 libnfnetlink_log。
主要特点
从内核 nfnetlink_log 子系统接收要记录的数据包
以下是一些示例代码(示例复制自 netfilter 网站/还不错吧?)
00002 #include <stdio.h>
00003 #include <stdlib.h>
00004 #include <unistd.h>
00005 #include <netinet/in.h>
00006
00007 #include <libnetfilter_log/libnetfilter_log.h>
00008
00009 static int print_pkt(struct nflog_data *ldata)
00010 {
00011 struct nfulnl_msg_packet_hdr *ph = nflog_get_msg_packet_hdr(ldata);
00012 u_int32_t mark = nflog_get_nfmark(ldata);
00013 u_int32_t indev = nflog_get_indev(ldata);
00014 u_int32_t outdev = nflog_get_outdev(ldata);
00015 char *prefix = nflog_get_prefix(ldata);
00016 char *payload;
00017 int payload_len = nflog_get_payload(ldata, &payload);
00018
00019 if (ph) {
00020 printf("hw_protocol=0x%04x hook=%u ",
00021 ntohs(ph->hw_protocol), ph->hook);
00022 }
00023
00024 printf("mark=%u ", mark);
00025
00026 if (indev > 0)
00027 printf("indev=%u ", indev);
00028
00029 if (outdev > 0)
00030 printf("outdev=%u ", outdev);
00031
00032
00033 if (prefix) {
00034 printf("prefix=\"%s\" ", prefix);
00035 }
00036 if (payload_len >= 0)
00037 printf("payload_len=%d ", payload_len);
00038
00039 fputc('\n', stdout);
00040 return 0;
00041 }
00042
00043 static int cb(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg,
00044 struct nflog_data *nfa, void *data)
00045 {
00046 print_pkt(nfa);
00047 }
00048
00049
00050 int main(int argc, char **argv)
00051 {
00052 struct nflog_handle *h;
00053 struct nflog_g_handle *qh;
00054 struct nflog_g_handle *qh100;
00055 int rv, fd;
00056 char buf[4096];
00057
00058 h = nflog_open();
00059 if (!h) {
00060 fprintf(stderr, "error during nflog_open()\n");
00061 exit(1);
00062 }
00063
00064 printf("unbinding existing nf_log handler for AF_INET (if any)\n");
00065 if (nflog_unbind_pf(h, AF_INET) < 0) {
00066 fprintf(stderr, "error nflog_unbind_pf()\n");
00067 exit(1);
00068 }
00069
00070 printf("binding nfnetlink_log to AF_INET\n");
00071 if (nflog_bind_pf(h, AF_INET) < 0) {
00072 fprintf(stderr, "error during nflog_bind_pf()\n");
00073 exit(1);
00074 }
00075 printf("binding this socket to group 0\n");
00076 qh = nflog_bind_group(h, 0);
00077 if (!qh) {
00078 fprintf(stderr, "no handle for grup 0\n");
00079 exit(1);
00080 }
00081
00082 printf("binding this socket to group 100\n");
00083 qh100 = nflog_bind_group(h, 100);
00084 if (!qh100) {
00085 fprintf(stderr, "no handle for group 100\n");
00086 exit(1);
00087 }
00088
00089 printf("setting copy_packet mode\n");
00090 if (nflog_set_mode(qh, NFULNL_COPY_PACKET, 0xffff) < 0) {
00091 fprintf(stderr, "can't set packet copy mode\n");
00092 exit(1);
00093 }
00094
00095 fd = nflog_fd(h);
00096
00097 printf("registering callback for group 0\n");
00098 nflog_callback_register(qh, &cb, NULL);
00099
00100 printf("going into main loop\n");
00101 while ((rv = recv(fd, buf, sizeof(buf), 0)) && rv >= 0) {
00102 struct nlmsghdr *nlh;
00103 printf("pkt received (len=%u)\n", rv);
00104
00105 /* handle messages in just-received packet */
00106 nflog_handle_packet(h, buf, rv);
00107 }
00108
00109 printf("unbinding from group 100\n");
00110 nflog_unbind_group(qh100);
00111 printf("unbinding from group 0\n");
00112 nflog_unbind_group(qh);
00113
00114 #ifdef INSANE
00115 /* norally, applications SHOULD NOT issue this command,
00116 * since it detaches other programs/sockets from AF_INET, too ! */
00117 printf("unbinding from AF_INET\n");
00118 nflog_unbind_pf(h, AF_INET);
00119 #endif
00120
00121 printf("closing handle\n");
00122 nflog_close(h);
00123
00124 exit(0);
00125 }