Reference:
Step1
找出Fortigate傳進ELK的完整log。將message複製出來。
Step2
測試Grok pattern。
Example:
%{SYSLOGTIMESTAMP:systime} %{DATA:gw} date=%{DATA:date} time=%{TIME:time} devname=%{HOSTNAME:devname} devid=%{HOSTNAME:devid} logid=%{NUMBER:logid} type=%{DATA:type} subtype=%{DATA:subtype} level=%{DATA:level} vd=%{DATA:vd} %{GREEDYDATA:fortigate}
格式是 %{pattern:field}
紅色的部分是Logstash裝好之後預設的pattern,可參考Grok Patterns。
%{GREEDYDATA:fortigate}是把上下的部分都放到fortigate欄位。
拆出來的欄位在Kibana可以方便查詢。
因為Fortigate log的格式並不是統一格式,上面的Example只能做粗略的拆解,
如果想要拆出source IP, destination IP,就必須把grok pattern寫更細。
Step3
更新Logstash的config file。
input {
beats {
port => 5044
}
}
output {
elasticsearch {
hosts => ["https://elk01.domain.com:9200"]
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
}
beats {
port => 5044
}
}
output {
elasticsearch {
hosts => ["https://elk01.domain.com:9200"]
index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
}
}
filter {
grok {
match => [message, "%{SYSLOGTIMESTAMP:systime} %{DATA:gw} date=%{DATA:date} time=%{TIME:time} devname=%{DATA:devname} devid=%{DATA:devid} logid=%{DATA:logid} type=%{DATA:type} subtype=%{DATA:subtype} level=%{DATA:level} vd=%{DATA:vd} srcip=%{IP:srcip} srcport=%{DATA:srcport} srcintf=\"%{DATA:srcintf}\" dstip=%{IP:dstip} dstport=%{DATA:dstport} dstintf=\"%{DATA:dstintf}\" sessionid=%{DATA:sessionid} proto=%{INT:proto} action=%{DATA:action} policyid=%{DATA:policyid} policytype=%{DATA:policytype} dstcountry=\"%{DATA:dstcountry}\" srccountry=\"%{DATA:srccountry}\" trandisp=%{DATA:trandisp} service=\"%{DATA:servicename}\" app=\"%{DATA:app}\" duration=%{INT:duration} sentbyte=%{INT:sentbyte} rcvdbyte=%{INT:rcvdbyte} sentpkt=%{INT:sentbyte} appcat=%{QS:appcat} crscore=%{INT:crscore} craction=%{INT:craction} crlevel=%{DATA:crlevel}"]
match => [message, "%{SYSLOGTIMESTAMP:systime} %{DATA:gw} date=%{DATA:date} time=%{TIME:time} devname=%{DATA:devname} devid=%{DATA:devid} logid=%{DATA:logid} type=%{DATA:type} subtype=%{DATA:subtype} level=%{DATA:level} vd=%{DATA:vd} logdesc=\"%{DATA:logdesc}\" interface=\"%{DATA:intf}\" dhcp_msg=\"%{DATA:dhcp_msg}\" mac=%{DATA:macaddr} ip=%{IP:ipaddr} lease=%{DATA:lease} hostname=\"%{DATA:hostname1}\" msg=\"%{DATA:msg}\""]
match => [message, "%{SYSLOGTIMESTAMP:systime} %{DATA:gw} date=%{DATA:date} time=%{TIME:time} devname=%{HOSTNAME:devname} devid=%{HOSTNAME:devid} logid=%{NUMBER:logid} type=%{DATA:type} subtype=%{DATA:subtype} level=%{DATA:level} vd=%{DATA:vd} %{GREEDYDATA:fortigate}"]
}
}
grok {
match => [message, "%{SYSLOGTIMESTAMP:systime} %{DATA:gw} date=%{DATA:date} time=%{TIME:time} devname=%{DATA:devname} devid=%{DATA:devid} logid=%{DATA:logid} type=%{DATA:type} subtype=%{DATA:subtype} level=%{DATA:level} vd=%{DATA:vd} srcip=%{IP:srcip} srcport=%{DATA:srcport} srcintf=\"%{DATA:srcintf}\" dstip=%{IP:dstip} dstport=%{DATA:dstport} dstintf=\"%{DATA:dstintf}\" sessionid=%{DATA:sessionid} proto=%{INT:proto} action=%{DATA:action} policyid=%{DATA:policyid} policytype=%{DATA:policytype} dstcountry=\"%{DATA:dstcountry}\" srccountry=\"%{DATA:srccountry}\" trandisp=%{DATA:trandisp} service=\"%{DATA:servicename}\" app=\"%{DATA:app}\" duration=%{INT:duration} sentbyte=%{INT:sentbyte} rcvdbyte=%{INT:rcvdbyte} sentpkt=%{INT:sentbyte} appcat=%{QS:appcat} crscore=%{INT:crscore} craction=%{INT:craction} crlevel=%{DATA:crlevel}"]
match => [message, "%{SYSLOGTIMESTAMP:systime} %{DATA:gw} date=%{DATA:date} time=%{TIME:time} devname=%{DATA:devname} devid=%{DATA:devid} logid=%{DATA:logid} type=%{DATA:type} subtype=%{DATA:subtype} level=%{DATA:level} vd=%{DATA:vd} logdesc=\"%{DATA:logdesc}\" interface=\"%{DATA:intf}\" dhcp_msg=\"%{DATA:dhcp_msg}\" mac=%{DATA:macaddr} ip=%{IP:ipaddr} lease=%{DATA:lease} hostname=\"%{DATA:hostname1}\" msg=\"%{DATA:msg}\""]
match => [message, "%{SYSLOGTIMESTAMP:systime} %{DATA:gw} date=%{DATA:date} time=%{TIME:time} devname=%{HOSTNAME:devname} devid=%{HOSTNAME:devid} logid=%{NUMBER:logid} type=%{DATA:type} subtype=%{DATA:subtype} level=%{DATA:level} vd=%{DATA:vd} %{GREEDYDATA:fortigate}"]
}
}
Step4
重新啟動logstash service,指令 systemctl restart logstash
用tail -f /var/log/message | grep logstash 查看是否有錯誤訊息。
Ex: field 名稱重複、grok語法錯誤
2022/7/26 update:
如果要丟掉某些 log,可以用 drop。
Ex:
filter {
grok {
match => [message, .....<略>
grok {
match => [message, .....<略>
}
if [type] =~ /traffic/ {
drop { }
drop { }
Reference:
沒有留言:
張貼留言