Building IOC Enrichment Pipeline with OpenCTI

Overview

OpenCTI is an open-source platform for managing cyber threat intelligence knowledge, built on STIX 2.1 as its native data model. This guide covers building an automated IOC enrichment pipeline using OpenCTI's connector ecosystem to enrich indicators with context from VirusTotal, Shodan, AbuseIPDB, GreyNoise, and other sources. The pipeline automatically enriches newly ingested indicators, correlates them with known threat actors and campaigns, and scores them for analyst prioritization.

Prerequisites

  • Docker and Docker Compose for OpenCTI deployment
  • Python 3.9+ with pycti library
  • API keys for enrichment services: VirusTotal, Shodan, AbuseIPDB, GreyNoise
  • Understanding of STIX 2.1 data model and relationships
  • Elasticsearch or OpenSearch for the OpenCTI backend
  • RabbitMQ for connector messaging and Redis for caching and event streams

Key Concepts

OpenCTI Architecture

OpenCTI exposes a GraphQL API backed by Elasticsearch (or OpenSearch) for storage, with RabbitMQ carrying connector messages and Redis handling caching and event streams. Data is stored natively as STIX 2.1 objects with relationships. Connectors fall into four main categories: External Import (feed ingestion), Internal Import (file parsing), Internal Enrichment (context addition), and Stream (real-time export).
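
To make the "STIX 2.1 objects with relationships" model concrete, the sketch below builds an indicator, a malware object, and an `indicates` relationship as plain dictionaries and wraps them in a bundle. It uses only the standard library; the names `stix_id` and `stix_now`, the IP address, and the malware name are illustrative assumptions, not OpenCTI output.

```python
import json
import uuid
from datetime import datetime, timezone


def stix_id(object_type: str) -> str:
    """Build a STIX identifier of the form <type>--<UUIDv4>."""
    return f"{object_type}--{uuid.uuid4()}"


def stix_now() -> str:
    """Return the current UTC time as a millisecond-precision STIX timestamp."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"


created = stix_now()

# An indicator carrying a STIX pattern for one IPv4 address (documentation range).
indicator = {
    "type": "indicator",
    "spec_version": "2.1",
    "id": stix_id("indicator"),
    "created": created,
    "modified": created,
    "name": "Suspicious IP (illustrative)",
    "pattern": "[ipv4-addr:value = '198.51.100.7']",
    "pattern_type": "stix",
    "valid_from": created,
}

# A hypothetical malware object the indicator points at.
malware = {
    "type": "malware",
    "spec_version": "2.1",
    "id": stix_id("malware"),
    "created": created,
    "modified": created,
    "name": "ExampleRAT",
    "is_family": False,
}

# The relationship links the two objects by their STIX IDs.
relationship = {
    "type": "relationship",
    "spec_version": "2.1",
    "id": stix_id("relationship"),
    "created": created,
    "modified": created,
    "relationship_type": "indicates",
    "source_ref": indicator["id"],
    "target_ref": malware["id"],
}

bundle = {
    "type": "bundle",
    "id": stix_id("bundle"),
    "objects": [indicator, malware, relationship],
}

print(json.dumps(bundle, indent=2))
```

Connectors exchange data with the platform in this bundle shape, which is why the enrichment code later in this guide ends by serializing a bundle.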

Enrichment Connector Model

Internal enrichment connectors are triggered automatically when new observables are created or manually by analysts. Each connector receives STIX objects, queries external services, and returns STIX 2.1 bundles that augment the original observable with additional context, labels, and relationships.
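
The receive, query, and return cycle described above can be sketched without any OpenCTI dependency. In this minimal example the external service is replaced by a stub; `enrich`, `build_note`, and `fake_lookup` are hypothetical names, and the observable ID is generated locally for illustration, whereas a real connector would use the ID supplied by the platform.

```python
import uuid
from datetime import datetime, timezone
from typing import Callable, Dict


def build_note(observable_id: str, source: str, findings: Dict[str, str]) -> dict:
    """Wrap lookup results in a STIX 2.1 note that references the observable."""
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
    lines = [f"## {source} Enrichment"]
    lines += [f"- {key}: {value}" for key, value in findings.items()]
    return {
        "type": "note",
        "spec_version": "2.1",
        "id": f"note--{uuid.uuid4()}",
        "created": now,
        "modified": now,
        "abstract": f"{source} enrichment",
        "content": "\n".join(lines),
        "object_refs": [observable_id],
    }


def enrich(observable: dict, lookup: Callable[[str], Dict[str, str]], source: str) -> dict:
    """Query one external source and return a bundle augmenting the observable."""
    findings = lookup(observable["value"])
    note = build_note(observable["id"], source, findings)
    return {
        "type": "bundle",
        "id": f"bundle--{uuid.uuid4()}",
        "objects": [observable, note],
    }


def fake_lookup(value: str) -> Dict[str, str]:
    """Stand-in for an external API call; returns canned data."""
    return {"classification": "malicious", "queried_value": value}


# Illustrative observable; the ID here is random, not one issued by OpenCTI.
observable = {
    "type": "ipv4-addr",
    "spec_version": "2.1",
    "id": f"ipv4-addr--{uuid.uuid4()}",
    "value": "198.51.100.7",
}

result = enrich(observable, fake_lookup, "ExampleSource")
print(result["objects"][1]["content"])
```

Keeping the lookup behind a callable makes each enrichment source testable in isolation before it is wired into a connector.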

Confidence Scoring

OpenCTI uses a 0-100 confidence scale for indicators. Enrichment connectors can update confidence scores based on external validation: VirusTotal detection ratios, Shodan exposure data, AbuseIPDB report counts, and GreyNoise classification results.
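
One possible way to fold several enrichment signals into a single 0-100 value is shown below. The weights, the GreyNoise adjustments, and the function name `confidence_score` are assumptions chosen for illustration; they are not OpenCTI's or any connector's actual scoring logic and should be tuned against your own data.

```python
def clamp(value: float, low: int = 0, high: int = 100) -> int:
    """Round a value and keep it inside the 0-100 confidence range."""
    return int(max(low, min(high, round(value))))


def confidence_score(
    vt_malicious: int,
    vt_total: int,
    abuse_confidence: int,
    greynoise_classification: str,
) -> int:
    """Combine enrichment signals into a 0-100 confidence value.

    Assumed weighting: up to 50 points from the VirusTotal detection ratio,
    up to 30 points from a 0-100 AbuseIPDB abuse confidence score, and a
    fixed adjustment from the GreyNoise classification.
    """
    vt_ratio = vt_malicious / vt_total if vt_total else 0.0
    score = 50 * vt_ratio + 0.3 * abuse_confidence

    # Benign classifications pull the score down harder than malicious ones push it up.
    adjustment = {"malicious": 20, "benign": -30}.get(greynoise_classification, 0)
    return clamp(score + adjustment)


print(confidence_score(30, 60, 80, "malicious"))
print(confidence_score(2, 70, 5, "benign"))
```

Clamping at the end keeps the result valid even when the adjustment would push it past either bound.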

Practical Steps

Step 1: Deploy OpenCTI with Docker Compose

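# docker-compose.yml (key services only; the elasticsearch, minio,
# rabbitmq, and redis service definitions are omitted here)
version: '3'
services:
  opencti:
    image: opencti/platform:6.4.4
    environment:
      - APP__PORT=8080
      - APP__ADMIN__EMAIL=admin@opencti.io
      - APP__ADMIN__PASSWORD=ChangeMeNow
      # Replace with a real UUIDv4 and reuse it as OPENCTI_TOKEN below
      - APP__ADMIN__TOKEN=your-admin-token-uuid
      - ELASTICSEARCH__URL=http://elasticsearch:9200
      - MINIO__ENDPOINT=minio
      - RABBITMQ__HOSTNAME=rabbitmq
      - REDIS__HOSTNAME=redis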
    ports:
      - "8080:8080"
    depends_on:
      - elasticsearch
      - minio
      - rabbitmq
      - redis

  connector-virustotal:
    image: opencti/connector-virustotal:6.4.4
    environment:
      - OPENCTI_URL=http://opencti:8080
      - OPENCTI_TOKEN=your-admin-token-uuid
      - CONNECTOR_ID=connector-virustotal-id
      - CONNECTOR_NAME=VirusTotal
      - CONNECTOR_SCOPE=StixFile,Artifact,IPv4-Addr,Domain-Name,Url
      - CONNECTOR_AUTO=true
      - VIRUSTOTAL_TOKEN=your-vt-api-key
      - VIRUSTOTAL_MAX_TLP=TLP:AMBER

  connector-shodan:
    image: opencti/connector-shodan:6.4.4
    environment:
      - OPENCTI_URL=http://opencti:8080
      - OPENCTI_TOKEN=your-admin-token-uuid
      - CONNECTOR_ID=connector-shodan-id
      - CONNECTOR_NAME=Shodan
      - CONNECTOR_SCOPE=IPv4-Addr
      - CONNECTOR_AUTO=true
      - SHODAN_TOKEN=your-shodan-api-key
      - SHODAN_MAX_TLP=TLP:AMBER

  connector-abuseipdb:
    image: opencti/connector-abuseipdb:6.4.4
    environment:
      - OPENCTI_URL=http://opencti:8080
      - OPENCTI_TOKEN=your-admin-token-uuid
      - CONNECTOR_ID=connector-abuseipdb-id
      - CONNECTOR_NAME=AbuseIPDB
      - CONNECTOR_SCOPE=IPv4-Addr
      - CONNECTOR_AUTO=true
      - ABUSEIPDB_API_KEY=your-abuseipdb-key
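
The `your-admin-token-uuid` and `connector-*-id` placeholders above need to be replaced with real UUIDv4 strings, since OpenCTI expects UUIDs for the admin token and for each connector ID. A small helper like the following can generate them; the variable names it prints (`OPENCTI_ADMIN_TOKEN`, `CONNECTOR_<NAME>_ID`) are hypothetical and only useful if the compose file is adapted to reference them.

```python
import uuid
from typing import List

CONNECTORS = ["virustotal", "shodan", "abuseipdb"]


def generate_env(connectors: List[str]) -> List[str]:
    """Return KEY=value lines with a fresh UUIDv4 for the token and each connector."""
    lines = [f"OPENCTI_ADMIN_TOKEN={uuid.uuid4()}"]
    for name in connectors:
        lines.append(f"CONNECTOR_{name.upper()}_ID={uuid.uuid4()}")
    return lines


env_lines = generate_env(CONNECTORS)
print("\n".join(env_lines))
```

Generate the values once and store them, because a connector that restarts with a different ID registers as a new connector.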

Step 2: Build Custom Enrichment Connector

import os

import requests
from pycti import OpenCTIConnectorHelper
from stix2 import Bundle, Note


class CustomEnrichmentConnector:
    def __init__(self):
        config = {
            "opencti": {
                "url": os.environ.get("OPENCTI_URL"),
                "token": os.environ.get("OPENCTI_TOKEN"),
            },
            "connector": {
                "id": os.environ.get("CONNECTOR_ID"),
                "name": "CustomEnrichment",
                "scope": "IPv4-Addr,Domain-Name,Url",
                "auto": True,
                "type": "INTERNAL_ENRICHMENT",
            },
        }
        self.helper = OpenCTIConnectorHelper(config)
        # listen() blocks and invokes the callback for every enrichment request
        self.helper.listen(self._process_message)

    def _process_message(self, data):
        # entity_id is OpenCTI's internal ID, used for API calls
        entity_id = data["entity_id"]
        stix_object = self.helper.api.stix_cyber_observable.read(id=entity_id)

        if not stix_object:
            return "Observable not found"

        observable_type = stix_object["entity_type"]
        observable_value = stix_object.get("observable_value", "")
        # STIX references must use the <type>--<uuid> form, not the internal ID
        standard_id = stix_object["standard_id"]

        if observable_type == "IPv4-Addr":
            enrichment_results = self._enrich_ip(
                observable_value, entity_id, standard_id
            )
        elif observable_type == "Domain-Name":
            enrichment_results = self._enrich_domain(observable_value, standard_id)
        else:
            return f"No enrichment implemented for {observable_type}"

        if enrichment_results:
            bundle = Bundle(objects=enrichment_results, allow_custom=True)
            self.helper.send_stix2_bundle(bundle.serialize())

        return "Enrichment completed"

    def _enrich_ip(self, ip_address, entity_id, standard_id):
        """Enrich an IP address with GreyNoise context."""
        objects = []

        # GreyNoise Community API
        try:
            gn_response = requests.get(
                f"https://api.greynoise.io/v3/community/{ip_address}",
                headers={"key": os.environ.get("GREYNOISE_API_KEY")},
                timeout=30,
            )
            if gn_response.status_code == 200:
                gn_data = gn_response.json()
                classification = gn_data.get("classification", "unknown")
                noise = gn_data.get("noise", False)
                riot = gn_data.get("riot", False)

                note_content = (
                    f"## GreyNoise Enrichment\n"
                    f"- Classification: {classification}\n"
                    f"- Internet Noise: {noise}\n"
                    f"- RIOT (Benign Service): {riot}\n"
                    f"- Name: {gn_data.get('name', 'N/A')}\n"
                    f"- Last Seen: {gn_data.get('last_seen', 'N/A')}"
                )

                # Attach the note to the observable via its STIX ID
                note = Note(
                    content=note_content,
                    object_refs=[standard_id],
                    abstract=f"GreyNoise: {classification}",
                    allow_custom=True,
                )
                objects.append(note)

                # Add labels based on classification
                if classification == "malicious":
                    self.helper.api.stix_cyber_observable.add_label(
                        id=entity_id, label_name="greynoise:malicious"
                    )
                elif riot:
                    self.helper.api.stix_cyber_observable.add_label(
                        id=entity_id, label_name="greynoise:benign-service"
                    )

        except Exception as e:
            self.helper.log_error(f"GreyNoise enrichment failed: {e}")

        return objects

    def _enrich_domain(self, domain, standard_id):
        """Enrich a domain with current DNS context from SecurityTrails."""
        objects = []

        try:
            # Use SecurityTrails API for domain enrichment
            st_response = requests.get(
                f"https://api.securitytrails.com/v1/domain/{domain}",
                headers={"APIKEY": os.environ.get("SECURITYTRAILS_API_KEY")},
                timeout=30,
            )
            if st_response.status_code == 200:
                st_data = st_response.json()
                current_dns = st_data.get("current_dns", {})

                # Skip entries without an IP so the join below cannot fail
                a_records = [
                    r["ip"]
                    for r in current_dns.get("a", {}).get("values", [])
                    if r.get("ip")
                ]

                note_content = (
                    f"## SecurityTrails Enrichment\n"
                    f"- A Records: {', '.join(a_records) or 'N/A'}\n"
                    f"- Alexa Rank: {st_data.get('alexa_rank', 'N/A')}\n"
                    f"- Hostname: {st_data.get('hostname', 'N/A')}"
                )

                note = Note(
                    content=note_content,
                    object_refs=[standard_id],
                    abstract=f"SecurityTrails: {domain}",
                    allow_custom=True,
                )
                objects.append(note)

        except Exception as e:
            self.helper.log_error(f"SecurityTrails enrichment failed: {e}")

        return objects


if __name__ == "__main__":
    connector = CustomEnrichmentConnector()
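
The stock connectors in Step 1 each take a `*_MAX_TLP` setting, while the custom connector above sends every observable to external services regardless of marking. A gate along the following lines could be called before any lookup; the ordering list, the alias table, and the name `tlp_allows` are assumptions for this sketch, not part of pycti.

```python
# TLP markings from least to most restrictive (assumed ordering for this sketch).
TLP_ORDER = ["TLP:CLEAR", "TLP:GREEN", "TLP:AMBER", "TLP:AMBER+STRICT", "TLP:RED"]

# Older data may still carry the TLP 1.0 name for the lowest level.
TLP_ALIASES = {"TLP:WHITE": "TLP:CLEAR"}


def tlp_rank(tlp: str) -> int:
    """Map a TLP marking name to its position in TLP_ORDER."""
    normalized = tlp.strip().upper()
    normalized = TLP_ALIASES.get(normalized, normalized)
    return TLP_ORDER.index(normalized)  # raises ValueError on unknown markings


def tlp_allows(observable_tlp: str, max_tlp: str) -> bool:
    """Return True when the observable's marking does not exceed the allowed maximum."""
    return tlp_rank(observable_tlp) <= tlp_rank(max_tlp)


print(tlp_allows("TLP:GREEN", "TLP:AMBER"))
print(tlp_allows("TLP:RED", "TLP:AMBER"))
```

Failing loudly on an unknown marking is deliberate here: silently treating it as shareable would risk leaking restricted indicators to a third-party API.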

## Verification Criteria

Confirm successful execution by validating:

- [ ] All prerequisite tools and access requirements are satisfied
- [ ] Each workflow step completed without errors
- [ ] Output matches expected format and contains expected data
- [ ] No security warnings or misconfigurations detected
- [ ] Results are documented and evidence is preserved for audit
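
The "output matches expected format" check can be partly automated. The sketch below inspects an enrichment bundle (as a parsed dictionary) and reports notes with missing content or references that are not well-formed STIX identifiers. The function name `validate_bundle` and the specific checks are illustrative assumptions, not a complete STIX validator.

```python
import re
import uuid
from typing import List

# <object-type>--<lowercase UUID>, the identifier shape STIX 2.1 uses.
STIX_ID = re.compile(
    r"^[a-z0-9][a-z0-9-]*--"
    r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"
)


def validate_bundle(bundle: dict) -> List[str]:
    """Return a list of human-readable problems; an empty list means no issues found."""
    problems = []
    if bundle.get("type") != "bundle":
        problems.append("top-level object is not a bundle")

    for obj in bundle.get("objects", []):
        obj_id = obj.get("id", "")
        if not STIX_ID.match(obj_id):
            problems.append(f"malformed object id: {obj_id!r}")
        if obj.get("type") == "note":
            if not obj.get("content"):
                problems.append(f"note {obj_id} has no content")
            for ref in obj.get("object_refs", []):
                if not STIX_ID.match(ref):
                    problems.append(f"note {obj_id} has malformed ref: {ref!r}")
    return problems


observable_id = f"ipv4-addr--{uuid.uuid4()}"
sample = {
    "type": "bundle",
    "id": f"bundle--{uuid.uuid4()}",
    "objects": [
        {"type": "ipv4-addr", "id": observable_id, "value": "198.51.100.7"},
        {
            "type": "note",
            "id": f"note--{uuid.uuid4()}",
            "content": "## GreyNoise Enrichment",
            "object_refs": [observable_id],
        },
    ],
}

print(validate_bundle(sample))
```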

## Compliance Framework Mapping

This skill supports compliance evidence collection across multiple frameworks:

- **SOC 2**: CC7.1 (Monitoring), CC7.2 (Anomaly Detection)
- **ISO 27001:2022**: A.5.7 (Threat Intelligence), A.5.24 to A.5.26 (Information Security Incident Management)
- **NIST 800-53**: PM-16 (Threat Awareness), RA-3 (Risk Assessment), SI-5 (Security Alerts)
- **NIST CSF**: ID.RA (Risk Assessment), DE.AE (Anomalies & Events)

> **Claw GRC Tip**: When this skill is executed by a registered agent, compliance evidence is automatically captured and mapped to the relevant controls in your active frameworks.

## Deploying This Skill with Claw GRC

### Agent Execution

Register this skill with your Claw GRC agent for automated execution:

Install via CLI

npx claw-grc skills add building-ioc-enrichment-pipeline-with-opencti

Or load dynamically via MCP

grc.load_skill("building-ioc-enrichment-pipeline-with-opencti")


### Audit Trail Integration

When executed through Claw GRC, every step of this skill generates tamper-evident audit records:

- **SHA-256 chain hashing** ensures no step can be modified after execution
- **Evidence artifacts** (configs, scan results, logs) are automatically attached to relevant controls
- **Trust score impact** — successful execution increases your agent's trust score

### Continuous Compliance

Schedule this skill for recurring execution to maintain continuous compliance posture. Claw GRC monitors for drift and alerts when re-execution is needed.
