fix(blocks): Address review feedback for video editing blocks

- Add start_time < end_time validation in VideoClipBlock and VideoTextOverlayBlock
- Fix resource leaks: close AudioFileClip in narration.py, TextClip in text_overlay.py
- Fix concat.py: proper resource cleanup in finally block, load clips individually
- Implement proper crossfade using crossfadein/crossfadeout
- Implement ducking mode with stronger attenuation (0.3x original_volume)
- Remove unused start_time/end_time params from VideoDownloadBlock
- Fix None handling for duration/title in download.py (use 'or' instead of 'get' default)
- Add exception chaining with 'from e' in all blocks
- Add minimum clips validation in VideoConcatBlock
- Sort __all__ in __init__.py
- Increase ElevenLabs API timeout to 120s for longer scripts

Co-authored-by: Nicholas Tindle <ntindle@users.noreply.github.com>
This commit is contained in:
claude[bot]
2026-01-18 23:27:04 +00:00
parent 1cfbc0dd08
commit f8d3893c16
6 changed files with 78 additions and 40 deletions

View File

@@ -20,9 +20,9 @@ from .text_overlay import VideoTextOverlayBlock
from .narration import VideoNarrationBlock
__all__ = [
"VideoDownloadBlock",
"VideoClipBlock",
"VideoConcatBlock",
"VideoTextOverlayBlock",
"VideoDownloadBlock",
"VideoNarrationBlock",
"VideoTextOverlayBlock",
]

View File

@@ -51,14 +51,22 @@ class VideoClipBlock(Block):
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
# Validate time range
if input_data.end_time <= input_data.start_time:
raise BlockExecutionError(
message=f"end_time ({input_data.end_time}) must be greater than start_time ({input_data.start_time})",
block_name=self.name,
block_id=str(self.id)
)
try:
from moviepy.video.io.VideoFileClip import VideoFileClip
except ImportError:
except ImportError as e:
raise BlockExecutionError(
message="moviepy is not installed. Please install it with: pip install moviepy",
block_name=self.name,
block_id=str(self.id)
)
) from e
clip = None
subclip = None
@@ -77,7 +85,7 @@ class VideoClipBlock(Block):
message=f"Failed to clip video: {e}",
block_name=self.name,
block_id=str(self.id)
)
) from e
finally:
if subclip:
subclip.close()

View File

@@ -54,27 +54,45 @@ class VideoConcatBlock(Block):
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
try:
from moviepy.editor import VideoFileClip, concatenate_videoclips
except ImportError:
except ImportError as e:
raise BlockExecutionError(
message="moviepy is not installed. Please install it with: pip install moviepy",
block_name=self.name,
block_id=str(self.id)
) from e
# Validate minimum clips
if len(input_data.videos) < 2:
raise BlockExecutionError(
message="At least 2 videos are required for concatenation",
block_name=self.name,
block_id=str(self.id)
)
clips = []
faded_clips = []
final = None
try:
clips = [VideoFileClip(v) for v in input_data.videos]
# Load clips one by one to handle partial failures
for v in input_data.videos:
clips.append(VideoFileClip(v))
if input_data.transition == "crossfade":
# Apply crossfade between clips
# Apply crossfade between clips using crossfadein/crossfadeout
transition_dur = input_data.transition_duration
for i, clip in enumerate(clips):
if i > 0:
clip = clip.crossfadein(transition_dur)
if i < len(clips) - 1:
clip = clip.crossfadeout(transition_dur)
faded_clips.append(clip)
final = concatenate_videoclips(
clips,
faded_clips,
method="compose",
padding=-input_data.transition_duration
padding=-transition_dur
)
elif input_data.transition == "fade_black":
# Fade to black between clips
faded_clips = []
for clip in clips:
faded = clip.fadein(input_data.transition_duration).fadeout(
input_data.transition_duration
@@ -90,14 +108,16 @@ class VideoConcatBlock(Block):
yield "video_out", output_path
yield "total_duration", final.duration
final.close()
except Exception as e:
raise BlockExecutionError(
message=f"Failed to concatenate videos: {e}",
block_name=self.name,
block_id=str(self.id)
)
) from e
finally:
if final:
final.close()
for clip in faded_clips:
clip.close()
for clip in clips:
clip.close()

View File

@@ -4,7 +4,7 @@ VideoDownloadBlock - Download video from URL (YouTube, Vimeo, news sites, direct
import uuid
from typing import Literal
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.block import Block, BlockCategory, BlockOutput
from backend.data.block import BlockSchemaInput, BlockSchemaOutput
from backend.data.model import SchemaField
from backend.util.exceptions import BlockExecutionError
@@ -22,16 +22,6 @@ class VideoDownloadBlock(Block):
description="Video quality preference",
default="720p"
)
start_time: float | None = SchemaField(
description="Start time in seconds (optional, for clipping)",
default=None,
advanced=True
)
end_time: float | None = SchemaField(
description="End time in seconds (optional, for clipping)",
default=None,
advanced=True
)
output_format: Literal["mp4", "webm", "mkv"] = SchemaField(
description="Output video format",
default="mp4",
@@ -72,12 +62,12 @@ class VideoDownloadBlock(Block):
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
try:
import yt_dlp
except ImportError:
except ImportError as e:
raise BlockExecutionError(
message="yt-dlp is not installed. Please install it with: pip install yt-dlp",
block_name=self.name,
block_id=str(self.id)
)
) from e
video_id = str(uuid.uuid4())[:8]
output_template = f"/tmp/{video_id}.%(ext)s"
@@ -100,8 +90,8 @@ class VideoDownloadBlock(Block):
video_path = video_path.rsplit(".", 1)[0] + f".{input_data.output_format}"
yield "video_file", video_path
yield "duration", info.get("duration", 0.0)
yield "title", info.get("title", "Unknown")
yield "duration", info.get("duration") or 0.0
yield "title", info.get("title") or "Unknown"
yield "source_url", input_data.url
except Exception as e:
@@ -109,4 +99,4 @@ class VideoDownloadBlock(Block):
message=f"Failed to download video: {e}",
block_name=self.name,
block_id=str(self.id)
)
) from e

View File

@@ -91,10 +91,11 @@ class VideoNarrationBlock(Block):
message=f"Missing dependency: {e}. Install moviepy and requests.",
block_name=self.name,
block_id=str(self.id)
)
) from e
video = None
final = None
narration = None
try:
# Generate narration via ElevenLabs
response = requests.post(
@@ -107,7 +108,7 @@ class VideoNarrationBlock(Block):
"text": input_data.script,
"model_id": "eleven_monolingual_v1"
},
timeout=60
timeout=120
)
response.raise_for_status()
@@ -117,7 +118,8 @@ class VideoNarrationBlock(Block):
# Combine with video
video = VideoFileClip(input_data.video_in)
narration = AudioFileClip(audio_path).volumex(input_data.narration_volume)
narration = AudioFileClip(audio_path)
narration = narration.volumex(input_data.narration_volume)
if input_data.mix_mode == "replace":
final_audio = narration
@@ -127,9 +129,11 @@ class VideoNarrationBlock(Block):
final_audio = CompositeAudioClip([original, narration])
else:
final_audio = narration
else: # ducking
else: # ducking - lower original volume more when narration plays
if video.audio:
original = video.audio.volumex(input_data.original_volume)
# Apply stronger attenuation for ducking effect
ducking_volume = input_data.original_volume * 0.3
original = video.audio.volumex(ducking_volume)
final_audio = CompositeAudioClip([original, narration])
else:
final_audio = narration
@@ -147,14 +151,16 @@ class VideoNarrationBlock(Block):
message=f"ElevenLabs API error: {e}",
block_name=self.name,
block_id=str(self.id)
)
) from e
except Exception as e:
raise BlockExecutionError(
message=f"Failed to add narration: {e}",
block_name=self.name,
block_id=str(self.id)
)
) from e
finally:
if narration:
narration.close()
if final:
final.close()
if video:

View File

@@ -78,15 +78,26 @@ class VideoTextOverlayBlock(Block):
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
try:
from moviepy.editor import VideoFileClip, TextClip, CompositeVideoClip
except ImportError:
except ImportError as e:
raise BlockExecutionError(
message="moviepy is not installed. Please install it with: pip install moviepy",
block_name=self.name,
block_id=str(self.id)
) from e
# Validate time range if both are provided
if (input_data.start_time is not None and
input_data.end_time is not None and
input_data.end_time <= input_data.start_time):
raise BlockExecutionError(
message=f"end_time ({input_data.end_time}) must be greater than start_time ({input_data.start_time})",
block_name=self.name,
block_id=str(self.id)
)
video = None
final = None
txt_clip = None
try:
video = VideoFileClip(input_data.video_in)
@@ -113,7 +124,8 @@ class VideoTextOverlayBlock(Block):
# Set timing
start = input_data.start_time or 0
end = input_data.end_time or video.duration
txt_clip = txt_clip.set_start(start).set_end(end)
duration = max(0, end - start)
txt_clip = txt_clip.set_start(start).set_end(end).set_duration(duration)
final = CompositeVideoClip([video, txt_clip])
@@ -127,8 +139,10 @@ class VideoTextOverlayBlock(Block):
message=f"Failed to add text overlay: {e}",
block_name=self.name,
block_id=str(self.id)
)
) from e
finally:
if txt_clip:
txt_clip.close()
if final:
final.close()
if video: