프로젝트

일반

사용자정보

Actions

버그 (Bug) #1

진행중

ML 기반 탐지 기능 추가

버그 (Bug) #1: ML 기반 탐지 기능 추가

희준 배이(가) 약 6시간 전에 추가함. 약 한시간 전에 수정됨.

상태:
신규
우선순위:
보통
담당자:
시작일:
2026/03/08
완료일:
진척도:

0%

추정시간:

설명

[Feature] ZENIK EDR 0-day 탐지율 향상을 위한 ML 기반 탐지 엔진 도입

1. 개요 (Overview)

  • 목표: ZENIK EDR의 AV-TEST Protection 점수 향상 및 신종/변종(0-day) 악성코드 탐지율 극대화
  • 현재 상태: YARA + Hash IOC + ClamAV 기반으로 0-day 탐지율 60~70% 수준
  • 기대 효과: 머신러닝(ML) 탐지 모듈을 7번째 분석 모듈로 추가하여 0-day 탐지율을 **92~99%**까지 상향

2. 아키텍처 및 모델 구성 (Architecture)

속도와 정확도를 모두 잡기 위해 두 가지 모델을 앙상블(Ensemble)하여 판정합니다.

  • 모델 1 (메인 - LightGBM): PE 정적 특성 2,351차원 기반 악성코드 다중 분류 (빠른 속도)
  • 모델 2 (보조 - CNN/ResNet): PE 파일을 224x224 그레이스케일 이미지로 변환하여 패밀리 유사도 탐지 (정밀 판정)
  • 라이선스 (전부 상용 무료): EMBER(MIT), LightGBM(MIT), ONNX Runtime(MIT), ResNet(Apache 2.0)

📌 전체 구조도

[Python 학습 파이프라인]              [C++ ZENIK 에이전트] 
(별도 프로젝트)                       (기존 솔루션에 통합)
EMBER/MalBazaar 데이터셋              PE 파일 이벤트 수신
        ↓                                    ↓
PE 특성 추출 (2,351차원)              PEFeatureExtractor (C++)
        ↓                              PE 특성 2,351차원 추출
LightGBM 학습 (multi-class)                  ↓
        ↓                             ONNX Runtime 추론
model_static.onnx 내보내기   ──배포→   Model1: LightGBM (메인)
                                             ↓
PE → 224×224 이미지 변환              PEImageConverter (C++)
        ↓                              PE → 그레이스케일 변환
ResNet/EfficientNet 학습                     ↓
        ↓                             ONNX Runtime 추론
model_image.onnx 내보내기    ──배포→   Model2: CNN (보조)
                                             ↓
                                      앙상블 판정 (가중 평균)
                                             ↓
                                      DetectionPipeline 연동
                                      IDetection::Report()

3. 다중 분류 카테고리 (10 Classes)

ID 카테고리 설명 대응 액션 (Response Action)
0 Benign 정상 -
1 Trojan 일반 트로이목마 KillAndQuarantine
2 Ransomware 파일 암호화 KillAndBlock (긴급)
3 RAT 원격 제어 KillAndBlock + 네트워크 차단
4 Keylogger 키 입력 탈취 KillAndQuarantine
5 Infostealer 정보 탈취 KillAndBlock
6 Backdoor 백도어/C2 NetworkBlock + Kill
7 Downloader 추가 페이로드 KillAndQuarantine
8 Miner 암호화폐 채굴 KillProcess
9 Worm 자가 전파 KillAndBlock

4. 상세 구현 내용

4.1. Part 1: Python 학습 파이프라인 (별도 프로젝트)

  • 저장소/위치: ml-training/
  • 주요 기능: EMBER 데이터셋 기반 특성 추출, PE to Image 변환, 모델 학습 및 ONNX 익스포트
  • 학습 설정:
    • LightGBM: 10 Multi-class, 2351 features (DOS/PE Headers, Sections, Imports/Exports 등)
    • CNN: ResNet18 (ImageNet Pretrained → Fine-tune), 224x224 Grayscale

4.2. Part 2: C++ 추론 엔진 (ZENIK 에이전트 통합)

  • 의존성 추가: Microsoft.ML.OnnxRuntime (CPU 버전) NuGet 패키지
  • 신규 파일 (ZenikEngine/src/ml/):
    • pe_feature_extractor.h/cpp: EMBER 호환 2,351차원 특성 벡터 추출
    • pe_image_converter.h/cpp: PE → 224x224 이미지 변환
    • onnx_inference.h/cpp: ONNX Runtime 래퍼 (모델 로드, 1D/2D 추론)
    • ml_detector.h/cpp: 앙상블 판정 및 엔진 연동
  • 기존 모듈 통합 (engine_main.cpp):
    • g_Engine 구조체에 MLDetector mlDetector; 추가
    • EngineAnalyzeEvent 내에 파일/프로세스 생성 이벤트 발생 시 ML 분석 트리거 로직 추가

5. 작업 진행 마일스톤 (Checklist)

🚀 Step 1: Python 학습 파이프라인 구축

  • ml-training 프로젝트 디렉토리 및 환경 구성 (requirements.txt)
  • EMBER 데이터셋 다운로드 및 전처리 스크립트 작성
  • PE 특성 추출기(2,351차원) 및 PE → 이미지 변환기 개발
  • LightGBM multi-class 학습 및 모델 평가
  • CNN (ResNet18) 학습 및 모델 평가
  • 학습된 모델 ONNX 포맷으로 내보내기 (model_static.onnx, model_image.onnx)

🛠 Step 2: C++ 추론 엔진 구축 및 통합

  • ZenikEngine.vcxproj에 ONNX Runtime NuGet 패키지 및 DLL 추가
  • OnnxInference 래퍼 클래스 구현
  • Python 파서와 C++ PEFeatureExtractor 간 추출 결과 정합성 교차 검증 (엔트로피, 해시 매핑 등)
  • PEImageConverter C++ 로직 구현
  • MLDetector 앙상블 모듈 (가중 평균 및 Threshold 로직) 구현
  • engine_main.cpp의 7번째 분석 모듈로 통합 연동

🧪 Step 3: 통합 테스트 및 검증

  • 정상 PE 파일 (notepad, calc 등) False Positive 테스트 (Benign 판정 확인)
  • 주요 알려진 악성 샘플 대상 올바른 카테고리(Trojan, Ransomware 등) 분류 확인
  • 성능 벤치마크 (목표: LightGBM < 10ms, CNN < 100ms)
  • 인기 상용 SW 100개 대상 오탐(FP) 제로 검증

6. 인수 조건 (Definition of Done)

  1. Python 모델 성능: LightGBM 정확도 99% 이상, CNN 정확도 95% 이상 달성 후 ONNX 변환 완료
  2. C++ 빌드 및 동작: Visual Studio (Release|x64) 환경에서 에러 없이 빌드되며, ONNX 모델이 정상적으로 메모리에 로드될 것
  3. 탐지 파이프라인 정상 연동: ZETER_EVT_PROC_CREATE 등의 이벤트 발생 시 ML 분석이 트리거되고, 악성 판정 시 IDetection::Report()가 정상 호출될 것

희준 배이(가) 약 한시간 전에 변경 Actions #1

악성코드 샘플 수집 스크립트
(종류별 300개 수집)

"""
    samples/
    ├── 1_Trojan/
    ├── 2_Ransomware/
    ├── 3_RAT/
    ├── 4_Keylogger/
    ├── 5_Infostealer/
    ├── 6_Backdoor/
    ├── 7_Downloader/
    ├── 8_Miner/
    └── 9_Worm/
"""

import requests
import os
import sys
import io
import time
import json
from pathlib import Path

try:
    import pyzipper
except ImportError:
    print("pyzipper 설치 필요: pip install pyzipper")
    sys.exit(1)

# ══════════════════════════════════════════════════════════
# 설정
# ══════════════════════════════════════════════════════════

API = "https://mb-api.abuse.ch/api/v1/"
SAVE_DIR = Path(__file__).parent / "samples"
SAMPLES_PER_CLASS = 300

CLASS_NAMES = {
    1: "Trojan", 2: "Ransomware", 3: "RAT",
    4: "Keylogger", 5: "Infostealer", 6: "Backdoor",
    7: "Downloader", 8: "Miner", 9: "Worm",
}

SIGS = {
    1: ["AgentTesla","Formbook","Emotet","TrickBot","Qakbot",
        "Dridex","Ursnif","DarkTortilla","GCleaner","Pikabot","BumbleBee"],
    2: ["LockBit","Conti","Stop","STOP","BlackCat","REvil",
        "Ryuk","Maze","Phobos","Djvu","BlackBasta","Akira","Play","Clop"],
    3: ["AsyncRAT","NjRAT","DarkComet","QuasarRAT","Remcos","RemcosRAT",
        "NetWire","XWorm","Warzone","WarzoneRAT","DCRat","VenomRAT",
        "ValleyRAT","OrcusRAT","Gh0stRAT","DarkVisionRAT","zgRAT"],
    4: ["SnakeKeylogger","HawkEye","MassLogger","VIPKeylogger","a310Logger"],
    5: ["Vidar","RedLine","Raccoon","LokiBot","AZORult","StealC",
        "Lumma","LummaStealer","PhantomStealer","ACRStealer",
        "RaccoonStealer","RedLineStealer","MetaStealer"],
    6: ["CobaltStrike","Metasploit","ShadowPad","PlugX",
        "SliverFox","EternalRocks","Sliver"],
    7: ["GuLoader","SmokeLoader","Smoke Loader","Amadey",
        "BazarLoader","IcedID","BatLoader","PrivateLoader"],
    8: ["CoinMiner","XMRig"],
    9: ["Phorpiex","Ramnit","Neshta","Virut"],
}


# ══════════════════════════════════════════════════════════
# 핵심 함수
# ══════════════════════════════════════════════════════════

def make_session(api_key: str) -> requests.Session:
    sess = requests.Session()
    sess.headers.update({"Auth-key": api_key})
    return sess


def test_auth(sess: requests.Session) -> bool:
    try:
        r = sess.post(API,
            data={"query": "get_siginfo", "signature": "AgentTesla", "limit": 1},
            timeout=15)
        return r.json().get("query_status") in ("ok", "no_results")
    except:
        return False


def get_pe_hashes(sess: requests.Session, signature: str, limit: int = 100) -> list:
    """시그니처 → PE exe/dll SHA256 목록"""
    try:
        r = sess.post(API,
            data={"query": "get_siginfo", "signature": signature, "limit": limit},
            timeout=30)
        data = r.json()
        if data.get("query_status") != "ok":
            return []
        return [s["sha256_hash"] for s in data.get("data", [])
                if s.get("file_type") in ("exe", "dll")]
    except:
        return []


def download_one(sess: requests.Session, sha256: str, dest_dir: Path) -> bool:
    """샘플 1개 다운로드 → AES ZIP 해제 → PE 저장"""
    out = dest_dir / f"{sha256}.bin"
    if out.exists() and out.stat().st_size > 100:
        return True  # 이미 있음

    try:
        r = sess.post(API,
            data={"query": "get_file", "sha256_hash": sha256},
            timeout=60)

        if r.status_code != 200 or len(r.content) < 100:
            return False

        if r.content[:2] != b"PK":
            return False

        # pyzipper: AES 암호화 ZIP 지원
        with pyzipper.AESZipFile(io.BytesIO(r.content)) as zf:
            names = zf.namelist()
            if not names:
                return False
            data = zf.read(names[0], pwd=b"infected")
            if len(data) < 64 or data[:2] != b"MZ":
                return False
            out.write_bytes(data)
            return True

    except Exception:
        return False


# ══════════════════════════════════════════════════════════
# 메인
# ══════════════════════════════════════════════════════════

def main():
    print("=" * 60)
    print("  ZENIK ML — MalwareBazaar PE 자동 수집기")
    print("=" * 60)
    print()

    api_key = input("MalwareBazaar Auth-Key: ").strip()
    if not api_key:
        print("API Key를 입력하세요")
        sys.exit(1)

    sess = make_session(api_key)
    print("인증...", end=" ", flush=True)
    if not test_auth(sess):
        print("실패")
        sys.exit(1)
    print("OK\n")

    # 폴더 + 기존 수량
    counts = {}
    for cid, name in CLASS_NAMES.items():
        d = SAVE_DIR / f"{cid}_{name}"
        d.mkdir(parents=True, exist_ok=True)
        counts[cid] = len(list(d.glob("*.bin")))

    print(f"저장: {SAVE_DIR.absolute()}")
    print(f"목표: 클래스당 {SAMPLES_PER_CLASS}개\n")
    for cid, name in CLASS_NAMES.items():
        bar = "█" * (counts[cid] // 10)
        print(f"  [{cid}] {name:12s}: {counts[cid]:4d} {bar}")
    print()

    total_new = 0

    for cid, name in CLASS_NAMES.items():
        if counts[cid] >= SAMPLES_PER_CLASS:
            print(f"[{name}] {counts[cid]}개 — skip")
            continue

        need = SAMPLES_PER_CLASS - counts[cid]
        dest = SAVE_DIR / f"{cid}_{name}"
        got = 0

        print(f"\n[{name}] 필요: {need}개")
        print("-" * 50)

        for sig in SIGS[cid]:
            if got >= need:
                break

            # 해시 목록
            t0 = time.time()
            hashes = get_pe_hashes(sess, sig, limit=100)
            t_q = time.time() - t0

            if not hashes:
                print(f"  {sig:20s} → 0 PE ({t_q:.1f}s)")
                continue

            # 순차 다운로드
            t0 = time.time()
            dl = 0
            for sha in hashes:
                if got >= need:
                    break
                if download_one(sess, sha, dest):
                    got += 1
                    dl += 1
                    # 진행 표시
                    if dl % 10 == 0:
                        print(f"  {sig:20s} → {dl}/{len(hashes)}...", flush=True)
            t_d = time.time() - t0

            print(f"  {sig:20s} → +{dl:3d}/{len(hashes)} PE  "
                  f"({t_q:.1f}s + {t_d:.1f}s)")

        counts[cid] += got
        total_new += got
        print(f"  => [{name}] Total: {counts[cid]}")

    # 결과
    print("\n" + "=" * 60)
    print("  완료")
    print("=" * 60 + "\n")

    grand = 0
    for cid, name in CLASS_NAMES.items():
        n = counts[cid]
        bar = "█" * (n // 10)
        ok = "OK" if n >= SAMPLES_PER_CLASS else ".." if n >= 50 else "!!"
        print(f"  {ok} [{cid}] {name:12s}: {n:4d} {bar}")
        grand += n

    print(f"\n  Total: {grand}")
    print(f"  신규: {total_new}")
    print(f"  저장: {SAVE_DIR.absolute()}")

    with open(SAVE_DIR / "progress.json", "w") as f:
        json.dump({"counts": {str(k): v for k, v in counts.items()},
                    "total": grand}, f, indent=2)


if __name__ == "__main__":
    main()

Actions

내보내기 PDF Atom