mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
feat(copilot): improve SDK loading time (#12280)
## Summary

- Skip the CLI version check at worker init (saves ~300ms/request)
- Pre-warm the bundled CLI binary at startup to warm OS page caches (~500ms saved on the first request per worker)
- Parallelize E2B setup, system prompt fetch, and transcript download with `asyncio.gather()` (saves ~200-500ms)
- Enable Langfuse prompt caching with a configurable TTL (default 300s)

## Test plan

- [ ] `poetry run pytest backend/copilot/sdk/service_test.py -s -vvv`
- [ ] Manual: send copilot messages via the SDK path and verify that resume still works on multi-turn conversations
- [ ] Check executor logs for "CLI pre-warm done" messages
This commit is contained in:
@@ -62,6 +62,10 @@ class ChatConfig(BaseSettings):
|
||||
default="CoPilot Prompt",
|
||||
description="Name of the prompt in Langfuse to fetch",
|
||||
)
|
||||
langfuse_prompt_cache_ttl: int = Field(
|
||||
default=300,
|
||||
description="Cache TTL in seconds for Langfuse prompt (0 to disable caching)",
|
||||
)
|
||||
|
||||
# Claude Agent SDK Configuration
|
||||
use_claude_agent_sdk: bool = Field(
|
||||
|
||||
@@ -6,6 +6,8 @@ in a thread-local context, following the graph executor pattern.
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import subprocess
|
||||
import threading
|
||||
import time
|
||||
|
||||
@@ -108,8 +110,41 @@ class CoPilotProcessor:
|
||||
)
|
||||
self.execution_thread.start()
|
||||
|
||||
# Skip the SDK's per-request CLI version check — the bundled CLI is
|
||||
# already version-matched to the SDK package.
|
||||
os.environ.setdefault("CLAUDE_AGENT_SDK_SKIP_VERSION_CHECK", "1")
|
||||
|
||||
# Pre-warm the bundled CLI binary so the OS page-caches the ~185 MB
|
||||
# executable. First spawn pays ~1.2 s; subsequent spawns ~0.65 s.
|
||||
self._prewarm_cli()
|
||||
|
||||
logger.info(f"[CoPilotExecutor] Worker {self.tid} started")
|
||||
|
||||
def _prewarm_cli(self) -> None:
    """Spawn the SDK's bundled CLI once as a best-effort warm-up.

    Resolves the bundled CLI path through the SDK's subprocess transport
    and, when a path is found, runs it with ``-v`` under a 10-second
    timeout so the OS page caches are warm before the first real request.

    A zero exit code is logged at info level and a non-zero one at warning
    level. Any exception raised along the way (including a failed SDK
    import or a timeout) is logged at debug level and swallowed, so this
    method never raises to its caller.
    """
    try:
        # Imported lazily so a missing or incompatible SDK only skips the
        # warm-up instead of failing at module import time.
        from claude_agent_sdk._internal.transport.subprocess_cli import (
            SubprocessCLITransport,
        )

        bundled_cli = SubprocessCLITransport._find_bundled_cli(None)  # type: ignore[arg-type]
        if not bundled_cli:
            # Nothing to warm: the SDK reported no bundled binary.
            return

        proc = subprocess.run(
            [bundled_cli, "-v"],
            capture_output=True,
            timeout=10,
        )
        exit_code = proc.returncode
        if exit_code != 0:
            logger.warning(
                "[CoPilotExecutor] CLI pre-warm failed (rc=%d): %s",
                exit_code,
                bundled_cli,
            )
            return

        logger.info(f"[CoPilotExecutor] CLI pre-warm done: {bundled_cli}")
    except Exception as exc:
        logger.debug(f"[CoPilotExecutor] CLI pre-warm skipped: {exc}")
|
||||
|
||||
def cleanup(self):
|
||||
"""Clean up event-loop-bound resources before the loop is destroyed.
|
||||
|
||||
|
||||
@@ -683,43 +683,96 @@ async def stream_chat_completion_sdk(
|
||||
code="sdk_cwd_error",
|
||||
)
|
||||
return
|
||||
# Set up E2B sandbox for persistent cloud execution when configured.
|
||||
# When active, MCP file tools route directly to the sandbox filesystem
|
||||
# so bash_exec and file tools share the same /home/user directory.
|
||||
if config.use_e2b_sandbox and not config.e2b_api_key:
|
||||
logger.warning(
|
||||
"[E2B] [%s] E2B sandbox enabled but no API key configured "
|
||||
"(CHAT_E2B_API_KEY / E2B_API_KEY) — falling back to bubblewrap",
|
||||
session_id[:12],
|
||||
)
|
||||
if config.use_e2b_sandbox and config.e2b_api_key:
|
||||
try:
|
||||
e2b_sandbox = await get_or_create_sandbox(
|
||||
session_id,
|
||||
api_key=config.e2b_api_key,
|
||||
template=config.e2b_sandbox_template,
|
||||
timeout=config.e2b_sandbox_timeout,
|
||||
)
|
||||
except Exception as e2b_err:
|
||||
logger.error(
|
||||
"[E2B] [%s] Setup failed: %s",
|
||||
# --- Run independent async I/O operations in parallel ---
|
||||
# E2B sandbox setup, system prompt build (Langfuse + DB), and transcript
|
||||
# download are independent network calls. Running them concurrently
|
||||
# saves ~200-500ms compared to sequential execution.
|
||||
|
||||
async def _setup_e2b():
|
||||
"""Set up E2B sandbox if configured, return sandbox or None."""
|
||||
if config.use_e2b_sandbox and not config.e2b_api_key:
|
||||
logger.warning(
|
||||
"[E2B] [%s] E2B sandbox enabled but no API key configured "
|
||||
"(CHAT_E2B_API_KEY / E2B_API_KEY) — falling back to bubblewrap",
|
||||
session_id[:12],
|
||||
e2b_err,
|
||||
exc_info=True,
|
||||
)
|
||||
e2b_sandbox = None
|
||||
return None
|
||||
if config.use_e2b_sandbox and config.e2b_api_key:
|
||||
try:
|
||||
return await get_or_create_sandbox(
|
||||
session_id,
|
||||
api_key=config.e2b_api_key,
|
||||
template=config.e2b_sandbox_template,
|
||||
timeout=config.e2b_sandbox_timeout,
|
||||
)
|
||||
except Exception as e2b_err:
|
||||
logger.error(
|
||||
"[E2B] [%s] Setup failed: %s",
|
||||
session_id[:12],
|
||||
e2b_err,
|
||||
exc_info=True,
|
||||
)
|
||||
return None
|
||||
|
||||
async def _fetch_transcript():
    """Fetch the stored transcript used for ``--resume``, if applicable.

    Returns ``None`` without downloading unless resume is enabled in the
    config, a user id is present, and the session holds more than one
    message. A failed download is logged as a warning and also yields
    ``None``, so the caller continues without ``--resume``.
    """
    resume_applicable = (
        config.claude_agent_use_resume and user_id and len(session.messages) > 1
    )
    if not resume_applicable:
        return None

    try:
        transcript = await download_transcript(user_id, session_id)
    except Exception as download_err:
        logger.warning(
            "[SDK] [%s] Transcript download failed, continuing without "
            "--resume: %s",
            session_id[:12],
            download_err,
        )
        return None
    return transcript
|
||||
|
||||
e2b_sandbox, (base_system_prompt, _), dl = await asyncio.gather(
|
||||
_setup_e2b(),
|
||||
_build_system_prompt(user_id, has_conversation_history=has_history),
|
||||
_fetch_transcript(),
|
||||
)
|
||||
|
||||
use_e2b = e2b_sandbox is not None
|
||||
|
||||
system_prompt, _ = await _build_system_prompt(
|
||||
user_id, has_conversation_history=has_history
|
||||
)
|
||||
system_prompt += (
|
||||
system_prompt = base_system_prompt + (
|
||||
_E2B_TOOL_SUPPLEMENT
|
||||
if use_e2b
|
||||
else _LOCAL_TOOL_SUPPLEMENT.format(cwd=sdk_cwd)
|
||||
)
|
||||
|
||||
# Process transcript download result
|
||||
transcript_msg_count = 0
|
||||
if dl:
|
||||
is_valid = validate_transcript(dl.content)
|
||||
if is_valid:
|
||||
logger.info(
|
||||
f"[SDK] Transcript available for session {session_id}: "
|
||||
f"{len(dl.content)}B, msg_count={dl.message_count}"
|
||||
)
|
||||
resume_file = write_transcript_to_tempfile(
|
||||
dl.content, session_id, sdk_cwd
|
||||
)
|
||||
if resume_file:
|
||||
use_resume = True
|
||||
transcript_msg_count = dl.message_count
|
||||
logger.debug(
|
||||
f"[SDK] Using --resume ({len(dl.content)}B, "
|
||||
f"msg_count={transcript_msg_count})"
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"[SDK] Transcript downloaded but invalid for {session_id}"
|
||||
)
|
||||
elif config.claude_agent_use_resume and user_id and len(session.messages) > 1:
|
||||
logger.warning(
|
||||
f"[SDK] No transcript available for {session_id} "
|
||||
f"({len(session.messages)} messages in session)"
|
||||
)
|
||||
|
||||
yield StreamStart(messageId=message_id, sessionId=session_id)
|
||||
|
||||
set_execution_context(user_id, session, sandbox=e2b_sandbox, sdk_cwd=sdk_cwd)
|
||||
@@ -767,37 +820,6 @@ async def stream_chat_completion_sdk(
|
||||
on_compact=compaction.on_compact,
|
||||
)
|
||||
|
||||
# --- Resume strategy: download transcript from bucket ---
|
||||
transcript_msg_count = 0 # watermark: session.messages length at upload
|
||||
|
||||
if config.claude_agent_use_resume and user_id and len(session.messages) > 1:
|
||||
dl = await download_transcript(user_id, session_id)
|
||||
is_valid = bool(dl and validate_transcript(dl.content))
|
||||
if dl and is_valid:
|
||||
logger.info(
|
||||
f"[SDK] Transcript available for session {session_id}: "
|
||||
f"{len(dl.content)}B, msg_count={dl.message_count}"
|
||||
)
|
||||
resume_file = write_transcript_to_tempfile(
|
||||
dl.content, session_id, sdk_cwd
|
||||
)
|
||||
if resume_file:
|
||||
use_resume = True
|
||||
transcript_msg_count = dl.message_count
|
||||
logger.debug(
|
||||
f"[SDK] Using --resume ({len(dl.content)}B, "
|
||||
f"msg_count={transcript_msg_count})"
|
||||
)
|
||||
elif dl:
|
||||
logger.warning(
|
||||
f"[SDK] Transcript downloaded but invalid for {session_id}"
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"[SDK] No transcript available for {session_id} "
|
||||
f"({len(session.messages)} messages in session)"
|
||||
)
|
||||
|
||||
allowed = get_copilot_tool_names(use_e2b=use_e2b)
|
||||
disallowed = get_sdk_disallowed_tools(use_e2b=use_e2b)
|
||||
sdk_options_kwargs: dict[str, Any] = {
|
||||
|
||||
@@ -173,7 +173,6 @@ async def _get_system_prompt_template(context: str) -> str:
|
||||
"""
|
||||
if _is_langfuse_configured():
|
||||
try:
|
||||
# cache_ttl_seconds=0 disables SDK caching to always get the latest prompt
|
||||
# Use asyncio.to_thread to avoid blocking the event loop
|
||||
# In non-production environments, fetch the latest prompt version
|
||||
# instead of the production-labeled version for easier testing
|
||||
@@ -186,7 +185,7 @@ async def _get_system_prompt_template(context: str) -> str:
|
||||
langfuse.get_prompt,
|
||||
config.langfuse_prompt_name,
|
||||
label=label,
|
||||
cache_ttl_seconds=0,
|
||||
cache_ttl_seconds=config.langfuse_prompt_cache_ttl,
|
||||
)
|
||||
return prompt.compile(users_information=context)
|
||||
except Exception as e:
|
||||
|
||||
Reference in New Issue
Block a user