fix(rnd): Guarantee execution ordering per node by waiting for node completion (#7855)

### Background

Node execution currently has no ordering guarantee.
Suppose a node has to execute three different inputs: A, B, and C.
The current implementation limits each node to one execution at a time, but there is no guarantee that A, B, and C will be executed in that order.

The initial implementation had no restrictions, so A, B, and C were executed in parallel.
With the current per-node constraint, A, B, and C are executed serially, but with no guarantee of ordering.

The scope of this PR is to guarantee that order.
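
To illustrate why serial execution alone does not imply ordering, here is a minimal, thread-free sketch. It is a toy model, not the executor's actual code: `simulate`, `busy_steps`, and the single shared queue are assumptions made for illustration. An item that finds the node busy is either pushed to the back of the queue (re-enqueue) or waits for the node (blocking).

```python
from collections import deque


def simulate(items, reenqueue, busy_steps=1):
    """Toy model of one node fed from a shared FIFO queue.

    `busy_steps` (hypothetical) is how many dispatcher steps the node
    stays busy after an execution starts.
    """
    queue = deque(items)
    started = []  # order in which executions actually start
    busy = 0      # remaining steps during which the node is still running
    while queue:
        item = queue.popleft()
        if busy > 0:
            if reenqueue:
                # Old behaviour: push the item to the back and move on.
                queue.append(item)
                busy -= 1
                continue
            # New behaviour: block until the running execution finishes.
            busy = 0
        started.append(item)
        busy = busy_steps
    return started


print(simulate(["A", "B", "C"], reenqueue=True))   # B is overtaken by C
print(simulate(["A", "B", "C"], reenqueue=False))  # submission order kept
```

In this model, re-enqueueing B behind C lets C start first, while blocking keeps the submission order.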

### Changes 🏗️

This PR guarantees per-node execution ordering by removing the re-enqueue mechanism. If two executions target the same node, the first one runs and the second one blocks until the first completes. Blocking is sub-optimal; the performance improvement can be done later (a follow-up issue will be added).
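
A rough sketch of the blocking approach, assuming a thread pool and a per-node map of in-flight futures. The names `dispatch` and `run` are hypothetical and only stand in for the real dispatcher loop and node handler:

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait


def dispatch(executions, run, max_workers=4):
    """Submit (node_id, data) pairs in order, one in-flight run per node.

    Hypothetical helper: `run(node_id, data)` stands in for the node handler.
    """
    futures: dict[str, Future] = {}  # latest future submitted for each node
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for node_id, data in executions:
            prev = futures.get(node_id)
            if prev is not None and not prev.done():
                # Block the dispatcher instead of re-enqueueing, so a later
                # item for the same node can never overtake this one.
                wait([prev])
            futures[node_id] = pool.submit(run, node_id, data)
    # Leaving the `with` block waits for all outstanding executions.
    return futures
```

The trade-off is the one noted above: while the dispatcher waits on one node, executions for other nodes further back in the queue are held up as well.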
This commit is contained in:
Zamil Majdy
2024-08-21 14:08:18 +02:00
committed by GitHub
parent fa12564954
commit 3d62cec553

@@ -229,9 +229,9 @@ def _enqueue_next_nodes(
 idata, msg = validate_exec(next_node, idata)
 suffix = f"{next_output_name}>{next_input_name}~{ineid}:{msg}"
 if not idata:
-    logger.warning(f"{prefix} Re-enqueueing skipped: {suffix}")
+    logger.warning(f"{prefix} Enqueueing static-link skipped: {suffix}")
     continue
-logger.warning(f"{prefix} Re-enqueued {suffix}")
+logger.warning(f"{prefix} Enqueueing static-link execution {suffix}")
 enqueued_executions.append(
     add_enqueued_execution(iexec.node_exec_id, next_node_id, idata)
 )
@@ -373,10 +373,11 @@ class Executor:
 # Avoid parallel execution of the same node.
 fut = futures.get(execution.node_id)
 if fut and not fut.done():
-    cls.wait_future(fut)
-    logger.warning(f"{prefix} Re-enqueueing {execution.node_id}")
-    queue.add(execution)
-    continue
+    # TODO (performance improvement):
+    # Wait for the completion of the same node execution is blocking.
+    # To improve this we need a separate queue for each node.
+    # Re-enqueueing the data back to the queue will disrupt the order.
+    cls.wait_future(fut, timeout=None)
 futures[execution.node_id] = cls.executor.submit(
     cls.on_node_execution, queue, execution
@@ -395,9 +396,10 @@ class Executor:
         logger.exception(f"{prefix} Failed graph execution: {e}")
 @classmethod
-def wait_future(cls, future: Future):
+def wait_future(cls, future: Future, timeout: int | None = 3):
     try:
-        future.result(timeout=3)
+        if not future.done():
+            future.result(timeout=timeout)
     except TimeoutError:
         # Avoid being blocked by long-running node, by not waiting its completion.
         pass
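
The timeout semantics of the new `wait_future` signature can be sketched in isolation. This is a standalone variant, not the method from the diff: it is a plain function, and returning a `bool` is an assumption added so the two cases can be told apart. `timeout=None` blocks until the future completes, while a finite timeout gives up and moves on. The sketch imports `TimeoutError` from `concurrent.futures` because that class is only an alias of the built-in `TimeoutError` from Python 3.11 onward.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeoutError
from typing import Optional


def wait_future(future: Future, timeout: Optional[float] = 3) -> bool:
    """Return True if the future is finished, False if the wait timed out.

    Illustrative variant: the bool return value is not part of the diff.
    An exception raised by the task itself still propagates from result().
    """
    try:
        if not future.done():
            future.result(timeout=timeout)  # timeout=None waits indefinitely
        return True
    except FutureTimeoutError:
        # Do not stay blocked behind a long-running execution.
        return False


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=1) as pool:
        slow = pool.submit(time.sleep, 0.3)
        print(wait_future(slow, timeout=0.01))  # gives up early
        print(wait_future(slow, timeout=None))  # blocks until completion
```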