Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/mcp/server/streamable_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,14 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re
INTERNAL_ERROR,
)
await response(scope, receive, send)
await writer.send(Exception(err))
# The session's read stream may already be closed (e.g. the session task
# crashed and tore down its streams before this handler ran). Sending into a
# closed/broken stream here would raise a secondary error that masks the
# original one and surfaces as "Exception in ASGI application". Guard it.
try:
await writer.send(Exception(err))
except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: lax no cover
pass
return

async def _handle_get_request(self, request: Request, send: Send) -> None:
Expand Down
56 changes: 56 additions & 0 deletions tests/server/test_streamable_http_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,59 @@ async def asgi_send(message: Message) -> None:
assert sent[0]["status"] == 500
body = b"".join(m.get("body", b"") for m in sent if m["type"] == "http.response.body")
assert b"backend unavailable" not in body


@pytest.mark.anyio
async def test_post_error_path_tolerates_closed_session_stream() -> None:
"""The error path must not raise a secondary error when the read stream is gone (#2741).

When a session task crashes it tears down its read stream before the concurrent
POST handler reaches its `except` block. The trailing ``writer.send(Exception(err))``
then targets a closed/broken stream. Without a guard that raises a secondary
``ClosedResourceError``/``BrokenResourceError`` out of the ASGI app, masking the
original error and surfacing as "Exception in ASGI application". The 500 response
must already be delivered and ``handle_request`` must return cleanly.
"""
transport = StreamableHTTPServerTransport(
mcp_session_id=None,
is_json_response_enabled=False,
event_store=_PrimingFailingStore(),
)

body = b'{"jsonrpc":"2.0","id":"req-1","method":"tools/list","params":{}}'
scope: Scope = {
"type": "http",
"method": "POST",
"path": "/",
"query_string": b"",
"headers": [
(b"accept", b"application/json, text/event-stream"),
(b"content-type", b"application/json"),
(b"mcp-protocol-version", b"2025-11-25"),
],
}
body_sent = False

async def receive() -> Message:
nonlocal body_sent
if not body_sent:
body_sent = True
return {"type": "http.request", "body": body, "more_body": False}
raise NotImplementedError

sent: list[Message] = []

async def asgi_send(message: Message) -> None:
sent.append(message)

async with transport.connect() as (read_stream, _write_stream):
# Model the crashed-session teardown: the read stream the POST handler would
# send the wrapped error into is already closed before the handler runs.
await read_stream.aclose()
# Must not raise out of the ASGI app despite the closed stream.
await transport.handle_request(scope, receive, asgi_send)

assert sent[0]["type"] == "http.response.start"
assert sent[0]["status"] == 500
body = b"".join(m.get("body", b"") for m in sent if m["type"] == "http.response.body")
assert b"backend unavailable" not in body
Loading