Files
OpenHands/openhands/app_server/event_callback/webhook_router.py
2026-04-28 15:00:50 -06:00

364 lines
13 KiB
Python

"""Event Callback router for OpenHands App Server."""
import asyncio
import importlib
import logging
import pkgutil
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Request, Response, status
from fastapi.security import APIKeyHeader
from jwt import InvalidTokenError
from pydantic import SecretStr
from openhands import tools # type: ignore[attr-defined]
from openhands.agent_server.models import ConversationInfo, Success
from openhands.app_server.app_conversation.app_conversation_info_service import (
AppConversationInfoService,
)
from openhands.app_server.app_conversation.app_conversation_models import (
AppConversationInfo,
ConversationTrigger,
)
from openhands.app_server.config import (
depends_app_conversation_info_service,
depends_event_service,
depends_jwt_service,
get_event_callback_service,
get_global_config,
get_sandbox_service,
)
from openhands.app_server.errors import AuthError
from openhands.app_server.event.event_service import EventService
from openhands.app_server.event_callback.event_callback_models import EventCallback
from openhands.app_server.event_callback.set_title_callback_processor import (
SetTitleCallbackProcessor,
)
from openhands.app_server.integrations.provider import ProviderType
from openhands.app_server.sandbox.sandbox_models import SandboxInfo
from openhands.app_server.services.injector import InjectorState
from openhands.app_server.services.jwt_service import JwtService
from openhands.app_server.user.auth_user_context import AuthUserContext
from openhands.app_server.user.specifiy_user_context import (
ADMIN,
USER_CONTEXT_ATTR,
SpecifyUserContext,
)
from openhands.app_server.user_auth.default_user_auth import DefaultUserAuth
from openhands.app_server.user_auth.user_auth import (
get_for_user as get_user_auth_for_user,
)
from openhands.sdk import ConversationExecutionStatus, Event
from openhands.sdk.event import ConversationStateUpdateEvent
from openhands.server.types import AppMode
router = APIRouter(prefix='/webhooks', tags=['Webhooks'])
event_service_dependency = depends_event_service()
app_conversation_info_service_dependency = depends_app_conversation_info_service()
jwt_dependency = depends_jwt_service()
app_mode = get_global_config().app_mode
_logger = logging.getLogger(__name__)
def merge_conversation_tags(
existing_tags: dict[str, str] | None,
incoming_tags: dict[str, str] | None,
) -> dict[str, str]:
"""Merge conversation tags with incoming tags overriding existing ones.
Args:
existing_tags: Tags from the existing conversation (may be None)
incoming_tags: Tags from the incoming update (may be None)
Returns:
Merged tags dict (empty dict if both inputs are None/empty)
"""
existing = existing_tags or {}
incoming = incoming_tags or {}
return {**existing, **incoming}
def detect_automation_trigger(
current_trigger: ConversationTrigger | None,
merged_tags: dict[str, str],
conversation_id: str | None = None,
sandbox_id: str | None = None,
) -> ConversationTrigger | None:
"""Detect if conversation should have AUTOMATION trigger based on tags.
Only sets AUTOMATION trigger if:
- Current trigger is None (don't override existing trigger)
- Tags contain 'automationtrigger', 'automationid', or 'automationrunid' key
Args:
current_trigger: The existing trigger value (may be None)
merged_tags: Merged tags dict to inspect
conversation_id: Optional conversation ID for logging
sandbox_id: Optional sandbox ID for logging
Returns:
ConversationTrigger.AUTOMATION if detected, otherwise current_trigger
"""
if current_trigger is not None:
return current_trigger
if merged_tags and (
merged_tags.get('automationtrigger')
or merged_tags.get('automationid')
or merged_tags.get('automationrunid')
):
_logger.info(
'Detected automation trigger from conversation tags',
extra={
'conversation_id': conversation_id,
'sandbox_id': sandbox_id,
'automationtrigger': merged_tags.get('automationtrigger'),
'automationid': merged_tags.get('automationid'),
'automationrunid': merged_tags.get('automationrunid'),
},
)
return ConversationTrigger.AUTOMATION
return None
async def valid_sandbox(
request: Request,
session_api_key: str = Depends(
APIKeyHeader(name='X-Session-API-Key', auto_error=False)
),
) -> SandboxInfo:
"""Use a session api key for validation, and get a sandbox. Subsequent actions
are executed in the context of the owner of the sandbox"""
if not session_api_key:
raise HTTPException(
status.HTTP_401_UNAUTHORIZED, detail='X-Session-API-Key header is required'
)
# Create a state which will be used internally only for this operation
state = InjectorState()
# Since we need access to all sandboxes, this is executed in the context of the admin.
setattr(state, USER_CONTEXT_ATTR, ADMIN)
async with get_sandbox_service(state) as sandbox_service:
sandbox_info = await sandbox_service.get_sandbox_by_session_api_key(
session_api_key
)
if sandbox_info is None:
raise HTTPException(
status.HTTP_401_UNAUTHORIZED, detail='Invalid session API key'
)
# In SAAS Mode there is always a user, so we set the owner of the sandbox
# as the current user (Validated by the session_api_key they provided)
if sandbox_info.created_by_user_id:
setattr(
request.state,
USER_CONTEXT_ATTR,
SpecifyUserContext(sandbox_info.created_by_user_id),
)
elif app_mode == AppMode.SAAS:
_logger.error(
'Sandbox had no user specified', extra={'sandbox_id': sandbox_info.id}
)
raise HTTPException(
status.HTTP_401_UNAUTHORIZED, detail='Sandbox had no user specified'
)
return sandbox_info
async def valid_conversation(
conversation_id: UUID,
sandbox_info: SandboxInfo = Depends(valid_sandbox),
app_conversation_info_service: AppConversationInfoService = app_conversation_info_service_dependency,
) -> AppConversationInfo:
app_conversation_info = (
await app_conversation_info_service.get_app_conversation_info(conversation_id)
)
if not app_conversation_info:
# Conversation does not yet exist - create a stub
return AppConversationInfo(
id=conversation_id,
sandbox_id=sandbox_info.id,
created_by_user_id=sandbox_info.created_by_user_id,
)
# Sanity check - Make sure that the conversation and sandbox were created by the same user
if app_conversation_info.created_by_user_id != sandbox_info.created_by_user_id:
raise AuthError()
return app_conversation_info
@router.post('/conversations')
async def on_conversation_update(
conversation_info: ConversationInfo,
sandbox_info: SandboxInfo = Depends(valid_sandbox),
app_conversation_info_service: AppConversationInfoService = app_conversation_info_service_dependency,
) -> Success:
"""Webhook callback for when a conversation starts, pauses, resumes, or deletes."""
existing = await valid_conversation(
conversation_info.id, sandbox_info, app_conversation_info_service
)
# If the conversation is being deleted, no action is required...
# Later we may consider deleting the conversation if it exists...
if conversation_info.execution_status == ConversationExecutionStatus.DELETING:
return Success()
# Detect if this is a new conversation (stub has title=None)
is_new_conversation = existing.title is None
# Merge tags from incoming conversation info
# SDK can set tags via Conversation(tags=...) which includes automation context
merged_tags = merge_conversation_tags(existing.tags, conversation_info.tags)
# Determine trigger - check if tags indicate automation, then fall back to existing
trigger = detect_automation_trigger(
existing.trigger,
merged_tags,
conversation_id=str(conversation_info.id),
sandbox_id=sandbox_info.id,
)
app_conversation_info = AppConversationInfo(
id=conversation_info.id,
title=existing.title or f'Conversation {conversation_info.id.hex}',
sandbox_id=sandbox_info.id,
created_by_user_id=sandbox_info.created_by_user_id,
llm_model=conversation_info.agent.llm.model,
# Git parameters
selected_repository=existing.selected_repository,
selected_branch=existing.selected_branch,
git_provider=existing.git_provider,
trigger=trigger,
pr_number=existing.pr_number,
# Preserve parent/child relationship and other metadata
parent_conversation_id=existing.parent_conversation_id,
metrics=conversation_info.stats.get_combined_metrics(),
# Store merged tags (includes automation context, skills, etc.)
tags=merged_tags,
)
await app_conversation_info_service.save_app_conversation_info(
app_conversation_info
)
# Register SetTitleCallbackProcessor for new conversations created via webhook.
# This enables auto-titling for conversations created directly on the agent-server
# (e.g., automation runs) that notify the app-server via webhook.
if is_new_conversation:
state = InjectorState()
setattr(
state,
USER_CONTEXT_ATTR,
SpecifyUserContext(sandbox_info.created_by_user_id),
)
async with get_event_callback_service(state) as event_callback_service:
await event_callback_service.save_event_callback(
EventCallback(
conversation_id=conversation_info.id,
processor=SetTitleCallbackProcessor(),
)
)
return Success()
@router.post('/events/{conversation_id}')
async def on_event(
events: list[Event],
conversation_id: UUID,
app_conversation_info: AppConversationInfo = Depends(valid_conversation),
app_conversation_info_service: AppConversationInfoService = app_conversation_info_service_dependency,
event_service: EventService = event_service_dependency,
) -> Success:
"""Webhook callback for when event stream events occur."""
try:
# Save events...
await asyncio.gather(
*[event_service.save_event(conversation_id, event) for event in events]
)
# Process stats events for V1 conversations
for event in events:
if isinstance(event, ConversationStateUpdateEvent) and event.key == 'stats':
await app_conversation_info_service.process_stats_event(
event, conversation_id
)
asyncio.create_task(
_run_callbacks_in_bg_and_close(
conversation_id, app_conversation_info.created_by_user_id, events
)
)
except Exception:
_logger.exception('Error in webhook', stack_info=True)
return Success()
@router.get('/secrets')
async def get_secret(
access_token: str = Depends(APIKeyHeader(name='X-Access-Token', auto_error=False)),
jwt_service: JwtService = jwt_dependency,
) -> Response:
"""Given an access token, retrieve a user secret. The access token
is limited by user and provider type, and may include a timeout, limiting
the damage in the event that a token is ever leaked"""
try:
payload = jwt_service.verify_jws_token(access_token)
user_id = payload['user_id']
provider_type = ProviderType(payload['provider_type'])
# Get UserAuth for the user_id
if user_id:
user_auth = await get_user_auth_for_user(user_id)
else:
# OpenHands (OSS mode) - use default user auth
user_auth = DefaultUserAuth()
# Create UserContext directly
user_context = AuthUserContext(user_auth=user_auth)
secret = await user_context.get_latest_token(provider_type)
if secret is None:
raise HTTPException(404, 'No such provider')
if isinstance(secret, SecretStr):
secret_value = secret.get_secret_value()
else:
secret_value = secret
return Response(content=secret_value, media_type='text/plain')
except InvalidTokenError:
raise HTTPException(status.HTTP_401_UNAUTHORIZED)
async def _run_callbacks_in_bg_and_close(
conversation_id: UUID,
user_id: str | None,
events: list[Event],
):
"""Run all callbacks and close the session"""
state = InjectorState()
setattr(state, USER_CONTEXT_ATTR, SpecifyUserContext(user_id=user_id))
async with get_event_callback_service(state) as event_callback_service:
# We don't use asynio.gather here because callbacks must be run in sequence.
for event in events:
await event_callback_service.execute_callbacks(conversation_id, event)
def _import_all_tools():
"""We need to import all tools so that they are available for deserialization in webhooks."""
for _, name, is_pkg in pkgutil.walk_packages(tools.__path__, tools.__name__ + '.'):
if is_pkg: # Check if it's a subpackage
try:
importlib.import_module(name)
except ImportError as e:
_logger.error(f"Warning: Could not import subpackage '{name}': {e}")
_import_all_tools()