Logstash multiline logs for MySQL queries

I want to push the logs from a mysql-proxy Lua script to Logstash. A sample log might look like this:

[2015-03-09 11:13:47] USER:username IP:10.102.51.134:41420 DB:dbName Query: -- One Pager Trends 
-- params:

SELECT 
  date,
  SUM(t.rev) revenue,
  SUM(t.rev - t.cost) profit 
FROM
  am.s_d t
  INNER JOIN am.event e 
    ON t.`event_id` = e.`event_id`
WHERE 1=1 AND DATE BETWEEN '2014-12-08' AND '2015-03-08'
  AND t.source_id = 25
GROUP BY date
[2015-03-09 11:17:28] USER:mzupan IP:10.102.22.216:49843 DB: Query: show databases

A new log entry always starts with `[`.

So I ship the logs with logstash-forwarder and process them like this:

filter {

  if [type] == "mysql-proxy" {
    grok {
      match => { "message" => "\[%{TIMESTAMP_ISO8601}\] USER:%{WORD:user} IP:%{IP:ip}:%{INT} DB:%{DATA:db} Query: (?<query>(.|\r|\n)*)" }
    }
    multiline {
      pattern => "^\["
      what => "previous"
      negate => true
    }
    date {
      match => [ "timestamp", "yyyy-MM-dd HH:mm:ss" ]
    }
  }
}
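The intent of the multiline settings above (`pattern => "^\["`, `negate => true`, `what => "previous"`) is that any line that does *not* start with `[` gets appended to the previous event. A minimal Python sketch of that grouping logic (the `group_events` helper is hypothetical, just to illustrate the semantics):

```python
import re

def group_events(lines):
    """Fold lines that don't start with '[' into the previous event,
    mimicking multiline { pattern => "^\\[", negate => true, what => "previous" }."""
    events = []
    for line in lines:
        if re.match(r"^\[", line) or not events:
            events.append(line)           # a new log entry begins
        else:
            events[-1] += "\n" + line     # continuation of the previous entry
    return events

lines = [
    "[2015-03-09 11:13:47] USER:u IP:1.2.3.4:1 DB:d Query: SELECT",
    "  date,",
    "FROM t",
    "[2015-03-09 11:17:28] USER:u IP:1.2.3.4:2 DB: Query: show databases",
]
events = group_events(lines)
# The two SQL continuation lines are folded into the first entry,
# leaving two events total.
```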

My problem is that in Kibana I see the query as JSON like the following:

{
  "_index": "logstash-2015.03.09",
  "_type": "mysql-proxy",
  "_id": "AUv_vj3u0BuDzneUoKKc",
  "_score": null,
  "_source": {
    "message": "[2015-03-09 11:13:47] USER:username IP:10.102.51.134:41420 DB:dbName Query: -- One Pager Trends \n-- params:\n\nSELECT \n  date,\n  SUM(t.rev) revenue,\n  SUM(t.rev - t.cost) profit \nFROM\n  am.s_d t\n  INNER JOIN am.event e \n    ON t.`event_id` = e.`event_id`\nWHERE 1=1 AND DATE BETWEEN '2014-12-08' AND '2015-03-08'\n  AND t.source_id = 25\nGROUP BY date",
    "@version": "1",
    "@timestamp": "2015-03-09T18:13:52.287Z",
    "type": "mysql-proxy",
    "file": "/var/log/mysql-queries.log",
    "host": "an01.domain.com",
    "offset": [
      "11855847",
      "11855943",
      "11855954",
      "11855955",
      "11855963",
      "11855971",
      "11855993",
      "11856023",
      "11856028",
      "11856039",
      "11856064",
      "11856099",
      "11856156",
      "11856179",
      "11856193",
      "11856194"
    ],
    "user": "username",
    "ip": "10.102.51.134",
    "db": "dbname",
    "query": "-- One Pager Trends ",
    "tags": [
      "_grokparsefailure",
      "multiline"
    ]
  },
  "fields": {
    "@timestamp": [
      1425924832287
    ]
  },
  "sort": [
    1425924832287
  ]
}

Even though Logstash seems to set the `message` correctly, I only see the first part in the `query` field.

Answer 1

The multiline filter should come before the grok match in your filter block. Try configuring it like this:

filter {
  if [type] == "mysql-proxy" {
    multiline {
      pattern => "^\["
      what => "previous"
      negate => true
    }
    grok {
      match => { "message" => "\[%{TIMESTAMP_ISO8601}\] USER:%{WORD:user} IP:%{IP:ip}:%{INT} DB:%{DATA:db} Query: (?<query>(.|\r|\n)*)" }
    }
    date {
      match => [ "timestamp", "yyyy-MM-dd HH:mm:ss" ]
    }
  }
}

This works for me on Logstash v1.4.2.
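The reason the order matters: once the multiline filter has joined the continuation lines into one `message`, the trailing `(?<query>(.|\r|\n)*)` capture in the grok pattern spans the whole joined query. A rough Python sketch of that match (the grok patterns `TIMESTAMP_ISO8601`, `WORD`, `IP`, `INT`, `DATA` are only approximated here, so this is an illustration rather than the exact grok behavior):

```python
import re

# Approximate Python translation of the grok pattern used above.
LOG_RE = re.compile(
    r"\[(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\] "
    r"USER:(?P<user>\w+) "
    r"IP:(?P<ip>\d{1,3}(?:\.\d{1,3}){3}):(?P<port>\d+) "
    r"DB:(?P<db>\S*) "
    r"Query: (?P<query>(?:.|\r|\n)*)"
)

# A message as it looks after the multiline filter joined the lines.
message = (
    "[2015-03-09 11:13:47] USER:username IP:10.102.51.134:41420 "
    "DB:dbName Query: -- One Pager Trends \n-- params:\n\nSELECT \n  date"
)

m = LOG_RE.match(message)
# The query capture now includes the newline-separated SQL lines,
# not just the first physical line.
```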
