diff --git a/autogpt_platform/backend/backend/blocks/data_manipulation.py b/autogpt_platform/backend/backend/blocks/data_manipulation.py index a8f25ecb18..fe878acfa9 100644 --- a/autogpt_platform/backend/backend/blocks/data_manipulation.py +++ b/autogpt_platform/backend/backend/blocks/data_manipulation.py @@ -682,17 +682,219 @@ class ListIsEmptyBlock(Block): yield "is_empty", len(input_data.list) == 0 +# ============================================================================= +# List Concatenation Helpers +# ============================================================================= + + +def _validate_list_input(item: Any, index: int) -> str | None: + """Validate that an item is a list. Returns error message or None.""" + if item is None: + return None # None is acceptable, will be skipped + if not isinstance(item, list): + return ( + f"Invalid input at index {index}: expected a list, " + f"got {type(item).__name__}. " + f"All items in 'lists' must be lists (e.g., [[1, 2], [3, 4]])." + ) + return None + + +def _validate_all_lists(lists: List[Any]) -> str | None: + """Validate that all items in a sequence are lists. Returns first error or None.""" + for idx, item in enumerate(lists): + error = _validate_list_input(item, idx) + if error is not None and item is not None: + return error + return None + + +def _concatenate_lists_simple(lists: List[List[Any]]) -> List[Any]: + """Concatenate a sequence of lists into a single list, skipping None values.""" + result: List[Any] = [] + for lst in lists: + if lst is None: + continue + result.extend(lst) + return result + + +def _flatten_nested_list(nested: List[Any], max_depth: int = -1) -> List[Any]: + """ + Recursively flatten a nested list structure. + + Args: + nested: The list to flatten. + max_depth: Maximum recursion depth. -1 means unlimited. + + Returns: + A flat list with all nested elements extracted. + """ + result: List[Any] = [] + _flatten_recursive(nested, result, current_depth=0, max_depth=max_depth) + return result + + +_MAX_FLATTEN_DEPTH = 1000 + + +def _flatten_recursive( + items: List[Any], + result: List[Any], + current_depth: int, + max_depth: int, +) -> None: + """Internal recursive helper for flattening nested lists.""" + if current_depth > _MAX_FLATTEN_DEPTH: + raise RecursionError( + f"Flattening exceeded maximum depth of {_MAX_FLATTEN_DEPTH} levels. " + "Input may be too deeply nested." + ) + for item in items: + if isinstance(item, list) and (max_depth == -1 or current_depth < max_depth): + _flatten_recursive(item, result, current_depth + 1, max_depth) + else: + result.append(item) + + +def _deduplicate_list(items: List[Any]) -> List[Any]: + """ + Remove duplicate elements from a list, preserving order of first occurrences. + + Args: + items: The list to deduplicate. + + Returns: + A list with duplicates removed, maintaining original order. + """ + seen: set = set() + result: List[Any] = [] + for item in items: + item_id = _make_hashable(item) + if item_id not in seen: + seen.add(item_id) + result.append(item) + return result + + +def _make_hashable(item: Any): + """ + Create a hashable representation of any item for deduplication. + Converts unhashable types (dicts, lists) into deterministic tuple structures. + """ + if isinstance(item, dict): + return tuple( + sorted( + ((_make_hashable(k), _make_hashable(v)) for k, v in item.items()), + key=lambda x: (str(type(x[0])), str(x[0])), + ) + ) + if isinstance(item, (list, tuple)): + return tuple(_make_hashable(i) for i in item) + if isinstance(item, set): + return frozenset(_make_hashable(i) for i in item) + return item + + +def _filter_none_values(items: List[Any]) -> List[Any]: + """Remove None values from a list.""" + return [item for item in items if item is not None] + + +def _compute_nesting_depth( + items: Any, current: int = 0, max_depth: int = _MAX_FLATTEN_DEPTH +) -> int: + """ + Compute the maximum nesting depth of a list structure using iteration to avoid RecursionError. + + Uses a stack-based approach to handle deeply nested structures without hitting Python's + recursion limit (~1000 levels). + """ + if not isinstance(items, list): + return current + + # Stack contains tuples of (item, depth) + stack = [(items, current)] + max_observed_depth = current + + while stack: + item, depth = stack.pop() + + if depth > max_depth: + return depth + + if not isinstance(item, list): + max_observed_depth = max(max_observed_depth, depth) + continue + + if len(item) == 0: + max_observed_depth = max(max_observed_depth, depth + 1) + continue + + # Add all children to stack with incremented depth + for child in item: + stack.append((child, depth + 1)) + + return max_observed_depth + + +def _interleave_lists(lists: List[List[Any]]) -> List[Any]: + """ + Interleave elements from multiple lists in round-robin fashion. + Example: [[1,2,3], [a,b], [x,y,z]] -> [1, a, x, 2, b, y, 3, z] + """ + if not lists: + return [] + filtered = [lst for lst in lists if lst is not None] + if not filtered: + return [] + result: List[Any] = [] + max_len = max(len(lst) for lst in filtered) + for i in range(max_len): + for lst in filtered: + if i < len(lst): + result.append(lst[i]) + return result + + +# ============================================================================= +# List Concatenation Blocks +# ============================================================================= + + class ConcatenateListsBlock(Block): + """ + Concatenates two or more lists into a single list. + + This block accepts a list of lists and combines all their elements + in order into one flat output list. It supports options for + deduplication and None-filtering to provide flexible list merging + capabilities for workflow pipelines. + """ + class Input(BlockSchemaInput): lists: List[List[Any]] = SchemaField( description="A list of lists to concatenate together. All lists will be combined in order into a single list.", placeholder="e.g., [[1, 2], [3, 4], [5, 6]]", ) + deduplicate: bool = SchemaField( + description="If True, remove duplicate elements from the concatenated result while preserving order.", + default=False, + advanced=True, + ) + remove_none: bool = SchemaField( + description="If True, remove None values from the concatenated result.", + default=False, + advanced=True, + ) class Output(BlockSchemaOutput): concatenated_list: List[Any] = SchemaField( description="The concatenated list containing all elements from all input lists in order." ) + length: int = SchemaField( + description="The total number of elements in the concatenated list." + ) error: str = SchemaField( description="Error message if concatenation failed due to invalid input types." ) @@ -700,7 +902,7 @@ class ConcatenateListsBlock(Block): def __init__(self): super().__init__( id="3cf9298b-5817-4141-9d80-7c2cc5199c8e", - description="Concatenates multiple lists into a single list. All elements from all input lists are combined in order.", + description="Concatenates multiple lists into a single list. All elements from all input lists are combined in order. Supports optional deduplication and None removal.", categories={BlockCategory.BASIC}, input_schema=ConcatenateListsBlock.Input, output_schema=ConcatenateListsBlock.Output, @@ -709,29 +911,497 @@ class ConcatenateListsBlock(Block): {"lists": [["a", "b"], ["c"], ["d", "e", "f"]]}, {"lists": [[1, 2], []]}, {"lists": []}, + {"lists": [[1, 2, 2, 3], [3, 4]], "deduplicate": True}, + {"lists": [[1, None, 2], [None, 3]], "remove_none": True}, ], test_output=[ ("concatenated_list", [1, 2, 3, 4, 5, 6]), + ("length", 6), ("concatenated_list", ["a", "b", "c", "d", "e", "f"]), + ("length", 6), ("concatenated_list", [1, 2]), + ("length", 2), ("concatenated_list", []), + ("length", 0), + ("concatenated_list", [1, 2, 3, 4]), + ("length", 4), + ("concatenated_list", [1, 2, 3]), + ("length", 3), ], ) + def _validate_inputs(self, lists: List[Any]) -> str | None: + return _validate_all_lists(lists) + + def _perform_concatenation(self, lists: List[List[Any]]) -> List[Any]: + return _concatenate_lists_simple(lists) + + def _apply_deduplication(self, items: List[Any]) -> List[Any]: + return _deduplicate_list(items) + + def _apply_none_removal(self, items: List[Any]) -> List[Any]: + return _filter_none_values(items) + + def _post_process( + self, items: List[Any], deduplicate: bool, remove_none: bool + ) -> List[Any]: + """Apply all post-processing steps to the concatenated result.""" + result = items + if remove_none: + result = self._apply_none_removal(result) + if deduplicate: + result = self._apply_deduplication(result) + return result + async def run(self, input_data: Input, **kwargs) -> BlockOutput: - concatenated = [] - for idx, lst in enumerate(input_data.lists): - if lst is None: - # Skip None values to avoid errors - continue - if not isinstance(lst, list): - # Type validation: each item must be a list - # Strings are iterable and would cause extend() to iterate character-by-character - # Non-iterable types would raise TypeError - yield "error", ( - f"Invalid input at index {idx}: expected a list, got {type(lst).__name__}. " - f"All items in 'lists' must be lists (e.g., [[1, 2], [3, 4]])." - ) - return - concatenated.extend(lst) - yield "concatenated_list", concatenated + # Validate all inputs are lists + validation_error = self._validate_inputs(input_data.lists) + if validation_error is not None: + yield "error", validation_error + return + + # Perform concatenation + concatenated = self._perform_concatenation(input_data.lists) + + # Apply post-processing + result = self._post_process( + concatenated, input_data.deduplicate, input_data.remove_none + ) + + yield "concatenated_list", result + yield "length", len(result) + + +class FlattenListBlock(Block): + """ + Flattens a nested list structure into a single flat list. + + This block takes a list that may contain nested lists at any depth + and produces a single-level list with all leaf elements. Useful + for normalizing data structures from multiple sources that may + have varying levels of nesting. + """ + + class Input(BlockSchemaInput): + nested_list: List[Any] = SchemaField( + description="A potentially nested list to flatten into a single-level list.", + placeholder="e.g., [[1, [2, 3]], [4, [5, [6]]]]", + ) + max_depth: int = SchemaField( + description="Maximum depth to flatten. -1 means flatten completely. 1 means flatten only one level.", + default=-1, + advanced=True, + ) + + class Output(BlockSchemaOutput): + flattened_list: List[Any] = SchemaField( + description="The flattened list with all nested elements extracted." + ) + length: int = SchemaField( + description="The number of elements in the flattened list." + ) + original_depth: int = SchemaField( + description="The maximum nesting depth of the original input list." + ) + error: str = SchemaField(description="Error message if flattening failed.") + + def __init__(self): + super().__init__( + id="cc45bb0f-d035-4756-96a7-fe3e36254b4d", + description="Flattens a nested list structure into a single flat list. Supports configurable maximum flattening depth.", + categories={BlockCategory.BASIC}, + input_schema=FlattenListBlock.Input, + output_schema=FlattenListBlock.Output, + test_input=[ + {"nested_list": [[1, 2], [3, [4, 5]]]}, + {"nested_list": [1, [2, [3, [4]]]]}, + {"nested_list": [1, [2, [3, [4]]], 5], "max_depth": 1}, + {"nested_list": []}, + {"nested_list": [1, 2, 3]}, + ], + test_output=[ + ("flattened_list", [1, 2, 3, 4, 5]), + ("length", 5), + ("original_depth", 3), + ("flattened_list", [1, 2, 3, 4]), + ("length", 4), + ("original_depth", 4), + ("flattened_list", [1, 2, [3, [4]], 5]), + ("length", 4), + ("original_depth", 4), + ("flattened_list", []), + ("length", 0), + ("original_depth", 1), + ("flattened_list", [1, 2, 3]), + ("length", 3), + ("original_depth", 1), + ], + ) + + def _compute_depth(self, items: List[Any]) -> int: + """Compute the nesting depth of the input list.""" + return _compute_nesting_depth(items) + + def _flatten(self, items: List[Any], max_depth: int) -> List[Any]: + """Flatten the list to the specified depth.""" + return _flatten_nested_list(items, max_depth=max_depth) + + def _validate_max_depth(self, max_depth: int) -> str | None: + """Validate the max_depth parameter.""" + if max_depth < -1: + return f"max_depth must be -1 (unlimited) or a non-negative integer, got {max_depth}" + return None + + async def run(self, input_data: Input, **kwargs) -> BlockOutput: + # Validate max_depth + depth_error = self._validate_max_depth(input_data.max_depth) + if depth_error is not None: + yield "error", depth_error + return + + original_depth = self._compute_depth(input_data.nested_list) + flattened = self._flatten(input_data.nested_list, input_data.max_depth) + + yield "flattened_list", flattened + yield "length", len(flattened) + yield "original_depth", original_depth + + +class InterleaveListsBlock(Block): + """ + Interleaves elements from multiple lists in round-robin fashion. + + Given multiple input lists, this block takes one element from each + list in turn, producing an output where elements alternate between + sources. Lists of different lengths are handled gracefully - shorter + lists simply stop contributing once exhausted. + """ + + class Input(BlockSchemaInput): + lists: List[List[Any]] = SchemaField( + description="A list of lists to interleave. Elements will be taken in round-robin order.", + placeholder="e.g., [[1, 2, 3], ['a', 'b', 'c']]", + ) + + class Output(BlockSchemaOutput): + interleaved_list: List[Any] = SchemaField( + description="The interleaved list with elements alternating from each input list." + ) + length: int = SchemaField( + description="The total number of elements in the interleaved list." + ) + error: str = SchemaField(description="Error message if interleaving failed.") + + def __init__(self): + super().__init__( + id="9f616084-1d9f-4f8e-bc00-5b9d2a75cd75", + description="Interleaves elements from multiple lists in round-robin fashion, alternating between sources.", + categories={BlockCategory.BASIC}, + input_schema=InterleaveListsBlock.Input, + output_schema=InterleaveListsBlock.Output, + test_input=[ + {"lists": [[1, 2, 3], ["a", "b", "c"]]}, + {"lists": [[1, 2, 3], ["a", "b"], ["x", "y", "z"]]}, + {"lists": [[1], [2], [3]]}, + {"lists": []}, + ], + test_output=[ + ("interleaved_list", [1, "a", 2, "b", 3, "c"]), + ("length", 6), + ("interleaved_list", [1, "a", "x", 2, "b", "y", 3, "z"]), + ("length", 8), + ("interleaved_list", [1, 2, 3]), + ("length", 3), + ("interleaved_list", []), + ("length", 0), + ], + ) + + def _validate_inputs(self, lists: List[Any]) -> str | None: + return _validate_all_lists(lists) + + def _interleave(self, lists: List[List[Any]]) -> List[Any]: + return _interleave_lists(lists) + + async def run(self, input_data: Input, **kwargs) -> BlockOutput: + validation_error = self._validate_inputs(input_data.lists) + if validation_error is not None: + yield "error", validation_error + return + + result = self._interleave(input_data.lists) + yield "interleaved_list", result + yield "length", len(result) + + +class ZipListsBlock(Block): + """ + Zips multiple lists together into a list of grouped tuples/lists. + + Takes two or more input lists and combines corresponding elements + into sub-lists. For example, zipping [1,2,3] and ['a','b','c'] + produces [[1,'a'], [2,'b'], [3,'c']]. Supports both truncating + to shortest list and padding to longest list with a fill value. + """ + + class Input(BlockSchemaInput): + lists: List[List[Any]] = SchemaField( + description="A list of lists to zip together. Corresponding elements will be grouped.", + placeholder="e.g., [[1, 2, 3], ['a', 'b', 'c']]", + ) + pad_to_longest: bool = SchemaField( + description="If True, pad shorter lists with fill_value to match the longest list. If False, truncate to shortest.", + default=False, + advanced=True, + ) + fill_value: Any = SchemaField( + description="Value to use for padding when pad_to_longest is True.", + default=None, + advanced=True, + ) + + class Output(BlockSchemaOutput): + zipped_list: List[List[Any]] = SchemaField( + description="The zipped list of grouped elements." + ) + length: int = SchemaField( + description="The number of groups in the zipped result." + ) + error: str = SchemaField(description="Error message if zipping failed.") + + def __init__(self): + super().__init__( + id="0d0e684f-5cb9-4c4b-b8d1-47a0860e0c07", + description="Zips multiple lists together into a list of grouped elements. Supports padding to longest or truncating to shortest.", + categories={BlockCategory.BASIC}, + input_schema=ZipListsBlock.Input, + output_schema=ZipListsBlock.Output, + test_input=[ + {"lists": [[1, 2, 3], ["a", "b", "c"]]}, + {"lists": [[1, 2, 3], ["a", "b"]]}, + { + "lists": [[1, 2], ["a", "b", "c"]], + "pad_to_longest": True, + "fill_value": 0, + }, + {"lists": []}, + ], + test_output=[ + ("zipped_list", [[1, "a"], [2, "b"], [3, "c"]]), + ("length", 3), + ("zipped_list", [[1, "a"], [2, "b"]]), + ("length", 2), + ("zipped_list", [[1, "a"], [2, "b"], [0, "c"]]), + ("length", 3), + ("zipped_list", []), + ("length", 0), + ], + ) + + def _validate_inputs(self, lists: List[Any]) -> str | None: + return _validate_all_lists(lists) + + def _zip_truncate(self, lists: List[List[Any]]) -> List[List[Any]]: + """Zip lists, truncating to shortest.""" + filtered = [lst for lst in lists if lst is not None] + if not filtered: + return [] + return [list(group) for group in zip(*filtered)] + + def _zip_pad(self, lists: List[List[Any]], fill_value: Any) -> List[List[Any]]: + """Zip lists, padding shorter ones with fill_value.""" + if not lists: + return [] + lists = [lst for lst in lists if lst is not None] + if not lists: + return [] + max_len = max(len(lst) for lst in lists) + result: List[List[Any]] = [] + for i in range(max_len): + group: List[Any] = [] + for lst in lists: + if i < len(lst): + group.append(lst[i]) + else: + group.append(fill_value) + result.append(group) + return result + + async def run(self, input_data: Input, **kwargs) -> BlockOutput: + validation_error = self._validate_inputs(input_data.lists) + if validation_error is not None: + yield "error", validation_error + return + + if not input_data.lists: + yield "zipped_list", [] + yield "length", 0 + return + + if input_data.pad_to_longest: + result = self._zip_pad(input_data.lists, input_data.fill_value) + else: + result = self._zip_truncate(input_data.lists) + + yield "zipped_list", result + yield "length", len(result) + + +class ListDifferenceBlock(Block): + """ + Computes the difference between two lists (elements in the first + list that are not in the second list). + + This is useful for finding items that exist in one dataset but + not in another, such as finding new items, missing items, or + items that need to be processed. + """ + + class Input(BlockSchemaInput): + list_a: List[Any] = SchemaField( + description="The primary list to check elements from.", + placeholder="e.g., [1, 2, 3, 4, 5]", + ) + list_b: List[Any] = SchemaField( + description="The list to subtract. Elements found here will be removed from list_a.", + placeholder="e.g., [3, 4, 5, 6]", + ) + symmetric: bool = SchemaField( + description="If True, compute symmetric difference (elements in either list but not both).", + default=False, + advanced=True, + ) + + class Output(BlockSchemaOutput): + difference: List[Any] = SchemaField( + description="Elements from list_a not found in list_b (or symmetric difference if enabled)." + ) + length: int = SchemaField( + description="The number of elements in the difference result." + ) + error: str = SchemaField(description="Error message if the operation failed.") + + def __init__(self): + super().__init__( + id="05309873-9d61-447e-96b5-b804e2511829", + description="Computes the difference between two lists. Returns elements in the first list not found in the second, or symmetric difference.", + categories={BlockCategory.BASIC}, + input_schema=ListDifferenceBlock.Input, + output_schema=ListDifferenceBlock.Output, + test_input=[ + {"list_a": [1, 2, 3, 4, 5], "list_b": [3, 4, 5, 6, 7]}, + { + "list_a": [1, 2, 3, 4, 5], + "list_b": [3, 4, 5, 6, 7], + "symmetric": True, + }, + {"list_a": ["a", "b", "c"], "list_b": ["b"]}, + {"list_a": [], "list_b": [1, 2, 3]}, + ], + test_output=[ + ("difference", [1, 2]), + ("length", 2), + ("difference", [1, 2, 6, 7]), + ("length", 4), + ("difference", ["a", "c"]), + ("length", 2), + ("difference", []), + ("length", 0), + ], + ) + + def _compute_difference(self, list_a: List[Any], list_b: List[Any]) -> List[Any]: + """Compute elements in list_a not in list_b.""" + b_hashes = {_make_hashable(item) for item in list_b} + return [item for item in list_a if _make_hashable(item) not in b_hashes] + + def _compute_symmetric_difference( + self, list_a: List[Any], list_b: List[Any] + ) -> List[Any]: + """Compute elements in either list but not both.""" + a_hashes = {_make_hashable(item) for item in list_a} + b_hashes = {_make_hashable(item) for item in list_b} + only_in_a = [item for item in list_a if _make_hashable(item) not in b_hashes] + only_in_b = [item for item in list_b if _make_hashable(item) not in a_hashes] + return only_in_a + only_in_b + + async def run(self, input_data: Input, **kwargs) -> BlockOutput: + if input_data.symmetric: + result = self._compute_symmetric_difference( + input_data.list_a, input_data.list_b + ) + else: + result = self._compute_difference(input_data.list_a, input_data.list_b) + + yield "difference", result + yield "length", len(result) + + +class ListIntersectionBlock(Block): + """ + Computes the intersection of two lists (elements present in both lists). + + This is useful for finding common items between two datasets, + such as shared tags, mutual connections, or overlapping categories. + """ + + class Input(BlockSchemaInput): + list_a: List[Any] = SchemaField( + description="The first list to intersect.", + placeholder="e.g., [1, 2, 3, 4, 5]", + ) + list_b: List[Any] = SchemaField( + description="The second list to intersect.", + placeholder="e.g., [3, 4, 5, 6, 7]", + ) + + class Output(BlockSchemaOutput): + intersection: List[Any] = SchemaField( + description="Elements present in both list_a and list_b." + ) + length: int = SchemaField( + description="The number of elements in the intersection." + ) + error: str = SchemaField(description="Error message if the operation failed.") + + def __init__(self): + super().__init__( + id="b6eb08b6-dbe3-411b-b9b4-2508cb311a1f", + description="Computes the intersection of two lists, returning only elements present in both.", + categories={BlockCategory.BASIC}, + input_schema=ListIntersectionBlock.Input, + output_schema=ListIntersectionBlock.Output, + test_input=[ + {"list_a": [1, 2, 3, 4, 5], "list_b": [3, 4, 5, 6, 7]}, + {"list_a": ["a", "b", "c"], "list_b": ["c", "d", "e"]}, + {"list_a": [1, 2], "list_b": [3, 4]}, + {"list_a": [], "list_b": [1, 2, 3]}, + ], + test_output=[ + ("intersection", [3, 4, 5]), + ("length", 3), + ("intersection", ["c"]), + ("length", 1), + ("intersection", []), + ("length", 0), + ("intersection", []), + ("length", 0), + ], + ) + + def _compute_intersection(self, list_a: List[Any], list_b: List[Any]) -> List[Any]: + """Compute elements present in both lists, preserving order from list_a.""" + b_hashes = {_make_hashable(item) for item in list_b} + seen: set = set() + result: List[Any] = [] + for item in list_a: + h = _make_hashable(item) + if h in b_hashes and h not in seen: + result.append(item) + seen.add(h) + return result + + async def run(self, input_data: Input, **kwargs) -> BlockOutput: + result = self._compute_intersection(input_data.list_a, input_data.list_b) + yield "intersection", result + yield "length", len(result) diff --git a/autogpt_platform/backend/test/blocks/test_list_concatenation.py b/autogpt_platform/backend/test/blocks/test_list_concatenation.py new file mode 100644 index 0000000000..8cea3b60f7 --- /dev/null +++ b/autogpt_platform/backend/test/blocks/test_list_concatenation.py @@ -0,0 +1,1276 @@ +""" +Comprehensive test suite for list concatenation and manipulation blocks. + +Tests cover: +- ConcatenateListsBlock: basic concatenation, deduplication, None removal +- FlattenListBlock: nested list flattening with depth control +- InterleaveListsBlock: round-robin interleaving of multiple lists +- ZipListsBlock: zipping lists with truncation and padding +- ListDifferenceBlock: computing list differences (regular and symmetric) +- ListIntersectionBlock: finding common elements between lists +- Helper utility functions: validation, flattening, deduplication, etc. +""" + +import pytest + +from backend.blocks.data_manipulation import ( + _MAX_FLATTEN_DEPTH, + ConcatenateListsBlock, + FlattenListBlock, + InterleaveListsBlock, + ListDifferenceBlock, + ListIntersectionBlock, + ZipListsBlock, + _compute_nesting_depth, + _concatenate_lists_simple, + _deduplicate_list, + _filter_none_values, + _flatten_nested_list, + _interleave_lists, + _make_hashable, + _validate_all_lists, + _validate_list_input, +) +from backend.util.test import execute_block_test + +# ============================================================================= +# Helper Function Tests +# ============================================================================= + + +class TestValidateListInput: + """Tests for the _validate_list_input helper.""" + + def test_valid_list_returns_none(self): + assert _validate_list_input([1, 2, 3], 0) is None + + def test_empty_list_returns_none(self): + assert _validate_list_input([], 0) is None + + def test_none_returns_none(self): + assert _validate_list_input(None, 0) is None + + def test_string_returns_error(self): + result = _validate_list_input("hello", 0) + assert result is not None + assert "str" in result + assert "index 0" in result + + def test_integer_returns_error(self): + result = _validate_list_input(42, 1) + assert result is not None + assert "int" in result + assert "index 1" in result + + def test_dict_returns_error(self): + result = _validate_list_input({"a": 1}, 2) + assert result is not None + assert "dict" in result + assert "index 2" in result + + def test_tuple_returns_error(self): + result = _validate_list_input((1, 2), 3) + assert result is not None + assert "tuple" in result + + def test_boolean_returns_error(self): + result = _validate_list_input(True, 0) + assert result is not None + assert "bool" in result + + def test_float_returns_error(self): + result = _validate_list_input(3.14, 0) + assert result is not None + assert "float" in result + + +class TestValidateAllLists: + """Tests for the _validate_all_lists helper.""" + + def test_all_valid_lists(self): + assert _validate_all_lists([[1], [2], [3]]) is None + + def test_empty_outer_list(self): + assert _validate_all_lists([]) is None + + def test_mixed_valid_and_none(self): + # None is skipped, so this should pass + assert _validate_all_lists([[1], None, [3]]) is None + + def test_invalid_item_returns_error(self): + result = _validate_all_lists([[1], "bad", [3]]) + assert result is not None + assert "index 1" in result + + def test_first_invalid_is_returned(self): + result = _validate_all_lists(["first_bad", "second_bad"]) + assert result is not None + assert "index 0" in result + + def test_all_none_passes(self): + assert _validate_all_lists([None, None, None]) is None + + +class TestConcatenateListsSimple: + """Tests for the _concatenate_lists_simple helper.""" + + def test_basic_concatenation(self): + assert _concatenate_lists_simple([[1, 2], [3, 4]]) == [1, 2, 3, 4] + + def test_empty_lists(self): + assert _concatenate_lists_simple([[], []]) == [] + + def test_single_list(self): + assert _concatenate_lists_simple([[1, 2, 3]]) == [1, 2, 3] + + def test_no_lists(self): + assert _concatenate_lists_simple([]) == [] + + def test_skip_none_values(self): + assert _concatenate_lists_simple([[1, 2], None, [3, 4]]) == [1, 2, 3, 4] # type: ignore[arg-type] + + def test_mixed_types(self): + result = _concatenate_lists_simple([[1, "a"], [True, 3.14]]) + assert result == [1, "a", True, 3.14] + + def test_nested_lists_preserved(self): + result = _concatenate_lists_simple([[[1, 2]], [[3, 4]]]) + assert result == [[1, 2], [3, 4]] + + def test_large_number_of_lists(self): + lists = [[i] for i in range(100)] + result = _concatenate_lists_simple(lists) + assert result == list(range(100)) + + +class TestFlattenNestedList: + """Tests for the _flatten_nested_list helper.""" + + def test_already_flat(self): + assert _flatten_nested_list([1, 2, 3]) == [1, 2, 3] + + def test_one_level_nesting(self): + assert _flatten_nested_list([[1, 2], [3, 4]]) == [1, 2, 3, 4] + + def test_deep_nesting(self): + assert _flatten_nested_list([1, [2, [3, [4, [5]]]]]) == [1, 2, 3, 4, 5] + + def test_empty_list(self): + assert _flatten_nested_list([]) == [] + + def test_mixed_nesting(self): + assert _flatten_nested_list([1, [2, 3], 4, [5, [6]]]) == [1, 2, 3, 4, 5, 6] + + def test_max_depth_zero(self): + # max_depth=0 means no flattening at all + result = _flatten_nested_list([[1, 2], [3, 4]], max_depth=0) + assert result == [[1, 2], [3, 4]] + + def test_max_depth_one(self): + result = _flatten_nested_list([[1, [2, 3]], [4, [5]]], max_depth=1) + assert result == [1, [2, 3], 4, [5]] + + def test_max_depth_two(self): + result = _flatten_nested_list([[[1, 2], [3]], [[4, [5]]]], max_depth=2) + assert result == [1, 2, 3, 4, [5]] + + def test_unlimited_depth(self): + deeply_nested = [[[[[[[1]]]]]]] + assert _flatten_nested_list(deeply_nested, max_depth=-1) == [1] + + def test_preserves_non_list_iterables(self): + result = _flatten_nested_list(["hello", [1, 2]]) + assert result == ["hello", 1, 2] + + def test_preserves_dicts(self): + result = _flatten_nested_list([{"a": 1}, [{"b": 2}]]) + assert result == [{"a": 1}, {"b": 2}] + + def test_excessive_depth_raises_recursion_error(self): + """Deeply nested lists beyond 1000 levels should raise RecursionError.""" + # Build a list nested 1100 levels deep + nested = [42] + for _ in range(1100): + nested = [nested] + with pytest.raises(RecursionError, match="maximum.*depth"): + _flatten_nested_list(nested, max_depth=-1) + + +class TestDeduplicateList: + """Tests for the _deduplicate_list helper.""" + + def test_no_duplicates(self): + assert _deduplicate_list([1, 2, 3]) == [1, 2, 3] + + def test_with_duplicates(self): + assert _deduplicate_list([1, 2, 2, 3, 3, 3]) == [1, 2, 3] + + def test_all_duplicates(self): + assert _deduplicate_list([1, 1, 1]) == [1] + + def test_empty_list(self): + assert _deduplicate_list([]) == [] + + def test_preserves_order(self): + result = _deduplicate_list([3, 1, 2, 1, 3]) + assert result == [3, 1, 2] + + def test_string_duplicates(self): + assert _deduplicate_list(["a", "b", "a", "c"]) == ["a", "b", "c"] + + def test_mixed_types(self): + result = _deduplicate_list([1, "1", 1, "1"]) + assert result == [1, "1"] + + def test_dict_duplicates(self): + result = _deduplicate_list([{"a": 1}, {"a": 1}, {"b": 2}]) + assert result == [{"a": 1}, {"b": 2}] + + def test_list_duplicates(self): + result = _deduplicate_list([[1, 2], [1, 2], [3, 4]]) + assert result == [[1, 2], [3, 4]] + + def test_none_duplicates(self): + result = _deduplicate_list([None, 1, None, 2]) + assert result == [None, 1, 2] + + def test_single_element(self): + assert _deduplicate_list([42]) == [42] + + +class TestMakeHashable: + """Tests for the _make_hashable helper.""" + + def test_integer(self): + assert _make_hashable(42) == 42 + + def test_string(self): + assert _make_hashable("hello") == "hello" + + def test_none(self): + assert _make_hashable(None) is None + + def test_dict_returns_tuple(self): + result = _make_hashable({"a": 1}) + assert isinstance(result, tuple) + # Should be hashable + hash(result) + + def test_list_returns_tuple(self): + result = _make_hashable([1, 2, 3]) + assert result == (1, 2, 3) + + def test_same_dict_same_hash(self): + assert _make_hashable({"a": 1, "b": 2}) == _make_hashable({"a": 1, "b": 2}) + + def test_different_dict_different_hash(self): + assert _make_hashable({"a": 1}) != _make_hashable({"a": 2}) + + def test_dict_key_order_independent(self): + """Dicts with same keys in different insertion order produce same result.""" + d1 = {"b": 2, "a": 1} + d2 = {"a": 1, "b": 2} + assert _make_hashable(d1) == _make_hashable(d2) + + def test_tuple_hashable(self): + result = _make_hashable((1, 2, 3)) + assert result == (1, 2, 3) + hash(result) + + def test_boolean(self): + result = _make_hashable(True) + assert result is True + + def test_float(self): + result = _make_hashable(3.14) + assert result == 3.14 + + +class TestFilterNoneValues: + """Tests for the _filter_none_values helper.""" + + def test_removes_none(self): + assert _filter_none_values([1, None, 2, None, 3]) == [1, 2, 3] + + def test_no_none(self): + assert _filter_none_values([1, 2, 3]) == [1, 2, 3] + + def test_all_none(self): + assert _filter_none_values([None, None, None]) == [] + + def test_empty_list(self): + assert _filter_none_values([]) == [] + + def test_preserves_falsy_values(self): + assert _filter_none_values([0, False, "", None, []]) == [0, False, "", []] + + +class TestComputeNestingDepth: + """Tests for the _compute_nesting_depth helper.""" + + def test_flat_list(self): + assert _compute_nesting_depth([1, 2, 3]) == 1 + + def test_one_level(self): + assert _compute_nesting_depth([[1, 2], [3, 4]]) == 2 + + def test_deep_nesting(self): + assert _compute_nesting_depth([[[[]]]]) == 4 + + def test_mixed_depth(self): + depth = _compute_nesting_depth([1, [2, [3]]]) + assert depth == 3 + + def test_empty_list(self): + assert _compute_nesting_depth([]) == 1 + + def test_non_list(self): + assert _compute_nesting_depth(42) == 0 + + def test_string_not_recursed(self): + # Strings should not be treated as nested lists + assert _compute_nesting_depth(["hello"]) == 1 + + +class TestInterleaveListsHelper: + """Tests for the _interleave_lists helper.""" + + def test_equal_length_lists(self): + result = _interleave_lists([[1, 2, 3], ["a", "b", "c"]]) + assert result == [1, "a", 2, "b", 3, "c"] + + def test_unequal_length_lists(self): + result = _interleave_lists([[1, 2, 3], ["a"]]) + assert result == [1, "a", 2, 3] + + def test_empty_input(self): + assert _interleave_lists([]) == [] + + def test_single_list(self): + assert _interleave_lists([[1, 2, 3]]) == [1, 2, 3] + + def test_three_lists(self): + result = _interleave_lists([[1], [2], [3]]) + assert result == [1, 2, 3] + + def test_with_none_list(self): + result = _interleave_lists([[1, 2], None, [3, 4]]) # type: ignore[arg-type] + assert result == [1, 3, 2, 4] + + def test_all_empty_lists(self): + assert _interleave_lists([[], [], []]) == [] + + def test_all_none_lists(self): + """All-None inputs should return empty list, not crash.""" + assert _interleave_lists([None, None, None]) == [] # type: ignore[arg-type] + + +class TestComputeNestingDepthEdgeCases: + """Tests for _compute_nesting_depth with deeply nested input.""" + + def test_deeply_nested_does_not_crash(self): + """Deeply nested lists beyond 1000 levels should not raise RecursionError.""" + nested = [42] + for _ in range(1100): + nested = [nested] + # Should return a depth value without crashing + depth = _compute_nesting_depth(nested) + assert depth >= _MAX_FLATTEN_DEPTH + + +class TestMakeHashableMixedKeys: + """Tests for _make_hashable with mixed-type dict keys.""" + + def test_mixed_type_dict_keys(self): + """Dicts with mixed-type keys (int and str) should not crash sorted().""" + d = {1: "one", "two": 2} + result = _make_hashable(d) + assert isinstance(result, tuple) + hash(result) # Should be hashable without error + + def test_mixed_type_keys_deterministic(self): + """Same dict with mixed keys produces same result.""" + d1 = {1: "a", "b": 2} + d2 = {1: "a", "b": 2} + assert _make_hashable(d1) == _make_hashable(d2) + + +class TestZipListsNoneHandling: + """Tests for ZipListsBlock with None values in input.""" + + def setup_method(self): + self.block = ZipListsBlock() + + def test_zip_truncate_with_none(self): + """_zip_truncate should handle None values in input lists.""" + result = self.block._zip_truncate([[1, 2], None, [3, 4]]) # type: ignore[arg-type] + assert result == [[1, 3], [2, 4]] + + def test_zip_pad_with_none(self): + """_zip_pad should handle None values in input lists.""" + result = self.block._zip_pad([[1, 2, 3], None, ["a"]], fill_value="X") # type: ignore[arg-type] + assert result == [[1, "a"], [2, "X"], [3, "X"]] + + def test_zip_truncate_all_none(self): + """All-None inputs should return empty list.""" + result = self.block._zip_truncate([None, None]) # type: ignore[arg-type] + assert result == [] + + def test_zip_pad_all_none(self): + """All-None inputs should return empty list.""" + result = self.block._zip_pad([None, None], fill_value=0) # type: ignore[arg-type] + assert result == [] + + +# ============================================================================= +# Block Built-in Tests (using test_input/test_output) +# ============================================================================= + + +class TestConcatenateListsBlockBuiltin: + """Run the built-in test_input/test_output tests for ConcatenateListsBlock.""" + + @pytest.mark.asyncio + async def test_builtin_tests(self): + block = ConcatenateListsBlock() + await execute_block_test(block) + + +class TestFlattenListBlockBuiltin: + """Run the built-in test_input/test_output tests for FlattenListBlock.""" + + @pytest.mark.asyncio + async def test_builtin_tests(self): + block = FlattenListBlock() + await execute_block_test(block) + + +class TestInterleaveListsBlockBuiltin: + """Run the built-in test_input/test_output tests for InterleaveListsBlock.""" + + @pytest.mark.asyncio + async def test_builtin_tests(self): + block = InterleaveListsBlock() + await execute_block_test(block) + + +class TestZipListsBlockBuiltin: + """Run the built-in test_input/test_output tests for ZipListsBlock.""" + + @pytest.mark.asyncio + async def test_builtin_tests(self): + block = ZipListsBlock() + await execute_block_test(block) + + +class TestListDifferenceBlockBuiltin: + """Run the built-in test_input/test_output tests for ListDifferenceBlock.""" + + @pytest.mark.asyncio + async def test_builtin_tests(self): + block = ListDifferenceBlock() + await execute_block_test(block) + + +class TestListIntersectionBlockBuiltin: + """Run the built-in test_input/test_output tests for ListIntersectionBlock.""" + + @pytest.mark.asyncio + async def test_builtin_tests(self): + block = ListIntersectionBlock() + await execute_block_test(block) + + +# ============================================================================= +# ConcatenateListsBlock Manual Tests +# ============================================================================= + + +class TestConcatenateListsBlockManual: + """Manual test cases for ConcatenateListsBlock edge cases.""" + + def setup_method(self): + self.block = ConcatenateListsBlock() + + @pytest.mark.asyncio + async def test_two_lists(self): + """Test basic two-list concatenation.""" + results = {} + async for name, value in self.block.run( + ConcatenateListsBlock.Input(lists=[[1, 2], [3, 4]]) + ): + results[name] = value + assert results["concatenated_list"] == [1, 2, 3, 4] + assert results["length"] == 4 + + @pytest.mark.asyncio + async def test_three_lists(self): + """Test three-list concatenation.""" + results = {} + async for name, value in self.block.run( + ConcatenateListsBlock.Input(lists=[[1], [2], [3]]) + ): + results[name] = value + assert results["concatenated_list"] == [1, 2, 3] + + @pytest.mark.asyncio + async def test_five_lists(self): + """Test concatenation of five lists.""" + results = {} + async for name, value in self.block.run( + ConcatenateListsBlock.Input(lists=[[1], [2], [3], [4], [5]]) + ): + results[name] = value + assert results["concatenated_list"] == [1, 2, 3, 4, 5] + assert results["length"] == 5 + + @pytest.mark.asyncio + async def test_empty_lists_only(self): + """Test concatenation of only empty lists.""" + results = {} + async for name, value in self.block.run( + ConcatenateListsBlock.Input(lists=[[], [], []]) + ): + results[name] = value + assert results["concatenated_list"] == [] + assert results["length"] == 0 + + @pytest.mark.asyncio + async def test_mixed_types_in_lists(self): + """Test concatenation with mixed types.""" + results = {} + async for name, value in self.block.run( + ConcatenateListsBlock.Input( + lists=[[1, "a"], [True, 3.14], [None, {"key": "val"}]] + ) + ): + results[name] = value + assert results["concatenated_list"] == [ + 1, + "a", + True, + 3.14, + None, + {"key": "val"}, + ] + + @pytest.mark.asyncio + async def test_deduplication_enabled(self): + """Test deduplication removes duplicates.""" + results = {} + async for name, value in self.block.run( + ConcatenateListsBlock.Input( + lists=[[1, 2, 3], [2, 3, 4], [3, 4, 5]], + deduplicate=True, + ) + ): + results[name] = value + assert results["concatenated_list"] == [1, 2, 3, 4, 5] + + @pytest.mark.asyncio + async def test_deduplication_preserves_order(self): + """Test that deduplication preserves first-occurrence order.""" + results = {} + async for name, value in self.block.run( + ConcatenateListsBlock.Input( + lists=[[3, 1, 2], [2, 4, 1]], + deduplicate=True, + ) + ): + results[name] = value + assert results["concatenated_list"] == [3, 1, 2, 4] + + @pytest.mark.asyncio + async def test_remove_none_enabled(self): + """Test None removal from concatenated results.""" + results = {} + async for name, value in self.block.run( + ConcatenateListsBlock.Input( + lists=[[1, None], [None, 2], [3, None]], + remove_none=True, + ) + ): + results[name] = value + assert results["concatenated_list"] == [1, 2, 3] + + @pytest.mark.asyncio + async def test_dedup_and_remove_none_combined(self): + """Test both deduplication and None removal together.""" + results = {} + async for name, value in self.block.run( + ConcatenateListsBlock.Input( + lists=[[1, None, 2], [2, None, 3]], + deduplicate=True, + remove_none=True, + ) + ): + results[name] = value + assert results["concatenated_list"] == [1, 2, 3] + + @pytest.mark.asyncio + async def test_nested_lists_preserved(self): + """Test that nested lists are not flattened during concatenation.""" + results = {} + async for name, value in self.block.run( + ConcatenateListsBlock.Input(lists=[[[1, 2]], [[3, 4]]]) + ): + results[name] = value + assert results["concatenated_list"] == [[1, 2], [3, 4]] + + @pytest.mark.asyncio + async def test_large_lists(self): + """Test concatenation of large lists.""" + list_a = list(range(1000)) + list_b = list(range(1000, 2000)) + results = {} + async for name, value in self.block.run( + ConcatenateListsBlock.Input(lists=[list_a, list_b]) + ): + results[name] = value + assert results["concatenated_list"] == list(range(2000)) + assert results["length"] == 2000 + + @pytest.mark.asyncio + async def test_single_list_input(self): + """Test concatenation with a single list.""" + results = {} + async for name, value in self.block.run( + ConcatenateListsBlock.Input(lists=[[1, 2, 3]]) + ): + results[name] = value + assert results["concatenated_list"] == [1, 2, 3] + + @pytest.mark.asyncio + async def test_block_id_is_valid_uuid(self): + """Test that the block has a valid UUID4 ID.""" + import uuid + + parsed = uuid.UUID(self.block.id) + assert parsed.version == 4 + + @pytest.mark.asyncio + async def test_block_category(self): + """Test that the block has the correct category.""" + from backend.blocks._base import BlockCategory + + assert BlockCategory.BASIC in self.block.categories + + +# ============================================================================= +# FlattenListBlock Manual Tests +# ============================================================================= + + +class TestFlattenListBlockManual: + """Manual test cases for FlattenListBlock.""" + + def setup_method(self): + self.block = FlattenListBlock() + + @pytest.mark.asyncio + async def test_simple_flatten(self): + """Test flattening a simple nested list.""" + results = {} + async for name, value in self.block.run( + FlattenListBlock.Input(nested_list=[[1, 2], [3, 4]]) + ): + results[name] = value + assert results["flattened_list"] == [1, 2, 3, 4] + assert results["length"] == 4 + + @pytest.mark.asyncio + async def test_deeply_nested(self): + """Test flattening a deeply nested structure.""" + results = {} + async for name, value in self.block.run( + FlattenListBlock.Input(nested_list=[1, [2, [3, [4, [5]]]]]) + ): + results[name] = value + assert results["flattened_list"] == [1, 2, 3, 4, 5] + + @pytest.mark.asyncio + async def test_partial_flatten(self): + """Test flattening with max_depth=1.""" + results = {} + async for name, value in self.block.run( + FlattenListBlock.Input( + nested_list=[[1, [2, 3]], [4, [5]]], + max_depth=1, + ) + ): + results[name] = value + assert results["flattened_list"] == [1, [2, 3], 4, [5]] + + @pytest.mark.asyncio + async def test_already_flat_list(self): + """Test flattening an already flat list.""" + results = {} + async for name, value in self.block.run( + FlattenListBlock.Input(nested_list=[1, 2, 3, 4]) + ): + results[name] = value + assert results["flattened_list"] == [1, 2, 3, 4] + + @pytest.mark.asyncio + async def test_empty_nested_lists(self): + """Test flattening with empty nested lists.""" + results = {} + async for name, value in self.block.run( + FlattenListBlock.Input(nested_list=[[], [1], [], [2], []]) + ): + results[name] = value + assert results["flattened_list"] == [1, 2] + + @pytest.mark.asyncio + async def test_mixed_types_preserved(self): + """Test that non-list types are preserved during flattening.""" + results = {} + async for name, value in self.block.run( + FlattenListBlock.Input(nested_list=["hello", [1, {"a": 1}], [True]]) + ): + results[name] = value + assert results["flattened_list"] == ["hello", 1, {"a": 1}, True] + + @pytest.mark.asyncio + async def test_original_depth_reported(self): + """Test that original nesting depth is correctly reported.""" + results = {} + async for name, value in self.block.run( + FlattenListBlock.Input(nested_list=[1, [2, [3]]]) + ): + results[name] = value + assert results["original_depth"] == 3 + + @pytest.mark.asyncio + async def test_block_id_is_valid_uuid(self): + """Test that the block has a valid UUID4 ID.""" + import uuid + + parsed = uuid.UUID(self.block.id) + assert parsed.version == 4 + + +# ============================================================================= +# InterleaveListsBlock Manual Tests +# ============================================================================= + + +class TestInterleaveListsBlockManual: + """Manual test cases for InterleaveListsBlock.""" + + def setup_method(self): + self.block = InterleaveListsBlock() + + @pytest.mark.asyncio + async def test_equal_length_interleave(self): + """Test interleaving two equal-length lists.""" + results = {} + async for name, value in self.block.run( + InterleaveListsBlock.Input(lists=[[1, 2, 3], ["a", "b", "c"]]) + ): + results[name] = value + assert results["interleaved_list"] == [1, "a", 2, "b", 3, "c"] + + @pytest.mark.asyncio + async def test_unequal_length_interleave(self): + """Test interleaving lists of different lengths.""" + results = {} + async for name, value in self.block.run( + InterleaveListsBlock.Input(lists=[[1, 2, 3, 4], ["a", "b"]]) + ): + results[name] = value + assert results["interleaved_list"] == [1, "a", 2, "b", 3, 4] + + @pytest.mark.asyncio + async def test_three_lists_interleave(self): + """Test interleaving three lists.""" + results = {} + async for name, value in self.block.run( + InterleaveListsBlock.Input(lists=[[1, 2], ["a", "b"], ["x", "y"]]) + ): + results[name] = value + assert results["interleaved_list"] == [1, "a", "x", 2, "b", "y"] + + @pytest.mark.asyncio + async def test_single_element_lists(self): + """Test interleaving single-element lists.""" + results = {} + async for name, value in self.block.run( + InterleaveListsBlock.Input(lists=[[1], [2], [3], [4]]) + ): + results[name] = value + assert results["interleaved_list"] == [1, 2, 3, 4] + + @pytest.mark.asyncio + async def test_block_id_is_valid_uuid(self): + """Test that the block has a valid UUID4 ID.""" + import uuid + + parsed = uuid.UUID(self.block.id) + assert parsed.version == 4 + + +# ============================================================================= +# ZipListsBlock Manual Tests +# ============================================================================= + + +class TestZipListsBlockManual: + """Manual test cases for ZipListsBlock.""" + + def setup_method(self): + self.block = ZipListsBlock() + + @pytest.mark.asyncio + async def test_basic_zip(self): + """Test basic zipping of two lists.""" + results = {} + async for name, value in self.block.run( + ZipListsBlock.Input(lists=[[1, 2, 3], ["a", "b", "c"]]) + ): + results[name] = value + assert results["zipped_list"] == [[1, "a"], [2, "b"], [3, "c"]] + + @pytest.mark.asyncio + async def test_truncate_to_shortest(self): + """Test that default behavior truncates to shortest list.""" + results = {} + async for name, value in self.block.run( + ZipListsBlock.Input(lists=[[1, 2, 3], ["a", "b"]]) + ): + results[name] = value + assert results["zipped_list"] == [[1, "a"], [2, "b"]] + assert results["length"] == 2 + + @pytest.mark.asyncio + async def test_pad_to_longest(self): + """Test padding shorter lists with fill value.""" + results = {} + async for name, value in self.block.run( + ZipListsBlock.Input( + lists=[[1, 2, 3], ["a"]], + pad_to_longest=True, + fill_value="X", + ) + ): + results[name] = value + assert results["zipped_list"] == [[1, "a"], [2, "X"], [3, "X"]] + + @pytest.mark.asyncio + async def test_pad_with_none(self): + """Test padding with None (default fill value).""" + results = {} + async for name, value in self.block.run( + ZipListsBlock.Input( + lists=[[1, 2], ["a"]], + pad_to_longest=True, + ) + ): + results[name] = value + assert results["zipped_list"] == [[1, "a"], [2, None]] + + @pytest.mark.asyncio + async def test_three_lists_zip(self): + """Test zipping three lists.""" + results = {} + async for name, value in self.block.run( + ZipListsBlock.Input(lists=[[1, 2], ["a", "b"], [True, False]]) + ): + results[name] = value + assert results["zipped_list"] == [[1, "a", True], [2, "b", False]] + + @pytest.mark.asyncio + async def test_empty_lists_zip(self): + """Test zipping empty input.""" + results = {} + async for name, value in self.block.run(ZipListsBlock.Input(lists=[])): + results[name] = value + assert results["zipped_list"] == [] + assert results["length"] == 0 + + @pytest.mark.asyncio + async def test_block_id_is_valid_uuid(self): + """Test that the block has a valid UUID4 ID.""" + import uuid + + parsed = uuid.UUID(self.block.id) + assert parsed.version == 4 + + +# ============================================================================= +# ListDifferenceBlock Manual Tests +# ============================================================================= + + +class TestListDifferenceBlockManual: + """Manual test cases for ListDifferenceBlock.""" + + def setup_method(self): + self.block = ListDifferenceBlock() + + @pytest.mark.asyncio + async def test_basic_difference(self): + """Test basic set difference.""" + results = {} + async for name, value in self.block.run( + ListDifferenceBlock.Input( + list_a=[1, 2, 3, 4, 5], + list_b=[3, 4, 5, 6, 7], + ) + ): + results[name] = value + assert results["difference"] == [1, 2] + + @pytest.mark.asyncio + async def test_symmetric_difference(self): + """Test symmetric difference.""" + results = {} + async for name, value in self.block.run( + ListDifferenceBlock.Input( + list_a=[1, 2, 3], + list_b=[2, 3, 4], + symmetric=True, + ) + ): + results[name] = value + assert results["difference"] == [1, 4] + + @pytest.mark.asyncio + async def test_no_difference(self): + """Test when lists are identical.""" + results = {} + async for name, value in self.block.run( + ListDifferenceBlock.Input( + list_a=[1, 2, 3], + list_b=[1, 2, 3], + ) + ): + results[name] = value + assert results["difference"] == [] + assert results["length"] == 0 + + @pytest.mark.asyncio + async def test_complete_difference(self): + """Test when lists share no elements.""" + results = {} + async for name, value in self.block.run( + ListDifferenceBlock.Input( + list_a=[1, 2, 3], + list_b=[4, 5, 6], + ) + ): + results[name] = value + assert results["difference"] == [1, 2, 3] + + @pytest.mark.asyncio + async def test_empty_list_a(self): + """Test with empty list_a.""" + results = {} + async for name, value in self.block.run( + ListDifferenceBlock.Input(list_a=[], list_b=[1, 2, 3]) + ): + results[name] = value + assert results["difference"] == [] + + @pytest.mark.asyncio + async def test_empty_list_b(self): + """Test with empty list_b.""" + results = {} + async for name, value in self.block.run( + ListDifferenceBlock.Input(list_a=[1, 2, 3], list_b=[]) + ): + results[name] = value + assert results["difference"] == [1, 2, 3] + + @pytest.mark.asyncio + async def test_string_difference(self): + """Test difference with string elements.""" + results = {} + async for name, value in self.block.run( + ListDifferenceBlock.Input( + list_a=["apple", "banana", "cherry"], + list_b=["banana", "date"], + ) + ): + results[name] = value + assert results["difference"] == ["apple", "cherry"] + + @pytest.mark.asyncio + async def test_dict_difference(self): + """Test difference with dictionary elements.""" + results = {} + async for name, value in self.block.run( + ListDifferenceBlock.Input( + list_a=[{"a": 1}, {"b": 2}, {"c": 3}], + list_b=[{"b": 2}], + ) + ): + results[name] = value + assert results["difference"] == [{"a": 1}, {"c": 3}] + + @pytest.mark.asyncio + async def test_block_id_is_valid_uuid(self): + """Test that the block has a valid UUID4 ID.""" + import uuid + + parsed = uuid.UUID(self.block.id) + assert parsed.version == 4 + + +# ============================================================================= +# ListIntersectionBlock Manual Tests +# ============================================================================= + + +class TestListIntersectionBlockManual: + """Manual test cases for ListIntersectionBlock.""" + + def setup_method(self): + self.block = ListIntersectionBlock() + + @pytest.mark.asyncio + async def test_basic_intersection(self): + """Test basic intersection.""" + results = {} + async for name, value in self.block.run( + ListIntersectionBlock.Input( + list_a=[1, 2, 3, 4, 5], + list_b=[3, 4, 5, 6, 7], + ) + ): + results[name] = value + assert results["intersection"] == [3, 4, 5] + assert results["length"] == 3 + + @pytest.mark.asyncio + async def test_no_intersection(self): + """Test when lists share no elements.""" + results = {} + async for name, value in self.block.run( + ListIntersectionBlock.Input( + list_a=[1, 2, 3], + list_b=[4, 5, 6], + ) + ): + results[name] = value + assert results["intersection"] == [] + assert results["length"] == 0 + + @pytest.mark.asyncio + async def test_identical_lists(self): + """Test intersection of identical lists.""" + results = {} + async for name, value in self.block.run( + ListIntersectionBlock.Input( + list_a=[1, 2, 3], + list_b=[1, 2, 3], + ) + ): + results[name] = value + assert results["intersection"] == [1, 2, 3] + + @pytest.mark.asyncio + async def test_preserves_order_from_list_a(self): + """Test that intersection preserves order from list_a.""" + results = {} + async for name, value in self.block.run( + ListIntersectionBlock.Input( + list_a=[5, 3, 1], + list_b=[1, 3, 5], + ) + ): + results[name] = value + assert results["intersection"] == [5, 3, 1] + + @pytest.mark.asyncio + async def test_empty_list_a(self): + """Test with empty list_a.""" + results = {} + async for name, value in self.block.run( + ListIntersectionBlock.Input(list_a=[], list_b=[1, 2, 3]) + ): + results[name] = value + assert results["intersection"] == [] + + @pytest.mark.asyncio + async def test_empty_list_b(self): + """Test with empty list_b.""" + results = {} + async for name, value in self.block.run( + ListIntersectionBlock.Input(list_a=[1, 2, 3], list_b=[]) + ): + results[name] = value + assert results["intersection"] == [] + + @pytest.mark.asyncio + async def test_string_intersection(self): + """Test intersection with string elements.""" + results = {} + async for name, value in self.block.run( + ListIntersectionBlock.Input( + list_a=["apple", "banana", "cherry"], + list_b=["banana", "cherry", "date"], + ) + ): + results[name] = value + assert results["intersection"] == ["banana", "cherry"] + + @pytest.mark.asyncio + async def test_deduplication_in_intersection(self): + """Test that duplicates in input don't cause duplicate results.""" + results = {} + async for name, value in self.block.run( + ListIntersectionBlock.Input( + list_a=[1, 1, 2, 2, 3], + list_b=[1, 2], + ) + ): + results[name] = value + assert results["intersection"] == [1, 2] + + @pytest.mark.asyncio + async def test_block_id_is_valid_uuid(self): + """Test that the block has a valid UUID4 ID.""" + import uuid + + parsed = uuid.UUID(self.block.id) + assert parsed.version == 4 + + +# ============================================================================= +# Block Method Tests +# ============================================================================= + + +class TestConcatenateListsBlockMethods: + """Tests for internal methods of ConcatenateListsBlock.""" + + def setup_method(self): + self.block = ConcatenateListsBlock() + + def test_validate_inputs_valid(self): + assert self.block._validate_inputs([[1], [2]]) is None + + def test_validate_inputs_invalid(self): + result = self.block._validate_inputs([[1], "bad"]) + assert result is not None + + def test_perform_concatenation(self): + result = self.block._perform_concatenation([[1, 2], [3, 4]]) + assert result == [1, 2, 3, 4] + + def test_apply_deduplication(self): + result = self.block._apply_deduplication([1, 2, 2, 3]) + assert result == [1, 2, 3] + + def test_apply_none_removal(self): + result = self.block._apply_none_removal([1, None, 2]) + assert result == [1, 2] + + def test_post_process_all_options(self): + result = self.block._post_process( + [1, None, 2, None, 2], deduplicate=True, remove_none=True + ) + assert result == [1, 2] + + def test_post_process_no_options(self): + result = self.block._post_process( + [1, None, 2, None, 2], deduplicate=False, remove_none=False + ) + assert result == [1, None, 2, None, 2] + + +class TestFlattenListBlockMethods: + """Tests for internal methods of FlattenListBlock.""" + + def setup_method(self): + self.block = FlattenListBlock() + + def test_compute_depth_flat(self): + assert self.block._compute_depth([1, 2, 3]) == 1 + + def test_compute_depth_nested(self): + assert self.block._compute_depth([[1, [2]]]) == 3 + + def test_flatten_unlimited(self): + result = self.block._flatten([1, [2, [3]]], max_depth=-1) + assert result == [1, 2, 3] + + def test_flatten_limited(self): + result = self.block._flatten([1, [2, [3]]], max_depth=1) + assert result == [1, 2, [3]] + + def test_validate_max_depth_valid(self): + assert self.block._validate_max_depth(-1) is None + assert self.block._validate_max_depth(0) is None + assert self.block._validate_max_depth(5) is None + + def test_validate_max_depth_invalid(self): + result = self.block._validate_max_depth(-2) + assert result is not None + + +class TestZipListsBlockMethods: + """Tests for internal methods of ZipListsBlock.""" + + def setup_method(self): + self.block = ZipListsBlock() + + def test_zip_truncate(self): + result = self.block._zip_truncate([[1, 2, 3], ["a", "b"]]) + assert result == [[1, "a"], [2, "b"]] + + def test_zip_pad(self): + result = self.block._zip_pad([[1, 2, 3], ["a"]], fill_value="X") + assert result == [[1, "a"], [2, "X"], [3, "X"]] + + def test_zip_pad_empty(self): + result = self.block._zip_pad([], fill_value=None) + assert result == [] + + def test_validate_inputs(self): + assert self.block._validate_inputs([[1], [2]]) is None + result = self.block._validate_inputs([[1], "bad"]) + assert result is not None + + +class TestListDifferenceBlockMethods: + """Tests for internal methods of ListDifferenceBlock.""" + + def setup_method(self): + self.block = ListDifferenceBlock() + + def test_compute_difference(self): + result = self.block._compute_difference([1, 2, 3], [2, 3, 4]) + assert result == [1] + + def test_compute_symmetric_difference(self): + result = self.block._compute_symmetric_difference([1, 2, 3], [2, 3, 4]) + assert result == [1, 4] + + def test_compute_difference_empty(self): + result = self.block._compute_difference([], [1, 2]) + assert result == [] + + def test_compute_symmetric_difference_identical(self): + result = self.block._compute_symmetric_difference([1, 2], [1, 2]) + assert result == [] + + +class TestListIntersectionBlockMethods: + """Tests for internal methods of ListIntersectionBlock.""" + + def setup_method(self): + self.block = ListIntersectionBlock() + + def test_compute_intersection(self): + result = self.block._compute_intersection([1, 2, 3], [2, 3, 4]) + assert result == [2, 3] + + def test_compute_intersection_empty(self): + result = self.block._compute_intersection([], [1, 2]) + assert result == [] + + def test_compute_intersection_no_overlap(self): + result = self.block._compute_intersection([1, 2], [3, 4]) + assert result == [] diff --git a/docs/integrations/README.md b/docs/integrations/README.md index c216aa4836..00d4b0c73a 100644 --- a/docs/integrations/README.md +++ b/docs/integrations/README.md @@ -56,12 +56,16 @@ Below is a comprehensive list of all available blocks, categorized by their prim | [File Store](block-integrations/basic.md#file-store) | Downloads and stores a file from a URL, data URI, or local path | | [Find In Dictionary](block-integrations/basic.md#find-in-dictionary) | A block that looks up a value in a dictionary, list, or object by key or index and returns the corresponding value | | [Find In List](block-integrations/basic.md#find-in-list) | Finds the index of the value in the list | +| [Flatten List](block-integrations/basic.md#flatten-list) | Flattens a nested list structure into a single flat list | | [Get All Memories](block-integrations/basic.md#get-all-memories) | Retrieve all memories from Mem0 with optional conversation filtering | | [Get Latest Memory](block-integrations/basic.md#get-latest-memory) | Retrieve the latest memory from Mem0 with optional key filtering | | [Get List Item](block-integrations/basic.md#get-list-item) | Returns the element at the given index | | [Get Store Agent Details](block-integrations/system/store_operations.md#get-store-agent-details) | Get detailed information about an agent from the store | | [Get Weather Information](block-integrations/basic.md#get-weather-information) | Retrieves weather information for a specified location using OpenWeatherMap API | | [Human In The Loop](block-integrations/basic.md#human-in-the-loop) | Pause execution for human review | +| [Interleave Lists](block-integrations/basic.md#interleave-lists) | Interleaves elements from multiple lists in round-robin fashion, alternating between sources | +| [List Difference](block-integrations/basic.md#list-difference) | Computes the difference between two lists | +| [List Intersection](block-integrations/basic.md#list-intersection) | Computes the intersection of two lists, returning only elements present in both | | [List Is Empty](block-integrations/basic.md#list-is-empty) | Checks if a list is empty | | [List Library Agents](block-integrations/system/library_operations.md#list-library-agents) | List all agents in your personal library | | [Note](block-integrations/basic.md#note) | A visual annotation block that displays a sticky note in the workflow editor for documentation and organization purposes | @@ -84,6 +88,7 @@ Below is a comprehensive list of all available blocks, categorized by their prim | [Store Value](block-integrations/basic.md#store-value) | A basic block that stores and forwards a value throughout workflows, allowing it to be reused without changes across multiple blocks | | [Universal Type Converter](block-integrations/basic.md#universal-type-converter) | This block is used to convert a value to a universal type | | [XML Parser](block-integrations/basic.md#xml-parser) | Parses XML using gravitasml to tokenize and coverts it to dict | +| [Zip Lists](block-integrations/basic.md#zip-lists) | Zips multiple lists together into a list of grouped elements | ## Data Processing diff --git a/docs/integrations/block-integrations/basic.md b/docs/integrations/block-integrations/basic.md index 08def38ede..e032690edc 100644 --- a/docs/integrations/block-integrations/basic.md +++ b/docs/integrations/block-integrations/basic.md @@ -637,7 +637,7 @@ This enables extensibility by allowing custom blocks to be added without modifyi ## Concatenate Lists ### What it is -Concatenates multiple lists into a single list. All elements from all input lists are combined in order. +Concatenates multiple lists into a single list. All elements from all input lists are combined in order. Supports optional deduplication and None removal. ### How it works @@ -651,6 +651,8 @@ The block includes validation to ensure each item is actually a list. If a non-l | Input | Description | Type | Required | |-------|-------------|------|----------| | lists | A list of lists to concatenate together. All lists will be combined in order into a single list. | List[List[Any]] | Yes | +| deduplicate | If True, remove duplicate elements from the concatenated result while preserving order. | bool | No | +| remove_none | If True, remove None values from the concatenated result. | bool | No | ### Outputs @@ -658,6 +660,7 @@ The block includes validation to ensure each item is actually a list. If a non-l |--------|-------------|------| | error | Error message if concatenation failed due to invalid input types. | str | | concatenated_list | The concatenated list containing all elements from all input lists in order. | List[Any] | +| length | The total number of elements in the concatenated list. | int | ### Possible use case @@ -820,6 +823,45 @@ This enables conditional logic based on list membership and helps locate items f --- +## Flatten List + +### What it is +Flattens a nested list structure into a single flat list. Supports configurable maximum flattening depth. + +### How it works + +This block recursively traverses a nested list and extracts all leaf elements into a single flat list. You can control how deep the flattening goes with the max_depth parameter: set it to -1 to flatten completely, or to a positive integer to flatten only that many levels. + +The block also reports the original nesting depth of the input, which is useful for understanding the structure of data coming from sources with varying levels of nesting. + + +### Inputs + +| Input | Description | Type | Required | +|-------|-------------|------|----------| +| nested_list | A potentially nested list to flatten into a single-level list. | List[Any] | Yes | +| max_depth | Maximum depth to flatten. -1 means flatten completely. 1 means flatten only one level. | int | No | + +### Outputs + +| Output | Description | Type | +|--------|-------------|------| +| error | Error message if flattening failed. | str | +| flattened_list | The flattened list with all nested elements extracted. | List[Any] | +| length | The number of elements in the flattened list. | int | +| original_depth | The maximum nesting depth of the original input list. | int | + +### Possible use case + +**Normalizing API Responses**: Flatten nested JSON arrays from different API endpoints into a uniform single-level list for consistent processing. + +**Aggregating Nested Results**: Combine results from recursive file searches or nested category trees into a flat list of items for display or export. + +**Data Pipeline Cleanup**: Simplify deeply nested data structures from multiple transformation steps into a clean flat list before final output. + + +--- + ## Get All Memories ### What it is @@ -1012,6 +1054,120 @@ This enables human oversight at critical points in automated workflows, ensuring --- +## Interleave Lists + +### What it is +Interleaves elements from multiple lists in round-robin fashion, alternating between sources. + +### How it works + +This block takes elements from each input list in round-robin order, picking one element from each list in turn. For example, given `[[1, 2, 3], ['a', 'b', 'c']]`, it produces `[1, 'a', 2, 'b', 3, 'c']`. + +When lists have different lengths, shorter lists stop contributing once exhausted, and remaining elements from longer lists continue to be added in order. + + +### Inputs + +| Input | Description | Type | Required | +|-------|-------------|------|----------| +| lists | A list of lists to interleave. Elements will be taken in round-robin order. | List[List[Any]] | Yes | + +### Outputs + +| Output | Description | Type | +|--------|-------------|------| +| error | Error message if interleaving failed. | str | +| interleaved_list | The interleaved list with elements alternating from each input list. | List[Any] | +| length | The total number of elements in the interleaved list. | int | + +### Possible use case + +**Balanced Content Mixing**: Alternate between content from different sources (e.g., mixing promotional and organic posts) for a balanced feed. + +**Round-Robin Scheduling**: Distribute tasks evenly across workers or queues by interleaving items from separate task lists. + +**Multi-Language Output**: Weave together translated text segments with their original counterparts for side-by-side comparison. + + +--- + +## List Difference + +### What it is +Computes the difference between two lists. Returns elements in the first list not found in the second, or symmetric difference. + +### How it works + +This block compares two lists and returns elements from list_a that do not appear in list_b. It uses hash-based lookup for efficient comparison. When symmetric mode is enabled, it returns elements that are in either list but not in both. + +The order of elements from list_a is preserved in the output, and elements from list_b are appended when using symmetric difference. + + +### Inputs + +| Input | Description | Type | Required | +|-------|-------------|------|----------| +| list_a | The primary list to check elements from. | List[Any] | Yes | +| list_b | The list to subtract. Elements found here will be removed from list_a. | List[Any] | Yes | +| symmetric | If True, compute symmetric difference (elements in either list but not both). | bool | No | + +### Outputs + +| Output | Description | Type | +|--------|-------------|------| +| error | Error message if the operation failed. | str | +| difference | Elements from list_a not found in list_b (or symmetric difference if enabled). | List[Any] | +| length | The number of elements in the difference result. | int | + +### Possible use case + +**Change Detection**: Compare a current list of records against a previous snapshot to find newly added or removed items. + +**Exclusion Filtering**: Remove items from a list that appear in a blocklist or already-processed list to avoid duplicates. + +**Data Sync**: Identify which items exist in one system but not another to determine what needs to be synced. + + +--- + +## List Intersection + +### What it is +Computes the intersection of two lists, returning only elements present in both. + +### How it works + +This block finds elements that appear in both input lists by hashing elements from list_b for efficient lookup, then checking each element of list_a against that set. The output preserves the order from list_a and removes duplicates. + +This is useful for finding common items between two datasets without needing to manually iterate or compare. + + +### Inputs + +| Input | Description | Type | Required | +|-------|-------------|------|----------| +| list_a | The first list to intersect. | List[Any] | Yes | +| list_b | The second list to intersect. | List[Any] | Yes | + +### Outputs + +| Output | Description | Type | +|--------|-------------|------| +| error | Error message if the operation failed. | str | +| intersection | Elements present in both list_a and list_b. | List[Any] | +| length | The number of elements in the intersection. | int | + +### Possible use case + +**Finding Common Tags**: Identify shared tags or categories between two items for recommendation or grouping purposes. + +**Mutual Connections**: Find users or contacts that appear in both of two different lists, such as shared friends or overlapping team members. + +**Feature Comparison**: Determine which features or capabilities are supported by both of two systems or products. + + +--- + ## List Is Empty ### What it is @@ -1452,3 +1608,42 @@ This makes XML data accessible using standard dictionary operations, allowing yo --- + +## Zip Lists + +### What it is +Zips multiple lists together into a list of grouped elements. Supports padding to longest or truncating to shortest. + +### How it works + +This block pairs up corresponding elements from multiple input lists into sub-lists. For example, zipping `[[1, 2, 3], ['a', 'b', 'c']]` produces `[[1, 'a'], [2, 'b'], [3, 'c']]`. + +By default, the result is truncated to the length of the shortest input list. Enable pad_to_longest to instead pad shorter lists with a fill_value so no elements from longer lists are lost. + + +### Inputs + +| Input | Description | Type | Required | +|-------|-------------|------|----------| +| lists | A list of lists to zip together. Corresponding elements will be grouped. | List[List[Any]] | Yes | +| pad_to_longest | If True, pad shorter lists with fill_value to match the longest list. If False, truncate to shortest. | bool | No | +| fill_value | Value to use for padding when pad_to_longest is True. | Fill Value | No | + +### Outputs + +| Output | Description | Type | +|--------|-------------|------| +| error | Error message if zipping failed. | str | +| zipped_list | The zipped list of grouped elements. | List[List[Any]] | +| length | The number of groups in the zipped result. | int | + +### Possible use case + +**Creating Key-Value Pairs**: Combine a list of field names with a list of values to build structured records or dictionaries. + +**Parallel Data Alignment**: Pair up corresponding items from separate data sources (e.g., names and email addresses) for processing together. + +**Table Row Construction**: Group column data into rows by zipping each column's values together for CSV export or display. + + +---