Merge remote-tracking branch 'origin/hackathon/copilot' into hackathon/copilot

This commit is contained in:
Lluis Agusti
2025-12-16 18:32:46 +01:00
8 changed files with 1400 additions and 58 deletions

View File

@@ -12,7 +12,7 @@ class ChatConfig(BaseSettings):
# OpenAI API Configuration
model: str = Field(
default="qwen/qwen3-235b-a22b-2507", description="Default model to use"
default="anthropic/claude-opus-4.5", description="Default model to use"
)
api_key: str | None = Field(default=None, description="OpenAI API key")
base_url: str | None = Field(
@@ -72,8 +72,31 @@ class ChatConfig(BaseSettings):
v = "https://openrouter.ai/api/v1"
return v
# Prompt paths for different contexts
PROMPT_PATHS: dict[str, str] = {
"default": "prompts/chat_system.md",
"onboarding": "prompts/onboarding_system.md",
}
def get_system_prompt_for_type(
self, prompt_type: str = "default", **template_vars
) -> str:
"""Load and render a system prompt by type.
Args:
prompt_type: The type of prompt to load ("default" or "onboarding")
**template_vars: Variables to substitute in the template
Returns:
Rendered system prompt string
"""
prompt_path_str = self.PROMPT_PATHS.get(
prompt_type, self.PROMPT_PATHS["default"]
)
return self._load_prompt_from_path(prompt_path_str, **template_vars)
def get_system_prompt(self, **template_vars) -> str:
"""Load and render the system prompt from file.
"""Load and render the default system prompt from file.
Args:
**template_vars: Variables to substitute in the template
@@ -82,9 +105,21 @@ class ChatConfig(BaseSettings):
Rendered system prompt string
"""
return self._load_prompt_from_path(self.system_prompt_path, **template_vars)
def _load_prompt_from_path(self, prompt_path_str: str, **template_vars) -> str:
"""Load and render a system prompt from a given path.
Args:
prompt_path_str: Path to the prompt file relative to chat module
**template_vars: Variables to substitute in the template
Returns:
Rendered system prompt string
"""
# Get the path relative to this module
module_dir = Path(__file__).parent
prompt_path = module_dir / self.system_prompt_path
prompt_path = module_dir / prompt_path_str
# Check for .j2 extension first (Jinja2 template)
j2_path = Path(str(prompt_path) + ".j2")

View File

@@ -0,0 +1,155 @@
You are Otto, an AI Co-Pilot helping new users get started with AutoGPT, an AI Business Automation platform. Your mission is to welcome them, learn about their needs, and help them run their first successful agent.
Here are the functions available to you:
<functions>
**Understanding & Discovery:**
1. **add_understanding** - Save information about the user's business context (use this as you learn about them)
2. **find_agent** - Search the marketplace for pre-built agents that solve the user's problem
3. **find_library_agent** - Search the user's personal library of saved agents
4. **find_block** - Search for individual blocks (building components for agents)
5. **search_platform_docs** - Search AutoGPT documentation for help
**Agent Creation & Editing:**
6. **create_agent** - Create a new custom agent from scratch based on user requirements
7. **edit_agent** - Modify an existing agent (add/remove blocks, change configuration)
**Execution & Output:**
8. **run_agent** - Run or schedule an agent (automatically handles setup)
9. **run_block** - Run a single block directly without creating an agent
10. **agent_output** - Get the output/results from a running or completed agent execution
</functions>
## YOUR ONBOARDING MISSION
You are guiding a new user through their first experience with AutoGPT. Your goal is to:
1. Welcome them warmly and get their name
2. Learn about them and their business
3. Find or create an agent that solves a real problem for them
4. Get that agent running successfully
5. Celebrate their success and point them to next steps
## PHASE 1: WELCOME & INTRODUCTION
**Start every conversation by:**
- Giving a warm, friendly greeting
- Introducing yourself as Otto, their AI assistant
- Asking for their name immediately
**Example opening:**
```
Hi! I'm Otto, your AI assistant. Welcome to AutoGPT! I'm here to help you set up your first automation. What's your name?
```
Once you have their name, save it immediately with `add_understanding(user_name="...")` and use it throughout.
## PHASE 2: DISCOVERY
**After getting their name, learn about them:**
- What's their role/job title?
- What industry/business are they in?
- What's one thing they'd love to automate?
**Keep it conversational - don't interrogate. Example:**
```
Nice to meet you, Sarah! What do you do for work, and what's one task you wish you could automate?
```
Save everything you learn with `add_understanding`.
## PHASE 3: FIND OR CREATE AN AGENT
**Once you understand their need:**
- Search for existing agents with `find_agent`
- Present the best match and explain how it helps them
- If nothing fits, offer to create a custom agent with `create_agent`
**Be enthusiastic about the solution:**
```
I found a great agent for you! The "Social Media Scheduler" can automatically post to your accounts on a schedule. Want to try it?
```
## PHASE 4: SETUP & RUN
**Guide them through running the agent:**
1. Call `run_agent` without inputs first to see what's needed
2. Explain each input in simple terms
3. Ask what values they want to use
4. Run the agent with their inputs or defaults
**Don't mention credentials** - the UI handles that automatically.
## PHASE 5: CELEBRATE & HANDOFF
**After successful execution:**
- Congratulate them on their first automation!
- Tell them where to find this agent (their Library)
- Mention they can explore more agents in the Marketplace
- Offer to help with anything else
**Example:**
```
You did it! Your first agent is running. You can find it anytime in your Library. Ready to explore more automations?
```
## KEY RULES
**What You DON'T Do:**
- Don't help with login (frontend handles this)
- Don't mention credentials (UI handles automatically)
- Don't run agents without showing inputs first
- Don't use `use_defaults=true` without explicit confirmation
- Don't write responses longer than 3 sentences
- Don't overwhelm with too many questions at once
**What You DO:**
- ALWAYS get the user's name first
- Be warm, encouraging, and celebratory
- Save info with `add_understanding` as you learn it
- Use their name when addressing them
- Keep responses to maximum 3 sentences
- Make them feel successful at each step
## USING add_understanding
Save information as you learn it:
**User info:** `user_name`, `job_title`
**Business:** `business_name`, `industry`, `business_size`, `user_role`
**Pain points:** `pain_points`, `manual_tasks`, `automation_goals`
**Tools:** `current_software`
Example: `add_understanding(user_name="Sarah", job_title="Marketing Manager", automation_goals=["social media scheduling"])`
## HOW run_agent WORKS
1. **First call** (no inputs) → Shows available inputs
2. **Credentials** → UI handles automatically (don't mention)
3. **Execution** → Run with `inputs={...}` or `use_defaults=true`
## RESPONSE STRUCTURE
Before responding, plan your approach in <thinking> tags:
- What phase am I in? (Welcome/Discovery/Find/Setup/Celebrate)
- Do I know their name? If not, ask for it
- What's the next step to move them forward?
- Keep response under 3 sentences
**Example flow:**
```
User: "Hi"
Otto: <thinking>Phase 1 - I need to welcome them and get their name.</thinking>
Hi! I'm Otto, welcome to AutoGPT! I'm here to help you set up your first automation - what's your name?
User: "I'm Alex"
Otto: [calls add_understanding with user_name="Alex"]
<thinking>Got their name. Phase 2 - learn about them.</thinking>
Great to meet you, Alex! What do you do for work, and what's one task you'd love to automate?
User: "I run an e-commerce store and spend hours on customer support emails"
Otto: [calls add_understanding with industry="e-commerce", pain_points=["customer support emails"]]
<thinking>Phase 3 - search for agents.</thinking>
[calls find_agent with query="customer support email automation"]
```
KEEP ANSWERS TO 3 SENTENCES - Be warm, helpful, and focused on their success!

View File

@@ -310,6 +310,125 @@ async def session_assign_user(
return {"status": "ok"}
# ========== Onboarding Routes ==========
# These routes use a specialized onboarding system prompt
@router.post(
"/onboarding/sessions",
)
async def create_onboarding_session(
user_id: Annotated[str | None, Depends(auth.get_user_id)],
) -> CreateSessionResponse:
"""
Create a new onboarding chat session.
Initiates a new chat session specifically for user onboarding,
using a specialized prompt that guides users through their first
experience with AutoGPT.
Args:
user_id: The optional authenticated user ID parsed from the JWT.
Returns:
CreateSessionResponse: Details of the created onboarding session.
"""
logger.info(
f"Creating onboarding session with user_id: "
f"...{user_id[-8:] if user_id and len(user_id) > 8 else '<redacted>'}"
)
session = await chat_service.create_chat_session(user_id)
return CreateSessionResponse(
id=session.session_id,
created_at=session.started_at.isoformat(),
user_id=session.user_id or None,
)
@router.get(
"/onboarding/sessions/{session_id}",
)
async def get_onboarding_session(
session_id: str,
user_id: Annotated[str | None, Depends(auth.get_user_id)],
) -> SessionDetailResponse:
"""
Retrieve the details of an onboarding chat session.
Args:
session_id: The unique identifier for the onboarding session.
user_id: The optional authenticated user ID.
Returns:
SessionDetailResponse: Details for the requested session.
"""
session = await chat_service.get_session(session_id, user_id)
if not session:
raise NotFoundError(f"Session {session_id} not found")
return SessionDetailResponse(
id=session.session_id,
created_at=session.started_at.isoformat(),
updated_at=session.updated_at.isoformat(),
user_id=session.user_id or None,
messages=[message.model_dump() for message in session.messages],
)
@router.post(
"/onboarding/sessions/{session_id}/stream",
)
async def stream_onboarding_chat(
session_id: str,
request: StreamChatRequest,
user_id: str | None = Depends(auth.get_user_id),
):
"""
Stream onboarding chat responses for a session.
Uses the specialized onboarding system prompt to guide new users
through their first experience with AutoGPT. Streams AI responses
in real time over Server-Sent Events (SSE).
Args:
session_id: The onboarding session identifier.
request: Request body containing message and optional context.
user_id: Optional authenticated user ID.
Returns:
StreamingResponse: SSE-formatted response chunks.
"""
session = await chat_service.get_session(session_id, user_id)
if not session:
raise NotFoundError(f"Session {session_id} not found.")
if session.user_id is None and user_id is not None:
session = await chat_service.assign_user_to_session(session_id, user_id)
async def event_generator() -> AsyncGenerator[str, None]:
async for chunk in chat_service.stream_chat_completion(
session_id,
request.message,
is_user_message=request.is_user_message,
user_id=user_id,
session=session,
context=request.context,
prompt_type="onboarding", # Use onboarding system prompt
):
yield chunk.to_sse()
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
# ========== Health Check ==========

View File

@@ -37,10 +37,20 @@ config = backend.server.v2.chat.config.ChatConfig()
client = AsyncOpenAI(api_key=config.api_key, base_url=config.base_url)
async def _build_system_prompt(user_id: str | None) -> str:
"""Build the full system prompt including business understanding if available."""
# Start with the base system prompt
base_prompt = config.get_system_prompt()
async def _build_system_prompt(
user_id: str | None, prompt_type: str = "default"
) -> str:
"""Build the full system prompt including business understanding if available.
Args:
user_id: The user ID for fetching business understanding
prompt_type: The type of prompt to load ("default" or "onboarding")
Returns:
The full system prompt with business understanding context if available
"""
# Start with the base system prompt for the specified type
base_prompt = config.get_system_prompt_for_type(prompt_type)
# If user is authenticated, try to fetch their business understanding
if user_id:
@@ -118,6 +128,7 @@ async def stream_chat_completion(
retry_count: int = 0,
session: ChatSession | None = None,
context: dict[str, str] | None = None, # {url: str, content: str}
prompt_type: str = "default",
) -> AsyncGenerator[StreamBaseResponse, None]:
"""Main entry point for streaming chat completions with database handling.
@@ -129,6 +140,7 @@ async def stream_chat_completion(
user_message: User's input message
user_id: User ID for authentication (None for anonymous)
session: Optional pre-loaded session object (for recursive calls to avoid Redis refetch)
prompt_type: The type of prompt to use ("default" or "onboarding")
Yields:
StreamBaseResponse objects formatted as SSE
@@ -191,7 +203,7 @@ async def stream_chat_completion(
assert session, "Session not found"
# Build system prompt with business understanding
system_prompt = await _build_system_prompt(user_id)
system_prompt = await _build_system_prompt(user_id, prompt_type)
assistant_response = ChatMessage(
role="assistant",
@@ -332,6 +344,7 @@ async def stream_chat_completion(
user_id=user_id,
retry_count=retry_count + 1,
session=session,
prompt_type=prompt_type,
):
yield chunk
return # Exit after retry to avoid double-saving in finally block
@@ -377,6 +390,7 @@ async def stream_chat_completion(
session_id=session.session_id,
user_id=user_id,
session=session, # Pass session object to avoid Redis refetch
prompt_type=prompt_type,
):
yield chunk

File diff suppressed because one or more lines are too long

View File

@@ -1,4 +1,4 @@
"""Tool for searching available blocks."""
"""Tool for searching available blocks using hybrid search."""
import logging
from typing import Any
@@ -13,6 +13,7 @@ from backend.server.v2.chat.tools.models import (
NoResultsResponse,
ToolResponseBase,
)
from backend.server.v2.chat.tools.search_blocks import get_block_search_index
logger = logging.getLogger(__name__)
@@ -114,59 +115,54 @@ class FindBlockTool(BaseTool):
)
try:
all_blocks = load_all_blocks()
logger.info(f"Searching {len(all_blocks)} blocks for: {query}")
# Try hybrid search first
search_results = self._hybrid_search(query)
# Find matching blocks with priority scores
matches: list[tuple[int, Any]] = []
for block_id, block_cls in all_blocks.items():
block = block_cls()
priority, is_match = self._matches_query(block, query)
if is_match:
matches.append((priority, block))
# Sort by priority (lower is better)
matches.sort(key=lambda x: x[0])
# Take top 10 results
top_matches = [block for _, block in matches[:10]]
if not top_matches:
return NoResultsResponse(
message=f"No blocks found matching '{query}'",
session_id=session_id,
suggestions=[
"Try more general terms",
"Search by category: ai, text, social, search, etc.",
"Check block names like 'SendEmail', 'HttpRequest', etc.",
],
)
# Build response
blocks = []
for block in top_matches:
blocks.append(
BlockInfoSummary(
id=block.id,
name=block.name,
description=block.description,
categories=[cat.name for cat in block.categories],
input_schema=block.input_schema.jsonschema(),
output_schema=block.output_schema.jsonschema(),
if search_results is not None:
# Hybrid search succeeded
if not search_results:
return NoResultsResponse(
message=f"No blocks found matching '{query}'",
session_id=session_id,
suggestions=[
"Try more general terms",
"Search by category: ai, text, social, search, etc.",
"Check block names like 'SendEmail', 'HttpRequest', etc.",
],
)
# Get full block info for each result
all_blocks = load_all_blocks()
blocks = []
for result in search_results:
block_cls = all_blocks.get(result.block_id)
if block_cls:
block = block_cls()
blocks.append(
BlockInfoSummary(
id=block.id,
name=block.name,
description=block.description,
categories=[cat.name for cat in block.categories],
input_schema=block.input_schema.jsonschema(),
output_schema=block.output_schema.jsonschema(),
)
)
return BlockListResponse(
message=(
f"Found {len(blocks)} block{'s' if len(blocks) != 1 else ''} "
f"matching '{query}'. Use run_block to execute a block with "
"the required inputs."
),
blocks=blocks,
count=len(blocks),
query=query,
session_id=session_id,
)
return BlockListResponse(
message=(
f"Found {len(blocks)} block{'s' if len(blocks) != 1 else ''} "
f"matching '{query}'. Use run_block to execute a block with "
"the required inputs."
),
blocks=blocks,
count=len(blocks),
query=query,
session_id=session_id,
)
# Fallback to simple search if hybrid search failed
return self._simple_search(query, session_id)
except Exception as e:
logger.error(f"Error searching blocks: {e}", exc_info=True)
@@ -175,3 +171,82 @@ class FindBlockTool(BaseTool):
error=str(e),
session_id=session_id,
)
def _hybrid_search(self, query: str) -> list | None:
"""
Perform hybrid search using embeddings and BM25.
Returns:
List of BlockSearchResult if successful, None if index not available
"""
try:
index = get_block_search_index()
if not index.load():
logger.info(
"Block search index not available, falling back to simple search"
)
return None
results = index.search(query, top_k=10)
logger.info(f"Hybrid search found {len(results)} blocks for: {query}")
return results
except Exception as e:
logger.warning(f"Hybrid search failed, falling back to simple: {e}")
return None
def _simple_search(self, query: str, session_id: str) -> ToolResponseBase:
"""Fallback simple search using substring matching."""
all_blocks = load_all_blocks()
logger.info(f"Simple searching {len(all_blocks)} blocks for: {query}")
# Find matching blocks with priority scores
matches: list[tuple[int, Any]] = []
for block_id, block_cls in all_blocks.items():
block = block_cls()
priority, is_match = self._matches_query(block, query)
if is_match:
matches.append((priority, block))
# Sort by priority (lower is better)
matches.sort(key=lambda x: x[0])
# Take top 10 results
top_matches = [block for _, block in matches[:10]]
if not top_matches:
return NoResultsResponse(
message=f"No blocks found matching '{query}'",
session_id=session_id,
suggestions=[
"Try more general terms",
"Search by category: ai, text, social, search, etc.",
"Check block names like 'SendEmail', 'HttpRequest', etc.",
],
)
# Build response
blocks = []
for block in top_matches:
blocks.append(
BlockInfoSummary(
id=block.id,
name=block.name,
description=block.description,
categories=[cat.name for cat in block.categories],
input_schema=block.input_schema.jsonschema(),
output_schema=block.output_schema.jsonschema(),
)
)
return BlockListResponse(
message=(
f"Found {len(blocks)} block{'s' if len(blocks) != 1 else ''} "
f"matching '{query}'. Use run_block to execute a block with "
"the required inputs."
),
blocks=blocks,
count=len(blocks),
query=query,
session_id=session_id,
)

View File

@@ -0,0 +1,483 @@
#!/usr/bin/env python3
"""
Block Indexer for Hybrid Search
Creates a hybrid search index from blocks:
- OpenAI embeddings (text-embedding-3-small)
- BM25 index for lexical search
- Name index for title matching boost
Supports incremental updates by tracking content hashes.
Usage:
python -m backend.server.v2.chat.tools.index_blocks [--force]
"""
import argparse
import base64
import hashlib
import json
import logging
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
import numpy as np
logger = logging.getLogger(__name__)
# Optional imports with graceful fallback
try:
from openai import OpenAI
HAS_OPENAI = True
except ImportError:
HAS_OPENAI = False
print("Warning: openai not installed. Run: pip install openai")
# Default embedding model (OpenAI)
DEFAULT_EMBEDDING_MODEL = "text-embedding-3-small"
DEFAULT_EMBEDDING_DIM = 1536
# Output path (relative to this file)
INDEX_PATH = Path(__file__).parent / "blocks_index.json"
# Stopwords for tokenization
STOPWORDS = {
"the",
"a",
"an",
"is",
"are",
"was",
"were",
"be",
"been",
"being",
"have",
"has",
"had",
"do",
"does",
"did",
"will",
"would",
"could",
"should",
"may",
"might",
"must",
"shall",
"can",
"need",
"dare",
"ought",
"used",
"to",
"of",
"in",
"for",
"on",
"with",
"at",
"by",
"from",
"as",
"into",
"through",
"during",
"before",
"after",
"above",
"below",
"between",
"under",
"again",
"further",
"then",
"once",
"and",
"but",
"or",
"nor",
"so",
"yet",
"both",
"either",
"neither",
"not",
"only",
"own",
"same",
"than",
"too",
"very",
"just",
"also",
"now",
"here",
"there",
"when",
"where",
"why",
"how",
"all",
"each",
"every",
"few",
"more",
"most",
"other",
"some",
"such",
"no",
"any",
"this",
"that",
"these",
"those",
"it",
"its",
"block", # Too common in block context
}
def tokenize(text: str) -> list[str]:
"""Simple tokenizer for BM25."""
text = text.lower()
# Remove code blocks if any
text = re.sub(r"```[\s\S]*?```", "", text)
text = re.sub(r"`[^`]+`", "", text)
# Extract words (including camelCase split)
# First, split camelCase
text = re.sub(r"([a-z])([A-Z])", r"\1 \2", text)
# Extract words
words = re.findall(r"\b[a-z][a-z0-9_-]*\b", text)
# Remove very short words and stopwords
return [w for w in words if len(w) > 2 and w not in STOPWORDS]
def build_searchable_text(block: Any) -> str:
"""Build searchable text from block attributes."""
parts = []
# Block name (split camelCase for better tokenization)
name = block.name
# Split camelCase: GetCurrentTimeBlock -> Get Current Time Block
name_split = re.sub(r"([a-z])([A-Z])", r"\1 \2", name)
parts.append(name_split)
# Description
if block.description:
parts.append(block.description)
# Categories
for category in block.categories:
parts.append(category.name)
# Input schema field names and descriptions
try:
input_schema = block.input_schema.jsonschema()
if "properties" in input_schema:
for field_name, field_info in input_schema["properties"].items():
parts.append(field_name)
if "description" in field_info:
parts.append(field_info["description"])
except Exception:
pass
# Output schema field names
try:
output_schema = block.output_schema.jsonschema()
if "properties" in output_schema:
for field_name in output_schema["properties"]:
parts.append(field_name)
except Exception:
pass
return " ".join(parts)
def compute_content_hash(text: str) -> str:
"""Compute MD5 hash of text for change detection."""
return hashlib.md5(text.encode()).hexdigest()
def load_existing_index(index_path: Path) -> dict[str, Any] | None:
"""Load existing index if present."""
if not index_path.exists():
return None
try:
with open(index_path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
logger.warning(f"Failed to load existing index: {e}")
return None
def create_embeddings(
texts: list[str],
model_name: str = DEFAULT_EMBEDDING_MODEL,
batch_size: int = 100,
) -> np.ndarray:
"""Create embeddings using OpenAI API."""
if not HAS_OPENAI:
raise RuntimeError("openai not installed. Run: pip install openai")
# Import here to satisfy type checker
from openai import OpenAI
# Check for API key
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise RuntimeError("OPENAI_API_KEY environment variable not set")
client = OpenAI(api_key=api_key)
embeddings = []
print(f"Creating embeddings for {len(texts)} texts using {model_name}...")
for i in range(0, len(texts), batch_size):
batch = texts[i : i + batch_size]
# Truncate texts to max token limit (8191 tokens for text-embedding-3-small)
# Roughly 4 chars per token, so ~32000 chars max
batch = [text[:32000] for text in batch]
response = client.embeddings.create(
model=model_name,
input=batch,
)
for embedding_data in response.data:
embeddings.append(embedding_data.embedding)
print(f" Processed {min(i + batch_size, len(texts))}/{len(texts)} texts")
return np.array(embeddings, dtype=np.float32)
def build_bm25_data(
blocks_data: list[dict[str, Any]],
) -> dict[str, Any]:
"""Build BM25 metadata from block data."""
# Tokenize all searchable texts
tokenized_docs = []
for block in blocks_data:
tokens = tokenize(block["searchable_text"])
tokenized_docs.append(tokens)
# Calculate document frequencies
doc_freq: dict[str, int] = {}
for tokens in tokenized_docs:
seen = set()
for token in tokens:
if token not in seen:
doc_freq[token] = doc_freq.get(token, 0) + 1
seen.add(token)
n_docs = len(tokenized_docs)
doc_lens = [len(d) for d in tokenized_docs]
avgdl = sum(doc_lens) / max(n_docs, 1)
return {
"n_docs": n_docs,
"avgdl": avgdl,
"df": doc_freq,
"doc_lens": doc_lens,
}
def build_name_index(
blocks_data: list[dict[str, Any]],
) -> dict[str, list[list[int | float]]]:
"""Build inverted index for name search boost."""
index: dict[str, list[list[int | float]]] = defaultdict(list)
for idx, block in enumerate(blocks_data):
# Tokenize block name
name_tokens = tokenize(block["name"])
seen = set()
for i, token in enumerate(name_tokens):
if token in seen:
continue
seen.add(token)
# Score: first token gets higher weight
score = 1.5 if i == 0 else 1.0
index[token].append([idx, score])
return dict(index)
def build_block_index(
force_rebuild: bool = False,
output_path: Path = INDEX_PATH,
) -> dict[str, Any]:
"""
Build the block search index.
Args:
force_rebuild: If True, rebuild all embeddings even if unchanged
output_path: Path to save the index
Returns:
The generated index dictionary
"""
# Import here to avoid circular imports
from backend.blocks import load_all_blocks
print("Loading all blocks...")
all_blocks = load_all_blocks()
print(f"Found {len(all_blocks)} blocks")
# Load existing index for incremental updates
existing_index = None if force_rebuild else load_existing_index(output_path)
existing_blocks: dict[str, dict[str, Any]] = {}
if existing_index:
print(
f"Loaded existing index with {len(existing_index.get('blocks', []))} blocks"
)
for block in existing_index.get("blocks", []):
existing_blocks[block["id"]] = block
# Process each block
blocks_data: list[dict[str, Any]] = []
blocks_needing_embedding: list[tuple[int, str]] = [] # (index, searchable_text)
for block_id, block_cls in all_blocks.items():
try:
block = block_cls()
# Skip disabled blocks
if block.disabled:
continue
searchable_text = build_searchable_text(block)
content_hash = compute_content_hash(searchable_text)
block_data = {
"id": block.id,
"name": block.name,
"description": block.description,
"categories": [cat.name for cat in block.categories],
"searchable_text": searchable_text,
"content_hash": content_hash,
"emb": None, # Will be filled later
}
# Check if we can reuse existing embedding
if (
block.id in existing_blocks
and existing_blocks[block.id].get("content_hash") == content_hash
and existing_blocks[block.id].get("emb")
):
# Reuse existing embedding
block_data["emb"] = existing_blocks[block.id]["emb"]
else:
# Need new embedding
blocks_needing_embedding.append((len(blocks_data), searchable_text))
blocks_data.append(block_data)
except Exception as e:
logger.warning(f"Failed to process block {block_id}: {e}")
continue
print(f"Processed {len(blocks_data)} blocks")
print(f"Blocks needing new embeddings: {len(blocks_needing_embedding)}")
# Create embeddings for new/changed blocks
if blocks_needing_embedding and HAS_OPENAI:
texts_to_embed = [text for _, text in blocks_needing_embedding]
try:
embeddings = create_embeddings(texts_to_embed)
# Assign embeddings to blocks
for i, (block_idx, _) in enumerate(blocks_needing_embedding):
emb = embeddings[i].astype(np.float32)
# Encode as base64
blocks_data[block_idx]["emb"] = base64.b64encode(emb.tobytes()).decode(
"ascii"
)
except Exception as e:
print(f"Warning: Failed to create embeddings: {e}")
elif blocks_needing_embedding:
print(
"Warning: Cannot create embeddings (openai not installed or OPENAI_API_KEY not set)"
)
# Build BM25 data
print("Building BM25 index...")
bm25_data = build_bm25_data(blocks_data)
# Build name index
print("Building name index...")
name_index = build_name_index(blocks_data)
# Build final index
index = {
"version": "1.0.0",
"embedding_model": DEFAULT_EMBEDDING_MODEL,
"embedding_dim": DEFAULT_EMBEDDING_DIM,
"generated_at": datetime.now(timezone.utc).isoformat(),
"blocks": blocks_data,
"bm25": bm25_data,
"name_index": name_index,
}
# Save index
print(f"Saving index to {output_path}...")
with open(output_path, "w", encoding="utf-8") as f:
json.dump(index, f, separators=(",", ":"))
size_kb = output_path.stat().st_size / 1024
print(f"Index saved ({size_kb:.1f} KB)")
# Print statistics
print("\nIndex Statistics:")
print(f" Blocks indexed: {len(blocks_data)}")
print(f" BM25 vocabulary size: {len(bm25_data['df'])}")
print(f" Name index terms: {len(name_index)}")
print(f" Embeddings: {'Yes' if any(b.get('emb') for b in blocks_data) else 'No'}")
return index
def main():
parser = argparse.ArgumentParser(description="Build hybrid search index for blocks")
parser.add_argument(
"--force",
action="store_true",
help="Force rebuild all embeddings even if unchanged",
)
parser.add_argument(
"--output",
type=Path,
default=INDEX_PATH,
help=f"Output index file path (default: {INDEX_PATH})",
)
args = parser.parse_args()
try:
build_block_index(
force_rebuild=args.force,
output_path=args.output,
)
except Exception as e:
print(f"Error building index: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,460 @@
"""
Block Hybrid Search
Combines multiple ranking signals for block search:
- Semantic search (OpenAI embeddings + cosine similarity)
- Lexical search (BM25)
- Name matching (boost for block name matches)
- Category matching (boost for category matches)
Based on the docs search implementation.
"""
import base64
import json
import logging
import math
import os
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Optional
import numpy as np
logger = logging.getLogger(__name__)
# OpenAI embedding model
EMBEDDING_MODEL = "text-embedding-3-small"
# Path to the JSON index file
INDEX_PATH = Path(__file__).parent / "blocks_index.json"
# Stopwords for tokenization (same as index_blocks.py)
STOPWORDS = {
"the",
"a",
"an",
"is",
"are",
"was",
"were",
"be",
"been",
"being",
"have",
"has",
"had",
"do",
"does",
"did",
"will",
"would",
"could",
"should",
"may",
"might",
"must",
"shall",
"can",
"need",
"dare",
"ought",
"used",
"to",
"of",
"in",
"for",
"on",
"with",
"at",
"by",
"from",
"as",
"into",
"through",
"during",
"before",
"after",
"above",
"below",
"between",
"under",
"again",
"further",
"then",
"once",
"and",
"but",
"or",
"nor",
"so",
"yet",
"both",
"either",
"neither",
"not",
"only",
"own",
"same",
"than",
"too",
"very",
"just",
"also",
"now",
"here",
"there",
"when",
"where",
"why",
"how",
"all",
"each",
"every",
"few",
"more",
"most",
"other",
"some",
"such",
"no",
"any",
"this",
"that",
"these",
"those",
"it",
"its",
"block",
}
def tokenize(text: str) -> list[str]:
"""Simple tokenizer for search."""
text = text.lower()
# Remove code blocks if any
text = re.sub(r"```[\s\S]*?```", "", text)
text = re.sub(r"`[^`]+`", "", text)
# Split camelCase
text = re.sub(r"([a-z])([A-Z])", r"\1 \2", text)
# Extract words
words = re.findall(r"\b[a-z][a-z0-9_-]*\b", text)
# Remove very short words and stopwords
return [w for w in words if len(w) > 2 and w not in STOPWORDS]
@dataclass
class SearchWeights:
"""Configuration for hybrid search signal weights."""
semantic: float = 0.40 # Embedding similarity
bm25: float = 0.25 # Lexical matching
name_match: float = 0.25 # Block name matches
category_match: float = 0.10 # Category matches
@dataclass
class BlockSearchResult:
"""A single block search result."""
block_id: str
name: str
description: str
categories: list[str]
score: float
# Individual signal scores (for debugging)
semantic_score: float = 0.0
bm25_score: float = 0.0
name_score: float = 0.0
category_score: float = 0.0
class BlockSearchIndex:
"""Hybrid search index for blocks combining BM25 + embeddings."""
def __init__(self, index_path: Path = INDEX_PATH):
self.blocks: list[dict[str, Any]] = []
self.bm25_data: dict[str, Any] = {}
self.name_index: dict[str, list[list[int | float]]] = {}
self.embeddings: Optional[np.ndarray] = None
self.normalized_embeddings: Optional[np.ndarray] = None
self._loaded = False
self._index_path = index_path
self._embedding_model: Any = None
def load(self) -> bool:
"""Load the index from JSON file."""
if self._loaded:
return True
if not self._index_path.exists():
logger.warning(f"Block index not found at {self._index_path}")
return False
try:
with open(self._index_path, "r", encoding="utf-8") as f:
data = json.load(f)
self.blocks = data.get("blocks", [])
self.bm25_data = data.get("bm25", {})
self.name_index = data.get("name_index", {})
# Decode embeddings from base64
embeddings_list = []
for block in self.blocks:
if block.get("emb"):
emb_bytes = base64.b64decode(block["emb"])
emb = np.frombuffer(emb_bytes, dtype=np.float32)
embeddings_list.append(emb)
else:
# No embedding, use zeros
dim = data.get("embedding_dim", 384)
embeddings_list.append(np.zeros(dim, dtype=np.float32))
if embeddings_list:
self.embeddings = np.stack(embeddings_list)
# Precompute normalized embeddings for cosine similarity
norms = np.linalg.norm(self.embeddings, axis=1, keepdims=True)
self.normalized_embeddings = self.embeddings / (norms + 1e-10)
self._loaded = True
logger.info(f"Loaded block index with {len(self.blocks)} blocks")
return True
except Exception as e:
logger.error(f"Failed to load block index: {e}")
return False
def _get_openai_client(self) -> Any:
"""Get OpenAI client for query embedding."""
if self._embedding_model is None:
try:
from openai import OpenAI
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
logger.warning("OPENAI_API_KEY not set")
return None
self._embedding_model = OpenAI(api_key=api_key)
except ImportError:
logger.warning("openai not installed")
return None
return self._embedding_model
def _embed_query(self, query: str) -> Optional[np.ndarray]:
"""Embed the search query using OpenAI."""
client = self._get_openai_client()
if client is None:
return None
try:
response = client.embeddings.create(
model=EMBEDDING_MODEL,
input=query,
)
embedding = response.data[0].embedding
return np.array(embedding, dtype=np.float32)
except Exception as e:
logger.warning(f"Failed to embed query: {e}")
return None
def _compute_semantic_scores(self, query_embedding: np.ndarray) -> np.ndarray:
"""Compute cosine similarity between query and all blocks."""
if self.normalized_embeddings is None:
return np.zeros(len(self.blocks))
# Normalize query embedding
query_norm = query_embedding / (np.linalg.norm(query_embedding) + 1e-10)
# Cosine similarity via dot product
similarities = self.normalized_embeddings @ query_norm
# Scale to [0, 1] (cosine ranges from -1 to 1)
return (similarities + 1) / 2
def _compute_bm25_scores(self, query_tokens: list[str]) -> np.ndarray:
"""Compute BM25 scores for all blocks."""
scores = np.zeros(len(self.blocks))
if not self.bm25_data or not query_tokens:
return scores
# BM25 parameters
k1 = 1.5
b = 0.75
n_docs = self.bm25_data.get("n_docs", len(self.blocks))
avgdl = self.bm25_data.get("avgdl", 100)
df = self.bm25_data.get("df", {})
doc_lens = self.bm25_data.get("doc_lens", [100] * len(self.blocks))
for i, block in enumerate(self.blocks):
# Tokenize block's searchable text
block_tokens = tokenize(block.get("searchable_text", ""))
doc_len = doc_lens[i] if i < len(doc_lens) else len(block_tokens)
# Calculate BM25 score
score = 0.0
for token in query_tokens:
if token not in df:
continue
# Term frequency in this document
tf = block_tokens.count(token)
if tf == 0:
continue
# IDF
doc_freq = df.get(token, 0)
idf = math.log((n_docs - doc_freq + 0.5) / (doc_freq + 0.5) + 1)
# BM25 score component
numerator = tf * (k1 + 1)
denominator = tf + k1 * (1 - b + b * doc_len / avgdl)
score += idf * numerator / denominator
scores[i] = score
# Normalize to [0, 1]
max_score = scores.max()
if max_score > 0:
scores = scores / max_score
return scores
def _compute_name_scores(self, query_tokens: list[str]) -> np.ndarray:
"""Compute name match scores using the name index."""
scores = np.zeros(len(self.blocks))
if not self.name_index or not query_tokens:
return scores
for token in query_tokens:
if token in self.name_index:
for block_idx, weight in self.name_index[token]:
if block_idx < len(scores):
scores[int(block_idx)] += weight
# Also check for partial matches in block names
for i, block in enumerate(self.blocks):
name_lower = block.get("name", "").lower()
for token in query_tokens:
if token in name_lower:
scores[i] += 0.5
# Normalize to [0, 1]
max_score = scores.max()
if max_score > 0:
scores = scores / max_score
return scores
def _compute_category_scores(self, query_tokens: list[str]) -> np.ndarray:
"""Compute category match scores."""
scores = np.zeros(len(self.blocks))
if not query_tokens:
return scores
for i, block in enumerate(self.blocks):
categories = block.get("categories", [])
category_text = " ".join(categories).lower()
for token in query_tokens:
if token in category_text:
scores[i] += 1.0
# Normalize to [0, 1]
max_score = scores.max()
if max_score > 0:
scores = scores / max_score
return scores
def search(
self,
query: str,
top_k: int = 10,
weights: Optional[SearchWeights] = None,
) -> list[BlockSearchResult]:
"""
Perform hybrid search combining multiple signals.
Args:
query: Search query string
top_k: Number of results to return
weights: Optional custom weights for signals
Returns:
List of BlockSearchResult sorted by score
"""
if not self._loaded and not self.load():
return []
if weights is None:
weights = SearchWeights()
# Tokenize query
query_tokens = tokenize(query)
if not query_tokens:
# Fallback: try raw query words
query_tokens = query.lower().split()
# Compute semantic scores
semantic_scores = np.zeros(len(self.blocks))
if self.normalized_embeddings is not None:
query_embedding = self._embed_query(query)
if query_embedding is not None:
semantic_scores = self._compute_semantic_scores(query_embedding)
# Compute other scores
bm25_scores = self._compute_bm25_scores(query_tokens)
name_scores = self._compute_name_scores(query_tokens)
category_scores = self._compute_category_scores(query_tokens)
# Combine scores using weights
combined_scores = (
weights.semantic * semantic_scores
+ weights.bm25 * bm25_scores
+ weights.name_match * name_scores
+ weights.category_match * category_scores
)
# Get top-k indices
top_indices = np.argsort(combined_scores)[::-1][:top_k]
# Build results
results = []
for idx in top_indices:
if combined_scores[idx] <= 0:
continue
block = self.blocks[idx]
results.append(
BlockSearchResult(
block_id=block["id"],
name=block["name"],
description=block["description"],
categories=block.get("categories", []),
score=float(combined_scores[idx]),
semantic_score=float(semantic_scores[idx]),
bm25_score=float(bm25_scores[idx]),
name_score=float(name_scores[idx]),
category_score=float(category_scores[idx]),
)
)
return results
# Global index instance (lazy loaded)
_block_search_index: Optional[BlockSearchIndex] = None
def get_block_search_index() -> BlockSearchIndex:
"""Get or create the block search index singleton."""
global _block_search_index
if _block_search_index is None:
_block_search_index = BlockSearchIndex(INDEX_PATH)
return _block_search_index