diff --git a/openhands/utils/async_utils.py b/openhands/utils/async_utils.py index 678f486c23..e2c92ed772 100644 --- a/openhands/utils/async_utils.py +++ b/openhands/utils/async_utils.py @@ -1,6 +1,5 @@ import asyncio -from concurrent import futures -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, wait from typing import Callable, Coroutine, Iterable GENERAL_TIMEOUT: int = 15 @@ -24,32 +23,38 @@ def call_async_from_sync( """Shorthand for running a coroutine in the default background thread pool executor and awaiting the result """ + """Run a coroutine in a thread-safe way, handling existing event loops.""" if corofn is None: raise ValueError('corofn is None') if not asyncio.iscoroutinefunction(corofn): raise ValueError('corofn is not a coroutine function') async def arun(): - coro = corofn(*args, **kwargs) - result = await coro - return result + return await corofn(*args, **kwargs) - def run(): - loop_for_thread = asyncio.new_event_loop() + def run_in_thread(): + """Run coroutine in a separate thread with its own event loop.""" + loop = asyncio.new_event_loop() try: - asyncio.set_event_loop(loop_for_thread) - return asyncio.run(arun()) + asyncio.set_event_loop(loop) + return loop.run_until_complete(arun()) finally: - loop_for_thread.close() + loop.close() + # If executor is shut down, just run in this thread if getattr(EXECUTOR, '_shutdown', False): - result = run() - return result + return run_in_thread() - future = EXECUTOR.submit(run) - futures.wait([future], timeout=timeout or None) - result = future.result() - return result + try: + # Check if there’s a running loop in this thread + asyncio.get_running_loop() + # Loop is running → run coroutine in executor thread + future = EXECUTOR.submit(run_in_thread) + wait([future], timeout=timeout or None) + return future.result() + except RuntimeError: + # No loop running → safe to run in this thread + return run_in_thread() async def call_coro_in_bg_thread(