mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-09 22:35:54 -05:00
Compare commits
1 Commits
dependabot
...
codex/add-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fda10563e7 |
@@ -52,7 +52,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Trigger deploy workflow
|
||||
uses: peter-evans/repository-dispatch@v4
|
||||
uses: peter-evans/repository-dispatch@v3
|
||||
with:
|
||||
token: ${{ secrets.DEPLOY_TOKEN }}
|
||||
repository: Significant-Gravitas/AutoGPT_cloud_infrastructure
|
||||
|
||||
@@ -45,7 +45,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Trigger deploy workflow
|
||||
uses: peter-evans/repository-dispatch@v4
|
||||
uses: peter-evans/repository-dispatch@v3
|
||||
with:
|
||||
token: ${{ secrets.DEPLOY_TOKEN }}
|
||||
repository: Significant-Gravitas/AutoGPT_cloud_infrastructure
|
||||
|
||||
@@ -82,7 +82,7 @@ jobs:
|
||||
|
||||
- name: Dispatch Deploy Event
|
||||
if: steps.check_status.outputs.should_deploy == 'true'
|
||||
uses: peter-evans/repository-dispatch@v4
|
||||
uses: peter-evans/repository-dispatch@v3
|
||||
with:
|
||||
token: ${{ secrets.DISPATCH_TOKEN }}
|
||||
repository: Significant-Gravitas/AutoGPT_cloud_infrastructure
|
||||
@@ -110,7 +110,7 @@ jobs:
|
||||
|
||||
- name: Dispatch Undeploy Event (from comment)
|
||||
if: steps.check_status.outputs.should_undeploy == 'true'
|
||||
uses: peter-evans/repository-dispatch@v4
|
||||
uses: peter-evans/repository-dispatch@v3
|
||||
with:
|
||||
token: ${{ secrets.DISPATCH_TOKEN }}
|
||||
repository: Significant-Gravitas/AutoGPT_cloud_infrastructure
|
||||
@@ -168,7 +168,7 @@ jobs:
|
||||
github.event_name == 'pull_request' &&
|
||||
github.event.action == 'closed' &&
|
||||
steps.check_pr_close.outputs.should_undeploy == 'true'
|
||||
uses: peter-evans/repository-dispatch@v4
|
||||
uses: peter-evans/repository-dispatch@v3
|
||||
with:
|
||||
token: ${{ secrets.DISPATCH_TOKEN }}
|
||||
repository: Significant-Gravitas/AutoGPT_cloud_infrastructure
|
||||
|
||||
@@ -45,7 +45,10 @@ async def create_chat_session(
|
||||
successfulAgentRuns=SafeJson({}),
|
||||
successfulAgentSchedules=SafeJson({}),
|
||||
)
|
||||
return await PrismaChatSession.prisma().create(data=data)
|
||||
return await PrismaChatSession.prisma().create(
|
||||
data=data,
|
||||
include={"Messages": True},
|
||||
)
|
||||
|
||||
|
||||
async def update_chat_session(
|
||||
|
||||
@@ -266,38 +266,12 @@ async def stream_chat_post(
|
||||
|
||||
"""
|
||||
import asyncio
|
||||
import time
|
||||
|
||||
stream_start_time = time.perf_counter()
|
||||
|
||||
# Base log metadata (task_id added after creation)
|
||||
log_meta = {"component": "ChatStream", "session_id": session_id}
|
||||
if user_id:
|
||||
log_meta["user_id"] = user_id
|
||||
|
||||
logger.info(
|
||||
f"[TIMING] stream_chat_post STARTED, session={session_id}, "
|
||||
f"user={user_id}, message_len={len(request.message)}",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
|
||||
session = await _validate_and_get_session(session_id, user_id)
|
||||
logger.info(
|
||||
f"[TIMING] session validated in {(time.perf_counter() - stream_start_time)*1000:.1f}ms",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"duration_ms": (time.perf_counter() - stream_start_time) * 1000,
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
# Create a task in the stream registry for reconnection support
|
||||
task_id = str(uuid_module.uuid4())
|
||||
operation_id = str(uuid_module.uuid4())
|
||||
log_meta["task_id"] = task_id
|
||||
|
||||
task_create_start = time.perf_counter()
|
||||
await stream_registry.create_task(
|
||||
task_id=task_id,
|
||||
session_id=session_id,
|
||||
@@ -306,46 +280,14 @@ async def stream_chat_post(
|
||||
tool_name="chat",
|
||||
operation_id=operation_id,
|
||||
)
|
||||
logger.info(
|
||||
f"[TIMING] create_task completed in {(time.perf_counter() - task_create_start)*1000:.1f}ms",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"duration_ms": (time.perf_counter() - task_create_start) * 1000,
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
# Background task that runs the AI generation independently of SSE connection
|
||||
async def run_ai_generation():
|
||||
import time as time_module
|
||||
|
||||
gen_start_time = time_module.perf_counter()
|
||||
logger.info(
|
||||
f"[TIMING] run_ai_generation STARTED, task={task_id}, session={session_id}, user={user_id}",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
first_chunk_time, ttfc = None, None
|
||||
chunk_count = 0
|
||||
try:
|
||||
# Emit a start event with task_id for reconnection
|
||||
start_chunk = StreamStart(messageId=task_id, taskId=task_id)
|
||||
await stream_registry.publish_chunk(task_id, start_chunk)
|
||||
logger.info(
|
||||
f"[TIMING] StreamStart published at {(time_module.perf_counter() - gen_start_time)*1000:.1f}ms",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"elapsed_ms": (time_module.perf_counter() - gen_start_time)
|
||||
* 1000,
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"[TIMING] Calling stream_chat_completion",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
async for chunk in chat_service.stream_chat_completion(
|
||||
session_id,
|
||||
request.message,
|
||||
@@ -354,202 +296,54 @@ async def stream_chat_post(
|
||||
session=session, # Pass pre-fetched session to avoid double-fetch
|
||||
context=request.context,
|
||||
):
|
||||
chunk_count += 1
|
||||
if first_chunk_time is None:
|
||||
first_chunk_time = time_module.perf_counter()
|
||||
ttfc = first_chunk_time - gen_start_time
|
||||
logger.info(
|
||||
f"[TIMING] FIRST AI CHUNK at {ttfc:.2f}s, type={type(chunk).__name__}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"chunk_type": type(chunk).__name__,
|
||||
"time_to_first_chunk_ms": ttfc * 1000,
|
||||
}
|
||||
},
|
||||
)
|
||||
# Write to Redis (subscribers will receive via XREAD)
|
||||
await stream_registry.publish_chunk(task_id, chunk)
|
||||
|
||||
gen_end_time = time_module.perf_counter()
|
||||
total_time = (gen_end_time - gen_start_time) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] run_ai_generation FINISHED in {total_time/1000:.1f}s; "
|
||||
f"task={task_id}, session={session_id}, "
|
||||
f"ttfc={ttfc or -1:.2f}s, n_chunks={chunk_count}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"total_time_ms": total_time,
|
||||
"time_to_first_chunk_ms": (
|
||||
ttfc * 1000 if ttfc is not None else None
|
||||
),
|
||||
"n_chunks": chunk_count,
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
# Mark task as completed
|
||||
await stream_registry.mark_task_completed(task_id, "completed")
|
||||
except Exception as e:
|
||||
elapsed = time_module.perf_counter() - gen_start_time
|
||||
logger.error(
|
||||
f"[TIMING] run_ai_generation ERROR after {elapsed:.2f}s: {e}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"elapsed_ms": elapsed * 1000,
|
||||
"error": str(e),
|
||||
}
|
||||
},
|
||||
f"Error in background AI generation for session {session_id}: {e}"
|
||||
)
|
||||
await stream_registry.mark_task_completed(task_id, "failed")
|
||||
|
||||
# Start the AI generation in a background task
|
||||
bg_task = asyncio.create_task(run_ai_generation())
|
||||
await stream_registry.set_task_asyncio_task(task_id, bg_task)
|
||||
setup_time = (time.perf_counter() - stream_start_time) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] Background task started, setup={setup_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "setup_time_ms": setup_time}},
|
||||
)
|
||||
|
||||
# SSE endpoint that subscribes to the task's stream
|
||||
async def event_generator() -> AsyncGenerator[str, None]:
|
||||
import time as time_module
|
||||
|
||||
event_gen_start = time_module.perf_counter()
|
||||
logger.info(
|
||||
f"[TIMING] event_generator STARTED, task={task_id}, session={session_id}, "
|
||||
f"user={user_id}",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
subscriber_queue = None
|
||||
first_chunk_yielded = False
|
||||
chunks_yielded = 0
|
||||
try:
|
||||
# Subscribe to the task stream (this replays existing messages + live updates)
|
||||
subscribe_start = time_module.perf_counter()
|
||||
logger.info(
|
||||
"[TIMING] Calling subscribe_to_task",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
subscriber_queue = await stream_registry.subscribe_to_task(
|
||||
task_id=task_id,
|
||||
user_id=user_id,
|
||||
last_message_id="0-0", # Get all messages from the beginning
|
||||
)
|
||||
subscribe_time = (time_module.perf_counter() - subscribe_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] subscribe_to_task completed in {subscribe_time:.1f}ms, "
|
||||
f"queue_ok={subscriber_queue is not None}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"duration_ms": subscribe_time,
|
||||
"queue_obtained": subscriber_queue is not None,
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
if subscriber_queue is None:
|
||||
logger.info(
|
||||
"[TIMING] subscriber_queue is None, yielding finish",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
yield StreamFinish().to_sse()
|
||||
yield "data: [DONE]\n\n"
|
||||
return
|
||||
|
||||
# Read from the subscriber queue and yield to SSE
|
||||
logger.info(
|
||||
"[TIMING] Starting to read from subscriber_queue",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
while True:
|
||||
try:
|
||||
queue_wait_start = time_module.perf_counter()
|
||||
chunk = await asyncio.wait_for(subscriber_queue.get(), timeout=30.0)
|
||||
queue_wait_time = (
|
||||
time_module.perf_counter() - queue_wait_start
|
||||
) * 1000
|
||||
chunks_yielded += 1
|
||||
|
||||
if not first_chunk_yielded:
|
||||
first_chunk_yielded = True
|
||||
elapsed = time_module.perf_counter() - event_gen_start
|
||||
logger.info(
|
||||
f"[TIMING] FIRST CHUNK from queue at {elapsed:.2f}s, "
|
||||
f"type={type(chunk).__name__}, "
|
||||
f"wait={queue_wait_time:.1f}ms",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"chunk_type": type(chunk).__name__,
|
||||
"elapsed_ms": elapsed * 1000,
|
||||
"queue_wait_ms": queue_wait_time,
|
||||
}
|
||||
},
|
||||
)
|
||||
elif chunks_yielded % 50 == 0:
|
||||
logger.info(
|
||||
f"[TIMING] Chunk #{chunks_yielded}, "
|
||||
f"type={type(chunk).__name__}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"chunk_number": chunks_yielded,
|
||||
"chunk_type": type(chunk).__name__,
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
yield chunk.to_sse()
|
||||
|
||||
# Check for finish signal
|
||||
if isinstance(chunk, StreamFinish):
|
||||
total_time = time_module.perf_counter() - event_gen_start
|
||||
logger.info(
|
||||
f"[TIMING] StreamFinish received in {total_time:.2f}s; "
|
||||
f"n_chunks={chunks_yielded}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"chunks_yielded": chunks_yielded,
|
||||
"total_time_ms": total_time * 1000,
|
||||
}
|
||||
},
|
||||
)
|
||||
break
|
||||
except asyncio.TimeoutError:
|
||||
# Send heartbeat to keep connection alive
|
||||
logger.info(
|
||||
f"[TIMING] Heartbeat timeout, chunks_so_far={chunks_yielded}",
|
||||
extra={
|
||||
"json_fields": {**log_meta, "chunks_so_far": chunks_yielded}
|
||||
},
|
||||
)
|
||||
yield StreamHeartbeat().to_sse()
|
||||
|
||||
except GeneratorExit:
|
||||
logger.info(
|
||||
f"[TIMING] GeneratorExit (client disconnected), chunks={chunks_yielded}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"chunks_yielded": chunks_yielded,
|
||||
"reason": "client_disconnect",
|
||||
}
|
||||
},
|
||||
)
|
||||
pass # Client disconnected - background task continues
|
||||
except Exception as e:
|
||||
elapsed = (time_module.perf_counter() - event_gen_start) * 1000
|
||||
logger.error(
|
||||
f"[TIMING] event_generator ERROR after {elapsed:.1f}ms: {e}",
|
||||
extra={
|
||||
"json_fields": {**log_meta, "elapsed_ms": elapsed, "error": str(e)}
|
||||
},
|
||||
)
|
||||
logger.error(f"Error in SSE stream for task {task_id}: {e}")
|
||||
finally:
|
||||
# Unsubscribe when client disconnects or stream ends to prevent resource leak
|
||||
if subscriber_queue is not None:
|
||||
@@ -563,18 +357,6 @@ async def stream_chat_post(
|
||||
exc_info=True,
|
||||
)
|
||||
# AI SDK protocol termination - always yield even if unsubscribe fails
|
||||
total_time = time_module.perf_counter() - event_gen_start
|
||||
logger.info(
|
||||
f"[TIMING] event_generator FINISHED in {total_time:.2f}s; "
|
||||
f"task={task_id}, session={session_id}, n_chunks={chunks_yielded}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"total_time_ms": total_time * 1000,
|
||||
"chunks_yielded": chunks_yielded,
|
||||
}
|
||||
},
|
||||
)
|
||||
yield "data: [DONE]\n\n"
|
||||
|
||||
return StreamingResponse(
|
||||
@@ -643,7 +425,7 @@ async def stream_chat_get(
|
||||
"Chat stream completed",
|
||||
extra={
|
||||
"session_id": session_id,
|
||||
"n_chunks": chunk_count,
|
||||
"chunk_count": chunk_count,
|
||||
"first_chunk_type": first_chunk_type,
|
||||
},
|
||||
)
|
||||
|
||||
@@ -371,45 +371,21 @@ async def stream_chat_completion(
|
||||
ValueError: If max_context_messages is exceeded
|
||||
|
||||
"""
|
||||
completion_start = time.monotonic()
|
||||
|
||||
# Build log metadata for structured logging
|
||||
log_meta = {"component": "ChatService", "session_id": session_id}
|
||||
if user_id:
|
||||
log_meta["user_id"] = user_id
|
||||
|
||||
logger.info(
|
||||
f"[TIMING] stream_chat_completion STARTED, session={session_id}, user={user_id}, "
|
||||
f"message_len={len(message) if message else 0}, is_user={is_user_message}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"message_len": len(message) if message else 0,
|
||||
"is_user_message": is_user_message,
|
||||
}
|
||||
},
|
||||
f"Streaming chat completion for session {session_id} for message {message} and user id {user_id}. Message is user message: {is_user_message}"
|
||||
)
|
||||
|
||||
# Only fetch from Redis if session not provided (initial call)
|
||||
if session is None:
|
||||
fetch_start = time.monotonic()
|
||||
session = await get_chat_session(session_id, user_id)
|
||||
fetch_time = (time.monotonic() - fetch_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] get_chat_session took {fetch_time:.1f}ms, "
|
||||
f"n_messages={len(session.messages) if session else 0}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"duration_ms": fetch_time,
|
||||
"n_messages": len(session.messages) if session else 0,
|
||||
}
|
||||
},
|
||||
f"Fetched session from Redis: {session.session_id if session else 'None'}, "
|
||||
f"message_count={len(session.messages) if session else 0}"
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
f"[TIMING] Using provided session, messages={len(session.messages)}",
|
||||
extra={"json_fields": {**log_meta, "n_messages": len(session.messages)}},
|
||||
f"Using provided session object: {session.session_id}, "
|
||||
f"message_count={len(session.messages)}"
|
||||
)
|
||||
|
||||
if not session:
|
||||
@@ -430,25 +406,17 @@ async def stream_chat_completion(
|
||||
|
||||
# Track user message in PostHog
|
||||
if is_user_message:
|
||||
posthog_start = time.monotonic()
|
||||
track_user_message(
|
||||
user_id=user_id,
|
||||
session_id=session_id,
|
||||
message_length=len(message),
|
||||
)
|
||||
posthog_time = (time.monotonic() - posthog_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] track_user_message took {posthog_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": posthog_time}},
|
||||
)
|
||||
|
||||
upsert_start = time.monotonic()
|
||||
session = await upsert_chat_session(session)
|
||||
upsert_time = (time.monotonic() - upsert_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] upsert_chat_session took {upsert_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": upsert_time}},
|
||||
f"Upserting session: {session.session_id} with user id {session.user_id}, "
|
||||
f"message_count={len(session.messages)}"
|
||||
)
|
||||
session = await upsert_chat_session(session)
|
||||
assert session, "Session not found"
|
||||
|
||||
# Generate title for new sessions on first user message (non-blocking)
|
||||
@@ -486,13 +454,7 @@ async def stream_chat_completion(
|
||||
asyncio.create_task(_update_title())
|
||||
|
||||
# Build system prompt with business understanding
|
||||
prompt_start = time.monotonic()
|
||||
system_prompt, understanding = await _build_system_prompt(user_id)
|
||||
prompt_time = (time.monotonic() - prompt_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] _build_system_prompt took {prompt_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": prompt_time}},
|
||||
)
|
||||
|
||||
# Initialize variables for streaming
|
||||
assistant_response = ChatMessage(
|
||||
@@ -521,18 +483,9 @@ async def stream_chat_completion(
|
||||
text_block_id = str(uuid_module.uuid4())
|
||||
|
||||
# Yield message start
|
||||
setup_time = (time.monotonic() - completion_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] Setup complete, yielding StreamStart at {setup_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "setup_time_ms": setup_time}},
|
||||
)
|
||||
yield StreamStart(messageId=message_id)
|
||||
|
||||
try:
|
||||
logger.info(
|
||||
"[TIMING] Calling _stream_chat_chunks",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
async for chunk in _stream_chat_chunks(
|
||||
session=session,
|
||||
tools=tools,
|
||||
@@ -940,21 +893,9 @@ async def _stream_chat_chunks(
|
||||
SSE formatted JSON response objects
|
||||
|
||||
"""
|
||||
import time as time_module
|
||||
|
||||
stream_chunks_start = time_module.perf_counter()
|
||||
model = config.model
|
||||
|
||||
# Build log metadata for structured logging
|
||||
log_meta = {"component": "ChatService", "session_id": session.session_id}
|
||||
if session.user_id:
|
||||
log_meta["user_id"] = session.user_id
|
||||
|
||||
logger.info(
|
||||
f"[TIMING] _stream_chat_chunks STARTED, session={session.session_id}, "
|
||||
f"user={session.user_id}, n_messages={len(session.messages)}",
|
||||
extra={"json_fields": {**log_meta, "n_messages": len(session.messages)}},
|
||||
)
|
||||
logger.info("Starting pure chat stream")
|
||||
|
||||
messages = session.to_openai_messages()
|
||||
if system_prompt:
|
||||
@@ -965,18 +906,12 @@ async def _stream_chat_chunks(
|
||||
messages = [system_message] + messages
|
||||
|
||||
# Apply context window management
|
||||
context_start = time_module.perf_counter()
|
||||
context_result = await _manage_context_window(
|
||||
messages=messages,
|
||||
model=model,
|
||||
api_key=config.api_key,
|
||||
base_url=config.base_url,
|
||||
)
|
||||
context_time = (time_module.perf_counter() - context_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] _manage_context_window took {context_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": context_time}},
|
||||
)
|
||||
|
||||
if context_result.error:
|
||||
if "System prompt dropped" in context_result.error:
|
||||
@@ -1011,19 +946,9 @@ async def _stream_chat_chunks(
|
||||
|
||||
while retry_count <= MAX_RETRIES:
|
||||
try:
|
||||
elapsed = (time_module.perf_counter() - stream_chunks_start) * 1000
|
||||
retry_info = (
|
||||
f" (retry {retry_count}/{MAX_RETRIES})" if retry_count > 0 else ""
|
||||
)
|
||||
logger.info(
|
||||
f"[TIMING] Creating OpenAI stream at {elapsed:.1f}ms{retry_info}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"elapsed_ms": elapsed,
|
||||
"retry_count": retry_count,
|
||||
}
|
||||
},
|
||||
f"Creating OpenAI chat completion stream..."
|
||||
f"{f' (retry {retry_count}/{MAX_RETRIES})' if retry_count > 0 else ''}"
|
||||
)
|
||||
|
||||
# Build extra_body for OpenRouter tracing and PostHog analytics
|
||||
@@ -1040,7 +965,6 @@ async def _stream_chat_chunks(
|
||||
:128
|
||||
] # OpenRouter limit
|
||||
|
||||
api_call_start = time_module.perf_counter()
|
||||
stream = await client.chat.completions.create(
|
||||
model=model,
|
||||
messages=cast(list[ChatCompletionMessageParam], messages),
|
||||
@@ -1050,11 +974,6 @@ async def _stream_chat_chunks(
|
||||
stream_options=ChatCompletionStreamOptionsParam(include_usage=True),
|
||||
extra_body=extra_body,
|
||||
)
|
||||
api_init_time = (time_module.perf_counter() - api_call_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] OpenAI stream object returned in {api_init_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": api_init_time}},
|
||||
)
|
||||
|
||||
# Variables to accumulate tool calls
|
||||
tool_calls: list[dict[str, Any]] = []
|
||||
@@ -1065,13 +984,10 @@ async def _stream_chat_chunks(
|
||||
|
||||
# Track if we've started the text block
|
||||
text_started = False
|
||||
first_content_chunk = True
|
||||
chunk_count = 0
|
||||
|
||||
# Process the stream
|
||||
chunk: ChatCompletionChunk
|
||||
async for chunk in stream:
|
||||
chunk_count += 1
|
||||
if chunk.usage:
|
||||
yield StreamUsage(
|
||||
promptTokens=chunk.usage.prompt_tokens,
|
||||
@@ -1094,23 +1010,6 @@ async def _stream_chat_chunks(
|
||||
if not text_started and text_block_id:
|
||||
yield StreamTextStart(id=text_block_id)
|
||||
text_started = True
|
||||
# Log timing for first content chunk
|
||||
if first_content_chunk:
|
||||
first_content_chunk = False
|
||||
ttfc = (
|
||||
time_module.perf_counter() - api_call_start
|
||||
) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] FIRST CONTENT CHUNK at {ttfc:.1f}ms "
|
||||
f"(since API call), n_chunks={chunk_count}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"time_to_first_chunk_ms": ttfc,
|
||||
"n_chunks": chunk_count,
|
||||
}
|
||||
},
|
||||
)
|
||||
# Stream the text delta
|
||||
text_response = StreamTextDelta(
|
||||
id=text_block_id or "",
|
||||
@@ -1167,21 +1066,7 @@ async def _stream_chat_chunks(
|
||||
toolName=tool_calls[idx]["function"]["name"],
|
||||
)
|
||||
emitted_start_for_idx.add(idx)
|
||||
stream_duration = time_module.perf_counter() - api_call_start
|
||||
logger.info(
|
||||
f"[TIMING] OpenAI stream COMPLETE, finish_reason={finish_reason}, "
|
||||
f"duration={stream_duration:.2f}s, "
|
||||
f"n_chunks={chunk_count}, n_tool_calls={len(tool_calls)}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"stream_duration_ms": stream_duration * 1000,
|
||||
"finish_reason": finish_reason,
|
||||
"n_chunks": chunk_count,
|
||||
"n_tool_calls": len(tool_calls),
|
||||
}
|
||||
},
|
||||
)
|
||||
logger.info(f"Stream complete. Finish reason: {finish_reason}")
|
||||
|
||||
# Yield all accumulated tool calls after the stream is complete
|
||||
# This ensures all tool call arguments have been fully received
|
||||
@@ -1201,12 +1086,6 @@ async def _stream_chat_chunks(
|
||||
# Re-raise to trigger retry logic in the parent function
|
||||
raise
|
||||
|
||||
total_time = (time_module.perf_counter() - stream_chunks_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] _stream_chat_chunks COMPLETED in {total_time/1000:.1f}s; "
|
||||
f"session={session.session_id}, user={session.user_id}",
|
||||
extra={"json_fields": {**log_meta, "total_time_ms": total_time}},
|
||||
)
|
||||
yield StreamFinish()
|
||||
return
|
||||
except Exception as e:
|
||||
|
||||
@@ -104,24 +104,6 @@ async def create_task(
|
||||
Returns:
|
||||
The created ActiveTask instance (metadata only)
|
||||
"""
|
||||
import time
|
||||
|
||||
start_time = time.perf_counter()
|
||||
|
||||
# Build log metadata for structured logging
|
||||
log_meta = {
|
||||
"component": "StreamRegistry",
|
||||
"task_id": task_id,
|
||||
"session_id": session_id,
|
||||
}
|
||||
if user_id:
|
||||
log_meta["user_id"] = user_id
|
||||
|
||||
logger.info(
|
||||
f"[TIMING] create_task STARTED, task={task_id}, session={session_id}, user={user_id}",
|
||||
extra={"json_fields": log_meta},
|
||||
)
|
||||
|
||||
task = ActiveTask(
|
||||
task_id=task_id,
|
||||
session_id=session_id,
|
||||
@@ -132,18 +114,10 @@ async def create_task(
|
||||
)
|
||||
|
||||
# Store metadata in Redis
|
||||
redis_start = time.perf_counter()
|
||||
redis = await get_redis_async()
|
||||
redis_time = (time.perf_counter() - redis_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] get_redis_async took {redis_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": redis_time}},
|
||||
)
|
||||
|
||||
meta_key = _get_task_meta_key(task_id)
|
||||
op_key = _get_operation_mapping_key(operation_id)
|
||||
|
||||
hset_start = time.perf_counter()
|
||||
await redis.hset( # type: ignore[misc]
|
||||
meta_key,
|
||||
mapping={
|
||||
@@ -157,22 +131,12 @@ async def create_task(
|
||||
"created_at": task.created_at.isoformat(),
|
||||
},
|
||||
)
|
||||
hset_time = (time.perf_counter() - hset_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] redis.hset took {hset_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": hset_time}},
|
||||
)
|
||||
|
||||
await redis.expire(meta_key, config.stream_ttl)
|
||||
|
||||
# Create operation_id -> task_id mapping for webhook lookups
|
||||
await redis.set(op_key, task_id, ex=config.stream_ttl)
|
||||
|
||||
total_time = (time.perf_counter() - start_time) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] create_task COMPLETED in {total_time:.1f}ms; task={task_id}, session={session_id}",
|
||||
extra={"json_fields": {**log_meta, "total_time_ms": total_time}},
|
||||
)
|
||||
logger.debug(f"Created task {task_id} for session {session_id}")
|
||||
|
||||
return task
|
||||
|
||||
@@ -192,60 +156,26 @@ async def publish_chunk(
|
||||
Returns:
|
||||
The Redis Stream message ID
|
||||
"""
|
||||
import time
|
||||
|
||||
start_time = time.perf_counter()
|
||||
chunk_type = type(chunk).__name__
|
||||
chunk_json = chunk.model_dump_json()
|
||||
message_id = "0-0"
|
||||
|
||||
# Build log metadata
|
||||
log_meta = {
|
||||
"component": "StreamRegistry",
|
||||
"task_id": task_id,
|
||||
"chunk_type": chunk_type,
|
||||
}
|
||||
|
||||
try:
|
||||
redis = await get_redis_async()
|
||||
stream_key = _get_task_stream_key(task_id)
|
||||
|
||||
# Write to Redis Stream for persistence and real-time delivery
|
||||
xadd_start = time.perf_counter()
|
||||
raw_id = await redis.xadd(
|
||||
stream_key,
|
||||
{"data": chunk_json},
|
||||
maxlen=config.stream_max_length,
|
||||
)
|
||||
xadd_time = (time.perf_counter() - xadd_start) * 1000
|
||||
message_id = raw_id if isinstance(raw_id, str) else raw_id.decode()
|
||||
|
||||
# Set TTL on stream to match task metadata TTL
|
||||
await redis.expire(stream_key, config.stream_ttl)
|
||||
|
||||
total_time = (time.perf_counter() - start_time) * 1000
|
||||
# Only log timing for significant chunks or slow operations
|
||||
if (
|
||||
chunk_type
|
||||
in ("StreamStart", "StreamFinish", "StreamTextStart", "StreamTextEnd")
|
||||
or total_time > 50
|
||||
):
|
||||
logger.info(
|
||||
f"[TIMING] publish_chunk {chunk_type} in {total_time:.1f}ms (xadd={xadd_time:.1f}ms)",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"total_time_ms": total_time,
|
||||
"xadd_time_ms": xadd_time,
|
||||
"message_id": message_id,
|
||||
}
|
||||
},
|
||||
)
|
||||
except Exception as e:
|
||||
elapsed = (time.perf_counter() - start_time) * 1000
|
||||
logger.error(
|
||||
f"[TIMING] Failed to publish chunk {chunk_type} after {elapsed:.1f}ms: {e}",
|
||||
extra={"json_fields": {**log_meta, "elapsed_ms": elapsed, "error": str(e)}},
|
||||
f"Failed to publish chunk for task {task_id}: {e}",
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
@@ -270,61 +200,24 @@ async def subscribe_to_task(
|
||||
An asyncio Queue that will receive stream chunks, or None if task not found
|
||||
or user doesn't have access
|
||||
"""
|
||||
import time
|
||||
|
||||
start_time = time.perf_counter()
|
||||
|
||||
# Build log metadata
|
||||
log_meta = {"component": "StreamRegistry", "task_id": task_id}
|
||||
if user_id:
|
||||
log_meta["user_id"] = user_id
|
||||
|
||||
logger.info(
|
||||
f"[TIMING] subscribe_to_task STARTED, task={task_id}, user={user_id}, last_msg={last_message_id}",
|
||||
extra={"json_fields": {**log_meta, "last_message_id": last_message_id}},
|
||||
)
|
||||
|
||||
redis_start = time.perf_counter()
|
||||
redis = await get_redis_async()
|
||||
meta_key = _get_task_meta_key(task_id)
|
||||
meta: dict[Any, Any] = await redis.hgetall(meta_key) # type: ignore[misc]
|
||||
hgetall_time = (time.perf_counter() - redis_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] Redis hgetall took {hgetall_time:.1f}ms",
|
||||
extra={"json_fields": {**log_meta, "duration_ms": hgetall_time}},
|
||||
)
|
||||
|
||||
if not meta:
|
||||
elapsed = (time.perf_counter() - start_time) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] Task not found in Redis after {elapsed:.1f}ms",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"elapsed_ms": elapsed,
|
||||
"reason": "task_not_found",
|
||||
}
|
||||
},
|
||||
)
|
||||
logger.debug(f"Task {task_id} not found in Redis")
|
||||
return None
|
||||
|
||||
# Note: Redis client uses decode_responses=True, so keys are strings
|
||||
task_status = meta.get("status", "")
|
||||
task_user_id = meta.get("user_id", "") or None
|
||||
log_meta["session_id"] = meta.get("session_id", "")
|
||||
|
||||
# Validate ownership - if task has an owner, requester must match
|
||||
if task_user_id:
|
||||
if user_id != task_user_id:
|
||||
logger.warning(
|
||||
f"[TIMING] Access denied: user {user_id} tried to access task owned by {task_user_id}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"task_owner": task_user_id,
|
||||
"reason": "access_denied",
|
||||
}
|
||||
},
|
||||
f"User {user_id} denied access to task {task_id} "
|
||||
f"owned by {task_user_id}"
|
||||
)
|
||||
return None
|
||||
|
||||
@@ -332,19 +225,7 @@ async def subscribe_to_task(
|
||||
stream_key = _get_task_stream_key(task_id)
|
||||
|
||||
# Step 1: Replay messages from Redis Stream
|
||||
xread_start = time.perf_counter()
|
||||
messages = await redis.xread({stream_key: last_message_id}, block=0, count=1000)
|
||||
xread_time = (time.perf_counter() - xread_start) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] Redis xread (replay) took {xread_time:.1f}ms, status={task_status}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"duration_ms": xread_time,
|
||||
"task_status": task_status,
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
replayed_count = 0
|
||||
replay_last_id = last_message_id
|
||||
@@ -363,48 +244,19 @@ async def subscribe_to_task(
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to replay message: {e}")
|
||||
|
||||
logger.info(
|
||||
f"[TIMING] Replayed {replayed_count} messages, last_id={replay_last_id}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"n_messages_replayed": replayed_count,
|
||||
"replay_last_id": replay_last_id,
|
||||
}
|
||||
},
|
||||
)
|
||||
logger.debug(f"Task {task_id}: replayed {replayed_count} messages")
|
||||
|
||||
# Step 2: If task is still running, start stream listener for live updates
|
||||
if task_status == "running":
|
||||
logger.info(
|
||||
"[TIMING] Task still running, starting _stream_listener",
|
||||
extra={"json_fields": {**log_meta, "task_status": task_status}},
|
||||
)
|
||||
listener_task = asyncio.create_task(
|
||||
_stream_listener(task_id, subscriber_queue, replay_last_id, log_meta)
|
||||
_stream_listener(task_id, subscriber_queue, replay_last_id)
|
||||
)
|
||||
# Track listener task for cleanup on unsubscribe
|
||||
_listener_tasks[id(subscriber_queue)] = (task_id, listener_task)
|
||||
else:
|
||||
# Task is completed/failed - add finish marker
|
||||
logger.info(
|
||||
f"[TIMING] Task already {task_status}, adding StreamFinish",
|
||||
extra={"json_fields": {**log_meta, "task_status": task_status}},
|
||||
)
|
||||
await subscriber_queue.put(StreamFinish())
|
||||
|
||||
total_time = (time.perf_counter() - start_time) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] subscribe_to_task COMPLETED in {total_time:.1f}ms; task={task_id}, "
|
||||
f"n_messages_replayed={replayed_count}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"total_time_ms": total_time,
|
||||
"n_messages_replayed": replayed_count,
|
||||
}
|
||||
},
|
||||
)
|
||||
return subscriber_queue
|
||||
|
||||
|
||||
@@ -412,7 +264,6 @@ async def _stream_listener(
|
||||
task_id: str,
|
||||
subscriber_queue: asyncio.Queue[StreamBaseResponse],
|
||||
last_replayed_id: str,
|
||||
log_meta: dict | None = None,
|
||||
) -> None:
|
||||
"""Listen to Redis Stream for new messages using blocking XREAD.
|
||||
|
||||
@@ -423,27 +274,10 @@ async def _stream_listener(
|
||||
task_id: Task ID to listen for
|
||||
subscriber_queue: Queue to deliver messages to
|
||||
last_replayed_id: Last message ID from replay (continue from here)
|
||||
log_meta: Structured logging metadata
|
||||
"""
|
||||
import time
|
||||
|
||||
start_time = time.perf_counter()
|
||||
|
||||
# Use provided log_meta or build minimal one
|
||||
if log_meta is None:
|
||||
log_meta = {"component": "StreamRegistry", "task_id": task_id}
|
||||
|
||||
logger.info(
|
||||
f"[TIMING] _stream_listener STARTED, task={task_id}, last_id={last_replayed_id}",
|
||||
extra={"json_fields": {**log_meta, "last_replayed_id": last_replayed_id}},
|
||||
)
|
||||
|
||||
queue_id = id(subscriber_queue)
|
||||
# Track the last successfully delivered message ID for recovery hints
|
||||
last_delivered_id = last_replayed_id
|
||||
messages_delivered = 0
|
||||
first_message_time = None
|
||||
xread_count = 0
|
||||
|
||||
try:
|
||||
redis = await get_redis_async()
|
||||
@@ -453,39 +287,9 @@ async def _stream_listener(
|
||||
while True:
|
||||
# Block for up to 30 seconds waiting for new messages
|
||||
# This allows periodic checking if task is still running
|
||||
xread_start = time.perf_counter()
|
||||
xread_count += 1
|
||||
messages = await redis.xread(
|
||||
{stream_key: current_id}, block=30000, count=100
|
||||
)
|
||||
xread_time = (time.perf_counter() - xread_start) * 1000
|
||||
|
||||
if messages:
|
||||
msg_count = sum(len(msgs) for _, msgs in messages)
|
||||
logger.info(
|
||||
f"[TIMING] xread #{xread_count} returned {msg_count} messages in {xread_time:.1f}ms",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"xread_count": xread_count,
|
||||
"n_messages": msg_count,
|
||||
"duration_ms": xread_time,
|
||||
}
|
||||
},
|
||||
)
|
||||
elif xread_time > 1000:
|
||||
# Only log timeouts (30s blocking)
|
||||
logger.info(
|
||||
f"[TIMING] xread #{xread_count} timeout after {xread_time:.1f}ms",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"xread_count": xread_count,
|
||||
"duration_ms": xread_time,
|
||||
"reason": "timeout",
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
if not messages:
|
||||
# Timeout - check if task is still running
|
||||
@@ -522,30 +326,10 @@ async def _stream_listener(
|
||||
)
|
||||
# Update last delivered ID on successful delivery
|
||||
last_delivered_id = current_id
|
||||
messages_delivered += 1
|
||||
if first_message_time is None:
|
||||
first_message_time = time.perf_counter()
|
||||
elapsed = (first_message_time - start_time) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] FIRST live message at {elapsed:.1f}ms, type={type(chunk).__name__}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"elapsed_ms": elapsed,
|
||||
"chunk_type": type(chunk).__name__,
|
||||
}
|
||||
},
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(
|
||||
f"[TIMING] Subscriber queue full, delivery timed out after {QUEUE_PUT_TIMEOUT}s",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"timeout_s": QUEUE_PUT_TIMEOUT,
|
||||
"reason": "queue_full",
|
||||
}
|
||||
},
|
||||
f"Subscriber queue full for task {task_id}, "
|
||||
f"message delivery timed out after {QUEUE_PUT_TIMEOUT}s"
|
||||
)
|
||||
# Send overflow error with recovery info
|
||||
try:
|
||||
@@ -567,44 +351,15 @@ async def _stream_listener(
|
||||
|
||||
# Stop listening on finish
|
||||
if isinstance(chunk, StreamFinish):
|
||||
total_time = (time.perf_counter() - start_time) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] StreamFinish received in {total_time/1000:.1f}s; delivered={messages_delivered}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"total_time_ms": total_time,
|
||||
"messages_delivered": messages_delivered,
|
||||
}
|
||||
},
|
||||
)
|
||||
return
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
f"Error processing stream message: {e}",
|
||||
extra={"json_fields": {**log_meta, "error": str(e)}},
|
||||
)
|
||||
logger.warning(f"Error processing stream message: {e}")
|
||||
|
||||
except asyncio.CancelledError:
|
||||
elapsed = (time.perf_counter() - start_time) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] _stream_listener CANCELLED after {elapsed:.1f}ms, delivered={messages_delivered}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"elapsed_ms": elapsed,
|
||||
"messages_delivered": messages_delivered,
|
||||
"reason": "cancelled",
|
||||
}
|
||||
},
|
||||
)
|
||||
logger.debug(f"Stream listener cancelled for task {task_id}")
|
||||
raise # Re-raise to propagate cancellation
|
||||
except Exception as e:
|
||||
elapsed = (time.perf_counter() - start_time) * 1000
|
||||
logger.error(
|
||||
f"[TIMING] _stream_listener ERROR after {elapsed:.1f}ms: {e}",
|
||||
extra={"json_fields": {**log_meta, "elapsed_ms": elapsed, "error": str(e)}},
|
||||
)
|
||||
logger.error(f"Stream listener error for task {task_id}: {e}")
|
||||
# On error, send finish to unblock subscriber
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
@@ -613,24 +368,10 @@ async def _stream_listener(
|
||||
)
|
||||
except (asyncio.TimeoutError, asyncio.QueueFull):
|
||||
logger.warning(
|
||||
"Could not deliver finish event after error",
|
||||
extra={"json_fields": log_meta},
|
||||
f"Could not deliver finish event for task {task_id} after error"
|
||||
)
|
||||
finally:
|
||||
# Clean up listener task mapping on exit
|
||||
total_time = (time.perf_counter() - start_time) * 1000
|
||||
logger.info(
|
||||
f"[TIMING] _stream_listener FINISHED in {total_time/1000:.1f}s; task={task_id}, "
|
||||
f"delivered={messages_delivered}, xread_count={xread_count}",
|
||||
extra={
|
||||
"json_fields": {
|
||||
**log_meta,
|
||||
"total_time_ms": total_time,
|
||||
"messages_delivered": messages_delivered,
|
||||
"xread_count": xread_count,
|
||||
}
|
||||
},
|
||||
)
|
||||
_listener_tasks.pop(queue_id, None)
|
||||
|
||||
|
||||
|
||||
@@ -13,32 +13,10 @@ from backend.api.features.chat.tools.models import (
|
||||
NoResultsResponse,
|
||||
)
|
||||
from backend.api.features.store.hybrid_search import unified_hybrid_search
|
||||
from backend.data.block import BlockType, get_block
|
||||
from backend.data.block import get_block
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_TARGET_RESULTS = 10
|
||||
# Over-fetch to compensate for post-hoc filtering of graph-only blocks.
|
||||
# 40 is 2x current removed; speed of query 10 vs 40 is minimial
|
||||
_OVERFETCH_PAGE_SIZE = 40
|
||||
|
||||
# Block types that only work within graphs and cannot run standalone in CoPilot.
|
||||
COPILOT_EXCLUDED_BLOCK_TYPES = {
|
||||
BlockType.INPUT, # Graph interface definition - data enters via chat, not graph inputs
|
||||
BlockType.OUTPUT, # Graph interface definition - data exits via chat, not graph outputs
|
||||
BlockType.WEBHOOK, # Wait for external events - would hang forever in CoPilot
|
||||
BlockType.WEBHOOK_MANUAL, # Same as WEBHOOK
|
||||
BlockType.NOTE, # Visual annotation only - no runtime behavior
|
||||
BlockType.HUMAN_IN_THE_LOOP, # Pauses for human approval - CoPilot IS human-in-the-loop
|
||||
BlockType.AGENT, # AgentExecutorBlock requires execution_context - use run_agent tool
|
||||
}
|
||||
|
||||
# Specific block IDs excluded from CoPilot (STANDARD type but still require graph context)
|
||||
COPILOT_EXCLUDED_BLOCK_IDS = {
|
||||
# SmartDecisionMakerBlock - dynamically discovers downstream blocks via graph topology
|
||||
"3b191d9f-356f-482d-8238-ba04b6d18381",
|
||||
}
|
||||
|
||||
|
||||
class FindBlockTool(BaseTool):
|
||||
"""Tool for searching available blocks."""
|
||||
@@ -110,7 +88,7 @@ class FindBlockTool(BaseTool):
|
||||
query=query,
|
||||
content_types=[ContentType.BLOCK],
|
||||
page=1,
|
||||
page_size=_OVERFETCH_PAGE_SIZE,
|
||||
page_size=10,
|
||||
)
|
||||
|
||||
if not results:
|
||||
@@ -130,90 +108,60 @@ class FindBlockTool(BaseTool):
|
||||
block = get_block(block_id)
|
||||
|
||||
# Skip disabled blocks
|
||||
if not block or block.disabled:
|
||||
continue
|
||||
if block and not block.disabled:
|
||||
# Get input/output schemas
|
||||
input_schema = {}
|
||||
output_schema = {}
|
||||
try:
|
||||
input_schema = block.input_schema.jsonschema()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
output_schema = block.output_schema.jsonschema()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Skip blocks excluded from CoPilot (graph-only blocks)
|
||||
if (
|
||||
block.block_type in COPILOT_EXCLUDED_BLOCK_TYPES
|
||||
or block.id in COPILOT_EXCLUDED_BLOCK_IDS
|
||||
):
|
||||
continue
|
||||
# Get categories from block instance
|
||||
categories = []
|
||||
if hasattr(block, "categories") and block.categories:
|
||||
categories = [cat.value for cat in block.categories]
|
||||
|
||||
# Get input/output schemas
|
||||
input_schema = {}
|
||||
output_schema = {}
|
||||
try:
|
||||
input_schema = block.input_schema.jsonschema()
|
||||
except Exception as e:
|
||||
logger.debug(
|
||||
"Failed to generate input schema for block %s: %s",
|
||||
block_id,
|
||||
e,
|
||||
)
|
||||
try:
|
||||
output_schema = block.output_schema.jsonschema()
|
||||
except Exception as e:
|
||||
logger.debug(
|
||||
"Failed to generate output schema for block %s: %s",
|
||||
block_id,
|
||||
e,
|
||||
)
|
||||
|
||||
# Get categories from block instance
|
||||
categories = []
|
||||
if hasattr(block, "categories") and block.categories:
|
||||
categories = [cat.value for cat in block.categories]
|
||||
|
||||
# Extract required inputs for easier use
|
||||
required_inputs: list[BlockInputFieldInfo] = []
|
||||
if input_schema:
|
||||
properties = input_schema.get("properties", {})
|
||||
required_fields = set(input_schema.get("required", []))
|
||||
# Get credential field names to exclude from required inputs
|
||||
credentials_fields = set(
|
||||
block.input_schema.get_credentials_fields().keys()
|
||||
)
|
||||
|
||||
for field_name, field_schema in properties.items():
|
||||
# Skip credential fields - they're handled separately
|
||||
if field_name in credentials_fields:
|
||||
continue
|
||||
|
||||
required_inputs.append(
|
||||
BlockInputFieldInfo(
|
||||
name=field_name,
|
||||
type=field_schema.get("type", "string"),
|
||||
description=field_schema.get("description", ""),
|
||||
required=field_name in required_fields,
|
||||
default=field_schema.get("default"),
|
||||
)
|
||||
# Extract required inputs for easier use
|
||||
required_inputs: list[BlockInputFieldInfo] = []
|
||||
if input_schema:
|
||||
properties = input_schema.get("properties", {})
|
||||
required_fields = set(input_schema.get("required", []))
|
||||
# Get credential field names to exclude from required inputs
|
||||
credentials_fields = set(
|
||||
block.input_schema.get_credentials_fields().keys()
|
||||
)
|
||||
|
||||
blocks.append(
|
||||
BlockInfoSummary(
|
||||
id=block_id,
|
||||
name=block.name,
|
||||
description=block.description or "",
|
||||
categories=categories,
|
||||
input_schema=input_schema,
|
||||
output_schema=output_schema,
|
||||
required_inputs=required_inputs,
|
||||
for field_name, field_schema in properties.items():
|
||||
# Skip credential fields - they're handled separately
|
||||
if field_name in credentials_fields:
|
||||
continue
|
||||
|
||||
required_inputs.append(
|
||||
BlockInputFieldInfo(
|
||||
name=field_name,
|
||||
type=field_schema.get("type", "string"),
|
||||
description=field_schema.get("description", ""),
|
||||
required=field_name in required_fields,
|
||||
default=field_schema.get("default"),
|
||||
)
|
||||
)
|
||||
|
||||
blocks.append(
|
||||
BlockInfoSummary(
|
||||
id=block_id,
|
||||
name=block.name,
|
||||
description=block.description or "",
|
||||
categories=categories,
|
||||
input_schema=input_schema,
|
||||
output_schema=output_schema,
|
||||
required_inputs=required_inputs,
|
||||
)
|
||||
)
|
||||
)
|
||||
|
||||
if len(blocks) >= _TARGET_RESULTS:
|
||||
break
|
||||
|
||||
if blocks and len(blocks) < _TARGET_RESULTS:
|
||||
logger.debug(
|
||||
"find_block returned %d/%d results for query '%s' "
|
||||
"(filtered %d excluded/disabled blocks)",
|
||||
len(blocks),
|
||||
_TARGET_RESULTS,
|
||||
query,
|
||||
len(results) - len(blocks),
|
||||
)
|
||||
|
||||
if not blocks:
|
||||
return NoResultsResponse(
|
||||
|
||||
@@ -1,139 +0,0 @@
|
||||
"""Tests for block filtering in FindBlockTool."""
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from backend.api.features.chat.tools.find_block import (
|
||||
COPILOT_EXCLUDED_BLOCK_IDS,
|
||||
COPILOT_EXCLUDED_BLOCK_TYPES,
|
||||
FindBlockTool,
|
||||
)
|
||||
from backend.api.features.chat.tools.models import BlockListResponse
|
||||
from backend.data.block import BlockType
|
||||
|
||||
from ._test_data import make_session
|
||||
|
||||
_TEST_USER_ID = "test-user-find-block"
|
||||
|
||||
|
||||
def make_mock_block(
|
||||
block_id: str, name: str, block_type: BlockType, disabled: bool = False
|
||||
):
|
||||
"""Create a mock block for testing."""
|
||||
mock = MagicMock()
|
||||
mock.id = block_id
|
||||
mock.name = name
|
||||
mock.description = f"{name} description"
|
||||
mock.block_type = block_type
|
||||
mock.disabled = disabled
|
||||
mock.input_schema = MagicMock()
|
||||
mock.input_schema.jsonschema.return_value = {"properties": {}, "required": []}
|
||||
mock.input_schema.get_credentials_fields.return_value = {}
|
||||
mock.output_schema = MagicMock()
|
||||
mock.output_schema.jsonschema.return_value = {}
|
||||
mock.categories = []
|
||||
return mock
|
||||
|
||||
|
||||
class TestFindBlockFiltering:
|
||||
"""Tests for block filtering in FindBlockTool."""
|
||||
|
||||
def test_excluded_block_types_contains_expected_types(self):
|
||||
"""Verify COPILOT_EXCLUDED_BLOCK_TYPES contains all graph-only types."""
|
||||
assert BlockType.INPUT in COPILOT_EXCLUDED_BLOCK_TYPES
|
||||
assert BlockType.OUTPUT in COPILOT_EXCLUDED_BLOCK_TYPES
|
||||
assert BlockType.WEBHOOK in COPILOT_EXCLUDED_BLOCK_TYPES
|
||||
assert BlockType.WEBHOOK_MANUAL in COPILOT_EXCLUDED_BLOCK_TYPES
|
||||
assert BlockType.NOTE in COPILOT_EXCLUDED_BLOCK_TYPES
|
||||
assert BlockType.HUMAN_IN_THE_LOOP in COPILOT_EXCLUDED_BLOCK_TYPES
|
||||
assert BlockType.AGENT in COPILOT_EXCLUDED_BLOCK_TYPES
|
||||
|
||||
def test_excluded_block_ids_contains_smart_decision_maker(self):
|
||||
"""Verify SmartDecisionMakerBlock is in COPILOT_EXCLUDED_BLOCK_IDS."""
|
||||
assert "3b191d9f-356f-482d-8238-ba04b6d18381" in COPILOT_EXCLUDED_BLOCK_IDS
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_excluded_block_type_filtered_from_results(self):
|
||||
"""Verify blocks with excluded BlockTypes are filtered from search results."""
|
||||
session = make_session(user_id=_TEST_USER_ID)
|
||||
|
||||
# Mock search returns an INPUT block (excluded) and a STANDARD block (included)
|
||||
search_results = [
|
||||
{"content_id": "input-block-id", "score": 0.9},
|
||||
{"content_id": "standard-block-id", "score": 0.8},
|
||||
]
|
||||
|
||||
input_block = make_mock_block("input-block-id", "Input Block", BlockType.INPUT)
|
||||
standard_block = make_mock_block(
|
||||
"standard-block-id", "HTTP Request", BlockType.STANDARD
|
||||
)
|
||||
|
||||
def mock_get_block(block_id):
|
||||
return {
|
||||
"input-block-id": input_block,
|
||||
"standard-block-id": standard_block,
|
||||
}.get(block_id)
|
||||
|
||||
with patch(
|
||||
"backend.api.features.chat.tools.find_block.unified_hybrid_search",
|
||||
new_callable=AsyncMock,
|
||||
return_value=(search_results, 2),
|
||||
):
|
||||
with patch(
|
||||
"backend.api.features.chat.tools.find_block.get_block",
|
||||
side_effect=mock_get_block,
|
||||
):
|
||||
tool = FindBlockTool()
|
||||
response = await tool._execute(
|
||||
user_id=_TEST_USER_ID, session=session, query="test"
|
||||
)
|
||||
|
||||
# Should only return the standard block, not the INPUT block
|
||||
assert isinstance(response, BlockListResponse)
|
||||
assert len(response.blocks) == 1
|
||||
assert response.blocks[0].id == "standard-block-id"
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_excluded_block_id_filtered_from_results(self):
|
||||
"""Verify SmartDecisionMakerBlock is filtered from search results."""
|
||||
session = make_session(user_id=_TEST_USER_ID)
|
||||
|
||||
smart_decision_id = "3b191d9f-356f-482d-8238-ba04b6d18381"
|
||||
search_results = [
|
||||
{"content_id": smart_decision_id, "score": 0.9},
|
||||
{"content_id": "normal-block-id", "score": 0.8},
|
||||
]
|
||||
|
||||
# SmartDecisionMakerBlock has STANDARD type but is excluded by ID
|
||||
smart_block = make_mock_block(
|
||||
smart_decision_id, "Smart Decision Maker", BlockType.STANDARD
|
||||
)
|
||||
normal_block = make_mock_block(
|
||||
"normal-block-id", "Normal Block", BlockType.STANDARD
|
||||
)
|
||||
|
||||
def mock_get_block(block_id):
|
||||
return {
|
||||
smart_decision_id: smart_block,
|
||||
"normal-block-id": normal_block,
|
||||
}.get(block_id)
|
||||
|
||||
with patch(
|
||||
"backend.api.features.chat.tools.find_block.unified_hybrid_search",
|
||||
new_callable=AsyncMock,
|
||||
return_value=(search_results, 2),
|
||||
):
|
||||
with patch(
|
||||
"backend.api.features.chat.tools.find_block.get_block",
|
||||
side_effect=mock_get_block,
|
||||
):
|
||||
tool = FindBlockTool()
|
||||
response = await tool._execute(
|
||||
user_id=_TEST_USER_ID, session=session, query="decision"
|
||||
)
|
||||
|
||||
# Should only return normal block, not SmartDecisionMakerBlock
|
||||
assert isinstance(response, BlockListResponse)
|
||||
assert len(response.blocks) == 1
|
||||
assert response.blocks[0].id == "normal-block-id"
|
||||
@@ -1,29 +0,0 @@
|
||||
"""Shared helpers for chat tools."""
|
||||
|
||||
from typing import Any
|
||||
|
||||
|
||||
def get_inputs_from_schema(
|
||||
input_schema: dict[str, Any],
|
||||
exclude_fields: set[str] | None = None,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Extract input field info from JSON schema."""
|
||||
if not isinstance(input_schema, dict):
|
||||
return []
|
||||
|
||||
exclude = exclude_fields or set()
|
||||
properties = input_schema.get("properties", {})
|
||||
required = set(input_schema.get("required", []))
|
||||
|
||||
return [
|
||||
{
|
||||
"name": name,
|
||||
"title": schema.get("title", name),
|
||||
"type": schema.get("type", "string"),
|
||||
"description": schema.get("description", ""),
|
||||
"required": name in required,
|
||||
"default": schema.get("default"),
|
||||
}
|
||||
for name, schema in properties.items()
|
||||
if name not in exclude
|
||||
]
|
||||
@@ -24,7 +24,6 @@ from backend.util.timezone_utils import (
|
||||
)
|
||||
|
||||
from .base import BaseTool
|
||||
from .helpers import get_inputs_from_schema
|
||||
from .models import (
|
||||
AgentDetails,
|
||||
AgentDetailsResponse,
|
||||
@@ -262,7 +261,7 @@ class RunAgentTool(BaseTool):
|
||||
),
|
||||
requirements={
|
||||
"credentials": requirements_creds_list,
|
||||
"inputs": get_inputs_from_schema(graph.input_schema),
|
||||
"inputs": self._get_inputs_list(graph.input_schema),
|
||||
"execution_modes": self._get_execution_modes(graph),
|
||||
},
|
||||
),
|
||||
@@ -370,6 +369,22 @@ class RunAgentTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
def _get_inputs_list(self, input_schema: dict[str, Any]) -> list[dict[str, Any]]:
|
||||
"""Extract inputs list from schema."""
|
||||
inputs_list = []
|
||||
if isinstance(input_schema, dict) and "properties" in input_schema:
|
||||
for field_name, field_schema in input_schema["properties"].items():
|
||||
inputs_list.append(
|
||||
{
|
||||
"name": field_name,
|
||||
"title": field_schema.get("title", field_name),
|
||||
"type": field_schema.get("type", "string"),
|
||||
"description": field_schema.get("description", ""),
|
||||
"required": field_name in input_schema.get("required", []),
|
||||
}
|
||||
)
|
||||
return inputs_list
|
||||
|
||||
def _get_execution_modes(self, graph: GraphModel) -> list[str]:
|
||||
"""Get available execution modes for the graph."""
|
||||
trigger_info = graph.trigger_setup_info
|
||||
@@ -383,7 +398,7 @@ class RunAgentTool(BaseTool):
|
||||
suffix: str,
|
||||
) -> str:
|
||||
"""Build a message describing available inputs for an agent."""
|
||||
inputs_list = get_inputs_from_schema(graph.input_schema)
|
||||
inputs_list = self._get_inputs_list(graph.input_schema)
|
||||
required_names = [i["name"] for i in inputs_list if i["required"]]
|
||||
optional_names = [i["name"] for i in inputs_list if not i["required"]]
|
||||
|
||||
|
||||
@@ -8,19 +8,14 @@ from typing import Any
|
||||
from pydantic_core import PydanticUndefined
|
||||
|
||||
from backend.api.features.chat.model import ChatSession
|
||||
from backend.api.features.chat.tools.find_block import (
|
||||
COPILOT_EXCLUDED_BLOCK_IDS,
|
||||
COPILOT_EXCLUDED_BLOCK_TYPES,
|
||||
)
|
||||
from backend.data.block import AnyBlockSchema, get_block
|
||||
from backend.data.block import get_block
|
||||
from backend.data.execution import ExecutionContext
|
||||
from backend.data.model import CredentialsFieldInfo, CredentialsMetaInput
|
||||
from backend.data.model import CredentialsMetaInput
|
||||
from backend.data.workspace import get_or_create_workspace
|
||||
from backend.integrations.creds_manager import IntegrationCredentialsManager
|
||||
from backend.util.exceptions import BlockError
|
||||
|
||||
from .base import BaseTool
|
||||
from .helpers import get_inputs_from_schema
|
||||
from .models import (
|
||||
BlockOutputResponse,
|
||||
ErrorResponse,
|
||||
@@ -29,10 +24,7 @@ from .models import (
|
||||
ToolResponseBase,
|
||||
UserReadiness,
|
||||
)
|
||||
from .utils import (
|
||||
build_missing_credentials_from_field_info,
|
||||
match_credentials_to_requirements,
|
||||
)
|
||||
from .utils import build_missing_credentials_from_field_info
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -81,6 +73,91 @@ class RunBlockTool(BaseTool):
|
||||
def requires_auth(self) -> bool:
|
||||
return True
|
||||
|
||||
async def _check_block_credentials(
|
||||
self,
|
||||
user_id: str,
|
||||
block: Any,
|
||||
input_data: dict[str, Any] | None = None,
|
||||
) -> tuple[dict[str, CredentialsMetaInput], list[CredentialsMetaInput]]:
|
||||
"""
|
||||
Check if user has required credentials for a block.
|
||||
|
||||
Args:
|
||||
user_id: User ID
|
||||
block: Block to check credentials for
|
||||
input_data: Input data for the block (used to determine provider via discriminator)
|
||||
|
||||
Returns:
|
||||
tuple[matched_credentials, missing_credentials]
|
||||
"""
|
||||
matched_credentials: dict[str, CredentialsMetaInput] = {}
|
||||
missing_credentials: list[CredentialsMetaInput] = []
|
||||
input_data = input_data or {}
|
||||
|
||||
# Get credential field info from block's input schema
|
||||
credentials_fields_info = block.input_schema.get_credentials_fields_info()
|
||||
|
||||
if not credentials_fields_info:
|
||||
return matched_credentials, missing_credentials
|
||||
|
||||
# Get user's available credentials
|
||||
creds_manager = IntegrationCredentialsManager()
|
||||
available_creds = await creds_manager.store.get_all_creds(user_id)
|
||||
|
||||
for field_name, field_info in credentials_fields_info.items():
|
||||
effective_field_info = field_info
|
||||
if field_info.discriminator and field_info.discriminator_mapping:
|
||||
# Get discriminator from input, falling back to schema default
|
||||
discriminator_value = input_data.get(field_info.discriminator)
|
||||
if discriminator_value is None:
|
||||
field = block.input_schema.model_fields.get(
|
||||
field_info.discriminator
|
||||
)
|
||||
if field and field.default is not PydanticUndefined:
|
||||
discriminator_value = field.default
|
||||
|
||||
if (
|
||||
discriminator_value
|
||||
and discriminator_value in field_info.discriminator_mapping
|
||||
):
|
||||
effective_field_info = field_info.discriminate(discriminator_value)
|
||||
logger.debug(
|
||||
f"Discriminated provider for {field_name}: "
|
||||
f"{discriminator_value} -> {effective_field_info.provider}"
|
||||
)
|
||||
|
||||
matching_cred = next(
|
||||
(
|
||||
cred
|
||||
for cred in available_creds
|
||||
if cred.provider in effective_field_info.provider
|
||||
and cred.type in effective_field_info.supported_types
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
if matching_cred:
|
||||
matched_credentials[field_name] = CredentialsMetaInput(
|
||||
id=matching_cred.id,
|
||||
provider=matching_cred.provider, # type: ignore
|
||||
type=matching_cred.type,
|
||||
title=matching_cred.title,
|
||||
)
|
||||
else:
|
||||
# Create a placeholder for the missing credential
|
||||
provider = next(iter(effective_field_info.provider), "unknown")
|
||||
cred_type = next(iter(effective_field_info.supported_types), "api_key")
|
||||
missing_credentials.append(
|
||||
CredentialsMetaInput(
|
||||
id=field_name,
|
||||
provider=provider, # type: ignore
|
||||
type=cred_type, # type: ignore
|
||||
title=field_name.replace("_", " ").title(),
|
||||
)
|
||||
)
|
||||
|
||||
return matched_credentials, missing_credentials
|
||||
|
||||
async def _execute(
|
||||
self,
|
||||
user_id: str | None,
|
||||
@@ -135,24 +212,11 @@ class RunBlockTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
# Check if block is excluded from CoPilot (graph-only blocks)
|
||||
if (
|
||||
block.block_type in COPILOT_EXCLUDED_BLOCK_TYPES
|
||||
or block.id in COPILOT_EXCLUDED_BLOCK_IDS
|
||||
):
|
||||
return ErrorResponse(
|
||||
message=(
|
||||
f"Block '{block.name}' cannot be run directly in CoPilot. "
|
||||
"This block is designed for use within graphs only."
|
||||
),
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
logger.info(f"Executing block {block.name} ({block_id}) for user {user_id}")
|
||||
|
||||
creds_manager = IntegrationCredentialsManager()
|
||||
matched_credentials, missing_credentials = (
|
||||
await self._resolve_block_credentials(user_id, block, input_data)
|
||||
matched_credentials, missing_credentials = await self._check_block_credentials(
|
||||
user_id, block, input_data
|
||||
)
|
||||
|
||||
if missing_credentials:
|
||||
@@ -281,75 +345,29 @@ class RunBlockTool(BaseTool):
|
||||
session_id=session_id,
|
||||
)
|
||||
|
||||
async def _resolve_block_credentials(
|
||||
self,
|
||||
user_id: str,
|
||||
block: AnyBlockSchema,
|
||||
input_data: dict[str, Any] | None = None,
|
||||
) -> tuple[dict[str, CredentialsMetaInput], list[CredentialsMetaInput]]:
|
||||
"""
|
||||
Resolve credentials for a block by matching user's available credentials.
|
||||
|
||||
Args:
|
||||
user_id: User ID
|
||||
block: Block to resolve credentials for
|
||||
input_data: Input data for the block (used to determine provider via discriminator)
|
||||
|
||||
Returns:
|
||||
tuple of (matched_credentials, missing_credentials) - matched credentials
|
||||
are used for block execution, missing ones indicate setup requirements.
|
||||
"""
|
||||
input_data = input_data or {}
|
||||
requirements = self._resolve_discriminated_credentials(block, input_data)
|
||||
|
||||
if not requirements:
|
||||
return {}, []
|
||||
|
||||
return await match_credentials_to_requirements(user_id, requirements)
|
||||
|
||||
def _get_inputs_list(self, block: AnyBlockSchema) -> list[dict[str, Any]]:
|
||||
def _get_inputs_list(self, block: Any) -> list[dict[str, Any]]:
|
||||
"""Extract non-credential inputs from block schema."""
|
||||
inputs_list = []
|
||||
schema = block.input_schema.jsonschema()
|
||||
properties = schema.get("properties", {})
|
||||
required_fields = set(schema.get("required", []))
|
||||
|
||||
# Get credential field names to exclude
|
||||
credentials_fields = set(block.input_schema.get_credentials_fields().keys())
|
||||
return get_inputs_from_schema(schema, exclude_fields=credentials_fields)
|
||||
|
||||
def _resolve_discriminated_credentials(
|
||||
self,
|
||||
block: AnyBlockSchema,
|
||||
input_data: dict[str, Any],
|
||||
) -> dict[str, CredentialsFieldInfo]:
|
||||
"""Resolve credential requirements, applying discriminator logic where needed."""
|
||||
credentials_fields_info = block.input_schema.get_credentials_fields_info()
|
||||
if not credentials_fields_info:
|
||||
return {}
|
||||
for field_name, field_schema in properties.items():
|
||||
# Skip credential fields
|
||||
if field_name in credentials_fields:
|
||||
continue
|
||||
|
||||
resolved: dict[str, CredentialsFieldInfo] = {}
|
||||
inputs_list.append(
|
||||
{
|
||||
"name": field_name,
|
||||
"title": field_schema.get("title", field_name),
|
||||
"type": field_schema.get("type", "string"),
|
||||
"description": field_schema.get("description", ""),
|
||||
"required": field_name in required_fields,
|
||||
}
|
||||
)
|
||||
|
||||
for field_name, field_info in credentials_fields_info.items():
|
||||
effective_field_info = field_info
|
||||
|
||||
if field_info.discriminator and field_info.discriminator_mapping:
|
||||
discriminator_value = input_data.get(field_info.discriminator)
|
||||
if discriminator_value is None:
|
||||
field = block.input_schema.model_fields.get(
|
||||
field_info.discriminator
|
||||
)
|
||||
if field and field.default is not PydanticUndefined:
|
||||
discriminator_value = field.default
|
||||
|
||||
if (
|
||||
discriminator_value
|
||||
and discriminator_value in field_info.discriminator_mapping
|
||||
):
|
||||
effective_field_info = field_info.discriminate(discriminator_value)
|
||||
# For host-scoped credentials, add the discriminator value
|
||||
# (e.g., URL) so _credential_is_for_host can match it
|
||||
effective_field_info.discriminator_values.add(discriminator_value)
|
||||
logger.debug(
|
||||
f"Discriminated provider for {field_name}: "
|
||||
f"{discriminator_value} -> {effective_field_info.provider}"
|
||||
)
|
||||
|
||||
resolved[field_name] = effective_field_info
|
||||
|
||||
return resolved
|
||||
return inputs_list
|
||||
|
||||
@@ -1,106 +0,0 @@
|
||||
"""Tests for block execution guards in RunBlockTool."""
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from backend.api.features.chat.tools.models import ErrorResponse
|
||||
from backend.api.features.chat.tools.run_block import RunBlockTool
|
||||
from backend.data.block import BlockType
|
||||
|
||||
from ._test_data import make_session
|
||||
|
||||
_TEST_USER_ID = "test-user-run-block"
|
||||
|
||||
|
||||
def make_mock_block(
|
||||
block_id: str, name: str, block_type: BlockType, disabled: bool = False
|
||||
):
|
||||
"""Create a mock block for testing."""
|
||||
mock = MagicMock()
|
||||
mock.id = block_id
|
||||
mock.name = name
|
||||
mock.block_type = block_type
|
||||
mock.disabled = disabled
|
||||
mock.input_schema = MagicMock()
|
||||
mock.input_schema.jsonschema.return_value = {"properties": {}, "required": []}
|
||||
mock.input_schema.get_credentials_fields_info.return_value = []
|
||||
return mock
|
||||
|
||||
|
||||
class TestRunBlockFiltering:
|
||||
"""Tests for block execution guards in RunBlockTool."""
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_excluded_block_type_returns_error(self):
|
||||
"""Attempting to execute a block with excluded BlockType returns error."""
|
||||
session = make_session(user_id=_TEST_USER_ID)
|
||||
|
||||
input_block = make_mock_block("input-block-id", "Input Block", BlockType.INPUT)
|
||||
|
||||
with patch(
|
||||
"backend.api.features.chat.tools.run_block.get_block",
|
||||
return_value=input_block,
|
||||
):
|
||||
tool = RunBlockTool()
|
||||
response = await tool._execute(
|
||||
user_id=_TEST_USER_ID,
|
||||
session=session,
|
||||
block_id="input-block-id",
|
||||
input_data={},
|
||||
)
|
||||
|
||||
assert isinstance(response, ErrorResponse)
|
||||
assert "cannot be run directly in CoPilot" in response.message
|
||||
assert "designed for use within graphs only" in response.message
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_excluded_block_id_returns_error(self):
|
||||
"""Attempting to execute SmartDecisionMakerBlock returns error."""
|
||||
session = make_session(user_id=_TEST_USER_ID)
|
||||
|
||||
smart_decision_id = "3b191d9f-356f-482d-8238-ba04b6d18381"
|
||||
smart_block = make_mock_block(
|
||||
smart_decision_id, "Smart Decision Maker", BlockType.STANDARD
|
||||
)
|
||||
|
||||
with patch(
|
||||
"backend.api.features.chat.tools.run_block.get_block",
|
||||
return_value=smart_block,
|
||||
):
|
||||
tool = RunBlockTool()
|
||||
response = await tool._execute(
|
||||
user_id=_TEST_USER_ID,
|
||||
session=session,
|
||||
block_id=smart_decision_id,
|
||||
input_data={},
|
||||
)
|
||||
|
||||
assert isinstance(response, ErrorResponse)
|
||||
assert "cannot be run directly in CoPilot" in response.message
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
async def test_non_excluded_block_passes_guard(self):
|
||||
"""Non-excluded blocks pass the filtering guard (may fail later for other reasons)."""
|
||||
session = make_session(user_id=_TEST_USER_ID)
|
||||
|
||||
standard_block = make_mock_block(
|
||||
"standard-id", "HTTP Request", BlockType.STANDARD
|
||||
)
|
||||
|
||||
with patch(
|
||||
"backend.api.features.chat.tools.run_block.get_block",
|
||||
return_value=standard_block,
|
||||
):
|
||||
tool = RunBlockTool()
|
||||
response = await tool._execute(
|
||||
user_id=_TEST_USER_ID,
|
||||
session=session,
|
||||
block_id="standard-id",
|
||||
input_data={},
|
||||
)
|
||||
|
||||
# Should NOT be an ErrorResponse about CoPilot exclusion
|
||||
# (may be other errors like missing credentials, but not the exclusion guard)
|
||||
if isinstance(response, ErrorResponse):
|
||||
assert "cannot be run directly in CoPilot" not in response.message
|
||||
@@ -8,7 +8,6 @@ from backend.api.features.library import model as library_model
|
||||
from backend.api.features.store import db as store_db
|
||||
from backend.data.graph import GraphModel
|
||||
from backend.data.model import (
|
||||
Credentials,
|
||||
CredentialsFieldInfo,
|
||||
CredentialsMetaInput,
|
||||
HostScopedCredentials,
|
||||
@@ -224,99 +223,6 @@ async def get_or_create_library_agent(
|
||||
return library_agents[0]
|
||||
|
||||
|
||||
async def match_credentials_to_requirements(
|
||||
user_id: str,
|
||||
requirements: dict[str, CredentialsFieldInfo],
|
||||
) -> tuple[dict[str, CredentialsMetaInput], list[CredentialsMetaInput]]:
|
||||
"""
|
||||
Match user's credentials against a dictionary of credential requirements.
|
||||
|
||||
This is the core matching logic shared by both graph and block credential matching.
|
||||
"""
|
||||
matched: dict[str, CredentialsMetaInput] = {}
|
||||
missing: list[CredentialsMetaInput] = []
|
||||
|
||||
if not requirements:
|
||||
return matched, missing
|
||||
|
||||
available_creds = await get_user_credentials(user_id)
|
||||
|
||||
for field_name, field_info in requirements.items():
|
||||
matching_cred = find_matching_credential(available_creds, field_info)
|
||||
|
||||
if matching_cred:
|
||||
try:
|
||||
matched[field_name] = create_credential_meta_from_match(matching_cred)
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Failed to create CredentialsMetaInput for field '{field_name}': "
|
||||
f"provider={matching_cred.provider}, type={matching_cred.type}, "
|
||||
f"credential_id={matching_cred.id}",
|
||||
exc_info=True,
|
||||
)
|
||||
provider = next(iter(field_info.provider), "unknown")
|
||||
cred_type = next(iter(field_info.supported_types), "api_key")
|
||||
missing.append(
|
||||
CredentialsMetaInput(
|
||||
id=field_name,
|
||||
provider=provider, # type: ignore
|
||||
type=cred_type, # type: ignore
|
||||
title=f"{field_name} (validation failed: {e})",
|
||||
)
|
||||
)
|
||||
else:
|
||||
provider = next(iter(field_info.provider), "unknown")
|
||||
cred_type = next(iter(field_info.supported_types), "api_key")
|
||||
missing.append(
|
||||
CredentialsMetaInput(
|
||||
id=field_name,
|
||||
provider=provider, # type: ignore
|
||||
type=cred_type, # type: ignore
|
||||
title=field_name.replace("_", " ").title(),
|
||||
)
|
||||
)
|
||||
|
||||
return matched, missing
|
||||
|
||||
|
||||
async def get_user_credentials(user_id: str) -> list[Credentials]:
|
||||
"""Get all available credentials for a user."""
|
||||
creds_manager = IntegrationCredentialsManager()
|
||||
return await creds_manager.store.get_all_creds(user_id)
|
||||
|
||||
|
||||
def find_matching_credential(
|
||||
available_creds: list[Credentials],
|
||||
field_info: CredentialsFieldInfo,
|
||||
) -> Credentials | None:
|
||||
"""Find a credential that matches the required provider, type, scopes, and host."""
|
||||
for cred in available_creds:
|
||||
if cred.provider not in field_info.provider:
|
||||
continue
|
||||
if cred.type not in field_info.supported_types:
|
||||
continue
|
||||
if cred.type == "oauth2" and not _credential_has_required_scopes(
|
||||
cred, field_info
|
||||
):
|
||||
continue
|
||||
if cred.type == "host_scoped" and not _credential_is_for_host(cred, field_info):
|
||||
continue
|
||||
return cred
|
||||
return None
|
||||
|
||||
|
||||
def create_credential_meta_from_match(
|
||||
matching_cred: Credentials,
|
||||
) -> CredentialsMetaInput:
|
||||
"""Create a CredentialsMetaInput from a matched credential."""
|
||||
return CredentialsMetaInput(
|
||||
id=matching_cred.id,
|
||||
provider=matching_cred.provider, # type: ignore
|
||||
type=matching_cred.type,
|
||||
title=matching_cred.title,
|
||||
)
|
||||
|
||||
|
||||
async def match_user_credentials_to_graph(
|
||||
user_id: str,
|
||||
graph: GraphModel,
|
||||
@@ -425,6 +331,8 @@ def _credential_has_required_scopes(
|
||||
# If no scopes are required, any credential matches
|
||||
if not requirements.required_scopes:
|
||||
return True
|
||||
|
||||
# Check that credential scopes are a superset of required scopes
|
||||
return set(credential.scopes).issuperset(requirements.required_scopes)
|
||||
|
||||
|
||||
|
||||
@@ -9,11 +9,14 @@ This module provides blocks for:
|
||||
- Getting media duration
|
||||
- Looping videos
|
||||
- Adding audio to videos
|
||||
- Transcribing video speech to text
|
||||
- Editing videos by modifying their transcript
|
||||
|
||||
Dependencies:
|
||||
- yt-dlp: For video downloading
|
||||
- moviepy: For video editing operations
|
||||
- elevenlabs: For AI narration (optional)
|
||||
- replicate: For video transcription and text-based editing
|
||||
"""
|
||||
|
||||
from backend.blocks.video.add_audio import AddAudioToVideoBlock
|
||||
@@ -21,14 +24,18 @@ from backend.blocks.video.clip import VideoClipBlock
|
||||
from backend.blocks.video.concat import VideoConcatBlock
|
||||
from backend.blocks.video.download import VideoDownloadBlock
|
||||
from backend.blocks.video.duration import MediaDurationBlock
|
||||
from backend.blocks.video.edit_by_text import EditVideoByTextBlock
|
||||
from backend.blocks.video.loop import LoopVideoBlock
|
||||
from backend.blocks.video.narration import VideoNarrationBlock
|
||||
from backend.blocks.video.text_overlay import VideoTextOverlayBlock
|
||||
from backend.blocks.video.transcribe import TranscribeVideoBlock
|
||||
|
||||
__all__ = [
|
||||
"AddAudioToVideoBlock",
|
||||
"EditVideoByTextBlock",
|
||||
"LoopVideoBlock",
|
||||
"MediaDurationBlock",
|
||||
"TranscribeVideoBlock",
|
||||
"VideoClipBlock",
|
||||
"VideoConcatBlock",
|
||||
"VideoDownloadBlock",
|
||||
|
||||
159
autogpt_platform/backend/backend/blocks/video/edit_by_text.py
Normal file
159
autogpt_platform/backend/backend/blocks/video/edit_by_text.py
Normal file
@@ -0,0 +1,159 @@
|
||||
"""EditVideoByTextBlock - Edit a video by modifying its transcript via Replicate."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Literal
|
||||
|
||||
from replicate.client import Client as ReplicateClient
|
||||
from replicate.helpers import FileOutput
|
||||
|
||||
from backend.blocks.replicate._auth import (
|
||||
TEST_CREDENTIALS,
|
||||
TEST_CREDENTIALS_INPUT,
|
||||
ReplicateCredentials,
|
||||
ReplicateCredentialsInput,
|
||||
)
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.data.execution import ExecutionContext
|
||||
from backend.data.model import CredentialsField, SchemaField
|
||||
from backend.util.exceptions import BlockExecutionError
|
||||
from backend.util.file import MediaFileType, store_media_file
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EditVideoByTextBlock(Block):
|
||||
"""Edit a video by modifying its transcript, cutting segments via Replicate API."""
|
||||
|
||||
class Input(BlockSchemaInput):
|
||||
credentials: ReplicateCredentialsInput = CredentialsField(
|
||||
description="Replicate API key for video editing.",
|
||||
)
|
||||
video_in: MediaFileType = SchemaField(
|
||||
description="Input video file to edit (URL, data URI, or local path)",
|
||||
)
|
||||
transcription: str = SchemaField(
|
||||
description="Desired transcript for the output video",
|
||||
)
|
||||
split_at: Literal["word", "character"] = SchemaField(
|
||||
description="Granularity for transcript matching",
|
||||
default="word",
|
||||
)
|
||||
|
||||
class Output(BlockSchemaOutput):
|
||||
video_url: str = SchemaField(
|
||||
description="URL of the edited video",
|
||||
)
|
||||
transcription: str = SchemaField(
|
||||
description="Transcription used for editing",
|
||||
)
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="98d40049-a1de-465f-bba1-47411298ad1a",
|
||||
description="Edit a video by modifying its transcript",
|
||||
categories={BlockCategory.MULTIMEDIA},
|
||||
input_schema=self.Input,
|
||||
output_schema=self.Output,
|
||||
test_input={
|
||||
"credentials": TEST_CREDENTIALS_INPUT,
|
||||
"video_in": "data:video/mp4;base64,AAAA",
|
||||
"transcription": "edited transcript",
|
||||
},
|
||||
test_output=[
|
||||
("video_url", "https://replicate.com/output/video.mp4"),
|
||||
("transcription", "edited transcript"),
|
||||
],
|
||||
test_mock={
|
||||
"_edit_video": lambda *args: "https://replicate.com/output/video.mp4",
|
||||
"_store_input_video": lambda *args, **kwargs: "data:video/mp4;base64,AAAA",
|
||||
},
|
||||
test_credentials=TEST_CREDENTIALS,
|
||||
)
|
||||
|
||||
async def _store_input_video(
|
||||
self, execution_context: ExecutionContext, file: MediaFileType
|
||||
) -> MediaFileType:
|
||||
"""Store input video locally. Extracted for testability."""
|
||||
return await store_media_file(
|
||||
file=file,
|
||||
execution_context=execution_context,
|
||||
return_format="for_external_api",
|
||||
)
|
||||
|
||||
async def _edit_video(
|
||||
self, data_uri: str, transcription: str, split_at: str, api_key: str
|
||||
) -> str:
|
||||
"""Call Replicate API to edit the video based on the transcript."""
|
||||
client = ReplicateClient(api_token=api_key)
|
||||
|
||||
output = await client.async_run(
|
||||
"jd7h/edit-video-by-editing-text:e010b880347314d07e3ce3b21cbd4c57add51fea3474677a6cb1316751c4cb90",
|
||||
input={
|
||||
"mode": "edit",
|
||||
"video_in": data_uri,
|
||||
"transcription": transcription,
|
||||
"split_at": split_at,
|
||||
},
|
||||
)
|
||||
|
||||
# Get video URL from output
|
||||
if isinstance(output, dict) and "video" in output:
|
||||
video_output = output["video"]
|
||||
if isinstance(video_output, FileOutput):
|
||||
return video_output.url
|
||||
return str(video_output)
|
||||
|
||||
if isinstance(output, list) and len(output) > 0:
|
||||
video_url = output[0]
|
||||
if isinstance(video_url, FileOutput):
|
||||
return video_url.url
|
||||
return str(video_url)
|
||||
|
||||
if isinstance(output, FileOutput):
|
||||
return output.url
|
||||
|
||||
if isinstance(output, str):
|
||||
return output
|
||||
|
||||
raise ValueError(f"Unexpected output format from Replicate API: {output}")
|
||||
|
||||
async def run(
|
||||
self,
|
||||
input_data: Input,
|
||||
*,
|
||||
credentials: ReplicateCredentials,
|
||||
execution_context: ExecutionContext,
|
||||
**kwargs,
|
||||
) -> BlockOutput:
|
||||
try:
|
||||
# Store video and get data URI for API submission
|
||||
data_uri = await self._store_input_video(
|
||||
execution_context, input_data.video_in
|
||||
)
|
||||
|
||||
video_url = await self._edit_video(
|
||||
data_uri,
|
||||
input_data.transcription,
|
||||
input_data.split_at,
|
||||
credentials.api_key.get_secret_value(),
|
||||
)
|
||||
|
||||
yield "video_url", video_url
|
||||
yield "transcription", input_data.transcription
|
||||
|
||||
except BlockExecutionError:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise BlockExecutionError(
|
||||
message=f"Failed to edit video: {e}",
|
||||
block_name=self.name,
|
||||
block_id=str(self.id),
|
||||
) from e
|
||||
139
autogpt_platform/backend/backend/blocks/video/transcribe.py
Normal file
139
autogpt_platform/backend/backend/blocks/video/transcribe.py
Normal file
@@ -0,0 +1,139 @@
|
||||
"""TranscribeVideoBlock - Transcribe speech from a video file using Replicate."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
|
||||
from replicate.client import Client as ReplicateClient
|
||||
from replicate.helpers import FileOutput
|
||||
|
||||
from backend.blocks.replicate._auth import (
|
||||
TEST_CREDENTIALS,
|
||||
TEST_CREDENTIALS_INPUT,
|
||||
ReplicateCredentials,
|
||||
ReplicateCredentialsInput,
|
||||
)
|
||||
from backend.data.block import (
|
||||
Block,
|
||||
BlockCategory,
|
||||
BlockOutput,
|
||||
BlockSchemaInput,
|
||||
BlockSchemaOutput,
|
||||
)
|
||||
from backend.data.execution import ExecutionContext
|
||||
from backend.data.model import CredentialsField, SchemaField
|
||||
from backend.util.exceptions import BlockExecutionError
|
||||
from backend.util.file import MediaFileType, store_media_file
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TranscribeVideoBlock(Block):
|
||||
"""Transcribe speech from a video file to text via Replicate API."""
|
||||
|
||||
class Input(BlockSchemaInput):
|
||||
credentials: ReplicateCredentialsInput = CredentialsField(
|
||||
description="Replicate API key for video transcription.",
|
||||
)
|
||||
video_in: MediaFileType = SchemaField(
|
||||
description="Input video file to transcribe (URL, data URI, or local path)",
|
||||
)
|
||||
|
||||
class Output(BlockSchemaOutput):
|
||||
transcription: str = SchemaField(
|
||||
description="Text transcription extracted from the video",
|
||||
)
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
id="fa49dad0-a5fc-441c-ba04-2ac206e392d8",
|
||||
description="Transcribe speech from a video file to text",
|
||||
categories={BlockCategory.MULTIMEDIA},
|
||||
input_schema=self.Input,
|
||||
output_schema=self.Output,
|
||||
test_input={
|
||||
"credentials": TEST_CREDENTIALS_INPUT,
|
||||
"video_in": "data:video/mp4;base64,AAAA",
|
||||
},
|
||||
test_output=[("transcription", "example transcript")],
|
||||
test_mock={
|
||||
"_transcribe": lambda *args: "example transcript",
|
||||
"_store_input_video": lambda *args, **kwargs: "test.mp4",
|
||||
},
|
||||
test_credentials=TEST_CREDENTIALS,
|
||||
)
|
||||
|
||||
async def _store_input_video(
|
||||
self, execution_context: ExecutionContext, file: MediaFileType
|
||||
) -> MediaFileType:
|
||||
"""Store input video locally. Extracted for testability."""
|
||||
return await store_media_file(
|
||||
file=file,
|
||||
execution_context=execution_context,
|
||||
return_format="for_external_api",
|
||||
)
|
||||
|
||||
async def _transcribe(self, data_uri: str, api_key: str) -> str:
|
||||
"""Call Replicate API to transcribe the video."""
|
||||
client = ReplicateClient(api_token=api_key)
|
||||
|
||||
output = await client.async_run(
|
||||
"jd7h/edit-video-by-editing-text:e010b880347314d07e3ce3b21cbd4c57add51fea3474677a6cb1316751c4cb90",
|
||||
input={
|
||||
"mode": "transcribe",
|
||||
"video_in": data_uri,
|
||||
},
|
||||
)
|
||||
|
||||
# Handle dictionary response format
|
||||
if isinstance(output, dict):
|
||||
if "transcription" in output:
|
||||
return str(output["transcription"])
|
||||
if "error" in output:
|
||||
raise ValueError(f"API returned error: {output['error']}")
|
||||
|
||||
# Handle list formats
|
||||
if isinstance(output, list) and len(output) > 0:
|
||||
if isinstance(output[0], FileOutput):
|
||||
return output[0].url
|
||||
if isinstance(output[0], dict) and "text" in output[0]:
|
||||
return " ".join(
|
||||
segment.get("text", "") for segment in output # type: ignore
|
||||
)
|
||||
return str(output[0])
|
||||
|
||||
if isinstance(output, FileOutput):
|
||||
return output.url
|
||||
|
||||
if isinstance(output, str):
|
||||
return output
|
||||
|
||||
raise ValueError(f"Unexpected output format from Replicate API: {output}")
|
||||
|
||||
async def run(
|
||||
self,
|
||||
input_data: Input,
|
||||
*,
|
||||
credentials: ReplicateCredentials,
|
||||
execution_context: ExecutionContext,
|
||||
**kwargs,
|
||||
) -> BlockOutput:
|
||||
try:
|
||||
# Store video and get data URI for API submission
|
||||
data_uri = await self._store_input_video(
|
||||
execution_context, input_data.video_in
|
||||
)
|
||||
|
||||
transcript = await self._transcribe(
|
||||
data_uri, credentials.api_key.get_secret_value()
|
||||
)
|
||||
yield "transcription", transcript
|
||||
|
||||
except BlockExecutionError:
|
||||
raise
|
||||
except Exception as e:
|
||||
raise BlockExecutionError(
|
||||
message=f"Failed to transcribe video: {e}",
|
||||
block_name=self.name,
|
||||
block_id=str(self.id),
|
||||
) from e
|
||||
@@ -1,4 +1,3 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
@@ -226,10 +225,6 @@ class SyncRabbitMQ(RabbitMQBase):
|
||||
class AsyncRabbitMQ(RabbitMQBase):
|
||||
"""Asynchronous RabbitMQ client"""
|
||||
|
||||
def __init__(self, config: RabbitMQConfig):
|
||||
super().__init__(config)
|
||||
self._reconnect_lock: asyncio.Lock | None = None
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return bool(self._connection and not self._connection.is_closed)
|
||||
@@ -240,17 +235,7 @@ class AsyncRabbitMQ(RabbitMQBase):
|
||||
|
||||
@conn_retry("AsyncRabbitMQ", "Acquiring async connection")
|
||||
async def connect(self):
|
||||
if self.is_connected and self._channel and not self._channel.is_closed:
|
||||
return
|
||||
|
||||
if (
|
||||
self.is_connected
|
||||
and self._connection
|
||||
and (self._channel is None or self._channel.is_closed)
|
||||
):
|
||||
self._channel = await self._connection.channel()
|
||||
await self._channel.set_qos(prefetch_count=1)
|
||||
await self.declare_infrastructure()
|
||||
if self.is_connected:
|
||||
return
|
||||
|
||||
self._connection = await aio_pika.connect_robust(
|
||||
@@ -306,46 +291,24 @@ class AsyncRabbitMQ(RabbitMQBase):
|
||||
exchange, routing_key=queue.routing_key or queue.name
|
||||
)
|
||||
|
||||
@property
|
||||
def _lock(self) -> asyncio.Lock:
|
||||
if self._reconnect_lock is None:
|
||||
self._reconnect_lock = asyncio.Lock()
|
||||
return self._reconnect_lock
|
||||
|
||||
async def _ensure_channel(self) -> aio_pika.abc.AbstractChannel:
|
||||
"""Get a valid channel, reconnecting if the current one is stale.
|
||||
|
||||
Uses a lock to prevent concurrent reconnection attempts from racing.
|
||||
"""
|
||||
if self.is_ready:
|
||||
return self._channel # type: ignore # is_ready guarantees non-None
|
||||
|
||||
async with self._lock:
|
||||
# Double-check after acquiring lock
|
||||
if self.is_ready:
|
||||
return self._channel # type: ignore
|
||||
|
||||
self._channel = None
|
||||
await self.connect()
|
||||
|
||||
if self._channel is None:
|
||||
raise RuntimeError("Channel should be established after connect")
|
||||
|
||||
return self._channel
|
||||
|
||||
async def _publish_once(
|
||||
@func_retry
|
||||
async def publish_message(
|
||||
self,
|
||||
routing_key: str,
|
||||
message: str,
|
||||
exchange: Optional[Exchange] = None,
|
||||
persistent: bool = True,
|
||||
) -> None:
|
||||
channel = await self._ensure_channel()
|
||||
if not self.is_ready:
|
||||
await self.connect()
|
||||
|
||||
if self._channel is None:
|
||||
raise RuntimeError("Channel should be established after connect")
|
||||
|
||||
if exchange:
|
||||
exchange_obj = await channel.get_exchange(exchange.name)
|
||||
exchange_obj = await self._channel.get_exchange(exchange.name)
|
||||
else:
|
||||
exchange_obj = channel.default_exchange
|
||||
exchange_obj = self._channel.default_exchange
|
||||
|
||||
await exchange_obj.publish(
|
||||
aio_pika.Message(
|
||||
@@ -359,23 +322,9 @@ class AsyncRabbitMQ(RabbitMQBase):
|
||||
routing_key=routing_key,
|
||||
)
|
||||
|
||||
@func_retry
|
||||
async def publish_message(
|
||||
self,
|
||||
routing_key: str,
|
||||
message: str,
|
||||
exchange: Optional[Exchange] = None,
|
||||
persistent: bool = True,
|
||||
) -> None:
|
||||
try:
|
||||
await self._publish_once(routing_key, message, exchange, persistent)
|
||||
except aio_pika.exceptions.ChannelInvalidStateError:
|
||||
logger.warning(
|
||||
"RabbitMQ channel invalid, forcing reconnect and retrying publish"
|
||||
)
|
||||
async with self._lock:
|
||||
self._channel = None
|
||||
await self._publish_once(routing_key, message, exchange, persistent)
|
||||
|
||||
async def get_channel(self) -> aio_pika.abc.AbstractChannel:
|
||||
return await self._ensure_channel()
|
||||
if not self.is_ready:
|
||||
await self.connect()
|
||||
if self._channel is None:
|
||||
raise RuntimeError("Channel should be established after connect")
|
||||
return self._channel
|
||||
|
||||
@@ -104,31 +104,7 @@ export function FileInput(props: Props) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const getFileLabelFromValue = (val: unknown): string => {
|
||||
// Handle object format from external API: { name, type, size, data }
|
||||
if (val && typeof val === "object") {
|
||||
const obj = val as Record<string, unknown>;
|
||||
if (typeof obj.name === "string") {
|
||||
return getFileLabel(
|
||||
obj.name,
|
||||
typeof obj.type === "string" ? obj.type : "",
|
||||
);
|
||||
}
|
||||
if (typeof obj.type === "string") {
|
||||
const mimeParts = obj.type.split("/");
|
||||
if (mimeParts.length > 1) {
|
||||
return `${mimeParts[1].toUpperCase()} file`;
|
||||
}
|
||||
return `${obj.type} file`;
|
||||
}
|
||||
return "File";
|
||||
}
|
||||
|
||||
// Handle string values (data URIs or file paths)
|
||||
if (typeof val !== "string") {
|
||||
return "File";
|
||||
}
|
||||
|
||||
const getFileLabelFromValue = (val: string) => {
|
||||
if (val.startsWith("data:")) {
|
||||
const matches = val.match(/^data:([^;]+);/);
|
||||
if (matches?.[1]) {
|
||||
|
||||
@@ -474,8 +474,10 @@ Below is a comprehensive list of all available blocks, categorized by their prim
|
||||
| Block Name | Description |
|
||||
|------------|-------------|
|
||||
| [Add Audio To Video](block-integrations/video/add_audio.md#add-audio-to-video) | Block to attach an audio file to a video file using moviepy |
|
||||
| [Edit Video By Text](block-integrations/video/edit_by_text.md#edit-video-by-text) | Edit a video by modifying its transcript |
|
||||
| [Loop Video](block-integrations/video/loop.md#loop-video) | Block to loop a video to a given duration or number of repeats |
|
||||
| [Media Duration](block-integrations/video/duration.md#media-duration) | Block to get the duration of a media file |
|
||||
| [Transcribe Video](block-integrations/video/transcribe.md#transcribe-video) | Transcribe speech from a video file to text |
|
||||
| [Video Clip](block-integrations/video/clip.md#video-clip) | Extract a time segment from a video |
|
||||
| [Video Concat](block-integrations/video/concat.md#video-concat) | Merge multiple video clips into one continuous video |
|
||||
| [Video Download](block-integrations/video/download.md#video-download) | Download video from URL (YouTube, Vimeo, news sites, direct links) |
|
||||
|
||||
@@ -133,8 +133,10 @@
|
||||
* [Video Concat](block-integrations/video/concat.md)
|
||||
* [Video Download](block-integrations/video/download.md)
|
||||
* [Video Duration](block-integrations/video/duration.md)
|
||||
* [Video Edit By Text](block-integrations/video/edit_by_text.md)
|
||||
* [Video Loop](block-integrations/video/loop.md)
|
||||
* [Video Narration](block-integrations/video/narration.md)
|
||||
* [Video Text Overlay](block-integrations/video/text_overlay.md)
|
||||
* [Video Transcribe](block-integrations/video/transcribe.md)
|
||||
* [Wolfram LLM API](block-integrations/wolfram/llm_api.md)
|
||||
* [Zerobounce Validate Emails](block-integrations/zerobounce/validate_emails.md)
|
||||
|
||||
37
docs/integrations/block-integrations/video/edit_by_text.md
Normal file
37
docs/integrations/block-integrations/video/edit_by_text.md
Normal file
@@ -0,0 +1,37 @@
|
||||
# Video Edit By Text
|
||||
<!-- MANUAL: file_description -->
|
||||
_Add a description of this category of blocks._
|
||||
<!-- END MANUAL -->
|
||||
|
||||
## Edit Video By Text
|
||||
|
||||
### What it is
|
||||
Edit a video by modifying its transcript
|
||||
|
||||
### How it works
|
||||
<!-- MANUAL: how_it_works -->
|
||||
_Add technical explanation here._
|
||||
<!-- END MANUAL -->
|
||||
|
||||
### Inputs
|
||||
|
||||
| Input | Description | Type | Required |
|
||||
|-------|-------------|------|----------|
|
||||
| video_in | Input video file to edit (URL, data URI, or local path) | str (file) | Yes |
|
||||
| transcription | Desired transcript for the output video | str | Yes |
|
||||
| split_at | Granularity for transcript matching | "word" \| "character" | No |
|
||||
|
||||
### Outputs
|
||||
|
||||
| Output | Description | Type |
|
||||
|--------|-------------|------|
|
||||
| error | Error message if the operation failed | str |
|
||||
| video_url | URL of the edited video | str |
|
||||
| transcription | Transcription used for editing | str |
|
||||
|
||||
### Possible use case
|
||||
<!-- MANUAL: use_case -->
|
||||
_Add practical use case examples here._
|
||||
<!-- END MANUAL -->
|
||||
|
||||
---
|
||||
34
docs/integrations/block-integrations/video/transcribe.md
Normal file
34
docs/integrations/block-integrations/video/transcribe.md
Normal file
@@ -0,0 +1,34 @@
|
||||
# Video Transcribe
|
||||
<!-- MANUAL: file_description -->
|
||||
_Add a description of this category of blocks._
|
||||
<!-- END MANUAL -->
|
||||
|
||||
## Transcribe Video
|
||||
|
||||
### What it is
|
||||
Transcribe speech from a video file to text
|
||||
|
||||
### How it works
|
||||
<!-- MANUAL: how_it_works -->
|
||||
_Add technical explanation here._
|
||||
<!-- END MANUAL -->
|
||||
|
||||
### Inputs
|
||||
|
||||
| Input | Description | Type | Required |
|
||||
|-------|-------------|------|----------|
|
||||
| video_in | Input video file to transcribe (URL, data URI, or local path) | str (file) | Yes |
|
||||
|
||||
### Outputs
|
||||
|
||||
| Output | Description | Type |
|
||||
|--------|-------------|------|
|
||||
| error | Error message if the operation failed | str |
|
||||
| transcription | Text transcription extracted from the video | str |
|
||||
|
||||
### Possible use case
|
||||
<!-- MANUAL: use_case -->
|
||||
_Add practical use case examples here._
|
||||
<!-- END MANUAL -->
|
||||
|
||||
---
|
||||
Reference in New Issue
Block a user