From 428ed39a1adf1bc0f0911bfd2d44379baf5ca580 Mon Sep 17 00:00:00 2001 From: majdyz Date: Sat, 11 Apr 2026 11:04:50 +0000 Subject: [PATCH] fix(copilot/sdk-proxy): abort transport on mid-stream upstream error MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous fix set a ``stream_error`` flag and returned the prepared ``StreamResponse`` without calling ``write_eof()``, assuming aiohttp would leave the body dangling. It doesn't: aiohttp's handler dispatcher finalises any returned ``StreamResponse`` on the way out (writing the chunked terminator / content-length / EOF), so a regression test with a real mid-stream failure still saw the client get a clean 200 body. Correct fix: on the stream-error path, abort the underlying transport directly via ``request.transport.abort()`` and then re-raise the original stream error out of the handler. Aborting drops the TCP socket mid-response so the client's parser surfaces a ``ClientPayloadError`` / ``ServerDisconnectedError`` and the caller sees the truncation as a real transport failure. Also rewrote the regression test to use a raw ``asyncio.start_server`` TCP handler that sends a chunked response header plus one partial chunk and then hard-closes the socket (``transport.abort()``) — this is the one failure mode that reliably propagates through aiohttp's ``iter_any()`` as a ``ClientError`` for the proxy to detect. Verified locally: the client request now fails with the expected ``ClientPayloadError`` instead of silently receiving a clean 200 body, so the regression test passes with this fix and fails against the previous behaviour. 
--- .../copilot/sdk/openrouter_compat_proxy.py | 30 +++++-- .../sdk/openrouter_compat_proxy_test.py | 87 +++++++++++-------- 2 files changed, 73 insertions(+), 44 deletions(-) diff --git a/autogpt_platform/backend/backend/copilot/sdk/openrouter_compat_proxy.py b/autogpt_platform/backend/backend/copilot/sdk/openrouter_compat_proxy.py index 2c03d94ae1..dc1b9adbd8 100644 --- a/autogpt_platform/backend/backend/copilot/sdk/openrouter_compat_proxy.py +++ b/autogpt_platform/backend/backend/copilot/sdk/openrouter_compat_proxy.py @@ -469,18 +469,32 @@ class OpenRouterCompatProxy: upstream_response.release() if stream_error is not None: - # Do NOT call ``write_eof`` — that would signal a clean end - # of stream to the client on top of a truncated body. - # Mark the connection for close (``Connection: close``) and - # skip the EOF so the aiohttp writer drops the connection - # mid-response. The client's parser then raises a - # transport error and the caller can retry / surface the - # failure instead of silently consuming a corrupt body. + # Do NOT call ``write_eof`` or return the prepared + # ``downstream`` here — aiohttp finalises a returned + # StreamResponse (writing the terminating chunk / + # content-length / EOF) even if we skipped ``write_eof`` + # ourselves, which would signal a clean end of stream to + # the client on top of the truncated body. Instead abort + # the underlying transport directly so the client's + # parser surfaces a ``ClientPayloadError`` / + # ``ServerDisconnectedError`` and the caller can retry / + # surface the failure instead of silently consuming a + # corrupt body. 
try: downstream.force_close() except Exception: # pragma: no cover - defensive on transport pass - return downstream + transport = request.transport + if transport is not None: + try: + transport.abort() + except Exception: # pragma: no cover - defensive on transport + pass + # Re-raise the original stream error so aiohttp treats + # this handler as having failed; the transport is + # already aborted above so the client sees an abrupt + # disconnect either way. + raise stream_error await downstream.write_eof() return downstream diff --git a/autogpt_platform/backend/backend/copilot/sdk/openrouter_compat_proxy_test.py b/autogpt_platform/backend/backend/copilot/sdk/openrouter_compat_proxy_test.py index 5f408d16e6..c98711e24f 100644 --- a/autogpt_platform/backend/backend/copilot/sdk/openrouter_compat_proxy_test.py +++ b/autogpt_platform/backend/backend/copilot/sdk/openrouter_compat_proxy_test.py @@ -591,56 +591,71 @@ async def test_proxy_does_not_signal_clean_eof_on_mid_stream_error(): client only saw a truncated body. Instead the proxy drops the connection so the client's parser surfaces a transport error. - We simulate the failure by giving the proxy an upstream that - closes the TCP socket mid-response. The proxy must either drop - the client connection (``aiohttp.ClientPayloadError`` / - ``ClientConnectionError``) or — if aiohttp masks it — at least - not report an ``ok`` complete body. + We simulate the failure with a raw asyncio TCP server that + sends a chunked-encoding response header plus one partial chunk + and then hard-closes the socket — this is the one failure mode + aiohttp's ``iter_any()`` reliably surfaces as an + ``aiohttp.ClientError`` rather than an ordinary clean EOF. 
""" class _TruncatingUpstream: - """Upstream that starts sending a response then kills the - connection before ``write_eof`` — mimicking a backend that - dies mid-stream.""" + """Raw TCP server that sends a partial chunked body then + closes the socket without writing the terminating chunk.""" def __init__(self) -> None: - self._runner: web.AppRunner | None = None + self._server: asyncio.base_events.Server | None = None self.port: int = 0 async def start(self) -> str: - async def handler(request: web.Request) -> web.StreamResponse: - resp = web.StreamResponse( - status=200, - headers={"Content-Type": "application/octet-stream"}, - ) - await resp.prepare(request) - await resp.write(b"partial-") - # Force-close without write_eof so the proxy's - # iter_any() raises mid-stream. - resp.force_close() - return resp + async def handle_conn( + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + ) -> None: + try: + # Read and discard the request until the blank + # line — we don't care what the proxy sends. + while True: + line = await reader.readline() + if not line or line == b"\r\n": + break + # Chunked response with one partial chunk. + writer.write( + b"HTTP/1.1 200 OK\r\n" + b"Content-Type: application/octet-stream\r\n" + b"Transfer-Encoding: chunked\r\n" + b"Connection: close\r\n" + b"\r\n" + # One chunk, size 8, content "partial-". + b"8\r\n" + b"partial-\r\n" + # Deliberately DO NOT send the terminating + # "0\r\n\r\n" — this is the mid-stream + # truncation we're testing. + ) + await writer.drain() + finally: + # Hard-close the socket so the proxy's + # iter_any() sees an abrupt end-of-stream. 
+ try: + writer.transport.abort() + except Exception: + pass - app = web.Application() - app.router.add_route("*", "/{tail:.*}", handler) - self._runner = web.AppRunner(app) - await self._runner.setup() - site = web.TCPSite(self._runner, "127.0.0.1", 0) - await site.start() - server = site._server - assert server is not None - sockets = getattr(server, "sockets", None) + self._server = await asyncio.start_server(handle_conn, "127.0.0.1", 0) + sockets = self._server.sockets assert sockets is not None self.port = sockets[0].getsockname()[1] return f"http://127.0.0.1:{self.port}" async def stop(self) -> None: - if self._runner is not None: - await self._runner.cleanup() - self._runner = None + if self._server is not None: + self._server.close() + await self._server.wait_closed() + self._server = None upstream = _TruncatingUpstream() upstream_url = await upstream.start() - proxy = OpenRouterCompatProxy(target_base_url=upstream_url) + proxy = OpenRouterCompatProxy(target_base_url=upstream_url, request_timeout=5.0) await proxy.start() try: async with aiohttp.ClientSession() as client: @@ -653,9 +668,9 @@ async def test_proxy_does_not_signal_clean_eof_on_mid_stream_error(): ) as resp: # The client should see either an error raising # here or a truncated body followed by a - # transport-level failure on read — both are - # acceptable because both surface the truncation - # instead of silently reporting success. + # transport-level failure on read — both surface + # the truncation instead of silently reporting + # success. await resp.read() except ( aiohttp.ClientPayloadError,