Fix V1 MCP services (Fix tavily search) (#11840)

Co-authored-by: openhands <openhands@all-hands.dev>
Tim O'Farrell
2025-12-01 14:19:19 -07:00
committed by GitHub
parent 33eec7cb09
commit fd6e0cab3f
13 changed files with 1031 additions and 82 deletions

View File

@@ -4,12 +4,12 @@ from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from time import time
from typing import AsyncGenerator, Sequence
from typing import Any, AsyncGenerator, Sequence
from uuid import UUID, uuid4
import httpx
from fastapi import Request
from pydantic import Field, TypeAdapter
from pydantic import Field, SecretStr, TypeAdapter
from openhands.agent_server.models import (
ConversationInfo,
@@ -63,19 +63,27 @@ from openhands.app_server.sandbox.sandbox_spec_service import SandboxSpecService
from openhands.app_server.services.injector import InjectorState
from openhands.app_server.services.jwt_service import JwtService
from openhands.app_server.user.user_context import UserContext
from openhands.app_server.user.user_models import UserInfo
from openhands.app_server.utils.docker_utils import (
replace_localhost_hostname_for_docker,
)
from openhands.experiments.experiment_manager import ExperimentManagerImpl
from openhands.integrations.provider import ProviderType
from openhands.sdk import AgentContext, LocalWorkspace
from openhands.sdk import Agent, AgentContext, LocalWorkspace
from openhands.sdk.conversation.secret_source import LookupSecret, StaticSecret
from openhands.sdk.llm import LLM
from openhands.sdk.security.confirmation_policy import AlwaysConfirm
from openhands.sdk.workspace.remote.async_remote_workspace import AsyncRemoteWorkspace
from openhands.server.types import AppMode
from openhands.tools.preset.default import get_default_agent
from openhands.tools.preset.planning import get_planning_agent
from openhands.tools.preset.default import (
get_default_condenser,
get_default_tools,
)
from openhands.tools.preset.planning import (
format_plan_structure,
get_planning_condenser,
get_planning_tools,
)
_conversation_info_type_adapter = TypeAdapter(list[ConversationInfo | None])
_logger = logging.getLogger(__name__)
@@ -99,6 +107,7 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
access_token_hard_timeout: timedelta | None
app_mode: str | None = None
keycloak_auth_cookie: str | None = None
tavily_api_key: str | None = None
async def search_app_conversations(
self,
@@ -519,6 +528,223 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
if not request.llm_model and parent_info.llm_model:
request.llm_model = parent_info.llm_model
async def _setup_secrets_for_git_provider(
self, git_provider: ProviderType | None, user: UserInfo
) -> dict:
"""Set up secrets for git provider authentication.
Args:
git_provider: The git provider type (GitHub, GitLab, etc.)
user: User information containing authentication details
Returns:
Dictionary of secrets for the conversation
"""
secrets = await self.user_context.get_secrets()
if not git_provider:
return secrets
secret_name = f'{git_provider.name}_TOKEN'
if self.web_url:
# Create an access token for web-based authentication
access_token = self.jwt_service.create_jws_token(
payload={
'user_id': user.id,
'provider_type': git_provider.value,
},
expires_in=self.access_token_hard_timeout,
)
headers = {'X-Access-Token': access_token}
# Include keycloak_auth cookie in headers if app_mode is SaaS
if self.app_mode == 'saas' and self.keycloak_auth_cookie:
headers['Cookie'] = f'keycloak_auth={self.keycloak_auth_cookie}'
secrets[secret_name] = LookupSecret(
url=self.web_url + '/api/v1/webhooks/secrets',
headers=headers,
)
else:
# Use static token for environments without web URL access
static_token = await self.user_context.get_latest_token(git_provider)
if static_token:
secrets[secret_name] = StaticSecret(value=static_token)
return secrets
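# For illustration (hypothetical values): with ProviderType.GITHUB and a web_url
# configured, the returned mapping gains an entry shaped roughly like
#   secrets['GITHUB_TOKEN'] = LookupSecret(
#       url='https://app.example.com/api/v1/webhooks/secrets',
#       headers={'X-Access-Token': '<short-lived JWS token>'},
#   )
# while without a web_url it falls back to StaticSecret(value='<latest token>').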
async def _configure_llm_and_mcp(
self, user: UserInfo, llm_model: str | None
) -> tuple[LLM, dict]:
"""Configure LLM and MCP (Model Context Protocol) settings.
Args:
user: User information containing LLM preferences
llm_model: Optional specific model to use, falls back to user default
Returns:
Tuple of (configured LLM instance, MCP config dictionary)
"""
# Configure LLM
model = llm_model or user.llm_model
llm = LLM(
model=model,
base_url=user.llm_base_url,
api_key=user.llm_api_key,
usage_id='agent',
)
# Configure MCP
mcp_config: dict[str, Any] = {}
if self.web_url:
mcp_url = f'{self.web_url}/mcp/mcp'
mcp_config = {
'default': {
'url': mcp_url,
}
}
# Add API key if available
mcp_api_key = await self.user_context.get_mcp_api_key()
if mcp_api_key:
mcp_config['default']['headers'] = {
'X-Session-API-Key': mcp_api_key,
}
# Get the actual API key values, prioritizing user's key over service key
user_search_key = None
if user.search_api_key:
key_value = user.search_api_key.get_secret_value()
if key_value and key_value.strip():
user_search_key = key_value
service_tavily_key = None
if self.tavily_api_key:
# tavily_api_key is already a plain string (unwrapped from SecretStr in the injector)
if self.tavily_api_key.strip():
service_tavily_key = self.tavily_api_key
tavily_api_key = user_search_key or service_tavily_key
if tavily_api_key:
_logger.info('Adding search engine to MCP config')
mcp_config['tavily'] = {
'url': f'https://mcp.tavily.com/mcp/?tavilyApiKey={tavily_api_key}'
}
else:
_logger.info('No search engine API key found, skipping search engine')
return llm, mcp_config
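# For illustration (hypothetical values): with a web_url, an MCP API key and a
# Tavily key all available, the merged mcp_config returned above looks roughly like
#   {
#       'default': {
#           'url': 'https://app.example.com/mcp/mcp',
#           'headers': {'X-Session-API-Key': '<mcp api key>'},
#       },
#       'tavily': {
#           'url': 'https://mcp.tavily.com/mcp/?tavilyApiKey=<tavily key>',
#       },
#   }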
def _create_agent_with_context(
self,
llm: LLM,
agent_type: AgentType,
system_message_suffix: str | None,
mcp_config: dict,
) -> Agent:
"""Create an agent with appropriate tools and context based on agent type.
Args:
llm: Configured LLM instance
agent_type: Type of agent to create (PLAN or DEFAULT)
system_message_suffix: Optional suffix for system messages
mcp_config: MCP configuration dictionary
Returns:
Configured Agent instance with context
"""
# Create agent based on type
if agent_type == AgentType.PLAN:
agent = Agent(
llm=llm,
tools=get_planning_tools(),
system_prompt_filename='system_prompt_planning.j2',
system_prompt_kwargs={'plan_structure': format_plan_structure()},
condenser=get_planning_condenser(
llm=llm.model_copy(update={'usage_id': 'planning_condenser'})
),
security_analyzer=None,
mcp_config=mcp_config,
)
else:
agent = Agent(
llm=llm,
tools=get_default_tools(enable_browser=True),
system_prompt_kwargs={'cli_mode': False},
condenser=get_default_condenser(
llm=llm.model_copy(update={'usage_id': 'condenser'})
),
mcp_config=mcp_config,
)
# Add agent context
agent_context = AgentContext(system_message_suffix=system_message_suffix)
agent = agent.model_copy(update={'agent_context': agent_context})
return agent
async def _finalize_conversation_request(
self,
agent: Agent,
conversation_id: UUID | None,
user: UserInfo,
workspace: LocalWorkspace,
initial_message: SendMessageRequest | None,
secrets: dict,
sandbox: SandboxInfo,
remote_workspace: AsyncRemoteWorkspace | None,
selected_repository: str | None,
working_dir: str,
) -> StartConversationRequest:
"""Finalize the conversation request with experiment variants and skills.
Args:
agent: The configured agent
conversation_id: Optional conversation ID, generates new one if None
user: User information
workspace: Local workspace instance
initial_message: Optional initial message for the conversation
secrets: Dictionary of secrets for authentication
sandbox: Sandbox information
remote_workspace: Optional remote workspace for skills loading
selected_repository: Optional repository name
working_dir: Working directory path
Returns:
Complete StartConversationRequest ready for use
"""
# Generate conversation ID if not provided
conversation_id = conversation_id or uuid4()
# Apply experiment variants
agent = ExperimentManagerImpl.run_agent_variant_tests__v1(
user.id, conversation_id, agent
)
# Load and merge skills if remote workspace is available
if remote_workspace:
try:
agent = await self._load_skills_and_update_agent(
sandbox, agent, remote_workspace, selected_repository, working_dir
)
except Exception as e:
_logger.warning(f'Failed to load skills: {e}', exc_info=True)
# Continue without skills - don't fail conversation startup
# Create and return the final request
return StartConversationRequest(
conversation_id=conversation_id,
agent=agent,
workspace=workspace,
confirmation_policy=(
AlwaysConfirm() if user.confirmation_mode else NeverConfirm()
),
initial_message=initial_message,
secrets=secrets,
)
async def _build_start_conversation_request_for_user(
self,
sandbox: SandboxInfo,
@@ -532,87 +758,41 @@ class LiveStatusAppConversationService(AppConversationServiceBase):
remote_workspace: AsyncRemoteWorkspace | None = None,
selected_repository: str | None = None,
) -> StartConversationRequest:
"""Build a complete conversation request for a user.
This method orchestrates the creation of a conversation request by:
1. Setting up git provider secrets
2. Configuring LLM and MCP settings
3. Creating an agent with appropriate context
4. Finalizing the request with skills and experiment variants
"""
user = await self.user_context.get_user_info()
# Set up a secret for the git token
secrets = await self.user_context.get_secrets()
if git_provider:
secret_name = f'{git_provider.name}_TOKEN'
if self.web_url:
# If there is a web url, then we create an access token to access it.
# For security reasons, we are explicit here - only this user, and
# only this provider, with a timeout
access_token = self.jwt_service.create_jws_token(
payload={
'user_id': user.id,
'provider_type': git_provider.value,
},
expires_in=self.access_token_hard_timeout,
)
headers = {'X-Access-Token': access_token}
# Include keycloak_auth cookie in headers if app_mode is SaaS
if self.app_mode == 'saas' and self.keycloak_auth_cookie:
headers['Cookie'] = f'keycloak_auth={self.keycloak_auth_cookie}'
secrets[secret_name] = LookupSecret(
url=self.web_url + '/api/v1/webhooks/secrets',
headers=headers,
)
else:
# If there is no URL specified where the sandbox can access the app server
# then we supply a static secret with the most recent value. Depending
# on the type, this may eventually expire.
static_token = await self.user_context.get_latest_token(git_provider)
if static_token:
secrets[secret_name] = StaticSecret(value=static_token)
workspace = LocalWorkspace(working_dir=working_dir)
# Use provided llm_model if available, otherwise fall back to user's default
model = llm_model or user.llm_model
llm = LLM(
model=model,
base_url=user.llm_base_url,
api_key=user.llm_api_key,
usage_id='agent',
)
# The agent gets passed initial instructions
# Select agent based on agent_type
if agent_type == AgentType.PLAN:
agent = get_planning_agent(llm=llm)
else:
agent = get_default_agent(llm=llm)
# Set up secrets for git provider
secrets = await self._setup_secrets_for_git_provider(git_provider, user)
agent_context = AgentContext(system_message_suffix=system_message_suffix)
agent = agent.model_copy(update={'agent_context': agent_context})
# Configure LLM and MCP
llm, mcp_config = await self._configure_llm_and_mcp(user, llm_model)
conversation_id = conversation_id or uuid4()
agent = ExperimentManagerImpl.run_agent_variant_tests__v1(
user.id, conversation_id, agent
# Create agent with context
agent = self._create_agent_with_context(
llm, agent_type, system_message_suffix, mcp_config
)
# Load and merge all skills if remote_workspace is available
if remote_workspace:
try:
agent = await self._load_skills_and_update_agent(
sandbox, agent, remote_workspace, selected_repository, working_dir
)
except Exception as e:
_logger.warning(f'Failed to load skills: {e}', exc_info=True)
# Continue without skills - don't fail conversation startup
start_conversation_request = StartConversationRequest(
conversation_id=conversation_id,
agent=agent,
workspace=workspace,
confirmation_policy=(
AlwaysConfirm() if user.confirmation_mode else NeverConfirm()
),
initial_message=initial_message,
secrets=secrets,
# Finalize and return the conversation request
return await self._finalize_conversation_request(
agent,
conversation_id,
user,
workspace,
initial_message,
secrets,
sandbox,
remote_workspace,
selected_repository,
working_dir,
)
return start_conversation_request
async def update_agent_server_conversation_title(
self,
@@ -817,6 +997,10 @@ class LiveStatusAppConversationServiceInjector(AppConversationServiceInjector):
'be retrieved by a sandboxed conversation.'
),
)
tavily_api_key: SecretStr | None = Field(
default=None,
description='The Tavily Search API key to add to MCP integration',
)
async def inject(
self, state: InjectorState, request: Request | None = None
@@ -874,6 +1058,14 @@ class LiveStatusAppConversationServiceInjector(AppConversationServiceInjector):
# If server_config is not available (e.g., in tests), continue without it
pass
# We supply the global Tavily key only if the app mode is not SAAS; in SaaS,
# the search endpoints are currently patched into the app server instead, so
# the Tavily key does not need to be shared.
if self.tavily_api_key and app_mode != AppMode.SAAS:
tavily_api_key = self.tavily_api_key.get_secret_value()
else:
tavily_api_key = None
yield LiveStatusAppConversationService(
init_git_in_empty_workspace=self.init_git_in_empty_workspace,
user_context=user_context,
@@ -890,4 +1082,5 @@ class LiveStatusAppConversationServiceInjector(AppConversationServiceInjector):
access_token_hard_timeout=access_token_hard_timeout,
app_mode=app_mode,
keycloak_auth_cookie=keycloak_auth_cookie,
tavily_api_key=tavily_api_key,
)
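
Taken together, the service and its injector resolve the effective search key roughly as in the standalone sketch below; the helper name and the plain-string app_mode are illustrative, not part of the change.

def resolve_tavily_key(
    user_search_key: str | None,
    global_tavily_key: str | None,
    app_mode: str,
) -> str | None:
    """A non-empty user search key always wins; the deployment-wide key is
    only shared outside SaaS, where search is not patched into the app server."""
    if user_search_key and user_search_key.strip():
        return user_search_key
    if app_mode != 'saas' and global_tavily_key and global_tavily_key.strip():
        return global_tavily_key
    return None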

View File

@@ -6,7 +6,7 @@ from typing import AsyncContextManager
import httpx
from fastapi import Depends, Request
from pydantic import Field
from pydantic import Field, SecretStr
from sqlalchemy.ext.asyncio import AsyncSession
# Import the event_callback module to ensure all processors are registered
@@ -185,7 +185,13 @@ def config_from_env() -> AppServerConfig:
)
if config.app_conversation is None:
config.app_conversation = LiveStatusAppConversationServiceInjector()
tavily_api_key = None
tavily_api_key_str = os.getenv('TAVILY_API_KEY') or os.getenv('SEARCH_API_KEY')
if tavily_api_key_str:
tavily_api_key = SecretStr(tavily_api_key_str)
config.app_conversation = LiveStatusAppConversationServiceInjector(
tavily_api_key=tavily_api_key
)
if config.user is None:
config.user = AuthUserContextInjector()
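
The key is wrapped in a pydantic SecretStr so it stays masked if the config is logged or repr'd; callers such as the injector above must call get_secret_value() to unwrap it. A quick illustration with a hypothetical value:

from pydantic import SecretStr

key = SecretStr('tvly-example')
print(key)                     # **********
print(key.get_secret_value())  # tvly-example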

View File

@@ -78,6 +78,10 @@ class AuthUserContext(UserContext):
return results
async def get_mcp_api_key(self) -> str | None:
mcp_api_key = await self.user_auth.get_mcp_api_key()
return mcp_api_key
USER_ID_ATTR = 'user_id'

View File

@@ -30,6 +30,9 @@ class SpecifyUserContext(UserContext):
async def get_secrets(self) -> dict[str, SecretSource]:
raise NotImplementedError()
async def get_mcp_api_key(self) -> str | None:
raise NotImplementedError()
USER_CONTEXT_ATTR = 'user_context'
ADMIN = SpecifyUserContext(user_id=None)

View File

@@ -34,6 +34,10 @@ class UserContext(ABC):
async def get_secrets(self) -> dict[str, SecretSource]:
"""Get custom secrets and github provider secrets for the conversation."""
@abstractmethod
async def get_mcp_api_key(self) -> str | None:
"""Get an MCP API Key."""
class UserContextInjector(DiscriminatedUnionMixin, Injector[UserContext], ABC):
"""Injector for user contexts."""

View File

@@ -88,6 +88,9 @@ class DefaultUserAuth(UserAuth):
return None
return user_secrets.provider_tokens
async def get_mcp_api_key(self) -> str | None:
return None
@classmethod
async def get_instance(cls, request: Request) -> UserAuth:
user_auth = DefaultUserAuth()
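
DefaultUserAuth returns None here, so OSS deployments get no per-user MCP key by default. A deployment that does issue them would override the new hook; a minimal sketch, assuming DefaultUserAuth is importable from the module above and using a hypothetical MCP_API_KEY environment variable:

import os

class EnvVarUserAuth(DefaultUserAuth):
    """Example override: read the MCP API key from the environment."""

    async def get_mcp_api_key(self) -> str | None:
        return os.getenv('MCP_API_KEY') or None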

View File

@@ -75,6 +75,10 @@ class UserAuth(ABC):
def get_auth_type(self) -> AuthType | None:
return None
@abstractmethod
async def get_mcp_api_key(self) -> str | None:
"""Get an MCP API key for the user."""
@classmethod
@abstractmethod
async def get_instance(cls, request: Request) -> UserAuth: