Commit Graph

166 Commits

Author SHA1 Message Date
Zamil Majdy
9ddcaa884c Merge remote-tracking branch 'origin/dev' into pwuts/open-2995-refactor-move-copilot-ai-generation-tool-execution-to 2026-02-13 20:47:24 +04:00
Zamil Majdy
f9f358c526 feat(mcp): Add MCP tool block with OAuth, tool discovery, and standard credential integration (#12011)
## Summary

<img width="1000" alt="image"
src="https://github.com/user-attachments/assets/18e8ef34-d222-453c-8b0a-1b25ef8cf806"
/>


<img width="250" alt="image"
src="https://github.com/user-attachments/assets/ba97556c-09c5-4f76-9f4e-49a2e8e57468"
/>

<img width="250" alt="image"
src="https://github.com/user-attachments/assets/68f7804a-fe74-442d-9849-39a229c052cf"
/>

<img width="250" alt="image"
src="https://github.com/user-attachments/assets/700690ba-f9fe-4726-8871-3bfbab586001"
/>

Full-stack MCP (Model Context Protocol) tool block integration that
allows users to connect to any MCP server, discover available tools,
authenticate via OAuth, and execute tools — all through the standard
AutoGPT credential system.

### Backend

- **MCPToolBlock** (`blocks/mcp/block.py`): New block using
`CredentialsMetaInput` pattern with optional credentials (`default={}`),
supporting both authenticated (OAuth) and public MCP servers. Includes
auto-lookup fallback for backward compatibility.
- **MCP Client** (`blocks/mcp/client.py`): HTTP transport with JSON-RPC
2.0, tool discovery, tool execution with robust error handling
(type-checked error fields, non-JSON response handling)
- **MCP OAuth Handler** (`blocks/mcp/oauth.py`): RFC 8414 discovery,
dynamic per-server OAuth with PKCE, token storage and refresh via
`raise_for_status=True`
- **MCP API Routes** (`api/features/mcp/routes.py`): `discover-tools`,
`oauth/login`, `oauth/callback` endpoints with credential cleanup,
defensive OAuth metadata validation
- **Credential system integration**:
- `CredentialsMetaInput` model_validator normalizes legacy
`"ProviderName.MCP"` format from Python 3.13's `str(StrEnum)` change
- `CredentialsFieldInfo.combine()` supports URL-based credential
discrimination (each MCP server gets its own credential entry)
- `aggregate_credentials_inputs` checks block schema defaults for
credential optionality
- Executor normalizes credential data for both Pydantic and JSON schema
validation paths
  - Chat credential matching handles MCP server URL filtering
- `provider_matches()` helper used consistently for Python 3.13 StrEnum
compatibility
- **Pre-run validation**: `_validate_graph_get_errors` now calls
`get_missing_input()` for custom block-level validation (MCP tool
arguments)
- **Security**: HTML tag stripping loop to prevent XSS bypass, SSRF
protection (removed trusted_origins)

### Frontend

- **MCPToolDialog** (`MCPToolDialog.tsx`): Full tool discovery UI —
enter server URL, authenticate if needed, browse tools, select tool and
configure
- **OAuth popup** (`oauth-popup.ts`): Shared utility supporting
cross-origin MCP OAuth flows with BroadcastChannel + localStorage
fallback
- **Credential integration**: MCP-specific OAuth flow in
`useCredentialsInput`, server URL filtering in `useCredentials`, MCP
callback page
- **CredentialsSelect**: Auto-selects first available credential instead
of defaulting to "None", credentials listed before "None" in dropdown
- **Node rendering**: Dynamic tool input schema rendering on MCP nodes,
proper handling in both legacy and new flow editors
- **Block title persistence**: `customized_name` set at block creation
for both MCP and Agent blocks — no fallback logic needed, titles survive
save/load reliably
- **Stable credential ordering**: Removed `sortByUnsetFirst` that caused
credential inputs to jump when selected

### Tests (~2060 lines)

- Unit tests: block, client, tool execution
- Integration tests: mock MCP server with auth
- OAuth flow tests
- API endpoint tests
- Credential combining/optionality tests
- E2e tests (skipped in CI, run manually)

## Key Design Decisions

1. **Optional credentials via `default={}`**: MCP servers can be public
(no auth) or private (OAuth). The `credentials` field has `default={}`
making it optional at the schema level, so public servers work without
prompting for credentials.

2. **URL-based credential discrimination**: Each MCP server URL gets its
own credential entry in the "Run agent" form (via
`discriminator="server_url"`), so agents using multiple MCP servers
prompt for each independently.

3. **Model-level normalization**: Python 3.13 changed `str(StrEnum)` to
return `"ClassName.MEMBER"`. Rather than scattering fixes across the
codebase, a Pydantic `model_validator(mode="before")` on
`CredentialsMetaInput` handles normalization centrally, and
`provider_matches()` handles lookups.

4. **Credential auto-select**: `CredentialsSelect` component defaults to
the first available credential and notifies the parent state, ensuring
credentials are pre-filled in the "Run agent" dialog without requiring
manual selection.

5. **customized_name for block titles**: Both MCP and Agent blocks set
`customized_name` in metadata at creation time. This eliminates
convoluted runtime fallback logic (`agent_name`, hostname extraction) —
the title is persisted once and read directly.

## Test plan

- [x] Unit/integration tests pass (68 MCP + 11 graph = 79 tests)
- [x] Manual: MCP block with public server (DeepWiki) — no credentials
needed, tools discovered and executable
- [x] Manual: MCP block with OAuth server (Linear, Sentry) — OAuth flow
prompts correctly
- [x] Manual: "Run agent" form shows correct credential requirements per
MCP server
- [x] Manual: Credential auto-selects when exactly one matches,
pre-selects first when multiple exist
- [x] Manual: Credential ordering stays stable when
selecting/deselecting
- [x] Manual: MCP block title persists after save and refresh
- [x] Manual: Agent block title persists after save and refresh (via
customized_name)
- [ ] Manual: Shared agent with MCP block prompts new user for
credentials

---------

Co-authored-by: Otto <otto@agpt.co>
Co-authored-by: Ubbe <hi@ubbe.dev>
2026-02-13 16:17:03 +00:00
Reinier van der Leer
113e87a23c refactor(backend): Reduce circular imports (#12068)
I'm getting circular import issues because there is a lot of
cross-importing between `backend.data`, `backend.blocks`, and other
modules. This change reduces block-related cross-imports and thus risk
of breaking circular imports.

### Changes 🏗️

- Strip down `backend.data.block`
- Move `Block` base class and related class/enum defs to
`backend.blocks._base`
  - Move `is_block_auth_configured` to `backend.blocks._utils`
- Move `get_blocks()`, `get_io_block_ids()` etc. to `backend.blocks`
(`__init__.py`)
  - Update imports everywhere
- Remove unused and poorly typed `Block.create()`
  - Change usages from `block_cls.create()` to `block_cls()`
- Improve typing of `load_all_blocks` and `get_blocks`
- Move cross-import of `backend.api.features.library.model` from
`backend/data/__init__.py` to `backend/data/integrations.py`
- Remove deprecated attribute `NodeModel.webhook`
  - Re-generate OpenAPI spec and fix frontend usage
- Eliminate module-level `backend.blocks` import from `blocks/agent.py`
- Eliminate module-level `backend.data.execution` and
`backend.executor.manager` imports from `blocks/helpers/review.py`
- Replace `BlockInput` with `GraphInput` for graph inputs

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - CI static type-checking + tests should be sufficient for this
2026-02-12 12:07:49 +00:00
Reinier van der Leer
80659d90e4 Merge branch 'pwuts/move-block-base-to-fix-circular-imports' into pwuts/open-2995-copilot-microservice-with-block-refactor 2026-02-12 13:01:31 +01:00
Reinier van der Leer
eef892893c untangle some more 2026-02-12 11:23:42 +01:00
Reinier van der Leer
f02c00374e Merge branch 'pwuts/move-block-base-to-fix-circular-imports' into pwuts/open-2995-copilot-microservice-with-block-refactor 2026-02-12 10:28:21 +01:00
Reinier van der Leer
cad54a9f3e eliminate more cross-imports 2026-02-11 17:12:43 +01:00
Reinier van der Leer
7a4cf4e186 Merge branch 'pwuts/move-block-base-to-fix-circular-imports' into pwuts/open-2995-copilot-microservice-with-block-refactor 2026-02-11 15:53:22 +01:00
Reinier van der Leer
fe9debd80f refactor(backend/blocks): Extract backend.blocks._base from backend.data.block
I'm getting circular import issues because there is a lot of cross-importing between `backend.data`, `backend.blocks`, and other components. This change reduces block-related cross-imports and thus risk of breaking circular imports.
2026-02-11 15:38:42 +01:00
Reinier van der Leer
ee2805d14c fix(backend/copilot): Use DatabaseManager where needed 2026-02-11 13:43:58 +01:00
Otto
062fe1aa70 fix(security): enforce disabled flag on blocks in graph validation (#12059)
## Summary
Blocks marked `disabled=True` (like BlockInstallationBlock) were not
being checked during graph validation, allowing them to be used via
direct API calls despite being hidden from the UI.

This adds a security check in `_validate_graph_get_errors()` to reject
any graph containing disabled blocks.

## Security Advisory
GHSA-4crw-9p35-9x54

## Linear
SECRT-1927

## Changes
- Added `block.disabled` check in graph validation (6 lines)

## Testing
- Graphs with disabled blocks → rejected with clear error message
- Graphs with valid blocks → unchanged behavior

<!-- greptile_comment -->

<h2>Greptile Overview</h2>

<details><summary><h3>Greptile Summary</h3></summary>

Adds critical security validation to prevent execution of disabled
blocks (like `BlockInstallationBlock`) via direct API calls. The fix
validates that `block.disabled` is `False` during graph validation in
`_validate_graph_get_errors()` on line 747-750, ensuring disabled blocks
are rejected before graph creation or execution. This closes a
vulnerability where blocks marked disabled in the UI could still be used
through API endpoints.
</details>


<details><summary><h3>Confidence Score: 5/5</h3></summary>

- This PR is safe to merge and addresses a critical security
vulnerability
- The fix is minimal (6 lines), correctly placed in the validation flow,
includes clear security context (GHSA reference), and follows existing
validation patterns. The check is positioned after block existence
validation and before input validation, ensuring disabled blocks are
caught early in both graph creation and execution paths.
- No files require special attention
</details>


<!-- greptile_other_comments_section -->

<!-- /greptile_comment -->

---------

Co-authored-by: Nicholas Tindle <nicholas.tindle@agpt.co>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 03:28:19 +00:00
Nicholas Tindle
7668c17d9c feat(platform): add User Workspace for persistent CoPilot file storage (#11867)
Implements persistent User Workspace storage for CoPilot, enabling
blocks to save and retrieve files across sessions. Files are stored in
session-scoped virtual paths (`/sessions/{session_id}/`).

Fixes SECRT-1833

### Changes 🏗️

**Database & Storage:**
- Add `UserWorkspace` and `UserWorkspaceFile` Prisma models
- Implement `WorkspaceStorageBackend` abstraction (GCS for cloud, local
filesystem for self-hosted)
- Add `workspace_id` and `session_id` fields to `ExecutionContext`

**Backend API:**
- Add REST endpoints: `GET/POST /api/workspace/files`, `GET/DELETE
/api/workspace/files/{id}`, `GET /api/workspace/files/{id}/download`
- Add CoPilot tools: `list_workspace_files`, `read_workspace_file`,
`write_workspace_file`
- Integrate workspace storage into `store_media_file()` - returns
`workspace://file-id` references

**Block Updates:**
- Refactor all file-handling blocks to use unified `ExecutionContext`
parameter
- Update media-generating blocks to persist outputs to workspace
(AIImageGenerator, AIImageCustomizer, FluxKontext, TalkingHead, FAL
video, Bannerbear, etc.)

**Frontend:**
- Render `workspace://` image references in chat via proxy endpoint
- Add "AI cannot see this image" overlay indicator

**CoPilot Context Mapping:**
- Session = Agent (graph_id) = Run (graph_exec_id)
- Files scoped to `/sessions/{session_id}/`

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [ ] I have tested my changes according to the test plan:
- [ ] Create CoPilot session, generate image with AIImageGeneratorBlock
  - [ ] Verify image returns `workspace://file-id` (not base64)
  - [ ] Verify image renders in chat with visibility indicator
  - [ ] Verify workspace files persist across sessions
  - [ ] Test list/read/write workspace files via CoPilot tools
  - [ ] Test local storage backend for self-hosted deployments

#### For configuration changes:
- [x] `.env.default` is updated or already compatible with my changes
- [x] `docker-compose.yml` is updated or already compatible with my
changes
- [x] I have included a list of my configuration changes in the PR
description (under **Changes**)

🤖 Generated with [Claude Code](https://claude.ai/code)

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> **Medium Risk**
> Introduces a new persistent file-storage surface area (DB tables,
storage backends, download API, and chat tools) and rewires
`store_media_file()`/block execution context across many blocks, so
regressions could impact file handling, access control, or storage
costs.
> 
> **Overview**
> Adds a **persistent per-user Workspace** (new
`UserWorkspace`/`UserWorkspaceFile` models plus `WorkspaceManager` +
`WorkspaceStorageBackend` with GCS/local implementations) and wires it
into the API via a new `/api/workspace/files/{file_id}/download` route
(including header-sanitized `Content-Disposition`) and shutdown
lifecycle hooks.
> 
> Extends `ExecutionContext` to carry execution identity +
`workspace_id`/`session_id`, updates executor tooling to clone
node-specific contexts, and updates `run_block` (CoPilot) to create a
session-scoped workspace and synthetic graph/run/node IDs.
> 
> Refactors `store_media_file()` to require `execution_context` +
`return_format` and to support `workspace://` references; migrates many
media/file-handling blocks and related tests to the new API and to
persist generated media as `workspace://...` (or fall back to data URIs
outside CoPilot), and adds CoPilot chat tools for
listing/reading/writing/deleting workspace files with safeguards against
context bloat.
> 
> <sup>Written by [Cursor
Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit
6abc70f793. This will update automatically
on new commits. Configure
[here](https://cursor.com/dashboard?tab=bugbot).</sup>
<!-- /CURSOR_SUMMARY -->

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Co-authored-by: Reinier van der Leer <pwuts@agpt.co>
2026-01-29 05:49:47 +00:00
Nicholas Tindle
47a3a5ef41 feat(backend,frontend): optional credentials flag for blocks at agent level (#11716)
This feature allows agent makers to mark credential fields as optional.
When credentials are not configured for an optional block, the block
will be skipped during execution rather than causing a validation error.

**Use case:** An agent with multiple notification channels (Discord,
Twilio, Slack) where the user only needs to configure one - unconfigured
channels are simply skipped.

### Changes 🏗️

#### Backend

**Data Model Changes:**
- `backend/data/graph.py`: Added `credentials_optional` property to
`Node` model that reads from node metadata
- `backend/data/execution.py`: Added `nodes_to_skip` field to
`GraphExecutionEntry` model to track nodes that should be skipped

**Validation Changes:**
- `backend/executor/utils.py`:
- Updated `_validate_node_input_credentials()` to return a tuple of
`(credential_errors, nodes_to_skip)`
- Nodes with `credentials_optional=True` and missing credentials are
added to `nodes_to_skip` instead of raising validation errors
- Updated `validate_graph_with_credentials()` to propagate
`nodes_to_skip` set
- Updated `validate_and_construct_node_execution_input()` to return
`nodes_to_skip`
- Updated `add_graph_execution()` to pass `nodes_to_skip` to execution
entry

**Execution Changes:**
- `backend/executor/manager.py`:
  - Added skip logic in `_on_graph_execution()` dispatch loop
- When a node is in `nodes_to_skip`, it is marked as `COMPLETED` without
execution
  - No outputs are produced, so downstream nodes won't trigger

#### Frontend

**Node Store:**
- `frontend/src/app/(platform)/build/stores/nodeStore.ts`:
- Added `credentials_optional` to node metadata serialization in
`convertCustomNodeToBackendNode()`
- Added `getCredentialsOptional()` and `setCredentialsOptional()` helper
methods

**Credential Field Component:**
-
`frontend/src/components/renderers/input-renderer/fields/CredentialField/CredentialField.tsx`:
  - Added "Optional - skip block if not configured" switch toggle
  - Switch controls the `credentials_optional` metadata flag
  - Placeholder text updates based on optional state

**Credential Field Hook:**
-
`frontend/src/components/renderers/input-renderer/fields/CredentialField/useCredentialField.ts`:
  - Added `disableAutoSelect` parameter
- When credentials are optional, auto-selection of credentials is
disabled

**Feature Flags:**
- `frontend/src/services/feature-flags/use-get-flag.ts`: Minor refactor
(condition ordering)

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
- [x] Build an agent using smart decision maker and down stream blocks
to test this

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> Introduces optional credentials across graph execution and UI,
allowing nodes to be skipped (no outputs, no downstream triggers) when
their credentials are not configured.
> 
> - Backend
> - Adds `Node.credentials_optional` (from node `metadata`) and computes
required credential fields in `Graph.credentials_input_schema` based on
usage.
> - Validates credentials with `_validate_node_input_credentials` →
returns `(errors, nodes_to_skip)`; plumbs `nodes_to_skip` through
`validate_graph_with_credentials`,
`_construct_starting_node_execution_input`,
`validate_and_construct_node_execution_input`, and `add_graph_execution`
into `GraphExecutionEntry`.
> - Executor: dispatch loop skips nodes in `nodes_to_skip` (marks
`COMPLETED`); `execute_node`/`on_node_execution` accept `nodes_to_skip`;
`SmartDecisionMakerBlock.run` filters tool functions whose
`_sink_node_id` is in `nodes_to_skip` and errors only if all tools are
filtered.
> - Models: `GraphExecutionEntry` gains `nodes_to_skip` field. Tests and
snapshots updated accordingly.
> 
> - Frontend
> - Builder: credential field uses `custom/credential_field` with an
"Optional – skip block if not configured" toggle; `nodeStore` persists
`credentials_optional` and history; UI hides optional toggle in run
dialogs.
> - Run dialogs: compute required credentials from
`credentials_input_schema.required`; allow selecting "None"; avoid
auto-select for optional; filter out incomplete creds before execute.
>   - Minor schema/UI wiring updates (`uiSchema`, form context flags).
> 
> <sup>Written by [Cursor
Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit
5e01fd6a3e. This will update automatically
on new commits. Configure
[here](https://cursor.com/dashboard?tab=bugbot).</sup>
<!-- /CURSOR_SUMMARY -->

---------

Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: claude[bot] <41898282+claude[bot]@users.noreply.github.com>
Co-authored-by: Nicholas Tindle <ntindle@users.noreply.github.com>
2026-01-09 14:11:35 +00:00
Nicholas Tindle
79d45a15d0 feat(platform): Deduplicate insufficient funds Discord + email notifications (#11672)
Add Redis-based deduplication for insufficient funds notifications (both
Discord alerts and user emails) when users run out of credits. This
prevents spamming users and the PRODUCT Discord channel with repeated
alerts for the same user+agent combination.

### Changes 🏗️

- **Redis-based deduplication** (`backend/executor/manager.py`):
- Add `INSUFFICIENT_FUNDS_NOTIFIED_PREFIX` constant for Redis key prefix
- Add `INSUFFICIENT_FUNDS_NOTIFIED_TTL_SECONDS` (30 days) as fallback
cleanup
- Implement deduplication in `_handle_insufficient_funds_notif` using
Redis `SET NX`
- Skip both email (`ZERO_BALANCE`) and Discord notifications for
duplicate alerts per user+agent
- Add `clear_insufficient_funds_notifications(user_id)` function to
remove all notification flags for a user

- **Clear flags on credit top-up** (`backend/data/credit.py`):
- Call `clear_insufficient_funds_notifications` in `_top_up_credits`
after successful auto-charge
- Call `clear_insufficient_funds_notifications` in `fulfill_checkout`
after successful manual top-up
- This allows users to receive notifications again if they run out of
funds in the future

- **Comprehensive test coverage**
(`backend/executor/manager_insufficient_funds_test.py`):
  - Test first-time notification sends both email and Discord alert
  - Test duplicate notifications are skipped for same user+agent
  - Test different agents for same user get separate alerts
  - Test clearing notifications removes all keys for a user
  - Test handling when no notification keys exist
- Test notifications still sent when Redis fails (graceful degradation)

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
- [x] First insufficient funds alert sends both email and Discord
notification
  - [x] Duplicate alerts for same user+agent are skipped
  - [x] Different agents for same user each get their own notification
  - [x] Topping up credits clears notification flags
  - [x] Redis failure gracefully falls back to sending notifications
  - [x] 30-day TTL provides automatic cleanup as fallback
  - [x] Manually test this works with scheduled agents
 

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> Introduces Redis-backed deduplication for insufficient-funds alerts
and resets flags on successful credit additions.
> 
> - **Dedup insufficient-funds alerts** in `executor/manager.py` using
Redis `SET NX` with `INSUFFICIENT_FUNDS_NOTIFIED_PREFIX` and 30‑day TTL;
skips duplicate ZERO_BALANCE email + Discord alerts per
`user_id`+`graph_id`, with graceful fallback if Redis fails.
> - **Reset notification flags on credit increases** by adding
`clear_insufficient_funds_notifications(user_id)` and invoking it when
enabling/adding positive `GRANT`/`TOP_UP` transactions in
`data/credit.py`.
> - **Tests** (`executor/manager_insufficient_funds_test.py`):
first-time vs duplicate behavior, per-agent separation, clearing keys
(including no-key and Redis-error cases), and clearing on
`_add_transaction`/`_enable_transaction`.
> 
> <sup>Written by [Cursor
Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit
1a4413b3a1. This will update automatically
on new commits. Configure
[here](https://cursor.com/dashboard?tab=bugbot).</sup>
<!-- /CURSOR_SUMMARY -->

---------

Co-authored-by: Ubbe <hi@ubbe.dev>
Co-authored-by: Claude <noreply@anthropic.com>
2025-12-30 18:10:30 +00:00
Reinier van der Leer
de78d062a9 refactor(backend/api): Clean up API file structure (#11629)
We'll soon be needing a more feature-complete external API. To make way
for this, I'm moving some files around so:
- We can more easily create new versions of our external API
- The file structure of our internal API is more homogeneous

These changes are quite opinionated, but IMO in any case they're better
than the chaotic structure we have now.

### Changes 🏗️

- Move `backend/server` -> `backend/api`
- Move `backend/server/routers` + `backend/server/v2` ->
`backend/api/features`
  - Change absolute sibling imports to relative imports
- Move `backend/server/v2/AutoMod` -> `backend/executor/automod`
- Combine `backend/server/routers/analytics_*test.py` ->
`backend/api/features/analytics_test.py`
- Sort OpenAPI spec file

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - CI tests
  - [x] Clicking around in the app -> no obvious breakage
2025-12-20 20:33:10 +00:00
Zamil Majdy
71157bddd7 feat(backend): add agent mode support to SmartDecisionMakerBlock with autonomous tool execution loops (#11547)
## Summary

<img width="2072" height="1836" alt="image"
src="https://github.com/user-attachments/assets/9d231a77-6309-46b9-bc11-befb5d8e9fcc"
/>

**🚀 Major Feature: Agent Mode Support**

Adds autonomous agent mode to SmartDecisionMakerBlock, enabling it to
execute tools directly in loops until tasks are completed, rather than
just yielding tool calls for external execution.

##  **Key New Features**

### 🤖 **Agent Mode with Tool Execution Loops**
- **New `agent_mode_max_iterations` parameter** controls execution
behavior:
  - `0` = Traditional mode (single LLM call, yield tool calls)
  - `1+` = Agent mode with iteration limit
  - `-1` = Infinite agent mode (loop until finished)

### 🔄 **Autonomous Tool Execution**  
- **Direct tool execution** instead of yielding for external handling
- **Multi-iteration loops** with conversation state management
- **Automatic completion detection** when LLM stops making tool calls
- **Iteration limit handling** with graceful completion messages

### 🏗️ **Proper Database Operations**
- **Replace manual execution ID generation** with proper
`upsert_execution_input`/`upsert_execution_output`
- **Real NodeExecutionEntry objects** from database results
- **Proper execution status management**: QUEUED → RUNNING →
COMPLETED/FAILED

### 🔧 **Enhanced Type Safety**
- **Pydantic models** replace TypedDict: `ToolInfo` and
`ExecutionParams`
- **Runtime validation** with better error messages
- **Improved developer experience** with IDE support

## 🔧 **Technical Implementation**

### Agent Mode Flow:
```python
# Agent mode enabled with iterations
if input_data.agent_mode_max_iterations != 0:
    async for result in self._execute_tools_agent_mode(...):
        yield result  # "conversations", "finished"
    return

# Traditional mode (existing behavior)  
# Single LLM call + yield tool calls for external execution
```

### Tool Execution with Database Operations:
```python
# Before: Manual execution IDs
tool_exec_id = f"{node_exec_id}_tool_{sink_node_id}_{len(input_data)}"

# After: Proper database operations
node_exec_result, final_input_data = await db_client.upsert_execution_input(
    node_id=sink_node_id,
    graph_exec_id=execution_params.graph_exec_id,
    input_name=input_name, 
    input_data=input_value,
)
```

### Type Safety with Pydantic:
```python
# Before: Dict access prone to errors
execution_params["user_id"]  

# After: Validated model access
execution_params.user_id  # Runtime validation + IDE support
```

## 🧪 **Comprehensive Test Coverage**

- **Agent mode execution tests** with multi-iteration scenarios
- **Database operation verification** 
- **Type safety validation**
- **Backward compatibility** for traditional mode
- **Enhanced dynamic fields tests**

## 📊 **Usage Examples**

### Traditional Mode (Existing Behavior):
```python
SmartDecisionMakerBlock.Input(
    prompt="Search for keywords",
    agent_mode_max_iterations=0  # Default
)
# → Yields tool calls for external execution
```

### Agent Mode (New Feature):
```python  
SmartDecisionMakerBlock.Input(
    prompt="Complete this task using available tools",
    agent_mode_max_iterations=5  # Max 5 iterations
)
# → Executes tools directly until task completion or iteration limit
```

### Infinite Agent Mode:
```python
SmartDecisionMakerBlock.Input(
    prompt="Analyze and process this data thoroughly", 
    agent_mode_max_iterations=-1  # No limit, run until finished
)
# → Executes tools autonomously until LLM indicates completion
```

##  **Backward Compatibility**

- **Zero breaking changes** to existing functionality
- **Traditional mode remains default** (`agent_mode_max_iterations=0`)
- **All existing tests pass**
- **Same API for tool definitions and execution**

This transforms the SmartDecisionMakerBlock from a simple tool call
generator into a powerful autonomous agent capable of complex multi-step
task execution! 🎯

🤖 Generated with [Claude Code](https://claude.ai/code)

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-12-12 09:58:06 +00:00
Nicholas Tindle
113df689dc feat(platform): Improve Google Sheets/Drive integration with unified credentials (#11520)
Simplifies and improves the Google Sheets/Drive integration by merging
credentials with the file picker and using narrower OAuth scopes.

### Changes 🏗️

- Merge Google credentials and file picker into a single unified input
field for better UX
- Create spreadsheets using Drive API instead of Sheets API for proper
scope support
- Simplify Google Drive OAuth scope to only use `drive.file` (narrowest
permission needed)
- Clean up unused imports (NormalizedPickedFile)

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
- [x] Test creating a new Google Spreadsheet with
GoogleSheetsCreateSpreadsheetBlock
- [x] Test reading from existing spreadsheets with GoogleSheetsReadBlock
  - [x] Test writing to spreadsheets with GoogleSheetsWriteBlock
  - [x] Verify OAuth flow works with simplified scopes
  - [x] Verify file picker works with merged credentials field

#### For configuration changes:
- [x] `.env.default` is updated or already compatible with my changes
- [x] `docker-compose.yml` is updated or already compatible with my
changes
- [x] I have included a list of my configuration changes in the PR
description (under **Changes**)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> Unifies Google Drive picker and credentials with auto-credentials
across backend and frontend, updates all Sheets blocks and execution to
use it, and adds Drive-based spreadsheet creation plus supporting tests
and UI fixes.
> 
> - **Backend**:
> - **Google Drive model/field**: Introduce `GoogleDriveFile` (with
`_credentials_id`) and `GoogleDriveFileField()` for unified auth+picker
(`backend/blocks/google/_drive.py`).
> - **Sheets blocks**: Replace `GoogleDrivePickerField` and explicit
credentials with `GoogleDriveFileField` across all Sheets blocks;
preserve and emit credentials for chaining; add Drive service; create
spreadsheets via Drive API then manage via Sheets API.
> - **IO block**: Add `AgentGoogleDriveFileInputBlock` providing a Drive
picker input.
> - **Execution**: Support auto-generated credentials via
`BlockSchema.get_auto_credentials_fields()`; acquire/release multiple
credential locks; pass creds by `credentials_kwarg`
(`executor/manager.py`, `data/block.py`, `util/test.py`).
> - **Tests**: Add validation tests for duplicate/unique
`auto_credentials.kwarg_name` and defaults.
> - **Frontend**:
> - **Picker**: Enhance Google Drive picker to require/use saved
platform credentials, pass `_credentials_id`, validate scopes, and
manage dialog z-index/interaction; expose `requirePlatformCredentials`.
> - **UI**: Update dialogs/CSS to keep Google picker on top and prevent
overlay interactions.
> - **Types**: Extend `GoogleDrivePickerConfig` with `auto_credentials`
and related typings.
> 
> <sup>Written by [Cursor
Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit
7d25534def. This will update automatically
on new commits. Configure
[here](https://cursor.com/dashboard?tab=bugbot).</sup>
<!-- /CURSOR_SUMMARY -->

---------

Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
2025-12-04 14:40:30 +00:00
Zamil Majdy
7b951c977e feat(platform): implement graph-level Safe Mode toggle for HITL blocks (#11455)
## Summary

This PR implements a graph-level Safe Mode toggle system for
Human-in-the-Loop (HITL) blocks. When Safe Mode is ON (default), HITL
blocks require manual review before proceeding. When OFF, they execute
automatically.

## 🔧 Backend Changes

- **Database**: Added `metadata` JSON column to `AgentGraph` table with
migration
- **API**: Updated `execute_graph` endpoint to accept `safe_mode`
parameter
- **Execution**: Enhanced execution context to use graph metadata as
default with API override capability
- **Auto-detection**: Automatically populate `has_human_in_the_loop` for
graphs containing HITL blocks
- **Block Detection**: HITL block ID:
`8b2a7b3c-6e9d-4a5f-8c1b-2e3f4a5b6c7d`

## 🎨 Frontend Changes

- **Component**: New `FloatingSafeModeToggle` with dual variants:
  - **White variant**: For library pages, integrates with action buttons
  - **Black variant**: For builders, floating positioned  
- **Integration**: Added toggles to both new/legacy builders and library
pages
- **API Integration**: Direct graph metadata updates via
`usePutV1UpdateGraphVersion`
- **Query Management**: React Query cache invalidation for consistent UI
updates
- **Conditional Display**: Toggle only appears when graph contains HITL
blocks

## 🛠 Technical Implementation

- **Safe Mode ON** (default): HITL blocks require manual review before
proceeding
- **Safe Mode OFF**: HITL blocks execute automatically without
intervention
- **Priority**: Backend API `safe_mode` parameter takes precedence over
graph metadata
- **Detection**: Auto-populates `has_human_in_the_loop` metadata field
- **Positioning**: Proper z-index and responsive positioning for
floating elements

## 🚧 Known Issues (Work in Progress)

### High Priority
- [ ] **Toggle state persistence**: Always shows "ON" regardless of
actual state - query invalidation issue
- [ ] **LibraryAgent metadata**: Missing metadata field causing
TypeScript errors
- [ ] **Tooltip z-index**: Still covered by some UI elements despite
high z-index

### Medium Priority  
- [ ] **HITL detection**: Logic needs improvement for reliable block
detection
- [ ] **Error handling**: Removing HITL blocks from graph causes save
errors
- [ ] **TypeScript**: Fix type mismatches between GraphModel and
LibraryAgent

### Low Priority
- [ ] **Frontend API**: Add `safe_mode` parameter to execution calls
once OpenAPI is regenerated
- [ ] **Performance**: Consider debouncing rapid toggle clicks

## 🧪 Test Plan

- [ ] Verify toggle appears only when graph has HITL blocks
- [ ] Test toggle persistence across page refreshes  
- [ ] Confirm API calls update graph metadata correctly
- [ ] Validate execution behavior respects safe mode setting
- [ ] Check styling consistency across builder and library contexts

## 🔗 Related

- Addresses requirements for graph-level HITL configuration
- Builds on existing FloatingReviewsPanel infrastructure
- Integrates with existing graph metadata system

🤖 Generated with [Claude Code](https://claude.ai/code)
2025-12-02 09:55:55 +00:00
Zamil Majdy
3d08c22dd5 feat(platform): add Human In The Loop block with review workflow (#11380)
## Summary
This PR implements a comprehensive Human In The Loop (HITL) block that
allows agents to pause execution and wait for human
approval/modification of data before continuing.



https://github.com/user-attachments/assets/c027d731-17d3-494c-85ca-97c3bf33329c


## Key Features
- Added WAITING_FOR_REVIEW status to AgentExecutionStatus enum
- Created PendingHumanReview database table for storing review requests
- Implemented HumanInTheLoopBlock that extracts input data and creates
review entries
- Added API endpoints at /api/executions/review for fetching and
reviewing pending data
- Updated execution manager to properly handle waiting status and resume
after approval

## Frontend Components
- PendingReviewCard for individual review handling
- PendingReviewsList for multiple reviews
- FloatingReviewsPanel for graph builder integration
- Integrated review UI into 3 locations: legacy library, new library,
and graph builder

## Technical Implementation
- Added proper type safety throughout with SafeJson handling
- Optimized database queries using count functions instead of full data
fetching
- Fixed imports to be top-level instead of local
- All formatters and linters pass

## Test plan
- [ ] Test Human In The Loop block creation in graph builder
- [ ] Test block execution pauses and creates pending review
- [ ] Test review UI appears in all 3 locations
- [ ] Test data modification and approval workflow
- [ ] Test rejection workflow
- [ ] Test execution resumes after approval

🤖 Generated with [Claude Code](https://claude.ai/code)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Added Human-In-The-Loop review workflows to pause executions for human
validation.
* Users can approve or reject pending tasks, optionally editing
submitted data and adding a message.
* New "Waiting for Review" execution status with UI indicators across
run lists, badges, and activity views.
* Review management UI: pending review cards, list view, and a floating
reviews panel for quick access.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-11-27 12:07:46 +07:00
Swifty
9438817702 fix(platform): Capture Sentry Block Errors Correctly (#11404)
Currently we are capturing block errors via the scope only, this change
captures the error directly.

### Changes 🏗️

- capture the error as well as the scope in the executor manager
- Update the block error message to include additional details
- remove the __str__ function from blockerror as it is no longer needed

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [x] Checked that errors are still captured in dev
2025-11-19 12:21:47 +01:00
Reinier van der Leer
536e2a5ec8 fix(blocks): Make Smart Decision Maker tool pin handling consistent and reliable (#11363)
- Resolves #11345

### Changes 🏗️

- Move tool use routing logic from frontend to backend: routing info was
being baked into graph links by the frontend, inconsistently, causing
issues
- Rework tool use routing to use target node ID instead of target block
name
- Add a bit of magic to `NodeOutputs` component to show tool node title
instead of ID

DX:
- Removed `build` from `.prettierignore` -> re-enable formatting for
builder components

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Use SDM block in a graph; verify it works
  - [x] Use SDM block with agent executor block as tool; verify it works
  - Tests for `parse_execution_output` pass (checked by CI)
2025-11-12 18:55:38 +01:00
Zamil Majdy
6037f80502 feat(backend): Add correctness score to execution activity generation (#11325)
## Summary
Add AI-generated correctness score field to execution activity status
generation to provide quantitative assessment of how well executions
achieved their intended purpose.

New page:
<img width="1000" height="229" alt="image"
src="https://github.com/user-attachments/assets/5cb907cf-5bc7-4b96-8128-8eecccde9960"
/>

Old page:
<img width="1000" alt="image"
src="https://github.com/user-attachments/assets/ece0dfab-1e50-4121-9985-d585f7fcd4d2"
/>


## What Changed
- Added `correctness_score` field (float 0.0-1.0) to
`GraphExecutionStats` model
- **REFACTORED**: Removed duplicate `llm_utils.py` and reused existing
`AIStructuredResponseGeneratorBlock` logic
- Updated activity status generator to use structured responses instead
of plain text
- Modified prompts to include correctness assessment with 5-tier scoring
system:
  - 0.0-0.2: Failure
  - 0.2-0.4: Poor 
  - 0.4-0.6: Partial Success
  - 0.6-0.8: Mostly Successful
  - 0.8-1.0: Success
- Updated manager.py to extract and set both activity_status and
correctness_score
- Fixed tests to work with existing structured response interface

## Technical Details
- **Code Reuse**: Eliminated duplication by using existing
`AIStructuredResponseGeneratorBlock` instead of creating new LLM
utilities
- Added JSON validation with retry logic for malformed responses
- Maintained backward compatibility for existing activity status
functionality
- Score is clamped to valid 0.0-1.0 range and validated
- All type errors resolved and linting passes

## Test Plan
- [x] All existing tests pass with refactored structure
- [x] Structured LLM call functionality tested with success and error
cases
- [x] Activity status generation tested with various execution scenarios
- [x] Integration tests verify both fields are properly set in execution
stats
- [x] No code duplication - reuses existing block logic

🤖 Generated with [Claude Code](https://claude.ai/code)

---------

Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: claude[bot] <41898282+claude[bot]@users.noreply.github.com>
Co-authored-by: Zamil Majdy <majdyz@users.noreply.github.com>
2025-11-06 04:42:13 +00:00
Zamil Majdy
193866232c hotfix(backend): fix rate-limited messages blocking queue by republishing to back (#11326)
## Summary
Fix critical queue blocking issue where rate-limited user messages
prevent other users' executions from being processed, causing the 135
late executions reported in production.

## Root Cause Analysis
When a user exceeds `max_concurrent_graph_executions_per_user` (25), the
executor uses `basic_nack(requeue=True)` which sends the message to the
**FRONT** of the RabbitMQ queue. This creates an infinite blocking loop
where:
1. Rate-limited message goes to front of queue
2. Gets processed, hits rate limit again  
3. Goes back to front of queue
4. Blocks all other users' messages indefinitely

## Solution Implementation

### 🔧 Core Changes
- **New setting**: `requeue_by_republishing` (default: `True`) in
`backend/util/settings.py`
- **Smart `_ack_message`**: Automatically uses republishing when
`requeue=True` and setting enabled
- **Efficient implementation**: Uses existing `self.run_client`
connection instead of creating new ones
- **Integration test**: Real RabbitMQ test validates queue ordering
behavior

### 🔄 Technical Implementation
**Before (blocking):**
```python
basic_nack(delivery_tag, requeue=True)  # Goes to FRONT of queue 
```

**After (non-blocking):**
```python
if requeue and self.config.requeue_by_republishing:
    # First: Republish to BACK of queue
    self.run_client.publish_message(...)
    # Then: Reject without requeue
    basic_nack(delivery_tag, requeue=False)
```

### 📊 Impact
-  **Other users' executions no longer blocked** by rate-limited users
-  **Fair queue processing** - FIFO behavior maintained for all users
-  **Rate limiting still works** - just doesn't block others
-  **Configurable** - can revert to old behavior with
`requeue_by_republishing=False`
-  **Zero performance impact** - uses existing connections

## Test Plan
- **Integration test**: `test_requeue_integration.py` validates real
RabbitMQ queue ordering
- **Scenario testing**: Confirms rate-limited messages go to back of
queue
- **Cross-user validation**: Verifies other users' messages process
correctly
- **Setting test**: Confirms configuration loads with correct defaults

## Deployment Strategy
This is a **hotfix** that can be deployed immediately:
- **Backward compatible**: Old behavior available via config
- **Safe default**: New behavior is safer than current state
- **No breaking changes**: All existing functionality preserved
- **Immediate relief**: Resolves production queue blocking

## Files Modified
- `backend/executor/manager.py`: Enhanced `_ack_message` logic and
`_requeue_message_to_back` method
- `backend/util/settings.py`: Added `requeue_by_republishing`
configuration field
- `test_requeue_integration.py`: Integration test for queue ordering
validation

## Related Issues
Fixes the 135 late executions issue where messages were stuck in QUEUED
state despite available executor capacity (583m/600m utilization).

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-11-05 16:24:07 +00:00
Zamil Majdy
4922f88851 feat(backend/executor): Implement cascading stop for nested graph executions (#11277)
## Summary
Fixes critical issue where child executions spawned by
`AgentExecutorBlock` continue running after parent execution is stopped.
Implements parent-child execution tracking and recursive cascading stop
logic to ensure entire execution trees are terminated together.

## Background
When a parent graph execution containing `AgentExecutorBlock` nodes is
stopped, only the parent was terminated. Child executions continued
running, leading to:
-  Orphaned child executions consuming credits
-  No user control over execution trees  
-  Race conditions where children start after parent stops
-  Resource leaks from abandoned executions

## Core Changes

### 1. Database Schema (`schema.prisma` + migration)
```sql
-- Add nullable parent tracking field
ALTER TABLE "AgentGraphExecution" ADD COLUMN "parentGraphExecutionId" TEXT;

-- Add self-referential foreign key with graceful deletion
ALTER TABLE "AgentGraphExecution" ADD CONSTRAINT "AgentGraphExecution_parentGraphExecutionId_fkey" 
  FOREIGN KEY ("parentGraphExecutionId") REFERENCES "AgentGraphExecution"("id") 
  ON DELETE SET NULL ON UPDATE CASCADE;

-- Add index for efficient child queries
CREATE INDEX "AgentGraphExecution_parentGraphExecutionId_idx" 
  ON "AgentGraphExecution"("parentGraphExecutionId");
```

### 2. Parent ID Propagation (`backend/blocks/agent.py`)
```python
# Extract current graph execution ID and pass as parent to child
execution = add_graph_execution(
    # ... other params
    parent_graph_exec_id=graph_exec_id,  # NEW: Track parent relationship
)
```

### 3. Data Layer (`backend/data/execution.py`)
```python
async def get_child_graph_executions(parent_exec_id: str) -> list[GraphExecution]:
    """Get all child executions of a parent execution."""
    children = await AgentGraphExecution.prisma().find_many(
        where={"parentGraphExecutionId": parent_exec_id, "isDeleted": False}
    )
    return [GraphExecution.from_db(child) for child in children]
```

### 4. Cascading Stop Logic (`backend/executor/utils.py`)
```python
async def stop_graph_execution(
    user_id: str,
    graph_exec_id: str,
    wait_timeout: float = 15.0,
    cascade: bool = True,  # NEW parameter
):
    # 1. Find all child executions
    if cascade:
        children = await _get_child_executions(graph_exec_id)
        
        # 2. Stop all children recursively in parallel
        if children:
            await asyncio.gather(
                *[stop_graph_execution(user_id, child.id, wait_timeout, True) 
                  for child in children],
                return_exceptions=True,  # Don't fail parent if child fails
            )
    
    # 3. Stop the parent execution
    # ... existing stop logic
```

### 5. Race Condition Prevention (`backend/executor/manager.py`)
```python
# Before executing queued child, check if parent was terminated
if parent_graph_exec_id:
    parent_exec = get_db_client().get_graph_execution_meta(parent_graph_exec_id, user_id)
    if parent_exec and parent_exec.status == ExecutionStatus.TERMINATED:
        # Skip execution, mark child as terminated
        get_db_client().update_graph_execution_stats(
            graph_exec_id=graph_exec_id,
            status=ExecutionStatus.TERMINATED,
        )
        return  # Don't start orphaned child
```

## How It Works

### Before (Broken)
```
User stops parent execution
    ↓
Parent terminates ✓
    ↓
Child executions keep running ✗
    ↓
User cannot stop children ✗
```

### After (Fixed)
```
User stops parent execution
    ↓
Query database for all children
    ↓
Recursively stop all children in parallel
    ↓
Wait for children to terminate
    ↓
Stop parent execution
    ↓
All executions in tree stopped ✓
```

### Race Prevention
```
Child in QUEUED status
    ↓
Parent stopped
    ↓
Child picked up by executor
    ↓
Pre-flight check: parent TERMINATED?
    ↓
Yes → Skip execution, mark child TERMINATED
    ↓
Child never runs ✓
```

## Edge Cases Handled
 **Deep nesting** - Recursive cascading handles multi-level trees  
 **Queued children** - Pre-flight check prevents execution  
 **Race conditions** - Child spawned during stop operation  
 **Partial failures** - `return_exceptions=True` continues on error  
 **Multiple children** - Parallel stop via `asyncio.gather()`  
 **No parent** - Backward compatible (nullable field)  
 **Already completed** - Existing status check handles it  

## Performance Impact
- **Stop operation**: O(depth) with parallel execution vs O(1) before
- **Memory**: +36 bytes per execution (one UUID reference)
- **Database**: +1 query per tree level, indexed for efficiency

## API Changes (Backward Compatible)

### `stop_graph_execution()` - New Optional Parameter
```python
# Before
async def stop_graph_execution(user_id: str, graph_exec_id: str, wait_timeout: float = 15.0)

# After  
async def stop_graph_execution(user_id: str, graph_exec_id: str, wait_timeout: float = 15.0, cascade: bool = True)
```
**Default `cascade=True`** means existing callers get the new behavior
automatically.

### `add_graph_execution()` - New Optional Parameter
```python
async def add_graph_execution(..., parent_graph_exec_id: Optional[str] = None)
```

## Security & Safety
-  **User verification** - Users can only stop their own executions
(parent + children)
-  **No cycles** - Self-referential FK prevents infinite loops  
-  **Graceful degradation** - Errors in child stops don't block parent
stop
-  **Rate limits** - Existing execution rate limits still apply

## Testing Checklist

### Database Migration
- [x] Migration runs successfully  
- [x] Prisma client regenerates without errors
- [x] Existing tests pass

### Core Functionality  
- [ ] Manual test: Stop parent with running child → child stops
- [ ] Manual test: Stop parent with queued child → child never starts
- [ ] Unit test: Cascading stop with multiple children
- [ ] Unit test: Deep nesting (3+ levels)
- [ ] Integration test: Race condition prevention

## Breaking Changes
**None** - All changes are backward compatible with existing code.

## Rollback Plan
If issues arise:
1. **Code rollback**: Revert PR, redeploy
2. **Database rollback**: Drop column and constraints (non-destructive)

---

**Note**: This branch contains additional unrelated changes from merging
with `dev`. The core cascading stop feature involves only:
- `schema.prisma` + migration
- `backend/data/execution.py` 
- `backend/executor/utils.py`
- `backend/blocks/agent.py`
- `backend/executor/manager.py`

All other file changes are from dev branch updates and not part of this
feature.

🤖 Generated with [Claude Code](https://claude.ai/code)

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Nested graph executions: parent-child tracking and retrieval of child
executions

* **Improvements**
* Cascading stop: stopping a parent optionally terminates child
executions
  * Parent execution IDs propagated through runs and surfaced in logs
  * Per-user/graph concurrent execution limits enforced

* **Bug Fixes**
* Skip enqueuing children if parent is terminated; robust handling when
parent-status checks fail

* **Tests**
  * Updated tests to cover parent linkage in graph creation
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-10-29 11:11:22 +00:00
Zamil Majdy
5fb142c656 fix(backend/executor): ensure cluster lock release on all execution submission failures (#11281)
## Root Cause
During rolling deployment, execution
`97058338-052a-4528-87f4-98c88416bb7f` got stuck in QUEUED state
because:

1. Pod acquired cluster lock successfully during shutdown  
2. Subsequent setup operations failed (ThreadPoolExecutor shutdown,
resource exhaustion, etc.)
3. **No error handling existed** around the critical section after lock
acquisition
4. Cluster lock remained stuck in Redis for 5 minutes (TTL timeout)
5. Other pods couldn't acquire the lock, leaving execution permanently
queued

## The Fix

### Problem: Critical Section Not Protected
The original code had no error handling for the entire critical section
after successful lock acquisition:
```python
# Original code - no error handling after lock acquired
current_owner = cluster_lock.try_acquire()
if current_owner != self.executor_id:
    return  # didn't get lock
    
# CRITICAL SECTION - any failure here leaves lock stuck
self._execution_locks[graph_exec_id] = cluster_lock  # Could fail: memory
logger.info("Acquired cluster lock...")              # Could fail: logging  
cancel_event = threading.Event()                     # Could fail: resources
future = self.executor.submit(...)                   # Could fail: shutdown
self.active_graph_runs[...] = (future, cancel_event) # Could fail: memory
```

### Solution: Wrap Entire Critical Section  
Protect ALL operations after successful lock acquisition:
```python
# Fixed code - comprehensive error handling
current_owner = cluster_lock.try_acquire()
if current_owner != self.executor_id:
    return  # didn't get lock

# Wrap ENTIRE critical section after successful acquisition
try:
    self._execution_locks[graph_exec_id] = cluster_lock
    logger.info("Acquired cluster lock...")
    cancel_event = threading.Event()
    future = self.executor.submit(...)
    self.active_graph_runs[...] = (future, cancel_event)
except Exception as e:
    # Release cluster lock before requeue
    cluster_lock.release()
    del self._execution_locks[graph_exec_id] 
    _ack_message(reject=True, requeue=True)
    return
```

### Why This Comprehensive Approach Works
- **Complete protection**: Any failure in critical section → lock
released
- **Proper cleanup order**: Lock released → message requeued → another
pod can try
- **Uses existing infrastructure**: Leverages established
`_ack_message()` requeue logic
- **Handles all scenarios**: ThreadPoolExecutor shutdown, resource
exhaustion, memory issues, logging failures

## Protected Failure Scenarios
1. **Memory exhaustion**: `_execution_locks` assignment or
`active_graph_runs` assignment
2. **Resource exhaustion**: `threading.Event()` creation fails
3. **ThreadPoolExecutor shutdown**: `executor.submit()` with "cannot
schedule new futures after shutdown"
4. **Logging system failures**: `logger.info()` calls fail
5. **Any unexpected exceptions**: Network issues, disk problems, etc.

## Validation
-  All existing tests pass  
-  Maintains exact same success path behavior
-  Comprehensive error handling for all failure points
-  Minimal code change with maximum protection

## Impact
- **Eliminates stuck executions** during pod lifecycle events (rolling
deployments, scaling, crashes)
- **Faster recovery**: Immediate requeue vs 5-minute Redis TTL wait
- **Higher reliability**: Handles ANY failure in the critical section
- **Production-ready**: Comprehensive solution for distributed lock
management

This prevents the exact race condition that caused execution
`97058338-052a-4528-87f4-98c88416bb7f` to be stuck for >300 seconds,
plus many other potential failure scenarios.

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-10-29 08:56:24 +00:00
Reinier van der Leer
5e5f45a713 fix(backend): Fix various warnings (#11252)
- Resolves #11251

This fixes all the warnings mentioned in #11251, reducing noise and
making our logs and error alerts more useful :)

### Changes 🏗️

- Remove "Block {block_name} has multiple credential inputs" warning
(not actually an issue)
- Rename `json` attribute of `MainCodeExecutionResult` to `json_data`;
retain serialized name through a field alias
- Replace `Path(regex=...)` with `Path(pattern=...)` in
`get_shared_execution` endpoint parameter config
- Change Uvicorn's WebSocket module to new Sans-I/O implementation for
WS server
- Disable Uvicorn's WebSocket module for REST server
- Remove deprecated `enable_cleanup_closed=True` argument in
`CloudStorageHandler` implementation
- Replace Prisma transaction timeout `int` argument with a `timedelta`
value
- Update Sentry SDK to latest version (v2.42.1)
- Broaden filter for cleanup warnings from indirect dependency `litellm`
- Fix handling of `MissingConfigError` in REST server endpoints

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - Check that the warnings are actually gone
- [x] Deploy to dev environment and run a graph; check for any warnings
  - Test WebSocket server
- [x] Run an agent in the Builder; make sure real-time execution updates
still work
2025-10-28 13:18:45 +00:00
Reinier van der Leer
e06e7ff33f fix(backend): Implement graceful shutdown in AppService to prevent RPC errors (#11240)
We're currently seeing errors in the `DatabaseManager` while it's
shutting down, like:

```
WARNING [DatabaseManager] Termination request: SystemExit; 0 executing cleanup.
INFO [DatabaseManager]  Disconnecting Database...
INFO [PID-1|THREAD-29|DatabaseManager|Prisma-82fb1994-4b87-40c1-8869-fbd97bd33fc8] Releasing connection started...
INFO [PID-1|THREAD-29|DatabaseManager|Prisma-82fb1994-4b87-40c1-8869-fbd97bd33fc8] Releasing connection completed successfully.
INFO [DatabaseManager] Terminated.
ERROR POST /create_or_add_to_user_notification_batch failed: Failed to create or add to notification batch for user {user_id} and type AGENT_RUN: NoneType: None
```

This indicates two issues:
- The service doesn't wait for pending RPC calls to finish before
terminating
- We're using `logger.exception` outside an error handling context,
causing the confusing and not much useful `NoneType: None` to be printed
instead of error info

### Changes 🏗️

- Implement graceful shutdown in `AppService` so in-flight RPC calls can
finish
  - Add tests for graceful shutdown
  - Prevent `AppService` accepting new requests during shutdown
- Rework `AppService` lifecycle management; add support for async
`lifespan`
- Fix `AppService` endpoint error logging
- Improve logging in `AppProcess` and `AppService`

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
- Deploy to Dev cluster, then `kubectl rollout restart` the different
services a few times
    - [x] -> `DatabaseManager` doesn't break on re-deployment
    - [x] -> `Scheduler` doesn't break on re-deployment
    - [x] -> `NotificationManager` doesn't break on re-deployment
2025-10-25 14:47:19 +00:00
Zamil Majdy
0bb2b87c32 fix(backend): resolve UserBalance migration issues and credit spending bug (#11192)
## Summary
Fix critical UserBalance migration and spending issues affecting users
with credits from transaction history but no UserBalance records.

## Root Issues Fixed

### Issue 1: UserBalance Migration Complexity
- **Problem**: Complex data migration with timestamp logic issues and
potential race conditions
- **Solution**: Simplified to idempotent table creation only,
application handles auto-population

### Issue 2: Credit Spending Bug  
- **Problem**: Users with $10.0 from transaction history couldn't spend
$0.16
- **Root Cause**: `_add_transaction` and `_enable_transaction` only
checked UserBalance table, returning 0 balance for users without records
- **Solution**: Enhanced both methods with transaction history fallback
logic

### Issue 3: Exception Handling Inconsistency
- **Problem**: Raw SQL unique violations raised different exception
types than Prisma ORM
- **Solution**: Convert raw SQL unique violations to
`UniqueViolationError` at source

## Changes Made

### Migration Cleanup
- **Idempotent operations**: Use `CREATE TABLE IF NOT EXISTS`, `CREATE
INDEX IF NOT EXISTS`
- **Inline foreign key**: Define constraint within `CREATE TABLE`
instead of separate `ALTER TABLE`
- **Removed data migration**: Application creates UserBalance records
on-demand
- **Safe to re-run**: No errors if table/index/constraint already exists

### Credit Logic Fixes
- **Enhanced `_add_transaction`**: Added transaction history fallback in
`user_balance_lock` CTE
- **Enhanced `_enable_transaction`**: Added same fallback logic for
payment fulfillment
- **Exception normalization**: Convert raw SQL unique violations to
`UniqueViolationError`
- **Simplified `onboarding_reward`**: Use standardized
`UniqueViolationError` catching

### SQL Fallback Pattern
```sql
COALESCE(
    (SELECT balance FROM UserBalance WHERE userId = ? FOR UPDATE),
    -- Fallback: compute from transaction history if UserBalance doesn't exist
    (SELECT COALESCE(ct.runningBalance, 0) 
     FROM CreditTransaction ct 
     WHERE ct.userId = ? AND ct.isActive = true AND ct.runningBalance IS NOT NULL 
     ORDER BY ct.createdAt DESC LIMIT 1),
    0
) as balance
```

## Impact

### Before
-  Users with transaction history but no UserBalance couldn't spend
credits
-  Migration had complex timestamp logic with potential bugs  
-  Raw SQL and Prisma exceptions handled differently
-  Error: "Insufficient balance of $10.0, where this will cost $0.16"

### After  
-  Seamless spending for all users regardless of UserBalance record
existence
-  Simple, idempotent migration that's safe to re-run
-  Consistent exception handling across all credit operations
-  Automatic UserBalance record creation during first transaction
-  Backward compatible - existing users unaffected

## Business Value
- **Eliminates user frustration**: Users can spend their credits
immediately
- **Smooth migration path**: From old User.balance to new UserBalance
table
- **Better reliability**: Atomic operations with proper error handling
- **Maintainable code**: Consistent patterns across credit operations

## Test Plan
- [ ] Manual testing with users who have transaction history but no
UserBalance records
- [ ] Verify migration can be run multiple times safely
- [ ] Test spending credits works for all user scenarios
- [ ] Verify payment fulfillment (`_enable_transaction`) works correctly
- [ ] Add comprehensive test coverage for this scenario

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-10-17 19:46:13 +07:00
Zamil Majdy
dfdd632161 fix(backend/util): handle nested Pydantic models in SafeJson (#11188)
## Summary

Fixes a critical serialization bug introduced in PR #11187 where
`SafeJson` failed to serialize dictionaries containing Pydantic models,
causing 500 Internal Server Errors in the executor service.

## Problem

The error manifested as:
```
CRITICAL: Operation Approaching Failure Threshold: Service communication: '_call_method_async'
Current attempt: 50/50
Error: HTTPServerError: HTTP 500: Server error '500 Internal Server Error' 
for url 'http://autogpt-database-manager.prod-agpt.svc.cluster.local:8005/create_graph_execution'
```

Root cause in `create_graph_execution`
(backend/data/execution.py:656-657):
```python
"credentialInputs": SafeJson(credential_inputs) if credential_inputs else Json({})
```

Where `credential_inputs: Mapping[str, CredentialsMetaInput]` is a dict
containing Pydantic models.

After PR #11187's refactor, `_sanitize_value()` only converted top-level
BaseModel instances to dicts, but didn't handle BaseModel instances
nested inside dicts/lists/tuples. This caused Prisma's JSON serializer
to fail with:
```
TypeError: Type <class 'backend.data.model.CredentialsMetaInput'> not serializable
```

## Solution

Added BaseModel handling to `_sanitize_value()` to recursively convert
Pydantic models to dicts before sanitizing:

```python
elif isinstance(value, BaseModel):
    # Convert Pydantic models to dict and recursively sanitize
    return _sanitize_value(value.model_dump(exclude_none=True))
```

This ensures all nested Pydantic models are properly serialized
regardless of nesting depth.

## Changes

- **backend/util/json.py**: Added BaseModel check to `_sanitize_value()`
function
- **backend/util/test_json.py**: Added 6 comprehensive tests covering:
  - Dict containing Pydantic models
  - Deeply nested Pydantic models  
  - Lists of Pydantic models in dicts
  - The exact CredentialsMetaInput scenario
  - Complex mixed structures
  - Models with control characters

## Testing

 All new tests pass  
 Verified fix resolves the production 500 error  
 Code formatted with `poetry run format`

## Related

- Fixes issues introduced in PR #11187
- Related to executor service 500 errors in production

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Bentlybro <Github@bentlybro.com>
Co-authored-by: Claude <noreply@anthropic.com>
2025-10-17 09:27:09 +00:00
Nicholas Tindle
b230b1b5cf feat(backend): Add Sentry user and tag tracking to node execution (#11170)
Integrates Sentry SDK to set user and contextual tags during node
execution for improved error tracking and user count analytics. Ensures
Sentry context is properly set and restored, and exceptions are captured
with relevant context before scope restoration.

<!-- Clearly explain the need for these changes: -->

### Changes 🏗️
Adds sentry tracking to block failures
<!-- Concisely describe all of the changes made in this pull request:
-->

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [x] Test to make sure the userid and block details show up in Sentry
  - [x] make sure other errors aren't contaminated 


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

- New Features
- Added conditional support for feature flags when configured, enabling
targeted rollouts and experiments without impacting unconfigured
environments.

- Chores
- Enhanced error monitoring with richer contextual data during node
execution to improve stability and diagnostics.
- Updated metrics initialization to dynamically include feature flag
integrations when available, without altering behavior for unconfigured
setups.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-10-15 14:33:08 +00:00
Zamil Majdy
934cb3a9c7 feat(backend): Make execution limit per user per graph and reduce to 25 (#11169)
## Summary
- Changed max_concurrent_graph_executions_per_user from 50 to 25
concurrent executions
- Updated the limit to be per user per graph instead of globally per
user
- Users can now run different graphs concurrently without being limited
by executions of other graphs
- Enhanced database query to filter by both user_id and graph_id

## Changes Made
- **Settings**: Reduced default limit from 50 to 25 and updated
description to clarify per-graph scope
- **Database Layer**: Modified `get_graph_executions_count` to accept
optional `graph_id` parameter
- **Executor Manager**: Updated rate limiting logic to check
per-user-per-graph instead of per-user globally
- **Logging**: Enhanced warning messages to include graph_id context

## Test plan
- [ ] Verify that users can run up to 25 concurrent executions of the
same graph
- [ ] Verify that users can run different graphs concurrently without
interference
- [ ] Test rate limiting behavior when limit is exceeded for a specific
graph
- [ ] Confirm logging shows correct graph_id context in rate limit
messages

## Impact
This change improves the user experience by allowing concurrent
execution of different graphs while still preventing resource exhaustion
from running too many instances of the same graph.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-authored-by: Claude <noreply@anthropic.com>
2025-10-15 00:02:55 +00:00
Zamil Majdy
05a72f4185 feat(backend): implement user rate limiting for concurrent graph executions (#11128)
## Summary
Add configurable rate limiting to prevent users from exceeding the
maximum number of concurrent graph executions, defaulting to 50 per
user.

## Changes Made

### Configuration (`backend/util/settings.py`)
- Add `max_concurrent_graph_executions_per_user` setting (default: 50,
range: 1-1000)
- Configurable via environment variables or settings file

### Database Query Function (`backend/data/execution.py`) 
- Add `get_graph_executions_count()` function for efficient count
queries
- Supports filtering by user_id, statuses, and time ranges
- Used to check current RUNNING/QUEUED executions per user

### Database Manager Integration (`backend/executor/database.py`)
- Expose `get_graph_executions_count` through DatabaseManager RPC
interface
- Follows existing patterns for database operations
- Enables proper service-to-service communication

### Rate Limiting Logic (`backend/executor/manager.py`)
- Inline rate limit check in `_handle_run_message()` before cluster lock
- Use existing `db_client` pattern for consistency
- Reject and requeue executions when limit exceeded
- Graceful error handling - proceed if rate limit check fails
- Enhanced logging with user_id and current/max execution counts

## Technical Implementation
- **Database approach**: Query actual execution statuses for accuracy
- **RPC pattern**: Use DatabaseManager client following existing
codebase patterns
- **Fail-safe design**: Proceed with execution if rate limit check fails
- **Requeue on limit**: Rejected executions are requeued for later
processing
- **Early rejection**: Check rate limit before expensive cluster lock
operations

## Rate Limiting Flow
1. Parse incoming graph execution request
2. Query database via RPC for user's current RUNNING/QUEUED execution
count
3. Compare against configured limit (default: 50)
4. If limit exceeded: reject and requeue message
5. If within limit: proceed with normal execution flow

## Configuration Example
```env
MAX_CONCURRENT_GRAPH_EXECUTIONS_PER_USER=25  # Reduce to 25 for stricter limits
```

## Test plan
- [x] Basic functionality tested - settings load correctly, database
function works
- [x] ExecutionManager imports and initializes without errors
- [x] Database manager exposes the new function through RPC
- [x] Code follows existing patterns and conventions
- [ ] Integration testing with actual rate limiting scenarios
- [ ] Performance testing to ensure minimal impact on execution pipeline

🤖 Generated with [Claude Code](https://claude.ai/code)

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-10-11 08:02:34 +07:00
Bently
4856bd1f3a fix(backend): prevent sub-agent execution visibility across users (#11132)
Fixes a issue where sub-agent executions triggered by one user were
visible in the original agent author's execution library.
 ## Solution

Fixed the user_id attribution in
`autogpt_platform/backend/backend/executor/manager.py` by ensuring that
sub-agent executions always use the actual executor's user_id rather
than the agent author's user_id stored in node defaults.

### Changes
- Added user_id override in `execute_node()` function when preparing
AgentExecutorBlock input (line 194)
- Ensures sub-agent executions are correctly attributed to the user
running them, not the agent author
- Maintains proper privacy isolation between users in marketplace agent
scenarios

### Security Impact
- **Before**: When User B downloaded and ran a marketplace agent
containing sub-agents owned by User A, the sub-agent executions appeared
in User A's library
- **After**: Sub-agent executions now only appear in the library of the
user who actually ran them
- Prevents unauthorized access to execution data and user privacy
violation

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  <!-- Test plan: -->
  - [x] Create an agent with sub-agents as User A
  - [x] Publish agent to marketplace
  - [x] Run the agent as User B
- [x] Verify User A cannot see User B's sub-agent executions in their
library
  - [x] Verify User B can see their own sub-agent executions
  - [x] Verify primary agent executions remain correctly filtered
2025-10-09 11:17:26 +00:00
Zamil Majdy
59c27fe248 feat(backend): implement comprehensive rate-limited Discord alerting system (#11106)
## Summary
Implement comprehensive Discord alerting system with intelligent rate
limiting to prevent spam and provide proper visibility into system
failures across retry mechanisms and execution errors.

## Key Features

### 🚨 Rate-Limited Discord Alerting Infrastructure
- **Reusable rate-limited alerts**: `send_rate_limited_discord_alert()`
function for any Discord alerts
- **5-minute rate limiting**: Prevents spam for identical error
signatures (function+error+context)
- **Thread-safe**: Proper locking for concurrent alert attempts
- **Configurable channels**: Support custom Discord channels or default
to PLATFORM
- **Graceful failure handling**: Alert failures don't break main
application flow

### 🔄 Enhanced Retry Alert System
- **Unified threshold alerting**: Both general retries and
infrastructure retries alert at EXCESSIVE_RETRY_THRESHOLD (50 attempts)
- **Critical retry alerts**: Early warning when operations approach
failure threshold
- **Infrastructure monitoring**: Dedicated alerts for database, Redis,
RabbitMQ connection issues
- **Rate limited**: All retry alerts use rate limiting to prevent
overwhelming Discord channels

### 📊 Unknown Execution Error Alerts
- **Automatic error detection**: Alert for unexpected graph execution
failures
- **Rich context**: Include user ID, graph ID, execution ID, error type
and message
- **Filtered alerts**: Skip known errors (InsufficientBalanceError,
ModerationError)
- **Proper error tracking**: Ensure execution_stats.error is set for all
error types

## Technical Implementation

### Rate Limiting Strategy
```python
# Create unique signatures based on function+error+context
error_signature = f"{context}:{func_name}:{type(exception).__name__}:{str(exception)[:100]}"
```
- **5-minute windows**: ALERT_RATE_LIMIT_SECONDS = 300 prevents
duplicate alerts
- **Memory efficient**: Only store last alert timestamp per unique error
signature
- **Context awareness**: Same error in different contexts can send
separate alerts

### Alerting Hierarchy
1. **50 attempts**: Critical alert warning about approaching failure
(EXCESSIVE_RETRY_THRESHOLD)
2. **100 attempts**: Final infrastructure failure (conn_retry max_retry)
3. **Unknown execution errors**: Immediate rate-limited alerts for
unexpected failures

## Files Modified

### Core Implementation
- `backend/executor/manager.py`: Unknown execution error alerts with
rate limiting
- `backend/util/retry.py`: Comprehensive rate-limited alerting
infrastructure
- `backend/util/retry_test.py`: Full test coverage for rate limiting
functionality (14 tests)

### Code Quality Improvements
- **Inlined alert messages**: Eliminated unnecessary temporary variables
- **Simplified logic**: Removed excessive comments and redundant alerts
- **Consistent patterns**: All alert functions follow same clean code
style
- **DRY principle**: Reusable rate-limited alert system for future
monitoring needs

## Benefits

### 🛡️ Prevents Alert Spam
- **Rate limiting**: No more overwhelming Discord channels with
duplicate alerts
- **Intelligent deduplication**: Same errors rate limited while
different errors get through
- **Thread safety**: Concurrent operations handled correctly

### 🔍 Better System Visibility  
- **Unknown errors**: Issues that need investigation are properly
surfaced
- **Infrastructure monitoring**: Early warning for
database/Redis/RabbitMQ issues
- **Rich context**: All necessary debugging information included in
alerts

### 🧹 Maintainable Codebase
- **Reusable infrastructure**: `send_rate_limited_discord_alert()` for
future monitoring
- **Clean, consistent code**: Inlined messages, simplified logic, proper
abstractions
- **Comprehensive testing**: Rate limiting edge cases and real-world
scenarios covered

## Validation Results
-  All 14 retry tests pass including comprehensive rate limiting
coverage
-  Manager execution tests pass validating integration with execution
flow
-  Thread safety validated with concurrent alert attempt tests
-  Real-world scenarios tested including the specific spend_credits
spam issue that motivated this work
-  Code formatting, linting, and type checking all pass

## Before/After Comparison

### Before
- No rate limiting → Discord spam for repeated errors
- Unknown execution errors not monitored → Issues went unnoticed  
- Inconsistent alerting thresholds → Confusing monitoring
- Verbose code with temporary variables → Harder to maintain

### After  
-  Rate-limited intelligent alerting prevents spam
-  Unknown execution errors properly monitored with context
-  Unified 50-attempt threshold for consistent monitoring
-  Clean, maintainable code with reusable infrastructure

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-10-09 08:22:15 +07:00
Zamil Majdy
4e1557e498 fix(backend): Add dynamic input pin support for Smart Decision Maker Block (#11082)
## Summary

- Centralize dynamic field delimiters and helpers in
backend/data/dynamic_fields.py.
- Refactor SmartDecisionMaker: build function signatures with
dynamic-field mapping and re-map tool outputs back to original dynamic
names.
- Deterministic retry loop with retry-only feedback to avoid polluting
final conversation history.
- Update executor/utils.py and data/graph.py to use centralized
utilities.
- Update and extend tests: dynamic-field E2E flow, mapping verification,
output yielding, and retry validation; switch mocked llm_call to
AsyncMock; align tool-name expectations.
- Add a single-tool fallback in schema lookup to support mocked
scenarios.

## Validation

- Full backend test suite: 1125 passed, 88 skipped, 53 warnings (local).
- Backend lint/format pass.

## Scope

- Minimal and localized to SmartDecisionMaker and dynamic-field
utilities; unrelated pyright warnings remain unchanged.

## Risks/Mitigations

- Behavior is backward-compatible; dynamic-field constants are
centralized and reused.
- Output re-mapping only affects SmartDecisionMaker tool outputs and
matches existing link naming conventions.

## Checklist

- [x] Formatted and linted
- [x] All updated tests pass locally
- [x] No secrets introduced

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-10-04 14:23:13 +00:00
Zamil Majdy
f314fbf14f fix(backend): resolve two critical long-running agent execution failures (#11011)
## Summary

Fix two production issues causing agent execution failures that occurred
this morning:

1. **AsyncRedisLock Release Error** (ExecutionID:
08b2c251-ee27-45de-b88d-1792823ca3ee)
   - Error: "Cannot release a lock that's no longer owned" 
- Root cause: Race condition where lock expires during long database
operations
   - Location: backend/executor/manager.py synchronized context manager

2. **Tool Call Parameter Validation** (ExecutionID:
766fd9a0-5f22-4a77-96e8-14c9d02f3292)
- Issue: LLM used typo'd parameter 'maximum_keyword_difficulty' instead
of 'max_keyword_difficulty'
- SmartDecisionMakerBlock silently accepted typo, setting correct
parameter to null
- Result: Downstream blocks received null values causing execution
failures

## Changes Made

### AsyncRedisLock Error Handling
- Add try-catch blocks around AsyncRedisLock.release() calls in
ExecutionManager and OAuth refresh
- Prevent crashes when locks expire between ownership check and release
- Log warnings instead of crashing execution

### Tool Call Parameter Validation  
- **Reject unknown parameters**: Raise ValueError for typo'd parameter
names with detailed error messages
- **Allow optional parameters**: Only validate missing REQUIRED
parameters
- **Safe parameter access**: Use .get() to handle optional parameters
with defaults
- **Clean code**: Extract parameters object once to eliminate
duplication

## Technical Implementation

**Lock Release Protection:**
```python
if await lock.locked() and await lock.owned():
    try:
        await lock.release()
    except Exception as e:
        logger.warning(f"Failed to release lock for key {key}: {e}")
```

**Parameter Validation Logic:**
```python
# Get parameters schema from tool definition  
if tool_def and "function" in tool_def and "parameters" in tool_def["function"]:
    parameters = tool_def["function"]["parameters"]
    expected_args = parameters.get("properties", {})
    required_params = set(parameters.get("required", []))

# Detect parameter typos and missing required params
unexpected_args = provided_args - expected_args_set  
missing_required_args = required_params - provided_args

if unexpected_args or missing_required_args:
    raise ValueError(error_msg)  # Detailed error explaining the problem
```

## Testing

- [x] All existing tests pass
- [x] Lock error handling prevents execution crashes  
- [x] Tool validation catches typos while allowing optional parameters
- [x] Maintains backward compatibility with existing workflows

## Impact

-  No more "Cannot release a lock" crashes during long database
operations
-  Tool calls with typo'd parameters are rejected with clear error
messages
-  Optional parameters work correctly with default values  
-  Production stability improved with graceful error handling

## Files Modified

- `backend/executor/manager.py` - AsyncRedisLock error handling in
synchronized context
- `backend/integrations/creds_manager.py` - OAuth refresh lock error
handling
- `backend/blocks/smart_decision_maker.py` - Tool call parameter
validation with typo detection

Fixes two critical production failures that were causing 2/5 agent runs
to fail this morning.

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-09-29 15:34:20 +00:00
Zamil Majdy
a97ff641c3 feat(backend): optimize FastAPI endpoints performance and alert system (#11000)
## Summary

Comprehensive performance optimization fixing event loop binding issues
and addressing all PR feedback.

### Original Performance Issues Fixed

**Event Loop Binding Problems:**
- JWT authentication dependencies were synchronous, causing thread pool
bottlenecks under high concurrency
- FastAPI's default thread pool (40 threads) was insufficient for
high-load scenarios
- Backend services lacked proper event loop configuration

**Security & Performance Improvements:**
- Security middleware converted from BaseHTTPMiddleware to pure ASGI for
better performance
- Added blocks endpoint to cacheable paths for improved response times
- Cross-platform uvloop detection with Windows compatibility

### Key Changes Made

#### 1. JWT Authentication Async Conversion
- **Files**: `autogpt_libs/auth/dependencies.py`,
`autogpt_libs/auth/jwt_utils.py`
- **Change**: Convert all JWT functions to async (`requires_user`,
`requires_admin_user`, `get_user_id`, `get_jwt_payload`)
- **Impact**: Eliminates thread pool blocking, improves concurrency
handling
- **Tests**: All 25+ authentication tests updated to async patterns

#### 2. FastAPI Thread Pool Optimization  
- **File**: `backend/server/rest_api.py:82-93`
- **Change**: Configure thread pool size via
`config.fastapi_thread_pool_size`
- **Default**: Increased from 40 to higher limit for sync operations
- **Impact**: Better handling of remaining sync dependencies

#### 3. Performance-Optimized Security Middleware
- **File**: `backend/server/middleware/security.py`
- **Change**: Pure ASGI implementation replacing BaseHTTPMiddleware
- **Headers**: HTTP spec compliant capitalization
(X-Content-Type-Options, X-Frame-Options, etc.)
- **Caching**: Added `/api/blocks` and `/api/v1/blocks` to cacheable
paths
- **Impact**: Reduced middleware overhead, improved header compliance

#### 4. Cross-Platform Event Loop Configuration
- **File**: `backend/server/rest_api.py:311-312`
- **Change**: Platform-aware uvloop detection: `'uvloop' if
platform.system() != 'Windows' else 'auto'`
- **Impact**: Windows compatibility while maintaining Unix performance
benefits
- **Verified**: 'auto' is valid uvicorn default parameter

#### 5. Enhanced Caching Infrastructure
- **File**: `autogpt_libs/utils/cache.py:118-132`
- **Change**: Per-event-loop asyncio.Lock instances prevent cross-loop
deadlocks
- **Impact**: Thread-safe caching across multiple event loops

#### 6. Database Query Limits & Performance
- **Files**: Multiple data layer files
- **Change**: Added configurable limits to prevent unbounded queries
- **Constants**: `MAX_GRAPH_VERSIONS_FETCH=50`,
`MAX_USER_API_KEYS_FETCH=500`, etc.
- **Impact**: Consistent performance regardless of data volume

#### 7. OpenAPI Documentation Improvements
- **File**: `backend/server/routers/v1.py:68-85`
- **Change**: Added proper response model and schema for blocks endpoint
- **Impact**: Better API documentation and type safety

#### 8. Error Handling & Retry Logic Fixes
- **File**: `backend/util/retry.py:63`
- **Change**: Accurate retry threshold comments referencing
EXCESSIVE_RETRY_THRESHOLD
- **Impact**: Clear documentation for debugging retry scenarios

### ntindle Feedback Addressed

 **HTTP Header Capitalization**: All headers now use proper HTTP spec
capitalization
 **Windows uvloop Compatibility**: Clean platform detection with inline
conditional
 **OpenAPI Response Model**: Blocks endpoint properly documented in
schema
 **Retry Comment Accuracy**: References actual threshold constants
instead of hardcoded numbers
 **Code Cleanliness**: Inline conditionals preferred over verbose if
statements

### Performance Testing Results

**Before Optimization:**
- High latency under concurrent load
- Thread pool exhaustion at ~40 concurrent requests
- Event loop binding issues causing timeouts

**After Optimization:**
- Improved concurrency handling with async JWT pipeline
- Configurable thread pool scaling
- Cross-platform event loop optimization
- Reduced middleware overhead

### Backward Compatibility

 **All existing functionality preserved**  
 **No breaking API changes**  
 **Enhanced test coverage with async patterns**  
 **Windows and Unix compatibility maintained**

### Files Modified

**Core Authentication & Performance:**
- `autogpt_libs/auth/dependencies.py` - Async JWT dependencies
- `autogpt_libs/auth/jwt_utils.py` - Async JWT utilities  
- `backend/server/rest_api.py` - Thread pool config + uvloop detection
- `backend/server/middleware/security.py` - ASGI security middleware

**Database & Limits:**
- `backend/data/includes.py` - Performance constants and configurable
includes
- `backend/data/api_key.py`, `backend/data/credit.py`,
`backend/data/graph.py`, `backend/data/integrations.py` - Query limits

**Caching & Infrastructure:**
- `autogpt_libs/utils/cache.py` - Per-event-loop lock safety
- `backend/server/routers/v1.py` - OpenAPI improvements
- `backend/util/retry.py` - Comment accuracy

**Testing:**
- `autogpt_libs/auth/dependencies_test.py` - 25+ async test conversions
- `autogpt_libs/auth/jwt_utils_test.py` - Async JWT test patterns

Ready for review and production deployment. 🚀

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-09-29 05:32:48 +00:00
Zamil Majdy
3abea1ed96 fix(backend): prevent duplicate graph executions across multiple executor pods (#11008)
## Problem
Multiple executor pods could simultaneously execute the same graph,
leading to:
- Duplicate executions and wasted resources
- Inconsistent execution states and results
- Race conditions in graph execution management
- Inefficient resource utilization in cluster environments

## Solution
Implement distributed locking using ClusterLock to ensure only one
executor pod can process a specific graph execution at a time.

## Key Changes

### Core Fix: Distributed Execution Coordination
- **ClusterLock implementation**: Redis-based distributed locking
prevents duplicate executions
- **Atomic lock acquisition**: Only one executor can hold the lock for a
specific graph execution
- **Automatic lock expiry**: Prevents deadlocks if executor pods crash
or become unresponsive
- **Graceful degradation**: System continues operating even if Redis
becomes temporarily unavailable

### Technical Implementation
- Move ClusterLock to `backend/executor/` alongside ExecutionManager
(its primary consumer)
- Comprehensive integration tests (27 test scenarios) ensure reliability
under all conditions
- Redis client compatibility for different deployment configurations
- Rate-limited lock refresh to minimize Redis load

### Reliability Improvements
- **Context manager support**: Automatic lock cleanup prevents resource
leaks
- **Ownership verification**: Locks can only be refreshed/released by
the owner
- **Concurrency testing**: Thread-safe operations verified under high
contention
- **Error handling**: Robust failure scenarios including network
partitions

## Test Coverage
-  Concurrent executor coordination (prevents duplicate executions)
-  Lock expiry and refresh mechanisms (prevents deadlocks)
-  Redis connection failures (graceful degradation)
-  Thread safety under high load (production scenarios)
-  Long-running executions with periodic refresh

## Impact
- **No more duplicate executions**: Eliminates wasted compute resources
and inconsistent results
- **Improved reliability**: Robust distributed coordination across
executor pods
- **Better resource utilization**: Only one pod processes each execution
- **Scalable architecture**: Supports multiple executor pods without
conflicts

## Validation
- All integration tests pass 
- Existing ExecutionManager functionality preserved   
- No breaking changes to APIs 
- Production-ready distributed locking 

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-09-27 11:42:40 +00:00
Zamil Majdy
27fccdbf31 fix(backend/executor): Make graph execution status transitions atomic and enforce state machine (#10863)
## Summary
- Fixed race condition issues in `update_graph_execution_stats` function
- Implemented atomic status transitions using database-level constraints
- Added state machine enforcement to prevent invalid status transitions
- Eliminated code duplication and improved error handling

## Problem
The `update_graph_execution_stats` function had race condition
vulnerabilities where concurrent status updates could cause invalid
transitions like RUNNING → QUEUED. The function was not durable and
could result in executions moving backwards in their lifecycle, causing
confusion and potential system inconsistencies.

## Root Cause Analysis
1. **Race Conditions**: The function used a broad OR clause that allowed
updates from multiple source statuses without validating the specific
transition
2. **No Atomicity**: No atomic check to ensure the status hadn't changed
between read and write operations
3. **Missing State Machine**: No enforcement of valid state transitions
according to execution lifecycle rules

## Solution Implementation

### 1. Atomic Status Transitions
- Use database-level atomicity by including the current allowed source
statuses in the WHERE clause during updates
- This ensures only valid transitions can occur at the database level

### 2. State Machine Enforcement
Define valid transitions as a module constant
`VALID_STATUS_TRANSITIONS`:
- `INCOMPLETE` → `QUEUED`, `RUNNING`, `FAILED`, `TERMINATED`
- `QUEUED` → `RUNNING`, `FAILED`, `TERMINATED`  
- `RUNNING` → `COMPLETED`, `TERMINATED`, `FAILED`
- `TERMINATED` → `RUNNING` (for resuming halted execution)
- `COMPLETED` and `FAILED` are terminal states with no allowed
transitions

### 3. Improved Error Handling
- Early validation with clear error messages for invalid parameters
- Graceful handling when transitions fail - return current state instead
of None
- Proper logging of invalid transition attempts

### 4. Code Quality Improvements
- Eliminated code duplication in fetch logic
- Added proper type hints and casting
- Made status transitions constant for better maintainability

## Benefits
 **Prevents Invalid Regressions**: No more RUNNING → QUEUED transitions
 **Atomic Operations**: Database-level consistency guarantees  
 **Clear Error Messages**: Better debugging and monitoring  
 **Maintainable Code**: Clean logic flow without duplication  
 **Race Condition Safe**: Handles concurrent updates gracefully  

## Test Plan
- [x] Function imports and basic structure validation
- [x] Code formatting and linting checks pass
- [x] Type checking passes for modified files
- [x] Pre-commit hooks validation

## Technical Details
The key insight is using the database query itself to enforce valid
transitions by filtering on allowed source statuses in the WHERE clause.
This makes the operation truly atomic and eliminates the race condition
window that existed in the previous implementation.

🤖 Generated with [Claude Code](https://claude.ai/code)

---------

Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Nicholas Tindle <nicholas.tindle@agpt.co>
2025-09-14 23:31:02 +00:00
Reinier van der Leer
e16e69ca55 feat(library, executor): Make "Run Again" work with credentials (#10821)
- Resolves [OPEN-2549: Make "Run again" work with credentials in
`AgentRunDetailsView`](https://linear.app/autogpt/issue/OPEN-2549/make-run-again-work-with-credentials-in-agentrundetailsview)
- Resolves #10237

### Changes 🏗️

- feat(frontend/library): Make "Run Again" button work for runs with
credentials
- feat(backend/executor): Store passed-in credentials on
`GraphExecution`

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - Go to `/library/agents/[id]` for an agent with credentials inputs
  - Run the agent manually
    - [x] -> runs successfully
- [x] -> "Run again" shows among the action buttons on the newly created
run
  - Click "Run again"
    - [x] -> runs successfully
2025-09-02 18:34:56 +00:00
Zamil Majdy
c0172c93aa fix(backend/executor): prevent infinite requeueing of malformed messages (#10746)
### Changes 🏗️

This PR fixes an infinite loop issue in the execution manager where
malformed or unparseable messages would be continuously requeued,
causing high CPU usage and preventing the system from processing
legitimate messages.

**Key changes:**
- Modified `_ack_message()` function to accept explicit `requeue`
parameter
- Set `requeue=False` for malformed/unparseable messages that cannot be
fixed by retrying
- Set `requeue=False` for duplicate execution attempts (graph already
running)
- Kept `requeue=True` for legitimate failures that may succeed on retry
(e.g., temporary resource constraints, network issues)

**Technical details:**
The previous implementation always set `requeue=True` when rejecting
messages with `basic_nack()`. This caused problematic messages to be
immediately re-delivered to the consumer, creating an infinite loop for:
1. Messages with invalid JSON that cannot be parsed
2. Messages for executions that are already running (duplicates)

These scenarios will never succeed regardless of how many times they're
retried, so they should be rejected without requeueing to prevent
resource exhaustion.

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Verified malformed messages are rejected without requeue
- [x] Confirmed duplicate execution messages are rejected without
requeue
- [x] Ensured legitimate failures (shutdown, pool full) still requeue
properly
- [x] Tested that normal message processing continues to work correctly
2025-08-26 18:34:58 +07:00
Nicholas Tindle
890bb3b8b4 feat(backend): implement low balance and insufficient funds notifications (#10656)
Co-authored-by: SwiftyOS <craigswift13@gmail.com>
Co-authored-by: Claude <claude@users.noreply.github.com>
Co-authored-by: majdyz <zamil@agpt.co>
2025-08-25 11:17:40 -05:00
Nicholas Tindle
2bb8e91040 feat(backend): Add user timezone support to backend (#10707)
Co-authored-by: Swifty <craigswift13@gmail.com>
resolve issue #10692 where scheduled time and actual run
2025-08-25 11:00:07 -05:00
Zamil Majdy
bf92e7dbc8 hotfix(backend/executor): Fix RabbitMQ channel retry logic in executor (#10661)
## Summary
**HOTFIX for production** - Fixes executor being stuck in infinite retry
loop when RabbitMQ channels are closed
- Ensures proper reconnection by checking channel state before
attempting to consume messages
- Prevents accumulation of thousands of retry attempts (was seeing 7000+
retries)

## Changes
The executor was stuck repeatedly failing with "Channel is closed"
errors because the `continuous_retry` decorator was attempting to reuse
closed channels instead of creating new ones.

Added channel state checks (`is_ready`) before connecting in both:
- `_consume_execution_run()` 
- `_consume_execution_cancel()`

When a channel is not ready (closed), the code now:
1. Disconnects the client (safe operation, checks if already
disconnected)
2. Establishes a fresh connection with new channel
3. Proceeds with message consumption

## Test plan
- [x] Verified the disconnect() method is safe to call on already
disconnected clients
- [x] Confirmed is_ready property checks both connection and channel
state
- [ ] Deploy to environment and verify executors reconnect properly
after channel failures
- [ ] Monitor logs to ensure no more "Channel is closed" retry loops

## Related Issues
Fixes critical production issue where:
- Executor pods show repeated "Channel is closed" errors
- 757 messages stuck in `graph_execution_queue`
- 102,286 messages in `failed_notifications` queue
- RabbitMQ logs show connections being closed due to missed heartbeats

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-authored-by: Claude <noreply@anthropic.com>
2025-08-16 17:06:06 -05:00
Bently
28d85ad61c feat(backend/AM): Integrate AutoMod content moderation (#10539)
Copy of [feat(backend/AM): Integrate AutoMod content moderation - By
Bentlybro - PR
#10490](https://github.com/Significant-Gravitas/AutoGPT/pull/10490) cos
i messed it up 🤦

Adds AutoMod input and output moderation to the execution flow.
Introduces a new AutoMod manager and models, updates settings for
moderation configuration, and modifies execution result handling to
support moderation-cleared data. Moderation failures now clear sensitive
data and mark executions as failed.

<img width="921" height="816" alt="image"
src="https://github.com/user-attachments/assets/65c0fee8-d652-42bc-9553-ff507bc067c5"
/>


### Changes 🏗️

I have made some small changes to
``autogpt_platform\backend\backend\executor\manager.py`` to send the
needed into to the AutoMod system which collects the data, combines and
makes the api call to AM and based on its reply lets it run or not!

I also had to make small changes to
``autogpt_platform\backend\backend\data\execution.py`` to add checks
that allow me to clear the content from the blocks if it was flagged

I am working on finalizing the AM repo then that will be public

To note: we will want to set this up behind launch darkly first for
testing on the team before we roll it out any more

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
- [x] Setup and run the platform with ``automod_enabled`` set to False
and it works normally
- [x] Setup and run the platform with ``automod_enabled`` set to True,
set the AM URL and API Key and test it runs safe blocks normally
- [x] Test AM with content that would trigger it to flag and watch it
stop and clear all the blocks outputs

Message @Bentlybro for the URL and an API key to AM for local testing!

## Changes made to Settings.py 

I have added a few new options to the settings.py for AutoMod Config!

```
    # AutoMod configuration
    automod_enabled: bool = Field(
        default=False,
        description="Whether AutoMod content moderation is enabled",
    )
    automod_api_url: str = Field(
        default="",
        description="AutoMod API base URL - Make sure it ends in /api",
    )
    automod_timeout: int = Field(
        default=30,
        description="Timeout in seconds for AutoMod API requests",
    )
    automod_retry_attempts: int = Field(
        default=3,
        description="Number of retry attempts for AutoMod API requests",
    )
    automod_retry_delay: float = Field(
        default=1.0,
        description="Delay between retries for AutoMod API requests in seconds",
    )
    automod_fail_open: bool = Field(
        default=False,
        description="If True, allow execution to continue if AutoMod fails",
    )
    automod_moderate_inputs: bool = Field(
        default=True,
        description="Whether to moderate block inputs",
    )
    automod_moderate_outputs: bool = Field(
        default=True,
        description="Whether to moderate block outputs",
    )
```
and
```
automod_api_key: str = Field(default="", description="AutoMod API key")
```

---------

Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
2025-08-11 09:39:28 +00:00
Zamil Majdy
de7b6b503f fix(backend): Add timeout on stopping message consumer on manager 2025-08-08 18:04:10 +07:00
Zamil Majdy
40601f1616 fix(backend): Fix executor running RabbitMQ operations on closed/closing connection (#10578)
The RabbitMQ connection is unreliable (fixing it is a separate issue)
and sometimes get restarted. The scope of this PR is to avoid the
operation break due to executing on a stale, broken connection.

### Changes 🏗️

Fix executor running RabbitMQ operations on closed/closing connection

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
- [x] Manually kill rabbitmq and see how it goes while executing an
agent
2025-08-07 23:53:52 +00:00
Zamil Majdy
e2af2f454d fix(backend): migrate notification service to fully async to resolve RabbitMQ connection issues (#10564)
## Summary
- **Remove background_executor from NotificationManager** to eliminate
event loop conflicts that were causing RabbitMQ "Connection reset by
peer" errors
- **Convert all notification processing to fully async** using async
database clients
- **Optimize Settings instantiation** to prevent file descriptor leaks
by moving to module level
- **Fix scheduler event loop management** to use single shared loop
instead of thread-cached approach

## Changes 🏗️

### 1. Remove ProcessPoolExecutor from NotificationManager
- Eliminated `background_executor` entirely from notification service
- Converted `queue_weekly_summary()` and `process_existing_batches()`
from sync to async
- Fixed the root cause: `asyncio.run()` was creating new event loops,
conflicting with existing RabbitMQ connections

### 2. Full Async Conversion
- Updated `_consume_queue` to only accept async functions:
`Callable[[str], Awaitable[bool]]`
- Replaced sync `DatabaseManagerClient` with
`DatabaseManagerAsyncClient` throughout notification service
- Added missing async methods to `DatabaseManagerAsyncClient`:
  - `get_active_user_ids_in_timerange`
  - `get_user_email_by_id` 
  - `get_user_email_verification`
  - `get_user_notification_preference`
  - `create_or_add_to_user_notification_batch`
  - `empty_user_notification_batch`
  - `get_all_batches_by_type`

### 3. Settings Optimization
- Moved `Settings()` instantiation to module level in:
  - `backend/util/metrics.py`
  - `backend/blocks/google_calendar.py`
  - `backend/blocks/gmail.py`
  - `backend/blocks/slant3d.py`
  - `backend/blocks/user.py`
- Prevents multiple file descriptor reads per process, reducing resource
usage

### 4. Scheduler Event Loop Fix
- **Simplified event loop initialization** in `Scheduler.run_service()`
to create single shared loop
- **Removed complex thread caching and locking** that could create
multiple connections
- **Fixed daemon thread lifecycle** by using non-daemon thread with
proper cleanup
- **Event loop runs in dedicated background thread** with graceful
shutdown handling

## Root Cause Analysis

The RabbitMQ "Connection reset by peer" errors were caused by:
1. **Event Loop Conflicts**: `asyncio.run()` in `queue_weekly_summary`
created new event loops, disrupting existing RabbitMQ heartbeat
connections
2. **Thread Resource Waste**: Thread-cached event loops in scheduler
created unnecessary connections
3. **File Descriptor Leaks**: Multiple Settings instantiations per
process increased resource pressure

## Why This Fixes the Issue

1. **Eliminates Event Loop Creation**: By using `asyncio.create_task()`
instead of `asyncio.run()`, we reuse the existing event loop
2. **Maintains Heartbeat Connections**: Async RabbitMQ connections
remain stable without event loop disruption
3. **Reduces Resource Pressure**: Settings optimization and simplified
scheduler reduce file descriptor usage
4. **Ensures Connection Stability**: Single shared event loop prevents
connection multiplexing issues

## Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
- [x] Verified RabbitMQ connection stability by checking heartbeat logs
- [x] Confirmed async conversion maintains all notification
functionality
  - [x] Tested scheduler job execution with simplified event loop
  - [x] Validated Settings optimization reduces file descriptor usage
  - [x] Ensured notification processing works end-to-end

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-08-07 08:25:09 +00:00
Zamil Majdy
c9360555b2 fix(backend): Persist any non interruption error on node execution as output (#10562)
Some non-node execution errors and system failures (like credentials not
found, or database failure) are not logged and exposed to the user. This
will make the node execution look like it's failed without an error
message:

<img width="804" height="1141" alt="image"
src="https://github.com/user-attachments/assets/e81314a0-b9af-4a95-bba7-8df576911e96"
/>

### Changes 🏗️

Make all non-interruption errors yielded as node execution error output.

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [x] CI
2025-08-07 06:28:24 +07:00
Zamil Majdy
3fe88b6106 refactor(backend): Refactor log client and resource cleanup (#10558)
## Summary
- Created centralized service client helpers with thread caching in
`util/clients.py`
- Refactored service client management to eliminate health checks and
improve performance
- Enhanced logging in process cleanup to include error details
- Improved retry mechanisms and resource cleanup across the platform
- Updated multiple services to use new centralized client patterns

## Key Changes
### New Centralized Client Factory (`util/clients.py`)
- Added thread-cached factory functions for all major service clients:
  - Database managers (sync and async)
  - Scheduler client
  - Notification manager
  - Execution event bus (Redis-based)
  - RabbitMQ execution queue (sync and async)
  - Integration credentials store
- All clients use `@thread_cached` decorator for performance
optimization

### Service Client Improvements
- **Removed health checks**: Eliminated unnecessary health check calls
from `get_service_client()` to reduce startup overhead
- **Enhanced retry support**: Database manager clients now use request
retry by default
- **Better error handling**: Improved error propagation and logging

### Enhanced Logging and Cleanup
- **Process termination logs**: Added error details to termination
messages in `util/process.py`
- **Retry mechanism updates**: Improved retry logic with better error
handling in `util/retry.py`
- **Resource cleanup**: Better resource management across executors and
monitoring services

### Updated Service Usage
- Refactored 21+ files to use new centralized client patterns
- Updated all executor, monitoring, and notification services
- Maintained backward compatibility while improving performance

## Files Changed
- **Created**: `backend/util/clients.py` - Centralized client factory
with thread caching
- **Modified**: 21 files across blocks, executor, monitoring, and
utility modules
- **Key areas**: Service client initialization, resource cleanup, retry
mechanisms

## Test Plan
- [x] Verify all existing tests pass
- [x] Validate service startup and client initialization  
- [x] Test resource cleanup on process termination
- [x] Confirm retry mechanisms work correctly
- [x] Validate thread caching performance improvements
- [x] Ensure no breaking changes to existing functionality

## Breaking Changes
None - all changes maintain backward compatibility.

## Additional Notes
This refactoring centralizes client management patterns that were
scattered across the codebase, making them more consistent and
performant through thread caching. The removal of health checks reduces
startup time while maintaining reliability through improved retry
mechanisms.

🤖 Generated with [Claude Code](https://claude.ai/code)
2025-08-06 13:53:01 +07:00