From 86418f023746fbd26de20f72c112748eabb9808c Mon Sep 17 00:00:00 2001 From: Bartok9 Date: Fri, 26 Jun 2026 02:03:21 -0400 Subject: [PATCH] fix(server): guard error-path send into closed session stream Closes #2741. When a stateful Streamable HTTP session task crashes, it tears down its read stream before the concurrent POST handler reaches its except block. The trailing `await writer.send(Exception(err))` then targets an already closed/broken stream and raises a secondary ClosedResourceError that escapes the ASGI app ("Exception in ASGI application"), masking the original failure. Guard the trailing send with except (ClosedResourceError, BrokenResourceError): pass, mirroring the existing closed-stream guards in this module. The 500 response is already delivered before this send, so swallowing the secondary error preserves client behavior while removing the noisy, misleading traceback. Regression test models the crashed-session teardown by pre-closing the read stream; it fails without the guard (ClosedResourceError out of handle_request) and passes with it. --- src/mcp/server/streamable_http.py | 9 +++- tests/server/test_streamable_http_router.py | 56 +++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index b6128d3e0..7e2c8923d 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -646,7 +646,14 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re INTERNAL_ERROR, ) await response(scope, receive, send) - await writer.send(Exception(err)) + # The session's read stream may already be closed (e.g. the session task + # crashed and tore down its streams before this handler ran). Sending into a + # closed/broken stream here would raise a secondary error that masks the + # original one and surfaces as "Exception in ASGI application". Guard it. + try: + await writer.send(Exception(err)) + except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: lax no cover + pass return async def _handle_get_request(self, request: Request, send: Send) -> None: diff --git a/tests/server/test_streamable_http_router.py b/tests/server/test_streamable_http_router.py index 3086dca99..4b8501daa 100644 --- a/tests/server/test_streamable_http_router.py +++ b/tests/server/test_streamable_http_router.py @@ -114,3 +114,59 @@ async def asgi_send(message: Message) -> None: assert sent[0]["status"] == 500 body = b"".join(m.get("body", b"") for m in sent if m["type"] == "http.response.body") assert b"backend unavailable" not in body + + +@pytest.mark.anyio +async def test_post_error_path_tolerates_closed_session_stream() -> None: + """The error path must not raise a secondary error when the read stream is gone (#2741). + + When a session task crashes it tears down its read stream before the concurrent + POST handler reaches its `except` block. The trailing ``writer.send(Exception(err))`` + then targets a closed/broken stream. Without a guard that raises a secondary + ``ClosedResourceError``/``BrokenResourceError`` out of the ASGI app, masking the + original error and surfacing as "Exception in ASGI application". The 500 response + must already be delivered and ``handle_request`` must return cleanly. + """ + transport = StreamableHTTPServerTransport( + mcp_session_id=None, + is_json_response_enabled=False, + event_store=_PrimingFailingStore(), + ) + + body = b'{"jsonrpc":"2.0","id":"req-1","method":"tools/list","params":{}}' + scope: Scope = { + "type": "http", + "method": "POST", + "path": "/", + "query_string": b"", + "headers": [ + (b"accept", b"application/json, text/event-stream"), + (b"content-type", b"application/json"), + (b"mcp-protocol-version", b"2025-11-25"), + ], + } + body_sent = False + + async def receive() -> Message: + nonlocal body_sent + if not body_sent: + body_sent = True + return {"type": "http.request", "body": body, "more_body": False} + raise NotImplementedError + + sent: list[Message] = [] + + async def asgi_send(message: Message) -> None: + sent.append(message) + + async with transport.connect() as (read_stream, _write_stream): + # Model the crashed-session teardown: the read stream the POST handler would + # send the wrapped error into is already closed before the handler runs. + await read_stream.aclose() + # Must not raise out of the ASGI app despite the closed stream. + await transport.handle_request(scope, receive, asgi_send) + + assert sent[0]["type"] == "http.response.start" + assert sent[0]["status"] == 500 + body = b"".join(m.get("body", b"") for m in sent if m["type"] == "http.response.body") + assert b"backend unavailable" not in body