Building Automated Malware Submission Pipeline
When to Use
Use this skill when:
- SOC teams face high volume of suspicious file alerts requiring sandbox analysis
- Manual sandbox submission creates bottlenecks in alert triage workflow
- Endpoint and email security tools quarantine files needing automated verdict determination
- Incident response requires rapid malware family identification and IOC extraction
Do not use for analyzing live malware samples in production environments; always use isolated sandbox infrastructure.
Prerequisites
- Sandbox environment: Cuckoo Sandbox, Joe Sandbox, Any.Run, or VMRay
- VirusTotal API key (Enterprise for submission, free for lookup)
- MalwareBazaar API access for known malware lookup
- File collection mechanism: EDR quarantine API, email gateway export, network capture
- Python 3.8+ with requests, vt-py, and pefile libraries
- Isolated analysis network with no production connectivity
Workflow
Step 1: Build File Collection Pipeline
Collect suspicious files from multiple sources:
import email
import hashlib
from email import policy
from pathlib import Path

import requests


class MalwareCollector:
    def __init__(self, quarantine_dir="/opt/malware_quarantine"):
        self.quarantine_dir = Path(quarantine_dir)
        # parents=True avoids a failure when the parent directory does not exist yet
        self.quarantine_dir.mkdir(parents=True, exist_ok=True)

    def collect_from_edr(self, edr_api_url, api_token):
        """Pull quarantined files from CrowdStrike Falcon"""
        headers = {"Authorization": f"Bearer {api_token}"}
        # Get recent quarantine events
        response = requests.get(
            f"{edr_api_url}/quarantine/queries/quarantined-files/v1",
            headers=headers,
            params={"filter": "state:'quarantined'", "limit": 50},
            timeout=30,
        )
        response.raise_for_status()
        file_ids = response.json()["resources"]
        for file_id in file_ids:
            # Download quarantined file (confirm that this endpoint returns file
            # content rather than metadata for your EDR API version)
            dl_response = requests.get(
                f"{edr_api_url}/quarantine/entities/quarantined-files/v1",
                headers=headers,
                params={"ids": file_id},
                timeout=60,
            )
            dl_response.raise_for_status()
            file_data = dl_response.content
            sha256 = hashlib.sha256(file_data).hexdigest()
            filepath = self.quarantine_dir / f"{sha256}.sample"
            filepath.write_bytes(file_data)
            yield {"sha256": sha256, "path": str(filepath), "source": "edr"}

    def collect_from_email_gateway(self, smtp_quarantine_path):
        """Pull attachments from email gateway quarantine"""
        for eml_file in Path(smtp_quarantine_path).glob("*.eml"):
            # The context manager closes the handle even if parsing fails
            with eml_file.open("rb") as fh:
                msg = email.message_from_binary_file(fh, policy=policy.default)
            for attachment in msg.iter_attachments():
                content = attachment.get_content()
                if isinstance(content, str):
                    content = content.encode()
                if not isinstance(content, bytes):
                    # Nested messages (message/rfc822) come back as objects; skip them
                    continue
                sha256 = hashlib.sha256(content).hexdigest()
                filename = attachment.get_filename() or "unknown"
                filepath = self.quarantine_dir / f"{sha256}.sample"
                filepath.write_bytes(content)
                yield {
                    "sha256": sha256,
                    "path": str(filepath),
                    "source": "email",
                    "original_filename": filename,
                    "sender": msg["From"],
                    "subject": msg["Subject"],
                }

    def compute_hashes(self, filepath):
        """Calculate MD5, SHA1, SHA256 for a file"""
        with open(filepath, "rb") as f:
            content = f.read()
        return {
            "md5": hashlib.md5(content).hexdigest(),
            "sha1": hashlib.sha1(content).hexdigest(),
            "sha256": hashlib.sha256(content).hexdigest(),
            "size": len(content),
        }
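compute_hashes reads the whole sample into memory, which can be costly for large quarantined archives. The following is a minimal sketch of a streaming alternative; the function name hash_file_chunked and the 1 MiB default chunk size are illustrative assumptions, not part of the pipeline above.

```python
import hashlib


def hash_file_chunked(filepath, chunk_size=1024 * 1024):
    """Compute MD5, SHA1, and SHA256 in a single pass over the file.

    Hypothetical helper: reads fixed-size chunks so memory use stays
    bounded regardless of sample size.
    """
    digests = {name: hashlib.new(name) for name in ("md5", "sha1", "sha256")}
    size = 0
    with open(filepath, "rb") as f:
        # iter() with a sentinel stops when read() returns empty bytes (EOF)
        for chunk in iter(lambda: f.read(chunk_size), b""):
            size += len(chunk)
            for digest in digests.values():
                digest.update(chunk)
    result = {name: digest.hexdigest() for name, digest in digests.items()}
    result["size"] = size
    return result
```

The returned dictionary has the same keys as compute_hashes (md5, sha1, sha256, size), so it could be swapped in without changing callers.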
Step 2: Pre-Screen with Hash Lookups
Check if the file is already known before sandbox submission:
import requests
import vt


class MalwarePreScreener:
    def __init__(self, vt_api_key, mb_api_url="https://mb-api.abuse.ch/api/v1/"):
        self.vt_client = vt.Client(vt_api_key)
        self.mb_api_url = mb_api_url

    def check_virustotal(self, sha256):
        """Lookup hash in VirusTotal"""
        try:
            file_obj = self.vt_client.get_object(f"/files/{sha256}")
            stats = file_obj.last_analysis_stats
            return {
                "found": True,
                "malicious": stats.get("malicious", 0),
                "suspicious": stats.get("suspicious", 0),
                "undetected": stats.get("undetected", 0),
                "total": sum(stats.values()),
                "threat_label": getattr(file_obj, "popular_threat_classification", {}).get(
                    "suggested_threat_label", "Unknown"
                ),
                "type": getattr(file_obj, "type_description", "Unknown"),
            }
        except vt.APIError:
            # Any API error (including quota or auth failures) is treated as
            # "not found", so the sample falls through to sandbox analysis
            return {"found": False}

    def check_malwarebazaar(self, sha256):
        """Lookup hash in MalwareBazaar"""
        response = requests.post(
            self.mb_api_url,
            data={"query": "get_info", "hash": sha256},
            timeout=30,
        )
        data = response.json()
        if data.get("query_status") == "ok":
            entry = data["data"][0]
            return {
                "found": True,
                "signature": entry.get("signature") or "Unknown",
                # The tags field can be null in the response, so fall back to a list
                "tags": entry.get("tags") or [],
                "file_type": entry.get("file_type", "Unknown"),
                "first_seen": entry.get("first_seen", "Unknown"),
            }
        return {"found": False}

    def pre_screen(self, sha256):
        """Run all pre-screening checks"""
        vt_result = self.check_virustotal(sha256)
        mb_result = self.check_malwarebazaar(sha256)
        verdict = "UNKNOWN"
        if vt_result["found"] and vt_result.get("malicious", 0) > 10:
            verdict = "KNOWN_MALICIOUS"
        elif vt_result["found"] and vt_result.get("malicious", 0) == 0:
            verdict = "LIKELY_CLEAN"
        return {
            "sha256": sha256,
            "virustotal": vt_result,
            "malwarebazaar": mb_result,
            "pre_screen_verdict": verdict,
            "needs_sandbox": verdict == "UNKNOWN",
        }

    def close(self):
        self.vt_client.close()
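The pre-screen decision reduces to two VirusTotal thresholds: more than 10 malicious detections means known malicious, exactly 0 detections on a known file means likely clean, and everything else goes to the sandbox. As a rough illustration, the same rule can be written as a standalone function that is easy to unit-test offline. The name classify_pre_screen and the malicious_threshold parameter are assumptions introduced for this sketch.

```python
def classify_pre_screen(vt_result, malicious_threshold=10):
    """Map a VirusTotal lookup result to a pre-screen verdict.

    Hypothetical helper mirroring the thresholds in pre_screen():
    files never seen by VirusTotal, and files with a small number of
    detections, stay UNKNOWN and are routed to the sandbox.
    """
    if not vt_result.get("found"):
        return "UNKNOWN"
    malicious = vt_result.get("malicious", 0)
    if malicious > malicious_threshold:
        return "KNOWN_MALICIOUS"
    if malicious == 0:
        return "LIKELY_CLEAN"
    return "UNKNOWN"


# Example inputs covering each branch
for sample in (
    {"found": False},
    {"found": True, "malicious": 34},
    {"found": True, "malicious": 0},
    {"found": True, "malicious": 3},
):
    print(sample, "->", classify_pre_screen(sample))
```

Keeping the threshold as a parameter makes it straightforward to tune per environment; note that a file with exactly 10 detections remains UNKNOWN because the comparison is strict.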
Step 3: Submit to Sandbox for Dynamic Analysis
Cuckoo Sandbox Submission:
import time

import requests


class SandboxSubmitter:
    def __init__(self, cuckoo_url="http://cuckoo.internal:8090"):
        self.cuckoo_url = cuckoo_url

    def submit_to_cuckoo(self, filepath, timeout=300):
        """Submit file to Cuckoo Sandbox"""
        with open(filepath, "rb") as f:
            response = requests.post(
                f"{self.cuckoo_url}/tasks/create/file",
                files={"file": f},
                data={
                    "timeout": timeout,
                    "options": "procmemdump=yes,route=none",
                    "priority": 2,
                    "machine": "win10_x64",
                },
                timeout=60,
            )
        response.raise_for_status()
        task_id = response.json()["task_id"]
        return task_id

    def wait_for_analysis(self, task_id, poll_interval=30, max_wait=600):
        """Wait for sandbox analysis to complete"""
        elapsed = 0
        while elapsed < max_wait:
            response = requests.get(
                f"{self.cuckoo_url}/tasks/view/{task_id}", timeout=30
            )
            status = response.json()["task"]["status"]
            if status == "reported":
                return self.get_report(task_id)
            elif status.startswith("failed"):
                # Covers failed_analysis and any other failed_* status
                return {"error": "Analysis failed"}
            time.sleep(poll_interval)
            elapsed += poll_interval
        return {"error": "Analysis timed out"}

    def get_report(self, task_id):
        """Retrieve analysis report"""
        response = requests.get(
            f"{self.cuckoo_url}/tasks/report/{task_id}", timeout=60
        )
        report = response.json()
        network = report.get("network", {})
        behavior = report.get("behavior", {})
        # Extract key indicators
        return {
            "task_id": task_id,
            "score": report.get("info", {}).get("score", 0),
            "signatures": [
                {"name": s["name"], "severity": s["severity"], "description": s["description"]}
                for s in report.get("signatures", [])
            ],
            "network": {
                "dns": [d["request"] for d in network.get("dns", [])],
                "http": [
                    {"url": h["uri"], "method": h["method"]}
                    for h in network.get("http", [])
                ],
                "hosts": network.get("hosts", []),
            },
            "dropped_files": [
                {"name": f["name"], "sha256": f["sha256"], "size": f["size"]}
                for f in report.get("dropped", [])
            ],
            "processes": [
                {"name": p["process_name"], "pid": p["pid"], "command_line": p.get("command_line", "")}
                for p in behavior.get("processes", [])
            ],
            "registry_keys": list(behavior.get("summary", {}).get("regkey_written", [])),
        }

    def submit_to_joesandbox(self, filepath, joe_api_key, joe_url="https://jbxcloud.joesecurity.org/api"):
        """Submit to Joe Sandbox Cloud"""
        # Verify the authentication scheme, field names, and response keys
        # against the Joe Sandbox API version you are using
        with open(filepath, "rb") as f:
            response = requests.post(
                f"{joe_url}/v2/submission/new",
                headers={"Authorization": f"Bearer {joe_api_key}"},
                files={"sample": f},
                data={
                    "systems": "w10_64",
                    "internet-access": False,
                    "report-cache": True,
                },
                timeout=60,
            )
        return response.json()["data"]["webid"]
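The polling loop in wait_for_analysis is hard to test because it calls the sandbox and sleeps. One possible refactoring, sketched below, separates the loop from the HTTP call by injecting a status function and a sleep function. The names poll_until_done, get_status, and the "timed_out" return value are assumptions made for this example.

```python
import time


def poll_until_done(get_status, poll_interval=30, max_wait=600, sleep=time.sleep):
    """Call get_status() until it reports a terminal state or max_wait elapses.

    Hypothetical helper: get_status is any zero-argument callable returning
    a status string; sleep is injectable so tests do not have to wait.
    """
    elapsed = 0
    while elapsed < max_wait:
        status = get_status()
        # "reported" means the report is ready; any failed_* status is final
        if status == "reported" or status.startswith("failed"):
            return status
        sleep(poll_interval)
        elapsed += poll_interval
    return "timed_out"


# Simulated task that finishes on the third poll, with sleeping disabled
statuses = iter(["pending", "running", "reported"])
print(poll_until_done(lambda: next(statuses), sleep=lambda seconds: None))
```

With the defaults above (30-second interval, 600-second limit), the loop makes at most 20 status requests before giving up.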
Step 4: Extract IOCs and Generate Verdict
class VerdictGenerator:
    def __init__(self):
        self.malicious_threshold = 7  # Cuckoo score threshold

    def generate_verdict(self, pre_screen, sandbox_report):
        """Combine pre-screening and sandbox results for final verdict"""
        # A failed or timed-out analysis returns {"error": ...}; treat it as no report
        if sandbox_report and "error" in sandbox_report:
            sandbox_report = None
        iocs = {
            "ips": [],
            "domains": [],
            "urls": [],
            "hashes": [],
            "registry_keys": [],
            "files_dropped": [],
        }
        # Extract IOCs from sandbox report
        if sandbox_report:
            network = sandbox_report.get("network", {})
            iocs["domains"] = network.get("dns", [])
            iocs["ips"] = network.get("hosts", [])
            iocs["urls"] = [h["url"] for h in network.get("http", [])]
            iocs["hashes"] = [
                f["sha256"] for f in sandbox_report.get("dropped_files", [])
            ]
            iocs["registry_keys"] = sandbox_report.get("registry_keys", [])[:10]
            iocs["files_dropped"] = sandbox_report.get("dropped_files", [])
        # Determine verdict
        vt_malicious = pre_screen.get("virustotal", {}).get("malicious", 0)
        sandbox_score = sandbox_report.get("score", 0) if sandbox_report else 0
        sig_count = len(sandbox_report.get("signatures", [])) if sandbox_report else 0
        combined_score = (vt_malicious * 2) + (sandbox_score * 10) + (sig_count * 5)
        # Samples pre-screened as KNOWN_MALICIOUS skip the sandbox, so their
        # combined score alone would rarely reach 100; honor that verdict here
        known_malicious = pre_screen.get("pre_screen_verdict") == "KNOWN_MALICIOUS"
        if (
            combined_score >= 100
            or known_malicious
            or sandbox_score >= self.malicious_threshold
        ):
            verdict = "MALICIOUS"
            confidence = "HIGH"
        elif combined_score >= 50:
            verdict = "SUSPICIOUS"
            confidence = "MEDIUM"
        elif combined_score >= 20:
            verdict = "POTENTIALLY_UNWANTED"
            confidence = "LOW"
        else:
            verdict = "CLEAN"
            # CLEAN is a weak conclusion when the sandbox was needed but produced no report
            missing_report = pre_screen.get("needs_sandbox") and not sandbox_report
            confidence = "LOW" if missing_report else "HIGH"
        return {
            "verdict": verdict,
            "confidence": confidence,
            "combined_score": combined_score,
            "iocs": iocs,
            "vt_detections": vt_malicious,
            "sandbox_score": sandbox_score,
            "signatures": sandbox_report.get("signatures", []) if sandbox_report else [],
        }
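The combined score is a weighted sum: 2 points per VirusTotal detection, 10 points per unit of sandbox score, and 5 points per triggered signature. For a sample with 34 detections, a sandbox score of 9.2, and 3 signatures, that is 68 + 92 + 15 = 175, well above the 100-point MALICIOUS cutoff. The sketch below isolates the arithmetic and the score bands; the function names are assumptions for illustration only.

```python
def combined_score(vt_malicious, sandbox_score, sig_count):
    """Weighted sum used for the final verdict (weights 2, 10, and 5)."""
    return (vt_malicious * 2) + (sandbox_score * 10) + (sig_count * 5)


def score_to_verdict(score):
    """Map a combined score to a (verdict, confidence) pair using the score bands."""
    if score >= 100:
        return ("MALICIOUS", "HIGH")
    if score >= 50:
        return ("SUSPICIOUS", "MEDIUM")
    if score >= 20:
        return ("POTENTIALLY_UNWANTED", "LOW")
    return ("CLEAN", "HIGH")


# Values taken from the sample report: 34 detections, score 9.2, 3 signatures
print(score_to_verdict(combined_score(34, 9.2, 3)))

# A sample with 11 detections and no sandbox run scores only 22
print(score_to_verdict(combined_score(11, 0, 0)))
```

The second case shows why the score bands alone are not enough for samples that skip the sandbox: VirusTotal detections by themselves need to reach 50 before the score crosses 100.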
Step 5: Push Results to SIEM
import requests


def push_to_splunk(verdict_result, splunk_url, splunk_token, verify_tls=True):
    """Send malware analysis verdict to Splunk HEC"""
    event = {
        "sourcetype": "malware_analysis",
        "source": "malware_pipeline",
        "event": {
            "sha256": verdict_result["sha256"],
            "verdict": verdict_result["verdict"],
            "confidence": verdict_result["confidence"],
            "score": verdict_result["combined_score"],
            "vt_detections": verdict_result["vt_detections"],
            "sandbox_score": verdict_result["sandbox_score"],
            "malware_family": verdict_result.get("threat_label", "Unknown"),
            "iocs": verdict_result["iocs"],
            "signatures": [s["name"] for s in verdict_result["signatures"]],
        },
    }
    response = requests.post(
        f"{splunk_url}/services/collector/event",
        headers={
            "Authorization": f"Splunk {splunk_token}",
            "Content-Type": "application/json",
        },
        json=event,
        # Keep certificate verification on; pass a CA bundle path for internal CAs
        verify=verify_tls,
        timeout=30,
    )
    return response.status_code == 200


def push_iocs_to_blocklist(iocs, firewall_api):
    """Push extracted IOCs to blocking infrastructure"""
    for ip in iocs.get("ips", []):
        requests.post(
            f"{firewall_api}/block",
            json={"type": "ip", "value": ip, "action": "block", "source": "malware_pipeline"},
            timeout=30,
        )
    for domain in iocs.get("domains", []):
        requests.post(
            f"{firewall_api}/block",
            json={"type": "domain", "value": domain, "action": "sinkhole", "source": "malware_pipeline"},
            timeout=30,
        )
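Sandbox network captures usually include benign traffic such as DNS resolvers and internal addresses, so pushing every contacted host to a firewall risks blocking legitimate infrastructure. A minimal sketch of a pre-block filter follows, assuming a hypothetical helper filter_blockable_ips and a caller-supplied allowlist of CIDR ranges; neither is part of the pipeline above.

```python
import ipaddress


def filter_blockable_ips(ips, allowlist=()):
    """Return only globally routable IPs that are not in an allowlisted network.

    Hypothetical helper: allowlist is an iterable of CIDR strings
    (for example, trusted resolvers or corporate egress ranges).
    """
    allowed_networks = [ipaddress.ip_network(net) for net in allowlist]
    blockable = []
    for raw in ips:
        try:
            addr = ipaddress.ip_address(raw)
        except ValueError:
            # Skip values that are not valid IP addresses
            continue
        if not addr.is_global:
            # Private, loopback, link-local, and reserved ranges are never blocked
            continue
        if any(addr in net for net in allowed_networks):
            continue
        blockable.append(str(addr))
    return blockable


candidates = ["10.0.0.5", "8.8.8.8", "not-an-ip", "185.234.218.50"]
print(filter_blockable_ips(candidates, allowlist=["8.8.8.0/24"]))
```

The filtered list could be assigned back to iocs["ips"] before calling push_iocs_to_blocklist; a similar allowlist check would be sensible for domains.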
Step 6: Orchestrate the Full Pipeline
def run_malware_pipeline(sample_path, config):
    """Execute full malware analysis pipeline"""
    collector = MalwareCollector()
    screener = MalwarePreScreener(config["vt_key"])
    submitter = SandboxSubmitter(config["cuckoo_url"])
    generator = VerdictGenerator()
    try:
        # Step 1: Hash and pre-screen
        hashes = collector.compute_hashes(sample_path)
        pre_screen = screener.pre_screen(hashes["sha256"])
        # Step 2: Submit to sandbox if unknown
        sandbox_report = None
        if pre_screen["needs_sandbox"]:
            task_id = submitter.submit_to_cuckoo(sample_path)
            sandbox_report = submitter.wait_for_analysis(task_id)
        # Step 3: Generate verdict
        verdict = generator.generate_verdict(pre_screen, sandbox_report)
        verdict["sha256"] = hashes["sha256"]
        verdict["threat_label"] = pre_screen.get("virustotal", {}).get("threat_label", "Unknown")
        # Step 4: Push to SIEM
        push_to_splunk(verdict, config["splunk_url"], config["splunk_token"])
        # Step 5: Block if malicious
        if verdict["verdict"] == "MALICIOUS":
            push_iocs_to_blocklist(verdict["iocs"], config["firewall_api"])
        return verdict
    finally:
        # Release the VirusTotal client even if a step raises
        screener.close()
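run_malware_pipeline expects a config dictionary with the keys vt_key, cuckoo_url, splunk_url, splunk_token, and firewall_api. One way to build it without hardcoding secrets is to read environment variables, as in the sketch below. The environment variable names and the load_pipeline_config function are assumptions chosen for this example.

```python
import os

# Maps pipeline config keys to hypothetical environment variable names
CONFIG_ENV_VARS = {
    "vt_key": "VT_API_KEY",
    "cuckoo_url": "CUCKOO_URL",
    "splunk_url": "SPLUNK_HEC_URL",
    "splunk_token": "SPLUNK_HEC_TOKEN",
    "firewall_api": "FIREWALL_API_URL",
}


def load_pipeline_config(env=None):
    """Build the pipeline config dict, failing fast if any value is missing."""
    env = os.environ if env is None else env
    missing = [var for var in CONFIG_ENV_VARS.values() if not env.get(var)]
    if missing:
        raise RuntimeError(f"Missing configuration: {', '.join(sorted(missing))}")
    return {key: env[var] for key, var in CONFIG_ENV_VARS.items()}
```

A caller would then run something like run_malware_pipeline(sample_path, load_pipeline_config()), so that a missing credential stops the run before any sample is submitted.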
Key Concepts
| Term | Definition |
|---|---|
| Dynamic Analysis | Executing malware in a sandbox to observe runtime behavior (process creation, network, file system changes) |
| Static Analysis | Examining malware without execution (hash lookup, string analysis, PE header inspection) |
| Sandbox Evasion | Techniques malware uses to detect sandbox environments and alter behavior to avoid analysis |
| IOC Extraction | Automated process of identifying network indicators, file artifacts, and registry changes from sandbox reports |
| Multi-AV Scanning | Submitting samples to multiple antivirus engines (VirusTotal) for consensus-based detection |
| Verdict | Final classification of a sample: Malicious, Suspicious, Potentially Unwanted, or Clean |
Tools & Systems
- Cuckoo Sandbox: Open-source automated malware analysis platform with behavioral analysis and network capture
- Joe Sandbox: Commercial sandbox with deep behavioral analysis, YARA matching, and MITRE ATT&CK mapping
- Any.Run: Interactive sandbox service allowing real-time manipulation during analysis for debugging evasive malware
- VirusTotal: Multi-engine scanning service providing 70+ AV results and behavioral analysis reports
- CAPE Sandbox: Community-maintained Cuckoo fork with enhanced payload extraction and configuration dumping
Common Scenarios
- Email Attachment Triage: Auto-submit quarantined email attachments, generate verdict in <5 minutes
- EDR Quarantine Processing: Batch-process files quarantined by endpoint security for detailed analysis
- Incident Investigation: Submit suspicious binaries found during IR for malware family identification and IOC extraction
- Threat Intel Enrichment: Analyze samples from threat feeds to extract C2 infrastructure and update blocking
- Zero-Day Detection: Sandbox catches novel malware missed by signature-based AV through behavioral analysis
Output Format
MALWARE ANALYSIS REPORT โ Pipeline Submission
โโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโโ
Sample: invoice_march.docx
SHA256: a1b2c3d4e5f6a7b8...
File Type: Microsoft Word Document (macro-enabled)
Pre-Screening:
VirusTotal: 34/72 malicious (Emotet.Downloader)
MalwareBazaar: Tags: emotet, macro, downloader
Sandbox Analysis (Cuckoo):
Score: 9.2/10 (MALICIOUS)
Signatures:
- Macro executes PowerShell download cradle (severity: 8)
- Process injection into explorer.exe (severity: 9)
- Connects to known Emotet C2 server (severity: 9)
Extracted IOCs:
C2 IPs: 185.234.218[.]50:8080, 45.77.123[.]45:443
Domains: update-service[.]evil[.]com
Dropped Files: payload.dll (SHA256: b2c3d4e5...)
Registry: HKCU\Software\Microsoft\Windows\CurrentVersion\Run\Update
VERDICT: MALICIOUS (Emotet Downloader) - Confidence: HIGH
ACTIONS:
[DONE] IOCs pushed to Splunk threat intel
[DONE] C2 IPs blocked on firewall
[DONE] Domain sinkholed on DNS
[DONE] Hash blocked on endpoint
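The report shows indicators in defanged form (for example, update-service[.]evil[.]com) so they cannot be clicked or resolved by accident. A simple defanging routine might look like the sketch below; the function name defang is an assumption, and unlike the sample report it brackets every dot in an IP address rather than only the last one.

```python
def defang(indicator):
    """Make a domain, IP, or URL safe to display in reports and tickets.

    Hypothetical helper: replaces each dot with [.] and rewrites an
    http/https scheme to hxxp/hxxps.
    """
    text = indicator.replace(".", "[.]")
    if text.lower().startswith("http"):
        text = "hxxp" + text[4:]
    return text


for value in ("update-service.evil.com", "45.77.123.45", "https://evil.com/a"):
    print(defang(value))
```

Defanging should apply only to human-readable output; the values sent to Splunk or the firewall need to stay in their original form.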
Verification Criteria
Confirm successful execution by validating:
- [ ] All prerequisite tools and access requirements are satisfied
- [ ] Each workflow step completed without errors
- [ ] Output matches expected format and contains expected data
- [ ] No security warnings or misconfigurations detected
- [ ] Results are documented and evidence is preserved for audit
Compliance Framework Mapping
This skill supports compliance evidence collection across multiple frameworks:
- SOC 2: CC7.1 (Monitoring), CC7.2 (Anomaly Detection), CC7.3 (Incident Identification)
- ISO 27001: A.12.4 (Logging & Monitoring), A.16.1 (Security Incident Management)
- NIST 800-53: AU-6 (Audit Review), SI-4 (System Monitoring), IR-5 (Incident Monitoring)
- NIST CSF: DE.AE (Anomalies & Events), DE.CM (Continuous Monitoring)
Claw GRC Tip: When this skill is executed by a registered agent, compliance evidence is automatically captured and mapped to the relevant controls in your active frameworks.
Deploying This Skill with Claw GRC
Agent Execution
Register this skill with your Claw GRC agent for automated execution:
# Install via CLI
npx claw-grc skills add building-automated-malware-submission-pipeline
# Or load dynamically via MCP
grc.load_skill("building-automated-malware-submission-pipeline")
Audit Trail Integration
When executed through Claw GRC, every step of this skill generates tamper-evident audit records:
- SHA-256 chain hashing ensures no step can be modified after execution
- Evidence artifacts (configs, scan results, logs) are automatically attached to relevant controls
- Trust score impact: successful execution increases your agent's trust score
Continuous Compliance
Schedule this skill for recurring execution to maintain continuous compliance posture. Claw GRC monitors for drift and alerts when re-execution is needed.