Difference between revisions of "Cyber Security: Python: Isolasi Outliar"
Onnowpurbo (talk | contribs) (Created page with "Bisa. Ini skrip Python '''read-only''' untuk menganalisis: /opt/wazuh-data/logs/archives/archives.json Tujuannya: membaca log Wazuh, membuat fitur sederhana, lalu memakai '...") |
Onnowpurbo (talk | contribs) |
||
| (One intermediate revision by the same user not shown) | |||
| Line 12: | Line 12: | ||
== 2. Buat virtual environment== | == 2. Buat virtual environment== | ||
| + | sudo apt install python3.14-venv | ||
python3 -m venv venv | python3 -m venv venv | ||
source venv/bin/activate | source venv/bin/activate | ||
| Line 481: | Line 482: | ||
== 4. Jalankan langsung ke file Wazuh== | == 4. Jalankan langsung ke file Wazuh== | ||
| + | |||
| + | sudo apt install -y acl | ||
| + | |||
| + | sudo setfacl -R -m u:onno:rx /opt/wazuh-data | ||
| + | sudo setfacl -R -m u:onno:rx /opt/wazuh-data/logs | ||
| + | sudo setfacl -R -m u:onno:rx /opt/wazuh-data/logs/archives | ||
| + | sudo setfacl -m u:onno:r /opt/wazuh-data/logs/archives/archives.json | ||
python3 wazuh_archive_outlier.py \ | python3 wazuh_archive_outlier.py \ | ||
Latest revision as of 04:21, 23 June 2026
Bisa. Ini skrip Python read-only untuk menganalisis:
/opt/wazuh-data/logs/archives/archives.json
Tujuannya: membaca log Wazuh, membuat fitur sederhana, lalu memakai Isolation Forest untuk menemukan event yang polanya tidak umum / outlier.
1. Buat folder kerja
mkdir -p ~/Apps/Wazuh-Outlier
cd ~/Apps/Wazuh-Outlier
2. Buat virtual environment
sudo apt install python3.14-venv
python3 -m venv venv
source venv/bin/activate
pip install pandas numpy scikit-learn
3. Buat script
nano wazuh_archive_outlier.py
Isi dengan script ini:
#!/usr/bin/env python3
"""
wazuh_archive_outlier.py
Analisa outlier dari Wazuh archives.json.
Output:
- outliers.csv
- outliers.jsonl
- sample_all_scored.csv
Script ini READ ONLY terhadap file Wazuh.
Tidak menghapus, tidak memindahkan, tidak mengubah archives.json.
"""
import argparse
import json
import os
import sys
import math
from pathlib import Path
from collections import Counter
from datetime import datetime
import numpy as np
import pandas as pd
from sklearn.feature_extraction import FeatureHasher
from sklearn.ensemble import IsolationForest
# Dotted (flattened) event keys whose non-empty values become categorical
# "field=value" pairs in build_rows_and_cats.
CAT_FIELDS = [
    "agent.name", "agent.id", "manager.name", "location",
    "decoder.name", "rule.id", "rule.description",
    "data.srcip", "data.dstip", "data.srcport", "data.dstport",
    "data.protocol", "data.action", "data.status", "data.url",
    "srcip", "dstip", "srcport", "dstport", "protocol",
    "program_name", "syscheck.path",
    "win.system.eventID", "win.system.providerName",
]
# Field names that build_features looks up on each row and, when the value
# parses as a number, turns into a "num:<field>" feature.
NUM_FIELDS = [
    "rule.level", "data.srcport", "data.dstport",
    "srcport", "dstport", "win.system.eventID",
]
# Destination ports that explain_reason does NOT report as unusual; any other
# numeric dstport is mentioned in the outlier reason text.
COMMON_DST_PORTS = {
    22, 25, 53, 80, 110, 123, 143, 443, 465, 587,
    993, 995, 1514, 1515, 9200, 5601
}
def read_last_lines(path: str, max_lines: int):
    """
    Return the lines of a text file, optionally only the last ``max_lines``.

    Args:
        path: Path of the file to read. The file is only opened for reading.
        max_lines: Number of trailing lines to return. A value <= 0 means
            "read the whole file".

    Returns:
        A list of ``str``. In whole-file mode the lines keep their line
        terminators (they come straight from iterating the text file); in
        tail mode the terminators are removed by ``bytes.splitlines``.
        Undecodable bytes are replaced rather than raising.
    """
    if max_lines <= 0:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            return list(f)

    block_size = 1024 * 1024
    # Blocks are collected back-to-front and joined once at the end.
    # Prepending every block to one growing bytes object (and re-counting
    # newlines over the whole buffer each time) is quadratic in file size.
    chunks = []
    newline_count = 0
    with open(path, "rb") as f:
        f.seek(0, os.SEEK_END)
        pos = f.tell()
        while pos > 0:
            read_size = min(block_size, pos)
            pos -= read_size
            f.seek(pos)
            chunk = f.read(read_size)
            chunks.append(chunk)
            newline_count += chunk.count(b"\n")
            # Strictly more newlines than requested guarantees that the first
            # (possibly partial) line in the buffer is not among the result.
            if newline_count > max_lines:
                break

    data = b"".join(reversed(chunks))
    lines = data.splitlines()[-max_lines:]
    return [line.decode("utf-8", errors="replace") for line in lines]
def flatten_json(obj, parent="", out=None, depth=0, max_depth=4):
    """
    Flatten nested JSON data into a single dict keyed by dot notation.

    Examples of produced keys:
        agent.name
        rule.level
        data.srcip

    Dicts are descended into. A list of scalars is stored as a
    comma-joined string of its first 20 items; a list containing any dict
    or list is stored as the marker ``list_len_<n>``. Anything reached at a
    depth greater than ``max_depth`` is left out. The accumulator ``out``
    (created when not supplied) is filled in place and returned.
    """
    flat = {} if out is None else out

    if depth > max_depth:
        return flat

    if isinstance(obj, dict):
        for name, child in obj.items():
            path = f"{parent}.{name}" if parent else str(name)
            flatten_json(child, path, flat, depth + 1, max_depth)
        return flat

    if isinstance(obj, list):
        has_nested = any(isinstance(item, (dict, list)) for item in obj)
        if has_nested:
            flat[parent] = f"list_len_{len(obj)}"
        else:
            flat[parent] = ",".join(map(str, obj[:20]))
        return flat

    flat[parent] = obj
    return flat
def clean_value(v, max_len=140):
    """
    Normalise a raw field value to a trimmed, length-limited string.

    ``None``, blank text and the placeholders "none", "null", "nan" and "-"
    (compared case-insensitively) all map to the empty string. Any other
    value is converted with ``str``, stripped and cut to ``max_len``
    characters.
    """
    if v is None:
        return ""
    text = str(v).strip()
    is_placeholder = text.lower() in {"none", "null", "nan", "-"}
    if text == "" or is_placeholder:
        return ""
    return text[:max_len]
def pick(flat, *keys):
    """
    Return the first non-empty cleaned value among ``keys`` in ``flat``.

    Keys are tried in the order given; each value goes through
    ``clean_value``. The empty string is returned when no key yields one.
    """
    cleaned = (clean_value(flat.get(key)) for key in keys)
    return next((value for value in cleaned if value), "")
def to_float(v):
    """
    Convert a value to a finite ``float``, or return ``None``.

    ``None``, the empty string and anything ``float`` cannot parse give
    ``None``. Non-finite results ("inf", "nan", or overflowing literals such
    as "1e999") also give ``None``: callers pass the result to ``int(...)``
    and to the model's feature matrix, and neither accepts infinities or NaN.
    """
    try:
        if v is None or v == "":
            return None
        number = float(str(v).strip())
    except (TypeError, ValueError):
        return None
    return number if math.isfinite(number) else None
def parse_timestamp(ts):
    """
    Parse an ISO-8601 style timestamp into a ``datetime``.

    A trailing "Z" is rewritten to "+00:00". ``datetime.fromisoformat`` is
    tried first; if it rejects the text, explicit ``strptime`` patterns are
    tried so that offsets written without a colon (e.g.
    "2024-06-23T04:21:00.123+0000") also parse on interpreters whose
    ``fromisoformat`` only accepts the "+00:00" form.

    Returns:
        The parsed ``datetime``, or ``None`` for empty or unparseable input.
    """
    if not ts:
        return None
    text = str(ts).strip().replace("Z", "+00:00")
    try:
        return datetime.fromisoformat(text)
    except ValueError:
        pass
    # Fallback for "+HHMM" offsets, with and without fractional seconds.
    for fmt in ("%Y-%m-%dT%H:%M:%S.%f%z", "%Y-%m-%dT%H:%M:%S%z"):
        try:
            return datetime.strptime(text, fmt)
        except ValueError:
            continue
    return None
def build_rows_and_cats(events):
    """
    Turn parsed events into report rows and categorical feature pairs.

    Args:
        events: Sequence of parsed JSON events.

    Returns:
        ``(rows, cat_pairs_per_event)``, two lists aligned with ``events``:
        - ``rows``: one dict per event with the report columns, plus
          ``idx`` (position in ``events``), ``full_log_len``, ``hour`` and
          ``day_of_week`` (both -1 when the timestamp cannot be parsed).
        - ``cat_pairs_per_event``: one list of ``(field, value)`` tuples per
          event for every CAT_FIELDS key with a non-empty value; when none
          is present, the first 80 characters of the log text are used as a
          single ``full_log_prefix`` pair.
    """
    rows = []
    cat_pairs_per_event = []
    for i, event in enumerate(events):
        flat = flatten_json(event)
        ts = pick(flat, "timestamp")
        dt = parse_timestamp(ts)
        row = {
            "idx": i,
            "timestamp": ts,
            "agent_name": pick(flat, "agent.name"),
            "agent_id": pick(flat, "agent.id"),
            "manager_name": pick(flat, "manager.name"),
            "location": pick(flat, "location"),
            "decoder_name": pick(flat, "decoder.name"),
            "rule_id": pick(flat, "rule.id"),
            "rule_level": pick(flat, "rule.level"),
            "rule_description": pick(flat, "rule.description"),
            "srcip": pick(flat, "data.srcip", "srcip"),
            "dstip": pick(flat, "data.dstip", "dstip"),
            "srcport": pick(flat, "data.srcport", "srcport"),
            "dstport": pick(flat, "data.dstport", "dstport"),
            "protocol": pick(flat, "data.protocol", "protocol"),
            "full_log": pick(flat, "full_log", "message"),
        }
        full_log = row["full_log"]

        # pick() returns text already truncated by clean_value, so measuring
        # row["full_log"] would cap the length at that limit. Measure the
        # untruncated text of the same key that pick() selected instead.
        raw_log = next(
            (
                str(flat[key]).strip()
                for key in ("full_log", "message")
                if clean_value(flat.get(key))
            ),
            "",
        )
        row["full_log_len"] = len(raw_log)
        row["hour"] = dt.hour if dt else -1
        row["day_of_week"] = dt.weekday() if dt else -1

        cat_pairs = []
        for field in CAT_FIELDS:
            val = clean_value(flat.get(field))
            if val:
                cat_pairs.append((field, val))
        # Events with none of the known fields still get one categorical
        # feature derived from the start of their log text.
        if not cat_pairs and full_log:
            cat_pairs.append(("full_log_prefix", full_log[:80]))

        rows.append(row)
        cat_pairs_per_event.append(cat_pairs)
    return rows, cat_pairs_per_event
def build_features(rows, cat_pairs_per_event):
    """
    Build one feature dict per event plus a summary of its rare pairs.

    Args:
        rows: Row dicts as produced by ``build_rows_and_cats``.
        cat_pairs_per_event: ``(field, value)`` pair lists aligned with
            ``rows``.

    Returns:
        ``(feature_dicts, rare_pairs_per_event)``:
        - ``feature_dicts``: per event, a dict mapping feature name to
          float. Each "field=value" pair is 1.0; numeric fields are stored
          as ``log1p(abs(value))`` under ``num:<field>``; hour and weekday
          are scaled to [0, 1]; ``num:rare_score_avg`` / ``_sum`` aggregate
          ``-log(frequency / n)`` over the event's pairs.
        - ``rare_pairs_per_event``: per event, a comma-joined string of up
          to five pairs that occur at most 3 times in the whole input.
    """
    n = max(len(rows), 1)  # avoid division by zero on empty input
    pair_counts = Counter(
        f"{k}={v}"
        for pairs in cat_pairs_per_event
        for k, v in pairs
    )
    feature_dicts = []
    rare_pairs_per_event = []
    for row, pairs in zip(rows, cat_pairs_per_event):
        feats = {}
        rare_parts = []
        rare_scores = []
        for k, v in pairs:
            pair = f"{k}={v}"
            feats[pair] = 1.0
            freq = pair_counts[pair]
            rarity = -math.log(max(freq / n, 1e-12))
            rare_scores.append(rarity)
            if freq <= 3:
                rare_parts.append(pair)

        for field in NUM_FIELDS:
            raw = row.get(field)
            if raw is None or raw == "":
                # NUM_FIELDS uses dotted names ("rule.level") while row keys
                # use underscores ("rule_level"); without this fallback the
                # dotted entries never match and their features are lost.
                raw = row.get(field.replace(".", "_"))
            val = to_float(raw or "")
            if val is not None:
                feats[f"num:{field}"] = math.log1p(abs(val))

        if row["hour"] >= 0:
            feats["num:hour"] = row["hour"] / 23.0
        if row["day_of_week"] >= 0:
            feats["num:day_of_week"] = row["day_of_week"] / 6.0
        feats["num:full_log_len"] = math.log1p(row["full_log_len"])
        feats["num:rare_score_avg"] = float(np.mean(rare_scores)) if rare_scores else 0.0
        feats["num:rare_score_sum"] = float(np.sum(rare_scores)) if rare_scores else 0.0

        feature_dicts.append(feats)
        rare_pairs_per_event.append(", ".join(rare_parts[:5]))
    return feature_dicts, rare_pairs_per_event
def explain_reason(row, rare_fields):
    """
    Compose a short, human-readable hint about why an event stands out.

    Checks, in order: a rule level of 10 or more, a destination port outside
    COMMON_DST_PORTS, a ``full_log_len`` above 500, and a non-empty
    ``rare_fields`` summary. Matching hints are joined with "; ". When none
    match, a generic statistical explanation is returned.
    """
    notes = []

    rule_level = to_float(row.get("rule_level"))
    if rule_level is not None and rule_level >= 10:
        notes.append(f"rule.level tinggi ({int(rule_level)})")

    port_value = to_float(row.get("dstport"))
    if port_value is not None:
        port = int(port_value)
        if port not in COMMON_DST_PORTS:
            notes.append(f"dstport tidak umum ({port})")

    if row.get("full_log_len", 0) > 500:
        notes.append("full_log panjang/tidak biasa")

    if rare_fields:
        notes.append("kombinasi field jarang muncul")

    if notes:
        return "; ".join(notes)
    return "pola statistik berbeda dari mayoritas log"
def main():
    """
    Command-line entry point: score events from a JSON-lines file.

    Steps: parse arguments, read the (tail of the) input file, parse each
    line as JSON, build features, fit an IsolationForest on hashed features,
    then write ``sample_all_scored.csv``, ``outliers.csv`` and
    ``outliers.jsonl`` into the output directory and print the top outliers.

    Exit codes:
        1: the input file does not exist or cannot be read.
        2: fewer than 50 valid JSON-object events were found.
    """
    parser = argparse.ArgumentParser(
        description="Detect outliers from Wazuh archives.json JSONL file."
    )
    parser.add_argument(
        "--input",
        required=True,
        help="Path ke archives.json"
    )
    parser.add_argument(
        "--max-lines",
        type=int,
        default=100000,
        help="Baca N baris terakhir. Pakai 0 untuk seluruh file."
    )
    parser.add_argument(
        "--contamination",
        type=float,
        default=0.01,
        help="Perkiraan rasio outlier. 0.01 berarti 1 persen."
    )
    parser.add_argument(
        "--output-dir",
        default="wazuh-outlier-results",
        help="Folder output."
    )
    parser.add_argument(
        "--hash-features",
        type=int,
        default=4096,
        help="Jumlah fitur hashed untuk ML."
    )
    parser.add_argument(
        "--top",
        type=int,
        default=30,
        help="Tampilkan top N outlier di terminal."
    )
    args = parser.parse_args()

    input_path = Path(args.input)
    if not input_path.exists():
        print(f"ERROR: File tidak ditemukan: {input_path}", file=sys.stderr)
        sys.exit(1)

    out_dir = Path(args.output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    print(f"[1/5] Reading: {input_path}")
    try:
        lines = read_last_lines(str(input_path), args.max_lines)
    except OSError as exc:
        # Typical case: the file is not readable by the current user
        # (permission denied) or the path is a directory.
        print(f"ERROR: Tidak bisa membaca {input_path}: {exc}", file=sys.stderr)
        sys.exit(1)

    events = []
    bad_lines = 0
    print("[2/5] Parsing JSON lines...")
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            parsed = json.loads(line)
        except (ValueError, RecursionError):
            bad_lines += 1
            continue
        # Only JSON objects are usable events: the annotation step below
        # assigns event["_ml_outlier"], which fails on numbers, strings and
        # lists even though they are valid JSON.
        if isinstance(parsed, dict):
            events.append(parsed)
        else:
            bad_lines += 1

    if len(events) < 50:
        print(
            f"ERROR: Hanya menemukan {len(events)} JSON event valid. Butuh data lebih banyak.",
            file=sys.stderr
        )
        print(f"Bad lines skipped: {bad_lines}", file=sys.stderr)
        sys.exit(2)
    print(f"Valid events: {len(events)} | Bad lines skipped: {bad_lines}")

    print("[3/5] Building features...")
    rows, cat_pairs = build_rows_and_cats(events)
    feature_dicts, rare_fields = build_features(rows, cat_pairs)

    print("[4/5] Training IsolationForest...")
    hasher = FeatureHasher(
        n_features=args.hash_features,
        input_type="dict",
        alternate_sign=False
    )
    X = hasher.transform(feature_dicts)
    model = IsolationForest(
        n_estimators=200,
        contamination=args.contamination,
        random_state=42,
        n_jobs=-1,
    )
    labels = model.fit_predict(X)
    scores = model.score_samples(X)

    df = pd.DataFrame(rows)
    df["model_label"] = labels
    df["model_score"] = scores
    # Negated so that a larger outlier_score means a more anomalous event.
    df["outlier_score"] = -scores
    df["rare_fields"] = rare_fields
    df["reason"] = [
        explain_reason(row, rf)
        for row, rf in zip(rows, rare_fields)
    ]
    df = df.sort_values("outlier_score", ascending=False)
    outliers = df[df["model_label"] == -1].copy()

    display_cols = [
        "timestamp", "agent_name", "location", "decoder_name",
        "rule_level", "rule_id", "srcip", "dstip", "srcport", "dstport",
        "protocol", "outlier_score", "reason", "rare_fields", "full_log",
    ]
    existing_cols = [c for c in display_cols if c in df.columns]
    all_csv = out_dir / "sample_all_scored.csv"
    out_csv = out_dir / "outliers.csv"
    out_jsonl = out_dir / "outliers.jsonl"
    df[existing_cols].to_csv(all_csv, index=False)
    outliers[existing_cols].to_csv(out_csv, index=False)

    with open(out_jsonl, "w", encoding="utf-8") as f:
        for _, row in outliers.iterrows():
            # "idx" is the event's position in `events`, so the original
            # record can be recovered after the DataFrame was sorted.
            event = events[int(row["idx"])]
            event["_ml_outlier"] = {
                "outlier_score": float(row["outlier_score"]),
                "model_score": float(row["model_score"]),
                "reason": row["reason"],
                "rare_fields": row["rare_fields"],
            }
            f.write(json.dumps(event, ensure_ascii=False) + "\n")

    print("[5/5] Done.")
    print(f"All scored events : {all_csv}")
    print(f"Outliers CSV : {out_csv}")
    print(f"Outliers JSONL : {out_jsonl}")
    print(f"Outliers found : {len(outliers)} from {len(df)} events")
    print("\nTop outliers:")
    preview_cols = [
        "timestamp", "agent_name", "location", "decoder_name",
        "rule_level", "srcip", "dstip", "dstport",
        "outlier_score", "reason",
    ]
    preview_cols = [c for c in preview_cols if c in outliers.columns]
    print(
        outliers[preview_cols]
        .head(args.top)
        .to_string(index=False)
    )


if __name__ == "__main__":
    main()
Simpan:
Ctrl+O Enter Ctrl+X
4. Jalankan langsung ke file Wazuh
sudo apt install -y acl
sudo setfacl -R -m u:onno:rx /opt/wazuh-data
sudo setfacl -R -m u:onno:rx /opt/wazuh-data/logs
sudo setfacl -R -m u:onno:rx /opt/wazuh-data/logs/archives
sudo setfacl -m u:onno:r /opt/wazuh-data/logs/archives/archives.json
python3 wazuh_archive_outlier.py \
  --input /opt/wazuh-data/logs/archives/archives.json \
  --max-lines 100000 \
  --contamination 0.01 \
  --output-dir hasil-outlier
Artinya:
--max-lines 100000
membaca 100.000 baris terakhir saja, supaya aman untuk file besar.
--contamination 0.01
berarti model menganggap kira-kira 1% event paling aneh sebagai outlier.
Kalau mau lebih sensitif:
python3 wazuh_archive_outlier.py \
  --input /opt/wazuh-data/logs/archives/archives.json \
  --max-lines 100000 \
  --contamination 0.03 \
  --output-dir hasil-outlier
Artinya outlier sekitar 3%.
5. Kalau permission denied
Karena file Wazuh biasanya milik root, pakai salah satu cara ini.
Cara aman: ambil copy 100.000 baris terakhir dulu.
sudo tail -n 100000 /opt/wazuh-data/logs/archives/archives.json > archives_sample.json
sudo chown $USER:$USER archives_sample.json
Lalu analisis file copy:
python3 wazuh_archive_outlier.py \
  --input archives_sample.json \
  --max-lines 0 \
  --contamination 0.01 \
  --output-dir hasil-outlier
6. Lihat hasil
ls -lh hasil-outlier
Output penting:
hasil-outlier/outliers.csv hasil-outlier/outliers.jsonl hasil-outlier/sample_all_scored.csv
Buka 20 outlier teratas:
head -n 20 hasil-outlier/outliers.csv
Atau lebih enak:
column -s, -t < hasil-outlier/outliers.csv | less -S
7. Arti hasil
Kolom penting:
outlier_score
Semakin besar nilainya, semakin aneh event tersebut.
reason
Alasan kasar kenapa event dianggap outlier.
Contoh alasan:
- rule.level tinggi
- dstport tidak umum
- full_log panjang/tidak biasa
- kombinasi field jarang muncul
- pola statistik berbeda dari mayoritas log
rare_fields
Field yang jarang muncul di dataset, misalnya agent tertentu, decoder tertentu, source IP tertentu, port tertentu, atau kombinasi log yang tidak biasa.
8. Kalau Wazuh Docker dan file ada di dalam container
Cek nama container:
sudo docker ps --format "table {{.Names}}\t{{.Image}}\t{{.Status}}" | grep wazuh
Ambil 100.000 baris terakhir dari container manager:
sudo docker exec single-node-wazuh.manager-1 \
  tail -n 100000 /var/ossec/logs/archives/archives.json \
  > archives_sample.json
Lalu jalankan:
python3 wazuh_archive_outlier.py \
  --input archives_sample.json \
  --max-lines 0 \
  --contamination 0.01 \
  --output-dir hasil-outlier
Catatan penting
Script ini belum menentukan “serangan” secara pasti. Ia hanya menjawab:
> “Event mana yang paling berbeda dari pola mayoritas log?”
Jadi hasil `outliers.csv` tetap perlu dibaca SOC analyst. Biasanya yang menarik adalah event dengan kombinasi:
- rule.level tinggi
- source IP jarang
- destination port aneh
- decoder tidak biasa
- agent tertentu tiba-tiba berbeda
- full_log sangat panjang
- event muncul pada jam tidak biasa
Untuk lab Wazuh, ini sudah cukup bagus sebagai tahap awal sebelum nanti ditambah MITRE ATT&CK mapping, risk score, dan LLM summary.