Compare commits


11 Commits

Author  SHA1        Message  Date
Swifty  fed206fdc0  Merge branch 'dev' into swiftyos/improve-langfuse-tracing  2026-01-21 16:40:58 +01:00
Swifty  20f4e14b2d  pass tags  2026-01-21 15:03:56 +01:00
Swifty  1aee738f89  Merge branch 'swiftyos/improve-langfuse-tracing' of github.com:Significant-Gravitas/AutoGPT into swiftyos/improve-langfuse-tracing  2026-01-21 15:03:37 +01:00
Swifty  8b983cf2dc  pass tags  2026-01-21 15:03:23 +01:00
Swifty  153c3df03a  update openapi.jsom  2026-01-21 15:02:55 +01:00
Swifty  999b397a87  Merge branch 'dev' into swiftyos/improve-langfuse-tracing  2026-01-21 14:51:58 +01:00
Swifty  a79d19247c  add tags and track prompt version in traces  2026-01-20 15:46:18 +01:00
Swifty  74ed697239  fix typo  2026-01-19 16:28:44 +01:00
Swifty  47b7470c36  fix return type  2026-01-19 16:27:03 +01:00
Swifty  3f6649a892  fixed outputs  2026-01-19 16:00:43 +01:00
Swifty  6572dcdb4f  improved langfuse tracing  2026-01-19 13:58:10 +01:00
3 changed files with 73 additions and 7 deletions

View File

@@ -45,6 +45,9 @@ class StreamChatRequest(BaseModel):
     message: str
     is_user_message: bool = True
     context: dict[str, str] | None = None  # {url: str, content: str}
+    tags: list[str] | None = (
+        None  # Custom tags for Langfuse tracing (e.g., experiment names)
+    )
 
 
 class CreateSessionResponse(BaseModel):
@@ -229,6 +232,7 @@ async def stream_chat_post(
             user_id=user_id,
             session=session,  # Pass pre-fetched session to avoid double-fetch
             context=request.context,
+            tags=request.tags,
         ):
             yield chunk.to_sse()
         # AI SDK protocol termination
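The new `tags` field is optional and additive, so clients that omit it keep the old behavior. A minimal sketch of exercising it from a client, assuming a local deployment and a hypothetical `/api/chat/{session_id}/stream` route (the exact path is not shown in this diff):

import httpx

# Request body mirrors StreamChatRequest above; only "message" is required.
payload = {
    "message": "Build me a daily news-summary agent",
    "is_user_message": True,
    "tags": ["prompt-experiment-a"],  # merged with the default "copilot" tag server-side
}

# URL and session id are placeholders, not taken from this diff.
with httpx.stream(
    "POST",
    "http://localhost:8006/api/chat/session-123/stream",
    json=payload,
    timeout=None,
) as response:
    for line in response.iter_lines():
        print(line)  # server-sent event chunks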

View File

@@ -63,7 +63,7 @@ def _is_langfuse_configured() -> bool:
     )
 
 
-async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]:
+async def _build_system_prompt(user_id: str | None) -> tuple[str, Any, Any]:
     """Build the full system prompt including business understanding if available.
 
     Args:
@@ -71,7 +71,7 @@ async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]:
             If "default" and this is the user's first session, will use "onboarding" instead.
 
     Returns:
-        Tuple of (compiled prompt string, Langfuse prompt object for tracing)
+        Tuple of (compiled prompt string, understanding object, Langfuse prompt object for tracing)
     """
 
     # cache_ttl_seconds=0 disables SDK caching to always get the latest prompt
@@ -91,7 +91,7 @@ async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]:
         context = "This is the first time you are meeting the user. Greet them and introduce them to the platform"
 
     compiled = prompt.compile(users_information=context)
-    return compiled, understanding
+    return compiled, understanding, prompt
 
 
 async def _generate_session_title(message: str) -> str | None:
@@ -156,6 +156,7 @@ async def stream_chat_completion(
     retry_count: int = 0,
     session: ChatSession | None = None,
     context: dict[str, str] | None = None,  # {url: str, content: str}
+    tags: list[str] | None = None,  # Custom tags for Langfuse tracing
 ) -> AsyncGenerator[StreamBaseResponse, None]:
     """Main entry point for streaming chat completions with database handling.
 
@@ -265,7 +266,7 @@ async def stream_chat_completion(
         asyncio.create_task(_update_title())
 
     # Build system prompt with business understanding
-    system_prompt, understanding = await _build_system_prompt(user_id)
+    system_prompt, understanding, langfuse_prompt = await _build_system_prompt(user_id)
 
     # Create Langfuse trace for this LLM call (each call gets its own trace, grouped by session_id)
     # Using v3 SDK: start_observation creates a root span, update_trace sets trace-level attributes
@@ -279,10 +280,15 @@
         name="user-copilot-request",
         input=input,
     ) as span:
+        # Merge custom tags with default "copilot" tag
+        all_tags = ["copilot"]
+        if tags:
+            all_tags.extend(tags)
+
         with propagate_attributes(
             session_id=session_id,
             user_id=user_id,
-            tags=["copilot"],
+            tags=all_tags,
             metadata={
                 "users_information": format_understanding_for_prompt(understanding)[
                     :200
@@ -321,6 +327,7 @@
             tools=tools,
             system_prompt=system_prompt,
             text_block_id=text_block_id,
+            langfuse_prompt=langfuse_prompt,
         ):
 
             if isinstance(chunk, StreamTextStart):
@@ -467,6 +474,7 @@
                 retry_count=retry_count + 1,
                 session=session,
                 context=context,
+                tags=tags,
             ):
                 yield chunk
             return  # Exit after retry to avoid double-saving in finally block
@@ -516,6 +524,7 @@
             session=session,  # Pass session object to avoid Redis refetch
             context=context,
             tool_call_response=str(tool_response_messages),
+            tags=tags,
         ):
             yield chunk
 
@@ -534,8 +543,8 @@ def _is_retryable_error(error: Exception) -> bool:
         return True
     if isinstance(error, APIStatusError):
         # APIStatusError has a response with status_code
-        # Retry on 5xx status codes (server errors)
-        if error.response.status_code >= 500:
+        # Retry on 5xx status codes (server errors) or 429 (rate limit)
+        if error.response.status_code >= 500 or error.response.status_code == 429:
             return True
     if isinstance(error, APIError):
         # Retry on overloaded errors or 500 errors (may not have status code)
@@ -550,6 +559,7 @@ async def _stream_chat_chunks(
     tools: list[ChatCompletionToolParam],
     system_prompt: str | None = None,
     text_block_id: str | None = None,
+    langfuse_prompt: Any | None = None,
 ) -> AsyncGenerator[StreamBaseResponse, None]:
     """
     Pure streaming function for OpenAI chat completions with tool calling.
@@ -561,6 +571,7 @@
         session: Chat session with conversation history
         tools: Available tools for the model
         system_prompt: System prompt to prepend to messages
+        langfuse_prompt: Langfuse prompt object for linking to traces
 
     Yields:
         SSE formatted JSON response objects
@@ -594,6 +605,7 @@
         )
 
     # Create the stream with proper types
+    # Pass langfuse_prompt to link generation to prompt version in Langfuse
     stream = await client.chat.completions.create(
         model=model,
         messages=messages,
@@ -601,6 +613,7 @@
         tool_choice="auto",
         stream=True,
         stream_options={"include_usage": True},
+        langfuse_prompt=langfuse_prompt,  # type: ignore[call-overload]
     )
 
     # Variables to accumulate tool calls
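The core of this change is the prompt-linking pattern: fetch the prompt from Langfuse, compile it, and pass the prompt object through to the wrapped OpenAI call so each generation is tied to a prompt version. A minimal, self-contained sketch of that pattern using the Langfuse v3 SDK's OpenAI drop-in wrapper; the prompt name, variables, and model below are illustrative, not taken from this diff:

import asyncio

from langfuse import get_client
from langfuse.openai import AsyncOpenAI  # drop-in wrapper that traces OpenAI calls

langfuse = get_client()
client = AsyncOpenAI()

async def main() -> None:
    # cache_ttl_seconds=0 bypasses SDK caching, matching the comment in the diff
    prompt = langfuse.get_prompt("copilot-system-prompt", cache_ttl_seconds=0)
    compiled = prompt.compile(users_information="Returning user with 3 agents")

    # Passing langfuse_prompt links this generation to the prompt version
    # in the Langfuse UI, the same mechanism the diff relies on
    stream = await client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": compiled},
            {"role": "user", "content": "Hello"},
        ],
        stream=True,
        langfuse_prompt=prompt,
    )
    async for chunk in stream:
        ...  # consume deltas as usual

asyncio.run(main())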

View File

@@ -4085,6 +4085,48 @@
       }
     }
   },
+  "/api/local-media/users/{user_id}/{media_type}/{filename}": {
+    "get": {
+      "tags": ["media", "media"],
+      "summary": "Serve local media file",
+      "description": "Serve a media file from local storage.\nOnly available when GCS is not configured.",
+      "operationId": "getMediaServe local media file",
+      "parameters": [
+        {
+          "name": "user_id",
+          "in": "path",
+          "required": true,
+          "schema": { "type": "string", "title": "User Id" }
+        },
+        {
+          "name": "media_type",
+          "in": "path",
+          "required": true,
+          "schema": { "type": "string", "title": "Media Type" }
+        },
+        {
+          "name": "filename",
+          "in": "path",
+          "required": true,
+          "schema": { "type": "string", "title": "Filename" }
+        }
+      ],
+      "responses": {
+        "200": {
+          "description": "Successful Response",
+          "content": { "application/json": { "schema": {} } }
+        },
+        "422": {
+          "description": "Validation Error",
+          "content": {
+            "application/json": {
+              "schema": { "$ref": "#/components/schemas/HTTPValidationError" }
+            }
+          }
+        }
+      }
+    }
+  },
   "/api/oauth/app/{client_id}": {
     "get": {
       "tags": ["oauth"],
@@ -10187,6 +10229,13 @@
             { "type": "null" }
           ],
           "title": "Context"
         },
+        "tags": {
+          "anyOf": [
+            { "items": { "type": "string" }, "type": "array" },
+            { "type": "null" }
+          ],
+          "title": "Tags"
+        }
       },
       "type": "object",