Compare commits


1 Commit

Author SHA1 Message Date
Zamil Majdy
12690ad0a9 fix(backend): Use explicit {schema}.vector for pgvector types
The unqualified ::vector type fails in ORDER BY context with PgBouncer.
Use explicit schema qualification ({schema}.vector) which resolves to
platform.vector where the pgvector extension is installed.

Changes:
- Add {schema} placeholder to db.py for raw schema name
- Use {schema}.vector instead of unqualified ::vector in embeddings.py
- Use {{schema}}.vector instead of unqualified ::vector in hybrid_search.py

Tested on dev: explicit platform.vector works in all contexts.

Fixes: AUTOGPT-SERVER-76B
2026-01-21 10:19:08 -05:00
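In short: PgBouncer's transaction pooling does not guarantee the session's search_path, so an unqualified ::vector cast can fail to resolve the pgvector type, and ORDER BY is where it surfaced. A minimal SQL sketch of the two casts (the platform schema comes from the commit message; the column and parameter are illustrative):

    -- May fail under PgBouncer: resolving the bare type depends on search_path
    ORDER BY embedding <=> $1::vector

    -- Resolves regardless of search_path: the type is schema-qualified
    ORDER BY embedding <=> $1::platform.vector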
10 changed files with 90 additions and 291 deletions

View File

@@ -45,9 +45,6 @@ class StreamChatRequest(BaseModel):
     message: str
     is_user_message: bool = True
     context: dict[str, str] | None = None # {url: str, content: str}
-    tags: list[str] | None = (
-        None # Custom tags for Langfuse tracing (e.g., experiment names)
-    )


 class CreateSessionResponse(BaseModel):
@@ -232,7 +229,6 @@ async def stream_chat_post(
             user_id=user_id,
             session=session, # Pass pre-fetched session to avoid double-fetch
             context=request.context,
-            tags=request.tags,
         ):
             yield chunk.to_sse()
         # AI SDK protocol termination

View File

@@ -63,7 +63,7 @@ def _is_langfuse_configured() -> bool:
 )
-async def _build_system_prompt(user_id: str | None) -> tuple[str, Any, Any]:
+async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]:
     """Build the full system prompt including business understanding if available.

     Args:
@@ -71,7 +71,7 @@ async def _build_system_prompt(user_id: str | None) -> tuple[str, Any, Any]:
         If "default" and this is the user's first session, will use "onboarding" instead.

     Returns:
-        Tuple of (compiled prompt string, understanding object, Langfuse prompt object for tracing)
+        Tuple of (compiled prompt string, Langfuse prompt object for tracing)
     """
     # cache_ttl_seconds=0 disables SDK caching to always get the latest prompt
@@ -91,7 +91,7 @@ async def _build_system_prompt(user_id: str | None) -> tuple[str, Any, Any]:
         context = "This is the first time you are meeting the user. Greet them and introduce them to the platform"
     compiled = prompt.compile(users_information=context)
-    return compiled, understanding, prompt
+    return compiled, understanding


 async def _generate_session_title(message: str) -> str | None:
@@ -156,7 +156,6 @@ async def stream_chat_completion(
     retry_count: int = 0,
     session: ChatSession | None = None,
     context: dict[str, str] | None = None, # {url: str, content: str}
-    tags: list[str] | None = None, # Custom tags for Langfuse tracing
 ) -> AsyncGenerator[StreamBaseResponse, None]:
     """Main entry point for streaming chat completions with database handling.
@@ -266,7 +265,7 @@ async def stream_chat_completion(
         asyncio.create_task(_update_title())

     # Build system prompt with business understanding
-    system_prompt, understanding, langfuse_prompt = await _build_system_prompt(user_id)
+    system_prompt, understanding = await _build_system_prompt(user_id)

     # Create Langfuse trace for this LLM call (each call gets its own trace, grouped by session_id)
     # Using v3 SDK: start_observation creates a root span, update_trace sets trace-level attributes
@@ -280,15 +279,10 @@
         name="user-copilot-request",
         input=input,
     ) as span:
-        # Merge custom tags with default "copilot" tag
-        all_tags = ["copilot"]
-        if tags:
-            all_tags.extend(tags)

         with propagate_attributes(
             session_id=session_id,
             user_id=user_id,
-            tags=all_tags,
+            tags=["copilot"],
             metadata={
                 "users_information": format_understanding_for_prompt(understanding)[
                     :200
@@ -327,7 +321,6 @@
                 tools=tools,
                 system_prompt=system_prompt,
                 text_block_id=text_block_id,
-                langfuse_prompt=langfuse_prompt,
             ):
                 if isinstance(chunk, StreamTextStart):
@@ -474,7 +467,6 @@
                 retry_count=retry_count + 1,
                 session=session,
                 context=context,
-                tags=tags,
             ):
                 yield chunk
             return # Exit after retry to avoid double-saving in finally block
@@ -524,7 +516,6 @@
                 session=session, # Pass session object to avoid Redis refetch
                 context=context,
                 tool_call_response=str(tool_response_messages),
-                tags=tags,
             ):
                 yield chunk
@@ -543,8 +534,8 @@ def _is_retryable_error(error: Exception) -> bool:
         return True
     if isinstance(error, APIStatusError):
         # APIStatusError has a response with status_code
-        # Retry on 5xx status codes (server errors) or 429 (rate limit)
-        if error.response.status_code >= 500 or error.response.status_code == 429:
+        # Retry on 5xx status codes (server errors)
+        if error.response.status_code >= 500:
             return True
     if isinstance(error, APIError):
         # Retry on overloaded errors or 500 errors (may not have status code)
@@ -559,7 +550,6 @@ async def _stream_chat_chunks(
     tools: list[ChatCompletionToolParam],
     system_prompt: str | None = None,
     text_block_id: str | None = None,
-    langfuse_prompt: Any | None = None,
 ) -> AsyncGenerator[StreamBaseResponse, None]:
     """
     Pure streaming function for OpenAI chat completions with tool calling.
@@ -571,7 +561,6 @@
         session: Chat session with conversation history
         tools: Available tools for the model
         system_prompt: System prompt to prepend to messages
-        langfuse_prompt: Langfuse prompt object for linking to traces

     Yields:
         SSE formatted JSON response objects
@@ -605,7 +594,6 @@
     )

     # Create the stream with proper types
-    # Pass langfuse_prompt to link generation to prompt version in Langfuse
     stream = await client.chat.completions.create(
         model=model,
         messages=messages,
@@ -613,7 +601,6 @@
         tool_choice="auto",
         stream=True,
         stream_options={"include_usage": True},
-        langfuse_prompt=langfuse_prompt, # type: ignore[call-overload]
     )

     # Variables to accumulate tool calls

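One behavioral change hides among the tag and prompt removals above: HTTP 429 responses are no longer retried. A minimal sketch of the resulting branch (only the APIStatusError arm from the hunk; surrounding checks elided):

    # Post-change retry policy for APIStatusError, per the hunk above (sketch):
    if isinstance(error, APIStatusError):
        # Retry only on 5xx server errors; 429 rate limits are no longer retried
        if error.response.status_code >= 500:
            return True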
View File

@@ -154,16 +154,15 @@ async def store_content_embedding(
     # Upsert the embedding
     # WHERE clause in DO UPDATE prevents PostgreSQL 15 bug with NULLS NOT DISTINCT
-    # Use {pgvector_schema}.vector for explicit pgvector type qualification
     await execute_raw_with_schema(
         """
         INSERT INTO {schema_prefix}"UnifiedContentEmbedding" (
             "id", "contentType", "contentId", "userId", "embedding", "searchableText", "metadata", "createdAt", "updatedAt"
         )
-        VALUES (gen_random_uuid()::text, $1::{schema_prefix}"ContentType", $2, $3, $4::{pgvector_schema}.vector, $5, $6::jsonb, NOW(), NOW())
+        VALUES (gen_random_uuid()::text, $1::{schema_prefix}"ContentType", $2, $3, $4::{schema}.vector, $5, $6::jsonb, NOW(), NOW())
         ON CONFLICT ("contentType", "contentId", "userId")
         DO UPDATE SET
-            "embedding" = $4::{pgvector_schema}.vector,
+            "embedding" = $4::{schema}.vector,
             "searchableText" = $5,
             "metadata" = $6::jsonb,
             "updatedAt" = NOW()
@@ -879,8 +878,6 @@ async def semantic_search(
     min_similarity_idx = len(params) + 1
     params.append(min_similarity)

-    # Use regular string (not f-string) for template to preserve {schema_prefix} and {schema} placeholders
-    # Use OPERATOR({pgvector_schema}.<=>) for explicit operator schema qualification
     sql = (
         """
         SELECT
@@ -888,9 +885,9 @@
             "contentType" as content_type,
             "searchableText" as searchable_text,
             metadata,
-            1 - (embedding OPERATOR({pgvector_schema}.<=>) '"""
+            1 - (embedding <=> '"""
         + embedding_str
-        + """'::{pgvector_schema}.vector) as similarity
+        + """'::{schema}.vector) as similarity
         FROM {schema_prefix}"UnifiedContentEmbedding"
         WHERE "contentType" IN ("""
         + content_type_placeholders
@@ -898,9 +895,9 @@
         """
         + user_filter
         + """
-        AND 1 - (embedding OPERATOR({pgvector_schema}.<=>) '"""
+        AND 1 - (embedding <=> '"""
         + embedding_str
-        + """'::{pgvector_schema}.vector) >= $"""
+        + """'::{schema}.vector) >= $"""
         + str(min_similarity_idx)
         + """
         ORDER BY similarity DESC

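Once _raw_with_schema substitutes the placeholders (a platform schema assumed, embedding literal abbreviated), the similarity expression renders as plain SQL with a fully qualified type, which is the shape the commit message reports as working in all contexts:

    1 - (embedding <=> '[0.12, -0.03, ...]'::platform.vector) as similarity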
View File

@@ -295,7 +295,7 @@ async def unified_hybrid_search(
                 FROM {{schema_prefix}}"UnifiedContentEmbedding" uce
                 WHERE uce."contentType" = ANY({content_types_param}::{{schema_prefix}}"ContentType"[])
                 {user_filter}
-                ORDER BY uce.embedding OPERATOR({{pgvector_schema}}.<=>) {embedding_param}::{{pgvector_schema}}.vector
+                ORDER BY uce.embedding <=> {embedding_param}::{{schema}}.vector
                 LIMIT 200
             )
         ),
@@ -307,7 +307,7 @@
             uce.metadata,
             uce."updatedAt" as updated_at,
             -- Semantic score: cosine similarity (1 - distance)
-            COALESCE(1 - (uce.embedding OPERATOR({{pgvector_schema}}.<=>) {embedding_param}::{{pgvector_schema}}.vector), 0) as semantic_score,
+            COALESCE(1 - (uce.embedding <=> {embedding_param}::{{schema}}.vector), 0) as semantic_score,
             -- Lexical score: ts_rank_cd
             COALESCE(ts_rank_cd(uce.search, plainto_tsquery('english', {query_param})), 0) as lexical_raw,
             -- Category match from metadata
@@ -583,7 +583,7 @@ async def hybrid_search(
             WHERE uce."contentType" = 'STORE_AGENT'::{{schema_prefix}}"ContentType"
                 AND uce."userId" IS NULL
                 AND {where_clause}
-            ORDER BY uce.embedding OPERATOR({{pgvector_schema}}.<=>) {embedding_param}::{{pgvector_schema}}.vector
+            ORDER BY uce.embedding <=> {embedding_param}::{{schema}}.vector
             LIMIT 200
         ) uce
     ),
@@ -605,7 +605,7 @@
             -- Searchable text for BM25 reranking
             COALESCE(sa.agent_name, '') || ' ' || COALESCE(sa.sub_heading, '') || ' ' || COALESCE(sa.description, '') as searchable_text,
             -- Semantic score
-            COALESCE(1 - (uce.embedding OPERATOR({{pgvector_schema}}.<=>) {embedding_param}::{{pgvector_schema}}.vector), 0) as semantic_score,
+            COALESCE(1 - (uce.embedding <=> {embedding_param}::{{schema}}.vector), 0) as semantic_score,
             -- Lexical score (raw, will normalize)
             COALESCE(ts_rank_cd(uce.search, plainto_tsquery('english', {query_param})), 0) as lexical_raw,
             -- Category match

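The doubled braces here exist because hybrid_search.py builds its SQL with f-strings: {{schema}} survives the f-string pass as a literal {schema}, which _raw_with_schema resolves later. A minimal Python sketch of the two-stage substitution (variable names hypothetical):

    # Stage 1: the f-string fills query-specific slots; {{schema}} becomes literal {schema}
    embedding_param = "$1"  # hypothetical positional parameter
    template = f"ORDER BY uce.embedding <=> {embedding_param}::{{schema}}.vector"
    assert template == "ORDER BY uce.embedding <=> $1::{schema}.vector"

    # Stage 2: _raw_with_schema's .format() resolves the remaining placeholder
    sql = template.format(schema_prefix='"platform".', schema="platform")
    assert sql == "ORDER BY uce.embedding <=> $1::platform.vector"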
View File

@@ -120,11 +120,10 @@ async def _raw_with_schema(
     Supports placeholders:
     - {schema_prefix}: Table/type prefix (e.g., "platform".)
-    - {schema}: Raw schema name for application tables (e.g., platform)
-    - {pgvector_schema}: Schema where pgvector is installed (defaults to "public")
+    - {schema}: Raw schema name (e.g., platform) for pgvector types

     Args:
-        query_template: SQL query with {schema_prefix}, {schema}, and/or {pgvector_schema} placeholders
+        query_template: SQL query with {schema_prefix} and/or {schema} placeholders
         *args: Query parameters
         execute: If False, executes SELECT query. If True, executes INSERT/UPDATE/DELETE.
         client: Optional Prisma client for transactions (only used when execute=True).
@@ -133,23 +132,16 @@ async def _raw_with_schema(
         - list[dict] if execute=False (query results)
         - int if execute=True (number of affected rows)

-    Example with vector type:
+    Example:
         await execute_raw_with_schema(
-            'INSERT INTO {schema_prefix}"Embedding" (vec) VALUES ($1::{pgvector_schema}.vector)',
+            'INSERT INTO {schema_prefix}"Embedding" (vec) VALUES ($1::{schema}.vector)',
             embedding_data
         )
     """
     schema = get_database_schema()
     schema_prefix = f'"{schema}".' if schema != "public" else ""
-    # pgvector extension is typically installed in "public" schema
-    # On Supabase it may be in "extensions" but "public" is the common default
-    pgvector_schema = "public"
-    formatted_query = query_template.format(
-        schema_prefix=schema_prefix,
-        schema=schema,
-        pgvector_schema=pgvector_schema,
-    )
+    formatted_query = query_template.format(schema_prefix=schema_prefix, schema=schema)

     import prisma as prisma_module

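With the {pgvector_schema} plumbing gone, both placeholders derive from the single configured schema. A minimal sketch of the simplified substitution (assumes get_database_schema() returns "platform"; a "public" schema yields an empty prefix):

    schema = "platform"  # assumed return value of get_database_schema()
    schema_prefix = f'"{schema}".' if schema != "public" else ""

    template = 'UPDATE {schema_prefix}"Embedding" SET vec = $1::{schema}.vector'
    print(template.format(schema_prefix=schema_prefix, schema=schema))
    # UPDATE "platform"."Embedding" SET vec = $1::platform.vector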
View File

@@ -1,15 +1,8 @@
 "use client";

-import React, {
-  useCallback,
-  useContext,
-  useEffect,
-  useMemo,
-  useState,
-} from "react";
+import React, { useCallback, useEffect, useMemo, useState } from "react";
 import {
   CredentialsMetaInput,
-  CredentialsType,
   GraphExecutionID,
   GraphMeta,
   LibraryAgentPreset,
@@ -36,11 +29,7 @@ import {
 } from "@/components/__legacy__/ui/icons";
 import { Input } from "@/components/__legacy__/ui/input";
 import { Button } from "@/components/atoms/Button/Button";
-import { CredentialsGroupedView } from "@/components/contextual/CredentialsInput/components/CredentialsGroupedView/CredentialsGroupedView";
-import {
-  findSavedCredentialByProviderAndType,
-  findSavedUserCredentialByProviderAndType,
-} from "@/components/contextual/CredentialsInput/components/CredentialsGroupedView/helpers";
+import { CredentialsInput } from "@/components/contextual/CredentialsInput/CredentialsInput";
 import { InformationTooltip } from "@/components/molecules/InformationTooltip/InformationTooltip";
 import {
   useToast,
@@ -48,7 +37,6 @@ import {
 } from "@/components/molecules/Toast/use-toast";
 import { humanizeCronExpression } from "@/lib/cron-expression-utils";
 import { cn, isEmpty } from "@/lib/utils";
-import { CredentialsProvidersContext } from "@/providers/agent-credentials/credentials-provider";
 import { ClockIcon, CopyIcon, InfoIcon } from "@phosphor-icons/react";
 import { CalendarClockIcon, Trash2Icon } from "lucide-react";
@@ -102,7 +90,6 @@ export function AgentRunDraftView({
   const api = useBackendAPI();
   const { toast } = useToast();
   const toastOnFail = useToastOnFail();
-  const allProviders = useContext(CredentialsProvidersContext);
   const [inputValues, setInputValues] = useState<Record<string, any>>({});
   const [inputCredentials, setInputCredentials] = useState<
@@ -141,77 +128,6 @@ export function AgentRunDraftView({
     () => graph.credentials_input_schema.properties,
     [graph],
   );
-  const credentialFields = useMemo(
-    function getCredentialFields() {
-      return Object.entries(agentCredentialsInputFields);
-    },
-    [agentCredentialsInputFields],
-  );
-
-  const requiredCredentials = useMemo(
-    function getRequiredCredentials() {
-      return new Set(
-        (graph.credentials_input_schema?.required as string[]) || [],
-      );
-    },
-    [graph.credentials_input_schema?.required],
-  );
-
-  useEffect(
-    function initializeDefaultCredentials() {
-      if (!allProviders) return;
-      if (!graph.credentials_input_schema?.properties) return;
-      if (requiredCredentials.size === 0) return;
-
-      setInputCredentials(function updateCredentials(currentCreds) {
-        const next = { ...currentCreds };
-        let didAdd = false;
-
-        for (const key of requiredCredentials) {
-          if (next[key]) continue;
-          const schema = graph.credentials_input_schema.properties[key];
-          if (!schema) continue;
-
-          const providerNames = schema.credentials_provider || [];
-          const credentialTypes = schema.credentials_types || [];
-          const requiredScopes = schema.credentials_scopes;
-
-          const userCredential = findSavedUserCredentialByProviderAndType(
-            providerNames,
-            credentialTypes,
-            requiredScopes,
-            allProviders,
-          );
-
-          const savedCredential =
-            userCredential ||
-            findSavedCredentialByProviderAndType(
-              providerNames,
-              credentialTypes,
-              requiredScopes,
-              allProviders,
-            );
-          if (!savedCredential) continue;
-
-          next[key] = {
-            id: savedCredential.id,
-            provider: savedCredential.provider,
-            type: savedCredential.type as CredentialsType,
-            title: savedCredential.title,
-          };
-          didAdd = true;
-        }
-
-        if (!didAdd) return currentCreds;
-        return next;
-      });
-    },
-    [
-      allProviders,
-      graph.credentials_input_schema?.properties,
-      requiredCredentials,
-    ],
-  );

   const [allRequiredInputsAreSet, missingInputs] = useMemo(() => {
     const nonEmptyInputs = new Set(
@@ -229,35 +145,18 @@ export function AgentRunDraftView({
     );
     return [isSuperset, difference];
   }, [agentInputSchema.required, inputValues]);
-  const [allCredentialsAreSet, missingCredentials] = useMemo(
-    function getCredentialStatus() {
-      const missing = Array.from(requiredCredentials).filter((key) => {
-        const cred = inputCredentials[key];
-        return !cred || !cred.id;
-      });
-      return [missing.length === 0, missing];
-    },
-    [requiredCredentials, inputCredentials],
-  );
-
-  function addChangedCredentials(prev: Set<keyof LibraryAgentPresetUpdatable>) {
-    const next = new Set(prev);
-    next.add("credentials");
-    return next;
-  }
-
-  function handleCredentialChange(key: string, value?: CredentialsMetaInput) {
-    setInputCredentials(function updateInputCredentials(currentCreds) {
-      const next = { ...currentCreds };
-      if (value === undefined) {
-        delete next[key];
-        return next;
-      }
-      next[key] = value;
-      return next;
-    });
-    setChangedPresetAttributes(addChangedCredentials);
-  }
+  const [allCredentialsAreSet, missingCredentials] = useMemo(() => {
+    const availableCredentials = new Set(Object.keys(inputCredentials));
+    const allCredentials = new Set(Object.keys(agentCredentialsInputFields));
+    // Backwards-compatible implementation of isSupersetOf and difference
+    const isSuperset = Array.from(allCredentials).every((item) =>
+      availableCredentials.has(item),
+    );
+    const difference = Array.from(allCredentials).filter(
+      (item) => !availableCredentials.has(item),
+    );
+    return [isSuperset, difference];
+  }, [agentCredentialsInputFields, inputCredentials]);

   const notifyMissingInputs = useCallback(
     (needPresetName: boolean = true) => {
       const allMissingFields = (
@@ -750,6 +649,35 @@
             </>
           )}
+          {/* Credentials inputs */}
+          {Object.entries(agentCredentialsInputFields).map(
+            ([key, inputSubSchema]) => (
+              <CredentialsInput
+                key={key}
+                schema={{ ...inputSubSchema, discriminator: undefined }}
+                selectedCredentials={
+                  inputCredentials[key] ?? inputSubSchema.default
+                }
+                onSelectCredentials={(value) => {
+                  setInputCredentials((obj) => {
+                    const newObj = { ...obj };
+                    if (value === undefined) {
+                      delete newObj[key];
+                      return newObj;
+                    }
+                    return {
+                      ...obj,
+                      [key]: value,
+                    };
+                  });
+                  setChangedPresetAttributes((prev) =>
+                    prev.add("credentials"),
+                  );
+                }}
+              />
+            ),
+          )}
           {/* Regular inputs */}
           {Object.entries(agentInputFields).map(([key, inputSubSchema]) => (
             <RunAgentInputs
@@ -767,17 +695,6 @@
               data-testid={`agent-input-${key}`}
             />
           ))}
-          {/* Credentials inputs */}
-          {credentialFields.length > 0 && (
-            <CredentialsGroupedView
-              credentialFields={credentialFields}
-              requiredCredentials={requiredCredentials}
-              inputCredentials={inputCredentials}
-              inputValues={inputValues}
-              onCredentialChange={handleCredentialChange}
-            />
-          )}
         </CardContent>
       </Card>
     </div>

View File

@@ -4085,48 +4085,6 @@
         }
       }
     },
-    "/api/local-media/users/{user_id}/{media_type}/{filename}": {
-      "get": {
-        "tags": ["media", "media"],
-        "summary": "Serve local media file",
-        "description": "Serve a media file from local storage.\nOnly available when GCS is not configured.",
-        "operationId": "getMediaServe local media file",
-        "parameters": [
-          {
-            "name": "user_id",
-            "in": "path",
-            "required": true,
-            "schema": { "type": "string", "title": "User Id" }
-          },
-          {
-            "name": "media_type",
-            "in": "path",
-            "required": true,
-            "schema": { "type": "string", "title": "Media Type" }
-          },
-          {
-            "name": "filename",
-            "in": "path",
-            "required": true,
-            "schema": { "type": "string", "title": "Filename" }
-          }
-        ],
-        "responses": {
-          "200": {
-            "description": "Successful Response",
-            "content": { "application/json": { "schema": {} } }
-          },
-          "422": {
-            "description": "Validation Error",
-            "content": {
-              "application/json": {
-                "schema": { "$ref": "#/components/schemas/HTTPValidationError" }
-              }
-            }
-          }
-        }
-      }
-    },
     "/api/oauth/app/{client_id}": {
       "get": {
         "tags": ["oauth"],
@@ -10229,13 +10187,6 @@
            { "type": "null" }
          ],
          "title": "Context"
        },
-        "tags": {
-          "anyOf": [
-            { "items": { "type": "string" }, "type": "array" },
-            { "type": "null" }
-          ],
-          "title": "Tags"
-        }
      },
      "type": "object",

View File

@@ -1,5 +1,5 @@
 import { CredentialsProvidersContextType } from "@/providers/agent-credentials/credentials-provider";
-import { filterSystemCredentials, getSystemCredentials } from "../../helpers";
+import { getSystemCredentials } from "../../helpers";

 export type CredentialField = [string, any];
@@ -208,42 +208,3 @@ export function findSavedCredentialByProviderAndType(
   return undefined;
 }
-
-export function findSavedUserCredentialByProviderAndType(
-  providerNames: string[],
-  credentialTypes: string[],
-  requiredScopes: string[] | undefined,
-  allProviders: CredentialsProvidersContextType | null,
-): SavedCredential | undefined {
-  for (const providerName of providerNames) {
-    const providerData = allProviders?.[providerName];
-    if (!providerData) continue;
-
-    const userCredentials = filterSystemCredentials(
-      providerData.savedCredentials ?? [],
-    );
-
-    const matchingCredentials: SavedCredential[] = [];
-    for (const credential of userCredentials) {
-      const typeMatches =
-        credentialTypes.length === 0 ||
-        credentialTypes.includes(credential.type);
-      const scopesMatch = hasRequiredScopes(credential, requiredScopes);
-      if (!typeMatches) continue;
-      if (!scopesMatch) continue;
-      matchingCredentials.push(credential as SavedCredential);
-    }
-
-    if (matchingCredentials.length === 1) {
-      return matchingCredentials[0];
-    }
-    if (matchingCredentials.length > 1) {
-      return undefined;
-    }
-  }
-  return undefined;
-}

View File

@@ -98,20 +98,24 @@ export function useCredentialsInput({
   // Auto-select the first available credential on initial mount
   // Once a user has made a selection, we don't override it
-  useEffect(
-    function autoSelectCredential() {
-      if (readOnly) return;
-      if (!credentials || !("savedCredentials" in credentials)) return;
-      if (selectedCredential?.id) return;
+  useEffect(() => {
+    if (readOnly) return;
+    if (!credentials || !("savedCredentials" in credentials)) return;
+    const savedCreds = credentials.savedCredentials;
+    if (savedCreds.length === 0) return;
+    // If already selected, don't auto-select
+    if (selectedCredential?.id) return;
-      if (hasAttemptedAutoSelect.current) return;
-      hasAttemptedAutoSelect.current = true;
+    // Only attempt auto-selection once
+    if (hasAttemptedAutoSelect.current) return;
+    hasAttemptedAutoSelect.current = true;
-      if (isOptional) return;
+    // If optional, don't auto-select (user can choose "None")
+    if (isOptional) return;
-      const savedCreds = credentials.savedCredentials;
+    // Auto-select the first credential if any are available
+    if (savedCreds.length > 0) {
       const cred = savedCreds[0];
       onSelectCredential({
         id: cred.id,
@@ -119,15 +123,14 @@
         provider: credentials.provider,
         title: (cred as any).title,
       });
-    },
-    [
-      credentials,
-      selectedCredential?.id,
-      readOnly,
-      isOptional,
-      onSelectCredential,
-    ],
-  );
+    }
+  }, [
+    credentials,
+    selectedCredential?.id,
+    readOnly,
+    isOptional,
+    onSelectCredential,
+  ]);

   if (
     !credentials ||

View File

@@ -106,14 +106,9 @@ export function getTimezoneDisplayName(timezone: string): string {
     const parts = timezone.split("/");
     const city = parts[parts.length - 1].replace(/_/g, " ");
     const abbr = getTimezoneAbbreviation(timezone);
-    if (abbr && abbr !== timezone) {
-      return `${city} (${abbr})`;
-    }
-    // If abbreviation is same as timezone or not found, show timezone with underscores replaced
-    const timezoneDisplay = timezone.replace(/_/g, " ");
-    return `${city} (${timezoneDisplay})`;
+    return abbr ? `${city} (${abbr})` : city;
   } catch {
-    return timezone.replace(/_/g, " ");
+    return timezone;
   }
 }
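Worked example, assuming getTimezoneAbbreviation("America/New_York") returns "EST": both old and new code yield "New York (EST)". The difference shows when the helper finds no abbreviation: a zone like Asia/Kathmandu now renders as just "Kathmandu" instead of "Kathmandu (Asia/Kathmandu)", and on a parse failure the raw identifier is returned with its underscores intact.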