Compare commits

..

94 Commits

Author SHA1 Message Date
Zamil Majdy
43884a7322 fix(chat/sdk): resolve relative paths in security hooks and unify workspace access
The security hook's path validation blocked SDK Read/Write tools because
it didn't resolve relative paths against sdk_cwd. Since the SDK sets cwd,
Claude naturally uses relative paths like "test.txt" which failed the
absolute path prefix check. Now relative paths are joined with sdk_cwd
before validation, and denial messages include the allowed workspace path.

Also clarifies the workspace model: SDK Read/Write + bash_exec share the
same ephemeral session directory, while workspace_file tools provide
persistent cloud storage across sessions.
2026-02-13 00:08:15 +04:00
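A minimal sketch of the check described in the commit above, with relative paths joined against sdk_cwd before the prefix test (names are illustrative, not the actual security_hooks.py API):

```python
import os

def is_path_allowed(path: str, sdk_cwd: str) -> tuple[bool, str]:
    # The SDK sets cwd, so Claude naturally sends relative paths like
    # "test.txt"; resolve them against sdk_cwd before validating.
    if not os.path.isabs(path):
        path = os.path.join(sdk_cwd, path)
    resolved = os.path.normpath(path)
    workspace = os.path.normpath(sdk_cwd)
    if resolved == workspace or resolved.startswith(workspace + os.sep):
        return True, ""
    # Denial message now includes the allowed workspace path.
    return False, f"{path!r} is outside the allowed workspace {workspace!r}"
```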
Zamil Majdy
cb45e7957b feat: fix openapi.json 2026-02-12 23:39:47 +04:00
Zamil Majdy
f1d02fb8f3 fix(chat/sdk): move cwd setup inside try block to ensure cleanup
Move _make_sdk_cwd() and os.makedirs() inside the try block so
the finally cleanup always runs, preventing /tmp dir leaks if
setup fails.
2026-02-12 23:32:26 +04:00
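The shape of the fix, sketched with hypothetical names (the real setup helper is _make_sdk_cwd()):

```python
import os
import shutil
import tempfile

def run_sdk_session(session_id: str) -> None:
    sdk_cwd = None
    try:
        # cwd setup lives inside the try block: if makedirs (or anything
        # after it) raises, the finally below still runs.
        sdk_cwd = os.path.join(tempfile.gettempdir(), f"copilot-{session_id}")
        os.makedirs(sdk_cwd, exist_ok=True)
        ...  # run the SDK session with cwd=sdk_cwd
    finally:
        if sdk_cwd and os.path.isdir(sdk_cwd):
            shutil.rmtree(sdk_cwd, ignore_errors=True)  # no /tmp leaks
```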
Zamil Majdy
47de6b6420 feat(chat): add check_operation_status tool for long-running ops
Lets the CoPilot agent query whether a create_agent/edit_agent
operation is still running, completed, or failed. Accepts operation_id
or task_id from a previous operation_started response and looks up
the task status in Redis via stream_registry.
2026-02-12 23:30:51 +04:00
Zamil Majdy
62cd2eea89 fix(chat/sandbox): use --symlink for compat paths on Debian 13
On Debian 13 and other merged-/usr systems (Debian 12 "bookworm" and
later), /bin, /lib, /sbin, /lib64 are symlinks to /usr/*. bwrap
--ro-bind cannot create a symlink as a mount target inside the
sandbox, causing "execvp: No such file or directory" because the ELF
dynamic linker at /lib64/ld-linux-x86-64.so.2 is unreachable.

Detect symlinks at runtime with os.path.islink() and use bwrap
--symlink instead of --ro-bind. Fall back to --ro-bind on older
distros where these are real directories.
2026-02-12 22:55:29 +04:00
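The runtime detection, sketched (compat path list per the commit; the surrounding bwrap argv assembly is omitted):

```python
import os

COMPAT_PATHS = ["/bin", "/lib", "/lib64", "/sbin"]

def compat_mount_args() -> list[str]:
    args: list[str] = []
    for path in COMPAT_PATHS:
        if os.path.islink(path):
            # Merged-/usr host: recreate the symlink inside the sandbox,
            # e.g. --symlink usr/bin /bin
            args += ["--symlink", os.readlink(path), path]
        else:
            # Older distros: these are real directories, bind read-only.
            args += ["--ro-bind", path, path]
    return args
```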
Zamil Majdy
ae61ec692e Merge branch 'dev' into feat/copitlot-claude-code 2026-02-12 22:27:50 +04:00
Zamil Majdy
9296bd8736 fix(chat/sandbox): fix bwrap inside Docker containers
Three fixes for bubblewrap sandbox:
- Fix --tmpdir (invalid) to --tmpfs (correct bwrap option)
- Add --unshare-user so bwrap can create namespaces inside
  unprivileged Docker containers (no CAP_SYS_ADMIN needed)
- Reorder mounts: --tmpfs /tmp first, then --bind workspace on top,
  so the workspace directory is visible through the fresh tmpfs
2026-02-12 22:22:39 +04:00
Zamil Majdy
308113c03d fix(chat/sdk): remove obsolete Bash allowlist tests
The SDK built-in Bash tool is now unconditionally blocked (bash_exec
MCP tool with bubblewrap is used instead). Remove tests that expected
safe Bash commands to be allowed and replace with a single test that
verifies Bash is always denied.
2026-02-12 22:19:30 +04:00
Zamil Majdy
51abf13254 feat(chat): use LaunchDarkly flag for copilot SDK rollout
Replace static CHAT_USE_CLAUDE_AGENT_SDK env var with a LaunchDarkly
feature flag (copilot-sdk) for per-user rollout control. The env var
value serves as the default when LD is not configured or the flag
doesn't exist yet.
2026-02-12 22:02:28 +04:00
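Roughly how the flag check composes with the env default, sketched with the LaunchDarkly Python SDK (helper name and wiring are illustrative; assumes ldclient was configured at startup):

```python
import os

import ldclient
from ldclient import Context

def use_claude_sdk(user_id: str) -> bool:
    # The env var is demoted to a default, used when LD is not
    # configured or the copilot-sdk flag doesn't exist yet.
    env_default = os.getenv("CHAT_USE_CLAUDE_AGENT_SDK", "false").lower() == "true"
    client = ldclient.get()
    if not client.is_initialized():
        return env_default
    context = Context.builder(user_id).kind("user").build()
    return bool(client.variation("copilot-sdk", context, env_default))
```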
Zamil Majdy
54b03d3a29 fix(frontend): remove python_exec from openapi.json ResponseType enum
The python_exec tool was removed from the backend but the generated
openapi.json still referenced the enum value.
2026-02-12 21:55:25 +04:00
Zamil Majdy
239dff5ebd feat(chat/sandbox): add resource limits to bubblewrap sandbox
Add ulimit-based resource caps inside the bwrap sandbox to prevent
fork bombs and resource exhaustion:
- max 64 processes (stops fork bombs)
- 512 MB virtual memory
- 50 MB max file size
- 256 open file descriptors

Limits are applied via `sh -c 'ulimit ...; exec "$@"'` wrapper inside
the sandbox, so they're inherited by all child processes.
2026-02-12 21:47:49 +04:00
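The wrapper construction, sketched (values mirror the list above; ulimit units assume a POSIX sh, and the -f block size varies by shell):

```python
def wrap_with_ulimits(argv: list[str]) -> list[str]:
    limits = (
        "ulimit -u 64; "      # max 64 processes (stops fork bombs)
        "ulimit -v 524288; "  # 512 MB virtual memory, in KB
        "ulimit -f 102400; "  # 50 MB max file size, in 512-byte blocks
        "ulimit -n 256; "     # 256 open file descriptors
    )
    # sh -c '...; exec "$@"' sh <argv...>: the trailing "sh" fills $0
    # and "$@" expands to the original command, so every child process
    # inherits the caps.
    return ["sh", "-c", limits + 'exec "$@"', "sh", *argv]
```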
Zamil Majdy
1dd53db21c feat(chat/sandbox): bubblewrap sandbox for bash_exec, remove python_exec
- Replace `--ro-bind / /` with whitelist-only filesystem: only /usr, /etc,
  /bin, /lib, /sbin mounted read-only. /app, /root, /home, /opt, /var are
  completely invisible inside the sandbox.
- Add `--clearenv` to wipe all inherited env vars (API keys, DB passwords).
  Only safe vars (PATH, HOME=workspace, LANG) are explicitly set.
- Remove python_exec tool — bash_exec can run `python3 -c` or heredocs with
  identical bubblewrap protection, reducing attack surface.
- Remove all fallback security code (import hooks, blocked modules, network
  command lists). Tools now hard-require bubblewrap — disabled on platforms
  without bwrap.
- Clean up security_hooks.py: remove ~200 lines of dead bash validation code,
  add Bash to BLOCKED_TOOLS as defence-in-depth.
- Wire up long-running tool callback in SDK service for create_agent/edit_agent
  delegation to Redis Streams background infrastructure.
2026-02-12 21:44:40 +04:00
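An illustrative bwrap argv for this whitelist-only model (mounts and env values are from the commit; paths, ordering, and namespace flags in the real sandbox.py may differ):

```python
def bwrap_argv(workspace: str) -> list[str]:
    return [
        "bwrap",
        # Whitelist-only filesystem: nothing outside these mounts exists
        # in the sandbox, so /app, /root, /home, /opt, /var are invisible.
        "--ro-bind", "/usr", "/usr",
        "--ro-bind", "/etc", "/etc",
        # /bin, /lib, /sbin, /lib64 via --symlink or --ro-bind (see 62cd2eea89)
        "--tmpfs", "/tmp",                      # fresh tmpfs first...
        "--bind", workspace, "/tmp/workspace",  # ...then workspace on top
        # Wipe inherited env vars (API keys, DB passwords), then re-set
        # only the safe ones.
        "--clearenv",
        "--setenv", "PATH", "/usr/bin:/bin",
        "--setenv", "HOME", "/tmp/workspace",
        "--setenv", "LANG", "C.UTF-8",
        "--unshare-all",
        "--die-with-parent",
    ]
```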
Zamil Majdy
06c16ee2fe fix(chat/sdk): non-blocking long-running tools, tighten security
- Long-running tools (create_agent) now run in background and return
  immediately with an operation_id. Add check_operation MCP tool for
  polling results. Prevents 3+ min blocking and survives page refresh.
- Fix CodeQL path traversal alert: use normpath+startswith sanitizer
  in _make_sdk_cwd() instead of assert.
- Tighten _read_file_handler: restrict from ~/.claude/ to only
  ~/.claude/projects/**/tool-results/ (sentry review feedback).
- Fix bash redirect bypass: strip quoted strings before checking for
  unquoted > operator, catches `echo hello>file` (sentry review).
2026-02-12 20:39:33 +04:00
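The redirect-bypass fix, sketched as a simplification of the actual check:

```python
import re

# Strip single- and double-quoted segments first, so a '>' inside quotes
# (e.g. echo "a > b") doesn't trigger the check, while the unquoted
# `echo hello>file` still does.
_QUOTED = re.compile(r"'[^']*'|\"[^\"]*\"")

def has_unquoted_redirect(command: str) -> bool:
    return ">" in _QUOTED.sub("", command)
```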
Zamil Majdy
8d2a649ee5 refactor(chat/sdk): remove Langfuse tracing — OpenRouter handles observability
Delete tracing.py (~408 lines) and all TracedSession/hook references from the
SDK path. OpenRouter already provides token usage, cost tracking, and request
logging, making manual Langfuse integration redundant. This also fixes the
broken 'Langfuse' object has no attribute 'trace' warning on every request.
2026-02-12 20:24:27 +04:00
Nicholas Tindle
cb166dd6fb feat(blocks): Store sandbox files to workspace (#12073)
Store files created by sandbox blocks (Claude Code, Code Executor) to
the user's workspace for persistence across runs.

### Changes 🏗️

- **New `sandbox_files.py` utility** (`backend/util/sandbox_files.py`)
  - Shared module for extracting files from E2B sandboxes
  - Stores files to workspace via `store_media_file()` (includes virus scanning, size limits)
  - Returns `SandboxFileOutput` with path, content, and `workspace_ref`

- **Claude Code block** (`backend/blocks/claude_code.py`)
  - Added `workspace_ref` field to `FileOutput` schema
  - Replaced inline `_extract_files()` with shared utility
  - Files from working directory now stored to workspace automatically

- **Code Executor block** (`backend/blocks/code_executor.py`)
  - Added `files` output field to `ExecuteCodeBlock.Output`
  - Creates `/output` directory in sandbox before execution
  - Extracts all files (text + binary) from `/output` after execution
  - Updated `execute_code()` to support file extraction with `extract_files` param

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Create agent with Claude Code block, have it create a file, verify `workspace_ref` in output
  - [x] Create agent with Code Executor block, write file to `/output`, verify `workspace_ref` in output
  - [x] Verify files persist in workspace after sandbox disposal
  - [x] Verify binary files (images, etc.) work correctly in Code Executor
  - [x] Verify existing graphs using `content` field still work (backward compat)

#### For configuration changes:
- [x] `.env.default` is updated or already compatible with my changes
- [x] `docker-compose.yml` is updated or already compatible with my
changes
- [x] I have included a list of my configuration changes in the PR
description (under **Changes**)

No configuration changes required - this is purely additive backend
code.

---

**Related:** Closes SECRT-1931

---

> [!NOTE]
> **Medium Risk**
> Adds automatic extraction and workspace storage of sandbox-written
files (including binaries for code execution), which can affect output
payload size, performance, and file-handling edge cases.
> 
> **Overview**
> **Sandbox blocks now persist generated files to workspace.** A new
shared utility (`backend/util/sandbox_files.py`) extracts files from an
E2B sandbox (scoped by a start timestamp) and stores them via
`store_media_file`, returning `SandboxFileOutput` with `workspace_ref`.
> 
> `ClaudeCodeBlock` replaces its inline file-scraping logic with this
utility and updates the `files` output schema to include
`workspace_ref`.
> 
> `ExecuteCodeBlock` adds a `files` output and extends the executor
mixin to optionally extract/store files (text + binary) when an
`execution_context` is provided; related mocks/tests and docs are
updated accordingly.

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 15:56:59 +00:00
Zamil Majdy
9589474709 Merge branch 'dev' into feat/copitlot-claude-code 2026-02-12 19:40:32 +04:00
Swifty
3d31f62bf1 Revert "added feature request tooling"
This reverts commit b8b6c9de23.
2026-02-12 16:39:24 +01:00
Swifty
b8b6c9de23 added feature request tooling 2026-02-12 16:38:17 +01:00
Zamil Majdy
749a78723a refactor(chat/sdk): deduplicate code and remove anthropic fallback
- Extract shared `make_session_path()` into sandbox.py (single source of
  truth for workspace path sanitization), replace duplicate in service.py
- Delete anthropic_fallback.py (~360 lines) — redundant third code path;
  routes.py already falls back to non-SDK service
- Remove dead `traced_session()`, `get_tool_definitions()`,
  `get_tool_handlers()`, `_current_tool_call_id` ContextVar
- Fix hardcoded model in tracing — pass actual resolved model
- Fix inconsistent model name splitting in anthropic fallback
2026-02-12 19:26:29 +04:00
Zamil Majdy
bec2e1ddee fix(chat/tools): sanitize session_id in sandbox workspace path
Align with SDK's _make_sdk_cwd() to prevent path traversal and ensure
python_exec/bash_exec share the same workspace as SDK file tools.
2026-02-12 19:08:47 +04:00
Zamil Majdy
ec1ab06e0d chore(chat): bump default max_subtasks from 3 to 10 2026-02-12 19:07:42 +04:00
Zamil Majdy
f31cb49557 feat(chat/tools): add sandboxed python_exec, bash_exec, web_fetch tools and enable Task
- Add sandbox.py with network-isolated execution via unshare --net (Linux)
  and import/command blocklist fallback (macOS dev)
- Add python_exec tool: runs Python in subprocess with no network, workspace-scoped
- Add bash_exec tool: full Bash scripting with no network, workspace-scoped
- Add web_fetch tool: SSRF-protected URL fetching via backend Requests utility
- Remove SDK built-in Bash from allowlist (replaced by sandboxed bash_exec)
- Enable SDK built-in Task (sub-agents) with per-session rate limit (default 3)
- Add claude_agent_max_subtasks config field
2026-02-12 19:07:19 +04:00
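The Linux isolation wrapper, sketched (the macOS blocklist fallback is omitted; unshare privilege requirements vary by host, which is part of why later commits moved to bubblewrap):

```python
import subprocess
import sys

def run_without_network(argv: list[str], workspace: str) -> subprocess.CompletedProcess:
    if sys.platform.startswith("linux"):
        # New, empty network namespace: loopback only, no route out.
        argv = ["unshare", "--net", *argv]
    return subprocess.run(
        argv, cwd=workspace, capture_output=True, text=True, timeout=60
    )
```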
Zamil Majdy
fd28c386f4 Merge branch 'dev' into feat/copitlot-claude-code 2026-02-12 18:50:11 +04:00
Zamil Majdy
3bea584659 feat(chat/sdk): route SDK through OpenRouter with observability (#12084)
## Summary
- Routes Claude Agent SDK API calls through OpenRouter via
`ANTHROPIC_BASE_URL` / `ANTHROPIC_AUTH_TOKEN` env vars, enabling
per-call token and cost tracking on the OpenRouter dashboard
- Adds `sdk_model` and `sdk_max_budget_usd` config fields for
SDK-specific model selection and budget control
- Emits `StreamUsage` from SDK `ResultMessage` so the frontend receives
token counts, and persists usage to `session.usage`
- Fixes Langfuse tracing to use the configured model name instead of a
hardcoded default
- Updates Anthropic fallback to use `config.api_key` / `config.base_url`
(OpenRouter routing) instead of raw `ANTHROPIC_API_KEY` env var

## Test plan
- [ ] Deploy and send a CoPilot message — verify the API call appears on
the OpenRouter dashboard
- [ ] Check Langfuse trace shows correct model name (e.g.
`claude-opus-4.6` not hardcoded `claude-sonnet-4-20250514`)
- [ ] Verify frontend receives `StreamUsage` with `promptTokens` /
`completionTokens` values
- [ ] Set `CHAT_SDK_MAX_BUDGET_USD` and verify budget is respected
- [ ] Test fallback path (without `claude-agent-sdk` installed) still
works via OpenRouter


<h2>Greptile Overview</h2>

<details><summary><h3>Greptile Summary</h3></summary>

Routes Claude Agent SDK API calls through OpenRouter for enhanced
observability and cost tracking. The PR enables per-call token tracking
on the OpenRouter dashboard by configuring the SDK to use
`ANTHROPIC_BASE_URL` and `ANTHROPIC_AUTH_TOKEN` environment variables
derived from the chat configuration.

Key changes:
- Added `sdk_model` and `sdk_max_budget_usd` configuration fields for
SDK-specific control
- Implemented automatic model name resolution that strips OpenRouter
provider prefixes
- Updated SDK client initialization to route through OpenRouter with
proper environment variables
- Emits `StreamUsage` events from SDK `ResultMessage` for frontend token
visibility
- Persists usage data to `session.usage` for historical tracking
- Fixed Langfuse tracing to use the configured model name instead of
hardcoded defaults
- Updated fallback path to use OpenRouter routing instead of direct
Anthropic API
</details>


<details><summary><h3>Confidence Score: 4/5</h3></summary>

- Safe to merge with minor observations - the implementation is solid
and the changes are well-structured
- The code quality is high with proper error handling, clear separation
of concerns, and good defensive coding practices. The changes integrate
cleanly with existing patterns. Minor observations include missing
validation for sdk_max_budget_usd and a potential edge case in model
name resolution, but these don't block merging
- No files require special attention - all changes follow existing
patterns and maintain consistency
</details>


<details><summary><h3>Sequence Diagram</h3></summary>

```mermaid
sequenceDiagram
    participant Frontend
    participant Backend
    participant SDK as Claude Agent SDK
    participant OpenRouter
    participant Anthropic
    participant Langfuse

    Frontend->>Backend: POST /chat/completions
    Backend->>Backend: Load config (api_key, base_url)
    Backend->>Backend: Resolve SDK model (strip OpenRouter prefix)
    Backend->>Backend: Build SDK env vars (ANTHROPIC_BASE_URL, ANTHROPIC_AUTH_TOKEN)
    
    Backend->>Langfuse: Initialize TracedSession with model name
    Backend->>SDK: ClaudeSDKClient(model, env, max_budget_usd)
    
    SDK->>SDK: Use ANTHROPIC_BASE_URL from env
    SDK->>OpenRouter: POST /messages (via configured base_url)
    OpenRouter->>Anthropic: Forward request with routing
    Anthropic-->>OpenRouter: Stream response chunks
    OpenRouter-->>SDK: Stream response with usage data
    
    loop For each SDK message
        SDK-->>Backend: AssistantMessage/UserMessage/ResultMessage
        Backend->>Langfuse: log_sdk_message()
        Backend->>Backend: SDKResponseAdapter.convert_message()
        Backend->>Backend: Extract usage from ResultMessage
        Backend->>Backend: Persist Usage to session.usage
        Backend-->>Frontend: StreamUsage(promptTokens, completionTokens)
        Backend-->>Frontend: StreamTextDelta/StreamToolInput/etc
    end
    
    Backend->>Langfuse: Log final generation with model name
    Backend->>Backend: Save session with usage data
    Backend-->>Frontend: StreamFinish
```
</details>


2026-02-12 21:47:39 +07:00
Abhimanyu Yadav
4f6055f494 refactor(frontend): remove default expiration date from API key credentials form (#12092)
### Changes 🏗️

Removed the default expiration date for API keys in the credentials
modal. Previously, API keys were set to expire the next day by default,
but now the expiration date field starts empty, allowing users to
explicitly choose whether they want to set an expiration date.

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Open the API key credentials modal and verify the expiration date field is empty by default
  - [x] Test creating an API key with and without an expiration date
  - [x] Verify both scenarios work correctly


<h2>Greptile Overview</h2>

<details><summary><h3>Greptile Summary</h3></summary>

Removed the default expiration date for API key credentials in the
credentials modal. Previously, API keys were automatically set to expire
the next day at midnight. Now the expiration date field starts empty,
allowing users to explicitly choose whether to set an expiration.

- Removed `getDefaultExpirationDate()` helper function that calculated
tomorrow's date
- Changed default `expiresAt` value from calculated date to empty string
- Backend already supports optional expiration (`expires_at?: number`),
so no backend changes needed
- Form submission correctly handles empty expiration by passing
`undefined` to the API
</details>


<details><summary><h3>Confidence Score: 5/5</h3></summary>

- This PR is safe to merge with minimal risk
- The changes are straightforward and well-contained. The refactor
removes a helper function and changes a default value. The backend API
already supports optional expiration dates, and the form submission
logic correctly handles empty values by passing undefined. The change
improves UX by not forcing a default expiration date on users.
- No files require special attention
</details>


2026-02-12 12:57:06 +00:00
Otto
695a185fa1 fix(frontend): remove fixed min-height from CoPilot message container (#12091)
## Summary

Removes the `min-h-screen` class from `ConversationContent` in
ChatMessagesContainer, which was causing fixed height layout issues in
the CoPilot chat interface.

## Changes

- Removed `min-h-screen` from ConversationContent className

## Linear

Fixes [SECRT-1944](https://linear.app/autogpt/issue/SECRT-1944)


<h2>Greptile Overview</h2>

<details><summary><h3>Greptile Summary</h3></summary>

Removes the `min-h-screen` (100vh) class from `ConversationContent` that
was causing the chat message container to enforce a minimum viewport
height. The parent container already handles height constraints with
`h-full min-h-0` and flexbox layout, so the fixed minimum height was
creating layout conflicts. The component now properly grows within its
flex container using `flex-1`.
</details>


<details><summary><h3>Confidence Score: 5/5</h3></summary>

- This PR is safe to merge with minimal risk
- The change removes a single problematic CSS class that was causing
fixed height layout issues. The parent container already handles height
constraints properly with flexbox, and removing min-h-screen allows the
component to size correctly within its flex parent. This is a targeted,
low-risk bug fix with no logic changes.
- No files require special attention
</details>


2026-02-12 12:46:29 +00:00
Reinier van der Leer
113e87a23c refactor(backend): Reduce circular imports (#12068)
I'm getting circular import issues because there is a lot of
cross-importing between `backend.data`, `backend.blocks`, and other
modules. This change reduces block-related cross-imports and thus the
risk of circular-import breakage.

### Changes 🏗️

- Strip down `backend.data.block`
  - Move `Block` base class and related class/enum defs to `backend.blocks._base`
  - Move `is_block_auth_configured` to `backend.blocks._utils`
  - Move `get_blocks()`, `get_io_block_ids()` etc. to `backend.blocks` (`__init__.py`)
  - Update imports everywhere
- Remove unused and poorly typed `Block.create()`
  - Change usages from `block_cls.create()` to `block_cls()`
- Improve typing of `load_all_blocks` and `get_blocks`
- Move cross-import of `backend.api.features.library.model` from `backend/data/__init__.py` to `backend/data/integrations.py`
- Remove deprecated attribute `NodeModel.webhook`
  - Re-generate OpenAPI spec and fix frontend usage
- Eliminate module-level `backend.blocks` import from `blocks/agent.py`
- Eliminate module-level `backend.data.execution` and `backend.executor.manager` imports from `blocks/helpers/review.py`
- Replace `BlockInput` with `GraphInput` for graph inputs

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - CI static type-checking + tests should be sufficient for this
2026-02-12 12:07:49 +00:00
Abhimanyu Yadav
d09f1532a4 feat(frontend): replace legacy builder with new flow editor (#12081)

### Changes 🏗️

This PR completes the migration from the legacy builder to the new Flow
editor by removing all legacy code and feature flags.

**Removed:**
- Old builder view toggle functionality (`BuilderViewTabs.tsx`)
- Legacy debug panel (`RightSidebar.tsx`)
- Feature flags: `NEW_FLOW_EDITOR` and `BUILDER_VIEW_SWITCH`
- `useBuilderView` hook and related view-switching logic

**Updated:**
- Simplified `build/page.tsx` to always render the new Flow editor
- Added CSS styling (`flow.css`) to properly render Phosphor icons in
React Flow handles

**Tests:**
- Skipped e2e test suite in `build.spec.ts` (legacy builder tests)
- Follow-up PR (#12082) will add new e2e tests for the Flow editor

### Checklist 📋

#### For code changes:

- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
    - [x] Create a new flow and verify it loads correctly
    - [x] Add nodes and connections to verify basic functionality works
    - [x] Verify that node handles render correctly with the new CSS
    - [x] Check that the UI is clean without the old debug panel or view toggles

#### For configuration changes:

- [x] `.env.default` is updated or already compatible with my changes
- [x] `docker-compose.yml` is updated or already compatible with my
changes
2026-02-12 11:16:01 +00:00
Zamil Majdy
d7f7a2747f fix(backend/chat): Atomic message append to prevent race condition
Replace the read-modify-write pattern in stream_chat_post with an
atomic append_and_save_message helper that acquires the session lock
before re-fetching and appending. This prevents message loss when
concurrent requests modify the same session.
2026-02-12 09:10:43 +04:00
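The pattern, sketched with an in-memory stand-in for the session store and lock (the real helper presumably locks via the shared session store):

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass, field

_session_locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)

@dataclass
class ChatSession:
    id: str
    messages: list[dict] = field(default_factory=list)

_store: dict[str, ChatSession] = {}

async def append_and_save_message(session_id: str, message: dict) -> None:
    # Take the lock BEFORE re-fetching, so a concurrent request cannot
    # interleave its read-modify-write between our read and our save.
    async with _session_locks[session_id]:
        session = _store[session_id]       # re-fetch under the lock
        session.messages.append(message)
        _store[session_id] = session       # stand-in for the DB save
```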
Zamil Majdy
68849e197c format 2026-02-12 08:26:26 +04:00
Zamil Majdy
211478bb29 Revert "style: run ruff format and isort"
This reverts commit 40b58807ab.
2026-02-12 08:25:22 +04:00
Zamil Majdy
0e88dd15b2 feat(chat): add hook-based tracing integration for Claude Agent SDK
- Add create_tracing_hooks() for fine-grained tool timing
- Add merge_hooks() utility to combine security + tracing hooks
- Captures precise pre/post timing for tool executions
- Tracks tool failures via PostToolUseFailure hook
- Integrates seamlessly with existing security hooks
2026-02-12 03:35:16 +00:00
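A plausible shape for merge_hooks(), assuming hook configs are dicts keyed by event name with a list of matchers per event:

```python
def merge_hooks(*hook_sets: dict[str, list]) -> dict[str, list]:
    # Concatenate matcher lists where several configs (e.g. security
    # and tracing) register the same event, preserving order.
    merged: dict[str, list] = {}
    for hooks in hook_sets:
        for event, matchers in hooks.items():
            merged.setdefault(event, []).extend(matchers)
    return merged
```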
Zamil Majdy
7f3c227f0a feat(chat): add modular Langfuse tracing for Claude Agent SDK
- Create tracing.py with TracedSession context manager
- Automatically trace user messages, SDK messages, and results
- Capture tool calls with input/output and timing
- Log usage and cost from SDK ResultMessage
- No-op when Langfuse not configured (zero overhead)
- Clean integration into service.py via context manager
2026-02-12 03:33:37 +00:00
Zamil Majdy
40b58807ab style: run ruff format and isort 2026-02-12 03:25:19 +00:00
Zamil Majdy
d0e2e6f013 security(service): strengthen path validation for SDK cleanup
- Add empty check after session_id sanitization
- Add assertion for defense-in-depth
- Add explicit '..' traversal check in cleanup
- Replace glob with os.listdir to avoid glob injection
- Add validation that project_dir stays under ~/.claude/projects
- Add warning logs for rejected paths

Addresses CodeQL alert about uncontrolled data in path expression
2026-02-12 03:07:08 +00:00
Zamil Majdy
efdc8d73cc fix(security_hooks): use json.dumps for pattern matching and log warning
- Use json.dumps instead of str() for more predictable pattern matching
- Log warning when SDK not available and security hooks are disabled

Addresses CodeRabbit review feedback
2026-02-12 02:55:04 +00:00
Zamil Majdy
a34810d8a2 revert: remove Bash command extraction from GenericTool
Keep it simple - just show 'Bash completed' instead of special
handling to extract command names like 'jq completed'
2026-02-12 02:53:37 +00:00
Zamil Majdy
038b7d5841 feat(copilot): show specific command name for Bash tool
- Extract command name (jq, grep, etc.) from Bash tool input
- Display 'jq completed' instead of 'Bash completed'
- Add ripgrep and tree to Dockerfile (match ALLOWED_BASH_COMMANDS)
2026-02-12 02:48:19 +00:00
Zamil Majdy
cac93b0cc9 fix(chat): increase SDK buffer limit and add jq
- Add sdk_max_buffer_size config option (default 10MB, was 1MB)
- Pass max_buffer_size to ClaudeAgentOptions to prevent crashes on large tool outputs
- Install jq in Dockerfile for JSON processing capabilities

Fixes AUTOGPT-SERVER-7V2
2026-02-12 02:41:12 +00:00
Zamil Majdy
a78145505b fix(copilot): merge split assistant messages to prevent Anthropic API errors (#12062)
## Summary
- When the copilot model responds with both text content AND a
long-running tool call (e.g., `create_agent`), the streaming code
created two separate consecutive assistant messages — one with text, one
with `tool_calls`. This caused Anthropic's API to reject with
`"unexpected tool_use_id found in tool_result blocks"` because the
`tool_result` couldn't find a matching `tool_use` in the immediately
preceding assistant message.
- Added a defensive merge of consecutive assistant messages in
`to_openai_messages()` (fixes existing corrupt sessions too)
- Fixed `_yield_tool_call` to add tool_calls to the existing
current-turn assistant message instead of creating a new one
- Changed `accumulated_tool_calls` assignment to use `extend` to prevent
overwriting tool_calls added by long-running tool flow

## Test plan
- [x] All 23 chat feature tests pass (`backend/api/features/chat/`)
- [x] All 44 prompt utility tests pass (`backend/util/prompt_test.py`)
- [x] All pre-commit hooks pass (ruff, isort, black, pyright)
- [ ] Manual test: create an agent via copilot, then ask a follow-up
question — should no longer get 400 error


<h2>Greptile Overview</h2>

<details><summary><h3>Greptile Summary</h3></summary>

Fixes a critical bug where long-running tool calls (like `create_agent`)
caused Anthropic API 400 errors due to split assistant messages. The fix
ensures tool calls are added to the existing assistant message instead
of creating new ones, and adds a defensive merge function to repair any
existing corrupt sessions.

**Key changes:**
- Added `_merge_consecutive_assistant_messages()` to defensively merge
split assistant messages in `to_openai_messages()`
- Modified `_yield_tool_call()` to append tool calls to the current-turn
assistant message instead of creating a new one
- Changed `accumulated_tool_calls` from assignment to `extend` to
preserve tool calls already added by long-running tool flow

**Impact:** Resolves the issue where users received 400 errors after
creating agents via copilot and asking follow-up questions.
</details>


<details><summary><h3>Confidence Score: 4/5</h3></summary>

- Safe to merge with minor verification recommended
- The changes are well-targeted and solve a real API compatibility
issue. The logic is sound: searching backwards for the current assistant
message is correct, and using `extend` instead of assignment prevents
overwriting. The defensive merge in `to_openai_messages()` also fixes
existing corrupt sessions. All existing tests pass according to the PR
description.
- No files require special attention - changes are localized and
defensive
</details>


<details><summary><h3>Sequence Diagram</h3></summary>

```mermaid
sequenceDiagram
    participant User
    participant StreamAPI as stream_chat_completion
    participant Chunks as _stream_chat_chunks
    participant ToolCall as _yield_tool_call
    participant Session as ChatSession
    
    User->>StreamAPI: Send message
    StreamAPI->>Chunks: Stream chat chunks
    
    alt Text + Long-running tool call
        Chunks->>StreamAPI: Text delta (content)
        StreamAPI->>Session: Append assistant message with content
        Chunks->>ToolCall: Tool call detected
        
        Note over ToolCall: OLD: Created new assistant message<br/>NEW: Appends to existing assistant
        
        ToolCall->>Session: Search backwards for current assistant
        ToolCall->>Session: Append tool_call to existing message
        ToolCall->>Session: Add pending tool result
    end
    
    StreamAPI->>StreamAPI: Merge accumulated_tool_calls
    Note over StreamAPI: Use extend (not assign)<br/>to preserve existing tool_calls
    
    StreamAPI->>Session: to_openai_messages()
    Session->>Session: _merge_consecutive_assistant_messages()
    Note over Session: Defensive: Merges any split<br/>assistant messages
    Session-->>StreamAPI: Merged messages
    
    StreamAPI->>User: Return response
```
</details>


2026-02-12 01:52:17 +00:00
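The defensive merge from the PR above, sketched over OpenAI-format message dicts (field handling simplified):

```python
def merge_consecutive_assistant_messages(messages: list[dict]) -> list[dict]:
    merged: list[dict] = []
    for msg in messages:
        prev = merged[-1] if merged else None
        if prev is not None and prev.get("role") == msg.get("role") == "assistant":
            # Fold into the previous message so a tool_result always
            # directly follows the assistant turn carrying its tool_use.
            if msg.get("content"):
                prev["content"] = (prev.get("content") or "") + msg["content"]
            if msg.get("tool_calls"):
                prev.setdefault("tool_calls", []).extend(msg["tool_calls"])
        else:
            merged.append(dict(msg))
    return merged
```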
Zamil Majdy
2025aaf5f2 fix(backend/chat): Preserve full MCP tool output for frontend widgets
The SDK CLI truncates large tool results (writing them to disk),
which breaks frontend widget rendering (e.g., find_block's block
list cards). Stash the full MCP tool output before the SDK sees it,
then use the stash in the response adapter so the frontend always
receives the complete JSON for proper widget parsing.
2026-02-11 23:13:42 +04:00
Zamil Majdy
ae9bce3bae feat(backend/chat): Add sandboxed Bash and notify SDK of restrictions
- Allow Bash tool with command allowlist (jq, grep, head, tail, etc.)
  validated via shlex.split for proper quote handling
- Add workspace path validation for Bash absolute paths
- Add SDK built-in tools (Read/Write/Edit/Glob/Grep/Bash) to allowed_tools
- Append Bash restrictions to system prompt (SDK doesn't know our allowlist)
- Add default_factory to BlockInfoSummary schema fields
- Add 12 Bash sandbox tests covering safe/dangerous commands, substitution,
  redirection, /dev/ access, path escaping
2026-02-11 22:35:39 +04:00
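The allowlist check, sketched (the allowed set here is abbreviated and illustrative):

```python
import shlex

ALLOWED_BASH_COMMANDS = {"jq", "grep", "head", "tail"}  # abbreviated

def is_command_allowed(command: str) -> bool:
    try:
        # shlex.split handles quoting properly, unlike a naive
        # command.split() or substring check.
        tokens = shlex.split(command)
    except ValueError:
        return False  # unbalanced quotes: reject outright
    return bool(tokens) and tokens[0] in ALLOWED_BASH_COMMANDS
```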
Zamil Majdy
3107d889fc feat(frontend/copilot): Add generic tool widget for unrecognized tools
SDK built-in tools (Read, Glob, Grep, etc.) have no dedicated frontend
widget, so tool calls silently disappeared. Add a GenericTool component
that shows a spinning gear + "Running {tool}…" for any tool-* part
type that doesn't match a known case.
2026-02-11 22:08:03 +04:00
Zamil Majdy
f174fb6303 fix(backend/chat): Strip MCP prefix from SDK tool names for frontend rendering
The Vercel AI SDK frontend renders tool widgets based on tool name
(e.g. "tool-find_block", "tool-run_agent"). The SDK sends tool names
with the MCP prefix (mcp__copilot__find_block) which didn't match
any frontend switch case, causing tool execution to be invisible.

Strip the mcp__copilot__ prefix in the response adapter so tool events
reach the correct frontend widget handlers.
2026-02-11 22:01:59 +04:00
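The adapter-side rename, sketched:

```python
MCP_PREFIX = "mcp__copilot__"

def frontend_tool_name(sdk_tool_name: str) -> str:
    # mcp__copilot__find_block -> find_block, which the frontend then
    # matches as its "tool-find_block" widget case.
    return sdk_tool_name.removeprefix(MCP_PREFIX)
```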
Zamil Majdy
920a4c5f15 feat(backend/chat): Allow Read/Write/Edit/Glob/Grep in SDK within workspace
Move these tools from fully-blocked to workspace-scoped: they are now
allowed when the file path stays within the SDK working directory
(/tmp/copilot-<session>/) or the tool-results directory
(~/.claude/projects/…/tool-results/). This enables the SDK's built-in
oversized tool result handling and workspace file operations.

- Add _validate_workspace_path() with normpath-based path validation
- Pass sdk_cwd from service.py into create_security_hooks()
- Add 20 unit tests covering allowed/denied paths, traversal attacks
2026-02-11 20:39:33 +04:00
Zamil Majdy
e95fadbb86 Merge branch 'dev' into feat/copitlot-claude-code 2026-02-11 20:23:56 +04:00
Zamil Majdy
b14b3803ad feat(backend/chat): Add StreamStartStep/StreamFinishStep to SDK adapter
The non-SDK path emits step boundaries (StartStep/FinishStep) around
each LLM turn and tool cycle. The SDK adapter was missing these,
causing the frontend to lack visual step framing for tool calls.

Now the SDK adapter emits:
- StreamStartStep after init and before each new LLM turn
- StreamFinishStep after tool results and before final finish
2026-02-11 20:18:27 +04:00
Otto
36aeb0b2b3 docs(blocks): clarify HumanInTheLoop output descriptions for agent builder (#12069)
## Problem

The agent builder (LLM) misinterprets the HumanInTheLoop block outputs.
It thinks `approved_data` and `rejected_data` will yield status strings
like "APPROVED" or "REJECTED" instead of understanding that the actual
input data passes through.

This leads to unnecessary complexity - the agent builder adds comparison
blocks to check for status strings that don't exist.

## Solution

Enriched the block docstring and all input/output field descriptions to
make it explicit that:
1. The output is the actual data itself, not a status string
2. The routing is determined by which output pin fires
3. How to use the block correctly (connect downstream blocks to
appropriate output pins)

## Changes

- Updated block docstring with clear "How it works" and "Example usage"
sections
- Enhanced `data` input description to explain data flow
- Enhanced `name` input description for reviewer context
- Enhanced `approved_data` output to explicitly state it's NOT a status
string
- Enhanced `rejected_data` output to explicitly state it's NOT a status
string
- Enhanced `review_message` output for clarity

## Testing

Documentation-only change to schema descriptions. No functional changes.

Fixes SECRT-1930


<h2>Greptile Overview</h2>

<details><summary><h3>Greptile Summary</h3></summary>

Enhanced documentation for the `HumanInTheLoopBlock` to clarify how
output pins work. The key improvement explicitly states that output pins
(`approved_data` and `rejected_data`) yield the actual input data, not
status strings like "APPROVED" or "REJECTED". This prevents the agent
builder (LLM) from misinterpreting the block's behavior and adding
unnecessary comparison blocks.

**Key changes:**
- Added "How it works" and "Example usage" sections to the block
docstring
- Clarified that routing is determined by which output pin fires, not by
comparing output values
- Enhanced all input/output field descriptions with explicit data flow
explanations
- Emphasized that downstream blocks should be connected to the
appropriate output pin based on desired workflow path

This is a documentation-only change with no functional modifications to
the code logic.
</details>


<details><summary><h3>Confidence Score: 5/5</h3></summary>

- This PR is safe to merge with no risk
- Documentation-only change that accurately reflects the existing code
behavior. No functional changes, no runtime impact, and the enhanced
descriptions correctly explain how the block outputs work based on
verification of the implementation code.
- No files require special attention
</details>



Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
2026-02-11 15:43:58 +00:00
Ubbe
2a189c44c4 fix(frontend): API stream issues leaking into prompt (#12063)
## Changes 🏗️

<img width="800" height="621" alt="Screenshot 2026-02-11 at 19 32 39"
src="https://github.com/user-attachments/assets/e97be1a7-972e-4ae0-8dfa-6ade63cf287b"
/>

When the BE API has an error, prevent it from leaking into the stream
and instead handle it gracefully via toast.

## Checklist 📋

### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Run the app locally and trust the changes


<h2>Greptile Overview</h2>

<details><summary><h3>Greptile Summary</h3></summary>

This PR fixes an issue where backend API stream errors were leaking into
the chat prompt instead of being handled gracefully. The fix involves
both backend and frontend changes to ensure error events conform to the
AI SDK's strict schema.

**Key Changes:**
- **Backend (`response_model.py`)**: Added custom `to_sse()` method for
`StreamError` that only emits `type` and `errorText` fields, stripping
extra fields like `code` and `details` that cause AI SDK validation
failures
- **Backend (`prompt.py`)**: Added validation step after context
compression to remove orphaned tool responses without matching tool
calls, preventing "unexpected tool_use_id" API errors
- **Frontend (`route.ts`)**: Implemented SSE stream normalization with
`normalizeSSEStream()` and `normalizeSSEEvent()` functions to strip
non-conforming fields from error events before they reach the AI SDK
- **Frontend (`ChatMessagesContainer.tsx`)**: Added toast notifications
for errors and improved error display UI with deduplication logic

The changes ensure a clean separation between internal error metadata
(useful for logging/debugging) and the strict schema required by the AI
SDK on the frontend.
</details>


<details><summary><h3>Confidence Score: 4/5</h3></summary>

- This PR is safe to merge with low risk
- The changes are well-structured and address a specific bug with proper
error handling. The dual-layer approach (backend filtering in `to_sse()`
+ frontend normalization) provides defense-in-depth. However, the lack
of automated tests for the new error normalization logic and the
potential for edge cases in SSE parsing prevent a perfect score.
- Pay close attention to
`autogpt_platform/frontend/src/app/api/chat/sessions/[sessionId]/stream/route.ts`
- the SSE normalization logic should be tested with various error
scenarios
</details>


<details><summary><h3>Sequence Diagram</h3></summary>

```mermaid
sequenceDiagram
    participant User
    participant Frontend as ChatMessagesContainer
    participant Proxy as /api/chat/.../stream
    participant Backend as Backend API
    participant AISDK as AI SDK

    User->>Frontend: Send message
    Frontend->>Proxy: POST with message
    Proxy->>Backend: Forward request with auth
    Backend->>Backend: Process message
    
    alt Success Path
        Backend->>Proxy: SSE stream (text-delta, etc.)
        Proxy->>Proxy: normalizeSSEStream (pass through)
        Proxy->>AISDK: Forward SSE events
        AISDK->>Frontend: Update messages
        Frontend->>User: Display response
    else Error Path
        Backend->>Backend: StreamError.to_sse()
        Note over Backend: Only emit {type, errorText}
        Backend->>Proxy: SSE error event
        Proxy->>Proxy: normalizeSSEEvent()
        Note over Proxy: Strip extra fields (code, details)
        Proxy->>AISDK: {type: "error", errorText: "..."}
        AISDK->>Frontend: error state updated
        Frontend->>Frontend: Toast notification (deduplicated)
        Frontend->>User: Show error UI + toast
    end
```
</details>



---------

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Co-authored-by: Otto-AGPT <otto@agpt.co>
2026-02-11 22:46:37 +08:00
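A sketch of the backend half of the fix above: a to_sse() that emits only the fields the AI SDK's error schema accepts (class shape is illustrative):

```python
import json

class StreamError:
    def __init__(self, error_text: str, code: str | None = None,
                 details: dict | None = None):
        self.error_text = error_text
        self.code = code        # kept for server-side logging only
        self.details = details  # kept for server-side logging only

    def to_sse(self) -> str:
        # The AI SDK validates error events strictly: emit only
        # type and errorText, never code/details.
        payload = {"type": "error", "errorText": self.error_text}
        return f"data: {json.dumps(payload)}\n\n"
```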
Abhimanyu Yadav
508759610f fix(frontend): add min-width-0 to ContentCard to prevent overflow (#12060)
### Changes 🏗️

Added `min-w-0` class to the ContentCard component in the ToolAccordion
to prevent content overflow issues. This CSS fix ensures that the card
properly respects its container width constraints and allows text
truncation to work correctly when content is too wide.

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Verified that tool content displays correctly in the accordion
  - [x] Confirmed that long content properly truncates instead of overflowing
  - [x] Tested with various screen sizes to ensure responsive behavior

#### For configuration changes:

- [x] `.env.default` is updated or already compatible with my changes
- [x] `docker-compose.yml` is updated or already compatible with my
changes


<h2>Greptile Overview</h2>

<details><summary><h3>Greptile Summary</h3></summary>

Added `min-w-0` class to `ContentCard` component to fix text truncation
overflow in grid layouts. This is a standard CSS fix that allows grid
items to shrink below their content size, enabling `truncate` classes on
child elements (`ContentCardTitle`, `ContentCardSubtitle`) to work
correctly. The fix follows the same pattern already used in
`ContentCardHeader` (line 54) and `ToolAccordion` (line 54).
</details>


<details><summary><h3>Confidence Score: 5/5</h3></summary>

- Safe to merge with no risk
- Single-line CSS fix that addresses a well-known flexbox/grid layout
issue. The change follows existing patterns in the codebase and is
thoroughly tested. No logic changes, no breaking changes, no side
effects.
- No files require special attention
</details>


2026-02-11 21:09:21 +08:00
Otto
062fe1aa70 fix(security): enforce disabled flag on blocks in graph validation (#12059)
## Summary
Blocks marked `disabled=True` (like BlockInstallationBlock) were not
being checked during graph validation, allowing them to be used via
direct API calls despite being hidden from the UI.

This adds a security check in `_validate_graph_get_errors()` to reject
any graph containing disabled blocks.

## Security Advisory
GHSA-4crw-9p35-9x54

## Linear
SECRT-1927

## Changes
- Added `block.disabled` check in graph validation (6 lines)

## Testing
- Graphs with disabled blocks → rejected with clear error message
- Graphs with valid blocks → unchanged behavior


<h2>Greptile Overview</h2>

<details><summary><h3>Greptile Summary</h3></summary>

Adds critical security validation to prevent execution of disabled
blocks (like `BlockInstallationBlock`) via direct API calls. The fix
validates that `block.disabled` is `False` during graph validation in
`_validate_graph_get_errors()` on line 747-750, ensuring disabled blocks
are rejected before graph creation or execution. This closes a
vulnerability where blocks marked disabled in the UI could still be used
through API endpoints.
</details>


<details><summary><h3>Confidence Score: 5/5</h3></summary>

- This PR is safe to merge and addresses a critical security
vulnerability
- The fix is minimal (6 lines), correctly placed in the validation flow,
includes clear security context (GHSA reference), and follows existing
validation patterns. The check is positioned after block existence
validation and before input validation, ensuring disabled blocks are
caught early in both graph creation and execution paths.
- No files require special attention
</details>



---------

Co-authored-by: Nicholas Tindle <nicholas.tindle@agpt.co>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 03:28:19 +00:00
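The validation check, sketched (registry lookup shape and error wording are illustrative, not the exact 6 lines):

```python
def find_disabled_block_errors(graph) -> list[str]:
    # Reject any graph referencing a disabled block, so blocks hidden
    # from the UI can't be smuggled in via direct API calls.
    errors: list[str] = []
    for node in graph.nodes:
        block = get_blocks().get(node.block_id)  # hypothetical lookup shape
        if block is not None and block.disabled:
            errors.append(f"Block {block.name!r} is disabled and cannot be used")
    return errors
```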
Zamil Majdy
82c483d6c8 Merge branch 'dev' into feat/copitlot-claude-code 2026-02-11 07:17:38 +04:00
Zamil Majdy
7cffa1895f fix(backend/chat): Filter duplicate StreamStart from non-SDK path
Routes.py already publishes a StreamStart before calling the service.
The SDK path filters the duplicate internally, but the non-SDK path
did not, causing two StreamStart events to reach the frontend.
2026-02-11 06:52:47 +04:00
Zamil Majdy
9791bdd724 fix(backend/chat): Use normpath+startswith pattern for CodeQL path sanitization
CodeQL doesn't recognize re.sub as a path sanitizer. Switch to the
os.path.normpath + startswith prefix check pattern that CodeQL's
taint model explicitly recognizes as breaking the taint chain.
2026-02-11 06:45:12 +04:00
Zamil Majdy
750a674c78 fix lock 2026-02-11 06:39:03 +04:00
Zamil Majdy
960c7980a3 fix(backend/chat): Use named helper for session_id sanitization to satisfy CodeQL
Replace inline comprehension with _sanitize_session_id() using re.sub
so CodeQL recognizes the path-traversal sanitization barrier.
2026-02-11 06:32:16 +04:00
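The named sanitizer, sketched (exact pattern assumed):

```python
import re

def _sanitize_session_id(session_id: str) -> str:
    # Keep only a safe path-segment alphabet, so the value cannot carry
    # separators or ".." traversal sequences; a dedicated re.sub helper
    # is a barrier CodeQL's taint model recognizes.
    return re.sub(r"[^A-Za-z0-9_-]", "", session_id)
```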
Zamil Majdy
e85d437bb2 fix(backend/chat): Sanitize session_id in SDK cwd path to prevent path traversal 2026-02-11 06:26:48 +04:00
dependabot[bot]
2cd0d4fe0f chore(deps): bump actions/checkout from 4 to 6 (#12034)
Bumps [actions/checkout](https://github.com/actions/checkout) from 4 to
6.
<details>
<summary>Release notes</summary>
<p><em>Sourced from <a
href="https://github.com/actions/checkout/releases">actions/checkout's
releases</a>.</em></p>
<blockquote>
<h2>v6.0.0</h2>
<h2>What's Changed</h2>
<ul>
<li>Update README to include Node.js 24 support details and requirements
by <a href="https://github.com/salmanmkc"><code>@​salmanmkc</code></a>
in <a
href="https://redirect.github.com/actions/checkout/pull/2248">actions/checkout#2248</a></li>
<li>Persist creds to a separate file by <a
href="https://github.com/ericsciple"><code>@​ericsciple</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2286">actions/checkout#2286</a></li>
<li>v6-beta by <a
href="https://github.com/ericsciple"><code>@​ericsciple</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2298">actions/checkout#2298</a></li>
<li>update readme/changelog for v6 by <a
href="https://github.com/ericsciple"><code>@​ericsciple</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2311">actions/checkout#2311</a></li>
</ul>
<p><strong>Full Changelog</strong>: <a
href="https://github.com/actions/checkout/compare/v5.0.0...v6.0.0">https://github.com/actions/checkout/compare/v5.0.0...v6.0.0</a></p>
<h2>v6-beta</h2>
<h2>What's Changed</h2>
<p>Updated persist-credentials to store the credentials under
<code>$RUNNER_TEMP</code> instead of directly in the local git
config.</p>
<p>This requires a minimum Actions Runner version of <a
href="https://github.com/actions/runner/releases/tag/v2.329.0">v2.329.0</a>
to access the persisted credentials for <a
href="https://docs.github.com/en/actions/tutorials/use-containerized-services/create-a-docker-container-action">Docker
container action</a> scenarios.</p>
<h2>v5.0.1</h2>
<h2>What's Changed</h2>
<ul>
<li>Port v6 cleanup to v5 by <a
href="https://github.com/ericsciple"><code>@​ericsciple</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2301">actions/checkout#2301</a></li>
</ul>
<p><strong>Full Changelog</strong>: <a
href="https://github.com/actions/checkout/compare/v5...v5.0.1">https://github.com/actions/checkout/compare/v5...v5.0.1</a></p>
<h2>v5.0.0</h2>
<h2>What's Changed</h2>
<ul>
<li>Update actions checkout to use node 24 by <a
href="https://github.com/salmanmkc"><code>@​salmanmkc</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2226">actions/checkout#2226</a></li>
<li>Prepare v5.0.0 release by <a
href="https://github.com/salmanmkc"><code>@​salmanmkc</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2238">actions/checkout#2238</a></li>
</ul>
<h2>⚠️ Minimum Compatible Runner Version</h2>
<p><strong>v2.327.1</strong><br />
<a
href="https://github.com/actions/runner/releases/tag/v2.327.1">Release
Notes</a></p>
<p>Make sure your runner is updated to this version or newer to use this
release.</p>
<p><strong>Full Changelog</strong>: <a
href="https://github.com/actions/checkout/compare/v4...v5.0.0">https://github.com/actions/checkout/compare/v4...v5.0.0</a></p>
<h2>v4.3.1</h2>
<h2>What's Changed</h2>
<ul>
<li>Port v6 cleanup to v4 by <a
href="https://github.com/ericsciple"><code>@​ericsciple</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2305">actions/checkout#2305</a></li>
</ul>
<p><strong>Full Changelog</strong>: <a
href="https://github.com/actions/checkout/compare/v4...v4.3.1">https://github.com/actions/checkout/compare/v4...v4.3.1</a></p>
<h2>v4.3.0</h2>
<h2>What's Changed</h2>
<ul>
<li>docs: update README.md by <a
href="https://github.com/motss"><code>@​motss</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/1971">actions/checkout#1971</a></li>
<li>Add internal repos for checking out multiple repositories by <a
href="https://github.com/mouismail"><code>@​mouismail</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/1977">actions/checkout#1977</a></li>
<li>Documentation update - add recommended permissions to Readme by <a
href="https://github.com/benwells"><code>@​benwells</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2043">actions/checkout#2043</a></li>
</ul>
<!-- raw HTML omitted -->
</blockquote>
<p>... (truncated)</p>
</details>
<details>
<summary>Changelog</summary>
<p><em>Sourced from <a
href="https://github.com/actions/checkout/blob/main/CHANGELOG.md">actions/checkout's
changelog</a>.</em></p>
<blockquote>
<h1>Changelog</h1>
<h2>v6.0.2</h2>
<ul>
<li>Fix tag handling: preserve annotations and explicit fetch-tags by <a
href="https://github.com/ericsciple"><code>@​ericsciple</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2356">actions/checkout#2356</a></li>
</ul>
<h2>v6.0.1</h2>
<ul>
<li>Add worktree support for persist-credentials includeIf by <a
href="https://github.com/ericsciple"><code>@​ericsciple</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2327">actions/checkout#2327</a></li>
</ul>
<h2>v6.0.0</h2>
<ul>
<li>Persist creds to a separate file by <a
href="https://github.com/ericsciple"><code>@​ericsciple</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2286">actions/checkout#2286</a></li>
<li>Update README to include Node.js 24 support details and requirements
by <a href="https://github.com/salmanmkc"><code>@​salmanmkc</code></a>
in <a
href="https://redirect.github.com/actions/checkout/pull/2248">actions/checkout#2248</a></li>
</ul>
<h2>v5.0.1</h2>
<ul>
<li>Port v6 cleanup to v5 by <a
href="https://github.com/ericsciple"><code>@​ericsciple</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2301">actions/checkout#2301</a></li>
</ul>
<h2>v5.0.0</h2>
<ul>
<li>Update actions checkout to use node 24 by <a
href="https://github.com/salmanmkc"><code>@​salmanmkc</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2226">actions/checkout#2226</a></li>
</ul>
<h2>v4.3.1</h2>
<ul>
<li>Port v6 cleanup to v4 by <a
href="https://github.com/ericsciple"><code>@​ericsciple</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2305">actions/checkout#2305</a></li>
</ul>
<h2>v4.3.0</h2>
<ul>
<li>docs: update README.md by <a
href="https://github.com/motss"><code>@​motss</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/1971">actions/checkout#1971</a></li>
<li>Add internal repos for checking out multiple repositories by <a
href="https://github.com/mouismail"><code>@​mouismail</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/1977">actions/checkout#1977</a></li>
<li>Documentation update - add recommended permissions to Readme by <a
href="https://github.com/benwells"><code>@​benwells</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2043">actions/checkout#2043</a></li>
<li>Adjust positioning of user email note and permissions heading by <a
href="https://github.com/joshmgross"><code>@​joshmgross</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2044">actions/checkout#2044</a></li>
<li>Update README.md by <a
href="https://github.com/nebuk89"><code>@​nebuk89</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2194">actions/checkout#2194</a></li>
<li>Update CODEOWNERS for actions by <a
href="https://github.com/TingluoHuang"><code>@​TingluoHuang</code></a>
in <a
href="https://redirect.github.com/actions/checkout/pull/2224">actions/checkout#2224</a></li>
<li>Update package dependencies by <a
href="https://github.com/salmanmkc"><code>@​salmanmkc</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2236">actions/checkout#2236</a></li>
</ul>
<h2>v4.2.2</h2>
<ul>
<li><code>url-helper.ts</code> now leverages well-known environment
variables by <a href="https://github.com/jww3"><code>@​jww3</code></a>
in <a
href="https://redirect.github.com/actions/checkout/pull/1941">actions/checkout#1941</a></li>
<li>Expand unit test coverage for <code>isGhes</code> by <a
href="https://github.com/jww3"><code>@​jww3</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/1946">actions/checkout#1946</a></li>
</ul>
<h2>v4.2.1</h2>
<ul>
<li>Check out other refs/* by commit if provided, fall back to ref by <a
href="https://github.com/orhantoy"><code>@​orhantoy</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/1924">actions/checkout#1924</a></li>
</ul>
<h2>v4.2.0</h2>
<ul>
<li>Add Ref and Commit outputs by <a
href="https://github.com/lucacome"><code>@​lucacome</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/1180">actions/checkout#1180</a></li>
<li>Dependency updates by <a
href="https://github.com/dependabot"><code>@​dependabot</code></a>- <a
href="https://redirect.github.com/actions/checkout/pull/1777">actions/checkout#1777</a>,
<a
href="https://redirect.github.com/actions/checkout/pull/1872">actions/checkout#1872</a></li>
</ul>
<h2>v4.1.7</h2>
<ul>
<li>Bump the minor-npm-dependencies group across 1 directory with 4
updates by <a
href="https://github.com/dependabot"><code>@​dependabot</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/1739">actions/checkout#1739</a></li>
<li>Bump actions/checkout from 3 to 4 by <a
href="https://github.com/dependabot"><code>@​dependabot</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/1697">actions/checkout#1697</a></li>
<li>Check out other refs/* by commit by <a
href="https://github.com/orhantoy"><code>@​orhantoy</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/1774">actions/checkout#1774</a></li>
<li>Pin actions/checkout's own workflows to a known, good, stable
version. by <a href="https://github.com/jww3"><code>@​jww3</code></a> in
<a
href="https://redirect.github.com/actions/checkout/pull/1776">actions/checkout#1776</a></li>
</ul>
<h2>v4.1.6</h2>
<ul>
<li>Check platform to set archive extension appropriately by <a
href="https://github.com/cory-miller"><code>@​cory-miller</code></a> in
<a
href="https://redirect.github.com/actions/checkout/pull/1732">actions/checkout#1732</a></li>
</ul>
<!-- raw HTML omitted -->
</blockquote>
<p>... (truncated)</p>
</details>
<details>
<summary>Commits</summary>
<ul>
<li><a
href="de0fac2e45"><code>de0fac2</code></a>
Fix tag handling: preserve annotations and explicit fetch-tags (<a
href="https://redirect.github.com/actions/checkout/issues/2356">#2356</a>)</li>
<li><a
href="064fe7f331"><code>064fe7f</code></a>
Add orchestration_id to git user-agent when ACTIONS_ORCHESTRATION_ID is
set (...</li>
<li><a
href="8e8c483db8"><code>8e8c483</code></a>
Clarify v6 README (<a
href="https://redirect.github.com/actions/checkout/issues/2328">#2328</a>)</li>
<li><a
href="033fa0dc0b"><code>033fa0d</code></a>
Add worktree support for persist-credentials includeIf (<a
href="https://redirect.github.com/actions/checkout/issues/2327">#2327</a>)</li>
<li><a
href="c2d88d3ecc"><code>c2d88d3</code></a>
Update all references from v5 and v4 to v6 (<a
href="https://redirect.github.com/actions/checkout/issues/2314">#2314</a>)</li>
<li><a
href="1af3b93b68"><code>1af3b93</code></a>
update readme/changelog for v6 (<a
href="https://redirect.github.com/actions/checkout/issues/2311">#2311</a>)</li>
<li><a
href="71cf2267d8"><code>71cf226</code></a>
v6-beta (<a
href="https://redirect.github.com/actions/checkout/issues/2298">#2298</a>)</li>
<li><a
href="069c695914"><code>069c695</code></a>
Persist creds to a separate file (<a
href="https://redirect.github.com/actions/checkout/issues/2286">#2286</a>)</li>
<li><a
href="ff7abcd0c3"><code>ff7abcd</code></a>
Update README to include Node.js 24 support details and requirements (<a
href="https://redirect.github.com/actions/checkout/issues/2248">#2248</a>)</li>
<li><a
href="08c6903cd8"><code>08c6903</code></a>
Prepare v5.0.0 release (<a
href="https://redirect.github.com/actions/checkout/issues/2238">#2238</a>)</li>
<li>Additional commits viewable in <a
href="https://github.com/actions/checkout/compare/v4...v6">compare
view</a></li>
</ul>
</details>
<br />


[![Dependabot compatibility
score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=actions/checkout&package-manager=github_actions&previous-version=4&new-version=6)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)

Dependabot will resolve any conflicts with this PR as long as you don't
alter it yourself. You can also trigger a rebase manually by commenting
`@dependabot rebase`.

[//]: # (dependabot-automerge-start)
[//]: # (dependabot-automerge-end)

---

<details>
<summary>Dependabot commands and options</summary>
<br />

You can trigger Dependabot actions by commenting on this PR:
- `@dependabot rebase` will rebase this PR
- `@dependabot recreate` will recreate this PR, overwriting any edits
that have been made to it
- `@dependabot show <dependency name> ignore conditions` will show all
of the ignore conditions of the specified dependency
- `@dependabot ignore this major version` will close this PR and stop
Dependabot creating any more for this major version (unless you reopen
the PR or upgrade to it yourself)
- `@dependabot ignore this minor version` will close this PR and stop
Dependabot creating any more for this minor version (unless you reopen
the PR or upgrade to it yourself)
- `@dependabot ignore this dependency` will close this PR and stop
Dependabot creating any more for this dependency (unless you reopen the
PR or upgrade to it yourself)


</details>

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Otto <otto@agpt.co>
2026-02-11 02:25:51 +00:00
Zamil Majdy
44f9536bd6 fix lock 2026-02-11 06:24:41 +04:00
Zamil Majdy
1c1085a227 Merge remote-tracking branch 'origin/dev' into feat/copitlot-claude-code
# Conflicts:
#	autogpt_platform/backend/backend/api/features/chat/config.py
#	autogpt_platform/backend/poetry.lock
2026-02-11 05:30:46 +04:00
Zamil Majdy
d7ef70469e fix(backend/chat): Fix cleanup race condition and move to outer finally
- Use session-specific temp dir (/tmp/copilot-{session_id}) as SDK cwd
  to prevent concurrent sessions from deleting each other's tool-result
  files during cleanup
- Move _cleanup_sdk_tool_results() to outer finally block so it runs
  even when the outer except Exception fires
- Clean up the temp cwd directory after each session
- Remove unnecessary inner try/finally nesting
2026-02-11 05:13:02 +04:00
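The pattern this commit describes, as a minimal sketch (names and structure are illustrative, not the actual service code):

import os
import shutil


async def run_sdk_session(session_id: str) -> None:
    # Session-specific cwd: concurrent sessions can no longer delete each
    # other's tool-result files during cleanup.
    sdk_cwd = f"/tmp/copilot-{session_id}"
    try:
        os.makedirs(sdk_cwd, exist_ok=True)
        ...  # run the SDK client with cwd=sdk_cwd and stream the responses
    finally:
        # Outer finally: runs even when the error path above fires, so the
        # temp directory never leaks.
        shutil.rmtree(sdk_cwd, ignore_errors=True)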
Zamil Majdy
1926127ddd fix(backend/chat): Fix bugs and remove dead code in SDK integration
- Fix message accumulation bug: reset has_appended_assistant when
  creating new post-tool assistant message to prevent lost text deltas
- Fix hardcoded model in anthropic_fallback.py: use config.model instead
  of hardcoded "claude-sonnet-4-20250514"
- Fix _SDK_TOOL_RESULTS_DIR using hardcoded /root/ path: use expanduser
- Remove unused create_strict_security_hooks (~75 lines)
- Remove unused create_heartbeat/create_usage from response adapter
- Remove unused RAW_TOOL_NAMES from tool_adapter
- Extract _MAX_TOOL_ITERATIONS constant from magic number
2026-02-11 04:42:05 +04:00
Zamil Majdy
8b509e56de refactor(backend/chat): Replace --resume with conversation context, add compaction and dedup
- Remove broken --resume/session file approach (CLI v2.1.38 can't load
  >2 message session files) and delete session_file.py + tests
- Embed prior conversation turns as <conversation_history> context in
  the user message for multi-turn memory
- Add context compaction using shared compress_context() from prompt.py
  with LLM summarization + truncation fallback for long conversations
- Reuse _build_system_prompt and _generate_session_title from parent
  service.py instead of duplicating (gains Langfuse prompt support)
- Add has_conversation_history param to _build_system_prompt to avoid
  greeting on multi-turn conversations
- Fix _SDK_TOOL_RESULTS_GLOB from hardcoded /root/ to expanduser ~/
2026-02-11 04:22:11 +04:00
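A sketch of the conversation-context embedding described above (hypothetical helper, assuming history arrives as (role, text) pairs):

def build_prompt_with_history(history: list[tuple[str, str]], user_message: str) -> str:
    # Prior turns ride along inside the user message instead of relying on
    # the CLI's broken --resume session files.
    if not history:
        return user_message
    turns = "\n".join(f"{role}: {text}" for role, text in history)
    return f"<conversation_history>\n{turns}\n</conversation_history>\n\n{user_message}"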
Zamil Majdy
acb2d0bd1b fix(backend/chat): Resolve symlinks in session file path for --resume
The CLI resolves symlinks when computing its project directory (e.g.
/tmp -> /private/tmp on macOS), so our session file writes must use
the resolved path to match. Also adds cwd to ClaudeAgentOptions and
debug logging for SDK messages.
2026-02-10 20:11:16 +04:00
Zamil Majdy
51aa369c80 fix(backend): Restore PyYAML cp38 wheel entries in poetry.lock
Re-add Python 3.8 wheel entries for PyYAML that were dropped by
poetry lock resolution, keeping the lockfile consistent with dev.
2026-02-10 20:06:45 +04:00
Zamil Majdy
6403ffe353 fix(backend/chat): Use --resume with session files for multi-turn conversations
Replace broken AsyncIterable approach (CLI rejects assistant-type stdin
messages) with JSONL session files written to the CLI's storage directory.
This enables --resume to load full user+assistant context with turn-level
compaction support for long conversations.
2026-02-10 18:46:33 +04:00
Zamil Majdy
c40a98ba3c Merge branches 'feat/copitlot-claude-code' and 'dev' of github.com:Significant-Gravitas/AutoGPT into feat/copitlot-claude-code 2026-02-10 18:19:23 +04:00
Zamil Majdy
a31fc8b162 refactor(backend/chat): Use proper SDK types and in-memory conversation history
Replace duck typing (class name checks, getattr) with isinstance() using
SDK-exported dataclasses. Replace file-based --resume with AsyncIterable
message injection for conversation history, eliminating disk I/O. Add 15
unit tests for the response adapter.
2026-02-10 18:17:00 +04:00
Zamil Majdy
0f2d1a6553 Merge branch 'dev' into feat/copitlot-claude-code 2026-02-10 17:23:06 +04:00
Zamil Majdy
87d817b83b fix(backend/chat): Allow MCP-registered tools through security hook and fix title generation
- Skip BLOCKED_TOOLS check for tools with mcp__copilot__ prefix since they
  are already sandboxed by tool_adapter (fixes Read tool being blocked)
- Fall back to session.messages for title generation when message=None
2026-02-10 17:15:42 +04:00
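Roughly how such a prefix check works (illustrative names; the real hook validates more than this):

MCP_PREFIX = "mcp__copilot__"
BLOCKED_TOOLS = {"Bash"}  # hypothetical blocklist entry for illustration


def is_tool_allowed(tool_name: str) -> bool:
    # Tools registered through the copilot MCP server are already sandboxed
    # by tool_adapter, so the blocklist only applies to SDK built-ins.
    if tool_name.startswith(MCP_PREFIX):
        return True
    return tool_name not in BLOCKED_TOOLS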
Zamil Majdy
acf932bf4f refactor(backend/chat): Move glob/os imports to top-level in SDK service 2026-02-10 16:57:11 +04:00
Zamil Majdy
f562d9a277 fix(backend/chat): Add Read tool for SDK oversized tool results
The Claude Agent SDK saves tool results exceeding its token limit to
files and instructs the agent to read them back with a Read tool. Our
MCP server didn't have this tool, breaking the agent on large results
like run_block output (117K+ chars).

Changes:
- Add a Read tool to the MCP server (restricted to /root/.claude/)
- Register it in COPILOT_TOOL_NAMES so the SDK can use it
- Add safety-net truncation at 500K chars for extreme cases
- Clean up SDK tool-result files after each client session
2026-02-10 16:53:04 +04:00
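A minimal sketch of a path-restricted Read tool like the one described (with the expanduser fix from the later commits applied; names are illustrative):

import os

_ALLOWED_ROOT = os.path.expanduser("~/.claude/")
_MAX_CHARS = 500_000  # safety-net truncation for extreme cases


def read_tool_result(path: str) -> str:
    # Resolve symlinks first so "../" tricks cannot escape the allowed root.
    resolved = os.path.realpath(path)
    if not resolved.startswith(_ALLOWED_ROOT):
        raise PermissionError(f"Read is restricted to {_ALLOWED_ROOT}")
    with open(resolved, encoding="utf-8") as f:
        return f.read()[:_MAX_CHARS]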
Zamil Majdy
3c92a96504 fix(backend/chat): Publish StreamError before StreamFinish on error paths
When run_ai_generation() or event_generator() encounter errors, they
were only publishing StreamFinish without a preceding StreamError. The
frontend treats finish-without-error as normal completion, leaving the
user with an apparently stuck/empty response requiring a page refresh.
2026-02-10 15:49:23 +04:00
Zamil Majdy
8b8e1df739 fix(backend/chat): Auto-expire stale running tasks to unblock sessions
Tasks stuck in "running" status beyond stream_timeout (300s) are now
auto-marked as failed when looked up, preventing zombie tasks from
blocking the session indefinitely.
2026-02-10 15:35:43 +04:00
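The lookup-time expiry amounts to something like this (sketch; the field names are assumptions):

import time

STREAM_TIMEOUT = 300  # seconds, matches stream_timeout


def effective_status(task: dict) -> str:
    # A task stuck in "running" past the stream timeout is marked failed
    # when looked up, so zombie tasks cannot block the session forever.
    if task["status"] == "running" and time.time() - task["started_at"] > STREAM_TIMEOUT:
        task["status"] = "failed"
    return task["status"]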
Zamil Majdy
602a0a4fb1 fix(backend/chat): Strip tool call noise from conversation history context 2026-02-10 14:11:27 +04:00
Zamil Majdy
8d7d531ae0 refactor(backend/chat): Remove unused max_context_messages config 2026-02-10 13:57:33 +04:00
Zamil Majdy
43153a12e0 fix(backend/chat): Remove manual context truncation from SDK path, let SDK handle compaction 2026-02-10 13:52:49 +04:00
Zamil Majdy
587e11c60a refactor(backend/chat): Extract MCP server name constants to avoid hardcoded strings 2026-02-10 12:12:08 +04:00
Zamil Majdy
57da545e02 Merge branch 'dev' into feat/copitlot-claude-code 2026-02-10 12:10:35 +04:00
Zamil Majdy
626980bf27 Merge branch 'dev' into feat/copitlot-claude-code 2026-02-09 19:26:52 +04:00
Swifty
e42b27af3c Merge branch 'dev' into feat/copitlot-claude-code 2026-02-09 09:12:23 +01:00
Zamil Majdy
34face15d2 fix lock 2026-02-09 11:45:59 +04:00
Zamil Majdy
7d32c83f95 fix(backend/chat): Handle non-serializable SDK objects in tool result output 2026-02-09 10:59:50 +04:00
Zamil Majdy
6e2a45b84e style(backend): Remove unused pytest import in execution_queue_test 2026-02-09 10:14:20 +04:00
Zamil Majdy
32f6532e9c Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into feat/copitlot-claude-code 2026-02-09 10:10:32 +04:00
Zamil Majdy
0bbe8a184d Merge dev and resolve poetry.lock conflict 2026-02-08 19:40:17 +04:00
Zamil Majdy
7592deed63 fix(backend/chat): Address remaining PR review comments
- Fix tool_call_id always being "sdk-call" by generating unique IDs per invocation
- Fix validation using original tool_name instead of clean_name in security hooks
- Fix duplicate StreamFinish in Anthropic fallback path
- Fix ImportError fallback returning plain dict instead of re-raising
- Extract _build_input_schema helper to deduplicate schema construction
- Add else branch for unhandled SDK message types for observability
- Truncate large tool results in conversation history to prevent context overflow
2026-02-08 19:39:10 +04:00
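For the tool_call_id fix, the idea is one fresh ID per invocation (illustrative sketch):

import uuid


def make_tool_call(name: str, arguments: str) -> dict:
    # A unique ID per invocation, rather than a shared "sdk-call" constant,
    # lets each tool result be matched back to the call that produced it.
    return {
        "id": f"sdk-call-{uuid.uuid4().hex[:8]}",
        "type": "function",
        "function": {"name": name, "arguments": arguments},
    }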
Zamil Majdy
b9c759ce4f fix(backend/chat): Address additional PR review comments
- Add terminal StreamFinish in adapt_sdk_stream if SDK ends without one
- Sanitize error message in adapt_sdk_stream exception handler
- Pass full JSON schema (type, properties, required) to tool decorator
2026-02-08 07:14:45 +04:00
Zamil Majdy
5efb80d47b fix(backend/chat): Address PR review comments for Claude SDK integration
- Add StreamFinish after ErrorMessage in response adapter
- Fix str.replace to removeprefix in security hooks
- Apply max_context_messages limit as safety guard in history formatting
- Add empty prompt guard before sending to SDK
- Sanitize error messages to avoid exposing internal details
- Fix fire-and-forget asyncio.create_task by storing task reference
- Fix tool_calls population on assistant messages
- Rewrite Anthropic fallback to persist messages and merge consecutive roles
- Only use ANTHROPIC_API_KEY for fallback (not OpenRouter keys)
- Fix IndexError when tool result content list is empty
2026-02-06 13:25:10 +04:00
Zamil Majdy
b49d8e2cba fix lock 2026-02-06 13:19:53 +04:00
Zamil Majdy
452544530d feat(chat/sdk): Enable native SDK context compaction
- Remove manual truncation in conversation history formatting
- SDK's automatic compaction handles context limits intelligently
- Add observability hooks:
  - PreCompact: Log when SDK triggers context compaction
  - PostToolUse: Log successful tool executions
  - PostToolUseFailure: Log and debug failed tool executions
- Update config: increase max_context_messages (SDK handles compaction)
2026-02-06 12:44:48 +04:00
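The observability hooks reduce to logging callbacks of roughly this shape (hypothetical signatures; the actual SDK hook API may differ):

import logging

logger = logging.getLogger(__name__)


async def on_pre_compact(event: dict) -> None:
    # Fired when the SDK decides the context is too large and compacts it.
    logger.info("SDK triggered context compaction")


async def on_post_tool_use(event: dict) -> None:
    logger.info(f"Tool {event.get('tool_name')} completed")


async def on_post_tool_use_failure(event: dict) -> None:
    logger.error(f"Tool {event.get('tool_name')} failed: {event.get('error')}")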
Zamil Majdy
32ee7e6cf8 fix(chat): Remove aggressive stale task detection
The 60-second timeout was too aggressive and could incorrectly mark
legitimate long-running tool calls as stale. Relying on Redis TTL
(1 hour) for cleanup is sufficient and more reliable.
2026-02-06 11:45:54 +04:00
Zamil Majdy
670663c406 Merge dev and resolve poetry.lock conflict 2026-02-06 11:40:41 +04:00
Zamil Majdy
0dbe4cf51e feat(backend/chat): Add Claude Agent SDK integration for CoPilot
This PR adds Claude Agent SDK as the default backend for CoPilot chat completions,
replacing the direct OpenAI API integration.

Key changes:
- Add Claude Agent SDK service layer with MCP tool adapter
- Fix message persistence after tool calls (messages no longer disappear on refresh)
- Add OpenRouter tracing for session title generation
- Add security hooks for user context validation
- Add Anthropic fallback when SDK is not available
- Clean up excessive debug logging
2026-02-06 11:38:17 +04:00
246 changed files with 6333 additions and 2370 deletions

View File

@@ -22,7 +22,7 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - name: Checkout code
-        uses: actions/checkout@v4
+        uses: actions/checkout@v6
         with:
           ref: ${{ github.event.workflow_run.head_branch }}
           fetch-depth: 0

View File

@@ -30,7 +30,7 @@ jobs:
       actions: read # Required for CI access
     steps:
       - name: Checkout code
-        uses: actions/checkout@v4
+        uses: actions/checkout@v6
         with:
           fetch-depth: 1

View File

@@ -40,7 +40,7 @@ jobs:
       actions: read # Required for CI access
     steps:
       - name: Checkout code
-        uses: actions/checkout@v4
+        uses: actions/checkout@v6
         with:
           fetch-depth: 1

View File

@@ -58,7 +58,7 @@ jobs:
       # your codebase is analyzed, see https://docs.github.com/en/code-security/code-scanning/creating-an-advanced-setup-for-code-scanning/codeql-code-scanning-for-compiled-languages
     steps:
       - name: Checkout repository
-        uses: actions/checkout@v4
+        uses: actions/checkout@v6
       # Initializes the CodeQL tools for scanning.
       - name: Initialize CodeQL

View File

@@ -27,7 +27,7 @@ jobs:
       # If you do not check out your code, Copilot will do this for you.
     steps:
       - name: Checkout code
-        uses: actions/checkout@v4
+        uses: actions/checkout@v6
         with:
           fetch-depth: 0
           submodules: true

View File

@@ -23,7 +23,7 @@ jobs:
     steps:
       - name: Checkout code
-        uses: actions/checkout@v4
+        uses: actions/checkout@v6
         with:
           fetch-depth: 1

View File

@@ -23,7 +23,7 @@ jobs:
     steps:
       - name: Checkout code
-        uses: actions/checkout@v4
+        uses: actions/checkout@v6
         with:
           fetch-depth: 0

View File

@@ -28,7 +28,7 @@ jobs:
     steps:
       - name: Checkout code
-        uses: actions/checkout@v4
+        uses: actions/checkout@v6
         with:
           fetch-depth: 1

View File

@@ -25,7 +25,7 @@ jobs:
     steps:
       - name: Checkout code
-        uses: actions/checkout@v4
+        uses: actions/checkout@v6
         with:
           ref: ${{ github.event.inputs.git_ref || github.ref_name }}

View File

@@ -17,7 +17,7 @@ jobs:
     steps:
       - name: Checkout code
-        uses: actions/checkout@v4
+        uses: actions/checkout@v6
         with:
           ref: ${{ github.ref_name || 'master' }}

View File

@@ -68,7 +68,7 @@ jobs:
     steps:
       - name: Checkout repository
-        uses: actions/checkout@v4
+        uses: actions/checkout@v6
         with:
           fetch-depth: 0
           submodules: true

View File

@@ -31,7 +31,7 @@ jobs:
     steps:
       - name: Checkout repository
-        uses: actions/checkout@v4
+        uses: actions/checkout@v6
       - name: Check for component changes
         uses: dorny/paths-filter@v3
@@ -71,7 +71,7 @@ jobs:
     steps:
       - name: Checkout repository
-        uses: actions/checkout@v4
+        uses: actions/checkout@v6
       - name: Set up Node.js
         uses: actions/setup-node@v6
@@ -107,7 +107,7 @@ jobs:
     steps:
       - name: Checkout repository
-        uses: actions/checkout@v4
+        uses: actions/checkout@v6
         with:
           fetch-depth: 0
@@ -148,7 +148,7 @@ jobs:
     steps:
       - name: Checkout repository
-        uses: actions/checkout@v4
+        uses: actions/checkout@v6
         with:
           submodules: recursive
@@ -277,7 +277,7 @@ jobs:
     steps:
       - name: Checkout repository
-        uses: actions/checkout@v4
+        uses: actions/checkout@v6
         with:
           submodules: recursive

View File

@@ -29,7 +29,7 @@ jobs:
     steps:
       - name: Checkout repository
-        uses: actions/checkout@v4
+        uses: actions/checkout@v6
       - name: Set up Node.js
         uses: actions/setup-node@v6
@@ -63,7 +63,7 @@ jobs:
     steps:
       - name: Checkout repository
-        uses: actions/checkout@v4
+        uses: actions/checkout@v6
         with:
           submodules: recursive

View File

@@ -11,7 +11,7 @@ jobs:
     steps:
       # - name: Wait some time for all actions to start
       #   run: sleep 30
-      - uses: actions/checkout@v4
+      - uses: actions/checkout@v6
       # with:
       #   fetch-depth: 0
       - name: Set up Python

View File

@@ -1062,14 +1062,14 @@ urllib3 = ">=1.26.0,<3"
 [[package]]
 name = "launchdarkly-server-sdk"
-version = "9.15.0"
+version = "9.14.1"
 description = "LaunchDarkly SDK for Python"
 optional = false
-python-versions = ">=3.10"
+python-versions = ">=3.9"
 groups = ["main"]
 files = [
-    {file = "launchdarkly_server_sdk-9.15.0-py3-none-any.whl", hash = "sha256:c267e29bfa3fb5e2a06a208448ada6ed5557a2924979b8d79c970b45d227c668"},
-    {file = "launchdarkly_server_sdk-9.15.0.tar.gz", hash = "sha256:f31441b74bc1a69c381db57c33116509e407a2612628ad6dff0a7dbb39d5020b"},
+    {file = "launchdarkly_server_sdk-9.14.1-py3-none-any.whl", hash = "sha256:a9e2bd9ecdef845cd631ae0d4334a1115e5b44257c42eb2349492be4bac7815c"},
+    {file = "launchdarkly_server_sdk-9.14.1.tar.gz", hash = "sha256:1df44baf0a0efa74d8c1dad7a00592b98bce7d19edded7f770da8dbc49922213"},
 ]
 [package.dependencies]
@@ -1478,14 +1478,14 @@ testing = ["coverage", "pytest", "pytest-benchmark"]
 [[package]]
 name = "postgrest"
-version = "2.28.0"
+version = "2.27.2"
 description = "PostgREST client for Python. This library provides an ORM interface to PostgREST."
 optional = false
 python-versions = ">=3.9"
 groups = ["main"]
 files = [
-    {file = "postgrest-2.28.0-py3-none-any.whl", hash = "sha256:7bca2f24dd1a1bf8a3d586c7482aba6cd41662da6733045fad585b63b7f7df75"},
-    {file = "postgrest-2.28.0.tar.gz", hash = "sha256:c36b38646d25ea4255321d3d924ce70f8d20ec7799cb42c1221d6a818d4f6515"},
+    {file = "postgrest-2.27.2-py3-none-any.whl", hash = "sha256:1666fef3de05ca097a314433dd5ae2f2d71c613cb7b233d0f468c4ffe37277da"},
+    {file = "postgrest-2.27.2.tar.gz", hash = "sha256:55407d530b5af3d64e883a71fec1f345d369958f723ce4a8ab0b7d169e313242"},
 ]
 [package.dependencies]
@@ -2135,21 +2135,21 @@ files = [
 [[package]]
 name = "pytest"
-version = "9.0.2"
+version = "8.4.1"
 description = "pytest: simple powerful testing with Python"
 optional = false
-python-versions = ">=3.10"
+python-versions = ">=3.9"
 groups = ["dev"]
 files = [
-    {file = "pytest-9.0.2-py3-none-any.whl", hash = "sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b"},
-    {file = "pytest-9.0.2.tar.gz", hash = "sha256:75186651a92bd89611d1d9fc20f0b4345fd827c41ccd5c299a868a05d70edf11"},
+    {file = "pytest-8.4.1-py3-none-any.whl", hash = "sha256:539c70ba6fcead8e78eebbf1115e8b589e7565830d7d006a8723f19ac8a0afb7"},
+    {file = "pytest-8.4.1.tar.gz", hash = "sha256:7c67fd69174877359ed9371ec3af8a3d2b04741818c51e5e99cc1742251fa93c"},
 ]
 [package.dependencies]
 colorama = {version = ">=0.4", markers = "sys_platform == \"win32\""}
 exceptiongroup = {version = ">=1", markers = "python_version < \"3.11\""}
-iniconfig = ">=1.0.1"
-packaging = ">=22"
+iniconfig = ">=1"
+packaging = ">=20"
 pluggy = ">=1.5,<2"
 pygments = ">=2.7.2"
 tomli = {version = ">=1", markers = "python_version < \"3.11\""}
@@ -2248,14 +2248,14 @@ cli = ["click (>=5.0)"]
 [[package]]
 name = "realtime"
-version = "2.28.0"
+version = "2.27.2"
 description = ""
 optional = false
 python-versions = ">=3.9"
 groups = ["main"]
 files = [
-    {file = "realtime-2.28.0-py3-none-any.whl", hash = "sha256:db1bd59bab9b1fcc9f9d3b1a073bed35bf4994d720e6751f10031a58d57a3836"},
-    {file = "realtime-2.28.0.tar.gz", hash = "sha256:d18cedcebd6a8f22fcd509bc767f639761eb218b7b2b6f14fc4205b6259b50fc"},
+    {file = "realtime-2.27.2-py3-none-any.whl", hash = "sha256:34a9cbb26a274e707e8fc9e3ee0a66de944beac0fe604dc336d1e985db2c830f"},
+    {file = "realtime-2.27.2.tar.gz", hash = "sha256:b960a90294d2cea1b3f1275ecb89204304728e08fff1c393cc1b3150739556b3"},
 ]
 [package.dependencies]
@@ -2265,21 +2265,20 @@ websockets = ">=11,<16"
 [[package]]
 name = "redis"
-version = "7.1.1"
+version = "6.2.0"
 description = "Python client for Redis database and key-value store"
 optional = false
-python-versions = ">=3.10"
+python-versions = ">=3.9"
 groups = ["main"]
 files = [
-    {file = "redis-7.1.1-py3-none-any.whl", hash = "sha256:f77817f16071c2950492c67d40b771fa493eb3fccc630a424a10976dbb794b7a"},
-    {file = "redis-7.1.1.tar.gz", hash = "sha256:a2814b2bda15b39dad11391cc48edac4697214a8a5a4bd10abe936ab4892eb43"},
+    {file = "redis-6.2.0-py3-none-any.whl", hash = "sha256:c8ddf316ee0aab65f04a11229e94a64b2618451dab7a67cb2f77eb799d872d5e"},
+    {file = "redis-6.2.0.tar.gz", hash = "sha256:e821f129b75dde6cb99dd35e5c76e8c49512a5a0d8dfdc560b2fbd44b85ca977"},
 ]
 [package.dependencies]
 async-timeout = {version = ">=4.0.3", markers = "python_full_version < \"3.11.3\""}
 [package.extras]
-circuit-breaker = ["pybreaker (>=1.4.0)"]
 hiredis = ["hiredis (>=3.2.0)"]
 jwt = ["pyjwt (>=2.9.0)"]
 ocsp = ["cryptography (>=36.0.1)", "pyopenssl (>=20.0.1)", "requests (>=2.31.0)"]
@@ -2437,14 +2436,14 @@ full = ["httpx (>=0.27.0,<0.29.0)", "itsdangerous", "jinja2", "python-multipart
 [[package]]
 name = "storage3"
-version = "2.28.0"
+version = "2.27.2"
 description = "Supabase Storage client for Python."
 optional = false
 python-versions = ">=3.9"
 groups = ["main"]
 files = [
-    {file = "storage3-2.28.0-py3-none-any.whl", hash = "sha256:ecb50efd2ac71dabbdf97e99ad346eafa630c4c627a8e5a138ceb5fbbadae716"},
-    {file = "storage3-2.28.0.tar.gz", hash = "sha256:bc1d008aff67de7a0f2bd867baee7aadbcdb6f78f5a310b4f7a38e8c13c19865"},
+    {file = "storage3-2.27.2-py3-none-any.whl", hash = "sha256:e6f16e7a260729e7b1f46e9bf61746805a02e30f5e419ee1291007c432e3ec63"},
+    {file = "storage3-2.27.2.tar.gz", hash = "sha256:cb4807b7f86b4bb1272ac6fdd2f3cfd8ba577297046fa5f88557425200275af5"},
 ]
 [package.dependencies]
@@ -2488,35 +2487,35 @@ python-dateutil = ">=2.6.0"
 [[package]]
 name = "supabase"
-version = "2.28.0"
+version = "2.27.2"
 description = "Supabase client for Python."
 optional = false
 python-versions = ">=3.9"
 groups = ["main"]
 files = [
-    {file = "supabase-2.28.0-py3-none-any.whl", hash = "sha256:42776971c7d0ccca16034df1ab96a31c50228eb1eb19da4249ad2f756fc20272"},
-    {file = "supabase-2.28.0.tar.gz", hash = "sha256:aea299aaab2a2eed3c57e0be7fc035c6807214194cce795a3575add20268ece1"},
+    {file = "supabase-2.27.2-py3-none-any.whl", hash = "sha256:d4dce00b3a418ee578017ec577c0e5be47a9a636355009c76f20ed2faa15bc54"},
+    {file = "supabase-2.27.2.tar.gz", hash = "sha256:2aed40e4f3454438822442a1e94a47be6694c2c70392e7ae99b51a226d4293f7"},
 ]
 [package.dependencies]
 httpx = ">=0.26,<0.29"
-postgrest = "2.28.0"
-realtime = "2.28.0"
-storage3 = "2.28.0"
-supabase-auth = "2.28.0"
-supabase-functions = "2.28.0"
+postgrest = "2.27.2"
+realtime = "2.27.2"
+storage3 = "2.27.2"
+supabase-auth = "2.27.2"
+supabase-functions = "2.27.2"
 yarl = ">=1.22.0"
 [[package]]
 name = "supabase-auth"
-version = "2.28.0"
+version = "2.27.2"
 description = "Python Client Library for Supabase Auth"
 optional = false
 python-versions = ">=3.9"
 groups = ["main"]
 files = [
-    {file = "supabase_auth-2.28.0-py3-none-any.whl", hash = "sha256:2ac85026cc285054c7fa6d41924f3a333e9ec298c013e5b5e1754039ba7caec9"},
-    {file = "supabase_auth-2.28.0.tar.gz", hash = "sha256:2bb8f18ff39934e44b28f10918db965659f3735cd6fbfcc022fe0b82dbf8233e"},
+    {file = "supabase_auth-2.27.2-py3-none-any.whl", hash = "sha256:78ec25b11314d0a9527a7205f3b1c72560dccdc11b38392f80297ef98664ee91"},
+    {file = "supabase_auth-2.27.2.tar.gz", hash = "sha256:0f5bcc79b3677cb42e9d321f3c559070cfa40d6a29a67672cc8382fb7dc2fe97"},
 ]
 [package.dependencies]
@@ -2526,14 +2525,14 @@ pyjwt = {version = ">=2.10.1", extras = ["crypto"]}
 [[package]]
 name = "supabase-functions"
-version = "2.28.0"
+version = "2.27.2"
 description = "Library for Supabase Functions"
 optional = false
 python-versions = ">=3.9"
 groups = ["main"]
 files = [
-    {file = "supabase_functions-2.28.0-py3-none-any.whl", hash = "sha256:30bf2d586f8df285faf0621bb5d5bb3ec3157234fc820553ca156f009475e4ae"},
-    {file = "supabase_functions-2.28.0.tar.gz", hash = "sha256:db3dddfc37aca5858819eb461130968473bd8c75bd284581013958526dac718b"},
+    {file = "supabase_functions-2.27.2-py3-none-any.whl", hash = "sha256:db480efc669d0bca07605b9b6f167312af43121adcc842a111f79bea416ef754"},
+    {file = "supabase_functions-2.27.2.tar.gz", hash = "sha256:d0c8266207a94371cb3fd35ad3c7f025b78a97cf026861e04ccd35ac1775f80b"},
 ]
 [package.dependencies]
@@ -2912,4 +2911,4 @@ type = ["pytest-mypy"]
 [metadata]
 lock-version = "2.1"
 python-versions = ">=3.10,<4.0"
-content-hash = "3f738dbf158a0b9319387d7251cd557e8e143d4dec809c5ab720321d2b53e368"
+content-hash = "40eae94995dc0a388fa832ed4af9b6137f28d5b5ced3aaea70d5f91d4d9a179d"

View File

@@ -13,17 +13,17 @@ cryptography = "^46.0"
expiringdict = "^1.2.2" expiringdict = "^1.2.2"
fastapi = "^0.128.0" fastapi = "^0.128.0"
google-cloud-logging = "^3.13.0" google-cloud-logging = "^3.13.0"
launchdarkly-server-sdk = "^9.15.0" launchdarkly-server-sdk = "^9.14.1"
pydantic = "^2.12.5" pydantic = "^2.12.5"
pydantic-settings = "^2.12.0" pydantic-settings = "^2.12.0"
pyjwt = { version = "^2.11.0", extras = ["crypto"] } pyjwt = { version = "^2.11.0", extras = ["crypto"] }
redis = "^7.1.1" redis = "^6.2.0"
supabase = "^2.28.0" supabase = "^2.27.2"
uvicorn = "^0.40.0" uvicorn = "^0.40.0"
[tool.poetry.group.dev.dependencies] [tool.poetry.group.dev.dependencies]
pyright = "^1.1.408" pyright = "^1.1.408"
pytest = "^9.0.2" pytest = "^8.4.1"
pytest-asyncio = "^1.3.0" pytest-asyncio = "^1.3.0"
pytest-mock = "^3.15.1" pytest-mock = "^3.15.1"
pytest-cov = "^7.0.0" pytest-cov = "^7.0.0"

View File

@@ -62,12 +62,18 @@ ENV POETRY_HOME=/opt/poetry \
     DEBIAN_FRONTEND=noninteractive
 ENV PATH=/opt/poetry/bin:$PATH
-# Install Python, FFmpeg, and ImageMagick (required for video processing blocks)
+# Install Python, FFmpeg, ImageMagick, and CLI tools for agent use.
+# bubblewrap provides OS-level sandbox (whitelist-only FS + no network)
+# for the bash_exec MCP tool.
 RUN apt-get update && apt-get install -y \
     python3.13 \
     python3-pip \
     ffmpeg \
     imagemagick \
+    jq \
+    ripgrep \
+    tree \
+    bubblewrap \
     && rm -rf /var/lib/apt/lists/*
 # Copy only necessary files from builder
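
For orientation, a whitelist-only bwrap invocation of the general shape used for bash_exec could be assembled like this (illustrative sketch; the real argument list lives in the sandbox module and varies by distro):

def bwrap_args(workspace: str, command: str) -> list[str]:
    return [
        "bwrap",
        "--ro-bind", "/usr", "/usr",   # read-only system files only
        "--tmpfs", "/tmp",             # fresh, empty /tmp
        "--bind", workspace, "/tmp/workspace",
        "--unshare-all",               # no network, fresh namespaces
        "--die-with-parent",
        "/bin/sh", "-c", command,
    ]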

View File

@@ -10,7 +10,7 @@ from typing_extensions import TypedDict
 import backend.api.features.store.cache as store_cache
 import backend.api.features.store.model as store_model
-import backend.data.block
+import backend.blocks
 from backend.api.external.middleware import require_permission
 from backend.data import execution as execution_db
 from backend.data import graph as graph_db
@@ -67,7 +67,7 @@ async def get_user_info(
     dependencies=[Security(require_permission(APIKeyPermission.READ_BLOCK))],
 )
 async def get_graph_blocks() -> Sequence[dict[Any, Any]]:
-    blocks = [block() for block in backend.data.block.get_blocks().values()]
+    blocks = [block() for block in backend.blocks.get_blocks().values()]
     return [b.to_dict() for b in blocks if not b.disabled]
@@ -83,7 +83,7 @@ async def execute_graph_block(
         require_permission(APIKeyPermission.EXECUTE_BLOCK)
     ),
 ) -> CompletedBlockOutput:
-    obj = backend.data.block.get_block(block_id)
+    obj = backend.blocks.get_block(block_id)
     if not obj:
         raise HTTPException(status_code=404, detail=f"Block #{block_id} not found.")
     if obj.disabled:

View File

@@ -10,10 +10,15 @@ import backend.api.features.library.db as library_db
 import backend.api.features.library.model as library_model
 import backend.api.features.store.db as store_db
 import backend.api.features.store.model as store_model
-import backend.data.block
 from backend.blocks import load_all_blocks
+from backend.blocks._base import (
+    AnyBlockSchema,
+    BlockCategory,
+    BlockInfo,
+    BlockSchema,
+    BlockType,
+)
 from backend.blocks.llm import LlmModel
-from backend.data.block import AnyBlockSchema, BlockCategory, BlockInfo, BlockSchema
 from backend.data.db import query_raw_with_schema
 from backend.integrations.providers import ProviderName
 from backend.util.cache import cached
@@ -22,7 +27,7 @@ from backend.util.models import Pagination
 from .model import (
     BlockCategoryResponse,
     BlockResponse,
-    BlockType,
+    BlockTypeFilter,
     CountResponse,
     FilterType,
     Provider,
@@ -88,7 +93,7 @@ def get_block_categories(category_blocks: int = 3) -> list[BlockCategoryResponse
 def get_blocks(
     *,
     category: str | None = None,
-    type: BlockType | None = None,
+    type: BlockTypeFilter | None = None,
     provider: ProviderName | None = None,
     page: int = 1,
     page_size: int = 50,
@@ -669,9 +674,9 @@ async def get_suggested_blocks(count: int = 5) -> list[BlockInfo]:
     for block_type in load_all_blocks().values():
         block: AnyBlockSchema = block_type()
         if block.disabled or block.block_type in (
-            backend.data.block.BlockType.INPUT,
-            backend.data.block.BlockType.OUTPUT,
-            backend.data.block.BlockType.AGENT,
+            BlockType.INPUT,
+            BlockType.OUTPUT,
+            BlockType.AGENT,
         ):
             continue
         # Find the execution count for this block

View File

@@ -4,7 +4,7 @@ from pydantic import BaseModel
 import backend.api.features.library.model as library_model
 import backend.api.features.store.model as store_model
-from backend.data.block import BlockInfo
+from backend.blocks._base import BlockInfo
 from backend.integrations.providers import ProviderName
 from backend.util.models import Pagination
@@ -15,7 +15,7 @@ FilterType = Literal[
     "my_agents",
 ]
-BlockType = Literal["all", "input", "action", "output"]
+BlockTypeFilter = Literal["all", "input", "action", "output"]
 class SearchEntry(BaseModel):

View File

@@ -88,7 +88,7 @@ async def get_block_categories(
 )
 async def get_blocks(
     category: Annotated[str | None, fastapi.Query()] = None,
-    type: Annotated[builder_model.BlockType | None, fastapi.Query()] = None,
+    type: Annotated[builder_model.BlockTypeFilter | None, fastapi.Query()] = None,
     provider: Annotated[ProviderName | None, fastapi.Query()] = None,
     page: Annotated[int, fastapi.Query()] = 1,
     page_size: Annotated[int, fastapi.Query()] = 50,

View File

@@ -27,12 +27,11 @@ class ChatConfig(BaseSettings):
     session_ttl: int = Field(default=43200, description="Session TTL in seconds")
     # Streaming Configuration
-    max_context_messages: int = Field(
-        default=50, ge=1, le=200, description="Maximum context messages"
-    )
     stream_timeout: int = Field(default=300, description="Stream timeout in seconds")
-    max_retries: int = Field(default=3, description="Maximum number of retries")
+    max_retries: int = Field(
+        default=3,
+        description="Max retries for fallback path (SDK handles retries internally)",
+    )
     max_agent_runs: int = Field(default=30, description="Maximum number of agent runs")
     max_agent_schedules: int = Field(
         default=30, description="Maximum number of agent schedules"
@@ -93,6 +92,26 @@ class ChatConfig(BaseSettings):
         description="Name of the prompt in Langfuse to fetch",
     )
+    # Claude Agent SDK Configuration
+    use_claude_agent_sdk: bool = Field(
+        default=True,
+        description="Use Claude Agent SDK for chat completions",
+    )
+    claude_agent_model: str | None = Field(
+        default=None,
+        description="Model for the Claude Agent SDK path. If None, derives from "
+        "the `model` field by stripping the OpenRouter provider prefix.",
+    )
+    claude_agent_max_buffer_size: int = Field(
+        default=10 * 1024 * 1024,  # 10MB (default SDK is 1MB)
+        description="Max buffer size in bytes for Claude Agent SDK JSON message parsing. "
+        "Increase if tool outputs exceed the limit.",
+    )
+    claude_agent_max_subtasks: int = Field(
+        default=10,
+        description="Max number of sub-agent Tasks the SDK can spawn per session.",
+    )
     # Extended thinking configuration for Claude models
     thinking_enabled: bool = Field(
         default=True,
@@ -138,6 +157,17 @@ class ChatConfig(BaseSettings):
         v = os.getenv("CHAT_INTERNAL_API_KEY")
         return v
+    @field_validator("use_claude_agent_sdk", mode="before")
+    @classmethod
+    def get_use_claude_agent_sdk(cls, v):
+        """Get use_claude_agent_sdk from environment if not provided."""
+        # Check environment variable - default to True if not set
+        env_val = os.getenv("CHAT_USE_CLAUDE_AGENT_SDK", "").lower()
+        if env_val:
+            return env_val in ("true", "1", "yes", "on")
+        # Default to True (SDK enabled by default)
+        return True if v is None else v
 # Prompt paths for different contexts
 PROMPT_PATHS: dict[str, str] = {
     "default": "prompts/chat_system.md",

View File

@@ -2,7 +2,7 @@ import asyncio
 import logging
 import uuid
 from datetime import UTC, datetime
-from typing import Any
+from typing import Any, cast
 from weakref import WeakValueDictionary
 from openai.types.chat import (
@@ -104,6 +104,26 @@ class ChatSession(BaseModel):
     successful_agent_runs: dict[str, int] = {}
     successful_agent_schedules: dict[str, int] = {}
+    def add_tool_call_to_current_turn(self, tool_call: dict) -> None:
+        """Attach a tool_call to the current turn's assistant message.
+
+        Searches backwards for the most recent assistant message (stopping at
+        any user message boundary). If found, appends the tool_call to it.
+        Otherwise creates a new assistant message with the tool_call.
+        """
+        for msg in reversed(self.messages):
+            if msg.role == "user":
+                break
+            if msg.role == "assistant":
+                if not msg.tool_calls:
+                    msg.tool_calls = []
+                msg.tool_calls.append(tool_call)
+                return
+        self.messages.append(
+            ChatMessage(role="assistant", content="", tool_calls=[tool_call])
+        )
     @staticmethod
     def new(user_id: str) -> "ChatSession":
         return ChatSession(
@@ -172,6 +192,47 @@ class ChatSession(BaseModel):
             successful_agent_schedules=successful_agent_schedules,
         )
+    @staticmethod
+    def _merge_consecutive_assistant_messages(
+        messages: list[ChatCompletionMessageParam],
+    ) -> list[ChatCompletionMessageParam]:
+        """Merge consecutive assistant messages into single messages.
+
+        Long-running tool flows can create split assistant messages: one with
+        text content and another with tool_calls. Anthropic's API requires
+        tool_result blocks to reference a tool_use in the immediately preceding
+        assistant message, so these splits cause 400 errors via OpenRouter.
+        """
+        if len(messages) < 2:
+            return messages
+        result: list[ChatCompletionMessageParam] = [messages[0]]
+        for msg in messages[1:]:
+            prev = result[-1]
+            if prev.get("role") != "assistant" or msg.get("role") != "assistant":
+                result.append(msg)
+                continue
+            prev = cast(ChatCompletionAssistantMessageParam, prev)
+            curr = cast(ChatCompletionAssistantMessageParam, msg)
+            curr_content = curr.get("content") or ""
+            if curr_content:
+                prev_content = prev.get("content") or ""
+                prev["content"] = (
+                    f"{prev_content}\n{curr_content}" if prev_content else curr_content
+                )
+            curr_tool_calls = curr.get("tool_calls")
+            if curr_tool_calls:
+                prev_tool_calls = prev.get("tool_calls")
+                prev["tool_calls"] = (
+                    list(prev_tool_calls) + list(curr_tool_calls)
+                    if prev_tool_calls
+                    else list(curr_tool_calls)
+                )
+        return result
     def to_openai_messages(self) -> list[ChatCompletionMessageParam]:
         messages = []
         for message in self.messages:
@@ -258,7 +319,7 @@ class ChatSession(BaseModel):
                     name=message.name or "",
                 )
             )
-        return messages
+        return self._merge_consecutive_assistant_messages(messages)
 async def _get_session_from_cache(session_id: str) -> ChatSession | None:
@@ -273,9 +334,8 @@ async def _get_session_from_cache(session_id: str) -> ChatSession | None:
     try:
         session = ChatSession.model_validate_json(raw_session)
         logger.info(
-            f"Loading session {session_id} from cache: "
-            f"message_count={len(session.messages)}, "
-            f"roles={[m.role for m in session.messages]}"
+            f"[CACHE] Loaded session {session_id}: {len(session.messages)} messages, "
+            f"last_roles={[m.role for m in session.messages[-3:]]}"  # Last 3 roles
         )
         return session
     except Exception as e:
@@ -317,11 +377,9 @@ async def _get_session_from_db(session_id: str) -> ChatSession | None:
         return None
     messages = prisma_session.Messages
-    logger.info(
-        f"Loading session {session_id} from DB: "
-        f"has_messages={messages is not None}, "
-        f"message_count={len(messages) if messages else 0}, "
-        f"roles={[m.role for m in messages] if messages else []}"
+    logger.debug(
+        f"[DB] Loaded session {session_id}: {len(messages) if messages else 0} messages, "
+        f"roles={[m.role for m in messages[-3:]] if messages else []}"  # Last 3 roles
     )
     return ChatSession.from_db(prisma_session, messages)
@@ -372,10 +430,9 @@ async def _save_session_to_db(
                 "function_call": msg.function_call,
             }
         )
-    logger.info(
-        f"Saving {len(new_messages)} new messages to DB for session {session.session_id}: "
-        f"roles={[m['role'] for m in messages_data]}, "
-        f"start_sequence={existing_message_count}"
+    logger.debug(
+        f"[DB] Saving {len(new_messages)} messages to session {session.session_id}, "
+        f"roles={[m['role'] for m in messages_data]}"
     )
     await chat_db.add_chat_messages_batch(
         session_id=session.session_id,
@@ -415,7 +472,7 @@ async def get_chat_session(
         logger.warning(f"Unexpected cache error for session {session_id}: {e}")
     # Fall back to database
-    logger.info(f"Session {session_id} not in cache, checking database")
+    logger.debug(f"Session {session_id} not in cache, checking database")
     session = await _get_session_from_db(session_id)
     if session is None:
@@ -432,7 +489,6 @@ async def get_chat_session(
     # Cache the session from DB
     try:
         await _cache_session(session)
-        logger.info(f"Cached session {session_id} from database")
     except Exception as e:
         logger.warning(f"Failed to cache session {session_id}: {e}")
@@ -497,6 +553,40 @@ async def upsert_chat_session(
     return session
+async def append_and_save_message(session_id: str, message: ChatMessage) -> ChatSession:
+    """Atomically append a message to a session and persist it.
+
+    Acquires the session lock, re-fetches the latest session state,
+    appends the message, and saves — preventing message loss when
+    concurrent requests modify the same session.
+    """
+    lock = await _get_session_lock(session_id)
+    async with lock:
+        session = await get_chat_session(session_id)
+        if session is None:
+            raise ValueError(f"Session {session_id} not found")
+        session.messages.append(message)
+        existing_message_count = await chat_db.get_chat_session_message_count(
+            session_id
+        )
+        try:
+            await _save_session_to_db(session, existing_message_count)
+        except Exception as e:
+            raise DatabaseError(
+                f"Failed to persist message to session {session_id}"
+            ) from e
+        try:
+            await _cache_session(session)
+        except Exception as e:
+            logger.warning(f"Cache write failed for session {session_id}: {e}")
+        return session
 async def create_chat_session(user_id: str) -> ChatSession:
     """Create a new chat session and persist it.
@@ -603,13 +693,19 @@ async def update_session_title(session_id: str, title: str) -> bool:
             logger.warning(f"Session {session_id} not found for title update")
             return False
-        # Invalidate cache so next fetch gets updated title
+        # Update title in cache if it exists (instead of invalidating).
+        # This prevents race conditions where cache invalidation causes
+        # the frontend to see stale DB data while streaming is still in progress.
         try:
-            redis_key = _get_session_cache_key(session_id)
-            async_redis = await get_redis_async()
-            await async_redis.delete(redis_key)
+            cached = await _get_session_from_cache(session_id)
+            if cached:
+                cached.title = title
+                await _cache_session(cached)
         except Exception as e:
-            logger.warning(f"Failed to invalidate cache for session {session_id}: {e}")
+            # Not critical - title will be correct on next full cache refresh
+            logger.warning(
+                f"Failed to update title in cache for session {session_id}: {e}"
+            )
         return True
     except Exception as e:

View File

@@ -1,4 +1,16 @@
+from typing import cast
 import pytest
+from openai.types.chat import (
+    ChatCompletionAssistantMessageParam,
+    ChatCompletionMessageParam,
+    ChatCompletionToolMessageParam,
+    ChatCompletionUserMessageParam,
+)
+from openai.types.chat.chat_completion_message_tool_call_param import (
+    ChatCompletionMessageToolCallParam,
+    Function,
+)
 from .model import (
     ChatMessage,
@@ -117,3 +129,205 @@ async def test_chatsession_db_storage(setup_test_user, test_user_id):
                 loaded.tool_calls is not None
             ), f"Tool calls missing for {orig.role} message"
             assert len(orig.tool_calls) == len(loaded.tool_calls)
+
+
+# --------------------------------------------------------------------------- #
+#  _merge_consecutive_assistant_messages                                      #
+# --------------------------------------------------------------------------- #
+
+_tc = ChatCompletionMessageToolCallParam(
+    id="tc1", type="function", function=Function(name="do_stuff", arguments="{}")
+)
+_tc2 = ChatCompletionMessageToolCallParam(
+    id="tc2", type="function", function=Function(name="other", arguments="{}")
+)
+
+
+def test_merge_noop_when_no_consecutive_assistants():
+    """Messages without consecutive assistants are returned unchanged."""
+    msgs = [
+        ChatCompletionUserMessageParam(role="user", content="hi"),
+        ChatCompletionAssistantMessageParam(role="assistant", content="hello"),
+        ChatCompletionUserMessageParam(role="user", content="bye"),
+    ]
+    merged = ChatSession._merge_consecutive_assistant_messages(msgs)
+    assert len(merged) == 3
+    assert [m["role"] for m in merged] == ["user", "assistant", "user"]
+
+
+def test_merge_splits_text_and_tool_calls():
+    """The exact bug scenario: text-only assistant followed by tool_calls-only assistant."""
+    msgs = [
+        ChatCompletionUserMessageParam(role="user", content="build agent"),
+        ChatCompletionAssistantMessageParam(
+            role="assistant", content="Let me build that"
+        ),
+        ChatCompletionAssistantMessageParam(
+            role="assistant", content="", tool_calls=[_tc]
+        ),
+        ChatCompletionToolMessageParam(role="tool", content="ok", tool_call_id="tc1"),
+    ]
+    merged = ChatSession._merge_consecutive_assistant_messages(msgs)
+    assert len(merged) == 3
+    assert merged[0]["role"] == "user"
+    assert merged[2]["role"] == "tool"
+    a = cast(ChatCompletionAssistantMessageParam, merged[1])
+    assert a["role"] == "assistant"
+    assert a.get("content") == "Let me build that"
+    assert a.get("tool_calls") == [_tc]
+
+
+def test_merge_combines_tool_calls_from_both():
+    """Both consecutive assistants have tool_calls — they get merged."""
+    msgs: list[ChatCompletionAssistantMessageParam] = [
+        ChatCompletionAssistantMessageParam(
+            role="assistant", content="text", tool_calls=[_tc]
+        ),
+        ChatCompletionAssistantMessageParam(
+            role="assistant", content="", tool_calls=[_tc2]
+        ),
+    ]
+    merged = ChatSession._merge_consecutive_assistant_messages(msgs)  # type: ignore[arg-type]
+    assert len(merged) == 1
+    a = cast(ChatCompletionAssistantMessageParam, merged[0])
+    assert a.get("tool_calls") == [_tc, _tc2]
+    assert a.get("content") == "text"
+
+
+def test_merge_three_consecutive_assistants():
+    """Three consecutive assistants collapse into one."""
+    msgs: list[ChatCompletionAssistantMessageParam] = [
+        ChatCompletionAssistantMessageParam(role="assistant", content="a"),
+        ChatCompletionAssistantMessageParam(role="assistant", content="b"),
+        ChatCompletionAssistantMessageParam(
+            role="assistant", content="", tool_calls=[_tc]
+        ),
+    ]
+    merged = ChatSession._merge_consecutive_assistant_messages(msgs)  # type: ignore[arg-type]
+    assert len(merged) == 1
+    a = cast(ChatCompletionAssistantMessageParam, merged[0])
+    assert a.get("content") == "a\nb"
+    assert a.get("tool_calls") == [_tc]
+
+
+def test_merge_empty_and_single_message():
+    """Edge cases: empty list and single message."""
+    assert ChatSession._merge_consecutive_assistant_messages([]) == []
+    single: list[ChatCompletionMessageParam] = [
+        ChatCompletionUserMessageParam(role="user", content="hi")
+    ]
+    assert ChatSession._merge_consecutive_assistant_messages(single) == single
+
+
+# --------------------------------------------------------------------------- #
+#  add_tool_call_to_current_turn                                              #
+# --------------------------------------------------------------------------- #
+
+_raw_tc = {
+    "id": "tc1",
+    "type": "function",
+    "function": {"name": "f", "arguments": "{}"},
+}
+_raw_tc2 = {
+    "id": "tc2",
+    "type": "function",
+    "function": {"name": "g", "arguments": "{}"},
+}
+
+
+def test_add_tool_call_appends_to_existing_assistant():
+    """When the last assistant is from the current turn, tool_call is added to it."""
+    session = ChatSession.new(user_id="u")
+    session.messages = [
+        ChatMessage(role="user", content="hi"),
+        ChatMessage(role="assistant", content="working on it"),
+    ]
+    session.add_tool_call_to_current_turn(_raw_tc)
+    assert len(session.messages) == 2  # no new message created
+    assert session.messages[1].tool_calls == [_raw_tc]
+
+
+def test_add_tool_call_creates_assistant_when_none_exists():
+    """When there's no current-turn assistant, a new one is created."""
+    session = ChatSession.new(user_id="u")
+    session.messages = [
+        ChatMessage(role="user", content="hi"),
+    ]
+    session.add_tool_call_to_current_turn(_raw_tc)
+    assert len(session.messages) == 2
+    assert session.messages[1].role == "assistant"
+    assert session.messages[1].tool_calls == [_raw_tc]
+
+
+def test_add_tool_call_does_not_cross_user_boundary():
+    """A user message acts as a boundary — previous assistant is not modified."""
+    session = ChatSession.new(user_id="u")
+    session.messages = [
+        ChatMessage(role="assistant", content="old turn"),
+        ChatMessage(role="user", content="new message"),
+    ]
+    session.add_tool_call_to_current_turn(_raw_tc)
+    assert len(session.messages) == 3  # new assistant was created
+    assert session.messages[0].tool_calls is None  # old assistant untouched
+    assert session.messages[2].role == "assistant"
+    assert session.messages[2].tool_calls == [_raw_tc]
+
+
+def test_add_tool_call_multiple_times():
+    """Multiple long-running tool calls accumulate on the same assistant."""
+    session = ChatSession.new(user_id="u")
+    session.messages = [
+        ChatMessage(role="user", content="hi"),
+        ChatMessage(role="assistant", content="doing stuff"),
+    ]
+    session.add_tool_call_to_current_turn(_raw_tc)
+    # Simulate a pending tool result in between (like _yield_tool_call does)
+    session.messages.append(
+        ChatMessage(role="tool", content="pending", tool_call_id="tc1")
+    )
+    session.add_tool_call_to_current_turn(_raw_tc2)
+    assert len(session.messages) == 3  # user, assistant, tool — no extra assistant
+    assert session.messages[1].tool_calls == [_raw_tc, _raw_tc2]
+
+
+def test_to_openai_messages_merges_split_assistants():
+    """End-to-end: session with split assistants produces valid OpenAI messages."""
+    session = ChatSession.new(user_id="u")
+    session.messages = [
+        ChatMessage(role="user", content="build agent"),
+        ChatMessage(role="assistant", content="Let me build that"),
+        ChatMessage(
+            role="assistant",
+            content="",
+            tool_calls=[
+                {
+                    "id": "tc1",
+                    "type": "function",
+                    "function": {"name": "create_agent", "arguments": "{}"},
+                }
+            ],
+        ),
+        ChatMessage(role="tool", content="done", tool_call_id="tc1"),
+        ChatMessage(role="assistant", content="Saved!"),
+        ChatMessage(role="user", content="show me an example run"),
+    ]
+    openai_msgs = session.to_openai_messages()
+    # The two consecutive assistants at index 1,2 should be merged
+    roles = [m["role"] for m in openai_msgs]
+    assert roles == ["user", "assistant", "tool", "assistant", "user"]
+    # The merged assistant should have both content and tool_calls
+    merged = cast(ChatCompletionAssistantMessageParam, openai_msgs[1])
+    assert merged.get("content") == "Let me build that"
+    tc_list = merged.get("tool_calls")
+    assert tc_list is not None and len(list(tc_list)) == 1
+    assert list(tc_list)[0]["id"] == "tc1"

View File

@@ -10,6 +10,8 @@ from typing import Any
 from pydantic import BaseModel, Field
+from backend.util.json import dumps as json_dumps
 class ResponseType(str, Enum):
     """Types of streaming responses following AI SDK protocol."""
@@ -193,6 +195,18 @@ class StreamError(StreamBaseResponse):
         default=None, description="Additional error details"
     )
+    def to_sse(self) -> str:
+        """Convert to SSE format, only emitting fields required by AI SDK protocol.
+
+        The AI SDK uses z.strictObject({type, errorText}) which rejects
+        any extra fields like `code` or `details`.
+        """
+        data = {
+            "type": self.type.value,
+            "errorText": self.errorText,
+        }
+        return f"data: {json_dumps(data)}\n\n"
 class StreamHeartbeat(StreamBaseResponse):
     """Heartbeat to keep SSE connection alive during long-running operations.

View File

@@ -1,5 +1,6 @@
"""Chat API routes for chat session management and streaming via SSE.""" """Chat API routes for chat session management and streaming via SSE."""
import asyncio
import logging import logging
import uuid as uuid_module import uuid as uuid_module
from collections.abc import AsyncGenerator from collections.abc import AsyncGenerator
@@ -11,13 +12,22 @@ from fastapi.responses import StreamingResponse
from pydantic import BaseModel from pydantic import BaseModel
from backend.util.exceptions import NotFoundError from backend.util.exceptions import NotFoundError
from backend.util.feature_flag import Flag, is_feature_enabled
from . import service as chat_service from . import service as chat_service
from . import stream_registry from . import stream_registry
from .completion_handler import process_operation_failure, process_operation_success from .completion_handler import process_operation_failure, process_operation_success
from .config import ChatConfig from .config import ChatConfig
from .model import ChatSession, create_chat_session, get_chat_session, get_user_sessions from .model import (
from .response_model import StreamFinish, StreamHeartbeat ChatMessage,
ChatSession,
append_and_save_message,
create_chat_session,
get_chat_session,
get_user_sessions,
)
from .response_model import StreamError, StreamFinish, StreamHeartbeat, StreamStart
from .sdk import service as sdk_service
from .tools.models import ( from .tools.models import (
AgentDetailsResponse, AgentDetailsResponse,
AgentOutputResponse, AgentOutputResponse,
@@ -40,6 +50,7 @@ from .tools.models import (
SetupRequirementsResponse, SetupRequirementsResponse,
UnderstandingUpdatedResponse, UnderstandingUpdatedResponse,
) )
from .tracking import track_user_message
config = ChatConfig() config = ChatConfig()
@@ -231,6 +242,10 @@ async def get_session(
active_task, last_message_id = await stream_registry.get_active_task_for_session(
session_id, user_id
)
logger.info(
f"[GET_SESSION] session={session_id}, active_task={active_task is not None}, "
f"msg_count={len(messages)}, last_role={messages[-1].get('role') if messages else 'none'}"
)
if active_task:
# Filter out the in-progress assistant message from the session response.
# The client will receive the complete assistant response through the SSE
@@ -300,10 +315,9 @@ async def stream_chat_post(
f"user={user_id}, message_len={len(request.message)}", f"user={user_id}, message_len={len(request.message)}",
extra={"json_fields": log_meta}, extra={"json_fields": log_meta},
) )
session = await _validate_and_get_session(session_id, user_id) session = await _validate_and_get_session(session_id, user_id)
logger.info( logger.info(
f"[TIMING] session validated in {(time.perf_counter() - stream_start_time)*1000:.1f}ms", f"[TIMING] session validated in {(time.perf_counter() - stream_start_time) * 1000:.1f}ms",
extra={ extra={
"json_fields": { "json_fields": {
**log_meta, **log_meta,
@@ -312,6 +326,25 @@ async def stream_chat_post(
},
)
# Atomically append user message to session BEFORE creating task to avoid
# race condition where GET_SESSION sees task as "running" but message isn't
# saved yet. append_and_save_message re-fetches inside a lock to prevent
# message loss from concurrent requests.
if request.message:
message = ChatMessage(
role="user" if request.is_user_message else "assistant",
content=request.message,
)
if request.is_user_message:
track_user_message(
user_id=user_id,
session_id=session_id,
message_length=len(request.message),
)
logger.info(f"[STREAM] Saving user message to session {session_id}")
session = await append_and_save_message(session_id, message)
logger.info(f"[STREAM] User message saved for session {session_id}")
# Create a task in the stream registry for reconnection support
task_id = str(uuid_module.uuid4())
operation_id = str(uuid_module.uuid4())
@@ -327,7 +360,7 @@ async def stream_chat_post(
operation_id=operation_id,
)
logger.info(
f"[TIMING] create_task completed in {(time.perf_counter() - task_create_start) * 1000:.1f}ms",
extra={
"json_fields": {
**log_meta,
@@ -348,15 +381,47 @@ async def stream_chat_post(
first_chunk_time, ttfc = None, None
chunk_count = 0
try:
# Emit a start event with task_id for reconnection
start_chunk = StreamStart(messageId=task_id, taskId=task_id)
await stream_registry.publish_chunk(task_id, start_chunk)
logger.info(
f"[TIMING] StreamStart published at {(time_module.perf_counter() - gen_start_time) * 1000:.1f}ms",
extra={
"json_fields": {
**log_meta,
"elapsed_ms": (time_module.perf_counter() - gen_start_time)
* 1000,
}
},
)
# Choose service based on LaunchDarkly flag (falls back to config default)
use_sdk = await is_feature_enabled(
Flag.COPILOT_SDK,
user_id or "anonymous",
default=config.use_claude_agent_sdk,
)
stream_fn = (
sdk_service.stream_chat_completion_sdk
if use_sdk
else chat_service.stream_chat_completion
)
logger.info(
f"[TIMING] Calling {'sdk' if use_sdk else 'standard'} stream_chat_completion",
extra={"json_fields": log_meta},
)
# Pass message=None since we already added it to the session above
async for chunk in stream_fn(
session_id,
None, # Message already in session
is_user_message=request.is_user_message,
user_id=user_id,
session=session, # Pass session with message already added
context=request.context,
_task_id=task_id, # Pass task_id so service emits start with taskId for reconnection
):
# Skip duplicate StreamStart — we already published one above
if isinstance(chunk, StreamStart):
continue
chunk_count += 1
if first_chunk_time is None:
first_chunk_time = time_module.perf_counter()
@@ -377,7 +442,7 @@ async def stream_chat_post(
gen_end_time = time_module.perf_counter()
total_time = (gen_end_time - gen_start_time) * 1000
logger.info(
f"[TIMING] run_ai_generation FINISHED in {total_time / 1000:.1f}s; "
f"task={task_id}, session={session_id}, "
f"ttfc={ttfc or -1:.2f}s, n_chunks={chunk_count}",
extra={
@@ -404,6 +469,17 @@ async def stream_chat_post(
}
},
)
# Publish a StreamError so the frontend can display an error message
try:
await stream_registry.publish_chunk(
task_id,
StreamError(
errorText="An error occurred. Please try again.",
code="stream_error",
),
)
except Exception:
pass # Best-effort; mark_task_completed will publish StreamFinish
await stream_registry.mark_task_completed(task_id, "failed")
# Start the AI generation in a background task
@@ -506,8 +582,14 @@ async def stream_chat_post(
"json_fields": {**log_meta, "elapsed_ms": elapsed, "error": str(e)} "json_fields": {**log_meta, "elapsed_ms": elapsed, "error": str(e)}
}, },
) )
# Surface error to frontend so it doesn't appear stuck
yield StreamError(
errorText="An error occurred. Please try again.",
code="stream_error",
).to_sse()
yield StreamFinish().to_sse()
finally:
# Unsubscribe when client disconnects or stream ends
if subscriber_queue is not None:
try:
await stream_registry.unsubscribe_from_task(
@@ -751,8 +833,6 @@ async def stream_task(
)
async def event_generator() -> AsyncGenerator[str, None]:
heartbeat_interval = 15.0 # Send heartbeat every 15 seconds
try:
while True:

View File

@@ -0,0 +1,14 @@
"""Claude Agent SDK integration for CoPilot.
This module provides the integration layer between the Claude Agent SDK
and the existing CoPilot tool system, enabling drop-in replacement of
the current LLM orchestration with the battle-tested Claude Agent SDK.
"""
from .service import stream_chat_completion_sdk
from .tool_adapter import create_copilot_mcp_server
__all__ = [
"stream_chat_completion_sdk",
"create_copilot_mcp_server",
]

View File

@@ -0,0 +1,198 @@
"""Response adapter for converting Claude Agent SDK messages to Vercel AI SDK format.
This module provides the adapter layer that converts streaming messages from
the Claude Agent SDK into the Vercel AI SDK UI Stream Protocol format that
the frontend expects.
"""
import json
import logging
import uuid
from claude_agent_sdk import (
AssistantMessage,
Message,
ResultMessage,
SystemMessage,
TextBlock,
ToolResultBlock,
ToolUseBlock,
UserMessage,
)
from backend.api.features.chat.response_model import (
StreamBaseResponse,
StreamError,
StreamFinish,
StreamFinishStep,
StreamStart,
StreamStartStep,
StreamTextDelta,
StreamTextEnd,
StreamTextStart,
StreamToolInputAvailable,
StreamToolInputStart,
StreamToolOutputAvailable,
)
from backend.api.features.chat.sdk.tool_adapter import (
MCP_TOOL_PREFIX,
pop_pending_tool_output,
)
logger = logging.getLogger(__name__)
class SDKResponseAdapter:
"""Adapter for converting Claude Agent SDK messages to Vercel AI SDK format.
This class maintains state during a streaming session to properly track
text blocks, tool calls, and message lifecycle.
"""
def __init__(self, message_id: str | None = None):
self.message_id = message_id or str(uuid.uuid4())
self.text_block_id = str(uuid.uuid4())
self.has_started_text = False
self.has_ended_text = False
self.current_tool_calls: dict[str, dict[str, str]] = {}
self.task_id: str | None = None
self.step_open = False
def set_task_id(self, task_id: str) -> None:
"""Set the task ID for reconnection support."""
self.task_id = task_id
def convert_message(self, sdk_message: Message) -> list[StreamBaseResponse]:
"""Convert a single SDK message to Vercel AI SDK format."""
responses: list[StreamBaseResponse] = []
if isinstance(sdk_message, SystemMessage):
if sdk_message.subtype == "init":
responses.append(
StreamStart(messageId=self.message_id, taskId=self.task_id)
)
# Open the first step (matches non-SDK: StreamStart then StreamStartStep)
responses.append(StreamStartStep())
self.step_open = True
elif isinstance(sdk_message, AssistantMessage):
# After tool results, the SDK sends a new AssistantMessage for the
# next LLM turn. Open a new step if the previous one was closed.
if not self.step_open:
responses.append(StreamStartStep())
self.step_open = True
for block in sdk_message.content:
if isinstance(block, TextBlock):
if block.text:
self._ensure_text_started(responses)
responses.append(
StreamTextDelta(id=self.text_block_id, delta=block.text)
)
elif isinstance(block, ToolUseBlock):
self._end_text_if_open(responses)
# Strip MCP prefix so frontend sees "find_block"
# instead of "mcp__copilot__find_block".
tool_name = block.name.removeprefix(MCP_TOOL_PREFIX)
responses.append(
StreamToolInputStart(toolCallId=block.id, toolName=tool_name)
)
responses.append(
StreamToolInputAvailable(
toolCallId=block.id,
toolName=tool_name,
input=block.input,
)
)
self.current_tool_calls[block.id] = {"name": tool_name}
elif isinstance(sdk_message, UserMessage):
# UserMessage carries tool results back from tool execution.
content = sdk_message.content
blocks = content if isinstance(content, list) else []
for block in blocks:
if isinstance(block, ToolResultBlock) and block.tool_use_id:
tool_info = self.current_tool_calls.get(block.tool_use_id, {})
tool_name = tool_info.get("name", "unknown")
# Prefer the stashed full output over the SDK's
# (potentially truncated) ToolResultBlock content.
# The SDK truncates large results, writing them to disk,
# which breaks frontend widget parsing.
output = pop_pending_tool_output(tool_name) or (
_extract_tool_output(block.content)
)
responses.append(
StreamToolOutputAvailable(
toolCallId=block.tool_use_id,
toolName=tool_name,
output=output,
success=not (block.is_error or False),
)
)
# Close the current step after tool results — the next
# AssistantMessage will open a new step for the continuation.
if self.step_open:
responses.append(StreamFinishStep())
self.step_open = False
elif isinstance(sdk_message, ResultMessage):
self._end_text_if_open(responses)
# Close the step before finishing.
if self.step_open:
responses.append(StreamFinishStep())
self.step_open = False
if sdk_message.subtype == "success":
responses.append(StreamFinish())
elif sdk_message.subtype in ("error", "error_during_execution"):
error_msg = getattr(sdk_message, "result", None) or "Unknown error"
responses.append(
StreamError(errorText=str(error_msg), code="sdk_error")
)
responses.append(StreamFinish())
else:
logger.debug(f"Unhandled SDK message type: {type(sdk_message).__name__}")
return responses
def _ensure_text_started(self, responses: list[StreamBaseResponse]) -> None:
"""Start (or restart) a text block if needed."""
if not self.has_started_text or self.has_ended_text:
if self.has_ended_text:
self.text_block_id = str(uuid.uuid4())
self.has_ended_text = False
responses.append(StreamTextStart(id=self.text_block_id))
self.has_started_text = True
def _end_text_if_open(self, responses: list[StreamBaseResponse]) -> None:
"""End the current text block if one is open."""
if self.has_started_text and not self.has_ended_text:
responses.append(StreamTextEnd(id=self.text_block_id))
self.has_ended_text = True
def _extract_tool_output(content: str | list[dict[str, str]] | None) -> str:
"""Extract a string output from a ToolResultBlock's content field."""
if isinstance(content, str):
return content
if isinstance(content, list):
parts = [item.get("text", "") for item in content if item.get("type") == "text"]
if parts:
return "".join(parts)
try:
return json.dumps(content)
except (TypeError, ValueError):
return str(content)
if content is None:
return ""
try:
return json.dumps(content)
except (TypeError, ValueError):
return str(content)
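# Illustrative behaviour (a sketch of the branches above):
#   _extract_tool_output("plain text")                      -> "plain text"
#   _extract_tool_output([{"type": "text", "text": "a"},
#                         {"type": "text", "text": "b"}])   -> "ab"
#   _extract_tool_output(None)                              -> ""
#   _extract_tool_output([{"type": "image"}])               -> json.dumps([...])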

View File

@@ -0,0 +1,366 @@
"""Unit tests for the SDK response adapter."""
from claude_agent_sdk import (
AssistantMessage,
ResultMessage,
SystemMessage,
TextBlock,
ToolResultBlock,
ToolUseBlock,
UserMessage,
)
from backend.api.features.chat.response_model import (
StreamBaseResponse,
StreamError,
StreamFinish,
StreamFinishStep,
StreamStart,
StreamStartStep,
StreamTextDelta,
StreamTextEnd,
StreamTextStart,
StreamToolInputAvailable,
StreamToolInputStart,
StreamToolOutputAvailable,
)
from .response_adapter import SDKResponseAdapter
from .tool_adapter import MCP_TOOL_PREFIX
def _adapter() -> SDKResponseAdapter:
a = SDKResponseAdapter(message_id="msg-1")
a.set_task_id("task-1")
return a
# -- SystemMessage -----------------------------------------------------------
def test_system_init_emits_start_and_step():
adapter = _adapter()
results = adapter.convert_message(SystemMessage(subtype="init", data={}))
assert len(results) == 2
assert isinstance(results[0], StreamStart)
assert results[0].messageId == "msg-1"
assert results[0].taskId == "task-1"
assert isinstance(results[1], StreamStartStep)
def test_system_non_init_emits_nothing():
adapter = _adapter()
results = adapter.convert_message(SystemMessage(subtype="other", data={}))
assert results == []
# -- AssistantMessage with TextBlock -----------------------------------------
def test_text_block_emits_step_start_and_delta():
adapter = _adapter()
msg = AssistantMessage(content=[TextBlock(text="hello")], model="test")
results = adapter.convert_message(msg)
assert len(results) == 3
assert isinstance(results[0], StreamStartStep)
assert isinstance(results[1], StreamTextStart)
assert isinstance(results[2], StreamTextDelta)
assert results[2].delta == "hello"
def test_empty_text_block_emits_only_step():
adapter = _adapter()
msg = AssistantMessage(content=[TextBlock(text="")], model="test")
results = adapter.convert_message(msg)
# Empty text skipped, but step still opens
assert len(results) == 1
assert isinstance(results[0], StreamStartStep)
def test_multiple_text_deltas_reuse_block_id():
adapter = _adapter()
msg1 = AssistantMessage(content=[TextBlock(text="a")], model="test")
msg2 = AssistantMessage(content=[TextBlock(text="b")], model="test")
r1 = adapter.convert_message(msg1)
r2 = adapter.convert_message(msg2)
# First gets step+start+delta, second only delta (block & step already started)
assert len(r1) == 3
assert isinstance(r1[0], StreamStartStep)
assert isinstance(r1[1], StreamTextStart)
assert len(r2) == 1
assert isinstance(r2[0], StreamTextDelta)
assert r1[1].id == r2[0].id # same block ID
# -- AssistantMessage with ToolUseBlock --------------------------------------
def test_tool_use_emits_input_start_and_available():
"""Tool names arrive with MCP prefix and should be stripped for the frontend."""
adapter = _adapter()
msg = AssistantMessage(
content=[
ToolUseBlock(
id="tool-1",
name=f"{MCP_TOOL_PREFIX}find_agent",
input={"q": "x"},
)
],
model="test",
)
results = adapter.convert_message(msg)
assert len(results) == 3
assert isinstance(results[0], StreamStartStep)
assert isinstance(results[1], StreamToolInputStart)
assert results[1].toolCallId == "tool-1"
assert results[1].toolName == "find_agent" # prefix stripped
assert isinstance(results[2], StreamToolInputAvailable)
assert results[2].toolName == "find_agent" # prefix stripped
assert results[2].input == {"q": "x"}
def test_text_then_tool_ends_text_block():
adapter = _adapter()
text_msg = AssistantMessage(content=[TextBlock(text="thinking...")], model="test")
tool_msg = AssistantMessage(
content=[ToolUseBlock(id="t1", name=f"{MCP_TOOL_PREFIX}tool", input={})],
model="test",
)
adapter.convert_message(text_msg) # opens step + text
results = adapter.convert_message(tool_msg)
# Step already open, so: TextEnd, ToolInputStart, ToolInputAvailable
assert len(results) == 3
assert isinstance(results[0], StreamTextEnd)
assert isinstance(results[1], StreamToolInputStart)
# -- UserMessage with ToolResultBlock ----------------------------------------
def test_tool_result_emits_output_and_finish_step():
adapter = _adapter()
# First register the tool call (opens step) — SDK sends prefixed name
tool_msg = AssistantMessage(
content=[ToolUseBlock(id="t1", name=f"{MCP_TOOL_PREFIX}find_agent", input={})],
model="test",
)
adapter.convert_message(tool_msg)
# Now send tool result
result_msg = UserMessage(
content=[ToolResultBlock(tool_use_id="t1", content="found 3 agents")]
)
results = adapter.convert_message(result_msg)
assert len(results) == 2
assert isinstance(results[0], StreamToolOutputAvailable)
assert results[0].toolCallId == "t1"
assert results[0].toolName == "find_agent" # prefix stripped
assert results[0].output == "found 3 agents"
assert results[0].success is True
assert isinstance(results[1], StreamFinishStep)
def test_tool_result_error():
adapter = _adapter()
adapter.convert_message(
AssistantMessage(
content=[
ToolUseBlock(id="t1", name=f"{MCP_TOOL_PREFIX}run_agent", input={})
],
model="test",
)
)
result_msg = UserMessage(
content=[ToolResultBlock(tool_use_id="t1", content="timeout", is_error=True)]
)
results = adapter.convert_message(result_msg)
assert isinstance(results[0], StreamToolOutputAvailable)
assert results[0].success is False
assert isinstance(results[1], StreamFinishStep)
def test_tool_result_list_content():
adapter = _adapter()
adapter.convert_message(
AssistantMessage(
content=[ToolUseBlock(id="t1", name=f"{MCP_TOOL_PREFIX}tool", input={})],
model="test",
)
)
result_msg = UserMessage(
content=[
ToolResultBlock(
tool_use_id="t1",
content=[
{"type": "text", "text": "line1"},
{"type": "text", "text": "line2"},
],
)
]
)
results = adapter.convert_message(result_msg)
assert isinstance(results[0], StreamToolOutputAvailable)
assert results[0].output == "line1line2"
assert isinstance(results[1], StreamFinishStep)
def test_string_user_message_ignored():
"""A plain string UserMessage (not tool results) produces no output."""
adapter = _adapter()
results = adapter.convert_message(UserMessage(content="hello"))
assert results == []
# -- ResultMessage -----------------------------------------------------------
def test_result_success_emits_finish_step_and_finish():
adapter = _adapter()
# Start some text first (opens step)
adapter.convert_message(
AssistantMessage(content=[TextBlock(text="done")], model="test")
)
msg = ResultMessage(
subtype="success",
duration_ms=100,
duration_api_ms=50,
is_error=False,
num_turns=1,
session_id="s1",
)
results = adapter.convert_message(msg)
# TextEnd + FinishStep + StreamFinish
assert len(results) == 3
assert isinstance(results[0], StreamTextEnd)
assert isinstance(results[1], StreamFinishStep)
assert isinstance(results[2], StreamFinish)
def test_result_error_emits_error_and_finish():
adapter = _adapter()
msg = ResultMessage(
subtype="error",
duration_ms=100,
duration_api_ms=50,
is_error=True,
num_turns=0,
session_id="s1",
result="API rate limited",
)
results = adapter.convert_message(msg)
# No step was open, so no FinishStep — just Error + Finish
assert len(results) == 2
assert isinstance(results[0], StreamError)
assert "API rate limited" in results[0].errorText
assert isinstance(results[1], StreamFinish)
# -- Text after tools (new block ID) ----------------------------------------
def test_text_after_tool_gets_new_block_id():
adapter = _adapter()
# Text -> Tool -> ToolResult -> Text should get a new text block ID and step
adapter.convert_message(
AssistantMessage(content=[TextBlock(text="before")], model="test")
)
adapter.convert_message(
AssistantMessage(
content=[ToolUseBlock(id="t1", name=f"{MCP_TOOL_PREFIX}tool", input={})],
model="test",
)
)
# Send tool result (closes step)
adapter.convert_message(
UserMessage(content=[ToolResultBlock(tool_use_id="t1", content="ok")])
)
results = adapter.convert_message(
AssistantMessage(content=[TextBlock(text="after")], model="test")
)
# Should get StreamStartStep (new step) + StreamTextStart (new block) + StreamTextDelta
assert len(results) == 3
assert isinstance(results[0], StreamStartStep)
assert isinstance(results[1], StreamTextStart)
assert isinstance(results[2], StreamTextDelta)
assert results[2].delta == "after"
# -- Full conversation flow --------------------------------------------------
def test_full_conversation_flow():
"""Simulate a complete conversation: init -> text -> tool -> result -> text -> finish."""
adapter = _adapter()
all_responses: list[StreamBaseResponse] = []
# 1. Init
all_responses.extend(
adapter.convert_message(SystemMessage(subtype="init", data={}))
)
# 2. Assistant text
all_responses.extend(
adapter.convert_message(
AssistantMessage(content=[TextBlock(text="Let me search")], model="test")
)
)
# 3. Tool use
all_responses.extend(
adapter.convert_message(
AssistantMessage(
content=[
ToolUseBlock(
id="t1",
name=f"{MCP_TOOL_PREFIX}find_agent",
input={"query": "email"},
)
],
model="test",
)
)
)
# 4. Tool result
all_responses.extend(
adapter.convert_message(
UserMessage(
content=[ToolResultBlock(tool_use_id="t1", content="Found 2 agents")]
)
)
)
# 5. More text
all_responses.extend(
adapter.convert_message(
AssistantMessage(content=[TextBlock(text="I found 2")], model="test")
)
)
# 6. Result
all_responses.extend(
adapter.convert_message(
ResultMessage(
subtype="success",
duration_ms=500,
duration_api_ms=400,
is_error=False,
num_turns=2,
session_id="s1",
)
)
)
types = [type(r).__name__ for r in all_responses]
assert types == [
"StreamStart",
"StreamStartStep", # step 1: text + tool call
"StreamTextStart",
"StreamTextDelta", # "Let me search"
"StreamTextEnd", # closed before tool
"StreamToolInputStart",
"StreamToolInputAvailable",
"StreamToolOutputAvailable", # tool result
"StreamFinishStep", # step 1 closed after tool result
"StreamStartStep", # step 2: continuation text
"StreamTextStart", # new block after tool
"StreamTextDelta", # "I found 2"
"StreamTextEnd", # closed by result
"StreamFinishStep", # step 2 closed
"StreamFinish",
]

View File

@@ -0,0 +1,296 @@
"""Security hooks for Claude Agent SDK integration.
This module provides security hooks that validate tool calls before execution,
ensuring multi-user isolation and preventing unauthorized operations.
"""
import json
import logging
import os
import re
from typing import Any, cast
from backend.api.features.chat.sdk.tool_adapter import MCP_TOOL_PREFIX
logger = logging.getLogger(__name__)
# Tools that are blocked entirely (CLI/system access).
# "Bash" (capital) is the SDK built-in — it's NOT in allowed_tools but blocked
# here as defence-in-depth. The agent uses mcp__copilot__bash_exec instead,
# which has kernel-level network isolation (unshare --net).
BLOCKED_TOOLS = {
"Bash",
"bash",
"shell",
"exec",
"terminal",
"command",
}
# Tools allowed only when their path argument stays within the SDK workspace.
# The SDK uses these to handle oversized tool results (writes to tool-results/
# files, then reads them back) and for workspace file operations.
WORKSPACE_SCOPED_TOOLS = {"Read", "Write", "Edit", "Glob", "Grep"}
# Dangerous patterns in tool inputs
DANGEROUS_PATTERNS = [
r"sudo",
r"rm\s+-rf",
r"dd\s+if=",
r"/etc/passwd",
r"/etc/shadow",
r"chmod\s+777",
r"curl\s+.*\|.*sh",
r"wget\s+.*\|.*sh",
r"eval\s*\(",
r"exec\s*\(",
r"__import__",
r"os\.system",
r"subprocess",
]
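# The patterns are matched against json.dumps(tool_input) in
# _validate_tool_access below, so e.g. {"cmd": "sudo rm -rf /"}
# serializes to '{"cmd": "sudo rm -rf /"}' and trips both the
# `sudo` and `rm\s+-rf` patterns.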
def _deny(reason: str) -> dict[str, Any]:
"""Return a hook denial response."""
return {
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "deny",
"permissionDecisionReason": reason,
}
}
def _validate_workspace_path(
tool_name: str, tool_input: dict[str, Any], sdk_cwd: str | None
) -> dict[str, Any]:
"""Validate that a workspace-scoped tool only accesses allowed paths.
Allowed directories:
- The SDK working directory (``/tmp/copilot-<session>/``)
- The SDK tool-results directory (``~/.claude/projects/…/tool-results/``)
"""
path = tool_input.get("file_path") or tool_input.get("path") or ""
if not path:
# Glob/Grep without a path default to cwd which is already sandboxed
return {}
# Resolve relative paths against sdk_cwd (the SDK sets cwd so the LLM
# naturally uses relative paths like "test.txt" instead of absolute ones).
if not os.path.isabs(path) and sdk_cwd:
resolved = os.path.normpath(os.path.join(sdk_cwd, path))
else:
resolved = os.path.normpath(os.path.expanduser(path))
# Allow access within the SDK working directory
if sdk_cwd:
norm_cwd = os.path.normpath(sdk_cwd)
if resolved.startswith(norm_cwd + os.sep) or resolved == norm_cwd:
return {}
# Allow access to ~/.claude/projects/*/tool-results/ (big tool results)
claude_dir = os.path.normpath(os.path.expanduser("~/.claude/projects"))
if resolved.startswith(claude_dir + os.sep) and "tool-results" in resolved:
return {}
logger.warning(
f"Blocked {tool_name} outside workspace: {path} (resolved={resolved})"
)
workspace_hint = f" Allowed workspace: {sdk_cwd}" if sdk_cwd else ""
return _deny(
f"[SECURITY] Tool '{tool_name}' can only access files within the workspace "
f"directory.{workspace_hint} "
"This is enforced by the platform and cannot be bypassed."
)
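# Resolution examples (a sketch, with sdk_cwd="/tmp/copilot-abc123"):
#   "test.txt"         -> /tmp/copilot-abc123/test.txt  (allowed)
#   "../../etc/passwd" -> /etc/passwd                   (denied: escapes cwd)
#   "/etc/passwd"      -> /etc/passwd                   (denied: outside cwd)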
def _validate_tool_access(
tool_name: str, tool_input: dict[str, Any], sdk_cwd: str | None = None
) -> dict[str, Any]:
"""Validate that a tool call is allowed.
Returns:
Empty dict to allow, or dict with hookSpecificOutput to deny
"""
# Block forbidden tools
if tool_name in BLOCKED_TOOLS:
logger.warning(f"Blocked tool access attempt: {tool_name}")
return _deny(
f"[SECURITY] Tool '{tool_name}' is blocked for security. "
"This is enforced by the platform and cannot be bypassed. "
"Use the CoPilot-specific MCP tools instead."
)
# Workspace-scoped tools: allowed only within the SDK workspace directory
if tool_name in WORKSPACE_SCOPED_TOOLS:
return _validate_workspace_path(tool_name, tool_input, sdk_cwd)
# Check for dangerous patterns in tool input
# Use json.dumps for predictable format (str() produces Python repr)
input_str = json.dumps(tool_input) if tool_input else ""
for pattern in DANGEROUS_PATTERNS:
if re.search(pattern, input_str, re.IGNORECASE):
logger.warning(
f"Blocked dangerous pattern in tool input: {pattern} in {tool_name}"
)
return _deny(
"[SECURITY] Input contains a blocked pattern. "
"This is enforced by the platform and cannot be bypassed."
)
return {}
def _validate_user_isolation(
tool_name: str, tool_input: dict[str, Any], user_id: str | None
) -> dict[str, Any]:
"""Validate that tool calls respect user isolation."""
# For workspace file tools, ensure path doesn't escape
if "workspace" in tool_name.lower():
path = tool_input.get("path", "") or tool_input.get("file_path", "")
if path:
# Check for path traversal
if ".." in path or path.startswith("/"):
logger.warning(
f"Blocked path traversal attempt: {path} by user {user_id}"
)
return {
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "deny",
"permissionDecisionReason": "Path traversal not allowed",
}
}
return {}
def create_security_hooks(
user_id: str | None,
sdk_cwd: str | None = None,
max_subtasks: int = 3,
) -> dict[str, Any]:
"""Create the security hooks configuration for Claude Agent SDK.
Includes security validation and observability hooks:
- PreToolUse: Security validation before tool execution
- PostToolUse: Log successful tool executions
- PostToolUseFailure: Log and handle failed tool executions
- PreCompact: Log context compaction events (SDK handles compaction automatically)
Args:
user_id: Current user ID for isolation validation
sdk_cwd: SDK working directory for workspace-scoped tool validation
max_subtasks: Maximum Task (sub-agent) spawns allowed per session
Returns:
Hooks configuration dict for ClaudeAgentOptions
"""
try:
from claude_agent_sdk import HookMatcher
from claude_agent_sdk.types import HookContext, HookInput, SyncHookJSONOutput
# Per-session counter for Task sub-agent spawns
task_spawn_count = 0
async def pre_tool_use_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Combined pre-tool-use validation hook."""
nonlocal task_spawn_count
_ = context # unused but required by signature
tool_name = cast(str, input_data.get("tool_name", ""))
tool_input = cast(dict[str, Any], input_data.get("tool_input", {}))
# Rate-limit Task (sub-agent) spawns per session
if tool_name == "Task":
task_spawn_count += 1
if task_spawn_count > max_subtasks:
logger.warning(
f"[SDK] Task limit reached ({max_subtasks}), user={user_id}"
)
return cast(
SyncHookJSONOutput,
_deny(
f"Maximum {max_subtasks} sub-tasks per session. "
"Please continue in the main conversation."
),
)
# Strip MCP prefix for consistent validation
is_copilot_tool = tool_name.startswith(MCP_TOOL_PREFIX)
clean_name = tool_name.removeprefix(MCP_TOOL_PREFIX)
# Only block non-CoPilot tools; our MCP-registered tools
# (including Read for oversized results) are already sandboxed.
if not is_copilot_tool:
result = _validate_tool_access(clean_name, tool_input, sdk_cwd)
if result:
return cast(SyncHookJSONOutput, result)
# Validate user isolation
result = _validate_user_isolation(clean_name, tool_input, user_id)
if result:
return cast(SyncHookJSONOutput, result)
logger.debug(f"[SDK] Tool start: {tool_name}, user={user_id}")
return cast(SyncHookJSONOutput, {})
async def post_tool_use_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Log successful tool executions for observability."""
_ = context
tool_name = cast(str, input_data.get("tool_name", ""))
logger.debug(f"[SDK] Tool success: {tool_name}, tool_use_id={tool_use_id}")
return cast(SyncHookJSONOutput, {})
async def post_tool_failure_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Log failed tool executions for debugging."""
_ = context
tool_name = cast(str, input_data.get("tool_name", ""))
error = input_data.get("error", "Unknown error")
logger.warning(
f"[SDK] Tool failed: {tool_name}, error={error}, "
f"user={user_id}, tool_use_id={tool_use_id}"
)
return cast(SyncHookJSONOutput, {})
async def pre_compact_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Log when SDK triggers context compaction.
The SDK automatically compacts conversation history when it grows too large.
This hook provides visibility into when compaction happens.
"""
_ = context, tool_use_id
trigger = input_data.get("trigger", "auto")
logger.info(
f"[SDK] Context compaction triggered: {trigger}, user={user_id}"
)
return cast(SyncHookJSONOutput, {})
return {
"PreToolUse": [HookMatcher(matcher="*", hooks=[pre_tool_use_hook])],
"PostToolUse": [HookMatcher(matcher="*", hooks=[post_tool_use_hook])],
"PostToolUseFailure": [
HookMatcher(matcher="*", hooks=[post_tool_failure_hook])
],
"PreCompact": [HookMatcher(matcher="*", hooks=[pre_compact_hook])],
}
except ImportError:
# Fallback for when SDK isn't available - return empty hooks
logger.warning("claude-agent-sdk not available, security hooks disabled")
return {}

View File

@@ -0,0 +1,165 @@
"""Unit tests for SDK security hooks."""
import os
from .security_hooks import _validate_tool_access, _validate_user_isolation
SDK_CWD = "/tmp/copilot-abc123"
def _is_denied(result: dict) -> bool:
hook = result.get("hookSpecificOutput", {})
return hook.get("permissionDecision") == "deny"
# -- Blocked tools -----------------------------------------------------------
def test_blocked_tools_denied():
for tool in ("bash", "shell", "exec", "terminal", "command"):
result = _validate_tool_access(tool, {})
assert _is_denied(result), f"{tool} should be blocked"
def test_unknown_tool_allowed():
result = _validate_tool_access("SomeCustomTool", {})
assert result == {}
# -- Workspace-scoped tools --------------------------------------------------
def test_read_within_workspace_allowed():
result = _validate_tool_access(
"Read", {"file_path": f"{SDK_CWD}/file.txt"}, sdk_cwd=SDK_CWD
)
assert result == {}
def test_write_within_workspace_allowed():
result = _validate_tool_access(
"Write", {"file_path": f"{SDK_CWD}/output.json"}, sdk_cwd=SDK_CWD
)
assert result == {}
def test_edit_within_workspace_allowed():
result = _validate_tool_access(
"Edit", {"file_path": f"{SDK_CWD}/src/main.py"}, sdk_cwd=SDK_CWD
)
assert result == {}
def test_glob_within_workspace_allowed():
result = _validate_tool_access("Glob", {"path": f"{SDK_CWD}/src"}, sdk_cwd=SDK_CWD)
assert result == {}
def test_grep_within_workspace_allowed():
result = _validate_tool_access("Grep", {"path": f"{SDK_CWD}/src"}, sdk_cwd=SDK_CWD)
assert result == {}
def test_read_outside_workspace_denied():
result = _validate_tool_access(
"Read", {"file_path": "/etc/passwd"}, sdk_cwd=SDK_CWD
)
assert _is_denied(result)
def test_write_outside_workspace_denied():
result = _validate_tool_access(
"Write", {"file_path": "/home/user/secrets.txt"}, sdk_cwd=SDK_CWD
)
assert _is_denied(result)
def test_traversal_attack_denied():
result = _validate_tool_access(
"Read",
{"file_path": f"{SDK_CWD}/../../etc/passwd"},
sdk_cwd=SDK_CWD,
)
assert _is_denied(result)
def test_no_path_allowed():
"""Glob/Grep without a path argument defaults to cwd — should pass."""
result = _validate_tool_access("Glob", {}, sdk_cwd=SDK_CWD)
assert result == {}
def test_read_no_cwd_denies_absolute():
"""If no sdk_cwd is set, absolute paths are denied."""
result = _validate_tool_access("Read", {"file_path": "/tmp/anything"})
assert _is_denied(result)
# -- Tool-results directory --------------------------------------------------
def test_read_tool_results_allowed():
home = os.path.expanduser("~")
path = f"{home}/.claude/projects/-tmp-copilot-abc123/tool-results/12345.txt"
result = _validate_tool_access("Read", {"file_path": path}, sdk_cwd=SDK_CWD)
assert result == {}
def test_read_claude_projects_without_tool_results_denied():
home = os.path.expanduser("~")
path = f"{home}/.claude/projects/-tmp-copilot-abc123/settings.json"
result = _validate_tool_access("Read", {"file_path": path}, sdk_cwd=SDK_CWD)
assert _is_denied(result)
# -- Built-in Bash is blocked (use bash_exec MCP tool instead) ---------------
def test_bash_builtin_always_blocked():
"""SDK built-in Bash is blocked — bash_exec MCP tool with bubblewrap is used instead."""
result = _validate_tool_access("Bash", {"command": "echo hello"}, sdk_cwd=SDK_CWD)
assert _is_denied(result)
# -- Dangerous patterns ------------------------------------------------------
def test_dangerous_pattern_blocked():
result = _validate_tool_access("SomeTool", {"cmd": "sudo rm -rf /"})
assert _is_denied(result)
def test_subprocess_pattern_blocked():
result = _validate_tool_access("SomeTool", {"code": "subprocess.run(...)"})
assert _is_denied(result)
# -- User isolation ----------------------------------------------------------
def test_workspace_path_traversal_blocked():
result = _validate_user_isolation(
"workspace_read", {"path": "../../../etc/shadow"}, user_id="user-1"
)
assert _is_denied(result)
def test_workspace_absolute_path_blocked():
result = _validate_user_isolation(
"workspace_read", {"path": "/etc/passwd"}, user_id="user-1"
)
assert _is_denied(result)
def test_workspace_normal_path_allowed():
result = _validate_user_isolation(
"workspace_read", {"path": "src/main.py"}, user_id="user-1"
)
assert result == {}
def test_non_workspace_tool_passes_isolation():
result = _validate_user_isolation(
"find_agent", {"query": "email"}, user_id="user-1"
)
assert result == {}

View File

@@ -0,0 +1,668 @@
"""Claude Agent SDK service layer for CoPilot chat completions."""
import asyncio
import json
import logging
import os
import uuid
from collections.abc import AsyncGenerator
from typing import Any
from backend.util.exceptions import NotFoundError
from .. import stream_registry
from ..config import ChatConfig
from ..model import (
ChatMessage,
ChatSession,
get_chat_session,
update_session_title,
upsert_chat_session,
)
from ..response_model import (
StreamBaseResponse,
StreamError,
StreamFinish,
StreamStart,
StreamTextDelta,
StreamToolInputAvailable,
StreamToolOutputAvailable,
)
from ..service import (
_build_system_prompt,
_execute_long_running_tool_with_streaming,
_generate_session_title,
)
from ..tools.models import OperationPendingResponse, OperationStartedResponse
from ..tools.sandbox import WORKSPACE_PREFIX, make_session_path
from ..tracking import track_user_message
from .response_adapter import SDKResponseAdapter
from .security_hooks import create_security_hooks
from .tool_adapter import (
COPILOT_TOOL_NAMES,
LongRunningCallback,
create_copilot_mcp_server,
set_execution_context,
)
logger = logging.getLogger(__name__)
config = ChatConfig()
# Set to hold background tasks to prevent garbage collection
_background_tasks: set[asyncio.Task[Any]] = set()
_SDK_CWD_PREFIX = WORKSPACE_PREFIX
# Appended to the system prompt to inform the agent about available tools.
# The SDK built-in Bash is NOT available — use mcp__copilot__bash_exec instead,
# which has kernel-level network isolation (unshare --net).
_SDK_TOOL_SUPPLEMENT = """
## Tool notes
- The SDK built-in Bash tool is NOT available. Use the `bash_exec` MCP tool
for shell commands — it runs in a network-isolated sandbox.
- **Shared workspace**: The SDK Read/Write tools and `bash_exec` share the
same working directory. Files created by one are readable by the other.
These files are **ephemeral** — they exist only for the current session.
- **Persistent storage**: Use `write_workspace_file` / `read_workspace_file`
for files that should persist across sessions (stored in cloud storage).
- Long-running tools (create_agent, edit_agent, etc.) are handled
asynchronously. You will receive an immediate response; the actual result
is delivered to the user via a background stream.
"""
def _build_long_running_callback(user_id: str | None) -> LongRunningCallback:
"""Build a callback that delegates long-running tools to the non-SDK infrastructure.
Long-running tools (create_agent, edit_agent, etc.) are delegated to the
existing background infrastructure: stream_registry (Redis Streams),
database persistence, and SSE reconnection. This means results survive
page refreshes / pod restarts, and the frontend shows the proper loading
widget with progress updates.
The returned callback matches the ``LongRunningCallback`` signature:
``(tool_name, args, session) -> MCP response dict``.
"""
async def _callback(
tool_name: str, args: dict[str, Any], session: ChatSession
) -> dict[str, Any]:
operation_id = str(uuid.uuid4())
task_id = str(uuid.uuid4())
tool_call_id = f"sdk-{uuid.uuid4().hex[:12]}"
session_id = session.session_id
# --- Build user-friendly messages (matches non-SDK service) ---
if tool_name == "create_agent":
desc = args.get("description", "")
desc_preview = (desc[:100] + "...") if len(desc) > 100 else desc
pending_msg = (
f"Creating your agent: {desc_preview}"
if desc_preview
else "Creating agent... This may take a few minutes."
)
started_msg = (
"Agent creation started. You can close this tab - "
"check your library in a few minutes."
)
elif tool_name == "edit_agent":
changes = args.get("changes", "")
changes_preview = (changes[:100] + "...") if len(changes) > 100 else changes
pending_msg = (
f"Editing agent: {changes_preview}"
if changes_preview
else "Editing agent... This may take a few minutes."
)
started_msg = (
"Agent edit started. You can close this tab - "
"check your library in a few minutes."
)
else:
pending_msg = f"Running {tool_name}... This may take a few minutes."
started_msg = (
f"{tool_name} started. You can close this tab - "
"check back in a few minutes."
)
# --- Register task in Redis for SSE reconnection ---
await stream_registry.create_task(
task_id=task_id,
session_id=session_id,
user_id=user_id,
tool_call_id=tool_call_id,
tool_name=tool_name,
operation_id=operation_id,
)
# --- Save OperationPendingResponse to chat history ---
pending_message = ChatMessage(
role="tool",
content=OperationPendingResponse(
message=pending_msg,
operation_id=operation_id,
tool_name=tool_name,
).model_dump_json(),
tool_call_id=tool_call_id,
)
session.messages.append(pending_message)
await upsert_chat_session(session)
# --- Spawn background task (reuses non-SDK infrastructure) ---
bg_task = asyncio.create_task(
_execute_long_running_tool_with_streaming(
tool_name=tool_name,
parameters=args,
tool_call_id=tool_call_id,
operation_id=operation_id,
task_id=task_id,
session_id=session_id,
user_id=user_id,
)
)
_background_tasks.add(bg_task)
bg_task.add_done_callback(_background_tasks.discard)
await stream_registry.set_task_asyncio_task(task_id, bg_task)
logger.info(
f"[SDK] Long-running tool {tool_name} delegated to background "
f"(operation_id={operation_id}, task_id={task_id})"
)
# --- Return OperationStartedResponse as MCP tool result ---
# This flows through SDK → response adapter → frontend, triggering
# the loading widget with SSE reconnection support.
started_json = OperationStartedResponse(
message=started_msg,
operation_id=operation_id,
tool_name=tool_name,
task_id=task_id,
).model_dump_json()
return {
"content": [{"type": "text", "text": started_json}],
"isError": False,
}
return _callback
def _resolve_sdk_model() -> str | None:
"""Resolve the model name for the Claude Agent SDK CLI.
Uses ``config.claude_agent_model`` if set, otherwise derives from
``config.model`` by stripping the OpenRouter provider prefix (e.g.,
``"anthropic/claude-opus-4.6"`` → ``"claude-opus-4.6"``).
"""
if config.claude_agent_model:
return config.claude_agent_model
model = config.model
if "/" in model:
return model.split("/", 1)[1]
return model
def _build_sdk_env() -> dict[str, str]:
"""Build env vars for the SDK CLI process.
Routes API calls through OpenRouter (or a custom base_url) using
the same ``config.api_key`` / ``config.base_url`` as the non-SDK path.
This gives per-call token and cost tracking on the OpenRouter dashboard.
Only overrides ``ANTHROPIC_API_KEY`` when a valid proxy URL and auth
token are both present — otherwise returns an empty dict so the SDK
falls back to its default credentials.
"""
env: dict[str, str] = {}
if config.api_key and config.base_url:
# Strip /v1 suffix — SDK expects the base URL without a version path
base = config.base_url.rstrip("/")
if base.endswith("/v1"):
base = base[:-3]
if not base or not base.startswith("http"):
# Invalid base_url — don't override SDK defaults
return env
env["ANTHROPIC_BASE_URL"] = base
env["ANTHROPIC_AUTH_TOKEN"] = config.api_key
# Must be explicitly empty so the CLI uses AUTH_TOKEN instead
env["ANTHROPIC_API_KEY"] = ""
return env
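# Example (a sketch; assumes config.base_url = "https://openrouter.ai/api/v1"):
#   ANTHROPIC_BASE_URL   = "https://openrouter.ai/api"  (version suffix stripped)
#   ANTHROPIC_AUTH_TOKEN = <config.api_key>
#   ANTHROPIC_API_KEY    = ""  (explicitly empty so the CLI uses AUTH_TOKEN)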
def _make_sdk_cwd(session_id: str) -> str:
"""Create a safe, session-specific working directory path.
Delegates to :func:`~backend.api.features.chat.tools.sandbox.make_session_path`
(single source of truth for path sanitization) and adds a defence-in-depth
assertion.
"""
cwd = make_session_path(session_id)
# Defence-in-depth: normpath + startswith is a CodeQL-recognised sanitizer
cwd = os.path.normpath(cwd)
if not cwd.startswith(_SDK_CWD_PREFIX):
raise ValueError(f"SDK cwd escaped prefix: {cwd}")
return cwd
def _cleanup_sdk_tool_results(cwd: str) -> None:
"""Remove SDK tool-result files for a specific session working directory.
The SDK creates tool-result files under ~/.claude/projects/<encoded-cwd>/tool-results/.
We clean only the specific cwd's results to avoid race conditions between
concurrent sessions.
Security: cwd MUST be created by _make_sdk_cwd() which sanitizes session_id.
"""
import shutil
# Security check 1: Validate cwd is under the expected prefix
normalized = os.path.normpath(cwd)
if not normalized.startswith(_SDK_CWD_PREFIX):
logger.warning(f"[SDK] Rejecting cleanup for invalid path: {cwd}")
return
# Security check 2: Ensure no path traversal in the normalized path
if ".." in normalized:
logger.warning(f"[SDK] Rejecting cleanup for traversal attempt: {cwd}")
return
# SDK encodes the cwd path by replacing '/' with '-'
encoded_cwd = normalized.replace("/", "-")
# Construct the project directory path (known-safe home expansion)
claude_projects = os.path.expanduser("~/.claude/projects")
project_dir = os.path.join(claude_projects, encoded_cwd)
# Security check 3: Validate project_dir is under ~/.claude/projects
project_dir = os.path.normpath(project_dir)
if not project_dir.startswith(claude_projects):
logger.warning(
f"[SDK] Rejecting cleanup for escaped project path: {project_dir}"
)
return
results_dir = os.path.join(project_dir, "tool-results")
if os.path.isdir(results_dir):
for filename in os.listdir(results_dir):
file_path = os.path.join(results_dir, filename)
try:
if os.path.isfile(file_path):
os.remove(file_path)
except OSError:
pass
# Also clean up the temp cwd directory itself
try:
shutil.rmtree(normalized, ignore_errors=True)
except OSError:
pass
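# Example (sketch): cwd "/tmp/copilot-abc123" encodes to "-tmp-copilot-abc123",
# so this removes ~/.claude/projects/-tmp-copilot-abc123/tool-results/* and
# then /tmp/copilot-abc123 itself.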
async def _compress_conversation_history(
session: ChatSession,
) -> list[ChatMessage]:
"""Compress prior conversation messages if they exceed the token threshold.
Uses the shared compress_context() from prompt.py which supports:
- LLM summarization of old messages (keeps recent ones intact)
- Progressive content truncation as fallback
- Middle-out deletion as last resort
Returns the compressed prior messages (everything except the current message).
"""
prior = session.messages[:-1]
if len(prior) < 2:
return prior
from backend.util.prompt import compress_context
# Convert ChatMessages to dicts for compress_context
messages_dict = []
for msg in prior:
msg_dict: dict[str, Any] = {"role": msg.role}
if msg.content:
msg_dict["content"] = msg.content
if msg.tool_calls:
msg_dict["tool_calls"] = msg.tool_calls
if msg.tool_call_id:
msg_dict["tool_call_id"] = msg.tool_call_id
messages_dict.append(msg_dict)
try:
import openai
async with openai.AsyncOpenAI(
api_key=config.api_key, base_url=config.base_url, timeout=30.0
) as client:
result = await compress_context(
messages=messages_dict,
model=config.model,
client=client,
)
except Exception as e:
logger.warning(f"[SDK] Context compression with LLM failed: {e}")
# Fall back to truncation-only (no LLM summarization)
result = await compress_context(
messages=messages_dict,
model=config.model,
client=None,
)
if result.was_compacted:
logger.info(
f"[SDK] Context compacted: {result.original_token_count} -> "
f"{result.token_count} tokens "
f"({result.messages_summarized} summarized, "
f"{result.messages_dropped} dropped)"
)
# Convert compressed dicts back to ChatMessages
return [
ChatMessage(
role=m["role"],
content=m.get("content"),
tool_calls=m.get("tool_calls"),
tool_call_id=m.get("tool_call_id"),
)
for m in result.messages
]
return prior
def _format_conversation_context(messages: list[ChatMessage]) -> str | None:
"""Format conversation messages into a context prefix for the user message.
Returns a string like:
<conversation_history>
User: hello
You responded: Hi! How can I help?
</conversation_history>
Returns None if there are no messages to format.
"""
if not messages:
return None
lines: list[str] = []
for msg in messages:
if not msg.content:
continue
if msg.role == "user":
lines.append(f"User: {msg.content}")
elif msg.role == "assistant":
lines.append(f"You responded: {msg.content}")
# Skip tool messages — they're internal details
if not lines:
return None
return "<conversation_history>\n" + "\n".join(lines) + "\n</conversation_history>"
async def stream_chat_completion_sdk(
session_id: str,
message: str | None = None,
tool_call_response: str | None = None, # noqa: ARG001
is_user_message: bool = True,
user_id: str | None = None,
retry_count: int = 0, # noqa: ARG001
session: ChatSession | None = None,
context: dict[str, str] | None = None, # noqa: ARG001
) -> AsyncGenerator[StreamBaseResponse, None]:
"""Stream chat completion using Claude Agent SDK.
Drop-in replacement for stream_chat_completion with improved reliability.
"""
if session is None:
session = await get_chat_session(session_id, user_id)
if not session:
raise NotFoundError(
f"Session {session_id} not found. Please create a new session first."
)
if message:
session.messages.append(
ChatMessage(
role="user" if is_user_message else "assistant", content=message
)
)
if is_user_message:
track_user_message(
user_id=user_id, session_id=session_id, message_length=len(message)
)
session = await upsert_chat_session(session)
# Generate title for new sessions (first user message)
if is_user_message and not session.title:
user_messages = [m for m in session.messages if m.role == "user"]
if len(user_messages) == 1:
first_message = user_messages[0].content or message or ""
if first_message:
task = asyncio.create_task(
_update_title_async(session_id, first_message, user_id)
)
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
# Build system prompt (reuses non-SDK path with Langfuse support)
has_history = len(session.messages) > 1
system_prompt, _ = await _build_system_prompt(
user_id, has_conversation_history=has_history
)
system_prompt += _SDK_TOOL_SUPPLEMENT
message_id = str(uuid.uuid4())
task_id = str(uuid.uuid4())
yield StreamStart(messageId=message_id, taskId=task_id)
stream_completed = False
# Initialise sdk_cwd before the try so the finally can reference it
# even if _make_sdk_cwd raises (in that case it stays as "").
sdk_cwd = ""
try:
# Use a session-specific temp dir to avoid cleanup race conditions
# between concurrent sessions.
sdk_cwd = _make_sdk_cwd(session_id)
os.makedirs(sdk_cwd, exist_ok=True)
set_execution_context(
user_id,
session,
long_running_callback=_build_long_running_callback(user_id),
)
try:
from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient
# Fail fast when no API credentials are available at all
sdk_env = _build_sdk_env()
if not sdk_env and not os.environ.get("ANTHROPIC_API_KEY"):
raise RuntimeError(
"No API key configured. Set OPEN_ROUTER_API_KEY "
"(or CHAT_API_KEY) for OpenRouter routing, "
"or ANTHROPIC_API_KEY for direct Anthropic access."
)
mcp_server = create_copilot_mcp_server()
sdk_model = _resolve_sdk_model()
security_hooks = create_security_hooks(
user_id,
sdk_cwd=sdk_cwd,
max_subtasks=config.claude_agent_max_subtasks,
)
options = ClaudeAgentOptions(
system_prompt=system_prompt,
mcp_servers={"copilot": mcp_server}, # type: ignore[arg-type]
allowed_tools=COPILOT_TOOL_NAMES,
hooks=security_hooks, # type: ignore[arg-type]
cwd=sdk_cwd,
max_buffer_size=config.claude_agent_max_buffer_size,
# Only pass model/env when OpenRouter is configured
**({"model": sdk_model, "env": sdk_env} if sdk_env else {}),
)
adapter = SDKResponseAdapter(message_id=message_id)
adapter.set_task_id(task_id)
async with ClaudeSDKClient(options=options) as client:
current_message = message or ""
if not current_message and session.messages:
last_user = [m for m in session.messages if m.role == "user"]
if last_user:
current_message = last_user[-1].content or ""
if not current_message.strip():
yield StreamError(
errorText="Message cannot be empty.",
code="empty_prompt",
)
yield StreamFinish()
return
# Build query with conversation history context.
# Compress history first to handle long conversations.
query_message = current_message
if len(session.messages) > 1:
compressed = await _compress_conversation_history(session)
history_context = _format_conversation_context(compressed)
if history_context:
query_message = (
f"{history_context}\n\n"
f"Now, the user says:\n{current_message}"
)
logger.info(
f"[SDK] Sending query: {current_message[:80]!r}"
f" ({len(session.messages)} msgs in session)"
)
await client.query(query_message, session_id=session_id)
assistant_response = ChatMessage(role="assistant", content="")
accumulated_tool_calls: list[dict[str, Any]] = []
has_appended_assistant = False
has_tool_results = False
async for sdk_msg in client.receive_messages():
logger.debug(
f"[SDK] Received: {type(sdk_msg).__name__} "
f"{getattr(sdk_msg, 'subtype', '')}"
)
for response in adapter.convert_message(sdk_msg):
if isinstance(response, StreamStart):
continue
yield response
if isinstance(response, StreamTextDelta):
delta = response.delta or ""
# After tool results, start a new assistant
# message for the post-tool text.
if has_tool_results and has_appended_assistant:
assistant_response = ChatMessage(
role="assistant", content=delta
)
accumulated_tool_calls = []
has_appended_assistant = False
has_tool_results = False
session.messages.append(assistant_response)
has_appended_assistant = True
else:
assistant_response.content = (
assistant_response.content or ""
) + delta
if not has_appended_assistant:
session.messages.append(assistant_response)
has_appended_assistant = True
elif isinstance(response, StreamToolInputAvailable):
accumulated_tool_calls.append(
{
"id": response.toolCallId,
"type": "function",
"function": {
"name": response.toolName,
"arguments": json.dumps(response.input or {}),
},
}
)
assistant_response.tool_calls = accumulated_tool_calls
if not has_appended_assistant:
session.messages.append(assistant_response)
has_appended_assistant = True
elif isinstance(response, StreamToolOutputAvailable):
session.messages.append(
ChatMessage(
role="tool",
content=(
response.output
if isinstance(response.output, str)
else str(response.output)
),
tool_call_id=response.toolCallId,
)
)
has_tool_results = True
elif isinstance(response, StreamFinish):
stream_completed = True
if stream_completed:
break
if (
assistant_response.content or assistant_response.tool_calls
) and not has_appended_assistant:
session.messages.append(assistant_response)
except ImportError:
raise RuntimeError(
"claude-agent-sdk is not installed. "
"Disable SDK mode (CHAT_USE_CLAUDE_AGENT_SDK=false) "
"to use the OpenAI-compatible fallback."
)
await upsert_chat_session(session)
logger.debug(
f"[SDK] Session {session_id} saved with {len(session.messages)} messages"
)
if not stream_completed:
yield StreamFinish()
except Exception as e:
logger.error(f"[SDK] Error: {e}", exc_info=True)
try:
await upsert_chat_session(session)
except Exception as save_err:
logger.error(f"[SDK] Failed to save session on error: {save_err}")
yield StreamError(
errorText="An error occurred. Please try again.",
code="sdk_error",
)
yield StreamFinish()
finally:
if sdk_cwd:
_cleanup_sdk_tool_results(sdk_cwd)
async def _update_title_async(
session_id: str, message: str, user_id: str | None = None
) -> None:
"""Background task to update session title."""
try:
title = await _generate_session_title(
message, user_id=user_id, session_id=session_id
)
if title:
await update_session_title(session_id, title)
logger.debug(f"[SDK] Generated title for {session_id}: {title}")
except Exception as e:
logger.warning(f"[SDK] Failed to update session title: {e}")

View File

@@ -0,0 +1,320 @@
"""Tool adapter for wrapping existing CoPilot tools as Claude Agent SDK MCP tools.
This module provides the adapter layer that converts existing BaseTool implementations
into in-process MCP tools that can be used with the Claude Agent SDK.
Long-running tools (``is_long_running=True``) are delegated to the non-SDK
background infrastructure (stream_registry, Redis persistence, SSE reconnection)
via a callback provided by the service layer. This avoids wasteful SDK polling
and makes results survive page refreshes.
"""
import json
import logging
import os
import uuid
from collections.abc import Awaitable, Callable
from contextvars import ContextVar
from typing import Any
from backend.api.features.chat.model import ChatSession
from backend.api.features.chat.tools import TOOL_REGISTRY
from backend.api.features.chat.tools.base import BaseTool
logger = logging.getLogger(__name__)
# Allowed base directory for the Read tool (SDK saves oversized tool results here).
# Restricted to ~/.claude/projects/ and further validated to require "tool-results"
# in the path — prevents reading settings, credentials, or other sensitive files.
_SDK_PROJECTS_DIR = os.path.expanduser("~/.claude/projects/")
# MCP server naming - the SDK prefixes tool names as "mcp__{server_name}__{tool}"
MCP_SERVER_NAME = "copilot"
MCP_TOOL_PREFIX = f"mcp__{MCP_SERVER_NAME}__"
# Context variables to pass user/session info to tool execution
_current_user_id: ContextVar[str | None] = ContextVar("current_user_id", default=None)
_current_session: ContextVar[ChatSession | None] = ContextVar(
"current_session", default=None
)
# Stash for MCP tool outputs before the SDK potentially truncates them.
# Keyed by tool_name → full output string. Consumed (popped) by the
# response adapter when it builds StreamToolOutputAvailable.
_pending_tool_outputs: ContextVar[dict[str, str]] = ContextVar(
"pending_tool_outputs", default=None # type: ignore[arg-type]
)
# Callback type for delegating long-running tools to the non-SDK infrastructure.
# Args: (tool_name, arguments, session) → MCP-formatted response dict.
LongRunningCallback = Callable[
[str, dict[str, Any], ChatSession], Awaitable[dict[str, Any]]
]
# ContextVar so the service layer can inject the callback per-request.
_long_running_callback: ContextVar[LongRunningCallback | None] = ContextVar(
"long_running_callback", default=None
)
def set_execution_context(
user_id: str | None,
session: ChatSession,
long_running_callback: LongRunningCallback | None = None,
) -> None:
"""Set the execution context for tool calls.
This must be called before streaming begins to ensure tools have access
to user_id and session information.
Args:
user_id: Current user's ID.
session: Current chat session.
long_running_callback: Optional callback to delegate long-running tools
to the non-SDK background infrastructure (stream_registry + Redis).
"""
_current_user_id.set(user_id)
_current_session.set(session)
_pending_tool_outputs.set({})
_long_running_callback.set(long_running_callback)
def get_execution_context() -> tuple[str | None, ChatSession | None]:
"""Get the current execution context."""
return (
_current_user_id.get(),
_current_session.get(),
)
def pop_pending_tool_output(tool_name: str) -> str | None:
"""Pop and return the stashed full output for *tool_name*.
The SDK CLI may truncate large tool results (writing them to disk and
replacing the content with a file reference). This stash keeps the
original MCP output so the response adapter can forward it to the
frontend for proper widget rendering.
Returns ``None`` if nothing was stashed for *tool_name*.
"""
pending = _pending_tool_outputs.get(None)
if pending is None:
return None
return pending.pop(tool_name, None)
async def _execute_tool_sync(
base_tool: BaseTool,
user_id: str | None,
session: ChatSession,
args: dict[str, Any],
) -> dict[str, Any]:
"""Execute a tool synchronously and return MCP-formatted response."""
effective_id = f"sdk-{uuid.uuid4().hex[:12]}"
result = await base_tool.execute(
user_id=user_id,
session=session,
tool_call_id=effective_id,
**args,
)
text = (
result.output if isinstance(result.output, str) else json.dumps(result.output)
)
# Stash the full output before the SDK potentially truncates it.
pending = _pending_tool_outputs.get(None)
if pending is not None:
pending[base_tool.name] = text
return {
"content": [{"type": "text", "text": text}],
"isError": not result.success,
}
def _mcp_error(message: str) -> dict[str, Any]:
return {
"content": [
{"type": "text", "text": json.dumps({"error": message, "type": "error"})}
],
"isError": True,
}
def create_tool_handler(base_tool: BaseTool):
"""Create an async handler function for a BaseTool.
This wraps the existing BaseTool._execute method to be compatible
with the Claude Agent SDK MCP tool format.
Long-running tools (``is_long_running=True``) are delegated to the
non-SDK background infrastructure via a callback set in the execution
context. The callback persists the operation in Redis (stream_registry)
so results survive page refreshes and pod restarts.
"""
async def tool_handler(args: dict[str, Any]) -> dict[str, Any]:
"""Execute the wrapped tool and return MCP-formatted response."""
user_id, session = get_execution_context()
if session is None:
return _mcp_error("No session context available")
# --- Long-running: delegate to non-SDK background infrastructure ---
if base_tool.is_long_running:
callback = _long_running_callback.get(None)
if callback:
try:
return await callback(base_tool.name, args, session)
except Exception as e:
logger.error(
f"Long-running callback failed for {base_tool.name}: {e}",
exc_info=True,
)
return _mcp_error(f"Failed to start {base_tool.name}: {e}")
# No callback — fall through to synchronous execution
logger.warning(
f"[SDK] No long-running callback for {base_tool.name}, "
f"executing synchronously (may block)"
)
# --- Normal (fast) tool: execute synchronously ---
try:
return await _execute_tool_sync(base_tool, user_id, session, args)
except Exception as e:
logger.error(f"Error executing tool {base_tool.name}: {e}", exc_info=True)
return _mcp_error(f"Failed to execute {base_tool.name}: {e}")
return tool_handler
def _build_input_schema(base_tool: BaseTool) -> dict[str, Any]:
"""Build a JSON Schema input schema for a tool."""
return {
"type": "object",
"properties": base_tool.parameters.get("properties", {}),
"required": base_tool.parameters.get("required", []),
}
async def _read_file_handler(args: dict[str, Any]) -> dict[str, Any]:
"""Read a file with optional offset/limit. Restricted to SDK working directory.
After reading, the file is deleted to prevent accumulation in long-running pods.
"""
file_path = args.get("file_path", "")
offset = args.get("offset", 0)
limit = args.get("limit", 2000)
# Security: only allow reads under ~/.claude/projects/**/tool-results/
real_path = os.path.realpath(file_path)
if not real_path.startswith(_SDK_PROJECTS_DIR) or "tool-results" not in real_path:
return {
"content": [{"type": "text", "text": f"Access denied: {file_path}"}],
"isError": True,
}
try:
with open(real_path) as f:
lines = f.readlines()
selected = lines[offset : offset + limit]
content = "".join(selected)
return {"content": [{"type": "text", "text": content}], "isError": False}
except FileNotFoundError:
return {
"content": [{"type": "text", "text": f"File not found: {file_path}"}],
"isError": True,
}
except Exception as e:
return {
"content": [{"type": "text", "text": f"Error reading file: {e}"}],
"isError": True,
}
_READ_TOOL_NAME = "Read"
_READ_TOOL_DESCRIPTION = (
"Read a file from the local filesystem. "
"Use offset and limit to read specific line ranges for large files."
)
_READ_TOOL_SCHEMA = {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "The absolute path to the file to read",
},
"offset": {
"type": "integer",
"description": "Line number to start reading from (0-indexed). Default: 0",
},
"limit": {
"type": "integer",
"description": "Number of lines to read. Default: 2000",
},
},
"required": ["file_path"],
}
# Create the MCP server configuration
def create_copilot_mcp_server():
"""Create an in-process MCP server configuration for CoPilot tools.
This can be passed to ClaudeAgentOptions.mcp_servers.
Note: The actual SDK MCP server creation depends on the claude-agent-sdk
package being available. This function returns the configuration that
can be used with the SDK.
"""
try:
from claude_agent_sdk import create_sdk_mcp_server, tool
# Create decorated tool functions
sdk_tools = []
for tool_name, base_tool in TOOL_REGISTRY.items():
handler = create_tool_handler(base_tool)
decorated = tool(
tool_name,
base_tool.description,
_build_input_schema(base_tool),
)(handler)
sdk_tools.append(decorated)
# Add the Read tool so the SDK can read back oversized tool results
read_tool = tool(
_READ_TOOL_NAME,
_READ_TOOL_DESCRIPTION,
_READ_TOOL_SCHEMA,
)(_read_file_handler)
sdk_tools.append(read_tool)
server = create_sdk_mcp_server(
name=MCP_SERVER_NAME,
version="1.0.0",
tools=sdk_tools,
)
return server
except ImportError:
# Let ImportError propagate so service.py handles the fallback
raise
# SDK built-in tools allowed within the workspace directory.
# Security hooks validate that file paths stay within sdk_cwd.
# Bash is NOT included — use the sandboxed MCP bash_exec tool instead,
# which provides kernel-level network isolation via unshare --net.
# Task allows spawning sub-agents (rate-limited by security hooks).
_SDK_BUILTIN_TOOLS = ["Read", "Write", "Edit", "Glob", "Grep", "Task"]
# List of tool names for allowed_tools configuration
# Include MCP tools, the MCP Read tool for oversized results,
# and SDK built-in file tools for workspace operations.
COPILOT_TOOL_NAMES = [
*[f"{MCP_TOOL_PREFIX}{name}" for name in TOOL_REGISTRY.keys()],
f"{MCP_TOOL_PREFIX}{_READ_TOOL_NAME}",
*_SDK_BUILTIN_TOOLS,
]
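A minimal wiring sketch (not part of the diff) for how this adapter is consumed. Only set_execution_context, create_copilot_mcp_server, MCP_SERVER_NAME, and COPILOT_TOOL_NAMES come from the file above; the tool_adapter module path and the exact ClaudeAgentOptions fields are assumptions based on the docstrings.

from claude_agent_sdk import ClaudeAgentOptions, query  # assumed SDK entry points

from backend.api.features.chat.tool_adapter import (  # assumed module path
    COPILOT_TOOL_NAMES,
    MCP_SERVER_NAME,
    create_copilot_mcp_server,
    set_execution_context,
)


async def run_turn(user_id, session, prompt: str):
    # Tools read user_id/session from ContextVars, so set them before
    # the SDK starts invoking handlers.
    set_execution_context(user_id, session, long_running_callback=None)
    options = ClaudeAgentOptions(
        mcp_servers={MCP_SERVER_NAME: create_copilot_mcp_server()},
        allowed_tools=COPILOT_TOOL_NAMES,
    )
    async for message in query(prompt=prompt, options=options):
        ...  # adapt SDK messages into stream events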

View File

@@ -245,12 +245,16 @@ async def _get_system_prompt_template(context: str) -> str:
     return DEFAULT_SYSTEM_PROMPT.format(users_information=context)

-async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]:
+async def _build_system_prompt(
+    user_id: str | None, has_conversation_history: bool = False
+) -> tuple[str, Any]:
     """Build the full system prompt including business understanding if available.

     Args:
-        user_id: The user ID for fetching business understanding
-            If "default" and this is the user's first session, will use "onboarding" instead.
+        user_id: The user ID for fetching business understanding.
+        has_conversation_history: Whether there's existing conversation history.
+            If True, we don't tell the model to greet/introduce (since they're
+            already in a conversation).

     Returns:
         Tuple of (compiled prompt string, business understanding object)
@@ -266,6 +270,8 @@ async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]:
     if understanding:
         context = format_understanding_for_prompt(understanding)
+    elif has_conversation_history:
+        context = "No prior understanding saved yet. Continue the existing conversation naturally."
     else:
         context = "This is the first time you are meeting the user. Greet them and introduce them to the platform"
@@ -374,7 +380,6 @@ async def stream_chat_completion(
     Raises:
         NotFoundError: If session_id is invalid
-        ValueError: If max_context_messages is exceeded
     """

     completion_start = time.monotonic()
@@ -459,8 +464,9 @@ async def stream_chat_completion(
     # Generate title for new sessions on first user message (non-blocking)
     # Check: is_user_message, no title yet, and this is the first user message
-    if is_user_message and message and not session.title:
-        user_messages = [m for m in session.messages if m.role == "user"]
+    user_messages = [m for m in session.messages if m.role == "user"]
+    first_user_msg = message or (user_messages[0].content if user_messages else None)
+    if is_user_message and first_user_msg and not session.title:
         if len(user_messages) == 1:
             # First user message - generate title in background
             import asyncio
@@ -468,7 +474,7 @@ async def stream_chat_completion(
         # Capture only the values we need (not the session object) to avoid
         # stale data issues when the main flow modifies the session
         captured_session_id = session_id
-        captured_message = message
+        captured_message = first_user_msg
         captured_user_id = user_id

         async def _update_title():
@@ -800,9 +806,13 @@ async def stream_chat_completion(
     # Build the messages list in the correct order
     messages_to_save: list[ChatMessage] = []

-    # Add assistant message with tool_calls if any
+    # Add assistant message with tool_calls if any.
+    # Use extend (not assign) to preserve tool_calls already added by
+    # _yield_tool_call for long-running tools.
     if accumulated_tool_calls:
-        assistant_response.tool_calls = accumulated_tool_calls
+        if not assistant_response.tool_calls:
+            assistant_response.tool_calls = []
+        assistant_response.tool_calls.extend(accumulated_tool_calls)
         logger.info(
             f"Added {len(accumulated_tool_calls)} tool calls to assistant message"
         )
@@ -1233,7 +1243,7 @@ async def _stream_chat_chunks(
     total_time = (time_module.perf_counter() - stream_chunks_start) * 1000
     logger.info(
-        f"[TIMING] _stream_chat_chunks COMPLETED in {total_time/1000:.1f}s; "
+        f"[TIMING] _stream_chat_chunks COMPLETED in {total_time / 1000:.1f}s; "
         f"session={session.session_id}, user={session.user_id}",
         extra={"json_fields": {**log_meta, "total_time_ms": total_time}},
     )
@@ -1404,13 +1414,9 @@ async def _yield_tool_call(
         operation_id=operation_id,
     )

-    # Save assistant message with tool_call FIRST (required by LLM)
-    assistant_message = ChatMessage(
-        role="assistant",
-        content="",
-        tool_calls=[tool_calls[yield_idx]],
-    )
-    session.messages.append(assistant_message)
+    # Attach the tool_call to the current turn's assistant message
+    # (or create one if this is a tool-only response with no text).
+    session.add_tool_call_to_current_turn(tool_calls[yield_idx])

     # Then save pending tool result
     pending_message = ChatMessage(

View File

@@ -814,6 +814,28 @@ async def get_active_task_for_session(
             if task_user_id and user_id != task_user_id:
                 continue

+            # Auto-expire stale tasks that exceeded stream_timeout
+            created_at_str = meta.get("created_at", "")
+            if created_at_str:
+                try:
+                    created_at = datetime.fromisoformat(created_at_str)
+                    age_seconds = (
+                        datetime.now(timezone.utc) - created_at
+                    ).total_seconds()
+                    if age_seconds > config.stream_timeout:
+                        logger.warning(
+                            f"[TASK_LOOKUP] Auto-expiring stale task {task_id[:8]}... "
+                            f"(age={age_seconds:.0f}s > timeout={config.stream_timeout}s)"
+                        )
+                        await mark_task_completed(task_id, "failed")
+                        continue
+                except (ValueError, TypeError):
+                    pass
+
+            logger.info(
+                f"[TASK_LOOKUP] Found running task {task_id[:8]}... for session {session_id[:8]}..."
+            )
+
             # Get the last message ID from Redis Stream
             stream_key = _get_task_stream_key(task_id)
             last_id = "0-0"

View File

@@ -9,6 +9,8 @@ from backend.api.features.chat.tracking import track_tool_called
 from .add_understanding import AddUnderstandingTool
 from .agent_output import AgentOutputTool
 from .base import BaseTool
+from .bash_exec import BashExecTool
+from .check_operation_status import CheckOperationStatusTool
 from .create_agent import CreateAgentTool
 from .customize_agent import CustomizeAgentTool
 from .edit_agent import EditAgentTool
@@ -19,6 +21,7 @@ from .get_doc_page import GetDocPageTool
 from .run_agent import RunAgentTool
 from .run_block import RunBlockTool
 from .search_docs import SearchDocsTool
+from .web_fetch import WebFetchTool
 from .workspace_files import (
     DeleteWorkspaceFileTool,
     ListWorkspaceFilesTool,
@@ -43,9 +46,14 @@ TOOL_REGISTRY: dict[str, BaseTool] = {
     "run_agent": RunAgentTool(),
     "run_block": RunBlockTool(),
     "view_agent_output": AgentOutputTool(),
+    "check_operation_status": CheckOperationStatusTool(),
     "search_docs": SearchDocsTool(),
     "get_doc_page": GetDocPageTool(),
-    # Workspace tools for CoPilot file operations
+    # Web fetch for safe URL retrieval
+    "web_fetch": WebFetchTool(),
+    # Sandboxed code execution (bubblewrap)
+    "bash_exec": BashExecTool(),
+    # Persistent workspace tools (cloud storage, survives across sessions)
     "list_workspace_files": ListWorkspaceFilesTool(),
     "read_workspace_file": ReadWorkspaceFileTool(),
     "write_workspace_file": WriteWorkspaceFileTool(),

View File

@@ -0,0 +1,131 @@
"""Bash execution tool — run shell commands in a bubblewrap sandbox.
Full Bash scripting is allowed (loops, conditionals, pipes, functions, etc.).
Safety comes from OS-level isolation (bubblewrap): only system dirs visible
read-only, writable workspace only, clean env, no network.
Requires bubblewrap (``bwrap``) — the tool is disabled when bwrap is not
available (e.g. macOS development).
"""
import logging
from typing import Any
from backend.api.features.chat.model import ChatSession
from backend.api.features.chat.tools.base import BaseTool
from backend.api.features.chat.tools.models import (
BashExecResponse,
ErrorResponse,
ToolResponseBase,
)
from backend.api.features.chat.tools.sandbox import (
get_workspace_dir,
has_full_sandbox,
run_sandboxed,
)
logger = logging.getLogger(__name__)
class BashExecTool(BaseTool):
"""Execute Bash commands in a bubblewrap sandbox."""
@property
def name(self) -> str:
return "bash_exec"
@property
def description(self) -> str:
if not has_full_sandbox():
return (
"Bash execution is DISABLED — bubblewrap sandbox is not "
"available on this platform. Do not call this tool."
)
return (
"Execute a Bash command or script in a bubblewrap sandbox. "
"Full Bash scripting is supported (loops, conditionals, pipes, "
"functions, etc.). "
"The sandbox shares the same working directory as the SDK Read/Write "
"tools — files created by either are accessible to both. "
"SECURITY: Only system directories (/usr, /bin, /lib, /etc) are "
"visible read-only, the per-session workspace is the only writable "
"path, environment variables are wiped (no secrets), all network "
"access is blocked at the kernel level, and resource limits are "
"enforced (max 64 processes, 512MB memory, 50MB file size). "
"Application code, configs, and other directories are NOT accessible. "
"To fetch web content, use the web_fetch tool instead. "
"Execution is killed after the timeout (default 30s, max 120s). "
"Returns stdout and stderr. "
"Useful for file manipulation, data processing with Unix tools "
"(grep, awk, sed, jq, etc.), and running shell scripts."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "Bash command or script to execute.",
},
"timeout": {
"type": "integer",
"description": (
"Max execution time in seconds (default 30, max 120)."
),
"default": 30,
},
},
"required": ["command"],
}
@property
def requires_auth(self) -> bool:
return False
async def _execute(
self,
user_id: str | None,
session: ChatSession,
**kwargs: Any,
) -> ToolResponseBase:
session_id = session.session_id if session else None
if not has_full_sandbox():
return ErrorResponse(
message="bash_exec requires bubblewrap sandbox (Linux only).",
error="sandbox_unavailable",
session_id=session_id,
)
command: str = (kwargs.get("command") or "").strip()
timeout: int = kwargs.get("timeout", 30)
if not command:
return ErrorResponse(
message="No command provided.",
error="empty_command",
session_id=session_id,
)
workspace = get_workspace_dir(session_id or "default")
stdout, stderr, exit_code, timed_out = await run_sandboxed(
command=["bash", "-c", command],
cwd=workspace,
timeout=timeout,
)
return BashExecResponse(
message=(
"Execution timed out"
if timed_out
else f"Command executed (exit {exit_code})"
),
stdout=stdout,
stderr=stderr,
exit_code=exit_code,
timed_out=timed_out,
session_id=session_id,
)
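For reference, a hedged sketch of invoking this tool directly (outside the MCP adapter), using the BaseTool.execute(user_id=..., session=..., tool_call_id=..., **args) calling convention visible in the adapter earlier in this change; the session object and tool_call_id are placeholders.

from backend.api.features.chat.tools.bash_exec import BashExecTool


async def demo(session):
    result = await BashExecTool().execute(
        user_id=None,  # requires_auth is False for this tool
        session=session,
        tool_call_id="demo-1",  # placeholder id
        command="printf 'a\\nb\\nc\\n' | wc -l",
        timeout=10,
    )
    # On Linux with bwrap: result.success is True and the BashExecResponse
    # payload carries stdout "3", exit_code 0.
    # Without bwrap: the payload is ErrorResponse(error="sandbox_unavailable").
    return result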

View File

@@ -0,0 +1,127 @@
"""CheckOperationStatusTool — query the status of a long-running operation."""
import logging
from typing import Any
from backend.api.features.chat.model import ChatSession
from backend.api.features.chat.tools.base import BaseTool
from backend.api.features.chat.tools.models import (
ErrorResponse,
ResponseType,
ToolResponseBase,
)
logger = logging.getLogger(__name__)
class OperationStatusResponse(ToolResponseBase):
"""Response for check_operation_status tool."""
type: ResponseType = ResponseType.OPERATION_STATUS
task_id: str
operation_id: str
status: str # "running", "completed", "failed"
tool_name: str | None = None
message: str = ""
class CheckOperationStatusTool(BaseTool):
"""Check the status of a long-running operation (create_agent, edit_agent, etc.).
The CoPilot uses this tool to report back to the user whether an
operation that was started earlier has completed, failed, or is still
running.
"""
@property
def name(self) -> str:
return "check_operation_status"
@property
def description(self) -> str:
return (
"Check the current status of a long-running operation such as "
"create_agent or edit_agent. Accepts either an operation_id or "
"task_id from a previous operation_started response. "
"Returns the current status: running, completed, or failed."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"operation_id": {
"type": "string",
"description": (
"The operation_id from an operation_started response."
),
},
"task_id": {
"type": "string",
"description": (
"The task_id from an operation_started response. "
"Used as fallback if operation_id is not provided."
),
},
},
"required": [],
}
@property
def requires_auth(self) -> bool:
return False
async def _execute(
self,
user_id: str | None,
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
from backend.api.features.chat import stream_registry
operation_id: str = kwargs.get("operation_id", "").strip()
task_id: str = kwargs.get("task_id", "").strip()
if not operation_id and not task_id:
return ErrorResponse(
message="Please provide an operation_id or task_id.",
error="missing_parameter",
)
task = None
if operation_id:
task = await stream_registry.find_task_by_operation_id(operation_id)
if task is None and task_id:
task = await stream_registry.get_task(task_id)
if task is None:
# Task not in Redis — it may have already expired (TTL).
# Check conversation history for the result instead.
return ErrorResponse(
message=(
"Operation not found — it may have already completed and "
"expired from the status tracker. Check the conversation "
"history for the result."
),
error="not_found",
)
status_messages = {
"running": (
f"The {task.tool_name or 'operation'} is still running. "
"Please wait for it to complete."
),
"completed": (
f"The {task.tool_name or 'operation'} has completed successfully."
),
"failed": f"The {task.tool_name or 'operation'} has failed.",
}
return OperationStatusResponse(
task_id=task.task_id,
operation_id=task.operation_id,
status=task.status,
tool_name=task.tool_name,
message=status_messages.get(task.status, f"Status: {task.status}"),
)
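A hedged sketch of the status poll the CoPilot agent performs after an operation_started response; the same BaseTool.execute convention applies, and the ids shown are placeholders.

from backend.api.features.chat.tools.check_operation_status import (
    CheckOperationStatusTool,
)


async def poll(session, operation_id: str):
    result = await CheckOperationStatusTool().execute(
        user_id=None,
        session=session,
        tool_call_id="status-1",  # placeholder
        operation_id=operation_id,
    )
    # The payload is an OperationStatusResponse with status in
    # {"running", "completed", "failed"}, or ErrorResponse(error="not_found")
    # once the task has expired from Redis (TTL).
    return result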

View File

@@ -13,7 +13,8 @@ from backend.api.features.chat.tools.models import (
     NoResultsResponse,
 )
 from backend.api.features.store.hybrid_search import unified_hybrid_search
-from backend.data.block import BlockType, get_block
+from backend.blocks import get_block
+from backend.blocks._base import BlockType

 logger = logging.getLogger(__name__)

View File

@@ -10,7 +10,7 @@ from backend.api.features.chat.tools.find_block import (
     FindBlockTool,
 )
 from backend.api.features.chat.tools.models import BlockListResponse
-from backend.data.block import BlockType
+from backend.blocks._base import BlockType

 from ._test_data import make_session

View File

@@ -40,6 +40,12 @@ class ResponseType(str, Enum):
     OPERATION_IN_PROGRESS = "operation_in_progress"
     # Input validation
     INPUT_VALIDATION_ERROR = "input_validation_error"
+    # Web fetch
+    WEB_FETCH = "web_fetch"
+    # Code execution
+    BASH_EXEC = "bash_exec"
+    # Operation status check
+    OPERATION_STATUS = "operation_status"

 # Base response model
@@ -335,11 +341,17 @@ class BlockInfoSummary(BaseModel):
     name: str
     description: str
     categories: list[str]
-    input_schema: dict[str, Any]
-    output_schema: dict[str, Any]
+    input_schema: dict[str, Any] = Field(
+        default_factory=dict,
+        description="Full JSON schema for block inputs",
+    )
+    output_schema: dict[str, Any] = Field(
+        default_factory=dict,
+        description="Full JSON schema for block outputs",
+    )
     required_inputs: list[BlockInputFieldInfo] = Field(
         default_factory=list,
-        description="List of required input fields for this block",
+        description="List of input fields for this block",
     )
@@ -352,7 +364,7 @@ class BlockListResponse(ToolResponseBase):
     query: str
     usage_hint: str = Field(
         default="To execute a block, call run_block with block_id set to the block's "
-        "'id' field and input_data containing the required fields from input_schema."
+        "'id' field and input_data containing the fields listed in required_inputs."
     )
@@ -421,3 +433,24 @@ class AsyncProcessingResponse(ToolResponseBase):
     status: str = "accepted"  # Must be "accepted" for detection
     operation_id: str | None = None
     task_id: str | None = None
+
+
+class WebFetchResponse(ToolResponseBase):
+    """Response for web_fetch tool."""
+
+    type: ResponseType = ResponseType.WEB_FETCH
+    url: str
+    status_code: int
+    content_type: str
+    content: str
+    truncated: bool = False
+
+
+class BashExecResponse(ToolResponseBase):
+    """Response for bash_exec tool."""
+
+    type: ResponseType = ResponseType.BASH_EXEC
+    stdout: str
+    stderr: str
+    exit_code: int
+    timed_out: bool = False

View File

@@ -12,7 +12,8 @@ from backend.api.features.chat.tools.find_block import (
     COPILOT_EXCLUDED_BLOCK_IDS,
     COPILOT_EXCLUDED_BLOCK_TYPES,
 )
-from backend.data.block import AnyBlockSchema, get_block
+from backend.blocks import get_block
+from backend.blocks._base import AnyBlockSchema
 from backend.data.execution import ExecutionContext
 from backend.data.model import CredentialsFieldInfo, CredentialsMetaInput
 from backend.data.workspace import get_or_create_workspace

View File

@@ -6,7 +6,7 @@ import pytest
 from backend.api.features.chat.tools.models import ErrorResponse
 from backend.api.features.chat.tools.run_block import RunBlockTool
-from backend.data.block import BlockType
+from backend.blocks._base import BlockType

 from ._test_data import make_session

View File

@@ -0,0 +1,267 @@
"""Sandbox execution utilities for code execution tools.
Provides filesystem + network isolated command execution using **bubblewrap**
(``bwrap``): whitelist-only filesystem (only system dirs visible read-only),
writable workspace only, clean environment, network blocked.
Tools that call :func:`run_sandboxed` must first check :func:`has_full_sandbox`
and refuse to run if bubblewrap is not available.
"""
import asyncio
import logging
import os
import platform
import shutil
logger = logging.getLogger(__name__)
# Output limits — prevent blowing up LLM context
_MAX_OUTPUT_CHARS = 50_000
_DEFAULT_TIMEOUT = 30
_MAX_TIMEOUT = 120
# ---------------------------------------------------------------------------
# Sandbox capability detection (cached at first call)
# ---------------------------------------------------------------------------
_BWRAP_AVAILABLE: bool | None = None
def has_full_sandbox() -> bool:
"""Return True if bubblewrap is available (filesystem + network isolation).
On non-Linux platforms (macOS), always returns False.
"""
global _BWRAP_AVAILABLE
if _BWRAP_AVAILABLE is None:
_BWRAP_AVAILABLE = (
platform.system() == "Linux" and shutil.which("bwrap") is not None
)
return _BWRAP_AVAILABLE
WORKSPACE_PREFIX = "/tmp/copilot-"
def make_session_path(session_id: str) -> str:
"""Build a sanitized, session-specific path under :data:`WORKSPACE_PREFIX`.
Shared by both the SDK working-directory setup and the sandbox tools so
they always resolve to the same directory for a given session.
Steps:
1. Strip all characters except ``[A-Za-z0-9-]``.
2. Construct ``/tmp/copilot-<safe_id>``.
3. Validate via ``os.path.normpath`` + ``startswith`` (CodeQL-recognised
sanitizer) to prevent path traversal.
Raises:
ValueError: If the resulting path escapes the prefix.
"""
import re
safe_id = re.sub(r"[^A-Za-z0-9-]", "", session_id)
if not safe_id:
safe_id = "default"
path = os.path.normpath(f"{WORKSPACE_PREFIX}{safe_id}")
if not path.startswith(WORKSPACE_PREFIX):
raise ValueError(f"Session path escaped prefix: {path}")
return path
def get_workspace_dir(session_id: str) -> str:
"""Get or create the workspace directory for a session.
Uses :func:`make_session_path` — the same path the SDK uses — so that
bash_exec shares the workspace with the SDK file tools.
"""
workspace = make_session_path(session_id)
os.makedirs(workspace, exist_ok=True)
return workspace
# ---------------------------------------------------------------------------
# Bubblewrap command builder
# ---------------------------------------------------------------------------
# System directories mounted read-only inside the sandbox.
# ONLY these are visible — /app, /root, /home, /opt, /var etc. are NOT accessible.
_SYSTEM_RO_BINDS = [
"/usr", # binaries, libraries, Python interpreter
"/etc", # system config: ld.so, locale, passwd, alternatives
]
# Compat paths: symlinks to /usr/* on modern Debian, real dirs on older systems.
# On Debian 13 these are symlinks (e.g. /bin -> usr/bin). bwrap --ro-bind
# can't create a symlink target, so we detect and use --symlink instead.
# /lib64 is critical: the ELF dynamic linker lives at /lib64/ld-linux-x86-64.so.2.
_COMPAT_PATHS = [
("/bin", "usr/bin"), # -> /usr/bin on Debian 13
("/sbin", "usr/sbin"), # -> /usr/sbin on Debian 13
("/lib", "usr/lib"), # -> /usr/lib on Debian 13
("/lib64", "usr/lib64"), # 64-bit libraries / ELF interpreter
]
# Resource limits to prevent fork bombs, memory exhaustion, and disk abuse.
# Applied via ulimit inside the sandbox before exec'ing the user command.
_RESOURCE_LIMITS = (
"ulimit -u 64" # max 64 processes (prevents fork bombs)
" -v 524288" # 512 MB virtual memory
" -f 51200" # 50 MB max file size (1024-byte blocks)
" -n 256" # 256 open file descriptors
" 2>/dev/null"
)
def _build_bwrap_command(
command: list[str], cwd: str, env: dict[str, str]
) -> list[str]:
"""Build a bubblewrap command with strict filesystem + network isolation.
Security model:
- **Whitelist-only filesystem**: only system directories (``/usr``, ``/etc``,
``/bin``, ``/lib``) are mounted read-only. Application code (``/app``),
home directories, ``/var``, ``/opt``, etc. are NOT accessible at all.
- **Writable workspace only**: the per-session workspace is the sole
writable path.
- **Clean environment**: ``--clearenv`` wipes all inherited env vars.
Only the explicitly-passed safe env vars are set inside the sandbox.
- **Network isolation**: ``--unshare-net`` blocks all network access.
- **Resource limits**: ulimit caps on processes (64), memory (512MB),
file size (50MB), and open FDs (256) to prevent fork bombs and abuse.
- **New session**: prevents terminal control escape.
- **Die with parent**: prevents orphaned sandbox processes.
"""
cmd = [
"bwrap",
# Create a new user namespace so bwrap can set up sandboxing
# inside unprivileged Docker containers (no CAP_SYS_ADMIN needed).
"--unshare-user",
# Wipe all inherited environment variables (API keys, secrets, etc.)
"--clearenv",
]
# Set only the safe env vars inside the sandbox
for key, value in env.items():
cmd.extend(["--setenv", key, value])
# System directories: read-only
for path in _SYSTEM_RO_BINDS:
cmd.extend(["--ro-bind", path, path])
# Compat paths: use --symlink when host path is a symlink (Debian 13),
# --ro-bind when it's a real directory (older distros).
for path, symlink_target in _COMPAT_PATHS:
if os.path.islink(path):
cmd.extend(["--symlink", symlink_target, path])
elif os.path.exists(path):
cmd.extend(["--ro-bind", path, path])
# Wrap the user command with resource limits:
# sh -c 'ulimit ...; exec "$@"' -- <original command>
# `exec "$@"` replaces the shell so there's no extra process overhead,
# and properly handles arguments with spaces.
limited_command = [
"sh",
"-c",
f'{_RESOURCE_LIMITS}; exec "$@"',
"--",
*command,
]
cmd.extend(
[
# Fresh virtual filesystems
"--dev",
"/dev",
"--proc",
"/proc",
"--tmpfs",
"/tmp",
# Workspace bind AFTER --tmpfs /tmp so it's visible through the tmpfs.
# (workspace lives under /tmp/copilot-<session>)
"--bind",
cwd,
cwd,
# Isolation
"--unshare-net",
"--die-with-parent",
"--new-session",
"--chdir",
cwd,
"--",
*limited_command,
]
)
return cmd
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
async def run_sandboxed(
command: list[str],
cwd: str,
timeout: int = _DEFAULT_TIMEOUT,
env: dict[str, str] | None = None,
) -> tuple[str, str, int, bool]:
"""Run a command inside a bubblewrap sandbox.
Callers **must** check :func:`has_full_sandbox` before calling this
function. If bubblewrap is not available, this function raises
:class:`RuntimeError` rather than running unsandboxed.
Returns:
(stdout, stderr, exit_code, timed_out)
"""
if not has_full_sandbox():
raise RuntimeError(
"run_sandboxed() requires bubblewrap but bwrap is not available. "
"Callers must check has_full_sandbox() before calling this function."
)
timeout = min(max(timeout, 1), _MAX_TIMEOUT)
safe_env = {
"PATH": "/usr/local/bin:/usr/bin:/bin",
"HOME": cwd,
"TMPDIR": cwd,
"LANG": "en_US.UTF-8",
"PYTHONDONTWRITEBYTECODE": "1",
"PYTHONIOENCODING": "utf-8",
}
if env:
safe_env.update(env)
full_command = _build_bwrap_command(command, cwd, safe_env)
try:
proc = await asyncio.create_subprocess_exec(
*full_command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=cwd,
env=safe_env,
)
try:
stdout_bytes, stderr_bytes = await asyncio.wait_for(
proc.communicate(), timeout=timeout
)
stdout = stdout_bytes.decode("utf-8", errors="replace")[:_MAX_OUTPUT_CHARS]
stderr = stderr_bytes.decode("utf-8", errors="replace")[:_MAX_OUTPUT_CHARS]
return stdout, stderr, proc.returncode or 0, False
except asyncio.TimeoutError:
proc.kill()
await proc.communicate()
return "", f"Execution timed out after {timeout}s", -1, True
except RuntimeError:
raise
except Exception as e:
return "", f"Sandbox error: {e}", -1, False

View File

@@ -0,0 +1,156 @@
"""Web fetch tool — safely retrieve public web page content."""
import logging
from typing import Any
import aiohttp
import html2text
from backend.api.features.chat.model import ChatSession
from backend.api.features.chat.tools.base import BaseTool
from backend.api.features.chat.tools.models import (
ErrorResponse,
ToolResponseBase,
WebFetchResponse,
)
from backend.util.request import Requests
logger = logging.getLogger(__name__)
# Limits
_MAX_CONTENT_BYTES = 102_400 # 100 KB download cap
_MAX_OUTPUT_CHARS = 50_000 # 50K char truncation for LLM context
_REQUEST_TIMEOUT = aiohttp.ClientTimeout(total=15)
# Content types we'll read as text
_TEXT_CONTENT_TYPES = {
"text/html",
"text/plain",
"text/xml",
"text/csv",
"text/markdown",
"application/json",
"application/xml",
"application/xhtml+xml",
"application/rss+xml",
"application/atom+xml",
}
def _is_text_content(content_type: str) -> bool:
base = content_type.split(";")[0].strip().lower()
return base in _TEXT_CONTENT_TYPES or base.startswith("text/")
def _html_to_text(html: str) -> str:
h = html2text.HTML2Text()
h.ignore_links = False
h.ignore_images = True
h.body_width = 0
return h.handle(html)
class WebFetchTool(BaseTool):
"""Safely fetch content from a public URL using SSRF-protected HTTP."""
@property
def name(self) -> str:
return "web_fetch"
@property
def description(self) -> str:
return (
"Fetch the content of a public web page by URL. "
"Returns readable text extracted from HTML by default. "
"Useful for reading documentation, articles, and API responses. "
"Only supports HTTP/HTTPS GET requests to public URLs "
"(private/internal network addresses are blocked)."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "The public HTTP/HTTPS URL to fetch.",
},
"extract_text": {
"type": "boolean",
"description": (
"If true (default), extract readable text from HTML. "
"If false, return raw content."
),
"default": True,
},
},
"required": ["url"],
}
@property
def requires_auth(self) -> bool:
return False
async def _execute(
self,
user_id: str | None,
session: ChatSession,
**kwargs: Any,
) -> ToolResponseBase:
url: str = (kwargs.get("url") or "").strip()
extract_text: bool = kwargs.get("extract_text", True)
session_id = session.session_id if session else None
if not url:
return ErrorResponse(
message="Please provide a URL to fetch.",
error="missing_url",
session_id=session_id,
)
try:
client = Requests(raise_for_status=False, retry_max_attempts=1)
response = await client.get(url, timeout=_REQUEST_TIMEOUT)
except ValueError as e:
# validate_url raises ValueError for SSRF / blocked IPs
return ErrorResponse(
message=f"URL blocked: {e}",
error="url_blocked",
session_id=session_id,
)
except Exception as e:
logger.warning(f"[web_fetch] Request failed for {url}: {e}")
return ErrorResponse(
message=f"Failed to fetch URL: {e}",
error="fetch_failed",
session_id=session_id,
)
content_type = response.headers.get("content-type", "")
if not _is_text_content(content_type):
return ErrorResponse(
message=f"Non-text content type: {content_type.split(';')[0]}",
error="unsupported_content_type",
session_id=session_id,
)
raw = response.content[:_MAX_CONTENT_BYTES]
text = raw.decode("utf-8", errors="replace")
if extract_text and "html" in content_type.lower():
text = _html_to_text(text)
truncated = len(text) > _MAX_OUTPUT_CHARS
if truncated:
text = text[:_MAX_OUTPUT_CHARS]
return WebFetchResponse(
message=f"Fetched {url}" + (" (truncated)" if truncated else ""),
url=response.url,
status_code=response.status,
content_type=content_type.split(";")[0].strip(),
content=text,
truncated=truncated,
session_id=session_id,
)
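A hedged sketch of calling this tool directly via the same BaseTool.execute convention; the session object and tool_call_id are placeholders.

from backend.api.features.chat.tools.web_fetch import WebFetchTool


async def fetch_demo(session):
    result = await WebFetchTool().execute(
        user_id=None,  # requires_auth is False
        session=session,
        tool_call_id="fetch-1",  # placeholder
        url="https://example.com",
        extract_text=True,
    )
    # On success the payload is a WebFetchResponse (extracted text plus a
    # truncated flag); a private/internal address is rejected by the SSRF
    # check in backend.util.request.Requests as ErrorResponse(error="url_blocked").
    return result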

View File

@@ -88,7 +88,9 @@ class ListWorkspaceFilesTool(BaseTool):
     @property
     def description(self) -> str:
         return (
-            "List files in the user's workspace. "
+            "List files in the user's persistent workspace (cloud storage). "
+            "These files survive across sessions. "
+            "For ephemeral session files, use the SDK Read/Glob tools instead. "
             "Returns file names, paths, sizes, and metadata. "
             "Optionally filter by path prefix."
         )
@@ -204,7 +206,9 @@ class ReadWorkspaceFileTool(BaseTool):
     @property
     def description(self) -> str:
         return (
-            "Read a file from the user's workspace. "
+            "Read a file from the user's persistent workspace (cloud storage). "
+            "These files survive across sessions. "
+            "For ephemeral session files, use the SDK Read tool instead. "
             "Specify either file_id or path to identify the file. "
             "For small text files, returns content directly. "
             "For large or binary files, returns metadata and a download URL. "
@@ -378,7 +382,9 @@ class WriteWorkspaceFileTool(BaseTool):
     @property
     def description(self) -> str:
         return (
-            "Write or create a file in the user's workspace. "
+            "Write or create a file in the user's persistent workspace (cloud storage). "
+            "These files survive across sessions. "
+            "For ephemeral session files, use the SDK Write tool instead. "
             "Provide the content as a base64-encoded string. "
             f"Maximum file size is {Config().max_file_size_mb}MB. "
             "Files are saved to the current session's folder by default. "
@@ -523,7 +529,7 @@ class DeleteWorkspaceFileTool(BaseTool):
     @property
     def description(self) -> str:
         return (
-            "Delete a file from the user's workspace. "
+            "Delete a file from the user's persistent workspace (cloud storage). "
             "Specify either file_id or path to identify the file. "
             "Paths are scoped to the current session by default. "
             "Use /sessions/<session_id>/... for cross-session access."

View File

@@ -12,12 +12,11 @@ import backend.api.features.store.image_gen as store_image_gen
 import backend.api.features.store.media as store_media
 import backend.data.graph as graph_db
 import backend.data.integrations as integrations_db
-from backend.data.block import BlockInput
 from backend.data.db import transaction
 from backend.data.execution import get_graph_execution
 from backend.data.graph import GraphSettings
 from backend.data.includes import AGENT_PRESET_INCLUDE, library_agent_include
-from backend.data.model import CredentialsMetaInput
+from backend.data.model import CredentialsMetaInput, GraphInput
 from backend.integrations.creds_manager import IntegrationCredentialsManager
 from backend.integrations.webhooks.graph_lifecycle_hooks import (
     on_graph_activate,
@@ -1130,7 +1129,7 @@ async def create_preset_from_graph_execution(
 async def update_preset(
     user_id: str,
     preset_id: str,
-    inputs: Optional[BlockInput] = None,
+    inputs: Optional[GraphInput] = None,
     credentials: Optional[dict[str, CredentialsMetaInput]] = None,
     name: Optional[str] = None,
     description: Optional[str] = None,

View File

@@ -6,9 +6,12 @@ import prisma.enums
 import prisma.models
 import pydantic

-from backend.data.block import BlockInput
 from backend.data.graph import GraphModel, GraphSettings, GraphTriggerInfo
-from backend.data.model import CredentialsMetaInput, is_credentials_field_name
+from backend.data.model import (
+    CredentialsMetaInput,
+    GraphInput,
+    is_credentials_field_name,
+)
 from backend.util.json import loads as json_loads
 from backend.util.models import Pagination
@@ -323,7 +326,7 @@ class LibraryAgentPresetCreatable(pydantic.BaseModel):
     graph_id: str
     graph_version: int
-    inputs: BlockInput
+    inputs: GraphInput
     credentials: dict[str, CredentialsMetaInput]
     name: str
@@ -352,7 +355,7 @@ class LibraryAgentPresetUpdatable(pydantic.BaseModel):
     Request model used when updating a preset for a library agent.
     """
-    inputs: Optional[BlockInput] = None
+    inputs: Optional[GraphInput] = None
     credentials: Optional[dict[str, CredentialsMetaInput]] = None
     name: Optional[str] = None
@@ -395,7 +398,7 @@ class LibraryAgentPreset(LibraryAgentPresetCreatable):
         "Webhook must be included in AgentPreset query when webhookId is set"
     )
-    input_data: BlockInput = {}
+    input_data: GraphInput = {}
     input_credentials: dict[str, CredentialsMetaInput] = {}
     for preset_input in preset.InputPresets:
View File

@@ -5,8 +5,8 @@ from typing import Optional
 import aiohttp
 from fastapi import HTTPException

+from backend.blocks import get_block
 from backend.data import graph as graph_db
-from backend.data.block import get_block
 from backend.util.settings import Settings

 from .models import ApiResponse, ChatRequest, GraphData

View File

@@ -152,7 +152,7 @@ class BlockHandler(ContentHandler):
     async def get_missing_items(self, batch_size: int) -> list[ContentItem]:
         """Fetch blocks without embeddings."""
-        from backend.data.block import get_blocks
+        from backend.blocks import get_blocks

         # Get all available blocks
         all_blocks = get_blocks()
@@ -249,7 +249,7 @@ class BlockHandler(ContentHandler):
     async def get_stats(self) -> dict[str, int]:
         """Get statistics about block embedding coverage."""
-        from backend.data.block import get_blocks
+        from backend.blocks import get_blocks

         all_blocks = get_blocks()
View File

@@ -93,7 +93,7 @@ async def test_block_handler_get_missing_items(mocker):
     mock_existing = []

     with patch(
-        "backend.data.block.get_blocks",
+        "backend.blocks.get_blocks",
         return_value=mock_blocks,
     ):
         with patch(
@@ -135,7 +135,7 @@ async def test_block_handler_get_stats(mocker):
     mock_embedded = [{"count": 2}]

     with patch(
-        "backend.data.block.get_blocks",
+        "backend.blocks.get_blocks",
         return_value=mock_blocks,
     ):
         with patch(
@@ -327,7 +327,7 @@ async def test_block_handler_handles_missing_attributes():
     mock_blocks = {"block-minimal": mock_block_class}

     with patch(
-        "backend.data.block.get_blocks",
+        "backend.blocks.get_blocks",
         return_value=mock_blocks,
     ):
         with patch(
@@ -360,7 +360,7 @@ async def test_block_handler_skips_failed_blocks():
     mock_blocks = {"good-block": good_block, "bad-block": bad_block}

     with patch(
-        "backend.data.block.get_blocks",
+        "backend.blocks.get_blocks",
         return_value=mock_blocks,
     ):
         with patch(

View File

@@ -662,7 +662,7 @@ async def cleanup_orphaned_embeddings() -> dict[str, Any]:
         )
         current_ids = {row["id"] for row in valid_agents}
     elif content_type == ContentType.BLOCK:
-        from backend.data.block import get_blocks
+        from backend.blocks import get_blocks

         current_ids = set(get_blocks().keys())
     elif content_type == ContentType.DOCUMENTATION:

View File

@@ -7,15 +7,6 @@ from replicate.client import Client as ReplicateClient
 from replicate.exceptions import ReplicateError
 from replicate.helpers import FileOutput

-from backend.blocks.ideogram import (
-    AspectRatio,
-    ColorPalettePreset,
-    IdeogramModelBlock,
-    IdeogramModelName,
-    MagicPromptOption,
-    StyleType,
-    UpscaleOption,
-)
 from backend.data.graph import GraphBaseMeta
 from backend.data.model import CredentialsMetaInput, ProviderName
 from backend.integrations.credentials_store import ideogram_credentials
@@ -50,6 +41,16 @@ async def generate_agent_image_v2(graph: GraphBaseMeta | AgentGraph) -> io.Bytes
     if not ideogram_credentials.api_key:
         raise ValueError("Missing Ideogram API key")

+    from backend.blocks.ideogram import (
+        AspectRatio,
+        ColorPalettePreset,
+        IdeogramModelBlock,
+        IdeogramModelName,
+        MagicPromptOption,
+        StyleType,
+        UpscaleOption,
+    )
+
     name = graph.name
     description = f"{name} ({graph.description})" if graph.description else name

View File

@@ -40,10 +40,11 @@ from backend.api.model import (
     UpdateTimezoneRequest,
     UploadFileResponse,
 )
+from backend.blocks import get_block, get_blocks
 from backend.data import execution as execution_db
 from backend.data import graph as graph_db
 from backend.data.auth import api_key as api_key_db
-from backend.data.block import BlockInput, CompletedBlockOutput, get_block, get_blocks
+from backend.data.block import BlockInput, CompletedBlockOutput
 from backend.data.credit import (
     AutoTopUpConfig,
     RefundRequest,

View File

@@ -3,22 +3,19 @@ import logging
 import os
 import re
 from pathlib import Path
-from typing import TYPE_CHECKING, TypeVar
+from typing import Sequence, Type, TypeVar

+from backend.blocks._base import AnyBlockSchema, BlockType
 from backend.util.cache import cached

 logger = logging.getLogger(__name__)

-if TYPE_CHECKING:
-    from backend.data.block import Block
-
 T = TypeVar("T")

 @cached(ttl_seconds=3600)
-def load_all_blocks() -> dict[str, type["Block"]]:
-    from backend.data.block import Block
+def load_all_blocks() -> dict[str, type["AnyBlockSchema"]]:
+    from backend.blocks._base import Block
     from backend.util.settings import Config

     # Check if example blocks should be loaded from settings
@@ -50,8 +47,8 @@ def load_all_blocks() -> dict[str, type["AnyBlockSchema"]]:
         importlib.import_module(f".{module}", package=__name__)

     # Load all Block instances from the available modules
-    available_blocks: dict[str, type["Block"]] = {}
+    available_blocks: dict[str, type["AnyBlockSchema"]] = {}

-    for block_cls in all_subclasses(Block):
+    for block_cls in _all_subclasses(Block):
         class_name = block_cls.__name__

         if class_name.endswith("Base"):
@@ -64,7 +61,7 @@ def load_all_blocks() -> dict[str, type["AnyBlockSchema"]]:
                 "please name the class with 'Base' at the end"
             )

-        block = block_cls.create()
+        block = block_cls()  # pyright: ignore[reportAbstractUsage]

         if not isinstance(block.id, str) or len(block.id) != 36:
             raise ValueError(
@@ -105,7 +102,7 @@ def load_all_blocks() -> dict[str, type["AnyBlockSchema"]]:
         available_blocks[block.id] = block_cls

     # Filter out blocks with incomplete auth configs, e.g. missing OAuth server secrets
-    from backend.data.block import is_block_auth_configured
+    from ._utils import is_block_auth_configured

     filtered_blocks = {}
     for block_id, block_cls in available_blocks.items():
@@ -115,11 +112,48 @@ def load_all_blocks() -> dict[str, type["AnyBlockSchema"]]:
     return filtered_blocks

-__all__ = ["load_all_blocks"]
-
-def all_subclasses(cls: type[T]) -> list[type[T]]:
+def _all_subclasses(cls: type[T]) -> list[type[T]]:
     subclasses = cls.__subclasses__()
     for subclass in subclasses:
-        subclasses += all_subclasses(subclass)
+        subclasses += _all_subclasses(subclass)
     return subclasses

# ============== Block access helper functions ============== #

def get_blocks() -> dict[str, Type["AnyBlockSchema"]]:
    return load_all_blocks()

# Note on the return type annotation: https://github.com/microsoft/pyright/issues/10281
def get_block(block_id: str) -> "AnyBlockSchema | None":
    cls = get_blocks().get(block_id)
    return cls() if cls else None

@cached(ttl_seconds=3600)
def get_webhook_block_ids() -> Sequence[str]:
    return [
        id
        for id, B in get_blocks().items()
        if B().block_type in (BlockType.WEBHOOK, BlockType.WEBHOOK_MANUAL)
    ]

@cached(ttl_seconds=3600)
def get_io_block_ids() -> Sequence[str]:
    return [
        id
        for id, B in get_blocks().items()
        if B().block_type in (BlockType.INPUT, BlockType.OUTPUT)
    ]

@cached(ttl_seconds=3600)
def get_human_in_the_loop_block_ids() -> Sequence[str]:
    return [
        id
        for id, B in get_blocks().items()
        if B().block_type == BlockType.HUMAN_IN_THE_LOOP
    ]
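A hedged usage sketch of the accessor layer added above; the block id shown is a placeholder (real ids are 36-character UUIDs, as the loader enforces).

from backend.blocks import get_block, get_blocks, get_io_block_ids

all_blocks = get_blocks()  # id -> Block subclass mapping, cached for 1h
block = get_block("00000000-0000-0000-0000-000000000000")  # placeholder id
print(block)               # a Block instance, or None if the id is unknown
print(get_io_block_ids())  # cached ids of INPUT/OUTPUT blocks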

View File

@@ -0,0 +1,739 @@
import inspect
import logging
from abc import ABC, abstractmethod
from enum import Enum
from typing import (
TYPE_CHECKING,
Any,
Callable,
ClassVar,
Generic,
Optional,
Type,
TypeAlias,
TypeVar,
cast,
get_origin,
)
import jsonref
import jsonschema
from pydantic import BaseModel
from backend.data.block import BlockInput, BlockOutput, BlockOutputEntry
from backend.data.model import (
Credentials,
CredentialsFieldInfo,
CredentialsMetaInput,
SchemaField,
is_credentials_field_name,
)
from backend.integrations.providers import ProviderName
from backend.util import json
from backend.util.exceptions import (
BlockError,
BlockExecutionError,
BlockInputError,
BlockOutputError,
BlockUnknownError,
)
from backend.util.settings import Config
logger = logging.getLogger(__name__)
if TYPE_CHECKING:
from backend.data.execution import ExecutionContext
from backend.data.model import ContributorDetails, NodeExecutionStats
from ..data.graph import Link
app_config = Config()
BlockTestOutput = BlockOutputEntry | tuple[str, Callable[[Any], bool]]
class BlockType(Enum):
STANDARD = "Standard"
INPUT = "Input"
OUTPUT = "Output"
NOTE = "Note"
WEBHOOK = "Webhook"
WEBHOOK_MANUAL = "Webhook (manual)"
AGENT = "Agent"
AI = "AI"
AYRSHARE = "Ayrshare"
HUMAN_IN_THE_LOOP = "Human In The Loop"
class BlockCategory(Enum):
AI = "Block that leverages AI to perform a task."
SOCIAL = "Block that interacts with social media platforms."
TEXT = "Block that processes text data."
SEARCH = "Block that searches or extracts information from the internet."
BASIC = "Block that performs basic operations."
INPUT = "Block that interacts with input of the graph."
OUTPUT = "Block that interacts with output of the graph."
LOGIC = "Programming logic to control the flow of your agent"
COMMUNICATION = "Block that interacts with communication platforms."
DEVELOPER_TOOLS = "Developer tools such as GitHub blocks."
DATA = "Block that interacts with structured data."
HARDWARE = "Block that interacts with hardware."
AGENT = "Block that interacts with other agents."
CRM = "Block that interacts with CRM services."
SAFETY = (
"Block that provides AI safety mechanisms such as detecting harmful content"
)
PRODUCTIVITY = "Block that helps with productivity"
ISSUE_TRACKING = "Block that helps with issue tracking"
MULTIMEDIA = "Block that interacts with multimedia content"
MARKETING = "Block that helps with marketing"
def dict(self) -> dict[str, str]:
return {"category": self.name, "description": self.value}
class BlockCostType(str, Enum):
RUN = "run" # cost X credits per run
BYTE = "byte" # cost X credits per byte
SECOND = "second" # cost X credits per second
class BlockCost(BaseModel):
cost_amount: int
cost_filter: BlockInput
cost_type: BlockCostType
def __init__(
self,
cost_amount: int,
cost_type: BlockCostType = BlockCostType.RUN,
cost_filter: Optional[BlockInput] = None,
**data: Any,
) -> None:
super().__init__(
cost_amount=cost_amount,
cost_filter=cost_filter or {},
cost_type=cost_type,
**data,
)
class BlockInfo(BaseModel):
id: str
name: str
inputSchema: dict[str, Any]
outputSchema: dict[str, Any]
costs: list[BlockCost]
description: str
categories: list[dict[str, str]]
contributors: list[dict[str, Any]]
staticOutput: bool
uiType: str
class BlockSchema(BaseModel):
cached_jsonschema: ClassVar[dict[str, Any]]
@classmethod
def jsonschema(cls) -> dict[str, Any]:
if cls.cached_jsonschema:
return cls.cached_jsonschema
model = jsonref.replace_refs(cls.model_json_schema(), merge_props=True)
def ref_to_dict(obj):
if isinstance(obj, dict):
# OpenAPI <3.1 does not support sibling fields alongside a $ref key
# So sometimes, the schema has an "allOf"/"anyOf"/"oneOf" with 1 item.
keys = {"allOf", "anyOf", "oneOf"}
one_key = next((k for k in keys if k in obj and len(obj[k]) == 1), None)
if one_key:
obj.update(obj[one_key][0])
return {
key: ref_to_dict(value)
for key, value in obj.items()
if not key.startswith("$") and key != one_key
}
elif isinstance(obj, list):
return [ref_to_dict(item) for item in obj]
return obj
cls.cached_jsonschema = cast(dict[str, Any], ref_to_dict(model))
return cls.cached_jsonschema
@classmethod
def validate_data(cls, data: BlockInput) -> str | None:
return json.validate_with_jsonschema(
schema=cls.jsonschema(),
data={k: v for k, v in data.items() if v is not None},
)
@classmethod
def get_mismatch_error(cls, data: BlockInput) -> str | None:
return cls.validate_data(data)
@classmethod
def get_field_schema(cls, field_name: str) -> dict[str, Any]:
model_schema = cls.jsonschema().get("properties", {})
if not model_schema:
raise ValueError(f"Invalid model schema {cls}")
property_schema = model_schema.get(field_name)
if not property_schema:
raise ValueError(f"Invalid property name {field_name}")
return property_schema
@classmethod
def validate_field(cls, field_name: str, data: BlockInput) -> str | None:
"""
Validate the data against a specific property (one of the input/output name).
Returns the validation error message if the data does not match the schema.
"""
try:
property_schema = cls.get_field_schema(field_name)
jsonschema.validate(json.to_dict(data), property_schema)
return None
except jsonschema.ValidationError as e:
return str(e)
@classmethod
def get_fields(cls) -> set[str]:
return set(cls.model_fields.keys())
@classmethod
def get_required_fields(cls) -> set[str]:
return {
field
for field, field_info in cls.model_fields.items()
if field_info.is_required()
}
@classmethod
def __pydantic_init_subclass__(cls, **kwargs):
"""Validates the schema definition. Rules:
- Fields with annotation `CredentialsMetaInput` MUST be
named `credentials` or `*_credentials`
- Fields named `credentials` or `*_credentials` MUST be
of type `CredentialsMetaInput`
"""
super().__pydantic_init_subclass__(**kwargs)
# Reset cached JSON schema to prevent inheriting it from parent class
cls.cached_jsonschema = {}
credentials_fields = cls.get_credentials_fields()
for field_name in cls.get_fields():
if is_credentials_field_name(field_name):
if field_name not in credentials_fields:
raise TypeError(
f"Credentials field '{field_name}' on {cls.__qualname__} "
f"is not of type {CredentialsMetaInput.__name__}"
)
CredentialsMetaInput.validate_credentials_field_schema(
cls.get_field_schema(field_name), field_name
)
elif field_name in credentials_fields:
raise KeyError(
f"Credentials field '{field_name}' on {cls.__qualname__} "
"has invalid name: must be 'credentials' or *_credentials"
)
@classmethod
def get_credentials_fields(cls) -> dict[str, type[CredentialsMetaInput]]:
return {
field_name: info.annotation
for field_name, info in cls.model_fields.items()
if (
inspect.isclass(info.annotation)
and issubclass(
get_origin(info.annotation) or info.annotation,
CredentialsMetaInput,
)
)
}
@classmethod
def get_auto_credentials_fields(cls) -> dict[str, dict[str, Any]]:
"""
Get fields that have auto_credentials metadata (e.g., GoogleDriveFileInput).
Returns a dict mapping kwarg_name -> {field_name, auto_credentials_config}
Raises:
ValueError: If multiple fields have the same kwarg_name, as this would
cause silent overwriting and only the last field would be processed.
"""
result: dict[str, dict[str, Any]] = {}
schema = cls.jsonschema()
properties = schema.get("properties", {})
for field_name, field_schema in properties.items():
auto_creds = field_schema.get("auto_credentials")
if auto_creds:
kwarg_name = auto_creds.get("kwarg_name", "credentials")
if kwarg_name in result:
raise ValueError(
f"Duplicate auto_credentials kwarg_name '{kwarg_name}' "
f"in fields '{result[kwarg_name]['field_name']}' and "
f"'{field_name}' on {cls.__qualname__}"
)
result[kwarg_name] = {
"field_name": field_name,
"config": auto_creds,
}
return result
@classmethod
def get_credentials_fields_info(cls) -> dict[str, CredentialsFieldInfo]:
result = {}
# Regular credentials fields
for field_name in cls.get_credentials_fields().keys():
result[field_name] = CredentialsFieldInfo.model_validate(
cls.get_field_schema(field_name), by_alias=True
)
# Auto-generated credentials fields (from GoogleDriveFileInput etc.)
for kwarg_name, info in cls.get_auto_credentials_fields().items():
config = info["config"]
# Build a schema-like dict that CredentialsFieldInfo can parse
auto_schema = {
"credentials_provider": [config.get("provider", "google")],
"credentials_types": [config.get("type", "oauth2")],
"credentials_scopes": config.get("scopes"),
}
result[kwarg_name] = CredentialsFieldInfo.model_validate(
auto_schema, by_alias=True
)
return result
@classmethod
def get_input_defaults(cls, data: BlockInput) -> BlockInput:
return data # Return as is, by default.
@classmethod
def get_missing_links(cls, data: BlockInput, links: list["Link"]) -> set[str]:
input_fields_from_nodes = {link.sink_name for link in links}
return input_fields_from_nodes - set(data)
@classmethod
def get_missing_input(cls, data: BlockInput) -> set[str]:
return cls.get_required_fields() - set(data)
class BlockSchemaInput(BlockSchema):
"""
Base schema class for block inputs.
All block input schemas should extend this class for consistency.
"""
pass
class BlockSchemaOutput(BlockSchema):
"""
Base schema class for block outputs that includes a standard error field.
All block output schemas should extend this class to ensure consistent error handling.
"""
error: str = SchemaField(
description="Error message if the operation failed", default=""
)
BlockSchemaInputType = TypeVar("BlockSchemaInputType", bound=BlockSchemaInput)
BlockSchemaOutputType = TypeVar("BlockSchemaOutputType", bound=BlockSchemaOutput)
class EmptyInputSchema(BlockSchemaInput):
pass
class EmptyOutputSchema(BlockSchemaOutput):
pass
# For backward compatibility - will be deprecated
EmptySchema = EmptyOutputSchema
# --8<-- [start:BlockWebhookConfig]
class BlockManualWebhookConfig(BaseModel):
"""
Configuration model for webhook-triggered blocks on which
the user has to manually set up the webhook at the provider.
"""
provider: ProviderName
"""The service provider that the webhook connects to"""
webhook_type: str
"""
Identifier for the webhook type. E.g. GitHub has repo and organization level hooks.
Only for use in the corresponding `WebhooksManager`.
"""
event_filter_input: str = ""
"""
Name of the block's event filter input.
Leave empty if the corresponding webhook doesn't have distinct event/payload types.
"""
event_format: str = "{event}"
"""
Template string for the event(s) that a block instance subscribes to.
Applied individually to each event selected in the event filter input.
Example: `"pull_request.{event}"` -> `"pull_request.opened"`
"""
class BlockWebhookConfig(BlockManualWebhookConfig):
"""
Configuration model for webhook-triggered blocks for which
the webhook can be automatically set up through the provider's API.
"""
resource_format: str
"""
Template string for the resource that a block instance subscribes to.
Fields will be filled from the block's inputs (except `payload`).
Example: `f"{repo}/pull_requests"` (note: not how it's actually implemented)
Only for use in the corresponding `WebhooksManager`.
"""
# --8<-- [end:BlockWebhookConfig]
class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
def __init__(
self,
id: str = "",
description: str = "",
contributors: list["ContributorDetails"] = [],
categories: set[BlockCategory] | None = None,
input_schema: Type[BlockSchemaInputType] = EmptyInputSchema,
output_schema: Type[BlockSchemaOutputType] = EmptyOutputSchema,
test_input: BlockInput | list[BlockInput] | None = None,
test_output: BlockTestOutput | list[BlockTestOutput] | None = None,
test_mock: dict[str, Any] | None = None,
test_credentials: Optional[Credentials | dict[str, Credentials]] = None,
disabled: bool = False,
static_output: bool = False,
block_type: BlockType = BlockType.STANDARD,
webhook_config: Optional[BlockWebhookConfig | BlockManualWebhookConfig] = None,
is_sensitive_action: bool = False,
):
"""
Initialize the block with the given schema.
Args:
id: The unique identifier for the block, this value will be persisted in the
DB. So it should be unique and constant across application runs.
Use the UUID format for the ID.
description: The description of the block, explaining what the block does.
contributors: The list of contributors who contributed to the block.
input_schema: The schema, defined as a Pydantic model, for the input data.
output_schema: The schema, defined as a Pydantic model, for the output data.
test_input: The list or single sample input data for the block, for testing.
test_output: The list or single expected output if the test_input is run.
test_mock: function names on the block implementation to mock on test run.
disabled: If the block is disabled, it will not be available for execution.
static_output: Whether the output links of the block are static by default.
"""
from backend.data.model import NodeExecutionStats
self.id = id
self.input_schema = input_schema
self.output_schema = output_schema
self.test_input = test_input
self.test_output = test_output
self.test_mock = test_mock
self.test_credentials = test_credentials
self.description = description
self.categories = categories or set()
self.contributors = contributors or set()
self.disabled = disabled
self.static_output = static_output
self.block_type = block_type
self.webhook_config = webhook_config
self.is_sensitive_action = is_sensitive_action
self.execution_stats: "NodeExecutionStats" = NodeExecutionStats()
if self.webhook_config:
if isinstance(self.webhook_config, BlockWebhookConfig):
# Enforce presence of credentials field on auto-setup webhook blocks
if not (cred_fields := self.input_schema.get_credentials_fields()):
raise TypeError(
"credentials field is required on auto-setup webhook blocks"
)
# Disallow multiple credentials inputs on webhook blocks
elif len(cred_fields) > 1:
raise ValueError(
"Multiple credentials inputs not supported on webhook blocks"
)
self.block_type = BlockType.WEBHOOK
else:
self.block_type = BlockType.WEBHOOK_MANUAL
# Enforce shape of webhook event filter, if present
if self.webhook_config.event_filter_input:
event_filter_field = self.input_schema.model_fields[
self.webhook_config.event_filter_input
]
if not (
isinstance(event_filter_field.annotation, type)
and issubclass(event_filter_field.annotation, BaseModel)
and all(
field.annotation is bool
for field in event_filter_field.annotation.model_fields.values()
)
):
raise NotImplementedError(
f"{self.name} has an invalid webhook event selector: "
"field must be a BaseModel and all its fields must be boolean"
)
# Enforce presence of 'payload' input
if "payload" not in self.input_schema.model_fields:
raise TypeError(
f"{self.name} is webhook-triggered but has no 'payload' input"
)
# Disable webhook-triggered block if webhook functionality not available
if not app_config.platform_base_url:
self.disabled = True
@abstractmethod
async def run(self, input_data: BlockSchemaInputType, **kwargs) -> BlockOutput:
"""
Run the block with the given input data.
Args:
input_data: The input data with the structure of input_schema.
Kwargs: currently (as of 14/02/2025) these include:
graph_id: The ID of the graph.
node_id: The ID of the node.
graph_exec_id: The ID of the graph execution.
node_exec_id: The ID of the node execution.
user_id: The ID of the user.
Returns:
A Generator that yields (output_name, output_data).
output_name: One of the output names defined in the Block's output_schema.
output_data: The data for the output_name, matching the defined schema.
"""
# --- satisfy the type checker, never executed -------------
if False: # noqa: SIM115
yield "name", "value" # pyright: ignore[reportMissingYield]
raise NotImplementedError(f"{self.name} does not implement the run method.")
async def run_once(
self, input_data: BlockSchemaInputType, output: str, **kwargs
) -> Any:
async for item in self.run(input_data, **kwargs):
name, data = item
if name == output:
return data
raise ValueError(f"{self.name} did not produce any output for {output}")
def merge_stats(self, stats: "NodeExecutionStats") -> "NodeExecutionStats":
self.execution_stats += stats
return self.execution_stats
@property
def name(self):
return self.__class__.__name__
def to_dict(self):
return {
"id": self.id,
"name": self.name,
"inputSchema": self.input_schema.jsonschema(),
"outputSchema": self.output_schema.jsonschema(),
"description": self.description,
"categories": [category.dict() for category in self.categories],
"contributors": [
contributor.model_dump() for contributor in self.contributors
],
"staticOutput": self.static_output,
"uiType": self.block_type.value,
}
def get_info(self) -> BlockInfo:
from backend.data.credit import get_block_cost
return BlockInfo(
id=self.id,
name=self.name,
inputSchema=self.input_schema.jsonschema(),
outputSchema=self.output_schema.jsonschema(),
costs=get_block_cost(self),
description=self.description,
categories=[category.dict() for category in self.categories],
contributors=[
contributor.model_dump() for contributor in self.contributors
],
staticOutput=self.static_output,
uiType=self.block_type.value,
)
async def execute(self, input_data: BlockInput, **kwargs) -> BlockOutput:
try:
async for output_name, output_data in self._execute(input_data, **kwargs):
yield output_name, output_data
except Exception as ex:
if isinstance(ex, BlockError):
raise ex
else:
raise (
BlockExecutionError
if isinstance(ex, ValueError)
else BlockUnknownError
)(
message=str(ex),
block_name=self.name,
block_id=self.id,
) from ex
async def is_block_exec_need_review(
self,
input_data: BlockInput,
*,
user_id: str,
node_id: str,
node_exec_id: str,
graph_exec_id: str,
graph_id: str,
graph_version: int,
execution_context: "ExecutionContext",
**kwargs,
) -> tuple[bool, BlockInput]:
"""
Check if this block execution needs human review and handle the review process.
Returns:
Tuple of (should_pause, input_data_to_use)
- should_pause: True if execution should be paused for review
- input_data_to_use: The input data to use (may be modified by reviewer)
"""
if not (
self.is_sensitive_action and execution_context.sensitive_action_safe_mode
):
return False, input_data
from backend.blocks.helpers.review import HITLReviewHelper
# Handle the review request and get decision
decision = await HITLReviewHelper.handle_review_decision(
input_data=input_data,
user_id=user_id,
node_id=node_id,
node_exec_id=node_exec_id,
graph_exec_id=graph_exec_id,
graph_id=graph_id,
graph_version=graph_version,
block_name=self.name,
editable=True,
)
if decision is None:
# We're awaiting review - pause execution
return True, input_data
if not decision.should_proceed:
# Review was rejected, raise an error to stop execution
raise BlockExecutionError(
message=f"Block execution rejected by reviewer: {decision.message}",
block_name=self.name,
block_id=self.id,
)
# Review was approved - use the potentially modified data
# ReviewResult.data must be a dict for block inputs
reviewed_data = decision.review_result.data
if not isinstance(reviewed_data, dict):
raise BlockExecutionError(
message=f"Review data must be a dict for block input, got {type(reviewed_data).__name__}",
block_name=self.name,
block_id=self.id,
)
return False, reviewed_data
async def _execute(self, input_data: BlockInput, **kwargs) -> BlockOutput:
# Check for review requirement only if running within a graph execution context
# Direct block execution (e.g., from chat) skips the review process
has_graph_context = all(
key in kwargs
for key in (
"node_exec_id",
"graph_exec_id",
"graph_id",
"execution_context",
)
)
if has_graph_context:
should_pause, input_data = await self.is_block_exec_need_review(
input_data, **kwargs
)
if should_pause:
return
# Validate the input data (original or reviewer-modified) once
if error := self.input_schema.validate_data(input_data):
raise BlockInputError(
message=f"Unable to execute block with invalid input data: {error}",
block_name=self.name,
block_id=self.id,
)
# Use the validated input data
async for output_name, output_data in self.run(
self.input_schema(**{k: v for k, v in input_data.items() if v is not None}),
**kwargs,
):
if output_name == "error":
raise BlockExecutionError(
message=output_data, block_name=self.name, block_id=self.id
)
if self.block_type == BlockType.STANDARD and (
error := self.output_schema.validate_field(output_name, output_data)
):
raise BlockOutputError(
message=f"Block produced an invalid output data: {error}",
block_name=self.name,
block_id=self.id,
)
yield output_name, output_data
def is_triggered_by_event_type(
self, trigger_config: dict[str, Any], event_type: str
) -> bool:
if not self.webhook_config:
raise TypeError("This method can't be used on non-trigger blocks")
if not self.webhook_config.event_filter_input:
return True
event_filter = trigger_config.get(self.webhook_config.event_filter_input)
if not event_filter:
raise ValueError("Event filter is not configured on trigger")
return event_type in [
self.webhook_config.event_format.format(event=k)
for k in event_filter
if event_filter[k] is True
]
# Type alias for any block with standard input/output schemas
AnyBlockSchema: TypeAlias = Block[BlockSchemaInput, BlockSchemaOutput]
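The file above defines the whole Block contract in one place. A minimal sketch of a block written against it, inferred from the constructor signature and the run() docstring (EchoBlock and its UUID are hypothetical, not part of this diff):

from backend.blocks._base import (
    Block,
    BlockCategory,
    BlockOutput,
    BlockSchemaInput,
    BlockSchemaOutput,
)
from backend.data.model import SchemaField

class EchoBlock(Block):
    class Input(BlockSchemaInput):
        text: str = SchemaField(description="Text to echo back")

    class Output(BlockSchemaOutput):
        echoed: str = SchemaField(description="The echoed text")

    def __init__(self):
        super().__init__(
            id="11111111-1111-1111-1111-111111111111",  # placeholder UUID
            description="Echoes its input back as output",
            categories={BlockCategory.BASIC},
            input_schema=EchoBlock.Input,
            output_schema=EchoBlock.Output,
        )

    async def run(self, input_data: Input, **kwargs) -> BlockOutput:
        # run() is an async generator yielding (output_name, output_data)
        yield "echoed", input_data.text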

View File

@@ -0,0 +1,122 @@
import logging
import os
from backend.integrations.providers import ProviderName
from ._base import AnyBlockSchema
logger = logging.getLogger(__name__)
def is_block_auth_configured(
block_cls: type[AnyBlockSchema],
) -> bool:
"""
Check if a block has a valid authentication method configured at runtime.
For example, if a block is OAuth-only and the required env vars are not set,
do not show it in the UI.
"""
from backend.sdk.registry import AutoRegistry
# Create an instance to access input_schema
try:
block = block_cls()
except Exception as e:
# If we can't create a block instance, assume it's not OAuth-only
logger.error(f"Error creating block instance for {block_cls.__name__}: {e}")
return True
logger.debug(
f"Checking if block {block_cls.__name__} has a valid provider configured"
)
# Get all credential inputs from input schema
credential_inputs = block.input_schema.get_credentials_fields_info()
required_inputs = block.input_schema.get_required_fields()
if not credential_inputs:
logger.debug(
f"Block {block_cls.__name__} has no credential inputs - Treating as valid"
)
return True
# Check credential inputs
if len(required_inputs.intersection(credential_inputs.keys())) == 0:
logger.debug(
f"Block {block_cls.__name__} has only optional credential inputs"
" - will work without credentials configured"
)
# Check if the credential inputs for this block are correctly configured
for field_name, field_info in credential_inputs.items():
provider_names = field_info.provider
if not provider_names:
logger.warning(
f"Block {block_cls.__name__} "
f"has credential input '{field_name}' with no provider options"
" - Disabling"
)
return False
# If a field has multiple possible providers, each one needs to be usable to
# prevent breaking the UX
for _provider_name in provider_names:
provider_name = _provider_name.value
if provider_name in ProviderName.__members__.values():
logger.debug(
f"Block {block_cls.__name__} credential input '{field_name}' "
f"provider '{provider_name}' is part of the legacy provider system"
" - Treating as valid"
)
break
provider = AutoRegistry.get_provider(provider_name)
if not provider:
logger.warning(
f"Block {block_cls.__name__} credential input '{field_name}' "
f"refers to unknown provider '{provider_name}' - Disabling"
)
return False
# Check the provider's supported auth types
if field_info.supported_types != provider.supported_auth_types:
logger.warning(
f"Block {block_cls.__name__} credential input '{field_name}' "
f"has mismatched supported auth types (field <> Provider): "
f"{field_info.supported_types} != {provider.supported_auth_types}"
)
if not (supported_auth_types := provider.supported_auth_types):
# No auth methods have been configured for this provider
logger.warning(
f"Block {block_cls.__name__} credential input '{field_name}' "
f"provider '{provider_name}' "
"has no authentication methods configured - Disabling"
)
return False
# Check if provider supports OAuth
if "oauth2" in supported_auth_types:
# Check if OAuth environment variables are set
if (oauth_config := provider.oauth_config) and bool(
os.getenv(oauth_config.client_id_env_var)
and os.getenv(oauth_config.client_secret_env_var)
):
logger.debug(
f"Block {block_cls.__name__} credential input '{field_name}' "
f"provider '{provider_name}' is configured for OAuth"
)
else:
logger.error(
f"Block {block_cls.__name__} credential input '{field_name}' "
f"provider '{provider_name}' "
"is missing OAuth client ID or secret - Disabling"
)
return False
logger.debug(
f"Block {block_cls.__name__} credential input '{field_name}' is valid; "
f"supported credential types: {', '.join(field_info.supported_types)}"
)
return True
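A sketch of how this check composes with the block loader shown at the top of this section; load_all_blocks() already applies the filter internally, so the loop here is purely illustrative:

from backend.blocks import load_all_blocks
from backend.blocks._utils import is_block_auth_configured

usable = {
    block_id: block_cls
    for block_id, block_cls in load_all_blocks().items()
    if is_block_auth_configured(block_cls)
}
print(f"{len(usable)} blocks have a usable auth configuration")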

View File

@@ -1,7 +1,7 @@
import logging
- from typing import Any, Optional
+ from typing import TYPE_CHECKING, Any, Optional
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockInput,
@@ -9,13 +9,15 @@ from backend.data.block import (
BlockSchema,
BlockSchemaInput,
BlockType,
- get_block,
)
from backend.data.execution import ExecutionContext, ExecutionStatus, NodesInputMasks
from backend.data.model import NodeExecutionStats, SchemaField
from backend.util.json import validate_with_jsonschema
from backend.util.retry import func_retry
+ if TYPE_CHECKING:
+ from backend.executor.utils import LogMetadata
_logger = logging.getLogger(__name__)
@@ -124,9 +126,10 @@ class AgentExecutorBlock(Block):
graph_version: int,
graph_exec_id: str,
user_id: str,
- logger,
+ logger: "LogMetadata",
) -> BlockOutput:
+ from backend.blocks import get_block
from backend.data.execution import ExecutionEventType
from backend.executor import utils as execution_utils
@@ -198,7 +201,7 @@ class AgentExecutorBlock(Block):
self,
graph_exec_id: str,
user_id: str,
- logger,
+ logger: "LogMetadata",
) -> None:
from backend.executor import utils as execution_utils

View File

@@ -1,5 +1,11 @@
from typing import Any
+ from backend.blocks._base import (
+ BlockCategory,
+ BlockOutput,
+ BlockSchemaInput,
+ BlockSchemaOutput,
+ )
from backend.blocks.llm import (
DEFAULT_LLM_MODEL,
TEST_CREDENTIALS,
@@ -11,12 +17,6 @@ from backend.blocks.llm import (
LLMResponse,
llm_call,
)
- from backend.data.block import (
- BlockCategory,
- BlockOutput,
- BlockSchemaInput,
- BlockSchemaOutput,
- )
from backend.data.model import APIKeyCredentials, NodeExecutionStats, SchemaField

View File

@@ -6,7 +6,7 @@ from pydantic import SecretStr
from replicate.client import Client as ReplicateClient
from replicate.helpers import FileOutput
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -5,7 +5,12 @@ from pydantic import SecretStr
from replicate.client import Client as ReplicateClient
from replicate.helpers import FileOutput
- from backend.data.block import Block, BlockCategory, BlockSchemaInput, BlockSchemaOutput
+ from backend.blocks._base import (
+ Block,
+ BlockCategory,
+ BlockSchemaInput,
+ BlockSchemaOutput,
+ )
from backend.data.execution import ExecutionContext
from backend.data.model import (
APIKeyCredentials,

View File

@@ -6,7 +6,7 @@ from typing import Literal
from pydantic import SecretStr
from replicate.client import Client as ReplicateClient
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -6,7 +6,7 @@ from typing import Literal
from pydantic import SecretStr
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -1,3 +1,10 @@
+ from backend.blocks._base import (
+ Block,
+ BlockCategory,
+ BlockOutput,
+ BlockSchemaInput,
+ BlockSchemaOutput,
+ )
from backend.blocks.apollo._api import ApolloClient
from backend.blocks.apollo._auth import (
TEST_CREDENTIALS,
@@ -10,13 +17,6 @@ from backend.blocks.apollo.models import (
PrimaryPhone,
SearchOrganizationsRequest,
)
- from backend.data.block import (
- Block,
- BlockCategory,
- BlockOutput,
- BlockSchemaInput,
- BlockSchemaOutput,
- )
from backend.data.model import CredentialsField, SchemaField

View File

@@ -1,5 +1,12 @@
import asyncio
+ from backend.blocks._base import (
+ Block,
+ BlockCategory,
+ BlockOutput,
+ BlockSchemaInput,
+ BlockSchemaOutput,
+ )
from backend.blocks.apollo._api import ApolloClient
from backend.blocks.apollo._auth import (
TEST_CREDENTIALS,
@@ -14,13 +21,6 @@ from backend.blocks.apollo.models import (
SearchPeopleRequest,
SenorityLevels,
)
- from backend.data.block import (
- Block,
- BlockCategory,
- BlockOutput,
- BlockSchemaInput,
- BlockSchemaOutput,
- )
from backend.data.model import CredentialsField, SchemaField

View File

@@ -1,3 +1,10 @@
+ from backend.blocks._base import (
+ Block,
+ BlockCategory,
+ BlockOutput,
+ BlockSchemaInput,
+ BlockSchemaOutput,
+ )
from backend.blocks.apollo._api import ApolloClient
from backend.blocks.apollo._auth import (
TEST_CREDENTIALS,
@@ -6,13 +13,6 @@ from backend.blocks.apollo._auth import (
ApolloCredentialsInput,
)
from backend.blocks.apollo.models import Contact, EnrichPersonRequest
- from backend.data.block import (
- Block,
- BlockCategory,
- BlockOutput,
- BlockSchemaInput,
- BlockSchemaOutput,
- )
from backend.data.model import CredentialsField, SchemaField

View File

@@ -3,7 +3,7 @@ from typing import Optional
from pydantic import BaseModel, Field
- from backend.data.block import BlockSchemaInput
+ from backend.blocks._base import BlockSchemaInput
from backend.data.model import SchemaField, UserIntegrations
from backend.integrations.ayrshare import AyrshareClient
from backend.util.clients import get_database_manager_async_client

View File

@@ -1,7 +1,7 @@
import enum
from typing import Any
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -2,7 +2,7 @@ import os
import re
from typing import Type
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -1,7 +1,7 @@
from enum import Enum
from typing import Any
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -1,12 +1,12 @@
import json
import shlex
import uuid
- from typing import Literal, Optional
+ from typing import TYPE_CHECKING, Literal, Optional
from e2b import AsyncSandbox as BaseAsyncSandbox
- from pydantic import BaseModel, SecretStr
+ from pydantic import SecretStr
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,
@@ -20,6 +20,13 @@ from backend.data.model import (
SchemaField,
)
from backend.integrations.providers import ProviderName
+ from backend.util.sandbox_files import (
+ SandboxFileOutput,
+ extract_and_store_sandbox_files,
+ )
+ if TYPE_CHECKING:
+ from backend.executor.utils import ExecutionContext
class ClaudeCodeExecutionError(Exception):
@@ -174,22 +181,15 @@ class ClaudeCodeBlock(Block):
advanced=True,
)
- class FileOutput(BaseModel):
- """A file extracted from the sandbox."""
- path: str
- relative_path: str  # Path relative to working directory (for GitHub, etc.)
- name: str
- content: str
class Output(BlockSchemaOutput):
response: str = SchemaField(
description="The output/response from Claude Code execution"
)
- files: list["ClaudeCodeBlock.FileOutput"] = SchemaField(
+ files: list[SandboxFileOutput] = SchemaField(
description=(
"List of text files created/modified by Claude Code during this execution. "
- "Each file has 'path', 'relative_path', 'name', and 'content' fields."
+ "Each file has 'path', 'relative_path', 'name', 'content', and 'workspace_ref' fields. "
+ "workspace_ref contains a workspace:// URI if the file was stored to workspace."
)
)
conversation_history: str = SchemaField(
@@ -252,6 +252,7 @@ class ClaudeCodeBlock(Block):
"relative_path": "index.html",
"name": "index.html",
"content": "<html>Hello World</html>",
+ "workspace_ref": None,
}
],
),
@@ -267,11 +268,12 @@ class ClaudeCodeBlock(Block):
"execute_claude_code": lambda *args, **kwargs: (
"Created index.html with hello world content",  # response
[
- ClaudeCodeBlock.FileOutput(
+ SandboxFileOutput(
path="/home/user/index.html",
relative_path="index.html",
name="index.html",
content="<html>Hello World</html>",
+ workspace_ref=None,
)
],  # files
"User: Create a hello world HTML file\n"
@@ -294,7 +296,8 @@ class ClaudeCodeBlock(Block):
existing_sandbox_id: str,
conversation_history: str,
dispose_sandbox: bool,
- ) -> tuple[str, list["ClaudeCodeBlock.FileOutput"], str, str, str]:
+ execution_context: "ExecutionContext",
+ ) -> tuple[str, list[SandboxFileOutput], str, str, str]:
"""
Execute Claude Code in an E2B sandbox.
@@ -449,14 +452,18 @@ class ClaudeCodeBlock(Block):
else:
new_conversation_history = turn_entry
- # Extract files created/modified during this run
- files = await self._extract_files(
- sandbox, working_directory, start_timestamp
+ # Extract files created/modified during this run and store to workspace
+ sandbox_files = await extract_and_store_sandbox_files(
+ sandbox=sandbox,
+ working_directory=working_directory,
+ execution_context=execution_context,
+ since_timestamp=start_timestamp,
+ text_only=True,
)
return (
response,
- files,
+ sandbox_files,  # Already SandboxFileOutput objects
new_conversation_history,
current_session_id,
sandbox_id,
@@ -471,140 +478,6 @@ class ClaudeCodeBlock(Block):
if dispose_sandbox and sandbox:
await sandbox.kill()
async def _extract_files(
self,
sandbox: BaseAsyncSandbox,
working_directory: str,
since_timestamp: str | None = None,
) -> list["ClaudeCodeBlock.FileOutput"]:
"""
Extract text files created/modified during this Claude Code execution.
Args:
sandbox: The E2B sandbox instance
working_directory: Directory to search for files
since_timestamp: ISO timestamp - only return files modified after this time
Returns:
List of FileOutput objects with path, relative_path, name, and content
"""
files: list[ClaudeCodeBlock.FileOutput] = []
# Text file extensions we can safely read as text
text_extensions = {
".txt",
".md",
".html",
".htm",
".css",
".js",
".ts",
".jsx",
".tsx",
".json",
".xml",
".yaml",
".yml",
".toml",
".ini",
".cfg",
".conf",
".py",
".rb",
".php",
".java",
".c",
".cpp",
".h",
".hpp",
".cs",
".go",
".rs",
".swift",
".kt",
".scala",
".sh",
".bash",
".zsh",
".sql",
".graphql",
".env",
".gitignore",
".dockerfile",
"Dockerfile",
".vue",
".svelte",
".astro",
".mdx",
".rst",
".tex",
".csv",
".log",
}
try:
# List files recursively using find command
# Exclude node_modules and .git directories, but allow hidden files
# like .env and .gitignore (they're filtered by text_extensions later)
# Filter by timestamp to only get files created/modified during this run
safe_working_dir = shlex.quote(working_directory)
timestamp_filter = ""
if since_timestamp:
timestamp_filter = f"-newermt {shlex.quote(since_timestamp)} "
find_result = await sandbox.commands.run(
f"find {safe_working_dir} -type f "
f"{timestamp_filter}"
f"-not -path '*/node_modules/*' "
f"-not -path '*/.git/*' "
f"2>/dev/null"
)
if find_result.stdout:
for file_path in find_result.stdout.strip().split("\n"):
if not file_path:
continue
# Check if it's a text file we can read
is_text = any(
file_path.endswith(ext) for ext in text_extensions
) or file_path.endswith("Dockerfile")
if is_text:
try:
content = await sandbox.files.read(file_path)
# Handle bytes or string
if isinstance(content, bytes):
content = content.decode("utf-8", errors="replace")
# Extract filename from path
file_name = file_path.split("/")[-1]
# Calculate relative path by stripping working directory
relative_path = file_path
if file_path.startswith(working_directory):
relative_path = file_path[len(working_directory) :]
# Remove leading slash if present
if relative_path.startswith("/"):
relative_path = relative_path[1:]
files.append(
ClaudeCodeBlock.FileOutput(
path=file_path,
relative_path=relative_path,
name=file_name,
content=content,
)
)
except Exception:
# Skip files that can't be read
pass
except Exception:
# If file extraction fails, return empty results
pass
return files
def _escape_prompt(self, prompt: str) -> str:
"""Escape the prompt for safe shell execution."""
# Use single quotes and escape any single quotes in the prompt
@@ -617,6 +490,7 @@ class ClaudeCodeBlock(Block):
*,
e2b_credentials: APIKeyCredentials,
anthropic_credentials: APIKeyCredentials,
+ execution_context: "ExecutionContext",
**kwargs,
) -> BlockOutput:
try:
@@ -637,6 +511,7 @@ class ClaudeCodeBlock(Block):
existing_sandbox_id=input_data.sandbox_id,
conversation_history=input_data.conversation_history,
dispose_sandbox=input_data.dispose_sandbox,
+ execution_context=execution_context,
)
yield "response", response

View File

@@ -1,12 +1,12 @@
from enum import Enum
- from typing import Any, Literal, Optional
+ from typing import TYPE_CHECKING, Any, Literal, Optional
from e2b_code_interpreter import AsyncSandbox
from e2b_code_interpreter import Result as E2BExecutionResult
from e2b_code_interpreter.charts import Chart as E2BExecutionResultChart
from pydantic import BaseModel, Field, JsonValue, SecretStr
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,
@@ -20,6 +20,13 @@ from backend.data.model import (
SchemaField,
)
from backend.integrations.providers import ProviderName
+ from backend.util.sandbox_files import (
+ SandboxFileOutput,
+ extract_and_store_sandbox_files,
+ )
+ if TYPE_CHECKING:
+ from backend.executor.utils import ExecutionContext
TEST_CREDENTIALS = APIKeyCredentials(
id="01234567-89ab-cdef-0123-456789abcdef",
@@ -85,6 +92,9 @@ class CodeExecutionResult(MainCodeExecutionResult):
class BaseE2BExecutorMixin:
"""Shared implementation methods for E2B executor blocks."""
+ # Default working directory in E2B sandboxes
+ WORKING_DIR = "/home/user"
async def execute_code(
self,
api_key: str,
@@ -95,14 +105,21 @@
timeout: Optional[int] = None,
sandbox_id: Optional[str] = None,
dispose_sandbox: bool = False,
+ execution_context: Optional["ExecutionContext"] = None,
+ extract_files: bool = False,
):
"""
Unified code execution method that handles all three use cases:
1. Create new sandbox and execute (ExecuteCodeBlock)
2. Create new sandbox, execute, and return sandbox_id (InstantiateCodeSandboxBlock)
3. Connect to existing sandbox and execute (ExecuteCodeStepBlock)
+ Args:
+ extract_files: If True and execution_context provided, extract files
+ created/modified during execution and store to workspace.
""" # noqa
sandbox = None
+ files: list[SandboxFileOutput] = []
try:
if sandbox_id:
# Connect to existing sandbox (ExecuteCodeStepBlock case)
@@ -118,6 +135,12 @@
for cmd in setup_commands:
await sandbox.commands.run(cmd)
+ # Capture timestamp before execution to scope file extraction
+ start_timestamp = None
+ if extract_files:
+ ts_result = await sandbox.commands.run("date -u +%Y-%m-%dT%H:%M:%S")
+ start_timestamp = ts_result.stdout.strip() if ts_result.stdout else None
# Execute the code
execution = await sandbox.run_code(
code,
@@ -133,7 +156,24 @@
stdout_logs = "".join(execution.logs.stdout)
stderr_logs = "".join(execution.logs.stderr)
- return results, text_output, stdout_logs, stderr_logs, sandbox.sandbox_id
+ # Extract files created/modified during this execution
+ if extract_files and execution_context:
+ files = await extract_and_store_sandbox_files(
+ sandbox=sandbox,
+ working_directory=self.WORKING_DIR,
+ execution_context=execution_context,
+ since_timestamp=start_timestamp,
+ text_only=False,  # Include binary files too
+ )
+ return (
+ results,
+ text_output,
+ stdout_logs,
+ stderr_logs,
+ sandbox.sandbox_id,
+ files,
+ )
finally:
# Dispose of sandbox if requested to reduce usage costs
if dispose_sandbox and sandbox:
@@ -238,6 +278,12 @@ class ExecuteCodeBlock(Block, BaseE2BExecutorMixin):
description="Standard output logs from execution"
)
stderr_logs: str = SchemaField(description="Standard error logs from execution")
+ files: list[SandboxFileOutput] = SchemaField(
+ description=(
+ "Files created or modified during execution. "
+ "Each file has path, name, content, and workspace_ref (if stored)."
+ ),
+ )
def __init__(self):
super().__init__(
@@ -259,23 +305,30 @@ class ExecuteCodeBlock(Block, BaseE2BExecutorMixin):
("results", []),
("response", "Hello World"),
("stdout_logs", "Hello World\n"),
+ ("files", []),
],
test_mock={
- "execute_code": lambda api_key, code, language, template_id, setup_commands, timeout, dispose_sandbox: (  # noqa
+ "execute_code": lambda api_key, code, language, template_id, setup_commands, timeout, dispose_sandbox, execution_context, extract_files: (  # noqa
[],  # results
"Hello World",  # text_output
"Hello World\n",  # stdout_logs
"",  # stderr_logs
"sandbox_id",  # sandbox_id
+ [],  # files
),
},
)
async def run(
- self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
+ self,
+ input_data: Input,
+ *,
+ credentials: APIKeyCredentials,
+ execution_context: "ExecutionContext",
+ **kwargs,
) -> BlockOutput:
try:
- results, text_output, stdout, stderr, _ = await self.execute_code(
+ results, text_output, stdout, stderr, _, files = await self.execute_code(
api_key=credentials.api_key.get_secret_value(),
code=input_data.code,
language=input_data.language,
@@ -283,6 +336,8 @@ class ExecuteCodeBlock(Block, BaseE2BExecutorMixin):
setup_commands=input_data.setup_commands,
timeout=input_data.timeout,
dispose_sandbox=input_data.dispose_sandbox,
+ execution_context=execution_context,
+ extract_files=True,
)
# Determine result object shape & filter out empty formats
@@ -296,6 +351,8 @@ class ExecuteCodeBlock(Block, BaseE2BExecutorMixin):
yield "stdout_logs", stdout
if stderr:
yield "stderr_logs", stderr
+ # Always yield files (empty list if none)
+ yield "files", [f.model_dump() for f in files]
except Exception as e:
yield "error", str(e)
@@ -393,6 +450,7 @@ class InstantiateCodeSandboxBlock(Block, BaseE2BExecutorMixin):
"Hello World\n",  # stdout_logs
"",  # stderr_logs
"sandbox_id",  # sandbox_id
+ [],  # files
),
},
)
@@ -401,7 +459,7 @@ class InstantiateCodeSandboxBlock(Block, BaseE2BExecutorMixin):
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
try:
- _, text_output, stdout, stderr, sandbox_id = await self.execute_code(
+ _, text_output, stdout, stderr, sandbox_id, _ = await self.execute_code(
api_key=credentials.api_key.get_secret_value(),
code=input_data.setup_code,
language=input_data.language,
@@ -500,6 +558,7 @@ class ExecuteCodeStepBlock(Block, BaseE2BExecutorMixin):
"Hello World\n",  # stdout_logs
"",  # stderr_logs
sandbox_id,  # sandbox_id
+ [],  # files
),
},
)
@@ -508,7 +567,7 @@ class ExecuteCodeStepBlock(Block, BaseE2BExecutorMixin):
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
try:
- results, text_output, stdout, stderr, _ = await self.execute_code(
+ results, text_output, stdout, stderr, _, _ = await self.execute_code(
api_key=credentials.api_key.get_secret_value(),
code=input_data.step_code,
language=input_data.language,
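The timestamp capture above is what scopes extraction to a single run: a UTC timestamp is taken before user code executes, and only files modified after it are collected (the removed _extract_files did this with find -newermt; the shared helper presumably keeps the same approach). A condensed sketch of the sequence, with a hypothetical wrapper function:

from backend.util.sandbox_files import extract_and_store_sandbox_files

async def run_and_collect(sandbox, code, execution_context):
    # Capture a UTC timestamp first so extraction can be scoped to this run
    ts = await sandbox.commands.run("date -u +%Y-%m-%dT%H:%M:%S")
    start_timestamp = ts.stdout.strip() if ts.stdout else None
    execution = await sandbox.run_code(code)
    files = await extract_and_store_sandbox_files(
        sandbox=sandbox,
        working_directory="/home/user",  # WORKING_DIR default above
        execution_context=execution_context,
        since_timestamp=start_timestamp,
        text_only=False,  # the code executor keeps binaries too
    )
    return execution, files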

View File

@@ -1,6 +1,6 @@
import re
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -6,7 +6,7 @@ from openai import AsyncOpenAI
from openai.types.responses import Response as OpenAIResponse
from pydantic import SecretStr
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -1,6 +1,6 @@
from pydantic import BaseModel
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockManualWebhookConfig,

View File

@@ -1,4 +1,4 @@
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -1,6 +1,6 @@
from typing import Any, List
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -1,6 +1,6 @@
import codecs
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -8,7 +8,7 @@ from typing import Any, Literal, cast
import discord
from pydantic import SecretStr
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -2,7 +2,7 @@
Discord OAuth-based blocks.
"""
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -7,7 +7,7 @@ from typing import Literal
from pydantic import BaseModel, ConfigDict, SecretStr
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -2,7 +2,7 @@
import codecs
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -8,7 +8,7 @@ which provides access to LinkedIn profile data and related information.
import logging
from typing import Optional
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -3,6 +3,13 @@ import logging
from enum import Enum
from typing import Any
+ from backend.blocks._base import (
+ Block,
+ BlockCategory,
+ BlockOutput,
+ BlockSchemaInput,
+ BlockSchemaOutput,
+ )
from backend.blocks.fal._auth import (
TEST_CREDENTIALS,
TEST_CREDENTIALS_INPUT,
@@ -10,13 +17,6 @@ from backend.blocks.fal._auth import (
FalCredentialsField,
FalCredentialsInput,
)
- from backend.data.block import (
- Block,
- BlockCategory,
- BlockOutput,
- BlockSchemaInput,
- BlockSchemaOutput,
- )
from backend.data.execution import ExecutionContext
from backend.data.model import SchemaField
from backend.util.file import store_media_file

View File

@@ -5,7 +5,7 @@ from pydantic import SecretStr
from replicate.client import Client as ReplicateClient
from replicate.helpers import FileOutput
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -3,7 +3,7 @@ from typing import Optional
from pydantic import BaseModel
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -5,7 +5,7 @@ from typing import Optional
from typing_extensions import TypedDict
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -3,7 +3,7 @@ from urllib.parse import urlparse
from typing_extensions import TypedDict
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -2,7 +2,7 @@ import re
from typing_extensions import TypedDict
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -2,7 +2,7 @@ import base64
from typing_extensions import TypedDict
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -4,7 +4,7 @@ from typing import Any, List, Optional
from typing_extensions import TypedDict
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -3,7 +3,7 @@ from typing import Optional
from pydantic import BaseModel
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -4,7 +4,7 @@ from pathlib import Path
from pydantic import BaseModel
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -8,7 +8,7 @@ from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from pydantic import BaseModel
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -7,14 +7,14 @@ from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from gravitas_md2gdocs import to_requests
- from backend.blocks.google._drive import GoogleDriveFile, GoogleDriveFileField
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
+ from backend.blocks.google._drive import GoogleDriveFile, GoogleDriveFileField
from backend.data.model import SchemaField
from backend.util.settings import Settings

View File

@@ -14,7 +14,7 @@ from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from pydantic import BaseModel, Field
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -7,14 +7,14 @@ from enum import Enum
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
- from backend.blocks.google._drive import GoogleDriveFile, GoogleDriveFileField
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
+ from backend.blocks.google._drive import GoogleDriveFile, GoogleDriveFileField
from backend.data.model import SchemaField
from backend.util.settings import Settings

View File

@@ -3,7 +3,7 @@ from typing import Literal
import googlemaps
from pydantic import BaseModel, SecretStr
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

View File

@@ -9,9 +9,7 @@ from typing import Any, Optional
from prisma.enums import ReviewStatus
from pydantic import BaseModel
- from backend.data.execution import ExecutionStatus
from backend.data.human_review import ReviewResult
- from backend.executor.manager import async_update_node_execution_status
from backend.util.clients import get_database_manager_async_client
logger = logging.getLogger(__name__)
@@ -43,6 +41,8 @@ class HITLReviewHelper:
@staticmethod
async def update_node_execution_status(**kwargs) -> None:
"""Update the execution status of a node."""
+ from backend.executor.manager import async_update_node_execution_status
await async_update_node_execution_status(
db_client=get_database_manager_async_client(), **kwargs
)
@@ -88,12 +88,13 @@ class HITLReviewHelper:
Raises:
Exception: If review creation or status update fails
"""
+ from backend.data.execution import ExecutionStatus
# Note: Safe mode checks (human_in_the_loop_safe_mode, sensitive_action_safe_mode)
# are handled by the caller:
# - HITL blocks check human_in_the_loop_safe_mode in their run() method
# - Sensitive action blocks check sensitive_action_safe_mode in is_block_exec_need_review()
# This function only handles checking for existing approvals.
# Check if this node has already been approved (normal or auto-approval)
if approval_result := await HITLReviewHelper.check_approval(
node_exec_id=node_exec_id,
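The two imports moved into function bodies above follow the standard deferred-import pattern, presumably to break an import cycle with backend.executor.manager; resolution happens at call time rather than module-load time. In generic form:

# Generic shape of the deferred-import pattern applied in this file
async def update_status(**kwargs) -> None:
    # Imported here, not at module top, so this module can load even while
    # backend.executor.manager is still initializing
    from backend.executor.manager import async_update_node_execution_status

    await async_update_node_execution_status(**kwargs)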

View File

@@ -8,7 +8,7 @@ from typing import Literal
import aiofiles
from pydantic import SecretStr
- from backend.data.block import (
+ from backend.blocks._base import (
Block,
BlockCategory,
BlockOutput,

Some files were not shown because too many files have changed in this diff.