From 033f58c075637fd22ba46d3f8dc0184393e5b1c4 Mon Sep 17 00:00:00 2001
From: Zamil Majdy
Date: Wed, 21 Jan 2026 09:51:26 -0600
Subject: [PATCH 1/2] fix(backend): Make Redis event bus gracefully handle
 connection failures (#11817)

## Summary

Adds graceful error handling to AsyncRedisEventBus and RedisEventBus so that
connection failures are logged with a full traceback instead of propagating to
the caller. This allows DatabaseManager to operate without Redis connectivity.

## Problem

DatabaseManager was failing with "Authentication required" when trying to
publish notifications via AsyncRedisNotificationEventBus. The service has no
Redis credentials configured, causing `increment_onboarding_runs` to fail.

## Root Cause

When `increment_onboarding_runs` publishes a notification:
1. It calls `AsyncRedisNotificationEventBus().publish()`
2. The bus attempts to connect to Redis via `get_redis_async()`
3. The connection fails due to missing credentials
4. The exception propagates, failing the entire DB operation

The previous fix (#11775) made the cache module lazy, but it didn't address the
notification bus, which also requires Redis.

## Solution

Wrap Redis operations in try-except blocks:
- `publish_event`: Logs the exception with a traceback, continues without publishing
- `listen_events`: Logs the exception with a traceback, returns an empty generator
- `wait_for_event`: Returns `None` on connection failure

Using `logger.exception()` instead of `logger.warning()` ensures full stack
traces are captured for debugging while keeping operations non-breaking.

This allows services to operate without Redis when the event bus is only used
for non-critical notifications.

## Changes

- Modified `backend/data/event_bus.py`:
  - Added graceful error handling to `RedisEventBus` and `AsyncRedisEventBus`
  - All Redis operations now catch exceptions and log with `logger.exception()`
- Added `backend/data/event_bus_test.py`:
  - Tests verify graceful degradation when Redis is unavailable
  - Tests verify normal operation when Redis is available

## Test Plan

- [x] New tests verify graceful degradation when Redis is unavailable
- [x] Existing notification tests still pass
- [x] DatabaseManager can increment onboarding runs without Redis

## Related Issues

Fixes https://significant-gravitas.sentry.io/issues/7205834440/ (AUTOGPT-SERVER-76D)
---
 .../backend/backend/data/event_bus.py         | 30 ++++++++--
 .../backend/backend/data/event_bus_test.py    | 56 +++++++++++++++++++
 .../backend/backend/data/execution.py         |  2 +
 .../backend/backend/data/graph.py             |  2 +
 .../backend/scripts/generate_block_docs.py    | 10 ++--
 5 files changed, 90 insertions(+), 10 deletions(-)
 create mode 100644 autogpt_platform/backend/backend/data/event_bus_test.py

diff --git a/autogpt_platform/backend/backend/data/event_bus.py b/autogpt_platform/backend/backend/data/event_bus.py
index 48f9df87fa..d8a1c5b729 100644
--- a/autogpt_platform/backend/backend/data/event_bus.py
+++ b/autogpt_platform/backend/backend/data/event_bus.py
@@ -103,8 +103,18 @@ class RedisEventBus(BaseRedisEventBus[M], ABC):
         return redis.get_redis()
 
     def publish_event(self, event: M, channel_key: str):
-        message, full_channel_name = self._serialize_message(event, channel_key)
-        self.connection.publish(full_channel_name, message)
+        """
+        Publish an event to Redis. Gracefully handles connection failures
+        by logging the error instead of raising exceptions.
+ """ + try: + message, full_channel_name = self._serialize_message(event, channel_key) + self.connection.publish(full_channel_name, message) + except Exception: + logger.exception( + f"Failed to publish event to Redis channel {channel_key}. " + "Event bus operation will continue without Redis connectivity." + ) def listen_events(self, channel_key: str) -> Generator[M, None, None]: pubsub, full_channel_name = self._get_pubsub_channel( @@ -128,9 +138,19 @@ class AsyncRedisEventBus(BaseRedisEventBus[M], ABC): return await redis.get_redis_async() async def publish_event(self, event: M, channel_key: str): - message, full_channel_name = self._serialize_message(event, channel_key) - connection = await self.connection - await connection.publish(full_channel_name, message) + """ + Publish an event to Redis. Gracefully handles connection failures + by logging the error instead of raising exceptions. + """ + try: + message, full_channel_name = self._serialize_message(event, channel_key) + connection = await self.connection + await connection.publish(full_channel_name, message) + except Exception: + logger.exception( + f"Failed to publish event to Redis channel {channel_key}. " + "Event bus operation will continue without Redis connectivity." + ) async def listen_events(self, channel_key: str) -> AsyncGenerator[M, None]: pubsub, full_channel_name = self._get_pubsub_channel( diff --git a/autogpt_platform/backend/backend/data/event_bus_test.py b/autogpt_platform/backend/backend/data/event_bus_test.py new file mode 100644 index 0000000000..93ecc0ba2a --- /dev/null +++ b/autogpt_platform/backend/backend/data/event_bus_test.py @@ -0,0 +1,56 @@ +""" +Tests for event_bus graceful degradation when Redis is unavailable. +""" + +from unittest.mock import AsyncMock, patch + +import pytest +from pydantic import BaseModel + +from backend.data.event_bus import AsyncRedisEventBus + + +class TestEvent(BaseModel): + """Test event model.""" + + message: str + + +class TestNotificationBus(AsyncRedisEventBus[TestEvent]): + """Test implementation of AsyncRedisEventBus.""" + + Model = TestEvent + + @property + def event_bus_name(self) -> str: + return "test_event_bus" + + +@pytest.mark.asyncio +async def test_publish_event_handles_connection_failure_gracefully(): + """Test that publish_event logs exception instead of raising when Redis is unavailable.""" + bus = TestNotificationBus() + event = TestEvent(message="test message") + + # Mock get_redis_async to raise connection error + with patch( + "backend.data.event_bus.redis.get_redis_async", + side_effect=ConnectionError("Authentication required."), + ): + # Should not raise exception + await bus.publish_event(event, "test_channel") + + +@pytest.mark.asyncio +async def test_publish_event_works_with_redis_available(): + """Test that publish_event works normally when Redis is available.""" + bus = TestNotificationBus() + event = TestEvent(message="test message") + + # Mock successful Redis connection + mock_redis = AsyncMock() + mock_redis.publish = AsyncMock() + + with patch("backend.data.event_bus.redis.get_redis_async", return_value=mock_redis): + await bus.publish_event(event, "test_channel") + mock_redis.publish.assert_called_once() diff --git a/autogpt_platform/backend/backend/data/execution.py b/autogpt_platform/backend/backend/data/execution.py index cee0b82137..3c1fd25c51 100644 --- a/autogpt_platform/backend/backend/data/execution.py +++ b/autogpt_platform/backend/backend/data/execution.py @@ -81,6 +81,8 @@ class ExecutionContext(BaseModel): This includes 
information needed by blocks, sub-graphs, and execution management.
     """
 
+    model_config = {"extra": "ignore"}
+
     human_in_the_loop_safe_mode: bool = True
     sensitive_action_safe_mode: bool = False
     user_timezone: str = "UTC"
diff --git a/autogpt_platform/backend/backend/data/graph.py b/autogpt_platform/backend/backend/data/graph.py
index d70bdbdc61..c1f38f81d5 100644
--- a/autogpt_platform/backend/backend/data/graph.py
+++ b/autogpt_platform/backend/backend/data/graph.py
@@ -64,6 +64,8 @@ logger = logging.getLogger(__name__)
 class GraphSettings(BaseModel):
     # Use Annotated with BeforeValidator to coerce None to default values.
     # This handles cases where the database has null values for these fields.
+    model_config = {"extra": "ignore"}
+
     human_in_the_loop_safe_mode: Annotated[
         bool, BeforeValidator(lambda v: v if v is not None else True)
     ] = True
diff --git a/autogpt_platform/backend/scripts/generate_block_docs.py b/autogpt_platform/backend/scripts/generate_block_docs.py
index 1a49cc8805..4fa85e9bf0 100644
--- a/autogpt_platform/backend/scripts/generate_block_docs.py
+++ b/autogpt_platform/backend/scripts/generate_block_docs.py
@@ -366,12 +366,12 @@ def generate_block_markdown(
     lines.append("")
 
     # What it is (full description)
-    lines.append(f"### What it is")
+    lines.append("### What it is")
     lines.append(block.description or "No description available.")
     lines.append("")
 
     # How it works (manual section)
-    lines.append(f"### How it works")
+    lines.append("### How it works")
     how_it_works = manual_content.get(
         "how_it_works", "_Add technical explanation here._"
     )
@@ -383,7 +383,7 @@
     # Inputs table (auto-generated)
     visible_inputs = [f for f in block.inputs if not f.hidden]
     if visible_inputs:
-        lines.append(f"### Inputs")
+        lines.append("### Inputs")
         lines.append("")
         lines.append("| Input | Description | Type | Required |")
         lines.append("|-------|-------------|------|----------|")
@@ -400,7 +400,7 @@
     # Outputs table (auto-generated)
     visible_outputs = [f for f in block.outputs if not f.hidden]
     if visible_outputs:
-        lines.append(f"### Outputs")
+        lines.append("### Outputs")
         lines.append("")
         lines.append("| Output | Description | Type |")
         lines.append("|--------|-------------|------|")
@@ -414,7 +414,7 @@
     lines.append("")
 
     # Possible use case (manual section)
-    lines.append(f"### Possible use case")
+    lines.append("### Possible use case")
     use_case = manual_content.get("use_case", "_Add practical use case examples here._")
     lines.append("")
     lines.append(use_case)

From 5d0cd88d9824edea08aead02f95802a4244a95a6 Mon Sep 17 00:00:00 2001
From: Zamil Majdy
Date: Wed, 21 Jan 2026 13:11:58 -0500
Subject: [PATCH 2/2] fix(backend): Use unqualified vector type for pgvector
 queries (#11818)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## Summary

- Remove explicit schema qualification (`{schema}.vector` and
  `OPERATOR({schema}.<=>)`) from pgvector queries in `embeddings.py` and
  `hybrid_search.py`
- Use the unqualified `::vector` type cast and `<=>` operator, which work
  because pgvector is on the `search_path` in all environments

## Problem

The previous approach tried to explicitly qualify the vector type with schema
names, but this failed because:
- **CI environment**: pgvector is in the `public` schema → `platform.vector` doesn't exist
- **Dev (Supabase)**: pgvector is in the `platform` schema → `public.vector` doesn't exist

## Solution

Use the unqualified `::vector` cast and `<=>` operator. PostgreSQL resolves
these via `search_path`, which includes the schema where pgvector is installed
in all environments.
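As a rough illustration (not the PR's actual verification script), queries of
this shape behave identically whichever schema pgvector lives in, because the
unqualified type and operator are resolved through `search_path`; the
three-dimensional vector literals below are hypothetical:

```sql
-- Hypothetical sanity check: the unqualified ::vector cast and the
-- cosine-distance operator <=> both resolve via search_path.
SELECT '[1,0,0]'::vector <=> '[0,1,0]'::vector AS cosine_distance;  -- 1

-- The same pattern in ORDER BY, mirroring the queries in hybrid_search.py
-- (the query vector is a placeholder):
--   SELECT "id" FROM "UnifiedContentEmbedding"
--   ORDER BY embedding <=> '[...]'::vector LIMIT 200;
```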
Tested on both local and dev environments with a test script that verified:
- ✅ Unqualified `::vector` type cast
- ✅ Unqualified `<=>` operator in ORDER BY
- ✅ Unqualified `<=>` in SELECT (similarity calculation)
- ✅ Combined query patterns matching actual usage

## Test plan

- [ ] CI tests pass
- [ ] Marketplace approval works on dev after deployment

Fixes: AUTOGPT-SERVER-763, AUTOGPT-SERVER-764, AUTOGPT-SERVER-76B
---
 .../backend/api/features/store/embeddings.py  | 17 ++++++++---------
 .../api/features/store/hybrid_search.py       |  8 ++++----
 autogpt_platform/backend/backend/data/db.py   | 14 +++++++-------
 .../migration.sql                             |  9 +++++----
 .../frontend/src/tests/marketplace.spec.ts    |  1 +
 5 files changed, 25 insertions(+), 24 deletions(-)

diff --git a/autogpt_platform/backend/backend/api/features/store/embeddings.py b/autogpt_platform/backend/backend/api/features/store/embeddings.py
index 68b0d6aea0..efe896f665 100644
--- a/autogpt_platform/backend/backend/api/features/store/embeddings.py
+++ b/autogpt_platform/backend/backend/api/features/store/embeddings.py
@@ -154,16 +154,16 @@ async def store_content_embedding(
 
     # Upsert the embedding
     # WHERE clause in DO UPDATE prevents PostgreSQL 15 bug with NULLS NOT DISTINCT
-    # Use {pgvector_schema}.vector for explicit pgvector type qualification
+    # Use unqualified ::vector - pgvector is in search_path on all environments
     await execute_raw_with_schema(
         """
         INSERT INTO {schema_prefix}"UnifiedContentEmbedding" (
             "id", "contentType", "contentId", "userId", "embedding",
             "searchableText", "metadata", "createdAt", "updatedAt"
         )
-        VALUES (gen_random_uuid()::text, $1::{schema_prefix}"ContentType", $2, $3, $4::{pgvector_schema}.vector, $5, $6::jsonb, NOW(), NOW())
+        VALUES (gen_random_uuid()::text, $1::{schema_prefix}"ContentType", $2, $3, $4::vector, $5, $6::jsonb, NOW(), NOW())
         ON CONFLICT ("contentType", "contentId", "userId")
         DO UPDATE SET
-            "embedding" = $4::{pgvector_schema}.vector,
+            "embedding" = $4::vector,
             "searchableText" = $5,
             "metadata" = $6::jsonb,
             "updatedAt" = NOW()
@@ -879,8 +879,7 @@ async def semantic_search(
     min_similarity_idx = len(params) + 1
     params.append(min_similarity)
 
-    # Use regular string (not f-string) for template to preserve {schema_prefix} and {schema} placeholders
-    # Use OPERATOR({pgvector_schema}.<=>) for explicit operator schema qualification
+    # Use unqualified ::vector and <=> operator - pgvector is in search_path on all environments
     sql = (
         """
         SELECT
@@ -888,9 +887,9 @@
             "contentType" as content_type,
             "searchableText" as searchable_text,
             metadata,
-            1 - (embedding OPERATOR({pgvector_schema}.<=>) '"""
+            1 - (embedding <=> '"""
         + embedding_str
-        + """'::{pgvector_schema}.vector) as similarity
+        + """'::vector) as similarity
         FROM {schema_prefix}"UnifiedContentEmbedding"
         WHERE "contentType" IN ("""
         + content_type_placeholders
         + """
         + user_filter
         + """
-        AND 1 - (embedding OPERATOR({pgvector_schema}.<=>) '"""
+        AND 1 - (embedding <=> '"""
         + embedding_str
-        + """'::{pgvector_schema}.vector) >= $"""
+        + """'::vector) >= $"""
         + str(min_similarity_idx)
         + """
         ORDER BY similarity DESC
diff --git a/autogpt_platform/backend/backend/api/features/store/hybrid_search.py b/autogpt_platform/backend/backend/api/features/store/hybrid_search.py
index 2f2beb80ff..95ec3f4ff9 100644
--- 
a/autogpt_platform/backend/backend/api/features/store/hybrid_search.py +++ b/autogpt_platform/backend/backend/api/features/store/hybrid_search.py @@ -295,7 +295,7 @@ async def unified_hybrid_search( FROM {{schema_prefix}}"UnifiedContentEmbedding" uce WHERE uce."contentType" = ANY({content_types_param}::{{schema_prefix}}"ContentType"[]) {user_filter} - ORDER BY uce.embedding OPERATOR({{pgvector_schema}}.<=>) {embedding_param}::{{pgvector_schema}}.vector + ORDER BY uce.embedding <=> {embedding_param}::vector LIMIT 200 ) ), @@ -307,7 +307,7 @@ async def unified_hybrid_search( uce.metadata, uce."updatedAt" as updated_at, -- Semantic score: cosine similarity (1 - distance) - COALESCE(1 - (uce.embedding OPERATOR({{pgvector_schema}}.<=>) {embedding_param}::{{pgvector_schema}}.vector), 0) as semantic_score, + COALESCE(1 - (uce.embedding <=> {embedding_param}::vector), 0) as semantic_score, -- Lexical score: ts_rank_cd COALESCE(ts_rank_cd(uce.search, plainto_tsquery('english', {query_param})), 0) as lexical_raw, -- Category match from metadata @@ -583,7 +583,7 @@ async def hybrid_search( WHERE uce."contentType" = 'STORE_AGENT'::{{schema_prefix}}"ContentType" AND uce."userId" IS NULL AND {where_clause} - ORDER BY uce.embedding OPERATOR({{pgvector_schema}}.<=>) {embedding_param}::{{pgvector_schema}}.vector + ORDER BY uce.embedding <=> {embedding_param}::vector LIMIT 200 ) uce ), @@ -605,7 +605,7 @@ async def hybrid_search( -- Searchable text for BM25 reranking COALESCE(sa.agent_name, '') || ' ' || COALESCE(sa.sub_heading, '') || ' ' || COALESCE(sa.description, '') as searchable_text, -- Semantic score - COALESCE(1 - (uce.embedding OPERATOR({{pgvector_schema}}.<=>) {embedding_param}::{{pgvector_schema}}.vector), 0) as semantic_score, + COALESCE(1 - (uce.embedding <=> {embedding_param}::vector), 0) as semantic_score, -- Lexical score (raw, will normalize) COALESCE(ts_rank_cd(uce.search, plainto_tsquery('english', {query_param})), 0) as lexical_raw, -- Category match diff --git a/autogpt_platform/backend/backend/data/db.py b/autogpt_platform/backend/backend/data/db.py index bdba5eca02..48f420aec7 100644 --- a/autogpt_platform/backend/backend/data/db.py +++ b/autogpt_platform/backend/backend/data/db.py @@ -121,10 +121,14 @@ async def _raw_with_schema( Supports placeholders: - {schema_prefix}: Table/type prefix (e.g., "platform".) - {schema}: Raw schema name for application tables (e.g., platform) - - {pgvector_schema}: Schema where pgvector is installed (defaults to "public") + + Note on pgvector types: + Use unqualified ::vector and <=> operator in queries. PostgreSQL resolves + these via search_path, which includes the schema where pgvector is installed + on all environments (local, CI, dev). Args: - query_template: SQL query with {schema_prefix}, {schema}, and/or {pgvector_schema} placeholders + query_template: SQL query with {schema_prefix} and/or {schema} placeholders *args: Query parameters execute: If False, executes SELECT query. If True, executes INSERT/UPDATE/DELETE. client: Optional Prisma client for transactions (only used when execute=True). @@ -135,20 +139,16 @@ async def _raw_with_schema( Example with vector type: await execute_raw_with_schema( - 'INSERT INTO {schema_prefix}"Embedding" (vec) VALUES ($1::{pgvector_schema}.vector)', + 'INSERT INTO {schema_prefix}"Embedding" (vec) VALUES ($1::vector)', embedding_data ) """ schema = get_database_schema() schema_prefix = f'"{schema}".' 
if schema != "public" else "" - # pgvector extension is typically installed in "public" schema - # On Supabase it may be in "extensions" but "public" is the common default - pgvector_schema = "public" formatted_query = query_template.format( schema_prefix=schema_prefix, schema=schema, - pgvector_schema=pgvector_schema, ) import prisma as prisma_module diff --git a/autogpt_platform/backend/migrations/20260109181714_add_docs_embedding/migration.sql b/autogpt_platform/backend/migrations/20260109181714_add_docs_embedding/migration.sql index 0897e1865a..855fe36933 100644 --- a/autogpt_platform/backend/migrations/20260109181714_add_docs_embedding/migration.sql +++ b/autogpt_platform/backend/migrations/20260109181714_add_docs_embedding/migration.sql @@ -1,9 +1,10 @@ -- CreateExtension -- Supabase: pgvector must be enabled via Dashboard → Database → Extensions first --- Create in public schema so vector type is available across all schemas +-- Creates extension in current schema (determined by search_path from DATABASE_URL ?schema= param) +-- This ensures vector type is in the same schema as tables, making ::vector work without explicit qualification DO $$ BEGIN - CREATE EXTENSION IF NOT EXISTS "vector" WITH SCHEMA "public"; + CREATE EXTENSION IF NOT EXISTS "vector"; EXCEPTION WHEN OTHERS THEN RAISE NOTICE 'vector extension not available or already exists, skipping'; END $$; @@ -19,7 +20,7 @@ CREATE TABLE "UnifiedContentEmbedding" ( "contentType" "ContentType" NOT NULL, "contentId" TEXT NOT NULL, "userId" TEXT, - "embedding" public.vector(1536) NOT NULL, + "embedding" vector(1536) NOT NULL, "searchableText" TEXT NOT NULL, "metadata" JSONB NOT NULL DEFAULT '{}', @@ -45,4 +46,4 @@ CREATE UNIQUE INDEX "UnifiedContentEmbedding_contentType_contentId_userId_key" O -- Uses cosine distance operator (<=>), which matches the query in hybrid_search.py -- Note: Drop first in case Prisma created a btree index (Prisma doesn't support HNSW) DROP INDEX IF EXISTS "UnifiedContentEmbedding_embedding_idx"; -CREATE INDEX "UnifiedContentEmbedding_embedding_idx" ON "UnifiedContentEmbedding" USING hnsw ("embedding" public.vector_cosine_ops); +CREATE INDEX "UnifiedContentEmbedding_embedding_idx" ON "UnifiedContentEmbedding" USING hnsw ("embedding" vector_cosine_ops); diff --git a/autogpt_platform/frontend/src/tests/marketplace.spec.ts b/autogpt_platform/frontend/src/tests/marketplace.spec.ts index 6f06f7addb..774713dc82 100644 --- a/autogpt_platform/frontend/src/tests/marketplace.spec.ts +++ b/autogpt_platform/frontend/src/tests/marketplace.spec.ts @@ -4,6 +4,7 @@ import { LoginPage } from "./pages/login.page"; import { MarketplacePage } from "./pages/marketplace.page"; import { hasMinCount, hasUrl, isVisible, matchesUrl } from "./utils/assertion"; +// Marketplace tests for store agent search functionality test.describe("Marketplace – Basic Functionality", () => { test("User can access marketplace page when logged out", async ({ page }) => { const marketplacePage = new MarketplacePage(page);