Compare commits

..

5 Commits

Author SHA1 Message Date
Reinier van der Leer
7cdbbdd65e fix bakery (2) 2026-02-12 13:33:44 +01:00
Reinier van der Leer
6191ac0b1e fix bakery 2026-02-12 13:25:36 +01:00
Reinier van der Leer
b51e87bc53 fix e2e dependencies cache 2026-02-12 13:23:26 +01:00
Reinier van der Leer
71f764f3d0 Inject caching config into docker compose for e2e test 2026-02-12 12:57:28 +01:00
Zamil Majdy
a78145505b fix(copilot): merge split assistant messages to prevent Anthropic API errors (#12062)
## Summary
- When the copilot model responds with both text content AND a
long-running tool call (e.g., `create_agent`), the streaming code
created two separate consecutive assistant messages — one with text, one
with `tool_calls`. This caused Anthropic's API to reject with
`"unexpected tool_use_id found in tool_result blocks"` because the
`tool_result` couldn't find a matching `tool_use` in the immediately
preceding assistant message.
- Added a defensive merge of consecutive assistant messages in
`to_openai_messages()` (fixes existing corrupt sessions too)
- Fixed `_yield_tool_call` to add tool_calls to the existing
current-turn assistant message instead of creating a new one
- Changed `accumulated_tool_calls` assignment to use `extend` to prevent
overwriting tool_calls added by long-running tool flow

## Test plan
- [x] All 23 chat feature tests pass (`backend/api/features/chat/`)
- [x] All 44 prompt utility tests pass (`backend/util/prompt_test.py`)
- [x] All pre-commit hooks pass (ruff, isort, black, pyright)
- [ ] Manual test: create an agent via copilot, then ask a follow-up
question — should no longer get 400 error

<!-- greptile_comment -->

<h2>Greptile Overview</h2>

<details><summary><h3>Greptile Summary</h3></summary>

Fixes a critical bug where long-running tool calls (like `create_agent`)
caused Anthropic API 400 errors due to split assistant messages. The fix
ensures tool calls are added to the existing assistant message instead
of creating new ones, and adds a defensive merge function to repair any
existing corrupt sessions.

**Key changes:**
- Added `_merge_consecutive_assistant_messages()` to defensively merge
split assistant messages in `to_openai_messages()`
- Modified `_yield_tool_call()` to append tool calls to the current-turn
assistant message instead of creating a new one
- Changed `accumulated_tool_calls` from assignment to `extend` to
preserve tool calls already added by long-running tool flow

**Impact:** Resolves the issue where users received 400 errors after
creating agents via copilot and asking follow-up questions.
</details>


<details><summary><h3>Confidence Score: 4/5</h3></summary>

- Safe to merge with minor verification recommended
- The changes are well-targeted and solve a real API compatibility
issue. The logic is sound: searching backwards for the current assistant
message is correct, and using `extend` instead of assignment prevents
overwriting. The defensive merge in `to_openai_messages()` also fixes
existing corrupt sessions. All existing tests pass according to the PR
description.
- No files require special attention - changes are localized and
defensive
</details>


<details><summary><h3>Sequence Diagram</h3></summary>

```mermaid
sequenceDiagram
    participant User
    participant StreamAPI as stream_chat_completion
    participant Chunks as _stream_chat_chunks
    participant ToolCall as _yield_tool_call
    participant Session as ChatSession
    
    User->>StreamAPI: Send message
    StreamAPI->>Chunks: Stream chat chunks
    
    alt Text + Long-running tool call
        Chunks->>StreamAPI: Text delta (content)
        StreamAPI->>Session: Append assistant message with content
        Chunks->>ToolCall: Tool call detected
        
        Note over ToolCall: OLD: Created new assistant message<br/>NEW: Appends to existing assistant
        
        ToolCall->>Session: Search backwards for current assistant
        ToolCall->>Session: Append tool_call to existing message
        ToolCall->>Session: Add pending tool result
    end
    
    StreamAPI->>StreamAPI: Merge accumulated_tool_calls
    Note over StreamAPI: Use extend (not assign)<br/>to preserve existing tool_calls
    
    StreamAPI->>Session: to_openai_messages()
    Session->>Session: _merge_consecutive_assistant_messages()
    Note over Session: Defensive: Merges any split<br/>assistant messages
    Session-->>StreamAPI: Merged messages
    
    StreamAPI->>User: Return response
```
</details>


<!-- greptile_other_comments_section -->

<!-- /greptile_comment -->
2026-02-12 01:52:17 +00:00
10 changed files with 562 additions and 420 deletions

View File

@@ -142,9 +142,6 @@ jobs:
e2e_test:
runs-on: big-boi
needs: setup
strategy:
fail-fast: false
steps:
- name: Checkout repository
@@ -174,29 +171,29 @@ jobs:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Cache Docker layers
uses: actions/cache@v5
with:
path: /tmp/.buildx-cache
key: ${{ runner.os }}-buildx-frontend-test-${{ hashFiles('autogpt_platform/docker-compose.yml', 'autogpt_platform/backend/Dockerfile', 'autogpt_platform/backend/pyproject.toml', 'autogpt_platform/backend/poetry.lock') }}
restore-keys: |
${{ runner.os }}-buildx-frontend-test-
driver-opts: network=host
- name: Build Docker images with cache
working-directory: autogpt_platform
run: |
pip install pyyaml
python ../.github/workflows/scripts/generate-docker-ci-compose.py \
--source docker-compose.platform.yml \
--output docker-compose.ci.yml \
--cache-from "type=gha" \
--cache-to "type=gha,mode=max" \
--backend-scope "platform-backend-${{ hashFiles('autogpt_platform/backend/Dockerfile', 'autogpt_platform/backend/poetry.lock', 'autogpt_platform/backend/backend') }}" \
--frontend-scope "platform-frontend-${{ hashFiles('autogpt_platform/frontend/Dockerfile', 'autogpt_platform/frontend/pnpm-lock.yaml', 'autogpt_platform/frontend/src') }}"
docker buildx bake --allow=fs.read=.. -f docker-compose.yml -f docker-compose.ci.yml --load
env:
NEXT_PUBLIC_PW_TEST: true
- name: Run docker compose
run: |
NEXT_PUBLIC_PW_TEST=true docker compose -f ../docker-compose.yml up -d
run: docker compose -f ../docker-compose.yml up -d --no-build
env:
DOCKER_BUILDKIT: 1
BUILDX_CACHE_FROM: type=local,src=/tmp/.buildx-cache
BUILDX_CACHE_TO: type=local,dest=/tmp/.buildx-cache-new,mode=max
- name: Move cache
run: |
rm -rf /tmp/.buildx-cache
if [ -d "/tmp/.buildx-cache-new" ]; then
mv /tmp/.buildx-cache-new /tmp/.buildx-cache
fi
NEXT_PUBLIC_PW_TEST: true
- name: Wait for services to be ready
run: |
@@ -230,14 +227,14 @@ jobs:
}
fi
- name: Restore dependencies cache
- name: Cache pnpm store
uses: actions/cache@v5
with:
path: ~/.pnpm-store
key: ${{ needs.setup.outputs.cache-key }}
# Use separate cache key for big-boi runner since it doesn't share cache with ubuntu-latest
key: big-boi-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml') }}
restore-keys: |
${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml') }}
${{ runner.os }}-pnpm-
big-boi-pnpm-
- name: Install dependencies
run: pnpm install --frozen-lockfile

View File

@@ -0,0 +1,93 @@
#!/usr/bin/env python3
"""
Generate a docker-compose.ci.yml with cache configuration for all services
that have a build key in the source compose file.
"""
import argparse
import yaml
def main():
parser = argparse.ArgumentParser(
description="Generate docker-compose cache override file"
)
parser.add_argument(
"--source",
default="docker-compose.platform.yml",
help="Source compose file to read (default: docker-compose.platform.yml)",
)
parser.add_argument(
"--output",
default="docker-compose.ci.yml",
help="Output compose file to write (default: docker-compose.ci.yml)",
)
parser.add_argument(
"--cache-from",
default="type=local,src=/tmp/.buildx-cache",
help="Cache source configuration",
)
parser.add_argument(
"--cache-to",
default="type=local,dest=/tmp/.buildx-cache-new,mode=max",
help="Cache destination configuration",
)
parser.add_argument(
"--backend-scope",
default="",
help="GHA cache scope for backend services (e.g., platform-backend-{hash})",
)
parser.add_argument(
"--frontend-scope",
default="",
help="GHA cache scope for frontend service (e.g., platform-frontend-{hash})",
)
args = parser.parse_args()
with open(args.source, "r") as f:
compose = yaml.safe_load(f)
ci_compose = {"services": {}}
for service_name, service_config in compose.get("services", {}).items():
if "build" not in service_config:
continue
cache_from = args.cache_from
cache_to = args.cache_to
# Determine scope based on Dockerfile path
if "type=gha" in args.cache_from or "type=gha" in args.cache_to:
dockerfile = service_config["build"].get("dockerfile", "Dockerfile")
if "frontend" in dockerfile:
scope = args.frontend_scope
elif "backend" in dockerfile:
scope = args.backend_scope
else:
# Skip services that don't clearly match frontend/backend
continue
if scope:
if "type=gha" in args.cache_from:
cache_from = f"{args.cache_from},scope={scope}"
if "type=gha" in args.cache_to:
cache_to = f"{args.cache_to},scope={scope}"
ci_compose["services"][service_name] = {
"build": {
"cache_from": [cache_from],
"cache_to": [cache_to],
}
}
with open(args.output, "w") as f:
yaml.dump(ci_compose, f, default_flow_style=False)
services = list(ci_compose["services"].keys())
print(f"Generated {args.output} with cache config for {len(services)} services:")
for svc in services:
print(f" - {svc}")
if __name__ == "__main__":
main()

View File

@@ -2,7 +2,7 @@ import asyncio
import logging
import uuid
from datetime import UTC, datetime
from typing import Any
from typing import Any, cast
from weakref import WeakValueDictionary
from openai.types.chat import (
@@ -104,6 +104,26 @@ class ChatSession(BaseModel):
successful_agent_runs: dict[str, int] = {}
successful_agent_schedules: dict[str, int] = {}
def add_tool_call_to_current_turn(self, tool_call: dict) -> None:
"""Attach a tool_call to the current turn's assistant message.
Searches backwards for the most recent assistant message (stopping at
any user message boundary). If found, appends the tool_call to it.
Otherwise creates a new assistant message with the tool_call.
"""
for msg in reversed(self.messages):
if msg.role == "user":
break
if msg.role == "assistant":
if not msg.tool_calls:
msg.tool_calls = []
msg.tool_calls.append(tool_call)
return
self.messages.append(
ChatMessage(role="assistant", content="", tool_calls=[tool_call])
)
@staticmethod
def new(user_id: str) -> "ChatSession":
return ChatSession(
@@ -172,6 +192,47 @@ class ChatSession(BaseModel):
successful_agent_schedules=successful_agent_schedules,
)
@staticmethod
def _merge_consecutive_assistant_messages(
messages: list[ChatCompletionMessageParam],
) -> list[ChatCompletionMessageParam]:
"""Merge consecutive assistant messages into single messages.
Long-running tool flows can create split assistant messages: one with
text content and another with tool_calls. Anthropic's API requires
tool_result blocks to reference a tool_use in the immediately preceding
assistant message, so these splits cause 400 errors via OpenRouter.
"""
if len(messages) < 2:
return messages
result: list[ChatCompletionMessageParam] = [messages[0]]
for msg in messages[1:]:
prev = result[-1]
if prev.get("role") != "assistant" or msg.get("role") != "assistant":
result.append(msg)
continue
prev = cast(ChatCompletionAssistantMessageParam, prev)
curr = cast(ChatCompletionAssistantMessageParam, msg)
curr_content = curr.get("content") or ""
if curr_content:
prev_content = prev.get("content") or ""
prev["content"] = (
f"{prev_content}\n{curr_content}" if prev_content else curr_content
)
curr_tool_calls = curr.get("tool_calls")
if curr_tool_calls:
prev_tool_calls = prev.get("tool_calls")
prev["tool_calls"] = (
list(prev_tool_calls) + list(curr_tool_calls)
if prev_tool_calls
else list(curr_tool_calls)
)
return result
def to_openai_messages(self) -> list[ChatCompletionMessageParam]:
messages = []
for message in self.messages:
@@ -258,7 +319,7 @@ class ChatSession(BaseModel):
name=message.name or "",
)
)
return messages
return self._merge_consecutive_assistant_messages(messages)
async def _get_session_from_cache(session_id: str) -> ChatSession | None:

View File

@@ -1,4 +1,16 @@
from typing import cast
import pytest
from openai.types.chat import (
ChatCompletionAssistantMessageParam,
ChatCompletionMessageParam,
ChatCompletionToolMessageParam,
ChatCompletionUserMessageParam,
)
from openai.types.chat.chat_completion_message_tool_call_param import (
ChatCompletionMessageToolCallParam,
Function,
)
from .model import (
ChatMessage,
@@ -117,3 +129,205 @@ async def test_chatsession_db_storage(setup_test_user, test_user_id):
loaded.tool_calls is not None
), f"Tool calls missing for {orig.role} message"
assert len(orig.tool_calls) == len(loaded.tool_calls)
# --------------------------------------------------------------------------- #
# _merge_consecutive_assistant_messages #
# --------------------------------------------------------------------------- #
_tc = ChatCompletionMessageToolCallParam(
id="tc1", type="function", function=Function(name="do_stuff", arguments="{}")
)
_tc2 = ChatCompletionMessageToolCallParam(
id="tc2", type="function", function=Function(name="other", arguments="{}")
)
def test_merge_noop_when_no_consecutive_assistants():
"""Messages without consecutive assistants are returned unchanged."""
msgs = [
ChatCompletionUserMessageParam(role="user", content="hi"),
ChatCompletionAssistantMessageParam(role="assistant", content="hello"),
ChatCompletionUserMessageParam(role="user", content="bye"),
]
merged = ChatSession._merge_consecutive_assistant_messages(msgs)
assert len(merged) == 3
assert [m["role"] for m in merged] == ["user", "assistant", "user"]
def test_merge_splits_text_and_tool_calls():
"""The exact bug scenario: text-only assistant followed by tool_calls-only assistant."""
msgs = [
ChatCompletionUserMessageParam(role="user", content="build agent"),
ChatCompletionAssistantMessageParam(
role="assistant", content="Let me build that"
),
ChatCompletionAssistantMessageParam(
role="assistant", content="", tool_calls=[_tc]
),
ChatCompletionToolMessageParam(role="tool", content="ok", tool_call_id="tc1"),
]
merged = ChatSession._merge_consecutive_assistant_messages(msgs)
assert len(merged) == 3
assert merged[0]["role"] == "user"
assert merged[2]["role"] == "tool"
a = cast(ChatCompletionAssistantMessageParam, merged[1])
assert a["role"] == "assistant"
assert a.get("content") == "Let me build that"
assert a.get("tool_calls") == [_tc]
def test_merge_combines_tool_calls_from_both():
"""Both consecutive assistants have tool_calls — they get merged."""
msgs: list[ChatCompletionAssistantMessageParam] = [
ChatCompletionAssistantMessageParam(
role="assistant", content="text", tool_calls=[_tc]
),
ChatCompletionAssistantMessageParam(
role="assistant", content="", tool_calls=[_tc2]
),
]
merged = ChatSession._merge_consecutive_assistant_messages(msgs) # type: ignore[arg-type]
assert len(merged) == 1
a = cast(ChatCompletionAssistantMessageParam, merged[0])
assert a.get("tool_calls") == [_tc, _tc2]
assert a.get("content") == "text"
def test_merge_three_consecutive_assistants():
"""Three consecutive assistants collapse into one."""
msgs: list[ChatCompletionAssistantMessageParam] = [
ChatCompletionAssistantMessageParam(role="assistant", content="a"),
ChatCompletionAssistantMessageParam(role="assistant", content="b"),
ChatCompletionAssistantMessageParam(
role="assistant", content="", tool_calls=[_tc]
),
]
merged = ChatSession._merge_consecutive_assistant_messages(msgs) # type: ignore[arg-type]
assert len(merged) == 1
a = cast(ChatCompletionAssistantMessageParam, merged[0])
assert a.get("content") == "a\nb"
assert a.get("tool_calls") == [_tc]
def test_merge_empty_and_single_message():
"""Edge cases: empty list and single message."""
assert ChatSession._merge_consecutive_assistant_messages([]) == []
single: list[ChatCompletionMessageParam] = [
ChatCompletionUserMessageParam(role="user", content="hi")
]
assert ChatSession._merge_consecutive_assistant_messages(single) == single
# --------------------------------------------------------------------------- #
# add_tool_call_to_current_turn #
# --------------------------------------------------------------------------- #
_raw_tc = {
"id": "tc1",
"type": "function",
"function": {"name": "f", "arguments": "{}"},
}
_raw_tc2 = {
"id": "tc2",
"type": "function",
"function": {"name": "g", "arguments": "{}"},
}
def test_add_tool_call_appends_to_existing_assistant():
"""When the last assistant is from the current turn, tool_call is added to it."""
session = ChatSession.new(user_id="u")
session.messages = [
ChatMessage(role="user", content="hi"),
ChatMessage(role="assistant", content="working on it"),
]
session.add_tool_call_to_current_turn(_raw_tc)
assert len(session.messages) == 2 # no new message created
assert session.messages[1].tool_calls == [_raw_tc]
def test_add_tool_call_creates_assistant_when_none_exists():
"""When there's no current-turn assistant, a new one is created."""
session = ChatSession.new(user_id="u")
session.messages = [
ChatMessage(role="user", content="hi"),
]
session.add_tool_call_to_current_turn(_raw_tc)
assert len(session.messages) == 2
assert session.messages[1].role == "assistant"
assert session.messages[1].tool_calls == [_raw_tc]
def test_add_tool_call_does_not_cross_user_boundary():
"""A user message acts as a boundary — previous assistant is not modified."""
session = ChatSession.new(user_id="u")
session.messages = [
ChatMessage(role="assistant", content="old turn"),
ChatMessage(role="user", content="new message"),
]
session.add_tool_call_to_current_turn(_raw_tc)
assert len(session.messages) == 3 # new assistant was created
assert session.messages[0].tool_calls is None # old assistant untouched
assert session.messages[2].role == "assistant"
assert session.messages[2].tool_calls == [_raw_tc]
def test_add_tool_call_multiple_times():
"""Multiple long-running tool calls accumulate on the same assistant."""
session = ChatSession.new(user_id="u")
session.messages = [
ChatMessage(role="user", content="hi"),
ChatMessage(role="assistant", content="doing stuff"),
]
session.add_tool_call_to_current_turn(_raw_tc)
# Simulate a pending tool result in between (like _yield_tool_call does)
session.messages.append(
ChatMessage(role="tool", content="pending", tool_call_id="tc1")
)
session.add_tool_call_to_current_turn(_raw_tc2)
assert len(session.messages) == 3 # user, assistant, tool — no extra assistant
assert session.messages[1].tool_calls == [_raw_tc, _raw_tc2]
def test_to_openai_messages_merges_split_assistants():
"""End-to-end: session with split assistants produces valid OpenAI messages."""
session = ChatSession.new(user_id="u")
session.messages = [
ChatMessage(role="user", content="build agent"),
ChatMessage(role="assistant", content="Let me build that"),
ChatMessage(
role="assistant",
content="",
tool_calls=[
{
"id": "tc1",
"type": "function",
"function": {"name": "create_agent", "arguments": "{}"},
}
],
),
ChatMessage(role="tool", content="done", tool_call_id="tc1"),
ChatMessage(role="assistant", content="Saved!"),
ChatMessage(role="user", content="show me an example run"),
]
openai_msgs = session.to_openai_messages()
# The two consecutive assistants at index 1,2 should be merged
roles = [m["role"] for m in openai_msgs]
assert roles == ["user", "assistant", "tool", "assistant", "user"]
# The merged assistant should have both content and tool_calls
merged = cast(ChatCompletionAssistantMessageParam, openai_msgs[1])
assert merged.get("content") == "Let me build that"
tc_list = merged.get("tool_calls")
assert tc_list is not None and len(list(tc_list)) == 1
assert list(tc_list)[0]["id"] == "tc1"

View File

@@ -800,9 +800,13 @@ async def stream_chat_completion(
# Build the messages list in the correct order
messages_to_save: list[ChatMessage] = []
# Add assistant message with tool_calls if any
# Add assistant message with tool_calls if any.
# Use extend (not assign) to preserve tool_calls already added by
# _yield_tool_call for long-running tools.
if accumulated_tool_calls:
assistant_response.tool_calls = accumulated_tool_calls
if not assistant_response.tool_calls:
assistant_response.tool_calls = []
assistant_response.tool_calls.extend(accumulated_tool_calls)
logger.info(
f"Added {len(accumulated_tool_calls)} tool calls to assistant message"
)
@@ -1404,13 +1408,9 @@ async def _yield_tool_call(
operation_id=operation_id,
)
# Save assistant message with tool_call FIRST (required by LLM)
assistant_message = ChatMessage(
role="assistant",
content="",
tool_calls=[tool_calls[yield_idx]],
)
session.messages.append(assistant_message)
# Attach the tool_call to the current turn's assistant message
# (or create one if this is a tool-only response with no text).
session.add_tool_call_to_current_turn(tool_calls[yield_idx])
# Then save pending tool result
pending_message = ChatMessage(

View File

@@ -1,10 +1,10 @@
import json
import shlex
import uuid
from typing import TYPE_CHECKING, Literal, Optional
from typing import Literal, Optional
from e2b import AsyncSandbox as BaseAsyncSandbox
from pydantic import SecretStr
from pydantic import BaseModel, SecretStr
from backend.data.block import (
Block,
@@ -20,13 +20,6 @@ from backend.data.model import (
SchemaField,
)
from backend.integrations.providers import ProviderName
from backend.util.sandbox_files import (
SandboxFileOutput,
extract_and_store_sandbox_files,
)
if TYPE_CHECKING:
from backend.executor.utils import ExecutionContext
class ClaudeCodeExecutionError(Exception):
@@ -181,15 +174,22 @@ class ClaudeCodeBlock(Block):
advanced=True,
)
class FileOutput(BaseModel):
"""A file extracted from the sandbox."""
path: str
relative_path: str # Path relative to working directory (for GitHub, etc.)
name: str
content: str
class Output(BlockSchemaOutput):
response: str = SchemaField(
description="The output/response from Claude Code execution"
)
files: list[SandboxFileOutput] = SchemaField(
files: list["ClaudeCodeBlock.FileOutput"] = SchemaField(
description=(
"List of text files created/modified by Claude Code during this execution. "
"Each file has 'path', 'relative_path', 'name', 'content', and 'workspace_ref' fields. "
"workspace_ref contains a workspace:// URI if the file was stored to workspace."
"Each file has 'path', 'relative_path', 'name', and 'content' fields."
)
)
conversation_history: str = SchemaField(
@@ -252,7 +252,6 @@ class ClaudeCodeBlock(Block):
"relative_path": "index.html",
"name": "index.html",
"content": "<html>Hello World</html>",
"workspace_ref": None,
}
],
),
@@ -268,12 +267,11 @@ class ClaudeCodeBlock(Block):
"execute_claude_code": lambda *args, **kwargs: (
"Created index.html with hello world content", # response
[
SandboxFileOutput(
ClaudeCodeBlock.FileOutput(
path="/home/user/index.html",
relative_path="index.html",
name="index.html",
content="<html>Hello World</html>",
workspace_ref=None,
)
], # files
"User: Create a hello world HTML file\n"
@@ -296,8 +294,7 @@ class ClaudeCodeBlock(Block):
existing_sandbox_id: str,
conversation_history: str,
dispose_sandbox: bool,
execution_context: "ExecutionContext",
) -> tuple[str, list[SandboxFileOutput], str, str, str]:
) -> tuple[str, list["ClaudeCodeBlock.FileOutput"], str, str, str]:
"""
Execute Claude Code in an E2B sandbox.
@@ -452,18 +449,14 @@ class ClaudeCodeBlock(Block):
else:
new_conversation_history = turn_entry
# Extract files created/modified during this run and store to workspace
sandbox_files = await extract_and_store_sandbox_files(
sandbox=sandbox,
working_directory=working_directory,
execution_context=execution_context,
since_timestamp=start_timestamp,
text_only=True,
# Extract files created/modified during this run
files = await self._extract_files(
sandbox, working_directory, start_timestamp
)
return (
response,
sandbox_files, # Already SandboxFileOutput objects
files,
new_conversation_history,
current_session_id,
sandbox_id,
@@ -478,6 +471,140 @@ class ClaudeCodeBlock(Block):
if dispose_sandbox and sandbox:
await sandbox.kill()
async def _extract_files(
self,
sandbox: BaseAsyncSandbox,
working_directory: str,
since_timestamp: str | None = None,
) -> list["ClaudeCodeBlock.FileOutput"]:
"""
Extract text files created/modified during this Claude Code execution.
Args:
sandbox: The E2B sandbox instance
working_directory: Directory to search for files
since_timestamp: ISO timestamp - only return files modified after this time
Returns:
List of FileOutput objects with path, relative_path, name, and content
"""
files: list[ClaudeCodeBlock.FileOutput] = []
# Text file extensions we can safely read as text
text_extensions = {
".txt",
".md",
".html",
".htm",
".css",
".js",
".ts",
".jsx",
".tsx",
".json",
".xml",
".yaml",
".yml",
".toml",
".ini",
".cfg",
".conf",
".py",
".rb",
".php",
".java",
".c",
".cpp",
".h",
".hpp",
".cs",
".go",
".rs",
".swift",
".kt",
".scala",
".sh",
".bash",
".zsh",
".sql",
".graphql",
".env",
".gitignore",
".dockerfile",
"Dockerfile",
".vue",
".svelte",
".astro",
".mdx",
".rst",
".tex",
".csv",
".log",
}
try:
# List files recursively using find command
# Exclude node_modules and .git directories, but allow hidden files
# like .env and .gitignore (they're filtered by text_extensions later)
# Filter by timestamp to only get files created/modified during this run
safe_working_dir = shlex.quote(working_directory)
timestamp_filter = ""
if since_timestamp:
timestamp_filter = f"-newermt {shlex.quote(since_timestamp)} "
find_result = await sandbox.commands.run(
f"find {safe_working_dir} -type f "
f"{timestamp_filter}"
f"-not -path '*/node_modules/*' "
f"-not -path '*/.git/*' "
f"2>/dev/null"
)
if find_result.stdout:
for file_path in find_result.stdout.strip().split("\n"):
if not file_path:
continue
# Check if it's a text file we can read
is_text = any(
file_path.endswith(ext) for ext in text_extensions
) or file_path.endswith("Dockerfile")
if is_text:
try:
content = await sandbox.files.read(file_path)
# Handle bytes or string
if isinstance(content, bytes):
content = content.decode("utf-8", errors="replace")
# Extract filename from path
file_name = file_path.split("/")[-1]
# Calculate relative path by stripping working directory
relative_path = file_path
if file_path.startswith(working_directory):
relative_path = file_path[len(working_directory) :]
# Remove leading slash if present
if relative_path.startswith("/"):
relative_path = relative_path[1:]
files.append(
ClaudeCodeBlock.FileOutput(
path=file_path,
relative_path=relative_path,
name=file_name,
content=content,
)
)
except Exception:
# Skip files that can't be read
pass
except Exception:
# If file extraction fails, return empty results
pass
return files
def _escape_prompt(self, prompt: str) -> str:
"""Escape the prompt for safe shell execution."""
# Use single quotes and escape any single quotes in the prompt
@@ -490,7 +617,6 @@ class ClaudeCodeBlock(Block):
*,
e2b_credentials: APIKeyCredentials,
anthropic_credentials: APIKeyCredentials,
execution_context: "ExecutionContext",
**kwargs,
) -> BlockOutput:
try:
@@ -511,7 +637,6 @@ class ClaudeCodeBlock(Block):
existing_sandbox_id=input_data.sandbox_id,
conversation_history=input_data.conversation_history,
dispose_sandbox=input_data.dispose_sandbox,
execution_context=execution_context,
)
yield "response", response

View File

@@ -1,5 +1,5 @@
from enum import Enum
from typing import TYPE_CHECKING, Any, Literal, Optional
from typing import Any, Literal, Optional
from e2b_code_interpreter import AsyncSandbox
from e2b_code_interpreter import Result as E2BExecutionResult
@@ -20,13 +20,6 @@ from backend.data.model import (
SchemaField,
)
from backend.integrations.providers import ProviderName
from backend.util.sandbox_files import (
SandboxFileOutput,
extract_and_store_sandbox_files,
)
if TYPE_CHECKING:
from backend.executor.utils import ExecutionContext
TEST_CREDENTIALS = APIKeyCredentials(
id="01234567-89ab-cdef-0123-456789abcdef",
@@ -92,9 +85,6 @@ class CodeExecutionResult(MainCodeExecutionResult):
class BaseE2BExecutorMixin:
"""Shared implementation methods for E2B executor blocks."""
# Default working directory in E2B sandboxes
WORKING_DIR = "/home/user"
async def execute_code(
self,
api_key: str,
@@ -105,21 +95,14 @@ class BaseE2BExecutorMixin:
timeout: Optional[int] = None,
sandbox_id: Optional[str] = None,
dispose_sandbox: bool = False,
execution_context: Optional["ExecutionContext"] = None,
extract_files: bool = False,
):
"""
Unified code execution method that handles all three use cases:
1. Create new sandbox and execute (ExecuteCodeBlock)
2. Create new sandbox, execute, and return sandbox_id (InstantiateCodeSandboxBlock)
3. Connect to existing sandbox and execute (ExecuteCodeStepBlock)
Args:
extract_files: If True and execution_context provided, extract files
created/modified during execution and store to workspace.
""" # noqa
sandbox = None
files: list[SandboxFileOutput] = []
try:
if sandbox_id:
# Connect to existing sandbox (ExecuteCodeStepBlock case)
@@ -135,12 +118,6 @@ class BaseE2BExecutorMixin:
for cmd in setup_commands:
await sandbox.commands.run(cmd)
# Capture timestamp before execution to scope file extraction
start_timestamp = None
if extract_files:
ts_result = await sandbox.commands.run("date -u +%Y-%m-%dT%H:%M:%S")
start_timestamp = ts_result.stdout.strip() if ts_result.stdout else None
# Execute the code
execution = await sandbox.run_code(
code,
@@ -156,24 +133,7 @@ class BaseE2BExecutorMixin:
stdout_logs = "".join(execution.logs.stdout)
stderr_logs = "".join(execution.logs.stderr)
# Extract files created/modified during this execution
if extract_files and execution_context:
files = await extract_and_store_sandbox_files(
sandbox=sandbox,
working_directory=self.WORKING_DIR,
execution_context=execution_context,
since_timestamp=start_timestamp,
text_only=False, # Include binary files too
)
return (
results,
text_output,
stdout_logs,
stderr_logs,
sandbox.sandbox_id,
files,
)
return results, text_output, stdout_logs, stderr_logs, sandbox.sandbox_id
finally:
# Dispose of sandbox if requested to reduce usage costs
if dispose_sandbox and sandbox:
@@ -278,12 +238,6 @@ class ExecuteCodeBlock(Block, BaseE2BExecutorMixin):
description="Standard output logs from execution"
)
stderr_logs: str = SchemaField(description="Standard error logs from execution")
files: list[SandboxFileOutput] = SchemaField(
description=(
"Files created or modified during execution. "
"Each file has path, name, content, and workspace_ref (if stored)."
),
)
def __init__(self):
super().__init__(
@@ -305,30 +259,23 @@ class ExecuteCodeBlock(Block, BaseE2BExecutorMixin):
("results", []),
("response", "Hello World"),
("stdout_logs", "Hello World\n"),
("files", []),
],
test_mock={
"execute_code": lambda api_key, code, language, template_id, setup_commands, timeout, dispose_sandbox, execution_context, extract_files: ( # noqa
"execute_code": lambda api_key, code, language, template_id, setup_commands, timeout, dispose_sandbox: ( # noqa
[], # results
"Hello World", # text_output
"Hello World\n", # stdout_logs
"", # stderr_logs
"sandbox_id", # sandbox_id
[], # files
),
},
)
async def run(
self,
input_data: Input,
*,
credentials: APIKeyCredentials,
execution_context: "ExecutionContext",
**kwargs,
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
try:
results, text_output, stdout, stderr, _, files = await self.execute_code(
results, text_output, stdout, stderr, _ = await self.execute_code(
api_key=credentials.api_key.get_secret_value(),
code=input_data.code,
language=input_data.language,
@@ -336,8 +283,6 @@ class ExecuteCodeBlock(Block, BaseE2BExecutorMixin):
setup_commands=input_data.setup_commands,
timeout=input_data.timeout,
dispose_sandbox=input_data.dispose_sandbox,
execution_context=execution_context,
extract_files=True,
)
# Determine result object shape & filter out empty formats
@@ -351,8 +296,6 @@ class ExecuteCodeBlock(Block, BaseE2BExecutorMixin):
yield "stdout_logs", stdout
if stderr:
yield "stderr_logs", stderr
# Always yield files (empty list if none)
yield "files", [f.model_dump() for f in files]
except Exception as e:
yield "error", str(e)
@@ -450,7 +393,6 @@ class InstantiateCodeSandboxBlock(Block, BaseE2BExecutorMixin):
"Hello World\n", # stdout_logs
"", # stderr_logs
"sandbox_id", # sandbox_id
[], # files
),
},
)
@@ -459,7 +401,7 @@ class InstantiateCodeSandboxBlock(Block, BaseE2BExecutorMixin):
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
try:
_, text_output, stdout, stderr, sandbox_id, _ = await self.execute_code(
_, text_output, stdout, stderr, sandbox_id = await self.execute_code(
api_key=credentials.api_key.get_secret_value(),
code=input_data.setup_code,
language=input_data.language,
@@ -558,7 +500,6 @@ class ExecuteCodeStepBlock(Block, BaseE2BExecutorMixin):
"Hello World\n", # stdout_logs
"", # stderr_logs
sandbox_id, # sandbox_id
[], # files
),
},
)
@@ -567,7 +508,7 @@ class ExecuteCodeStepBlock(Block, BaseE2BExecutorMixin):
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
try:
results, text_output, stdout, stderr, _, _ = await self.execute_code(
results, text_output, stdout, stderr, _ = await self.execute_code(
api_key=credentials.api_key.get_secret_value(),
code=input_data.step_code,
language=input_data.language,

View File

@@ -1,288 +0,0 @@
"""
Shared utilities for extracting and storing files from E2B sandboxes.
This module provides common file extraction and workspace storage functionality
for blocks that run code in E2B sandboxes (Claude Code, Code Executor, etc.).
"""
import base64
import logging
import mimetypes
import shlex
from dataclasses import dataclass
from typing import TYPE_CHECKING
from pydantic import BaseModel
from backend.util.file import store_media_file
from backend.util.type import MediaFileType
if TYPE_CHECKING:
from e2b import AsyncSandbox as BaseAsyncSandbox
from backend.executor.utils import ExecutionContext
logger = logging.getLogger(__name__)
# Text file extensions that can be safely read and stored as text
TEXT_EXTENSIONS = {
".txt",
".md",
".html",
".htm",
".css",
".js",
".ts",
".jsx",
".tsx",
".json",
".xml",
".yaml",
".yml",
".toml",
".ini",
".cfg",
".conf",
".py",
".rb",
".php",
".java",
".c",
".cpp",
".h",
".hpp",
".cs",
".go",
".rs",
".swift",
".kt",
".scala",
".sh",
".bash",
".zsh",
".sql",
".graphql",
".env",
".gitignore",
".dockerfile",
"Dockerfile",
".vue",
".svelte",
".astro",
".mdx",
".rst",
".tex",
".csv",
".log",
}
class SandboxFileOutput(BaseModel):
"""A file extracted from a sandbox and optionally stored in workspace."""
path: str
"""Full path in the sandbox."""
relative_path: str
"""Path relative to the working directory."""
name: str
"""Filename only."""
content: str
"""File content as text (for backward compatibility)."""
workspace_ref: str | None = None
"""Workspace reference (workspace://{id}#mime) if stored, None otherwise."""
@dataclass
class ExtractedFile:
"""Internal representation of an extracted file before storage."""
path: str
relative_path: str
name: str
content: bytes
is_text: bool
async def extract_sandbox_files(
sandbox: "BaseAsyncSandbox",
working_directory: str,
since_timestamp: str | None = None,
text_only: bool = True,
) -> list[ExtractedFile]:
"""
Extract files from an E2B sandbox.
Args:
sandbox: The E2B sandbox instance
working_directory: Directory to search for files
since_timestamp: ISO timestamp - only return files modified after this time
text_only: If True, only extract text files (default). If False, extract all files.
Returns:
List of ExtractedFile objects with path, content, and metadata
"""
files: list[ExtractedFile] = []
try:
# Build find command
safe_working_dir = shlex.quote(working_directory)
timestamp_filter = ""
if since_timestamp:
timestamp_filter = f"-newermt {shlex.quote(since_timestamp)} "
find_result = await sandbox.commands.run(
f"find {safe_working_dir} -type f "
f"{timestamp_filter}"
f"-not -path '*/node_modules/*' "
f"-not -path '*/.git/*' "
f"2>/dev/null"
)
if not find_result.stdout:
return files
for file_path in find_result.stdout.strip().split("\n"):
if not file_path:
continue
# Check if it's a text file
is_text = any(file_path.endswith(ext) for ext in TEXT_EXTENSIONS)
# Skip non-text files if text_only mode
if text_only and not is_text:
continue
try:
# Read file content as bytes
content = await sandbox.files.read(file_path, format="bytes")
if isinstance(content, str):
content = content.encode("utf-8")
elif isinstance(content, bytearray):
content = bytes(content)
# Extract filename from path
file_name = file_path.split("/")[-1]
# Calculate relative path
relative_path = file_path
if file_path.startswith(working_directory):
relative_path = file_path[len(working_directory) :]
if relative_path.startswith("/"):
relative_path = relative_path[1:]
files.append(
ExtractedFile(
path=file_path,
relative_path=relative_path,
name=file_name,
content=content,
is_text=is_text,
)
)
except Exception as e:
logger.debug(f"Failed to read file {file_path}: {e}")
continue
except Exception as e:
logger.warning(f"File extraction failed: {e}")
return files
async def store_sandbox_files(
extracted_files: list[ExtractedFile],
execution_context: "ExecutionContext",
) -> list[SandboxFileOutput]:
"""
Store extracted sandbox files to workspace and return output objects.
Args:
extracted_files: List of files extracted from sandbox
execution_context: Execution context for workspace storage
Returns:
List of SandboxFileOutput objects with workspace refs
"""
outputs: list[SandboxFileOutput] = []
for file in extracted_files:
# Decode content for text files (for backward compat content field)
if file.is_text:
try:
content_str = file.content.decode("utf-8", errors="replace")
except Exception:
content_str = ""
else:
content_str = f"[Binary file: {len(file.content)} bytes]"
# Build data URI (needed for storage and as binary fallback)
mime_type = mimetypes.guess_type(file.name)[0] or "application/octet-stream"
data_uri = f"data:{mime_type};base64,{base64.b64encode(file.content).decode()}"
# Try to store in workspace
workspace_ref: str | None = None
try:
result = await store_media_file(
file=MediaFileType(data_uri),
execution_context=execution_context,
return_format="for_block_output",
)
if result.startswith("workspace://"):
workspace_ref = result
elif not file.is_text:
# Non-workspace context (graph execution): store_media_file
# returned a data URI — use it as content so binary data isn't lost.
content_str = result
except Exception as e:
logger.warning(f"Failed to store file {file.name} to workspace: {e}")
# For binary files, fall back to data URI to prevent data loss
if not file.is_text:
content_str = data_uri
outputs.append(
SandboxFileOutput(
path=file.path,
relative_path=file.relative_path,
name=file.name,
content=content_str,
workspace_ref=workspace_ref,
)
)
return outputs
async def extract_and_store_sandbox_files(
sandbox: "BaseAsyncSandbox",
working_directory: str,
execution_context: "ExecutionContext",
since_timestamp: str | None = None,
text_only: bool = True,
) -> list[SandboxFileOutput]:
"""
Extract files from sandbox and store them in workspace.
This is the main entry point combining extraction and storage.
Args:
sandbox: The E2B sandbox instance
working_directory: Directory to search for files
execution_context: Execution context for workspace storage
since_timestamp: ISO timestamp - only return files modified after this time
text_only: If True, only extract text files
Returns:
List of SandboxFileOutput objects with content and workspace refs
"""
extracted = await extract_sandbox_files(
sandbox=sandbox,
working_directory=working_directory,
since_timestamp=since_timestamp,
text_only=text_only,
)
return await store_sandbox_files(extracted, execution_context)

View File

@@ -563,7 +563,7 @@ The block supports conversation continuation through three mechanisms:
|--------|-------------|------|
| error | Error message if execution failed | str |
| response | The output/response from Claude Code execution | str |
| files | List of text files created/modified by Claude Code during this execution. Each file has 'path', 'relative_path', 'name', 'content', and 'workspace_ref' fields. workspace_ref contains a workspace:// URI if the file was stored to workspace. | List[SandboxFileOutput] |
| files | List of text files created/modified by Claude Code during this execution. Each file has 'path', 'relative_path', 'name', and 'content' fields. | List[FileOutput] |
| conversation_history | Full conversation history including this turn. Pass this to conversation_history input to continue on a fresh sandbox if the previous sandbox timed out. | str |
| session_id | Session ID for this conversation. Pass this back along with sandbox_id to continue the conversation. | str |
| sandbox_id | ID of the sandbox instance. Pass this back along with session_id to continue the conversation. This is None if dispose_sandbox was True (sandbox was disposed). | str |

View File

@@ -215,7 +215,6 @@ The sandbox includes pip and npm pre-installed. Set timeout to limit execution t
| response | Text output (if any) of the main execution result | str |
| stdout_logs | Standard output logs from execution | str |
| stderr_logs | Standard error logs from execution | str |
| files | Files created or modified during execution. Each file has path, name, content, and workspace_ref (if stored). | List[SandboxFileOutput] |
### Possible use case
<!-- MANUAL: use_case -->