Merge branch 'master' into dev

This commit is contained in:
Reinier van der Leer
2025-03-25 17:00:53 +01:00
4 changed files with 14 additions and 13 deletions

View File

@@ -2,9 +2,6 @@
If you are reading this, you are probably looking for the full **[contribution guide]**,
which is part of our [wiki].
Also check out our [🚀 Roadmap][roadmap] for information about our priorities and associated tasks.
<!-- You can find our immediate priorities and their progress on our public [kanban board]. -->
[contribution guide]: https://github.com/Significant-Gravitas/AutoGPT/wiki/Contributing
[wiki]: https://github.com/Significant-Gravitas/AutoGPT/wiki
[roadmap]: https://github.com/Significant-Gravitas/AutoGPT/discussions/6971

View File

@@ -90,7 +90,9 @@ class AgentExecutorBlock(Block):
logger.info(f"Starting execution of {log_id}")
for event in event_bus.listen(
graph_id=graph_exec.graph_id, graph_exec_id=graph_exec.graph_exec_id
user_id=graph_exec.user_id,
graph_id=graph_exec.graph_id,
graph_exec_id=graph_exec.graph_exec_id,
):
if event.event_type == ExecutionEventType.GRAPH_EXEC_UPDATE:
if event.status in [

View File

@@ -808,16 +808,16 @@ class RedisExecutionEventBus(RedisEventBus[ExecutionEvent]):
def publish_node_exec_update(self, res: NodeExecutionResult):
event = NodeExecutionEvent.model_validate(res.model_dump())
self.publish_event(event, f"{res.graph_id}/{res.graph_exec_id}")
self.publish_event(event, f"{res.user_id}/{res.graph_id}/{res.graph_exec_id}")
def publish_graph_exec_update(self, res: GraphExecutionMeta):
event = GraphExecutionEvent.model_validate(res.model_dump())
self.publish_event(event, f"{res.graph_id}/{res.id}")
self.publish_event(event, f"{res.user_id}/{res.graph_id}/{res.id}")
def listen(
self, graph_id: str = "*", graph_exec_id: str = "*"
self, user_id: str, graph_id: str = "*", graph_exec_id: str = "*"
) -> Generator[ExecutionEvent, None, None]:
for event in self.listen_events(f"{graph_id}/{graph_exec_id}"):
for event in self.listen_events(f"{user_id}/{graph_id}/{graph_exec_id}"):
yield event
@@ -836,14 +836,16 @@ class AsyncRedisExecutionEventBus(AsyncRedisEventBus[ExecutionEvent]):
async def publish_node_exec_update(self, res: NodeExecutionResult):
event = NodeExecutionEvent.model_validate(res.model_dump())
await self.publish_event(event, f"{res.graph_id}/{res.graph_exec_id}")
await self.publish_event(
event, f"{res.user_id}/{res.graph_id}/{res.graph_exec_id}"
)
async def publish_graph_exec_update(self, res: GraphExecutionMeta):
event = GraphExecutionEvent.model_validate(res.model_dump())
await self.publish_event(event, f"{res.graph_id}/{res.id}")
await self.publish_event(event, f"{res.user_id}/{res.graph_id}/{res.id}")
async def listen(
self, graph_id: str = "*", graph_exec_id: str = "*"
self, user_id: str, graph_id: str = "*", graph_exec_id: str = "*"
) -> AsyncGenerator[ExecutionEvent, None]:
async for event in self.listen_events(f"{graph_id}/{graph_exec_id}"):
async for event in self.listen_events(f"{user_id}/{graph_id}/{graph_exec_id}"):
yield event

View File

@@ -51,7 +51,7 @@ async def event_broadcaster(manager: ConnectionManager):
try:
redis.connect()
event_queue = AsyncRedisExecutionEventBus()
async for event in event_queue.listen():
async for event in event_queue.listen("*"):
await manager.send_execution_update(event)
except Exception as e:
logger.exception(f"Event broadcaster error: {e}")