2021年5月7日 星期五

ELK logstash grok for Fortigate

Reference:

Step1
找出Fortigate傳進ELK的完整log。將message複製出來。
Step2
測試Grok pattern。

Example:
%{SYSLOGTIMESTAMP:systime} %{DATA:gw} date=%{DATA:date} time=%{TIME:time} devname=%{HOSTNAME:devname} devid=%{HOSTNAME:devid} logid=%{NUMBER:logid} type=%{DATA:type} subtype=%{DATA:subtype} level=%{DATA:level} vd=%{DATA:vd} %{GREEDYDATA:fortigate}

格式是 %{pattern:field}
紅色的部分是Logstash裝好之後預設的pattern,可參考Grok Patterns
%{GREEDYDATA:fortigate}是把上下的部分都放到fortigate欄位。

拆出來的欄位在Kibana可以方便查詢。
因為Fortigate log的格式並不是統一格式,上面的Example只能做粗略的拆解,
如果想要拆出source IP, destination IP,就必須把grok pattern寫更細。

Step3
更新Logstash的config file。

input {
  beats {
    port => 5044
  }
}

output {
  elasticsearch {
    hosts => ["https://elk01.domain.com:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}

filter {
  grok {
    match => [message, "%{SYSLOGTIMESTAMP:systime} %{DATA:gw} date=%{DATA:date} time=%{TIME:time} devname=%{DATA:devname} devid=%{DATA:devid} logid=%{DATA:logid} type=%{DATA:type} subtype=%{DATA:subtype} level=%{DATA:level} vd=%{DATA:vd} srcip=%{IP:srcip} srcport=%{DATA:srcport} srcintf=\"%{DATA:srcintf}\" dstip=%{IP:dstip} dstport=%{DATA:dstport} dstintf=\"%{DATA:dstintf}\" sessionid=%{DATA:sessionid} proto=%{INT:proto} action=%{DATA:action} policyid=%{DATA:policyid} policytype=%{DATA:policytype} dstcountry=\"%{DATA:dstcountry}\" srccountry=\"%{DATA:srccountry}\" trandisp=%{DATA:trandisp} service=\"%{DATA:servicename}\" app=\"%{DATA:app}\" duration=%{INT:duration} sentbyte=%{INT:sentbyte} rcvdbyte=%{INT:rcvdbyte} sentpkt=%{INT:sentbyte} appcat=%{QS:appcat} crscore=%{INT:crscore} craction=%{INT:craction} crlevel=%{DATA:crlevel}"]
    match => [message, "%{SYSLOGTIMESTAMP:systime} %{DATA:gw} date=%{DATA:date} time=%{TIME:time} devname=%{DATA:devname} devid=%{DATA:devid} logid=%{DATA:logid} type=%{DATA:type} subtype=%{DATA:subtype} level=%{DATA:level} vd=%{DATA:vd} logdesc=\"%{DATA:logdesc}\" interface=\"%{DATA:intf}\" dhcp_msg=\"%{DATA:dhcp_msg}\" mac=%{DATA:macaddr} ip=%{IP:ipaddr} lease=%{DATA:lease} hostname=\"%{DATA:hostname1}\" msg=\"%{DATA:msg}\""]
    match => [message, "%{SYSLOGTIMESTAMP:systime} %{DATA:gw} date=%{DATA:date} time=%{TIME:time} devname=%{HOSTNAME:devname} devid=%{HOSTNAME:devid} logid=%{NUMBER:logid} type=%{DATA:type} subtype=%{DATA:subtype} level=%{DATA:level} vd=%{DATA:vd} %{GREEDYDATA:fortigate}"]
  }
}

Step4
重新啟動logstash service,指令 systemctl restart logstash
用tail -f /var/log/message | grep logstash 查看是否有錯誤訊息。
Ex: field 名稱重複、grok語法錯誤

2022/7/26 update:
如果要丟掉某些 log,可以用 drop。
Ex:
filter {
  grok {
    match => [message,  .....<略>
  }
  if [type] =~ /traffic/ {
    drop { }
 
Reference:

沒有留言:

張貼留言