Files
AutoGPT/autogpt_platform/backend/test/server/test_ws_api.py
Reinier van der Leer 1f2af18388 feat(platform/library): Real-time execution updates (#9695)
- Resolves #8782

### Changes 🏗️

- feat(frontend/library): Use WS subscription to get real-time execution
updates
- feat(backend/ws_api): Send `GraphExecutionUpdate` on all new agent I/O
  - Include agent I/O in `GraphExecutionUpdate` (by subclassing
    `GraphExecution`)
    - Add `IO_BLOCK_IDs` to `.blocks.io`
- feat(backend/ws_api): Add `subscribe_graph_executions` method to
WebSocket API

- feat(backend): Withhold `GraphExecution.node_executions` from requests
by non-graph-owners
  - Split `GraphExecutionWithNodes` off of `GraphExecution`
    - Use `GraphExecution` as much as possible, as it's a much cheaper query
      than `GraphExecutionWithNodes`
  - refactor(frontend): Make `GraphExecution.node_executions` optional

- fix(frontend): Parse dates in responses of `/executions` and
`/graphs/{graph_id}/executions`

- refactor(frontend/library): Move sorting logic for agent runs list
from `AgentRunsPage` to `AgentRunsSelectorList`

- refactor(backend/ws_api): Clean up message handler implementations

- refactor(backend/tests): Use `.data.execution.get_graph_execution(..)`
directly instead of `AgentServer.test_get_graph_run_results(..)`

Out-of-scope changes:
- refactor(backend): Remove unnecessary query include from
`.data.graph.get_graph_metadata(..)`

Demo:


https://github.com/user-attachments/assets/8ea6225d-7334-49cb-a522-83f153d840da

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - Go to `/library/agents/[id]` for an agent with inputs and outputs
    - Draft and run a new run
      - [x] -> should appear in the list of runs at the top
      - [x] -> should be selected as soon as the request finishes
      - [x] -> new I/O should appear as it is generated
      - [x] -> status should be updated in real-time (both in list and in
        adjacent details view)
    - Click "Run again"
      - [x] -> should appear in the list of runs at the top
      - [x] -> should be selected as soon as the request finishes
      - [x] -> new I/O should appear as it is generated
      - [x] -> status should be updated in real-time (both in list and in
        adjacent details view)
    - Click "Open in builder" under "Agent actions"; run the agent from the
      builder
      - [x] -> should work the same as before
        - [x] -> node I/O should appear in real-time
        - [x] -> node execution statuses should update in real-time
2025-03-28 12:19:14 +00:00

209 lines
6.7 KiB
Python

from typing import cast
from unittest.mock import AsyncMock
import pytest
from fastapi import WebSocket, WebSocketDisconnect
from backend.data.user import DEFAULT_USER_ID
from backend.server.conn_manager import ConnectionManager
from backend.server.ws_api import (
WSMessage,
WSMethod,
handle_subscribe,
handle_unsubscribe,
websocket_router,
)
@pytest.fixture
def mock_websocket() -> AsyncMock:
    """Provide an ``AsyncMock`` whose attribute surface is limited to ``WebSocket``."""
    websocket_double = AsyncMock(spec=WebSocket)
    return websocket_double
@pytest.fixture
def mock_manager() -> AsyncMock:
    """Provide an ``AsyncMock`` whose attribute surface is limited to ``ConnectionManager``."""
    manager_double = AsyncMock(spec=ConnectionManager)
    return manager_double
@pytest.mark.asyncio
async def test_websocket_router_subscribe(
    mock_websocket: AsyncMock, mock_manager: AsyncMock
) -> None:
    """Feed the router one subscribe request followed by a disconnect.

    Checks that the socket is connected, the subscription is forwarded to the
    manager under ``DEFAULT_USER_ID``, exactly one successful reply is sent,
    and the socket is disconnected afterwards.
    """
    subscribe_request = WSMessage(
        method=WSMethod.SUBSCRIBE_GRAPH_EXEC,
        data={"graph_exec_id": "test-graph-exec-1"},
    )
    # An exception instance inside an iterable side_effect is raised (not
    # returned) by the mock, so the second receive_text() call disconnects.
    mock_websocket.receive_text.side_effect = [
        subscribe_request.model_dump_json(),
        WebSocketDisconnect(),
    ]
    mock_manager.subscribe_graph_exec.return_value = (
        f"{DEFAULT_USER_ID}|graph_exec#test-graph-exec-1"
    )

    router_socket = cast(WebSocket, mock_websocket)
    router_manager = cast(ConnectionManager, mock_manager)
    await websocket_router(router_socket, router_manager)

    mock_manager.connect_socket.assert_called_once_with(mock_websocket)
    mock_manager.subscribe_graph_exec.assert_called_once_with(
        user_id=DEFAULT_USER_ID,
        graph_exec_id="test-graph-exec-1",
        websocket=mock_websocket,
    )
    mock_websocket.send_text.assert_called_once()
    # First positional argument of the single send_text() call.
    sent_payload = mock_websocket.send_text.call_args[0][0]
    assert '"method":"subscribe_graph_execution"' in sent_payload
    assert '"success":true' in sent_payload
    mock_manager.disconnect_socket.assert_called_once_with(mock_websocket)
@pytest.mark.asyncio
async def test_websocket_router_unsubscribe(
    mock_websocket: AsyncMock, mock_manager: AsyncMock
) -> None:
    """Feed the router one unsubscribe request followed by a disconnect.

    Checks that the socket is connected, the unsubscription is forwarded to
    the manager under ``DEFAULT_USER_ID``, exactly one successful reply is
    sent, and the socket is disconnected afterwards.
    """
    unsubscribe_request = WSMessage(
        method=WSMethod.UNSUBSCRIBE,
        data={"graph_exec_id": "test-graph-exec-1"},
    )
    # An exception instance inside an iterable side_effect is raised (not
    # returned) by the mock, so the second receive_text() call disconnects.
    mock_websocket.receive_text.side_effect = [
        unsubscribe_request.model_dump_json(),
        WebSocketDisconnect(),
    ]
    mock_manager.unsubscribe_graph_exec.return_value = (
        f"{DEFAULT_USER_ID}|graph_exec#test-graph-exec-1"
    )

    router_socket = cast(WebSocket, mock_websocket)
    router_manager = cast(ConnectionManager, mock_manager)
    await websocket_router(router_socket, router_manager)

    mock_manager.connect_socket.assert_called_once_with(mock_websocket)
    mock_manager.unsubscribe_graph_exec.assert_called_once_with(
        user_id=DEFAULT_USER_ID,
        graph_exec_id="test-graph-exec-1",
        websocket=mock_websocket,
    )
    mock_websocket.send_text.assert_called_once()
    # First positional argument of the single send_text() call.
    sent_payload = mock_websocket.send_text.call_args[0][0]
    assert '"method":"unsubscribe"' in sent_payload
    assert '"success":true' in sent_payload
    mock_manager.disconnect_socket.assert_called_once_with(mock_websocket)
@pytest.mark.asyncio
async def test_websocket_router_invalid_method(
    mock_websocket: AsyncMock, mock_manager: AsyncMock
) -> None:
    """Send the router a ``GRAPH_EXECUTION_EVENT`` message, then disconnect.

    Checks that the router answers with exactly one error reply marked
    unsuccessful, and still connects and disconnects the socket.
    """
    rejected_request = WSMessage(method=WSMethod.GRAPH_EXECUTION_EVENT)
    # An exception instance inside an iterable side_effect is raised (not
    # returned) by the mock, so the second receive_text() call disconnects.
    mock_websocket.receive_text.side_effect = [
        rejected_request.model_dump_json(),
        WebSocketDisconnect(),
    ]

    router_socket = cast(WebSocket, mock_websocket)
    router_manager = cast(ConnectionManager, mock_manager)
    await websocket_router(router_socket, router_manager)

    mock_manager.connect_socket.assert_called_once_with(mock_websocket)
    mock_websocket.send_text.assert_called_once()
    # First positional argument of the single send_text() call.
    sent_payload = mock_websocket.send_text.call_args[0][0]
    assert '"method":"error"' in sent_payload
    assert '"success":false' in sent_payload
    mock_manager.disconnect_socket.assert_called_once_with(mock_websocket)
@pytest.mark.asyncio
async def test_handle_subscribe_success(
    mock_websocket: AsyncMock, mock_manager: AsyncMock
) -> None:
    """Call ``handle_subscribe`` directly with a well-formed request.

    Checks that the manager's ``subscribe_graph_exec`` receives the user ID,
    graph execution ID and websocket, and that one successful reply is sent.
    """
    subscribe_request = WSMessage(
        method=WSMethod.SUBSCRIBE_GRAPH_EXEC,
        data={"graph_exec_id": "test-graph-exec-id"},
    )
    mock_manager.subscribe_graph_exec.return_value = (
        "user-1|graph_exec#test-graph-exec-id"
    )

    handler_manager = cast(ConnectionManager, mock_manager)
    handler_socket = cast(WebSocket, mock_websocket)
    await handle_subscribe(
        connection_manager=handler_manager,
        websocket=handler_socket,
        user_id="user-1",
        message=subscribe_request,
    )

    mock_manager.subscribe_graph_exec.assert_called_once_with(
        user_id="user-1",
        graph_exec_id="test-graph-exec-id",
        websocket=mock_websocket,
    )
    mock_websocket.send_text.assert_called_once()
    # First positional argument of the single send_text() call.
    sent_payload = mock_websocket.send_text.call_args[0][0]
    assert '"method":"subscribe_graph_execution"' in sent_payload
    assert '"success":true' in sent_payload
@pytest.mark.asyncio
async def test_handle_subscribe_missing_data(
    mock_websocket: AsyncMock, mock_manager: AsyncMock
) -> None:
    """Call ``handle_subscribe`` with a request that carries no ``data``.

    Checks that no subscription is registered with the manager and that a
    single error reply marked unsuccessful is sent.
    """
    request_without_data = WSMessage(method=WSMethod.SUBSCRIBE_GRAPH_EXEC)

    handler_manager = cast(ConnectionManager, mock_manager)
    handler_socket = cast(WebSocket, mock_websocket)
    await handle_subscribe(
        connection_manager=handler_manager,
        websocket=handler_socket,
        user_id="user-1",
        message=request_without_data,
    )

    mock_manager.subscribe_graph_exec.assert_not_called()
    mock_websocket.send_text.assert_called_once()
    # First positional argument of the single send_text() call.
    sent_payload = mock_websocket.send_text.call_args[0][0]
    assert '"method":"error"' in sent_payload
    assert '"success":false' in sent_payload
@pytest.mark.asyncio
async def test_handle_unsubscribe_success(
    mock_websocket: AsyncMock, mock_manager: AsyncMock
) -> None:
    """Call ``handle_unsubscribe`` directly with a well-formed request.

    Checks that the manager's ``unsubscribe_graph_exec`` receives the user ID,
    graph execution ID and websocket, and that one successful reply is sent.
    """
    unsubscribe_request = WSMessage(
        method=WSMethod.UNSUBSCRIBE,
        data={"graph_exec_id": "test-graph-exec-id"},
    )
    mock_manager.unsubscribe_graph_exec.return_value = (
        "user-1|graph_exec#test-graph-exec-id"
    )

    handler_manager = cast(ConnectionManager, mock_manager)
    handler_socket = cast(WebSocket, mock_websocket)
    await handle_unsubscribe(
        connection_manager=handler_manager,
        websocket=handler_socket,
        user_id="user-1",
        message=unsubscribe_request,
    )

    mock_manager.unsubscribe_graph_exec.assert_called_once_with(
        user_id="user-1",
        graph_exec_id="test-graph-exec-id",
        websocket=mock_websocket,
    )
    mock_websocket.send_text.assert_called_once()
    # First positional argument of the single send_text() call.
    sent_payload = mock_websocket.send_text.call_args[0][0]
    assert '"method":"unsubscribe"' in sent_payload
    assert '"success":true' in sent_payload
@pytest.mark.asyncio
async def test_handle_unsubscribe_missing_data(
    mock_websocket: AsyncMock, mock_manager: AsyncMock
) -> None:
    """Call ``handle_unsubscribe`` with a request that carries no ``data``.

    Checks that no unsubscription is forwarded to the manager and that a
    single error reply marked unsuccessful is sent.
    """
    message = WSMessage(method=WSMethod.UNSUBSCRIBE)

    await handle_unsubscribe(
        connection_manager=cast(ConnectionManager, mock_manager),
        websocket=cast(WebSocket, mock_websocket),
        user_id="user-1",
        message=message,
    )

    # Assert on the public method that the unsubscribe success tests in this
    # file verify is called. The manager is a mock, so a private helper such
    # as `_unsubscribe` could never be reached through it and checking it
    # would pass vacuously.
    mock_manager.unsubscribe_graph_exec.assert_not_called()
    mock_websocket.send_text.assert_called_once()
    # First positional argument of the single send_text() call.
    sent_payload = mock_websocket.send_text.call_args[0][0]
    assert '"method":"error"' in sent_payload
    assert '"success":false' in sent_payload