diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index b6128d3e0..7e2c8923d 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -646,7 +646,14 @@ async def _handle_post_request(self, scope: Scope, request: Request, receive: Re INTERNAL_ERROR, ) await response(scope, receive, send) - await writer.send(Exception(err)) + # The session's read stream may already be closed (e.g. the session task + # crashed and tore down its streams before this handler ran). Sending into a + # closed/broken stream here would raise a secondary error that masks the + # original one and surfaces as "Exception in ASGI application". Guard it. + try: + await writer.send(Exception(err)) + except (anyio.ClosedResourceError, anyio.BrokenResourceError): # pragma: lax no cover + pass return async def _handle_get_request(self, request: Request, send: Send) -> None: diff --git a/tests/server/test_streamable_http_router.py b/tests/server/test_streamable_http_router.py index 3086dca99..4b8501daa 100644 --- a/tests/server/test_streamable_http_router.py +++ b/tests/server/test_streamable_http_router.py @@ -114,3 +114,59 @@ async def asgi_send(message: Message) -> None: assert sent[0]["status"] == 500 body = b"".join(m.get("body", b"") for m in sent if m["type"] == "http.response.body") assert b"backend unavailable" not in body + + +@pytest.mark.anyio +async def test_post_error_path_tolerates_closed_session_stream() -> None: + """The error path must not raise a secondary error when the read stream is gone (#2741). + + When a session task crashes it tears down its read stream before the concurrent + POST handler reaches its `except` block. The trailing ``writer.send(Exception(err))`` + then targets a closed/broken stream. Without a guard that raises a secondary + ``ClosedResourceError``/``BrokenResourceError`` out of the ASGI app, masking the + original error and surfacing as "Exception in ASGI application". The 500 response + must already be delivered and ``handle_request`` must return cleanly. + """ + transport = StreamableHTTPServerTransport( + mcp_session_id=None, + is_json_response_enabled=False, + event_store=_PrimingFailingStore(), + ) + + body = b'{"jsonrpc":"2.0","id":"req-1","method":"tools/list","params":{}}' + scope: Scope = { + "type": "http", + "method": "POST", + "path": "/", + "query_string": b"", + "headers": [ + (b"accept", b"application/json, text/event-stream"), + (b"content-type", b"application/json"), + (b"mcp-protocol-version", b"2025-11-25"), + ], + } + body_sent = False + + async def receive() -> Message: + nonlocal body_sent + if not body_sent: + body_sent = True + return {"type": "http.request", "body": body, "more_body": False} + raise NotImplementedError + + sent: list[Message] = [] + + async def asgi_send(message: Message) -> None: + sent.append(message) + + async with transport.connect() as (read_stream, _write_stream): + # Model the crashed-session teardown: the read stream the POST handler would + # send the wrapped error into is already closed before the handler runs. + await read_stream.aclose() + # Must not raise out of the ASGI app despite the closed stream. + await transport.handle_request(scope, receive, asgi_send) + + assert sent[0]["type"] == "http.response.start" + assert sent[0]["status"] == 500 + body = b"".join(m.get("body", b"") for m in sent if m["type"] == "http.response.body") + assert b"backend unavailable" not in body