我想要将 mysql-proxy lua 脚本中的日志推送到 lostash。示例日志可能是
[2015-03-09 11:13:47] USER:username IP:10.102.51.134:41420 DB:dbName Query: -- One Pager Trends
-- params:
SELECT
date,
SUM(t.rev) revenue,
SUM(t.rev - t.cost) profit
FROM
am.s_d t
INNER JOIN am.event e
ON t.`event_id` = e.`event_id`
WHERE 1=1 AND DATE BETWEEN '2014-12-08' AND '2015-03-08'
AND t.source_id = 25
GROUP BY date
[2015-03-09 11:17:28] USER:mzupan IP:10.102.22.216:49843 DB: Query: show databases
新的日志条目总是以[
因此,我使用 logstash-forwarder 发送日志并进行如下处理
filter {
if [type] == "mysql-proxy" {
grok {
match => { "message" => "\[%{TIMESTAMP_ISO8601}\] USER:%{WORD:user} IP:%{IP:ip}:%{INT} DB:%{DATA:db} Query: (?<query>(.|\r|\n)*)" }
}
multiline {
pattern => "^\["
what => "previous"
negate=> true
}
date {
match => [ "timestamp", "yyyy-MM-dd HH:mm:ss" ]
}
}
}
我的问题是在 kibana 中,我看到类似以下 json 的查询
{
"_index": "logstash-2015.03.09",
"_type": "mysql-proxy",
"_id": "AUv_vj3u0BuDzneUoKKc",
"_score": null,
"_source": {
"message": "[2015-03-09 11:13:47] USER:username IP:10.102.51.134:41420 DB:dbName Query: -- One Pager Trends \n-- params:\n\nSELECT \n date,\n SUM(t.rev) revenue,\n SUM(t.rev - t.cost) profit \nFROM\n am.s_d t\n INNER JOIN am.event e \n ON t.`event_id` = e.`event_id`\nWHERE 1=1 AND DATE BETWEEN '2014-12-08' AND '2015-03-08'\n AND t.source_id = 25\nGROUP BY date",
"@version": "1",
"@timestamp": "2015-03-09T18:13:52.287Z",
"type": "mysql-proxy",
"file": "/var/log/mysql-queries.log",
"host": "an01.domain.com",
"offset": [
"11855847",
"11855943",
"11855954",
"11855955",
"11855963",
"11855971",
"11855993",
"11856023",
"11856028",
"11856039",
"11856064",
"11856099",
"11856156",
"11856179",
"11856193",
"11856194"
],
"user": "username",
"ip": "10.102.51.134",
"db": "dbname",
"query": "-- One Pager Trends ",
"tags": [
"_grokparsefailure",
"multiline"
]
},
"fields": {
"@timestamp": [
1425924832287
]
},
"sort": [
1425924832287
]
}
尽管 logstash 似乎正确设置了消息,但我只看到第一部分。
答案1
过滤器中的多行应该放在匹配部分之前。尝试像这样配置它:
筛选 { 如果 [类型] == “mysql-proxy” { 多行 { 模式 => “^\[” 什么 => “上一个” 否定 => 真 } 格罗克 { 匹配 => { “消息” => “\[%{TIMESTAMP_ISO8601}\] USER:%{WORD:user} IP:%{IP:ip}:%{INT} DB:%{DATA:db} 查询:(?(.|\r|\n)*)” } } 日期 { 匹配 => [ “时间戳”, “yyyy-MM-dd HH:mm:ss” ] } }
这对我来说适用于 logstash v1.4.2。