From c75e71c0be2da8ad09c9c72ff7ccd021a7beb662 Mon Sep 17 00:00:00 2001 From: Max Isbey <224885523+maxisbey@users.noreply.github.com> Date: Fri, 26 Jun 2026 17:11:41 +0000 Subject: [PATCH] feat: RFC 6570 URI templates with operator-aware security Replace the regex-based resource template matcher with a linear-time RFC 6570 implementation (mcp.shared.uri_template.UriTemplate), add filesystem path-safety primitives (mcp.shared.path_security), and wire a configurable ResourceSecurity policy into MCPServer. The matcher is a two-ended linear scan with no backtracking. Rather than handling ambiguous templates with scan-time special cases, the parser rejects them up front: two variables adjacent with no literal between them (including a variable adjacent to the multi-segment variable), more than one multi-segment variable, and more than one {?...} expression all raise InvalidUriTemplate when the decorator runs. Operators that emit their own delimiter ({.ext}, {/seg}, {;name}) anchor themselves and still compose with a multi-segment variable. A handler parameter bound to an optional {?...}/{&...} query variable must declare a Python default; this is also checked at decoration time so the mistake cannot reach a request. ResourceSecurity rejects extracted parameter values that contain a null byte, look like an absolute path, or would resolve outside their starting directory. A rejection is indistinguishable on the wire from a not-found resource (-32602) and halts template iteration so a later permissive template is never tried. safe_join() is exported for filesystem handlers. UriTemplate is re-exported at the top level so clients can expand a template a server advertises. Beyond the example-based suite, two seeded property tests cover the whole space the parser accepts: match(expand(v)) round-trips and re-expands to the same URI for every accepted template, and match() never raises on any input. 
Docs: a tested reference page at docs/advanced/uri-templates.md with runnable examples under docs_src/uri_templates/, a forward link from the resources tutorial, and migration notes for every behaviour change. --- docs/advanced/uri-templates.md | 269 ++++ docs/migration.md | 70 ++ docs/tutorial/resources.md | 2 + docs_src/uri_templates/__init__.py | 0 docs_src/uri_templates/tutorial001.py | 43 + docs_src/uri_templates/tutorial002.py | 14 + docs_src/uri_templates/tutorial003.py | 25 + docs_src/uri_templates/tutorial004.py | 28 + docs_src/uri_templates/tutorial005.py | 55 + mkdocs.yml | 1 + src/mcp/__init__.py | 3 + src/mcp/server/mcpserver/__init__.py | 11 +- .../server/mcpserver/resources/__init__.py | 5 +- .../mcpserver/resources/resource_manager.py | 24 +- .../server/mcpserver/resources/templates.py | 130 +- src/mcp/server/mcpserver/server.py | 88 +- src/mcp/shared/path_security.py | 176 +++ src/mcp/shared/uri_template.py | 1116 +++++++++++++++++ tests/docs_src/test_uri_templates.py | 214 ++++ .../resources/test_resource_template.py | 146 +++ tests/server/mcpserver/test_server.py | 257 +++- tests/shared/test_path_security.py | 159 +++ tests/shared/test_uri_template.py | 1001 +++++++++++++++ 23 files changed, 3800 insertions(+), 37 deletions(-) create mode 100644 docs/advanced/uri-templates.md create mode 100644 docs_src/uri_templates/__init__.py create mode 100644 docs_src/uri_templates/tutorial001.py create mode 100644 docs_src/uri_templates/tutorial002.py create mode 100644 docs_src/uri_templates/tutorial003.py create mode 100644 docs_src/uri_templates/tutorial004.py create mode 100644 docs_src/uri_templates/tutorial005.py create mode 100644 src/mcp/shared/path_security.py create mode 100644 src/mcp/shared/uri_template.py create mode 100644 tests/docs_src/test_uri_templates.py create mode 100644 tests/shared/test_path_security.py create mode 100644 tests/shared/test_uri_template.py diff --git a/docs/advanced/uri-templates.md b/docs/advanced/uri-templates.md new 
file mode 100644 index 0000000000..32560f8ecd --- /dev/null +++ b/docs/advanced/uri-templates.md @@ -0,0 +1,269 @@ +# URI templates and path safety + +This is the reference for the URI-template syntax that +[`@mcp.resource`](../tutorial/resources.md) accepts, and for the +path-safety policy the SDK applies to extracted values. For an +introduction to what resources are and when to use them, start with +**Resources**; this page assumes you're already comfortable declaring a +resource and want the full operator set, the security knobs, or the +low-level wiring. + +The template syntax is [RFC 6570](https://datatracker.ietf.org/doc/html/rfc6570). +The SDK supports a subset chosen for matching incoming `resources/read` +URIs, plus a security layer that rejects values that would resolve +outside the directory you intend to serve. For the protocol-level +details (message formats, lifecycle, pagination) see the +[MCP resources specification](https://modelcontextprotocol.io/specification/latest/server/resources). + +## The full operator set + +**Resources** showed one placeholder, `{user_id}`. There are four more +operator forms; here they are on one server so you can see them next to +each other: + +```python title="server.py" hl_lines="16-17 22-23 28-29 34-35 40-41" +--8<-- "docs_src/uri_templates/tutorial001.py" +``` + +Each highlighted decorator is a different way of carving up the URI. +The sections below walk them top to bottom. + +### Simple expansion: `{name}` + +`books://{isbn}` is the form you already know. The placeholder maps to +the `isbn` parameter, so a client reading `books://978-0441172719` calls +`get_book("978-0441172719")`. + +A plain `{name}` stops at the first `/`. `books://978/extra` does not +match because the slash after `978` ends the capture and `/extra` is +left over. + +### Type conversion + +Extracted values arrive as strings, but you can declare a more specific +type and the SDK will convert. 
`orders://{order_id}` lands in a function +whose parameter is `order_id: int`, so reading `orders://12345` calls +`get_order(12345)`, not `get_order("12345")`. The handler does +arithmetic on it (`order_id + 1`) without a cast. + +### Multi-segment paths: `{+name}` + +To capture a value that contains slashes, use `{+name}`. With +`manuals://{+path}`: + +* `manuals://returns.md` gives `path = "returns.md"` +* `manuals://printing/setup.md` gives `path = "printing/setup.md"` + +Reach for `{+name}` whenever the value is hierarchical: filesystem +paths, nested object keys, URL paths you're proxying. + +### Query parameters: `{?a,b,c}` + +`reviews://{isbn}{?limit,sort}` puts `limit` and `sort` after the `?`. +The path identifies *which* book; the query tunes *how* you read it. + +Query params are matched leniently: order doesn't matter, extras are +ignored, and omitted params fall through to your function defaults. So +`reviews://978-0441172719` uses `limit=10, sort="newest"`, and +`reviews://978-0441172719?sort=top` overrides only `sort`. + +### Path segments as a list: `{/name*}` + +If you want each path segment as a separate list item rather than one +string with slashes, use `{/name*}`. With `shelves://browse{/path*}`, a +client reading `shelves://browse/fiction/sci-fi` calls +`browse_shelf(["fiction", "sci-fi"])`. + +### Template reference + +The most common patterns: + +| Pattern | Example input | You get | +|--------------|-----------------------|-------------------------| +| `{name}` | `alice` | `"alice"` | +| `{name}` | `docs/intro.md` | *no match* (stops at `/`) | +| `{+path}` | `docs/intro.md` | `"docs/intro.md"` | +| `{.ext}` | `.json` | `"json"` | +| `{/segment}` | `/v2` | `"v2"` | +| `{?key}` | `?key=value` | `"value"` | +| `{?a,b}` | `?a=1&b=2` | `"1"`, `"2"` | +| `{/path*}` | `/a/b/c` | `["a", "b", "c"]` | + +### What the parser rejects + +A few template shapes are caught up front rather than failing on the +first request. 
`@mcp.resource` parses the template when the decorator +runs, so none of these ever reach a running server. + +`UriTemplate.parse()` raises `InvalidUriTemplate` for: + +* **Two variables with nothing between them.** `manuals://{+path}{ext}` + is rejected: matching can't tell where `path` ends and `ext` begins. + Put a literal between them (`manuals://{+path}/{ext}`), or use an + operator that supplies its own delimiter. `manuals://{+path}{.ext}` + is accepted because `{.ext}` contributes the `.` itself. +* **More than one multi-segment variable.** At most one of `{+var}`, + `{#var}`, or an exploded variable (`{/var*}`, `{.var*}`, `{;var*}`) + per template. Two are inherently ambiguous: there is no principled + way to decide which one absorbs an extra segment. +* **The usual syntax errors**: an unclosed brace, a variable name used + twice, or an RFC 6570 feature the SDK doesn't support, such as the + `{var:3}` prefix modifier or the `{?vars*}` query explode. + +On top of that, `@mcp.resource` raises `ValueError` when a handler +parameter is bound to a query variable in the template's trailing +`{?...}`/`{&...}` run but has no Python default. Those variables are +matched leniently (a client may leave any of them out), so a parameter +without a default would only surface as an opaque internal error on the +first request that omits it. `reviews://{isbn}{?limit,sort}` in the +server above is the well-formed version: `limit` and `sort` both carry +defaults. + +## Security + +Template parameters come from the client. If they flow into filesystem +or database operations unchecked, values like `../../etc/passwd` can +resolve outside the directory you intended to serve. + +### What the SDK checks by default + +Before your handler runs, the SDK rejects any parameter that: + +* would escape its starting directory via `..` components +* looks like an absolute path (`/etc/passwd`, `C:\Windows`) or a + Windows drive-relative one (`C:foo`). 
A drive-relative value and a + namespaced identifier like `x:y` are indistinguishable as strings, + so any single-letter-plus-colon value is rejected by default; + exempt the parameter if it legitimately receives such values +* contains a null byte (`\x00`) + +The `..` check is component-based, not a substring scan. Values like +`v1.0..v2.0` or `HEAD~3..HEAD` pass because `..` is not a standalone +path segment there. + +These checks apply to the decoded value, so they catch traversal +regardless of how it was encoded in the URI (`../etc`, `..%2Fetc`, +`%2E%2E/etc`, `..%5Cetc`, `%00` all get caught). + +!!! check + Read `manuals://../etc/passwd` from the server above and the request + is rejected outright: template matching stops at the first failure, + so no later (potentially more permissive) template is tried as a + fallback. The client sees the same `-32602` "Unknown resource" error + it would for a URI that matches no template at all, and + `read_manual` never runs. + +### Filesystem handlers: use safe_join + +The built-in checks stop the common cases but can't know your sandbox +boundary. For filesystem access, use `safe_join` to resolve the path +and verify it stays inside your base directory: + +```python title="server.py" hl_lines="4 14" +--8<-- "docs_src/uri_templates/tutorial002.py" +``` + +`safe_join` catches symlink escapes, `..` sequences, and absolute-path +tricks that a simple string check would miss. If the resolved path +escapes `DOCS_ROOT`, it raises `PathEscapeError`, which surfaces to the +client as a `ResourceError`. + +### When the defaults get in the way + +Sometimes the checks block legitimate values. A catalog-import tool +might intentionally receive an absolute path, or a parameter might be a +relative reference like `../sibling` that your handler interprets +safely without touching the filesystem. 
Exempt that parameter, or relax +the policy for the whole server: + +```python title="server.py" hl_lines="9 16-19" +--8<-- "docs_src/uri_templates/tutorial003.py" +``` + +* `security=ResourceSecurity(exempt_params={"source"})` on the decorator + skips the checks for that one parameter on that one resource. The + rest of the server keeps the default policy. +* `resource_security=` on the `MCPServer` constructor sets the default + for every resource. Here `relaxed` turns off the `..` check entirely. + +The configurable checks: + +| Setting | Default | What it does | +|-------------------------|---------|-------------------------------------| +| `reject_path_traversal` | `True` | Rejects `..` sequences that escape the starting directory | +| `reject_absolute_paths` | `True` | Rejects `/foo`, `C:\foo`, UNC paths, and drive-relative `C:foo` (also catches `x:y`) | +| `reject_null_bytes` | `True` | Rejects values containing `\x00` | +| `exempt_params` | empty | Parameter names to skip checks for | + +These checks are a heuristic pre-filter; for filesystem access, +`safe_join` remains the containment boundary. + +!!! tip + If your handler can't fulfil the request (the file doesn't exist, + the id is unknown), raise an exception. The SDK turns it into an + error response. See **Handling errors** for the difference between a + protocol error and a tool error. + +## Resources on the low-level Server + +If you're building on the low-level `Server` (see **The low-level +Server**), you register handlers for the `resources/list` and +`resources/read` protocol methods directly. There's no decorator; you +return the protocol types yourself. + +### Static resources + +For fixed URIs, keep a registry and dispatch on exact match: + +```python title="server.py" hl_lines="18 22 28" +--8<-- "docs_src/uri_templates/tutorial004.py" +``` + +The list handler tells clients what's available; the read handler +serves the content. 
Check your registry first, fall through to +templates (below) if you have any, then raise for anything else. + +### Templates + +The template engine `MCPServer` uses lives in `mcp.shared.uri_template` +and works on its own. You get the same parsing and matching; you wire +up the routing and security policy yourself. + +```python title="server.py" hl_lines="14-17 23-26 30 34 46" +--8<-- "docs_src/uri_templates/tutorial005.py" +``` + +Three things are happening in the highlighted lines: + +* **Parse once, match per request.** `UriTemplate.parse()` builds the + template; `template.match(uri)` returns the extracted variables as a + `dict`, or `None` if the URI doesn't fit. URL decoding happens inside + `match()`; the decoded values are returned as-is without path-safety + validation. Values come out as strings: convert them yourself + (`int(matched["id"])`, `Path(matched["path"])`). +* **Apply the safety checks yourself.** The `..` and absolute-path + checks `MCPServer` runs by default live in `mcp.shared.path_security`. + `read_manual_safely` calls them before touching `MANUALS`. If a + parameter isn't a filesystem path (an ISBN, a search query), skip the + checks for that value: you control the policy per handler rather than + through a config object. +* **List the templates from the same source.** Clients discover + templates through `resources/templates/list`. `str(template)` gives + back the original template string, so the listing and the matcher + share one source of truth. + +## Recap + +* `{name}` matches one segment; `{+name}` keeps the slashes; `{?a,b}` + pulls from the query string; `{/name*}` splits segments into a list. +* Two variables with nothing between them, or a second multi-segment + variable, are rejected at parse time. A parameter bound to a trailing + `{?...}`/`{&...}` query variable must declare a Python default. +* Annotate the parameter (`order_id: int`) and the SDK converts. 
+* The default security policy rejects `..`, absolute paths, and null + bytes before your handler runs; override per resource with + `security=ResourceSecurity(...)` or server-wide with + `resource_security=`. +* For filesystem access, `safe_join` is the containment boundary. +* On the low-level `Server`, parse with `UriTemplate.parse()`, match + with `.match()`, and apply `mcp.shared.path_security` yourself. diff --git a/docs/migration.md b/docs/migration.md index e987b626c6..42d420bf04 100644 --- a/docs/migration.md +++ b/docs/migration.md @@ -609,6 +609,76 @@ Reading a missing resource now returns JSON-RPC error code `-32602` (invalid par The underlying lookups now raise typed exceptions instead of `ValueError`. `ResourceManager.get_resource()` raises `ResourceNotFoundError` when no resource or template matches the URI, and `ResourceTemplate.create_resource()` raises `ResourceError` when the template function fails. Neither subclasses `ValueError`, so callers catching `ValueError` should switch to `ResourceNotFoundError` / `ResourceError` (both importable from `mcp.server.mcpserver.exceptions`; `ResourceNotFoundError` subclasses `ResourceError`). +### Resource templates: matching behavior changes + +Resource template matching has been rewritten with RFC 6570 support. +Several behaviors have changed: + +**Path-safety checks applied by default.** Extracted parameter values +containing `..` as a path component, a null byte, or looking like an +absolute path (`/etc/passwd`, `C:\Windows`) now cause the read to +fail — the client receives an "Unknown resource" error and template +iteration stops, so a strict template's rejection does not fall +through to a later permissive template. This is checked on the +decoded value, so `..%2Fetc`, `%2E%2E`, and `%00` are caught too. +Note that `..` is only flagged as a standalone path component, so +values like `v1.0..v2.0` or `HEAD~3..HEAD` are unaffected. 
+ +If a parameter legitimately needs to receive absolute paths or +traversal sequences, exempt it: + +```python +from mcp.server.mcpserver import ResourceSecurity + +@mcp.resource( + "inspect://file/{+target}", + security=ResourceSecurity(exempt_params={"target"}), +) +def inspect_file(target: str) -> str: ... +``` + +**Template literals and structural delimiters match exactly.** The +previous matcher built a regex without escaping, so `.` matched any +character and simple `{var}` swallowed `?`, `#`, `&`, and `,`. Now +`data://v1.0/{id}` no longer matches `data://v1X0/42`, and +`api://{id}` no longer matches `api://foo?x=1` — use `api://{id}{?x}` +to capture the query parameter. + +**`{var}` now matches an empty value.** A simple expression captures +zero or more characters, so `tickets://{ticket_id}` now matches +`tickets://` with `ticket_id=""` (v1.x's `[^/]+` regex required at +least one). This makes `match` round-trip `expand` for empty values — RFC 6570 +expands an empty string to nothing — but handlers that assumed a +non-empty value should validate it explicitly. + +**Template syntax errors surface at decoration time.** Unclosed +braces, duplicate variable names, and unsupported syntax raise +`InvalidUriTemplate` when the decorator runs rather than `re.error` +on first match. Two variables with no literal between them are also +rejected — matching cannot tell where one ends and the next begins — +so `{name}{+path}` raises. Write `{name}/{+path}`, or use an operator +that emits its own delimiter: `{+path}{.ext}` is fine because the `.` +operator contributes a literal `.` between the two. 
A handler +parameter bound to a query variable in the template's trailing +`{?...}`/`{&...}` run — the variables `match()` treats as optional, +listed by `UriTemplate.query_variable_names` — must declare a Python +default: a client may omit those, so a handler that requires one now +raises `ValueError` when the decorator runs instead of failing on the +first request that leaves it out. (A `{&...}` expression with no +preceding `{?...}` is not in that run: it is matched strictly, may +not be omitted, and needs no default.) + +**Static URIs with Context-only handlers now error.** A non-template +URI paired with a handler that takes only a `Context` parameter +previously registered but was silently unreachable (the resource +could never be read). This now raises `ValueError` at decoration time. +Context injection for static resources is not supported — use a +template with at least one variable or access context through other +means. + +See [URI templates](advanced/uri-templates.md) for the full template syntax, +security configuration, and filesystem safety utilities. + ### Registering lowlevel handlers from `MCPServer` `MCPServer` does not expose public APIs for `subscribe_resource`, `unsubscribe_resource`, or `set_logging_level` handlers. In v1, the workaround was to reach into the private lowlevel server and use its decorator methods: diff --git a/docs/tutorial/resources.md b/docs/tutorial/resources.md index 5cf35503f9..749b8227d6 100644 --- a/docs/tutorial/resources.md +++ b/docs/tutorial/resources.md @@ -92,6 +92,8 @@ Notice the `uri` in the result. It is the **concrete** URI the client asked for, A mismatch can only ever be a bug, so the SDK makes it impossible to start the server with one. +The placeholder syntax is RFC 6570: `{+path}` for multi-segment values, `{?q,lang}` for optional query parameters, and more. The SDK also applies path-safety checks to extracted values by default. 
See **[URI templates and path safety](../advanced/uri-templates.md)** for the full reference. + `get_user_profile` can also take a parameter annotated `Context`. The SDK injects it without ever treating it as a URI parameter, and **The Context** chapter covers what it gives you. ## What you return diff --git a/docs_src/uri_templates/__init__.py b/docs_src/uri_templates/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/docs_src/uri_templates/tutorial001.py b/docs_src/uri_templates/tutorial001.py new file mode 100644 index 0000000000..87685c0e56 --- /dev/null +++ b/docs_src/uri_templates/tutorial001.py @@ -0,0 +1,43 @@ +from mcp.server import MCPServer + +mcp = MCPServer("Bookshop") + +BOOKS = { + "978-0441172719": {"title": "Dune", "author": "Frank Herbert"}, + "978-0553293357": {"title": "Foundation", "author": "Isaac Asimov"}, +} + +MANUALS = { + "printing/setup.md": "# Printer setup\n\nLoad paper, then power on.", + "returns.md": "# Returns policy\n\nThirty days with a receipt.", +} + + +@mcp.resource("books://{isbn}") +def get_book(isbn: str) -> dict[str, str]: + """A single book by ISBN.""" + return BOOKS[isbn] + + +@mcp.resource("orders://{order_id}") +def get_order(order_id: int) -> dict[str, object]: + """An order by its numeric id.""" + return {"order_id": order_id, "next_order": order_id + 1, "status": "shipped"} + + +@mcp.resource("manuals://{+path}") +def read_manual(path: str) -> str: + """A staff manual page. 
The path keeps its slashes.""" + return MANUALS[path] + + +@mcp.resource("reviews://{isbn}{?limit,sort}") +def list_reviews(isbn: str, limit: int = 10, sort: str = "newest") -> str: + """Reviews of a book, optionally limited and sorted.""" + return f"{limit} {sort} reviews of {BOOKS[isbn]['title']}" + + +@mcp.resource("shelves://browse{/path*}") +def browse_shelf(path: list[str]) -> str: + """A shelf in the category tree, addressed by segments.""" + return " > ".join(["catalog", *path]) diff --git a/docs_src/uri_templates/tutorial002.py b/docs_src/uri_templates/tutorial002.py new file mode 100644 index 0000000000..2ad1ec7c1e --- /dev/null +++ b/docs_src/uri_templates/tutorial002.py @@ -0,0 +1,14 @@ +from pathlib import Path + +from mcp.server import MCPServer +from mcp.shared.path_security import safe_join + +mcp = MCPServer("Bookshop") + +DOCS_ROOT = Path("./manuals") + + +@mcp.resource("manuals://{+path}") +def read_manual(path: str) -> str: + """A staff manual page, served from a directory on disk.""" + return safe_join(DOCS_ROOT, path).read_text() diff --git a/docs_src/uri_templates/tutorial003.py b/docs_src/uri_templates/tutorial003.py new file mode 100644 index 0000000000..bc7a59cb3b --- /dev/null +++ b/docs_src/uri_templates/tutorial003.py @@ -0,0 +1,25 @@ +from mcp.server import MCPServer +from mcp.server.mcpserver import ResourceSecurity + +mcp = MCPServer("Bookshop") + + +@mcp.resource( + "imports://preview/{+source}", + security=ResourceSecurity(exempt_params={"source"}), +) +def preview_import(source: str) -> str: + """Preview a catalog import. 
`source` may be an absolute path.""" + return f"Would import from {source}" + + +relaxed = MCPServer( + "Bookshop", + resource_security=ResourceSecurity(reject_path_traversal=False), +) + + +@relaxed.resource("imports://preview/{+source}") +def preview_import_relaxed(source: str) -> str: + """The server-wide policy turns off the `..` check for every resource on `relaxed`.""" + return f"Would import from {source}" diff --git a/docs_src/uri_templates/tutorial004.py b/docs_src/uri_templates/tutorial004.py new file mode 100644 index 0000000000..c1920b3cc5 --- /dev/null +++ b/docs_src/uri_templates/tutorial004.py @@ -0,0 +1,28 @@ +from mcp_types import ( + ListResourcesResult, + PaginatedRequestParams, + ReadResourceRequestParams, + ReadResourceResult, + Resource, + TextResourceContents, +) + +from mcp.server import Server, ServerRequestContext + +RESOURCES = { + "config://shop": '{"currency": "USD", "tax_rate": 0.08}', + "status://health": "ok", +} + + +async def list_resources(ctx: ServerRequestContext, params: PaginatedRequestParams | None) -> ListResourcesResult: + return ListResourcesResult(resources=[Resource(name=uri, uri=uri) for uri in RESOURCES]) + + +async def read_resource(ctx: ServerRequestContext, params: ReadResourceRequestParams) -> ReadResourceResult: + if (text := RESOURCES.get(params.uri)) is not None: + return ReadResourceResult(contents=[TextResourceContents(uri=params.uri, text=text)]) + raise ValueError(f"Unknown resource: {params.uri}") + + +server = Server("Bookshop", on_list_resources=list_resources, on_read_resource=read_resource) diff --git a/docs_src/uri_templates/tutorial005.py b/docs_src/uri_templates/tutorial005.py new file mode 100644 index 0000000000..716ff08dc1 --- /dev/null +++ b/docs_src/uri_templates/tutorial005.py @@ -0,0 +1,55 @@ +from mcp_types import ( + ListResourceTemplatesResult, + PaginatedRequestParams, + ReadResourceRequestParams, + ReadResourceResult, + ResourceTemplate, + TextResourceContents, +) + +from mcp.server import Server, 
ServerRequestContext +from mcp.shared.path_security import contains_path_traversal, is_absolute_path +from mcp.shared.uri_template import UriTemplate + +TEMPLATES = { + "manuals": UriTemplate.parse("manuals://{+path}"), + "books": UriTemplate.parse("books://{isbn}"), +} + +MANUALS = {"printing/setup.md": "# Printer setup", "returns.md": "# Returns policy"} +BOOKS = {"978-0441172719": "Dune by Frank Herbert"} + + +def read_manual_safely(path: str) -> str: + if contains_path_traversal(path) or is_absolute_path(path): + raise ValueError("rejected") + return MANUALS[path] + + +async def read_resource(ctx: ServerRequestContext, params: ReadResourceRequestParams) -> ReadResourceResult: + if (matched := TEMPLATES["manuals"].match(params.uri)) is not None: + text = read_manual_safely(str(matched["path"])) + return ReadResourceResult(contents=[TextResourceContents(uri=params.uri, text=text)]) + + if (matched := TEMPLATES["books"].match(params.uri)) is not None: + text = BOOKS[str(matched["isbn"])] + return ReadResourceResult(contents=[TextResourceContents(uri=params.uri, text=text)]) + + raise ValueError(f"Unknown resource: {params.uri}") + + +async def list_resource_templates( + ctx: ServerRequestContext, params: PaginatedRequestParams | None +) -> ListResourceTemplatesResult: + return ListResourceTemplatesResult( + resource_templates=[ + ResourceTemplate(name=name, uri_template=str(template)) for name, template in TEMPLATES.items() + ] + ) + + +server = Server( + "Bookshop", + on_read_resource=read_resource, + on_list_resource_templates=list_resource_templates, +) diff --git a/mkdocs.yml b/mkdocs.yml index af32c74ab6..93127a410a 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -40,6 +40,7 @@ nav: - Advanced: - Multi-round-trip requests: advanced/multi-round-trip.md - The low-level Server: advanced/low-level-server.md + - URI templates: advanced/uri-templates.md - Pagination: advanced/pagination.md - Middleware: advanced/middleware.md - OpenTelemetry: advanced/opentelemetry.md 
diff --git a/src/mcp/__init__.py b/src/mcp/__init__.py index 49bb494f94..085e445d4a 100644 --- a/src/mcp/__init__.py +++ b/src/mcp/__init__.py @@ -66,6 +66,7 @@ from .server.session import ServerSession from .server.stdio import stdio_server from .shared.exceptions import MCPDeprecationWarning, MCPError, UrlElicitationRequiredError +from .shared.uri_template import InvalidUriTemplate, UriTemplate __all__ = [ "CallToolRequest", @@ -133,7 +134,9 @@ "ToolsCapability", "ToolUseContent", "UnsubscribeRequest", + "UriTemplate", "UrlElicitationRequiredError", + "InvalidUriTemplate", "stdio_client", "stdio_server", ] diff --git a/src/mcp/server/mcpserver/__init__.py b/src/mcp/server/mcpserver/__init__.py index e36a7ae7d6..741f16beb1 100644 --- a/src/mcp/server/mcpserver/__init__.py +++ b/src/mcp/server/mcpserver/__init__.py @@ -3,7 +3,16 @@ from mcp_types import Icon from .context import Context +from .resources import DEFAULT_RESOURCE_SECURITY, ResourceSecurity from .server import MCPServer from .utilities.types import Audio, Image -__all__ = ["MCPServer", "Context", "Image", "Audio", "Icon"] +__all__ = [ + "MCPServer", + "Context", + "Image", + "Audio", + "Icon", + "ResourceSecurity", + "DEFAULT_RESOURCE_SECURITY", +] diff --git a/src/mcp/server/mcpserver/resources/__init__.py b/src/mcp/server/mcpserver/resources/__init__.py index b5805fb348..f54ea44e42 100644 --- a/src/mcp/server/mcpserver/resources/__init__.py +++ b/src/mcp/server/mcpserver/resources/__init__.py @@ -1,6 +1,6 @@ from .base import Resource from .resource_manager import ResourceManager -from .templates import ResourceTemplate +from .templates import DEFAULT_RESOURCE_SECURITY, ResourceSecurity, ResourceSecurityError, ResourceTemplate from .types import ( BinaryResource, DirectoryResource, @@ -20,4 +20,7 @@ "DirectoryResource", "ResourceTemplate", "ResourceManager", + "ResourceSecurity", + "ResourceSecurityError", + "DEFAULT_RESOURCE_SECURITY", ] diff --git 
a/src/mcp/server/mcpserver/resources/resource_manager.py b/src/mcp/server/mcpserver/resources/resource_manager.py index 54f54549eb..41d3d7bb37 100644 --- a/src/mcp/server/mcpserver/resources/resource_manager.py +++ b/src/mcp/server/mcpserver/resources/resource_manager.py @@ -10,7 +10,12 @@ from mcp.server.mcpserver.exceptions import ResourceNotFoundError from mcp.server.mcpserver.resources.base import Resource -from mcp.server.mcpserver.resources.templates import ResourceTemplate +from mcp.server.mcpserver.resources.templates import ( + DEFAULT_RESOURCE_SECURITY, + ResourceSecurity, + ResourceSecurityError, + ResourceTemplate, +) from mcp.server.mcpserver.utilities.logging import get_logger if TYPE_CHECKING: @@ -63,6 +68,7 @@ def add_template( icons: list[Icon] | None = None, annotations: Annotations | None = None, meta: dict[str, Any] | None = None, + security: ResourceSecurity = DEFAULT_RESOURCE_SECURITY, ) -> ResourceTemplate: """Add a template from a function.""" template = ResourceTemplate.from_function( @@ -75,6 +81,7 @@ def add_template( icons=icons, annotations=annotations, meta=meta, + security=security, ) self._templates[template.uri_template] = template return template @@ -85,6 +92,15 @@ async def get_resource(self, uri: AnyUrl | str, context: Context[LifespanContext Raises: ResourceNotFoundError: If no resource or template matches the URI. ResourceError: If a matching template fails to create the resource. + + Note: + Pydantic's ``AnyUrl`` normalises percent-encoding and + resolves ``..`` segments during validation, so a value + constructed as ``AnyUrl("file:///a/%2E%2E/b")`` arrives + here as ``file:///b``. The JSON-RPC protocol layer passes + raw ``str`` values and is unaffected, but internal callers + wrapping URIs in ``AnyUrl`` should be aware that security + checks see the already-normalised form. 
""" uri_str = str(uri) logger.debug("Getting resource", extra={"uri": uri_str}) @@ -95,7 +111,11 @@ async def get_resource(self, uri: AnyUrl | str, context: Context[LifespanContext # Then check templates for template in self._templates.values(): - if params := template.matches(uri_str): + try: + params = template.matches(uri_str) + except ResourceSecurityError as e: + raise ResourceNotFoundError(f"Unknown resource: {uri}") from e + if params is not None: return await template.create_resource(uri_str, params, context=context) raise ResourceNotFoundError(f"Unknown resource: {uri}") diff --git a/src/mcp/server/mcpserver/resources/templates.py b/src/mcp/server/mcpserver/resources/templates.py index 72707a11ab..f78b5ec666 100644 --- a/src/mcp/server/mcpserver/resources/templates.py +++ b/src/mcp/server/mcpserver/resources/templates.py @@ -3,10 +3,9 @@ from __future__ import annotations import functools -import re -from collections.abc import Callable +from collections.abc import Callable, Mapping, Set +from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any -from urllib.parse import unquote import anyio.to_thread from mcp_types import Annotations, Icon @@ -18,6 +17,8 @@ from mcp.server.mcpserver.utilities.func_metadata import func_metadata from mcp.server.mcpserver.utilities.logging import get_logger from mcp.shared._callable_inspection import is_async_callable +from mcp.shared.path_security import contains_path_traversal, is_absolute_path +from mcp.shared.uri_template import UriTemplate logger = get_logger(__name__) @@ -26,6 +27,82 @@ from mcp.server.mcpserver.context import Context +@dataclass(frozen=True) +class ResourceSecurity: + """Security policy applied to extracted resource template parameters. + + These checks run after :meth:`~mcp.shared.uri_template.UriTemplate.match` + has extracted and decoded parameter values. 
They catch path-traversal + and absolute-path injection regardless of how the value was encoded in + the URI (literal, ``%2F``, ``%5C``, ``%2E%2E``). + + Example:: + + # Opt out for a parameter that legitimately contains .. + @mcp.resource( + "git://diff/{+range}", + security=ResourceSecurity(exempt_params={"range"}), + ) + def git_diff(range: str) -> str: ... + """ + + reject_path_traversal: bool = True + """Reject values whose ``..`` components would climb above the starting directory.""" + + reject_absolute_paths: bool = True + """Reject values that look like absolute filesystem paths.""" + + reject_null_bytes: bool = True + """Reject values containing NUL (``\\x00``). Null bytes defeat string + comparisons (``"..\\x00" != ".."``) and can cause truncation in C + extensions or subprocess calls.""" + + exempt_params: Set[str] = field(default_factory=frozenset[str]) + """Parameter names to skip all checks for.""" + + def validate(self, params: Mapping[str, str | list[str]]) -> str | None: + """Check all parameter values against the configured policy. + + Args: + params: Extracted template parameters. List values (from + explode variables) are checked element-wise. + + Returns: + The name of the first parameter that fails, or ``None`` if + all values pass. + """ + for name, value in params.items(): + if name in self.exempt_params: + continue + values = value if isinstance(value, list) else [value] + for v in values: + if self.reject_null_bytes and "\0" in v: + return name + if self.reject_path_traversal and contains_path_traversal(v): + return name + if self.reject_absolute_paths and is_absolute_path(v): + return name + return None + + +DEFAULT_RESOURCE_SECURITY = ResourceSecurity() +"""Secure-by-default policy: traversal, absolute paths, and null bytes rejected.""" + + +class ResourceSecurityError(ValueError): + """Raised when an extracted parameter fails :class:`ResourceSecurity` checks.
+ + Distinct from a simple ``None`` non-match so that template + iteration can stop at the first security rejection rather than + falling through to a later, possibly more permissive, template. + """ + + def __init__(self, template: str, param: str) -> None: + super().__init__(f"Parameter {param!r} of template {template!r} failed security validation") + self.template = template + self.param = param + + class ResourceTemplate(BaseModel): """A template for dynamically creating resources.""" @@ -40,6 +117,8 @@ class ResourceTemplate(BaseModel): fn: Callable[..., Any] = Field(exclude=True) parameters: dict[str, Any] = Field(description="JSON schema for function parameters") context_kwarg: str | None = Field(None, description="Name of the kwarg that should receive context") + parsed_template: UriTemplate = Field(exclude=True, description="Parsed RFC 6570 template") + security: ResourceSecurity = Field(exclude=True, description="Path-safety policy for extracted parameters") @classmethod def from_function( @@ -54,12 +133,20 @@ def from_function( annotations: Annotations | None = None, meta: dict[str, Any] | None = None, context_kwarg: str | None = None, + security: ResourceSecurity = DEFAULT_RESOURCE_SECURITY, ) -> ResourceTemplate: - """Create a template from a function.""" + """Create a template from a function. + + Raises: + InvalidUriTemplate: If ``uri_template`` is malformed or uses + unsupported RFC 6570 features. 
+ """ func_name = name or fn.__name__ if func_name == "": raise ValueError("You must provide a name for lambda functions") # pragma: no cover + parsed = UriTemplate.parse(uri_template) + # Find context parameter if it exists if context_kwarg is None: # pragma: no branch context_kwarg = find_context_parameter(fn) @@ -86,20 +173,35 @@ def from_function( fn=fn, parameters=parameters, context_kwarg=context_kwarg, + parsed_template=parsed, + security=security, ) - def matches(self, uri: str) -> dict[str, Any] | None: - """Check if URI matches template and extract parameters. + def matches(self, uri: str) -> dict[str, str | list[str]] | None: + """Check if a URI matches this template and extract parameters. + + Delegates to :meth:`UriTemplate.match` for RFC 6570 extraction, + then applies this template's :class:`ResourceSecurity` policy + (path traversal, absolute paths, null bytes). - Extracted parameters are URL-decoded to handle percent-encoded characters. + Returns: + Extracted parameters on success, or ``None`` if the URI + doesn't match the template. + + Raises: + ResourceSecurityError: If the URI matches but an extracted + parameter fails security validation. Raising (rather + than returning ``None``) prevents the resource manager + from silently falling through to a later, possibly more + permissive, template.
""" - # Convert template to regex pattern - pattern = self.uri_template.replace("{", "(?P<").replace("}", ">[^/]+)") - match = re.match(f"^{pattern}$", uri) - if match: - # URL-decode all extracted parameter values - return {key: unquote(value) for key, value in match.groupdict().items()} - return None + params = self.parsed_template.match(uri) + if params is None: + return None + failed = self.security.validate(params) + if failed is not None: + raise ResourceSecurityError(self.uri_template, failed) + return params async def create_resource( self, diff --git a/src/mcp/server/mcpserver/server.py b/src/mcp/server/mcpserver/server.py index 855770eda7..029512a780 100644 --- a/src/mcp/server/mcpserver/server.py +++ b/src/mcp/server/mcpserver/server.py @@ -4,7 +4,6 @@ import base64 import inspect -import re from collections.abc import AsyncIterator, Awaitable, Callable, Iterable from contextlib import AbstractAsyncContextManager, asynccontextmanager from typing import Any, Generic, Literal, TypeVar, overload @@ -62,7 +61,13 @@ from mcp.server.mcpserver.context import Context from mcp.server.mcpserver.exceptions import ResourceError, ResourceNotFoundError from mcp.server.mcpserver.prompts import Prompt, PromptManager -from mcp.server.mcpserver.resources import FunctionResource, Resource, ResourceManager +from mcp.server.mcpserver.resources import ( + DEFAULT_RESOURCE_SECURITY, + FunctionResource, + Resource, + ResourceManager, + ResourceSecurity, +) from mcp.server.mcpserver.tools import Tool, ToolManager from mcp.server.mcpserver.utilities.context_injection import find_context_parameter from mcp.server.mcpserver.utilities.logging import configure_logging, get_logger @@ -72,6 +77,7 @@ from mcp.server.streamable_http_manager import StreamableHTTPSessionManager from mcp.server.transport_security import TransportSecuritySettings from mcp.shared.exceptions import MCPError +from mcp.shared.uri_template import UriTemplate logger = get_logger(__name__) @@ -150,7 +156,9 @@ def 
__init__( dependencies: list[str] | None = None, lifespan: Callable[[MCPServer[LifespanResultT]], AbstractAsyncContextManager[LifespanResultT]] | None = None, auth: AuthSettings | None = None, + resource_security: ResourceSecurity = DEFAULT_RESOURCE_SECURITY, ): + self._resource_security = resource_security self.settings = Settings( debug=debug, log_level=log_level, @@ -622,6 +630,7 @@ def resource( icons: list[Icon] | None = None, annotations: Annotations | None = None, meta: dict[str, Any] | None = None, + security: ResourceSecurity | None = None, ) -> Callable[[_CallableT], _CallableT]: """Decorator to register a function as a resource. @@ -631,8 +640,9 @@ def resource( - bytes for binary content - other types will be converted to JSON - If the URI contains parameters (e.g. "resource://{param}") or the function - has parameters, it will be registered as a template resource. + If the URI contains parameters (e.g. "resource://{param}"), it is + registered as a template resource. Otherwise it is registered as a + static resource; function parameters on a static URI raise an error. Args: uri: URI for the resource (e.g. "resource://my-resource" or "resource://{param}") @@ -643,6 +653,9 @@ def resource( icons: Optional list of icons for the resource annotations: Optional annotations for the resource meta: Optional metadata dictionary for the resource + security: Path-safety policy for extracted template parameters. + Defaults to the server's ``resource_security`` setting. + Only applies to template resources. Example: ```python @@ -664,6 +677,15 @@ async def get_weather(city: str) -> str: data = await fetch_weather(city) return f"Weather for {city}: {data}" ``` + + Raises: + InvalidUriTemplate: If ``uri`` is not a valid RFC 6570 template. + ValueError: If URI template parameters don't match the + function's parameters, or if a parameter bound to a + ``{?...}``/``{&...}`` query variable has no default + (the client may omit it). 
+ TypeError: If the decorator is applied without being called + (``@resource`` instead of ``@resource("uri")``). """ # Check if user passed function directly instead of calling decorator if callable(uri): @@ -672,27 +694,44 @@ async def get_weather(city: str) -> str: "Did you forget to call it? Use @resource('uri') instead of @resource" ) + # Parse once, early — surfaces malformed-template errors at + # decoration time with a clear position, and gives us correct + # variable names for all RFC 6570 operators. + parsed = UriTemplate.parse(uri) + uri_params = set(parsed.variable_names) + def decorator(fn: _CallableT) -> _CallableT: - # Check if this should be a template sig = inspect.signature(fn) - has_uri_params = "{" in uri and "}" in uri - has_func_params = bool(sig.parameters) - - if has_uri_params or has_func_params: - # Check for Context parameter to exclude from validation - context_param = find_context_parameter(fn) - - # Validate that URI params match function params (excluding context) - uri_params = set(re.findall(r"{(\w+)}", uri)) - # We need to remove the context_param from the resource function if - # there is any. - func_params = {p for p in sig.parameters.keys() if p != context_param} + context_param = find_context_parameter(fn) + func_params = {p for p in sig.parameters.keys() if p != context_param} + # Template/static is decided purely by the URI: variables + # present means template, none means static. + if uri_params: if uri_params != func_params: raise ValueError( f"Mismatch between URI parameters {uri_params} and function parameters {func_params}" ) + # A {?...}/{&...} query variable is optional on the wire: + # match() omits it from the extracted parameters when the + # client leaves it out of the URI. The handler parameter + # bound to it must therefore have a Python default; without + # one, the author only finds out on the first request that + # omits it, as an opaque internal error. 
+ missing_defaults = sorted( + name + for name in parsed.query_variable_names + if sig.parameters[name].default is inspect.Parameter.empty + ) + if missing_defaults: + raise ValueError( + f"Resource {uri!r}: query parameter(s) {missing_defaults} have no " + f"default value. A client may omit a {{?...}}/{{&...}} query " + f"parameter, so the matching handler parameter must declare a " + f"default." + ) + # Register as template self._resource_manager.add_template( fn=fn, @@ -703,9 +742,24 @@ def decorator(fn: _CallableT) -> _CallableT: mime_type=mime_type, icons=icons, annotations=annotations, + security=security if security is not None else self._resource_security, meta=meta, ) else: + if func_params: + raise ValueError( + f"Resource {uri!r} has no URI template variables, but the " + f"handler declares parameters {func_params}. Add matching " + f"{{...}} variables to the URI or remove the parameters." + ) + if context_param is not None: + raise ValueError( + f"Resource {uri!r} has no URI template variables, but the " + f"handler declares a Context parameter. Context injection " + f"for static resources is not supported. " + f"Add a template variable to the URI or remove the " + f"Context parameter." + ) # Register as regular resource resource = FunctionResource.from_function( fn=fn, diff --git a/src/mcp/shared/path_security.py b/src/mcp/shared/path_security.py new file mode 100644 index 0000000000..0d338eacc2 --- /dev/null +++ b/src/mcp/shared/path_security.py @@ -0,0 +1,176 @@ +"""Filesystem path safety primitives for resource handlers. + +These functions help MCP servers reject paths that would resolve +outside the served root when extracted URI template parameters are +used in filesystem operations. They are standalone utilities usable from both the +high-level :class:`~mcp.server.mcpserver.MCPServer` and lowlevel server +implementations. 
+ +The canonical safe pattern:: + + from mcp.shared.path_security import safe_join + + @mcp.resource("file://docs/{+path}") + def read_doc(path: str) -> str: + return safe_join("/data/docs", path).read_text() +""" + +import string +from pathlib import Path + +__all__ = ["PathEscapeError", "contains_path_traversal", "is_absolute_path", "safe_join"] + + +class PathEscapeError(ValueError): + """Raised by :func:`safe_join` when the resolved path escapes the base.""" + + +def contains_path_traversal(value: str) -> bool: + r"""Check whether a value, treated as a relative path, escapes its origin. + + This is a **base-free** check: it does not know the sandbox root, so + it detects only whether ``..`` components would move above the + starting point. Use :func:`safe_join` when you know the root — it + additionally catches symlink escapes and absolute-path injection. + + Note: + This is a string-level check on the value as supplied. It does + not model platform-specific filesystem normalisation (e.g. Win32 + stripping of trailing dots and spaces from the final path + component). For filesystem access, use :func:`safe_join`, which + resolves through the OS and verifies containment. + + The check is component-based: ``..`` is dangerous only as a + standalone path segment, not as a substring. Both ``/`` and ``\`` + are treated as separators. + + Example:: + + >>> contains_path_traversal("a/b/c") + False + >>> contains_path_traversal("../etc") + True + >>> contains_path_traversal("a/../../b") + True + >>> contains_path_traversal("a/../b") + False + >>> contains_path_traversal("1.0..2.0") + False + >>> contains_path_traversal("..") + True + + Args: + value: A string that may be used as a filesystem path. + + Returns: + ``True`` if the path would escape its starting directory. 
+ """ + depth = 0 + for part in value.replace("\\", "/").split("/"): + if part == "..": + depth -= 1 + if depth < 0: + return True + elif part and part != ".": + depth += 1 + return False + + +def is_absolute_path(value: str) -> bool: + r"""Check whether a value is an absolute filesystem path. + + Absolute paths are dangerous when joined onto a base: in Python, + ``Path("/data") / "/etc/passwd"`` yields ``/etc/passwd`` — the + absolute right-hand side silently discards the base. + + Detects POSIX absolute (``/foo``), Windows drive-absolute + (``C:\foo``) and drive-relative (``C:foo``), and Windows + UNC/root-relative (``\\server\share``, ``\foo``). + + Example:: + + >>> is_absolute_path("relative/path") + False + >>> is_absolute_path("/etc/passwd") + True + >>> is_absolute_path("C:\\Windows") + True + >>> is_absolute_path("") + False + + Args: + value: A string that may be used as a filesystem path. + + Returns: + ``True`` if the path is absolute on any common platform. + """ + if not value: + return False + if value[0] in ("/", "\\"): + return True + # Windows drive form: C:, C:\, C:foo (drive-relative). A drive- + # relative right-hand side discards the join base when drives + # differ, so flag it even though PureWindowsPath.is_absolute() + # is False. This means single-letter-prefixed identifiers like + # "x:y" also match — opt out via ResourceSecurity(exempt_params=). + if len(value) >= 2 and value[1] == ":" and value[0] in string.ascii_letters: + return True + return False + + +def safe_join(base: str | Path, *parts: str) -> Path: + """Join path components onto a base, rejecting escapes. + + Resolves the joined path and verifies it remains within ``base``. + This is the **gold-standard** check: it catches ``..`` traversal, + absolute-path injection, and symlink escapes that the base-free + checks cannot. + + The symlink check is point-in-time: a directory swapped for a + symlink between this call and the caller's subsequent open would not + be re-checked. 
Handlers serving a tree that may be modified + concurrently should additionally open with ``O_NOFOLLOW`` or use + platform path-confinement primitives. + + Example:: + + >>> safe_join("/data/docs", "readme.txt") + PosixPath('/data/docs/readme.txt') + >>> safe_join("/data/docs", "../../../etc/passwd") + Traceback (most recent call last): + ... + PathEscapeError: ... + + Args: + base: The sandbox root. May be relative; it will be resolved. + parts: Path components to join. Each is checked for null bytes + and absolute form before joining. + + Returns: + The resolved path, verified to be within ``base`` at resolution + time. + + Raises: + PathEscapeError: If any part contains a null byte, any part is + absolute, or the resolved path is not contained within the + resolved base. + """ + base_resolved = Path(base).resolve() + + for part in parts: + # Null bytes pass through Path construction but fail at the + # syscall boundary with a cryptic error. Reject here so callers + # get a clear PathEscapeError instead. + if "\0" in part: + raise PathEscapeError(f"Path component contains a null byte; refusing to join onto {base_resolved}") + # Absolute parts would silently discard everything to the left + # in Path's / operator. + if is_absolute_path(part): + raise PathEscapeError(f"Path component {part!r} is absolute; refusing to join onto {base_resolved}") + + target = base_resolved.joinpath(*parts).resolve() + + if not target.is_relative_to(base_resolved): + raise PathEscapeError(f"Path {target} escapes base {base_resolved}") + + return target diff --git a/src/mcp/shared/uri_template.py b/src/mcp/shared/uri_template.py new file mode 100644 index 0000000000..dc57bfa757 --- /dev/null +++ b/src/mcp/shared/uri_template.py @@ -0,0 +1,1116 @@ +"""RFC 6570 URI Templates with bidirectional support. + +Provides both expansion (template + variables → URI) and matching +(URI → variables). 
RFC 6570 only specifies expansion; matching is the +inverse operation needed by MCP servers to route ``resources/read`` +requests to handlers. + +Supports Levels 1-3 fully, plus Level 4 explode modifier for path-like +operators (``{/var*}``, ``{.var*}``, ``{;var*}``). The Level 4 prefix +modifier (``{var:N}``) and query-explode (``{?var*}``) are not supported. + +Matching semantics +------------------ + +Matching is not specified by RFC 6570 (§1.4 explicitly defers to regex +languages). This implementation uses a two-ended scan that never +backtracks: match time is O(n·v) where n is URI length and v is the +number of template variables. Realistic templates have v < 10, making +this effectively linear; there is no input that produces +superpolynomial time. + +A template may contain **at most one multi-segment variable** — +``{+var}``, ``{#var}``, or an explode-modified variable (``{/var*}``, +``{.var*}``, ``{;var*}``). This variable greedily consumes whatever the +surrounding bounded variables and literals do not. Two such variables +in one template are inherently ambiguous (which one gets the extra +segment?) and are rejected at parse time. So are any two variables +adjacent with no literal between them — including a variable adjacent +to the multi-segment variable: the scan has nothing to anchor the +boundary on. Operators that emit their own lead character supply that +literal themselves, so ``{+path}{.ext}`` and ``{a}{.b}`` are fine +while ``{+path}{ext}`` and ``{a}{b}`` are not. + +Bounded variables before the multi-segment variable match **lazily** +(first occurrence of the following literal); those after match +**greedily** (last occurrence of the preceding literal). Templates +without a multi-segment variable match greedily throughout, identical +to regex semantics. + +Reserved expansion ``{+var}`` leaves ``?`` and ``#`` unencoded, but +the scan stops at those characters so ``{+path}{?q}`` can separate path +from query. 
A value containing a literal ``?`` or ``#`` expands fine +but will not round-trip through ``match()``. +""" + +from __future__ import annotations + +import re +from collections.abc import Mapping, Sequence +from dataclasses import dataclass, field +from typing import Literal, TypeAlias, cast +from urllib.parse import quote, unquote + +__all__ = [ + "DEFAULT_MAX_TEMPLATE_LENGTH", + "DEFAULT_MAX_VARIABLES", + "DEFAULT_MAX_URI_LENGTH", + "InvalidUriTemplate", + "Operator", + "UriTemplate", + "Variable", +] + +Operator = Literal["", "+", "#", ".", "/", ";", "?", "&"] + +_OPERATORS: frozenset[str] = frozenset({"+", "#", ".", "/", ";", "?", "&"}) + +# RFC 6570 §2.3: varname = varchar *(["."] varchar), varchar = ALPHA / DIGIT / "_" +# Dots appear only between varchar groups — not consecutive, not trailing. +# (Percent-encoded varchars are technically allowed but unseen in practice.) +_VARNAME_RE = re.compile(r"^[A-Za-z0-9_]+(?:\.[A-Za-z0-9_]+)*$") + +DEFAULT_MAX_TEMPLATE_LENGTH = 8_192 +DEFAULT_MAX_VARIABLES = 256 +DEFAULT_MAX_URI_LENGTH = 65_536 + +# RFC 3986 reserved characters, kept unencoded by {+var} and {#var}. 
+_RESERVED = ":/?#[]@!$&'()*+,;=" + + +@dataclass(frozen=True) +class _OperatorSpec: + """Expansion behavior for a single operator (RFC 6570 §3.2, Table in §A).""" + + prefix: str + """Leading character emitted before the first variable.""" + separator: str + """Character between variables (and between exploded list items).""" + named: bool + """Emit ``name=value`` pairs (query/path-param style) rather than bare values.""" + allow_reserved: bool + """Keep reserved characters unencoded ({+var}, {#var}).""" + ifemp: str + """Suffix after a named variable whose expanded value is empty (RFC §A): '' for ;, '=' for ?/&.""" + + +_OPERATOR_SPECS: dict[Operator, _OperatorSpec] = { + "": _OperatorSpec(prefix="", separator=",", named=False, allow_reserved=False, ifemp=""), + "+": _OperatorSpec(prefix="", separator=",", named=False, allow_reserved=True, ifemp=""), + "#": _OperatorSpec(prefix="#", separator=",", named=False, allow_reserved=True, ifemp=""), + ".": _OperatorSpec(prefix=".", separator=".", named=False, allow_reserved=False, ifemp=""), + "/": _OperatorSpec(prefix="/", separator="/", named=False, allow_reserved=False, ifemp=""), + ";": _OperatorSpec(prefix=";", separator=";", named=True, allow_reserved=False, ifemp=""), + "?": _OperatorSpec(prefix="?", separator="&", named=True, allow_reserved=False, ifemp="="), + "&": _OperatorSpec(prefix="&", separator="&", named=True, allow_reserved=False, ifemp="="), +} + +# Per-operator stop characters for the linear scan. A bounded variable's +# value ends at the first occurrence of any character in its stop set, +# mirroring the character-class boundaries a regex would use but without +# the backtracking. +_STOP_CHARS: dict[Operator, str] = { + "": "/?#&,", # simple: everything structural is pct-encoded + "+": "?#", # reserved: / allowed, stop at query/fragment + "#": "", # fragment: tail of URI, nothing stops it + ".": "./?#", # label: stop at next . 
+ "/": "/?#", # path segment: stop at next / + ";": ";/?#", # path-param value (may be empty: ;name) + "?": "&#", # query value (may be empty: ?name=) + "&": "&#", # query-cont value +} + + +class InvalidUriTemplate(ValueError): + """Raised when a URI template string is malformed or unsupported. + + Attributes: + template: The template string that failed to parse. + position: Character offset where the error was detected, or None + if the error is not tied to a specific position. + """ + + def __init__(self, message: str, *, template: str, position: int | None = None) -> None: + super().__init__(message) + self.template = template + self.position = position + + +@dataclass(frozen=True) +class Variable: + """A single variable within a URI template expression.""" + + name: str + operator: Operator + explode: bool = False + + +@dataclass +class _Expression: + """A parsed ``{...}`` expression: one operator, one or more variables.""" + + operator: Operator + variables: list[Variable] + + +_Part = str | _Expression + + +@dataclass(frozen=True) +class _Lit: + """A literal run in the flattened match-atom sequence.""" + + text: str + + +@dataclass(frozen=True) +class _Cap: + """A single-variable capture in the flattened match-atom sequence. + + ``ifemp`` marks the ``;`` operator's optional-equals quirk: ``{;id}`` + expands to ``;id=value`` or bare ``;id`` when the value is empty, so + the scan must accept both forms. + """ + + var: Variable + ifemp: bool = False + + +_Atom: TypeAlias = _Lit | _Cap + + +def _is_greedy(var: Variable) -> bool: + """Return True if this variable can span multiple path segments. + + Reserved/fragment expansion and explode variables are the only + constructs whose match range is not bounded by a single structural + delimiter. A template may contain at most one such variable. 
+ """ + return var.explode or var.operator in ("+", "#") + + +def _is_str_sequence(value: object) -> bool: + """Check if value is a non-string sequence whose items are all strings.""" + if isinstance(value, str) or not isinstance(value, Sequence): + return False + seq = cast(Sequence[object], value) + return all(isinstance(item, str) for item in seq) + + +_PCT_TRIPLET_RE = re.compile(r"%[0-9A-Fa-f]{2}") + + +def _encode(value: str, *, allow_reserved: bool) -> str: + """Percent-encode a value per RFC 6570 §3.2.1. + + Simple expansion encodes everything except unreserved characters. + Reserved expansion (``{+var}``, ``{#var}``) additionally keeps + RFC 3986 reserved characters intact and passes through existing + ``%XX`` pct-triplets unchanged (RFC 6570 §3.2.3). A bare ``%`` not + followed by two hex digits is still encoded to ``%25``. + """ + if not allow_reserved: + return quote(value, safe="") + + # Reserved expansion: walk the string, pass through triplets as-is, + # quote the gaps between them. A bare % with no triplet lands in a + # gap and gets encoded normally. + out: list[str] = [] + last = 0 + for m in _PCT_TRIPLET_RE.finditer(value): + out.append(quote(value[last : m.start()], safe=_RESERVED)) + out.append(m.group()) + last = m.end() + out.append(quote(value[last:], safe=_RESERVED)) + return "".join(out) + + +def _expand_expression(expr: _Expression, variables: Mapping[str, str | Sequence[str]]) -> str: + """Expand a single ``{...}`` expression into its URI fragment. + + Walks the expression's variables, encoding and joining defined ones + according to the operator's spec. Undefined variables are skipped + (RFC 6570 §2.3); if all are undefined, the expression contributes + nothing (no prefix is emitted). + """ + spec = _OPERATOR_SPECS[expr.operator] + rendered: list[str] = [] + + for var in expr.variables: + if var.name not in variables: + # Undefined: skip entirely, no placeholder. 
+ continue + + value = variables[var.name] + + # Explicit type guard: reject non-str scalars with a clear message + # rather than a confusing "not iterable" from the sequence branch. + if not isinstance(value, str) and not _is_str_sequence(value): + raise TypeError(f"Variable {var.name!r} must be str or a sequence of str, got {type(value).__name__}") + + if isinstance(value, str): + encoded = _encode(value, allow_reserved=spec.allow_reserved) + if spec.named: + rendered.append(f"{var.name}{spec.ifemp}" if value == "" else f"{var.name}={encoded}") + else: + rendered.append(encoded) + else: + # Sequence value. + items = [_encode(v, allow_reserved=spec.allow_reserved) for v in value] + if not items: + continue + if var.explode: + # Each item gets the operator's separator; named ops repeat the key. + if spec.named: + rendered.append( + spec.separator.join(f"{var.name}{spec.ifemp}" if v == "" else f"{var.name}={v}" for v in items) + ) + else: + rendered.append(spec.separator.join(items)) + else: + # Non-explode: comma-join into a single value, then apply + # ifemp to the joined result (RFC §3.2.1: behaves as if the + # value were the joined string). + joined = ",".join(items) + if spec.named: + rendered.append(f"{var.name}{spec.ifemp}" if joined == "" else f"{var.name}={joined}") + else: + rendered.append(joined) + + if not rendered: + return "" + return spec.prefix + spec.separator.join(rendered) + + +@dataclass(frozen=True) +class UriTemplate: + """A parsed RFC 6570 URI template. + + Construct via :meth:`parse`. Instances are immutable and hashable; + equality is based on the template string alone. 
+ """ + + template: str + _parts: list[_Part] = field(repr=False, compare=False) + _variables: list[Variable] = field(repr=False, compare=False) + _prefix: list[_Atom] = field(repr=False, compare=False) + _greedy: Variable | None = field(repr=False, compare=False) + _suffix: list[_Atom] = field(repr=False, compare=False) + _query_variables: list[Variable] = field(repr=False, compare=False) + + @staticmethod + def is_template(value: str) -> bool: + """Check whether a string contains URI template expressions. + + A cheap heuristic for distinguishing concrete URIs from templates + without the cost of full parsing. Returns ``True`` if the string + contains at least one ``{...}`` pair. + + Example:: + + >>> UriTemplate.is_template("file://docs/{name}") + True + >>> UriTemplate.is_template("file://docs/readme.txt") + False + + Note: + This does not validate the template. A ``True`` result does + not guarantee :meth:`parse` will succeed. + """ + open_i = value.find("{") + return open_i != -1 and value.find("}", open_i) != -1 + + @classmethod + def parse( + cls, + template: str, + *, + max_length: int = DEFAULT_MAX_TEMPLATE_LENGTH, + max_variables: int = DEFAULT_MAX_VARIABLES, + ) -> UriTemplate: + """Parse a URI template string. + + Args: + template: An RFC 6570 URI template. + max_length: Maximum permitted length of the template string. + Guards against resource exhaustion. + max_variables: Maximum number of variables permitted across + all expressions. Counting variables rather than + ``{...}`` expressions closes the gap where a single + ``{v0,v1,...,vN}`` expression packs arbitrarily many + variables under one expression count. + + Raises: + InvalidUriTemplate: If the template is malformed, exceeds the + size limits, or uses unsupported RFC 6570 features. 
+ """ + if len(template) > max_length: + raise InvalidUriTemplate( + f"Template exceeds maximum length of {max_length}", + template=template, + ) + + parts, variables = _parse(template, max_variables=max_variables) + + # Trailing {?...}/{&...} expressions are split off and matched as + # a query string (order-agnostic, partial, extras ignored) rather + # than via the linear scan. + path_parts, query_vars = _split_query_tail(parts) + atoms = _flatten(path_parts) + prefix, greedy, suffix = _partition_greedy(atoms, template) + + return cls( + template=template, + _parts=parts, + _variables=variables, + _prefix=prefix, + _greedy=greedy, + _suffix=suffix, + _query_variables=query_vars, + ) + + @property + def variables(self) -> list[Variable]: + """All variables in the template, in order of appearance.""" + return list(self._variables) + + @property + def variable_names(self) -> list[str]: + """All variable names in the template, in order of appearance.""" + return [v.name for v in self._variables] + + @property + def query_variable_names(self) -> frozenset[str]: + """Names of variables that :meth:`match` treats as optional query parameters. + + These are the variables in a trailing run of ``{?...}``/``{&...}`` + expressions, which are matched leniently: a URI that omits some + (or all) of them still matches, and the omitted names are simply + absent from the result. Any value bound to such a name therefore + needs a fallback for the omitted case. + + Every other variable is bound on every successful :meth:`match` + (possibly to an empty string) and is *not* in this set. That + includes a ``{&...}`` expression with no preceding ``{?...}``: it + never emits the ``?`` the lenient query split keys on, so it is + matched strictly. + """ + return frozenset(v.name for v in self._query_variables) + + def expand(self, variables: Mapping[str, str | Sequence[str]]) -> str: + """Expand the template by substituting variable values. 
+ + String values are percent-encoded according to their operator: + simple ``{var}`` encodes reserved characters; ``{+var}`` and + ``{#var}`` leave them intact. Sequence values are joined with + commas for non-explode variables, or with the operator's + separator for explode variables. + + Example:: + + >>> t = UriTemplate.parse("file://docs/{name}") + >>> t.expand({"name": "hello world.txt"}) + 'file://docs/hello%20world.txt' + + >>> t = UriTemplate.parse("file://docs/{+path}") + >>> t.expand({"path": "src/main.py"}) + 'file://docs/src/main.py' + + >>> t = UriTemplate.parse("/search{?q,lang}") + >>> t.expand({"q": "mcp", "lang": "en"}) + '/search?q=mcp&lang=en' + + >>> t = UriTemplate.parse("/files{/path*}") + >>> t.expand({"path": ["a", "b", "c"]}) + '/files/a/b/c' + + Args: + variables: Values for each template variable. Keys must be + strings; values must be ``str`` or a sequence of ``str``. + + Returns: + The expanded URI string. + + Note: + Per RFC 6570, variables absent from the mapping are + **silently omitted**. This is the correct behavior for + optional query parameters (``{?page}`` with no page yields + no ``?page=``), but for required path segments it produces + a structurally incomplete URI. If you need all variables + present, validate before calling:: + + missing = set(t.variable_names) - variables.keys() + if missing: + raise ValueError(f"Missing: {missing}") + + Raises: + TypeError: If a value is neither ``str`` nor an iterable of + ``str``. Non-string scalars (``int``, ``None``) are not + coerced. + """ + out: list[str] = [] + for part in self._parts: + if isinstance(part, str): + out.append(part) + else: + out.append(_expand_expression(part, variables)) + return "".join(out) + + def match(self, uri: str, *, max_uri_length: int = DEFAULT_MAX_URI_LENGTH) -> dict[str, str | list[str]] | None: + """Match a concrete URI against this template and extract variables. + + This is the inverse of :meth:`expand`. 
The URI is matched via a + linear scan of the template and captured values are + percent-decoded. The round-trip ``match(expand({k: v})) == {k: v}`` + holds when ``v`` does not contain its operator's separator + unencoded: ``{.ext}`` with ``ext="tar.gz"`` expands to + ``.tar.gz`` but does not match — the scan stops ``ext`` at the + first ``.`` and the trailing ``.gz`` has nothing to consume it. + RFC 6570 §1.4 notes this is an inherent reversal limitation. + + Matching is structural at the URI level only: a simple ``{name}`` + will not match across a literal ``/`` in the URI (the scan stops + there), but a percent-encoded ``%2F`` that decodes to ``/`` is + accepted as part of the value. Path-safety validation belongs at + a higher layer; see :mod:`mcp.shared.path_security`. + + Example:: + + >>> t = UriTemplate.parse("file://docs/{name}") + >>> t.match("file://docs/readme.txt") + {'name': 'readme.txt'} + >>> t.match("file://docs/hello%20world.txt") + {'name': 'hello world.txt'} + + >>> t = UriTemplate.parse("file://docs/{+path}") + >>> t.match("file://docs/src/main.py") + {'path': 'src/main.py'} + + >>> t = UriTemplate.parse("/files{/path*}") + >>> t.match("/files/a/b/c") + {'path': ['a', 'b', 'c']} + + **Query parameters** (``{?q,lang}`` at the end of a template) + are matched leniently: order-agnostic, partial, and unrecognized + params are ignored. Absent params are omitted from the result so + downstream function defaults can apply:: + + >>> t = UriTemplate.parse("logs://{service}{?since,level}") + >>> t.match("logs://api") + {'service': 'api'} + >>> t.match("logs://api?level=error") + {'service': 'api', 'level': 'error'} + >>> t.match("logs://api?level=error&since=5m&utm=x") + {'service': 'api', 'since': '5m', 'level': 'error'} + + Args: + uri: A concrete URI string. + max_uri_length: Maximum permitted length of the input URI. + Oversized inputs return ``None`` without scanning, + guarding against resource exhaustion. 
+ + Returns: + A mapping from variable names to decoded values (``str`` for + scalar variables, ``list[str]`` for explode variables), or + ``None`` if the URI does not match the template or exceeds + ``max_uri_length``. + """ + if len(uri) > max_uri_length: + return None + + if self._query_variables: + # Two-phase: scan matches the path, the query is split and + # decoded manually. Query params may be partial, reordered, + # or include extras; absent params stay absent so downstream + # defaults can apply. Fragment is stripped first since the + # template's {?...} tail never describes a fragment. + before_fragment, _, _ = uri.partition("#") + path, _, query = before_fragment.partition("?") + result = self._scan(path) + if result is None: + return None + if query: + parsed = _parse_query(query) + for var in self._query_variables: + if var.name in parsed: + result[var.name] = parsed[var.name] + return result + + return self._scan(uri) + + def _scan(self, uri: str) -> dict[str, str | list[str]] | None: + """Run the two-ended linear scan against the path portion of a URI.""" + n = len(uri) + + if self._greedy is None: + # No greedy var: the suffix IS the whole template, scanned + # right-to-left and anchored so atoms[0] matches at position 0. + suffix = _scan_suffix(self._suffix, uri, n, anchored=True) + if suffix is None: + return None + suffix_result, suffix_start = suffix + return suffix_result if suffix_start == 0 else None + + # Greedy var present. The parser rejects a capture adjacent to + # the greedy slot, so a non-empty suffix begins with a _Lit whose + # rfind-derived anchor does not depend on how far the prefix + # scans. Scan the suffix first, then give the prefix that exact + # position as its ceiling so it cannot consume past the anchor. 
+ suffix = _scan_suffix(self._suffix, uri, n, anchored=False) + if suffix is None: + return None + suffix_result, suffix_start = suffix + prefix = _scan_prefix(self._prefix, uri, 0, suffix_start) + if prefix is None: + return None + prefix_result, prefix_end = prefix + + # Prefix consumed [0, prefix_end); suffix consumed [suffix_start, n); + # the greedy var takes the gap. The prefix scan is bounded by + # suffix_start, so this holds by construction; guard explicitly + # rather than asserting so a future regression surfaces as a + # non-match, not an exception. + if suffix_start < prefix_end: + return None # pragma: no cover - unreachable while bounds hold + middle = uri[prefix_end:suffix_start] + greedy_value = _extract_greedy(self._greedy, middle) + if greedy_value is None: + return None + + return {**prefix_result, self._greedy.name: greedy_value, **suffix_result} + + def __str__(self) -> str: + return self.template + + +def _parse_query(query: str) -> dict[str, str]: + """Parse a query string into a name→value mapping. + + Unlike ``urllib.parse.parse_qs``, this follows RFC 3986 semantics: + ``+`` is a literal sub-delim, not a space. Form-urlencoding treats + ``+`` as space for HTML form submissions, but RFC 6570 and MCP + resource URIs follow RFC 3986 where only ``%20`` encodes a space. + + Parameter names are **not** percent-decoded. RFC 6570 expansion + never encodes variable names, so a legitimate match will always + have the name in literal form. Decoding names would let + ``%74oken=evil&token=real`` shadow the real ``token`` parameter + via first-wins. + + Duplicate keys keep the first value. Pairs without ``=`` are + treated as empty-valued. 
+ """ + result: dict[str, str] = {} + for pair in query.split("&"): + name, _, value = pair.partition("=") + if name and name not in result: + result[name] = unquote(value) + return result + + +def _extract_greedy(var: Variable, raw: str) -> str | list[str] | None: + """Decode the greedy variable's isolated middle span. + + For scalar greedy (``{+var}``, ``{#var}``) this is a stop-char + validation and a single ``unquote``. For explode variables the span + is a run of separator-delimited segments (``/a/b/c`` or + ``;keys=a;keys=b``) that is split, validated, and decoded per item. + """ + spec = _OPERATOR_SPECS[var.operator] + stops = _STOP_CHARS[var.operator] + + if not var.explode: + if any(c in stops for c in raw): + return None + return unquote(raw) + + sep = spec.separator + if not raw: + return [] + # A non-empty explode span must begin with the separator: {/a*} + # expands to "/x/y", never "x/y". The scan does not consume the + # separator itself, so it must be the first character here. + if raw[0] != sep: + return None + # Segments must not contain the operator's non-separator stop + # characters (e.g. {/path*} segments may contain neither ? nor #). + body_stops = set(stops) - {sep} + if any(c in body_stops for c in raw): + return None + + segments: list[str] = [] + prefix = f"{var.name}=" + # split()[0] is always "" because raw starts with the separator; + # subsequent empties are legitimate values ({/path*} with + # ["a","","c"] expands to /a//c). + for seg in raw.split(sep)[1:]: + if spec.named: + # Named explode emits name=value per item (or bare name + # under ; with empty value). Validate the name and strip + # the prefix before decoding. + if seg.startswith(prefix): + seg = seg[len(prefix) :] + elif seg == var.name: + seg = "" + else: + return None + segments.append(unquote(seg)) + return segments + + +def _split_query_tail(parts: list[_Part]) -> tuple[list[_Part], list[Variable]]: + """Separate trailing ``?``/``&`` expressions from the path portion. 
+ + Lenient query matching (order-agnostic, partial, ignores extras) + applies when a template ends with one or more consecutive ``?``/``&`` + expressions and the preceding path portion contains no literal + ``?``. If the path has a literal ``?`` (e.g., ``?fixed=1{&page}``), + the URI's ``?`` split won't align with the template's expression + boundary, so the strict scan is used instead. + + Returns: + A pair ``(path_parts, query_vars)``. If lenient matching does + not apply, ``query_vars`` is empty and ``path_parts`` is the + full input. + """ + split = len(parts) + for i in range(len(parts) - 1, -1, -1): + part = parts[i] + if isinstance(part, _Expression) and part.operator in ("?", "&"): + split = i + else: + break + + if split == len(parts): + return parts, [] + + # The tail must start with a {?...} expression so that expand() + # emits a ? the URI can split on. A standalone {&page} expands + # with an & prefix, which partition("?") won't find. + first = parts[split] + assert isinstance(first, _Expression) + if first.operator != "?": + return parts, [] + + # If the path portion contains a literal ?/# or a {?...}/{#...} + # expression, lenient matching's partition("#") then partition("?") + # would strip content the path scan expects to see. Fall back to + # the strict scan. + for part in parts[:split]: + if isinstance(part, str): + if "?" in part or "#" in part: + return parts, [] + elif part.operator in ("?", "#"): + return parts, [] + + query_vars: list[Variable] = [] + for part in parts[split:]: + assert isinstance(part, _Expression) + query_vars.extend(part.variables) + + return parts[:split], query_vars + + +def _parse(template: str, *, max_variables: int) -> tuple[list[_Part], list[Variable]]: + """Split a template into an ordered sequence of literals and expressions. + + Walks the string, alternating between collecting literal runs and + parsing ``{...}`` expressions. 
The resulting ``parts`` sequence + preserves positional interleaving so ``match()`` and ``expand()`` can + walk it in order. + + Raises: + InvalidUriTemplate: On unclosed braces, too many expressions, or + any error surfaced by :func:`_parse_expression`. + """ + parts: list[_Part] = [] + variables: list[Variable] = [] + i = 0 + n = len(template) + + while i < n: + # Find the next expression opener from the current cursor. + brace = template.find("{", i) + + if brace == -1: + # No more expressions; everything left is a trailing literal. + parts.append(template[i:]) + break + + if brace > i: + # Literal text between cursor and the brace. + parts.append(template[i:brace]) + + end = template.find("}", brace) + if end == -1: + raise InvalidUriTemplate( + f"Unclosed expression at position {brace}", + template=template, + position=brace, + ) + + # Delegate body (between braces, exclusive) to the expression parser. + expr = _parse_expression(template, template[brace + 1 : end], brace) + parts.append(expr) + variables.extend(expr.variables) + + if len(variables) > max_variables: + raise InvalidUriTemplate( + f"Template exceeds maximum of {max_variables} variables", + template=template, + ) + + # Advance past the closing brace. + i = end + 1 + + _check_duplicate_variables(template, variables) + _check_single_query_expression(template, parts) + return parts, variables + + +def _parse_expression(template: str, body: str, pos: int) -> _Expression: + """Parse the body of a single ``{...}`` expression. + + The body is everything between the braces. It consists of an optional + leading operator character followed by one or more comma-separated + variable specifiers. Each specifier is a name with an optional + trailing ``*`` (explode modifier). + + Args: + template: The full template string, for error reporting. + body: The expression body, braces excluded. + pos: Character offset of the opening brace, for error reporting. 
+ + Raises: + InvalidUriTemplate: On empty body, invalid variable names, or + unsupported modifiers. + """ + if not body: + raise InvalidUriTemplate(f"Empty expression at position {pos}", template=template, position=pos) + + # Peel off the operator, if any. Membership check justifies the cast. + operator: Operator = "" + if body[0] in _OPERATORS: + operator = cast(Operator, body[0]) + body = body[1:] + if not body: + raise InvalidUriTemplate( + f"Expression has operator but no variables at position {pos}", + template=template, + position=pos, + ) + + # Remaining body is comma-separated variable specs: name[*] + variables: list[Variable] = [] + for spec in body.split(","): + if ":" in spec: + raise InvalidUriTemplate( + f"Prefix modifier {{var:N}} is not supported (in {spec!r} at position {pos})", + template=template, + position=pos, + ) + + explode = spec.endswith("*") + name = spec[:-1] if explode else spec + + if not _VARNAME_RE.match(name): + raise InvalidUriTemplate( + f"Invalid variable name {name!r} at position {pos}", + template=template, + position=pos, + ) + + # Explode only makes sense for operators that repeat a separator. + # Simple/reserved/fragment have no per-item separator; query-explode + # needs order-agnostic dict matching which we don't support yet. + if explode and operator in ("", "+", "#", "?", "&"): + raise InvalidUriTemplate( + f"Explode modifier on {{{operator}{name}*}} is not supported for matching", + template=template, + position=pos, + ) + + variables.append(Variable(name=name, operator=operator, explode=explode)) + + return _Expression(operator=operator, variables=variables) + + +def _check_duplicate_variables(template: str, variables: list[Variable]) -> None: + """Reject templates that use the same variable name more than once. + + RFC 6570 requires repeated variables to expand to the same value, + which would require backreference matching with potentially + exponential cost. 
Rather than silently returning only the last + captured value, we reject at parse time. + + Raises: + InvalidUriTemplate: If any variable name appears more than once. + """ + seen: set[str] = set() + for var in variables: + if var.name in seen: + raise InvalidUriTemplate( + f"Variable {var.name!r} appears more than once; repeated variables are not supported", + template=template, + ) + seen.add(var.name) + + +def _check_single_query_expression(template: str, parts: list[_Part]) -> None: + """Reject templates with more than one ``{?...}`` expression. + + The ``?`` operator emits a leading ``?``, so two such expressions + expand to a URI with two ``?`` characters — malformed per RFC 3986 + §3.4. Use ``{?a,b}`` or ``{?a}{&b}`` for multiple query parameters. + """ + seen = False + for part in parts: + if isinstance(part, _Expression) and part.operator == "?": + if seen: + raise InvalidUriTemplate( + "Template contains more than one {?...} expression; " + "use {?a,b} or {?a}{&b} for multiple query parameters", + template=template, + ) + seen = True + + +def _flatten(parts: list[_Part]) -> list[_Atom]: + """Lower expressions into a flat sequence of literals and single-variable captures. + + Operator prefixes and separators become explicit ``_Lit`` atoms so + the scan only ever sees two atom kinds. Adjacent literals are + coalesced so that anchor-finding (``find``/``rfind``) operates on + the longest possible literal, reducing false matches. + + Explode variables emit no lead literal: the explode capture + includes its own separator-prefixed repetitions (``{/a*}`` → + ``/x/y/z``, not ``/`` then ``x/y/z``). 
+ """ + atoms: list[_Atom] = [] + + def push_lit(text: str) -> None: + if not text: + return + if atoms and isinstance(atoms[-1], _Lit): + atoms[-1] = _Lit(atoms[-1].text + text) + else: + atoms.append(_Lit(text)) + + for part in parts: + if isinstance(part, str): + push_lit(part) + continue + spec = _OPERATOR_SPECS[part.operator] + for i, var in enumerate(part.variables): + lead = spec.prefix if i == 0 else spec.separator + if var.explode: + atoms.append(_Cap(var)) + elif spec.named: + # ; uses ifemp (bare name when empty); ? and & always + # emit name= so the equals is part of the literal. + if part.operator == ";": + push_lit(f"{lead}{var.name}") + atoms.append(_Cap(var, ifemp=True)) + else: + push_lit(f"{lead}{var.name}=") + atoms.append(_Cap(var)) + else: + push_lit(lead) + atoms.append(_Cap(var)) + return atoms + + +def _partition_greedy(atoms: list[_Atom], template: str) -> tuple[list[_Atom], Variable | None, list[_Atom]]: + """Split atoms at the single greedy variable, if any. + + Returns ``(prefix, greedy_var, suffix)``. If there is no greedy + variable the entire atom list is returned as the suffix so that + the right-to-left scan (which matches regex-greedy semantics) + handles it. + + Raises: + InvalidUriTemplate: If two variables are adjacent with no + literal between them — whether or not one is the + multi-segment variable, the scan has nothing to anchor the + boundary on — or if more than one multi-segment variable + is present (two are inherently ambiguous: there is no + principled way to decide which one absorbs an extra + segment). + """ + greedy_idx: int | None = None + prev: _Atom | None = None + for i, atom in enumerate(atoms): + if isinstance(atom, _Cap): + if isinstance(prev, _Cap): + raise InvalidUriTemplate( + f"Variables {prev.var.name!r} and {atom.var.name!r} are adjacent " + "with no literal separator; matching cannot determine where one " + "ends and the other begins. 
Add a literal between them or use a " + "single variable.", + template=template, + ) + if _is_greedy(atom.var): + if greedy_idx is not None: + raise InvalidUriTemplate( + "Template contains more than one multi-segment variable " + "({+var}, {#var}, or explode modifier); matching would be ambiguous", + template=template, + ) + greedy_idx = i + prev = atom + if greedy_idx is None: + return [], None, atoms + greedy = atoms[greedy_idx] + assert isinstance(greedy, _Cap) + return atoms[:greedy_idx], greedy.var, atoms[greedy_idx + 1 :] + + +def _scan_suffix( + atoms: Sequence[_Atom], uri: str, end: int, *, anchored: bool +) -> tuple[dict[str, str | list[str]], int] | None: + """Scan atoms right-to-left from ``end``, returning captures and start position. + + Each bounded variable takes the minimum span that lets its + preceding literal match (found via ``rfind``), which makes the + *first* variable in template order greedy — identical to Python + regex semantics for a sequence of greedy groups. + + When ``anchored`` is true the atom sequence is the entire template + (no greedy variable), so ``atoms[0]`` must match at URI position 0 + rather than at its rightmost occurrence. + """ + result: dict[str, str | list[str]] = {} + pos = end + i = len(atoms) - 1 + while i >= 0: + atom = atoms[i] + if isinstance(atom, _Lit): + n = len(atom.text) + if pos < n or uri[pos - n : pos] != atom.text: + return None + pos -= n + i -= 1 + continue + + var = atom.var + stops = _STOP_CHARS[var.operator] + prev = atoms[i - 1] if i > 0 else None + + if atom.ifemp: + # ;name or ;name=value. The preceding _Lit is ";name". + # Try empty first: if the lit ends at pos the value is + # absent (RFC ifemp). Otherwise require =value. 
+ assert isinstance(prev, _Lit) + if uri.endswith(prev.text, 0, pos): + result[var.name] = "" + i -= 1 + continue + earliest = pos + while earliest > 0 and uri[earliest - 1] not in stops: + earliest -= 1 + eq = uri.find("=", earliest, pos) + if eq == -1: + return None + result[var.name] = unquote(uri[eq + 1 : pos]) + pos = eq + i -= 1 + continue + + # Earliest valid start: the var cannot extend left past any + # stop-char, so scan backward to find that boundary. + earliest = pos + while earliest > 0 and uri[earliest - 1] not in stops: + earliest -= 1 + + if prev is None: + start = earliest + else: + # prev is a _Lit: the parser rejects two adjacent captures, + # so the only possible neighbour kind is a literal. + assert isinstance(prev, _Lit) + if anchored and i - 1 == 0: + # First atom of the whole template: positionally fixed at + # 0, not rightmost occurrence. rfind would land inside the + # value when the literal repeats there (e.g. "prefix-{id}" + # against "prefix-prefix-123"). + start = len(prev.text) + if start < earliest or start > pos: + return None + else: + # Rightmost occurrence of the preceding literal whose end + # falls within the var's valid range. + idx = uri.rfind(prev.text, 0, pos) + if idx == -1 or idx + len(prev.text) < earliest: + return None + start = idx + len(prev.text) + + result[var.name] = unquote(uri[start:pos]) + pos = start + i -= 1 + return result, pos + + +def _scan_prefix( + atoms: Sequence[_Atom], uri: str, start: int, limit: int +) -> tuple[dict[str, str | list[str]], int] | None: + """Scan atoms left-to-right from ``start``, not exceeding ``limit``. + + Each bounded variable takes the minimum span that lets its + following literal match (found via ``find``), leaving the + greedy variable as much of the URI as possible. 
+ """ + result: dict[str, str | list[str]] = {} + pos = start + for i, atom in enumerate(atoms): + if isinstance(atom, _Lit): + end = pos + len(atom.text) + if end > limit or uri[pos:end] != atom.text: + return None + pos = end + continue + + var = atom.var + stops = _STOP_CHARS[var.operator] + # Every capture here is followed by a literal: the parser rejects + # two adjacent captures, and a capture at the END of the prefix + # would be adjacent to the greedy variable. + nxt = atoms[i + 1] + assert isinstance(nxt, _Lit) + + if atom.ifemp: + # RFC §3.2.7 ifemp: ;name=val for non-empty, bare ;name for + # empty. Decide which form is present without falling through + # to the stop-char scan when the value is empty. + if uri.startswith(nxt.text, pos): + # Following literal begins immediately: value is empty. + # Checked before '=' so a literal that itself starts + # with '=' is not mistaken for the ifemp separator. + result[var.name] = "" + continue + if pos < limit and uri[pos] == "=": + pos += 1 # value follows; fall through to the scan + else: + # The following literal does not start here and there is + # no '=': the URI's name continued past the template's + # (e.g. ;keys vs ;key) — no parse. + return None + + # Latest valid end: the var stops at the first stop-char or + # the scan limit, whichever comes first. + latest = pos + while latest < limit and uri[latest] not in stops: + latest += 1 + + # First occurrence of the following literal: the capture takes + # the minimum span, leaving the greedy variable as much of the + # URI as possible. The search window's upper bound already + # forces any hit to start at or before ``latest``, so the var + # never extends past a stop-char. 
+ end = uri.find(nxt.text, pos, latest + len(nxt.text)) + if end == -1: + return None + + result[var.name] = unquote(uri[pos:end]) + pos = end + return result, pos diff --git a/tests/docs_src/test_uri_templates.py b/tests/docs_src/test_uri_templates.py new file mode 100644 index 0000000000..b90e099c19 --- /dev/null +++ b/tests/docs_src/test_uri_templates.py @@ -0,0 +1,214 @@ +"""`docs/advanced/uri-templates.md`: every claim the page makes, proved against the real SDK.""" + +from pathlib import Path + +import pytest +from inline_snapshot import snapshot +from mcp_types import INVALID_PARAMS, ErrorData, ResourceTemplate, TextResourceContents + +from docs_src.uri_templates import tutorial001, tutorial002, tutorial003, tutorial004, tutorial005 +from mcp import Client, MCPError +from mcp.server import MCPServer +from mcp.shared.path_security import PathEscapeError, contains_path_traversal, safe_join +from mcp.shared.uri_template import InvalidUriTemplate, UriTemplate + +# See test_index.py for why this is a per-module mark and not a conftest hook. 
+pytestmark = [pytest.mark.anyio, pytest.mark.filterwarnings("error::mcp.MCPDeprecationWarning")] + + +async def test_simple_expansion_maps_the_segment_to_the_argument() -> None: + """tutorial001: `books://{isbn}` reads `books://978-...` and the matched string is the argument.""" + async with Client(tutorial001.mcp) as client: + (content,) = (await client.read_resource("books://978-0441172719")).contents + assert isinstance(content, TextResourceContents) + assert content.text == snapshot('{\n "title": "Dune",\n "author": "Frank Herbert"\n}') + + +async def test_an_int_parameter_is_converted_from_the_uri_string() -> None: + """tutorial001: `order_id: int` receives `12345`, not `"12345"`, so `order_id + 1` is `12346`.""" + async with Client(tutorial001.mcp) as client: + (content,) = (await client.read_resource("orders://12345")).contents + assert isinstance(content, TextResourceContents) + assert content.text == snapshot('{\n "order_id": 12345,\n "next_order": 12346,\n "status": "shipped"\n}') + + +async def test_plus_keeps_the_slashes_in_the_captured_value() -> None: + """tutorial001: `{+path}` matches `printing/setup.md` as one value; a plain `{path}` would not.""" + async with Client(tutorial001.mcp) as client: + (content,) = (await client.read_resource("manuals://printing/setup.md")).contents + assert isinstance(content, TextResourceContents) + assert content.text == "# Printer setup\n\nLoad paper, then power on." + + +async def test_omitted_query_params_fall_through_to_function_defaults() -> None: + """tutorial001: `{?limit,sort}` is lenient. 
No query string means `limit=10, sort="newest"`.""" + async with Client(tutorial001.mcp) as client: + (content,) = (await client.read_resource("reviews://978-0441172719")).contents + assert isinstance(content, TextResourceContents) + assert content.text == "10 newest reviews of Dune" + + +async def test_a_query_param_overrides_only_the_default_it_names() -> None: + """tutorial001: `?sort=top` sets `sort` and leaves `limit` at its default.""" + async with Client(tutorial001.mcp) as client: + (content,) = (await client.read_resource("reviews://978-0441172719?sort=top")).contents + assert isinstance(content, TextResourceContents) + assert content.text == "10 top reviews of Dune" + + +async def test_exploded_path_arrives_as_a_list_of_segments() -> None: + """tutorial001: `{/path*}` splits `/fiction/sci-fi` into `["fiction", "sci-fi"]`.""" + async with Client(tutorial001.mcp) as client: + (content,) = (await client.read_resource("shelves://browse/fiction/sci-fi")).contents + assert isinstance(content, TextResourceContents) + assert content.text == "catalog > fiction > sci-fi" + + +def test_two_adjacent_variables_are_rejected_at_parse_time() -> None: + """'What the parser rejects': nothing separates `path` from `ext`, so the template is refused.""" + with pytest.raises(InvalidUriTemplate) as exc_info: + UriTemplate.parse("manuals://{+path}{ext}") + assert str(exc_info.value) == snapshot( + "Variables 'path' and 'ext' are adjacent with no literal separator; matching cannot " + "determine where one ends and the other begins. Add a literal between them or use a single variable." 
+ ) + + +def test_a_self_delimiting_operator_supplies_the_separator() -> None: + """'What the parser rejects': `{.ext}` contributes the `.` itself, so `{+path}{.ext}` is accepted.""" + template = UriTemplate.parse("manuals://{+path}{.ext}") + assert template.match("manuals://printing/setup.md") == {"path": "printing/setup", "ext": "md"} + + +def test_a_second_multi_segment_variable_is_rejected_at_parse_time() -> None: + """'What the parser rejects': two `{+...}` are ambiguous about which one absorbs an extra segment.""" + with pytest.raises(InvalidUriTemplate) as exc_info: + UriTemplate.parse("copy://{+source}/to/{+destination}") + assert str(exc_info.value) == snapshot( + "Template contains more than one multi-segment variable ({+var}, {#var}, or explode modifier); " + "matching would be ambiguous" + ) + + +def test_a_query_parameter_without_a_python_default_is_rejected_at_decoration_time() -> None: + """'What the parser rejects': a client may omit `{?limit}`, so the bound parameter must declare a default.""" + strict = MCPServer("Bookshop") + with pytest.raises(ValueError) as exc_info: + + @strict.resource("reviews://{isbn}{?limit}") + def list_reviews(isbn: str, limit: int) -> None: + """Reviews of a book.""" + + assert str(exc_info.value) == snapshot( + "Resource 'reviews://{isbn}{?limit}': query parameter(s) ['limit'] have no default value. " + "A client may omit a {?...}/{&...} query parameter, so the matching handler parameter " + "must declare a default." + ) + + +async def test_traversal_is_rejected_before_the_handler_runs() -> None: + """The `!!! 
check`: `../` triggers `-32602` "Unknown resource" and `read_manual` is never called.""" + async with Client(tutorial001.mcp) as client: + with pytest.raises(MCPError) as exc_info: + await client.read_resource("manuals://../etc/passwd") + assert exc_info.value.error == snapshot( + ErrorData( + code=INVALID_PARAMS, + message="Unknown resource: manuals://../etc/passwd", + data={"uri": "manuals://../etc/passwd"}, + ) + ) + + +def test_dotdot_is_a_component_check_not_a_substring_scan() -> None: + """The page's prose: `v1.0..v2.0` passes because `..` is not a standalone path segment.""" + assert contains_path_traversal("../etc") is True + assert contains_path_traversal("v1.0..v2.0") is False + + +async def test_safe_join_serves_a_file_inside_the_base_directory( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + """tutorial002: `safe_join(DOCS_ROOT, path).read_text()` returns the file under the base.""" + (tmp_path / "printing").mkdir() + (tmp_path / "printing" / "setup.md").write_text("# Printer setup") + monkeypatch.setattr(tutorial002, "DOCS_ROOT", tmp_path) + async with Client(tutorial002.mcp) as client: + (content,) = (await client.read_resource("manuals://printing/setup.md")).contents + assert isinstance(content, TextResourceContents) + assert content.text == "# Printer setup" + + +def test_safe_join_raises_when_the_resolved_path_escapes_the_base(tmp_path: Path) -> None: + """tutorial002: a path that climbs out of `DOCS_ROOT` raises `PathEscapeError`.""" + with pytest.raises(PathEscapeError): + safe_join(tmp_path, "../etc/passwd") + + +async def test_exempt_params_lets_an_absolute_path_through() -> None: + """tutorial003: `exempt_params={"source"}` skips the checks for that one parameter.""" + async with Client(tutorial003.mcp) as client: + (content,) = (await client.read_resource("imports://preview//srv/incoming/catalog.csv")).contents + assert isinstance(content, TextResourceContents) + assert content.text == "Would import from 
/srv/incoming/catalog.csv" + + +async def test_server_wide_resource_security_relaxes_every_resource() -> None: + """tutorial003: `resource_security=ResourceSecurity(reject_path_traversal=False)` exempts the whole server.""" + async with Client(tutorial003.relaxed) as client: + (content,) = (await client.read_resource("imports://preview/../sibling/catalog.csv")).contents + assert isinstance(content, TextResourceContents) + assert content.text == "Would import from ../sibling/catalog.csv" + + +async def test_lowlevel_static_dispatch_lists_and_reads_by_exact_uri() -> None: + """tutorial004: the registry is the listing, and a known URI returns its text.""" + async with Client(tutorial004.server) as client: + listed = (await client.list_resources()).resources + assert [r.uri for r in listed] == ["config://shop", "status://health"] + (content,) = (await client.read_resource("status://health")).contents + assert content == TextResourceContents(uri="status://health", text="ok") + + +async def test_lowlevel_unknown_uri_raises() -> None: + """tutorial004: a URI outside the registry raises and surfaces as a protocol error.""" + async with Client(tutorial004.server) as client: + with pytest.raises(MCPError): + await client.read_resource("config://missing") + + +def test_uritemplate_match_returns_a_dict_or_none() -> None: + """tutorial005: `match()` extracts decoded variables, or `None` when the URI doesn't fit.""" + assert tutorial005.TEMPLATES["manuals"].match("manuals://printing/setup.md") == {"path": "printing/setup.md"} + assert tutorial005.TEMPLATES["books"].match("manuals://nope") is None + + +async def test_lowlevel_match_routes_the_request_to_the_right_template() -> None: + """tutorial005: two templates, one handler. 
Each concrete URI lands in its own branch.""" + async with Client(tutorial005.server) as client: + (manual,) = (await client.read_resource("manuals://printing/setup.md")).contents + assert manual == TextResourceContents(uri="manuals://printing/setup.md", text="# Printer setup") + (book,) = (await client.read_resource("books://978-0441172719")).contents + assert book == TextResourceContents(uri="books://978-0441172719", text="Dune by Frank Herbert") + + +async def test_lowlevel_handler_applies_the_safety_checks_itself() -> None: + """tutorial005: there is no default policy down here; `read_manual_safely` is the gate.""" + async with Client(tutorial005.server) as client: + with pytest.raises(MCPError): + await client.read_resource("manuals://../etc/passwd") + with pytest.raises(MCPError): + await client.read_resource("nothing://matches") + + +async def test_str_of_a_template_round_trips_to_the_original_string() -> None: + """tutorial005: `str(template)` is the source string, so the listing reuses the parsed templates.""" + assert str(tutorial005.TEMPLATES["manuals"]) == "manuals://{+path}" + async with Client(tutorial005.server) as client: + result = await client.list_resource_templates() + assert result.resource_templates == snapshot( + [ + ResourceTemplate(name="manuals", uri_template="manuals://{+path}"), + ResourceTemplate(name="books", uri_template="books://{isbn}"), + ] + ) diff --git a/tests/server/mcpserver/resources/test_resource_template.py b/tests/server/mcpserver/resources/test_resource_template.py index 565afe81a7..58c072ae32 100644 --- a/tests/server/mcpserver/resources/test_resource_template.py +++ b/tests/server/mcpserver/resources/test_resource_template.py @@ -9,6 +9,152 @@ from mcp.server.mcpserver import Context, MCPServer from mcp.server.mcpserver.exceptions import ResourceError from mcp.server.mcpserver.resources import FunctionResource, ResourceTemplate +from mcp.server.mcpserver.resources.templates import ( + DEFAULT_RESOURCE_SECURITY, + 
ResourceSecurity, + ResourceSecurityError, +) + + +def _make(uri_template: str, security: ResourceSecurity = DEFAULT_RESOURCE_SECURITY) -> ResourceTemplate: + def handler(**kwargs: Any) -> str: + raise NotImplementedError # these tests only exercise matches() + + return ResourceTemplate.from_function(fn=handler, uri_template=uri_template, security=security) + + +def test_matches_rfc6570_reserved_expansion(): + # {+path} allows / — the feature the old regex implementation couldn't support + t = _make("file://docs/{+path}") + assert t.matches("file://docs/src/main.py") == {"path": "src/main.py"} + + +def test_matches_rejects_encoded_slash_traversal(): + # %2F decodes to / in UriTemplate.match(), giving "../../etc/passwd". + # ResourceSecurity's traversal check then rejects the '..' components. + t = _make("file://docs/{name}") + with pytest.raises(ResourceSecurityError, match="'name'"): + t.matches("file://docs/..%2F..%2Fetc%2Fpasswd") + + +def test_matches_rejects_path_traversal_by_default(): + t = _make("file://docs/{name}") + with pytest.raises(ResourceSecurityError): + t.matches("file://docs/..") + + +def test_matches_rejects_path_traversal_in_reserved_var(): + # Even {+path} gets the traversal check — it's semantic, not structural + t = _make("file://docs/{+path}") + with pytest.raises(ResourceSecurityError): + t.matches("file://docs/../../etc/passwd") + + +def test_matches_rejects_absolute_path(): + t = _make("file://docs/{+path}") + with pytest.raises(ResourceSecurityError): + t.matches("file://docs//etc/passwd") + + +def test_matches_allows_dotdot_as_substring(): + # .. 
is only dangerous as a path component + t = _make("git://refs/{range}") + assert t.matches("git://refs/v1.0..v2.0") == {"range": "v1.0..v2.0"} + + +def test_matches_exempt_params_skip_security(): + policy = ResourceSecurity(exempt_params={"range"}) + t = _make("git://diff/{+range}", security=policy) + assert t.matches("git://diff/../foo") == {"range": "../foo"} + + +def test_matches_disabled_policy_allows_traversal(): + policy = ResourceSecurity(reject_path_traversal=False, reject_absolute_paths=False) + t = _make("file://docs/{name}", security=policy) + assert t.matches("file://docs/..") == {"name": ".."} + + +def test_matches_rejects_null_byte_by_default(): + # %00 decodes to \x00 which defeats string comparisons + # ("..\x00" != "..") and can truncate in C extensions. + t = _make("file://docs/{name}") + with pytest.raises(ResourceSecurityError): + t.matches("file://docs/key%00.txt") + # Null byte also defeats the traversal check's component comparison + with pytest.raises(ResourceSecurityError): + t.matches("file://docs/..%00%2Fsecret") + + +def test_matches_null_byte_check_can_be_disabled(): + policy = ResourceSecurity(reject_null_bytes=False) + t = _make("file://docs/{name}", security=policy) + assert t.matches("file://docs/key%00.txt") == {"name": "key\x00.txt"} + + +def test_security_rejection_does_not_fall_through_to_next_template(): + # A strict template's security rejection must halt iteration, not + # fall through to a later permissive template. Previously matches() + # returned None for both "no match" and "security failed", making + # registration order security-critical. + strict = _make("file://docs/{name}") + lax = _make( + "file://docs/{+path}", + security=ResourceSecurity(exempt_params={"path"}), + ) + uri = "file://docs/..%2Fsecrets" + # Strict matches structurally then fails security -> raises. 
+ with pytest.raises(ResourceSecurityError) as exc: + strict.matches(uri) + assert exc.value.param == "name" + # If this raised, the resource manager never reaches the lax + # template. Verify the lax template WOULD have accepted it. + assert lax.matches(uri) == {"path": "../secrets"} + + +def test_matches_explode_checks_each_segment(): + t = _make("api{/parts*}") + assert t.matches("api/a/b/c") == {"parts": ["a", "b", "c"]} + # Any segment with traversal rejects the whole match + with pytest.raises(ResourceSecurityError): + t.matches("api/a/../c") + + +def test_matches_encoded_backslash_caught_by_traversal_check(): + # %5C decodes to '\\'. The traversal check normalizes '\\' to '/' + # and catches the '..' components. + t = _make("file://docs/{name}") + with pytest.raises(ResourceSecurityError): + t.matches("file://docs/..%5C..%5Csecret") + + +def test_matches_encoded_dots_caught_by_traversal_check(): + # %2E%2E decodes to '..' which the traversal check rejects. + t = _make("file://docs/{name}") + with pytest.raises(ResourceSecurityError): + t.matches("file://docs/%2E%2E") + + +def test_matches_mixed_encoded_and_literal_slash(): + # A simple variable never spans a literal '/', so the URI doesn't + # match the template at all. + t = _make("file://docs/{name}") + assert t.matches("file://docs/..%2F../etc") is None + + +def test_matches_encoded_slash_without_traversal_allowed(): + # %2F decoding to '/' is fine when there's no traversal involved. + # UriTemplate accepts it; ResourceSecurity only blocks null bytes, + # '..' traversal and absolute paths. Handlers that need single-segment should use + # safe_join or validate explicitly. + t = _make("file://docs/{name}") + assert t.matches("file://docs/sub%2Ffile.txt") == {"name": "sub/file.txt"} + + +def test_matches_escapes_template_literals(): + # Regression: old impl treated .
as regex wildcard + t = _make("data://v1.0/{id}") + assert t.matches("data://v1.0/42") == {"id": "42"} + assert t.matches("data://v1X0/42") is None class TestResourceTemplate: diff --git a/tests/server/mcpserver/test_server.py b/tests/server/mcpserver/test_server.py index 2c58b8fdea..70855f44b2 100644 --- a/tests/server/mcpserver/test_server.py +++ b/tests/server/mcpserver/test_server.py @@ -43,13 +43,14 @@ from mcp.client import Client from mcp.server.context import ServerRequestContext -from mcp.server.mcpserver import Context, MCPServer +from mcp.server.mcpserver import Context, MCPServer, ResourceSecurity from mcp.server.mcpserver.exceptions import ResourceNotFoundError, ToolError from mcp.server.mcpserver.prompts.base import Message, UserMessage from mcp.server.mcpserver.resources import FileResource, FunctionResource from mcp.server.mcpserver.utilities.types import Audio, Image from mcp.server.transport_security import TransportSecuritySettings from mcp.shared.exceptions import MCPError +from mcp.shared.uri_template import InvalidUriTemplate pytestmark = pytest.mark.anyio @@ -862,7 +863,7 @@ async def test_resource_with_params(self): parameters don't match""" mcp = MCPServer() - with pytest.raises(ValueError, match="Mismatch between URI parameters"): + with pytest.raises(ValueError, match="has no URI template variables"): @mcp.resource("resource://data") def get_data_fn(param: str) -> str: # pragma: no cover @@ -1489,6 +1490,258 @@ def prompt_fn(name: str) -> str: ... # pragma: no branch await client.get_prompt("prompt_fn") +async def test_resource_decorator_rfc6570_reserved_expansion(): + # Regression: old regex-based param extraction couldn't see `path` + # in `{+path}` and failed with a confusing mismatch error. 
+ mcp = MCPServer() + + @mcp.resource("file://docs/{+path}") + def read_doc(path: str) -> str: + raise NotImplementedError + + templates = await mcp.list_resource_templates() + assert [t.uri_template for t in templates] == ["file://docs/{+path}"] + + +async def test_resource_decorator_rejects_malformed_template(): + mcp = MCPServer() + with pytest.raises(InvalidUriTemplate, match="Unclosed expression"): + mcp.resource("file://{name") + + +async def test_resource_optional_query_params_use_function_defaults(): + """Omitted {?...} query params should fall through to the + handler's Python defaults. Partial and reordered params work.""" + mcp = MCPServer() + + @mcp.resource("logs://{service}{?since,level}") + def tail_logs(service: str, since: str = "1h", level: str = "info") -> str: + return f"{service}|{since}|{level}" + + async with Client(mcp) as client: + # No query → all defaults + r = await client.read_resource("logs://api") + assert isinstance(r.contents[0], TextResourceContents) + assert r.contents[0].text == "api|1h|info" + + # Partial query → one default + r = await client.read_resource("logs://api?since=15m") + assert isinstance(r.contents[0], TextResourceContents) + assert r.contents[0].text == "api|15m|info" + + # Reordered, both present + r = await client.read_resource("logs://api?level=error&since=5m") + assert isinstance(r.contents[0], TextResourceContents) + assert r.contents[0].text == "api|5m|error" + + # Extra param ignored + r = await client.read_resource("logs://api?since=2h&utm=x") + assert isinstance(r.contents[0], TextResourceContents) + assert r.contents[0].text == "api|2h|info" + + +async def test_resource_query_param_without_default_rejected_at_decoration(): + """A handler parameter bound to a {?...} query variable must have a + Python default: a client may omit a query parameter, so the handler has + to be callable without it. 
Omitting the default is an error when the + decorator runs, not on the first request that leaves the parameter out.""" + mcp = MCPServer() + + with pytest.raises(ValueError, match=r"logs://.*\['level'\].*must declare a default"): + + @mcp.resource("logs://{service}{?level}") + def tail_logs(service: str, level: str) -> str: + raise NotImplementedError + + +async def test_resource_path_param_without_default_accepted(): + """The default requirement applies only to query-bound parameters. + A path variable is always present in a matching URI, so its handler + parameter may be required.""" + mcp = MCPServer() + + @mcp.resource("logs://{service}{?level}") + def tail_logs(service: str, level: str = "info") -> str: + raise NotImplementedError + + templates = await mcp.list_resource_templates() + assert [t.uri_template for t in templates] == ["logs://{service}{?level}"] + + +async def test_resource_security_default_rejects_traversal(): + mcp = MCPServer() + + @mcp.resource("data://items/{name}") + def get_item(name: str) -> str: + return f"item:{name}" + + async with Client(mcp) as client: + # Safe value passes through to the handler + r = await client.read_resource("data://items/widget") + assert isinstance(r.contents[0], TextResourceContents) + assert r.contents[0].text == "item:widget" + + # ".." 
as a path component is rejected by default policy + with pytest.raises(MCPError, match="Unknown resource"): + await client.read_resource("data://items/..") + + +async def test_resource_template_non_match_is_unknown_resource(): + """A URI that doesn't satisfy a registered template — including one + shorter than the template's literal segments — must surface as the + standard -32602 Unknown resource, not an internal error.""" + mcp = MCPServer() + + @mcp.resource("api://{+path}/{id}") + def get(path: str, id: str) -> str: + return f"{path}|{id}" + + async with Client(mcp) as client: + with pytest.raises(MCPError) as exc_info: + await client.read_resource("api://foo") + assert exc_info.value.error.code == INVALID_PARAMS + assert exc_info.value.error.message == "Unknown resource: api://foo" + + # And a satisfying URI still routes to the handler. + r = await client.read_resource("api://a/b/c") + assert isinstance(r.contents[0], TextResourceContents) + assert r.contents[0].text == "a/b|c" + + +async def test_resource_security_rejection_indistinguishable_from_not_found(): + """A path-safety rejection must produce the same wire error as a + genuinely-absent resource: same code, same message shape, no hint + about which check failed.""" + mcp = MCPServer() + + @mcp.resource("data://items/{name}") + def get_item(name: str) -> str: # pragma: no cover - never reached + return name + + async with Client(mcp) as client: + with pytest.raises(MCPError) as rejected: + await client.read_resource("data://items/..") + with pytest.raises(MCPError) as absent: + await client.read_resource("nosuch://thing") + + assert rejected.value.error.code == absent.value.error.code == INVALID_PARAMS + # Message echoes the requested URI and nothing else; no + # reference to which validation step rejected it. + assert rejected.value.error.message == "Unknown resource: data://items/.." 
+ assert absent.value.error.message == "Unknown resource: nosuch://thing" + assert rejected.value.error.data == {"uri": "data://items/.."} + assert absent.value.error.data == {"uri": "nosuch://thing"} + + +async def test_resource_security_per_resource_override(): + mcp = MCPServer() + + @mcp.resource( + "git://diff/{+range}", + security=ResourceSecurity(exempt_params={"range"}), + ) + def git_diff(range: str) -> str: + return f"diff:{range}" + + async with Client(mcp) as client: + # "../foo" would be rejected by default, but "range" is exempt + result = await client.read_resource("git://diff/../foo") + assert isinstance(result.contents[0], TextResourceContents) + assert result.contents[0].text == "diff:../foo" + + +async def test_resource_security_server_wide_override(): + mcp = MCPServer(resource_security=ResourceSecurity(reject_path_traversal=False)) + + @mcp.resource("data://items/{name}") + def get_item(name: str) -> str: + return f"item:{name}" + + async with Client(mcp) as client: + # Server-wide policy disabled traversal check; ".." now allowed + result = await client.read_resource("data://items/..") + assert isinstance(result.contents[0], TextResourceContents) + assert result.contents[0].text == "item:.." + + +async def test_resource_security_namespaced_identifier_requires_exempt(): + """Single-letter-colon values like ``x:y`` are flagged by the + default absolute-path check (they parse as Windows drive-relative, + which discards the join base). A non-filesystem parameter that + legitimately accepts such values opts out via ``exempt_params``.""" + mcp = MCPServer() + + @mcp.resource("data://items/{id}") + def get_item(id: str) -> str: # pragma: no cover - rejected before call + return f"item:{id}" + + async with Client(mcp) as client: + with pytest.raises(MCPError, match="Unknown resource") as exc: + await client.read_resource("data://items/x:y") + assert exc.value.error.code == INVALID_PARAMS + + # Exempting the parameter lets the value through. 
+ mcp = MCPServer() + + @mcp.resource("data://items/{id}", security=ResourceSecurity(exempt_params={"id"})) + def get_item_exempt(id: str) -> str: + return f"item:{id}" + + async with Client(mcp) as client: + r = await client.read_resource("data://items/x:y") + assert isinstance(r.contents[0], TextResourceContents) + assert r.contents[0].text == "item:x:y" + + +async def test_resource_security_rejection_halts_template_iteration(): + """A strict template's security rejection must surface as + not-found and stop; a later permissive template must not be + reached.""" + mcp = MCPServer() + + @mcp.resource("file://docs/{name}") + def strict(name: str) -> str: # pragma: no cover - never reached + return name + + @mcp.resource( + "file://docs/{+path}", + security=ResourceSecurity(exempt_params={"path"}), + ) + def lax(path: str) -> str: # pragma: no cover - must not be reached + raise AssertionError("permissive template reached after security rejection") + + async with Client(mcp) as client: + with pytest.raises(MCPError) as exc: + await client.read_resource("file://docs/..%2Fsecrets") + assert exc.value.error.code == INVALID_PARAMS + assert "Unknown resource" in exc.value.error.message + + +async def test_static_resource_with_context_param_errors(): + """A non-template URI with a Context-only handler should error + at decoration time with a clear message, not silently register + an unreachable resource.""" + mcp = MCPServer() + + with pytest.raises(ValueError, match="Context injection for static resources is not supported"): + + @mcp.resource("weather://current") + def current_weather(ctx: Context) -> str: + raise NotImplementedError + + +async def test_static_resource_with_extra_params_errors(): + """A non-template URI with non-Context params should error at + decoration time.""" + mcp = MCPServer() + + with pytest.raises(ValueError, match="has no URI template variables"): + + @mcp.resource("data://fixed") + def get_data(name: str) -> str: + raise NotImplementedError + 
+ async def test_completion_decorator() -> None: """Test that the completion decorator registers a working handler.""" mcp = MCPServer() diff --git a/tests/shared/test_path_security.py b/tests/shared/test_path_security.py new file mode 100644 index 0000000000..46bf111a5e --- /dev/null +++ b/tests/shared/test_path_security.py @@ -0,0 +1,159 @@ +"""Tests for filesystem path safety primitives.""" + +from pathlib import Path + +import pytest + +from mcp.shared.path_security import ( + PathEscapeError, + contains_path_traversal, + is_absolute_path, + safe_join, +) + + +@pytest.mark.parametrize( + ("value", "expected"), + [ + # Safe: no traversal + ("a/b/c", False), + ("readme.txt", False), + ("", False), + (".", False), + ("./a/b", False), + # Safe: .. balanced by prior descent + ("a/../b", False), + ("a/b/../c", False), + ("a/b/../../c", False), + # Unsafe: net escape + ("..", True), + ("../etc", True), + ("../../etc/passwd", True), + ("a/../../b", True), + ("./../../etc", True), + # .. as substring, not component — safe + ("1.0..2.0", False), + ("foo..bar", False), + ("..foo", False), + ("foo..", False), + # Backslash separator + ("..\\etc", True), + ("a\\..\\..\\b", True), + ("a\\b\\c", False), + # Mixed separators + ("a/..\\..\\b", True), + ], +) +def test_contains_path_traversal(value: str, expected: bool): + assert contains_path_traversal(value) is expected + + +@pytest.mark.parametrize( + ("value", "expected"), + [ + # Relative + ("relative/path", False), + ("file.txt", False), + ("", False), + (".", False), + ("..", False), + # POSIX absolute + ("/", True), + ("/etc/passwd", True), + ("/a", True), + # Windows drive + ("C:", True), + ("C:\\Windows", True), + ("c:/foo", True), + ("Z:\\", True), + # Windows UNC / backslash-absolute + ("\\\\server\\share", True), + ("\\foo", True), + # Windows drive-relative — discards the join base when drives differ + ("C:relative", True), + ("x:y", True), + ("a:debug", True), + # Not a drive: digit before colon + ("1:foo", 
False), + # Colon not in position 1 + ("ab:c", False), + # Non-ASCII letter is not a drive letter + ("Ω:namespace", False), + ("é:foo", False), + ], +) +def test_is_absolute_path(value: str, expected: bool): + assert is_absolute_path(value) is expected + + +def test_safe_join_simple(tmp_path: Path): + result = safe_join(tmp_path, "docs", "readme.txt") + assert result == tmp_path / "docs" / "readme.txt" + + +def test_safe_join_resolves_relative_base(tmp_path: Path, monkeypatch: pytest.MonkeyPatch): + monkeypatch.chdir(tmp_path) + result = safe_join(".", "file.txt") + assert result == tmp_path / "file.txt" + + +def test_safe_join_rejects_dotdot_escape(tmp_path: Path): + with pytest.raises(PathEscapeError, match="escapes base"): + safe_join(tmp_path, "../../../etc/passwd") + + +def test_safe_join_rejects_balanced_then_escape(tmp_path: Path): + with pytest.raises(PathEscapeError, match="escapes base"): + safe_join(tmp_path, "a/../../etc") + + +def test_safe_join_allows_balanced_dotdot(tmp_path: Path): + result = safe_join(tmp_path, "a/../b") + assert result == tmp_path / "b" + + +def test_safe_join_rejects_absolute_part(tmp_path: Path): + with pytest.raises(PathEscapeError, match="is absolute"): + safe_join(tmp_path, "/etc/passwd") + + +def test_safe_join_rejects_absolute_in_later_part(tmp_path: Path): + with pytest.raises(PathEscapeError, match="is absolute"): + safe_join(tmp_path, "docs", "/etc/passwd") + + +def test_safe_join_rejects_windows_drive(tmp_path: Path): + with pytest.raises(PathEscapeError, match="is absolute"): + safe_join(tmp_path, "C:\\Windows\\System32") + + +def test_safe_join_rejects_null_byte(tmp_path: Path): + with pytest.raises(PathEscapeError, match="null byte"): + safe_join(tmp_path, "file\0.txt") + + +def test_safe_join_rejects_null_byte_in_later_part(tmp_path: Path): + with pytest.raises(PathEscapeError, match="null byte"): + safe_join(tmp_path, "docs", "file\0.txt") + + +def test_safe_join_rejects_symlink_escape(tmp_path: Path): + outside = 
tmp_path / "outside" + outside.mkdir() + sandbox = tmp_path / "sandbox" + sandbox.mkdir() + (sandbox / "escape").symlink_to(outside) + + with pytest.raises(PathEscapeError, match="escapes base"): + safe_join(sandbox, "escape", "secret.txt") + + +def test_safe_join_base_equals_target(tmp_path: Path): + # Joining nothing (or ".") should return the base itself + assert safe_join(tmp_path) == tmp_path + assert safe_join(tmp_path, ".") == tmp_path + + +def test_path_escape_error_is_value_error(): + with pytest.raises(ValueError): + safe_join("/tmp", "/etc") diff --git a/tests/shared/test_uri_template.py b/tests/shared/test_uri_template.py new file mode 100644 index 0000000000..48f6c66237 --- /dev/null +++ b/tests/shared/test_uri_template.py @@ -0,0 +1,1001 @@ +"""Tests for RFC 6570 URI template parsing, expansion, and matching.""" + +import dataclasses +import random +import string + +import pytest + +from mcp.shared.uri_template import DEFAULT_MAX_URI_LENGTH, InvalidUriTemplate, UriTemplate, Variable + + +def test_parse_literal_only(): + tmpl = UriTemplate.parse("file://docs/readme.txt") + assert tmpl.variables == [] + assert tmpl.variable_names == [] + assert str(tmpl) == "file://docs/readme.txt" + + +@pytest.mark.parametrize( + ("value", "expected"), + [ + ("file://docs/{name}", True), + ("file://docs/readme.txt", False), + ("", False), + ("{a}", True), + ("{", False), + ("}", False), + ("}{", False), + ("prefix{+path}/suffix", True), + ("{invalid syntax but still a template}", True), + ], +) +def test_is_template(value: str, expected: bool): + assert UriTemplate.is_template(value) is expected + + +def test_parse_simple_variable(): + tmpl = UriTemplate.parse("file://docs/{name}") + assert tmpl.variables == [Variable(name="name", operator="")] + assert tmpl.variable_names == ["name"] + + +@pytest.mark.parametrize( + ("template", "operator"), + [ + ("{+path}", "+"), + ("{#frag}", "#"), + ("{.ext}", "."), + ("{/seg}", "/"), + ("{;param}", ";"), + ("{?q}", "?"), + 
("{&next}", "&"), + ], +) +def test_parse_all_operators(template: str, operator: str): + tmpl = UriTemplate.parse(template) + (var,) = tmpl.variables + assert var.operator == operator + assert var.explode is False + + +def test_parse_multiple_variables_in_expression(): + tmpl = UriTemplate.parse("{?q,lang,page}") + assert tmpl.variable_names == ["q", "lang", "page"] + assert all(v.operator == "?" for v in tmpl.variables) + + +def test_parse_multiple_expressions(): + tmpl = UriTemplate.parse("db://{table}/{id}{?format}") + assert tmpl.variable_names == ["table", "id", "format"] + ops = [v.operator for v in tmpl.variables] + assert ops == ["", "", "?"] + + +@pytest.mark.parametrize( + ("template", "expected"), + [ + ("logs://{service}{?a,b}", frozenset({"a", "b"})), + ("logs://{service}{?a,b}{&c}", frozenset({"a", "b", "c"})), + ("logs://{service}", frozenset[str]()), + # A lone {&...} never emits the leading ? that lenient query + # matching splits on, so it is matched strictly: c must be + # present in the URI and is not an optional query variable. 
+ ("logs://{service}{&c}", frozenset[str]()), + ], +) +def test_query_variable_names(template: str, expected: frozenset[str]): + """query_variable_names is exactly the set match() treats as optional: + the trailing {?...}/{&...} variables a client may omit from the URI.""" + assert UriTemplate.parse(template).query_variable_names == expected + + +def test_parse_explode_modifier(): + tmpl = UriTemplate.parse("/files{/path*}") + (var,) = tmpl.variables + assert var.name == "path" + assert var.operator == "/" + assert var.explode is True + + +@pytest.mark.parametrize("template", ["{.labels*}", "{;params*}"]) +def test_parse_explode_supported_operators(template: str): + tmpl = UriTemplate.parse(template) + assert tmpl.variables[0].explode is True + + +def test_parse_mixed_explode_and_plain(): + tmpl = UriTemplate.parse("{/path*}{?q}") + assert tmpl.variables == [ + Variable(name="path", operator="/", explode=True), + Variable(name="q", operator="?"), + ] + + +def test_parse_varname_with_dots_and_underscores(): + tmpl = UriTemplate.parse("{foo_bar.baz}") + assert tmpl.variable_names == ["foo_bar.baz"] + + +def test_parse_rejects_unclosed_expression(): + with pytest.raises(InvalidUriTemplate, match="Unclosed expression") as exc: + UriTemplate.parse("file://{name") + assert exc.value.position == 7 + assert exc.value.template == "file://{name" + + +def test_parse_rejects_empty_expression(): + with pytest.raises(InvalidUriTemplate, match="Empty expression"): + UriTemplate.parse("file://{}") + + +def test_parse_rejects_operator_without_variable(): + with pytest.raises(InvalidUriTemplate, match="operator but no variables"): + UriTemplate.parse("{+}") + + +@pytest.mark.parametrize( + "name", + [ + "-bad", + "bad-name", + "bad name", + "bad/name", + # RFC §2.3: dots only between varchars, not consecutive or trailing + "foo..bar", + "foo.", + ], +) +def test_parse_rejects_invalid_varname(name: str): + with pytest.raises(InvalidUriTemplate, match="Invalid variable name"): + 
UriTemplate.parse(f"{{{name}}}") + + +def test_parse_accepts_dotted_varname(): + t = UriTemplate.parse("{a.b.c}") + assert t.variable_names == ["a.b.c"] + + +def test_parse_rejects_empty_spec_in_list(): + with pytest.raises(InvalidUriTemplate, match="Invalid variable name"): + UriTemplate.parse("{a,,b}") + + +def test_parse_rejects_prefix_modifier(): + with pytest.raises(InvalidUriTemplate, match="Prefix modifier"): + UriTemplate.parse("{var:3}") + + +@pytest.mark.parametrize("template", ["{var*}", "{+var*}", "{#var*}", "{?var*}", "{&var*}"]) +def test_parse_rejects_unsupported_explode(template: str): + with pytest.raises(InvalidUriTemplate, match="Explode modifier"): + UriTemplate.parse(template) + + +@pytest.mark.parametrize( + "template", + [ + "{/a*}/x{/b*}", # two explode vars: a literal between them doesn't help + # Multi-var + expression: each var is greedy (',' separates them) + "{+a,b}", + # Two {+var}/{#var} anywhere + "{+a}/x/{+b}", + "{+a},{+b}", + "{#a}/x/{+b}", + "{+a}.foo.{#b}", + ], +) +def test_parse_rejects_multiple_multi_segment_variables(template: str): + # Two multi-segment variables make matching inherently ambiguous: + # there is no principled way to decide which one absorbs an extra + # segment. The linear scan can only partition the URI around a + # single greedy slot. (Two ADJACENT multi-segment variables are + # caught by the adjacency rule first; see the test below.) + with pytest.raises(InvalidUriTemplate, match="more than one multi-segment"): + UriTemplate.parse(template) + + +@pytest.mark.parametrize( + "template", + [ + # Two bounded variables + "{a}{b}", + "{.a}{b}", + "{/a}{b}", + "{;a}{b}", + "{a}{b}X{+p}", + "{+p}X{a}{b}", + "pre{a}{b}post", + # A bounded variable adjacent to the multi-segment variable + "{a}{+b}", + "{+a}{b}", + "{#a}{b}", + "{.a}{+b}", + "{/a}{+b}", + "x{name}{+path}y", + "X{+a}{b}", + "{+p}{n}", + "{x}Y{+p}{n}", + "{?a}{+b}x", + # ... 
on either side, with a literal on the OTHER side + "{a}-{+p}{b}", + "{a}{+p}-{b}", + "{name}{+path}{.ext}", + "{base}{+p}{;k}", + # ... or on both sides + "{a}{+b}{c}", + "{a}{+p}{b}Y{c}", + "X{a}{+p}{b}Y{c}", + "{a}{/p*}{b}", + # An explode variable carries its operator's separators inside + # the capture, so it emits no lead literal that could anchor it + "{a}{/p*}", + "{/seg}{;k*}", + "item://{id}{;opts*}", + # ifemp: the ';key' literal anchors the LEFT edge of {;key}, but + # nothing separates its right edge from the multi-segment var + "api{;key}{+rest}", + # Two multi-segment variables that are ALSO adjacent + "{/a*}{/b*}", + "{/a*}{.b*}", + "{.a*}{;b*}", + "{/a*}{b}{.c*}", + "{+a}{/b*}", + ], +) +def test_parse_rejects_adjacent_variables(template: str) -> None: + # Two captures with no literal between them give the scan nothing to + # anchor the boundary on — whether or not one of them is the + # multi-segment variable. + with pytest.raises(InvalidUriTemplate, match="adjacent with no literal separator"): + UriTemplate.parse(template) + + +@pytest.mark.parametrize( + "template", + [ + "file://docs/{+path}", # + at end of template + "file://{+path}.txt", # + followed by literal only + "file://{+path}/edit", # + followed by literal only + "api/{+path}{?v,page}", # + followed by query tail (split off before scan) + "api/{+path}{&next}", # + followed by query-continuation + "page{#section}", # # at end + "{a}{#b}", # # emits a literal '#' that anchors the boundary + "{+a}/sep/{b}", # + with bounded vars after + "{+a},{b}", + # Operators that emit their own lead character ('.', '/', ';name') + # supply the literal anchor, so these are NOT adjacent variables. 
+ "{+a}{/b}", + "{+a}{.b}", + "{+a}{;b}", + "{+path}{.ext}", + "prefix/{+path}{.ext}", + "tree://nodes{/path*}", + "api{;key}/{+rest}", + ], +) +def test_parse_allows_single_multi_segment_variable(template: str): + # One multi-segment variable is fine: the linear scan isolates it + # between the prefix and suffix boundaries, and the scan never + # backtracks so match time stays O(n) regardless of URI content. + t = UriTemplate.parse(template) + assert t is not None + + +@pytest.mark.parametrize( + "template", + ["{x}/{x}", "{x,x}", "{a}{b}{a}", "{+x}/foo/{x}"], +) +def test_parse_rejects_duplicate_variable_names(template: str): + with pytest.raises(InvalidUriTemplate, match="appears more than once"): + UriTemplate.parse(template) + + +@pytest.mark.parametrize( + "template", + ["/x{?a}{?b}", "/x{?a}/y{?b}", "{?a}{&b}{?c}"], +) +def test_parse_rejects_multiple_query_expressions(template: str) -> None: + with pytest.raises(InvalidUriTemplate, match=r"more than one \{\?"): + UriTemplate.parse(template) + + +def test_query_tail_roundtrip_correct_spellings() -> None: + for tmpl in ("/x{?a,b}", "/x{?a}{&b}"): + t = UriTemplate.parse(tmpl) + assert t.match(t.expand({"a": "1", "b": "2"})) == {"a": "1", "b": "2"} + + +def test_invalid_uri_template_is_value_error(): + with pytest.raises(ValueError): + UriTemplate.parse("{}") + + +@pytest.mark.parametrize( + "template", + [ + "{{name}}", # nested open: body becomes "{name" + "{a{b}c}", # brace inside expression + "{{]{}}{}", # garbage soup + "{a,{b}", # brace in comma list + ], +) +def test_parse_rejects_nested_braces(template: str): + # Nested/stray { inside an expression lands in the varname and + # fails the varname regex rather than needing special handling. 
+ with pytest.raises(InvalidUriTemplate, match="Invalid variable name"): + UriTemplate.parse(template) + + +@pytest.mark.parametrize( + ("template", "position"), + [ + ("{", 0), + ("{{", 0), + ("file://{name", 7), + ("{a}{", 3), + ("}{", 1), # stray } is literal, then unclosed { + ], +) +def test_parse_rejects_unclosed_brace(template: str, position: int): + with pytest.raises(InvalidUriTemplate, match="Unclosed") as exc: + UriTemplate.parse(template) + assert exc.value.position == position + + +@pytest.mark.parametrize( + "template", + ["}}", "}", "a}b", "{a}}{b}"], +) +def test_parse_treats_stray_close_brace_as_literal(template: str): + # RFC 6570 §2.1 strictly excludes } from literals, but we accept it + # for TypeScript SDK parity. A stray } almost always indicates a + # typo; rejecting would be more helpful but would also break + # cross-SDK behavior. + tmpl = UriTemplate.parse(template) + assert str(tmpl) == template + + +def test_parse_stray_close_brace_between_expressions(): + tmpl = UriTemplate.parse("{a}}{b}") + assert tmpl.variable_names == ["a", "b"] + + +def test_parse_rejects_oversized_template(): + with pytest.raises(InvalidUriTemplate, match="maximum length"): + UriTemplate.parse("x" * 101, max_length=100) + + +def test_parse_rejects_too_many_variables(): + template = "".join(f"{{v{i}}}" for i in range(11)) + with pytest.raises(InvalidUriTemplate, match="maximum of 10 variables"): + UriTemplate.parse(template, max_variables=10) + + +def test_parse_counts_variables_not_expressions(): + # A single {v0,v1,...} expression packs many variables under one + # brace pair. Counting expressions would miss this. 
+ template = "{" + ",".join(f"v{i}" for i in range(11)) + "}" + with pytest.raises(InvalidUriTemplate, match="maximum of 10 variables"): + UriTemplate.parse(template, max_variables=10) + + +def test_parse_custom_limits_allow_larger(): + template = "/".join(f"{{v{i}}}" for i in range(20)) + tmpl = UriTemplate.parse(template, max_variables=20) + assert len(tmpl.variables) == 20 + + +def test_equality_based_on_template_string(): + a = UriTemplate.parse("file://{name}") + b = UriTemplate.parse("file://{name}") + c = UriTemplate.parse("file://{other}") + assert a == b + assert a != c + assert hash(a) == hash(b) + + +def test_frozen(): + tmpl = UriTemplate.parse("{x}") + with pytest.raises(dataclasses.FrozenInstanceError): + tmpl.template = "changed" # type: ignore[misc] + + +@pytest.mark.parametrize( + ("template", "variables", "expected"), + [ + # Level 1: simple, encodes reserved chars + ("{var}", {"var": "value"}, "value"), + ("{var}", {"var": "hello world"}, "hello%20world"), + ("{var}", {"var": "a/b"}, "a%2Fb"), + ("file://docs/{name}", {"name": "readme.txt"}, "file://docs/readme.txt"), + # Level 2: reserved expansion keeps / ? # etc. + ("{+var}", {"var": "a/b/c"}, "a/b/c"), + ("{+var}", {"var": "a?b#c"}, "a?b#c"), + # RFC §3.2.3: reserved expansion passes through existing + # pct-triplets unchanged; bare % is still encoded. + ("{+var}", {"var": "path%2Fto"}, "path%2Fto"), + ("{+var}", {"var": "50%"}, "50%25"), + ("{+var}", {"var": "50%2"}, "50%252"), + ("{+var}", {"var": "a%2Fb%20c"}, "a%2Fb%20c"), + ("{#var}", {"var": "a%2Fb"}, "#a%2Fb"), + # Simple expansion still encodes % unconditionally (triplet + # preservation is reserved-only). 
+ ("{var}", {"var": "path%2Fto"}, "path%252Fto"), + ("file://docs/{+path}", {"path": "src/main.py"}, "file://docs/src/main.py"), + # Level 2: fragment + ("{#var}", {"var": "section"}, "#section"), + ("{#var}", {"var": "a/b"}, "#a/b"), + # Level 3: label + ("file{.ext}", {"ext": "txt"}, "file.txt"), + # Level 3: path segment + ("{/seg}", {"seg": "docs"}, "/docs"), + # Level 3: path-style param + ("{;id}", {"id": "42"}, ";id=42"), + ("{;id}", {"id": ""}, ";id"), + # Level 3: query + ("{?q}", {"q": "search"}, "?q=search"), + ("{?q}", {"q": ""}, "?q="), + ("/search{?q,lang}", {"q": "mcp", "lang": "en"}, "/search?q=mcp&lang=en"), + # Level 3: query continuation + ("?a=1{&b}", {"b": "2"}, "?a=1&b=2"), + # Multi-var in one expression + ("{x,y}", {"x": "1", "y": "2"}, "1,2"), + # {+x,y} is rejected at parse time: each var in a + expression + # is multi-segment, and a template may only have one. + # Sequence values, non-explode (comma-join) + ("{/list}", {"list": ["a", "b", "c"]}, "/a,b,c"), + ("{?list}", {"list": ["a", "b"]}, "?list=a,b"), + # Explode: each item gets separator + ("{/path*}", {"path": ["a", "b", "c"]}, "/a/b/c"), + ("{.labels*}", {"labels": ["x", "y"]}, ".x.y"), + ("{;keys*}", {"keys": ["a", "b"]}, ";keys=a;keys=b"), + # RFC §3.2.7 ifemp: ; omits = for empty explode items + ("{;keys*}", {"keys": ["a", "", "b"]}, ";keys=a;keys;keys=b"), + # RFC §3.2.7 ifemp: ; omits = for empty, including non-explode list [""] + ("{;name}", {"name": [""]}, ";name"), + ("{;name}", {"name": ["", ""]}, ";name=,"), + ("{?name}", {"name": [""]}, "?name="), + ("{&name}", {"name": [""]}, "&name="), + ("{;name}", {"name": ""}, ";name"), + # Undefined variables omitted + ("{?q,page}", {"q": "x"}, "?q=x"), + ("{a,b}", {"a": "x"}, "x"), + ("{?page}", {}, ""), + # Empty sequence omitted + ("{/path*}", {"path": []}, ""), + # Literal-only template + ("file://static", {}, "file://static"), + ], +) +def test_expand(template: str, variables: dict[str, str | list[str]], expected: str): + 
assert UriTemplate.parse(template).expand(variables) == expected + + +def test_expand_encodes_special_chars_in_simple(): + t = UriTemplate.parse("{v}") + assert t.expand({"v": "a&b=c"}) == "a%26b%3Dc" + + +def test_expand_preserves_special_chars_in_reserved(): + t = UriTemplate.parse("{+v}") + assert t.expand({"v": "a&b=c"}) == "a&b=c" + + +@pytest.mark.parametrize( + "value", + [42, None, 3.14, {"a": "b"}, ["ok", 42], b"bytes"], +) +def test_expand_rejects_invalid_value_types(value: object): + t = UriTemplate.parse("{v}") + with pytest.raises(TypeError, match="must be str or a sequence of str"): + t.expand({"v": value}) # type: ignore[dict-item] + + +@pytest.mark.parametrize( + ("template", "uri", "expected"), + [ + # Level 1: simple + ("{var}", "hello", {"var": "hello"}), + ("file://docs/{name}", "file://docs/readme.txt", {"name": "readme.txt"}), + ("{a}/{b}", "foo/bar", {"a": "foo", "b": "bar"}), + # Level 2: reserved allows / + ("file://docs/{+path}", "file://docs/src/main.py", {"path": "src/main.py"}), + ("{+var}", "a/b/c", {"var": "a/b/c"}), + # Level 2: fragment + ("page{#section}", "page#intro", {"section": "intro"}), + # A multi-segment var next to an operator that emits its own + # lead character: the lead ('.', '/', '#') is a literal anchor, + # so these are NOT two adjacent variables. + ("{+path}{/name}", "a/b/c/readme", {"path": "a/b/c", "name": "readme"}), + ("{+path}{.ext}", "src/main.py", {"path": "src/main", "ext": "py"}), + ("prefix/{+path}{.ext}", "prefix/a/b.txt", {"path": "a/b", "ext": "txt"}), + ("{#section}{/page}", "#intro/1", {"section": "intro", "page": "1"}), + # Bounded vars before the multi-segment var match lazily (first + # anchor); those after match greedily (last anchor). 
+ ("{owner}@{+path}", "alice@src/main", {"owner": "alice", "path": "src/main"}), + ("{+path}@{name}", "src@main@v1", {"path": "src@main", "name": "v1"}), + # Level 3: label + ("file{.ext}", "file.txt", {"ext": "txt"}), + # Level 3: path segment + ("api{/version}", "api/v1", {"version": "v1"}), + # Level 3: path-style param + ("item{;id}", "item;id=42", {"id": "42"}), + ("item{;id}", "item;id", {"id": ""}), + # Explode: ; emits name=value per item, match strips the prefix + ("item{;keys*}", "item;keys=a;keys=b", {"keys": ["a", "b"]}), + ("item{;keys*}", "item;keys=a;keys;keys=b", {"keys": ["a", "", "b"]}), + ("item{;keys*}", "item", {"keys": []}), + # Level 3: query. Lenient matching: partial, reordered, and + # extra params are all accepted. Absent params stay absent. + ("search{?q}", "search?q=hello", {"q": "hello"}), + ("search{?q}", "search?q=", {"q": ""}), + ("search{?q}", "search", {}), + ("search{?q,lang}", "search?q=mcp&lang=en", {"q": "mcp", "lang": "en"}), + ("search{?q,lang}", "search?lang=en&q=mcp", {"q": "mcp", "lang": "en"}), + ("search{?q,lang}", "search?q=mcp", {"q": "mcp"}), + ("search{?q,lang}", "search", {}), + ("search{?q}", "search?q=mcp&utm=x&ref=y", {"q": "mcp"}), + # URL-encoded query values are decoded + ("search{?q}", "search?q=hello%20world", {"q": "hello world"}), + # + is a literal sub-delim per RFC 3986, not a space (form-encoding) + ("search{?q}", "search?q=C++", {"q": "C++"}), + ("search{?q}", "search?q=1.0+build.5", {"q": "1.0+build.5"}), + # Fragment is stripped before query parsing + ("logs://{service}{?level}", "logs://api?level=error#section1", {"service": "api", "level": "error"}), + ("search{?q}", "search#frag", {}), + # Multiple ?/& expressions collected together + ("api{?v}{&page,limit}", "api?limit=10&v=2", {"v": "2", "limit": "10"}), + # Standalone {&var} falls through to the strict scan (expands + # with & prefix, no ? for lenient matching to split on) + ("api{&page}", "api&page=2", {"page": "2"}), + # Literal ? 
in path portion falls through to the strict scan + ("api?x{?page}", "api?x?page=2", {"page": "2"}), + # {#...} or literal # in path portion falls through: lenient + # matching would strip the fragment before the path scan sees it + ("page{#section}{?q}", "page#intro?q=x", {"section": "intro", "q": "x"}), + ("page#lit{?q}", "page#lit?q=x", {"q": "x"}), + # Empty & segments in query are skipped + ("search{?q}", "search?&q=hello&", {"q": "hello"}), + # Duplicate query keys keep first value + ("search{?q}", "search?q=first&q=second", {"q": "first"}), + # Percent-encoded parameter names are NOT decoded: RFC 6570 + # expansion never encodes names, so an encoded name cannot be + # a legitimate match. Prevents HTTP parameter pollution. + ("api://x{?token}", "api://x?%74oken=evil&token=real", {"token": "real"}), + ("api://x{?token}", "api://x?%74oken=evil", {}), + # Level 3: query continuation with literal ? falls back to + # the strict scan (template-order, all-present required) + ("?a=1{&b}", "?a=1&b=2", {"b": "2"}), + # Explode: path segments as list + ("/files{/path*}", "/files/a/b/c", {"path": ["a", "b", "c"]}), + ("/files{/path*}", "/files", {"path": []}), + ("/files{/path*}/edit", "/files/a/b/edit", {"path": ["a", "b"]}), + # Explode: labels + ("host{.labels*}", "host.example.com", {"labels": ["example", "com"]}), + # Repeated-slash literals preserved exactly + ("///{a}////{b}////", "///x////y////", {"a": "x", "b": "y"}), + ], +) +def test_match(template: str, uri: str, expected: dict[str, str | list[str]]): + assert UriTemplate.parse(template).match(uri) == expected + + +@pytest.mark.parametrize( + ("template", "uri"), + [ + ("file://docs/{name}", "file://other/readme.txt"), + ("{a}/{b}", "foo"), + ("file{.ext}", "file"), + ("static", "different"), + # Anchoring: trailing extra component must not match. Guards + # against a refactor from fullmatch() to match() or search(). 
+ ("/users/{id}", "/users/123/extra"), + ("/users/{id}/posts/{pid}", "/users/1/posts/2/extra"), + # Repeated-slash literal with wrong slash count + ("///{a}////{b}////", "//x////y////"), + # ; name boundary: {;id} must not match a longer parameter name + ("item{;id}", "item;identity=john"), + ("item{;id}", "item;ident"), + # ; explode: wrong parameter name in any segment rejects the match + ("item{;keys*}", "item;admin=true"), + ("item{;keys*}", "item;keys=a;admin=true"), + # Lenient-query branch: path portion fails to match + ("api/{name}{?q}", "wrong/path?q=x"), + # Lenient-query branch: ; explode name mismatch in path portion + ("item{;keys*}{?q}", "item;wrong=x?q=1"), + ], +) +def test_match_no_match(template: str, uri: str): + assert UriTemplate.parse(template).match(uri) is None + + +def test_match_explode_preserves_empty_list_items(): + # Splitting the explode capture on its separator yields a leading + # empty item from the operator prefix; only that one is stripped. + # Subsequent empties are legitimate values from the input list. + t = UriTemplate.parse("{/path*}") + assert t.match("/a//c") == {"path": ["a", "", "c"]} + assert t.match("//a") == {"path": ["", "a"]} + assert t.match("/a/") == {"path": ["a", ""]} + + t = UriTemplate.parse("host{.labels*}") + assert t.match("host.a..c") == {"labels": ["a", "", "c"]} + + +def test_match_adjacent_vars_disambiguated_by_literal(): + # A literal between vars resolves the ambiguity. + t = UriTemplate.parse("{a}-{b}") + assert t.match("foo-bar") == {"a": "foo", "b": "bar"} + + +@pytest.mark.parametrize( + ("template", "variables"), + [ + # Leading literal appears inside the value: must anchor at + # position 0, not rfind to the rightmost occurrence. + ("prefix-{id}", {"id": "prefix-123"}), + ("u{s}", {"s": "xu"}), + ("_{x}", {"x": "_"}), + ("~{v}~", {"v": "~~~"}), + # Multi-occurrence with two vars: rfind correctly picks the + # rightmost literal BETWEEN vars, first literal anchors at 0. 
+ ("L{a}L{b}", {"a": "xLy", "b": "z"}), + # Leading literal with stop-char: earliest bound still applies. + ("api/{name}", {"name": "api"}), + ], +) +def test_match_leading_literal_appears_in_value(template: str, variables: dict[str, str]): + # Regression: the R->L scan used rfind for the preceding literal, + # which lands inside the value when the template's leading literal + # is a substring of the expanded value. The first atom must anchor + # at position 0, not search. + t = UriTemplate.parse(template) + uri = t.expand(variables) + assert t.match(uri) == variables + + +@pytest.mark.parametrize( + ("template", "uri"), + [ + # Greedy var whose suffix literal is absent from the input. + ("{a}-{+b}x", "-" * 200), + # Chained anchors that all appear in input but suffix fails. + ("{a}L{b}L{c}L{d}M", "L" * 200), + ], +) +def test_match_no_backtracking_on_pathological_input(template: str, uri: str): + # These patterns caused O(n²) or worse backtracking under the regex + # matcher. The linear scan returns None without retrying splits. + # (Correctness check only; we benchmark separately to avoid flaky + # timing assertions in CI.) + assert UriTemplate.parse(template).match(uri) is None + + +@pytest.mark.parametrize( + ("template", "uri"), + [ + # Prefix literal mismatch before a greedy var + ("file://{+path}", "http://x"), + # Suffix literal absent: the suffix scan fails before the prefix runs + ("file://{+path}.txt", "file://x"), + # Prefix anchor not found: {a} needs '@' before greedy but none exists + ("{a}@{+path}", "no-at-sign-here"), + # Prefix literal doesn't fit within suffix boundary + ("foo{+a}oob", "fooob"), + # Greedy scalar contains its own stop-char ({+var} stops at ?) 
+ ("api://{+path}", "api://foo?bar"), + # Explode span doesn't start with its separator + ("X{/path*}", "Xnoslash"), + # Explode body contains a non-separator stop-char + ("X{/path*}", "X/a?b"), + # ifemp name continuation: the literal after {;key} doesn't start + # at pos and there's no '=', so the URI's name kept going. + ("api{;key}suffix/{+p}", "api;keyZ/x"), + # Regression: suffix scan must not walk back into prefix territory. + # Input is shorter than prefix+suffix literals — these used to + # raise AssertionError instead of returning None. + ("api://{+path}/{id}", "api://foo"), + ("docs/{+path}/v/{name}", "docs/v/x"), + ], +) +def test_match_greedy_rejection_paths(template: str, uri: str): + assert UriTemplate.parse(template).match(uri) is None + + +@pytest.mark.parametrize( + ("template", "uri", "expected"), + [ + # ifemp before a literal that itself starts with '=': the literal + # check runs first so '=' is not mistaken for the ifemp separator. + ("api{;key}=base/{+path}", "api;key=base/a/b", {"key": "", "path": "a/b"}), + ("api{;key}=base/{+path}", "api;key=v=base/x", {"key": "v", "path": "x"}), + ], +) +def test_match_prefix_scan_edge_cases(template: str, uri: str, expected: dict[str, str]): + assert UriTemplate.parse(template).match(uri) == expected + + +@pytest.mark.parametrize( + ("template", "uri", "expected"), + [ + # Suffix-side ifemp: '=' inside the value is preserved — the + # value '=' is the first one after ;name, not the last. + ("item{;id}", "item;id=a=b", {"id": "a=b"}), + ("{;a}{;b}", ";a=x=y;b=z", {"a": "x=y", "b": "z"}), + ], +) +def test_match_suffix_ifemp_equals_in_value(template: str, uri: str, expected: dict[str, str]): + assert UriTemplate.parse(template).match(uri) == expected + + +def test_match_prefix_ifemp_empty_before_non_stop_literal(): + # Regression: _scan_prefix rejected the empty-value case when the + # following template literal starts with a non-stop-char. 
The + # name-continuation guard saw 'X' after ';key' and assumed the + # name continued, but 'X' is the template's next literal. + t = UriTemplate.parse("api{;key}X{+rest}") + # Non-empty round-trips fine: + assert t.match(t.expand({"key": "abc", "rest": "/tail"})) == {"key": "abc", "rest": "/tail"} + # Empty value (ifemp → bare ;key, then X) must also round-trip: + uri = t.expand({"key": "", "rest": "/tail"}) + assert uri == "api;keyX/tail" + assert t.match(uri) == {"key": "", "rest": "/tail"} + # But an actual name continuation still rejects: + assert t.match("api;keyZX/tail") is None + + +def test_match_large_uri_against_greedy_template(): + # Large payload against a greedy template — the scan visits each + # character once for the suffix anchor and once for the greedy + # validation, so this is O(n) not O(n²). + t = UriTemplate.parse("{+path}/end") + body = "seg/" * 15000 + result = t.match(body + "end") + assert result == {"path": body[:-1]} + # And the failing case returns None without retrying splits. + assert t.match(body + "nope") is None + + +def test_match_decodes_percent_encoding(): + t = UriTemplate.parse("file://docs/{name}") + assert t.match("file://docs/hello%20world.txt") == {"name": "hello world.txt"} + + +def test_match_escapes_template_literals(): + # Regression: previous impl didn't escape . in literals, making it + # a regex wildcard. "fileXtxt" should NOT match "file.txt/{id}". + t = UriTemplate.parse("file.txt/{id}") + assert t.match("file.txt/42") == {"id": "42"} + assert t.match("fileXtxt/42") is None + + +@pytest.mark.parametrize( + ("template", "uri", "expected"), + [ + # Percent-encoded delimiters round-trip through match/expand. + # Path-safety validation belongs to ResourceSecurity, not here. 
+ ("file://docs/{name}", "file://docs/a%2Fb", {"name": "a/b"}), + ("{var}", "a%3Fb", {"var": "a?b"}), + ("{var}", "a%23b", {"var": "a#b"}), + ("{var}", "a%26b", {"var": "a&b"}), + ("file{.ext}", "file.a%2Eb", {"ext": "a.b"}), + ("api{/v}", "api/a%2Fb", {"v": "a/b"}), + ("search{?q}", "search?q=a%26b", {"q": "a&b"}), + ("{;filter}", ";filter=a%3Bb", {"filter": "a;b"}), + ], +) +def test_match_encoded_delimiters_roundtrip(template: str, uri: str, expected: dict[str, str]): + assert UriTemplate.parse(template).match(uri) == expected + + +def test_match_reserved_expansion_handles_slash(): + # {+var} allows literal / (not just encoded) + t = UriTemplate.parse("{+path}") + assert t.match("a%2Fb") == {"path": "a/b"} + assert t.match("a/b") == {"path": "a/b"} + + +def test_match_double_encoding_decoded_once(): + # %252F is %2F encoded again. Single decode gives "%2F" (a literal + # percent sign, a '2', and an 'F'). Guards against over-decoding. + t = UriTemplate.parse("file://docs/{name}") + assert t.match("file://docs/..%252Fetc") == {"name": "..%2Fetc"} + + +def test_match_rejects_oversized_uri(): + t = UriTemplate.parse("{var}") + assert t.match("x" * 100, max_uri_length=50) is None + + +def test_match_accepts_uri_within_custom_limit(): + t = UriTemplate.parse("{var}") + assert t.match("x" * 100, max_uri_length=200) == {"var": "x" * 100} + + +def test_match_default_uri_length_limit(): + t = UriTemplate.parse("{+var}") + # Just at the limit: should match + assert t.match("x" * DEFAULT_MAX_URI_LENGTH) is not None + # One over: should reject + assert t.match("x" * (DEFAULT_MAX_URI_LENGTH + 1)) is None + + +def test_match_explode_encoded_separator_in_segment(): + # An encoded separator inside a segment decodes as part of the value, + # not as a split point. The split happens at literal separators only. 
+ t = UriTemplate.parse("/files{/path*}") + assert t.match("/files/a%2Fb/c") == {"path": ["a/b", "c"]} + + +@pytest.mark.parametrize( + ("template", "variables"), + [ + ("{var}", {"var": "hello"}), + ("file://docs/{name}", {"name": "readme.txt"}), + ("file://docs/{+path}", {"path": "src/main.py"}), + ("search{?q,lang}", {"q": "mcp", "lang": "en"}), + ("file{.ext}", {"ext": "txt"}), + ("/files{/path*}", {"path": ["a", "b", "c"]}), + ("{var}", {"var": "hello world"}), + ("item{;id}", {"id": "42"}), + ("item{;id}", {"id": ""}), + # Defined-but-empty values still emit the operator prefix; match + # must accept the empty capture after it. + ("page{#section}", {"section": ""}), + ("file{.ext}", {"ext": ""}), + ("api{/v}", {"v": ""}), + ("x{name}y", {"name": ""}), + ("item{;keys*}", {"keys": ["a", "b", "c"]}), + ("item{;keys*}", {"keys": ["a", "", "b"]}), + # Empty strings in explode lists round-trip for unnamed operators + ("{/path*}", {"path": ["a", "", "c"]}), + ("{/path*}", {"path": ["", "a"]}), + ("host{.labels*}", {"labels": ["a", "", "c"]}), + # Partial query expansion round-trips: expand omits undefined + # vars, match leaves them absent from the result. + ("logs://{service}{?since,level}", {"service": "api"}), + ("logs://{service}{?since,level}", {"service": "api", "since": "1h"}), + ("logs://{service}{?since,level}", {"service": "api", "since": "1h", "level": "error"}), + ("api{;key}=base/{+path}", {"key": "", "path": "a/b"}), + ], +) +def test_roundtrip_expand_then_match(template: str, variables: dict[str, str | list[str]]): + t = UriTemplate.parse(template) + uri = t.expand(variables) + assert t.match(uri) == variables + + +def test_match_simple_var_accepts_empty() -> None: + # RFC 6570 §3.2.2: {var} with var="" expands to nothing, so the inverse + # must accept it. v1.x's [^/]+ regex did not — see migration guide. 
+ t = UriTemplate.parse("tickets://{ticket_id}") + assert t.match("tickets://") == {"ticket_id": ""} + assert t.match("tickets://42") == {"ticket_id": "42"} + + +# --- Property tests over the generated template space ------------------------ +# +# The two tests below generate template strings instead of enumerating +# examples, so the contracts they state are checked over the whole space +# `parse()` accepts in a single deterministic run. The generator deliberately +# produces strings the parser rejects (adjacent variables, two greedy +# variables, unsupported explode placements, a second `{?...}` expression) and +# relies on `parse()` to filter them: pre-selecting "known good" shapes would +# only ever exercise the shapes someone already thought of. + +_PROPERTY_SEED = 20260626 +_PROPERTY_OPERATORS = ["", "+", "#", ".", "/", ";", "?", "&"] +# Literal runs draw from URI punctuation (`- . / ~ _`) plus uppercase letters. +# Values draw only from lowercase letters and digits. The two alphabets are +# disjoint, so a round-trip failure can never be explained away as a value +# colliding with a literal, an operator prefix, or a separator. +_LITERAL_CHARS = "XY-._~/Z" +_VALUE_CHARS = string.ascii_lowercase + string.digits +_FUZZ_CHARS = string.printable + + +def _random_template(rng: random.Random) -> tuple[str, list[tuple[str, bool]]]: + """Build a candidate template string plus the (name, explode) spec of each variable.""" + parts: list[str] = [] + specs: list[tuple[str, bool]] = [] + for _ in range(rng.randint(1, 5)): + if rng.random() < 0.45: + parts.append("".join(rng.choice(_LITERAL_CHARS) for _ in range(rng.randint(1, 2)))) + continue + operator = rng.choice(_PROPERTY_OPERATORS) + names: list[str] = [] + # Multi-variable expressions and the explode modifier are produced for + # every operator; `parse()` rejects the combinations it does not allow. 
+ for _ in range(2 if rng.random() < 0.2 else 1): + name = f"v{len(specs)}" + explode = rng.random() < 0.25 + specs.append((name, explode)) + names.append(f"{name}*" if explode else name) + parts.append("{" + operator + ",".join(names) + "}") + return "".join(parts), specs + + +def _random_value(rng: random.Random) -> str: + """Draw a short (possibly empty) value from the literal-disjoint alphabet.""" + return "".join(rng.choice(_VALUE_CHARS) for _ in range(rng.randint(0, 4))) + + +def _random_values(specs: list[tuple[str, bool]], rng: random.Random) -> dict[str, str | list[str]]: + """Draw a value for every variable: a string, or a non-empty list for explode variables.""" + return { + name: [_random_value(rng) for _ in range(rng.randint(1, 3))] if explode else _random_value(rng) + for name, explode in specs + } + + +def _mangled_inputs(uri: str, rng: random.Random) -> list[str]: + """Mangle one expansion into a batch of candidate inputs for `match()`.""" + candidates = [uri, "", uri[::-1], uri * 2] + for _ in range(6): + chars = list(uri) + mutation = rng.randint(0, 2) + if mutation == 0 and chars: + del chars[rng.randrange(len(chars))] + elif mutation == 1: + chars.insert(rng.randint(0, len(chars)), rng.choice(_FUZZ_CHARS)) + elif chars: + chars[rng.randrange(len(chars))] = rng.choice(_FUZZ_CHARS) + candidates.append("".join(chars)) + candidates.extend("".join(rng.choice(_FUZZ_CHARS) for _ in range(rng.randint(0, 30))) for _ in range(3)) + return candidates + + +def test_match_inverts_expand_for_every_parseable_template() -> None: + """For every template the parser accepts, matching the template's own expansion + yields a value set that re-expands to the same URI. + + Exact equality with the original values is not required: a different + pre-image (e.g. an explode list that flattens) is a correct answer as long + as it re-expands identically. SDK-defined contract — RFC 6570 specifies + only expansion, so `match()` is the inverse the SDK promises. 
+ """ + rng = random.Random(_PROPERTY_SEED) + accepted = 0 + for _ in range(600): + template, specs = _random_template(rng) + try: + t = UriTemplate.parse(template) + except InvalidUriTemplate: + continue + accepted += 1 + for _ in range(2): + values = _random_values(specs, rng) + uri = t.expand(values) + got = t.match(uri) + assert got is not None, f"{template!r} did not match its own expansion {uri!r} of {values!r}" + assert t.expand(got) == uri, f"{template!r}: match({uri!r}) -> {got!r}, which re-expands differently" + # Floor the accepted count so the property can never go vacuous: a future + # change that rejects every generated template would otherwise pass silently. + assert accepted >= 150 + + +def test_match_never_raises() -> None: + """`match()` returns a dict or None for every input string; it never raises. + + Each accepted template's own expansion is mangled (a character inserted, + deleted, or replaced from a wide printable alphabet; emptied; reversed; + doubled) alongside fully random strings. SDK-defined contract — a URI that + does not fit the template is a non-match, not an error. + """ + rng = random.Random(_PROPERTY_SEED) + calls = 0 + for _ in range(600): + template, specs = _random_template(rng) + try: + t = UriTemplate.parse(template) + except InvalidUriTemplate: + continue + uri = t.expand(_random_values(specs, rng)) + for candidate in _mangled_inputs(uri, rng): + result = t.match(candidate) + assert result is None or isinstance(result, dict), f"{template!r}: match({candidate!r}) -> {result!r}" + calls += 1 + # Floor the call count so the property can never go vacuous: a future + # change that rejects every generated template would otherwise pass silently. + assert calls >= 4000