From 0e88dd15b2e948a376f5b63df88955f3bfb3a822 Mon Sep 17 00:00:00 2001 From: Zamil Majdy Date: Thu, 12 Feb 2026 03:35:16 +0000 Subject: [PATCH] feat(chat): add hook-based tracing integration for Claude Agent SDK - Add create_tracing_hooks() for fine-grained tool timing - Add merge_hooks() utility to combine security + tracing hooks - Captures precise pre/post timing for tool executions - Tracks tool failures via PostToolUseFailure hook - Integrates seamlessly with existing security hooks --- .../backend/api/features/chat/sdk/service.py | 14 ++- .../backend/api/features/chat/sdk/tracing.py | 99 +++++++++++++++++++ 2 files changed, 109 insertions(+), 4 deletions(-) diff --git a/autogpt_platform/backend/backend/api/features/chat/sdk/service.py b/autogpt_platform/backend/backend/api/features/chat/sdk/service.py index 951003ab3f..19cb3ff99e 100644 --- a/autogpt_platform/backend/backend/api/features/chat/sdk/service.py +++ b/autogpt_platform/backend/backend/api/features/chat/sdk/service.py @@ -38,7 +38,7 @@ from .tool_adapter import ( create_copilot_mcp_server, set_execution_context, ) -from .tracing import TracedSession +from .tracing import TracedSession, create_tracing_hooks, merge_hooks logger = logging.getLogger(__name__) config = ChatConfig() @@ -317,11 +317,19 @@ async def stream_chat_completion_sdk( mcp_server = create_copilot_mcp_server() + # Initialize Langfuse tracing (no-op if not configured) + tracer = TracedSession(session_id, user_id, system_prompt) + + # Merge security hooks with optional tracing hooks + security_hooks = create_security_hooks(user_id, sdk_cwd=sdk_cwd) + tracing_hooks = create_tracing_hooks(tracer) + combined_hooks = merge_hooks(security_hooks, tracing_hooks) + options = ClaudeAgentOptions( system_prompt=system_prompt, mcp_servers={"copilot": mcp_server}, # type: ignore[arg-type] allowed_tools=COPILOT_TOOL_NAMES, - hooks=create_security_hooks(user_id, sdk_cwd=sdk_cwd), # type: ignore[arg-type] + hooks=combined_hooks, # type: ignore[arg-type] cwd=sdk_cwd, max_buffer_size=config.sdk_max_buffer_size, ) @@ -329,8 +337,6 @@ async def stream_chat_completion_sdk( adapter = SDKResponseAdapter(message_id=message_id) adapter.set_task_id(task_id) - # Initialize Langfuse tracing (no-op if not configured) - tracer = TracedSession(session_id, user_id, system_prompt) async with tracer, ClaudeSDKClient(options=options) as client: current_message = message or "" if not current_message and session.messages: diff --git a/autogpt_platform/backend/backend/api/features/chat/sdk/tracing.py b/autogpt_platform/backend/backend/api/features/chat/sdk/tracing.py index d010a3b058..4c453a787d 100644 --- a/autogpt_platform/backend/backend/api/features/chat/sdk/tracing.py +++ b/autogpt_platform/backend/backend/api/features/chat/sdk/tracing.py @@ -325,3 +325,102 @@ async def traced_session( tracer = TracedSession(session_id, user_id, system_prompt) async with tracer: yield tracer + + +def create_tracing_hooks(tracer: TracedSession) -> dict[str, Any]: + """Create SDK hooks for fine-grained Langfuse tracing. + + These hooks capture precise timing for tool executions and failures + that may not be visible in the message stream. + + Designed to be merged with security hooks: + hooks = {**security_hooks, **create_tracing_hooks(tracer)} + + Args: + tracer: The active TracedSession instance + + Returns: + Hooks configuration dict for ClaudeAgentOptions + """ + if not tracer.enabled: + return {} + + try: + from claude_agent_sdk import HookMatcher + from claude_agent_sdk.types import HookContext, HookInput, SyncHookJSONOutput + + async def trace_pre_tool_use( + input_data: HookInput, + tool_use_id: str | None, + context: HookContext, + ) -> SyncHookJSONOutput: + """Record tool start time for accurate duration tracking.""" + _ = context + if not tool_use_id: + return {} + tool_name = str(input_data.get("tool_name", "unknown")) + tool_input = input_data.get("tool_input", {}) + + # Record start time in pending tools + tracer._pending_tools[tool_use_id] = ToolSpan( + tool_call_id=tool_use_id, + tool_name=tool_name, + input=tool_input if isinstance(tool_input, dict) else {}, + ) + return {} + + async def trace_post_tool_use( + input_data: HookInput, + tool_use_id: str | None, + context: HookContext, + ) -> SyncHookJSONOutput: + """Record tool completion for duration calculation.""" + _ = context + if tool_use_id and tool_use_id in tracer._pending_tools: + tracer._pending_tools[tool_use_id].end_time = time.perf_counter() + tracer._pending_tools[tool_use_id].success = True + return {} + + async def trace_post_tool_failure( + input_data: HookInput, + tool_use_id: str | None, + context: HookContext, + ) -> SyncHookJSONOutput: + """Record tool failures for error tracking.""" + _ = context + if tool_use_id and tool_use_id in tracer._pending_tools: + tracer._pending_tools[tool_use_id].end_time = time.perf_counter() + tracer._pending_tools[tool_use_id].success = False + error = input_data.get("error", "Unknown error") + tracer._pending_tools[tool_use_id].output = f"ERROR: {error}" + return {} + + return { + "PreToolUse": [HookMatcher(matcher="*", hooks=[trace_pre_tool_use])], + "PostToolUse": [HookMatcher(matcher="*", hooks=[trace_post_tool_use])], + "PostToolUseFailure": [ + HookMatcher(matcher="*", hooks=[trace_post_tool_failure]) + ], + } + + except ImportError: + logger.debug("[Tracing] SDK not available for hook-based tracing") + return {} + + +def merge_hooks(*hook_dicts: dict[str, Any]) -> dict[str, Any]: + """Merge multiple hook configurations into one. + + Combines hook matchers for the same event type, allowing both + security and tracing hooks to coexist. + + Usage: + combined = merge_hooks(security_hooks, tracing_hooks) + """ + result: dict[str, list[Any]] = {} + for hook_dict in hook_dicts: + for event_name, matchers in hook_dict.items(): + if event_name not in result: + result[event_name] = [] + result[event_name].extend(matchers) + return result