mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-08 13:55:06 -05:00
Compare commits
1 Commits
fix/execut
...
claude/res
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
938dff9b99 |
198
STANDALONE_MODE.md
Normal file
198
STANDALONE_MODE.md
Normal file
@@ -0,0 +1,198 @@
|
||||
# Standalone Mode
|
||||
|
||||
This document describes how to run the AutoGPT platform in **standalone mode** without external dependencies like Redis, RabbitMQ, and Supabase.
|
||||
|
||||
## Overview
|
||||
|
||||
Standalone mode uses in-memory implementations for:
|
||||
- **Redis**: In-memory key-value store and pub/sub messaging
|
||||
- **RabbitMQ**: In-memory message queues and task distribution
|
||||
- **Event Bus**: In-memory event distribution system
|
||||
|
||||
This is useful for:
|
||||
- Development and testing without Docker
|
||||
- Running in restricted environments
|
||||
- Quick local demos
|
||||
- CI/CD pipelines without external services
|
||||
|
||||
## Limitations
|
||||
|
||||
Standalone mode has these limitations:
|
||||
- **No persistence**: All data is stored in memory and lost on restart
|
||||
- **Single process**: Cannot scale across multiple workers/servers
|
||||
- **No distributed locking**: Cluster coordination features are disabled
|
||||
- **Database still required**: You still need PostgreSQL or need to configure SQLite
|
||||
- **Authentication may need adjustment**: Supabase authentication won't work
|
||||
|
||||
## How to Enable
|
||||
|
||||
### Method 1: Environment Variable
|
||||
|
||||
Set the `STANDALONE_MODE` environment variable:
|
||||
|
||||
```bash
|
||||
export STANDALONE_MODE=true
|
||||
```
|
||||
|
||||
### Method 2: Configuration File
|
||||
|
||||
1. Copy the standalone environment template:
|
||||
```bash
|
||||
cd autogpt_platform/backend
|
||||
cp .env.standalone .env
|
||||
```
|
||||
|
||||
2. Edit `.env` and adjust any settings as needed
|
||||
|
||||
3. Add to the config (via environment or config.json):
|
||||
```json
|
||||
{
|
||||
"standalone_mode": true
|
||||
}
|
||||
```
|
||||
|
||||
### Method 3: Programmatic
|
||||
|
||||
In your code, you can check/set standalone mode:
|
||||
|
||||
```python
|
||||
from backend.util.settings import Settings
|
||||
|
||||
settings = Settings()
|
||||
settings.config.standalone_mode = True
|
||||
```
|
||||
|
||||
## Running the Platform
|
||||
|
||||
Once standalone mode is enabled:
|
||||
|
||||
```bash
|
||||
cd autogpt_platform/backend
|
||||
|
||||
# Install dependencies
|
||||
poetry install
|
||||
|
||||
# Run migrations (if using SQLite, update schema first)
|
||||
poetry run prisma migrate deploy
|
||||
|
||||
# Start the backend server
|
||||
poetry run python -m backend.app
|
||||
```
|
||||
|
||||
The platform will automatically use in-memory implementations when it detects:
|
||||
- `STANDALONE_MODE=true` environment variable, OR
|
||||
- `standalone_mode: true` in config settings
|
||||
|
||||
## Implementation Details
|
||||
|
||||
### Redis Client (`backend/data/redis_client.py`)
|
||||
|
||||
When standalone mode is active:
|
||||
- Returns `InMemoryRedis()` instead of connecting to Redis
|
||||
- Stores data in a Python dictionary (singleton)
|
||||
- Supports basic operations: get, set, delete, incr, keys, etc.
|
||||
- Pub/sub returns empty generators (use event bus instead)
|
||||
|
||||
### Event Bus (`backend/data/event_bus.py`)
|
||||
|
||||
When standalone mode is active:
|
||||
- `RedisEventBus` uses `InMemorySyncEventBus`
|
||||
- `AsyncRedisEventBus` uses `InMemoryAsyncEventBus`
|
||||
- Events are distributed via Python queues
|
||||
- Supports pattern subscriptions with wildcards
|
||||
|
||||
### RabbitMQ (`backend/data/rabbitmq.py`)
|
||||
|
||||
When standalone mode is active:
|
||||
- `SyncRabbitMQ` uses `InMemorySyncRabbitMQ`
|
||||
- `AsyncRabbitMQ` uses `InMemoryAsyncRabbitMQ`
|
||||
- Messages are routed via Python queues
|
||||
- Supports exchanges, queues, and routing keys
|
||||
- Implements fanout, direct, and topic exchange types
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### "Connection refused" errors
|
||||
|
||||
If you still see connection errors:
|
||||
1. Check that `STANDALONE_MODE=true` is set
|
||||
2. Verify the setting with: `python -c "from backend.util.settings import Settings; print(Settings().config.standalone_mode)"`
|
||||
3. Make sure you're not explicitly connecting to Redis/RabbitMQ elsewhere
|
||||
|
||||
### Database connection errors
|
||||
|
||||
Standalone mode does NOT replace the database. You need either:
|
||||
- PostgreSQL running locally
|
||||
- SQLite (requires schema changes)
|
||||
- A remote database connection
|
||||
|
||||
### Authentication errors
|
||||
|
||||
Supabase authentication won't work in standalone mode. Options:
|
||||
1. Disable authentication: `ENABLE_AUTH=false` in `.env`
|
||||
2. Implement a mock auth provider
|
||||
3. Use a different auth method
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────┐
|
||||
│ Standalone Mode │
|
||||
├─────────────────────────────────────────┤
|
||||
│ │
|
||||
│ ┌──────────────────────────────────┐ │
|
||||
│ │ InMemoryRedis (Singleton) │ │
|
||||
│ │ - Key-value store (dict) │ │
|
||||
│ │ - TTL not implemented │ │
|
||||
│ └──────────────────────────────────┘ │
|
||||
│ │
|
||||
│ ┌──────────────────────────────────┐ │
|
||||
│ │ InMemoryEventBus (Singleton) │ │
|
||||
│ │ - Pub/sub via queues │ │
|
||||
│ │ - Pattern matching support │ │
|
||||
│ └──────────────────────────────────┘ │
|
||||
│ │
|
||||
│ ┌──────────────────────────────────┐ │
|
||||
│ │ InMemoryMessageBroker │ │
|
||||
│ │ - Exchanges & queues │ │
|
||||
│ │ - Routing key matching │ │
|
||||
│ │ - Fanout/direct/topic types │ │
|
||||
│ └──────────────────────────────────┘ │
|
||||
│ │
|
||||
└─────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
## Testing
|
||||
|
||||
To test standalone mode:
|
||||
|
||||
```bash
|
||||
# Set standalone mode
|
||||
export STANDALONE_MODE=true
|
||||
|
||||
# Run tests
|
||||
cd autogpt_platform/backend
|
||||
poetry run pytest
|
||||
|
||||
# Or run a specific service
|
||||
poetry run python -m backend.rest
|
||||
```
|
||||
|
||||
## Production Use
|
||||
|
||||
**⚠️ Warning**: Standalone mode is NOT recommended for production use because:
|
||||
- No data persistence
|
||||
- No horizontal scaling
|
||||
- No failover or redundancy
|
||||
- Limited to single-process execution
|
||||
|
||||
For production, use the full Docker setup with real Redis, RabbitMQ, and PostgreSQL.
|
||||
|
||||
## Contributing
|
||||
|
||||
The standalone mode implementations are in:
|
||||
- `backend/data/inmemory_redis.py` - In-memory Redis client
|
||||
- `backend/data/inmemory_event_bus.py` - In-memory event bus
|
||||
- `backend/data/inmemory_queue.py` - In-memory RabbitMQ
|
||||
|
||||
Feel free to improve these implementations or add missing features.
|
||||
34
autogpt_platform/backend/.env.standalone
Normal file
34
autogpt_platform/backend/.env.standalone
Normal file
@@ -0,0 +1,34 @@
|
||||
# Standalone Mode Configuration
|
||||
# This configuration allows running the AutoGPT platform without external dependencies
|
||||
# Copy this to .env to use standalone mode
|
||||
|
||||
# Enable standalone mode (uses in-memory implementations)
|
||||
STANDALONE_MODE=true
|
||||
|
||||
# Database - Use SQLite for standalone mode (if supported by Prisma schema)
|
||||
# Note: You may need to adjust the Prisma schema to support SQLite
|
||||
DATABASE_URL=file:./dev.db
|
||||
|
||||
# Disable external services
|
||||
REDIS_HOST=localhost
|
||||
REDIS_PORT=6379
|
||||
RABBITMQ_HOST=localhost
|
||||
RABBITMQ_PORT=5672
|
||||
|
||||
# Disable authentication for standalone mode (optional - for easier testing)
|
||||
# Uncomment the line below to disable auth
|
||||
# ENABLE_AUTH=false
|
||||
|
||||
# Disable virus scanning for standalone mode
|
||||
CLAMAV_SERVICE_ENABLED=false
|
||||
|
||||
# Minimal secrets (not used in standalone mode but required by config)
|
||||
SUPABASE_URL=http://localhost:8000
|
||||
SUPABASE_SERVICE_ROLE_KEY=dummy_key_for_standalone
|
||||
RABBITMQ_DEFAULT_USER=guest
|
||||
RABBITMQ_DEFAULT_PASS=guest
|
||||
ENCRYPTION_KEY=dummy_encryption_key_for_standalone_mode_32chars
|
||||
|
||||
# Optional: Configure platform URLs for local testing
|
||||
PLATFORM_BASE_URL=http://localhost:8006
|
||||
FRONTEND_BASE_URL=http://localhost:3000
|
||||
@@ -1,5 +1,6 @@
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any, AsyncGenerator, Generator, Generic, Optional, TypeVar
|
||||
|
||||
@@ -14,6 +15,9 @@ from backend.util.settings import Settings
|
||||
logger = logging.getLogger(__name__)
|
||||
config = Settings().config
|
||||
|
||||
# Check if we should use in-memory implementations
|
||||
USE_IN_MEMORY = config.standalone_mode or os.getenv("STANDALONE_MODE", "").lower() in ("true", "1", "yes")
|
||||
|
||||
|
||||
M = TypeVar("M", bound=BaseModel)
|
||||
|
||||
@@ -98,15 +102,36 @@ class _EventPayloadWrapper(BaseModel, Generic[M]):
|
||||
|
||||
|
||||
class RedisEventBus(BaseRedisEventBus[M], ABC):
|
||||
def __init__(self):
|
||||
if USE_IN_MEMORY:
|
||||
from backend.data.inmemory_event_bus import InMemorySyncEventBus
|
||||
# Copy over the abstract properties to the in-memory implementation
|
||||
self._in_memory_bus = type(
|
||||
self.__class__.__name__,
|
||||
(InMemorySyncEventBus,),
|
||||
{"event_bus_name": property(lambda s: self.event_bus_name), "Model": self.Model}
|
||||
)()
|
||||
logger.info(f"Using in-memory event bus for {self.event_bus_name} (standalone mode)")
|
||||
else:
|
||||
self._in_memory_bus = None
|
||||
|
||||
@property
|
||||
def connection(self) -> redis.Redis:
|
||||
return redis.get_redis()
|
||||
|
||||
def publish_event(self, event: M, channel_key: str):
|
||||
if self._in_memory_bus:
|
||||
self._in_memory_bus.publish_event(event, channel_key)
|
||||
return
|
||||
|
||||
message, full_channel_name = self._serialize_message(event, channel_key)
|
||||
self.connection.publish(full_channel_name, message)
|
||||
|
||||
def listen_events(self, channel_key: str) -> Generator[M, None, None]:
|
||||
if self._in_memory_bus:
|
||||
yield from self._in_memory_bus.listen_events(channel_key)
|
||||
return
|
||||
|
||||
pubsub, full_channel_name = self._get_pubsub_channel(
|
||||
self.connection, channel_key
|
||||
)
|
||||
@@ -123,16 +148,38 @@ class RedisEventBus(BaseRedisEventBus[M], ABC):
|
||||
|
||||
|
||||
class AsyncRedisEventBus(BaseRedisEventBus[M], ABC):
|
||||
def __init__(self):
|
||||
if USE_IN_MEMORY:
|
||||
from backend.data.inmemory_event_bus import InMemoryAsyncEventBus
|
||||
# Copy over the abstract properties to the in-memory implementation
|
||||
self._in_memory_bus = type(
|
||||
self.__class__.__name__,
|
||||
(InMemoryAsyncEventBus,),
|
||||
{"event_bus_name": property(lambda s: self.event_bus_name), "Model": self.Model}
|
||||
)()
|
||||
logger.info(f"Using in-memory async event bus for {self.event_bus_name} (standalone mode)")
|
||||
else:
|
||||
self._in_memory_bus = None
|
||||
|
||||
@property
|
||||
async def connection(self) -> redis.AsyncRedis:
|
||||
return await redis.get_redis_async()
|
||||
|
||||
async def publish_event(self, event: M, channel_key: str):
|
||||
if self._in_memory_bus:
|
||||
await self._in_memory_bus.publish_event(event, channel_key)
|
||||
return
|
||||
|
||||
message, full_channel_name = self._serialize_message(event, channel_key)
|
||||
connection = await self.connection
|
||||
await connection.publish(full_channel_name, message)
|
||||
|
||||
async def listen_events(self, channel_key: str) -> AsyncGenerator[M, None]:
|
||||
if self._in_memory_bus:
|
||||
async for event in self._in_memory_bus.listen_events(channel_key):
|
||||
yield event
|
||||
return
|
||||
|
||||
pubsub, full_channel_name = self._get_pubsub_channel(
|
||||
await self.connection, channel_key
|
||||
)
|
||||
@@ -150,6 +197,9 @@ class AsyncRedisEventBus(BaseRedisEventBus[M], ABC):
|
||||
async def wait_for_event(
|
||||
self, channel_key: str, timeout: Optional[float] = None
|
||||
) -> M | None:
|
||||
if self._in_memory_bus:
|
||||
return await self._in_memory_bus.wait_for_event(channel_key, timeout)
|
||||
|
||||
try:
|
||||
return await asyncio.wait_for(
|
||||
anext(aiter(self.listen_events(channel_key))), timeout
|
||||
|
||||
257
autogpt_platform/backend/backend/data/inmemory_event_bus.py
Normal file
257
autogpt_platform/backend/backend/data/inmemory_event_bus.py
Normal file
@@ -0,0 +1,257 @@
|
||||
"""
|
||||
In-memory event bus implementation as a fallback for Redis.
|
||||
This allows running the platform without an external Redis server.
|
||||
"""
|
||||
import asyncio
|
||||
import logging
|
||||
from abc import ABC
|
||||
from collections import defaultdict
|
||||
from typing import Any, AsyncGenerator, Generator, Generic
|
||||
from queue import Queue, Empty
|
||||
from threading import Lock
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from backend.util import json
|
||||
from backend.util.settings import Settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
config = Settings().config
|
||||
|
||||
# Type variable for message model
|
||||
M = type(BaseModel)
|
||||
|
||||
|
||||
class _EventPayloadWrapper(BaseModel, Generic[M]):
|
||||
"""Wrapper model to allow message payloads"""
|
||||
payload: M
|
||||
|
||||
|
||||
class InMemoryEventBus:
|
||||
"""
|
||||
Singleton in-memory event bus that stores all subscribers and messages.
|
||||
Thread-safe implementation using locks and queues.
|
||||
"""
|
||||
_instance = None
|
||||
_lock = Lock()
|
||||
|
||||
def __new__(cls):
|
||||
if cls._instance is None:
|
||||
with cls._lock:
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance._subscribers = defaultdict(list)
|
||||
cls._instance._async_subscribers = defaultdict(list)
|
||||
cls._instance._subscriber_lock = Lock()
|
||||
return cls._instance
|
||||
|
||||
def publish(self, channel: str, message: str):
|
||||
"""Publish a message to all subscribers of a channel"""
|
||||
with self._subscriber_lock:
|
||||
# Handle pattern subscriptions (channels with *)
|
||||
for pattern, queues in self._subscribers.items():
|
||||
if self._match_pattern(pattern, channel):
|
||||
for queue in queues:
|
||||
try:
|
||||
queue.put({
|
||||
'type': 'pmessage' if '*' in pattern else 'message',
|
||||
'channel': channel,
|
||||
'data': message
|
||||
})
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to publish to queue: {e}")
|
||||
|
||||
async def publish_async(self, channel: str, message: str):
|
||||
"""Async version of publish"""
|
||||
with self._subscriber_lock:
|
||||
# Notify async subscribers
|
||||
for pattern, queues in self._async_subscribers.items():
|
||||
if self._match_pattern(pattern, channel):
|
||||
for queue in queues:
|
||||
try:
|
||||
await queue.put({
|
||||
'type': 'pmessage' if '*' in pattern else 'message',
|
||||
'channel': channel,
|
||||
'data': message
|
||||
})
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to publish async to queue: {e}")
|
||||
|
||||
def subscribe(self, channel: str) -> Queue:
|
||||
"""Subscribe to a channel and return a queue for receiving messages"""
|
||||
queue = Queue()
|
||||
with self._subscriber_lock:
|
||||
self._subscribers[channel].append(queue)
|
||||
return queue
|
||||
|
||||
async def subscribe_async(self, channel: str) -> asyncio.Queue:
|
||||
"""Async version of subscribe"""
|
||||
queue = asyncio.Queue()
|
||||
with self._subscriber_lock:
|
||||
self._async_subscribers[channel].append(queue)
|
||||
return queue
|
||||
|
||||
def unsubscribe(self, channel: str, queue: Queue):
|
||||
"""Unsubscribe a queue from a channel"""
|
||||
with self._subscriber_lock:
|
||||
if channel in self._subscribers:
|
||||
try:
|
||||
self._subscribers[channel].remove(queue)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
async def unsubscribe_async(self, channel: str, queue: asyncio.Queue):
|
||||
"""Async version of unsubscribe"""
|
||||
with self._subscriber_lock:
|
||||
if channel in self._async_subscribers:
|
||||
try:
|
||||
self._async_subscribers[channel].remove(queue)
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def _match_pattern(pattern: str, channel: str) -> bool:
|
||||
"""Match a channel against a pattern (supports * wildcard)"""
|
||||
if '*' not in pattern:
|
||||
return pattern == channel
|
||||
|
||||
# Simple pattern matching for Redis-style patterns
|
||||
pattern_parts = pattern.split('*')
|
||||
if len(pattern_parts) == 2:
|
||||
prefix, suffix = pattern_parts
|
||||
return channel.startswith(prefix) and channel.endswith(suffix)
|
||||
|
||||
# More complex patterns - just check prefix for now
|
||||
return channel.startswith(pattern_parts[0])
|
||||
|
||||
|
||||
class BaseInMemoryEventBus(Generic[M], ABC):
|
||||
"""Base class for in-memory event bus implementations"""
|
||||
Model: type[M]
|
||||
|
||||
@property
|
||||
def event_bus_name(self) -> str:
|
||||
"""Override this in subclasses"""
|
||||
return "events"
|
||||
|
||||
@property
|
||||
def Message(self) -> type[_EventPayloadWrapper[M]]:
|
||||
return _EventPayloadWrapper[self.Model]
|
||||
|
||||
def _serialize_message(self, item: M, channel_key: str) -> tuple[str, str]:
|
||||
"""Serialize a message for publishing"""
|
||||
MAX_MESSAGE_SIZE = config.max_message_size_limit
|
||||
|
||||
try:
|
||||
message = json.dumps(
|
||||
self.Message(payload=item), ensure_ascii=False, separators=(",", ":")
|
||||
)
|
||||
except UnicodeError:
|
||||
message = json.dumps(
|
||||
self.Message(payload=item), ensure_ascii=True, separators=(",", ":")
|
||||
)
|
||||
logger.warning(
|
||||
f"Unicode serialization failed, falling back to ASCII for channel {channel_key}"
|
||||
)
|
||||
|
||||
# Check message size and truncate if necessary
|
||||
message_size = len(message.encode("utf-8"))
|
||||
if message_size > MAX_MESSAGE_SIZE:
|
||||
logger.warning(
|
||||
f"Message size {message_size} bytes exceeds limit {MAX_MESSAGE_SIZE} bytes for channel {channel_key}. "
|
||||
"Truncating payload."
|
||||
)
|
||||
error_payload = {
|
||||
"payload": {
|
||||
"event_type": "error_comms_update",
|
||||
"error": "Payload too large for transmission",
|
||||
"original_size_bytes": message_size,
|
||||
"max_size_bytes": MAX_MESSAGE_SIZE,
|
||||
}
|
||||
}
|
||||
message = json.dumps(
|
||||
error_payload, ensure_ascii=False, separators=(",", ":")
|
||||
)
|
||||
|
||||
channel_name = f"{self.event_bus_name}/{channel_key}"
|
||||
logger.debug(f"[{channel_name}] Publishing event: {message[:100]}...")
|
||||
return message, channel_name
|
||||
|
||||
def _deserialize_message(self, msg: dict, channel_key: str) -> M | None:
|
||||
"""Deserialize a message from the queue"""
|
||||
message_type = "pmessage" if "*" in channel_key else "message"
|
||||
if msg["type"] != message_type:
|
||||
return None
|
||||
try:
|
||||
logger.debug(f"[{channel_key}] Consuming event: {msg['data'][:100]}...")
|
||||
return self.Message.model_validate_json(msg["data"]).payload
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to parse event from queue {msg}: {e}")
|
||||
return None
|
||||
|
||||
|
||||
class InMemorySyncEventBus(BaseInMemoryEventBus[M], ABC):
|
||||
"""Synchronous in-memory event bus"""
|
||||
|
||||
def __init__(self):
|
||||
self._bus = InMemoryEventBus()
|
||||
|
||||
def publish_event(self, event: M, channel_key: str):
|
||||
"""Publish an event to a channel"""
|
||||
message, full_channel_name = self._serialize_message(event, channel_key)
|
||||
self._bus.publish(full_channel_name, message)
|
||||
|
||||
def listen_events(self, channel_key: str) -> Generator[M, None, None]:
|
||||
"""Listen for events on a channel"""
|
||||
full_channel_name = f"{self.event_bus_name}/{channel_key}"
|
||||
queue = self._bus.subscribe(full_channel_name)
|
||||
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
message = queue.get(timeout=1.0)
|
||||
if event := self._deserialize_message(message, channel_key):
|
||||
yield event
|
||||
except Empty:
|
||||
continue
|
||||
finally:
|
||||
self._bus.unsubscribe(full_channel_name, queue)
|
||||
|
||||
|
||||
class InMemoryAsyncEventBus(BaseInMemoryEventBus[M], ABC):
|
||||
"""Asynchronous in-memory event bus"""
|
||||
|
||||
def __init__(self):
|
||||
self._bus = InMemoryEventBus()
|
||||
|
||||
async def publish_event(self, event: M, channel_key: str):
|
||||
"""Publish an event to a channel"""
|
||||
message, full_channel_name = self._serialize_message(event, channel_key)
|
||||
await self._bus.publish_async(full_channel_name, message)
|
||||
|
||||
async def listen_events(self, channel_key: str) -> AsyncGenerator[M, None]:
|
||||
"""Listen for events on a channel"""
|
||||
full_channel_name = f"{self.event_bus_name}/{channel_key}"
|
||||
queue = await self._bus.subscribe_async(full_channel_name)
|
||||
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
message = await asyncio.wait_for(queue.get(), timeout=1.0)
|
||||
if event := self._deserialize_message(message, channel_key):
|
||||
yield event
|
||||
except asyncio.TimeoutError:
|
||||
continue
|
||||
finally:
|
||||
await self._bus.unsubscribe_async(full_channel_name, queue)
|
||||
|
||||
async def wait_for_event(
|
||||
self, channel_key: str, timeout: float | None = None
|
||||
) -> M | None:
|
||||
"""Wait for a single event with optional timeout"""
|
||||
try:
|
||||
return await asyncio.wait_for(
|
||||
anext(aiter(self.listen_events(channel_key))), timeout
|
||||
)
|
||||
except (TimeoutError, asyncio.TimeoutError):
|
||||
return None
|
||||
374
autogpt_platform/backend/backend/data/inmemory_queue.py
Normal file
374
autogpt_platform/backend/backend/data/inmemory_queue.py
Normal file
@@ -0,0 +1,374 @@
|
||||
"""
|
||||
In-memory queue implementation as a fallback for RabbitMQ.
|
||||
This allows running the platform without an external RabbitMQ server.
|
||||
"""
|
||||
import asyncio
|
||||
import logging
|
||||
from queue import Queue, Empty
|
||||
from threading import Lock
|
||||
from typing import Optional, Callable, Awaitable
|
||||
from collections import defaultdict
|
||||
|
||||
from backend.data.rabbitmq import (
|
||||
RabbitMQBase,
|
||||
RabbitMQConfig,
|
||||
Exchange,
|
||||
Queue as QueueConfig,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class InMemoryMessageBroker:
|
||||
"""
|
||||
Singleton in-memory message broker that manages all queues and exchanges.
|
||||
Thread-safe implementation.
|
||||
"""
|
||||
_instance = None
|
||||
_lock = Lock()
|
||||
|
||||
def __new__(cls):
|
||||
if cls._instance is None:
|
||||
with cls._lock:
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance._queues = {}
|
||||
cls._instance._async_queues = {}
|
||||
cls._instance._exchanges = {}
|
||||
cls._instance._bindings = defaultdict(list) # exchange -> [(queue, routing_key)]
|
||||
cls._instance._consumers = {} # queue -> [callbacks]
|
||||
cls._instance._async_consumers = {} # queue -> [async callbacks]
|
||||
return cls._instance
|
||||
|
||||
def declare_exchange(self, exchange: Exchange):
|
||||
"""Declare an exchange"""
|
||||
with self._lock:
|
||||
if exchange.name not in self._exchanges:
|
||||
self._exchanges[exchange.name] = exchange
|
||||
logger.debug(f"Declared exchange: {exchange.name} (type: {exchange.type})")
|
||||
|
||||
def declare_queue(self, queue: QueueConfig):
|
||||
"""Declare a queue"""
|
||||
with self._lock:
|
||||
if queue.name not in self._queues:
|
||||
self._queues[queue.name] = Queue()
|
||||
self._async_queues[queue.name] = asyncio.Queue()
|
||||
logger.debug(f"Declared queue: {queue.name}")
|
||||
|
||||
def bind_queue(self, queue_name: str, exchange_name: str, routing_key: str):
|
||||
"""Bind a queue to an exchange with a routing key"""
|
||||
with self._lock:
|
||||
self._bindings[exchange_name].append((queue_name, routing_key))
|
||||
logger.debug(f"Bound queue {queue_name} to exchange {exchange_name} with key {routing_key}")
|
||||
|
||||
def publish(self, exchange_name: str, routing_key: str, message: str):
|
||||
"""Publish a message to an exchange"""
|
||||
with self._lock:
|
||||
if not exchange_name:
|
||||
# Direct publish to queue (default exchange)
|
||||
if routing_key in self._queues:
|
||||
self._queues[routing_key].put(message)
|
||||
logger.debug(f"Published to queue {routing_key}: {message[:100]}...")
|
||||
return
|
||||
|
||||
# Publish to exchange
|
||||
exchange = self._exchanges.get(exchange_name)
|
||||
if not exchange:
|
||||
logger.warning(f"Exchange {exchange_name} not found")
|
||||
return
|
||||
|
||||
# Route message based on exchange type
|
||||
for queue_name, bound_key in self._bindings.get(exchange_name, []):
|
||||
if self._matches_routing_key(exchange, routing_key, bound_key):
|
||||
if queue_name in self._queues:
|
||||
self._queues[queue_name].put(message)
|
||||
logger.debug(f"Routed to queue {queue_name}: {message[:100]}...")
|
||||
|
||||
async def publish_async(self, exchange_name: str, routing_key: str, message: str):
|
||||
"""Async version of publish"""
|
||||
with self._lock:
|
||||
if not exchange_name:
|
||||
# Direct publish to queue (default exchange)
|
||||
if routing_key in self._async_queues:
|
||||
await self._async_queues[routing_key].put(message)
|
||||
logger.debug(f"Published async to queue {routing_key}: {message[:100]}...")
|
||||
return
|
||||
|
||||
# Publish to exchange
|
||||
exchange = self._exchanges.get(exchange_name)
|
||||
if not exchange:
|
||||
logger.warning(f"Exchange {exchange_name} not found")
|
||||
return
|
||||
|
||||
# Route message based on exchange type
|
||||
for queue_name, bound_key in self._bindings.get(exchange_name, []):
|
||||
if self._matches_routing_key(exchange, routing_key, bound_key):
|
||||
if queue_name in self._async_queues:
|
||||
await self._async_queues[queue_name].put(message)
|
||||
logger.debug(f"Routed async to queue {queue_name}: {message[:100]}...")
|
||||
|
||||
def get_message(self, queue_name: str, timeout: float = 1.0) -> Optional[str]:
|
||||
"""Get a message from a queue"""
|
||||
if queue_name not in self._queues:
|
||||
return None
|
||||
try:
|
||||
return self._queues[queue_name].get(timeout=timeout)
|
||||
except Empty:
|
||||
return None
|
||||
|
||||
async def get_message_async(self, queue_name: str, timeout: float = 1.0) -> Optional[str]:
|
||||
"""Async version of get_message"""
|
||||
if queue_name not in self._async_queues:
|
||||
return None
|
||||
try:
|
||||
return await asyncio.wait_for(
|
||||
self._async_queues[queue_name].get(),
|
||||
timeout=timeout
|
||||
)
|
||||
except asyncio.TimeoutError:
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _matches_routing_key(exchange: Exchange, routing_key: str, bound_key: str) -> bool:
|
||||
"""Check if a routing key matches a bound key based on exchange type"""
|
||||
from backend.data.rabbitmq import ExchangeType
|
||||
|
||||
if exchange.type == ExchangeType.FANOUT:
|
||||
# Fanout exchanges ignore routing keys
|
||||
return True
|
||||
elif exchange.type == ExchangeType.DIRECT:
|
||||
# Direct exchanges require exact match
|
||||
return routing_key == bound_key
|
||||
elif exchange.type == ExchangeType.TOPIC:
|
||||
# Topic exchanges support wildcards (* and #)
|
||||
return InMemoryMessageBroker._match_topic(routing_key, bound_key)
|
||||
else:
|
||||
# Headers exchange not implemented, default to exact match
|
||||
return routing_key == bound_key
|
||||
|
||||
@staticmethod
|
||||
def _match_topic(routing_key: str, pattern: str) -> bool:
|
||||
"""Match a routing key against a topic pattern"""
|
||||
routing_parts = routing_key.split('.')
|
||||
pattern_parts = pattern.split('.')
|
||||
|
||||
if len(pattern_parts) > len(routing_parts) and '#' not in pattern:
|
||||
return False
|
||||
|
||||
for i, pattern_part in enumerate(pattern_parts):
|
||||
if pattern_part == '#':
|
||||
# # matches zero or more words
|
||||
return True
|
||||
if pattern_part == '*':
|
||||
# * matches exactly one word
|
||||
if i >= len(routing_parts):
|
||||
return False
|
||||
continue
|
||||
if i >= len(routing_parts) or routing_parts[i] != pattern_part:
|
||||
return False
|
||||
|
||||
return len(routing_parts) == len(pattern_parts)
|
||||
|
||||
|
||||
class InMemorySyncRabbitMQ(RabbitMQBase):
|
||||
"""Synchronous in-memory RabbitMQ replacement"""
|
||||
|
||||
def __init__(self, config: RabbitMQConfig):
|
||||
# Don't call super().__init__() as it tries to connect to real RabbitMQ
|
||||
self.config = config
|
||||
self._connection = True # Fake connection
|
||||
self._channel = True # Fake channel
|
||||
self._broker = InMemoryMessageBroker()
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return bool(self._connection)
|
||||
|
||||
@property
|
||||
def is_ready(self) -> bool:
|
||||
return bool(self._connection and self._channel)
|
||||
|
||||
def connect(self) -> None:
|
||||
"""Establish fake connection"""
|
||||
if self.is_connected:
|
||||
return
|
||||
|
||||
logger.info("Connecting to in-memory message broker")
|
||||
self._connection = True
|
||||
self._channel = True
|
||||
self.declare_infrastructure()
|
||||
|
||||
def disconnect(self) -> None:
|
||||
"""Close fake connection"""
|
||||
self._connection = None
|
||||
self._channel = None
|
||||
|
||||
def declare_infrastructure(self) -> None:
|
||||
"""Declare exchanges and queues"""
|
||||
if not self.is_ready:
|
||||
self.connect()
|
||||
|
||||
# Declare exchanges
|
||||
for exchange in self.config.exchanges:
|
||||
self._broker.declare_exchange(exchange)
|
||||
|
||||
# Declare queues and bind them
|
||||
for queue in self.config.queues:
|
||||
self._broker.declare_queue(queue)
|
||||
if queue.exchange:
|
||||
self._broker.bind_queue(
|
||||
queue.name,
|
||||
queue.exchange.name,
|
||||
queue.routing_key or queue.name
|
||||
)
|
||||
|
||||
def publish_message(
|
||||
self,
|
||||
routing_key: str,
|
||||
message: str,
|
||||
exchange: Optional[Exchange] = None,
|
||||
properties=None,
|
||||
mandatory: bool = True,
|
||||
) -> None:
|
||||
"""Publish a message"""
|
||||
if not self.is_ready:
|
||||
self.connect()
|
||||
|
||||
exchange_name = exchange.name if exchange else ""
|
||||
self._broker.publish(exchange_name, routing_key, message)
|
||||
|
||||
def get_channel(self):
|
||||
"""Get the channel (returns self for in-memory implementation)"""
|
||||
if not self.is_ready:
|
||||
self.connect()
|
||||
return self
|
||||
|
||||
def basic_consume(
|
||||
self,
|
||||
queue: str,
|
||||
on_message_callback: Callable,
|
||||
auto_ack: bool = False
|
||||
):
|
||||
"""
|
||||
Consume messages from a queue.
|
||||
This is a simplified implementation for testing.
|
||||
"""
|
||||
def consume_loop():
|
||||
while True:
|
||||
message = self._broker.get_message(queue, timeout=1.0)
|
||||
if message:
|
||||
# Create a mock message object
|
||||
class MockMethod:
|
||||
delivery_tag = 1
|
||||
class MockMessage:
|
||||
def __init__(self, body):
|
||||
self.body = body.encode() if isinstance(body, str) else body
|
||||
|
||||
on_message_callback(self, MockMethod(), None, MockMessage(message))
|
||||
|
||||
# Start consumer in background (simplified, not production-ready)
|
||||
import threading
|
||||
thread = threading.Thread(target=consume_loop, daemon=True)
|
||||
thread.start()
|
||||
|
||||
def basic_ack(self, delivery_tag):
|
||||
"""Acknowledge a message (no-op for in-memory)"""
|
||||
pass
|
||||
|
||||
def basic_qos(self, prefetch_count: int):
|
||||
"""Set QoS (no-op for in-memory)"""
|
||||
pass
|
||||
|
||||
|
||||
class InMemoryAsyncRabbitMQ(RabbitMQBase):
|
||||
"""Asynchronous in-memory RabbitMQ replacement"""
|
||||
|
||||
def __init__(self, config: RabbitMQConfig):
|
||||
# Don't call super().__init__() as it tries to connect to real RabbitMQ
|
||||
self.config = config
|
||||
self._connection = True # Fake connection
|
||||
self._channel = True # Fake channel
|
||||
self._broker = InMemoryMessageBroker()
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return bool(self._connection)
|
||||
|
||||
@property
|
||||
def is_ready(self) -> bool:
|
||||
return bool(self._connection and self._channel)
|
||||
|
||||
async def connect(self):
|
||||
"""Establish fake connection"""
|
||||
if self.is_connected:
|
||||
return
|
||||
|
||||
logger.info("Connecting to in-memory async message broker")
|
||||
self._connection = True
|
||||
self._channel = True
|
||||
await self.declare_infrastructure()
|
||||
|
||||
async def disconnect(self):
|
||||
"""Close fake connection"""
|
||||
self._connection = None
|
||||
self._channel = None
|
||||
|
||||
async def declare_infrastructure(self):
|
||||
"""Declare exchanges and queues"""
|
||||
if not self.is_ready:
|
||||
await self.connect()
|
||||
|
||||
# Declare exchanges
|
||||
for exchange in self.config.exchanges:
|
||||
self._broker.declare_exchange(exchange)
|
||||
|
||||
# Declare queues and bind them
|
||||
for queue in self.config.queues:
|
||||
self._broker.declare_queue(queue)
|
||||
if queue.exchange:
|
||||
self._broker.bind_queue(
|
||||
queue.name,
|
||||
queue.exchange.name,
|
||||
queue.routing_key or queue.name
|
||||
)
|
||||
|
||||
async def publish_message(
|
||||
self,
|
||||
routing_key: str,
|
||||
message: str,
|
||||
exchange: Optional[Exchange] = None,
|
||||
persistent: bool = True,
|
||||
) -> None:
|
||||
"""Publish a message"""
|
||||
if not self.is_ready:
|
||||
await self.connect()
|
||||
|
||||
exchange_name = exchange.name if exchange else ""
|
||||
await self._broker.publish_async(exchange_name, routing_key, message)
|
||||
|
||||
async def get_channel(self):
|
||||
"""Get the channel (returns self for in-memory implementation)"""
|
||||
if not self.is_ready:
|
||||
await self.connect()
|
||||
return self
|
||||
|
||||
async def consume_messages(
|
||||
self,
|
||||
queue_name: str,
|
||||
callback: Callable[[str], Awaitable[None]]
|
||||
):
|
||||
"""
|
||||
Consume messages from a queue asynchronously.
|
||||
Calls the callback for each message.
|
||||
"""
|
||||
while True:
|
||||
message = await self._broker.get_message_async(queue_name, timeout=1.0)
|
||||
if message:
|
||||
try:
|
||||
await callback(message)
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing message: {e}")
|
||||
|
||||
async def get_message(self, queue_name: str, timeout: float = 1.0) -> Optional[str]:
|
||||
"""Get a single message from a queue"""
|
||||
return await self._broker.get_message_async(queue_name, timeout)
|
||||
293
autogpt_platform/backend/backend/data/inmemory_redis.py
Normal file
293
autogpt_platform/backend/backend/data/inmemory_redis.py
Normal file
@@ -0,0 +1,293 @@
|
||||
"""
|
||||
In-memory Redis client implementation as a fallback.
|
||||
This allows running the platform without an external Redis server.
|
||||
"""
|
||||
import asyncio
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
from threading import Lock
|
||||
from typing import Any, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class InMemoryRedis:
|
||||
"""
|
||||
In-memory Redis-compatible client.
|
||||
Thread-safe singleton implementation.
|
||||
"""
|
||||
_instance = None
|
||||
_lock = Lock()
|
||||
|
||||
def __new__(cls):
|
||||
if cls._instance is None:
|
||||
with cls._lock:
|
||||
if cls._instance is None:
|
||||
cls._instance = super().__new__(cls)
|
||||
cls._instance._data = {}
|
||||
cls._instance._pubsub_channels = defaultdict(list)
|
||||
cls._instance._data_lock = Lock()
|
||||
return cls._instance
|
||||
|
||||
def ping(self) -> bool:
|
||||
"""Test connection (always succeeds for in-memory)"""
|
||||
return True
|
||||
|
||||
def get(self, key: str) -> Optional[str]:
|
||||
"""Get a value by key"""
|
||||
with self._data_lock:
|
||||
return self._data.get(key)
|
||||
|
||||
def set(
|
||||
self,
|
||||
key: str,
|
||||
value: Any,
|
||||
ex: Optional[int] = None,
|
||||
px: Optional[int] = None,
|
||||
nx: bool = False,
|
||||
xx: bool = False,
|
||||
) -> bool:
|
||||
"""Set a value with optional expiration"""
|
||||
with self._data_lock:
|
||||
if nx and key in self._data:
|
||||
return False
|
||||
if xx and key not in self._data:
|
||||
return False
|
||||
|
||||
self._data[key] = str(value)
|
||||
# Note: Expiration (ex, px) not implemented in this simple version
|
||||
if ex or px:
|
||||
logger.debug(f"Expiration not implemented in InMemoryRedis (key: {key})")
|
||||
return True
|
||||
|
||||
def delete(self, *keys: str) -> int:
|
||||
"""Delete one or more keys"""
|
||||
with self._data_lock:
|
||||
count = 0
|
||||
for key in keys:
|
||||
if key in self._data:
|
||||
del self._data[key]
|
||||
count += 1
|
||||
return count
|
||||
|
||||
def exists(self, *keys: str) -> int:
|
||||
"""Check if keys exist"""
|
||||
with self._data_lock:
|
||||
return sum(1 for key in keys if key in self._data)
|
||||
|
||||
def keys(self, pattern: str = "*") -> list[str]:
|
||||
"""Get all keys matching a pattern"""
|
||||
with self._data_lock:
|
||||
if pattern == "*":
|
||||
return list(self._data.keys())
|
||||
# Simple pattern matching
|
||||
import re
|
||||
regex_pattern = pattern.replace("*", ".*").replace("?", ".")
|
||||
regex = re.compile(regex_pattern)
|
||||
return [key for key in self._data.keys() if regex.match(key)]
|
||||
|
||||
def incr(self, key: str) -> int:
|
||||
"""Increment a key"""
|
||||
with self._data_lock:
|
||||
current = int(self._data.get(key, 0))
|
||||
current += 1
|
||||
self._data[key] = str(current)
|
||||
return current
|
||||
|
||||
def decr(self, key: str) -> int:
|
||||
"""Decrement a key"""
|
||||
with self._data_lock:
|
||||
current = int(self._data.get(key, 0))
|
||||
current -= 1
|
||||
self._data[key] = str(current)
|
||||
return current
|
||||
|
||||
def expire(self, key: str, seconds: int) -> bool:
|
||||
"""Set expiration on a key (not implemented)"""
|
||||
logger.debug(f"Expiration not implemented in InMemoryRedis (key: {key})")
|
||||
return key in self._data
|
||||
|
||||
def ttl(self, key: str) -> int:
|
||||
"""Get time to live (always returns -1 for no expiration)"""
|
||||
with self._data_lock:
|
||||
if key not in self._data:
|
||||
return -2 # Key doesn't exist
|
||||
return -1 # No expiration
|
||||
|
||||
def publish(self, channel: str, message: str) -> int:
|
||||
"""Publish a message to a channel"""
|
||||
# This is handled by InMemoryEventBus
|
||||
logger.debug(f"Publish to channel {channel}: {message[:100]}...")
|
||||
return 1
|
||||
|
||||
def pubsub(self):
|
||||
"""Get a pubsub object"""
|
||||
return InMemoryPubSub(self)
|
||||
|
||||
def close(self):
|
||||
"""Close connection (no-op for in-memory)"""
|
||||
pass
|
||||
|
||||
def flushdb(self):
|
||||
"""Flush all data (for testing)"""
|
||||
with self._data_lock:
|
||||
self._data.clear()
|
||||
|
||||
|
||||
class InMemoryPubSub:
|
||||
"""In-memory PubSub implementation"""
|
||||
|
||||
def __init__(self, client: InMemoryRedis):
|
||||
self.client = client
|
||||
self.channels = []
|
||||
self.patterns = []
|
||||
|
||||
def subscribe(self, *channels: str):
|
||||
"""Subscribe to channels"""
|
||||
self.channels.extend(channels)
|
||||
logger.debug(f"Subscribed to channels: {channels}")
|
||||
|
||||
def psubscribe(self, *patterns: str):
|
||||
"""Subscribe to channel patterns"""
|
||||
self.patterns.extend(patterns)
|
||||
logger.debug(f"Subscribed to patterns: {patterns}")
|
||||
|
||||
def unsubscribe(self, *channels: str):
|
||||
"""Unsubscribe from channels"""
|
||||
for channel in channels:
|
||||
if channel in self.channels:
|
||||
self.channels.remove(channel)
|
||||
|
||||
def punsubscribe(self, *patterns: str):
|
||||
"""Unsubscribe from patterns"""
|
||||
for pattern in patterns:
|
||||
if pattern in self.patterns:
|
||||
self.patterns.remove(pattern)
|
||||
|
||||
def listen(self):
|
||||
"""Listen for messages (returns empty generator)"""
|
||||
# This should be integrated with InMemoryEventBus
|
||||
# For now, return empty to prevent blocking
|
||||
return iter([])
|
||||
|
||||
def close(self):
|
||||
"""Close pubsub connection"""
|
||||
self.channels = []
|
||||
self.patterns = []
|
||||
|
||||
|
||||
class InMemoryAsyncRedis:
|
||||
"""
|
||||
Async in-memory Redis-compatible client.
|
||||
Uses the same singleton storage as the sync version.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._sync_client = InMemoryRedis()
|
||||
|
||||
async def ping(self) -> bool:
|
||||
"""Test connection (always succeeds for in-memory)"""
|
||||
return True
|
||||
|
||||
async def get(self, key: str) -> Optional[str]:
|
||||
"""Get a value by key"""
|
||||
return self._sync_client.get(key)
|
||||
|
||||
async def set(
|
||||
self,
|
||||
key: str,
|
||||
value: Any,
|
||||
ex: Optional[int] = None,
|
||||
px: Optional[int] = None,
|
||||
nx: bool = False,
|
||||
xx: bool = False,
|
||||
) -> bool:
|
||||
"""Set a value with optional expiration"""
|
||||
return self._sync_client.set(key, value, ex, px, nx, xx)
|
||||
|
||||
async def delete(self, *keys: str) -> int:
|
||||
"""Delete one or more keys"""
|
||||
return self._sync_client.delete(*keys)
|
||||
|
||||
async def exists(self, *keys: str) -> int:
|
||||
"""Check if keys exist"""
|
||||
return self._sync_client.exists(*keys)
|
||||
|
||||
async def keys(self, pattern: str = "*") -> list[str]:
|
||||
"""Get all keys matching a pattern"""
|
||||
return self._sync_client.keys(pattern)
|
||||
|
||||
async def incr(self, key: str) -> int:
|
||||
"""Increment a key"""
|
||||
return self._sync_client.incr(key)
|
||||
|
||||
async def decr(self, key: str) -> int:
|
||||
"""Decrement a key"""
|
||||
return self._sync_client.decr(key)
|
||||
|
||||
async def expire(self, key: str, seconds: int) -> bool:
|
||||
"""Set expiration on a key (not implemented)"""
|
||||
return self._sync_client.expire(key, seconds)
|
||||
|
||||
async def ttl(self, key: str) -> int:
|
||||
"""Get time to live"""
|
||||
return self._sync_client.ttl(key)
|
||||
|
||||
async def publish(self, channel: str, message: str) -> int:
|
||||
"""Publish a message to a channel"""
|
||||
return self._sync_client.publish(channel, message)
|
||||
|
||||
def pubsub(self):
|
||||
"""Get a pubsub object"""
|
||||
return InMemoryAsyncPubSub(self)
|
||||
|
||||
async def close(self):
|
||||
"""Close connection (no-op for in-memory)"""
|
||||
pass
|
||||
|
||||
async def flushdb(self):
|
||||
"""Flush all data (for testing)"""
|
||||
self._sync_client.flushdb()
|
||||
|
||||
|
||||
class InMemoryAsyncPubSub:
|
||||
"""Async in-memory PubSub implementation"""
|
||||
|
||||
def __init__(self, client: InMemoryAsyncRedis):
|
||||
self.client = client
|
||||
self.channels = []
|
||||
self.patterns = []
|
||||
|
||||
async def subscribe(self, *channels: str):
|
||||
"""Subscribe to channels"""
|
||||
self.channels.extend(channels)
|
||||
logger.debug(f"Subscribed to channels: {channels}")
|
||||
|
||||
async def psubscribe(self, *patterns: str):
|
||||
"""Subscribe to channel patterns"""
|
||||
self.patterns.extend(patterns)
|
||||
logger.debug(f"Subscribed to patterns: {patterns}")
|
||||
|
||||
async def unsubscribe(self, *channels: str):
|
||||
"""Unsubscribe from channels"""
|
||||
for channel in channels:
|
||||
if channel in self.channels:
|
||||
self.channels.remove(channel)
|
||||
|
||||
async def punsubscribe(self, *patterns: str):
|
||||
"""Unsubscribe from patterns"""
|
||||
for pattern in patterns:
|
||||
if pattern in self.patterns:
|
||||
self.patterns.remove(pattern)
|
||||
|
||||
async def listen(self):
|
||||
"""Listen for messages (returns empty async generator)"""
|
||||
# This should be integrated with InMemoryEventBus
|
||||
# For now, return empty to prevent blocking
|
||||
if False:
|
||||
yield
|
||||
|
||||
async def close(self):
|
||||
"""Close pubsub connection"""
|
||||
self.channels = []
|
||||
self.patterns = []
|
||||
@@ -1,4 +1,5 @@
|
||||
import logging
|
||||
import os
|
||||
from abc import ABC, abstractmethod
|
||||
from enum import Enum
|
||||
from typing import Awaitable, Optional
|
||||
@@ -14,6 +15,10 @@ from backend.util.settings import Settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Check if we should use in-memory implementations
|
||||
_settings = Settings()
|
||||
USE_IN_MEMORY = _settings.config.standalone_mode or os.getenv("STANDALONE_MODE", "").lower() in ("true", "1", "yes")
|
||||
|
||||
# RabbitMQ Connection Constants
|
||||
# These constants solve specific connection stability issues observed in production
|
||||
|
||||
@@ -88,6 +93,9 @@ class RabbitMQBase(ABC):
|
||||
self._connection = None
|
||||
self._channel = None
|
||||
|
||||
# Check if we should use in-memory implementation
|
||||
self._use_in_memory = USE_IN_MEMORY
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
"""Check if we have a valid connection"""
|
||||
@@ -117,16 +125,33 @@ class RabbitMQBase(ABC):
|
||||
class SyncRabbitMQ(RabbitMQBase):
|
||||
"""Synchronous RabbitMQ client"""
|
||||
|
||||
def __init__(self, config: RabbitMQConfig):
|
||||
super().__init__(config)
|
||||
if self._use_in_memory:
|
||||
from backend.data.inmemory_queue import InMemorySyncRabbitMQ
|
||||
self._in_memory_client = InMemorySyncRabbitMQ(config)
|
||||
logger.info("Using in-memory RabbitMQ client (standalone mode)")
|
||||
else:
|
||||
self._in_memory_client = None
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
if self._in_memory_client:
|
||||
return self._in_memory_client.is_connected
|
||||
return bool(self._connection and self._connection.is_open)
|
||||
|
||||
@property
|
||||
def is_ready(self) -> bool:
|
||||
if self._in_memory_client:
|
||||
return self._in_memory_client.is_ready
|
||||
return bool(self.is_connected and self._channel and self._channel.is_open)
|
||||
|
||||
@conn_retry("RabbitMQ", "Acquiring connection")
|
||||
def connect(self) -> None:
|
||||
if self._in_memory_client:
|
||||
self._in_memory_client.connect()
|
||||
return
|
||||
|
||||
if self.is_connected:
|
||||
return
|
||||
|
||||
@@ -150,6 +175,10 @@ class SyncRabbitMQ(RabbitMQBase):
|
||||
self.declare_infrastructure()
|
||||
|
||||
def disconnect(self) -> None:
|
||||
if self._in_memory_client:
|
||||
self._in_memory_client.disconnect()
|
||||
return
|
||||
|
||||
if self._channel:
|
||||
if self._channel.is_open:
|
||||
self._channel.close()
|
||||
@@ -161,6 +190,10 @@ class SyncRabbitMQ(RabbitMQBase):
|
||||
|
||||
def declare_infrastructure(self) -> None:
|
||||
"""Declare exchanges and queues for this service"""
|
||||
if self._in_memory_client:
|
||||
self._in_memory_client.declare_infrastructure()
|
||||
return
|
||||
|
||||
if not self.is_ready:
|
||||
self.connect()
|
||||
|
||||
@@ -200,6 +233,10 @@ class SyncRabbitMQ(RabbitMQBase):
|
||||
properties: Optional[BasicProperties] = None,
|
||||
mandatory: bool = True,
|
||||
) -> None:
|
||||
if self._in_memory_client:
|
||||
self._in_memory_client.publish_message(routing_key, message, exchange, properties, mandatory)
|
||||
return
|
||||
|
||||
if not self.is_ready:
|
||||
self.connect()
|
||||
|
||||
@@ -214,7 +251,10 @@ class SyncRabbitMQ(RabbitMQBase):
|
||||
mandatory=mandatory,
|
||||
)
|
||||
|
||||
def get_channel(self) -> pika.adapters.blocking_connection.BlockingChannel:
|
||||
def get_channel(self):
|
||||
if self._in_memory_client:
|
||||
return self._in_memory_client.get_channel()
|
||||
|
||||
if not self.is_ready:
|
||||
self.connect()
|
||||
if self._channel is None:
|
||||
@@ -225,16 +265,33 @@ class SyncRabbitMQ(RabbitMQBase):
|
||||
class AsyncRabbitMQ(RabbitMQBase):
|
||||
"""Asynchronous RabbitMQ client"""
|
||||
|
||||
def __init__(self, config: RabbitMQConfig):
|
||||
super().__init__(config)
|
||||
if self._use_in_memory:
|
||||
from backend.data.inmemory_queue import InMemoryAsyncRabbitMQ
|
||||
self._in_memory_client = InMemoryAsyncRabbitMQ(config)
|
||||
logger.info("Using in-memory async RabbitMQ client (standalone mode)")
|
||||
else:
|
||||
self._in_memory_client = None
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
if self._in_memory_client:
|
||||
return self._in_memory_client.is_connected
|
||||
return bool(self._connection and not self._connection.is_closed)
|
||||
|
||||
@property
|
||||
def is_ready(self) -> bool:
|
||||
if self._in_memory_client:
|
||||
return self._in_memory_client.is_ready
|
||||
return bool(self.is_connected and self._channel and not self._channel.is_closed)
|
||||
|
||||
@conn_retry("AsyncRabbitMQ", "Acquiring async connection")
|
||||
async def connect(self):
|
||||
if self._in_memory_client:
|
||||
await self._in_memory_client.connect()
|
||||
return
|
||||
|
||||
if self.is_connected:
|
||||
return
|
||||
|
||||
@@ -253,6 +310,10 @@ class AsyncRabbitMQ(RabbitMQBase):
|
||||
await self.declare_infrastructure()
|
||||
|
||||
async def disconnect(self):
|
||||
if self._in_memory_client:
|
||||
await self._in_memory_client.disconnect()
|
||||
return
|
||||
|
||||
if self._channel:
|
||||
await self._channel.close()
|
||||
self._channel = None
|
||||
@@ -262,6 +323,10 @@ class AsyncRabbitMQ(RabbitMQBase):
|
||||
|
||||
async def declare_infrastructure(self):
|
||||
"""Declare exchanges and queues for this service"""
|
||||
if self._in_memory_client:
|
||||
await self._in_memory_client.declare_infrastructure()
|
||||
return
|
||||
|
||||
if not self.is_ready:
|
||||
await self.connect()
|
||||
|
||||
@@ -299,6 +364,10 @@ class AsyncRabbitMQ(RabbitMQBase):
|
||||
exchange: Optional[Exchange] = None,
|
||||
persistent: bool = True,
|
||||
) -> None:
|
||||
if self._in_memory_client:
|
||||
await self._in_memory_client.publish_message(routing_key, message, exchange, persistent)
|
||||
return
|
||||
|
||||
if not self.is_ready:
|
||||
await self.connect()
|
||||
|
||||
@@ -322,7 +391,10 @@ class AsyncRabbitMQ(RabbitMQBase):
|
||||
routing_key=routing_key,
|
||||
)
|
||||
|
||||
async def get_channel(self) -> aio_pika.abc.AbstractChannel:
|
||||
async def get_channel(self):
|
||||
if self._in_memory_client:
|
||||
return await self._in_memory_client.get_channel()
|
||||
|
||||
if not self.is_ready:
|
||||
await self.connect()
|
||||
if self._channel is None:
|
||||
|
||||
@@ -7,6 +7,7 @@ from redis import Redis
|
||||
from redis.asyncio import Redis as AsyncRedis
|
||||
|
||||
from backend.util.retry import conn_retry
|
||||
from backend.util.settings import Settings
|
||||
|
||||
load_dotenv()
|
||||
|
||||
@@ -16,9 +17,18 @@ PASSWORD = os.getenv("REDIS_PASSWORD", None)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Check if we should use in-memory implementations
|
||||
_settings = Settings()
|
||||
USE_IN_MEMORY = _settings.config.standalone_mode or os.getenv("STANDALONE_MODE", "").lower() in ("true", "1", "yes")
|
||||
|
||||
|
||||
@conn_retry("Redis", "Acquiring connection")
|
||||
def connect() -> Redis:
|
||||
if USE_IN_MEMORY:
|
||||
from backend.data.inmemory_redis import InMemoryRedis
|
||||
logger.info("Using in-memory Redis client (standalone mode)")
|
||||
return InMemoryRedis()
|
||||
|
||||
c = Redis(
|
||||
host=HOST,
|
||||
port=PORT,
|
||||
@@ -41,6 +51,11 @@ def get_redis() -> Redis:
|
||||
|
||||
@conn_retry("AsyncRedis", "Acquiring connection")
|
||||
async def connect_async() -> AsyncRedis:
|
||||
if USE_IN_MEMORY:
|
||||
from backend.data.inmemory_redis import InMemoryAsyncRedis
|
||||
logger.info("Using in-memory async Redis client (standalone mode)")
|
||||
return InMemoryAsyncRedis()
|
||||
|
||||
c = AsyncRedis(
|
||||
host=HOST,
|
||||
port=PORT,
|
||||
|
||||
@@ -316,6 +316,11 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
|
||||
description="Whether to enable example blocks in production",
|
||||
)
|
||||
|
||||
standalone_mode: bool = Field(
|
||||
default=False,
|
||||
description="Whether to run in standalone mode without external dependencies (Redis, RabbitMQ, Supabase). Uses in-memory implementations instead.",
|
||||
)
|
||||
|
||||
cloud_storage_cleanup_interval_hours: int = Field(
|
||||
default=6,
|
||||
ge=1,
|
||||
|
||||
Reference in New Issue
Block a user