如何使用输入作为“文件”将 IP 地址发送到 logstash?

如何使用输入作为“文件”将 IP 地址发送到 logstash?

我正在使用客户端上的另一个 logstash 作为发送器将一些日志发送到 logstash 中央服务器。输入类型为“文件”。服务器上可以正常接收消息,但它没有反映客户端的 IP 地址。它在字段“@source_host”中发送主机名。我可以做些什么来将 IP 作为字段?也许是一个过滤器?

客户端配置:

input {
  file {
    format => "plain"
    path => "/var/log/app/test1.txt"
    type => "start"
  }
}

output {
  redis {
    host => "test.example.com"
    data_type => "list"
    key => "logstash"
  }
}

答案1

您可以使用“dns”过滤器进行反向查找,然后使用它来设置字段。 http://logstash.net/docs/1.2.2/filters/dns

答案2

如果您想要的客户端 IP 是静态的,那么我建议您使用以下内容替换内容 @source_hostmutate筛选

例如:

filter {
  mutate {
    replace => ["@source_host","xx.xx.xx.xx"]
  }
}

如果您只是想要字段中的 IP(而不是 @source_host),您可以将其添加到输入中:

input {
  file {
    format => "plain"
    path => "/var/log/app/test1.txt"
    type => "start"
    add_field => ['source_ip','xx.xx.xx.xx']
  }
}

否则,如果您确实需要解析非静态客户端主机名,那么@丹·加思韦特的答案是正确的。

答案3

这也许是一个很好的例子,可以让你入门,这是发送到 elasticsearch 的 id 的 logstash 配置https://github.com/StamusNetworks/SELKS/blob/master/docker/containers-data/logstash/conf.d/logstash.conf

input {
  file { 
    path => ["/var/log/suricata/*.json"]
    #sincedb_path => ["/var/lib/logstash/"]
    sincedb_path => ["/usr/share/logstash/since.db"]
    codec =>   json 
    type => "SELKS" 
  }

}

filter {
  if [type] == "SELKS" {
    
    date {
      match => [ "timestamp", "ISO8601" ]
    }
    
    ruby {
      code => "
        if event.get('[event_type]') == 'fileinfo'
          event.set('[fileinfo][type]', event.get('[fileinfo][magic]').to_s.split(',')[0])
        end
      "
    }
    ruby {
      code => "
        if event.get('[event_type]') == 'alert'
          sp = event.get('[alert][signature]').to_s.split(' group ')
          if (sp.length == 2) and /\A\d+\z/.match(sp[1])
            event.set('[alert][signature]', sp[0])
          end
        end
      "
     }
  
    metrics {
      meter => [ "eve_insert" ]
      add_tag => "metric"
      flush_interval => 30
    }
  }

  if [http] {
    useragent {
       source => "[http][http_user_agent]"
       target => "[http][user_agent]"
    }
  }
  if [src_ip]  {
    geoip {
      source => "src_ip" 
      target => "geoip" 
      #database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat" 
      #add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      #add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
    }
  }
    if [dest_ip]  {
    geoip {
      source => "dest_ip" 
      target => "geoip" 
      #database => "/opt/logstash/vendor/geoip/GeoLiteCity.dat" 
      #add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
      #add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
    }
  }
}

output {
  if [event_type] and [event_type] != 'stats' {
    elasticsearch {
      hosts => "elasticsearch"
      index => "logstash-%{event_type}-%{+YYYY.MM.dd}"
      template_overwrite => true
      template => "/usr/share/logstash/config/elasticsearch7-template.json"
    }
  } else {
    elasticsearch {
      hosts => "elasticsearch"
      index => "logstash-%{+YYYY.MM.dd}"
      template_overwrite => true
      template => "/usr/share/logstash/config/elasticsearch7-template.json"
    }
  }
}

相关内容