Mirror of https://github.com/Significant-Gravitas/AutoGPT.git (synced 2026-02-05 04:15:08 -05:00)

Compare commits: feature/vi...otto/secrt (3 commits)
| Author | SHA1 | Date |
|---|---|---|
| | e5aad862ce | |
| | 4769a281cc | |
| | 96ca9daefe | |
backend/api/features/chat/tools/binary_output_processor.py (new file; path taken from the test import below):

@@ -0,0 +1,123 @@
```python
"""Save binary block outputs to workspace, return references instead of base64."""

import base64
import binascii
import hashlib
import logging
import uuid
from typing import Any

from backend.util.workspace import WorkspaceManager

logger = logging.getLogger(__name__)

BINARY_FIELDS = {"png", "jpeg", "pdf"}  # Base64 encoded
TEXT_FIELDS = {"svg"}  # Large text, save raw
SAVEABLE_FIELDS = BINARY_FIELDS | TEXT_FIELDS
SIZE_THRESHOLD = 1024  # Only process content > 1KB (string length, not decoded size)


async def process_binary_outputs(
    outputs: dict[str, list[Any]],
    workspace_manager: WorkspaceManager,
    block_name: str,
) -> dict[str, list[Any]]:
    """
    Replace binary data in block outputs with workspace:// references.

    Deduplicates identical content within a single call (e.g., same PDF
    appearing in both main_result and results).
    """
    cache: dict[str, str] = {}  # content_hash -> workspace_ref

    processed: dict[str, list[Any]] = {}
    for name, items in outputs.items():
        processed_items: list[Any] = []
        for item in items:
            processed_items.append(
                await _process_item(item, workspace_manager, block_name, cache)
            )
        processed[name] = processed_items
    return processed


async def _process_item(
    item: Any, wm: WorkspaceManager, block: str, cache: dict
) -> Any:
    """Recursively process an item, handling dicts and lists."""
    if isinstance(item, dict):
        return await _process_dict(item, wm, block, cache)
    if isinstance(item, list):
        processed: list[Any] = []
        for i in item:
            processed.append(await _process_item(i, wm, block, cache))
        return processed
    return item


async def _process_dict(
    data: dict, wm: WorkspaceManager, block: str, cache: dict
) -> dict:
    """Process a dict, saving binary fields and recursing into nested structures."""
    result: dict[str, Any] = {}

    for key, value in data.items():
        if (
            key in SAVEABLE_FIELDS
            and isinstance(value, str)
            and len(value) > SIZE_THRESHOLD
        ):
            content_hash = hashlib.sha256(value.encode()).hexdigest()

            if content_hash in cache:
                result[key] = cache[content_hash]
            elif ref := await _save(value, key, wm, block):
                cache[content_hash] = ref
                result[key] = ref
            else:
                result[key] = value  # Save failed, keep original

        elif isinstance(value, dict):
            result[key] = await _process_dict(value, wm, block, cache)
        elif isinstance(value, list):
            processed: list[Any] = []
            for i in value:
                processed.append(await _process_item(i, wm, block, cache))
            result[key] = processed
        else:
            result[key] = value

    return result


async def _save(value: str, field: str, wm: WorkspaceManager, block: str) -> str | None:
    """Save content to workspace, return workspace:// reference or None on failure."""
    try:
        if field in BINARY_FIELDS:
            content = _decode_base64(value)
            if content is None:
                return None
        else:
            content = value.encode("utf-8")

        ext = {"jpeg": "jpg"}.get(field, field)
        filename = f"{block.lower().replace(' ', '_')[:20]}_{field}_{uuid.uuid4().hex[:12]}.{ext}"

        file = await wm.write_file(content=content, filename=filename)
        return f"workspace://{file.id}"

    except Exception as e:
        logger.error(f"Failed to save {field} to workspace for block '{block}': {e}")
        return None


def _decode_base64(value: str) -> bytes | None:
    """Decode base64, handling data URI format. Returns None on failure."""
    try:
        if value.startswith("data:"):
            value = value.split(",", 1)[1] if "," in value else value
        # Normalize padding and use strict validation to prevent corrupted data
        padded = value + "=" * (-len(value) % 4)
        return base64.b64decode(padded, validate=True)
    except (binascii.Error, ValueError):
        return None
```
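To see the transformation end to end, here is a minimal usage sketch. `StubWorkspaceManager`, `StubFile`, and the `demo-file-id` value are stand-ins invented for this example; in the application a real `WorkspaceManager` backed by workspace storage is passed in, and the module above is assumed importable as `binary_output_processor`.

```python
import asyncio
import base64

from binary_output_processor import process_binary_outputs  # assumed import path


class StubFile:
    def __init__(self, id: str) -> None:
        self.id = id


class StubWorkspaceManager:
    """Duck-typed stand-in for backend.util.workspace.WorkspaceManager."""

    async def write_file(self, content: bytes, filename: str) -> StubFile:
        # A real manager would persist `content` under `filename`;
        # here we only mint a file id so the reference can be built.
        return StubFile(id="demo-file-id")


async def main() -> None:
    # A base64 payload comfortably above SIZE_THRESHOLD (>1 KB of string length).
    big_png = base64.b64encode(b"\x89PNG" + b"\x00" * 4096).decode()
    outputs = {"result": [{"png": big_png, "caption": "a chart"}]}

    processed = await process_binary_outputs(
        outputs, StubWorkspaceManager(), "Demo Block"
    )
    print(processed["result"][0]["png"])      # workspace://demo-file-id
    print(processed["result"][0]["caption"])  # unchanged: "a chart"


asyncio.run(main())
```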
In the RunBlockTool module, the processor and `WorkspaceManager` are imported:

```diff
@@ -8,12 +8,16 @@ from typing import Any
 from pydantic_core import PydanticUndefined
 
 from backend.api.features.chat.model import ChatSession
+from backend.api.features.chat.tools.binary_output_processor import (
+    process_binary_outputs,
+)
 from backend.data.block import get_block
 from backend.data.execution import ExecutionContext
 from backend.data.model import CredentialsMetaInput
 from backend.data.workspace import get_or_create_workspace
 from backend.integrations.creds_manager import IntegrationCredentialsManager
 from backend.util.exceptions import BlockError
+from backend.util.workspace import WorkspaceManager
 
 from .base import BaseTool
 from .models import (
```
and wired into the block-execution path:

```diff
@@ -321,11 +325,19 @@ class RunBlockTool(BaseTool):
             ):
                 outputs[output_name].append(output_data)
 
+        # Save binary outputs to workspace to prevent context bloat
+        workspace_manager = WorkspaceManager(
+            user_id, workspace.id, session.session_id
+        )
+        processed_outputs = await process_binary_outputs(
+            dict(outputs), workspace_manager, block.name
+        )
+
         return BlockOutputResponse(
             message=f"Block '{block.name}' executed successfully",
             block_id=block_id,
             block_name=block.name,
-            outputs=dict(outputs),
+            outputs=processed_outputs,
             success=True,
             session_id=session_id,
         )
```
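The only behavioral change in the response is which dict lands in `outputs`. Illustratively (the base64 payload and file id below are made-up values):

```python
# Before: raw base64 rode along in the tool response and bloated the chat context.
outputs = {"result": [{"pdf": "JVBERi0xLjQK...<tens of KB of base64>...", "pages": 3}]}

# After: the payload lives in workspace storage; the model sees a short reference.
processed_outputs = {"result": [{"pdf": "workspace://file-123", "pages": 3}]}
```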
New tests for the processor (new file):

@@ -0,0 +1,92 @@
```python
import base64
from unittest.mock import AsyncMock, MagicMock

import pytest

from backend.api.features.chat.tools.binary_output_processor import (
    _decode_base64,
    process_binary_outputs,
)


@pytest.fixture
def workspace_manager():
    wm = AsyncMock()
    wm.write_file = AsyncMock(return_value=MagicMock(id="file-123"))
    return wm


class TestDecodeBase64:
    def test_raw_base64(self):
        assert _decode_base64(base64.b64encode(b"test").decode()) == b"test"

    def test_data_uri(self):
        encoded = base64.b64encode(b"test").decode()
        assert _decode_base64(f"data:image/png;base64,{encoded}") == b"test"

    def test_invalid_returns_none(self):
        assert _decode_base64("not base64!!!") is None


class TestProcessBinaryOutputs:
    @pytest.mark.asyncio
    async def test_saves_large_binary(self, workspace_manager):
        content = base64.b64encode(b"x" * 2000).decode()
        outputs = {"result": [{"png": content, "text": "ok"}]}

        result = await process_binary_outputs(outputs, workspace_manager, "Test")

        assert result["result"][0]["png"] == "workspace://file-123"
        assert result["result"][0]["text"] == "ok"

    @pytest.mark.asyncio
    async def test_skips_small_content(self, workspace_manager):
        content = base64.b64encode(b"tiny").decode()
        outputs = {"result": [{"png": content}]}

        result = await process_binary_outputs(outputs, workspace_manager, "Test")

        assert result["result"][0]["png"] == content
        workspace_manager.write_file.assert_not_called()

    @pytest.mark.asyncio
    async def test_deduplicates_identical_content(self, workspace_manager):
        content = base64.b64encode(b"x" * 2000).decode()
        outputs = {"a": [{"pdf": content}], "b": [{"pdf": content}]}

        result = await process_binary_outputs(outputs, workspace_manager, "Test")

        assert result["a"][0]["pdf"] == result["b"][0]["pdf"] == "workspace://file-123"
        assert workspace_manager.write_file.call_count == 1

    @pytest.mark.asyncio
    async def test_failure_preserves_original(self, workspace_manager):
        workspace_manager.write_file.side_effect = Exception("Storage error")
        content = base64.b64encode(b"x" * 2000).decode()

        result = await process_binary_outputs(
            {"r": [{"png": content}]}, workspace_manager, "Test"
        )

        assert result["r"][0]["png"] == content

    @pytest.mark.asyncio
    async def test_handles_nested_structures(self, workspace_manager):
        content = base64.b64encode(b"x" * 2000).decode()
        outputs = {"result": [{"outer": {"inner": {"png": content}}}]}

        result = await process_binary_outputs(outputs, workspace_manager, "Test")

        assert result["result"][0]["outer"]["inner"]["png"] == "workspace://file-123"

    @pytest.mark.asyncio
    async def test_handles_lists_in_output(self, workspace_manager):
        content = base64.b64encode(b"x" * 2000).decode()
        outputs = {"result": [{"images": [{"png": content}, {"png": content}]}]}

        result = await process_binary_outputs(outputs, workspace_manager, "Test")

        assert result["result"][0]["images"][0]["png"] == "workspace://file-123"
        assert result["result"][0]["images"][1]["png"] == "workspace://file-123"
        # Deduplication should still work
        assert workspace_manager.write_file.call_count == 1
```
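Note that the async cases use `@pytest.mark.asyncio`, so running them assumes the pytest-asyncio plugin is installed; the synchronous `TestDecodeBase64` cases need only pytest itself.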