fix(app): incorrect node mappings when preparing collect nodes

The previous logic had a subtle python bug related the scope and nested
generators.

Python generators are lazily evaluated - the expressions are stored and
only evaluated when needed (e.g. calling next() or list() on them)

The old logic used a variable `s`, which was continually overwritten as
the generator expressions were created. As a result, the final mappings
all use the _final_ value for `s`.

Following the consequences of this down the line, we find that collect
nodes can end up with multiple edges from exactly one of their ancestor
nodes, instead of one edge from each ancestor. Notably, it's only the
source _node_id_ that is affected - the source _fields_ have the correct
values.

So the invalid edges will point to a real node and a real field, but the
field exists on a different node.

---

This can result in a number of cryptic problems - include an error about
incompatible field types:

```
InvalidEdgeError: Field types are incompatible
(31758fd5-14a8-4de7-a840-b73ec1a1b94f.value ->
3459c793-41a2-4d82-9204-7df2d6d099ba.item)
```

Here are the conditions that lead to this error:
- The collect node has at least two incoming connections.
- The two incoming connections come from nodes of different types.
- The nodes both output a value of the same type, but the name of the
output field differs between them.

---

This commit uses non-generator logic to build up the mappings, avoiding
the issue entirely. As a bonus, it is much easier to read.
This commit is contained in:
psychedelicious
2025-06-30 18:49:33 +10:00
parent c02be4bdf4
commit fda86ae981

View File

@@ -1007,10 +1007,11 @@ class GraphExecutionState(BaseModel):
new_node_ids = []
if isinstance(next_node, CollectInvocation):
# Collapse all iterator input mappings and create a single execution node for the collect invocation
all_iteration_mappings = list(
itertools.chain(*(((s, p) for p in self.source_prepared_mapping[s]) for s in next_node_parents))
)
# all_iteration_mappings = list(set(itertools.chain(*prepared_parent_mappings)))
all_iteration_mappings = []
for source_node_id in next_node_parents:
prepared_nodes = self.source_prepared_mapping[source_node_id]
all_iteration_mappings.extend([(source_node_id, p) for p in prepared_nodes])
create_results = self._create_execution_node(next_node_id, all_iteration_mappings)
if create_results is not None:
new_node_ids.extend(create_results)