diff --git a/autogpt_platform/backend/backend/api/features/chat/db.py b/autogpt_platform/backend/backend/api/features/chat/db.py index d34b4e5b07..303ea0a698 100644 --- a/autogpt_platform/backend/backend/api/features/chat/db.py +++ b/autogpt_platform/backend/backend/api/features/chat/db.py @@ -45,10 +45,7 @@ async def create_chat_session( successfulAgentRuns=SafeJson({}), successfulAgentSchedules=SafeJson({}), ) - return await PrismaChatSession.prisma().create( - data=data, - include={"Messages": True}, - ) + return await PrismaChatSession.prisma().create(data=data) async def update_chat_session( diff --git a/autogpt_platform/backend/backend/api/features/chat/routes.py b/autogpt_platform/backend/backend/api/features/chat/routes.py index 3e731d86ac..74e6e8ba1e 100644 --- a/autogpt_platform/backend/backend/api/features/chat/routes.py +++ b/autogpt_platform/backend/backend/api/features/chat/routes.py @@ -266,12 +266,38 @@ async def stream_chat_post( """ import asyncio + import time + + stream_start_time = time.perf_counter() + + # Base log metadata (task_id added after creation) + log_meta = {"component": "ChatStream", "session_id": session_id} + if user_id: + log_meta["user_id"] = user_id + + logger.info( + f"[TIMING] stream_chat_post STARTED, session={session_id}, " + f"user={user_id}, message_len={len(request.message)}", + extra={"json_fields": log_meta}, + ) session = await _validate_and_get_session(session_id, user_id) + logger.info( + f"[TIMING] session validated in {(time.perf_counter() - stream_start_time)*1000:.1f}ms", + extra={ + "json_fields": { + **log_meta, + "duration_ms": (time.perf_counter() - stream_start_time) * 1000, + } + }, + ) # Create a task in the stream registry for reconnection support task_id = str(uuid_module.uuid4()) operation_id = str(uuid_module.uuid4()) + log_meta["task_id"] = task_id + + task_create_start = time.perf_counter() await stream_registry.create_task( task_id=task_id, session_id=session_id, @@ -280,14 +306,46 @@ async def stream_chat_post( tool_name="chat", operation_id=operation_id, ) + logger.info( + f"[TIMING] create_task completed in {(time.perf_counter() - task_create_start)*1000:.1f}ms", + extra={ + "json_fields": { + **log_meta, + "duration_ms": (time.perf_counter() - task_create_start) * 1000, + } + }, + ) # Background task that runs the AI generation independently of SSE connection async def run_ai_generation(): + import time as time_module + + gen_start_time = time_module.perf_counter() + logger.info( + f"[TIMING] run_ai_generation STARTED, task={task_id}, session={session_id}, user={user_id}", + extra={"json_fields": log_meta}, + ) + first_chunk_time, ttfc = None, None + chunk_count = 0 try: # Emit a start event with task_id for reconnection start_chunk = StreamStart(messageId=task_id, taskId=task_id) await stream_registry.publish_chunk(task_id, start_chunk) + logger.info( + f"[TIMING] StreamStart published at {(time_module.perf_counter() - gen_start_time)*1000:.1f}ms", + extra={ + "json_fields": { + **log_meta, + "elapsed_ms": (time_module.perf_counter() - gen_start_time) + * 1000, + } + }, + ) + logger.info( + "[TIMING] Calling stream_chat_completion", + extra={"json_fields": log_meta}, + ) async for chunk in chat_service.stream_chat_completion( session_id, request.message, @@ -296,54 +354,202 @@ async def stream_chat_post( session=session, # Pass pre-fetched session to avoid double-fetch context=request.context, ): + chunk_count += 1 + if first_chunk_time is None: + first_chunk_time = time_module.perf_counter() + ttfc = first_chunk_time 
- gen_start_time + logger.info( + f"[TIMING] FIRST AI CHUNK at {ttfc:.2f}s, type={type(chunk).__name__}", + extra={ + "json_fields": { + **log_meta, + "chunk_type": type(chunk).__name__, + "time_to_first_chunk_ms": ttfc * 1000, + } + }, + ) # Write to Redis (subscribers will receive via XREAD) await stream_registry.publish_chunk(task_id, chunk) - # Mark task as completed + gen_end_time = time_module.perf_counter() + total_time = (gen_end_time - gen_start_time) * 1000 + logger.info( + f"[TIMING] run_ai_generation FINISHED in {total_time/1000:.1f}s; " + f"task={task_id}, session={session_id}, " + f"ttfc={ttfc or -1:.2f}s, n_chunks={chunk_count}", + extra={ + "json_fields": { + **log_meta, + "total_time_ms": total_time, + "time_to_first_chunk_ms": ( + ttfc * 1000 if ttfc is not None else None + ), + "n_chunks": chunk_count, + } + }, + ) + await stream_registry.mark_task_completed(task_id, "completed") except Exception as e: + elapsed = time_module.perf_counter() - gen_start_time logger.error( - f"Error in background AI generation for session {session_id}: {e}" + f"[TIMING] run_ai_generation ERROR after {elapsed:.2f}s: {e}", + extra={ + "json_fields": { + **log_meta, + "elapsed_ms": elapsed * 1000, + "error": str(e), + } + }, ) await stream_registry.mark_task_completed(task_id, "failed") # Start the AI generation in a background task bg_task = asyncio.create_task(run_ai_generation()) await stream_registry.set_task_asyncio_task(task_id, bg_task) + setup_time = (time.perf_counter() - stream_start_time) * 1000 + logger.info( + f"[TIMING] Background task started, setup={setup_time:.1f}ms", + extra={"json_fields": {**log_meta, "setup_time_ms": setup_time}}, + ) # SSE endpoint that subscribes to the task's stream async def event_generator() -> AsyncGenerator[str, None]: + import time as time_module + + event_gen_start = time_module.perf_counter() + logger.info( + f"[TIMING] event_generator STARTED, task={task_id}, session={session_id}, " + f"user={user_id}", + extra={"json_fields": log_meta}, + ) subscriber_queue = None + first_chunk_yielded = False + chunks_yielded = 0 try: # Subscribe to the task stream (this replays existing messages + live updates) + subscribe_start = time_module.perf_counter() + logger.info( + "[TIMING] Calling subscribe_to_task", + extra={"json_fields": log_meta}, + ) subscriber_queue = await stream_registry.subscribe_to_task( task_id=task_id, user_id=user_id, last_message_id="0-0", # Get all messages from the beginning ) + subscribe_time = (time_module.perf_counter() - subscribe_start) * 1000 + logger.info( + f"[TIMING] subscribe_to_task completed in {subscribe_time:.1f}ms, " + f"queue_ok={subscriber_queue is not None}", + extra={ + "json_fields": { + **log_meta, + "duration_ms": subscribe_time, + "queue_obtained": subscriber_queue is not None, + } + }, + ) if subscriber_queue is None: + logger.info( + "[TIMING] subscriber_queue is None, yielding finish", + extra={"json_fields": log_meta}, + ) yield StreamFinish().to_sse() yield "data: [DONE]\n\n" return # Read from the subscriber queue and yield to SSE + logger.info( + "[TIMING] Starting to read from subscriber_queue", + extra={"json_fields": log_meta}, + ) while True: try: + queue_wait_start = time_module.perf_counter() chunk = await asyncio.wait_for(subscriber_queue.get(), timeout=30.0) + queue_wait_time = ( + time_module.perf_counter() - queue_wait_start + ) * 1000 + chunks_yielded += 1 + + if not first_chunk_yielded: + first_chunk_yielded = True + elapsed = time_module.perf_counter() - event_gen_start + logger.info( + 
f"[TIMING] FIRST CHUNK from queue at {elapsed:.2f}s, " + f"type={type(chunk).__name__}, " + f"wait={queue_wait_time:.1f}ms", + extra={ + "json_fields": { + **log_meta, + "chunk_type": type(chunk).__name__, + "elapsed_ms": elapsed * 1000, + "queue_wait_ms": queue_wait_time, + } + }, + ) + elif chunks_yielded % 50 == 0: + logger.info( + f"[TIMING] Chunk #{chunks_yielded}, " + f"type={type(chunk).__name__}", + extra={ + "json_fields": { + **log_meta, + "chunk_number": chunks_yielded, + "chunk_type": type(chunk).__name__, + } + }, + ) + yield chunk.to_sse() # Check for finish signal if isinstance(chunk, StreamFinish): + total_time = time_module.perf_counter() - event_gen_start + logger.info( + f"[TIMING] StreamFinish received in {total_time:.2f}s; " + f"n_chunks={chunks_yielded}", + extra={ + "json_fields": { + **log_meta, + "chunks_yielded": chunks_yielded, + "total_time_ms": total_time * 1000, + } + }, + ) break except asyncio.TimeoutError: # Send heartbeat to keep connection alive + logger.info( + f"[TIMING] Heartbeat timeout, chunks_so_far={chunks_yielded}", + extra={ + "json_fields": {**log_meta, "chunks_so_far": chunks_yielded} + }, + ) yield StreamHeartbeat().to_sse() except GeneratorExit: + logger.info( + f"[TIMING] GeneratorExit (client disconnected), chunks={chunks_yielded}", + extra={ + "json_fields": { + **log_meta, + "chunks_yielded": chunks_yielded, + "reason": "client_disconnect", + } + }, + ) pass # Client disconnected - background task continues except Exception as e: - logger.error(f"Error in SSE stream for task {task_id}: {e}") + elapsed = (time_module.perf_counter() - event_gen_start) * 1000 + logger.error( + f"[TIMING] event_generator ERROR after {elapsed:.1f}ms: {e}", + extra={ + "json_fields": {**log_meta, "elapsed_ms": elapsed, "error": str(e)} + }, + ) finally: # Unsubscribe when client disconnects or stream ends to prevent resource leak if subscriber_queue is not None: @@ -357,6 +563,18 @@ async def stream_chat_post( exc_info=True, ) # AI SDK protocol termination - always yield even if unsubscribe fails + total_time = time_module.perf_counter() - event_gen_start + logger.info( + f"[TIMING] event_generator FINISHED in {total_time:.2f}s; " + f"task={task_id}, session={session_id}, n_chunks={chunks_yielded}", + extra={ + "json_fields": { + **log_meta, + "total_time_ms": total_time * 1000, + "chunks_yielded": chunks_yielded, + } + }, + ) yield "data: [DONE]\n\n" return StreamingResponse( @@ -425,7 +643,7 @@ async def stream_chat_get( "Chat stream completed", extra={ "session_id": session_id, - "chunk_count": chunk_count, + "n_chunks": chunk_count, "first_chunk_type": first_chunk_type, }, ) diff --git a/autogpt_platform/backend/backend/api/features/chat/service.py b/autogpt_platform/backend/backend/api/features/chat/service.py index 06da6bdf2b..da18421b98 100644 --- a/autogpt_platform/backend/backend/api/features/chat/service.py +++ b/autogpt_platform/backend/backend/api/features/chat/service.py @@ -371,21 +371,45 @@ async def stream_chat_completion( ValueError: If max_context_messages is exceeded """ + completion_start = time.monotonic() + + # Build log metadata for structured logging + log_meta = {"component": "ChatService", "session_id": session_id} + if user_id: + log_meta["user_id"] = user_id + logger.info( - f"Streaming chat completion for session {session_id} for message {message} and user id {user_id}. 
Message is user message: {is_user_message}" + f"[TIMING] stream_chat_completion STARTED, session={session_id}, user={user_id}, " + f"message_len={len(message) if message else 0}, is_user={is_user_message}", + extra={ + "json_fields": { + **log_meta, + "message_len": len(message) if message else 0, + "is_user_message": is_user_message, + } + }, ) # Only fetch from Redis if session not provided (initial call) if session is None: + fetch_start = time.monotonic() session = await get_chat_session(session_id, user_id) + fetch_time = (time.monotonic() - fetch_start) * 1000 logger.info( - f"Fetched session from Redis: {session.session_id if session else 'None'}, " - f"message_count={len(session.messages) if session else 0}" + f"[TIMING] get_chat_session took {fetch_time:.1f}ms, " + f"n_messages={len(session.messages) if session else 0}", + extra={ + "json_fields": { + **log_meta, + "duration_ms": fetch_time, + "n_messages": len(session.messages) if session else 0, + } + }, ) else: logger.info( - f"Using provided session object: {session.session_id}, " - f"message_count={len(session.messages)}" + f"[TIMING] Using provided session, messages={len(session.messages)}", + extra={"json_fields": {**log_meta, "n_messages": len(session.messages)}}, ) if not session: @@ -406,17 +430,25 @@ async def stream_chat_completion( # Track user message in PostHog if is_user_message: + posthog_start = time.monotonic() track_user_message( user_id=user_id, session_id=session_id, message_length=len(message), ) + posthog_time = (time.monotonic() - posthog_start) * 1000 + logger.info( + f"[TIMING] track_user_message took {posthog_time:.1f}ms", + extra={"json_fields": {**log_meta, "duration_ms": posthog_time}}, + ) - logger.info( - f"Upserting session: {session.session_id} with user id {session.user_id}, " - f"message_count={len(session.messages)}" - ) + upsert_start = time.monotonic() session = await upsert_chat_session(session) + upsert_time = (time.monotonic() - upsert_start) * 1000 + logger.info( + f"[TIMING] upsert_chat_session took {upsert_time:.1f}ms", + extra={"json_fields": {**log_meta, "duration_ms": upsert_time}}, + ) assert session, "Session not found" # Generate title for new sessions on first user message (non-blocking) @@ -454,7 +486,13 @@ async def stream_chat_completion( asyncio.create_task(_update_title()) # Build system prompt with business understanding + prompt_start = time.monotonic() system_prompt, understanding = await _build_system_prompt(user_id) + prompt_time = (time.monotonic() - prompt_start) * 1000 + logger.info( + f"[TIMING] _build_system_prompt took {prompt_time:.1f}ms", + extra={"json_fields": {**log_meta, "duration_ms": prompt_time}}, + ) # Initialize variables for streaming assistant_response = ChatMessage( @@ -483,9 +521,18 @@ async def stream_chat_completion( text_block_id = str(uuid_module.uuid4()) # Yield message start + setup_time = (time.monotonic() - completion_start) * 1000 + logger.info( + f"[TIMING] Setup complete, yielding StreamStart at {setup_time:.1f}ms", + extra={"json_fields": {**log_meta, "setup_time_ms": setup_time}}, + ) yield StreamStart(messageId=message_id) try: + logger.info( + "[TIMING] Calling _stream_chat_chunks", + extra={"json_fields": log_meta}, + ) async for chunk in _stream_chat_chunks( session=session, tools=tools, @@ -893,9 +940,21 @@ async def _stream_chat_chunks( SSE formatted JSON response objects """ + import time as time_module + + stream_chunks_start = time_module.perf_counter() model = config.model - logger.info("Starting pure chat stream") + # Build 
log metadata for structured logging + log_meta = {"component": "ChatService", "session_id": session.session_id} + if session.user_id: + log_meta["user_id"] = session.user_id + + logger.info( + f"[TIMING] _stream_chat_chunks STARTED, session={session.session_id}, " + f"user={session.user_id}, n_messages={len(session.messages)}", + extra={"json_fields": {**log_meta, "n_messages": len(session.messages)}}, + ) messages = session.to_openai_messages() if system_prompt: @@ -906,12 +965,18 @@ async def _stream_chat_chunks( messages = [system_message] + messages # Apply context window management + context_start = time_module.perf_counter() context_result = await _manage_context_window( messages=messages, model=model, api_key=config.api_key, base_url=config.base_url, ) + context_time = (time_module.perf_counter() - context_start) * 1000 + logger.info( + f"[TIMING] _manage_context_window took {context_time:.1f}ms", + extra={"json_fields": {**log_meta, "duration_ms": context_time}}, + ) if context_result.error: if "System prompt dropped" in context_result.error: @@ -946,9 +1011,19 @@ async def _stream_chat_chunks( while retry_count <= MAX_RETRIES: try: + elapsed = (time_module.perf_counter() - stream_chunks_start) * 1000 + retry_info = ( + f" (retry {retry_count}/{MAX_RETRIES})" if retry_count > 0 else "" + ) logger.info( - f"Creating OpenAI chat completion stream..." - f"{f' (retry {retry_count}/{MAX_RETRIES})' if retry_count > 0 else ''}" + f"[TIMING] Creating OpenAI stream at {elapsed:.1f}ms{retry_info}", + extra={ + "json_fields": { + **log_meta, + "elapsed_ms": elapsed, + "retry_count": retry_count, + } + }, ) # Build extra_body for OpenRouter tracing and PostHog analytics @@ -965,6 +1040,7 @@ async def _stream_chat_chunks( :128 ] # OpenRouter limit + api_call_start = time_module.perf_counter() stream = await client.chat.completions.create( model=model, messages=cast(list[ChatCompletionMessageParam], messages), @@ -974,6 +1050,11 @@ async def _stream_chat_chunks( stream_options=ChatCompletionStreamOptionsParam(include_usage=True), extra_body=extra_body, ) + api_init_time = (time_module.perf_counter() - api_call_start) * 1000 + logger.info( + f"[TIMING] OpenAI stream object returned in {api_init_time:.1f}ms", + extra={"json_fields": {**log_meta, "duration_ms": api_init_time}}, + ) # Variables to accumulate tool calls tool_calls: list[dict[str, Any]] = [] @@ -984,10 +1065,13 @@ async def _stream_chat_chunks( # Track if we've started the text block text_started = False + first_content_chunk = True + chunk_count = 0 # Process the stream chunk: ChatCompletionChunk async for chunk in stream: + chunk_count += 1 if chunk.usage: yield StreamUsage( promptTokens=chunk.usage.prompt_tokens, @@ -1010,6 +1094,23 @@ async def _stream_chat_chunks( if not text_started and text_block_id: yield StreamTextStart(id=text_block_id) text_started = True + # Log timing for first content chunk + if first_content_chunk: + first_content_chunk = False + ttfc = ( + time_module.perf_counter() - api_call_start + ) * 1000 + logger.info( + f"[TIMING] FIRST CONTENT CHUNK at {ttfc:.1f}ms " + f"(since API call), n_chunks={chunk_count}", + extra={ + "json_fields": { + **log_meta, + "time_to_first_chunk_ms": ttfc, + "n_chunks": chunk_count, + } + }, + ) # Stream the text delta text_response = StreamTextDelta( id=text_block_id or "", @@ -1066,7 +1167,21 @@ async def _stream_chat_chunks( toolName=tool_calls[idx]["function"]["name"], ) emitted_start_for_idx.add(idx) - logger.info(f"Stream complete. 
Finish reason: {finish_reason}") + stream_duration = time_module.perf_counter() - api_call_start + logger.info( + f"[TIMING] OpenAI stream COMPLETE, finish_reason={finish_reason}, " + f"duration={stream_duration:.2f}s, " + f"n_chunks={chunk_count}, n_tool_calls={len(tool_calls)}", + extra={ + "json_fields": { + **log_meta, + "stream_duration_ms": stream_duration * 1000, + "finish_reason": finish_reason, + "n_chunks": chunk_count, + "n_tool_calls": len(tool_calls), + } + }, + ) # Yield all accumulated tool calls after the stream is complete # This ensures all tool call arguments have been fully received @@ -1086,6 +1201,12 @@ async def _stream_chat_chunks( # Re-raise to trigger retry logic in the parent function raise + total_time = (time_module.perf_counter() - stream_chunks_start) * 1000 + logger.info( + f"[TIMING] _stream_chat_chunks COMPLETED in {total_time/1000:.1f}s; " + f"session={session.session_id}, user={session.user_id}", + extra={"json_fields": {**log_meta, "total_time_ms": total_time}}, + ) yield StreamFinish() return except Exception as e: diff --git a/autogpt_platform/backend/backend/api/features/chat/stream_registry.py b/autogpt_platform/backend/backend/api/features/chat/stream_registry.py index 88a5023e2b..509d20d9f4 100644 --- a/autogpt_platform/backend/backend/api/features/chat/stream_registry.py +++ b/autogpt_platform/backend/backend/api/features/chat/stream_registry.py @@ -104,6 +104,24 @@ async def create_task( Returns: The created ActiveTask instance (metadata only) """ + import time + + start_time = time.perf_counter() + + # Build log metadata for structured logging + log_meta = { + "component": "StreamRegistry", + "task_id": task_id, + "session_id": session_id, + } + if user_id: + log_meta["user_id"] = user_id + + logger.info( + f"[TIMING] create_task STARTED, task={task_id}, session={session_id}, user={user_id}", + extra={"json_fields": log_meta}, + ) + task = ActiveTask( task_id=task_id, session_id=session_id, @@ -114,10 +132,18 @@ async def create_task( ) # Store metadata in Redis + redis_start = time.perf_counter() redis = await get_redis_async() + redis_time = (time.perf_counter() - redis_start) * 1000 + logger.info( + f"[TIMING] get_redis_async took {redis_time:.1f}ms", + extra={"json_fields": {**log_meta, "duration_ms": redis_time}}, + ) + meta_key = _get_task_meta_key(task_id) op_key = _get_operation_mapping_key(operation_id) + hset_start = time.perf_counter() await redis.hset( # type: ignore[misc] meta_key, mapping={ @@ -131,12 +157,22 @@ async def create_task( "created_at": task.created_at.isoformat(), }, ) + hset_time = (time.perf_counter() - hset_start) * 1000 + logger.info( + f"[TIMING] redis.hset took {hset_time:.1f}ms", + extra={"json_fields": {**log_meta, "duration_ms": hset_time}}, + ) + await redis.expire(meta_key, config.stream_ttl) # Create operation_id -> task_id mapping for webhook lookups await redis.set(op_key, task_id, ex=config.stream_ttl) - logger.debug(f"Created task {task_id} for session {session_id}") + total_time = (time.perf_counter() - start_time) * 1000 + logger.info( + f"[TIMING] create_task COMPLETED in {total_time:.1f}ms; task={task_id}, session={session_id}", + extra={"json_fields": {**log_meta, "total_time_ms": total_time}}, + ) return task @@ -156,26 +192,60 @@ async def publish_chunk( Returns: The Redis Stream message ID """ + import time + + start_time = time.perf_counter() + chunk_type = type(chunk).__name__ chunk_json = chunk.model_dump_json() message_id = "0-0" + # Build log metadata + log_meta = { + "component": 
"StreamRegistry", + "task_id": task_id, + "chunk_type": chunk_type, + } + try: redis = await get_redis_async() stream_key = _get_task_stream_key(task_id) # Write to Redis Stream for persistence and real-time delivery + xadd_start = time.perf_counter() raw_id = await redis.xadd( stream_key, {"data": chunk_json}, maxlen=config.stream_max_length, ) + xadd_time = (time.perf_counter() - xadd_start) * 1000 message_id = raw_id if isinstance(raw_id, str) else raw_id.decode() # Set TTL on stream to match task metadata TTL await redis.expire(stream_key, config.stream_ttl) + + total_time = (time.perf_counter() - start_time) * 1000 + # Only log timing for significant chunks or slow operations + if ( + chunk_type + in ("StreamStart", "StreamFinish", "StreamTextStart", "StreamTextEnd") + or total_time > 50 + ): + logger.info( + f"[TIMING] publish_chunk {chunk_type} in {total_time:.1f}ms (xadd={xadd_time:.1f}ms)", + extra={ + "json_fields": { + **log_meta, + "total_time_ms": total_time, + "xadd_time_ms": xadd_time, + "message_id": message_id, + } + }, + ) except Exception as e: + elapsed = (time.perf_counter() - start_time) * 1000 logger.error( - f"Failed to publish chunk for task {task_id}: {e}", + f"[TIMING] Failed to publish chunk {chunk_type} after {elapsed:.1f}ms: {e}", + extra={"json_fields": {**log_meta, "elapsed_ms": elapsed, "error": str(e)}}, exc_info=True, ) @@ -200,24 +270,61 @@ async def subscribe_to_task( An asyncio Queue that will receive stream chunks, or None if task not found or user doesn't have access """ + import time + + start_time = time.perf_counter() + + # Build log metadata + log_meta = {"component": "StreamRegistry", "task_id": task_id} + if user_id: + log_meta["user_id"] = user_id + + logger.info( + f"[TIMING] subscribe_to_task STARTED, task={task_id}, user={user_id}, last_msg={last_message_id}", + extra={"json_fields": {**log_meta, "last_message_id": last_message_id}}, + ) + + redis_start = time.perf_counter() redis = await get_redis_async() meta_key = _get_task_meta_key(task_id) meta: dict[Any, Any] = await redis.hgetall(meta_key) # type: ignore[misc] + hgetall_time = (time.perf_counter() - redis_start) * 1000 + logger.info( + f"[TIMING] Redis hgetall took {hgetall_time:.1f}ms", + extra={"json_fields": {**log_meta, "duration_ms": hgetall_time}}, + ) if not meta: - logger.debug(f"Task {task_id} not found in Redis") + elapsed = (time.perf_counter() - start_time) * 1000 + logger.info( + f"[TIMING] Task not found in Redis after {elapsed:.1f}ms", + extra={ + "json_fields": { + **log_meta, + "elapsed_ms": elapsed, + "reason": "task_not_found", + } + }, + ) return None # Note: Redis client uses decode_responses=True, so keys are strings task_status = meta.get("status", "") task_user_id = meta.get("user_id", "") or None + log_meta["session_id"] = meta.get("session_id", "") # Validate ownership - if task has an owner, requester must match if task_user_id: if user_id != task_user_id: logger.warning( - f"User {user_id} denied access to task {task_id} " - f"owned by {task_user_id}" + f"[TIMING] Access denied: user {user_id} tried to access task owned by {task_user_id}", + extra={ + "json_fields": { + **log_meta, + "task_owner": task_user_id, + "reason": "access_denied", + } + }, ) return None @@ -225,7 +332,19 @@ async def subscribe_to_task( stream_key = _get_task_stream_key(task_id) # Step 1: Replay messages from Redis Stream + xread_start = time.perf_counter() messages = await redis.xread({stream_key: last_message_id}, block=0, count=1000) + xread_time = (time.perf_counter() - 
xread_start) * 1000 + logger.info( + f"[TIMING] Redis xread (replay) took {xread_time:.1f}ms, status={task_status}", + extra={ + "json_fields": { + **log_meta, + "duration_ms": xread_time, + "task_status": task_status, + } + }, + ) replayed_count = 0 replay_last_id = last_message_id @@ -244,19 +363,48 @@ async def subscribe_to_task( except Exception as e: logger.warning(f"Failed to replay message: {e}") - logger.debug(f"Task {task_id}: replayed {replayed_count} messages") + logger.info( + f"[TIMING] Replayed {replayed_count} messages, last_id={replay_last_id}", + extra={ + "json_fields": { + **log_meta, + "n_messages_replayed": replayed_count, + "replay_last_id": replay_last_id, + } + }, + ) # Step 2: If task is still running, start stream listener for live updates if task_status == "running": + logger.info( + "[TIMING] Task still running, starting _stream_listener", + extra={"json_fields": {**log_meta, "task_status": task_status}}, + ) listener_task = asyncio.create_task( - _stream_listener(task_id, subscriber_queue, replay_last_id) + _stream_listener(task_id, subscriber_queue, replay_last_id, log_meta) ) # Track listener task for cleanup on unsubscribe _listener_tasks[id(subscriber_queue)] = (task_id, listener_task) else: # Task is completed/failed - add finish marker + logger.info( + f"[TIMING] Task already {task_status}, adding StreamFinish", + extra={"json_fields": {**log_meta, "task_status": task_status}}, + ) await subscriber_queue.put(StreamFinish()) + total_time = (time.perf_counter() - start_time) * 1000 + logger.info( + f"[TIMING] subscribe_to_task COMPLETED in {total_time:.1f}ms; task={task_id}, " + f"n_messages_replayed={replayed_count}", + extra={ + "json_fields": { + **log_meta, + "total_time_ms": total_time, + "n_messages_replayed": replayed_count, + } + }, + ) return subscriber_queue @@ -264,6 +412,7 @@ async def _stream_listener( task_id: str, subscriber_queue: asyncio.Queue[StreamBaseResponse], last_replayed_id: str, + log_meta: dict | None = None, ) -> None: """Listen to Redis Stream for new messages using blocking XREAD. 
@@ -274,10 +423,27 @@ async def _stream_listener( task_id: Task ID to listen for subscriber_queue: Queue to deliver messages to last_replayed_id: Last message ID from replay (continue from here) + log_meta: Structured logging metadata """ + import time + + start_time = time.perf_counter() + + # Use provided log_meta or build minimal one + if log_meta is None: + log_meta = {"component": "StreamRegistry", "task_id": task_id} + + logger.info( + f"[TIMING] _stream_listener STARTED, task={task_id}, last_id={last_replayed_id}", + extra={"json_fields": {**log_meta, "last_replayed_id": last_replayed_id}}, + ) + queue_id = id(subscriber_queue) # Track the last successfully delivered message ID for recovery hints last_delivered_id = last_replayed_id + messages_delivered = 0 + first_message_time = None + xread_count = 0 try: redis = await get_redis_async() @@ -287,9 +453,39 @@ async def _stream_listener( while True: # Block for up to 30 seconds waiting for new messages # This allows periodic checking if task is still running + xread_start = time.perf_counter() + xread_count += 1 messages = await redis.xread( {stream_key: current_id}, block=30000, count=100 ) + xread_time = (time.perf_counter() - xread_start) * 1000 + + if messages: + msg_count = sum(len(msgs) for _, msgs in messages) + logger.info( + f"[TIMING] xread #{xread_count} returned {msg_count} messages in {xread_time:.1f}ms", + extra={ + "json_fields": { + **log_meta, + "xread_count": xread_count, + "n_messages": msg_count, + "duration_ms": xread_time, + } + }, + ) + elif xread_time > 1000: + # Only log timeouts (30s blocking) + logger.info( + f"[TIMING] xread #{xread_count} timeout after {xread_time:.1f}ms", + extra={ + "json_fields": { + **log_meta, + "xread_count": xread_count, + "duration_ms": xread_time, + "reason": "timeout", + } + }, + ) if not messages: # Timeout - check if task is still running @@ -326,10 +522,30 @@ async def _stream_listener( ) # Update last delivered ID on successful delivery last_delivered_id = current_id + messages_delivered += 1 + if first_message_time is None: + first_message_time = time.perf_counter() + elapsed = (first_message_time - start_time) * 1000 + logger.info( + f"[TIMING] FIRST live message at {elapsed:.1f}ms, type={type(chunk).__name__}", + extra={ + "json_fields": { + **log_meta, + "elapsed_ms": elapsed, + "chunk_type": type(chunk).__name__, + } + }, + ) except asyncio.TimeoutError: logger.warning( - f"Subscriber queue full for task {task_id}, " - f"message delivery timed out after {QUEUE_PUT_TIMEOUT}s" + f"[TIMING] Subscriber queue full, delivery timed out after {QUEUE_PUT_TIMEOUT}s", + extra={ + "json_fields": { + **log_meta, + "timeout_s": QUEUE_PUT_TIMEOUT, + "reason": "queue_full", + } + }, ) # Send overflow error with recovery info try: @@ -351,15 +567,44 @@ async def _stream_listener( # Stop listening on finish if isinstance(chunk, StreamFinish): + total_time = (time.perf_counter() - start_time) * 1000 + logger.info( + f"[TIMING] StreamFinish received in {total_time/1000:.1f}s; delivered={messages_delivered}", + extra={ + "json_fields": { + **log_meta, + "total_time_ms": total_time, + "messages_delivered": messages_delivered, + } + }, + ) return except Exception as e: - logger.warning(f"Error processing stream message: {e}") + logger.warning( + f"Error processing stream message: {e}", + extra={"json_fields": {**log_meta, "error": str(e)}}, + ) except asyncio.CancelledError: - logger.debug(f"Stream listener cancelled for task {task_id}") + elapsed = (time.perf_counter() - start_time) * 1000 + 
logger.info( + f"[TIMING] _stream_listener CANCELLED after {elapsed:.1f}ms, delivered={messages_delivered}", + extra={ + "json_fields": { + **log_meta, + "elapsed_ms": elapsed, + "messages_delivered": messages_delivered, + "reason": "cancelled", + } + }, + ) raise # Re-raise to propagate cancellation except Exception as e: - logger.error(f"Stream listener error for task {task_id}: {e}") + elapsed = (time.perf_counter() - start_time) * 1000 + logger.error( + f"[TIMING] _stream_listener ERROR after {elapsed:.1f}ms: {e}", + extra={"json_fields": {**log_meta, "elapsed_ms": elapsed, "error": str(e)}}, + ) # On error, send finish to unblock subscriber try: await asyncio.wait_for( @@ -368,10 +613,24 @@ async def _stream_listener( ) except (asyncio.TimeoutError, asyncio.QueueFull): logger.warning( - f"Could not deliver finish event for task {task_id} after error" + "Could not deliver finish event after error", + extra={"json_fields": log_meta}, ) finally: # Clean up listener task mapping on exit + total_time = (time.perf_counter() - start_time) * 1000 + logger.info( + f"[TIMING] _stream_listener FINISHED in {total_time/1000:.1f}s; task={task_id}, " + f"delivered={messages_delivered}, xread_count={xread_count}", + extra={ + "json_fields": { + **log_meta, + "total_time_ms": total_time, + "messages_delivered": messages_delivered, + "xread_count": xread_count, + } + }, + ) _listener_tasks.pop(queue_id, None)
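
The pattern repeated throughout this patch is: capture time.perf_counter() before a step, then emit a single "[TIMING]" log line whose extra={"json_fields": ...} carries the same numbers for structured ingestion. A minimal sketch of how that pattern could be factored into a reusable helper is below; the name log_timing and the keyword-style metadata are hypothetical and not part of the diff.

import logging
import time
from contextlib import contextmanager

logger = logging.getLogger(__name__)


@contextmanager
def log_timing(step: str, **log_meta):
    # Hypothetical helper, not in the patch: logs "[TIMING] <step> took Xms"
    # with the same json_fields structure used by the instrumented handlers above.
    start = time.perf_counter()
    try:
        yield
    finally:
        duration_ms = (time.perf_counter() - start) * 1000
        logger.info(
            f"[TIMING] {step} took {duration_ms:.1f}ms",
            extra={"json_fields": {**log_meta, "duration_ms": duration_ms}},
        )

Usage would mirror the inline instrumentation, e.g. wrapping the get_chat_session call:

    with log_timing("get_chat_session", component="ChatService", session_id=session_id):
        session = await get_chat_session(session_id, user_id)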
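
For reference, the subscribe/replay flow that stream_registry implements above reduces to a two-step XREAD on a Redis Stream: replay history from an explicit last ID, then block for live entries until a terminal marker arrives. A standalone sketch, assuming redis.asyncio with decode_responses=True and a hypothetical "finish" field value as the terminal marker:

from redis.asyncio import Redis


async def follow_stream(redis: Redis, stream_key: str, last_id: str = "0-0") -> None:
    # Step 1: replay whatever is already in the stream (non-blocking read).
    for _key, entries in await redis.xread({stream_key: last_id}, count=1000):
        for entry_id, fields in entries:
            last_id = entry_id
            print("replayed", entry_id, fields)

    # Step 2: follow live updates, blocking up to 30s per call, like _stream_listener.
    while True:
        messages = await redis.xread({stream_key: last_id}, block=30_000, count=100)
        if not messages:
            continue  # timeout; the real listener re-checks task status here
        for _key, entries in messages:
            for entry_id, fields in entries:
                last_id = entry_id
                print("live", entry_id, fields)
                if fields.get("data") == "finish":  # assumed terminal marker
                    return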