Address changes

This commit is contained in:
Zamil Majdy
2025-01-24 14:45:26 +01:00
parent a1a52b9569
commit d31167958c
5 changed files with 114 additions and 121 deletions

View File

@@ -40,7 +40,7 @@ class FileStoreBlock(Block):
**kwargs,
) -> BlockOutput:
file_path = store_temp_file(
exec_id=graph_exec_id,
graph_exec_id=graph_exec_id,
file=input_data.file_in,
return_content=False,
)

View File

@@ -8,7 +8,7 @@ from moviepy.video.io.VideoFileClip import VideoFileClip
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField
from backend.util.file import get_path, store_temp_file
from backend.util.file import get_exec_file_path, store_temp_file
class MediaDurationBlock(Block):
@@ -48,11 +48,11 @@ class MediaDurationBlock(Block):
) -> BlockOutput:
# 1) Store the input media locally
local_media_path = store_temp_file(
exec_id=graph_exec_id,
graph_exec_id=graph_exec_id,
file=input_data.media_in,
return_content=False,
)
media_abspath = get_path(graph_exec_id, local_media_path)
media_abspath = get_exec_file_path(graph_exec_id, local_media_path)
# 2) Load the clip
if input_data.is_video:
@@ -115,11 +115,11 @@ class LoopVideoBlock(Block):
) -> BlockOutput:
# 1) Store the input video locally
local_video_path = store_temp_file(
exec_id=graph_exec_id,
graph_exec_id=graph_exec_id,
file=input_data.video_in,
return_content=False,
)
input_abspath = get_path(graph_exec_id, local_video_path)
input_abspath = get_exec_file_path(graph_exec_id, local_video_path)
# 2) Load the clip
clip = VideoFileClip(input_abspath)
@@ -138,14 +138,14 @@ class LoopVideoBlock(Block):
# 4) Save the looped output
output_filename = f"{node_exec_id}_looped_{os.path.basename(local_video_path)}"
output_abspath = get_path(graph_exec_id, output_filename)
output_abspath = get_exec_file_path(graph_exec_id, output_filename)
looped_clip = looped_clip.with_audio(clip.audio)
looped_clip.write_videofile(output_abspath, codec="libx264", audio_codec="aac")
# Return as data URI
video_out = store_temp_file(
exec_id=graph_exec_id,
graph_exec_id=graph_exec_id,
file=output_filename,
return_content=input_data.output_return_type == "data_uri",
)
@@ -202,12 +202,12 @@ class AddAudioToVideoBlock(Block):
) -> BlockOutput:
# 1) Store the inputs locally
local_video_path = store_temp_file(
exec_id=graph_exec_id,
graph_exec_id=graph_exec_id,
file=input_data.video_in,
return_content=False,
)
local_audio_path = store_temp_file(
exec_id=graph_exec_id,
graph_exec_id=graph_exec_id,
file=input_data.audio_in,
return_content=False,
)
@@ -235,7 +235,7 @@ class AddAudioToVideoBlock(Block):
# 5) Return either path or data URI
video_out = store_temp_file(
exec_id=graph_exec_id,
graph_exec_id=graph_exec_id,
file=output_filename,
return_content=input_data.output_return_type == "data_uri",
)

View File

@@ -40,6 +40,7 @@ from backend.data.graph import GraphModel, Link, Node
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.util import json
from backend.util.decorator import error_logged, time_measured
from backend.util.file import clean_exec_files
from backend.util.logging import configure_logging
from backend.util.process import set_service_name
from backend.util.service import (
@@ -730,6 +731,7 @@ class Executor:
finished = True
cancel.set()
cancel_thread.join()
clean_exec_files(graph_exec.graph_exec_id)
return (
exec_stats,

View File

@@ -1,26 +1,35 @@
import base64
import mimetypes
import os
import re
import shutil
import tempfile
import uuid
from pathlib import Path
from urllib.parse import urlparse
# This "requests" presumably has additional checks against internal networks for SSRF.
from backend.util.request import requests
TEMP_DIR = tempfile.gettempdir()
TEMP_DIR = Path(tempfile.gettempdir()).resolve()
def get_path(exec_id: str, path: str) -> str:
def get_exec_file_path(graph_exec_id: str, path: str) -> str:
"""
Utility to build an absolute path in the {temp}/exec_file/{exec_id}/... folder.
"""
rel_path = os.path.join(TEMP_DIR, "exec_file", exec_id, path)
return os.path.realpath(rel_path)
return str(TEMP_DIR / "exec_file" / graph_exec_id / path)
def store_temp_file(exec_id: str, file: str, return_content: bool = False) -> str:
def clean_exec_files(graph_exec_id: str, file: str = "") -> None:
"""
Utility to remove the {temp}/exec_file/{exec_id} folder and its contents.
"""
exec_path = Path(get_exec_file_path(graph_exec_id, file))
if exec_path.exists() and exec_path.is_dir():
shutil.rmtree(exec_path)
def store_temp_file(graph_exec_id: str, file: str, return_content: bool = False) -> str:
"""
Safely handle 'file' (a data URI, a URL, or a local path relative to {temp}/exec_file/{exec_id}),
placing or verifying it under:
@@ -39,103 +48,82 @@ def store_temp_file(exec_id: str, file: str, return_content: bool = False) -> st
(no copying, as it's presumably already there).
We realpath-check so no symlink or '..' can escape the folder.
:param exec_id: Unique identifier for the execution context.
:param graph_exec_id: The unique ID of the graph execution.
:param file: Data URI, URL, or local (relative) path.
:param return_content: If True, return a data URI of the file content.
If False, return the *relative* path inside the exec_id folder.
:return: The requested result: data URI or relative path.
"""
# Build base path
base_path = Path(get_exec_file_path(graph_exec_id, ""))
base_path.mkdir(parents=True, exist_ok=True)
# 1) Build the absolute base path for this exec_id
temp_base = get_path(exec_id, "")
os.makedirs(temp_base, exist_ok=True)
# 2) Helper functions
# Helper functions
def _extension_from_mime(mime: str) -> str:
ext = mimetypes.guess_extension(mime, strict=False)
return ext if ext else ".bin"
def _file_to_data_uri(path: str) -> str:
def _file_to_data_uri(path: Path) -> str:
mime_type, _ = mimetypes.guess_type(path)
if not mime_type:
mime_type = "application/octet-stream"
with open(path, "rb") as f:
raw = f.read()
b64 = base64.b64encode(raw).decode("utf-8")
mime_type = mime_type or "application/octet-stream"
b64 = base64.b64encode(path.read_bytes()).decode("utf-8")
return f"data:{mime_type};base64,{b64}"
def _strip_base_prefix(absolute_path: str) -> str:
# Stripe temp_base prefix and normalize path
return absolute_path.removeprefix(temp_base).removeprefix(os.sep)
def _ensure_inside_base(path_candidate: Path, base: Path) -> Path:
"""
Resolve symlinks via resolve() and ensure the result is still under base.
"""
real_candidate = path_candidate.resolve()
real_base = base.resolve()
def _ensure_inside_base(path_candidate: str) -> str:
"""
Resolve symlinks via realpath and ensure the result is still under temp_base.
If valid, returns the real, absolute path.
Otherwise, raises ValueError.
"""
real_candidate = os.path.realpath(path_candidate)
real_base = os.path.realpath(temp_base)
# Must be either exactly the folder or inside it
if (
not real_candidate.startswith(real_base + os.sep)
and real_candidate != real_base
):
if not real_candidate.is_relative_to(real_base):
raise ValueError(
"Local file path is outside the temp_base directory. Access denied."
)
return real_candidate
# 3) Distinguish between data URI, URL, or local path
def _strip_base_prefix(absolute_path: Path, base: Path) -> str:
"""
Strip base prefix and normalize path.
"""
return str(absolute_path.relative_to(base))
# Process file
if file.startswith("data:"):
# === Data URI ===
# Data URI
match = re.match(r"^data:([^;]+);base64,(.*)$", file, re.DOTALL)
if not match:
raise ValueError(
"Invalid data URI format. Expected data:<mime>;base64,<data>"
)
mime_type = match.group(1).strip().lower()
b64_content = match.group(2).strip()
# Generate random filename with guessed extension
# Generate filename and decode
extension = _extension_from_mime(mime_type)
local_filename = str(uuid.uuid4()) + extension
# Our intended path
intended_path = os.path.join(temp_base, local_filename)
absolute_path = _ensure_inside_base(intended_path)
filename = f"{uuid.uuid4()}{extension}"
target_path = _ensure_inside_base(base_path / filename, base_path)
target_path.write_bytes(base64.b64decode(b64_content))
# Decode and write
raw_bytes = base64.b64decode(b64_content)
with open(absolute_path, "wb") as f:
f.write(raw_bytes)
elif file.startswith("http://") or file.startswith("https://"):
# === URL ===
elif file.startswith(("http://", "https://")):
# URL
parsed_url = urlparse(file)
basename = os.path.basename(parsed_url.path) or str(uuid.uuid4())
filename = Path(parsed_url.path).name or f"{uuid.uuid4()}"
target_path = _ensure_inside_base(base_path / filename, base_path)
intended_path = os.path.join(temp_base, basename)
absolute_path = _ensure_inside_base(intended_path)
# Download
# Download and save
resp = requests.get(file)
resp.raise_for_status()
with open(absolute_path, "wb") as f:
f.write(resp.content)
target_path.write_bytes(resp.content)
else:
# === Local path (relative to temp_base) ===
# interpret 'file' as a sub-path, then realpath-check it
intended_path = os.path.join(temp_base, file)
absolute_path = _ensure_inside_base(intended_path)
# Local path
target_path = _ensure_inside_base(base_path / file, base_path)
if not target_path.is_file():
raise ValueError(f"Local file does not exist: {target_path}")
# Check file actually exists
if not os.path.isfile(absolute_path):
raise ValueError(f"Local file does not exist: {absolute_path}")
# 4) Return result
# Return result
if return_content:
return _file_to_data_uri(absolute_path)
return _file_to_data_uri(target_path)
else:
return _strip_base_prefix(absolute_path)
return _strip_base_prefix(target_path, base_path)

View File

@@ -142,49 +142,52 @@ export default function useAgentGraph(
setAgentDescription(graph.description);
setNodes(() => {
const newNodes = graph.nodes.map((node) => {
const block = availableNodes.find(
(block) => block.id === node.block_id,
)!;
const flow =
block.uiType == BlockUIType.AGENT
? availableFlows.find(
(flow) => flow.id === node.input_default.graph_id,
)
: null;
const newNode: CustomNode = {
id: node.id,
type: "custom",
position: {
x: node?.metadata?.position?.x || 0,
y: node?.metadata?.position?.y || 0,
},
data: {
block_id: block.id,
blockType: flow?.name || block.name,
blockCosts: block.costs,
categories: block.categories,
description: block.description,
title: `${block.name} ${node.id}`,
inputSchema: block.inputSchema,
outputSchema: block.outputSchema,
hardcodedValues: node.input_default,
webhook: node.webhook,
uiType: block.uiType,
connections: graph.links
.filter((l) => [l.source_id, l.sink_id].includes(node.id))
.map((link) => ({
edge_id: formatEdgeID(link),
source: link.source_id,
sourceHandle: link.source_name,
target: link.sink_id,
targetHandle: link.sink_name,
})),
isOutputOpen: false,
},
};
return newNode;
});
const newNodes = graph.nodes
.map((node) => {
const block = availableNodes.find(
(block) => block.id === node.block_id,
)!;
if (!block) return null;
const flow =
block.uiType == BlockUIType.AGENT
? availableFlows.find(
(flow) => flow.id === node.input_default.graph_id,
)
: null;
const newNode: CustomNode = {
id: node.id,
type: "custom",
position: {
x: node?.metadata?.position?.x || 0,
y: node?.metadata?.position?.y || 0,
},
data: {
block_id: block.id,
blockType: flow?.name || block.name,
blockCosts: block.costs,
categories: block.categories,
description: block.description,
title: `${block.name} ${node.id}`,
inputSchema: block.inputSchema,
outputSchema: block.outputSchema,
hardcodedValues: node.input_default,
webhook: node.webhook,
uiType: block.uiType,
connections: graph.links
.filter((l) => [l.source_id, l.sink_id].includes(node.id))
.map((link) => ({
edge_id: formatEdgeID(link),
source: link.source_id,
sourceHandle: link.source_name,
target: link.sink_id,
targetHandle: link.sink_name,
})),
isOutputOpen: false,
},
};
return newNode;
})
.filter((node) => node !== null);
setEdges((_) =>
graph.links.map((link) => ({
id: formatEdgeID(link),