我想使用logstash收集日志文件,文件的格式如下:
type=USER_START msg=audit(1404170401.294:157): user pid=29228 uid=0 auid=0 ses=7972 subj=system_u:system_r:crond_t:s0-s0:c0.c1023 msg='op=PAM:session_open acct="root" exe="/usr/sbin/crond" hostname=? addr=? terminal=cron res=success'
我应该使用哪个过滤器来匹配线路?或者有另一种方法来处理它。
任何帮助,将不胜感激。
使用下面的模式来匹配grok 调试器,但仍然收到一条No matches
消息。
type=%{WORD:audit_type} msg=audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\): user pid=%{NUMBER:audit_pid} uid=%{NUMBER:audit_uid} auid=%{NUMBER:audit_audid} subj=%{WORD:audit_subject} msg=%{GREEDYDATA:audit_message}
但是当我删除时subj=%{WORD:audit_subject} msg=%{GREEDYDATA:audit_message}
,它成功并得到了像这样的 JSON 对象。
{
"audit_type": [
[
"USER_END"
]
],
"audit_epoch": [
[
"1404175981.491"
]
],
"BASE10NUM": [
[
"1404175981.491",
"524",
"1465",
"0",
"0"
]
],
"audit_counter": [
[
"524"
]
],
"audit_pid": [
[
"1465"
]
],
"audit_uid": [
[
"0"
]
],
"audit_audid": [
[
"0"
]
]
}
不知道为什么subj
,msg
无法继续。
答案1
快速搜索发现这在 github 上
AUDIT type=%{WORD:audit_type} msg=audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\): user pid=%{NUMBER:audit_pid} uid=%{NUMBER:audit_uid} auid=%{NUMBER:audit_audid} subj=%{WORD:audit_subject} msg=%{GREEDYDATA:audit_message}
AUDITLOGIN type=%{WORD:audit_type} msg=audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\): login pid=%{NUMBER:audit_pid} uid=%{NUMBER:audit_uid} old auid=%{NUMBER:old_auid} new auid=%{NUMBER:new_auid} old ses=%{NUMBER:old_ses} new ses=%{NUMBER:new_ses}
粗略的审查表明这可能就是您正在寻找的东西。
答案2
审计日志以一系列键=值对的形式编写,可以使用 kv 过滤器轻松提取。但我注意到,键msg
有时会使用两次,并且也是一系列键=值对。
首先使用 grok 获取字段audit_type
、audit_epoch
和audit_counter
(sub_msg
第二个 msg 字段)
grok {
pattern => [ "type=%{DATA:audit_type}\smsg=audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\):.*?( msg=\'(?<sub_msg>.*?)\')?$" ]
named_captures_only => true
}
kv 用于提取除 msg 和 type 之外的所有 key=value 对,因为我们已经使用 grok 获得了该数据:
kv {
exclude_keys => [ "msg", "type" ]
}
再次使用 kv 来解析 sub_msg 中的 key=value 对(如果存在):
kv {
source => "sub_msg"
}
date 用于将日期设置为audit_epoch中的值,使用日期格式UNIX
将解析浮点数或整数时间戳:
date {
match => [ "audit_epoch", "UNIX" ]
}
最后使用 mutate 删除冗余字段:
mutate {
remove_field => ['sub_msg', 'audit_epoch']
}
您还可以重命名字段,如 sysadmin1138 建议的那样:
mutate {
rename => [
"auid", "uid_audit",
"fsuid", "uid_fs",
"suid", "uid_set",
"ses", "session_id"
]
remove_field => ['sub_msg', 'audit_epoch']
}
所有组合的过滤器如下所示:
filter {
grok {
pattern => [ "type=%{DATA:audit_type}\smsg=audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\):.*?( msg=\'(?<sub_msg>.*?)\')?$" ]
named_captures_only => true
}
kv {
exclude_keys => [ "msg", "type" ]
}
kv {
source => "sub_msg"
}
date {
match => [ "audit_epoch", "UNIX" ]
}
mutate {
rename => [
"auid", "uid_audit",
"fsuid", "uid_fs",
"suid", "uid_set",
"ses", "session_id"
]
remove_field => ['sub_msg', 'audit_epoch']
}
}
答案3
比 grok 更好的解决方案可能是使用千伏过滤器。这将解析以“key=value”格式配置的字段,大多数审计日志条目都是这种格式。与 Grok 不同,这将处理有时有有时没有字段的字符串。但是,字段名称是不太有用的缩写形式,因此您可能需要进行一些字段重命名。
filter {
kv { }
}
这样您就可以获得大部分信息,并且字段将与日志中显示的内容相匹配。所有数据类型都是string
。为了不遗余力地使字段人性化:
filter {
kv { }
mutate {
rename => {
"type" => "audit_type"
"auid" => "uid_audit"
"fsuid => "uid_fs"
"suid" => "uid_set"
"ses" => "session_id"
}
}
}
msg
不过,包含时间戳和事件 ID 的字段仍需要进行理解。其他答案显示了如何执行此操作。
filter {
kv { }
grok {
match => { "msg" => "audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\):"
}
mutate {
rename => {
"type" => "audit_type"
"auid" => "uid_audit"
"fsuid => "uid_fs"
"suid" => "uid_set"
"ses" => "session_id"
}
}
}
答案4
grok 的格式已经改变,请看一下这个:
filter {
grok {
# example: type=CRED_DISP msg=audit(1431084081.914:298): pid=1807 uid=0 auid=1000 ses=7 msg='op=PAM:setcred acct="user1" exe="/usr/sbin/sshd" hostname=host1 addr=192.168.160.1 terminal=ssh res=success'
match => { "message" => "type=%{WORD:audit_type} msg=audit\(%{NUMBER:audit_epoch}:%{NUMBER:audit_counter}\): pid=%{NUMBER:audit_pid} uid=%{NUMBER:audit_uid} auid=%{NUMBER:audit_audid} ses=%{NUMBER:ses} msg=\'op=%{WORD:operation}:%{WORD:detail_operation} acct=\"%{WORD:acct_user}\" exe=\"%{GREEDYDATA:exec}\" hostname=%{GREEDYDATA:hostname} addr=%{GREEDYDATA:ipaddr} terminal=%{WORD:terminal} res=%{WORD:result}\'" }
}
date {
match => [ "audit_epoch", "UNIX_MS" ]
}
}
这将使用 audit_epoch 中的日期作为@datetime。