Compare commits

..

27 Commits

Author SHA1 Message Date
Nicholas Tindle
6d6d3b820e feat(video): add model_id to VideoNarrationBlock for enhanced TTS model selection 2026-01-23 14:20:04 -06:00
Nicholas Tindle
8b5c018032 feat(video): add video codec utility and update video processing blocks for codec handling 2026-01-23 13:52:11 -06:00
Nicholas Tindle
b5611b00b3 feat(video): update video processing blocks and documentation for enhanced functionality 2026-01-23 13:27:34 -06:00
Nicholas Tindle
6cd62c4d50 Merge branch 'dev' into feature/video-editing-blocks 2026-01-23 12:39:34 -06:00
Nicholas Tindle
9f4c33a695 feat(video): refactor video storage methods for improved testability across blocks 2026-01-23 12:36:28 -06:00
Nicholas Tindle
b0debe9488 Merge branch 'feature/video-editing-blocks' of https://github.com/Significant-Gravitas/AutoGPT into feature/video-editing-blocks 2026-01-23 12:16:34 -06:00
Nicholas Tindle
b20767bde9 feat(blocks): add ElevenLabs integration and enhance video processing blocks with media file handling 2026-01-23 12:15:59 -06:00
claude[bot]
b9a9481381 chore(backend): regenerate poetry.lock file
Co-authored-by: Nicholas Tindle <ntindle@users.noreply.github.com>
2026-01-23 01:43:25 +00:00
Nicholas Tindle
d2d2a0c0c9 feat(backend): integrate ElevenLabs for video narration and add cost configuration
- Implemented ElevenLabs API integration for generating AI narration in videos.
- Updated VideoNarrationBlock to handle audio generation and mixing with video.
- Added ElevenLabs credentials to the credentials store.
- Configured block costs for using ElevenLabs TTS.
- Enhanced video processing blocks (concat, download, text overlay) for improved functionality.
- Updated dependencies in poetry.lock for ElevenLabs SDK and yt-dlp.
- Added provider icon for ElevenLabs in frontend credentials input.
2026-01-22 19:26:39 -06:00
Nicholas Tindle
521f69220d feat(blocks): export all 8 video blocks from module
Includes migrated blocks from media.py:
- MediaDurationBlock
- LoopVideoBlock  
- AddAudioToVideoBlock
2026-01-22 13:55:22 -06:00
Nicholas Tindle
368adc985d feat(blocks): migrate AddAudioToVideoBlock from media.py
Per review feedback from @majdyz - consolidating video blocks
2026-01-22 13:55:03 -06:00
Nicholas Tindle
8c3216f0a2 feat(blocks): migrate LoopVideoBlock from media.py
Per review feedback from @majdyz - consolidating video blocks
2026-01-22 13:55:02 -06:00
Nicholas Tindle
94063616e5 feat(blocks): migrate MediaDurationBlock from media.py
Per review feedback from @majdyz - consolidating video blocks
2026-01-22 13:55:00 -06:00
Nicholas Tindle
2433a86cb1 fix(blocks): correct import paths in video __init__.py 2026-01-22 13:52:26 -06:00
Nicholas Tindle
0ede203f8e feat(blocks): add VideoNarrationBlock
- Move imports to top level
- Use tempfile for secure temp paths
- Add exception chaining (from e)
- Close AudioFileClip in finally block
- Document that ducking = reduced volume mix
- Extract helper method for test mocking
- Proper resource cleanup
2026-01-22 13:52:10 -06:00
Nicholas Tindle
dc751316c5 feat(blocks): add VideoTextOverlayBlock
- Move imports to top level
- Use tempfile for secure temp paths
- Add exception chaining (from e)
- Add start_time/end_time validation
- Extract helper method for test mocking
- Proper resource cleanup in finally
2026-01-22 13:51:37 -06:00
Nicholas Tindle
e7fb54e6af feat(blocks): add VideoDownloadBlock
- Move imports to top level
- Use tempfile for secure temp paths
- Add exception chaining (from e)
- Extract helper method for test mocking
2026-01-22 13:51:04 -06:00
Nicholas Tindle
7b76f4d1e4 feat(blocks): add VideoConcatBlock
- Move imports to top level
- Use tempfile for secure temp paths
- Add exception chaining (from e)
- Constrain output_format to enum
- Add ge=0.0 to transition_duration
- Extract helper method for test mocking
- Proper resource cleanup in finally
2026-01-22 13:50:35 -06:00
Nicholas Tindle
3cc56de0fa feat(blocks): add VideoClipBlock
- Move imports to top level
- Use tempfile for secure temp paths
- Add exception chaining (from e)
- Constrain output_format to enum
- Extract helper method for test mocking
- Proper resource cleanup in finally
2026-01-22 13:50:12 -06:00
Nicholas Tindle
d2bead0f7a feat(blocks): create video module with all blocks
Consolidate video editing blocks into dedicated module.
Migrate blocks from media.py per review feedback.

Addresses: @majdyz review comment
2026-01-22 13:49:48 -06:00
claude[bot]
f8d3893c16 fix(blocks): Address review feedback for video editing blocks
- Add start_time < end_time validation in VideoClipBlock and VideoTextOverlayBlock
- Fix resource leaks: close AudioFileClip in narration.py, TextClip in text_overlay.py
- Fix concat.py: proper resource cleanup in finally block, load clips individually
- Implement proper crossfade using crossfadein/crossfadeout
- Implement ducking mode with stronger attenuation (0.3x original_volume)
- Remove unused start_time/end_time params from VideoDownloadBlock
- Fix None handling for duration/title in download.py (use 'or' instead of 'get' default)
- Add exception chaining with 'from e' in all blocks
- Add minimum clips validation in VideoConcatBlock
- Sort __all__ in __init__.py
- Increase ElevenLabs API timeout to 120s for longer scripts

Co-authored-by: Nicholas Tindle <ntindle@users.noreply.github.com>
2026-01-18 23:27:04 +00:00
Nicholas Tindle
1cfbc0dd08 feat(video): Update __init__.py with full exports 2026-01-18 15:34:04 -06:00
Nicholas Tindle
ff84643b48 feat(video): Add VideoNarrationBlock 2026-01-18 15:33:48 -06:00
Nicholas Tindle
c19c3c834a feat(video): Add VideoTextOverlayBlock 2026-01-18 15:33:47 -06:00
Nicholas Tindle
d0f7ba8cfd feat(video): Add VideoConcatBlock 2026-01-18 15:33:46 -06:00
Nicholas Tindle
2a855f4bd0 feat(video): Add VideoClipBlock 2026-01-18 15:32:59 -06:00
Nicholas Tindle
b93bb3b9f8 feat(video): Add VideoDownloadBlock 2026-01-18 15:32:58 -06:00
95 changed files with 4449 additions and 4635 deletions

View File

@@ -152,6 +152,7 @@ REPLICATE_API_KEY=
REVID_API_KEY=
SCREENSHOTONE_API_KEY=
UNREAL_SPEECH_API_KEY=
ELEVENLABS_API_KEY=
# Data & Search Services
E2B_API_KEY=

View File

@@ -62,10 +62,11 @@ ENV POETRY_HOME=/opt/poetry \
DEBIAN_FRONTEND=noninteractive
ENV PATH=/opt/poetry/bin:$PATH
# Install Python without upgrading system-managed packages
# Install Python and FFmpeg (required for video processing blocks)
RUN apt-get update && apt-get install -y \
python3.13 \
python3-pip \
ffmpeg \
&& rm -rf /var/lib/apt/lists/*
# Copy only necessary files from builder

View File

@@ -1,28 +1,29 @@
"""Agent generator package - Creates agents from natural language."""
from .core import (
AgentGeneratorNotConfiguredError,
apply_agent_patch,
decompose_goal,
generate_agent,
generate_agent_patch,
get_agent_as_json,
json_to_graph,
save_agent_to_library,
)
from .service import health_check as check_external_service_health
from .service import is_external_service_configured
from .fixer import apply_all_fixes
from .utils import get_blocks_info
from .validator import validate_agent
__all__ = [
# Core functions
"decompose_goal",
"generate_agent",
"generate_agent_patch",
"apply_agent_patch",
"save_agent_to_library",
"get_agent_as_json",
"json_to_graph",
# Exceptions
"AgentGeneratorNotConfiguredError",
# Service
"is_external_service_configured",
"check_external_service_health",
# Fixer
"apply_all_fixes",
# Validator
"validate_agent",
# Utils
"get_blocks_info",
]

View File

@@ -0,0 +1,25 @@
"""OpenRouter client configuration for agent generation."""
import os
from openai import AsyncOpenAI
# Configuration - use OPEN_ROUTER_API_KEY for consistency with chat/config.py
OPENROUTER_API_KEY = os.getenv("OPEN_ROUTER_API_KEY")
AGENT_GENERATOR_MODEL = os.getenv("AGENT_GENERATOR_MODEL", "anthropic/claude-opus-4.5")
# OpenRouter client (OpenAI-compatible API)
_client: AsyncOpenAI | None = None
def get_client() -> AsyncOpenAI:
"""Get or create the OpenRouter client."""
global _client
if _client is None:
if not OPENROUTER_API_KEY:
raise ValueError("OPENROUTER_API_KEY environment variable is required")
_client = AsyncOpenAI(
base_url="https://openrouter.ai/api/v1",
api_key=OPENROUTER_API_KEY,
)
return _client

View File

@@ -1,5 +1,7 @@
"""Core agent generation functions."""
import copy
import json
import logging
import uuid
from typing import Any
@@ -7,35 +9,13 @@ from typing import Any
from backend.api.features.library import db as library_db
from backend.data.graph import Graph, Link, Node, create_graph
from .service import (
decompose_goal_external,
generate_agent_external,
generate_agent_patch_external,
is_external_service_configured,
)
from .client import AGENT_GENERATOR_MODEL, get_client
from .prompts import DECOMPOSITION_PROMPT, GENERATION_PROMPT, PATCH_PROMPT
from .utils import get_block_summaries, parse_json_from_llm
logger = logging.getLogger(__name__)
class AgentGeneratorNotConfiguredError(Exception):
"""Raised when the external Agent Generator service is not configured."""
pass
def _check_service_configured() -> None:
"""Check if the external Agent Generator service is configured.
Raises:
AgentGeneratorNotConfiguredError: If the service is not configured.
"""
if not is_external_service_configured():
raise AgentGeneratorNotConfiguredError(
"Agent Generator service is not configured. "
"Set AGENTGENERATOR_HOST environment variable to enable agent generation."
)
async def decompose_goal(description: str, context: str = "") -> dict[str, Any] | None:
"""Break down a goal into steps or return clarifying questions.
@@ -48,13 +28,40 @@ async def decompose_goal(description: str, context: str = "") -> dict[str, Any]
- {"type": "clarifying_questions", "questions": [...]}
- {"type": "instructions", "steps": [...]}
Or None on error
Raises:
AgentGeneratorNotConfiguredError: If the external service is not configured.
"""
_check_service_configured()
logger.info("Calling external Agent Generator service for decompose_goal")
return await decompose_goal_external(description, context)
client = get_client()
prompt = DECOMPOSITION_PROMPT.format(block_summaries=get_block_summaries())
full_description = description
if context:
full_description = f"{description}\n\nAdditional context:\n{context}"
try:
response = await client.chat.completions.create(
model=AGENT_GENERATOR_MODEL,
messages=[
{"role": "system", "content": prompt},
{"role": "user", "content": full_description},
],
temperature=0,
)
content = response.choices[0].message.content
if content is None:
logger.error("LLM returned empty content for decomposition")
return None
result = parse_json_from_llm(content)
if result is None:
logger.error(f"Failed to parse decomposition response: {content[:200]}")
return None
return result
except Exception as e:
logger.error(f"Error decomposing goal: {e}")
return None
async def generate_agent(instructions: dict[str, Any]) -> dict[str, Any] | None:
@@ -65,14 +72,31 @@ async def generate_agent(instructions: dict[str, Any]) -> dict[str, Any] | None:
Returns:
Agent JSON dict or None on error
Raises:
AgentGeneratorNotConfiguredError: If the external service is not configured.
"""
_check_service_configured()
logger.info("Calling external Agent Generator service for generate_agent")
result = await generate_agent_external(instructions)
if result:
client = get_client()
prompt = GENERATION_PROMPT.format(block_summaries=get_block_summaries())
try:
response = await client.chat.completions.create(
model=AGENT_GENERATOR_MODEL,
messages=[
{"role": "system", "content": prompt},
{"role": "user", "content": json.dumps(instructions, indent=2)},
],
temperature=0,
)
content = response.choices[0].message.content
if content is None:
logger.error("LLM returned empty content for agent generation")
return None
result = parse_json_from_llm(content)
if result is None:
logger.error(f"Failed to parse agent JSON: {content[:200]}")
return None
# Ensure required fields
if "id" not in result:
result["id"] = str(uuid.uuid4())
@@ -80,7 +104,12 @@ async def generate_agent(instructions: dict[str, Any]) -> dict[str, Any] | None:
result["version"] = 1
if "is_active" not in result:
result["is_active"] = True
return result
return result
except Exception as e:
logger.error(f"Error generating agent: {e}")
return None
def json_to_graph(agent_json: dict[str, Any]) -> Graph:
@@ -255,23 +284,108 @@ async def get_agent_as_json(
async def generate_agent_patch(
update_request: str, current_agent: dict[str, Any]
) -> dict[str, Any] | None:
"""Update an existing agent using natural language.
The external Agent Generator service handles:
- Generating the patch
- Applying the patch
- Fixing and validating the result
"""Generate a patch to update an existing agent.
Args:
update_request: Natural language description of changes
current_agent: Current agent JSON
Returns:
Updated agent JSON, clarifying questions dict, or None on error
Raises:
AgentGeneratorNotConfiguredError: If the external service is not configured.
Patch dict or clarifying questions, or None on error
"""
_check_service_configured()
logger.info("Calling external Agent Generator service for generate_agent_patch")
return await generate_agent_patch_external(update_request, current_agent)
client = get_client()
prompt = PATCH_PROMPT.format(
current_agent=json.dumps(current_agent, indent=2),
block_summaries=get_block_summaries(),
)
try:
response = await client.chat.completions.create(
model=AGENT_GENERATOR_MODEL,
messages=[
{"role": "system", "content": prompt},
{"role": "user", "content": update_request},
],
temperature=0,
)
content = response.choices[0].message.content
if content is None:
logger.error("LLM returned empty content for patch generation")
return None
return parse_json_from_llm(content)
except Exception as e:
logger.error(f"Error generating patch: {e}")
return None
def apply_agent_patch(
current_agent: dict[str, Any], patch: dict[str, Any]
) -> dict[str, Any]:
"""Apply a patch to an existing agent.
Args:
current_agent: Current agent JSON
patch: Patch dict with operations
Returns:
Updated agent JSON
"""
agent = copy.deepcopy(current_agent)
patches = patch.get("patches", [])
for p in patches:
patch_type = p.get("type")
if patch_type == "modify":
node_id = p.get("node_id")
changes = p.get("changes", {})
for node in agent.get("nodes", []):
if node["id"] == node_id:
_deep_update(node, changes)
logger.debug(f"Modified node {node_id}")
break
elif patch_type == "add":
new_nodes = p.get("new_nodes", [])
new_links = p.get("new_links", [])
agent["nodes"] = agent.get("nodes", []) + new_nodes
agent["links"] = agent.get("links", []) + new_links
logger.debug(f"Added {len(new_nodes)} nodes, {len(new_links)} links")
elif patch_type == "remove":
node_ids_to_remove = set(p.get("node_ids", []))
link_ids_to_remove = set(p.get("link_ids", []))
# Remove nodes
agent["nodes"] = [
n for n in agent.get("nodes", []) if n["id"] not in node_ids_to_remove
]
# Remove links (both explicit and those referencing removed nodes)
agent["links"] = [
link
for link in agent.get("links", [])
if link["id"] not in link_ids_to_remove
and link["source_id"] not in node_ids_to_remove
and link["sink_id"] not in node_ids_to_remove
]
logger.debug(
f"Removed {len(node_ids_to_remove)} nodes, {len(link_ids_to_remove)} links"
)
return agent
def _deep_update(target: dict, source: dict) -> None:
"""Recursively update a dict with another dict."""
for key, value in source.items():
if key in target and isinstance(target[key], dict) and isinstance(value, dict):
_deep_update(target[key], value)
else:
target[key] = value

View File

@@ -0,0 +1,606 @@
"""Agent fixer - Fixes common LLM generation errors."""
import logging
import re
import uuid
from typing import Any
from .utils import (
ADDTODICTIONARY_BLOCK_ID,
ADDTOLIST_BLOCK_ID,
CODE_EXECUTION_BLOCK_ID,
CONDITION_BLOCK_ID,
CREATEDICT_BLOCK_ID,
CREATELIST_BLOCK_ID,
DATA_SAMPLING_BLOCK_ID,
DOUBLE_CURLY_BRACES_BLOCK_IDS,
GET_CURRENT_DATE_BLOCK_ID,
STORE_VALUE_BLOCK_ID,
UNIVERSAL_TYPE_CONVERTER_BLOCK_ID,
get_blocks_info,
is_valid_uuid,
)
logger = logging.getLogger(__name__)
def fix_agent_ids(agent: dict[str, Any]) -> dict[str, Any]:
"""Fix invalid UUIDs in agent and link IDs."""
# Fix agent ID
if not is_valid_uuid(agent.get("id", "")):
agent["id"] = str(uuid.uuid4())
logger.debug(f"Fixed agent ID: {agent['id']}")
# Fix node IDs
id_mapping = {} # Old ID -> New ID
for node in agent.get("nodes", []):
if not is_valid_uuid(node.get("id", "")):
old_id = node.get("id", "")
new_id = str(uuid.uuid4())
id_mapping[old_id] = new_id
node["id"] = new_id
logger.debug(f"Fixed node ID: {old_id} -> {new_id}")
# Fix link IDs and update references
for link in agent.get("links", []):
if not is_valid_uuid(link.get("id", "")):
link["id"] = str(uuid.uuid4())
logger.debug(f"Fixed link ID: {link['id']}")
# Update source/sink IDs if they were remapped
if link.get("source_id") in id_mapping:
link["source_id"] = id_mapping[link["source_id"]]
if link.get("sink_id") in id_mapping:
link["sink_id"] = id_mapping[link["sink_id"]]
return agent
def fix_double_curly_braces(agent: dict[str, Any]) -> dict[str, Any]:
"""Fix single curly braces to double in template blocks."""
for node in agent.get("nodes", []):
if node.get("block_id") not in DOUBLE_CURLY_BRACES_BLOCK_IDS:
continue
input_data = node.get("input_default", {})
for key in ("prompt", "format"):
if key in input_data and isinstance(input_data[key], str):
original = input_data[key]
# Fix simple variable references: {var} -> {{var}}
fixed = re.sub(
r"(?<!\{)\{([a-zA-Z_][a-zA-Z0-9_]*)\}(?!\})",
r"{{\1}}",
original,
)
if fixed != original:
input_data[key] = fixed
logger.debug(f"Fixed curly braces in {key}")
return agent
def fix_storevalue_before_condition(agent: dict[str, Any]) -> dict[str, Any]:
"""Add StoreValueBlock before ConditionBlock if needed for value2."""
nodes = agent.get("nodes", [])
links = agent.get("links", [])
# Find all ConditionBlock nodes
condition_node_ids = {
node["id"] for node in nodes if node.get("block_id") == CONDITION_BLOCK_ID
}
if not condition_node_ids:
return agent
new_nodes = []
new_links = []
processed_conditions = set()
for link in links:
sink_id = link.get("sink_id")
sink_name = link.get("sink_name")
# Check if this link goes to a ConditionBlock's value2
if sink_id in condition_node_ids and sink_name == "value2":
source_node = next(
(n for n in nodes if n["id"] == link.get("source_id")), None
)
# Skip if source is already a StoreValueBlock
if source_node and source_node.get("block_id") == STORE_VALUE_BLOCK_ID:
continue
# Skip if we already processed this condition
if sink_id in processed_conditions:
continue
processed_conditions.add(sink_id)
# Create StoreValueBlock
store_node_id = str(uuid.uuid4())
store_node = {
"id": store_node_id,
"block_id": STORE_VALUE_BLOCK_ID,
"input_default": {"data": None},
"metadata": {"position": {"x": 0, "y": -100}},
}
new_nodes.append(store_node)
# Create link: original source -> StoreValueBlock
new_links.append(
{
"id": str(uuid.uuid4()),
"source_id": link["source_id"],
"source_name": link["source_name"],
"sink_id": store_node_id,
"sink_name": "input",
"is_static": False,
}
)
# Update original link: StoreValueBlock -> ConditionBlock
link["source_id"] = store_node_id
link["source_name"] = "output"
logger.debug(f"Added StoreValueBlock before ConditionBlock {sink_id}")
if new_nodes:
agent["nodes"] = nodes + new_nodes
return agent
def fix_addtolist_blocks(agent: dict[str, Any]) -> dict[str, Any]:
"""Fix AddToList blocks by adding prerequisite empty AddToList block.
When an AddToList block is found:
1. Checks if there's a CreateListBlock before it
2. Removes CreateListBlock if linked directly to AddToList
3. Adds an empty AddToList block before the original
4. Ensures the original has a self-referencing link
"""
nodes = agent.get("nodes", [])
links = agent.get("links", [])
new_nodes = []
original_addtolist_ids = set()
nodes_to_remove = set()
links_to_remove = []
# First pass: identify CreateListBlock nodes to remove
for link in links:
source_node = next(
(n for n in nodes if n.get("id") == link.get("source_id")), None
)
sink_node = next((n for n in nodes if n.get("id") == link.get("sink_id")), None)
if (
source_node
and sink_node
and source_node.get("block_id") == CREATELIST_BLOCK_ID
and sink_node.get("block_id") == ADDTOLIST_BLOCK_ID
):
nodes_to_remove.add(source_node.get("id"))
links_to_remove.append(link)
logger.debug(f"Removing CreateListBlock {source_node.get('id')}")
# Second pass: process AddToList blocks
filtered_nodes = []
for node in nodes:
if node.get("id") in nodes_to_remove:
continue
if node.get("block_id") == ADDTOLIST_BLOCK_ID:
original_addtolist_ids.add(node.get("id"))
node_id = node.get("id")
pos = node.get("metadata", {}).get("position", {"x": 0, "y": 0})
# Check if already has prerequisite
has_prereq = any(
link.get("sink_id") == node_id
and link.get("sink_name") == "list"
and link.get("source_name") == "updated_list"
for link in links
)
if not has_prereq:
# Remove links to "list" input (except self-reference)
for link in links:
if (
link.get("sink_id") == node_id
and link.get("sink_name") == "list"
and link.get("source_id") != node_id
and link not in links_to_remove
):
links_to_remove.append(link)
# Create prerequisite AddToList block
prereq_id = str(uuid.uuid4())
prereq_node = {
"id": prereq_id,
"block_id": ADDTOLIST_BLOCK_ID,
"input_default": {"list": [], "entry": None, "entries": []},
"metadata": {
"position": {"x": pos.get("x", 0) - 800, "y": pos.get("y", 0)}
},
}
new_nodes.append(prereq_node)
# Link prerequisite to original
links.append(
{
"id": str(uuid.uuid4()),
"source_id": prereq_id,
"source_name": "updated_list",
"sink_id": node_id,
"sink_name": "list",
"is_static": False,
}
)
logger.debug(f"Added prerequisite AddToList block for {node_id}")
filtered_nodes.append(node)
# Remove marked links
filtered_links = [link for link in links if link not in links_to_remove]
# Add self-referencing links for original AddToList blocks
for node in filtered_nodes + new_nodes:
if (
node.get("block_id") == ADDTOLIST_BLOCK_ID
and node.get("id") in original_addtolist_ids
):
node_id = node.get("id")
has_self_ref = any(
link["source_id"] == node_id
and link["sink_id"] == node_id
and link["source_name"] == "updated_list"
and link["sink_name"] == "list"
for link in filtered_links
)
if not has_self_ref:
filtered_links.append(
{
"id": str(uuid.uuid4()),
"source_id": node_id,
"source_name": "updated_list",
"sink_id": node_id,
"sink_name": "list",
"is_static": False,
}
)
logger.debug(f"Added self-reference for AddToList {node_id}")
agent["nodes"] = filtered_nodes + new_nodes
agent["links"] = filtered_links
return agent
def fix_addtodictionary_blocks(agent: dict[str, Any]) -> dict[str, Any]:
"""Fix AddToDictionary blocks by removing empty CreateDictionary nodes."""
nodes = agent.get("nodes", [])
links = agent.get("links", [])
nodes_to_remove = set()
links_to_remove = []
for link in links:
source_node = next(
(n for n in nodes if n.get("id") == link.get("source_id")), None
)
sink_node = next((n for n in nodes if n.get("id") == link.get("sink_id")), None)
if (
source_node
and sink_node
and source_node.get("block_id") == CREATEDICT_BLOCK_ID
and sink_node.get("block_id") == ADDTODICTIONARY_BLOCK_ID
):
nodes_to_remove.add(source_node.get("id"))
links_to_remove.append(link)
logger.debug(f"Removing CreateDictionary {source_node.get('id')}")
agent["nodes"] = [n for n in nodes if n.get("id") not in nodes_to_remove]
agent["links"] = [link for link in links if link not in links_to_remove]
return agent
def fix_code_execution_output(agent: dict[str, Any]) -> dict[str, Any]:
"""Fix CodeExecutionBlock output: change 'response' to 'stdout_logs'."""
nodes = agent.get("nodes", [])
links = agent.get("links", [])
for link in links:
source_node = next(
(n for n in nodes if n.get("id") == link.get("source_id")), None
)
if (
source_node
and source_node.get("block_id") == CODE_EXECUTION_BLOCK_ID
and link.get("source_name") == "response"
):
link["source_name"] = "stdout_logs"
logger.debug("Fixed CodeExecutionBlock output: response -> stdout_logs")
return agent
def fix_data_sampling_sample_size(agent: dict[str, Any]) -> dict[str, Any]:
"""Fix DataSamplingBlock by setting sample_size to 1 as default."""
nodes = agent.get("nodes", [])
links = agent.get("links", [])
links_to_remove = []
for node in nodes:
if node.get("block_id") == DATA_SAMPLING_BLOCK_ID:
node_id = node.get("id")
input_default = node.get("input_default", {})
# Remove links to sample_size
for link in links:
if (
link.get("sink_id") == node_id
and link.get("sink_name") == "sample_size"
):
links_to_remove.append(link)
# Set default
input_default["sample_size"] = 1
node["input_default"] = input_default
logger.debug(f"Fixed DataSamplingBlock {node_id} sample_size to 1")
if links_to_remove:
agent["links"] = [link for link in links if link not in links_to_remove]
return agent
def fix_node_x_coordinates(agent: dict[str, Any]) -> dict[str, Any]:
"""Fix node x-coordinates to ensure 800+ unit spacing between linked nodes."""
nodes = agent.get("nodes", [])
links = agent.get("links", [])
node_lookup = {n.get("id"): n for n in nodes}
for link in links:
source_id = link.get("source_id")
sink_id = link.get("sink_id")
source_node = node_lookup.get(source_id)
sink_node = node_lookup.get(sink_id)
if not source_node or not sink_node:
continue
source_pos = source_node.get("metadata", {}).get("position", {})
sink_pos = sink_node.get("metadata", {}).get("position", {})
source_x = source_pos.get("x", 0)
sink_x = sink_pos.get("x", 0)
if abs(sink_x - source_x) < 800:
new_x = source_x + 800
if "metadata" not in sink_node:
sink_node["metadata"] = {}
if "position" not in sink_node["metadata"]:
sink_node["metadata"]["position"] = {}
sink_node["metadata"]["position"]["x"] = new_x
logger.debug(f"Fixed node {sink_id} x: {sink_x} -> {new_x}")
return agent
def fix_getcurrentdate_offset(agent: dict[str, Any]) -> dict[str, Any]:
"""Fix GetCurrentDateBlock offset to ensure it's positive."""
for node in agent.get("nodes", []):
if node.get("block_id") == GET_CURRENT_DATE_BLOCK_ID:
input_default = node.get("input_default", {})
if "offset" in input_default:
offset = input_default["offset"]
if isinstance(offset, (int, float)) and offset < 0:
input_default["offset"] = abs(offset)
logger.debug(f"Fixed offset: {offset} -> {abs(offset)}")
return agent
def fix_ai_model_parameter(
agent: dict[str, Any],
blocks_info: list[dict[str, Any]],
default_model: str = "gpt-4o",
) -> dict[str, Any]:
"""Add default model parameter to AI blocks if missing."""
block_map = {b.get("id"): b for b in blocks_info}
for node in agent.get("nodes", []):
block_id = node.get("block_id")
block = block_map.get(block_id)
if not block:
continue
# Check if block has AI category
categories = block.get("categories", [])
is_ai_block = any(
cat.get("category") == "AI" for cat in categories if isinstance(cat, dict)
)
if is_ai_block:
input_default = node.get("input_default", {})
if "model" not in input_default:
input_default["model"] = default_model
node["input_default"] = input_default
logger.debug(
f"Added model '{default_model}' to AI block {node.get('id')}"
)
return agent
def fix_link_static_properties(
agent: dict[str, Any], blocks_info: list[dict[str, Any]]
) -> dict[str, Any]:
"""Fix is_static property based on source block's staticOutput."""
block_map = {b.get("id"): b for b in blocks_info}
node_lookup = {n.get("id"): n for n in agent.get("nodes", [])}
for link in agent.get("links", []):
source_node = node_lookup.get(link.get("source_id"))
if not source_node:
continue
source_block = block_map.get(source_node.get("block_id"))
if not source_block:
continue
static_output = source_block.get("staticOutput", False)
if link.get("is_static") != static_output:
link["is_static"] = static_output
logger.debug(f"Fixed link {link.get('id')} is_static to {static_output}")
return agent
def fix_data_type_mismatch(
agent: dict[str, Any], blocks_info: list[dict[str, Any]]
) -> dict[str, Any]:
"""Fix data type mismatches by inserting UniversalTypeConverterBlock."""
nodes = agent.get("nodes", [])
links = agent.get("links", [])
block_map = {b.get("id"): b for b in blocks_info}
node_lookup = {n.get("id"): n for n in nodes}
def get_property_type(schema: dict, name: str) -> str | None:
if "_#_" in name:
parent, child = name.split("_#_", 1)
parent_schema = schema.get(parent, {})
if "properties" in parent_schema:
return parent_schema["properties"].get(child, {}).get("type")
return None
return schema.get(name, {}).get("type")
def are_types_compatible(src: str, sink: str) -> bool:
if {src, sink} <= {"integer", "number"}:
return True
return src == sink
type_mapping = {
"string": "string",
"text": "string",
"integer": "number",
"number": "number",
"float": "number",
"boolean": "boolean",
"bool": "boolean",
"array": "list",
"list": "list",
"object": "dictionary",
"dict": "dictionary",
"dictionary": "dictionary",
}
new_links = []
nodes_to_add = []
for link in links:
source_node = node_lookup.get(link.get("source_id"))
sink_node = node_lookup.get(link.get("sink_id"))
if not source_node or not sink_node:
new_links.append(link)
continue
source_block = block_map.get(source_node.get("block_id"))
sink_block = block_map.get(sink_node.get("block_id"))
if not source_block or not sink_block:
new_links.append(link)
continue
source_outputs = source_block.get("outputSchema", {}).get("properties", {})
sink_inputs = sink_block.get("inputSchema", {}).get("properties", {})
source_type = get_property_type(source_outputs, link.get("source_name", ""))
sink_type = get_property_type(sink_inputs, link.get("sink_name", ""))
if (
source_type
and sink_type
and not are_types_compatible(source_type, sink_type)
):
# Insert type converter
converter_id = str(uuid.uuid4())
target_type = type_mapping.get(sink_type, sink_type)
converter_node = {
"id": converter_id,
"block_id": UNIVERSAL_TYPE_CONVERTER_BLOCK_ID,
"input_default": {"type": target_type},
"metadata": {"position": {"x": 0, "y": 100}},
}
nodes_to_add.append(converter_node)
# source -> converter
new_links.append(
{
"id": str(uuid.uuid4()),
"source_id": link["source_id"],
"source_name": link["source_name"],
"sink_id": converter_id,
"sink_name": "value",
"is_static": False,
}
)
# converter -> sink
new_links.append(
{
"id": str(uuid.uuid4()),
"source_id": converter_id,
"source_name": "value",
"sink_id": link["sink_id"],
"sink_name": link["sink_name"],
"is_static": False,
}
)
logger.debug(f"Inserted type converter: {source_type} -> {target_type}")
else:
new_links.append(link)
if nodes_to_add:
agent["nodes"] = nodes + nodes_to_add
agent["links"] = new_links
return agent
def apply_all_fixes(
agent: dict[str, Any], blocks_info: list[dict[str, Any]] | None = None
) -> dict[str, Any]:
"""Apply all fixes to an agent JSON.
Args:
agent: Agent JSON dict
blocks_info: Optional list of block info dicts for advanced fixes
Returns:
Fixed agent JSON
"""
# Basic fixes (no block info needed)
agent = fix_agent_ids(agent)
agent = fix_double_curly_braces(agent)
agent = fix_storevalue_before_condition(agent)
agent = fix_addtolist_blocks(agent)
agent = fix_addtodictionary_blocks(agent)
agent = fix_code_execution_output(agent)
agent = fix_data_sampling_sample_size(agent)
agent = fix_node_x_coordinates(agent)
agent = fix_getcurrentdate_offset(agent)
# Advanced fixes (require block info)
if blocks_info is None:
blocks_info = get_blocks_info()
agent = fix_ai_model_parameter(agent, blocks_info)
agent = fix_link_static_properties(agent, blocks_info)
agent = fix_data_type_mismatch(agent, blocks_info)
return agent

View File

@@ -0,0 +1,225 @@
"""Prompt templates for agent generation."""
DECOMPOSITION_PROMPT = """
You are an expert AutoGPT Workflow Decomposer. Your task is to analyze a user's high-level goal and break it down into a clear, step-by-step plan using the available blocks.
Each step should represent a distinct, automatable action suitable for execution by an AI automation system.
---
FIRST: Analyze the user's goal and determine:
1) Design-time configuration (fixed settings that won't change per run)
2) Runtime inputs (values the agent's end-user will provide each time it runs)
For anything that can vary per run (email addresses, names, dates, search terms, etc.):
- DO NOT ask for the actual value
- Instead, define it as an Agent Input with a clear name, type, and description
Only ask clarifying questions about design-time config that affects how you build the workflow:
- Which external service to use (e.g., "Gmail vs Outlook", "Notion vs Google Docs")
- Required formats or structures (e.g., "CSV, JSON, or PDF output?")
- Business rules that must be hard-coded
IMPORTANT CLARIFICATIONS POLICY:
- Ask no more than five essential questions
- Do not ask for concrete values that can be provided at runtime as Agent Inputs
- Do not ask for API keys or credentials; the platform handles those directly
- If there is enough information to infer reasonable defaults, prefer to propose defaults
---
GUIDELINES:
1. List each step as a numbered item
2. Describe the action clearly and specify inputs/outputs
3. Ensure steps are in logical, sequential order
4. Mention block names naturally (e.g., "Use GetWeatherByLocationBlock to...")
5. Help the user reach their goal efficiently
---
RULES:
1. OUTPUT FORMAT: Only output either clarifying questions OR step-by-step instructions, not both
2. USE ONLY THE BLOCKS PROVIDED
3. ALL required_input fields must be provided
4. Data types of linked properties must match
5. Write expert-level prompts for AI-related blocks
---
CRITICAL BLOCK RESTRICTIONS:
1. AddToListBlock: Outputs updated list EVERY addition, not after all additions
2. SendEmailBlock: Draft the email for user review; set SMTP config based on email type
3. ConditionBlock: value2 is reference, value1 is contrast
4. CodeExecutionBlock: DO NOT USE - use AI blocks instead
5. ReadCsvBlock: Only use the 'rows' output, not 'row'
---
OUTPUT FORMAT:
If more information is needed:
```json
{{
"type": "clarifying_questions",
"questions": [
{{
"question": "Which email provider should be used? (Gmail, Outlook, custom SMTP)",
"keyword": "email_provider",
"example": "Gmail"
}}
]
}}
```
If ready to proceed:
```json
{{
"type": "instructions",
"steps": [
{{
"step_number": 1,
"block_name": "AgentShortTextInputBlock",
"description": "Get the URL of the content to analyze.",
"inputs": [{{"name": "name", "value": "URL"}}],
"outputs": [{{"name": "result", "description": "The URL entered by user"}}]
}}
]
}}
```
---
AVAILABLE BLOCKS:
{block_summaries}
"""
GENERATION_PROMPT = """
You are an expert AI workflow builder. Generate a valid agent JSON from the given instructions.
---
NODES:
Each node must include:
- `id`: Unique UUID v4 (e.g. `a8f5b1e2-c3d4-4e5f-8a9b-0c1d2e3f4a5b`)
- `block_id`: The block identifier (must match an Allowed Block)
- `input_default`: Dict of inputs (can be empty if no static inputs needed)
- `metadata`: Must contain:
- `position`: {{"x": number, "y": number}} - adjacent nodes should differ by 800+ in X
- `customized_name`: Clear name describing this block's purpose in the workflow
---
LINKS:
Each link connects a source node's output to a sink node's input:
- `id`: MUST be UUID v4 (NOT "link-1", "link-2", etc.)
- `source_id`: ID of the source node
- `source_name`: Output field name from the source block
- `sink_id`: ID of the sink node
- `sink_name`: Input field name on the sink block
- `is_static`: true only if source block has static_output: true
CRITICAL: All IDs must be valid UUID v4 format!
---
AGENT (GRAPH):
Wrap nodes and links in:
- `id`: UUID of the agent
- `name`: Short, generic name (avoid specific company names, URLs)
- `description`: Short, generic description
- `nodes`: List of all nodes
- `links`: List of all links
- `version`: 1
- `is_active`: true
---
TIPS:
- All required_input fields must be provided via input_default or a valid link
- Ensure consistent source_id and sink_id references
- Avoid dangling links
- Input/output pins must match block schemas
- Do not invent unknown block_ids
---
ALLOWED BLOCKS:
{block_summaries}
---
Generate the complete agent JSON. Output ONLY valid JSON, no explanation.
"""
PATCH_PROMPT = """
You are an expert at modifying AutoGPT agent workflows. Given the current agent and a modification request, generate a JSON patch to update the agent.
CURRENT AGENT:
{current_agent}
AVAILABLE BLOCKS:
{block_summaries}
---
PATCH FORMAT:
Return a JSON object with the following structure:
```json
{{
"type": "patch",
"intent": "Brief description of what the patch does",
"patches": [
{{
"type": "modify",
"node_id": "uuid-of-node-to-modify",
"changes": {{
"input_default": {{"field": "new_value"}},
"metadata": {{"customized_name": "New Name"}}
}}
}},
{{
"type": "add",
"new_nodes": [
{{
"id": "new-uuid",
"block_id": "block-uuid",
"input_default": {{}},
"metadata": {{"position": {{"x": 0, "y": 0}}, "customized_name": "Name"}}
}}
],
"new_links": [
{{
"id": "link-uuid",
"source_id": "source-node-id",
"source_name": "output_field",
"sink_id": "sink-node-id",
"sink_name": "input_field"
}}
]
}},
{{
"type": "remove",
"node_ids": ["uuid-of-node-to-remove"],
"link_ids": ["uuid-of-link-to-remove"]
}}
]
}}
```
If you need more information, return:
```json
{{
"type": "clarifying_questions",
"questions": [
{{
"question": "What specific change do you want?",
"keyword": "change_type",
"example": "Add error handling"
}}
]
}}
```
Generate the minimal patch needed. Output ONLY valid JSON.
"""

View File

@@ -1,269 +0,0 @@
"""External Agent Generator service client.
This module provides a client for communicating with the external Agent Generator
microservice. When AGENTGENERATOR_HOST is configured, the agent generation functions
will delegate to the external service instead of using the built-in LLM-based implementation.
"""
import logging
from typing import Any
import httpx
from backend.util.settings import Settings
logger = logging.getLogger(__name__)
_client: httpx.AsyncClient | None = None
_settings: Settings | None = None
def _get_settings() -> Settings:
"""Get or create settings singleton."""
global _settings
if _settings is None:
_settings = Settings()
return _settings
def is_external_service_configured() -> bool:
"""Check if external Agent Generator service is configured."""
settings = _get_settings()
return bool(settings.config.agentgenerator_host)
def _get_base_url() -> str:
"""Get the base URL for the external service."""
settings = _get_settings()
host = settings.config.agentgenerator_host
port = settings.config.agentgenerator_port
return f"http://{host}:{port}"
def _get_client() -> httpx.AsyncClient:
"""Get or create the HTTP client for the external service."""
global _client
if _client is None:
settings = _get_settings()
_client = httpx.AsyncClient(
base_url=_get_base_url(),
timeout=httpx.Timeout(settings.config.agentgenerator_timeout),
)
return _client
async def decompose_goal_external(
description: str, context: str = ""
) -> dict[str, Any] | None:
"""Call the external service to decompose a goal.
Args:
description: Natural language goal description
context: Additional context (e.g., answers to previous questions)
Returns:
Dict with either:
- {"type": "clarifying_questions", "questions": [...]}
- {"type": "instructions", "steps": [...]}
- {"type": "unachievable_goal", ...}
- {"type": "vague_goal", ...}
Or None on error
"""
client = _get_client()
# Build the request payload
payload: dict[str, Any] = {"description": description}
if context:
# The external service uses user_instruction for additional context
payload["user_instruction"] = context
try:
response = await client.post("/api/decompose-description", json=payload)
response.raise_for_status()
data = response.json()
if not data.get("success"):
logger.error(f"External service returned error: {data.get('error')}")
return None
# Map the response to the expected format
response_type = data.get("type")
if response_type == "instructions":
return {"type": "instructions", "steps": data.get("steps", [])}
elif response_type == "clarifying_questions":
return {
"type": "clarifying_questions",
"questions": data.get("questions", []),
}
elif response_type == "unachievable_goal":
return {
"type": "unachievable_goal",
"reason": data.get("reason"),
"suggested_goal": data.get("suggested_goal"),
}
elif response_type == "vague_goal":
return {
"type": "vague_goal",
"suggested_goal": data.get("suggested_goal"),
}
else:
logger.error(
f"Unknown response type from external service: {response_type}"
)
return None
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error calling external agent generator: {e}")
return None
except httpx.RequestError as e:
logger.error(f"Request error calling external agent generator: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error calling external agent generator: {e}")
return None
async def generate_agent_external(
instructions: dict[str, Any]
) -> dict[str, Any] | None:
"""Call the external service to generate an agent from instructions.
Args:
instructions: Structured instructions from decompose_goal
Returns:
Agent JSON dict or None on error
"""
client = _get_client()
try:
response = await client.post(
"/api/generate-agent", json={"instructions": instructions}
)
response.raise_for_status()
data = response.json()
if not data.get("success"):
logger.error(f"External service returned error: {data.get('error')}")
return None
return data.get("agent_json")
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error calling external agent generator: {e}")
return None
except httpx.RequestError as e:
logger.error(f"Request error calling external agent generator: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error calling external agent generator: {e}")
return None
async def generate_agent_patch_external(
update_request: str, current_agent: dict[str, Any]
) -> dict[str, Any] | None:
"""Call the external service to generate a patch for an existing agent.
Args:
update_request: Natural language description of changes
current_agent: Current agent JSON
Returns:
Updated agent JSON, clarifying questions dict, or None on error
"""
client = _get_client()
try:
response = await client.post(
"/api/update-agent",
json={
"update_request": update_request,
"current_agent_json": current_agent,
},
)
response.raise_for_status()
data = response.json()
if not data.get("success"):
logger.error(f"External service returned error: {data.get('error')}")
return None
# Check if it's clarifying questions
if data.get("type") == "clarifying_questions":
return {
"type": "clarifying_questions",
"questions": data.get("questions", []),
}
# Otherwise return the updated agent JSON
return data.get("agent_json")
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error calling external agent generator: {e}")
return None
except httpx.RequestError as e:
logger.error(f"Request error calling external agent generator: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error calling external agent generator: {e}")
return None
async def get_blocks_external() -> list[dict[str, Any]] | None:
"""Get available blocks from the external service.
Returns:
List of block info dicts or None on error
"""
client = _get_client()
try:
response = await client.get("/api/blocks")
response.raise_for_status()
data = response.json()
if not data.get("success"):
logger.error("External service returned error getting blocks")
return None
return data.get("blocks", [])
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error getting blocks from external service: {e}")
return None
except httpx.RequestError as e:
logger.error(f"Request error getting blocks from external service: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error getting blocks from external service: {e}")
return None
async def health_check() -> bool:
"""Check if the external service is healthy.
Returns:
True if healthy, False otherwise
"""
if not is_external_service_configured():
return False
client = _get_client()
try:
response = await client.get("/health")
response.raise_for_status()
data = response.json()
return data.get("status") == "healthy" and data.get("blocks_loaded", False)
except Exception as e:
logger.warning(f"External agent generator health check failed: {e}")
return False
async def close_client() -> None:
"""Close the HTTP client."""
global _client
if _client is not None:
await _client.aclose()
_client = None

View File

@@ -0,0 +1,213 @@
"""Utilities for agent generation."""
import json
import re
from typing import Any
from backend.data.block import get_blocks
# UUID validation regex
UUID_REGEX = re.compile(
r"^[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}$"
)
# Block IDs for various fixes
STORE_VALUE_BLOCK_ID = "1ff065e9-88e8-4358-9d82-8dc91f622ba9"
CONDITION_BLOCK_ID = "715696a0-e1da-45c8-b209-c2fa9c3b0be6"
ADDTOLIST_BLOCK_ID = "aeb08fc1-2fc1-4141-bc8e-f758f183a822"
ADDTODICTIONARY_BLOCK_ID = "31d1064e-7446-4693-a7d4-65e5ca1180d1"
CREATELIST_BLOCK_ID = "a912d5c7-6e00-4542-b2a9-8034136930e4"
CREATEDICT_BLOCK_ID = "b924ddf4-de4f-4b56-9a85-358930dcbc91"
CODE_EXECUTION_BLOCK_ID = "0b02b072-abe7-11ef-8372-fb5d162dd712"
DATA_SAMPLING_BLOCK_ID = "4a448883-71fa-49cf-91cf-70d793bd7d87"
UNIVERSAL_TYPE_CONVERTER_BLOCK_ID = "95d1b990-ce13-4d88-9737-ba5c2070c97b"
GET_CURRENT_DATE_BLOCK_ID = "b29c1b50-5d0e-4d9f-8f9d-1b0e6fcbf0b1"
DOUBLE_CURLY_BRACES_BLOCK_IDS = [
"44f6c8ad-d75c-4ae1-8209-aad1c0326928", # FillTextTemplateBlock
"6ab085e2-20b3-4055-bc3e-08036e01eca6",
"90f8c45e-e983-4644-aa0b-b4ebe2f531bc",
"363ae599-353e-4804-937e-b2ee3cef3da4", # AgentOutputBlock
"3b191d9f-356f-482d-8238-ba04b6d18381",
"db7d8f02-2f44-4c55-ab7a-eae0941f0c30",
"3a7c4b8d-6e2f-4a5d-b9c1-f8d23c5a9b0e",
"ed1ae7a0-b770-4089-b520-1f0005fad19a",
"a892b8d9-3e4e-4e9c-9c1e-75f8efcf1bfa",
"b29c1b50-5d0e-4d9f-8f9d-1b0e6fcbf0b1",
"716a67b3-6760-42e7-86dc-18645c6e00fc",
"530cf046-2ce0-4854-ae2c-659db17c7a46",
"ed55ac19-356e-4243-a6cb-bc599e9b716f",
"1f292d4a-41a4-4977-9684-7c8d560b9f91", # LLM blocks
"32a87eab-381e-4dd4-bdb8-4c47151be35a",
]
def is_valid_uuid(value: str) -> bool:
"""Check if a string is a valid UUID v4."""
return isinstance(value, str) and UUID_REGEX.match(value) is not None
def _compact_schema(schema: dict) -> dict[str, str]:
"""Extract compact type info from a JSON schema properties dict.
Returns a dict of {field_name: type_string} for essential info only.
"""
props = schema.get("properties", {})
result = {}
for name, prop in props.items():
# Skip internal/complex fields
if name.startswith("_"):
continue
# Get type string
type_str = prop.get("type", "any")
# Handle anyOf/oneOf (optional types)
if "anyOf" in prop:
types = [t.get("type", "?") for t in prop["anyOf"] if t.get("type")]
type_str = "|".join(types) if types else "any"
elif "allOf" in prop:
type_str = "object"
# Add array item type if present
if type_str == "array" and "items" in prop:
items = prop["items"]
if isinstance(items, dict):
item_type = items.get("type", "any")
type_str = f"array[{item_type}]"
result[name] = type_str
return result
def get_block_summaries(include_schemas: bool = True) -> str:
"""Generate compact block summaries for prompts.
Args:
include_schemas: Whether to include input/output type info
Returns:
Formatted string of block summaries (compact format)
"""
blocks = get_blocks()
summaries = []
for block_id, block_cls in blocks.items():
block = block_cls()
name = block.name
desc = getattr(block, "description", "") or ""
# Truncate description
if len(desc) > 150:
desc = desc[:147] + "..."
if not include_schemas:
summaries.append(f"- {name} (id: {block_id}): {desc}")
else:
# Compact format with type info only
inputs = {}
outputs = {}
required = []
if hasattr(block, "input_schema"):
try:
schema = block.input_schema.jsonschema()
inputs = _compact_schema(schema)
required = schema.get("required", [])
except Exception:
pass
if hasattr(block, "output_schema"):
try:
schema = block.output_schema.jsonschema()
outputs = _compact_schema(schema)
except Exception:
pass
# Build compact line format
# Format: NAME (id): desc | in: {field:type, ...} [required] | out: {field:type}
in_str = ", ".join(f"{k}:{v}" for k, v in inputs.items())
out_str = ", ".join(f"{k}:{v}" for k, v in outputs.items())
req_str = f" req=[{','.join(required)}]" if required else ""
static = " [static]" if getattr(block, "static_output", False) else ""
line = f"- {name} (id: {block_id}): {desc}"
if in_str:
line += f"\n in: {{{in_str}}}{req_str}"
if out_str:
line += f"\n out: {{{out_str}}}{static}"
summaries.append(line)
return "\n".join(summaries)
def get_blocks_info() -> list[dict[str, Any]]:
"""Get block information with schemas for validation and fixing."""
blocks = get_blocks()
blocks_info = []
for block_id, block_cls in blocks.items():
block = block_cls()
blocks_info.append(
{
"id": block_id,
"name": block.name,
"description": getattr(block, "description", ""),
"categories": getattr(block, "categories", []),
"staticOutput": getattr(block, "static_output", False),
"inputSchema": (
block.input_schema.jsonschema()
if hasattr(block, "input_schema")
else {}
),
"outputSchema": (
block.output_schema.jsonschema()
if hasattr(block, "output_schema")
else {}
),
}
)
return blocks_info
def parse_json_from_llm(text: str) -> dict[str, Any] | None:
"""Extract JSON from LLM response (handles markdown code blocks)."""
if not text:
return None
# Try fenced code block
match = re.search(r"```(?:json)?\s*([\s\S]*?)```", text, re.IGNORECASE)
if match:
try:
return json.loads(match.group(1).strip())
except json.JSONDecodeError:
pass
# Try raw text
try:
return json.loads(text.strip())
except json.JSONDecodeError:
pass
# Try finding {...} span
start = text.find("{")
end = text.rfind("}")
if start != -1 and end > start:
try:
return json.loads(text[start : end + 1])
except json.JSONDecodeError:
pass
# Try finding [...] span
start = text.find("[")
end = text.rfind("]")
if start != -1 and end > start:
try:
return json.loads(text[start : end + 1])
except json.JSONDecodeError:
pass
return None

View File

@@ -0,0 +1,279 @@
"""Agent validator - Validates agent structure and connections."""
import logging
import re
from typing import Any
from .utils import get_blocks_info
logger = logging.getLogger(__name__)
class AgentValidator:
"""Validator for AutoGPT agents with detailed error reporting."""
def __init__(self):
self.errors: list[str] = []
def add_error(self, error: str) -> None:
"""Add an error message."""
self.errors.append(error)
def validate_block_existence(
self, agent: dict[str, Any], blocks_info: list[dict[str, Any]]
) -> bool:
"""Validate all block IDs exist in the blocks library."""
valid = True
valid_block_ids = {b.get("id") for b in blocks_info if b.get("id")}
for node in agent.get("nodes", []):
block_id = node.get("block_id")
node_id = node.get("id")
if not block_id:
self.add_error(f"Node '{node_id}' is missing 'block_id' field.")
valid = False
continue
if block_id not in valid_block_ids:
self.add_error(
f"Node '{node_id}' references block_id '{block_id}' which does not exist."
)
valid = False
return valid
def validate_link_node_references(self, agent: dict[str, Any]) -> bool:
"""Validate all node IDs referenced in links exist."""
valid = True
valid_node_ids = {n.get("id") for n in agent.get("nodes", []) if n.get("id")}
for link in agent.get("links", []):
link_id = link.get("id", "Unknown")
source_id = link.get("source_id")
sink_id = link.get("sink_id")
if not source_id:
self.add_error(f"Link '{link_id}' is missing 'source_id'.")
valid = False
elif source_id not in valid_node_ids:
self.add_error(
f"Link '{link_id}' references non-existent source_id '{source_id}'."
)
valid = False
if not sink_id:
self.add_error(f"Link '{link_id}' is missing 'sink_id'.")
valid = False
elif sink_id not in valid_node_ids:
self.add_error(
f"Link '{link_id}' references non-existent sink_id '{sink_id}'."
)
valid = False
return valid
def validate_required_inputs(
self, agent: dict[str, Any], blocks_info: list[dict[str, Any]]
) -> bool:
"""Validate required inputs are provided."""
valid = True
block_map = {b.get("id"): b for b in blocks_info}
for node in agent.get("nodes", []):
block_id = node.get("block_id")
block = block_map.get(block_id)
if not block:
continue
required_inputs = block.get("inputSchema", {}).get("required", [])
input_defaults = node.get("input_default", {})
node_id = node.get("id")
# Get linked inputs
linked_inputs = {
link["sink_name"]
for link in agent.get("links", [])
if link.get("sink_id") == node_id
}
for req_input in required_inputs:
if (
req_input not in input_defaults
and req_input not in linked_inputs
and req_input != "credentials"
):
block_name = block.get("name", "Unknown Block")
self.add_error(
f"Node '{node_id}' ({block_name}) is missing required input '{req_input}'."
)
valid = False
return valid
def validate_data_type_compatibility(
self, agent: dict[str, Any], blocks_info: list[dict[str, Any]]
) -> bool:
"""Validate linked data types are compatible."""
valid = True
block_map = {b.get("id"): b for b in blocks_info}
node_lookup = {n.get("id"): n for n in agent.get("nodes", [])}
def get_type(schema: dict, name: str) -> str | None:
if "_#_" in name:
parent, child = name.split("_#_", 1)
parent_schema = schema.get(parent, {})
if "properties" in parent_schema:
return parent_schema["properties"].get(child, {}).get("type")
return None
return schema.get(name, {}).get("type")
def are_compatible(src: str, sink: str) -> bool:
if {src, sink} <= {"integer", "number"}:
return True
return src == sink
for link in agent.get("links", []):
source_node = node_lookup.get(link.get("source_id"))
sink_node = node_lookup.get(link.get("sink_id"))
if not source_node or not sink_node:
continue
source_block = block_map.get(source_node.get("block_id"))
sink_block = block_map.get(sink_node.get("block_id"))
if not source_block or not sink_block:
continue
source_outputs = source_block.get("outputSchema", {}).get("properties", {})
sink_inputs = sink_block.get("inputSchema", {}).get("properties", {})
source_type = get_type(source_outputs, link.get("source_name", ""))
sink_type = get_type(sink_inputs, link.get("sink_name", ""))
if source_type and sink_type and not are_compatible(source_type, sink_type):
self.add_error(
f"Type mismatch: {source_block.get('name')} output '{link['source_name']}' "
f"({source_type}) -> {sink_block.get('name')} input '{link['sink_name']}' ({sink_type})."
)
valid = False
return valid
def validate_nested_sink_links(
self, agent: dict[str, Any], blocks_info: list[dict[str, Any]]
) -> bool:
"""Validate nested sink links (with _#_ notation)."""
valid = True
block_map = {b.get("id"): b for b in blocks_info}
node_lookup = {n.get("id"): n for n in agent.get("nodes", [])}
for link in agent.get("links", []):
sink_name = link.get("sink_name", "")
if "_#_" in sink_name:
parent, child = sink_name.split("_#_", 1)
sink_node = node_lookup.get(link.get("sink_id"))
if not sink_node:
continue
block = block_map.get(sink_node.get("block_id"))
if not block:
continue
input_props = block.get("inputSchema", {}).get("properties", {})
parent_schema = input_props.get(parent)
if not parent_schema:
self.add_error(
f"Invalid nested link '{sink_name}': parent '{parent}' not found."
)
valid = False
continue
if not parent_schema.get("additionalProperties"):
if not (
isinstance(parent_schema, dict)
and "properties" in parent_schema
and child in parent_schema.get("properties", {})
):
self.add_error(
f"Invalid nested link '{sink_name}': child '{child}' not found in '{parent}'."
)
valid = False
return valid
def validate_prompt_spaces(self, agent: dict[str, Any]) -> bool:
"""Validate prompts don't have spaces in template variables."""
valid = True
for node in agent.get("nodes", []):
input_default = node.get("input_default", {})
prompt = input_default.get("prompt", "")
if not isinstance(prompt, str):
continue
# Find {{...}} with spaces
matches = re.finditer(r"\{\{([^}]+)\}\}", prompt)
for match in matches:
content = match.group(1)
if " " in content:
self.add_error(
f"Node '{node.get('id')}' has spaces in template variable: "
f"'{{{{{content}}}}}' should be '{{{{{content.replace(' ', '_')}}}}}'."
)
valid = False
return valid
def validate(
self, agent: dict[str, Any], blocks_info: list[dict[str, Any]] | None = None
) -> tuple[bool, str | None]:
"""Run all validations.
Returns:
Tuple of (is_valid, error_message)
"""
self.errors = []
if blocks_info is None:
blocks_info = get_blocks_info()
checks = [
self.validate_block_existence(agent, blocks_info),
self.validate_link_node_references(agent),
self.validate_required_inputs(agent, blocks_info),
self.validate_data_type_compatibility(agent, blocks_info),
self.validate_nested_sink_links(agent, blocks_info),
self.validate_prompt_spaces(agent),
]
all_passed = all(checks)
if all_passed:
logger.info("Agent validation successful")
return True, None
error_message = "Agent validation failed:\n"
for i, error in enumerate(self.errors, 1):
error_message += f"{i}. {error}\n"
logger.warning(f"Agent validation failed with {len(self.errors)} errors")
return False, error_message
def validate_agent(
agent: dict[str, Any], blocks_info: list[dict[str, Any]] | None = None
) -> tuple[bool, str | None]:
"""Convenience function to validate an agent.
Returns:
Tuple of (is_valid, error_message)
"""
validator = AgentValidator()
return validator.validate(agent, blocks_info)

View File

@@ -8,10 +8,12 @@ from langfuse import observe
from backend.api.features.chat.model import ChatSession
from .agent_generator import (
AgentGeneratorNotConfiguredError,
apply_all_fixes,
decompose_goal,
generate_agent,
get_blocks_info,
save_agent_to_library,
validate_agent,
)
from .base import BaseTool
from .models import (
@@ -25,6 +27,9 @@ from .models import (
logger = logging.getLogger(__name__)
# Maximum retries for agent generation with validation feedback
MAX_GENERATION_RETRIES = 2
class CreateAgentTool(BaseTool):
"""Tool for creating agents from natural language descriptions."""
@@ -86,8 +91,9 @@ class CreateAgentTool(BaseTool):
Flow:
1. Decompose the description into steps (may return clarifying questions)
2. Generate agent JSON (external service handles fixing and validation)
3. Preview or save based on the save parameter
2. Generate agent JSON from the steps
3. Apply fixes to correct common LLM errors
4. Preview or save based on the save parameter
"""
description = kwargs.get("description", "").strip()
context = kwargs.get("context", "")
@@ -104,13 +110,11 @@ class CreateAgentTool(BaseTool):
# Step 1: Decompose goal into steps
try:
decomposition_result = await decompose_goal(description, context)
except AgentGeneratorNotConfiguredError:
except ValueError as e:
# Handle missing API key or configuration errors
return ErrorResponse(
message=(
"Agent generation is not available. "
"The Agent Generator service is not configured."
),
error="service_not_configured",
message=f"Agent generation is not configured: {str(e)}",
error="configuration_error",
session_id=session_id,
)
@@ -167,32 +171,72 @@ class CreateAgentTool(BaseTool):
session_id=session_id,
)
# Step 2: Generate agent JSON (external service handles fixing and validation)
try:
agent_json = await generate_agent(decomposition_result)
except AgentGeneratorNotConfiguredError:
return ErrorResponse(
message=(
"Agent generation is not available. "
"The Agent Generator service is not configured."
),
error="service_not_configured",
session_id=session_id,
# Step 2: Generate agent JSON with retry on validation failure
blocks_info = get_blocks_info()
agent_json = None
validation_errors = None
for attempt in range(MAX_GENERATION_RETRIES + 1):
# Generate agent (include validation errors from previous attempt)
if attempt == 0:
agent_json = await generate_agent(decomposition_result)
else:
# Retry with validation error feedback
logger.info(
f"Retry {attempt}/{MAX_GENERATION_RETRIES} with validation feedback"
)
retry_instructions = {
**decomposition_result,
"previous_errors": validation_errors,
"retry_instructions": (
"The previous generation had validation errors. "
"Please fix these issues in the new generation:\n"
f"{validation_errors}"
),
}
agent_json = await generate_agent(retry_instructions)
if agent_json is None:
if attempt == MAX_GENERATION_RETRIES:
return ErrorResponse(
message="Failed to generate the agent. Please try again.",
error="Generation failed",
session_id=session_id,
)
continue
# Step 3: Apply fixes to correct common errors
agent_json = apply_all_fixes(agent_json, blocks_info)
# Step 4: Validate the agent
is_valid, validation_errors = validate_agent(agent_json, blocks_info)
if is_valid:
logger.info(f"Agent generated successfully on attempt {attempt + 1}")
break
logger.warning(
f"Validation failed on attempt {attempt + 1}: {validation_errors}"
)
if agent_json is None:
return ErrorResponse(
message="Failed to generate the agent. Please try again.",
error="Generation failed",
session_id=session_id,
)
if attempt == MAX_GENERATION_RETRIES:
# Return error with validation details
return ErrorResponse(
message=(
f"Generated agent has validation errors after {MAX_GENERATION_RETRIES + 1} attempts. "
f"Please try rephrasing your request or simplify the workflow."
),
error="validation_failed",
details={"validation_errors": validation_errors},
session_id=session_id,
)
agent_name = agent_json.get("name", "Generated Agent")
agent_description = agent_json.get("description", "")
node_count = len(agent_json.get("nodes", []))
link_count = len(agent_json.get("links", []))
# Step 3: Preview or save
# Step 4: Preview or save
if not save:
return AgentPreviewResponse(
message=(

View File

@@ -8,10 +8,13 @@ from langfuse import observe
from backend.api.features.chat.model import ChatSession
from .agent_generator import (
AgentGeneratorNotConfiguredError,
apply_agent_patch,
apply_all_fixes,
generate_agent_patch,
get_agent_as_json,
get_blocks_info,
save_agent_to_library,
validate_agent,
)
from .base import BaseTool
from .models import (
@@ -25,6 +28,9 @@ from .models import (
logger = logging.getLogger(__name__)
# Maximum retries for patch generation with validation feedback
MAX_GENERATION_RETRIES = 2
class EditAgentTool(BaseTool):
"""Tool for editing existing agents using natural language."""
@@ -37,7 +43,7 @@ class EditAgentTool(BaseTool):
def description(self) -> str:
return (
"Edit an existing agent from the user's library using natural language. "
"Generates updates to the agent while preserving unchanged parts."
"Generates a patch to update the agent while preserving unchanged parts."
)
@property
@@ -92,8 +98,9 @@ class EditAgentTool(BaseTool):
Flow:
1. Fetch the current agent
2. Generate updated agent (external service handles fixing and validation)
3. Preview or save based on the save parameter
2. Generate a patch based on the requested changes
3. Apply the patch to create an updated agent
4. Preview or save based on the save parameter
"""
agent_id = kwargs.get("agent_id", "").strip()
changes = kwargs.get("changes", "").strip()
@@ -130,58 +137,121 @@ class EditAgentTool(BaseTool):
if context:
update_request = f"{changes}\n\nAdditional context:\n{context}"
# Step 2: Generate updated agent (external service handles fixing and validation)
try:
result = await generate_agent_patch(update_request, current_agent)
except AgentGeneratorNotConfiguredError:
return ErrorResponse(
message=(
"Agent editing is not available. "
"The Agent Generator service is not configured."
),
error="service_not_configured",
session_id=session_id,
)
# Step 2: Generate patch with retry on validation failure
blocks_info = get_blocks_info()
updated_agent = None
validation_errors = None
intent = "Applied requested changes"
if result is None:
return ErrorResponse(
message="Failed to generate changes. Please try rephrasing.",
error="Update generation failed",
session_id=session_id,
)
# Check if LLM returned clarifying questions
if result.get("type") == "clarifying_questions":
questions = result.get("questions", [])
return ClarificationNeededResponse(
message=(
"I need some more information about the changes. "
"Please answer the following questions:"
),
questions=[
ClarifyingQuestion(
question=q.get("question", ""),
keyword=q.get("keyword", ""),
example=q.get("example"),
for attempt in range(MAX_GENERATION_RETRIES + 1):
# Generate patch (include validation errors from previous attempt)
try:
if attempt == 0:
patch_result = await generate_agent_patch(
update_request, current_agent
)
for q in questions
],
session_id=session_id,
else:
# Retry with validation error feedback
logger.info(
f"Retry {attempt}/{MAX_GENERATION_RETRIES} with validation feedback"
)
retry_request = (
f"{update_request}\n\n"
f"IMPORTANT: The previous edit had validation errors. "
f"Please fix these issues:\n{validation_errors}"
)
patch_result = await generate_agent_patch(
retry_request, current_agent
)
except ValueError as e:
# Handle missing API key or configuration errors
return ErrorResponse(
message=f"Agent generation is not configured: {str(e)}",
error="configuration_error",
session_id=session_id,
)
if patch_result is None:
if attempt == MAX_GENERATION_RETRIES:
return ErrorResponse(
message="Failed to generate changes. Please try rephrasing.",
error="Patch generation failed",
session_id=session_id,
)
continue
# Check if LLM returned clarifying questions
if patch_result.get("type") == "clarifying_questions":
questions = patch_result.get("questions", [])
return ClarificationNeededResponse(
message=(
"I need some more information about the changes. "
"Please answer the following questions:"
),
questions=[
ClarifyingQuestion(
question=q.get("question", ""),
keyword=q.get("keyword", ""),
example=q.get("example"),
)
for q in questions
],
session_id=session_id,
)
# Step 3: Apply patch and fixes
try:
updated_agent = apply_agent_patch(current_agent, patch_result)
updated_agent = apply_all_fixes(updated_agent, blocks_info)
except Exception as e:
if attempt == MAX_GENERATION_RETRIES:
return ErrorResponse(
message=f"Failed to apply changes: {str(e)}",
error="patch_apply_failed",
details={"exception": str(e)},
session_id=session_id,
)
validation_errors = str(e)
continue
# Step 4: Validate the updated agent
is_valid, validation_errors = validate_agent(updated_agent, blocks_info)
if is_valid:
logger.info(f"Agent edited successfully on attempt {attempt + 1}")
intent = patch_result.get("intent", "Applied requested changes")
break
logger.warning(
f"Validation failed on attempt {attempt + 1}: {validation_errors}"
)
# Result is the updated agent JSON
updated_agent = result
if attempt == MAX_GENERATION_RETRIES:
# Return error with validation details
return ErrorResponse(
message=(
f"Updated agent has validation errors after "
f"{MAX_GENERATION_RETRIES + 1} attempts. "
f"Please try rephrasing your request or simplify the changes."
),
error="validation_failed",
details={"validation_errors": validation_errors},
session_id=session_id,
)
# At this point, updated_agent is guaranteed to be set (we return on all failure paths)
assert updated_agent is not None
agent_name = updated_agent.get("name", "Updated Agent")
agent_description = updated_agent.get("description", "")
node_count = len(updated_agent.get("nodes", []))
link_count = len(updated_agent.get("links", []))
# Step 3: Preview or save
# Step 5: Preview or save
if not save:
return AgentPreviewResponse(
message=(
f"I've updated the agent. "
f"I've updated the agent. Changes: {intent}. "
f"The agent now has {node_count} blocks. "
f"Review it and call edit_agent with save=true to save the changes."
),
@@ -207,7 +277,10 @@ class EditAgentTool(BaseTool):
)
return AgentSavedResponse(
message=f"Updated agent '{created_graph.name}' has been saved to your library!",
message=(
f"Updated agent '{created_graph.name}' has been saved to your library! "
f"Changes: {intent}"
),
agent_id=created_graph.id,
agent_name=created_graph.name,
library_agent_id=library_agent.id,

View File

@@ -29,7 +29,7 @@ def mock_embedding_functions():
yield
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.asyncio(scope="session")
async def test_run_agent(setup_test_data):
"""Test that the run_agent tool successfully executes an approved agent"""
# Use test data from fixture
@@ -70,7 +70,7 @@ async def test_run_agent(setup_test_data):
assert result_data["graph_name"] == "Test Agent"
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.asyncio(scope="session")
async def test_run_agent_missing_inputs(setup_test_data):
"""Test that the run_agent tool returns error when inputs are missing"""
# Use test data from fixture
@@ -106,7 +106,7 @@ async def test_run_agent_missing_inputs(setup_test_data):
assert "message" in result_data
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.asyncio(scope="session")
async def test_run_agent_invalid_agent_id(setup_test_data):
"""Test that the run_agent tool returns error for invalid agent ID"""
# Use test data from fixture
@@ -141,7 +141,7 @@ async def test_run_agent_invalid_agent_id(setup_test_data):
)
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.asyncio(scope="session")
async def test_run_agent_with_llm_credentials(setup_llm_test_data):
"""Test that run_agent works with an agent requiring LLM credentials"""
# Use test data from fixture
@@ -185,7 +185,7 @@ async def test_run_agent_with_llm_credentials(setup_llm_test_data):
assert result_data["graph_name"] == "LLM Test Agent"
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.asyncio(scope="session")
async def test_run_agent_shows_available_inputs_when_none_provided(setup_test_data):
"""Test that run_agent returns available inputs when called without inputs or use_defaults."""
user = setup_test_data["user"]
@@ -219,7 +219,7 @@ async def test_run_agent_shows_available_inputs_when_none_provided(setup_test_da
assert "inputs" in result_data["message"].lower()
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.asyncio(scope="session")
async def test_run_agent_with_use_defaults(setup_test_data):
"""Test that run_agent executes successfully with use_defaults=True."""
user = setup_test_data["user"]
@@ -251,7 +251,7 @@ async def test_run_agent_with_use_defaults(setup_test_data):
assert result_data["graph_id"] == graph.id
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.asyncio(scope="session")
async def test_run_agent_missing_credentials(setup_firecrawl_test_data):
"""Test that run_agent returns setup_requirements when credentials are missing."""
user = setup_firecrawl_test_data["user"]
@@ -285,7 +285,7 @@ async def test_run_agent_missing_credentials(setup_firecrawl_test_data):
assert len(setup_info["user_readiness"]["missing_credentials"]) > 0
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.asyncio(scope="session")
async def test_run_agent_invalid_slug_format(setup_test_data):
"""Test that run_agent returns error for invalid slug format (no slash)."""
user = setup_test_data["user"]
@@ -313,7 +313,7 @@ async def test_run_agent_invalid_slug_format(setup_test_data):
assert "username/agent-name" in result_data["message"]
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.asyncio(scope="session")
async def test_run_agent_unauthenticated():
"""Test that run_agent returns need_login for unauthenticated users."""
tool = RunAgentTool()
@@ -340,7 +340,7 @@ async def test_run_agent_unauthenticated():
assert "sign in" in result_data["message"].lower()
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.asyncio(scope="session")
async def test_run_agent_schedule_without_cron(setup_test_data):
"""Test that run_agent returns error when scheduling without cron expression."""
user = setup_test_data["user"]
@@ -372,7 +372,7 @@ async def test_run_agent_schedule_without_cron(setup_test_data):
assert "cron" in result_data["message"].lower()
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.asyncio(scope="session")
async def test_run_agent_schedule_without_name(setup_test_data):
"""Test that run_agent returns error when scheduling without schedule_name."""
user = setup_test_data["user"]

View File

@@ -23,7 +23,6 @@ class PendingHumanReviewModel(BaseModel):
id: Unique identifier for the review record
user_id: ID of the user who must perform the review
node_exec_id: ID of the node execution that created this review
node_id: ID of the node definition (for grouping reviews from same node)
graph_exec_id: ID of the graph execution containing the node
graph_id: ID of the graph template being executed
graph_version: Version number of the graph template
@@ -38,10 +37,6 @@ class PendingHumanReviewModel(BaseModel):
"""
node_exec_id: str = Field(description="Node execution ID (primary key)")
node_id: str = Field(
description="Node definition ID (for grouping)",
default="", # Temporary default for test compatibility
)
user_id: str = Field(description="User ID associated with the review")
graph_exec_id: str = Field(description="Graph execution ID")
graph_id: str = Field(description="Graph ID")
@@ -71,9 +66,7 @@ class PendingHumanReviewModel(BaseModel):
)
@classmethod
def from_db(
cls, review: "PendingHumanReview", node_id: str
) -> "PendingHumanReviewModel":
def from_db(cls, review: "PendingHumanReview") -> "PendingHumanReviewModel":
"""
Convert a database model to a response model.
@@ -81,14 +74,9 @@ class PendingHumanReviewModel(BaseModel):
payload, instructions, and editable flag.
Handles invalid data gracefully by using safe defaults.
Args:
review: Database review object
node_id: Node definition ID (fetched from NodeExecution)
"""
return cls(
node_exec_id=review.nodeExecId,
node_id=node_id,
user_id=review.userId,
graph_exec_id=review.graphExecId,
graph_id=review.graphId,
@@ -119,13 +107,6 @@ class ReviewItem(BaseModel):
reviewed_data: SafeJsonData | None = Field(
None, description="Optional edited data (ignored if approved=False)"
)
auto_approve_future: bool = Field(
default=False,
description=(
"If true and this review is approved, future executions of this same "
"block (node) will be automatically approved. This only affects approved reviews."
),
)
@field_validator("reviewed_data")
@classmethod
@@ -193,9 +174,6 @@ class ReviewRequest(BaseModel):
This request must include ALL pending reviews for a graph execution.
Each review will be either approved (with optional data modifications)
or rejected (data ignored). The execution will resume only after ALL reviews are processed.
Each review item can individually specify whether to auto-approve future executions
of the same block via the `auto_approve_future` field on ReviewItem.
"""
reviews: List[ReviewItem] = Field(

View File

@@ -1,27 +1,17 @@
import asyncio
import logging
from typing import Any, List
from typing import List
import autogpt_libs.auth as autogpt_auth_lib
from fastapi import APIRouter, HTTPException, Query, Security, status
from prisma.enums import ReviewStatus
from backend.data.execution import (
ExecutionContext,
ExecutionStatus,
get_graph_execution_meta,
)
from backend.data.graph import get_graph_settings
from backend.data.execution import get_graph_execution_meta
from backend.data.human_review import (
create_auto_approval_record,
get_pending_reviews_by_node_exec_ids,
get_pending_reviews_for_execution,
get_pending_reviews_for_user,
has_pending_reviews_for_graph_exec,
process_all_reviews_for_execution,
)
from backend.data.model import USER_TIMEZONE_NOT_SET
from backend.data.user import get_user_by_id
from backend.executor.utils import add_graph_execution
from .model import PendingHumanReviewModel, ReviewRequest, ReviewResponse
@@ -137,70 +127,17 @@ async def process_review_action(
detail="At least one review must be provided",
)
# Batch fetch all requested reviews
reviews_map = await get_pending_reviews_by_node_exec_ids(
list(all_request_node_ids), user_id
)
# Validate all reviews were found
missing_ids = all_request_node_ids - set(reviews_map.keys())
if missing_ids:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"No pending review found for node execution(s): {', '.join(missing_ids)}",
)
# Validate all reviews belong to the same execution
graph_exec_ids = {review.graph_exec_id for review in reviews_map.values()}
if len(graph_exec_ids) > 1:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="All reviews in a single request must belong to the same execution.",
)
graph_exec_id = next(iter(graph_exec_ids))
# Validate execution status before processing reviews
graph_exec_meta = await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
)
if not graph_exec_meta:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Graph execution #{graph_exec_id} not found",
)
# Only allow processing reviews if execution is paused for review
# or incomplete (partial execution with some reviews already processed)
if graph_exec_meta.status not in (
ExecutionStatus.REVIEW,
ExecutionStatus.INCOMPLETE,
):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"Cannot process reviews while execution status is {graph_exec_meta.status}. "
f"Reviews can only be processed when execution is paused (REVIEW status). "
f"Current status: {graph_exec_meta.status}",
)
# Build review decisions map and track which reviews requested auto-approval
# Auto-approved reviews use original data (no modifications allowed)
# Build review decisions map
review_decisions = {}
auto_approve_requests = {} # Map node_exec_id -> auto_approve_future flag
for review in request.reviews:
review_status = (
ReviewStatus.APPROVED if review.approved else ReviewStatus.REJECTED
)
# If this review requested auto-approval, don't allow data modifications
reviewed_data = None if review.auto_approve_future else review.reviewed_data
review_decisions[review.node_exec_id] = (
review_status,
reviewed_data,
review.reviewed_data,
review.message,
)
auto_approve_requests[review.node_exec_id] = review.auto_approve_future
# Process all reviews
updated_reviews = await process_all_reviews_for_execution(
@@ -208,87 +145,6 @@ async def process_review_action(
review_decisions=review_decisions,
)
# Create auto-approval records for approved reviews that requested it
# Deduplicate by node_id to avoid race conditions when multiple reviews
# for the same node are processed in parallel
async def create_auto_approval_for_node(
node_id: str, review_result
) -> tuple[str, bool]:
"""
Create auto-approval record for a node.
Returns (node_id, success) tuple for tracking failures.
"""
try:
await create_auto_approval_record(
user_id=user_id,
graph_exec_id=review_result.graph_exec_id,
graph_id=review_result.graph_id,
graph_version=review_result.graph_version,
node_id=node_id,
payload=review_result.payload,
)
return (node_id, True)
except Exception as e:
logger.error(
f"Failed to create auto-approval record for node {node_id}",
exc_info=e,
)
return (node_id, False)
# Collect node_exec_ids that need auto-approval
node_exec_ids_needing_auto_approval = [
node_exec_id
for node_exec_id, review_result in updated_reviews.items()
if review_result.status == ReviewStatus.APPROVED
and auto_approve_requests.get(node_exec_id, False)
]
# Batch-fetch node executions to get node_ids
nodes_needing_auto_approval: dict[str, Any] = {}
if node_exec_ids_needing_auto_approval:
from backend.data.execution import get_node_executions
node_execs = await get_node_executions(
graph_exec_id=graph_exec_id, include_exec_data=False
)
node_exec_map = {node_exec.node_exec_id: node_exec for node_exec in node_execs}
for node_exec_id in node_exec_ids_needing_auto_approval:
node_exec = node_exec_map.get(node_exec_id)
if node_exec:
review_result = updated_reviews[node_exec_id]
# Use the first approved review for this node (deduplicate by node_id)
if node_exec.node_id not in nodes_needing_auto_approval:
nodes_needing_auto_approval[node_exec.node_id] = review_result
else:
logger.error(
f"Failed to create auto-approval record for {node_exec_id}: "
f"Node execution not found. This may indicate a race condition "
f"or data inconsistency."
)
# Execute all auto-approval creations in parallel (deduplicated by node_id)
auto_approval_results = await asyncio.gather(
*[
create_auto_approval_for_node(node_id, review_result)
for node_id, review_result in nodes_needing_auto_approval.items()
],
return_exceptions=True,
)
# Count auto-approval failures
auto_approval_failed_count = 0
for result in auto_approval_results:
if isinstance(result, Exception):
# Unexpected exception during auto-approval creation
auto_approval_failed_count += 1
logger.error(
f"Unexpected exception during auto-approval creation: {result}"
)
elif isinstance(result, tuple) and len(result) == 2 and not result[1]:
# Auto-approval creation failed (returned False)
auto_approval_failed_count += 1
# Count results
approved_count = sum(
1
@@ -301,53 +157,30 @@ async def process_review_action(
if review.status == ReviewStatus.REJECTED
)
# Resume execution only if ALL pending reviews for this execution have been processed
# Resume execution if we processed some reviews
if updated_reviews:
# Get graph execution ID from any processed review
first_review = next(iter(updated_reviews.values()))
graph_exec_id = first_review.graph_exec_id
# Check if any pending reviews remain for this execution
still_has_pending = await has_pending_reviews_for_graph_exec(graph_exec_id)
if not still_has_pending:
# Get the graph_id from any processed review
first_review = next(iter(updated_reviews.values()))
# Resume execution
try:
# Fetch user and settings to build complete execution context
user = await get_user_by_id(user_id)
settings = await get_graph_settings(
user_id=user_id, graph_id=first_review.graph_id
)
# Preserve user's timezone preference when resuming execution
user_timezone = (
user.timezone if user.timezone != USER_TIMEZONE_NOT_SET else "UTC"
)
execution_context = ExecutionContext(
human_in_the_loop_safe_mode=settings.human_in_the_loop_safe_mode,
sensitive_action_safe_mode=settings.sensitive_action_safe_mode,
user_timezone=user_timezone,
)
await add_graph_execution(
graph_id=first_review.graph_id,
user_id=user_id,
graph_exec_id=graph_exec_id,
execution_context=execution_context,
)
logger.info(f"Resumed execution {graph_exec_id}")
except Exception as e:
logger.error(f"Failed to resume execution {graph_exec_id}: {str(e)}")
# Build error message if auto-approvals failed
error_message = None
if auto_approval_failed_count > 0:
error_message = (
f"{auto_approval_failed_count} auto-approval setting(s) could not be saved. "
f"You may need to manually approve these reviews in future executions."
)
return ReviewResponse(
approved_count=approved_count,
rejected_count=rejected_count,
failed_count=auto_approval_failed_count,
error=error_message,
failed_count=0,
error=None,
)

View File

@@ -583,13 +583,7 @@ async def update_library_agent(
)
update_fields["isDeleted"] = is_deleted
if settings is not None:
existing_agent = await get_library_agent(id=library_agent_id, user_id=user_id)
current_settings_dict = (
existing_agent.settings.model_dump() if existing_agent.settings else {}
)
new_settings = settings.model_dump(exclude_unset=True)
merged_settings = {**current_settings_dict, **new_settings}
update_fields["settings"] = SafeJson(merged_settings)
update_fields["settings"] = SafeJson(settings.model_dump())
try:
# If graph_version is provided, update to that specific version

View File

@@ -20,7 +20,6 @@ from typing import AsyncGenerator
import httpx
import pytest
import pytest_asyncio
from autogpt_libs.api_key.keysmith import APIKeySmith
from prisma.enums import APIKeyPermission
from prisma.models import OAuthAccessToken as PrismaOAuthAccessToken
@@ -39,13 +38,13 @@ keysmith = APIKeySmith()
# ============================================================================
@pytest.fixture(scope="session")
@pytest.fixture
def test_user_id() -> str:
"""Test user ID for OAuth tests."""
return str(uuid.uuid4())
@pytest_asyncio.fixture(scope="session", loop_scope="session")
@pytest.fixture
async def test_user(server, test_user_id: str):
"""Create a test user in the database."""
await PrismaUser.prisma().create(
@@ -68,7 +67,7 @@ async def test_user(server, test_user_id: str):
await PrismaUser.prisma().delete(where={"id": test_user_id})
@pytest_asyncio.fixture
@pytest.fixture
async def test_oauth_app(test_user: str):
"""Create a test OAuth application in the database."""
app_id = str(uuid.uuid4())
@@ -123,7 +122,7 @@ def pkce_credentials() -> tuple[str, str]:
return generate_pkce()
@pytest_asyncio.fixture
@pytest.fixture
async def client(server, test_user: str) -> AsyncGenerator[httpx.AsyncClient, None]:
"""
Create an async HTTP client that talks directly to the FastAPI app.
@@ -288,7 +287,7 @@ async def test_authorize_invalid_client_returns_error(
assert query_params["error"][0] == "invalid_client"
@pytest_asyncio.fixture
@pytest.fixture
async def inactive_oauth_app(test_user: str):
"""Create an inactive test OAuth application in the database."""
app_id = str(uuid.uuid4())
@@ -1005,7 +1004,7 @@ async def test_token_refresh_revoked(
assert "revoked" in response.json()["detail"].lower()
@pytest_asyncio.fixture
@pytest.fixture
async def other_oauth_app(test_user: str):
"""Create a second OAuth application for cross-app tests."""
app_id = str(uuid.uuid4())

View File

@@ -1552,7 +1552,7 @@ async def review_store_submission(
# Generate embedding for approved listing (blocking - admin operation)
# Inside transaction: if embedding fails, entire transaction rolls back
await ensure_embedding(
embedding_success = await ensure_embedding(
version_id=store_listing_version_id,
name=store_listing_version.name,
description=store_listing_version.description,
@@ -1560,6 +1560,12 @@ async def review_store_submission(
categories=store_listing_version.categories or [],
tx=tx,
)
if not embedding_success:
raise ValueError(
f"Failed to generate embedding for listing {store_listing_version_id}. "
"This is likely due to OpenAI API being unavailable. "
"Please try again later or contact support if the issue persists."
)
await prisma.models.StoreListing.prisma(tx).update(
where={"id": store_listing_version.StoreListing.id},

View File

@@ -21,6 +21,7 @@ from backend.util.json import dumps
logger = logging.getLogger(__name__)
# OpenAI embedding model configuration
EMBEDDING_MODEL = "text-embedding-3-small"
# Embedding dimension for the model above
@@ -62,42 +63,49 @@ def build_searchable_text(
return " ".join(parts)
async def generate_embedding(text: str) -> list[float]:
async def generate_embedding(text: str) -> list[float] | None:
"""
Generate embedding for text using OpenAI API.
Raises exceptions on failure - caller should handle.
Returns None if embedding generation fails.
Fail-fast: no retries to maintain consistency with approval flow.
"""
client = get_openai_client()
if not client:
raise RuntimeError("openai_internal_api_key not set, cannot generate embedding")
try:
client = get_openai_client()
if not client:
logger.error("openai_internal_api_key not set, cannot generate embedding")
return None
# Truncate text to token limit using tiktoken
# Character-based truncation is insufficient because token ratios vary by content type
enc = encoding_for_model(EMBEDDING_MODEL)
tokens = enc.encode(text)
if len(tokens) > EMBEDDING_MAX_TOKENS:
tokens = tokens[:EMBEDDING_MAX_TOKENS]
truncated_text = enc.decode(tokens)
logger.info(
f"Truncated text from {len(enc.encode(text))} to {len(tokens)} tokens"
# Truncate text to token limit using tiktoken
# Character-based truncation is insufficient because token ratios vary by content type
enc = encoding_for_model(EMBEDDING_MODEL)
tokens = enc.encode(text)
if len(tokens) > EMBEDDING_MAX_TOKENS:
tokens = tokens[:EMBEDDING_MAX_TOKENS]
truncated_text = enc.decode(tokens)
logger.info(
f"Truncated text from {len(enc.encode(text))} to {len(tokens)} tokens"
)
else:
truncated_text = text
start_time = time.time()
response = await client.embeddings.create(
model=EMBEDDING_MODEL,
input=truncated_text,
)
else:
truncated_text = text
latency_ms = (time.time() - start_time) * 1000
start_time = time.time()
response = await client.embeddings.create(
model=EMBEDDING_MODEL,
input=truncated_text,
)
latency_ms = (time.time() - start_time) * 1000
embedding = response.data[0].embedding
logger.info(
f"Generated embedding: {len(embedding)} dims, "
f"{len(tokens)} tokens, {latency_ms:.0f}ms"
)
return embedding
embedding = response.data[0].embedding
logger.info(
f"Generated embedding: {len(embedding)} dims, "
f"{len(tokens)} tokens, {latency_ms:.0f}ms"
)
return embedding
except Exception as e:
logger.error(f"Failed to generate embedding: {e}")
return None
async def store_embedding(
@@ -136,45 +144,48 @@ async def store_content_embedding(
New function for unified content embedding storage.
Uses raw SQL since Prisma doesn't natively support pgvector.
Raises exceptions on failure - caller should handle.
"""
client = tx if tx else prisma.get_client()
try:
client = tx if tx else prisma.get_client()
# Convert embedding to PostgreSQL vector format
embedding_str = embedding_to_vector_string(embedding)
metadata_json = dumps(metadata or {})
# Convert embedding to PostgreSQL vector format
embedding_str = embedding_to_vector_string(embedding)
metadata_json = dumps(metadata or {})
# Upsert the embedding
# WHERE clause in DO UPDATE prevents PostgreSQL 15 bug with NULLS NOT DISTINCT
# Use unqualified ::vector - pgvector is in search_path on all environments
await execute_raw_with_schema(
"""
INSERT INTO {schema_prefix}"UnifiedContentEmbedding" (
"id", "contentType", "contentId", "userId", "embedding", "searchableText", "metadata", "createdAt", "updatedAt"
# Upsert the embedding
# WHERE clause in DO UPDATE prevents PostgreSQL 15 bug with NULLS NOT DISTINCT
# Use unqualified ::vector - pgvector is in search_path on all environments
await execute_raw_with_schema(
"""
INSERT INTO {schema_prefix}"UnifiedContentEmbedding" (
"id", "contentType", "contentId", "userId", "embedding", "searchableText", "metadata", "createdAt", "updatedAt"
)
VALUES (gen_random_uuid()::text, $1::{schema_prefix}"ContentType", $2, $3, $4::vector, $5, $6::jsonb, NOW(), NOW())
ON CONFLICT ("contentType", "contentId", "userId")
DO UPDATE SET
"embedding" = $4::vector,
"searchableText" = $5,
"metadata" = $6::jsonb,
"updatedAt" = NOW()
WHERE {schema_prefix}"UnifiedContentEmbedding"."contentType" = $1::{schema_prefix}"ContentType"
AND {schema_prefix}"UnifiedContentEmbedding"."contentId" = $2
AND ({schema_prefix}"UnifiedContentEmbedding"."userId" = $3 OR ($3 IS NULL AND {schema_prefix}"UnifiedContentEmbedding"."userId" IS NULL))
""",
content_type,
content_id,
user_id,
embedding_str,
searchable_text,
metadata_json,
client=client,
)
VALUES (gen_random_uuid()::text, $1::{schema_prefix}"ContentType", $2, $3, $4::vector, $5, $6::jsonb, NOW(), NOW())
ON CONFLICT ("contentType", "contentId", "userId")
DO UPDATE SET
"embedding" = $4::vector,
"searchableText" = $5,
"metadata" = $6::jsonb,
"updatedAt" = NOW()
WHERE {schema_prefix}"UnifiedContentEmbedding"."contentType" = $1::{schema_prefix}"ContentType"
AND {schema_prefix}"UnifiedContentEmbedding"."contentId" = $2
AND ({schema_prefix}"UnifiedContentEmbedding"."userId" = $3 OR ($3 IS NULL AND {schema_prefix}"UnifiedContentEmbedding"."userId" IS NULL))
""",
content_type,
content_id,
user_id,
embedding_str,
searchable_text,
metadata_json,
client=client,
)
logger.info(f"Stored embedding for {content_type}:{content_id}")
return True
logger.info(f"Stored embedding for {content_type}:{content_id}")
return True
except Exception as e:
logger.error(f"Failed to store embedding for {content_type}:{content_id}: {e}")
return False
async def get_embedding(version_id: str) -> dict[str, Any] | None:
@@ -206,31 +217,34 @@ async def get_content_embedding(
New function for unified content embedding retrieval.
Returns dict with contentType, contentId, embedding, timestamps or None if not found.
Raises exceptions on failure - caller should handle.
"""
result = await query_raw_with_schema(
"""
SELECT
"contentType",
"contentId",
"userId",
"embedding"::text as "embedding",
"searchableText",
"metadata",
"createdAt",
"updatedAt"
FROM {schema_prefix}"UnifiedContentEmbedding"
WHERE "contentType" = $1::{schema_prefix}"ContentType" AND "contentId" = $2 AND ("userId" = $3 OR ($3 IS NULL AND "userId" IS NULL))
""",
content_type,
content_id,
user_id,
)
try:
result = await query_raw_with_schema(
"""
SELECT
"contentType",
"contentId",
"userId",
"embedding"::text as "embedding",
"searchableText",
"metadata",
"createdAt",
"updatedAt"
FROM {schema_prefix}"UnifiedContentEmbedding"
WHERE "contentType" = $1::{schema_prefix}"ContentType" AND "contentId" = $2 AND ("userId" = $3 OR ($3 IS NULL AND "userId" IS NULL))
""",
content_type,
content_id,
user_id,
)
if result and len(result) > 0:
return result[0]
return None
if result and len(result) > 0:
return result[0]
return None
except Exception as e:
logger.error(f"Failed to get embedding for {content_type}:{content_id}: {e}")
return None
async def ensure_embedding(
@@ -258,38 +272,46 @@ async def ensure_embedding(
tx: Optional transaction client
Returns:
True if embedding exists/was created
Raises exceptions on failure - caller should handle.
True if embedding exists/was created, False on failure
"""
# Check if embedding already exists
if not force:
existing = await get_embedding(version_id)
if existing and existing.get("embedding"):
logger.debug(f"Embedding for version {version_id} already exists")
return True
try:
# Check if embedding already exists
if not force:
existing = await get_embedding(version_id)
if existing and existing.get("embedding"):
logger.debug(f"Embedding for version {version_id} already exists")
return True
# Build searchable text for embedding
searchable_text = build_searchable_text(name, description, sub_heading, categories)
# Build searchable text for embedding
searchable_text = build_searchable_text(
name, description, sub_heading, categories
)
# Generate new embedding
embedding = await generate_embedding(searchable_text)
# Generate new embedding
embedding = await generate_embedding(searchable_text)
if embedding is None:
logger.warning(f"Could not generate embedding for version {version_id}")
return False
# Store the embedding with metadata using new function
metadata = {
"name": name,
"subHeading": sub_heading,
"categories": categories,
}
return await store_content_embedding(
content_type=ContentType.STORE_AGENT,
content_id=version_id,
embedding=embedding,
searchable_text=searchable_text,
metadata=metadata,
user_id=None, # Store agents are public
tx=tx,
)
# Store the embedding with metadata using new function
metadata = {
"name": name,
"subHeading": sub_heading,
"categories": categories,
}
return await store_content_embedding(
content_type=ContentType.STORE_AGENT,
content_id=version_id,
embedding=embedding,
searchable_text=searchable_text,
metadata=metadata,
user_id=None, # Store agents are public
tx=tx,
)
except Exception as e:
logger.error(f"Failed to ensure embedding for version {version_id}: {e}")
return False
async def delete_embedding(version_id: str) -> bool:
@@ -499,24 +521,6 @@ async def backfill_all_content_types(batch_size: int = 10) -> dict[str, Any]:
success = sum(1 for result in results if result is True)
failed = len(results) - success
# Aggregate unique errors to avoid Sentry spam
if failed > 0:
# Group errors by type and message
error_summary: dict[str, int] = {}
for result in results:
if isinstance(result, Exception):
error_key = f"{type(result).__name__}: {str(result)}"
error_summary[error_key] = error_summary.get(error_key, 0) + 1
# Log aggregated error summary
error_details = ", ".join(
f"{error} ({count}x)" for error, count in error_summary.items()
)
logger.error(
f"{content_type.value}: {failed}/{len(results)} embeddings failed. "
f"Errors: {error_details}"
)
results_by_type[content_type.value] = {
"processed": len(missing_items),
"success": success,
@@ -553,12 +557,11 @@ async def backfill_all_content_types(batch_size: int = 10) -> dict[str, Any]:
}
async def embed_query(query: str) -> list[float]:
async def embed_query(query: str) -> list[float] | None:
"""
Generate embedding for a search query.
Same as generate_embedding but with clearer intent.
Raises exceptions on failure - caller should handle.
"""
return await generate_embedding(query)
@@ -591,30 +594,40 @@ async def ensure_content_embedding(
tx: Optional transaction client
Returns:
True if embedding exists/was created
Raises exceptions on failure - caller should handle.
True if embedding exists/was created, False on failure
"""
# Check if embedding already exists
if not force:
existing = await get_content_embedding(content_type, content_id, user_id)
if existing and existing.get("embedding"):
logger.debug(f"Embedding for {content_type}:{content_id} already exists")
return True
try:
# Check if embedding already exists
if not force:
existing = await get_content_embedding(content_type, content_id, user_id)
if existing and existing.get("embedding"):
logger.debug(
f"Embedding for {content_type}:{content_id} already exists"
)
return True
# Generate new embedding
embedding = await generate_embedding(searchable_text)
# Generate new embedding
embedding = await generate_embedding(searchable_text)
if embedding is None:
logger.warning(
f"Could not generate embedding for {content_type}:{content_id}"
)
return False
# Store the embedding
return await store_content_embedding(
content_type=content_type,
content_id=content_id,
embedding=embedding,
searchable_text=searchable_text,
metadata=metadata or {},
user_id=user_id,
tx=tx,
)
# Store the embedding
return await store_content_embedding(
content_type=content_type,
content_id=content_id,
embedding=embedding,
searchable_text=searchable_text,
metadata=metadata or {},
user_id=user_id,
tx=tx,
)
except Exception as e:
logger.error(f"Failed to ensure embedding for {content_type}:{content_id}: {e}")
return False
async def cleanup_orphaned_embeddings() -> dict[str, Any]:
@@ -841,8 +854,9 @@ async def semantic_search(
limit = 100
# Generate query embedding
try:
query_embedding = await embed_query(query)
query_embedding = await embed_query(query)
if query_embedding is not None:
# Semantic search with embeddings
embedding_str = embedding_to_vector_string(query_embedding)
@@ -893,21 +907,24 @@ async def semantic_search(
"""
)
results = await query_raw_with_schema(sql, *params)
return [
{
"content_id": row["content_id"],
"content_type": row["content_type"],
"searchable_text": row["searchable_text"],
"metadata": row["metadata"],
"similarity": float(row["similarity"]),
}
for row in results
]
except Exception as e:
logger.warning(f"Semantic search failed, falling back to lexical search: {e}")
try:
results = await query_raw_with_schema(sql, *params)
return [
{
"content_id": row["content_id"],
"content_type": row["content_type"],
"searchable_text": row["searchable_text"],
"metadata": row["metadata"],
"similarity": float(row["similarity"]),
}
for row in results
]
except Exception as e:
logger.error(f"Semantic search failed: {e}")
# Fall through to lexical search below
# Fallback to lexical search if embeddings unavailable
logger.warning("Falling back to lexical search (embeddings unavailable)")
params_lexical: list[Any] = [limit]
user_filter = ""

View File

@@ -298,16 +298,17 @@ async def test_schema_handling_error_cases():
mock_client.execute_raw.side_effect = Exception("Database error")
mock_get_client.return_value = mock_client
# Should raise exception on error
with pytest.raises(Exception, match="Database error"):
await embeddings.store_content_embedding(
content_type=ContentType.STORE_AGENT,
content_id="test-id",
embedding=[0.1] * EMBEDDING_DIM,
searchable_text="test",
metadata=None,
user_id=None,
)
result = await embeddings.store_content_embedding(
content_type=ContentType.STORE_AGENT,
content_id="test-id",
embedding=[0.1] * EMBEDDING_DIM,
searchable_text="test",
metadata=None,
user_id=None,
)
# Should return False on error, not raise
assert result is False
if __name__ == "__main__":

View File

@@ -80,8 +80,9 @@ async def test_generate_embedding_no_api_key():
) as mock_get_client:
mock_get_client.return_value = None
with pytest.raises(RuntimeError, match="openai_internal_api_key not set"):
await embeddings.generate_embedding("test text")
result = await embeddings.generate_embedding("test text")
assert result is None
@pytest.mark.asyncio(loop_scope="session")
@@ -96,8 +97,9 @@ async def test_generate_embedding_api_error():
) as mock_get_client:
mock_get_client.return_value = mock_client
with pytest.raises(Exception, match="API Error"):
await embeddings.generate_embedding("test text")
result = await embeddings.generate_embedding("test text")
assert result is None
@pytest.mark.asyncio(loop_scope="session")
@@ -171,10 +173,11 @@ async def test_store_embedding_database_error(mocker):
embedding = [0.1, 0.2, 0.3]
with pytest.raises(Exception, match="Database error"):
await embeddings.store_embedding(
version_id="test-version-id", embedding=embedding, tx=mock_client
)
result = await embeddings.store_embedding(
version_id="test-version-id", embedding=embedding, tx=mock_client
)
assert result is False
@pytest.mark.asyncio(loop_scope="session")
@@ -274,16 +277,17 @@ async def test_ensure_embedding_create_new(mock_get, mock_store, mock_generate):
async def test_ensure_embedding_generation_fails(mock_get, mock_generate):
"""Test ensure_embedding when generation fails."""
mock_get.return_value = None
mock_generate.side_effect = Exception("Generation failed")
mock_generate.return_value = None
with pytest.raises(Exception, match="Generation failed"):
await embeddings.ensure_embedding(
version_id="test-id",
name="Test",
description="Test description",
sub_heading="Test heading",
categories=["test"],
)
result = await embeddings.ensure_embedding(
version_id="test-id",
name="Test",
description="Test description",
sub_heading="Test heading",
categories=["test"],
)
assert result is False
@pytest.mark.asyncio(loop_scope="session")

View File

@@ -186,12 +186,13 @@ async def unified_hybrid_search(
offset = (page - 1) * page_size
# Generate query embedding with graceful degradation
try:
query_embedding = await embed_query(query)
except Exception as e:
# Generate query embedding
query_embedding = await embed_query(query)
# Graceful degradation if embedding unavailable
if query_embedding is None or not query_embedding:
logger.warning(
f"Failed to generate query embedding - falling back to lexical-only search: {e}. "
"Failed to generate query embedding - falling back to lexical-only search. "
"Check that openai_internal_api_key is configured and OpenAI API is accessible."
)
query_embedding = [0.0] * EMBEDDING_DIM
@@ -463,12 +464,13 @@ async def hybrid_search(
offset = (page - 1) * page_size
# Generate query embedding with graceful degradation
try:
query_embedding = await embed_query(query)
except Exception as e:
# Generate query embedding
query_embedding = await embed_query(query)
# Graceful degradation
if query_embedding is None or not query_embedding:
logger.warning(
f"Failed to generate query embedding - falling back to lexical-only search: {e}"
"Failed to generate query embedding - falling back to lexical-only search."
)
query_embedding = [0.0] * EMBEDDING_DIM
total_non_semantic = (

View File

@@ -172,8 +172,8 @@ async def test_hybrid_search_without_embeddings():
with patch(
"backend.api.features.store.hybrid_search.query_raw_with_schema"
) as mock_query:
# Simulate embedding failure by raising exception
mock_embed.side_effect = Exception("Embedding generation failed")
# Simulate embedding failure
mock_embed.return_value = None
mock_query.return_value = mock_results
# Should NOT raise - graceful degradation
@@ -613,9 +613,7 @@ async def test_unified_hybrid_search_graceful_degradation():
"backend.api.features.store.hybrid_search.embed_query"
) as mock_embed:
mock_query.return_value = mock_results
mock_embed.side_effect = Exception(
"Embedding generation failed"
) # Embedding failure
mock_embed.return_value = None # Embedding failure
# Should NOT raise - graceful degradation
results, total = await unified_hybrid_search(

View File

@@ -116,7 +116,6 @@ class PrintToConsoleBlock(Block):
input_schema=PrintToConsoleBlock.Input,
output_schema=PrintToConsoleBlock.Output,
test_input={"text": "Hello, World!"},
is_sensitive_action=True,
test_output=[
("output", "Hello, World!"),
("status", "printed"),

View File

@@ -0,0 +1,28 @@
"""ElevenLabs integration blocks - test credentials and shared utilities."""
from typing import Literal
from pydantic import SecretStr
from backend.data.model import APIKeyCredentials, CredentialsMetaInput
from backend.integrations.providers import ProviderName
TEST_CREDENTIALS = APIKeyCredentials(
id="01234567-89ab-cdef-0123-456789abcdef",
provider="elevenlabs",
api_key=SecretStr("mock-elevenlabs-api-key"),
title="Mock ElevenLabs API key",
expires_at=None,
)
TEST_CREDENTIALS_INPUT = {
"provider": TEST_CREDENTIALS.provider,
"id": TEST_CREDENTIALS.id,
"type": TEST_CREDENTIALS.type,
"title": TEST_CREDENTIALS.title,
}
ElevenLabsCredentials = APIKeyCredentials
ElevenLabsCredentialsInput = CredentialsMetaInput[
Literal[ProviderName.ELEVENLABS], Literal["api_key"]
]

View File

@@ -9,7 +9,7 @@ from typing import Any, Optional
from prisma.enums import ReviewStatus
from pydantic import BaseModel
from backend.data.execution import ExecutionStatus
from backend.data.execution import ExecutionContext, ExecutionStatus
from backend.data.human_review import ReviewResult
from backend.executor.manager import async_update_node_execution_status
from backend.util.clients import get_database_manager_async_client
@@ -28,11 +28,6 @@ class ReviewDecision(BaseModel):
class HITLReviewHelper:
"""Helper class for Human-In-The-Loop review operations."""
@staticmethod
async def check_approval(**kwargs) -> Optional[ReviewResult]:
"""Check if there's an existing approval for this node execution."""
return await get_database_manager_async_client().check_approval(**kwargs)
@staticmethod
async def get_or_create_human_review(**kwargs) -> Optional[ReviewResult]:
"""Create or retrieve a human review from the database."""
@@ -60,11 +55,11 @@ class HITLReviewHelper:
async def _handle_review_request(
input_data: Any,
user_id: str,
node_id: str,
node_exec_id: str,
graph_exec_id: str,
graph_id: str,
graph_version: int,
execution_context: ExecutionContext,
block_name: str = "Block",
editable: bool = False,
) -> Optional[ReviewResult]:
@@ -74,11 +69,11 @@ class HITLReviewHelper:
Args:
input_data: The input data to be reviewed
user_id: ID of the user requesting the review
node_id: ID of the node in the graph definition
node_exec_id: ID of the node execution
graph_exec_id: ID of the graph execution
graph_id: ID of the graph
graph_version: Version of the graph
execution_context: Current execution context
block_name: Name of the block requesting review
editable: Whether the reviewer can edit the data
@@ -88,41 +83,15 @@ class HITLReviewHelper:
Raises:
Exception: If review creation or status update fails
"""
# Note: Safe mode checks (human_in_the_loop_safe_mode, sensitive_action_safe_mode)
# are handled by the caller:
# - HITL blocks check human_in_the_loop_safe_mode in their run() method
# - Sensitive action blocks check sensitive_action_safe_mode in is_block_exec_need_review()
# This function only handles checking for existing approvals.
# Check if this node has already been approved (normal or auto-approval)
if approval_result := await HITLReviewHelper.check_approval(
node_exec_id=node_exec_id,
graph_exec_id=graph_exec_id,
node_id=node_id,
user_id=user_id,
input_data=input_data,
):
# Skip review if safe mode is disabled - return auto-approved result
if not execution_context.human_in_the_loop_safe_mode:
logger.info(
f"Block {block_name} skipping review for node {node_exec_id} - "
f"found existing approval"
)
# Return a new ReviewResult with the current node_exec_id but approved status
# For auto-approvals, always use current input_data
# For normal approvals, use approval_result.data unless it's None
is_auto_approval = approval_result.node_exec_id != node_exec_id
approved_data = (
input_data
if is_auto_approval
else (
approval_result.data
if approval_result.data is not None
else input_data
)
f"Block {block_name} skipping review for node {node_exec_id} - safe mode disabled"
)
return ReviewResult(
data=approved_data,
data=input_data,
status=ReviewStatus.APPROVED,
message=approval_result.message,
message="Auto-approved (safe mode disabled)",
processed=True,
node_exec_id=node_exec_id,
)
@@ -134,7 +103,7 @@ class HITLReviewHelper:
graph_id=graph_id,
graph_version=graph_version,
input_data=input_data,
message=block_name, # Use block_name directly as the message
message=f"Review required for {block_name} execution",
editable=editable,
)
@@ -160,11 +129,11 @@ class HITLReviewHelper:
async def handle_review_decision(
input_data: Any,
user_id: str,
node_id: str,
node_exec_id: str,
graph_exec_id: str,
graph_id: str,
graph_version: int,
execution_context: ExecutionContext,
block_name: str = "Block",
editable: bool = False,
) -> Optional[ReviewDecision]:
@@ -174,11 +143,11 @@ class HITLReviewHelper:
Args:
input_data: The input data to be reviewed
user_id: ID of the user requesting the review
node_id: ID of the node in the graph definition
node_exec_id: ID of the node execution
graph_exec_id: ID of the graph execution
graph_id: ID of the graph
graph_version: Version of the graph
execution_context: Current execution context
block_name: Name of the block requesting review
editable: Whether the reviewer can edit the data
@@ -189,11 +158,11 @@ class HITLReviewHelper:
review_result = await HITLReviewHelper._handle_review_request(
input_data=input_data,
user_id=user_id,
node_id=node_id,
node_exec_id=node_exec_id,
graph_exec_id=graph_exec_id,
graph_id=graph_id,
graph_version=graph_version,
execution_context=execution_context,
block_name=block_name,
editable=editable,
)

View File

@@ -97,7 +97,6 @@ class HumanInTheLoopBlock(Block):
input_data: Input,
*,
user_id: str,
node_id: str,
node_exec_id: str,
graph_exec_id: str,
graph_id: str,
@@ -116,12 +115,12 @@ class HumanInTheLoopBlock(Block):
decision = await self.handle_review_decision(
input_data=input_data.data,
user_id=user_id,
node_id=node_id,
node_exec_id=node_exec_id,
graph_exec_id=graph_exec_id,
graph_id=graph_id,
graph_version=graph_version,
block_name=input_data.name, # Use user-provided name instead of block type
execution_context=execution_context,
block_name=self.name,
editable=input_data.editable,
)

View File

@@ -1,251 +0,0 @@
import os
import tempfile
from typing import Literal, Optional
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.fx.Loop import Loop
from moviepy.video.io.VideoFileClip import VideoFileClip
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.model import SchemaField
from backend.util.file import MediaFileType, get_exec_file_path, store_media_file
class MediaDurationBlock(Block):
class Input(BlockSchemaInput):
media_in: MediaFileType = SchemaField(
description="Media input (URL, data URI, or local path)."
)
is_video: bool = SchemaField(
description="Whether the media is a video (True) or audio (False).",
default=True,
)
class Output(BlockSchemaOutput):
duration: float = SchemaField(
description="Duration of the media file (in seconds)."
)
def __init__(self):
super().__init__(
id="d8b91fd4-da26-42d4-8ecb-8b196c6d84b6",
description="Block to get the duration of a media file.",
categories={BlockCategory.MULTIMEDIA},
input_schema=MediaDurationBlock.Input,
output_schema=MediaDurationBlock.Output,
)
async def run(
self,
input_data: Input,
*,
graph_exec_id: str,
user_id: str,
**kwargs,
) -> BlockOutput:
# 1) Store the input media locally
local_media_path = await store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.media_in,
user_id=user_id,
return_content=False,
)
media_abspath = get_exec_file_path(graph_exec_id, local_media_path)
# 2) Load the clip
if input_data.is_video:
clip = VideoFileClip(media_abspath)
else:
clip = AudioFileClip(media_abspath)
yield "duration", clip.duration
class LoopVideoBlock(Block):
"""
Block for looping (repeating) a video clip until a given duration or number of loops.
"""
class Input(BlockSchemaInput):
video_in: MediaFileType = SchemaField(
description="The input video (can be a URL, data URI, or local path)."
)
# Provide EITHER a `duration` or `n_loops` or both. We'll demonstrate `duration`.
duration: Optional[float] = SchemaField(
description="Target duration (in seconds) to loop the video to. If omitted, defaults to no looping.",
default=None,
ge=0.0,
)
n_loops: Optional[int] = SchemaField(
description="Number of times to repeat the video. If omitted, defaults to 1 (no repeat).",
default=None,
ge=1,
)
output_return_type: Literal["file_path", "data_uri"] = SchemaField(
description="How to return the output video. Either a relative path or base64 data URI.",
default="file_path",
)
class Output(BlockSchemaOutput):
video_out: str = SchemaField(
description="Looped video returned either as a relative path or a data URI."
)
def __init__(self):
super().__init__(
id="8bf9eef6-5451-4213-b265-25306446e94b",
description="Block to loop a video to a given duration or number of repeats.",
categories={BlockCategory.MULTIMEDIA},
input_schema=LoopVideoBlock.Input,
output_schema=LoopVideoBlock.Output,
)
async def run(
self,
input_data: Input,
*,
node_exec_id: str,
graph_exec_id: str,
user_id: str,
**kwargs,
) -> BlockOutput:
# 1) Store the input video locally
local_video_path = await store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.video_in,
user_id=user_id,
return_content=False,
)
input_abspath = get_exec_file_path(graph_exec_id, local_video_path)
# 2) Load the clip
clip = VideoFileClip(input_abspath)
# 3) Apply the loop effect
looped_clip = clip
if input_data.duration:
# Loop until we reach the specified duration
looped_clip = looped_clip.with_effects([Loop(duration=input_data.duration)])
elif input_data.n_loops:
looped_clip = looped_clip.with_effects([Loop(n=input_data.n_loops)])
else:
raise ValueError("Either 'duration' or 'n_loops' must be provided.")
assert isinstance(looped_clip, VideoFileClip)
# 4) Save the looped output
output_filename = MediaFileType(
f"{node_exec_id}_looped_{os.path.basename(local_video_path)}"
)
output_abspath = get_exec_file_path(graph_exec_id, output_filename)
looped_clip = looped_clip.with_audio(clip.audio)
looped_clip.write_videofile(output_abspath, codec="libx264", audio_codec="aac")
# Return as data URI
video_out = await store_media_file(
graph_exec_id=graph_exec_id,
file=output_filename,
user_id=user_id,
return_content=input_data.output_return_type == "data_uri",
)
yield "video_out", video_out
class AddAudioToVideoBlock(Block):
"""
Block that adds (attaches) an audio track to an existing video.
Optionally scale the volume of the new track.
"""
class Input(BlockSchemaInput):
video_in: MediaFileType = SchemaField(
description="Video input (URL, data URI, or local path)."
)
audio_in: MediaFileType = SchemaField(
description="Audio input (URL, data URI, or local path)."
)
volume: float = SchemaField(
description="Volume scale for the newly attached audio track (1.0 = original).",
default=1.0,
)
output_return_type: Literal["file_path", "data_uri"] = SchemaField(
description="Return the final output as a relative path or base64 data URI.",
default="file_path",
)
class Output(BlockSchemaOutput):
video_out: MediaFileType = SchemaField(
description="Final video (with attached audio), as a path or data URI."
)
def __init__(self):
super().__init__(
id="3503748d-62b6-4425-91d6-725b064af509",
description="Block to attach an audio file to a video file using moviepy.",
categories={BlockCategory.MULTIMEDIA},
input_schema=AddAudioToVideoBlock.Input,
output_schema=AddAudioToVideoBlock.Output,
)
async def run(
self,
input_data: Input,
*,
node_exec_id: str,
graph_exec_id: str,
user_id: str,
**kwargs,
) -> BlockOutput:
# 1) Store the inputs locally
local_video_path = await store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.video_in,
user_id=user_id,
return_content=False,
)
local_audio_path = await store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.audio_in,
user_id=user_id,
return_content=False,
)
abs_temp_dir = os.path.join(tempfile.gettempdir(), "exec_file", graph_exec_id)
video_abspath = os.path.join(abs_temp_dir, local_video_path)
audio_abspath = os.path.join(abs_temp_dir, local_audio_path)
# 2) Load video + audio with moviepy
video_clip = VideoFileClip(video_abspath)
audio_clip = AudioFileClip(audio_abspath)
# Optionally scale volume
if input_data.volume != 1.0:
audio_clip = audio_clip.with_volume_scaled(input_data.volume)
# 3) Attach the new audio track
final_clip = video_clip.with_audio(audio_clip)
# 4) Write to output file
output_filename = MediaFileType(
f"{node_exec_id}_audio_attached_{os.path.basename(local_video_path)}"
)
output_abspath = os.path.join(abs_temp_dir, output_filename)
final_clip.write_videofile(output_abspath, codec="libx264", audio_codec="aac")
# 5) Return either path or data URI
video_out = await store_media_file(
graph_exec_id=graph_exec_id,
file=output_filename,
user_id=user_id,
return_content=input_data.output_return_type == "data_uri",
)
yield "video_out", video_out

View File

@@ -0,0 +1,37 @@
"""Video editing blocks for AutoGPT Platform.
This module provides blocks for:
- Downloading videos from URLs (YouTube, Vimeo, news sites, direct links)
- Clipping/trimming video segments
- Concatenating multiple videos
- Adding text overlays
- Adding AI-generated narration
- Getting media duration
- Looping videos
- Adding audio to videos
Dependencies:
- yt-dlp: For video downloading
- moviepy: For video editing operations
- requests: For API calls (narration block)
"""
from backend.blocks.video.add_audio import AddAudioToVideoBlock
from backend.blocks.video.clip import VideoClipBlock
from backend.blocks.video.concat import VideoConcatBlock
from backend.blocks.video.download import VideoDownloadBlock
from backend.blocks.video.duration import MediaDurationBlock
from backend.blocks.video.loop import LoopVideoBlock
from backend.blocks.video.narration import VideoNarrationBlock
from backend.blocks.video.text_overlay import VideoTextOverlayBlock
__all__ = [
"AddAudioToVideoBlock",
"LoopVideoBlock",
"MediaDurationBlock",
"VideoClipBlock",
"VideoConcatBlock",
"VideoDownloadBlock",
"VideoNarrationBlock",
"VideoTextOverlayBlock",
]

View File

@@ -0,0 +1,34 @@
"""Shared utilities for video blocks."""
import os
def get_video_codecs(output_path: str) -> tuple[str, str]:
"""Get appropriate video and audio codecs based on output file extension.
Args:
output_path: Path to the output file (used to determine extension)
Returns:
Tuple of (video_codec, audio_codec)
Codec mappings:
- .mp4: H.264 + AAC (universal compatibility)
- .webm: VP8 + Vorbis (web streaming)
- .mkv: H.264 + AAC (container supports many codecs)
- .mov: H.264 + AAC (Apple QuickTime, widely compatible)
- .m4v: H.264 + AAC (Apple iTunes/devices)
- .avi: MPEG-4 + MP3 (legacy Windows)
"""
ext = os.path.splitext(output_path)[1].lower()
codec_map: dict[str, tuple[str, str]] = {
".mp4": ("libx264", "aac"),
".webm": ("libvpx", "libvorbis"),
".mkv": ("libx264", "aac"),
".mov": ("libx264", "aac"),
".m4v": ("libx264", "aac"),
".avi": ("mpeg4", "libmp3lame"),
}
return codec_map.get(ext, ("libx264", "aac"))

View File

@@ -0,0 +1,127 @@
"""AddAudioToVideoBlock - Attach an audio track to a video."""
import os
from typing import Literal
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.io.VideoFileClip import VideoFileClip
from backend.blocks.video._utils import get_video_codecs
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.model import SchemaField
from backend.util.file import MediaFileType, get_exec_file_path, store_media_file
class AddAudioToVideoBlock(Block):
"""Attach an audio track to an existing video."""
class Input(BlockSchemaInput):
video_in: MediaFileType = SchemaField(
description="Video input (URL, data URI, or local path)."
)
audio_in: MediaFileType = SchemaField(
description="Audio input (URL, data URI, or local path)."
)
volume: float = SchemaField(
description="Volume scale for the newly attached audio track (1.0 = original).",
default=1.0,
)
output_return_type: Literal["file_path", "data_uri"] = SchemaField(
description="Return the final output as a relative path or base64 data URI.",
default="file_path",
)
class Output(BlockSchemaOutput):
video_out: MediaFileType = SchemaField(
description="Final video (with attached audio), as a path or data URI."
)
def __init__(self):
super().__init__(
id="3503748d-62b6-4425-91d6-725b064af509",
description="Block to attach an audio file to a video file using moviepy.",
categories={BlockCategory.MULTIMEDIA},
input_schema=AddAudioToVideoBlock.Input,
output_schema=AddAudioToVideoBlock.Output,
)
async def run(
self,
input_data: Input,
*,
node_exec_id: str,
graph_exec_id: str,
user_id: str,
**kwargs,
) -> BlockOutput:
# 1) Store the inputs locally
local_video_path = await store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.video_in,
user_id=user_id,
return_content=False,
)
local_audio_path = await store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.audio_in,
user_id=user_id,
return_content=False,
)
video_abspath = get_exec_file_path(graph_exec_id, local_video_path)
audio_abspath = get_exec_file_path(graph_exec_id, local_audio_path)
video_clip = None
audio_clip_original = None
audio_clip_scaled = None
final_clip = None
try:
# 2) Load video + audio with moviepy
video_clip = VideoFileClip(video_abspath)
audio_clip_original = AudioFileClip(audio_abspath)
# Optionally scale volume
audio_to_use = audio_clip_original
if input_data.volume != 1.0:
audio_clip_scaled = audio_clip_original.with_volume_scaled(
input_data.volume
)
audio_to_use = audio_clip_scaled
# 3) Attach the new audio track
final_clip = video_clip.with_audio(audio_to_use)
# 4) Write to output file
output_filename = MediaFileType(
f"{node_exec_id}_audio_attached_{os.path.basename(local_video_path)}"
)
output_abspath = get_exec_file_path(graph_exec_id, output_filename)
video_codec, audio_codec = get_video_codecs(output_abspath)
final_clip.write_videofile(
output_abspath, codec=video_codec, audio_codec=audio_codec
)
# 5) Return either path or data URI
video_out = await store_media_file(
graph_exec_id=graph_exec_id,
file=output_filename,
user_id=user_id,
return_content=input_data.output_return_type == "data_uri",
)
yield "video_out", video_out
finally:
if final_clip:
final_clip.close()
if audio_clip_scaled:
audio_clip_scaled.close()
if audio_clip_original:
audio_clip_original.close()
if video_clip:
video_clip.close()

View File

@@ -0,0 +1,172 @@
"""VideoClipBlock - Extract a segment from a video file."""
import os
from typing import Literal
from moviepy.video.io.VideoFileClip import VideoFileClip
from backend.blocks.video._utils import get_video_codecs
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.model import SchemaField
from backend.util.exceptions import BlockExecutionError
from backend.util.file import MediaFileType, get_exec_file_path, store_media_file
class VideoClipBlock(Block):
"""Extract a time segment from a video."""
class Input(BlockSchemaInput):
video_in: MediaFileType = SchemaField(
description="Input video (URL, data URI, or local path)"
)
start_time: float = SchemaField(description="Start time in seconds", ge=0.0)
end_time: float = SchemaField(description="End time in seconds", ge=0.0)
output_format: Literal["mp4", "webm", "mkv", "mov"] = SchemaField(
description="Output format", default="mp4", advanced=True
)
output_return_type: Literal["file_path", "data_uri"] = SchemaField(
description="Return the output as a relative path or base64 data URI.",
default="file_path",
)
class Output(BlockSchemaOutput):
video_out: MediaFileType = SchemaField(
description="Clipped video file (path or data URI)"
)
duration: float = SchemaField(description="Clip duration in seconds")
def __init__(self):
super().__init__(
id="8f539119-e580-4d86-ad41-86fbcb22abb1",
description="Extract a time segment from a video",
categories={BlockCategory.MULTIMEDIA},
input_schema=self.Input,
output_schema=self.Output,
test_input={
"video_in": "/tmp/test.mp4",
"start_time": 0.0,
"end_time": 10.0,
},
test_output=[("video_out", str), ("duration", float)],
test_mock={
"_clip_video": lambda *args: 10.0,
"_store_input_video": lambda *args, **kwargs: "test.mp4",
"_store_output_video": lambda *args, **kwargs: "clip_test.mp4",
},
)
async def _store_input_video(
self, graph_exec_id: str, file: MediaFileType, user_id: str
) -> MediaFileType:
"""Store input video. Extracted for testability."""
return await store_media_file(
graph_exec_id=graph_exec_id,
file=file,
user_id=user_id,
return_content=False,
)
async def _store_output_video(
self,
graph_exec_id: str,
file: MediaFileType,
user_id: str,
return_content: bool,
) -> MediaFileType:
"""Store output video. Extracted for testability."""
return await store_media_file(
graph_exec_id=graph_exec_id,
file=file,
user_id=user_id,
return_content=return_content,
)
def _clip_video(
self,
video_abspath: str,
output_abspath: str,
start_time: float,
end_time: float,
) -> float:
"""Extract a clip from a video. Extracted for testability."""
clip = None
subclip = None
try:
clip = VideoFileClip(video_abspath)
subclip = clip.subclipped(start_time, end_time)
video_codec, audio_codec = get_video_codecs(output_abspath)
subclip.write_videofile(
output_abspath, codec=video_codec, audio_codec=audio_codec
)
return subclip.duration
finally:
if subclip:
subclip.close()
if clip:
clip.close()
async def run(
self,
input_data: Input,
*,
node_exec_id: str,
graph_exec_id: str,
user_id: str,
**kwargs,
) -> BlockOutput:
# Validate time range
if input_data.end_time <= input_data.start_time:
raise BlockExecutionError(
message=f"end_time ({input_data.end_time}) must be greater than start_time ({input_data.start_time})",
block_name=self.name,
block_id=str(self.id),
)
try:
# Store the input video locally
local_video_path = await self._store_input_video(
graph_exec_id, input_data.video_in, user_id
)
video_abspath = get_exec_file_path(graph_exec_id, local_video_path)
# Build output path
output_filename = MediaFileType(
f"{node_exec_id}_clip_{os.path.basename(local_video_path)}"
)
# Ensure correct extension
base, _ = os.path.splitext(output_filename)
output_filename = MediaFileType(f"{base}.{input_data.output_format}")
output_abspath = get_exec_file_path(graph_exec_id, output_filename)
duration = self._clip_video(
video_abspath,
output_abspath,
input_data.start_time,
input_data.end_time,
)
# Return as data URI or path
video_out = await self._store_output_video(
graph_exec_id,
output_filename,
user_id,
input_data.output_return_type == "data_uri",
)
yield "video_out", video_out
yield "duration", duration
except BlockExecutionError:
raise
except Exception as e:
raise BlockExecutionError(
message=f"Failed to clip video: {e}",
block_name=self.name,
block_id=str(self.id),
) from e

View File

@@ -0,0 +1,206 @@
"""VideoConcatBlock - Concatenate multiple video clips into one."""
from typing import Literal
from moviepy import concatenate_videoclips
from moviepy.video.fx import CrossFadeIn, CrossFadeOut, FadeIn, FadeOut
from moviepy.video.io.VideoFileClip import VideoFileClip
from backend.blocks.video._utils import get_video_codecs
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.model import SchemaField
from backend.util.exceptions import BlockExecutionError
from backend.util.file import MediaFileType, get_exec_file_path, store_media_file
class VideoConcatBlock(Block):
"""Merge multiple video clips into one continuous video."""
class Input(BlockSchemaInput):
videos: list[MediaFileType] = SchemaField(
description="List of video files to concatenate (in order)"
)
transition: Literal["none", "crossfade", "fade_black"] = SchemaField(
description="Transition between clips", default="none"
)
transition_duration: int = SchemaField(
description="Transition duration in seconds",
default=1,
ge=0,
advanced=True,
)
output_format: Literal["mp4", "webm", "mkv", "mov"] = SchemaField(
description="Output format", default="mp4", advanced=True
)
output_return_type: Literal["file_path", "data_uri"] = SchemaField(
description="Return the output as a relative path or base64 data URI.",
default="file_path",
)
class Output(BlockSchemaOutput):
video_out: MediaFileType = SchemaField(
description="Concatenated video file (path or data URI)"
)
total_duration: float = SchemaField(description="Total duration in seconds")
def __init__(self):
super().__init__(
id="9b0f531a-1118-487f-aeec-3fa63ea8900a",
description="Merge multiple video clips into one continuous video",
categories={BlockCategory.MULTIMEDIA},
input_schema=self.Input,
output_schema=self.Output,
test_input={"videos": ["/tmp/a.mp4", "/tmp/b.mp4"]},
test_output=[("video_out", str), ("total_duration", float)],
test_mock={
"_concat_videos": lambda *args: 20.0,
"_store_input_video": lambda *args, **kwargs: "test.mp4",
"_store_output_video": lambda *args, **kwargs: "concat_test.mp4",
},
)
async def _store_input_video(
self, graph_exec_id: str, file: MediaFileType, user_id: str
) -> MediaFileType:
"""Store input video. Extracted for testability."""
return await store_media_file(
graph_exec_id=graph_exec_id,
file=file,
user_id=user_id,
return_content=False,
)
async def _store_output_video(
self,
graph_exec_id: str,
file: MediaFileType,
user_id: str,
return_content: bool,
) -> MediaFileType:
"""Store output video. Extracted for testability."""
return await store_media_file(
graph_exec_id=graph_exec_id,
file=file,
user_id=user_id,
return_content=return_content,
)
def _concat_videos(
self,
video_abspaths: list[str],
output_abspath: str,
transition: str,
transition_duration: int,
) -> float:
"""Concatenate videos. Extracted for testability."""
clips = []
faded_clips = []
final = None
try:
# Load clips
for v in video_abspaths:
clips.append(VideoFileClip(v))
if transition == "crossfade":
for i, clip in enumerate(clips):
effects = []
if i > 0:
effects.append(CrossFadeIn(transition_duration))
if i < len(clips) - 1:
effects.append(CrossFadeOut(transition_duration))
if effects:
clip = clip.with_effects(effects)
faded_clips.append(clip)
final = concatenate_videoclips(
faded_clips,
method="compose",
padding=-transition_duration,
)
elif transition == "fade_black":
for clip in clips:
faded = clip.with_effects(
[FadeIn(transition_duration), FadeOut(transition_duration)]
)
faded_clips.append(faded)
final = concatenate_videoclips(faded_clips)
else:
final = concatenate_videoclips(clips)
video_codec, audio_codec = get_video_codecs(output_abspath)
final.write_videofile(
output_abspath, codec=video_codec, audio_codec=audio_codec
)
return final.duration
finally:
if final:
final.close()
for clip in faded_clips:
clip.close()
for clip in clips:
clip.close()
async def run(
self,
input_data: Input,
*,
node_exec_id: str,
graph_exec_id: str,
user_id: str,
**kwargs,
) -> BlockOutput:
# Validate minimum clips
if len(input_data.videos) < 2:
raise BlockExecutionError(
message="At least 2 videos are required for concatenation",
block_name=self.name,
block_id=str(self.id),
)
try:
# Store all input videos locally
video_abspaths = []
for video in input_data.videos:
local_path = await self._store_input_video(
graph_exec_id, video, user_id
)
video_abspaths.append(get_exec_file_path(graph_exec_id, local_path))
# Build output path
output_filename = MediaFileType(
f"{node_exec_id}_concat.{input_data.output_format}"
)
output_abspath = get_exec_file_path(graph_exec_id, output_filename)
total_duration = self._concat_videos(
video_abspaths,
output_abspath,
input_data.transition,
input_data.transition_duration,
)
# Return as data URI or path
video_out = await self._store_output_video(
graph_exec_id,
output_filename,
user_id,
input_data.output_return_type == "data_uri",
)
yield "video_out", video_out
yield "total_duration", total_duration
except BlockExecutionError:
raise
except Exception as e:
raise BlockExecutionError(
message=f"Failed to concatenate videos: {e}",
block_name=self.name,
block_id=str(self.id),
) from e

View File

@@ -0,0 +1,177 @@
"""VideoDownloadBlock - Download video from URL (YouTube, Vimeo, news sites, direct links)."""
import os
import typing
from typing import Literal
import yt_dlp
if typing.TYPE_CHECKING:
from yt_dlp import _Params
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.model import SchemaField
from backend.util.exceptions import BlockExecutionError
from backend.util.file import MediaFileType, get_exec_file_path, store_media_file
class VideoDownloadBlock(Block):
"""Download video from URL using yt-dlp."""
class Input(BlockSchemaInput):
url: str = SchemaField(
description="URL of the video to download (YouTube, Vimeo, direct link, etc.)",
placeholder="https://www.youtube.com/watch?v=...",
)
quality: Literal["best", "1080p", "720p", "480p", "audio_only"] = SchemaField(
description="Video quality preference", default="720p"
)
output_format: Literal["mp4", "webm", "mkv"] = SchemaField(
description="Output video format", default="mp4", advanced=True
)
output_return_type: Literal["file_path", "data_uri"] = SchemaField(
description="Return the output as a relative path or base64 data URI.",
default="file_path",
)
class Output(BlockSchemaOutput):
video_file: MediaFileType = SchemaField(
description="Downloaded video (path or data URI)"
)
duration: float = SchemaField(description="Video duration in seconds")
title: str = SchemaField(description="Video title from source")
source_url: str = SchemaField(description="Original source URL")
def __init__(self):
super().__init__(
id="c35daabb-cd60-493b-b9ad-51f1fe4b50c4",
description="Download video from URL (YouTube, Vimeo, news sites, direct links)",
categories={BlockCategory.MULTIMEDIA},
input_schema=self.Input,
output_schema=self.Output,
test_input={
"url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
"quality": "480p",
},
test_output=[
("video_file", str),
("duration", float),
("title", str),
("source_url", str),
],
test_mock={
"_download_video": lambda *args: ("video.mp4", 212.0, "Test Video"),
"_store_output_video": lambda *args, **kwargs: "video.mp4",
},
)
async def _store_output_video(
self,
graph_exec_id: str,
file: MediaFileType,
user_id: str,
return_content: bool,
) -> MediaFileType:
"""Store output video. Extracted for testability."""
return await store_media_file(
graph_exec_id=graph_exec_id,
file=file,
user_id=user_id,
return_content=return_content,
)
def _get_format_string(self, quality: str) -> str:
formats = {
"best": "bestvideo+bestaudio/best",
"1080p": "bestvideo[height<=1080]+bestaudio/best[height<=1080]",
"720p": "bestvideo[height<=720]+bestaudio/best[height<=720]",
"480p": "bestvideo[height<=480]+bestaudio/best[height<=480]",
"audio_only": "bestaudio/best",
}
return formats.get(quality, formats["720p"])
def _download_video(
self,
url: str,
quality: str,
output_format: str,
output_dir: str,
node_exec_id: str,
) -> tuple[str, float, str]:
"""Download video. Extracted for testability."""
output_template = os.path.join(
output_dir, f"{node_exec_id}_%(title).50s.%(ext)s"
)
ydl_opts: "_Params" = {
"format": self._get_format_string(quality),
"outtmpl": output_template,
"merge_output_format": output_format,
"quiet": True,
"no_warnings": True,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(url, download=True)
video_path = ydl.prepare_filename(info)
# Handle format conversion in filename
if not video_path.endswith(f".{output_format}"):
video_path = video_path.rsplit(".", 1)[0] + f".{output_format}"
# Return just the filename, not the full path
filename = os.path.basename(video_path)
return (
filename,
info.get("duration") or 0.0,
info.get("title") or "Unknown",
)
async def run(
self,
input_data: Input,
*,
node_exec_id: str,
graph_exec_id: str,
user_id: str,
**kwargs,
) -> BlockOutput:
try:
# Get the exec file directory
output_dir = get_exec_file_path(graph_exec_id, "")
os.makedirs(output_dir, exist_ok=True)
filename, duration, title = self._download_video(
input_data.url,
input_data.quality,
input_data.output_format,
output_dir,
node_exec_id,
)
# Return as data URI or path
video_out = await self._store_output_video(
graph_exec_id,
MediaFileType(filename),
user_id,
input_data.output_return_type == "data_uri",
)
yield "video_file", video_out
yield "duration", duration
yield "title", title
yield "source_url", input_data.url
except Exception as e:
raise BlockExecutionError(
message=f"Failed to download video: {e}",
block_name=self.name,
block_id=str(self.id),
) from e

View File

@@ -0,0 +1,71 @@
"""MediaDurationBlock - Get the duration of a media file."""
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.io.VideoFileClip import VideoFileClip
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.model import SchemaField
from backend.util.file import MediaFileType, get_exec_file_path, store_media_file
class MediaDurationBlock(Block):
"""Get the duration of a media file."""
class Input(BlockSchemaInput):
media_in: MediaFileType = SchemaField(
description="Media input (URL, data URI, or local path)."
)
is_video: bool = SchemaField(
description="Whether the media is a video (True) or audio (False).",
default=True,
)
class Output(BlockSchemaOutput):
duration: float = SchemaField(
description="Duration of the media file (in seconds)."
)
def __init__(self):
super().__init__(
id="d8b91fd4-da26-42d4-8ecb-8b196c6d84b6",
description="Block to get the duration of a media file.",
categories={BlockCategory.MULTIMEDIA},
input_schema=MediaDurationBlock.Input,
output_schema=MediaDurationBlock.Output,
)
async def run(
self,
input_data: Input,
*,
graph_exec_id: str,
user_id: str,
**kwargs,
) -> BlockOutput:
# 1) Store the input media locally
local_media_path = await store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.media_in,
user_id=user_id,
return_content=False,
)
media_abspath = get_exec_file_path(graph_exec_id, local_media_path)
# 2) Load the clip
clip = None
try:
if input_data.is_video:
clip = VideoFileClip(media_abspath)
else:
clip = AudioFileClip(media_abspath)
yield "duration", clip.duration
finally:
if clip:
clip.close()

View File

@@ -0,0 +1,116 @@
"""LoopVideoBlock - Loop a video to a given duration or number of repeats."""
import os
from typing import Literal, Optional
from moviepy.video.fx.Loop import Loop
from moviepy.video.io.VideoFileClip import VideoFileClip
from backend.blocks.video._utils import get_video_codecs
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.model import SchemaField
from backend.util.file import MediaFileType, get_exec_file_path, store_media_file
class LoopVideoBlock(Block):
"""Loop (repeat) a video clip until a given duration or number of loops."""
class Input(BlockSchemaInput):
video_in: MediaFileType = SchemaField(
description="The input video (can be a URL, data URI, or local path)."
)
duration: Optional[float] = SchemaField(
description="Target duration (in seconds) to loop the video to. If omitted, defaults to no looping.",
default=None,
ge=0.0,
)
n_loops: Optional[int] = SchemaField(
description="Number of times to repeat the video. If omitted, defaults to 1 (no repeat).",
default=None,
ge=1,
)
output_return_type: Literal["file_path", "data_uri"] = SchemaField(
description="How to return the output video. Either a relative path or base64 data URI.",
default="file_path",
)
class Output(BlockSchemaOutput):
video_out: str = SchemaField(
description="Looped video returned either as a relative path or a data URI."
)
def __init__(self):
super().__init__(
id="8bf9eef6-5451-4213-b265-25306446e94b",
description="Block to loop a video to a given duration or number of repeats.",
categories={BlockCategory.MULTIMEDIA},
input_schema=LoopVideoBlock.Input,
output_schema=LoopVideoBlock.Output,
)
async def run(
self,
input_data: Input,
*,
node_exec_id: str,
graph_exec_id: str,
user_id: str,
**kwargs,
) -> BlockOutput:
# 1) Store the input video locally
local_video_path = await store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.video_in,
user_id=user_id,
return_content=False,
)
input_abspath = get_exec_file_path(graph_exec_id, local_video_path)
clip: VideoFileClip | None = None
looped_clip: VideoFileClip | None = None
try:
# 2) Load the clip
clip = VideoFileClip(input_abspath)
# 3) Apply the loop effect
# Note: Loop effect handles both video and audio looping automatically
if input_data.duration:
looped_clip = clip.with_effects([Loop(duration=input_data.duration)]) # type: ignore[arg-type] Clip implements shallow copy that loses type info
elif input_data.n_loops:
looped_clip = clip.with_effects([Loop(n=input_data.n_loops)]) # type: ignore[arg-type] Clip implements shallow copy that loses type info
else:
raise ValueError("Either 'duration' or 'n_loops' must be provided.")
# 4) Save the looped output
output_filename = MediaFileType(
f"{node_exec_id}_looped_{os.path.basename(local_video_path)}"
)
output_abspath = get_exec_file_path(graph_exec_id, output_filename)
assert looped_clip is not None
video_codec, audio_codec = get_video_codecs(output_abspath)
looped_clip.write_videofile(
output_abspath, codec=video_codec, audio_codec=audio_codec
)
# Return as data URI or path
video_out = await store_media_file(
graph_exec_id=graph_exec_id,
file=output_filename,
user_id=user_id,
return_content=input_data.output_return_type == "data_uri",
)
yield "video_out", video_out
finally:
if looped_clip is not None:
looped_clip.close()
if clip is not None:
clip.close()

View File

@@ -0,0 +1,268 @@
"""VideoNarrationBlock - Generate AI voice narration and add to video."""
import os
from typing import Literal
from elevenlabs import ElevenLabs
from moviepy import CompositeAudioClip
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.io.VideoFileClip import VideoFileClip
from backend.blocks.elevenlabs._auth import (
TEST_CREDENTIALS,
TEST_CREDENTIALS_INPUT,
ElevenLabsCredentials,
ElevenLabsCredentialsInput,
)
from backend.blocks.video._utils import get_video_codecs
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.model import CredentialsField, SchemaField
from backend.util.exceptions import BlockExecutionError
from backend.util.file import MediaFileType, get_exec_file_path, store_media_file
class VideoNarrationBlock(Block):
"""Generate AI narration and add to video."""
class Input(BlockSchemaInput):
credentials: ElevenLabsCredentialsInput = CredentialsField(
description="ElevenLabs API key for voice synthesis"
)
video_in: MediaFileType = SchemaField(
description="Input video (URL, data URI, or local path)"
)
script: str = SchemaField(description="Narration script text")
voice_id: str = SchemaField(
description="ElevenLabs voice ID", default="21m00Tcm4TlvDq8ikWAM" # Rachel
)
model_id: Literal[
"eleven_multilingual_v2",
"eleven_flash_v2_5",
"eleven_turbo_v2_5",
"eleven_turbo_v2",
] = SchemaField(
description="ElevenLabs TTS model",
default="eleven_multilingual_v2",
)
mix_mode: Literal["replace", "mix", "ducking"] = SchemaField(
description="How to combine with original audio. 'ducking' applies stronger attenuation than 'mix'.",
default="ducking",
)
narration_volume: float = SchemaField(
description="Narration volume (0.0 to 2.0)",
default=1.0,
ge=0.0,
le=2.0,
advanced=True,
)
original_volume: float = SchemaField(
description="Original audio volume when mixing (0.0 to 1.0)",
default=0.3,
ge=0.0,
le=1.0,
advanced=True,
)
output_return_type: Literal["file_path", "data_uri"] = SchemaField(
description="Return the output as a relative path or base64 data URI.",
default="file_path",
)
class Output(BlockSchemaOutput):
video_out: MediaFileType = SchemaField(
description="Video with narration (path or data URI)"
)
audio_file: MediaFileType = SchemaField(
description="Generated audio file (path or data URI)"
)
def __init__(self):
super().__init__(
id="3d036b53-859c-4b17-9826-ca340f736e0e",
description="Generate AI narration and add to video",
categories={BlockCategory.MULTIMEDIA, BlockCategory.AI},
input_schema=self.Input,
output_schema=self.Output,
test_input={
"video_in": "/tmp/test.mp4",
"script": "Hello world",
"credentials": TEST_CREDENTIALS_INPUT,
},
test_credentials=TEST_CREDENTIALS,
test_output=[("video_out", str), ("audio_file", str)],
test_mock={
"_generate_narration_audio": lambda *args: b"mock audio content",
"_add_narration_to_video": lambda *args: None,
"_store_input_video": lambda *args, **kwargs: "test.mp4",
"_store_output_video": lambda *args, **kwargs: "narrated_test.mp4",
},
)
async def _store_input_video(
self, graph_exec_id: str, file: MediaFileType, user_id: str
) -> MediaFileType:
"""Store input video. Extracted for testability."""
return await store_media_file(
graph_exec_id=graph_exec_id,
file=file,
user_id=user_id,
return_content=False,
)
async def _store_output_video(
self,
graph_exec_id: str,
file: MediaFileType,
user_id: str,
return_content: bool,
) -> MediaFileType:
"""Store output video. Extracted for testability."""
return await store_media_file(
graph_exec_id=graph_exec_id,
file=file,
user_id=user_id,
return_content=return_content,
)
def _generate_narration_audio(
self, api_key: str, script: str, voice_id: str, model_id: str
) -> bytes:
"""Generate narration audio via ElevenLabs API."""
client = ElevenLabs(api_key=api_key)
audio_generator = client.text_to_speech.convert(
voice_id=voice_id,
text=script,
model_id=model_id,
)
# The SDK returns a generator, collect all chunks
return b"".join(audio_generator)
def _add_narration_to_video(
self,
video_abspath: str,
audio_abspath: str,
output_abspath: str,
mix_mode: str,
narration_volume: float,
original_volume: float,
) -> None:
"""Add narration audio to video. Extracted for testability."""
video = None
final = None
narration_original = None
narration_scaled = None
original = None
try:
video = VideoFileClip(video_abspath)
narration_original = AudioFileClip(audio_abspath)
narration_scaled = narration_original.with_volume_scaled(narration_volume)
narration = narration_scaled
if mix_mode == "replace":
final_audio = narration
elif mix_mode == "mix":
if video.audio:
original = video.audio.with_volume_scaled(original_volume)
final_audio = CompositeAudioClip([original, narration])
else:
final_audio = narration
else: # ducking - apply stronger attenuation
if video.audio:
# Ducking uses a much lower volume for original audio
ducking_volume = original_volume * 0.3
original = video.audio.with_volume_scaled(ducking_volume)
final_audio = CompositeAudioClip([original, narration])
else:
final_audio = narration
final = video.with_audio(final_audio)
video_codec, audio_codec = get_video_codecs(output_abspath)
final.write_videofile(
output_abspath, codec=video_codec, audio_codec=audio_codec
)
finally:
if original:
original.close()
if narration_scaled:
narration_scaled.close()
if narration_original:
narration_original.close()
if final:
final.close()
if video:
video.close()
async def run(
self,
input_data: Input,
*,
credentials: ElevenLabsCredentials,
node_exec_id: str,
graph_exec_id: str,
user_id: str,
**kwargs,
) -> BlockOutput:
try:
# Store the input video locally
local_video_path = await self._store_input_video(
graph_exec_id, input_data.video_in, user_id
)
video_abspath = get_exec_file_path(graph_exec_id, local_video_path)
# Generate narration audio via ElevenLabs
audio_content = self._generate_narration_audio(
credentials.api_key.get_secret_value(),
input_data.script,
input_data.voice_id,
input_data.model_id,
)
# Save audio to exec file path
audio_filename = MediaFileType(f"{node_exec_id}_narration.mp3")
audio_abspath = get_exec_file_path(graph_exec_id, audio_filename)
os.makedirs(os.path.dirname(audio_abspath), exist_ok=True)
with open(audio_abspath, "wb") as f:
f.write(audio_content)
# Add narration to video
output_filename = MediaFileType(
f"{node_exec_id}_narrated_{os.path.basename(local_video_path)}"
)
output_abspath = get_exec_file_path(graph_exec_id, output_filename)
self._add_narration_to_video(
video_abspath,
audio_abspath,
output_abspath,
input_data.mix_mode,
input_data.narration_volume,
input_data.original_volume,
)
# Return as data URI or path
return_as_data_uri = input_data.output_return_type == "data_uri"
video_out = await self._store_output_video(
graph_exec_id, output_filename, user_id, return_as_data_uri
)
audio_out = await self._store_output_video(
graph_exec_id, audio_filename, user_id, return_as_data_uri
)
yield "video_out", video_out
yield "audio_file", audio_out
except Exception as e:
raise BlockExecutionError(
message=f"Failed to add narration: {e}",
block_name=self.name,
block_id=str(self.id),
) from e

View File

@@ -0,0 +1,234 @@
"""VideoTextOverlayBlock - Add text overlay to video."""
import os
from typing import Literal
from moviepy import CompositeVideoClip, TextClip
from moviepy.video.io.VideoFileClip import VideoFileClip
from backend.blocks.video._utils import get_video_codecs
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.model import SchemaField
from backend.util.exceptions import BlockExecutionError
from backend.util.file import MediaFileType, get_exec_file_path, store_media_file
class VideoTextOverlayBlock(Block):
"""Add text overlay/caption to video."""
class Input(BlockSchemaInput):
video_in: MediaFileType = SchemaField(
description="Input video (URL, data URI, or local path)"
)
text: str = SchemaField(description="Text to overlay on video")
position: Literal[
"top",
"center",
"bottom",
"top-left",
"top-right",
"bottom-left",
"bottom-right",
] = SchemaField(description="Position of text on screen", default="bottom")
start_time: float | None = SchemaField(
description="When to show text (seconds). None = entire video",
default=None,
advanced=True,
)
end_time: float | None = SchemaField(
description="When to hide text (seconds). None = until end",
default=None,
advanced=True,
)
font_size: int = SchemaField(
description="Font size", default=48, ge=12, le=200, advanced=True
)
font_color: str = SchemaField(
description="Font color (hex or name)", default="white", advanced=True
)
bg_color: str | None = SchemaField(
description="Background color behind text (None for transparent)",
default=None,
advanced=True,
)
output_return_type: Literal["file_path", "data_uri"] = SchemaField(
description="Return the output as a relative path or base64 data URI.",
default="file_path",
)
class Output(BlockSchemaOutput):
video_out: MediaFileType = SchemaField(
description="Video with text overlay (path or data URI)"
)
def __init__(self):
super().__init__(
id="8ef14de6-cc90-430a-8cfa-3a003be92454",
description="Add text overlay/caption to video",
categories={BlockCategory.MULTIMEDIA},
input_schema=self.Input,
output_schema=self.Output,
test_input={"video_in": "/tmp/test.mp4", "text": "Hello World"},
test_output=[("video_out", str)],
test_mock={
"_add_text_overlay": lambda *args: None,
"_store_input_video": lambda *args, **kwargs: "test.mp4",
"_store_output_video": lambda *args, **kwargs: "overlay_test.mp4",
},
)
async def _store_input_video(
self, graph_exec_id: str, file: MediaFileType, user_id: str
) -> MediaFileType:
"""Store input video. Extracted for testability."""
return await store_media_file(
graph_exec_id=graph_exec_id,
file=file,
user_id=user_id,
return_content=False,
)
async def _store_output_video(
self,
graph_exec_id: str,
file: MediaFileType,
user_id: str,
return_content: bool,
) -> MediaFileType:
"""Store output video. Extracted for testability."""
return await store_media_file(
graph_exec_id=graph_exec_id,
file=file,
user_id=user_id,
return_content=return_content,
)
def _add_text_overlay(
self,
video_abspath: str,
output_abspath: str,
text: str,
position: str,
start_time: float | None,
end_time: float | None,
font_size: int,
font_color: str,
bg_color: str | None,
) -> None:
"""Add text overlay to video. Extracted for testability."""
video = None
final = None
txt_clip = None
try:
video = VideoFileClip(video_abspath)
txt_clip = TextClip(
text=text,
font_size=font_size,
color=font_color,
bg_color=bg_color,
)
# Position mapping
pos_map = {
"top": ("center", "top"),
"center": ("center", "center"),
"bottom": ("center", "bottom"),
"top-left": ("left", "top"),
"top-right": ("right", "top"),
"bottom-left": ("left", "bottom"),
"bottom-right": ("right", "bottom"),
}
txt_clip = txt_clip.with_position(pos_map[position])
# Set timing
start = start_time or 0
end = end_time or video.duration
duration = max(0, end - start)
txt_clip = txt_clip.with_start(start).with_end(end).with_duration(duration)
final = CompositeVideoClip([video, txt_clip])
video_codec, audio_codec = get_video_codecs(output_abspath)
final.write_videofile(
output_abspath, codec=video_codec, audio_codec=audio_codec
)
finally:
if txt_clip:
txt_clip.close()
if final:
final.close()
if video:
video.close()
async def run(
self,
input_data: Input,
*,
node_exec_id: str,
graph_exec_id: str,
user_id: str,
**kwargs,
) -> BlockOutput:
# Validate time range if both are provided
if (
input_data.start_time is not None
and input_data.end_time is not None
and input_data.end_time <= input_data.start_time
):
raise BlockExecutionError(
message=f"end_time ({input_data.end_time}) must be greater than start_time ({input_data.start_time})",
block_name=self.name,
block_id=str(self.id),
)
try:
# Store the input video locally
local_video_path = await self._store_input_video(
graph_exec_id, input_data.video_in, user_id
)
video_abspath = get_exec_file_path(graph_exec_id, local_video_path)
# Build output path
output_filename = MediaFileType(
f"{node_exec_id}_overlay_{os.path.basename(local_video_path)}"
)
output_abspath = get_exec_file_path(graph_exec_id, output_filename)
self._add_text_overlay(
video_abspath,
output_abspath,
input_data.text,
input_data.position,
input_data.start_time,
input_data.end_time,
input_data.font_size,
input_data.font_color,
input_data.bg_color,
)
# Return as data URI or path
video_out = await self._store_output_video(
graph_exec_id,
output_filename,
user_id,
input_data.output_return_type == "data_uri",
)
yield "video_out", video_out
except BlockExecutionError:
raise
except Exception as e:
raise BlockExecutionError(
message=f"Failed to add text overlay: {e}",
block_name=self.name,
block_id=str(self.id),
) from e

View File

@@ -1,7 +1,7 @@
import logging
import os
import pytest_asyncio
import pytest
from dotenv import load_dotenv
from backend.util.logging import configure_logging
@@ -19,7 +19,7 @@ if not os.getenv("PRISMA_DEBUG"):
prisma_logger.setLevel(logging.INFO)
@pytest_asyncio.fixture(scope="session", loop_scope="session")
@pytest.fixture(scope="session")
async def server():
from backend.util.test import SpinTestServer
@@ -27,7 +27,7 @@ async def server():
yield server
@pytest_asyncio.fixture(scope="session", loop_scope="session", autouse=True)
@pytest.fixture(scope="session", autouse=True)
async def graph_cleanup(server):
created_graph_ids = []
original_create_graph = server.agent_server.test_create_graph

View File

@@ -441,7 +441,6 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
static_output: bool = False,
block_type: BlockType = BlockType.STANDARD,
webhook_config: Optional[BlockWebhookConfig | BlockManualWebhookConfig] = None,
is_sensitive_action: bool = False,
):
"""
Initialize the block with the given schema.
@@ -474,8 +473,8 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
self.static_output = static_output
self.block_type = block_type
self.webhook_config = webhook_config
self.is_sensitive_action = is_sensitive_action
self.execution_stats: NodeExecutionStats = NodeExecutionStats()
self.is_sensitive_action: bool = False
if self.webhook_config:
if isinstance(self.webhook_config, BlockWebhookConfig):
@@ -623,7 +622,6 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
input_data: BlockInput,
*,
user_id: str,
node_id: str,
node_exec_id: str,
graph_exec_id: str,
graph_id: str,
@@ -650,11 +648,11 @@ class Block(ABC, Generic[BlockSchemaInputType, BlockSchemaOutputType]):
decision = await HITLReviewHelper.handle_review_decision(
input_data=input_data,
user_id=user_id,
node_id=node_id,
node_exec_id=node_exec_id,
graph_exec_id=graph_exec_id,
graph_id=graph_id,
graph_version=graph_version,
execution_context=execution_context,
block_name=self.name,
editable=True,
)

View File

@@ -36,12 +36,14 @@ from backend.blocks.replicate.replicate_block import ReplicateModelBlock
from backend.blocks.smart_decision_maker import SmartDecisionMakerBlock
from backend.blocks.talking_head import CreateTalkingAvatarVideoBlock
from backend.blocks.text_to_speech_block import UnrealTextToSpeechBlock
from backend.blocks.video.narration import VideoNarrationBlock
from backend.data.block import Block, BlockCost, BlockCostType
from backend.integrations.credentials_store import (
aiml_api_credentials,
anthropic_credentials,
apollo_credentials,
did_credentials,
elevenlabs_credentials,
enrichlayer_credentials,
groq_credentials,
ideogram_credentials,
@@ -640,4 +642,16 @@ BLOCK_COSTS: dict[Type[Block], list[BlockCost]] = {
},
),
],
VideoNarrationBlock: [
BlockCost(
cost_amount=5, # ElevenLabs TTS cost
cost_filter={
"credentials": {
"id": elevenlabs_credentials.id,
"provider": elevenlabs_credentials.provider,
"type": elevenlabs_credentials.type,
}
},
)
],
}

View File

@@ -6,10 +6,10 @@ Handles all database operations for pending human reviews.
import asyncio
import logging
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Optional
from typing import Optional
from prisma.enums import ReviewStatus
from prisma.models import AgentNodeExecution, PendingHumanReview
from prisma.models import PendingHumanReview
from prisma.types import PendingHumanReviewUpdateInput
from pydantic import BaseModel
@@ -17,12 +17,8 @@ from backend.api.features.executions.review.model import (
PendingHumanReviewModel,
SafeJsonData,
)
from backend.data.execution import get_graph_execution_meta
from backend.util.json import SafeJson
if TYPE_CHECKING:
pass
logger = logging.getLogger(__name__)
@@ -36,125 +32,6 @@ class ReviewResult(BaseModel):
node_exec_id: str
def get_auto_approve_key(graph_exec_id: str, node_id: str) -> str:
"""Generate the special nodeExecId key for auto-approval records."""
return f"auto_approve_{graph_exec_id}_{node_id}"
async def check_approval(
node_exec_id: str,
graph_exec_id: str,
node_id: str,
user_id: str,
input_data: SafeJsonData | None = None,
) -> Optional[ReviewResult]:
"""
Check if there's an existing approval for this node execution.
Checks both:
1. Normal approval by node_exec_id (previous run of the same node execution)
2. Auto-approval by special key pattern "auto_approve_{graph_exec_id}_{node_id}"
Args:
node_exec_id: ID of the node execution
graph_exec_id: ID of the graph execution
node_id: ID of the node definition (not execution)
user_id: ID of the user (for data isolation)
input_data: Current input data (used for auto-approvals to avoid stale data)
Returns:
ReviewResult if approval found (either normal or auto), None otherwise
"""
auto_approve_key = get_auto_approve_key(graph_exec_id, node_id)
# Check for either normal approval or auto-approval in a single query
existing_review = await PendingHumanReview.prisma().find_first(
where={
"OR": [
{"nodeExecId": node_exec_id},
{"nodeExecId": auto_approve_key},
],
"status": ReviewStatus.APPROVED,
"userId": user_id,
},
)
if existing_review:
is_auto_approval = existing_review.nodeExecId == auto_approve_key
logger.info(
f"Found {'auto-' if is_auto_approval else ''}approval for node {node_id} "
f"(exec: {node_exec_id}) in execution {graph_exec_id}"
)
# For auto-approvals, use current input_data to avoid replaying stale payload
# For normal approvals, use the stored payload (which may have been edited)
return ReviewResult(
data=(
input_data
if is_auto_approval and input_data is not None
else existing_review.payload
),
status=ReviewStatus.APPROVED,
message=(
"Auto-approved (user approved all future actions for this node)"
if is_auto_approval
else existing_review.reviewMessage or ""
),
processed=True,
node_exec_id=existing_review.nodeExecId,
)
return None
async def create_auto_approval_record(
user_id: str,
graph_exec_id: str,
graph_id: str,
graph_version: int,
node_id: str,
payload: SafeJsonData,
) -> None:
"""
Create an auto-approval record for a node in this execution.
This is stored as a PendingHumanReview with a special nodeExecId pattern
and status=APPROVED, so future executions of the same node can skip review.
Raises:
ValueError: If the graph execution doesn't belong to the user
"""
# Validate that the graph execution belongs to this user (defense in depth)
graph_exec = await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
)
if not graph_exec:
raise ValueError(
f"Graph execution {graph_exec_id} not found or doesn't belong to user {user_id}"
)
auto_approve_key = get_auto_approve_key(graph_exec_id, node_id)
await PendingHumanReview.prisma().upsert(
where={"nodeExecId": auto_approve_key},
data={
"create": {
"nodeExecId": auto_approve_key,
"userId": user_id,
"graphExecId": graph_exec_id,
"graphId": graph_id,
"graphVersion": graph_version,
"payload": SafeJson(payload),
"instructions": "Auto-approval record",
"editable": False,
"status": ReviewStatus.APPROVED,
"processed": True,
"reviewedAt": datetime.now(timezone.utc),
},
"update": {}, # Already exists, no update needed
},
)
async def get_or_create_human_review(
user_id: str,
node_exec_id: str,
@@ -231,87 +108,6 @@ async def get_or_create_human_review(
)
async def get_pending_review_by_node_exec_id(
node_exec_id: str, user_id: str
) -> Optional["PendingHumanReviewModel"]:
"""
Get a pending review by its node execution ID.
Args:
node_exec_id: The node execution ID to look up
user_id: User ID for authorization (only returns if review belongs to this user)
Returns:
The pending review if found and belongs to user, None otherwise
"""
review = await PendingHumanReview.prisma().find_first(
where={
"nodeExecId": node_exec_id,
"userId": user_id,
"status": ReviewStatus.WAITING,
}
)
if not review:
return None
# Local import to avoid event loop conflicts in tests
from backend.data.execution import get_node_execution
node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId
return PendingHumanReviewModel.from_db(review, node_id=node_id)
async def get_pending_reviews_by_node_exec_ids(
node_exec_ids: list[str], user_id: str
) -> dict[str, "PendingHumanReviewModel"]:
"""
Get multiple pending reviews by their node execution IDs in a single batch query.
Args:
node_exec_ids: List of node execution IDs to look up
user_id: User ID for authorization (only returns reviews belonging to this user)
Returns:
Dictionary mapping node_exec_id -> PendingHumanReviewModel for found reviews
"""
if not node_exec_ids:
return {}
reviews = await PendingHumanReview.prisma().find_many(
where={
"nodeExecId": {"in": node_exec_ids},
"userId": user_id,
"status": ReviewStatus.WAITING,
}
)
if not reviews:
return {}
# Batch fetch all node executions to avoid N+1 queries
node_exec_ids_to_fetch = [review.nodeExecId for review in reviews]
node_execs = await AgentNodeExecution.prisma().find_many(
where={"id": {"in": node_exec_ids_to_fetch}},
include={"Node": True},
)
# Create mapping from node_exec_id to node_id
node_exec_id_to_node_id = {
node_exec.id: node_exec.agentNodeId for node_exec in node_execs
}
result = {}
for review in reviews:
node_id = node_exec_id_to_node_id.get(review.nodeExecId, review.nodeExecId)
result[review.nodeExecId] = PendingHumanReviewModel.from_db(
review, node_id=node_id
)
return result
async def has_pending_reviews_for_graph_exec(graph_exec_id: str) -> bool:
"""
Check if a graph execution has any pending reviews.
@@ -341,11 +137,8 @@ async def get_pending_reviews_for_user(
page_size: Number of reviews per page
Returns:
List of pending review models with node_id included
List of pending review models
"""
# Local import to avoid event loop conflicts in tests
from backend.data.execution import get_node_execution
# Calculate offset for pagination
offset = (page - 1) * page_size
@@ -356,14 +149,7 @@ async def get_pending_reviews_for_user(
take=page_size,
)
# Fetch node_id for each review from NodeExecution
result = []
for review in reviews:
node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId
result.append(PendingHumanReviewModel.from_db(review, node_id=node_id))
return result
return [PendingHumanReviewModel.from_db(review) for review in reviews]
async def get_pending_reviews_for_execution(
@@ -377,11 +163,8 @@ async def get_pending_reviews_for_execution(
user_id: User ID for security validation
Returns:
List of pending review models with node_id included
List of pending review models
"""
# Local import to avoid event loop conflicts in tests
from backend.data.execution import get_node_execution
reviews = await PendingHumanReview.prisma().find_many(
where={
"userId": user_id,
@@ -391,14 +174,7 @@ async def get_pending_reviews_for_execution(
order={"createdAt": "asc"},
)
# Fetch node_id for each review from NodeExecution
result = []
for review in reviews:
node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId
result.append(PendingHumanReviewModel.from_db(review, node_id=node_id))
return result
return [PendingHumanReviewModel.from_db(review) for review in reviews]
async def process_all_reviews_for_execution(
@@ -468,19 +244,11 @@ async def process_all_reviews_for_execution(
# Note: Execution resumption is now handled at the API layer after ALL reviews
# for an execution are processed (both approved and rejected)
# Fetch node_id for each review and return as dict for easy access
# Local import to avoid event loop conflicts in tests
from backend.data.execution import get_node_execution
result = {}
for review in updated_reviews:
node_exec = await get_node_execution(review.nodeExecId)
node_id = node_exec.node_id if node_exec else review.nodeExecId
result[review.nodeExecId] = PendingHumanReviewModel.from_db(
review, node_id=node_id
)
return result
# Return as dict for easy access
return {
review.nodeExecId: PendingHumanReviewModel.from_db(review)
for review in updated_reviews
}
async def update_review_processed_status(node_exec_id: str, processed: bool) -> None:
@@ -488,44 +256,3 @@ async def update_review_processed_status(node_exec_id: str, processed: bool) ->
await PendingHumanReview.prisma().update(
where={"nodeExecId": node_exec_id}, data={"processed": processed}
)
async def cancel_pending_reviews_for_execution(graph_exec_id: str, user_id: str) -> int:
"""
Cancel all pending reviews for a graph execution (e.g., when execution is stopped).
Marks all WAITING reviews as REJECTED with a message indicating the execution was stopped.
Args:
graph_exec_id: The graph execution ID
user_id: User ID who owns the execution (for security validation)
Returns:
Number of reviews cancelled
Raises:
ValueError: If the graph execution doesn't belong to the user
"""
# Validate user ownership before cancelling reviews
graph_exec = await get_graph_execution_meta(
user_id=user_id, execution_id=graph_exec_id
)
if not graph_exec:
raise ValueError(
f"Graph execution {graph_exec_id} not found or doesn't belong to user {user_id}"
)
result = await PendingHumanReview.prisma().update_many(
where={
"graphExecId": graph_exec_id,
"userId": user_id,
"status": ReviewStatus.WAITING,
},
data={
"status": ReviewStatus.REJECTED,
"reviewMessage": "Execution was stopped by user",
"processed": True,
"reviewedAt": datetime.now(timezone.utc),
},
)
return result

View File

@@ -36,7 +36,7 @@ def sample_db_review():
return mock_review
@pytest.mark.asyncio(loop_scope="function")
@pytest.mark.asyncio
async def test_get_or_create_human_review_new(
mocker: pytest_mock.MockFixture,
sample_db_review,
@@ -46,8 +46,8 @@ async def test_get_or_create_human_review_new(
sample_db_review.status = ReviewStatus.WAITING
sample_db_review.processed = False
mock_prisma = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
mock_prisma.return_value.upsert = AsyncMock(return_value=sample_db_review)
mock_upsert = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
mock_upsert.return_value.upsert = AsyncMock(return_value=sample_db_review)
result = await get_or_create_human_review(
user_id="test-user-123",
@@ -64,7 +64,7 @@ async def test_get_or_create_human_review_new(
assert result is None
@pytest.mark.asyncio(loop_scope="function")
@pytest.mark.asyncio
async def test_get_or_create_human_review_approved(
mocker: pytest_mock.MockFixture,
sample_db_review,
@@ -75,8 +75,8 @@ async def test_get_or_create_human_review_approved(
sample_db_review.processed = False
sample_db_review.reviewMessage = "Looks good"
mock_prisma = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
mock_prisma.return_value.upsert = AsyncMock(return_value=sample_db_review)
mock_upsert = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
mock_upsert.return_value.upsert = AsyncMock(return_value=sample_db_review)
result = await get_or_create_human_review(
user_id="test-user-123",
@@ -96,7 +96,7 @@ async def test_get_or_create_human_review_approved(
assert result.message == "Looks good"
@pytest.mark.asyncio(loop_scope="function")
@pytest.mark.asyncio
async def test_has_pending_reviews_for_graph_exec_true(
mocker: pytest_mock.MockFixture,
):
@@ -109,7 +109,7 @@ async def test_has_pending_reviews_for_graph_exec_true(
assert result is True
@pytest.mark.asyncio(loop_scope="function")
@pytest.mark.asyncio
async def test_has_pending_reviews_for_graph_exec_false(
mocker: pytest_mock.MockFixture,
):
@@ -122,7 +122,7 @@ async def test_has_pending_reviews_for_graph_exec_false(
assert result is False
@pytest.mark.asyncio(loop_scope="function")
@pytest.mark.asyncio
async def test_get_pending_reviews_for_user(
mocker: pytest_mock.MockFixture,
sample_db_review,
@@ -131,19 +131,10 @@ async def test_get_pending_reviews_for_user(
mock_find_many = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
mock_find_many.return_value.find_many = AsyncMock(return_value=[sample_db_review])
# Mock get_node_execution to return node with node_id (async function)
mock_node_exec = Mock()
mock_node_exec.node_id = "test_node_def_789"
mocker.patch(
"backend.data.execution.get_node_execution",
new=AsyncMock(return_value=mock_node_exec),
)
result = await get_pending_reviews_for_user("test_user", page=2, page_size=10)
assert len(result) == 1
assert result[0].node_exec_id == "test_node_123"
assert result[0].node_id == "test_node_def_789"
# Verify pagination parameters
call_args = mock_find_many.return_value.find_many.call_args
@@ -151,7 +142,7 @@ async def test_get_pending_reviews_for_user(
assert call_args.kwargs["take"] == 10
@pytest.mark.asyncio(loop_scope="function")
@pytest.mark.asyncio
async def test_get_pending_reviews_for_execution(
mocker: pytest_mock.MockFixture,
sample_db_review,
@@ -160,21 +151,12 @@ async def test_get_pending_reviews_for_execution(
mock_find_many = mocker.patch("backend.data.human_review.PendingHumanReview.prisma")
mock_find_many.return_value.find_many = AsyncMock(return_value=[sample_db_review])
# Mock get_node_execution to return node with node_id (async function)
mock_node_exec = Mock()
mock_node_exec.node_id = "test_node_def_789"
mocker.patch(
"backend.data.execution.get_node_execution",
new=AsyncMock(return_value=mock_node_exec),
)
result = await get_pending_reviews_for_execution(
"test_graph_exec_456", "test-user-123"
)
assert len(result) == 1
assert result[0].graph_exec_id == "test_graph_exec_456"
assert result[0].node_id == "test_node_def_789"
# Verify it filters by execution and user
call_args = mock_find_many.return_value.find_many.call_args
@@ -184,7 +166,7 @@ async def test_get_pending_reviews_for_execution(
assert where_clause["status"] == ReviewStatus.WAITING
@pytest.mark.asyncio(loop_scope="function")
@pytest.mark.asyncio
async def test_process_all_reviews_for_execution_success(
mocker: pytest_mock.MockFixture,
sample_db_review,
@@ -219,14 +201,6 @@ async def test_process_all_reviews_for_execution_success(
new=AsyncMock(return_value=[updated_review]),
)
# Mock get_node_execution to return node with node_id (async function)
mock_node_exec = Mock()
mock_node_exec.node_id = "test_node_def_789"
mocker.patch(
"backend.data.execution.get_node_execution",
new=AsyncMock(return_value=mock_node_exec),
)
result = await process_all_reviews_for_execution(
user_id="test-user-123",
review_decisions={
@@ -237,10 +211,9 @@ async def test_process_all_reviews_for_execution_success(
assert len(result) == 1
assert "test_node_123" in result
assert result["test_node_123"].status == ReviewStatus.APPROVED
assert result["test_node_123"].node_id == "test_node_def_789"
@pytest.mark.asyncio(loop_scope="function")
@pytest.mark.asyncio
async def test_process_all_reviews_for_execution_validation_errors(
mocker: pytest_mock.MockFixture,
):
@@ -260,7 +233,7 @@ async def test_process_all_reviews_for_execution_validation_errors(
)
@pytest.mark.asyncio(loop_scope="function")
@pytest.mark.asyncio
async def test_process_all_reviews_edit_permission_error(
mocker: pytest_mock.MockFixture,
sample_db_review,
@@ -286,7 +259,7 @@ async def test_process_all_reviews_edit_permission_error(
)
@pytest.mark.asyncio(loop_scope="function")
@pytest.mark.asyncio
async def test_process_all_reviews_mixed_approval_rejection(
mocker: pytest_mock.MockFixture,
sample_db_review,
@@ -356,14 +329,6 @@ async def test_process_all_reviews_mixed_approval_rejection(
new=AsyncMock(return_value=[approved_review, rejected_review]),
)
# Mock get_node_execution to return node with node_id (async function)
mock_node_exec = Mock()
mock_node_exec.node_id = "test_node_def_789"
mocker.patch(
"backend.data.execution.get_node_execution",
new=AsyncMock(return_value=mock_node_exec),
)
result = await process_all_reviews_for_execution(
user_id="test-user-123",
review_decisions={
@@ -375,5 +340,3 @@ async def test_process_all_reviews_mixed_approval_rejection(
assert len(result) == 2
assert "test_node_123" in result
assert "test_node_456" in result
assert result["test_node_123"].node_id == "test_node_def_789"
assert result["test_node_456"].node_id == "test_node_def_789"

View File

@@ -50,8 +50,6 @@ from backend.data.graph import (
validate_graph_execution_permissions,
)
from backend.data.human_review import (
cancel_pending_reviews_for_execution,
check_approval,
get_or_create_human_review,
has_pending_reviews_for_graph_exec,
update_review_processed_status,
@@ -192,8 +190,6 @@ class DatabaseManager(AppService):
get_user_notification_preference = _(get_user_notification_preference)
# Human In The Loop
cancel_pending_reviews_for_execution = _(cancel_pending_reviews_for_execution)
check_approval = _(check_approval)
get_or_create_human_review = _(get_or_create_human_review)
has_pending_reviews_for_graph_exec = _(has_pending_reviews_for_graph_exec)
update_review_processed_status = _(update_review_processed_status)
@@ -317,8 +313,6 @@ class DatabaseManagerAsyncClient(AppServiceClient):
set_execution_kv_data = d.set_execution_kv_data
# Human In The Loop
cancel_pending_reviews_for_execution = d.cancel_pending_reviews_for_execution
check_approval = d.check_approval
get_or_create_human_review = d.get_or_create_human_review
update_review_processed_status = d.update_review_processed_status

View File

@@ -10,7 +10,6 @@ from pydantic import BaseModel, JsonValue, ValidationError
from backend.data import execution as execution_db
from backend.data import graph as graph_db
from backend.data import human_review as human_review_db
from backend.data import onboarding as onboarding_db
from backend.data import user as user_db
from backend.data.block import (
@@ -750,27 +749,9 @@ async def stop_graph_execution(
if graph_exec.status in [
ExecutionStatus.QUEUED,
ExecutionStatus.INCOMPLETE,
ExecutionStatus.REVIEW,
]:
# If the graph is queued/incomplete/paused for review, terminate immediately
# No need to wait for executor since it's not actively running
# If graph is in REVIEW status, clean up pending reviews before terminating
if graph_exec.status == ExecutionStatus.REVIEW:
# Use human_review_db if Prisma connected, else database manager
review_db = (
human_review_db
if prisma.is_connected()
else get_database_manager_async_client()
)
# Mark all pending reviews as rejected/cancelled
cancelled_count = await review_db.cancel_pending_reviews_for_execution(
graph_exec_id, user_id
)
logger.info(
f"Cancelled {cancelled_count} pending review(s) for stopped execution {graph_exec_id}"
)
# If the graph is still on the queue, we can prevent them from being executed
# by setting the status to TERMINATED.
graph_exec.status = ExecutionStatus.TERMINATED
await asyncio.gather(
@@ -906,28 +887,9 @@ async def add_graph_execution(
nodes_to_skip=nodes_to_skip,
execution_context=execution_context,
)
logger.info(f"Queueing execution {graph_exec.id}")
# Update execution status to QUEUED BEFORE publishing to prevent race condition
# where two concurrent requests could both publish the same execution
updated_exec = await edb.update_graph_execution_stats(
graph_exec_id=graph_exec.id,
status=ExecutionStatus.QUEUED,
)
# Verify the status update succeeded (prevents duplicate queueing in race conditions)
# If another request already updated the status, this execution will not be QUEUED
if not updated_exec or updated_exec.status != ExecutionStatus.QUEUED:
logger.warning(
f"Skipping queue publish for execution {graph_exec.id} - "
f"status update failed or execution already queued by another request"
)
return graph_exec
graph_exec.status = ExecutionStatus.QUEUED
logger.info(f"Publishing execution {graph_exec.id} to execution queue")
# Publish to execution queue for executor to pick up
# This happens AFTER status update to ensure only one request publishes
exec_queue = await get_async_execution_queue()
await exec_queue.publish_message(
routing_key=GRAPH_EXECUTION_ROUTING_KEY,
@@ -935,6 +897,13 @@ async def add_graph_execution(
exchange=GRAPH_EXECUTION_EXCHANGE,
)
logger.info(f"Published execution {graph_exec.id} to RabbitMQ queue")
# Update execution status to QUEUED
graph_exec.status = ExecutionStatus.QUEUED
await edb.update_graph_execution_stats(
graph_exec_id=graph_exec.id,
status=graph_exec.status,
)
except BaseException as e:
err = str(e) or type(e).__name__
if not graph_exec:

View File

@@ -4,7 +4,6 @@ import pytest
from pytest_mock import MockerFixture
from backend.data.dynamic_fields import merge_execution_input, parse_execution_output
from backend.data.execution import ExecutionStatus
from backend.util.mock import MockObject
@@ -347,7 +346,6 @@ async def test_add_graph_execution_is_repeatable(mocker: MockerFixture):
mock_graph_exec = mocker.MagicMock(spec=GraphExecutionWithNodes)
mock_graph_exec.id = "execution-id-123"
mock_graph_exec.node_executions = [] # Add this to avoid AttributeError
mock_graph_exec.status = ExecutionStatus.QUEUED # Required for race condition check
mock_graph_exec.to_graph_execution_entry.return_value = mocker.MagicMock()
# Mock the queue and event bus
@@ -613,7 +611,6 @@ async def test_add_graph_execution_with_nodes_to_skip(mocker: MockerFixture):
mock_graph_exec = mocker.MagicMock(spec=GraphExecutionWithNodes)
mock_graph_exec.id = "execution-id-123"
mock_graph_exec.node_executions = []
mock_graph_exec.status = ExecutionStatus.QUEUED # Required for race condition check
# Track what's passed to to_graph_execution_entry
captured_kwargs = {}
@@ -673,232 +670,3 @@ async def test_add_graph_execution_with_nodes_to_skip(mocker: MockerFixture):
# Verify nodes_to_skip was passed to to_graph_execution_entry
assert "nodes_to_skip" in captured_kwargs
assert captured_kwargs["nodes_to_skip"] == nodes_to_skip
@pytest.mark.asyncio
async def test_stop_graph_execution_in_review_status_cancels_pending_reviews(
mocker: MockerFixture,
):
"""Test that stopping an execution in REVIEW status cancels pending reviews."""
from backend.data.execution import ExecutionStatus, GraphExecutionMeta
from backend.executor.utils import stop_graph_execution
user_id = "test-user"
graph_exec_id = "test-exec-123"
# Mock graph execution in REVIEW status
mock_graph_exec = mocker.MagicMock(spec=GraphExecutionMeta)
mock_graph_exec.id = graph_exec_id
mock_graph_exec.status = ExecutionStatus.REVIEW
# Mock dependencies
mock_get_queue = mocker.patch("backend.executor.utils.get_async_execution_queue")
mock_queue_client = mocker.AsyncMock()
mock_get_queue.return_value = mock_queue_client
mock_prisma = mocker.patch("backend.executor.utils.prisma")
mock_prisma.is_connected.return_value = True
mock_human_review_db = mocker.patch("backend.executor.utils.human_review_db")
mock_human_review_db.cancel_pending_reviews_for_execution = mocker.AsyncMock(
return_value=2 # 2 reviews cancelled
)
mock_execution_db = mocker.patch("backend.executor.utils.execution_db")
mock_execution_db.get_graph_execution_meta = mocker.AsyncMock(
return_value=mock_graph_exec
)
mock_execution_db.update_graph_execution_stats = mocker.AsyncMock()
mock_get_event_bus = mocker.patch(
"backend.executor.utils.get_async_execution_event_bus"
)
mock_event_bus = mocker.MagicMock()
mock_event_bus.publish = mocker.AsyncMock()
mock_get_event_bus.return_value = mock_event_bus
mock_get_child_executions = mocker.patch(
"backend.executor.utils._get_child_executions"
)
mock_get_child_executions.return_value = [] # No children
# Call stop_graph_execution with timeout to allow status check
await stop_graph_execution(
user_id=user_id,
graph_exec_id=graph_exec_id,
wait_timeout=1.0, # Wait to allow status check
cascade=True,
)
# Verify pending reviews were cancelled
mock_human_review_db.cancel_pending_reviews_for_execution.assert_called_once_with(
graph_exec_id, user_id
)
# Verify execution status was updated to TERMINATED
mock_execution_db.update_graph_execution_stats.assert_called_once()
call_kwargs = mock_execution_db.update_graph_execution_stats.call_args[1]
assert call_kwargs["graph_exec_id"] == graph_exec_id
assert call_kwargs["status"] == ExecutionStatus.TERMINATED
@pytest.mark.asyncio
async def test_stop_graph_execution_with_database_manager_when_prisma_disconnected(
mocker: MockerFixture,
):
"""Test that stop uses database manager when Prisma is not connected."""
from backend.data.execution import ExecutionStatus, GraphExecutionMeta
from backend.executor.utils import stop_graph_execution
user_id = "test-user"
graph_exec_id = "test-exec-456"
# Mock graph execution in REVIEW status
mock_graph_exec = mocker.MagicMock(spec=GraphExecutionMeta)
mock_graph_exec.id = graph_exec_id
mock_graph_exec.status = ExecutionStatus.REVIEW
# Mock dependencies
mock_get_queue = mocker.patch("backend.executor.utils.get_async_execution_queue")
mock_queue_client = mocker.AsyncMock()
mock_get_queue.return_value = mock_queue_client
# Prisma is NOT connected
mock_prisma = mocker.patch("backend.executor.utils.prisma")
mock_prisma.is_connected.return_value = False
# Mock database manager client
mock_get_db_manager = mocker.patch(
"backend.executor.utils.get_database_manager_async_client"
)
mock_db_manager = mocker.AsyncMock()
mock_db_manager.get_graph_execution_meta = mocker.AsyncMock(
return_value=mock_graph_exec
)
mock_db_manager.cancel_pending_reviews_for_execution = mocker.AsyncMock(
return_value=3 # 3 reviews cancelled
)
mock_db_manager.update_graph_execution_stats = mocker.AsyncMock()
mock_get_db_manager.return_value = mock_db_manager
mock_get_event_bus = mocker.patch(
"backend.executor.utils.get_async_execution_event_bus"
)
mock_event_bus = mocker.MagicMock()
mock_event_bus.publish = mocker.AsyncMock()
mock_get_event_bus.return_value = mock_event_bus
mock_get_child_executions = mocker.patch(
"backend.executor.utils._get_child_executions"
)
mock_get_child_executions.return_value = [] # No children
# Call stop_graph_execution with timeout
await stop_graph_execution(
user_id=user_id,
graph_exec_id=graph_exec_id,
wait_timeout=1.0,
cascade=True,
)
# Verify database manager was used for cancel_pending_reviews
mock_db_manager.cancel_pending_reviews_for_execution.assert_called_once_with(
graph_exec_id, user_id
)
# Verify execution status was updated via database manager
mock_db_manager.update_graph_execution_stats.assert_called_once()
@pytest.mark.asyncio
async def test_stop_graph_execution_cascades_to_child_with_reviews(
mocker: MockerFixture,
):
"""Test that stopping parent execution cascades to children and cancels their reviews."""
from backend.data.execution import ExecutionStatus, GraphExecutionMeta
from backend.executor.utils import stop_graph_execution
user_id = "test-user"
parent_exec_id = "parent-exec"
child_exec_id = "child-exec"
# Mock parent execution in RUNNING status
mock_parent_exec = mocker.MagicMock(spec=GraphExecutionMeta)
mock_parent_exec.id = parent_exec_id
mock_parent_exec.status = ExecutionStatus.RUNNING
# Mock child execution in REVIEW status
mock_child_exec = mocker.MagicMock(spec=GraphExecutionMeta)
mock_child_exec.id = child_exec_id
mock_child_exec.status = ExecutionStatus.REVIEW
# Mock dependencies
mock_get_queue = mocker.patch("backend.executor.utils.get_async_execution_queue")
mock_queue_client = mocker.AsyncMock()
mock_get_queue.return_value = mock_queue_client
mock_prisma = mocker.patch("backend.executor.utils.prisma")
mock_prisma.is_connected.return_value = True
mock_human_review_db = mocker.patch("backend.executor.utils.human_review_db")
mock_human_review_db.cancel_pending_reviews_for_execution = mocker.AsyncMock(
return_value=1 # 1 child review cancelled
)
# Mock execution_db to return different status based on which execution is queried
mock_execution_db = mocker.patch("backend.executor.utils.execution_db")
# Track call count to simulate status transition
call_count = {"count": 0}
async def get_exec_meta_side_effect(execution_id, user_id):
call_count["count"] += 1
if execution_id == parent_exec_id:
# After a few calls (child processing happens), transition parent to TERMINATED
# This simulates the executor service processing the stop request
if call_count["count"] > 3:
mock_parent_exec.status = ExecutionStatus.TERMINATED
return mock_parent_exec
elif execution_id == child_exec_id:
return mock_child_exec
return None
mock_execution_db.get_graph_execution_meta = mocker.AsyncMock(
side_effect=get_exec_meta_side_effect
)
mock_execution_db.update_graph_execution_stats = mocker.AsyncMock()
mock_get_event_bus = mocker.patch(
"backend.executor.utils.get_async_execution_event_bus"
)
mock_event_bus = mocker.MagicMock()
mock_event_bus.publish = mocker.AsyncMock()
mock_get_event_bus.return_value = mock_event_bus
# Mock _get_child_executions to return the child
mock_get_child_executions = mocker.patch(
"backend.executor.utils._get_child_executions"
)
def get_children_side_effect(parent_id):
if parent_id == parent_exec_id:
return [mock_child_exec]
return []
mock_get_child_executions.side_effect = get_children_side_effect
# Call stop_graph_execution on parent with cascade=True
await stop_graph_execution(
user_id=user_id,
graph_exec_id=parent_exec_id,
wait_timeout=1.0,
cascade=True,
)
# Verify child reviews were cancelled
mock_human_review_db.cancel_pending_reviews_for_execution.assert_called_once_with(
child_exec_id, user_id
)
# Verify both parent and child status updates
assert mock_execution_db.update_graph_execution_stats.call_count >= 1

View File

@@ -224,6 +224,14 @@ openweathermap_credentials = APIKeyCredentials(
expires_at=None,
)
elevenlabs_credentials = APIKeyCredentials(
id="f4a8b6c2-3d1e-4f5a-9b8c-7d6e5f4a3b2c",
provider="elevenlabs",
api_key=SecretStr(settings.secrets.elevenlabs_api_key),
title="Use Credits for ElevenLabs",
expires_at=None,
)
DEFAULT_CREDENTIALS = [
ollama_credentials,
revid_credentials,
@@ -252,6 +260,7 @@ DEFAULT_CREDENTIALS = [
v0_credentials,
webshare_proxy_credentials,
openweathermap_credentials,
elevenlabs_credentials,
]
SYSTEM_CREDENTIAL_IDS = {cred.id for cred in DEFAULT_CREDENTIALS}
@@ -366,6 +375,8 @@ class IntegrationCredentialsStore:
all_credentials.append(webshare_proxy_credentials)
if settings.secrets.openweathermap_api_key:
all_credentials.append(openweathermap_credentials)
if settings.secrets.elevenlabs_api_key:
all_credentials.append(elevenlabs_credentials)
return all_credentials
async def get_creds_by_id(

View File

@@ -18,6 +18,7 @@ class ProviderName(str, Enum):
DISCORD = "discord"
D_ID = "d_id"
E2B = "e2b"
ELEVENLABS = "elevenlabs"
FAL = "fal"
GITHUB = "github"
GOOGLE = "google"

View File

@@ -350,19 +350,6 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
description="Whether to mark failed scans as clean or not",
)
agentgenerator_host: str = Field(
default="",
description="The host for the Agent Generator service (empty to use built-in)",
)
agentgenerator_port: int = Field(
default=8000,
description="The port for the Agent Generator service",
)
agentgenerator_timeout: int = Field(
default=120,
description="The timeout in seconds for Agent Generator service requests",
)
enable_example_blocks: bool = Field(
default=False,
description="Whether to enable example blocks in production",
@@ -643,6 +630,7 @@ class Secrets(UpdateTrackingModel["Secrets"], BaseSettings):
e2b_api_key: str = Field(default="", description="E2B API key")
nvidia_api_key: str = Field(default="", description="Nvidia API key")
mem0_api_key: str = Field(default="", description="Mem0 API key")
elevenlabs_api_key: str = Field(default="", description="ElevenLabs API key")
linear_client_id: str = Field(default="", description="Linear client ID")
linear_client_secret: str = Field(default="", description="Linear client secret")

View File

@@ -1,4 +1,3 @@
import asyncio
import inspect
import logging
import time
@@ -59,11 +58,6 @@ class SpinTestServer:
self.db_api.__exit__(exc_type, exc_val, exc_tb)
self.notif_manager.__exit__(exc_type, exc_val, exc_tb)
# Give services time to fully shut down
# This prevents event loop issues where services haven't fully cleaned up
# before the next test starts
await asyncio.sleep(0.5)
def setup_dependency_overrides(self):
# Override get_user_id for testing
self.agent_server.set_test_dependency_overrides(

View File

@@ -1,7 +0,0 @@
-- Remove NodeExecution foreign key from PendingHumanReview
-- The nodeExecId column remains as the primary key, but we remove the FK constraint
-- to AgentNodeExecution since PendingHumanReview records can persist after node
-- execution records are deleted.
-- Drop foreign key constraint that linked PendingHumanReview.nodeExecId to AgentNodeExecution.id
ALTER TABLE "PendingHumanReview" DROP CONSTRAINT IF EXISTS "PendingHumanReview_nodeExecId_fkey";

View File

@@ -1169,6 +1169,29 @@ attrs = ">=21.3.0"
e2b = ">=1.5.4,<2.0.0"
httpx = ">=0.20.0,<1.0.0"
[[package]]
name = "elevenlabs"
version = "1.59.0"
description = ""
optional = false
python-versions = "<4.0,>=3.8"
groups = ["main"]
files = [
{file = "elevenlabs-1.59.0-py3-none-any.whl", hash = "sha256:468145db81a0bc867708b4a8619699f75583e9481b395ec1339d0b443da771ed"},
{file = "elevenlabs-1.59.0.tar.gz", hash = "sha256:16e735bd594e86d415dd445d249c8cc28b09996cfd627fbc10102c0a84698859"},
]
[package.dependencies]
httpx = ">=0.21.2"
pydantic = ">=1.9.2"
pydantic-core = ">=2.18.2,<3.0.0"
requests = ">=2.20"
typing_extensions = ">=4.0.0"
websockets = ">=11.0"
[package.extras]
pyaudio = ["pyaudio (>=0.2.14)"]
[[package]]
name = "email-validator"
version = "2.2.0"
@@ -7361,6 +7384,28 @@ files = [
defusedxml = ">=0.7.1,<0.8.0"
requests = "*"
[[package]]
name = "yt-dlp"
version = "2025.12.8"
description = "A feature-rich command-line audio/video downloader"
optional = false
python-versions = ">=3.10"
groups = ["main"]
files = [
{file = "yt_dlp-2025.12.8-py3-none-any.whl", hash = "sha256:36e2584342e409cfbfa0b5e61448a1c5189e345cf4564294456ee509e7d3e065"},
{file = "yt_dlp-2025.12.8.tar.gz", hash = "sha256:b773c81bb6b71cb2c111cfb859f453c7a71cf2ef44eff234ff155877184c3e4f"},
]
[package.extras]
build = ["build", "hatchling (>=1.27.0)", "pip", "setuptools (>=71.0.2)", "wheel"]
curl-cffi = ["curl-cffi (>=0.5.10,<0.6.dev0 || >=0.10.dev0,<0.14) ; implementation_name == \"cpython\""]
default = ["brotli ; implementation_name == \"cpython\"", "brotlicffi ; implementation_name != \"cpython\"", "certifi", "mutagen", "pycryptodomex", "requests (>=2.32.2,<3)", "urllib3 (>=2.0.2,<3)", "websockets (>=13.0)", "yt-dlp-ejs (==0.3.2)"]
dev = ["autopep8 (>=2.0,<3.0)", "pre-commit", "pytest (>=8.1,<9.0)", "pytest-rerunfailures (>=14.0,<15.0)", "ruff (>=0.14.0,<0.15.0)"]
pyinstaller = ["pyinstaller (>=6.17.0)"]
secretstorage = ["cffi", "secretstorage"]
static-analysis = ["autopep8 (>=2.0,<3.0)", "ruff (>=0.14.0,<0.15.0)"]
test = ["pytest (>=8.1,<9.0)", "pytest-rerunfailures (>=14.0,<15.0)"]
[[package]]
name = "zerobouncesdk"
version = "1.1.2"
@@ -7512,4 +7557,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<3.14"
content-hash = "18b92e09596298c82432e4d0a85cb6d80a40b4229bee0a0c15f0529fd6cb21a4"
content-hash = "a82dc5db159eb332ef6ae27d392dc1dfdeb2b70ef3595482829e51fdb9e3ffe2"

View File

@@ -20,6 +20,7 @@ click = "^8.2.0"
cryptography = "^45.0"
discord-py = "^2.5.2"
e2b-code-interpreter = "^1.5.2"
elevenlabs = "^1.50.0"
fastapi = "^0.116.1"
feedparser = "^6.0.11"
flake8 = "^7.3.0"
@@ -71,6 +72,7 @@ tweepy = "^4.16.0"
uvicorn = { extras = ["standard"], version = "^0.35.0" }
websockets = "^15.0"
youtube-transcript-api = "^1.2.1"
yt-dlp = "2025.12.08"
zerobouncesdk = "^1.1.2"
# NOTE: please insert new dependencies in their alphabetical location
pytest-snapshot = "^0.9.0"

View File

@@ -517,6 +517,8 @@ model AgentNodeExecution {
stats Json?
PendingHumanReview PendingHumanReview?
@@index([agentGraphExecutionId, agentNodeId, executionStatus])
@@index([agentNodeId, executionStatus])
@@index([addedTime, queuedTime])
@@ -565,7 +567,6 @@ enum ReviewStatus {
}
// Pending human reviews for Human-in-the-loop blocks
// Also stores auto-approval records with special nodeExecId patterns (e.g., "auto_approve_{graph_exec_id}_{node_id}")
model PendingHumanReview {
nodeExecId String @id
userId String
@@ -584,6 +585,7 @@ model PendingHumanReview {
reviewedAt DateTime?
User User @relation(fields: [userId], references: [id], onDelete: Cascade)
NodeExecution AgentNodeExecution @relation(fields: [nodeExecId], references: [id], onDelete: Cascade)
GraphExecution AgentGraphExecution @relation(fields: [graphExecId], references: [id], onDelete: Cascade)
@@unique([nodeExecId]) // One pending review per node execution

View File

@@ -1 +0,0 @@
"""Tests for agent generator module."""

View File

@@ -1,273 +0,0 @@
"""
Tests for the Agent Generator core module.
This test suite verifies that the core functions correctly delegate to
the external Agent Generator service.
"""
from unittest.mock import AsyncMock, patch
import pytest
from backend.api.features.chat.tools.agent_generator import core
from backend.api.features.chat.tools.agent_generator.core import (
AgentGeneratorNotConfiguredError,
)
class TestServiceNotConfigured:
"""Test that functions raise AgentGeneratorNotConfiguredError when service is not configured."""
@pytest.mark.asyncio
async def test_decompose_goal_raises_when_not_configured(self):
"""Test that decompose_goal raises error when service not configured."""
with patch.object(core, "is_external_service_configured", return_value=False):
with pytest.raises(AgentGeneratorNotConfiguredError):
await core.decompose_goal("Build a chatbot")
@pytest.mark.asyncio
async def test_generate_agent_raises_when_not_configured(self):
"""Test that generate_agent raises error when service not configured."""
with patch.object(core, "is_external_service_configured", return_value=False):
with pytest.raises(AgentGeneratorNotConfiguredError):
await core.generate_agent({"steps": []})
@pytest.mark.asyncio
async def test_generate_agent_patch_raises_when_not_configured(self):
"""Test that generate_agent_patch raises error when service not configured."""
with patch.object(core, "is_external_service_configured", return_value=False):
with pytest.raises(AgentGeneratorNotConfiguredError):
await core.generate_agent_patch("Add a node", {"nodes": []})
class TestDecomposeGoal:
"""Test decompose_goal function service delegation."""
@pytest.mark.asyncio
async def test_calls_external_service(self):
"""Test that decompose_goal calls the external service."""
expected_result = {"type": "instructions", "steps": ["Step 1"]}
with patch.object(
core, "is_external_service_configured", return_value=True
), patch.object(
core, "decompose_goal_external", new_callable=AsyncMock
) as mock_external:
mock_external.return_value = expected_result
result = await core.decompose_goal("Build a chatbot")
mock_external.assert_called_once_with("Build a chatbot", "")
assert result == expected_result
@pytest.mark.asyncio
async def test_passes_context_to_external_service(self):
"""Test that decompose_goal passes context to external service."""
expected_result = {"type": "instructions", "steps": ["Step 1"]}
with patch.object(
core, "is_external_service_configured", return_value=True
), patch.object(
core, "decompose_goal_external", new_callable=AsyncMock
) as mock_external:
mock_external.return_value = expected_result
await core.decompose_goal("Build a chatbot", "Use Python")
mock_external.assert_called_once_with("Build a chatbot", "Use Python")
@pytest.mark.asyncio
async def test_returns_none_on_service_failure(self):
"""Test that decompose_goal returns None when external service fails."""
with patch.object(
core, "is_external_service_configured", return_value=True
), patch.object(
core, "decompose_goal_external", new_callable=AsyncMock
) as mock_external:
mock_external.return_value = None
result = await core.decompose_goal("Build a chatbot")
assert result is None
class TestGenerateAgent:
"""Test generate_agent function service delegation."""
@pytest.mark.asyncio
async def test_calls_external_service(self):
"""Test that generate_agent calls the external service."""
expected_result = {"name": "Test Agent", "nodes": [], "links": []}
with patch.object(
core, "is_external_service_configured", return_value=True
), patch.object(
core, "generate_agent_external", new_callable=AsyncMock
) as mock_external:
mock_external.return_value = expected_result
instructions = {"type": "instructions", "steps": ["Step 1"]}
result = await core.generate_agent(instructions)
mock_external.assert_called_once_with(instructions)
# Result should have id, version, is_active added if not present
assert result is not None
assert result["name"] == "Test Agent"
assert "id" in result
assert result["version"] == 1
assert result["is_active"] is True
@pytest.mark.asyncio
async def test_preserves_existing_id_and_version(self):
"""Test that external service result preserves existing id and version."""
expected_result = {
"id": "existing-id",
"version": 3,
"is_active": False,
"name": "Test Agent",
}
with patch.object(
core, "is_external_service_configured", return_value=True
), patch.object(
core, "generate_agent_external", new_callable=AsyncMock
) as mock_external:
mock_external.return_value = expected_result.copy()
result = await core.generate_agent({"steps": []})
assert result is not None
assert result["id"] == "existing-id"
assert result["version"] == 3
assert result["is_active"] is False
@pytest.mark.asyncio
async def test_returns_none_when_external_service_fails(self):
"""Test that generate_agent returns None when external service fails."""
with patch.object(
core, "is_external_service_configured", return_value=True
), patch.object(
core, "generate_agent_external", new_callable=AsyncMock
) as mock_external:
mock_external.return_value = None
result = await core.generate_agent({"steps": []})
assert result is None
class TestGenerateAgentPatch:
"""Test generate_agent_patch function service delegation."""
@pytest.mark.asyncio
async def test_calls_external_service(self):
"""Test that generate_agent_patch calls the external service."""
expected_result = {"name": "Updated Agent", "nodes": [], "links": []}
with patch.object(
core, "is_external_service_configured", return_value=True
), patch.object(
core, "generate_agent_patch_external", new_callable=AsyncMock
) as mock_external:
mock_external.return_value = expected_result
current_agent = {"nodes": [], "links": []}
result = await core.generate_agent_patch("Add a node", current_agent)
mock_external.assert_called_once_with("Add a node", current_agent)
assert result == expected_result
@pytest.mark.asyncio
async def test_returns_clarifying_questions(self):
"""Test that generate_agent_patch returns clarifying questions."""
expected_result = {
"type": "clarifying_questions",
"questions": [{"question": "What type of node?"}],
}
with patch.object(
core, "is_external_service_configured", return_value=True
), patch.object(
core, "generate_agent_patch_external", new_callable=AsyncMock
) as mock_external:
mock_external.return_value = expected_result
result = await core.generate_agent_patch("Add a node", {"nodes": []})
assert result == expected_result
@pytest.mark.asyncio
async def test_returns_none_when_external_service_fails(self):
"""Test that generate_agent_patch returns None when service fails."""
with patch.object(
core, "is_external_service_configured", return_value=True
), patch.object(
core, "generate_agent_patch_external", new_callable=AsyncMock
) as mock_external:
mock_external.return_value = None
result = await core.generate_agent_patch("Add a node", {"nodes": []})
assert result is None
class TestJsonToGraph:
"""Test json_to_graph function."""
def test_converts_agent_json_to_graph(self):
"""Test conversion of agent JSON to Graph model."""
agent_json = {
"id": "test-id",
"version": 2,
"is_active": True,
"name": "Test Agent",
"description": "A test agent",
"nodes": [
{
"id": "node1",
"block_id": "block1",
"input_default": {"key": "value"},
"metadata": {"x": 100},
}
],
"links": [
{
"id": "link1",
"source_id": "node1",
"sink_id": "output",
"source_name": "result",
"sink_name": "input",
"is_static": False,
}
],
}
graph = core.json_to_graph(agent_json)
assert graph.id == "test-id"
assert graph.version == 2
assert graph.is_active is True
assert graph.name == "Test Agent"
assert graph.description == "A test agent"
assert len(graph.nodes) == 1
assert graph.nodes[0].id == "node1"
assert graph.nodes[0].block_id == "block1"
assert len(graph.links) == 1
assert graph.links[0].source_id == "node1"
def test_generates_ids_if_missing(self):
"""Test that missing IDs are generated."""
agent_json = {
"name": "Test Agent",
"nodes": [{"block_id": "block1"}],
"links": [],
}
graph = core.json_to_graph(agent_json)
assert graph.id is not None
assert graph.nodes[0].id is not None
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -1,422 +0,0 @@
"""
Tests for the Agent Generator external service client.
This test suite verifies the external Agent Generator service integration,
including service detection, API calls, and error handling.
"""
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
import pytest
from backend.api.features.chat.tools.agent_generator import service
class TestServiceConfiguration:
"""Test service configuration detection."""
def setup_method(self):
"""Reset settings singleton before each test."""
service._settings = None
service._client = None
def test_external_service_not_configured_when_host_empty(self):
"""Test that external service is not configured when host is empty."""
mock_settings = MagicMock()
mock_settings.config.agentgenerator_host = ""
with patch.object(service, "_get_settings", return_value=mock_settings):
assert service.is_external_service_configured() is False
def test_external_service_configured_when_host_set(self):
"""Test that external service is configured when host is set."""
mock_settings = MagicMock()
mock_settings.config.agentgenerator_host = "agent-generator.local"
with patch.object(service, "_get_settings", return_value=mock_settings):
assert service.is_external_service_configured() is True
def test_get_base_url(self):
"""Test base URL construction."""
mock_settings = MagicMock()
mock_settings.config.agentgenerator_host = "agent-generator.local"
mock_settings.config.agentgenerator_port = 8000
with patch.object(service, "_get_settings", return_value=mock_settings):
url = service._get_base_url()
assert url == "http://agent-generator.local:8000"
class TestDecomposeGoalExternal:
"""Test decompose_goal_external function."""
def setup_method(self):
"""Reset client singleton before each test."""
service._settings = None
service._client = None
@pytest.mark.asyncio
async def test_decompose_goal_returns_instructions(self):
"""Test successful decomposition returning instructions."""
mock_response = MagicMock()
mock_response.json.return_value = {
"success": True,
"type": "instructions",
"steps": ["Step 1", "Step 2"],
}
mock_response.raise_for_status = MagicMock()
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
with patch.object(service, "_get_client", return_value=mock_client):
result = await service.decompose_goal_external("Build a chatbot")
assert result == {"type": "instructions", "steps": ["Step 1", "Step 2"]}
mock_client.post.assert_called_once_with(
"/api/decompose-description", json={"description": "Build a chatbot"}
)
@pytest.mark.asyncio
async def test_decompose_goal_returns_clarifying_questions(self):
"""Test decomposition returning clarifying questions."""
mock_response = MagicMock()
mock_response.json.return_value = {
"success": True,
"type": "clarifying_questions",
"questions": ["What platform?", "What language?"],
}
mock_response.raise_for_status = MagicMock()
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
with patch.object(service, "_get_client", return_value=mock_client):
result = await service.decompose_goal_external("Build something")
assert result == {
"type": "clarifying_questions",
"questions": ["What platform?", "What language?"],
}
@pytest.mark.asyncio
async def test_decompose_goal_with_context(self):
"""Test decomposition with additional context."""
mock_response = MagicMock()
mock_response.json.return_value = {
"success": True,
"type": "instructions",
"steps": ["Step 1"],
}
mock_response.raise_for_status = MagicMock()
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
with patch.object(service, "_get_client", return_value=mock_client):
await service.decompose_goal_external(
"Build a chatbot", context="Use Python"
)
mock_client.post.assert_called_once_with(
"/api/decompose-description",
json={"description": "Build a chatbot", "user_instruction": "Use Python"},
)
@pytest.mark.asyncio
async def test_decompose_goal_returns_unachievable_goal(self):
"""Test decomposition returning unachievable goal response."""
mock_response = MagicMock()
mock_response.json.return_value = {
"success": True,
"type": "unachievable_goal",
"reason": "Cannot do X",
"suggested_goal": "Try Y instead",
}
mock_response.raise_for_status = MagicMock()
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
with patch.object(service, "_get_client", return_value=mock_client):
result = await service.decompose_goal_external("Do something impossible")
assert result == {
"type": "unachievable_goal",
"reason": "Cannot do X",
"suggested_goal": "Try Y instead",
}
@pytest.mark.asyncio
async def test_decompose_goal_handles_http_error(self):
"""Test decomposition handles HTTP errors gracefully."""
mock_client = AsyncMock()
mock_client.post.side_effect = httpx.HTTPStatusError(
"Server error", request=MagicMock(), response=MagicMock()
)
with patch.object(service, "_get_client", return_value=mock_client):
result = await service.decompose_goal_external("Build a chatbot")
assert result is None
@pytest.mark.asyncio
async def test_decompose_goal_handles_request_error(self):
"""Test decomposition handles request errors gracefully."""
mock_client = AsyncMock()
mock_client.post.side_effect = httpx.RequestError("Connection failed")
with patch.object(service, "_get_client", return_value=mock_client):
result = await service.decompose_goal_external("Build a chatbot")
assert result is None
@pytest.mark.asyncio
async def test_decompose_goal_handles_service_error(self):
"""Test decomposition handles service returning error."""
mock_response = MagicMock()
mock_response.json.return_value = {
"success": False,
"error": "Internal error",
}
mock_response.raise_for_status = MagicMock()
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
with patch.object(service, "_get_client", return_value=mock_client):
result = await service.decompose_goal_external("Build a chatbot")
assert result is None
class TestGenerateAgentExternal:
"""Test generate_agent_external function."""
def setup_method(self):
"""Reset client singleton before each test."""
service._settings = None
service._client = None
@pytest.mark.asyncio
async def test_generate_agent_success(self):
"""Test successful agent generation."""
agent_json = {
"name": "Test Agent",
"nodes": [],
"links": [],
}
mock_response = MagicMock()
mock_response.json.return_value = {
"success": True,
"agent_json": agent_json,
}
mock_response.raise_for_status = MagicMock()
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
instructions = {"type": "instructions", "steps": ["Step 1"]}
with patch.object(service, "_get_client", return_value=mock_client):
result = await service.generate_agent_external(instructions)
assert result == agent_json
mock_client.post.assert_called_once_with(
"/api/generate-agent", json={"instructions": instructions}
)
@pytest.mark.asyncio
async def test_generate_agent_handles_error(self):
"""Test agent generation handles errors gracefully."""
mock_client = AsyncMock()
mock_client.post.side_effect = httpx.RequestError("Connection failed")
with patch.object(service, "_get_client", return_value=mock_client):
result = await service.generate_agent_external({"steps": []})
assert result is None
class TestGenerateAgentPatchExternal:
"""Test generate_agent_patch_external function."""
def setup_method(self):
"""Reset client singleton before each test."""
service._settings = None
service._client = None
@pytest.mark.asyncio
async def test_generate_patch_returns_updated_agent(self):
"""Test successful patch generation returning updated agent."""
updated_agent = {
"name": "Updated Agent",
"nodes": [{"id": "1", "block_id": "test"}],
"links": [],
}
mock_response = MagicMock()
mock_response.json.return_value = {
"success": True,
"agent_json": updated_agent,
}
mock_response.raise_for_status = MagicMock()
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
current_agent = {"name": "Old Agent", "nodes": [], "links": []}
with patch.object(service, "_get_client", return_value=mock_client):
result = await service.generate_agent_patch_external(
"Add a new node", current_agent
)
assert result == updated_agent
mock_client.post.assert_called_once_with(
"/api/update-agent",
json={
"update_request": "Add a new node",
"current_agent_json": current_agent,
},
)
@pytest.mark.asyncio
async def test_generate_patch_returns_clarifying_questions(self):
"""Test patch generation returning clarifying questions."""
mock_response = MagicMock()
mock_response.json.return_value = {
"success": True,
"type": "clarifying_questions",
"questions": ["What type of node?"],
}
mock_response.raise_for_status = MagicMock()
mock_client = AsyncMock()
mock_client.post.return_value = mock_response
with patch.object(service, "_get_client", return_value=mock_client):
result = await service.generate_agent_patch_external(
"Add something", {"nodes": []}
)
assert result == {
"type": "clarifying_questions",
"questions": ["What type of node?"],
}
class TestHealthCheck:
"""Test health_check function."""
def setup_method(self):
"""Reset singletons before each test."""
service._settings = None
service._client = None
@pytest.mark.asyncio
async def test_health_check_returns_false_when_not_configured(self):
"""Test health check returns False when service not configured."""
with patch.object(
service, "is_external_service_configured", return_value=False
):
result = await service.health_check()
assert result is False
@pytest.mark.asyncio
async def test_health_check_returns_true_when_healthy(self):
"""Test health check returns True when service is healthy."""
mock_response = MagicMock()
mock_response.json.return_value = {
"status": "healthy",
"blocks_loaded": True,
}
mock_response.raise_for_status = MagicMock()
mock_client = AsyncMock()
mock_client.get.return_value = mock_response
with patch.object(service, "is_external_service_configured", return_value=True):
with patch.object(service, "_get_client", return_value=mock_client):
result = await service.health_check()
assert result is True
mock_client.get.assert_called_once_with("/health")
@pytest.mark.asyncio
async def test_health_check_returns_false_when_not_healthy(self):
"""Test health check returns False when service is not healthy."""
mock_response = MagicMock()
mock_response.json.return_value = {
"status": "unhealthy",
"blocks_loaded": False,
}
mock_response.raise_for_status = MagicMock()
mock_client = AsyncMock()
mock_client.get.return_value = mock_response
with patch.object(service, "is_external_service_configured", return_value=True):
with patch.object(service, "_get_client", return_value=mock_client):
result = await service.health_check()
assert result is False
@pytest.mark.asyncio
async def test_health_check_returns_false_on_error(self):
"""Test health check returns False on connection error."""
mock_client = AsyncMock()
mock_client.get.side_effect = httpx.RequestError("Connection failed")
with patch.object(service, "is_external_service_configured", return_value=True):
with patch.object(service, "_get_client", return_value=mock_client):
result = await service.health_check()
assert result is False
class TestGetBlocksExternal:
"""Test get_blocks_external function."""
def setup_method(self):
"""Reset client singleton before each test."""
service._settings = None
service._client = None
@pytest.mark.asyncio
async def test_get_blocks_success(self):
"""Test successful blocks retrieval."""
blocks = [
{"id": "block1", "name": "Block 1"},
{"id": "block2", "name": "Block 2"},
]
mock_response = MagicMock()
mock_response.json.return_value = {
"success": True,
"blocks": blocks,
}
mock_response.raise_for_status = MagicMock()
mock_client = AsyncMock()
mock_client.get.return_value = mock_response
with patch.object(service, "_get_client", return_value=mock_client):
result = await service.get_blocks_external()
assert result == blocks
mock_client.get.assert_called_once_with("/api/blocks")
@pytest.mark.asyncio
async def test_get_blocks_handles_error(self):
"""Test blocks retrieval handles errors gracefully."""
mock_client = AsyncMock()
mock_client.get.side_effect = httpx.RequestError("Connection failed")
with patch.object(service, "_get_client", return_value=mock_client):
result = await service.get_blocks_external()
assert result is None
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -38,12 +38,8 @@ export const AgentOutputs = ({ flowID }: { flowID: string | null }) => {
return outputNodes
.map((node) => {
const executionResults = node.data.nodeExecutionResults || [];
const latestResult =
executionResults.length > 0
? executionResults[executionResults.length - 1]
: undefined;
const outputData = latestResult?.output_data?.output;
const executionResult = node.data.nodeExecutionResult;
const outputData = executionResult?.output_data?.output;
const renderer = globalRegistry.getRenderer(outputData);

View File

@@ -153,9 +153,6 @@ export const useRunInputDialog = ({
Object.entries(credentialValues).filter(([_, cred]) => cred && cred.id),
);
useNodeStore.getState().clearAllNodeExecutionResults();
useNodeStore.getState().cleanNodesStatuses();
await executeGraph({
graphId: flowID ?? "",
graphVersion: flowVersion || null,

View File

@@ -86,6 +86,7 @@ export function FloatingSafeModeToggle({
const {
currentHITLSafeMode,
showHITLToggle,
isHITLStateUndetermined,
handleHITLToggle,
currentSensitiveActionSafeMode,
showSensitiveActionToggle,
@@ -98,9 +99,16 @@ export function FloatingSafeModeToggle({
return null;
}
const showHITL = showHITLToggle && !isHITLStateUndetermined;
const showSensitive = showSensitiveActionToggle;
if (!showHITL && !showSensitive) {
return null;
}
return (
<div className={cn("fixed z-50 flex flex-col gap-2", className)}>
{showHITLToggle && (
{showHITL && (
<SafeModeButton
isEnabled={currentHITLSafeMode}
label="Human in the loop block approval"
@@ -111,7 +119,7 @@ export function FloatingSafeModeToggle({
fullWidth={fullWidth}
/>
)}
{showSensitiveActionToggle && (
{showSensitive && (
<SafeModeButton
isEnabled={currentSensitiveActionSafeMode}
label="Sensitive actions blocks approval"

View File

@@ -34,7 +34,7 @@ export type CustomNodeData = {
uiType: BlockUIType;
block_id: string;
status?: AgentExecutionStatus;
nodeExecutionResults?: NodeExecutionResult[];
nodeExecutionResult?: NodeExecutionResult;
staticOutput?: boolean;
// TODO : We need better type safety for the following backend fields.
costs: BlockCost[];
@@ -75,11 +75,7 @@ export const CustomNode: React.FC<NodeProps<CustomNode>> = React.memo(
(value) => value !== null && value !== undefined && value !== "",
);
const latestResult =
data.nodeExecutionResults && data.nodeExecutionResults.length > 0
? data.nodeExecutionResults[data.nodeExecutionResults.length - 1]
: undefined;
const outputData = latestResult?.output_data;
const outputData = data.nodeExecutionResult?.output_data;
const hasOutputError =
typeof outputData === "object" &&
outputData !== null &&

View File

@@ -14,15 +14,10 @@ import { useNodeOutput } from "./useNodeOutput";
import { ViewMoreData } from "./components/ViewMoreData";
export const NodeDataRenderer = ({ nodeId }: { nodeId: string }) => {
const {
latestOutputData,
copiedKey,
handleCopy,
executionResultId,
latestInputData,
} = useNodeOutput(nodeId);
const { outputData, copiedKey, handleCopy, executionResultId, inputData } =
useNodeOutput(nodeId);
if (Object.keys(latestOutputData).length === 0) {
if (Object.keys(outputData).length === 0) {
return null;
}
@@ -46,19 +41,18 @@ export const NodeDataRenderer = ({ nodeId }: { nodeId: string }) => {
<div className="space-y-2">
<Text variant="small-medium">Input</Text>
<ContentRenderer value={latestInputData} shortContent={false} />
<ContentRenderer value={inputData} shortContent={false} />
<div className="mt-1 flex justify-end gap-1">
<NodeDataViewer
data={inputData}
pinName="Input"
nodeId={nodeId}
execId={executionResultId}
dataType="input"
/>
<Button
variant="secondary"
size="small"
onClick={() => handleCopy("input", latestInputData)}
onClick={() => handleCopy("input", inputData)}
className={cn(
"h-fit min-w-0 gap-1.5 border border-zinc-200 p-2 text-black hover:text-slate-900",
copiedKey === "input" &&
@@ -74,72 +68,70 @@ export const NodeDataRenderer = ({ nodeId }: { nodeId: string }) => {
</div>
</div>
{Object.entries(latestOutputData)
{Object.entries(outputData)
.slice(0, 2)
.map(([key, value]) => {
return (
<div key={key} className="flex flex-col gap-2">
<div className="flex items-center gap-2">
<Text
variant="small-medium"
className="!font-semibold text-slate-600"
>
Pin:
</Text>
<Text variant="small" className="text-slate-700">
{beautifyString(key)}
</Text>
</div>
<div className="w-full space-y-2">
<Text
variant="small"
className="!font-semibold text-slate-600"
>
Data:
</Text>
<div className="relative space-y-2">
{value.map((item, index) => (
<div key={index}>
<ContentRenderer
value={item}
shortContent={true}
/>
</div>
))}
<div className="mt-1 flex justify-end gap-1">
<NodeDataViewer
pinName={key}
nodeId={nodeId}
execId={executionResultId}
/>
<Button
variant="secondary"
size="small"
onClick={() => handleCopy(key, value)}
className={cn(
"h-fit min-w-0 gap-1.5 border border-zinc-200 p-2 text-black hover:text-slate-900",
copiedKey === key &&
"border-green-400 bg-green-100 hover:border-green-400 hover:bg-green-200",
)}
>
{copiedKey === key ? (
<CheckIcon
size={12}
className="text-green-600"
/>
) : (
<CopyIcon size={12} />
)}
</Button>
.map(([key, value]) => (
<div key={key} className="flex flex-col gap-2">
<div className="flex items-center gap-2">
<Text
variant="small-medium"
className="!font-semibold text-slate-600"
>
Pin:
</Text>
<Text variant="small" className="text-slate-700">
{beautifyString(key)}
</Text>
</div>
<div className="w-full space-y-2">
<Text
variant="small"
className="!font-semibold text-slate-600"
>
Data:
</Text>
<div className="relative space-y-2">
{value.map((item, index) => (
<div key={index}>
<ContentRenderer value={item} shortContent={true} />
</div>
))}
<div className="mt-1 flex justify-end gap-1">
<NodeDataViewer
data={value}
pinName={key}
execId={executionResultId}
/>
<Button
variant="secondary"
size="small"
onClick={() => handleCopy(key, value)}
className={cn(
"h-fit min-w-0 gap-1.5 border border-zinc-200 p-2 text-black hover:text-slate-900",
copiedKey === key &&
"border-green-400 bg-green-100 hover:border-green-400 hover:bg-green-200",
)}
>
{copiedKey === key ? (
<CheckIcon size={12} className="text-green-600" />
) : (
<CopyIcon size={12} />
)}
</Button>
</div>
</div>
</div>
);
})}
</div>
))}
</div>
<ViewMoreData nodeId={nodeId} />
{Object.keys(outputData).length > 2 && (
<ViewMoreData
outputData={outputData}
execId={executionResultId}
/>
)}
</AccordionContent>
</AccordionItem>
</Accordion>

View File

@@ -19,51 +19,22 @@ import {
CopyIcon,
DownloadIcon,
} from "@phosphor-icons/react";
import React, { FC } from "react";
import { FC } from "react";
import { useNodeDataViewer } from "./useNodeDataViewer";
import { useNodeStore } from "@/app/(platform)/build/stores/nodeStore";
import { useShallow } from "zustand/react/shallow";
import { NodeDataType } from "../../helpers";
export interface NodeDataViewerProps {
data?: any;
interface NodeDataViewerProps {
data: any;
pinName: string;
nodeId?: string;
execId?: string;
isViewMoreData?: boolean;
dataType?: NodeDataType;
}
export const NodeDataViewer: FC<NodeDataViewerProps> = ({
data,
pinName,
nodeId,
execId = "N/A",
isViewMoreData = false,
dataType = "output",
}) => {
const executionResults = useNodeStore(
useShallow((state) =>
nodeId ? state.getNodeExecutionResults(nodeId) : [],
),
);
const latestInputData = useNodeStore(
useShallow((state) =>
nodeId ? state.getLatestNodeInputData(nodeId) : undefined,
),
);
const accumulatedOutputData = useNodeStore(
useShallow((state) =>
nodeId ? state.getAccumulatedNodeOutputData(nodeId) : {},
),
);
const resolvedData =
data ??
(dataType === "input"
? (latestInputData ?? {})
: (accumulatedOutputData[pinName] ?? []));
const {
outputItems,
copyExecutionId,
@@ -71,20 +42,7 @@ export const NodeDataViewer: FC<NodeDataViewerProps> = ({
handleDownloadItem,
dataArray,
copiedIndex,
groupedExecutions,
totalGroupedItems,
handleCopyGroupedItem,
handleDownloadGroupedItem,
copiedKey,
} = useNodeDataViewer(
resolvedData,
pinName,
execId,
executionResults,
dataType,
);
const shouldGroupExecutions = groupedExecutions.length > 0;
} = useNodeDataViewer(data, pinName, execId);
return (
<Dialog styling={{ width: "600px" }}>
<TooltipProvider>
@@ -110,141 +68,44 @@ export const NodeDataViewer: FC<NodeDataViewerProps> = ({
<div className="flex items-center gap-4">
<div className="flex items-center gap-2">
<Text variant="large-medium" className="text-slate-900">
Full {dataType === "input" ? "Input" : "Output"} Preview
Full Output Preview
</Text>
</div>
<div className="rounded-full border border-slate-300 bg-slate-100 px-3 py-1.5 text-xs font-medium text-black">
{shouldGroupExecutions ? totalGroupedItems : dataArray.length}{" "}
item
{shouldGroupExecutions
? totalGroupedItems !== 1
? "s"
: ""
: dataArray.length !== 1
? "s"
: ""}{" "}
total
{dataArray.length} item{dataArray.length !== 1 ? "s" : ""} total
</div>
</div>
<div className="text-sm text-gray-600">
{shouldGroupExecutions ? (
<div>
Pin:{" "}
<span className="font-semibold">{beautifyString(pinName)}</span>
</div>
) : (
<>
<div className="flex items-center gap-2">
<Text variant="body" className="text-slate-600">
Execution ID:
</Text>
<Text
variant="body-medium"
className="rounded-full border border-gray-300 bg-gray-50 px-2 py-1 font-mono text-xs"
>
{execId}
</Text>
<Button
variant="ghost"
size="small"
onClick={copyExecutionId}
className="h-6 w-6 min-w-0 p-0"
>
<CopyIcon size={14} />
</Button>
</div>
<div className="mt-2">
Pin:{" "}
<span className="font-semibold">
{beautifyString(pinName)}
</span>
</div>
</>
)}
<div className="flex items-center gap-2">
<Text variant="body" className="text-slate-600">
Execution ID:
</Text>
<Text
variant="body-medium"
className="rounded-full border border-gray-300 bg-gray-50 px-2 py-1 font-mono text-xs"
>
{execId}
</Text>
<Button
variant="ghost"
size="small"
onClick={copyExecutionId}
className="h-6 w-6 min-w-0 p-0"
>
<CopyIcon size={14} />
</Button>
</div>
<div className="mt-2">
Pin:{" "}
<span className="font-semibold">{beautifyString(pinName)}</span>
</div>
</div>
</div>
<div className="flex-1 overflow-hidden">
<ScrollArea className="h-full">
<div className="my-4">
{shouldGroupExecutions ? (
<div className="space-y-4">
{groupedExecutions.map((execution) => (
<div
key={execution.execId}
className="rounded-3xl border border-slate-200 bg-white p-4 shadow-sm"
>
<div className="flex items-center gap-2">
<Text variant="body" className="text-slate-600">
Execution ID:
</Text>
<Text
variant="body-medium"
className="rounded-full border border-gray-300 bg-gray-50 px-2 py-1 font-mono text-xs"
>
{execution.execId}
</Text>
</div>
<div className="mt-2 space-y-4">
{execution.outputItems.length > 0 ? (
execution.outputItems.map((item, index) => (
<div
key={item.key}
className="group flex items-start gap-4"
>
<div className="w-full flex-1">
<OutputItem
value={item.value}
metadata={item.metadata}
renderer={item.renderer}
/>
</div>
<div className="flex w-fit gap-3">
<Button
variant="secondary"
className="min-w-0 p-1"
size="icon"
onClick={() =>
handleCopyGroupedItem(
execution.execId,
index,
item,
)
}
aria-label="Copy item"
>
{copiedKey ===
`${execution.execId}-${index}` ? (
<CheckIcon className="size-4 text-green-600" />
) : (
<CopyIcon className="size-4 text-black" />
)}
</Button>
<Button
variant="secondary"
size="icon"
className="min-w-0 p-1"
onClick={() =>
handleDownloadGroupedItem(item)
}
aria-label="Download item"
>
<DownloadIcon className="size-4 text-black" />
</Button>
</div>
</div>
))
) : (
<div className="py-4 text-center text-gray-500">
No data available
</div>
)}
</div>
</div>
))}
</div>
) : dataArray.length > 0 ? (
{dataArray.length > 0 ? (
<div className="space-y-4">
{outputItems.map((item, index) => (
<div key={item.key} className="group relative">

View File

@@ -1,70 +1,82 @@
import type { OutputMetadata } from "@/components/contextual/OutputRenderers";
import { globalRegistry } from "@/components/contextual/OutputRenderers";
import { downloadOutputs } from "@/components/contextual/OutputRenderers/utils/download";
import { useToast } from "@/components/molecules/Toast/use-toast";
import { beautifyString } from "@/lib/utils";
import { useState } from "react";
import type { NodeExecutionResult } from "@/app/api/__generated__/models/nodeExecutionResult";
import {
NodeDataType,
createOutputItems,
getExecutionData,
normalizeToArray,
type OutputItem,
} from "../../helpers";
export type GroupedExecution = {
execId: string;
outputItems: Array<OutputItem>;
};
import React, { useMemo, useState } from "react";
export const useNodeDataViewer = (
data: any,
pinName: string,
execId: string,
executionResults?: NodeExecutionResult[],
dataType?: NodeDataType,
) => {
const { toast } = useToast();
const [copiedIndex, setCopiedIndex] = useState<number | null>(null);
const [copiedKey, setCopiedKey] = useState<string | null>(null);
const dataArray = Array.isArray(data) ? data : [data];
// Normalize data to array format
const dataArray = useMemo(() => {
return Array.isArray(data) ? data : [data];
}, [data]);
const outputItems =
!dataArray || dataArray.length === 0
? []
: createOutputItems(dataArray).map((item, index) => ({
...item,
// Prepare items for the enhanced renderer system
const outputItems = useMemo(() => {
if (!dataArray) return [];
const items: Array<{
key: string;
label: string;
value: unknown;
metadata?: OutputMetadata;
renderer: any;
}> = [];
dataArray.forEach((value, index) => {
const metadata: OutputMetadata = {};
// Extract metadata from the value if it's an object
if (
typeof value === "object" &&
value !== null &&
!React.isValidElement(value)
) {
const objValue = value as any;
if (objValue.type) metadata.type = objValue.type;
if (objValue.mimeType) metadata.mimeType = objValue.mimeType;
if (objValue.filename) metadata.filename = objValue.filename;
if (objValue.language) metadata.language = objValue.language;
}
const renderer = globalRegistry.getRenderer(value, metadata);
if (renderer) {
items.push({
key: `item-${index}`,
label: index === 0 ? beautifyString(pinName) : "",
}));
const groupedExecutions =
!executionResults || executionResults.length === 0
? []
: [...executionResults].reverse().map((result) => {
const rawData = getExecutionData(
result,
dataType || "output",
pinName,
);
let dataArray: unknown[];
if (dataType === "input") {
dataArray =
rawData !== undefined && rawData !== null ? [rawData] : [];
} else {
dataArray = normalizeToArray(rawData);
}
const outputItems = createOutputItems(dataArray);
return {
execId: result.node_exec_id,
outputItems,
};
value,
metadata,
renderer,
});
} else {
// Fallback to text renderer
const textRenderer = globalRegistry
.getAllRenderers()
.find((r) => r.name === "TextRenderer");
if (textRenderer) {
items.push({
key: `item-${index}`,
label: index === 0 ? beautifyString(pinName) : "",
value:
typeof value === "string"
? value
: JSON.stringify(value, null, 2),
metadata,
renderer: textRenderer,
});
}
}
});
const totalGroupedItems = groupedExecutions.reduce(
(total, execution) => total + execution.outputItems.length,
0,
);
return items;
}, [dataArray, pinName]);
const copyExecutionId = () => {
navigator.clipboard.writeText(execId).then(() => {
@@ -110,45 +122,6 @@ export const useNodeDataViewer = (
]);
};
const handleCopyGroupedItem = async (
execId: string,
index: number,
item: OutputItem,
) => {
const copyContent = item.renderer.getCopyContent(item.value, item.metadata);
if (!copyContent) {
return;
}
try {
let text: string;
if (typeof copyContent.data === "string") {
text = copyContent.data;
} else if (copyContent.fallbackText) {
text = copyContent.fallbackText;
} else {
return;
}
await navigator.clipboard.writeText(text);
setCopiedKey(`${execId}-${index}`);
setTimeout(() => setCopiedKey(null), 2000);
} catch (error) {
console.error("Failed to copy:", error);
}
};
const handleDownloadGroupedItem = (item: OutputItem) => {
downloadOutputs([
{
value: item.value,
metadata: item.metadata,
renderer: item.renderer,
},
]);
};
return {
outputItems,
dataArray,
@@ -156,10 +129,5 @@ export const useNodeDataViewer = (
handleCopyItem,
handleDownloadItem,
copiedIndex,
groupedExecutions,
totalGroupedItems,
handleCopyGroupedItem,
handleDownloadGroupedItem,
copiedKey,
};
};

View File

@@ -8,28 +8,16 @@ import { useState } from "react";
import { NodeDataViewer } from "./NodeDataViewer/NodeDataViewer";
import { useToast } from "@/components/molecules/Toast/use-toast";
import { CheckIcon, CopyIcon } from "@phosphor-icons/react";
import { useNodeStore } from "@/app/(platform)/build/stores/nodeStore";
import { useShallow } from "zustand/react/shallow";
import {
NodeDataType,
getExecutionEntries,
normalizeToArray,
} from "../helpers";
export const ViewMoreData = ({
nodeId,
dataType = "output",
outputData,
execId,
}: {
nodeId: string;
dataType?: NodeDataType;
outputData: Record<string, Array<any>>;
execId?: string;
}) => {
const [copiedKey, setCopiedKey] = useState<string | null>(null);
const { toast } = useToast();
const executionResults = useNodeStore(
useShallow((state) => state.getNodeExecutionResults(nodeId)),
);
const reversedExecutionResults = [...executionResults].reverse();
const handleCopy = (key: string, value: any) => {
const textToCopy =
@@ -41,8 +29,8 @@ export const ViewMoreData = ({
setTimeout(() => setCopiedKey(null), 2000);
};
const copyExecutionId = (executionId: string) => {
navigator.clipboard.writeText(executionId || "N/A").then(() => {
const copyExecutionId = () => {
navigator.clipboard.writeText(execId || "N/A").then(() => {
toast({
title: "Execution ID copied to clipboard!",
duration: 2000,
@@ -54,7 +42,7 @@ export const ViewMoreData = ({
<Dialog styling={{ width: "600px", paddingRight: "16px" }}>
<Dialog.Trigger>
<Button
variant="secondary"
variant="primary"
size="small"
className="h-fit w-fit min-w-0 !text-xs"
>
@@ -64,114 +52,83 @@ export const ViewMoreData = ({
<Dialog.Content>
<div className="flex flex-col gap-4">
<Text variant="h4" className="text-slate-900">
Complete {dataType === "input" ? "Input" : "Output"} Data
Complete Output Data
</Text>
<div className="flex items-center gap-2">
<Text variant="body" className="text-slate-600">
Execution ID:
</Text>
<Text
variant="body-medium"
className="rounded-full border border-gray-300 bg-gray-50 px-2 py-1 font-mono text-xs"
>
{execId}
</Text>
<Button
variant="ghost"
size="small"
onClick={copyExecutionId}
className="h-6 w-6 min-w-0 p-0"
>
<CopyIcon size={14} />
</Button>
</div>
<ScrollArea className="h-full">
<div className="flex flex-col gap-4">
{reversedExecutionResults.map((result) => (
<div
key={result.node_exec_id}
className="rounded-3xl border border-slate-200 bg-white p-4 shadow-sm"
>
{Object.entries(outputData).map(([key, value]) => (
<div key={key} className="flex flex-col gap-2">
<div className="flex items-center gap-2">
<Text variant="body" className="text-slate-600">
Execution ID:
</Text>
<Text
variant="body-medium"
className="rounded-full border border-gray-300 bg-gray-50 px-2 py-1 font-mono text-xs"
className="!font-semibold text-slate-600"
>
{result.node_exec_id}
Pin:
</Text>
<Text variant="body-medium" className="text-slate-700">
{beautifyString(key)}
</Text>
<Button
variant="ghost"
size="small"
onClick={() => copyExecutionId(result.node_exec_id)}
className="h-6 w-6 min-w-0 p-0"
>
<CopyIcon size={14} />
</Button>
</div>
<div className="w-full space-y-2">
<Text
variant="body-medium"
className="!font-semibold text-slate-600"
>
Data:
</Text>
<div className="relative space-y-2">
{value.map((item, index) => (
<div key={index}>
<ContentRenderer value={item} shortContent={false} />
</div>
))}
<div className="mt-4 flex flex-col gap-4">
{getExecutionEntries(result, dataType).map(
([key, value]) => {
const normalizedValue = normalizeToArray(value);
return (
<div key={key} className="flex flex-col gap-2">
<div className="flex items-center gap-2">
<Text
variant="body-medium"
className="!font-semibold text-slate-600"
>
Pin:
</Text>
<Text
variant="body-medium"
className="text-slate-700"
>
{beautifyString(key)}
</Text>
</div>
<div className="w-full space-y-2">
<Text
variant="body-medium"
className="!font-semibold text-slate-600"
>
Data:
</Text>
<div className="relative space-y-2">
{normalizedValue.map((item, index) => (
<div key={index}>
<ContentRenderer
value={item}
shortContent={false}
/>
</div>
))}
<div className="mt-1 flex justify-end gap-1">
<NodeDataViewer
data={normalizedValue}
pinName={key}
execId={result.node_exec_id}
isViewMoreData={true}
dataType={dataType}
/>
<Button
variant="secondary"
size="small"
onClick={() =>
handleCopy(
`${result.node_exec_id}-${key}`,
normalizedValue,
)
}
className={cn(
"h-fit min-w-0 gap-1.5 border border-zinc-200 p-2 text-black hover:text-slate-900",
copiedKey ===
`${result.node_exec_id}-${key}` &&
"border-green-400 bg-green-100 hover:border-green-400 hover:bg-green-200",
)}
>
{copiedKey ===
`${result.node_exec_id}-${key}` ? (
<CheckIcon
size={16}
className="text-green-600"
/>
) : (
<CopyIcon size={16} />
)}
</Button>
</div>
</div>
</div>
</div>
);
},
)}
<div className="mt-1 flex justify-end gap-1">
<NodeDataViewer
data={value}
pinName={key}
execId={execId}
isViewMoreData={true}
/>
<Button
variant="secondary"
size="small"
onClick={() => handleCopy(key, value)}
className={cn(
"h-fit min-w-0 gap-1.5 border border-zinc-200 p-2 text-black hover:text-slate-900",
copiedKey === key &&
"border-green-400 bg-green-100 hover:border-green-400 hover:bg-green-200",
)}
>
{copiedKey === key ? (
<CheckIcon size={16} className="text-green-600" />
) : (
<CopyIcon size={16} />
)}
</Button>
</div>
</div>
</div>
</div>
))}

View File

@@ -1,83 +0,0 @@
import type { NodeExecutionResult } from "@/app/api/__generated__/models/nodeExecutionResult";
import type { OutputMetadata } from "@/components/contextual/OutputRenderers";
import { globalRegistry } from "@/components/contextual/OutputRenderers";
import React from "react";
export type NodeDataType = "input" | "output";
export type OutputItem = {
key: string;
value: unknown;
metadata?: OutputMetadata;
renderer: any;
};
export const normalizeToArray = (value: unknown) => {
if (value === undefined) return [];
return Array.isArray(value) ? value : [value];
};
export const getExecutionData = (
result: NodeExecutionResult,
dataType: NodeDataType,
pinName: string,
) => {
if (dataType === "input") {
return result.input_data;
}
return result.output_data?.[pinName];
};
export const createOutputItems = (dataArray: unknown[]): Array<OutputItem> => {
const items: Array<OutputItem> = [];
dataArray.forEach((value, index) => {
const metadata: OutputMetadata = {};
if (
typeof value === "object" &&
value !== null &&
!React.isValidElement(value)
) {
const objValue = value as any;
if (objValue.type) metadata.type = objValue.type;
if (objValue.mimeType) metadata.mimeType = objValue.mimeType;
if (objValue.filename) metadata.filename = objValue.filename;
if (objValue.language) metadata.language = objValue.language;
}
const renderer = globalRegistry.getRenderer(value, metadata);
if (renderer) {
items.push({
key: `item-${index}`,
value,
metadata,
renderer,
});
} else {
const textRenderer = globalRegistry
.getAllRenderers()
.find((r) => r.name === "TextRenderer");
if (textRenderer) {
items.push({
key: `item-${index}`,
value:
typeof value === "string" ? value : JSON.stringify(value, null, 2),
metadata,
renderer: textRenderer,
});
}
}
});
return items;
};
export const getExecutionEntries = (
result: NodeExecutionResult,
dataType: NodeDataType,
) => {
const data = dataType === "input" ? result.input_data : result.output_data;
return Object.entries(data || {});
};

View File

@@ -7,18 +7,15 @@ export const useNodeOutput = (nodeId: string) => {
const [copiedKey, setCopiedKey] = useState<string | null>(null);
const { toast } = useToast();
const latestResult = useNodeStore(
useShallow((state) => state.getLatestNodeExecutionResult(nodeId)),
const nodeExecutionResult = useNodeStore(
useShallow((state) => state.getNodeExecutionResult(nodeId)),
);
const latestInputData = useNodeStore(
useShallow((state) => state.getLatestNodeInputData(nodeId)),
);
const latestOutputData: Record<string, Array<any>> = useNodeStore(
useShallow((state) => state.getLatestNodeOutputData(nodeId) || {}),
);
const inputData = nodeExecutionResult?.input_data;
const outputData: Record<string, Array<any>> = {
...nodeExecutionResult?.output_data,
};
const handleCopy = async (key: string, value: any) => {
try {
const text = JSON.stringify(value, null, 2);
@@ -38,12 +35,11 @@ export const useNodeOutput = (nodeId: string) => {
});
}
};
return {
latestOutputData,
latestInputData,
outputData,
inputData,
copiedKey,
handleCopy,
executionResultId: latestResult?.node_exec_id,
executionResultId: nodeExecutionResult?.node_exec_id,
};
};

View File

@@ -1,7 +1,10 @@
import { useState, useCallback, useEffect } from "react";
import { useShallow } from "zustand/react/shallow";
import { useGraphStore } from "@/app/(platform)/build/stores/graphStore";
import { useNodeStore } from "@/app/(platform)/build/stores/nodeStore";
import {
useNodeStore,
NodeResolutionData,
} from "@/app/(platform)/build/stores/nodeStore";
import { useEdgeStore } from "@/app/(platform)/build/stores/edgeStore";
import {
useSubAgentUpdate,
@@ -10,7 +13,6 @@ import {
} from "@/app/(platform)/build/hooks/useSubAgentUpdate";
import { GraphInputSchema, GraphOutputSchema } from "@/lib/autogpt-server-api";
import { CustomNodeData } from "../../CustomNode";
import { NodeResolutionData } from "@/app/(platform)/build/stores/types";
// Stable empty set to avoid creating new references in selectors
const EMPTY_SET: Set<string> = new Set();

View File

@@ -1,5 +1,5 @@
import { AgentExecutionStatus } from "@/app/api/__generated__/models/agentExecutionStatus";
import { NodeResolutionData } from "@/app/(platform)/build/stores/types";
import { NodeResolutionData } from "@/app/(platform)/build/stores/nodeStore";
import { RJSFSchema } from "@rjsf/utils";
export const nodeStyleBasedOnStatus: Record<AgentExecutionStatus, string> = {

View File

@@ -1,16 +0,0 @@
export const accumulateExecutionData = (
accumulated: Record<string, unknown[]>,
data: Record<string, unknown> | undefined,
) => {
if (!data) return { ...accumulated };
const next = { ...accumulated };
Object.entries(data).forEach(([key, values]) => {
const nextValues = Array.isArray(values) ? values : [values];
if (next[key]) {
next[key] = [...next[key], ...nextValues];
} else {
next[key] = [...nextValues];
}
});
return next;
};

View File

@@ -10,8 +10,6 @@ import {
import { Node } from "@/app/api/__generated__/models/node";
import { AgentExecutionStatus } from "@/app/api/__generated__/models/agentExecutionStatus";
import { NodeExecutionResult } from "@/app/api/__generated__/models/nodeExecutionResult";
import { NodeExecutionResultInputData } from "@/app/api/__generated__/models/nodeExecutionResultInputData";
import { NodeExecutionResultOutputData } from "@/app/api/__generated__/models/nodeExecutionResultOutputData";
import { useHistoryStore } from "./historyStore";
import { useEdgeStore } from "./edgeStore";
import { BlockUIType } from "../components/types";
@@ -20,10 +18,31 @@ import {
ensurePathExists,
parseHandleIdToPath,
} from "@/components/renderers/InputRenderer/helpers";
import { accumulateExecutionData } from "./helpers";
import { NodeResolutionData } from "./types";
import { IncompatibilityInfo } from "../hooks/useSubAgentUpdate/types";
// Resolution mode data stored per node
export type NodeResolutionData = {
incompatibilities: IncompatibilityInfo;
// The NEW schema from the update (what we're updating TO)
pendingUpdate: {
input_schema: Record<string, unknown>;
output_schema: Record<string, unknown>;
};
// The OLD schema before the update (what we're updating FROM)
// Needed to merge and show removed inputs during resolution
currentSchema: {
input_schema: Record<string, unknown>;
output_schema: Record<string, unknown>;
};
// The full updated hardcoded values to apply when resolution completes
pendingHardcodedValues: Record<string, unknown>;
};
// Minimum movement (in pixels) required before logging position change to history
// Prevents spamming history with small movements when clicking on inputs inside blocks
const MINIMUM_MOVE_BEFORE_LOG = 50;
// Track initial positions when drag starts (outside store to avoid re-renders)
const dragStartPositions: Record<string, XYPosition> = {};
let dragStartState: { nodes: CustomNode[]; edges: CustomEdge[] } | null = null;
@@ -33,15 +52,6 @@ type NodeStore = {
nodeCounter: number;
setNodeCounter: (nodeCounter: number) => void;
nodeAdvancedStates: Record<string, boolean>;
latestNodeInputData: Record<string, NodeExecutionResultInputData | undefined>;
latestNodeOutputData: Record<
string,
NodeExecutionResultOutputData | undefined
>;
accumulatedNodeInputData: Record<string, Record<string, unknown[]>>;
accumulatedNodeOutputData: Record<string, Record<string, unknown[]>>;
setNodes: (nodes: CustomNode[]) => void;
onNodesChange: (changes: NodeChange<CustomNode>[]) => void;
addNode: (node: CustomNode) => void;
@@ -62,26 +72,12 @@ type NodeStore = {
updateNodeStatus: (nodeId: string, status: AgentExecutionStatus) => void;
getNodeStatus: (nodeId: string) => AgentExecutionStatus | undefined;
cleanNodesStatuses: () => void;
updateNodeExecutionResult: (
nodeId: string,
result: NodeExecutionResult,
) => void;
getNodeExecutionResults: (nodeId: string) => NodeExecutionResult[];
getLatestNodeInputData: (
nodeId: string,
) => NodeExecutionResultInputData | undefined;
getLatestNodeOutputData: (
nodeId: string,
) => NodeExecutionResultOutputData | undefined;
getAccumulatedNodeInputData: (nodeId: string) => Record<string, unknown[]>;
getAccumulatedNodeOutputData: (nodeId: string) => Record<string, unknown[]>;
getLatestNodeExecutionResult: (
nodeId: string,
) => NodeExecutionResult | undefined;
clearAllNodeExecutionResults: () => void;
getNodeExecutionResult: (nodeId: string) => NodeExecutionResult | undefined;
getNodeBlockUIType: (nodeId: string) => BlockUIType;
hasWebhookNodes: () => boolean;
@@ -126,10 +122,6 @@ export const useNodeStore = create<NodeStore>((set, get) => ({
nodeCounter: 0,
setNodeCounter: (nodeCounter) => set({ nodeCounter }),
nodeAdvancedStates: {},
latestNodeInputData: {},
latestNodeOutputData: {},
accumulatedNodeInputData: {},
accumulatedNodeOutputData: {},
incrementNodeCounter: () =>
set((state) => ({
nodeCounter: state.nodeCounter + 1,
@@ -325,163 +317,18 @@ export const useNodeStore = create<NodeStore>((set, get) => ({
return get().nodes.find((n) => n.id === nodeId)?.data?.status;
},
cleanNodesStatuses: () => {
set((state) => ({
nodes: state.nodes.map((n) => ({
...n,
data: { ...n.data, status: undefined },
})),
}));
},
updateNodeExecutionResult: (nodeId: string, result: NodeExecutionResult) => {
set((state) => {
let latestNodeInputData = state.latestNodeInputData;
let latestNodeOutputData = state.latestNodeOutputData;
let accumulatedNodeInputData = state.accumulatedNodeInputData;
let accumulatedNodeOutputData = state.accumulatedNodeOutputData;
const nodes = state.nodes.map((n) => {
if (n.id !== nodeId) return n;
const existingResults = n.data.nodeExecutionResults || [];
const duplicateIndex = existingResults.findIndex(
(r) => r.node_exec_id === result.node_exec_id,
);
if (duplicateIndex !== -1) {
const oldResult = existingResults[duplicateIndex];
const inputDataChanged =
JSON.stringify(oldResult.input_data) !==
JSON.stringify(result.input_data);
const outputDataChanged =
JSON.stringify(oldResult.output_data) !==
JSON.stringify(result.output_data);
if (!inputDataChanged && !outputDataChanged) {
return n;
}
const updatedResults = [...existingResults];
updatedResults[duplicateIndex] = result;
const recomputedAccumulatedInput = updatedResults.reduce(
(acc, r) => accumulateExecutionData(acc, r.input_data),
{} as Record<string, unknown[]>,
);
const recomputedAccumulatedOutput = updatedResults.reduce(
(acc, r) => accumulateExecutionData(acc, r.output_data),
{} as Record<string, unknown[]>,
);
const mostRecentResult = updatedResults[updatedResults.length - 1];
latestNodeInputData = {
...latestNodeInputData,
[nodeId]: mostRecentResult.input_data,
};
latestNodeOutputData = {
...latestNodeOutputData,
[nodeId]: mostRecentResult.output_data,
};
accumulatedNodeInputData = {
...accumulatedNodeInputData,
[nodeId]: recomputedAccumulatedInput,
};
accumulatedNodeOutputData = {
...accumulatedNodeOutputData,
[nodeId]: recomputedAccumulatedOutput,
};
return {
...n,
data: {
...n.data,
nodeExecutionResults: updatedResults,
},
};
}
accumulatedNodeInputData = {
...accumulatedNodeInputData,
[nodeId]: accumulateExecutionData(
accumulatedNodeInputData[nodeId] || {},
result.input_data,
),
};
accumulatedNodeOutputData = {
...accumulatedNodeOutputData,
[nodeId]: accumulateExecutionData(
accumulatedNodeOutputData[nodeId] || {},
result.output_data,
),
};
latestNodeInputData = {
...latestNodeInputData,
[nodeId]: result.input_data,
};
latestNodeOutputData = {
...latestNodeOutputData,
[nodeId]: result.output_data,
};
return {
...n,
data: {
...n.data,
nodeExecutionResults: [...existingResults, result],
},
};
});
return {
nodes,
latestNodeInputData,
latestNodeOutputData,
accumulatedNodeInputData,
accumulatedNodeOutputData,
};
});
},
getNodeExecutionResults: (nodeId: string) => {
return (
get().nodes.find((n) => n.id === nodeId)?.data?.nodeExecutionResults || []
);
},
getLatestNodeInputData: (nodeId: string) => {
return get().latestNodeInputData[nodeId];
},
getLatestNodeOutputData: (nodeId: string) => {
return get().latestNodeOutputData[nodeId];
},
getAccumulatedNodeInputData: (nodeId: string) => {
return get().accumulatedNodeInputData[nodeId] || {};
},
getAccumulatedNodeOutputData: (nodeId: string) => {
return get().accumulatedNodeOutputData[nodeId] || {};
},
getLatestNodeExecutionResult: (nodeId: string) => {
const results =
get().nodes.find((n) => n.id === nodeId)?.data?.nodeExecutionResults ||
[];
return results.length > 0 ? results[results.length - 1] : undefined;
},
clearAllNodeExecutionResults: () => {
set((state) => ({
nodes: state.nodes.map((n) => ({
...n,
data: {
...n.data,
nodeExecutionResults: [],
},
})),
latestNodeInputData: {},
latestNodeOutputData: {},
accumulatedNodeInputData: {},
accumulatedNodeOutputData: {},
nodes: state.nodes.map((n) =>
n.id === nodeId
? { ...n, data: { ...n.data, nodeExecutionResult: result } }
: n,
),
}));
},
getNodeExecutionResult: (nodeId: string) => {
return get().nodes.find((n) => n.id === nodeId)?.data?.nodeExecutionResult;
},
getNodeBlockUIType: (nodeId: string) => {
return (
get().nodes.find((n) => n.id === nodeId)?.data?.uiType ??

View File

@@ -1,14 +0,0 @@
import { IncompatibilityInfo } from "../hooks/useSubAgentUpdate/types";
export type NodeResolutionData = {
incompatibilities: IncompatibilityInfo;
pendingUpdate: {
input_schema: Record<string, unknown>;
output_schema: Record<string, unknown>;
};
currentSchema: {
input_schema: Record<string, unknown>;
output_schema: Record<string, unknown>;
};
pendingHardcodedValues: Record<string, unknown>;
};

View File

@@ -14,10 +14,6 @@ import {
import { Dialog } from "@/components/molecules/Dialog/Dialog";
import { useEffect, useRef, useState } from "react";
import { ScheduleAgentModal } from "../ScheduleAgentModal/ScheduleAgentModal";
import {
AIAgentSafetyPopup,
useAIAgentSafetyPopup,
} from "./components/AIAgentSafetyPopup/AIAgentSafetyPopup";
import { ModalHeader } from "./components/ModalHeader/ModalHeader";
import { ModalRunSection } from "./components/ModalRunSection/ModalRunSection";
import { RunActions } from "./components/RunActions/RunActions";
@@ -87,18 +83,8 @@ export function RunAgentModal({
const [isScheduleModalOpen, setIsScheduleModalOpen] = useState(false);
const [hasOverflow, setHasOverflow] = useState(false);
const [isSafetyPopupOpen, setIsSafetyPopupOpen] = useState(false);
const [pendingRunAction, setPendingRunAction] = useState<(() => void) | null>(
null,
);
const contentRef = useRef<HTMLDivElement>(null);
const { shouldShowPopup, dismissPopup } = useAIAgentSafetyPopup(
agent.id,
agent.has_sensitive_action,
agent.has_human_in_the_loop,
);
const hasAnySetupFields =
Object.keys(agentInputFields || {}).length > 0 ||
Object.keys(agentCredentialsInputFields || {}).length > 0;
@@ -179,24 +165,6 @@ export function RunAgentModal({
onScheduleCreated?.(schedule);
}
function handleRunWithSafetyCheck() {
if (shouldShowPopup) {
setPendingRunAction(() => handleRun);
setIsSafetyPopupOpen(true);
} else {
handleRun();
}
}
function handleSafetyPopupAcknowledge() {
setIsSafetyPopupOpen(false);
dismissPopup();
if (pendingRunAction) {
pendingRunAction();
setPendingRunAction(null);
}
}
return (
<>
<Dialog
@@ -280,7 +248,7 @@ export function RunAgentModal({
)}
<RunActions
defaultRunType={defaultRunType}
onRun={handleRunWithSafetyCheck}
onRun={handleRun}
isExecuting={isExecuting}
isSettingUpTrigger={isSettingUpTrigger}
isRunReady={allRequiredInputsAreSet}
@@ -298,12 +266,6 @@ export function RunAgentModal({
</div>
</Dialog.Content>
</Dialog>
<AIAgentSafetyPopup
agentId={agent.id}
isOpen={isSafetyPopupOpen}
onAcknowledge={handleSafetyPopupAcknowledge}
/>
</>
);
}

View File

@@ -1,108 +0,0 @@
"use client";
import { Button } from "@/components/atoms/Button/Button";
import { Text } from "@/components/atoms/Text/Text";
import { Dialog } from "@/components/molecules/Dialog/Dialog";
import { Key, storage } from "@/services/storage/local-storage";
import { ShieldCheckIcon } from "@phosphor-icons/react";
import { useCallback, useEffect, useState } from "react";
interface Props {
agentId: string;
onAcknowledge: () => void;
isOpen: boolean;
}
export function AIAgentSafetyPopup({ agentId, onAcknowledge, isOpen }: Props) {
function handleAcknowledge() {
// Add this agent to the list of agents for which popup has been shown
const seenAgentsJson = storage.get(Key.AI_AGENT_SAFETY_POPUP_SHOWN);
const seenAgents: string[] = seenAgentsJson
? JSON.parse(seenAgentsJson)
: [];
if (!seenAgents.includes(agentId)) {
seenAgents.push(agentId);
storage.set(Key.AI_AGENT_SAFETY_POPUP_SHOWN, JSON.stringify(seenAgents));
}
onAcknowledge();
}
if (!isOpen) return null;
return (
<Dialog
controlled={{ isOpen, set: () => {} }}
styling={{ maxWidth: "480px" }}
>
<Dialog.Content>
<div className="flex flex-col items-center p-6 text-center">
<div className="mb-6 flex h-16 w-16 items-center justify-center rounded-full bg-blue-50">
<ShieldCheckIcon
weight="fill"
size={32}
className="text-blue-600"
/>
</div>
<Text variant="h3" className="mb-4">
Safety Checks Enabled
</Text>
<Text variant="body" className="mb-2 text-zinc-700">
AI-generated agents may take actions that affect your data or
external systems.
</Text>
<Text variant="body" className="mb-8 text-zinc-700">
AutoGPT includes safety checks so you&apos;ll always have the
opportunity to review and approve sensitive actions before they
happen.
</Text>
<Button
variant="primary"
size="large"
className="w-full"
onClick={handleAcknowledge}
>
Got it
</Button>
</div>
</Dialog.Content>
</Dialog>
);
}
export function useAIAgentSafetyPopup(
agentId: string,
hasSensitiveAction: boolean,
hasHumanInTheLoop: boolean,
) {
const [shouldShowPopup, setShouldShowPopup] = useState(false);
const [hasChecked, setHasChecked] = useState(false);
useEffect(() => {
if (hasChecked) return;
const seenAgentsJson = storage.get(Key.AI_AGENT_SAFETY_POPUP_SHOWN);
const seenAgents: string[] = seenAgentsJson
? JSON.parse(seenAgentsJson)
: [];
const hasSeenPopupForThisAgent = seenAgents.includes(agentId);
const isRelevantAgent = hasSensitiveAction || hasHumanInTheLoop;
setShouldShowPopup(!hasSeenPopupForThisAgent && isRelevantAgent);
setHasChecked(true);
}, [agentId, hasSensitiveAction, hasHumanInTheLoop, hasChecked]);
const dismissPopup = useCallback(() => {
setShouldShowPopup(false);
}, []);
return {
shouldShowPopup,
dismissPopup,
};
}

View File

@@ -69,6 +69,7 @@ export function SafeModeToggle({ graph, className }: Props) {
const {
currentHITLSafeMode,
showHITLToggle,
isHITLStateUndetermined,
handleHITLToggle,
currentSensitiveActionSafeMode,
showSensitiveActionToggle,
@@ -77,13 +78,20 @@ export function SafeModeToggle({ graph, className }: Props) {
shouldShowToggle,
} = useAgentSafeMode(graph);
if (!shouldShowToggle) {
if (!shouldShowToggle || isHITLStateUndetermined) {
return null;
}
const showHITL = showHITLToggle && !isHITLStateUndetermined;
const showSensitive = showSensitiveActionToggle;
if (!showHITL && !showSensitive) {
return null;
}
return (
<div className={cn("flex gap-1", className)}>
{showHITLToggle && (
{showHITL && (
<SafeModeIconButton
isEnabled={currentHITLSafeMode}
label="Human-in-the-loop"
@@ -93,7 +101,7 @@ export function SafeModeToggle({ graph, className }: Props) {
isPending={isPending}
/>
)}
{showSensitiveActionToggle && (
{showSensitive && (
<SafeModeIconButton
isEnabled={currentSensitiveActionSafeMode}
label="Sensitive actions"

View File

@@ -8809,12 +8809,6 @@
"title": "Node Exec Id",
"description": "Node execution ID (primary key)"
},
"node_id": {
"type": "string",
"title": "Node Id",
"description": "Node definition ID (for grouping)",
"default": ""
},
"user_id": {
"type": "string",
"title": "User Id",
@@ -8914,7 +8908,7 @@
"created_at"
],
"title": "PendingHumanReviewModel",
"description": "Response model for pending human review data.\n\nRepresents a human review request that is awaiting user action.\nContains all necessary information for a user to review and approve\nor reject data from a Human-in-the-Loop block execution.\n\nAttributes:\n id: Unique identifier for the review record\n user_id: ID of the user who must perform the review\n node_exec_id: ID of the node execution that created this review\n node_id: ID of the node definition (for grouping reviews from same node)\n graph_exec_id: ID of the graph execution containing the node\n graph_id: ID of the graph template being executed\n graph_version: Version number of the graph template\n payload: The actual data payload awaiting review\n instructions: Instructions or message for the reviewer\n editable: Whether the reviewer can edit the data\n status: Current review status (WAITING, APPROVED, or REJECTED)\n review_message: Optional message from the reviewer\n created_at: Timestamp when review was created\n updated_at: Timestamp when review was last modified\n reviewed_at: Timestamp when review was completed (if applicable)"
"description": "Response model for pending human review data.\n\nRepresents a human review request that is awaiting user action.\nContains all necessary information for a user to review and approve\nor reject data from a Human-in-the-Loop block execution.\n\nAttributes:\n id: Unique identifier for the review record\n user_id: ID of the user who must perform the review\n node_exec_id: ID of the node execution that created this review\n graph_exec_id: ID of the graph execution containing the node\n graph_id: ID of the graph template being executed\n graph_version: Version number of the graph template\n payload: The actual data payload awaiting review\n instructions: Instructions or message for the reviewer\n editable: Whether the reviewer can edit the data\n status: Current review status (WAITING, APPROVED, or REJECTED)\n review_message: Optional message from the reviewer\n created_at: Timestamp when review was created\n updated_at: Timestamp when review was last modified\n reviewed_at: Timestamp when review was completed (if applicable)"
},
"PostmarkBounceEnum": {
"type": "integer",
@@ -9417,12 +9411,6 @@
],
"title": "Reviewed Data",
"description": "Optional edited data (ignored if approved=False)"
},
"auto_approve_future": {
"type": "boolean",
"title": "Auto Approve Future",
"description": "If true and this review is approved, future executions of this same block (node) will be automatically approved. This only affects approved reviews.",
"default": false
}
},
"type": "object",
@@ -9442,7 +9430,7 @@
"type": "object",
"required": ["reviews"],
"title": "ReviewRequest",
"description": "Request model for processing ALL pending reviews for an execution.\n\nThis request must include ALL pending reviews for a graph execution.\nEach review will be either approved (with optional data modifications)\nor rejected (data ignored). The execution will resume only after ALL reviews are processed.\n\nEach review item can individually specify whether to auto-approve future executions\nof the same block via the `auto_approve_future` field on ReviewItem."
"description": "Request model for processing ALL pending reviews for an execution.\n\nThis request must include ALL pending reviews for a graph execution.\nEach review will be either approved (with optional data modifications)\nor rejected (data ignored). The execution will resume only after ALL reviews are processed."
},
"ReviewResponse": {
"properties": {

View File

@@ -26,6 +26,7 @@ export const providerIcons: Partial<
nvidia: fallbackIcon,
discord: FaDiscord,
d_id: fallbackIcon,
elevenlabs: fallbackIcon,
google_maps: FaGoogle,
jina: fallbackIcon,
ideogram: fallbackIcon,

View File

@@ -31,29 +31,6 @@ export function FloatingReviewsPanel({
query: {
enabled: !!(graphId && executionId),
select: okData,
// Poll while execution is in progress to detect status changes
refetchInterval: (q) => {
// Note: refetchInterval callback receives raw data before select transform
const rawData = q.state.data as
| { status: number; data?: { status?: string } }
| undefined;
if (rawData?.status !== 200) return false;
const status = rawData?.data?.status;
if (!status) return false;
// Poll every 2 seconds while running or in review
if (
status === AgentExecutionStatus.RUNNING ||
status === AgentExecutionStatus.QUEUED ||
status === AgentExecutionStatus.INCOMPLETE ||
status === AgentExecutionStatus.REVIEW
) {
return 2000;
}
return false;
},
refetchIntervalInBackground: true,
},
},
);
@@ -63,47 +40,28 @@ export function FloatingReviewsPanel({
useShallow((state) => state.graphExecutionStatus),
);
// Determine if we should poll for pending reviews
const isInReviewStatus =
executionDetails?.status === AgentExecutionStatus.REVIEW ||
graphExecutionStatus === AgentExecutionStatus.REVIEW;
const { pendingReviews, isLoading, refetch } = usePendingReviewsForExecution(
executionId || "",
{
enabled: !!executionId,
// Poll every 2 seconds when in REVIEW status to catch new reviews
refetchInterval: isInReviewStatus ? 2000 : false,
},
);
// Refetch pending reviews when execution status changes
useEffect(() => {
if (executionId && executionDetails?.status) {
if (executionId) {
refetch();
}
}, [executionDetails?.status, executionId, refetch]);
// Hide panel if:
// 1. No execution ID
// 2. No pending reviews and not in REVIEW status
// 3. Execution is RUNNING or QUEUED (hasn't paused for review yet)
if (!executionId) {
return null;
}
// Refetch when graph execution status changes to REVIEW
useEffect(() => {
if (graphExecutionStatus === AgentExecutionStatus.REVIEW && executionId) {
refetch();
}
}, [graphExecutionStatus, executionId, refetch]);
if (
!isLoading &&
pendingReviews.length === 0 &&
executionDetails?.status !== AgentExecutionStatus.REVIEW
) {
return null;
}
// Don't show panel while execution is still running/queued (not paused for review)
if (
executionDetails?.status === AgentExecutionStatus.RUNNING ||
executionDetails?.status === AgentExecutionStatus.QUEUED
!executionId ||
(!isLoading &&
pendingReviews.length === 0 &&
executionDetails?.status !== AgentExecutionStatus.REVIEW)
) {
return null;
}

View File

@@ -1,8 +1,10 @@
import { PendingHumanReviewModel } from "@/app/api/__generated__/models/pendingHumanReviewModel";
import { Text } from "@/components/atoms/Text/Text";
import { Button } from "@/components/atoms/Button/Button";
import { Input } from "@/components/atoms/Input/Input";
import { Switch } from "@/components/atoms/Switch/Switch";
import { useEffect, useState } from "react";
import { TrashIcon, EyeSlashIcon } from "@phosphor-icons/react";
import { useState } from "react";
interface StructuredReviewPayload {
data: unknown;
@@ -38,49 +40,37 @@ function extractReviewData(payload: unknown): {
interface PendingReviewCardProps {
review: PendingHumanReviewModel;
onReviewDataChange: (nodeExecId: string, data: string) => void;
autoApproveFuture?: boolean;
onAutoApproveFutureChange?: (nodeExecId: string, enabled: boolean) => void;
externalDataValue?: string;
showAutoApprove?: boolean;
nodeId?: string;
reviewMessage?: string;
onReviewMessageChange?: (nodeExecId: string, message: string) => void;
isDisabled?: boolean;
onToggleDisabled?: (nodeExecId: string) => void;
}
export function PendingReviewCard({
review,
onReviewDataChange,
autoApproveFuture = false,
onAutoApproveFutureChange,
externalDataValue,
showAutoApprove = true,
nodeId,
reviewMessage = "",
onReviewMessageChange,
isDisabled = false,
onToggleDisabled,
}: PendingReviewCardProps) {
const extractedData = extractReviewData(review.payload);
const isDataEditable = review.editable;
let instructions = review.instructions;
const isHITLBlock = instructions && !instructions.includes("Block");
if (instructions && !isHITLBlock) {
instructions = undefined;
}
const instructions = extractedData.instructions || review.instructions;
const [currentData, setCurrentData] = useState(extractedData.data);
useEffect(() => {
if (externalDataValue !== undefined) {
try {
const parsedData = JSON.parse(externalDataValue);
setCurrentData(parsedData);
} catch {}
}
}, [externalDataValue]);
const handleDataChange = (newValue: unknown) => {
setCurrentData(newValue);
onReviewDataChange(review.node_exec_id, JSON.stringify(newValue, null, 2));
};
const handleMessageChange = (newMessage: string) => {
onReviewMessageChange?.(review.node_exec_id, newMessage);
};
// Show simplified view when no toggle functionality is provided (Screenshot 1 mode)
const showSimplified = !onToggleDisabled;
const renderDataInput = () => {
const data = currentData;
@@ -147,59 +137,97 @@ export function PendingReviewCard({
}
};
const getShortenedNodeId = (id: string) => {
if (id.length <= 8) return id;
return `${id.slice(0, 4)}...${id.slice(-4)}`;
// Helper function to get proper field label
const getFieldLabel = (instructions?: string) => {
if (instructions)
return instructions.charAt(0).toUpperCase() + instructions.slice(1);
return "Data to Review";
};
// Use the existing HITL review interface
return (
<div className="space-y-4">
{nodeId && (
<Text variant="small" className="text-gray-500">
Node #{getShortenedNodeId(nodeId)}
</Text>
{!showSimplified && (
<div className="flex items-start justify-between">
<div className="flex-1">
{isDisabled && (
<Text variant="small" className="text-muted-foreground">
This item will be rejected
</Text>
)}
</div>
<Button
onClick={() => onToggleDisabled!(review.node_exec_id)}
variant={isDisabled ? "primary" : "secondary"}
size="small"
leftIcon={
isDisabled ? <EyeSlashIcon size={14} /> : <TrashIcon size={14} />
}
>
{isDisabled ? "Include" : "Exclude"}
</Button>
</div>
)}
<div className="space-y-3">
{instructions && (
{/* Show instructions as field label */}
{instructions && (
<div className="space-y-3">
<Text variant="body" className="font-semibold text-gray-900">
{instructions}
{getFieldLabel(instructions)}
</Text>
)}
{isDataEditable && !autoApproveFuture ? (
renderDataInput()
) : (
<div className="rounded-lg border border-gray-200 bg-white p-3">
<Text variant="small" className="text-gray-600">
{JSON.stringify(currentData, null, 2)}
</Text>
</div>
)}
</div>
{/* Auto-approve toggle for this review */}
{showAutoApprove && onAutoApproveFutureChange && (
<div className="space-y-2 pt-2">
<div className="flex items-center gap-3">
<Switch
checked={autoApproveFuture}
onCheckedChange={(enabled: boolean) =>
onAutoApproveFutureChange(review.node_exec_id, enabled)
}
/>
<Text variant="small" className="text-gray-700">
Auto-approve future executions of this block
</Text>
</div>
{autoApproveFuture && (
<Text variant="small" className="pl-11 text-gray-500">
Original data will be used for this and all future reviews from
this block.
</Text>
{isDataEditable && !isDisabled ? (
renderDataInput()
) : (
<div className="rounded-lg border border-gray-200 bg-white p-3">
<Text variant="small" className="text-gray-600">
{JSON.stringify(currentData, null, 2)}
</Text>
</div>
)}
</div>
)}
{/* If no instructions, show data directly */}
{!instructions && (
<div className="space-y-3">
<Text variant="body" className="font-semibold text-gray-900">
Data to Review
{!isDataEditable && (
<span className="ml-2 text-xs text-muted-foreground">
(Read-only)
</span>
)}
</Text>
{isDataEditable && !isDisabled ? (
renderDataInput()
) : (
<div className="rounded-lg border border-gray-200 bg-white p-3">
<Text variant="small" className="text-gray-600">
{JSON.stringify(currentData, null, 2)}
</Text>
</div>
)}
</div>
)}
{!showSimplified && isDisabled && (
<div>
<Text variant="body" className="mb-2 font-semibold">
Rejection Reason (Optional):
</Text>
<Input
id="rejection-reason"
label="Rejection Reason"
hideLabel
size="small"
type="textarea"
rows={3}
value={reviewMessage}
onChange={(e) => handleMessageChange(e.target.value)}
placeholder="Add any notes about why you're rejecting this..."
/>
</div>
)}
</div>
);
}

View File

@@ -1,16 +1,10 @@
import { useMemo, useState } from "react";
import { useState } from "react";
import { PendingHumanReviewModel } from "@/app/api/__generated__/models/pendingHumanReviewModel";
import { PendingReviewCard } from "@/components/organisms/PendingReviewCard/PendingReviewCard";
import { Text } from "@/components/atoms/Text/Text";
import { Button } from "@/components/atoms/Button/Button";
import { Switch } from "@/components/atoms/Switch/Switch";
import { useToast } from "@/components/molecules/Toast/use-toast";
import {
ClockIcon,
WarningIcon,
CaretDownIcon,
CaretRightIcon,
} from "@phosphor-icons/react";
import { ClockIcon, WarningIcon } from "@phosphor-icons/react";
import { usePostV2ProcessReviewAction } from "@/app/api/__generated__/endpoints/executions/executions";
interface PendingReviewsListProps {
@@ -38,34 +32,16 @@ export function PendingReviewsList({
},
);
const [reviewMessageMap, setReviewMessageMap] = useState<
Record<string, string>
>({});
const [pendingAction, setPendingAction] = useState<
"approve" | "reject" | null
>(null);
const [autoApproveFutureMap, setAutoApproveFutureMap] = useState<
Record<string, boolean>
>({});
const [collapsedGroups, setCollapsedGroups] = useState<
Record<string, boolean>
>({});
const { toast } = useToast();
const groupedReviews = useMemo(() => {
return reviews.reduce(
(acc, review) => {
const nodeId = review.node_id || "unknown";
if (!acc[nodeId]) {
acc[nodeId] = [];
}
acc[nodeId].push(review);
return acc;
},
{} as Record<string, PendingHumanReviewModel[]>,
);
}, [reviews]);
const reviewActionMutation = usePostV2ProcessReviewAction({
mutation: {
onSuccess: (res) => {
@@ -112,33 +88,8 @@ export function PendingReviewsList({
setReviewDataMap((prev) => ({ ...prev, [nodeExecId]: data }));
}
function handleAutoApproveFutureToggle(nodeId: string, enabled: boolean) {
setAutoApproveFutureMap((prev) => ({
...prev,
[nodeId]: enabled,
}));
if (enabled) {
const nodeReviews = groupedReviews[nodeId] || [];
setReviewDataMap((prev) => {
const updated = { ...prev };
nodeReviews.forEach((review) => {
updated[review.node_exec_id] = JSON.stringify(
review.payload,
null,
2,
);
});
return updated;
});
}
}
function toggleGroupCollapse(nodeId: string) {
setCollapsedGroups((prev) => ({
...prev,
[nodeId]: !prev[nodeId],
}));
function handleReviewMessageChange(nodeExecId: string, message: string) {
setReviewMessageMap((prev) => ({ ...prev, [nodeExecId]: message }));
}
function processReviews(approved: boolean) {
@@ -156,25 +107,22 @@ export function PendingReviewsList({
for (const review of reviews) {
const reviewData = reviewDataMap[review.node_exec_id];
const autoApproveThisNode = autoApproveFutureMap[review.node_id || ""];
const reviewMessage = reviewMessageMap[review.node_exec_id];
let parsedData: any = undefined;
let parsedData: any = review.payload; // Default to original payload
if (!autoApproveThisNode) {
if (review.editable && reviewData) {
try {
parsedData = JSON.parse(reviewData);
} catch (error) {
toast({
title: "Invalid JSON",
description: `Please fix the JSON format in review for node ${review.node_exec_id}: ${error instanceof Error ? error.message : "Invalid syntax"}`,
variant: "destructive",
});
setPendingAction(null);
return;
}
} else {
parsedData = review.payload;
// Parse edited data if available and editable
if (review.editable && reviewData) {
try {
parsedData = JSON.parse(reviewData);
} catch (error) {
toast({
title: "Invalid JSON",
description: `Please fix the JSON format in review for node ${review.node_exec_id}: ${error instanceof Error ? error.message : "Invalid syntax"}`,
variant: "destructive",
});
setPendingAction(null);
return;
}
}
@@ -182,7 +130,7 @@ export function PendingReviewsList({
node_exec_id: review.node_exec_id,
approved,
reviewed_data: parsedData,
auto_approve_future: autoApproveThisNode && approved,
message: reviewMessage || undefined,
});
}
@@ -210,6 +158,7 @@ export function PendingReviewsList({
return (
<div className="space-y-7 rounded-xl border border-yellow-150 bg-yellow-25 p-6">
{/* Warning Box Header */}
<div className="space-y-6">
<div className="flex items-start gap-2">
<WarningIcon
@@ -231,76 +180,23 @@ export function PendingReviewsList({
</div>
<div className="space-y-7">
{Object.entries(groupedReviews).map(([nodeId, nodeReviews]) => {
const isCollapsed = collapsedGroups[nodeId] ?? nodeReviews.length > 1;
const reviewCount = nodeReviews.length;
const firstReview = nodeReviews[0];
const blockName = firstReview?.instructions;
const reviewTitle = `Review required for ${blockName}`;
const getShortenedNodeId = (id: string) => {
if (id.length <= 8) return id;
return `${id.slice(0, 4)}...${id.slice(-4)}`;
};
return (
<div key={nodeId} className="space-y-4">
<button
onClick={() => toggleGroupCollapse(nodeId)}
className="flex w-full items-center gap-2 text-left"
>
{isCollapsed ? (
<CaretRightIcon size={20} className="text-gray-600" />
) : (
<CaretDownIcon size={20} className="text-gray-600" />
)}
<div className="flex-1">
<Text variant="body" className="font-semibold text-gray-900">
{reviewTitle}
</Text>
<Text variant="small" className="text-gray-500">
Node #{getShortenedNodeId(nodeId)}
</Text>
</div>
<span className="text-xs text-gray-600">
{reviewCount} {reviewCount === 1 ? "review" : "reviews"}
</span>
</button>
{!isCollapsed && (
<div className="space-y-4">
{nodeReviews.map((review) => (
<PendingReviewCard
key={review.node_exec_id}
review={review}
onReviewDataChange={handleReviewDataChange}
autoApproveFuture={autoApproveFutureMap[nodeId] || false}
externalDataValue={reviewDataMap[review.node_exec_id]}
showAutoApprove={false}
/>
))}
<div className="flex items-center gap-3 pt-2">
<Switch
checked={autoApproveFutureMap[nodeId] || false}
onCheckedChange={(enabled: boolean) =>
handleAutoApproveFutureToggle(nodeId, enabled)
}
/>
<Text variant="small" className="text-gray-700">
Auto-approve future executions of this node
</Text>
</div>
</div>
)}
</div>
);
})}
{reviews.map((review) => (
<PendingReviewCard
key={review.node_exec_id}
review={review}
onReviewDataChange={handleReviewDataChange}
onReviewMessageChange={handleReviewMessageChange}
reviewMessage={reviewMessageMap[review.node_exec_id] || ""}
/>
))}
</div>
<div className="space-y-4">
<div className="flex flex-wrap gap-2">
<div className="space-y-7">
<Text variant="body" className="text-textGrey">
Note: Changes you make here apply only to this task
</Text>
<div className="flex gap-2">
<Button
onClick={() => processReviews(true)}
disabled={reviewActionMutation.isPending || reviews.length === 0}
@@ -324,11 +220,6 @@ export function PendingReviewsList({
Reject
</Button>
</div>
<Text variant="small" className="text-textGrey">
You can turn auto-approval on or off using the toggle above for each
node.
</Text>
</div>
</div>
);

View File

@@ -15,22 +15,8 @@ export function usePendingReviews() {
};
}
interface UsePendingReviewsForExecutionOptions {
enabled?: boolean;
refetchInterval?: number | false;
}
export function usePendingReviewsForExecution(
graphExecId: string,
options?: UsePendingReviewsForExecutionOptions,
) {
const query = useGetV2GetPendingReviewsForExecution(graphExecId, {
query: {
enabled: options?.enabled ?? !!graphExecId,
refetchInterval: options?.refetchInterval,
refetchIntervalInBackground: !!options?.refetchInterval,
},
});
export function usePendingReviewsForExecution(graphExecId: string) {
const query = useGetV2GetPendingReviewsForExecution(graphExecId);
return {
pendingReviews: okData(query.data) || [],

View File

@@ -10,7 +10,6 @@ export enum Key {
LIBRARY_AGENTS_CACHE = "library-agents-cache",
CHAT_SESSION_ID = "chat_session_id",
COOKIE_CONSENT = "autogpt_cookie_consent",
AI_AGENT_SAFETY_POPUP_SHOWN = "ai-agent-safety-popup-shown",
}
function get(key: Key) {

View File

@@ -1 +0,0 @@
# Video editing blocks

View File

@@ -233,6 +233,7 @@ Below is a comprehensive list of all available blocks, categorized by their prim
| [Stagehand Extract](block-integrations/stagehand/blocks.md#stagehand-extract) | Extract structured data from a webpage |
| [Stagehand Observe](block-integrations/stagehand/blocks.md#stagehand-observe) | Find suggested actions for your workflows |
| [Unreal Text To Speech](block-integrations/llm.md#unreal-text-to-speech) | Converts text to speech using the Unreal Speech API |
| [Video Narration](block-integrations/video/narration.md#video-narration) | Generate AI narration and add to video |
## Search and Information Retrieval
@@ -472,9 +473,13 @@ Below is a comprehensive list of all available blocks, categorized by their prim
| Block Name | Description |
|------------|-------------|
| [Add Audio To Video](block-integrations/multimedia.md#add-audio-to-video) | Block to attach an audio file to a video file using moviepy |
| [Loop Video](block-integrations/multimedia.md#loop-video) | Block to loop a video to a given duration or number of repeats |
| [Media Duration](block-integrations/multimedia.md#media-duration) | Block to get the duration of a media file |
| [Add Audio To Video](block-integrations/video/add_audio.md#add-audio-to-video) | Block to attach an audio file to a video file using moviepy |
| [Loop Video](block-integrations/video/loop.md#loop-video) | Block to loop a video to a given duration or number of repeats |
| [Media Duration](block-integrations/video/duration.md#media-duration) | Block to get the duration of a media file |
| [Video Clip](block-integrations/video/clip.md#video-clip) | Extract a time segment from a video |
| [Video Concat](block-integrations/video/concat.md#video-concat) | Merge multiple video clips into one continuous video |
| [Video Download](block-integrations/video/download.md#video-download) | Download video from URL (YouTube, Vimeo, news sites, direct links) |
| [Video Text Overlay](block-integrations/video/text_overlay.md#video-text-overlay) | Add text overlay/caption to video |
## Productivity

View File

@@ -85,7 +85,6 @@
* [LLM](block-integrations/llm.md)
* [Logic](block-integrations/logic.md)
* [Misc](block-integrations/misc.md)
* [Multimedia](block-integrations/multimedia.md)
* [Notion Create Page](block-integrations/notion/create_page.md)
* [Notion Read Database](block-integrations/notion/read_database.md)
* [Notion Read Page](block-integrations/notion/read_page.md)
@@ -129,5 +128,13 @@
* [Twitter Timeline](block-integrations/twitter/timeline.md)
* [Twitter Tweet Lookup](block-integrations/twitter/tweet_lookup.md)
* [Twitter User Lookup](block-integrations/twitter/user_lookup.md)
* [Video Add Audio](block-integrations/video/add_audio.md)
* [Video Clip](block-integrations/video/clip.md)
* [Video Concat](block-integrations/video/concat.md)
* [Video Download](block-integrations/video/download.md)
* [Video Duration](block-integrations/video/duration.md)
* [Video Loop](block-integrations/video/loop.md)
* [Video Narration](block-integrations/video/narration.md)
* [Video Text Overlay](block-integrations/video/text_overlay.md)
* [Wolfram LLM API](block-integrations/wolfram/llm_api.md)
* [Zerobounce Validate Emails](block-integrations/zerobounce/validate_emails.md)

View File

@@ -1,119 +0,0 @@
# Multimedia
<!-- MANUAL: file_description -->
Blocks for processing and manipulating video and audio files.
<!-- END MANUAL -->
## Add Audio To Video
### What it is
Block to attach an audio file to a video file using moviepy.
### How it works
<!-- MANUAL: how_it_works -->
This block combines a video file with an audio file using the moviepy library. The audio track is attached to the video, optionally with volume adjustment via the volume parameter (1.0 = original volume).
Input files can be URLs, data URIs, or local paths. The output can be returned as either a file path or base64 data URI.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| video_in | Video input (URL, data URI, or local path). | str (file) | Yes |
| audio_in | Audio input (URL, data URI, or local path). | str (file) | Yes |
| volume | Volume scale for the newly attached audio track (1.0 = original). | float | No |
| output_return_type | Return the final output as a relative path or base64 data URI. | "file_path" \| "data_uri" | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| video_out | Final video (with attached audio), as a path or data URI. | str (file) |
### Possible use case
<!-- MANUAL: use_case -->
**Add Voiceover**: Combine generated voiceover audio with video content for narrated videos.
**Background Music**: Add music tracks to silent videos or replace existing audio.
**Audio Replacement**: Swap the audio track of a video for localization or accessibility.
<!-- END MANUAL -->
---
## Loop Video
### What it is
Block to loop a video to a given duration or number of repeats.
### How it works
<!-- MANUAL: how_it_works -->
This block extends a video by repeating it to reach a target duration or number of loops. Set duration to specify the total length in seconds, or use n_loops to repeat the video a specific number of times.
The looped video is seamlessly concatenated and can be output as a file path or base64 data URI.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| video_in | The input video (can be a URL, data URI, or local path). | str (file) | Yes |
| duration | Target duration (in seconds) to loop the video to. If omitted, defaults to no looping. | float | No |
| n_loops | Number of times to repeat the video. If omitted, defaults to 1 (no repeat). | int | No |
| output_return_type | How to return the output video. Either a relative path or base64 data URI. | "file_path" \| "data_uri" | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| video_out | Looped video returned either as a relative path or a data URI. | str |
### Possible use case
<!-- MANUAL: use_case -->
**Background Videos**: Loop short clips to match the duration of longer audio or content.
**GIF-Like Content**: Create seamlessly looping video content for social media.
**Filler Content**: Extend short video clips to meet minimum duration requirements.
<!-- END MANUAL -->
---
## Media Duration
### What it is
Block to get the duration of a media file.
### How it works
<!-- MANUAL: how_it_works -->
This block analyzes a media file and returns its duration in seconds. Set is_video to true for video files or false for audio files to ensure proper parsing.
The input can be a URL, data URI, or local file path. The duration is returned as a float for precise timing calculations.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| media_in | Media input (URL, data URI, or local path). | str (file) | Yes |
| is_video | Whether the media is a video (True) or audio (False). | bool | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| duration | Duration of the media file (in seconds). | float |
### Possible use case
<!-- MANUAL: use_case -->
**Video Processing Prep**: Get video duration before deciding how to loop, trim, or synchronize it.
**Audio Matching**: Determine audio length to generate matching-length video content.
**Content Validation**: Verify that uploaded media meets duration requirements.
<!-- END MANUAL -->
---

View File

@@ -0,0 +1,41 @@
# Video Add Audio
<!-- MANUAL: file_description -->
This block allows you to attach a separate audio track to a video file, replacing or combining with the original audio.
<!-- END MANUAL -->
## Add Audio To Video
### What it is
Block to attach an audio file to a video file using moviepy.
### How it works
<!-- MANUAL: how_it_works -->
The block uses MoviePy to combine video and audio files. It loads the video and audio inputs (which can be URLs, data URIs, or local paths), optionally scales the audio volume, then writes the combined result to a new video file using H.264 video codec and AAC audio codec.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| video_in | Video input (URL, data URI, or local path). | str (file) | Yes |
| audio_in | Audio input (URL, data URI, or local path). | str (file) | Yes |
| volume | Volume scale for the newly attached audio track (1.0 = original). | float | No |
| output_return_type | Return the final output as a relative path or base64 data URI. | "file_path" \| "data_uri" | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| video_out | Final video (with attached audio), as a path or data URI. | str (file) |
### Possible use case
<!-- MANUAL: use_case -->
- Adding background music to a silent screen recording
- Replacing original audio with a voiceover or translated audio track
- Combining AI-generated speech with stock footage
- Adding sound effects to video content
<!-- END MANUAL -->
---

View File

@@ -0,0 +1,42 @@
# Video Clip
<!-- MANUAL: file_description -->
This block extracts a specific time segment from a video file, allowing you to trim videos to precise start and end times.
<!-- END MANUAL -->
## Video Clip
### What it is
Extract a time segment from a video
### How it works
<!-- MANUAL: how_it_works -->
The block uses MoviePy's `subclipped` function to extract a portion of the video between specified start and end times. It validates that end time is greater than start time, then creates a new video file containing only the selected segment. The output is encoded with H.264 video codec and AAC audio codec, preserving both video and audio from the original clip.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| video_in | Input video (URL, data URI, or local path) | str (file) | Yes |
| start_time | Start time in seconds | float | Yes |
| end_time | End time in seconds | float | Yes |
| output_format | Output format | "mp4" \| "webm" \| "mkv" \| "mov" | No |
| output_return_type | Return the output as a relative path or base64 data URI. | "file_path" \| "data_uri" | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| video_out | Clipped video file (path or data URI) | str (file) |
| duration | Clip duration in seconds | float |
### Possible use case
<!-- MANUAL: use_case -->
- Extracting highlights from a longer video
- Trimming intro/outro from recorded content
- Creating short clips for social media from longer videos
- Isolating specific segments for further processing in a workflow
<!-- END MANUAL -->
---

View File

@@ -0,0 +1,42 @@
# Video Concat
<!-- MANUAL: file_description -->
This block merges multiple video clips into a single continuous video, with optional transitions between clips.
<!-- END MANUAL -->
## Video Concat
### What it is
Merge multiple video clips into one continuous video
### How it works
<!-- MANUAL: how_it_works -->
The block uses MoviePy's `concatenate_videoclips` function to join multiple videos in sequence. It supports three transition modes: **none** (direct concatenation), **crossfade** (smooth blending where clips overlap), and **fade_black** (each clip fades out to black and the next fades in). At least 2 videos are required. The output is encoded with H.264 video codec and AAC audio codec.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| videos | List of video files to concatenate (in order) | List[str (file)] | Yes |
| transition | Transition between clips | "none" \| "crossfade" \| "fade_black" | No |
| transition_duration | Transition duration in seconds | int | No |
| output_format | Output format | "mp4" \| "webm" \| "mkv" \| "mov" | No |
| output_return_type | Return the output as a relative path or base64 data URI. | "file_path" \| "data_uri" | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| video_out | Concatenated video file (path or data URI) | str (file) |
| total_duration | Total duration in seconds | float |
### Possible use case
<!-- MANUAL: use_case -->
- Combining multiple clips into a compilation video
- Assembling intro, main content, and outro segments
- Creating montages from multiple source videos
- Building video playlists or slideshows with transitions
<!-- END MANUAL -->
---

View File

@@ -0,0 +1,43 @@
# Video Download
<!-- MANUAL: file_description -->
This block downloads videos from URLs, supporting a wide range of video platforms and direct links.
<!-- END MANUAL -->
## Video Download
### What it is
Download video from URL (YouTube, Vimeo, news sites, direct links)
### How it works
<!-- MANUAL: how_it_works -->
The block uses yt-dlp, a powerful video downloading library that supports over 1000 websites. It accepts a URL, quality preference, and output format, then downloads the video while merging the best available video and audio streams for the selected quality. Quality options: **best** (highest available), **1080p/720p/480p** (maximum resolution at that height), **audio_only** (extracts just the audio track).
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| url | URL of the video to download (YouTube, Vimeo, direct link, etc.) | str | Yes |
| quality | Video quality preference | "best" \| "1080p" \| "720p" \| "480p" \| "audio_only" | No |
| output_format | Output video format | "mp4" \| "webm" \| "mkv" | No |
| output_return_type | Return the output as a relative path or base64 data URI. | "file_path" \| "data_uri" | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| video_file | Downloaded video (path or data URI) | str (file) |
| duration | Video duration in seconds | float |
| title | Video title from source | str |
| source_url | Original source URL | str |
### Possible use case
<!-- MANUAL: use_case -->
- Downloading source videos for editing or remixing
- Archiving video content for offline processing
- Extracting audio from videos for transcription or podcast creation
- Gathering video content for automated content pipelines
<!-- END MANUAL -->
---

View File

@@ -0,0 +1,38 @@
# Video Duration
<!-- MANUAL: file_description -->
This block retrieves the duration of video or audio files, useful for planning and conditional logic in media workflows.
<!-- END MANUAL -->
## Media Duration
### What it is
Block to get the duration of a media file.
### How it works
<!-- MANUAL: how_it_works -->
The block uses MoviePy to load the media file and extract its duration property. It supports both video files (using VideoFileClip) and audio files (using AudioFileClip), determined by the `is_video` flag. The media can be provided as a URL, data URI, or local file path. The duration is returned in seconds as a floating-point number.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| media_in | Media input (URL, data URI, or local path). | str (file) | Yes |
| is_video | Whether the media is a video (True) or audio (False). | bool | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| duration | Duration of the media file (in seconds). | float |
### Possible use case
<!-- MANUAL: use_case -->
- Checking video length before processing to avoid timeout issues
- Calculating how many times to loop a video to reach a target duration
- Validating that uploaded content meets length requirements
- Building conditional workflows based on media duration
<!-- END MANUAL -->
---

View File

@@ -0,0 +1,40 @@
# Video Loop
<!-- MANUAL: file_description -->
This block repeats a video to extend its duration, either to a specific length or a set number of repetitions.
<!-- END MANUAL -->
## Loop Video
### What it is
Block to loop a video to a given duration or number of repeats.
### How it works
<!-- MANUAL: how_it_works -->
The block uses MoviePy's Loop effect to repeat a video clip. You can specify either a target duration (the video will repeat until reaching that length) or a number of loops (the video will repeat that many times). The Loop effect handles both video and audio looping automatically, maintaining sync. Either `duration` or `n_loops` must be provided. The output is encoded with H.264 video codec and AAC audio codec.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| video_in | The input video (can be a URL, data URI, or local path). | str (file) | Yes |
| duration | Target duration (in seconds) to loop the video to. If omitted, defaults to no looping. | float | No |
| n_loops | Number of times to repeat the video. If omitted, defaults to 1 (no repeat). | int | No |
| output_return_type | How to return the output video. Either a relative path or base64 data URI. | "file_path" \| "data_uri" | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| video_out | Looped video returned either as a relative path or a data URI. | str |
### Possible use case
<!-- MANUAL: use_case -->
- Extending a short background video to match the length of narration audio
- Creating seamless looping content for digital signage
- Repeating a product demo video multiple times for emphasis
- Extending short clips to meet minimum duration requirements for platforms
<!-- END MANUAL -->
---

View File

@@ -0,0 +1,44 @@
# Video Narration
<!-- MANUAL: file_description -->
This block generates AI voiceover narration using ElevenLabs and adds it to a video, with flexible audio mixing options.
<!-- END MANUAL -->
## Video Narration
### What it is
Generate AI narration and add to video
### How it works
<!-- MANUAL: how_it_works -->
The block uses ElevenLabs text-to-speech API to generate natural-sounding narration from your script. It then combines the narration with the video using MoviePy. Three audio mixing modes are available: **replace** (completely replaces original audio), **mix** (blends narration with original audio at configurable volumes), and **ducking** (similar to mix but applies stronger attenuation to original audio, making narration more prominent). The block outputs both the final video and the generated audio file separately.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| video_in | Input video (URL, data URI, or local path) | str (file) | Yes |
| script | Narration script text | str | Yes |
| voice_id | ElevenLabs voice ID | str | No |
| mix_mode | How to combine with original audio. 'ducking' applies stronger attenuation than 'mix'. | "replace" \| "mix" \| "ducking" | No |
| narration_volume | Narration volume (0.0 to 2.0) | float | No |
| original_volume | Original audio volume when mixing (0.0 to 1.0) | float | No |
| output_return_type | Return the output as a relative path or base64 data URI. | "file_path" \| "data_uri" | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| video_out | Video with narration (path or data URI) | str (file) |
| audio_file | Generated audio file (path or data URI) | str (file) |
### Possible use case
<!-- MANUAL: use_case -->
- Adding professional voiceover to product demos or tutorials
- Creating narrated explainer videos from screen recordings
- Generating multi-language versions of video content
- Adding commentary to gameplay or walkthrough videos
<!-- END MANUAL -->
---

View File

@@ -0,0 +1,45 @@
# Video Text Overlay
<!-- MANUAL: file_description -->
This block adds customizable text captions or titles to videos, with control over positioning, timing, and styling.
<!-- END MANUAL -->
## Video Text Overlay
### What it is
Add text overlay/caption to video
### How it works
<!-- MANUAL: how_it_works -->
The block uses MoviePy's TextClip and CompositeVideoClip to render text onto video frames. The text is created as a separate clip with configurable font size, color, and optional background color, then composited over the video at the specified position. Timing can be controlled to show text only during specific portions of the video. Position options include center alignments (top, center, bottom) and corner positions (top-left, top-right, bottom-left, bottom-right). The output is encoded with H.264 video codec and AAC audio codec.
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| video_in | Input video (URL, data URI, or local path) | str (file) | Yes |
| text | Text to overlay on video | str | Yes |
| position | Position of text on screen | "top" \| "center" \| "bottom" \| "top-left" \| "top-right" \| "bottom-left" \| "bottom-right" | No |
| start_time | When to show text (seconds). None = entire video | float | No |
| end_time | When to hide text (seconds). None = until end | float | No |
| font_size | Font size | int | No |
| font_color | Font color (hex or name) | str | No |
| bg_color | Background color behind text (None for transparent) | str | No |
| output_return_type | Return the output as a relative path or base64 data URI. | "file_path" \| "data_uri" | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| video_out | Video with text overlay (path or data URI) | str (file) |
### Possible use case
<!-- MANUAL: use_case -->
- Adding titles or chapter headings to video content
- Creating lower-thirds with speaker names or captions
- Watermarking videos with branding text
- Adding call-to-action text at specific moments in a video
<!-- END MANUAL -->
---