chore(nodes): better naming for graph validation utils

This commit is contained in:
psychedelicious
2025-02-05 10:03:59 +11:00
parent 2fb35d25dd
commit 8556a2558e

View File

@@ -55,14 +55,14 @@ class Edge(BaseModel):
return f"{self.source.node_id}.{self.source.field} -> {self.destination.node_id}.{self.destination.field}"
def get_output_field(node: BaseInvocation, field: str) -> Any:
def get_output_field_type(node: BaseInvocation, field: str) -> Any:
node_type = type(node)
node_outputs = get_type_hints(node_type.get_output_annotation())
node_output_field = node_outputs.get(field) or None
return node_output_field
def get_input_field(node: BaseInvocation, field: str) -> Any:
def get_input_field_type(node: BaseInvocation, field: str) -> Any:
node_type = type(node)
node_inputs = get_type_hints(node_type)
node_input_field = node_inputs.get(field) or None
@@ -141,10 +141,10 @@ def are_connections_compatible(
"""Determines if a connection between fields of two nodes is compatible."""
# TODO: handle iterators and collectors
from_node_field = get_output_field(from_node, from_field)
to_node_field = get_input_field(to_node, to_field)
from_type = get_output_field_type(from_node, from_field)
to_type = get_input_field_type(to_node, to_field)
return are_connection_types_compatible(from_node_field, to_node_field)
return are_connection_types_compatible(from_type, to_type)
T = TypeVar("T")
@@ -480,11 +480,11 @@ class Graph(BaseModel):
def _is_destination_field_Any(self, edge: Edge) -> bool:
"""Checks if the destination field for an edge is of type typing.Any"""
return get_input_field(self.get_node(edge.destination.node_id), edge.destination.field) == Any
return get_input_field_type(self.get_node(edge.destination.node_id), edge.destination.field) == Any
def _is_destination_field_list_of_Any(self, edge: Edge) -> bool:
"""Checks if the destination field for an edge is of type typing.Any"""
return get_input_field(self.get_node(edge.destination.node_id), edge.destination.field) == list[Any]
return get_input_field_type(self.get_node(edge.destination.node_id), edge.destination.field) == list[Any]
def _validate_edge(self, edge: Edge):
"""Validates that a new edge doesn't create a cycle in the graph"""
@@ -639,16 +639,16 @@ class Graph(BaseModel):
return "Iterator may only have one input edge"
# Get input and output fields (the fields linked to the iterator's input/output)
input_field = get_output_field(self.get_node(inputs[0].node_id), inputs[0].field)
output_fields = [get_input_field(self.get_node(e.node_id), e.field) for e in outputs]
input_field_type = get_output_field_type(self.get_node(inputs[0].node_id), inputs[0].field)
output_field_types = [get_input_field_type(self.get_node(e.node_id), e.field) for e in outputs]
# Input type must be a list
if get_origin(input_field) is not list:
if get_origin(input_field_type) is not list:
return "Iterator input must be a collection"
# Validate that all outputs match the input type
input_field_item_type = get_args(input_field)[0]
if not all((are_connection_types_compatible(input_field_item_type, f) for f in output_fields)):
input_field_item_type = get_args(input_field_type)[0]
if not all((are_connection_types_compatible(input_field_item_type, t) for t in output_field_types)):
return "Iterator outputs must connect to an input with a matching type"
return None
@@ -668,15 +668,17 @@ class Graph(BaseModel):
outputs.append(new_output)
# Get input and output fields (the fields linked to the iterator's input/output)
input_fields = [get_output_field(self.get_node(e.node_id), e.field) for e in inputs]
output_fields = [get_input_field(self.get_node(e.node_id), e.field) for e in outputs]
input_field_types = [get_output_field_type(self.get_node(e.node_id), e.field) for e in inputs]
output_field_types = [get_input_field_type(self.get_node(e.node_id), e.field) for e in outputs]
# Validate that all inputs are derived from or match a single type
input_field_types = {
t
for input_field in input_fields
for t in ([input_field] if get_origin(input_field) is None else get_args(input_field))
if t != NoneType
resolved_type
for input_field_type in input_field_types
for resolved_type in (
[input_field_type] if get_origin(input_field_type) is None else get_args(input_field_type)
)
if resolved_type != NoneType
} # Get unique types
type_tree = nx.DiGraph()
type_tree.add_nodes_from(input_field_types)
@@ -689,15 +691,15 @@ class Graph(BaseModel):
input_root_type = next(t[0] for t in type_degrees if t[1] == 0) # type: ignore
# Verify that all outputs are lists
if not all(is_list_or_contains_list(f) or is_any(f) for f in output_fields):
if not all(is_list_or_contains_list(t) or is_any(t) for t in output_field_types):
return "Collector output must connect to a collection input"
# Verify that all outputs match the input type (are a base class or the same class)
if not all(
is_any(f)
or is_union_subtype(input_root_type, get_args(f)[0])
or issubclass(input_root_type, get_args(f)[0])
for f in output_fields
is_any(t)
or is_union_subtype(input_root_type, get_args(t)[0])
or issubclass(input_root_type, get_args(t)[0])
for t in output_field_types
):
return "Collector outputs must connect to a collection input with a matching type"