fix(backend): Avoid multithreaded pika access (#9832)

### Changes 🏗️

Prevent threads other than the connection's own thread from calling pika channel methods directly: cross-thread acks/nacks are now marshalled onto the connection thread via `connection.add_callback_threadsafe`.

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Manual agent runs
This commit is contained in:
Zamil Majdy
2025-04-17 00:06:07 +02:00
committed by GitHub
parent 9a7a838418
commit e3846c22bd
3 changed files with 14 additions and 6 deletions

View File

@@ -25,7 +25,7 @@ def thread_cached(
 cache = getattr(thread_local, "cache", None)
 if cache is None:
 cache = thread_local.cache = {}
-key = (func, args, tuple(sorted(kwargs.items())))
+key = (args, tuple(sorted(kwargs.items())))
 if key not in cache:
 cache[key] = await cast(Callable[P, Awaitable[R]], func)(
 *args, **kwargs
@@ -40,7 +40,7 @@ def thread_cached(
 if cache is None:
 cache = thread_local.cache = {}
 # Include function in the key to prevent collisions between different functions
-key = (func, args, tuple(sorted(kwargs.items())))
+key = (args, tuple(sorted(kwargs.items())))
 if key not in cache:
 cache[key] = func(*args, **kwargs)
 return cache[key]

View File

@@ -1015,9 +1015,13 @@ class ExecutionManager(AppProcess):
 logger.error(
 f"[{self.service_name}] Execution for {graph_exec_id} failed: {f.exception()}"
 )
-channel.basic_nack(delivery_tag, requeue=False)
+channel.connection.add_callback_threadsafe(
+    lambda: channel.basic_nack(delivery_tag, requeue=False)
+)
 else:
-channel.basic_ack(delivery_tag)
+channel.connection.add_callback_threadsafe(
+    lambda: channel.basic_ack(delivery_tag)
+)
 except Exception as e:
 logger.error(f"[{self.service_name}] Error acknowledging message: {e}")

View File

@@ -612,8 +612,12 @@ async def execute_graph(
 user_id=user_id,
 preset_id=preset_id,
 )
-execution_utils.get_execution_event_bus().publish(graph_exec)
-execution_utils.get_execution_queue().publish_message(
+bus = execution_event_bus()
+await bus.publish(graph_exec)
+queue = await execution_queue_client()
+await queue.publish_message(
 routing_key=execution_utils.GRAPH_EXECUTION_ROUTING_KEY,
 message=graph_exec.to_graph_execution_entry().model_dump_json(),
 exchange=execution_utils.GRAPH_EXECUTION_EXCHANGE,