Difference between revisions of "KI: PRAKTEK 10: AI untuk Deteksi Anomali"

From OnnoWiki
Jump to navigation Jump to search
(Created page with " PRAKTEK 10: AI untuk Deteksi Anomali Fokus sesi ini: kamu bikin “AI security” sederhana yang bisa belajar pola normal dari log, lalu menandai yang aneh (anomaly). Ini buk...")
 
Line 1: Line 1:
 +
Fokus sesi ini: kamu bikin “AI security” sederhana yang bisa belajar pola normal dari log, lalu menandai yang aneh (anomaly). Ini bukan “AI yang tahu segalanya”, tapi alat bantu triage biar analis tidak tenggelam dalam jutaan baris log.
  
PRAKTEK 10: AI untuk Deteksi Anomali
+
==Tujuan==
Fokus sesi ini: kamu bikin “AI security” sederhana yang bisa belajar pola normal dari log, lalu menandai yang aneh (anomaly). Ini bukan “AI yang tahu segalanya”, tapi alat bantu triage biar analis tidak tenggelam dalam jutaan baris log.
 
Tujuan
 
 
Mahasiswa mampu:
 
Mahasiswa mampu:
membangun pipeline deteksi anomali dari log nyata (Linux / web / auth),
+
* membangun pipeline deteksi anomali dari log nyata (Linux / web / auth),
melatih model unsupervised (tanpa label),
+
* melatih model unsupervised (tanpa label),
menghasilkan daftar event mencurigakan + alasan/fitur ringkas,
+
* menghasilkan daftar event mencurigakan + alasan/fitur ringkas,
menyimpan model dan menjalankan deteksi ulang secara berkala.
+
* menyimpan model dan menjalankan deteksi ulang secara berkala.
 +
 
 
Output akhir yang ditargetkan:
 
Output akhir yang ditargetkan:
Model tersimpan (.joblib)
+
* Model tersimpan (.joblib)
Laporan evaluasi sederhana (rasio anomali, contoh top N anomali)
+
* Laporan evaluasi sederhana (rasio anomali, contoh top N anomali)
File hasil deteksi (CSV/JSON)
+
* File hasil deteksi (CSV/JSON)
(Opsional) hasil/model dienkripsi dengan GnuPG
+
* (Opsional) hasil/model dienkripsi dengan GnuPG
Konsep Inti
+
 
 +
==Konsep Inti==
 
Deteksi anomali = mencari data yang “jarang”, “jauh dari pola normal”, atau “punya kombinasi fitur yang aneh”.
 
Deteksi anomali = mencari data yang “jarang”, “jauh dari pola normal”, atau “punya kombinasi fitur yang aneh”.
 +
 
Kamu akan pakai dua pendekatan:
 
Kamu akan pakai dua pendekatan:
Isolation Forest (tree-based): bagus untuk anomaly detection umum, sering jadi baseline kuat.
+
* Isolation Forest (tree-based): bagus untuk anomaly detection umum, sering jadi baseline kuat.
KMeans + jarak ke centroid: sederhana, cepat, mudah dijelaskan (jarak besar = makin aneh).
+
* KMeans + jarak ke centroid: sederhana, cepat, mudah dijelaskan (jarak besar = makin aneh).
 +
 
 
Catatan penting: Model unsupervised akan menandai “aneh”, bukan otomatis “jahat”. Anomali ≠ serangan, tapi anomali yang harus kamu cek dulu.
 
Catatan penting: Model unsupervised akan menandai “aneh”, bukan otomatis “jahat”. Anomali ≠ serangan, tapi anomali yang harus kamu cek dulu.
Tools (Open Source)
+
 
OS: Ubuntu 24.04
+
==Tools (Open Source)==
Python 3 + venv
+
* OS: Ubuntu 24.04
Library: pandas, numpy, scikit-learn, joblib
+
* Python 3 + venv
(Opsional) matplotlib untuk grafik ringan
+
* Library: pandas, numpy, scikit-learn, joblib
(Opsional) GnuPG untuk enkripsi file output/model
+
* (Opsional) matplotlib untuk grafik ringan
Skenario Data Log yang Real (Pilih salah satu atau gabungkan)
+
* (Opsional) GnuPG untuk enkripsi file output/model
 +
 
 +
==Skenario Data Log yang Real (Pilih salah satu atau gabungkan)==
 
Kamu bisa latihan pakai:
 
Kamu bisa latihan pakai:
Linux auth log: /var/log/auth.log
+
* Linux auth log: /var/log/auth.log
 
Cocok untuk mendeteksi percobaan login gagal masif, lonjakan aktivitas sudo, jam akses aneh.
 
Cocok untuk mendeteksi percobaan login gagal masif, lonjakan aktivitas sudo, jam akses aneh.
Nginx access log (lab): misalnya file access.log dari web server
+
* Nginx access log (lab): misalnya file access.log dari web server
 
Cocok untuk mendeteksi lonjakan request, path aneh, user-agent janggal, pola scanning.
 
Cocok untuk mendeteksi lonjakan request, path aneh, user-agent janggal, pola scanning.
Suricata eve.json (kalau sudah main IDS): event security lebih kaya.
+
* Suricata eve.json (kalau sudah main IDS): event security lebih kaya.
 +
 
 
Di modul ini kita buat pipeline yang paling mudah jalan di semua laptop/server: mulai dari auth.log + opsi format log sederhana.
 
Di modul ini kita buat pipeline yang paling mudah jalan di semua laptop/server: mulai dari auth.log + opsi format log sederhana.
Tahap Praktikum (Step-by-step)
+
 
0. Setup Environment di Ubuntu 24.04
+
==Tahap Praktikum (Step-by-step)==
 +
 
 +
===0. Setup Environment di Ubuntu 24.04===
 
Jalankan:
 
Jalankan:
sudo apt update
+
sudo apt update
sudo apt install -y python3-venv python3-pip gnupg
+
sudo apt install -y python3-venv python3-pip gnupg
mkdir -p ~/modul10-ai-anomali/{data,models,output,scripts}
+
mkdir -p ~/modul10-ai-anomali/{data,models,output,scripts}
cd ~/modul10-ai-anomali
+
cd ~/modul10-ai-anomali
python3 -m venv .venv
+
python3 -m venv .venv
source .venv/bin/activate
+
source .venv/bin/activate
pip install -U pip
+
pip install -U pip
pip install pandas numpy scikit-learn joblib
+
pip install pandas numpy scikit-learn joblib
  
 
Checklist: pastikan python --version mengarah ke venv dan pip show scikit-learn ada.
 
Checklist: pastikan python --version mengarah ke venv dan pip show scikit-learn ada.
  
1. Ambil Dataset Log
+
===1. Ambil Dataset Log===
Opsi A — pakai log asli mesin (paling real)
+
 
 +
====Opsi A — pakai log asli mesin (paling real)====
 
Copy auth log:
 
Copy auth log:
sudo cp /var/log/auth.log ~/modul10-ai-anomali/data/auth.log
+
sudo cp /var/log/auth.log ~/modul10-ai-anomali/data/auth.log
sudo chown $USER:$USER ~/modul10-ai-anomali/data/auth.log
+
sudo chown $USER:$USER ~/modul10-ai-anomali/data/auth.log
Opsi B — bikin dataset latihan (biar kontrol)
+
 
 +
====Opsi B — bikin dataset latihan (biar kontrol)====
 
Kita akan generate log sintetik “mirip event” (normal + aneh) dari Python (nanti ada script).
 
Kita akan generate log sintetik “mirip event” (normal + aneh) dari Python (nanti ada script).
2. Prinsip Feature Engineering (Supaya “AI” mengerti)
+
 
 +
===2. Prinsip Feature Engineering (Supaya “AI” mengerti)===
 
Log itu teks; model butuh angka. Maka kita ubah event jadi fitur numerik, contoh:
 
Log itu teks; model butuh angka. Maka kita ubah event jadi fitur numerik, contoh:
 +
 
Untuk auth event:
 
Untuk auth event:
hour (jam kejadian)
+
* hour (jam kejadian)
fail_count_5m (jumlah gagal login dalam 5 menit per IP/user)
+
* fail_count_5m (jumlah gagal login dalam 5 menit per IP/user)
distinct_users_10m
+
* distinct_users_10m
is_sudo (0/1)
+
* is_sudo (0/1)
is_failed_password (0/1)
+
* is_failed_password (0/1)
src_ip_hash (hash → angka stabil; bukan identitas asli)
+
* src_ip_hash (hash → angka stabil; bukan identitas asli)
msg_len (panjang pesan)
+
* msg_len (panjang pesan)
 +
 
 
Yang penting: fitur harus menggambarkan perilaku (burst, jam tidak wajar, variasi user, dsb), bukan sekadar teks mentah.
 
Yang penting: fitur harus menggambarkan perilaku (burst, jam tidak wajar, variasi user, dsb), bukan sekadar teks mentah.
Implementasi: Pipeline Lengkap (Python)
 
Di bawah ini 3 file utama:
 
parser & feature builder
 
training model
 
deteksi + export hasil
 
  
A. scripts/parse_authlog.py — parse auth.log → dataset fitur
+
==Implementasi: Pipeline Lengkap (Python)==
Buat file:
 
nano ~/modul10-ai-anomali/scripts/parse_authlog.py
 
Isi:
 
  
#!/usr/bin/env python3
+
Di bawah ini 3 file utama:
import re
+
* parser & feature builder
import json
+
* training model
from datetime import datetime, timedelta
+
* deteksi + export hasil
from collections import deque, defaultdict
 
  
# Auth log default Ubuntu: "Jan 20 13:01:02 hostname sshd[123]: Failed password for ..."
+
===A. scripts/parse_authlog.py — parse auth.log → dataset fitur===
# Catatan: tahun tidak ada, kita isi dengan tahun sekarang (cukup untuk lab).
 
AUTH_RE = re.compile(
 
    r'^(?P<mon>\w{3})\s+(?P<day>\d{1,2})\s+(?P<time>\d{2}:\d{2}:\d{2})\s+'
 
    r'(?P<host>\S+)\s+(?P<proc>[\w\-\/]+)(?:\[\d+\])?:\s+(?P<msg>.*)$'
 
)
 
  
IP_RE = re.compile(r'(\d{1,3}\.){3}\d{1,3}')
+
Buat file:
USER_RE = re.compile(r'for (invalid user )?(?P<user>[a-zA-Z0-9_\-\.]+)')
+
nano ~/modul10-ai-anomali/scripts/parse_authlog.py
  
MONTHS = {
+
Isi:
    "Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun": 6,
 
    "Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12
 
}
 
  
def stable_hash_to_int(s: str, mod: int = 1000003) -> int:
+
#!/usr/bin/env python3
    # hash stabil sederhana (bukan cryptographic, cukup untuk fitur)
+
import re
    h = 2166136261
+
import json
    for ch in s.encode("utf-8", errors="ignore"):
+
from datetime import datetime, timedelta
        h ^= ch
+
from collections import deque, defaultdict
        h = (h * 16777619) & 0xFFFFFFFF
+
    return int(h % mod)
+
# Auth log default Ubuntu: "Jan 20 13:01:02 hostname sshd[123]: Failed password for ..."
 +
# Catatan: tahun tidak ada, kita isi dengan tahun sekarang (cukup untuk lab).
 +
AUTH_RE = re.compile(
 +
    r'^(?P<mon>\w{3})\s+(?P<day>\d{1,2})\s+(?P<time>\d{2}:\d{2}:\d{2})\s+'
 +
    r'(?P<host>\S+)\s+(?P<proc>[\w\-\/]+)(?:\[\d+\])?:\s+(?P<msg>.*)$'
 +
)
 +
 +
IP_RE = re.compile(r'(\d{1,3}\.){3}\d{1,3}')
 +
USER_RE = re.compile(r'for (invalid user )?(?P<user>[a-zA-Z0-9_\-\.]+)')
 +
 +
MONTHS = {
 +
    "Jan": 1, "Feb": 2, "Mar": 3, "Apr": 4, "May": 5, "Jun": 6,
 +
    "Jul": 7, "Aug": 8, "Sep": 9, "Oct": 10, "Nov": 11, "Dec": 12
 +
}
 +
 +
def stable_hash_to_int(s: str, mod: int = 1000003) -> int:
 +
    # hash stabil sederhana (bukan cryptographic, cukup untuk fitur)
 +
    h = 2166136261
 +
    for ch in s.encode("utf-8", errors="ignore"):
 +
        h ^= ch
 +
        h = (h * 16777619) & 0xFFFFFFFF
 +
    return int(h % mod)
 +
 +
def parse_ts(line: str, year: int) -> datetime | None:
 +
    m = AUTH_RE.match(line)
 +
    if not m:
 +
        return None
 +
    mon = MONTHS.get(m.group("mon"))
 +
    day = int(m.group("day"))
 +
    t = m.group("time")
 +
    hh, mm, ss = map(int, t.split(":"))
 +
    return datetime(year, mon, day, hh, mm, ss)
 +
 +
def extract_ip(msg: str) -> str:
 +
    m = IP_RE.search(msg)
 +
    return m.group(0) if m else "0.0.0.0"
 +
 +
def extract_user(msg: str) -> str:
 +
    m = USER_RE.search(msg)
 +
    return m.group("user") if m else "unknown"
 +
 +
def build_features(lines: list[str], year: int):
 +
    """
 +
    Membuat event-level features + rolling window counts (burst behavior)
 +
    """
 +
    events = []
 +
    # rolling windows untuk hitung burst
 +
    window_5m_by_ip = defaultdict(deque)    # ip -> timestamps
 +
    window_10m_by_user = defaultdict(deque)  # user -> timestamps
 +
 +
    for line in lines:
 +
        m = AUTH_RE.match(line)
 +
        if not m:
 +
            continue
 +
 +
        ts = parse_ts(line, year)
 +
        if not ts:
 +
            continue
 +
 +
        msg = m.group("msg")
 +
        proc = m.group("proc")
 +
        ip = extract_ip(msg)
 +
        user = extract_user(msg)
 +
 +
        is_failed = 1 if "Failed password" in msg else 0
 +
        is_invalid_user = 1 if "invalid user" in msg else 0
 +
        is_accepted = 1 if "Accepted password" in msg or "Accepted publickey" in msg else 0
 +
        is_sudo = 1 if proc.startswith("sudo") or "sudo:" in msg else 0
 +
        msg_len = len(msg)
 +
 +
        # update rolling window 5m per IP (untuk failed count)
 +
        dq = window_5m_by_ip[ip]
 +
        dq.append(ts)
 +
        while dq and (ts - dq[0]) > timedelta(minutes=5):
 +
            dq.popleft()
 +
        count_5m_ip = len(dq)
 +
 +
        # update rolling window 10m per user
 +
        du = window_10m_by_user[user]
 +
        du.append(ts)
 +
        while du and (ts - du[0]) > timedelta(minutes=10):
 +
            du.popleft()
 +
        count_10m_user = len(du)
 +
 +
        event = {
 +
            "ts": ts.isoformat(),
 +
            "hour": ts.hour,
 +
            "minute": ts.minute,
 +
            "proc": proc,
 +
            "ip": ip,
 +
            "user": user,
 +
            "ip_hash": stable_hash_to_int(ip),
 +
            "user_hash": stable_hash_to_int(user),
 +
            "is_failed": is_failed,
 +
            "is_invalid_user": is_invalid_user,
 +
            "is_accepted": is_accepted,
 +
            "is_sudo": is_sudo,
 +
            "msg_len": msg_len,
 +
            "count_5m_ip": count_5m_ip,
 +
            "count_10m_user": count_10m_user,
 +
        }
 +
        events.append(event)
 +
 +
    return events
 +
 +
def main():
 +
    import argparse
 +
    parser = argparse.ArgumentParser()
 +
    parser.add_argument("--infile", required=True, help="path ke auth.log")
 +
    parser.add_argument("--outfile", required=True, help="output JSONL features")
 +
    args = parser.parse_args()
 +
 +
    year = datetime.now().year
 +
 +
    with open(args.infile, "r", encoding="utf-8", errors="ignore") as f:
 +
        lines = f.readlines()
 +
 +
    events = build_features(lines, year)
 +
 +
    with open(args.outfile, "w", encoding="utf-8") as out:
 +
        for ev in events:
 +
            out.write(json.dumps(ev) + "\n")
 +
 +
    print(f"[OK] Parsed {len(events)} events -> {args.outfile}")
 +
 +
if __name__ == "__main__":
 +
    main()
  
def parse_ts(line: str, year: int) -> datetime | None:
+
Jalankan:
    m = AUTH_RE.match(line)
 
    if not m:
 
        return None
 
    mon = MONTHS.get(m.group("mon"))
 
    day = int(m.group("day"))
 
    t = m.group("time")
 
    hh, mm, ss = map(int, t.split(":"))
 
    return datetime(year, mon, day, hh, mm, ss)
 
  
def extract_ip(msg: str) -> str:
+
chmod +x scripts/parse_authlog.py
    m = IP_RE.search(msg)
+
./scripts/parse_authlog.py --infile data/auth.log --outfile data/auth_features.jsonl
    return m.group(0) if m else "0.0.0.0"
+
head -n 3 data/auth_features.jsonl
  
def extract_user(msg: str) -> str:
+
===B. scripts/train_models.py — latih Isolation Forest + KMeans===
    m = USER_RE.search(msg)
 
    return m.group("user") if m else "unknown"
 
  
def build_features(lines: list[str], year: int):
+
Buat file:
    """
+
  nano ~/modul10-ai-anomali/scripts/train_models.py
    Membuat event-level features + rolling window counts (burst behavior)
 
    """
 
    events = []
 
    # rolling windows untuk hitung burst
 
    window_5m_by_ip = defaultdict(deque)    # ip -> timestamps
 
    window_10m_by_user = defaultdict(deque) # user -> timestamps
 
 
 
    for line in lines:
 
        m = AUTH_RE.match(line)
 
        if not m:
 
            continue
 
 
 
        ts = parse_ts(line, year)
 
        if not ts:
 
            continue
 
 
 
        msg = m.group("msg")
 
        proc = m.group("proc")
 
        ip = extract_ip(msg)
 
        user = extract_user(msg)
 
 
 
        is_failed = 1 if "Failed password" in msg else 0
 
        is_invalid_user = 1 if "invalid user" in msg else 0
 
        is_accepted = 1 if "Accepted password" in msg or "Accepted publickey" in msg else 0
 
        is_sudo = 1 if proc.startswith("sudo") or "sudo:" in msg else 0
 
        msg_len = len(msg)
 
 
 
        # update rolling window 5m per IP (untuk failed count)
 
        dq = window_5m_by_ip[ip]
 
        dq.append(ts)
 
        while dq and (ts - dq[0]) > timedelta(minutes=5):
 
            dq.popleft()
 
        count_5m_ip = len(dq)
 
 
 
        # update rolling window 10m per user
 
        du = window_10m_by_user[user]
 
        du.append(ts)
 
        while du and (ts - du[0]) > timedelta(minutes=10):
 
            du.popleft()
 
        count_10m_user = len(du)
 
 
 
        event = {
 
            "ts": ts.isoformat(),
 
            "hour": ts.hour,
 
            "minute": ts.minute,
 
            "proc": proc,
 
            "ip": ip,
 
            "user": user,
 
            "ip_hash": stable_hash_to_int(ip),
 
            "user_hash": stable_hash_to_int(user),
 
            "is_failed": is_failed,
 
            "is_invalid_user": is_invalid_user,
 
            "is_accepted": is_accepted,
 
            "is_sudo": is_sudo,
 
            "msg_len": msg_len,
 
            "count_5m_ip": count_5m_ip,
 
            "count_10m_user": count_10m_user,
 
        }
 
        events.append(event)
 
 
 
    return events
 
 
 
def main():
 
    import argparse
 
    parser = argparse.ArgumentParser()
 
    parser.add_argument("--infile", required=True, help="path ke auth.log")
 
    parser.add_argument("--outfile", required=True, help="output JSONL features")
 
    args = parser.parse_args()
 
 
 
    year = datetime.now().year
 
 
 
    with open(args.infile, "r", encoding="utf-8", errors="ignore") as f:
 
        lines = f.readlines()
 
 
 
    events = build_features(lines, year)
 
  
    with open(args.outfile, "w", encoding="utf-8") as out:
 
        for ev in events:
 
            out.write(json.dumps(ev) + "\n")
 
 
    print(f"[OK] Parsed {len(events)} events -> {args.outfile}")
 
 
if __name__ == "__main__":
 
    main()
 
 
Jalankan:
 
 
chmod +x scripts/parse_authlog.py
 
./scripts/parse_authlog.py --infile data/auth.log --outfile data/auth_features.jsonl
 
head -n 3 data/auth_features.jsonl
 
B. scripts/train_models.py — latih Isolation Forest + KMeans
 
Buat file:
 
nano ~/modul10-ai-anomali/scripts/train_models.py
 
 
Isi:
 
Isi:
#!/usr/bin/env python3
+
#!/usr/bin/env python3
import json
+
import json
import joblib
+
import joblib
import numpy as np
+
import numpy as np
from pathlib import Path
+
from pathlib import Path
from sklearn.ensemble import IsolationForest
+
from sklearn.ensemble import IsolationForest
from sklearn.cluster import KMeans
+
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
+
from sklearn.preprocessing import StandardScaler
 
+
FEATURE_COLS = [
+
FEATURE_COLS = [
    "hour", "minute",
+
    "hour", "minute",
    "ip_hash", "user_hash",
+
    "ip_hash", "user_hash",
    "is_failed", "is_invalid_user", "is_accepted", "is_sudo",
+
    "is_failed", "is_invalid_user", "is_accepted", "is_sudo",
    "msg_len",
+
    "msg_len",
    "count_5m_ip", "count_10m_user"
+
    "count_5m_ip", "count_10m_user"
]
+
]
 
+
def load_jsonl(path: str):
+
def load_jsonl(path: str):
    rows = []
+
    rows = []
    with open(path, "r", encoding="utf-8") as f:
+
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
+
        for line in f:
            rows.append(json.loads(line))
+
            rows.append(json.loads(line))
    return rows
+
    return rows
 
+
def to_matrix(rows):
+
def to_matrix(rows):
    X = []
+
    X = []
    for r in rows:
+
    for r in rows:
        X.append([float(r.get(c, 0.0)) for c in FEATURE_COLS])
+
        X.append([float(r.get(c, 0.0)) for c in FEATURE_COLS])
    return np.array(X, dtype=float)
+
    return np.array(X, dtype=float)
 
+
def main():
+
def main():
    import argparse
+
    import argparse
    p = argparse.ArgumentParser()
+
    p = argparse.ArgumentParser()
    p.add_argument("--infile", required=True, help="JSONL features")
+
    p.add_argument("--infile", required=True, help="JSONL features")
    p.add_argument("--outdir", required=True, help="folder simpan model")
+
    p.add_argument("--outdir", required=True, help="folder simpan model")
    p.add_argument("--contamination", type=float, default=0.02,
+
    p.add_argument("--contamination", type=float, default=0.02,
 
                   help="perkiraan rasio anomali (mis. 0.01-0.05)")
 
                   help="perkiraan rasio anomali (mis. 0.01-0.05)")
    p.add_argument("--k", type=int, default=8, help="jumlah cluster KMeans")
+
    p.add_argument("--k", type=int, default=8, help="jumlah cluster KMeans")
    args = p.parse_args()
+
    args = p.parse_args()
 +
 +
    outdir = Path(args.outdir)
 +
    outdir.mkdir(parents=True, exist_ok=True)
 +
 +
    rows = load_jsonl(args.infile)
 +
    X = to_matrix(rows)
 +
 +
    scaler = StandardScaler()
 +
    Xs = scaler.fit_transform(X)
 +
 +
    iso = IsolationForest(
 +
        n_estimators=300,
 +
        contamination=args.contamination,
 +
        random_state=42,
 +
        n_jobs=-1
 +
    )
 +
    iso.fit(Xs)
 +
 +
    km = KMeans(n_clusters=args.k, random_state=42, n_init="auto")
 +
    km.fit(Xs)
 +
 +
    bundle = {
 +
        "feature_cols": FEATURE_COLS,
 +
        "scaler": scaler,
 +
        "isolation_forest": iso,
 +
        "kmeans": km
 +
    }
 +
    model_path = outdir / "anomali_models.joblib"
 +
    joblib.dump(bundle, model_path)
 +
 +
    # ringkasan cepat
 +
    iso_pred = iso.predict(Xs)  # -1 anomali, 1 normal
 +
    anom_ratio = float(np.mean(iso_pred == -1))
 +
 +
    print("[OK] Model saved:", model_path)
 +
    print(f"[INFO] Events: {len(rows)} | Estimated anomaly ratio (IF): {anom_ratio:.4f}")
 +
 +
if __name__ == "__main__":
 +
    main()
  
    outdir = Path(args.outdir)
+
Jalankan:
    outdir.mkdir(parents=True, exist_ok=True)
+
chmod +x scripts/train_models.py
 +
./scripts/train_models.py --infile data/auth_features.jsonl --outdir models --contamination 0.02 --k 8
 +
ls -lah models
  
    rows = load_jsonl(args.infile)
+
Tips tuning seru: mainkan --contamination (mis. 0.01, 0.03, 0.05). Lihat bagaimana jumlah anomali berubah.
    X = to_matrix(rows)
 
  
    scaler = StandardScaler()
+
===C. scripts/detect_anomalies.py — scoring, ranking, export===
    Xs = scaler.fit_transform(X)
 
  
    iso = IsolationForest(
+
Buat file:
        n_estimators=300,
+
nano ~/modul10-ai-anomali/scripts/detect_anomalies.py
        contamination=args.contamination,
 
        random_state=42,
 
        n_jobs=-1
 
    )
 
    iso.fit(Xs)
 
  
    km = KMeans(n_clusters=args.k, random_state=42, n_init="auto")
+
Isi:
    km.fit(Xs)
+
#!/usr/bin/env python3
 
+
import json
    bundle = {
+
import csv
        "feature_cols": FEATURE_COLS,
+
import joblib
        "scaler": scaler,
+
import numpy as np
        "isolation_forest": iso,
+
from pathlib import Path
        "kmeans": km
+
    }
+
def load_jsonl(path: str):
    model_path = outdir / "anomali_models.joblib"
+
    rows = []
    joblib.dump(bundle, model_path)
+
    with open(path, "r", encoding="utf-8") as f:
 +
        for line in f:
 +
            rows.append(json.loads(line))
 +
    return rows
 +
 +
def main():
 +
    import argparse
 +
    p = argparse.ArgumentParser()
 +
    p.add_argument("--features", required=True, help="JSONL features")
 +
    p.add_argument("--model", required=True, help="joblib model bundle")
 +
    p.add_argument("--outcsv", required=True, help="output CSV anomali")
 +
    p.add_argument("--top", type=int, default=50, help="ambil top N paling anomali")
 +
    args = p.parse_args()
 +
 +
    bundle = joblib.load(args.model)
 +
    cols = bundle["feature_cols"]
 +
    scaler = bundle["scaler"]
 +
    iso = bundle["isolation_forest"]
 +
    km = bundle["kmeans"]
 +
 +
    rows = load_jsonl(args.features)
 +
    X = np.array([[float(r.get(c, 0.0)) for c in cols] for r in rows], dtype=float)
 +
    Xs = scaler.transform(X)
 +
 +
    # IsolationForest: decision_function makin kecil -> makin anomali
 +
    iso_score = iso.decision_function(Xs)  # higher = more normal
 +
    iso_label = iso.predict(Xs)            # -1 anomali
 +
 +
    # KMeans: jarak ke centroid terdekat (makin jauh -> makin anomali)
 +
    centers = km.cluster_centers_
 +
    # hitung jarak L2 ke centroid terdekat
 +
    dists = np.sqrt(((Xs[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2))
 +
    km_dist = dists.min(axis=1)
 +
 +
    # gabung score sederhana: rank berdasarkan 2 sinyal
 +
    # normalisasi kasar
 +
    iso_norm = (iso_score - iso_score.min()) / (iso_score.max() - iso_score.min() + 1e-9)
 +
    km_norm = (km_dist - km_dist.min()) / (km_dist.max() - km_dist.min() + 1e-9)
  
    # ringkasan cepat
+
    # semakin kecil iso_norm = semakin anomali; semakin besar km_norm = semakin anomali
    iso_pred = iso.predict(Xs)  # -1 anomali, 1 normal
+
    combined = (1.0 - iso_norm) * 0.6 + (km_norm * 0.4)
    anom_ratio = float(np.mean(iso_pred == -1))
+
   
 +
    # pilih top N
 +
    idx = np.argsort(-combined)[:args.top]
 +
 +
    outpath = Path(args.outcsv)
 +
    outpath.parent.mkdir(parents=True, exist_ok=True)
 +
 +
    fieldnames = [
 +
        "rank", "combined_score",
 +
        "iso_label", "iso_score",
 +
        "km_dist",
 +
        "ts", "hour", "minute", "proc", "user", "ip",
 +
        "is_failed", "is_invalid_user", "is_accepted", "is_sudo",
 +
        "count_5m_ip", "count_10m_user", "msg_len"
 +
    ]
 +
 +
    with open(outpath, "w", newline="", encoding="utf-8") as f:
 +
        w = csv.DictWriter(f, fieldnames=fieldnames)
 +
        w.writeheader()
 +
        for rnk, i in enumerate(idx, start=1):
 +
            r = rows[int(i)]
 +
            w.writerow({
 +
                "rank": rnk,
 +
                "combined_score": float(combined[i]),
 +
                "iso_label": int(iso_label[i]),
 +
                "iso_score": float(iso_score[i]),
 +
                "km_dist": float(km_dist[i]),
 +
                "ts": r.get("ts", ""),
 +
                "hour": r.get("hour", 0),
 +
                "minute": r.get("minute", 0),
 +
                "proc": r.get("proc", ""),
 +
                "user": r.get("user", ""),
 +
                "ip": r.get("ip", ""),
 +
                "is_failed": r.get("is_failed", 0),
 +
                "is_invalid_user": r.get("is_invalid_user", 0),
 +
                "is_accepted": r.get("is_accepted", 0),
 +
                "is_sudo": r.get("is_sudo", 0),
 +
                "count_5m_ip": r.get("count_5m_ip", 0),
 +
                "count_10m_user": r.get("count_10m_user", 0),
 +
                "msg_len": r.get("msg_len", 0),
 +
            })
  
    print("[OK] Model saved:", model_path)
+
    anom_count = int(np.sum(iso_label == -1))
    print(f"[INFO] Events: {len(rows)} | Estimated anomaly ratio (IF): {anom_ratio:.4f}")
+
    print(f"[OK] Wrote top-{args.top} anomalies -> {outpath}")
 
+
    print(f"[INFO] Total events: {len(rows)} | IF anomalies flagged: {anom_count}")
if __name__ == "__main__":
+
    main()
+
if __name__ == "__main__":
 +
    main()
  
 
Jalankan:
 
Jalankan:
chmod +x scripts/train_models.py
+
chmod +x scripts/detect_anomalies.py
./scripts/train_models.py --infile data/auth_features.jsonl --outdir models --contamination 0.02 --k 8
+
./scripts/detect_anomalies.py --features data/auth_features.jsonl --model models/anomali_models.joblib --outcsv output/anomali_top.csv --top 50
ls -lah models
+
column -s, -t output/anomali_top.csv | head -n 20
Tips tuning seru: mainkan --contamination (mis. 0.01, 0.03, 0.05). Lihat bagaimana jumlah anomali berubah.
 
C. scripts/detect_anomalies.py — scoring, ranking, export
 
Buat file:
 
nano ~/modul10-ai-anomali/scripts/detect_anomalies.py
 
Isi:
 
#!/usr/bin/env python3
 
import json
 
import csv
 
import joblib
 
import numpy as np
 
from pathlib import Path
 
  
def load_jsonl(path: str):
+
==Cara Membaca Hasil==
    rows = []
+
Di output/anomali_top.csv, fokus ke:
    with open(path, "r", encoding="utf-8") as f:
+
* count_5m_ip tinggi + is_failed=1 → indikasi brute force
        for line in f:
+
* hour sangat dini (mis. 02:00) + is_sudo=1 → aktivitas admin jam aneh
            rows.append(json.loads(line))
+
* user=unknown / invalid_user=1 berulang → scanning user
    return rows
+
* proc/sshd dominan → serangan ke SSH (umum banget di server publik)
  
def main():
+
Tugas mini yang menantang:
    import argparse
+
* Ambil 10 anomali teratas, lalu tulis analisis 1–2 kalimat per event:
    p = argparse.ArgumentParser()
+
** “Kenapa ini anomali?”
    p.add_argument("--features", required=True, help="JSONL features")
+
** “Apa tindakan lanjut?” (block IP? cek user? cek sistem?)
    p.add_argument("--model", required=True, help="joblib model bundle")
 
    p.add_argument("--outcsv", required=True, help="output CSV anomali")
 
    p.add_argument("--top", type=int, default=50, help="ambil top N paling anomali")
 
    args = p.parse_args()
 
  
    bundle = joblib.load(args.model)
+
==Simulasi Serangan Ringan (Aman untuk Lab)==
    cols = bundle["feature_cols"]
+
Kalau kamu punya VM/host lab sendiri, bisa memicu event gagal login (tanpa merusak):
    scaler = bundle["scaler"]
+
* Coba login SSH dengan user salah beberapa kali dari client lab.
    iso = bundle["isolation_forest"]
+
* Atau buat event sudo beberapa kali.
    km = bundle["kmeans"]
+
Penting: lakukan hanya di lingkungan yang kamu miliki/diizinkan.
  
    rows = load_jsonl(args.features)
+
==(Opsional) Amankan Model & Output dengan GnuPG==
    X = np.array([[float(r.get(c, 0.0)) for c in cols] for r in rows], dtype=float)
+
Tujuannya: hasil deteksi bisa berisi data sensitif (user, IP, pola aktivitas). Minimal, kamu bisa enkripsi file output dan model sebelum dipindah/diupload.
    Xs = scaler.transform(X)
 
  
    # IsolationForest: decision_function makin kecil -> makin anomali
+
===1. Generate key (sekali saja)===
    iso_score = iso.decision_function(Xs) # higher = more normal
 
    iso_label = iso.predict(Xs)            # -1 anomali
 
  
    # KMeans: jarak ke centroid terdekat (makin jauh -> makin anomali)
+
gpg --full-generate-key
    centers = km.cluster_centers_
 
    # hitung jarak L2 ke centroid terdekat
 
    dists = np.sqrt(((Xs[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2))
 
    km_dist = dists.min(axis=1)
 
  
    # gabung score sederhana: rank berdasarkan 2 sinyal
+
Cek key:
    # normalisasi kasar
 
    iso_norm = (iso_score - iso_score.min()) / (iso_score.max() - iso_score.min() + 1e-9)
 
    km_norm = (km_dist - km_dist.min()) / (km_dist.max() - km_dist.min() + 1e-9)
 
  
    # semakin kecil iso_norm = semakin anomali; semakin besar km_norm = semakin anomali
+
gpg --list-keys
    combined = (1.0 - iso_norm) * 0.6 + (km_norm * 0.4)
 
  
    # pilih top N
+
===2. Enkripsi output CSV===
    idx = np.argsort(-combined)[:args.top]
+
Misal email key kamu you@example.com:
  
    outpath = Path(args.outcsv)
+
gpg --output output/anomali_top.csv.gpg --encrypt --recipient you@example.com output/anomali_top.csv
    outpath.parent.mkdir(parents=True, exist_ok=True)
 
  
    fieldnames = [
+
Decrypt:
        "rank", "combined_score",
 
        "iso_label", "iso_score",
 
        "km_dist",
 
        "ts", "hour", "minute", "proc", "user", "ip",
 
        "is_failed", "is_invalid_user", "is_accepted", "is_sudo",
 
        "count_5m_ip", "count_10m_user", "msg_len"
 
    ]
 
  
    with open(outpath, "w", newline="", encoding="utf-8") as f:
+
gpg --output output/anomali_top.csv --decrypt output/anomali_top.csv.gpg
        w = csv.DictWriter(f, fieldnames=fieldnames)
 
        w.writeheader()
 
        for rnk, i in enumerate(idx, start=1):
 
            r = rows[int(i)]
 
            w.writerow({
 
                "rank": rnk,
 
                "combined_score": float(combined[i]),
 
                "iso_label": int(iso_label[i]),
 
                "iso_score": float(iso_score[i]),
 
                "km_dist": float(km_dist[i]),
 
                "ts": r.get("ts", ""),
 
                "hour": r.get("hour", 0),
 
                "minute": r.get("minute", 0),
 
                "proc": r.get("proc", ""),
 
                "user": r.get("user", ""),
 
                "ip": r.get("ip", ""),
 
                "is_failed": r.get("is_failed", 0),
 
                "is_invalid_user": r.get("is_invalid_user", 0),
 
                "is_accepted": r.get("is_accepted", 0),
 
                "is_sudo": r.get("is_sudo", 0),
 
                "count_5m_ip": r.get("count_5m_ip", 0),
 
                "count_10m_user": r.get("count_10m_user", 0),
 
                "msg_len": r.get("msg_len", 0),
 
            })
 
  
    anom_count = int(np.sum(iso_label == -1))
+
===3. Enkripsi model===
    print(f"[OK] Wrote top-{args.top} anomalies -> {outpath}")
 
    print(f"[INFO] Total events: {len(rows)} | IF anomalies flagged: {anom_count}")
 
  
if __name__ == "__main__":
+
gpg --output models/anomali_models.joblib.gpg --encrypt --recipient you@example.com models/anomali_models.joblib
    main()
 
Jalankan:
 
chmod +x scripts/detect_anomalies.py
 
./scripts/detect_anomalies.py --features data/auth_features.jsonl --model models/anomali_models.joblib --outcsv output/anomali_top.csv --top 50
 
column -s, -t output/anomali_top.csv | head -n 20
 
Cara Membaca Hasil
 
Di output/anomali_top.csv, fokus ke:
 
count_5m_ip tinggi + is_failed=1 → indikasi brute force
 
hour sangat dini (mis. 02:00) + is_sudo=1 → aktivitas admin jam aneh
 
user=unknown / invalid_user=1 berulang → scanning user
 
proc/sshd dominan → serangan ke SSH (umum banget di server publik)
 
Tugas mini yang menantang:
 
Ambil 10 anomali teratas, lalu tulis analisis 1–2 kalimat per event:
 
“Kenapa ini anomali?”
 
“Apa tindakan lanjut?” (block IP? cek user? cek sistem?)
 
Simulasi Serangan Ringan (Aman untuk Lab)
 
Kalau kamu punya VM/host lab sendiri, bisa memicu event gagal login (tanpa merusak):
 
Coba login SSH dengan user salah beberapa kali dari client lab.
 
Atau buat event sudo beberapa kali.
 
Penting: lakukan hanya di lingkungan yang kamu miliki/diizinkan.
 
(Opsional) Amankan Model & Output dengan GnuPG
 
Tujuannya: hasil deteksi bisa berisi data sensitif (user, IP, pola aktivitas). Minimal, kamu bisa enkripsi file output dan model sebelum dipindah/diupload.
 
1. Generate key (sekali saja)
 
  
gpg --full-generate-key
+
Skill security yang dinilai: kamu tidak hanya bikin AI, tapi juga mengelola artefak (model/output) dengan aman.
Cek key:
 
gpg --list-keys
 
2. Enkripsi output CSV
 
Misal email key kamu you@example.com:
 
gpg --output output/anomali_top.csv.gpg --encrypt --recipient you@example.com output/anomali_top.csv
 
Decrypt:
 
gpg --output output/anomali_top.csv --decrypt output/anomali_top.csv.gpg
 
3. Enkripsi model
 
  
gpg --output models/anomali_models.joblib.gpg --encrypt --recipient you@example.com models/anomali_models.joblib
+
==Output yang Wajib Dikumpulkan==
Skill security yang dinilai: kamu tidak hanya bikin AI, tapi juga mengelola artefak (model/output) dengan aman.
+
* data/auth_features.jsonl (atau ringkasannya)
 +
* models/anomali_models.joblib (atau versi .gpg)
 +
* output/anomali_top.csv (atau versi .gpg)
 +
* ...
 +
* Laporan.md singkat berisi:
 +
** deskripsi dataset (berapa event),
 +
** parameter model (contamination, k),
 +
** 10 anomali teratas + analisis,
 +
** 3 rekomendasi aksi.
  
Output yang Wajib Dikumpulkan
 
data/auth_features.jsonl (atau ringkasannya)
 
models/anomali_models.joblib (atau versi .gpg)
 
output/anomali_top.csv (atau versi .gpg)
 
...
 
Laporan.md singkat berisi:
 
deskripsi dataset (berapa event),
 
parameter model (contamination, k),
 
10 anomali teratas + analisis,
 
3 rekomendasi aksi.
 
 
Template laporan cepat:
 
Template laporan cepat:
# Laporan Modul 10 — AI Deteksi Anomali
+
# Laporan Modul 10 — AI Deteksi Anomali
 
+
## Dataset
+
## Dataset
- Sumber: auth.log
+
- Sumber: auth.log
- Jumlah event: ...
+
- Jumlah event: ...
- Rentang waktu: ...
+
- Rentang waktu: ...
 
+
## Model
+
## Model
- IsolationForest contamination: ...
+
- IsolationForest contamination: ...
- KMeans k: ...
+
- KMeans k: ...
- Fitur: hour, minute, is_failed, count_5m_ip, ...
+
- Fitur: hour, minute, is_failed, count_5m_ip, ...
 
+
## Temuan Top 10
+
## Temuan Top 10
1) ...
+
1) ...
  - alasan: ...
+
    - alasan: ...
  - tindak lanjut: ...
+
    - tindak lanjut: ...
 +
 +
## Rekomendasi
 +
- ...
 +
- ...
 +
- ...
  
## Rekomendasi
+
==Challenge Upgrade (Kalau Mau Naik Level)==
- ...
 
- ...
 
- ...
 
Challenge Upgrade (Kalau Mau Naik Level)
 
 
Kalau mahasiswa cepat selesai, kasih 1–2 tantangan ini:
 
Kalau mahasiswa cepat selesai, kasih 1–2 tantangan ini:
Tambahkan fitur: hari (weekday) dan deteksi “akses weekend”.
+
* Tambahkan fitur: hari (weekday) dan deteksi “akses weekend”.
Buat mode “stream”: baca log baru (tail) dan skor on-the-fly.
+
* Buat mode “stream”: baca log baru (tail) dan skor on-the-fly.
Ganti auth.log ke Nginx access log dan buat fitur:
+
* Ganti auth.log ke Nginx access log dan buat fitur:
request per IP per menit,
+
** request per IP per menit,
status code 404/500 burst,
+
** status code 404/500 burst,
path yang jarang muncul.
+
** path yang jarang muncul.
  
  

Revision as of 03:14, 24 January 2026

Fokus sesi ini: kamu bikin “AI security” sederhana yang bisa belajar pola normal dari log, lalu menandai yang aneh (anomaly). Ini bukan “AI yang tahu segalanya”, tapi alat bantu triage biar analis tidak tenggelam dalam jutaan baris log.

Tujuan

Mahasiswa mampu:

  • membangun pipeline deteksi anomali dari log nyata (Linux / web / auth),
  • melatih model unsupervised (tanpa label),
  • menghasilkan daftar event mencurigakan + alasan/fitur ringkas,
  • menyimpan model dan menjalankan deteksi ulang secara berkala.

Output akhir yang ditargetkan:

  • Model tersimpan (.joblib)
  • Laporan evaluasi sederhana (rasio anomali, contoh top N anomali)
  • File hasil deteksi (CSV/JSON)
  • (Opsional) hasil/model dienkripsi dengan GnuPG

Konsep Inti

Deteksi anomali = mencari data yang “jarang”, “jauh dari pola normal”, atau “punya kombinasi fitur yang aneh”.

Kamu akan pakai dua pendekatan:

  • Isolation Forest (tree-based): bagus untuk anomaly detection umum, sering jadi baseline kuat.
  • KMeans + jarak ke centroid: sederhana, cepat, mudah dijelaskan (jarak besar = makin aneh).

Catatan penting: Model unsupervised akan menandai “aneh”, bukan otomatis “jahat”. Anomali ≠ serangan, tapi anomali yang harus kamu cek dulu.

Tools (Open Source)

  • OS: Ubuntu 24.04
  • Python 3 + venv
  • Library: pandas, numpy, scikit-learn, joblib
  • (Opsional) matplotlib untuk grafik ringan
  • (Opsional) GnuPG untuk enkripsi file output/model

Skenario Data Log yang Real (Pilih salah satu atau gabungkan)

Kamu bisa latihan pakai:

  • Linux auth log: /var/log/auth.log

Cocok untuk mendeteksi percobaan login gagal masif, lonjakan aktivitas sudo, jam akses aneh.

  • Nginx access log (lab): misalnya file access.log dari web server

Cocok untuk mendeteksi lonjakan request, path aneh, user-agent janggal, pola scanning.

  • Suricata eve.json (kalau sudah main IDS): event security lebih kaya.

Di modul ini kita buat pipeline yang paling mudah jalan di semua laptop/server: mulai dari auth.log + opsi format log sederhana.

Tahap Praktikum (Step-by-step)

0. Setup Environment di Ubuntu 24.04

Jalankan:

sudo apt update
sudo apt install -y python3-venv python3-pip gnupg
mkdir -p ~/modul10-ai-anomali/{data,models,output,scripts}
cd ~/modul10-ai-anomali
python3 -m venv .venv
source .venv/bin/activate
pip install -U pip
pip install pandas numpy scikit-learn joblib

Checklist: pastikan python --version mengarah ke venv dan pip show scikit-learn ada.

1. Ambil Dataset Log

Opsi A — pakai log asli mesin (paling real)

Copy auth log:

sudo cp /var/log/auth.log ~/modul10-ai-anomali/data/auth.log
sudo chown $USER:$USER ~/modul10-ai-anomali/data/auth.log

Opsi B — bikin dataset latihan (biar kontrol)

Kita akan generate log sintetik “mirip event” (normal + aneh) dari Python (nanti ada script).

2. Prinsip Feature Engineering (Supaya “AI” mengerti)

Log itu teks; model butuh angka. Maka kita ubah event jadi fitur numerik, contoh:

Untuk auth event:

  • hour (jam kejadian)
  • fail_count_5m (jumlah gagal login dalam 5 menit per IP/user)
  • distinct_users_10m
  • is_sudo (0/1)
  • is_failed_password (0/1)
  • src_ip_hash (hash → angka stabil; bukan identitas asli)
  • msg_len (panjang pesan)

Yang penting: fitur harus menggambarkan perilaku (burst, jam tidak wajar, variasi user, dsb), bukan sekadar teks mentah.

Implementasi: Pipeline Lengkap (Python)

Di bawah ini 3 file utama:

  • parser & feature builder
  • training model
  • deteksi + export hasil

A. scripts/parse_authlog.py — parse auth.log → dataset fitur

Buat file:

nano ~/modul10-ai-anomali/scripts/parse_authlog.py

Isi:

#!/usr/bin/env python3
import re
import json
from datetime import datetime, timedelta
from collections import deque, defaultdict

# Auth log default Ubuntu: "Jan 20 13:01:02 hostname sshd[123]: Failed password for ..."
# Catatan: tahun tidak ada, kita isi dengan tahun sekarang (cukup untuk lab).
AUTH_RE = re.compile(
    r'^(?P<mon>\w{3})\s+(?P<day>\d{1,2})\s+(?P

Jalankan:

chmod +x scripts/parse_authlog.py
./scripts/parse_authlog.py --infile data/auth.log --outfile data/auth_features.jsonl
head -n 3 data/auth_features.jsonl

B. scripts/train_models.py — latih Isolation Forest + KMeans

Buat file:

nano ~/modul10-ai-anomali/scripts/train_models.py

Isi:

#!/usr/bin/env python3
import json
import joblib
import numpy as np
from pathlib import Path
from sklearn.ensemble import IsolationForest
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler

FEATURE_COLS = [
    "hour", "minute",
    "ip_hash", "user_hash",
    "is_failed", "is_invalid_user", "is_accepted", "is_sudo",
    "msg_len",
    "count_5m_ip", "count_10m_user"
]

def load_jsonl(path: str):
    rows = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            rows.append(json.loads(line))
    return rows

def to_matrix(rows):
    X = []
    for r in rows:
        X.append([float(r.get(c, 0.0)) for c in FEATURE_COLS])
    return np.array(X, dtype=float)

def main():
    import argparse
    p = argparse.ArgumentParser()
    p.add_argument("--infile", required=True, help="JSONL features")
    p.add_argument("--outdir", required=True, help="folder simpan model")
    p.add_argument("--contamination", type=float, default=0.02,
                  help="perkiraan rasio anomali (mis. 0.01-0.05)")
    p.add_argument("--k", type=int, default=8, help="jumlah cluster KMeans")
    args = p.parse_args()

    outdir = Path(args.outdir)
    outdir.mkdir(parents=True, exist_ok=True)

    rows = load_jsonl(args.infile)
    X = to_matrix(rows)

    scaler = StandardScaler()
    Xs = scaler.fit_transform(X)

    iso = IsolationForest(
        n_estimators=300,
        contamination=args.contamination,
        random_state=42,
        n_jobs=-1
    )
    iso.fit(Xs)

    km = KMeans(n_clusters=args.k, random_state=42, n_init="auto")
    km.fit(Xs)

    bundle = {
        "feature_cols": FEATURE_COLS,
        "scaler": scaler,
        "isolation_forest": iso,
        "kmeans": km
    }
    model_path = outdir / "anomali_models.joblib"
    joblib.dump(bundle, model_path)

    # ringkasan cepat
    iso_pred = iso.predict(Xs)  # -1 anomali, 1 normal
    anom_ratio = float(np.mean(iso_pred == -1))

    print("[OK] Model saved:", model_path)
    print(f"[INFO] Events: {len(rows)} | Estimated anomaly ratio (IF): {anom_ratio:.4f}")

if __name__ == "__main__":
    main()

Jalankan:

chmod +x scripts/train_models.py
./scripts/train_models.py --infile data/auth_features.jsonl --outdir models --contamination 0.02 --k 8
ls -lah models

Tips tuning seru: mainkan --contamination (mis. 0.01, 0.03, 0.05). Lihat bagaimana jumlah anomali berubah.

C. scripts/detect_anomalies.py — scoring, ranking, export

Buat file:

nano ~/modul10-ai-anomali/scripts/detect_anomalies.py

Isi:

#!/usr/bin/env python3
import json
import csv
import joblib
import numpy as np
from pathlib import Path

def load_jsonl(path: str):
    rows = []
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            rows.append(json.loads(line))
    return rows

def main():
    import argparse
    p = argparse.ArgumentParser()
    p.add_argument("--features", required=True, help="JSONL features")
    p.add_argument("--model", required=True, help="joblib model bundle")
    p.add_argument("--outcsv", required=True, help="output CSV anomali")
    p.add_argument("--top", type=int, default=50, help="ambil top N paling anomali")
    args = p.parse_args()

    bundle = joblib.load(args.model)
    cols = bundle["feature_cols"]
    scaler = bundle["scaler"]
    iso = bundle["isolation_forest"]
    km = bundle["kmeans"]

    rows = load_jsonl(args.features)
    X = np.array([[float(r.get(c, 0.0)) for c in cols] for r in rows], dtype=float)
    Xs = scaler.transform(X)

    # IsolationForest: decision_function makin kecil -> makin anomali
    iso_score = iso.decision_function(Xs)  # higher = more normal
    iso_label = iso.predict(Xs)            # -1 anomali

    # KMeans: jarak ke centroid terdekat (makin jauh -> makin anomali)
    centers = km.cluster_centers_
    # hitung jarak L2 ke centroid terdekat
    dists = np.sqrt(((Xs[:, None, :] - centers[None, :, :]) ** 2).sum(axis=2))
    km_dist = dists.min(axis=1)

    # gabung score sederhana: rank berdasarkan 2 sinyal
    # normalisasi kasar
    iso_norm = (iso_score - iso_score.min()) / (iso_score.max() - iso_score.min() + 1e-9)
    km_norm = (km_dist - km_dist.min()) / (km_dist.max() - km_dist.min() + 1e-9)
    # semakin kecil iso_norm = semakin anomali; semakin besar km_norm = semakin anomali
    combined = (1.0 - iso_norm) * 0.6 + (km_norm * 0.4)

    # pilih top N
    idx = np.argsort(-combined)[:args.top]

    outpath = Path(args.outcsv)
    outpath.parent.mkdir(parents=True, exist_ok=True)

    fieldnames = [
        "rank", "combined_score",
        "iso_label", "iso_score",
        "km_dist",
        "ts", "hour", "minute", "proc", "user", "ip",
        "is_failed", "is_invalid_user", "is_accepted", "is_sudo",
        "count_5m_ip", "count_10m_user", "msg_len"
    ]

    with open(outpath, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        for rnk, i in enumerate(idx, start=1):
            r = rows[int(i)]
            w.writerow({
                "rank": rnk,
                "combined_score": float(combined[i]),
                "iso_label": int(iso_label[i]),
                "iso_score": float(iso_score[i]),
                "km_dist": float(km_dist[i]),
                "ts": r.get("ts", ""),
                "hour": r.get("hour", 0),
                "minute": r.get("minute", 0),
                "proc": r.get("proc", ""),
                "user": r.get("user", ""),
                "ip": r.get("ip", ""),
                "is_failed": r.get("is_failed", 0),
                "is_invalid_user": r.get("is_invalid_user", 0),
                "is_accepted": r.get("is_accepted", 0),
                "is_sudo": r.get("is_sudo", 0),
                "count_5m_ip": r.get("count_5m_ip", 0),
                "count_10m_user": r.get("count_10m_user", 0),
                "msg_len": r.get("msg_len", 0),
            })
    anom_count = int(np.sum(iso_label == -1))
    print(f"[OK] Wrote top-{args.top} anomalies -> {outpath}")
    print(f"[INFO] Total events: {len(rows)} | IF anomalies flagged: {anom_count}")

if __name__ == "__main__":
    main()

Jalankan:

chmod +x scripts/detect_anomalies.py
./scripts/detect_anomalies.py --features data/auth_features.jsonl --model models/anomali_models.joblib --outcsv output/anomali_top.csv --top 50

column -s, -t output/anomali_top.csv | head -n 20

Cara Membaca Hasil

Di output/anomali_top.csv, fokus ke:

  • count_5m_ip tinggi + is_failed=1 → indikasi brute force
  • hour sangat dini (mis. 02:00) + is_sudo=1 → aktivitas admin jam aneh
  • user=unknown / invalid_user=1 berulang → scanning user
  • proc/sshd dominan → serangan ke SSH (umum banget di server publik)

Tugas mini yang menantang:

  • Ambil 10 anomali teratas, lalu tulis analisis 1–2 kalimat per event:
    • “Kenapa ini anomali?”
    • “Apa tindakan lanjut?” (block IP? cek user? cek sistem?)

Simulasi Serangan Ringan (Aman untuk Lab)

Kalau kamu punya VM/host lab sendiri, bisa memicu event gagal login (tanpa merusak):

  • Coba login SSH dengan user salah beberapa kali dari client lab.
  • Atau buat event sudo beberapa kali.

Penting: lakukan hanya di lingkungan yang kamu miliki/diizinkan.

(Opsional) Amankan Model & Output dengan GnuPG

Tujuannya: hasil deteksi bisa berisi data sensitif (user, IP, pola aktivitas). Minimal, kamu bisa enkripsi file output dan model sebelum dipindah/diupload.

1. Generate key (sekali saja)

gpg --full-generate-key

Cek key:

gpg --list-keys

2. Enkripsi output CSV

Misal email key kamu you@example.com:

gpg --output output/anomali_top.csv.gpg --encrypt --recipient you@example.com output/anomali_top.csv

Decrypt:

gpg --output output/anomali_top.csv --decrypt output/anomali_top.csv.gpg

3. Enkripsi model

gpg --output models/anomali_models.joblib.gpg --encrypt --recipient you@example.com models/anomali_models.joblib

Skill security yang dinilai: kamu tidak hanya bikin AI, tapi juga mengelola artefak (model/output) dengan aman.

Output yang Wajib Dikumpulkan

  • data/auth_features.jsonl (atau ringkasannya)
  • models/anomali_models.joblib (atau versi .gpg)
  • output/anomali_top.csv (atau versi .gpg)
  • ...
  • Laporan.md singkat berisi:
    • deskripsi dataset (berapa event),
    • parameter model (contamination, k),
    • 10 anomali teratas + analisis,
    • 3 rekomendasi aksi.

Template laporan cepat:

# Laporan Modul 10 — AI Deteksi Anomali

## Dataset
- Sumber: auth.log
- Jumlah event: ...
- Rentang waktu: ...

## Model
- IsolationForest contamination: ...
- KMeans k: ...
- Fitur: hour, minute, is_failed, count_5m_ip, ...

## Temuan Top 10
1) ...
   - alasan: ...
   - tindak lanjut: ...

## Rekomendasi
- ...
- ...
- ...

Challenge Upgrade (Kalau Mau Naik Level)

Kalau mahasiswa cepat selesai, kasih 1–2 tantangan ini:

  • Tambahkan fitur: hari (weekday) dan deteksi “akses weekend”.
  • Buat mode “stream”: baca log baru (tail) dan skor on-the-fly.
  • Ganti auth.log ke Nginx access log dan buat fitur:
    • request per IP per menit,
    • status code 404/500 burst,
    • path yang jarang muncul.


Pranala Menarik