Merge origin/dev into swiftyos/sdk and resolve conflicts

This commit is contained in:
SwiftyOS
2025-07-04 15:16:46 +02:00
26 changed files with 2503 additions and 511 deletions

View File

@@ -120,6 +120,7 @@ Key models (defined in `/backend/schema.prisma`):
3. Define input/output schemas
4. Implement `run` method
5. Register in block registry
6. Generate the block uuid using `uuid.uuid4()`
**Modifying the API:**
1. Update route in `/backend/backend/server/routers/`

View File

@@ -1,12 +1,9 @@
import enum
from typing import Any, List
from typing import Any
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema, BlockType
from backend.data.model import SchemaField
from backend.util import json
from backend.util.file import store_media_file
from backend.util.mock import MockObject
from backend.util.prompt import estimate_token_count_str
from backend.util.type import MediaFileType, convert
@@ -121,266 +118,6 @@ class PrintToConsoleBlock(Block):
yield "status", "printed"
class FindInDictionaryBlock(Block):
class Input(BlockSchema):
input: Any = SchemaField(description="Dictionary to lookup from")
key: str | int = SchemaField(description="Key to lookup in the dictionary")
class Output(BlockSchema):
output: Any = SchemaField(description="Value found for the given key")
missing: Any = SchemaField(
description="Value of the input that missing the key"
)
def __init__(self):
super().__init__(
id="0e50422c-6dee-4145-83d6-3a5a392f65de",
description="Lookup the given key in the input dictionary/object/list and return the value.",
input_schema=FindInDictionaryBlock.Input,
output_schema=FindInDictionaryBlock.Output,
test_input=[
{"input": {"apple": 1, "banana": 2, "cherry": 3}, "key": "banana"},
{"input": {"x": 10, "y": 20, "z": 30}, "key": "w"},
{"input": [1, 2, 3], "key": 1},
{"input": [1, 2, 3], "key": 3},
{"input": MockObject(value="!!", key="key"), "key": "key"},
{"input": [{"k1": "v1"}, {"k2": "v2"}, {"k1": "v3"}], "key": "k1"},
],
test_output=[
("output", 2),
("missing", {"x": 10, "y": 20, "z": 30}),
("output", 2),
("missing", [1, 2, 3]),
("output", "key"),
("output", ["v1", "v3"]),
],
categories={BlockCategory.BASIC},
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
obj = input_data.input
key = input_data.key
if isinstance(obj, str):
obj = json.loads(obj)
if isinstance(obj, dict) and key in obj:
yield "output", obj[key]
elif isinstance(obj, list) and isinstance(key, int) and 0 <= key < len(obj):
yield "output", obj[key]
elif isinstance(obj, list) and isinstance(key, str):
if len(obj) == 0:
yield "output", []
elif isinstance(obj[0], dict) and key in obj[0]:
yield "output", [item[key] for item in obj if key in item]
else:
yield "output", [getattr(val, key) for val in obj if hasattr(val, key)]
elif isinstance(obj, object) and isinstance(key, str) and hasattr(obj, key):
yield "output", getattr(obj, key)
else:
yield "missing", input_data.input
class AddToDictionaryBlock(Block):
class Input(BlockSchema):
dictionary: dict[Any, Any] = SchemaField(
default_factory=dict,
description="The dictionary to add the entry to. If not provided, a new dictionary will be created.",
)
key: str = SchemaField(
default="",
description="The key for the new entry.",
placeholder="new_key",
advanced=False,
)
value: Any = SchemaField(
default=None,
description="The value for the new entry.",
placeholder="new_value",
advanced=False,
)
entries: dict[Any, Any] = SchemaField(
default_factory=dict,
description="The entries to add to the dictionary. This is the batch version of the `key` and `value` fields.",
advanced=True,
)
class Output(BlockSchema):
updated_dictionary: dict = SchemaField(
description="The dictionary with the new entry added."
)
error: str = SchemaField(description="Error message if the operation failed.")
def __init__(self):
super().__init__(
id="31d1064e-7446-4693-a7d4-65e5ca1180d1",
description="Adds a new key-value pair to a dictionary. If no dictionary is provided, a new one is created.",
categories={BlockCategory.BASIC},
input_schema=AddToDictionaryBlock.Input,
output_schema=AddToDictionaryBlock.Output,
test_input=[
{
"dictionary": {"existing_key": "existing_value"},
"key": "new_key",
"value": "new_value",
},
{"key": "first_key", "value": "first_value"},
{
"dictionary": {"existing_key": "existing_value"},
"entries": {"new_key": "new_value", "first_key": "first_value"},
},
],
test_output=[
(
"updated_dictionary",
{"existing_key": "existing_value", "new_key": "new_value"},
),
("updated_dictionary", {"first_key": "first_value"}),
(
"updated_dictionary",
{
"existing_key": "existing_value",
"new_key": "new_value",
"first_key": "first_value",
},
),
],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
updated_dict = input_data.dictionary.copy()
if input_data.value is not None and input_data.key:
updated_dict[input_data.key] = input_data.value
for key, value in input_data.entries.items():
updated_dict[key] = value
yield "updated_dictionary", updated_dict
class AddToListBlock(Block):
class Input(BlockSchema):
list: List[Any] = SchemaField(
default_factory=list,
advanced=False,
description="The list to add the entry to. If not provided, a new list will be created.",
)
entry: Any = SchemaField(
description="The entry to add to the list. Can be of any type (string, int, dict, etc.).",
advanced=False,
default=None,
)
entries: List[Any] = SchemaField(
default_factory=lambda: list(),
description="The entries to add to the list. This is the batch version of the `entry` field.",
advanced=True,
)
position: int | None = SchemaField(
default=None,
description="The position to insert the new entry. If not provided, the entry will be appended to the end of the list.",
)
class Output(BlockSchema):
updated_list: List[Any] = SchemaField(
description="The list with the new entry added."
)
error: str = SchemaField(description="Error message if the operation failed.")
def __init__(self):
super().__init__(
id="aeb08fc1-2fc1-4141-bc8e-f758f183a822",
description="Adds a new entry to a list. The entry can be of any type. If no list is provided, a new one is created.",
categories={BlockCategory.BASIC},
input_schema=AddToListBlock.Input,
output_schema=AddToListBlock.Output,
test_input=[
{
"list": [1, "string", {"existing_key": "existing_value"}],
"entry": {"new_key": "new_value"},
"position": 1,
},
{"entry": "first_entry"},
{"list": ["a", "b", "c"], "entry": "d"},
{
"entry": "e",
"entries": ["f", "g"],
"list": ["a", "b"],
"position": 1,
},
],
test_output=[
(
"updated_list",
[
1,
{"new_key": "new_value"},
"string",
{"existing_key": "existing_value"},
],
),
("updated_list", ["first_entry"]),
("updated_list", ["a", "b", "c", "d"]),
("updated_list", ["a", "f", "g", "e", "b"]),
],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
entries_added = input_data.entries.copy()
if input_data.entry:
entries_added.append(input_data.entry)
updated_list = input_data.list.copy()
if (pos := input_data.position) is not None:
updated_list = updated_list[:pos] + entries_added + updated_list[pos:]
else:
updated_list += entries_added
yield "updated_list", updated_list
class FindInListBlock(Block):
class Input(BlockSchema):
list: List[Any] = SchemaField(description="The list to search in.")
value: Any = SchemaField(description="The value to search for.")
class Output(BlockSchema):
index: int = SchemaField(description="The index of the value in the list.")
found: bool = SchemaField(
description="Whether the value was found in the list."
)
not_found_value: Any = SchemaField(
description="The value that was not found in the list."
)
def __init__(self):
super().__init__(
id="5e2c6d0a-1e37-489f-b1d0-8e1812b23333",
description="Finds the index of the value in the list.",
categories={BlockCategory.BASIC},
input_schema=FindInListBlock.Input,
output_schema=FindInListBlock.Output,
test_input=[
{"list": [1, 2, 3, 4, 5], "value": 3},
{"list": [1, 2, 3, 4, 5], "value": 6},
],
test_output=[
("index", 2),
("found", True),
("found", False),
("not_found_value", 6),
],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
try:
yield "index", input_data.list.index(input_data.value)
yield "found", True
except ValueError:
yield "found", False
yield "not_found_value", input_data.value
class NoteBlock(Block):
class Input(BlockSchema):
text: str = SchemaField(description="The text to display in the sticky note.")
@@ -406,133 +143,6 @@ class NoteBlock(Block):
yield "output", input_data.text
class CreateDictionaryBlock(Block):
class Input(BlockSchema):
values: dict[str, Any] = SchemaField(
description="Key-value pairs to create the dictionary with",
placeholder="e.g., {'name': 'Alice', 'age': 25}",
)
class Output(BlockSchema):
dictionary: dict[str, Any] = SchemaField(
description="The created dictionary containing the specified key-value pairs"
)
error: str = SchemaField(
description="Error message if dictionary creation failed"
)
def __init__(self):
super().__init__(
id="b924ddf4-de4f-4b56-9a85-358930dcbc91",
description="Creates a dictionary with the specified key-value pairs. Use this when you know all the values you want to add upfront.",
categories={BlockCategory.DATA},
input_schema=CreateDictionaryBlock.Input,
output_schema=CreateDictionaryBlock.Output,
test_input=[
{
"values": {"name": "Alice", "age": 25, "city": "New York"},
},
{
"values": {"numbers": [1, 2, 3], "active": True, "score": 95.5},
},
],
test_output=[
(
"dictionary",
{"name": "Alice", "age": 25, "city": "New York"},
),
(
"dictionary",
{"numbers": [1, 2, 3], "active": True, "score": 95.5},
),
],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
try:
# The values are already validated by Pydantic schema
yield "dictionary", input_data.values
except Exception as e:
yield "error", f"Failed to create dictionary: {str(e)}"
class CreateListBlock(Block):
class Input(BlockSchema):
values: List[Any] = SchemaField(
description="A list of values to be combined into a new list.",
placeholder="e.g., ['Alice', 25, True]",
)
max_size: int | None = SchemaField(
default=None,
description="Maximum size of the list. If provided, the list will be yielded in chunks of this size.",
advanced=True,
)
max_tokens: int | None = SchemaField(
default=None,
description="Maximum tokens for the list. If provided, the list will be yielded in chunks that fit within this token limit.",
advanced=True,
)
class Output(BlockSchema):
list: List[Any] = SchemaField(
description="The created list containing the specified values."
)
error: str = SchemaField(description="Error message if list creation failed.")
def __init__(self):
super().__init__(
id="a912d5c7-6e00-4542-b2a9-8034136930e4",
description="Creates a list with the specified values. Use this when you know all the values you want to add upfront. This block can also yield the list in batches based on a maximum size or token limit.",
categories={BlockCategory.DATA},
input_schema=CreateListBlock.Input,
output_schema=CreateListBlock.Output,
test_input=[
{
"values": ["Alice", 25, True],
},
{
"values": [1, 2, 3, "four", {"key": "value"}],
},
],
test_output=[
(
"list",
["Alice", 25, True],
),
(
"list",
[1, 2, 3, "four", {"key": "value"}],
),
],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
chunk = []
cur_tokens, max_tokens = 0, input_data.max_tokens
cur_size, max_size = 0, input_data.max_size
for value in input_data.values:
if max_tokens:
tokens = estimate_token_count_str(value)
else:
tokens = 0
# Check if adding this value would exceed either limit
if (max_tokens and (cur_tokens + tokens > max_tokens)) or (
max_size and (cur_size + 1 > max_size)
):
yield "list", chunk
chunk = [value]
cur_size, cur_tokens = 1, tokens
else:
chunk.append(value)
cur_size, cur_tokens = cur_size + 1, cur_tokens + tokens
# Yield final chunk if any
if chunk or not input_data.values:
yield "list", chunk
class TypeOptions(enum.Enum):
STRING = "string"
NUMBER = "number"
@@ -550,6 +160,7 @@ class UniversalTypeConverterBlock(Block):
class Output(BlockSchema):
value: Any = SchemaField(description="The converted value.")
error: str = SchemaField(description="Error message if conversion failed.")
def __init__(self):
super().__init__(

View File

@@ -0,0 +1,683 @@
from typing import Any, List
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField
from backend.util.json import loads
from backend.util.mock import MockObject
from backend.util.prompt import estimate_token_count_str
# =============================================================================
# Dictionary Manipulation Blocks
# =============================================================================
class CreateDictionaryBlock(Block):
class Input(BlockSchema):
values: dict[str, Any] = SchemaField(
description="Key-value pairs to create the dictionary with",
placeholder="e.g., {'name': 'Alice', 'age': 25}",
)
class Output(BlockSchema):
dictionary: dict[str, Any] = SchemaField(
description="The created dictionary containing the specified key-value pairs"
)
error: str = SchemaField(
description="Error message if dictionary creation failed"
)
def __init__(self):
super().__init__(
id="b924ddf4-de4f-4b56-9a85-358930dcbc91",
description="Creates a dictionary with the specified key-value pairs. Use this when you know all the values you want to add upfront.",
categories={BlockCategory.DATA},
input_schema=CreateDictionaryBlock.Input,
output_schema=CreateDictionaryBlock.Output,
test_input=[
{
"values": {"name": "Alice", "age": 25, "city": "New York"},
},
{
"values": {"numbers": [1, 2, 3], "active": True, "score": 95.5},
},
],
test_output=[
(
"dictionary",
{"name": "Alice", "age": 25, "city": "New York"},
),
(
"dictionary",
{"numbers": [1, 2, 3], "active": True, "score": 95.5},
),
],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
try:
# The values are already validated by Pydantic schema
yield "dictionary", input_data.values
except Exception as e:
yield "error", f"Failed to create dictionary: {str(e)}"
class AddToDictionaryBlock(Block):
class Input(BlockSchema):
dictionary: dict[Any, Any] = SchemaField(
default_factory=dict,
description="The dictionary to add the entry to. If not provided, a new dictionary will be created.",
)
key: str = SchemaField(
default="",
description="The key for the new entry.",
placeholder="new_key",
advanced=False,
)
value: Any = SchemaField(
default=None,
description="The value for the new entry.",
placeholder="new_value",
advanced=False,
)
entries: dict[Any, Any] = SchemaField(
default_factory=dict,
description="The entries to add to the dictionary. This is the batch version of the `key` and `value` fields.",
advanced=True,
)
class Output(BlockSchema):
updated_dictionary: dict = SchemaField(
description="The dictionary with the new entry added."
)
error: str = SchemaField(description="Error message if the operation failed.")
def __init__(self):
super().__init__(
id="31d1064e-7446-4693-a7d4-65e5ca1180d1",
description="Adds a new key-value pair to a dictionary. If no dictionary is provided, a new one is created.",
categories={BlockCategory.BASIC},
input_schema=AddToDictionaryBlock.Input,
output_schema=AddToDictionaryBlock.Output,
test_input=[
{
"dictionary": {"existing_key": "existing_value"},
"key": "new_key",
"value": "new_value",
},
{"key": "first_key", "value": "first_value"},
{
"dictionary": {"existing_key": "existing_value"},
"entries": {"new_key": "new_value", "first_key": "first_value"},
},
],
test_output=[
(
"updated_dictionary",
{"existing_key": "existing_value", "new_key": "new_value"},
),
("updated_dictionary", {"first_key": "first_value"}),
(
"updated_dictionary",
{
"existing_key": "existing_value",
"new_key": "new_value",
"first_key": "first_value",
},
),
],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
updated_dict = input_data.dictionary.copy()
if input_data.value is not None and input_data.key:
updated_dict[input_data.key] = input_data.value
for key, value in input_data.entries.items():
updated_dict[key] = value
yield "updated_dictionary", updated_dict
class FindInDictionaryBlock(Block):
class Input(BlockSchema):
input: Any = SchemaField(description="Dictionary to lookup from")
key: str | int = SchemaField(description="Key to lookup in the dictionary")
class Output(BlockSchema):
output: Any = SchemaField(description="Value found for the given key")
missing: Any = SchemaField(
description="Value of the input that missing the key"
)
def __init__(self):
super().__init__(
id="0e50422c-6dee-4145-83d6-3a5a392f65de",
description="Lookup the given key in the input dictionary/object/list and return the value.",
input_schema=FindInDictionaryBlock.Input,
output_schema=FindInDictionaryBlock.Output,
test_input=[
{"input": {"apple": 1, "banana": 2, "cherry": 3}, "key": "banana"},
{"input": {"x": 10, "y": 20, "z": 30}, "key": "w"},
{"input": [1, 2, 3], "key": 1},
{"input": [1, 2, 3], "key": 3},
{"input": MockObject(value="!!", key="key"), "key": "key"},
{"input": [{"k1": "v1"}, {"k2": "v2"}, {"k1": "v3"}], "key": "k1"},
],
test_output=[
("output", 2),
("missing", {"x": 10, "y": 20, "z": 30}),
("output", 2),
("missing", [1, 2, 3]),
("output", "key"),
("output", ["v1", "v3"]),
],
categories={BlockCategory.BASIC},
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
obj = input_data.input
key = input_data.key
if isinstance(obj, str):
obj = loads(obj)
if isinstance(obj, dict) and key in obj:
yield "output", obj[key]
elif isinstance(obj, list) and isinstance(key, int) and 0 <= key < len(obj):
yield "output", obj[key]
elif isinstance(obj, list) and isinstance(key, str):
if len(obj) == 0:
yield "output", []
elif isinstance(obj[0], dict) and key in obj[0]:
yield "output", [item[key] for item in obj if key in item]
else:
yield "output", [getattr(val, key) for val in obj if hasattr(val, key)]
elif isinstance(obj, object) and isinstance(key, str) and hasattr(obj, key):
yield "output", getattr(obj, key)
else:
yield "missing", input_data.input
class RemoveFromDictionaryBlock(Block):
class Input(BlockSchema):
dictionary: dict[Any, Any] = SchemaField(
description="The dictionary to modify."
)
key: str | int = SchemaField(description="Key to remove from the dictionary.")
return_value: bool = SchemaField(
default=False, description="Whether to return the removed value."
)
class Output(BlockSchema):
updated_dictionary: dict[Any, Any] = SchemaField(
description="The dictionary after removal."
)
removed_value: Any = SchemaField(description="The removed value if requested.")
error: str = SchemaField(description="Error message if the operation failed.")
def __init__(self):
super().__init__(
id="46afe2ea-c613-43f8-95ff-6692c3ef6876",
description="Removes a key-value pair from a dictionary.",
categories={BlockCategory.BASIC},
input_schema=RemoveFromDictionaryBlock.Input,
output_schema=RemoveFromDictionaryBlock.Output,
test_input=[
{
"dictionary": {"a": 1, "b": 2, "c": 3},
"key": "b",
"return_value": True,
},
{"dictionary": {"x": "hello", "y": "world"}, "key": "x"},
],
test_output=[
("updated_dictionary", {"a": 1, "c": 3}),
("removed_value", 2),
("updated_dictionary", {"y": "world"}),
],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
updated_dict = input_data.dictionary.copy()
try:
removed_value = updated_dict.pop(input_data.key)
yield "updated_dictionary", updated_dict
if input_data.return_value:
yield "removed_value", removed_value
except KeyError:
yield "error", f"Key '{input_data.key}' not found in dictionary"
class ReplaceDictionaryValueBlock(Block):
class Input(BlockSchema):
dictionary: dict[Any, Any] = SchemaField(
description="The dictionary to modify."
)
key: str | int = SchemaField(description="Key to replace the value for.")
value: Any = SchemaField(description="The new value for the given key.")
class Output(BlockSchema):
updated_dictionary: dict[Any, Any] = SchemaField(
description="The dictionary after replacement."
)
old_value: Any = SchemaField(description="The value that was replaced.")
error: str = SchemaField(description="Error message if the operation failed.")
def __init__(self):
super().__init__(
id="27e31876-18b6-44f3-ab97-f6226d8b3889",
description="Replaces the value for a specified key in a dictionary.",
categories={BlockCategory.BASIC},
input_schema=ReplaceDictionaryValueBlock.Input,
output_schema=ReplaceDictionaryValueBlock.Output,
test_input=[
{"dictionary": {"a": 1, "b": 2, "c": 3}, "key": "b", "value": 99},
{
"dictionary": {"x": "hello", "y": "world"},
"key": "y",
"value": "universe",
},
],
test_output=[
("updated_dictionary", {"a": 1, "b": 99, "c": 3}),
("old_value", 2),
("updated_dictionary", {"x": "hello", "y": "universe"}),
("old_value", "world"),
],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
updated_dict = input_data.dictionary.copy()
try:
old_value = updated_dict[input_data.key]
updated_dict[input_data.key] = input_data.value
yield "updated_dictionary", updated_dict
yield "old_value", old_value
except KeyError:
yield "error", f"Key '{input_data.key}' not found in dictionary"
class DictionaryIsEmptyBlock(Block):
class Input(BlockSchema):
dictionary: dict[Any, Any] = SchemaField(description="The dictionary to check.")
class Output(BlockSchema):
is_empty: bool = SchemaField(description="True if the dictionary is empty.")
def __init__(self):
super().__init__(
id="a3cf3f64-6bb9-4cc6-9900-608a0b3359b0",
description="Checks if a dictionary is empty.",
categories={BlockCategory.BASIC},
input_schema=DictionaryIsEmptyBlock.Input,
output_schema=DictionaryIsEmptyBlock.Output,
test_input=[{"dictionary": {}}, {"dictionary": {"a": 1}}],
test_output=[("is_empty", True), ("is_empty", False)],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
yield "is_empty", len(input_data.dictionary) == 0
# =============================================================================
# List Manipulation Blocks
# =============================================================================
class CreateListBlock(Block):
class Input(BlockSchema):
values: List[Any] = SchemaField(
description="A list of values to be combined into a new list.",
placeholder="e.g., ['Alice', 25, True]",
)
max_size: int | None = SchemaField(
default=None,
description="Maximum size of the list. If provided, the list will be yielded in chunks of this size.",
advanced=True,
)
max_tokens: int | None = SchemaField(
default=None,
description="Maximum tokens for the list. If provided, the list will be yielded in chunks that fit within this token limit.",
advanced=True,
)
class Output(BlockSchema):
list: List[Any] = SchemaField(
description="The created list containing the specified values."
)
error: str = SchemaField(description="Error message if list creation failed.")
def __init__(self):
super().__init__(
id="a912d5c7-6e00-4542-b2a9-8034136930e4",
description="Creates a list with the specified values. Use this when you know all the values you want to add upfront. This block can also yield the list in batches based on a maximum size or token limit.",
categories={BlockCategory.DATA},
input_schema=CreateListBlock.Input,
output_schema=CreateListBlock.Output,
test_input=[
{
"values": ["Alice", 25, True],
},
{
"values": [1, 2, 3, "four", {"key": "value"}],
},
],
test_output=[
(
"list",
["Alice", 25, True],
),
(
"list",
[1, 2, 3, "four", {"key": "value"}],
),
],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
chunk = []
cur_tokens, max_tokens = 0, input_data.max_tokens
cur_size, max_size = 0, input_data.max_size
for value in input_data.values:
if max_tokens:
tokens = estimate_token_count_str(value)
else:
tokens = 0
# Check if adding this value would exceed either limit
if (max_tokens and (cur_tokens + tokens > max_tokens)) or (
max_size and (cur_size + 1 > max_size)
):
yield "list", chunk
chunk = [value]
cur_size, cur_tokens = 1, tokens
else:
chunk.append(value)
cur_size, cur_tokens = cur_size + 1, cur_tokens + tokens
# Yield final chunk if any
if chunk or not input_data.values:
yield "list", chunk
class AddToListBlock(Block):
class Input(BlockSchema):
list: List[Any] = SchemaField(
default_factory=list,
advanced=False,
description="The list to add the entry to. If not provided, a new list will be created.",
)
entry: Any = SchemaField(
description="The entry to add to the list. Can be of any type (string, int, dict, etc.).",
advanced=False,
default=None,
)
entries: List[Any] = SchemaField(
default_factory=lambda: list(),
description="The entries to add to the list. This is the batch version of the `entry` field.",
advanced=True,
)
position: int | None = SchemaField(
default=None,
description="The position to insert the new entry. If not provided, the entry will be appended to the end of the list.",
)
class Output(BlockSchema):
updated_list: List[Any] = SchemaField(
description="The list with the new entry added."
)
error: str = SchemaField(description="Error message if the operation failed.")
def __init__(self):
super().__init__(
id="aeb08fc1-2fc1-4141-bc8e-f758f183a822",
description="Adds a new entry to a list. The entry can be of any type. If no list is provided, a new one is created.",
categories={BlockCategory.BASIC},
input_schema=AddToListBlock.Input,
output_schema=AddToListBlock.Output,
test_input=[
{
"list": [1, "string", {"existing_key": "existing_value"}],
"entry": {"new_key": "new_value"},
"position": 1,
},
{"entry": "first_entry"},
{"list": ["a", "b", "c"], "entry": "d"},
{
"entry": "e",
"entries": ["f", "g"],
"list": ["a", "b"],
"position": 1,
},
],
test_output=[
(
"updated_list",
[
1,
{"new_key": "new_value"},
"string",
{"existing_key": "existing_value"},
],
),
("updated_list", ["first_entry"]),
("updated_list", ["a", "b", "c", "d"]),
("updated_list", ["a", "f", "g", "e", "b"]),
],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
entries_added = input_data.entries.copy()
if input_data.entry:
entries_added.append(input_data.entry)
updated_list = input_data.list.copy()
if (pos := input_data.position) is not None:
updated_list = updated_list[:pos] + entries_added + updated_list[pos:]
else:
updated_list += entries_added
yield "updated_list", updated_list
class FindInListBlock(Block):
class Input(BlockSchema):
list: List[Any] = SchemaField(description="The list to search in.")
value: Any = SchemaField(description="The value to search for.")
class Output(BlockSchema):
index: int = SchemaField(description="The index of the value in the list.")
found: bool = SchemaField(
description="Whether the value was found in the list."
)
not_found_value: Any = SchemaField(
description="The value that was not found in the list."
)
def __init__(self):
super().__init__(
id="5e2c6d0a-1e37-489f-b1d0-8e1812b23333",
description="Finds the index of the value in the list.",
categories={BlockCategory.BASIC},
input_schema=FindInListBlock.Input,
output_schema=FindInListBlock.Output,
test_input=[
{"list": [1, 2, 3, 4, 5], "value": 3},
{"list": [1, 2, 3, 4, 5], "value": 6},
],
test_output=[
("index", 2),
("found", True),
("found", False),
("not_found_value", 6),
],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
try:
yield "index", input_data.list.index(input_data.value)
yield "found", True
except ValueError:
yield "found", False
yield "not_found_value", input_data.value
class GetListItemBlock(Block):
class Input(BlockSchema):
list: List[Any] = SchemaField(description="The list to get the item from.")
index: int = SchemaField(
description="The 0-based index of the item (supports negative indices)."
)
class Output(BlockSchema):
item: Any = SchemaField(description="The item at the specified index.")
error: str = SchemaField(description="Error message if the operation failed.")
def __init__(self):
super().__init__(
id="262ca24c-1025-43cf-a578-534e23234e97",
description="Returns the element at the given index.",
categories={BlockCategory.BASIC},
input_schema=GetListItemBlock.Input,
output_schema=GetListItemBlock.Output,
test_input=[
{"list": [1, 2, 3], "index": 1},
{"list": [1, 2, 3], "index": -1},
],
test_output=[
("item", 2),
("item", 3),
],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
try:
yield "item", input_data.list[input_data.index]
except IndexError:
yield "error", "Index out of range"
class RemoveFromListBlock(Block):
class Input(BlockSchema):
list: List[Any] = SchemaField(description="The list to modify.")
value: Any = SchemaField(
default=None, description="Value to remove from the list."
)
index: int | None = SchemaField(
default=None,
description="Index of the item to pop (supports negative indices).",
)
return_item: bool = SchemaField(
default=False, description="Whether to return the removed item."
)
class Output(BlockSchema):
updated_list: List[Any] = SchemaField(description="The list after removal.")
removed_item: Any = SchemaField(description="The removed item if requested.")
error: str = SchemaField(description="Error message if the operation failed.")
def __init__(self):
super().__init__(
id="d93c5a93-ac7e-41c1-ae5c-ef67e6e9b826",
description="Removes an item from a list by value or index.",
categories={BlockCategory.BASIC},
input_schema=RemoveFromListBlock.Input,
output_schema=RemoveFromListBlock.Output,
test_input=[
{"list": [1, 2, 3], "index": 1, "return_item": True},
{"list": ["a", "b", "c"], "value": "b"},
],
test_output=[
("updated_list", [1, 3]),
("removed_item", 2),
("updated_list", ["a", "c"]),
],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
lst = input_data.list.copy()
removed = None
try:
if input_data.index is not None:
removed = lst.pop(input_data.index)
elif input_data.value is not None:
lst.remove(input_data.value)
removed = input_data.value
else:
raise ValueError("No index or value provided for removal")
except (IndexError, ValueError):
yield "error", "Index or value not found"
return
yield "updated_list", lst
if input_data.return_item:
yield "removed_item", removed
class ReplaceListItemBlock(Block):
class Input(BlockSchema):
list: List[Any] = SchemaField(description="The list to modify.")
index: int = SchemaField(
description="Index of the item to replace (supports negative indices)."
)
value: Any = SchemaField(description="The new value for the given index.")
class Output(BlockSchema):
updated_list: List[Any] = SchemaField(description="The list after replacement.")
old_item: Any = SchemaField(description="The item that was replaced.")
error: str = SchemaField(description="Error message if the operation failed.")
def __init__(self):
super().__init__(
id="fbf62922-bea1-4a3d-8bac-23587f810b38",
description="Replaces an item at the specified index.",
categories={BlockCategory.BASIC},
input_schema=ReplaceListItemBlock.Input,
output_schema=ReplaceListItemBlock.Output,
test_input=[
{"list": [1, 2, 3], "index": 1, "value": 99},
{"list": ["a", "b"], "index": -1, "value": "c"},
],
test_output=[
("updated_list", [1, 99, 3]),
("old_item", 2),
("updated_list", ["a", "c"]),
("old_item", "b"),
],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
lst = input_data.list.copy()
try:
old = lst[input_data.index]
lst[input_data.index] = input_data.value
except IndexError:
yield "error", "Index out of range"
return
yield "updated_list", lst
yield "old_item", old
class ListIsEmptyBlock(Block):
class Input(BlockSchema):
list: List[Any] = SchemaField(description="The list to check.")
class Output(BlockSchema):
is_empty: bool = SchemaField(description="True if the list is empty.")
def __init__(self):
super().__init__(
id="896ed73b-27d0-41be-813c-c1c1dc856c03",
description="Checks if a list is empty.",
categories={BlockCategory.BASIC},
input_schema=ListIsEmptyBlock.Input,
output_schema=ListIsEmptyBlock.Output,
test_input=[{"list": []}, {"list": [1]}],
test_output=[("is_empty", True), ("is_empty", False)],
)
async def run(self, input_data: Input, **kwargs) -> BlockOutput:
yield "is_empty", len(input_data.list) == 0

View File

@@ -498,6 +498,9 @@ class GithubListIssuesBlock(Block):
issue: IssueItem = SchemaField(
title="Issue", description="Issues with their title and URL"
)
issues: list[IssueItem] = SchemaField(
description="List of issues with their title and URL"
)
error: str = SchemaField(description="Error message if listing issues failed")
def __init__(self):
@@ -513,13 +516,22 @@ class GithubListIssuesBlock(Block):
},
test_credentials=TEST_CREDENTIALS,
test_output=[
(
"issues",
[
{
"title": "Issue 1",
"url": "https://github.com/owner/repo/issues/1",
}
],
),
(
"issue",
{
"title": "Issue 1",
"url": "https://github.com/owner/repo/issues/1",
},
)
),
],
test_mock={
"list_issues": lambda *args, **kwargs: [
@@ -551,10 +563,12 @@ class GithubListIssuesBlock(Block):
credentials: GithubCredentials,
**kwargs,
) -> BlockOutput:
for issue in await self.list_issues(
issues = await self.list_issues(
credentials,
input_data.repo_url,
):
)
yield "issues", issues
for issue in issues:
yield "issue", issue

View File

@@ -31,7 +31,12 @@ class GithubListPullRequestsBlock(Block):
pull_request: PRItem = SchemaField(
title="Pull Request", description="PRs with their title and URL"
)
error: str = SchemaField(description="Error message if listing issues failed")
pull_requests: list[PRItem] = SchemaField(
description="List of pull requests with their title and URL"
)
error: str = SchemaField(
description="Error message if listing pull requests failed"
)
def __init__(self):
super().__init__(
@@ -46,13 +51,22 @@ class GithubListPullRequestsBlock(Block):
},
test_credentials=TEST_CREDENTIALS,
test_output=[
(
"pull_requests",
[
{
"title": "Pull request 1",
"url": "https://github.com/owner/repo/pull/1",
}
],
),
(
"pull_request",
{
"title": "Pull request 1",
"url": "https://github.com/owner/repo/pull/1",
},
)
),
],
test_mock={
"list_prs": lambda *args, **kwargs: [
@@ -88,6 +102,7 @@ class GithubListPullRequestsBlock(Block):
credentials,
input_data.repo_url,
)
yield "pull_requests", pull_requests
for pr in pull_requests:
yield "pull_request", pr
@@ -460,6 +475,9 @@ class GithubListPRReviewersBlock(Block):
title="Reviewer",
description="Reviewers with their username and profile URL",
)
reviewers: list[ReviewerItem] = SchemaField(
description="List of reviewers with their username and profile URL"
)
error: str = SchemaField(
description="Error message if listing reviewers failed"
)
@@ -477,13 +495,22 @@ class GithubListPRReviewersBlock(Block):
},
test_credentials=TEST_CREDENTIALS,
test_output=[
(
"reviewers",
[
{
"username": "reviewer1",
"url": "https://github.com/reviewer1",
}
],
),
(
"reviewer",
{
"username": "reviewer1",
"url": "https://github.com/reviewer1",
},
)
),
],
test_mock={
"list_reviewers": lambda *args, **kwargs: [
@@ -516,10 +543,12 @@ class GithubListPRReviewersBlock(Block):
credentials: GithubCredentials,
**kwargs,
) -> BlockOutput:
for reviewer in await self.list_reviewers(
reviewers = await self.list_reviewers(
credentials,
input_data.pr_url,
):
)
yield "reviewers", reviewers
for reviewer in reviewers:
yield "reviewer", reviewer

View File

@@ -31,6 +31,9 @@ class GithubListTagsBlock(Block):
tag: TagItem = SchemaField(
title="Tag", description="Tags with their name and file tree browser URL"
)
tags: list[TagItem] = SchemaField(
description="List of tags with their name and file tree browser URL"
)
error: str = SchemaField(description="Error message if listing tags failed")
def __init__(self):
@@ -46,13 +49,22 @@ class GithubListTagsBlock(Block):
},
test_credentials=TEST_CREDENTIALS,
test_output=[
(
"tags",
[
{
"name": "v1.0.0",
"url": "https://github.com/owner/repo/tree/v1.0.0",
}
],
),
(
"tag",
{
"name": "v1.0.0",
"url": "https://github.com/owner/repo/tree/v1.0.0",
},
)
),
],
test_mock={
"list_tags": lambda *args, **kwargs: [
@@ -93,6 +105,7 @@ class GithubListTagsBlock(Block):
credentials,
input_data.repo_url,
)
yield "tags", tags
for tag in tags:
yield "tag", tag
@@ -114,6 +127,9 @@ class GithubListBranchesBlock(Block):
title="Branch",
description="Branches with their name and file tree browser URL",
)
branches: list[BranchItem] = SchemaField(
description="List of branches with their name and file tree browser URL"
)
error: str = SchemaField(description="Error message if listing branches failed")
def __init__(self):
@@ -129,13 +145,22 @@ class GithubListBranchesBlock(Block):
},
test_credentials=TEST_CREDENTIALS,
test_output=[
(
"branches",
[
{
"name": "main",
"url": "https://github.com/owner/repo/tree/main",
}
],
),
(
"branch",
{
"name": "main",
"url": "https://github.com/owner/repo/tree/main",
},
)
),
],
test_mock={
"list_branches": lambda *args, **kwargs: [
@@ -176,6 +201,7 @@ class GithubListBranchesBlock(Block):
credentials,
input_data.repo_url,
)
yield "branches", branches
for branch in branches:
yield "branch", branch
@@ -199,6 +225,9 @@ class GithubListDiscussionsBlock(Block):
discussion: DiscussionItem = SchemaField(
title="Discussion", description="Discussions with their title and URL"
)
discussions: list[DiscussionItem] = SchemaField(
description="List of discussions with their title and URL"
)
error: str = SchemaField(
description="Error message if listing discussions failed"
)
@@ -217,13 +246,22 @@ class GithubListDiscussionsBlock(Block):
},
test_credentials=TEST_CREDENTIALS,
test_output=[
(
"discussions",
[
{
"title": "Discussion 1",
"url": "https://github.com/owner/repo/discussions/1",
}
],
),
(
"discussion",
{
"title": "Discussion 1",
"url": "https://github.com/owner/repo/discussions/1",
},
)
),
],
test_mock={
"list_discussions": lambda *args, **kwargs: [
@@ -279,6 +317,7 @@ class GithubListDiscussionsBlock(Block):
input_data.repo_url,
input_data.num_discussions,
)
yield "discussions", discussions
for discussion in discussions:
yield "discussion", discussion
@@ -300,6 +339,9 @@ class GithubListReleasesBlock(Block):
title="Release",
description="Releases with their name and file tree browser URL",
)
releases: list[ReleaseItem] = SchemaField(
description="List of releases with their name and file tree browser URL"
)
error: str = SchemaField(description="Error message if listing releases failed")
def __init__(self):
@@ -315,13 +357,22 @@ class GithubListReleasesBlock(Block):
},
test_credentials=TEST_CREDENTIALS,
test_output=[
(
"releases",
[
{
"name": "v1.0.0",
"url": "https://github.com/owner/repo/releases/tag/v1.0.0",
}
],
),
(
"release",
{
"name": "v1.0.0",
"url": "https://github.com/owner/repo/releases/tag/v1.0.0",
},
)
),
],
test_mock={
"list_releases": lambda *args, **kwargs: [
@@ -357,6 +408,7 @@ class GithubListReleasesBlock(Block):
credentials,
input_data.repo_url,
)
yield "releases", releases
for release in releases:
yield "release", release
@@ -1041,6 +1093,9 @@ class GithubListStargazersBlock(Block):
title="Stargazer",
description="Stargazers with their username and profile URL",
)
stargazers: list[StargazerItem] = SchemaField(
description="List of stargazers with their username and profile URL"
)
error: str = SchemaField(
description="Error message if listing stargazers failed"
)
@@ -1058,13 +1113,22 @@ class GithubListStargazersBlock(Block):
},
test_credentials=TEST_CREDENTIALS,
test_output=[
(
"stargazers",
[
{
"username": "octocat",
"url": "https://github.com/octocat",
}
],
),
(
"stargazer",
{
"username": "octocat",
"url": "https://github.com/octocat",
},
)
),
],
test_mock={
"list_stargazers": lambda *args, **kwargs: [
@@ -1104,5 +1168,6 @@ class GithubListStargazersBlock(Block):
credentials,
input_data.repo_url,
)
yield "stargazers", stargazers
for stargazer in stargazers:
yield "stargazer", stargazer

File diff suppressed because it is too large Load Diff

View File

@@ -67,17 +67,19 @@ class AddMemoryBlock(Block, Mem0Base):
metadata: dict[str, Any] = SchemaField(
description="Optional metadata for the memory", default_factory=dict
)
limit_memory_to_run: bool = SchemaField(
description="Limit the memory to the run", default=False
)
limit_memory_to_agent: bool = SchemaField(
description="Limit the memory to the agent", default=False
description="Limit the memory to the agent", default=True
)
class Output(BlockSchema):
action: str = SchemaField(description="Action of the operation")
memory: str = SchemaField(description="Memory created")
results: list[dict[str, str]] = SchemaField(
description="List of all results from the operation"
)
error: str = SchemaField(description="Error message if operation fails")
def __init__(self):
@@ -104,7 +106,14 @@ class AddMemoryBlock(Block, Mem0Base):
"credentials": TEST_CREDENTIALS_INPUT,
},
],
test_output=[("action", "NO_CHANGE"), ("action", "NO_CHANGE")],
test_output=[
("results", [{"event": "CREATED", "memory": "test memory"}]),
("action", "CREATED"),
("memory", "test memory"),
("results", [{"event": "CREATED", "memory": "test memory"}]),
("action", "CREATED"),
("memory", "test memory"),
],
test_credentials=TEST_CREDENTIALS,
test_mock={"_get_client": lambda credentials: MockMemoryClient()},
)
@@ -117,7 +126,7 @@ class AddMemoryBlock(Block, Mem0Base):
user_id: str,
graph_id: str,
graph_exec_id: str,
**kwargs
**kwargs,
) -> BlockOutput:
try:
client = self._get_client(credentials)
@@ -146,8 +155,11 @@ class AddMemoryBlock(Block, Mem0Base):
**params,
)
if len(result.get("results", [])) > 0:
for result in result.get("results", []):
results = result.get("results", [])
yield "results", results
if len(results) > 0:
for result in results:
yield "action", result["event"]
yield "memory", result["memory"]
else:
@@ -178,6 +190,10 @@ class SearchMemoryBlock(Block, Mem0Base):
default_factory=list,
advanced=True,
)
metadata_filter: Optional[dict[str, Any]] = SchemaField(
description="Optional metadata filters to apply",
default=None,
)
limit_memory_to_run: bool = SchemaField(
description="Limit the memory to the run", default=False
)
@@ -216,7 +232,7 @@ class SearchMemoryBlock(Block, Mem0Base):
user_id: str,
graph_id: str,
graph_exec_id: str,
**kwargs
**kwargs,
) -> BlockOutput:
try:
client = self._get_client(credentials)
@@ -235,6 +251,8 @@ class SearchMemoryBlock(Block, Mem0Base):
filters["AND"].append({"run_id": graph_exec_id})
if input_data.limit_memory_to_agent:
filters["AND"].append({"agent_id": graph_id})
if input_data.metadata_filter:
filters["AND"].append({"metadata": input_data.metadata_filter})
result: list[dict[str, Any]] = client.search(
input_data.query, version="v2", filters=filters
@@ -260,11 +278,15 @@ class GetAllMemoriesBlock(Block, Mem0Base):
categories: Optional[list[str]] = SchemaField(
description="Filter by categories", default=None
)
metadata_filter: Optional[dict[str, Any]] = SchemaField(
description="Optional metadata filters to apply",
default=None,
)
limit_memory_to_run: bool = SchemaField(
description="Limit the memory to the run", default=False
)
limit_memory_to_agent: bool = SchemaField(
description="Limit the memory to the agent", default=False
description="Limit the memory to the agent", default=True
)
class Output(BlockSchema):
@@ -274,11 +296,11 @@ class GetAllMemoriesBlock(Block, Mem0Base):
def __init__(self):
super().__init__(
id="45aee5bf-4767-45d1-a28b-e01c5aae9fc1",
description="Retrieve all memories from Mem0 with pagination",
description="Retrieve all memories from Mem0 with optional conversation filtering",
input_schema=GetAllMemoriesBlock.Input,
output_schema=GetAllMemoriesBlock.Output,
test_input={
"user_id": "test_user",
"metadata_filter": {"type": "test"},
"credentials": TEST_CREDENTIALS_INPUT,
},
test_output=[
@@ -296,7 +318,7 @@ class GetAllMemoriesBlock(Block, Mem0Base):
user_id: str,
graph_id: str,
graph_exec_id: str,
**kwargs
**kwargs,
) -> BlockOutput:
try:
client = self._get_client(credentials)
@@ -314,6 +336,8 @@ class GetAllMemoriesBlock(Block, Mem0Base):
filters["AND"].append(
{"categories": {"contains": input_data.categories}}
)
if input_data.metadata_filter:
filters["AND"].append({"metadata": input_data.metadata_filter})
memories: list[dict[str, Any]] = client.get_all(
filters=filters,
@@ -326,14 +350,116 @@ class GetAllMemoriesBlock(Block, Mem0Base):
yield "error", str(e)
class GetLatestMemoryBlock(Block, Mem0Base):
"""Block for retrieving the latest memory from Mem0"""
class Input(BlockSchema):
credentials: CredentialsMetaInput[
Literal[ProviderName.MEM0], Literal["api_key"]
] = CredentialsField(description="Mem0 API key credentials")
trigger: bool = SchemaField(
description="An unused field that is used to trigger the block when you have no other inputs",
default=False,
advanced=False,
)
categories: Optional[list[str]] = SchemaField(
description="Filter by categories", default=None
)
conversation_id: Optional[str] = SchemaField(
description="Optional conversation ID to retrieve the latest memory from (uses run_id)",
default=None,
)
metadata_filter: Optional[dict[str, Any]] = SchemaField(
description="Optional metadata filters to apply",
default=None,
)
limit_memory_to_run: bool = SchemaField(
description="Limit the memory to the run", default=False
)
limit_memory_to_agent: bool = SchemaField(
description="Limit the memory to the agent", default=True
)
class Output(BlockSchema):
memory: Optional[dict[str, Any]] = SchemaField(
description="Latest memory if found"
)
found: bool = SchemaField(description="Whether a memory was found")
error: str = SchemaField(description="Error message if operation fails")
def __init__(self):
super().__init__(
id="0f9d81b5-a145-4c23-b87f-01d6bf37b677",
description="Retrieve the latest memory from Mem0 with optional key filtering",
input_schema=GetLatestMemoryBlock.Input,
output_schema=GetLatestMemoryBlock.Output,
test_input={
"metadata_filter": {"type": "test"},
"credentials": TEST_CREDENTIALS_INPUT,
},
test_output=[
("memory", {"id": "test-memory", "content": "test content"}),
("found", True),
],
test_credentials=TEST_CREDENTIALS,
test_mock={"_get_client": lambda credentials: MockMemoryClient()},
)
async def run(
self,
input_data: Input,
*,
credentials: APIKeyCredentials,
user_id: str,
graph_id: str,
graph_exec_id: str,
**kwargs,
) -> BlockOutput:
try:
client = self._get_client(credentials)
filters: Filter = {
"AND": [
{"user_id": user_id},
]
}
if input_data.limit_memory_to_run:
filters["AND"].append({"run_id": graph_exec_id})
if input_data.limit_memory_to_agent:
filters["AND"].append({"agent_id": graph_id})
if input_data.categories:
filters["AND"].append(
{"categories": {"contains": input_data.categories}}
)
if input_data.metadata_filter:
filters["AND"].append({"metadata": input_data.metadata_filter})
memories: list[dict[str, Any]] = client.get_all(
filters=filters,
version="v2",
)
if memories:
# Return the latest memory (first in the list as they're sorted by recency)
latest_memory = memories[0]
yield "memory", latest_memory
yield "found", True
else:
yield "memory", None
yield "found", False
except Exception as e:
yield "error", str(e)
# Mock client for testing
class MockMemoryClient:
"""Mock Mem0 client for testing"""
def add(self, *args, **kwargs):
return {"memory_id": "test-memory-id", "status": "success"}
return {"results": [{"event": "CREATED", "memory": "test memory"}]}
def search(self, *args, **kwargs) -> list[dict[str, str]]:
def search(self, *args, **kwargs) -> list[dict[str, Any]]:
return [{"id": "test-memory", "content": "test content"}]
def get_all(self, *args, **kwargs) -> list[dict[str, str]]:

View File

@@ -0,0 +1,155 @@
import logging
from typing import Any, Literal
from autogpt_libs.utils.cache import thread_cached
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField
logger = logging.getLogger(__name__)
@thread_cached
def get_database_manager_client():
from backend.executor import DatabaseManagerAsyncClient
from backend.util.service import get_service_client
return get_service_client(DatabaseManagerAsyncClient, health_check=False)
StorageScope = Literal["within_agent", "across_agents"]
def get_storage_key(key: str, scope: StorageScope, graph_id: str) -> str:
"""Generate the storage key based on scope"""
if scope == "across_agents":
return f"global#{key}"
else:
return f"agent#{graph_id}#{key}"
class PersistInformationBlock(Block):
"""Block for persisting key-value data for the current user with configurable scope"""
class Input(BlockSchema):
key: str = SchemaField(description="Key to store the information under")
value: Any = SchemaField(description="Value to store")
scope: StorageScope = SchemaField(
description="Scope of persistence: within_agent (shared across all runs of this agent) or across_agents (shared across all agents for this user)",
default="within_agent",
)
class Output(BlockSchema):
value: Any = SchemaField(description="Value that was stored")
def __init__(self):
super().__init__(
id="1d055e55-a2b9-4547-8311-907d05b0304d",
description="Persist key-value information for the current user",
categories={BlockCategory.DATA},
input_schema=PersistInformationBlock.Input,
output_schema=PersistInformationBlock.Output,
test_input={
"key": "user_preference",
"value": {"theme": "dark", "language": "en"},
"scope": "within_agent",
},
test_output=[
("value", {"theme": "dark", "language": "en"}),
],
test_mock={
"_store_data": lambda *args, **kwargs: {
"theme": "dark",
"language": "en",
}
},
)
async def run(
self,
input_data: Input,
*,
user_id: str,
graph_id: str,
node_exec_id: str,
**kwargs,
) -> BlockOutput:
# Determine the storage key based on scope
storage_key = get_storage_key(input_data.key, input_data.scope, graph_id)
# Store the data
yield "value", await self._store_data(
user_id=user_id,
node_exec_id=node_exec_id,
key=storage_key,
data=input_data.value,
)
async def _store_data(
self, user_id: str, node_exec_id: str, key: str, data: Any
) -> Any | None:
return await get_database_manager_client().set_execution_kv_data(
user_id=user_id,
node_exec_id=node_exec_id,
key=key,
data=data,
)
class RetrieveInformationBlock(Block):
"""Block for retrieving key-value data for the current user with configurable scope"""
class Input(BlockSchema):
key: str = SchemaField(description="Key to retrieve the information for")
scope: StorageScope = SchemaField(
description="Scope of persistence: within_agent (shared across all runs of this agent) or across_agents (shared across all agents for this user)",
default="within_agent",
)
default_value: Any = SchemaField(
description="Default value to return if key is not found", default=None
)
class Output(BlockSchema):
value: Any = SchemaField(description="Retrieved value or default value")
def __init__(self):
super().__init__(
id="d8710fc9-6e29-481e-a7d5-165eb16f8471",
description="Retrieve key-value information for the current user",
categories={BlockCategory.DATA},
input_schema=RetrieveInformationBlock.Input,
output_schema=RetrieveInformationBlock.Output,
test_input={
"key": "user_preference",
"scope": "within_agent",
"default_value": {"theme": "light", "language": "en"},
},
test_output=[
("value", {"theme": "light", "language": "en"}),
],
test_mock={"_retrieve_data": lambda *args, **kwargs: None},
static_output=True,
)
async def run(
self, input_data: Input, *, user_id: str, graph_id: str, **kwargs
) -> BlockOutput:
# Determine the storage key based on scope
storage_key = get_storage_key(input_data.key, input_data.scope, graph_id)
# Retrieve the data
stored_value = await self._retrieve_data(
user_id=user_id,
key=storage_key,
)
if stored_value is not None:
yield "value", stored_value
else:
yield "value", input_data.default_value
async def _retrieve_data(self, user_id: str, key: str) -> Any | None:
return await get_database_manager_client().get_execution_kv_data(
user_id=user_id,
key=key,
)

View File

@@ -96,6 +96,7 @@ class GetRedditPostsBlock(Block):
class Output(BlockSchema):
post: RedditPost = SchemaField(description="Reddit post")
posts: list[RedditPost] = SchemaField(description="List of all Reddit posts")
def __init__(self):
super().__init__(
@@ -128,6 +129,23 @@ class GetRedditPostsBlock(Block):
id="id2", subreddit="subreddit", title="title2", body="body2"
),
),
(
"posts",
[
RedditPost(
id="id1",
subreddit="subreddit",
title="title1",
body="body1",
),
RedditPost(
id="id2",
subreddit="subreddit",
title="title2",
body="body2",
),
],
),
],
test_mock={
"get_posts": lambda input_data, credentials: [
@@ -150,6 +168,7 @@ class GetRedditPostsBlock(Block):
self, input_data: Input, *, credentials: RedditCredentials, **kwargs
) -> BlockOutput:
current_time = datetime.now(tz=timezone.utc)
all_posts = []
for post in self.get_posts(input_data=input_data, credentials=credentials):
if input_data.last_minutes:
post_datetime = datetime.fromtimestamp(
@@ -162,12 +181,16 @@ class GetRedditPostsBlock(Block):
if input_data.last_post and post.id == input_data.last_post:
break
yield "post", RedditPost(
reddit_post = RedditPost(
id=post.id,
subreddit=input_data.subreddit,
title=post.title,
body=post.selftext,
)
all_posts.append(reddit_post)
yield "post", reddit_post
yield "posts", all_posts
class PostRedditCommentBlock(Block):

View File

@@ -40,6 +40,7 @@ class ReadRSSFeedBlock(Block):
class Output(BlockSchema):
entry: RSSEntry = SchemaField(description="The RSS item")
entries: list[RSSEntry] = SchemaField(description="List of all RSS entries")
def __init__(self):
super().__init__(
@@ -66,6 +67,21 @@ class ReadRSSFeedBlock(Block):
categories=["Technology", "News"],
),
),
(
"entries",
[
RSSEntry(
title="Example RSS Item",
link="https://example.com/article",
description="This is an example RSS item description.",
pub_date=datetime(
2023, 6, 23, 12, 30, 0, tzinfo=timezone.utc
),
author="John Doe",
categories=["Technology", "News"],
),
],
),
],
test_mock={
"parse_feed": lambda *args, **kwargs: {
@@ -96,21 +112,22 @@ class ReadRSSFeedBlock(Block):
keep_going = input_data.run_continuously
feed = self.parse_feed(input_data.rss_url)
all_entries = []
for entry in feed["entries"]:
pub_date = datetime(*entry["published_parsed"][:6], tzinfo=timezone.utc)
if pub_date > start_time:
yield (
"entry",
RSSEntry(
title=entry["title"],
link=entry["link"],
description=entry.get("summary", ""),
pub_date=pub_date,
author=entry.get("author", ""),
categories=[tag["term"] for tag in entry.get("tags", [])],
),
rss_entry = RSSEntry(
title=entry["title"],
link=entry["link"],
description=entry.get("summary", ""),
pub_date=pub_date,
author=entry.get("author", ""),
categories=[tag["term"] for tag in entry.get("tags", [])],
)
all_entries.append(rss_entry)
yield "entry", rss_entry
yield "entries", all_entries
await asyncio.sleep(input_data.polling_rate)

View File

@@ -26,10 +26,10 @@ logger = logging.getLogger(__name__)
@thread_cached
def get_database_manager_client():
from backend.executor import DatabaseManagerClient
from backend.executor import DatabaseManagerAsyncClient
from backend.util.service import get_service_client
return get_service_client(DatabaseManagerClient)
return get_service_client(DatabaseManagerAsyncClient, health_check=False)
def _get_tool_requests(entry: dict[str, Any]) -> list[str]:
@@ -273,7 +273,7 @@ class SmartDecisionMakerBlock(Block):
return re.sub(r"[^a-zA-Z0-9_-]", "_", s).lower()
@staticmethod
def _create_block_function_signature(
async def _create_block_function_signature(
sink_node: "Node", links: list["Link"]
) -> dict[str, Any]:
"""
@@ -312,7 +312,7 @@ class SmartDecisionMakerBlock(Block):
return {"type": "function", "function": tool_function}
@staticmethod
def _create_agent_function_signature(
async def _create_agent_function_signature(
sink_node: "Node", links: list["Link"]
) -> dict[str, Any]:
"""
@@ -334,7 +334,7 @@ class SmartDecisionMakerBlock(Block):
raise ValueError("Graph ID or Graph Version not found in sink node.")
db_client = get_database_manager_client()
sink_graph_meta = db_client.get_graph_metadata(graph_id, graph_version)
sink_graph_meta = await db_client.get_graph_metadata(graph_id, graph_version)
if not sink_graph_meta:
raise ValueError(
f"Sink graph metadata not found: {graph_id} {graph_version}"
@@ -374,7 +374,7 @@ class SmartDecisionMakerBlock(Block):
return {"type": "function", "function": tool_function}
@staticmethod
def _create_function_signature(node_id: str) -> list[dict[str, Any]]:
async def _create_function_signature(node_id: str) -> list[dict[str, Any]]:
"""
Creates function signatures for tools linked to a specified node within a graph.
@@ -396,13 +396,13 @@ class SmartDecisionMakerBlock(Block):
db_client = get_database_manager_client()
tools = [
(link, node)
for link, node in db_client.get_connected_output_nodes(node_id)
for link, node in await db_client.get_connected_output_nodes(node_id)
if link.source_name.startswith("tools_^_") and link.source_id == node_id
]
if not tools:
raise ValueError("There is no next node to execute.")
return_tool_functions = []
return_tool_functions: list[dict[str, Any]] = []
grouped_tool_links: dict[str, tuple["Node", list["Link"]]] = {}
for link, node in tools:
@@ -417,13 +417,13 @@ class SmartDecisionMakerBlock(Block):
if sink_node.block_id == AgentExecutorBlock().id:
return_tool_functions.append(
SmartDecisionMakerBlock._create_agent_function_signature(
await SmartDecisionMakerBlock._create_agent_function_signature(
sink_node, links
)
)
else:
return_tool_functions.append(
SmartDecisionMakerBlock._create_block_function_signature(
await SmartDecisionMakerBlock._create_block_function_signature(
sink_node, links
)
)
@@ -442,7 +442,7 @@ class SmartDecisionMakerBlock(Block):
user_id: str,
**kwargs,
) -> BlockOutput:
tool_functions = self._create_function_signature(node_id)
tool_functions = await self._create_function_signature(node_id)
yield "tool_functions", json.dumps(tool_functions)
input_data.conversation_history = input_data.conversation_history or []

View File

@@ -161,7 +161,7 @@ async def test_smart_decision_maker_function_signature(server: SpinTestServer):
)
test_graph = await create_graph(server, test_graph, test_user)
tool_functions = SmartDecisionMakerBlock._create_function_signature(
tool_functions = await SmartDecisionMakerBlock._create_function_signature(
test_graph.nodes[0].id
)
assert tool_functions is not None, "Tool functions should not be None"

View File

@@ -22,6 +22,7 @@ from prisma.models import (
AgentGraphExecution,
AgentNodeExecution,
AgentNodeExecutionInputOutput,
AgentNodeExecutionKeyValueData,
)
from prisma.types import (
AgentGraphExecutionCreateInput,
@@ -29,6 +30,7 @@ from prisma.types import (
AgentGraphExecutionWhereInput,
AgentNodeExecutionCreateInput,
AgentNodeExecutionInputOutputCreateInput,
AgentNodeExecutionKeyValueDataCreateInput,
AgentNodeExecutionUpdateInput,
AgentNodeExecutionWhereInput,
)
@@ -907,3 +909,57 @@ class AsyncRedisExecutionEventBus(AsyncRedisEventBus[ExecutionEvent]):
) -> AsyncGenerator[ExecutionEvent, None]:
async for event in self.listen_events(f"{user_id}/{graph_id}/{graph_exec_id}"):
yield event
# --------------------- KV Data Functions --------------------- #
async def get_execution_kv_data(user_id: str, key: str) -> Any | None:
"""
Get key-value data for a user and key.
Args:
user_id: The id of the User.
key: The key to retrieve data for.
Returns:
The data associated with the key, or None if not found.
"""
kv_data = await AgentNodeExecutionKeyValueData.prisma().find_unique(
where={"userId_key": {"userId": user_id, "key": key}}
)
return (
type_utils.convert(kv_data.data, type[Any])
if kv_data and kv_data.data
else None
)
async def set_execution_kv_data(
user_id: str, node_exec_id: str, key: str, data: Any
) -> Any | None:
"""
Set key-value data for a user and key.
Args:
user_id: The id of the User.
node_exec_id: The id of the AgentNodeExecution.
key: The key to store data under.
data: The data to store.
"""
resp = await AgentNodeExecutionKeyValueData.prisma().upsert(
where={"userId_key": {"userId": user_id, "key": key}},
data={
"create": AgentNodeExecutionKeyValueDataCreateInput(
userId=user_id,
agentNodeExecutionId=node_exec_id,
key=key,
data=Json(data) if data is not None else None,
),
"update": {
"agentNodeExecutionId": node_exec_id,
"data": Json(data) if data is not None else None,
},
},
)
return type_utils.convert(resp.data, type[Any]) if resp and resp.data else None

View File

@@ -5,12 +5,14 @@ from backend.data import db
from backend.data.credit import UsageTransactionMetadata, get_user_credit_model
from backend.data.execution import (
create_graph_execution,
get_execution_kv_data,
get_graph_execution,
get_graph_execution_meta,
get_graph_executions,
get_latest_node_execution,
get_node_execution,
get_node_executions,
set_execution_kv_data,
update_graph_execution_start_time,
update_graph_execution_stats,
update_node_execution_stats,
@@ -101,6 +103,8 @@ class DatabaseManager(AppService):
update_node_execution_stats = _(update_node_execution_stats)
upsert_execution_input = _(upsert_execution_input)
upsert_execution_output = _(upsert_execution_output)
get_execution_kv_data = _(get_execution_kv_data)
set_execution_kv_data = _(set_execution_kv_data)
# Graphs
get_node = _(get_node)
@@ -159,6 +163,8 @@ class DatabaseManagerClient(AppServiceClient):
update_node_execution_stats = _(d.update_node_execution_stats)
upsert_execution_input = _(d.upsert_execution_input)
upsert_execution_output = _(d.upsert_execution_output)
get_execution_kv_data = _(d.get_execution_kv_data)
set_execution_kv_data = _(d.set_execution_kv_data)
# Graphs
get_node = _(d.get_node)
@@ -202,8 +208,10 @@ class DatabaseManagerAsyncClient(AppServiceClient):
return DatabaseManager
create_graph_execution = d.create_graph_execution
get_connected_output_nodes = d.get_connected_output_nodes
get_latest_node_execution = d.get_latest_node_execution
get_graph = d.get_graph
get_graph_metadata = d.get_graph_metadata
get_graph_execution_meta = d.get_graph_execution_meta
get_node = d.get_node
get_node_execution = d.get_node_execution
@@ -216,3 +224,5 @@ class DatabaseManagerAsyncClient(AppServiceClient):
update_node_execution_status = d.update_node_execution_status
update_node_execution_status_batch = d.update_node_execution_status_batch
update_user_integrations = d.update_user_integrations
get_execution_kv_data = d.get_execution_kv_data
set_execution_kv_data = d.set_execution_kv_data

View File

@@ -7,7 +7,8 @@ from prisma.models import User
import backend.server.v2.library.model
import backend.server.v2.store.model
from backend.blocks.basic import FindInDictionaryBlock, StoreValueBlock
from backend.blocks.basic import StoreValueBlock
from backend.blocks.data_manipulation import FindInDictionaryBlock
from backend.blocks.io import AgentInputBlock
from backend.blocks.maths import CalculatorBlock, Operation
from backend.data import execution, graph

View File

@@ -170,7 +170,14 @@ async def get_library_agent(id: str, user_id: str) -> library_model.LibraryAgent
if not library_agent:
raise NotFoundError(f"Library agent #{id} not found")
return library_model.LibraryAgent.from_db(library_agent)
return library_model.LibraryAgent.from_db(
library_agent,
sub_graphs=(
await graph_db.get_sub_graphs(library_agent.AgentGraph)
if library_agent.AgentGraph
else None
),
)
except prisma.errors.PrismaError as e:
logger.error(f"Database error fetching library agent: {e}")

View File

@@ -51,7 +51,7 @@ class LibraryAgent(pydantic.BaseModel):
description: str
input_schema: dict[str, Any] # Should be BlockIOObjectSubSchema in frontend
credentials_input_schema: dict[str, Any] = pydantic.Field(
credentials_input_schema: dict[str, Any] | None = pydantic.Field(
description="Input schema for credentials required by the agent",
)
@@ -70,7 +70,10 @@ class LibraryAgent(pydantic.BaseModel):
is_latest_version: bool
@staticmethod
def from_db(agent: prisma.models.LibraryAgent) -> "LibraryAgent":
def from_db(
agent: prisma.models.LibraryAgent,
sub_graphs: Optional[list[prisma.models.AgentGraph]] = None,
) -> "LibraryAgent":
"""
Factory method that constructs a LibraryAgent from a Prisma LibraryAgent
model instance.
@@ -78,7 +81,7 @@ class LibraryAgent(pydantic.BaseModel):
if not agent.AgentGraph:
raise ValueError("Associated Agent record is required.")
graph = graph_model.GraphModel.from_db(agent.AgentGraph)
graph = graph_model.GraphModel.from_db(agent.AgentGraph, sub_graphs=sub_graphs)
agent_updated_at = agent.AgentGraph.updatedAt
lib_agent_updated_at = agent.updatedAt
@@ -123,7 +126,9 @@ class LibraryAgent(pydantic.BaseModel):
name=graph.name,
description=graph.description,
input_schema=graph.input_schema,
credentials_input_schema=graph.credentials_input_schema,
credentials_input_schema=(
graph.credentials_input_schema if sub_graphs else None
),
has_external_trigger=graph.has_webhook_trigger,
trigger_setup_info=(
LibraryAgentTriggerInfo(

View File

@@ -0,0 +1,11 @@
-- CreateTable
CREATE TABLE "AgentNodeExecutionKeyValueData" (
"userId" TEXT NOT NULL,
"key" TEXT NOT NULL,
"agentNodeExecutionId" TEXT NOT NULL,
"data" JSONB,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" TIMESTAMP(3),
CONSTRAINT "AgentNodeExecutionKeyValueData_pkey" PRIMARY KEY ("userId","key")
);

View File

@@ -413,6 +413,16 @@ model AgentNodeExecutionInputOutput {
@@index([name, time])
}
model AgentNodeExecutionKeyValueData {
userId String
key String
agentNodeExecutionId String
data Json?
createdAt DateTime @default(now())
updatedAt DateTime? @updatedAt
@@id([userId, key])
}
// Webhook that is registered with a provider and propagates to one or more nodes
model IntegrationWebhook {
id String @id @default(uuid())
@@ -432,8 +442,8 @@ model IntegrationWebhook {
providerWebhookId String // Webhook ID assigned by the provider
AgentNodes AgentNode[]
AgentPresets AgentPreset[]
AgentNodes AgentNode[]
AgentPresets AgentPreset[]
@@index([userId])
}

View File

@@ -17,6 +17,7 @@ export const agentRunStatusMap: Record<
GraphExecutionMeta["status"],
AgentRunStatus
> = {
INCOMPLETE: "draft",
COMPLETED: "success",
FAILED: "failed",
QUEUED: "queued",

View File

@@ -34,7 +34,7 @@ export default function AgentStatusChip({
variant="secondary"
className={`text-xs font-medium ${statusStyles[statusData[status]?.variant]} rounded-[45px] px-[9px] py-[3px]`}
>
{statusData[status].label}
{statusData[status]?.label}
</Badge>
);
}

View File

@@ -1,22 +1,5 @@
import { FC, useEffect, useMemo, useState } from "react";
import { cn } from "@/lib/utils";
import { Button } from "@/components/ui/button";
import SchemaTooltip from "@/components/SchemaTooltip";
import useCredentials from "@/hooks/useCredentials";
import { NotionLogoIcon } from "@radix-ui/react-icons";
import {
FaDiscord,
FaGithub,
FaTwitter,
FaGoogle,
FaMedium,
FaKey,
FaHubspot,
} from "react-icons/fa";
import {
BlockIOCredentialsSubSchema,
CredentialsMetaInput,
} from "@/lib/autogpt-server-api/types";
import { Button } from "@/components/ui/button";
import { IconKey, IconKeyPlus, IconUserPlus } from "@/components/ui/icons";
import {
Select,
@@ -26,12 +9,30 @@ import {
SelectTrigger,
SelectValue,
} from "@/components/ui/select";
import useCredentials from "@/hooks/useCredentials";
import { useBackendAPI } from "@/lib/autogpt-server-api/context";
import {
BlockIOCredentialsSubSchema,
CredentialsMetaInput,
CredentialsProviderName,
} from "@/lib/autogpt-server-api/types";
import { cn } from "@/lib/utils";
import { getHostFromUrl } from "@/lib/utils/url";
import { NotionLogoIcon } from "@radix-ui/react-icons";
import { FC, useEffect, useMemo, useState } from "react";
import {
FaDiscord,
FaGithub,
FaGoogle,
FaHubspot,
FaKey,
FaMedium,
FaTwitter,
} from "react-icons/fa";
import { APIKeyCredentialsModal } from "./api-key-credentials-modal";
import { UserPasswordCredentialsModal } from "./user-password-credentials-modal";
import { HostScopedCredentialsModal } from "./host-scoped-credentials-modal";
import { OAuth2FlowWaitingModal } from "./oauth2-flow-waiting-modal";
import { getHostFromUrl } from "@/lib/utils/url";
import { UserPasswordCredentialsModal } from "./user-password-credentials-modal";
const fallbackIcon = FaKey;

View File

@@ -2,7 +2,6 @@ import { getWebSocketToken } from "@/lib/supabase/actions";
import { getServerSupabase } from "@/lib/supabase/server/getServerSupabase";
import { createBrowserClient } from "@supabase/ssr";
import type { SupabaseClient } from "@supabase/supabase-js";
import { proxyApiRequest, proxyFileUpload } from "./proxy-action";
import type {
AddUserCreditsResponse,
AnalyticsDetails,
@@ -27,6 +26,7 @@ import type {
GraphID,
GraphMeta,
GraphUpdateable,
HostScopedCredentials,
LibraryAgent,
LibraryAgentID,
LibraryAgentPreset,
@@ -62,7 +62,6 @@ import type {
User,
UserOnboarding,
UserPasswordCredentials,
HostScopedCredentials,
UsersBalanceHistoryResponse,
} from "./types";
@@ -525,8 +524,6 @@ export default class BackendAPI {
}
uploadStoreSubmissionMedia(file: File): Promise<string> {
const formData = new FormData();
formData.append("file", file);
return this._uploadFile("/store/submissions/media", file);
}
@@ -817,8 +814,48 @@ export default class BackendAPI {
const formData = new FormData();
formData.append("file", file);
// Use proxy server action for secure file upload
return await proxyFileUpload(path, formData, this.baseUrl);
if (isClient) {
return this._makeClientFileUpload(path, formData);
} else {
return this._makeServerFileUpload(path, formData);
}
}
private async _makeClientFileUpload(
path: string,
formData: FormData,
): Promise<string> {
// Dynamic import is required even for client-only functions because helpers.ts
// has server-only imports (like getServerSupabase) at the top level. Static imports
// would bundle server-only code into the client bundle, causing runtime errors.
const { buildClientUrl, parseErrorResponse, handleFetchError } =
await import("./helpers");
const uploadUrl = buildClientUrl(path);
const response = await fetch(uploadUrl, {
method: "POST",
body: formData,
credentials: "include",
});
if (!response.ok) {
const errorData = await parseErrorResponse(response);
throw handleFetchError(response, errorData);
}
return await response.text();
}
private async _makeServerFileUpload(
path: string,
formData: FormData,
): Promise<string> {
const { makeAuthenticatedFileUpload, buildServerUrl } = await import(
"./helpers"
);
const url = buildServerUrl(path);
return await makeAuthenticatedFileUpload(url, formData);
}
private async _request(
@@ -830,13 +867,62 @@ export default class BackendAPI {
console.debug(`${method} ${path} payload:`, payload);
}
// Always use proxy server action to not expose any auth tokens to the browser
return await proxyApiRequest({
if (isClient) {
return this._makeClientRequest(method, path, payload);
} else {
return this._makeServerRequest(method, path, payload);
}
}
private async _makeClientRequest(
method: string,
path: string,
payload?: Record<string, any>,
) {
// Dynamic import is required even for client-only functions because helpers.ts
// has server-only imports (like getServerSupabase) at the top level. Static imports
// would bundle server-only code into the client bundle, causing runtime errors.
const {
buildClientUrl,
buildUrlWithQuery,
parseErrorResponse,
handleFetchError,
} = await import("./helpers");
const payloadAsQuery = ["GET", "DELETE"].includes(method);
let url = buildClientUrl(path);
if (payloadAsQuery && payload) {
url = buildUrlWithQuery(url, payload);
}
const response = await fetch(url, {
method,
path,
payload,
baseUrl: this.baseUrl,
headers: {
"Content-Type": "application/json",
},
body: !payloadAsQuery && payload ? JSON.stringify(payload) : undefined,
credentials: "include",
});
if (!response.ok) {
const errorData = await parseErrorResponse(response);
throw handleFetchError(response, errorData);
}
return await response.json();
}
private async _makeServerRequest(
method: string,
path: string,
payload?: Record<string, any>,
) {
const { makeAuthenticatedRequest, buildServerUrl } = await import(
"./helpers"
);
const url = buildServerUrl(path);
return await makeAuthenticatedRequest(method, url, payload);
}
////////////////////////////////////////

View File

@@ -29,6 +29,42 @@ export function buildRequestUrl(
return url;
}
export function buildClientUrl(path: string): string {
return `/api/proxy/api${path}`;
}
export function buildServerUrl(path: string): string {
const baseUrl =
process.env.NEXT_PUBLIC_AGPT_SERVER_URL || "http://localhost:8006/api";
return `${baseUrl}${path}`;
}
export function buildUrlWithQuery(
url: string,
payload?: Record<string, any>,
): string {
if (!payload) return url;
const queryParams = new URLSearchParams(payload);
return `${url}?${queryParams.toString()}`;
}
export function handleFetchError(response: Response, errorData: any): ApiError {
return new ApiError(
errorData?.error || "Request failed",
response.status,
errorData,
);
}
export async function parseErrorResponse(response: Response): Promise<any> {
try {
return await response.json();
} catch {
return { error: response.statusText };
}
}
export async function getServerAuthToken(): Promise<string> {
const supabase = await getServerSupabase();

View File

@@ -250,7 +250,13 @@ export type GraphExecutionMeta = {
graph_id: GraphID;
graph_version: number;
preset_id?: LibraryAgentPresetID;
status: "QUEUED" | "RUNNING" | "COMPLETED" | "TERMINATED" | "FAILED";
status:
| "QUEUED"
| "RUNNING"
| "COMPLETED"
| "TERMINATED"
| "FAILED"
| "INCOMPLETE";
started_at: Date;
ended_at: Date;
stats?: {