mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-03-17 03:00:27 -04:00
Compare commits
1 Commits
feat/workf
...
dev
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8892bcd230 |
@@ -178,6 +178,16 @@ yield "image_url", result_url
|
||||
3. Write tests alongside the route file
|
||||
4. Run `poetry run test` to verify
|
||||
|
||||
## Workspace & Media Files
|
||||
|
||||
**Read [Workspace & Media Architecture](../../docs/platform/workspace-media-architecture.md) when:**
|
||||
- Working on CoPilot file upload/download features
|
||||
- Building blocks that handle `MediaFileType` inputs/outputs
|
||||
- Modifying `WorkspaceManager` or `store_media_file()`
|
||||
- Debugging file persistence or virus scanning issues
|
||||
|
||||
Covers: `WorkspaceManager` (persistent storage with session scoping), `store_media_file()` (media normalization pipeline), and responsibility boundaries for virus scanning and persistence.
|
||||
|
||||
## Security Implementation
|
||||
|
||||
### Cache Protection Middleware
|
||||
|
||||
@@ -1,100 +0,0 @@
|
||||
"""API endpoint for importing external workflows via CoPilot."""
|
||||
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
import pydantic
|
||||
from autogpt_libs.auth import requires_user
|
||||
from fastapi import APIRouter, HTTPException, Security
|
||||
|
||||
from backend.copilot.workflow_import.converter import build_copilot_prompt
|
||||
from backend.copilot.workflow_import.describers import describe_workflow
|
||||
from backend.copilot.workflow_import.format_detector import (
|
||||
SourcePlatform,
|
||||
detect_format,
|
||||
)
|
||||
from backend.copilot.workflow_import.url_fetcher import fetch_n8n_template
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
|
||||
class ImportWorkflowRequest(pydantic.BaseModel):
|
||||
"""Request body for importing an external workflow."""
|
||||
|
||||
workflow_json: dict[str, Any] | None = None
|
||||
template_url: str | None = None
|
||||
|
||||
@pydantic.model_validator(mode="after")
|
||||
def check_exactly_one_source(self) -> "ImportWorkflowRequest":
|
||||
has_json = self.workflow_json is not None
|
||||
has_url = self.template_url is not None
|
||||
if not has_json and not has_url:
|
||||
raise ValueError("Provide either 'workflow_json' or 'template_url'")
|
||||
if has_json and has_url:
|
||||
raise ValueError(
|
||||
"Provide only one of 'workflow_json' or 'template_url', not both"
|
||||
)
|
||||
return self
|
||||
|
||||
|
||||
class ImportWorkflowResponse(pydantic.BaseModel):
|
||||
"""Response from parsing an external workflow.
|
||||
|
||||
Returns a CoPilot prompt that the frontend uses to redirect the user
|
||||
to CoPilot, where the agentic agent-generator handles the conversion.
|
||||
"""
|
||||
|
||||
copilot_prompt: str
|
||||
source_format: str
|
||||
source_name: str
|
||||
|
||||
|
||||
@router.post(
|
||||
path="/workflow",
|
||||
summary="Import a workflow from another tool (n8n, Make.com, Zapier)",
|
||||
dependencies=[Security(requires_user)],
|
||||
)
|
||||
async def import_workflow(
|
||||
request: ImportWorkflowRequest,
|
||||
) -> ImportWorkflowResponse:
|
||||
"""Parse an external workflow and return a CoPilot prompt.
|
||||
|
||||
Accepts either raw workflow JSON or a template URL (n8n only for now).
|
||||
The workflow is parsed and described, then a structured prompt is returned
|
||||
for CoPilot's agent-generator to handle the actual conversion.
|
||||
"""
|
||||
# Step 1: Get the raw workflow JSON
|
||||
if request.template_url is not None:
|
||||
try:
|
||||
workflow_json = await fetch_n8n_template(request.template_url)
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e)) from e
|
||||
except RuntimeError as e:
|
||||
raise HTTPException(status_code=502, detail=str(e)) from e
|
||||
else:
|
||||
workflow_json = request.workflow_json
|
||||
assert workflow_json is not None # guaranteed by validator
|
||||
|
||||
# Step 2: Detect format
|
||||
fmt = detect_format(workflow_json)
|
||||
if fmt == SourcePlatform.UNKNOWN:
|
||||
raise HTTPException(
|
||||
status_code=400,
|
||||
detail="Could not detect workflow format. Supported formats: "
|
||||
"n8n, Make.com, Zapier. Ensure you're uploading a valid "
|
||||
"workflow export file.",
|
||||
)
|
||||
|
||||
# Step 3: Describe the workflow
|
||||
desc = describe_workflow(workflow_json, fmt)
|
||||
|
||||
# Step 4: Build CoPilot prompt
|
||||
prompt = build_copilot_prompt(desc)
|
||||
|
||||
return ImportWorkflowResponse(
|
||||
copilot_prompt=prompt,
|
||||
source_format=fmt.value,
|
||||
source_name=desc.name,
|
||||
)
|
||||
@@ -1,173 +0,0 @@
|
||||
"""Tests for workflow_import.py API endpoint."""
|
||||
|
||||
from unittest.mock import AsyncMock
|
||||
|
||||
import fastapi
|
||||
import pytest
|
||||
from autogpt_libs.auth.jwt_utils import get_jwt_payload
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from backend.api.features.workflow_import import router
|
||||
|
||||
app = fastapi.FastAPI()
|
||||
app.include_router(router)
|
||||
client = TestClient(app)
|
||||
|
||||
# Sample workflow fixtures
|
||||
N8N_WORKFLOW = {
|
||||
"name": "Email on Webhook",
|
||||
"nodes": [
|
||||
{
|
||||
"name": "Webhook",
|
||||
"type": "n8n-nodes-base.webhookTrigger",
|
||||
"parameters": {"path": "/incoming"},
|
||||
},
|
||||
{
|
||||
"name": "Send Email",
|
||||
"type": "n8n-nodes-base.gmail",
|
||||
"parameters": {"resource": "message", "operation": "send"},
|
||||
},
|
||||
],
|
||||
"connections": {
|
||||
"Webhook": {"main": [[{"node": "Send Email", "type": "main", "index": 0}]]}
|
||||
},
|
||||
}
|
||||
|
||||
MAKE_WORKFLOW = {
|
||||
"name": "Sheets to Calendar",
|
||||
"flow": [
|
||||
{
|
||||
"module": "google-sheets:watchUpdatedCells",
|
||||
"mapper": {"spreadsheetId": "abc"},
|
||||
},
|
||||
{
|
||||
"module": "google-calendar:createAnEvent",
|
||||
"mapper": {"title": "Meeting"},
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
ZAPIER_WORKFLOW = {
|
||||
"name": "Gmail to Slack",
|
||||
"steps": [
|
||||
{"app": "Gmail", "action": "new_email"},
|
||||
{"app": "Slack", "action": "send_message", "params": {"channel": "#alerts"}},
|
||||
],
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup_app_auth(mock_jwt_user):
|
||||
app.dependency_overrides[get_jwt_payload] = mock_jwt_user["get_jwt_payload"]
|
||||
yield
|
||||
app.dependency_overrides.clear()
|
||||
|
||||
|
||||
class TestImportWorkflow:
|
||||
def test_import_n8n_workflow(self):
|
||||
response = client.post(
|
||||
"/workflow",
|
||||
json={"workflow_json": N8N_WORKFLOW},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["source_format"] == "n8n"
|
||||
assert data["source_name"] == "Email on Webhook"
|
||||
assert "copilot_prompt" in data
|
||||
assert "n8n" in data["copilot_prompt"]
|
||||
assert "Email on Webhook" in data["copilot_prompt"]
|
||||
|
||||
def test_import_make_workflow(self):
|
||||
response = client.post(
|
||||
"/workflow",
|
||||
json={"workflow_json": MAKE_WORKFLOW},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["source_format"] == "make"
|
||||
assert data["source_name"] == "Sheets to Calendar"
|
||||
assert "copilot_prompt" in data
|
||||
|
||||
def test_import_zapier_workflow(self):
|
||||
response = client.post(
|
||||
"/workflow",
|
||||
json={"workflow_json": ZAPIER_WORKFLOW},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["source_format"] == "zapier"
|
||||
assert data["source_name"] == "Gmail to Slack"
|
||||
assert "copilot_prompt" in data
|
||||
|
||||
def test_prompt_includes_steps(self):
|
||||
response = client.post(
|
||||
"/workflow",
|
||||
json={"workflow_json": N8N_WORKFLOW},
|
||||
)
|
||||
prompt = response.json()["copilot_prompt"]
|
||||
# Should include step details from the workflow
|
||||
assert "Webhook" in prompt or "webhook" in prompt
|
||||
assert "Gmail" in prompt or "gmail" in prompt
|
||||
|
||||
def test_no_source_provided(self):
|
||||
response = client.post(
|
||||
"/workflow",
|
||||
json={},
|
||||
)
|
||||
assert response.status_code == 422 # Pydantic validation error
|
||||
|
||||
def test_both_sources_provided(self):
|
||||
response = client.post(
|
||||
"/workflow",
|
||||
json={
|
||||
"workflow_json": N8N_WORKFLOW,
|
||||
"template_url": "https://n8n.io/workflows/123",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 422
|
||||
|
||||
def test_unknown_format_returns_400(self):
|
||||
response = client.post(
|
||||
"/workflow",
|
||||
json={"workflow_json": {"foo": "bar"}},
|
||||
)
|
||||
assert response.status_code == 400
|
||||
assert "Could not detect workflow format" in response.json()["detail"]
|
||||
|
||||
def test_url_fetch_bad_url_returns_400(self, mocker):
|
||||
mocker.patch(
|
||||
"backend.api.features.workflow_import.fetch_n8n_template",
|
||||
new_callable=AsyncMock,
|
||||
side_effect=ValueError("Invalid URL format"),
|
||||
)
|
||||
response = client.post(
|
||||
"/workflow",
|
||||
json={"template_url": "https://bad-url.com"},
|
||||
)
|
||||
assert response.status_code == 400
|
||||
assert "Invalid URL format" in response.json()["detail"]
|
||||
|
||||
def test_url_fetch_upstream_error_returns_502(self, mocker):
|
||||
mocker.patch(
|
||||
"backend.api.features.workflow_import.fetch_n8n_template",
|
||||
new_callable=AsyncMock,
|
||||
side_effect=RuntimeError("n8n API returned 500"),
|
||||
)
|
||||
response = client.post(
|
||||
"/workflow",
|
||||
json={"template_url": "https://n8n.io/workflows/123"},
|
||||
)
|
||||
assert response.status_code == 502
|
||||
assert "n8n API returned 500" in response.json()["detail"]
|
||||
|
||||
def test_response_model_shape(self):
|
||||
response = client.post(
|
||||
"/workflow",
|
||||
json={"workflow_json": N8N_WORKFLOW},
|
||||
)
|
||||
data = response.json()
|
||||
assert "copilot_prompt" in data
|
||||
assert "source_format" in data
|
||||
assert "source_name" in data
|
||||
assert isinstance(data["copilot_prompt"], str)
|
||||
assert len(data["copilot_prompt"]) > 0
|
||||
@@ -34,7 +34,6 @@ import backend.api.features.postmark.postmark
|
||||
import backend.api.features.store.model
|
||||
import backend.api.features.store.routes
|
||||
import backend.api.features.v1
|
||||
import backend.api.features.workflow_import
|
||||
import backend.api.features.workspace.routes as workspace_routes
|
||||
import backend.data.block
|
||||
import backend.data.db
|
||||
@@ -355,11 +354,6 @@ app.include_router(
|
||||
tags=["oauth"],
|
||||
prefix="/api/oauth",
|
||||
)
|
||||
app.include_router(
|
||||
backend.api.features.workflow_import.router,
|
||||
tags=["v2", "import"],
|
||||
prefix="/api/import",
|
||||
)
|
||||
|
||||
app.mount("/external-api", external_api)
|
||||
|
||||
|
||||
@@ -1,16 +0,0 @@
|
||||
"""Workflow import module.
|
||||
|
||||
Parses workflows from n8n, Make.com, and Zapier into structured descriptions,
|
||||
then builds CoPilot prompts for the agentic agent-generator to handle conversion.
|
||||
"""
|
||||
|
||||
from .converter import build_copilot_prompt
|
||||
from .format_detector import SourcePlatform, detect_format
|
||||
from .models import WorkflowDescription
|
||||
|
||||
__all__ = [
|
||||
"SourcePlatform",
|
||||
"WorkflowDescription",
|
||||
"build_copilot_prompt",
|
||||
"detect_format",
|
||||
]
|
||||
@@ -1,49 +0,0 @@
|
||||
"""Build a CoPilot prompt from a WorkflowDescription.
|
||||
|
||||
Instead of a custom single-shot LLM conversion, we generate a structured
|
||||
prompt that CoPilot's existing agentic agent-generator handles. This reuses
|
||||
the multi-turn tool-use pipeline (find_block, create_agent, fixer, validator)
|
||||
for reliable workflow-to-agent conversion.
|
||||
"""
|
||||
|
||||
import json
|
||||
|
||||
from .models import WorkflowDescription
|
||||
|
||||
|
||||
def build_copilot_prompt(desc: WorkflowDescription) -> str:
|
||||
"""Build a CoPilot prompt from a parsed WorkflowDescription.
|
||||
|
||||
The prompt describes the external workflow in enough detail for CoPilot's
|
||||
agent-generator to recreate it as an AutoGPT agent graph.
|
||||
|
||||
Args:
|
||||
desc: Structured description of the source workflow.
|
||||
|
||||
Returns:
|
||||
A user-facing prompt string for CoPilot.
|
||||
"""
|
||||
steps_text = ""
|
||||
for step in desc.steps:
|
||||
conns = (
|
||||
f" → connects to steps {step.connections_to}" if step.connections_to else ""
|
||||
)
|
||||
params_str = ""
|
||||
if step.parameters:
|
||||
truncated = json.dumps(step.parameters, default=str)[:300]
|
||||
params_str = f" (params: {truncated})"
|
||||
steps_text += (
|
||||
f" {step.order}. [{step.service}] {step.action}{params_str}{conns}\n"
|
||||
)
|
||||
|
||||
trigger_line = f"Trigger: {desc.trigger_type}" if desc.trigger_type else ""
|
||||
|
||||
return f"""I want to import a workflow from {desc.source_format.value} and recreate it as an AutoGPT agent.
|
||||
|
||||
**Workflow name**: {desc.name}
|
||||
**Description**: {desc.description}
|
||||
{trigger_line}
|
||||
|
||||
**Steps** (from the original {desc.source_format.value} workflow):
|
||||
{steps_text}
|
||||
Please build an AutoGPT agent that replicates this workflow. Map each step to the most appropriate AutoGPT block(s), wire them together, and save it.""".strip()
|
||||
@@ -1,269 +0,0 @@
|
||||
"""Extract structured WorkflowDescription from external workflow JSONs.
|
||||
|
||||
Each describer is a pure function that deterministically parses the source
|
||||
format into a platform-agnostic WorkflowDescription. No LLM calls are made here.
|
||||
"""
|
||||
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
from .models import SourcePlatform, StepDescription, WorkflowDescription
|
||||
|
||||
|
||||
def describe_workflow(
|
||||
json_data: dict[str, Any], fmt: SourcePlatform
|
||||
) -> WorkflowDescription:
|
||||
"""Route to the appropriate describer based on detected format."""
|
||||
describers = {
|
||||
SourcePlatform.N8N: describe_n8n_workflow,
|
||||
SourcePlatform.MAKE: describe_make_workflow,
|
||||
SourcePlatform.ZAPIER: describe_zapier_workflow,
|
||||
}
|
||||
describer = describers.get(fmt)
|
||||
if not describer:
|
||||
raise ValueError(f"No describer available for format: {fmt}")
|
||||
result = describer(json_data)
|
||||
if not result.steps:
|
||||
raise ValueError(f"Workflow contains no steps (format: {fmt.value})")
|
||||
return result
|
||||
|
||||
|
||||
def describe_n8n_workflow(json_data: dict[str, Any]) -> WorkflowDescription:
|
||||
"""Extract a structured description from an n8n workflow JSON."""
|
||||
nodes = json_data.get("nodes", [])
|
||||
connections = json_data.get("connections", {})
|
||||
|
||||
# Build node index by name for connection resolution
|
||||
node_index: dict[str, int] = {}
|
||||
steps: list[StepDescription] = []
|
||||
|
||||
for i, node in enumerate(nodes):
|
||||
if not isinstance(node, dict):
|
||||
continue
|
||||
|
||||
node_name = node.get("name", f"Node {i}")
|
||||
node_index[node_name] = len(steps)
|
||||
|
||||
node_type = node.get("type", "unknown")
|
||||
# Extract service name from type (e.g., "n8n-nodes-base.gmail" -> "Gmail")
|
||||
service = _extract_n8n_service(node_type)
|
||||
|
||||
# Build action description from type and parameters
|
||||
params = node.get("parameters", {})
|
||||
if not isinstance(params, dict):
|
||||
params = {}
|
||||
action = _describe_n8n_action(node_type, node_name, params)
|
||||
|
||||
# Extract key parameters (skip large/internal ones)
|
||||
clean_params = _clean_params(params)
|
||||
|
||||
steps.append(
|
||||
StepDescription(
|
||||
order=len(steps),
|
||||
action=action,
|
||||
service=service,
|
||||
parameters=clean_params,
|
||||
connections_to=[], # filled below
|
||||
)
|
||||
)
|
||||
|
||||
# Resolve connections: n8n format is {NodeName: {main: [[{node, type, index}]]}}
|
||||
for source_name, conn_data in connections.items():
|
||||
source_idx = node_index.get(source_name)
|
||||
if source_idx is None:
|
||||
continue
|
||||
main_outputs = conn_data.get("main", [])
|
||||
for output_group in main_outputs:
|
||||
if not isinstance(output_group, list):
|
||||
continue
|
||||
for conn in output_group:
|
||||
if not isinstance(conn, dict):
|
||||
continue
|
||||
target_name = conn.get("node")
|
||||
if not isinstance(target_name, str):
|
||||
continue
|
||||
target_idx = node_index.get(target_name)
|
||||
if target_idx is not None:
|
||||
steps[source_idx].connections_to.append(target_idx)
|
||||
|
||||
# Detect trigger type
|
||||
trigger_type = None
|
||||
if nodes and isinstance(nodes[0], dict):
|
||||
first_type = nodes[0].get("type", "")
|
||||
if isinstance(first_type, str) and (
|
||||
"trigger" in first_type.lower() or "webhook" in first_type.lower()
|
||||
):
|
||||
trigger_type = _extract_n8n_service(first_type)
|
||||
|
||||
return WorkflowDescription(
|
||||
name=json_data.get("name", "Imported n8n Workflow"),
|
||||
description=_build_workflow_summary(steps),
|
||||
steps=steps,
|
||||
trigger_type=trigger_type,
|
||||
source_format=SourcePlatform.N8N,
|
||||
)
|
||||
|
||||
|
||||
def describe_make_workflow(json_data: dict[str, Any]) -> WorkflowDescription:
|
||||
"""Extract a structured description from a Make.com scenario blueprint."""
|
||||
flow = json_data.get("flow", [])
|
||||
valid_modules = [m for m in flow if isinstance(m, dict)]
|
||||
steps: list[StepDescription] = []
|
||||
|
||||
for i, module in enumerate(valid_modules):
|
||||
module_ref = module.get("module", "unknown:unknown")
|
||||
if not isinstance(module_ref, str):
|
||||
module_ref = "unknown:unknown"
|
||||
parts = module_ref.split(":", 1)
|
||||
service = parts[0].replace("-", " ").title() if parts else "Unknown"
|
||||
action_verb = parts[1] if len(parts) > 1 else "process"
|
||||
|
||||
# Build human-readable action
|
||||
action = f"{str(action_verb).replace(':', ' ').title()} via {service}"
|
||||
|
||||
params = module.get("mapper", module.get("parameters", {}))
|
||||
clean_params = _clean_params(params) if isinstance(params, dict) else {}
|
||||
|
||||
# Check for routes (branching) — routers don't connect sequentially
|
||||
routes = module.get("routes", [])
|
||||
if routes:
|
||||
# Router modules branch; don't assign sequential connections
|
||||
connections_to: list[int] = []
|
||||
clean_params["_has_routes"] = len(routes)
|
||||
else:
|
||||
# Make.com flows are sequential by default; each step connects to next
|
||||
connections_to = [i + 1] if i < len(valid_modules) - 1 else []
|
||||
|
||||
steps.append(
|
||||
StepDescription(
|
||||
order=i,
|
||||
action=action,
|
||||
service=service,
|
||||
parameters=clean_params,
|
||||
connections_to=connections_to,
|
||||
)
|
||||
)
|
||||
|
||||
# Detect trigger
|
||||
trigger_type = None
|
||||
if flow and isinstance(flow[0], dict):
|
||||
first_module = flow[0].get("module", "")
|
||||
if isinstance(first_module, str) and (
|
||||
"watch" in first_module.lower() or "trigger" in first_module.lower()
|
||||
):
|
||||
trigger_type = first_module.split(":")[0].replace("-", " ").title()
|
||||
|
||||
return WorkflowDescription(
|
||||
name=json_data.get("name", "Imported Make.com Scenario"),
|
||||
description=_build_workflow_summary(steps),
|
||||
steps=steps,
|
||||
trigger_type=trigger_type,
|
||||
source_format=SourcePlatform.MAKE,
|
||||
)
|
||||
|
||||
|
||||
def describe_zapier_workflow(json_data: dict[str, Any]) -> WorkflowDescription:
|
||||
"""Extract a structured description from a Zapier Zap JSON."""
|
||||
zap_steps = json_data.get("steps", [])
|
||||
valid_steps = [s for s in zap_steps if isinstance(s, dict)]
|
||||
steps: list[StepDescription] = []
|
||||
|
||||
for i, step in enumerate(valid_steps):
|
||||
app = step.get("app", "Unknown")
|
||||
action = step.get("action", "process")
|
||||
action_desc = f"{str(action).replace('_', ' ').title()} via {app}"
|
||||
|
||||
params = step.get("params", step.get("inputFields", {}))
|
||||
clean_params = _clean_params(params) if isinstance(params, dict) else {}
|
||||
|
||||
# Zapier zaps are linear: each step connects to next
|
||||
connections_to = [i + 1] if i < len(valid_steps) - 1 else []
|
||||
|
||||
steps.append(
|
||||
StepDescription(
|
||||
order=i,
|
||||
action=action_desc,
|
||||
service=app,
|
||||
parameters=clean_params,
|
||||
connections_to=connections_to,
|
||||
)
|
||||
)
|
||||
|
||||
trigger_type = None
|
||||
if valid_steps:
|
||||
trigger_type = valid_steps[0].get("app")
|
||||
|
||||
return WorkflowDescription(
|
||||
name=json_data.get("name", json_data.get("title", "Imported Zapier Zap")),
|
||||
description=_build_workflow_summary(steps),
|
||||
steps=steps,
|
||||
trigger_type=trigger_type,
|
||||
source_format=SourcePlatform.ZAPIER,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _extract_n8n_service(node_type: str) -> str:
|
||||
"""Extract a human-readable service name from an n8n node type.
|
||||
|
||||
Examples:
|
||||
"n8n-nodes-base.gmail" -> "Gmail"
|
||||
"@n8n/n8n-nodes-langchain.agent" -> "Langchain Agent"
|
||||
"n8n-nodes-base.httpRequest" -> "Http Request"
|
||||
"""
|
||||
# Strip common prefixes
|
||||
name = node_type
|
||||
for prefix in ("n8n-nodes-base.", "@n8n/n8n-nodes-langchain.", "@n8n/"):
|
||||
if name.startswith(prefix):
|
||||
name = name[len(prefix) :]
|
||||
break
|
||||
|
||||
# Convert camelCase to Title Case
|
||||
name = re.sub(r"([a-z])([A-Z])", r"\1 \2", name)
|
||||
return name.replace(".", " ").replace("-", " ").title()
|
||||
|
||||
|
||||
def _describe_n8n_action(node_type: str, node_name: str, params: dict[str, Any]) -> str:
|
||||
"""Build a human-readable action description for an n8n node."""
|
||||
service = _extract_n8n_service(node_type)
|
||||
resource = str(params.get("resource", ""))
|
||||
operation = str(params.get("operation", ""))
|
||||
|
||||
if resource and operation:
|
||||
return f"{operation.title()} {resource} via {service}"
|
||||
if operation:
|
||||
return f"{operation.title()} via {service}"
|
||||
return f"{node_name} ({service})"
|
||||
|
||||
|
||||
def _clean_params(params: dict[str, Any], max_keys: int = 10) -> dict[str, Any]:
|
||||
"""Extract key parameters, skipping large or internal values."""
|
||||
cleaned: dict[str, Any] = {}
|
||||
for key, value in list(params.items())[:max_keys]:
|
||||
if key.startswith("_") or key in ("credentials", "webhookId"):
|
||||
continue
|
||||
if isinstance(value, str) and len(value) > 500:
|
||||
cleaned[key] = value[:500] + "..."
|
||||
elif isinstance(value, (str, int, float, bool)):
|
||||
cleaned[key] = value
|
||||
elif isinstance(value, list) and len(value) <= 5:
|
||||
cleaned[key] = value
|
||||
return cleaned
|
||||
|
||||
|
||||
def _build_workflow_summary(steps: list[StepDescription]) -> str:
|
||||
"""Build a one-line summary of the workflow from its steps."""
|
||||
if not steps:
|
||||
return "Empty workflow"
|
||||
services = []
|
||||
for s in steps:
|
||||
if s.service not in services:
|
||||
services.append(s.service)
|
||||
service_chain = " -> ".join(services[:6])
|
||||
if len(services) > 6:
|
||||
service_chain += f" (and {len(services) - 6} more)"
|
||||
return f"Workflow with {len(steps)} steps: {service_chain}"
|
||||
@@ -1,135 +0,0 @@
|
||||
"""Tests for describers.py."""
|
||||
|
||||
import pytest
|
||||
|
||||
from .describers import (
|
||||
describe_make_workflow,
|
||||
describe_n8n_workflow,
|
||||
describe_workflow,
|
||||
describe_zapier_workflow,
|
||||
)
|
||||
from .models import SourcePlatform
|
||||
|
||||
|
||||
class TestDescribeN8nWorkflow:
|
||||
def test_basic_workflow(self):
|
||||
data = {
|
||||
"name": "Email on Webhook",
|
||||
"nodes": [
|
||||
{
|
||||
"name": "Webhook",
|
||||
"type": "n8n-nodes-base.webhookTrigger",
|
||||
"parameters": {"path": "/incoming"},
|
||||
},
|
||||
{
|
||||
"name": "Send Email",
|
||||
"type": "n8n-nodes-base.gmail",
|
||||
"parameters": {"resource": "message", "operation": "send"},
|
||||
},
|
||||
],
|
||||
"connections": {
|
||||
"Webhook": {
|
||||
"main": [[{"node": "Send Email", "type": "main", "index": 0}]]
|
||||
}
|
||||
},
|
||||
}
|
||||
desc = describe_n8n_workflow(data)
|
||||
assert desc.name == "Email on Webhook"
|
||||
assert desc.source_format == SourcePlatform.N8N
|
||||
assert len(desc.steps) == 2
|
||||
assert desc.steps[0].connections_to == [1]
|
||||
assert desc.steps[1].connections_to == []
|
||||
assert desc.trigger_type is not None
|
||||
|
||||
def test_step_extraction(self):
|
||||
data = {
|
||||
"name": "Test",
|
||||
"nodes": [
|
||||
{
|
||||
"name": "HTTP",
|
||||
"type": "n8n-nodes-base.httpRequest",
|
||||
"parameters": {"url": "https://example.com", "method": "GET"},
|
||||
},
|
||||
],
|
||||
"connections": {},
|
||||
}
|
||||
desc = describe_n8n_workflow(data)
|
||||
step = desc.steps[0]
|
||||
assert step.service == "Http Request"
|
||||
assert step.order == 0
|
||||
assert "url" in step.parameters
|
||||
|
||||
def test_empty_nodes(self):
|
||||
data = {"name": "Empty", "nodes": [], "connections": {}}
|
||||
desc = describe_n8n_workflow(data)
|
||||
assert len(desc.steps) == 0
|
||||
assert desc.trigger_type is None
|
||||
|
||||
|
||||
class TestDescribeMakeWorkflow:
|
||||
def test_basic_scenario(self):
|
||||
data = {
|
||||
"name": "Sheets to Calendar",
|
||||
"flow": [
|
||||
{
|
||||
"module": "google-sheets:watchUpdatedCells",
|
||||
"mapper": {"spreadsheetId": "abc"},
|
||||
},
|
||||
{
|
||||
"module": "google-calendar:createAnEvent",
|
||||
"mapper": {"title": "Meeting"},
|
||||
},
|
||||
],
|
||||
}
|
||||
desc = describe_make_workflow(data)
|
||||
assert desc.name == "Sheets to Calendar"
|
||||
assert desc.source_format == SourcePlatform.MAKE
|
||||
assert len(desc.steps) == 2
|
||||
# Sequential: step 0 connects to step 1
|
||||
assert desc.steps[0].connections_to == [1]
|
||||
assert desc.steps[1].connections_to == []
|
||||
assert desc.trigger_type is not None # "watch" in module name
|
||||
|
||||
def test_service_extraction(self):
|
||||
data = {
|
||||
"flow": [{"module": "slack:sendMessage", "mapper": {"text": "hello"}}],
|
||||
}
|
||||
desc = describe_make_workflow(data)
|
||||
assert desc.steps[0].service == "Slack"
|
||||
|
||||
|
||||
class TestDescribeZapierWorkflow:
|
||||
def test_basic_zap(self):
|
||||
data = {
|
||||
"name": "Gmail to Slack",
|
||||
"steps": [
|
||||
{"app": "Gmail", "action": "new_email"},
|
||||
{
|
||||
"app": "Slack",
|
||||
"action": "send_message",
|
||||
"params": {"channel": "#alerts"},
|
||||
},
|
||||
],
|
||||
}
|
||||
desc = describe_zapier_workflow(data)
|
||||
assert desc.name == "Gmail to Slack"
|
||||
assert desc.source_format == SourcePlatform.ZAPIER
|
||||
assert len(desc.steps) == 2
|
||||
assert desc.steps[0].connections_to == [1]
|
||||
assert desc.trigger_type == "Gmail"
|
||||
|
||||
|
||||
class TestDescribeWorkflowRouter:
|
||||
def test_routes_to_n8n(self):
|
||||
data = {
|
||||
"nodes": [
|
||||
{"name": "N", "type": "n8n-nodes-base.webhook", "parameters": {}}
|
||||
],
|
||||
"connections": {},
|
||||
}
|
||||
desc = describe_workflow(data, SourcePlatform.N8N)
|
||||
assert desc.source_format == SourcePlatform.N8N
|
||||
|
||||
def test_unknown_raises(self):
|
||||
with pytest.raises(ValueError, match="No describer"):
|
||||
describe_workflow({}, SourcePlatform.UNKNOWN)
|
||||
@@ -1,71 +0,0 @@
|
||||
"""Detect the source platform of a workflow JSON."""
|
||||
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
from .models import SourcePlatform
|
||||
|
||||
_N8N_TYPE_RE = re.compile(r"^(n8n-nodes-base\.|@n8n/)")
|
||||
|
||||
|
||||
def detect_format(json_data: dict[str, Any]) -> SourcePlatform:
|
||||
"""Inspect a workflow JSON and determine which platform it came from.
|
||||
|
||||
Args:
|
||||
json_data: The parsed JSON data from a workflow export file.
|
||||
|
||||
Returns:
|
||||
The detected SourcePlatform.
|
||||
"""
|
||||
if _is_n8n(json_data):
|
||||
return SourcePlatform.N8N
|
||||
if _is_make(json_data):
|
||||
return SourcePlatform.MAKE
|
||||
if _is_zapier(json_data):
|
||||
return SourcePlatform.ZAPIER
|
||||
return SourcePlatform.UNKNOWN
|
||||
|
||||
|
||||
def _is_n8n(data: dict[str, Any]) -> bool:
|
||||
"""n8n workflows have a `nodes` array with items containing `type` fields
|
||||
matching patterns like `n8n-nodes-base.*` or `@n8n/*`, plus a `connections`
|
||||
object."""
|
||||
nodes = data.get("nodes")
|
||||
connections = data.get("connections")
|
||||
if not isinstance(nodes, list) or not isinstance(connections, dict):
|
||||
return False
|
||||
if not nodes:
|
||||
return False
|
||||
# Check if at least one node has an n8n-style type
|
||||
return any(
|
||||
isinstance(n, dict)
|
||||
and isinstance(n.get("type"), str)
|
||||
and _N8N_TYPE_RE.match(n["type"])
|
||||
for n in nodes
|
||||
)
|
||||
|
||||
|
||||
def _is_make(data: dict[str, Any]) -> bool:
|
||||
"""Make.com scenarios have a `flow` array with items containing `module`
|
||||
fields in `service:action` URI format."""
|
||||
flow = data.get("flow")
|
||||
if not isinstance(flow, list) or not flow:
|
||||
return False
|
||||
# Check if at least one module has `service:action` pattern
|
||||
return any(
|
||||
isinstance(item, dict)
|
||||
and isinstance(item.get("module"), str)
|
||||
and ":" in item["module"]
|
||||
for item in flow
|
||||
)
|
||||
|
||||
|
||||
def _is_zapier(data: dict[str, Any]) -> bool:
|
||||
"""Zapier Zaps have a `steps` array with items containing `app` and
|
||||
`action` fields."""
|
||||
steps = data.get("steps")
|
||||
if not isinstance(steps, list) or not steps:
|
||||
return False
|
||||
return any(
|
||||
isinstance(step, dict) and "app" in step and "action" in step for step in steps
|
||||
)
|
||||
@@ -1,101 +0,0 @@
|
||||
"""Tests for format_detector.py."""
|
||||
|
||||
from .format_detector import detect_format
|
||||
from .models import SourcePlatform
|
||||
|
||||
|
||||
class TestDetectFormat:
|
||||
def test_n8n_workflow(self):
|
||||
data = {
|
||||
"name": "My n8n Workflow",
|
||||
"nodes": [
|
||||
{
|
||||
"name": "Webhook",
|
||||
"type": "n8n-nodes-base.webhook",
|
||||
"parameters": {"path": "/hook"},
|
||||
},
|
||||
{
|
||||
"name": "HTTP Request",
|
||||
"type": "n8n-nodes-base.httpRequest",
|
||||
"parameters": {"url": "https://api.example.com"},
|
||||
},
|
||||
],
|
||||
"connections": {
|
||||
"Webhook": {
|
||||
"main": [[{"node": "HTTP Request", "type": "main", "index": 0}]]
|
||||
}
|
||||
},
|
||||
}
|
||||
assert detect_format(data) == SourcePlatform.N8N
|
||||
|
||||
def test_n8n_langchain_nodes(self):
|
||||
data = {
|
||||
"nodes": [
|
||||
{
|
||||
"name": "Agent",
|
||||
"type": "@n8n/n8n-nodes-langchain.agent",
|
||||
"parameters": {},
|
||||
},
|
||||
],
|
||||
"connections": {},
|
||||
}
|
||||
assert detect_format(data) == SourcePlatform.N8N
|
||||
|
||||
def test_make_scenario(self):
|
||||
data = {
|
||||
"name": "My Make Scenario",
|
||||
"flow": [
|
||||
{
|
||||
"module": "google-sheets:watchUpdatedCells",
|
||||
"mapper": {"spreadsheetId": "123"},
|
||||
},
|
||||
{
|
||||
"module": "google-calendar:createAnEvent",
|
||||
"mapper": {"title": "Test"},
|
||||
},
|
||||
],
|
||||
}
|
||||
assert detect_format(data) == SourcePlatform.MAKE
|
||||
|
||||
def test_zapier_zap(self):
|
||||
data = {
|
||||
"name": "My Zap",
|
||||
"steps": [
|
||||
{"app": "gmail", "action": "new_email"},
|
||||
{
|
||||
"app": "slack",
|
||||
"action": "send_message",
|
||||
"params": {"channel": "#general"},
|
||||
},
|
||||
],
|
||||
}
|
||||
assert detect_format(data) == SourcePlatform.ZAPIER
|
||||
|
||||
def test_unknown_format(self):
|
||||
data = {"foo": "bar", "nodes": []}
|
||||
assert detect_format(data) == SourcePlatform.UNKNOWN
|
||||
|
||||
def test_empty_dict(self):
|
||||
assert detect_format({}) == SourcePlatform.UNKNOWN
|
||||
|
||||
def test_autogpt_graph_not_detected_as_n8n(self):
|
||||
"""AutoGPT graphs have nodes but not n8n-style types."""
|
||||
data = {
|
||||
"nodes": [
|
||||
{"id": "abc", "block_id": "some-uuid", "input_default": {}},
|
||||
],
|
||||
"connections": {},
|
||||
}
|
||||
assert detect_format(data) == SourcePlatform.UNKNOWN
|
||||
|
||||
def test_make_without_colon_not_detected(self):
|
||||
data = {
|
||||
"flow": [{"module": "simplemodule", "mapper": {}}],
|
||||
}
|
||||
assert detect_format(data) == SourcePlatform.UNKNOWN
|
||||
|
||||
def test_zapier_without_action_not_detected(self):
|
||||
data = {
|
||||
"steps": [{"app": "gmail"}],
|
||||
}
|
||||
assert detect_format(data) == SourcePlatform.UNKNOWN
|
||||
@@ -1,33 +0,0 @@
|
||||
"""Data models for external workflow import."""
|
||||
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
import pydantic
|
||||
|
||||
|
||||
class SourcePlatform(str, Enum):
|
||||
N8N = "n8n"
|
||||
MAKE = "make"
|
||||
ZAPIER = "zapier"
|
||||
UNKNOWN = "unknown"
|
||||
|
||||
|
||||
class StepDescription(pydantic.BaseModel):
|
||||
"""A single step/node extracted from an external workflow."""
|
||||
|
||||
order: int
|
||||
action: str
|
||||
service: str
|
||||
parameters: dict[str, Any] = pydantic.Field(default_factory=dict)
|
||||
connections_to: list[int] = pydantic.Field(default_factory=list)
|
||||
|
||||
|
||||
class WorkflowDescription(pydantic.BaseModel):
|
||||
"""Structured description of an external workflow."""
|
||||
|
||||
name: str
|
||||
description: str
|
||||
steps: list[StepDescription]
|
||||
trigger_type: str | None = None
|
||||
source_format: SourcePlatform
|
||||
@@ -1,74 +0,0 @@
|
||||
"""Fetch workflow templates by URL."""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from typing import Any
|
||||
|
||||
from backend.util.request import HTTPClientError, Requests
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Patterns for extracting template IDs from n8n URLs
|
||||
_N8N_WORKFLOW_URL_RE = re.compile(
|
||||
r"https?://(?:www\.)?n8n\.io/workflows/(\d+)", re.IGNORECASE
|
||||
)
|
||||
_N8N_TEMPLATES_API = "https://api.n8n.io/api/templates/workflows/{id}"
|
||||
|
||||
|
||||
async def fetch_n8n_template(url: str) -> dict[str, Any]:
|
||||
"""Fetch an n8n workflow template by its URL.
|
||||
|
||||
Supports URLs like:
|
||||
- https://n8n.io/workflows/1234
|
||||
- https://n8n.io/workflows/1234-some-slug
|
||||
|
||||
Args:
|
||||
url: The n8n template URL.
|
||||
|
||||
Returns:
|
||||
The n8n workflow JSON.
|
||||
|
||||
Raises:
|
||||
ValueError: If the URL is not a valid n8n template URL.
|
||||
RuntimeError: If the fetch fails.
|
||||
"""
|
||||
match = _N8N_WORKFLOW_URL_RE.match(url.strip())
|
||||
if not match:
|
||||
raise ValueError(
|
||||
"Not a valid n8n workflow URL. Expected format: "
|
||||
"https://n8n.io/workflows/<id>"
|
||||
)
|
||||
|
||||
template_id = match.group(1)
|
||||
api_url = _N8N_TEMPLATES_API.format(id=template_id)
|
||||
|
||||
client = Requests(raise_for_status=True)
|
||||
try:
|
||||
response = await client.get(api_url)
|
||||
data = response.json()
|
||||
except HTTPClientError as e:
|
||||
# 4xx from n8n API (e.g. 404 template not found) → bad user input
|
||||
raise ValueError(
|
||||
f"n8n template {template_id} not found or inaccessible: {e}"
|
||||
) from e
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"Failed to fetch n8n template {template_id}: {e}") from e
|
||||
|
||||
if not isinstance(data, dict):
|
||||
raise RuntimeError(
|
||||
f"Unexpected response format from n8n API for template {template_id}: "
|
||||
"expected JSON object"
|
||||
)
|
||||
|
||||
# n8n API wraps the workflow in a `workflow` key
|
||||
workflow = data.get("workflow", data)
|
||||
if not isinstance(workflow, dict):
|
||||
raise RuntimeError(
|
||||
f"Unexpected response format from n8n API for template {template_id}"
|
||||
)
|
||||
|
||||
# Preserve the workflow name from the template metadata
|
||||
if "name" not in workflow and "name" in data:
|
||||
workflow["name"] = data["name"]
|
||||
|
||||
return workflow
|
||||
@@ -183,7 +183,8 @@ class WorkspaceManager:
|
||||
f"{Config().max_file_size_mb}MB limit"
|
||||
)
|
||||
|
||||
# Virus scan content before persisting (defense in depth)
|
||||
# Scan here — callers must NOT duplicate this scan.
|
||||
# WorkspaceManager owns virus scanning for all persisted files.
|
||||
await scan_content_safe(content, filename=filename)
|
||||
|
||||
# Determine path with session scoping
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import LibraryImportWorkflowDialog from "../LibraryImportWorkflowDialog/LibraryImportWorkflowDialog";
|
||||
import { LibrarySearchBar } from "../LibrarySearchBar/LibrarySearchBar";
|
||||
import LibraryUploadAgentDialog from "../LibraryUploadAgentDialog/LibraryUploadAgentDialog";
|
||||
|
||||
@@ -12,14 +11,12 @@ export function LibraryActionHeader({ setSearchTerm }: Props) {
|
||||
<div className="mb-[32px] hidden items-center justify-center gap-4 md:flex">
|
||||
<LibrarySearchBar setSearchTerm={setSearchTerm} />
|
||||
<LibraryUploadAgentDialog />
|
||||
<LibraryImportWorkflowDialog />
|
||||
</div>
|
||||
|
||||
{/* Mobile and tablet */}
|
||||
<div className="flex flex-col gap-4 p-4 pt-[52px] md:hidden">
|
||||
<div className="flex w-full justify-between gap-2">
|
||||
<div className="flex w-full justify-between">
|
||||
<LibraryUploadAgentDialog />
|
||||
<LibraryImportWorkflowDialog />
|
||||
</div>
|
||||
|
||||
<div className="flex items-center justify-center">
|
||||
|
||||
@@ -1,156 +0,0 @@
|
||||
"use client";
|
||||
import { Button } from "@/components/atoms/Button/Button";
|
||||
import { FileInput } from "@/components/atoms/FileInput/FileInput";
|
||||
import { Input } from "@/components/atoms/Input/Input";
|
||||
import { LoadingSpinner } from "@/components/atoms/LoadingSpinner/LoadingSpinner";
|
||||
import { Dialog } from "@/components/molecules/Dialog/Dialog";
|
||||
import {
|
||||
Form,
|
||||
FormControl,
|
||||
FormField,
|
||||
FormItem,
|
||||
FormMessage,
|
||||
} from "@/components/molecules/Form/Form";
|
||||
import { ArrowsClockwiseIcon } from "@phosphor-icons/react";
|
||||
import { z } from "zod";
|
||||
import { useLibraryImportWorkflowDialog } from "./useLibraryImportWorkflowDialog";
|
||||
|
||||
export const importWorkflowFormSchema = z.object({
|
||||
workflowFile: z.string(),
|
||||
templateUrl: z.string(),
|
||||
});
|
||||
|
||||
export default function LibraryImportWorkflowDialog() {
|
||||
const {
|
||||
onSubmit,
|
||||
isConverting,
|
||||
isOpen,
|
||||
setIsOpen,
|
||||
form,
|
||||
importMode,
|
||||
setImportMode,
|
||||
} = useLibraryImportWorkflowDialog();
|
||||
|
||||
const hasInput =
|
||||
importMode === "url"
|
||||
? !!form.watch("templateUrl")
|
||||
: !!form.watch("workflowFile");
|
||||
|
||||
return (
|
||||
<Dialog
|
||||
title="Import Workflow"
|
||||
styling={{ maxWidth: "32rem" }}
|
||||
controlled={{
|
||||
isOpen,
|
||||
set: setIsOpen,
|
||||
}}
|
||||
onClose={() => {
|
||||
setIsOpen(false);
|
||||
form.reset();
|
||||
}}
|
||||
>
|
||||
<Dialog.Trigger>
|
||||
<Button
|
||||
data-testid="import-workflow-button"
|
||||
variant="primary"
|
||||
className="h-[2.78rem] w-full md:w-[14rem]"
|
||||
size="small"
|
||||
>
|
||||
<ArrowsClockwiseIcon width={18} height={18} />
|
||||
<span>Import workflow</span>
|
||||
</Button>
|
||||
</Dialog.Trigger>
|
||||
<Dialog.Content>
|
||||
{/* Mode toggle */}
|
||||
<div className="mb-4 flex gap-2">
|
||||
<Button
|
||||
variant={importMode === "file" ? "primary" : "outline"}
|
||||
size="small"
|
||||
onClick={() => setImportMode("file")}
|
||||
type="button"
|
||||
>
|
||||
Upload file
|
||||
</Button>
|
||||
<Button
|
||||
variant={importMode === "url" ? "primary" : "outline"}
|
||||
size="small"
|
||||
onClick={() => setImportMode("url")}
|
||||
type="button"
|
||||
>
|
||||
Paste URL
|
||||
</Button>
|
||||
</div>
|
||||
|
||||
<p className="mb-4 text-sm text-neutral-500">
|
||||
Import workflows from n8n, Make.com, or Zapier. The workflow will be
|
||||
automatically converted to an AutoGPT agent.
|
||||
</p>
|
||||
|
||||
<Form
|
||||
form={form}
|
||||
onSubmit={onSubmit}
|
||||
className="flex flex-col justify-center gap-0 px-1"
|
||||
>
|
||||
{importMode === "file" ? (
|
||||
<FormField
|
||||
control={form.control}
|
||||
name="workflowFile"
|
||||
render={({ field }) => (
|
||||
<FormItem>
|
||||
<FormControl>
|
||||
<FileInput
|
||||
mode="base64"
|
||||
value={field.value}
|
||||
onChange={field.onChange}
|
||||
accept=".json,application/json"
|
||||
placeholder="Workflow JSON file (n8n, Make.com, or Zapier export)"
|
||||
maxFileSize={10 * 1024 * 1024}
|
||||
showStorageNote={false}
|
||||
className="mb-4 mt-2"
|
||||
/>
|
||||
</FormControl>
|
||||
<FormMessage />
|
||||
</FormItem>
|
||||
)}
|
||||
/>
|
||||
) : (
|
||||
<FormField
|
||||
control={form.control}
|
||||
name="templateUrl"
|
||||
render={({ field }) => (
|
||||
<FormItem>
|
||||
<FormControl>
|
||||
<Input
|
||||
{...field}
|
||||
id={field.name}
|
||||
label="n8n template URL"
|
||||
placeholder="https://n8n.io/workflows/1234"
|
||||
className="mb-4 mt-2 w-full rounded-[10px]"
|
||||
/>
|
||||
</FormControl>
|
||||
<FormMessage />
|
||||
</FormItem>
|
||||
)}
|
||||
/>
|
||||
)}
|
||||
|
||||
<Button
|
||||
type="submit"
|
||||
variant="primary"
|
||||
className="min-w-[18rem]"
|
||||
disabled={!hasInput || isConverting}
|
||||
>
|
||||
{isConverting ? (
|
||||
<div className="flex items-center gap-2">
|
||||
<LoadingSpinner size="small" className="text-white" />
|
||||
<span>Parsing workflow...</span>
|
||||
</div>
|
||||
) : (
|
||||
"Import to AutoPilot"
|
||||
)}
|
||||
</Button>
|
||||
</Form>
|
||||
</Dialog.Content>
|
||||
</Dialog>
|
||||
);
|
||||
}
|
||||
@@ -1,89 +0,0 @@
|
||||
import { usePostV2ImportAWorkflowFromAnotherToolN8nMakeComZapier } from "@/app/api/__generated__/endpoints/import/import";
|
||||
import type { ImportWorkflowRequest } from "@/app/api/__generated__/models/importWorkflowRequest";
|
||||
import type { ImportWorkflowResponse } from "@/app/api/__generated__/models/importWorkflowResponse";
|
||||
import { useToast } from "@/components/molecules/Toast/use-toast";
|
||||
import { zodResolver } from "@hookform/resolvers/zod";
|
||||
import { useRouter } from "next/navigation";
|
||||
import { useState } from "react";
|
||||
import { useForm } from "react-hook-form";
|
||||
import { z } from "zod";
|
||||
import { importWorkflowFormSchema } from "./LibraryImportWorkflowDialog";
|
||||
|
||||
export function useLibraryImportWorkflowDialog() {
|
||||
const [isOpen, setIsOpen] = useState(false);
|
||||
const { toast } = useToast();
|
||||
const router = useRouter();
|
||||
const [importMode, setImportMode] = useState<"file" | "url">("file");
|
||||
|
||||
const { mutateAsync: importWorkflow, isPending: isConverting } =
|
||||
usePostV2ImportAWorkflowFromAnotherToolN8nMakeComZapier();
|
||||
|
||||
const form = useForm<z.infer<typeof importWorkflowFormSchema>>({
|
||||
resolver: zodResolver(importWorkflowFormSchema),
|
||||
defaultValues: {
|
||||
workflowFile: "",
|
||||
templateUrl: "",
|
||||
},
|
||||
});
|
||||
|
||||
const onSubmit = async (values: z.infer<typeof importWorkflowFormSchema>) => {
|
||||
try {
|
||||
let body: ImportWorkflowRequest;
|
||||
|
||||
if (importMode === "url" && values.templateUrl) {
|
||||
body = { template_url: values.templateUrl };
|
||||
} else if (importMode === "file" && values.workflowFile) {
|
||||
// Decode base64 file to JSON
|
||||
const base64Match = values.workflowFile.match(
|
||||
/^data:[^;]+;base64,(.+)$/,
|
||||
);
|
||||
if (!base64Match) {
|
||||
throw new Error("Invalid file format");
|
||||
}
|
||||
const jsonString = atob(base64Match[1]);
|
||||
const workflowJson = JSON.parse(jsonString);
|
||||
body = { workflow_json: workflowJson };
|
||||
} else {
|
||||
throw new Error("Please provide a workflow file or template URL");
|
||||
}
|
||||
|
||||
const response = await importWorkflow({ data: body });
|
||||
// Cast needed: generated client returns union with error types,
|
||||
// but errors throw before reaching here
|
||||
const data = response.data as ImportWorkflowResponse;
|
||||
|
||||
setIsOpen(false);
|
||||
form.reset();
|
||||
|
||||
toast({
|
||||
title: "Workflow Parsed",
|
||||
description: `Detected ${data.source_format} workflow "${data.source_name}". Redirecting to AutoPilot...`,
|
||||
});
|
||||
|
||||
// Redirect to AutoPilot with the prompt pre-filled and auto-submitted
|
||||
const encodedPrompt = encodeURIComponent(data.copilot_prompt);
|
||||
router.push(`/copilot?autosubmit=true#prompt=${encodedPrompt}`);
|
||||
} catch (error) {
|
||||
console.error("Import failed:", error);
|
||||
toast({
|
||||
title: "Import Failed",
|
||||
description:
|
||||
error instanceof Error
|
||||
? error.message
|
||||
: "Failed to parse workflow. Please check the file format.",
|
||||
variant: "destructive",
|
||||
duration: 5000,
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
return {
|
||||
onSubmit,
|
||||
isConverting,
|
||||
isOpen,
|
||||
setIsOpen,
|
||||
form,
|
||||
importMode,
|
||||
setImportMode,
|
||||
};
|
||||
}
|
||||
@@ -2920,46 +2920,6 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"/api/import/workflow": {
|
||||
"post": {
|
||||
"tags": ["v2", "import"],
|
||||
"summary": "Import a workflow from another tool (n8n, Make.com, Zapier)",
|
||||
"description": "Parse an external workflow and return a CoPilot prompt.\n\nAccepts either raw workflow JSON or a template URL (n8n only for now).\nThe workflow is parsed and described, then a structured prompt is returned\nfor CoPilot's agent-generator to handle the actual conversion.",
|
||||
"operationId": "postV2Import a workflow from another tool (n8n, make.com, zapier)",
|
||||
"requestBody": {
|
||||
"content": {
|
||||
"application/json": {
|
||||
"schema": { "$ref": "#/components/schemas/ImportWorkflowRequest" }
|
||||
}
|
||||
},
|
||||
"required": true
|
||||
},
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "Successful Response",
|
||||
"content": {
|
||||
"application/json": {
|
||||
"schema": {
|
||||
"$ref": "#/components/schemas/ImportWorkflowResponse"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"401": {
|
||||
"$ref": "#/components/responses/HTTP401NotAuthenticatedError"
|
||||
},
|
||||
"422": {
|
||||
"description": "Validation Error",
|
||||
"content": {
|
||||
"application/json": {
|
||||
"schema": { "$ref": "#/components/schemas/HTTPValidationError" }
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"security": [{ "HTTPBearerJWT": [] }]
|
||||
}
|
||||
},
|
||||
"/api/integrations/ayrshare/sso_url": {
|
||||
"get": {
|
||||
"tags": ["v1", "integrations"],
|
||||
@@ -10011,35 +9971,6 @@
|
||||
"required": ["image_url"],
|
||||
"title": "ImageURLResponse"
|
||||
},
|
||||
"ImportWorkflowRequest": {
|
||||
"properties": {
|
||||
"workflow_json": {
|
||||
"anyOf": [
|
||||
{ "additionalProperties": true, "type": "object" },
|
||||
{ "type": "null" }
|
||||
],
|
||||
"title": "Workflow Json"
|
||||
},
|
||||
"template_url": {
|
||||
"anyOf": [{ "type": "string" }, { "type": "null" }],
|
||||
"title": "Template Url"
|
||||
}
|
||||
},
|
||||
"type": "object",
|
||||
"title": "ImportWorkflowRequest",
|
||||
"description": "Request body for importing an external workflow."
|
||||
},
|
||||
"ImportWorkflowResponse": {
|
||||
"properties": {
|
||||
"copilot_prompt": { "type": "string", "title": "Copilot Prompt" },
|
||||
"source_format": { "type": "string", "title": "Source Format" },
|
||||
"source_name": { "type": "string", "title": "Source Name" }
|
||||
},
|
||||
"type": "object",
|
||||
"required": ["copilot_prompt", "source_format", "source_name"],
|
||||
"title": "ImportWorkflowResponse",
|
||||
"description": "Response from parsing an external workflow.\n\nReturns a CoPilot prompt that the frontend uses to redirect the user\nto CoPilot, where the agentic agent-generator handles the conversion."
|
||||
},
|
||||
"InputValidationErrorResponse": {
|
||||
"properties": {
|
||||
"type": {
|
||||
|
||||
343
docs/platform/workspace-media-architecture.md
Normal file
343
docs/platform/workspace-media-architecture.md
Normal file
@@ -0,0 +1,343 @@
|
||||
# Workspace & Media File Architecture
|
||||
|
||||
This document describes the architecture for handling user files in AutoGPT Platform, covering persistent user storage (Workspace) and ephemeral media processing pipelines.
|
||||
|
||||
## Overview
|
||||
|
||||
The platform has two distinct file-handling layers:
|
||||
|
||||
| Layer | Purpose | Persistence | Scope |
|
||||
|-------|---------|-------------|-------|
|
||||
| **Workspace** | Long-term user file storage | Persistent (DB + GCS/local) | Per-user, session-scoped access |
|
||||
| **Media Pipeline** | Ephemeral file processing for blocks | Temporary (local disk) | Per-execution |
|
||||
|
||||
## Database Models
|
||||
|
||||
### UserWorkspace
|
||||
|
||||
Represents a user's file storage space. Created on-demand (one per user).
|
||||
|
||||
```prisma
|
||||
model UserWorkspace {
|
||||
id String @id @default(uuid())
|
||||
createdAt DateTime @default(now())
|
||||
updatedAt DateTime @updatedAt
|
||||
userId String @unique
|
||||
Files UserWorkspaceFile[]
|
||||
}
|
||||
```
|
||||
|
||||
**Key points:**
|
||||
- One workspace per user (enforced by `@unique` on `userId`)
|
||||
- Created lazily via `get_or_create_workspace()`
|
||||
- Uses upsert to handle race conditions
|
||||
|
||||
### UserWorkspaceFile
|
||||
|
||||
Represents a file stored in a user's workspace.
|
||||
|
||||
```prisma
|
||||
model UserWorkspaceFile {
|
||||
id String @id @default(uuid())
|
||||
workspaceId String
|
||||
name String // User-visible filename
|
||||
path String // Virtual path (e.g., "/sessions/abc123/image.png")
|
||||
storagePath String // Actual storage path (gcs://... or local://...)
|
||||
mimeType String
|
||||
sizeBytes BigInt
|
||||
checksum String? // SHA256 for integrity
|
||||
isDeleted Boolean @default(false)
|
||||
deletedAt DateTime?
|
||||
metadata Json @default("{}")
|
||||
|
||||
@@unique([workspaceId, path]) // Enforce unique paths within workspace
|
||||
}
|
||||
```
|
||||
|
||||
**Key points:**
|
||||
- `path` is a virtual path for organizing files (not actual filesystem path)
|
||||
- `storagePath` contains the actual GCS or local storage location
|
||||
- Soft-delete pattern: `isDeleted` flag with `deletedAt` timestamp
|
||||
- Path is modified on delete to free up the virtual path for reuse
|
||||
|
||||
---
|
||||
|
||||
## WorkspaceManager
|
||||
|
||||
**Location:** `backend/util/workspace.py`
|
||||
|
||||
High-level API for workspace file operations. Combines storage backend operations with database record management.
|
||||
|
||||
### Initialization
|
||||
|
||||
```python
|
||||
from backend.util.workspace import WorkspaceManager
|
||||
|
||||
# Basic usage
|
||||
manager = WorkspaceManager(user_id="user-123", workspace_id="ws-456")
|
||||
|
||||
# With session scoping (CoPilot sessions)
|
||||
manager = WorkspaceManager(
|
||||
user_id="user-123",
|
||||
workspace_id="ws-456",
|
||||
session_id="session-789"
|
||||
)
|
||||
```
|
||||
|
||||
### Session Scoping
|
||||
|
||||
When `session_id` is provided, files are isolated to `/sessions/{session_id}/`:
|
||||
|
||||
```python
|
||||
# With session_id="abc123":
|
||||
manager.write_file(content, "image.png")
|
||||
# → stored at /sessions/abc123/image.png
|
||||
|
||||
# Cross-session access is explicit:
|
||||
manager.read_file("/sessions/other-session/file.txt") # Works
|
||||
```
|
||||
|
||||
**Why session scoping?**
|
||||
- CoPilot conversations need file isolation
|
||||
- Prevents file collisions between concurrent sessions
|
||||
- Allows session cleanup without affecting other sessions
|
||||
|
||||
### Core Methods
|
||||
|
||||
| Method | Description |
|
||||
|--------|-------------|
|
||||
| `write_file(content, filename, path?, mime_type?, overwrite?)` | Write file to workspace |
|
||||
| `read_file(path)` | Read file by virtual path |
|
||||
| `read_file_by_id(file_id)` | Read file by ID |
|
||||
| `list_files(path?, limit?, offset?, include_all_sessions?)` | List files |
|
||||
| `delete_file(file_id)` | Soft-delete a file |
|
||||
| `get_download_url(file_id, expires_in?)` | Get signed download URL |
|
||||
| `get_file_info(file_id)` | Get file metadata |
|
||||
| `get_file_info_by_path(path)` | Get file metadata by path |
|
||||
| `get_file_count(path?, include_all_sessions?)` | Count files |
|
||||
|
||||
### Storage Backends
|
||||
|
||||
WorkspaceManager delegates to `WorkspaceStorageBackend`:
|
||||
|
||||
| Backend | When Used | Storage Path Format |
|
||||
|---------|-----------|---------------------|
|
||||
| `GCSWorkspaceStorage` | `media_gcs_bucket_name` is configured | `gcs://bucket/workspaces/{ws_id}/{file_id}/{filename}` |
|
||||
| `LocalWorkspaceStorage` | No GCS bucket configured | `local://{ws_id}/{file_id}/{filename}` |
|
||||
|
||||
---
|
||||
|
||||
## store_media_file()
|
||||
|
||||
**Location:** `backend/util/file.py`
|
||||
|
||||
The media normalization pipeline. Handles various input types and normalizes them for processing or output.
|
||||
|
||||
### Purpose
|
||||
|
||||
Blocks receive files in many formats (URLs, data URIs, workspace references, local paths). `store_media_file()` normalizes these to a consistent format based on what the block needs.
|
||||
|
||||
### Input Types Handled
|
||||
|
||||
| Input Format | Example | How It's Processed |
|
||||
|--------------|---------|-------------------|
|
||||
| Data URI | `data:image/png;base64,iVBOR...` | Decoded, virus scanned, written locally |
|
||||
| HTTP(S) URL | `https://example.com/image.png` | Downloaded, virus scanned, written locally |
|
||||
| Workspace URI | `workspace://abc123` or `workspace:///path/to/file` | Read from workspace, virus scanned, written locally |
|
||||
| Cloud path | `gcs://bucket/path` | Downloaded, virus scanned, written locally |
|
||||
| Local path | `image.png` | Verified to exist in exec_file directory |
|
||||
|
||||
### Return Formats
|
||||
|
||||
The `return_format` parameter determines what you get back:
|
||||
|
||||
```python
|
||||
from backend.util.file import store_media_file
|
||||
|
||||
# For local processing (ffmpeg, MoviePy, PIL)
|
||||
local_path = await store_media_file(
|
||||
file=input_file,
|
||||
execution_context=ctx,
|
||||
return_format="for_local_processing"
|
||||
)
|
||||
# Returns: "image.png" (relative path in exec_file dir)
|
||||
|
||||
# For external APIs (Replicate, OpenAI, etc.)
|
||||
data_uri = await store_media_file(
|
||||
file=input_file,
|
||||
execution_context=ctx,
|
||||
return_format="for_external_api"
|
||||
)
|
||||
# Returns: "data:image/png;base64,iVBOR..."
|
||||
|
||||
# For block output (adapts to execution context)
|
||||
output = await store_media_file(
|
||||
file=input_file,
|
||||
execution_context=ctx,
|
||||
return_format="for_block_output"
|
||||
)
|
||||
# In CoPilot: Returns "workspace://file-id#image/png"
|
||||
# In graphs: Returns "data:image/png;base64,..."
|
||||
```
|
||||
|
||||
### Execution Context
|
||||
|
||||
`store_media_file()` requires an `ExecutionContext` with:
|
||||
- `graph_exec_id` - Required for temp file location
|
||||
- `user_id` - Required for workspace access
|
||||
- `workspace_id` - Optional; enables workspace features
|
||||
- `session_id` - Optional; for session scoping in CoPilot
|
||||
|
||||
---
|
||||
|
||||
## Responsibility Boundaries
|
||||
|
||||
### Virus Scanning
|
||||
|
||||
| Component | Scans? | Notes |
|
||||
|-----------|--------|-------|
|
||||
| `store_media_file()` | ✅ Yes | Scans **all** content before writing to local disk |
|
||||
| `WorkspaceManager.write_file()` | ✅ Yes | Scans content before persisting |
|
||||
|
||||
**Scanning happens at:**
|
||||
1. `store_media_file()` — scans everything it downloads/decodes
|
||||
2. `WorkspaceManager.write_file()` — scans before persistence
|
||||
|
||||
Tools like `WriteWorkspaceFileTool` don't need to scan because `WorkspaceManager.write_file()` handles it.
|
||||
|
||||
### Persistence
|
||||
|
||||
| Component | Persists To | Lifecycle |
|
||||
|-----------|-------------|-----------|
|
||||
| `store_media_file()` | Temp dir (`/tmp/exec_file/{exec_id}/`) | Cleaned after execution |
|
||||
| `WorkspaceManager` | GCS or local storage + DB | Persistent until deleted |
|
||||
|
||||
**Automatic cleanup:** `clean_exec_files(graph_exec_id)` removes temp files after execution completes.
|
||||
|
||||
---
|
||||
|
||||
## Decision Tree: WorkspaceManager vs store_media_file
|
||||
|
||||
```text
|
||||
┌─────────────────────────────────────────────────────┐
|
||||
│ What do you need to do with the file? │
|
||||
└─────────────────────────────────────────────────────┘
|
||||
│
|
||||
┌─────────────┴─────────────┐
|
||||
▼ ▼
|
||||
Process in a block Store for user access
|
||||
(ffmpeg, PIL, etc.) (CoPilot files, uploads)
|
||||
│ │
|
||||
▼ ▼
|
||||
store_media_file() WorkspaceManager
|
||||
with appropriate
|
||||
return_format
|
||||
│
|
||||
│
|
||||
┌──────┴──────┐
|
||||
▼ ▼
|
||||
"for_local_ "for_block_
|
||||
processing" output"
|
||||
│ │
|
||||
▼ ▼
|
||||
Get local Auto-saves to
|
||||
path for workspace in
|
||||
tools CoPilot context
|
||||
|
||||
Store for user access
|
||||
│
|
||||
├── write_file() ─── Upload + persist (scans internally)
|
||||
├── read_file() / get_download_url() ─── Retrieve
|
||||
└── list_files() / delete_file() ─── Manage
|
||||
```
|
||||
|
||||
### Quick Reference
|
||||
|
||||
| Scenario | Use |
|
||||
|----------|-----|
|
||||
| Block needs to process a file with ffmpeg | `store_media_file(..., return_format="for_local_processing")` |
|
||||
| Block needs to send file to external API | `store_media_file(..., return_format="for_external_api")` |
|
||||
| Block returning a generated file | `store_media_file(..., return_format="for_block_output")` |
|
||||
| API endpoint handling file upload | `WorkspaceManager.write_file()` (handles virus scanning internally) |
|
||||
| API endpoint serving file download | `WorkspaceManager.get_download_url()` |
|
||||
| Listing user's files | `WorkspaceManager.list_files()` |
|
||||
|
||||
---
|
||||
|
||||
## Key Files Reference
|
||||
|
||||
| File | Purpose |
|
||||
|------|---------|
|
||||
| `backend/data/workspace.py` | Database CRUD operations for UserWorkspace and UserWorkspaceFile |
|
||||
| `backend/util/workspace.py` | `WorkspaceManager` class - high-level workspace API |
|
||||
| `backend/util/workspace_storage.py` | Storage backends (GCS, local) and `WorkspaceStorageBackend` interface |
|
||||
| `backend/util/file.py` | `store_media_file()` and media processing utilities |
|
||||
| `backend/util/virus_scanner.py` | `VirusScannerService` and `scan_content_safe()` |
|
||||
| `schema.prisma` | Database model definitions |
|
||||
|
||||
---
|
||||
|
||||
## Common Patterns
|
||||
|
||||
### Block Processing a User's File
|
||||
|
||||
```python
|
||||
async def run(self, input_data, *, execution_context, **kwargs):
|
||||
# Normalize input to local path
|
||||
local_path = await store_media_file(
|
||||
file=input_data.video,
|
||||
execution_context=execution_context,
|
||||
return_format="for_local_processing",
|
||||
)
|
||||
|
||||
# Process with local tools
|
||||
output_path = process_video(local_path)
|
||||
|
||||
# Return (auto-saves to workspace in CoPilot)
|
||||
result = await store_media_file(
|
||||
file=output_path,
|
||||
execution_context=execution_context,
|
||||
return_format="for_block_output",
|
||||
)
|
||||
yield "output", result
|
||||
```
|
||||
|
||||
### API Upload Endpoint
|
||||
|
||||
```python
|
||||
from backend.util.virus_scanner import VirusDetectedError, VirusScanError
|
||||
|
||||
async def upload_file(file: UploadFile, user_id: str, workspace_id: str):
|
||||
content = await file.read()
|
||||
|
||||
# write_file handles virus scanning internally
|
||||
manager = WorkspaceManager(user_id, workspace_id)
|
||||
try:
|
||||
workspace_file = await manager.write_file(
|
||||
content=content,
|
||||
filename=file.filename,
|
||||
)
|
||||
except VirusDetectedError:
|
||||
raise HTTPException(status_code=400, detail="File rejected: virus detected")
|
||||
except VirusScanError:
|
||||
raise HTTPException(status_code=503, detail="Virus scanning unavailable")
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e))
|
||||
|
||||
return {"file_id": workspace_file.id}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Configuration
|
||||
|
||||
| Setting | Purpose | Default |
|
||||
|---------|---------|---------|
|
||||
| `media_gcs_bucket_name` | GCS bucket for workspace storage | None (uses local) |
|
||||
| `workspace_storage_dir` | Local storage directory | `{app_data}/workspaces` |
|
||||
| `max_file_size_mb` | Maximum file size in MB | 100 |
|
||||
| `clamav_service_enabled` | Enable virus scanning | true |
|
||||
| `clamav_service_host` | ClamAV daemon host | localhost |
|
||||
| `clamav_service_port` | ClamAV daemon port | 3310 |
|
||||
| `clamav_max_concurrency` | Max concurrent scans to ClamAV daemon | 5 |
|
||||
| `clamav_mark_failed_scans_as_clean` | If true, scan failures pass content through instead of rejecting (⚠️ security risk if ClamAV is unreachable) | false |
|
||||
Reference in New Issue
Block a user