Compare commits


28 Commits

Author SHA1 Message Date
Zamil Majdy
8b8e1df739 fix(backend/chat): Auto-expire stale running tasks to unblock sessions
Tasks stuck in "running" status beyond stream_timeout (300s) are now
auto-marked as failed when looked up, preventing zombie tasks from
blocking the session indefinitely.
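A minimal sketch of the expiry check, assuming illustrative names (`get_task`, `mark_task_completed`, and a `status`/`updated_at` shape on the task record) rather than the actual registry API:

```python
import time

STREAM_TIMEOUT = 300  # seconds, mirroring stream_timeout


async def get_task_checked(task_id: str):
    """Expire zombie 'running' tasks at lookup time (hypothetical helper)."""
    task = await get_task(task_id)  # assumed accessor
    if (
        task is not None
        and task.status == "running"
        and time.time() - task.updated_at > STREAM_TIMEOUT
    ):
        # A stream cannot outlive its timeout, so the task must be dead:
        # mark it failed instead of letting it block the session forever.
        await mark_task_completed(task_id, "failed")
        task.status = "failed"
    return task
```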
2026-02-10 15:35:43 +04:00
Zamil Majdy
602a0a4fb1 fix(backend/chat): Strip tool call noise from conversation history context 2026-02-10 14:11:27 +04:00
Zamil Majdy
8d7d531ae0 refactor(backend/chat): Remove unused max_context_messages config 2026-02-10 13:57:33 +04:00
Zamil Majdy
43153a12e0 fix(backend/chat): Remove manual context truncation from SDK path, let SDK handle compaction 2026-02-10 13:52:49 +04:00
Zamil Majdy
587e11c60a refactor(backend/chat): Extract MCP server name constants to avoid hardcoded strings 2026-02-10 12:12:08 +04:00
Zamil Majdy
57da545e02 Merge branch 'dev' into feat/copitlot-claude-code 2026-02-10 12:10:35 +04:00
Otto
81f8290f01 debug(backend/db): Add diagnostic logging for vector type errors (#12024)
Adds diagnostic logging when the `type vector does not exist` error
occurs in raw SQL queries.

## Problem

We're seeing intermittent "type vector does not exist" errors on
dev-behave ([Sentry
issue](https://significant-gravitas.sentry.io/issues/7205929979/)). The
pgvector extension should be in the search_path, but occasionally
queries fail to resolve the vector type.

## Solution

When a query fails with this specific error, we now log:
- `SHOW search_path` - what schemas are being searched
- `SELECT current_schema()` - the active schema
- `SELECT current_user, session_user, current_database()` - connection
context

This diagnostic info will help identify why the vector extension isn't
visible in certain cases.

## Changes

- Added `_log_vector_error_diagnostics()` helper function in
`backend/data/db.py` (sketched below)
- Wrapped SQL execution in try/except to catch and diagnose vector type
errors
- Original exception is re-raised after logging (no behavior change)
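A rough sketch of the wrapper described above, assuming prisma-client-py's `query_raw` (the helper name comes from this PR; the call shapes around it are illustrative):

```python
import logging

from prisma import Prisma  # assumed client handle

logger = logging.getLogger(__name__)


async def _log_vector_error_diagnostics(db: Prisma) -> None:
    """Log connection context when the vector type fails to resolve."""
    for label, query in [
        ("search_path", "SHOW search_path"),
        ("current_schema", "SELECT current_schema()"),
        ("connection", "SELECT current_user, session_user, current_database()"),
    ]:
        try:
            result = await db.query_raw(query)
            logger.error(f"[vector-diagnostics] {label}: {result}")
        except Exception as e:
            logger.error(f"[vector-diagnostics] {label} query failed: {e}")


async def execute_raw_checked(db: Prisma, sql: str, *params):
    try:
        return await db.query_raw(sql, *params)
    except Exception as e:
        if "type vector does not exist" in str(e):
            await _log_vector_error_diagnostics(db)
        raise  # re-raise after logging: no behavior change
```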

## Testing

This is observational/diagnostic code. It will be validated by waiting
for the error to occur naturally on dev and checking the logs.

## Rollout

Once we've captured diagnostic logs and identified the root cause, this
logging can be removed or reduced in verbosity.
2026-02-10 07:35:13 +00:00
Zamil Majdy
626980bf27 Merge branch 'dev' into feat/copitlot-claude-code 2026-02-09 19:26:52 +04:00
Reinier van der Leer
6467f6734f debug(backend/chat): Add timing logging to chat stream generation mechanism (#12019)
[SECRT-1912: Investigate & eliminate chat session start
latency](https://linear.app/autogpt/issue/SECRT-1912)

### Changes 🏗️

- Add timing logs to `backend.api.features.chat` in `routes.py`,
`service.py`, and `stream_registry.py`
- Remove unneeded DB join in `create_chat_session`

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - CI checks
2026-02-09 14:05:29 +00:00
Otto
5a30d11416 refactor(copilot): Code cleanup and deduplication (#11950)
## Summary

Code cleanup of the AI Copilot codebase - rebased onto latest dev.

## Changes

### New Files
- `backend/util/validation.py` - UUID validation helpers
- `backend/api/features/chat/tools/helpers.py` - Shared tool utilities

### Credential Matching Consolidation  
- Added shared utilities to `utils.py`
- Refactored `run_block._check_block_credentials()` with discriminator
support
- Extracted `_resolve_discriminated_credentials()` for multi-provider
handling

### Routes Cleanup
- Extracted `_create_stream_generator()` and `SSE_RESPONSE_HEADERS`

### Tool Files Cleanup
- Updated `run_agent.py` and `run_block.py` to use shared helpers

**WIP** - This PR will be updated incrementally.
2026-02-09 13:43:55 +00:00
Bently
1f4105e8f9 fix(frontend): Handle object values in FileInput component (#11948)
Fixes
[#11800](https://github.com/Significant-Gravitas/AutoGPT/issues/11800)

## Problem
The FileInput component crashed with `TypeError: e.startsWith is not a
function` when the value was an object (from external API) instead of a
string.

## Example Input Object
When using the external API
(`/external-api/v1/graphs/{id}/execute/{version}`), file inputs can be
passed as objects:

```json
{
  "node_input": {
    "input_image": {
      "name": "image.jpeg",
      "type": "image/jpeg",
      "size": 131147,
      "data": "/9j/4QAW..."
    }
  }
}
```

## Changes
- Updated `getFileLabelFromValue()` to handle object format: `{ name,
type, size, data }`
- Added type guards for string vs object values
- Graceful fallback for edge cases (null, undefined, empty object)

## Test cases verified
- Object with name: returns filename
- Object with type only: extracts and formats MIME type
- String data URI: parses correctly
- String file path: extracts extension
- Edge cases: returns "File" fallback
2026-02-09 10:25:08 +00:00
Bently
caf9ff34e6 fix(backend): Handle stale RabbitMQ channels on connection drop (#11929)
### Changes 🏗️

Fixes
[**AUTOGPT-SERVER-1TN**](https://autoagpt.sentry.io/issues/?query=AUTOGPT-SERVER-1TN)
(~39K events since Feb 2025) and related connection issues
**6JC/6JD/6JE/6JF** (~6K combined).

#### Problem

When the RabbitMQ TCP connection drops (network blip, server restart,
etc.):

1. `connect_robust` (aio_pika) automatically reconnects the underlying
AMQP connection
2. But `AsyncRabbitMQ._channel` still references the **old dead
channel**
3. `is_ready` checks `not self._channel.is_closed` — but the channel
object doesn't know the transport is gone
4. `publish_message` tries to use the stale channel →
`ChannelInvalidStateError: No active transport in channel`
5. `@func_retry` retries 5 times, but each retry hits the same stale
channel (it passes `is_ready`)

This means every connection drop generates errors until the process is
restarted.

#### Fix

**New `_ensure_channel()` helper** that resets stale channels before
reconnecting, so `connect()` creates a fresh one instead of
short-circuiting on `is_connected`.

**Explicit `ChannelInvalidStateError` handling in `publish_message`:**
1. First attempt uses `_ensure_channel()` (handles normal staleness)
2. If publish throws `ChannelInvalidStateError`, does a full reconnect
(resets both `_channel` and `_connection`) and retries once
3. `@func_retry` provides additional retry resilience on top

**Simplified `get_channel()`** to use the same resilient helper.

**1 file changed, 62 insertions, 24 deletions.**
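A condensed sketch of the recovery pattern (the method names come from this PR; the class is trimmed to the relevant parts and the exchange/routing details are illustrative):

```python
import aio_pika
from aio_pika.exceptions import ChannelInvalidStateError


class AsyncRabbitMQ:
    """Trimmed illustration of the stale-channel recovery pattern."""

    def __init__(self, url: str):
        self._url = url
        self._connection: aio_pika.abc.AbstractRobustConnection | None = None
        self._channel: aio_pika.abc.AbstractChannel | None = None

    async def _ensure_channel(self) -> aio_pika.abc.AbstractChannel:
        # Drop a channel that looks closed so reconnection builds a fresh
        # one instead of short-circuiting on is_connected.
        if self._channel is not None and self._channel.is_closed:
            self._channel = None
        if self._connection is None or self._connection.is_closed:
            self._connection = await aio_pika.connect_robust(self._url)
        if self._channel is None:
            self._channel = await self._connection.channel()
        return self._channel

    async def publish_message(self, routing_key: str, body: bytes) -> None:
        channel = await self._ensure_channel()
        try:
            await channel.default_exchange.publish(
                aio_pika.Message(body=body), routing_key=routing_key
            )
        except ChannelInvalidStateError:
            # Channel passed is_closed but its transport is gone: full reset,
            # then retry once.
            self._channel = None
            self._connection = None
            channel = await self._ensure_channel()
            await channel.default_exchange.publish(
                aio_pika.Message(body=body), routing_key=routing_key
            )
```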

#### Impact
- Eliminates ~39K `ChannelInvalidStateError` Sentry events
- RabbitMQ operations self-heal after connection drops without process
restart
- Related transport EOF errors (6JC/6JD/6JE/6JF) should also decrease
2026-02-09 10:24:08 +00:00
Swifty
e42b27af3c Merge branch 'dev' into feat/copitlot-claude-code 2026-02-09 09:12:23 +01:00
Zamil Majdy
34face15d2 fix lock 2026-02-09 11:45:59 +04:00
Nicholas Tindle
e8fc8ee623 fix(backend): filter graph-only blocks from CoPilot's find_block results (#11892)
Filters out blocks that are unsuitable for standalone execution from
CoPilot's block search and execution. These blocks serve graph-specific
purposes and will either fail, hang, or confuse users when run outside
of a graph context.

**Important:** This does NOT affect the Builder UI which uses
`load_all_blocks()` directly.

### Changes 🏗️

- **find_block.py**: Added `EXCLUDED_BLOCK_TYPES` and
`EXCLUDED_BLOCK_IDS` constants, skip excluded blocks in search results
- **run_block.py**: Added execution guard that returns clear error
message for excluded blocks
- **content_handlers.py**: Added filtering to
`BlockHandler.get_missing_items()` and `get_stats()` to prevent indexing
excluded blocks

**Excluded by BlockType:**

| BlockType | Reason |
|-----------|--------|
| `INPUT` | Graph interface definition - data enters via chat, not graph inputs |
| `OUTPUT` | Graph interface definition - data exits via chat, not graph outputs |
| `WEBHOOK` | Wait for external events - would hang forever in CoPilot |
| `WEBHOOK_MANUAL` | Same as WEBHOOK |
| `NOTE` | Visual annotation only - no runtime behavior |
| `HUMAN_IN_THE_LOOP` | Pauses for human approval - CoPilot IS human-in-the-loop |
| `AGENT` | AgentExecutorBlock requires graph context - use `run_agent` tool instead |

**Excluded by ID:**

| Block | Reason |
|-------|--------|
| `SmartDecisionMakerBlock` | Dynamically discovers downstream blocks via graph topology |
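A minimal sketch of the exclusion check, assuming an importable `BlockType` enum with the members listed above (the constant names come from this PR; the ID constant and import path are hypothetical):

```python
from backend.data.block import BlockType  # assumed import path

EXCLUDED_BLOCK_TYPES = {
    BlockType.INPUT,
    BlockType.OUTPUT,
    BlockType.WEBHOOK,
    BlockType.WEBHOOK_MANUAL,
    BlockType.NOTE,
    BlockType.HUMAN_IN_THE_LOOP,
    BlockType.AGENT,
}
EXCLUDED_BLOCK_IDS = {SMART_DECISION_MAKER_BLOCK_ID}  # hypothetical constant


def is_copilot_runnable(block) -> bool:
    """True if a block makes sense to run outside a graph context."""
    return (
        block.block_type not in EXCLUDED_BLOCK_TYPES
        and block.id not in EXCLUDED_BLOCK_IDS
    )
```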

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
- [ ] Search for "input" in CoPilot - should NOT return AgentInputBlock
variants
- [ ] Search for "output" in CoPilot - should NOT return
AgentOutputBlock
- [ ] Search for "webhook" in CoPilot - should NOT return trigger blocks
- [ ] Search for "human" in CoPilot - should NOT return
HumanInTheLoopBlock
- [ ] Search for "decision" in CoPilot - should NOT return
SmartDecisionMakerBlock
- [ ] Verify functional blocks still appear (e.g., "email", "http",
"text")
  - [ ] Verify Builder UI still shows ALL blocks (no regression)

#### For configuration changes:
- [x] `.env.default` is updated or already compatible with my changes
- [x] `docker-compose.yml` is updated or already compatible with my
changes
- [x] I have included a list of my configuration changes in the PR
description (under **Changes**)

No configuration changes required.

---

Resolves: [SECRT-1831](https://linear.app/autogpt/issue/SECRT-1831)

🤖 Generated with [Claude Code](https://claude.ai/code)

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> **Low Risk**
> Behavior change is limited to CoPilot’s block discovery/execution
guards and is covered by new tests; main risk is inadvertently excluding
a block that should be runnable.
> 
> **Overview**
> CoPilot now **filters out graph-only blocks** from `find_block`
results and prevents them from being executed via `run_block`, returning
a clear error when a user attempts to run an excluded block.
> 
> `find_block` introduces explicit exclusion lists (by `BlockType` and a
specific block ID), over-fetches search results to maintain up to 10
usable matches after filtering, and adds debug logging when results are
reduced. New unit tests cover both the search filtering and the
`run_block` execution guard; a minor cleanup removes an unused `pytest`
import in `execution_queue_test.py`.
> 
> <sup>Written by [Cursor
Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit
bc50755dcf. This will update automatically
on new commits. Configure
[here](https://cursor.com/dashboard?tab=bugbot).</sup>
<!-- /CURSOR_SUMMARY -->

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Co-authored-by: claude[bot] <41898282+claude[bot]@users.noreply.github.com>
Co-authored-by: Nicholas Tindle <ntindle@users.noreply.github.com>
Co-authored-by: Otto <otto@agpt.co>
2026-02-09 07:19:43 +00:00
dependabot[bot]
1a16e203b8 chore(deps): Bump actions/setup-node from 4 to 6 (#11213)
Bumps [actions/setup-node](https://github.com/actions/setup-node) from 4
to 6.
<details>
<summary>Release notes</summary>
<p><em>Sourced from <a
href="https://github.com/actions/setup-node/releases">actions/setup-node's
releases</a>.</em></p>
<blockquote>
<h2>v6.0.0</h2>
<h2>What's Changed</h2>
<p><strong>Breaking Changes</strong></p>
<ul>
<li>Limit automatic caching to npm, update workflows and documentation
by <a
href="https://github.com/priyagupta108"><code>@​priyagupta108</code></a>
in <a
href="https://redirect.github.com/actions/setup-node/pull/1374">actions/setup-node#1374</a></li>
</ul>
<p><strong>Dependency Upgrades</strong></p>
<ul>
<li>Upgrade ts-jest from 29.1.2 to 29.4.1 and document breaking changes
in v5 by <a
href="https://github.com/dependabot"><code>@​dependabot</code></a>[bot]
in <a
href="https://redirect.github.com/actions/setup-node/pull/1336">#1336</a></li>
<li>Upgrade prettier from 2.8.8 to 3.6.2 by <a
href="https://github.com/dependabot"><code>@​dependabot</code></a>[bot]
in <a
href="https://redirect.github.com/actions/setup-node/pull/1334">#1334</a></li>
<li>Upgrade actions/publish-action from 0.3.0 to 0.4.0 by <a
href="https://github.com/dependabot"><code>@​dependabot</code></a>[bot]
in <a
href="https://redirect.github.com/actions/setup-node/pull/1362">#1362</a></li>
</ul>
<p><strong>Full Changelog</strong>: <a
href="https://github.com/actions/setup-node/compare/v5...v6.0.0">https://github.com/actions/setup-node/compare/v5...v6.0.0</a></p>
<h2>v5.0.0</h2>
<h2>What's Changed</h2>
<h3>Breaking Changes</h3>
<ul>
<li>Enhance caching in setup-node with automatic package manager
detection by <a
href="https://github.com/priya-kinthali"><code>@​priya-kinthali</code></a>
in <a
href="https://redirect.github.com/actions/setup-node/pull/1348">actions/setup-node#1348</a></li>
</ul>
<p>This update, introduces automatic caching when a valid
<code>packageManager</code> field is present in your
<code>package.json</code>. This aims to improve workflow performance and
make dependency management more seamless.
To disable this automatic caching, set <code>package-manager-cache:
false</code></p>
<pre lang="yaml"><code>steps:
- uses: actions/checkout@v5
- uses: actions/setup-node@v5
  with:
    package-manager-cache: false
</code></pre>
<ul>
<li>Upgrade action to use node24 by <a
href="https://github.com/salmanmkc"><code>@​salmanmkc</code></a> in <a
href="https://redirect.github.com/actions/setup-node/pull/1325">actions/setup-node#1325</a></li>
</ul>
<p>Make sure your runner is on version v2.327.1 or later to ensure
compatibility with this release. <a
href="https://github.com/actions/runner/releases/tag/v2.327.1">See
Release Notes</a></p>
<h3>Dependency Upgrades</h3>
<ul>
<li>Upgrade <code>@​octokit/request-error</code> and
<code>@​actions/github</code> by <a
href="https://github.com/dependabot"><code>@​dependabot</code></a>[bot]
in <a
href="https://redirect.github.com/actions/setup-node/pull/1227">actions/setup-node#1227</a></li>
<li>Upgrade uuid from 9.0.1 to 11.1.0 by <a
href="https://github.com/dependabot"><code>@​dependabot</code></a>[bot]
in <a
href="https://redirect.github.com/actions/setup-node/pull/1273">actions/setup-node#1273</a></li>
<li>Upgrade undici from 5.28.5 to 5.29.0 by <a
href="https://github.com/dependabot"><code>@​dependabot</code></a>[bot]
in <a
href="https://redirect.github.com/actions/setup-node/pull/1295">actions/setup-node#1295</a></li>
<li>Upgrade form-data to bring in fix for critical vulnerability by <a
href="https://github.com/gowridurgad"><code>@​gowridurgad</code></a> in
<a
href="https://redirect.github.com/actions/setup-node/pull/1332">actions/setup-node#1332</a></li>
<li>Upgrade actions/checkout from 4 to 5 by <a
href="https://github.com/dependabot"><code>@​dependabot</code></a>[bot]
in <a
href="https://redirect.github.com/actions/setup-node/pull/1345">actions/setup-node#1345</a></li>
</ul>
<h2>New Contributors</h2>
<ul>
<li><a
href="https://github.com/priya-kinthali"><code>@​priya-kinthali</code></a>
made their first contribution in <a
href="https://redirect.github.com/actions/setup-node/pull/1348">actions/setup-node#1348</a></li>
<li><a href="https://github.com/salmanmkc"><code>@​salmanmkc</code></a>
made their first contribution in <a
href="https://redirect.github.com/actions/setup-node/pull/1325">actions/setup-node#1325</a></li>
</ul>
<p><strong>Full Changelog</strong>: <a
href="https://github.com/actions/setup-node/compare/v4...v5.0.0">https://github.com/actions/setup-node/compare/v4...v5.0.0</a></p>
<h2>v4.4.0</h2>
<!-- raw HTML omitted -->
</blockquote>
<p>... (truncated)</p>
</details>
<details>
<summary>Commits</summary>
<ul>
<li><a
href="2028fbc5c2"><code>2028fbc</code></a>
Limit automatic caching to npm, update workflows and documentation (<a
href="https://redirect.github.com/actions/setup-node/issues/1374">#1374</a>)</li>
<li><a
href="13427813f7"><code>1342781</code></a>
Bump actions/publish-action from 0.3.0 to 0.4.0 (<a
href="https://redirect.github.com/actions/setup-node/issues/1362">#1362</a>)</li>
<li><a
href="89d709d423"><code>89d709d</code></a>
Bump prettier from 2.8.8 to 3.6.2 (<a
href="https://redirect.github.com/actions/setup-node/issues/1334">#1334</a>)</li>
<li><a
href="cd2651c462"><code>cd2651c</code></a>
Bump ts-jest from 29.1.2 to 29.4.1 (<a
href="https://redirect.github.com/actions/setup-node/issues/1336">#1336</a>)</li>
<li><a
href="a0853c2454"><code>a0853c2</code></a>
Bump actions/checkout from 4 to 5 (<a
href="https://redirect.github.com/actions/setup-node/issues/1345">#1345</a>)</li>
<li><a
href="b7234cc9fe"><code>b7234cc</code></a>
Upgrade action to use node24 (<a
href="https://redirect.github.com/actions/setup-node/issues/1325">#1325</a>)</li>
<li><a
href="d7a11313b5"><code>d7a1131</code></a>
Enhance caching in setup-node with automatic package manager detection
(<a
href="https://redirect.github.com/actions/setup-node/issues/1348">#1348</a>)</li>
<li><a
href="5e2628c959"><code>5e2628c</code></a>
Bumps form-data (<a
href="https://redirect.github.com/actions/setup-node/issues/1332">#1332</a>)</li>
<li><a
href="65beceff8e"><code>65becef</code></a>
Bump undici from 5.28.5 to 5.29.0 (<a
href="https://redirect.github.com/actions/setup-node/issues/1295">#1295</a>)</li>
<li><a
href="7e24a656e1"><code>7e24a65</code></a>
Bump uuid from 9.0.1 to 11.1.0 (<a
href="https://redirect.github.com/actions/setup-node/issues/1273">#1273</a>)</li>
<li>Additional commits viewable in <a
href="https://github.com/actions/setup-node/compare/v4...v6">compare
view</a></li>
</ul>
</details>
<br />


[![Dependabot compatibility
score](https://dependabot-badges.githubapp.com/badges/compatibility_score?dependency-name=actions/setup-node&package-manager=github_actions&previous-version=4&new-version=6)](https://docs.github.com/en/github/managing-security-vulnerabilities/about-dependabot-security-updates#about-compatibility-scores)

You can trigger a rebase of this PR by commenting `@dependabot rebase`.

[//]: # (dependabot-automerge-start)
[//]: # (dependabot-automerge-end)

---

<details>
<summary>Dependabot commands and options</summary>
<br />

You can trigger Dependabot actions by commenting on this PR:
- `@dependabot rebase` will rebase this PR
- `@dependabot recreate` will recreate this PR, overwriting any edits
that have been made to it
- `@dependabot merge` will merge this PR after your CI passes on it
- `@dependabot squash and merge` will squash and merge this PR after
your CI passes on it
- `@dependabot cancel merge` will cancel a previously requested merge
and block automerging
- `@dependabot reopen` will reopen this PR if it is closed
- `@dependabot close` will close this PR and stop Dependabot recreating
it. You can achieve the same result by closing it manually
- `@dependabot show <dependency name> ignore conditions` will show all
of the ignore conditions of the specified dependency
- `@dependabot ignore this major version` will close this PR and stop
Dependabot creating any more for this major version (unless you reopen
the PR or upgrade to it yourself)
- `@dependabot ignore this minor version` will close this PR and stop
Dependabot creating any more for this minor version (unless you reopen
the PR or upgrade to it yourself)
- `@dependabot ignore this dependency` will close this PR and stop
Dependabot creating any more for this dependency (unless you reopen the
PR or upgrade to it yourself)


</details>

> **Note**
> Automatic rebases have been disabled on this pull request as it has
been open for over 30 days.

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Nick Tindle <nick@ntindle.com>
2026-02-09 07:11:21 +00:00
Zamil Majdy
7d32c83f95 fix(backend/chat): Handle non-serializable SDK objects in tool result output 2026-02-09 10:59:50 +04:00
Zamil Majdy
6e2a45b84e style(backend): Remove unused pytest import in execution_queue_test 2026-02-09 10:14:20 +04:00
Zamil Majdy
32f6532e9c Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into feat/copitlot-claude-code 2026-02-09 10:10:32 +04:00
Zamil Majdy
0bbe8a184d Merge dev and resolve poetry.lock conflict 2026-02-08 19:40:17 +04:00
Zamil Majdy
7592deed63 fix(backend/chat): Address remaining PR review comments
- Fix tool_call_id always being "sdk-call" by generating unique IDs per invocation
- Fix validation using original tool_name instead of clean_name in security hooks
- Fix duplicate StreamFinish in Anthropic fallback path
- Fix ImportError fallback returning plain dict instead of re-raising
- Extract _build_input_schema helper to deduplicate schema construction
- Add else branch for unhandled SDK message types for observability
- Truncate large tool results in conversation history to prevent context overflow
2026-02-08 19:39:10 +04:00
Zamil Majdy
b9c759ce4f fix(backend/chat): Address additional PR review comments
- Add terminal StreamFinish in adapt_sdk_stream if SDK ends without one
- Sanitize error message in adapt_sdk_stream exception handler
- Pass full JSON schema (type, properties, required) to tool decorator
2026-02-08 07:14:45 +04:00
Zamil Majdy
5efb80d47b fix(backend/chat): Address PR review comments for Claude SDK integration
- Add StreamFinish after ErrorMessage in response adapter
- Fix str.replace to removeprefix in security hooks (see the sketch after this list)
- Apply max_context_messages limit as safety guard in history formatting
- Add empty prompt guard before sending to SDK
- Sanitize error messages to avoid exposing internal details
- Fix fire-and-forget asyncio.create_task by storing task reference
- Fix tool_calls population on assistant messages
- Rewrite Anthropic fallback to persist messages and merge consecutive roles
- Only use ANTHROPIC_API_KEY for fallback (not OpenRouter keys)
- Fix IndexError when tool result content list is empty
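For the `removeprefix` item, a tiny standalone example of why `str.replace` was the wrong tool (the tool name is contrived):

```python
# str.replace removes *every* occurrence, anywhere in the string;
# removeprefix strips it only when it is actually the leading prefix.
name = "mcp__copilot__run_mcp__copilot__block"

print(name.replace("mcp__copilot__", ""))   # "run_block" - both hits removed
print(name.removeprefix("mcp__copilot__"))  # "run_mcp__copilot__block"
```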
2026-02-06 13:25:10 +04:00
Zamil Majdy
b49d8e2cba fix lock 2026-02-06 13:19:53 +04:00
Zamil Majdy
452544530d feat(chat/sdk): Enable native SDK context compaction
- Remove manual truncation in conversation history formatting
- SDK's automatic compaction handles context limits intelligently
- Add observability hooks:
  - PreCompact: Log when SDK triggers context compaction
  - PostToolUse: Log successful tool executions
  - PostToolUseFailure: Log and debug failed tool executions
- Update config: increase max_context_messages (SDK handles compaction)
2026-02-06 12:44:48 +04:00
Zamil Majdy
32ee7e6cf8 fix(chat): Remove aggressive stale task detection
The 60-second timeout was too aggressive and could incorrectly mark
legitimate long-running tool calls as stale. Relying on Redis TTL
(1 hour) for cleanup is sufficient and more reliable.
2026-02-06 11:45:54 +04:00
Zamil Majdy
670663c406 Merge dev and resolve poetry.lock conflict 2026-02-06 11:40:41 +04:00
Zamil Majdy
0dbe4cf51e feat(backend/chat): Add Claude Agent SDK integration for CoPilot
This PR adds Claude Agent SDK as the default backend for CoPilot chat completions,
replacing the direct OpenAI API integration.

Key changes:
- Add Claude Agent SDK service layer with MCP tool adapter (sketched below)
- Fix message persistence after tool calls (messages no longer disappear on refresh)
- Add OpenRouter tracing for session title generation
- Add security hooks for user context validation
- Add Anthropic fallback when SDK is not available
- Clean up excessive debug logging
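The tool adapter layer could look roughly like this, using the SDK's in-process MCP server support (`create_sdk_mcp_server` and the `tool` decorator are claude-agent-sdk helpers; the tool body and `search_blocks` are illustrative, not the actual implementation):

```python
from claude_agent_sdk import create_sdk_mcp_server, tool


@tool("find_block", "Search the AutoGPT block library", {"query": str})
async def find_block(args: dict) -> dict:
    # Illustrative body - the real adapter dispatches to CoPilot's tool system.
    results = await search_blocks(args["query"])  # assumed CoPilot helper
    return {"content": [{"type": "text", "text": str(results)}]}


def create_copilot_mcp_server():
    """Bundle CoPilot tools into an in-process MCP server for the SDK."""
    return create_sdk_mcp_server(
        name="copilot",
        version="1.0.0",
        tools=[find_block],
    )
```

The `{"content": [...], "isError": ...}` result shape matches what the fallback path's `_execute_tool` (further down in this diff) expects from tool handlers.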
2026-02-06 11:38:17 +04:00
29 changed files with 3265 additions and 421 deletions

View File

@@ -78,7 +78,7 @@ jobs:
       # Frontend Node.js/pnpm setup (mirrors platform-frontend-ci.yml)
       - name: Set up Node.js
-        uses: actions/setup-node@v4
+        uses: actions/setup-node@v6
         with:
           node-version: "22"

View File

@@ -94,7 +94,7 @@ jobs:
       # Frontend Node.js/pnpm setup (mirrors platform-frontend-ci.yml)
       - name: Set up Node.js
-        uses: actions/setup-node@v4
+        uses: actions/setup-node@v6
         with:
           node-version: "22"

View File

@@ -76,7 +76,7 @@ jobs:
       # Frontend Node.js/pnpm setup (mirrors platform-frontend-ci.yml)
       - name: Set up Node.js
-        uses: actions/setup-node@v4
+        uses: actions/setup-node@v6
         with:
           node-version: "22"

View File

@@ -42,7 +42,7 @@ jobs:
             - 'autogpt_platform/frontend/src/components/**'
       - name: Set up Node.js
-        uses: actions/setup-node@v4
+        uses: actions/setup-node@v6
         with:
           node-version: "22.18.0"
@@ -74,7 +74,7 @@
         uses: actions/checkout@v4
       - name: Set up Node.js
-        uses: actions/setup-node@v4
+        uses: actions/setup-node@v6
         with:
           node-version: "22.18.0"
@@ -112,7 +112,7 @@
           fetch-depth: 0
       - name: Set up Node.js
-        uses: actions/setup-node@v4
+        uses: actions/setup-node@v6
         with:
           node-version: "22.18.0"
@@ -153,7 +153,7 @@
           submodules: recursive
       - name: Set up Node.js
-        uses: actions/setup-node@v4
+        uses: actions/setup-node@v6
         with:
           node-version: "22.18.0"
@@ -282,7 +282,7 @@
           submodules: recursive
       - name: Set up Node.js
-        uses: actions/setup-node@v4
+        uses: actions/setup-node@v6
         with:
           node-version: "22.18.0"

View File

@@ -32,7 +32,7 @@ jobs:
         uses: actions/checkout@v4
       - name: Set up Node.js
-        uses: actions/setup-node@v4
+        uses: actions/setup-node@v6
         with:
           node-version: "22.18.0"
@@ -68,7 +68,7 @@
           submodules: recursive
       - name: Set up Node.js
-        uses: actions/setup-node@v4
+        uses: actions/setup-node@v6
         with:
           node-version: "22.18.0"

View File

@@ -27,12 +27,11 @@ class ChatConfig(BaseSettings):
     session_ttl: int = Field(default=43200, description="Session TTL in seconds")

     # Streaming Configuration
-    max_context_messages: int = Field(
-        default=50, ge=1, le=200, description="Maximum context messages"
-    )
     stream_timeout: int = Field(default=300, description="Stream timeout in seconds")
-    max_retries: int = Field(default=3, description="Maximum number of retries")
+    max_retries: int = Field(
+        default=3,
+        description="Max retries for fallback path (SDK handles retries internally)",
+    )
     max_agent_runs: int = Field(default=30, description="Maximum number of agent runs")
     max_agent_schedules: int = Field(
         default=30, description="Maximum number of agent schedules"
@@ -93,6 +92,12 @@ class ChatConfig(BaseSettings):
         description="Name of the prompt in Langfuse to fetch",
     )

+    # Claude Agent SDK Configuration
+    use_claude_agent_sdk: bool = Field(
+        default=True,
+        description="Use Claude Agent SDK for chat completions",
+    )
+
     @field_validator("api_key", mode="before")
     @classmethod
     def get_api_key(cls, v):
@@ -132,6 +137,17 @@ class ChatConfig(BaseSettings):
             v = os.getenv("CHAT_INTERNAL_API_KEY")
         return v

+    @field_validator("use_claude_agent_sdk", mode="before")
+    @classmethod
+    def get_use_claude_agent_sdk(cls, v):
+        """Get use_claude_agent_sdk from environment if not provided."""
+        # Check environment variable - default to True if not set
+        env_val = os.getenv("CHAT_USE_CLAUDE_AGENT_SDK", "").lower()
+        if env_val:
+            return env_val in ("true", "1", "yes", "on")
+        # Default to True (SDK enabled by default)
+        return True if v is None else v
+
     # Prompt paths for different contexts
     PROMPT_PATHS: dict[str, str] = {
         "default": "prompts/chat_system.md",

View File

@@ -45,10 +45,7 @@ async def create_chat_session(
         successfulAgentRuns=SafeJson({}),
         successfulAgentSchedules=SafeJson({}),
     )
-    return await PrismaChatSession.prisma().create(
-        data=data,
-        include={"Messages": True},
-    )
+    return await PrismaChatSession.prisma().create(data=data)


 async def update_chat_session(

View File

@@ -273,9 +273,8 @@ async def _get_session_from_cache(session_id: str) -> ChatSession | None:
     try:
         session = ChatSession.model_validate_json(raw_session)
         logger.info(
-            f"Loading session {session_id} from cache: "
-            f"message_count={len(session.messages)}, "
-            f"roles={[m.role for m in session.messages]}"
+            f"[CACHE] Loaded session {session_id}: {len(session.messages)} messages, "
+            f"last_roles={[m.role for m in session.messages[-3:]]}"  # Last 3 roles
         )
         return session
     except Exception as e:
@@ -317,11 +316,9 @@ async def _get_session_from_db(session_id: str) -> ChatSession | None:
         return None

     messages = prisma_session.Messages
-    logger.info(
-        f"Loading session {session_id} from DB: "
-        f"has_messages={messages is not None}, "
-        f"message_count={len(messages) if messages else 0}, "
-        f"roles={[m.role for m in messages] if messages else []}"
+    logger.debug(
+        f"[DB] Loaded session {session_id}: {len(messages) if messages else 0} messages, "
+        f"roles={[m.role for m in messages[-3:]] if messages else []}"  # Last 3 roles
     )
     return ChatSession.from_db(prisma_session, messages)
@@ -372,10 +369,9 @@ async def _save_session_to_db(
                 "function_call": msg.function_call,
             }
         )
-    logger.info(
-        f"Saving {len(new_messages)} new messages to DB for session {session.session_id}: "
-        f"roles={[m['role'] for m in messages_data]}, "
-        f"start_sequence={existing_message_count}"
+    logger.debug(
+        f"[DB] Saving {len(new_messages)} messages to session {session.session_id}, "
+        f"roles={[m['role'] for m in messages_data]}"
     )
     await chat_db.add_chat_messages_batch(
         session_id=session.session_id,
@@ -415,7 +411,7 @@ async def get_chat_session(
             logger.warning(f"Unexpected cache error for session {session_id}: {e}")

     # Fall back to database
-    logger.info(f"Session {session_id} not in cache, checking database")
+    logger.debug(f"Session {session_id} not in cache, checking database")
     session = await _get_session_from_db(session_id)

     if session is None:
@@ -432,7 +428,6 @@
     # Cache the session from DB
     try:
         await _cache_session(session)
-        logger.info(f"Cached session {session_id} from database")
     except Exception as e:
         logger.warning(f"Failed to cache session {session_id}: {e}")
@@ -603,13 +598,19 @@ async def update_session_title(session_id: str, title: str) -> bool:
             logger.warning(f"Session {session_id} not found for title update")
             return False

-        # Invalidate cache so next fetch gets updated title
+        # Update title in cache if it exists (instead of invalidating).
+        # This prevents race conditions where cache invalidation causes
+        # the frontend to see stale DB data while streaming is still in progress.
         try:
-            redis_key = _get_session_cache_key(session_id)
-            async_redis = await get_redis_async()
-            await async_redis.delete(redis_key)
+            cached = await _get_session_from_cache(session_id)
+            if cached:
+                cached.title = title
+                await _cache_session(cached)
         except Exception as e:
-            logger.warning(f"Failed to invalidate cache for session {session_id}: {e}")
+            # Not critical - title will be correct on next full cache refresh
+            logger.warning(
+                f"Failed to update title in cache for session {session_id}: {e}"
+            )

         return True
     except Exception as e:

View File

@@ -1,5 +1,6 @@
 """Chat API routes for chat session management and streaming via SSE."""

+import asyncio
 import logging
 import uuid as uuid_module
 from collections.abc import AsyncGenerator
@@ -16,8 +17,17 @@ from . import service as chat_service
 from . import stream_registry
 from .completion_handler import process_operation_failure, process_operation_success
 from .config import ChatConfig
-from .model import ChatSession, create_chat_session, get_chat_session, get_user_sessions
+from .model import (
+    ChatMessage,
+    ChatSession,
+    create_chat_session,
+    get_chat_session,
+    get_user_sessions,
+    upsert_chat_session,
+)
 from .response_model import StreamFinish, StreamHeartbeat, StreamStart
+from .sdk import service as sdk_service
+from .tracking import track_user_message

 config = ChatConfig()
@@ -209,6 +219,10 @@ async def get_session(
     active_task, last_message_id = await stream_registry.get_active_task_for_session(
         session_id, user_id
     )
+    logger.info(
+        f"[GET_SESSION] session={session_id}, active_task={active_task is not None}, "
+        f"msg_count={len(messages)}, last_role={messages[-1].get('role') if messages else 'none'}"
+    )

     if active_task:
         # Filter out the in-progress assistant message from the session response.
         # The client will receive the complete assistant response through the SSE
@@ -266,12 +280,59 @@ async def stream_chat_post(
     """
     import asyncio
+    import time
+
+    stream_start_time = time.perf_counter()
+
+    # Base log metadata (task_id added after creation)
+    log_meta = {"component": "ChatStream", "session_id": session_id}
+    if user_id:
+        log_meta["user_id"] = user_id
+
+    logger.info(
+        f"[TIMING] stream_chat_post STARTED, session={session_id}, "
+        f"user={user_id}, message_len={len(request.message)}",
+        extra={"json_fields": log_meta},
+    )

     session = await _validate_and_get_session(session_id, user_id)
+    logger.info(
+        f"[TIMING] session validated in {(time.perf_counter() - stream_start_time)*1000:.1f}ms",
+        extra={
+            "json_fields": {
+                **log_meta,
+                "duration_ms": (time.perf_counter() - stream_start_time) * 1000,
+            }
+        },
+    )
+
+    # Add user message to session BEFORE creating task to avoid race condition
+    # where GET_SESSION sees the task as "running" but the message isn't saved yet
+    if request.message:
+        session.messages.append(
+            ChatMessage(
+                role="user" if request.is_user_message else "assistant",
+                content=request.message,
+            )
+        )
+        if request.is_user_message:
+            track_user_message(
+                user_id=user_id,
+                session_id=session_id,
+                message_length=len(request.message),
+            )
+        logger.info(
+            f"[STREAM] Saving user message to session {session_id}, "
+            f"msg_count={len(session.messages)}"
+        )
+        session = await upsert_chat_session(session)
+        logger.info(f"[STREAM] User message saved for session {session_id}")

     # Create a task in the stream registry for reconnection support
     task_id = str(uuid_module.uuid4())
     operation_id = str(uuid_module.uuid4())
+    log_meta["task_id"] = task_id
+
+    task_create_start = time.perf_counter()
     await stream_registry.create_task(
         task_id=task_id,
         session_id=session_id,
@@ -280,72 +341,260 @@ async def stream_chat_post(
         tool_name="chat",
         operation_id=operation_id,
     )
+    logger.info(
+        f"[TIMING] create_task completed in {(time.perf_counter() - task_create_start)*1000:.1f}ms",
+        extra={
+            "json_fields": {
+                **log_meta,
+                "duration_ms": (time.perf_counter() - task_create_start) * 1000,
+            }
+        },
+    )

     # Background task that runs the AI generation independently of SSE connection
     async def run_ai_generation():
+        import time as time_module
+
+        gen_start_time = time_module.perf_counter()
+        logger.info(
+            f"[TIMING] run_ai_generation STARTED, task={task_id}, session={session_id}, user={user_id}",
+            extra={"json_fields": log_meta},
+        )
+        first_chunk_time, ttfc = None, None
+        chunk_count = 0
         try:
             # Emit a start event with task_id for reconnection
             start_chunk = StreamStart(messageId=task_id, taskId=task_id)
             await stream_registry.publish_chunk(task_id, start_chunk)
+            logger.info(
+                f"[TIMING] StreamStart published at {(time_module.perf_counter() - gen_start_time)*1000:.1f}ms",
+                extra={
+                    "json_fields": {
+                        **log_meta,
+                        "elapsed_ms": (time_module.perf_counter() - gen_start_time)
+                        * 1000,
+                    }
+                },
+            )

-            async for chunk in chat_service.stream_chat_completion(
+            # Choose service based on configuration
+            use_sdk = config.use_claude_agent_sdk
+            stream_fn = (
+                sdk_service.stream_chat_completion_sdk
+                if use_sdk
+                else chat_service.stream_chat_completion
+            )
+            logger.info(
+                f"[TIMING] Calling {'sdk' if use_sdk else 'standard'} stream_chat_completion",
+                extra={"json_fields": log_meta},
+            )
+
+            # Pass message=None since we already added it to the session above
+            async for chunk in stream_fn(
                 session_id,
-                request.message,
+                None,  # Message already in session
                 is_user_message=request.is_user_message,
                 user_id=user_id,
-                session=session,  # Pass pre-fetched session to avoid double-fetch
+                session=session,  # Pass session with message already added
                 context=request.context,
             ):
+                chunk_count += 1
+                if first_chunk_time is None:
+                    first_chunk_time = time_module.perf_counter()
+                    ttfc = first_chunk_time - gen_start_time
+                    logger.info(
+                        f"[TIMING] FIRST AI CHUNK at {ttfc:.2f}s, type={type(chunk).__name__}",
+                        extra={
+                            "json_fields": {
+                                **log_meta,
+                                "chunk_type": type(chunk).__name__,
+                                "time_to_first_chunk_ms": ttfc * 1000,
+                            }
+                        },
+                    )
                 # Write to Redis (subscribers will receive via XREAD)
                 await stream_registry.publish_chunk(task_id, chunk)

-            # Mark task as completed
+            gen_end_time = time_module.perf_counter()
+            total_time = (gen_end_time - gen_start_time) * 1000
+            logger.info(
+                f"[TIMING] run_ai_generation FINISHED in {total_time/1000:.1f}s; "
+                f"task={task_id}, session={session_id}, "
+                f"ttfc={ttfc or -1:.2f}s, n_chunks={chunk_count}",
+                extra={
+                    "json_fields": {
+                        **log_meta,
+                        "total_time_ms": total_time,
+                        "time_to_first_chunk_ms": (
+                            ttfc * 1000 if ttfc is not None else None
+                        ),
+                        "n_chunks": chunk_count,
+                    }
+                },
+            )
             await stream_registry.mark_task_completed(task_id, "completed")
         except Exception as e:
+            elapsed = time_module.perf_counter() - gen_start_time
             logger.error(
-                f"Error in background AI generation for session {session_id}: {e}"
+                f"[TIMING] run_ai_generation ERROR after {elapsed:.2f}s: {e}",
+                extra={
+                    "json_fields": {
+                        **log_meta,
+                        "elapsed_ms": elapsed * 1000,
+                        "error": str(e),
+                    }
+                },
             )
             await stream_registry.mark_task_completed(task_id, "failed")

     # Start the AI generation in a background task
     bg_task = asyncio.create_task(run_ai_generation())
     await stream_registry.set_task_asyncio_task(task_id, bg_task)
+    setup_time = (time.perf_counter() - stream_start_time) * 1000
+    logger.info(
+        f"[TIMING] Background task started, setup={setup_time:.1f}ms",
+        extra={"json_fields": {**log_meta, "setup_time_ms": setup_time}},
+    )

     # SSE endpoint that subscribes to the task's stream
     async def event_generator() -> AsyncGenerator[str, None]:
+        import time as time_module
+
+        event_gen_start = time_module.perf_counter()
+        logger.info(
+            f"[TIMING] event_generator STARTED, task={task_id}, session={session_id}, "
+            f"user={user_id}",
+            extra={"json_fields": log_meta},
+        )
         subscriber_queue = None
+        first_chunk_yielded = False
+        chunks_yielded = 0
         try:
             # Subscribe to the task stream (this replays existing messages + live updates)
+            subscribe_start = time_module.perf_counter()
+            logger.info(
+                "[TIMING] Calling subscribe_to_task",
+                extra={"json_fields": log_meta},
+            )
             subscriber_queue = await stream_registry.subscribe_to_task(
                 task_id=task_id,
                 user_id=user_id,
                 last_message_id="0-0",  # Get all messages from the beginning
             )
+            subscribe_time = (time_module.perf_counter() - subscribe_start) * 1000
+            logger.info(
+                f"[TIMING] subscribe_to_task completed in {subscribe_time:.1f}ms, "
+                f"queue_ok={subscriber_queue is not None}",
+                extra={
+                    "json_fields": {
+                        **log_meta,
+                        "duration_ms": subscribe_time,
+                        "queue_obtained": subscriber_queue is not None,
+                    }
+                },
+            )

             if subscriber_queue is None:
+                logger.info(
+                    "[TIMING] subscriber_queue is None, yielding finish",
+                    extra={"json_fields": log_meta},
+                )
                 yield StreamFinish().to_sse()
                 yield "data: [DONE]\n\n"
                 return

             # Read from the subscriber queue and yield to SSE
+            logger.info(
+                "[TIMING] Starting to read from subscriber_queue",
+                extra={"json_fields": log_meta},
+            )
             while True:
                 try:
+                    queue_wait_start = time_module.perf_counter()
                     chunk = await asyncio.wait_for(subscriber_queue.get(), timeout=30.0)
+                    queue_wait_time = (
+                        time_module.perf_counter() - queue_wait_start
+                    ) * 1000
+                    chunks_yielded += 1
+                    if not first_chunk_yielded:
+                        first_chunk_yielded = True
+                        elapsed = time_module.perf_counter() - event_gen_start
+                        logger.info(
+                            f"[TIMING] FIRST CHUNK from queue at {elapsed:.2f}s, "
+                            f"type={type(chunk).__name__}, "
+                            f"wait={queue_wait_time:.1f}ms",
+                            extra={
+                                "json_fields": {
+                                    **log_meta,
+                                    "chunk_type": type(chunk).__name__,
+                                    "elapsed_ms": elapsed * 1000,
+                                    "queue_wait_ms": queue_wait_time,
+                                }
+                            },
+                        )
+                    elif chunks_yielded % 50 == 0:
+                        logger.info(
+                            f"[TIMING] Chunk #{chunks_yielded}, "
+                            f"type={type(chunk).__name__}",
+                            extra={
+                                "json_fields": {
+                                    **log_meta,
+                                    "chunk_number": chunks_yielded,
+                                    "chunk_type": type(chunk).__name__,
+                                }
+                            },
+                        )
                     yield chunk.to_sse()

                     # Check for finish signal
                     if isinstance(chunk, StreamFinish):
+                        total_time = time_module.perf_counter() - event_gen_start
+                        logger.info(
+                            f"[TIMING] StreamFinish received in {total_time:.2f}s; "
+                            f"n_chunks={chunks_yielded}",
+                            extra={
+                                "json_fields": {
+                                    **log_meta,
+                                    "chunks_yielded": chunks_yielded,
+                                    "total_time_ms": total_time * 1000,
+                                }
+                            },
+                        )
                         break
                 except asyncio.TimeoutError:
                     # Send heartbeat to keep connection alive
+                    logger.info(
+                        f"[TIMING] Heartbeat timeout, chunks_so_far={chunks_yielded}",
+                        extra={
+                            "json_fields": {**log_meta, "chunks_so_far": chunks_yielded}
+                        },
+                    )
                     yield StreamHeartbeat().to_sse()
         except GeneratorExit:
+            logger.info(
+                f"[TIMING] GeneratorExit (client disconnected), chunks={chunks_yielded}",
+                extra={
+                    "json_fields": {
+                        **log_meta,
+                        "chunks_yielded": chunks_yielded,
+                        "reason": "client_disconnect",
+                    }
+                },
+            )
             pass  # Client disconnected - background task continues
         except Exception as e:
-            logger.error(f"Error in SSE stream for task {task_id}: {e}")
+            elapsed = (time_module.perf_counter() - event_gen_start) * 1000
+            logger.error(
+                f"[TIMING] event_generator ERROR after {elapsed:.1f}ms: {e}",
+                extra={
+                    "json_fields": {**log_meta, "elapsed_ms": elapsed, "error": str(e)}
+                },
+            )
         finally:
-            # Unsubscribe when client disconnects or stream ends to prevent resource leak
+            # Unsubscribe when client disconnects or stream ends
            if subscriber_queue is not None:
                 try:
                     await stream_registry.unsubscribe_from_task(
@@ -357,6 +606,18 @@ async def stream_chat_post(
                         exc_info=True,
                     )

             # AI SDK protocol termination - always yield even if unsubscribe fails
+            total_time = time_module.perf_counter() - event_gen_start
+            logger.info(
+                f"[TIMING] event_generator FINISHED in {total_time:.2f}s; "
+                f"task={task_id}, session={session_id}, n_chunks={chunks_yielded}",
+                extra={
+                    "json_fields": {
+                        **log_meta,
+                        "total_time_ms": total_time * 1000,
+                        "chunks_yielded": chunks_yielded,
+                    }
+                },
+            )
             yield "data: [DONE]\n\n"

     return StreamingResponse(
@@ -400,35 +661,21 @@ async def stream_chat_get(
     session = await _validate_and_get_session(session_id, user_id)

     async def event_generator() -> AsyncGenerator[str, None]:
-        chunk_count = 0
-        first_chunk_type: str | None = None
-        async for chunk in chat_service.stream_chat_completion(
+        # Choose service based on configuration
+        use_sdk = config.use_claude_agent_sdk
+        stream_fn = (
+            sdk_service.stream_chat_completion_sdk
+            if use_sdk
+            else chat_service.stream_chat_completion
+        )
+        async for chunk in stream_fn(
             session_id,
             message,
             is_user_message=is_user_message,
             user_id=user_id,
             session=session,  # Pass pre-fetched session to avoid double-fetch
         ):
-            if chunk_count < 3:
-                logger.info(
-                    "Chat stream chunk",
-                    extra={
-                        "session_id": session_id,
-                        "chunk_type": str(chunk.type),
-                    },
-                )
-            if not first_chunk_type:
-                first_chunk_type = str(chunk.type)
-            chunk_count += 1
             yield chunk.to_sse()
-        logger.info(
-            "Chat stream completed",
-            extra={
-                "session_id": session_id,
-                "chunk_count": chunk_count,
-                "first_chunk_type": first_chunk_type,
-            },
-        )

         # AI SDK protocol termination
         yield "data: [DONE]\n\n"
@@ -550,8 +797,6 @@ async def stream_task(
     )

     async def event_generator() -> AsyncGenerator[str, None]:
-        import asyncio
-
         heartbeat_interval = 15.0  # Send heartbeat every 15 seconds
         try:
             while True:

View File

@@ -0,0 +1,14 @@
"""Claude Agent SDK integration for CoPilot.

This module provides the integration layer between the Claude Agent SDK
and the existing CoPilot tool system, enabling drop-in replacement of
the current LLM orchestration with the battle-tested Claude Agent SDK.
"""

from .service import stream_chat_completion_sdk
from .tool_adapter import create_copilot_mcp_server

__all__ = [
    "stream_chat_completion_sdk",
    "create_copilot_mcp_server",
]

View File

@@ -0,0 +1,348 @@
"""Anthropic SDK fallback implementation.

This module provides the fallback streaming implementation using the Anthropic SDK
directly when the Claude Agent SDK is not available.
"""

import json
import logging
import os
import uuid
from collections.abc import AsyncGenerator
from typing import Any, cast

from ..model import ChatMessage, ChatSession
from ..response_model import (
    StreamBaseResponse,
    StreamError,
    StreamFinish,
    StreamTextDelta,
    StreamTextEnd,
    StreamTextStart,
    StreamToolInputAvailable,
    StreamToolInputStart,
    StreamToolOutputAvailable,
    StreamUsage,
)
from .tool_adapter import get_tool_definitions, get_tool_handlers

logger = logging.getLogger(__name__)


async def stream_with_anthropic(
    session: ChatSession,
    system_prompt: str,
    text_block_id: str,
) -> AsyncGenerator[StreamBaseResponse, None]:
    """Stream using Anthropic SDK directly with tool calling support.

    This function accumulates messages into the session for persistence.
    The caller should NOT yield an additional StreamFinish - this function handles it.
    """
    import anthropic

    # Only use ANTHROPIC_API_KEY - don't fall back to OpenRouter keys
    api_key = os.getenv("ANTHROPIC_API_KEY")
    if not api_key:
        yield StreamError(
            errorText="ANTHROPIC_API_KEY not configured for fallback",
            code="config_error",
        )
        yield StreamFinish()
        return

    client = anthropic.AsyncAnthropic(api_key=api_key)
    tool_definitions = get_tool_definitions()
    tool_handlers = get_tool_handlers()

    anthropic_tools = [
        {
            "name": t["name"],
            "description": t["description"],
            "input_schema": t["inputSchema"],
        }
        for t in tool_definitions
    ]

    anthropic_messages = _convert_session_to_anthropic(session)
    if not anthropic_messages or anthropic_messages[-1]["role"] != "user":
        anthropic_messages.append(
            {"role": "user", "content": "Continue with the task."}
        )

    has_started_text = False
    max_iterations = 10
    accumulated_text = ""
    accumulated_tool_calls: list[dict[str, Any]] = []

    for _ in range(max_iterations):
        try:
            async with client.messages.stream(
                model="claude-sonnet-4-20250514",
                max_tokens=4096,
                system=system_prompt,
                messages=cast(Any, anthropic_messages),
                tools=cast(Any, anthropic_tools) if anthropic_tools else [],
            ) as stream:
                async for event in stream:
                    if event.type == "content_block_start":
                        block = event.content_block
                        if hasattr(block, "type"):
                            if block.type == "text" and not has_started_text:
                                yield StreamTextStart(id=text_block_id)
                                has_started_text = True
                            elif block.type == "tool_use":
                                yield StreamToolInputStart(
                                    toolCallId=block.id, toolName=block.name
                                )
                    elif event.type == "content_block_delta":
                        delta = event.delta
                        if hasattr(delta, "type") and delta.type == "text_delta":
                            accumulated_text += delta.text
                            yield StreamTextDelta(id=text_block_id, delta=delta.text)

                final_message = await stream.get_final_message()

            if final_message.stop_reason == "tool_use":
                if has_started_text:
                    yield StreamTextEnd(id=text_block_id)
                    has_started_text = False
                    text_block_id = str(uuid.uuid4())

                tool_results = []
                assistant_content: list[dict[str, Any]] = []
                for block in final_message.content:
                    if block.type == "text":
                        assistant_content.append(
                            {"type": "text", "text": block.text}
                        )
                    elif block.type == "tool_use":
                        assistant_content.append(
                            {
                                "type": "tool_use",
                                "id": block.id,
                                "name": block.name,
                                "input": block.input,
                            }
                        )
                        # Track tool call for session persistence
                        accumulated_tool_calls.append(
                            {
                                "id": block.id,
                                "type": "function",
                                "function": {
                                    "name": block.name,
                                    "arguments": json.dumps(
                                        block.input
                                        if isinstance(block.input, dict)
                                        else {}
                                    ),
                                },
                            }
                        )
                        yield StreamToolInputAvailable(
                            toolCallId=block.id,
                            toolName=block.name,
                            input=(
                                block.input if isinstance(block.input, dict) else {}
                            ),
                        )
                        output, is_error = await _execute_tool(
                            block.name, block.input, tool_handlers
                        )
                        yield StreamToolOutputAvailable(
                            toolCallId=block.id,
                            toolName=block.name,
                            output=output,
                            success=not is_error,
                        )
                        # Save tool result to session
                        session.messages.append(
                            ChatMessage(
                                role="tool",
                                content=output,
                                tool_call_id=block.id,
                            )
                        )
                        tool_results.append(
                            {
                                "type": "tool_result",
                                "tool_use_id": block.id,
                                "content": output,
                                "is_error": is_error,
                            }
                        )

                # Save assistant message with tool calls to session
                session.messages.append(
                    ChatMessage(
                        role="assistant",
                        content=accumulated_text or None,
                        tool_calls=(
                            accumulated_tool_calls
                            if accumulated_tool_calls
                            else None
                        ),
                    )
                )
                # Reset for next iteration
                accumulated_text = ""
                accumulated_tool_calls = []

                anthropic_messages.append(
                    {"role": "assistant", "content": assistant_content}
                )
                anthropic_messages.append({"role": "user", "content": tool_results})
                continue
            else:
                if has_started_text:
                    yield StreamTextEnd(id=text_block_id)
                # Save final assistant response to session
                if accumulated_text:
                    session.messages.append(
                        ChatMessage(role="assistant", content=accumulated_text)
                    )
                yield StreamUsage(
                    promptTokens=final_message.usage.input_tokens,
                    completionTokens=final_message.usage.output_tokens,
                    totalTokens=final_message.usage.input_tokens
                    + final_message.usage.output_tokens,
                )
                yield StreamFinish()
                return
        except Exception as e:
            logger.error(f"[Anthropic Fallback] Error: {e}", exc_info=True)
            yield StreamError(
                errorText="An error occurred. Please try again.",
                code="anthropic_error",
            )
            yield StreamFinish()
            return

    yield StreamError(errorText="Max tool iterations reached", code="max_iterations")
    yield StreamFinish()


def _convert_session_to_anthropic(session: ChatSession) -> list[dict[str, Any]]:
    """Convert session messages to Anthropic format.

    Handles merging consecutive same-role messages (Anthropic requires alternating roles).
    """
    messages: list[dict[str, Any]] = []
    for msg in session.messages:
        if msg.role == "user":
            new_msg = {"role": "user", "content": msg.content or ""}
        elif msg.role == "assistant":
            content: list[dict[str, Any]] = []
            if msg.content:
                content.append({"type": "text", "text": msg.content})
            if msg.tool_calls:
                for tc in msg.tool_calls:
                    func = tc.get("function", {})
                    args = func.get("arguments", {})
                    if isinstance(args, str):
                        try:
                            args = json.loads(args)
                        except json.JSONDecodeError:
                            args = {}
                    content.append(
                        {
                            "type": "tool_use",
                            "id": tc.get("id", str(uuid.uuid4())),
                            "name": func.get("name", ""),
                            "input": args,
                        }
                    )
            if content:
                new_msg = {"role": "assistant", "content": content}
            else:
                continue  # Skip empty assistant messages
        elif msg.role == "tool":
            new_msg = {
                "role": "user",
                "content": [
                    {
                        "type": "tool_result",
                        "tool_use_id": msg.tool_call_id or "",
                        "content": msg.content or "",
                    }
                ],
            }
        else:
            continue
        messages.append(new_msg)

    # Merge consecutive same-role messages (Anthropic requires alternating roles)
    return _merge_consecutive_roles(messages)


def _merge_consecutive_roles(messages: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Merge consecutive messages with the same role.

    Anthropic API requires alternating user/assistant roles.
    """
    if not messages:
        return []

    merged: list[dict[str, Any]] = []
    for msg in messages:
        if merged and merged[-1]["role"] == msg["role"]:
            # Merge with previous message
            prev_content = merged[-1]["content"]
            new_content = msg["content"]
            # Normalize both to list-of-blocks form
            if isinstance(prev_content, str):
                prev_content = [{"type": "text", "text": prev_content}]
            if isinstance(new_content, str):
                new_content = [{"type": "text", "text": new_content}]
            # Ensure both are lists
            if not isinstance(prev_content, list):
                prev_content = [prev_content]
            if not isinstance(new_content, list):
                new_content = [new_content]
            merged[-1]["content"] = prev_content + new_content
        else:
            merged.append(msg)
    return merged


async def _execute_tool(
    tool_name: str, tool_input: Any, handlers: dict[str, Any]
) -> tuple[str, bool]:
    """Execute a tool and return (output, is_error)."""
    handler = handlers.get(tool_name)
    if not handler:
        return f"Unknown tool: {tool_name}", True
    try:
        result = await handler(tool_input)
        # Safely extract output - handle empty or missing content
        content = result.get("content") or []
        if content and isinstance(content, list) and len(content) > 0:
            first_item = content[0]
            output = first_item.get("text", "") if isinstance(first_item, dict) else ""
        else:
            output = ""
        is_error = result.get("isError", False)
        return output, is_error
    except Exception as e:
        return f"Error: {str(e)}", True

View File

@@ -0,0 +1,320 @@
"""Response adapter for converting Claude Agent SDK messages to Vercel AI SDK format.

This module provides the adapter layer that converts streaming messages from
the Claude Agent SDK into the Vercel AI SDK UI Stream Protocol format that
the frontend expects.
"""

import json
import logging
import uuid
from typing import Any, AsyncGenerator

from backend.api.features.chat.response_model import (
    StreamBaseResponse,
    StreamError,
    StreamFinish,
    StreamHeartbeat,
    StreamStart,
    StreamTextDelta,
    StreamTextEnd,
    StreamTextStart,
    StreamToolInputAvailable,
    StreamToolInputStart,
    StreamToolOutputAvailable,
    StreamUsage,
)

logger = logging.getLogger(__name__)


class SDKResponseAdapter:
    """Adapter for converting Claude Agent SDK messages to Vercel AI SDK format.

    This class maintains state during a streaming session to properly track
    text blocks, tool calls, and message lifecycle.
    """

    def __init__(self, message_id: str | None = None):
        """Initialize the adapter.

        Args:
            message_id: Optional message ID. If not provided, one will be generated.
        """
        self.message_id = message_id or str(uuid.uuid4())
        self.text_block_id = str(uuid.uuid4())
        self.has_started_text = False
        self.has_ended_text = False
        self.current_tool_calls: dict[str, dict[str, Any]] = {}
        self.task_id: str | None = None

    def set_task_id(self, task_id: str) -> None:
        """Set the task ID for reconnection support."""
        self.task_id = task_id

    def convert_message(self, sdk_message: Any) -> list[StreamBaseResponse]:
        """Convert a single SDK message to Vercel AI SDK format.

        Args:
            sdk_message: A message from the Claude Agent SDK.

        Returns:
            List of StreamBaseResponse objects (may be empty or multiple).
        """
        responses: list[StreamBaseResponse] = []

        # Handle different SDK message types - use class name since SDK uses dataclasses
        class_name = type(sdk_message).__name__
        msg_subtype = getattr(sdk_message, "subtype", None)

        if class_name == "SystemMessage":
            if msg_subtype == "init":
                # Session initialization - emit start
                responses.append(
                    StreamStart(
                        messageId=self.message_id,
                        taskId=self.task_id,
                    )
                )
        elif class_name == "AssistantMessage":
            # Assistant message with content blocks
            content = getattr(sdk_message, "content", [])
            for block in content:
                # Check block type by class name (SDK uses dataclasses) or dict type
                block_class = type(block).__name__
                block_type = block.get("type") if isinstance(block, dict) else None

                if block_class == "TextBlock" or block_type == "text":
                    # Text content
                    text = getattr(block, "text", None) or (
                        block.get("text") if isinstance(block, dict) else ""
                    )
                    if text:
                        # Start text block if needed (or restart after tool calls)
                        if not self.has_started_text or self.has_ended_text:
                            # Generate new text block ID for text after tools
                            if self.has_ended_text:
                                self.text_block_id = str(uuid.uuid4())
                                self.has_ended_text = False
                            responses.append(StreamTextStart(id=self.text_block_id))
                            self.has_started_text = True
                        # Emit text delta
                        responses.append(
                            StreamTextDelta(
                                id=self.text_block_id,
                                delta=text,
                            )
                        )
                elif block_class == "ToolUseBlock" or block_type == "tool_use":
                    # Tool call
                    tool_id_raw = getattr(block, "id", None) or (
                        block.get("id") if isinstance(block, dict) else None
                    )
                    tool_id: str = (
                        str(tool_id_raw) if tool_id_raw else str(uuid.uuid4())
                    )
                    tool_name_raw = getattr(block, "name", None) or (
                        block.get("name") if isinstance(block, dict) else None
                    )
                    tool_name: str = str(tool_name_raw) if tool_name_raw else "unknown"
                    tool_input = getattr(block, "input", None) or (
                        block.get("input") if isinstance(block, dict) else {}
                    )

                    # End text block if we were streaming text
                    if self.has_started_text and not self.has_ended_text:
                        responses.append(StreamTextEnd(id=self.text_block_id))
self.has_ended_text = True
# Emit tool input start
responses.append(
StreamToolInputStart(
toolCallId=tool_id,
toolName=tool_name,
)
)
# Emit tool input available with full input
responses.append(
StreamToolInputAvailable(
toolCallId=tool_id,
toolName=tool_name,
input=tool_input if isinstance(tool_input, dict) else {},
)
)
# Track the tool call
self.current_tool_calls[tool_id] = {
"name": tool_name,
"input": tool_input,
}
elif class_name in ("ToolResultMessage", "UserMessage"):
# Tool result - check for tool_result content
content = getattr(sdk_message, "content", [])
for block in content:
block_class = type(block).__name__
block_type = block.get("type") if isinstance(block, dict) else None
if block_class == "ToolResultBlock" or block_type == "tool_result":
tool_use_id = getattr(block, "tool_use_id", None) or (
block.get("tool_use_id") if isinstance(block, dict) else None
)
result_content = getattr(block, "content", None) or (
block.get("content") if isinstance(block, dict) else ""
)
is_error = getattr(block, "is_error", False) or (
block.get("is_error", False)
if isinstance(block, dict)
else False
)
if tool_use_id:
tool_info = self.current_tool_calls.get(tool_use_id, {})
tool_name = tool_info.get("name", "unknown")
# Format the output
if isinstance(result_content, list):
# Extract text from content blocks
output_text = ""
for item in result_content:
if (
isinstance(item, dict)
and item.get("type") == "text"
):
output_text += item.get("text", "")
elif hasattr(item, "text"):
output_text += getattr(item, "text", "")
if output_text:
output = output_text
else:
try:
output = json.dumps(result_content)
except (TypeError, ValueError):
output = str(result_content)
elif isinstance(result_content, str):
output = result_content
else:
try:
output = json.dumps(result_content)
except (TypeError, ValueError):
output = str(result_content)
responses.append(
StreamToolOutputAvailable(
toolCallId=tool_use_id,
toolName=tool_name,
output=output,
success=not is_error,
)
)
elif class_name == "ResultMessage":
# Final result
if msg_subtype == "success":
# End text block if still open
if self.has_started_text and not self.has_ended_text:
responses.append(StreamTextEnd(id=self.text_block_id))
self.has_ended_text = True
# Emit finish
responses.append(StreamFinish())
elif msg_subtype in ("error", "error_during_execution"):
error_msg = getattr(sdk_message, "error", "Unknown error")
responses.append(
StreamError(
errorText=str(error_msg),
code="sdk_error",
)
)
responses.append(StreamFinish())
elif class_name == "ErrorMessage":
# Error message
error_msg = getattr(sdk_message, "message", None) or getattr(
sdk_message, "error", "Unknown error"
)
responses.append(
StreamError(
errorText=str(error_msg),
code="sdk_error",
)
)
responses.append(StreamFinish())
else:
logger.debug(f"Unhandled SDK message type: {class_name}")
return responses
def create_heartbeat(self, tool_call_id: str | None = None) -> StreamHeartbeat:
"""Create a heartbeat response."""
return StreamHeartbeat(toolCallId=tool_call_id)
def create_usage(
self,
prompt_tokens: int,
completion_tokens: int,
) -> StreamUsage:
"""Create a usage statistics response."""
return StreamUsage(
promptTokens=prompt_tokens,
completionTokens=completion_tokens,
totalTokens=prompt_tokens + completion_tokens,
)
async def adapt_sdk_stream(
sdk_stream: AsyncGenerator[Any, None],
message_id: str | None = None,
task_id: str | None = None,
) -> AsyncGenerator[StreamBaseResponse, None]:
"""Adapt a Claude Agent SDK stream to Vercel AI SDK format.
Args:
sdk_stream: The async generator from the Claude Agent SDK.
message_id: Optional message ID for the response.
task_id: Optional task ID for reconnection support.
Yields:
StreamBaseResponse objects in Vercel AI SDK format.
"""
adapter = SDKResponseAdapter(message_id=message_id)
if task_id:
adapter.set_task_id(task_id)
# Emit start immediately
yield StreamStart(messageId=adapter.message_id, taskId=task_id)
finished = False
try:
async for sdk_message in sdk_stream:
responses = adapter.convert_message(sdk_message)
for response in responses:
# Skip duplicate start messages
if isinstance(response, StreamStart):
continue
if isinstance(response, StreamFinish):
finished = True
yield response
except Exception as e:
logger.error(f"Error in SDK stream: {e}", exc_info=True)
yield StreamError(
errorText="An error occurred. Please try again.",
code="stream_error",
)
yield StreamFinish()
return
# Ensure terminal StreamFinish if SDK stream ended without one
if not finished:
yield StreamFinish()
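# Minimal consumption sketch (hypothetical `sdk_stream`): the adapter emits its
# own StreamStart, drops duplicate starts from the SDK, and guarantees a
# terminal StreamFinish even if the SDK stream ends without one.
#
#   async def consume(sdk_stream):
#       async for event in adapt_sdk_stream(sdk_stream, task_id="task-123"):
#           print(event.model_dump_json())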


@@ -0,0 +1,283 @@
"""Security hooks for Claude Agent SDK integration.
This module provides security hooks that validate tool calls before execution,
ensuring multi-user isolation and preventing unauthorized operations.
"""
import logging
import re
from typing import Any, cast
from backend.api.features.chat.sdk.tool_adapter import MCP_TOOL_PREFIX
logger = logging.getLogger(__name__)
# Tools that are blocked entirely (CLI/system access)
BLOCKED_TOOLS = {
"Bash",
"bash",
"shell",
"exec",
"terminal",
"command",
"Read", # Block raw file read - use workspace tools instead
"Write", # Block raw file write - use workspace tools instead
"Edit", # Block raw file edit - use workspace tools instead
"Glob", # Block raw file glob - use workspace tools instead
"Grep", # Block raw file grep - use workspace tools instead
}
# Dangerous patterns in tool inputs
DANGEROUS_PATTERNS = [
r"sudo",
r"rm\s+-rf",
r"dd\s+if=",
r"/etc/passwd",
r"/etc/shadow",
r"chmod\s+777",
r"curl\s+.*\|.*sh",
r"wget\s+.*\|.*sh",
r"eval\s*\(",
r"exec\s*\(",
r"__import__",
r"os\.system",
r"subprocess",
]
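# e.g. str({"cmd": "sudo rm -rf /"}) matches both r"sudo" and r"rm\s+-rf"
# (case-insensitively), so _validate_tool_access below denies the call.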
def _validate_tool_access(tool_name: str, tool_input: dict[str, Any]) -> dict[str, Any]:
"""Validate that a tool call is allowed.
Returns:
Empty dict to allow, or dict with hookSpecificOutput to deny
"""
# Block forbidden tools
if tool_name in BLOCKED_TOOLS:
logger.warning(f"Blocked tool access attempt: {tool_name}")
return {
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "deny",
"permissionDecisionReason": (
f"Tool '{tool_name}' is not available. "
"Use the CoPilot-specific tools instead."
),
}
}
# Check for dangerous patterns in tool input
input_str = str(tool_input)
for pattern in DANGEROUS_PATTERNS:
if re.search(pattern, input_str, re.IGNORECASE):
logger.warning(
f"Blocked dangerous pattern in tool input: {pattern} in {tool_name}"
)
return {
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "deny",
"permissionDecisionReason": "Input contains blocked pattern",
}
}
return {}
def _validate_user_isolation(
tool_name: str, tool_input: dict[str, Any], user_id: str | None
) -> dict[str, Any]:
"""Validate that tool calls respect user isolation."""
# For workspace file tools, ensure path doesn't escape
if "workspace" in tool_name.lower():
path = tool_input.get("path", "") or tool_input.get("file_path", "")
if path:
# Check for path traversal
if ".." in path or path.startswith("/"):
logger.warning(
f"Blocked path traversal attempt: {path} by user {user_id}"
)
return {
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "deny",
"permissionDecisionReason": "Path traversal not allowed",
}
}
return {}
def create_security_hooks(user_id: str | None) -> dict[str, Any]:
"""Create the security hooks configuration for Claude Agent SDK.
Includes security validation and observability hooks:
- PreToolUse: Security validation before tool execution
- PostToolUse: Log successful tool executions
- PostToolUseFailure: Log and handle failed tool executions
- PreCompact: Log context compaction events (SDK handles compaction automatically)
Args:
user_id: Current user ID for isolation validation
Returns:
Hooks configuration dict for ClaudeAgentOptions
"""
try:
from claude_agent_sdk import HookMatcher
from claude_agent_sdk.types import HookContext, HookInput, SyncHookJSONOutput
async def pre_tool_use_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Combined pre-tool-use validation hook."""
_ = context # unused but required by signature
tool_name = cast(str, input_data.get("tool_name", ""))
tool_input = cast(dict[str, Any], input_data.get("tool_input", {}))
# Strip MCP prefix for consistent validation
clean_name = tool_name.removeprefix(MCP_TOOL_PREFIX)
# Validate basic tool access
result = _validate_tool_access(clean_name, tool_input)
if result:
return cast(SyncHookJSONOutput, result)
# Validate user isolation
result = _validate_user_isolation(clean_name, tool_input, user_id)
if result:
return cast(SyncHookJSONOutput, result)
logger.debug(f"[SDK] Tool start: {tool_name}, user={user_id}")
return cast(SyncHookJSONOutput, {})
async def post_tool_use_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Log successful tool executions for observability."""
_ = context
tool_name = cast(str, input_data.get("tool_name", ""))
logger.debug(f"[SDK] Tool success: {tool_name}, tool_use_id={tool_use_id}")
return cast(SyncHookJSONOutput, {})
async def post_tool_failure_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Log failed tool executions for debugging."""
_ = context
tool_name = cast(str, input_data.get("tool_name", ""))
error = input_data.get("error", "Unknown error")
logger.warning(
f"[SDK] Tool failed: {tool_name}, error={error}, "
f"user={user_id}, tool_use_id={tool_use_id}"
)
return cast(SyncHookJSONOutput, {})
async def pre_compact_hook(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Log when SDK triggers context compaction.
The SDK automatically compacts conversation history when it grows too large.
This hook provides visibility into when compaction happens.
"""
_ = context, tool_use_id
trigger = input_data.get("trigger", "auto")
logger.info(
f"[SDK] Context compaction triggered: {trigger}, user={user_id}"
)
return cast(SyncHookJSONOutput, {})
return {
"PreToolUse": [HookMatcher(matcher="*", hooks=[pre_tool_use_hook])],
"PostToolUse": [HookMatcher(matcher="*", hooks=[post_tool_use_hook])],
"PostToolUseFailure": [
HookMatcher(matcher="*", hooks=[post_tool_failure_hook])
],
"PreCompact": [HookMatcher(matcher="*", hooks=[pre_compact_hook])],
}
except ImportError:
# Fallback for when SDK isn't available - return empty hooks
return {}
def create_strict_security_hooks(
user_id: str | None,
allowed_tools: list[str] | None = None,
) -> dict[str, Any]:
"""Create strict security hooks that only allow specific tools.
Args:
user_id: Current user ID
allowed_tools: List of allowed tool names (defaults to CoPilot tools)
Returns:
Hooks configuration dict
"""
try:
from claude_agent_sdk import HookMatcher
from claude_agent_sdk.types import HookContext, HookInput, SyncHookJSONOutput
from .tool_adapter import RAW_TOOL_NAMES
tools_list = allowed_tools if allowed_tools is not None else RAW_TOOL_NAMES
allowed_set = set(tools_list)
async def strict_pre_tool_use(
input_data: HookInput,
tool_use_id: str | None,
context: HookContext,
) -> SyncHookJSONOutput:
"""Strict validation that only allows whitelisted tools."""
_ = context # unused but required by signature
tool_name = cast(str, input_data.get("tool_name", ""))
tool_input = cast(dict[str, Any], input_data.get("tool_input", {}))
# Remove MCP prefix if present
clean_name = tool_name.removeprefix(MCP_TOOL_PREFIX)
if clean_name not in allowed_set:
logger.warning(f"Blocked non-whitelisted tool: {tool_name}")
return cast(
SyncHookJSONOutput,
{
"hookSpecificOutput": {
"hookEventName": "PreToolUse",
"permissionDecision": "deny",
"permissionDecisionReason": (
f"Tool '{tool_name}' is not in the allowed list"
),
}
},
)
# Run standard validations using clean_name for consistent checks
result = _validate_tool_access(clean_name, tool_input)
if result:
return cast(SyncHookJSONOutput, result)
result = _validate_user_isolation(clean_name, tool_input, user_id)
if result:
return cast(SyncHookJSONOutput, result)
logger.debug(
f"[SDK Audit] Tool call: tool={tool_name}, "
f"user={user_id}, tool_use_id={tool_use_id}"
)
return cast(SyncHookJSONOutput, {})
return {
"PreToolUse": [
HookMatcher(matcher="*", hooks=[strict_pre_tool_use]),
],
}
except ImportError:
return {}
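# Illustrative check (not part of the module): blocked tools produce a deny
# decision in the PreToolUse output shape used throughout this file.
#
#   _validate_tool_access("Bash", {})
#   # -> {"hookSpecificOutput": {
#   #        "hookEventName": "PreToolUse",
#   #        "permissionDecision": "deny",
#   #        "permissionDecisionReason": "Tool 'Bash' is not available. "
#   #                                    "Use the CoPilot-specific tools instead.",
#   #     }}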


@@ -0,0 +1,465 @@
"""Claude Agent SDK service layer for CoPilot chat completions."""
import asyncio
import json
import logging
import uuid
from collections.abc import AsyncGenerator
from typing import Any
import openai
from backend.data.understanding import (
format_understanding_for_prompt,
get_business_understanding,
)
from backend.util.exceptions import NotFoundError
from ..config import ChatConfig
from ..model import (
ChatMessage,
ChatSession,
get_chat_session,
update_session_title,
upsert_chat_session,
)
from ..response_model import (
StreamBaseResponse,
StreamError,
StreamFinish,
StreamStart,
StreamTextDelta,
StreamToolInputAvailable,
StreamToolOutputAvailable,
)
from ..tracking import track_user_message
from .anthropic_fallback import stream_with_anthropic
from .response_adapter import SDKResponseAdapter
from .security_hooks import create_security_hooks
from .tool_adapter import (
COPILOT_TOOL_NAMES,
create_copilot_mcp_server,
set_execution_context,
)
logger = logging.getLogger(__name__)
config = ChatConfig()
# Set to hold background tasks to prevent garbage collection
_background_tasks: set[asyncio.Task[Any]] = set()
DEFAULT_SYSTEM_PROMPT = """You are **Otto**, an AI Co-Pilot for AutoGPT and a Forward-Deployed Automation Engineer serving small business owners. Your mission is to help users automate business tasks with AI by delivering tangible value through working automations—not through documentation or lengthy explanations.
Here is everything you know about the current user from previous interactions:
<users_information>
{users_information}
</users_information>
## YOUR CORE MANDATE
You are action-oriented. Your success is measured by:
- **Value Delivery**: Does the user think "wow, that was amazing" or "what was the point"?
- **Demonstrable Proof**: Show working automations, not descriptions of what's possible
- **Time Saved**: Focus on tangible efficiency gains
- **Quality Output**: Deliver results that meet or exceed expectations
## YOUR WORKFLOW
Adapt flexibly to the conversation context. Not every interaction requires all stages:
1. **Explore & Understand**: Learn about the user's business, tasks, and goals. Use `add_understanding` to capture important context that will improve future conversations.
2. **Assess Automation Potential**: Help the user understand whether and how AI can automate their task.
3. **Prepare for AI**: Provide brief, actionable guidance on prerequisites (data, access, etc.).
4. **Discover or Create Agents**:
- **Always check the user's library first** with `find_library_agent` (these may be customized to their needs)
- Search the marketplace with `find_agent` for pre-built automations
- Find reusable components with `find_block`
- Create custom solutions with `create_agent` if nothing suitable exists
- Modify existing library agents with `edit_agent`
5. **Execute**: Run automations immediately, schedule them, or set up webhooks using `run_agent`. Test specific components with `run_block`.
6. **Show Results**: Display outputs using `agent_output`.
## BEHAVIORAL GUIDELINES
**Be Concise:**
- Target 2-5 short lines maximum
- Make every word count—no repetition or filler
- Use lightweight structure for scannability (bullets, numbered lists, short prompts)
- Avoid jargon (blocks, slugs, cron) unless the user asks
**Be Proactive:**
- Suggest next steps before being asked
- Anticipate needs based on conversation context and user information
- Look for opportunities to expand scope when relevant
- Reveal capabilities through action, not explanation
**Use Tools Effectively:**
- Select the right tool for each task
- **Always check `find_library_agent` before searching the marketplace**
- Use `add_understanding` to capture valuable business context
- When tool calls fail, try alternative approaches
## CRITICAL REMINDER
You are NOT a chatbot. You are NOT documentation. You are a partner who helps busy business owners get value quickly by showing proof through working automations. Bias toward action over explanation."""
async def _build_system_prompt(
user_id: str | None, has_conversation_history: bool = False
) -> tuple[str, Any]:
"""Build the system prompt with user's business understanding context.
Args:
user_id: The user ID to fetch understanding for.
has_conversation_history: Whether there's existing conversation history.
If True, we don't tell the model to greet/introduce (since they're
already in a conversation).
"""
understanding = None
if user_id:
try:
understanding = await get_business_understanding(user_id)
except Exception as e:
logger.warning(f"Failed to fetch business understanding: {e}")
if understanding:
context = format_understanding_for_prompt(understanding)
elif has_conversation_history:
# Don't tell model to greet if there's conversation history
context = "No prior understanding saved yet. Continue the existing conversation naturally."
else:
context = "This is the first time you are meeting the user. Greet them and introduce them to the platform"
return DEFAULT_SYSTEM_PROMPT.replace("{users_information}", context), understanding
def _format_conversation_history(session: ChatSession) -> str:
"""Format conversation history as a prompt context.
Passes full history to the SDK — the SDK handles context compaction
automatically when the context window approaches its limit.
"""
if not session.messages:
return ""
# Get all messages except the last user message (which will be the prompt)
messages = session.messages[:-1] if session.messages else []
if not messages:
return ""
history_parts = ["<conversation_history>"]
for msg in messages:
if msg.role == "user":
history_parts.append(f"User: {msg.content or ''}")
elif msg.role == "assistant":
# Only include text content, skip tool call metadata
# (tool calls are noise for history context)
if msg.content:
history_parts.append(f"Assistant: {msg.content}")
# Skip tool result messages — they're not useful for conversation context
history_parts.append("</conversation_history>")
history_parts.append("")
history_parts.append(
"Continue this conversation. Respond to the user's latest message:"
)
history_parts.append("")
return "\n".join(history_parts)
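# Illustrative output (not part of the module): for a session holding
# [user "hi", assistant "hello", user "what next?"], the returned context
# covers everything except the final user message, followed by the
# continuation instruction:
#
#   <conversation_history>
#   User: hi
#   Assistant: hello
#   </conversation_history>
#
#   Continue this conversation. Respond to the user's latest message: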
async def _generate_session_title(
message: str,
user_id: str | None = None,
session_id: str | None = None,
) -> str | None:
"""Generate a concise title for a chat session."""
from backend.util.settings import Settings
settings = Settings()
try:
# Build extra_body for OpenRouter tracing
extra_body: dict[str, Any] = {
"posthogProperties": {"environment": settings.config.app_env.value},
}
if user_id:
extra_body["user"] = user_id[:128]
extra_body["posthogDistinctId"] = user_id
if session_id:
extra_body["session_id"] = session_id[:128]
client = openai.AsyncOpenAI(api_key=config.api_key, base_url=config.base_url)
response = await client.chat.completions.create(
model=config.title_model,
messages=[
{
"role": "system",
"content": "Generate a very short title (3-6 words) for a chat conversation based on the user's first message. Return ONLY the title, no quotes or punctuation.",
},
{"role": "user", "content": message[:500]},
],
max_tokens=20,
extra_body=extra_body,
)
title = response.choices[0].message.content
if title:
title = title.strip().strip("\"'")
return title[:47] + "..." if len(title) > 50 else title
return None
except Exception as e:
logger.warning(f"Failed to generate session title: {e}")
return None
async def stream_chat_completion_sdk(
session_id: str,
message: str | None = None,
tool_call_response: str | None = None, # noqa: ARG001
is_user_message: bool = True,
user_id: str | None = None,
retry_count: int = 0, # noqa: ARG001
session: ChatSession | None = None,
context: dict[str, str] | None = None, # noqa: ARG001
) -> AsyncGenerator[StreamBaseResponse, None]:
"""Stream chat completion using Claude Agent SDK.
Drop-in replacement for stream_chat_completion with improved reliability.
"""
if session is None:
session = await get_chat_session(session_id, user_id)
if not session:
raise NotFoundError(
f"Session {session_id} not found. Please create a new session first."
)
if message:
session.messages.append(
ChatMessage(
role="user" if is_user_message else "assistant", content=message
)
)
if is_user_message:
track_user_message(
user_id=user_id, session_id=session_id, message_length=len(message)
)
session = await upsert_chat_session(session)
# Generate title for new sessions (first user message)
if is_user_message and not session.title:
user_messages = [m for m in session.messages if m.role == "user"]
if len(user_messages) == 1:
first_message = user_messages[0].content or message or ""
if first_message:
task = asyncio.create_task(
_update_title_async(session_id, first_message, user_id)
)
# Store reference to prevent garbage collection
_background_tasks.add(task)
task.add_done_callback(_background_tasks.discard)
# Check if there's conversation history (more than just the current message)
has_history = len(session.messages) > 1
system_prompt, _ = await _build_system_prompt(
user_id, has_conversation_history=has_history
)
set_execution_context(user_id, session, None)
message_id = str(uuid.uuid4())
text_block_id = str(uuid.uuid4())
task_id = str(uuid.uuid4())
yield StreamStart(messageId=message_id, taskId=task_id)
# Track whether the stream completed normally via ResultMessage
stream_completed = False
try:
try:
from claude_agent_sdk import ClaudeAgentOptions, ClaudeSDKClient
# Create MCP server with CoPilot tools
mcp_server = create_copilot_mcp_server()
options = ClaudeAgentOptions(
system_prompt=system_prompt,
mcp_servers={"copilot": mcp_server}, # type: ignore[arg-type]
allowed_tools=COPILOT_TOOL_NAMES,
hooks=create_security_hooks(user_id), # type: ignore[arg-type]
continue_conversation=True, # Enable conversation continuation
)
adapter = SDKResponseAdapter(message_id=message_id)
adapter.set_task_id(task_id)
async with ClaudeSDKClient(options=options) as client:
# Build prompt with conversation history for context
# The SDK doesn't support replaying full conversation history,
# so we include it as context in the prompt
current_message = message or ""
if not current_message and session.messages:
last_user = [m for m in session.messages if m.role == "user"]
if last_user:
current_message = last_user[-1].content or ""
# Include conversation history if there are prior messages
if len(session.messages) > 1:
history_context = _format_conversation_history(session)
prompt = f"{history_context}{current_message}"
else:
prompt = current_message
logger.info(
f"[SDK] Prompt built: {len(prompt)} chars, "
f"{len(session.messages)} messages in session"
)
# Guard against empty prompts
if not prompt.strip():
yield StreamError(
errorText="Message cannot be empty.",
code="empty_prompt",
)
yield StreamFinish()
return
await client.query(prompt, session_id=session_id)
# Track assistant response to save to session
# We may need multiple assistant messages if text comes after tool results
assistant_response = ChatMessage(role="assistant", content="")
accumulated_tool_calls: list[dict[str, Any]] = []
has_appended_assistant = False
has_tool_results = False # Track if we've received tool results
# Receive messages from the SDK
async for sdk_msg in client.receive_messages():
for response in adapter.convert_message(sdk_msg):
if isinstance(response, StreamStart):
continue
yield response
# Accumulate text deltas into assistant response
if isinstance(response, StreamTextDelta):
delta = response.delta or ""
# After tool results, create new assistant message for post-tool text
if has_tool_results and has_appended_assistant:
assistant_response = ChatMessage(
role="assistant", content=delta
)
accumulated_tool_calls = [] # Reset for new message
session.messages.append(assistant_response)
has_tool_results = False
else:
assistant_response.content = (
assistant_response.content or ""
) + delta
if not has_appended_assistant:
session.messages.append(assistant_response)
has_appended_assistant = True
# Track tool calls on the assistant message
elif isinstance(response, StreamToolInputAvailable):
accumulated_tool_calls.append(
{
"id": response.toolCallId,
"type": "function",
"function": {
"name": response.toolName,
"arguments": json.dumps(response.input or {}),
},
}
)
# Update assistant message with tool calls
assistant_response.tool_calls = accumulated_tool_calls
# Append assistant message if not already (tool-only response)
if not has_appended_assistant:
session.messages.append(assistant_response)
has_appended_assistant = True
elif isinstance(response, StreamToolOutputAvailable):
session.messages.append(
ChatMessage(
role="tool",
content=(
response.output
if isinstance(response.output, str)
else str(response.output)
),
tool_call_id=response.toolCallId,
)
)
has_tool_results = True
elif isinstance(response, StreamFinish):
stream_completed = True
# Break out of the message loop if we received finish signal
if stream_completed:
break
# Ensure assistant response is saved even if no text deltas
# (e.g., only tool calls were made)
if (
assistant_response.content or assistant_response.tool_calls
) and not has_appended_assistant:
session.messages.append(assistant_response)
except ImportError:
logger.warning(
"[SDK] claude-agent-sdk not available, using Anthropic fallback"
)
async for response in stream_with_anthropic(
session, system_prompt, text_block_id
):
if isinstance(response, StreamFinish):
stream_completed = True
yield response
# Save the session with accumulated messages
await upsert_chat_session(session)
logger.debug(
f"[SDK] Session {session_id} saved with {len(session.messages)} messages"
)
# Yield StreamFinish to signal completion to the caller (routes.py)
# Only if one hasn't already been yielded by the stream
if not stream_completed:
yield StreamFinish()
except Exception as e:
logger.error(f"[SDK] Error: {e}", exc_info=True)
# Save session even on error to preserve any partial response
try:
await upsert_chat_session(session)
except Exception as save_err:
logger.error(f"[SDK] Failed to save session on error: {save_err}")
# Sanitize error message to avoid exposing internal details
yield StreamError(
errorText="An error occurred. Please try again.",
code="sdk_error",
)
yield StreamFinish()
async def _update_title_async(
session_id: str, message: str, user_id: str | None = None
) -> None:
"""Background task to update session title."""
try:
title = await _generate_session_title(
message, user_id=user_id, session_id=session_id
)
if title:
await update_session_title(session_id, title)
logger.debug(f"[SDK] Generated title for {session_id}: {title}")
except Exception as e:
logger.warning(f"[SDK] Failed to update session title: {e}")
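# Minimal caller sketch (hypothetical wiring; the real caller in routes.py
# forwards these events over SSE):
#
#   async def handle(session_id: str, user_id: str, message: str):
#       async for event in stream_chat_completion_sdk(
#           session_id, message=message, user_id=user_id
#       ):
#           yield event.model_dump_json()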


@@ -0,0 +1,221 @@
"""Tool adapter for wrapping existing CoPilot tools as Claude Agent SDK MCP tools.
This module provides the adapter layer that converts existing BaseTool implementations
into in-process MCP tools that can be used with the Claude Agent SDK.
"""
import json
import logging
import uuid
from contextvars import ContextVar
from typing import Any
from backend.api.features.chat.model import ChatSession
from backend.api.features.chat.tools import TOOL_REGISTRY
from backend.api.features.chat.tools.base import BaseTool
logger = logging.getLogger(__name__)
# MCP server naming - the SDK prefixes tool names as "mcp__{server_name}__{tool}"
MCP_SERVER_NAME = "copilot"
MCP_TOOL_PREFIX = f"mcp__{MCP_SERVER_NAME}__"
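# e.g. the registry tool "find_agent" is exposed to the SDK as
# "mcp__copilot__find_agent".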
# Context variables to pass user/session info to tool execution
_current_user_id: ContextVar[str | None] = ContextVar("current_user_id", default=None)
_current_session: ContextVar[ChatSession | None] = ContextVar(
"current_session", default=None
)
_current_tool_call_id: ContextVar[str | None] = ContextVar(
"current_tool_call_id", default=None
)
def set_execution_context(
user_id: str | None,
session: ChatSession,
tool_call_id: str | None = None,
) -> None:
"""Set the execution context for tool calls.
This must be called before streaming begins to ensure tools have access
to user_id and session information.
"""
_current_user_id.set(user_id)
_current_session.set(session)
_current_tool_call_id.set(tool_call_id)
def get_execution_context() -> tuple[str | None, ChatSession | None, str | None]:
"""Get the current execution context."""
return (
_current_user_id.get(),
_current_session.get(),
_current_tool_call_id.get(),
)
def create_tool_handler(base_tool: BaseTool):
"""Create an async handler function for a BaseTool.
This wraps the existing BaseTool._execute method to be compatible
with the Claude Agent SDK MCP tool format.
"""
async def tool_handler(args: dict[str, Any]) -> dict[str, Any]:
"""Execute the wrapped tool and return MCP-formatted response."""
user_id, session, tool_call_id = get_execution_context()
if session is None:
return {
"content": [
{
"type": "text",
"text": json.dumps(
{
"error": "No session context available",
"type": "error",
}
),
}
],
"isError": True,
}
try:
# Call the existing tool's execute method
# Generate unique tool_call_id per invocation for proper correlation
effective_id = tool_call_id or f"sdk-{uuid.uuid4().hex[:12]}"
result = await base_tool.execute(
user_id=user_id,
session=session,
tool_call_id=effective_id,
**args,
)
# The result is a StreamToolOutputAvailable, extract the output
return {
"content": [
{
"type": "text",
"text": (
result.output
if isinstance(result.output, str)
else json.dumps(result.output)
),
}
],
"isError": not result.success,
}
except Exception as e:
logger.error(f"Error executing tool {base_tool.name}: {e}", exc_info=True)
return {
"content": [
{
"type": "text",
"text": json.dumps(
{
"error": str(e),
"type": "error",
"message": f"Failed to execute {base_tool.name}",
}
),
}
],
"isError": True,
}
return tool_handler
def _build_input_schema(base_tool: BaseTool) -> dict[str, Any]:
"""Build a JSON Schema input schema for a tool."""
return {
"type": "object",
"properties": base_tool.parameters.get("properties", {}),
"required": base_tool.parameters.get("required", []),
}
def get_tool_definitions() -> list[dict[str, Any]]:
"""Get all tool definitions in MCP format.
Returns a list of tool definitions that can be used with
create_sdk_mcp_server or as raw tool definitions.
"""
tool_definitions = []
for tool_name, base_tool in TOOL_REGISTRY.items():
tool_def = {
"name": tool_name,
"description": base_tool.description,
"inputSchema": _build_input_schema(base_tool),
}
tool_definitions.append(tool_def)
return tool_definitions
def get_tool_handlers() -> dict[str, Any]:
"""Get all tool handlers mapped by name.
Returns a dictionary mapping tool names to their handler functions.
"""
handlers = {}
for tool_name, base_tool in TOOL_REGISTRY.items():
handlers[tool_name] = create_tool_handler(base_tool)
return handlers
# Create the MCP server configuration
def create_copilot_mcp_server():
"""Create an in-process MCP server configuration for CoPilot tools.
This can be passed to ClaudeAgentOptions.mcp_servers.
Note: The actual SDK MCP server creation depends on the claude-agent-sdk
package being available. This function returns the configuration that
can be used with the SDK.
"""
try:
from claude_agent_sdk import create_sdk_mcp_server, tool
# Create decorated tool functions
sdk_tools = []
for tool_name, base_tool in TOOL_REGISTRY.items():
# Get the handler
handler = create_tool_handler(base_tool)
# Create the decorated tool
# The @tool decorator expects (name, description, schema)
# Pass full JSON schema with type, properties, and required
decorated = tool(
tool_name,
base_tool.description,
_build_input_schema(base_tool),
)(handler)
sdk_tools.append(decorated)
# Create the MCP server
server = create_sdk_mcp_server(
name=MCP_SERVER_NAME,
version="1.0.0",
tools=sdk_tools,
)
return server
except ImportError:
# Let ImportError propagate so service.py handles the fallback
raise
# List of tool names for allowed_tools configuration
COPILOT_TOOL_NAMES = [f"{MCP_TOOL_PREFIX}{name}" for name in TOOL_REGISTRY.keys()]
# Also export the raw tool names for flexibility
RAW_TOOL_NAMES = list(TOOL_REGISTRY.keys())
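# Minimal usage sketch (hypothetical session object and argument names): set
# the execution context, then call a wrapped handler the way the SDK's MCP
# server would.
#
#   handlers = get_tool_handlers()
#   set_execution_context(user_id="user-1", session=session)
#   result = await handlers["find_agent"]({"query": "invoice reminders"})
#   # -> {"content": [{"type": "text", "text": "..."}], "isError": False}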


@@ -368,24 +368,47 @@ async def stream_chat_completion(
 Raises:
 NotFoundError: If session_id is invalid
-ValueError: If max_context_messages is exceeded
 """
+completion_start = time.monotonic()
+# Build log metadata for structured logging
+log_meta = {"component": "ChatService", "session_id": session_id}
+if user_id:
+log_meta["user_id"] = user_id
 logger.info(
-f"Streaming chat completion for session {session_id} for message {message} and user id {user_id}. Message is user message: {is_user_message}"
+f"[TIMING] stream_chat_completion STARTED, session={session_id}, user={user_id}, "
+f"message_len={len(message) if message else 0}, is_user={is_user_message}",
+extra={
+"json_fields": {
+**log_meta,
+"message_len": len(message) if message else 0,
+"is_user_message": is_user_message,
+}
+},
 )
 # Only fetch from Redis if session not provided (initial call)
 if session is None:
+fetch_start = time.monotonic()
 session = await get_chat_session(session_id, user_id)
+fetch_time = (time.monotonic() - fetch_start) * 1000
 logger.info(
-f"Fetched session from Redis: {session.session_id if session else 'None'}, "
-f"message_count={len(session.messages) if session else 0}"
+f"[TIMING] get_chat_session took {fetch_time:.1f}ms, "
+f"n_messages={len(session.messages) if session else 0}",
+extra={
+"json_fields": {
+**log_meta,
+"duration_ms": fetch_time,
+"n_messages": len(session.messages) if session else 0,
+}
+},
 )
 else:
 logger.info(
-f"Using provided session object: {session.session_id}, "
-f"message_count={len(session.messages)}"
+f"[TIMING] Using provided session, messages={len(session.messages)}",
+extra={"json_fields": {**log_meta, "n_messages": len(session.messages)}},
 )
 if not session:
@@ -406,17 +429,25 @@ async def stream_chat_completion(
 # Track user message in PostHog
 if is_user_message:
+posthog_start = time.monotonic()
 track_user_message(
 user_id=user_id,
 session_id=session_id,
 message_length=len(message),
 )
+posthog_time = (time.monotonic() - posthog_start) * 1000
 logger.info(
-f"Upserting session: {session.session_id} with user id {session.user_id}, "
-f"message_count={len(session.messages)}"
+f"[TIMING] track_user_message took {posthog_time:.1f}ms",
+extra={"json_fields": {**log_meta, "duration_ms": posthog_time}},
 )
+upsert_start = time.monotonic()
 session = await upsert_chat_session(session)
+upsert_time = (time.monotonic() - upsert_start) * 1000
+logger.info(
+f"[TIMING] upsert_chat_session took {upsert_time:.1f}ms",
+extra={"json_fields": {**log_meta, "duration_ms": upsert_time}},
+)
 assert session, "Session not found"
 # Generate title for new sessions on first user message (non-blocking)
@@ -454,7 +485,13 @@ async def stream_chat_completion(
 asyncio.create_task(_update_title())
 # Build system prompt with business understanding
+prompt_start = time.monotonic()
 system_prompt, understanding = await _build_system_prompt(user_id)
+prompt_time = (time.monotonic() - prompt_start) * 1000
+logger.info(
+f"[TIMING] _build_system_prompt took {prompt_time:.1f}ms",
+extra={"json_fields": {**log_meta, "duration_ms": prompt_time}},
+)
 # Initialize variables for streaming
 assistant_response = ChatMessage(
@@ -483,9 +520,18 @@ async def stream_chat_completion(
 text_block_id = str(uuid_module.uuid4())
 # Yield message start
+setup_time = (time.monotonic() - completion_start) * 1000
+logger.info(
+f"[TIMING] Setup complete, yielding StreamStart at {setup_time:.1f}ms",
+extra={"json_fields": {**log_meta, "setup_time_ms": setup_time}},
+)
 yield StreamStart(messageId=message_id)
 try:
+logger.info(
+"[TIMING] Calling _stream_chat_chunks",
+extra={"json_fields": log_meta},
+)
 async for chunk in _stream_chat_chunks(
 session=session,
 tools=tools,
@@ -893,9 +939,21 @@ async def _stream_chat_chunks(
 SSE formatted JSON response objects
 """
+import time as time_module
+stream_chunks_start = time_module.perf_counter()
 model = config.model
-logger.info("Starting pure chat stream")
+# Build log metadata for structured logging
+log_meta = {"component": "ChatService", "session_id": session.session_id}
+if session.user_id:
+log_meta["user_id"] = session.user_id
+logger.info(
+f"[TIMING] _stream_chat_chunks STARTED, session={session.session_id}, "
+f"user={session.user_id}, n_messages={len(session.messages)}",
+extra={"json_fields": {**log_meta, "n_messages": len(session.messages)}},
+)
 messages = session.to_openai_messages()
 if system_prompt:
@@ -906,12 +964,18 @@ async def _stream_chat_chunks(
 messages = [system_message] + messages
 # Apply context window management
+context_start = time_module.perf_counter()
 context_result = await _manage_context_window(
 messages=messages,
 model=model,
 api_key=config.api_key,
 base_url=config.base_url,
 )
+context_time = (time_module.perf_counter() - context_start) * 1000
+logger.info(
+f"[TIMING] _manage_context_window took {context_time:.1f}ms",
+extra={"json_fields": {**log_meta, "duration_ms": context_time}},
+)
 if context_result.error:
 if "System prompt dropped" in context_result.error:
@@ -946,9 +1010,19 @@ async def _stream_chat_chunks(
 while retry_count <= MAX_RETRIES:
 try:
+elapsed = (time_module.perf_counter() - stream_chunks_start) * 1000
+retry_info = (
+f" (retry {retry_count}/{MAX_RETRIES})" if retry_count > 0 else ""
+)
 logger.info(
-f"Creating OpenAI chat completion stream..."
-f"{f' (retry {retry_count}/{MAX_RETRIES})' if retry_count > 0 else ''}"
+f"[TIMING] Creating OpenAI stream at {elapsed:.1f}ms{retry_info}",
+extra={
+"json_fields": {
+**log_meta,
+"elapsed_ms": elapsed,
+"retry_count": retry_count,
+}
+},
 )
 # Build extra_body for OpenRouter tracing and PostHog analytics
@@ -965,6 +1039,7 @@ async def _stream_chat_chunks(
 :128
 ] # OpenRouter limit
+api_call_start = time_module.perf_counter()
 stream = await client.chat.completions.create(
 model=model,
 messages=cast(list[ChatCompletionMessageParam], messages),
@@ -974,6 +1049,11 @@ async def _stream_chat_chunks(
 stream_options=ChatCompletionStreamOptionsParam(include_usage=True),
 extra_body=extra_body,
 )
+api_init_time = (time_module.perf_counter() - api_call_start) * 1000
+logger.info(
+f"[TIMING] OpenAI stream object returned in {api_init_time:.1f}ms",
+extra={"json_fields": {**log_meta, "duration_ms": api_init_time}},
+)
 # Variables to accumulate tool calls
 tool_calls: list[dict[str, Any]] = []
@@ -984,10 +1064,13 @@ async def _stream_chat_chunks(
 # Track if we've started the text block
 text_started = False
+first_content_chunk = True
+chunk_count = 0
 # Process the stream
 chunk: ChatCompletionChunk
 async for chunk in stream:
+chunk_count += 1
 if chunk.usage:
 yield StreamUsage(
 promptTokens=chunk.usage.prompt_tokens,
@@ -1010,6 +1093,23 @@ async def _stream_chat_chunks(
 if not text_started and text_block_id:
 yield StreamTextStart(id=text_block_id)
 text_started = True
+# Log timing for first content chunk
+if first_content_chunk:
+first_content_chunk = False
+ttfc = (
+time_module.perf_counter() - api_call_start
+) * 1000
+logger.info(
+f"[TIMING] FIRST CONTENT CHUNK at {ttfc:.1f}ms "
+f"(since API call), n_chunks={chunk_count}",
+extra={
+"json_fields": {
+**log_meta,
+"time_to_first_chunk_ms": ttfc,
+"n_chunks": chunk_count,
+}
+},
+)
 # Stream the text delta
 text_response = StreamTextDelta(
 id=text_block_id or "",
@@ -1066,7 +1166,21 @@ async def _stream_chat_chunks(
 toolName=tool_calls[idx]["function"]["name"],
 )
 emitted_start_for_idx.add(idx)
-logger.info(f"Stream complete. Finish reason: {finish_reason}")
+stream_duration = time_module.perf_counter() - api_call_start
+logger.info(
+f"[TIMING] OpenAI stream COMPLETE, finish_reason={finish_reason}, "
+f"duration={stream_duration:.2f}s, "
+f"n_chunks={chunk_count}, n_tool_calls={len(tool_calls)}",
+extra={
+"json_fields": {
+**log_meta,
+"stream_duration_ms": stream_duration * 1000,
+"finish_reason": finish_reason,
+"n_chunks": chunk_count,
+"n_tool_calls": len(tool_calls),
+}
+},
+)
 # Yield all accumulated tool calls after the stream is complete
 # This ensures all tool call arguments have been fully received
@@ -1086,11 +1200,16 @@ async def _stream_chat_chunks(
 # Re-raise to trigger retry logic in the parent function
 raise
+total_time = (time_module.perf_counter() - stream_chunks_start) * 1000
+logger.info(
+f"[TIMING] _stream_chat_chunks COMPLETED in {total_time/1000:.1f}s; "
+f"session={session.session_id}, user={session.user_id}",
+extra={"json_fields": {**log_meta, "total_time_ms": total_time}},
+)
 yield StreamFinish()
 return
 except Exception as e:
 last_error = e
 if _is_retryable_error(e) and retry_count < MAX_RETRIES:
 retry_count += 1
 # Calculate delay with exponential backoff
@@ -1106,26 +1225,12 @@ async def _stream_chat_chunks(
 continue # Retry the stream
 else:
 # Non-retryable error or max retries exceeded
-_log_api_error(
-error=e,
-session_id=session.session_id if session else None,
-message_count=len(messages) if messages else None,
-model=model,
-retry_count=retry_count,
-)
+logger.error(
+f"Error in stream (not retrying): {e!s}",
+exc_info=True,
+)
 error_code = None
 error_text = str(e)
-error_details = _extract_api_error_details(e)
-if error_details.get("response_body"):
-body = error_details["response_body"]
-if isinstance(body, dict):
-err = body.get("error")
-if isinstance(err, dict) and err.get("message"):
-error_text = err["message"]
-elif body.get("message"):
-error_text = body["message"]
 if _is_region_blocked_error(e):
 error_code = "MODEL_NOT_AVAILABLE_REGION"
 error_text = (
@@ -1142,12 +1247,9 @@ async def _stream_chat_chunks(
 # If we exit the retry loop without returning, it means we exhausted retries
 if last_error:
-_log_api_error(
-error=last_error,
-session_id=session.session_id if session else None,
-message_count=len(messages) if messages else None,
-model=model,
-retry_count=MAX_RETRIES,
-)
+logger.error(
+f"Max retries ({MAX_RETRIES}) exceeded. Last error: {last_error!s}",
+exc_info=True,
+)
 yield StreamError(errorText=f"Max retries exceeded: {last_error!s}")
 yield StreamFinish()
@@ -1719,7 +1821,6 @@ async def _generate_llm_continuation(
 break # Success, exit retry loop
 except Exception as e:
 last_error = e
-
 if _is_retryable_error(e) and retry_count < MAX_RETRIES:
 retry_count += 1
 delay = min(
@@ -1733,23 +1834,17 @@ async def _generate_llm_continuation(
 await asyncio.sleep(delay)
 continue
 else:
-# Non-retryable error - log details and exit gracefully
-_log_api_error(
-error=e,
-session_id=session_id,
-message_count=len(messages) if messages else None,
-model=config.model,
-retry_count=retry_count,
-)
+# Non-retryable error - log and exit gracefully
+logger.error(
+f"Non-retryable error in LLM continuation: {e!s}",
+exc_info=True,
+)
 return
 if last_error:
-_log_api_error(
-error=last_error,
-session_id=session_id,
-message_count=len(messages) if messages else None,
-model=config.model,
-retry_count=MAX_RETRIES,
-)
+logger.error(
+f"Max retries ({MAX_RETRIES}) exceeded for LLM continuation. "
+f"Last error: {last_error!s}"
+)
 return
@@ -1789,89 +1884,6 @@ async def _generate_llm_continuation(
 logger.error(f"Failed to generate LLM continuation: {e}", exc_info=True)
-def _log_api_error(
-error: Exception,
-session_id: str | None = None,
-message_count: int | None = None,
-model: str | None = None,
-retry_count: int = 0,
-) -> None:
-"""Log detailed API error information for debugging."""
-details = _extract_api_error_details(error)
-details["session_id"] = session_id
-details["message_count"] = message_count
-details["model"] = model
-details["retry_count"] = retry_count
-if isinstance(error, RateLimitError):
-logger.warning(f"Rate limit error: {details}")
-elif isinstance(error, APIConnectionError):
-logger.warning(f"API connection error: {details}")
-elif isinstance(error, APIStatusError) and error.status_code >= 500:
-logger.error(f"API server error (5xx): {details}")
-else:
-logger.error(f"API error: {details}")
-def _extract_api_error_details(error: Exception) -> dict[str, Any]:
-"""Extract detailed information from OpenAI/OpenRouter API errors."""
-error_msg = str(error)
-details: dict[str, Any] = {
-"error_type": type(error).__name__,
-"error_message": error_msg[:500] + "..." if len(error_msg) > 500 else error_msg,
-}
-if hasattr(error, "code"):
-details["code"] = getattr(error, "code", None)
-if hasattr(error, "param"):
-details["param"] = getattr(error, "param", None)
-if isinstance(error, APIStatusError):
-details["status_code"] = error.status_code
-details["request_id"] = getattr(error, "request_id", None)
-if hasattr(error, "body") and error.body:
-details["response_body"] = _sanitize_error_body(error.body)
-if hasattr(error, "response") and error.response:
-headers = error.response.headers
-details["openrouter_provider"] = headers.get("x-openrouter-provider")
-details["openrouter_model"] = headers.get("x-openrouter-model")
-details["retry_after"] = headers.get("retry-after")
-details["rate_limit_remaining"] = headers.get("x-ratelimit-remaining")
-return details
-def _sanitize_error_body(
-body: Any, max_length: int = 2000
-) -> dict[str, Any] | str | None:
-"""Extract only safe fields from error response body to avoid logging sensitive data."""
-if not isinstance(body, dict):
-# Non-dict bodies (e.g., HTML error pages) - return truncated string
-if body is not None:
-body_str = str(body)
-if len(body_str) > max_length:
-return body_str[:max_length] + "...[truncated]"
-return body_str
-return None
-safe_fields = ("message", "type", "code", "param", "error")
-sanitized: dict[str, Any] = {}
-for field in safe_fields:
-if field in body:
-value = body[field]
-if field == "error" and isinstance(value, dict):
-sanitized[field] = _sanitize_error_body(value, max_length)
-elif isinstance(value, str) and len(value) > max_length:
-sanitized[field] = value[:max_length] + "...[truncated]"
-else:
-sanitized[field] = value
-return sanitized if sanitized else None
 async def _generate_llm_continuation_with_streaming(
 session_id: str,
 user_id: str | None,


@@ -104,6 +104,24 @@ async def create_task(
Returns: Returns:
The created ActiveTask instance (metadata only) The created ActiveTask instance (metadata only)
""" """
import time
start_time = time.perf_counter()
# Build log metadata for structured logging
log_meta = {
"component": "StreamRegistry",
"task_id": task_id,
"session_id": session_id,
}
if user_id:
log_meta["user_id"] = user_id
logger.info(
f"[TIMING] create_task STARTED, task={task_id}, session={session_id}, user={user_id}",
extra={"json_fields": log_meta},
)
task = ActiveTask( task = ActiveTask(
task_id=task_id, task_id=task_id,
session_id=session_id, session_id=session_id,
@@ -114,10 +132,18 @@ async def create_task(
) )
# Store metadata in Redis # Store metadata in Redis
redis_start = time.perf_counter()
redis = await get_redis_async() redis = await get_redis_async()
redis_time = (time.perf_counter() - redis_start) * 1000
logger.info(
f"[TIMING] get_redis_async took {redis_time:.1f}ms",
extra={"json_fields": {**log_meta, "duration_ms": redis_time}},
)
meta_key = _get_task_meta_key(task_id) meta_key = _get_task_meta_key(task_id)
op_key = _get_operation_mapping_key(operation_id) op_key = _get_operation_mapping_key(operation_id)
hset_start = time.perf_counter()
await redis.hset( # type: ignore[misc] await redis.hset( # type: ignore[misc]
meta_key, meta_key,
mapping={ mapping={
@@ -131,12 +157,22 @@ async def create_task(
"created_at": task.created_at.isoformat(), "created_at": task.created_at.isoformat(),
}, },
) )
hset_time = (time.perf_counter() - hset_start) * 1000
logger.info(
f"[TIMING] redis.hset took {hset_time:.1f}ms",
extra={"json_fields": {**log_meta, "duration_ms": hset_time}},
)
await redis.expire(meta_key, config.stream_ttl) await redis.expire(meta_key, config.stream_ttl)
# Create operation_id -> task_id mapping for webhook lookups # Create operation_id -> task_id mapping for webhook lookups
await redis.set(op_key, task_id, ex=config.stream_ttl) await redis.set(op_key, task_id, ex=config.stream_ttl)
logger.debug(f"Created task {task_id} for session {session_id}") total_time = (time.perf_counter() - start_time) * 1000
logger.info(
f"[TIMING] create_task COMPLETED in {total_time:.1f}ms; task={task_id}, session={session_id}",
extra={"json_fields": {**log_meta, "total_time_ms": total_time}},
)
return task return task
@@ -156,26 +192,60 @@ async def publish_chunk(
Returns: Returns:
The Redis Stream message ID The Redis Stream message ID
""" """
import time
start_time = time.perf_counter()
chunk_type = type(chunk).__name__
chunk_json = chunk.model_dump_json() chunk_json = chunk.model_dump_json()
message_id = "0-0" message_id = "0-0"
# Build log metadata
log_meta = {
"component": "StreamRegistry",
"task_id": task_id,
"chunk_type": chunk_type,
}
try: try:
redis = await get_redis_async() redis = await get_redis_async()
stream_key = _get_task_stream_key(task_id) stream_key = _get_task_stream_key(task_id)
# Write to Redis Stream for persistence and real-time delivery # Write to Redis Stream for persistence and real-time delivery
xadd_start = time.perf_counter()
raw_id = await redis.xadd( raw_id = await redis.xadd(
stream_key, stream_key,
{"data": chunk_json}, {"data": chunk_json},
maxlen=config.stream_max_length, maxlen=config.stream_max_length,
) )
xadd_time = (time.perf_counter() - xadd_start) * 1000
message_id = raw_id if isinstance(raw_id, str) else raw_id.decode() message_id = raw_id if isinstance(raw_id, str) else raw_id.decode()
# Set TTL on stream to match task metadata TTL # Set TTL on stream to match task metadata TTL
await redis.expire(stream_key, config.stream_ttl) await redis.expire(stream_key, config.stream_ttl)
total_time = (time.perf_counter() - start_time) * 1000
# Only log timing for significant chunks or slow operations
if (
chunk_type
in ("StreamStart", "StreamFinish", "StreamTextStart", "StreamTextEnd")
or total_time > 50
):
logger.info(
f"[TIMING] publish_chunk {chunk_type} in {total_time:.1f}ms (xadd={xadd_time:.1f}ms)",
extra={
"json_fields": {
**log_meta,
"total_time_ms": total_time,
"xadd_time_ms": xadd_time,
"message_id": message_id,
}
},
)
    except Exception as e:
        elapsed = (time.perf_counter() - start_time) * 1000
        logger.error(
-           f"Failed to publish chunk for task {task_id}: {e}",
+           f"[TIMING] Failed to publish chunk {chunk_type} after {elapsed:.1f}ms: {e}",
            extra={"json_fields": {**log_meta, "elapsed_ms": elapsed, "error": str(e)}},
            exc_info=True,
        )
@@ -200,24 +270,61 @@ async def subscribe_to_task(
        An asyncio Queue that will receive stream chunks, or None if task not found
        or user doesn't have access
    """
import time
start_time = time.perf_counter()
# Build log metadata
log_meta = {"component": "StreamRegistry", "task_id": task_id}
if user_id:
log_meta["user_id"] = user_id
logger.info(
f"[TIMING] subscribe_to_task STARTED, task={task_id}, user={user_id}, last_msg={last_message_id}",
extra={"json_fields": {**log_meta, "last_message_id": last_message_id}},
)
redis_start = time.perf_counter()
    redis = await get_redis_async()
    meta_key = _get_task_meta_key(task_id)
    meta: dict[Any, Any] = await redis.hgetall(meta_key)  # type: ignore[misc]
hgetall_time = (time.perf_counter() - redis_start) * 1000
logger.info(
f"[TIMING] Redis hgetall took {hgetall_time:.1f}ms",
extra={"json_fields": {**log_meta, "duration_ms": hgetall_time}},
)
    if not meta:
-       logger.debug(f"Task {task_id} not found in Redis")
+       elapsed = (time.perf_counter() - start_time) * 1000
+       logger.info(
+           f"[TIMING] Task not found in Redis after {elapsed:.1f}ms",
+           extra={
+               "json_fields": {
+                   **log_meta,
+                   "elapsed_ms": elapsed,
+                   "reason": "task_not_found",
+               }
+           },
+       )
        return None
    # Note: Redis client uses decode_responses=True, so keys are strings
    task_status = meta.get("status", "")
    task_user_id = meta.get("user_id", "") or None
log_meta["session_id"] = meta.get("session_id", "")
    # Validate ownership - if task has an owner, requester must match
    if task_user_id:
        if user_id != task_user_id:
            logger.warning(
-               f"User {user_id} denied access to task {task_id} "
-               f"owned by {task_user_id}"
+               f"[TIMING] Access denied: user {user_id} tried to access task owned by {task_user_id}",
+               extra={
+                   "json_fields": {
+                       **log_meta,
+                       "task_owner": task_user_id,
+                       "reason": "access_denied",
+                   }
+               },
            )
            return None
@@ -225,7 +332,19 @@ async def subscribe_to_task(
    stream_key = _get_task_stream_key(task_id)

    # Step 1: Replay messages from Redis Stream
xread_start = time.perf_counter()
    messages = await redis.xread({stream_key: last_message_id}, block=0, count=1000)
xread_time = (time.perf_counter() - xread_start) * 1000
logger.info(
f"[TIMING] Redis xread (replay) took {xread_time:.1f}ms, status={task_status}",
extra={
"json_fields": {
**log_meta,
"duration_ms": xread_time,
"task_status": task_status,
}
},
)
    replayed_count = 0
    replay_last_id = last_message_id
@@ -244,19 +363,48 @@ async def subscribe_to_task(
            except Exception as e:
                logger.warning(f"Failed to replay message: {e}")

-   logger.debug(f"Task {task_id}: replayed {replayed_count} messages")
+   logger.info(
+       f"[TIMING] Replayed {replayed_count} messages, last_id={replay_last_id}",
+       extra={
+           "json_fields": {
+               **log_meta,
+               "n_messages_replayed": replayed_count,
+               "replay_last_id": replay_last_id,
+           }
+       },
+   )
    # Step 2: If task is still running, start stream listener for live updates
    if task_status == "running":
logger.info(
"[TIMING] Task still running, starting _stream_listener",
extra={"json_fields": {**log_meta, "task_status": task_status}},
)
        listener_task = asyncio.create_task(
-           _stream_listener(task_id, subscriber_queue, replay_last_id)
+           _stream_listener(task_id, subscriber_queue, replay_last_id, log_meta)
        )
        # Track listener task for cleanup on unsubscribe
        _listener_tasks[id(subscriber_queue)] = (task_id, listener_task)
    else:
        # Task is completed/failed - add finish marker
logger.info(
f"[TIMING] Task already {task_status}, adding StreamFinish",
extra={"json_fields": {**log_meta, "task_status": task_status}},
)
        await subscriber_queue.put(StreamFinish())
total_time = (time.perf_counter() - start_time) * 1000
logger.info(
f"[TIMING] subscribe_to_task COMPLETED in {total_time:.1f}ms; task={task_id}, "
f"n_messages_replayed={replayed_count}",
extra={
"json_fields": {
**log_meta,
"total_time_ms": total_time,
"n_messages_replayed": replayed_count,
}
},
)
    return subscriber_queue
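Editor's note: subscribe_to_task is a two-phase read over a Redis Stream — replay history from the caller's last seen ID, then tail live entries with a blocking XREAD. A condensed sketch of that shape, assuming redis.asyncio; the stream key and prints are illustrative:

    import asyncio
    import redis.asyncio as aioredis

    async def replay_then_tail(r: aioredis.Redis, stream_key: str, last_id: str = "0-0"):
        # Phase 1: drain what is already in the stream (non-blocking read).
        for _stream, entries in await r.xread({stream_key: last_id}, count=1000) or []:
            for entry_id, fields in entries:
                last_id = entry_id
                print("replayed", entry_id, fields)

        # Phase 2: tail live entries, blocking up to 30s per read so the caller
        # can periodically re-check whether the producer is still running.
        while True:
            batches = await r.xread({stream_key: last_id}, block=30_000, count=100)
            if not batches:
                continue  # timeout; a real listener re-checks task status here
            for _stream, entries in batches:
                for entry_id, fields in entries:
                    last_id = entry_id
                    print("live", entry_id, fields)

    # Requires a reachable Redis server:
    # asyncio.run(replay_then_tail(aioredis.Redis(decode_responses=True), "task:stream:demo"))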
@@ -264,6 +412,7 @@ async def _stream_listener(
    task_id: str,
    subscriber_queue: asyncio.Queue[StreamBaseResponse],
    last_replayed_id: str,
log_meta: dict | None = None,
) -> None:
    """Listen to Redis Stream for new messages using blocking XREAD.
@@ -274,10 +423,27 @@ async def _stream_listener(
        task_id: Task ID to listen for
        subscriber_queue: Queue to deliver messages to
        last_replayed_id: Last message ID from replay (continue from here)
log_meta: Structured logging metadata
""" """
import time
start_time = time.perf_counter()
# Use provided log_meta or build minimal one
if log_meta is None:
log_meta = {"component": "StreamRegistry", "task_id": task_id}
logger.info(
f"[TIMING] _stream_listener STARTED, task={task_id}, last_id={last_replayed_id}",
extra={"json_fields": {**log_meta, "last_replayed_id": last_replayed_id}},
)
    queue_id = id(subscriber_queue)

    # Track the last successfully delivered message ID for recovery hints
    last_delivered_id = last_replayed_id
messages_delivered = 0
first_message_time = None
xread_count = 0
    try:
        redis = await get_redis_async()
@@ -287,9 +453,39 @@ async def _stream_listener(
        while True:
            # Block for up to 30 seconds waiting for new messages
            # This allows periodic checking if task is still running
xread_start = time.perf_counter()
xread_count += 1
            messages = await redis.xread(
                {stream_key: current_id}, block=30000, count=100
            )
xread_time = (time.perf_counter() - xread_start) * 1000
if messages:
msg_count = sum(len(msgs) for _, msgs in messages)
logger.info(
f"[TIMING] xread #{xread_count} returned {msg_count} messages in {xread_time:.1f}ms",
extra={
"json_fields": {
**log_meta,
"xread_count": xread_count,
"n_messages": msg_count,
"duration_ms": xread_time,
}
},
)
elif xread_time > 1000:
# Only log timeouts (30s blocking)
logger.info(
f"[TIMING] xread #{xread_count} timeout after {xread_time:.1f}ms",
extra={
"json_fields": {
**log_meta,
"xread_count": xread_count,
"duration_ms": xread_time,
"reason": "timeout",
}
},
)
            if not messages:
                # Timeout - check if task is still running
@@ -326,10 +522,30 @@ async def _stream_listener(
                        )
                        # Update last delivered ID on successful delivery
                        last_delivered_id = current_id
messages_delivered += 1
if first_message_time is None:
first_message_time = time.perf_counter()
elapsed = (first_message_time - start_time) * 1000
logger.info(
f"[TIMING] FIRST live message at {elapsed:.1f}ms, type={type(chunk).__name__}",
extra={
"json_fields": {
**log_meta,
"elapsed_ms": elapsed,
"chunk_type": type(chunk).__name__,
}
},
)
                    except asyncio.TimeoutError:
                        logger.warning(
-                           f"Subscriber queue full for task {task_id}, "
-                           f"message delivery timed out after {QUEUE_PUT_TIMEOUT}s"
+                           f"[TIMING] Subscriber queue full, delivery timed out after {QUEUE_PUT_TIMEOUT}s",
+                           extra={
+                               "json_fields": {
+                                   **log_meta,
+                                   "timeout_s": QUEUE_PUT_TIMEOUT,
+                                   "reason": "queue_full",
+                               }
+                           },
                        )
                        # Send overflow error with recovery info
                        try:
@@ -351,15 +567,44 @@ async def _stream_listener(
                    # Stop listening on finish
                    if isinstance(chunk, StreamFinish):
total_time = (time.perf_counter() - start_time) * 1000
logger.info(
f"[TIMING] StreamFinish received in {total_time/1000:.1f}s; delivered={messages_delivered}",
extra={
"json_fields": {
**log_meta,
"total_time_ms": total_time,
"messages_delivered": messages_delivered,
}
},
)
                        return
                except Exception as e:
-                   logger.warning(f"Error processing stream message: {e}")
+                   logger.warning(
+                       f"Error processing stream message: {e}",
+                       extra={"json_fields": {**log_meta, "error": str(e)}},
+                   )
    except asyncio.CancelledError:
-       logger.debug(f"Stream listener cancelled for task {task_id}")
+       elapsed = (time.perf_counter() - start_time) * 1000
logger.info(
f"[TIMING] _stream_listener CANCELLED after {elapsed:.1f}ms, delivered={messages_delivered}",
extra={
"json_fields": {
**log_meta,
"elapsed_ms": elapsed,
"messages_delivered": messages_delivered,
"reason": "cancelled",
}
},
)
        raise  # Re-raise to propagate cancellation
    except Exception as e:
-       logger.error(f"Stream listener error for task {task_id}: {e}")
+       elapsed = (time.perf_counter() - start_time) * 1000
logger.error(
f"[TIMING] _stream_listener ERROR after {elapsed:.1f}ms: {e}",
extra={"json_fields": {**log_meta, "elapsed_ms": elapsed, "error": str(e)}},
)
        # On error, send finish to unblock subscriber
        try:
            await asyncio.wait_for(
@@ -368,10 +613,24 @@ async def _stream_listener(
            )
        except (asyncio.TimeoutError, asyncio.QueueFull):
            logger.warning(
-               f"Could not deliver finish event for task {task_id} after error"
+               "Could not deliver finish event after error",
+               extra={"json_fields": log_meta},
            )
    finally:
        # Clean up listener task mapping on exit
total_time = (time.perf_counter() - start_time) * 1000
logger.info(
f"[TIMING] _stream_listener FINISHED in {total_time/1000:.1f}s; task={task_id}, "
f"delivered={messages_delivered}, xread_count={xread_count}",
extra={
"json_fields": {
**log_meta,
"total_time_ms": total_time,
"messages_delivered": messages_delivered,
"xread_count": xread_count,
}
},
)
        _listener_tasks.pop(queue_id, None)
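Editor's note: the listener protects itself from a stalled consumer by putting onto a bounded queue with a timeout, rather than blocking forever. A minimal sketch of that backpressure guard; the timeout value is illustrative, the real one lives in module config:

    import asyncio

    QUEUE_PUT_TIMEOUT = 0.1  # seconds; illustrative

    async def deliver(queue: asyncio.Queue, item) -> bool:
        """Hand `item` to a subscriber without letting a stalled consumer
        block the listener forever."""
        try:
            await asyncio.wait_for(queue.put(item), timeout=QUEUE_PUT_TIMEOUT)
            return True
        except asyncio.TimeoutError:
            # The queue stayed full for the whole window: report failure so the
            # listener can emit an overflow error and keep draining Redis.
            return False

    async def demo():
        q: asyncio.Queue = asyncio.Queue(maxsize=1)
        await q.put("first")               # fills the bounded queue
        print(await deliver(q, "second"))  # False: nobody is consuming

    asyncio.run(demo())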
@@ -555,6 +814,28 @@ async def get_active_task_for_session(
            if task_user_id and user_id != task_user_id:
                continue
# Auto-expire stale tasks that exceeded stream_timeout
created_at_str = meta.get("created_at", "")
if created_at_str:
try:
created_at = datetime.fromisoformat(created_at_str)
age_seconds = (
datetime.now(timezone.utc) - created_at
).total_seconds()
if age_seconds > config.stream_timeout:
logger.warning(
f"[TASK_LOOKUP] Auto-expiring stale task {task_id[:8]}... "
f"(age={age_seconds:.0f}s > timeout={config.stream_timeout}s)"
)
await mark_task_completed(task_id, "failed")
continue
except (ValueError, TypeError):
pass
logger.info(
f"[TASK_LOOKUP] Found running task {task_id[:8]}... for session {session_id[:8]}..."
)
            # Get the last message ID from Redis Stream
            stream_key = _get_task_stream_key(task_id)
            last_id = "0-0"

View File

@@ -13,10 +13,32 @@ from backend.api.features.chat.tools.models import (
    NoResultsResponse,
)
from backend.api.features.store.hybrid_search import unified_hybrid_search
-from backend.data.block import get_block
+from backend.data.block import BlockType, get_block

logger = logging.getLogger(__name__)
_TARGET_RESULTS = 10
# Over-fetch to compensate for post-hoc filtering of graph-only blocks.
# 40 gives ~2x headroom over what filtering typically removes; query cost of 40 vs 10 is minimal.
_OVERFETCH_PAGE_SIZE = 40
# Block types that only work within graphs and cannot run standalone in CoPilot.
COPILOT_EXCLUDED_BLOCK_TYPES = {
BlockType.INPUT, # Graph interface definition - data enters via chat, not graph inputs
BlockType.OUTPUT, # Graph interface definition - data exits via chat, not graph outputs
BlockType.WEBHOOK, # Wait for external events - would hang forever in CoPilot
BlockType.WEBHOOK_MANUAL, # Same as WEBHOOK
BlockType.NOTE, # Visual annotation only - no runtime behavior
BlockType.HUMAN_IN_THE_LOOP, # Pauses for human approval - CoPilot IS human-in-the-loop
BlockType.AGENT, # AgentExecutorBlock requires execution_context - use run_agent tool
}
# Specific block IDs excluded from CoPilot (STANDARD type but still require graph context)
COPILOT_EXCLUDED_BLOCK_IDS = {
# SmartDecisionMakerBlock - dynamically discovers downstream blocks via graph topology
"3b191d9f-356f-482d-8238-ba04b6d18381",
}
class FindBlockTool(BaseTool):
    """Tool for searching available blocks."""
@@ -88,7 +110,7 @@ class FindBlockTool(BaseTool):
                query=query,
                content_types=[ContentType.BLOCK],
                page=1,
-               page_size=10,
+               page_size=_OVERFETCH_PAGE_SIZE,
            )

            if not results:
@@ -108,18 +130,35 @@ class FindBlockTool(BaseTool):
                block = get_block(block_id)

                # Skip disabled blocks
-               if block and not block.disabled:
+               if not block or block.disabled:
continue
# Skip blocks excluded from CoPilot (graph-only blocks)
if (
block.block_type in COPILOT_EXCLUDED_BLOCK_TYPES
or block.id in COPILOT_EXCLUDED_BLOCK_IDS
):
continue
                # Get input/output schemas
                input_schema = {}
                output_schema = {}
                try:
                    input_schema = block.input_schema.jsonschema()
-               except Exception:
-                   pass
+               except Exception as e:
+                   logger.debug(
+                       "Failed to generate input schema for block %s: %s",
+                       block_id,
+                       e,
+                   )
                try:
                    output_schema = block.output_schema.jsonschema()
-               except Exception:
-                   pass
+               except Exception as e:
+                   logger.debug(
+                       "Failed to generate output schema for block %s: %s",
+                       block_id,
+                       e,
+                   )
                # Get categories from block instance
                categories = []
@@ -163,6 +202,19 @@ class FindBlockTool(BaseTool):
                    )
                )
if len(blocks) >= _TARGET_RESULTS:
break
if blocks and len(blocks) < _TARGET_RESULTS:
logger.debug(
"find_block returned %d/%d results for query '%s' "
"(filtered %d excluded/disabled blocks)",
len(blocks),
_TARGET_RESULTS,
query,
len(results) - len(blocks),
)
        if not blocks:
            return NoResultsResponse(
                message=f"No blocks found for '{query}'",

View File

@@ -0,0 +1,139 @@
"""Tests for block filtering in FindBlockTool."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from backend.api.features.chat.tools.find_block import (
COPILOT_EXCLUDED_BLOCK_IDS,
COPILOT_EXCLUDED_BLOCK_TYPES,
FindBlockTool,
)
from backend.api.features.chat.tools.models import BlockListResponse
from backend.data.block import BlockType
from ._test_data import make_session
_TEST_USER_ID = "test-user-find-block"
def make_mock_block(
block_id: str, name: str, block_type: BlockType, disabled: bool = False
):
"""Create a mock block for testing."""
mock = MagicMock()
mock.id = block_id
mock.name = name
mock.description = f"{name} description"
mock.block_type = block_type
mock.disabled = disabled
mock.input_schema = MagicMock()
mock.input_schema.jsonschema.return_value = {"properties": {}, "required": []}
mock.input_schema.get_credentials_fields.return_value = {}
mock.output_schema = MagicMock()
mock.output_schema.jsonschema.return_value = {}
mock.categories = []
return mock
class TestFindBlockFiltering:
"""Tests for block filtering in FindBlockTool."""
def test_excluded_block_types_contains_expected_types(self):
"""Verify COPILOT_EXCLUDED_BLOCK_TYPES contains all graph-only types."""
assert BlockType.INPUT in COPILOT_EXCLUDED_BLOCK_TYPES
assert BlockType.OUTPUT in COPILOT_EXCLUDED_BLOCK_TYPES
assert BlockType.WEBHOOK in COPILOT_EXCLUDED_BLOCK_TYPES
assert BlockType.WEBHOOK_MANUAL in COPILOT_EXCLUDED_BLOCK_TYPES
assert BlockType.NOTE in COPILOT_EXCLUDED_BLOCK_TYPES
assert BlockType.HUMAN_IN_THE_LOOP in COPILOT_EXCLUDED_BLOCK_TYPES
assert BlockType.AGENT in COPILOT_EXCLUDED_BLOCK_TYPES
def test_excluded_block_ids_contains_smart_decision_maker(self):
"""Verify SmartDecisionMakerBlock is in COPILOT_EXCLUDED_BLOCK_IDS."""
assert "3b191d9f-356f-482d-8238-ba04b6d18381" in COPILOT_EXCLUDED_BLOCK_IDS
@pytest.mark.asyncio(loop_scope="session")
async def test_excluded_block_type_filtered_from_results(self):
"""Verify blocks with excluded BlockTypes are filtered from search results."""
session = make_session(user_id=_TEST_USER_ID)
# Mock search returns an INPUT block (excluded) and a STANDARD block (included)
search_results = [
{"content_id": "input-block-id", "score": 0.9},
{"content_id": "standard-block-id", "score": 0.8},
]
input_block = make_mock_block("input-block-id", "Input Block", BlockType.INPUT)
standard_block = make_mock_block(
"standard-block-id", "HTTP Request", BlockType.STANDARD
)
def mock_get_block(block_id):
return {
"input-block-id": input_block,
"standard-block-id": standard_block,
}.get(block_id)
with patch(
"backend.api.features.chat.tools.find_block.unified_hybrid_search",
new_callable=AsyncMock,
return_value=(search_results, 2),
):
with patch(
"backend.api.features.chat.tools.find_block.get_block",
side_effect=mock_get_block,
):
tool = FindBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID, session=session, query="test"
)
# Should only return the standard block, not the INPUT block
assert isinstance(response, BlockListResponse)
assert len(response.blocks) == 1
assert response.blocks[0].id == "standard-block-id"
@pytest.mark.asyncio(loop_scope="session")
async def test_excluded_block_id_filtered_from_results(self):
"""Verify SmartDecisionMakerBlock is filtered from search results."""
session = make_session(user_id=_TEST_USER_ID)
smart_decision_id = "3b191d9f-356f-482d-8238-ba04b6d18381"
search_results = [
{"content_id": smart_decision_id, "score": 0.9},
{"content_id": "normal-block-id", "score": 0.8},
]
# SmartDecisionMakerBlock has STANDARD type but is excluded by ID
smart_block = make_mock_block(
smart_decision_id, "Smart Decision Maker", BlockType.STANDARD
)
normal_block = make_mock_block(
"normal-block-id", "Normal Block", BlockType.STANDARD
)
def mock_get_block(block_id):
return {
smart_decision_id: smart_block,
"normal-block-id": normal_block,
}.get(block_id)
with patch(
"backend.api.features.chat.tools.find_block.unified_hybrid_search",
new_callable=AsyncMock,
return_value=(search_results, 2),
):
with patch(
"backend.api.features.chat.tools.find_block.get_block",
side_effect=mock_get_block,
):
tool = FindBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID, session=session, query="decision"
)
# Should only return normal block, not SmartDecisionMakerBlock
assert isinstance(response, BlockListResponse)
assert len(response.blocks) == 1
assert response.blocks[0].id == "normal-block-id"

View File

@@ -0,0 +1,29 @@
"""Shared helpers for chat tools."""
from typing import Any
def get_inputs_from_schema(
    input_schema: dict[str, Any],
    exclude_fields: set[str] | None = None,
) -> list[dict[str, Any]]:
    """Extract input field info from JSON schema."""
    if not isinstance(input_schema, dict):
        return []

    exclude = exclude_fields or set()
    properties = input_schema.get("properties", {})
    required = set(input_schema.get("required", []))

    return [
        {
            "name": name,
            "title": schema.get("title", name),
            "type": schema.get("type", "string"),
            "description": schema.get("description", ""),
            "required": name in required,
            "default": schema.get("default"),
        }
        for name, schema in properties.items()
        if name not in exclude
    ]
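Editor's note: a quick usage sketch of the helper above, assuming it is in scope; the schema contents are invented for illustration:

    schema = {
        "properties": {
            "url": {"title": "URL", "type": "string", "description": "Target URL"},
            "credentials": {"title": "Credentials"},
        },
        "required": ["url"],
    }

    print(get_inputs_from_schema(schema, exclude_fields={"credentials"}))
    # [{'name': 'url', 'title': 'URL', 'type': 'string',
    #   'description': 'Target URL', 'required': True, 'default': None}]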

View File

@@ -24,6 +24,7 @@ from backend.util.timezone_utils import (
)

from .base import BaseTool
from .helpers import get_inputs_from_schema
from .models import (
    AgentDetails,
    AgentDetailsResponse,
@@ -261,7 +262,7 @@ class RunAgentTool(BaseTool):
                ),
                requirements={
                    "credentials": requirements_creds_list,
-                   "inputs": self._get_inputs_list(graph.input_schema),
+                   "inputs": get_inputs_from_schema(graph.input_schema),
                    "execution_modes": self._get_execution_modes(graph),
                },
            ),
@@ -369,22 +370,6 @@ class RunAgentTool(BaseTool):
            session_id=session_id,
        )
-   def _get_inputs_list(self, input_schema: dict[str, Any]) -> list[dict[str, Any]]:
-       """Extract inputs list from schema."""
-       inputs_list = []
-       if isinstance(input_schema, dict) and "properties" in input_schema:
-           for field_name, field_schema in input_schema["properties"].items():
-               inputs_list.append(
-                   {
-                       "name": field_name,
-                       "title": field_schema.get("title", field_name),
-                       "type": field_schema.get("type", "string"),
-                       "description": field_schema.get("description", ""),
-                       "required": field_name in input_schema.get("required", []),
-                   }
-               )
-       return inputs_list

    def _get_execution_modes(self, graph: GraphModel) -> list[str]:
        """Get available execution modes for the graph."""
        trigger_info = graph.trigger_setup_info
@@ -398,7 +383,7 @@ class RunAgentTool(BaseTool):
        suffix: str,
    ) -> str:
        """Build a message describing available inputs for an agent."""
-       inputs_list = self._get_inputs_list(graph.input_schema)
+       inputs_list = get_inputs_from_schema(graph.input_schema)
        required_names = [i["name"] for i in inputs_list if i["required"]]
        optional_names = [i["name"] for i in inputs_list if not i["required"]]

View File

@@ -8,14 +8,19 @@ from typing import Any
from pydantic_core import PydanticUndefined

from backend.api.features.chat.model import ChatSession
-from backend.data.block import get_block
+from backend.api.features.chat.tools.find_block import (
COPILOT_EXCLUDED_BLOCK_IDS,
COPILOT_EXCLUDED_BLOCK_TYPES,
)
from backend.data.block import AnyBlockSchema, get_block
from backend.data.execution import ExecutionContext
-from backend.data.model import CredentialsMetaInput
+from backend.data.model import CredentialsFieldInfo, CredentialsMetaInput
from backend.data.workspace import get_or_create_workspace
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.util.exceptions import BlockError

from .base import BaseTool
from .helpers import get_inputs_from_schema
from .models import (
    BlockOutputResponse,
    ErrorResponse,
@@ -24,7 +29,10 @@ from .models import (
    ToolResponseBase,
    UserReadiness,
)
-from .utils import build_missing_credentials_from_field_info
+from .utils import (
build_missing_credentials_from_field_info,
match_credentials_to_requirements,
)
logger = logging.getLogger(__name__)
@@ -73,91 +81,6 @@ class RunBlockTool(BaseTool):
    def requires_auth(self) -> bool:
        return True
-   async def _check_block_credentials(
-       self,
-       user_id: str,
-       block: Any,
-       input_data: dict[str, Any] | None = None,
-   ) -> tuple[dict[str, CredentialsMetaInput], list[CredentialsMetaInput]]:
-       """
-       Check if user has required credentials for a block.
-
-       Args:
-           user_id: User ID
-           block: Block to check credentials for
-           input_data: Input data for the block (used to determine provider via discriminator)
-
-       Returns:
-           tuple[matched_credentials, missing_credentials]
-       """
-       matched_credentials: dict[str, CredentialsMetaInput] = {}
-       missing_credentials: list[CredentialsMetaInput] = []
-       input_data = input_data or {}
-
-       # Get credential field info from block's input schema
-       credentials_fields_info = block.input_schema.get_credentials_fields_info()
-       if not credentials_fields_info:
-           return matched_credentials, missing_credentials
-
-       # Get user's available credentials
-       creds_manager = IntegrationCredentialsManager()
-       available_creds = await creds_manager.store.get_all_creds(user_id)
-
-       for field_name, field_info in credentials_fields_info.items():
-           effective_field_info = field_info
-
-           if field_info.discriminator and field_info.discriminator_mapping:
-               # Get discriminator from input, falling back to schema default
-               discriminator_value = input_data.get(field_info.discriminator)
-               if discriminator_value is None:
-                   field = block.input_schema.model_fields.get(
-                       field_info.discriminator
-                   )
-                   if field and field.default is not PydanticUndefined:
-                       discriminator_value = field.default
-               if (
-                   discriminator_value
-                   and discriminator_value in field_info.discriminator_mapping
-               ):
-                   effective_field_info = field_info.discriminate(discriminator_value)
-                   logger.debug(
-                       f"Discriminated provider for {field_name}: "
-                       f"{discriminator_value} -> {effective_field_info.provider}"
-                   )
-
-           matching_cred = next(
-               (
-                   cred
-                   for cred in available_creds
-                   if cred.provider in effective_field_info.provider
-                   and cred.type in effective_field_info.supported_types
-               ),
-               None,
-           )
-
-           if matching_cred:
-               matched_credentials[field_name] = CredentialsMetaInput(
-                   id=matching_cred.id,
-                   provider=matching_cred.provider,  # type: ignore
-                   type=matching_cred.type,
-                   title=matching_cred.title,
-               )
-           else:
-               # Create a placeholder for the missing credential
-               provider = next(iter(effective_field_info.provider), "unknown")
-               cred_type = next(iter(effective_field_info.supported_types), "api_key")
-               missing_credentials.append(
-                   CredentialsMetaInput(
-                       id=field_name,
-                       provider=provider,  # type: ignore
-                       type=cred_type,  # type: ignore
-                       title=field_name.replace("_", " ").title(),
-                   )
-               )
-
-       return matched_credentials, missing_credentials
    async def _execute(
        self,
        user_id: str | None,
@@ -212,11 +135,24 @@ class RunBlockTool(BaseTool):
            session_id=session_id,
        )
# Check if block is excluded from CoPilot (graph-only blocks)
if (
block.block_type in COPILOT_EXCLUDED_BLOCK_TYPES
or block.id in COPILOT_EXCLUDED_BLOCK_IDS
):
return ErrorResponse(
message=(
f"Block '{block.name}' cannot be run directly in CoPilot. "
"This block is designed for use within graphs only."
),
session_id=session_id,
)
logger.info(f"Executing block {block.name} ({block_id}) for user {user_id}") logger.info(f"Executing block {block.name} ({block_id}) for user {user_id}")
creds_manager = IntegrationCredentialsManager() creds_manager = IntegrationCredentialsManager()
matched_credentials, missing_credentials = await self._check_block_credentials( matched_credentials, missing_credentials = (
user_id, block, input_data await self._resolve_block_credentials(user_id, block, input_data)
) )
if missing_credentials: if missing_credentials:
@@ -345,29 +281,75 @@ class RunBlockTool(BaseTool):
            session_id=session_id,
        )
-   def _get_inputs_list(self, block: Any) -> list[dict[str, Any]]:
-       """Extract non-credential inputs from block schema."""
-       inputs_list = []
-       schema = block.input_schema.jsonschema()
-       properties = schema.get("properties", {})
-       required_fields = set(schema.get("required", []))
-
-       # Get credential field names to exclude
-       credentials_fields = set(block.input_schema.get_credentials_fields().keys())
-
-       for field_name, field_schema in properties.items():
-           # Skip credential fields
-           if field_name in credentials_fields:
-               continue
-
-           inputs_list.append(
-               {
-                   "name": field_name,
-                   "title": field_schema.get("title", field_name),
-                   "type": field_schema.get("type", "string"),
-                   "description": field_schema.get("description", ""),
-                   "required": field_name in required_fields,
-               }
-           )
-
-       return inputs_list
+   async def _resolve_block_credentials(
+       self,
+       user_id: str,
+       block: AnyBlockSchema,
+       input_data: dict[str, Any] | None = None,
+   ) -> tuple[dict[str, CredentialsMetaInput], list[CredentialsMetaInput]]:
+       """
+       Resolve credentials for a block by matching user's available credentials.
+
+       Args:
+           user_id: User ID
+           block: Block to resolve credentials for
+           input_data: Input data for the block (used to determine provider via discriminator)
+
+       Returns:
+           tuple of (matched_credentials, missing_credentials) - matched credentials
+           are used for block execution, missing ones indicate setup requirements.
+       """
+       input_data = input_data or {}
+       requirements = self._resolve_discriminated_credentials(block, input_data)
+       if not requirements:
+           return {}, []
+       return await match_credentials_to_requirements(user_id, requirements)
+
+   def _get_inputs_list(self, block: AnyBlockSchema) -> list[dict[str, Any]]:
+       """Extract non-credential inputs from block schema."""
+       schema = block.input_schema.jsonschema()
+       credentials_fields = set(block.input_schema.get_credentials_fields().keys())
+       return get_inputs_from_schema(schema, exclude_fields=credentials_fields)
+
+   def _resolve_discriminated_credentials(
+       self,
+       block: AnyBlockSchema,
+       input_data: dict[str, Any],
+   ) -> dict[str, CredentialsFieldInfo]:
+       """Resolve credential requirements, applying discriminator logic where needed."""
+       credentials_fields_info = block.input_schema.get_credentials_fields_info()
+       if not credentials_fields_info:
+           return {}
+
+       resolved: dict[str, CredentialsFieldInfo] = {}
+
+       for field_name, field_info in credentials_fields_info.items():
+           effective_field_info = field_info
+
+           if field_info.discriminator and field_info.discriminator_mapping:
+               discriminator_value = input_data.get(field_info.discriminator)
+               if discriminator_value is None:
+                   field = block.input_schema.model_fields.get(
+                       field_info.discriminator
+                   )
+                   if field and field.default is not PydanticUndefined:
+                       discriminator_value = field.default
+               if (
+                   discriminator_value
+                   and discriminator_value in field_info.discriminator_mapping
+               ):
+                   effective_field_info = field_info.discriminate(discriminator_value)
+                   # For host-scoped credentials, add the discriminator value
+                   # (e.g., URL) so _credential_is_for_host can match it
+                   effective_field_info.discriminator_values.add(discriminator_value)
+                   logger.debug(
+                       f"Discriminated provider for {field_name}: "
+                       f"{discriminator_value} -> {effective_field_info.provider}"
+                   )
+
+           resolved[field_name] = effective_field_info
+
+       return resolved
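Editor's note: concretely, "discrimination" means picking the effective provider from another input field. A worked example with hypothetical field names and mapping (invented for illustration, not from the codebase):

    # Hypothetical discriminator setup for a credentials field on an LLM block:
    #   discriminator = "model"
    #   discriminator_mapping = {"gpt-4o": "openai", "claude-3": "anthropic"}
    # With input_data = {"model": "claude-3"}, the effective provider becomes
    # "anthropic", so credential matching looks for an Anthropic credential
    # instead of offering every provider the block supports.
    input_data = {"model": "claude-3"}
    discriminator_mapping = {"gpt-4o": "openai", "claude-3": "anthropic"}
    provider = discriminator_mapping.get(input_data.get("model", ""), "unknown")
    print(provider)  # anthropic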

View File

@@ -0,0 +1,106 @@
"""Tests for block execution guards in RunBlockTool."""
from unittest.mock import MagicMock, patch
import pytest
from backend.api.features.chat.tools.models import ErrorResponse
from backend.api.features.chat.tools.run_block import RunBlockTool
from backend.data.block import BlockType
from ._test_data import make_session
_TEST_USER_ID = "test-user-run-block"
def make_mock_block(
block_id: str, name: str, block_type: BlockType, disabled: bool = False
):
"""Create a mock block for testing."""
mock = MagicMock()
mock.id = block_id
mock.name = name
mock.block_type = block_type
mock.disabled = disabled
mock.input_schema = MagicMock()
mock.input_schema.jsonschema.return_value = {"properties": {}, "required": []}
mock.input_schema.get_credentials_fields_info.return_value = []
return mock
class TestRunBlockFiltering:
"""Tests for block execution guards in RunBlockTool."""
@pytest.mark.asyncio(loop_scope="session")
async def test_excluded_block_type_returns_error(self):
"""Attempting to execute a block with excluded BlockType returns error."""
session = make_session(user_id=_TEST_USER_ID)
input_block = make_mock_block("input-block-id", "Input Block", BlockType.INPUT)
with patch(
"backend.api.features.chat.tools.run_block.get_block",
return_value=input_block,
):
tool = RunBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="input-block-id",
input_data={},
)
assert isinstance(response, ErrorResponse)
assert "cannot be run directly in CoPilot" in response.message
assert "designed for use within graphs only" in response.message
@pytest.mark.asyncio(loop_scope="session")
async def test_excluded_block_id_returns_error(self):
"""Attempting to execute SmartDecisionMakerBlock returns error."""
session = make_session(user_id=_TEST_USER_ID)
smart_decision_id = "3b191d9f-356f-482d-8238-ba04b6d18381"
smart_block = make_mock_block(
smart_decision_id, "Smart Decision Maker", BlockType.STANDARD
)
with patch(
"backend.api.features.chat.tools.run_block.get_block",
return_value=smart_block,
):
tool = RunBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id=smart_decision_id,
input_data={},
)
assert isinstance(response, ErrorResponse)
assert "cannot be run directly in CoPilot" in response.message
@pytest.mark.asyncio(loop_scope="session")
async def test_non_excluded_block_passes_guard(self):
"""Non-excluded blocks pass the filtering guard (may fail later for other reasons)."""
session = make_session(user_id=_TEST_USER_ID)
standard_block = make_mock_block(
"standard-id", "HTTP Request", BlockType.STANDARD
)
with patch(
"backend.api.features.chat.tools.run_block.get_block",
return_value=standard_block,
):
tool = RunBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="standard-id",
input_data={},
)
# Should NOT be an ErrorResponse about CoPilot exclusion
# (may be other errors like missing credentials, but not the exclusion guard)
if isinstance(response, ErrorResponse):
assert "cannot be run directly in CoPilot" not in response.message

View File

@@ -8,6 +8,7 @@ from backend.api.features.library import model as library_model
from backend.api.features.store import db as store_db
from backend.data.graph import GraphModel
from backend.data.model import (
Credentials,
    CredentialsFieldInfo,
    CredentialsMetaInput,
    HostScopedCredentials,
@@ -223,6 +224,99 @@ async def get_or_create_library_agent(
    return library_agents[0]
async def match_credentials_to_requirements(
user_id: str,
requirements: dict[str, CredentialsFieldInfo],
) -> tuple[dict[str, CredentialsMetaInput], list[CredentialsMetaInput]]:
"""
Match user's credentials against a dictionary of credential requirements.
This is the core matching logic shared by both graph and block credential matching.
"""
matched: dict[str, CredentialsMetaInput] = {}
missing: list[CredentialsMetaInput] = []
if not requirements:
return matched, missing
available_creds = await get_user_credentials(user_id)
for field_name, field_info in requirements.items():
matching_cred = find_matching_credential(available_creds, field_info)
if matching_cred:
try:
matched[field_name] = create_credential_meta_from_match(matching_cred)
except Exception as e:
logger.error(
f"Failed to create CredentialsMetaInput for field '{field_name}': "
f"provider={matching_cred.provider}, type={matching_cred.type}, "
f"credential_id={matching_cred.id}",
exc_info=True,
)
provider = next(iter(field_info.provider), "unknown")
cred_type = next(iter(field_info.supported_types), "api_key")
missing.append(
CredentialsMetaInput(
id=field_name,
provider=provider, # type: ignore
type=cred_type, # type: ignore
title=f"{field_name} (validation failed: {e})",
)
)
else:
provider = next(iter(field_info.provider), "unknown")
cred_type = next(iter(field_info.supported_types), "api_key")
missing.append(
CredentialsMetaInput(
id=field_name,
provider=provider, # type: ignore
type=cred_type, # type: ignore
title=field_name.replace("_", " ").title(),
)
)
return matched, missing
async def get_user_credentials(user_id: str) -> list[Credentials]:
"""Get all available credentials for a user."""
creds_manager = IntegrationCredentialsManager()
return await creds_manager.store.get_all_creds(user_id)
def find_matching_credential(
available_creds: list[Credentials],
field_info: CredentialsFieldInfo,
) -> Credentials | None:
"""Find a credential that matches the required provider, type, scopes, and host."""
for cred in available_creds:
if cred.provider not in field_info.provider:
continue
if cred.type not in field_info.supported_types:
continue
if cred.type == "oauth2" and not _credential_has_required_scopes(
cred, field_info
):
continue
if cred.type == "host_scoped" and not _credential_is_for_host(cred, field_info):
continue
return cred
return None
def create_credential_meta_from_match(
matching_cred: Credentials,
) -> CredentialsMetaInput:
"""Create a CredentialsMetaInput from a matched credential."""
return CredentialsMetaInput(
id=matching_cred.id,
provider=matching_cred.provider, # type: ignore
type=matching_cred.type,
title=matching_cred.title,
)
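Editor's note: find_matching_credential above uses first-match semantics over the user's credential list. A minimal stand-in showing the same shape, minus the oauth2-scope and host-scoped checks; the dataclasses are illustrative:

    from dataclasses import dataclass, field

    @dataclass
    class Cred:
        id: str
        provider: str
        type: str
        title: str
        scopes: list[str] = field(default_factory=list)

    @dataclass
    class Requirement:
        provider: set[str]
        supported_types: set[str]

    def first_match(creds: list[Cred], req: Requirement) -> Cred | None:
        # Same first-match semantics as find_matching_credential, without
        # the oauth2 scope and host-scoped refinements.
        for cred in creds:
            if cred.provider in req.provider and cred.type in req.supported_types:
                return cred
        return None

    creds = [Cred("c1", "github", "oauth2", "GitHub"), Cred("c2", "openai", "api_key", "OpenAI")]
    match = first_match(creds, Requirement({"openai"}, {"api_key"}))
    print(match.id if match else "missing")  # c2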
async def match_user_credentials_to_graph(
    user_id: str,
    graph: GraphModel,
@@ -331,8 +425,6 @@ def _credential_has_required_scopes(
    # If no scopes are required, any credential matches
    if not requirements.required_scopes:
        return True
-   # Check that credential scopes are a superset of required scopes
    return set(credential.scopes).issuperset(requirements.required_scopes)

View File

@@ -8,6 +8,7 @@ Includes BM25 reranking for improved lexical relevance.
import logging
import re
import time
from dataclasses import dataclass
from typing import Any, Literal
@@ -362,7 +363,11 @@ async def unified_hybrid_search(
        LIMIT {limit_param} OFFSET {offset_param}
    """
try:
        results = await query_raw_with_schema(sql_query, *params)
except Exception as e:
await _log_vector_error_diagnostics(e)
raise
    total = results[0]["total_count"] if results else 0

    # Apply BM25 reranking
@@ -686,7 +691,11 @@ async def hybrid_search(
        LIMIT {limit_param} OFFSET {offset_param}
    """
try:
        results = await query_raw_with_schema(sql_query, *params)
except Exception as e:
await _log_vector_error_diagnostics(e)
raise
    total = results[0]["total_count"] if results else 0
@@ -718,6 +727,87 @@ async def hybrid_search_simple(
    return await hybrid_search(query=query, page=page, page_size=page_size)
# ============================================================================
# Diagnostics
# ============================================================================
# Rate limit: only log vector error diagnostics once per this interval
_VECTOR_DIAG_INTERVAL_SECONDS = 60
_last_vector_diag_time: float = 0
async def _log_vector_error_diagnostics(error: Exception) -> None:
"""Log diagnostic info when 'type vector does not exist' error occurs.
Note: Diagnostic queries use query_raw_with_schema which may run on a different
pooled connection than the one that failed. Session-level search_path can differ,
so these diagnostics show cluster-wide state, not necessarily the failed session.
Includes rate limiting to avoid log spam - only logs once per minute.
Caller should re-raise the error after calling this function.
"""
global _last_vector_diag_time
# Check if this is the vector type error
error_str = str(error).lower()
if not (
"type" in error_str and "vector" in error_str and "does not exist" in error_str
):
return
# Rate limit: only log once per interval
now = time.time()
if now - _last_vector_diag_time < _VECTOR_DIAG_INTERVAL_SECONDS:
return
_last_vector_diag_time = now
try:
diagnostics: dict[str, object] = {}
try:
search_path_result = await query_raw_with_schema("SHOW search_path")
diagnostics["search_path"] = search_path_result
except Exception as e:
diagnostics["search_path"] = f"Error: {e}"
try:
schema_result = await query_raw_with_schema("SELECT current_schema()")
diagnostics["current_schema"] = schema_result
except Exception as e:
diagnostics["current_schema"] = f"Error: {e}"
try:
user_result = await query_raw_with_schema(
"SELECT current_user, session_user, current_database()"
)
diagnostics["user_info"] = user_result
except Exception as e:
diagnostics["user_info"] = f"Error: {e}"
try:
# Check pgvector extension installation (cluster-wide, stable info)
ext_result = await query_raw_with_schema(
"SELECT extname, extversion, nspname as schema "
"FROM pg_extension e "
"JOIN pg_namespace n ON e.extnamespace = n.oid "
"WHERE extname = 'vector'"
)
diagnostics["pgvector_extension"] = ext_result
except Exception as e:
diagnostics["pgvector_extension"] = f"Error: {e}"
logger.error(
f"Vector type error diagnostics:\n"
f" Error: {error}\n"
f" search_path: {diagnostics.get('search_path')}\n"
f" current_schema: {diagnostics.get('current_schema')}\n"
f" user_info: {diagnostics.get('user_info')}\n"
f" pgvector_extension: {diagnostics.get('pgvector_extension')}"
)
except Exception as diag_error:
logger.error(f"Failed to collect vector error diagnostics: {diag_error}")
# Backward compatibility alias - HybridSearchWeights maps to StoreAgentSearchWeights
# for existing code that expects the popularity parameter
HybridSearchWeights = StoreAgentSearchWeights
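Editor's note: the diagnostics helper above rate-limits itself with a module-level timestamp so a burst of failing queries produces at most one dump per interval. The guard in isolation:

    import time

    _DIAG_INTERVAL_S = 60
    _last_diag_time: float = 0.0

    def should_log_diagnostics() -> bool:
        """Module-level rate limit: allow one diagnostic dump per interval."""
        global _last_diag_time
        now = time.time()
        if now - _last_diag_time < _DIAG_INTERVAL_S:
            return False
        _last_diag_time = now
        return True

    print(should_log_diagnostics())  # True
    print(should_log_diagnostics())  # False (within the same interval)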

View File

@@ -1,3 +1,4 @@
import asyncio
import logging
from abc import ABC, abstractmethod
from enum import Enum
@@ -225,6 +226,10 @@ class SyncRabbitMQ(RabbitMQBase):
class AsyncRabbitMQ(RabbitMQBase):
    """Asynchronous RabbitMQ client"""
def __init__(self, config: RabbitMQConfig):
super().__init__(config)
self._reconnect_lock: asyncio.Lock | None = None
    @property
    def is_connected(self) -> bool:
        return bool(self._connection and not self._connection.is_closed)
@@ -235,7 +240,17 @@ class AsyncRabbitMQ(RabbitMQBase):
    @conn_retry("AsyncRabbitMQ", "Acquiring async connection")
    async def connect(self):
-       if self.is_connected:
+       if self.is_connected and self._channel and not self._channel.is_closed:
return
if (
self.is_connected
and self._connection
and (self._channel is None or self._channel.is_closed)
):
self._channel = await self._connection.channel()
await self._channel.set_qos(prefetch_count=1)
await self.declare_infrastructure()
            return

        self._connection = await aio_pika.connect_robust(
@@ -291,24 +306,46 @@ class AsyncRabbitMQ(RabbitMQBase):
                exchange, routing_key=queue.routing_key or queue.name
            )
-   @func_retry
-   async def publish_message(
+   @property
+   def _lock(self) -> asyncio.Lock:
+       if self._reconnect_lock is None:
+           self._reconnect_lock = asyncio.Lock()
+       return self._reconnect_lock
+
+   async def _ensure_channel(self) -> aio_pika.abc.AbstractChannel:
+       """Get a valid channel, reconnecting if the current one is stale.
+
+       Uses a lock to prevent concurrent reconnection attempts from racing.
+       """
+       if self.is_ready:
+           return self._channel  # type: ignore # is_ready guarantees non-None
+
+       async with self._lock:
+           # Double-check after acquiring lock
+           if self.is_ready:
+               return self._channel  # type: ignore
+           self._channel = None
+           await self.connect()
+           if self._channel is None:
+               raise RuntimeError("Channel should be established after connect")
+           return self._channel
+
+   async def _publish_once(
        self,
        routing_key: str,
        message: str,
        exchange: Optional[Exchange] = None,
        persistent: bool = True,
    ) -> None:
-       if not self.is_ready:
-           await self.connect()
-       if self._channel is None:
-           raise RuntimeError("Channel should be established after connect")
+       channel = await self._ensure_channel()
        if exchange:
-           exchange_obj = await self._channel.get_exchange(exchange.name)
+           exchange_obj = await channel.get_exchange(exchange.name)
        else:
-           exchange_obj = self._channel.default_exchange
+           exchange_obj = channel.default_exchange
        await exchange_obj.publish(
            aio_pika.Message(
@@ -322,9 +359,23 @@ class AsyncRabbitMQ(RabbitMQBase):
            routing_key=routing_key,
        )
@func_retry
async def publish_message(
self,
routing_key: str,
message: str,
exchange: Optional[Exchange] = None,
persistent: bool = True,
) -> None:
try:
await self._publish_once(routing_key, message, exchange, persistent)
except aio_pika.exceptions.ChannelInvalidStateError:
logger.warning(
"RabbitMQ channel invalid, forcing reconnect and retrying publish"
)
async with self._lock:
self._channel = None
await self._publish_once(routing_key, message, exchange, persistent)
    async def get_channel(self) -> aio_pika.abc.AbstractChannel:
-       if not self.is_ready:
-           await self.connect()
-       if self._channel is None:
-           raise RuntimeError("Channel should be established after connect")
-       return self._channel
+       return await self._ensure_channel()
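Editor's note: _ensure_channel above is double-checked locking around an async reconnect, with the lock itself created lazily so no event loop is required at __init__. A self-contained sketch of the same pattern (names invented for illustration):

    import asyncio

    class LazyResource:
        """Double-checked locking around an async (re)connect."""

        def __init__(self):
            self._resource: object | None = None
            self._lock: asyncio.Lock | None = None  # created lazily, off the loop

        @property
        def lock(self) -> asyncio.Lock:
            if self._lock is None:
                self._lock = asyncio.Lock()
            return self._lock

        async def get(self) -> object:
            if self._resource is not None:      # fast path, no lock taken
                return self._resource
            async with self.lock:
                if self._resource is not None:  # re-check: another task may have won
                    return self._resource
                await asyncio.sleep(0)          # stand-in for the real reconnect
                self._resource = object()
                return self._resource

    async def main():
        r = LazyResource()
        a, b = await asyncio.gather(r.get(), r.get())
        print(a is b)  # True: only one connect happened

    asyncio.run(main())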

View File

@@ -897,6 +897,29 @@ files = [
{file = "charset_normalizer-3.4.4.tar.gz", hash = "sha256:94537985111c35f28720e43603b8e7b43a6ecfb2ce1d3058bbe955b73404e21a"}, {file = "charset_normalizer-3.4.4.tar.gz", hash = "sha256:94537985111c35f28720e43603b8e7b43a6ecfb2ce1d3058bbe955b73404e21a"},
] ]
[[package]]
name = "claude-agent-sdk"
version = "0.1.33"
description = "Python SDK for Claude Code"
optional = false
python-versions = ">=3.10"
groups = ["main"]
files = [
{file = "claude_agent_sdk-0.1.33-py3-none-macosx_11_0_arm64.whl", hash = "sha256:57886a2dd124e5b3c9e12ec3e4841742ab3444d1e428b45ceaec8841c96698fa"},
{file = "claude_agent_sdk-0.1.33-py3-none-manylinux_2_17_aarch64.whl", hash = "sha256:ea0f1e4fadeec766000122723c406a6f47c6210ea11bb5cc0c88af11ef7c940c"},
{file = "claude_agent_sdk-0.1.33-py3-none-manylinux_2_17_x86_64.whl", hash = "sha256:0ecd822c577b4ea2a52e51146a24dcea73eb69ff366bdb875785dadb116d593b"},
{file = "claude_agent_sdk-0.1.33-py3-none-win_amd64.whl", hash = "sha256:a9fbd09d8f947005e087340ecd0706ed35639c946b4bd49429d3132db4cb3751"},
{file = "claude_agent_sdk-0.1.33.tar.gz", hash = "sha256:134bf403bb7553d829dadec42c30ecef340f5d4ad1595c1bdef933a9ca3129cf"},
]
[package.dependencies]
anyio = ">=4.0.0"
mcp = ">=0.1.0"
typing-extensions = {version = ">=4.0.0", markers = "python_version < \"3.11\""}
[package.extras]
dev = ["anyio[trio] (>=4.0.0)", "mypy (>=1.0.0)", "pytest (>=7.0.0)", "pytest-asyncio (>=0.20.0)", "pytest-cov (>=4.0.0)", "ruff (>=0.1.0)"]
[[package]]
name = "cleo"
version = "2.1.0"
@@ -2593,6 +2616,18 @@ http2 = ["h2 (>=3,<5)"]
socks = ["socksio (==1.*)"]
zstd = ["zstandard (>=0.18.0)"]
[[package]]
name = "httpx-sse"
version = "0.4.3"
description = "Consume Server-Sent Event (SSE) messages with HTTPX."
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "httpx_sse-0.4.3-py3-none-any.whl", hash = "sha256:0ac1c9fe3c0afad2e0ebb25a934a59f4c7823b60792691f779fad2c5568830fc"},
{file = "httpx_sse-0.4.3.tar.gz", hash = "sha256:9b1ed0127459a66014aec3c56bebd93da3c1bc8bb6618c8082039a44889a755d"},
]
[[package]]
name = "huggingface-hub"
version = "1.4.1"
@@ -3310,6 +3345,39 @@ files = [
{file = "mccabe-0.7.0.tar.gz", hash = "sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325"}, {file = "mccabe-0.7.0.tar.gz", hash = "sha256:348e0240c33b60bbdf4e523192ef919f28cb2c3d7d5c7794f74009290f236325"},
] ]
[[package]]
name = "mcp"
version = "1.26.0"
description = "Model Context Protocol SDK"
optional = false
python-versions = ">=3.10"
groups = ["main"]
files = [
{file = "mcp-1.26.0-py3-none-any.whl", hash = "sha256:904a21c33c25aa98ddbeb47273033c435e595bbacfdb177f4bd87f6dceebe1ca"},
{file = "mcp-1.26.0.tar.gz", hash = "sha256:db6e2ef491eecc1a0d93711a76f28dec2e05999f93afd48795da1c1137142c66"},
]
[package.dependencies]
anyio = ">=4.5"
httpx = ">=0.27.1"
httpx-sse = ">=0.4"
jsonschema = ">=4.20.0"
pydantic = ">=2.11.0,<3.0.0"
pydantic-settings = ">=2.5.2"
pyjwt = {version = ">=2.10.1", extras = ["crypto"]}
python-multipart = ">=0.0.9"
pywin32 = {version = ">=310", markers = "sys_platform == \"win32\""}
sse-starlette = ">=1.6.1"
starlette = ">=0.27"
typing-extensions = ">=4.9.0"
typing-inspection = ">=0.4.1"
uvicorn = {version = ">=0.31.1", markers = "sys_platform != \"emscripten\""}
[package.extras]
cli = ["python-dotenv (>=1.0.0)", "typer (>=0.16.0)"]
rich = ["rich (>=13.9.4)"]
ws = ["websockets (>=15.0.1)"]
[[package]]
name = "mdurl"
version = "0.1.2"
@@ -5994,7 +6062,7 @@ description = "Python for Window Extensions"
optional = false
python-versions = "*"
groups = ["main"]
-markers = "platform_system == \"Windows\""
+markers = "sys_platform == \"win32\" or platform_system == \"Windows\""
files = [
    {file = "pywin32-311-cp310-cp310-win32.whl", hash = "sha256:d03ff496d2a0cd4a5893504789d4a15399133fe82517455e78bad62efbb7f0a3"},
    {file = "pywin32-311-cp310-cp310-win_amd64.whl", hash = "sha256:797c2772017851984b97180b0bebe4b620bb86328e8a884bb626156295a63b3b"},
@@ -6974,6 +7042,28 @@ postgresql-psycopgbinary = ["psycopg[binary] (>=3.0.7)"]
pymysql = ["pymysql"]
sqlcipher = ["sqlcipher3_binary"]
[[package]]
name = "sse-starlette"
version = "3.2.0"
description = "SSE plugin for Starlette"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "sse_starlette-3.2.0-py3-none-any.whl", hash = "sha256:5876954bd51920fc2cd51baee47a080eb88a37b5b784e615abb0b283f801cdbf"},
{file = "sse_starlette-3.2.0.tar.gz", hash = "sha256:8127594edfb51abe44eac9c49e59b0b01f1039d0c7461c6fd91d4e03b70da422"},
]
[package.dependencies]
anyio = ">=4.7.0"
starlette = ">=0.49.1"
[package.extras]
daphne = ["daphne (>=4.2.0)"]
examples = ["aiosqlite (>=0.21.0)", "fastapi (>=0.115.12)", "sqlalchemy[asyncio] (>=2.0.41)", "uvicorn (>=0.34.0)"]
granian = ["granian (>=2.3.1)"]
uvicorn = ["uvicorn (>=0.34.0)"]
[[package]]
name = "stagehand"
version = "0.5.9"
@@ -8440,4 +8530,4 @@ cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and pyt
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<3.14"
-content-hash = "14686ee0e2dc446a75d0db145b08dc410dc31c357e25085bb0f9b0174711c4b1"
+content-hash = "75f736638561770cdbced7b880319d1e9abf7b48ee13689c2c9f593e6d25fea0"

View File

@@ -16,6 +16,7 @@ anthropic = "^0.79.0"
apscheduler = "^3.11.1"
autogpt-libs = { path = "../autogpt_libs", develop = true }
bleach = { extras = ["css"], version = "^6.2.0" }
claude-agent-sdk = "^0.1.0"
click = "^8.2.0"
cryptography = "^46.0"
discord-py = "^2.5.2"

View File

@@ -104,7 +104,31 @@ export function FileInput(props: Props) {
      return false;
    }

-   const getFileLabelFromValue = (val: string) => {
+   const getFileLabelFromValue = (val: unknown): string => {
// Handle object format from external API: { name, type, size, data }
if (val && typeof val === "object") {
const obj = val as Record<string, unknown>;
if (typeof obj.name === "string") {
return getFileLabel(
obj.name,
typeof obj.type === "string" ? obj.type : "",
);
}
if (typeof obj.type === "string") {
const mimeParts = obj.type.split("/");
if (mimeParts.length > 1) {
return `${mimeParts[1].toUpperCase()} file`;
}
return `${obj.type} file`;
}
return "File";
}
// Handle string values (data URIs or file paths)
if (typeof val !== "string") {
return "File";
}
if (val.startsWith("data:")) { if (val.startsWith("data:")) {
const matches = val.match(/^data:([^;]+);/); const matches = val.match(/^data:([^;]+);/);
if (matches?.[1]) { if (matches?.[1]) {