feat(backend): integrate ElevenLabs for video narration and add cost configuration

- Implemented ElevenLabs API integration for generating AI narration in videos.
- Updated VideoNarrationBlock to handle audio generation and mixing with video.
- Added ElevenLabs credentials to the credentials store.
- Configured block costs for using ElevenLabs TTS.
- Enhanced video processing blocks (concat, download, text overlay) for improved functionality.
- Updated dependencies in poetry.lock for ElevenLabs SDK and yt-dlp.
- Added provider icon for ElevenLabs in frontend credentials input.
This commit is contained in:
Nicholas Tindle
2026-01-22 19:26:39 -06:00
parent 521f69220d
commit d2d2a0c0c9
22 changed files with 337 additions and 1146 deletions

View File

@@ -152,6 +152,7 @@ REPLICATE_API_KEY=
REVID_API_KEY=
SCREENSHOTONE_API_KEY=
UNREAL_SPEECH_API_KEY=
ELEVENLABS_API_KEY=
# Data & Search Services
E2B_API_KEY=

View File

@@ -1,251 +0,0 @@
import os
import tempfile
from typing import Literal, Optional
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.fx.Loop import Loop
from moviepy.video.io.VideoFileClip import VideoFileClip
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.model import SchemaField
from backend.util.file import MediaFileType, get_exec_file_path, store_media_file
class MediaDurationBlock(Block):
class Input(BlockSchemaInput):
media_in: MediaFileType = SchemaField(
description="Media input (URL, data URI, or local path)."
)
is_video: bool = SchemaField(
description="Whether the media is a video (True) or audio (False).",
default=True,
)
class Output(BlockSchemaOutput):
duration: float = SchemaField(
description="Duration of the media file (in seconds)."
)
def __init__(self):
super().__init__(
id="d8b91fd4-da26-42d4-8ecb-8b196c6d84b6",
description="Block to get the duration of a media file.",
categories={BlockCategory.MULTIMEDIA},
input_schema=MediaDurationBlock.Input,
output_schema=MediaDurationBlock.Output,
)
async def run(
self,
input_data: Input,
*,
graph_exec_id: str,
user_id: str,
**kwargs,
) -> BlockOutput:
# 1) Store the input media locally
local_media_path = await store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.media_in,
user_id=user_id,
return_content=False,
)
media_abspath = get_exec_file_path(graph_exec_id, local_media_path)
# 2) Load the clip
if input_data.is_video:
clip = VideoFileClip(media_abspath)
else:
clip = AudioFileClip(media_abspath)
yield "duration", clip.duration
class LoopVideoBlock(Block):
"""
Block for looping (repeating) a video clip until a given duration or number of loops.
"""
class Input(BlockSchemaInput):
video_in: MediaFileType = SchemaField(
description="The input video (can be a URL, data URI, or local path)."
)
# Provide EITHER a `duration` or `n_loops` or both. We'll demonstrate `duration`.
duration: Optional[float] = SchemaField(
description="Target duration (in seconds) to loop the video to. If omitted, defaults to no looping.",
default=None,
ge=0.0,
)
n_loops: Optional[int] = SchemaField(
description="Number of times to repeat the video. If omitted, defaults to 1 (no repeat).",
default=None,
ge=1,
)
output_return_type: Literal["file_path", "data_uri"] = SchemaField(
description="How to return the output video. Either a relative path or base64 data URI.",
default="file_path",
)
class Output(BlockSchemaOutput):
video_out: str = SchemaField(
description="Looped video returned either as a relative path or a data URI."
)
def __init__(self):
super().__init__(
id="8bf9eef6-5451-4213-b265-25306446e94b",
description="Block to loop a video to a given duration or number of repeats.",
categories={BlockCategory.MULTIMEDIA},
input_schema=LoopVideoBlock.Input,
output_schema=LoopVideoBlock.Output,
)
async def run(
self,
input_data: Input,
*,
node_exec_id: str,
graph_exec_id: str,
user_id: str,
**kwargs,
) -> BlockOutput:
# 1) Store the input video locally
local_video_path = await store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.video_in,
user_id=user_id,
return_content=False,
)
input_abspath = get_exec_file_path(graph_exec_id, local_video_path)
# 2) Load the clip
clip = VideoFileClip(input_abspath)
# 3) Apply the loop effect
looped_clip = clip
if input_data.duration:
# Loop until we reach the specified duration
looped_clip = looped_clip.with_effects([Loop(duration=input_data.duration)])
elif input_data.n_loops:
looped_clip = looped_clip.with_effects([Loop(n=input_data.n_loops)])
else:
raise ValueError("Either 'duration' or 'n_loops' must be provided.")
assert isinstance(looped_clip, VideoFileClip)
# 4) Save the looped output
output_filename = MediaFileType(
f"{node_exec_id}_looped_{os.path.basename(local_video_path)}"
)
output_abspath = get_exec_file_path(graph_exec_id, output_filename)
looped_clip = looped_clip.with_audio(clip.audio)
looped_clip.write_videofile(output_abspath, codec="libx264", audio_codec="aac")
# Return as data URI
video_out = await store_media_file(
graph_exec_id=graph_exec_id,
file=output_filename,
user_id=user_id,
return_content=input_data.output_return_type == "data_uri",
)
yield "video_out", video_out
class AddAudioToVideoBlock(Block):
"""
Block that adds (attaches) an audio track to an existing video.
Optionally scale the volume of the new track.
"""
class Input(BlockSchemaInput):
video_in: MediaFileType = SchemaField(
description="Video input (URL, data URI, or local path)."
)
audio_in: MediaFileType = SchemaField(
description="Audio input (URL, data URI, or local path)."
)
volume: float = SchemaField(
description="Volume scale for the newly attached audio track (1.0 = original).",
default=1.0,
)
output_return_type: Literal["file_path", "data_uri"] = SchemaField(
description="Return the final output as a relative path or base64 data URI.",
default="file_path",
)
class Output(BlockSchemaOutput):
video_out: MediaFileType = SchemaField(
description="Final video (with attached audio), as a path or data URI."
)
def __init__(self):
super().__init__(
id="3503748d-62b6-4425-91d6-725b064af509",
description="Block to attach an audio file to a video file using moviepy.",
categories={BlockCategory.MULTIMEDIA},
input_schema=AddAudioToVideoBlock.Input,
output_schema=AddAudioToVideoBlock.Output,
)
async def run(
self,
input_data: Input,
*,
node_exec_id: str,
graph_exec_id: str,
user_id: str,
**kwargs,
) -> BlockOutput:
# 1) Store the inputs locally
local_video_path = await store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.video_in,
user_id=user_id,
return_content=False,
)
local_audio_path = await store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.audio_in,
user_id=user_id,
return_content=False,
)
abs_temp_dir = os.path.join(tempfile.gettempdir(), "exec_file", graph_exec_id)
video_abspath = os.path.join(abs_temp_dir, local_video_path)
audio_abspath = os.path.join(abs_temp_dir, local_audio_path)
# 2) Load video + audio with moviepy
video_clip = VideoFileClip(video_abspath)
audio_clip = AudioFileClip(audio_abspath)
# Optionally scale volume
if input_data.volume != 1.0:
audio_clip = audio_clip.with_volume_scaled(input_data.volume)
# 3) Attach the new audio track
final_clip = video_clip.with_audio(audio_clip)
# 4) Write to output file
output_filename = MediaFileType(
f"{node_exec_id}_audio_attached_{os.path.basename(local_video_path)}"
)
output_abspath = os.path.join(abs_temp_dir, output_filename)
final_clip.write_videofile(output_abspath, codec="libx264", audio_codec="aac")
# 5) Return either path or data URI
video_out = await store_media_file(
graph_exec_id=graph_exec_id,
file=output_filename,
user_id=user_id,
return_content=input_data.output_return_type == "data_uri",
)
yield "video_out", video_out

View File

@@ -1,7 +1,6 @@
"""AddAudioToVideoBlock - Attach an audio track to a video."""
import os
import tempfile
from typing import Literal
from moviepy.audio.io.AudioFileClip import AudioFileClip
@@ -74,31 +73,37 @@ class AddAudioToVideoBlock(Block):
return_content=False,
)
abs_temp_dir = os.path.join(tempfile.gettempdir(), "exec_file", graph_exec_id)
video_abspath = os.path.join(abs_temp_dir, local_video_path)
audio_abspath = os.path.join(abs_temp_dir, local_audio_path)
video_abspath = get_exec_file_path(graph_exec_id, local_video_path)
audio_abspath = get_exec_file_path(graph_exec_id, local_audio_path)
video_clip = None
audio_clip = None
audio_clip_original = None
audio_clip_scaled = None
final_clip = None
try:
# 2) Load video + audio with moviepy
video_clip = VideoFileClip(video_abspath)
audio_clip = AudioFileClip(audio_abspath)
audio_clip_original = AudioFileClip(audio_abspath)
# Optionally scale volume
audio_to_use = audio_clip_original
if input_data.volume != 1.0:
audio_clip = audio_clip.with_volume_scaled(input_data.volume)
audio_clip_scaled = audio_clip_original.with_volume_scaled(
input_data.volume
)
audio_to_use = audio_clip_scaled
# 3) Attach the new audio track
final_clip = video_clip.with_audio(audio_clip)
final_clip = video_clip.with_audio(audio_to_use)
# 4) Write to output file
output_filename = MediaFileType(
f"{node_exec_id}_audio_attached_{os.path.basename(local_video_path)}"
)
output_abspath = os.path.join(abs_temp_dir, output_filename)
final_clip.write_videofile(output_abspath, codec="libx264", audio_codec="aac")
output_abspath = get_exec_file_path(graph_exec_id, output_filename)
final_clip.write_videofile(
output_abspath, codec="libx264", audio_codec="aac"
)
# 5) Return either path or data URI
video_out = await store_media_file(
@@ -112,7 +117,9 @@ class AddAudioToVideoBlock(Block):
finally:
if final_clip:
final_clip.close()
if audio_clip:
audio_clip.close()
if audio_clip_scaled:
audio_clip_scaled.close()
if audio_clip_original:
audio_clip_original.close()
if video_clip:
video_clip.close()

View File

@@ -2,13 +2,17 @@
import os
import tempfile
import uuid
from typing import Literal
from moviepy.video.io.VideoFileClip import VideoFileClip
from backend.data.block import Block, BlockCategory, BlockOutput
from backend.data.block import BlockSchemaInput, BlockSchemaOutput
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.model import SchemaField
from backend.util.exceptions import BlockExecutionError
@@ -19,39 +23,34 @@ class VideoClipBlock(Block):
class Input(BlockSchemaInput):
video_in: str = SchemaField(
description="Input video (URL, data URI, or file path)",
json_schema_extra={"format": "file"}
)
start_time: float = SchemaField(
description="Start time in seconds",
ge=0.0
)
end_time: float = SchemaField(
description="End time in seconds",
ge=0.0
json_schema_extra={"format": "file"},
)
start_time: float = SchemaField(description="Start time in seconds", ge=0.0)
end_time: float = SchemaField(description="End time in seconds", ge=0.0)
output_format: Literal["mp4", "webm", "mkv", "mov"] = SchemaField(
description="Output format",
default="mp4",
advanced=True
description="Output format", default="mp4", advanced=True
)
class Output(BlockSchemaOutput):
video_out: str = SchemaField(
description="Clipped video file",
json_schema_extra={"format": "file"}
description="Clipped video file", json_schema_extra={"format": "file"}
)
duration: float = SchemaField(description="Clip duration in seconds")
def __init__(self):
super().__init__(
id="b2c3d4e5-f6a7-8901-bcde-f23456789012",
id="8f539119-e580-4d86-ad41-86fbcb22abb1",
description="Extract a time segment from a video",
categories={BlockCategory.MULTIMEDIA},
input_schema=self.Input,
output_schema=self.Output,
test_input={"video_in": "/tmp/test.mp4", "start_time": 0.0, "end_time": 10.0},
test_input={
"video_in": "/tmp/test.mp4",
"start_time": 0.0,
"end_time": 10.0,
},
test_output=[("video_out", str), ("duration", float)],
test_mock={"_clip_video": lambda *args: ("/tmp/clip.mp4", 10.0)}
test_mock={"_clip_video": lambda *args: ("/tmp/clip.mp4", 10.0)},
)
def _clip_video(
@@ -66,7 +65,7 @@ class VideoClipBlock(Block):
subclip = None
try:
clip = VideoFileClip(video_in)
subclip = clip.subclip(start_time, end_time)
subclip = clip.subclipped(start_time, end_time)
fd, output_path = tempfile.mkstemp(suffix=f".{output_format}")
os.close(fd)
@@ -85,7 +84,7 @@ class VideoClipBlock(Block):
raise BlockExecutionError(
message=f"end_time ({input_data.end_time}) must be greater than start_time ({input_data.start_time})",
block_name=self.name,
block_id=str(self.id)
block_id=str(self.id),
)
try:
@@ -104,5 +103,5 @@ class VideoClipBlock(Block):
raise BlockExecutionError(
message=f"Failed to clip video: {e}",
block_name=self.name,
block_id=str(self.id)
block_id=str(self.id),
) from e

View File

@@ -2,13 +2,19 @@
import os
import tempfile
import uuid
from typing import Literal
from moviepy.editor import VideoFileClip, concatenate_videoclips
from moviepy import concatenate_videoclips
from moviepy.video.fx import CrossFadeIn, CrossFadeOut, FadeIn, FadeOut
from moviepy.video.io.VideoFileClip import VideoFileClip
from backend.data.block import Block, BlockCategory, BlockOutput
from backend.data.block import BlockSchemaInput, BlockSchemaOutput
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.model import SchemaField
from backend.util.exceptions import BlockExecutionError
@@ -21,45 +27,41 @@ class VideoConcatBlock(Block):
description="List of video files to concatenate (in order)"
)
transition: Literal["none", "crossfade", "fade_black"] = SchemaField(
description="Transition between clips",
default="none"
description="Transition between clips", default="none"
)
transition_duration: float = SchemaField(
transition_duration: int = SchemaField(
description="Transition duration in seconds",
default=0.5,
ge=0.0,
advanced=True
default=1,
ge=0,
advanced=True,
)
output_format: Literal["mp4", "webm", "mkv", "mov"] = SchemaField(
description="Output format",
default="mp4",
advanced=True
description="Output format", default="mp4", advanced=True
)
class Output(BlockSchemaOutput):
video_out: str = SchemaField(
description="Concatenated video file",
json_schema_extra={"format": "file"}
description="Concatenated video file", json_schema_extra={"format": "file"}
)
total_duration: float = SchemaField(description="Total duration in seconds")
def __init__(self):
super().__init__(
id="c3d4e5f6-a7b8-9012-cdef-345678901234",
id="9b0f531a-1118-487f-aeec-3fa63ea8900a",
description="Merge multiple video clips into one continuous video",
categories={BlockCategory.MULTIMEDIA},
input_schema=self.Input,
output_schema=self.Output,
test_input={"videos": ["/tmp/a.mp4", "/tmp/b.mp4"]},
test_output=[("video_out", str), ("total_duration", float)],
test_mock={"_concat_videos": lambda *args: ("/tmp/concat.mp4", 20.0)}
test_mock={"_concat_videos": lambda *args: ("/tmp/concat.mp4", 20.0)},
)
def _concat_videos(
self,
videos: list[str],
transition: str,
transition_duration: float,
transition_duration: int,
output_format: str,
) -> tuple[str, float]:
"""Concatenate videos. Extracted for testability."""
@@ -73,20 +75,23 @@ class VideoConcatBlock(Block):
if transition == "crossfade":
for i, clip in enumerate(clips):
effects = []
if i > 0:
clip = clip.crossfadein(transition_duration)
effects.append(CrossFadeIn(transition_duration))
if i < len(clips) - 1:
clip = clip.crossfadeout(transition_duration)
effects.append(CrossFadeOut(transition_duration))
if effects:
clip = clip.with_effects(effects)
faded_clips.append(clip)
final = concatenate_videoclips(
faded_clips,
method="compose",
padding=-transition_duration
padding=-transition_duration,
)
elif transition == "fade_black":
for clip in clips:
faded = clip.fadein(transition_duration).fadeout(
transition_duration
faded = clip.with_effects(
[FadeIn(transition_duration), FadeOut(transition_duration)]
)
faded_clips.append(faded)
final = concatenate_videoclips(faded_clips)
@@ -112,7 +117,7 @@ class VideoConcatBlock(Block):
raise BlockExecutionError(
message="At least 2 videos are required for concatenation",
block_name=self.name,
block_id=str(self.id)
block_id=str(self.id),
)
try:
@@ -131,5 +136,5 @@ class VideoConcatBlock(Block):
raise BlockExecutionError(
message=f"Failed to concatenate videos: {e}",
block_name=self.name,
block_id=str(self.id)
block_id=str(self.id),
) from e

View File

@@ -6,9 +6,15 @@ import uuid
from typing import Literal
import yt_dlp
from yt_dlp import _Params
from backend.data.block import Block, BlockCategory, BlockOutput
from backend.data.block import BlockSchemaInput, BlockSchemaOutput
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.model import SchemaField
from backend.util.exceptions import BlockExecutionError
@@ -19,22 +25,19 @@ class VideoDownloadBlock(Block):
class Input(BlockSchemaInput):
url: str = SchemaField(
description="URL of the video to download (YouTube, Vimeo, direct link, etc.)",
placeholder="https://www.youtube.com/watch?v=..."
placeholder="https://www.youtube.com/watch?v=...",
)
quality: Literal["best", "1080p", "720p", "480p", "audio_only"] = SchemaField(
description="Video quality preference",
default="720p"
description="Video quality preference", default="720p"
)
output_format: Literal["mp4", "webm", "mkv"] = SchemaField(
description="Output video format",
default="mp4",
advanced=True
description="Output video format", default="mp4", advanced=True
)
class Output(BlockSchemaOutput):
video_file: str = SchemaField(
description="Path or data URI of downloaded video",
json_schema_extra={"format": "file"}
json_schema_extra={"format": "file"},
)
duration: float = SchemaField(description="Video duration in seconds")
title: str = SchemaField(description="Video title from source")
@@ -42,14 +45,24 @@ class VideoDownloadBlock(Block):
def __init__(self):
super().__init__(
id="a1b2c3d4-e5f6-7890-abcd-ef1234567890",
id="c35daabb-cd60-493b-b9ad-51f1fe4b50c4",
description="Download video from URL (YouTube, Vimeo, news sites, direct links)",
categories={BlockCategory.MULTIMEDIA},
input_schema=self.Input,
output_schema=self.Output,
test_input={"url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ", "quality": "480p"},
test_output=[("video_file", str), ("duration", float), ("title", str), ("source_url", str)],
test_mock={"_download_video": lambda *args: ("/tmp/video.mp4", 212.0, "Test Video")}
test_input={
"url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
"quality": "480p",
},
test_output=[
("video_file", str),
("duration", float),
("title", str),
("source_url", str),
],
test_mock={
"_download_video": lambda *args: ("/tmp/video.mp4", 212.0, "Test Video")
},
)
def _get_format_string(self, quality: str) -> str:
@@ -58,7 +71,7 @@ class VideoDownloadBlock(Block):
"1080p": "bestvideo[height<=1080]+bestaudio/best[height<=1080]",
"720p": "bestvideo[height<=720]+bestaudio/best[height<=720]",
"480p": "bestvideo[height<=480]+bestaudio/best[height<=480]",
"audio_only": "bestaudio/best"
"audio_only": "bestaudio/best",
}
return formats.get(quality, formats["720p"])
@@ -73,7 +86,7 @@ class VideoDownloadBlock(Block):
temp_dir = tempfile.gettempdir()
output_template = os.path.join(temp_dir, f"{video_id}.%(ext)s")
ydl_opts = {
ydl_opts: _Params = {
"format": self._get_format_string(quality),
"outtmpl": output_template,
"merge_output_format": output_format,
@@ -89,7 +102,11 @@ class VideoDownloadBlock(Block):
if not video_path.endswith(f".{output_format}"):
video_path = video_path.rsplit(".", 1)[0] + f".{output_format}"
return video_path, info.get("duration") or 0.0, info.get("title") or "Unknown"
return (
video_path,
info.get("duration") or 0.0,
info.get("title") or "Unknown",
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
try:
@@ -107,5 +124,5 @@ class VideoDownloadBlock(Block):
raise BlockExecutionError(
message=f"Failed to download video: {e}",
block_name=self.name,
block_id=str(self.id)
block_id=str(self.id),
) from e

View File

@@ -1,7 +1,7 @@
"""LoopVideoBlock - Loop a video to a given duration or number of repeats."""
import os
from typing import Literal, Optional
from typing import Any, Literal, Optional
from moviepy.video.fx.Loop import Loop
from moviepy.video.io.VideoFileClip import VideoFileClip
@@ -71,31 +71,30 @@ class LoopVideoBlock(Block):
)
input_abspath = get_exec_file_path(graph_exec_id, local_video_path)
clip = None
looped_clip = None
clip: VideoFileClip | None = None
looped_clip: Any = None
try:
# 2) Load the clip
clip = VideoFileClip(input_abspath)
# 3) Apply the loop effect
looped_clip = clip
# Note: Loop effect handles both video and audio looping automatically
if input_data.duration:
looped_clip = looped_clip.with_effects([Loop(duration=input_data.duration)])
looped_clip = clip.with_effects([Loop(duration=input_data.duration)])
elif input_data.n_loops:
looped_clip = looped_clip.with_effects([Loop(n=input_data.n_loops)])
looped_clip = clip.with_effects([Loop(n=input_data.n_loops)])
else:
raise ValueError("Either 'duration' or 'n_loops' must be provided.")
assert isinstance(looped_clip, VideoFileClip)
# 4) Save the looped output
output_filename = MediaFileType(
f"{node_exec_id}_looped_{os.path.basename(local_video_path)}"
)
output_abspath = get_exec_file_path(graph_exec_id, output_filename)
looped_clip = looped_clip.with_audio(clip.audio)
looped_clip.write_videofile(output_abspath, codec="libx264", audio_codec="aac")
looped_clip.write_videofile(
output_abspath, codec="libx264", audio_codec="aac"
)
# Return as data URI or path
video_out = await store_media_file(
@@ -107,7 +106,7 @@ class LoopVideoBlock(Block):
yield "video_out", video_out
finally:
if looped_clip and looped_clip is not clip:
if looped_clip is not None:
looped_clip.close()
if clip:
if clip is not None:
clip.close()

View File

@@ -2,15 +2,21 @@
import os
import tempfile
import uuid
from typing import Literal
import requests
from moviepy.editor import VideoFileClip, AudioFileClip, CompositeAudioClip
from elevenlabs import ElevenLabs
from moviepy import CompositeAudioClip
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.io.VideoFileClip import VideoFileClip
from backend.data.block import Block, BlockCategory, BlockOutput
from backend.data.block import BlockSchemaInput, BlockSchemaOutput
from backend.data.model import SchemaField, CredentialsMetaInput, APIKeyCredentials
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.model import APIKeyCredentials, CredentialsMetaInput, SchemaField
from backend.integrations.providers import ProviderName
from backend.util.exceptions import BlockExecutionError
@@ -20,53 +26,46 @@ class VideoNarrationBlock(Block):
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput[
Literal[ProviderName.ELEVENLABS], Literal["api_key"]
] = SchemaField(
description="ElevenLabs API key for voice synthesis"
)
Literal[ProviderName.ELEVENLABS],
Literal["api_key"],
] = SchemaField(description="ElevenLabs API key for voice synthesis")
video_in: str = SchemaField(
description="Input video file",
json_schema_extra={"format": "file"}
)
script: str = SchemaField(
description="Narration script text"
description="Input video file", json_schema_extra={"format": "file"}
)
script: str = SchemaField(description="Narration script text")
voice_id: str = SchemaField(
description="ElevenLabs voice ID",
default="21m00Tcm4TlvDq8ikWAM" # Rachel
description="ElevenLabs voice ID", default="21m00Tcm4TlvDq8ikWAM" # Rachel
)
mix_mode: Literal["replace", "mix", "ducking"] = SchemaField(
description="How to combine with original audio. 'ducking' applies stronger attenuation than 'mix'.",
default="ducking"
default="ducking",
)
narration_volume: float = SchemaField(
description="Narration volume (0.0 to 2.0)",
default=1.0,
ge=0.0,
le=2.0,
advanced=True
advanced=True,
)
original_volume: float = SchemaField(
description="Original audio volume when mixing (0.0 to 1.0)",
default=0.3,
ge=0.0,
le=1.0,
advanced=True
advanced=True,
)
class Output(BlockSchemaOutput):
video_out: str = SchemaField(
description="Video with narration",
json_schema_extra={"format": "file"}
description="Video with narration", json_schema_extra={"format": "file"}
)
audio_file: str = SchemaField(
description="Generated audio file",
json_schema_extra={"format": "file"}
description="Generated audio file", json_schema_extra={"format": "file"}
)
def __init__(self):
super().__init__(
id="e5f6a7b8-c9d0-1234-ef56-789012345678",
id="3d036b53-859c-4b17-9826-ca340f736e0e",
description="Generate AI narration and add to video",
categories={BlockCategory.MULTIMEDIA, BlockCategory.AI},
input_schema=self.Input,
@@ -74,58 +73,58 @@ class VideoNarrationBlock(Block):
test_input={
"video_in": "/tmp/test.mp4",
"script": "Hello world",
"credentials": {"provider": "elevenlabs", "id": "test", "type": "api_key"}
"credentials": {
"provider": "elevenlabs",
"id": "test",
"type": "api_key",
},
},
test_output=[("video_out", str), ("audio_file", str)],
test_mock={"_generate_and_add_narration": lambda *args: ("/tmp/narrated.mp4", "/tmp/audio.mp3")}
test_mock={
"_generate_narration_audio": lambda *args: b"mock audio content",
"_add_narration_to_video": lambda *args: "/tmp/narrated.mp4",
},
)
def _generate_and_add_narration(
def _generate_narration_audio(
self, api_key: str, script: str, voice_id: str
) -> bytes:
"""Generate narration audio via ElevenLabs API."""
client = ElevenLabs(api_key=api_key)
audio_generator = client.text_to_speech.convert(
voice_id=voice_id,
text=script,
model_id="eleven_monolingual_v1",
)
# The SDK returns a generator, collect all chunks
return b"".join(audio_generator)
def _add_narration_to_video(
self,
api_key: str,
video_in: str,
script: str,
voice_id: str,
audio_path: str,
mix_mode: str,
narration_volume: float,
original_volume: float,
) -> tuple[str, str]:
"""Generate narration and add to video. Extracted for testability."""
) -> str:
"""Add narration audio to video. Extracted for testability."""
video = None
final = None
narration = None
narration_original = None
narration_scaled = None
original = None
try:
# Generate narration via ElevenLabs
response = requests.post(
f"https://api.elevenlabs.io/v1/text-to-speech/{voice_id}",
headers={
"xi-api-key": api_key,
"Content-Type": "application/json"
},
json={
"text": script,
"model_id": "eleven_monolingual_v1"
},
timeout=120
)
response.raise_for_status()
fd, audio_path = tempfile.mkstemp(suffix=".mp3")
with os.fdopen(fd, "wb") as f:
f.write(response.content)
# Combine with video
video = VideoFileClip(video_in)
narration = AudioFileClip(audio_path)
narration = narration.volumex(narration_volume)
narration_original = AudioFileClip(audio_path)
narration_scaled = narration_original.with_volume_scaled(narration_volume)
narration = narration_scaled
if mix_mode == "replace":
final_audio = narration
elif mix_mode == "mix":
if video.audio:
original = video.audio.volumex(original_volume)
original = video.audio.with_volume_scaled(original_volume)
final_audio = CompositeAudioClip([original, narration])
else:
final_audio = narration
@@ -133,58 +132,62 @@ class VideoNarrationBlock(Block):
if video.audio:
# Ducking uses a much lower volume for original audio
ducking_volume = original_volume * 0.3
original = video.audio.volumex(ducking_volume)
original = video.audio.with_volume_scaled(ducking_volume)
final_audio = CompositeAudioClip([original, narration])
else:
final_audio = narration
final = video.set_audio(final_audio)
final = video.with_audio(final_audio)
fd, output_path = tempfile.mkstemp(suffix=".mp4")
os.close(fd)
final.write_videofile(output_path, logger=None)
return output_path, audio_path
return output_path
finally:
if original:
original.close()
if narration:
narration.close()
if narration_scaled:
narration_scaled.close()
if narration_original:
narration_original.close()
if final:
final.close()
if video:
video.close()
async def run(
self,
input_data: Input,
*,
credentials: APIKeyCredentials,
**kwargs
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
try:
output_path, audio_path = self._generate_and_add_narration(
# Generate narration audio via ElevenLabs
audio_content = self._generate_narration_audio(
credentials.api_key.get_secret_value(),
input_data.video_in,
input_data.script,
input_data.voice_id,
)
# Save audio to temp file
fd, audio_path = tempfile.mkstemp(suffix=".mp3")
with os.fdopen(fd, "wb") as f:
f.write(audio_content)
# Add narration to video
output_path = self._add_narration_to_video(
input_data.video_in,
audio_path,
input_data.mix_mode,
input_data.narration_volume,
input_data.original_volume,
)
yield "video_out", output_path
yield "audio_file", audio_path
except requests.exceptions.RequestException as e:
raise BlockExecutionError(
message=f"ElevenLabs API error: {e}",
block_name=self.name,
block_id=str(self.id)
) from e
except Exception as e:
raise BlockExecutionError(
message=f"Failed to add narration: {e}",
block_name=self.name,
block_id=str(self.id)
block_id=str(self.id),
) from e

View File

@@ -2,13 +2,18 @@
import os
import tempfile
import uuid
from typing import Literal
from moviepy.editor import VideoFileClip, TextClip, CompositeVideoClip
from moviepy import CompositeVideoClip, TextClip
from moviepy.video.io.VideoFileClip import VideoFileClip
from backend.data.block import Block, BlockCategory, BlockOutput
from backend.data.block import BlockSchemaInput, BlockSchemaOutput
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.model import SchemaField
from backend.util.exceptions import BlockExecutionError
@@ -18,64 +23,55 @@ class VideoTextOverlayBlock(Block):
class Input(BlockSchemaInput):
video_in: str = SchemaField(
description="Input video file",
json_schema_extra={"format": "file"}
)
text: str = SchemaField(
description="Text to overlay on video"
description="Input video file", json_schema_extra={"format": "file"}
)
text: str = SchemaField(description="Text to overlay on video")
position: Literal[
"top", "center", "bottom",
"top-left", "top-right",
"bottom-left", "bottom-right"
] = SchemaField(
description="Position of text on screen",
default="bottom"
)
"top",
"center",
"bottom",
"top-left",
"top-right",
"bottom-left",
"bottom-right",
] = SchemaField(description="Position of text on screen", default="bottom")
start_time: float | None = SchemaField(
description="When to show text (seconds). None = entire video",
default=None,
advanced=True
advanced=True,
)
end_time: float | None = SchemaField(
description="When to hide text (seconds). None = until end",
default=None,
advanced=True
advanced=True,
)
font_size: int = SchemaField(
description="Font size",
default=48,
ge=12,
le=200,
advanced=True
description="Font size", default=48, ge=12, le=200, advanced=True
)
font_color: str = SchemaField(
description="Font color (hex or name)",
default="white",
advanced=True
description="Font color (hex or name)", default="white", advanced=True
)
bg_color: str | None = SchemaField(
description="Background color behind text (None for transparent)",
default=None,
advanced=True
advanced=True,
)
class Output(BlockSchemaOutput):
video_out: str = SchemaField(
description="Video with text overlay",
json_schema_extra={"format": "file"}
description="Video with text overlay", json_schema_extra={"format": "file"}
)
def __init__(self):
super().__init__(
id="d4e5f6a7-b8c9-0123-def4-567890123456",
id="8ef14de6-cc90-430a-8cfa-3a003be92454",
description="Add text overlay/caption to video",
categories={BlockCategory.MULTIMEDIA},
input_schema=self.Input,
output_schema=self.Output,
test_input={"video_in": "/tmp/test.mp4", "text": "Hello World"},
test_output=[("video_out", str)],
test_mock={"_add_text_overlay": lambda *args: "/tmp/overlay.mp4"}
test_mock={"_add_text_overlay": lambda *args: "/tmp/overlay.mp4"},
)
def _add_text_overlay(
@@ -97,8 +93,8 @@ class VideoTextOverlayBlock(Block):
video = VideoFileClip(video_in)
txt_clip = TextClip(
text,
fontsize=font_size,
text=text,
font_size=font_size,
color=font_color,
bg_color=bg_color,
)
@@ -114,13 +110,13 @@ class VideoTextOverlayBlock(Block):
"bottom-right": ("right", "bottom"),
}
txt_clip = txt_clip.set_position(pos_map[position])
txt_clip = txt_clip.with_position(pos_map[position])
# Set timing
start = start_time or 0
end = end_time or video.duration
duration = max(0, end - start)
txt_clip = txt_clip.set_start(start).set_end(end).set_duration(duration)
txt_clip = txt_clip.with_start(start).with_end(end).with_duration(duration)
final = CompositeVideoClip([video, txt_clip])
@@ -139,13 +135,15 @@ class VideoTextOverlayBlock(Block):
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
# Validate time range if both are provided
if (input_data.start_time is not None and
input_data.end_time is not None and
input_data.end_time <= input_data.start_time):
if (
input_data.start_time is not None
and input_data.end_time is not None
and input_data.end_time <= input_data.start_time
):
raise BlockExecutionError(
message=f"end_time ({input_data.end_time}) must be greater than start_time ({input_data.start_time})",
block_name=self.name,
block_id=str(self.id)
block_id=str(self.id),
)
try:
@@ -167,5 +165,5 @@ class VideoTextOverlayBlock(Block):
raise BlockExecutionError(
message=f"Failed to add text overlay: {e}",
block_name=self.name,
block_id=str(self.id)
block_id=str(self.id),
) from e

View File

@@ -36,12 +36,14 @@ from backend.blocks.replicate.replicate_block import ReplicateModelBlock
from backend.blocks.smart_decision_maker import SmartDecisionMakerBlock
from backend.blocks.talking_head import CreateTalkingAvatarVideoBlock
from backend.blocks.text_to_speech_block import UnrealTextToSpeechBlock
from backend.blocks.video.narration import VideoNarrationBlock
from backend.data.block import Block, BlockCost, BlockCostType
from backend.integrations.credentials_store import (
aiml_api_credentials,
anthropic_credentials,
apollo_credentials,
did_credentials,
elevenlabs_credentials,
enrichlayer_credentials,
groq_credentials,
ideogram_credentials,
@@ -640,4 +642,16 @@ BLOCK_COSTS: dict[Type[Block], list[BlockCost]] = {
},
),
],
VideoNarrationBlock: [
BlockCost(
cost_amount=5, # ElevenLabs TTS cost
cost_filter={
"credentials": {
"id": elevenlabs_credentials.id,
"provider": elevenlabs_credentials.provider,
"type": elevenlabs_credentials.type,
}
},
)
],
}

View File

@@ -224,6 +224,14 @@ openweathermap_credentials = APIKeyCredentials(
expires_at=None,
)
elevenlabs_credentials = APIKeyCredentials(
id="f4a8b6c2-3d1e-4f5a-9b8c-7d6e5f4a3b2c",
provider="elevenlabs",
api_key=SecretStr(settings.secrets.elevenlabs_api_key),
title="Use Credits for ElevenLabs",
expires_at=None,
)
DEFAULT_CREDENTIALS = [
ollama_credentials,
revid_credentials,
@@ -252,6 +260,7 @@ DEFAULT_CREDENTIALS = [
v0_credentials,
webshare_proxy_credentials,
openweathermap_credentials,
elevenlabs_credentials,
]
SYSTEM_CREDENTIAL_IDS = {cred.id for cred in DEFAULT_CREDENTIALS}
@@ -366,6 +375,8 @@ class IntegrationCredentialsStore:
all_credentials.append(webshare_proxy_credentials)
if settings.secrets.openweathermap_api_key:
all_credentials.append(openweathermap_credentials)
if settings.secrets.elevenlabs_api_key:
all_credentials.append(elevenlabs_credentials)
return all_credentials
async def get_creds_by_id(

View File

@@ -18,6 +18,7 @@ class ProviderName(str, Enum):
DISCORD = "discord"
D_ID = "d_id"
E2B = "e2b"
ELEVENLABS = "elevenlabs"
FAL = "fal"
GITHUB = "github"
GOOGLE = "google"

View File

@@ -630,6 +630,7 @@ class Secrets(UpdateTrackingModel["Secrets"], BaseSettings):
e2b_api_key: str = Field(default="", description="E2B API key")
nvidia_api_key: str = Field(default="", description="Nvidia API key")
mem0_api_key: str = Field(default="", description="Mem0 API key")
elevenlabs_api_key: str = Field(default="", description="ElevenLabs API key")
linear_client_id: str = Field(default="", description="Linear client ID")
linear_client_secret: str = Field(default="", description="Linear client secret")

View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.3.0 and should not be changed by hand.
[[package]]
name = "aio-pika"
@@ -338,7 +338,7 @@ description = "LTS Port of Python audioop"
optional = false
python-versions = ">=3.13"
groups = ["main"]
markers = "python_version >= \"3.13\""
markers = "python_version == \"3.13\""
files = [
{file = "audioop_lts-0.2.2-cp313-abi3-macosx_10_13_universal2.whl", hash = "sha256:fd3d4602dc64914d462924a08c1a9816435a2155d74f325853c1f1ac3b2d9800"},
{file = "audioop_lts-0.2.2-cp313-abi3-macosx_10_13_x86_64.whl", hash = "sha256:550c114a8df0aafe9a05442a1162dfc8fec37e9af1d625ae6060fed6e756f303"},
@@ -438,7 +438,7 @@ description = "Backport of asyncio.Runner, a context manager that controls event
optional = false
python-versions = "<3.11,>=3.8"
groups = ["main"]
markers = "python_version < \"3.11\""
markers = "python_version == \"3.10\""
files = [
{file = "backports_asyncio_runner-1.2.0-py3-none-any.whl", hash = "sha256:0da0a936a8aeb554eccb426dc55af3ba63bcdc69fa1a600b5bb305413a4477b5"},
{file = "backports_asyncio_runner-1.2.0.tar.gz", hash = "sha256:a5aa7b2b7d8f8bfcaa2b57313f70792df84e32a2a746f585213373f900b42162"},
@@ -451,7 +451,7 @@ description = "Backport of CPython tarfile module"
optional = false
python-versions = ">=3.8"
groups = ["main"]
markers = "python_version <= \"3.11\""
markers = "python_version < \"3.12\""
files = [
{file = "backports.tarfile-1.2.0-py3-none-any.whl", hash = "sha256:77e284d754527b01fb1e6fa8a1afe577858ebe4e9dad8919e34c862cb399bc34"},
{file = "backports_tarfile-1.2.0.tar.gz", hash = "sha256:d75e02c268746e1b8144c278978b6e98e85de6ad16f8e4b0844a154557eca991"},
@@ -567,7 +567,7 @@ pyproject_hooks = "*"
tomli = {version = ">=1.1.0", markers = "python_version < \"3.11\""}
[package.extras]
docs = ["furo (>=2023.08.17)", "sphinx (>=7.0,<8.0)", "sphinx-argparse-cli (>=1.5)", "sphinx-autodoc-typehints (>=1.10)", "sphinx-issues (>=3.0.0)"]
docs = ["furo (>=2023.8.17)", "sphinx (>=7.0,<8.0)", "sphinx-argparse-cli (>=1.5)", "sphinx-autodoc-typehints (>=1.10)", "sphinx-issues (>=3.0.0)"]
test = ["build[uv,virtualenv]", "filelock (>=3)", "pytest (>=6.2.4)", "pytest-cov (>=2.12)", "pytest-mock (>=2)", "pytest-rerunfailures (>=9.1)", "pytest-xdist (>=1.34)", "setuptools (>=42.0.0) ; python_version < \"3.10\"", "setuptools (>=56.0.0) ; python_version == \"3.10\"", "setuptools (>=56.0.0) ; python_version == \"3.11\"", "setuptools (>=67.8.0) ; python_version >= \"3.12\"", "wheel (>=0.36.0)"]
typing = ["build[uv]", "importlib-metadata (>=5.1)", "mypy (>=1.9.0,<1.10.0)", "tomli", "typing-extensions (>=3.7.4.3)"]
uv = ["uv (>=0.1.18)"]
@@ -1169,6 +1169,29 @@ attrs = ">=21.3.0"
e2b = ">=1.5.4,<2.0.0"
httpx = ">=0.20.0,<1.0.0"
[[package]]
name = "elevenlabs"
version = "1.59.0"
description = ""
optional = false
python-versions = "<4.0,>=3.8"
groups = ["main"]
files = [
{file = "elevenlabs-1.59.0-py3-none-any.whl", hash = "sha256:468145db81a0bc867708b4a8619699f75583e9481b395ec1339d0b443da771ed"},
{file = "elevenlabs-1.59.0.tar.gz", hash = "sha256:16e735bd594e86d415dd445d249c8cc28b09996cfd627fbc10102c0a84698859"},
]
[package.dependencies]
httpx = ">=0.21.2"
pydantic = ">=1.9.2"
pydantic-core = ">=2.18.2,<3.0.0"
requests = ">=2.20"
typing_extensions = ">=4.0.0"
websockets = ">=11.0"
[package.extras]
pyaudio = ["pyaudio (>=0.2.14)"]
[[package]]
name = "email-validator"
version = "2.2.0"
@@ -1215,7 +1238,7 @@ files = [
{file = "exceptiongroup-1.3.0-py3-none-any.whl", hash = "sha256:4d111e6e0c13d0644cad6ddaa7ed0261a0b36971f6d23e7ec9b4b9097da78a10"},
{file = "exceptiongroup-1.3.0.tar.gz", hash = "sha256:b241f5885f560bc56a59ee63ca4c6a8bfa46ae4ad651af316d4e81817bb9fd88"},
]
markers = {dev = "python_version < \"3.11\""}
markers = {dev = "python_version == \"3.10\""}
[package.dependencies]
typing-extensions = {version = ">=4.6.0", markers = "python_version < \"3.13\""}
@@ -1582,16 +1605,16 @@ files = [
google-auth = ">=2.14.1,<3.0.0"
googleapis-common-protos = ">=1.56.2,<2.0.0"
grpcio = [
{version = ">=1.33.2,<2.0.0", optional = true, markers = "extra == \"grpc\""},
{version = ">=1.49.1,<2.0.0", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""},
{version = ">=1.33.2,<2.0.0", optional = true, markers = "extra == \"grpc\""},
]
grpcio-status = [
{version = ">=1.33.2,<2.0.0", optional = true, markers = "extra == \"grpc\""},
{version = ">=1.49.1,<2.0.0", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""},
{version = ">=1.33.2,<2.0.0", optional = true, markers = "extra == \"grpc\""},
]
proto-plus = [
{version = ">=1.22.3,<2.0.0"},
{version = ">=1.25.0,<2.0.0", markers = "python_version >= \"3.13\""},
{version = ">=1.22.3,<2.0.0"},
]
protobuf = ">=3.19.5,<3.20.0 || >3.20.0,<3.20.1 || >3.20.1,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<7.0.0"
requests = ">=2.18.0,<3.0.0"
@@ -1699,8 +1722,8 @@ files = [
google-api-core = {version = ">=1.34.1,<2.0.dev0 || >=2.11.dev0,<3.0.0", extras = ["grpc"]}
google-auth = ">=2.14.1,<2.24.0 || >2.24.0,<2.25.0 || >2.25.0,<3.0.0"
proto-plus = [
{version = ">=1.22.3,<2.0.0"},
{version = ">=1.25.0,<2.0.0", markers = "python_version >= \"3.13\""},
{version = ">=1.22.3,<2.0.0"},
]
protobuf = ">=3.20.2,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<7.0.0"
@@ -1733,11 +1756,11 @@ files = [
]
[package.dependencies]
google-api-core = ">=1.31.6,<2.0.dev0 || >2.3.0,<3.0.0dev"
google-auth = ">=1.25.0,<3.0dev"
google-api-core = ">=1.31.6,<2.0.dev0 || >2.3.0,<3.0.0.dev0"
google-auth = ">=1.25.0,<3.0.dev0"
[package.extras]
grpc = ["grpcio (>=1.38.0,<2.0dev)", "grpcio-status (>=1.38.0,<2.0.dev0)"]
grpc = ["grpcio (>=1.38.0,<2.0.dev0)", "grpcio-status (>=1.38.0,<2.0.dev0)"]
[[package]]
name = "google-cloud-logging"
@@ -1760,9 +1783,9 @@ google-cloud-core = ">=2.0.0,<3.0.0"
grpc-google-iam-v1 = ">=0.12.4,<1.0.0"
opentelemetry-api = ">=1.9.0"
proto-plus = [
{version = ">=1.22.0,<2.0.0"},
{version = ">=1.22.2,<2.0.0", markers = "python_version >= \"3.11\""},
{version = ">=1.25.0,<2.0.0", markers = "python_version >= \"3.13\""},
{version = ">=1.22.2,<2.0.0", markers = "python_version >= \"3.11\" and python_version < \"3.13\""},
{version = ">=1.22.0,<2.0.0", markers = "python_version < \"3.11\""},
]
protobuf = ">=3.20.2,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<7.0.0"
@@ -1850,11 +1873,11 @@ files = [
]
[package.dependencies]
google-crc32c = ">=1.0,<2.0dev"
google-crc32c = ">=1.0,<2.0.dev0"
[package.extras]
aiohttp = ["aiohttp (>=3.6.2,<4.0.0dev)", "google-auth (>=1.22.0,<2.0dev)"]
requests = ["requests (>=2.18.0,<3.0.0dev)"]
aiohttp = ["aiohttp (>=3.6.2,<4.0.0.dev0)", "google-auth (>=1.22.0,<2.0.dev0)"]
requests = ["requests (>=2.18.0,<3.0.0.dev0)"]
[[package]]
name = "googleapis-common-protos"
@@ -2126,7 +2149,7 @@ files = [
[package.dependencies]
googleapis-common-protos = ">=1.5.5"
grpcio = ">=1.71.2"
protobuf = ">=5.26.1,<6.0dev"
protobuf = ">=5.26.1,<6.0.dev0"
[[package]]
name = "h11"
@@ -2724,7 +2747,7 @@ files = [
[package.dependencies]
attrs = ">=22.2.0"
jsonschema-specifications = ">=2023.03.6"
jsonschema-specifications = ">=2023.3.6"
referencing = ">=0.28.4"
rpds-py = ">=0.7.1"
@@ -3294,7 +3317,7 @@ description = "Fundamental package for array computing in Python"
optional = false
python-versions = ">=3.10"
groups = ["main"]
markers = "python_version < \"3.11\""
markers = "python_version == \"3.10\""
files = [
{file = "numpy-2.2.6-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:b412caa66f72040e6d268491a59f2c43bf03eb6c96dd8f0307829feb7fa2b6fb"},
{file = "numpy-2.2.6-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:8e41fd67c52b86603a91c1a505ebaef50b3314de0213461c7a6e99c9a3beff90"},
@@ -3751,9 +3774,9 @@ files = [
[package.dependencies]
numpy = [
{version = ">=1.22.4", markers = "python_version < \"3.11\""},
{version = ">=1.23.2", markers = "python_version == \"3.11\""},
{version = ">=1.26.0", markers = "python_version >= \"3.12\""},
{version = ">=1.23.2", markers = "python_version == \"3.11\""},
{version = ">=1.22.4", markers = "python_version < \"3.11\""},
]
python-dateutil = ">=2.8.2"
pytz = ">=2020.1"
@@ -3990,8 +4013,8 @@ pinecone-plugin-interface = ">=0.0.7,<0.0.8"
python-dateutil = ">=2.5.3"
typing-extensions = ">=3.7.4"
urllib3 = [
{version = ">=1.26.0", markers = "python_version >= \"3.8\" and python_version < \"3.12\""},
{version = ">=1.26.5", markers = "python_version >= \"3.12\" and python_version < \"4.0\""},
{version = ">=1.26.0", markers = "python_version >= \"3.8\" and python_version < \"3.12\""},
]
[package.extras]
@@ -5326,8 +5349,8 @@ files = [
grpcio = ">=1.41.0"
httpx = {version = ">=0.20.0", extras = ["http2"]}
numpy = [
{version = ">=1.21", markers = "python_version >= \"3.10\" and python_version < \"3.12\""},
{version = ">=2.1.0", markers = "python_version >= \"3.13\""},
{version = ">=1.21", markers = "python_version >= \"3.10\" and python_version < \"3.12\""},
{version = ">=1.26", markers = "python_version == \"3.12\""},
]
portalocker = ">=2.7.0,<3.0.0"
@@ -6466,7 +6489,7 @@ description = "A lil' TOML parser"
optional = false
python-versions = ">=3.8"
groups = ["main", "dev"]
markers = "python_version < \"3.11\""
markers = "python_version == \"3.10\""
files = [
{file = "tomli-2.2.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:678e4fa69e4575eb77d103de3df8a895e1591b48e740211bd1067378c69e8249"},
{file = "tomli-2.2.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:023aa114dd824ade0100497eb2318602af309e5a55595f76b626d6d9f3b7b0a6"},
@@ -7361,6 +7384,28 @@ files = [
defusedxml = ">=0.7.1,<0.8.0"
requests = "*"
[[package]]
name = "yt-dlp"
version = "2024.12.23"
description = "A feature-rich command-line audio/video downloader"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "yt_dlp-2024.12.23-py3-none-any.whl", hash = "sha256:2fc08a5221a0379628ac4e7324c6c69a95b9fdfa7a7ca3187444b3b7451e38be"},
{file = "yt_dlp-2024.12.23.tar.gz", hash = "sha256:ac0e72b5a9017ba104b4258546201a7cedc38e8bd20727e0c63b77c829b425e9"},
]
[package.extras]
build = ["build", "hatchling", "pip", "setuptools (>=71.0.2)", "wheel"]
curl-cffi = ["curl-cffi (==0.5.10) ; os_name == \"nt\" and implementation_name == \"cpython\"", "curl-cffi (>=0.5.10,!=0.6.*,<0.7.2) ; os_name != \"nt\" and implementation_name == \"cpython\""]
default = ["brotli ; implementation_name == \"cpython\"", "brotlicffi ; implementation_name != \"cpython\"", "certifi", "mutagen", "pycryptodomex", "requests (>=2.32.2,<3)", "urllib3 (>=1.26.17,<3)", "websockets (>=13.0)"]
dev = ["autopep8 (>=2.0,<3.0)", "pre-commit", "pytest (>=8.1,<9.0)", "pytest-rerunfailures (>=14.0,<15.0)", "ruff (>=0.8.0,<0.9.0)"]
pyinstaller = ["pyinstaller (>=6.11.1)"]
secretstorage = ["cffi", "secretstorage"]
static-analysis = ["autopep8 (>=2.0,<3.0)", "ruff (>=0.8.0,<0.9.0)"]
test = ["pytest (>=8.1,<9.0)", "pytest-rerunfailures (>=14.0,<15.0)"]
[[package]]
name = "zerobouncesdk"
version = "1.1.2"
@@ -7512,4 +7557,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<3.14"
content-hash = "18b92e09596298c82432e4d0a85cb6d80a40b4229bee0a0c15f0529fd6cb21a4"
content-hash = "ee24b0e885ea951eecbda5e76314d711ed5ae02f63c69fd79c11ad2e3fe5fb0f"

View File

@@ -20,6 +20,7 @@ click = "^8.2.0"
cryptography = "^45.0"
discord-py = "^2.5.2"
e2b-code-interpreter = "^1.5.2"
elevenlabs = "^1.50.0"
fastapi = "^0.116.1"
feedparser = "^6.0.11"
flake8 = "^7.3.0"
@@ -71,6 +72,7 @@ tweepy = "^4.16.0"
uvicorn = { extras = ["standard"], version = "^0.35.0" }
websockets = "^15.0"
youtube-transcript-api = "^1.2.1"
yt-dlp = "^2024.12.13"
zerobouncesdk = "^1.1.2"
# NOTE: please insert new dependencies in their alphabetical location
pytest-snapshot = "^0.9.0"

View File

@@ -26,6 +26,7 @@ export const providerIcons: Partial<
nvidia: fallbackIcon,
discord: FaDiscord,
d_id: fallbackIcon,
elevenlabs: fallbackIcon,
google_maps: FaGoogle,
jina: fallbackIcon,
ideogram: fallbackIcon,

View File

@@ -1,28 +0,0 @@
"""Video editing blocks for AutoGPT Platform.
This module provides blocks for:
- Downloading videos from URLs (YouTube, Vimeo, news sites, direct links)
- Clipping/trimming video segments
- Concatenating multiple videos
- Adding text overlays
- Adding AI-generated narration
Dependencies:
- yt-dlp: For video downloading
- moviepy: For video editing operations
- requests: For API calls (narration block)
"""
from .download import VideoDownloadBlock
from .clip import VideoClipBlock
from .concat import VideoConcatBlock
from .text_overlay import VideoTextOverlayBlock
from .narration import VideoNarrationBlock
__all__ = [
"VideoClipBlock",
"VideoConcatBlock",
"VideoDownloadBlock",
"VideoNarrationBlock",
"VideoTextOverlayBlock",
]

View File

@@ -1,93 +0,0 @@
"""
VideoClipBlock - Extract a segment from a video file
"""
import uuid
from backend.data.block import Block, BlockCategory, BlockOutput
from backend.data.block import BlockSchemaInput, BlockSchemaOutput
from backend.data.model import SchemaField
from backend.util.exceptions import BlockExecutionError
class VideoClipBlock(Block):
"""Extract a time segment from a video."""
class Input(BlockSchemaInput):
video_in: str = SchemaField(
description="Input video (URL, data URI, or file path)",
json_schema_extra={"format": "file"}
)
start_time: float = SchemaField(
description="Start time in seconds",
ge=0.0
)
end_time: float = SchemaField(
description="End time in seconds",
ge=0.0
)
output_format: str = SchemaField(
description="Output format",
default="mp4",
advanced=True
)
class Output(BlockSchemaOutput):
video_out: str = SchemaField(
description="Clipped video file",
json_schema_extra={"format": "file"}
)
duration: float = SchemaField(description="Clip duration in seconds")
def __init__(self):
super().__init__(
id="b2c3d4e5-f6a7-8901-bcde-f23456789012",
description="Extract a time segment from a video",
categories={BlockCategory.MULTIMEDIA},
input_schema=self.Input,
output_schema=self.Output,
test_input={"video_in": "/tmp/test.mp4", "start_time": 0.0, "end_time": 10.0},
test_output=[("video_out", str), ("duration", float)],
test_mock={"_clip_video": lambda *args: ("/tmp/clip.mp4", 10.0)}
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
# Validate time range
if input_data.end_time <= input_data.start_time:
raise BlockExecutionError(
message=f"end_time ({input_data.end_time}) must be greater than start_time ({input_data.start_time})",
block_name=self.name,
block_id=str(self.id)
)
try:
from moviepy.video.io.VideoFileClip import VideoFileClip
except ImportError as e:
raise BlockExecutionError(
message="moviepy is not installed. Please install it with: pip install moviepy",
block_name=self.name,
block_id=str(self.id)
) from e
clip = None
subclip = None
try:
clip = VideoFileClip(input_data.video_in)
subclip = clip.subclip(input_data.start_time, input_data.end_time)
output_path = f"/tmp/clip_{uuid.uuid4()}.{input_data.output_format}"
subclip.write_videofile(output_path, logger=None)
yield "video_out", output_path
yield "duration", subclip.duration
except Exception as e:
raise BlockExecutionError(
message=f"Failed to clip video: {e}",
block_name=self.name,
block_id=str(self.id)
) from e
finally:
if subclip:
subclip.close()
if clip:
clip.close()

View File

@@ -1,123 +0,0 @@
"""
VideoConcatBlock - Concatenate multiple video clips into one
"""
import uuid
from backend.data.block import Block, BlockCategory, BlockOutput
from backend.data.block import BlockSchemaInput, BlockSchemaOutput
from backend.data.model import SchemaField
from backend.util.exceptions import BlockExecutionError
class VideoConcatBlock(Block):
"""Merge multiple video clips into one continuous video."""
class Input(BlockSchemaInput):
videos: list[str] = SchemaField(
description="List of video files to concatenate (in order)"
)
transition: str = SchemaField(
description="Transition between clips",
default="none",
enum=["none", "crossfade", "fade_black"]
)
transition_duration: float = SchemaField(
description="Transition duration in seconds",
default=0.5,
advanced=True
)
output_format: str = SchemaField(
description="Output format",
default="mp4",
advanced=True
)
class Output(BlockSchemaOutput):
video_out: str = SchemaField(
description="Concatenated video file",
json_schema_extra={"format": "file"}
)
total_duration: float = SchemaField(description="Total duration in seconds")
def __init__(self):
super().__init__(
id="c3d4e5f6-a7b8-9012-cdef-345678901234",
description="Merge multiple video clips into one continuous video",
categories={BlockCategory.MULTIMEDIA},
input_schema=self.Input,
output_schema=self.Output,
test_input={"videos": ["/tmp/a.mp4", "/tmp/b.mp4"]},
test_output=[("video_out", str), ("total_duration", float)],
test_mock={"_concat_videos": lambda *args: ("/tmp/concat.mp4", 20.0)}
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
try:
from moviepy.editor import VideoFileClip, concatenate_videoclips
except ImportError as e:
raise BlockExecutionError(
message="moviepy is not installed. Please install it with: pip install moviepy",
block_name=self.name,
block_id=str(self.id)
) from e
# Validate minimum clips
if len(input_data.videos) < 2:
raise BlockExecutionError(
message="At least 2 videos are required for concatenation",
block_name=self.name,
block_id=str(self.id)
)
clips = []
faded_clips = []
final = None
try:
# Load clips one by one to handle partial failures
for v in input_data.videos:
clips.append(VideoFileClip(v))
if input_data.transition == "crossfade":
# Apply crossfade between clips using crossfadein/crossfadeout
transition_dur = input_data.transition_duration
for i, clip in enumerate(clips):
if i > 0:
clip = clip.crossfadein(transition_dur)
if i < len(clips) - 1:
clip = clip.crossfadeout(transition_dur)
faded_clips.append(clip)
final = concatenate_videoclips(
faded_clips,
method="compose",
padding=-transition_dur
)
elif input_data.transition == "fade_black":
# Fade to black between clips
for clip in clips:
faded = clip.fadein(input_data.transition_duration).fadeout(
input_data.transition_duration
)
faded_clips.append(faded)
final = concatenate_videoclips(faded_clips)
else:
final = concatenate_videoclips(clips)
output_path = f"/tmp/concat_{uuid.uuid4()}.{input_data.output_format}"
final.write_videofile(output_path, logger=None)
yield "video_out", output_path
yield "total_duration", final.duration
except Exception as e:
raise BlockExecutionError(
message=f"Failed to concatenate videos: {e}",
block_name=self.name,
block_id=str(self.id)
) from e
finally:
if final:
final.close()
for clip in faded_clips:
clip.close()
for clip in clips:
clip.close()

View File

@@ -1,102 +0,0 @@
"""
VideoDownloadBlock - Download video from URL (YouTube, Vimeo, news sites, direct links)
"""
import uuid
from typing import Literal
from backend.data.block import Block, BlockCategory, BlockOutput
from backend.data.block import BlockSchemaInput, BlockSchemaOutput
from backend.data.model import SchemaField
from backend.util.exceptions import BlockExecutionError
class VideoDownloadBlock(Block):
"""Download video from URL using yt-dlp."""
class Input(BlockSchemaInput):
url: str = SchemaField(
description="URL of the video to download (YouTube, Vimeo, direct link, etc.)",
placeholder="https://www.youtube.com/watch?v=..."
)
quality: Literal["best", "1080p", "720p", "480p", "audio_only"] = SchemaField(
description="Video quality preference",
default="720p"
)
output_format: Literal["mp4", "webm", "mkv"] = SchemaField(
description="Output video format",
default="mp4",
advanced=True
)
class Output(BlockSchemaOutput):
video_file: str = SchemaField(
description="Path or data URI of downloaded video",
json_schema_extra={"format": "file"}
)
duration: float = SchemaField(description="Video duration in seconds")
title: str = SchemaField(description="Video title from source")
source_url: str = SchemaField(description="Original source URL")
def __init__(self):
super().__init__(
id="a1b2c3d4-e5f6-7890-abcd-ef1234567890",
description="Download video from URL (YouTube, Vimeo, news sites, direct links)",
categories={BlockCategory.MULTIMEDIA},
input_schema=self.Input,
output_schema=self.Output,
test_input={"url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ", "quality": "480p"},
test_output=[("video_file", str), ("duration", float), ("title", str), ("source_url", str)],
test_mock={"_download_video": lambda *args: ("/tmp/video.mp4", 212.0, "Test Video")}
)
def _get_format_string(self, quality: str) -> str:
formats = {
"best": "bestvideo+bestaudio/best",
"1080p": "bestvideo[height<=1080]+bestaudio/best[height<=1080]",
"720p": "bestvideo[height<=720]+bestaudio/best[height<=720]",
"480p": "bestvideo[height<=480]+bestaudio/best[height<=480]",
"audio_only": "bestaudio/best"
}
return formats.get(quality, formats["720p"])
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
try:
import yt_dlp
except ImportError as e:
raise BlockExecutionError(
message="yt-dlp is not installed. Please install it with: pip install yt-dlp",
block_name=self.name,
block_id=str(self.id)
) from e
video_id = str(uuid.uuid4())[:8]
output_template = f"/tmp/{video_id}.%(ext)s"
ydl_opts = {
"format": self._get_format_string(input_data.quality),
"outtmpl": output_template,
"merge_output_format": input_data.output_format,
"quiet": True,
"no_warnings": True,
}
try:
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(input_data.url, download=True)
video_path = ydl.prepare_filename(info)
# Handle format conversion in filename
if not video_path.endswith(f".{input_data.output_format}"):
video_path = video_path.rsplit(".", 1)[0] + f".{input_data.output_format}"
yield "video_file", video_path
yield "duration", info.get("duration") or 0.0
yield "title", info.get("title") or "Unknown"
yield "source_url", input_data.url
except Exception as e:
raise BlockExecutionError(
message=f"Failed to download video: {e}",
block_name=self.name,
block_id=str(self.id)
) from e

View File

@@ -1,167 +0,0 @@
"""
VideoNarrationBlock - Generate AI voice narration and add to video
"""
import uuid
from typing import Literal
from backend.data.block import Block, BlockCategory, BlockOutput
from backend.data.block import BlockSchemaInput, BlockSchemaOutput
from backend.data.model import SchemaField, CredentialsMetaInput, APIKeyCredentials
from backend.integrations.providers import ProviderName
from backend.util.exceptions import BlockExecutionError
class VideoNarrationBlock(Block):
"""Generate AI narration and add to video."""
class Input(BlockSchemaInput):
credentials: CredentialsMetaInput[
Literal[ProviderName.ELEVENLABS], Literal["api_key"]
] = SchemaField(
description="ElevenLabs API key for voice synthesis"
)
video_in: str = SchemaField(
description="Input video file",
json_schema_extra={"format": "file"}
)
script: str = SchemaField(
description="Narration script text"
)
voice_id: str = SchemaField(
description="ElevenLabs voice ID",
default="21m00Tcm4TlvDq8ikWAM" # Rachel
)
mix_mode: Literal["replace", "mix", "ducking"] = SchemaField(
description="How to combine with original audio",
default="ducking"
)
narration_volume: float = SchemaField(
description="Narration volume (0.0 to 2.0)",
default=1.0,
ge=0.0,
le=2.0,
advanced=True
)
original_volume: float = SchemaField(
description="Original audio volume when mixing (0.0 to 1.0)",
default=0.3,
ge=0.0,
le=1.0,
advanced=True
)
class Output(BlockSchemaOutput):
video_out: str = SchemaField(
description="Video with narration",
json_schema_extra={"format": "file"}
)
audio_file: str = SchemaField(
description="Generated audio file",
json_schema_extra={"format": "file"}
)
def __init__(self):
super().__init__(
id="e5f6a7b8-c9d0-1234-ef56-789012345678",
description="Generate AI narration and add to video",
categories={BlockCategory.MULTIMEDIA, BlockCategory.AI},
input_schema=self.Input,
output_schema=self.Output,
test_input={
"video_in": "/tmp/test.mp4",
"script": "Hello world",
"credentials": {"provider": "elevenlabs", "id": "test", "type": "api_key"}
},
test_output=[("video_out", str), ("audio_file", str)],
test_mock={"_generate_narration": lambda *args: ("/tmp/narrated.mp4", "/tmp/audio.mp3")}
)
async def run(
self,
input_data: Input,
*,
credentials: APIKeyCredentials,
**kwargs
) -> BlockOutput:
try:
import requests
from moviepy.editor import VideoFileClip, AudioFileClip, CompositeAudioClip
except ImportError as e:
raise BlockExecutionError(
message=f"Missing dependency: {e}. Install moviepy and requests.",
block_name=self.name,
block_id=str(self.id)
) from e
video = None
final = None
narration = None
try:
# Generate narration via ElevenLabs
response = requests.post(
f"https://api.elevenlabs.io/v1/text-to-speech/{input_data.voice_id}",
headers={
"xi-api-key": credentials.api_key.get_secret_value(),
"Content-Type": "application/json"
},
json={
"text": input_data.script,
"model_id": "eleven_monolingual_v1"
},
timeout=120
)
response.raise_for_status()
audio_path = f"/tmp/narration_{uuid.uuid4()}.mp3"
with open(audio_path, "wb") as f:
f.write(response.content)
# Combine with video
video = VideoFileClip(input_data.video_in)
narration = AudioFileClip(audio_path)
narration = narration.volumex(input_data.narration_volume)
if input_data.mix_mode == "replace":
final_audio = narration
elif input_data.mix_mode == "mix":
if video.audio:
original = video.audio.volumex(input_data.original_volume)
final_audio = CompositeAudioClip([original, narration])
else:
final_audio = narration
else: # ducking - lower original volume more when narration plays
if video.audio:
# Apply stronger attenuation for ducking effect
ducking_volume = input_data.original_volume * 0.3
original = video.audio.volumex(ducking_volume)
final_audio = CompositeAudioClip([original, narration])
else:
final_audio = narration
final = video.set_audio(final_audio)
output_path = f"/tmp/narrated_{uuid.uuid4()}.mp4"
final.write_videofile(output_path, logger=None)
yield "video_out", output_path
yield "audio_file", audio_path
except requests.exceptions.RequestException as e:
raise BlockExecutionError(
message=f"ElevenLabs API error: {e}",
block_name=self.name,
block_id=str(self.id)
) from e
except Exception as e:
raise BlockExecutionError(
message=f"Failed to add narration: {e}",
block_name=self.name,
block_id=str(self.id)
) from e
finally:
if narration:
narration.close()
if final:
final.close()
if video:
video.close()

View File

@@ -1,149 +0,0 @@
"""
VideoTextOverlayBlock - Add text overlay to video
"""
import uuid
from typing import Literal
from backend.data.block import Block, BlockCategory, BlockOutput
from backend.data.block import BlockSchemaInput, BlockSchemaOutput
from backend.data.model import SchemaField
from backend.util.exceptions import BlockExecutionError
class VideoTextOverlayBlock(Block):
"""Add text overlay/caption to video."""
class Input(BlockSchemaInput):
video_in: str = SchemaField(
description="Input video file",
json_schema_extra={"format": "file"}
)
text: str = SchemaField(
description="Text to overlay on video"
)
position: Literal[
"top", "center", "bottom",
"top-left", "top-right",
"bottom-left", "bottom-right"
] = SchemaField(
description="Position of text on screen",
default="bottom"
)
start_time: float | None = SchemaField(
description="When to show text (seconds). None = entire video",
default=None,
advanced=True
)
end_time: float | None = SchemaField(
description="When to hide text (seconds). None = until end",
default=None,
advanced=True
)
font_size: int = SchemaField(
description="Font size",
default=48,
ge=12,
le=200,
advanced=True
)
font_color: str = SchemaField(
description="Font color (hex or name)",
default="white",
advanced=True
)
bg_color: str | None = SchemaField(
description="Background color behind text (None for transparent)",
default=None,
advanced=True
)
class Output(BlockSchemaOutput):
video_out: str = SchemaField(
description="Video with text overlay",
json_schema_extra={"format": "file"}
)
def __init__(self):
super().__init__(
id="d4e5f6a7-b8c9-0123-def4-567890123456",
description="Add text overlay/caption to video",
categories={BlockCategory.MULTIMEDIA},
input_schema=self.Input,
output_schema=self.Output,
test_input={"video_in": "/tmp/test.mp4", "text": "Hello World"},
test_output=[("video_out", str)],
test_mock={"_add_text": lambda *args: "/tmp/overlay.mp4"}
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
try:
from moviepy.editor import VideoFileClip, TextClip, CompositeVideoClip
except ImportError as e:
raise BlockExecutionError(
message="moviepy is not installed. Please install it with: pip install moviepy",
block_name=self.name,
block_id=str(self.id)
) from e
# Validate time range if both are provided
if (input_data.start_time is not None and
input_data.end_time is not None and
input_data.end_time <= input_data.start_time):
raise BlockExecutionError(
message=f"end_time ({input_data.end_time}) must be greater than start_time ({input_data.start_time})",
block_name=self.name,
block_id=str(self.id)
)
video = None
final = None
txt_clip = None
try:
video = VideoFileClip(input_data.video_in)
txt_clip = TextClip(
input_data.text,
fontsize=input_data.font_size,
color=input_data.font_color,
bg_color=input_data.bg_color,
)
# Position mapping
pos_map = {
"top": ("center", "top"),
"center": ("center", "center"),
"bottom": ("center", "bottom"),
"top-left": ("left", "top"),
"top-right": ("right", "top"),
"bottom-left": ("left", "bottom"),
"bottom-right": ("right", "bottom"),
}
txt_clip = txt_clip.set_position(pos_map[input_data.position])
# Set timing
start = input_data.start_time or 0
end = input_data.end_time or video.duration
duration = max(0, end - start)
txt_clip = txt_clip.set_start(start).set_end(end).set_duration(duration)
final = CompositeVideoClip([video, txt_clip])
output_path = f"/tmp/overlay_{uuid.uuid4()}.mp4"
final.write_videofile(output_path, logger=None)
yield "video_out", output_path
except Exception as e:
raise BlockExecutionError(
message=f"Failed to add text overlay: {e}",
block_name=self.name,
block_id=str(self.id)
) from e
finally:
if txt_clip:
txt_clip.close()
if final:
final.close()
if video:
video.close()