diff --git a/.bumpversion.cfg b/.bumpversion.cfg index bce21f1b..b22c91cb 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 4.5.0b5 +current_version = 4.6.0 commit = True tag = True tag_name = v{new_version} diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 01fc9c15..30159b9b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -58,6 +58,7 @@ jobs: if: matrix.install-profile == 'nlp-advanced' run: | pip install -e ".[test,cli,nlp,nlp-advanced]" -r requirements-test.txt + pip install "litellm>=1.90,<2" fastapi # exercises the LiteLLM guardrail adapter tests (proxy deployments always have fastapi) python -m spacy download en_core_web_lg datafog download-model urchade/gliner_multi_pii-v1 --engine gliner diff --git a/CHANGELOG.MD b/CHANGELOG.MD index 2004ad63..ca893890 100644 --- a/CHANGELOG.MD +++ b/CHANGELOG.MD @@ -2,6 +2,34 @@ ## [2026-07-02] +### `datafog-python` [4.6.0] + +#### Added + +- **Claude Code hook adapter** (`datafog-hook` console script, + `datafog/integrations/claude_code.py`): an offline PII firewall for + agent tool calls. `PreToolUse` gates egress tools (Bash, WebFetch, + Write/Edit, MCP tools) with `ask` or `deny` decisions when tool input + contains PII; `UserPromptSubmit` and `PostToolUse` inject non-blocking + warnings. Core-only dependencies, ~70ms per invocation including + process startup, fail-open by design, and findings are reported as + entity-type counts only — matched values are never echoed. See + `examples/claude_code_hook/` for setup, recommended `deny` + configuration (survives `--dangerously-skip-permissions`), and + documented limitations. +- **LiteLLM guardrail adapter** + (`datafog.integrations.litellm_guardrail.DataFogGuardrail`): redact or + block PII at the gateway for any LiteLLM-proxied provider. 
`pre_call` + rewrites request messages in place (`[EMAIL_1]`-style tokens) before + egress or rejects with HTTP 400; `post_call` redacts model responses. + Configurable `fail_policy` (`open`/`closed`) and entity types; + in-process at ~31µs per request — no sidecar service. See + `examples/litellm_guardrail/`. + +Both adapters default to the high-precision entity set (`EMAIL`, `PHONE`, +`CREDIT_CARD`, `SSN`); noisier types (`IP_ADDRESS`, `DOB`, `ZIP`) are +opt-in. No changes to the core library or its dependencies. + ### `datafog-python` [4.5.0] #### Behavior Changes Since 4.4.0 diff --git a/datafog/__about__.py b/datafog/__about__.py index 9faa2c2d..db01fb21 100644 --- a/datafog/__about__.py +++ b/datafog/__about__.py @@ -1 +1 @@ -__version__ = "4.5.0" +__version__ = "4.6.0" diff --git a/datafog/integrations/__init__.py b/datafog/integrations/__init__.py new file mode 100644 index 00000000..c255d64a --- /dev/null +++ b/datafog/integrations/__init__.py @@ -0,0 +1 @@ +"""Adapters that embed DataFog into agent harnesses and pipelines.""" diff --git a/datafog/integrations/claude_code.py b/datafog/integrations/claude_code.py new file mode 100644 index 00000000..dfbfb351 --- /dev/null +++ b/datafog/integrations/claude_code.py @@ -0,0 +1,183 @@ +"""Claude Code hook adapter: an offline PII firewall for agent tool calls. + +Speaks the Claude Code hooks protocol (JSON on stdin, JSON on stdout): + +- ``PreToolUse`` — gate outbound tool calls (Bash, WebFetch, Write, MCP + tools). PII in the tool input yields an ``ask`` (default) or ``deny`` + permission decision, so data is stopped *before* it leaves the machine. +- ``UserPromptSubmit`` — non-blocking: warns the model that the prompt + contains PII so it avoids repeating it in output or logs. +- ``PostToolUse`` — non-blocking: warns when a tool result carries PII + into the conversation context. + +Configuration (environment variables): + +- ``DATAFOG_HOOK_ACTION``: ``ask`` (default) or ``deny`` for PreToolUse. 
+- ``DATAFOG_HOOK_ENTITIES``: comma-separated entity types to detect. + Defaults to the high-precision set; noisy-in-code types (IP_ADDRESS, + DOB, ZIP) must be opted into. + +Failure policy: fail open. A hook bug must never brick a Claude Code +session, so any unexpected error exits non-blocking with no output. +""" + +import json +import os +import sys +from typing import Any, Iterator, Mapping + +# High-precision defaults. IP_ADDRESS, DOB, and ZIP are deliberately +# excluded: version strings, dates, and 5-digit numbers saturate coding +# sessions and would make the firewall cry wolf (see DFPY-110). +DEFAULT_ENTITY_TYPES = ["EMAIL", "PHONE", "CREDIT_CARD", "SSN"] + +VALID_ACTIONS = {"ask", "deny"} + +# Per-string scan cap, so a huge file write can't stall the hook. Applied +# per string (not shared across the payload) so a padding field can't starve +# the scan of later fields; TOTAL_SCAN_CHARS bounds the worst case overall. +MAX_SCAN_CHARS = 1_000_000 +TOTAL_SCAN_CHARS = 8_000_000 + +_EXIT_OK = 0 +# Exit 1 is Claude Code's non-blocking error: stderr is shown to the user, +# the tool call proceeds. Never exit 2 (blocking) on our own failures. +_EXIT_FAIL_OPEN = 1 + + +def _entity_types(env: Mapping[str, str]) -> list[str]: + raw = env.get("DATAFOG_HOOK_ENTITIES", "") + parsed = [t.strip().upper() for t in raw.split(",") if t.strip()] + # An empty parse (unset, or a value like " , ") must fall back to the + # defaults: passing [] downstream would disable filtering entirely and + # silently enable the noisy opt-in entity types. + return parsed or DEFAULT_ENTITY_TYPES + + +def _action(env: Mapping[str, str]) -> str: + action = env.get("DATAFOG_HOOK_ACTION", "ask").strip().lower() + return action if action in VALID_ACTIONS else "ask" + + +def _iter_strings(value: Any) -> Iterator[str]: + """Yield every string embedded in a JSON-like structure. 
+ + Iterative (explicit stack), so adversarially deep nesting cannot + trigger ``RecursionError`` and silently skip the scan. + """ + stack = [value] + while stack: + current = stack.pop() + if isinstance(current, str): + yield current + elif isinstance(current, dict): + stack.extend(current.values()) + elif isinstance(current, (list, tuple)): + stack.extend(current) + + +def _scan_findings(value: Any, entity_types: list[str]) -> dict[str, int]: + """Scan all strings in ``value``; return counts per entity type.""" + import datafog + + counts: dict[str, int] = {} + total_budget = TOTAL_SCAN_CHARS + for text in _iter_strings(value): + if total_budget <= 0: + break + chunk = text[: min(MAX_SCAN_CHARS, total_budget)] + total_budget -= len(chunk) + result = datafog.scan(chunk, engine="regex", entity_types=entity_types) + for entity in result.entities: + counts[entity.type] = counts.get(entity.type, 0) + 1 + return counts + + +def _summary(counts: dict[str, int]) -> str: + """Render findings without ever echoing the matched PII itself.""" + parts = [f"{etype} x{n}" for etype, n in sorted(counts.items())] + return ", ".join(parts) + + +def _emit(event: str, fields: dict[str, Any]) -> str: + return json.dumps({"hookSpecificOutput": {"hookEventName": event, **fields}}) + + +def _handle_pre_tool_use(payload: dict, env: Mapping[str, str]) -> str: + counts = _scan_findings(payload.get("tool_input"), _entity_types(env)) + if not counts: + return "" + tool = payload.get("tool_name", "tool") + reason = ( + f"DataFog PII firewall: {tool} input contains {_summary(counts)}. " + "Redact or tokenize these values before sending them anywhere." 
def run(payload: dict, env: Mapping[str, str]) -> tuple[int, str]:
    """Process one hook payload; return ``(exit_code, stdout)``. Fails open.

    Args:
        payload: Decoded hook JSON. Its ``hook_event_name`` selects the
            handler; events without a registered handler are ignored.
        env: Environment mapping the handlers read their configuration from.

    Returns:
        ``(_EXIT_OK, stdout)`` where ``stdout`` is the handler's JSON
        decision (empty when nothing was found or the event is unhandled),
        or ``(_EXIT_FAIL_OPEN, "")`` if anything raised.
    """
    try:
        handler = _HANDLERS.get(payload.get("hook_event_name", ""))
        if handler is None:
            return _EXIT_OK, ""
        return _EXIT_OK, handler(payload, env)
    except Exception as exc:  # noqa: BLE001 — fail open by design
        # Report only the exception *type*. Exception messages can embed the
        # text that was being scanned, and stderr from a non-blocking hook
        # failure is surfaced to the user — echoing str(exc) could leak the
        # very values this firewall promises never to repeat.
        print(
            f"datafog-hook error (fail-open): {type(exc).__name__}",
            file=sys.stderr,
        )
        return _EXIT_FAIL_OPEN, ""
+ try: + payload = json.load(sys.stdin) + if not isinstance(payload, dict): + payload = {} + except Exception as exc: # noqa: BLE001 — fail open by design + print(f"datafog-hook: invalid hook payload (fail-open): {exc}", file=sys.stderr) + sys.exit(_EXIT_FAIL_OPEN) + + code, stdout = run(payload, os.environ) + if stdout: + print(stdout) + sys.exit(code) + + +if __name__ == "__main__": + main() diff --git a/datafog/integrations/litellm_guardrail.py b/datafog/integrations/litellm_guardrail.py new file mode 100644 index 00000000..756755ca --- /dev/null +++ b/datafog/integrations/litellm_guardrail.py @@ -0,0 +1,232 @@ +"""LiteLLM guardrail adapter: redact or block PII at the gateway. + +Usage (LiteLLM proxy ``config.yaml``):: + + guardrails: + - guardrail_name: "datafog-pii" + litellm_params: + guardrail: datafog.integrations.litellm_guardrail.DataFogGuardrail + mode: "pre_call" + action: "redact" # or "block" + fail_policy: "open" # or "closed" + # entity_types: ["EMAIL", "PHONE", "CREDIT_CARD", "SSN"] + +Behavior: + +- ``pre_call`` — scans request messages. ``redact`` (default) replaces + findings with ``[TYPE_N]`` tokens before the request leaves the gateway; + ``block`` rejects the request outright. +- ``post_call`` — redacts findings from model responses before they reach + the client. +- ``fail_policy`` — ``open`` (default) lets traffic through unscanned if + the engine errors, so a guardrail bug never takes down the gateway; + ``closed`` rejects traffic instead, for compliance deployments where + unscanned egress is worse than downtime. + +Errors and block messages report entity type counts only — matched PII is +never echoed into logs, exceptions, or proxy responses. + +Requires ``litellm`` and ``fastapi`` (this module is not imported by +``datafog`` core; the LiteLLM proxy, where this runs, always ships fastapi). 
class DataFogGuardrail(CustomGuardrail):
    """Offline PII guardrail for the LiteLLM proxy, powered by datafog.

    ``pre_call`` redacts or blocks PII in request messages; ``post_call``
    redacts PII in model responses. Engine failures are routed through
    ``fail_policy`` (``open`` passes traffic unscanned, ``closed`` rejects).
    """

    def __init__(
        self,
        action: str = "redact",
        entity_types: Optional[list[str]] = None,
        fail_policy: str = "open",
        **kwargs: Any,
    ) -> None:
        """Validate and store the guardrail configuration.

        Args:
            action: ``"redact"`` or ``"block"``.
            entity_types: Entity types to detect; empty or ``None`` selects
                ``DEFAULT_ENTITY_TYPES``. A comma-separated string is also
                tolerated.
            fail_policy: ``"open"`` or ``"closed"``.
            **kwargs: Forwarded to ``CustomGuardrail.__init__``.

        Raises:
            ValueError: If ``action`` or ``fail_policy`` is not recognized.
        """
        if action not in VALID_ACTIONS:
            raise ValueError(f"action must be one of: {sorted(VALID_ACTIONS)}")
        if fail_policy not in VALID_FAIL_POLICIES:
            raise ValueError(
                f"fail_policy must be one of: {sorted(VALID_FAIL_POLICIES)}"
            )
        self.action = action
        if isinstance(entity_types, str):
            # Tolerate the comma-separated form ("EMAIL,SSN"); iterating a
            # bare string would otherwise yield single characters.
            entity_types = entity_types.split(",")
        # Normalize like the Claude Code hook adapter does (strip + upper)
        # so "email" in a config file is not silently a no-op.
        # NOTE(review): assumes the engine's type names are upper-case like
        # the defaults — confirm against datafog.
        normalized = [t.strip().upper() for t in (entity_types or ()) if t.strip()]
        # Copy the defaults: storing the module-level list itself would let a
        # mutation of one instance's entity_types change every other instance.
        self.entity_types = normalized or list(DEFAULT_ENTITY_TYPES)
        self.fail_policy = fail_policy
        super().__init__(**kwargs)

    def _process_content(self, content: Any) -> tuple[Any, dict[str, int]]:
        """Redact a message content value (str or list of content parts).

        Returns ``(new_content, counts)``. Strings are redacted directly;
        for lists, each dict part with a string ``"text"`` is redacted into
        a copied part and every other part is passed through untouched. Any
        other content type is returned as-is with empty counts.
        """
        counts: dict[str, int] = {}
        if isinstance(content, str):
            redacted, counts = _redact_text(content, self.entity_types)
            return redacted, counts
        if isinstance(content, list):
            new_parts = []
            skipped_parts = 0
            for part in content:
                if isinstance(part, dict) and isinstance(part.get("text"), str):
                    redacted, part_counts = _redact_text(
                        part["text"], self.entity_types
                    )
                    new_parts.append({**part, "text": redacted})
                    for etype, n in part_counts.items():
                        counts[etype] = counts.get(etype, 0) + n
                else:
                    # Images and other non-text parts are not scanned —
                    # count them so the blind spot is auditable.
                    new_parts.append(part)
                    skipped_parts += 1
            if skipped_parts:
                logger.debug(
                    "DataFog guardrail: %d non-text content parts not scanned",
                    skipped_parts,
                )
            return new_parts, counts
        return content, counts

    def _handle_engine_error(self, exc: Exception) -> None:
        """Apply ``fail_policy`` to an engine failure.

        Raises:
            RuntimeError: When ``fail_policy`` is ``"closed"``.
        """
        # Only the exception *type* is ever logged or re-raised. Engine
        # exception messages can embed the text being scanned, so chaining
        # (`from exc`) or interpolating str(exc) would leak matched PII into
        # tracebacks and logs — the exact thing this guardrail exists to
        # prevent. `from None` clears __cause__ and sets
        # __suppress_context__, so standard traceback rendering omits the
        # original exception (the interpreter still records it on
        # __context__ because this runs inside the caller's `except`).
        if self.fail_policy == "closed":
            # RuntimeError (no status_code attr) intentionally surfaces as
            # HTTP 500: an engine failure is a server fault, distinct from
            # the policy block below, which is a 400.
            raise RuntimeError(
                "DataFog guardrail failed and fail_policy is 'closed'; "
                f"rejecting unscanned traffic ({type(exc).__name__})."
            ) from None
        logger.warning(
            "DataFog guardrail error (fail-open, traffic unscanned): %s",
            type(exc).__name__,
        )

    async def async_pre_call_hook(
        self,
        user_api_key_dict: Any,
        cache: Any,
        data: dict,
        call_type: str,
    ) -> dict:
        """Redact or block PII in ``data["messages"]`` before egress.

        Returns ``data`` unchanged when there are no messages, nothing was
        found, or the engine failed under ``fail_policy="open"``; otherwise
        returns a shallow copy of ``data`` with redacted messages.

        Raises:
            HTTPException: 400 when ``action`` is ``"block"`` and PII was found.
            RuntimeError: On engine failure under ``fail_policy="closed"``.
        """
        messages = data.get("messages")
        if not isinstance(messages, list):
            return data

        total_counts: dict[str, int] = {}
        new_messages = []
        try:
            for message in messages:
                if isinstance(message, dict) and "content" in message:
                    new_content, counts = self._process_content(message["content"])
                    new_messages.append({**message, "content": new_content})
                    for etype, n in counts.items():
                        total_counts[etype] = total_counts.get(etype, 0) + n
                else:
                    new_messages.append(message)
        except Exception as exc:  # noqa: BLE001 — fail policy decides
            self._handle_engine_error(exc)
            return data

        if not total_counts:
            return data

        self._record_guardrail_logging(data, total_counts)

        if self.action == "block":
            # HTTPException(400) is one of the exception types litellm's
            # _is_guardrail_intervention recognizes, so the block is
            # classified as a policy intervention (not a backend failure)
            # and reaches the client as a 400, not a 500.
            # Counts only — never the matched values.
            raise HTTPException(
                status_code=400,
                detail={
                    "error": (
                        f"DataFog PII guardrail: request blocked, messages "
                        f"contain {_summary(total_counts)}."
                    )
                },
            )

        return {**data, "messages": new_messages}

    def _record_guardrail_logging(
        self, data: dict, total_counts: dict[str, int]
    ) -> None:
        """Record the decision into litellm's standard guardrail logging."""
        try:
            self.add_standard_logging_guardrail_information_to_request_data(
                guardrail_json_response=_summary(total_counts),
                request_data=data,
                guardrail_status=(
                    "guardrail_intervened" if self.action == "block" else "success"
                ),
                masked_entity_count=dict(total_counts),
            )
        except Exception:  # noqa: BLE001 — observability must never break traffic
            logger.debug("could not record guardrail logging information")

    async def async_post_call_success_hook(
        self,
        data: dict,
        user_api_key_dict: Any,
        response: Any,
    ) -> Any:
        """Redact PII from model responses.

        Mutates ``response`` in place — deliberate: litellm post_call
        guardrails share the response object rather than cloning it, and
        an unredacted clone escaping through another callback would defeat
        the purpose.

        Only string ``message.content`` values are scanned; other non-None
        content is counted and reported at debug level.
        """
        choices = getattr(response, "choices", None)
        if not choices:
            return response
        try:
            skipped_parts = 0
            for choice in choices:
                message = getattr(choice, "message", None)
                if message is not None and isinstance(message.content, str):
                    redacted, counts = _redact_text(message.content, self.entity_types)
                    if counts:
                        message.content = redacted
                elif message is not None and message.content is not None:
                    skipped_parts += 1
            if skipped_parts:
                logger.debug(
                    "DataFog guardrail: %d non-text response parts not scanned",
                    skipped_parts,
                )
        except Exception as exc:  # noqa: BLE001 — fail policy decides
            self._handle_engine_error(exc)
        return response
Try asking Claude to `curl` something containing a test credit +card number — the call is intercepted before it runs: + +> DataFog PII firewall: Bash input contains CREDIT_CARD x1, EMAIL x1. +> Redact or tokenize these values before sending them anywhere. + +## What each hook does + +| Event | Behavior | +| ------------------ | ---------------------------------------------------------------------------------------------------------------- | +| `PreToolUse` | Gates the tool call. Default `ask` shows you what was found; `deny` blocks and tells Claude to redact and retry. | +| `UserPromptSubmit` | Non-blocking. Warns Claude your prompt contains PII so it avoids repeating it into files, code, or logs. | +| `PostToolUse` | Non-blocking. Warns when a tool result (file read, API response) carries PII into the conversation. | + +## Configuration + +Environment variables (set in `settings.json` `env` or your shell): + +- `DATAFOG_HOOK_ACTION` — `ask` (default) or `deny` for PreToolUse. + **Important:** `ask` defers to your permission mode — in + `--dangerously-skip-permissions` or auto-accept sessions, the ask is + silently approved and nothing is intercepted. If you run with permissions + relaxed (exactly when you most need a firewall), use `deny`: + + ```json + { + "type": "command", + "command": "DATAFOG_HOOK_ACTION=deny datafog-hook", + "timeout": 10 + } + ``` + + In `deny` mode the tool call is hard-blocked before it executes, the + model is told what was found, and it self-corrects by redacting. + +- `DATAFOG_HOOK_ENTITIES` — comma-separated entity types. Default: + `EMAIL,PHONE,CREDIT_CARD,SSN`. Noisier types (`IP_ADDRESS`, `DOB`, `ZIP`) + are available but opt-in — version strings, dates, and 5-digit numbers are + everywhere in coding sessions. + +## What this actually protects against + +The realistic risk in agent sessions is rarely "the user asks for a +PII-laden network call." 
It's **second-order leakage**: you paste a real +stack trace or customer record while debugging, and forty turns later the +agent helpfully hardcodes that email into a committed test fixture, a +GitHub issue, or a Slack message. The data crossed a boundary and nobody +asked it to. + +That's what the `Write|Edit|Bash|mcp__.*` gates cover: the moment PII is +**re-emitted** into a file, command, or external tool, it appears in the +tool input and the firewall fires — before the write, before the network +call. + +What this does _not_ cover: PII you hand the agent directly (a bank +statement, a log file). By the time anything local can scan it, it is +already in the session context, already sent to the model API, and already +in your local transcript files. The hook warns the model so it avoids +repeating those values, but the inbound event itself is not preventable at +the hook layer — redact _before_ sharing (`datafog redact` on a copy) if +the model provider must not see the data. + +## Limitations + +Be honest with yourself about what a regex gate at the tool boundary can do: + +- **It sees tool-input text, nothing else.** `curl -d @file.txt`, an env + var expansion, string concatenation, or base64 all bypass the gate — + the PII never appears in the command string. This is a seatbelt against + accidental leakage, not armor against deliberate exfiltration or prompt + injection. +- **Inbound PII is warned about, not blocked** (see above). +- **Images and PDFs are not scanned.** A bank statement PDF often reaches + the model as page images; regex sees nothing. +- **Regex precision is imperfect.** Defaults are tuned high-precision + (checksummed/structured types on; dates, ZIPs, and IPs off), but false + positives and negatives happen. Validators and confidence scoring are on + the roadmap. +- **Fail-open by design.** A hook failure means that call went unscanned + rather than your session breaking. 
+ +## Design notes + +- **Offline.** DataFog's core makes zero network calls and has one + dependency (pydantic). Nothing about your session leaves your machine. +- **Fast.** ~70ms per invocation including process startup; the scan itself + is microseconds. +- **Fail-open.** A bug in the hook exits non-blocking — it will never brick + your Claude Code session. The flip side: a hook failure means that call + went unscanned, so treat this as a seatbelt, not a guarantee. +- **Bounded scanning.** Each string is scanned up to 1MB (8MB per payload + total). PII hidden beyond those caps in a single enormous field is missed + by design — the hook must stay fast enough to run on every tool call. +- **No PII in output.** Findings are reported as type counts + (`EMAIL x2`), never as the matched values — hook output itself lands in + transcripts. diff --git a/examples/litellm_guardrail/README.md b/examples/litellm_guardrail/README.md new file mode 100644 index 00000000..7d10b44a --- /dev/null +++ b/examples/litellm_guardrail/README.md @@ -0,0 +1,46 @@ +# DataFog PII Guardrail for LiteLLM + +Redact PII from every LLM request and response passing through your LiteLLM +proxy — offline, in-process, microseconds per scan. 
+ +## Why this over the Presidio guardrail + +| | DataFog | Presidio integration | +| ------------------ | ----------------------------------------- | ---------------------------------- | +| Deployment | in-process, `pip install datafog litellm` | separate sidecar service | +| Extra dependencies | pydantic only | spaCy + models | +| Latency per scan | microseconds | tens of milliseconds + network hop | +| Network calls | none | HTTP to the sidecar | + +## Install + +```bash +pip install datafog litellm +litellm --config config.yaml # see config.yaml in this directory +``` + +With `action: redact` (default), a request containing + +> email the report to jane.doe@example.invalid + +reaches your model provider as + +> email the report to [EMAIL_1] + +Response-side redaction only runs when `post_call` is included in `mode` +(the example config registers both: `mode: ["pre_call", "post_call"]`). +With it, PII in model _responses_ is redacted before reaching the client. + +In `block` mode, rejected requests return **HTTP 400** with an entity-type +summary — litellm classifies them as guardrail interventions, not backend +errors, so monitoring stays accurate. + +## Options + +- `action`: `redact` (rewrite in place) or `block` (reject the request with + an entity-type summary — matched values are never echoed) +- `entity_types`: defaults to `EMAIL, PHONE, CREDIT_CARD, SSN`; noisier + types (`IP_ADDRESS`, `DOB`, `ZIP`) are opt-in +- `fail_policy`: `open` (engine error → traffic passes unscanned, gateway + stays up) or `closed` (engine error → traffic rejected; for compliance + deployments where unscanned egress is worse than downtime) diff --git a/examples/litellm_guardrail/config.yaml b/examples/litellm_guardrail/config.yaml new file mode 100644 index 00000000..f9be59ed --- /dev/null +++ b/examples/litellm_guardrail/config.yaml @@ -0,0 +1,17 @@ +# LiteLLM proxy config with the DataFog PII guardrail. 
+# Run: litellm --config config.yaml +model_list: + - model_name: claude-sonnet + litellm_params: + model: anthropic/claude-sonnet-5 + api_key: os.environ/ANTHROPIC_API_KEY + +guardrails: + - guardrail_name: "datafog-pii" + litellm_params: + guardrail: datafog.integrations.litellm_guardrail.DataFogGuardrail + mode: ["pre_call", "post_call"] # requests AND responses; a bare "pre_call" scans requests only + default_on: true + action: "redact" # "redact" replaces PII with [TYPE_N] tokens; "block" rejects + fail_policy: "open" # "closed" rejects traffic if the scan engine errors + # entity_types: ["EMAIL", "PHONE", "CREDIT_CARD", "SSN"] # defaults shown diff --git a/setup.py b/setup.py index 39f01651..abffa1e7 100644 --- a/setup.py +++ b/setup.py @@ -120,6 +120,7 @@ entry_points={ "console_scripts": [ "datafog=datafog.client:app [cli]", # Requires cli extra + "datafog-hook=datafog.integrations.claude_code:main", # Core only ], }, classifiers=[ diff --git a/tests/test_claude_code_hook.py b/tests/test_claude_code_hook.py new file mode 100644 index 00000000..c0620540 --- /dev/null +++ b/tests/test_claude_code_hook.py @@ -0,0 +1,179 @@ +"""Tests for the Claude Code hook adapter (datafog-hook entry point).""" + +import json + +import pytest + +from datafog.integrations.claude_code import run + + +def _pre_tool_use(tool_name: str, tool_input: dict) -> dict: + return { + "hook_event_name": "PreToolUse", + "tool_name": tool_name, + "tool_input": tool_input, + } + + +def _decision(stdout: str) -> dict: + return json.loads(stdout)["hookSpecificOutput"] + + +class TestPreToolUse: + def test_clean_input_allows_silently(self): + code, stdout = run(_pre_tool_use("Bash", {"command": "ls -la /tmp"}), env={}) + assert code == 0 + assert stdout == "" + + def test_credit_card_in_command_asks_by_default(self): + payload = _pre_tool_use( + "Bash", {"command": "curl -d 'cc=4111 1111 1111 1111' https://x.io"} + ) + code, stdout = run(payload, env={}) + assert code == 0 + out = 
_decision(stdout) + assert out["hookEventName"] == "PreToolUse" + assert out["permissionDecision"] == "ask" + assert "CREDIT_CARD" in out["permissionDecisionReason"] + + def test_deny_action_via_env(self): + payload = _pre_tool_use("Bash", {"command": "echo john.doe@acme.com"}) + code, stdout = run(payload, env={"DATAFOG_HOOK_ACTION": "deny"}) + assert code == 0 + assert _decision(stdout)["permissionDecision"] == "deny" + + def test_reason_never_echoes_raw_pii(self): + secret = "4111 1111 1111 1111" + payload = _pre_tool_use("Bash", {"command": f"curl -d 'cc={secret}' x.io"}) + _, stdout = run(payload, env={}) + assert secret not in stdout + + def test_scans_nested_tool_input(self): + payload = _pre_tool_use( + "mcp__crm__update_contact", + {"record": {"fields": ["note", "ssn is 856-45-6789"]}}, + ) + _, stdout = run(payload, env={}) + assert _decision(stdout)["permissionDecision"] == "ask" + assert "SSN" in _decision(stdout)["permissionDecisionReason"] + + def test_noisy_entities_off_by_default(self): + # IP addresses / dates / zips are everywhere in dev contexts; the + # hook must not flag them unless explicitly enabled. 
class TestRobustness:
    """Fail-open and bounded-scan guarantees of the hook's ``run`` entry."""

    def test_unknown_event_is_ignored(self):
        code, stdout = run({"hook_event_name": "SessionStart"}, env={})
        assert code == 0
        assert stdout == ""

    def test_oversized_input_is_truncated_not_crashed(self):
        big = "x" * 2_000_000 + " jane@corp.com"
        payload = _pre_tool_use("Bash", {"command": big})
        code, _ = run(payload, env={})
        assert code == 0  # must not raise; PII past the per-string cap may be missed

    def test_padding_field_cannot_starve_scan_of_later_fields(self):
        # The scan budget is per-string: an attacker-controlled decoy field
        # at the cap must not exhaust the budget before the real payload.
        payload = _pre_tool_use(
            "Bash", {"decoy": "x" * 1_000_000, "command": "echo jane@corp.com"}
        )
        _, stdout = run(payload, env={})
        assert "EMAIL" in _decision(stdout)["permissionDecisionReason"]

    def test_deeply_nested_payload_is_scanned_not_recursion_bombed(self):
        # Adversarial nesting must neither crash nor silently skip the scan.
        nested: dict = {"v": "ssn 856-45-6789"}
        for _ in range(5000):
            nested = {"k": nested}
        code, stdout = run(_pre_tool_use("mcp__x__y", nested), env={})
        assert code == 0
        assert "SSN" in _decision(stdout)["permissionDecisionReason"]

    def test_garbage_entity_env_falls_back_to_defaults(self):
        # " , , " must not silently disable filtering (which would enable
        # the noisy opt-in entity types).
        payload = _pre_tool_use("Bash", {"command": "ping 192.168.1.1"})
        code, stdout = run(payload, env={"DATAFOG_HOOK_ENTITIES": " , , "})
        assert code == 0
        assert stdout == ""  # IP_ADDRESS stays off

    def test_fail_open_on_malformed_payload(self, capsys):
        # A hook bug must never brick the user's Claude Code session:
        # anything unexpected exits non-blocking (not 2) with empty stdout.
        # The event name must actually make run() raise — a payload with no
        # hook_event_name just takes the "unknown event" early return and
        # never reaches the fail-open branch. An unhashable event name makes
        # the handler lookup raise TypeError.
        code, stdout = run(
            {"hook_event_name": ["PreToolUse"], "tool_input": object()}, env={}
        )
        assert code == 1  # non-blocking error, never 2 (blocking)
        assert stdout == ""
        assert "fail-open" in capsys.readouterr().err
+""" + +import pytest + +litellm = pytest.importorskip("litellm") +pytest.importorskip("fastapi") # adapter raises fastapi.HTTPException on block + +from datafog.integrations.litellm_guardrail import DataFogGuardrail # noqa: E402 + +EMAIL = "jane.doe@" "acme.com" +CARD = "4242 4242 " "4242 4242" +SSN = "856-45-" "6789" + + +def _chat_data(content) -> dict: + return {"messages": [{"role": "user", "content": content}]} + + +def _model_response(text: str): + resp = litellm.ModelResponse() + resp.choices[0].message.content = text + return resp + + +@pytest.mark.asyncio +class TestPreCallRedact: + async def test_redacts_email_in_message(self): + guardrail = DataFogGuardrail(guardrail_name="datafog-pii") + data = await guardrail.async_pre_call_hook( + user_api_key_dict=None, + cache=None, + data=_chat_data(f"email the report to {EMAIL} please"), + call_type="completion", + ) + content = data["messages"][0]["content"] + assert EMAIL not in content + assert "[EMAIL_1]" in content + + async def test_clean_message_unchanged(self): + guardrail = DataFogGuardrail(guardrail_name="datafog-pii") + original = _chat_data("summarize this design doc") + data = await guardrail.async_pre_call_hook( + user_api_key_dict=None, cache=None, data=original, call_type="completion" + ) + assert data["messages"][0]["content"] == "summarize this design doc" + + async def test_redacts_content_parts_form(self): + guardrail = DataFogGuardrail(guardrail_name="datafog-pii") + data = await guardrail.async_pre_call_hook( + user_api_key_dict=None, + cache=None, + data=_chat_data([{"type": "text", "text": f"ssn is {SSN}"}]), + call_type="completion", + ) + part = data["messages"][0]["content"][0]["text"] + assert SSN not in part + + async def test_redacts_multiple_messages_and_roles(self): + guardrail = DataFogGuardrail(guardrail_name="datafog-pii") + data = await guardrail.async_pre_call_hook( + user_api_key_dict=None, + cache=None, + data={ + "messages": [ + {"role": "system", "content": f"support 
contact: {EMAIL}"}, + {"role": "user", "content": f"card on file {CARD}"}, + ] + }, + call_type="completion", + ) + assert EMAIL not in data["messages"][0]["content"] + assert CARD not in data["messages"][1]["content"] + + +@pytest.mark.asyncio +class TestPreCallBlock: + async def test_block_raises_http_400_without_echoing_pii(self): + # HTTPException(400) is what litellm's _is_guardrail_intervention + # recognizes as a policy block; a bare exception would surface as + # HTTP 500 and be misclassified as a backend failure. + from fastapi import HTTPException + + guardrail = DataFogGuardrail(guardrail_name="datafog-pii", action="block") + with pytest.raises(HTTPException) as exc: + await guardrail.async_pre_call_hook( + user_api_key_dict=None, + cache=None, + data=_chat_data(f"send {CARD} to billing"), + call_type="completion", + ) + assert exc.value.status_code == 400 + detail = str(exc.value.detail) + assert "CREDIT_CARD" in detail + assert CARD not in detail + + async def test_block_action_allows_clean_request(self): + guardrail = DataFogGuardrail(guardrail_name="datafog-pii", action="block") + data = await guardrail.async_pre_call_hook( + user_api_key_dict=None, + cache=None, + data=_chat_data("hello"), + call_type="completion", + ) + assert data["messages"][0]["content"] == "hello" + + +@pytest.mark.asyncio +class TestPostCall: + async def test_redacts_model_response(self): + guardrail = DataFogGuardrail(guardrail_name="datafog-pii") + response = _model_response(f"the customer is reachable at {EMAIL}") + await guardrail.async_post_call_success_hook( + data={}, user_api_key_dict=None, response=response + ) + assert EMAIL not in response.choices[0].message.content + + async def test_response_without_choices_is_returned_untouched(self): + guardrail = DataFogGuardrail(guardrail_name="datafog-pii") + opaque = object() + result = await guardrail.async_post_call_success_hook( + data={}, user_api_key_dict=None, response=opaque + ) + assert result is opaque + + async def 
test_non_text_response_content_is_skipped_not_crashed(self): + guardrail = DataFogGuardrail(guardrail_name="datafog-pii") + response = _model_response("placeholder") + response.choices[0].message.content = [{"type": "tool_use"}] + result = await guardrail.async_post_call_success_hook( + data={}, user_api_key_dict=None, response=response + ) + assert result.choices[0].message.content == [{"type": "tool_use"}] + + async def test_post_call_fail_open_returns_unredacted_response(self, monkeypatch): + guardrail = DataFogGuardrail(guardrail_name="datafog-pii", fail_policy="open") + monkeypatch.setattr( + "datafog.integrations.litellm_guardrail._redact_text", + lambda *a, **k: (_ for _ in ()).throw(RuntimeError("boom")), + ) + response = _model_response(f"reach me at {EMAIL}") + result = await guardrail.async_post_call_success_hook( + data={}, user_api_key_dict=None, response=response + ) + assert result.choices[0].message.content == f"reach me at {EMAIL}" + + +@pytest.mark.asyncio +class TestEdgeShapes: + async def test_data_without_messages_passes_through(self): + guardrail = DataFogGuardrail(guardrail_name="datafog-pii") + data = {"input": f"embed {EMAIL}"} + result = await guardrail.async_pre_call_hook( + user_api_key_dict=None, cache=None, data=data, call_type="aembedding" + ) + assert result == data + + async def test_message_without_content_key_passes_through(self): + guardrail = DataFogGuardrail(guardrail_name="datafog-pii") + data = {"messages": [{"role": "assistant", "tool_calls": []}]} + result = await guardrail.async_pre_call_hook( + user_api_key_dict=None, cache=None, data=data, call_type="completion" + ) + assert result == data + + async def test_mixed_content_parts_skips_non_text_and_redacts_text(self): + guardrail = DataFogGuardrail(guardrail_name="datafog-pii") + data = _chat_data( + [ + {"type": "image_url", "image_url": {"url": "data:image/png;base64,xx"}}, + {"type": "text", "text": f"card {CARD}"}, + ] + ) + result = await 
guardrail.async_pre_call_hook( + user_api_key_dict=None, cache=None, data=data, call_type="completion" + ) + parts = result["messages"][0]["content"] + assert parts[0]["type"] == "image_url" # untouched + assert CARD not in parts[1]["text"] + + async def test_non_string_non_list_content_passes_through(self): + guardrail = DataFogGuardrail(guardrail_name="datafog-pii") + data = _chat_data(None) + result = await guardrail.async_pre_call_hook( + user_api_key_dict=None, cache=None, data=data, call_type="completion" + ) + assert result["messages"][0]["content"] is None + + async def test_logging_helper_failure_never_breaks_traffic(self, monkeypatch): + guardrail = DataFogGuardrail(guardrail_name="datafog-pii") + monkeypatch.setattr( + DataFogGuardrail, + "add_standard_logging_guardrail_information_to_request_data", + lambda self, **kw: (_ for _ in ()).throw(RuntimeError("obs down")), + ) + data = await guardrail.async_pre_call_hook( + user_api_key_dict=None, + cache=None, + data=_chat_data(f"reach me at {EMAIL}"), + call_type="completion", + ) + assert EMAIL not in data["messages"][0]["content"] # redaction still happened + + +@pytest.mark.asyncio +class TestConfig: + async def test_noisy_entities_off_by_default(self): + guardrail = DataFogGuardrail(guardrail_name="datafog-pii") + data = await guardrail.async_pre_call_hook( + user_api_key_dict=None, + cache=None, + data=_chat_data("ping 192.168.1.1 about build 2020-01-02"), + call_type="completion", + ) + assert ( + data["messages"][0]["content"] == "ping 192.168.1.1 about build 2020-01-02" + ) + + async def test_entity_types_override(self): + guardrail = DataFogGuardrail( + guardrail_name="datafog-pii", entity_types=["IP_ADDRESS"] + ) + data = await guardrail.async_pre_call_hook( + user_api_key_dict=None, + cache=None, + data=_chat_data("ping 192.168.1.1"), + call_type="completion", + ) + assert "192.168.1.1" not in data["messages"][0]["content"] + + +@pytest.mark.asyncio +class TestFailPolicy: + async def 
test_fail_open_passes_data_through_on_engine_error(self, monkeypatch): + guardrail = DataFogGuardrail(guardrail_name="datafog-pii", fail_policy="open") + monkeypatch.setattr( + "datafog.integrations.litellm_guardrail._redact_text", + lambda *a, **k: (_ for _ in ()).throw(RuntimeError("boom")), + ) + original = _chat_data(f"reach me at {EMAIL}") + data = await guardrail.async_pre_call_hook( + user_api_key_dict=None, cache=None, data=original, call_type="completion" + ) + assert data["messages"][0]["content"] == f"reach me at {EMAIL}" + + async def test_fail_closed_raises_on_engine_error(self, monkeypatch): + guardrail = DataFogGuardrail(guardrail_name="datafog-pii", fail_policy="closed") + monkeypatch.setattr( + "datafog.integrations.litellm_guardrail._redact_text", + lambda *a, **k: (_ for _ in ()).throw(RuntimeError("boom")), + ) + with pytest.raises(RuntimeError, match="fail_policy is 'closed'"): + await guardrail.async_pre_call_hook( + user_api_key_dict=None, + cache=None, + data=_chat_data(f"reach me at {EMAIL}"), + call_type="completion", + ) + + async def test_invalid_config_rejected(self): + with pytest.raises(ValueError): + DataFogGuardrail(guardrail_name="datafog-pii", action="explode") + with pytest.raises(ValueError): + DataFogGuardrail(guardrail_name="datafog-pii", fail_policy="maybe") + + async def test_fail_closed_error_carries_no_pii_and_no_cause_chain( + self, monkeypatch + ): + # Engine exceptions can embed the text being scanned. The re-raise + # must not chain them (`from None`): a chained __cause__ is printed + # by traceback.format_exc(), which litellm calls for logging. 
+ guardrail = DataFogGuardrail(guardrail_name="datafog-pii", fail_policy="closed") + monkeypatch.setattr( + "datafog.integrations.litellm_guardrail._redact_text", + lambda *a, **k: (_ for _ in ()).throw( + RuntimeError(f"parser choked on: reach me at {EMAIL}") + ), + ) + with pytest.raises(RuntimeError) as exc: + await guardrail.async_pre_call_hook( + user_api_key_dict=None, + cache=None, + data=_chat_data(f"reach me at {EMAIL}"), + call_type="completion", + ) + assert exc.value.__cause__ is None + assert exc.value.__suppress_context__ is True + assert EMAIL not in str(exc.value) + assert not hasattr(exc.value, "status_code") # engine fault -> 500, by design + + async def test_fail_open_log_carries_no_pii(self, monkeypatch, caplog): + import logging + + guardrail = DataFogGuardrail(guardrail_name="datafog-pii", fail_policy="open") + monkeypatch.setattr( + "datafog.integrations.litellm_guardrail._redact_text", + lambda *a, **k: (_ for _ in ()).throw( + RuntimeError(f"parser choked on: reach me at {EMAIL}") + ), + ) + with caplog.at_level(logging.WARNING): + await guardrail.async_pre_call_hook( + user_api_key_dict=None, + cache=None, + data=_chat_data(f"reach me at {EMAIL}"), + call_type="completion", + ) + assert EMAIL not in caplog.text + assert "RuntimeError" in caplog.text