Logstash tcp input not passed to elasticsearch

After successfully setting up the ELK file input and logstash-forwarder, and seeing log streams from several servers in Kibana, I tried to set up a TCP input:

tcp {
    codec => "json"
    host => "localhost"
    port => 9250
    tags => ["sensu"]
  }

The sender is sensu, and the messages really are JSON; I verified this with tcpdump.
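To take the sender out of the picture, one way to exercise this input is to push a hand-crafted JSON event at it directly. A minimal sketch in Python, assuming Logstash is listening on localhost:9250 as configured (the event fields are only illustrative, not real sensu output):

```python
import json
import socket

# An illustrative sensu-style event; these field names are assumptions.
event = {
    "client": {"address": "10.0.0.5"},
    "check": {"name": "cpu_metrics", "output": "cpu.usage 42.5"},
}

# A tcp input with a JSON codec consumes one document per line,
# so the payload should be newline-terminated.
payload = (json.dumps(event) + "\n").encode("utf-8")

try:
    with socket.create_connection(("localhost", 9250), timeout=2) as sock:
        sock.sendall(payload)
except OSError:
    # No Logstash listening in this environment; the payload itself
    # can still be inspected.
    pass
```

If an event like this shows up in Kibana, the problem is on the sensu side; if not, the input or codec is at fault.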

The Logstash logs show that the connection is accepted:

{:timestamp=>"2015-06-15T14:03:39.832000+1000", :message=>"Accepted connection", :client=>"127.0.0.1:38065", :server=>"localhost:9250", :level=>:debug, :file=>"logstash/inputs/tcp.rb", :line=>"146", :method=>"client_thread"}
{:timestamp=>"2015-06-15T14:03:39.962000+1000", :message=>"config LogStash::Codecs::JSONLines/@charset = \"UTF-8\"", :level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}
{:timestamp=>"2015-06-15T14:03:39.963000+1000", :message=>"config LogStash::Codecs::Line/@charset = \"UTF-8\"", :level=>:debug, :file=>"logstash/config/mixin.rb", :line=>"112", :method=>"config_init"}

However, the data does not seem to go any further and cannot be found in Kibana.

I even disabled the other inputs and then watched the shards in elasticsearch (curl 'localhost:9200/_cat/shards'); their size did not increase.

According to this link I am on the right track, but I am probably just doing something silly somewhere... Thanks in advance.

logstash.conf:

input {
  file {
    path => ["/var/log/messages", "/var/log/secure", "/var/log/iptables"]
    type => "syslog"
    start_position => "end"
  }

  lumberjack {
    port => 5043
    type => "logs"
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
  }

  tcp {
    codec => "json"
    host => "localhost"
    port => 9250
    tags => ["sensu"]
  }

}

output {
  elasticsearch {
    host => "localhost"
    cluster => "webCluster"
  }
}

elasticsearch.yml:

cluster.name: webCluster
node.name: "bossNode"
node.master: true
node.data: true
index.number_of_shards: 1
index.number_of_replicas: 0
network.host: localhost

Answer 1

After several frustrating days I concluded that the json/json_lines codec is broken, possibly only when used with the tcp input.

However, I found a workaround using a filter:

filter {
  if ("sensu" in [tags]) {
    json {
      "source" => "message"
    }
  }
}
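For intuition, the json filter with source => "message" parses the raw message string as JSON and merges the resulting keys into the event, here only for events tagged "sensu". A rough sketch of that behavior in Python (the event shape is an assumption for illustration):

```python
import json

def apply_json_filter(event):
    """Rough equivalent of json { "source" => "message" } guarded by
    the 'if "sensu" in [tags]' conditional: parse the string held in
    [message] and merge its top-level keys into the event."""
    if "sensu" in event.get("tags", []):
        event.update(json.loads(event["message"]))
    return event

event = {
    "tags": ["sensu"],
    "message": '{"check": {"name": "cpu_metrics", "output": "cpu.usage 42.5"}}',
}
apply_json_filter(event)
# event now carries a structured top-level "check" field parsed from [message]
```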

This, together with a few other changes, produced what I was originally trying to achieve. For reference, here is my working logstash.conf, which combines logs and cpu/memory metric data from sensu:

input {
  file {
    path => [
      "/var/log/messages"
      , "/var/log/secure"
    ]
    type => "syslog"
    start_position => "end"
  }

  file {
    path => "/var/log/iptables"
    type => "iptables"
    start_position => "end"
  }

  file {
    path => ["/var/log/httpd/access_log"
        ,"/var/log/httpd/ssl_access_log"
    ]
    type => "apache_access"
    start_position => "end"
  }

  file {
    path => [
      "/var/log/httpd/error_log"
      , "/var/log/httpd/ssl_error_log"
    ]
    type => "apache_error"
    start_position => "end"
  }

  lumberjack {
    port => 5043
    type => "logs"
    ssl_certificate => "/etc/pki/tls/certs/logstash-forwarder.crt"
    ssl_key => "/etc/pki/tls/private/logstash-forwarder.key"
  }

  tcp {
    host => "localhost"
    port => 9250
    mode => "server"
    tags => ["sensu"]
  }

}

filter {
  if ("sensu" in [tags]) {
    json {
      "source" => "message"
    }
    mutate {
      rename => { "[check][name]" => "type" }
      replace => { "host" => "%{[client][address]}" }
      split => { "[check][output]" => " " }
      add_field => { "output" => "%{[check][output][1]}" }
      remove_field => [ "[client]", "[check]", "occurrences" ]
    }
  } else if([type] == "apache_access") {
    grok {
      match => { "message" => "%{IP:client}" }
    }
  }
}

filter {
  mutate {
    convert => { "output" => "float" }
  }
}

output {
  elasticsearch {
    host => "localhost"
    cluster => "webCluster"
  }
}

Unrelated to the question: "output" arrives as several space-separated values, hence the "split" operation. I use the second element and then convert it to a float so that Kibana can plot it nicely (something I learned the hard way).
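As a rough illustration of that split-and-convert step (the sample metric line below is made up), the mutate/convert chain does the equivalent of:

```python
def extract_metric(check_output):
    """Equivalent of: mutate split => { "[check][output]" => " " },
    then add_field with %{[check][output][1]} (the second token),
    then convert => { "output" => "float" }."""
    parts = check_output.split(" ")
    return float(parts[1])

# A made-up sensu metric line of the form "name value timestamp"
value = extract_metric("cpu.usage 73.5 1434340000")
```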
