diff --git a/datafog/integrations/__init__.py b/datafog/integrations/__init__.py new file mode 100644 index 00000000..c255d64a --- /dev/null +++ b/datafog/integrations/__init__.py @@ -0,0 +1 @@ +"""Adapters that embed DataFog into agent harnesses and pipelines.""" diff --git a/datafog/integrations/claude_code.py b/datafog/integrations/claude_code.py new file mode 100644 index 00000000..dfbfb351 --- /dev/null +++ b/datafog/integrations/claude_code.py @@ -0,0 +1,183 @@ +"""Claude Code hook adapter: an offline PII firewall for agent tool calls. + +Speaks the Claude Code hooks protocol (JSON on stdin, JSON on stdout): + +- ``PreToolUse`` — gate outbound tool calls (Bash, WebFetch, Write, MCP + tools). PII in the tool input yields an ``ask`` (default) or ``deny`` + permission decision, so data is stopped *before* it leaves the machine. +- ``UserPromptSubmit`` — non-blocking: warns the model that the prompt + contains PII so it avoids repeating it in output or logs. +- ``PostToolUse`` — non-blocking: warns when a tool result carries PII + into the conversation context. + +Configuration (environment variables): + +- ``DATAFOG_HOOK_ACTION``: ``ask`` (default) or ``deny`` for PreToolUse. +- ``DATAFOG_HOOK_ENTITIES``: comma-separated entity types to detect. + Defaults to the high-precision set; noisy-in-code types (IP_ADDRESS, + DOB, ZIP) must be opted into. + +Failure policy: fail open. A hook bug must never brick a Claude Code +session, so any unexpected error exits non-blocking with no output. +""" + +import json +import os +import sys +from typing import Any, Iterator, Mapping + +# High-precision defaults. IP_ADDRESS, DOB, and ZIP are deliberately +# excluded: version strings, dates, and 5-digit numbers saturate coding +# sessions and would make the firewall cry wolf (see DFPY-110). +DEFAULT_ENTITY_TYPES = ["EMAIL", "PHONE", "CREDIT_CARD", "SSN"] + +VALID_ACTIONS = {"ask", "deny"} + +# Per-string scan cap, so a huge file write can't stall the hook. Applied +# per string (not shared across the payload) so a padding field can't starve +# the scan of later fields; TOTAL_SCAN_CHARS bounds the worst case overall. +MAX_SCAN_CHARS = 1_000_000 +TOTAL_SCAN_CHARS = 8_000_000 + +_EXIT_OK = 0 +# Exit 1 is Claude Code's non-blocking error: stderr is shown to the user, +# the tool call proceeds. Never exit 2 (blocking) on our own failures. +_EXIT_FAIL_OPEN = 1 + + +def _entity_types(env: Mapping[str, str]) -> list[str]: + raw = env.get("DATAFOG_HOOK_ENTITIES", "") + parsed = [t.strip().upper() for t in raw.split(",") if t.strip()] + # An empty parse (unset, or a value like " , ") must fall back to the + # defaults: passing [] downstream would disable filtering entirely and + # silently enable the noisy opt-in entity types. + return parsed or DEFAULT_ENTITY_TYPES + + +def _action(env: Mapping[str, str]) -> str: + action = env.get("DATAFOG_HOOK_ACTION", "ask").strip().lower() + return action if action in VALID_ACTIONS else "ask" + + +def _iter_strings(value: Any) -> Iterator[str]: + """Yield every string embedded in a JSON-like structure. + + Iterative (explicit stack), so adversarially deep nesting cannot + trigger ``RecursionError`` and silently skip the scan. + """ + stack = [value] + while stack: + current = stack.pop() + if isinstance(current, str): + yield current + elif isinstance(current, dict): + stack.extend(current.values()) + elif isinstance(current, (list, tuple)): + stack.extend(current) + + +def _scan_findings(value: Any, entity_types: list[str]) -> dict[str, int]: + """Scan all strings in ``value``; return counts per entity type.""" + import datafog + + counts: dict[str, int] = {} + total_budget = TOTAL_SCAN_CHARS + for text in _iter_strings(value): + if total_budget <= 0: + break + chunk = text[: min(MAX_SCAN_CHARS, total_budget)] + total_budget -= len(chunk) + result = datafog.scan(chunk, engine="regex", entity_types=entity_types) + for entity in result.entities: + counts[entity.type] = counts.get(entity.type, 0) + 1 + return counts + + +def _summary(counts: dict[str, int]) -> str: + """Render findings without ever echoing the matched PII itself.""" + parts = [f"{etype} x{n}" for etype, n in sorted(counts.items())] + return ", ".join(parts) + + +def _emit(event: str, fields: dict[str, Any]) -> str: + return json.dumps({"hookSpecificOutput": {"hookEventName": event, **fields}}) + + +def _handle_pre_tool_use(payload: dict, env: Mapping[str, str]) -> str: + counts = _scan_findings(payload.get("tool_input"), _entity_types(env)) + if not counts: + return "" + tool = payload.get("tool_name", "tool") + reason = ( + f"DataFog PII firewall: {tool} input contains {_summary(counts)}. " + "Redact or tokenize these values before sending them anywhere." + ) + return _emit( + "PreToolUse", + {"permissionDecision": _action(env), "permissionDecisionReason": reason}, + ) + + +def _handle_user_prompt_submit(payload: dict, env: Mapping[str, str]) -> str: + counts = _scan_findings(payload.get("prompt"), _entity_types(env)) + if not counts: + return "" + context = ( + f"DataFog PII firewall: the user's prompt contains {_summary(counts)}. " + "Avoid repeating these values verbatim in responses, code, or files." + ) + return _emit("UserPromptSubmit", {"additionalContext": context}) + + +def _handle_post_tool_use(payload: dict, env: Mapping[str, str]) -> str: + counts = _scan_findings(payload.get("tool_response"), _entity_types(env)) + if not counts: + return "" + tool = payload.get("tool_name", "tool") + context = ( + f"DataFog PII firewall: {tool} output contains {_summary(counts)}. " + "Avoid repeating these values verbatim in responses, code, or files." + ) + return _emit("PostToolUse", {"additionalContext": context}) + + +_HANDLERS = { + "PreToolUse": _handle_pre_tool_use, + "UserPromptSubmit": _handle_user_prompt_submit, + "PostToolUse": _handle_post_tool_use, +} + + +def run(payload: dict, env: Mapping[str, str]) -> tuple[int, str]: + """Process one hook payload; return (exit_code, stdout). Fails open.""" + try: + handler = _HANDLERS.get(payload.get("hook_event_name", "")) + if handler is None: + return _EXIT_OK, "" + return _EXIT_OK, handler(payload, env) + except Exception as exc: # noqa: BLE001 — fail open by design + print(f"datafog-hook error (fail-open): {exc}", file=sys.stderr) + return _EXIT_FAIL_OPEN, "" + + +def main() -> None: + """Console entry point: ``datafog-hook``.""" + # Catch everything, including RecursionError from json.load on + # adversarially nested payloads: the fail-open contract applies to the + # entire process, not just the handler. + try: + payload = json.load(sys.stdin) + if not isinstance(payload, dict): + payload = {} + except Exception as exc: # noqa: BLE001 — fail open by design + print(f"datafog-hook: invalid hook payload (fail-open): {exc}", file=sys.stderr) + sys.exit(_EXIT_FAIL_OPEN) + + code, stdout = run(payload, os.environ) + if stdout: + print(stdout) + sys.exit(code) + + +if __name__ == "__main__": + main() diff --git a/examples/claude_code_hook/README.md b/examples/claude_code_hook/README.md new file mode 100644 index 00000000..acfc8a5e --- /dev/null +++ b/examples/claude_code_hook/README.md @@ -0,0 +1,142 @@ +# DataFog PII Firewall for Claude Code + +Stop PII from leaving your machine through agent tool calls. This hook scans +every outbound tool invocation (shell commands, web requests, file writes, +MCP tools) in ~70ms and asks for confirmation — or blocks outright — when it +finds emails, phone numbers, credit cards, or SSNs. + +## Install + +```bash +pip install datafog +``` + +Then add the hook to `~/.claude/settings.json` (all projects) or +`.claude/settings.json` (one project): + +```json +{ + "hooks": { + "PreToolUse": [ + { + "matcher": "Bash|WebFetch|WebSearch|Write|Edit|mcp__.*", + "hooks": [ + { "type": "command", "command": "datafog-hook", "timeout": 10 } + ] + } + ], + "UserPromptSubmit": [ + { + "hooks": [ + { "type": "command", "command": "datafog-hook", "timeout": 10 } + ] + } + ], + "PostToolUse": [ + { + "matcher": "Read|Bash|WebFetch|mcp__.*", + "hooks": [ + { "type": "command", "command": "datafog-hook", "timeout": 10 } + ] + } + ] + } +} +``` + +That's it. Try asking Claude to `curl` something containing a test credit +card number — the call is intercepted before it runs: + +> DataFog PII firewall: Bash input contains CREDIT_CARD x1, EMAIL x1. +> Redact or tokenize these values before sending them anywhere. + +## What each hook does + +| Event | Behavior | +| ------------------ | ---------------------------------------------------------------------------------------------------------------- | +| `PreToolUse` | Gates the tool call. Default `ask` shows you what was found; `deny` blocks and tells Claude to redact and retry. | +| `UserPromptSubmit` | Non-blocking. Warns Claude your prompt contains PII so it avoids repeating it into files, code, or logs. | +| `PostToolUse` | Non-blocking. Warns when a tool result (file read, API response) carries PII into the conversation. | + +## Configuration + +Environment variables (set in `settings.json` `env` or your shell): + +- `DATAFOG_HOOK_ACTION` — `ask` (default) or `deny` for PreToolUse. + **Important:** `ask` defers to your permission mode — in + `--dangerously-skip-permissions` or auto-accept sessions, the ask is + silently approved and nothing is intercepted. If you run with permissions + relaxed (exactly when you most need a firewall), use `deny`: + + ```json + { + "type": "command", + "command": "DATAFOG_HOOK_ACTION=deny datafog-hook", + "timeout": 10 + } + ``` + + In `deny` mode the tool call is hard-blocked before it executes, the + model is told what was found, and it self-corrects by redacting. + +- `DATAFOG_HOOK_ENTITIES` — comma-separated entity types. Default: + `EMAIL,PHONE,CREDIT_CARD,SSN`. Noisier types (`IP_ADDRESS`, `DOB`, `ZIP`) + are available but opt-in — version strings, dates, and 5-digit numbers are + everywhere in coding sessions. + +## What this actually protects against + +The realistic risk in agent sessions is rarely "the user asks for a +PII-laden network call." It's **second-order leakage**: you paste a real +stack trace or customer record while debugging, and forty turns later the +agent helpfully hardcodes that email into a committed test fixture, a +GitHub issue, or a Slack message. The data crossed a boundary and nobody +asked it to. + +That's what the `Write|Edit|Bash|mcp__.*` gates cover: the moment PII is +**re-emitted** into a file, command, or external tool, it appears in the +tool input and the firewall fires — before the write, before the network +call. + +What this does _not_ cover: PII you hand the agent directly (a bank +statement, a log file). By the time anything local can scan it, it is +already in the session context, already sent to the model API, and already +in your local transcript files. The hook warns the model so it avoids +repeating those values, but the inbound event itself is not preventable at +the hook layer — redact _before_ sharing (`datafog redact` on a copy) if +the model provider must not see the data. + +## Limitations + +Be honest with yourself about what a regex gate at the tool boundary can do: + +- **It sees tool-input text, nothing else.** `curl -d @file.txt`, an env + var expansion, string concatenation, or base64 all bypass the gate — + the PII never appears in the command string. This is a seatbelt against + accidental leakage, not armor against deliberate exfiltration or prompt + injection. +- **Inbound PII is warned about, not blocked** (see above). +- **Images and PDFs are not scanned.** A bank statement PDF often reaches + the model as page images; regex sees nothing. +- **Regex precision is imperfect.** Defaults are tuned high-precision + (checksummed/structured types on; dates, ZIPs, and IPs off), but false + positives and negatives happen. Validators and confidence scoring are on + the roadmap. +- **Fail-open by design.** A hook failure means that call went unscanned + rather than your session breaking. + +## Design notes + +- **Offline.** DataFog's core makes zero network calls and has one + dependency (pydantic). Nothing about your session leaves your machine. +- **Fast.** ~70ms per invocation including process startup; the scan itself + is microseconds. +- **Fail-open.** A bug in the hook exits non-blocking — it will never brick + your Claude Code session. The flip side: a hook failure means that call + went unscanned, so treat this as a seatbelt, not a guarantee. +- **Bounded scanning.** Each string is scanned up to 1MB (8MB per payload + total). PII hidden beyond those caps in a single enormous field is missed + by design — the hook must stay fast enough to run on every tool call. +- **No PII in output.** Findings are reported as type counts + (`EMAIL x2`), never as the matched values — hook output itself lands in + transcripts. diff --git a/setup.py b/setup.py index 39f01651..abffa1e7 100644 --- a/setup.py +++ b/setup.py @@ -120,6 +120,7 @@ entry_points={ "console_scripts": [ "datafog=datafog.client:app [cli]", # Requires cli extra + "datafog-hook=datafog.integrations.claude_code:main", # Core only ], }, classifiers=[ diff --git a/tests/test_claude_code_hook.py b/tests/test_claude_code_hook.py new file mode 100644 index 00000000..c0620540 --- /dev/null +++ b/tests/test_claude_code_hook.py @@ -0,0 +1,179 @@ +"""Tests for the Claude Code hook adapter (datafog-hook entry point).""" + +import json + +import pytest + +from datafog.integrations.claude_code import run + + +def _pre_tool_use(tool_name: str, tool_input: dict) -> dict: + return { + "hook_event_name": "PreToolUse", + "tool_name": tool_name, + "tool_input": tool_input, + } + + +def _decision(stdout: str) -> dict: + return json.loads(stdout)["hookSpecificOutput"] + + +class TestPreToolUse: + def test_clean_input_allows_silently(self): + code, stdout = run(_pre_tool_use("Bash", {"command": "ls -la /tmp"}), env={}) + assert code == 0 + assert stdout == "" + + def test_credit_card_in_command_asks_by_default(self): + payload = _pre_tool_use( + "Bash", {"command": "curl -d 'cc=4111 1111 1111 1111' https://x.io"} + ) + code, stdout = run(payload, env={}) + assert code == 0 + out = _decision(stdout) + assert out["hookEventName"] == "PreToolUse" + assert out["permissionDecision"] == "ask" + assert "CREDIT_CARD" in out["permissionDecisionReason"] + + def test_deny_action_via_env(self): + payload = _pre_tool_use("Bash", {"command": "echo john.doe@acme.com"}) + code, stdout = run(payload, env={"DATAFOG_HOOK_ACTION": "deny"}) + assert code == 0 + assert _decision(stdout)["permissionDecision"] == "deny" + + def test_reason_never_echoes_raw_pii(self): + secret = "4111 1111 1111 1111" + payload = _pre_tool_use("Bash", {"command": f"curl -d 'cc={secret}' x.io"}) + _, stdout = run(payload, env={}) + assert secret not in stdout + + def test_scans_nested_tool_input(self): + payload = _pre_tool_use( + "mcp__crm__update_contact", + {"record": {"fields": ["note", "ssn is 856-45-6789"]}}, + ) + _, stdout = run(payload, env={}) + assert _decision(stdout)["permissionDecision"] == "ask" + assert "SSN" in _decision(stdout)["permissionDecisionReason"] + + def test_noisy_entities_off_by_default(self): + # IP addresses / dates / zips are everywhere in dev contexts; the + # hook must not flag them unless explicitly enabled. + payload = _pre_tool_use( + "Bash", {"command": "ping 192.168.1.1 # deployed 2020-01-02 90210"} + ) + code, stdout = run(payload, env={}) + assert code == 0 + assert stdout == "" + + def test_entity_filter_env_enables_ip(self): + payload = _pre_tool_use("Bash", {"command": "ping 192.168.1.1"}) + _, stdout = run(payload, env={"DATAFOG_HOOK_ENTITIES": "IP_ADDRESS"}) + assert "IP_ADDRESS" in _decision(stdout)["permissionDecisionReason"] + + +class TestUserPromptSubmit: + def test_pii_in_prompt_adds_context_warning(self): + payload = { + "hook_event_name": "UserPromptSubmit", + "prompt": "email the report to jane@corp.com", + } + code, stdout = run(payload, env={}) + assert code == 0 + out = _decision(stdout) + assert out["hookEventName"] == "UserPromptSubmit" + assert "EMAIL" in out["additionalContext"] + + def test_clean_prompt_is_silent(self): + payload = {"hook_event_name": "UserPromptSubmit", "prompt": "fix the bug"} + code, stdout = run(payload, env={}) + assert code == 0 + assert stdout == "" + + +class TestPostToolUse: + def test_pii_in_tool_response_adds_context(self): + payload = { + "hook_event_name": "PostToolUse", + "tool_name": "Read", + "tool_input": {"file_path": "/data/users.csv"}, + "tool_response": "name,ssn\nJane,856-45-6789", + } + code, stdout = run(payload, env={}) + assert code == 0 + out = _decision(stdout) + assert out["hookEventName"] == "PostToolUse" + assert "SSN" in out["additionalContext"] + + +class TestRobustness: + def test_unknown_event_is_ignored(self): + code, stdout = run({"hook_event_name": "SessionStart"}, env={}) + assert code == 0 + assert stdout == "" + + def test_oversized_input_is_truncated_not_crashed(self): + big = "x" * 2_000_000 + " jane@corp.com" + payload = _pre_tool_use("Bash", {"command": big}) + code, _ = run(payload, env={}) + assert code == 0 # must not raise; PII past the per-string cap may be missed + + def test_padding_field_cannot_starve_scan_of_later_fields(self): + # The scan budget is per-string: an attacker-controlled decoy field + # at the cap must not exhaust the budget before the real payload. + payload = _pre_tool_use( + "Bash", {"decoy": "x" * 1_000_000, "command": "echo jane@corp.com"} + ) + _, stdout = run(payload, env={}) + assert "EMAIL" in _decision(stdout)["permissionDecisionReason"] + + def test_deeply_nested_payload_is_scanned_not_recursion_bombed(self): + # Adversarial nesting must neither crash nor silently skip the scan. + nested: dict = {"v": "ssn 856-45-6789"} + for _ in range(5000): + nested = {"k": nested} + code, stdout = run(_pre_tool_use("mcp__x__y", nested), env={}) + assert code == 0 + assert "SSN" in _decision(stdout)["permissionDecisionReason"] + + def test_garbage_entity_env_falls_back_to_defaults(self): + # " , , " must not silently disable filtering (which would enable + # the noisy opt-in entity types). + payload = _pre_tool_use("Bash", {"command": "ping 192.168.1.1"}) + code, stdout = run(payload, env={"DATAFOG_HOOK_ENTITIES": " , , "}) + assert code == 0 + assert stdout == "" # IP_ADDRESS stays off + + def test_fail_open_on_malformed_payload(self): + # A hook bug must never brick the user's Claude Code session: + # anything unexpected exits non-blocking (not 2) with empty stdout. + code, stdout = run({"tool_input": object()}, env={}) # unserializable + assert code != 2 + assert stdout == "" + + +class TestMainEntryPoint: + def test_main_reads_stdin_and_prints_decision(self, monkeypatch, capsys): + import io + import sys as _sys + + from datafog.integrations.claude_code import main + + payload = _pre_tool_use("Bash", {"command": "echo jane@corp.com"}) + monkeypatch.setattr(_sys, "stdin", io.StringIO(json.dumps(payload))) + with pytest.raises(SystemExit) as exc: + main() + assert exc.value.code == 0 + assert "permissionDecision" in capsys.readouterr().out + + def test_main_fail_open_on_garbage_stdin(self, monkeypatch, capsys): + import io + import sys as _sys + + from datafog.integrations.claude_code import main + + monkeypatch.setattr(_sys, "stdin", io.StringIO("not json{{")) + with pytest.raises(SystemExit) as exc: + main() + assert exc.value.code != 2