mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-30 03:00:41 -04:00
fix(copilot/sdk-proxy): abort transport on mid-stream upstream error
The previous fix set a ``stream_error`` flag and returned the prepared ``StreamResponse`` without calling ``write_eof()``, assuming aiohttp would leave the body dangling. It doesn't: aiohttp's handler dispatcher finalises any returned ``StreamResponse`` on the way out (writing the chunked terminator / content-length / EOF), so a regression test with a real mid-stream failure still saw the client get a clean 200 body. Correct fix: on the stream-error path, abort the underlying transport directly via ``request.transport.abort()`` and then re-raise the original stream error out of the handler. Aborting drops the TCP socket mid-response so the client's parser surfaces a ``ClientPayloadError`` / ``ServerDisconnectedError`` and the caller sees the truncation as a real transport failure. Also rewrote the regression test to use a raw ``asyncio.start_server`` TCP handler that sends a chunked response header plus one partial chunk and then hard-closes the socket (``transport.abort()``) — this is the one failure mode that reliably propagates through aiohttp's ``iter_any()`` as a ``ClientError`` for the proxy to detect. Verified locally: the test now fails with the expected ``ClientPayloadError`` on the client side instead of silently returning 200.
This commit is contained in:
@@ -469,18 +469,32 @@ class OpenRouterCompatProxy:
|
||||
upstream_response.release()
|
||||
|
||||
if stream_error is not None:
|
||||
# Do NOT call ``write_eof`` — that would signal a clean end
|
||||
# of stream to the client on top of a truncated body.
|
||||
# Mark the connection for close (``Connection: close``) and
|
||||
# skip the EOF so the aiohttp writer drops the connection
|
||||
# mid-response. The client's parser then raises a
|
||||
# transport error and the caller can retry / surface the
|
||||
# failure instead of silently consuming a corrupt body.
|
||||
# Do NOT call ``write_eof`` or return the prepared
|
||||
# ``downstream`` here — aiohttp finalises a returned
|
||||
# StreamResponse (writing the terminating chunk /
|
||||
# content-length / EOF) even if we skipped ``write_eof``
|
||||
# ourselves, which would signal a clean end of stream to
|
||||
# the client on top of the truncated body. Instead abort
|
||||
# the underlying transport directly so the client's
|
||||
# parser surfaces a ``ClientPayloadError`` /
|
||||
# ``ServerDisconnectedError`` and the caller can retry /
|
||||
# surface the failure instead of silently consuming a
|
||||
# corrupt body.
|
||||
try:
|
||||
downstream.force_close()
|
||||
except Exception: # pragma: no cover - defensive on transport
|
||||
pass
|
||||
return downstream
|
||||
transport = request.transport
|
||||
if transport is not None:
|
||||
try:
|
||||
transport.abort()
|
||||
except Exception: # pragma: no cover - defensive on transport
|
||||
pass
|
||||
# Re-raise the original stream error so aiohttp treats
|
||||
# this handler as having failed; the transport is
|
||||
# already aborted above so the client sees an abrupt
|
||||
# disconnect either way.
|
||||
raise stream_error
|
||||
|
||||
await downstream.write_eof()
|
||||
return downstream
|
||||
|
||||
@@ -591,56 +591,71 @@ async def test_proxy_does_not_signal_clean_eof_on_mid_stream_error():
|
||||
client only saw a truncated body. Instead the proxy drops the
|
||||
connection so the client's parser surfaces a transport error.
|
||||
|
||||
We simulate the failure by giving the proxy an upstream that
|
||||
closes the TCP socket mid-response. The proxy must either drop
|
||||
the client connection (``aiohttp.ClientPayloadError`` /
|
||||
``ClientConnectionError``) or — if aiohttp masks it — at least
|
||||
not report an ``ok`` complete body.
|
||||
We simulate the failure with a raw asyncio TCP server that
|
||||
sends a chunked-encoding response header plus one partial chunk
|
||||
and then hard-closes the socket — this is the one failure mode
|
||||
aiohttp's ``iter_any()`` reliably surfaces as an
|
||||
``aiohttp.ClientError`` rather than an ordinary clean EOF.
|
||||
"""
|
||||
|
||||
class _TruncatingUpstream:
|
||||
"""Upstream that starts sending a response then kills the
|
||||
connection before ``write_eof`` — mimicking a backend that
|
||||
dies mid-stream."""
|
||||
"""Raw TCP server that sends a partial chunked body then
|
||||
closes the socket without writing the terminating chunk."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._runner: web.AppRunner | None = None
|
||||
self._server: asyncio.base_events.Server | None = None
|
||||
self.port: int = 0
|
||||
|
||||
async def start(self) -> str:
|
||||
async def handler(request: web.Request) -> web.StreamResponse:
|
||||
resp = web.StreamResponse(
|
||||
status=200,
|
||||
headers={"Content-Type": "application/octet-stream"},
|
||||
)
|
||||
await resp.prepare(request)
|
||||
await resp.write(b"partial-")
|
||||
# Force-close without write_eof so the proxy's
|
||||
# iter_any() raises mid-stream.
|
||||
resp.force_close()
|
||||
return resp
|
||||
async def handle_conn(
|
||||
reader: asyncio.StreamReader,
|
||||
writer: asyncio.StreamWriter,
|
||||
) -> None:
|
||||
try:
|
||||
# Read and discard the request until the blank
|
||||
# line — we don't care what the proxy sends.
|
||||
while True:
|
||||
line = await reader.readline()
|
||||
if not line or line == b"\r\n":
|
||||
break
|
||||
# Chunked response with one partial chunk.
|
||||
writer.write(
|
||||
b"HTTP/1.1 200 OK\r\n"
|
||||
b"Content-Type: application/octet-stream\r\n"
|
||||
b"Transfer-Encoding: chunked\r\n"
|
||||
b"Connection: close\r\n"
|
||||
b"\r\n"
|
||||
# One chunk, size 8, content "partial-".
|
||||
b"8\r\n"
|
||||
b"partial-\r\n"
|
||||
# Deliberately DO NOT send the terminating
|
||||
# "0\r\n\r\n" — this is the mid-stream
|
||||
# truncation we're testing.
|
||||
)
|
||||
await writer.drain()
|
||||
finally:
|
||||
# Hard-close the socket so the proxy's
|
||||
# iter_any() sees an abrupt end-of-stream.
|
||||
try:
|
||||
writer.transport.abort()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
app = web.Application()
|
||||
app.router.add_route("*", "/{tail:.*}", handler)
|
||||
self._runner = web.AppRunner(app)
|
||||
await self._runner.setup()
|
||||
site = web.TCPSite(self._runner, "127.0.0.1", 0)
|
||||
await site.start()
|
||||
server = site._server
|
||||
assert server is not None
|
||||
sockets = getattr(server, "sockets", None)
|
||||
self._server = await asyncio.start_server(handle_conn, "127.0.0.1", 0)
|
||||
sockets = self._server.sockets
|
||||
assert sockets is not None
|
||||
self.port = sockets[0].getsockname()[1]
|
||||
return f"http://127.0.0.1:{self.port}"
|
||||
|
||||
async def stop(self) -> None:
|
||||
if self._runner is not None:
|
||||
await self._runner.cleanup()
|
||||
self._runner = None
|
||||
if self._server is not None:
|
||||
self._server.close()
|
||||
await self._server.wait_closed()
|
||||
self._server = None
|
||||
|
||||
upstream = _TruncatingUpstream()
|
||||
upstream_url = await upstream.start()
|
||||
proxy = OpenRouterCompatProxy(target_base_url=upstream_url)
|
||||
proxy = OpenRouterCompatProxy(target_base_url=upstream_url, request_timeout=5.0)
|
||||
await proxy.start()
|
||||
try:
|
||||
async with aiohttp.ClientSession() as client:
|
||||
@@ -653,9 +668,9 @@ async def test_proxy_does_not_signal_clean_eof_on_mid_stream_error():
|
||||
) as resp:
|
||||
# The client should see either an error raising
|
||||
# here or a truncated body followed by a
|
||||
# transport-level failure on read — both are
|
||||
# acceptable because both surface the truncation
|
||||
# instead of silently reporting success.
|
||||
# transport-level failure on read — both surface
|
||||
# the truncation instead of silently reporting
|
||||
# success.
|
||||
await resp.read()
|
||||
except (
|
||||
aiohttp.ClientPayloadError,
|
||||
|
||||
Reference in New Issue
Block a user