CG
SkillsDetecting DNS Exfiltration with DNS Query Analysis
Start Free
Back to Skills Library
Network Security🔴 Advanced

Detecting DNS Exfiltration with DNS Query Analysis

Detect data exfiltration through DNS tunneling by analyzing query entropy, subdomain length, query volume, TXT record abuse, and response payload sizes using passive DNS monitoring.

7 min read8 code examples

Prerequisites

  • Access to DNS query logs (passive DNS capture, DNS server logs, or PCAP)
  • Zeek, Suricata, or tcpdump for DNS traffic capture
  • Python 3.8+ with scipy, numpy, pandas, and scikit-learn
  • SIEM platform for alert correlation
  • Baseline of normal DNS traffic patterns for the environment

Detecting DNS Exfiltration with DNS Query Analysis

Overview

DNS exfiltration exploits the Domain Name System as a covert channel to extract data from compromised networks. Attackers encode stolen data into DNS query names (subdomains) or DNS response records (TXT, CNAME, NULL), bypassing traditional security controls that typically allow DNS traffic unrestricted. Tools like iodine, dnscat2, and dns2tcp enable full TCP tunneling over DNS. Detection requires analyzing DNS query patterns for anomalies including excessive query length, high entropy subdomain strings, abnormal query volumes to single domains, and oversized TXT record responses. This guide covers building a comprehensive DNS exfiltration detection capability using passive DNS analysis, statistical methods, and machine learning approaches.

Prerequisites

  • Access to DNS query logs (passive DNS capture, DNS server logs, or PCAP)
  • Zeek, Suricata, or tcpdump for DNS traffic capture
  • Python 3.8+ with scipy, numpy, pandas, and scikit-learn
  • SIEM platform for alert correlation
  • Baseline of normal DNS traffic patterns for the environment

Core Concepts

DNS Tunneling Mechanics

DNS exfiltration encodes data in different parts of DNS messages:

Outbound (Query-based exfiltration):

Encoded data as subdomain labels:
dGhlIHNlY3JldCBkYXRh.exfil.attacker.com
[base64-encoded data].[tunnel domain]

Query types used: A, AAAA, CNAME, MX, TXT, NULL

Inbound (Response-based command channel):

TXT records carry encoded commands/data in responses
CNAME records chain encoded data through multiple labels
NULL records carry arbitrary binary data

Detection Indicators

IndicatorNormal DNSDNS Tunneling
Subdomain length5-20 chars40-253 chars
Label count2-4 labels5-10+ labels
Shannon entropy2.5-3.5 bits4.0-5.5 bits
Query volume (per domain)Variable100s-1000s/min
TXT response size< 100 bytes200-4000+ bytes
Unique subdomainsLowVery high
Query type distributionMostly A/AAAAHeavy TXT, NULL, CNAME

Common Tunneling Tools

ToolProtocolEncodingDetection Difficulty
iodineIP-over-DNSBase32/Base64/RawMedium
dnscat2TCP-over-DNSHex encodingMedium
dns2tcpTCP-over-DNSBase64Medium
DNSExfiltratorCustomBase64Low
Cobalt Strike DNSC2 over DNSCustom encodingHigh

Implementation Steps

Step 1: Capture DNS Traffic

Using Zeek:

# Live capture
zeek -i eth0 -C base/protocols/dns

# Offline PCAP analysis
zeek -r traffic.pcap base/protocols/dns

# Output: dns.log with query, qtype, answers, TTL

Using tcpdump:

# Capture all DNS traffic
tcpdump -i eth0 -w dns_capture.pcap port 53

# Capture with size filter (large DNS packets)
tcpdump -i eth0 -w large_dns.pcap 'port 53 and greater 512'

Using Suricata:

# In suricata.yaml, enable DNS logging
outputs:
  - eve-log:
      types:
        - dns:
            query: yes
            answer: yes
            formats: [detailed]

Step 2: Analyze Query Characteristics

Python script for DNS exfiltration detection:

#!/usr/bin/env python3
"""DNS Exfiltration Detector - Analyzes DNS logs for tunneling indicators."""

import json
import math
import re
import sys
from collections import defaultdict
from datetime import datetime, timedelta

import pandas as pd


def calculate_entropy(domain: str) -> float:
    """Calculate Shannon entropy of a string."""
    if not domain:
        return 0.0
    freq = defaultdict(int)
    for char in domain:
        freq[char] += 1
    length = len(domain)
    entropy = -sum(
        (count / length) * math.log2(count / length)
        for count in freq.values()
    )
    return entropy


def extract_subdomain(query: str) -> str:
    """Extract subdomain portion from FQDN."""
    parts = query.rstrip('.').split('.')
    if len(parts) > 2:
        return '.'.join(parts[:-2])
    return ''


def get_base_domain(query: str) -> str:
    """Extract registered domain from FQDN."""
    parts = query.rstrip('.').split('.')
    if len(parts) >= 2:
        return '.'.join(parts[-2:])
    return query


def is_base64_like(s: str) -> bool:
    """Check if string resembles base64 encoding."""
    b64_chars = set('ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=')
    if len(s) < 10:
        return False
    char_ratio = sum(1 for c in s if c in b64_chars) / len(s)
    return char_ratio > 0.9 and calculate_entropy(s) > 4.0


def is_hex_encoded(s: str) -> bool:
    """Check if string appears hex-encoded."""
    hex_chars = set('0123456789abcdefABCDEF')
    if len(s) < 16:
        return False
    clean = s.replace('.', '').replace('-', '')
    return all(c in hex_chars for c in clean) and len(clean) % 2 == 0


class DNSExfiltrationDetector:
    def __init__(self):
        self.domain_stats = defaultdict(lambda: {
            'query_count': 0,
            'unique_subdomains': set(),
            'total_subdomain_length': 0,
            'entropy_sum': 0.0,
            'query_types': defaultdict(int),
            'source_ips': set(),
            'first_seen': None,
            'last_seen': None,
            'txt_response_sizes': [],
        })

        # Detection thresholds
        self.thresholds = {
            'min_query_count': 50,
            'min_unique_subdomains': 30,
            'avg_subdomain_length': 30,
            'avg_entropy': 3.8,
            'unique_ratio': 0.7,
            'txt_query_ratio': 0.3,
            'max_label_length': 63,
            'max_subdomain_labels': 5,
        }

    def process_query(self, timestamp, src_ip, query, qtype, response_size=0):
        """Process a single DNS query and update statistics."""
        base_domain = get_base_domain(query)
        subdomain = extract_subdomain(query)

        stats = self.domain_stats[base_domain]
        stats['query_count'] += 1
        stats['unique_subdomains'].add(subdomain)
        stats['total_subdomain_length'] += len(subdomain)
        stats['entropy_sum'] += calculate_entropy(subdomain)
        stats['query_types'][qtype] += 1
        stats['source_ips'].add(src_ip)

        if stats['first_seen'] is None:
            stats['first_seen'] = timestamp
        stats['last_seen'] = timestamp

        if qtype in ('TXT', 'NULL') and response_size > 0:
            stats['txt_response_sizes'].append(response_size)

    def analyze(self):
        """Analyze accumulated statistics and return suspicious domains."""
        alerts = []

        for domain, stats in self.domain_stats.items():
            if stats['query_count'] < self.thresholds['min_query_count']:
                continue

            unique_count = len(stats['unique_subdomains'])
            avg_length = stats['total_subdomain_length'] / stats['query_count']
            avg_entropy = stats['entropy_sum'] / stats['query_count']
            unique_ratio = unique_count / stats['query_count']

            txt_queries = stats['query_types'].get('TXT', 0) + stats['query_types'].get('NULL', 0)
            txt_ratio = txt_queries / stats['query_count']

            score = 0
            indicators = []

            if avg_length > self.thresholds['avg_subdomain_length']:
                score += 25
                indicators.append(f"high_avg_subdomain_length={avg_length:.1f}")

            if avg_entropy > self.thresholds['avg_entropy']:
                score += 25
                indicators.append(f"high_entropy={avg_entropy:.2f}")

            if unique_ratio > self.thresholds['unique_ratio']:
                score += 20
                indicators.append(f"high_unique_ratio={unique_ratio:.2f}")

            if txt_ratio > self.thresholds['txt_query_ratio']:
                score += 15
                indicators.append(f"high_txt_ratio={txt_ratio:.2f}")

            if unique_count > self.thresholds['min_unique_subdomains']:
                score += 15
                indicators.append(f"unique_subdomains={unique_count}")

            # Check for encoding patterns
            encoded_count = sum(
                1 for sd in list(stats['unique_subdomains'])[:100]
                if is_base64_like(sd) or is_hex_encoded(sd)
            )
            if encoded_count > 20:
                score += 20
                indicators.append(f"encoded_subdomains={encoded_count}")

            if score >= 50:
                duration = (stats['last_seen'] - stats['first_seen']).total_seconds() if stats['first_seen'] and stats['last_seen'] else 0
                alerts.append({
                    'domain': domain,
                    'score': min(score, 100),
                    'query_count': stats['query_count'],
                    'unique_subdomains': unique_count,
                    'avg_subdomain_length': round(avg_length, 1),
                    'avg_entropy': round(avg_entropy, 2),
                    'unique_ratio': round(unique_ratio, 2),
                    'txt_ratio': round(txt_ratio, 2),
                    'source_ips': list(stats['source_ips']),
                    'duration_seconds': duration,
                    'indicators': indicators,
                })

        return sorted(alerts, key=lambda x: x['score'], reverse=True)

    def process_zeek_dns_log(self, log_path):
        """Process Zeek dns.log file."""
        with open(log_path, 'r') as f:
            for line in f:
                if line.startswith('#'):
                    continue
                fields = line.strip().split('\t')
                if len(fields) < 22:
                    continue
                try:
                    ts = datetime.fromtimestamp(float(fields[0]))
                    src_ip = fields[2]
                    query = fields[9]
                    qtype = fields[11]
                    self.process_query(ts, src_ip, query, qtype)
                except (ValueError, IndexError):
                    continue

    def process_eve_json(self, log_path):
        """Process Suricata EVE JSON DNS log."""
        with open(log_path, 'r') as f:
            for line in f:
                try:
                    event = json.loads(line)
                    if event.get('event_type') != 'dns':
                        continue
                    dns = event.get('dns', {})
                    ts = datetime.fromisoformat(event['timestamp'].replace('Z', '+00:00'))
                    src_ip = event.get('src_ip', '')
                    query = dns.get('rrname', '')
                    qtype = dns.get('rrtype', '')
                    self.process_query(ts, src_ip, query, qtype)
                except (json.JSONDecodeError, KeyError, ValueError):
                    continue


def main():
    detector = DNSExfiltrationDetector()

    log_file = sys.argv[1] if len(sys.argv) > 1 else '/opt/zeek/logs/current/dns.log'

    if log_file.endswith('.json'):
        detector.process_eve_json(log_file)
    else:
        detector.process_zeek_dns_log(log_file)

    alerts = detector.analyze()

    if alerts:
        print(f"\n{'='*80}")
        print(f"DNS EXFILTRATION DETECTION RESULTS - {len(alerts)} suspicious domains found")
        print(f"{'='*80}\n")

        for alert in alerts:
            severity = "CRITICAL" if alert['score'] >= 80 else "HIGH" if alert['score'] >= 60 else "MEDIUM"
            print(f"[{severity}] Domain: {alert['domain']}")
            print(f"  Score: {alert['score']}/100")
            print(f"  Queries: {alert['query_count']}, Unique Subdomains: {alert['unique_subdomains']}")
            print(f"  Avg Subdomain Length: {alert['avg_subdomain_length']}, Avg Entropy: {alert['avg_entropy']}")
            print(f"  Source IPs: {', '.join(alert['source_ips'][:5])}")
            print(f"  Indicators: {', '.join(alert['indicators'])}")
            print()
    else:
        print("No DNS exfiltration indicators detected.")


if __name__ == '__main__':
    main()

Step 3: Deploy Suricata Rules for DNS Exfiltration

# Detect long DNS queries (potential tunneling)
alert dns $HOME_NET any -> any 53 (msg:"DNS Exfiltration - Excessive query length"; dns.query; content:"."; pcre:"/^.{60,}/"; threshold:type both,track by_src,count 20,seconds 60; classtype:bad-unknown; sid:3000001; rev:1;)

# Detect high-entropy DNS subdomain
alert dns $HOME_NET any -> any 53 (msg:"DNS Exfiltration - High entropy subdomain"; dns.query; pcre:"/^[a-zA-Z0-9+\/=]{30,}\./"; threshold:type both,track by_src,count 10,seconds 60; classtype:bad-unknown; sid:3000002; rev:1;)

# Detect large TXT record responses
alert dns any 53 -> $HOME_NET any (msg:"DNS Exfiltration - Large TXT response"; content:"|00 10|"; byte_test:2,>,400,0,relative; classtype:bad-unknown; sid:3000003; rev:1;)

# Detect NULL record queries (used by iodine)
alert dns $HOME_NET any -> any 53 (msg:"DNS Exfiltration - NULL record query (iodine indicator)"; content:"|00 0a|"; classtype:bad-unknown; sid:3000004; rev:1;)

# Detect dnscat2 traffic pattern
alert dns $HOME_NET any -> any 53 (msg:"DNS Exfiltration - dnscat2 indicator"; dns.query; content:"dnscat"; nocase; classtype:trojan-activity; sid:3000005; rev:1;)

Step 4: SIEM Detection Rules

Splunk SPL query for DNS exfiltration:

index=dns sourcetype=zeek:dns
| eval subdomain=mvindex(split(query,"."),0)
| eval subdomain_len=len(subdomain)
| eval label_count=mvcount(split(query,"."))
| stats count as query_count,
        dc(subdomain) as unique_subdomains,
        avg(subdomain_len) as avg_sub_len,
        values(src_ip) as source_ips
        by query_domain
| where query_count > 100 AND avg_sub_len > 30 AND unique_subdomains > 50
| eval risk_score = case(
    avg_sub_len > 50 AND unique_subdomains > 200, "Critical",
    avg_sub_len > 40 AND unique_subdomains > 100, "High",
    avg_sub_len > 30 AND unique_subdomains > 50, "Medium",
    true(), "Low")
| sort -query_count
| table query_domain risk_score query_count unique_subdomains avg_sub_len source_ips

Response Actions

  1. Block the tunnel domain at DNS resolver and firewall level
  2. Isolate the source host from the network for forensic investigation
  3. Capture full PCAP of the DNS traffic for evidence preservation
  4. Identify exfiltrated data by decoding captured DNS queries
  5. Check for persistence mechanisms on the compromised host
  6. Update blocklists with identified C2 domains and infrastructure

Best Practices

  • DNS Logging - Enable full DNS query and response logging at resolvers and network level
  • Internal DNS Only - Force all DNS through internal resolvers; block direct external DNS (port 53)
  • Response Policy Zones - Deploy RPZ feeds to block known tunneling domains
  • Baseline First - Establish normal DNS query patterns before setting detection thresholds
  • TXT Record Monitoring - Pay special attention to TXT and NULL record queries
  • Encrypted DNS Awareness - Monitor for DoH/DoT usage that may bypass DNS inspection

Verification Criteria

Confirm successful execution by validating:

  • [ ] All prerequisite tools and access requirements are satisfied
  • [ ] Each workflow step completed without errors
  • [ ] Output matches expected format and contains expected data
  • [ ] No security warnings or misconfigurations detected
  • [ ] Results are documented and evidence is preserved for audit

Compliance Framework Mapping

This skill supports compliance evidence collection across multiple frameworks:

  • SOC 2: CC6.6 (System Boundaries), CC6.7 (Restriction on Transmission)
  • ISO 27001: A.13.1 (Network Security), A.13.2 (Information Transfer)
  • NIST 800-53: SC-7 (Boundary Protection), AC-17 (Remote Access), SI-4 (System Monitoring)
  • NIST CSF: PR.AC (Access Control), PR.PT (Protective Technology)

Claw GRC Tip: When this skill is executed by a registered agent, compliance evidence is automatically captured and mapped to the relevant controls in your active frameworks.

Deploying This Skill with Claw GRC

Agent Execution

Register this skill with your Claw GRC agent for automated execution:

# Install via CLI
npx claw-grc skills add detecting-dns-exfiltration-with-dns-query-analysis

# Or load dynamically via MCP
grc.load_skill("detecting-dns-exfiltration-with-dns-query-analysis")

Audit Trail Integration

When executed through Claw GRC, every step of this skill generates tamper-evident audit records:

  • SHA-256 chain hashing ensures no step can be modified after execution
  • Evidence artifacts (configs, scan results, logs) are automatically attached to relevant controls
  • Trust score impact — successful execution increases your agent's trust score

Continuous Compliance

Schedule this skill for recurring execution to maintain continuous compliance posture. Claw GRC monitors for drift and alerts when re-execution is needed.

References

  • Splunk DNS Exfiltration Detection with Deep Learning
  • Akamai DNS Data Exfiltration
  • SANS Detecting DNS Tunneling
  • Fidelis DNS Tunneling Detection

Use with Claw GRC Agents

This skill is fully compatible with Claw GRC's autonomous agent system. Deploy it to any registered agent via MCP, and every execution will be logged in the tamper-evident audit trail.

// Load this skill in your agent
npx claw-grc skills add detecting-dns-exfiltration-with-dns-query-analysis
// Or via MCP
grc.load_skill("detecting-dns-exfiltration-with-dns-query-analysis")

Tags

dns-exfiltrationdns-tunnelingdata-exfiltrationthreat-detectionentropy-analysispassive-dnsnetwork-monitoringiodine

Related Skills

Network Security

Detecting Exfiltration Over DNS with Zeek

3m·intermediate
Network Security

Performing Network Traffic Analysis with Zeek

6m·intermediate
Threat Hunting

Hunting for DNS Tunneling with Zeek

3m·intermediate
Malware Analysis

Analyzing Network Covert Channels in Malware

3m·advanced
Network Security

Configuring Suricata for Network Monitoring

8m·intermediate
Network Security

Detecting Lateral Movement in Network

10m·intermediate

Skill Details

Domain
Network Security
Difficulty
advanced
Read Time
7 min
Code Examples
8

On This Page

OverviewPrerequisitesCore ConceptsImplementation StepsResponse ActionsBest PracticesReferencesVerification CriteriaCompliance Framework MappingDeploying This Skill with Claw GRC

Deploy This Skill

Add this skill to your Claw GRC agent and start automating.

Get Started Free →