2021年5月7日 星期五

ELK logstash grok for Fortigate

Reference:

Step1
找出Fortigate傳進ELK的完整log。將message複製出來。
Step2
測試Grok pattern。

Example:
%{SYSLOGTIMESTAMP:systime} %{DATA:gw} date=%{DATA:date} time=%{TIME:time} devname=%{HOSTNAME:devname} devid=%{HOSTNAME:devid} logid=%{NUMBER:logid} type=%{DATA:type} subtype=%{DATA:subtype} level=%{DATA:level} vd=%{DATA:vd} %{GREEDYDATA:fortigate}

格式是 %{pattern:field}
紅色的部分是Logstash裝好之後預設的pattern,可參考Grok Patterns
%{GREEDYDATA:fortigate}是把上下的部分都放到fortigate欄位。

拆出來的欄位在Kibana可以方便查詢。
因為Fortigate log的格式並不是統一格式,上面的Example只能做粗略的拆解,
如果想要拆出source IP, destination IP,就必須把grok pattern寫更細。

Step3
更新Logstash的config file。

input {
  beats {
    port => 5044
  }
}

output {
  elasticsearch {
    hosts => ["https://elk01.domain.com:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}

filter {
  grok {
    match => [message, "%{SYSLOGTIMESTAMP:systime} %{DATA:gw} date=%{DATA:date} time=%{TIME:time} devname=%{DATA:devname} devid=%{DATA:devid} logid=%{DATA:logid} type=%{DATA:type} subtype=%{DATA:subtype} level=%{DATA:level} vd=%{DATA:vd} srcip=%{IP:srcip} srcport=%{DATA:srcport} srcintf=\"%{DATA:srcintf}\" dstip=%{IP:dstip} dstport=%{DATA:dstport} dstintf=\"%{DATA:dstintf}\" sessionid=%{DATA:sessionid} proto=%{INT:proto} action=%{DATA:action} policyid=%{DATA:policyid} policytype=%{DATA:policytype} dstcountry=\"%{DATA:dstcountry}\" srccountry=\"%{DATA:srccountry}\" trandisp=%{DATA:trandisp} service=\"%{DATA:servicename}\" app=\"%{DATA:app}\" duration=%{INT:duration} sentbyte=%{INT:sentbyte} rcvdbyte=%{INT:rcvdbyte} sentpkt=%{INT:sentbyte} appcat=%{QS:appcat} crscore=%{INT:crscore} craction=%{INT:craction} crlevel=%{DATA:crlevel}"]
    match => [message, "%{SYSLOGTIMESTAMP:systime} %{DATA:gw} date=%{DATA:date} time=%{TIME:time} devname=%{DATA:devname} devid=%{DATA:devid} logid=%{DATA:logid} type=%{DATA:type} subtype=%{DATA:subtype} level=%{DATA:level} vd=%{DATA:vd} logdesc=\"%{DATA:logdesc}\" interface=\"%{DATA:intf}\" dhcp_msg=\"%{DATA:dhcp_msg}\" mac=%{DATA:macaddr} ip=%{IP:ipaddr} lease=%{DATA:lease} hostname=\"%{DATA:hostname1}\" msg=\"%{DATA:msg}\""]
    match => [message, "%{SYSLOGTIMESTAMP:systime} %{DATA:gw} date=%{DATA:date} time=%{TIME:time} devname=%{HOSTNAME:devname} devid=%{HOSTNAME:devid} logid=%{NUMBER:logid} type=%{DATA:type} subtype=%{DATA:subtype} level=%{DATA:level} vd=%{DATA:vd} %{GREEDYDATA:fortigate}"]
  }
}

Step4
重新啟動logstash service,指令 systemctl restart logstash
用tail -f /var/log/message | grep logstash 查看是否有錯誤訊息。
Ex: field 名稱重複、grok語法錯誤

2022/7/26 update:
如果要丟掉某些 log,可以用 drop。
Ex:
filter {
  grok {
    match => [message,  .....<略>
  }
  if [type] =~ /traffic/ {
    drop { }
 
2026/3/3 update:
另一個方法是在Kibana設定pipeline
Ex: "message": "Feb 20 21:54:15 192.168.17.2 DrayTek: SSLTunnel (VPN-8, chdu) ==> Protocol:CHAP(c223) Success Identifier:0x01 S=E6B3C37B7FFCED42288F2EE069549DB1634F6CB2 M=Welcome to Vigor2925 Series. ##"

在Dev Tools執行
PUT _ingest/pipeline/vigor_vpn_pipeline
{
  "description": "解析 Vigor VPN 使用者名稱",
  "processors": [
    {
      "grok": {
        "field": "message",
        "patterns": [
          "DrayTek:.*\\(VPN-\\d+,\\s+%{DATA:username}\\)"
        ],
        "description": "抓取 (VPN-X, username) 括號中的第二個參數"
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "error.message",
        "value": "Vigor 解析失敗: {{ _ingest.on_failure_message }}"
      }
    }
  ]
 
這裡可以管理pipeline 
 
 
Reference:

沒有留言:

張貼留言