From bab436231a1e330e4213cca0993ad5dbcdc2efea Mon Sep 17 00:00:00 2001
From: Swifty
Date: Tue, 27 Jan 2026 13:07:42 +0100
Subject: [PATCH] refactor(backend): remove Langfuse tracing from chat system
 (#11829)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

We are removing Langfuse tracing from the chat/copilot system in favor of
OpenRouter's broadcast feature, which keeps our codebase simpler. Langfuse
prompt management is retained for fetching system prompts.
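Because `langfuse.openai` is a drop-in wrapper around the OpenAI SDK, the swap is mechanical. A minimal before/after sketch of the pattern (illustrative only; the real call sites are in `service.py`, and the model name below is a placeholder, not what the service actually uses):

```python
# Before: the Langfuse wrapper, which implicitly traced every request.
# from langfuse.openai import openai  # type: ignore

# After: the standard SDK; request-level observability now comes from
# OpenRouter's broadcast feature instead of in-process tracing.
import openai


async def stream_reply(messages: list[dict]) -> str:
    client = openai.AsyncOpenAI()  # credentials/base_url come from the environment
    stream = await client.chat.completions.create(
        model="gpt-4o",  # placeholder model name
        messages=messages,
        stream=True,
    )
    text = ""
    async for chunk in stream:
        if chunk.choices and chunk.choices[0].delta.content:
            text += chunk.choices[0].delta.content
    return text
```

In this patch only the import changes for the request path itself; the rest of the removal is the tracing-specific calls listed below.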
### Changes 🏗️

**Removed Langfuse tracing:**
- Removed `@observe` decorators from all 11 chat tool files
- Removed `langfuse.openai` wrapper (now using standard `openai` client)
- Removed `start_as_current_observation` and `propagate_attributes` context managers from `service.py`
- Removed `update_current_trace()`, `update_current_span()`, `span.update()` calls

**Retained Langfuse prompt management:**
- `langfuse.get_prompt()` for fetching system prompts
- `_is_langfuse_configured()` check for prompt availability
- Configuration for `langfuse_prompt_name`
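For reference, the retained path looks roughly like this (a sketch, not the exact code: the prompt name shown is illustrative and really comes from the `langfuse_prompt_name` setting, behind the `_is_langfuse_configured()` check):

```python
from langfuse import get_client

langfuse = get_client()


def fetch_system_prompt(prompt_name: str = "copilot-system-prompt") -> str | None:
    """Fetch a prompt from Langfuse prompt management; None if unavailable."""
    try:
        prompt = langfuse.get_prompt(prompt_name)  # prompt management, not tracing
        return prompt.compile()  # fills template variables, if any
    except Exception:
        return None  # caller falls back to a local default
```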
**Files modified:**
- `backend/api/features/chat/service.py`
- `backend/api/features/chat/tools/*.py` (11 tool files)

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Verified `poetry run format` passes
  - [x] Verified no `@observe` decorators remain in chat tools
  - [x] Verified Langfuse prompt fetching is still functional (code preserved)
---
 .../backend/api/features/chat/service.py      | 609 ++++++++----------
 .../features/chat/tools/add_understanding.py  |   3 -
 .../api/features/chat/tools/agent_output.py   |   2 -
 .../api/features/chat/tools/create_agent.py   |   3 -
 .../api/features/chat/tools/edit_agent.py     |   3 -
 .../api/features/chat/tools/find_agent.py     |   3 -
 .../api/features/chat/tools/find_block.py     |   2 -
 .../features/chat/tools/find_library_agent.py |   3 -
 .../api/features/chat/tools/get_doc_page.py   |   3 -
 .../api/features/chat/tools/run_agent.py      |   2 -
 .../api/features/chat/tools/run_block.py      |   3 -
 .../api/features/chat/tools/search_docs.py    |   2 -
 12 files changed, 282 insertions(+), 356 deletions(-)

diff --git a/autogpt_platform/backend/backend/api/features/chat/service.py b/autogpt_platform/backend/backend/api/features/chat/service.py
index e10343fff6..3976cd5f38 100644
--- a/autogpt_platform/backend/backend/api/features/chat/service.py
+++ b/autogpt_platform/backend/backend/api/features/chat/service.py
@@ -5,9 +5,9 @@ from asyncio import CancelledError
 from collections.abc import AsyncGenerator
 from typing import Any
 
+import openai
 import orjson
-from langfuse import get_client, propagate_attributes
-from langfuse.openai import openai  # type: ignore
+from langfuse import get_client
 from openai import (
     APIConnectionError,
     APIError,
@@ -299,347 +299,302 @@ async def stream_chat_completion(
     # Build system prompt with business understanding
     system_prompt, understanding = await _build_system_prompt(user_id)
 
-    # Create Langfuse trace for this LLM call (each call gets its own trace, grouped by session_id)
-    # Using v3 SDK: start_observation creates a root span, update_trace sets trace-level attributes
-    input = message
-    if not message and tool_call_response:
-        input = tool_call_response
-
-    langfuse = get_client()
-    with langfuse.start_as_current_observation(
-        as_type="span",
-        name="user-copilot-request",
-        input=input,
-    ) as span:
-        with propagate_attributes(
-            session_id=session_id,
-            user_id=user_id,
-            tags=["copilot"],
-            metadata={
-                "users_information": format_understanding_for_prompt(understanding)[
-                    :200
-                ]  # langfuse only accepts upto to 200 chars
-            },
-        ):
-            # Initialize variables that will be used in finally block (must be defined before try)
-            assistant_response = ChatMessage(
-                role="assistant",
-                content="",
-            )
-            accumulated_tool_calls: list[dict[str, Any]] = []
-            has_saved_assistant_message = False
-            has_appended_streaming_message = False
-            last_cache_time = 0.0
-            last_cache_content_len = 0
-
-            # Wrap main logic in try/finally to ensure Langfuse observations are always ended
-            has_yielded_end = False
-            has_yielded_error = False
-            has_done_tool_call = False
-            has_received_text = False
-            text_streaming_ended = False
-            tool_response_messages: list[ChatMessage] = []
-            should_retry = False
-
-            # Generate unique IDs for AI SDK protocol
-            import uuid as uuid_module
-
-            message_id = str(uuid_module.uuid4())
-            text_block_id = str(uuid_module.uuid4())
-
-            # Yield message start
-            yield StreamStart(messageId=message_id)
-
-            try:
-                async for chunk in _stream_chat_chunks(
-                    session=session,
-                    tools=tools,
-                    system_prompt=system_prompt,
-                    text_block_id=text_block_id,
-                ):
-
-                    if isinstance(chunk, StreamTextStart):
-                        # Emit text-start before first text delta
-                        if not has_received_text:
-                            yield chunk
-                    elif isinstance(chunk, StreamTextDelta):
-                        delta = chunk.delta or ""
-                        assert assistant_response.content is not None
-                        assistant_response.content += delta
-                        has_received_text = True
-                        if not has_appended_streaming_message:
-                            session.messages.append(assistant_response)
-                            has_appended_streaming_message = True
-                        current_time = time.monotonic()
-                        content_len = len(assistant_response.content)
-                        if (
-                            current_time - last_cache_time >= 1.0
-                            and content_len > last_cache_content_len
-                        ):
-                            try:
-                                await cache_chat_session(session)
-                            except Exception as e:
-                                logger.warning(
-                                    f"Failed to cache partial session {session.session_id}: {e}"
-                                )
-                            last_cache_time = current_time
-                            last_cache_content_len = content_len
-                        yield chunk
-                    elif isinstance(chunk, StreamTextEnd):
-                        # Emit text-end after text completes
-                        if has_received_text and not text_streaming_ended:
-                            text_streaming_ended = True
-                            if assistant_response.content:
-                                logger.warn(
-                                    f"StreamTextEnd: Attempting to set output {assistant_response.content}"
-                                )
-                                span.update_trace(output=assistant_response.content)
-                                span.update(output=assistant_response.content)
-                        yield chunk
-                    elif isinstance(chunk, StreamToolInputStart):
-                        # Emit text-end before first tool call, but only if we've received text
-                        if has_received_text and not text_streaming_ended:
-                            yield StreamTextEnd(id=text_block_id)
-                            text_streaming_ended = True
-                        yield chunk
-                    elif isinstance(chunk, StreamToolInputAvailable):
-                        # Accumulate tool calls in OpenAI format
-                        accumulated_tool_calls.append(
-                            {
-                                "id": chunk.toolCallId,
-                                "type": "function",
-                                "function": {
-                                    "name": chunk.toolName,
-                                    "arguments": orjson.dumps(chunk.input).decode(
-                                        "utf-8"
-                                    ),
-                                },
-                            }
-                        )
-                    elif isinstance(chunk, StreamToolOutputAvailable):
-                        result_content = (
-                            chunk.output
-                            if isinstance(chunk.output, str)
-                            else orjson.dumps(chunk.output).decode("utf-8")
-                        )
-                        tool_response_messages.append(
-                            ChatMessage(
-                                role="tool",
-                                content=result_content,
-                                tool_call_id=chunk.toolCallId,
-                            )
-                        )
-                        has_done_tool_call = True
-                        # Track if any tool execution failed
-                        if not chunk.success:
-                            logger.warning(
-                                f"Tool {chunk.toolName} (ID: {chunk.toolCallId}) execution failed"
-                            )
-                        yield chunk
-                    elif isinstance(chunk, StreamFinish):
-                        if not has_done_tool_call:
-                            # Emit text-end before finish if we received text but haven't closed it
-                            if has_received_text and not text_streaming_ended:
-                                yield StreamTextEnd(id=text_block_id)
-                                text_streaming_ended = True
-
-                            # Save assistant message before yielding finish to ensure it's persisted
-                            # even if client disconnects immediately after receiving StreamFinish
-                            if not has_saved_assistant_message:
-                                messages_to_save_early: list[ChatMessage] = []
-                                if accumulated_tool_calls:
-                                    assistant_response.tool_calls = (
-                                        accumulated_tool_calls
-                                    )
-                                if not has_appended_streaming_message and (
-                                    assistant_response.content
-                                    or assistant_response.tool_calls
-                                ):
-                                    messages_to_save_early.append(assistant_response)
-                                messages_to_save_early.extend(tool_response_messages)
-
-                                if messages_to_save_early:
-                                    session.messages.extend(messages_to_save_early)
-                                    logger.info(
-                                        f"Saving assistant message before StreamFinish: "
-                                        f"content_len={len(assistant_response.content or '')}, "
-                                        f"tool_calls={len(assistant_response.tool_calls or [])}, "
-                                        f"tool_responses={len(tool_response_messages)}"
-                                    )
-                                if (
-                                    messages_to_save_early
-                                    or has_appended_streaming_message
-                                ):
-                                    await upsert_chat_session(session)
-                                has_saved_assistant_message = True
-
-                        has_yielded_end = True
-                        yield chunk
-                    elif isinstance(chunk, StreamError):
-                        has_yielded_error = True
-                        yield chunk
-                    elif isinstance(chunk, StreamUsage):
-                        session.usage.append(
-                            Usage(
-                                prompt_tokens=chunk.promptTokens,
-                                completion_tokens=chunk.completionTokens,
-                                total_tokens=chunk.totalTokens,
-                            )
-                        )
-                    else:
-                        logger.error(
-                            f"Unknown chunk type: {type(chunk)}", exc_info=True
-                        )
-                if assistant_response.content:
-                    langfuse.update_current_trace(output=assistant_response.content)
-                    langfuse.update_current_span(output=assistant_response.content)
-                elif tool_response_messages:
-                    langfuse.update_current_trace(output=str(tool_response_messages))
-                    langfuse.update_current_span(output=str(tool_response_messages))
-
-            except CancelledError:
-                if not has_saved_assistant_message:
-                    if accumulated_tool_calls:
-                        assistant_response.tool_calls = accumulated_tool_calls
-                    if assistant_response.content:
-                        assistant_response.content = (
-                            f"{assistant_response.content}\n\n[interrupted]"
-                        )
-                    else:
-                        assistant_response.content = "[interrupted]"
-                    if not has_appended_streaming_message:
-                        session.messages.append(assistant_response)
-                    if tool_response_messages:
-                        session.messages.extend(tool_response_messages)
-                    try:
-                        await upsert_chat_session(session)
-                    except Exception as e:
-                        logger.warning(
-                            f"Failed to save interrupted session {session.session_id}: {e}"
-                        )
-                raise
-            except Exception as e:
-                logger.error(f"Error during stream: {e!s}", exc_info=True)
-
-                # Check if this is a retryable error (JSON parsing, incomplete tool calls, etc.)
-                is_retryable = isinstance(
-                    e, (orjson.JSONDecodeError, KeyError, TypeError)
-                )
-
-                if is_retryable and retry_count < config.max_retries:
-                    logger.info(
-                        f"Retryable error encountered. Attempt {retry_count + 1}/{config.max_retries}"
-                    )
-                    should_retry = True
-                else:
-                    # Non-retryable error or max retries exceeded
-                    # Save any partial progress before reporting error
-                    messages_to_save: list[ChatMessage] = []
-
-                    # Add assistant message if it has content or tool calls
-                    if accumulated_tool_calls:
-                        assistant_response.tool_calls = accumulated_tool_calls
-                    if not has_appended_streaming_message and (
-                        assistant_response.content or assistant_response.tool_calls
-                    ):
-                        messages_to_save.append(assistant_response)
-
-                    # Add tool response messages after assistant message
-                    messages_to_save.extend(tool_response_messages)
-
-                    if not has_saved_assistant_message:
-                        if messages_to_save:
-                            session.messages.extend(messages_to_save)
-                        if messages_to_save or has_appended_streaming_message:
-                            await upsert_chat_session(session)
-
-                    if not has_yielded_error:
-                        error_message = str(e)
-                        if not is_retryable:
-                            error_message = f"Non-retryable error: {error_message}"
-                        elif retry_count >= config.max_retries:
-                            error_message = f"Max retries ({config.max_retries}) exceeded: {error_message}"
-
-                        error_response = StreamError(errorText=error_message)
-                        yield error_response
-                    if not has_yielded_end:
-                        yield StreamFinish()
-                    return
-
-            # Handle retry outside of exception handler to avoid nesting
-            if should_retry and retry_count < config.max_retries:
-                logger.info(
-                    f"Retrying stream_chat_completion for session {session_id}, attempt {retry_count + 1}"
-                )
-                async for chunk in stream_chat_completion(
-                    session_id=session.session_id,
-                    user_id=user_id,
-                    retry_count=retry_count + 1,
-                    session=session,
-                    context=context,
-                ):
-                    yield chunk
-                return  # Exit after retry to avoid double-saving in finally block
-
-            # Normal completion path - save session and handle tool call continuation
-            # Only save if we haven't already saved when StreamFinish was received
-            if not has_saved_assistant_message:
-                logger.info(
-                    f"Normal completion path: session={session.session_id}, "
-                    f"current message_count={len(session.messages)}"
-                )
-
-                # Build the messages list in the correct order
-                messages_to_save: list[ChatMessage] = []
-
-                # Add assistant message with tool_calls if any
-                if accumulated_tool_calls:
-                    assistant_response.tool_calls = accumulated_tool_calls
-                    logger.info(
-                        f"Added {len(accumulated_tool_calls)} tool calls to assistant message"
-                    )
-                if not has_appended_streaming_message and (
-                    assistant_response.content or assistant_response.tool_calls
-                ):
-                    messages_to_save.append(assistant_response)
-                    logger.info(
-                        f"Saving assistant message with content_len={len(assistant_response.content or '')}, tool_calls={len(assistant_response.tool_calls or [])}"
-                    )
-
-                # Add tool response messages after assistant message
-                messages_to_save.extend(tool_response_messages)
-                logger.info(
-                    f"Saving {len(tool_response_messages)} tool response messages, "
-                    f"total_to_save={len(messages_to_save)}"
-                )
-
-                if messages_to_save:
-                    session.messages.extend(messages_to_save)
-                    logger.info(
-                        f"Extended session messages, new message_count={len(session.messages)}"
-                    )
-                if messages_to_save or has_appended_streaming_message:
-                    await upsert_chat_session(session)
-            else:
-                logger.info(
-                    "Assistant message already saved when StreamFinish was received, "
-                    "skipping duplicate save"
-                )
-
-            # If we did a tool call, stream the chat completion again to get the next response
-            if has_done_tool_call:
-                logger.info(
-                    "Tool call executed, streaming chat completion again to get assistant response"
-                )
-                async for chunk in stream_chat_completion(
-                    session_id=session.session_id,
-                    user_id=user_id,
-                    session=session,  # Pass session object to avoid Redis refetch
-                    context=context,
-                    tool_call_response=str(tool_response_messages),
-                ):
-                    yield chunk
+    # Initialize variables for streaming
+    assistant_response = ChatMessage(
+        role="assistant",
+        content="",
+    )
+    accumulated_tool_calls: list[dict[str, Any]] = []
+    has_saved_assistant_message = False
+    has_appended_streaming_message = False
+    last_cache_time = 0.0
+    last_cache_content_len = 0
+
+    has_yielded_end = False
+    has_yielded_error = False
+    has_done_tool_call = False
+    has_received_text = False
+    text_streaming_ended = False
+    tool_response_messages: list[ChatMessage] = []
+    should_retry = False
+
+    # Generate unique IDs for AI SDK protocol
+    import uuid as uuid_module
+
+    message_id = str(uuid_module.uuid4())
+    text_block_id = str(uuid_module.uuid4())
+
+    # Yield message start
+    yield StreamStart(messageId=message_id)
+
+    try:
+        async for chunk in _stream_chat_chunks(
+            session=session,
+            tools=tools,
+            system_prompt=system_prompt,
+            text_block_id=text_block_id,
+        ):
+            if isinstance(chunk, StreamTextStart):
+                # Emit text-start before first text delta
+                if not has_received_text:
+                    yield chunk
+            elif isinstance(chunk, StreamTextDelta):
+                delta = chunk.delta or ""
+                assert assistant_response.content is not None
+                assistant_response.content += delta
+                has_received_text = True
+                if not has_appended_streaming_message:
+                    session.messages.append(assistant_response)
+                    has_appended_streaming_message = True
+                current_time = time.monotonic()
+                content_len = len(assistant_response.content)
+                if (
+                    current_time - last_cache_time >= 1.0
+                    and content_len > last_cache_content_len
+                ):
+                    try:
+                        await cache_chat_session(session)
+                    except Exception as e:
+                        logger.warning(
+                            f"Failed to cache partial session {session.session_id}: {e}"
+                        )
+                    last_cache_time = current_time
+                    last_cache_content_len = content_len
+                yield chunk
+            elif isinstance(chunk, StreamTextEnd):
+                # Emit text-end after text completes
+                if has_received_text and not text_streaming_ended:
+                    text_streaming_ended = True
+                yield chunk
+            elif isinstance(chunk, StreamToolInputStart):
+                # Emit text-end before first tool call, but only if we've received text
+                if has_received_text and not text_streaming_ended:
+                    yield StreamTextEnd(id=text_block_id)
+                    text_streaming_ended = True
+                yield chunk
+            elif isinstance(chunk, StreamToolInputAvailable):
+                # Accumulate tool calls in OpenAI format
+                accumulated_tool_calls.append(
+                    {
+                        "id": chunk.toolCallId,
+                        "type": "function",
+                        "function": {
+                            "name": chunk.toolName,
+                            "arguments": orjson.dumps(chunk.input).decode("utf-8"),
+                        },
+                    }
+                )
+                yield chunk
+            elif isinstance(chunk, StreamToolOutputAvailable):
+                result_content = (
+                    chunk.output
+                    if isinstance(chunk.output, str)
+                    else orjson.dumps(chunk.output).decode("utf-8")
+                )
+                tool_response_messages.append(
+                    ChatMessage(
+                        role="tool",
+                        content=result_content,
+                        tool_call_id=chunk.toolCallId,
+                    )
+                )
+                has_done_tool_call = True
+                # Track if any tool execution failed
+                if not chunk.success:
+                    logger.warning(
+                        f"Tool {chunk.toolName} (ID: {chunk.toolCallId}) execution failed"
+                    )
+                yield chunk
+            elif isinstance(chunk, StreamFinish):
+                if not has_done_tool_call:
+                    # Emit text-end before finish if we received text but haven't closed it
+                    if has_received_text and not text_streaming_ended:
+                        yield StreamTextEnd(id=text_block_id)
+                        text_streaming_ended = True
+
+                    # Save assistant message before yielding finish to ensure it's persisted
+                    # even if client disconnects immediately after receiving StreamFinish
+                    if not has_saved_assistant_message:
+                        messages_to_save_early: list[ChatMessage] = []
+                        if accumulated_tool_calls:
+                            assistant_response.tool_calls = accumulated_tool_calls
+                        if not has_appended_streaming_message and (
+                            assistant_response.content or assistant_response.tool_calls
+                        ):
+                            messages_to_save_early.append(assistant_response)
+                        messages_to_save_early.extend(tool_response_messages)
+
+                        if messages_to_save_early:
+                            session.messages.extend(messages_to_save_early)
+                            logger.info(
+                                f"Saving assistant message before StreamFinish: "
+                                f"content_len={len(assistant_response.content or '')}, "
+                                f"tool_calls={len(assistant_response.tool_calls or [])}, "
+                                f"tool_responses={len(tool_response_messages)}"
+                            )
+                        if messages_to_save_early or has_appended_streaming_message:
+                            await upsert_chat_session(session)
+                        has_saved_assistant_message = True
+
+                has_yielded_end = True
+                yield chunk
+            elif isinstance(chunk, StreamError):
+                has_yielded_error = True
+                yield chunk
+            elif isinstance(chunk, StreamUsage):
+                session.usage.append(
+                    Usage(
+                        prompt_tokens=chunk.promptTokens,
+                        completion_tokens=chunk.completionTokens,
+                        total_tokens=chunk.totalTokens,
+                    )
+                )
+            else:
+                logger.error(f"Unknown chunk type: {type(chunk)}", exc_info=True)
+
+    except CancelledError:
+        if not has_saved_assistant_message:
+            if accumulated_tool_calls:
+                assistant_response.tool_calls = accumulated_tool_calls
+            if assistant_response.content:
+                assistant_response.content = (
+                    f"{assistant_response.content}\n\n[interrupted]"
+                )
+            else:
+                assistant_response.content = "[interrupted]"
+            if not has_appended_streaming_message:
+                session.messages.append(assistant_response)
+            if tool_response_messages:
+                session.messages.extend(tool_response_messages)
+            try:
+                await upsert_chat_session(session)
+            except Exception as e:
+                logger.warning(
+                    f"Failed to save interrupted session {session.session_id}: {e}"
+                )
+        raise
+    except Exception as e:
+        logger.error(f"Error during stream: {e!s}", exc_info=True)
+
+        # Check if this is a retryable error (JSON parsing, incomplete tool calls, etc.)
+        is_retryable = isinstance(e, (orjson.JSONDecodeError, KeyError, TypeError))
+
+        if is_retryable and retry_count < config.max_retries:
+            logger.info(
+                f"Retryable error encountered. Attempt {retry_count + 1}/{config.max_retries}"
+            )
+            should_retry = True
+        else:
+            # Non-retryable error or max retries exceeded
+            # Save any partial progress before reporting error
+            messages_to_save: list[ChatMessage] = []
+
+            # Add assistant message if it has content or tool calls
+            if accumulated_tool_calls:
+                assistant_response.tool_calls = accumulated_tool_calls
+            if not has_appended_streaming_message and (
+                assistant_response.content or assistant_response.tool_calls
+            ):
+                messages_to_save.append(assistant_response)
+
+            # Add tool response messages after assistant message
+            messages_to_save.extend(tool_response_messages)
+
+            if not has_saved_assistant_message:
+                if messages_to_save:
+                    session.messages.extend(messages_to_save)
+                if messages_to_save or has_appended_streaming_message:
+                    await upsert_chat_session(session)
+
+            if not has_yielded_error:
+                error_message = str(e)
+                if not is_retryable:
+                    error_message = f"Non-retryable error: {error_message}"
+                elif retry_count >= config.max_retries:
+                    error_message = (
+                        f"Max retries ({config.max_retries}) exceeded: {error_message}"
+                    )
+
+                error_response = StreamError(errorText=error_message)
+                yield error_response
+            if not has_yielded_end:
+                yield StreamFinish()
+            return
+
+    # Handle retry outside of exception handler to avoid nesting
+    if should_retry and retry_count < config.max_retries:
+        logger.info(
+            f"Retrying stream_chat_completion for session {session_id}, attempt {retry_count + 1}"
+        )
+        async for chunk in stream_chat_completion(
+            session_id=session.session_id,
+            user_id=user_id,
+            retry_count=retry_count + 1,
+            session=session,
+            context=context,
+        ):
+            yield chunk
+        return  # Exit after retry to avoid double-saving in finally block
+
+    # Normal completion path - save session and handle tool call continuation
+    # Only save if we haven't already saved when StreamFinish was received
+    if not has_saved_assistant_message:
+        logger.info(
+            f"Normal completion path: session={session.session_id}, "
+            f"current message_count={len(session.messages)}"
+        )
+
+        # Build the messages list in the correct order
+        messages_to_save: list[ChatMessage] = []
+
+        # Add assistant message with tool_calls if any
+        if accumulated_tool_calls:
+            assistant_response.tool_calls = accumulated_tool_calls
+            logger.info(
+                f"Added {len(accumulated_tool_calls)} tool calls to assistant message"
+            )
+        if not has_appended_streaming_message and (
+            assistant_response.content or assistant_response.tool_calls
+        ):
+            messages_to_save.append(assistant_response)
+            logger.info(
+                f"Saving assistant message with content_len={len(assistant_response.content or '')}, tool_calls={len(assistant_response.tool_calls or [])}"
+            )
+
+        # Add tool response messages after assistant message
+        messages_to_save.extend(tool_response_messages)
+        logger.info(
+            f"Saving {len(tool_response_messages)} tool response messages, "
+            f"total_to_save={len(messages_to_save)}"
+        )
+
+        if messages_to_save:
+            session.messages.extend(messages_to_save)
+            logger.info(
+                f"Extended session messages, new message_count={len(session.messages)}"
+            )
+        if messages_to_save or has_appended_streaming_message:
+            await upsert_chat_session(session)
+    else:
+        logger.info(
+            "Assistant message already saved when StreamFinish was received, "
+            "skipping duplicate save"
+        )
+
+    # If we did a tool call, stream the chat completion again to get the next response
+    if has_done_tool_call:
+        logger.info(
+            "Tool call executed, streaming chat completion again to get assistant response"
+        )
+        async for chunk in stream_chat_completion(
+            session_id=session.session_id,
+            user_id=user_id,
+            session=session,  # Pass session object to avoid Redis refetch
+            context=context,
+            tool_call_response=str(tool_response_messages),
+        ):
+            yield chunk
 
 
 # Retry configuration for OpenAI API calls
diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/add_understanding.py b/autogpt_platform/backend/backend/api/features/chat/tools/add_understanding.py
index bd93f0e2a6..fe3d5e8984 100644
--- a/autogpt_platform/backend/backend/api/features/chat/tools/add_understanding.py
+++ b/autogpt_platform/backend/backend/api/features/chat/tools/add_understanding.py
@@ -3,8 +3,6 @@ import logging
 from typing import Any
 
-from langfuse import observe
-
 from backend.api.features.chat.model import ChatSession
 from backend.data.understanding import (
     BusinessUnderstandingInput,
@@ -61,7 +59,6 @@ and automations for the user's specific needs."""
         """Requires authentication to store user-specific data."""
         return True
 
-    @observe(as_type="tool", name="add_understanding")
     async def _execute(
         self,
         user_id: str | None,
diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/agent_output.py b/autogpt_platform/backend/backend/api/features/chat/tools/agent_output.py
index 00c6d8499b..457e4a4f9b 100644
--- a/autogpt_platform/backend/backend/api/features/chat/tools/agent_output.py
+++ b/autogpt_platform/backend/backend/api/features/chat/tools/agent_output.py
@@ -5,7 +5,6 @@ import re
 from datetime import datetime, timedelta, timezone
 from typing import Any
 
-from langfuse import observe
 from pydantic import BaseModel, field_validator
 
 from backend.api.features.chat.model import ChatSession
@@ -329,7 +328,6 @@ class AgentOutputTool(BaseTool):
             total_executions=len(available_executions) if available_executions else 1,
         )
 
-    @observe(as_type="tool", name="view_agent_output")
     async def _execute(
         self,
         user_id: str | None,
diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py b/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py
index 5a3c44fb94..6469cc4442 100644
--- a/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py
+++ b/autogpt_platform/backend/backend/api/features/chat/tools/create_agent.py
@@ -3,8 +3,6 @@ import logging
 from typing import Any
 
-from langfuse import observe
-
 from backend.api.features.chat.model import ChatSession
 
 from .agent_generator import (
@@ -75,7 +73,6 @@ class CreateAgentTool(BaseTool):
         "required": ["description"],
     }
 
-    @observe(as_type="tool", name="create_agent")
     async def _execute(
         self,
         user_id: str | None,
diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py b/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py
index 777c39a254..df1e4a9c3e 100644
--- a/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py
+++ b/autogpt_platform/backend/backend/api/features/chat/tools/edit_agent.py
@@ -3,8 +3,6 @@ import logging
 from typing import Any
 
-from langfuse import observe
-
 from backend.api.features.chat.model import ChatSession
 
 from .agent_generator import (
@@ -81,7 +79,6 @@ class EditAgentTool(BaseTool):
         "required": ["agent_id", "changes"],
     }
 
-    @observe(as_type="tool", name="edit_agent")
     async def _execute(
         self,
         user_id: str | None,
diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/find_agent.py b/autogpt_platform/backend/backend/api/features/chat/tools/find_agent.py
index f231ef4484..477522757d 100644
--- a/autogpt_platform/backend/backend/api/features/chat/tools/find_agent.py
+++ b/autogpt_platform/backend/backend/api/features/chat/tools/find_agent.py
@@ -2,8 +2,6 @@
 
 from typing import Any
 
-from langfuse import observe
-
 from backend.api.features.chat.model import ChatSession
 
 from .agent_search import search_agents
@@ -37,7 +35,6 @@ class FindAgentTool(BaseTool):
         "required": ["query"],
     }
 
-    @observe(as_type="tool", name="find_agent")
     async def _execute(
         self, user_id: str | None, session: ChatSession, **kwargs
     ) -> ToolResponseBase:
diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/find_block.py b/autogpt_platform/backend/backend/api/features/chat/tools/find_block.py
index fc20fdfc4a..a5e66f0a1c 100644
--- a/autogpt_platform/backend/backend/api/features/chat/tools/find_block.py
+++ b/autogpt_platform/backend/backend/api/features/chat/tools/find_block.py
@@ -1,7 +1,6 @@
 import logging
 from typing import Any
 
-from langfuse import observe
 from prisma.enums import ContentType
 
 from backend.api.features.chat.model import ChatSession
@@ -56,7 +55,6 @@ class FindBlockTool(BaseTool):
     def requires_auth(self) -> bool:
         return True
 
-    @observe(as_type="tool", name="find_block")
     async def _execute(
         self,
         user_id: str | None,
diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/find_library_agent.py b/autogpt_platform/backend/backend/api/features/chat/tools/find_library_agent.py
index d9b5edfa9b..108fba75ae 100644
--- a/autogpt_platform/backend/backend/api/features/chat/tools/find_library_agent.py
+++ b/autogpt_platform/backend/backend/api/features/chat/tools/find_library_agent.py
@@ -2,8 +2,6 @@
 
 from typing import Any
 
-from langfuse import observe
-
 from backend.api.features.chat.model import ChatSession
 
 from .agent_search import search_agents
@@ -43,7 +41,6 @@ class FindLibraryAgentTool(BaseTool):
     def requires_auth(self) -> bool:
         return True
 
-    @observe(as_type="tool", name="find_library_agent")
     async def _execute(
         self, user_id: str | None, session: ChatSession, **kwargs
     ) -> ToolResponseBase:
diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/get_doc_page.py b/autogpt_platform/backend/backend/api/features/chat/tools/get_doc_page.py
index b2fdcccfcd..7040cd7db5 100644
--- a/autogpt_platform/backend/backend/api/features/chat/tools/get_doc_page.py
+++ b/autogpt_platform/backend/backend/api/features/chat/tools/get_doc_page.py
@@ -4,8 +4,6 @@ import logging
 from pathlib import Path
 from typing import Any
 
-from langfuse import observe
-
 from backend.api.features.chat.model import ChatSession
 from backend.api.features.chat.tools.base import BaseTool
 from backend.api.features.chat.tools.models import (
@@ -73,7 +71,6 @@ class GetDocPageTool(BaseTool):
         url_path = path.rsplit(".", 1)[0] if "." in path else path
         return f"{DOCS_BASE_URL}/{url_path}"
 
-    @observe(as_type="tool", name="get_doc_page")
     async def _execute(
         self,
         user_id: str | None,
diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/run_agent.py b/autogpt_platform/backend/backend/api/features/chat/tools/run_agent.py
index 88d432a797..a7fa65348a 100644
--- a/autogpt_platform/backend/backend/api/features/chat/tools/run_agent.py
+++ b/autogpt_platform/backend/backend/api/features/chat/tools/run_agent.py
@@ -3,7 +3,6 @@ import logging
 from typing import Any
 
-from langfuse import observe
 from pydantic import BaseModel, Field, field_validator
 
 from backend.api.features.chat.config import ChatConfig
@@ -159,7 +158,6 @@ class RunAgentTool(BaseTool):
         """All operations require authentication."""
         return True
 
-    @observe(as_type="tool", name="run_agent")
     async def _execute(
         self,
         user_id: str | None,
diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/run_block.py b/autogpt_platform/backend/backend/api/features/chat/tools/run_block.py
index 0d233fcfec..3f57236564 100644
--- a/autogpt_platform/backend/backend/api/features/chat/tools/run_block.py
+++ b/autogpt_platform/backend/backend/api/features/chat/tools/run_block.py
@@ -4,8 +4,6 @@ import logging
 from collections import defaultdict
 from typing import Any
 
-from langfuse import observe
-
 from backend.api.features.chat.model import ChatSession
 from backend.data.block import get_block
 from backend.data.execution import ExecutionContext
@@ -130,7 +128,6 @@ class RunBlockTool(BaseTool):
 
         return matched_credentials, missing_credentials
 
-    @observe(as_type="tool", name="run_block")
     async def _execute(
         self,
         user_id: str | None,
diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/search_docs.py b/autogpt_platform/backend/backend/api/features/chat/tools/search_docs.py
index 4903230b40..edb0c0de1e 100644
--- a/autogpt_platform/backend/backend/api/features/chat/tools/search_docs.py
+++ b/autogpt_platform/backend/backend/api/features/chat/tools/search_docs.py
@@ -3,7 +3,6 @@ import logging
 from typing import Any
 
-from langfuse import observe
 from prisma.enums import ContentType
 
 from backend.api.features.chat.model import ChatSession
@@ -88,7 +87,6 @@ class SearchDocsTool(BaseTool):
         url_path = path.rsplit(".", 1)[0] if "." in path else path
         return f"{DOCS_BASE_URL}/{url_path}"
 
-    @observe(as_type="tool", name="search_docs")
     async def _execute(
         self,
         user_id: str | None,