Compare commits

...

7 Commits

Author SHA1 Message Date
claude[bot]
f8d3893c16 fix(blocks): Address review feedback for video editing blocks
- Add start_time < end_time validation in VideoClipBlock and VideoTextOverlayBlock
- Fix resource leaks: close AudioFileClip in narration.py, TextClip in text_overlay.py
- Fix concat.py: proper resource cleanup in finally block, load clips individually
- Implement proper crossfade using crossfadein/crossfadeout
- Implement ducking mode with stronger attenuation (0.3x original_volume)
- Remove unused start_time/end_time params from VideoDownloadBlock
- Fix None handling for duration/title in download.py (use 'or' instead of 'get' default)
- Add exception chaining with 'from e' in all blocks
- Add minimum clips validation in VideoConcatBlock
- Sort __all__ in __init__.py
- Increase ElevenLabs API timeout to 120s for longer scripts

Co-authored-by: Nicholas Tindle <ntindle@users.noreply.github.com>
2026-01-18 23:27:04 +00:00
Nicholas Tindle
1cfbc0dd08 feat(video): Update __init__.py with full exports 2026-01-18 15:34:04 -06:00
Nicholas Tindle
ff84643b48 feat(video): Add VideoNarrationBlock 2026-01-18 15:33:48 -06:00
Nicholas Tindle
c19c3c834a feat(video): Add VideoTextOverlayBlock 2026-01-18 15:33:47 -06:00
Nicholas Tindle
d0f7ba8cfd feat(video): Add VideoConcatBlock 2026-01-18 15:33:46 -06:00
Nicholas Tindle
2a855f4bd0 feat(video): Add VideoClipBlock 2026-01-18 15:32:59 -06:00
Nicholas Tindle
b93bb3b9f8 feat(video): Add VideoDownloadBlock 2026-01-18 15:32:58 -06:00
6 changed files with 662 additions and 1 deletions

View File

@@ -1 +1,28 @@
# Video editing blocks
"""Video editing blocks for AutoGPT Platform.
This module provides blocks for:
- Downloading videos from URLs (YouTube, Vimeo, news sites, direct links)
- Clipping/trimming video segments
- Concatenating multiple videos
- Adding text overlays
- Adding AI-generated narration
Dependencies:
- yt-dlp: For video downloading
- moviepy: For video editing operations
- requests: For API calls (narration block)
"""
from .download import VideoDownloadBlock
from .clip import VideoClipBlock
from .concat import VideoConcatBlock
from .text_overlay import VideoTextOverlayBlock
from .narration import VideoNarrationBlock
# Public names re-exported by this package: one block class per video
# editing operation, listed in alphabetical order.
__all__ = [
"VideoClipBlock",
"VideoConcatBlock",
"VideoDownloadBlock",
"VideoNarrationBlock",
"VideoTextOverlayBlock",
]

View File

@@ -0,0 +1,93 @@
"""
VideoClipBlock - Extract a segment from a video file
"""
import uuid
from backend.data.block import Block, BlockCategory, BlockOutput
from backend.data.block import BlockSchemaInput, BlockSchemaOutput
from backend.data.model import SchemaField
from backend.util.exceptions import BlockExecutionError
class VideoClipBlock(Block):
    """Extract a time segment from a video.

    ``run`` validates the requested time range and delegates the actual
    trimming to ``_clip_video`` -- the helper named in ``test_mock`` -- so the
    moviepy work lives in one synchronous, replaceable method.
    """

    class Input(BlockSchemaInput):
        video_in: str = SchemaField(
            description="Input video (URL, data URI, or file path)",
            json_schema_extra={"format": "file"},
        )
        start_time: float = SchemaField(
            description="Start time in seconds",
            ge=0.0,
        )
        end_time: float = SchemaField(
            description="End time in seconds",
            ge=0.0,
        )
        output_format: str = SchemaField(
            description="Output format",
            default="mp4",
            advanced=True,
        )

    class Output(BlockSchemaOutput):
        video_out: str = SchemaField(
            description="Clipped video file",
            json_schema_extra={"format": "file"},
        )
        duration: float = SchemaField(description="Clip duration in seconds")

    def __init__(self):
        super().__init__(
            id="b2c3d4e5-f6a7-8901-bcde-f23456789012",
            description="Extract a time segment from a video",
            categories={BlockCategory.MULTIMEDIA},
            input_schema=self.Input,
            output_schema=self.Output,
            test_input={"video_in": "/tmp/test.mp4", "start_time": 0.0, "end_time": 10.0},
            test_output=[("video_out", str), ("duration", float)],
            test_mock={"_clip_video": lambda *args: ("/tmp/clip.mp4", 10.0)},
        )

    def _clip_video(
        self,
        video_in: str,
        start_time: float,
        end_time: float,
        output_format: str,
    ) -> tuple[str, float]:
        """Write the ``[start_time, end_time]`` segment of ``video_in`` to /tmp.

        Returns:
            ``(output_path, duration_in_seconds)`` of the written clip.

        Raises:
            BlockExecutionError: if moviepy is missing or the clip cannot be
                read, trimmed or written.
        """
        # Imported lazily so the module can be loaded without moviepy installed.
        try:
            from moviepy.video.io.VideoFileClip import VideoFileClip
        except ImportError as e:
            raise BlockExecutionError(
                message="moviepy is not installed. Please install it with: pip install moviepy",
                block_name=self.name,
                block_id=str(self.id),
            ) from e

        clip = None
        subclip = None
        try:
            clip = VideoFileClip(video_in)
            subclip = clip.subclip(start_time, end_time)
            output_path = f"/tmp/clip_{uuid.uuid4()}.{output_format}"
            subclip.write_videofile(output_path, logger=None)
            # Read the duration before the clips are closed in ``finally``.
            return output_path, float(subclip.duration)
        except Exception as e:
            raise BlockExecutionError(
                message=f"Failed to clip video: {e}",
                block_name=self.name,
                block_id=str(self.id),
            ) from e
        finally:
            # Close the derived clip first, then the source it was cut from.
            if subclip is not None:
                subclip.close()
            if clip is not None:
                clip.close()

    async def run(self, input_data: Input, **kwargs) -> BlockOutput:
        """Validate the time range, clip the video and yield the results.

        Yields:
            ``("video_out", path)`` then ``("duration", seconds)``.

        Raises:
            BlockExecutionError: if ``end_time`` is not greater than
                ``start_time`` or the clipping itself fails.
        """
        if input_data.end_time <= input_data.start_time:
            raise BlockExecutionError(
                message=f"end_time ({input_data.end_time}) must be greater than start_time ({input_data.start_time})",
                block_name=self.name,
                block_id=str(self.id),
            )

        # Positional call only: the test mock is declared as ``lambda *args``.
        output_path, duration = self._clip_video(
            input_data.video_in,
            input_data.start_time,
            input_data.end_time,
            input_data.output_format,
        )
        # Outputs are yielded after all clips have been released.
        yield "video_out", output_path
        yield "duration", duration

View File

@@ -0,0 +1,123 @@
"""
VideoConcatBlock - Concatenate multiple video clips into one
"""
import uuid
from typing import Literal

from backend.data.block import Block, BlockCategory, BlockOutput
from backend.data.block import BlockSchemaInput, BlockSchemaOutput
from backend.data.model import SchemaField
from backend.util.exceptions import BlockExecutionError
class VideoConcatBlock(Block):
    """Merge multiple video clips into one continuous video.

    ``run`` checks that at least two inputs were given and delegates the
    moviepy work to ``_concat_videos``, the helper named in ``test_mock``.
    """

    class Input(BlockSchemaInput):
        videos: list[str] = SchemaField(
            description="List of video files to concatenate (in order)"
        )
        # A closed set of choices is expressed as a Literal, matching how the
        # sibling video blocks declare their option fields.
        transition: Literal["none", "crossfade", "fade_black"] = SchemaField(
            description="Transition between clips",
            default="none",
        )
        transition_duration: float = SchemaField(
            description="Transition duration in seconds",
            default=0.5,
            # A negative value would flip the sign of the crossfade padding.
            ge=0.0,
            advanced=True,
        )
        output_format: str = SchemaField(
            description="Output format",
            default="mp4",
            advanced=True,
        )

    class Output(BlockSchemaOutput):
        video_out: str = SchemaField(
            description="Concatenated video file",
            json_schema_extra={"format": "file"},
        )
        total_duration: float = SchemaField(description="Total duration in seconds")

    def __init__(self):
        super().__init__(
            id="c3d4e5f6-a7b8-9012-cdef-345678901234",
            description="Merge multiple video clips into one continuous video",
            categories={BlockCategory.MULTIMEDIA},
            input_schema=self.Input,
            output_schema=self.Output,
            test_input={"videos": ["/tmp/a.mp4", "/tmp/b.mp4"]},
            test_output=[("video_out", str), ("total_duration", float)],
            test_mock={"_concat_videos": lambda *args: ("/tmp/concat.mp4", 20.0)},
        )

    def _concat_videos(
        self,
        videos: list[str],
        transition: str,
        transition_duration: float,
        output_format: str,
    ) -> tuple[str, float]:
        """Concatenate ``videos`` in order and write the result to /tmp.

        Args:
            videos: Paths of the clips to join, in playback order.
            transition: "crossfade", "fade_black", or anything else for a
                plain cut.
            transition_duration: Length of each fade in seconds.
            output_format: File extension of the written video.

        Returns:
            ``(output_path, total_duration_in_seconds)``.

        Raises:
            BlockExecutionError: if moviepy is missing or any clip cannot be
                loaded, combined or written.
        """
        try:
            from moviepy.editor import VideoFileClip, concatenate_videoclips
        except ImportError as e:
            raise BlockExecutionError(
                message="moviepy is not installed. Please install it with: pip install moviepy",
                block_name=self.name,
                block_id=str(self.id),
            ) from e

        clips = []
        faded_clips = []
        final = None
        try:
            # Load one at a time so clips opened before a failure are still
            # tracked and closed in ``finally``.
            for path in videos:
                clips.append(VideoFileClip(path))

            if transition == "crossfade":
                last_index = len(clips) - 1
                for index, clip in enumerate(clips):
                    # The first clip gets no fade-in and the last no fade-out.
                    if index > 0:
                        clip = clip.crossfadein(transition_duration)
                    if index < last_index:
                        clip = clip.crossfadeout(transition_duration)
                    faded_clips.append(clip)
                # Negative padding overlaps neighbours by the fade length.
                final = concatenate_videoclips(
                    faded_clips,
                    method="compose",
                    padding=-transition_duration,
                )
            elif transition == "fade_black":
                for clip in clips:
                    faded_clips.append(
                        clip.fadein(transition_duration).fadeout(transition_duration)
                    )
                final = concatenate_videoclips(faded_clips)
            else:
                final = concatenate_videoclips(clips)

            output_path = f"/tmp/concat_{uuid.uuid4()}.{output_format}"
            final.write_videofile(output_path, logger=None)
            # Read the duration before everything is closed in ``finally``.
            return output_path, float(final.duration)
        except Exception as e:
            raise BlockExecutionError(
                message=f"Failed to concatenate videos: {e}",
                block_name=self.name,
                block_id=str(self.id),
            ) from e
        finally:
            # Release the composite first, then derived clips, then sources.
            if final is not None:
                final.close()
            for clip in faded_clips:
                clip.close()
            for clip in clips:
                clip.close()

    async def run(self, input_data: Input, **kwargs) -> BlockOutput:
        """Validate the input list, concatenate the clips and yield the results.

        Yields:
            ``("video_out", path)`` then ``("total_duration", seconds)``.

        Raises:
            BlockExecutionError: if fewer than two videos are supplied or the
                concatenation fails.
        """
        if len(input_data.videos) < 2:
            raise BlockExecutionError(
                message="At least 2 videos are required for concatenation",
                block_name=self.name,
                block_id=str(self.id),
            )

        # Positional call only: the test mock is declared as ``lambda *args``.
        output_path, total_duration = self._concat_videos(
            input_data.videos,
            input_data.transition,
            input_data.transition_duration,
            input_data.output_format,
        )
        yield "video_out", output_path
        yield "total_duration", total_duration

View File

@@ -0,0 +1,102 @@
"""
VideoDownloadBlock - Download video from URL (YouTube, Vimeo, news sites, direct links)
"""
import uuid
from typing import Literal
from backend.data.block import Block, BlockCategory, BlockOutput
from backend.data.block import BlockSchemaInput, BlockSchemaOutput
from backend.data.model import SchemaField
from backend.util.exceptions import BlockExecutionError
class VideoDownloadBlock(Block):
    """Download video from URL using yt-dlp.

    ``run`` delegates to ``_download_video`` -- the helper named in
    ``test_mock`` -- and yields the file path, duration, title and source URL.
    """

    class Input(BlockSchemaInput):
        url: str = SchemaField(
            description="URL of the video to download (YouTube, Vimeo, direct link, etc.)",
            placeholder="https://www.youtube.com/watch?v=...",
        )
        quality: Literal["best", "1080p", "720p", "480p", "audio_only"] = SchemaField(
            description="Video quality preference",
            default="720p",
        )
        output_format: Literal["mp4", "webm", "mkv"] = SchemaField(
            description="Output video format",
            default="mp4",
            advanced=True,
        )

    class Output(BlockSchemaOutput):
        video_file: str = SchemaField(
            description="Path or data URI of downloaded video",
            json_schema_extra={"format": "file"},
        )
        duration: float = SchemaField(description="Video duration in seconds")
        title: str = SchemaField(description="Video title from source")
        source_url: str = SchemaField(description="Original source URL")

    def __init__(self):
        super().__init__(
            id="a1b2c3d4-e5f6-7890-abcd-ef1234567890",
            description="Download video from URL (YouTube, Vimeo, news sites, direct links)",
            categories={BlockCategory.MULTIMEDIA},
            input_schema=self.Input,
            output_schema=self.Output,
            test_input={"url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ", "quality": "480p"},
            test_output=[("video_file", str), ("duration", float), ("title", str), ("source_url", str)],
            test_mock={"_download_video": lambda *args: ("/tmp/video.mp4", 212.0, "Test Video")},
        )

    def _get_format_string(self, quality: str) -> str:
        """Map a quality label to a yt-dlp format selector (720p if unknown)."""
        formats = {
            "best": "bestvideo+bestaudio/best",
            "1080p": "bestvideo[height<=1080]+bestaudio/best[height<=1080]",
            "720p": "bestvideo[height<=720]+bestaudio/best[height<=720]",
            "480p": "bestvideo[height<=480]+bestaudio/best[height<=480]",
            "audio_only": "bestaudio/best",
        }
        return formats.get(quality, formats["720p"])

    def _download_video(
        self,
        url: str,
        quality: str,
        output_format: str,
    ) -> tuple[str, float, str]:
        """Download ``url`` into /tmp with yt-dlp.

        Returns:
            ``(file_path, duration_in_seconds, title)``. Duration falls back to
            ``0.0`` and title to ``"Unknown"`` when the extractor reports none.

        Raises:
            BlockExecutionError: if yt-dlp is missing or the download fails.
        """
        try:
            import yt_dlp
        except ImportError as e:
            raise BlockExecutionError(
                message="yt-dlp is not installed. Please install it with: pip install yt-dlp",
                block_name=self.name,
                block_id=str(self.id),
            ) from e
        import os

        ydl_opts = {
            "format": self._get_format_string(quality),
            # Full UUID, like the other video blocks, to avoid name collisions
            # in the shared /tmp directory.
            "outtmpl": f"/tmp/{uuid.uuid4()}.%(ext)s",
            "merge_output_format": output_format,
            # The output template holds a single file name, so a URL that also
            # references a playlist should resolve to the one video only.
            "noplaylist": True,
            "quiet": True,
            "no_warnings": True,
        }

        try:
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=True)
                predicted_path = ydl.prepare_filename(info)
        except Exception as e:
            raise BlockExecutionError(
                message=f"Failed to download video: {e}",
                block_name=self.name,
                block_id=str(self.id),
            ) from e

        # NOTE(review): yt-dlp is expected to report the final on-disk location
        # under requested_downloads[*]["filepath"]; prefer it when present and
        # fall back to the filename-based guess otherwise.
        downloads = info.get("requested_downloads") or []
        reported_path = downloads[0].get("filepath") if downloads else None
        if reported_path:
            video_path = reported_path
        elif predicted_path.endswith(f".{output_format}"):
            video_path = predicted_path
        else:
            merged_path = predicted_path.rsplit(".", 1)[0] + f".{output_format}"
            # Only keep the un-merged name when it is the file that actually
            # exists (e.g. a single-stream download that was never remuxed).
            if os.path.exists(merged_path) or not os.path.exists(predicted_path):
                video_path = merged_path
            else:
                video_path = predicted_path

        # The extractor may report an integer duration; the output is a float.
        duration = float(info.get("duration") or 0.0)
        title = info.get("title") or "Unknown"
        return video_path, duration, title

    async def run(self, input_data: Input, **kwargs) -> BlockOutput:
        """Download the requested video and yield its path and metadata.

        Yields:
            ``video_file``, ``duration``, ``title`` and ``source_url``, in that
            order.

        Raises:
            BlockExecutionError: if yt-dlp is unavailable or the download fails.
        """
        # Positional call only: the test mock is declared as ``lambda *args``.
        video_path, duration, title = self._download_video(
            input_data.url,
            input_data.quality,
            input_data.output_format,
        )
        yield "video_file", video_path
        yield "duration", duration
        yield "title", title
        yield "source_url", input_data.url

View File

@@ -0,0 +1,167 @@
"""
VideoNarrationBlock - Generate AI voice narration and add to video
"""
import uuid
from typing import Literal
from backend.data.block import Block, BlockCategory, BlockOutput
from backend.data.block import BlockSchemaInput, BlockSchemaOutput
from backend.data.model import SchemaField, CredentialsMetaInput, APIKeyCredentials
from backend.integrations.providers import ProviderName
from backend.util.exceptions import BlockExecutionError
class VideoNarrationBlock(Block):
    """Generate AI narration and add to video.

    ``run`` rejects an empty script and delegates speech synthesis and audio
    mixing to ``_generate_narration``, the helper named in ``test_mock``.
    """

    class Input(BlockSchemaInput):
        credentials: CredentialsMetaInput[
            Literal[ProviderName.ELEVENLABS], Literal["api_key"]
        ] = SchemaField(
            description="ElevenLabs API key for voice synthesis"
        )
        video_in: str = SchemaField(
            description="Input video file",
            json_schema_extra={"format": "file"},
        )
        script: str = SchemaField(
            description="Narration script text"
        )
        voice_id: str = SchemaField(
            description="ElevenLabs voice ID",
            default="21m00Tcm4TlvDq8ikWAM",  # Rachel
        )
        mix_mode: Literal["replace", "mix", "ducking"] = SchemaField(
            description="How to combine with original audio",
            default="ducking",
        )
        narration_volume: float = SchemaField(
            description="Narration volume (0.0 to 2.0)",
            default=1.0,
            ge=0.0,
            le=2.0,
            advanced=True,
        )
        original_volume: float = SchemaField(
            description="Original audio volume when mixing (0.0 to 1.0)",
            default=0.3,
            ge=0.0,
            le=1.0,
            advanced=True,
        )

    class Output(BlockSchemaOutput):
        video_out: str = SchemaField(
            description="Video with narration",
            json_schema_extra={"format": "file"},
        )
        audio_file: str = SchemaField(
            description="Generated audio file",
            json_schema_extra={"format": "file"},
        )

    def __init__(self):
        super().__init__(
            id="e5f6a7b8-c9d0-1234-ef56-789012345678",
            description="Generate AI narration and add to video",
            categories={BlockCategory.MULTIMEDIA, BlockCategory.AI},
            input_schema=self.Input,
            output_schema=self.Output,
            test_input={
                "video_in": "/tmp/test.mp4",
                "script": "Hello world",
                "credentials": {"provider": "elevenlabs", "id": "test", "type": "api_key"},
            },
            test_output=[("video_out", str), ("audio_file", str)],
            test_mock={"_generate_narration": lambda *args: ("/tmp/narrated.mp4", "/tmp/audio.mp3")},
        )

    def _generate_narration(
        self,
        input_data: Input,
        credentials: APIKeyCredentials,
    ) -> tuple[str, str]:
        """Synthesize the script with ElevenLabs and mux it into the video.

        Returns:
            ``(narrated_video_path, narration_audio_path)``, both under /tmp.

        Raises:
            BlockExecutionError: if a dependency is missing, the ElevenLabs
                request fails, or the video cannot be read or written.
        """
        try:
            import requests
            from moviepy.editor import AudioFileClip, CompositeAudioClip, VideoFileClip
        except ImportError as e:
            raise BlockExecutionError(
                message=f"Missing dependency: {e}. Install moviepy and requests.",
                block_name=self.name,
                block_id=str(self.id),
            ) from e
        from urllib.parse import quote

        video = None
        narration_source = None
        final = None
        try:
            # voice_id is user input placed in the URL path; escape it so it
            # cannot alter the request path (e.g. with "/" or "..").
            voice_path = quote(input_data.voice_id, safe="")
            response = requests.post(
                f"https://api.elevenlabs.io/v1/text-to-speech/{voice_path}",
                headers={
                    "xi-api-key": credentials.api_key.get_secret_value(),
                    "Content-Type": "application/json",
                },
                json={
                    "text": input_data.script,
                    "model_id": "eleven_monolingual_v1",
                },
                timeout=120,
            )
            response.raise_for_status()

            audio_path = f"/tmp/narration_{uuid.uuid4()}.mp3"
            with open(audio_path, "wb") as f:
                f.write(response.content)

            video = VideoFileClip(input_data.video_in)
            # Keep a handle on the file-backed clip so it can be closed later;
            # the volume-adjusted clip below is a derived object.
            narration_source = AudioFileClip(audio_path)
            narration = narration_source.volumex(input_data.narration_volume)

            if input_data.mix_mode == "replace" or video.audio is None:
                # Nothing to blend with: narration becomes the only track.
                final_audio = narration
            else:
                # "ducking" attenuates the original track further than "mix".
                original_gain = input_data.original_volume
                if input_data.mix_mode != "mix":
                    original_gain = input_data.original_volume * 0.3
                original = video.audio.volumex(original_gain)
                final_audio = CompositeAudioClip([original, narration])

            final = video.set_audio(final_audio)
            output_path = f"/tmp/narrated_{uuid.uuid4()}.mp4"
            final.write_videofile(output_path, logger=None)
            return output_path, audio_path
        except requests.exceptions.RequestException as e:
            raise BlockExecutionError(
                message=f"ElevenLabs API error: {e}",
                block_name=self.name,
                block_id=str(self.id),
            ) from e
        except Exception as e:
            raise BlockExecutionError(
                message=f"Failed to add narration: {e}",
                block_name=self.name,
                block_id=str(self.id),
            ) from e
        finally:
            if narration_source is not None:
                narration_source.close()
            if final is not None:
                final.close()
            if video is not None:
                video.close()

    async def run(
        self,
        input_data: Input,
        *,
        credentials: APIKeyCredentials,
        **kwargs
    ) -> BlockOutput:
        """Generate narration for ``input_data.script`` and yield the results.

        Yields:
            ``("video_out", path)`` then ``("audio_file", path)``.

        Raises:
            BlockExecutionError: if the script is blank or narration fails.
        """
        # Fail fast instead of spending an API call on an empty script.
        if not input_data.script.strip():
            raise BlockExecutionError(
                message="Narration script must not be empty",
                block_name=self.name,
                block_id=str(self.id),
            )

        # Positional call only: the test mock is declared as ``lambda *args``.
        video_path, audio_path = self._generate_narration(input_data, credentials)
        yield "video_out", video_path
        yield "audio_file", audio_path

View File

@@ -0,0 +1,149 @@
"""
VideoTextOverlayBlock - Add text overlay to video
"""
import uuid
from typing import Literal
from backend.data.block import Block, BlockCategory, BlockOutput
from backend.data.block import BlockSchemaInput, BlockSchemaOutput
from backend.data.model import SchemaField
from backend.util.exceptions import BlockExecutionError
class VideoTextOverlayBlock(Block):
    """Add text overlay/caption to video.

    ``run`` validates the optional time window and delegates rendering to
    ``_add_text``, the helper named in ``test_mock``.
    """

    class Input(BlockSchemaInput):
        video_in: str = SchemaField(
            description="Input video file",
            json_schema_extra={"format": "file"},
        )
        text: str = SchemaField(
            description="Text to overlay on video"
        )
        position: Literal[
            "top", "center", "bottom",
            "top-left", "top-right",
            "bottom-left", "bottom-right"
        ] = SchemaField(
            description="Position of text on screen",
            default="bottom",
        )
        start_time: float | None = SchemaField(
            description="When to show text (seconds). None = entire video",
            default=None,
            ge=0.0,
            advanced=True,
        )
        end_time: float | None = SchemaField(
            description="When to hide text (seconds). None = until end",
            default=None,
            ge=0.0,
            advanced=True,
        )
        font_size: int = SchemaField(
            description="Font size",
            default=48,
            ge=12,
            le=200,
            advanced=True,
        )
        font_color: str = SchemaField(
            description="Font color (hex or name)",
            default="white",
            advanced=True,
        )
        bg_color: str | None = SchemaField(
            description="Background color behind text (None for transparent)",
            default=None,
            advanced=True,
        )

    class Output(BlockSchemaOutput):
        video_out: str = SchemaField(
            description="Video with text overlay",
            json_schema_extra={"format": "file"},
        )

    def __init__(self):
        super().__init__(
            id="d4e5f6a7-b8c9-0123-def4-567890123456",
            description="Add text overlay/caption to video",
            categories={BlockCategory.MULTIMEDIA},
            input_schema=self.Input,
            output_schema=self.Output,
            test_input={"video_in": "/tmp/test.mp4", "text": "Hello World"},
            test_output=[("video_out", str)],
            test_mock={"_add_text": lambda *args: "/tmp/overlay.mp4"},
        )

    def _add_text(self, input_data: Input) -> str:
        """Render ``input_data.text`` over the video and write the result to /tmp.

        Returns:
            Path of the written video.

        Raises:
            BlockExecutionError: if moviepy is missing, the time window is
                empty for this video, or rendering/writing fails.
        """
        try:
            from moviepy.editor import CompositeVideoClip, TextClip, VideoFileClip
        except ImportError as e:
            raise BlockExecutionError(
                message="moviepy is not installed. Please install it with: pip install moviepy",
                block_name=self.name,
                block_id=str(self.id),
            ) from e

        # (horizontal, vertical) anchors for each supported position label.
        pos_map = {
            "top": ("center", "top"),
            "center": ("center", "center"),
            "bottom": ("center", "bottom"),
            "top-left": ("left", "top"),
            "top-right": ("right", "top"),
            "bottom-left": ("left", "bottom"),
            "bottom-right": ("right", "bottom"),
        }

        video = None
        final = None
        txt_clip = None
        try:
            video = VideoFileClip(input_data.video_in)
            txt_clip = TextClip(
                input_data.text,
                fontsize=input_data.font_size,
                color=input_data.font_color,
                bg_color=input_data.bg_color,
            )
            txt_clip = txt_clip.set_position(pos_map[input_data.position])

            # Compare against None explicitly so an explicit 0.0 is not
            # mistaken for "not set".
            start = input_data.start_time if input_data.start_time is not None else 0.0
            end = input_data.end_time if input_data.end_time is not None else video.duration
            # Clamp the window to the source length so the overlay never ends
            # after the video does.
            end = min(end, video.duration)
            if end <= start:
                raise ValueError(
                    f"text window {start}s-{end}s is empty for a {video.duration}s video"
                )
            txt_clip = txt_clip.set_start(start).set_duration(end - start)

            final = CompositeVideoClip([video, txt_clip])
            output_path = f"/tmp/overlay_{uuid.uuid4()}.mp4"
            final.write_videofile(output_path, logger=None)
            return output_path
        except Exception as e:
            raise BlockExecutionError(
                message=f"Failed to add text overlay: {e}",
                block_name=self.name,
                block_id=str(self.id),
            ) from e
        finally:
            if txt_clip is not None:
                txt_clip.close()
            if final is not None:
                final.close()
            if video is not None:
                video.close()

    async def run(self, input_data: Input, **kwargs) -> BlockOutput:
        """Validate the time window, render the overlay and yield the result.

        Yields:
            ``("video_out", path)``.

        Raises:
            BlockExecutionError: if both times are given and ``end_time`` is
                not greater than ``start_time``, or rendering fails.
        """
        if (
            input_data.start_time is not None
            and input_data.end_time is not None
            and input_data.end_time <= input_data.start_time
        ):
            raise BlockExecutionError(
                message=f"end_time ({input_data.end_time}) must be greater than start_time ({input_data.start_time})",
                block_name=self.name,
                block_id=str(self.id),
            )

        # Positional call only: the test mock is declared as ``lambda *args``.
        output_path = self._add_text(input_data)
        yield "video_out", output_path