Building Threat Intelligence Enrichment in Splunk

Build automated threat intelligence enrichment pipelines in Splunk Enterprise Security using lookup tables, modular inputs, and the Threat Intelligence Framework.

Prerequisites

  • Splunk Enterprise Security (ES) 7.x or later
  • Threat Intelligence Management add-on or Threat Intelligence Framework
  • API keys for external threat intelligence feeds (MISP, OTX, VirusTotal, AbuseIPDB)
  • KV Store enabled and properly configured
  • Admin access for modular input configuration

Overview

Splunk's Threat Intelligence Framework in Enterprise Security enables SOC teams to automatically correlate indicators of compromise (IOCs) against security events. The framework ingests threat feeds, normalizes indicators into KV Store collections, and uses lookup-based correlation searches to flag matching events. Splunk Threat Intelligence Management centralizes collection, normalization, and enrichment from multiple sources, reducing triage time by providing analysts with immediate context.

Threat Intelligence Framework Architecture

External TI Sources (STIX/TAXII, CSV, API)
    |
    v
Modular Inputs (download and parse feeds)
    |
    v
KV Store Collections (normalized IOC storage)
    |-- ip_intel
    |-- domain_intel
    |-- file_intel
    |-- url_intel
    |-- email_intel
    |
    v
Threat Intelligence Lookups
    |
    v
Correlation Searches (match events against IOCs)
    |
    v
Notable Events (enriched with TI context)
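
To make the flow above concrete, here is a minimal in-memory sketch of the same stages in Python. It is not how Enterprise Security implements them: the function names (normalize, correlate), the type mapping, and the sample indicators are illustrative assumptions, and a plain dict stands in for the KV Store collections.

```python
from collections import defaultdict

# Hypothetical mapping from raw feed indicator types to the collections in the diagram.
TYPE_TO_COLLECTION = {
    "ip": "ip_intel",
    "domain": "domain_intel",
    "file_hash": "file_intel",
    "url": "url_intel",
    "email": "email_intel",
}


def normalize(raw_indicators):
    """Group raw feed records into per-type collections keyed by indicator value."""
    collections = defaultdict(dict)
    for record in raw_indicators:
        collection = TYPE_TO_COLLECTION.get(record["type"])
        if collection is None:
            continue  # skip indicator types this sketch does not model
        value = record["value"].strip().lower()
        collections[collection][value] = {"source": record["source"]}
    return collections


def correlate(events, collections):
    """Yield events whose dest_ip is present in ip_intel, enriched with the feed source."""
    ip_intel = collections.get("ip_intel", {})
    for event in events:
        match = ip_intel.get(event["dest_ip"])
        if match is not None:
            yield {**event, "ti_source": match["source"]}


# Sample data uses documentation-reserved addresses.
feed = [
    {"type": "ip", "value": "203.0.113.7", "source": "example_feed"},
    {"type": "domain", "value": "bad.example", "source": "example_feed"},
]
events = [
    {"src_ip": "10.0.0.5", "dest_ip": "203.0.113.7"},
    {"src_ip": "10.0.0.6", "dest_ip": "198.51.100.1"},
]
notables = list(correlate(events, normalize(feed)))
print(notables)
```

Only the first event matches, so it is the only one that would become a notable event in this toy model.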

Configuring Threat Intelligence Sources

STIX/TAXII Feed Integration

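# inputs.conf - TAXII feed configuration
# Verify setting names against the inputs.conf.spec shipped with your ES version
[threatlist://taxii_feed_example]
description = TAXII 2.1 Threat Feed
type = taxii
url = https://threatfeed.example.com/taxii2/
# TAXII options such as the collection name are passed through post_args
post_args = collection="threat-indicators-v21"
# Download interval in seconds
interval = 3600
# Keep feed credentials in ES credential management, not in plain text in this file
disabled = false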
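
A TAXII collection returns STIX objects, and the indicator value sits inside each indicator's pattern string. The sketch below shows one way to pull values out of the simplest pattern form, a single equality comparison. It is an illustration only: the function name is hypothetical, compound patterns (AND, OR, FOLLOWEDBY) are deliberately skipped, and a production parser should use a full STIX pattern grammar.

```python
import re

# Matches only the simplest STIX pattern form: [object-type:path = 'value'].
SIMPLE_PATTERN = re.compile(
    r"^\[\s*([a-z0-9-]+):([\w.'-]+)\s*=\s*'([^']*)'\s*\]$"
)


def extract_simple_indicators(stix_objects):
    """Return object type, property path, and value for single-comparison indicators."""
    results = []
    for obj in stix_objects:
        if obj.get("type") != "indicator":
            continue
        if obj.get("pattern_type", "stix") != "stix":
            continue  # other pattern languages are out of scope for this sketch
        match = SIMPLE_PATTERN.match(obj.get("pattern", "").strip())
        if match:
            object_type, path, value = match.groups()
            results.append({"object_type": object_type, "path": path, "value": value})
    return results


sample_objects = [
    {"type": "indicator", "pattern_type": "stix",
     "pattern": "[ipv4-addr:value = '198.51.100.1']"},
    {"type": "indicator", "pattern_type": "stix",
     "pattern": "[domain-name:value = 'bad.example' OR domain-name:value = 'worse.example']"},
    {"type": "malware", "name": "not an indicator"},
]
print(extract_simple_indicators(sample_objects))
```

The compound pattern and the non-indicator object are ignored, so only the IPv4 indicator is returned.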

CSV-Based Threat List

# inputs.conf - CSV threat list
[threatlist://custom_blocklist]
description = Internal threat blocklist
type = csv
url = https://internal.company.com/threat-feeds/blocklist.csv
# Download interval in seconds
interval = 1800
disabled = false
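
Before publishing an internal blocklist, it can help to validate it so that malformed rows never reach the lookup. The following sketch assumes a CSV with ip and description columns and optional comment lines starting with #; that layout and the function names are assumptions for illustration, not a format the framework requires.

```python
import csv
import io
import ipaddress


def canonical_ip(value):
    """Return a canonical string for a single address or a CIDR range."""
    value = value.strip()
    if "/" in value:
        # strict=False accepts ranges written with host bits set, e.g. 10.1.2.3/24
        return str(ipaddress.ip_network(value, strict=False))
    return str(ipaddress.ip_address(value))


def parse_blocklist(csv_text):
    """Split blocklist rows into (valid, rejected) lists."""
    valid, rejected = [], []
    lines = (
        line for line in io.StringIO(csv_text)
        if not line.lstrip().startswith("#")
    )
    for row in csv.DictReader(lines):
        try:
            ip = canonical_ip(row.get("ip") or "")
        except ValueError:
            rejected.append(row)
            continue
        valid.append({"ip": ip, "description": (row.get("description") or "").strip()})
    return valid, rejected


sample = (
    "ip,description\n"
    "# maintained by the SOC\n"
    "203.0.113.7,scanner\n"
    "10.1.2.3/24,internal test range\n"
    "not-an-ip,typo\n"
)
valid_rows, rejected_rows = parse_blocklist(sample)
print(valid_rows, rejected_rows)
```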

Custom Modular Input for API-Based Feeds

# bin/threatfeed_otx.py - OTX AlienVault feed collector
import json
import sys
from datetime import datetime, timedelta, timezone

import requests
from splunklib.modularinput import Script, Scheme, Argument, Event


class OTXFeedInput(Script):
    def get_scheme(self):
        # Describe the input and its arguments to splunkd
        scheme = Scheme("OTX AlienVault Feed")
        scheme.description = "Collects IOCs from AlienVault OTX"
        scheme.use_external_validation = False
        scheme.streaming_mode = Scheme.streaming_mode_xml

        api_key_arg = Argument("api_key")
        api_key_arg.data_type = Argument.data_type_string
        api_key_arg.required_on_create = True
        scheme.add_argument(api_key_arg)

        # Optional look-back window in days
        pulse_days_arg = Argument("pulse_days")
        pulse_days_arg.data_type = Argument.data_type_number
        pulse_days_arg.required_on_create = False
        scheme.add_argument(pulse_days_arg)

        return scheme

    def stream_events(self, inputs, ew):
        for input_name, input_item in inputs.inputs.items():
            api_key = input_item["api_key"]
            # Fall back to 30 days when pulse_days is missing or empty
            pulse_days = int(input_item.get("pulse_days") or 30)

            # Assumption: modified_since takes an ISO 8601 timestamp, not a relative value like "30d"
            since = datetime.now(timezone.utc) - timedelta(days=pulse_days)
            headers = {"X-OTX-API-KEY": api_key}
            url = (
                "https://otx.alienvault.com/api/v1/pulses/subscribed"
                f"?modified_since={since:%Y-%m-%dT%H:%M:%S}"
            )

            try:
                response = requests.get(url, headers=headers, timeout=60)
                response.raise_for_status()
                data = response.json()

                for pulse in data.get("results", []):
                    for indicator in pulse.get("indicators", []):
                        event = Event()
                        event.stanza = input_name
                        event.data = json.dumps({
                            "indicator": indicator["indicator"],
                            "type": indicator["type"],
                            "pulse_name": pulse["name"],
                            "pulse_id": pulse["id"],
                            "description": indicator.get("description", ""),
                            "created": indicator.get("created", ""),
                            "threat_source": "OTX",
                            # adversary names a threat actor; it is not a confidence score
                            "adversary": pulse.get("adversary") or "unknown",
                        })
                        ew.write_event(event)
            except requests.RequestException as e:
                ew.log("ERROR", f"OTX feed collection failed: {str(e)}")


if __name__ == "__main__":
    sys.exit(OTXFeedInput().run(sys.argv))
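
The events emitted by the collector still carry OTX's own indicator type names, so something has to decide which lookup collection each indicator belongs in. The sketch below is one possible routing step. The collection and field names follow the collections.conf example in this guide; the function name is hypothetical, and the type list covers only the OTX types this guide has collections for.

```python
# OTX indicator type -> (collection, key field). Types without a matching
# collection in this guide (URL, email, CVE, and so on) are intentionally left out.
OTX_TYPE_TO_COLLECTION = {
    "IPv4": ("ip_threat_intel", "ip"),
    "IPv6": ("ip_threat_intel", "ip"),
    "domain": ("domain_threat_intel", "domain"),
    "hostname": ("domain_threat_intel", "domain"),
    "FileHash-MD5": ("file_hash_intel", "file_hash"),
    "FileHash-SHA1": ("file_hash_intel", "file_hash"),
    "FileHash-SHA256": ("file_hash_intel", "file_hash"),
}


def route_indicator(event):
    """Return (collection, record) for a collector event, or None if unsupported."""
    otx_type = event.get("type", "")
    target = OTX_TYPE_TO_COLLECTION.get(otx_type)
    if target is None:
        return None
    collection, key_field = target
    record = {
        key_field: event["indicator"].strip().lower(),
        "source": event.get("threat_source", "OTX"),
    }
    if collection == "file_hash_intel":
        # "FileHash-SHA256" -> "SHA256"
        record["hash_type"] = otx_type.split("-", 1)[1]
    return collection, record


print(route_indicator({"indicator": "ABCDEF", "type": "FileHash-SHA256", "threat_source": "OTX"}))
print(route_indicator({"indicator": "http://bad.example/x", "type": "URL"}))
```

Lowercasing the indicator is a design choice that only helps if events are lowercased the same way before the lookup.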

Building Enrichment Lookups

KV Store Collection Configuration

# collections.conf
[ip_threat_intel]
field.ip = string
field.threat_type = string
field.confidence = number
field.source = string
field.description = string
field.first_seen = time
field.last_seen = time
field.severity = string

[domain_threat_intel]
field.domain = string
field.threat_type = string
field.confidence = number
field.source = string
field.whois_registrar = string
field.whois_created = string

[file_hash_intel]
field.file_hash = string
field.hash_type = string
field.malware_family = string
field.confidence = number
field.source = string
field.detection_names = string
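
Once the collections exist, indicators can be written to them through the KV Store REST endpoint storage/collections/data/<collection>/batch_save. The sketch below only prepares the request URL and JSON payloads and makes no network call. The host, app name, and helper names are placeholders, the batch size of 1000 assumes the default KV Store batch limit, and reusing the indicator as _key is a design choice for this example so that repeated loads overwrite rather than duplicate.

```python
import json
from urllib.parse import quote


def batch_save_url(base_url, app, collection):
    """Build the batch_save URL for a collection owned by 'nobody' in the given app."""
    return (
        f"{base_url.rstrip('/')}/servicesNS/nobody/{quote(app, safe='')}"
        f"/storage/collections/data/{quote(collection, safe='')}/batch_save"
    )


def build_batches(records, key_field, batch_size=1000):
    """Yield JSON array payloads, de-duplicated on key_field."""
    unique = {}
    for record in records:
        # Later records replace earlier ones that share the same indicator value.
        unique[record[key_field]] = {**record, "_key": record[key_field]}
    docs = list(unique.values())
    for start in range(0, len(docs), batch_size):
        yield json.dumps(docs[start:start + batch_size])


records = [
    {"ip": "203.0.113.7", "threat_type": "c2", "confidence": 80, "source": "example_feed"},
    {"ip": "203.0.113.7", "threat_type": "c2", "confidence": 90, "source": "example_feed"},
    {"ip": "198.51.100.9", "threat_type": "scanner", "confidence": 60, "source": "example_feed"},
]
url = batch_save_url("https://splunk.example.com:8089", "my_ti_app", "ip_threat_intel")
payloads = list(build_batches(records, key_field="ip", batch_size=1000))
print(url)
print(payloads)
```

Each payload would be sent as the body of an authenticated POST with Content-Type: application/json.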

Lookup Table Definitions

# transforms.conf
[ip_threat_intel_lookup]
external_type = kvstore
collection = ip_threat_intel
fields_list = ip, threat_type, confidence, source, description, severity, first_seen, last_seen

[domain_threat_intel_lookup]
external_type = kvstore
collection = domain_threat_intel
fields_list = domain, threat_type, confidence, source

[file_hash_intel_lookup]
external_type = kvstore
collection = file_hash_intel
fields_list = file_hash, hash_type, malware_family, confidence, source

Enrichment Correlation Searches

IP-Based Threat Intelligence Correlation

| tstats summariesonly=true count from datamodel=Network_Traffic
    where All_Traffic.action=allowed
    by All_Traffic.src_ip, All_Traffic.dest_ip, All_Traffic.dest_port, _time span=5m
| rename "All_Traffic.*" as *
| lookup ip_threat_intel_lookup ip as dest_ip OUTPUT threat_type, confidence, source as ti_source, severity as ti_severity
| where isnotnull(threat_type)
| lookup asset_lookup ip as src_ip OUTPUT asset_name, asset_owner, asset_priority
| eval urgency=case(
    ti_severity=="critical" AND asset_priority=="critical", "critical",
    ti_severity=="critical" OR ti_severity=="high" OR asset_priority=="critical", "high",
    ti_severity=="medium", "medium",
    true(), "low"
)
| eval asset_name=coalesce(asset_name, "unknown asset")
| eval description="Connection from ".src_ip." (".asset_name.") to known malicious IP ".dest_ip." (".threat_type.") - Source: ".ti_source
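
An urgency matrix like the one above is easy to get subtly wrong, because any combination that no branch covers falls through to the default. The sketch below is one way to check such a mapping outside Splunk. It is an illustrative Python function rather than a port of the search, and it assumes that a critical or high TI severity should never rank below high urgency.

```python
from itertools import product

LEVELS = ("low", "medium", "high", "critical")


def urgency(ti_severity, asset_priority):
    """Combine TI severity and asset priority into a single urgency value."""
    if ti_severity == "critical" and asset_priority == "critical":
        return "critical"
    if ti_severity in ("critical", "high") or asset_priority == "critical":
        return "high"
    if ti_severity == "medium":
        return "medium"
    return "low"  # also covers missing or unrecognized values


# Print the full matrix so every combination can be reviewed at a glance.
for ti, asset in product(LEVELS, repeat=2):
    print(f"ti={ti:<8} asset={asset:<8} -> {urgency(ti, asset)}")
```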

Domain-Based Threat Intelligence Correlation

index=dns sourcetype=stream:dns (query_type=A OR query_type=AAAA)
| lookup domain_threat_intel_lookup domain as query OUTPUT threat_type as domain_threat, confidence as domain_confidence, source as ti_source
| where isnotnull(domain_threat) AND domain_confidence > 70
| stats count dc(src_ip) as unique_sources values(src_ip) as source_ips max(domain_confidence) as domain_confidence by query, domain_threat, ti_source
| eval severity=case(domain_confidence > 90, "critical", domain_confidence > 70, "high", true(), "medium")
| eval description="DNS queries to malicious domain ".query." from ".unique_sources." hosts - Threat: ".domain_threat
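
The domain lookup is an exact string match, so a query logged as Evil.Example.COM. will not match an indicator stored as evil.example.com. One way to reduce misses is to normalize domains the same way on both sides, at feed ingestion and before the lookup. The helper below is a hypothetical sketch of such a normalization, using Python's built-in idna codec for internationalized names.

```python
def normalize_domain(query):
    """Lowercase, drop the trailing root dot, and convert IDNs to ASCII form."""
    name = query.strip().rstrip(".").lower()
    if not name:
        return None
    try:
        return name.encode("idna").decode("ascii")
    except UnicodeError:
        # Empty or over-long labels and other invalid names are rejected.
        return None


for raw in ("Evil.Example.COM.", "bücher.example", "   ", "bad..example"):
    print(repr(raw), "->", normalize_domain(raw))
```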

File Hash Correlation

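Sysmon writes the Hashes field as a comma-separated list such as SHA256=<hex>,MD5=<hex>, so the search extracts the SHA256 value and lowercases it before the lookup. This assumes the collection stores lowercase SHA256 hashes.

index=endpoint sourcetype=sysmon EventCode=1
| rex field=Hashes "SHA256=(?<sha256>[A-Fa-f0-9]{64})"
| eval sha256=lower(sha256)
| lookup file_hash_intel_lookup file_hash as sha256 OUTPUT malware_family, confidence as hash_confidence, source as ti_source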
| where isnotnull(malware_family)
| stats count values(ParentCommandLine) as parent_commands by Computer, User, Image, malware_family, ti_source
| eval severity="critical"
| eval description="Known malware ".malware_family." executed on ".Computer." by ".User." - Binary: ".Image
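
Sysmon process-creation events typically record Hashes as a single comma-separated string of ALGORITHM=digest pairs, which depends on the hash algorithms enabled in the Sysmon configuration. The sketch below shows, outside Splunk, how that string breaks down into individual hashes. The function name is hypothetical and the digests in the sample are shortened placeholders.

```python
def parse_sysmon_hashes(hashes_field):
    """Split 'SHA256=...,MD5=...' into a dict of algorithm -> lowercase digest."""
    result = {}
    for part in hashes_field.split(","):
        algorithm, separator, digest = part.strip().partition("=")
        if separator and digest:
            result[algorithm.upper()] = digest.lower()
    return result


sample_hashes = "MD5=0A1B2C,SHA256=DEADBEEF,IMPHASH=00FF"
parsed = parse_sysmon_hashes(sample_hashes)
print(parsed)
```

Matching on one algorithm consistently (for example SHA256) avoids comparing an MD5 from the event against a SHA256 in the collection.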

Multi-Source Enrichment Pipeline

index=firewall sourcetype=pan:traffic action=allowed
| eval indicators=mvappend(src_ip, dest_ip)
| mvexpand indicators
| lookup ip_threat_intel_lookup ip as indicators OUTPUT threat_type as ip_threat, confidence as ip_confidence, source as ip_ti_source
| where isnotnull(ip_threat)
| lookup geo_ip_lookup ip as indicators OUTPUT country, city, latitude, longitude
| lookup whois_lookup ip as indicators OUTPUT org as ip_org, asn as ip_asn
| stats count
    values(ip_threat) as threat_types
    values(ip_ti_source) as intel_sources
    values(country) as countries
    values(ip_org) as organizations
    latest(_time) as last_seen
    earliest(_time) as first_seen
    by src_ip, dest_ip, dest_port
| eval enrichment_context="Threat: ".mvjoin(threat_types, ", ")." | Geo: ".coalesce(mvjoin(countries, ", "), "unknown")." | Org: ".coalesce(mvjoin(organizations, ", "), "unknown")

Threat Intelligence Dashboards

IOC Coverage Statistics

| inputlookup ip_threat_intel_lookup
| stats count by source, threat_type
| sort -count
| head 20

Feed Freshness Monitoring

| inputlookup ip_threat_intel_lookup
| eval last_seen_epoch=if(isnum(last_seen), last_seen, strptime(last_seen, "%Y-%m-%dT%H:%M:%S"))
| eval age_days=round((now() - last_seen_epoch) / 86400, 0)
| stats count avg(age_days) as avg_age_days max(age_days) as max_age_days by source
| eval status=case(avg_age_days > 30, "STALE", avg_age_days > 7, "AGING", true(), "FRESH")
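
The same thresholds can be exercised outside Splunk, which is useful for testing alerting logic against known timestamps. The sketch below is an approximation of the search above, not an exact port: it averages unrounded ages, and the NO_DATA state for an empty feed is an added assumption.

```python
from datetime import datetime, timedelta, timezone


def feed_status(last_seen_values, now):
    """Classify a feed as STALE, AGING, or FRESH from the average indicator age."""
    ages = [(now - seen).total_seconds() / 86400 for seen in last_seen_values]
    if not ages:
        return "NO_DATA"  # assumption: treat an empty feed as its own state
    avg_age_days = sum(ages) / len(ages)
    if avg_age_days > 30:
        return "STALE"
    if avg_age_days > 7:
        return "AGING"
    return "FRESH"


now = datetime(2024, 6, 30, tzinfo=timezone.utc)
fresh_feed = [now - timedelta(days=1), now - timedelta(days=3)]
aging_feed = [now - timedelta(days=10), now - timedelta(days=20)]
stale_feed = [now - timedelta(days=45), now - timedelta(days=90)]

for name, feed in (("fresh", fresh_feed), ("aging", aging_feed), ("stale", stale_feed)):
    print(name, feed_status(feed, now))
```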

Verification Criteria

Confirm successful execution by validating:

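  • [ ] Prerequisites are in place: ES version, feed API keys, KV Store status, and admin access
  • [ ] Each threat intelligence input downloads on schedule and logs no errors
  • [ ] KV Store collections are populated: | inputlookup ip_threat_intel_lookup returns indicators from every configured source
  • [ ] Each correlation search returns an enriched result for a known test indicator
  • [ ] Feed Freshness Monitoring reports no source as STALE
  • [ ] Results are documented and evidence is preserved for audit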

Compliance Framework Mapping

This skill supports compliance evidence collection across multiple frameworks:

  • SOC 2: CC7.1 (Monitoring), CC7.2 (Anomaly Detection), CC7.3 (Incident Identification)
  • ISO 27001: A.12.4 (Logging & Monitoring), A.16.1 (Security Incident Management)
  • NIST 800-53: AU-6 (Audit Review), SI-4 (System Monitoring), IR-5 (Incident Monitoring)
  • NIST CSF: DE.AE (Anomalies & Events), DE.CM (Continuous Monitoring)

Claw GRC Tip: When this skill is executed by a registered agent, compliance evidence is automatically captured and mapped to the relevant controls in your active frameworks.

Deploying This Skill with Claw GRC

Agent Execution

Register this skill with your Claw GRC agent for automated execution:

# Install via CLI
npx claw-grc skills add building-threat-intelligence-enrichment-in-splunk

# Or load dynamically via MCP
grc.load_skill("building-threat-intelligence-enrichment-in-splunk")

Audit Trail Integration

When executed through Claw GRC, every step of this skill generates tamper-evident audit records:

  • SHA-256 chain hashing ensures no step can be modified after execution
  • Evidence artifacts (configs, scan results, logs) are automatically attached to relevant controls
  • Trust score impact: successful execution increases your agent's trust score
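
To illustrate why chained hashing makes records tamper-evident, here is a generic sketch of a SHA-256 hash chain. It is not Claw GRC's implementation, whose record format is not described here; the field names and the all-zero genesis value are assumptions made for the example.

```python
import hashlib
import json

GENESIS = "0" * 64  # placeholder "previous hash" for the first record


def _digest(prev_hash, payload):
    # Canonical JSON so the same payload always hashes to the same value.
    body = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256((prev_hash + body).encode("utf-8")).hexdigest()


def append_record(chain, payload):
    """Append a record whose hash covers its payload and the previous record's hash."""
    prev_hash = chain[-1]["hash"] if chain else GENESIS
    chain.append({
        "payload": payload,
        "prev_hash": prev_hash,
        "hash": _digest(prev_hash, payload),
    })


def verify(chain):
    """Return True only if every record still matches its recomputed hash."""
    prev_hash = GENESIS
    for record in chain:
        if record["prev_hash"] != prev_hash:
            return False
        if record["hash"] != _digest(prev_hash, record["payload"]):
            return False
        prev_hash = record["hash"]
    return True


audit_chain = []
append_record(audit_chain, {"step": "configure_feed", "status": "ok"})
append_record(audit_chain, {"step": "build_lookup", "status": "ok"})
print(verify(audit_chain))

# Editing an earlier step breaks verification for the whole chain.
audit_chain[0]["payload"]["status"] = "skipped"
print(verify(audit_chain))
```

Because each hash depends on the one before it, changing any earlier record invalidates that record and everything after it.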

Continuous Compliance

Schedule this skill for recurring execution to maintain continuous compliance posture. Claw GRC monitors for drift and alerts when re-execution is needed.
