
Building Automated Malware Submission Pipeline

Builds an automated malware submission and analysis pipeline that collects suspicious files from endpoints and email gateways, submits them to sandbox environments and multi-engine scanners, and generates verdicts with IOCs for SIEM integration.

Prerequisites

  • Sandbox environment: Cuckoo Sandbox, Joe Sandbox, Any.Run, or VMRay
  • VirusTotal API key (Enterprise for submission, free for lookup)
  • MalwareBazaar API access for known malware lookup
  • File collection mechanism: EDR quarantine API, email gateway export, network capture
  • Python 3.8+ with `requests`, `vt-py`, `pefile` libraries
  • Isolated analysis network with no production connectivity

When to Use

Use this skill when:

  • SOC teams face a high volume of suspicious file alerts requiring sandbox analysis
  • Manual sandbox submission creates bottlenecks in the alert triage workflow
  • Endpoint and email security tools quarantine files needing automated verdict determination
  • Incident response requires rapid malware family identification and IOC extraction

Do not use for analyzing live malware samples in production environments; always use isolated sandbox infrastructure.

Workflow

Step 1: Build File Collection Pipeline

Collect suspicious files from multiple sources:

import requests
import hashlib
import os
from pathlib import Path
from datetime import datetime

class MalwareCollector:
    def __init__(self, quarantine_dir="/opt/malware_quarantine"):
        self.quarantine_dir = Path(quarantine_dir)
        # parents=True so a missing parent directory does not raise
        self.quarantine_dir.mkdir(parents=True, exist_ok=True)

    def collect_from_edr(self, edr_api_url, api_token):
        """Pull quarantined files from CrowdStrike Falcon"""
        headers = {"Authorization": f"Bearer {api_token}"}

        # Get recent quarantine events
        response = requests.get(
            f"{edr_api_url}/quarantine/queries/quarantined-files/v1",
            headers=headers,
            params={"filter": "state:'quarantined'", "limit": 50},
            timeout=30
        )
        response.raise_for_status()
        file_ids = response.json().get("resources", [])

        for file_id in file_ids:
            # Download quarantined file
            # NOTE: confirm in your EDR's API reference that this endpoint
            # returns raw file bytes; if it returns JSON metadata instead,
            # the hash below would describe the metadata, not the sample
            dl_response = requests.get(
                f"{edr_api_url}/quarantine/entities/quarantined-files/v1",
                headers=headers,
                params={"ids": file_id},
                timeout=60
            )
            dl_response.raise_for_status()
            file_data = dl_response.content
            sha256 = hashlib.sha256(file_data).hexdigest()

            filepath = self.quarantine_dir / f"{sha256}.sample"
            filepath.write_bytes(file_data)
            yield {"sha256": sha256, "path": str(filepath), "source": "edr"}

    def collect_from_email_gateway(self, smtp_quarantine_path):
        """Pull attachments from email gateway quarantine"""
        import email
        from email import policy

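        for eml_file in Path(smtp_quarantine_path).glob("*.eml"):
            # Close the file handle as soon as the message is parsed
            with eml_file.open("rb") as fh:
                msg = email.message_from_binary_file(fh, policy=policy.default)
            for attachment in msg.iter_attachments():
                content = attachment.get_content()
                if isinstance(content, str):
                    content = content.encode()
                elif not isinstance(content, bytes):
                    # Nested messages (message/rfc822) are not raw bytes; skip them
                    continue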
                sha256 = hashlib.sha256(content).hexdigest()
                filename = attachment.get_filename() or "unknown"

                filepath = self.quarantine_dir / f"{sha256}.sample"
                filepath.write_bytes(content)
                yield {
                    "sha256": sha256,
                    "path": str(filepath),
                    "source": "email",
                    "original_filename": filename,
                    "sender": msg["From"],
                    "subject": msg["Subject"]
                }

    def compute_hashes(self, filepath):
        """Calculate MD5, SHA1, SHA256 for a file"""
        with open(filepath, "rb") as f:
            content = f.read()
        return {
            "md5": hashlib.md5(content).hexdigest(),
            "sha1": hashlib.sha1(content).hexdigest(),
            "sha256": hashlib.sha256(content).hexdigest(),
            "size": len(content)
        }
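
The `compute_hashes` method above reads the whole file into memory, which may be a problem for large samples such as disk images or installers. A minimal sketch of a chunked variant follows; the function name `compute_hashes_streaming` and the 1 MiB default chunk size are assumptions for illustration, not part of the original pipeline.

```python
import hashlib


def compute_hashes_streaming(filepath, chunk_size=1024 * 1024):
    """Hash a file in fixed-size chunks so memory use stays bounded."""
    md5 = hashlib.md5()
    sha1 = hashlib.sha1()
    sha256 = hashlib.sha256()
    size = 0
    with open(filepath, "rb") as f:
        # iter() with a sentinel stops when read() returns b"" at end of file
        for chunk in iter(lambda: f.read(chunk_size), b""):
            md5.update(chunk)
            sha1.update(chunk)
            sha256.update(chunk)
            size += len(chunk)
    return {
        "md5": md5.hexdigest(),
        "sha1": sha1.hexdigest(),
        "sha256": sha256.hexdigest(),
        "size": size,
    }
```

It returns the same keys as `compute_hashes`, so it could be swapped in without changing the callers.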

Step 2: Pre-Screen with Hash Lookups

Check if the file is already known before sandbox submission:

import requests
import vt

class MalwarePreScreener:
    def __init__(self, vt_api_key, mb_api_url="https://mb-api.abuse.ch/api/v1/"):
        self.vt_client = vt.Client(vt_api_key)
        self.mb_api_url = mb_api_url

    def check_virustotal(self, sha256):
        """Lookup hash in VirusTotal"""
        try:
            file_obj = self.vt_client.get_object(f"/files/{sha256}")
            stats = file_obj.last_analysis_stats
            return {
                "found": True,
                "malicious": stats.get("malicious", 0),
                "suspicious": stats.get("suspicious", 0),
                "undetected": stats.get("undetected", 0),
                "total": sum(stats.values()),
                "threat_label": getattr(file_obj, "popular_threat_classification", {}).get(
                    "suggested_threat_label", "Unknown"
                ),
                "type": getattr(file_obj, "type_description", "Unknown")
            }
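        except vt.APIError as e:
            # Only a missing hash means "not found"; quota or auth errors must surface
            if e.code == "NotFoundError":
                return {"found": False}
            raise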

    def check_malwarebazaar(self, sha256):
        """Lookup hash in MalwareBazaar"""
        response = requests.post(
            self.mb_api_url,
            data={"query": "get_info", "hash": sha256},
            timeout=30
        )
        response.raise_for_status()
        data = response.json()
        if data.get("query_status") == "ok":
            entry = data["data"][0]
            return {
                "found": True,
                "signature": entry.get("signature", "Unknown"),
                "tags": entry.get("tags", []),
                "file_type": entry.get("file_type", "Unknown"),
                "first_seen": entry.get("first_seen", "Unknown")
            }
        return {"found": False}

    def pre_screen(self, sha256):
        """Run all pre-screening checks"""
        vt_result = self.check_virustotal(sha256)
        mb_result = self.check_malwarebazaar(sha256)

        verdict = "UNKNOWN"
        if vt_result["found"] and vt_result.get("malicious", 0) > 10:
            verdict = "KNOWN_MALICIOUS"
        elif vt_result["found"] and vt_result.get("malicious", 0) == 0:
            verdict = "LIKELY_CLEAN"

        return {
            "sha256": sha256,
            "virustotal": vt_result,
            "malwarebazaar": mb_result,
            "pre_screen_verdict": verdict,
            "needs_sandbox": verdict == "UNKNOWN"
        }

    def close(self):
        self.vt_client.close()
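
The threshold logic inside `pre_screen` can be checked without any API calls if it is pulled into a pure function. The sketch below mirrors the rules above (more than 10 malicious detections, or zero detections on a known file); `classify_pre_screen` and its `malicious_threshold` parameter are names introduced here for illustration.

```python
def classify_pre_screen(vt_result, malicious_threshold=10):
    """Map a VirusTotal lookup result to a pre-screen verdict."""
    if not vt_result.get("found"):
        return "UNKNOWN"
    malicious = vt_result.get("malicious", 0)
    if malicious > malicious_threshold:
        return "KNOWN_MALICIOUS"
    if malicious == 0:
        return "LIKELY_CLEAN"
    # Between 1 and the threshold: too ambiguous to decide without a sandbox run
    return "UNKNOWN"
```

Only the `UNKNOWN` result leads to a sandbox submission, so a file with a handful of detections is still analyzed dynamically.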

Step 3: Submit to Sandbox for Dynamic Analysis

Cuckoo Sandbox Submission:

class SandboxSubmitter:
    def __init__(self, cuckoo_url="http://cuckoo.internal:8090"):
        self.cuckoo_url = cuckoo_url

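    def submit_to_cuckoo(self, filepath, timeout=300):
        """Submit file to Cuckoo Sandbox"""
        with open(filepath, "rb") as f:
            response = requests.post(
                f"{self.cuckoo_url}/tasks/create/file",
                files={"file": f},
                data={
                    "timeout": timeout,
                    "options": "procmemdump=yes,route=none",
                    "priority": 2,
                    "machine": "win10_x64"
                },
                # HTTP timeout for the upload itself, separate from the analysis timeout
                timeout=60
            )
        # Fail loudly on HTTP errors instead of a KeyError on the missing task_id
        response.raise_for_status()
        task_id = response.json()["task_id"]
        return task_id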

    def wait_for_analysis(self, task_id, poll_interval=30, max_wait=600):
        """Wait for sandbox analysis to complete"""
        import time
        elapsed = 0
        while elapsed < max_wait:
            response = requests.get(f"{self.cuckoo_url}/tasks/view/{task_id}")
            status = response.json()["task"]["status"]
            if status == "reported":
                return self.get_report(task_id)
            elif status == "failed_analysis":
                return {"error": "Analysis failed"}
            time.sleep(poll_interval)
            elapsed += poll_interval
        return {"error": "Analysis timed out"}

    def get_report(self, task_id):
        """Retrieve analysis report"""
        response = requests.get(f"{self.cuckoo_url}/tasks/report/{task_id}")
        report = response.json()

        # Extract key indicators
        return {
            "task_id": task_id,
            "score": report.get("info", {}).get("score", 0),
            "signatures": [
                {"name": s["name"], "severity": s["severity"], "description": s["description"]}
                for s in report.get("signatures", [])
            ],
            "network": {
                "dns": [d["request"] for d in report.get("network", {}).get("dns", [])],
                "http": [
                    {"url": h["uri"], "method": h["method"]}
                    for h in report.get("network", {}).get("http", [])
                ],
                "hosts": report.get("network", {}).get("hosts", [])
            },
            "dropped_files": [
                {"name": f["name"], "sha256": f["sha256"], "size": f["size"]}
                for f in report.get("dropped", [])
            ],
            "processes": [
                {"name": p["process_name"], "pid": p["pid"], "command_line": p.get("command_line", "")}
                for p in report.get("behavior", {}).get("processes", [])
            ],
            "registry_keys": [
                k for k in report.get("behavior", {}).get("summary", {}).get("regkey_written", [])
            ]
        }

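    def submit_to_joesandbox(self, filepath, joe_api_key, joe_url="https://jbxcloud.joesecurity.org/api"):
        """Submit to Joe Sandbox Cloud"""
        with open(filepath, "rb") as f:
            response = requests.post(
                f"{joe_url}/v2/submission/new",
                files={"sample": f},
                data={
                    # The official jbxapi client sends the key as an "apikey" form
                    # field rather than a Bearer header; verify against your instance
                    "apikey": joe_api_key,
                    "systems": "w10_64",
                    # Form fields are strings: a Python False would be sent as "False"
                    "internet-access": "0",
                    "report-cache": "1"
                },
                timeout=120
            )
        response.raise_for_status()
        # The identifier key in the response may differ by API version; check the docs
        return response.json()["data"]["webid"]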
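
`wait_for_analysis` above tracks elapsed time by adding `poll_interval` on each loop, so time spent inside the HTTP calls does not count toward `max_wait`. A minimal sketch of a deadline-based alternative is shown below. The helper `poll_until_done` and its injectable `fetch_status`, `sleep`, and `clock` parameters are assumptions introduced here so the loop can be exercised without a live sandbox.

```python
import time


def poll_until_done(fetch_status, poll_interval=30, max_wait=600,
                    sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status() until a terminal state or the deadline passes.

    fetch_status is a caller-supplied callable returning a status string
    such as "pending", "running", "reported" or "failed_analysis".
    """
    deadline = clock() + max_wait
    while True:
        status = fetch_status()
        if status in ("reported", "failed_analysis"):
            return status
        # Stop if the next poll would land after the deadline
        if clock() + poll_interval > deadline:
            return "timed_out"
        sleep(poll_interval)
```

With Cuckoo, `fetch_status` could wrap the `/tasks/view/{task_id}` request from `wait_for_analysis` and return `response.json()["task"]["status"]`.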

Step 4: Extract IOCs and Generate Verdict

class VerdictGenerator:
    def __init__(self):
        self.malicious_threshold = 7  # Cuckoo score threshold

    def generate_verdict(self, pre_screen, sandbox_report):
        """Combine pre-screening and sandbox results for final verdict"""
        iocs = {
            "ips": [],
            "domains": [],
            "urls": [],
            "hashes": [],
            "registry_keys": [],
            "files_dropped": []
        }

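        # A failed or timed-out analysis returns {"error": ...}; treat it as no report
        if sandbox_report and "error" in sandbox_report:
            sandbox_report = None

        # Extract IOCs from sandbox report
        if sandbox_report: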
            iocs["domains"] = sandbox_report.get("network", {}).get("dns", [])
            iocs["ips"] = sandbox_report.get("network", {}).get("hosts", [])
            iocs["urls"] = [
                h["url"] for h in sandbox_report.get("network", {}).get("http", [])
            ]
            iocs["hashes"] = [
                f["sha256"] for f in sandbox_report.get("dropped_files", [])
            ]
            iocs["registry_keys"] = sandbox_report.get("registry_keys", [])[:10]
            iocs["files_dropped"] = sandbox_report.get("dropped_files", [])

        # Determine verdict
        vt_malicious = pre_screen.get("virustotal", {}).get("malicious", 0)
        sandbox_score = sandbox_report.get("score", 0) if sandbox_report else 0
        sig_count = len(sandbox_report.get("signatures", [])) if sandbox_report else 0

        combined_score = (vt_malicious * 2) + (sandbox_score * 10) + (sig_count * 5)

        if combined_score >= 100:
            verdict = "MALICIOUS"
            confidence = "HIGH"
        elif combined_score >= 50:
            verdict = "SUSPICIOUS"
            confidence = "MEDIUM"
        elif combined_score >= 20:
            verdict = "POTENTIALLY_UNWANTED"
            confidence = "LOW"
        else:
            verdict = "CLEAN"
            # Without sandbox data a low score is absence of evidence, not proof
            confidence = "HIGH" if sandbox_report else "LOW"

        return {
            "verdict": verdict,
            "confidence": confidence,
            "combined_score": combined_score,
            "iocs": iocs,
            "vt_detections": vt_malicious,
            "sandbox_score": sandbox_score,
            "signatures": sandbox_report.get("signatures", []) if sandbox_report else []
        }
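
To make the weighting concrete, the arithmetic from `generate_verdict` can be worked through on two illustrative inputs. The helper names `combined_score` and `score_to_verdict` are introduced here for the example; the weights and thresholds are the ones used above.

```python
def combined_score(vt_malicious, sandbox_score, sig_count):
    """Weighted sum from Step 4: VT detections x2, sandbox score x10, signatures x5."""
    return (vt_malicious * 2) + (sandbox_score * 10) + (sig_count * 5)


def score_to_verdict(score):
    """Apply the Step 4 thresholds (100 / 50 / 20) to a combined score."""
    if score >= 100:
        return "MALICIOUS"
    if score >= 50:
        return "SUSPICIOUS"
    if score >= 20:
        return "POTENTIALLY_UNWANTED"
    return "CLEAN"


# 34 VT detections, sandbox score 9.2, 3 signatures: 68 + 92 + 15
emotet_like = combined_score(34, 9.2, 3)
# 12 VT detections and no sandbox run (score 0, no signatures): 24
hash_only = combined_score(12, 0, 0)

print(score_to_verdict(emotet_like))  # MALICIOUS
print(score_to_verdict(hash_only))    # POTENTIALLY_UNWANTED
```

The second case is worth noting when tuning the weights: a sample that pre-screening labels `KNOWN_MALICIOUS` skips the sandbox, so its score comes from VirusTotal detections alone and reaches `MALICIOUS` only at 50 or more detections.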

Step 5: Push Results to SIEM

def push_to_splunk(verdict_result, splunk_url, splunk_token):
    """Send malware analysis verdict to Splunk HEC"""
    event = {
        "sourcetype": "malware_analysis",
        "source": "malware_pipeline",
        "event": {
            "sha256": verdict_result["sha256"],
            "verdict": verdict_result["verdict"],
            "confidence": verdict_result["confidence"],
            "score": verdict_result["combined_score"],
            "vt_detections": verdict_result["vt_detections"],
            "sandbox_score": verdict_result["sandbox_score"],
            "malware_family": verdict_result.get("threat_label", "Unknown"),
            "iocs": verdict_result["iocs"],
            "signatures": [s["name"] for s in verdict_result["signatures"]]
        }
    }

    response = requests.post(
        f"{splunk_url}/services/collector/event",
        headers={
            "Authorization": f"Splunk {splunk_token}",
            "Content-Type": "application/json"
        },
        json=event,
        # Keep TLS verification on; pass a CA bundle path here for an internal CA
        verify=True,
        timeout=30
    )
    return response.status_code == 200

def push_iocs_to_blocklist(iocs, firewall_api):
    """Push extracted IOCs to blocking infrastructure"""
    # Sandbox traffic can include benign hosts (DNS resolvers, update servers);
    # review or allowlist indicators before blocking them automatically
    for ip in iocs.get("ips", []):
        requests.post(
            f"{firewall_api}/block",
            json={"type": "ip", "value": ip, "action": "block", "source": "malware_pipeline"},
            timeout=30
        )
    for domain in iocs.get("domains", []):
        requests.post(
            f"{firewall_api}/block",
            json={"type": "domain", "value": domain, "action": "sinkhole", "source": "malware_pipeline"},
            timeout=30
        )
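
Because `push_iocs_to_blocklist` acts on everything the sandbox observed, filtering the indicators first may help avoid blocking internal or benign infrastructure. The sketch below uses only the standard library. `filter_iocs`, `is_blockable_ip`, `is_blockable_domain`, and the contents of `DOMAIN_ALLOWLIST` are hypothetical and would need to reflect your own environment.

```python
import ipaddress

# Hypothetical allowlist; a real deployment would load this from configuration
DOMAIN_ALLOWLIST = {"microsoft.com", "windowsupdate.com"}


def is_blockable_ip(value):
    """Accept only syntactically valid, globally routable addresses."""
    try:
        ip = ipaddress.ip_address(value)
    except ValueError:
        return False
    # Excludes private, loopback, link-local and other non-global ranges
    return ip.is_global


def is_blockable_domain(domain, allowlist=DOMAIN_ALLOWLIST):
    """Reject allowlisted domains and any of their subdomains."""
    domain = domain.lower().rstrip(".")
    return not any(domain == a or domain.endswith("." + a) for a in allowlist)


def filter_iocs(iocs):
    """Return a copy of the IOC dict with unsafe-to-block entries removed."""
    return {
        **iocs,
        "ips": [ip for ip in iocs.get("ips", []) if is_blockable_ip(ip)],
        "domains": [d for d in iocs.get("domains", []) if is_blockable_domain(d)],
    }
```

Public resolvers and CDNs are globally routable, so they pass the IP check; those would still need an explicit IP allowlist.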

Step 6: Orchestrate the Full Pipeline

def run_malware_pipeline(sample_path, config):
    """Execute full malware analysis pipeline"""
    collector = MalwareCollector()
    screener = MalwarePreScreener(config["vt_key"])
    submitter = SandboxSubmitter(config["cuckoo_url"])
    generator = VerdictGenerator()

    try:
        # Step 1: Hash and pre-screen
        hashes = collector.compute_hashes(sample_path)
        pre_screen = screener.pre_screen(hashes["sha256"])

        # Step 2: Submit to sandbox if unknown
        sandbox_report = None
        if pre_screen["needs_sandbox"]:
            task_id = submitter.submit_to_cuckoo(sample_path)
            sandbox_report = submitter.wait_for_analysis(task_id)

        # Step 3: Generate verdict
        verdict = generator.generate_verdict(pre_screen, sandbox_report)
        verdict["sha256"] = hashes["sha256"]
        verdict["threat_label"] = pre_screen.get("virustotal", {}).get("threat_label", "Unknown")

        # Step 4: Push to SIEM
        push_to_splunk(verdict, config["splunk_url"], config["splunk_token"])

        # Step 5: Block if malicious
        if verdict["verdict"] == "MALICIOUS":
            push_iocs_to_blocklist(verdict["iocs"], config["firewall_api"])

        return verdict
    finally:
        # Release the VirusTotal client even if a step above raises
        screener.close()
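
`run_malware_pipeline` expects a `config` dict with the keys `vt_key`, `cuckoo_url`, `splunk_url`, `splunk_token`, and `firewall_api`. One way to build it without hard-coding secrets is to read environment variables, as in the sketch below. The variable names in `ENV_VARS` and the `load_config` helper are assumptions for illustration.

```python
import os

# Hypothetical environment variable names; adjust to your secret store
ENV_VARS = {
    "vt_key": "VT_API_KEY",
    "cuckoo_url": "CUCKOO_URL",
    "splunk_url": "SPLUNK_HEC_URL",
    "splunk_token": "SPLUNK_HEC_TOKEN",
    "firewall_api": "FIREWALL_API_URL",
}


def load_config(env=None):
    """Build the config dict for run_malware_pipeline from environment variables."""
    env = os.environ if env is None else env
    # Collect every missing or empty variable so the error lists them all at once
    missing = [var for var in ENV_VARS.values() if not env.get(var)]
    if missing:
        raise ValueError(f"Missing required settings: {', '.join(sorted(missing))}")
    return {key: env[var] for key, var in ENV_VARS.items()}
```

A call such as `run_malware_pipeline(sample_path, load_config())` then fails early with a clear message if a setting is absent, rather than partway through an analysis.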

Key Concepts

  • Dynamic Analysis: Executing malware in a sandbox to observe runtime behavior (process creation, network, file system changes)
  • Static Analysis: Examining malware without execution (hash lookup, string analysis, PE header inspection)
  • Sandbox Evasion: Techniques malware uses to detect sandbox environments and alter behavior to avoid analysis
  • IOC Extraction: Automated process of identifying network indicators, file artifacts, and registry changes from sandbox reports
  • Multi-AV Scanning: Submitting samples to multiple antivirus engines (VirusTotal) for consensus-based detection
  • Verdict: Final classification of a sample: Malicious, Suspicious, Potentially Unwanted, or Clean
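
As a small illustration of the string analysis mentioned under Static Analysis, the sketch below pulls printable ASCII runs out of raw bytes and picks out URLs, without executing the sample. `extract_strings`, `find_urls`, and the sample bytes are made up for this example; a real pipeline would typically add PE header inspection with a library such as `pefile`.

```python
import re

URL_RE = re.compile(r"https?://[^\s\"'<>]+")


def extract_strings(data, min_length=6):
    """Return runs of printable ASCII at least min_length bytes long."""
    pattern = rb"[\x20-\x7e]{%d,}" % min_length
    return [match.decode("ascii") for match in re.findall(pattern, data)]


def find_urls(strings):
    """Collect anything that looks like an HTTP(S) URL from extracted strings."""
    urls = []
    for s in strings:
        urls.extend(URL_RE.findall(s))
    return urls


# Fabricated bytes standing in for a file's contents
sample = b"\x00\x01MZ\x90\x00http://example.test/payload.bin\x00\xff\xfeabc\x00"
print(find_urls(extract_strings(sample)))
```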

Tools & Systems

  • Cuckoo Sandbox: Open-source automated malware analysis platform with behavioral analysis and network capture
  • Joe Sandbox: Commercial sandbox with deep behavioral analysis, YARA matching, and MITRE ATT&CK mapping
  • Any.Run: Interactive sandbox service allowing real-time manipulation during analysis for debugging evasive malware
  • VirusTotal: Multi-engine scanning service providing 70+ AV results and behavioral analysis reports
  • CAPE Sandbox: Community-maintained Cuckoo fork with enhanced payload extraction and configuration dumping

Common Scenarios

  • Email Attachment Triage: Auto-submit quarantined email attachments, generate verdict in <5 minutes
  • EDR Quarantine Processing: Batch-process files quarantined by endpoint security for detailed analysis
  • Incident Investigation: Submit suspicious binaries found during IR for malware family identification and IOC extraction
  • Threat Intel Enrichment: Analyze samples from threat feeds to extract C2 infrastructure and update blocking
  • Zero-Day Detection: Sandbox catches novel malware missed by signature-based AV through behavioral analysis

Output Format

MALWARE ANALYSIS REPORT โ€” Pipeline Submission
โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”โ”
Sample:       invoice_march.docx
SHA256:       a1b2c3d4e5f6a7b8...
File Type:    Microsoft Word Document (macro-enabled)

Pre-Screening:
  VirusTotal:    34/72 malicious (Emotet.Downloader)
  MalwareBazaar: Tags: emotet, macro, downloader

Sandbox Analysis (Cuckoo):
  Score:         9.2/10 (MALICIOUS)
  Signatures:
    - Macro executes PowerShell download cradle (severity: 8)
    - Process injection into explorer.exe (severity: 9)
    - Connects to known Emotet C2 server (severity: 9)

Extracted IOCs:
  C2 IPs:       185.234.218[.]50:8080, 45.77.123[.]45:443
  Domains:       update-service[.]evil[.]com
  Dropped Files: payload.dll (SHA256: b2c3d4e5...)
  Registry:      HKCU\Software\Microsoft\Windows\CurrentVersion\Run\Update

VERDICT: MALICIOUS (Emotet Downloader) - Confidence: HIGH
ACTIONS:
  [DONE] IOCs pushed to Splunk threat intel
  [DONE] C2 IPs blocked on firewall
  [DONE] Domain sinkholed on DNS
  [DONE] Hash blocked on endpoint
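
The report above writes indicators in defanged form (for example `update-service[.]evil[.]com`) so they cannot be clicked or resolved by accident. A minimal sketch of that transformation and its inverse follows. `defang` and `refang` are names introduced here, and this version brackets every dot, whereas the sample report brackets only the last dot of each IP address.

```python
def defang(indicator):
    """Make an IP, domain or URL non-clickable for inclusion in reports."""
    if indicator.startswith(("http://", "https://")):
        # http -> hxxp, https -> hxxps
        indicator = "hxxp" + indicator[4:]
    return indicator.replace(".", "[.]")


def refang(indicator):
    """Reverse defang() before feeding an indicator to a blocklist or lookup."""
    if indicator.startswith(("hxxp://", "hxxps://")):
        indicator = "http" + indicator[4:]
    return indicator.replace("[.]", ".")


print(defang("update-service.evil.com"))  # update-service[.]evil[.]com
```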

Verification Criteria

Confirm successful execution by validating:

  • [ ] All prerequisite tools and access requirements are satisfied
  • [ ] Each workflow step completed without errors
  • [ ] Output matches expected format and contains expected data
  • [ ] No security warnings or misconfigurations detected
  • [ ] Results are documented and evidence is preserved for audit

Compliance Framework Mapping

This skill supports compliance evidence collection across multiple frameworks:

  • SOC 2: CC7.1 (Monitoring), CC7.2 (Anomaly Detection), CC7.3 (Incident Identification)
  • ISO 27001: A.12.4 (Logging & Monitoring), A.16.1 (Security Incident Management)
  • NIST 800-53: AU-6 (Audit Review), SI-4 (System Monitoring), IR-5 (Incident Monitoring)
  • NIST CSF: DE.AE (Anomalies & Events), DE.CM (Continuous Monitoring)

Claw GRC Tip: When this skill is executed by a registered agent, compliance evidence is automatically captured and mapped to the relevant controls in your active frameworks.

Deploying This Skill with Claw GRC

Agent Execution

Register this skill with your Claw GRC agent for automated execution:

# Install via CLI
npx claw-grc skills add building-automated-malware-submission-pipeline

# Or load dynamically via MCP
grc.load_skill("building-automated-malware-submission-pipeline")

Audit Trail Integration

When executed through Claw GRC, every step of this skill generates tamper-evident audit records:

  • SHA-256 chain hashing ensures no step can be modified after execution
  • Evidence artifacts (configs, scan results, logs) are automatically attached to relevant controls
  • Trust score impact: successful execution increases your agent's trust score
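
The general idea behind SHA-256 chain hashing can be sketched in a few lines: each record's hash covers the previous record's hash, so altering an earlier step invalidates every later one. This is a generic illustration only, not Claw GRC's actual implementation; `append_record`, `verify_chain`, and the record layout are assumptions.

```python
import hashlib
import json

GENESIS_HASH = "0" * 64


def append_record(chain, payload):
    """Append a payload whose hash also covers the previous record's hash."""
    prev_hash = chain[-1]["hash"] if chain else GENESIS_HASH
    body = json.dumps(payload, sort_keys=True)
    digest = hashlib.sha256((prev_hash + body).encode()).hexdigest()
    chain.append({"prev_hash": prev_hash, "payload": payload, "hash": digest})
    return chain


def verify_chain(chain):
    """Recompute every hash in order; any edited record breaks the chain."""
    prev_hash = GENESIS_HASH
    for record in chain:
        body = json.dumps(record["payload"], sort_keys=True)
        expected = hashlib.sha256((prev_hash + body).encode()).hexdigest()
        if record["prev_hash"] != prev_hash or record["hash"] != expected:
            return False
        prev_hash = record["hash"]
    return True
```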

Continuous Compliance

Schedule this skill for recurring execution to maintain continuous compliance posture. Claw GRC monitors for drift and alerts when re-execution is needed.
