我正在使用客户端上的另一个 logstash 作为发送器将一些日志发送到 logstash 中央服务器。输入类型为“文件”。服务器上可以正常接收消息,但它没有反映客户端的 IP 地址。它在字段“@source_host”中发送主机名。我可以做些什么来将 IP 作为字段?也许是一个过滤器?
客户端配置:
input {
file {
format => "plain"
path => "/var/log/app/test1.txt"
type => "start"
}
}
output {
redis {
host => "test.example.com"
data_type => "list"
key => "logstash"
}
}
答案1
您可以使用“dns”过滤器进行反向查找,然后使用它来设置字段。 http://logstash.net/docs/1.2.2/filters/dns
答案2
如果您想要的客户端 IP 是静态的,那么我建议您使用以下内容替换内容 @source_hostmutate
筛选
例如:
filter {
mutate {
replace => ["@source_host","xx.xx.xx.xx"]
}
}
如果您只是想要字段中的 IP(而不是 @source_host),您可以将其添加到输入中:
input {
file {
format => "plain"
path => "/var/log/app/test1.txt"
type => "start"
add_field => ['source_ip','xx.xx.xx.xx']
}
}
否则,如果您确实需要解析非静态客户端主机名,那么@丹·加思韦特的答案是正确的。
答案3
这也许是一个很好的例子,可以让你入门,这是发送到 elasticsearch 的 id 的 logstash 配置https://github.com/StamusNetworks/SELKS/blob/master/docker/containers-data/logstash/conf.d/logstash.conf
input {
file {
path => ["/var/log/suricata/*.json"]
#sincedb_path => ["/var/lib/logstash/"]
sincedb_path => ["/usr/share/logstash/since.db"]
codec => json
type => "SELKS"
}
}
filter {
if [type] == "SELKS" {
date {
match => [ "timestamp", "ISO8601" ]
}
ruby {
code => "
if event.get('[event_type]') == 'fileinfo'
event.set('[fileinfo][type]', event.get('[fileinfo][magic]').to_s.split(',')[0])
end
"
}
ruby {
code => "
if event.get('[event_type]') == 'alert'
sp = event.get('[alert][signature]').to_s.split(' group ')
if (sp.length == 2) and /\A\d+\z/.match(sp[1])
event.set('[alert][signature]', sp[0])
end
end
"
}
metrics {
meter => [ "eve_insert" ]
add_tag => "metric"
flush_interval => 30
}
}
if [http] {
useragent {
source => "[http][http_user_agent]"
target => "[http][user_agent]"
}
}
if [src_ip] {
geoip {
source => "src_ip"
target => "geoip"
#database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
#add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
#add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
}
if [dest_ip] {
geoip {
source => "dest_ip"
target => "geoip"
#database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat"
#add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
#add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
}
}
output {
if [event_type] and [event_type] != 'stats' {
elasticsearch {
hosts => "elasticsearch"
index => "logstash-%{event_type}-%{+YYYY.MM.dd}"
template_overwrite => true
template => "/usr/share/logstash/config/elasticsearch7-template.json"
}
} else {
elasticsearch {
hosts => "elasticsearch"
index => "logstash-%{+YYYY.MM.dd}"
template_overwrite => true
template => "/usr/share/logstash/config/elasticsearch7-template.json"
}
}
}