fix(backend): address PR review - wire timeout setting, use monotonic clock, cap poll errors

This commit is contained in:
Zamil Majdy
2026-02-25 14:53:10 +07:00
parent d78e0ee122
commit 0bc098acb1
2 changed files with 60 additions and 12 deletions

View File

@@ -7,6 +7,7 @@ then poll GET /api/jobs/{job_id} every few seconds until the result is ready.
import asyncio
import logging
import time
from typing import Any
import httpx
@@ -32,6 +33,7 @@ _dummy_mode_warned = False
POLL_INTERVAL_SECONDS = 10.0
MAX_POLL_TIME_SECONDS = 1800.0 # 30 minutes
MAX_CONSECUTIVE_POLL_ERRORS = 5
def _create_error_response(
@@ -81,9 +83,6 @@ def _classify_request_error(e: httpx.RequestError) -> tuple[str, str]:
_client: httpx.AsyncClient | None = None
_settings: Settings | None = None
# Short per-request timeout (the overall wait is handled by the polling loop)
_REQUEST_TIMEOUT = httpx.Timeout(30.0)
def _get_settings() -> Settings:
"""Get or create settings singleton."""
@@ -127,9 +126,11 @@ def _get_client() -> httpx.AsyncClient:
"""Get or create the HTTP client for the external service."""
global _client
if _client is None:
settings = _get_settings()
timeout = httpx.Timeout(float(settings.config.agentgenerator_timeout))
_client = httpx.AsyncClient(
base_url=_get_base_url(),
timeout=_REQUEST_TIMEOUT,
timeout=timeout,
)
return _client
@@ -177,10 +178,10 @@ async def _submit_and_poll(
logger.info(f"Agent Generator job submitted: {job_id} via {endpoint}")
# 2. Poll ------------------------------------------------------------------
elapsed = 0.0
while elapsed < MAX_POLL_TIME_SECONDS:
start = time.monotonic()
consecutive_errors = 0
while (time.monotonic() - start) < MAX_POLL_TIME_SECONDS:
await asyncio.sleep(POLL_INTERVAL_SECONDS)
elapsed += POLL_INTERVAL_SECONDS
try:
poll_resp = await client.get(f"/api/jobs/{job_id}")
@@ -194,10 +195,21 @@ async def _submit_and_poll(
logger.error(f"Poll error for job {job_id}: {error_msg}")
return _create_error_response(error_msg, error_type)
except httpx.RequestError as e:
# Transient network error during polling — retry on next iteration
logger.warning(f"Transient poll error for job {job_id}: {e}")
consecutive_errors += 1
logger.warning(
f"Transient poll error for job {job_id} "
f"({consecutive_errors}/{MAX_CONSECUTIVE_POLL_ERRORS}): {e}"
)
if consecutive_errors >= MAX_CONSECUTIVE_POLL_ERRORS:
error_msg = (
f"Giving up on job {job_id} after "
f"{MAX_CONSECUTIVE_POLL_ERRORS} consecutive poll errors: {e}"
)
logger.error(error_msg)
return _create_error_response(error_msg, "poll_error")
continue
consecutive_errors = 0
poll_data = poll_resp.json()
status = poll_data.get("status")
@@ -225,8 +237,13 @@ async def decompose_goal_external(
) -> dict[str, Any] | None:
"""Call the external service to decompose a goal.
Returns:
Dict with the decomposition result or error dict.
Returns one of the following dicts (keyed by ``"type"``):
* ``{"type": "instructions", "steps": [...]}``
* ``{"type": "clarifying_questions", "questions": [...]}``
* ``{"type": "unachievable_goal", "reason": ..., "suggested_goal": ...}``
* ``{"type": "vague_goal", "suggested_goal": ...}``
* ``{"type": "error", "error": ..., "error_type": ...}``
"""
if _is_dummy_mode():
return await decompose_goal_dummy(description, context, library_agents)

View File

@@ -230,17 +230,48 @@ class TestSubmitAndPoll:
mock_client.post.return_value = submit_resp
mock_client.get.return_value = running_resp
# Simulate time passing: first call returns 0.0 (start), then jumps past limit
monotonic_values = iter([0.0, 0.0, 100.0])
with (
patch.object(service, "_get_client", return_value=mock_client),
patch.object(service, "MAX_POLL_TIME_SECONDS", 0.05),
patch.object(service, "MAX_POLL_TIME_SECONDS", 50.0),
patch.object(service, "POLL_INTERVAL_SECONDS", 0.01),
patch("asyncio.sleep", new_callable=AsyncMock),
patch("backend.copilot.tools.agent_generator.service.time") as mock_time,
):
mock_time.monotonic.side_effect = monotonic_values
result = await service._submit_and_poll("/api/test", {})
assert result["type"] == "error"
assert result["error_type"] == "timeout"
@pytest.mark.asyncio
async def test_poll_gives_up_after_consecutive_transient_errors(self):
"""Test that polling gives up after MAX_CONSECUTIVE_POLL_ERRORS."""
submit_resp = MagicMock()
submit_resp.json.return_value = {"job_id": "job-flaky"}
submit_resp.raise_for_status = MagicMock()
mock_client = AsyncMock()
mock_client.post.return_value = submit_resp
mock_client.get.side_effect = httpx.RequestError("network down")
# Ensure monotonic always returns 0 so timeout doesn't kick in
with (
patch.object(service, "_get_client", return_value=mock_client),
patch.object(service, "MAX_POLL_TIME_SECONDS", 9999.0),
patch.object(service, "POLL_INTERVAL_SECONDS", 0.01),
patch("asyncio.sleep", new_callable=AsyncMock),
patch("backend.copilot.tools.agent_generator.service.time") as mock_time,
):
mock_time.monotonic.return_value = 0.0
result = await service._submit_and_poll("/api/test", {})
assert result["type"] == "error"
assert result["error_type"] == "poll_error"
assert mock_client.get.call_count == service.MAX_CONSECUTIVE_POLL_ERRORS
class TestDecomposeGoalExternal:
"""Test decompose_goal_external function."""