Tuesday, November 21, 2023

Query Elasticsearch via Python

Installation:
RockyLinux 9.2 minimal
yum update python
python -m ensurepip
python -m pip install elasticsearch==7.x.x (install the Python package version that matches your ELK version)
copy cert to /root/cert/ca.crt (copy your self-built root CA certificate)
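
Since the client package has to match the ELK version, a quick check can catch a mismatch early. This is only a minimal sketch: the helper name same_major_version is made up for illustration, and it assumes that matching the major version is the part that matters most (tighten it to the minor version if your setup needs that).

```python
def same_major_version(client_version, server_version):
    """Return True when the client and the server share a major version.

    client_version: tuple such as (7, 17, 9)
    server_version: string such as "7.17.3"
    """
    server_major = int(server_version.split(".")[0])
    return client_version[0] == server_major


# In real use (not run here) the two values would come from:
#   import elasticsearch; elasticsearch.__version__
#   es.info()["version"]["number"]
print(same_major_version((7, 17, 9), "7.17.3"))
print(same_major_version((8, 11, 0), "7.17.3"))
```

The sample version numbers are placeholders, not the versions used in this post.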
 
Python script:
#!/usr/bin/python
# Connect to Elasticsearch
from elasticsearch import Elasticsearch
# Password for the 'elastic' user generated by Elasticsearch
ELASTIC_PASSWORD = "mypassword"

from ssl import create_default_context
context = create_default_context(cafile="/root/cert/ca.crt")

# Create the client instance
es = Elasticsearch(
        'elk01.domain.com',
        http_auth=('elastic', ELASTIC_PASSWORD),
        scheme="https",
        port=9200,
        ssl_context=context,
)
 
# Query index_name; the query body (DSL) can be validated in Elasticsearch Dev Tools first
 
body = {
  "query": {
    "bool": {
      "must": [
        {
          "match": {
            "event.code" : "6272"
          }
        },
        {
          "range": {
            "@timestamp" : {
              "gt" : "now-11m/m"
            }
          }
        }
      ]
    }
  },
  "_source": ["@timestamp", "winlog.event_data.SubjectUserName"]

}
 
res = es.search(index="vpn*", body=body)
for hit in res['hits']['hits']:
    time_stamp = hit["_source"]['@timestamp']
    user_name = hit["_source"]['winlog']['event_data']['SubjectUserName']
    print(time_stamp , user_name)
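
The loop above raises KeyError when a hit has no SubjectUserName, and the event code and time window are fixed inside the body. A rough sketch of one way to loosen both is below. The names build_query and extract_rows and the sample response are hypothetical; the sample only imitates the shape of what es.search() returns, so no cluster is needed to try it.

```python
def build_query(event_code, minutes):
    """Build the same bool query as above with a configurable code and window."""
    return {
        "query": {
            "bool": {
                "must": [
                    {"match": {"event.code": event_code}},
                    {"range": {"@timestamp": {"gt": f"now-{minutes}m/m"}}},
                ]
            }
        },
        "_source": ["@timestamp", "winlog.event_data.SubjectUserName"],
    }


def extract_rows(res):
    """Return (timestamp, user_name) pairs, skipping hits that lack either field."""
    rows = []
    for hit in res.get("hits", {}).get("hits", []):
        source = hit.get("_source", {})
        time_stamp = source.get("@timestamp")
        user_name = source.get("winlog", {}).get("event_data", {}).get("SubjectUserName")
        if time_stamp is not None and user_name is not None:
            rows.append((time_stamp, user_name))
    return rows


# Hypothetical response, shaped like the result of es.search()
sample_res = {
    "hits": {
        "hits": [
            {"_source": {"@timestamp": "2023-11-21T01:02:03.000Z",
                         "winlog": {"event_data": {"SubjectUserName": "DOMAIN\\alice"}}}},
            {"_source": {"@timestamp": "2023-11-21T01:05:00.000Z"}},  # no user field
        ]
    }
}

body = build_query("6272", 11)
for time_stamp, user_name in extract_rows(sample_res):
    print(time_stamp, user_name)
```

With a real client, body would be passed to es.search(index="vpn*", body=body) and the result to extract_rows().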
 
Miscellaneous Python notes:
‧Convert the timestamp to UTC+8
from datetime import datetime, timedelta
GMT8_time = (datetime.fromisoformat(time_stamp[:-1]) + timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')
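
The one-liner above drops the trailing "Z" and adds eight hours to a naive datetime. An alternative sketch that keeps the offset explicit is shown here. The helper name to_gmt8 is made up, and it assumes the timestamp is an ISO 8601 string in UTC ending in "Z", which is what the query above returned in my case.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC+8 offset (no daylight saving time handling)
GMT8 = timezone(timedelta(hours=8))


def to_gmt8(time_stamp):
    """Convert an ISO 8601 UTC string ending in 'Z' to a UTC+8 display string."""
    # fromisoformat() before Python 3.11 does not accept 'Z', so spell out the offset
    utc_time = datetime.fromisoformat(time_stamp.replace("Z", "+00:00"))
    return utc_time.astimezone(GMT8).strftime("%Y-%m-%d %H:%M:%S")


print(to_gmt8("2023-11-21T20:30:00.000Z"))  # prints 2023-11-22 04:30:00
```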

‧Read the DSL query from a file
import json
# The with block closes the file even if parsing fails
with open(filename, "r") as f:
    body = json.load(f)

‧If user_name contains a backslash, take the account name after the backslash
if user_name.find('\\') >= 0:
    tmp = user_name.split('\\')
    user_name = tmp[1]
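
The same idea can be wrapped in a small helper. This is just a sketch: the name strip_domain is made up, and unlike the snippet above it keeps the text after the last backslash, which only differs when a name contains more than one.

```python
def strip_domain(user_name):
    """Return the part after the last backslash, or the name unchanged."""
    # rpartition returns ('', '', user_name) when there is no backslash
    return user_name.rpartition("\\")[2]


print(strip_domain("DOMAIN\\alice"))  # prints alice
print(strip_domain("bob"))            # prints bob
```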

‧Indentation errors caused by mixing spaces and TABs
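
Python 3 raises TabError when the indentation mixes TABs and spaces inconsistently, which happens easily when pasting a script between editors. As a rough aid, the sketch below lists the lines whose indentation contains a TAB. The helper name lines_with_tab_indent and the sample text are made up; the standard library's tabnanny module (python -m tabnanny script.py) does a more thorough check.

```python
def lines_with_tab_indent(source):
    """Return 1-based line numbers whose leading whitespace contains a TAB."""
    flagged = []
    for number, line in enumerate(source.splitlines(), start=1):
        # Leading whitespace is whatever lstrip() would remove
        indent = line[: len(line) - len(line.lstrip(" \t"))]
        if "\t" in indent:
            flagged.append(number)
    return flagged


# Hypothetical source text: line 2 is indented with spaces, line 3 with a TAB
sample = "for hit in hits:\n    a = 1\n\tb = 2\n"
print(lines_with_tab_indent(sample))  # prints [3]
```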

 
