Cyber Security: Python: Compare Isolasi Outlier vs Alert

From OnnoWiki
Revision as of 04:41, 23 June 2026 by Onnowpurbo (talk | contribs) (Created page with "<pre> #!/usr/bin/env python3 """ compare_outlier_alerts.py Membandingkan hasil outlier Wazuh dengan alerts.json. Tujuan: - Membaca semua file hasil outlier dari folder terte...")
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Jump to navigation Jump to search
#!/usr/bin/env python3
"""
compare_outlier_alerts.py

Membandingkan hasil outlier Wazuh dengan alerts.json.

Tujuan:
- Membaca semua file hasil outlier dari folder tertentu.
- Membaca alerts.json Wazuh.
- Menormalkan metadata penting seperti timestamp, agent, rule, location, srcip, dstip, full_log.
- Mencocokkan outlier dengan alert berdasarkan metadata yang tersedia.
- Menghasilkan CSV ringkasan untuk investigasi SOC / threat hunting.

Contoh:
python3 compare_outlier_alerts.py \
  --outlier-dir hasil-outlier \
  --alerts /opt/wazuh-data/logs/alerts/alerts.json \
  --output-dir hasil-compare \
  --max-alert-lines 200000 \
  --time-window 300 \
  --min-score 8
"""

import argparse
import hashlib
import json
import math
from collections import deque
from pathlib import Path
from typing import Any, Iterable, Optional, Tuple

import pandas as pd


# Maps each canonical metadata name to the column names it may appear under in
# Wazuh alerts or in the outlier output. pick_col() tries the aliases in list
# order (exact, case-insensitive match first, then a suffix fallback), so put
# the preferred column name first.
CANON_FIELDS = {
    "timestamp": ["timestamp", "@timestamp", "event.timestamp"],
    "agent_id": ["agent.id", "agent_id"],
    "agent_name": ["agent.name", "agent_name", "host.name", "hostname"],
    "agent_ip": ["agent.ip", "agent_ip", "host.ip"],
    "manager_name": ["manager.name", "manager_name"],
    "rule_id": ["rule.id", "rule_id"],
    "rule_level": ["rule.level", "rule_level"],
    "rule_desc": ["rule.description", "rule_desc", "description"],
    "decoder": ["decoder.name", "decoder_name"],
    "location": ["location", "log.file.path"],
    "srcip": ["data.srcip", "srcip", "source.ip"],
    "dstip": ["data.dstip", "dstip", "destination.ip"],
    "srcport": ["data.srcport", "srcport", "source.port"],
    "dstport": ["data.dstport", "dstport", "destination.port"],
    "user": [
        "data.srcuser",
        "data.dstuser",
        "data.user",
        "data.username",
        "user",
        "username",
    ],
    "program_name": ["program_name", "program.name", "data.program_name"],
    "full_log": ["full_log", "message", "log", "raw_log"],
}


# Substrings that mark a raw column as worth keeping. normalize() copies every
# column whose lower-cased name contains one of these into the normalized frame
# under an "extra_" prefix.
EXTRA_KEYWORDS = [
    "score",
    "outlier",
    "anomaly",
    "cluster",
    "label",
    "source_file",
    "prediction",
    "isolation",
    "distance",
    "severity",
    "risk",
]


def clean_scalar(value: Any) -> str:
    """
    Convert any value into a stripped string that is safe to compare.

    Returns "" for None, NaN/NaT/NA and for text that only spells a missing
    value ("nan", "none", "null", "<na>", "nat", case-insensitive).
    Containers (list, tuple, dict, set) are serialized as JSON with sorted
    keys, falling back to ``str()`` when they are not JSON-serializable.

    Integral floats are rendered without the trailing ".0" (31101.0 ->
    "31101"). pandas upcasts an integer column to float as soon as it holds a
    missing value, so without this a rule id or port read from a CSV would
    never equal the string value found in alerts.json.
    """
    if value is None:
        return ""

    # pd.isna on a list/dict can return a boolean array, whose truth value is
    # ambiguous, so containers are handled before the missing-value check.
    if isinstance(value, (list, tuple, dict, set)):
        try:
            return json.dumps(value, ensure_ascii=False, sort_keys=True).strip()
        except Exception:
            # Deliberate best effort: e.g. sets are not JSON-serializable.
            return str(value).strip()

    try:
        if pd.isna(value):
            return ""
    except Exception:
        # Array-like values make the truth test ambiguous; fall through to str().
        pass

    if isinstance(value, float):
        if math.isnan(value):
            return ""
        # Only collapse floats that are exactly representable as integers;
        # beyond 2**53 the conversion would invent digits.
        if value.is_integer() and abs(value) < 2 ** 53:
            value = int(value)

    text = str(value).strip()
    if text.lower() in {"", "nan", "none", "null", "<na>", "nat"}:
        return ""

    return text


def safe_hash(value: Any) -> str:
    """
    Return the SHA1 hex digest of a full_log/message value.

    The value is first normalized with clean_scalar(); anything that
    normalizes to an empty string (None, NaN, blank text) yields "" instead
    of a digest, so the function never raises on missing or non-string input.
    """
    normalized = clean_scalar(value)
    if normalized:
        digest = hashlib.sha1(normalized.encode("utf-8", errors="ignore"))
        return digest.hexdigest()
    return ""


def read_jsonl(path: Path, max_lines: Optional[int] = None) -> Tuple[pd.DataFrame, int]:
    """
    Read a JSONL/NDJSON file into a flattened DataFrame.

    Args:
        path: File to read; decoded as UTF-8 with undecodable bytes replaced.
        max_lines: When given and positive, only the last N lines of the file
            are parsed, which bounds memory use on very large files.

    Returns:
        A tuple ``(frame, bad)``. ``frame`` holds one row per JSON object with
        nested keys flattened using "." as separator (empty when nothing was
        parsed). ``bad`` counts non-blank lines that were skipped, either
        because they are not valid JSON or because they are valid JSON but not
        an object.
    """
    records = []
    bad = 0

    with path.open("r", encoding="utf-8", errors="replace") as f:
        if max_lines is not None and max_lines > 0:
            # A bounded deque keeps only the trailing max_lines lines.
            iterable: Iterable[str] = deque(f, maxlen=max_lines)
        else:
            iterable = f

        for line in iterable:
            line = line.strip()
            if not line:
                continue

            try:
                record = json.loads(line)
            except (ValueError, RecursionError):
                bad += 1
                continue

            # A bare number, string or array is valid JSON but not an event;
            # passing it on would make json_normalize fail for the whole file.
            if isinstance(record, dict):
                records.append(record)
            else:
                bad += 1

    if not records:
        return pd.DataFrame(), bad

    return pd.json_normalize(records, sep="."), bad


def read_json_file(path: Path) -> pd.DataFrame:
    """
    Read a ``.json`` file into a flattened DataFrame.

    Supported layouts:
    - a JSON array of objects
    - a single JSON object
    - an object wrapping the rows under "records", "data", "rows", "items",
      "events", "alerts" or "outliers"
    - JSON lines (one object per line), which many pipelines write even
      though the extension is ``.json``

    The whole file is parsed as one JSON document first; only when that fails
    is it treated as JSON lines. Nested keys are always flattened with "." as
    separator, so columns such as "agent.id" look the same regardless of the
    layout, and values keep the type they have in the file.

    Returns:
        The parsed rows, or an empty DataFrame for a blank file or a top-level
        scalar. Non-object entries in a row list are ignored.

    Raises:
        ValueError: The file is neither a JSON document nor contains any
            parseable JSON-lines object (json.JSONDecodeError is a subclass).
    """
    with path.open("r", encoding="utf-8", errors="replace") as f:
        text = f.read().lstrip("\ufeff")

    if not text.strip():
        return pd.DataFrame()

    try:
        obj = json.loads(text)
    except ValueError:
        # Not a single document: fall back to one JSON object per line.
        records = []
        for line in text.splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                item = json.loads(line)
            except ValueError:
                continue
            if isinstance(item, dict):
                records.append(item)

        if not records:
            # Nothing usable at all; surface the original parse error.
            raise
        return pd.json_normalize(records, sep=".")

    if isinstance(obj, list):
        rows = [item for item in obj if isinstance(item, dict)]
        return pd.json_normalize(rows, sep=".")

    if isinstance(obj, dict):
        for key in ["records", "data", "rows", "items", "events", "alerts", "outliers"]:
            if key in obj and isinstance(obj[key], list):
                rows = [item for item in obj[key] if isinstance(item, dict)]
                return pd.json_normalize(rows, sep=".")

        return pd.json_normalize([obj], sep=".")

    return pd.DataFrame()


def read_any_file(path: Path) -> pd.DataFrame:
    """
    Read one outlier result file, choosing the reader by file extension.

    Supported extensions: .csv, .jsonl, .ndjson, .json, .parquet, .xlsx, .xls.

    CSV files are read with every column as text. Letting pandas infer dtypes
    would turn identifiers such as the zero-padded Wazuh agent id "001" into
    the integer 1, which can never match the string stored in alerts.json.
    Missing cells are still read as NaN.

    Returns:
        The file content, or an empty DataFrame when the extension is not
        supported or reading fails. Failures are reported as a "[WARN]" line
        on stdout rather than raised, so one bad file does not stop a run.
    """
    suffix = path.suffix.lower()

    try:
        if suffix == ".csv":
            return pd.read_csv(path, dtype=str)

        if suffix in [".jsonl", ".ndjson"]:
            df, _ = read_jsonl(path)
            return df

        if suffix == ".json":
            return read_json_file(path)

        if suffix == ".parquet":
            return pd.read_parquet(path)

        if suffix in [".xlsx", ".xls"]:
            return pd.read_excel(path)

    except Exception as e:
        # Deliberate catch-all at the file boundary: log and keep going.
        print(f"[WARN] Gagal baca {path}: {e}")

    return pd.DataFrame()


def load_outlier_dir(outlier_dir: Path, output_dir: Optional[Path] = None) -> pd.DataFrame:
    """
    Load every supported data file found recursively under ``outlier_dir``.

    Files located inside ``output_dir`` (when given), and files whose path
    contains "hasil-compare", are skipped so that the output of an earlier
    comparison run is not read back in as outlier data. Each loaded frame
    gets a "__outlier_source_file" column holding the path it came from.

    Returns:
        All non-empty frames concatenated with a fresh index, or an empty
        DataFrame when nothing could be loaded.
    """
    globs = ("*.csv", "*.json", "*.jsonl", "*.ndjson", "*.parquet", "*.xlsx", "*.xls")
    discovered = {found for glob in globs for found in outlier_dir.rglob(glob)}

    loaded = []
    for candidate in sorted(discovered):
        absolute = candidate.resolve()

        # Skip anything that lives inside the comparison output folder.
        if output_dir is not None:
            try:
                output_root = output_dir.resolve()
                inside_output = (
                    output_root in absolute.parents
                    or output_root == absolute.parent
                )
            except Exception:
                inside_output = False
            if inside_output:
                continue

        if "hasil-compare" in str(candidate):
            continue

        frame = read_any_file(candidate)
        if frame.empty:
            continue

        frame["__outlier_source_file"] = str(candidate)
        loaded.append(frame)

    if loaded:
        return pd.concat(loaded, ignore_index=True, sort=False)
    return pd.DataFrame()


def pick_col(df: pd.DataFrame, aliases: list[str]) -> Optional[str]:
    """
    Find the column of ``df`` that corresponds to one of ``aliases``.

    Two passes are made, each walking the aliases in the given order:
    1. Exact, case-insensitive match on the column name.
    2. Suffix fallback: the column name ends with "." + alias, or with "_" +
       alias where the alias dots are turned into underscores (for example
       "wazuh.rule.id" is accepted for the alias "rule.id").

    Returns:
        The matching column label as it appears in ``df``, or None when the
        frame is empty or nothing matches.
    """
    if df.empty:
        return None

    wanted = [alias.lower() for alias in aliases]

    # Pass 1: exact match ignoring case.
    by_lower_name = {}
    for column in df.columns:
        by_lower_name[str(column).lower()] = column
    for name in wanted:
        if name in by_lower_name:
            return by_lower_name[name]

    # Pass 2: the alias appears as a dotted or underscored suffix.
    for name in wanted:
        suffixes = ("." + name, "_" + name.replace(".", "_"))
        for column in df.columns:
            if str(column).lower().endswith(suffixes):
                return column

    return None


def normalize(df: pd.DataFrame, prefix: str) -> pd.DataFrame:
    """
    Project a raw frame onto the canonical metadata columns.

    The result shares the index of ``df`` and contains, in this order:
    - one column per CANON_FIELDS key, taken from the first matching alias
      column (empty string when none exists) and cleaned with clean_scalar();
    - an "extra_<name>" copy of every raw column whose lower-cased name
      contains one of EXTRA_KEYWORDS;
    - "timestamp_dt": the timestamp parsed as UTC, NaT when unparseable;
    - "agent_key": "<agent_id>|<agent_name>";
    - "full_log_hash": SHA1 of full_log via safe_hash();
    - "__type": the given ``prefix``.
    """
    result = pd.DataFrame(index=df.index)

    for name, candidates in CANON_FIELDS.items():
        source = pick_col(df, candidates)
        result[name] = "" if source is None else df[source]

    # Carry over score/label style columns produced by the outlier step.
    for column in df.columns:
        lowered = str(column).lower()
        for keyword in EXTRA_KEYWORDS:
            if keyword in lowered:
                result[f"extra_{column}"] = df[column]
                break

    # Clean the metadata columns so NaN/float values cannot break matching.
    for name in CANON_FIELDS:
        result[name] = result[name].map(clean_scalar)

    result["timestamp_dt"] = pd.to_datetime(result["timestamp"], errors="coerce", utc=True)

    agent_ids = result["agent_id"].map(clean_scalar)
    agent_names = result["agent_name"].map(clean_scalar)
    result["agent_key"] = agent_ids + "|" + agent_names

    result["full_log_hash"] = result["full_log"].map(safe_hash)
    result["__type"] = prefix

    return result


def value_match(a: Any, b: Any) -> bool:
    """
    Tell whether two values are equal and non-empty after clean_scalar().

    Two missing values never count as a match.
    """
    left = clean_scalar(a)
    if not left:
        return False
    # An empty right-hand side can never equal the non-empty left-hand side.
    return left == clean_scalar(b)


def score_candidate(out_row: pd.Series, alert_row: pd.Series, time_window: int) -> Tuple[int, str]:
    """
    Score how well one normalized outlier row matches one normalized alert row.

    Scoring:
    - identical full_log hash: +50 (reported as "full_log");
    - each equal, non-empty metadata field adds its weight (see the table
      in the body);
    - when both timestamps are present and at most ``time_window`` seconds
      apart: between +1 and +10, higher the closer they are (reported as
      "time±<seconds>s").

    Returns:
        ``(score, fields)`` where ``fields`` is the comma-joined list of
        everything that contributed to the score.
    """
    field_weights = (
        ("agent_id", 8),
        ("agent_name", 6),
        ("agent_ip", 5),
        ("location", 6),
        ("srcip", 5),
        ("dstip", 5),
        ("srcport", 3),
        ("dstport", 3),
        ("user", 4),
        ("program_name", 4),
        ("rule_id", 8),
        ("decoder", 5),
    )

    total = 0
    hits = []

    # The raw log line is the strongest indicator when it is available.
    if value_match(out_row.get("full_log_hash", ""), alert_row.get("full_log_hash", "")):
        total += 50
        hits.append("full_log")

    for name, points in field_weights:
        if not value_match(out_row.get(name, ""), alert_row.get(name, "")):
            continue
        total += points
        hits.append(name)

    outlier_time = out_row.get("timestamp_dt")
    alert_time = alert_row.get("timestamp_dt")

    if pd.notna(outlier_time) and pd.notna(alert_time) and time_window > 0:
        gap = abs((outlier_time - alert_time).total_seconds())
        if gap <= time_window:
            # Linear decay from 10 at zero distance, never below 1 in-window.
            closeness = int(10 * (1 - gap / time_window))
            total += closeness if closeness > 1 else 1
            hits.append(f"time±{int(gap)}s")

    return total, ",".join(hits)


def is_valid_agent_key(agent_key: Any) -> bool:
    """
    Tell whether an "<agent_id>|<agent_name>" key carries real agent data.

    Empty keys and the placeholders "|", "nan|nan" and "None|None" are
    rejected.
    """
    placeholders = ("|", "nan|nan", "None|None")
    cleaned = clean_scalar(agent_key)
    if not cleaned:
        return False
    return cleaned not in placeholders


def compare(
    outliers: pd.DataFrame,
    alerts: pd.DataFrame,
    time_window: int,
    min_score: int,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Pair every normalized outlier with its best-scoring alert.

    For each outlier the candidate alerts are narrowed in two optional steps,
    each applied only when it leaves at least one candidate: alerts within
    ``time_window`` seconds of the outlier timestamp, then alerts with the
    same agent key. Every remaining candidate is scored with
    score_candidate() and the first one with the highest score wins.

    Returns:
        ``(matched, no_match)``. Rows of both frames start with
        "match_score" and "matched_fields", followed by the outlier columns
        prefixed "outlier_". ``matched`` rows (score >= ``min_score``) also
        carry the alert columns prefixed "alert_", including
        "alert___alert_index", the index of the alert in ``alerts``. The
        "__type" column is left out. ``alerts`` itself is not modified.
    """
    matched_rows = []
    unmatched_rows = []

    alert_pool = alerts.copy()
    alert_pool["__alert_index"] = alert_pool.index

    outlier_fields = [c for c in outliers.columns if c != "__type"]
    alert_fields = [c for c in alert_pool.columns if c != "__type"]

    for _, outlier in outliers.iterrows():
        pool = alert_pool

        # Narrow by time when the outlier has a usable timestamp.
        if pd.notna(outlier.get("timestamp_dt")) and time_window > 0:
            half_width = pd.Timedelta(seconds=time_window)
            earliest = outlier["timestamp_dt"] - half_width
            latest = outlier["timestamp_dt"] + half_width

            stamps = pool["timestamp_dt"]
            in_window = pool[stamps.notna() & (stamps >= earliest) & (stamps <= latest)]
            if not in_window.empty:
                pool = in_window

        # Narrow by agent when the outlier names one.
        if is_valid_agent_key(outlier.get("agent_key")):
            same_agent = pool[pool["agent_key"] == outlier["agent_key"]]
            if not same_agent.empty:
                pool = same_agent

        top_alert, top_score, top_fields = None, -1, ""
        for _, alert in pool.iterrows():
            score, fields = score_candidate(outlier, alert, time_window)
            if score > top_score:
                top_alert, top_score, top_fields = alert, score, fields

        record = {"match_score": top_score, "matched_fields": top_fields}
        record.update((f"outlier_{c}", outlier[c]) for c in outlier_fields)

        if top_alert is None or top_score < min_score:
            unmatched_rows.append(record)
            continue

        record.update((f"alert_{c}", top_alert[c]) for c in alert_fields)
        matched_rows.append(record)

    return pd.DataFrame(matched_rows), pd.DataFrame(unmatched_rows)


def metadata_presence(raw_outliers: pd.DataFrame, raw_alerts: pd.DataFrame) -> pd.DataFrame:
    """
    Report which canonical metadata exist in the outlier data and in alerts.json.

    Returns:
        One row per CANON_FIELDS key with a presence flag and the resolved
        column name (empty string when absent) for each of the two inputs.
    """
    def describe(name: str, candidates: list) -> dict:
        in_outliers = pick_col(raw_outliers, candidates)
        in_alerts = pick_col(raw_alerts, candidates)
        return {
            "metadata": name,
            "ada_di_hasil_outlier": bool(in_outliers),
            "kolom_hasil_outlier": in_outliers or "",
            "ada_di_alerts_json": bool(in_alerts),
            "kolom_alerts_json": in_alerts or "",
        }

    return pd.DataFrame(
        [describe(name, candidates) for name, candidates in CANON_FIELDS.items()]
    )


def metadata_match_counts(matched: pd.DataFrame) -> pd.DataFrame:
    """
    Count how often each metadata field contributed to a match.

    Reads the comma-separated "matched_fields" column; every "time±<n>s"
    entry is counted under the single name "time".

    Returns:
        A frame with columns "metadata" and "count", sorted by descending
        count. It has no rows when ``matched`` is empty, lacks the
        "matched_fields" column, or lists no fields at all.
    """
    if matched.empty or "matched_fields" not in matched.columns:
        return pd.DataFrame(columns=["metadata", "count"])

    tally = {}
    for cell in matched["matched_fields"].fillna(""):
        for token in (part.strip() for part in str(cell).split(",")):
            if not token:
                continue
            # Fold time±10s, time±3s, ... into one bucket.
            label = "time" if token.startswith("time±") else token
            tally[label] = tally.get(label, 0) + 1

    if not tally:
        return pd.DataFrame(columns=["metadata", "count"])

    table = pd.DataFrame(
        {"metadata": list(tally.keys()), "count": list(tally.values())}
    )
    ranked = table.sort_values("count", ascending=False)
    return ranked.reset_index(drop=True)


def safe_group_count(df: pd.DataFrame, columns: list[str]) -> pd.DataFrame:
    """
    Count rows per combination of ``columns`` without failing on bad input.

    Returns:
        A frame with the grouping columns plus "count", sorted by descending
        count (missing values form their own group). When ``df`` is empty or
        lacks any of the columns, the same layout is returned with no rows.
    """
    if df.empty or any(name not in df.columns for name in columns):
        return pd.DataFrame(columns=columns + ["count"])

    sizes = df.groupby(columns, dropna=False).size()
    tallied = sizes.reset_index(name="count")
    ranked = tallied.sort_values("count", ascending=False)
    return ranked.reset_index(drop=True)


def write_csv(df: pd.DataFrame, path: Path) -> None:
    """
    Write ``df`` to ``path`` as UTF-8 CSV without the index column.
    """
    options = {"index": False, "encoding": "utf-8"}
    df.to_csv(path, **options)


def build_summary(
    outlier_dir: Path,
    alerts_path: Path,
    raw_outliers: pd.DataFrame,
    raw_alerts: pd.DataFrame,
    matched: pd.DataFrame,
    no_match: pd.DataFrame,
    time_window: int,
    min_score: int,
    bad_alert_lines: int,
) -> str:
    """
    Render the plain-text run summary.

    The report lists the inputs, the row counts, the matching parameters, the
    share of outliers that were matched (only when there is at least one
    outlier row), the names of the output files and a short reading guide.

    Returns:
        The report as one newline-joined string without a trailing newline.
    """
    outlier_total = len(raw_outliers)
    matched_total = len(matched)

    report = [
        "HASIL PERBANDINGAN OUTLIER VS ALERTS.JSON",
        "=" * 60,
        f"Folder outlier       : {outlier_dir}",
        f"File alerts.json     : {alerts_path}",
        f"Rows outlier         : {outlier_total:,}",
        f"Rows alerts dibaca   : {len(raw_alerts):,}",
        f"Baris alerts rusak   : {bad_alert_lines:,}",
        f"Matched              : {matched_total:,}",
        f"Tidak matched        : {len(no_match):,}",
        f"Time window          : ±{time_window} detik",
        f"Minimum match score  : {min_score}",
    ]

    # The percentage is undefined without outliers, so the line is omitted.
    if outlier_total > 0:
        share = 100 * matched_total / outlier_total
        report.append(f"Persentase matched   : {share:.2f}%")

    report.extend(
        [
            "",
            "File output:",
            "- metadata_presence.csv",
            "- metadata_match_counts.csv",
            "- matched_outliers_vs_alerts.csv",
            "- outliers_without_matching_alert.csv",
            "- matched_alert_rule_counts.csv",
            "- matched_agent_counts.csv",
            "- summary.txt",
            "",
            "Interpretasi singkat:",
            "- Matched: outlier juga muncul sebagai alert Wazuh; prioritas investigasi lebih tinggi.",
            "- Tidak matched: aneh secara statistik, tetapi belum tentu dianggap alert oleh rule Wazuh.",
            "- metadata_presence.csv menunjukkan metadata apa yang tersedia untuk proses matching.",
            "- metadata_match_counts.csv menunjukkan metadata apa yang paling sering dipakai untuk match.",
        ]
    )

    return "\n".join(report)


def parse_args() -> argparse.Namespace:
    """
    Parse the command-line options of the comparison script.

    ``--outlier-dir`` and ``--alerts`` are required. ``--output-dir`` defaults
    to "hasil-compare", ``--max-alert-lines`` to 200000, ``--time-window`` to
    300 (seconds) and ``--min-score`` to 8.
    """
    parser = argparse.ArgumentParser(
        description="Bandingkan hasil outlier Wazuh dengan alerts.json berdasarkan metadata."
    )

    required_paths = (
        ("--outlier-dir", "Folder output hasil outlier."),
        ("--alerts", "Path ke alerts.json Wazuh."),
    )
    for flag, text in required_paths:
        parser.add_argument(flag, required=True, help=text)

    parser.add_argument("--output-dir", default="hasil-compare", help="Folder output hasil compare.")

    integer_options = (
        ("--max-alert-lines", 200000, "Baca N baris terakhir alerts.json."),
        ("--time-window", 300, "Window waktu matching dalam detik."),
        ("--min-score", 8, "Skor minimum agar dianggap match."),
    )
    for flag, fallback, text in integer_options:
        parser.add_argument(flag, type=int, default=fallback, help=text)

    return parser.parse_args()


def main() -> None:
    """
    Run the full comparison: load inputs, match, and write all reports.

    Steps, mirrored by the numbered progress messages:
    1. Load every outlier file under --outlier-dir.
    2. Read the last --max-alert-lines lines of the alerts file.
    3. Write metadata_presence.csv.
    4. Normalize both inputs to the canonical metadata columns.
    5. Match each outlier against the alerts.
    6. Write the matched / unmatched CSVs, the per-rule and per-agent counts,
       metadata_match_counts.csv and summary.txt, then print the summary.

    Raises:
        SystemExit: An input path is missing or of the wrong kind, or one of
            the inputs yields no rows.
    """
    args = parse_args()

    outlier_dir = Path(args.outlier_dir)
    alerts_path = Path(args.alerts)
    output_dir = Path(args.output_dir)
    # NOTE: the output folder is created before the inputs are validated, so
    # it exists even when one of the checks below aborts the run.
    output_dir.mkdir(parents=True, exist_ok=True)

    if not outlier_dir.exists():
        raise SystemExit(f"ERROR: Folder outlier tidak ditemukan: {outlier_dir}")

    if not outlier_dir.is_dir():
        raise SystemExit(f"ERROR: --outlier-dir bukan folder: {outlier_dir}")

    if not alerts_path.exists():
        raise SystemExit(f"ERROR: alerts.json tidak ditemukan: {alerts_path}")

    if not alerts_path.is_file():
        raise SystemExit(f"ERROR: --alerts bukan file: {alerts_path}")

    print("[1] Membaca hasil outlier...")
    # output_dir is passed so results of an earlier run are not read back in.
    raw_outliers = load_outlier_dir(outlier_dir, output_dir=output_dir)
    if raw_outliers.empty:
        raise SystemExit("ERROR: Tidak ada file CSV/JSON/JSONL/NDJSON/PARQUET/XLSX yang bisa dibaca di folder outlier.")

    print(f"    Rows outlier terbaca: {len(raw_outliers):,}")

    print("[2] Membaca alerts.json...")
    raw_alerts, bad = read_jsonl(alerts_path, max_lines=args.max_alert_lines)
    if raw_alerts.empty:
        raise SystemExit("ERROR: alerts.json kosong atau tidak bisa dibaca.")

    print(f"    Rows alert terbaca: {len(raw_alerts):,}")
    if bad:
        print(f"    Warning: baris JSON rusak: {bad:,}")

    print("[3] Mengecek metadata yang tersedia...")
    # Computed on the raw frames so the report shows the original column names.
    meta = metadata_presence(raw_outliers, raw_alerts)
    write_csv(meta, output_dir / "metadata_presence.csv")

    print("[4] Normalisasi metadata...")
    outliers = normalize(raw_outliers, "outlier")
    alerts = normalize(raw_alerts, "alert")

    print("[5] Matching outlier vs alert...")
    matched, no_match = compare(
        outliers=outliers,
        alerts=alerts,
        time_window=args.time_window,
        min_score=args.min_score,
    )

    print("[6] Menulis output...")
    write_csv(matched, output_dir / "matched_outliers_vs_alerts.csv")
    write_csv(no_match, output_dir / "outliers_without_matching_alert.csv")

    # The "alert_" prefix on these column names is added by compare().
    rule_counts = safe_group_count(
        matched,
        ["alert_rule_id", "alert_rule_level", "alert_rule_desc"],
    )
    write_csv(rule_counts, output_dir / "matched_alert_rule_counts.csv")

    agent_counts = safe_group_count(
        matched,
        ["alert_agent_id", "alert_agent_name", "alert_agent_ip"],
    )
    write_csv(agent_counts, output_dir / "matched_agent_counts.csv")

    meta_counts = metadata_match_counts(matched)
    write_csv(meta_counts, output_dir / "metadata_match_counts.csv")

    summary = build_summary(
        outlier_dir=outlier_dir,
        alerts_path=alerts_path,
        raw_outliers=raw_outliers,
        raw_alerts=raw_alerts,
        matched=matched,
        no_match=no_match,
        time_window=args.time_window,
        min_score=args.min_score,
        bad_alert_lines=bad,
    )

    (output_dir / "summary.txt").write_text(summary, encoding="utf-8")

    print()
    print(summary)


# Run the comparison only when executed as a script, not when imported.
if __name__ == "__main__":
    main()

</pre>