mirror of
https://github.com/All-Hands-AI/OpenHands.git
synced 2026-04-29 03:00:45 -04:00
Compare commits
9 Commits
clean-up
...
feature/te
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9c1484ecf3 | ||
|
|
33c80d816e | ||
|
|
c164063d19 | ||
|
|
30f09f4aec | ||
|
|
cb87340cb8 | ||
|
|
15886acc3f | ||
|
|
c64eef19df | ||
|
|
be0bb3f388 | ||
|
|
c020268f5b |
0
.devcontainer/setup.sh
Executable file → Normal file
0
.devcontainer/setup.sh
Executable file → Normal file
@@ -109,14 +109,6 @@ OpenHands requires an API key to access most language models. Here's how to get
|
||||
|
||||
</Accordion>
|
||||
|
||||
<Accordion title="Google (Gemini)">
|
||||
|
||||
1. Create a Google account if you don't already have one.
|
||||
2. [Generate an API key](https://aistudio.google.com/apikey).
|
||||
3. [Set up billing](https://aistudio.google.com/usage?tab=billing).
|
||||
|
||||
</Accordion>
|
||||
|
||||
</AccordionGroup>
|
||||
|
||||
Consider setting usage limits to control costs.
|
||||
|
||||
107
docs/usage/team-cli.mdx
Normal file
107
docs/usage/team-cli.mdx
Normal file
@@ -0,0 +1,107 @@
|
||||
---
|
||||
title: Team CLI
|
||||
---
|
||||
|
||||
# OpenHands Team CLI
|
||||
|
||||
The Team CLI provides a command-line interface for interacting with the OpenHands HTTP and WebSocket APIs. It allows you to create conversations, list existing conversations, and join conversations to interact with the agent.
|
||||
|
||||
## Getting Started
|
||||
|
||||
To use the Team CLI, you need to have OpenHands installed. You can then use the `team` command to access the Team CLI:
|
||||
|
||||
```bash
|
||||
openhands team [command] [options]
|
||||
```
|
||||
|
||||
## Configuration
|
||||
|
||||
The Team CLI uses the following environment variables for configuration:
|
||||
|
||||
- `OPENHANDS_API_URL`: The base URL for the OpenHands API (default: `https://staging.all-hands.dev`)
|
||||
- `OPENHANDS_API_KEY`: The API key for authentication (if required)
|
||||
|
||||
You can also specify these values using command-line options:
|
||||
|
||||
```bash
|
||||
openhands team --url https://app.all-hands.dev --api-key your-api-key [command] [options]
|
||||
```
|
||||
|
||||
## Commands
|
||||
|
||||
### List Conversations
|
||||
|
||||
List all available conversations:
|
||||
|
||||
```bash
|
||||
openhands team list [options]
|
||||
```
|
||||
|
||||
Options:
|
||||
- `-l, --limit`: Maximum number of conversations to list (default: 20)
|
||||
|
||||
### Create a Conversation
|
||||
|
||||
Create a new conversation:
|
||||
|
||||
```bash
|
||||
openhands team create [options]
|
||||
```
|
||||
|
||||
Options:
|
||||
- `-r, --repository`: Repository name (format: owner/repo)
|
||||
- `-g, --git-provider`: Git provider (github or gitlab)
|
||||
- `-b, --branch`: Branch name
|
||||
- `-m, --message`: Initial user message
|
||||
- `-i, --instructions`: Conversation instructions
|
||||
- `-j, --join`: Join the conversation after creation
|
||||
|
||||
### Join a Conversation
|
||||
|
||||
Join an existing conversation:
|
||||
|
||||
```bash
|
||||
openhands team join [conversation_id]
|
||||
```
|
||||
|
||||
## Examples
|
||||
|
||||
List all conversations:
|
||||
|
||||
```bash
|
||||
openhands team list
|
||||
```
|
||||
|
||||
Create a new conversation with a GitHub repository:
|
||||
|
||||
```bash
|
||||
openhands team create -r All-Hands-AI/OpenHands -m "Help me understand the codebase"
|
||||
```
|
||||
|
||||
Create a conversation and join it immediately:
|
||||
|
||||
```bash
|
||||
openhands team create -m "Let's build a web app" -j
|
||||
```
|
||||
|
||||
Join an existing conversation:
|
||||
|
||||
```bash
|
||||
openhands team join abc123def456
|
||||
```
|
||||
|
||||
## Using with a Remote Server
|
||||
|
||||
To use the Team CLI with a remote OpenHands server:
|
||||
|
||||
```bash
|
||||
export OPENHANDS_API_URL="https://app.all-hands.dev"
|
||||
export OPENHANDS_API_KEY="your-api-key"
|
||||
openhands team list
|
||||
```
|
||||
|
||||
Or specify the URL and API key directly:
|
||||
|
||||
```bash
|
||||
openhands team --url https://app.all-hands.dev --api-key your-api-key list
|
||||
```
|
||||
@@ -17,10 +17,6 @@ export const useActiveConversation = () => {
|
||||
useEffect(() => {
|
||||
const conversation = userConversation.data;
|
||||
OpenHands.setCurrentConversation(conversation || null);
|
||||
}, [
|
||||
conversationId,
|
||||
userConversation.isFetched,
|
||||
userConversation?.data?.status,
|
||||
]);
|
||||
}, [conversationId, userConversation.isFetched]);
|
||||
return userConversation;
|
||||
};
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
from prompt_toolkit import print_formatted_text
|
||||
from prompt_toolkit.formatted_text import HTML
|
||||
@@ -358,7 +357,7 @@ async def main_with_loop(loop: asyncio.AbstractEventLoop) -> None:
|
||||
|
||||
# Load settings from Settings Store
|
||||
# TODO: Make this generic?
|
||||
settings_store = await FileSettingsStore.get_instance(config=config)
|
||||
settings_store = await FileSettingsStore.get_instance(config=config, user_id=None)
|
||||
settings = await settings_store.load()
|
||||
|
||||
# Track if we've shown the banner during setup
|
||||
@@ -454,6 +453,37 @@ async def main_with_loop(loop: asyncio.AbstractEventLoop) -> None:
|
||||
|
||||
|
||||
def main():
|
||||
args = parse_arguments()
|
||||
|
||||
# Check if team command is used
|
||||
if hasattr(args, 'command') and args.command == 'team':
|
||||
# Import and run the team CLI directly
|
||||
import sys
|
||||
|
||||
from openhands.cli.team import main as team_main
|
||||
|
||||
# Get arguments after 'team'
|
||||
team_args = []
|
||||
if len(sys.argv) > 2:
|
||||
# Pass all arguments after 'team'
|
||||
team_args = sys.argv[2:]
|
||||
|
||||
if not team_args:
|
||||
# If no additional arguments, show help message
|
||||
print('OpenHands Team CLI')
|
||||
print('=================')
|
||||
print('To use the team CLI, run one of the following commands:')
|
||||
print(' openhands team list - List all conversations')
|
||||
print(' openhands team create - Create a new conversation')
|
||||
print(' openhands team join <id> - Join an existing conversation')
|
||||
print()
|
||||
print("For more information, run 'openhands team --help'")
|
||||
return
|
||||
|
||||
# Run the team CLI with the arguments
|
||||
team_main(team_args)
|
||||
return
|
||||
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
try:
|
||||
|
||||
549
openhands/cli/team.py
Normal file
549
openhands/cli/team.py
Normal file
@@ -0,0 +1,549 @@
|
||||
"""Team CLI interface for OpenHands.
|
||||
|
||||
This module provides a CLI interface for interacting with the OpenHands HTTP and WebSocket APIs.
|
||||
It allows creating conversations and showing the current list of conversations/statuses.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from typing import Any, Optional
|
||||
|
||||
import aiohttp
|
||||
import socketio
|
||||
from prompt_toolkit import print_formatted_text
|
||||
from prompt_toolkit.formatted_text import HTML
|
||||
from prompt_toolkit.shortcuts import clear
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
|
||||
from openhands.cli.tui import (
|
||||
display_banner,
|
||||
display_event,
|
||||
display_welcome_message,
|
||||
read_prompt_input,
|
||||
)
|
||||
from openhands.core.schema import AgentState
|
||||
from openhands.events.action import MessageAction
|
||||
from openhands.events.serialization import event_from_dict, event_to_dict
|
||||
|
||||
|
||||
class TeamClient:
|
||||
"""Client for interacting with the OpenHands HTTP and WebSocket APIs."""
|
||||
|
||||
def __init__(self, base_url: str, api_key: Optional[str] = None):
|
||||
"""Initialize the TeamClient.
|
||||
|
||||
Args:
|
||||
base_url: The base URL for the OpenHands API.
|
||||
api_key: Optional API key for authentication.
|
||||
"""
|
||||
self.base_url = base_url.rstrip('/')
|
||||
self.api_key = api_key
|
||||
self.sio = socketio.AsyncClient()
|
||||
self.console = Console()
|
||||
self.headers = {}
|
||||
if api_key:
|
||||
self.headers['Authorization'] = f'Bearer {api_key}'
|
||||
|
||||
async def list_conversations(self, limit: int = 20) -> list[dict[str, Any]]:
|
||||
"""List conversations.
|
||||
|
||||
Args:
|
||||
limit: Maximum number of conversations to return.
|
||||
|
||||
Returns:
|
||||
List of conversation objects.
|
||||
"""
|
||||
async with aiohttp.ClientSession(headers=self.headers) as session:
|
||||
async with session.get(
|
||||
f'{self.base_url}/api/conversations?limit={limit}'
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
error_text = await response.text()
|
||||
raise Exception(f'Failed to list conversations: {error_text}')
|
||||
data = await response.json()
|
||||
return data.get('results', [])
|
||||
|
||||
async def create_conversation(
|
||||
self,
|
||||
repository: Optional[str] = None,
|
||||
git_provider: Optional[str] = None,
|
||||
selected_branch: Optional[str] = None,
|
||||
initial_user_msg: Optional[str] = None,
|
||||
conversation_instructions: Optional[str] = None,
|
||||
) -> str:
|
||||
"""Create a new conversation.
|
||||
|
||||
Args:
|
||||
repository: Optional repository name (owner/repo).
|
||||
git_provider: Optional git provider (github or gitlab).
|
||||
selected_branch: Optional branch name.
|
||||
initial_user_msg: Optional initial user message.
|
||||
conversation_instructions: Optional conversation instructions.
|
||||
|
||||
Returns:
|
||||
The conversation ID.
|
||||
"""
|
||||
payload = {
|
||||
'repository': repository,
|
||||
'git_provider': git_provider,
|
||||
'selected_branch': selected_branch,
|
||||
'initial_user_msg': initial_user_msg,
|
||||
'conversation_instructions': conversation_instructions,
|
||||
}
|
||||
# Remove None values
|
||||
payload = {k: v for k, v in payload.items() if v is not None}
|
||||
|
||||
async with aiohttp.ClientSession(headers=self.headers) as session:
|
||||
async with session.post(
|
||||
f'{self.base_url}/api/conversations', json=payload
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
error_text = await response.text()
|
||||
raise Exception(f'Failed to create conversation: {error_text}')
|
||||
data = await response.json()
|
||||
return data.get('conversation_id')
|
||||
|
||||
async def get_conversation(self, conversation_id: str) -> dict[str, Any]:
|
||||
"""Get conversation details.
|
||||
|
||||
Args:
|
||||
conversation_id: The conversation ID.
|
||||
|
||||
Returns:
|
||||
Conversation details.
|
||||
"""
|
||||
async with aiohttp.ClientSession(headers=self.headers) as session:
|
||||
async with session.get(
|
||||
f'{self.base_url}/api/conversations/{conversation_id}'
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
error_text = await response.text()
|
||||
raise Exception(f'Failed to get conversation: {error_text}')
|
||||
return await response.json()
|
||||
|
||||
async def connect_to_conversation(
|
||||
self, conversation_id: str, latest_event_id: int = -1
|
||||
) -> None:
|
||||
"""Connect to a conversation via WebSocket.
|
||||
|
||||
Args:
|
||||
conversation_id: The conversation ID.
|
||||
latest_event_id: The latest event ID to start from.
|
||||
"""
|
||||
|
||||
# Set up event handlers
|
||||
@self.sio.event
|
||||
async def connect():
|
||||
self.console.print('[green]Connected to conversation[/green]')
|
||||
|
||||
@self.sio.event
|
||||
async def disconnect():
|
||||
self.console.print('[yellow]Disconnected from conversation[/yellow]')
|
||||
|
||||
@self.sio.event
|
||||
async def oh_event(data):
|
||||
event = event_from_dict(data)
|
||||
# Create a dummy config object to satisfy the type checker
|
||||
from openhands.core.config import OpenHandsConfig
|
||||
|
||||
dummy_config = OpenHandsConfig()
|
||||
display_event(event, dummy_config)
|
||||
|
||||
# Connect to the WebSocket
|
||||
query = {
|
||||
'conversation_id': conversation_id,
|
||||
'latest_event_id': str(latest_event_id),
|
||||
}
|
||||
if self.api_key:
|
||||
query['session_api_key'] = self.api_key
|
||||
|
||||
await self.sio.connect(
|
||||
f'{self.base_url}',
|
||||
headers=self.headers,
|
||||
transports=['websocket'],
|
||||
socketio_path='socket.io',
|
||||
wait_timeout=10,
|
||||
query=query,
|
||||
)
|
||||
|
||||
async def send_message(self, message: str) -> None:
|
||||
"""Send a message to the conversation.
|
||||
|
||||
Args:
|
||||
message: The message to send.
|
||||
"""
|
||||
event = MessageAction(content=message)
|
||||
event_dict = event_to_dict(event)
|
||||
await self.sio.emit('oh_user_action', event_dict)
|
||||
|
||||
async def disconnect(self) -> None:
|
||||
"""Disconnect from the WebSocket."""
|
||||
await self.sio.disconnect()
|
||||
|
||||
|
||||
async def list_conversations_cmd(client: TeamClient, args: argparse.Namespace) -> None:
|
||||
"""List conversations command.
|
||||
|
||||
Args:
|
||||
client: The TeamClient instance.
|
||||
args: Command line arguments.
|
||||
"""
|
||||
conversations = await client.list_conversations(limit=args.limit)
|
||||
|
||||
if not conversations:
|
||||
print('No conversations found.')
|
||||
return
|
||||
|
||||
table = Table(title='Conversations')
|
||||
table.add_column('ID', style='cyan')
|
||||
table.add_column('Title', style='green')
|
||||
table.add_column('Status', style='magenta')
|
||||
table.add_column('Repository', style='blue')
|
||||
table.add_column('Last Updated', style='yellow')
|
||||
table.add_column('Created', style='yellow')
|
||||
|
||||
for convo in conversations:
|
||||
# Format dates
|
||||
created_at = datetime.fromisoformat(convo['created_at'].replace('Z', '+00:00'))
|
||||
last_updated_at = datetime.fromisoformat(
|
||||
convo['last_updated_at'].replace('Z', '+00:00')
|
||||
)
|
||||
|
||||
created_str = created_at.strftime('%Y-%m-%d %H:%M:%S')
|
||||
updated_str = last_updated_at.strftime('%Y-%m-%d %H:%M:%S')
|
||||
|
||||
# Add row to table
|
||||
table.add_row(
|
||||
convo['conversation_id'],
|
||||
convo['title'],
|
||||
convo['status'],
|
||||
convo.get('selected_repository', ''),
|
||||
updated_str,
|
||||
created_str,
|
||||
)
|
||||
|
||||
client.console.print(table)
|
||||
|
||||
|
||||
async def create_conversation_cmd(client: TeamClient, args: argparse.Namespace) -> None:
|
||||
"""Create a conversation command.
|
||||
|
||||
Args:
|
||||
client: The TeamClient instance.
|
||||
args: Command line arguments.
|
||||
"""
|
||||
initial_message = args.message
|
||||
|
||||
# If no message provided, prompt for one
|
||||
if not initial_message:
|
||||
print_formatted_text(HTML('<green>Enter your initial message:</green>'))
|
||||
initial_message = input('> ')
|
||||
|
||||
try:
|
||||
conversation_id = await client.create_conversation(
|
||||
repository=args.repository,
|
||||
git_provider=args.git_provider,
|
||||
selected_branch=args.branch,
|
||||
initial_user_msg=initial_message,
|
||||
conversation_instructions=args.instructions,
|
||||
)
|
||||
|
||||
print_formatted_text(
|
||||
HTML(f'<green>Conversation created with ID: {conversation_id}</green>')
|
||||
)
|
||||
|
||||
if args.join:
|
||||
await join_conversation_cmd(
|
||||
client, argparse.Namespace(conversation_id=conversation_id)
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
print_formatted_text(HTML(f'<red>Error creating conversation: {str(e)}</red>'))
|
||||
|
||||
|
||||
async def join_conversation_cmd(client: TeamClient, args: argparse.Namespace) -> None:
|
||||
"""Join a conversation command.
|
||||
|
||||
Args:
|
||||
client: The TeamClient instance.
|
||||
args: Command line arguments.
|
||||
"""
|
||||
conversation_id = args.conversation_id
|
||||
|
||||
try:
|
||||
# Get conversation details
|
||||
conversation = await client.get_conversation(conversation_id)
|
||||
|
||||
# Clear screen and show banner
|
||||
clear()
|
||||
display_banner(session_id=conversation_id)
|
||||
|
||||
# Show conversation title
|
||||
title = conversation.get('title', 'Untitled Conversation')
|
||||
display_welcome_message(f'Joined conversation: {title}')
|
||||
|
||||
# Connect to the WebSocket
|
||||
await client.connect_to_conversation(conversation_id)
|
||||
|
||||
# Main conversation loop
|
||||
try:
|
||||
while True:
|
||||
next_message = await read_prompt_input(
|
||||
AgentState.AWAITING_USER_INPUT.value
|
||||
)
|
||||
|
||||
if not next_message.strip():
|
||||
continue
|
||||
|
||||
if next_message.lower() in ['exit', 'quit', '/exit', '/quit']:
|
||||
break
|
||||
|
||||
await client.send_message(next_message)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print('\nDisconnecting...')
|
||||
finally:
|
||||
await client.disconnect()
|
||||
|
||||
except Exception as e:
|
||||
print_formatted_text(HTML(f'<red>Error joining conversation: {str(e)}</red>'))
|
||||
|
||||
|
||||
def get_base_url() -> str:
|
||||
"""Get the base URL for the OpenHands API.
|
||||
|
||||
Returns:
|
||||
The base URL.
|
||||
"""
|
||||
# Check environment variables first
|
||||
base_url = os.environ.get('OPENHANDS_API_URL')
|
||||
if base_url:
|
||||
return base_url
|
||||
|
||||
# Default to staging server
|
||||
return 'https://staging.all-hands.dev'
|
||||
|
||||
|
||||
def get_api_key() -> Optional[str]:
|
||||
"""Get the API key for authentication.
|
||||
|
||||
Returns:
|
||||
The API key, or None if not found.
|
||||
"""
|
||||
return os.environ.get('OPENHANDS_API_KEY')
|
||||
|
||||
|
||||
def setup_parser() -> argparse.ArgumentParser:
|
||||
"""Set up the argument parser for the team CLI.
|
||||
|
||||
Returns:
|
||||
The argument parser.
|
||||
"""
|
||||
parser = argparse.ArgumentParser(description='OpenHands Team CLI')
|
||||
parser.formatter_class = argparse.ArgumentDefaultsHelpFormatter
|
||||
|
||||
# Server configuration
|
||||
parser.add_argument(
|
||||
'--url',
|
||||
help='OpenHands API URL (default: $OPENHANDS_API_URL or https://staging.all-hands.dev)',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--api-key', help='OpenHands API key (default: $OPENHANDS_API_KEY)'
|
||||
)
|
||||
|
||||
subparsers = parser.add_subparsers(dest='command', help='Command to run')
|
||||
|
||||
# List conversations command
|
||||
list_parser = subparsers.add_parser(
|
||||
'list',
|
||||
help='List conversations',
|
||||
description='List all available conversations',
|
||||
)
|
||||
list_parser.add_argument(
|
||||
'-l',
|
||||
'--limit',
|
||||
type=int,
|
||||
default=20,
|
||||
help='Maximum number of conversations to list',
|
||||
)
|
||||
# Add help formatter
|
||||
list_parser.formatter_class = argparse.ArgumentDefaultsHelpFormatter
|
||||
|
||||
# Create conversation command
|
||||
create_parser = subparsers.add_parser(
|
||||
'create',
|
||||
help='Create a new conversation',
|
||||
description='Create a new conversation with optional repository and message',
|
||||
)
|
||||
create_parser.add_argument(
|
||||
'-r', '--repository', help='Repository name (owner/repo)'
|
||||
)
|
||||
create_parser.add_argument(
|
||||
'-g', '--git-provider', help='Git provider (github or gitlab)'
|
||||
)
|
||||
create_parser.add_argument('-b', '--branch', help='Branch name')
|
||||
create_parser.add_argument('-m', '--message', help='Initial user message')
|
||||
create_parser.add_argument('-i', '--instructions', help='Conversation instructions')
|
||||
create_parser.add_argument(
|
||||
'-j', '--join', action='store_true', help='Join the conversation after creation'
|
||||
)
|
||||
# Add help formatter
|
||||
create_parser.formatter_class = argparse.ArgumentDefaultsHelpFormatter
|
||||
|
||||
# Join conversation command
|
||||
join_parser = subparsers.add_parser(
|
||||
'join',
|
||||
help='Join an existing conversation',
|
||||
description='Join an existing conversation by ID',
|
||||
)
|
||||
join_parser.add_argument('conversation_id', help='Conversation ID')
|
||||
# Add help formatter
|
||||
join_parser.formatter_class = argparse.ArgumentDefaultsHelpFormatter
|
||||
|
||||
return parser
|
||||
|
||||
|
||||
async def main_async(args: argparse.Namespace) -> None:
|
||||
"""Main async function for the team CLI.
|
||||
|
||||
Args:
|
||||
args: Command line arguments.
|
||||
"""
|
||||
# Get base URL and API key
|
||||
base_url = args.url or get_base_url()
|
||||
api_key = args.api_key or get_api_key()
|
||||
|
||||
# Create client
|
||||
client = TeamClient(base_url, api_key)
|
||||
|
||||
# Run command
|
||||
if args.command == 'list':
|
||||
await list_conversations_cmd(client, args)
|
||||
elif args.command == 'create':
|
||||
await create_conversation_cmd(client, args)
|
||||
elif args.command == 'join':
|
||||
await join_conversation_cmd(client, args)
|
||||
else:
|
||||
print('No command specified. Use --help for usage information.')
|
||||
|
||||
|
||||
def main(args: Optional[list[str]] = None) -> None:
|
||||
"""Main function for the team CLI.
|
||||
|
||||
Args:
|
||||
args: Command line arguments.
|
||||
"""
|
||||
parser = setup_parser()
|
||||
|
||||
# If no arguments provided, show help
|
||||
if not args or len(args) == 0:
|
||||
parser.print_help()
|
||||
return
|
||||
|
||||
# Special case for subcommand help
|
||||
if (
|
||||
len(args) >= 2
|
||||
and args[0] in ['list', 'create', 'join']
|
||||
and args[1] in ['-h', '--help']
|
||||
):
|
||||
# Create a new parser just for this subcommand
|
||||
if args[0] == 'list':
|
||||
subparser = argparse.ArgumentParser(
|
||||
description='List all available conversations',
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
||||
)
|
||||
subparser.add_argument(
|
||||
'-l',
|
||||
'--limit',
|
||||
type=int,
|
||||
default=20,
|
||||
help='Maximum number of conversations to list',
|
||||
)
|
||||
subparser.add_argument(
|
||||
'--url',
|
||||
help='OpenHands API URL (default: $OPENHANDS_API_URL or https://staging.all-hands.dev)',
|
||||
)
|
||||
subparser.add_argument(
|
||||
'--api-key',
|
||||
help='OpenHands API key (default: $OPENHANDS_API_KEY)',
|
||||
)
|
||||
subparser.print_help()
|
||||
return
|
||||
elif args[0] == 'create':
|
||||
subparser = argparse.ArgumentParser(
|
||||
description='Create a new conversation with optional repository and message',
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
||||
)
|
||||
subparser.add_argument(
|
||||
'-r',
|
||||
'--repository',
|
||||
help='Repository name (owner/repo)',
|
||||
)
|
||||
subparser.add_argument(
|
||||
'-g',
|
||||
'--git-provider',
|
||||
help='Git provider (github or gitlab)',
|
||||
)
|
||||
subparser.add_argument('-b', '--branch', help='Branch name')
|
||||
subparser.add_argument('-m', '--message', help='Initial user message')
|
||||
subparser.add_argument(
|
||||
'-i',
|
||||
'--instructions',
|
||||
help='Conversation instructions',
|
||||
)
|
||||
subparser.add_argument(
|
||||
'-j',
|
||||
'--join',
|
||||
action='store_true',
|
||||
help='Join the conversation after creation',
|
||||
)
|
||||
subparser.add_argument(
|
||||
'--url',
|
||||
help='OpenHands API URL (default: $OPENHANDS_API_URL or https://staging.all-hands.dev)',
|
||||
)
|
||||
subparser.add_argument(
|
||||
'--api-key',
|
||||
help='OpenHands API key (default: $OPENHANDS_API_KEY)',
|
||||
)
|
||||
subparser.print_help()
|
||||
return
|
||||
elif args[0] == 'join':
|
||||
subparser = argparse.ArgumentParser(
|
||||
description='Join an existing conversation by ID',
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
||||
)
|
||||
subparser.add_argument('conversation_id', help='Conversation ID')
|
||||
subparser.add_argument(
|
||||
'--url',
|
||||
help='OpenHands API URL (default: $OPENHANDS_API_URL or https://staging.all-hands.dev)',
|
||||
)
|
||||
subparser.add_argument(
|
||||
'--api-key',
|
||||
help='OpenHands API key (default: $OPENHANDS_API_KEY)',
|
||||
)
|
||||
subparser.print_help()
|
||||
return
|
||||
|
||||
try:
|
||||
parsed_args = parser.parse_args(args)
|
||||
|
||||
# If no command specified, show help
|
||||
if not parsed_args.command:
|
||||
parser.print_help()
|
||||
return
|
||||
|
||||
# Run the command
|
||||
asyncio.run(main_async(parsed_args))
|
||||
except KeyboardInterrupt:
|
||||
print('\nOperation cancelled by user.')
|
||||
except Exception as e:
|
||||
print(f'Error: {str(e)}')
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
7
openhands/cli/team_cli.sh
Executable file
7
openhands/cli/team_cli.sh
Executable file
@@ -0,0 +1,7 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Get the Python executable
|
||||
PYTHON_EXE=$(which python)
|
||||
|
||||
# Run the team CLI
|
||||
$PYTHON_EXE -m openhands.cli.team "$@"
|
||||
76
openhands/cli/team_create.py
Normal file
76
openhands/cli/team_create.py
Normal file
@@ -0,0 +1,76 @@
|
||||
"""Create conversation command for the OpenHands Team CLI."""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
from typing import Optional
|
||||
|
||||
from openhands.cli.team import TeamClient
|
||||
|
||||
|
||||
def setup_parser() -> argparse.ArgumentParser:
|
||||
"""Set up the argument parser for the create command.
|
||||
|
||||
Returns:
|
||||
The argument parser.
|
||||
"""
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Create a new conversation with optional repository and message',
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
||||
)
|
||||
parser.add_argument('-r', '--repository', help='Repository name (owner/repo)')
|
||||
parser.add_argument('-g', '--git-provider', help='Git provider (github or gitlab)')
|
||||
parser.add_argument('-b', '--branch', help='Branch name')
|
||||
parser.add_argument('-m', '--message', help='Initial user message')
|
||||
parser.add_argument('-i', '--instructions', help='Conversation instructions')
|
||||
parser.add_argument(
|
||||
'-j', '--join', action='store_true', help='Join the conversation after creation'
|
||||
)
|
||||
parser.add_argument(
|
||||
'--url',
|
||||
help='OpenHands API URL (default: $OPENHANDS_API_URL or http://localhost:3000)',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--api-key', help='OpenHands API key (default: $OPENHANDS_API_KEY)'
|
||||
)
|
||||
return parser
|
||||
|
||||
|
||||
async def create_conversation(args: argparse.Namespace) -> None:
|
||||
"""Create a conversation command.
|
||||
|
||||
Args:
|
||||
args: Command line arguments.
|
||||
"""
|
||||
# Create client
|
||||
client = TeamClient(args.url, args.api_key)
|
||||
|
||||
try:
|
||||
# Create conversation
|
||||
await client.create_conversation(
|
||||
repository=args.repository,
|
||||
git_provider=args.git_provider,
|
||||
selected_branch=args.branch,
|
||||
initial_user_msg=args.message,
|
||||
conversation_instructions=args.instructions,
|
||||
)
|
||||
except Exception as e:
|
||||
print(f'Error creating conversation: {e}')
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def main(args: Optional[list[str]] = None) -> None:
|
||||
"""Main function for the create command.
|
||||
|
||||
Args:
|
||||
args: Command line arguments.
|
||||
"""
|
||||
parser = setup_parser()
|
||||
parsed_args = parser.parse_args(args)
|
||||
|
||||
import asyncio
|
||||
|
||||
asyncio.run(create_conversation(parsed_args))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
63
openhands/cli/team_join.py
Normal file
63
openhands/cli/team_join.py
Normal file
@@ -0,0 +1,63 @@
|
||||
"""Join conversation command for the OpenHands Team CLI."""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
from typing import Optional
|
||||
|
||||
from openhands.cli.team import TeamClient, join_conversation_cmd
|
||||
|
||||
|
||||
def setup_parser() -> argparse.ArgumentParser:
|
||||
"""Set up the argument parser for the join command.
|
||||
|
||||
Returns:
|
||||
The argument parser.
|
||||
"""
|
||||
parser = argparse.ArgumentParser(
|
||||
description='Join an existing conversation by ID',
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
||||
)
|
||||
parser.add_argument('conversation_id', help='Conversation ID')
|
||||
parser.add_argument(
|
||||
'--url',
|
||||
help='OpenHands API URL (default: $OPENHANDS_API_URL or http://localhost:3000)',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--api-key', help='OpenHands API key (default: $OPENHANDS_API_KEY)'
|
||||
)
|
||||
return parser
|
||||
|
||||
|
||||
async def join_conversation(args: argparse.Namespace) -> None:
|
||||
"""Join a conversation command.
|
||||
|
||||
Args:
|
||||
args: Command line arguments.
|
||||
"""
|
||||
# Create client
|
||||
client = TeamClient(args.url, args.api_key)
|
||||
|
||||
try:
|
||||
# Join conversation
|
||||
await join_conversation_cmd(client, args)
|
||||
except Exception as e:
|
||||
print(f'Error joining conversation: {e}')
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def main(args: Optional[list[str]] = None) -> None:
|
||||
"""Main function for the join command.
|
||||
|
||||
Args:
|
||||
args: Command line arguments.
|
||||
"""
|
||||
parser = setup_parser()
|
||||
parsed_args = parser.parse_args(args)
|
||||
|
||||
import asyncio
|
||||
|
||||
asyncio.run(join_conversation(parsed_args))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
69
openhands/cli/team_list.py
Normal file
69
openhands/cli/team_list.py
Normal file
@@ -0,0 +1,69 @@
|
||||
"""List conversations command for the OpenHands Team CLI."""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
from typing import Optional
|
||||
|
||||
from openhands.cli.team import TeamClient
|
||||
|
||||
|
||||
def setup_parser() -> argparse.ArgumentParser:
|
||||
"""Set up the argument parser for the list command.
|
||||
|
||||
Returns:
|
||||
The argument parser.
|
||||
"""
|
||||
parser = argparse.ArgumentParser(
|
||||
description='List all available conversations',
|
||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
||||
)
|
||||
parser.add_argument(
|
||||
'-l',
|
||||
'--limit',
|
||||
type=int,
|
||||
default=20,
|
||||
help='Maximum number of conversations to list',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--url',
|
||||
help='OpenHands API URL (default: $OPENHANDS_API_URL or http://localhost:3000)',
|
||||
)
|
||||
parser.add_argument(
|
||||
'--api-key', help='OpenHands API key (default: $OPENHANDS_API_KEY)'
|
||||
)
|
||||
return parser
|
||||
|
||||
|
||||
async def list_conversations(args: argparse.Namespace) -> None:
|
||||
"""List conversations command.
|
||||
|
||||
Args:
|
||||
args: Command line arguments.
|
||||
"""
|
||||
# Create client
|
||||
client = TeamClient(args.url, args.api_key)
|
||||
|
||||
try:
|
||||
# List conversations
|
||||
await client.list_conversations(limit=args.limit)
|
||||
except Exception as e:
|
||||
print(f'Error listing conversations: {e}')
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def main(args: Optional[list[str]] = None) -> None:
|
||||
"""Main function for the list command.
|
||||
|
||||
Args:
|
||||
args: Command line arguments.
|
||||
"""
|
||||
parser = setup_parser()
|
||||
parsed_args = parser.parse_args(args)
|
||||
|
||||
import asyncio
|
||||
|
||||
asyncio.run(list_conversations(parsed_args))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
||||
@@ -744,13 +744,26 @@ def get_parser() -> argparse.ArgumentParser:
|
||||
type=bool,
|
||||
default=False,
|
||||
)
|
||||
# Add team subcommand
|
||||
subparsers = parser.add_subparsers(dest='command')
|
||||
subparsers.add_parser(
|
||||
'team', help='Use team mode to interact with the OpenHands API'
|
||||
)
|
||||
# We'll handle the team subcommands separately
|
||||
return parser
|
||||
|
||||
|
||||
def parse_arguments() -> argparse.Namespace:
|
||||
"""Parse command line arguments."""
|
||||
parser = get_parser()
|
||||
args = parser.parse_args()
|
||||
|
||||
# Check if 'team' command is present
|
||||
if len(sys.argv) > 1 and sys.argv[1] == 'team':
|
||||
# Only parse known arguments, ignoring any team-specific arguments
|
||||
args, _ = parser.parse_known_args()
|
||||
else:
|
||||
# Parse all arguments normally
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.version:
|
||||
print(f'OpenHands version: {__version__}')
|
||||
|
||||
@@ -1,15 +0,0 @@
|
||||
from openhands.core.config.openhands_config import OpenHandsConfig
|
||||
from openhands.core.plugins.plugin_schema import PluginSchema
|
||||
from openhands.storage.settings.settings_store import SettingsStore
|
||||
from openhands.storage.settings.file_settings_store import FileSettingsStore
|
||||
|
||||
|
||||
class OpenHandsPlugin(PluginSchema):
|
||||
"""Base class for OpenHands plugins."""
|
||||
def __init__(self, config: OpenHandsConfig):
|
||||
self.config = config
|
||||
|
||||
async def get_settings_store(self) -> type[SettingsStore]:
|
||||
"""Get the settings store implementation."""
|
||||
return await FileSettingsStore.get_instance(self.config)
|
||||
|
||||
@@ -1,25 +0,0 @@
|
||||
from openhands.core.plugins.openhands_plugin import OpenHandsPlugin
|
||||
from openhands.core.plugins.plugin_schema import MissingPluginException, PluginSchema
|
||||
from openhands.storage.settings.settings_store import SettingsStore
|
||||
|
||||
|
||||
class PluginRegistry:
|
||||
_instance = None
|
||||
_plugins: list[type[PluginSchema]] = []
|
||||
|
||||
@classmethod
|
||||
def get_instance(cls):
|
||||
if cls._instance is None:
|
||||
cls._instance = cls()
|
||||
return cls._instance
|
||||
|
||||
def register_plugin(self, plugin: type[PluginSchema]):
|
||||
self._plugins.append(plugin)
|
||||
|
||||
async def get_settings_store(self) -> type[SettingsStore]:
|
||||
# Return the last registered implementation or default
|
||||
for plugin in reversed(self._plugins):
|
||||
store = await plugin.get_settings_store()
|
||||
return store
|
||||
|
||||
raise MissingPluginException('Did not find plugin for settings store')
|
||||
@@ -1,13 +0,0 @@
|
||||
from abc import abstractmethod
|
||||
from openhands.storage.settings.settings_store import SettingsStore
|
||||
|
||||
|
||||
class MissingPluginException(Exception):
|
||||
"""Raised when no plugin was found for the plugin registry."""
|
||||
|
||||
|
||||
class PluginSchema:
|
||||
@abstractmethod
|
||||
async def get_settings_store(self) -> type[SettingsStore]:
|
||||
"""Get the settings store implementation."""
|
||||
...
|
||||
@@ -1,12 +1,13 @@
|
||||
import asyncio
|
||||
import datetime
|
||||
from contextlib import AsyncExitStack
|
||||
from typing import Optional
|
||||
|
||||
from fastmcp import Client
|
||||
from fastmcp.client.transports import SSETransport, StreamableHttpTransport
|
||||
from mcp import McpError
|
||||
from mcp.types import CallToolResult
|
||||
from mcp import ClientSession
|
||||
from mcp.client.sse import sse_client
|
||||
from mcp.client.streamable_http import streamablehttp_client
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
from openhands.core.config.mcp_config import MCPSHTTPServerConfig, MCPSSEServerConfig
|
||||
from openhands.core.logger import openhands_logger as logger
|
||||
from openhands.mcp.tool import MCPClientTool
|
||||
|
||||
@@ -16,7 +17,8 @@ class MCPClient(BaseModel):
|
||||
A collection of tools that connects to an MCP server and manages available tools through the Model Context Protocol.
|
||||
"""
|
||||
|
||||
client: Optional[Client] = None
|
||||
session: Optional[ClientSession] = None
|
||||
exit_stack: AsyncExitStack = AsyncExitStack()
|
||||
description: str = 'MCP client tools for server interaction'
|
||||
tools: list[MCPClientTool] = Field(default_factory=list)
|
||||
tool_map: dict[str, MCPClientTool] = Field(default_factory=dict)
|
||||
@@ -24,87 +26,189 @@ class MCPClient(BaseModel):
|
||||
class Config:
|
||||
arbitrary_types_allowed = True
|
||||
|
||||
async def connect_sse(
|
||||
self,
|
||||
server_url: str,
|
||||
api_key: str | None = None,
|
||||
conversation_id: str | None = None,
|
||||
timeout: float = 30.0,
|
||||
) -> None:
|
||||
"""Connect to an MCP server using SSE transport.
|
||||
|
||||
Args:
|
||||
server_url: The URL of the SSE server to connect to.
|
||||
timeout: Connection timeout in seconds. Default is 30 seconds.
|
||||
"""
|
||||
if not server_url:
|
||||
raise ValueError('Server URL is required.')
|
||||
if self.session:
|
||||
await self.disconnect()
|
||||
|
||||
try:
|
||||
# Use asyncio.wait_for to enforce the timeout
|
||||
async def connect_with_timeout():
|
||||
headers = (
|
||||
{
|
||||
'Authorization': f'Bearer {api_key}',
|
||||
's': api_key, # We need this for action execution server's MCP Router
|
||||
'X-Session-API-Key': api_key, # We need this for Remote Runtime
|
||||
}
|
||||
if api_key
|
||||
else {}
|
||||
)
|
||||
|
||||
if conversation_id:
|
||||
headers['X-OpenHands-Conversation-ID'] = conversation_id
|
||||
|
||||
# Convert float timeout to datetime.timedelta for consistency
|
||||
timeout_delta = datetime.timedelta(seconds=timeout)
|
||||
|
||||
streams_context = sse_client(
|
||||
url=server_url,
|
||||
headers=headers if headers else None,
|
||||
timeout=timeout,
|
||||
)
|
||||
streams = await self.exit_stack.enter_async_context(streams_context)
|
||||
# For SSE client, we only get read_stream and write_stream (2 values)
|
||||
read_stream, write_stream = streams
|
||||
self.session = await self.exit_stack.enter_async_context(
|
||||
ClientSession(
|
||||
read_stream, write_stream, read_timeout_seconds=timeout_delta
|
||||
)
|
||||
)
|
||||
await self._initialize_and_list_tools()
|
||||
|
||||
# Apply timeout to the entire connection process
|
||||
await asyncio.wait_for(connect_with_timeout(), timeout=timeout)
|
||||
except asyncio.TimeoutError:
|
||||
logger.error(
|
||||
f'Connection to {server_url} timed out after {timeout} seconds'
|
||||
)
|
||||
await self.disconnect() # Clean up resources
|
||||
raise # Re-raise the TimeoutError
|
||||
except Exception as e:
|
||||
logger.error(f'Error connecting to {server_url}: {str(e)}')
|
||||
await self.disconnect() # Clean up resources
|
||||
raise
|
||||
|
||||
async def _initialize_and_list_tools(self) -> None:
|
||||
"""Initialize session and populate tool map."""
|
||||
if not self.client:
|
||||
if not self.session:
|
||||
raise RuntimeError('Session not initialized.')
|
||||
|
||||
async with self.client:
|
||||
tools = await self.client.list_tools()
|
||||
await self.session.initialize()
|
||||
response = await self.session.list_tools()
|
||||
|
||||
# Clear existing tools
|
||||
self.tools = []
|
||||
|
||||
# Create proper tool objects for each server tool
|
||||
for tool in tools:
|
||||
for tool in response.tools:
|
||||
server_tool = MCPClientTool(
|
||||
name=tool.name,
|
||||
description=tool.description,
|
||||
inputSchema=tool.inputSchema,
|
||||
session=self.client,
|
||||
session=self.session,
|
||||
)
|
||||
self.tool_map[tool.name] = server_tool
|
||||
self.tools.append(server_tool)
|
||||
|
||||
logger.info(f'Connected to server with tools: {[tool.name for tool in tools]}')
|
||||
logger.info(
|
||||
f'Connected to server with tools: {[tool.name for tool in response.tools]}'
|
||||
)
|
||||
|
||||
async def connect_http(
|
||||
self,
|
||||
server: MCPSSEServerConfig | MCPSHTTPServerConfig,
|
||||
conversation_id: str | None = None,
|
||||
timeout: float = 30.0,
|
||||
):
|
||||
"""Connect to MCP server using SHTTP or SSE transport"""
|
||||
server_url = server.url
|
||||
api_key = server.api_key
|
||||
|
||||
if not server_url:
|
||||
raise ValueError('Server URL is required.')
|
||||
|
||||
try:
|
||||
headers = (
|
||||
{
|
||||
'Authorization': f'Bearer {api_key}',
|
||||
's': api_key, # We need this for action execution server's MCP Router
|
||||
'X-Session-API-Key': api_key, # We need this for Remote Runtime
|
||||
}
|
||||
if api_key
|
||||
else {}
|
||||
)
|
||||
|
||||
if conversation_id:
|
||||
headers['X-OpenHands-Conversation-ID'] = conversation_id
|
||||
|
||||
# Instantiate custom transports due to custom headers
|
||||
if isinstance(server, MCPSHTTPServerConfig):
|
||||
transport = StreamableHttpTransport(
|
||||
url=server_url,
|
||||
headers=headers if headers else None,
|
||||
)
|
||||
else:
|
||||
transport = SSETransport(
|
||||
url=server_url,
|
||||
headers=headers if headers else None,
|
||||
)
|
||||
|
||||
self.client = Client(transport, timeout=timeout)
|
||||
|
||||
await self._initialize_and_list_tools()
|
||||
except McpError as e:
|
||||
logger.error(f'McpError connecting to {server_url}: {e}')
|
||||
raise # Re-raise the error
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f'Error connecting to {server_url}: {e}')
|
||||
raise
|
||||
|
||||
async def call_tool(self, tool_name: str, args: dict) -> CallToolResult:
|
||||
async def call_tool(self, tool_name: str, args: dict):
|
||||
"""Call a tool on the MCP server."""
|
||||
if tool_name not in self.tool_map:
|
||||
raise ValueError(f'Tool {tool_name} not found.')
|
||||
# The MCPClientTool is primarily for metadata; use the session to call the actual tool.
|
||||
if not self.client:
|
||||
if not self.session:
|
||||
raise RuntimeError('Client session is not available.')
|
||||
return await self.session.call_tool(name=tool_name, arguments=args)
|
||||
|
||||
async with self.client:
|
||||
return await self.client.call_tool_mcp(name=tool_name, arguments=args)
|
||||
async def connect_shttp(
|
||||
self,
|
||||
server_url: str,
|
||||
api_key: str | None = None,
|
||||
conversation_id: str | None = None,
|
||||
timeout: float = 30.0,
|
||||
) -> None:
|
||||
"""Connect to an MCP server using StreamableHTTP transport.
|
||||
|
||||
Args:
|
||||
server_url: The URL of the StreamableHTTP server to connect to.
|
||||
api_key: Optional API key for authentication.
|
||||
conversation_id: Optional conversation ID for session tracking.
|
||||
timeout: Connection timeout in seconds. Default is 30 seconds.
|
||||
"""
|
||||
if not server_url:
|
||||
raise ValueError('Server URL is required.')
|
||||
if self.session:
|
||||
await self.disconnect()
|
||||
|
||||
try:
|
||||
# Use asyncio.wait_for to enforce the timeout
|
||||
async def connect_with_timeout():
|
||||
headers = (
|
||||
{
|
||||
'Authorization': f'Bearer {api_key}',
|
||||
's': api_key, # We need this for action execution server's MCP Router
|
||||
'X-Session-API-Key': api_key, # We need this for Remote Runtime
|
||||
}
|
||||
if api_key
|
||||
else {}
|
||||
)
|
||||
|
||||
if conversation_id:
|
||||
headers['X-OpenHands-Conversation-ID'] = conversation_id
|
||||
|
||||
# Convert float timeout to datetime.timedelta
|
||||
timeout_delta = datetime.timedelta(seconds=timeout)
|
||||
sse_read_timeout_delta = datetime.timedelta(
|
||||
seconds=timeout * 10
|
||||
) # 10x longer for read timeout
|
||||
|
||||
streams_context = streamablehttp_client(
|
||||
url=server_url,
|
||||
headers=headers if headers else None,
|
||||
timeout=timeout_delta,
|
||||
sse_read_timeout=sse_read_timeout_delta,
|
||||
)
|
||||
streams = await self.exit_stack.enter_async_context(streams_context)
|
||||
# For StreamableHTTP client, we get read_stream, write_stream, and get_session_id (3 values)
|
||||
read_stream, write_stream, _ = streams
|
||||
self.session = await self.exit_stack.enter_async_context(
|
||||
ClientSession(
|
||||
read_stream, write_stream, read_timeout_seconds=timeout_delta
|
||||
)
|
||||
)
|
||||
await self._initialize_and_list_tools()
|
||||
|
||||
# Apply timeout to the entire connection process
|
||||
await asyncio.wait_for(connect_with_timeout(), timeout=timeout)
|
||||
except asyncio.TimeoutError:
|
||||
logger.error(
|
||||
f'Connection to {server_url} timed out after {timeout} seconds'
|
||||
)
|
||||
await self.disconnect() # Clean up resources
|
||||
raise # Re-raise the TimeoutError
|
||||
except Exception as e:
|
||||
logger.error(f'Error connecting to {server_url}: {str(e)}')
|
||||
await self.disconnect() # Clean up resources
|
||||
raise
|
||||
|
||||
async def disconnect(self) -> None:
|
||||
"""Disconnect from the MCP server and clean up resources."""
|
||||
if self.session:
|
||||
try:
|
||||
# Close the session first
|
||||
if hasattr(self.session, 'close'):
|
||||
await self.session.close()
|
||||
# Then close the exit stack
|
||||
await self.exit_stack.aclose()
|
||||
except Exception as e:
|
||||
logger.error(f'Error during disconnect: {str(e)}')
|
||||
finally:
|
||||
self.session = None
|
||||
self.tools = []
|
||||
logger.info('Disconnected from MCP server')
|
||||
|
||||
@@ -72,22 +72,38 @@ async def create_mcp_clients(
|
||||
mcp_clients = []
|
||||
|
||||
for server in servers:
|
||||
is_shttp = isinstance(server, MCPSHTTPServerConfig)
|
||||
connection_type = 'SHTTP' if is_shttp else 'SSE'
|
||||
is_sse = isinstance(server, MCPSSEServerConfig)
|
||||
connection_type = 'SSE' if is_sse else 'SHTTP'
|
||||
logger.info(
|
||||
f'Initializing MCP agent for {server} with {connection_type} connection...'
|
||||
)
|
||||
client = MCPClient()
|
||||
|
||||
try:
|
||||
await client.connect_http(server, conversation_id=conversation_id)
|
||||
if is_sse:
|
||||
await client.connect_sse(
|
||||
server.url,
|
||||
api_key=server.api_key,
|
||||
conversation_id=conversation_id,
|
||||
)
|
||||
else:
|
||||
await client.connect_shttp(
|
||||
server.url,
|
||||
api_key=server.api_key,
|
||||
conversation_id=conversation_id,
|
||||
)
|
||||
|
||||
# Only add the client to the list after a successful connection
|
||||
mcp_clients.append(client)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f'Failed to connect to {server}: {str(e)}', exc_info=True)
|
||||
|
||||
try:
|
||||
await client.disconnect()
|
||||
except Exception as disconnect_error:
|
||||
logger.error(
|
||||
f'Error during disconnect after failed connection: {str(disconnect_error)}'
|
||||
)
|
||||
return mcp_clients
|
||||
|
||||
|
||||
@@ -127,6 +143,13 @@ async def fetch_mcp_tools_from_config(
|
||||
# Convert tools to the format expected by the agent
|
||||
mcp_tools = convert_mcp_clients_to_tools(mcp_clients)
|
||||
|
||||
# Always disconnect clients to clean up resources
|
||||
for mcp_client in mcp_clients:
|
||||
try:
|
||||
await mcp_client.disconnect()
|
||||
except Exception as disconnect_error:
|
||||
logger.error(f'Error disconnecting MCP client: {str(disconnect_error)}')
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f'Error fetching MCP tools: {str(e)}')
|
||||
return []
|
||||
|
||||
@@ -471,6 +471,11 @@ class ActionExecutionClient(Runtime):
|
||||
# Call the tool and return the result
|
||||
# No need for try/finally since disconnect() is now just resetting state
|
||||
result = await call_tool_mcp_handler(mcp_clients, action)
|
||||
|
||||
# Reset client state (no active connections to worry about)
|
||||
for client in mcp_clients:
|
||||
await client.disconnect()
|
||||
|
||||
return result
|
||||
|
||||
def close(self) -> None:
|
||||
|
||||
@@ -17,7 +17,6 @@ from openhands.events.observation.commands import (
|
||||
CmdOutputMetadata,
|
||||
CmdOutputObservation,
|
||||
)
|
||||
from openhands.runtime.utils.bash_constants import TIMEOUT_MESSAGE_TEMPLATE
|
||||
from openhands.utils.shutdown_listener import should_continue
|
||||
|
||||
|
||||
@@ -380,7 +379,9 @@ class BashSession:
|
||||
metadata = CmdOutputMetadata() # No metadata available
|
||||
metadata.suffix = (
|
||||
f'\n[The command has no new output after {self.NO_CHANGE_TIMEOUT_SECONDS} seconds. '
|
||||
f'{TIMEOUT_MESSAGE_TEMPLATE}]'
|
||||
"You may wait longer to see additional output by sending empty command '', "
|
||||
'send other commands to interact with the current process, '
|
||||
'or send keys to interrupt/kill the command.]'
|
||||
)
|
||||
command_output = self._get_command_output(
|
||||
command,
|
||||
@@ -413,7 +414,9 @@ class BashSession:
|
||||
metadata = CmdOutputMetadata() # No metadata available
|
||||
metadata.suffix = (
|
||||
f'\n[The command timed out after {timeout} seconds. '
|
||||
f'{TIMEOUT_MESSAGE_TEMPLATE}]'
|
||||
"You may wait longer to see additional output by sending empty command '', "
|
||||
'send other commands to interact with the current process, '
|
||||
'or send keys to interrupt/kill the command.]'
|
||||
)
|
||||
command_output = self._get_command_output(
|
||||
command,
|
||||
|
||||
@@ -1,7 +0,0 @@
|
||||
# Common timeout message that can be used across different timeout scenarios
|
||||
TIMEOUT_MESSAGE_TEMPLATE = (
|
||||
"You may wait longer to see additional output by sending empty command '', "
|
||||
'send other commands to interact with the current process, '
|
||||
'send keys to interrupt/kill the command, '
|
||||
'or use the timeout parameter in execute_bash for future commands.'
|
||||
)
|
||||
@@ -20,7 +20,6 @@ from openhands.events.observation.commands import (
|
||||
CmdOutputMetadata,
|
||||
CmdOutputObservation,
|
||||
)
|
||||
from openhands.runtime.utils.bash_constants import TIMEOUT_MESSAGE_TEMPLATE
|
||||
from openhands.utils.shutdown_listener import should_continue
|
||||
|
||||
pythonnet.load('coreclr')
|
||||
@@ -560,7 +559,9 @@ class WindowsPowershellSession:
|
||||
else:
|
||||
metadata.suffix = (
|
||||
f'\n[The command timed out after {timeout_seconds} seconds. '
|
||||
f'{TIMEOUT_MESSAGE_TEMPLATE}]'
|
||||
"You may wait longer to see additional output by sending empty command '', "
|
||||
'send other commands to interact with the current process, '
|
||||
'or send keys to interrupt/kill the command.]'
|
||||
)
|
||||
|
||||
return CmdOutputObservation(
|
||||
@@ -1330,7 +1331,9 @@ class WindowsPowershellSession:
|
||||
# Align suffix with bash.py timeout message
|
||||
suffix = (
|
||||
f'\n[The command timed out after {timeout_seconds} seconds. '
|
||||
f'{TIMEOUT_MESSAGE_TEMPLATE}]'
|
||||
"You may wait longer to see additional output by sending empty command '', "
|
||||
'send other commands to interact with the current process, '
|
||||
'or send keys to interrupt/kill the command.]'
|
||||
)
|
||||
elif shutdown_requested:
|
||||
# Align suffix with bash.py equivalent (though bash.py might not have specific shutdown message)
|
||||
|
||||
@@ -31,7 +31,7 @@ class FileSettingsStore(SettingsStore):
|
||||
|
||||
@classmethod
|
||||
async def get_instance(
|
||||
cls, config: OpenHandsConfig
|
||||
cls, config: OpenHandsConfig, user_id: str | None
|
||||
) -> FileSettingsStore:
|
||||
file_store = file_store = get_file_store(
|
||||
config.file_store,
|
||||
|
||||
@@ -31,6 +31,6 @@ class SettingsStore(ABC):
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
async def get_instance(
|
||||
cls, config: OpenHandsConfig
|
||||
cls, config: OpenHandsConfig, user_id: str | None
|
||||
) -> SettingsStore:
|
||||
"""Get a store for the user represented by the token given."""
|
||||
|
||||
@@ -48,6 +48,7 @@ dirhash = "*"
|
||||
tornado = "*"
|
||||
python-dotenv = "*"
|
||||
rapidfuzz = "^3.9.0"
|
||||
rich = "^13.7.0"
|
||||
whatthepatch = "^1.0.6"
|
||||
protobuf = "^5.0.0,<6.0.0" # Updated to support newer opentelemetry
|
||||
opentelemetry-api = "^1.33.1"
|
||||
|
||||
@@ -16,16 +16,6 @@ from openhands.events.action import CmdRunAction
|
||||
from openhands.events.observation import CmdOutputObservation, ErrorObservation
|
||||
from openhands.runtime.impl.cli.cli_runtime import CLIRuntime
|
||||
from openhands.runtime.impl.local.local_runtime import LocalRuntime
|
||||
from openhands.runtime.utils.bash_constants import TIMEOUT_MESSAGE_TEMPLATE
|
||||
|
||||
|
||||
def get_timeout_suffix(timeout_seconds):
|
||||
"""Helper function to generate the expected timeout suffix."""
|
||||
return (
|
||||
f'[The command timed out after {timeout_seconds} seconds. '
|
||||
f'{TIMEOUT_MESSAGE_TEMPLATE}]'
|
||||
)
|
||||
|
||||
|
||||
# ============================================================================================================================
|
||||
# Bash-specific tests
|
||||
@@ -66,7 +56,10 @@ def test_bash_server(temp_dir, runtime_cls, run_as_openhands):
|
||||
if runtime_cls == CLIRuntime:
|
||||
assert '[The command timed out after 1.0 seconds.]' in obs.metadata.suffix
|
||||
else:
|
||||
assert get_timeout_suffix(1.0) in obs.metadata.suffix
|
||||
assert (
|
||||
"[The command timed out after 1.0 seconds. You may wait longer to see additional output by sending empty command '', send other commands to interact with the current process, or send keys to interrupt/kill the command.]"
|
||||
in obs.metadata.suffix
|
||||
)
|
||||
|
||||
action = CmdRunAction(command='C-c', is_input=True)
|
||||
action.set_hard_timeout(30)
|
||||
|
||||
@@ -589,7 +589,7 @@
|
||||
"working_dir": null,
|
||||
"py_interpreter_path": null,
|
||||
"prefix": "",
|
||||
"suffix": "\n[The command has no new output after 30 seconds. You may wait longer to see additional output by sending empty command '', send other commands to interact with the current process, send keys to interrupt/kill the command, or use the timeout parameter in execute_bash for future commands.]"
|
||||
"suffix": "\n[The command has no new output after 30 seconds. You may wait longer to see additional output by sending empty command '', send other commands to interact with the current process, or send keys to interrupt/kill the command.]"
|
||||
},
|
||||
"hidden": false
|
||||
},
|
||||
|
||||
@@ -5,15 +5,6 @@ import time
|
||||
from openhands.core.logger import openhands_logger as logger
|
||||
from openhands.events.action import CmdRunAction
|
||||
from openhands.runtime.utils.bash import BashCommandStatus, BashSession
|
||||
from openhands.runtime.utils.bash_constants import TIMEOUT_MESSAGE_TEMPLATE
|
||||
|
||||
|
||||
def get_no_change_timeout_suffix(timeout_seconds):
|
||||
"""Helper function to generate the expected no-change timeout suffix."""
|
||||
return (
|
||||
f'\n[The command has no new output after {timeout_seconds} seconds. '
|
||||
f'{TIMEOUT_MESSAGE_TEMPLATE}]'
|
||||
)
|
||||
|
||||
|
||||
def test_session_initialization():
|
||||
@@ -92,7 +83,12 @@ def test_long_running_command_follow_by_execute():
|
||||
assert '1' in obs.content # First number should appear before timeout
|
||||
assert obs.metadata.exit_code == -1 # -1 indicates command is still running
|
||||
assert session.prev_status == BashCommandStatus.NO_CHANGE_TIMEOUT
|
||||
assert obs.metadata.suffix == get_no_change_timeout_suffix(2)
|
||||
assert obs.metadata.suffix == (
|
||||
'\n[The command has no new output after 2 seconds. '
|
||||
"You may wait longer to see additional output by sending empty command '', "
|
||||
'send other commands to interact with the current process, '
|
||||
'or send keys to interrupt/kill the command.]'
|
||||
)
|
||||
assert obs.metadata.prefix == ''
|
||||
|
||||
# Continue watching output
|
||||
@@ -100,7 +96,12 @@ def test_long_running_command_follow_by_execute():
|
||||
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
|
||||
assert '2' in obs.content
|
||||
assert obs.metadata.prefix == '[Below is the output of the previous command.]\n'
|
||||
assert obs.metadata.suffix == get_no_change_timeout_suffix(2)
|
||||
assert obs.metadata.suffix == (
|
||||
'\n[The command has no new output after 2 seconds. '
|
||||
"You may wait longer to see additional output by sending empty command '', "
|
||||
'send other commands to interact with the current process, '
|
||||
'or send keys to interrupt/kill the command.]'
|
||||
)
|
||||
assert obs.metadata.exit_code == -1 # -1 indicates command is still running
|
||||
assert session.prev_status == BashCommandStatus.NO_CHANGE_TIMEOUT
|
||||
|
||||
@@ -141,7 +142,12 @@ def test_interactive_command():
|
||||
assert 'Enter name:' in obs.content
|
||||
assert obs.metadata.exit_code == -1 # -1 indicates command is still running
|
||||
assert session.prev_status == BashCommandStatus.NO_CHANGE_TIMEOUT
|
||||
assert obs.metadata.suffix == get_no_change_timeout_suffix(3)
|
||||
assert obs.metadata.suffix == (
|
||||
'\n[The command has no new output after 3 seconds. '
|
||||
"You may wait longer to see additional output by sending empty command '', "
|
||||
'send other commands to interact with the current process, '
|
||||
'or send keys to interrupt/kill the command.]'
|
||||
)
|
||||
assert obs.metadata.prefix == ''
|
||||
|
||||
# Send input
|
||||
@@ -158,21 +164,36 @@ def test_interactive_command():
|
||||
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
|
||||
assert obs.metadata.exit_code == -1
|
||||
assert session.prev_status == BashCommandStatus.NO_CHANGE_TIMEOUT
|
||||
assert obs.metadata.suffix == get_no_change_timeout_suffix(3)
|
||||
assert obs.metadata.suffix == (
|
||||
'\n[The command has no new output after 3 seconds. '
|
||||
"You may wait longer to see additional output by sending empty command '', "
|
||||
'send other commands to interact with the current process, '
|
||||
'or send keys to interrupt/kill the command.]'
|
||||
)
|
||||
assert obs.metadata.prefix == ''
|
||||
|
||||
obs = session.execute(CmdRunAction('line 1', is_input=True))
|
||||
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
|
||||
assert obs.metadata.exit_code == -1
|
||||
assert session.prev_status == BashCommandStatus.NO_CHANGE_TIMEOUT
|
||||
assert obs.metadata.suffix == get_no_change_timeout_suffix(3)
|
||||
assert obs.metadata.suffix == (
|
||||
'\n[The command has no new output after 3 seconds. '
|
||||
"You may wait longer to see additional output by sending empty command '', "
|
||||
'send other commands to interact with the current process, '
|
||||
'or send keys to interrupt/kill the command.]'
|
||||
)
|
||||
assert obs.metadata.prefix == '[Below is the output of the previous command.]\n'
|
||||
|
||||
obs = session.execute(CmdRunAction('line 2', is_input=True))
|
||||
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
|
||||
assert obs.metadata.exit_code == -1
|
||||
assert session.prev_status == BashCommandStatus.NO_CHANGE_TIMEOUT
|
||||
assert obs.metadata.suffix == get_no_change_timeout_suffix(3)
|
||||
assert obs.metadata.suffix == (
|
||||
'\n[The command has no new output after 3 seconds. '
|
||||
"You may wait longer to see additional output by sending empty command '', "
|
||||
'send other commands to interact with the current process, '
|
||||
'or send keys to interrupt/kill the command.]'
|
||||
)
|
||||
assert obs.metadata.prefix == '[Below is the output of the previous command.]\n'
|
||||
|
||||
obs = session.execute(CmdRunAction('EOF', is_input=True))
|
||||
@@ -195,7 +216,12 @@ def test_ctrl_c():
|
||||
)
|
||||
logger.info(obs, extra={'msg_type': 'OBSERVATION'})
|
||||
assert 'looping' in obs.content
|
||||
assert obs.metadata.suffix == get_no_change_timeout_suffix(2)
|
||||
assert obs.metadata.suffix == (
|
||||
'\n[The command has no new output after 2 seconds. '
|
||||
"You may wait longer to see additional output by sending empty command '', "
|
||||
'send other commands to interact with the current process, '
|
||||
'or send keys to interrupt/kill the command.]'
|
||||
)
|
||||
assert obs.metadata.prefix == ''
|
||||
assert obs.metadata.exit_code == -1 # -1 indicates command is still running
|
||||
assert session.prev_status == BashCommandStatus.NO_CHANGE_TIMEOUT
|
||||
|
||||
@@ -82,7 +82,7 @@ async def test_get_instance():
|
||||
mock_store = MagicMock(spec=FileStore)
|
||||
mock_get_store.return_value = mock_store
|
||||
|
||||
store = await FileSettingsStore.get_instance(config)
|
||||
store = await FileSettingsStore.get_instance(config, None)
|
||||
|
||||
assert isinstance(store, FileSettingsStore)
|
||||
assert store.file_store == mock_store
|
||||
|
||||
49
tests/unit/test_mcp_client_timeout.py
Normal file
49
tests/unit/test_mcp_client_timeout.py
Normal file
@@ -0,0 +1,49 @@
|
||||
import asyncio
|
||||
from contextlib import asynccontextmanager
|
||||
from unittest import mock
|
||||
|
||||
import pytest
|
||||
|
||||
from openhands.mcp.client import MCPClient
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_connect_sse_timeout():
|
||||
"""Test that connect_sse properly times out when server_url is invalid."""
|
||||
client = MCPClient()
|
||||
|
||||
# Create a mock async context manager that simulates a timeout
|
||||
@asynccontextmanager
|
||||
async def mock_slow_context(*args, **kwargs):
|
||||
# This will hang for longer than our timeout
|
||||
await asyncio.sleep(10.0)
|
||||
yield (mock.AsyncMock(), mock.AsyncMock())
|
||||
|
||||
# Patch the sse_client function to return our slow context manager
|
||||
with mock.patch(
|
||||
'openhands.mcp.client.sse_client', return_value=mock_slow_context()
|
||||
):
|
||||
# Test with a very short timeout
|
||||
with pytest.raises(asyncio.TimeoutError):
|
||||
await client.connect_sse('http://example.com', timeout=0.1)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_connect_streamable_http_timeout():
|
||||
"""Test that connect_streamable_http properly times out when server_url is invalid."""
|
||||
client = MCPClient()
|
||||
|
||||
# Create a mock async context manager that simulates a timeout
|
||||
@asynccontextmanager
|
||||
async def mock_slow_context(*args, **kwargs):
|
||||
# This will hang for longer than our timeout
|
||||
await asyncio.sleep(10.0)
|
||||
yield (mock.AsyncMock(), mock.AsyncMock(), mock.AsyncMock())
|
||||
|
||||
# Patch the streamablehttp_client function to return our slow context manager
|
||||
with mock.patch(
|
||||
'openhands.mcp.client.streamablehttp_client', return_value=mock_slow_context()
|
||||
):
|
||||
# Test with a very short timeout
|
||||
with pytest.raises(asyncio.TimeoutError):
|
||||
await client.connect_shttp('http://example.com', timeout=0.1)
|
||||
@@ -2,7 +2,6 @@ import asyncio
|
||||
|
||||
import pytest
|
||||
|
||||
from openhands.core.config.mcp_config import MCPSSEServerConfig
|
||||
from openhands.mcp.client import MCPClient
|
||||
from openhands.mcp.utils import create_mcp_clients
|
||||
|
||||
@@ -11,24 +10,22 @@ from openhands.mcp.utils import create_mcp_clients
|
||||
async def test_create_mcp_clients_timeout_with_invalid_url():
|
||||
"""Test that create_mcp_clients properly times out when given an invalid URL."""
|
||||
# Use a non-existent domain that should cause a connection timeout
|
||||
server = MCPSSEServerConfig(
|
||||
url='http://non-existent-domain-that-will-timeout.invalid'
|
||||
)
|
||||
invalid_url = 'http://non-existent-domain-that-will-timeout.invalid'
|
||||
|
||||
# Temporarily modify the default timeout for the MCPClient.connect_http method
|
||||
original_connect_connect_http = MCPClient.connect_http
|
||||
# Temporarily modify the default timeout for the MCPClient.connect_sse method
|
||||
original_connect_sse = MCPClient.connect_sse
|
||||
|
||||
# Create a wrapper that calls the original method but with a shorter timeout
|
||||
async def connect_http_with_short_timeout(self, server_url, timeout=30.0):
|
||||
return await original_connect_connect_http(self, server_url, timeout=0.5)
|
||||
async def connect_sse_with_short_timeout(self, server_url, timeout=30.0):
|
||||
return await original_connect_sse(self, server_url, timeout=0.5)
|
||||
|
||||
try:
|
||||
# Replace the method with our wrapper
|
||||
MCPClient.connect_http = connect_http_with_short_timeout
|
||||
MCPClient.connect_sse = connect_sse_with_short_timeout
|
||||
|
||||
# Call create_mcp_clients with the invalid URL
|
||||
start_time = asyncio.get_event_loop().time()
|
||||
clients = await create_mcp_clients([server], [])
|
||||
clients = await create_mcp_clients([invalid_url], [])
|
||||
end_time = asyncio.get_event_loop().time()
|
||||
|
||||
# Verify that no clients were successfully connected
|
||||
@@ -41,7 +38,7 @@ async def test_create_mcp_clients_timeout_with_invalid_url():
|
||||
)
|
||||
finally:
|
||||
# Restore the original method
|
||||
MCPClient.connect_http = original_connect_connect_http
|
||||
MCPClient.connect_sse = original_connect_sse
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@@ -51,16 +48,16 @@ async def test_create_mcp_clients_with_unreachable_host():
|
||||
# This IP is in the TEST-NET-1 range (192.0.2.0/24) reserved for documentation and examples
|
||||
unreachable_url = 'http://192.0.2.1:8080'
|
||||
|
||||
# Temporarily modify the default timeout for the MCPClient.connect_http method
|
||||
original_connect_http = MCPClient.connect_http
|
||||
# Temporarily modify the default timeout for the MCPClient.connect_sse method
|
||||
original_connect_sse = MCPClient.connect_sse
|
||||
|
||||
# Create a wrapper that calls the original method but with a shorter timeout
|
||||
async def connect_http_with_short_timeout(self, server_url, timeout=30.0):
|
||||
return await original_connect_http(self, server_url, timeout=1.0)
|
||||
async def connect_sse_with_short_timeout(self, server_url, timeout=30.0):
|
||||
return await original_connect_sse(self, server_url, timeout=1.0)
|
||||
|
||||
try:
|
||||
# Replace the method with our wrapper
|
||||
MCPClient.connect_http = connect_http_with_short_timeout
|
||||
MCPClient.connect_sse = connect_sse_with_short_timeout
|
||||
|
||||
# Call create_mcp_clients with the unreachable URL
|
||||
start_time = asyncio.get_event_loop().time()
|
||||
@@ -76,4 +73,4 @@ async def test_create_mcp_clients_with_unreachable_host():
|
||||
)
|
||||
finally:
|
||||
# Restore the original method
|
||||
MCPClient.connect_http = original_connect_http
|
||||
MCPClient.connect_sse = original_connect_sse
|
||||
|
||||
@@ -13,12 +13,12 @@ async def test_sse_connection_timeout():
|
||||
# Create a mock MCPClient
|
||||
mock_client = mock.MagicMock(spec=MCPClient)
|
||||
|
||||
# Configure the mock to raise a TimeoutError when connect_http is called
|
||||
async def mock_connect_http(*args, **kwargs):
|
||||
# Configure the mock to raise a TimeoutError when connect_sse is called
|
||||
async def mock_connect_sse(*args, **kwargs):
|
||||
await asyncio.sleep(0.1) # Simulate some delay
|
||||
raise asyncio.TimeoutError('Connection timed out')
|
||||
|
||||
mock_client.connect_http.side_effect = mock_connect_http
|
||||
mock_client.connect_sse.side_effect = mock_connect_sse
|
||||
mock_client.disconnect = mock.AsyncMock()
|
||||
|
||||
# Mock the MCPClient constructor to return our mock
|
||||
@@ -35,8 +35,11 @@ async def test_sse_connection_timeout():
|
||||
# Verify that no clients were successfully connected
|
||||
assert len(clients) == 0
|
||||
|
||||
# Verify that connect_http was called for each server
|
||||
assert mock_client.connect_http.call_count == 2
|
||||
# Verify that connect_sse was called for each server
|
||||
assert mock_client.connect_sse.call_count == 2
|
||||
|
||||
# Verify that disconnect was called for each failed connection
|
||||
assert mock_client.disconnect.call_count == 2
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
||||
@@ -24,7 +24,7 @@ async def test_create_mcp_clients_success(mock_mcp_client):
|
||||
# Setup mock
|
||||
mock_client_instance = AsyncMock()
|
||||
mock_mcp_client.return_value = mock_client_instance
|
||||
mock_client_instance.connect_http = AsyncMock()
|
||||
mock_client_instance.connect_sse = AsyncMock()
|
||||
|
||||
# Test with two servers
|
||||
server_configs = [
|
||||
@@ -38,12 +38,12 @@ async def test_create_mcp_clients_success(mock_mcp_client):
|
||||
assert len(clients) == 2
|
||||
assert mock_mcp_client.call_count == 2
|
||||
|
||||
# Check that connect_http was called with correct parameters
|
||||
mock_client_instance.connect_http.assert_any_call(
|
||||
server_configs[0], conversation_id=None
|
||||
# Check that connect_sse was called with correct parameters
|
||||
mock_client_instance.connect_sse.assert_any_call(
|
||||
'http://server1:8080', api_key=None, conversation_id=None
|
||||
)
|
||||
mock_client_instance.connect_http.assert_any_call(
|
||||
server_configs[1], conversation_id=None
|
||||
mock_client_instance.connect_sse.assert_any_call(
|
||||
'http://server2:8080', api_key='test-key', conversation_id=None
|
||||
)
|
||||
|
||||
|
||||
@@ -56,10 +56,11 @@ async def test_create_mcp_clients_connection_failure(mock_mcp_client):
|
||||
mock_mcp_client.return_value = mock_client_instance
|
||||
|
||||
# First connection succeeds, second fails
|
||||
mock_client_instance.connect_http.side_effect = [
|
||||
mock_client_instance.connect_sse.side_effect = [
|
||||
None, # Success
|
||||
Exception('Connection failed'), # Failure
|
||||
]
|
||||
mock_client_instance.disconnect = AsyncMock()
|
||||
|
||||
server_configs = [
|
||||
MCPSSEServerConfig(url='http://server1:8080'),
|
||||
@@ -70,6 +71,7 @@ async def test_create_mcp_clients_connection_failure(mock_mcp_client):
|
||||
|
||||
# Verify only one client was successfully created
|
||||
assert len(clients) == 1
|
||||
assert mock_client_instance.disconnect.call_count == 1
|
||||
|
||||
|
||||
def test_convert_mcp_clients_to_tools_empty():
|
||||
|
||||
@@ -12,16 +12,6 @@ from openhands.events.observation import ErrorObservation
|
||||
from openhands.events.observation.commands import (
|
||||
CmdOutputObservation,
|
||||
)
|
||||
from openhands.runtime.utils.bash_constants import TIMEOUT_MESSAGE_TEMPLATE
|
||||
|
||||
|
||||
def get_timeout_suffix(timeout_seconds):
|
||||
"""Helper function to generate the expected timeout suffix."""
|
||||
return (
|
||||
f'[The command timed out after {timeout_seconds} seconds. '
|
||||
f'{TIMEOUT_MESSAGE_TEMPLATE}]'
|
||||
)
|
||||
|
||||
|
||||
# Skip all tests in this module if not running on Windows
|
||||
pytestmark = pytest.mark.skipif(
|
||||
@@ -178,7 +168,10 @@ def test_long_running_command(windows_bash_session):
|
||||
# Verify the initial output was captured
|
||||
assert 'Serving HTTP on' in result.content
|
||||
# Check for timeout specific metadata
|
||||
assert get_timeout_suffix(1.0) in result.metadata.suffix
|
||||
assert (
|
||||
"[The command timed out after 1.0 seconds. You may wait longer to see additional output by sending empty command '', send other commands to interact with the current process, or send keys to interrupt/kill the command.]"
|
||||
in result.metadata.suffix
|
||||
)
|
||||
assert result.exit_code == -1
|
||||
|
||||
# The action timed out, but the command should be still running
|
||||
|
||||
Reference in New Issue
Block a user