Cyber Security: Python: Compare Isolasi Outlier vs Alert
Jump to navigation
Jump to search
#!/usr/bin/env python3
"""
compare_outlier_alerts.py
Membandingkan hasil outlier Wazuh dengan alerts.json.
Tujuan:
- Membaca semua file hasil outlier dari folder tertentu.
- Membaca alerts.json Wazuh.
- Menormalkan metadata penting seperti timestamp, agent, rule, location, srcip, dstip, full_log.
- Mencocokkan outlier dengan alert berdasarkan metadata yang tersedia.
- Menghasilkan CSV ringkasan untuk investigasi SOC / threat hunting.
Contoh:
python3 compare_outlier_alerts.py \
--outlier-dir hasil-outlier \
--alerts /opt/wazuh-data/logs/alerts/alerts.json \
--output-dir hasil-compare \
--max-alert-lines 200000 \
--time-window 300 \
--min-score 8
"""
import argparse
import hashlib
import json
import math
from collections import deque
from pathlib import Path
from typing import Any, Iterable, Optional, Tuple
import pandas as pd
# Maps each canonical metadata name to the column names it may appear under
# in Wazuh alerts or in the outlier output. Aliases are listed in priority
# order, and the key order defines the order of the canonical columns.
CANON_FIELDS = {
    "timestamp": ["timestamp", "@timestamp", "event.timestamp"],
    "agent_id": ["agent.id", "agent_id"],
    "agent_name": ["agent.name", "agent_name", "host.name", "hostname"],
    "agent_ip": ["agent.ip", "agent_ip", "host.ip"],
    "manager_name": ["manager.name", "manager_name"],
    "rule_id": ["rule.id", "rule_id"],
    "rule_level": ["rule.level", "rule_level"],
    "rule_desc": ["rule.description", "rule_desc", "description"],
    "decoder": ["decoder.name", "decoder_name"],
    "location": ["location", "log.file.path"],
    "srcip": ["data.srcip", "srcip", "source.ip"],
    "dstip": ["data.dstip", "dstip", "destination.ip"],
    "srcport": ["data.srcport", "srcport", "source.port"],
    "dstport": ["data.dstport", "dstport", "destination.port"],
    "user": [
        "data.srcuser", "data.dstuser", "data.user", "data.username",
        "user", "username",
    ],
    "program_name": ["program_name", "program.name", "data.program_name"],
    "full_log": ["full_log", "message", "log", "raw_log"],
}
# Substrings that mark a raw column as worth carrying over as an "extra_*" column.
EXTRA_KEYWORDS = [
    "score", "outlier", "anomaly", "cluster",
    "label", "source_file", "prediction", "isolation",
    "distance", "severity", "risk",
]
def clean_scalar(value: Any) -> str:
    """Convert an arbitrary value into a trimmed string that is safe to compare.

    Args:
        value: Any scalar, container, or pandas/NumPy missing-value marker.

    Returns:
        An empty string for ``None``, NaN/NA/NaT and the textual placeholders
        "nan", "none", "null", "<na>" and "nat" (case-insensitive).
        Containers are serialised as JSON with sorted keys. Floats holding an
        integral value are rendered without the trailing ".0". Anything else
        is ``str(value)`` with surrounding whitespace removed.
    """
    if value is None:
        return ""

    # Containers are handled before the NaN check: pd.isna() on a list/dict
    # can return a boolean array, which is ambiguous inside an ``if``.
    if isinstance(value, (list, tuple, dict, set)):
        try:
            return json.dumps(value, ensure_ascii=False, sort_keys=True).strip()
        except Exception:
            # Best effort: unserialisable content (sets, mixed keys, ...) falls
            # back to the plain string representation.
            return str(value).strip()

    try:
        if pd.isna(value):
            return ""
    except (TypeError, ValueError):
        # Array-like values make the truth test ambiguous; treat as non-missing.
        pass

    if isinstance(value, float):
        if math.isnan(value):
            return ""
        # A numeric column that contains missing values is loaded as float, so
        # an identifier such as 5710 arrives as 5710.0. Render it as "5710" so
        # it compares equal to the same identifier stored as int or text.
        if value.is_integer():
            return str(int(value))

    text = str(value).strip()
    if text.lower() in {"", "nan", "none", "null", "<na>", "nat"}:
        return ""
    return text
def safe_hash(value: Any) -> str:
    """Return the SHA-1 hex digest of the cleaned text of *value*.

    The value is first passed through ``clean_scalar``; when that yields an
    empty string the result is an empty string instead of a digest.
    """
    cleaned = clean_scalar(value)
    if cleaned:
        payload = cleaned.encode("utf-8", errors="ignore")
        return hashlib.sha1(payload).hexdigest()
    return ""
def read_jsonl(path: Path, max_lines: Optional[int] = None) -> Tuple[pd.DataFrame, int]:
    """Read a JSONL/NDJSON file into a flattened DataFrame.

    Args:
        path: File to read; decoded as UTF-8 with undecodable bytes replaced.
        max_lines: When a positive number, only the last ``max_lines`` lines
            of the file are kept, which bounds memory use on large files.

    Returns:
        A ``(frame, bad)`` tuple. ``frame`` holds one row per JSON object with
        nested keys joined by "." (empty when nothing usable was found).
        ``bad`` counts non-blank lines that were not a JSON object, either
        because they failed to parse or because they held a list/scalar.
    """
    records = []
    bad = 0
    with path.open("r", encoding="utf-8", errors="replace") as f:
        if max_lines is not None and max_lines > 0:
            # A bounded deque keeps only the tail of the file.
            lines: Iterable[str] = deque(f, maxlen=max_lines)
        else:
            lines = f
        for line in lines:
            line = line.strip()
            if not line:
                continue
            try:
                parsed = json.loads(line)
            except (ValueError, RecursionError):
                # json.JSONDecodeError is a ValueError subclass.
                bad += 1
                continue
            if isinstance(parsed, dict):
                records.append(parsed)
            else:
                # A bare list/number/string is valid JSON but not a record;
                # feeding it to json_normalize would fail the whole file.
                bad += 1
    if not records:
        return pd.DataFrame(), bad
    return pd.json_normalize(records, sep="."), bad
def _parse_json_lines(text: str) -> list:
    """Return the JSON objects found one-per-line in *text*, skipping other lines."""
    found = []
    # Split on "\n" only: str.splitlines() would also break on characters such
    # as U+2028 that may legally appear inside a JSON string.
    for line in text.split("\n"):
        line = line.strip()
        if not line:
            continue
        try:
            parsed = json.loads(line)
        except (ValueError, RecursionError):
            continue
        if isinstance(parsed, dict):
            found.append(parsed)
    return found


def _dict_rows_to_frame(items: list) -> pd.DataFrame:
    """Flatten the dict entries of *items* into a DataFrame with "."-joined keys."""
    rows = [item for item in items if isinstance(item, dict)]
    if not rows:
        return pd.DataFrame()
    return pd.json_normalize(rows, sep=".")


def read_json_file(path: Path) -> pd.DataFrame:
    """Read a ``.json`` file into a flattened DataFrame.

    Supported layouts:
    - a list of objects,
    - a single object,
    - an object wrapping the rows under one of the keys "records", "data",
      "rows", "items", "events", "alerts" or "outliers",
    - JSON lines (one object per line) despite the ``.json`` extension.

    Nested keys are always flattened with "." as separator, so the resulting
    columns look the same whichever layout the file uses.

    Raises:
        json.JSONDecodeError: If the file is neither a valid JSON document
            nor contains at least one JSON object line.
    """
    text = path.read_text(encoding="utf-8", errors="replace")
    try:
        # Parse as one document first so a compact wrapper object written on a
        # single line is unwrapped rather than treated as a one-line JSONL file.
        obj = json.loads(text)
    except json.JSONDecodeError:
        # Several documents in one file: treat it as JSON lines.
        records = _parse_json_lines(text)
        if not records:
            raise
        return pd.json_normalize(records, sep=".")

    if isinstance(obj, list):
        return _dict_rows_to_frame(obj)
    if isinstance(obj, dict):
        for key in ("records", "data", "rows", "items", "events", "alerts", "outliers"):
            wrapped = obj.get(key)
            if isinstance(wrapped, list):
                return _dict_rows_to_frame(wrapped)
        return pd.json_normalize([obj], sep=".")
    return pd.DataFrame()
def read_any_file(path: Path) -> pd.DataFrame:
    """Load one outlier result file, choosing the reader from its extension.

    Handles .csv, .jsonl/.ndjson, .json, .parquet and .xlsx/.xls. Any other
    extension, or any error raised while reading, yields an empty DataFrame;
    read errors are reported with a "[WARN]" line on stdout.
    """
    ext = path.suffix.lower()
    frame: Optional[pd.DataFrame] = None
    try:
        if ext in (".jsonl", ".ndjson"):
            frame = read_jsonl(path)[0]
        elif ext == ".json":
            frame = read_json_file(path)
        elif ext == ".csv":
            frame = pd.read_csv(path)
        elif ext == ".parquet":
            frame = pd.read_parquet(path)
        elif ext in (".xlsx", ".xls"):
            frame = pd.read_excel(path)
    except Exception as err:
        print(f"[WARN] Gagal baca {path}: {err}")
        frame = None
    return pd.DataFrame() if frame is None else frame
def load_outlier_dir(outlier_dir: Path, output_dir: Optional[Path] = None) -> pd.DataFrame:
    """Load every supported data file found recursively under *outlier_dir*.

    Files located inside *output_dir*, and files whose path contains
    "hasil-compare", are skipped so that results of an earlier comparison are
    not read back in. Each loaded frame gets a "__outlier_source_file" column
    holding the originating path. Returns an empty DataFrame when nothing
    could be loaded.
    """
    globs = ("*.csv", "*.json", "*.jsonl", "*.ndjson", "*.parquet", "*.xlsx", "*.xls")
    found = {hit for glob in globs for hit in outlier_dir.rglob(glob)}

    collected = []
    for candidate in sorted(found):
        resolved = candidate.resolve()

        # Skip anything that lives under the comparison output folder.
        if output_dir is not None:
            try:
                inside_output = (
                    output_dir.resolve() in resolved.parents
                    or output_dir.resolve() == resolved.parent
                )
            except Exception:
                inside_output = False
            if inside_output:
                continue

        if "hasil-compare" in str(candidate):
            continue

        frame = read_any_file(candidate)
        if not frame.empty:
            frame["__outlier_source_file"] = str(candidate)
            collected.append(frame)

    if collected:
        return pd.concat(collected, ignore_index=True, sort=False)
    return pd.DataFrame()
def pick_col(df: pd.DataFrame, aliases: list[str]) -> Optional[str]:
    """Find the column of *df* that corresponds to one of *aliases*.

    Lookup order:
    1. Case-insensitive exact match, trying the aliases in the given order.
    2. Suffix match, so that e.g. "wazuh.rule.id" or "x_rule_id" satisfies
       the alias "rule.id".

    Returns the original column label, or None when *df* is empty or nothing
    matches.
    """
    if df.empty:
        return None

    by_lower = {str(name).lower(): name for name in df.columns}
    wanted_names = [alias.lower() for alias in aliases]

    for wanted in wanted_names:
        if wanted in by_lower:
            return by_lower[wanted]

    for wanted in wanted_names:
        tails = ("." + wanted, "_" + wanted.replace(".", "_"))
        for name in df.columns:
            if str(name).lower().endswith(tails):
                return name

    return None
def normalize(df: pd.DataFrame, prefix: str) -> pd.DataFrame:
    """Project a raw frame onto the canonical metadata columns.

    The result shares *df*'s index and contains:
    - one cleaned string column per ``CANON_FIELDS`` key ("" when the raw
      frame has no matching column),
    - an "extra_<name>" copy of every raw column whose name contains one of
      ``EXTRA_KEYWORDS``,
    - "timestamp_dt" (UTC datetime, NaT when unparseable), "agent_key"
      ("<agent_id>|<agent_name>"), "full_log_hash" and "__type" (= *prefix*).
    """
    result = pd.DataFrame(index=df.index)

    for canon_name, candidates in CANON_FIELDS.items():
        source_col = pick_col(df, candidates)
        result[canon_name] = "" if source_col is None else df[source_col]

    # Carry over outlier-specific columns (scores, labels, ...) when present.
    for column in df.columns:
        lowered = str(column).lower()
        for keyword in EXTRA_KEYWORDS:
            if keyword in lowered:
                result[f"extra_{column}"] = df[column]
                break

    # Clean the canonical columns so NaN/float values do not break matching.
    for canon_name in CANON_FIELDS:
        result[canon_name] = result[canon_name].map(clean_scalar)

    result["timestamp_dt"] = pd.to_datetime(result["timestamp"], errors="coerce", utc=True)

    agent_ids = result["agent_id"].map(clean_scalar)
    agent_names = result["agent_name"].map(clean_scalar)
    result["agent_key"] = agent_ids + "|" + agent_names

    result["full_log_hash"] = result["full_log"].map(safe_hash)
    result["__type"] = prefix
    return result
def value_match(a: Any, b: Any) -> bool:
    """Return True when both values are non-empty and equal after cleaning."""
    left = clean_scalar(a)
    right = clean_scalar(b)
    if not left or not right:
        return False
    return left == right
def score_candidate(out_row: pd.Series, alert_row: pd.Series, time_window: int) -> Tuple[int, str]:
    """Score how well one alert matches one outlier.

    Returns ``(score, matched_fields)`` where ``matched_fields`` is a
    comma-separated list of the fields that contributed. An identical
    full-log hash is worth 50 points, each equal metadata field adds its
    weight, and timestamps within *time_window* seconds add 1-10 points,
    more the closer they are.
    """
    total = 0
    hits = []

    # An identical full log is the strongest indicator when available.
    if value_match(out_row.get("full_log_hash", ""), alert_row.get("full_log_hash", "")):
        total += 50
        hits.append("full_log")

    weights = {
        "agent_id": 8,
        "agent_name": 6,
        "agent_ip": 5,
        "location": 6,
        "srcip": 5,
        "dstip": 5,
        "srcport": 3,
        "dstport": 3,
        "user": 4,
        "program_name": 4,
        "rule_id": 8,
        "decoder": 5,
    }
    for name, points in weights.items():
        if value_match(out_row.get(name, ""), alert_row.get(name, "")):
            total += points
            hits.append(name)

    ts_out = out_row.get("timestamp_dt")
    ts_alert = alert_row.get("timestamp_dt")
    if time_window <= 0 or pd.isna(ts_out) or pd.isna(ts_alert):
        return total, ",".join(hits)

    gap = abs((ts_out - ts_alert).total_seconds())
    if gap <= time_window:
        closeness = int(10 * (1 - gap / time_window))
        total += max(1, closeness)
        hits.append(f"time±{int(gap)}s")
    return total, ",".join(hits)
def is_valid_agent_key(agent_key: Any) -> bool:
    """Return True when *agent_key* carries real agent information.

    Empty keys and the placeholder combinations "|", "nan|nan" and
    "None|None" are rejected.
    """
    key_text = clean_scalar(agent_key)
    if not key_text:
        return False
    return key_text not in ("|", "nan|nan", "None|None")
def compare(
    outliers: pd.DataFrame,
    alerts: pd.DataFrame,
    time_window: int,
    min_score: int,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Pair every outlier with its best-scoring alert.

    For each outlier the alert pool is narrowed first to alerts within
    *time_window* seconds and then to alerts with the same agent key; each
    narrowing is only kept when it leaves at least one alert. The remaining
    alerts are scored with ``score_candidate`` and the first one with the
    highest score wins.

    Returns ``(matched, unmatched)``. ``matched`` holds outliers whose best
    score is at least *min_score*, with "outlier_*" and "alert_*" columns.
    ``unmatched`` holds the rest with "outlier_*" columns only. Both carry
    "match_score" and "matched_fields".
    """
    matched_rows = []
    unmatched_rows = []

    alert_pool = alerts.copy()
    alert_pool["__alert_index"] = alert_pool.index

    outlier_cols = [c for c in outliers.columns if c != "__type"]
    alert_cols = [c for c in alert_pool.columns if c != "__type"]

    for _, outlier in outliers.iterrows():
        pool = alert_pool

        # Narrow by time when the outlier has a usable timestamp.
        stamp = outlier.get("timestamp_dt")
        if pd.notna(stamp) and time_window > 0:
            span = pd.Timedelta(seconds=time_window)
            earliest = stamp - span
            latest = stamp + span
            in_window = pool[
                (pool["timestamp_dt"].notna())
                & (pool["timestamp_dt"] >= earliest)
                & (pool["timestamp_dt"] <= latest)
            ]
            if not in_window.empty:
                pool = in_window

        # Narrow by agent when the outlier identifies one.
        if is_valid_agent_key(outlier.get("agent_key")):
            same_agent = pool[pool["agent_key"] == outlier["agent_key"]]
            if not same_agent.empty:
                pool = same_agent

        top_alert = None
        top_score = -1
        top_fields = ""
        for _, alert in pool.iterrows():
            score, fields = score_candidate(outlier, alert, time_window)
            if score > top_score:
                top_alert, top_score, top_fields = alert, score, fields

        record = {"match_score": top_score, "matched_fields": top_fields}
        for col in outlier_cols:
            record[f"outlier_{col}"] = outlier[col]

        if top_alert is None or top_score < min_score:
            unmatched_rows.append(record)
            continue

        for col in alert_cols:
            record[f"alert_{col}"] = top_alert[col]
        matched_rows.append(record)

    return pd.DataFrame(matched_rows), pd.DataFrame(unmatched_rows)
def metadata_presence(raw_outliers: pd.DataFrame, raw_alerts: pd.DataFrame) -> pd.DataFrame:
    """Report which canonical metadata fields exist in each raw input.

    Produces one row per ``CANON_FIELDS`` key with a presence flag and the
    name of the resolved column ("" when absent) for both the outlier data
    and the alerts data.
    """
    report = []
    for canon, aliases in CANON_FIELDS.items():
        found_out = pick_col(raw_outliers, aliases)
        found_alert = pick_col(raw_alerts, aliases)
        report.append(
            dict(
                metadata=canon,
                ada_di_hasil_outlier=bool(found_out),
                kolom_hasil_outlier=found_out or "",
                ada_di_alerts_json=bool(found_alert),
                kolom_alerts_json=found_alert or "",
            )
        )
    return pd.DataFrame(report)
def metadata_match_counts(matched: pd.DataFrame) -> pd.DataFrame:
    """Count how often each metadata field contributed to a match.

    Reads the comma-separated "matched_fields" column; entries such as
    "time±10s" are all counted under "time". Returns a frame with columns
    "metadata" and "count", sorted by descending count (empty when there is
    nothing to count).
    """
    if matched.empty or "matched_fields" not in matched.columns:
        return pd.DataFrame(columns=["metadata", "count"])

    tally = {}
    for cell in matched["matched_fields"].fillna(""):
        for token in (part.strip() for part in str(cell).split(",")):
            if not token:
                continue
            # Collapse every time offset into a single "time" bucket.
            name = "time" if token.startswith("time±") else token
            tally[name] = tally.get(name, 0) + 1

    if not tally:
        return pd.DataFrame(columns=["metadata", "count"])

    frame = pd.DataFrame({"metadata": list(tally.keys()), "count": list(tally.values())})
    ordered = frame.sort_values("count", ascending=False)
    return ordered.reset_index(drop=True)
def safe_group_count(df: pd.DataFrame, columns: list[str]) -> pd.DataFrame:
    """Count rows per combination of *columns*, tolerating missing input.

    Returns a frame with *columns* plus "count", sorted by descending count.
    When *df* is empty or lacks any of *columns*, an empty frame with those
    headers is returned instead of raising.
    """
    if df.empty or any(col not in df.columns for col in columns):
        return pd.DataFrame(columns=columns + ["count"])

    sizes = df.groupby(columns, dropna=False).size()
    table = sizes.reset_index(name="count")
    ordered = table.sort_values("count", ascending=False)
    return ordered.reset_index(drop=True)
def write_csv(df: pd.DataFrame, path: Path) -> None:
    """Write *df* to *path* as UTF-8 CSV without the index column."""
    df.to_csv(path_or_buf=path, encoding="utf-8", index=False)
def build_summary(
    outlier_dir: Path,
    alerts_path: Path,
    raw_outliers: pd.DataFrame,
    raw_alerts: pd.DataFrame,
    matched: pd.DataFrame,
    no_match: pd.DataFrame,
    time_window: int,
    min_score: int,
    bad_alert_lines: int,
) -> str:
    """Build the plain-text summary report of one comparison run.

    The report lists the inputs, the row counts, the matching parameters,
    the matched percentage (only when there is at least one outlier row),
    the output file names and a short interpretation guide. Lines are joined
    with "\\n" and there is no trailing newline.
    """
    outlier_total = len(raw_outliers)
    match_total = len(matched)

    report = [
        "HASIL PERBANDINGAN OUTLIER VS ALERTS.JSON",
        "=" * 60,
        f"Folder outlier : {outlier_dir}",
        f"File alerts.json : {alerts_path}",
        f"Rows outlier : {outlier_total:,}",
        f"Rows alerts dibaca : {len(raw_alerts):,}",
        f"Baris alerts rusak : {bad_alert_lines:,}",
        f"Matched : {match_total:,}",
        f"Tidak matched : {len(no_match):,}",
        f"Time window : ±{time_window} detik",
        f"Minimum match score : {min_score}",
    ]

    if outlier_total > 0:
        share = 100 * match_total / outlier_total
        report.append(f"Persentase matched : {share:.2f}%")

    report += [
        "",
        "File output:",
        "- metadata_presence.csv",
        "- metadata_match_counts.csv",
        "- matched_outliers_vs_alerts.csv",
        "- outliers_without_matching_alert.csv",
        "- matched_alert_rule_counts.csv",
        "- matched_agent_counts.csv",
        "- summary.txt",
        "",
        "Interpretasi singkat:",
        "- Matched: outlier juga muncul sebagai alert Wazuh; prioritas investigasi lebih tinggi.",
        "- Tidak matched: aneh secara statistik, tetapi belum tentu dianggap alert oleh rule Wazuh.",
        "- metadata_presence.csv menunjukkan metadata apa yang tersedia untuk proses matching.",
        "- metadata_match_counts.csv menunjukkan metadata apa yang paling sering dipakai untuk match.",
    ]
    return "\n".join(report)
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the comparison script.

    ``--outlier-dir`` and ``--alerts`` are required. ``--output-dir``,
    ``--max-alert-lines``, ``--time-window`` and ``--min-score`` have defaults.
    """
    parser = argparse.ArgumentParser(
        description="Bandingkan hasil outlier Wazuh dengan alerts.json berdasarkan metadata."
    )

    required_paths = (
        ("--outlier-dir", "Folder output hasil outlier."),
        ("--alerts", "Path ke alerts.json Wazuh."),
    )
    for flag, help_text in required_paths:
        parser.add_argument(flag, required=True, help=help_text)

    parser.add_argument("--output-dir", default="hasil-compare", help="Folder output hasil compare.")

    int_options = (
        ("--max-alert-lines", 200000, "Baca N baris terakhir alerts.json."),
        ("--time-window", 300, "Window waktu matching dalam detik."),
        ("--min-score", 8, "Skor minimum agar dianggap match."),
    )
    for flag, default, help_text in int_options:
        parser.add_argument(flag, type=int, default=default, help=help_text)

    return parser.parse_args()
def main() -> None:
    """Run the outlier-versus-alert comparison from the command line.

    Steps: validate the input paths, load the outlier files and the alerts
    file, record which metadata is available, normalise both inputs, match
    them, and write the CSV reports plus ``summary.txt`` to the output folder.

    Raises:
        SystemExit: When an input path is missing or of the wrong kind, or
            when either input yields no rows.
    """
    args = parse_args()
    outlier_dir = Path(args.outlier_dir)
    alerts_path = Path(args.alerts)
    output_dir = Path(args.output_dir)

    # Validate the inputs before touching the filesystem, so that a wrong
    # argument does not leave an empty output folder behind.
    if not outlier_dir.exists():
        raise SystemExit(f"ERROR: Folder outlier tidak ditemukan: {outlier_dir}")
    if not outlier_dir.is_dir():
        raise SystemExit(f"ERROR: --outlier-dir bukan folder: {outlier_dir}")
    if not alerts_path.exists():
        raise SystemExit(f"ERROR: alerts.json tidak ditemukan: {alerts_path}")
    if not alerts_path.is_file():
        raise SystemExit(f"ERROR: --alerts bukan file: {alerts_path}")

    output_dir.mkdir(parents=True, exist_ok=True)

    print("[1] Membaca hasil outlier...")
    raw_outliers = load_outlier_dir(outlier_dir, output_dir=output_dir)
    if raw_outliers.empty:
        raise SystemExit("ERROR: Tidak ada file CSV/JSON/JSONL/NDJSON/PARQUET/XLSX yang bisa dibaca di folder outlier.")
    print(f" Rows outlier terbaca: {len(raw_outliers):,}")

    print("[2] Membaca alerts.json...")
    raw_alerts, bad = read_jsonl(alerts_path, max_lines=args.max_alert_lines)
    if raw_alerts.empty:
        raise SystemExit("ERROR: alerts.json kosong atau tidak bisa dibaca.")
    print(f" Rows alert terbaca: {len(raw_alerts):,}")
    if bad:
        print(f" Warning: baris JSON rusak: {bad:,}")

    print("[3] Mengecek metadata yang tersedia...")
    meta = metadata_presence(raw_outliers, raw_alerts)
    write_csv(meta, output_dir / "metadata_presence.csv")

    print("[4] Normalisasi metadata...")
    outliers = normalize(raw_outliers, "outlier")
    alerts = normalize(raw_alerts, "alert")

    print("[5] Matching outlier vs alert...")
    matched, no_match = compare(
        outliers=outliers,
        alerts=alerts,
        time_window=args.time_window,
        min_score=args.min_score,
    )

    print("[6] Menulis output...")
    write_csv(matched, output_dir / "matched_outliers_vs_alerts.csv")
    write_csv(no_match, output_dir / "outliers_without_matching_alert.csv")

    rule_counts = safe_group_count(
        matched,
        ["alert_rule_id", "alert_rule_level", "alert_rule_desc"],
    )
    write_csv(rule_counts, output_dir / "matched_alert_rule_counts.csv")

    agent_counts = safe_group_count(
        matched,
        ["alert_agent_id", "alert_agent_name", "alert_agent_ip"],
    )
    write_csv(agent_counts, output_dir / "matched_agent_counts.csv")

    meta_counts = metadata_match_counts(matched)
    write_csv(meta_counts, output_dir / "metadata_match_counts.csv")

    summary = build_summary(
        outlier_dir=outlier_dir,
        alerts_path=alerts_path,
        raw_outliers=raw_outliers,
        raw_alerts=raw_alerts,
        matched=matched,
        no_match=no_match,
        time_window=args.time_window,
        min_score=args.min_score,
        bad_alert_lines=bad,
    )
    (output_dir / "summary.txt").write_text(summary, encoding="utf-8")

    print()
    print(summary)


# Run the comparison only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
</pre>