diff --git a/autogpt_platform/backend/backend/copilot/executor/manager.py b/autogpt_platform/backend/backend/copilot/executor/manager.py index ce0c11d4de..2c3ee251af 100644 --- a/autogpt_platform/backend/backend/copilot/executor/manager.py +++ b/autogpt_platform/backend/backend/copilot/executor/manager.py @@ -190,6 +190,13 @@ class CoPilotExecutor(AppProcess): if not self.cancel_client.is_ready: self.cancel_client.disconnect() self.cancel_client.connect() + + # Check again after connect - shutdown may have been requested + if self.stop_consuming.is_set() and not self.active_tasks: + logger.info("Stop consuming requested during reconnect - disconnecting") + self.cancel_client.disconnect() + return + cancel_channel = self.cancel_client.get_channel() cancel_channel.basic_consume( queue=COPILOT_CANCEL_QUEUE_NAME, @@ -212,6 +219,13 @@ class CoPilotExecutor(AppProcess): if not self.run_client.is_ready: self.run_client.disconnect() self.run_client.connect() + + # Check again after connect - shutdown may have been requested + if self.stop_consuming.is_set() and not self.active_tasks: + logger.info("Stop consuming requested during reconnect - disconnecting") + self.run_client.disconnect() + return + run_channel = self.run_client.get_channel() run_channel.basic_qos(prefetch_count=self.pool_size) @@ -299,16 +313,18 @@ class CoPilotExecutor(AppProcess): task_id = entry.task_id - # Check for local duplicate + # Check for local duplicate - task is already running on this executor if task_id in self.active_tasks: - logger.warning(f"Task {task_id} already running locally") - ack_message(reject=True, requeue=True) + logger.warning( + f"Task {task_id} already running locally, rejecting duplicate" + ) + ack_message(reject=True, requeue=False) return # Try to acquire cluster-wide lock cluster_lock = ClusterLock( redis=redis.get_redis(), - key=f"copilot_lock:{task_id}", + key=f"copilot:task:{task_id}:lock", owner_id=self.executor_id, timeout=settings.config.cluster_lock_timeout, ) @@ -352,7 +368,10 @@ class CoPilotExecutor(AppProcess): try: if exec_error := f.exception(): logger.error(f"Execution for {task_id} failed: {exec_error}") - ack_message(reject=True, requeue=True) + # Don't requeue failed tasks - they've been marked as failed + # in the stream registry. Requeuing would cause infinite retries + # for deterministic failures. + ack_message(reject=True, requeue=False) else: ack_message(reject=False, requeue=False) except BaseException as e: @@ -402,9 +421,8 @@ class CoPilotExecutor(AppProcess): channel = client.get_channel() channel.connection.add_callback_threadsafe(lambda: channel.stop_consuming()) - try: - thread.join(timeout=300) - except TimeoutError: + thread.join(timeout=300) + if thread.is_alive(): logger.error( f"{prefix} Thread did not finish in time, forcing disconnect" ) diff --git a/autogpt_platform/backend/backend/copilot/executor/processor.py b/autogpt_platform/backend/backend/copilot/executor/processor.py index f8040df403..5da9077341 100644 --- a/autogpt_platform/backend/backend/copilot/executor/processor.py +++ b/autogpt_platform/backend/backend/copilot/executor/processor.py @@ -151,11 +151,8 @@ class CoPilotProcessor: except Exception as e: elapsed = time.monotonic() - start_time log.error(f"Execution failed after {elapsed:.2f}s: {e}") - # Ensure task is marked as failed in stream registry - asyncio.run_coroutine_threadsafe( - self._mark_task_failed(entry.task_id, str(e)), - self.execution_loop, - ).result(timeout=10.0) + # Note: _execute_async already marks the task as failed before re-raising, + # so we don't call _mark_task_failed here to avoid duplicate error events. raise async def _execute_async( diff --git a/autogpt_platform/backend/backend/copilot/executor/utils.py b/autogpt_platform/backend/backend/copilot/executor/utils.py index 473a1ff80a..60d9cb22bf 100644 --- a/autogpt_platform/backend/backend/copilot/executor/utils.py +++ b/autogpt_platform/backend/backend/copilot/executor/utils.py @@ -76,7 +76,7 @@ COPILOT_CANCEL_EXCHANGE = Exchange( name="copilot_cancel", type=ExchangeType.FANOUT, durable=True, - auto_delete=True, + auto_delete=False, ) COPILOT_CANCEL_QUEUE_NAME = "copilot_cancel_queue"