fix(blocks): prevent exponential filename growth when chaining video blocks

Add extract_source_name() utility that strips accumulated
{node_exec_id}_{operation}_ prefixes so chained blocks produce flat
filenames like copilot-uuid_clip_My_Video.mp4 instead of embedding
the entire input path.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Nicholas Tindle
2026-02-04 21:27:06 -06:00
parent 93d49b34d2
commit fe264545a9
7 changed files with 83 additions and 26 deletions

View File

@@ -4,10 +4,57 @@ from __future__ import annotations
import logging
import os
import re
import subprocess
from pathlib import Path
logger = logging.getLogger(__name__)
# Known operation tags added by video blocks
_VIDEO_OPS = (
r"(?:clip|overlay|narrated|looped|concat|audio_attached|with_audio|narration)"
)
# Matches: {node_exec_id}_{operation}_ where node_exec_id contains a UUID
_BLOCK_PREFIX_RE = re.compile(
r"^[a-zA-Z0-9_-]*"
r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"
r"[a-zA-Z0-9_-]*"
r"_" + _VIDEO_OPS + r"_"
)
# Matches: a lone {node_exec_id}_ prefix (no operation keyword, e.g. download output)
_UUID_PREFIX_RE = re.compile(
r"^[a-zA-Z0-9_-]*"
r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"
r"[a-zA-Z0-9_-]*_"
)
def extract_source_name(input_path: str, max_length: int = 50) -> str:
"""Extract the original source filename by stripping block-generated prefixes.
Iteratively removes {node_exec_id}_{operation}_ prefixes that accumulate
when chaining video blocks, recovering the original human-readable name.
Safe for plain filenames (no UUID -> no stripping).
Falls back to "video" if everything is stripped.
"""
stem = Path(input_path).stem
# Pass 1: strip {node_exec_id}_{operation}_ prefixes iteratively
while _BLOCK_PREFIX_RE.match(stem):
stem = _BLOCK_PREFIX_RE.sub("", stem, count=1)
# Pass 2: strip a lone {node_exec_id}_ prefix (e.g. from download block)
if _UUID_PREFIX_RE.match(stem):
stem = _UUID_PREFIX_RE.sub("", stem, count=1)
if not stem:
return "video"
return stem[:max_length]
def get_video_codecs(output_path: str) -> tuple[str, str]:
"""Get appropriate video and audio codecs based on output file extension.

View File

@@ -6,7 +6,7 @@ import tempfile
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.io.VideoFileClip import VideoFileClip
from backend.blocks.video._utils import strip_chapters_inplace
from backend.blocks.video._utils import extract_source_name, strip_chapters_inplace
from backend.data.block import (
Block,
BlockCategory,
@@ -88,9 +88,8 @@ class AddAudioToVideoBlock(Block):
final_clip = video_clip.with_audio(audio_clip)
# 4) Write to output file
output_filename = MediaFileType(
f"{node_exec_id}_audio_attached_{os.path.basename(local_video_path)}"
)
source = extract_source_name(local_video_path)
output_filename = MediaFileType(f"{node_exec_id}_with_audio_{source}.mp4")
output_abspath = os.path.join(abs_temp_dir, output_filename)
final_clip.write_videofile(output_abspath, codec="libx264", audio_codec="aac")

View File

@@ -1,11 +1,14 @@
"""VideoClipBlock - Extract a segment from a video file."""
import os
from typing import Literal
from moviepy.video.io.VideoFileClip import VideoFileClip
from backend.blocks.video._utils import get_video_codecs, strip_chapters_inplace
from backend.blocks.video._utils import (
extract_source_name,
get_video_codecs,
strip_chapters_inplace,
)
from backend.data.block import (
Block,
BlockCategory,
@@ -131,12 +134,10 @@ class VideoClipBlock(Block):
)
# Build output path
source = extract_source_name(local_video_path)
output_filename = MediaFileType(
f"{node_exec_id}_clip_{os.path.basename(local_video_path)}"
f"{node_exec_id}_clip_{source}.{input_data.output_format}"
)
# Ensure correct extension
base, _ = os.path.splitext(output_filename)
output_filename = MediaFileType(f"{base}.{input_data.output_format}")
output_abspath = get_exec_file_path(
execution_context.graph_exec_id, output_filename
)

View File

@@ -6,7 +6,11 @@ from moviepy import concatenate_videoclips
from moviepy.video.fx import CrossFadeIn, CrossFadeOut, FadeIn, FadeOut
from moviepy.video.io.VideoFileClip import VideoFileClip
from backend.blocks.video._utils import get_video_codecs, strip_chapters_inplace
from backend.blocks.video._utils import (
extract_source_name,
get_video_codecs,
strip_chapters_inplace,
)
from backend.data.block import (
Block,
BlockCategory,
@@ -175,8 +179,11 @@ class VideoConcatBlock(Block):
)
# Build output path
source = (
extract_source_name(video_abspaths[0]) if video_abspaths else "video"
)
output_filename = MediaFileType(
f"{node_exec_id}_concat.{input_data.output_format}"
f"{node_exec_id}_concat_{source}.{input_data.output_format}"
)
output_abspath = get_exec_file_path(
execution_context.graph_exec_id, output_filename

View File

@@ -1,12 +1,11 @@
"""LoopVideoBlock - Loop a video to a given duration or number of repeats."""
import os
from typing import Optional
from moviepy.video.fx.Loop import Loop
from moviepy.video.io.VideoFileClip import VideoFileClip
from backend.blocks.video._utils import strip_chapters_inplace
from backend.blocks.video._utils import extract_source_name, strip_chapters_inplace
from backend.data.block import (
Block,
BlockCategory,
@@ -88,9 +87,8 @@ class LoopVideoBlock(Block):
assert isinstance(looped_clip, VideoFileClip)
# 4) Save the looped output
output_filename = MediaFileType(
f"{node_exec_id}_looped_{os.path.basename(local_video_path)}"
)
source = extract_source_name(local_video_path)
output_filename = MediaFileType(f"{node_exec_id}_looped_{source}.mp4")
output_abspath = get_exec_file_path(graph_exec_id, output_filename)
looped_clip = looped_clip.with_audio(clip.audio)

View File

@@ -14,7 +14,11 @@ from backend.blocks.elevenlabs._auth import (
ElevenLabsCredentials,
ElevenLabsCredentialsInput,
)
from backend.blocks.video._utils import get_video_codecs, strip_chapters_inplace
from backend.blocks.video._utils import (
extract_source_name,
get_video_codecs,
strip_chapters_inplace,
)
from backend.data.block import (
Block,
BlockCategory,
@@ -229,9 +233,8 @@ class VideoNarrationBlock(Block):
f.write(audio_content)
# Add narration to video
output_filename = MediaFileType(
f"{node_exec_id}_narrated_{os.path.basename(local_video_path)}"
)
source = extract_source_name(local_video_path)
output_filename = MediaFileType(f"{node_exec_id}_narrated_{source}.mp4")
output_abspath = get_exec_file_path(
execution_context.graph_exec_id, output_filename
)

View File

@@ -1,12 +1,15 @@
"""VideoTextOverlayBlock - Add text overlay to video."""
import os
from typing import Literal
from moviepy import CompositeVideoClip, TextClip
from moviepy.video.io.VideoFileClip import VideoFileClip
from backend.blocks.video._utils import get_video_codecs, strip_chapters_inplace
from backend.blocks.video._utils import (
extract_source_name,
get_video_codecs,
strip_chapters_inplace,
)
from backend.data.block import (
Block,
BlockCategory,
@@ -192,9 +195,8 @@ class VideoTextOverlayBlock(Block):
)
# Build output path
output_filename = MediaFileType(
f"{node_exec_id}_overlay_{os.path.basename(local_video_path)}"
)
source = extract_source_name(local_video_path)
output_filename = MediaFileType(f"{node_exec_id}_overlay_{source}.mp4")
output_abspath = get_exec_file_path(
execution_context.graph_exec_id, output_filename
)