From 6572dcdb4f9f80faa44bce985b80f478c3aba995 Mon Sep 17 00:00:00 2001 From: Swifty Date: Mon, 19 Jan 2026 13:58:10 +0100 Subject: [PATCH] improved langfuse tracing --- .../backend/api/features/chat/service.py | 521 ++++++++---------- .../api/features/chat/tools/__init__.py | 2 +- .../features/chat/tools/add_understanding.py | 3 + .../api/features/chat/tools/agent_output.py | 4 +- .../api/features/chat/tools/create_agent.py | 3 + .../api/features/chat/tools/edit_agent.py | 3 + .../api/features/chat/tools/find_agent.py | 3 + .../api/features/chat/tools/find_block.py | 2 + .../features/chat/tools/find_library_agent.py | 3 + .../api/features/chat/tools/get_doc_page.py | 3 + .../api/features/chat/tools/run_agent.py | 2 + .../api/features/chat/tools/run_block.py | 3 + .../api/features/chat/tools/search_docs.py | 2 + 13 files changed, 251 insertions(+), 303 deletions(-) diff --git a/autogpt_platform/backend/backend/api/features/chat/service.py b/autogpt_platform/backend/backend/api/features/chat/service.py index 7b41b040ba..dd300199ec 100644 --- a/autogpt_platform/backend/backend/api/features/chat/service.py +++ b/autogpt_platform/backend/backend/api/features/chat/service.py @@ -4,14 +4,9 @@ from collections.abc import AsyncGenerator from typing import Any import orjson -from langfuse import Langfuse -from openai import ( - APIConnectionError, - APIError, - APIStatusError, - AsyncOpenAI, - RateLimitError, -) +from langfuse import get_client, propagate_attributes +from langfuse.openai import openai # type: ignore +from openai import APIConnectionError, APIError, APIStatusError, RateLimitError from openai.types.chat import ChatCompletionChunk, ChatCompletionToolParam from backend.data.understanding import ( @@ -50,10 +45,10 @@ logger = logging.getLogger(__name__) config = ChatConfig() settings = Settings() -client = AsyncOpenAI(api_key=config.api_key, base_url=config.base_url) +client = openai.AsyncOpenAI(api_key=config.api_key, base_url=config.base_url) -# Langfuse client (lazy initialization) -_langfuse_client: Langfuse | None = None + +langfuse = get_client() class LangfuseNotConfiguredError(Exception): @@ -69,23 +64,6 @@ def _is_langfuse_configured() -> bool: ) -def _get_langfuse_client() -> Langfuse: - """Get or create the Langfuse client for prompt management and tracing.""" - global _langfuse_client - if _langfuse_client is None: - if not _is_langfuse_configured(): - raise LangfuseNotConfiguredError( - "Langfuse is not configured. The chat feature requires Langfuse for prompt management. " - "Please set the LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY environment variables." - ) - _langfuse_client = Langfuse( - public_key=settings.secrets.langfuse_public_key, - secret_key=settings.secrets.langfuse_secret_key, - host=settings.secrets.langfuse_host or "https://cloud.langfuse.com", - ) - return _langfuse_client - - def _get_environment() -> str: """Get the current environment name for Langfuse tagging.""" return settings.config.app_env.value @@ -101,7 +79,6 @@ def _get_langfuse_prompt() -> str: Exception: If Langfuse is unavailable or prompt fetch fails. """ try: - langfuse = _get_langfuse_client() # cache_ttl_seconds=0 disables SDK caching to always get the latest prompt prompt = langfuse.get_prompt(config.langfuse_prompt_name, cache_ttl_seconds=0) compiled = prompt.compile() @@ -139,8 +116,6 @@ async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]: Tuple of (compiled prompt string, Langfuse prompt object for tracing) """ - langfuse = _get_langfuse_client() - # cache_ttl_seconds=0 disables SDK caching to always get the latest prompt prompt = langfuse.get_prompt(config.langfuse_prompt_name, cache_ttl_seconds=0) @@ -158,7 +133,7 @@ async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]: context = "This is the first time you are meeting the user. Greet them and introduce them to the platform" compiled = prompt.compile(users_information=context) - return compiled, prompt + return compiled, understanding async def _generate_session_title(message: str) -> str | None: @@ -217,6 +192,7 @@ async def assign_user_to_session( async def stream_chat_completion( session_id: str, message: str | None = None, + tool_call_reponse: str | None = None, is_user_message: bool = True, user_id: str | None = None, retry_count: int = 0, @@ -256,11 +232,6 @@ async def stream_chat_completion( yield StreamFinish() return - # Langfuse observations will be created after session is loaded (need messages for input) - # Initialize to None so finally block can safely check and end them - trace = None - generation = None - # Only fetch from Redis if session not provided (initial call) if session is None: session = await get_chat_session(session_id, user_id) @@ -336,297 +307,246 @@ async def stream_chat_completion( asyncio.create_task(_update_title()) # Build system prompt with business understanding - system_prompt, langfuse_prompt = await _build_system_prompt(user_id) - - # Build input messages including system prompt for complete Langfuse logging - trace_input_messages = [{"role": "system", "content": system_prompt}] + [ - m.model_dump() for m in session.messages - ] + system_prompt, understanding = await _build_system_prompt(user_id) # Create Langfuse trace for this LLM call (each call gets its own trace, grouped by session_id) # Using v3 SDK: start_observation creates a root span, update_trace sets trace-level attributes - try: - langfuse = _get_langfuse_client() - env = _get_environment() - trace = langfuse.start_observation( - name="chat_completion", - input={"messages": trace_input_messages}, - metadata={ - "environment": env, - "model": config.model, - "message_count": len(session.messages), - "prompt_name": langfuse_prompt.name if langfuse_prompt else None, - "prompt_version": langfuse_prompt.version if langfuse_prompt else None, - }, - ) - # Set trace-level attributes (session_id, user_id, tags) - trace.update_trace( + input = message + if not message and tool_call_reponse: + input = tool_call_reponse + + langfuse = get_client() + with langfuse.start_as_current_observation( + as_type="span", + name="user-copilot-request", + input=input, + ): + with propagate_attributes( session_id=session_id, user_id=user_id, - tags=[env, "copilot"], - ) - except Exception as e: - logger.warning(f"Failed to create Langfuse trace: {e}") + tags=["copilot"], + metadata={ + "users_information": format_understanding_for_prompt(understanding)[ + :200 + ] # langfuse only accepts upto too chars + }, + ): - # Initialize variables that will be used in finally block (must be defined before try) - assistant_response = ChatMessage( - role="assistant", - content="", - ) - accumulated_tool_calls: list[dict[str, Any]] = [] - - # Wrap main logic in try/finally to ensure Langfuse observations are always ended - try: - has_yielded_end = False - has_yielded_error = False - has_done_tool_call = False - has_received_text = False - text_streaming_ended = False - tool_response_messages: list[ChatMessage] = [] - should_retry = False - - # Generate unique IDs for AI SDK protocol - import uuid as uuid_module - - message_id = str(uuid_module.uuid4()) - text_block_id = str(uuid_module.uuid4()) - - # Yield message start - yield StreamStart(messageId=message_id) - - # Create Langfuse generation for each LLM call, linked to the prompt - # Using v3 SDK: start_observation with as_type="generation" - generation = ( - trace.start_observation( - as_type="generation", - name="llm_call", - model=config.model, - input={"messages": trace_input_messages}, - prompt=langfuse_prompt, + # Initialize variables that will be used in finally block (must be defined before try) + assistant_response = ChatMessage( + role="assistant", + content="", ) - if trace - else None - ) + accumulated_tool_calls: list[dict[str, Any]] = [] - try: - async for chunk in _stream_chat_chunks( - session=session, - tools=tools, - system_prompt=system_prompt, - text_block_id=text_block_id, - ): + # Wrap main logic in try/finally to ensure Langfuse observations are always ended + has_yielded_end = False + has_yielded_error = False + has_done_tool_call = False + has_received_text = False + text_streaming_ended = False + tool_response_messages: list[ChatMessage] = [] + should_retry = False - if isinstance(chunk, StreamTextStart): - # Emit text-start before first text delta - if not has_received_text: + # Generate unique IDs for AI SDK protocol + import uuid as uuid_module + + message_id = str(uuid_module.uuid4()) + text_block_id = str(uuid_module.uuid4()) + + # Yield message start + yield StreamStart(messageId=message_id) + + try: + async for chunk in _stream_chat_chunks( + session=session, + tools=tools, + system_prompt=system_prompt, + text_block_id=text_block_id, + ): + + if isinstance(chunk, StreamTextStart): + # Emit text-start before first text delta + if not has_received_text: + yield chunk + elif isinstance(chunk, StreamTextDelta): + delta = chunk.delta or "" + assert assistant_response.content is not None + assistant_response.content += delta + has_received_text = True yield chunk - elif isinstance(chunk, StreamTextDelta): - delta = chunk.delta or "" - assert assistant_response.content is not None - assistant_response.content += delta - has_received_text = True - yield chunk - elif isinstance(chunk, StreamTextEnd): - # Emit text-end after text completes - if has_received_text and not text_streaming_ended: - text_streaming_ended = True - yield chunk - elif isinstance(chunk, StreamToolInputStart): - # Emit text-end before first tool call, but only if we've received text - if has_received_text and not text_streaming_ended: - yield StreamTextEnd(id=text_block_id) - text_streaming_ended = True - yield chunk - elif isinstance(chunk, StreamToolInputAvailable): - # Accumulate tool calls in OpenAI format - accumulated_tool_calls.append( - { - "id": chunk.toolCallId, - "type": "function", - "function": { - "name": chunk.toolName, - "arguments": orjson.dumps(chunk.input).decode("utf-8"), - }, - } - ) - elif isinstance(chunk, StreamToolOutputAvailable): - result_content = ( - chunk.output - if isinstance(chunk.output, str) - else orjson.dumps(chunk.output).decode("utf-8") - ) - tool_response_messages.append( - ChatMessage( - role="tool", - content=result_content, - tool_call_id=chunk.toolCallId, - ) - ) - has_done_tool_call = True - # Track if any tool execution failed - if not chunk.success: - logger.warning( - f"Tool {chunk.toolName} (ID: {chunk.toolCallId}) execution failed" - ) - yield chunk - elif isinstance(chunk, StreamFinish): - if not has_done_tool_call: - # Emit text-end before finish if we received text but haven't closed it + elif isinstance(chunk, StreamTextEnd): + # Emit text-end after text completes + if has_received_text and not text_streaming_ended: + text_streaming_ended = True + yield chunk + elif isinstance(chunk, StreamToolInputStart): + # Emit text-end before first tool call, but only if we've received text if has_received_text and not text_streaming_ended: yield StreamTextEnd(id=text_block_id) text_streaming_ended = True - has_yielded_end = True yield chunk - elif isinstance(chunk, StreamError): - has_yielded_error = True - elif isinstance(chunk, StreamUsage): - session.usage.append( - Usage( - prompt_tokens=chunk.promptTokens, - completion_tokens=chunk.completionTokens, - total_tokens=chunk.totalTokens, + elif isinstance(chunk, StreamToolInputAvailable): + # Accumulate tool calls in OpenAI format + accumulated_tool_calls.append( + { + "id": chunk.toolCallId, + "type": "function", + "function": { + "name": chunk.toolName, + "arguments": orjson.dumps(chunk.input).decode( + "utf-8" + ), + }, + } ) + elif isinstance(chunk, StreamToolOutputAvailable): + result_content = ( + chunk.output + if isinstance(chunk.output, str) + else orjson.dumps(chunk.output).decode("utf-8") + ) + tool_response_messages.append( + ChatMessage( + role="tool", + content=result_content, + tool_call_id=chunk.toolCallId, + ) + ) + has_done_tool_call = True + # Track if any tool execution failed + if not chunk.success: + logger.warning( + f"Tool {chunk.toolName} (ID: {chunk.toolCallId}) execution failed" + ) + yield chunk + elif isinstance(chunk, StreamFinish): + if not has_done_tool_call: + # Emit text-end before finish if we received text but haven't closed it + if has_received_text and not text_streaming_ended: + yield StreamTextEnd(id=text_block_id) + text_streaming_ended = True + has_yielded_end = True + yield chunk + elif isinstance(chunk, StreamError): + has_yielded_error = True + elif isinstance(chunk, StreamUsage): + session.usage.append( + Usage( + prompt_tokens=chunk.promptTokens, + completion_tokens=chunk.completionTokens, + total_tokens=chunk.totalTokens, + ) + ) + else: + logger.error( + f"Unknown chunk type: {type(chunk)}", exc_info=True + ) + except Exception as e: + logger.error(f"Error during stream: {e!s}", exc_info=True) + + # Check if this is a retryable error (JSON parsing, incomplete tool calls, etc.) + is_retryable = isinstance( + e, (orjson.JSONDecodeError, KeyError, TypeError) + ) + + if is_retryable and retry_count < config.max_retries: + logger.info( + f"Retryable error encountered. Attempt {retry_count + 1}/{config.max_retries}" ) + should_retry = True else: - logger.error(f"Unknown chunk type: {type(chunk)}", exc_info=True) - except Exception as e: - logger.error(f"Error during stream: {e!s}", exc_info=True) + # Non-retryable error or max retries exceeded + # Save any partial progress before reporting error + messages_to_save: list[ChatMessage] = [] - # Check if this is a retryable error (JSON parsing, incomplete tool calls, etc.) - is_retryable = isinstance(e, (orjson.JSONDecodeError, KeyError, TypeError)) + # Add assistant message if it has content or tool calls + if accumulated_tool_calls: + assistant_response.tool_calls = accumulated_tool_calls + if assistant_response.content or assistant_response.tool_calls: + messages_to_save.append(assistant_response) - if is_retryable and retry_count < config.max_retries: + # Add tool response messages after assistant message + messages_to_save.extend(tool_response_messages) + + session.messages.extend(messages_to_save) + await upsert_chat_session(session) + + if not has_yielded_error: + error_message = str(e) + if not is_retryable: + error_message = f"Non-retryable error: {error_message}" + elif retry_count >= config.max_retries: + error_message = f"Max retries ({config.max_retries}) exceeded: {error_message}" + + error_response = StreamError(errorText=error_message) + yield error_response + if not has_yielded_end: + yield StreamFinish() + return + + # Handle retry outside of exception handler to avoid nesting + if should_retry and retry_count < config.max_retries: logger.info( - f"Retryable error encountered. Attempt {retry_count + 1}/{config.max_retries}" + f"Retrying stream_chat_completion for session {session_id}, attempt {retry_count + 1}" ) - should_retry = True - else: - # Non-retryable error or max retries exceeded - # Save any partial progress before reporting error - messages_to_save: list[ChatMessage] = [] + async for chunk in stream_chat_completion( + session_id=session.session_id, + user_id=user_id, + retry_count=retry_count + 1, + session=session, + context=context, + ): + yield chunk + return # Exit after retry to avoid double-saving in finally block - # Add assistant message if it has content or tool calls - if accumulated_tool_calls: - assistant_response.tool_calls = accumulated_tool_calls - if assistant_response.content or assistant_response.tool_calls: - messages_to_save.append(assistant_response) - - # Add tool response messages after assistant message - messages_to_save.extend(tool_response_messages) - - session.messages.extend(messages_to_save) - await upsert_chat_session(session) - - if not has_yielded_error: - error_message = str(e) - if not is_retryable: - error_message = f"Non-retryable error: {error_message}" - elif retry_count >= config.max_retries: - error_message = f"Max retries ({config.max_retries}) exceeded: {error_message}" - - error_response = StreamError(errorText=error_message) - yield error_response - if not has_yielded_end: - yield StreamFinish() - return - - # Handle retry outside of exception handler to avoid nesting - if should_retry and retry_count < config.max_retries: + # Normal completion path - save session and handle tool call continuation logger.info( - f"Retrying stream_chat_completion for session {session_id}, attempt {retry_count + 1}" - ) - async for chunk in stream_chat_completion( - session_id=session.session_id, - user_id=user_id, - retry_count=retry_count + 1, - session=session, - context=context, - ): - yield chunk - return # Exit after retry to avoid double-saving in finally block - - # Normal completion path - save session and handle tool call continuation - logger.info( - f"Normal completion path: session={session.session_id}, " - f"current message_count={len(session.messages)}" - ) - - # Build the messages list in the correct order - messages_to_save: list[ChatMessage] = [] - - # Add assistant message with tool_calls if any - if accumulated_tool_calls: - assistant_response.tool_calls = accumulated_tool_calls - logger.info( - f"Added {len(accumulated_tool_calls)} tool calls to assistant message" - ) - if assistant_response.content or assistant_response.tool_calls: - messages_to_save.append(assistant_response) - logger.info( - f"Saving assistant message with content_len={len(assistant_response.content or '')}, tool_calls={len(assistant_response.tool_calls or [])}" + f"Normal completion path: session={session.session_id}, " + f"current message_count={len(session.messages)}" ) - # Add tool response messages after assistant message - messages_to_save.extend(tool_response_messages) - logger.info( - f"Saving {len(tool_response_messages)} tool response messages, " - f"total_to_save={len(messages_to_save)}" - ) + # Build the messages list in the correct order + messages_to_save: list[ChatMessage] = [] - session.messages.extend(messages_to_save) - logger.info( - f"Extended session messages, new message_count={len(session.messages)}" - ) - await upsert_chat_session(session) - - # If we did a tool call, stream the chat completion again to get the next response - if has_done_tool_call: - logger.info( - "Tool call executed, streaming chat completion again to get assistant response" - ) - async for chunk in stream_chat_completion( - session_id=session.session_id, - user_id=user_id, - session=session, # Pass session object to avoid Redis refetch - context=context, - ): - yield chunk - - finally: - # Always end Langfuse observations to prevent resource leaks - # Guard against None and catch errors to avoid masking original exceptions - if generation is not None: - try: - latest_usage = session.usage[-1] if session.usage else None - generation.update( - model=config.model, - output={ - "content": assistant_response.content, - "tool_calls": accumulated_tool_calls or None, - }, - usage_details=( - { - "input": latest_usage.prompt_tokens, - "output": latest_usage.completion_tokens, - "total": latest_usage.total_tokens, - } - if latest_usage - else None - ), + # Add assistant message with tool_calls if any + if accumulated_tool_calls: + assistant_response.tool_calls = accumulated_tool_calls + logger.info( + f"Added {len(accumulated_tool_calls)} tool calls to assistant message" + ) + if assistant_response.content or assistant_response.tool_calls: + messages_to_save.append(assistant_response) + logger.info( + f"Saving assistant message with content_len={len(assistant_response.content or '')}, tool_calls={len(assistant_response.tool_calls or [])}" ) - generation.end() - except Exception as e: - logger.warning(f"Failed to end Langfuse generation: {e}") - if trace is not None: - try: - if accumulated_tool_calls: - trace.update_trace(output={"tool_calls": accumulated_tool_calls}) - else: - trace.update_trace(output={"response": assistant_response.content}) - trace.end() - except Exception as e: - logger.warning(f"Failed to end Langfuse trace: {e}") + # Add tool response messages after assistant message + messages_to_save.extend(tool_response_messages) + logger.info( + f"Saving {len(tool_response_messages)} tool response messages, " + f"total_to_save={len(messages_to_save)}" + ) + + session.messages.extend(messages_to_save) + logger.info( + f"Extended session messages, new message_count={len(session.messages)}" + ) + await upsert_chat_session(session) + + # If we did a tool call, stream the chat completion again to get the next response + if has_done_tool_call: + logger.info( + "Tool call executed, streaming chat completion again to get assistant response" + ) + async for chunk in stream_chat_completion( + session_id=session.session_id, + user_id=user_id, + session=session, # Pass session object to avoid Redis refetch + context=context, + tool_call_reponse=str(tool_response_messages), + ): + yield chunk # Retry configuration for OpenAI API calls @@ -900,5 +820,4 @@ async def _yield_tool_call( session=session, ) - logger.info(f"Yielding Tool execution response: {tool_execution_response}") yield tool_execution_response diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/__init__.py b/autogpt_platform/backend/backend/api/features/chat/tools/__init__.py index fc0fdf9064..82ce5cfd6f 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/__init__.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/__init__.py @@ -30,7 +30,7 @@ TOOL_REGISTRY: dict[str, BaseTool] = { "find_library_agent": FindLibraryAgentTool(), "run_agent": RunAgentTool(), "run_block": RunBlockTool(), - "agent_output": AgentOutputTool(), + "view_agent_output": AgentOutputTool(), "search_docs": SearchDocsTool(), "get_doc_page": GetDocPageTool(), } diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/add_understanding.py b/autogpt_platform/backend/backend/api/features/chat/tools/add_understanding.py index fe3d5e8984..bd93f0e2a6 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/add_understanding.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/add_understanding.py @@ -3,6 +3,8 @@ import logging from typing import Any +from langfuse import observe + from backend.api.features.chat.model import ChatSession from backend.data.understanding import ( BusinessUnderstandingInput, @@ -59,6 +61,7 @@ and automations for the user's specific needs.""" """Requires authentication to store user-specific data.""" return True + @observe(as_type="tool", name="add_understanding") async def _execute( self, user_id: str | None, diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/agent_output.py b/autogpt_platform/backend/backend/api/features/chat/tools/agent_output.py index d81a11362b..00c6d8499b 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/agent_output.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/agent_output.py @@ -5,6 +5,7 @@ import re from datetime import datetime, timedelta, timezone from typing import Any +from langfuse import observe from pydantic import BaseModel, field_validator from backend.api.features.chat.model import ChatSession @@ -103,7 +104,7 @@ class AgentOutputTool(BaseTool): @property def name(self) -> str: - return "agent_output" + return "view_agent_output" @property def description(self) -> str: @@ -328,6 +329,7 @@ class AgentOutputTool(BaseTool): total_executions=len(available_executions) if available_executions else 1, ) + @observe(as_type="tool", name="view_agent_output") async def _execute( self, user_id: str | None, diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py b/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py index c8168f473d..26c980c6c5 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py @@ -3,6 +3,8 @@ import logging from typing import Any +from langfuse import observe + from backend.api.features.chat.model import ChatSession from .agent_generator import ( @@ -78,6 +80,7 @@ class CreateAgentTool(BaseTool): "required": ["description"], } + @observe(as_type="tool", name="create_agent") async def _execute( self, user_id: str | None, diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py b/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py index 5aaa166036..a50a89c5c7 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py @@ -3,6 +3,8 @@ import logging from typing import Any +from langfuse import observe + from backend.api.features.chat.model import ChatSession from .agent_generator import ( @@ -85,6 +87,7 @@ class EditAgentTool(BaseTool): "required": ["agent_id", "changes"], } + @observe(as_type="tool", name="edit_agent") async def _execute( self, user_id: str | None, diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/find_agent.py b/autogpt_platform/backend/backend/api/features/chat/tools/find_agent.py index 477522757d..f231ef4484 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/find_agent.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/find_agent.py @@ -2,6 +2,8 @@ from typing import Any +from langfuse import observe + from backend.api.features.chat.model import ChatSession from .agent_search import search_agents @@ -35,6 +37,7 @@ class FindAgentTool(BaseTool): "required": ["query"], } + @observe(as_type="tool", name="find_agent") async def _execute( self, user_id: str | None, session: ChatSession, **kwargs ) -> ToolResponseBase: diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/find_block.py b/autogpt_platform/backend/backend/api/features/chat/tools/find_block.py index a5e66f0a1c..fc20fdfc4a 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/find_block.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/find_block.py @@ -1,6 +1,7 @@ import logging from typing import Any +from langfuse import observe from prisma.enums import ContentType from backend.api.features.chat.model import ChatSession @@ -55,6 +56,7 @@ class FindBlockTool(BaseTool): def requires_auth(self) -> bool: return True + @observe(as_type="tool", name="find_block") async def _execute( self, user_id: str | None, diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/find_library_agent.py b/autogpt_platform/backend/backend/api/features/chat/tools/find_library_agent.py index 108fba75ae..d9b5edfa9b 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/find_library_agent.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/find_library_agent.py @@ -2,6 +2,8 @@ from typing import Any +from langfuse import observe + from backend.api.features.chat.model import ChatSession from .agent_search import search_agents @@ -41,6 +43,7 @@ class FindLibraryAgentTool(BaseTool): def requires_auth(self) -> bool: return True + @observe(as_type="tool", name="find_library_agent") async def _execute( self, user_id: str | None, session: ChatSession, **kwargs ) -> ToolResponseBase: diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/get_doc_page.py b/autogpt_platform/backend/backend/api/features/chat/tools/get_doc_page.py index 7040cd7db5..b2fdcccfcd 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/get_doc_page.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/get_doc_page.py @@ -4,6 +4,8 @@ import logging from pathlib import Path from typing import Any +from langfuse import observe + from backend.api.features.chat.model import ChatSession from backend.api.features.chat.tools.base import BaseTool from backend.api.features.chat.tools.models import ( @@ -71,6 +73,7 @@ class GetDocPageTool(BaseTool): url_path = path.rsplit(".", 1)[0] if "." in path else path return f"{DOCS_BASE_URL}/{url_path}" + @observe(as_type="tool", name="get_doc_page") async def _execute( self, user_id: str | None, diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/run_agent.py b/autogpt_platform/backend/backend/api/features/chat/tools/run_agent.py index 1f0a836543..4d93a3af30 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/run_agent.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/run_agent.py @@ -3,6 +3,7 @@ import logging from typing import Any +from langfuse import observe from pydantic import BaseModel, Field, field_validator from backend.api.features.chat.config import ChatConfig @@ -154,6 +155,7 @@ class RunAgentTool(BaseTool): """All operations require authentication.""" return True + @observe(as_type="tool", name="run_agent") async def _execute( self, user_id: str | None, diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/run_block.py b/autogpt_platform/backend/backend/api/features/chat/tools/run_block.py index 48cbcb5e5c..02f493df71 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/run_block.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/run_block.py @@ -4,6 +4,8 @@ import logging from collections import defaultdict from typing import Any +from langfuse import observe + from backend.api.features.chat.model import ChatSession from backend.data.block import get_block from backend.data.execution import ExecutionContext @@ -127,6 +129,7 @@ class RunBlockTool(BaseTool): return matched_credentials, missing_credentials + @observe(as_type="tool", name="run_block") async def _execute( self, user_id: str | None, diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/search_docs.py b/autogpt_platform/backend/backend/api/features/chat/tools/search_docs.py index edb0c0de1e..4903230b40 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/search_docs.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/search_docs.py @@ -3,6 +3,7 @@ import logging from typing import Any +from langfuse import observe from prisma.enums import ContentType from backend.api.features.chat.model import ChatSession @@ -87,6 +88,7 @@ class SearchDocsTool(BaseTool): url_path = path.rsplit(".", 1)[0] if "." in path else path return f"{DOCS_BASE_URL}/{url_path}" + @observe(as_type="tool", name="search_docs") async def _execute( self, user_id: str | None,