Compare commits

...

1 Commits

Author SHA1 Message Date
openhands
0d00d37897 Add MCP log passing mechanism to expose error logs from MCP router 2025-05-16 03:34:56 +00:00
3 changed files with 167 additions and 4 deletions

View File

@@ -19,6 +19,7 @@ import time
import traceback
from contextlib import asynccontextmanager
from pathlib import Path
from queue import Queue
from zipfile import ZipFile
from binaryornot.check import is_binary
@@ -77,6 +78,21 @@ from openhands.utils.async_utils import call_sync_from_async, wait_all
mcp_router_logger.setLevel(logger.getEffectiveLevel())
class QueueHandler(logging.Handler):
"""A logging handler that puts logs into a queue."""
def __init__(self, log_queue):
super().__init__()
self.log_queue = log_queue
def emit(self, record):
try:
msg = self.format(record)
self.log_queue.put(msg)
except Exception:
self.handleError(record)
if sys.platform == 'win32':
from openhands.runtime.utils.windows_bash import WindowsPowershellSession
@@ -846,6 +862,14 @@ if __name__ == '__main__':
write_profile(current_profile)
# Set up log capture
log_queue: Queue = Queue()
queue_handler = QueueHandler(log_queue)
queue_handler.setFormatter(
logging.Formatter('%(levelname)s:%(name)s:%(message)s')
)
mcp_router_logger.addHandler(queue_handler)
# Manually reload the profile and update the servers
mcp_router.profile_manager.reload()
servers_wait_for_update = mcp_router.get_unique_servers()
@@ -854,9 +878,23 @@ if __name__ == '__main__':
f'MCP router updated successfully with unique servers: {servers_wait_for_update}'
)
return JSONResponse(
status_code=200, content={'detail': 'MCP server updated successfully'}
)
# Wait for a short time to capture logs
await asyncio.sleep(2)
# Collect logs
captured_logs = []
while not log_queue.empty():
captured_logs.append(log_queue.get())
# Remove the handler
mcp_router_logger.removeHandler(queue_handler)
# Return the result with captured logs
return {
'status': 'success',
'servers_updated': servers_wait_for_update,
'logs': captured_logs,
}
@app.post('/upload_file')
async def upload_file(

View File

@@ -366,10 +366,20 @@ class ActionExecutionClient(Runtime):
'POST',
f'{self.action_execution_server_url}/update_mcp_server',
json=stdio_tools,
timeout=10,
timeout=15, # Increased timeout to account for log collection
)
if response.status_code != 200:
self.log('warning', f'Failed to update MCP server: {response.text}')
else:
# Process and log the captured logs from the response
try:
response_data = response.json()
if 'logs' in response_data and response_data['logs']:
self.log('info', 'MCP server logs:')
for log_entry in response_data['logs']:
self.log('info', f'MCP: {log_entry}')
except Exception as e:
self.log('warning', f'Failed to process MCP server logs: {str(e)}')
# No API key by default. Child runtime can override this when appropriate
updated_mcp_config.sse_servers.append(

View File

@@ -0,0 +1,115 @@
import logging
import unittest
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from openhands.runtime.action_execution_server import QueueHandler
class TestMCPLogPassing(unittest.TestCase):
"""Test the MCP log passing mechanism."""
def test_queue_handler(self):
"""Test that the QueueHandler correctly captures logs."""
from queue import Queue
# Create a queue and handler
log_queue = Queue()
handler = QueueHandler(log_queue)
handler.setFormatter(logging.Formatter('%(levelname)s:%(name)s:%(message)s'))
# Create a logger and add the handler
logger = logging.getLogger('test_logger')
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)
# Log some messages
logger.debug('Debug message')
logger.info('Info message')
logger.warning('Warning message')
logger.error('Error message')
# Check that the messages were captured
self.assertEqual(log_queue.qsize(), 4)
self.assertEqual(log_queue.get(), 'DEBUG:test_logger:Debug message')
self.assertEqual(log_queue.get(), 'INFO:test_logger:Info message')
self.assertEqual(log_queue.get(), 'WARNING:test_logger:Warning message')
self.assertEqual(log_queue.get(), 'ERROR:test_logger:Error message')
@pytest.mark.asyncio
async def test_update_mcp_server_log_capture(self):
"""Test that the update_mcp_server endpoint captures logs."""
with patch(
'openhands.runtime.action_execution_server.mcp_router'
) as mock_router:
# Mock the router and its methods
mock_router.profile_manager = MagicMock()
mock_router.get_unique_servers = MagicMock(
return_value=['server1', 'server2']
)
mock_router.update_servers = AsyncMock()
# Mock the mcp_router_logger to simulate log messages
with patch(
'openhands.runtime.action_execution_server.mcp_router_logger'
) as mock_logger:
# Set up the mock logger to emit a log message when addHandler is called
def add_handler_side_effect(handler):
record = logging.LogRecord(
'mcpm.router.router',
logging.INFO,
'',
0,
'Connected to server jetbrains with capabilities',
(),
None,
)
handler.emit(record)
error_record = logging.LogRecord(
'mcpm.router.router',
logging.ERROR,
'',
0,
'Failed to add server jetbrains: No working IDE endpoint available',
(),
None,
)
handler.emit(error_record)
mock_logger.addHandler.side_effect = add_handler_side_effect
# Mock the request
mock_request = MagicMock()
mock_request.json = AsyncMock(return_value=[{'name': 'test_server'}])
# Mock file operations
with patch('builtins.open', MagicMock()):
with patch('json.load', MagicMock(return_value={'default': []})):
with patch('json.dump', MagicMock()):
with patch('os.path.exists', MagicMock(return_value=True)):
# Import the function to test
from openhands.runtime.action_execution_server import (
update_mcp_server,
)
# Call the function
result = await update_mcp_server(mock_request)
# Check the result
self.assertEqual(result['status'], 'success')
self.assertEqual(
result['servers_updated'], ['server1', 'server2']
)
self.assertEqual(len(result['logs']), 2)
self.assertIn(
'Connected to server jetbrains', result['logs'][0]
)
self.assertIn(
'Failed to add server jetbrains', result['logs'][1]
)
if __name__ == '__main__':
unittest.main()