Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .github/actions/conformance/expected-failures.2026-07-28.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ client:
# neither run nor evaluated on this leg.

server:
# The stateless 2026 path now reaches handlers for plain request/response
# scenarios; tools-call-with-progress still fails because the stateless
# server has no channel for server→client progress notifications.
- tools-call-with-progress
# SEP-2322 (multi-round-trip requests / IncompleteResult): the prompt pipeline
# cannot return InputRequiredResult from MCPServer yet (tools/call can).
- input-required-result-non-tool-request
2 changes: 0 additions & 2 deletions examples/stories/manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ era = "dual-in-body"
multi_connection = true

[story.streaming]
# progress + log notifications dropped on the modern streamable-HTTP path pending SSE wiring
xfail = ["http-asgi:modern"]

[story.legacy_elicitation]
era = "legacy"
Expand Down
13 changes: 3 additions & 10 deletions examples/stories/streaming/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,12 @@ uv run python -m stories.streaming.client
uv run python -m stories.streaming.client --server server_lowlevel

# HTTP — the client self-hosts the server on a free port, runs, then tears it
# down (--legacy: see the note below)
uv run python -m stories.streaming.client --http --legacy
# down
uv run python -m stories.streaming.client --http
# same, against the lowlevel-API server variant
uv run python -m stories.streaming.client --http --legacy --server server_lowlevel
uv run python -m stories.streaming.client --http --server server_lowlevel
```

The modern HTTP leg (drop `--legacy`) is `xfail` until the SSE wiring lands —
mid-call progress and log notifications are currently dropped there (see
Caveats).

## What to look at

- `client.py` `main` — opens with `async with Client(target, mode=mode,
Expand Down Expand Up @@ -60,9 +56,6 @@ Caveats).
OpenTelemetry instead of `notifications/message`. It is shown here because
servers still need to support 2025-era clients during that window. Progress
and cancellation are **not** deprecated. TODO(maxisbey): revisit before beta.
- On the modern (2026-07-28) streamable-HTTP path, mid-call progress and log
notifications are currently dropped pending the SSE wiring; the
`http-asgi:modern` leg of this story is `xfail` until that lands.
- When a request is cancelled the server currently replies with
`ErrorData(code=0, message="Request cancelled")`; the spec says it should not
reply at all. The client never observes it (its awaiting task is already
Expand Down
7 changes: 5 additions & 2 deletions src/mcp/client/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ async def _handle_sse_event(
# Otherwise, return False to continue listening
return isinstance(message, JSONRPCResponse | JSONRPCError)

except Exception as exc: # pragma: no cover
# Forwarding to a closed read stream lands here when the caller cancels mid-SSE
# (BrokenResourceError, not a parse failure); coverage is timing-dependent in the
# streaming story's modern HTTP cancellation leg.
except Exception as exc: # pragma: lax no cover
logger.exception("Error parsing SSE message")
if original_request_id is not None:
error_data = ErrorData(code=PARSE_ERROR, message=f"Failed to parse SSE message: {exc}")
Expand Down Expand Up @@ -372,7 +375,7 @@ async def _handle_sse_response(
await response.aclose()
return # Normal completion, no reconnect needed
except Exception:
logger.debug("SSE stream ended", exc_info=True) # pragma: no cover
logger.debug("SSE stream ended", exc_info=True) # pragma: lax no cover

# Stream ended without response - reconnect if we received an event with ID
if last_event_id is not None: # pragma: no branch
Expand Down
139 changes: 123 additions & 16 deletions src/mcp/server/_streamable_http_modern.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@
path for earlier protocol revisions.

A 2026-07-28 request is a self-contained POST: no `initialize` handshake, no
`Mcp-Session-Id`, one JSON-RPC request in, one JSON-RPC response out. This
module handles such a request directly in the ASGI task - no memory streams,
no per-request task group, no `JSONRPCDispatcher`.
`Mcp-Session-Id`, one JSON-RPC request in, one JSON-RPC response out. JSON
mode handles the request directly in the ASGI task. SSE mode runs the handler
as a sibling task and defers committing to `text/event-stream` until the
handler emits a notification or `_SSE_PING_INTERVAL` elapses, whichever
comes first: a handler that completes (or raises) within that window without
emitting still gets a JSON response with the table-mapped HTTP status, so
the spec's `404`/`400` MUSTs hold for kernel-dispatch errors; a handler that
runs silent past the window commits SSE so the keepalive ping can keep the
connection open behind a proxy idle-read timeout.
"""

from __future__ import annotations
Expand All @@ -16,9 +22,10 @@
import logging
from collections.abc import Awaitable, Mapping
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, TypeVar
from typing import TYPE_CHECKING, Any, Final, TypeVar

import anyio
from anyio.streams.memory import MemoryObjectSendStream
from mcp_types import (
INTERNAL_ERROR,
INVALID_REQUEST,
Expand All @@ -27,8 +34,10 @@
ErrorData,
Implementation,
JSONRPCError,
JSONRPCNotification,
JSONRPCRequest,
JSONRPCResponse,
ProgressToken,
RequestId,
)
from pydantic import BaseModel, ValidationError
Expand All @@ -38,6 +47,7 @@

from mcp.server.connection import Connection
from mcp.server.runner import serve_one
from mcp.server.streamable_http import check_accept_headers
from mcp.server.transport_security import TransportSecurityMiddleware, TransportSecuritySettings
from mcp.shared.dispatcher import CallOptions
from mcp.shared.exceptions import NoBackChannelError
Expand All @@ -46,7 +56,7 @@
InboundLadderRejection,
classify_inbound_request,
)
from mcp.shared.jsonrpc_dispatcher import handler_exception_to_error_data
from mcp.shared.jsonrpc_dispatcher import handler_exception_to_error_data, progress_token_from_params
from mcp.shared.message import MessageMetadata, ServerMessageMetadata
from mcp.shared.transport_context import TransportContext

Expand All @@ -66,12 +76,15 @@ class _SingleExchangeDispatchContext:

Structurally satisfies `mcp.shared.dispatcher.DispatchContext`. The
back-channel is closed by construction: a 2026-07-28 server cannot send
requests to the client.
requests to the client. The SSE sink, when present, carries request-scoped
notifications onto this request's response stream.
"""

transport: TransportContext
request_id: RequestId
message_metadata: MessageMetadata
progress_token: ProgressToken | None = None
sink: MemoryObjectSendStream[bytes] | None = None
cancel_requested: anyio.Event = field(default_factory=anyio.Event)
can_send_request: bool = field(default=False, init=False)

Expand All @@ -84,12 +97,23 @@ async def send_raw_request(
raise NoBackChannelError(method)

async def notify(self, method: str, params: Mapping[str, Any] | None, opts: CallOptions | None = None) -> None:
# TODO(D-005a): buffer and stream as SSE once the JSON-vs-SSE response mode lands.
return None
if self.sink is None:
return
body = dict(params) if params is not None else None
try:
await self.sink.send(_sse_event(JSONRPCNotification(jsonrpc="2.0", method=method, params=body)))
except (anyio.ClosedResourceError, anyio.BrokenResourceError):
logger.debug("dropped %s: response stream closed", method)

async def progress(self, progress: float, total: float | None = None, message: str | None = None) -> None:
# TODO(D-005a): no progressToken plumbing yet; ships with the SSE response mode.
return None
if self.progress_token is None:
return
params: dict[str, Any] = {"progressToken": self.progress_token, "progress": progress}
if total is not None:
params["total"] = total
if message is not None:
params["message"] = message
await self.notify("notifications/progress", params)


def _typed(model: type[_ModelT], raw: Any) -> _ModelT | None:
Expand Down Expand Up @@ -126,6 +150,28 @@ async def _to_jsonrpc_response(
return JSONRPCResponse(jsonrpc="2.0", id=request_id, result=result)


_SSE_PING_INTERVAL: float = 15.0
"""Seconds between SSE comment-line keepalives once `text/event-stream` has committed."""

_SSE_HEADERS: Final[list[tuple[bytes, bytes]]] = [
(b"content-type", b"text/event-stream"),
(b"cache-control", b"no-cache, no-transform"),
(b"connection", b"keep-alive"),
(b"x-accel-buffering", b"no"),
]


def _sse_event(msg: JSONRPCResponse | JSONRPCError | JSONRPCNotification) -> bytes:
"""Serialise a JSON-RPC message as one SSE `event: message` frame.

SSE mode begins after the handler has emitted, so a `JSONRPCError` here
always carries the request's id; the `id: null` case lives in `_write`.
"""
body = msg.model_dump(mode="json", by_alias=True, exclude_none=True)
data = json.dumps(body, separators=(",", ":"))
return f"event: message\r\ndata: {data}\r\n\r\n".encode()


async def _write(
msg: JSONRPCResponse | JSONRPCError,
scope: Scope,
Expand All @@ -149,6 +195,7 @@ async def _write(
async def handle_modern_request(
app: Server[Any],
security_settings: TransportSecuritySettings | None,
json_response: bool,
lifespan_state: Any,
scope: Scope,
receive: Receive,
Expand All @@ -169,14 +216,17 @@ async def handle_modern_request(
await err(scope, receive, send)
return

# TODO(D-005a): validate Accept once the JSON-vs-SSE response mode is settled.

if request.method != "POST":
# HTTP-layer rejection (Allow accompanies 405 per RFC 9110) — happens
# before JSON-RPC parsing, so it doesn't go through `_write`.
await Response(status_code=405, headers={"Allow": "POST"})(scope, receive, send)
return

has_json, has_sse = check_accept_headers(request)
Comment thread
maxisbey marked this conversation as resolved.
if not has_json or (not json_response and not has_sse):
await Response(status_code=406)(scope, receive, send)
return

body = await request.body()
try:
decoded = json.loads(body)
Expand Down Expand Up @@ -219,8 +269,65 @@ async def handle_modern_request(
transport=TransportContext(kind="streamable-http", can_send_request=False, headers=request.headers),
request_id=req.id,
message_metadata=ServerMessageMetadata(request_context=request),
progress_token=progress_token_from_params(req.params),
)
msg = await _to_jsonrpc_response(
req.id, serve_one(app, dctx, req.method, req.params, connection=connection, lifespan_state=lifespan_state)
)
await _write(msg, scope, receive, send)

if json_response:
msg = await _to_jsonrpc_response(
req.id, serve_one(app, dctx, req.method, req.params, connection=connection, lifespan_state=lifespan_state)
)
await _write(msg, scope, receive, send)
return

send_ch, recv_ch = anyio.create_memory_object_stream[bytes](0)
dctx.sink = send_ch
result: list[JSONRPCResponse | JSONRPCError] = []

async def run_handler() -> None:
async with send_ch:
result.append(
await _to_jsonrpc_response(
req.id,
serve_one(app, dctx, req.method, req.params, connection=connection, lifespan_state=lifespan_state),
)
)

async def watch_disconnect(cancel_scope: anyio.CancelScope) -> None:
while (await receive()).get("type") != "http.disconnect":
pass # pragma: no cover
cancel_scope.cancel()

async with recv_ch, anyio.create_task_group() as tg:
tg.start_soon(run_handler)
tg.start_soon(watch_disconnect, tg.cancel_scope)

event: bytes | None = None
done = False
with anyio.move_on_after(_SSE_PING_INTERVAL):
try:
event = await recv_ch.receive()
except anyio.EndOfStream:
done = True

if done:
# Handler completed within the deferral window without emitting:
# `application/json` with the table-mapped status. Kernel-dispatch
# errors (METHOD_NOT_FOUND, missing-capability, INVALID_PARAMS)
# resolve here in practice.
await _write(result[0], scope, receive, send)
else:
# First notification arrived, or the deferral window elapsed: commit
# `text/event-stream` and start pinging so a proxy idle-read timeout
# cannot close the stream (which on this path cancels the handler).
await send({"type": "http.response.start", "status": _OK_STATUS, "headers": _SSE_HEADERS})
Comment thread
maxisbey marked this conversation as resolved.
while not done:
await send({"type": "http.response.body", "body": event or b": ping\r\n\r\n", "more_body": True})
event = None
with anyio.move_on_after(_SSE_PING_INTERVAL):
try:
event = await recv_ch.receive()
except anyio.EndOfStream:
done = True
await send({"type": "http.response.body", "body": _sse_event(result[0]), "more_body": False})

tg.cancel_scope.cancel()
39 changes: 20 additions & 19 deletions src/mcp/server/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,24 @@
SSEEvent = dict[str, Any]


def check_accept_headers(request: Request) -> tuple[bool, bool]:
"""Return (has_json, has_sse) for the request's Accept header, with RFC 7231 wildcard handling.

Supports wildcard media types per RFC 7231, section 5.3.2:
- */* matches any media type
- application/* matches any application/ subtype
- text/* matches any text/ subtype
"""
accept_header = request.headers.get("accept", "")
accept_types = [media_type.strip().split(";")[0].strip().lower() for media_type in accept_header.split(",")]
Comment thread
maxisbey marked this conversation as resolved.

has_wildcard = "*/*" in accept_types
has_json = has_wildcard or any(t in (CONTENT_TYPE_JSON, "application/*") for t in accept_types)
has_sse = has_wildcard or any(t in (CONTENT_TYPE_SSE, "text/*") for t in accept_types)

return has_json, has_sse


@dataclass
class EventMessage:
"""A JSONRPCMessage with an optional event ID for stream resumability."""
Expand Down Expand Up @@ -415,23 +433,6 @@ async def handle_request(self, scope: Scope, receive: Receive, send: Send) -> No
else:
await self._handle_unsupported_request(request, send)

def _check_accept_headers(self, request: Request) -> tuple[bool, bool]:
"""Check if the request accepts the required media types.

Supports wildcard media types per RFC 7231, section 5.3.2:
- */* matches any media type
- application/* matches any application/ subtype
- text/* matches any text/ subtype
"""
accept_header = request.headers.get("accept", "")
accept_types = [media_type.strip().split(";")[0].strip().lower() for media_type in accept_header.split(",")]

has_wildcard = "*/*" in accept_types
has_json = has_wildcard or any(t in (CONTENT_TYPE_JSON, "application/*") for t in accept_types)
has_sse = has_wildcard or any(t in (CONTENT_TYPE_SSE, "text/*") for t in accept_types)

return has_json, has_sse

def _check_content_type(self, request: Request) -> bool:
"""Check if the request has the correct Content-Type."""
content_type = request.headers.get("content-type", "")
Expand All @@ -441,7 +442,7 @@ def _check_content_type(self, request: Request) -> bool:

async def _validate_accept_header(self, request: Request, scope: Scope, send: Send) -> bool:
"""Validate Accept header based on response mode. Returns True if valid."""
has_json, has_sse = self._check_accept_headers(request)
has_json, has_sse = check_accept_headers(request)
if self.is_json_response_enabled:
# For JSON-only responses, only require application/json
if not has_json:
Expand Down Expand Up @@ -661,7 +662,7 @@ async def _handle_get_request(self, request: Request, send: Send) -> None:
raise ValueError("No read stream writer available. Ensure connect() is called first.")

# Validate Accept header - must include text/event-stream
_, has_sse = self._check_accept_headers(request)
_, has_sse = check_accept_headers(request)

if not has_sse:
response = self._create_error_response(
Expand Down
4 changes: 3 additions & 1 deletion src/mcp/server/streamable_http_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ async def handle_request(self, scope: Scope, receive: Receive, send: Send) -> No
header = MCP_PROTOCOL_VERSION_HEADER.encode("ascii")
pv = next((v.decode("latin-1") for k, v in scope["headers"] if k == header), None)
if pv is not None and pv not in HANDSHAKE_PROTOCOL_VERSIONS:
await handle_modern_request(self.app, self.security_settings, self._lifespan_state, scope, receive, send)
await handle_modern_request(
self.app, self.security_settings, self.json_response, self._lifespan_state, scope, receive, send
)
return

# Dispatch to the appropriate handler
Expand Down
Loading
Loading