mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-09 22:35:54 -05:00
Compare commits
5 Commits
dependabot
...
fix/copilo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
62c9e840b8 | ||
|
|
495d01b09b | ||
|
|
b334f1a843 | ||
|
|
5fd1482944 | ||
|
|
efd1e96235 |
2
.github/workflows/claude-dependabot.yml
vendored
2
.github/workflows/claude-dependabot.yml
vendored
@@ -78,7 +78,7 @@ jobs:
|
||||
|
||||
# Frontend Node.js/pnpm setup (mirrors platform-frontend-ci.yml)
|
||||
- name: Set up Node.js
|
||||
uses: actions/setup-node@v6
|
||||
uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: "22"
|
||||
|
||||
|
||||
2
.github/workflows/claude.yml
vendored
2
.github/workflows/claude.yml
vendored
@@ -94,7 +94,7 @@ jobs:
|
||||
|
||||
# Frontend Node.js/pnpm setup (mirrors platform-frontend-ci.yml)
|
||||
- name: Set up Node.js
|
||||
uses: actions/setup-node@v6
|
||||
uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: "22"
|
||||
|
||||
|
||||
2
.github/workflows/copilot-setup-steps.yml
vendored
2
.github/workflows/copilot-setup-steps.yml
vendored
@@ -76,7 +76,7 @@ jobs:
|
||||
|
||||
# Frontend Node.js/pnpm setup (mirrors platform-frontend-ci.yml)
|
||||
- name: Set up Node.js
|
||||
uses: actions/setup-node@v6
|
||||
uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: "22"
|
||||
|
||||
|
||||
10
.github/workflows/platform-frontend-ci.yml
vendored
10
.github/workflows/platform-frontend-ci.yml
vendored
@@ -42,7 +42,7 @@ jobs:
|
||||
- 'autogpt_platform/frontend/src/components/**'
|
||||
|
||||
- name: Set up Node.js
|
||||
uses: actions/setup-node@v6
|
||||
uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: "22.18.0"
|
||||
|
||||
@@ -74,7 +74,7 @@ jobs:
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Node.js
|
||||
uses: actions/setup-node@v6
|
||||
uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: "22.18.0"
|
||||
|
||||
@@ -112,7 +112,7 @@ jobs:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Set up Node.js
|
||||
uses: actions/setup-node@v6
|
||||
uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: "22.18.0"
|
||||
|
||||
@@ -153,7 +153,7 @@ jobs:
|
||||
submodules: recursive
|
||||
|
||||
- name: Set up Node.js
|
||||
uses: actions/setup-node@v6
|
||||
uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: "22.18.0"
|
||||
|
||||
@@ -282,7 +282,7 @@ jobs:
|
||||
submodules: recursive
|
||||
|
||||
- name: Set up Node.js
|
||||
uses: actions/setup-node@v6
|
||||
uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: "22.18.0"
|
||||
|
||||
|
||||
4
.github/workflows/platform-fullstack-ci.yml
vendored
4
.github/workflows/platform-fullstack-ci.yml
vendored
@@ -32,7 +32,7 @@ jobs:
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Set up Node.js
|
||||
uses: actions/setup-node@v6
|
||||
uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: "22.18.0"
|
||||
|
||||
@@ -68,7 +68,7 @@ jobs:
|
||||
submodules: recursive
|
||||
|
||||
- name: Set up Node.js
|
||||
uses: actions/setup-node@v6
|
||||
uses: actions/setup-node@v4
|
||||
with:
|
||||
node-version: "22.18.0"
|
||||
|
||||
|
||||
@@ -45,7 +45,10 @@ async def create_chat_session(
|
||||
successfulAgentRuns=SafeJson({}),
|
||||
successfulAgentSchedules=SafeJson({}),
|
||||
)
|
||||
return await PrismaChatSession.prisma().create(data=data)
|
||||
return await PrismaChatSession.prisma().create(
|
||||
data=data,
|
||||
include={"Messages": True},
|
||||
)
|
||||
|
||||
|
||||
async def update_chat_session(
|
||||
|
||||
@@ -266,38 +266,12 @@ async def stream_chat_post(
|
||||
|
||||
"""
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
stream_start_time = time.perf_counter()
|
||||
|
||||
# Base log metadata (task_id added after creation)
|
||||
log_meta = {"component": "ChatStream", "session_id": session_id}
|
||||
if user_id:
|
||||
log_meta["user_id"] = user_id
|
||||
|
||||
logger.info(
|
||||
f"[TIMING] stream_chat_post STARTED, session={session_id}, "
|
||||
f"user={user_id}, message_len={len(request.message)}",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
|
||||
session = await _validate_and_get_session(session_id, user_id)
|
||||
logger.info(
|
||||
f"[TIMING] session validated in {(time.perf_counter() - stream_start_time)*1000:.1f}ms",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"duration_ms": (time.perf_counter() - stream_start_time) * 1000,
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
# Create a task in the stream registry for reconnection support
|
||||
task_id = str(uuid_module.uuid4())
|
||||
operation_id = str(uuid_module.uuid4())
|
||||
log_meta["task_id"] = task_id
|
||||
|
||||
task_create_start = time.perf_counter()
|
||||
await stream_registry.create_task(
|
||||
task_id=task_id,
|
||||
session_id=session_id,
|
||||
@@ -306,46 +280,14 @@ async def stream_chat_post(
|
||||
tool_name="chat",
|
||||
operation_id=operation_id,
|
||||
)
|
||||
logger.info(
|
||||
f"[TIMING] create_task completed in {(time.perf_counter() - task_create_start)*1000:.1f}ms",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"duration_ms": (time.perf_counter() - task_create_start) * 1000,
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
# Background task that runs the AI generation independently of SSE connection
|
||||
async def run_ai_generation():
|
||||
import time as time_module
|
||||
|
||||
gen_start_time = time_module.perf_counter()
|
||||
logger.info(
|
||||
f"[TIMING] run_ai_generation STARTED, task={task_id}, session={session_id}, user={user_id}",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
first_chunk_time, ttfc = None, None
|
||||
chunk_count = 0
|
||||
try:
|
||||
# Emit a start event with task_id for reconnection
|
||||
start_chunk = StreamStart(messageId=task_id, taskId=task_id)
|
||||
await stream_registry.publish_chunk(task_id, start_chunk)
|
||||
logger.info(
|
||||
f"[TIMING] StreamStart published at {(time_module.perf_counter() - gen_start_time)*1000:.1f}ms",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"elapsed_ms": (time_module.perf_counter() - gen_start_time)
|
||||
* 1000,
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"[TIMING] Calling stream_chat_completion",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
async for chunk in chat_service.stream_chat_completion(
|
||||
session_id,
|
||||
request.message,
|
||||
@@ -354,202 +296,54 @@ async def stream_chat_post(
|
||||
session=session, # Pass pre-fetched session to avoid double-fetch
|
||||
context=request.context,
|
||||
):
|
||||
chunk_count += 1
|
||||
if first_chunk_time is None:
|
||||
first_chunk_time = time_module.perf_counter()
|
||||
ttfc = first_chunk_time - gen_start_time
|
||||
logger.info(
|
||||
f"[TIMING] FIRST AI CHUNK at {ttfc:.2f}s, type={type(chunk).__name__}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"chunk_type": type(chunk).__name__,
|
||||
"time_to_first_chunk_ms": ttfc * 1000,
|
||||
}
|
||||
},
|
||||
)
|
||||
# Write to Redis (subscribers will receive via XREAD)
|
||||
await stream_registry.publish_chunk(task_id, chunk)
|
||||
|
||||
gen_end_time = time_module.perf_counter()
|
||||
total_time = (gen_end_time - gen_start_time) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] run_ai_generation FINISHED in {total_time/1000:.1f}s; "
|
||||
f"task={task_id}, session={session_id}, "
|
||||
f"ttfc={ttfc or -1:.2f}s, n_chunks={chunk_count}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"total_time_ms": total_time,
|
||||
"time_to_first_chunk_ms": (
|
||||
ttfc * 1000 if ttfc is not None else None
|
||||
),
|
||||
"n_chunks": chunk_count,
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
# Mark task as completed
|
||||
await stream_registry.mark_task_completed(task_id, "completed")
|
||||
except Exception as e:
|
||||
elapsed = time_module.perf_counter() - gen_start_time
|
||||
logger.error(
|
||||
f"[TIMING] run_ai_generation ERROR after {elapsed:.2f}s: {e}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"elapsed_ms": elapsed * 1000,
|
||||
"error": str(e),
|
||||
}
|
||||
},
|
||||
f"Error in background AI generation for session {session_id}: {e}"
|
||||
)
|
||||
await stream_registry.mark_task_completed(task_id, "failed")
|
||||
|
||||
# Start the AI generation in a background task
|
||||
bg_task = asyncio.create_task(run_ai_generation())
|
||||
await stream_registry.set_task_asyncio_task(task_id, bg_task)
|
||||
setup_time = (time.perf_counter() - stream_start_time) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] Background task started, setup={setup_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "setup_time_ms": setup_time}},
|
||||
)
|
||||
|
||||
# SSE endpoint that subscribes to the task's stream
|
||||
async def event_generator() -> AsyncGenerator[str, None]:
|
||||
import time as time_module
|
||||
|
||||
event_gen_start = time_module.perf_counter()
|
||||
logger.info(
|
||||
f"[TIMING] event_generator STARTED, task={task_id}, session={session_id}, "
|
||||
f"user={user_id}",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
subscriber_queue = None
|
||||
first_chunk_yielded = False
|
||||
chunks_yielded = 0
|
||||
try:
|
||||
# Subscribe to the task stream (this replays existing messages + live updates)
|
||||
subscribe_start = time_module.perf_counter()
|
||||
logger.info(
|
||||
"[TIMING] Calling subscribe_to_task",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
subscriber_queue = await stream_registry.subscribe_to_task(
|
||||
task_id=task_id,
|
||||
user_id=user_id,
|
||||
last_message_id="0-0", # Get all messages from the beginning
|
||||
)
|
||||
subscribe_time = (time_module.perf_counter() - subscribe_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] subscribe_to_task completed in {subscribe_time:.1f}ms, "
|
||||
f"queue_ok={subscriber_queue is not None}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"duration_ms": subscribe_time,
|
||||
"queue_obtained": subscriber_queue is not None,
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
if subscriber_queue is None:
|
||||
logger.info(
|
||||
"[TIMING] subscriber_queue is None, yielding finish",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
yield StreamFinish().to_sse()
|
||||
yield "data: [DONE]\n\n"
|
||||
return
|
||||
|
||||
# Read from the subscriber queue and yield to SSE
|
||||
logger.info(
|
||||
"[TIMING] Starting to read from subscriber_queue",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
while True:
|
||||
try:
|
||||
queue_wait_start = time_module.perf_counter()
|
||||
chunk = await asyncio.wait_for(subscriber_queue.get(), timeout=30.0)
|
||||
queue_wait_time = (
|
||||
time_module.perf_counter() - queue_wait_start
|
||||
) * 1000
|
||||
chunks_yielded += 1
|
||||
|
||||
if not first_chunk_yielded:
|
||||
first_chunk_yielded = True
|
||||
elapsed = time_module.perf_counter() - event_gen_start
|
||||
logger.info(
|
||||
f"[TIMING] FIRST CHUNK from queue at {elapsed:.2f}s, "
|
||||
f"type={type(chunk).__name__}, "
|
||||
f"wait={queue_wait_time:.1f}ms",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"chunk_type": type(chunk).__name__,
|
||||
"elapsed_ms": elapsed * 1000,
|
||||
"queue_wait_ms": queue_wait_time,
|
||||
}
|
||||
},
|
||||
)
|
||||
elif chunks_yielded % 50 == 0:
|
||||
logger.info(
|
||||
f"[TIMING] Chunk #{chunks_yielded}, "
|
||||
f"type={type(chunk).__name__}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"chunk_number": chunks_yielded,
|
||||
"chunk_type": type(chunk).__name__,
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
yield chunk.to_sse()
|
||||
|
||||
# Check for finish signal
|
||||
if isinstance(chunk, StreamFinish):
|
||||
total_time = time_module.perf_counter() - event_gen_start
|
||||
logger.info(
|
||||
f"[TIMING] StreamFinish received in {total_time:.2f}s; "
|
||||
f"n_chunks={chunks_yielded}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"chunks_yielded": chunks_yielded,
|
||||
"total_time_ms": total_time * 1000,
|
||||
}
|
||||
},
|
||||
)
|
||||
break
|
||||
except asyncio.TimeoutError:
|
||||
# Send heartbeat to keep connection alive
|
||||
logger.info(
|
||||
f"[TIMING] Heartbeat timeout, chunks_so_far={chunks_yielded}",
|
||||
extra={
|
||||
"json_fields": {**log_meta, "chunks_so_far": chunks_yielded}
|
||||
},
|
||||
)
|
||||
yield StreamHeartbeat().to_sse()
|
||||
|
||||
except GeneratorExit:
|
||||
logger.info(
|
||||
f"[TIMING] GeneratorExit (client disconnected), chunks={chunks_yielded}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"chunks_yielded": chunks_yielded,
|
||||
"reason": "client_disconnect",
|
||||
}
|
||||
},
|
||||
)
|
||||
pass # Client disconnected - background task continues
|
||||
except Exception as e:
|
||||
elapsed = (time_module.perf_counter() - event_gen_start) * 1000
|
||||
logger.error(
|
||||
f"[TIMING] event_generator ERROR after {elapsed:.1f}ms: {e}",
|
||||
extra={
|
||||
"json_fields": {**log_meta, "elapsed_ms": elapsed, "error": str(e)}
|
||||
},
|
||||
)
|
||||
logger.error(f"Error in SSE stream for task {task_id}: {e}")
|
||||
finally:
|
||||
# Unsubscribe when client disconnects or stream ends to prevent resource leak
|
||||
if subscriber_queue is not None:
|
||||
@@ -563,18 +357,6 @@ async def stream_chat_post(
|
||||
exc_info=True,
|
||||
)
|
||||
# AI SDK protocol termination - always yield even if unsubscribe fails
|
||||
total_time = time_module.perf_counter() - event_gen_start
|
||||
logger.info(
|
||||
f"[TIMING] event_generator FINISHED in {total_time:.2f}s; "
|
||||
f"task={task_id}, session={session_id}, n_chunks={chunks_yielded}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"total_time_ms": total_time * 1000,
|
||||
"chunks_yielded": chunks_yielded,
|
||||
}
|
||||
},
|
||||
)
|
||||
yield "data: [DONE]\n\n"
|
||||
|
||||
return StreamingResponse(
|
||||
@@ -643,7 +425,7 @@ async def stream_chat_get(
|
||||
"Chat stream completed",
|
||||
extra={
|
||||
"session_id": session_id,
|
||||
"n_chunks": chunk_count,
|
||||
"chunk_count": chunk_count,
|
||||
"first_chunk_type": first_chunk_type,
|
||||
},
|
||||
)
|
||||
|
||||
@@ -371,45 +371,21 @@ async def stream_chat_completion(
|
||||
ValueError: If max_context_messages is exceeded
|
||||
|
||||
"""
|
||||
completion_start = time.monotonic()
|
||||
|
||||
# Build log metadata for structured logging
|
||||
log_meta = {"component": "ChatService", "session_id": session_id}
|
||||
if user_id:
|
||||
log_meta["user_id"] = user_id
|
||||
|
||||
logger.info(
|
||||
f"[TIMING] stream_chat_completion STARTED, session={session_id}, user={user_id}, "
|
||||
f"message_len={len(message) if message else 0}, is_user={is_user_message}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"message_len": len(message) if message else 0,
|
||||
"is_user_message": is_user_message,
|
||||
}
|
||||
},
|
||||
f"Streaming chat completion for session {session_id} for message {message} and user id {user_id}. Message is user message: {is_user_message}"
|
||||
)
|
||||
|
||||
# Only fetch from Redis if session not provided (initial call)
|
||||
if session is None:
|
||||
fetch_start = time.monotonic()
|
||||
session = await get_chat_session(session_id, user_id)
|
||||
fetch_time = (time.monotonic() - fetch_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] get_chat_session took {fetch_time:.1f}ms, "
|
||||
f"n_messages={len(session.messages) if session else 0}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"duration_ms": fetch_time,
|
||||
"n_messages": len(session.messages) if session else 0,
|
||||
}
|
||||
},
|
||||
f"Fetched session from Redis: {session.session_id if session else 'None'}, "
|
||||
f"message_count={len(session.messages) if session else 0}"
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
f"[TIMING] Using provided session, messages={len(session.messages)}",
|
||||
extra={"json_fields": {**log_meta, "n_messages": len(session.messages)}},
|
||||
f"Using provided session object: {session.session_id}, "
|
||||
f"message_count={len(session.messages)}"
|
||||
)
|
||||
|
||||
if not session:
|
||||
@@ -430,25 +406,17 @@ async def stream_chat_completion(
|
||||
|
||||
# Track user message in PostHog
|
||||
if is_user_message:
|
||||
posthog_start = time.monotonic()
|
||||
track_user_message(
|
||||
user_id=user_id,
|
||||
session_id=session_id,
|
||||
message_length=len(message),
|
||||
)
|
||||
posthog_time = (time.monotonic() - posthog_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] track_user_message took {posthog_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": posthog_time}},
|
||||
)
|
||||
|
||||
upsert_start = time.monotonic()
|
||||
session = await upsert_chat_session(session)
|
||||
upsert_time = (time.monotonic() - upsert_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] upsert_chat_session took {upsert_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": upsert_time}},
|
||||
f"Upserting session: {session.session_id} with user id {session.user_id}, "
|
||||
f"message_count={len(session.messages)}"
|
||||
)
|
||||
session = await upsert_chat_session(session)
|
||||
assert session, "Session not found"
|
||||
|
||||
# Generate title for new sessions on first user message (non-blocking)
|
||||
@@ -486,13 +454,7 @@ async def stream_chat_completion(
|
||||
asyncio.create_task(_update_title())
|
||||
|
||||
# Build system prompt with business understanding
|
||||
prompt_start = time.monotonic()
|
||||
system_prompt, understanding = await _build_system_prompt(user_id)
|
||||
prompt_time = (time.monotonic() - prompt_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] _build_system_prompt took {prompt_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": prompt_time}},
|
||||
)
|
||||
|
||||
# Initialize variables for streaming
|
||||
assistant_response = ChatMessage(
|
||||
@@ -521,18 +483,9 @@ async def stream_chat_completion(
|
||||
text_block_id = str(uuid_module.uuid4())
|
||||
|
||||
# Yield message start
|
||||
setup_time = (time.monotonic() - completion_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] Setup complete, yielding StreamStart at {setup_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "setup_time_ms": setup_time}},
|
||||
)
|
||||
yield StreamStart(messageId=message_id)
|
||||
|
||||
try:
|
||||
logger.info(
|
||||
"[TIMING] Calling _stream_chat_chunks",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
async for chunk in _stream_chat_chunks(
|
||||
session=session,
|
||||
tools=tools,
|
||||
@@ -940,21 +893,9 @@ async def _stream_chat_chunks(
|
||||
SSE formatted JSON response objects
|
||||
|
||||
"""
|
||||
import time as time_module
|
||||
|
||||
stream_chunks_start = time_module.perf_counter()
|
||||
model = config.model
|
||||
|
||||
# Build log metadata for structured logging
|
||||
log_meta = {"component": "ChatService", "session_id": session.session_id}
|
||||
if session.user_id:
|
||||
log_meta["user_id"] = session.user_id
|
||||
|
||||
logger.info(
|
||||
f"[TIMING] _stream_chat_chunks STARTED, session={session.session_id}, "
|
||||
f"user={session.user_id}, n_messages={len(session.messages)}",
|
||||
extra={"json_fields": {**log_meta, "n_messages": len(session.messages)}},
|
||||
)
|
||||
logger.info("Starting pure chat stream")
|
||||
|
||||
messages = session.to_openai_messages()
|
||||
if system_prompt:
|
||||
@@ -965,18 +906,12 @@ async def _stream_chat_chunks(
|
||||
messages = [system_message] + messages
|
||||
|
||||
# Apply context window management
|
||||
context_start = time_module.perf_counter()
|
||||
context_result = await _manage_context_window(
|
||||
messages=messages,
|
||||
model=model,
|
||||
api_key=config.api_key,
|
||||
base_url=config.base_url,
|
||||
)
|
||||
context_time = (time_module.perf_counter() - context_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] _manage_context_window took {context_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": context_time}},
|
||||
)
|
||||
|
||||
if context_result.error:
|
||||
if "System prompt dropped" in context_result.error:
|
||||
@@ -1011,19 +946,9 @@ async def _stream_chat_chunks(
|
||||
|
||||
while retry_count <= MAX_RETRIES:
|
||||
try:
|
||||
elapsed = (time_module.perf_counter() - stream_chunks_start) * 1000
|
||||
retry_info = (
|
||||
f" (retry {retry_count}/{MAX_RETRIES})" if retry_count > 0 else ""
|
||||
)
|
||||
logger.info(
|
||||
f"[TIMING] Creating OpenAI stream at {elapsed:.1f}ms{retry_info}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"elapsed_ms": elapsed,
|
||||
"retry_count": retry_count,
|
||||
}
|
||||
},
|
||||
f"Creating OpenAI chat completion stream..."
|
||||
f"{f' (retry {retry_count}/{MAX_RETRIES})' if retry_count > 0 else ''}"
|
||||
)
|
||||
|
||||
# Build extra_body for OpenRouter tracing and PostHog analytics
|
||||
@@ -1040,7 +965,6 @@ async def _stream_chat_chunks(
|
||||
:128
|
||||
] # OpenRouter limit
|
||||
|
||||
api_call_start = time_module.perf_counter()
|
||||
stream = await client.chat.completions.create(
|
||||
model=model,
|
||||
messages=cast(list[ChatCompletionMessageParam], messages),
|
||||
@@ -1050,11 +974,6 @@ async def _stream_chat_chunks(
|
||||
stream_options=ChatCompletionStreamOptionsParam(include_usage=True),
|
||||
extra_body=extra_body,
|
||||
)
|
||||
api_init_time = (time_module.perf_counter() - api_call_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] OpenAI stream object returned in {api_init_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": api_init_time}},
|
||||
)
|
||||
|
||||
# Variables to accumulate tool calls
|
||||
tool_calls: list[dict[str, Any]] = []
|
||||
@@ -1065,13 +984,10 @@ async def _stream_chat_chunks(
|
||||
|
||||
# Track if we've started the text block
|
||||
text_started = False
|
||||
first_content_chunk = True
|
||||
chunk_count = 0
|
||||
|
||||
# Process the stream
|
||||
chunk: ChatCompletionChunk
|
||||
async for chunk in stream:
|
||||
chunk_count += 1
|
||||
if chunk.usage:
|
||||
yield StreamUsage(
|
||||
promptTokens=chunk.usage.prompt_tokens,
|
||||
@@ -1094,23 +1010,6 @@ async def _stream_chat_chunks(
|
||||
if not text_started and text_block_id:
|
||||
yield StreamTextStart(id=text_block_id)
|
||||
text_started = True
|
||||
# Log timing for first content chunk
|
||||
if first_content_chunk:
|
||||
first_content_chunk = False
|
||||
ttfc = (
|
||||
time_module.perf_counter() - api_call_start
|
||||
) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] FIRST CONTENT CHUNK at {ttfc:.1f}ms "
|
||||
f"(since API call), n_chunks={chunk_count}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"time_to_first_chunk_ms": ttfc,
|
||||
"n_chunks": chunk_count,
|
||||
}
|
||||
},
|
||||
)
|
||||
# Stream the text delta
|
||||
text_response = StreamTextDelta(
|
||||
id=text_block_id or "",
|
||||
@@ -1167,21 +1066,7 @@ async def _stream_chat_chunks(
|
||||
toolName=tool_calls[idx]["function"]["name"],
|
||||
)
|
||||
emitted_start_for_idx.add(idx)
|
||||
stream_duration = time_module.perf_counter() - api_call_start
|
||||
logger.info(
|
||||
f"[TIMING] OpenAI stream COMPLETE, finish_reason={finish_reason}, "
|
||||
f"duration={stream_duration:.2f}s, "
|
||||
f"n_chunks={chunk_count}, n_tool_calls={len(tool_calls)}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"stream_duration_ms": stream_duration * 1000,
|
||||
"finish_reason": finish_reason,
|
||||
"n_chunks": chunk_count,
|
||||
"n_tool_calls": len(tool_calls),
|
||||
}
|
||||
},
|
||||
)
|
||||
logger.info(f"Stream complete. Finish reason: {finish_reason}")
|
||||
|
||||
# Yield all accumulated tool calls after the stream is complete
|
||||
# This ensures all tool call arguments have been fully received
|
||||
@@ -1201,16 +1086,11 @@ async def _stream_chat_chunks(
|
||||
# Re-raise to trigger retry logic in the parent function
|
||||
raise
|
||||
|
||||
total_time = (time_module.perf_counter() - stream_chunks_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] _stream_chat_chunks COMPLETED in {total_time/1000:.1f}s; "
|
||||
f"session={session.session_id}, user={session.user_id}",
|
||||
extra={"json_fields": {**log_meta, "total_time_ms": total_time}},
|
||||
)
|
||||
yield StreamFinish()
|
||||
return
|
||||
except Exception as e:
|
||||
last_error = e
|
||||
|
||||
if _is_retryable_error(e) and retry_count < MAX_RETRIES:
|
||||
retry_count += 1
|
||||
# Calculate delay with exponential backoff
|
||||
@@ -1226,12 +1106,26 @@ async def _stream_chat_chunks(
|
||||
continue # Retry the stream
|
||||
else:
|
||||
# Non-retryable error or max retries exceeded
|
||||
logger.error(
|
||||
f"Error in stream (not retrying): {e!s}",
|
||||
exc_info=True,
|
||||
_log_api_error(
|
||||
error=e,
|
||||
session_id=session.session_id if session else None,
|
||||
message_count=len(messages) if messages else None,
|
||||
model=model,
|
||||
retry_count=retry_count,
|
||||
)
|
||||
error_code = None
|
||||
error_text = str(e)
|
||||
|
||||
error_details = _extract_api_error_details(e)
|
||||
if error_details.get("response_body"):
|
||||
body = error_details["response_body"]
|
||||
if isinstance(body, dict):
|
||||
err = body.get("error")
|
||||
if isinstance(err, dict) and err.get("message"):
|
||||
error_text = err["message"]
|
||||
elif body.get("message"):
|
||||
error_text = body["message"]
|
||||
|
||||
if _is_region_blocked_error(e):
|
||||
error_code = "MODEL_NOT_AVAILABLE_REGION"
|
||||
error_text = (
|
||||
@@ -1248,9 +1142,12 @@ async def _stream_chat_chunks(
|
||||
|
||||
# If we exit the retry loop without returning, it means we exhausted retries
|
||||
if last_error:
|
||||
logger.error(
|
||||
f"Max retries ({MAX_RETRIES}) exceeded. Last error: {last_error!s}",
|
||||
exc_info=True,
|
||||
_log_api_error(
|
||||
error=last_error,
|
||||
session_id=session.session_id if session else None,
|
||||
message_count=len(messages) if messages else None,
|
||||
model=model,
|
||||
retry_count=MAX_RETRIES,
|
||||
)
|
||||
yield StreamError(errorText=f"Max retries exceeded: {last_error!s}")
|
||||
yield StreamFinish()
|
||||
@@ -1822,6 +1719,7 @@ async def _generate_llm_continuation(
|
||||
break # Success, exit retry loop
|
||||
except Exception as e:
|
||||
last_error = e
|
||||
|
||||
if _is_retryable_error(e) and retry_count < MAX_RETRIES:
|
||||
retry_count += 1
|
||||
delay = min(
|
||||
@@ -1835,17 +1733,23 @@ async def _generate_llm_continuation(
|
||||
await asyncio.sleep(delay)
|
||||
continue
|
||||
else:
|
||||
# Non-retryable error - log and exit gracefully
|
||||
logger.error(
|
||||
f"Non-retryable error in LLM continuation: {e!s}",
|
||||
exc_info=True,
|
||||
# Non-retryable error - log details and exit gracefully
|
||||
_log_api_error(
|
||||
error=e,
|
||||
session_id=session_id,
|
||||
message_count=len(messages) if messages else None,
|
||||
model=config.model,
|
||||
retry_count=retry_count,
|
||||
)
|
||||
return
|
||||
|
||||
if last_error:
|
||||
logger.error(
|
||||
f"Max retries ({MAX_RETRIES}) exceeded for LLM continuation. "
|
||||
f"Last error: {last_error!s}"
|
||||
_log_api_error(
|
||||
error=last_error,
|
||||
session_id=session_id,
|
||||
message_count=len(messages) if messages else None,
|
||||
model=config.model,
|
||||
retry_count=MAX_RETRIES,
|
||||
)
|
||||
return
|
||||
|
||||
@@ -1885,6 +1789,89 @@ async def _generate_llm_continuation(
|
||||
logger.error(f"Failed to generate LLM continuation: {e}", exc_info=True)
|
||||
|
||||
|
||||
def _log_api_error(
|
||||
error: Exception,
|
||||
session_id: str | None = None,
|
||||
message_count: int | None = None,
|
||||
model: str | None = None,
|
||||
retry_count: int = 0,
|
||||
) -> None:
|
||||
"""Log detailed API error information for debugging."""
|
||||
details = _extract_api_error_details(error)
|
||||
details["session_id"] = session_id
|
||||
details["message_count"] = message_count
|
||||
details["model"] = model
|
||||
details["retry_count"] = retry_count
|
||||
|
||||
if isinstance(error, RateLimitError):
|
||||
logger.warning(f"Rate limit error: {details}")
|
||||
elif isinstance(error, APIConnectionError):
|
||||
logger.warning(f"API connection error: {details}")
|
||||
elif isinstance(error, APIStatusError) and error.status_code >= 500:
|
||||
logger.error(f"API server error (5xx): {details}")
|
||||
else:
|
||||
logger.error(f"API error: {details}")
|
||||
|
||||
|
||||
def _extract_api_error_details(error: Exception) -> dict[str, Any]:
|
||||
"""Extract detailed information from OpenAI/OpenRouter API errors."""
|
||||
error_msg = str(error)
|
||||
details: dict[str, Any] = {
|
||||
"error_type": type(error).__name__,
|
||||
"error_message": error_msg[:500] + "..." if len(error_msg) > 500 else error_msg,
|
||||
}
|
||||
|
||||
if hasattr(error, "code"):
|
||||
details["code"] = getattr(error, "code", None)
|
||||
if hasattr(error, "param"):
|
||||
details["param"] = getattr(error, "param", None)
|
||||
|
||||
if isinstance(error, APIStatusError):
|
||||
details["status_code"] = error.status_code
|
||||
details["request_id"] = getattr(error, "request_id", None)
|
||||
|
||||
if hasattr(error, "body") and error.body:
|
||||
details["response_body"] = _sanitize_error_body(error.body)
|
||||
|
||||
if hasattr(error, "response") and error.response:
|
||||
headers = error.response.headers
|
||||
details["openrouter_provider"] = headers.get("x-openrouter-provider")
|
||||
details["openrouter_model"] = headers.get("x-openrouter-model")
|
||||
details["retry_after"] = headers.get("retry-after")
|
||||
details["rate_limit_remaining"] = headers.get("x-ratelimit-remaining")
|
||||
|
||||
return details
|
||||
|
||||
|
||||
def _sanitize_error_body(
|
||||
body: Any, max_length: int = 2000
|
||||
) -> dict[str, Any] | str | None:
|
||||
"""Extract only safe fields from error response body to avoid logging sensitive data."""
|
||||
if not isinstance(body, dict):
|
||||
# Non-dict bodies (e.g., HTML error pages) - return truncated string
|
||||
if body is not None:
|
||||
body_str = str(body)
|
||||
if len(body_str) > max_length:
|
||||
return body_str[:max_length] + "...[truncated]"
|
||||
return body_str
|
||||
return None
|
||||
|
||||
safe_fields = ("message", "type", "code", "param", "error")
|
||||
sanitized: dict[str, Any] = {}
|
||||
|
||||
for field in safe_fields:
|
||||
if field in body:
|
||||
value = body[field]
|
||||
if field == "error" and isinstance(value, dict):
|
||||
sanitized[field] = _sanitize_error_body(value, max_length)
|
||||
elif isinstance(value, str) and len(value) > max_length:
|
||||
sanitized[field] = value[:max_length] + "...[truncated]"
|
||||
else:
|
||||
sanitized[field] = value
|
||||
|
||||
return sanitized if sanitized else None
|
||||
|
||||
|
||||
async def _generate_llm_continuation_with_streaming(
|
||||
session_id: str,
|
||||
user_id: str | None,
|
||||
|
||||
@@ -104,24 +104,6 @@ async def create_task(
|
||||
Returns:
|
||||
The created ActiveTask instance (metadata only)
|
||||
"""
|
||||
import time
|
||||
|
||||
start_time = time.perf_counter()
|
||||
|
||||
# Build log metadata for structured logging
|
||||
log_meta = {
|
||||
"component": "StreamRegistry",
|
||||
"task_id": task_id,
|
||||
"session_id": session_id,
|
||||
}
|
||||
if user_id:
|
||||
log_meta["user_id"] = user_id
|
||||
|
||||
logger.info(
|
||||
f"[TIMING] create_task STARTED, task={task_id}, session={session_id}, user={user_id}",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
|
||||
task = ActiveTask(
|
||||
task_id=task_id,
|
||||
session_id=session_id,
|
||||
@@ -132,18 +114,10 @@ async def create_task(
|
||||
)
|
||||
|
||||
# Store metadata in Redis
|
||||
redis_start = time.perf_counter()
|
||||
redis = await get_redis_async()
|
||||
redis_time = (time.perf_counter() - redis_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] get_redis_async took {redis_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": redis_time}},
|
||||
)
|
||||
|
||||
meta_key = _get_task_meta_key(task_id)
|
||||
op_key = _get_operation_mapping_key(operation_id)
|
||||
|
||||
hset_start = time.perf_counter()
|
||||
await redis.hset( # type: ignore[misc]
|
||||
meta_key,
|
||||
mapping={
|
||||
@@ -157,22 +131,12 @@ async def create_task(
|
||||
"created_at": task.created_at.isoformat(),
|
||||
},
|
||||
)
|
||||
hset_time = (time.perf_counter() - hset_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] redis.hset took {hset_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": hset_time}},
|
||||
)
|
||||
|
||||
await redis.expire(meta_key, config.stream_ttl)
|
||||
|
||||
# Create operation_id -> task_id mapping for webhook lookups
|
||||
await redis.set(op_key, task_id, ex=config.stream_ttl)
|
||||
|
||||
total_time = (time.perf_counter() - start_time) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] create_task COMPLETED in {total_time:.1f}ms; task={task_id}, session={session_id}",
|
||||
extra={"json_fields": {**log_meta, "total_time_ms": total_time}},
|
||||
)
|
||||
logger.debug(f"Created task {task_id} for session {session_id}")
|
||||
|
||||
return task
|
||||
|
||||
@@ -192,60 +156,26 @@ async def publish_chunk(
|
||||
Returns:
|
||||
The Redis Stream message ID
|
||||
"""
|
||||
import time
|
||||
|
||||
start_time = time.perf_counter()
|
||||
chunk_type = type(chunk).__name__
|
||||
chunk_json = chunk.model_dump_json()
|
||||
message_id = "0-0"
|
||||
|
||||
# Build log metadata
|
||||
log_meta = {
|
||||
"component": "StreamRegistry",
|
||||
"task_id": task_id,
|
||||
"chunk_type": chunk_type,
|
||||
}
|
||||
|
||||
try:
|
||||
redis = await get_redis_async()
|
||||
stream_key = _get_task_stream_key(task_id)
|
||||
|
||||
# Write to Redis Stream for persistence and real-time delivery
|
||||
xadd_start = time.perf_counter()
|
||||
raw_id = await redis.xadd(
|
||||
stream_key,
|
||||
{"data": chunk_json},
|
||||
maxlen=config.stream_max_length,
|
||||
)
|
||||
xadd_time = (time.perf_counter() - xadd_start) * 1000
|
||||
message_id = raw_id if isinstance(raw_id, str) else raw_id.decode()
|
||||
|
||||
# Set TTL on stream to match task metadata TTL
|
||||
await redis.expire(stream_key, config.stream_ttl)
|
||||
|
||||
total_time = (time.perf_counter() - start_time) * 1000
|
||||
# Only log timing for significant chunks or slow operations
|
||||
if (
|
||||
chunk_type
|
||||
in ("StreamStart", "StreamFinish", "StreamTextStart", "StreamTextEnd")
|
||||
or total_time > 50
|
||||
):
|
||||
logger.info(
|
||||
f"[TIMING] publish_chunk {chunk_type} in {total_time:.1f}ms (xadd={xadd_time:.1f}ms)",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"total_time_ms": total_time,
|
||||
"xadd_time_ms": xadd_time,
|
||||
"message_id": message_id,
|
||||
}
|
||||
},
|
||||
)
|
||||
except Exception as e:
|
||||
elapsed = (time.perf_counter() - start_time) * 1000
|
||||
logger.error(
|
||||
f"[TIMING] Failed to publish chunk {chunk_type} after {elapsed:.1f}ms: {e}",
|
||||
extra={"json_fields": {**log_meta, "elapsed_ms": elapsed, "error": str(e)}},
|
||||
f"Failed to publish chunk for task {task_id}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
@@ -270,61 +200,24 @@ async def subscribe_to_task(
|
||||
An asyncio Queue that will receive stream chunks, or None if task not found
|
||||
or user doesn't have access
|
||||
"""
|
||||
import time
|
||||
|
||||
start_time = time.perf_counter()
|
||||
|
||||
# Build log metadata
|
||||
log_meta = {"component": "StreamRegistry", "task_id": task_id}
|
||||
if user_id:
|
||||
log_meta["user_id"] = user_id
|
||||
|
||||
logger.info(
|
||||
f"[TIMING] subscribe_to_task STARTED, task={task_id}, user={user_id}, last_msg={last_message_id}",
|
||||
extra={"json_fields": {**log_meta, "last_message_id": last_message_id}},
|
||||
)
|
||||
|
||||
redis_start = time.perf_counter()
|
||||
redis = await get_redis_async()
|
||||
meta_key = _get_task_meta_key(task_id)
|
||||
meta: dict[Any, Any] = await redis.hgetall(meta_key) # type: ignore[misc]
|
||||
hgetall_time = (time.perf_counter() - redis_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] Redis hgetall took {hgetall_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": hgetall_time}},
|
||||
)
|
||||
|
||||
if not meta:
|
||||
elapsed = (time.perf_counter() - start_time) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] Task not found in Redis after {elapsed:.1f}ms",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"elapsed_ms": elapsed,
|
||||
"reason": "task_not_found",
|
||||
}
|
||||
},
|
||||
)
|
||||
logger.debug(f"Task {task_id} not found in Redis")
|
||||
return None
|
||||
|
||||
# Note: Redis client uses decode_responses=True, so keys are strings
|
||||
task_status = meta.get("status", "")
|
||||
task_user_id = meta.get("user_id", "") or None
|
||||
log_meta["session_id"] = meta.get("session_id", "")
|
||||
|
||||
# Validate ownership - if task has an owner, requester must match
|
||||
if task_user_id:
|
||||
if user_id != task_user_id:
|
||||
logger.warning(
|
||||
f"[TIMING] Access denied: user {user_id} tried to access task owned by {task_user_id}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"task_owner": task_user_id,
|
||||
"reason": "access_denied",
|
||||
}
|
||||
},
|
||||
f"User {user_id} denied access to task {task_id} "
|
||||
f"owned by {task_user_id}"
|
||||
)
|
||||
return None
|
||||
|
||||
@@ -332,19 +225,7 @@ async def subscribe_to_task(
|
||||
stream_key = _get_task_stream_key(task_id)
|
||||
|
||||
# Step 1: Replay messages from Redis Stream
|
||||
xread_start = time.perf_counter()
|
||||
messages = await redis.xread({stream_key: last_message_id}, block=0, count=1000)
|
||||
xread_time = (time.perf_counter() - xread_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] Redis xread (replay) took {xread_time:.1f}ms, status={task_status}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"duration_ms": xread_time,
|
||||
"task_status": task_status,
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
replayed_count = 0
|
||||
replay_last_id = last_message_id
|
||||
@@ -363,48 +244,19 @@ async def subscribe_to_task(
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to replay message: {e}")
|
||||
|
||||
logger.info(
|
||||
f"[TIMING] Replayed {replayed_count} messages, last_id={replay_last_id}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"n_messages_replayed": replayed_count,
|
||||
"replay_last_id": replay_last_id,
|
||||
}
|
||||
},
|
||||
)
|
||||
logger.debug(f"Task {task_id}: replayed {replayed_count} messages")
|
||||
|
||||
# Step 2: If task is still running, start stream listener for live updates
|
||||
if task_status == "running":
|
||||
logger.info(
|
||||
"[TIMING] Task still running, starting _stream_listener",
|
||||
extra={"json_fields": {**log_meta, "task_status": task_status}},
|
||||
)
|
||||
listener_task = asyncio.create_task(
|
||||
_stream_listener(task_id, subscriber_queue, replay_last_id, log_meta)
|
||||
_stream_listener(task_id, subscriber_queue, replay_last_id)
|
||||
)
|
||||
# Track listener task for cleanup on unsubscribe
|
||||
_listener_tasks[id(subscriber_queue)] = (task_id, listener_task)
|
||||
else:
|
||||
# Task is completed/failed - add finish marker
|
||||
logger.info(
|
||||
f"[TIMING] Task already {task_status}, adding StreamFinish",
|
||||
extra={"json_fields": {**log_meta, "task_status": task_status}},
|
||||
)
|
||||
await subscriber_queue.put(StreamFinish())
|
||||
|
||||
total_time = (time.perf_counter() - start_time) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] subscribe_to_task COMPLETED in {total_time:.1f}ms; task={task_id}, "
|
||||
f"n_messages_replayed={replayed_count}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"total_time_ms": total_time,
|
||||
"n_messages_replayed": replayed_count,
|
||||
}
|
||||
},
|
||||
)
|
||||
return subscriber_queue
|
||||
|
||||
|
||||
@@ -412,7 +264,6 @@ async def _stream_listener(
|
||||
task_id: str,
|
||||
subscriber_queue: asyncio.Queue[StreamBaseResponse],
|
||||
last_replayed_id: str,
|
||||
log_meta: dict | None = None,
|
||||
) -> None:
|
||||
"""Listen to Redis Stream for new messages using blocking XREAD.
|
||||
|
||||
@@ -423,27 +274,10 @@ async def _stream_listener(
|
||||
task_id: Task ID to listen for
|
||||
subscriber_queue: Queue to deliver messages to
|
||||
last_replayed_id: Last message ID from replay (continue from here)
|
||||
log_meta: Structured logging metadata
|
||||
"""
|
||||
import time
|
||||
|
||||
start_time = time.perf_counter()
|
||||
|
||||
# Use provided log_meta or build minimal one
|
||||
if log_meta is None:
|
||||
log_meta = {"component": "StreamRegistry", "task_id": task_id}
|
||||
|
||||
logger.info(
|
||||
f"[TIMING] _stream_listener STARTED, task={task_id}, last_id={last_replayed_id}",
|
||||
extra={"json_fields": {**log_meta, "last_replayed_id": last_replayed_id}},
|
||||
)
|
||||
|
||||
queue_id = id(subscriber_queue)
|
||||
# Track the last successfully delivered message ID for recovery hints
|
||||
last_delivered_id = last_replayed_id
|
||||
messages_delivered = 0
|
||||
first_message_time = None
|
||||
xread_count = 0
|
||||
|
||||
try:
|
||||
redis = await get_redis_async()
|
||||
@@ -453,39 +287,9 @@ async def _stream_listener(
|
||||
while True:
|
||||
# Block for up to 30 seconds waiting for new messages
|
||||
# This allows periodic checking if task is still running
|
||||
xread_start = time.perf_counter()
|
||||
xread_count += 1
|
||||
messages = await redis.xread(
|
||||
{stream_key: current_id}, block=30000, count=100
|
||||
)
|
||||
xread_time = (time.perf_counter() - xread_start) * 1000
|
||||
|
||||
if messages:
|
||||
msg_count = sum(len(msgs) for _, msgs in messages)
|
||||
logger.info(
|
||||
f"[TIMING] xread #{xread_count} returned {msg_count} messages in {xread_time:.1f}ms",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"xread_count": xread_count,
|
||||
"n_messages": msg_count,
|
||||
"duration_ms": xread_time,
|
||||
}
|
||||
},
|
||||
)
|
||||
elif xread_time > 1000:
|
||||
# Only log timeouts (30s blocking)
|
||||
logger.info(
|
||||
f"[TIMING] xread #{xread_count} timeout after {xread_time:.1f}ms",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"xread_count": xread_count,
|
||||
"duration_ms": xread_time,
|
||||
"reason": "timeout",
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
if not messages:
|
||||
# Timeout - check if task is still running
|
||||
@@ -522,30 +326,10 @@ async def _stream_listener(
|
||||
)
|
||||
# Update last delivered ID on successful delivery
|
||||
last_delivered_id = current_id
|
||||
messages_delivered += 1
|
||||
if first_message_time is None:
|
||||
first_message_time = time.perf_counter()
|
||||
elapsed = (first_message_time - start_time) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] FIRST live message at {elapsed:.1f}ms, type={type(chunk).__name__}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"elapsed_ms": elapsed,
|
||||
"chunk_type": type(chunk).__name__,
|
||||
}
|
||||
},
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(
|
||||
f"[TIMING] Subscriber queue full, delivery timed out after {QUEUE_PUT_TIMEOUT}s",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"timeout_s": QUEUE_PUT_TIMEOUT,
|
||||
"reason": "queue_full",
|
||||
}
|
||||
},
|
||||
f"Subscriber queue full for task {task_id}, "
|
||||
f"message delivery timed out after {QUEUE_PUT_TIMEOUT}s"
|
||||
)
|
||||
# Send overflow error with recovery info
|
||||
try:
|
||||
@@ -567,44 +351,15 @@ async def _stream_listener(
|
||||
|
||||
# Stop listening on finish
|
||||
if isinstance(chunk, StreamFinish):
|
||||
total_time = (time.perf_counter() - start_time) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] StreamFinish received in {total_time/1000:.1f}s; delivered={messages_delivered}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"total_time_ms": total_time,
|
||||
"messages_delivered": messages_delivered,
|
||||
}
|
||||
},
|
||||
)
|
||||
return
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Error processing stream message: {e}",
|
||||
extra={"json_fields": {**log_meta, "error": str(e)}},
|
||||
)
|
||||
logger.warning(f"Error processing stream message: {e}")
|
||||
|
||||
except asyncio.CancelledError:
|
||||
elapsed = (time.perf_counter() - start_time) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] _stream_listener CANCELLED after {elapsed:.1f}ms, delivered={messages_delivered}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"elapsed_ms": elapsed,
|
||||
"messages_delivered": messages_delivered,
|
||||
"reason": "cancelled",
|
||||
}
|
||||
},
|
||||
)
|
||||
logger.debug(f"Stream listener cancelled for task {task_id}")
|
||||
raise # Re-raise to propagate cancellation
|
||||
except Exception as e:
|
||||
elapsed = (time.perf_counter() - start_time) * 1000
|
||||
logger.error(
|
||||
f"[TIMING] _stream_listener ERROR after {elapsed:.1f}ms: {e}",
|
||||
extra={"json_fields": {**log_meta, "elapsed_ms": elapsed, "error": str(e)}},
|
||||
)
|
||||
logger.error(f"Stream listener error for task {task_id}: {e}")
|
||||
# On error, send finish to unblock subscriber
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
@@ -613,24 +368,10 @@ async def _stream_listener(
|
||||
)
|
||||
except (asyncio.TimeoutError, asyncio.QueueFull):
|
||||
logger.warning(
|
||||
"Could not deliver finish event after error",
|
||||
extra={"json_fields": log_meta},
|
||||
f"Could not deliver finish event for task {task_id} after error"
|
||||
)
|
||||
finally:
|
||||
# Clean up listener task mapping on exit
|
||||
total_time = (time.perf_counter() - start_time) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] _stream_listener FINISHED in {total_time/1000:.1f}s; task={task_id}, "
|
||||
f"delivered={messages_delivered}, xread_count={xread_count}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"total_time_ms": total_time,
|
||||
"messages_delivered": messages_delivered,
|
||||
"xread_count": xread_count,
|
||||
}
|
||||
},
|
||||
)
|
||||
_listener_tasks.pop(queue_id, None)
|
||||
|
||||
|
||||
|
||||
@@ -13,32 +13,10 @@ from backend.api.features.chat.tools.models import (
|
||||
NoResultsResponse,
|
||||
)
|
||||
from backend.api.features.store.hybrid_search import unified_hybrid_search
|
||||
from backend.data.block import BlockType, get_block
|
||||
from backend.data.block import get_block
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_TARGET_RESULTS = 10
|
||||
# Over-fetch to compensate for post-hoc filtering of graph-only blocks.
|
||||
# 40 is 2x current removed; speed of query 10 vs 40 is minimial
|
||||
_OVERFETCH_PAGE_SIZE = 40
|
||||
|
||||
# Block types that only work within graphs and cannot run standalone in CoPilot.
|
||||
COPILOT_EXCLUDED_BLOCK_TYPES = {
|
||||
BlockType.INPUT, # Graph interface definition - data enters via chat, not graph inputs
|
||||
BlockType.OUTPUT, # Graph interface definition - data exits via chat, not graph outputs
|
||||
BlockType.WEBHOOK, # Wait for external events - would hang forever in CoPilot
|
||||
BlockType.WEBHOOK_MANUAL, # Same as WEBHOOK
|
||||
BlockType.NOTE, # Visual annotation only - no runtime behavior
|
||||
BlockType.HUMAN_IN_THE_LOOP, # Pauses for human approval - CoPilot IS human-in-the-loop
|
||||
BlockType.AGENT, # AgentExecutorBlock requires execution_context - use run_agent tool
|
||||
}
|
||||
|
||||
# Specific block IDs excluded from CoPilot (STANDARD type but still require graph context)
|
||||
COPILOT_EXCLUDED_BLOCK_IDS = {
|
||||
# SmartDecisionMakerBlock - dynamically discovers downstream blocks via graph topology
|
||||
"3b191d9f-356f-482d-8238-ba04b6d18381",
|
||||
}
|
||||
|
||||
|
||||
class FindBlockTool(BaseTool):
|
||||
"""Tool for searching available blocks."""
|
||||
@@ -110,7 +88,7 @@ class FindBlockTool(BaseTool):
|
||||
query=query,
|
||||
content_types=[ContentType.BLOCK],
|
||||
page=1,
|
||||
page_size=_OVERFETCH_PAGE_SIZE,
|
||||
page_size=10,
|
||||
)
|
||||
|
||||
if not results:
|
||||
@@ -130,90 +108,60 @@ class FindBlockTool(BaseTool):
|
||||
block = get_block(block_id)
|
||||
|
||||
# Skip disabled blocks
|
||||
if not block or block.disabled:
|
||||
continue
|
||||
if block and not block.disabled:
|
||||
# Get input/output schemas
|
||||
input_schema = {}
|
||||
output_schema = {}
|
||||
try:
|
||||
input_schema = block.input_schema.jsonschema()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
output_schema = block.output_schema.jsonschema()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Skip blocks excluded from CoPilot (graph-only blocks)
|
||||
if (
|
||||
block.block_type in COPILOT_EXCLUDED_BLOCK_TYPES
|
||||
or block.id in COPILOT_EXCLUDED_BLOCK_IDS
|
||||
):
|
||||
continue
|
||||
# Get categories from block instance
|
||||
categories = []
|
||||
if hasattr(block, "categories") and block.categories:
|
||||
categories = [cat.value for cat in block.categories]
|
||||
|
||||
# Get input/output schemas
|
||||
input_schema = {}
|
||||
output_schema = {}
|
||||
try:
|
||||
input_schema = block.input_schema.jsonschema()
|
||||
except Exception as e:
|
||||
logger.debug(
|
||||
"Failed to generate input schema for block %s: %s",
|
||||
block_id,
|
||||
e,
|
||||
)
|
||||
try:
|
||||
output_schema = block.output_schema.jsonschema()
|
||||
except Exception as e:
|
||||
logger.debug(
|
||||
"Failed to generate output schema for block %s: %s",
|
||||
block_id,
|
||||
e,
|
||||
)
|
||||
|
||||
# Get categories from block instance
|
||||
categories = []
|
||||
if hasattr(block, "categories") and block.categories:
|
||||
categories = [cat.value for cat in block.categories]
|
||||
|
||||
# Extract required inputs for easier use
|
||||
required_inputs: list[BlockInputFieldInfo] = []
|
||||
if input_schema:
|
||||
properties = input_schema.get("properties", {})
|
||||
required_fields = set(input_schema.get("required", []))
|
||||
# Get credential field names to exclude from required inputs
|
||||
credentials_fields = set(
|
||||
block.input_schema.get_credentials_fields().keys()
|
||||
)
|
||||
|
||||
for field_name, field_schema in properties.items():
|
||||
# Skip credential fields - they're handled separately
|
||||
if field_name in credentials_fields:
|
||||
continue
|
||||
|
||||
required_inputs.append(
|
||||
BlockInputFieldInfo(
|
||||
name=field_name,
|
||||
type=field_schema.get("type", "string"),
|
||||
description=field_schema.get("description", ""),
|
||||
required=field_name in required_fields,
|
||||
default=field_schema.get("default"),
|
||||
)
|
||||
# Extract required inputs for easier use
|
||||
required_inputs: list[BlockInputFieldInfo] = []
|
||||
if input_schema:
|
||||
properties = input_schema.get("properties", {})
|
||||
required_fields = set(input_schema.get("required", []))
|
||||
# Get credential field names to exclude from required inputs
|
||||
credentials_fields = set(
|
||||
block.input_schema.get_credentials_fields().keys()
|
||||
)
|
||||
|
||||
blocks.append(
|
||||
BlockInfoSummary(
|
||||
id=block_id,
|
||||
name=block.name,
|
||||
description=block.description or "",
|
||||
categories=categories,
|
||||
input_schema=input_schema,
|
||||
output_schema=output_schema,
|
||||
required_inputs=required_inputs,
|
||||
for field_name, field_schema in properties.items():
|
||||
# Skip credential fields - they're handled separately
|
||||
if field_name in credentials_fields:
|
||||
continue
|
||||
|
||||
required_inputs.append(
|
||||
BlockInputFieldInfo(
|
||||
name=field_name,
|
||||
type=field_schema.get("type", "string"),
|
||||
description=field_schema.get("description", ""),
|
||||
required=field_name in required_fields,
|
||||
default=field_schema.get("default"),
|
||||
)
|
||||
)
|
||||
|
||||
blocks.append(
|
||||
BlockInfoSummary(
|
||||
id=block_id,
|
||||
name=block.name,
|
||||
description=block.description or "",
|
||||
categories=categories,
|
||||
input_schema=input_schema,
|
||||
output_schema=output_schema,
|
||||
required_inputs=required_inputs,
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
if len(blocks) >= _TARGET_RESULTS:
|
||||
break
|
||||
|
||||
if blocks and len(blocks) < _TARGET_RESULTS:
|
||||
logger.debug(
|
||||
"find_block returned %d/%d results for query '%s' "
|
||||
"(filtered %d excluded/disabled blocks)",
|
||||
len(blocks),
|
||||
_TARGET_RESULTS,
|
||||
query,
|
||||
len(results) - len(blocks),
|
||||
)
|
||||
|
||||
if not blocks:
|
||||
return NoResultsResponse(
|
||||
|
||||
@@ -1,139 +0,0 @@
|
||||
"""Tests for block filtering in FindBlockTool."""
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from backend.api.features.chat.tools.find_block import (
|
||||
COPILOT_EXCLUDED_BLOCK_IDS,
|
||||
COPILOT_EXCLUDED_BLOCK_TYPES,
|
||||
FindBlockTool,
|
||||
)
|
||||
from backend.api.features.chat.tools.models import BlockListResponse
|
||||
from backend.data.block import BlockType
|
||||
|
||||
from ._test_data import make_session
|
||||
|
||||
_TEST_USER_ID = "test-user-find-block"
|
||||
|
||||
|
||||
def make_mock_block(
|
||||
block_id: str, name: str, block_type: BlockType, disabled: bool = False
|
||||
):
|
||||
"""Create a mock block for testing."""
|
||||
mock = MagicMock()
|
||||
mock.id = block_id
|
||||
mock.name = name
|
||||
mock.description = f"{name} description"
|
||||
mock.block_type = block_type
|
||||
mock.disabled = disabled
|
||||
mock.input_schema = MagicMock()
|
||||
mock.input_schema.jsonschema.return_value = {"properties": {}, "required": []}
|
||||
mock.input_schema.get_credentials_fields.return_value = {}
|
||||
mock.output_schema = MagicMock()
|
||||
mock.output_schema.jsonschema.return_value = {}
|
||||
mock.categories = []
|
||||
return mock
|
||||
|
||||
|
||||
class TestFindBlockFiltering:
|
||||
"""Tests for block filtering in FindBlockTool."""
|
||||
|
||||
def test_excluded_block_types_contains_expected_types(self):
|
||||
"""Verify COPILOT_EXCLUDED_BLOCK_TYPES contains all graph-only types."""
|
||||
assert BlockType.INPUT in COPILOT_EXCLUDED_BLOCK_TYPES
|
||||
assert BlockType.OUTPUT in COPILOT_EXCLUDED_BLOCK_TYPES
|
||||
assert BlockType.WEBHOOK in COPILOT_EXCLUDED_BLOCK_TYPES
|
||||
assert BlockType.WEBHOOK_MANUAL in COPILOT_EXCLUDED_BLOCK_TYPES
|
||||
assert BlockType.NOTE in COPILOT_EXCLUDED_BLOCK_TYPES
|
||||
assert BlockType.HUMAN_IN_THE_LOOP in COPILOT_EXCLUDED_BLOCK_TYPES
|
||||
assert BlockType.AGENT in COPILOT_EXCLUDED_BLOCK_TYPES
|
||||
|
||||
def test_excluded_block_ids_contains_smart_decision_maker(self):
|
||||
"""Verify SmartDecisionMakerBlock is in COPILOT_EXCLUDED_BLOCK_IDS."""
|
||||
assert "3b191d9f-356f-482d-8238-ba04b6d18381" in COPILOT_EXCLUDED_BLOCK_IDS
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_excluded_block_type_filtered_from_results(self):
|
||||
"""Verify blocks with excluded BlockTypes are filtered from search results."""
|
||||
session = make_session(user_id=_TEST_USER_ID)
|
||||
|
||||
# Mock search returns an INPUT block (excluded) and a STANDARD block (included)
|
||||
search_results = [
|
||||
{"content_id": "input-block-id", "score": 0.9},
|
||||
{"content_id": "standard-block-id", "score": 0.8},
|
||||
]
|
||||
|
||||
input_block = make_mock_block("input-block-id", "Input Block", BlockType.INPUT)
|
||||
standard_block = make_mock_block(
|
||||
"standard-block-id", "HTTP Request", BlockType.STANDARD
|
||||
)
|
||||
|
||||
def mock_get_block(block_id):
|
||||
return {
|
||||
"input-block-id": input_block,
|
||||
"standard-block-id": standard_block,
|
||||
}.get(block_id)
|
||||
|
||||
with patch(
|
||||
"backend.api.features.chat.tools.find_block.unified_hybrid_search",
|
||||
new_callable=AsyncMock,
|
||||
return_value=(search_results, 2),
|
||||
):
|
||||
with patch(
|
||||
"backend.api.features.chat.tools.find_block.get_block",
|
||||
side_effect=mock_get_block,
|
||||
):
|
||||
tool = FindBlockTool()
|
||||
response = await tool._execute(
|
||||
user_id=_TEST_USER_ID, session=session, query="test"
|
||||
)
|
||||
|
||||
# Should only return the standard block, not the INPUT block
|
||||
assert isinstance(response, BlockListResponse)
|
||||
assert len(response.blocks) == 1
|
||||
assert response.blocks[0].id == "standard-block-id"
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_excluded_block_id_filtered_from_results(self):
|
||||
"""Verify SmartDecisionMakerBlock is filtered from search results."""
|
||||
session = make_session(user_id=_TEST_USER_ID)
|
||||
|
||||
smart_decision_id = "3b191d9f-356f-482d-8238-ba04b6d18381"
|
||||
search_results = [
|
||||
{"content_id": smart_decision_id, "score": 0.9},
|
||||
{"content_id": "normal-block-id", "score": 0.8},
|
||||
]
|
||||
|
||||
# SmartDecisionMakerBlock has STANDARD type but is excluded by ID
|
||||
smart_block = make_mock_block(
|
||||
smart_decision_id, "Smart Decision Maker", BlockType.STANDARD
|
||||
)
|
||||
normal_block = make_mock_block(
|
||||
"normal-block-id", "Normal Block", BlockType.STANDARD
|
||||
)
|
||||
|
||||
def mock_get_block(block_id):
|
||||
return {
|
||||
smart_decision_id: smart_block,
|
||||
"normal-block-id": normal_block,
|
||||
}.get(block_id)
|
||||
|
||||
with patch(
|
||||
"backend.api.features.chat.tools.find_block.unified_hybrid_search",
|
||||
new_callable=AsyncMock,
|
||||
return_value=(search_results, 2),
|
||||
):
|
||||
with patch(
|
||||
"backend.api.features.chat.tools.find_block.get_block",
|
||||
side_effect=mock_get_block,
|
||||
):
|
||||
tool = FindBlockTool()
|
||||
response = await tool._execute(
|
||||
user_id=_TEST_USER_ID, session=session, query="decision"
|
||||
)
|
||||
|
||||
# Should only return normal block, not SmartDecisionMakerBlock
|
||||
assert isinstance(response, BlockListResponse)
|
||||
assert len(response.blocks) == 1
|
||||
assert response.blocks[0].id == "normal-block-id"
|
||||
@@ -1,29 +0,0 @@
|
||||
"""Shared helpers for chat tools."""
|
||||
|
||||
from typing import Any
|
||||
|
||||
|
||||
def get_inputs_from_schema(
|
||||
input_schema: dict[str, Any],
|
||||
exclude_fields: set[str] | None = None,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Extract input field info from JSON schema."""
|
||||
if not isinstance(input_schema, dict):
|
||||
return []
|
||||
|
||||
exclude = exclude_fields or set()
|
||||
properties = input_schema.get("properties", {})
|
||||
required = set(input_schema.get("required", []))
|
||||
|
||||
return [
|
||||
{
|
||||
"name": name,
|
||||
"title": schema.get("title", name),
|
||||
"type": schema.get("type", "string"),
|
||||
"description": schema.get("description", ""),
|
||||
"required": name in required,
|
||||
"default": schema.get("default"),
|
||||
}
|
||||
for name, schema in properties.items()
|
||||
if name not in exclude
|
||||
]
|
||||
@@ -24,7 +24,6 @@ from backend.util.timezone_utils import (
|
||||
)
|
||||
|
||||
from .base import BaseTool
|
||||
from .helpers import get_inputs_from_schema
|
||||
from .models import (
|
||||
AgentDetails,
|
||||
AgentDetailsResponse,
|
||||
@@ -262,7 +261,7 @@ class RunAgentTool(BaseTool):
|
||||
),
|
||||
requirements={
|
||||
"credentials": requirements_creds_list,
|
||||
"inputs": get_inputs_from_schema(graph.input_schema),
|
||||
"inputs": self._get_inputs_list(graph.input_schema),
|
||||
"execution_modes": self._get_execution_modes(graph),
|
||||
},
|
||||
),
|
||||
@@ -370,6 +369,22 @@ class RunAgentTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
def _get_inputs_list(self, input_schema: dict[str, Any]) -> list[dict[str, Any]]:
|
||||
"""Extract inputs list from schema."""
|
||||
inputs_list = []
|
||||
if isinstance(input_schema, dict) and "properties" in input_schema:
|
||||
for field_name, field_schema in input_schema["properties"].items():
|
||||
inputs_list.append(
|
||||
{
|
||||
"name": field_name,
|
||||
"title": field_schema.get("title", field_name),
|
||||
"type": field_schema.get("type", "string"),
|
||||
"description": field_schema.get("description", ""),
|
||||
"required": field_name in input_schema.get("required", []),
|
||||
}
|
||||
)
|
||||
return inputs_list
|
||||
|
||||
def _get_execution_modes(self, graph: GraphModel) -> list[str]:
|
||||
"""Get available execution modes for the graph."""
|
||||
trigger_info = graph.trigger_setup_info
|
||||
@@ -383,7 +398,7 @@ class RunAgentTool(BaseTool):
|
||||
suffix: str,
|
||||
) -> str:
|
||||
"""Build a message describing available inputs for an agent."""
|
||||
inputs_list = get_inputs_from_schema(graph.input_schema)
|
||||
inputs_list = self._get_inputs_list(graph.input_schema)
|
||||
required_names = [i["name"] for i in inputs_list if i["required"]]
|
||||
optional_names = [i["name"] for i in inputs_list if not i["required"]]
|
||||
|
||||
|
||||
@@ -8,19 +8,14 @@ from typing import Any
|
||||
from pydantic_core import PydanticUndefined
|
||||
|
||||
from backend.api.features.chat.model import ChatSession
|
||||
from backend.api.features.chat.tools.find_block import (
|
||||
COPILOT_EXCLUDED_BLOCK_IDS,
|
||||
COPILOT_EXCLUDED_BLOCK_TYPES,
|
||||
)
|
||||
from backend.data.block import AnyBlockSchema, get_block
|
||||
from backend.data.block import get_block
|
||||
from backend.data.execution import ExecutionContext
|
||||
from backend.data.model import CredentialsFieldInfo, CredentialsMetaInput
|
||||
from backend.data.model import CredentialsMetaInput
|
||||
from backend.data.workspace import get_or_create_workspace
|
||||
from backend.integrations.creds_manager import IntegrationCredentialsManager
|
||||
from backend.util.exceptions import BlockError
|
||||
|
||||
from .base import BaseTool
|
||||
from .helpers import get_inputs_from_schema
|
||||
from .models import (
|
||||
BlockOutputResponse,
|
||||
ErrorResponse,
|
||||
@@ -29,10 +24,7 @@ from .models import (
|
||||
ToolResponseBase,
|
||||
UserReadiness,
|
||||
)
|
||||
from .utils import (
|
||||
build_missing_credentials_from_field_info,
|
||||
match_credentials_to_requirements,
|
||||
)
|
||||
from .utils import build_missing_credentials_from_field_info
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -81,6 +73,91 @@ class RunBlockTool(BaseTool):
|
||||
def requires_auth(self) -> bool:
|
||||
return True
|
||||
|
||||
async def _check_block_credentials(
|
||||
self,
|
||||
user_id: str,
|
||||
block: Any,
|
||||
input_data: dict[str, Any] | None = None,
|
||||
) -> tuple[dict[str, CredentialsMetaInput], list[CredentialsMetaInput]]:
|
||||
"""
|
||||
Check if user has required credentials for a block.
|
||||
|
||||
Args:
|
||||
user_id: User ID
|
||||
block: Block to check credentials for
|
||||
input_data: Input data for the block (used to determine provider via discriminator)
|
||||
|
||||
Returns:
|
||||
tuple[matched_credentials, missing_credentials]
|
||||
"""
|
||||
matched_credentials: dict[str, CredentialsMetaInput] = {}
|
||||
missing_credentials: list[CredentialsMetaInput] = []
|
||||
input_data = input_data or {}
|
||||
|
||||
# Get credential field info from block's input schema
|
||||
credentials_fields_info = block.input_schema.get_credentials_fields_info()
|
||||
|
||||
if not credentials_fields_info:
|
||||
return matched_credentials, missing_credentials
|
||||
|
||||
# Get user's available credentials
|
||||
creds_manager = IntegrationCredentialsManager()
|
||||
available_creds = await creds_manager.store.get_all_creds(user_id)
|
||||
|
||||
for field_name, field_info in credentials_fields_info.items():
|
||||
effective_field_info = field_info
|
||||
if field_info.discriminator and field_info.discriminator_mapping:
|
||||
# Get discriminator from input, falling back to schema default
|
||||
discriminator_value = input_data.get(field_info.discriminator)
|
||||
if discriminator_value is None:
|
||||
field = block.input_schema.model_fields.get(
|
||||
field_info.discriminator
|
||||
)
|
||||
if field and field.default is not PydanticUndefined:
|
||||
discriminator_value = field.default
|
||||
|
||||
if (
|
||||
discriminator_value
|
||||
and discriminator_value in field_info.discriminator_mapping
|
||||
):
|
||||
effective_field_info = field_info.discriminate(discriminator_value)
|
||||
logger.debug(
|
||||
f"Discriminated provider for {field_name}: "
|
||||
f"{discriminator_value} -> {effective_field_info.provider}"
|
||||
)
|
||||
|
||||
matching_cred = next(
|
||||
(
|
||||
cred
|
||||
for cred in available_creds
|
||||
if cred.provider in effective_field_info.provider
|
||||
and cred.type in effective_field_info.supported_types
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
if matching_cred:
|
||||
matched_credentials[field_name] = CredentialsMetaInput(
|
||||
id=matching_cred.id,
|
||||
provider=matching_cred.provider, # type: ignore
|
||||
type=matching_cred.type,
|
||||
title=matching_cred.title,
|
||||
)
|
||||
else:
|
||||
# Create a placeholder for the missing credential
|
||||
provider = next(iter(effective_field_info.provider), "unknown")
|
||||
cred_type = next(iter(effective_field_info.supported_types), "api_key")
|
||||
missing_credentials.append(
|
||||
CredentialsMetaInput(
|
||||
id=field_name,
|
||||
provider=provider, # type: ignore
|
||||
type=cred_type, # type: ignore
|
||||
title=field_name.replace("_", " ").title(),
|
||||
)
|
||||
)
|
||||
|
||||
return matched_credentials, missing_credentials
|
||||
|
||||
async def _execute(
|
||||
self,
|
||||
user_id: str | None,
|
||||
@@ -135,24 +212,11 @@ class RunBlockTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Check if block is excluded from CoPilot (graph-only blocks)
|
||||
if (
|
||||
block.block_type in COPILOT_EXCLUDED_BLOCK_TYPES
|
||||
or block.id in COPILOT_EXCLUDED_BLOCK_IDS
|
||||
):
|
||||
return ErrorResponse(
|
||||
message=(
|
||||
f"Block '{block.name}' cannot be run directly in CoPilot. "
|
||||
"This block is designed for use within graphs only."
|
||||
),
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
logger.info(f"Executing block {block.name} ({block_id}) for user {user_id}")
|
||||
|
||||
creds_manager = IntegrationCredentialsManager()
|
||||
matched_credentials, missing_credentials = (
|
||||
await self._resolve_block_credentials(user_id, block, input_data)
|
||||
matched_credentials, missing_credentials = await self._check_block_credentials(
|
||||
user_id, block, input_data
|
||||
)
|
||||
|
||||
if missing_credentials:
|
||||
@@ -281,75 +345,29 @@ class RunBlockTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
async def _resolve_block_credentials(
|
||||
self,
|
||||
user_id: str,
|
||||
block: AnyBlockSchema,
|
||||
input_data: dict[str, Any] | None = None,
|
||||
) -> tuple[dict[str, CredentialsMetaInput], list[CredentialsMetaInput]]:
|
||||
"""
|
||||
Resolve credentials for a block by matching user's available credentials.
|
||||
|
||||
Args:
|
||||
user_id: User ID
|
||||
block: Block to resolve credentials for
|
||||
input_data: Input data for the block (used to determine provider via discriminator)
|
||||
|
||||
Returns:
|
||||
tuple of (matched_credentials, missing_credentials) - matched credentials
|
||||
are used for block execution, missing ones indicate setup requirements.
|
||||
"""
|
||||
input_data = input_data or {}
|
||||
requirements = self._resolve_discriminated_credentials(block, input_data)
|
||||
|
||||
if not requirements:
|
||||
return {}, []
|
||||
|
||||
return await match_credentials_to_requirements(user_id, requirements)
|
||||
|
||||
def _get_inputs_list(self, block: AnyBlockSchema) -> list[dict[str, Any]]:
|
||||
def _get_inputs_list(self, block: Any) -> list[dict[str, Any]]:
|
||||
"""Extract non-credential inputs from block schema."""
|
||||
inputs_list = []
|
||||
schema = block.input_schema.jsonschema()
|
||||
properties = schema.get("properties", {})
|
||||
required_fields = set(schema.get("required", []))
|
||||
|
||||
# Get credential field names to exclude
|
||||
credentials_fields = set(block.input_schema.get_credentials_fields().keys())
|
||||
return get_inputs_from_schema(schema, exclude_fields=credentials_fields)
|
||||
|
||||
def _resolve_discriminated_credentials(
|
||||
self,
|
||||
block: AnyBlockSchema,
|
||||
input_data: dict[str, Any],
|
||||
) -> dict[str, CredentialsFieldInfo]:
|
||||
"""Resolve credential requirements, applying discriminator logic where needed."""
|
||||
credentials_fields_info = block.input_schema.get_credentials_fields_info()
|
||||
if not credentials_fields_info:
|
||||
return {}
|
||||
for field_name, field_schema in properties.items():
|
||||
# Skip credential fields
|
||||
if field_name in credentials_fields:
|
||||
continue
|
||||
|
||||
resolved: dict[str, CredentialsFieldInfo] = {}
|
||||
inputs_list.append(
|
||||
{
|
||||
"name": field_name,
|
||||
"title": field_schema.get("title", field_name),
|
||||
"type": field_schema.get("type", "string"),
|
||||
"description": field_schema.get("description", ""),
|
||||
"required": field_name in required_fields,
|
||||
}
|
||||
)
|
||||
|
||||
for field_name, field_info in credentials_fields_info.items():
|
||||
effective_field_info = field_info
|
||||
|
||||
if field_info.discriminator and field_info.discriminator_mapping:
|
||||
discriminator_value = input_data.get(field_info.discriminator)
|
||||
if discriminator_value is None:
|
||||
field = block.input_schema.model_fields.get(
|
||||
field_info.discriminator
|
||||
)
|
||||
if field and field.default is not PydanticUndefined:
|
||||
discriminator_value = field.default
|
||||
|
||||
if (
|
||||
discriminator_value
|
||||
and discriminator_value in field_info.discriminator_mapping
|
||||
):
|
||||
effective_field_info = field_info.discriminate(discriminator_value)
|
||||
# For host-scoped credentials, add the discriminator value
|
||||
# (e.g., URL) so _credential_is_for_host can match it
|
||||
effective_field_info.discriminator_values.add(discriminator_value)
|
||||
logger.debug(
|
||||
f"Discriminated provider for {field_name}: "
|
||||
f"{discriminator_value} -> {effective_field_info.provider}"
|
||||
)
|
||||
|
||||
resolved[field_name] = effective_field_info
|
||||
|
||||
return resolved
|
||||
return inputs_list
|
||||
|
||||
@@ -1,106 +0,0 @@
|
||||
"""Tests for block execution guards in RunBlockTool."""
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from backend.api.features.chat.tools.models import ErrorResponse
|
||||
from backend.api.features.chat.tools.run_block import RunBlockTool
|
||||
from backend.data.block import BlockType
|
||||
|
||||
from ._test_data import make_session
|
||||
|
||||
_TEST_USER_ID = "test-user-run-block"
|
||||
|
||||
|
||||
def make_mock_block(
|
||||
block_id: str, name: str, block_type: BlockType, disabled: bool = False
|
||||
):
|
||||
"""Create a mock block for testing."""
|
||||
mock = MagicMock()
|
||||
mock.id = block_id
|
||||
mock.name = name
|
||||
mock.block_type = block_type
|
||||
mock.disabled = disabled
|
||||
mock.input_schema = MagicMock()
|
||||
mock.input_schema.jsonschema.return_value = {"properties": {}, "required": []}
|
||||
mock.input_schema.get_credentials_fields_info.return_value = []
|
||||
return mock
|
||||
|
||||
|
||||
class TestRunBlockFiltering:
|
||||
"""Tests for block execution guards in RunBlockTool."""
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_excluded_block_type_returns_error(self):
|
||||
"""Attempting to execute a block with excluded BlockType returns error."""
|
||||
session = make_session(user_id=_TEST_USER_ID)
|
||||
|
||||
input_block = make_mock_block("input-block-id", "Input Block", BlockType.INPUT)
|
||||
|
||||
with patch(
|
||||
"backend.api.features.chat.tools.run_block.get_block",
|
||||
return_value=input_block,
|
||||
):
|
||||
tool = RunBlockTool()
|
||||
response = await tool._execute(
|
||||
user_id=_TEST_USER_ID,
|
||||
session=session,
|
||||
block_id="input-block-id",
|
||||
input_data={},
|
||||
)
|
||||
|
||||
assert isinstance(response, ErrorResponse)
|
||||
assert "cannot be run directly in CoPilot" in response.message
|
||||
assert "designed for use within graphs only" in response.message
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_excluded_block_id_returns_error(self):
|
||||
"""Attempting to execute SmartDecisionMakerBlock returns error."""
|
||||
session = make_session(user_id=_TEST_USER_ID)
|
||||
|
||||
smart_decision_id = "3b191d9f-356f-482d-8238-ba04b6d18381"
|
||||
smart_block = make_mock_block(
|
||||
smart_decision_id, "Smart Decision Maker", BlockType.STANDARD
|
||||
)
|
||||
|
||||
with patch(
|
||||
"backend.api.features.chat.tools.run_block.get_block",
|
||||
return_value=smart_block,
|
||||
):
|
||||
tool = RunBlockTool()
|
||||
response = await tool._execute(
|
||||
user_id=_TEST_USER_ID,
|
||||
session=session,
|
||||
block_id=smart_decision_id,
|
||||
input_data={},
|
||||
)
|
||||
|
||||
assert isinstance(response, ErrorResponse)
|
||||
assert "cannot be run directly in CoPilot" in response.message
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_non_excluded_block_passes_guard(self):
|
||||
"""Non-excluded blocks pass the filtering guard (may fail later for other reasons)."""
|
||||
session = make_session(user_id=_TEST_USER_ID)
|
||||
|
||||
standard_block = make_mock_block(
|
||||
"standard-id", "HTTP Request", BlockType.STANDARD
|
||||
)
|
||||
|
||||
with patch(
|
||||
"backend.api.features.chat.tools.run_block.get_block",
|
||||
return_value=standard_block,
|
||||
):
|
||||
tool = RunBlockTool()
|
||||
response = await tool._execute(
|
||||
user_id=_TEST_USER_ID,
|
||||
session=session,
|
||||
block_id="standard-id",
|
||||
input_data={},
|
||||
)
|
||||
|
||||
# Should NOT be an ErrorResponse about CoPilot exclusion
|
||||
# (may be other errors like missing credentials, but not the exclusion guard)
|
||||
if isinstance(response, ErrorResponse):
|
||||
assert "cannot be run directly in CoPilot" not in response.message
|
||||
@@ -8,7 +8,6 @@ from backend.api.features.library import model as library_model
|
||||
from backend.api.features.store import db as store_db
|
||||
from backend.data.graph import GraphModel
|
||||
from backend.data.model import (
|
||||
Credentials,
|
||||
CredentialsFieldInfo,
|
||||
CredentialsMetaInput,
|
||||
HostScopedCredentials,
|
||||
@@ -224,99 +223,6 @@ async def get_or_create_library_agent(
|
||||
return library_agents[0]
|
||||
|
||||
|
||||
async def match_credentials_to_requirements(
|
||||
user_id: str,
|
||||
requirements: dict[str, CredentialsFieldInfo],
|
||||
) -> tuple[dict[str, CredentialsMetaInput], list[CredentialsMetaInput]]:
|
||||
"""
|
||||
Match user's credentials against a dictionary of credential requirements.
|
||||
|
||||
This is the core matching logic shared by both graph and block credential matching.
|
||||
"""
|
||||
matched: dict[str, CredentialsMetaInput] = {}
|
||||
missing: list[CredentialsMetaInput] = []
|
||||
|
||||
if not requirements:
|
||||
return matched, missing
|
||||
|
||||
available_creds = await get_user_credentials(user_id)
|
||||
|
||||
for field_name, field_info in requirements.items():
|
||||
matching_cred = find_matching_credential(available_creds, field_info)
|
||||
|
||||
if matching_cred:
|
||||
try:
|
||||
matched[field_name] = create_credential_meta_from_match(matching_cred)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to create CredentialsMetaInput for field '{field_name}': "
|
||||
f"provider={matching_cred.provider}, type={matching_cred.type}, "
|
||||
f"credential_id={matching_cred.id}",
|
||||
exc_info=True,
|
||||
)
|
||||
provider = next(iter(field_info.provider), "unknown")
|
||||
cred_type = next(iter(field_info.supported_types), "api_key")
|
||||
missing.append(
|
||||
CredentialsMetaInput(
|
||||
id=field_name,
|
||||
provider=provider, # type: ignore
|
||||
type=cred_type, # type: ignore
|
||||
title=f"{field_name} (validation failed: {e})",
|
||||
)
|
||||
)
|
||||
else:
|
||||
provider = next(iter(field_info.provider), "unknown")
|
||||
cred_type = next(iter(field_info.supported_types), "api_key")
|
||||
missing.append(
|
||||
CredentialsMetaInput(
|
||||
id=field_name,
|
||||
provider=provider, # type: ignore
|
||||
type=cred_type, # type: ignore
|
||||
title=field_name.replace("_", " ").title(),
|
||||
)
|
||||
)
|
||||
|
||||
return matched, missing
|
||||
|
||||
|
||||
async def get_user_credentials(user_id: str) -> list[Credentials]:
|
||||
"""Get all available credentials for a user."""
|
||||
creds_manager = IntegrationCredentialsManager()
|
||||
return await creds_manager.store.get_all_creds(user_id)
|
||||
|
||||
|
||||
def find_matching_credential(
|
||||
available_creds: list[Credentials],
|
||||
field_info: CredentialsFieldInfo,
|
||||
) -> Credentials | None:
|
||||
"""Find a credential that matches the required provider, type, scopes, and host."""
|
||||
for cred in available_creds:
|
||||
if cred.provider not in field_info.provider:
|
||||
continue
|
||||
if cred.type not in field_info.supported_types:
|
||||
continue
|
||||
if cred.type == "oauth2" and not _credential_has_required_scopes(
|
||||
cred, field_info
|
||||
):
|
||||
continue
|
||||
if cred.type == "host_scoped" and not _credential_is_for_host(cred, field_info):
|
||||
continue
|
||||
return cred
|
||||
return None
|
||||
|
||||
|
||||
def create_credential_meta_from_match(
|
||||
matching_cred: Credentials,
|
||||
) -> CredentialsMetaInput:
|
||||
"""Create a CredentialsMetaInput from a matched credential."""
|
||||
return CredentialsMetaInput(
|
||||
id=matching_cred.id,
|
||||
provider=matching_cred.provider, # type: ignore
|
||||
type=matching_cred.type,
|
||||
title=matching_cred.title,
|
||||
)
|
||||
|
||||
|
||||
async def match_user_credentials_to_graph(
|
||||
user_id: str,
|
||||
graph: GraphModel,
|
||||
@@ -425,6 +331,8 @@ def _credential_has_required_scopes(
|
||||
# If no scopes are required, any credential matches
|
||||
if not requirements.required_scopes:
|
||||
return True
|
||||
|
||||
# Check that credential scopes are a superset of required scopes
|
||||
return set(credential.scopes).issuperset(requirements.required_scopes)
|
||||
|
||||
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
@@ -226,10 +225,6 @@ class SyncRabbitMQ(RabbitMQBase):
|
||||
class AsyncRabbitMQ(RabbitMQBase):
|
||||
"""Asynchronous RabbitMQ client"""
|
||||
|
||||
def __init__(self, config: RabbitMQConfig):
|
||||
super().__init__(config)
|
||||
self._reconnect_lock: asyncio.Lock | None = None
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return bool(self._connection and not self._connection.is_closed)
|
||||
@@ -240,17 +235,7 @@ class AsyncRabbitMQ(RabbitMQBase):
|
||||
|
||||
@conn_retry("AsyncRabbitMQ", "Acquiring async connection")
|
||||
async def connect(self):
|
||||
if self.is_connected and self._channel and not self._channel.is_closed:
|
||||
return
|
||||
|
||||
if (
|
||||
self.is_connected
|
||||
and self._connection
|
||||
and (self._channel is None or self._channel.is_closed)
|
||||
):
|
||||
self._channel = await self._connection.channel()
|
||||
await self._channel.set_qos(prefetch_count=1)
|
||||
await self.declare_infrastructure()
|
||||
if self.is_connected:
|
||||
return
|
||||
|
||||
self._connection = await aio_pika.connect_robust(
|
||||
@@ -306,46 +291,24 @@ class AsyncRabbitMQ(RabbitMQBase):
|
||||
exchange, routing_key=queue.routing_key or queue.name
|
||||
)
|
||||
|
||||
@property
|
||||
def _lock(self) -> asyncio.Lock:
|
||||
if self._reconnect_lock is None:
|
||||
self._reconnect_lock = asyncio.Lock()
|
||||
return self._reconnect_lock
|
||||
|
||||
async def _ensure_channel(self) -> aio_pika.abc.AbstractChannel:
|
||||
"""Get a valid channel, reconnecting if the current one is stale.
|
||||
|
||||
Uses a lock to prevent concurrent reconnection attempts from racing.
|
||||
"""
|
||||
if self.is_ready:
|
||||
return self._channel # type: ignore # is_ready guarantees non-None
|
||||
|
||||
async with self._lock:
|
||||
# Double-check after acquiring lock
|
||||
if self.is_ready:
|
||||
return self._channel # type: ignore
|
||||
|
||||
self._channel = None
|
||||
await self.connect()
|
||||
|
||||
if self._channel is None:
|
||||
raise RuntimeError("Channel should be established after connect")
|
||||
|
||||
return self._channel
|
||||
|
||||
async def _publish_once(
|
||||
@func_retry
|
||||
async def publish_message(
|
||||
self,
|
||||
routing_key: str,
|
||||
message: str,
|
||||
exchange: Optional[Exchange] = None,
|
||||
persistent: bool = True,
|
||||
) -> None:
|
||||
channel = await self._ensure_channel()
|
||||
if not self.is_ready:
|
||||
await self.connect()
|
||||
|
||||
if self._channel is None:
|
||||
raise RuntimeError("Channel should be established after connect")
|
||||
|
||||
if exchange:
|
||||
exchange_obj = await channel.get_exchange(exchange.name)
|
||||
exchange_obj = await self._channel.get_exchange(exchange.name)
|
||||
else:
|
||||
exchange_obj = channel.default_exchange
|
||||
exchange_obj = self._channel.default_exchange
|
||||
|
||||
await exchange_obj.publish(
|
||||
aio_pika.Message(
|
||||
@@ -359,23 +322,9 @@ class AsyncRabbitMQ(RabbitMQBase):
|
||||
routing_key=routing_key,
|
||||
)
|
||||
|
||||
@func_retry
|
||||
async def publish_message(
|
||||
self,
|
||||
routing_key: str,
|
||||
message: str,
|
||||
exchange: Optional[Exchange] = None,
|
||||
persistent: bool = True,
|
||||
) -> None:
|
||||
try:
|
||||
await self._publish_once(routing_key, message, exchange, persistent)
|
||||
except aio_pika.exceptions.ChannelInvalidStateError:
|
||||
logger.warning(
|
||||
"RabbitMQ channel invalid, forcing reconnect and retrying publish"
|
||||
)
|
||||
async with self._lock:
|
||||
self._channel = None
|
||||
await self._publish_once(routing_key, message, exchange, persistent)
|
||||
|
||||
async def get_channel(self) -> aio_pika.abc.AbstractChannel:
|
||||
return await self._ensure_channel()
|
||||
if not self.is_ready:
|
||||
await self.connect()
|
||||
if self._channel is None:
|
||||
raise RuntimeError("Channel should be established after connect")
|
||||
return self._channel
|
||||
|
||||
51
autogpt_platform/backend/poetry.lock
generated
51
autogpt_platform/backend/poetry.lock
generated
@@ -1,4 +1,4 @@
|
||||
# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand.
|
||||
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
|
||||
|
||||
[[package]]
|
||||
name = "aio-pika"
|
||||
@@ -46,14 +46,14 @@ pycares = ">=4.9.0,<5"
|
||||
|
||||
[[package]]
|
||||
name = "aiofiles"
|
||||
version = "25.1.0"
|
||||
version = "24.1.0"
|
||||
description = "File support for asyncio."
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
python-versions = ">=3.8"
|
||||
groups = ["main"]
|
||||
files = [
|
||||
{file = "aiofiles-25.1.0-py3-none-any.whl", hash = "sha256:abe311e527c862958650f9438e859c1fa7568a141b22abcd015e120e86a85695"},
|
||||
{file = "aiofiles-25.1.0.tar.gz", hash = "sha256:a8d728f0a29de45dc521f18f07297428d56992a742f0cd2701ba86e44d23d5b2"},
|
||||
{file = "aiofiles-24.1.0-py3-none-any.whl", hash = "sha256:b4ec55f4195e3eb5d7abd1bf7e061763e864dd4954231fb8539a0ef8bb8260e5"},
|
||||
{file = "aiofiles-24.1.0.tar.gz", hash = "sha256:22a075c9e5a3810f0c2e48f3008c94d68c65d763b9b03857924c99e57355166c"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -374,7 +374,7 @@ description = "LTS Port of Python audioop"
|
||||
optional = false
|
||||
python-versions = ">=3.13"
|
||||
groups = ["main"]
|
||||
markers = "python_version == \"3.13\""
|
||||
markers = "python_version >= \"3.13\""
|
||||
files = [
|
||||
{file = "audioop_lts-0.2.2-cp313-abi3-macosx_10_13_universal2.whl", hash = "sha256:fd3d4602dc64914d462924a08c1a9816435a2155d74f325853c1f1ac3b2d9800"},
|
||||
{file = "audioop_lts-0.2.2-cp313-abi3-macosx_10_13_x86_64.whl", hash = "sha256:550c114a8df0aafe9a05442a1162dfc8fec37e9af1d625ae6060fed6e756f303"},
|
||||
@@ -474,7 +474,7 @@ description = "Backport of asyncio.Runner, a context manager that controls event
|
||||
optional = false
|
||||
python-versions = "<3.11,>=3.8"
|
||||
groups = ["main"]
|
||||
markers = "python_version == \"3.10\""
|
||||
markers = "python_version < \"3.11\""
|
||||
files = [
|
||||
{file = "backports_asyncio_runner-1.2.0-py3-none-any.whl", hash = "sha256:0da0a936a8aeb554eccb426dc55af3ba63bcdc69fa1a600b5bb305413a4477b5"},
|
||||
{file = "backports_asyncio_runner-1.2.0.tar.gz", hash = "sha256:a5aa7b2b7d8f8bfcaa2b57313f70792df84e32a2a746f585213373f900b42162"},
|
||||
@@ -487,7 +487,7 @@ description = "Backport of CPython tarfile module"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
groups = ["main"]
|
||||
markers = "python_version < \"3.12\""
|
||||
markers = "python_version <= \"3.11\""
|
||||
files = [
|
||||
{file = "backports.tarfile-1.2.0-py3-none-any.whl", hash = "sha256:77e284d754527b01fb1e6fa8a1afe577858ebe4e9dad8919e34c862cb399bc34"},
|
||||
{file = "backports_tarfile-1.2.0.tar.gz", hash = "sha256:d75e02c268746e1b8144c278978b6e98e85de6ad16f8e4b0844a154557eca991"},
|
||||
@@ -659,6 +659,7 @@ description = "Foreign Function Interface for Python calling C code."
|
||||
optional = false
|
||||
python-versions = ">=3.9"
|
||||
groups = ["main"]
|
||||
markers = "platform_python_implementation != \"PyPy\" or sys_platform == \"darwin\""
|
||||
files = [
|
||||
{file = "cffi-2.0.0-cp310-cp310-macosx_10_13_x86_64.whl", hash = "sha256:0cf2d91ecc3fcc0625c2c530fe004f82c110405f101548512cce44322fa8ac44"},
|
||||
{file = "cffi-2.0.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:f73b96c41e3b2adedc34a7356e64c8eb96e03a3782b535e043a986276ce12a49"},
|
||||
@@ -1337,7 +1338,7 @@ description = "Backport of PEP 654 (exception groups)"
|
||||
optional = false
|
||||
python-versions = ">=3.7"
|
||||
groups = ["main", "dev"]
|
||||
markers = "python_version == \"3.10\""
|
||||
markers = "python_version < \"3.11\""
|
||||
files = [
|
||||
{file = "exceptiongroup-1.3.1-py3-none-any.whl", hash = "sha256:a7a39a3bd276781e98394987d3a5701d0c4edffb633bb7a5144577f82c773598"},
|
||||
{file = "exceptiongroup-1.3.1.tar.gz", hash = "sha256:8b412432c6055b0b7d14c310000ae93352ed6754f70fa8f7c34141f91c4e3219"},
|
||||
@@ -1819,16 +1820,16 @@ files = [
|
||||
google-auth = ">=2.14.1,<3.0.0"
|
||||
googleapis-common-protos = ">=1.56.2,<2.0.0"
|
||||
grpcio = [
|
||||
{version = ">=1.49.1,<2.0.0", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""},
|
||||
{version = ">=1.33.2,<2.0.0", optional = true, markers = "extra == \"grpc\""},
|
||||
{version = ">=1.49.1,<2.0.0", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""},
|
||||
]
|
||||
grpcio-status = [
|
||||
{version = ">=1.49.1,<2.0.0", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""},
|
||||
{version = ">=1.33.2,<2.0.0", optional = true, markers = "extra == \"grpc\""},
|
||||
{version = ">=1.49.1,<2.0.0", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""},
|
||||
]
|
||||
proto-plus = [
|
||||
{version = ">=1.25.0,<2.0.0", markers = "python_version >= \"3.13\""},
|
||||
{version = ">=1.22.3,<2.0.0"},
|
||||
{version = ">=1.25.0,<2.0.0", markers = "python_version >= \"3.13\""},
|
||||
]
|
||||
protobuf = ">=3.19.5,<3.20.0 || >3.20.0,<3.20.1 || >3.20.1,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<7.0.0"
|
||||
requests = ">=2.18.0,<3.0.0"
|
||||
@@ -1939,8 +1940,8 @@ google-api-core = {version = ">=1.34.1,<2.0.dev0 || >=2.11.dev0,<3.0.0", extras
|
||||
google-auth = ">=2.14.1,<2.24.0 || >2.24.0,<2.25.0 || >2.25.0,<3.0.0"
|
||||
grpcio = ">=1.33.2,<2.0.0"
|
||||
proto-plus = [
|
||||
{version = ">=1.25.0,<2.0.0", markers = "python_version >= \"3.13\""},
|
||||
{version = ">=1.22.3,<2.0.0"},
|
||||
{version = ">=1.25.0,<2.0.0", markers = "python_version >= \"3.13\""},
|
||||
]
|
||||
protobuf = ">=3.20.2,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<7.0.0"
|
||||
|
||||
@@ -2000,9 +2001,9 @@ google-cloud-core = ">=2.0.0,<3.0.0"
|
||||
grpc-google-iam-v1 = ">=0.12.4,<1.0.0"
|
||||
opentelemetry-api = ">=1.9.0"
|
||||
proto-plus = [
|
||||
{version = ">=1.22.0,<2.0.0"},
|
||||
{version = ">=1.22.2,<2.0.0", markers = "python_version >= \"3.11\""},
|
||||
{version = ">=1.25.0,<2.0.0", markers = "python_version >= \"3.13\""},
|
||||
{version = ">=1.22.2,<2.0.0", markers = "python_version >= \"3.11\" and python_version < \"3.13\""},
|
||||
{version = ">=1.22.0,<2.0.0", markers = "python_version < \"3.11\""},
|
||||
]
|
||||
protobuf = ">=3.20.2,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<7.0.0"
|
||||
|
||||
@@ -3801,7 +3802,7 @@ description = "Fundamental package for array computing in Python"
|
||||
optional = false
|
||||
python-versions = ">=3.10"
|
||||
groups = ["main"]
|
||||
markers = "python_version == \"3.10\""
|
||||
markers = "python_version < \"3.11\""
|
||||
files = [
|
||||
{file = "numpy-2.2.6-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b412caa66f72040e6d268491a59f2c43bf03eb6c96dd8f0307829feb7fa2b6fb"},
|
||||
{file = "numpy-2.2.6-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:8e41fd67c52b86603a91c1a505ebaef50b3314de0213461c7a6e99c9a3beff90"},
|
||||
@@ -4286,9 +4287,9 @@ files = [
|
||||
|
||||
[package.dependencies]
|
||||
numpy = [
|
||||
{version = ">=1.26.0", markers = "python_version >= \"3.12\""},
|
||||
{version = ">=1.23.2", markers = "python_version == \"3.11\""},
|
||||
{version = ">=1.22.4", markers = "python_version < \"3.11\""},
|
||||
{version = ">=1.23.2", markers = "python_version == \"3.11\""},
|
||||
{version = ">=1.26.0", markers = "python_version >= \"3.12\""},
|
||||
]
|
||||
python-dateutil = ">=2.8.2"
|
||||
pytz = ">=2020.1"
|
||||
@@ -4531,8 +4532,8 @@ pinecone-plugin-interface = ">=0.0.7,<0.0.8"
|
||||
python-dateutil = ">=2.5.3"
|
||||
typing-extensions = ">=3.7.4"
|
||||
urllib3 = [
|
||||
{version = ">=1.26.5", markers = "python_version >= \"3.12\" and python_version < \"4.0\""},
|
||||
{version = ">=1.26.0", markers = "python_version >= \"3.8\" and python_version < \"3.12\""},
|
||||
{version = ">=1.26.5", markers = "python_version >= \"3.12\" and python_version < \"4.0\""},
|
||||
]
|
||||
|
||||
[package.extras]
|
||||
@@ -5360,7 +5361,7 @@ description = "C parser in Python"
|
||||
optional = false
|
||||
python-versions = ">=3.10"
|
||||
groups = ["main"]
|
||||
markers = "implementation_name != \"PyPy\""
|
||||
markers = "(platform_python_implementation != \"PyPy\" or sys_platform == \"darwin\") and implementation_name != \"PyPy\""
|
||||
files = [
|
||||
{file = "pycparser-3.0-py3-none-any.whl", hash = "sha256:b727414169a36b7d524c1c3e31839a521725078d7b2ff038656844266160a992"},
|
||||
{file = "pycparser-3.0.tar.gz", hash = "sha256:600f49d217304a5902ac3c37e1281c9fe94e4d0489de643a9504c5cdfdfc6b29"},
|
||||
@@ -6129,10 +6130,10 @@ files = [
|
||||
grpcio = ">=1.41.0"
|
||||
httpx = {version = ">=0.20.0", extras = ["http2"]}
|
||||
numpy = [
|
||||
{version = ">=2.1.0", markers = "python_version == \"3.13\""},
|
||||
{version = ">=1.21", markers = "python_version == \"3.11\""},
|
||||
{version = ">=1.26", markers = "python_version == \"3.12\""},
|
||||
{version = ">=1.21,<2.3.0", markers = "python_version == \"3.10\""},
|
||||
{version = ">=1.21", markers = "python_version == \"3.11\""},
|
||||
{version = ">=2.1.0", markers = "python_version == \"3.13\""},
|
||||
{version = ">=1.26", markers = "python_version == \"3.12\""},
|
||||
]
|
||||
portalocker = ">=2.7.0,<4.0"
|
||||
protobuf = ">=3.20.0"
|
||||
@@ -7316,7 +7317,7 @@ description = "A lil' TOML parser"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
groups = ["main", "dev"]
|
||||
markers = "python_version == \"3.10\""
|
||||
markers = "python_version < \"3.11\""
|
||||
files = [
|
||||
{file = "tomli-2.4.0-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:b5ef256a3fd497d4973c11bf142e9ed78b150d36f5773f1ca6088c230ffc5867"},
|
||||
{file = "tomli-2.4.0-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:5572e41282d5268eb09a697c89a7bee84fae66511f87533a6f88bd2f7b652da9"},
|
||||
@@ -8439,4 +8440,4 @@ cffi = ["cffi (>=1.17,<2.0) ; platform_python_implementation != \"PyPy\" and pyt
|
||||
[metadata]
|
||||
lock-version = "2.1"
|
||||
python-versions = ">=3.10,<3.14"
|
||||
content-hash = "8b5ab34975748482dbd84de8e6a3f2970f81bd6fae782ae0f58716cb040da5c5"
|
||||
content-hash = "14686ee0e2dc446a75d0db145b08dc410dc31c357e25085bb0f9b0174711c4b1"
|
||||
|
||||
@@ -76,7 +76,7 @@ yt-dlp = "2025.12.08"
|
||||
zerobouncesdk = "^1.1.2"
|
||||
# NOTE: please insert new dependencies in their alphabetical location
|
||||
pytest-snapshot = "^0.9.0"
|
||||
aiofiles = "^25.1.0"
|
||||
aiofiles = "^24.1.0"
|
||||
tiktoken = "^0.12.0"
|
||||
aioclamd = "^1.0.0"
|
||||
setuptools = "^80.9.0"
|
||||
|
||||
@@ -104,31 +104,7 @@ export function FileInput(props: Props) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const getFileLabelFromValue = (val: unknown): string => {
|
||||
// Handle object format from external API: { name, type, size, data }
|
||||
if (val && typeof val === "object") {
|
||||
const obj = val as Record<string, unknown>;
|
||||
if (typeof obj.name === "string") {
|
||||
return getFileLabel(
|
||||
obj.name,
|
||||
typeof obj.type === "string" ? obj.type : "",
|
||||
);
|
||||
}
|
||||
if (typeof obj.type === "string") {
|
||||
const mimeParts = obj.type.split("/");
|
||||
if (mimeParts.length > 1) {
|
||||
return `${mimeParts[1].toUpperCase()} file`;
|
||||
}
|
||||
return `${obj.type} file`;
|
||||
}
|
||||
return "File";
|
||||
}
|
||||
|
||||
// Handle string values (data URIs or file paths)
|
||||
if (typeof val !== "string") {
|
||||
return "File";
|
||||
}
|
||||
|
||||
const getFileLabelFromValue = (val: string) => {
|
||||
if (val.startsWith("data:")) {
|
||||
const matches = val.match(/^data:([^;]+);/);
|
||||
if (matches?.[1]) {
|
||||
|
||||
Reference in New Issue
Block a user