Compare commits


1 Commit

Author: Claude
938dff9b99 feat(platform): Add standalone mode for running without external dependencies
Implemented in-memory fallbacks for Redis, RabbitMQ, and event bus to enable
running the platform in restricted environments without Docker or external services.

Changes:
- Added standalone_mode flag to Settings configuration
- Created InMemoryRedis client with basic Redis operations
- Created InMemoryEventBus for pub/sub messaging without Redis
- Created InMemoryMessageBroker for queue management without RabbitMQ
- Updated redis_client.py to use in-memory client when standalone mode enabled
- Updated event_bus.py to use in-memory implementation when standalone mode enabled
- Updated rabbitmq.py to use in-memory implementation when standalone mode enabled
- Added .env.standalone template for easy standalone configuration
- Added STANDALONE_MODE.md documentation

This allows developers to run the platform locally without Docker, useful for:
- Development and testing
- CI/CD pipelines
- Restricted environments
- Quick demos

Limitations:
- No data persistence (all in memory)
- Single process only (no distributed execution)
- Database still required (PostgreSQL or SQLite)

Usage:
  export STANDALONE_MODE=true
  # or copy .env.standalone to .env

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
Date: 2025-10-21 21:15:29 +00:00
9 changed files with 1300 additions and 2 deletions

STANDALONE_MODE.md (new file, 198 lines)
@@ -0,0 +1,198 @@
# Standalone Mode
This document describes how to run the AutoGPT platform in **standalone mode** without external dependencies like Redis, RabbitMQ, and Supabase.
## Overview
Standalone mode uses in-memory implementations for:
- **Redis**: In-memory key-value store and pub/sub messaging
- **RabbitMQ**: In-memory message queues and task distribution
- **Event Bus**: In-memory event distribution system
This is useful for:
- Development and testing without Docker
- Running in restricted environments
- Quick local demos
- CI/CD pipelines without external services
## Limitations
Standalone mode has these limitations:
- **No persistence**: All data is stored in memory and lost on restart
- **Single process**: Cannot scale across multiple workers/servers
- **No distributed locking**: Cluster coordination features are disabled
- **Database still required**: You still need PostgreSQL, or you must configure SQLite
- **Authentication may need adjustment**: Supabase authentication won't work
## How to Enable
### Method 1: Environment Variable
Set the `STANDALONE_MODE` environment variable:
```bash
export STANDALONE_MODE=true
```
### Method 2: Configuration File
1. Copy the standalone environment template:
```bash
cd autogpt_platform/backend
cp .env.standalone .env
```
2. Edit `.env` and adjust any settings as needed
3. Add to the config (via environment or config.json):
```json
{
"standalone_mode": true
}
```
### Method 3: Programmatic
In your code, you can check/set standalone mode:
```python
from backend.util.settings import Settings
settings = Settings()
settings.config.standalone_mode = True
```

Note: the patched modules (`event_bus.py`, `rabbitmq.py`, `redis_client.py`) evaluate standalone mode once at import time, so enable it before those modules are imported.
## Running the Platform
Once standalone mode is enabled:
```bash
cd autogpt_platform/backend
# Install dependencies
poetry install
# Run migrations (if using SQLite, update schema first)
poetry run prisma migrate deploy
# Start the backend server
poetry run python -m backend.app
```
The platform will automatically use in-memory implementations when it detects:
- `STANDALONE_MODE=true` environment variable, OR
- `standalone_mode: true` in config settings
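Under the hood, each patched module computes a single flag (copied verbatim from the diffs below):
```python
USE_IN_MEMORY = config.standalone_mode or os.getenv("STANDALONE_MODE", "").lower() in ("true", "1", "yes")
```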
## Implementation Details
### Redis Client (`backend/data/redis_client.py`)
When standalone mode is active:
- Returns `InMemoryRedis()` instead of connecting to Redis
- Stores data in a Python dictionary (singleton)
- Supports basic operations: get, set, delete, incr, keys, etc.
- Pub/sub returns empty generators (use event bus instead)
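For example, the basic operations listed above work the same way against the in-memory client (a minimal sketch, assuming `get_redis()` lazily connects; the key name is illustrative):
```python
from backend.data import redis_client

r = redis_client.get_redis()   # returns InMemoryRedis() in standalone mode
r.set("run:count", 0)
print(r.incr("run:count"))     # 1
print(r.keys("run:*"))         # ['run:count']
```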
### Event Bus (`backend/data/event_bus.py`)
When standalone mode is active:
- `RedisEventBus` uses `InMemorySyncEventBus`
- `AsyncRedisEventBus` uses `InMemoryAsyncEventBus`
- Events are distributed via Python queues
- Supports pattern subscriptions with wildcards
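A hedged sketch of a concrete bus subclass (`TickEvent` and `"ticks"` are illustrative names, assuming `Model` and `event_bus_name` are the two members a subclass must provide, as the diff below suggests):
```python
from pydantic import BaseModel
from backend.data.event_bus import RedisEventBus

class TickEvent(BaseModel):
    n: int

class TickBus(RedisEventBus[TickEvent]):
    Model = TickEvent

    @property
    def event_bus_name(self) -> str:
        return "ticks"

bus = TickBus()  # transparently in-memory when standalone mode is on
bus.publish_event(TickEvent(n=1), "worker-1")
```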
### RabbitMQ (`backend/data/rabbitmq.py`)
When standalone mode is active:
- `SyncRabbitMQ` uses `InMemorySyncRabbitMQ`
- `AsyncRabbitMQ` uses `InMemoryAsyncRabbitMQ`
- Messages are routed via Python queues
- Supports exchanges, queues, and routing keys
- Implements fanout, direct, and topic exchange types
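The topic matching mirrors RabbitMQ semantics; for instance (routing keys are illustrative, exercising the internal helper from `inmemory_queue.py`):
```python
from backend.data.inmemory_queue import InMemoryMessageBroker

InMemoryMessageBroker._match_topic("exec.graph.started", "exec.*.started")  # True: * matches one word
InMemoryMessageBroker._match_topic("exec.graph.node.started", "exec.#")     # True: # matches zero or more
InMemoryMessageBroker._match_topic("exec.graph.started", "exec.graph")      # False: lengths differ
```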
## Troubleshooting
### "Connection refused" errors
If you still see connection errors:
1. Check that `STANDALONE_MODE=true` is set
2. Verify the setting with: `python -c "from backend.util.settings import Settings; print(Settings().config.standalone_mode)"`
3. Make sure you're not explicitly connecting to Redis/RabbitMQ elsewhere
### Database connection errors
Standalone mode does NOT replace the database. You need either:
- PostgreSQL running locally
- SQLite (requires schema changes)
- A remote database connection
### Authentication errors
Supabase authentication won't work in standalone mode. Options:
1. Disable authentication: `ENABLE_AUTH=false` in `.env`
2. Implement a mock auth provider
3. Use a different auth method
## Architecture
```
┌─────────────────────────────────────────┐
│ Standalone Mode │
├─────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────┐ │
│ │ InMemoryRedis (Singleton) │ │
│ │ - Key-value store (dict) │ │
│ │ - TTL not implemented │ │
│ └──────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────┐ │
│ │ InMemoryEventBus (Singleton) │ │
│ │ - Pub/sub via queues │ │
│ │ - Pattern matching support │ │
│ └──────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────┐ │
│ │ InMemoryMessageBroker │ │
│ │ - Exchanges & queues │ │
│ │ - Routing key matching │ │
│ │ - Fanout/direct/topic types │ │
│ └──────────────────────────────────┘ │
│ │
└─────────────────────────────────────────┘
```
## Testing
To test standalone mode:
```bash
# Set standalone mode
export STANDALONE_MODE=true
# Run tests
cd autogpt_platform/backend
poetry run pytest
# Or run a specific service
poetry run python -m backend.rest
```
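A smoke test against the in-memory Redis client could look like this (a minimal sketch; the test name and keys are illustrative):
```python
# test_standalone_smoke.py
from backend.data.inmemory_redis import InMemoryRedis

def test_inmemory_redis_roundtrip():
    r = InMemoryRedis()
    r.flushdb()                 # singleton store: clear state between tests
    assert r.set("k", "v")
    assert r.get("k") == "v"
    assert r.delete("k") == 1
    assert r.ttl("k") == -2     # -2 means the key does not exist
```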
## Production Use
**⚠️ Warning**: Standalone mode is NOT recommended for production use because:
- No data persistence
- No horizontal scaling
- No failover or redundancy
- Limited to single-process execution
For production, use the full Docker setup with real Redis, RabbitMQ, and PostgreSQL.
## Contributing
The standalone mode implementations are in:
- `backend/data/inmemory_redis.py` - In-memory Redis client
- `backend/data/inmemory_event_bus.py` - In-memory event bus
- `backend/data/inmemory_queue.py` - In-memory RabbitMQ
Feel free to improve these implementations or add missing features.

.env.standalone (new file, 34 lines)

@@ -0,0 +1,34 @@
# Standalone Mode Configuration
# This configuration allows running the AutoGPT platform without external dependencies
# Copy this to .env to use standalone mode
# Enable standalone mode (uses in-memory implementations)
STANDALONE_MODE=true
# Database - Use SQLite for standalone mode (if supported by Prisma schema)
# Note: You may need to adjust the Prisma schema to support SQLite
DATABASE_URL=file:./dev.db
# External service endpoints (not contacted in standalone mode; placeholders keep config validation happy)
REDIS_HOST=localhost
REDIS_PORT=6379
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
# Disable authentication for standalone mode (optional - for easier testing)
# Uncomment the line below to disable auth
# ENABLE_AUTH=false
# Disable virus scanning for standalone mode
CLAMAV_SERVICE_ENABLED=false
# Minimal secrets (not used in standalone mode but required by config)
SUPABASE_URL=http://localhost:8000
SUPABASE_SERVICE_ROLE_KEY=dummy_key_for_standalone
RABBITMQ_DEFAULT_USER=guest
RABBITMQ_DEFAULT_PASS=guest
ENCRYPTION_KEY=dummy_encryption_key_for_standalone_mode_32chars
# Optional: Configure platform URLs for local testing
PLATFORM_BASE_URL=http://localhost:8006
FRONTEND_BASE_URL=http://localhost:3000

backend/data/event_bus.py

@@ -1,5 +1,6 @@
import asyncio
import logging
import os
from abc import ABC, abstractmethod
from typing import Any, AsyncGenerator, Generator, Generic, Optional, TypeVar
@@ -14,6 +15,9 @@ from backend.util.settings import Settings
logger = logging.getLogger(__name__)
config = Settings().config
# Check if we should use in-memory implementations
USE_IN_MEMORY = config.standalone_mode or os.getenv("STANDALONE_MODE", "").lower() in ("true", "1", "yes")
M = TypeVar("M", bound=BaseModel)
@@ -98,15 +102,36 @@ class _EventPayloadWrapper(BaseModel, Generic[M]):
class RedisEventBus(BaseRedisEventBus[M], ABC):
def __init__(self):
if USE_IN_MEMORY:
from backend.data.inmemory_event_bus import InMemorySyncEventBus
# Copy over the abstract properties to the in-memory implementation
self._in_memory_bus = type(
self.__class__.__name__,
(InMemorySyncEventBus,),
{"event_bus_name": property(lambda s: self.event_bus_name), "Model": self.Model}
)()
logger.info(f"Using in-memory event bus for {self.event_bus_name} (standalone mode)")
else:
self._in_memory_bus = None
@property
def connection(self) -> redis.Redis:
return redis.get_redis()
def publish_event(self, event: M, channel_key: str):
if self._in_memory_bus:
self._in_memory_bus.publish_event(event, channel_key)
return
message, full_channel_name = self._serialize_message(event, channel_key)
self.connection.publish(full_channel_name, message)
def listen_events(self, channel_key: str) -> Generator[M, None, None]:
if self._in_memory_bus:
yield from self._in_memory_bus.listen_events(channel_key)
return
pubsub, full_channel_name = self._get_pubsub_channel(
self.connection, channel_key
)
@@ -123,16 +148,38 @@ class RedisEventBus(BaseRedisEventBus[M], ABC):
class AsyncRedisEventBus(BaseRedisEventBus[M], ABC):
def __init__(self):
if USE_IN_MEMORY:
from backend.data.inmemory_event_bus import InMemoryAsyncEventBus
# Copy over the abstract properties to the in-memory implementation
self._in_memory_bus = type(
self.__class__.__name__,
(InMemoryAsyncEventBus,),
{"event_bus_name": property(lambda s: self.event_bus_name), "Model": self.Model}
)()
logger.info(f"Using in-memory async event bus for {self.event_bus_name} (standalone mode)")
else:
self._in_memory_bus = None
@property
async def connection(self) -> redis.AsyncRedis:
return await redis.get_redis_async()
async def publish_event(self, event: M, channel_key: str):
if self._in_memory_bus:
await self._in_memory_bus.publish_event(event, channel_key)
return
message, full_channel_name = self._serialize_message(event, channel_key)
connection = await self.connection
await connection.publish(full_channel_name, message)
async def listen_events(self, channel_key: str) -> AsyncGenerator[M, None]:
if self._in_memory_bus:
async for event in self._in_memory_bus.listen_events(channel_key):
yield event
return
pubsub, full_channel_name = self._get_pubsub_channel(
await self.connection, channel_key
)
@@ -150,6 +197,9 @@ class AsyncRedisEventBus(BaseRedisEventBus[M], ABC):
async def wait_for_event(
self, channel_key: str, timeout: Optional[float] = None
) -> M | None:
if self._in_memory_bus:
return await self._in_memory_bus.wait_for_event(channel_key, timeout)
try:
return await asyncio.wait_for(
anext(aiter(self.listen_events(channel_key))), timeout

backend/data/inmemory_event_bus.py (new file, 257 lines)

@@ -0,0 +1,257 @@
"""
In-memory event bus implementation as a fallback for Redis.
This allows running the platform without an external Redis server.
"""
import asyncio
import logging
from abc import ABC
from collections import defaultdict
from typing import Any, AsyncGenerator, Generator, Generic, TypeVar
from queue import Queue, Empty
from threading import Lock
from pydantic import BaseModel
from backend.util import json
from backend.util.settings import Settings
logger = logging.getLogger(__name__)
config = Settings().config
# Type variable for message model
M = TypeVar("M", bound=BaseModel)
class _EventPayloadWrapper(BaseModel, Generic[M]):
"""Wrapper model to allow message payloads"""
payload: M
class InMemoryEventBus:
"""
Singleton in-memory event bus that stores all subscribers and messages.
Thread-safe implementation using locks and queues.
"""
_instance = None
_lock = Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._subscribers = defaultdict(list)
cls._instance._async_subscribers = defaultdict(list)
cls._instance._subscriber_lock = Lock()
return cls._instance
def publish(self, channel: str, message: str):
"""Publish a message to all subscribers of a channel"""
with self._subscriber_lock:
# Handle pattern subscriptions (channels with *)
for pattern, queues in self._subscribers.items():
if self._match_pattern(pattern, channel):
for queue in queues:
try:
queue.put({
'type': 'pmessage' if '*' in pattern else 'message',
'channel': channel,
'data': message
})
except Exception as e:
logger.error(f"Failed to publish to queue: {e}")
async def publish_async(self, channel: str, message: str):
"""Async version of publish"""
with self._subscriber_lock:
# Notify async subscribers
for pattern, queues in self._async_subscribers.items():
if self._match_pattern(pattern, channel):
for queue in queues:
try:
await queue.put({
'type': 'pmessage' if '*' in pattern else 'message',
'channel': channel,
'data': message
})
except Exception as e:
logger.error(f"Failed to publish async to queue: {e}")
def subscribe(self, channel: str) -> Queue:
"""Subscribe to a channel and return a queue for receiving messages"""
queue = Queue()
with self._subscriber_lock:
self._subscribers[channel].append(queue)
return queue
async def subscribe_async(self, channel: str) -> asyncio.Queue:
"""Async version of subscribe"""
queue = asyncio.Queue()
with self._subscriber_lock:
self._async_subscribers[channel].append(queue)
return queue
def unsubscribe(self, channel: str, queue: Queue):
"""Unsubscribe a queue from a channel"""
with self._subscriber_lock:
if channel in self._subscribers:
try:
self._subscribers[channel].remove(queue)
except ValueError:
pass
async def unsubscribe_async(self, channel: str, queue: asyncio.Queue):
"""Async version of unsubscribe"""
with self._subscriber_lock:
if channel in self._async_subscribers:
try:
self._async_subscribers[channel].remove(queue)
except ValueError:
pass
@staticmethod
def _match_pattern(pattern: str, channel: str) -> bool:
"""Match a channel against a pattern (supports * wildcard)"""
if '*' not in pattern:
return pattern == channel
# Simple pattern matching for Redis-style patterns
pattern_parts = pattern.split('*')
if len(pattern_parts) == 2:
prefix, suffix = pattern_parts
return channel.startswith(prefix) and channel.endswith(suffix)
# More complex patterns - just check prefix for now
return channel.startswith(pattern_parts[0])
class BaseInMemoryEventBus(Generic[M], ABC):
"""Base class for in-memory event bus implementations"""
Model: type[M]
@property
def event_bus_name(self) -> str:
"""Override this in subclasses"""
return "events"
@property
def Message(self) -> type[_EventPayloadWrapper[M]]:
return _EventPayloadWrapper[self.Model]
def _serialize_message(self, item: M, channel_key: str) -> tuple[str, str]:
"""Serialize a message for publishing"""
MAX_MESSAGE_SIZE = config.max_message_size_limit
try:
message = json.dumps(
self.Message(payload=item), ensure_ascii=False, separators=(",", ":")
)
except UnicodeError:
message = json.dumps(
self.Message(payload=item), ensure_ascii=True, separators=(",", ":")
)
logger.warning(
f"Unicode serialization failed, falling back to ASCII for channel {channel_key}"
)
# Check message size and truncate if necessary
message_size = len(message.encode("utf-8"))
if message_size > MAX_MESSAGE_SIZE:
logger.warning(
f"Message size {message_size} bytes exceeds limit {MAX_MESSAGE_SIZE} bytes for channel {channel_key}. "
"Truncating payload."
)
error_payload = {
"payload": {
"event_type": "error_comms_update",
"error": "Payload too large for transmission",
"original_size_bytes": message_size,
"max_size_bytes": MAX_MESSAGE_SIZE,
}
}
message = json.dumps(
error_payload, ensure_ascii=False, separators=(",", ":")
)
channel_name = f"{self.event_bus_name}/{channel_key}"
logger.debug(f"[{channel_name}] Publishing event: {message[:100]}...")
return message, channel_name
def _deserialize_message(self, msg: dict, channel_key: str) -> M | None:
"""Deserialize a message from the queue"""
message_type = "pmessage" if "*" in channel_key else "message"
if msg["type"] != message_type:
return None
try:
logger.debug(f"[{channel_key}] Consuming event: {msg['data'][:100]}...")
return self.Message.model_validate_json(msg["data"]).payload
except Exception as e:
logger.error(f"Failed to parse event from queue {msg}: {e}")
return None
class InMemorySyncEventBus(BaseInMemoryEventBus[M], ABC):
"""Synchronous in-memory event bus"""
def __init__(self):
self._bus = InMemoryEventBus()
def publish_event(self, event: M, channel_key: str):
"""Publish an event to a channel"""
message, full_channel_name = self._serialize_message(event, channel_key)
self._bus.publish(full_channel_name, message)
def listen_events(self, channel_key: str) -> Generator[M, None, None]:
"""Listen for events on a channel"""
full_channel_name = f"{self.event_bus_name}/{channel_key}"
queue = self._bus.subscribe(full_channel_name)
try:
while True:
try:
message = queue.get(timeout=1.0)
if event := self._deserialize_message(message, channel_key):
yield event
except Empty:
continue
finally:
self._bus.unsubscribe(full_channel_name, queue)
class InMemoryAsyncEventBus(BaseInMemoryEventBus[M], ABC):
"""Asynchronous in-memory event bus"""
def __init__(self):
self._bus = InMemoryEventBus()
async def publish_event(self, event: M, channel_key: str):
"""Publish an event to a channel"""
message, full_channel_name = self._serialize_message(event, channel_key)
await self._bus.publish_async(full_channel_name, message)
async def listen_events(self, channel_key: str) -> AsyncGenerator[M, None]:
"""Listen for events on a channel"""
full_channel_name = f"{self.event_bus_name}/{channel_key}"
queue = await self._bus.subscribe_async(full_channel_name)
try:
while True:
try:
message = await asyncio.wait_for(queue.get(), timeout=1.0)
if event := self._deserialize_message(message, channel_key):
yield event
except asyncio.TimeoutError:
continue
finally:
await self._bus.unsubscribe_async(full_channel_name, queue)
async def wait_for_event(
self, channel_key: str, timeout: float | None = None
) -> M | None:
"""Wait for a single event with optional timeout"""
try:
return await asyncio.wait_for(
anext(aiter(self.listen_events(channel_key))), timeout
)
except (TimeoutError, asyncio.TimeoutError):
return None

backend/data/inmemory_queue.py (new file, 374 lines)

@@ -0,0 +1,374 @@
"""
In-memory queue implementation as a fallback for RabbitMQ.
This allows running the platform without an external RabbitMQ server.
"""
import asyncio
import logging
from queue import Queue, Empty
from threading import Lock
from typing import Optional, Callable, Awaitable
from collections import defaultdict
from backend.data.rabbitmq import (
RabbitMQBase,
RabbitMQConfig,
Exchange,
Queue as QueueConfig,
)
logger = logging.getLogger(__name__)
class InMemoryMessageBroker:
"""
Singleton in-memory message broker that manages all queues and exchanges.
Thread-safe implementation.
"""
_instance = None
_lock = Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._queues = {}
cls._instance._async_queues = {}
cls._instance._exchanges = {}
cls._instance._bindings = defaultdict(list) # exchange -> [(queue, routing_key)]
cls._instance._consumers = {} # queue -> [callbacks]
cls._instance._async_consumers = {} # queue -> [async callbacks]
return cls._instance
def declare_exchange(self, exchange: Exchange):
"""Declare an exchange"""
with self._lock:
if exchange.name not in self._exchanges:
self._exchanges[exchange.name] = exchange
logger.debug(f"Declared exchange: {exchange.name} (type: {exchange.type})")
def declare_queue(self, queue: QueueConfig):
"""Declare a queue"""
with self._lock:
if queue.name not in self._queues:
self._queues[queue.name] = Queue()
self._async_queues[queue.name] = asyncio.Queue()
logger.debug(f"Declared queue: {queue.name}")
def bind_queue(self, queue_name: str, exchange_name: str, routing_key: str):
"""Bind a queue to an exchange with a routing key"""
with self._lock:
self._bindings[exchange_name].append((queue_name, routing_key))
logger.debug(f"Bound queue {queue_name} to exchange {exchange_name} with key {routing_key}")
def publish(self, exchange_name: str, routing_key: str, message: str):
"""Publish a message to an exchange"""
with self._lock:
if not exchange_name:
# Direct publish to queue (default exchange)
if routing_key in self._queues:
self._queues[routing_key].put(message)
logger.debug(f"Published to queue {routing_key}: {message[:100]}...")
return
# Publish to exchange
exchange = self._exchanges.get(exchange_name)
if not exchange:
logger.warning(f"Exchange {exchange_name} not found")
return
# Route message based on exchange type
for queue_name, bound_key in self._bindings.get(exchange_name, []):
if self._matches_routing_key(exchange, routing_key, bound_key):
if queue_name in self._queues:
self._queues[queue_name].put(message)
logger.debug(f"Routed to queue {queue_name}: {message[:100]}...")
async def publish_async(self, exchange_name: str, routing_key: str, message: str):
"""Async version of publish"""
with self._lock:
if not exchange_name:
# Direct publish to queue (default exchange)
if routing_key in self._async_queues:
await self._async_queues[routing_key].put(message)
logger.debug(f"Published async to queue {routing_key}: {message[:100]}...")
return
# Publish to exchange
exchange = self._exchanges.get(exchange_name)
if not exchange:
logger.warning(f"Exchange {exchange_name} not found")
return
# Route message based on exchange type
for queue_name, bound_key in self._bindings.get(exchange_name, []):
if self._matches_routing_key(exchange, routing_key, bound_key):
if queue_name in self._async_queues:
await self._async_queues[queue_name].put(message)
logger.debug(f"Routed async to queue {queue_name}: {message[:100]}...")
def get_message(self, queue_name: str, timeout: float = 1.0) -> Optional[str]:
"""Get a message from a queue"""
if queue_name not in self._queues:
return None
try:
return self._queues[queue_name].get(timeout=timeout)
except Empty:
return None
async def get_message_async(self, queue_name: str, timeout: float = 1.0) -> Optional[str]:
"""Async version of get_message"""
if queue_name not in self._async_queues:
return None
try:
return await asyncio.wait_for(
self._async_queues[queue_name].get(),
timeout=timeout
)
except asyncio.TimeoutError:
return None
@staticmethod
def _matches_routing_key(exchange: Exchange, routing_key: str, bound_key: str) -> bool:
"""Check if a routing key matches a bound key based on exchange type"""
from backend.data.rabbitmq import ExchangeType
if exchange.type == ExchangeType.FANOUT:
# Fanout exchanges ignore routing keys
return True
elif exchange.type == ExchangeType.DIRECT:
# Direct exchanges require exact match
return routing_key == bound_key
elif exchange.type == ExchangeType.TOPIC:
# Topic exchanges support wildcards (* and #)
return InMemoryMessageBroker._match_topic(routing_key, bound_key)
else:
# Headers exchange not implemented, default to exact match
return routing_key == bound_key
@staticmethod
def _match_topic(routing_key: str, pattern: str) -> bool:
"""Match a routing key against a topic pattern"""
routing_parts = routing_key.split('.')
pattern_parts = pattern.split('.')
if len(pattern_parts) > len(routing_parts) and '#' not in pattern:
return False
for i, pattern_part in enumerate(pattern_parts):
if pattern_part == '#':
# # matches zero or more words
return True
if pattern_part == '*':
# * matches exactly one word
if i >= len(routing_parts):
return False
continue
if i >= len(routing_parts) or routing_parts[i] != pattern_part:
return False
return len(routing_parts) == len(pattern_parts)
class InMemorySyncRabbitMQ(RabbitMQBase):
"""Synchronous in-memory RabbitMQ replacement"""
def __init__(self, config: RabbitMQConfig):
# Don't call super().__init__() as it tries to connect to real RabbitMQ
self.config = config
self._connection = None  # Start disconnected so connect() runs declare_infrastructure()
self._channel = None
self._broker = InMemoryMessageBroker()
@property
def is_connected(self) -> bool:
return bool(self._connection)
@property
def is_ready(self) -> bool:
return bool(self._connection and self._channel)
def connect(self) -> None:
"""Establish fake connection"""
if self.is_connected:
return
logger.info("Connecting to in-memory message broker")
self._connection = True
self._channel = True
self.declare_infrastructure()
def disconnect(self) -> None:
"""Close fake connection"""
self._connection = None
self._channel = None
def declare_infrastructure(self) -> None:
"""Declare exchanges and queues"""
if not self.is_ready:
self.connect()
# Declare exchanges
for exchange in self.config.exchanges:
self._broker.declare_exchange(exchange)
# Declare queues and bind them
for queue in self.config.queues:
self._broker.declare_queue(queue)
if queue.exchange:
self._broker.bind_queue(
queue.name,
queue.exchange.name,
queue.routing_key or queue.name
)
def publish_message(
self,
routing_key: str,
message: str,
exchange: Optional[Exchange] = None,
properties=None,
mandatory: bool = True,
) -> None:
"""Publish a message"""
if not self.is_ready:
self.connect()
exchange_name = exchange.name if exchange else ""
self._broker.publish(exchange_name, routing_key, message)
def get_channel(self):
"""Get the channel (returns self for in-memory implementation)"""
if not self.is_ready:
self.connect()
return self
def basic_consume(
self,
queue: str,
on_message_callback: Callable,
auto_ack: bool = False
):
"""
Consume messages from a queue.
This is a simplified implementation for testing.
"""
def consume_loop():
while True:
message = self._broker.get_message(queue, timeout=1.0)
if message:
# Mimic pika's callback signature: (channel, method, properties, body: bytes)
class MockMethod:
delivery_tag = 1
on_message_callback(self, MockMethod(), None, message.encode())
# Start consumer in background (simplified, not production-ready)
import threading
thread = threading.Thread(target=consume_loop, daemon=True)
thread.start()
def basic_ack(self, delivery_tag):
"""Acknowledge a message (no-op for in-memory)"""
pass
def basic_qos(self, prefetch_count: int):
"""Set QoS (no-op for in-memory)"""
pass
class InMemoryAsyncRabbitMQ(RabbitMQBase):
"""Asynchronous in-memory RabbitMQ replacement"""
def __init__(self, config: RabbitMQConfig):
# Don't call super().__init__() as it tries to connect to real RabbitMQ
self.config = config
self._connection = None  # Start disconnected so connect() runs declare_infrastructure()
self._channel = None
self._broker = InMemoryMessageBroker()
@property
def is_connected(self) -> bool:
return bool(self._connection)
@property
def is_ready(self) -> bool:
return bool(self._connection and self._channel)
async def connect(self):
"""Establish fake connection"""
if self.is_connected:
return
logger.info("Connecting to in-memory async message broker")
self._connection = True
self._channel = True
await self.declare_infrastructure()
async def disconnect(self):
"""Close fake connection"""
self._connection = None
self._channel = None
async def declare_infrastructure(self):
"""Declare exchanges and queues"""
if not self.is_ready:
await self.connect()
# Declare exchanges
for exchange in self.config.exchanges:
self._broker.declare_exchange(exchange)
# Declare queues and bind them
for queue in self.config.queues:
self._broker.declare_queue(queue)
if queue.exchange:
self._broker.bind_queue(
queue.name,
queue.exchange.name,
queue.routing_key or queue.name
)
async def publish_message(
self,
routing_key: str,
message: str,
exchange: Optional[Exchange] = None,
persistent: bool = True,
) -> None:
"""Publish a message"""
if not self.is_ready:
await self.connect()
exchange_name = exchange.name if exchange else ""
await self._broker.publish_async(exchange_name, routing_key, message)
async def get_channel(self):
"""Get the channel (returns self for in-memory implementation)"""
if not self.is_ready:
await self.connect()
return self
async def consume_messages(
self,
queue_name: str,
callback: Callable[[str], Awaitable[None]]
):
"""
Consume messages from a queue asynchronously.
Calls the callback for each message.
"""
while True:
message = await self._broker.get_message_async(queue_name, timeout=1.0)
if message:
try:
await callback(message)
except Exception as e:
logger.error(f"Error processing message: {e}")
async def get_message(self, queue_name: str, timeout: float = 1.0) -> Optional[str]:
"""Get a single message from a queue"""
return await self._broker.get_message_async(queue_name, timeout)

backend/data/inmemory_redis.py (new file, 293 lines)

@@ -0,0 +1,293 @@
"""
In-memory Redis client implementation as a fallback.
This allows running the platform without an external Redis server.
"""
import asyncio
import logging
from collections import defaultdict
from threading import Lock
from typing import Any, Optional
logger = logging.getLogger(__name__)
class InMemoryRedis:
"""
In-memory Redis-compatible client.
Thread-safe singleton implementation.
"""
_instance = None
_lock = Lock()
def __new__(cls):
if cls._instance is None:
with cls._lock:
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._data = {}
cls._instance._pubsub_channels = defaultdict(list)
cls._instance._data_lock = Lock()
return cls._instance
def ping(self) -> bool:
"""Test connection (always succeeds for in-memory)"""
return True
def get(self, key: str) -> Optional[str]:
"""Get a value by key"""
with self._data_lock:
return self._data.get(key)
def set(
self,
key: str,
value: Any,
ex: Optional[int] = None,
px: Optional[int] = None,
nx: bool = False,
xx: bool = False,
) -> bool:
"""Set a value with optional expiration"""
with self._data_lock:
if nx and key in self._data:
return False
if xx and key not in self._data:
return False
self._data[key] = str(value)
# Note: Expiration (ex, px) not implemented in this simple version
if ex or px:
logger.debug(f"Expiration not implemented in InMemoryRedis (key: {key})")
return True
def delete(self, *keys: str) -> int:
"""Delete one or more keys"""
with self._data_lock:
count = 0
for key in keys:
if key in self._data:
del self._data[key]
count += 1
return count
def exists(self, *keys: str) -> int:
"""Check if keys exist"""
with self._data_lock:
return sum(1 for key in keys if key in self._data)
def keys(self, pattern: str = "*") -> list[str]:
"""Get all keys matching a pattern"""
with self._data_lock:
if pattern == "*":
return list(self._data.keys())
# Glob-style matching (handles * and ?, and anchors to the whole key)
import fnmatch
return [key for key in self._data if fnmatch.fnmatchcase(key, pattern)]
def incr(self, key: str) -> int:
"""Increment a key"""
with self._data_lock:
current = int(self._data.get(key, 0))
current += 1
self._data[key] = str(current)
return current
def decr(self, key: str) -> int:
"""Decrement a key"""
with self._data_lock:
current = int(self._data.get(key, 0))
current -= 1
self._data[key] = str(current)
return current
def expire(self, key: str, seconds: int) -> bool:
"""Set expiration on a key (not implemented)"""
logger.debug(f"Expiration not implemented in InMemoryRedis (key: {key})")
return key in self._data
def ttl(self, key: str) -> int:
"""Get time to live (always returns -1 for no expiration)"""
with self._data_lock:
if key not in self._data:
return -2 # Key doesn't exist
return -1 # No expiration
def publish(self, channel: str, message: str) -> int:
"""Publish a message to a channel"""
# This is handled by InMemoryEventBus
logger.debug(f"Publish to channel {channel}: {message[:100]}...")
return 1
def pubsub(self):
"""Get a pubsub object"""
return InMemoryPubSub(self)
def close(self):
"""Close connection (no-op for in-memory)"""
pass
def flushdb(self):
"""Flush all data (for testing)"""
with self._data_lock:
self._data.clear()
class InMemoryPubSub:
"""In-memory PubSub implementation"""
def __init__(self, client: InMemoryRedis):
self.client = client
self.channels = []
self.patterns = []
def subscribe(self, *channels: str):
"""Subscribe to channels"""
self.channels.extend(channels)
logger.debug(f"Subscribed to channels: {channels}")
def psubscribe(self, *patterns: str):
"""Subscribe to channel patterns"""
self.patterns.extend(patterns)
logger.debug(f"Subscribed to patterns: {patterns}")
def unsubscribe(self, *channels: str):
"""Unsubscribe from channels"""
for channel in channels:
if channel in self.channels:
self.channels.remove(channel)
def punsubscribe(self, *patterns: str):
"""Unsubscribe from patterns"""
for pattern in patterns:
if pattern in self.patterns:
self.patterns.remove(pattern)
def listen(self):
"""Listen for messages (returns empty generator)"""
# This should be integrated with InMemoryEventBus
# For now, return empty to prevent blocking
return iter([])
def close(self):
"""Close pubsub connection"""
self.channels = []
self.patterns = []
class InMemoryAsyncRedis:
"""
Async in-memory Redis-compatible client.
Uses the same singleton storage as the sync version.
"""
def __init__(self):
self._sync_client = InMemoryRedis()
async def ping(self) -> bool:
"""Test connection (always succeeds for in-memory)"""
return True
async def get(self, key: str) -> Optional[str]:
"""Get a value by key"""
return self._sync_client.get(key)
async def set(
self,
key: str,
value: Any,
ex: Optional[int] = None,
px: Optional[int] = None,
nx: bool = False,
xx: bool = False,
) -> bool:
"""Set a value with optional expiration"""
return self._sync_client.set(key, value, ex, px, nx, xx)
async def delete(self, *keys: str) -> int:
"""Delete one or more keys"""
return self._sync_client.delete(*keys)
async def exists(self, *keys: str) -> int:
"""Check if keys exist"""
return self._sync_client.exists(*keys)
async def keys(self, pattern: str = "*") -> list[str]:
"""Get all keys matching a pattern"""
return self._sync_client.keys(pattern)
async def incr(self, key: str) -> int:
"""Increment a key"""
return self._sync_client.incr(key)
async def decr(self, key: str) -> int:
"""Decrement a key"""
return self._sync_client.decr(key)
async def expire(self, key: str, seconds: int) -> bool:
"""Set expiration on a key (not implemented)"""
return self._sync_client.expire(key, seconds)
async def ttl(self, key: str) -> int:
"""Get time to live"""
return self._sync_client.ttl(key)
async def publish(self, channel: str, message: str) -> int:
"""Publish a message to a channel"""
return self._sync_client.publish(channel, message)
def pubsub(self):
"""Get a pubsub object"""
return InMemoryAsyncPubSub(self)
async def close(self):
"""Close connection (no-op for in-memory)"""
pass
async def flushdb(self):
"""Flush all data (for testing)"""
self._sync_client.flushdb()
class InMemoryAsyncPubSub:
"""Async in-memory PubSub implementation"""
def __init__(self, client: InMemoryAsyncRedis):
self.client = client
self.channels = []
self.patterns = []
async def subscribe(self, *channels: str):
"""Subscribe to channels"""
self.channels.extend(channels)
logger.debug(f"Subscribed to channels: {channels}")
async def psubscribe(self, *patterns: str):
"""Subscribe to channel patterns"""
self.patterns.extend(patterns)
logger.debug(f"Subscribed to patterns: {patterns}")
async def unsubscribe(self, *channels: str):
"""Unsubscribe from channels"""
for channel in channels:
if channel in self.channels:
self.channels.remove(channel)
async def punsubscribe(self, *patterns: str):
"""Unsubscribe from patterns"""
for pattern in patterns:
if pattern in self.patterns:
self.patterns.remove(pattern)
async def listen(self):
"""Listen for messages (returns empty async generator)"""
# This should be integrated with InMemoryEventBus
# For now, return empty to prevent blocking
if False:
yield
async def close(self):
"""Close pubsub connection"""
self.channels = []
self.patterns = []

backend/data/rabbitmq.py

@@ -1,4 +1,5 @@
import logging
import os
from abc import ABC, abstractmethod
from enum import Enum
from typing import Awaitable, Optional
@@ -14,6 +15,10 @@ from backend.util.settings import Settings
logger = logging.getLogger(__name__)
# Check if we should use in-memory implementations
_settings = Settings()
USE_IN_MEMORY = _settings.config.standalone_mode or os.getenv("STANDALONE_MODE", "").lower() in ("true", "1", "yes")
# RabbitMQ Connection Constants
# These constants solve specific connection stability issues observed in production
@@ -88,6 +93,9 @@ class RabbitMQBase(ABC):
self._connection = None
self._channel = None
# Check if we should use in-memory implementation
self._use_in_memory = USE_IN_MEMORY
@property
def is_connected(self) -> bool:
"""Check if we have a valid connection"""
@@ -117,16 +125,33 @@ class RabbitMQBase(ABC):
class SyncRabbitMQ(RabbitMQBase):
"""Synchronous RabbitMQ client"""
def __init__(self, config: RabbitMQConfig):
super().__init__(config)
if self._use_in_memory:
from backend.data.inmemory_queue import InMemorySyncRabbitMQ
self._in_memory_client = InMemorySyncRabbitMQ(config)
logger.info("Using in-memory RabbitMQ client (standalone mode)")
else:
self._in_memory_client = None
@property
def is_connected(self) -> bool:
if self._in_memory_client:
return self._in_memory_client.is_connected
return bool(self._connection and self._connection.is_open)
@property
def is_ready(self) -> bool:
if self._in_memory_client:
return self._in_memory_client.is_ready
return bool(self.is_connected and self._channel and self._channel.is_open)
@conn_retry("RabbitMQ", "Acquiring connection")
def connect(self) -> None:
if self._in_memory_client:
self._in_memory_client.connect()
return
if self.is_connected:
return
@@ -150,6 +175,10 @@ class SyncRabbitMQ(RabbitMQBase):
self.declare_infrastructure()
def disconnect(self) -> None:
if self._in_memory_client:
self._in_memory_client.disconnect()
return
if self._channel:
if self._channel.is_open:
self._channel.close()
@@ -161,6 +190,10 @@ class SyncRabbitMQ(RabbitMQBase):
def declare_infrastructure(self) -> None:
"""Declare exchanges and queues for this service"""
if self._in_memory_client:
self._in_memory_client.declare_infrastructure()
return
if not self.is_ready:
self.connect()
@@ -200,6 +233,10 @@ class SyncRabbitMQ(RabbitMQBase):
properties: Optional[BasicProperties] = None,
mandatory: bool = True,
) -> None:
if self._in_memory_client:
self._in_memory_client.publish_message(routing_key, message, exchange, properties, mandatory)
return
if not self.is_ready:
self.connect()
@@ -214,7 +251,10 @@ class SyncRabbitMQ(RabbitMQBase):
mandatory=mandatory,
)
def get_channel(self):
if self._in_memory_client:
return self._in_memory_client.get_channel()
if not self.is_ready:
self.connect()
if self._channel is None:
@@ -225,16 +265,33 @@ class SyncRabbitMQ(RabbitMQBase):
class AsyncRabbitMQ(RabbitMQBase):
"""Asynchronous RabbitMQ client"""
def __init__(self, config: RabbitMQConfig):
super().__init__(config)
if self._use_in_memory:
from backend.data.inmemory_queue import InMemoryAsyncRabbitMQ
self._in_memory_client = InMemoryAsyncRabbitMQ(config)
logger.info("Using in-memory async RabbitMQ client (standalone mode)")
else:
self._in_memory_client = None
@property
def is_connected(self) -> bool:
if self._in_memory_client:
return self._in_memory_client.is_connected
return bool(self._connection and not self._connection.is_closed)
@property
def is_ready(self) -> bool:
if self._in_memory_client:
return self._in_memory_client.is_ready
return bool(self.is_connected and self._channel and not self._channel.is_closed)
@conn_retry("AsyncRabbitMQ", "Acquiring async connection")
async def connect(self):
if self._in_memory_client:
await self._in_memory_client.connect()
return
if self.is_connected:
return
@@ -253,6 +310,10 @@ class AsyncRabbitMQ(RabbitMQBase):
await self.declare_infrastructure()
async def disconnect(self):
if self._in_memory_client:
await self._in_memory_client.disconnect()
return
if self._channel:
await self._channel.close()
self._channel = None
@@ -262,6 +323,10 @@ class AsyncRabbitMQ(RabbitMQBase):
async def declare_infrastructure(self):
"""Declare exchanges and queues for this service"""
if self._in_memory_client:
await self._in_memory_client.declare_infrastructure()
return
if not self.is_ready:
await self.connect()
@@ -299,6 +364,10 @@ class AsyncRabbitMQ(RabbitMQBase):
exchange: Optional[Exchange] = None,
persistent: bool = True,
) -> None:
if self._in_memory_client:
await self._in_memory_client.publish_message(routing_key, message, exchange, persistent)
return
if not self.is_ready:
await self.connect()
@@ -322,7 +391,10 @@ class AsyncRabbitMQ(RabbitMQBase):
routing_key=routing_key,
)
async def get_channel(self):
if self._in_memory_client:
return await self._in_memory_client.get_channel()
if not self.is_ready:
await self.connect()
if self._channel is None:

backend/data/redis_client.py

@@ -7,6 +7,7 @@ from redis import Redis
from redis.asyncio import Redis as AsyncRedis
from backend.util.retry import conn_retry
from backend.util.settings import Settings
load_dotenv()
@@ -16,9 +17,18 @@ PASSWORD = os.getenv("REDIS_PASSWORD", None)
logger = logging.getLogger(__name__)
# Check if we should use in-memory implementations
_settings = Settings()
USE_IN_MEMORY = _settings.config.standalone_mode or os.getenv("STANDALONE_MODE", "").lower() in ("true", "1", "yes")
@conn_retry("Redis", "Acquiring connection")
def connect() -> Redis:
if USE_IN_MEMORY:
from backend.data.inmemory_redis import InMemoryRedis
logger.info("Using in-memory Redis client (standalone mode)")
return InMemoryRedis()
c = Redis(
host=HOST,
port=PORT,
@@ -41,6 +51,11 @@ def get_redis() -> Redis:
@conn_retry("AsyncRedis", "Acquiring connection")
async def connect_async() -> AsyncRedis:
if USE_IN_MEMORY:
from backend.data.inmemory_redis import InMemoryAsyncRedis
logger.info("Using in-memory async Redis client (standalone mode)")
return InMemoryAsyncRedis()
c = AsyncRedis(
host=HOST,
port=PORT,

backend/util/settings.py

@@ -316,6 +316,11 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
description="Whether to enable example blocks in production",
)
standalone_mode: bool = Field(
default=False,
description="Whether to run in standalone mode without external dependencies (Redis, RabbitMQ, Supabase). Uses in-memory implementations instead.",
)
cloud_storage_cleanup_interval_hours: int = Field(
default=6,
ge=1,