fix(backend): address Pwuts review feedback on block search PR

- Use AnyBlockSchema instead of Block[Any, Any] for better type inference
- Remove MappingProxyType wrapper from _get_enabled_blocks (unnecessary complexity)
- Move split_camelcase to backend/util/text.py (proper shared location)
- Add camelcase exception list (YouTube, OpenAI, AutoGPT, GitHub, LinkedIn)
- Revert logger formatting from %s back to f-strings (readability)
- Log warning when block.name is not set (error condition)
This commit is contained in:
Zamil Majdy
2026-03-17 02:24:01 +07:00
parent c87e954644
commit a08673cdbe
6 changed files with 114 additions and 115 deletions

View File

@@ -11,7 +11,6 @@ import asyncio
import functools
import itertools
import logging
import types
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
@@ -19,13 +18,13 @@ from typing import TYPE_CHECKING, Any, get_args, get_origin
from prisma.enums import ContentType
from backend.api.features.store.text_utils import split_camelcase
from backend.blocks import get_blocks
from backend.blocks.llm import LlmModel
from backend.data.db import query_raw_with_schema
from backend.util.text import split_camelcase
if TYPE_CHECKING:
from backend.blocks._base import Block
from backend.blocks._base import AnyBlockSchema
logger = logging.getLogger(__name__)
@@ -166,38 +165,25 @@ class StoreAgentHandler(ContentHandler):
@functools.lru_cache(maxsize=1)
def _get_enabled_blocks() -> types.MappingProxyType[str, Block[Any, Any]]:
def _get_enabled_blocks() -> dict[str, AnyBlockSchema]:
"""Return ``{block_id: block_instance}`` for all enabled, instantiable blocks.
Disabled blocks and blocks that fail to instantiate are silently skipped
(with a warning log), so callers never need their own try/except loop.
**Caching:** Results are cached for the lifetime of the process via
``lru_cache``. This is intentional — blocks are registered at import time
and their set never changes while the process is running. The returned
mapping is wrapped in ``MappingProxyType`` so callers cannot accidentally
mutate the cached data.
The cached block instances are lightweight singletons (no heavy resources
or open connections), so pinning them for the process lifetime is fine.
.. note::
No TTL is needed here, unlike the upstream ``get_blocks()`` which uses
a 1-hour TTL because it queries DB-backed block metadata. This layer
only caches in-memory class instances that are defined at import time
and are immutable for the life of the process.
Results are cached for the process lifetime via ``lru_cache`` because
blocks are registered at import time and never change while running.
"""
enabled: dict[str, Block[Any, Any]] = {}
enabled: dict[str, AnyBlockSchema] = {}
for block_id, block_cls in get_blocks().items():
try:
instance = block_cls()
except Exception as e:
logger.warning("Skipping block %s: init failed: %s", block_id, e)
logger.warning(f"Skipping block {block_id}: init failed: {e}")
continue
if not instance.disabled:
enabled[block_id] = instance
return types.MappingProxyType(enabled)
return enabled
class BlockHandler(ContentHandler):
@@ -239,6 +225,10 @@ class BlockHandler(ContentHandler):
for block_id, block in itertools.islice(missing, batch_size):
try:
# Build searchable text from block metadata
if not block.name:
logger.warning(
f"Block {block_id} has no name — using block_id as fallback"
)
display_name = split_camelcase(block.name) if block.name else ""
parts = []
if display_name:
@@ -292,7 +282,7 @@ class BlockHandler(ContentHandler):
)
)
except Exception as e:
logger.warning("Failed to process block %s: %s", block_id, e)
logger.warning(f"Failed to process block {block_id}: {e}")
continue
return items
@@ -377,7 +367,7 @@ class DocumentationHandler(ContentHandler):
# If no title found, use filename
return file_path.stem.replace("-", " ").replace("_", " ").title()
except Exception as e:
logger.warning("Failed to read title from %s: %s", file_path, e)
logger.warning(f"Failed to read title from {file_path}: {e}")
return file_path.stem.replace("-", " ").replace("_", " ").title()
def _chunk_markdown_by_headings(
@@ -397,7 +387,7 @@ class DocumentationHandler(ContentHandler):
try:
content = file_path.read_text(encoding="utf-8")
except Exception as e:
logger.warning("Failed to read %s: %s", file_path, e)
logger.warning(f"Failed to read {file_path}: {e}")
return []
lines = content.split("\n")
@@ -522,7 +512,7 @@ class DocumentationHandler(ContentHandler):
docs_root = self._get_docs_root()
if not docs_root.exists():
logger.warning("Documentation root not found: %s", docs_root)
logger.warning(f"Documentation root not found: {docs_root}")
return []
# Find all .md and .mdx files
@@ -598,7 +588,7 @@ class DocumentationHandler(ContentHandler):
)
)
except Exception as e:
logger.warning("Failed to process section %s: %s", content_id, e)
logger.warning(f"Failed to process section {content_id}: {e}")
continue
return items

View File

@@ -2,9 +2,7 @@
Tests for content handlers (blocks, store agents, documentation).
"""
import types
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
@@ -102,19 +100,6 @@ def test_get_enabled_blocks_cached():
mock_get_blocks.assert_called_once()
def test_get_enabled_blocks_returns_immutable_mapping():
"""The returned mapping is a MappingProxyType — mutation raises TypeError."""
blocks = {"b1": _make_block_class(name="B1")}
with patch(
"backend.api.features.store.content_handlers.get_blocks", return_value=blocks
):
result = _get_enabled_blocks()
assert isinstance(result, types.MappingProxyType)
with pytest.raises(TypeError):
result_any: Any = result
result_any["new_key"] = object()
# ---------------------------------------------------------------------------
# StoreAgentHandler
# ---------------------------------------------------------------------------

View File

@@ -190,9 +190,8 @@ async def unified_hybrid_search(
query_embedding = await embed_query(query)
except Exception as e:
logger.warning(
"Failed to generate query embedding - falling back to lexical-only search: %s. "
"Check that openai_internal_api_key is configured and OpenAI API is accessible.",
e,
f"Failed to generate query embedding - falling back to lexical-only search: {e}. "
"Check that openai_internal_api_key is configured and OpenAI API is accessible."
)
query_embedding = [0.0] * EMBEDDING_DIM
# Redistribute semantic weight to lexical
@@ -383,7 +382,7 @@ async def unified_hybrid_search(
for result in results:
result.pop("total_count", None)
logger.info("Unified hybrid search: %d results, %d total", len(results), total)
logger.info(f"Unified hybrid search: {len(results)} results, {total} total")
return results, total
@@ -472,8 +471,7 @@ async def hybrid_search(
query_embedding = await embed_query(query)
except Exception as e:
logger.warning(
"Failed to generate query embedding - falling back to lexical-only search: %s",
e,
f"Failed to generate query embedding - falling back to lexical-only search: {e}"
)
query_embedding = [0.0] * EMBEDDING_DIM
total_non_semantic = (
@@ -713,9 +711,7 @@ async def hybrid_search(
result.pop("total_count", None)
result.pop("searchable_text", None)
logger.info(
"Hybrid search (store agents): %d results, %d total", len(results), total
)
logger.info(f"Hybrid search (store agents): {len(results)} results, {total} total")
return results, total
@@ -799,20 +795,15 @@ async def _log_vector_error_diagnostics(error: Exception) -> None:
diagnostics["pgvector_extension"] = f"Error: {e}"
logger.error(
"Vector type error diagnostics:\n"
" Error: %s\n"
" search_path: %s\n"
" current_schema: %s\n"
" user_info: %s\n"
" pgvector_extension: %s",
error,
diagnostics.get("search_path"),
diagnostics.get("current_schema"),
diagnostics.get("user_info"),
diagnostics.get("pgvector_extension"),
f"Vector type error diagnostics:\n"
f" Error: {error}\n"
f" search_path: {diagnostics.get('search_path')}\n"
f" current_schema: {diagnostics.get('current_schema')}\n"
f" user_info: {diagnostics.get('user_info')}\n"
f" pgvector_extension: {diagnostics.get('pgvector_extension')}"
)
except Exception as diag_error:
logger.error("Failed to collect vector error diagnostics: %s", diag_error)
logger.error(f"Failed to collect vector error diagnostics: {diag_error}")
# Backward compatibility alias - HybridSearchWeights maps to StoreAgentSearchWeights

View File

@@ -1,52 +1,5 @@
"""Small text helpers shared across store search modules."""
"""Backward-compatibility shim — ``split_camelcase`` now lives in backend.util.text."""
from backend.util.text import split_camelcase # noqa: F401
def split_camelcase(text: str) -> str:
    """Split CamelCase into separate words.

    Uses a single-pass character-by-character algorithm to avoid any
    regex backtracking concerns (guaranteed O(n) time).

    .. note::
        Only CamelCase boundaries are detected. Underscores, hyphens, and
        other non-alpha separators are left as-is (e.g. ``"HTTP_Request"``
        is **not** converted to ``"HTTP Request"``).

        Single-letter uppercase prefixes are not split from the following
        word (e.g. ``"ABlock"`` stays ``"ABlock"``). This matches the
        original regex behaviour ``([A-Z]{2,})([A-Z][a-z])``.

    Examples::

        >>> split_camelcase("AITextGeneratorBlock")
        'AI Text Generator Block'
        >>> split_camelcase("OAuth2Block")
        'OAuth2 Block'
    """
    if len(text) < 2:
        return text

    # Indices where a new word begins; index 0 always starts the first word.
    starts: list[int] = [0]
    for idx in range(1, len(text)):
        prev_ch = text[idx - 1]
        ch = text[idx]
        # Boundary between lowercase/digit and uppercase: "camelCase" -> "camel|Case"
        if ch.isupper() and (prev_ch.islower() or prev_ch.isdigit()):
            starts.append(idx)
        # Boundary between an uppercase run (2+ chars) and Upper+lower:
        # "AIText" -> "AI|Text". The left part must keep at least two
        # uppercase characters (mirrors the regex r"([A-Z]{2,})([A-Z][a-z])").
        elif (
            idx >= 2
            and ch.islower()
            and prev_ch.isupper()
            and text[idx - 2].isupper()
            and (idx - 1 - starts[-1]) >= 2
        ):
            starts.append(idx - 1)

    bounds = starts + [len(text)]
    words = [text[a:b] for a, b in zip(bounds, bounds[1:])]
    return " ".join(words)
__all__ = ["split_camelcase"]

View File

@@ -1,8 +1,8 @@
"""Tests for text_utils helpers."""
"""Tests for split_camelcase (now in backend.util.text)."""
import pytest
from backend.api.features.store.text_utils import split_camelcase
from backend.util.text import split_camelcase
# ---------------------------------------------------------------------------
# split_camelcase
@@ -37,6 +37,12 @@ from backend.api.features.store.text_utils import split_camelcase
# char so the uppercase-run rule cannot fire, producing "3 DRenderer"
# rather than the ideal "3D Renderer".
("3DRenderer", "3 DRenderer"),
# Exception list — compound terms that should stay together
("YouTubeBlock", "YouTube Block"),
("OpenAIBlock", "OpenAI Block"),
("AutoGPTAgent", "AutoGPT Agent"),
("GitHubIntegration", "GitHub Integration"),
("LinkedInBlock", "LinkedIn Block"),
],
)
def test_split_camelcase(input_text: str, expected: str):

View File

@@ -1,4 +1,5 @@
import logging
import re
import bleach
from bleach.css_sanitizer import CSSSanitizer
@@ -154,3 +155,76 @@ class TextFormatter:
)
return rendered_subject_template, rendered_base_template
# ---------------------------------------------------------------------------
# CamelCase splitting
# ---------------------------------------------------------------------------
# Map of split forms back to their canonical compound terms.
# Mirrors the frontend exception list in frontend/src/lib/utils.ts.
# Map of split forms back to their canonical compound terms.
# Mirrors the frontend exception list in frontend/src/lib/utils.ts.
_CAMELCASE_EXCEPTIONS: dict[str, str] = {
    "Auto GPT": "AutoGPT",
    "Open AI": "OpenAI",
    "You Tube": "YouTube",
    "Git Hub": "GitHub",
    "Linked In": "LinkedIn",
}

# Anchor with \b so an exception only matches whole words. Without the
# boundaries the alternation substring-matches across word edges, e.g.
# "Open AIM Foo" would be wrongly collapsed to "OpenAIM Foo".
_CAMELCASE_EXCEPTION_RE = re.compile(
    r"\b(?:" + "|".join(re.escape(k) for k in _CAMELCASE_EXCEPTIONS) + r")\b"
)


def split_camelcase(text: str) -> str:
    """Split CamelCase into separate words.

    Uses a single-pass character-by-character algorithm to avoid any
    regex backtracking concerns (guaranteed O(n) time).

    After splitting, known compound terms are restored via an exception
    list (e.g. ``"YouTube"`` stays ``"YouTube"`` instead of becoming
    ``"You Tube"``). The list mirrors the frontend mapping in
    ``frontend/src/lib/utils.ts``.

    Examples::

        >>> split_camelcase("AITextGeneratorBlock")
        'AI Text Generator Block'
        >>> split_camelcase("OAuth2Block")
        'OAuth2 Block'
        >>> split_camelcase("YouTubeBlock")
        'YouTube Block'
    """
    if len(text) <= 1:
        return text

    parts: list[str] = []
    prev = 0
    for i in range(1, len(text)):
        # Insert split between lowercase/digit and uppercase: "camelCase" -> "camel|Case"
        if (text[i - 1].islower() or text[i - 1].isdigit()) and text[i].isupper():
            parts.append(text[prev:i])
            prev = i
        # Insert split between uppercase run (2+ chars) and uppercase+lowercase:
        # "AIText" -> "AI|Text". Requires at least 3 consecutive uppercase chars
        # before the lowercase so that the left part keeps 2+ uppercase chars
        # (mirrors the original regex r"([A-Z]{2,})([A-Z][a-z])").
        elif (
            i >= 2
            and text[i - 2].isupper()
            and text[i - 1].isupper()
            and text[i].islower()
            and (i - 1 - prev) >= 2  # left part must retain at least 2 upper chars
        ):
            parts.append(text[prev : i - 1])
            prev = i - 1
    parts.append(text[prev:])

    result = " ".join(parts)
    # Restore known compound terms that should not be split.
    result = _CAMELCASE_EXCEPTION_RE.sub(
        lambda m: _CAMELCASE_EXCEPTIONS[m.group()], result
    )
    return result