Compare commits

..

1 Commits

Author SHA1 Message Date
claude[bot]
fda10563e7 feat(blocks): add video transcription and editing blocks on dev
- Add TranscribeVideoBlock and EditVideoByTextBlock to blocks/video/
- Update video/__init__.py with new block exports
- Generate block documentation via generate_block_docs.py
- Fix wait=False bug in Replicate API calls (was returning Prediction
  object instead of actual output)
- Format fixes

Co-authored-by: Nicholas Tindle <ntindle@users.noreply.github.com>
2026-02-09 07:51:52 +00:00
12 changed files with 433 additions and 457 deletions

View File

@@ -13,32 +13,10 @@ from backend.api.features.chat.tools.models import (
NoResultsResponse,
)
from backend.api.features.store.hybrid_search import unified_hybrid_search
from backend.data.block import BlockType, get_block
from backend.data.block import get_block
logger = logging.getLogger(__name__)
_TARGET_RESULTS = 10
# Over-fetch to compensate for post-hoc filtering of graph-only blocks.
# 40 is 2x current removed; speed of query 10 vs 40 is minimial
_OVERFETCH_PAGE_SIZE = 40
# Block types that only work within graphs and cannot run standalone in CoPilot.
COPILOT_EXCLUDED_BLOCK_TYPES = {
BlockType.INPUT, # Graph interface definition - data enters via chat, not graph inputs
BlockType.OUTPUT, # Graph interface definition - data exits via chat, not graph outputs
BlockType.WEBHOOK, # Wait for external events - would hang forever in CoPilot
BlockType.WEBHOOK_MANUAL, # Same as WEBHOOK
BlockType.NOTE, # Visual annotation only - no runtime behavior
BlockType.HUMAN_IN_THE_LOOP, # Pauses for human approval - CoPilot IS human-in-the-loop
BlockType.AGENT, # AgentExecutorBlock requires execution_context - use run_agent tool
}
# Specific block IDs excluded from CoPilot (STANDARD type but still require graph context)
COPILOT_EXCLUDED_BLOCK_IDS = {
# SmartDecisionMakerBlock - dynamically discovers downstream blocks via graph topology
"3b191d9f-356f-482d-8238-ba04b6d18381",
}
class FindBlockTool(BaseTool):
"""Tool for searching available blocks."""
@@ -110,7 +88,7 @@ class FindBlockTool(BaseTool):
query=query,
content_types=[ContentType.BLOCK],
page=1,
page_size=_OVERFETCH_PAGE_SIZE,
page_size=10,
)
if not results:
@@ -130,90 +108,60 @@ class FindBlockTool(BaseTool):
block = get_block(block_id)
# Skip disabled blocks
if not block or block.disabled:
continue
if block and not block.disabled:
# Get input/output schemas
input_schema = {}
output_schema = {}
try:
input_schema = block.input_schema.jsonschema()
except Exception:
pass
try:
output_schema = block.output_schema.jsonschema()
except Exception:
pass
# Skip blocks excluded from CoPilot (graph-only blocks)
if (
block.block_type in COPILOT_EXCLUDED_BLOCK_TYPES
or block.id in COPILOT_EXCLUDED_BLOCK_IDS
):
continue
# Get categories from block instance
categories = []
if hasattr(block, "categories") and block.categories:
categories = [cat.value for cat in block.categories]
# Get input/output schemas
input_schema = {}
output_schema = {}
try:
input_schema = block.input_schema.jsonschema()
except Exception as e:
logger.debug(
"Failed to generate input schema for block %s: %s",
block_id,
e,
)
try:
output_schema = block.output_schema.jsonschema()
except Exception as e:
logger.debug(
"Failed to generate output schema for block %s: %s",
block_id,
e,
)
# Get categories from block instance
categories = []
if hasattr(block, "categories") and block.categories:
categories = [cat.value for cat in block.categories]
# Extract required inputs for easier use
required_inputs: list[BlockInputFieldInfo] = []
if input_schema:
properties = input_schema.get("properties", {})
required_fields = set(input_schema.get("required", []))
# Get credential field names to exclude from required inputs
credentials_fields = set(
block.input_schema.get_credentials_fields().keys()
)
for field_name, field_schema in properties.items():
# Skip credential fields - they're handled separately
if field_name in credentials_fields:
continue
required_inputs.append(
BlockInputFieldInfo(
name=field_name,
type=field_schema.get("type", "string"),
description=field_schema.get("description", ""),
required=field_name in required_fields,
default=field_schema.get("default"),
)
# Extract required inputs for easier use
required_inputs: list[BlockInputFieldInfo] = []
if input_schema:
properties = input_schema.get("properties", {})
required_fields = set(input_schema.get("required", []))
# Get credential field names to exclude from required inputs
credentials_fields = set(
block.input_schema.get_credentials_fields().keys()
)
blocks.append(
BlockInfoSummary(
id=block_id,
name=block.name,
description=block.description or "",
categories=categories,
input_schema=input_schema,
output_schema=output_schema,
required_inputs=required_inputs,
for field_name, field_schema in properties.items():
# Skip credential fields - they're handled separately
if field_name in credentials_fields:
continue
required_inputs.append(
BlockInputFieldInfo(
name=field_name,
type=field_schema.get("type", "string"),
description=field_schema.get("description", ""),
required=field_name in required_fields,
default=field_schema.get("default"),
)
)
blocks.append(
BlockInfoSummary(
id=block_id,
name=block.name,
description=block.description or "",
categories=categories,
input_schema=input_schema,
output_schema=output_schema,
required_inputs=required_inputs,
)
)
)
if len(blocks) >= _TARGET_RESULTS:
break
if blocks and len(blocks) < _TARGET_RESULTS:
logger.debug(
"find_block returned %d/%d results for query '%s' "
"(filtered %d excluded/disabled blocks)",
len(blocks),
_TARGET_RESULTS,
query,
len(results) - len(blocks),
)
if not blocks:
return NoResultsResponse(

View File

@@ -1,139 +0,0 @@
"""Tests for block filtering in FindBlockTool."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from backend.api.features.chat.tools.find_block import (
COPILOT_EXCLUDED_BLOCK_IDS,
COPILOT_EXCLUDED_BLOCK_TYPES,
FindBlockTool,
)
from backend.api.features.chat.tools.models import BlockListResponse
from backend.data.block import BlockType
from ._test_data import make_session
_TEST_USER_ID = "test-user-find-block"
def make_mock_block(
block_id: str, name: str, block_type: BlockType, disabled: bool = False
):
"""Create a mock block for testing."""
mock = MagicMock()
mock.id = block_id
mock.name = name
mock.description = f"{name} description"
mock.block_type = block_type
mock.disabled = disabled
mock.input_schema = MagicMock()
mock.input_schema.jsonschema.return_value = {"properties": {}, "required": []}
mock.input_schema.get_credentials_fields.return_value = {}
mock.output_schema = MagicMock()
mock.output_schema.jsonschema.return_value = {}
mock.categories = []
return mock
class TestFindBlockFiltering:
"""Tests for block filtering in FindBlockTool."""
def test_excluded_block_types_contains_expected_types(self):
"""Verify COPILOT_EXCLUDED_BLOCK_TYPES contains all graph-only types."""
assert BlockType.INPUT in COPILOT_EXCLUDED_BLOCK_TYPES
assert BlockType.OUTPUT in COPILOT_EXCLUDED_BLOCK_TYPES
assert BlockType.WEBHOOK in COPILOT_EXCLUDED_BLOCK_TYPES
assert BlockType.WEBHOOK_MANUAL in COPILOT_EXCLUDED_BLOCK_TYPES
assert BlockType.NOTE in COPILOT_EXCLUDED_BLOCK_TYPES
assert BlockType.HUMAN_IN_THE_LOOP in COPILOT_EXCLUDED_BLOCK_TYPES
assert BlockType.AGENT in COPILOT_EXCLUDED_BLOCK_TYPES
def test_excluded_block_ids_contains_smart_decision_maker(self):
"""Verify SmartDecisionMakerBlock is in COPILOT_EXCLUDED_BLOCK_IDS."""
assert "3b191d9f-356f-482d-8238-ba04b6d18381" in COPILOT_EXCLUDED_BLOCK_IDS
@pytest.mark.asyncio(loop_scope="session")
async def test_excluded_block_type_filtered_from_results(self):
"""Verify blocks with excluded BlockTypes are filtered from search results."""
session = make_session(user_id=_TEST_USER_ID)
# Mock search returns an INPUT block (excluded) and a STANDARD block (included)
search_results = [
{"content_id": "input-block-id", "score": 0.9},
{"content_id": "standard-block-id", "score": 0.8},
]
input_block = make_mock_block("input-block-id", "Input Block", BlockType.INPUT)
standard_block = make_mock_block(
"standard-block-id", "HTTP Request", BlockType.STANDARD
)
def mock_get_block(block_id):
return {
"input-block-id": input_block,
"standard-block-id": standard_block,
}.get(block_id)
with patch(
"backend.api.features.chat.tools.find_block.unified_hybrid_search",
new_callable=AsyncMock,
return_value=(search_results, 2),
):
with patch(
"backend.api.features.chat.tools.find_block.get_block",
side_effect=mock_get_block,
):
tool = FindBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID, session=session, query="test"
)
# Should only return the standard block, not the INPUT block
assert isinstance(response, BlockListResponse)
assert len(response.blocks) == 1
assert response.blocks[0].id == "standard-block-id"
@pytest.mark.asyncio(loop_scope="session")
async def test_excluded_block_id_filtered_from_results(self):
"""Verify SmartDecisionMakerBlock is filtered from search results."""
session = make_session(user_id=_TEST_USER_ID)
smart_decision_id = "3b191d9f-356f-482d-8238-ba04b6d18381"
search_results = [
{"content_id": smart_decision_id, "score": 0.9},
{"content_id": "normal-block-id", "score": 0.8},
]
# SmartDecisionMakerBlock has STANDARD type but is excluded by ID
smart_block = make_mock_block(
smart_decision_id, "Smart Decision Maker", BlockType.STANDARD
)
normal_block = make_mock_block(
"normal-block-id", "Normal Block", BlockType.STANDARD
)
def mock_get_block(block_id):
return {
smart_decision_id: smart_block,
"normal-block-id": normal_block,
}.get(block_id)
with patch(
"backend.api.features.chat.tools.find_block.unified_hybrid_search",
new_callable=AsyncMock,
return_value=(search_results, 2),
):
with patch(
"backend.api.features.chat.tools.find_block.get_block",
side_effect=mock_get_block,
):
tool = FindBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID, session=session, query="decision"
)
# Should only return normal block, not SmartDecisionMakerBlock
assert isinstance(response, BlockListResponse)
assert len(response.blocks) == 1
assert response.blocks[0].id == "normal-block-id"

View File

@@ -8,10 +8,6 @@ from typing import Any
from pydantic_core import PydanticUndefined
from backend.api.features.chat.model import ChatSession
from backend.api.features.chat.tools.find_block import (
COPILOT_EXCLUDED_BLOCK_IDS,
COPILOT_EXCLUDED_BLOCK_TYPES,
)
from backend.data.block import get_block
from backend.data.execution import ExecutionContext
from backend.data.model import CredentialsMetaInput
@@ -216,19 +212,6 @@ class RunBlockTool(BaseTool):
session_id=session_id,
)
# Check if block is excluded from CoPilot (graph-only blocks)
if (
block.block_type in COPILOT_EXCLUDED_BLOCK_TYPES
or block.id in COPILOT_EXCLUDED_BLOCK_IDS
):
return ErrorResponse(
message=(
f"Block '{block.name}' cannot be run directly in CoPilot. "
"This block is designed for use within graphs only."
),
session_id=session_id,
)
logger.info(f"Executing block {block.name} ({block_id}) for user {user_id}")
creds_manager = IntegrationCredentialsManager()

View File

@@ -1,106 +0,0 @@
"""Tests for block execution guards in RunBlockTool."""
from unittest.mock import MagicMock, patch
import pytest
from backend.api.features.chat.tools.models import ErrorResponse
from backend.api.features.chat.tools.run_block import RunBlockTool
from backend.data.block import BlockType
from ._test_data import make_session
_TEST_USER_ID = "test-user-run-block"
def make_mock_block(
block_id: str, name: str, block_type: BlockType, disabled: bool = False
):
"""Create a mock block for testing."""
mock = MagicMock()
mock.id = block_id
mock.name = name
mock.block_type = block_type
mock.disabled = disabled
mock.input_schema = MagicMock()
mock.input_schema.jsonschema.return_value = {"properties": {}, "required": []}
mock.input_schema.get_credentials_fields_info.return_value = []
return mock
class TestRunBlockFiltering:
"""Tests for block execution guards in RunBlockTool."""
@pytest.mark.asyncio(loop_scope="session")
async def test_excluded_block_type_returns_error(self):
"""Attempting to execute a block with excluded BlockType returns error."""
session = make_session(user_id=_TEST_USER_ID)
input_block = make_mock_block("input-block-id", "Input Block", BlockType.INPUT)
with patch(
"backend.api.features.chat.tools.run_block.get_block",
return_value=input_block,
):
tool = RunBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="input-block-id",
input_data={},
)
assert isinstance(response, ErrorResponse)
assert "cannot be run directly in CoPilot" in response.message
assert "designed for use within graphs only" in response.message
@pytest.mark.asyncio(loop_scope="session")
async def test_excluded_block_id_returns_error(self):
"""Attempting to execute SmartDecisionMakerBlock returns error."""
session = make_session(user_id=_TEST_USER_ID)
smart_decision_id = "3b191d9f-356f-482d-8238-ba04b6d18381"
smart_block = make_mock_block(
smart_decision_id, "Smart Decision Maker", BlockType.STANDARD
)
with patch(
"backend.api.features.chat.tools.run_block.get_block",
return_value=smart_block,
):
tool = RunBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id=smart_decision_id,
input_data={},
)
assert isinstance(response, ErrorResponse)
assert "cannot be run directly in CoPilot" in response.message
@pytest.mark.asyncio(loop_scope="session")
async def test_non_excluded_block_passes_guard(self):
"""Non-excluded blocks pass the filtering guard (may fail later for other reasons)."""
session = make_session(user_id=_TEST_USER_ID)
standard_block = make_mock_block(
"standard-id", "HTTP Request", BlockType.STANDARD
)
with patch(
"backend.api.features.chat.tools.run_block.get_block",
return_value=standard_block,
):
tool = RunBlockTool()
response = await tool._execute(
user_id=_TEST_USER_ID,
session=session,
block_id="standard-id",
input_data={},
)
# Should NOT be an ErrorResponse about CoPilot exclusion
# (may be other errors like missing credentials, but not the exclusion guard)
if isinstance(response, ErrorResponse):
assert "cannot be run directly in CoPilot" not in response.message

View File

@@ -8,7 +8,6 @@ Includes BM25 reranking for improved lexical relevance.
import logging
import re
import time
from dataclasses import dataclass
from typing import Any, Literal
@@ -363,11 +362,7 @@ async def unified_hybrid_search(
LIMIT {limit_param} OFFSET {offset_param}
"""
try:
results = await query_raw_with_schema(sql_query, *params)
except Exception as e:
await _log_vector_error_diagnostics(e)
raise
results = await query_raw_with_schema(sql_query, *params)
total = results[0]["total_count"] if results else 0
# Apply BM25 reranking
@@ -691,11 +686,7 @@ async def hybrid_search(
LIMIT {limit_param} OFFSET {offset_param}
"""
try:
results = await query_raw_with_schema(sql_query, *params)
except Exception as e:
await _log_vector_error_diagnostics(e)
raise
results = await query_raw_with_schema(sql_query, *params)
total = results[0]["total_count"] if results else 0
@@ -727,87 +718,6 @@ async def hybrid_search_simple(
return await hybrid_search(query=query, page=page, page_size=page_size)
# ============================================================================
# Diagnostics
# ============================================================================
# Rate limit: only log vector error diagnostics once per this interval
_VECTOR_DIAG_INTERVAL_SECONDS = 60
_last_vector_diag_time: float = 0
async def _log_vector_error_diagnostics(error: Exception) -> None:
"""Log diagnostic info when 'type vector does not exist' error occurs.
Note: Diagnostic queries use query_raw_with_schema which may run on a different
pooled connection than the one that failed. Session-level search_path can differ,
so these diagnostics show cluster-wide state, not necessarily the failed session.
Includes rate limiting to avoid log spam - only logs once per minute.
Caller should re-raise the error after calling this function.
"""
global _last_vector_diag_time
# Check if this is the vector type error
error_str = str(error).lower()
if not (
"type" in error_str and "vector" in error_str and "does not exist" in error_str
):
return
# Rate limit: only log once per interval
now = time.time()
if now - _last_vector_diag_time < _VECTOR_DIAG_INTERVAL_SECONDS:
return
_last_vector_diag_time = now
try:
diagnostics: dict[str, object] = {}
try:
search_path_result = await query_raw_with_schema("SHOW search_path")
diagnostics["search_path"] = search_path_result
except Exception as e:
diagnostics["search_path"] = f"Error: {e}"
try:
schema_result = await query_raw_with_schema("SELECT current_schema()")
diagnostics["current_schema"] = schema_result
except Exception as e:
diagnostics["current_schema"] = f"Error: {e}"
try:
user_result = await query_raw_with_schema(
"SELECT current_user, session_user, current_database()"
)
diagnostics["user_info"] = user_result
except Exception as e:
diagnostics["user_info"] = f"Error: {e}"
try:
# Check pgvector extension installation (cluster-wide, stable info)
ext_result = await query_raw_with_schema(
"SELECT extname, extversion, nspname as schema "
"FROM pg_extension e "
"JOIN pg_namespace n ON e.extnamespace = n.oid "
"WHERE extname = 'vector'"
)
diagnostics["pgvector_extension"] = ext_result
except Exception as e:
diagnostics["pgvector_extension"] = f"Error: {e}"
logger.error(
f"Vector type error diagnostics:\n"
f" Error: {error}\n"
f" search_path: {diagnostics.get('search_path')}\n"
f" current_schema: {diagnostics.get('current_schema')}\n"
f" user_info: {diagnostics.get('user_info')}\n"
f" pgvector_extension: {diagnostics.get('pgvector_extension')}"
)
except Exception as diag_error:
logger.error(f"Failed to collect vector error diagnostics: {diag_error}")
# Backward compatibility alias - HybridSearchWeights maps to StoreAgentSearchWeights
# for existing code that expects the popularity parameter
HybridSearchWeights = StoreAgentSearchWeights

View File

@@ -9,11 +9,14 @@ This module provides blocks for:
- Getting media duration
- Looping videos
- Adding audio to videos
- Transcribing video speech to text
- Editing videos by modifying their transcript
Dependencies:
- yt-dlp: For video downloading
- moviepy: For video editing operations
- elevenlabs: For AI narration (optional)
- replicate: For video transcription and text-based editing
"""
from backend.blocks.video.add_audio import AddAudioToVideoBlock
@@ -21,14 +24,18 @@ from backend.blocks.video.clip import VideoClipBlock
from backend.blocks.video.concat import VideoConcatBlock
from backend.blocks.video.download import VideoDownloadBlock
from backend.blocks.video.duration import MediaDurationBlock
from backend.blocks.video.edit_by_text import EditVideoByTextBlock
from backend.blocks.video.loop import LoopVideoBlock
from backend.blocks.video.narration import VideoNarrationBlock
from backend.blocks.video.text_overlay import VideoTextOverlayBlock
from backend.blocks.video.transcribe import TranscribeVideoBlock
__all__ = [
"AddAudioToVideoBlock",
"EditVideoByTextBlock",
"LoopVideoBlock",
"MediaDurationBlock",
"TranscribeVideoBlock",
"VideoClipBlock",
"VideoConcatBlock",
"VideoDownloadBlock",

View File

@@ -0,0 +1,159 @@
"""EditVideoByTextBlock - Edit a video by modifying its transcript via Replicate."""
from __future__ import annotations
import logging
from typing import Literal
from replicate.client import Client as ReplicateClient
from replicate.helpers import FileOutput
from backend.blocks.replicate._auth import (
TEST_CREDENTIALS,
TEST_CREDENTIALS_INPUT,
ReplicateCredentials,
ReplicateCredentialsInput,
)
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.execution import ExecutionContext
from backend.data.model import CredentialsField, SchemaField
from backend.util.exceptions import BlockExecutionError
from backend.util.file import MediaFileType, store_media_file
logger = logging.getLogger(__name__)
class EditVideoByTextBlock(Block):
"""Edit a video by modifying its transcript, cutting segments via Replicate API."""
class Input(BlockSchemaInput):
credentials: ReplicateCredentialsInput = CredentialsField(
description="Replicate API key for video editing.",
)
video_in: MediaFileType = SchemaField(
description="Input video file to edit (URL, data URI, or local path)",
)
transcription: str = SchemaField(
description="Desired transcript for the output video",
)
split_at: Literal["word", "character"] = SchemaField(
description="Granularity for transcript matching",
default="word",
)
class Output(BlockSchemaOutput):
video_url: str = SchemaField(
description="URL of the edited video",
)
transcription: str = SchemaField(
description="Transcription used for editing",
)
def __init__(self):
super().__init__(
id="98d40049-a1de-465f-bba1-47411298ad1a",
description="Edit a video by modifying its transcript",
categories={BlockCategory.MULTIMEDIA},
input_schema=self.Input,
output_schema=self.Output,
test_input={
"credentials": TEST_CREDENTIALS_INPUT,
"video_in": "data:video/mp4;base64,AAAA",
"transcription": "edited transcript",
},
test_output=[
("video_url", "https://replicate.com/output/video.mp4"),
("transcription", "edited transcript"),
],
test_mock={
"_edit_video": lambda *args: "https://replicate.com/output/video.mp4",
"_store_input_video": lambda *args, **kwargs: "data:video/mp4;base64,AAAA",
},
test_credentials=TEST_CREDENTIALS,
)
async def _store_input_video(
self, execution_context: ExecutionContext, file: MediaFileType
) -> MediaFileType:
"""Store input video locally. Extracted for testability."""
return await store_media_file(
file=file,
execution_context=execution_context,
return_format="for_external_api",
)
async def _edit_video(
self, data_uri: str, transcription: str, split_at: str, api_key: str
) -> str:
"""Call Replicate API to edit the video based on the transcript."""
client = ReplicateClient(api_token=api_key)
output = await client.async_run(
"jd7h/edit-video-by-editing-text:e010b880347314d07e3ce3b21cbd4c57add51fea3474677a6cb1316751c4cb90",
input={
"mode": "edit",
"video_in": data_uri,
"transcription": transcription,
"split_at": split_at,
},
)
# Get video URL from output
if isinstance(output, dict) and "video" in output:
video_output = output["video"]
if isinstance(video_output, FileOutput):
return video_output.url
return str(video_output)
if isinstance(output, list) and len(output) > 0:
video_url = output[0]
if isinstance(video_url, FileOutput):
return video_url.url
return str(video_url)
if isinstance(output, FileOutput):
return output.url
if isinstance(output, str):
return output
raise ValueError(f"Unexpected output format from Replicate API: {output}")
async def run(
self,
input_data: Input,
*,
credentials: ReplicateCredentials,
execution_context: ExecutionContext,
**kwargs,
) -> BlockOutput:
try:
# Store video and get data URI for API submission
data_uri = await self._store_input_video(
execution_context, input_data.video_in
)
video_url = await self._edit_video(
data_uri,
input_data.transcription,
input_data.split_at,
credentials.api_key.get_secret_value(),
)
yield "video_url", video_url
yield "transcription", input_data.transcription
except BlockExecutionError:
raise
except Exception as e:
raise BlockExecutionError(
message=f"Failed to edit video: {e}",
block_name=self.name,
block_id=str(self.id),
) from e

View File

@@ -0,0 +1,139 @@
"""TranscribeVideoBlock - Transcribe speech from a video file using Replicate."""
from __future__ import annotations
import logging
from replicate.client import Client as ReplicateClient
from replicate.helpers import FileOutput
from backend.blocks.replicate._auth import (
TEST_CREDENTIALS,
TEST_CREDENTIALS_INPUT,
ReplicateCredentials,
ReplicateCredentialsInput,
)
from backend.data.block import (
Block,
BlockCategory,
BlockOutput,
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.execution import ExecutionContext
from backend.data.model import CredentialsField, SchemaField
from backend.util.exceptions import BlockExecutionError
from backend.util.file import MediaFileType, store_media_file
logger = logging.getLogger(__name__)
class TranscribeVideoBlock(Block):
"""Transcribe speech from a video file to text via Replicate API."""
class Input(BlockSchemaInput):
credentials: ReplicateCredentialsInput = CredentialsField(
description="Replicate API key for video transcription.",
)
video_in: MediaFileType = SchemaField(
description="Input video file to transcribe (URL, data URI, or local path)",
)
class Output(BlockSchemaOutput):
transcription: str = SchemaField(
description="Text transcription extracted from the video",
)
def __init__(self):
super().__init__(
id="fa49dad0-a5fc-441c-ba04-2ac206e392d8",
description="Transcribe speech from a video file to text",
categories={BlockCategory.MULTIMEDIA},
input_schema=self.Input,
output_schema=self.Output,
test_input={
"credentials": TEST_CREDENTIALS_INPUT,
"video_in": "data:video/mp4;base64,AAAA",
},
test_output=[("transcription", "example transcript")],
test_mock={
"_transcribe": lambda *args: "example transcript",
"_store_input_video": lambda *args, **kwargs: "test.mp4",
},
test_credentials=TEST_CREDENTIALS,
)
async def _store_input_video(
self, execution_context: ExecutionContext, file: MediaFileType
) -> MediaFileType:
"""Store input video locally. Extracted for testability."""
return await store_media_file(
file=file,
execution_context=execution_context,
return_format="for_external_api",
)
async def _transcribe(self, data_uri: str, api_key: str) -> str:
"""Call Replicate API to transcribe the video."""
client = ReplicateClient(api_token=api_key)
output = await client.async_run(
"jd7h/edit-video-by-editing-text:e010b880347314d07e3ce3b21cbd4c57add51fea3474677a6cb1316751c4cb90",
input={
"mode": "transcribe",
"video_in": data_uri,
},
)
# Handle dictionary response format
if isinstance(output, dict):
if "transcription" in output:
return str(output["transcription"])
if "error" in output:
raise ValueError(f"API returned error: {output['error']}")
# Handle list formats
if isinstance(output, list) and len(output) > 0:
if isinstance(output[0], FileOutput):
return output[0].url
if isinstance(output[0], dict) and "text" in output[0]:
return " ".join(
segment.get("text", "") for segment in output # type: ignore
)
return str(output[0])
if isinstance(output, FileOutput):
return output.url
if isinstance(output, str):
return output
raise ValueError(f"Unexpected output format from Replicate API: {output}")
async def run(
self,
input_data: Input,
*,
credentials: ReplicateCredentials,
execution_context: ExecutionContext,
**kwargs,
) -> BlockOutput:
try:
# Store video and get data URI for API submission
data_uri = await self._store_input_video(
execution_context, input_data.video_in
)
transcript = await self._transcribe(
data_uri, credentials.api_key.get_secret_value()
)
yield "transcription", transcript
except BlockExecutionError:
raise
except Exception as e:
raise BlockExecutionError(
message=f"Failed to transcribe video: {e}",
block_name=self.name,
block_id=str(self.id),
) from e

View File

@@ -474,8 +474,10 @@ Below is a comprehensive list of all available blocks, categorized by their prim
| Block Name | Description |
|------------|-------------|
| [Add Audio To Video](block-integrations/video/add_audio.md#add-audio-to-video) | Block to attach an audio file to a video file using moviepy |
| [Edit Video By Text](block-integrations/video/edit_by_text.md#edit-video-by-text) | Edit a video by modifying its transcript |
| [Loop Video](block-integrations/video/loop.md#loop-video) | Block to loop a video to a given duration or number of repeats |
| [Media Duration](block-integrations/video/duration.md#media-duration) | Block to get the duration of a media file |
| [Transcribe Video](block-integrations/video/transcribe.md#transcribe-video) | Transcribe speech from a video file to text |
| [Video Clip](block-integrations/video/clip.md#video-clip) | Extract a time segment from a video |
| [Video Concat](block-integrations/video/concat.md#video-concat) | Merge multiple video clips into one continuous video |
| [Video Download](block-integrations/video/download.md#video-download) | Download video from URL (YouTube, Vimeo, news sites, direct links) |

View File

@@ -133,8 +133,10 @@
* [Video Concat](block-integrations/video/concat.md)
* [Video Download](block-integrations/video/download.md)
* [Video Duration](block-integrations/video/duration.md)
* [Video Edit By Text](block-integrations/video/edit_by_text.md)
* [Video Loop](block-integrations/video/loop.md)
* [Video Narration](block-integrations/video/narration.md)
* [Video Text Overlay](block-integrations/video/text_overlay.md)
* [Video Transcribe](block-integrations/video/transcribe.md)
* [Wolfram LLM API](block-integrations/wolfram/llm_api.md)
* [Zerobounce Validate Emails](block-integrations/zerobounce/validate_emails.md)

View File

@@ -0,0 +1,37 @@
# Video Edit By Text
<!-- MANUAL: file_description -->
_Add a description of this category of blocks._
<!-- END MANUAL -->
## Edit Video By Text
### What it is
Edit a video by modifying its transcript
### How it works
<!-- MANUAL: how_it_works -->
_Add technical explanation here._
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| video_in | Input video file to edit (URL, data URI, or local path) | str (file) | Yes |
| transcription | Desired transcript for the output video | str | Yes |
| split_at | Granularity for transcript matching | "word" \| "character" | No |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| video_url | URL of the edited video | str |
| transcription | Transcription used for editing | str |
### Possible use case
<!-- MANUAL: use_case -->
_Add practical use case examples here._
<!-- END MANUAL -->
---

View File

@@ -0,0 +1,34 @@
# Video Transcribe
<!-- MANUAL: file_description -->
_Add a description of this category of blocks._
<!-- END MANUAL -->
## Transcribe Video
### What it is
Transcribe speech from a video file to text
### How it works
<!-- MANUAL: how_it_works -->
_Add technical explanation here._
<!-- END MANUAL -->
### Inputs
| Input | Description | Type | Required |
|-------|-------------|------|----------|
| video_in | Input video file to transcribe (URL, data URI, or local path) | str (file) | Yes |
### Outputs
| Output | Description | Type |
|--------|-------------|------|
| error | Error message if the operation failed | str |
| transcription | Text transcription extracted from the video | str |
### Possible use case
<!-- MANUAL: use_case -->
_Add practical use case examples here._
<!-- END MANUAL -->
---