Difference between revisions of "Cyber Security: Python: Isolasi Outliar"

From OnnoWiki
Jump to navigation Jump to search
 
Line 482: Line 482:
  
 
== 4. Jalankan langsung ke file Wazuh==
 
== 4. Jalankan langsung ke file Wazuh==
 +
 +
sudo apt install -y acl
 +
 +
sudo setfacl -R -m u:onno:rx /opt/wazuh-data
 +
sudo setfacl -R -m u:onno:rx /opt/wazuh-data/logs
 +
sudo setfacl -R -m u:onno:rx /opt/wazuh-data/logs/archives
 +
sudo setfacl -m u:onno:r /opt/wazuh-data/logs/archives/archives.json
  
 
  python3 wazuh_archive_outlier.py \
 
  python3 wazuh_archive_outlier.py \

Latest revision as of 04:21, 23 June 2026

Bisa. Ini skrip Python read-only untuk menganalisis:

/opt/wazuh-data/logs/archives/archives.json

Tujuannya: membaca log Wazuh, membuat fitur sederhana, lalu memakai Isolation Forest untuk menemukan event yang polanya tidak umum / outlier.

1. Buat folder kerja

mkdir -p ~/Apps/Wazuh-Outlier
cd ~/Apps/Wazuh-Outlier

2. Buat virtual environment

sudo apt install python3.14-venv
python3 -m venv venv
source venv/bin/activate
pip install pandas numpy scikit-learn

3. Buat script

nano wazuh_archive_outlier.py

Isi dengan script ini:


#!/usr/bin/env python3
"""
wazuh_archive_outlier.py

Analisa outlier dari Wazuh archives.json.

Output:
- outliers.csv
- outliers.jsonl
- sample_all_scored.csv

Script ini READ ONLY terhadap file Wazuh.
Tidak menghapus, tidak memindahkan, tidak mengubah archives.json.
"""

import argparse
import json
import os
import sys
import math
from pathlib import Path
from collections import Counter
from datetime import datetime

import numpy as np
import pandas as pd
from sklearn.feature_extraction import FeatureHasher
from sklearn.ensemble import IsolationForest


CAT_FIELDS = [
    "agent.name", "agent.id", "manager.name", "location",
    "decoder.name", "rule.id", "rule.description",
    "data.srcip", "data.dstip", "data.srcport", "data.dstport",
    "data.protocol", "data.action", "data.status", "data.url",
    "srcip", "dstip", "srcport", "dstport", "protocol",
    "program_name", "syscheck.path",
    "win.system.eventID", "win.system.providerName",
]

NUM_FIELDS = [
    "rule.level", "data.srcport", "data.dstport",
    "srcport", "dstport", "win.system.eventID",
]

COMMON_DST_PORTS = {
    22, 25, 53, 80, 110, 123, 143, 443, 465, 587,
    993, 995, 1514, 1515, 9200, 5601
}


def read_last_lines(path: str, max_lines: int):
    """
    Membaca hanya N baris terakhir dari file besar.
    max_lines = 0 berarti baca seluruh file.
    """
    if max_lines <= 0:
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            return list(f)

    block_size = 1024 * 1024
    data = b""

    with open(path, "rb") as f:
        f.seek(0, os.SEEK_END)
        pos = f.tell()

        while pos > 0:
            read_size = min(block_size, pos)
            pos -= read_size
            f.seek(pos)
            data = f.read(read_size) + data

            if data.count(b"\n") > max_lines:
                break

    lines = data.splitlines()[-max_lines:]
    return [line.decode("utf-8", errors="replace") for line in lines]


def flatten_json(obj, parent="", out=None, depth=0, max_depth=4):
    """
    Ubah JSON nested menjadi dot notation.
    Contoh:
    agent.name
    rule.level
    data.srcip
    """
    if out is None:
        out = {}

    if depth > max_depth:
        return out

    if isinstance(obj, dict):
        for k, v in obj.items():
            key = f"{parent}.{k}" if parent else str(k)
            flatten_json(v, key, out, depth + 1, max_depth)
    elif isinstance(obj, list):
        if all(not isinstance(x, (dict, list)) for x in obj):
            out[parent] = ",".join(str(x) for x in obj[:20])
        else:
            out[parent] = f"list_len_{len(obj)}"
    else:
        out[parent] = obj

    return out


def clean_value(v, max_len=140):
    if v is None:
        return ""
    s = str(v).strip()
    if not s or s.lower() in {"none", "null", "nan", "-"}:
        return ""
    return s[:max_len]


def pick(flat, *keys):
    for k in keys:
        v = clean_value(flat.get(k))
        if v:
            return v
    return ""


def to_float(v):
    try:
        if v is None or v == "":
            return None
        return float(str(v).strip())
    except Exception:
        return None


def parse_timestamp(ts):
    if not ts:
        return None
    try:
        return datetime.fromisoformat(str(ts).replace("Z", "+00:00"))
    except Exception:
        return None


def build_rows_and_cats(events):
    rows = []
    cat_pairs_per_event = []

    for i, event in enumerate(events):
        flat = flatten_json(event)
        ts = pick(flat, "timestamp")
        dt = parse_timestamp(ts)

        row = {
            "idx": i,
            "timestamp": ts,
            "agent_name": pick(flat, "agent.name"),
            "agent_id": pick(flat, "agent.id"),
            "manager_name": pick(flat, "manager.name"),
            "location": pick(flat, "location"),
            "decoder_name": pick(flat, "decoder.name"),
            "rule_id": pick(flat, "rule.id"),
            "rule_level": pick(flat, "rule.level"),
            "rule_description": pick(flat, "rule.description"),
            "srcip": pick(flat, "data.srcip", "srcip"),
            "dstip": pick(flat, "data.dstip", "dstip"),
            "srcport": pick(flat, "data.srcport", "srcport"),
            "dstport": pick(flat, "data.dstport", "dstport"),
            "protocol": pick(flat, "data.protocol", "protocol"),
            "full_log": pick(flat, "full_log", "message"),
        }

        full_log = row["full_log"]
        row["full_log_len"] = len(full_log)
        row["hour"] = dt.hour if dt else -1
        row["day_of_week"] = dt.weekday() if dt else -1

        cat_pairs = []

        for field in CAT_FIELDS:
            val = clean_value(flat.get(field))
            if val:
                cat_pairs.append((field, val))

        if not cat_pairs and full_log:
            cat_pairs.append(("full_log_prefix", full_log[:80]))

        rows.append(row)
        cat_pairs_per_event.append(cat_pairs)

    return rows, cat_pairs_per_event


def build_features(rows, cat_pairs_per_event):
    n = max(len(rows), 1)

    pair_counts = Counter(
        f"{k}={v}"
        for pairs in cat_pairs_per_event
        for k, v in pairs
    )

    feature_dicts = []
    rare_pairs_per_event = []

    for row, pairs in zip(rows, cat_pairs_per_event):
        feats = {}
        rare_parts = []
        rare_scores = []

        for k, v in pairs:
            pair = f"{k}={v}"
            feats[pair] = 1.0

            freq = pair_counts[pair]
            rarity = -math.log(max(freq / n, 1e-12))
            rare_scores.append(rarity)

            if freq <= 3:
                rare_parts.append(pair)

        for field in NUM_FIELDS:
            val = to_float(row.get(field) or "")
            if val is not None:
                feats[f"num:{field}"] = math.log1p(abs(val))

        if row["hour"] >= 0:
            feats["num:hour"] = row["hour"] / 23.0

        if row["day_of_week"] >= 0:
            feats["num:day_of_week"] = row["day_of_week"] / 6.0

        feats["num:full_log_len"] = math.log1p(row["full_log_len"])
        feats["num:rare_score_avg"] = float(np.mean(rare_scores)) if rare_scores else 0.0
        feats["num:rare_score_sum"] = float(np.sum(rare_scores)) if rare_scores else 0.0

        feature_dicts.append(feats)
        rare_pairs_per_event.append(", ".join(rare_parts[:5]))

    return feature_dicts, rare_pairs_per_event


def explain_reason(row, rare_fields):
    reasons = []

    level = to_float(row.get("rule_level"))

    if level is not None and level >= 10:
        reasons.append(f"rule.level tinggi ({int(level)})")

    dstport = to_float(row.get("dstport"))

    if dstport is not None:
        dstport_int = int(dstport)

        if dstport_int not in COMMON_DST_PORTS:
            reasons.append(f"dstport tidak umum ({dstport_int})")

    if row.get("full_log_len", 0) > 500:
        reasons.append("full_log panjang/tidak biasa")

    if rare_fields:
        reasons.append("kombinasi field jarang muncul")

    if not reasons:
        reasons.append("pola statistik berbeda dari mayoritas log")

    return "; ".join(reasons)


def main():
    parser = argparse.ArgumentParser(
        description="Detect outliers from Wazuh archives.json JSONL file."
    )

    parser.add_argument(
        "--input",
        required=True,
        help="Path ke archives.json"
    )

    parser.add_argument(
        "--max-lines",
        type=int,
        default=100000,
        help="Baca N baris terakhir. Pakai 0 untuk seluruh file."
    )

    parser.add_argument(
        "--contamination",
        type=float,
        default=0.01,
        help="Perkiraan rasio outlier. 0.01 berarti 1 persen."
    )

    parser.add_argument(
        "--output-dir",
        default="wazuh-outlier-results",
        help="Folder output."
    )

    parser.add_argument(
        "--hash-features",
        type=int,
        default=4096,
        help="Jumlah fitur hashed untuk ML."
    )

    parser.add_argument(
        "--top",
        type=int,
        default=30,
        help="Tampilkan top N outlier di terminal."
    )

    args = parser.parse_args()

    input_path = Path(args.input)

    if not input_path.exists():
        print(f"ERROR: File tidak ditemukan: {input_path}", file=sys.stderr)
        sys.exit(1)

    out_dir = Path(args.output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    print(f"[1/5] Reading: {input_path}")
    lines = read_last_lines(str(input_path), args.max_lines)

    events = []
    bad_lines = 0

    print("[2/5] Parsing JSON lines...")

    for line in lines:
        line = line.strip()

        if not line:
            continue

        try:
            events.append(json.loads(line))
        except Exception:
            bad_lines += 1

    if len(events) < 50:
        print(
            f"ERROR: Hanya menemukan {len(events)} JSON event valid. Butuh data lebih banyak.",
            file=sys.stderr
        )
        print(f"Bad lines skipped: {bad_lines}", file=sys.stderr)
        sys.exit(2)

    print(f"Valid events: {len(events)} | Bad lines skipped: {bad_lines}")

    print("[3/5] Building features...")
    rows, cat_pairs = build_rows_and_cats(events)
    feature_dicts, rare_fields = build_features(rows, cat_pairs)

    print("[4/5] Training IsolationForest...")

    hasher = FeatureHasher(
        n_features=args.hash_features,
        input_type="dict",
        alternate_sign=False
    )

    X = hasher.transform(feature_dicts)

    model = IsolationForest(
        n_estimators=200,
        contamination=args.contamination,
        random_state=42,
        n_jobs=-1,
    )

    labels = model.fit_predict(X)
    scores = model.score_samples(X)

    df = pd.DataFrame(rows)

    df["model_label"] = labels
    df["model_score"] = scores
    df["outlier_score"] = -scores
    df["rare_fields"] = rare_fields

    df["reason"] = [
        explain_reason(row, rf)
        for row, rf in zip(rows, rare_fields)
    ]

    df = df.sort_values("outlier_score", ascending=False)

    outliers = df[df["model_label"] == -1].copy()

    display_cols = [
        "timestamp", "agent_name", "location", "decoder_name",
        "rule_level", "rule_id", "srcip", "dstip", "srcport", "dstport",
        "protocol", "outlier_score", "reason", "rare_fields", "full_log",
    ]

    existing_cols = [c for c in display_cols if c in df.columns]

    all_csv = out_dir / "sample_all_scored.csv"
    out_csv = out_dir / "outliers.csv"
    out_jsonl = out_dir / "outliers.jsonl"

    df[existing_cols].to_csv(all_csv, index=False)
    outliers[existing_cols].to_csv(out_csv, index=False)

    with open(out_jsonl, "w", encoding="utf-8") as f:
        for _, row in outliers.iterrows():
            event = events[int(row["idx"])]

            event["_ml_outlier"] = {
                "outlier_score": float(row["outlier_score"]),
                "model_score": float(row["model_score"]),
                "reason": row["reason"],
                "rare_fields": row["rare_fields"],
            }

            f.write(json.dumps(event, ensure_ascii=False) + "\n")

    print("[5/5] Done.")
    print(f"All scored events : {all_csv}")
    print(f"Outliers CSV      : {out_csv}")
    print(f"Outliers JSONL    : {out_jsonl}")
    print(f"Outliers found    : {len(outliers)} from {len(df)} events")

    print("\nTop outliers:")

    preview_cols = [
        "timestamp", "agent_name", "location", "decoder_name",
        "rule_level", "srcip", "dstip", "dstport",
        "outlier_score", "reason",
    ]

    preview_cols = [c for c in preview_cols if c in outliers.columns]

    print(
        outliers[preview_cols]
        .head(args.top)
        .to_string(index=False)
    )


if __name__ == "__main__":
    main()

Simpan:

Ctrl+O
Enter
Ctrl+X

4. Jalankan langsung ke file Wazuh

sudo apt install -y acl

sudo setfacl -R -m u:onno:rx /opt/wazuh-data
sudo setfacl -R -m u:onno:rx /opt/wazuh-data/logs
sudo setfacl -R -m u:onno:rx /opt/wazuh-data/logs/archives
sudo setfacl -m u:onno:r /opt/wazuh-data/logs/archives/archives.json
python3 wazuh_archive_outlier.py \
  --input /opt/wazuh-data/logs/archives/archives.json \
  --max-lines 100000 \
  --contamination 0.01 \
  --output-dir hasil-outlier

Artinya:

--max-lines 100000

membaca 100.000 baris terakhir saja, supaya aman untuk file besar.

--contamination 0.01

berarti model menganggap kira-kira 1% event paling aneh sebagai outlier.

Kalau mau lebih sensitif:

python3 wazuh_archive_outlier.py \
  --input /opt/wazuh-data/logs/archives/archives.json \
  --max-lines 100000 \
  --contamination 0.03 \
  --output-dir hasil-outlier

Artinya outlier sekitar 3%.

5. Kalau permission denied

Karena file Wazuh biasanya milik root, pakai salah satu cara ini.

Cara aman: ambil copy 100.000 baris terakhir dulu.

sudo tail -n 100000 /opt/wazuh-data/logs/archives/archives.json > archives_sample.json
sudo chown $USER:$USER archives_sample.json

Lalu analisis file copy:

python3 wazuh_archive_outlier.py \
  --input archives_sample.json \
  --max-lines 0 \
  --contamination 0.01 \
  --output-dir hasil-outlier

6. Lihat hasil

ls -lh hasil-outlier

Output penting:

hasil-outlier/outliers.csv
hasil-outlier/outliers.jsonl
hasil-outlier/sample_all_scored.csv

Buka 20 outlier teratas:

head -n 20 hasil-outlier/outliers.csv

Atau lebih enak:

column -s, -t < hasil-outlier/outliers.csv | less -S

7. Arti hasil

Kolom penting:

outlier_score

Semakin besar nilainya, semakin aneh event tersebut.

reason

Alasan kasar kenapa event dianggap outlier.

Contoh alasan:

rule.level tinggi
dstport tidak umum
full_log panjang/tidak biasa
kombinasi field jarang muncul
pola statistik berbeda dari mayoritas log
rare_fields


Field yang jarang muncul di dataset, misalnya agent tertentu, decoder tertentu, source IP tertentu, port tertentu, atau kombinasi log yang tidak biasa.

8. Kalau Wazuh Docker dan file ada di dalam container

Cek nama container:

sudo docker ps --format "table Template:.Names\tTemplate:.Image\tTemplate:.Status" | grep wazuh

Ambil 100.000 baris terakhir dari container manager:

sudo docker exec single-node-wazuh.manager-1 \
  tail -n 100000 /var/ossec/logs/archives/archives.json \
  > archives_sample.json

Lalu jalankan:

python3 wazuh_archive_outlier.py \
  --input archives_sample.json \
  --max-lines 0 \
  --contamination 0.01 \
  --output-dir hasil-outlier

== Catatan penting

Script ini belum menentukan “serangan” secara pasti. Ia hanya menjawab:

> “Event mana yang paling berbeda dari pola mayoritas log?”

Jadi hasil `outliers.csv` tetap perlu dibaca SOC analyst. Biasanya yang menarik adalah event dengan kombinasi:

rule.level tinggi
source IP jarang
destination port aneh
decoder tidak biasa
agent tertentu tiba-tiba berbeda
full_log sangat panjang
event muncul pada jam tidak biasa

Untuk lab Wazuh, ini sudah cukup bagus sebagai tahap awal sebelum nanti ditambah MITRE ATT&CK mapping, risk score, dan LLM summary.