mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-30 03:00:41 -04:00
fix(backend): address autogpt-pr-reviewer-in-dev blockers
- Add WriteWorkspaceFileTool tests for quota, virus, scan, and conflict error handling (the PR's stated motivation was untested) - Add ZeroDivisionError guard on storage_limit in quota check - Assert scan_content_safe not called on quota rejection (fast-path) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -623,3 +623,73 @@ async def test_read_workspace_file_no_fallback_when_resolve_succeeds(setup_test_
|
||||
# Normal workspace path must have produced a content response.
|
||||
assert isinstance(result, WorkspaceFileContentResponse)
|
||||
assert base64.b64decode(result.content_base64) == fake_content
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# WriteWorkspaceFileTool exception handling (quota, virus, scan errors)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio(loop_scope="session")
|
||||
class TestWriteWorkspaceFileToolErrorHandling:
|
||||
"""Verify WriteWorkspaceFileTool returns proper ErrorResponse for exceptions."""
|
||||
|
||||
async def _execute_write(self, side_effect, setup_test_data):
|
||||
"""Helper: run WriteWorkspaceFileTool with mocked write_file."""
|
||||
user = setup_test_data["user"]
|
||||
session = make_session(user.id)
|
||||
|
||||
mock_manager = AsyncMock()
|
||||
mock_manager.write_file = AsyncMock(side_effect=side_effect)
|
||||
|
||||
with patch(
|
||||
"backend.copilot.tools.workspace_files.get_workspace_manager",
|
||||
AsyncMock(return_value=mock_manager),
|
||||
):
|
||||
tool = WriteWorkspaceFileTool()
|
||||
return await tool._execute(
|
||||
user_id=user.id,
|
||||
session=session,
|
||||
filename="test.txt",
|
||||
content="hello",
|
||||
)
|
||||
|
||||
async def test_quota_exceeded_returns_error_response(self, setup_test_data):
|
||||
"""Quota exceeded (ValueError) → ErrorResponse with storage message."""
|
||||
result = await self._execute_write(
|
||||
ValueError("Storage limit exceeded: 250 MB used of 250 MB (100.0%)"),
|
||||
setup_test_data,
|
||||
)
|
||||
assert isinstance(result, ErrorResponse)
|
||||
assert "Storage limit exceeded" in result.message
|
||||
|
||||
async def test_virus_detected_returns_error_response(self, setup_test_data):
|
||||
"""VirusDetectedError → ErrorResponse with virus message."""
|
||||
from backend.api.features.store.exceptions import VirusDetectedError
|
||||
|
||||
result = await self._execute_write(
|
||||
VirusDetectedError("Eicar-Test-Signature"),
|
||||
setup_test_data,
|
||||
)
|
||||
assert isinstance(result, ErrorResponse)
|
||||
assert "Virus detected" in result.message
|
||||
|
||||
async def test_virus_scan_error_returns_error_response(self, setup_test_data):
|
||||
"""VirusScanError (infra failure) → ErrorResponse with scan message."""
|
||||
from backend.api.features.store.exceptions import VirusScanError
|
||||
|
||||
result = await self._execute_write(
|
||||
VirusScanError("ClamAV connection refused"),
|
||||
setup_test_data,
|
||||
)
|
||||
assert isinstance(result, ErrorResponse)
|
||||
assert "ClamAV" in result.message
|
||||
|
||||
async def test_file_conflict_returns_error_response(self, setup_test_data):
|
||||
"""File path conflict (ValueError) → ErrorResponse."""
|
||||
result = await self._execute_write(
|
||||
ValueError("File already exists at path: /sessions/abc/test.txt"),
|
||||
setup_test_data,
|
||||
)
|
||||
assert isinstance(result, ErrorResponse)
|
||||
assert "already exists" in result.message
|
||||
|
||||
@@ -210,13 +210,13 @@ class WorkspaceManager:
|
||||
current_usage = max(0, current_usage - existing.size_bytes)
|
||||
|
||||
projected_usage = current_usage + len(content)
|
||||
if projected_usage > storage_limit:
|
||||
if storage_limit > 0 and projected_usage > storage_limit:
|
||||
used_pct = (current_usage / storage_limit) * 100
|
||||
raise ValueError(
|
||||
f"Storage limit exceeded: {current_usage:,} bytes used "
|
||||
f"of {storage_limit:,} bytes ({used_pct:.1f}%)"
|
||||
)
|
||||
if projected_usage / storage_limit >= 0.8:
|
||||
if storage_limit > 0 and projected_usage / storage_limit >= 0.8:
|
||||
logger.warning(
|
||||
f"User {self.user_id} workspace storage at "
|
||||
f"{projected_usage / storage_limit * 100:.1f}% "
|
||||
|
||||
@@ -186,7 +186,9 @@ async def test_write_file_quota_exceeded_raises_value_error(
|
||||
return_value=mock_storage,
|
||||
),
|
||||
patch("backend.util.workspace.workspace_db", return_value=mock_db),
|
||||
patch("backend.util.workspace.scan_content_safe", new_callable=AsyncMock),
|
||||
patch(
|
||||
"backend.util.workspace.scan_content_safe", new_callable=AsyncMock
|
||||
) as mock_scan,
|
||||
patch(
|
||||
"backend.util.workspace.get_workspace_storage_limit_bytes",
|
||||
return_value=250 * 1024 * 1024, # 250 MB limit
|
||||
@@ -199,6 +201,8 @@ async def test_write_file_quota_exceeded_raises_value_error(
|
||||
with pytest.raises(ValueError, match="Storage limit exceeded"):
|
||||
await manager.write_file(filename="test.txt", content=b"hello")
|
||||
|
||||
# Quota rejection should short-circuit before expensive virus scan
|
||||
mock_scan.assert_not_called()
|
||||
# Storage should NOT have been written to
|
||||
mock_storage.store.assert_not_called()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user