refactor(backend/blocks): update block signatures to use ExecutionContext

Update remaining blocks to use the unified ExecutionContext parameter
instead of individual graph_exec_id, user_id, node_exec_id parameters.

Blocks updated:
- FileStoreBlock, FileReadBlock (basic.py, text.py)
- AgentFileInputBlock (io.py)
- MediaDurationBlock, LoopVideoBlock, AddAudioToVideoBlock (media.py)
- SendWebRequestBlock, SendAuthenticatedWebRequestBlock (http.py)
- ScreenshotWebPageBlock (screenshotone.py)
- ReadSpreadsheetBlock (spreadsheet.py)
- SendDiscordFileBlock (discord/bot_blocks.py)
- GmailSendBlock (google/gmail.py)
- Updated test for store_media_file

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Nicholas Tindle
2026-01-27 20:22:11 -06:00
parent ca5758cce6
commit 759248b7fe
10 changed files with 102 additions and 103 deletions

View File

@@ -9,6 +9,7 @@ from backend.data.block import (
BlockSchemaOutput,
BlockType,
)
from backend.data.execution import ExecutionContext
from backend.data.model import SchemaField
from backend.util.file import store_media_file
from backend.util.type import MediaFileType, convert
@@ -45,14 +46,12 @@ class FileStoreBlock(Block):
self,
input_data: Input,
*,
graph_exec_id: str,
user_id: str,
execution_context: ExecutionContext,
**kwargs,
) -> BlockOutput:
yield "file_out", await store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.file_in,
user_id=user_id,
execution_context=execution_context,
return_content=input_data.base_64,
)

View File

@@ -15,6 +15,7 @@ from backend.data.block import (
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.execution import ExecutionContext
from backend.data.model import APIKeyCredentials, SchemaField
from backend.util.file import store_media_file
from backend.util.request import Requests
@@ -666,8 +667,7 @@ class SendDiscordFileBlock(Block):
file: MediaFileType,
filename: str,
message_content: str,
graph_exec_id: str,
user_id: str,
execution_context: ExecutionContext,
) -> dict:
intents = discord.Intents.default()
intents.guilds = True
@@ -731,9 +731,8 @@ class SendDiscordFileBlock(Block):
# Local file path - read from stored media file
# This would be a path from a previous block's output
stored_file = await store_media_file(
graph_exec_id=graph_exec_id,
file=file,
user_id=user_id,
execution_context=execution_context,
return_content=True, # Get as data URI
)
# Now process as data URI
@@ -781,8 +780,7 @@ class SendDiscordFileBlock(Block):
input_data: Input,
*,
credentials: APIKeyCredentials,
graph_exec_id: str,
user_id: str,
execution_context: ExecutionContext,
**kwargs,
) -> BlockOutput:
try:
@@ -793,8 +791,7 @@ class SendDiscordFileBlock(Block):
file=input_data.file,
filename=input_data.filename,
message_content=input_data.message_content,
graph_exec_id=graph_exec_id,
user_id=user_id,
execution_context=execution_context,
)
yield "status", result.get("status", "Unknown error")

View File

@@ -21,6 +21,7 @@ from backend.data.block import (
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.execution import ExecutionContext
from backend.data.model import SchemaField
from backend.util.file import MediaFileType, get_exec_file_path, store_media_file
from backend.util.settings import Settings
@@ -95,8 +96,7 @@ def _make_mime_text(
async def create_mime_message(
input_data,
graph_exec_id: str,
user_id: str,
execution_context: ExecutionContext,
) -> str:
"""Create a MIME message with attachments and return base64-encoded raw message."""
@@ -117,12 +117,13 @@ async def create_mime_message(
if input_data.attachments:
for attach in input_data.attachments:
local_path = await store_media_file(
user_id=user_id,
graph_exec_id=graph_exec_id,
file=attach,
execution_context=execution_context,
return_content=False,
)
abs_path = get_exec_file_path(graph_exec_id, local_path)
abs_path = get_exec_file_path(
execution_context.graph_exec_id or "", local_path
)
part = MIMEBase("application", "octet-stream")
with open(abs_path, "rb") as f:
part.set_payload(f.read())
@@ -582,27 +583,25 @@ class GmailSendBlock(GmailBase):
input_data: Input,
*,
credentials: GoogleCredentials,
graph_exec_id: str,
user_id: str,
execution_context: ExecutionContext,
**kwargs,
) -> BlockOutput:
service = self._build_service(credentials, **kwargs)
result = await self._send_email(
service,
input_data,
graph_exec_id,
user_id,
execution_context,
)
yield "result", result
async def _send_email(
self, service, input_data: Input, graph_exec_id: str, user_id: str
self, service, input_data: Input, execution_context: ExecutionContext
) -> dict:
if not input_data.to or not input_data.subject or not input_data.body:
raise ValueError(
"At least one recipient, subject, and body are required for sending an email"
)
raw_message = await create_mime_message(input_data, graph_exec_id, user_id)
raw_message = await create_mime_message(input_data, execution_context)
sent_message = await asyncio.to_thread(
lambda: service.users()
.messages()
@@ -692,30 +691,28 @@ class GmailCreateDraftBlock(GmailBase):
input_data: Input,
*,
credentials: GoogleCredentials,
graph_exec_id: str,
user_id: str,
execution_context: ExecutionContext,
**kwargs,
) -> BlockOutput:
service = self._build_service(credentials, **kwargs)
result = await self._create_draft(
service,
input_data,
graph_exec_id,
user_id,
execution_context,
)
yield "result", GmailDraftResult(
id=result["id"], message_id=result["message"]["id"], status="draft_created"
)
async def _create_draft(
self, service, input_data: Input, graph_exec_id: str, user_id: str
self, service, input_data: Input, execution_context: ExecutionContext
) -> dict:
if not input_data.to or not input_data.subject:
raise ValueError(
"At least one recipient and subject are required for creating a draft"
)
raw_message = await create_mime_message(input_data, graph_exec_id, user_id)
raw_message = await create_mime_message(input_data, execution_context)
draft = await asyncio.to_thread(
lambda: service.users()
.drafts()
@@ -1100,7 +1097,7 @@ class GmailGetThreadBlock(GmailBase):
async def _build_reply_message(
service, input_data, graph_exec_id: str, user_id: str
service, input_data, execution_context: ExecutionContext
) -> tuple[str, str]:
"""
Builds a reply MIME message for Gmail threads.
@@ -1190,12 +1187,11 @@ async def _build_reply_message(
# Handle attachments
for attach in input_data.attachments:
local_path = await store_media_file(
user_id=user_id,
graph_exec_id=graph_exec_id,
file=attach,
execution_context=execution_context,
return_content=False,
)
abs_path = get_exec_file_path(graph_exec_id, local_path)
abs_path = get_exec_file_path(execution_context.graph_exec_id or "", local_path)
part = MIMEBase("application", "octet-stream")
with open(abs_path, "rb") as f:
part.set_payload(f.read())
@@ -1311,16 +1307,14 @@ class GmailReplyBlock(GmailBase):
input_data: Input,
*,
credentials: GoogleCredentials,
graph_exec_id: str,
user_id: str,
execution_context: ExecutionContext,
**kwargs,
) -> BlockOutput:
service = self._build_service(credentials, **kwargs)
message = await self._reply(
service,
input_data,
graph_exec_id,
user_id,
execution_context,
)
yield "messageId", message["id"]
yield "threadId", message.get("threadId", input_data.threadId)
@@ -1343,11 +1337,11 @@ class GmailReplyBlock(GmailBase):
yield "email", email
async def _reply(
self, service, input_data: Input, graph_exec_id: str, user_id: str
self, service, input_data: Input, execution_context: ExecutionContext
) -> dict:
# Build the reply message using the shared helper
raw, thread_id = await _build_reply_message(
service, input_data, graph_exec_id, user_id
service, input_data, execution_context
)
# Send the message
@@ -1441,16 +1435,14 @@ class GmailDraftReplyBlock(GmailBase):
input_data: Input,
*,
credentials: GoogleCredentials,
graph_exec_id: str,
user_id: str,
execution_context: ExecutionContext,
**kwargs,
) -> BlockOutput:
service = self._build_service(credentials, **kwargs)
draft = await self._create_draft_reply(
service,
input_data,
graph_exec_id,
user_id,
execution_context,
)
yield "draftId", draft["id"]
yield "messageId", draft["message"]["id"]
@@ -1458,11 +1450,11 @@ class GmailDraftReplyBlock(GmailBase):
yield "status", "draft_created"
async def _create_draft_reply(
self, service, input_data: Input, graph_exec_id: str, user_id: str
self, service, input_data: Input, execution_context: ExecutionContext
) -> dict:
# Build the reply message using the shared helper
raw, thread_id = await _build_reply_message(
service, input_data, graph_exec_id, user_id
service, input_data, execution_context
)
# Create draft with proper thread association
@@ -1629,23 +1621,21 @@ class GmailForwardBlock(GmailBase):
input_data: Input,
*,
credentials: GoogleCredentials,
graph_exec_id: str,
user_id: str,
execution_context: ExecutionContext,
**kwargs,
) -> BlockOutput:
service = self._build_service(credentials, **kwargs)
result = await self._forward_message(
service,
input_data,
graph_exec_id,
user_id,
execution_context,
)
yield "messageId", result["id"]
yield "threadId", result.get("threadId", "")
yield "status", "forwarded"
async def _forward_message(
self, service, input_data: Input, graph_exec_id: str, user_id: str
self, service, input_data: Input, execution_context: ExecutionContext
) -> dict:
if not input_data.to:
raise ValueError("At least one recipient is required for forwarding")
@@ -1727,12 +1717,13 @@ To: {original_to}
# Add any additional attachments
for attach in input_data.additionalAttachments:
local_path = await store_media_file(
user_id=user_id,
graph_exec_id=graph_exec_id,
file=attach,
execution_context=execution_context,
return_content=False,
)
abs_path = get_exec_file_path(graph_exec_id, local_path)
abs_path = get_exec_file_path(
execution_context.graph_exec_id or "", local_path
)
part = MIMEBase("application", "octet-stream")
with open(abs_path, "rb") as f:
part.set_payload(f.read())

View File

@@ -15,6 +15,7 @@ from backend.data.block import (
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.execution import ExecutionContext
from backend.data.model import (
CredentialsField,
CredentialsMetaInput,
@@ -116,10 +117,9 @@ class SendWebRequestBlock(Block):
@staticmethod
async def _prepare_files(
graph_exec_id: str,
execution_context: ExecutionContext,
files_name: str,
files: list[MediaFileType],
user_id: str,
) -> list[tuple[str, tuple[str, BytesIO, str]]]:
"""
Prepare files for the request by storing them and reading their content.
@@ -127,11 +127,15 @@ class SendWebRequestBlock(Block):
(files_name, (filename, BytesIO, mime_type))
"""
files_payload: list[tuple[str, tuple[str, BytesIO, str]]] = []
graph_exec_id = execution_context.graph_exec_id
assert graph_exec_id is not None
for media in files:
# Normalise to a list so we can repeat the same key
rel_path = await store_media_file(
graph_exec_id, media, user_id, return_content=False
file=media,
execution_context=execution_context,
return_content=False,
)
abs_path = get_exec_file_path(graph_exec_id, rel_path)
async with aiofiles.open(abs_path, "rb") as f:
@@ -143,7 +147,7 @@ class SendWebRequestBlock(Block):
return files_payload
async def run(
self, input_data: Input, *, graph_exec_id: str, user_id: str, **kwargs
self, input_data: Input, *, execution_context: ExecutionContext, **kwargs
) -> BlockOutput:
# ─── Parse/normalise body ────────────────────────────────────
body = input_data.body
@@ -174,7 +178,7 @@ class SendWebRequestBlock(Block):
files_payload: list[tuple[str, tuple[str, BytesIO, str]]] = []
if use_files:
files_payload = await self._prepare_files(
graph_exec_id, input_data.files_name, input_data.files, user_id
execution_context, input_data.files_name, input_data.files
)
# Enforce body format rules

View File

@@ -12,6 +12,7 @@ from backend.data.block import (
BlockSchemaInput,
BlockType,
)
from backend.data.execution import ExecutionContext
from backend.data.model import SchemaField
from backend.util.file import store_media_file
from backend.util.mock import MockObject
@@ -462,17 +463,15 @@ class AgentFileInputBlock(AgentInputBlock):
self,
input_data: Input,
*,
graph_exec_id: str,
user_id: str,
execution_context: ExecutionContext,
**kwargs,
) -> BlockOutput:
if not input_data.value:
return
yield "result", await store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.value,
user_id=user_id,
execution_context=execution_context,
return_content=input_data.base_64,
)

View File

@@ -13,6 +13,7 @@ from backend.data.block import (
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.execution import ExecutionContext
from backend.data.model import SchemaField
from backend.util.file import MediaFileType, get_exec_file_path, store_media_file
@@ -46,18 +47,19 @@ class MediaDurationBlock(Block):
self,
input_data: Input,
*,
graph_exec_id: str,
user_id: str,
execution_context: ExecutionContext,
**kwargs,
) -> BlockOutput:
# 1) Store the input media locally
local_media_path = await store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.media_in,
user_id=user_id,
execution_context=execution_context,
return_content=False,
)
media_abspath = get_exec_file_path(graph_exec_id, local_media_path)
assert execution_context.graph_exec_id is not None
media_abspath = get_exec_file_path(
execution_context.graph_exec_id, local_media_path
)
# 2) Load the clip
if input_data.is_video:
@@ -111,16 +113,18 @@ class LoopVideoBlock(Block):
self,
input_data: Input,
*,
node_exec_id: str,
graph_exec_id: str,
user_id: str,
execution_context: ExecutionContext,
**kwargs,
) -> BlockOutput:
assert execution_context.graph_exec_id is not None
assert execution_context.node_exec_id is not None
graph_exec_id = execution_context.graph_exec_id
node_exec_id = execution_context.node_exec_id
# 1) Store the input video locally
local_video_path = await store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.video_in,
user_id=user_id,
execution_context=execution_context,
return_content=False,
)
input_abspath = get_exec_file_path(graph_exec_id, local_video_path)
@@ -151,9 +155,8 @@ class LoopVideoBlock(Block):
# Return as data URI
video_out = await store_media_file(
graph_exec_id=graph_exec_id,
file=output_filename,
user_id=user_id,
execution_context=execution_context,
return_content=input_data.output_return_type == "data_uri",
)
@@ -200,22 +203,23 @@ class AddAudioToVideoBlock(Block):
self,
input_data: Input,
*,
node_exec_id: str,
graph_exec_id: str,
user_id: str,
execution_context: ExecutionContext,
**kwargs,
) -> BlockOutput:
assert execution_context.graph_exec_id is not None
assert execution_context.node_exec_id is not None
graph_exec_id = execution_context.graph_exec_id
node_exec_id = execution_context.node_exec_id
# 1) Store the inputs locally
local_video_path = await store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.video_in,
user_id=user_id,
execution_context=execution_context,
return_content=False,
)
local_audio_path = await store_media_file(
graph_exec_id=graph_exec_id,
file=input_data.audio_in,
user_id=user_id,
execution_context=execution_context,
return_content=False,
)
@@ -242,9 +246,8 @@ class AddAudioToVideoBlock(Block):
# 5) Return either path or data URI
video_out = await store_media_file(
graph_exec_id=graph_exec_id,
file=output_filename,
user_id=user_id,
execution_context=execution_context,
return_content=input_data.output_return_type == "data_uri",
)

View File

@@ -11,6 +11,7 @@ from backend.data.block import (
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.execution import ExecutionContext
from backend.data.model import (
APIKeyCredentials,
CredentialsField,
@@ -112,8 +113,7 @@ class ScreenshotWebPageBlock(Block):
@staticmethod
async def take_screenshot(
credentials: APIKeyCredentials,
graph_exec_id: str,
user_id: str,
execution_context: ExecutionContext,
url: str,
viewport_width: int,
viewport_height: int,
@@ -155,11 +155,10 @@ class ScreenshotWebPageBlock(Block):
return {
"image": await store_media_file(
graph_exec_id=graph_exec_id,
file=MediaFileType(
f"data:image/{format.value};base64,{b64encode(content).decode('utf-8')}"
),
user_id=user_id,
execution_context=execution_context,
return_content=True,
)
}
@@ -169,15 +168,13 @@ class ScreenshotWebPageBlock(Block):
input_data: Input,
*,
credentials: APIKeyCredentials,
graph_exec_id: str,
user_id: str,
execution_context: ExecutionContext,
**kwargs,
) -> BlockOutput:
try:
screenshot_data = await self.take_screenshot(
credentials=credentials,
graph_exec_id=graph_exec_id,
user_id=user_id,
execution_context=execution_context,
url=input_data.url,
viewport_width=input_data.viewport_width,
viewport_height=input_data.viewport_height,

View File

@@ -7,6 +7,7 @@ from backend.data.block import (
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.execution import ExecutionContext
from backend.data.model import ContributorDetails, SchemaField
from backend.util.file import get_exec_file_path, store_media_file
from backend.util.type import MediaFileType
@@ -98,7 +99,7 @@ class ReadSpreadsheetBlock(Block):
)
async def run(
self, input_data: Input, *, graph_exec_id: str, user_id: str, **_kwargs
self, input_data: Input, *, execution_context: ExecutionContext, **_kwargs
) -> BlockOutput:
import csv
from io import StringIO
@@ -106,14 +107,15 @@ class ReadSpreadsheetBlock(Block):
# Determine data source - prefer file_input if provided, otherwise use contents
if input_data.file_input:
stored_file_path = await store_media_file(
user_id=user_id,
graph_exec_id=graph_exec_id,
file=input_data.file_input,
execution_context=execution_context,
return_content=False,
)
# Get full file path
file_path = get_exec_file_path(graph_exec_id, stored_file_path)
file_path = get_exec_file_path(
execution_context.graph_exec_id or "", stored_file_path
)
if not Path(file_path).exists():
raise ValueError(f"File does not exist: {file_path}")

View File

@@ -12,6 +12,7 @@ from backend.blocks.iteration import StepThroughItemsBlock
from backend.blocks.llm import AITextSummarizerBlock
from backend.blocks.text import ExtractTextInformationBlock
from backend.blocks.xml_parser import XMLParserBlock
from backend.data.execution import ExecutionContext
from backend.util.file import store_media_file
from backend.util.type import MediaFileType
@@ -233,9 +234,11 @@ class TestStoreMediaFileSecurity:
with pytest.raises(ValueError, match="File too large"):
await store_media_file(
graph_exec_id="test",
file=MediaFileType(large_data_uri),
user_id="test_user",
execution_context=ExecutionContext(
user_id="test_user",
graph_exec_id="test",
),
)
@patch("backend.util.file.Path")
@@ -270,9 +273,11 @@ class TestStoreMediaFileSecurity:
# Should raise an error when directory size exceeds limit
with pytest.raises(ValueError, match="Disk usage limit exceeded"):
await store_media_file(
graph_exec_id="test",
file=MediaFileType(
"data:text/plain;base64,dGVzdA=="
), # Small test file
user_id="test_user",
execution_context=ExecutionContext(
user_id="test_user",
graph_exec_id="test",
),
)

View File

@@ -11,6 +11,7 @@ from backend.data.block import (
BlockSchemaInput,
BlockSchemaOutput,
)
from backend.data.execution import ExecutionContext
from backend.data.model import SchemaField
from backend.util import json, text
from backend.util.file import get_exec_file_path, store_media_file
@@ -444,18 +445,19 @@ class FileReadBlock(Block):
)
async def run(
self, input_data: Input, *, graph_exec_id: str, user_id: str, **_kwargs
self, input_data: Input, *, execution_context: ExecutionContext, **_kwargs
) -> BlockOutput:
# Store the media file properly (handles URLs, data URIs, etc.)
stored_file_path = await store_media_file(
user_id=user_id,
graph_exec_id=graph_exec_id,
file=input_data.file_input,
execution_context=execution_context,
return_content=False,
)
# Get full file path
file_path = get_exec_file_path(graph_exec_id, stored_file_path)
file_path = get_exec_file_path(
execution_context.graph_exec_id or "", stored_file_path
)
if not Path(file_path).exists():
raise ValueError(f"File does not exist: {file_path}")