mirror of https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-01-21 04:57:58 -05:00

Compare commits: dev...kpczerwins (4 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | af85c4a3a9 |  |
|  | 43794c71fa |  |
|  | 259eff725e |  |
|  | 9577b93576 |  |
@@ -1,16 +1,17 @@
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from difflib import SequenceMatcher
from typing import Sequence

import prisma
from prisma.enums import ContentType

import backend.api.features.library.db as library_db
import backend.api.features.library.model as library_model
import backend.api.features.store.db as store_db
import backend.api.features.store.model as store_model
import backend.data.block
from backend.api.features.store.hybrid_search import unified_hybrid_search
from backend.blocks import load_all_blocks
from backend.blocks.llm import LlmModel
from backend.data.block import AnyBlockSchema, BlockCategory, BlockInfo, BlockSchema
@@ -37,6 +38,14 @@ MAX_LIBRARY_AGENT_RESULTS = 100
MAX_MARKETPLACE_AGENT_RESULTS = 100
MIN_SCORE_FOR_FILTERED_RESULTS = 10.0

+# Boost blocks over marketplace agents in search results
+BLOCK_SCORE_BOOST = 50.0
+
+# Block IDs to exclude from search results
+EXCLUDED_BLOCK_IDS = frozenset({
+    "e189baac-8c20-45a1-94a7-55177ea42565",  # AgentExecutorBlock
+})
+
SearchResultItem = BlockInfo | library_model.LibraryAgent | store_model.StoreAgent
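Aside: the flat boost plus the exclusion set is all it takes to pin blocks above marketplace agents in a merged ranking. A minimal self-contained sketch of that effect (the candidate records are hypothetical; only the constants mirror the diff):

```python
# Hypothetical candidates in a merged ranking; only the boost/exclusion
# logic mirrors the constants above.
BLOCK_SCORE_BOOST = 50.0
EXCLUDED_BLOCK_IDS = frozenset({"e189baac-8c20-45a1-94a7-55177ea42565"})

candidates = [
    {"id": "block-a", "kind": "block", "score": 12.0},
    {"id": "e189baac-8c20-45a1-94a7-55177ea42565", "kind": "block", "score": 99.0},
    {"id": "agent-x", "kind": "marketplace_agent", "score": 40.0},
]

ranked = sorted(
    (
        {**c, "score": c["score"] + (BLOCK_SCORE_BOOST if c["kind"] == "block" else 0.0)}
        for c in candidates
        if c["id"] not in EXCLUDED_BLOCK_IDS
    ),
    key=lambda c: c["score"],
    reverse=True,
)
# block-a (62.0) now outranks agent-x (40.0); the excluded block is dropped.
assert [c["id"] for c in ranked] == ["block-a", "agent-x"]
```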
@@ -250,14 +259,25 @@ async def _build_cached_search_results(
        "my_agents": 0,
    }

-    block_results, block_total, integration_total = _collect_block_results(
-        normalized_query=normalized_query,
-        include_blocks=include_blocks,
-        include_integrations=include_integrations,
-    )
-    scored_items.extend(block_results)
-    total_items["blocks"] = block_total
-    total_items["integrations"] = integration_total
+    # Use hybrid search when query is present, otherwise list all blocks
+    if (include_blocks or include_integrations) and normalized_query:
+        block_results, block_total, integration_total = await _hybrid_search_blocks(
+            query=search_query,
+            include_blocks=include_blocks,
+            include_integrations=include_integrations,
+        )
+        scored_items.extend(block_results)
+        total_items["blocks"] = block_total
+        total_items["integrations"] = integration_total
+    elif include_blocks or include_integrations:
+        # No query - list all blocks using in-memory approach
+        block_results, block_total, integration_total = _collect_block_results(
+            include_blocks=include_blocks,
+            include_integrations=include_integrations,
+        )
+        scored_items.extend(block_results)
+        total_items["blocks"] = block_total
+        total_items["integrations"] = integration_total

    if include_library_agents:
        library_response = await library_db.list_library_agents(
@@ -302,10 +322,14 @@ async def _build_cached_search_results(

def _collect_block_results(
    *,
-    normalized_query: str,
    include_blocks: bool,
    include_integrations: bool,
) -> tuple[list[_ScoredItem], int, int]:
+    """
+    Collect all blocks for listing (no search query).
+
+    All blocks get BLOCK_SCORE_BOOST to prioritize them over marketplace agents.
+    """
    results: list[_ScoredItem] = []
    block_count = 0
    integration_count = 0
@@ -318,6 +342,10 @@ def _collect_block_results(
        if block.disabled:
            continue

+        # Skip excluded blocks
+        if block.id in EXCLUDED_BLOCK_IDS:
+            continue
+
        block_info = block.get_info()
        credentials = list(block.input_schema.get_credentials_fields().values())
        is_integration = len(credentials) > 0
@@ -327,10 +355,6 @@ def _collect_block_results(
        if not is_integration and not include_blocks:
            continue

-        score = _score_block(block, block_info, normalized_query)
-        if not _should_include_item(score, normalized_query):
-            continue
-
        filter_type: FilterType = "integrations" if is_integration else "blocks"
        if is_integration:
            integration_count += 1
@@ -341,8 +365,116 @@ def _collect_block_results(
            _ScoredItem(
                item=block_info,
                filter_type=filter_type,
-                score=score,
-                sort_key=_get_item_name(block_info),
+                score=BLOCK_SCORE_BOOST,
+                sort_key=block_info.name.lower(),
            )
        )

    return results, block_count, integration_count


+async def _hybrid_search_blocks(
+    *,
+    query: str,
+    include_blocks: bool,
+    include_integrations: bool,
+) -> tuple[list[_ScoredItem], int, int]:
+    """
+    Search blocks using hybrid search with builder-specific filtering.
+
+    Uses unified_hybrid_search for semantic + lexical search, then applies
+    post-filtering for block/integration types and LLM model bonus scoring.
+
+    Args:
+        query: The search query string
+        include_blocks: Whether to include regular blocks
+        include_integrations: Whether to include integration blocks
+
+    Returns:
+        Tuple of (scored_items, block_count, integration_count)
+    """
+    results: list[_ScoredItem] = []
+    block_count = 0
+    integration_count = 0
+
+    if not include_blocks and not include_integrations:
+        return results, block_count, integration_count
+
+    normalized_query = query.strip().lower()
+
+    # Fetch more results to account for post-filtering
+    search_results, _ = await unified_hybrid_search(
+        query=query,
+        content_types=[ContentType.BLOCK],
+        page=1,
+        page_size=150,
+        min_score=0.10,
+    )
+
+    # Load all blocks for getting BlockInfo
+    all_blocks = load_all_blocks()
+
+    for result in search_results:
+        block_id = result["content_id"]
+
+        # Skip excluded blocks
+        if block_id in EXCLUDED_BLOCK_IDS:
+            continue
+
+        metadata = result.get("metadata", {})
+        hybrid_score = result.get("relevance", 0.0)
+
+        # Get the actual block class
+        if block_id not in all_blocks:
+            continue
+
+        block_cls = all_blocks[block_id]
+        block: AnyBlockSchema = block_cls()
+
+        if block.disabled:
+            continue
+
+        # Check block/integration filter using metadata
+        is_integration = metadata.get("is_integration", False)
+
+        if is_integration and not include_integrations:
+            continue
+        if not is_integration and not include_blocks:
+            continue
+
+        # Get block info
+        block_info = block.get_info()
+
+        # Calculate final score: scale hybrid score and add builder-specific bonuses
+        # Hybrid scores are 0-1, builder scores were 0-200+
+        # Add BLOCK_SCORE_BOOST to prioritize blocks over marketplace agents
+        final_score = hybrid_score * 100 + BLOCK_SCORE_BOOST
+
+        # Add LLM model match bonus
+        has_llm_field = metadata.get("has_llm_model_field", False)
+        if has_llm_field and _matches_llm_model(block.input_schema, normalized_query):
+            final_score += 20
+
+        # Add exact/prefix match bonus for deterministic tie-breaking
+        name = block_info.name.lower()
+        if name == normalized_query:
+            final_score += 30
+        elif name.startswith(normalized_query):
+            final_score += 15
+
+        # Track counts
+        filter_type: FilterType = "integrations" if is_integration else "blocks"
+        if is_integration:
+            integration_count += 1
+        else:
+            block_count += 1
+
+        results.append(
+            _ScoredItem(
+                item=block_info,
+                filter_type=filter_type,
+                score=final_score,
+                sort_key=name,
+            )
+        )
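The final score above composes four terms: the scaled hybrid relevance, the flat block boost, the LLM-model bonus, and the exact/prefix name bonus. A minimal standalone replica of that arithmetic (the function name and inputs are hypothetical; the constants mirror the diff):

```python
BLOCK_SCORE_BOOST = 50.0

def builder_block_score(
    hybrid_score: float, name: str, query: str, llm_model_match: bool = False
) -> float:
    """Hypothetical standalone replica of the bonus arithmetic in _hybrid_search_blocks."""
    score = hybrid_score * 100 + BLOCK_SCORE_BOOST  # scale 0-1 relevance, boost blocks
    if llm_model_match:
        score += 20  # LLM model match bonus
    name, query = name.lower(), query.lower()
    if name == query:
        score += 30  # exact-name bonus
    elif name.startswith(query):
        score += 15  # prefix bonus
    return score

# relevance 0.5, exact name match, LLM-model hit: 50 + 50 + 20 + 30 = 150
assert builder_block_score(0.5, "AI Text Generator", "ai text generator", True) == 150.0
```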
@@ -502,38 +634,6 @@ def _matches_llm_model(schema_cls: type[BlockSchema], query: str) -> bool:
    return False


-def _score_block(
-    block: AnyBlockSchema,
-    block_info: BlockInfo,
-    normalized_query: str,
-) -> float:
-    if not normalized_query:
-        return 0.0
-
-    name = block_info.name.lower()
-    description = block_info.description.lower()
-    score = _score_primary_fields(name, description, normalized_query)
-
-    category_text = " ".join(
-        category.get("category", "").lower() for category in block_info.categories
-    )
-    score += _score_additional_field(category_text, normalized_query, 12, 6)
-
-    credentials_info = block.input_schema.get_credentials_fields_info().values()
-    provider_names = [
-        provider.value.lower()
-        for info in credentials_info
-        for provider in info.provider
-    ]
-    provider_text = " ".join(provider_names)
-    score += _score_additional_field(provider_text, normalized_query, 15, 6)
-
-    if _matches_llm_model(block.input_schema, normalized_query):
-        score += 20
-
-    return score
-
-
def _score_library_agent(
    agent: library_model.LibraryAgent,
    normalized_query: str,
@@ -640,26 +740,15 @@ def _get_all_providers() -> dict[ProviderName, Provider]:
    return providers


-@cached(ttl_seconds=3600)
+@cached(ttl_seconds=3600, shared_cache=True)
async def get_suggested_blocks(count: int = 5) -> list[BlockInfo]:
    suggested_blocks = []
-    # Sum the number of executions for each block type
-    # Prisma cannot group by nested relations, so we do a raw query
-    # Calculate the cutoff timestamp
-    timestamp_threshold = datetime.now(timezone.utc) - timedelta(days=30)
-
+    # Query the materialized view for execution counts per block
+    # The view aggregates executions from the last 14 days and is refreshed hourly
    results = await query_raw_with_schema(
        """
-        SELECT
-            agent_node."agentBlockId" AS block_id,
-            COUNT(execution.id) AS execution_count
-        FROM {schema_prefix}"AgentNodeExecution" execution
-        JOIN {schema_prefix}"AgentNode" agent_node ON execution."agentNodeId" = agent_node.id
-        WHERE execution."endedTime" >= $1::timestamp
-        GROUP BY agent_node."agentBlockId"
-        ORDER BY execution_count DESC;
-        """,
-        timestamp_threshold,
+        SELECT block_id, execution_count
+        FROM {schema_prefix}"mv_suggested_blocks";
+        """
    )

    # Get the top blocks based on execution count
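With aggregation moved into the materialized view, the remaining Python work is a top-N pick over pre-counted rows. A hedged sketch, assuming the rows come back as dicts keyed by the SELECT aliases:

```python
# Hypothetical rows, shaped like the view query's output above.
rows = [
    {"block_id": "b1", "execution_count": 420},
    {"block_id": "b2", "execution_count": 37},
    {"block_id": "b3", "execution_count": 1900},
]

# mv_suggested_blocks is created with ORDER BY execution_count DESC, but
# re-sorting keeps the sketch correct even without that guarantee.
top_block_ids = [
    row["block_id"]
    for row in sorted(rows, key=lambda r: r["execution_count"], reverse=True)[:5]
]
assert top_block_ids[:2] == ["b3", "b1"]
```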
@@ -27,7 +27,6 @@ class SearchEntry(BaseModel):

# Suggestions
class SuggestionsResponse(BaseModel):
-    otto_suggestions: list[str]
    recent_searches: list[SearchEntry]
    providers: list[ProviderName]
    top_blocks: list[BlockInfo]
@@ -1,5 +1,5 @@
import logging
-from typing import Annotated, Sequence
+from typing import Annotated, Sequence, cast, get_args

import fastapi
from autogpt_libs.auth.dependencies import get_user_id, requires_user

@@ -10,6 +10,8 @@ from backend.util.models import Pagination
from . import db as builder_db
from . import model as builder_model

+VALID_FILTER_VALUES = get_args(builder_model.FilterType)
+
logger = logging.getLogger(__name__)

router = fastapi.APIRouter(
@@ -49,11 +51,6 @@ async def get_suggestions(
    Get all suggestions for the Blocks Menu.
    """
    return builder_model.SuggestionsResponse(
-        otto_suggestions=[
-            "What blocks do I need to get started?",
-            "Help me create a list",
-            "Help me feed my data to Google Maps",
-        ],
        recent_searches=await builder_db.get_recent_searches(user_id),
        providers=[
            ProviderName.TWITTER,
@@ -151,7 +148,7 @@ async def get_providers(
async def search(
    user_id: Annotated[str, fastapi.Security(get_user_id)],
    search_query: Annotated[str | None, fastapi.Query()] = None,
-    filter: Annotated[list[builder_model.FilterType] | None, fastapi.Query()] = None,
+    filter: Annotated[str | None, fastapi.Query()] = None,
    search_id: Annotated[str | None, fastapi.Query()] = None,
    by_creator: Annotated[list[str] | None, fastapi.Query()] = None,
    page: Annotated[int, fastapi.Query()] = 1,
@@ -160,9 +157,20 @@ async def search(
    """
    Search for blocks (including integrations), marketplace agents, and user library agents.
    """
-    # If no filters are provided, then we will return all types
-    if not filter:
-        filter = [
+    # Parse and validate filter parameter
+    filters: list[builder_model.FilterType]
+    if filter:
+        filter_values = [f.strip() for f in filter.split(",")]
+        invalid_filters = [f for f in filter_values if f not in VALID_FILTER_VALUES]
+        if invalid_filters:
+            raise fastapi.HTTPException(
+                status_code=400,
+                detail=f"Invalid filter value(s): {', '.join(invalid_filters)}. "
+                f"Valid values are: {', '.join(VALID_FILTER_VALUES)}",
+            )
+        filters = cast(list[builder_model.FilterType], filter_values)
+    else:
+        filters = [
            "blocks",
            "integrations",
            "marketplace_agents",
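End to end, `?filter=blocks,integrations` now arrives as a single comma-separated string and is validated against the `Literal` members at runtime via `get_args`. A self-contained sketch of the same parse-and-validate step (`FilterType` here is a stand-in for `builder_model.FilterType`, and `ValueError` stands in for the HTTP 400):

```python
from typing import Literal, cast, get_args

# Stand-in for builder_model.FilterType; member names taken from the diff.
FilterType = Literal["blocks", "integrations", "marketplace_agents", "my_agents"]
VALID_FILTER_VALUES = get_args(FilterType)

def parse_filters(filter_param: str | None) -> list[FilterType]:
    if not filter_param:
        # No filter provided: return all types, as in the route's else-branch.
        return list(VALID_FILTER_VALUES)
    values = [f.strip() for f in filter_param.split(",")]
    invalid = [f for f in values if f not in VALID_FILTER_VALUES]
    if invalid:
        raise ValueError(f"Invalid filter value(s): {', '.join(invalid)}")
    return cast(list[FilterType], values)

assert parse_filters("blocks, integrations") == ["blocks", "integrations"]
```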
@@ -174,7 +182,7 @@ async def search(
    cached_results = await builder_db.get_sorted_search_results(
        user_id=user_id,
        search_query=search_query,
-        filters=filter,
+        filters=filters,
        by_creator=by_creator,
    )
@@ -196,7 +204,7 @@ async def search(
        user_id,
        builder_model.SearchEntry(
            search_query=search_query,
-            filter=filter,
+            filter=filters,
            by_creator=by_creator,
            search_id=search_id,
        ),
@@ -225,6 +225,28 @@ class BlockHandler(ContentHandler):
                [cat.value for cat in categories] if categories else []
            )

+            # Extract provider names from credentials fields
+            provider_names: list[str] = []
+            is_integration = False
+            if hasattr(block_instance, "input_schema"):
+                credentials_info = (
+                    block_instance.input_schema.get_credentials_fields_info()
+                )
+                is_integration = len(credentials_info) > 0
+                for info in credentials_info.values():
+                    for provider in info.provider:
+                        provider_names.append(provider.value.lower())
+
+            # Check if block has LlmModel field in input schema
+            has_llm_model_field = False
+            if hasattr(block_instance, "input_schema"):
+                from backend.blocks.llm import LlmModel
+
+                for field in block_instance.input_schema.model_fields.values():
+                    if field.annotation == LlmModel:
+                        has_llm_model_field = True
+                        break
+
            items.append(
                ContentItem(
                    content_id=block_id,

@@ -233,6 +255,9 @@ class BlockHandler(ContentHandler):
                    metadata={
                        "name": getattr(block_instance, "name", ""),
                        "categories": categories_list,
+                        "providers": provider_names,
+                        "has_llm_model_field": has_llm_model_field,
+                        "is_integration": is_integration,
                    },
                    user_id=None,  # Blocks are public
                )
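Because `providers`, `has_llm_model_field`, and `is_integration` are computed once at index time and stored in metadata, query-time code can post-filter without instantiating every block. A toy illustration of the same `model_fields` scan (the enum and schema are stand-ins):

```python
from enum import Enum
from pydantic import BaseModel

class LlmModel(str, Enum):  # stand-in for backend.blocks.llm.LlmModel
    GPT4 = "gpt-4"

class ToyBlockInput(BaseModel):  # stand-in for a block's input_schema
    prompt: str
    model: LlmModel

# Same annotation check the handler performs while indexing.
has_llm_model_field = any(
    field.annotation == LlmModel for field in ToyBlockInput.model_fields.values()
)
assert has_llm_model_field
```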
@@ -0,0 +1,102 @@
-- This migration creates a materialized view for suggested blocks based on execution counts
-- The view aggregates execution counts per block for the last 14 days
--
-- IMPORTANT: For production environments, pg_cron is REQUIRED for automatic refresh
-- Prerequisites for production:
--   1. pg_cron extension must be installed: CREATE EXTENSION pg_cron;
--   2. pg_cron must be configured in postgresql.conf:
--      shared_preload_libraries = 'pg_cron'
--      cron.database_name = 'your_database_name'
--
-- For development environments without pg_cron:
--   The migration will succeed but you must manually refresh views with:
--   SELECT refresh_suggested_blocks_view();

-- Check if pg_cron extension is installed and set a flag
DO $$
DECLARE
    has_pg_cron BOOLEAN;
BEGIN
    SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pg_cron') INTO has_pg_cron;

    IF NOT has_pg_cron THEN
        RAISE WARNING 'pg_cron extension is not installed!';
        RAISE WARNING 'Materialized view will be created but WILL NOT refresh automatically.';
        RAISE WARNING 'For production use, install pg_cron with: CREATE EXTENSION pg_cron;';
        RAISE WARNING 'For development, manually refresh with: SELECT refresh_suggested_blocks_view();';
    END IF;

    -- Store the flag for later use in the migration
    PERFORM set_config('migration.has_pg_cron', has_pg_cron::text, false);
END
$$;

-- Create materialized view for suggested blocks based on execution counts in last 14 days
-- The 14-day threshold is hardcoded to ensure consistent behavior
CREATE MATERIALIZED VIEW IF NOT EXISTS "mv_suggested_blocks" AS
SELECT
    agent_node."agentBlockId" AS block_id,
    COUNT(execution.id) AS execution_count
FROM "AgentNodeExecution" execution
JOIN "AgentNode" agent_node ON execution."agentNodeId" = agent_node.id
WHERE execution."endedTime" >= (NOW() - INTERVAL '14 days')
GROUP BY agent_node."agentBlockId"
ORDER BY execution_count DESC;

-- Create unique index for concurrent refresh support
CREATE UNIQUE INDEX IF NOT EXISTS "idx_mv_suggested_blocks_block_id" ON "mv_suggested_blocks"("block_id");

-- Create refresh function
CREATE OR REPLACE FUNCTION refresh_suggested_blocks_view()
RETURNS void
LANGUAGE plpgsql
AS $$
DECLARE
    target_schema text := current_schema();
BEGIN
    -- Use CONCURRENTLY for better performance during refresh
    REFRESH MATERIALIZED VIEW CONCURRENTLY "mv_suggested_blocks";
    RAISE NOTICE 'Suggested blocks materialized view refreshed in schema % at %', target_schema, NOW();
EXCEPTION
    WHEN OTHERS THEN
        -- Fallback to non-concurrent refresh if concurrent fails
        REFRESH MATERIALIZED VIEW "mv_suggested_blocks";
        RAISE NOTICE 'Suggested blocks materialized view refreshed (non-concurrent) in schema % at %. Concurrent refresh failed due to: %', target_schema, NOW(), SQLERRM;
END;
$$;

-- Schedule automatic refresh every hour (only if pg_cron is available)
DO $$
DECLARE
    has_pg_cron BOOLEAN;
    current_schema_name text := current_schema();
    old_job_name text;
    job_name text;
BEGIN
    -- Check if pg_cron extension exists
    SELECT EXISTS (SELECT 1 FROM pg_extension WHERE extname = 'pg_cron') INTO has_pg_cron;

    IF has_pg_cron THEN
        job_name := format('refresh-suggested-blocks_%s', current_schema_name);

        -- Try to unschedule existing job (ignore errors if it doesn't exist)
        BEGIN
            PERFORM cron.unschedule(job_name);
        EXCEPTION WHEN OTHERS THEN
            NULL;
        END;

        -- Schedule the new job to run every hour
        PERFORM cron.schedule(
            job_name,
            '0 * * * *',  -- Every hour at minute 0
            format('SET search_path TO %I; SELECT refresh_suggested_blocks_view();', current_schema_name)
        );
        RAISE NOTICE 'Scheduled job %; runs every hour for schema %', job_name, current_schema_name;
    ELSE
        RAISE WARNING 'Automatic refresh NOT configured - pg_cron is not available';
        RAISE WARNING 'You must manually refresh the view with: SELECT refresh_suggested_blocks_view();';
        RAISE WARNING 'Or install pg_cron for automatic refresh in production';
    END IF;
END;
$$;
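For development databases without pg_cron, the view must be refreshed by hand. A hedged sketch of triggering that from Python with the same raw-query helper used in `get_suggested_blocks` (the import path is an assumption):

```python
from backend.data.db import query_raw_with_schema  # import path assumed


async def refresh_suggested_blocks() -> None:
    # Manual fallback for environments where pg_cron is not installed;
    # equivalent to running `SELECT refresh_suggested_blocks_view();` in psql.
    await query_raw_with_schema("SELECT refresh_suggested_blocks_view();")
```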