Compare commits

1 Commits

Author SHA1 Message Date
Nicholas Tindle
8892bcd230 docs: Add workspace and media file architecture documentation (#11989)
### Changes 🏗️

- Added comprehensive architecture documentation at
  `docs/platform/workspace-media-architecture.md` covering:
  - Database models (`UserWorkspace`, `UserWorkspaceFile`)
  - `WorkspaceManager` API with session scoping
  - `store_media_file()` media normalization pipeline (input types, return
    formats)
  - Virus scanning responsibility boundaries
  - Decision tree for choosing `WorkspaceManager` vs `store_media_file()`
  - Configuration reference including `clamav_max_concurrency` and
    `clamav_mark_failed_scans_as_clean`
  - Common patterns with error handling examples
- Updated `autogpt_platform/backend/CLAUDE.md` with a "Workspace & Media
  Files" section referencing the new docs
- Removed duplicate `scan_content_safe()` call from
  `WriteWorkspaceFileTool`; `WorkspaceManager.write_file()` already scans
  internally, so the tool was double-scanning every file
- Replaced removed comment in `workspace.py` with explicit ownership
  comment clarifying that `WorkspaceManager` is the single scanning
  boundary
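
The "single scanning boundary" idea can be sketched as follows. This is a minimal illustration and not the project's code: `FakeScanner`, `SketchWorkspaceManager`, `write_tool`, and `VirusDetectedError` are hypothetical names, and the real `scan_content_safe()` and `WorkspaceManager.write_file()` signatures may differ.

```python
import asyncio


class VirusDetectedError(Exception):
    """Raised by the hypothetical scanner when content looks malicious."""


class FakeScanner:
    """Stand-in for a real scanner; counts how often it is invoked."""

    def __init__(self) -> None:
        self.calls = 0

    async def scan(self, content: bytes, filename: str) -> None:
        self.calls += 1
        if b"EICAR" in content:
            raise VirusDetectedError(f"{filename} failed the scan")


class SketchWorkspaceManager:
    """Owns scanning: every persisted file passes through write_file()."""

    def __init__(self, scanner: FakeScanner) -> None:
        self._scanner = scanner
        self.files: dict[str, bytes] = {}

    async def write_file(self, content: bytes, filename: str) -> str:
        # Scan here, exactly once, before anything is persisted.
        await self._scanner.scan(content, filename)
        self.files[filename] = content
        return filename


async def write_tool(
    manager: SketchWorkspaceManager, content: bytes, filename: str
) -> str:
    # The caller does NOT scan; it relies on the manager's boundary.
    return await manager.write_file(content, filename)


scanner = FakeScanner()
manager = SketchWorkspaceManager(scanner)
asyncio.run(write_tool(manager, b"hello", "notes.txt"))
```

With this shape, a tool that also scanned before calling `write_file()` would run the scanner twice per file, which is the duplication the change removes.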

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Verified `scan_content_safe()` is called inside
    `WorkspaceManager.write_file()` (workspace.py:186)
  - [x] Verified `store_media_file()` scans all input branches including
    local paths (file.py:351)
  - [x] Verified documentation accuracy against current source code after
    merge with dev
  - [x] CI checks all passing

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> **Low Risk**
> Mostly adds documentation and internal developer guidance; the only
code change is a comment clarifying `WorkspaceManager.write_file()` as
the single virus-scanning boundary, with no behavior change.
> 
> **Overview**
> Adds a new `docs/platform/workspace-media-architecture.md` describing
the Workspace storage layer vs the `store_media_file()` media pipeline,
including session scoping and virus-scanning/persistence responsibility
boundaries.
> 
> Updates backend `CLAUDE.md` to point contributors to the new doc when
working on CoPilot uploads/downloads or
`WorkspaceManager`/`store_media_file()`, and clarifies in
`WorkspaceManager.write_file()` (comment-only) that callers should not
duplicate virus scanning.
> 
> <sup>Written by [Cursor
Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit
18fcfa03f8. This will update automatically
on new commits. Configure
[here](https://cursor.com/dashboard?tab=bugbot).</sup>
<!-- /CURSOR_SUMMARY -->

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-17 06:12:26 +00:00
18 changed files with 356 additions and 1346 deletions

View File

@@ -178,6 +178,16 @@ yield "image_url", result_url
3. Write tests alongside the route file
4. Run `poetry run test` to verify
## Workspace & Media Files

**Read [Workspace & Media Architecture](../../docs/platform/workspace-media-architecture.md) when:**

- Working on CoPilot file upload/download features
- Building blocks that handle `MediaFileType` inputs/outputs
- Modifying `WorkspaceManager` or `store_media_file()`
- Debugging file persistence or virus scanning issues

Covers: `WorkspaceManager` (persistent storage with session scoping), `store_media_file()` (media normalization pipeline), and responsibility boundaries for virus scanning and persistence.

## Security Implementation
### Cache Protection Middleware

View File

@@ -1,100 +0,0 @@
"""API endpoint for importing external workflows via CoPilot."""
import logging
from typing import Any

import pydantic
from autogpt_libs.auth import requires_user
from fastapi import APIRouter, HTTPException, Security

from backend.copilot.workflow_import.converter import build_copilot_prompt
from backend.copilot.workflow_import.describers import describe_workflow
from backend.copilot.workflow_import.format_detector import (
    SourcePlatform,
    detect_format,
)
from backend.copilot.workflow_import.url_fetcher import fetch_n8n_template

logger = logging.getLogger(__name__)

router = APIRouter()


class ImportWorkflowRequest(pydantic.BaseModel):
    """Request body for importing an external workflow."""

    workflow_json: dict[str, Any] | None = None
    template_url: str | None = None

    @pydantic.model_validator(mode="after")
    def check_exactly_one_source(self) -> "ImportWorkflowRequest":
        has_json = self.workflow_json is not None
        has_url = self.template_url is not None
        if not has_json and not has_url:
            raise ValueError("Provide either 'workflow_json' or 'template_url'")
        if has_json and has_url:
            raise ValueError(
                "Provide only one of 'workflow_json' or 'template_url', not both"
            )
        return self


class ImportWorkflowResponse(pydantic.BaseModel):
    """Response from parsing an external workflow.

    Returns a CoPilot prompt that the frontend uses to redirect the user
    to CoPilot, where the agentic agent-generator handles the conversion.
    """

    copilot_prompt: str
    source_format: str
    source_name: str


@router.post(
    path="/workflow",
    summary="Import a workflow from another tool (n8n, Make.com, Zapier)",
    dependencies=[Security(requires_user)],
)
async def import_workflow(
    request: ImportWorkflowRequest,
) -> ImportWorkflowResponse:
    """Parse an external workflow and return a CoPilot prompt.

    Accepts either raw workflow JSON or a template URL (n8n only for now).
    The workflow is parsed and described, then a structured prompt is returned
    for CoPilot's agent-generator to handle the actual conversion.
    """
    # Step 1: Get the raw workflow JSON
    if request.template_url is not None:
        try:
            workflow_json = await fetch_n8n_template(request.template_url)
        except ValueError as e:
            raise HTTPException(status_code=400, detail=str(e)) from e
        except RuntimeError as e:
            raise HTTPException(status_code=502, detail=str(e)) from e
    else:
        workflow_json = request.workflow_json
        assert workflow_json is not None  # guaranteed by validator

    # Step 2: Detect format
    fmt = detect_format(workflow_json)
    if fmt == SourcePlatform.UNKNOWN:
        raise HTTPException(
            status_code=400,
            detail="Could not detect workflow format. Supported formats: "
            "n8n, Make.com, Zapier. Ensure you're uploading a valid "
            "workflow export file.",
        )

    # Step 3: Describe the workflow
    desc = describe_workflow(workflow_json, fmt)

    # Step 4: Build CoPilot prompt
    prompt = build_copilot_prompt(desc)

    return ImportWorkflowResponse(
        copilot_prompt=prompt,
        source_format=fmt.value,
        source_name=desc.name,
    )
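
The "exactly one source" rule enforced by the request validator above can be illustrated without any framework. The sketch below is a standard-library analogue, not the removed endpoint's code: `ImportRequestSketch` and `is_valid_source` are hypothetical names, and a plain dataclass stands in for the pydantic model.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ImportRequestSketch:
    """Stdlib analogue of a request that accepts JSON or a URL, never both."""

    workflow_json: Optional[dict[str, Any]] = None
    template_url: Optional[str] = None

    def __post_init__(self) -> None:
        # Count sources that are present; "present" means "not None",
        # so an empty dict still counts as a provided workflow.
        provided = sum(
            value is not None for value in (self.workflow_json, self.template_url)
        )
        if provided != 1:
            raise ValueError(
                "Provide exactly one of 'workflow_json' or 'template_url'"
            )


def is_valid_source(**payload: Any) -> bool:
    """Return True when the payload names exactly one source."""
    try:
        ImportRequestSketch(**payload)
    except ValueError:
        return False
    return True
```

In the endpoint itself a failed validator surfaces as a 422 response, which is what the accompanying tests assert for the empty and the doubly-specified payloads.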

View File

@@ -1,173 +0,0 @@
"""Tests for workflow_import.py API endpoint."""
from unittest.mock import AsyncMock
import fastapi
import pytest
from autogpt_libs.auth.jwt_utils import get_jwt_payload
from fastapi.testclient import TestClient
from backend.api.features.workflow_import import router
app = fastapi.FastAPI()
app.include_router(router)
client = TestClient(app)
# Sample workflow fixtures
N8N_WORKFLOW = {
"name": "Email on Webhook",
"nodes": [
{
"name": "Webhook",
"type": "n8n-nodes-base.webhookTrigger",
"parameters": {"path": "/incoming"},
},
{
"name": "Send Email",
"type": "n8n-nodes-base.gmail",
"parameters": {"resource": "message", "operation": "send"},
},
],
"connections": {
"Webhook": {"main": [[{"node": "Send Email", "type": "main", "index": 0}]]}
},
}
MAKE_WORKFLOW = {
"name": "Sheets to Calendar",
"flow": [
{
"module": "google-sheets:watchUpdatedCells",
"mapper": {"spreadsheetId": "abc"},
},
{
"module": "google-calendar:createAnEvent",
"mapper": {"title": "Meeting"},
},
],
}
ZAPIER_WORKFLOW = {
"name": "Gmail to Slack",
"steps": [
{"app": "Gmail", "action": "new_email"},
{"app": "Slack", "action": "send_message", "params": {"channel": "#alerts"}},
],
}
@pytest.fixture(autouse=True)
def setup_app_auth(mock_jwt_user):
app.dependency_overrides[get_jwt_payload] = mock_jwt_user["get_jwt_payload"]
yield
app.dependency_overrides.clear()
class TestImportWorkflow:
def test_import_n8n_workflow(self):
response = client.post(
"/workflow",
json={"workflow_json": N8N_WORKFLOW},
)
assert response.status_code == 200
data = response.json()
assert data["source_format"] == "n8n"
assert data["source_name"] == "Email on Webhook"
assert "copilot_prompt" in data
assert "n8n" in data["copilot_prompt"]
assert "Email on Webhook" in data["copilot_prompt"]
def test_import_make_workflow(self):
response = client.post(
"/workflow",
json={"workflow_json": MAKE_WORKFLOW},
)
assert response.status_code == 200
data = response.json()
assert data["source_format"] == "make"
assert data["source_name"] == "Sheets to Calendar"
assert "copilot_prompt" in data
def test_import_zapier_workflow(self):
response = client.post(
"/workflow",
json={"workflow_json": ZAPIER_WORKFLOW},
)
assert response.status_code == 200
data = response.json()
assert data["source_format"] == "zapier"
assert data["source_name"] == "Gmail to Slack"
assert "copilot_prompt" in data
def test_prompt_includes_steps(self):
response = client.post(
"/workflow",
json={"workflow_json": N8N_WORKFLOW},
)
prompt = response.json()["copilot_prompt"]
# Should include step details from the workflow
assert "Webhook" in prompt or "webhook" in prompt
assert "Gmail" in prompt or "gmail" in prompt
def test_no_source_provided(self):
response = client.post(
"/workflow",
json={},
)
assert response.status_code == 422 # Pydantic validation error
def test_both_sources_provided(self):
response = client.post(
"/workflow",
json={
"workflow_json": N8N_WORKFLOW,
"template_url": "https://n8n.io/workflows/123",
},
)
assert response.status_code == 422
def test_unknown_format_returns_400(self):
response = client.post(
"/workflow",
json={"workflow_json": {"foo": "bar"}},
)
assert response.status_code == 400
assert "Could not detect workflow format" in response.json()["detail"]
def test_url_fetch_bad_url_returns_400(self, mocker):
mocker.patch(
"backend.api.features.workflow_import.fetch_n8n_template",
new_callable=AsyncMock,
side_effect=ValueError("Invalid URL format"),
)
response = client.post(
"/workflow",
json={"template_url": "https://bad-url.com"},
)
assert response.status_code == 400
assert "Invalid URL format" in response.json()["detail"]
def test_url_fetch_upstream_error_returns_502(self, mocker):
mocker.patch(
"backend.api.features.workflow_import.fetch_n8n_template",
new_callable=AsyncMock,
side_effect=RuntimeError("n8n API returned 500"),
)
response = client.post(
"/workflow",
json={"template_url": "https://n8n.io/workflows/123"},
)
assert response.status_code == 502
assert "n8n API returned 500" in response.json()["detail"]
def test_response_model_shape(self):
response = client.post(
"/workflow",
json={"workflow_json": N8N_WORKFLOW},
)
data = response.json()
assert "copilot_prompt" in data
assert "source_format" in data
assert "source_name" in data
assert isinstance(data["copilot_prompt"], str)
assert len(data["copilot_prompt"]) > 0

View File

@@ -34,7 +34,6 @@ import backend.api.features.postmark.postmark
import backend.api.features.store.model
import backend.api.features.store.routes
import backend.api.features.v1
import backend.api.features.workflow_import
import backend.api.features.workspace.routes as workspace_routes
import backend.data.block
import backend.data.db
@@ -355,11 +354,6 @@ app.include_router(
tags=["oauth"],
prefix="/api/oauth",
)
app.include_router(
backend.api.features.workflow_import.router,
tags=["v2", "import"],
prefix="/api/import",
)
app.mount("/external-api", external_api)

View File

@@ -1,16 +0,0 @@
"""Workflow import module.
Parses workflows from n8n, Make.com, and Zapier into structured descriptions,
then builds CoPilot prompts for the agentic agent-generator to handle conversion.
"""
from .converter import build_copilot_prompt
from .format_detector import SourcePlatform, detect_format
from .models import WorkflowDescription

__all__ = [
    "SourcePlatform",
    "WorkflowDescription",
    "build_copilot_prompt",
    "detect_format",
]

View File

@@ -1,49 +0,0 @@
"""Build a CoPilot prompt from a WorkflowDescription.
Instead of a custom single-shot LLM conversion, we generate a structured
prompt that CoPilot's existing agentic agent-generator handles. This reuses
the multi-turn tool-use pipeline (find_block, create_agent, fixer, validator)
for reliable workflow-to-agent conversion.
"""
import json

from .models import WorkflowDescription


def build_copilot_prompt(desc: WorkflowDescription) -> str:
    """Build a CoPilot prompt from a parsed WorkflowDescription.

    The prompt describes the external workflow in enough detail for CoPilot's
    agent-generator to recreate it as an AutoGPT agent graph.

    Args:
        desc: Structured description of the source workflow.

    Returns:
        A user-facing prompt string for CoPilot.
    """
    steps_text = ""
    for step in desc.steps:
        conns = (
            f" → connects to steps {step.connections_to}" if step.connections_to else ""
        )
        params_str = ""
        if step.parameters:
            truncated = json.dumps(step.parameters, default=str)[:300]
            params_str = f" (params: {truncated})"
        steps_text += (
            f" {step.order}. [{step.service}] {step.action}{params_str}{conns}\n"
        )

    trigger_line = f"Trigger: {desc.trigger_type}" if desc.trigger_type else ""

    return f"""I want to import a workflow from {desc.source_format.value} and recreate it as an AutoGPT agent.
**Workflow name**: {desc.name}
**Description**: {desc.description}
{trigger_line}
**Steps** (from the original {desc.source_format.value} workflow):
{steps_text}
Please build an AutoGPT agent that replicates this workflow. Map each step to the most appropriate AutoGPT block(s), wire them together, and save it.""".strip()
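
To make the per-step formatting concrete, here is a small standalone sketch of how one step line is rendered. It assumes a plain dataclass `Step` in place of the pydantic `StepDescription`, and `render_step` is a hypothetical helper that mirrors the loop body above rather than a function from the module.

```python
import json
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Step:
    """Simplified stand-in for StepDescription."""

    order: int
    action: str
    service: str
    parameters: dict[str, Any] = field(default_factory=dict)
    connections_to: list[int] = field(default_factory=list)


def render_step(step: Step, max_param_chars: int = 300) -> str:
    """Render one step as a single prompt line (without trailing newline)."""
    conns = f" → connects to steps {step.connections_to}" if step.connections_to else ""
    params = ""
    if step.parameters:
        # The JSON text is cut at a fixed character count, so a long value
        # may end mid-token; the result is a hint, not parseable JSON.
        truncated = json.dumps(step.parameters, default=str)[:max_param_chars]
        params = f" (params: {truncated})"
    return f" {step.order}. [{step.service}] {step.action}{params}{conns}"


line = render_step(
    Step(0, "Send message via Gmail", "Gmail", {"resource": "message"}, [1])
)
long_line = render_step(Step(1, "x", "S", {"k": "v" * 1000}))
```

Because the truncation is character-based, the parameter text for large steps should be treated as a human-readable summary only.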

View File

@@ -1,269 +0,0 @@
"""Extract structured WorkflowDescription from external workflow JSONs.
Each describer is a pure function that deterministically parses the source
format into a platform-agnostic WorkflowDescription. No LLM calls are made here.
"""
import re
from typing import Any
from .models import SourcePlatform, StepDescription, WorkflowDescription
def describe_workflow(
json_data: dict[str, Any], fmt: SourcePlatform
) -> WorkflowDescription:
"""Route to the appropriate describer based on detected format."""
describers = {
SourcePlatform.N8N: describe_n8n_workflow,
SourcePlatform.MAKE: describe_make_workflow,
SourcePlatform.ZAPIER: describe_zapier_workflow,
}
describer = describers.get(fmt)
if not describer:
raise ValueError(f"No describer available for format: {fmt}")
result = describer(json_data)
if not result.steps:
raise ValueError(f"Workflow contains no steps (format: {fmt.value})")
return result
def describe_n8n_workflow(json_data: dict[str, Any]) -> WorkflowDescription:
"""Extract a structured description from an n8n workflow JSON."""
nodes = json_data.get("nodes", [])
connections = json_data.get("connections", {})
# Build node index by name for connection resolution
node_index: dict[str, int] = {}
steps: list[StepDescription] = []
for i, node in enumerate(nodes):
if not isinstance(node, dict):
continue
node_name = node.get("name", f"Node {i}")
node_index[node_name] = len(steps)
node_type = node.get("type", "unknown")
# Extract service name from type (e.g., "n8n-nodes-base.gmail" -> "Gmail")
service = _extract_n8n_service(node_type)
# Build action description from type and parameters
params = node.get("parameters", {})
if not isinstance(params, dict):
params = {}
action = _describe_n8n_action(node_type, node_name, params)
# Extract key parameters (skip large/internal ones)
clean_params = _clean_params(params)
steps.append(
StepDescription(
order=len(steps),
action=action,
service=service,
parameters=clean_params,
connections_to=[], # filled below
)
)
# Resolve connections: n8n format is {NodeName: {main: [[{node, type, index}]]}}
for source_name, conn_data in connections.items():
source_idx = node_index.get(source_name)
if source_idx is None:
continue
main_outputs = conn_data.get("main", [])
for output_group in main_outputs:
if not isinstance(output_group, list):
continue
for conn in output_group:
if not isinstance(conn, dict):
continue
target_name = conn.get("node")
if not isinstance(target_name, str):
continue
target_idx = node_index.get(target_name)
if target_idx is not None:
steps[source_idx].connections_to.append(target_idx)
# Detect trigger type
trigger_type = None
if nodes and isinstance(nodes[0], dict):
first_type = nodes[0].get("type", "")
if isinstance(first_type, str) and (
"trigger" in first_type.lower() or "webhook" in first_type.lower()
):
trigger_type = _extract_n8n_service(first_type)
return WorkflowDescription(
name=json_data.get("name", "Imported n8n Workflow"),
description=_build_workflow_summary(steps),
steps=steps,
trigger_type=trigger_type,
source_format=SourcePlatform.N8N,
)
def describe_make_workflow(json_data: dict[str, Any]) -> WorkflowDescription:
"""Extract a structured description from a Make.com scenario blueprint."""
flow = json_data.get("flow", [])
valid_modules = [m for m in flow if isinstance(m, dict)]
steps: list[StepDescription] = []
for i, module in enumerate(valid_modules):
module_ref = module.get("module", "unknown:unknown")
if not isinstance(module_ref, str):
module_ref = "unknown:unknown"
parts = module_ref.split(":", 1)
service = parts[0].replace("-", " ").title() if parts else "Unknown"
action_verb = parts[1] if len(parts) > 1 else "process"
# Build human-readable action
action = f"{str(action_verb).replace(':', ' ').title()} via {service}"
params = module.get("mapper", module.get("parameters", {}))
clean_params = _clean_params(params) if isinstance(params, dict) else {}
# Check for routes (branching) — routers don't connect sequentially
routes = module.get("routes", [])
if routes:
# Router modules branch; don't assign sequential connections
connections_to: list[int] = []
clean_params["_has_routes"] = len(routes)
else:
# Make.com flows are sequential by default; each step connects to next
connections_to = [i + 1] if i < len(valid_modules) - 1 else []
steps.append(
StepDescription(
order=i,
action=action,
service=service,
parameters=clean_params,
connections_to=connections_to,
)
)
# Detect trigger
trigger_type = None
if flow and isinstance(flow[0], dict):
first_module = flow[0].get("module", "")
if isinstance(first_module, str) and (
"watch" in first_module.lower() or "trigger" in first_module.lower()
):
trigger_type = first_module.split(":")[0].replace("-", " ").title()
return WorkflowDescription(
name=json_data.get("name", "Imported Make.com Scenario"),
description=_build_workflow_summary(steps),
steps=steps,
trigger_type=trigger_type,
source_format=SourcePlatform.MAKE,
)
def describe_zapier_workflow(json_data: dict[str, Any]) -> WorkflowDescription:
"""Extract a structured description from a Zapier Zap JSON."""
zap_steps = json_data.get("steps", [])
valid_steps = [s for s in zap_steps if isinstance(s, dict)]
steps: list[StepDescription] = []
for i, step in enumerate(valid_steps):
app = step.get("app", "Unknown")
action = step.get("action", "process")
action_desc = f"{str(action).replace('_', ' ').title()} via {app}"
params = step.get("params", step.get("inputFields", {}))
clean_params = _clean_params(params) if isinstance(params, dict) else {}
# Zapier zaps are linear: each step connects to next
connections_to = [i + 1] if i < len(valid_steps) - 1 else []
steps.append(
StepDescription(
order=i,
action=action_desc,
service=app,
parameters=clean_params,
connections_to=connections_to,
)
)
trigger_type = None
if valid_steps:
trigger_type = valid_steps[0].get("app")
return WorkflowDescription(
name=json_data.get("name", json_data.get("title", "Imported Zapier Zap")),
description=_build_workflow_summary(steps),
steps=steps,
trigger_type=trigger_type,
source_format=SourcePlatform.ZAPIER,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _extract_n8n_service(node_type: str) -> str:
    """Extract a human-readable service name from an n8n node type.

    Examples:
        "n8n-nodes-base.gmail" -> "Gmail"
        "@n8n/n8n-nodes-langchain.agent" -> "Agent"
        "n8n-nodes-base.httpRequest" -> "Http Request"
    """
    # Strip common prefixes (first match wins, so the full langchain
    # prefix is removed before the shorter "@n8n/" is considered)
    name = node_type
    for prefix in ("n8n-nodes-base.", "@n8n/n8n-nodes-langchain.", "@n8n/"):
        if name.startswith(prefix):
            name = name[len(prefix) :]
            break
    # Convert camelCase to Title Case
    name = re.sub(r"([a-z])([A-Z])", r"\1 \2", name)
    return name.replace(".", " ").replace("-", " ").title()


def _describe_n8n_action(node_type: str, node_name: str, params: dict[str, Any]) -> str:
    """Build a human-readable action description for an n8n node."""
    service = _extract_n8n_service(node_type)
    resource = str(params.get("resource", ""))
    operation = str(params.get("operation", ""))
    if resource and operation:
        return f"{operation.title()} {resource} via {service}"
    if operation:
        return f"{operation.title()} via {service}"
    return f"{node_name} ({service})"


def _clean_params(params: dict[str, Any], max_keys: int = 10) -> dict[str, Any]:
    """Extract key parameters, skipping large or internal values."""
    cleaned: dict[str, Any] = {}
    # Note: the slice is taken before filtering, so skipped keys still
    # count toward max_keys.
    for key, value in list(params.items())[:max_keys]:
        if key.startswith("_") or key in ("credentials", "webhookId"):
            continue
        if isinstance(value, str) and len(value) > 500:
            cleaned[key] = value[:500] + "..."
        elif isinstance(value, (str, int, float, bool)):
            cleaned[key] = value
        elif isinstance(value, list) and len(value) <= 5:
            cleaned[key] = value
    return cleaned


def _build_workflow_summary(steps: list[StepDescription]) -> str:
    """Build a one-line summary of the workflow from its steps."""
    if not steps:
        return "Empty workflow"
    services = []
    for s in steps:
        if s.service not in services:
            services.append(s.service)
    service_chain = " -> ".join(services[:6])
    if len(services) > 6:
        service_chain += f" (and {len(services) - 6} more)"
    return f"Workflow with {len(steps)} steps: {service_chain}"
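
The service-name normalization is easy to misread, so the following standalone copy of its logic may help. `extract_service` is a hypothetical name for this sketch; the prefix list and regular expression are taken from the helper in the diff. Note that the langchain prefix is stripped in full, so a langchain node type yields only the node name.

```python
import re

# Prefixes are tried in order and only the first match is stripped.
_PREFIXES = ("n8n-nodes-base.", "@n8n/n8n-nodes-langchain.", "@n8n/")


def extract_service(node_type: str) -> str:
    """Turn an n8n node type into a title-cased service name."""
    name = node_type
    for prefix in _PREFIXES:
        if name.startswith(prefix):
            name = name[len(prefix):]
            break
    # Insert a space at each lower-to-upper camelCase boundary.
    name = re.sub(r"([a-z])([A-Z])", r"\1 \2", name)
    return name.replace(".", " ").replace("-", " ").title()


examples = {
    node_type: extract_service(node_type)
    for node_type in (
        "n8n-nodes-base.gmail",
        "n8n-nodes-base.httpRequest",
        "n8n-nodes-base.webhookTrigger",
        "@n8n/n8n-nodes-langchain.agent",
    )
}
```

Since `str.title()` capitalizes only the first letter of each word, acronyms such as `http` come out as `Http` rather than `HTTP`.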

View File

@@ -1,135 +0,0 @@
"""Tests for describers.py."""
import pytest
from .describers import (
describe_make_workflow,
describe_n8n_workflow,
describe_workflow,
describe_zapier_workflow,
)
from .models import SourcePlatform
class TestDescribeN8nWorkflow:
def test_basic_workflow(self):
data = {
"name": "Email on Webhook",
"nodes": [
{
"name": "Webhook",
"type": "n8n-nodes-base.webhookTrigger",
"parameters": {"path": "/incoming"},
},
{
"name": "Send Email",
"type": "n8n-nodes-base.gmail",
"parameters": {"resource": "message", "operation": "send"},
},
],
"connections": {
"Webhook": {
"main": [[{"node": "Send Email", "type": "main", "index": 0}]]
}
},
}
desc = describe_n8n_workflow(data)
assert desc.name == "Email on Webhook"
assert desc.source_format == SourcePlatform.N8N
assert len(desc.steps) == 2
assert desc.steps[0].connections_to == [1]
assert desc.steps[1].connections_to == []
assert desc.trigger_type is not None
def test_step_extraction(self):
data = {
"name": "Test",
"nodes": [
{
"name": "HTTP",
"type": "n8n-nodes-base.httpRequest",
"parameters": {"url": "https://example.com", "method": "GET"},
},
],
"connections": {},
}
desc = describe_n8n_workflow(data)
step = desc.steps[0]
assert step.service == "Http Request"
assert step.order == 0
assert "url" in step.parameters
def test_empty_nodes(self):
data = {"name": "Empty", "nodes": [], "connections": {}}
desc = describe_n8n_workflow(data)
assert len(desc.steps) == 0
assert desc.trigger_type is None
class TestDescribeMakeWorkflow:
def test_basic_scenario(self):
data = {
"name": "Sheets to Calendar",
"flow": [
{
"module": "google-sheets:watchUpdatedCells",
"mapper": {"spreadsheetId": "abc"},
},
{
"module": "google-calendar:createAnEvent",
"mapper": {"title": "Meeting"},
},
],
}
desc = describe_make_workflow(data)
assert desc.name == "Sheets to Calendar"
assert desc.source_format == SourcePlatform.MAKE
assert len(desc.steps) == 2
# Sequential: step 0 connects to step 1
assert desc.steps[0].connections_to == [1]
assert desc.steps[1].connections_to == []
assert desc.trigger_type is not None # "watch" in module name
def test_service_extraction(self):
data = {
"flow": [{"module": "slack:sendMessage", "mapper": {"text": "hello"}}],
}
desc = describe_make_workflow(data)
assert desc.steps[0].service == "Slack"
class TestDescribeZapierWorkflow:
def test_basic_zap(self):
data = {
"name": "Gmail to Slack",
"steps": [
{"app": "Gmail", "action": "new_email"},
{
"app": "Slack",
"action": "send_message",
"params": {"channel": "#alerts"},
},
],
}
desc = describe_zapier_workflow(data)
assert desc.name == "Gmail to Slack"
assert desc.source_format == SourcePlatform.ZAPIER
assert len(desc.steps) == 2
assert desc.steps[0].connections_to == [1]
assert desc.trigger_type == "Gmail"
class TestDescribeWorkflowRouter:
def test_routes_to_n8n(self):
data = {
"nodes": [
{"name": "N", "type": "n8n-nodes-base.webhook", "parameters": {}}
],
"connections": {},
}
desc = describe_workflow(data, SourcePlatform.N8N)
assert desc.source_format == SourcePlatform.N8N
def test_unknown_raises(self):
with pytest.raises(ValueError, match="No describer"):
describe_workflow({}, SourcePlatform.UNKNOWN)

View File

@@ -1,71 +0,0 @@
"""Detect the source platform of a workflow JSON."""
import re
from typing import Any

from .models import SourcePlatform

_N8N_TYPE_RE = re.compile(r"^(n8n-nodes-base\.|@n8n/)")


def detect_format(json_data: dict[str, Any]) -> SourcePlatform:
    """Inspect a workflow JSON and determine which platform it came from.

    Args:
        json_data: The parsed JSON data from a workflow export file.

    Returns:
        The detected SourcePlatform.
    """
    if _is_n8n(json_data):
        return SourcePlatform.N8N
    if _is_make(json_data):
        return SourcePlatform.MAKE
    if _is_zapier(json_data):
        return SourcePlatform.ZAPIER
    return SourcePlatform.UNKNOWN


def _is_n8n(data: dict[str, Any]) -> bool:
    """n8n workflows have a `nodes` array with items containing `type` fields
    matching patterns like `n8n-nodes-base.*` or `@n8n/*`, plus a `connections`
    object."""
    nodes = data.get("nodes")
    connections = data.get("connections")
    if not isinstance(nodes, list) or not isinstance(connections, dict):
        return False
    if not nodes:
        return False
    # Check if at least one node has an n8n-style type
    return any(
        isinstance(n, dict)
        and isinstance(n.get("type"), str)
        and _N8N_TYPE_RE.match(n["type"])
        for n in nodes
    )


def _is_make(data: dict[str, Any]) -> bool:
    """Make.com scenarios have a `flow` array with items containing `module`
    fields in `service:action` URI format."""
    flow = data.get("flow")
    if not isinstance(flow, list) or not flow:
        return False
    # Check if at least one module has `service:action` pattern
    return any(
        isinstance(item, dict)
        and isinstance(item.get("module"), str)
        and ":" in item["module"]
        for item in flow
    )


def _is_zapier(data: dict[str, Any]) -> bool:
    """Zapier Zaps have a `steps` array with items containing `app` and
    `action` fields."""
    steps = data.get("steps")
    if not isinstance(steps, list) or not steps:
        return False
    return any(
        isinstance(step, dict) and "app" in step and "action" in step for step in steps
    )
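
Because the detector checks n8n, then Make.com, then Zapier, a payload that happens to satisfy more than one heuristic resolves to the earliest match. The compact sketch below re-implements the three checks under that assumption; `detect` is a hypothetical name and it returns plain strings instead of the `SourcePlatform` enum.

```python
import re
from typing import Any

_N8N_TYPE_RE = re.compile(r"^(n8n-nodes-base\.|@n8n/)")


def detect(data: dict[str, Any]) -> str:
    """Return 'n8n', 'make', 'zapier', or 'unknown' using ordered heuristics."""
    nodes, connections = data.get("nodes"), data.get("connections")
    if isinstance(nodes, list) and isinstance(connections, dict) and any(
        isinstance(n, dict)
        and isinstance(n.get("type"), str)
        and _N8N_TYPE_RE.match(n["type"])
        for n in nodes
    ):
        return "n8n"

    flow = data.get("flow")
    if isinstance(flow, list) and any(
        isinstance(m, dict) and isinstance(m.get("module"), str) and ":" in m["module"]
        for m in flow
    ):
        return "make"

    steps = data.get("steps")
    if isinstance(steps, list) and any(
        isinstance(s, dict) and "app" in s and "action" in s for s in steps
    ):
        return "zapier"

    return "unknown"


# A payload carrying both a Make-style `flow` and a Zapier-style `steps`.
ambiguous = {
    "flow": [{"module": "slack:sendMessage"}],
    "steps": [{"app": "Gmail", "action": "new_email"}],
}
```

For the `ambiguous` payload the Make.com check runs first, so the Zapier-style `steps` array is never inspected.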

View File

@@ -1,101 +0,0 @@
"""Tests for format_detector.py."""
from .format_detector import detect_format
from .models import SourcePlatform
class TestDetectFormat:
def test_n8n_workflow(self):
data = {
"name": "My n8n Workflow",
"nodes": [
{
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"parameters": {"path": "/hook"},
},
{
"name": "HTTP Request",
"type": "n8n-nodes-base.httpRequest",
"parameters": {"url": "https://api.example.com"},
},
],
"connections": {
"Webhook": {
"main": [[{"node": "HTTP Request", "type": "main", "index": 0}]]
}
},
}
assert detect_format(data) == SourcePlatform.N8N
def test_n8n_langchain_nodes(self):
data = {
"nodes": [
{
"name": "Agent",
"type": "@n8n/n8n-nodes-langchain.agent",
"parameters": {},
},
],
"connections": {},
}
assert detect_format(data) == SourcePlatform.N8N
def test_make_scenario(self):
data = {
"name": "My Make Scenario",
"flow": [
{
"module": "google-sheets:watchUpdatedCells",
"mapper": {"spreadsheetId": "123"},
},
{
"module": "google-calendar:createAnEvent",
"mapper": {"title": "Test"},
},
],
}
assert detect_format(data) == SourcePlatform.MAKE
def test_zapier_zap(self):
data = {
"name": "My Zap",
"steps": [
{"app": "gmail", "action": "new_email"},
{
"app": "slack",
"action": "send_message",
"params": {"channel": "#general"},
},
],
}
assert detect_format(data) == SourcePlatform.ZAPIER
def test_unknown_format(self):
data = {"foo": "bar", "nodes": []}
assert detect_format(data) == SourcePlatform.UNKNOWN
def test_empty_dict(self):
assert detect_format({}) == SourcePlatform.UNKNOWN
def test_autogpt_graph_not_detected_as_n8n(self):
"""AutoGPT graphs have nodes but not n8n-style types."""
data = {
"nodes": [
{"id": "abc", "block_id": "some-uuid", "input_default": {}},
],
"connections": {},
}
assert detect_format(data) == SourcePlatform.UNKNOWN
def test_make_without_colon_not_detected(self):
data = {
"flow": [{"module": "simplemodule", "mapper": {}}],
}
assert detect_format(data) == SourcePlatform.UNKNOWN
def test_zapier_without_action_not_detected(self):
data = {
"steps": [{"app": "gmail"}],
}
assert detect_format(data) == SourcePlatform.UNKNOWN

View File

@@ -1,33 +0,0 @@
"""Data models for external workflow import."""
from enum import Enum
from typing import Any

import pydantic


class SourcePlatform(str, Enum):
    N8N = "n8n"
    MAKE = "make"
    ZAPIER = "zapier"
    UNKNOWN = "unknown"


class StepDescription(pydantic.BaseModel):
    """A single step/node extracted from an external workflow."""

    order: int
    action: str
    service: str
    parameters: dict[str, Any] = pydantic.Field(default_factory=dict)
    connections_to: list[int] = pydantic.Field(default_factory=list)


class WorkflowDescription(pydantic.BaseModel):
    """Structured description of an external workflow."""

    name: str
    description: str
    steps: list[StepDescription]
    trigger_type: str | None = None
    source_format: SourcePlatform

View File

@@ -1,74 +0,0 @@
"""Fetch workflow templates by URL."""
import logging
import re
from typing import Any

from backend.util.request import HTTPClientError, Requests

logger = logging.getLogger(__name__)

# Patterns for extracting template IDs from n8n URLs
_N8N_WORKFLOW_URL_RE = re.compile(
    r"https?://(?:www\.)?n8n\.io/workflows/(\d+)", re.IGNORECASE
)
_N8N_TEMPLATES_API = "https://api.n8n.io/api/templates/workflows/{id}"


async def fetch_n8n_template(url: str) -> dict[str, Any]:
    """Fetch an n8n workflow template by its URL.

    Supports URLs like:
    - https://n8n.io/workflows/1234
    - https://n8n.io/workflows/1234-some-slug

    Args:
        url: The n8n template URL.

    Returns:
        The n8n workflow JSON.

    Raises:
        ValueError: If the URL is not a valid n8n template URL.
        RuntimeError: If the fetch fails.
    """
    match = _N8N_WORKFLOW_URL_RE.match(url.strip())
    if not match:
        raise ValueError(
            "Not a valid n8n workflow URL. Expected format: "
            "https://n8n.io/workflows/<id>"
        )

    template_id = match.group(1)
    api_url = _N8N_TEMPLATES_API.format(id=template_id)

    client = Requests(raise_for_status=True)
    try:
        response = await client.get(api_url)
        data = response.json()
    except HTTPClientError as e:
        # 4xx from n8n API (e.g. 404 template not found) → bad user input
        raise ValueError(
            f"n8n template {template_id} not found or inaccessible: {e}"
        ) from e
    except Exception as e:
        raise RuntimeError(f"Failed to fetch n8n template {template_id}: {e}") from e

    if not isinstance(data, dict):
        raise RuntimeError(
            f"Unexpected response format from n8n API for template {template_id}: "
            "expected JSON object"
        )

    # n8n API wraps the workflow in a `workflow` key
    workflow = data.get("workflow", data)
    if not isinstance(workflow, dict):
        raise RuntimeError(
            f"Unexpected response format from n8n API for template {template_id}"
        )

    # Preserve the workflow name from the template metadata
    if "name" not in workflow and "name" in data:
        workflow["name"] = data["name"]

    return workflow
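
The URL handling reduces to a single anchored regular-expression match, which the following standalone sketch isolates. `template_id_from_url` is a hypothetical helper name; the pattern is the one shown in the diff, and no network call is made.

```python
import re
from typing import Optional

_N8N_WORKFLOW_URL_RE = re.compile(
    r"https?://(?:www\.)?n8n\.io/workflows/(\d+)", re.IGNORECASE
)


def template_id_from_url(url: str) -> Optional[str]:
    """Return the numeric template ID, or None if the URL does not qualify."""
    # re.match anchors at the start only, so a trailing slug such as
    # "-some-slug" is ignored while a different host is rejected.
    match = _N8N_WORKFLOW_URL_RE.match(url.strip())
    return match.group(1) if match else None


slug_id = template_id_from_url("https://n8n.io/workflows/1234-some-slug")
padded_id = template_id_from_url("  https://www.n8n.io/workflows/42  ")
foreign = template_id_from_url("https://example.com/?u=https://n8n.io/workflows/1")
```

Only the extracted digits are interpolated into the API URL, so the rest of the user-supplied string never reaches the outbound request.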

View File

@@ -183,7 +183,8 @@ class WorkspaceManager:
f"{Config().max_file_size_mb}MB limit"
)
# Virus scan content before persisting (defense in depth)
# Scan here — callers must NOT duplicate this scan.
# WorkspaceManager owns virus scanning for all persisted files.
await scan_content_safe(content, filename=filename)
# Determine path with session scoping

@@ -1,4 +1,3 @@
import LibraryImportWorkflowDialog from "../LibraryImportWorkflowDialog/LibraryImportWorkflowDialog";
import { LibrarySearchBar } from "../LibrarySearchBar/LibrarySearchBar";
import LibraryUploadAgentDialog from "../LibraryUploadAgentDialog/LibraryUploadAgentDialog";
@@ -12,14 +11,12 @@ export function LibraryActionHeader({ setSearchTerm }: Props) {
<div className="mb-[32px] hidden items-center justify-center gap-4 md:flex">
<LibrarySearchBar setSearchTerm={setSearchTerm} />
<LibraryUploadAgentDialog />
<LibraryImportWorkflowDialog />
</div>
{/* Mobile and tablet */}
<div className="flex flex-col gap-4 p-4 pt-[52px] md:hidden">
<div className="flex w-full justify-between gap-2">
<div className="flex w-full justify-between">
<LibraryUploadAgentDialog />
<LibraryImportWorkflowDialog />
</div>
<div className="flex items-center justify-center">

@@ -1,156 +0,0 @@
"use client";
import { Button } from "@/components/atoms/Button/Button";
import { FileInput } from "@/components/atoms/FileInput/FileInput";
import { Input } from "@/components/atoms/Input/Input";
import { LoadingSpinner } from "@/components/atoms/LoadingSpinner/LoadingSpinner";
import { Dialog } from "@/components/molecules/Dialog/Dialog";
import {
Form,
FormControl,
FormField,
FormItem,
FormMessage,
} from "@/components/molecules/Form/Form";
import { ArrowsClockwiseIcon } from "@phosphor-icons/react";
import { z } from "zod";
import { useLibraryImportWorkflowDialog } from "./useLibraryImportWorkflowDialog";
export const importWorkflowFormSchema = z.object({
workflowFile: z.string(),
templateUrl: z.string(),
});
export default function LibraryImportWorkflowDialog() {
const {
onSubmit,
isConverting,
isOpen,
setIsOpen,
form,
importMode,
setImportMode,
} = useLibraryImportWorkflowDialog();
const hasInput =
importMode === "url"
? !!form.watch("templateUrl")
: !!form.watch("workflowFile");
return (
<Dialog
title="Import Workflow"
styling={{ maxWidth: "32rem" }}
controlled={{
isOpen,
set: setIsOpen,
}}
onClose={() => {
setIsOpen(false);
form.reset();
}}
>
<Dialog.Trigger>
<Button
data-testid="import-workflow-button"
variant="primary"
className="h-[2.78rem] w-full md:w-[14rem]"
size="small"
>
<ArrowsClockwiseIcon width={18} height={18} />
<span>Import workflow</span>
</Button>
</Dialog.Trigger>
<Dialog.Content>
{/* Mode toggle */}
<div className="mb-4 flex gap-2">
<Button
variant={importMode === "file" ? "primary" : "outline"}
size="small"
onClick={() => setImportMode("file")}
type="button"
>
Upload file
</Button>
<Button
variant={importMode === "url" ? "primary" : "outline"}
size="small"
onClick={() => setImportMode("url")}
type="button"
>
Paste URL
</Button>
</div>
<p className="mb-4 text-sm text-neutral-500">
Import workflows from n8n, Make.com, or Zapier. The workflow will be
automatically converted to an AutoGPT agent.
</p>
<Form
form={form}
onSubmit={onSubmit}
className="flex flex-col justify-center gap-0 px-1"
>
{importMode === "file" ? (
<FormField
control={form.control}
name="workflowFile"
render={({ field }) => (
<FormItem>
<FormControl>
<FileInput
mode="base64"
value={field.value}
onChange={field.onChange}
accept=".json,application/json"
placeholder="Workflow JSON file (n8n, Make.com, or Zapier export)"
maxFileSize={10 * 1024 * 1024}
showStorageNote={false}
className="mb-4 mt-2"
/>
</FormControl>
<FormMessage />
</FormItem>
)}
/>
) : (
<FormField
control={form.control}
name="templateUrl"
render={({ field }) => (
<FormItem>
<FormControl>
<Input
{...field}
id={field.name}
label="n8n template URL"
placeholder="https://n8n.io/workflows/1234"
className="mb-4 mt-2 w-full rounded-[10px]"
/>
</FormControl>
<FormMessage />
</FormItem>
)}
/>
)}
<Button
type="submit"
variant="primary"
className="min-w-[18rem]"
disabled={!hasInput || isConverting}
>
{isConverting ? (
<div className="flex items-center gap-2">
<LoadingSpinner size="small" className="text-white" />
<span>Parsing workflow...</span>
</div>
) : (
"Import to AutoPilot"
)}
</Button>
</Form>
</Dialog.Content>
</Dialog>
);
}

@@ -1,89 +0,0 @@
import { usePostV2ImportAWorkflowFromAnotherToolN8nMakeComZapier } from "@/app/api/__generated__/endpoints/import/import";
import type { ImportWorkflowRequest } from "@/app/api/__generated__/models/importWorkflowRequest";
import type { ImportWorkflowResponse } from "@/app/api/__generated__/models/importWorkflowResponse";
import { useToast } from "@/components/molecules/Toast/use-toast";
import { zodResolver } from "@hookform/resolvers/zod";
import { useRouter } from "next/navigation";
import { useState } from "react";
import { useForm } from "react-hook-form";
import { z } from "zod";
import { importWorkflowFormSchema } from "./LibraryImportWorkflowDialog";
export function useLibraryImportWorkflowDialog() {
const [isOpen, setIsOpen] = useState(false);
const { toast } = useToast();
const router = useRouter();
const [importMode, setImportMode] = useState<"file" | "url">("file");
const { mutateAsync: importWorkflow, isPending: isConverting } =
usePostV2ImportAWorkflowFromAnotherToolN8nMakeComZapier();
const form = useForm<z.infer<typeof importWorkflowFormSchema>>({
resolver: zodResolver(importWorkflowFormSchema),
defaultValues: {
workflowFile: "",
templateUrl: "",
},
});
const onSubmit = async (values: z.infer<typeof importWorkflowFormSchema>) => {
try {
let body: ImportWorkflowRequest;
if (importMode === "url" && values.templateUrl) {
body = { template_url: values.templateUrl };
} else if (importMode === "file" && values.workflowFile) {
// Decode base64 file to JSON
const base64Match = values.workflowFile.match(
/^data:[^;]+;base64,(.+)$/,
);
if (!base64Match) {
throw new Error("Invalid file format");
}
const jsonString = atob(base64Match[1]);
const workflowJson = JSON.parse(jsonString);
body = { workflow_json: workflowJson };
} else {
throw new Error("Please provide a workflow file or template URL");
}
const response = await importWorkflow({ data: body });
// Cast needed: generated client returns union with error types,
// but errors throw before reaching here
const data = response.data as ImportWorkflowResponse;
setIsOpen(false);
form.reset();
toast({
title: "Workflow Parsed",
description: `Detected ${data.source_format} workflow "${data.source_name}". Redirecting to AutoPilot...`,
});
// Redirect to AutoPilot with the prompt pre-filled and auto-submitted
const encodedPrompt = encodeURIComponent(data.copilot_prompt);
router.push(`/copilot?autosubmit=true#prompt=${encodedPrompt}`);
} catch (error) {
console.error("Import failed:", error);
toast({
title: "Import Failed",
description:
error instanceof Error
? error.message
: "Failed to parse workflow. Please check the file format.",
variant: "destructive",
duration: 5000,
});
}
};
return {
onSubmit,
isConverting,
isOpen,
setIsOpen,
form,
importMode,
setImportMode,
};
}

@@ -2920,46 +2920,6 @@
}
}
},
"/api/import/workflow": {
"post": {
"tags": ["v2", "import"],
"summary": "Import a workflow from another tool (n8n, Make.com, Zapier)",
"description": "Parse an external workflow and return a CoPilot prompt.\n\nAccepts either raw workflow JSON or a template URL (n8n only for now).\nThe workflow is parsed and described, then a structured prompt is returned\nfor CoPilot's agent-generator to handle the actual conversion.",
"operationId": "postV2Import a workflow from another tool (n8n, make.com, zapier)",
"requestBody": {
"content": {
"application/json": {
"schema": { "$ref": "#/components/schemas/ImportWorkflowRequest" }
}
},
"required": true
},
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ImportWorkflowResponse"
}
}
}
},
"401": {
"$ref": "#/components/responses/HTTP401NotAuthenticatedError"
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": { "$ref": "#/components/schemas/HTTPValidationError" }
}
}
}
},
"security": [{ "HTTPBearerJWT": [] }]
}
},
"/api/integrations/ayrshare/sso_url": {
"get": {
"tags": ["v1", "integrations"],
@@ -10011,35 +9971,6 @@
"required": ["image_url"],
"title": "ImageURLResponse"
},
"ImportWorkflowRequest": {
"properties": {
"workflow_json": {
"anyOf": [
{ "additionalProperties": true, "type": "object" },
{ "type": "null" }
],
"title": "Workflow Json"
},
"template_url": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Template Url"
}
},
"type": "object",
"title": "ImportWorkflowRequest",
"description": "Request body for importing an external workflow."
},
"ImportWorkflowResponse": {
"properties": {
"copilot_prompt": { "type": "string", "title": "Copilot Prompt" },
"source_format": { "type": "string", "title": "Source Format" },
"source_name": { "type": "string", "title": "Source Name" }
},
"type": "object",
"required": ["copilot_prompt", "source_format", "source_name"],
"title": "ImportWorkflowResponse",
"description": "Response from parsing an external workflow.\n\nReturns a CoPilot prompt that the frontend uses to redirect the user\nto CoPilot, where the agentic agent-generator handles the conversion."
},
"InputValidationErrorResponse": {
"properties": {
"type": {

@@ -0,0 +1,343 @@
# Workspace & Media File Architecture
This document describes the architecture for handling user files in AutoGPT Platform, covering persistent user storage (Workspace) and ephemeral media processing pipelines.
## Overview
The platform has two distinct file-handling layers:
| Layer | Purpose | Persistence | Scope |
|-------|---------|-------------|-------|
| **Workspace** | Long-term user file storage | Persistent (DB + GCS/local) | Per-user, session-scoped access |
| **Media Pipeline** | Ephemeral file processing for blocks | Temporary (local disk) | Per-execution |
## Database Models
### UserWorkspace
Represents a user's file storage space. Created on-demand (one per user).
```prisma
model UserWorkspace {
  id        String   @id @default(uuid())
  createdAt DateTime @default(now())
  updatedAt DateTime @updatedAt
  userId    String   @unique
  Files     UserWorkspaceFile[]
}
```
**Key points:**
- One workspace per user (enforced by `@unique` on `userId`)
- Created lazily via `get_or_create_workspace()`
- Uses upsert to handle race conditions
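
The lazy, race-safe creation described above can be sketched with an idempotent insert. This is a minimal illustration only: it uses an in-memory SQLite table as a stand-in for the Prisma upsert, and the function name `get_or_create_workspace_sketch` is hypothetical, not the platform's implementation.

```python
import sqlite3
import uuid


def get_or_create_workspace_sketch(conn: sqlite3.Connection, user_id: str) -> str:
    """Return the workspace id for user_id, creating the row on first use."""
    # The UNIQUE constraint on userId turns the insert into a no-op when
    # another caller has already created the workspace for this user.
    conn.execute(
        "INSERT OR IGNORE INTO UserWorkspace (id, userId) VALUES (?, ?)",
        (str(uuid.uuid4()), user_id),
    )
    row = conn.execute(
        "SELECT id FROM UserWorkspace WHERE userId = ?", (user_id,)
    ).fetchone()
    return row[0]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE UserWorkspace (id TEXT PRIMARY KEY, userId TEXT UNIQUE NOT NULL)"
)
first = get_or_create_workspace_sketch(conn, "user-123")
second = get_or_create_workspace_sketch(conn, "user-123")
row_count = conn.execute("SELECT COUNT(*) FROM UserWorkspace").fetchone()[0]
```

Calling the function twice for the same user returns the same id and leaves a single row, which is the property the unique `userId` constraint is there to guarantee.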
### UserWorkspaceFile
Represents a file stored in a user's workspace.
```prisma
model UserWorkspaceFile {
  id          String    @id @default(uuid())
  workspaceId String
  name        String    // User-visible filename
  path        String    // Virtual path (e.g., "/sessions/abc123/image.png")
  storagePath String    // Actual storage path (gcs://... or local://...)
  mimeType    String
  sizeBytes   BigInt
  checksum    String?   // SHA256 for integrity
  isDeleted   Boolean   @default(false)
  deletedAt   DateTime?
  metadata    Json      @default("{}")

  @@unique([workspaceId, path]) // Enforce unique paths within workspace
}
```
**Key points:**
- `path` is a virtual path for organizing files (not actual filesystem path)
- `storagePath` contains the actual GCS or local storage location
- Soft-delete pattern: `isDeleted` flag with `deletedAt` timestamp
- Path is modified on delete to free up the virtual path for reuse
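
Because `@@unique([workspaceId, path])` applies to deleted rows too, a soft-deleted record has to give up its path before a new file can take it. The sketch below illustrates that idea in memory; the `FileRecord` class and the `__deleted__` suffix scheme are assumptions made for illustration, since the actual rename format is not specified here.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import List, Optional


@dataclass
class FileRecord:
    id: str
    path: str
    is_deleted: bool = False
    deleted_at: Optional[datetime] = None


def soft_delete(record: FileRecord) -> None:
    """Mark the record deleted and move it off its virtual path."""
    record.is_deleted = True
    record.deleted_at = datetime.now(timezone.utc)
    # Hypothetical rename scheme: suffix the path with the record id so the
    # original virtual path no longer collides with new files.
    record.path = f"{record.path}__deleted__{record.id}"


def path_in_use(records: List[FileRecord], path: str) -> bool:
    """Mimic the unique constraint: is any row (deleted or not) on this path?"""
    return any(r.path == path for r in records)


original_path = "/sessions/abc123/image.png"
record = FileRecord(id="file-1", path=original_path)
records = [record]
soft_delete(record)
path_is_free = not path_in_use(records, original_path)
```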
---
## WorkspaceManager
**Location:** `backend/util/workspace.py`
High-level API for workspace file operations. Combines storage backend operations with database record management.
### Initialization
```python
from backend.util.workspace import WorkspaceManager
# Basic usage
manager = WorkspaceManager(user_id="user-123", workspace_id="ws-456")
# With session scoping (CoPilot sessions)
manager = WorkspaceManager(
    user_id="user-123",
    workspace_id="ws-456",
    session_id="session-789",
)
```
### Session Scoping
When `session_id` is provided, files are isolated to `/sessions/{session_id}/`:
```python
# With session_id="abc123":
await manager.write_file(content, "image.png")
# → stored at /sessions/abc123/image.png
# Cross-session access is explicit:
await manager.read_file("/sessions/other-session/file.txt")  # Works
```
**Why session scoping?**
- CoPilot conversations need file isolation
- Prevents file collisions between concurrent sessions
- Allows session cleanup without affecting other sessions
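
The path rule can be sketched as a small pure function. This is an assumption-laden illustration: `resolve_virtual_path` is a hypothetical helper, and treating absolute paths as "explicit" cross-session access is inferred from the example above rather than taken from the `WorkspaceManager` source.

```python
import posixpath
from typing import Optional


def resolve_virtual_path(filename: str, session_id: Optional[str] = None) -> str:
    """Map a filename to a virtual workspace path (illustrative only)."""
    if filename.startswith("/"):
        # Absolute paths are taken as-is, which is how cross-session
        # access stays explicit.
        return posixpath.normpath(filename)
    if session_id:
        # Relative names land in the caller's session folder.
        return posixpath.join("/sessions", session_id, filename)
    return posixpath.join("/", filename)


scoped = resolve_virtual_path("image.png", session_id="abc123")
explicit = resolve_virtual_path("/sessions/other-session/file.txt", session_id="abc123")
unscoped = resolve_virtual_path("image.png")
```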
### Core Methods
| Method | Description |
|--------|-------------|
| `write_file(content, filename, path?, mime_type?, overwrite?)` | Write file to workspace |
| `read_file(path)` | Read file by virtual path |
| `read_file_by_id(file_id)` | Read file by ID |
| `list_files(path?, limit?, offset?, include_all_sessions?)` | List files |
| `delete_file(file_id)` | Soft-delete a file |
| `get_download_url(file_id, expires_in?)` | Get signed download URL |
| `get_file_info(file_id)` | Get file metadata |
| `get_file_info_by_path(path)` | Get file metadata by path |
| `get_file_count(path?, include_all_sessions?)` | Count files |
### Storage Backends
WorkspaceManager delegates to `WorkspaceStorageBackend`:
| Backend | When Used | Storage Path Format |
|---------|-----------|---------------------|
| `GCSWorkspaceStorage` | `media_gcs_bucket_name` is configured | `gcs://bucket/workspaces/{ws_id}/{file_id}/{filename}` |
| `LocalWorkspaceStorage` | No GCS bucket configured | `local://{ws_id}/{file_id}/{filename}` |
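
As a rough sketch of the two path formats in the table, the helper below builds a storage path depending on whether a bucket is configured. `build_storage_path` is a hypothetical function written for this document; the real backends construct their paths internally.

```python
from typing import Optional


def build_storage_path(
    ws_id: str,
    file_id: str,
    filename: str,
    gcs_bucket: Optional[str] = None,
) -> str:
    """Return a storage path in the format of the selected backend."""
    if gcs_bucket:
        # Format used when media_gcs_bucket_name is configured.
        return f"gcs://{gcs_bucket}/workspaces/{ws_id}/{file_id}/{filename}"
    # Fallback format when no GCS bucket is configured.
    return f"local://{ws_id}/{file_id}/{filename}"


gcs_path = build_storage_path("ws-456", "file-1", "image.png", gcs_bucket="my-bucket")
local_path = build_storage_path("ws-456", "file-1", "image.png")
```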
---
## store_media_file()
**Location:** `backend/util/file.py`
The media normalization pipeline. Handles various input types and normalizes them for processing or output.
### Purpose
Blocks receive files in many formats (URLs, data URIs, workspace references, local paths). `store_media_file()` normalizes these to a consistent format based on what the block needs.
### Input Types Handled
| Input Format | Example | How It's Processed |
|--------------|---------|-------------------|
| Data URI | `data:image/png;base64,iVBOR...` | Decoded, virus scanned, written locally |
| HTTP(S) URL | `https://example.com/image.png` | Downloaded, virus scanned, written locally |
| Workspace URI | `workspace://abc123` or `workspace:///path/to/file` | Read from workspace, virus scanned, written locally |
| Cloud path | `gcs://bucket/path` | Downloaded, virus scanned, written locally |
| Local path | `image.png` | Verified to exist in exec_file directory, virus scanned |
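
The dispatch implied by this table can be sketched as a prefix check. `classify_media_input` and its return labels are hypothetical names used only to illustrate the branching; the real function may detect input types differently.

```python
def classify_media_input(value: str) -> str:
    """Label an input string with the row of the table it would fall under."""
    if value.startswith("data:"):
        return "data_uri"
    if value.startswith(("http://", "https://")):
        return "url"
    if value.startswith("workspace://"):
        return "workspace_uri"
    if value.startswith("gcs://"):
        return "cloud_path"
    # Anything else is treated as a path relative to the exec_file directory.
    return "local_path"


labels = [
    classify_media_input("data:image/png;base64,iVBOR"),
    classify_media_input("https://example.com/image.png"),
    classify_media_input("workspace://abc123"),
    classify_media_input("gcs://bucket/path"),
    classify_media_input("image.png"),
]
```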
### Return Formats
The `return_format` parameter determines what you get back:
```python
from backend.util.file import store_media_file
# For local processing (ffmpeg, MoviePy, PIL)
local_path = await store_media_file(
    file=input_file,
    execution_context=ctx,
    return_format="for_local_processing",
)
# Returns: "image.png" (relative path in exec_file dir)

# For external APIs (Replicate, OpenAI, etc.)
data_uri = await store_media_file(
    file=input_file,
    execution_context=ctx,
    return_format="for_external_api",
)
# Returns: "data:image/png;base64,iVBOR..."

# For block output (adapts to execution context)
output = await store_media_file(
    file=input_file,
    execution_context=ctx,
    return_format="for_block_output",
)
# In CoPilot: Returns "workspace://file-id#image/png"
# In graphs: Returns "data:image/png;base64,..."
```
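
To make the two string shapes above concrete, here is a small sketch that builds a data URI and a workspace reference from raw parts. `to_data_uri` and `workspace_ref` are hypothetical helpers for illustration; they are not part of `backend/util/file.py`.

```python
import base64
import mimetypes


def to_data_uri(content: bytes, filename: str) -> str:
    """Encode bytes as a data URI, guessing the MIME type from the filename."""
    mime_type, _ = mimetypes.guess_type(filename)
    mime_type = mime_type or "application/octet-stream"
    encoded = base64.b64encode(content).decode("ascii")
    return f"data:{mime_type};base64,{encoded}"


def workspace_ref(file_id: str, mime_type: str) -> str:
    """Build a reference in the workspace://<id>#<mime> shape shown above."""
    return f"workspace://{file_id}#{mime_type}"


text_uri = to_data_uri(b"hello", "note.txt")
png_uri = to_data_uri(b"\x89PNG", "image.png")
ref = workspace_ref("file-id", "image/png")
```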
### Execution Context
`store_media_file()` requires an `ExecutionContext` with:
- `graph_exec_id` - Required for temp file location
- `user_id` - Required for workspace access
- `workspace_id` - Optional; enables workspace features
- `session_id` - Optional; for session scoping in CoPilot
---
## Responsibility Boundaries
### Virus Scanning
| Component | Scans? | Notes |
|-----------|--------|-------|
| `store_media_file()` | ✅ Yes | Scans **all** content before writing to local disk |
| `WorkspaceManager.write_file()` | ✅ Yes | Scans content before persisting |
**Scanning happens at:**
1. `store_media_file()` — scans everything it downloads/decodes
2. `WorkspaceManager.write_file()` — scans before persistence
Tools like `WriteWorkspaceFileTool` don't need to scan because `WorkspaceManager.write_file()` handles it.
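
The single scanning boundary can be illustrated with stand-ins: the manager scans once inside `write_file()`, and the calling tool only delegates. `fake_scan`, `SketchWorkspaceManager`, and `write_workspace_file_tool` are hypothetical names; the real scanner is `scan_content_safe()`.

```python
import asyncio
from typing import Dict, List

scan_calls: List[str] = []


async def fake_scan(content: bytes, filename: str) -> None:
    """Stand-in for the virus scanner; records each call."""
    scan_calls.append(filename)


class SketchWorkspaceManager:
    def __init__(self) -> None:
        self.files: Dict[str, bytes] = {}

    async def write_file(self, content: bytes, filename: str) -> None:
        # The manager owns scanning for everything it persists.
        await fake_scan(content, filename=filename)
        self.files[filename] = content


async def write_workspace_file_tool(
    manager: SketchWorkspaceManager, content: bytes, filename: str
) -> None:
    # No scan here: delegating to write_file() is enough.
    await manager.write_file(content, filename)


manager = SketchWorkspaceManager()
asyncio.run(write_workspace_file_tool(manager, b"data", "report.txt"))
```

After the call, `scan_calls` holds exactly one entry, which is the behavior the removal of the duplicate scan in the tool is meant to achieve.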
### Persistence
| Component | Persists To | Lifecycle |
|-----------|-------------|-----------|
| `store_media_file()` | Temp dir (`/tmp/exec_file/{exec_id}/`) | Cleaned after execution |
| `WorkspaceManager` | GCS or local storage + DB | Persistent until deleted |
**Automatic cleanup:** `clean_exec_files(graph_exec_id)` removes temp files after execution completes.
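
A minimal sketch of per-execution cleanup, assuming one directory per `graph_exec_id` under an `exec_file` root. The helper names and the use of a throwaway temp directory as the root are assumptions for illustration, not the actual `clean_exec_files()` implementation.

```python
import shutil
import tempfile
from pathlib import Path


def exec_dir(base: Path, graph_exec_id: str) -> Path:
    """Directory that holds temp files for one execution."""
    return base / "exec_file" / graph_exec_id


def clean_exec_files_sketch(base: Path, graph_exec_id: str) -> None:
    """Remove the execution's temp directory; a missing directory is not an error."""
    shutil.rmtree(exec_dir(base, graph_exec_id), ignore_errors=True)


base = Path(tempfile.mkdtemp())
target = exec_dir(base, "exec-1")
target.mkdir(parents=True)
(target / "image.png").write_bytes(b"x")

clean_exec_files_sketch(base, "exec-1")
removed = not target.exists()
```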
---
## Decision Tree: WorkspaceManager vs store_media_file
```text
┌─────────────────────────────────────────────────────┐
│        What do you need to do with the file?        │
└─────────────────────────────────────────────────────┘
             ┌─────────────┴─────────────┐
             ▼                           ▼
    Process in a block         Store for user access
    (ffmpeg, PIL, etc.)      (CoPilot files, uploads)
             │                           │
             ▼                           ▼
    store_media_file()           WorkspaceManager
     with appropriate
       return_format
      ┌──────┴──────┐
      ▼             ▼
 "for_local_   "for_block_
  processing"   output"
      │             │
      ▼             ▼
  Get local   Auto-saves to
  path for    workspace in
  tools       CoPilot context

Store for user access
├── write_file() ─── Upload + persist (scans internally)
├── read_file() / get_download_url() ─── Retrieve
└── list_files() / delete_file() ─── Manage
```
### Quick Reference
| Scenario | Use |
|----------|-----|
| Block needs to process a file with ffmpeg | `store_media_file(..., return_format="for_local_processing")` |
| Block needs to send file to external API | `store_media_file(..., return_format="for_external_api")` |
| Block returning a generated file | `store_media_file(..., return_format="for_block_output")` |
| API endpoint handling file upload | `WorkspaceManager.write_file()` (handles virus scanning internally) |
| API endpoint serving file download | `WorkspaceManager.get_download_url()` |
| Listing user's files | `WorkspaceManager.list_files()` |
---
## Key Files Reference
| File | Purpose |
|------|---------|
| `backend/data/workspace.py` | Database CRUD operations for UserWorkspace and UserWorkspaceFile |
| `backend/util/workspace.py` | `WorkspaceManager` class - high-level workspace API |
| `backend/util/workspace_storage.py` | Storage backends (GCS, local) and `WorkspaceStorageBackend` interface |
| `backend/util/file.py` | `store_media_file()` and media processing utilities |
| `backend/util/virus_scanner.py` | `VirusScannerService` and `scan_content_safe()` |
| `schema.prisma` | Database model definitions |
---
## Common Patterns
### Block Processing a User's File
```python
from backend.util.file import store_media_file


async def run(self, input_data, *, execution_context, **kwargs):
    # Normalize input to local path
    local_path = await store_media_file(
        file=input_data.video,
        execution_context=execution_context,
        return_format="for_local_processing",
    )
    # Process with local tools (process_video stands in for the block's own logic)
    output_path = process_video(local_path)
    # Return (auto-saves to workspace in CoPilot)
    result = await store_media_file(
        file=output_path,
        execution_context=execution_context,
        return_format="for_block_output",
    )
    yield "output", result
```
### API Upload Endpoint
```python
from fastapi import HTTPException, UploadFile

from backend.util.virus_scanner import VirusDetectedError, VirusScanError
from backend.util.workspace import WorkspaceManager


async def upload_file(file: UploadFile, user_id: str, workspace_id: str):
    content = await file.read()
    # write_file handles virus scanning internally
    manager = WorkspaceManager(user_id, workspace_id)
    try:
        workspace_file = await manager.write_file(
            content=content,
            filename=file.filename,
        )
    except VirusDetectedError:
        raise HTTPException(status_code=400, detail="File rejected: virus detected")
    except VirusScanError:
        raise HTTPException(status_code=503, detail="Virus scanning unavailable")
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    return {"file_id": workspace_file.id}
```
---
## Configuration
| Setting | Purpose | Default |
|---------|---------|---------|
| `media_gcs_bucket_name` | GCS bucket for workspace storage | None (uses local) |
| `workspace_storage_dir` | Local storage directory | `{app_data}/workspaces` |
| `max_file_size_mb` | Maximum file size in MB | 100 |
| `clamav_service_enabled` | Enable virus scanning | true |
| `clamav_service_host` | ClamAV daemon host | localhost |
| `clamav_service_port` | ClamAV daemon port | 3310 |
| `clamav_max_concurrency` | Max concurrent scans to ClamAV daemon | 5 |
| `clamav_mark_failed_scans_as_clean` | If true, scan failures pass content through instead of rejecting (⚠️ security risk if ClamAV is unreachable) | false |
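
The interaction between `clamav_max_concurrency` and `clamav_mark_failed_scans_as_clean` can be sketched as follows. This is an illustration under assumptions: `ScanFailed` and `scan_with_policy` are hypothetical names, and the real `VirusScannerService` may structure this differently.

```python
import asyncio
from typing import Awaitable, Callable, Tuple


class ScanFailed(Exception):
    """Stand-in for a scan that could not complete (e.g. daemon unreachable)."""


async def scan_with_policy(
    scan: Callable[[bytes], Awaitable[None]],
    content: bytes,
    *,
    semaphore: asyncio.Semaphore,
    mark_failed_as_clean: bool = False,
) -> bool:
    """Run one scan under the concurrency limit and apply the failure policy."""
    async with semaphore:  # caps the number of scans in flight
        try:
            await scan(content)
        except ScanFailed:
            if mark_failed_as_clean:
                # Fail-open: content passes through unscanned.
                return True
            raise  # Fail-closed (the default): reject the content.
    return True


async def demo() -> Tuple[bool, bool]:
    semaphore = asyncio.Semaphore(5)  # mirrors the default of 5

    async def unreachable(content: bytes) -> None:
        raise ScanFailed("ClamAV unreachable")

    passed = await scan_with_policy(
        unreachable, b"x", semaphore=semaphore, mark_failed_as_clean=True
    )
    try:
        await scan_with_policy(unreachable, b"x", semaphore=semaphore)
        rejected = False
    except ScanFailed:
        rejected = True
    return passed, rejected


result = asyncio.run(demo())
```

With the flag enabled the unscanned content is accepted, and with the default it is rejected, which is why the table flags the setting as a security risk.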