diff --git a/autogpt_platform/autogpt_libs/autogpt_libs/utils/cache.py b/autogpt_platform/autogpt_libs/autogpt_libs/utils/cache.py
index 3506290391..2e9a7815a0 100644
--- a/autogpt_platform/autogpt_libs/autogpt_libs/utils/cache.py
+++ b/autogpt_platform/autogpt_libs/autogpt_libs/utils/cache.py
@@ -25,7 +25,7 @@ def thread_cached(
             cache = getattr(thread_local, "cache", None)
             if cache is None:
                 cache = thread_local.cache = {}
-            key = (func, args, tuple(sorted(kwargs.items())))
+            key = (args, tuple(sorted(kwargs.items())))
             if key not in cache:
                 cache[key] = await cast(Callable[P, Awaitable[R]], func)(
                     *args, **kwargs
@@ -40,7 +40,6 @@ def thread_cached(
             if cache is None:
                 cache = thread_local.cache = {}
-            # Include function in the key to prevent collisions between different functions
-            key = (func, args, tuple(sorted(kwargs.items())))
+            key = (args, tuple(sorted(kwargs.items())))
             if key not in cache:
                 cache[key] = func(*args, **kwargs)
             return cache[key]
diff --git a/autogpt_platform/backend/backend/executor/manager.py b/autogpt_platform/backend/backend/executor/manager.py
index a266c5b3d6..b5686b5aab 100644
--- a/autogpt_platform/backend/backend/executor/manager.py
+++ b/autogpt_platform/backend/backend/executor/manager.py
@@ -1015,9 +1015,13 @@ class ExecutionManager(AppProcess):
                     logger.error(
                         f"[{self.service_name}] Execution for {graph_exec_id} failed: {f.exception()}"
                     )
-                    channel.basic_nack(delivery_tag, requeue=False)
+                    channel.connection.add_callback_threadsafe(
+                        lambda: channel.basic_nack(delivery_tag, requeue=False)
+                    )
                 else:
-                    channel.basic_ack(delivery_tag)
+                    channel.connection.add_callback_threadsafe(
+                        lambda: channel.basic_ack(delivery_tag)
+                    )
             except Exception as e:
                 logger.error(f"[{self.service_name}] Error acknowledging message: {e}")

diff --git a/autogpt_platform/backend/backend/server/routers/v1.py b/autogpt_platform/backend/backend/server/routers/v1.py
index 54443a596b..c4f343ba67 100644
--- a/autogpt_platform/backend/backend/server/routers/v1.py
+++ b/autogpt_platform/backend/backend/server/routers/v1.py
@@ -612,8 +612,12 @@ async def execute_graph(
         user_id=user_id,
         preset_id=preset_id,
     )
-    execution_utils.get_execution_event_bus().publish(graph_exec)
-    execution_utils.get_execution_queue().publish_message(
+
+    bus = execution_event_bus()
+    await bus.publish(graph_exec)
+
+    queue = await execution_queue_client()
+    await queue.publish_message(
         routing_key=execution_utils.GRAPH_EXECUTION_ROUTING_KEY,
         message=graph_exec.to_graph_execution_entry().model_dump_json(),
         exchange=execution_utils.GRAPH_EXECUTION_EXCHANGE,