address comments

This commit is contained in:
Reinier van der Leer
2026-02-12 15:10:11 +01:00
parent e523eb62b5
commit 6e2f595c7d
3 changed files with 29 additions and 14 deletions

View File

@@ -190,6 +190,13 @@ class CoPilotExecutor(AppProcess):
if not self.cancel_client.is_ready:
self.cancel_client.disconnect()
self.cancel_client.connect()
# Check again after connect - shutdown may have been requested
if self.stop_consuming.is_set() and not self.active_tasks:
logger.info("Stop consuming requested during reconnect - disconnecting")
self.cancel_client.disconnect()
return
cancel_channel = self.cancel_client.get_channel()
cancel_channel.basic_consume(
queue=COPILOT_CANCEL_QUEUE_NAME,
@@ -212,6 +219,13 @@ class CoPilotExecutor(AppProcess):
if not self.run_client.is_ready:
self.run_client.disconnect()
self.run_client.connect()
# Check again after connect - shutdown may have been requested
if self.stop_consuming.is_set() and not self.active_tasks:
logger.info("Stop consuming requested during reconnect - disconnecting")
self.run_client.disconnect()
return
run_channel = self.run_client.get_channel()
run_channel.basic_qos(prefetch_count=self.pool_size)
@@ -299,16 +313,18 @@ class CoPilotExecutor(AppProcess):
task_id = entry.task_id
# Check for local duplicate
# Check for local duplicate - task is already running on this executor
if task_id in self.active_tasks:
logger.warning(f"Task {task_id} already running locally")
ack_message(reject=True, requeue=True)
logger.warning(
f"Task {task_id} already running locally, rejecting duplicate"
)
ack_message(reject=True, requeue=False)
return
# Try to acquire cluster-wide lock
cluster_lock = ClusterLock(
redis=redis.get_redis(),
key=f"copilot_lock:{task_id}",
key=f"copilot:task:{task_id}:lock",
owner_id=self.executor_id,
timeout=settings.config.cluster_lock_timeout,
)
@@ -352,7 +368,10 @@ class CoPilotExecutor(AppProcess):
try:
if exec_error := f.exception():
logger.error(f"Execution for {task_id} failed: {exec_error}")
ack_message(reject=True, requeue=True)
# Don't requeue failed tasks - they've been marked as failed
# in the stream registry. Requeuing would cause infinite retries
# for deterministic failures.
ack_message(reject=True, requeue=False)
else:
ack_message(reject=False, requeue=False)
except BaseException as e:
@@ -402,9 +421,8 @@ class CoPilotExecutor(AppProcess):
channel = client.get_channel()
channel.connection.add_callback_threadsafe(lambda: channel.stop_consuming())
try:
thread.join(timeout=300)
except TimeoutError:
thread.join(timeout=300)
if thread.is_alive():
logger.error(
f"{prefix} Thread did not finish in time, forcing disconnect"
)

View File

@@ -151,11 +151,8 @@ class CoPilotProcessor:
except Exception as e:
elapsed = time.monotonic() - start_time
log.error(f"Execution failed after {elapsed:.2f}s: {e}")
# Ensure task is marked as failed in stream registry
asyncio.run_coroutine_threadsafe(
self._mark_task_failed(entry.task_id, str(e)),
self.execution_loop,
).result(timeout=10.0)
# Note: _execute_async already marks the task as failed before re-raising,
# so we don't call _mark_task_failed here to avoid duplicate error events.
raise
async def _execute_async(

View File

@@ -76,7 +76,7 @@ COPILOT_CANCEL_EXCHANGE = Exchange(
name="copilot_cancel",
type=ExchangeType.FANOUT,
durable=True,
auto_delete=True,
auto_delete=False,
)
COPILOT_CANCEL_QUEUE_NAME = "copilot_cancel_queue"