mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
fix(copilot): address PR review comments — runtime check, SDK version pin, event-based stash
- Replace bare `assert client._query` with proper RuntimeError check - Add TECH DEBT comments on private SDK internal usage - Pin claude-agent-sdk to ~0.1.35 (tighter constraint for private API access) - Replace sleep(0.1) with event-based wait_for_stash() for race-condition fix - Add wait_for_stash synchronisation tests
This commit is contained in:
@@ -1,5 +1,8 @@
|
||||
"""Unit tests for the SDK response adapter."""
|
||||
|
||||
import asyncio
|
||||
|
||||
import pytest
|
||||
from claude_agent_sdk import (
|
||||
AssistantMessage,
|
||||
ResultMessage,
|
||||
@@ -27,6 +30,10 @@ from backend.copilot.response_model import (
|
||||
|
||||
from .response_adapter import SDKResponseAdapter
|
||||
from .tool_adapter import MCP_TOOL_PREFIX
|
||||
from .tool_adapter import _pending_tool_outputs as _pto
|
||||
from .tool_adapter import _stash_event
|
||||
from .tool_adapter import stash_pending_tool_output as _stash
|
||||
from .tool_adapter import wait_for_stash
|
||||
|
||||
|
||||
def _adapter() -> SDKResponseAdapter:
|
||||
@@ -469,13 +476,11 @@ def test_flush_unresolved_at_next_assistant_message():
|
||||
|
||||
def test_flush_with_stashed_output():
|
||||
"""Stashed output from PostToolUse hook is used when flushing."""
|
||||
from .tool_adapter import _pending_tool_outputs, stash_pending_tool_output
|
||||
|
||||
adapter = _adapter()
|
||||
|
||||
# Simulate PostToolUse hook stashing output
|
||||
_pending_tool_outputs.set({})
|
||||
stash_pending_tool_output("WebSearch", "Search result: 5 items found")
|
||||
_pto.set({})
|
||||
_stash("WebSearch", "Search result: 5 items found")
|
||||
|
||||
all_responses: list[StreamBaseResponse] = []
|
||||
|
||||
@@ -511,4 +516,72 @@ def test_flush_with_stashed_output():
|
||||
assert output_events[0].output == "Search result: 5 items found"
|
||||
|
||||
# Cleanup
|
||||
_pending_tool_outputs.set({}) # type: ignore[arg-type]
|
||||
_pto.set({}) # type: ignore[arg-type]
|
||||
|
||||
|
||||
# -- wait_for_stash synchronisation tests --
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_wait_for_stash_signaled():
|
||||
"""wait_for_stash returns True when stash_pending_tool_output signals."""
|
||||
_pto.set({})
|
||||
event = asyncio.Event()
|
||||
_stash_event.set(event)
|
||||
|
||||
# Simulate a PostToolUse hook that stashes output after a short delay
|
||||
async def delayed_stash():
|
||||
await asyncio.sleep(0.01)
|
||||
_stash("WebSearch", "result data")
|
||||
|
||||
asyncio.create_task(delayed_stash())
|
||||
result = await wait_for_stash(timeout=1.0)
|
||||
|
||||
assert result is True
|
||||
assert _pto.get({}).get("WebSearch") == ["result data"]
|
||||
|
||||
# Cleanup
|
||||
_pto.set({}) # type: ignore[arg-type]
|
||||
_stash_event.set(None)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_wait_for_stash_timeout():
|
||||
"""wait_for_stash returns False on timeout when no stash occurs."""
|
||||
_pto.set({})
|
||||
event = asyncio.Event()
|
||||
_stash_event.set(event)
|
||||
|
||||
result = await wait_for_stash(timeout=0.05)
|
||||
assert result is False
|
||||
|
||||
# Cleanup
|
||||
_pto.set({}) # type: ignore[arg-type]
|
||||
_stash_event.set(None)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_wait_for_stash_already_stashed():
|
||||
"""wait_for_stash picks up a stash that happened just before the wait."""
|
||||
_pto.set({})
|
||||
event = asyncio.Event()
|
||||
_stash_event.set(event)
|
||||
|
||||
# Stash before waiting — simulates hook completing before message arrives
|
||||
_stash("Read", "file contents")
|
||||
# Event is now set; wait_for_stash clears it and re-waits.
|
||||
# Since the stash already happened, the event won't fire again,
|
||||
# so this should timeout. But the stash DATA is available.
|
||||
result = await wait_for_stash(timeout=0.05)
|
||||
# Returns False because the event was cleared before waiting and
|
||||
# no NEW signal arrived. This is expected: the flush will find the
|
||||
# data in the stash directly — wait_for_stash is only needed when
|
||||
# the stash hasn't happened yet.
|
||||
assert result is False
|
||||
|
||||
# But the stash itself is populated
|
||||
assert _pto.get({}).get("Read") == ["file contents"]
|
||||
|
||||
# Cleanup
|
||||
_pto.set({}) # type: ignore[arg-type]
|
||||
_stash_event.set(None)
|
||||
|
||||
@@ -47,6 +47,7 @@ from .tool_adapter import (
|
||||
LongRunningCallback,
|
||||
create_copilot_mcp_server,
|
||||
set_execution_context,
|
||||
wait_for_stash,
|
||||
)
|
||||
from .transcript import (
|
||||
cleanup_cli_project_dir,
|
||||
@@ -691,16 +692,25 @@ async def stream_chat_completion_sdk(
|
||||
# because wait_for wraps in a separate Task whose cancellation
|
||||
# can leave the async generator in a broken state.
|
||||
#
|
||||
# We iterate over the internal query's raw dicts instead
|
||||
# of the parsed Messages so we can capture them for the
|
||||
# transcript (the CLI does not write JSONL files in SDK
|
||||
# mode).
|
||||
# TECH DEBT: We use two private SDK internals here:
|
||||
# 1. client._query.receive_messages() — raw dict iterator
|
||||
# 2. _internal.message_parser.parse_message — dict→Message
|
||||
# This is necessary because the public receive_messages()
|
||||
# only yields parsed Messages, but we need the raw dicts
|
||||
# for transcript capture (CLI doesn't write JSONL in SDK
|
||||
# mode) and per-message timeout for heartbeats.
|
||||
# Pin claude-agent-sdk tightly and audit on version bumps.
|
||||
from claude_agent_sdk import AssistantMessage, ResultMessage
|
||||
from claude_agent_sdk._internal.message_parser import (
|
||||
parse_message as _parse_sdk_msg,
|
||||
)
|
||||
|
||||
assert client._query is not None # set by connect()
|
||||
# NOTE: _query is a private SDK attribute; see tech-debt
|
||||
# comment on the import above.
|
||||
if client._query is None:
|
||||
raise RuntimeError(
|
||||
"SDK client query not initialized — connect() may have failed"
|
||||
)
|
||||
msg_iter = client._query.receive_messages().__aiter__()
|
||||
while not stream_completed:
|
||||
try:
|
||||
@@ -729,15 +739,25 @@ async def stream_chat_completion_sdk(
|
||||
getattr(sdk_msg, "subtype", ""),
|
||||
)
|
||||
|
||||
# Race-condition mitigation: SDK hooks (PostToolUse) are
|
||||
# executed asynchronously via start_soon() in Query._read_messages.
|
||||
# Messages that trigger flush (AssistantMessage, ResultMessage)
|
||||
# can arrive before the hook stashes its output. Yielding to
|
||||
# the event loop gives the hook task a chance to complete first.
|
||||
# Race-condition fix: SDK hooks (PostToolUse) are executed
|
||||
# asynchronously via start_soon() — the next message can
|
||||
# arrive before the hook stashes output. wait_for_stash()
|
||||
# awaits an asyncio.Event signaled by stash_pending_tool_output(),
|
||||
# completing as soon as the hook finishes (typically <1ms).
|
||||
# The sleep(0) after lets any remaining concurrent hooks complete.
|
||||
if adapter.has_unresolved_tool_calls and isinstance(
|
||||
sdk_msg, (AssistantMessage, ResultMessage)
|
||||
):
|
||||
await asyncio.sleep(0.1)
|
||||
if await wait_for_stash(timeout=0.5):
|
||||
await asyncio.sleep(0)
|
||||
else:
|
||||
logger.warning(
|
||||
"[SDK] [%s] Timed out waiting for PostToolUse "
|
||||
"hook stash (%d unresolved tool calls)",
|
||||
session_id[:12],
|
||||
len(adapter.current_tool_calls)
|
||||
- len(adapter.resolved_tool_calls),
|
||||
)
|
||||
|
||||
for response in adapter.convert_message(sdk_msg):
|
||||
if isinstance(response, StreamStart):
|
||||
|
||||
@@ -9,6 +9,7 @@ via a callback provided by the service layer. This avoids wasteful SDK polling
|
||||
and makes results survive page refreshes.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import itertools
|
||||
import json
|
||||
import logging
|
||||
@@ -44,6 +45,14 @@ _current_session: ContextVar[ChatSession | None] = ContextVar(
|
||||
_pending_tool_outputs: ContextVar[dict[str, list[str]]] = ContextVar(
|
||||
"pending_tool_outputs", default=None # type: ignore[arg-type]
|
||||
)
|
||||
# Event signaled whenever stash_pending_tool_output() adds a new entry.
|
||||
# Used by the streaming loop to wait for PostToolUse hooks to complete
|
||||
# instead of sleeping an arbitrary duration. The SDK fires hooks via
|
||||
# start_soon (fire-and-forget) so the next message can arrive before
|
||||
# the hook stashes its output — this event bridges that gap.
|
||||
_stash_event: ContextVar[asyncio.Event | None] = ContextVar(
|
||||
"_stash_event", default=None
|
||||
)
|
||||
|
||||
# Callback type for delegating long-running tools to the non-SDK infrastructure.
|
||||
# Args: (tool_name, arguments, session) → MCP-formatted response dict.
|
||||
@@ -76,6 +85,7 @@ def set_execution_context(
|
||||
_current_user_id.set(user_id)
|
||||
_current_session.set(session)
|
||||
_pending_tool_outputs.set({})
|
||||
_stash_event.set(asyncio.Event())
|
||||
_long_running_callback.set(long_running_callback)
|
||||
|
||||
|
||||
@@ -134,6 +144,39 @@ def stash_pending_tool_output(tool_name: str, output: Any) -> None:
|
||||
except (TypeError, ValueError):
|
||||
text = str(output)
|
||||
pending.setdefault(tool_name, []).append(text)
|
||||
# Signal any waiters that new output is available.
|
||||
event = _stash_event.get(None)
|
||||
if event is not None:
|
||||
event.set()
|
||||
|
||||
|
||||
async def wait_for_stash(timeout: float = 0.5) -> bool:
|
||||
"""Wait for a PostToolUse hook to stash tool output.
|
||||
|
||||
The SDK fires PostToolUse hooks asynchronously via ``start_soon()`` —
|
||||
the next message (AssistantMessage/ResultMessage) can arrive before the
|
||||
hook completes and stashes its output. This function bridges that gap
|
||||
by waiting on the ``_stash_event``, which is signaled by
|
||||
:func:`stash_pending_tool_output`.
|
||||
|
||||
After the event fires, callers should ``await asyncio.sleep(0)`` to
|
||||
give any remaining concurrent hooks a chance to complete.
|
||||
|
||||
Returns ``True`` if a stash signal was received, ``False`` on timeout.
|
||||
The timeout is a safety net — normally the stash happens within
|
||||
microseconds of yielding to the event loop.
|
||||
"""
|
||||
event = _stash_event.get(None)
|
||||
if event is None:
|
||||
return False
|
||||
# Clear before waiting so we detect new signals only.
|
||||
event.clear()
|
||||
try:
|
||||
async with asyncio.timeout(timeout):
|
||||
await event.wait()
|
||||
return True
|
||||
except TimeoutError:
|
||||
return False
|
||||
|
||||
|
||||
async def _execute_tool_sync(
|
||||
|
||||
9
autogpt_platform/backend/poetry.lock
generated
9
autogpt_platform/backend/poetry.lock
generated
@@ -6107,13 +6107,6 @@ optional = false
|
||||
python-versions = ">=3.8"
|
||||
groups = ["main", "dev"]
|
||||
files = [
|
||||
{file = "PyYAML-6.0.3-cp38-cp38-macosx_10_13_x86_64.whl", hash = "sha256:c2514fceb77bc5e7a2f7adfaa1feb2fb311607c9cb518dbc378688ec73d8292f"},
|
||||
{file = "PyYAML-6.0.3-cp38-cp38-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:9c57bb8c96f6d1808c030b1687b9b5fb476abaa47f0db9c0101f5e9f394e97f4"},
|
||||
{file = "PyYAML-6.0.3-cp38-cp38-manylinux2014_s390x.manylinux_2_17_s390x.manylinux_2_28_s390x.whl", hash = "sha256:efd7b85f94a6f21e4932043973a7ba2613b059c4a000551892ac9f1d11f5baf3"},
|
||||
{file = "PyYAML-6.0.3-cp38-cp38-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:22ba7cfcad58ef3ecddc7ed1db3409af68d023b7f940da23c6c2a1890976eda6"},
|
||||
{file = "PyYAML-6.0.3-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:6344df0d5755a2c9a276d4473ae6b90647e216ab4757f8426893b5dd2ac3f369"},
|
||||
{file = "PyYAML-6.0.3-cp38-cp38-win32.whl", hash = "sha256:3ff07ec89bae51176c0549bc4c63aa6202991da2d9a6129d7aef7f1407d3f295"},
|
||||
{file = "PyYAML-6.0.3-cp38-cp38-win_amd64.whl", hash = "sha256:5cf4e27da7e3fbed4d6c3d8e797387aaad68102272f8f9752883bc32d61cb87b"},
|
||||
{file = "pyyaml-6.0.3-cp310-cp310-macosx_10_13_x86_64.whl", hash = "sha256:214ed4befebe12df36bcc8bc2b64b396ca31be9304b8f59e25c11cf94a4c033b"},
|
||||
{file = "pyyaml-6.0.3-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:02ea2dfa234451bbb8772601d7b8e426c2bfa197136796224e50e35a78777956"},
|
||||
{file = "pyyaml-6.0.3-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:b30236e45cf30d2b8e7b3e85881719e98507abed1011bf463a8fa23e9c3e98a8"},
|
||||
@@ -8530,4 +8523,4 @@ cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and pyt
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = ">=3.10,<3.14"
|
||||
content-hash = "55e095de555482f0fe47de7695f390fe93e7bcf739b31c391b2e5e3c3d938ae3"
|
||||
content-hash = "415fe992399665e3b46c522b56d0871adeea9a87bea541ba3273a7cfbe4fd2e9"
|
||||
|
||||
@@ -16,7 +16,7 @@ anthropic = "^0.79.0"
|
||||
apscheduler = "^3.11.1"
|
||||
autogpt-libs = { path = "../autogpt_libs", develop = true }
|
||||
bleach = { extras = ["css"], version = "^6.2.0" }
|
||||
claude-agent-sdk = "^0.1.0"
|
||||
claude-agent-sdk = "~0.1.35" # Pinned: we use private internals (_query, _internal.message_parser)
|
||||
click = "^8.2.0"
|
||||
cryptography = "^46.0"
|
||||
discord-py = "^2.5.2"
|
||||
|
||||
Reference in New Issue
Block a user