From 9077323b891495509f63637536e51788198cea8e Mon Sep 17 00:00:00 2001 From: Reinier van der Leer Date: Tue, 25 Mar 2025 16:56:16 +0100 Subject: [PATCH] fix(backend): Filter Redis messages by user ID (#9697) --- .../backend/backend/blocks/agent.py | 4 +++- .../backend/backend/data/execution.py | 18 ++++++++++-------- .../backend/backend/server/ws_api.py | 2 +- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/autogpt_platform/backend/backend/blocks/agent.py b/autogpt_platform/backend/backend/blocks/agent.py index 30e8743a6d..f278d039a2 100644 --- a/autogpt_platform/backend/backend/blocks/agent.py +++ b/autogpt_platform/backend/backend/blocks/agent.py @@ -88,7 +88,9 @@ class AgentExecutorBlock(Block): logger.info(f"Starting execution of {log_id}") for event in event_bus.listen( - graph_id=graph_exec.graph_id, graph_exec_id=graph_exec.graph_exec_id + user_id=graph_exec.user_id, + graph_id=graph_exec.graph_id, + graph_exec_id=graph_exec.graph_exec_id, ): logger.info( f"Execution {log_id} produced input {event.input_data} output {event.output_data}" diff --git a/autogpt_platform/backend/backend/data/execution.py b/autogpt_platform/backend/backend/data/execution.py index 397187d5f5..e4db2c94f7 100644 --- a/autogpt_platform/backend/backend/data/execution.py +++ b/autogpt_platform/backend/backend/data/execution.py @@ -579,13 +579,13 @@ class RedisExecutionEventBus(RedisEventBus[ExecutionResult]): return config.execution_event_bus_name def publish(self, res: ExecutionResult): - self.publish_event(res, f"{res.graph_id}/{res.graph_exec_id}") + self.publish_event(res, f"{res.user_id}/{res.graph_id}/{res.graph_exec_id}") def listen( - self, graph_id: str = "*", graph_exec_id: str = "*" + self, user_id: str, graph_id: str = "*", graph_exec_id: str = "*" ) -> Generator[ExecutionResult, None, None]: - for execution_result in self.listen_events(f"{graph_id}/{graph_exec_id}"): - yield execution_result + for event in self.listen_events(f"{user_id}/{graph_id}/{graph_exec_id}"): + yield event class AsyncRedisExecutionEventBus(AsyncRedisEventBus[ExecutionResult]): @@ -596,10 +596,12 @@ class AsyncRedisExecutionEventBus(AsyncRedisEventBus[ExecutionResult]): return config.execution_event_bus_name async def publish(self, res: ExecutionResult): - await self.publish_event(res, f"{res.graph_id}/{res.graph_exec_id}") + await self.publish_event( + res, f"{res.user_id}/{res.graph_id}/{res.graph_exec_id}" + ) async def listen( - self, graph_id: str = "*", graph_exec_id: str = "*" + self, user_id: str, graph_id: str = "*", graph_exec_id: str = "*" ) -> AsyncGenerator[ExecutionResult, None]: - async for execution_result in self.listen_events(f"{graph_id}/{graph_exec_id}"): - yield execution_result + async for event in self.listen_events(f"{user_id}/{graph_id}/{graph_exec_id}"): + yield event diff --git a/autogpt_platform/backend/backend/server/ws_api.py b/autogpt_platform/backend/backend/server/ws_api.py index d826ee8364..2bb3af7a2c 100644 --- a/autogpt_platform/backend/backend/server/ws_api.py +++ b/autogpt_platform/backend/backend/server/ws_api.py @@ -51,7 +51,7 @@ async def event_broadcaster(manager: ConnectionManager): try: redis.connect() event_queue = AsyncRedisExecutionEventBus() - async for event in event_queue.listen(): + async for event in event_queue.listen("*"): await manager.send_execution_result(event) except Exception as e: logger.exception(f"Event broadcaster error: {e}")