fix(backend): Filter Redis messages by user ID (#9697)

This commit is contained in:
Reinier van der Leer
2025-03-25 16:56:16 +01:00
committed by GitHub
parent 66ebe4376e
commit 9077323b89
3 changed files with 14 additions and 10 deletions

View File

@@ -88,7 +88,9 @@ class AgentExecutorBlock(Block):
logger.info(f"Starting execution of {log_id}")
for event in event_bus.listen(
graph_id=graph_exec.graph_id, graph_exec_id=graph_exec.graph_exec_id
user_id=graph_exec.user_id,
graph_id=graph_exec.graph_id,
graph_exec_id=graph_exec.graph_exec_id,
):
logger.info(
f"Execution {log_id} produced input {event.input_data} output {event.output_data}"

View File

@@ -579,13 +579,13 @@ class RedisExecutionEventBus(RedisEventBus[ExecutionResult]):
return config.execution_event_bus_name
def publish(self, res: ExecutionResult):
self.publish_event(res, f"{res.graph_id}/{res.graph_exec_id}")
self.publish_event(res, f"{res.user_id}/{res.graph_id}/{res.graph_exec_id}")
def listen(
self, graph_id: str = "*", graph_exec_id: str = "*"
self, user_id: str, graph_id: str = "*", graph_exec_id: str = "*"
) -> Generator[ExecutionResult, None, None]:
for execution_result in self.listen_events(f"{graph_id}/{graph_exec_id}"):
yield execution_result
for event in self.listen_events(f"{user_id}/{graph_id}/{graph_exec_id}"):
yield event
class AsyncRedisExecutionEventBus(AsyncRedisEventBus[ExecutionResult]):
@@ -596,10 +596,12 @@ class AsyncRedisExecutionEventBus(AsyncRedisEventBus[ExecutionResult]):
return config.execution_event_bus_name
async def publish(self, res: ExecutionResult):
await self.publish_event(res, f"{res.graph_id}/{res.graph_exec_id}")
await self.publish_event(
res, f"{res.user_id}/{res.graph_id}/{res.graph_exec_id}"
)
async def listen(
self, graph_id: str = "*", graph_exec_id: str = "*"
self, user_id: str, graph_id: str = "*", graph_exec_id: str = "*"
) -> AsyncGenerator[ExecutionResult, None]:
async for execution_result in self.listen_events(f"{graph_id}/{graph_exec_id}"):
yield execution_result
async for event in self.listen_events(f"{user_id}/{graph_id}/{graph_exec_id}"):
yield event

View File

@@ -51,7 +51,7 @@ async def event_broadcaster(manager: ConnectionManager):
try:
redis.connect()
event_queue = AsyncRedisExecutionEventBus()
async for event in event_queue.listen():
async for event in event_queue.listen("*"):
await manager.send_execution_result(event)
except Exception as e:
logger.exception(f"Event broadcaster error: {e}")