以毫秒为单位监控流量

以毫秒为单位监控流量

我有几台运行 Ubuntu 14.04 的 Linux 服务器,并将它们变成了 Spark 集群。此时,我想以毫秒为单位测量服务器之间的网络流量,以便执行一些推理和分析任务,用于研究目的。我尝试了以下操作,但没有成功:

1) 我从脚本中反复调用“iptables -nvx -L”并获取其输出,其中包含我想要的信息(数据包和字节数)。但是,调用它需要超过 1ms 的时间,因为我在 Python(使用子进程模块)和 Bash 脚本中都实现了它。

Python:

args = ['iptables','-nvx','-L','日志']

raw_traffic = sp.Popen('iptables -nvx -L Log', shell = True, stdout = sp.PIPE).stdout

(上述代码运行耗时1.2ms)

重击:

开始时间=$(日期+%s%N)

std_output=$(iptables -nvx -L 日志)

回显“(($(date + %s %N)-$start_time)/1000000)”| bc -l

输出为:

$ sudo sh foo.sh

1.17057700000000000000

2) 我尝试让 iptables 记录来自服务器 IP 地址的每个数据包。它确实达到了目的,因为它可以最详细地测量所有内容。但是,它会减慢流量并产生很大的开销。服务器之间的链接是 10G,因此全速时每个 IP 地址每秒会生成大约 1M 行日志,这是不可行的。我此时需要的信息是服务器之间每毫秒的数据包和字节数。

有没有 1ms 规模监控的变通方法?我想应该可以用 iptables 来实现,也许需要一些技巧。任何建议都非常感谢。提前致谢!

答案1

您实际上可以这样做,但您需要使用 NF_LOG iptables 目标并编写一个专门满足您需求的程序。

NF_LOG 将会发送数据包到接收应用程序,该应用程序(考虑到您完成操作的速度)将以每毫秒的级别对数​​据包进行计数。

您可以避免写出每个数据包并截断数据以确保您可以尝试获取所需的 100 万 pps。

然而问题是不存在这样的接收程序——您需要编写一个。

之前我对 NF_LOG 做过一些事情,而且您说 LOG 会减慢流量,我怀疑是将数据包写入磁盘导致了流量减少。

我仍然认为每秒一百万个数据包仍然是一个很难达到的标准。

另一种选择(非实时)是使用 tcpdump 来捕获数据包,然后作为后续步骤使用程序读取 pcap 文件并进行适当的毫秒计算。

答案2

不管你如何处理这些信息,你可能需要使用 C 等低级语言来提高性能。你甚至可能需要使用多线程。

我还没有尝试过,但是网络过滤器图书馆libnetfilter_log看起来很有希望。iptables本质上netfilter 命令行界面netfilter 是 Linux 内核中的包过滤器。所以这将是你的数据包过滤器的直接库。听起来很高效。

http://www.netfilter.org/projects/libnetfilter_log/index.html

什么是 libnetfilter_log?

libnetfilter_log 是一个用户空间库,为内核数据包过滤器记录的数据包提供接口。它是弃用旧版基于 syslog/dmesg 的数据包日志记录的系统的一部分。此库以前称为 libnfnetlink_log。

主要特点

从内核 nfnetlink_log 子系统接收要记录的数据包

以下是一些示例代码(示例复制自 netfilter 网站/还不错吧?)

00002 #include <stdio.h>
00003 #include <stdlib.h>
00004 #include <unistd.h>
00005 #include <netinet/in.h>
00006 
00007 #include <libnetfilter_log/libnetfilter_log.h>
00008 
00009 static int print_pkt(struct nflog_data *ldata)
00010 {
00011         struct nfulnl_msg_packet_hdr *ph = nflog_get_msg_packet_hdr(ldata);
00012         u_int32_t mark = nflog_get_nfmark(ldata);
00013         u_int32_t indev = nflog_get_indev(ldata);
00014         u_int32_t outdev = nflog_get_outdev(ldata);
00015         char *prefix = nflog_get_prefix(ldata);
00016         char *payload;
00017         int payload_len = nflog_get_payload(ldata, &payload);
00018         
00019         if (ph) {
00020                 printf("hw_protocol=0x%04x hook=%u ", 
00021                         ntohs(ph->hw_protocol), ph->hook);
00022         }
00023 
00024         printf("mark=%u ", mark);
00025 
00026         if (indev > 0)
00027                 printf("indev=%u ", indev);
00028 
00029         if (outdev > 0)
00030                 printf("outdev=%u ", outdev);
00031 
00032 
00033         if (prefix) {
00034                 printf("prefix=\"%s\" ", prefix);
00035         }
00036         if (payload_len >= 0)
00037                 printf("payload_len=%d ", payload_len);
00038 
00039         fputc('\n', stdout);
00040         return 0;
00041 }
00042 
00043 static int cb(struct nflog_g_handle *gh, struct nfgenmsg *nfmsg,
00044                 struct nflog_data *nfa, void *data)
00045 {
00046         print_pkt(nfa);
00047 }
00048 
00049 
00050 int main(int argc, char **argv)
00051 {
00052         struct nflog_handle *h;
00053         struct nflog_g_handle *qh;
00054         struct nflog_g_handle *qh100;
00055         int rv, fd;
00056         char buf[4096];
00057 
00058         h = nflog_open();
00059         if (!h) {
00060                 fprintf(stderr, "error during nflog_open()\n");
00061                 exit(1);
00062         }
00063 
00064         printf("unbinding existing nf_log handler for AF_INET (if any)\n");
00065         if (nflog_unbind_pf(h, AF_INET) < 0) {
00066                 fprintf(stderr, "error nflog_unbind_pf()\n");
00067                 exit(1);
00068         }
00069 
00070         printf("binding nfnetlink_log to AF_INET\n");
00071         if (nflog_bind_pf(h, AF_INET) < 0) {
00072                 fprintf(stderr, "error during nflog_bind_pf()\n");
00073                 exit(1);
00074         }
00075         printf("binding this socket to group 0\n");
00076         qh = nflog_bind_group(h, 0);
00077         if (!qh) {
00078                 fprintf(stderr, "no handle for grup 0\n");
00079                 exit(1);
00080         }
00081 
00082         printf("binding this socket to group 100\n");
00083         qh100 = nflog_bind_group(h, 100);
00084         if (!qh100) {
00085                 fprintf(stderr, "no handle for group 100\n");
00086                 exit(1);
00087         }
00088 
00089         printf("setting copy_packet mode\n");
00090         if (nflog_set_mode(qh, NFULNL_COPY_PACKET, 0xffff) < 0) {
00091                 fprintf(stderr, "can't set packet copy mode\n");
00092                 exit(1);
00093         }
00094 
00095         fd = nflog_fd(h);
00096 
00097         printf("registering callback for group 0\n");
00098         nflog_callback_register(qh, &cb, NULL);
00099 
00100         printf("going into main loop\n");
00101         while ((rv = recv(fd, buf, sizeof(buf), 0)) && rv >= 0) {
00102                 struct nlmsghdr *nlh;
00103                 printf("pkt received (len=%u)\n", rv);
00104 
00105                 /* handle messages in just-received packet */
00106                 nflog_handle_packet(h, buf, rv);
00107         }
00108 
00109         printf("unbinding from group 100\n");
00110         nflog_unbind_group(qh100);
00111         printf("unbinding from group 0\n");
00112         nflog_unbind_group(qh);
00113 
00114 #ifdef INSANE
00115         /* norally, applications SHOULD NOT issue this command,
00116          * since it detaches other programs/sockets from AF_INET, too ! */
00117         printf("unbinding from AF_INET\n");
00118         nflog_unbind_pf(h, AF_INET);
00119 #endif
00120 
00121         printf("closing handle\n");
00122         nflog_close(h);
00123 
00124         exit(0);
00125 }

相关内容