Merge branch 'dev' into open-2047-add-type-checking-step-to-front-end-ci

This commit is contained in:
Nicholas Tindle
2025-01-28 07:53:27 +00:00
committed by GitHub
19 changed files with 1706 additions and 916 deletions

View File

@@ -25,7 +25,7 @@ jobs:
close-issue-message: >
This issue was closed automatically because it has been stale for 10 days
with no activity.
days-before-stale: 50
days-before-stale: 100
days-before-close: 10
# Do not touch meta issues:
exempt-issue-labels: meta,fridge,project management

View File

@@ -3,6 +3,7 @@ from typing import Any, List
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema, BlockType
from backend.data.model import SchemaField
from backend.util.file import MediaFile, store_media_file
from backend.util.mock import MockObject
from backend.util.text import TextFormatter
from backend.util.type import convert
@@ -10,6 +11,42 @@ from backend.util.type import convert
formatter = TextFormatter()
class FileStoreBlock(Block):
class Input(BlockSchema):
file_in: MediaFile = SchemaField(
description="The file to store in the temporary directory, it can be a URL, data URI, or local path."
)
class Output(BlockSchema):
file_out: MediaFile = SchemaField(
description="The relative path to the stored file in the temporary directory."
)
def __init__(self):
super().__init__(
id="cbb50872-625b-42f0-8203-a2ae78242d8a",
description="Stores the input file in the temporary directory.",
categories={BlockCategory.BASIC, BlockCategory.MULTIMEDIA},
input_schema=FileStoreBlock.Input,
output_schema=FileStoreBlock.Output,
static_output=True,
)
def run(
self,
input_data: Input,
*,
graph_exec_id: str,
**kwargs,
) -> BlockOutput:
file_path = store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.file_in,
return_content=False,
)
yield "file_out", file_path
class StoreValueBlock(Block):
"""
This block allows you to provide a constant value as a block, in a stateless manner.

View File

@@ -151,7 +151,7 @@ class IdeogramModelBlock(Block):
super().__init__(
id="6ab085e2-20b3-4055-bc3e-08036e01eca6",
description="This block runs Ideogram models with both simple and advanced settings.",
categories={BlockCategory.AI},
categories={BlockCategory.AI, BlockCategory.MULTIMEDIA},
input_schema=IdeogramModelBlock.Input,
output_schema=IdeogramModelBlock.Output,
test_input={

View File

@@ -1,5 +1,6 @@
from backend.blocks.linear._api import LinearAPIException, LinearClient
from backend.blocks.linear._auth import (
LINEAR_OAUTH_IS_CONFIGURED,
TEST_CREDENTIALS_INPUT_OAUTH,
TEST_CREDENTIALS_OAUTH,
LinearCredentials,
@@ -41,6 +42,7 @@ class LinearCreateCommentBlock(Block):
"comment": "Test comment",
"credentials": TEST_CREDENTIALS_INPUT_OAUTH,
},
disabled=not LINEAR_OAUTH_IS_CONFIGURED,
test_credentials=TEST_CREDENTIALS_OAUTH,
test_output=[("comment_id", "abc123"), ("comment_body", "Test comment")],
test_mock={

View File

@@ -1,5 +1,6 @@
from backend.blocks.linear._api import LinearAPIException, LinearClient
from backend.blocks.linear._auth import (
LINEAR_OAUTH_IS_CONFIGURED,
TEST_CREDENTIALS_INPUT_OAUTH,
TEST_CREDENTIALS_OAUTH,
LinearCredentials,
@@ -44,6 +45,7 @@ class LinearCreateIssueBlock(Block):
super().__init__(
id="f9c68f55-dcca-40a8-8771-abf9601680aa",
description="Creates a new issue on Linear",
disabled=not LINEAR_OAUTH_IS_CONFIGURED,
input_schema=self.Input,
output_schema=self.Output,
categories={BlockCategory.PRODUCTIVITY, BlockCategory.ISSUE_TRACKING},
@@ -132,6 +134,7 @@ class LinearSearchIssuesBlock(Block):
description="Searches for issues on Linear",
input_schema=self.Input,
output_schema=self.Output,
disabled=not LINEAR_OAUTH_IS_CONFIGURED,
test_input={
"term": "Test issue",
"credentials": TEST_CREDENTIALS_INPUT_OAUTH,

View File

@@ -1,5 +1,6 @@
from backend.blocks.linear._api import LinearAPIException, LinearClient
from backend.blocks.linear._auth import (
LINEAR_OAUTH_IS_CONFIGURED,
TEST_CREDENTIALS_INPUT_OAUTH,
TEST_CREDENTIALS_OAUTH,
LinearCredentials,
@@ -36,6 +37,7 @@ class LinearSearchProjectsBlock(Block):
"term": "Test project",
"credentials": TEST_CREDENTIALS_INPUT_OAUTH,
},
disabled=not LINEAR_OAUTH_IS_CONFIGURED,
test_credentials=TEST_CREDENTIALS_OAUTH,
test_output=[
(

View File

@@ -0,0 +1,245 @@
import os
import tempfile
from typing import Literal, Optional
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.video.fx.Loop import Loop
from moviepy.video.io.VideoFileClip import VideoFileClip
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField
from backend.util.file import MediaFile, get_exec_file_path, store_media_file
class MediaDurationBlock(Block):
class Input(BlockSchema):
media_in: MediaFile = SchemaField(
description="Media input (URL, data URI, or local path)."
)
is_video: bool = SchemaField(
description="Whether the media is a video (True) or audio (False).",
default=True,
)
class Output(BlockSchema):
duration: float = SchemaField(
description="Duration of the media file (in seconds)."
)
error: str = SchemaField(
description="Error message if something fails.", default=""
)
def __init__(self):
super().__init__(
id="d8b91fd4-da26-42d4-8ecb-8b196c6d84b6",
description="Block to get the duration of a media file.",
categories={BlockCategory.MULTIMEDIA},
input_schema=MediaDurationBlock.Input,
output_schema=MediaDurationBlock.Output,
)
def run(
self,
input_data: Input,
*,
graph_exec_id: str,
**kwargs,
) -> BlockOutput:
# 1) Store the input media locally
local_media_path = store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.media_in,
return_content=False,
)
media_abspath = get_exec_file_path(graph_exec_id, local_media_path)
# 2) Load the clip
if input_data.is_video:
clip = VideoFileClip(media_abspath)
else:
clip = AudioFileClip(media_abspath)
yield "duration", clip.duration
class LoopVideoBlock(Block):
"""
Block for looping (repeating) a video clip until a given duration or number of loops.
"""
class Input(BlockSchema):
video_in: MediaFile = SchemaField(
description="The input video (can be a URL, data URI, or local path)."
)
# Provide EITHER a `duration` or `n_loops` or both. We'll demonstrate `duration`.
duration: Optional[float] = SchemaField(
description="Target duration (in seconds) to loop the video to. If omitted, defaults to no looping.",
default=None,
ge=0.0,
)
n_loops: Optional[int] = SchemaField(
description="Number of times to repeat the video. If omitted, defaults to 1 (no repeat).",
default=None,
ge=1,
)
output_return_type: Literal["file_path", "data_uri"] = SchemaField(
description="How to return the output video. Either a relative path or base64 data URI.",
default="file_path",
)
class Output(BlockSchema):
video_out: str = SchemaField(
description="Looped video returned either as a relative path or a data URI."
)
error: str = SchemaField(
description="Error message if something fails.", default=""
)
def __init__(self):
super().__init__(
id="8bf9eef6-5451-4213-b265-25306446e94b",
description="Block to loop a video to a given duration or number of repeats.",
categories={BlockCategory.MULTIMEDIA},
input_schema=LoopVideoBlock.Input,
output_schema=LoopVideoBlock.Output,
)
def run(
self,
input_data: Input,
*,
node_exec_id: str,
graph_exec_id: str,
**kwargs,
) -> BlockOutput:
# 1) Store the input video locally
local_video_path = store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.video_in,
return_content=False,
)
input_abspath = get_exec_file_path(graph_exec_id, local_video_path)
# 2) Load the clip
clip = VideoFileClip(input_abspath)
# 3) Apply the loop effect
looped_clip = clip
if input_data.duration:
# Loop until we reach the specified duration
looped_clip = looped_clip.with_effects([Loop(duration=input_data.duration)])
elif input_data.n_loops:
looped_clip = looped_clip.with_effects([Loop(n=input_data.n_loops)])
else:
raise ValueError("Either 'duration' or 'n_loops' must be provided.")
assert isinstance(looped_clip, VideoFileClip)
# 4) Save the looped output
output_filename = MediaFile(
f"{node_exec_id}_looped_{os.path.basename(local_video_path)}"
)
output_abspath = get_exec_file_path(graph_exec_id, output_filename)
looped_clip = looped_clip.with_audio(clip.audio)
looped_clip.write_videofile(output_abspath, codec="libx264", audio_codec="aac")
# Return as data URI
video_out = store_media_file(
graph_exec_id=graph_exec_id,
file=output_filename,
return_content=input_data.output_return_type == "data_uri",
)
yield "video_out", video_out
class AddAudioToVideoBlock(Block):
"""
Block that adds (attaches) an audio track to an existing video.
Optionally scale the volume of the new track.
"""
class Input(BlockSchema):
video_in: MediaFile = SchemaField(
description="Video input (URL, data URI, or local path)."
)
audio_in: MediaFile = SchemaField(
description="Audio input (URL, data URI, or local path)."
)
volume: float = SchemaField(
description="Volume scale for the newly attached audio track (1.0 = original).",
default=1.0,
)
output_return_type: Literal["file_path", "data_uri"] = SchemaField(
description="Return the final output as a relative path or base64 data URI.",
default="file_path",
)
class Output(BlockSchema):
video_out: MediaFile = SchemaField(
description="Final video (with attached audio), as a path or data URI."
)
error: str = SchemaField(
description="Error message if something fails.", default=""
)
def __init__(self):
super().__init__(
id="3503748d-62b6-4425-91d6-725b064af509",
description="Block to attach an audio file to a video file using moviepy.",
categories={BlockCategory.MULTIMEDIA},
input_schema=AddAudioToVideoBlock.Input,
output_schema=AddAudioToVideoBlock.Output,
)
def run(
self,
input_data: Input,
*,
node_exec_id: str,
graph_exec_id: str,
**kwargs,
) -> BlockOutput:
# 1) Store the inputs locally
local_video_path = store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.video_in,
return_content=False,
)
local_audio_path = store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.audio_in,
return_content=False,
)
abs_temp_dir = os.path.join(tempfile.gettempdir(), "exec_file", graph_exec_id)
video_abspath = os.path.join(abs_temp_dir, local_video_path)
audio_abspath = os.path.join(abs_temp_dir, local_audio_path)
# 2) Load video + audio with moviepy
video_clip = VideoFileClip(video_abspath)
audio_clip = AudioFileClip(audio_abspath)
# Optionally scale volume
if input_data.volume != 1.0:
audio_clip = audio_clip.with_volume_scaled(input_data.volume)
# 3) Attach the new audio track
final_clip = video_clip.with_audio(audio_clip)
# 4) Write to output file
output_filename = MediaFile(
f"{node_exec_id}_audio_attached_{os.path.basename(local_video_path)}"
)
output_abspath = os.path.join(abs_temp_dir, output_filename)
final_clip.write_videofile(output_abspath, codec="libx264", audio_codec="aac")
# 5) Return either path or data URI
video_out = store_media_file(
graph_exec_id=graph_exec_id,
file=output_filename,
return_content=input_data.output_return_type == "data_uri",
)
yield "video_out", video_out

View File

@@ -131,7 +131,7 @@ class ReplicateFluxAdvancedModelBlock(Block):
super().__init__(
id="90f8c45e-e983-4644-aa0b-b4ebe2f531bc",
description="This block runs Flux models on Replicate with advanced settings.",
categories={BlockCategory.AI},
categories={BlockCategory.AI, BlockCategory.MULTIMEDIA},
input_schema=ReplicateFluxAdvancedModelBlock.Input,
output_schema=ReplicateFluxAdvancedModelBlock.Output,
test_input={

View File

@@ -78,7 +78,7 @@ class CreateTalkingAvatarVideoBlock(Block):
super().__init__(
id="98c6f503-8c47-4b1c-a96d-351fc7c87dab",
description="This block integrates with D-ID to create video clips and retrieve their URLs.",
categories={BlockCategory.AI},
categories={BlockCategory.AI, BlockCategory.MULTIMEDIA},
input_schema=CreateTalkingAvatarVideoBlock.Input,
output_schema=CreateTalkingAvatarVideoBlock.Output,
test_input={

View File

@@ -53,7 +53,7 @@ class UnrealTextToSpeechBlock(Block):
super().__init__(
id="4ff1ff6d-cc40-4caa-ae69-011daa20c378",
description="Converts text to speech using the Unreal Speech API",
categories={BlockCategory.AI, BlockCategory.TEXT},
categories={BlockCategory.AI, BlockCategory.TEXT, BlockCategory.MULTIMEDIA},
input_schema=UnrealTextToSpeechBlock.Input,
output_schema=UnrealTextToSpeechBlock.Output,
test_input={

View File

@@ -66,6 +66,7 @@ class BlockCategory(Enum):
)
PRODUCTIVITY = "Block that helps with productivity"
ISSUE_TRACKING = "Block that helps with issue tracking"
MULTIMEDIA = "Block that interacts with multimedia content"
def dict(self) -> dict[str, str]:
return {"category": self.name, "description": self.value}

View File

@@ -40,6 +40,7 @@ from backend.data.graph import GraphModel, Link, Node
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.util import json
from backend.util.decorator import error_logged, time_measured
from backend.util.file import clean_exec_files
from backend.util.logging import configure_logging
from backend.util.process import set_service_name
from backend.util.service import (
@@ -169,7 +170,12 @@ def execute_node(
log_metadata.info("Executed node with input", input=input_data_str)
update_execution(ExecutionStatus.RUNNING)
extra_exec_kwargs = {}
extra_exec_kwargs: dict = {
"graph_id": graph_id,
"node_id": node_id,
"graph_exec_id": graph_exec_id,
"node_exec_id": node_exec_id,
}
# Last-minute fetch credentials + acquire a system-wide read-write lock to prevent
# changes during execution. ⚠️ This means a set of credentials can only be used by
# one (running) block at a time; simultaneous execution of blocks using same
@@ -729,6 +735,7 @@ class Executor:
finished = True
cancel.set()
cancel_thread.join()
clean_exec_files(graph_exec.graph_exec_id)
return (
exec_stats,

View File

@@ -0,0 +1,143 @@
import base64
import mimetypes
import re
import shutil
import tempfile
import uuid
from pathlib import Path
from urllib.parse import urlparse
# This "requests" presumably has additional checks against internal networks for SSRF.
from backend.util.request import requests
TEMP_DIR = Path(tempfile.gettempdir()).resolve()
def get_exec_file_path(graph_exec_id: str, path: str) -> str:
"""
Utility to build an absolute path in the {temp}/exec_file/{exec_id}/... folder.
"""
return str(TEMP_DIR / "exec_file" / graph_exec_id / path)
def clean_exec_files(graph_exec_id: str, file: str = "") -> None:
"""
Utility to remove the {temp}/exec_file/{exec_id} folder and its contents.
"""
exec_path = Path(get_exec_file_path(graph_exec_id, file))
if exec_path.exists() and exec_path.is_dir():
shutil.rmtree(exec_path)
"""
MediaFile is a string that represents a file. It can be one of the following:
- Data URI: base64 encoded media file. See https://developer.mozilla.org/en-US/docs/Web/URI/Schemes/data/
- URL: Media file hosted on the internet, it starts with http:// or https://.
- Local path (anything else): A temporary file path living within graph execution time.
Note: Replace this type alias into a proper class, when more information is needed.
"""
MediaFile = str
def store_media_file(
graph_exec_id: str, file: MediaFile, return_content: bool = False
) -> MediaFile:
"""
Safely handle 'file' (a data URI, a URL, or a local path relative to {temp}/exec_file/{exec_id}),
placing or verifying it under:
{tempdir}/exec_file/{exec_id}/...
If 'return_content=True', return a data URI (data:<mime>;base64,<content>).
Otherwise, returns the file media path relative to the exec_id folder.
For each MediaFile type:
- Data URI:
-> decode and store in a new random file in that folder
- URL:
-> download and store in that folder
- Local path:
-> interpret as relative to that folder; verify it exists
(no copying, as it's presumably already there).
We realpath-check so no symlink or '..' can escape the folder.
:param graph_exec_id: The unique ID of the graph execution.
:param file: Data URI, URL, or local (relative) path.
:param return_content: If True, return a data URI of the file content.
If False, return the *relative* path inside the exec_id folder.
:return: The requested result: data URI or relative path of the media.
"""
# Build base path
base_path = Path(get_exec_file_path(graph_exec_id, ""))
base_path.mkdir(parents=True, exist_ok=True)
# Helper functions
def _extension_from_mime(mime: str) -> str:
ext = mimetypes.guess_extension(mime, strict=False)
return ext if ext else ".bin"
def _file_to_data_uri(path: Path) -> str:
mime_type, _ = mimetypes.guess_type(path)
mime_type = mime_type or "application/octet-stream"
b64 = base64.b64encode(path.read_bytes()).decode("utf-8")
return f"data:{mime_type};base64,{b64}"
def _ensure_inside_base(path_candidate: Path, base: Path) -> Path:
"""
Resolve symlinks via resolve() and ensure the result is still under base.
"""
real_candidate = path_candidate.resolve()
real_base = base.resolve()
if not real_candidate.is_relative_to(real_base):
raise ValueError(
"Local file path is outside the temp_base directory. Access denied."
)
return real_candidate
def _strip_base_prefix(absolute_path: Path, base: Path) -> str:
"""
Strip base prefix and normalize path.
"""
return str(absolute_path.relative_to(base))
# Process file
if file.startswith("data:"):
# Data URI
match = re.match(r"^data:([^;]+);base64,(.*)$", file, re.DOTALL)
if not match:
raise ValueError(
"Invalid data URI format. Expected data:<mime>;base64,<data>"
)
mime_type = match.group(1).strip().lower()
b64_content = match.group(2).strip()
# Generate filename and decode
extension = _extension_from_mime(mime_type)
filename = f"{uuid.uuid4()}{extension}"
target_path = _ensure_inside_base(base_path / filename, base_path)
target_path.write_bytes(base64.b64decode(b64_content))
elif file.startswith(("http://", "https://")):
# URL
parsed_url = urlparse(file)
filename = Path(parsed_url.path).name or f"{uuid.uuid4()}"
target_path = _ensure_inside_base(base_path / filename, base_path)
# Download and save
resp = requests.get(file)
resp.raise_for_status()
target_path.write_bytes(resp.content)
else:
# Local path
target_path = _ensure_inside_base(base_path / file, base_path)
if not target_path.is_file():
raise ValueError(f"Local file does not exist: {target_path}")
# Return result
if return_content:
return MediaFile(_file_to_data_uri(target_path))
else:
return MediaFile(_strip_base_prefix(target_path, base_path))

View File

@@ -57,7 +57,7 @@ async def wait_execution(
user_id: str,
graph_id: str,
graph_exec_id: str,
timeout: int = 20,
timeout: int = 30,
) -> Sequence[ExecutionResult]:
async def is_execution_completed():
status = await AgentServer().test_get_graph_run_status(graph_exec_id, user_id)

File diff suppressed because it is too large Load Diff

View File

@@ -55,6 +55,7 @@ sqlalchemy = "^2.0.36"
psycopg2-binary = "^2.9.10"
google-cloud-storage = "^2.18.2"
launchdarkly-server-sdk = "^9.8.0"
moviepy = "^2.1.2"
[tool.poetry.group.dev.dependencies]
poethepoet = "^0.32.1"

View File

@@ -11,9 +11,17 @@ const getYouTubeVideoId = (url: string) => {
};
const isValidVideoUrl = (url: string): boolean => {
if (url.startsWith("data:video")) {
return true;
}
const validUrl = /^(https?:\/\/)(www\.)?/i;
const videoExtensions = /\.(mp4|webm|ogg)$/i;
const youtubeRegex = /^(https?:\/\/)?(www\.)?(youtube\.com|youtu\.?be)\/.+$/;
return videoExtensions.test(url) || youtubeRegex.test(url);
const cleanedUrl = url.split("?")[0];
return (
(validUrl.test(cleanedUrl) && videoExtensions.test(cleanedUrl)) ||
youtubeRegex.test(cleanedUrl)
);
};
const isValidImageUrl = (url: string): boolean => {
@@ -26,9 +34,13 @@ const isValidImageUrl = (url: string): boolean => {
};
const isValidAudioUrl = (url: string): boolean => {
if (url.startsWith("data:audio")) {
return true;
}
const validUrl = /^(https?:\/\/)(www\.)?/i;
const audioExtensions = /\.(mp3|wav)$/i;
const cleanedUrl = url.split("?")[0];
return audioExtensions.test(cleanedUrl);
return validUrl.test(cleanedUrl) && audioExtensions.test(cleanedUrl);
};
const VideoRenderer: React.FC<{ videoUrl: string }> = ({ videoUrl }) => {

View File

@@ -142,49 +142,52 @@ export default function useAgentGraph(
setAgentDescription(graph.description);
setNodes(() => {
const newNodes = graph.nodes.map((node) => {
const block = availableNodes.find(
(block) => block.id === node.block_id,
)!;
const flow =
block.uiType == BlockUIType.AGENT
? availableFlows.find(
(flow) => flow.id === node.input_default.graph_id,
)
: null;
const newNode: CustomNode = {
id: node.id,
type: "custom",
position: {
x: node?.metadata?.position?.x || 0,
y: node?.metadata?.position?.y || 0,
},
data: {
block_id: block.id,
blockType: flow?.name || block.name,
blockCosts: block.costs,
categories: block.categories,
description: block.description,
title: `${block.name} ${node.id}`,
inputSchema: block.inputSchema,
outputSchema: block.outputSchema,
hardcodedValues: node.input_default,
webhook: node.webhook,
uiType: block.uiType,
connections: graph.links
.filter((l) => [l.source_id, l.sink_id].includes(node.id))
.map((link) => ({
edge_id: formatEdgeID(link),
source: link.source_id,
sourceHandle: link.source_name,
target: link.sink_id,
targetHandle: link.sink_name,
})),
isOutputOpen: false,
},
};
return newNode;
});
const newNodes = graph.nodes
.map((node) => {
const block = availableNodes.find(
(block) => block.id === node.block_id,
)!;
if (!block) return null;
const flow =
block.uiType == BlockUIType.AGENT
? availableFlows.find(
(flow) => flow.id === node.input_default.graph_id,
)
: null;
const newNode: CustomNode = {
id: node.id,
type: "custom",
position: {
x: node?.metadata?.position?.x || 0,
y: node?.metadata?.position?.y || 0,
},
data: {
block_id: block.id,
blockType: flow?.name || block.name,
blockCosts: block.costs,
categories: block.categories,
description: block.description,
title: `${block.name} ${node.id}`,
inputSchema: block.inputSchema,
outputSchema: block.outputSchema,
hardcodedValues: node.input_default,
webhook: node.webhook,
uiType: block.uiType,
connections: graph.links
.filter((l) => [l.source_id, l.sink_id].includes(node.id))
.map((link) => ({
edge_id: formatEdgeID(link),
source: link.source_id,
sourceHandle: link.source_name,
target: link.sink_id,
targetHandle: link.sink_name,
})),
isOutputOpen: false,
},
};
return newNode;
})
.filter((node) => node !== null);
setEdges((_) =>
graph.links.map((link) => ({
id: formatEdgeID(link),