feat(backend): add chat search tools and BM25 reranking (#11782)

This PR adds new chat tools for searching blocks and documentation,
along with BM25 reranking for improved search relevance.

### Changes 🏗️

**New Chat Tools:**
- `find_block` - Search for available blocks by name/description using
hybrid search
- `run_block` - Execute a block directly with provided inputs and
credentials
- `search_docs` - Search documentation with section-level granularity  
- `get_doc_page` - Retrieve full documentation page content

**Search Improvements:**
- Added BM25 reranking to hybrid search for better lexical relevance
- Documentation handler now chunks markdown by headings (##) for
finer-grained embeddings
- Section-based content IDs (`doc_path::section_index`) for precise doc
retrieval
- Startup embedding backfill in scheduler for immediate searchability

**Other Changes:**
- New response models for block and documentation search results
- Updated orphan cleanup to handle section-based doc embeddings
- Added `rank-bm25` dependency for BM25 scoring
- Removed max message limit check in chat service

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Run find_block tool to search for blocks (e.g., "current time")
  - [x] Run run_block tool to execute a found block
  - [x] Run search_docs tool to search documentation
  - [x] Run get_doc_page tool to retrieve full doc content
- [x] Verify BM25 reranking improves search relevance for exact term
matches
  - [x] Verify documentation sections are properly chunked and embedded

#### For configuration changes:
- [x] `.env.default` is updated or already compatible with my changes
- [x] `docker-compose.yml` is updated or already compatible with my
changes
- [x] I have included a list of my configuration changes in the PR
description (under **Changes**)

**Dependencies added:** `rank-bm25` for BM25 scoring algorithm
This commit is contained in:
Swifty
2026-01-16 16:18:10 +01:00
committed by GitHub
parent 4a9b13acb6
commit e55f05c7a8
16 changed files with 1478 additions and 105 deletions

View File

@@ -299,9 +299,6 @@ async def stream_chat_completion(
f"new message_count={len(session.messages)}"
)
if len(session.messages) > config.max_context_messages:
raise ValueError(f"Max messages exceeded: {config.max_context_messages}")
logger.info(
f"Upserting session: {session.session_id} with user id {session.user_id}, "
f"message_count={len(session.messages)}"

View File

@@ -8,8 +8,12 @@ from .add_understanding import AddUnderstandingTool
from .agent_output import AgentOutputTool
from .base import BaseTool
from .find_agent import FindAgentTool
from .find_block import FindBlockTool
from .find_library_agent import FindLibraryAgentTool
from .get_doc_page import GetDocPageTool
from .run_agent import RunAgentTool
from .run_block import RunBlockTool
from .search_docs import SearchDocsTool
if TYPE_CHECKING:
from backend.api.features.chat.response_model import StreamToolOutputAvailable
@@ -18,9 +22,13 @@ if TYPE_CHECKING:
TOOL_REGISTRY: dict[str, BaseTool] = {
"add_understanding": AddUnderstandingTool(),
"find_agent": FindAgentTool(),
"find_block": FindBlockTool(),
"find_library_agent": FindLibraryAgentTool(),
"run_agent": RunAgentTool(),
"run_block": RunBlockTool(),
"agent_output": AgentOutputTool(),
"search_docs": SearchDocsTool(),
"get_doc_page": GetDocPageTool(),
}
# Export individual tool instances for backwards compatibility

View File

@@ -0,0 +1,192 @@
import logging
from typing import Any
from prisma.enums import ContentType
from backend.api.features.chat.model import ChatSession
from backend.api.features.chat.tools.base import BaseTool, ToolResponseBase
from backend.api.features.chat.tools.models import (
BlockInfoSummary,
BlockInputFieldInfo,
BlockListResponse,
ErrorResponse,
NoResultsResponse,
)
from backend.api.features.store.hybrid_search import unified_hybrid_search
from backend.data.block import get_block
logger = logging.getLogger(__name__)
class FindBlockTool(BaseTool):
"""Tool for searching available blocks."""
@property
def name(self) -> str:
return "find_block"
@property
def description(self) -> str:
return (
"Search for available blocks by name or description. "
"Blocks are reusable components that perform specific tasks like "
"sending emails, making API calls, processing text, etc. "
"IMPORTANT: Use this tool FIRST to get the block's 'id' before calling run_block. "
"The response includes each block's id, required_inputs, and input_schema."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": (
"Search query to find blocks by name or description. "
"Use keywords like 'email', 'http', 'text', 'ai', etc."
),
},
},
"required": ["query"],
}
@property
def requires_auth(self) -> bool:
return True
async def _execute(
self,
user_id: str | None,
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
"""Search for blocks matching the query.
Args:
user_id: User ID (required)
session: Chat session
query: Search query
Returns:
BlockListResponse: List of matching blocks
NoResultsResponse: No blocks found
ErrorResponse: Error message
"""
query = kwargs.get("query", "").strip()
session_id = session.session_id
if not query:
return ErrorResponse(
message="Please provide a search query",
session_id=session_id,
)
try:
# Search for blocks using hybrid search
results, total = await unified_hybrid_search(
query=query,
content_types=[ContentType.BLOCK],
page=1,
page_size=10,
)
if not results:
return NoResultsResponse(
message=f"No blocks found for '{query}'",
suggestions=[
"Try broader keywords like 'email', 'http', 'text', 'ai'",
"Check spelling of technical terms",
],
session_id=session_id,
)
# Enrich results with full block information
blocks: list[BlockInfoSummary] = []
for result in results:
block_id = result["content_id"]
block = get_block(block_id)
if block:
# Get input/output schemas
input_schema = {}
output_schema = {}
try:
input_schema = block.input_schema.jsonschema()
except Exception:
pass
try:
output_schema = block.output_schema.jsonschema()
except Exception:
pass
# Get categories from block instance
categories = []
if hasattr(block, "categories") and block.categories:
categories = [cat.value for cat in block.categories]
# Extract required inputs for easier use
required_inputs: list[BlockInputFieldInfo] = []
if input_schema:
properties = input_schema.get("properties", {})
required_fields = set(input_schema.get("required", []))
# Get credential field names to exclude from required inputs
credentials_fields = set(
block.input_schema.get_credentials_fields().keys()
)
for field_name, field_schema in properties.items():
# Skip credential fields - they're handled separately
if field_name in credentials_fields:
continue
required_inputs.append(
BlockInputFieldInfo(
name=field_name,
type=field_schema.get("type", "string"),
description=field_schema.get("description", ""),
required=field_name in required_fields,
default=field_schema.get("default"),
)
)
blocks.append(
BlockInfoSummary(
id=block_id,
name=block.name,
description=block.description or "",
categories=categories,
input_schema=input_schema,
output_schema=output_schema,
required_inputs=required_inputs,
)
)
if not blocks:
return NoResultsResponse(
message=f"No blocks found for '{query}'",
suggestions=[
"Try broader keywords like 'email', 'http', 'text', 'ai'",
],
session_id=session_id,
)
return BlockListResponse(
message=(
f"Found {len(blocks)} block(s) matching '{query}'. "
"To execute a block, use run_block with the block's 'id' field "
"and provide 'input_data' matching the block's input_schema."
),
blocks=blocks,
count=len(blocks),
query=query,
session_id=session_id,
)
except Exception as e:
logger.error(f"Error searching blocks: {e}", exc_info=True)
return ErrorResponse(
message="Failed to search blocks",
error=str(e),
session_id=session_id,
)

View File

@@ -0,0 +1,148 @@
"""GetDocPageTool - Fetch full content of a documentation page."""
import logging
from pathlib import Path
from typing import Any
from backend.api.features.chat.model import ChatSession
from backend.api.features.chat.tools.base import BaseTool
from backend.api.features.chat.tools.models import (
DocPageResponse,
ErrorResponse,
ToolResponseBase,
)
logger = logging.getLogger(__name__)
# Base URL for documentation (can be configured)
DOCS_BASE_URL = "https://docs.agpt.co"
class GetDocPageTool(BaseTool):
"""Tool for fetching full content of a documentation page."""
@property
def name(self) -> str:
return "get_doc_page"
@property
def description(self) -> str:
return (
"Get the full content of a documentation page by its path. "
"Use this after search_docs to read the complete content of a relevant page."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": (
"The path to the documentation file, as returned by search_docs. "
"Example: 'platform/block-sdk-guide.md'"
),
},
},
"required": ["path"],
}
@property
def requires_auth(self) -> bool:
return False # Documentation is public
def _get_docs_root(self) -> Path:
"""Get the documentation root directory."""
this_file = Path(__file__)
project_root = this_file.parent.parent.parent.parent.parent.parent.parent.parent
return project_root / "docs"
def _extract_title(self, content: str, fallback: str) -> str:
"""Extract title from markdown content."""
lines = content.split("\n")
for line in lines:
if line.startswith("# "):
return line[2:].strip()
return fallback
def _make_doc_url(self, path: str) -> str:
"""Create a URL for a documentation page."""
url_path = path.rsplit(".", 1)[0] if "." in path else path
return f"{DOCS_BASE_URL}/{url_path}"
async def _execute(
self,
user_id: str | None,
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
"""Fetch full content of a documentation page.
Args:
user_id: User ID (not required for docs)
session: Chat session
path: Path to the documentation file
Returns:
DocPageResponse: Full document content
ErrorResponse: Error message
"""
path = kwargs.get("path", "").strip()
session_id = session.session_id if session else None
if not path:
return ErrorResponse(
message="Please provide a documentation path.",
error="Missing path parameter",
session_id=session_id,
)
# Sanitize path to prevent directory traversal
if ".." in path or path.startswith("/"):
return ErrorResponse(
message="Invalid documentation path.",
error="invalid_path",
session_id=session_id,
)
docs_root = self._get_docs_root()
full_path = docs_root / path
if not full_path.exists():
return ErrorResponse(
message=f"Documentation page not found: {path}",
error="not_found",
session_id=session_id,
)
# Ensure the path is within docs root
try:
full_path.resolve().relative_to(docs_root.resolve())
except ValueError:
return ErrorResponse(
message="Invalid documentation path.",
error="invalid_path",
session_id=session_id,
)
try:
content = full_path.read_text(encoding="utf-8")
title = self._extract_title(content, path)
return DocPageResponse(
message=f"Retrieved documentation page: {title}",
title=title,
path=path,
content=content,
doc_url=self._make_doc_url(path),
session_id=session_id,
)
except Exception as e:
logger.error(f"Failed to read documentation page {path}: {e}")
return ErrorResponse(
message=f"Failed to read documentation page: {str(e)}",
error="read_failed",
session_id=session_id,
)

View File

@@ -21,6 +21,10 @@ class ResponseType(str, Enum):
NO_RESULTS = "no_results"
AGENT_OUTPUT = "agent_output"
UNDERSTANDING_UPDATED = "understanding_updated"
BLOCK_LIST = "block_list"
BLOCK_OUTPUT = "block_output"
DOC_SEARCH_RESULTS = "doc_search_results"
DOC_PAGE = "doc_page"
# Base response model
@@ -209,3 +213,83 @@ class UnderstandingUpdatedResponse(ToolResponseBase):
type: ResponseType = ResponseType.UNDERSTANDING_UPDATED
updated_fields: list[str] = Field(default_factory=list)
current_understanding: dict[str, Any] = Field(default_factory=dict)
# Documentation search models
class DocSearchResult(BaseModel):
"""A single documentation search result."""
title: str
path: str
section: str
snippet: str # Short excerpt for UI display
score: float
doc_url: str | None = None
class DocSearchResultsResponse(ToolResponseBase):
"""Response for search_docs tool."""
type: ResponseType = ResponseType.DOC_SEARCH_RESULTS
results: list[DocSearchResult]
count: int
query: str
class DocPageResponse(ToolResponseBase):
"""Response for get_doc_page tool."""
type: ResponseType = ResponseType.DOC_PAGE
title: str
path: str
content: str # Full document content
doc_url: str | None = None
# Block models
class BlockInputFieldInfo(BaseModel):
"""Information about a block input field."""
name: str
type: str
description: str = ""
required: bool = False
default: Any | None = None
class BlockInfoSummary(BaseModel):
"""Summary of a block for search results."""
id: str
name: str
description: str
categories: list[str]
input_schema: dict[str, Any]
output_schema: dict[str, Any]
required_inputs: list[BlockInputFieldInfo] = Field(
default_factory=list,
description="List of required input fields for this block",
)
class BlockListResponse(ToolResponseBase):
"""Response for find_block tool."""
type: ResponseType = ResponseType.BLOCK_LIST
blocks: list[BlockInfoSummary]
count: int
query: str
usage_hint: str = Field(
default="To execute a block, call run_block with block_id set to the block's "
"'id' field and input_data containing the required fields from input_schema."
)
class BlockOutputResponse(ToolResponseBase):
"""Response for run_block tool."""
type: ResponseType = ResponseType.BLOCK_OUTPUT
block_id: str
block_name: str
outputs: dict[str, list[Any]]
success: bool = True

View File

@@ -0,0 +1,297 @@
"""Tool for executing blocks directly."""
import logging
from collections import defaultdict
from typing import Any
from backend.api.features.chat.model import ChatSession
from backend.data.block import get_block
from backend.data.execution import ExecutionContext
from backend.data.model import CredentialsMetaInput
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.util.exceptions import BlockError
from .base import BaseTool
from .models import (
BlockOutputResponse,
ErrorResponse,
SetupInfo,
SetupRequirementsResponse,
ToolResponseBase,
UserReadiness,
)
logger = logging.getLogger(__name__)
class RunBlockTool(BaseTool):
"""Tool for executing a block and returning its outputs."""
@property
def name(self) -> str:
return "run_block"
@property
def description(self) -> str:
return (
"Execute a specific block with the provided input data. "
"IMPORTANT: You MUST call find_block first to get the block's 'id' - "
"do NOT guess or make up block IDs. "
"Use the 'id' from find_block results and provide input_data "
"matching the block's required_inputs."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"block_id": {
"type": "string",
"description": (
"The block's 'id' field from find_block results. "
"NEVER guess this - always get it from find_block first."
),
},
"input_data": {
"type": "object",
"description": (
"Input values for the block. Use the 'required_inputs' field "
"from find_block to see what fields are needed."
),
},
},
"required": ["block_id", "input_data"],
}
@property
def requires_auth(self) -> bool:
return True
async def _check_block_credentials(
self,
user_id: str,
block: Any,
) -> tuple[dict[str, CredentialsMetaInput], list[CredentialsMetaInput]]:
"""
Check if user has required credentials for a block.
Returns:
tuple[matched_credentials, missing_credentials]
"""
matched_credentials: dict[str, CredentialsMetaInput] = {}
missing_credentials: list[CredentialsMetaInput] = []
# Get credential field info from block's input schema
credentials_fields_info = block.input_schema.get_credentials_fields_info()
if not credentials_fields_info:
return matched_credentials, missing_credentials
# Get user's available credentials
creds_manager = IntegrationCredentialsManager()
available_creds = await creds_manager.store.get_all_creds(user_id)
for field_name, field_info in credentials_fields_info.items():
# field_info.provider is a frozenset of acceptable providers
# field_info.supported_types is a frozenset of acceptable types
matching_cred = next(
(
cred
for cred in available_creds
if cred.provider in field_info.provider
and cred.type in field_info.supported_types
),
None,
)
if matching_cred:
matched_credentials[field_name] = CredentialsMetaInput(
id=matching_cred.id,
provider=matching_cred.provider, # type: ignore
type=matching_cred.type,
title=matching_cred.title,
)
else:
# Create a placeholder for the missing credential
provider = next(iter(field_info.provider), "unknown")
cred_type = next(iter(field_info.supported_types), "api_key")
missing_credentials.append(
CredentialsMetaInput(
id=field_name,
provider=provider, # type: ignore
type=cred_type, # type: ignore
title=field_name.replace("_", " ").title(),
)
)
return matched_credentials, missing_credentials
async def _execute(
self,
user_id: str | None,
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
"""Execute a block with the given input data.
Args:
user_id: User ID (required)
session: Chat session
block_id: Block UUID to execute
input_data: Input values for the block
Returns:
BlockOutputResponse: Block execution outputs
SetupRequirementsResponse: Missing credentials
ErrorResponse: Error message
"""
block_id = kwargs.get("block_id", "").strip()
input_data = kwargs.get("input_data", {})
session_id = session.session_id
if not block_id:
return ErrorResponse(
message="Please provide a block_id",
session_id=session_id,
)
if not isinstance(input_data, dict):
return ErrorResponse(
message="input_data must be an object",
session_id=session_id,
)
if not user_id:
return ErrorResponse(
message="Authentication required",
session_id=session_id,
)
# Get the block
block = get_block(block_id)
if not block:
return ErrorResponse(
message=f"Block '{block_id}' not found",
session_id=session_id,
)
logger.info(f"Executing block {block.name} ({block_id}) for user {user_id}")
# Check credentials
creds_manager = IntegrationCredentialsManager()
matched_credentials, missing_credentials = await self._check_block_credentials(
user_id, block
)
if missing_credentials:
# Return setup requirements response with missing credentials
missing_creds_dict = {c.id: c.model_dump() for c in missing_credentials}
return SetupRequirementsResponse(
message=(
f"Block '{block.name}' requires credentials that are not configured. "
"Please set up the required credentials before running this block."
),
session_id=session_id,
setup_info=SetupInfo(
agent_id=block_id,
agent_name=block.name,
user_readiness=UserReadiness(
has_all_credentials=False,
missing_credentials=missing_creds_dict,
ready_to_run=False,
),
requirements={
"credentials": [c.model_dump() for c in missing_credentials],
"inputs": self._get_inputs_list(block),
"execution_modes": ["immediate"],
},
),
graph_id=None,
graph_version=None,
)
try:
# Fetch actual credentials and prepare kwargs for block execution
# Create execution context with defaults (blocks may require it)
exec_kwargs: dict[str, Any] = {
"user_id": user_id,
"execution_context": ExecutionContext(),
}
for field_name, cred_meta in matched_credentials.items():
# Inject metadata into input_data (for validation)
if field_name not in input_data:
input_data[field_name] = cred_meta.model_dump()
# Fetch actual credentials and pass as kwargs (for execution)
actual_credentials = await creds_manager.get(
user_id, cred_meta.id, lock=False
)
if actual_credentials:
exec_kwargs[field_name] = actual_credentials
else:
return ErrorResponse(
message=f"Failed to retrieve credentials for {field_name}",
session_id=session_id,
)
# Execute the block and collect outputs
outputs: dict[str, list[Any]] = defaultdict(list)
async for output_name, output_data in block.execute(
input_data,
**exec_kwargs,
):
outputs[output_name].append(output_data)
return BlockOutputResponse(
message=f"Block '{block.name}' executed successfully",
block_id=block_id,
block_name=block.name,
outputs=dict(outputs),
success=True,
session_id=session_id,
)
except BlockError as e:
logger.warning(f"Block execution failed: {e}")
return ErrorResponse(
message=f"Block execution failed: {e}",
error=str(e),
session_id=session_id,
)
except Exception as e:
logger.error(f"Unexpected error executing block: {e}", exc_info=True)
return ErrorResponse(
message=f"Failed to execute block: {str(e)}",
error=str(e),
session_id=session_id,
)
def _get_inputs_list(self, block: Any) -> list[dict[str, Any]]:
"""Extract non-credential inputs from block schema."""
inputs_list = []
schema = block.input_schema.jsonschema()
properties = schema.get("properties", {})
required_fields = set(schema.get("required", []))
# Get credential field names to exclude
credentials_fields = set(block.input_schema.get_credentials_fields().keys())
for field_name, field_schema in properties.items():
# Skip credential fields
if field_name in credentials_fields:
continue
inputs_list.append(
{
"name": field_name,
"title": field_schema.get("title", field_name),
"type": field_schema.get("type", "string"),
"description": field_schema.get("description", ""),
"required": field_name in required_fields,
}
)
return inputs_list

View File

@@ -0,0 +1,208 @@
"""SearchDocsTool - Search documentation using hybrid search."""
import logging
from typing import Any
from prisma.enums import ContentType
from backend.api.features.chat.model import ChatSession
from backend.api.features.chat.tools.base import BaseTool
from backend.api.features.chat.tools.models import (
DocSearchResult,
DocSearchResultsResponse,
ErrorResponse,
NoResultsResponse,
ToolResponseBase,
)
from backend.api.features.store.hybrid_search import unified_hybrid_search
logger = logging.getLogger(__name__)
# Base URL for documentation (can be configured)
DOCS_BASE_URL = "https://docs.agpt.co"
# Maximum number of results to return
MAX_RESULTS = 5
# Snippet length for preview
SNIPPET_LENGTH = 200
class SearchDocsTool(BaseTool):
"""Tool for searching AutoGPT platform documentation."""
@property
def name(self) -> str:
return "search_docs"
@property
def description(self) -> str:
return (
"Search the AutoGPT platform documentation for information about "
"how to use the platform, build agents, configure blocks, and more. "
"Returns relevant documentation sections. Use get_doc_page to read full content."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": (
"Search query to find relevant documentation. "
"Use natural language to describe what you're looking for."
),
},
},
"required": ["query"],
}
@property
def requires_auth(self) -> bool:
return False # Documentation is public
def _create_snippet(self, content: str, max_length: int = SNIPPET_LENGTH) -> str:
"""Create a short snippet from content for preview."""
# Remove markdown formatting for cleaner snippet
clean_content = content.replace("#", "").replace("*", "").replace("`", "")
# Remove extra whitespace
clean_content = " ".join(clean_content.split())
if len(clean_content) <= max_length:
return clean_content
# Truncate at word boundary
truncated = clean_content[:max_length]
last_space = truncated.rfind(" ")
if last_space > max_length // 2:
truncated = truncated[:last_space]
return truncated + "..."
def _make_doc_url(self, path: str) -> str:
"""Create a URL for a documentation page."""
# Remove file extension for URL
url_path = path.rsplit(".", 1)[0] if "." in path else path
return f"{DOCS_BASE_URL}/{url_path}"
async def _execute(
self,
user_id: str | None,
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
"""Search documentation and return relevant sections.
Args:
user_id: User ID (not required for docs)
session: Chat session
query: Search query
Returns:
DocSearchResultsResponse: List of matching documentation sections
NoResultsResponse: No results found
ErrorResponse: Error message
"""
query = kwargs.get("query", "").strip()
session_id = session.session_id if session else None
if not query:
return ErrorResponse(
message="Please provide a search query.",
error="Missing query parameter",
session_id=session_id,
)
try:
# Search using hybrid search for DOCUMENTATION content type only
results, total = await unified_hybrid_search(
query=query,
content_types=[ContentType.DOCUMENTATION],
page=1,
page_size=MAX_RESULTS * 2, # Fetch extra for deduplication
min_score=0.1, # Lower threshold for docs
)
if not results:
return NoResultsResponse(
message=f"No documentation found for '{query}'.",
suggestions=[
"Try different keywords",
"Use more general terms",
"Check for typos in your query",
],
session_id=session_id,
)
# Deduplicate by document path (keep highest scoring section per doc)
seen_docs: dict[str, dict[str, Any]] = {}
for result in results:
metadata = result.get("metadata", {})
doc_path = metadata.get("path", "")
if not doc_path:
continue
# Keep the highest scoring result for each document
if doc_path not in seen_docs:
seen_docs[doc_path] = result
elif result.get("combined_score", 0) > seen_docs[doc_path].get(
"combined_score", 0
):
seen_docs[doc_path] = result
# Sort by score and take top MAX_RESULTS
deduplicated = sorted(
seen_docs.values(),
key=lambda x: x.get("combined_score", 0),
reverse=True,
)[:MAX_RESULTS]
if not deduplicated:
return NoResultsResponse(
message=f"No documentation found for '{query}'.",
suggestions=[
"Try different keywords",
"Use more general terms",
],
session_id=session_id,
)
# Build response
doc_results: list[DocSearchResult] = []
for result in deduplicated:
metadata = result.get("metadata", {})
doc_path = metadata.get("path", "")
doc_title = metadata.get("doc_title", "")
section_title = metadata.get("section_title", "")
searchable_text = result.get("searchable_text", "")
score = result.get("combined_score", 0)
doc_results.append(
DocSearchResult(
title=doc_title or section_title or doc_path,
path=doc_path,
section=section_title,
snippet=self._create_snippet(searchable_text),
score=round(score, 3),
doc_url=self._make_doc_url(doc_path),
)
)
return DocSearchResultsResponse(
message=f"Found {len(doc_results)} relevant documentation sections.",
results=doc_results,
count=len(doc_results),
query=query,
session_id=session_id,
)
except Exception as e:
logger.error(f"Documentation search failed: {e}")
return ErrorResponse(
message=f"Failed to search documentation: {str(e)}",
error="search_failed",
session_id=session_id,
)

View File

@@ -275,8 +275,22 @@ class BlockHandler(ContentHandler):
}
@dataclass
class MarkdownSection:
"""Represents a section of a markdown document."""
title: str # Section heading text
content: str # Section content (including the heading line)
level: int # Heading level (1 for #, 2 for ##, etc.)
index: int # Section index within the document
class DocumentationHandler(ContentHandler):
"""Handler for documentation files (.md/.mdx)."""
"""Handler for documentation files (.md/.mdx).
Chunks documents by markdown headings to create multiple embeddings per file.
Each section (## heading) becomes a separate embedding for better retrieval.
"""
@property
def content_type(self) -> ContentType:
@@ -297,35 +311,162 @@ class DocumentationHandler(ContentHandler):
docs_root = project_root / "docs"
return docs_root
def _extract_title_and_content(self, file_path: Path) -> tuple[str, str]:
"""Extract title and content from markdown file."""
def _extract_doc_title(self, file_path: Path) -> str:
"""Extract the document title from a markdown file."""
try:
content = file_path.read_text(encoding="utf-8")
lines = content.split("\n")
# Try to extract title from first # heading
lines = content.split("\n")
title = ""
body_lines = []
for line in lines:
if line.startswith("# ") and not title:
title = line[2:].strip()
else:
body_lines.append(line)
if line.startswith("# "):
return line[2:].strip()
# If no title found, use filename
if not title:
title = file_path.stem.replace("-", " ").replace("_", " ").title()
return file_path.stem.replace("-", " ").replace("_", " ").title()
except Exception as e:
logger.warning(f"Failed to read title from {file_path}: {e}")
return file_path.stem.replace("-", " ").replace("_", " ").title()
body = "\n".join(body_lines)
def _chunk_markdown_by_headings(
self, file_path: Path, min_heading_level: int = 2
) -> list[MarkdownSection]:
"""
Split a markdown file into sections based on headings.
return title, body
Args:
file_path: Path to the markdown file
min_heading_level: Minimum heading level to split on (default: 2 for ##)
Returns:
List of MarkdownSection objects, one per section.
If no headings found, returns a single section with all content.
"""
try:
content = file_path.read_text(encoding="utf-8")
except Exception as e:
logger.warning(f"Failed to read {file_path}: {e}")
return file_path.stem, ""
return []
lines = content.split("\n")
sections: list[MarkdownSection] = []
current_section_lines: list[str] = []
current_title = ""
current_level = 0
section_index = 0
doc_title = ""
for line in lines:
# Check if line is a heading
if line.startswith("#"):
# Count heading level
level = 0
for char in line:
if char == "#":
level += 1
else:
break
heading_text = line[level:].strip()
# Track document title (level 1 heading)
if level == 1 and not doc_title:
doc_title = heading_text
# Don't create a section for just the title - add it to first section
current_section_lines.append(line)
continue
# Check if this heading should start a new section
if level >= min_heading_level:
# Save previous section if it has content
if current_section_lines:
section_content = "\n".join(current_section_lines).strip()
if section_content:
# Use doc title for first section if no specific title
title = current_title if current_title else doc_title
if not title:
title = file_path.stem.replace("-", " ").replace(
"_", " "
)
sections.append(
MarkdownSection(
title=title,
content=section_content,
level=current_level if current_level else 1,
index=section_index,
)
)
section_index += 1
# Start new section
current_section_lines = [line]
current_title = heading_text
current_level = level
else:
# Lower level heading (e.g., # when splitting on ##)
current_section_lines.append(line)
else:
current_section_lines.append(line)
# Don't forget the last section
if current_section_lines:
section_content = "\n".join(current_section_lines).strip()
if section_content:
title = current_title if current_title else doc_title
if not title:
title = file_path.stem.replace("-", " ").replace("_", " ")
sections.append(
MarkdownSection(
title=title,
content=section_content,
level=current_level if current_level else 1,
index=section_index,
)
)
# If no sections were created (no headings found), create one section with all content
if not sections and content.strip():
title = (
doc_title
if doc_title
else file_path.stem.replace("-", " ").replace("_", " ")
)
sections.append(
MarkdownSection(
title=title,
content=content.strip(),
level=1,
index=0,
)
)
return sections
def _make_section_content_id(self, doc_path: str, section_index: int) -> str:
"""Create a unique content ID for a document section.
Format: doc_path::section_index
Example: 'platform/getting-started.md::0'
"""
return f"{doc_path}::{section_index}"
def _parse_section_content_id(self, content_id: str) -> tuple[str, int]:
"""Parse a section content ID back into doc_path and section_index.
Returns: (doc_path, section_index)
"""
if "::" in content_id:
parts = content_id.rsplit("::", 1)
return parts[0], int(parts[1])
# Legacy format (whole document)
return content_id, 0
async def get_missing_items(self, batch_size: int) -> list[ContentItem]:
"""Fetch documentation files without embeddings."""
"""Fetch documentation sections without embeddings.
Chunks each document by markdown headings and creates embeddings for each section.
Content IDs use the format: 'path/to/doc.md::section_index'
"""
docs_root = self._get_docs_root()
if not docs_root.exists():
@@ -335,14 +476,28 @@ class DocumentationHandler(ContentHandler):
# Find all .md and .mdx files
all_docs = list(docs_root.rglob("*.md")) + list(docs_root.rglob("*.mdx"))
# Get relative paths for content IDs
doc_paths = [str(doc.relative_to(docs_root)) for doc in all_docs]
if not doc_paths:
if not all_docs:
return []
# Build list of all sections from all documents
all_sections: list[tuple[str, Path, MarkdownSection]] = []
for doc_file in all_docs:
doc_path = str(doc_file.relative_to(docs_root))
sections = self._chunk_markdown_by_headings(doc_file)
for section in sections:
all_sections.append((doc_path, doc_file, section))
if not all_sections:
return []
# Generate content IDs for all sections
section_content_ids = [
self._make_section_content_id(doc_path, section.index)
for doc_path, _, section in all_sections
]
# Check which ones have embeddings
placeholders = ",".join([f"${i+1}" for i in range(len(doc_paths))])
placeholders = ",".join([f"${i+1}" for i in range(len(section_content_ids))])
existing_result = await query_raw_with_schema(
f"""
SELECT "contentId"
@@ -350,76 +505,100 @@ class DocumentationHandler(ContentHandler):
WHERE "contentType" = 'DOCUMENTATION'::{{schema_prefix}}"ContentType"
AND "contentId" = ANY(ARRAY[{placeholders}])
""",
*doc_paths,
*section_content_ids,
)
existing_ids = {row["contentId"] for row in existing_result}
missing_docs = [
(doc_path, doc_file)
for doc_path, doc_file in zip(doc_paths, all_docs)
if doc_path not in existing_ids
# Filter to missing sections
missing_sections = [
(doc_path, doc_file, section, content_id)
for (doc_path, doc_file, section), content_id in zip(
all_sections, section_content_ids
)
if content_id not in existing_ids
]
# Convert to ContentItem
# Convert to ContentItem (up to batch_size)
items = []
for doc_path, doc_file in missing_docs[:batch_size]:
for doc_path, doc_file, section, content_id in missing_sections[:batch_size]:
try:
title, content = self._extract_title_and_content(doc_file)
# Get document title for context
doc_title = self._extract_doc_title(doc_file)
# Build searchable text
searchable_text = f"{title} {content}"
# Build searchable text with context
# Include doc title and section title for better search relevance
searchable_text = f"{doc_title} - {section.title}\n\n{section.content}"
items.append(
ContentItem(
content_id=doc_path,
content_id=content_id,
content_type=ContentType.DOCUMENTATION,
searchable_text=searchable_text,
metadata={
"title": title,
"doc_title": doc_title,
"section_title": section.title,
"section_index": section.index,
"heading_level": section.level,
"path": doc_path,
},
user_id=None, # Documentation is public
)
)
except Exception as e:
logger.warning(f"Failed to process doc {doc_path}: {e}")
logger.warning(f"Failed to process section {content_id}: {e}")
continue
return items
def _get_all_section_content_ids(self, docs_root: Path) -> set[str]:
"""Get all current section content IDs from the docs directory.
Used for stats and cleanup to know what sections should exist.
"""
all_docs = list(docs_root.rglob("*.md")) + list(docs_root.rglob("*.mdx"))
content_ids = set()
for doc_file in all_docs:
doc_path = str(doc_file.relative_to(docs_root))
sections = self._chunk_markdown_by_headings(doc_file)
for section in sections:
content_ids.add(self._make_section_content_id(doc_path, section.index))
return content_ids
async def get_stats(self) -> dict[str, int]:
"""Get statistics about documentation embedding coverage."""
"""Get statistics about documentation embedding coverage.
Counts sections (not documents) since each section gets its own embedding.
"""
docs_root = self._get_docs_root()
if not docs_root.exists():
return {"total": 0, "with_embeddings": 0, "without_embeddings": 0}
# Count all .md and .mdx files
all_docs = list(docs_root.rglob("*.md")) + list(docs_root.rglob("*.mdx"))
total_docs = len(all_docs)
# Get all section content IDs
all_section_ids = self._get_all_section_content_ids(docs_root)
total_sections = len(all_section_ids)
if total_docs == 0:
if total_sections == 0:
return {"total": 0, "with_embeddings": 0, "without_embeddings": 0}
doc_paths = [str(doc.relative_to(docs_root)) for doc in all_docs]
placeholders = ",".join([f"${i+1}" for i in range(len(doc_paths))])
# Count embeddings in database for DOCUMENTATION type
embedded_result = await query_raw_with_schema(
f"""
"""
SELECT COUNT(*) as count
FROM {{schema_prefix}}"UnifiedContentEmbedding"
WHERE "contentType" = 'DOCUMENTATION'::{{schema_prefix}}"ContentType"
AND "contentId" = ANY(ARRAY[{placeholders}])
""",
*doc_paths,
FROM {schema_prefix}"UnifiedContentEmbedding"
WHERE "contentType" = 'DOCUMENTATION'::{schema_prefix}"ContentType"
"""
)
with_embeddings = embedded_result[0]["count"] if embedded_result else 0
return {
"total": total_docs,
"total": total_sections,
"with_embeddings": with_embeddings,
"without_embeddings": total_docs - with_embeddings,
"without_embeddings": total_sections - with_embeddings,
}

View File

@@ -164,20 +164,20 @@ async def test_documentation_handler_get_missing_items(tmp_path, mocker):
assert len(items) == 2
# Check guide.md
# Check guide.md (content_id format: doc_path::section_index)
guide_item = next(
(item for item in items if item.content_id == "guide.md"), None
(item for item in items if item.content_id == "guide.md::0"), None
)
assert guide_item is not None
assert guide_item.content_type == ContentType.DOCUMENTATION
assert "Getting Started" in guide_item.searchable_text
assert "This is a guide" in guide_item.searchable_text
assert guide_item.metadata["title"] == "Getting Started"
assert guide_item.metadata["doc_title"] == "Getting Started"
assert guide_item.user_id is None
# Check api.mdx
# Check api.mdx (content_id format: doc_path::section_index)
api_item = next(
(item for item in items if item.content_id == "api.mdx"), None
(item for item in items if item.content_id == "api.mdx::0"), None
)
assert api_item is not None
assert "API Reference" in api_item.searchable_text
@@ -218,17 +218,74 @@ async def test_documentation_handler_title_extraction(tmp_path):
# Test with heading
doc_with_heading = tmp_path / "with_heading.md"
doc_with_heading.write_text("# My Title\n\nContent here")
title, content = handler._extract_title_and_content(doc_with_heading)
title = handler._extract_doc_title(doc_with_heading)
assert title == "My Title"
assert "# My Title" not in content
assert "Content here" in content
# Test without heading
doc_without_heading = tmp_path / "no-heading.md"
doc_without_heading.write_text("Just content, no heading")
title, content = handler._extract_title_and_content(doc_without_heading)
title = handler._extract_doc_title(doc_without_heading)
assert title == "No Heading" # Uses filename
assert "Just content" in content
@pytest.mark.asyncio(loop_scope="session")
async def test_documentation_handler_markdown_chunking(tmp_path):
"""Test DocumentationHandler chunks markdown by headings."""
handler = DocumentationHandler()
# Test document with multiple sections
doc_with_sections = tmp_path / "sections.md"
doc_with_sections.write_text(
"# Document Title\n\n"
"Intro paragraph.\n\n"
"## Section One\n\n"
"Content for section one.\n\n"
"## Section Two\n\n"
"Content for section two.\n"
)
sections = handler._chunk_markdown_by_headings(doc_with_sections)
# Should have 3 sections: intro (with doc title), section one, section two
assert len(sections) == 3
assert sections[0].title == "Document Title"
assert sections[0].index == 0
assert "Intro paragraph" in sections[0].content
assert sections[1].title == "Section One"
assert sections[1].index == 1
assert "Content for section one" in sections[1].content
assert sections[2].title == "Section Two"
assert sections[2].index == 2
assert "Content for section two" in sections[2].content
# Test document without headings
doc_no_sections = tmp_path / "no-sections.md"
doc_no_sections.write_text("Just plain content without any headings.")
sections = handler._chunk_markdown_by_headings(doc_no_sections)
assert len(sections) == 1
assert sections[0].index == 0
assert "Just plain content" in sections[0].content
@pytest.mark.asyncio(loop_scope="session")
async def test_documentation_handler_section_content_ids():
"""Test DocumentationHandler creates and parses section content IDs."""
handler = DocumentationHandler()
# Test making content ID
content_id = handler._make_section_content_id("docs/guide.md", 2)
assert content_id == "docs/guide.md::2"
# Test parsing content ID
doc_path, section_index = handler._parse_section_content_id("docs/guide.md::2")
assert doc_path == "docs/guide.md"
assert section_index == 2
# Test parsing legacy format (no section index)
doc_path, section_index = handler._parse_section_content_id("docs/old-format.md")
assert doc_path == "docs/old-format.md"
assert section_index == 0
@pytest.mark.asyncio(loop_scope="session")

View File

@@ -683,20 +683,20 @@ async def cleanup_orphaned_embeddings() -> dict[str, Any]:
current_ids = set(get_blocks().keys())
elif content_type == ContentType.DOCUMENTATION:
from pathlib import Path
# embeddings.py is at: backend/backend/api/features/store/embeddings.py
# Need to go up to project root then into docs/
this_file = Path(__file__)
project_root = (
this_file.parent.parent.parent.parent.parent.parent.parent
# Use DocumentationHandler to get section-based content IDs
from backend.api.features.store.content_handlers import (
DocumentationHandler,
)
docs_root = project_root / "docs"
if docs_root.exists():
all_docs = list(docs_root.rglob("*.md")) + list(
docs_root.rglob("*.mdx")
)
current_ids = {str(doc.relative_to(docs_root)) for doc in all_docs}
doc_handler = CONTENT_HANDLERS.get(ContentType.DOCUMENTATION)
if isinstance(doc_handler, DocumentationHandler):
docs_root = doc_handler._get_docs_root()
if docs_root.exists():
current_ids = doc_handler._get_all_section_content_ids(
docs_root
)
else:
current_ids = set()
else:
current_ids = set()
else:

View File

@@ -3,13 +3,16 @@ Unified Hybrid Search
Combines semantic (embedding) search with lexical (tsvector) search
for improved relevance across all content types (agents, blocks, docs).
Includes BM25 reranking for improved lexical relevance.
"""
import logging
import re
from dataclasses import dataclass
from typing import Any, Literal
from prisma.enums import ContentType
from rank_bm25 import BM25Okapi
from backend.api.features.store.embeddings import (
EMBEDDING_DIM,
@@ -21,6 +24,84 @@ from backend.data.db import query_raw_with_schema
logger = logging.getLogger(__name__)
# ============================================================================
# BM25 Reranking
# ============================================================================
def tokenize(text: str) -> list[str]:
"""Simple tokenizer for BM25 - lowercase and split on non-alphanumeric."""
if not text:
return []
# Lowercase and split on non-alphanumeric characters
tokens = re.findall(r"\b\w+\b", text.lower())
return tokens
def bm25_rerank(
query: str,
results: list[dict[str, Any]],
text_field: str = "searchable_text",
bm25_weight: float = 0.3,
original_score_field: str = "combined_score",
) -> list[dict[str, Any]]:
"""
Rerank search results using BM25.
Combines the original combined_score with BM25 score for improved
lexical relevance, especially for exact term matches.
Args:
query: The search query
results: List of result dicts with text_field and original_score_field
text_field: Field name containing the text to score
bm25_weight: Weight for BM25 score (0-1). Original score gets (1 - bm25_weight)
original_score_field: Field name containing the original score
Returns:
Results list sorted by combined score (BM25 + original)
"""
if not results or not query:
return results
# Extract texts and tokenize
corpus = [tokenize(r.get(text_field, "") or "") for r in results]
# Handle edge case where all documents are empty
if all(len(doc) == 0 for doc in corpus):
return results
# Build BM25 index
bm25 = BM25Okapi(corpus)
# Score query against corpus
query_tokens = tokenize(query)
if not query_tokens:
return results
bm25_scores = bm25.get_scores(query_tokens)
# Normalize BM25 scores to 0-1 range
max_bm25 = max(bm25_scores) if max(bm25_scores) > 0 else 1.0
normalized_bm25 = [s / max_bm25 for s in bm25_scores]
# Combine scores
original_weight = 1.0 - bm25_weight
for i, result in enumerate(results):
original_score = result.get(original_score_field, 0) or 0
result["bm25_score"] = normalized_bm25[i]
final_score = (
original_weight * original_score + bm25_weight * normalized_bm25[i]
)
result["final_score"] = final_score
result["relevance"] = final_score
# Sort by relevance descending
results.sort(key=lambda x: x.get("relevance", 0), reverse=True)
return results
@dataclass
class UnifiedSearchWeights:
"""Weights for unified search (no popularity signal)."""
@@ -273,9 +354,7 @@ async def unified_hybrid_search(
FROM normalized
),
filtered AS (
SELECT
*,
COUNT(*) OVER () as total_count
SELECT *, COUNT(*) OVER () as total_count
FROM scored
WHERE combined_score >= {min_score_param}
)
@@ -289,6 +368,15 @@ async def unified_hybrid_search(
)
total = results[0]["total_count"] if results else 0
# Apply BM25 reranking
if results:
results = bm25_rerank(
query=query,
results=results,
text_field="searchable_text",
bm25_weight=0.3,
original_score_field="combined_score",
)
# Clean up results
for result in results:
@@ -516,6 +604,8 @@ async def hybrid_search(
sa.featured,
sa.is_available,
sa.updated_at,
-- Searchable text for BM25 reranking
COALESCE(sa.agent_name, '') || ' ' || COALESCE(sa.sub_heading, '') || ' ' || COALESCE(sa.description, '') as searchable_text,
-- Semantic score
COALESCE(1 - (uce.embedding <=> {embedding_param}::vector), 0) as semantic_score,
-- Lexical score (raw, will normalize)
@@ -573,6 +663,7 @@ async def hybrid_search(
featured,
is_available,
updated_at,
searchable_text,
semantic_score,
lexical_score,
category_score,
@@ -603,8 +694,19 @@ async def hybrid_search(
total = results[0]["total_count"] if results else 0
# Apply BM25 reranking
if results:
results = bm25_rerank(
query=query,
results=results,
text_field="searchable_text",
bm25_weight=0.3,
original_score_field="combined_score",
)
for result in results:
result.pop("total_count", None)
result.pop("searchable_text", None)
logger.info(f"Hybrid search (store agents): {len(results)} results, {total} total")

View File

@@ -311,11 +311,43 @@ async def test_hybrid_search_min_score_filtering():
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.integration
async def test_hybrid_search_pagination():
"""Test hybrid search pagination."""
"""Test hybrid search pagination.
Pagination happens in SQL (LIMIT/OFFSET), then BM25 reranking is applied
to the paginated results.
"""
# Create mock results that SQL would return for a page
mock_results = [
{
"slug": f"agent-{i}",
"agent_name": f"Agent {i}",
"agent_image": "test.png",
"creator_username": "test",
"creator_avatar": "avatar.png",
"sub_heading": "Test",
"description": "Test description",
"runs": 100 - i,
"rating": 4.5,
"categories": ["test"],
"featured": False,
"is_available": True,
"updated_at": "2024-01-01T00:00:00Z",
"searchable_text": f"Agent {i} test description",
"combined_score": 0.9 - (i * 0.01),
"semantic_score": 0.7,
"lexical_score": 0.6,
"category_score": 0.5,
"recency_score": 0.4,
"popularity_score": 0.3,
"total_count": 25,
}
for i in range(10) # SQL returns page_size results
]
with patch(
"backend.api.features.store.hybrid_search.query_raw_with_schema"
) as mock_query:
mock_query.return_value = []
mock_query.return_value = mock_results
with patch(
"backend.api.features.store.hybrid_search.embed_query"
@@ -329,16 +361,18 @@ async def test_hybrid_search_pagination():
page_size=10,
)
# Verify pagination parameters
# Verify results returned
assert len(results) == 10
assert total == 25 # Total from SQL COUNT(*) OVER()
# Verify the SQL query uses page_size and offset
call_args = mock_query.call_args
params = call_args[0]
# Last two params should be LIMIT and OFFSET
limit = params[-2]
offset = params[-1]
assert limit == 10 # page_size
assert offset == 10 # (page - 1) * page_size = (2 - 1) * 10
# Last two params are page_size and offset
page_size_param = params[-2]
offset_param = params[-1]
assert page_size_param == 10
assert offset_param == 10 # (page 2 - 1) * 10
@pytest.mark.asyncio(loop_scope="session")
@@ -609,14 +643,36 @@ async def test_unified_hybrid_search_empty_query():
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.integration
async def test_unified_hybrid_search_pagination():
"""Test unified search pagination."""
"""Test unified search pagination with BM25 reranking.
Pagination happens in SQL (LIMIT/OFFSET), then BM25 reranking is applied
to the paginated results.
"""
# Create mock results that SQL would return for a page
mock_results = [
{
"content_type": "STORE_AGENT",
"content_id": f"agent-{i}",
"searchable_text": f"Agent {i} description",
"metadata": {"name": f"Agent {i}"},
"updated_at": "2025-01-01T00:00:00Z",
"semantic_score": 0.7,
"lexical_score": 0.8 - (i * 0.01),
"category_score": 0.5,
"recency_score": 0.3,
"combined_score": 0.6 - (i * 0.01),
"total_count": 50,
}
for i in range(15) # SQL returns page_size results
]
with patch(
"backend.api.features.store.hybrid_search.query_raw_with_schema"
) as mock_query:
with patch(
"backend.api.features.store.hybrid_search.embed_query"
) as mock_embed:
mock_query.return_value = []
mock_query.return_value = mock_results
mock_embed.return_value = [0.1] * embeddings.EMBEDDING_DIM
results, total = await unified_hybrid_search(
@@ -625,15 +681,18 @@ async def test_unified_hybrid_search_pagination():
page_size=15,
)
# Verify pagination parameters (last two params are LIMIT and OFFSET)
# Verify results returned
assert len(results) == 15
assert total == 50 # Total from SQL COUNT(*) OVER()
# Verify the SQL query uses page_size and offset
call_args = mock_query.call_args
params = call_args[0]
limit = params[-2]
offset = params[-1]
assert limit == 15 # page_size
assert offset == 30 # (page - 1) * page_size = (3 - 1) * 15
# Last two params are page_size and offset
page_size_param = params[-2]
offset_param = params[-1]
assert page_size_param == 15
assert offset_param == 30 # (page 3 - 1) * 15
@pytest.mark.asyncio(loop_scope="session")

View File

@@ -680,12 +680,23 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
return False, reviewed_data
async def _execute(self, input_data: BlockInput, **kwargs) -> BlockOutput:
# Check for review requirement and get potentially modified input data
should_pause, input_data = await self.is_block_exec_need_review(
input_data, **kwargs
# Check for review requirement only if running within a graph execution context
# Direct block execution (e.g., from chat) skips the review process
has_graph_context = all(
key in kwargs
for key in (
"node_exec_id",
"graph_exec_id",
"graph_id",
"execution_context",
)
)
if should_pause:
return
if has_graph_context:
should_pause, input_data = await self.is_block_exec_need_review(
input_data, **kwargs
)
if should_pause:
return
# Validate the input data (original or reviewer-modified) once
if error := self.input_schema.validate_data(input_data):

View File

@@ -602,6 +602,18 @@ class Scheduler(AppService):
self.scheduler.add_listener(job_max_instances_listener, EVENT_JOB_MAX_INSTANCES)
self.scheduler.start()
# Run embedding backfill immediately on startup
# This ensures blocks/docs are searchable right away, not after 6 hours
# Safe to run on multiple pods - uses upserts and checks for existing embeddings
if self.register_system_tasks:
logger.info("Running embedding backfill on startup...")
try:
result = ensure_embeddings_coverage()
logger.info(f"Startup embedding backfill complete: {result}")
except Exception as e:
logger.error(f"Startup embedding backfill failed: {e}")
# Don't fail startup - the scheduled job will retry later
# Keep the service running since BackgroundScheduler doesn't block
super().run_service()

View File

@@ -5339,6 +5339,24 @@ urllib3 = ">=1.26.14,<3"
fastembed = ["fastembed (>=0.7,<0.8)"]
fastembed-gpu = ["fastembed-gpu (>=0.7,<0.8)"]
[[package]]
name = "rank-bm25"
version = "0.2.2"
description = "Various BM25 algorithms for document ranking"
optional = false
python-versions = "*"
groups = ["main"]
files = [
{file = "rank_bm25-0.2.2-py3-none-any.whl", hash = "sha256:7bd4a95571adadfc271746fa146a4bcfd89c0cf731e49c3d1ad863290adbe8ae"},
{file = "rank_bm25-0.2.2.tar.gz", hash = "sha256:096ccef76f8188563419aaf384a02f0ea459503fdf77901378d4fd9d87e5e51d"},
]
[package.dependencies]
numpy = "*"
[package.extras]
dev = ["pytest"]
[[package]]
name = "rapidfuzz"
version = "3.13.0"
@@ -7494,4 +7512,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<3.14"
content-hash = "86838b5ae40d606d6e01a14dad8a56c389d890d7a6a0c274a6602cca80f0df84"
content-hash = "18b92e09596298c82432e4d0a85cb6d80a40b4229bee0a0c15f0529fd6cb21a4"

View File

@@ -46,6 +46,7 @@ poetry = "2.1.1" # CHECK DEPENDABOT SUPPORT BEFORE UPGRADING
postmarker = "^1.0"
praw = "~7.8.1"
prisma = "^0.15.0"
rank-bm25 = "^0.2.2"
prometheus-client = "^0.22.1"
prometheus-fastapi-instrumentator = "^7.0.0"
psutil = "^7.0.0"