Actions
버그 (Bug) #1
진행중ML 기반 탐지 기능 추가
버그 (Bug) #1:
ML 기반 탐지 기능 추가
시작일:
2026/03/08
완료일:
진척도:
0%
추정시간:
설명
[Feature] ZENIK EDR 0-day 탐지율 향상을 위한 ML 기반 탐지 엔진 도입¶
1. 개요 (Overview)¶
- 목표: ZENIK EDR의 AV-TEST Protection 점수 향상 및 신종/변종(0-day) 악성코드 탐지율 극대화
- 현재 상태: YARA + Hash IOC + ClamAV 기반으로 0-day 탐지율 60~70% 수준
- 기대 효과: 머신러닝(ML) 탐지 모듈을 7번째 분석 모듈로 추가하여 0-day 탐지율을 **92~99%**까지 상향
2. 아키텍처 및 모델 구성 (Architecture)¶
속도와 정확도를 모두 잡기 위해 두 가지 모델을 앙상블(Ensemble)하여 판정합니다.
- 모델 1 (메인 - LightGBM): PE 정적 특성 2,351차원 기반 악성코드 다중 분류 (빠른 속도)
- 모델 2 (보조 - CNN/ResNet): PE 파일을 224x224 그레이스케일 이미지로 변환하여 패밀리 유사도 탐지 (정밀 판정)
- 라이선스 (전부 상용 무료): EMBER(MIT), LightGBM(MIT), ONNX Runtime(MIT), ResNet(Apache 2.0)
📌 전체 구조도¶
[Python 학습 파이프라인] [C++ ZENIK 에이전트]
(별도 프로젝트) (기존 솔루션에 통합)
EMBER/MalBazaar 데이터셋 PE 파일 이벤트 수신
↓ ↓
PE 특성 추출 (2,351차원) PEFeatureExtractor (C++)
↓ PE 특성 2,351차원 추출
LightGBM 학습 (multi-class) ↓
↓ ONNX Runtime 추론
model_static.onnx 내보내기 ──배포→ Model1: LightGBM (메인)
↓
PE → 224×224 이미지 변환 PEImageConverter (C++)
↓ PE → 그레이스케일 변환
ResNet/EfficientNet 학습 ↓
↓ ONNX Runtime 추론
model_image.onnx 내보내기 ──배포→ Model2: CNN (보조)
↓
앙상블 판정 (가중 평균)
↓
DetectionPipeline 연동
IDetection::Report()
3. 다중 분류 카테고리 (10 Classes)¶
| ID | 카테고리 | 설명 | 대응 액션 (Response Action) |
|---|---|---|---|
| 0 | Benign | 정상 | - |
| 1 | Trojan | 일반 트로이목마 | KillAndQuarantine |
| 2 | Ransomware | 파일 암호화 | KillAndBlock (긴급) |
| 3 | RAT | 원격 제어 | KillAndBlock + 네트워크 차단 |
| 4 | Keylogger | 키 입력 탈취 | KillAndQuarantine |
| 5 | Infostealer | 정보 탈취 | KillAndBlock |
| 6 | Backdoor | 백도어/C2 | NetworkBlock + Kill |
| 7 | Downloader | 추가 페이로드 | KillAndQuarantine |
| 8 | Miner | 암호화폐 채굴 | KillProcess |
| 9 | Worm | 자가 전파 | KillAndBlock |
4. 상세 구현 내용¶
4.1. Part 1: Python 학습 파이프라인 (별도 프로젝트)¶
-
저장소/위치:
ml-training/ - 주요 기능: EMBER 데이터셋 기반 특성 추출, PE to Image 변환, 모델 학습 및 ONNX 익스포트
-
학습 설정:
-
LightGBM: 10 Multi-class, 2351 features (DOS/PE Headers, Sections, Imports/Exports 등) -
CNN: ResNet18 (ImageNet Pretrained → Fine-tune), 224x224 Grayscale
-
4.2. Part 2: C++ 추론 엔진 (ZENIK 에이전트 통합)¶
-
의존성 추가:
Microsoft.ML.OnnxRuntime(CPU 버전) NuGet 패키지 -
신규 파일 (ZenikEngine/src/ml/):
-
pe_feature_extractor.h/cpp: EMBER 호환 2,351차원 특성 벡터 추출 -
pe_image_converter.h/cpp: PE → 224x224 이미지 변환 -
onnx_inference.h/cpp: ONNX Runtime 래퍼 (모델 로드, 1D/2D 추론) -
ml_detector.h/cpp: 앙상블 판정 및 엔진 연동
-
-
기존 모듈 통합 (
engine_main.cpp):-
g_Engine구조체에MLDetector mlDetector;추가 -
EngineAnalyzeEvent내에 파일/프로세스 생성 이벤트 발생 시 ML 분석 트리거 로직 추가
-
5. 작업 진행 마일스톤 (Checklist)¶
🚀 Step 1: Python 학습 파이프라인 구축¶
-
ml-training프로젝트 디렉토리 및 환경 구성 (requirements.txt) - EMBER 데이터셋 다운로드 및 전처리 스크립트 작성
- PE 특성 추출기(2,351차원) 및 PE → 이미지 변환기 개발
- LightGBM multi-class 학습 및 모델 평가
- CNN (ResNet18) 학습 및 모델 평가
-
학습된 모델
ONNX포맷으로 내보내기 (model_static.onnx,model_image.onnx)
🛠 Step 2: C++ 추론 엔진 구축 및 통합¶
-
ZenikEngine.vcxproj에 ONNX Runtime NuGet 패키지 및 DLL 추가 -
OnnxInference래퍼 클래스 구현 -
Python 파서와 C++
PEFeatureExtractor간 추출 결과 정합성 교차 검증 (엔트로피, 해시 매핑 등) -
PEImageConverterC++ 로직 구현 -
MLDetector앙상블 모듈 (가중 평균 및 Threshold 로직) 구현 -
engine_main.cpp의 7번째 분석 모듈로 통합 연동
🧪 Step 3: 통합 테스트 및 검증¶
- 정상 PE 파일 (notepad, calc 등) False Positive 테스트 (Benign 판정 확인)
- 주요 알려진 악성 샘플 대상 올바른 카테고리(Trojan, Ransomware 등) 분류 확인
- 성능 벤치마크 (목표: LightGBM < 10ms, CNN < 100ms)
- 인기 상용 SW 100개 대상 오탐(FP) 제로 검증
6. 인수 조건 (Definition of Done)¶
- Python 모델 성능: LightGBM 정확도 99% 이상, CNN 정확도 95% 이상 달성 후 ONNX 변환 완료
- C++ 빌드 및 동작: Visual Studio (Release|x64) 환경에서 에러 없이 빌드되며, ONNX 모델이 정상적으로 메모리에 로드될 것
-
탐지 파이프라인 정상 연동:
ZETER_EVT_PROC_CREATE등의 이벤트 발생 시 ML 분석이 트리거되고, 악성 판정 시IDetection::Report()가 정상 호출될 것
희준 배이(가) 약 한시간 전에 변경
악성코드 샘플 수집 스크립트
(종류별 300개 수집)
"""
samples/
├── 1_Trojan/
├── 2_Ransomware/
├── 3_RAT/
├── 4_Keylogger/
├── 5_Infostealer/
├── 6_Backdoor/
├── 7_Downloader/
├── 8_Miner/
└── 9_Worm/
"""
import requests
import os
import sys
import io
import time
import json
from pathlib import Path
try:
import pyzipper
except ImportError:
print("pyzipper 설치 필요: pip install pyzipper")
sys.exit(1)
# ══════════════════════════════════════════════════════════
# 설정
# ══════════════════════════════════════════════════════════
API = "https://mb-api.abuse.ch/api/v1/"
SAVE_DIR = Path(__file__).parent / "samples"
SAMPLES_PER_CLASS = 300
CLASS_NAMES = {
1: "Trojan", 2: "Ransomware", 3: "RAT",
4: "Keylogger", 5: "Infostealer", 6: "Backdoor",
7: "Downloader", 8: "Miner", 9: "Worm",
}
SIGS = {
1: ["AgentTesla","Formbook","Emotet","TrickBot","Qakbot",
"Dridex","Ursnif","DarkTortilla","GCleaner","Pikabot","BumbleBee"],
2: ["LockBit","Conti","Stop","STOP","BlackCat","REvil",
"Ryuk","Maze","Phobos","Djvu","BlackBasta","Akira","Play","Clop"],
3: ["AsyncRAT","NjRAT","DarkComet","QuasarRAT","Remcos","RemcosRAT",
"NetWire","XWorm","Warzone","WarzoneRAT","DCRat","VenomRAT",
"ValleyRAT","OrcusRAT","Gh0stRAT","DarkVisionRAT","zgRAT"],
4: ["SnakeKeylogger","HawkEye","MassLogger","VIPKeylogger","a310Logger"],
5: ["Vidar","RedLine","Raccoon","LokiBot","AZORult","StealC",
"Lumma","LummaStealer","PhantomStealer","ACRStealer",
"RaccoonStealer","RedLineStealer","MetaStealer"],
6: ["CobaltStrike","Metasploit","ShadowPad","PlugX",
"SliverFox","EternalRocks","Sliver"],
7: ["GuLoader","SmokeLoader","Smoke Loader","Amadey",
"BazarLoader","IcedID","BatLoader","PrivateLoader"],
8: ["CoinMiner","XMRig"],
9: ["Phorpiex","Ramnit","Neshta","Virut"],
}
# ══════════════════════════════════════════════════════════
# 핵심 함수
# ══════════════════════════════════════════════════════════
def make_session(api_key: str) -> requests.Session:
sess = requests.Session()
sess.headers.update({"Auth-key": api_key})
return sess
def test_auth(sess: requests.Session) -> bool:
try:
r = sess.post(API,
data={"query": "get_siginfo", "signature": "AgentTesla", "limit": 1},
timeout=15)
return r.json().get("query_status") in ("ok", "no_results")
except:
return False
def get_pe_hashes(sess: requests.Session, signature: str, limit: int = 100) -> list:
"""시그니처 → PE exe/dll SHA256 목록"""
try:
r = sess.post(API,
data={"query": "get_siginfo", "signature": signature, "limit": limit},
timeout=30)
data = r.json()
if data.get("query_status") != "ok":
return []
return [s["sha256_hash"] for s in data.get("data", [])
if s.get("file_type") in ("exe", "dll")]
except:
return []
def download_one(sess: requests.Session, sha256: str, dest_dir: Path) -> bool:
"""샘플 1개 다운로드 → AES ZIP 해제 → PE 저장"""
out = dest_dir / f"{sha256}.bin"
if out.exists() and out.stat().st_size > 100:
return True # 이미 있음
try:
r = sess.post(API,
data={"query": "get_file", "sha256_hash": sha256},
timeout=60)
if r.status_code != 200 or len(r.content) < 100:
return False
if r.content[:2] != b"PK":
return False
# pyzipper: AES 암호화 ZIP 지원
with pyzipper.AESZipFile(io.BytesIO(r.content)) as zf:
names = zf.namelist()
if not names:
return False
data = zf.read(names[0], pwd=b"infected")
if len(data) < 64 or data[:2] != b"MZ":
return False
out.write_bytes(data)
return True
except Exception:
return False
# ══════════════════════════════════════════════════════════
# 메인
# ══════════════════════════════════════════════════════════
def main():
print("=" * 60)
print(" ZENIK ML — MalwareBazaar PE 자동 수집기")
print("=" * 60)
print()
api_key = input("MalwareBazaar Auth-Key: ").strip()
if not api_key:
print("API Key를 입력하세요")
sys.exit(1)
sess = make_session(api_key)
print("인증...", end=" ", flush=True)
if not test_auth(sess):
print("실패")
sys.exit(1)
print("OK\n")
# 폴더 + 기존 수량
counts = {}
for cid, name in CLASS_NAMES.items():
d = SAVE_DIR / f"{cid}_{name}"
d.mkdir(parents=True, exist_ok=True)
counts[cid] = len(list(d.glob("*.bin")))
print(f"저장: {SAVE_DIR.absolute()}")
print(f"목표: 클래스당 {SAMPLES_PER_CLASS}개\n")
for cid, name in CLASS_NAMES.items():
bar = "█" * (counts[cid] // 10)
print(f" [{cid}] {name:12s}: {counts[cid]:4d} {bar}")
print()
total_new = 0
for cid, name in CLASS_NAMES.items():
if counts[cid] >= SAMPLES_PER_CLASS:
print(f"[{name}] {counts[cid]}개 — skip")
continue
need = SAMPLES_PER_CLASS - counts[cid]
dest = SAVE_DIR / f"{cid}_{name}"
got = 0
print(f"\n[{name}] 필요: {need}개")
print("-" * 50)
for sig in SIGS[cid]:
if got >= need:
break
# 해시 목록
t0 = time.time()
hashes = get_pe_hashes(sess, sig, limit=100)
t_q = time.time() - t0
if not hashes:
print(f" {sig:20s} → 0 PE ({t_q:.1f}s)")
continue
# 순차 다운로드
t0 = time.time()
dl = 0
for sha in hashes:
if got >= need:
break
if download_one(sess, sha, dest):
got += 1
dl += 1
# 진행 표시
if dl % 10 == 0:
print(f" {sig:20s} → {dl}/{len(hashes)}...", flush=True)
t_d = time.time() - t0
print(f" {sig:20s} → +{dl:3d}/{len(hashes)} PE "
f"({t_q:.1f}s + {t_d:.1f}s)")
counts[cid] += got
total_new += got
print(f" => [{name}] Total: {counts[cid]}")
# 결과
print("\n" + "=" * 60)
print(" 완료")
print("=" * 60 + "\n")
grand = 0
for cid, name in CLASS_NAMES.items():
n = counts[cid]
bar = "█" * (n // 10)
ok = "OK" if n >= SAMPLES_PER_CLASS else ".." if n >= 50 else "!!"
print(f" {ok} [{cid}] {name:12s}: {n:4d} {bar}")
grand += n
print(f"\n Total: {grand}")
print(f" 신규: {total_new}")
print(f" 저장: {SAVE_DIR.absolute()}")
with open(SAVE_DIR / "progress.json", "w") as f:
json.dump({"counts": {str(k): v for k, v in counts.items()},
"total": grand}, f, indent=2)
if __name__ == "__main__":
main()
Actions