mirror of
https://github.com/All-Hands-AI/OpenHands.git
synced 2026-04-29 03:00:45 -04:00
Compare commits
1 Commits
neubig/cli
...
openhands/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0d00d37897 |
@@ -19,6 +19,7 @@ import time
|
||||
import traceback
|
||||
from contextlib import asynccontextmanager
|
||||
from pathlib import Path
|
||||
from queue import Queue
|
||||
from zipfile import ZipFile
|
||||
|
||||
from binaryornot.check import is_binary
|
||||
@@ -77,6 +78,21 @@ from openhands.utils.async_utils import call_sync_from_async, wait_all
|
||||
mcp_router_logger.setLevel(logger.getEffectiveLevel())
|
||||
|
||||
|
||||
class QueueHandler(logging.Handler):
|
||||
"""A logging handler that puts logs into a queue."""
|
||||
|
||||
def __init__(self, log_queue):
|
||||
super().__init__()
|
||||
self.log_queue = log_queue
|
||||
|
||||
def emit(self, record):
|
||||
try:
|
||||
msg = self.format(record)
|
||||
self.log_queue.put(msg)
|
||||
except Exception:
|
||||
self.handleError(record)
|
||||
|
||||
|
||||
if sys.platform == 'win32':
|
||||
from openhands.runtime.utils.windows_bash import WindowsPowershellSession
|
||||
|
||||
@@ -846,6 +862,14 @@ if __name__ == '__main__':
|
||||
|
||||
write_profile(current_profile)
|
||||
|
||||
# Set up log capture
|
||||
log_queue: Queue = Queue()
|
||||
queue_handler = QueueHandler(log_queue)
|
||||
queue_handler.setFormatter(
|
||||
logging.Formatter('%(levelname)s:%(name)s:%(message)s')
|
||||
)
|
||||
mcp_router_logger.addHandler(queue_handler)
|
||||
|
||||
# Manually reload the profile and update the servers
|
||||
mcp_router.profile_manager.reload()
|
||||
servers_wait_for_update = mcp_router.get_unique_servers()
|
||||
@@ -854,9 +878,23 @@ if __name__ == '__main__':
|
||||
f'MCP router updated successfully with unique servers: {servers_wait_for_update}'
|
||||
)
|
||||
|
||||
return JSONResponse(
|
||||
status_code=200, content={'detail': 'MCP server updated successfully'}
|
||||
)
|
||||
# Wait for a short time to capture logs
|
||||
await asyncio.sleep(2)
|
||||
|
||||
# Collect logs
|
||||
captured_logs = []
|
||||
while not log_queue.empty():
|
||||
captured_logs.append(log_queue.get())
|
||||
|
||||
# Remove the handler
|
||||
mcp_router_logger.removeHandler(queue_handler)
|
||||
|
||||
# Return the result with captured logs
|
||||
return {
|
||||
'status': 'success',
|
||||
'servers_updated': servers_wait_for_update,
|
||||
'logs': captured_logs,
|
||||
}
|
||||
|
||||
@app.post('/upload_file')
|
||||
async def upload_file(
|
||||
|
||||
@@ -366,10 +366,20 @@ class ActionExecutionClient(Runtime):
|
||||
'POST',
|
||||
f'{self.action_execution_server_url}/update_mcp_server',
|
||||
json=stdio_tools,
|
||||
timeout=10,
|
||||
timeout=15, # Increased timeout to account for log collection
|
||||
)
|
||||
if response.status_code != 200:
|
||||
self.log('warning', f'Failed to update MCP server: {response.text}')
|
||||
else:
|
||||
# Process and log the captured logs from the response
|
||||
try:
|
||||
response_data = response.json()
|
||||
if 'logs' in response_data and response_data['logs']:
|
||||
self.log('info', 'MCP server logs:')
|
||||
for log_entry in response_data['logs']:
|
||||
self.log('info', f'MCP: {log_entry}')
|
||||
except Exception as e:
|
||||
self.log('warning', f'Failed to process MCP server logs: {str(e)}')
|
||||
|
||||
# No API key by default. Child runtime can override this when appropriate
|
||||
updated_mcp_config.sse_servers.append(
|
||||
|
||||
115
tests/unit/test_mcp_log_passing.py
Normal file
115
tests/unit/test_mcp_log_passing.py
Normal file
@@ -0,0 +1,115 @@
|
||||
import logging
|
||||
import unittest
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from openhands.runtime.action_execution_server import QueueHandler
|
||||
|
||||
|
||||
class TestMCPLogPassing(unittest.TestCase):
|
||||
"""Test the MCP log passing mechanism."""
|
||||
|
||||
def test_queue_handler(self):
|
||||
"""Test that the QueueHandler correctly captures logs."""
|
||||
from queue import Queue
|
||||
|
||||
# Create a queue and handler
|
||||
log_queue = Queue()
|
||||
handler = QueueHandler(log_queue)
|
||||
handler.setFormatter(logging.Formatter('%(levelname)s:%(name)s:%(message)s'))
|
||||
|
||||
# Create a logger and add the handler
|
||||
logger = logging.getLogger('test_logger')
|
||||
logger.setLevel(logging.DEBUG)
|
||||
logger.addHandler(handler)
|
||||
|
||||
# Log some messages
|
||||
logger.debug('Debug message')
|
||||
logger.info('Info message')
|
||||
logger.warning('Warning message')
|
||||
logger.error('Error message')
|
||||
|
||||
# Check that the messages were captured
|
||||
self.assertEqual(log_queue.qsize(), 4)
|
||||
self.assertEqual(log_queue.get(), 'DEBUG:test_logger:Debug message')
|
||||
self.assertEqual(log_queue.get(), 'INFO:test_logger:Info message')
|
||||
self.assertEqual(log_queue.get(), 'WARNING:test_logger:Warning message')
|
||||
self.assertEqual(log_queue.get(), 'ERROR:test_logger:Error message')
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_update_mcp_server_log_capture(self):
|
||||
"""Test that the update_mcp_server endpoint captures logs."""
|
||||
with patch(
|
||||
'openhands.runtime.action_execution_server.mcp_router'
|
||||
) as mock_router:
|
||||
# Mock the router and its methods
|
||||
mock_router.profile_manager = MagicMock()
|
||||
mock_router.get_unique_servers = MagicMock(
|
||||
return_value=['server1', 'server2']
|
||||
)
|
||||
mock_router.update_servers = AsyncMock()
|
||||
|
||||
# Mock the mcp_router_logger to simulate log messages
|
||||
with patch(
|
||||
'openhands.runtime.action_execution_server.mcp_router_logger'
|
||||
) as mock_logger:
|
||||
# Set up the mock logger to emit a log message when addHandler is called
|
||||
def add_handler_side_effect(handler):
|
||||
record = logging.LogRecord(
|
||||
'mcpm.router.router',
|
||||
logging.INFO,
|
||||
'',
|
||||
0,
|
||||
'Connected to server jetbrains with capabilities',
|
||||
(),
|
||||
None,
|
||||
)
|
||||
handler.emit(record)
|
||||
|
||||
error_record = logging.LogRecord(
|
||||
'mcpm.router.router',
|
||||
logging.ERROR,
|
||||
'',
|
||||
0,
|
||||
'Failed to add server jetbrains: No working IDE endpoint available',
|
||||
(),
|
||||
None,
|
||||
)
|
||||
handler.emit(error_record)
|
||||
|
||||
mock_logger.addHandler.side_effect = add_handler_side_effect
|
||||
|
||||
# Mock the request
|
||||
mock_request = MagicMock()
|
||||
mock_request.json = AsyncMock(return_value=[{'name': 'test_server'}])
|
||||
|
||||
# Mock file operations
|
||||
with patch('builtins.open', MagicMock()):
|
||||
with patch('json.load', MagicMock(return_value={'default': []})):
|
||||
with patch('json.dump', MagicMock()):
|
||||
with patch('os.path.exists', MagicMock(return_value=True)):
|
||||
# Import the function to test
|
||||
from openhands.runtime.action_execution_server import (
|
||||
update_mcp_server,
|
||||
)
|
||||
|
||||
# Call the function
|
||||
result = await update_mcp_server(mock_request)
|
||||
|
||||
# Check the result
|
||||
self.assertEqual(result['status'], 'success')
|
||||
self.assertEqual(
|
||||
result['servers_updated'], ['server1', 'server2']
|
||||
)
|
||||
self.assertEqual(len(result['logs']), 2)
|
||||
self.assertIn(
|
||||
'Connected to server jetbrains', result['logs'][0]
|
||||
)
|
||||
self.assertIn(
|
||||
'Failed to add server jetbrains', result['logs'][1]
|
||||
)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user