目标
中央存储和分析性能数据的方式:
- CPU负载
- 内存使用情况
- ...
当前策略
我想实现这样的设置:
- 收集
- 日志存储
- elasticsearch
- 基巴纳
就像这里解释的那样: https://mtalavera.wordpress.com/2015/02/16/monitoring-with-collectd-and-kibana/
问题:远程主机无法推送数据
限制:
- 我们只有
ssh
从中央服务器到远程主机。 ssh
由于网络设置(不幸的是我无法改变),从远程主机到中央服务器不起作用。- 网络流量跨越多个非公共网络。每月两次,由于管理员玩弄防火墙规则,主机无法访问。我不想丢失任何一行。这就是为什么我想将日志存储在远程主机上并获取(压缩)数据。
解决方案?
我如何才能获取每小时的数据?
答案1
对于上面列出的问题,您需要在远程端缓冲统计数据,以免丢失任何信息。
有很多方法可以做到这一点,但没有一种过于简单,需要进行大量测试才能确保它们可行。它们都涉及在collectd
本地编写 的输出,然后使用某种方法将其发送到中央服务器上。
我还没有测试过以下任何内容,因此有些可能根本不起作用。
以下不分难易程度:
套接字/网络输出到脚本
将输出写入Collectd
套接字或 IP/端口,其中 PHP/Perl/Python/Bash 脚本正在监听以将命令写入文件。然后,这些文件可以被推送到/拉取到中央服务器并被 Logstash 提取。
优点:用于捕获输出的简单脚本;使用标准 Linux 命令
缺点:如果要提取大量统计数据,则不可扩展;需要维护脚本;不确定 LS 是否会处理普通协议Redis / AMQP / Kafka / MongoDB 将 的输出写入
Collectd
可能的“缓冲区”之一。它们的工作方式略有不同,并且具有不同的部署选项,因此我将留给您来决定哪个是最好的,因为这超出了这个问题的范围。也就是说,它们中的任何一个都应该有效。然后,您需要一种方法将缓冲溶液中的数据返回到中央服务器。应用程序本机复制/镜像/群集或每隔 X 间隔运行一次以传送数据的脚本(在任一端运行)是两种可能性。
优点:非常灵活的部署选项;应该可以很好地扩展;使用众所周知的工具/程序
缺点:缓冲程序可能需要大量资源,或者安装了许多软件包套接字/网络输出至 Logstash 这几乎与选项 1 相同,但不是输出
collectd
到脚本/程序,而是将其写入每个远程主机上的本地 Logstash 实例。然后,Logstash 会在本地写入 CSV/JSON,您可以使用任何方式将这些文件返回到中央服务器,包括 LS 本身。
优点:一套用于整个解决方案的工具;提供一种在边缘转换数据然后集中提取的方法;移动部件很少 缺点:需要所有远程主机上都有 Java/LS
除了每个选项的优缺点之外,它们唯一的共同缺点是您需要找到一种方法来维护所有服务器上的一致配置。如果您有很多远程节点(或者一般来说有很多节点),您可能已经有一个配置管理系统,这将是微不足道的。
答案2
编辑:注意!警告!
请使用这个docker-compose
而不是我链接的那个(它确实需要docker
并且compose
可能,machine
但它为您做了更多,您将需要更少的努力。
凯尔特语:https://github.com/codenamekt/celk-docker-compose/blob/master/logstash/logstash.conf
还
因此,请从这里开始,对工作系统有一个大致的了解。他们已经为您完成了部分工作,因此您只需担心与配置和部署相关的问题。
即使你最终没有使用 Docker,这仍然会让你走上成功之路和还有一个额外的好处,就是可以向你展示它们是如何组合在一起的。
首先获取 Vagrant 并使用 vagrant up 构建 Vagrant 镜像
如果你不知道 Vagrant 是什么,它真的很棒。它是一个允许人们共享整套虚拟机和配置器的程序,这样你就可以只定义一个虚拟机及其配置,而不必共享整个虚拟机,而且它“可以正常工作”。它感觉很神奇,但它实际上只是可靠的系统工作。
- (使用上面的链接访问 CELK):https://github.com/pblittle/docker-logstash
您需要安装 Vagrant 才能使用它。只需安装即可!然后,您无需安装,docker
因为它将在 Vagrant VM 上运行。
您有四种选择来使用它,但首先,使用粗体命令准备好 Vagrant....
vagrant up
决定需要运行哪些程序
您的选择是:
- 全套套件或 ELK(elasticsearch、logstash、kibana)
- 仅限代理(Logstash 收集器)
- 仅限 Kibana
还有其他可用配置,但仅供测试。
开演时间
现在是时候配置 Logstash 了,这实际上是唯一具有复杂行为的部分。
Logstash 配置文件是纯文本文件,以 结尾conf
,并且可以选择使用tar
或 gunzip组合在一起gz
。
您可以通过以下两种方式之一获取配置文件:
- 你可以从互联网上下载它们,使用环境变量
LOGSTASH_CONFIG_URL
指向你的配置的 URL,并且**如果你输入了错误的 URL 或者出现了一些问题,导致无法从 URL 获取配置,那么它会返回到一个已知的 URL,否则 - 从磁盘读取它们,因为这是docker,你实际上会创建一个体积一次(现在),每次运行容器时你都会挂载该卷。
当您使用来自互联网的配置运行时,它看起来是这样的:
$ docker run -d \
-e LOGSTASH_CONFIG_URL=https://secretlogstashesstash.com/myconfig.tar.gz \
-p 9292:9292 \
-p 9200:9200 \
pblittle/docker-logstash
作者docker
警告你:
默认的 logstash.conf 仅监听 stdin 和文件输入。如果您希望配置 tcp 和/或 udp 输入,请使用您自己的 logstash 配置文件并自行公开端口。请参阅 logstash 文档以了解配置语法和更多信息。
注意:什么是默认 logstash 配置?
回想一下,这是当你没有输入所需环境变量的正确 URL 时获得的文件LOGSTASH_CONFIG_URL
这是输入部分:
// As the author warned, this is all you get. StdIn and Syslog file inputs.
input {
stdin {
type => "stdin-type"
}
file {
type => "syslog"
path => [ "/var/log/*.log", "/var/log/messages", "/var/log/syslog" ]
}
file {
type => "logstash"
path => [ "/var/log/logstash/logstash.log" ]
start_position => "beginning"
}
}
超越默认
logstash
在网站上阅读更多相关信息。
现在logstash
有插件可以将数据推送到input
。插件的种类正如您所期望的;下面是一些插件:
s3
来自亚马逊(文件系统事件)stdin
来自logstash
(默认,读取stdin
缓冲区)http
来自logstash
(你的猜测)- ...ETC...
示例:UDP 套接字
UDP
是一种无连接、快速的协议,运行在(传输)底层L4
,支持多路复用、处理故障,通常是日志数据传输的良好选择。
您选择所需的端口;其他选项取决于您正在做的事情。
TCP 的工作方式相同。
udp { 端口 => 9999 编解码器 => json 缓冲区大小 => 1452 }
示例 2:UDP 套接字的collectd
过滤和输出
This is stolen from https://github.com/codenamekt/celk-docker-compose/blob/master/logstash/logstash.conf
input {
udp {
port => 25826 # 25826 matches port specified in collectd.conf
buffer_size => 1452 # 1452 is the default buffer size for Collectd
codec => collectd { } # specific Collectd codec to invoke
type => collectd
}
}
output {
elasticsearch {
host => elasticsearch
cluster => logstash
protocol => http
}
}
过滤就是一个很好的例子: 也就是说,它真的很长,而且我认为它确实做了一些事情
filter {
# TEST implementation of parse for collectd
if [type] == "collectd" {
if [plugin] {
mutate {
rename => { "plugin" => "collectd_plugin" }
}
}
if [plugin_instance] {
mutate {
rename => { "plugin_instance" => "collectd_plugin_instance" }
}
}
if [type_instance] {
mutate {
rename => { "type_instance" => "collectd_type_instance" }
}
}
if [value] {
mutate {
rename => { "value" => "collectd_value" }
}
mutate {
convert => { "collectd_value" => "float" }
}
}
if [collectd_plugin] == "interface" {
mutate {
add_field => {
"collectd_value_instance" => "rx"
"collectd_value" => "%{rx}"
}
}
mutate {
convert => {
"tx" => "float"
"collectd_value" => "float"
}
}
# force clone for kibana3
clone {
clones => [ "tx" ]
}
##### BUG EXISTS : AFTER clone 'if [type] == "foo"' NOT WORKING : ruby code is working #####
ruby {
code => "
if event['type'] == 'tx'
event['collectd_value_instance'] = 'tx'
event['collectd_value'] = event['tx']
end
"
}
mutate {
replace => { "_type" => "collectd" }
replace => { "type" => "collectd" }
remove_field => [ "rx", "tx" ]
}
}
if [collectd_plugin] == "disk" {
mutate {
add_field => {
"collectd_value_instance" => "read"
"collectd_value" => "%{read}"
}
}
mutate {
convert => {
"write" => "float"
"collectd_value" => "float"
}
}
# force clone for kibana3
clone {
clones => [ "write" ]
}
##### BUG EXISTS : AFTER clone 'if [type] == "foo"' NOT WORKING : ruby code is working #####
ruby {
code => "
if event['type'] == 'write'
event['collectd_value_instance'] = 'write'
event['collectd_value'] = event['write']
end
"
}
mutate {
replace => { "_type" => "collectd" }
replace => { "type" => "collectd" }
remove_field => [ "read", "write" ]
}
}
if [collectd_plugin] == "df" {
mutate {
add_field => {
"collectd_value_instance" => "free"
"collectd_value" => "%{free}"
}
}
mutate {
convert => {
"used" => "float"
"collectd_value" => "float"
}
}
# force clone for kibana3
clone {
clones => [ "used" ]
}
##### BUG EXISTS : AFTER clone 'if [type] == "foo"' NOT WORKING : ruby code is working #####
ruby {
code => "
if event['type'] == 'used'
event['collectd_value_instance'] = 'used'
event['collectd_value'] = event['used']
end
"
}
mutate {
replace => { "_type" => "collectd" }
replace => { "type" => "collectd" }
remove_field => [ "used", "free" ]
}
}
if [collectd_plugin] == "load" {
mutate {
add_field => {
"collectd_value_instance" => "shortterm"
"collectd_value" => "%{shortterm}"
}
}
mutate {
convert => {
"longterm" => "float"
"midterm" => "float"
"collectd_value" => "float"
}
}
# force clone for kibana3
clone {
clones => [ "longterm", "midterm" ]
}
##### BUG EXISTS : AFTER clone 'if [type] == "foo"' NOT WORKING : ruby code is working #####
ruby {
code => "
if event['type'] != 'collectd'
event['collectd_value_instance'] = event['type']
event['collectd_value'] = event[event['type']]
end
"
}
mutate {
replace => { "_type" => "collectd" }
replace => { "type" => "collectd" }
remove_field => [ "longterm", "midterm", "shortterm" ]
}
}
}
}
编辑 3:我可能不应该为您做工作,但没关系。
collectd
像任何好的软件一样封装有些方面对于用户来说很难看或难以处理,我们会尝试让它变得简单,看起来就像您正在发送数据(在本例中是一个元组),而不是愚弄序列化。
你的例子:
(date_time, current_cpu_load), for example ('2016-0-04-24 11:09:12', 12.3)
我不会花时间去弄清楚你是如何形成这个的。如果你能够使用 CPU 插件获取这些数据,那就太好了。我将复制并粘贴我在网上找到的一个插件,以方便我使用。
话虽如此,想想看......只需一点点,就不会有害。
您会看到 CPU 插件已在下面加载。
collectd
您会看到文件中的界面conf
太小,无法指定字段。
所以如果你这样做,它会起作用,但你会得到还有更多数据不仅仅是 CPU 负载。
您可以使用过滤器。但我认为您也可以在 Kibana 中执行此操作。因此,我宁愿不浪费时间编写一个您 a) 不需要且 b) 如果您花点时间就可以轻松编写的过滤器。
## In `collectd`:
# For each instance where collectd is running, we define
# hostname proper to that instance. When metrics from
# multiple instances are aggregated, hostname will tell
# us were they came from.
Hostname "**YOUR_HOSTNAME**"
# Fully qualified domain name, false for our little lab
FQDNLookup false
# Plugins we are going to use with their configurations,
# if needed
LoadPlugin cpu
LoadPlugin df
<Plugin df>
Device "/dev/sda1"
MountPoint "/"
FSType "ext4"
ReportReserved "true"
</Plugin>
LoadPlugin interface
<Plugin interface>
Interface "eth0"
IgnoreSelected false
</Plugin>
LoadPlugin network
<Plugin network>
Server "**YOUR.HOST.IP.ADDR**" "**PORTNUMBER**"
</Plugin>
LoadPlugin memory
LoadPlugin syslog
<Plugin syslog>
LogLevel info
</Plugin>
LoadPlugin swap
<Include "/etc/collectd/collectd.conf.d">
Filter ".conf"
</Include>
您的 logstash 配置
input {
udp {
port => **PORTNUMBER** # 25826 matches port specified in collectd.conf
buffer_size => **1452** **# 1452 is the default buffer size for Collectd**
codec => collectd { } # specific Collectd codec to invoke
type => collectd
}
}
output {
elasticsearch {
cluster => **ELASTICSEARCH_CLUSTER_NAME** # this matches out elasticsearch cluster.name
protocol => http
}
}
来自 collectd 的 aaron 的更新
在 Logstash 1.3.x 中,我们引入了 collectd 输入插件。这太棒了!我们可以在 Logstash 中处理指标,将它们存储在 Elasticsearch 中,并使用 Kibana 查看它们。唯一的缺点是您每秒只能通过插件获得大约 3100 个事件。在 Logstash 1.4.0 中,我们引入了一个新改进的 UDP 输入插件,它是多线程的,有一个队列。我将 collectd 输入插件重构为编解码器(在同事和社区的帮助下),以利用这一巨大的性能提升。现在,在我的双核 Macbook Air 上只有 3 个线程,我可以通过 collectd 编解码器每秒获得超过 45,000 个事件!
所以,我想提供一些简单的示例,您可以使用它们来更改插件配置以使用编解码器。
旧方法:
input { collectd {} }
新方法:
input { udp {
port => 25826 # Must be specified. 25826 is the default for collectd
buffer_size => 1452 # Should be specified. 1452 is the default for recent versions of collectd
codec => collectd { } # This will invoke the default options for the codec
type => "collectd" } } This new configuration will use 2 threads and a queue size of 2000 by default for the UDP input plugin. With
this you should easily be able to break 30,000 events per second!
我提供了一些其他配置示例的要点。有关更多信息,请查看 collectd 编解码器的 Logstash 文档。
祝您 Logstash 愉快!