Compare commits

..

1 Commits

Author SHA1 Message Date
Nick Tindle
49971439b5 test: verify e2e tests on dev (no-op change) 2026-02-01 15:46:51 -06:00
2 changed files with 23 additions and 60 deletions

View File

@@ -291,18 +291,6 @@ class AsyncRabbitMQ(RabbitMQBase):
exchange, routing_key=queue.routing_key or queue.name
)
async def _ensure_channel(self) -> aio_pika.abc.AbstractChannel:
"""Get a valid channel, reconnecting if the current one is stale."""
if not self.is_ready:
# Reset stale channel so connect() creates a fresh one
self._channel = None
await self.connect()
if self._channel is None:
raise RuntimeError("Channel should be established after connect")
return self._channel
@func_retry
async def publish_message(
self,
@@ -311,58 +299,32 @@ class AsyncRabbitMQ(RabbitMQBase):
exchange: Optional[Exchange] = None,
persistent: bool = True,
) -> None:
try:
channel = await self._ensure_channel()
except Exception:
# Force full reconnect on channel acquisition failure
self._channel = None
self._connection = None
channel = await self._ensure_channel()
try:
if exchange:
exchange_obj = await channel.get_exchange(exchange.name)
else:
exchange_obj = channel.default_exchange
await exchange_obj.publish(
aio_pika.Message(
body=message.encode(),
delivery_mode=(
aio_pika.DeliveryMode.PERSISTENT
if persistent
else aio_pika.DeliveryMode.NOT_PERSISTENT
),
),
routing_key=routing_key,
)
except aio_pika.exceptions.ChannelInvalidStateError:
logger.warning(
"RabbitMQ channel invalid, reconnecting and retrying publish"
)
self._channel = None
self._connection = None
if not self.is_ready:
await self.connect()
if self._channel is None:
raise RuntimeError("Channel should be established after reconnect")
if self._channel is None:
raise RuntimeError("Channel should be established after connect")
if exchange:
exchange_obj = await self._channel.get_exchange(exchange.name)
else:
exchange_obj = self._channel.default_exchange
if exchange:
exchange_obj = await self._channel.get_exchange(exchange.name)
else:
exchange_obj = self._channel.default_exchange
await exchange_obj.publish(
aio_pika.Message(
body=message.encode(),
delivery_mode=(
aio_pika.DeliveryMode.PERSISTENT
if persistent
else aio_pika.DeliveryMode.NOT_PERSISTENT
),
await exchange_obj.publish(
aio_pika.Message(
body=message.encode(),
delivery_mode=(
aio_pika.DeliveryMode.PERSISTENT
if persistent
else aio_pika.DeliveryMode.NOT_PERSISTENT
),
routing_key=routing_key,
)
),
routing_key=routing_key,
)
async def get_channel(self) -> aio_pika.abc.AbstractChannel:
return await self._ensure_channel()
if not self.is_ready:
await self.connect()
if self._channel is None:
raise RuntimeError("Channel should be established after connect")
return self._channel

View File

@@ -124,3 +124,4 @@ test("user can signup with existing email handling", async ({
console.error("❌ Duplicate email handling test failed:", error);
}
});