feat(platform): add competitor workflow import (n8n, Make.com, Zapier)

Enable one-click import of workflows from competitor platforms into AutoGPT.
Uses a hybrid approach: deterministic parsing extracts structured workflow
descriptions, then the CoPilot LLM pipeline generates equivalent AutoGPT
agent graphs with auto-fixing and validation.

Backend:
- New `copilot/workflow_import/` module with format detection, describers,
  URL fetcher (n8n templates API), and LLM-powered converter
- New `POST /api/import/competitor-workflow` endpoint
- 17 unit tests for format detection and description extraction

Frontend:
- New LibraryImportCompetitorDialog with file upload and URL paste modes
- Integrated into LibraryActionHeader alongside existing Upload button
This commit is contained in:
Zamil Majdy
2026-03-16 21:58:13 +07:00
parent d9c16ded65
commit 758500bc58
14 changed files with 1368 additions and 1 deletions

View File

@@ -0,0 +1,117 @@
"""API endpoint for importing competitor workflows."""
import logging
from typing import Annotated, Any
import pydantic
from autogpt_libs.auth import get_user_id, requires_user
from fastapi import APIRouter, Security
from backend.copilot.workflow_import.converter import convert_competitor_workflow
from backend.copilot.workflow_import.describers import describe_workflow
from backend.copilot.workflow_import.format_detector import (
CompetitorFormat,
detect_format,
)
from backend.copilot.workflow_import.url_fetcher import fetch_n8n_template
logger = logging.getLogger(__name__)
router = APIRouter()
class ImportWorkflowRequest(pydantic.BaseModel):
    """Request body for importing a competitor workflow.

    Exactly one of ``workflow_json`` or ``template_url`` must be provided.
    """

    # Raw workflow export JSON (n8n / Make.com / Zapier).
    workflow_json: dict[str, Any] | None = None
    # URL of a hosted template (n8n templates only for now).
    template_url: str | None = None
    # Whether to persist the converted agent to the user's library.
    save: bool = True

    @pydantic.model_validator(mode="after")
    def check_exactly_one_source(self) -> "ImportWorkflowRequest":
        # Compare against None explicitly: an empty dict ({}) is a *provided*
        # (if invalid) workflow, not a missing one — truthiness checks would
        # silently misclassify it as absent.
        if self.workflow_json is None and self.template_url is None:
            raise ValueError("Provide either 'workflow_json' or 'template_url'")
        if self.workflow_json is not None and self.template_url is not None:
            raise ValueError(
                "Provide only one of 'workflow_json' or 'template_url', not both"
            )
        return self
class ImportWorkflowResponse(pydantic.BaseModel):
    """Response from importing a competitor workflow."""

    # The converted AutoGPT agent graph; always returned, even when saving
    # fails, so the caller can retry saving manually.
    graph: dict[str, Any]
    # Set only when the converted agent was saved successfully.
    graph_id: str | None = None
    library_agent_id: str | None = None
    # Detected source platform value, e.g. "n8n", "make", "zapier".
    source_format: str
    # Name extracted from the original workflow.
    source_name: str
    # Human-readable notes from the conversion (retries, auto-fixes, warnings).
    conversion_notes: list[str] = []
@router.post(
    path="/competitor-workflow",
    summary="Import a competitor workflow (n8n, Make.com, Zapier)",
    tags=["import"],
    dependencies=[Security(requires_user)],
)
async def import_competitor_workflow(
    request: ImportWorkflowRequest,
    user_id: Annotated[str, Security(get_user_id)],
) -> ImportWorkflowResponse:
    """Import a workflow from a competitor platform and convert it to an AutoGPT agent.

    Accepts either raw workflow JSON or a template URL (n8n only for now).
    The workflow is parsed, described, and then converted to an AutoGPT graph
    using LLM-powered block mapping.

    Raises:
        HTTPException: 422 when the payload is missing or not a recognizable
            workflow export (client error, not a server fault).
    """
    from fastapi import HTTPException

    # Step 1: Get the raw workflow JSON
    if request.template_url:
        workflow_json = await fetch_n8n_template(request.template_url)
    else:
        workflow_json = request.workflow_json
        # Guaranteed by the request validator; checked explicitly because
        # `assert` statements are stripped under `python -O`.
        if workflow_json is None:
            raise HTTPException(
                status_code=422, detail="No workflow source provided"
            )
    # Step 2: Detect format. An unrecognized payload is the client's problem,
    # so surface it as a 422 rather than letting a ValueError become a 500.
    fmt = detect_format(workflow_json)
    if fmt == CompetitorFormat.UNKNOWN:
        raise HTTPException(
            status_code=422,
            detail=(
                "Could not detect workflow format. Supported formats: "
                "n8n, Make.com, Zapier. Ensure you're uploading a valid "
                "workflow export file."
            ),
        )
    # Step 3: Describe the workflow (deterministic parsing, no LLM involved)
    desc = describe_workflow(workflow_json, fmt)
    # Step 4: Convert to an AutoGPT agent via the LLM pipeline
    agent_json, conversion_notes = await convert_competitor_workflow(desc, user_id)
    # Step 5: Optionally save. A save failure is reported in the notes rather
    # than failing the request — the converted graph is still usable.
    graph_id = None
    library_agent_id = None
    if request.save:
        from backend.copilot.tools.agent_generator.core import save_agent_to_library

        try:
            created_graph, library_agent = await save_agent_to_library(
                agent_json, user_id
            )
            graph_id = created_graph.id
            library_agent_id = library_agent.id
            conversion_notes.append(f"Agent saved as '{created_graph.name}'")
        except Exception as e:
            logger.error(f"Failed to save imported agent: {e}", exc_info=True)
            conversion_notes.append(
                f"Save failed: {e}. You can try saving manually from the builder."
            )
    return ImportWorkflowResponse(
        graph=agent_json,
        graph_id=graph_id,
        library_agent_id=library_agent_id,
        source_format=fmt.value,
        source_name=desc.name,
        conversion_notes=conversion_notes,
    )

View File

@@ -34,6 +34,7 @@ import backend.api.features.postmark.postmark
import backend.api.features.store.model
import backend.api.features.store.routes
import backend.api.features.v1
import backend.api.features.workflow_import
import backend.api.features.workspace.routes as workspace_routes
import backend.data.block
import backend.data.db
@@ -354,6 +355,11 @@ app.include_router(
tags=["oauth"],
prefix="/api/oauth",
)
app.include_router(
backend.api.features.workflow_import.router,
tags=["v2", "import"],
prefix="/api/import",
)
app.mount("/external-api", external_api)

View File

@@ -0,0 +1,15 @@
"""Competitor workflow import module.
Converts workflows from n8n, Make.com, and Zapier into AutoGPT agent graphs.
"""
from .converter import convert_competitor_workflow
from .format_detector import CompetitorFormat, detect_format
from .models import WorkflowDescription
__all__ = [
"CompetitorFormat",
"WorkflowDescription",
"convert_competitor_workflow",
"detect_format",
]

View File

@@ -0,0 +1,217 @@
"""LLM-powered conversion of competitor workflows to AutoGPT agent graphs.
Uses the CoPilot's LLM client to generate AutoGPT agent JSON from a structured
WorkflowDescription, then validates and fixes via the existing pipeline.
"""
import json
import logging
import pathlib
from typing import Any
from backend.copilot.config import ChatConfig
from backend.copilot.tools.agent_generator.blocks import get_blocks_as_dicts
from backend.copilot.tools.agent_generator.fixer import AgentFixer
from backend.copilot.tools.agent_generator.validator import AgentValidator
from .models import WorkflowDescription
logger = logging.getLogger(__name__)
_AGENT_GUIDE_PATH = (
pathlib.Path(__file__).resolve().parents[1] / "sdk" / "agent_generation_guide.md"
)
_MAX_RETRIES = 1
def _load_agent_guide() -> str:
    """Return the contents of the agent generation guide markdown file."""
    with _AGENT_GUIDE_PATH.open() as guide_file:
        return guide_file.read()
def _build_block_catalog(blocks: list[dict[str, Any]]) -> str:
    """Render the available blocks as a compact markdown list for the prompt."""

    def render(block: dict[str, Any]) -> str:
        # Cap each description at 200 chars to keep the prompt compact.
        summary = (block.get("description") or "")[:200]
        return f"- **{block['name']}** (id: `{block['id']}`): {summary}"

    return "\n".join(render(block) for block in blocks)
def _build_conversion_prompt(
    desc: WorkflowDescription,
    block_catalog: str,
    agent_guide: str,
    error_feedback: str | None = None,
) -> list[dict[str, str]]:
    """Assemble the system/user message pair for the LLM conversion call.

    When ``error_feedback`` is provided (a retry attempt), it is appended to
    the user message so the model can correct its previous output.
    """
    step_lines: list[str] = []
    for step in desc.steps:
        if step.connections_to:
            conns = f" -> connects to steps {step.connections_to}"
        else:
            conns = ""
        if step.parameters:
            # Truncate serialized params so one verbose step can't blow up the prompt.
            params_str = f" (params: {json.dumps(step.parameters, default=str)[:300]})"
        else:
            params_str = ""
        step_lines.append(
            f"  {step.order}. [{step.service}] {step.action}{params_str}{conns}\n"
        )
    steps_text = "".join(step_lines)
    system_msg = f"""You are an expert at converting automation workflows into AutoGPT agent graphs.
Your task: Convert the competitor workflow described below into a valid AutoGPT agent JSON.
## Agent Generation Guide
{agent_guide}
## Available AutoGPT Blocks
{block_catalog}
## Instructions
1. Map each competitor workflow step to the most appropriate AutoGPT block(s)
2. If no exact block match exists, use the closest alternative (e.g., HttpRequestBlock for generic API calls)
3. Every agent MUST have at least one AgentInputBlock and one AgentOutputBlock
4. Wire blocks together with links matching the original workflow's data flow
5. Set meaningful input_default values based on the competitor's parameters
6. Position nodes with 800+ X-unit spacing
7. Return ONLY valid JSON — no markdown fences, no explanation"""
    user_msg = f"""Convert this {desc.source_format.value} workflow to an AutoGPT agent:
**Name**: {desc.name}
**Description**: {desc.description}
**Trigger**: {desc.trigger_type or 'Manual'}
**Steps**:
{steps_text}
Generate the complete AutoGPT agent JSON with nodes and links."""
    if error_feedback:
        user_msg += f"""
IMPORTANT: Your previous attempt had validation errors. Fix them:
{error_feedback}"""
    return [
        {"role": "system", "content": system_msg},
        {"role": "user", "content": user_msg},
    ]
async def convert_competitor_workflow(
    desc: WorkflowDescription,
    user_id: str,
) -> tuple[dict[str, Any], list[str]]:
    """Convert a WorkflowDescription into an AutoGPT agent JSON.

    Runs up to ``_MAX_RETRIES + 1`` LLM attempts; JSON-decode and validation
    failures feed error text back into the next prompt. On the final attempt,
    validation problems are returned as warnings rather than raised.

    Args:
        desc: Structured description of the competitor workflow.
        user_id: The user ID for saving.

    Returns:
        Tuple of (agent_json dict, conversion_notes list).

    Raises:
        ValueError: If conversion fails after retries.
    """
    # Imported lazily so module import does not require langfuse to be configured.
    from langfuse.openai import (
        AsyncOpenAI as LangfuseAsyncOpenAI,  # pyright: ignore[reportPrivateImportUsage]
    )

    config = ChatConfig()
    client = LangfuseAsyncOpenAI(api_key=config.api_key, base_url=config.base_url)
    blocks = get_blocks_as_dicts()
    block_catalog = _build_block_catalog(blocks)
    agent_guide = _load_agent_guide()
    conversion_notes: list[str] = []
    error_feedback: str | None = None
    for attempt in range(_MAX_RETRIES + 1):
        # error_feedback is None on the first pass; later passes include the
        # previous attempt's JSON/validation errors in the prompt.
        messages = _build_conversion_prompt(
            desc, block_catalog, agent_guide, error_feedback
        )
        try:
            response = await client.chat.completions.create(
                model=config.model,
                messages=messages,  # type: ignore[arg-type]
                temperature=0.2,
                max_tokens=8192,
            )
        except Exception as e:
            # Transport/API failures are not retried — only bad model output is.
            raise ValueError(f"LLM call failed: {e}") from e
        raw_content = response.choices[0].message.content or ""
        # Strip markdown fences if present
        content = raw_content.strip()
        if content.startswith("```"):
            lines = content.split("\n")
            # Remove first line (```json) and last line (```)
            lines = [line for line in lines[1:] if line.strip() != "```"]
            content = "\n".join(lines)
        try:
            agent_json = json.loads(content)
        except json.JSONDecodeError as e:
            if attempt < _MAX_RETRIES:
                error_feedback = f"Invalid JSON: {e}"
                conversion_notes.append(
                    f"Retry {attempt + 1}: LLM output was not valid JSON"
                )
                continue
            raise ValueError(
                f"LLM produced invalid JSON after {_MAX_RETRIES + 1} attempts: {e}"
            ) from e
        # Set metadata — setdefault keeps any values the model already supplied.
        agent_json.setdefault("name", desc.name)
        agent_json.setdefault(
            "description",
            f"Imported from {desc.source_format.value}: {desc.description}",
        )
        agent_json.setdefault("version", 1)
        agent_json.setdefault("is_active", True)
        # Auto-fix: best-effort; failures downgrade to a note, never abort.
        try:
            fixer = AgentFixer()
            agent_json = fixer.apply_all_fixes(agent_json, blocks)
            fixes = fixer.get_fixes_applied()
            if fixes:
                conversion_notes.append(f"Applied {len(fixes)} auto-fixes")
                logger.info(f"Applied {len(fixes)} auto-fixes to imported agent")
        except Exception as e:
            logger.warning(f"Auto-fix failed: {e}")
            conversion_notes.append(f"Auto-fix warning: {e}")
        # Validate: retry on errors unless this is the last attempt.
        try:
            validator = AgentValidator()
            is_valid, _ = validator.validate(agent_json, blocks)
            if not is_valid:
                errors = validator.errors
                if attempt < _MAX_RETRIES:
                    error_feedback = "\n".join(f"- {e}" for e in errors[:5])
                    conversion_notes.append(
                        f"Retry {attempt + 1}: validation errors found"
                    )
                    continue
                # On final attempt, return with warnings rather than failing
                conversion_notes.extend(f"Validation warning: {e}" for e in errors[:5])
                conversion_notes.append("Agent may need manual fixes in the builder")
        except Exception as e:
            logger.warning(f"Validation exception: {e}")
            conversion_notes.append(f"Validation could not complete: {e}")
        return agent_json, conversion_notes
    # NOTE(review): every loop iteration either returns, raises, or continues
    # to another iteration, so this line appears unreachable — kept as a
    # defensive guard in case the retry logic above changes.
    raise ValueError("Conversion failed after all retries")

View File

@@ -0,0 +1,256 @@
"""Extract structured WorkflowDescription from competitor workflow JSONs.
Each describer is a pure function that deterministically parses the competitor
format into a platform-agnostic WorkflowDescription. No LLM calls are made here.
"""
from typing import Any
from .models import CompetitorFormat, StepDescription, WorkflowDescription
def describe_workflow(
    json_data: dict[str, Any], fmt: CompetitorFormat
) -> WorkflowDescription:
    """Dispatch to the format-specific describer for ``fmt``.

    Raises:
        ValueError: If no describer exists for the given format.
    """
    if fmt == CompetitorFormat.N8N:
        return describe_n8n_workflow(json_data)
    if fmt == CompetitorFormat.MAKE:
        return describe_make_workflow(json_data)
    if fmt == CompetitorFormat.ZAPIER:
        return describe_zapier_workflow(json_data)
    raise ValueError(f"No describer available for format: {fmt}")
def describe_n8n_workflow(json_data: dict[str, Any]) -> WorkflowDescription:
    """Deterministically summarize an n8n workflow export as a WorkflowDescription."""
    nodes = json_data.get("nodes", [])
    connections = json_data.get("connections", {})
    steps: list[StepDescription] = []
    # n8n wires nodes by name, so keep a name -> index map for edge resolution.
    name_to_index: dict[str, int] = {}
    for position, node in enumerate(nodes):
        label = node.get("name", f"Node {position}")
        name_to_index[label] = position
        kind = node.get("type", "unknown")
        raw_params = node.get("parameters", {})
        steps.append(
            StepDescription(
                order=position,
                action=_describe_n8n_action(kind, label, raw_params),
                service=_extract_n8n_service(kind),
                parameters=_clean_params(raw_params),
                connections_to=[],  # resolved in the pass below
            )
        )
    # Connection shape: {NodeName: {"main": [[{node, type, index}, ...], ...]}}
    for origin_name, outputs in connections.items():
        origin_idx = name_to_index.get(origin_name)
        if origin_idx is None:
            continue
        for output_group in outputs.get("main", []):
            if not isinstance(output_group, list):
                continue
            for edge in output_group:
                dest_idx = name_to_index.get(edge.get("node"))
                if dest_idx is not None:
                    steps[origin_idx].connections_to.append(dest_idx)
    # Heuristic: treat the first node as the trigger when its type says so.
    trigger_type = None
    if nodes:
        first_type = nodes[0].get("type", "")
        lowered = first_type.lower()
        if "trigger" in lowered or "webhook" in lowered:
            trigger_type = _extract_n8n_service(first_type)
    return WorkflowDescription(
        name=json_data.get("name", "Imported n8n Workflow"),
        description=_build_workflow_summary(steps),
        steps=steps,
        trigger_type=trigger_type,
        source_format=CompetitorFormat.N8N,
        raw_json=json_data,
    )
def describe_make_workflow(json_data: dict[str, Any]) -> WorkflowDescription:
    """Extract a structured description from a Make.com scenario blueprint.

    The main ``flow`` is treated as a sequential chain. Router modules may
    carry nested ``routes``; each route's flow is flattened after the main
    flow as additional sequential steps, with the router step connecting to
    the first step of every route.
    """
    flow = json_data.get("flow", [])
    steps: list[StepDescription] = []
    # (router step index, nested route flow) pairs, flattened after the main pass
    pending_routes: list[tuple[int, list[dict[str, Any]]]] = []
    for i, module in enumerate(flow):
        # Make.com flows are sequential by default; each step connects to next
        connections_to = [i + 1] if i < len(flow) - 1 else []
        for route in module.get("routes", []):
            route_flow = route.get("flow", [])
            if route_flow:
                pending_routes.append((i, route_flow))
        steps.append(_describe_make_module(module, i, connections_to))
    # Flatten nested route flows as real steps. (The previous implementation
    # computed target indices for routes but never created the corresponding
    # steps, leaving dangling connections_to entries.)
    for router_idx, route_flow in pending_routes:
        start = len(steps)
        steps[router_idx].connections_to.append(start)
        for j, module in enumerate(route_flow):
            connections_to = [start + j + 1] if j < len(route_flow) - 1 else []
            steps.append(_describe_make_module(module, start + j, connections_to))
    # Detect trigger from the first module's name
    trigger_type = None
    if flow:
        first_module = flow[0].get("module", "")
        if "watch" in first_module.lower() or "trigger" in first_module.lower():
            trigger_type = first_module.split(":")[0].replace("-", " ").title()
    return WorkflowDescription(
        name=json_data.get("name", "Imported Make.com Scenario"),
        description=_build_workflow_summary(steps),
        steps=steps,
        trigger_type=trigger_type,
        source_format=CompetitorFormat.MAKE,
        raw_json=json_data,
    )


def _describe_make_module(
    module: dict[str, Any], order: int, connections_to: list[int]
) -> StepDescription:
    """Build a StepDescription for a single Make.com module.

    The module reference is ``service:action`` (e.g. "slack:sendMessage");
    parameters come from ``mapper`` with ``parameters`` as a fallback.
    """
    module_ref = module.get("module", "unknown:unknown")
    parts = module_ref.split(":", 1)
    service = parts[0].replace("-", " ").title() if parts else "Unknown"
    action_verb = parts[1] if len(parts) > 1 else "process"
    params = module.get("mapper", module.get("parameters", {}))
    return StepDescription(
        order=order,
        action=f"{action_verb.replace(':', ' ').title()} via {service}",
        service=service,
        parameters=_clean_params(params) if isinstance(params, dict) else {},
        connections_to=connections_to,
    )
def describe_zapier_workflow(json_data: dict[str, Any]) -> WorkflowDescription:
    """Deterministically summarize a Zapier Zap JSON as a WorkflowDescription."""
    zap_steps = json_data.get("steps", [])
    last = len(zap_steps) - 1
    steps: list[StepDescription] = []
    for i, raw_step in enumerate(zap_steps):
        app = raw_step.get("app", "Unknown")
        action = raw_step.get("action", "process")
        params = raw_step.get("params", raw_step.get("inputFields", {}))
        steps.append(
            StepDescription(
                order=i,
                action=f"{action.replace('_', ' ').title()} via {app}",
                service=app,
                parameters=_clean_params(params) if isinstance(params, dict) else {},
                # Zaps are strictly linear: each step feeds the next one.
                connections_to=[i + 1] if i < last else [],
            )
        )
    # The first step's app is reported as the trigger, if any steps exist.
    trigger_type = zap_steps[0].get("app") if zap_steps else None
    return WorkflowDescription(
        name=json_data.get("name", json_data.get("title", "Imported Zapier Zap")),
        description=_build_workflow_summary(steps),
        steps=steps,
        trigger_type=trigger_type,
        source_format=CompetitorFormat.ZAPIER,
        raw_json=json_data,
    )
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _extract_n8n_service(node_type: str) -> str:
    """Turn an n8n node type string into a human-readable service name.

    Examples:
        "n8n-nodes-base.gmail" -> "Gmail"
        "@n8n/n8n-nodes-langchain.agent" -> "Langchain Agent"
        "n8n-nodes-base.httpRequest" -> "Http Request"
    """
    import re

    # Drop the first known package prefix, if any.
    stripped = node_type
    for prefix in ("n8n-nodes-base.", "@n8n/n8n-nodes-langchain.", "@n8n/"):
        if stripped.startswith(prefix):
            stripped = stripped[len(prefix) :]
            break
    # Insert a space at each lower->upper camelCase boundary, then normalize
    # the remaining separators and title-case the result.
    spaced = re.sub(r"([a-z])([A-Z])", r"\1 \2", stripped)
    return spaced.replace(".", " ").replace("-", " ").title()
def _describe_n8n_action(node_type: str, node_name: str, params: dict[str, Any]) -> str:
    """Compose a short human-readable action phrase for an n8n node.

    Prefers "Operation Resource via Service" when the node declares an
    operation; falls back to the node's display name otherwise.
    """
    service = _extract_n8n_service(node_type)
    resource = params.get("resource", "")
    operation = params.get("operation", "")
    if operation:
        target = f" {resource}" if resource else ""
        return f"{operation.title()}{target} via {service}"
    return f"{node_name} ({service})"
def _clean_params(params: dict[str, Any], max_keys: int = 10) -> dict[str, Any]:
    """Extract up to ``max_keys`` presentable parameters.

    Skips internal/credential keys, truncates long strings, and keeps only
    scalar values and short lists. Unlike a naive slice of the first N items,
    skipped keys do not consume the ``max_keys`` budget, so useful keys that
    appear after internal ones are still kept.

    Args:
        params: Raw parameter mapping from the competitor workflow.
        max_keys: Maximum number of parameters to keep.

    Returns:
        A new dict of cleaned parameters.
    """
    cleaned: dict[str, Any] = {}
    for key, value in params.items():
        if len(cleaned) >= max_keys:
            break
        if key.startswith("_") or key in ("credentials", "webhookId"):
            continue
        if isinstance(value, str) and len(value) > 500:
            cleaned[key] = value[:500] + "..."
        elif isinstance(value, (str, int, float, bool)):
            cleaned[key] = value
        elif isinstance(value, list) and len(value) <= 5:
            cleaned[key] = value
        # dicts and long lists are intentionally dropped to keep prompts small
    return cleaned
def _build_workflow_summary(steps: list[StepDescription]) -> str:
    """Produce a one-line human-readable summary of the workflow."""
    if not steps:
        return "Empty workflow"
    # Order-preserving de-duplication of the services involved.
    services = list(dict.fromkeys(step.service for step in steps))
    service_chain = " -> ".join(services[:6])
    if len(services) > 6:
        service_chain += f" (and {len(services) - 6} more)"
    return f"Workflow with {len(steps)} steps: {service_chain}"

View File

@@ -0,0 +1,135 @@
"""Tests for describers.py."""
import pytest
from .describers import (
describe_make_workflow,
describe_n8n_workflow,
describe_workflow,
describe_zapier_workflow,
)
from .models import CompetitorFormat
class TestDescribeN8nWorkflow:
    """Describer tests for the n8n export shape (nodes list + connections map)."""

    def test_basic_workflow(self):
        # Two nodes wired Webhook -> Send Email via the name-keyed connections map.
        data = {
            "name": "Email on Webhook",
            "nodes": [
                {
                    "name": "Webhook",
                    "type": "n8n-nodes-base.webhookTrigger",
                    "parameters": {"path": "/incoming"},
                },
                {
                    "name": "Send Email",
                    "type": "n8n-nodes-base.gmail",
                    "parameters": {"resource": "message", "operation": "send"},
                },
            ],
            "connections": {
                "Webhook": {
                    "main": [[{"node": "Send Email", "type": "main", "index": 0}]]
                }
            },
        }
        desc = describe_n8n_workflow(data)
        assert desc.name == "Email on Webhook"
        assert desc.source_format == CompetitorFormat.N8N
        assert len(desc.steps) == 2
        # Name-based connections resolve to step indices.
        assert desc.steps[0].connections_to == [1]
        assert desc.steps[1].connections_to == []
        # "trigger" in the first node's type marks it as the workflow trigger.
        assert desc.trigger_type is not None

    def test_step_extraction(self):
        data = {
            "name": "Test",
            "nodes": [
                {
                    "name": "HTTP",
                    "type": "n8n-nodes-base.httpRequest",
                    "parameters": {"url": "https://example.com", "method": "GET"},
                },
            ],
            "connections": {},
        }
        desc = describe_n8n_workflow(data)
        step = desc.steps[0]
        # camelCase node type becomes a spaced Title Case service name.
        assert step.service == "Http Request"
        assert step.order == 0
        # Scalar parameters survive cleaning.
        assert "url" in step.parameters

    def test_empty_nodes(self):
        data = {"name": "Empty", "nodes": [], "connections": {}}
        desc = describe_n8n_workflow(data)
        assert len(desc.steps) == 0
        assert desc.trigger_type is None
class TestDescribeMakeWorkflow:
    """Describer tests for the Make.com blueprint shape (flow of modules)."""

    def test_basic_scenario(self):
        blueprint = {
            "name": "Sheets to Calendar",
            "flow": [
                {
                    "module": "google-sheets:watchUpdatedCells",
                    "mapper": {"spreadsheetId": "abc"},
                },
                {
                    "module": "google-calendar:createAnEvent",
                    "mapper": {"title": "Meeting"},
                },
            ],
        }
        result = describe_make_workflow(blueprint)
        assert result.name == "Sheets to Calendar"
        assert result.source_format == CompetitorFormat.MAKE
        assert len(result.steps) == 2
        # Sequential: step 0 connects to step 1; the last step is terminal.
        assert result.steps[0].connections_to == [1]
        assert result.steps[1].connections_to == []
        assert result.trigger_type is not None  # "watch" in module name

    def test_service_extraction(self):
        blueprint = {
            "flow": [{"module": "slack:sendMessage", "mapper": {"text": "hello"}}],
        }
        result = describe_make_workflow(blueprint)
        # The service part of "service:action" is title-cased.
        assert result.steps[0].service == "Slack"
class TestDescribeZapierWorkflow:
    """Describer tests for the Zapier Zap shape (linear steps list)."""

    def test_basic_zap(self):
        zap = {
            "name": "Gmail to Slack",
            "steps": [
                {"app": "Gmail", "action": "new_email"},
                {
                    "app": "Slack",
                    "action": "send_message",
                    "params": {"channel": "#alerts"},
                },
            ],
        }
        result = describe_zapier_workflow(zap)
        assert result.name == "Gmail to Slack"
        assert result.source_format == CompetitorFormat.ZAPIER
        assert len(result.steps) == 2
        # Zaps are linear: the trigger step feeds the action step.
        assert result.steps[0].connections_to == [1]
        # The first step's app is reported as the trigger.
        assert result.trigger_type == "Gmail"
class TestDescribeWorkflowRouter:
    """Tests for the describe_workflow dispatch function."""

    def test_routes_to_n8n(self):
        payload = {
            "nodes": [
                {"name": "N", "type": "n8n-nodes-base.webhook", "parameters": {}}
            ],
            "connections": {},
        }
        result = describe_workflow(payload, CompetitorFormat.N8N)
        assert result.source_format == CompetitorFormat.N8N

    def test_unknown_raises(self):
        # UNKNOWN has no registered describer, so dispatch must fail loudly.
        with pytest.raises(ValueError, match="No describer"):
            describe_workflow({}, CompetitorFormat.UNKNOWN)

View File

@@ -0,0 +1,70 @@
"""Detect the source platform of a competitor workflow JSON."""
import re
from typing import Any
from .models import CompetitorFormat
def detect_format(json_data: dict[str, Any]) -> CompetitorFormat:
    """Inspect a workflow JSON and determine which platform it came from.

    Args:
        json_data: The parsed JSON data from a competitor workflow file.

    Returns:
        The detected CompetitorFormat, or CompetitorFormat.UNKNOWN when no
        heuristic matches.
    """
    # Heuristics are checked in order; the first match wins.
    checks = (
        (_is_n8n, CompetitorFormat.N8N),
        (_is_make, CompetitorFormat.MAKE),
        (_is_zapier, CompetitorFormat.ZAPIER),
    )
    for predicate, detected in checks:
        if predicate(json_data):
            return detected
    return CompetitorFormat.UNKNOWN
def _is_n8n(data: dict[str, Any]) -> bool:
    """n8n workflows have a `nodes` array with items containing `type` fields
    matching patterns like `n8n-nodes-base.*` or `@n8n/*`, plus a `connections`
    object."""
    nodes = data.get("nodes")
    if not isinstance(nodes, list) or not nodes:
        return False
    if not isinstance(data.get("connections"), dict):
        return False
    # A single n8n-style node type is enough to claim the format.
    pattern = re.compile(r"^(n8n-nodes-base\.|@n8n/)")
    for node in nodes:
        if not isinstance(node, dict):
            continue
        node_type = node.get("type")
        if isinstance(node_type, str) and pattern.match(node_type):
            return True
    return False
def _is_make(data: dict[str, Any]) -> bool:
    """Make.com scenarios have a `flow` array with items containing `module`
    fields in `service:action` URI format."""
    flow = data.get("flow")
    if not isinstance(flow, list) or not flow:
        return False
    # One module in service:action form is enough to claim the format.
    for item in flow:
        if not isinstance(item, dict):
            continue
        module = item.get("module")
        if isinstance(module, str) and ":" in module:
            return True
    return False
def _is_zapier(data: dict[str, Any]) -> bool:
    """Zapier Zaps have a `steps` array with items containing `app` and
    `action` fields."""
    steps = data.get("steps")
    if not isinstance(steps, list) or not steps:
        return False
    # One step carrying both keys is enough to claim the format.
    for step in steps:
        if isinstance(step, dict) and "app" in step and "action" in step:
            return True
    return False

View File

@@ -0,0 +1,101 @@
"""Tests for format_detector.py."""
from .format_detector import detect_format
from .models import CompetitorFormat
class TestDetectFormat:
    """Format-detection tests, including negative cases that guard against
    misclassifying similar-looking JSON (e.g. AutoGPT graphs also have `nodes`)."""

    def test_n8n_workflow(self):
        data = {
            "name": "My n8n Workflow",
            "nodes": [
                {
                    "name": "Webhook",
                    "type": "n8n-nodes-base.webhook",
                    "parameters": {"path": "/hook"},
                },
                {
                    "name": "HTTP Request",
                    "type": "n8n-nodes-base.httpRequest",
                    "parameters": {"url": "https://api.example.com"},
                },
            ],
            "connections": {
                "Webhook": {
                    "main": [[{"node": "HTTP Request", "type": "main", "index": 0}]]
                }
            },
        }
        assert detect_format(data) == CompetitorFormat.N8N

    def test_n8n_langchain_nodes(self):
        # The "@n8n/" scoped-package prefix also counts as an n8n node type.
        data = {
            "nodes": [
                {
                    "name": "Agent",
                    "type": "@n8n/n8n-nodes-langchain.agent",
                    "parameters": {},
                },
            ],
            "connections": {},
        }
        assert detect_format(data) == CompetitorFormat.N8N

    def test_make_scenario(self):
        data = {
            "name": "My Make Scenario",
            "flow": [
                {
                    "module": "google-sheets:watchUpdatedCells",
                    "mapper": {"spreadsheetId": "123"},
                },
                {
                    "module": "google-calendar:createAnEvent",
                    "mapper": {"title": "Test"},
                },
            ],
        }
        assert detect_format(data) == CompetitorFormat.MAKE

    def test_zapier_zap(self):
        data = {
            "name": "My Zap",
            "steps": [
                {"app": "gmail", "action": "new_email"},
                {
                    "app": "slack",
                    "action": "send_message",
                    "params": {"channel": "#general"},
                },
            ],
        }
        assert detect_format(data) == CompetitorFormat.ZAPIER

    def test_unknown_format(self):
        # An empty `nodes` list must not be enough to claim n8n.
        data = {"foo": "bar", "nodes": []}
        assert detect_format(data) == CompetitorFormat.UNKNOWN

    def test_empty_dict(self):
        assert detect_format({}) == CompetitorFormat.UNKNOWN

    def test_autogpt_graph_not_detected_as_n8n(self):
        """AutoGPT graphs have nodes but not n8n-style types."""
        data = {
            "nodes": [
                {"id": "abc", "block_id": "some-uuid", "input_default": {}},
            ],
            "connections": {},
        }
        assert detect_format(data) == CompetitorFormat.UNKNOWN

    def test_make_without_colon_not_detected(self):
        # Module refs must be in `service:action` form to count as Make.com.
        data = {
            "flow": [{"module": "simplemodule", "mapper": {}}],
        }
        assert detect_format(data) == CompetitorFormat.UNKNOWN

    def test_zapier_without_action_not_detected(self):
        # Steps need both `app` and `action` keys to count as a Zap.
        data = {
            "steps": [{"app": "gmail"}],
        }
        assert detect_format(data) == CompetitorFormat.UNKNOWN

View File

@@ -0,0 +1,34 @@
"""Data models for competitor workflow import."""
from enum import Enum
from typing import Any
import pydantic
class CompetitorFormat(str, Enum):
    """Supported competitor workflow platforms (str-valued so the enum
    serializes directly in JSON responses)."""

    N8N = "n8n"
    MAKE = "make"
    ZAPIER = "zapier"
    # Returned by detect_format when no platform heuristic matches.
    UNKNOWN = "unknown"
class StepDescription(pydantic.BaseModel):
    """A single step/node extracted from a competitor workflow."""

    # Zero-based position of the step in the flattened workflow.
    order: int
    # Human-readable action phrase, e.g. "Send Message via Slack".
    action: str
    # Service/app the step belongs to, e.g. "Gmail".
    service: str
    # Cleaned, truncated subset of the step's original parameters.
    parameters: dict[str, Any] = {}
    # Indices (into WorkflowDescription.steps) of downstream steps.
    connections_to: list[int] = []
class WorkflowDescription(pydantic.BaseModel):
    """Structured, platform-agnostic description of a competitor workflow."""

    # Workflow name taken from the export (or a per-platform default).
    name: str
    # One-line generated summary of the steps/services involved.
    description: str
    steps: list[StepDescription]
    # Service name of the triggering step, when one was detected.
    trigger_type: str | None = None
    source_format: CompetitorFormat
    # Original export JSON, retained verbatim for debugging/reference.
    raw_json: dict[str, Any] = {}

View File

@@ -0,0 +1,63 @@
"""Fetch competitor workflow templates by URL."""
import logging
import re
from typing import Any
from backend.util.request import Requests
logger = logging.getLogger(__name__)
# Patterns for extracting template IDs from n8n URLs
_N8N_WORKFLOW_URL_RE = re.compile(
r"https?://(?:www\.)?n8n\.io/workflows/(\d+)", re.IGNORECASE
)
_N8N_TEMPLATES_API = "https://api.n8n.io/api/templates/workflows/{id}"
async def fetch_n8n_template(url: str) -> dict[str, Any]:
    """Fetch an n8n workflow template by its URL.

    Supports URLs like:
        - https://n8n.io/workflows/1234
        - https://n8n.io/workflows/1234-some-slug

    Args:
        url: The n8n template URL.

    Returns:
        The n8n workflow JSON.

    Raises:
        ValueError: If the URL is not a valid n8n template URL.
        RuntimeError: If the fetch fails.
    """
    url_match = _N8N_WORKFLOW_URL_RE.match(url.strip())
    if url_match is None:
        raise ValueError(
            "Not a valid n8n workflow URL. Expected format: "
            "https://n8n.io/workflows/<id>"
        )
    template_id = url_match.group(1)
    client = Requests(raise_for_status=True)
    try:
        response = await client.get(_N8N_TEMPLATES_API.format(id=template_id))
        payload = response.json()
    except Exception as e:
        raise RuntimeError(f"Failed to fetch n8n template {template_id}: {e}") from e
    # n8n API wraps the workflow in a `workflow` key; fall back to the payload
    # itself for unwrapped responses.
    workflow = payload.get("workflow", payload)
    if not isinstance(workflow, dict):
        raise RuntimeError(
            f"Unexpected response format from n8n API for template {template_id}"
        )
    # Carry the template's display name over when the workflow lacks one.
    if "name" not in workflow and "name" in payload:
        workflow["name"] = payload["name"]
    return workflow

View File

@@ -1,3 +1,4 @@
import LibraryImportCompetitorDialog from "../LibraryImportCompetitorDialog/LibraryImportCompetitorDialog";
import { LibrarySearchBar } from "../LibrarySearchBar/LibrarySearchBar";
import LibraryUploadAgentDialog from "../LibraryUploadAgentDialog/LibraryUploadAgentDialog";
@@ -11,12 +12,14 @@ export function LibraryActionHeader({ setSearchTerm }: Props) {
<div className="mb-[32px] hidden items-center justify-center gap-4 md:flex">
<LibrarySearchBar setSearchTerm={setSearchTerm} />
<LibraryUploadAgentDialog />
<LibraryImportCompetitorDialog />
</div>
{/* Mobile and tablet */}
<div className="flex flex-col gap-4 p-4 pt-[52px] md:hidden">
<div className="flex w-full justify-between">
<div className="flex w-full justify-between gap-2">
<LibraryUploadAgentDialog />
<LibraryImportCompetitorDialog />
</div>
<div className="flex items-center justify-center">

View File

@@ -0,0 +1,156 @@
"use client";
import { Button } from "@/components/atoms/Button/Button";
import { FileInput } from "@/components/atoms/FileInput/FileInput";
import { Input } from "@/components/atoms/Input/Input";
import { LoadingSpinner } from "@/components/atoms/LoadingSpinner/LoadingSpinner";
import { Dialog } from "@/components/molecules/Dialog/Dialog";
import {
Form,
FormControl,
FormField,
FormItem,
FormMessage,
} from "@/components/molecules/Form/Form";
import { ArrowsClockwiseIcon } from "@phosphor-icons/react";
import { z } from "zod";
import { useLibraryImportCompetitorDialog } from "./useLibraryImportCompetitorDialog";
// Both fields are plain optional strings; which one is actually required is
// decided at submit time by the active import mode (file upload vs. URL).
export const importCompetitorFormSchema = z.object({
  workflowFile: z.string(),
  templateUrl: z.string(),
});
/**
 * Dialog for importing a competitor workflow (n8n, Make.com, Zapier) into the
 * user's library. Offers two input modes — uploading an exported JSON file or
 * pasting an n8n template URL — and shows a spinner while the backend converts
 * the workflow. Submission/conversion state comes from
 * useLibraryImportCompetitorDialog.
 */
export default function LibraryImportCompetitorDialog() {
  const {
    onSubmit,
    isConverting,
    isOpen,
    setIsOpen,
    form,
    importMode,
    setImportMode,
  } = useLibraryImportCompetitorDialog();

  // Enable the submit button only once the active mode's field has a value.
  const hasInput =
    importMode === "url"
      ? !!form.watch("templateUrl")
      : !!form.watch("workflowFile");

  return (
    <Dialog
      title="Import Competitor Workflow"
      styling={{ maxWidth: "32rem" }}
      controlled={{
        isOpen,
        set: setIsOpen,
      }}
      onClose={() => {
        // Discard any partially entered values when the dialog is dismissed.
        setIsOpen(false);
        form.reset();
      }}
    >
      <Dialog.Trigger>
        <Button
          data-testid="import-competitor-button"
          variant="primary"
          className="h-[2.78rem] w-full md:w-[14rem]"
          size="small"
        >
          <ArrowsClockwiseIcon width={18} height={18} />
          <span>Import workflow</span>
        </Button>
      </Dialog.Trigger>
      <Dialog.Content>
        {/* Mode toggle */}
        <div className="mb-4 flex gap-2">
          <Button
            variant={importMode === "file" ? "primary" : "outline"}
            size="small"
            onClick={() => setImportMode("file")}
            type="button"
          >
            Upload file
          </Button>
          <Button
            variant={importMode === "url" ? "primary" : "outline"}
            size="small"
            onClick={() => setImportMode("url")}
            type="button"
          >
            Paste URL
          </Button>
        </div>
        <p className="mb-4 text-sm text-neutral-500">
          Import workflows from n8n, Make.com, or Zapier. The workflow will be
          automatically converted to an AutoGPT agent.
        </p>
        <Form
          form={form}
          onSubmit={onSubmit}
          className="flex flex-col justify-center gap-0 px-1"
        >
          {/* Only the active mode's field is rendered; the other keeps its
              (empty) form value until reset. */}
          {importMode === "file" ? (
            <FormField
              control={form.control}
              name="workflowFile"
              render={({ field }) => (
                <FormItem>
                  <FormControl>
                    <FileInput
                      mode="base64"
                      value={field.value}
                      onChange={field.onChange}
                      accept=".json,application/json"
                      placeholder="Workflow JSON file (n8n, Make.com, or Zapier export)"
                      maxFileSize={10 * 1024 * 1024}
                      showStorageNote={false}
                      className="mb-4 mt-2"
                    />
                  </FormControl>
                  <FormMessage />
                </FormItem>
              )}
            />
          ) : (
            <FormField
              control={form.control}
              name="templateUrl"
              render={({ field }) => (
                <FormItem>
                  <FormControl>
                    <Input
                      {...field}
                      id={field.name}
                      label="n8n template URL"
                      placeholder="https://n8n.io/workflows/1234"
                      className="mb-4 mt-2 w-full rounded-[10px]"
                    />
                  </FormControl>
                  <FormMessage />
                </FormItem>
              )}
            />
          )}
          <Button
            type="submit"
            variant="primary"
            className="min-w-[18rem]"
            disabled={!hasInput || isConverting}
          >
            {isConverting ? (
              <div className="flex items-center gap-2">
                <LoadingSpinner size="small" className="text-white" />
                <span>Converting workflow...</span>
              </div>
            ) : (
              "Import & Convert"
            )}
          </Button>
        </Form>
      </Dialog.Content>
    </Dialog>
  );
}

View File

@@ -0,0 +1,106 @@
import { useToast } from "@/components/molecules/Toast/use-toast";
import { zodResolver } from "@hookform/resolvers/zod";
import { useState } from "react";
import { useForm } from "react-hook-form";
import { z } from "zod";
import { importCompetitorFormSchema } from "./LibraryImportCompetitorDialog";
export function useLibraryImportCompetitorDialog() {
  /**
   * State and submit handler for the competitor-workflow import dialog.
   *
   * Supports two input modes:
   *  - "file": a base64 data-URL produced by FileInput (mode="base64"),
   *    decoded and parsed as workflow JSON before upload.
   *  - "url":  an n8n template URL forwarded to the backend as-is.
   *
   * On success the dialog is closed, the form reset, a toast shown, and the
   * browser navigated to the builder for the newly created graph.
   */
  const [isOpen, setIsOpen] = useState(false);
  const { toast } = useToast();
  const [isConverting, setIsConverting] = useState(false);
  const [importMode, setImportMode] = useState<"file" | "url">("file");

  const form = useForm<z.infer<typeof importCompetitorFormSchema>>({
    resolver: zodResolver(importCompetitorFormSchema),
    defaultValues: {
      workflowFile: "",
      templateUrl: "",
    },
  });

  const onSubmit = async (
    values: z.infer<typeof importCompetitorFormSchema>,
  ) => {
    setIsConverting(true);
    try {
      let body: Record<string, unknown>;
      if (importMode === "url" && values.templateUrl) {
        body = { template_url: values.templateUrl, save: true };
      } else if (importMode === "file" && values.workflowFile) {
        // FileInput (mode="base64") yields a data URL; extract the payload.
        const base64Match = values.workflowFile.match(
          /^data:[^;]+;base64,(.+)$/,
        );
        if (!base64Match) {
          throw new Error("Invalid file format");
        }
        let workflowJson: unknown;
        try {
          workflowJson = JSON.parse(atob(base64Match[1]));
        } catch {
          // Surface a clearer message than JSON.parse's raw "Unexpected token".
          throw new Error("The selected file is not valid JSON");
        }
        body = { workflow_json: workflowJson, save: true };
      } else {
        throw new Error("Please provide a workflow file or template URL");
      }

      const response = await fetch("/api/import/competitor-workflow", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify(body),
      });
      if (!response.ok) {
        // Prefer the backend's `detail` message; fall back to the status code.
        const errorData = await response.json().catch(() => null);
        throw new Error(
          errorData?.detail || `Import failed (${response.status})`,
        );
      }

      const data = await response.json();
      setIsOpen(false);
      form.reset();

      const notes: string[] = data.conversion_notes || [];
      // Case-insensitive so "warning", "Warning", and "WARNING" all match.
      const hasWarnings = notes.some((n) =>
        n.toLowerCase().includes("warning"),
      );
      toast({
        title: "Workflow Imported",
        description: hasWarnings
          ? `Imported from ${data.source_format} with warnings. Check the builder for details.`
          : `Successfully imported "${data.source_name}" from ${data.source_format}`,
        // NOTE(review): the original ternary resolved to "default" on both
        // branches; if the toast system gains a warning variant, use it here.
        variant: "default",
      });

      if (data.graph_id) {
        // Jump straight into the builder for the newly imported graph.
        window.location.href = `/build?flowID=${data.graph_id}`;
      }
    } catch (error) {
      console.error("Import failed:", error);
      toast({
        title: "Import Failed",
        description:
          error instanceof Error
            ? error.message
            : "Failed to import workflow. Please check the file format.",
        variant: "destructive",
        duration: 5000,
      });
    } finally {
      setIsConverting(false);
    }
  };

  return {
    onSubmit,
    isConverting,
    isOpen,
    setIsOpen,
    form,
    importMode,
    setImportMode,
  };
}

View File

@@ -2920,6 +2920,46 @@
}
}
},
"/api/import/competitor-workflow": {
"post": {
"tags": ["v2", "import", "import"],
"summary": "Import a competitor workflow (n8n, Make.com, Zapier)",
"description": "Import a workflow from a competitor platform and convert it to an AutoGPT agent.\n\nAccepts either raw workflow JSON or a template URL (n8n only for now).\nThe workflow is parsed, described, and then converted to an AutoGPT graph\nusing LLM-powered block mapping.",
"operationId": "postV2Import a competitor workflow (n8n, make.com, zapier)",
"requestBody": {
"content": {
"application/json": {
"schema": { "$ref": "#/components/schemas/ImportWorkflowRequest" }
}
},
"required": true
},
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/ImportWorkflowResponse"
}
}
}
},
"401": {
"$ref": "#/components/responses/HTTP401NotAuthenticatedError"
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": { "$ref": "#/components/schemas/HTTPValidationError" }
}
}
}
},
"security": [{ "HTTPBearerJWT": [] }]
}
},
"/api/integrations/ayrshare/sso_url": {
"get": {
"tags": ["v1", "integrations"],
@@ -9971,6 +10011,54 @@
"required": ["image_url"],
"title": "ImageURLResponse"
},
"ImportWorkflowRequest": {
"properties": {
"workflow_json": {
"anyOf": [
{ "additionalProperties": true, "type": "object" },
{ "type": "null" }
],
"title": "Workflow Json"
},
"template_url": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Template Url"
},
"save": { "type": "boolean", "title": "Save", "default": true }
},
"type": "object",
"title": "ImportWorkflowRequest",
"description": "Request body for importing a competitor workflow."
},
"ImportWorkflowResponse": {
"properties": {
"graph": {
"additionalProperties": true,
"type": "object",
"title": "Graph"
},
"graph_id": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Graph Id"
},
"library_agent_id": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Library Agent Id"
},
"source_format": { "type": "string", "title": "Source Format" },
"source_name": { "type": "string", "title": "Source Name" },
"conversion_notes": {
"items": { "type": "string" },
"type": "array",
"title": "Conversion Notes",
"default": []
}
},
"type": "object",
"required": ["graph", "source_format", "source_name"],
"title": "ImportWorkflowResponse",
"description": "Response from importing a competitor workflow."
},
"InputValidationErrorResponse": {
"properties": {
"type": {