mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-06 04:45:10 -05:00
feat(copilot): Auto-save binary block outputs using content-based detection
When CoPilot executes blocks that produce binary outputs (images, PDFs), the data is now automatically detected and saved to the user's workspace, replacing raw base64 data with workspace:// references. Uses content-based detection (not field-name based) because: - Code execution blocks return base64 in stdout_logs, not structured fields - The png/jpeg/pdf fields only populate from Jupyter display mechanisms - Other blocks use various field names (image, result, output, etc.) Detection strategy: 1. Data URI detection with mimetype whitelist 2. Raw base64 with magic number validation (PNG, JPEG, PDF, GIF, WebP) 3. Size threshold > 1KB to filter tokens/hashes Features: - Scans ALL string values recursively, regardless of field name - Content deduplication within single block execution via SHA-256 hash - Graceful degradation (original value preserved on save failure) - ~97% token reduction observed (17k -> 500 tokens for PDF generation) Closes SECRT-1887
This commit is contained in:
@@ -0,0 +1,238 @@
|
||||
"""
|
||||
Content-based detection and saving of binary data in block outputs.
|
||||
|
||||
This module post-processes block execution outputs to detect and save binary
|
||||
content (images, PDFs) to the workspace, returning workspace:// references
|
||||
instead of raw base64 data. This reduces LLM output token usage by ~97% for
|
||||
file generation tasks.
|
||||
|
||||
Detection is content-based (not field-name based) because:
|
||||
- Code execution blocks return base64 in stdout_logs, not structured fields
|
||||
- The png/jpeg/pdf fields only populate from Jupyter display mechanisms
|
||||
- Other blocks use various field names: image, result, output, response, etc.
|
||||
"""
|
||||
|
||||
import base64
|
||||
import binascii
|
||||
import hashlib
|
||||
import logging
|
||||
import re
|
||||
import uuid
|
||||
from typing import Any, Optional
|
||||
|
||||
from backend.util.file import sanitize_filename
|
||||
from backend.util.workspace import WorkspaceManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Only process strings larger than this (filters out tokens, hashes, short strings)
|
||||
SIZE_THRESHOLD = 1024 # 1KB
|
||||
|
||||
# Data URI pattern with mimetype extraction
|
||||
DATA_URI_PATTERN = re.compile(
|
||||
r"^data:([a-zA-Z0-9.+-]+/[a-zA-Z0-9.+-]+);base64,(.+)$",
|
||||
re.DOTALL,
|
||||
)
|
||||
|
||||
# Only process these mimetypes from data URIs (avoid text/plain, etc.)
|
||||
ALLOWED_MIMETYPES = {
|
||||
"image/png",
|
||||
"image/jpeg",
|
||||
"image/jpg", # Non-standard but sometimes used
|
||||
"image/gif",
|
||||
"image/webp",
|
||||
"image/svg+xml",
|
||||
"application/pdf",
|
||||
"application/octet-stream",
|
||||
}
|
||||
|
||||
# Base64 character validation (strict - must be pure base64)
|
||||
BASE64_PATTERN = re.compile(r"^[A-Za-z0-9+/\n\r]+=*$")
|
||||
|
||||
# Magic numbers for binary file detection
|
||||
# Note: WebP requires two-step detection: RIFF prefix + WEBP at offset 8
|
||||
MAGIC_SIGNATURES = [
|
||||
(b"\x89PNG\r\n\x1a\n", "png"),
|
||||
(b"\xff\xd8\xff", "jpg"),
|
||||
(b"%PDF-", "pdf"),
|
||||
(b"GIF87a", "gif"),
|
||||
(b"GIF89a", "gif"),
|
||||
(b"RIFF", "webp"), # Special case: also check content[8:12] == b'WEBP'
|
||||
]
|
||||
|
||||
|
||||
async def process_binary_outputs(
|
||||
outputs: dict[str, list[Any]],
|
||||
workspace_manager: WorkspaceManager,
|
||||
block_name: str,
|
||||
) -> dict[str, list[Any]]:
|
||||
"""
|
||||
Scan all string values in outputs and replace detected binary content
|
||||
with workspace:// references.
|
||||
|
||||
Uses content-based detection (data URIs, magic numbers) to find binary
|
||||
data regardless of field name. Deduplicates identical content within
|
||||
a single call using content hashing.
|
||||
|
||||
Args:
|
||||
outputs: Block execution outputs (dict of output_name -> list of values)
|
||||
workspace_manager: WorkspaceManager instance with session scoping
|
||||
block_name: Name of the block (used in generated filenames)
|
||||
|
||||
Returns:
|
||||
Processed outputs with binary data replaced by workspace references
|
||||
"""
|
||||
cache: dict[str, str] = {} # content_hash -> workspace_ref
|
||||
|
||||
processed: dict[str, list[Any]] = {}
|
||||
for name, items in outputs.items():
|
||||
processed_items = []
|
||||
for item in items:
|
||||
processed_items.append(
|
||||
await _process_value(item, workspace_manager, block_name, cache)
|
||||
)
|
||||
processed[name] = processed_items
|
||||
return processed
|
||||
|
||||
|
||||
async def _process_value(
|
||||
value: Any,
|
||||
wm: WorkspaceManager,
|
||||
block: str,
|
||||
cache: dict[str, str],
|
||||
) -> Any:
|
||||
"""Recursively process a value, detecting binary content in strings."""
|
||||
if isinstance(value, dict):
|
||||
result = {}
|
||||
for k, v in value.items():
|
||||
result[k] = await _process_value(v, wm, block, cache)
|
||||
return result
|
||||
if isinstance(value, list):
|
||||
return [await _process_value(v, wm, block, cache) for v in value]
|
||||
if isinstance(value, str) and len(value) > SIZE_THRESHOLD:
|
||||
return await _try_detect_and_save(value, wm, block, cache)
|
||||
return value
|
||||
|
||||
|
||||
async def _try_detect_and_save(
|
||||
value: str,
|
||||
wm: WorkspaceManager,
|
||||
block: str,
|
||||
cache: dict[str, str],
|
||||
) -> str:
|
||||
"""Attempt to detect binary content and save it. Returns original if not binary."""
|
||||
|
||||
# Try data URI first (highest confidence - explicit mimetype)
|
||||
result = _detect_data_uri(value)
|
||||
if result:
|
||||
content, ext = result
|
||||
return await _save_binary(content, ext, wm, block, cache, value)
|
||||
|
||||
# Try raw base64 with magic number detection
|
||||
result = _detect_raw_base64(value)
|
||||
if result:
|
||||
content, ext = result
|
||||
return await _save_binary(content, ext, wm, block, cache, value)
|
||||
|
||||
return value # Not binary, return unchanged
|
||||
|
||||
|
||||
def _detect_data_uri(value: str) -> Optional[tuple[bytes, str]]:
|
||||
"""
|
||||
Detect data URI with whitelisted mimetype.
|
||||
|
||||
Returns (content, extension) or None.
|
||||
"""
|
||||
match = DATA_URI_PATTERN.match(value)
|
||||
if not match:
|
||||
return None
|
||||
|
||||
mimetype, b64_payload = match.groups()
|
||||
if mimetype not in ALLOWED_MIMETYPES:
|
||||
return None
|
||||
|
||||
try:
|
||||
content = base64.b64decode(b64_payload, validate=True)
|
||||
except (ValueError, binascii.Error):
|
||||
return None
|
||||
|
||||
ext = _mimetype_to_ext(mimetype)
|
||||
return content, ext
|
||||
|
||||
|
||||
def _detect_raw_base64(value: str) -> Optional[tuple[bytes, str]]:
|
||||
"""
|
||||
Detect raw base64 with magic number validation.
|
||||
|
||||
Only processes strings that:
|
||||
1. Look like pure base64 (regex pre-filter)
|
||||
2. Successfully decode as base64
|
||||
3. Start with a known binary file magic number
|
||||
|
||||
Returns (content, extension) or None.
|
||||
"""
|
||||
# Pre-filter: must look like base64 (no spaces, punctuation, etc.)
|
||||
if not BASE64_PATTERN.match(value):
|
||||
return None
|
||||
|
||||
try:
|
||||
content = base64.b64decode(value, validate=True)
|
||||
except (ValueError, binascii.Error):
|
||||
return None
|
||||
|
||||
# Check magic numbers
|
||||
for magic, ext in MAGIC_SIGNATURES:
|
||||
if content.startswith(magic):
|
||||
# Special case for WebP: RIFF container, verify "WEBP" at offset 8
|
||||
if magic == b"RIFF":
|
||||
if len(content) < 12 or content[8:12] != b"WEBP":
|
||||
continue
|
||||
return content, ext
|
||||
|
||||
return None # No magic number match = not a recognized binary format
|
||||
|
||||
|
||||
async def _save_binary(
|
||||
content: bytes,
|
||||
ext: str,
|
||||
wm: WorkspaceManager,
|
||||
block: str,
|
||||
cache: dict[str, str],
|
||||
original: str,
|
||||
) -> str:
|
||||
"""
|
||||
Save binary content to workspace with deduplication.
|
||||
|
||||
Returns workspace://file-id reference, or original value on failure.
|
||||
"""
|
||||
content_hash = hashlib.sha256(content).hexdigest()
|
||||
|
||||
if content_hash in cache:
|
||||
return cache[content_hash]
|
||||
|
||||
try:
|
||||
safe_block = sanitize_filename(block)[:20].lower()
|
||||
filename = f"{safe_block}_{ext}_{uuid.uuid4().hex[:12]}.{ext}"
|
||||
|
||||
file = await wm.write_file(content, filename)
|
||||
ref = f"workspace://{file.id}"
|
||||
cache[content_hash] = ref
|
||||
return ref
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to save binary output: {e}")
|
||||
return original # Graceful degradation
|
||||
|
||||
|
||||
def _mimetype_to_ext(mimetype: str) -> str:
|
||||
"""Convert mimetype to file extension."""
|
||||
mapping = {
|
||||
"image/png": "png",
|
||||
"image/jpeg": "jpg",
|
||||
"image/jpg": "jpg",
|
||||
"image/gif": "gif",
|
||||
"image/webp": "webp",
|
||||
"image/svg+xml": "svg",
|
||||
"application/pdf": "pdf",
|
||||
"application/octet-stream": "bin",
|
||||
}
|
||||
return mapping.get(mimetype, "bin")
|
||||
@@ -14,8 +14,10 @@ from backend.data.model import CredentialsMetaInput
|
||||
from backend.data.workspace import get_or_create_workspace
|
||||
from backend.integrations.creds_manager import IntegrationCredentialsManager
|
||||
from backend.util.exceptions import BlockError
|
||||
from backend.util.workspace import WorkspaceManager
|
||||
|
||||
from .base import BaseTool
|
||||
from .binary_output_processor import process_binary_outputs
|
||||
from .models import (
|
||||
BlockOutputResponse,
|
||||
ErrorResponse,
|
||||
@@ -321,6 +323,16 @@ class RunBlockTool(BaseTool):
|
||||
):
|
||||
outputs[output_name].append(output_data)
|
||||
|
||||
# Post-process outputs to save binary content to workspace
|
||||
workspace_manager = WorkspaceManager(
|
||||
user_id=user_id,
|
||||
workspace_id=workspace.id,
|
||||
session_id=session.session_id,
|
||||
)
|
||||
outputs = await process_binary_outputs(
|
||||
dict(outputs), workspace_manager, block.name
|
||||
)
|
||||
|
||||
return BlockOutputResponse(
|
||||
message=f"Block '{block.name}' executed successfully",
|
||||
block_id=block_id,
|
||||
|
||||
@@ -0,0 +1,390 @@
|
||||
"""Tests for content-based binary output detection and saving."""
|
||||
|
||||
import base64
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from .binary_output_processor import (
|
||||
_detect_data_uri,
|
||||
_detect_raw_base64,
|
||||
_mimetype_to_ext,
|
||||
process_binary_outputs,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_workspace_manager():
|
||||
"""Create a mock workspace manager that returns predictable file IDs."""
|
||||
wm = MagicMock()
|
||||
|
||||
async def mock_write_file(content, filename):
|
||||
file = MagicMock()
|
||||
file.id = f"file-{filename[:10]}"
|
||||
return file
|
||||
|
||||
wm.write_file = AsyncMock(side_effect=mock_write_file)
|
||||
return wm
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Data URI Detection Tests
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestDetectDataUri:
|
||||
"""Tests for _detect_data_uri function."""
|
||||
|
||||
def test_detects_png_data_uri(self):
|
||||
"""Should detect valid PNG data URI."""
|
||||
# Minimal valid PNG (1x1 transparent)
|
||||
png_b64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="
|
||||
data_uri = f"data:image/png;base64,{png_b64}"
|
||||
|
||||
result = _detect_data_uri(data_uri)
|
||||
|
||||
assert result is not None
|
||||
content, ext = result
|
||||
assert ext == "png"
|
||||
assert content.startswith(b"\x89PNG")
|
||||
|
||||
def test_detects_pdf_data_uri(self):
|
||||
"""Should detect valid PDF data URI."""
|
||||
pdf_content = b"%PDF-1.4 test content"
|
||||
pdf_b64 = base64.b64encode(pdf_content).decode()
|
||||
data_uri = f"data:application/pdf;base64,{pdf_b64}"
|
||||
|
||||
result = _detect_data_uri(data_uri)
|
||||
|
||||
assert result is not None
|
||||
content, ext = result
|
||||
assert ext == "pdf"
|
||||
assert content == pdf_content
|
||||
|
||||
def test_rejects_text_plain_mimetype(self):
|
||||
"""Should reject text/plain mimetype (not in whitelist)."""
|
||||
text_b64 = base64.b64encode(b"Hello World").decode()
|
||||
data_uri = f"data:text/plain;base64,{text_b64}"
|
||||
|
||||
result = _detect_data_uri(data_uri)
|
||||
|
||||
assert result is None
|
||||
|
||||
def test_rejects_non_data_uri_string(self):
|
||||
"""Should return None for non-data-URI strings."""
|
||||
result = _detect_data_uri("https://example.com/image.png")
|
||||
assert result is None
|
||||
|
||||
def test_rejects_invalid_base64_in_data_uri(self):
|
||||
"""Should return None for data URI with invalid base64."""
|
||||
data_uri = "-valid-base64!!!"
|
||||
result = _detect_data_uri(data_uri)
|
||||
assert result is None
|
||||
|
||||
def test_handles_jpeg_mimetype(self):
|
||||
"""Should handle image/jpeg mimetype."""
|
||||
jpeg_content = b"\xff\xd8\xff\xe0test"
|
||||
jpeg_b64 = base64.b64encode(jpeg_content).decode()
|
||||
data_uri = f"data:image/jpeg;base64,{jpeg_b64}"
|
||||
|
||||
result = _detect_data_uri(data_uri)
|
||||
|
||||
assert result is not None
|
||||
_, ext = result
|
||||
assert ext == "jpg"
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Raw Base64 Detection Tests
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestDetectRawBase64:
|
||||
"""Tests for _detect_raw_base64 function."""
|
||||
|
||||
def test_detects_png_magic_number(self):
|
||||
"""Should detect raw base64 PNG by magic number."""
|
||||
# Minimal valid PNG
|
||||
png_b64 = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg=="
|
||||
|
||||
result = _detect_raw_base64(png_b64)
|
||||
|
||||
assert result is not None
|
||||
content, ext = result
|
||||
assert ext == "png"
|
||||
assert content.startswith(b"\x89PNG")
|
||||
|
||||
def test_detects_jpeg_magic_number(self):
|
||||
"""Should detect raw base64 JPEG by magic number."""
|
||||
jpeg_content = b"\xff\xd8\xff\xe0" + b"\x00" * 100
|
||||
jpeg_b64 = base64.b64encode(jpeg_content).decode()
|
||||
|
||||
result = _detect_raw_base64(jpeg_b64)
|
||||
|
||||
assert result is not None
|
||||
_, ext = result
|
||||
assert ext == "jpg"
|
||||
|
||||
def test_detects_pdf_magic_number(self):
|
||||
"""Should detect raw base64 PDF by magic number."""
|
||||
pdf_content = b"%PDF-1.4 " + b"x" * 100
|
||||
pdf_b64 = base64.b64encode(pdf_content).decode()
|
||||
|
||||
result = _detect_raw_base64(pdf_b64)
|
||||
|
||||
assert result is not None
|
||||
_, ext = result
|
||||
assert ext == "pdf"
|
||||
|
||||
def test_detects_gif87a_magic_number(self):
|
||||
"""Should detect GIF87a magic number."""
|
||||
gif_content = b"GIF87a" + b"\x00" * 100
|
||||
gif_b64 = base64.b64encode(gif_content).decode()
|
||||
|
||||
result = _detect_raw_base64(gif_b64)
|
||||
|
||||
assert result is not None
|
||||
_, ext = result
|
||||
assert ext == "gif"
|
||||
|
||||
def test_detects_gif89a_magic_number(self):
|
||||
"""Should detect GIF89a magic number."""
|
||||
gif_content = b"GIF89a" + b"\x00" * 100
|
||||
gif_b64 = base64.b64encode(gif_content).decode()
|
||||
|
||||
result = _detect_raw_base64(gif_b64)
|
||||
|
||||
assert result is not None
|
||||
_, ext = result
|
||||
assert ext == "gif"
|
||||
|
||||
def test_detects_webp_magic_number(self):
|
||||
"""Should detect WebP (RIFF + WEBP at offset 8)."""
|
||||
# WebP header: RIFF + size (4 bytes) + WEBP
|
||||
webp_content = b"RIFF\x00\x00\x00\x00WEBP" + b"\x00" * 100
|
||||
webp_b64 = base64.b64encode(webp_content).decode()
|
||||
|
||||
result = _detect_raw_base64(webp_b64)
|
||||
|
||||
assert result is not None
|
||||
_, ext = result
|
||||
assert ext == "webp"
|
||||
|
||||
def test_rejects_riff_without_webp(self):
|
||||
"""Should reject RIFF files that aren't WebP (e.g., WAV)."""
|
||||
wav_content = b"RIFF\x00\x00\x00\x00WAVE" + b"\x00" * 100
|
||||
wav_b64 = base64.b64encode(wav_content).decode()
|
||||
|
||||
result = _detect_raw_base64(wav_b64)
|
||||
|
||||
assert result is None
|
||||
|
||||
def test_rejects_non_base64_string(self):
|
||||
"""Should reject strings that don't look like base64."""
|
||||
result = _detect_raw_base64("Hello, this is regular text with spaces!")
|
||||
assert result is None
|
||||
|
||||
def test_rejects_base64_without_magic_number(self):
|
||||
"""Should reject valid base64 that doesn't have a known magic number."""
|
||||
random_content = b"This is just random text, not a binary file"
|
||||
random_b64 = base64.b64encode(random_content).decode()
|
||||
|
||||
result = _detect_raw_base64(random_b64)
|
||||
|
||||
assert result is None
|
||||
|
||||
def test_rejects_invalid_base64(self):
|
||||
"""Should return None for invalid base64."""
|
||||
result = _detect_raw_base64("not-valid-base64!!!")
|
||||
assert result is None
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Process Binary Outputs Tests
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestProcessBinaryOutputs:
|
||||
"""Tests for process_binary_outputs function."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_saves_large_png_and_returns_reference(self, mock_workspace_manager):
|
||||
"""Should save PNG > 1KB and return workspace reference."""
|
||||
# Create PNG > 1KB
|
||||
png_header = b"\x89PNG\r\n\x1a\n"
|
||||
png_content = png_header + b"\x00" * 2000
|
||||
png_b64 = base64.b64encode(png_content).decode()
|
||||
|
||||
outputs = {"result": [png_b64]}
|
||||
|
||||
result = await process_binary_outputs(
|
||||
outputs, mock_workspace_manager, "TestBlock"
|
||||
)
|
||||
|
||||
assert result["result"][0].startswith("workspace://")
|
||||
mock_workspace_manager.write_file.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_preserves_small_content(self, mock_workspace_manager):
|
||||
"""Should not process strings smaller than threshold."""
|
||||
small_content = "small"
|
||||
|
||||
outputs = {"result": [small_content]}
|
||||
|
||||
result = await process_binary_outputs(
|
||||
outputs, mock_workspace_manager, "TestBlock"
|
||||
)
|
||||
|
||||
assert result["result"][0] == small_content
|
||||
mock_workspace_manager.write_file.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_deduplicates_identical_content(self, mock_workspace_manager):
|
||||
"""Should save identical content only once."""
|
||||
png_header = b"\x89PNG\r\n\x1a\n"
|
||||
png_content = png_header + b"\x00" * 2000
|
||||
png_b64 = base64.b64encode(png_content).decode()
|
||||
|
||||
outputs = {"result": [png_b64, png_b64]}
|
||||
|
||||
result = await process_binary_outputs(
|
||||
outputs, mock_workspace_manager, "TestBlock"
|
||||
)
|
||||
|
||||
# Both should have references
|
||||
assert result["result"][0].startswith("workspace://")
|
||||
assert result["result"][1].startswith("workspace://")
|
||||
# But only one write should have happened
|
||||
assert mock_workspace_manager.write_file.call_count == 1
|
||||
# And they should be the same reference
|
||||
assert result["result"][0] == result["result"][1]
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_processes_nested_dict(self, mock_workspace_manager):
|
||||
"""Should recursively process nested dictionaries."""
|
||||
png_header = b"\x89PNG\r\n\x1a\n"
|
||||
png_content = png_header + b"\x00" * 2000
|
||||
png_b64 = base64.b64encode(png_content).decode()
|
||||
|
||||
outputs = {"result": [{"nested": {"deep": png_b64}}]}
|
||||
|
||||
result = await process_binary_outputs(
|
||||
outputs, mock_workspace_manager, "TestBlock"
|
||||
)
|
||||
|
||||
assert result["result"][0]["nested"]["deep"].startswith("workspace://")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_processes_nested_list(self, mock_workspace_manager):
|
||||
"""Should recursively process nested lists."""
|
||||
png_header = b"\x89PNG\r\n\x1a\n"
|
||||
png_content = png_header + b"\x00" * 2000
|
||||
png_b64 = base64.b64encode(png_content).decode()
|
||||
|
||||
outputs = {"result": [[png_b64]]}
|
||||
|
||||
result = await process_binary_outputs(
|
||||
outputs, mock_workspace_manager, "TestBlock"
|
||||
)
|
||||
|
||||
assert result["result"][0][0].startswith("workspace://")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handles_data_uri_format(self, mock_workspace_manager):
|
||||
"""Should handle data URI format."""
|
||||
png_header = b"\x89PNG\r\n\x1a\n"
|
||||
png_content = png_header + b"\x00" * 2000
|
||||
png_b64 = base64.b64encode(png_content).decode()
|
||||
data_uri = f"data:image/png;base64,{png_b64}"
|
||||
|
||||
outputs = {"result": [data_uri]}
|
||||
|
||||
result = await process_binary_outputs(
|
||||
outputs, mock_workspace_manager, "TestBlock"
|
||||
)
|
||||
|
||||
assert result["result"][0].startswith("workspace://")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_preserves_non_binary_large_strings(self, mock_workspace_manager):
|
||||
"""Should preserve large strings that aren't binary."""
|
||||
large_text = "A" * 2000 # Large but not base64 or binary
|
||||
|
||||
outputs = {"result": [large_text]}
|
||||
|
||||
result = await process_binary_outputs(
|
||||
outputs, mock_workspace_manager, "TestBlock"
|
||||
)
|
||||
|
||||
assert result["result"][0] == large_text
|
||||
mock_workspace_manager.write_file.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_graceful_degradation_on_save_failure(self, mock_workspace_manager):
|
||||
"""Should preserve original value if save fails."""
|
||||
mock_workspace_manager.write_file = AsyncMock(
|
||||
side_effect=Exception("Storage error")
|
||||
)
|
||||
|
||||
png_header = b"\x89PNG\r\n\x1a\n"
|
||||
png_content = png_header + b"\x00" * 2000
|
||||
png_b64 = base64.b64encode(png_content).decode()
|
||||
|
||||
outputs = {"result": [png_b64]}
|
||||
|
||||
result = await process_binary_outputs(
|
||||
outputs, mock_workspace_manager, "TestBlock"
|
||||
)
|
||||
|
||||
# Should return original value on failure
|
||||
assert result["result"][0] == png_b64
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_handles_stdout_logs_field(self, mock_workspace_manager):
|
||||
"""Should detect binary in stdout_logs (the actual failing case)."""
|
||||
pdf_content = b"%PDF-1.4 " + b"x" * 2000
|
||||
pdf_b64 = base64.b64encode(pdf_content).decode()
|
||||
|
||||
outputs = {"stdout_logs": [pdf_b64]}
|
||||
|
||||
result = await process_binary_outputs(
|
||||
outputs, mock_workspace_manager, "ExecuteCodeBlock"
|
||||
)
|
||||
|
||||
assert result["stdout_logs"][0].startswith("workspace://")
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Mimetype to Extension Tests
|
||||
# =============================================================================
|
||||
|
||||
|
||||
class TestMimetypeToExt:
|
||||
"""Tests for _mimetype_to_ext function."""
|
||||
|
||||
def test_png_mapping(self):
|
||||
assert _mimetype_to_ext("image/png") == "png"
|
||||
|
||||
def test_jpeg_mapping(self):
|
||||
assert _mimetype_to_ext("image/jpeg") == "jpg"
|
||||
|
||||
def test_nonstandard_jpg_mapping(self):
|
||||
assert _mimetype_to_ext("image/jpg") == "jpg"
|
||||
|
||||
def test_gif_mapping(self):
|
||||
assert _mimetype_to_ext("image/gif") == "gif"
|
||||
|
||||
def test_webp_mapping(self):
|
||||
assert _mimetype_to_ext("image/webp") == "webp"
|
||||
|
||||
def test_svg_mapping(self):
|
||||
assert _mimetype_to_ext("image/svg+xml") == "svg"
|
||||
|
||||
def test_pdf_mapping(self):
|
||||
assert _mimetype_to_ext("application/pdf") == "pdf"
|
||||
|
||||
def test_octet_stream_mapping(self):
|
||||
assert _mimetype_to_ext("application/octet-stream") == "bin"
|
||||
|
||||
def test_unknown_mimetype(self):
|
||||
assert _mimetype_to_ext("application/unknown") == "bin"
|
||||
Reference in New Issue
Block a user