Compare commits


3 Commits

Author SHA1 Message Date
Bently
e0862e8086 fix: add docs and fix error handling in TextEncoderBlock
- Add encoder_block.md documentation
- Remove error handling that yields undeclared output field
- Match pattern used by TextDecoderBlock
2026-01-28 11:05:03 +00:00
Bently
b1259e0bdd docs(blocks): Add docstrings and error handling to TextEncoderBlock
- Add module, class, and method docstrings for 80%+ coverage
- Add try/except error handling per CodeRabbit review
- Use inherited error field from BlockSchemaOutput
2026-01-27 15:58:19 +00:00
Bently
5244bd94fc feat(blocks): Implement Text Encode block (fixes #11111)
2026-01-27 15:48:10 +00:00
4 changed files with 113 additions and 166 deletions

View File

@@ -0,0 +1,68 @@
"""Text encoding block for converting special characters to escape sequences."""
import codecs
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.model import SchemaField
class TextEncoderBlock(Block):
"""
Encodes a string by converting special characters into escape sequences.
This block is the inverse of TextDecoderBlock. It takes text containing
special characters (like newlines, tabs, etc.) and converts them into
their escape sequence representations (e.g., newline becomes \\n).
"""
class Input(BlockSchemaInput):
"""Input schema for TextEncoderBlock."""
text: str = SchemaField(
description="A string containing special characters to be encoded",
placeholder="Your text with newlines and quotes to encode",
)
class Output(BlockSchemaOutput):
"""Output schema for TextEncoderBlock."""
encoded_text: str = SchemaField(
description="The encoded text with special characters converted to escape sequences"
)
def __init__(self):
super().__init__(
id="5185f32e-4b65-4ecf-8fbb-873f003f09d6",
description="Encodes a string by converting special characters into escape sequences",
categories={BlockCategory.TEXT},
input_schema=TextEncoderBlock.Input,
output_schema=TextEncoderBlock.Output,
test_input={"text": """Hello
World!
This is a "quoted" string."""},
test_output=[
(
"encoded_text",
"""Hello\\nWorld!\\nThis is a "quoted" string.""",
)
],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
"""
Encode the input text by converting special characters to escape sequences.
Args:
input_data: The input containing the text to encode.
**kwargs: Additional keyword arguments (unused).
Yields:
The encoded text with escape sequences.
"""
encoded_text = codecs.encode(input_data.text, "unicode_escape").decode("utf-8")
yield "encoded_text", encoded_text

View File

@@ -26,31 +26,6 @@ def add_param(url: str, key: str, value: str) -> str:
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://localhost:5432")

# Extract the application schema from DATABASE_URL for use in queries
_parsed = urlparse(DATABASE_URL)
_query_params = dict(parse_qsl(_parsed.query))
_app_schema = _query_params.get("schema", "public")

# Build search_path that includes app schema and extension schemas where pgvector may live.
# This is used both in connection options (may be ignored by PgBouncer) and in SET LOCAL
# statements before raw queries (guaranteed to work).
SEARCH_PATH = (
    f"{_app_schema},extensions,public"
    if _app_schema != "public"
    else "public,extensions"
)

# Try to set search_path via PostgreSQL options parameter at connection time.
# NOTE: This may be ignored by PgBouncer in transaction pooling mode.
# As a fallback, we also SET LOCAL search_path before raw queries.
if "options" in _query_params:
    _query_params["options"] = (
        _query_params["options"] + f" -c search_path={SEARCH_PATH}"
    )
else:
    _query_params["options"] = f"-c search_path={SEARCH_PATH}"

DATABASE_URL = urlunparse(_parsed._replace(query=urlencode(_query_params)))

CONN_LIMIT = os.getenv("DB_CONNECTION_LIMIT")
if CONN_LIMIT:
    DATABASE_URL = add_param(DATABASE_URL, "connection_limit", CONN_LIMIT)
@@ -133,70 +108,6 @@ def get_database_schema() -> str:
    return query_params.get("schema", "public")


def get_pod_info() -> dict:
    """Get information about the current pod/host.

    Returns dict with: hostname, pod_name (from HOSTNAME env var in k8s),
    pod_namespace, pod_ip if available.
    """
    import socket

    return {
        "hostname": socket.gethostname(),
        "pod_name": os.getenv("HOSTNAME", "unknown"),
        "pod_namespace": os.getenv("POD_NAMESPACE", "unknown"),
        "pod_ip": os.getenv("POD_IP", "unknown"),
    }


async def get_connection_debug_info(tx=None) -> dict:
    """Get diagnostic info about the current database connection and pod.

    Useful for debugging "table does not exist" or "type does not exist" errors
    that may indicate connections going to different database instances or pods.

    Args:
        tx: Optional transaction client to use for the query (ensures same connection)

    Returns dict with: search_path, current_schema, server_version, pg_backend_pid,
    pgvector_installed, pgvector_schema, plus pod info
    """
    import prisma as prisma_module

    pod_info = get_pod_info()
    db_client = tx if tx else prisma_module.get_client()

    try:
        # Get connection info and check for pgvector in a single query
        result = await db_client.query_raw(
            """
            SELECT
                current_setting('search_path') as search_path,
                current_schema() as current_schema,
                current_database() as current_database,
                inet_server_addr() as server_addr,
                inet_server_port() as server_port,
                pg_backend_pid() as backend_pid,
                version() as server_version,
                (SELECT EXISTS(
                    SELECT 1 FROM pg_extension WHERE extname = 'vector'
                )) as pgvector_installed,
                (SELECT nspname FROM pg_extension e
                 JOIN pg_namespace n ON e.extnamespace = n.oid
                 WHERE e.extname = 'vector'
                 LIMIT 1) as pgvector_schema,
                (SELECT string_agg(extname || ' in ' || nspname, ', ')
                 FROM pg_extension e
                 JOIN pg_namespace n ON e.extnamespace = n.oid
                ) as all_extensions
            """
        )
        db_info = result[0] if result else {}
        return {**pod_info, **db_info}
    except Exception as e:
        return {**pod_info, "db_error": str(e)}


async def _raw_with_schema(
    query_template: str,
    *args,
@@ -213,9 +124,8 @@ async def _raw_with_schema(
    Note on pgvector types:
        Use unqualified ::vector and <=> operator in queries. PostgreSQL resolves
        these via search_path. The connection's search_path is configured at module
        load to include common extension schemas (public, extensions) where pgvector
        may be installed across different environments (local, CI, Supabase).
        these via search_path, which includes the schema where pgvector is installed
        on all environments (local, CI, dev).

    Args:
        query_template: SQL query with {schema_prefix} and/or {schema} placeholders
@@ -245,60 +155,12 @@ async def _raw_with_schema(
    db_client = client if client else prisma_module.get_client()

    # For queries that might use pgvector types (::vector or <=> operator),
    # we need to ensure search_path includes the schema where pgvector is installed.
    # PgBouncer in transaction mode may ignore connection-level options, so we
    # use SET LOCAL within a transaction to guarantee correct search_path.
    needs_vector_search_path = "::vector" in formatted_query or "<=>" in formatted_query

    if execute:
        result = await db_client.execute_raw(formatted_query, *args)  # type: ignore
    else:
        result = await db_client.query_raw(formatted_query, *args)  # type: ignore

    try:
        if needs_vector_search_path and client is None:
            # Use transaction to set search_path for vector queries
            async with db_client.tx() as tx:
                # Log debug info BEFORE the query to capture which backend we're hitting
                debug_info = await get_connection_debug_info(tx)
                logger.info(
                    f"Vector query starting. backend_pid={debug_info.get('backend_pid')}, "
                    f"server_addr={debug_info.get('server_addr')}, "
                    f"pgvector_installed={debug_info.get('pgvector_installed')}, "
                    f"pgvector_schema={debug_info.get('pgvector_schema')}, "
                    f"search_path={debug_info.get('search_path')}, "
                    f"pod={debug_info.get('pod_name')}"
                )
                await tx.execute_raw(f"SET LOCAL search_path TO {SEARCH_PATH}")
                if execute:
                    result = await tx.execute_raw(formatted_query, *args)  # type: ignore
                else:
                    result = await tx.query_raw(formatted_query, *args)  # type: ignore
                logger.info(
                    f"Vector query SUCCESS. backend_pid={debug_info.get('backend_pid')}"
                )
        else:
            # Regular query without vector types, or already in a transaction
            if execute:
                result = await db_client.execute_raw(formatted_query, *args)  # type: ignore
            else:
                result = await db_client.query_raw(formatted_query, *args)  # type: ignore
        return result
    except Exception as e:
        error_msg = str(e)
        # Log connection debug info for "does not exist" errors to help diagnose
        # whether connections are going to different database instances
        if "does not exist" in error_msg:
            try:
                debug_info = await get_connection_debug_info()
                logger.error(
                    f"Vector query FAILED. Connection debug info: {debug_info}. "
                    f"Query template: {query_template[:200]}... Error: {error_msg}"
                )
            except Exception:
                logger.error(
                    f"Vector query FAILED (debug info unavailable). "
                    f"Query template: {query_template[:200]}... Error: {error_msg}"
                )
        raise

    return result
async def query_raw_with_schema(query_template: str, *args) -> list[dict]:

View File

@@ -216,27 +216,7 @@ async def get_business_understanding(
    # Cache miss - load from database
    logger.debug(f"Business understanding cache miss for user {user_id}")

    try:
        record = await CoPilotUnderstanding.prisma().find_unique(where={"userId": user_id})
    except Exception as e:
        error_msg = str(e)
        if "does not exist" in error_msg:
            # Log connection debug info to diagnose if connections go to different DBs
            from backend.data.db import get_connection_debug_info

            try:
                debug_info = await get_connection_debug_info()
                logger.error(
                    f"CoPilotUnderstanding table not found. Connection debug: {debug_info}. "
                    f"Error: {error_msg}"
                )
            except Exception:
                logger.error(
                    f"CoPilotUnderstanding table not found (debug unavailable). "
                    f"Error: {error_msg}"
                )
        raise

    record = await CoPilotUnderstanding.prisma().find_unique(where={"userId": user_id})

    if record is None:
        return None

View File

@@ -0,0 +1,37 @@
# Text Encoder

## What it is

A tool that converts special characters in text into their escape sequence representations.

## What it does

It takes a string of text that contains special characters (like new lines or tabs) and converts them into their escape sequence representations (like '\n' for new lines).

## How it works

The Text Encoder scans the input text for special characters such as new lines, tabs, and backslashes, and replaces each one with its escape sequence equivalent, making the text safe for storage or transmission in formats that don't support raw special characters. Printable characters such as quotation marks pass through unchanged.
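Under the hood, the block's `run` method applies Python's `unicode_escape` codec (see the implementation in the diff above). A minimal standalone sketch of the same transformation:

```
import codecs

raw = 'Hello\nWorld!\nThis is a "quoted" string.'

# unicode_escape turns control characters like newlines and tabs into their
# escape sequences; printable characters such as quotes pass through as-is.
encoded = codecs.encode(raw, "unicode_escape").decode("utf-8")

print(encoded)  # Hello\nWorld!\nThis is a "quoted" string.
```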
## Inputs

| Input | Description |
|-------|-------------|
| Text | The text you want to encode, which may contain special characters like new lines or quotation marks |

## Outputs

| Output | Description |
|--------|-------------|
| Encoded Text | The text after processing, with all special characters converted to their escape sequences |

## Possible use case

Imagine you have a piece of text with line breaks that you need to store in a JSON file or send through an API:

```
Hello
World!
This is a "quoted" string.
```

The Text Encoder can convert it into:

```
Hello\nWorld!\nThis is a "quoted" string.
```

This is useful when you need to prepare text for storage in formats that require escape sequences, or when sending data to systems that expect encoded text. It's the inverse operation of the Text Decoder block.
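Because the block mirrors the Text Decoder, a round trip should recover the original text. A small sketch of that round trip, assuming the decoder applies the matching `unicode_escape` decode (an assumption; only the encoder is shown in this diff):

```
import codecs

raw = 'Hello\nWorld!\nThis is a "quoted" string.'

# Encode exactly as the Text Encoder block does.
encoded = codecs.encode(raw, "unicode_escape").decode("utf-8")

# Assumption: the Text Decoder reverses this with a unicode_escape decode.
decoded = codecs.decode(encoded, "unicode_escape")

assert decoded == raw
```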