diff --git a/autogpt_platform/backend/backend/api/features/chat/response_model.py b/autogpt_platform/backend/backend/api/features/chat/response_model.py
index 07849d02da..1ae836f7d1 100644
--- a/autogpt_platform/backend/backend/api/features/chat/response_model.py
+++ b/autogpt_platform/backend/backend/api/features/chat/response_model.py
@@ -18,6 +18,10 @@ class ResponseType(str, Enum):
     START = "start"
     FINISH = "finish"

+    # Step lifecycle (one LLM API call within a message)
+    START_STEP = "start-step"
+    FINISH_STEP = "finish-step"
+
     # Text streaming
     TEXT_START = "text-start"
     TEXT_DELTA = "text-delta"
@@ -74,6 +78,26 @@ class StreamFinish(StreamBaseResponse):
     type: ResponseType = ResponseType.FINISH


+class StreamStartStep(StreamBaseResponse):
+    """Start of a step (one LLM API call within a message).
+
+    The AI SDK uses this to add a step-start boundary to message.parts,
+    enabling visual separation between multiple LLM calls in a single message.
+    """
+
+    type: ResponseType = ResponseType.START_STEP
+
+
+class StreamFinishStep(StreamBaseResponse):
+    """End of a step (one LLM API call within a message).
+
+    The AI SDK uses this to reset activeTextParts and activeReasoningParts,
+    so the next LLM call in a tool-call continuation starts with clean state.
+    """
+
+    type: ResponseType = ResponseType.FINISH_STEP
+
+
 # ========== Text Streaming ==========


diff --git a/autogpt_platform/backend/backend/api/features/chat/routes.py b/autogpt_platform/backend/backend/api/features/chat/routes.py
index 9b021c0601..cb855f2c62 100644
--- a/autogpt_platform/backend/backend/api/features/chat/routes.py
+++ b/autogpt_platform/backend/backend/api/features/chat/routes.py
@@ -17,7 +17,7 @@ from . import stream_registry
 from .completion_handler import process_operation_failure, process_operation_success
 from .config import ChatConfig
 from .model import ChatSession, create_chat_session, get_chat_session, get_user_sessions
-from .response_model import StreamFinish, StreamHeartbeat, StreamStart
+from .response_model import StreamFinish, StreamHeartbeat
 from .tools.models import (
     AgentDetailsResponse,
     AgentOutputResponse,
@@ -306,10 +306,6 @@ async def stream_chat_post(
     # Background task that runs the AI generation independently of SSE connection
     async def run_ai_generation():
         try:
-            # Emit a start event with task_id for reconnection
-            start_chunk = StreamStart(messageId=task_id, taskId=task_id)
-            await stream_registry.publish_chunk(task_id, start_chunk)
-
             async for chunk in chat_service.stream_chat_completion(
                 session_id,
                 request.message,
@@ -317,6 +313,7 @@ async def stream_chat_post(
                 user_id=user_id,
                 session=session,  # Pass pre-fetched session to avoid double-fetch
                 context=request.context,
+                _task_id=task_id,  # Pass task_id so service emits start with taskId for reconnection
             ):
                 # Write to Redis (subscribers will receive via XREAD)
                 await stream_registry.publish_chunk(task_id, chunk)
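With this change the route no longer publishes the `start` event itself; the service emits it exactly once, with `taskId` attached. As a minimal sketch of what that enables on the consumer side (the handler and state layout here are hypothetical, not part of this PR):

```python
import json


def handle_sse_event(raw: str, state: dict) -> None:
    """Fold one SSE data payload into client state (illustrative only)."""
    event = json.loads(raw)
    if event["type"] == "start":
        # The service now attaches taskId to the message-level start event,
        # so a client that loses the SSE connection can re-subscribe to the
        # same task's stream instead of restarting the generation.
        state["message_id"] = event["messageId"]
        state["task_id"] = event.get("taskId")
```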
diff --git a/autogpt_platform/backend/backend/api/features/chat/service.py b/autogpt_platform/backend/backend/api/features/chat/service.py
index 501721fc41..e4b679b286 100644
--- a/autogpt_platform/backend/backend/api/features/chat/service.py
+++ b/autogpt_platform/backend/backend/api/features/chat/service.py
@@ -52,8 +52,10 @@ from .response_model import (
     StreamBaseResponse,
     StreamError,
     StreamFinish,
+    StreamFinishStep,
     StreamHeartbeat,
     StreamStart,
+    StreamStartStep,
     StreamTextDelta,
     StreamTextEnd,
     StreamTextStart,
@@ -354,6 +356,7 @@ async def stream_chat_completion(
     _continuation_message_id: (
         str | None
     ) = None,  # Internal: reuse message ID for tool call continuations
+    _task_id: str | None = None,  # Internal: task ID for SSE reconnection support
 ) -> AsyncGenerator[StreamBaseResponse, None]:
     """Main entry point for streaming chat completions with database handling.

@@ -486,8 +489,13 @@ async def stream_chat_completion(
     message_id = _continuation_message_id or str(uuid_module.uuid4())
     text_block_id = str(uuid_module.uuid4())

+    # Only yield message start for the initial call, not for continuations.
+    # This is the single place where StreamStart is emitted (removed from routes.py).
     if not is_continuation:
-        yield StreamStart(messageId=message_id)
+        yield StreamStart(messageId=message_id, taskId=_task_id)
+
+    # Emit start-step before each LLM call (AI SDK uses this to add step boundaries)
+    yield StreamStartStep()

     try:
         async for chunk in _stream_chat_chunks(
@@ -589,6 +597,10 @@ async def stream_chat_completion(
                 )
                 yield chunk
             elif isinstance(chunk, StreamFinish):
+                if has_done_tool_call:
+                    # Tool calls happened — close the step but don't send message-level finish.
+                    # The continuation will open a new step, and finish will come at the end.
+                    yield StreamFinishStep()
                 if not has_done_tool_call:
                     # Emit text-end before finish if we received text but haven't closed it
                     if has_received_text and not text_streaming_ended:
@@ -620,6 +632,8 @@ async def stream_chat_completion(
                         has_saved_assistant_message = True

                    has_yielded_end = True
+                    # Emit finish-step before finish (resets AI SDK text/reasoning state)
+                    yield StreamFinishStep()
                     yield chunk
             elif isinstance(chunk, StreamError):
                 has_yielded_error = True
@@ -704,6 +718,7 @@ async def stream_chat_completion(
             error_response = StreamError(errorText=error_message)
             yield error_response
             if not has_yielded_end:
+                yield StreamFinishStep()
                 yield StreamFinish()
             return

@@ -719,6 +734,7 @@ async def stream_chat_completion(
                 session=session,
                 context=context,
                 _continuation_message_id=message_id,  # Reuse message ID since start was already sent
+                _task_id=_task_id,
             ):
                 yield chunk
             return  # Exit after retry to avoid double-saving in finally block
@@ -789,6 +805,7 @@ async def stream_chat_completion(
             context=context,
             tool_call_response=str(tool_response_messages),
             _continuation_message_id=message_id,  # Reuse message ID to avoid duplicates
+            _task_id=_task_id,
         ):
             yield chunk

@@ -1571,6 +1588,7 @@ async def _execute_long_running_tool_with_streaming(
                 task_id,
                 StreamError(errorText=str(e)),
             )
+            await stream_registry.publish_chunk(task_id, StreamFinishStep())
             await stream_registry.publish_chunk(task_id, StreamFinish())

         await _update_pending_operation(
@@ -1828,6 +1846,7 @@ async def _generate_llm_continuation_with_streaming(

     # Publish start event
     await stream_registry.publish_chunk(task_id, StreamStart(messageId=message_id))
+    await stream_registry.publish_chunk(task_id, StreamStartStep())
     await stream_registry.publish_chunk(task_id, StreamTextStart(id=text_block_id))

     # Stream the response
@@ -1851,6 +1870,7 @@ async def _generate_llm_continuation_with_streaming(

     # Publish end events
     await stream_registry.publish_chunk(task_id, StreamTextEnd(id=text_block_id))
+    await stream_registry.publish_chunk(task_id, StreamFinishStep())

     if assistant_content:
         # Reload session from DB to avoid race condition with user messages
@@ -1892,4 +1912,5 @@ async def _generate_llm_continuation_with_streaming(
             task_id,
             StreamError(errorText=f"Failed to generate response: {e}"),
         )
+        await stream_registry.publish_chunk(task_id, StreamFinishStep())
         await stream_registry.publish_chunk(task_id, StreamFinish())
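Taken together, the service changes produce a per-call step lifecycle inside each message. A sketch of the event order for a message that makes one tool call and then continues with a second LLM call, derived from the branches above (payloads abbreviated; tool-call events omitted):

```python
# "start"/"finish" bracket the whole message; each LLM call gets its own
# "start-step"/"finish-step" pair.
EXPECTED_EVENT_ORDER = [
    "start",        # emitted once by the service, with taskId; skipped for continuations
    "start-step",   # step 1: first LLM call
    "text-start", "text-delta", "text-end",
    "finish-step",  # a tool call happened: close the step, suppress message finish
    "start-step",   # step 2: continuation reusing the same messageId
    "text-start", "text-delta", "text-end",
    "finish-step",  # resets the AI SDK's active text/reasoning parts
    "finish",       # message-level finish, emitted exactly once
]
```

On the error paths the service follows the same discipline, closing the open step with a `finish-step` before the terminal `finish`.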
diff --git a/autogpt_platform/backend/backend/api/features/chat/stream_registry.py b/autogpt_platform/backend/backend/api/features/chat/stream_registry.py
index 88a5023e2b..739ccdfe4b 100644
--- a/autogpt_platform/backend/backend/api/features/chat/stream_registry.py
+++ b/autogpt_platform/backend/backend/api/features/chat/stream_registry.py
@@ -598,8 +598,10 @@ def _reconstruct_chunk(chunk_data: dict) -> StreamBaseResponse | None:
         ResponseType,
         StreamError,
         StreamFinish,
+        StreamFinishStep,
         StreamHeartbeat,
         StreamStart,
+        StreamStartStep,
         StreamTextDelta,
         StreamTextEnd,
         StreamTextStart,
@@ -613,6 +615,8 @@ def _reconstruct_chunk(chunk_data: dict) -> StreamBaseResponse | None:
     type_to_class: dict[str, type[StreamBaseResponse]] = {
         ResponseType.START.value: StreamStart,
         ResponseType.FINISH.value: StreamFinish,
+        ResponseType.START_STEP.value: StreamStartStep,
+        ResponseType.FINISH_STEP.value: StreamFinishStep,
         ResponseType.TEXT_START.value: StreamTextStart,
         ResponseType.TEXT_DELTA.value: StreamTextDelta,
         ResponseType.TEXT_END.value: StreamTextEnd,
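The registry mapping is what lets step events survive a replay: chunks are serialized into Redis and rebuilt by `_reconstruct_chunk` from their `type` discriminator. A rough sketch of the round trip, assuming these response models are Pydantic (the real storage format may differ):

```python
from backend.api.features.chat.response_model import (
    ResponseType,
    StreamStartStep,
)

# Serialize a chunk the way the registry would before writing it to the stream.
data = StreamStartStep().model_dump()

# _reconstruct_chunk dispatches on the "type" field; without the two new
# type_to_class entries, replayed streams would silently drop step boundaries,
# so reconnecting clients would lose the per-step separation in the UI.
assert data["type"] == ResponseType.START_STEP.value  # "start-step"
```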