Mirror of https://github.com/Significant-Gravitas/AutoGPT.git (synced 2026-01-12 16:48:06 -05:00)

Compare commits: dependabot...swiftyos/s

43 Commits
| SHA1 |
|---|
| 01bab66f5c |
| 30b2d6b50d |
| 97e77339fd |
| 6a360a49b1 |
| 045009a84a |
| 70cb7824fd |
| df1d15fcfe |
| eb022e50a7 |
| 431042a391 |
| 7c6a9146f0 |
| 0264cb56d3 |
| ef552c189f |
| e75cf2b765 |
| 0f6d1f54ee |
| b3fe2b84ce |
| e13861ad33 |
| e2c24bd463 |
| 9f5afff83e |
| ced61e2640 |
| c9a7cc63da |
| 7afa01a168 |
| 2f9aba0420 |
| ff4b0929e1 |
| 2230c76863 |
| b3443e0549 |
| b1364b1701 |
| 95d66a035c |
| 30cdf9f0d9 |
| e47f0e7f2f |
| 13d71464a0 |
| be1947f6d1 |
| 1afebcf96b |
| d124c93ff8 |
| bc5eb8a8a5 |
| 872ef5fdfb |
| 12382e7990 |
| d68a3a1b53 |
| 863e213af3 |
| c61af53a74 |
| eb94503de8 |
| ee4feff8c2 |
| 9147c2d6c8 |
| a3af430c69 |
.github/workflows/platform-frontend-ci.yml (vendored): 3 changes
@@ -217,6 +217,9 @@ jobs:
       - name: Install dependencies
         run: pnpm install --frozen-lockfile

+      - name: Generate API client
+        run: pnpm generate:api
+
       - name: Install Browser 'chromium'
         run: pnpm playwright install --with-deps chromium
autogpt_libs/utils/cache.py (deleted):
@@ -1,339 +0,0 @@
import asyncio
import inspect
import logging
import threading
import time
from functools import wraps
from typing import (
    Any,
    Callable,
    ParamSpec,
    Protocol,
    TypeVar,
    cast,
    runtime_checkable,
)

P = ParamSpec("P")
R = TypeVar("R")
R_co = TypeVar("R_co", covariant=True)

logger = logging.getLogger(__name__)


def _make_hashable_key(
    args: tuple[Any, ...], kwargs: dict[str, Any]
) -> tuple[Any, ...]:
    """
    Convert args and kwargs into a hashable cache key.

    Handles unhashable types like dict, list, and set by converting them to
    sorted, hashable representations.
    """

    def make_hashable(obj: Any) -> Any:
        """Recursively convert an object to a hashable representation."""
        if isinstance(obj, dict):
            # Sort dict items to ensure consistent ordering
            return (
                "__dict__",
                tuple(sorted((k, make_hashable(v)) for k, v in obj.items())),
            )
        elif isinstance(obj, (list, tuple)):
            return ("__list__", tuple(make_hashable(item) for item in obj))
        elif isinstance(obj, set):
            return ("__set__", tuple(sorted(make_hashable(item) for item in obj)))
        elif hasattr(obj, "__dict__"):
            # Handle objects with __dict__ attribute
            return ("__obj__", obj.__class__.__name__, make_hashable(obj.__dict__))
        else:
            # For basic hashable types (str, int, bool, None, etc.)
            try:
                hash(obj)
                return obj
            except TypeError:
                # Fallback: convert to string representation
                return ("__str__", str(obj))

    hashable_args = tuple(make_hashable(arg) for arg in args)
    hashable_kwargs = tuple(sorted((k, make_hashable(v)) for k, v in kwargs.items()))
    return (hashable_args, hashable_kwargs)


@runtime_checkable
class CachedFunction(Protocol[P, R_co]):
    """Protocol for cached functions with cache management methods."""

    def cache_clear(self) -> None:
        """Clear all cached entries."""
        return None

    def cache_info(self) -> dict[str, int | None]:
        """Get cache statistics."""
        return {}

    def cache_delete(self, *args: P.args, **kwargs: P.kwargs) -> bool:
        """Delete a specific cache entry by its arguments. Returns True if entry existed."""
        return False

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R_co:
        """Call the cached function."""
        return None  # type: ignore


def cached(
    *,
    maxsize: int = 128,
    ttl_seconds: int | None = None,
) -> Callable[[Callable], CachedFunction]:
    """
    Thundering-herd-safe cache decorator for both sync and async functions.

    Uses double-checked locking to prevent multiple threads/coroutines from
    executing the expensive operation simultaneously during cache misses.

    Args:
        maxsize: Maximum number of cached entries
        ttl_seconds: Time to live in seconds. If None, entries never expire

    Returns:
        A decorator that wraps the target function in a CachedFunction

    Example:
        @cached()  # Default: maxsize=128, no TTL
        def expensive_sync_operation(param: str) -> dict:
            return {"result": param}

        @cached()  # Works with async too
        async def expensive_async_operation(param: str) -> dict:
            return {"result": param}

        @cached(maxsize=1000, ttl_seconds=300)  # Custom maxsize and TTL
        def another_operation(param: str) -> dict:
            return {"result": param}
    """

    def decorator(target_func):
        # Cache storage and per-event-loop locks
        cache_storage = {}
        _event_loop_locks = {}  # Maps event loop to its asyncio.Lock

        if inspect.iscoroutinefunction(target_func):

            def _get_cache_lock():
                """Get or create an asyncio.Lock for the current event loop."""
                try:
                    loop = asyncio.get_running_loop()
                except RuntimeError:
                    # No event loop, use None as default key
                    loop = None

                if loop not in _event_loop_locks:
                    return _event_loop_locks.setdefault(loop, asyncio.Lock())
                return _event_loop_locks[loop]

            @wraps(target_func)
            async def async_wrapper(*args: P.args, **kwargs: P.kwargs):
                key = _make_hashable_key(args, kwargs)
                current_time = time.time()

                # Fast path: check cache without lock
                if key in cache_storage:
                    if ttl_seconds is None:
                        logger.debug(f"Cache hit for {target_func.__name__}")
                        return cache_storage[key]
                    else:
                        cached_data = cache_storage[key]
                        if isinstance(cached_data, tuple):
                            result, timestamp = cached_data
                            if current_time - timestamp < ttl_seconds:
                                logger.debug(f"Cache hit for {target_func.__name__}")
                                return result

                # Slow path: acquire lock for cache miss/expiry
                async with _get_cache_lock():
                    # Double-check: another coroutine might have populated cache
                    if key in cache_storage:
                        if ttl_seconds is None:
                            return cache_storage[key]
                        else:
                            cached_data = cache_storage[key]
                            if isinstance(cached_data, tuple):
                                result, timestamp = cached_data
                                if current_time - timestamp < ttl_seconds:
                                    return result

                    # Cache miss - execute function
                    logger.debug(f"Cache miss for {target_func.__name__}")
                    result = await target_func(*args, **kwargs)

                    # Store result
                    if ttl_seconds is None:
                        cache_storage[key] = result
                    else:
                        cache_storage[key] = (result, current_time)

                    # Cleanup if needed
                    if len(cache_storage) > maxsize:
                        cutoff = maxsize // 2
                        oldest_keys = (
                            list(cache_storage.keys())[:-cutoff] if cutoff > 0 else []
                        )
                        for old_key in oldest_keys:
                            cache_storage.pop(old_key, None)

                    return result

            wrapper = async_wrapper

        else:
            # Sync function with threading.Lock
            cache_lock = threading.Lock()

            @wraps(target_func)
            def sync_wrapper(*args: P.args, **kwargs: P.kwargs):
                key = _make_hashable_key(args, kwargs)
                current_time = time.time()

                # Fast path: check cache without lock
                if key in cache_storage:
                    if ttl_seconds is None:
                        logger.debug(f"Cache hit for {target_func.__name__}")
                        return cache_storage[key]
                    else:
                        cached_data = cache_storage[key]
                        if isinstance(cached_data, tuple):
                            result, timestamp = cached_data
                            if current_time - timestamp < ttl_seconds:
                                logger.debug(f"Cache hit for {target_func.__name__}")
                                return result

                # Slow path: acquire lock for cache miss/expiry
                with cache_lock:
                    # Double-check: another thread might have populated cache
                    if key in cache_storage:
                        if ttl_seconds is None:
                            return cache_storage[key]
                        else:
                            cached_data = cache_storage[key]
                            if isinstance(cached_data, tuple):
                                result, timestamp = cached_data
                                if current_time - timestamp < ttl_seconds:
                                    return result

                    # Cache miss - execute function
                    logger.debug(f"Cache miss for {target_func.__name__}")
                    result = target_func(*args, **kwargs)

                    # Store result
                    if ttl_seconds is None:
                        cache_storage[key] = result
                    else:
                        cache_storage[key] = (result, current_time)

                    # Cleanup if needed
                    if len(cache_storage) > maxsize:
                        cutoff = maxsize // 2
                        oldest_keys = (
                            list(cache_storage.keys())[:-cutoff] if cutoff > 0 else []
                        )
                        for old_key in oldest_keys:
                            cache_storage.pop(old_key, None)

                    return result

            wrapper = sync_wrapper

        # Add cache management methods
        def cache_clear() -> None:
            cache_storage.clear()

        def cache_info() -> dict[str, int | None]:
            return {
                "size": len(cache_storage),
                "maxsize": maxsize,
                "ttl_seconds": ttl_seconds,
            }

        def cache_delete(*args, **kwargs) -> bool:
            """Delete a specific cache entry. Returns True if entry existed."""
            key = _make_hashable_key(args, kwargs)
            if key in cache_storage:
                del cache_storage[key]
                return True
            return False

        setattr(wrapper, "cache_clear", cache_clear)
        setattr(wrapper, "cache_info", cache_info)
        setattr(wrapper, "cache_delete", cache_delete)

        return cast(CachedFunction, wrapper)

    return decorator


def thread_cached(func):
    """
    Thread-local cache decorator for both sync and async functions.

    Each thread gets its own cache, which is useful for request-scoped caching
    in web applications where you want to cache within a single request but
    not across requests.

    Args:
        func: The function to cache

    Returns:
        Decorated function with thread-local caching

    Example:
        @thread_cached
        def expensive_operation(param: str) -> dict:
            return {"result": param}

        @thread_cached  # Works with async too
        async def expensive_async_operation(param: str) -> dict:
            return {"result": param}
    """
    thread_local = threading.local()

    def _clear():
        if hasattr(thread_local, "cache"):
            del thread_local.cache

    if inspect.iscoroutinefunction(func):

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            cache = getattr(thread_local, "cache", None)
            if cache is None:
                cache = thread_local.cache = {}
            key = _make_hashable_key(args, kwargs)
            if key not in cache:
                cache[key] = await func(*args, **kwargs)
            return cache[key]

        setattr(async_wrapper, "clear_cache", _clear)
        return async_wrapper

    else:

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            cache = getattr(thread_local, "cache", None)
            if cache is None:
                cache = thread_local.cache = {}
            key = _make_hashable_key(args, kwargs)
            if key not in cache:
                cache[key] = func(*args, **kwargs)
            return cache[key]

        setattr(sync_wrapper, "clear_cache", _clear)
        return sync_wrapper


def clear_thread_cache(func: Callable) -> None:
    """Clear thread-local cache for a function."""
    if clear := getattr(func, "clear_cache", None):
        clear()
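For orientation, a minimal usage sketch of the decorator defined in the file deleted above. The import path points at its replacement home in backend.util.cache, which the diffs below import from; the sleep and the printed numbers are illustrative only.

import asyncio

from backend.util.cache import cached  # replacement location, per the import diffs below


@cached(maxsize=16, ttl_seconds=60)
async def fetch(param: str) -> dict:
    await asyncio.sleep(0.1)  # stand-in for an expensive lookup
    return {"result": param}


async def main() -> None:
    a = await fetch("x")  # cache miss: runs the body (once, even under concurrency)
    b = await fetch("x")  # cache hit: served without re-running the body
    assert a == b
    assert fetch.cache_delete("x")  # targeted invalidation, keyed by the same args
    print(fetch.cache_info())  # {'size': 0, 'maxsize': 16, 'ttl_seconds': 60}


asyncio.run(main())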
autogpt_platform/autogpt_libs/poetry.lock (generated): 18 changes
@@ -1719,6 +1719,22 @@ files = [
 httpx = {version = ">=0.26,<0.29", extras = ["http2"]}
 strenum = ">=0.4.15,<0.5.0"
 
+[[package]]
+name = "tenacity"
+version = "9.1.2"
+description = "Retry code until it succeeds"
+optional = false
+python-versions = ">=3.9"
+groups = ["main"]
+files = [
+    {file = "tenacity-9.1.2-py3-none-any.whl", hash = "sha256:f77bf36710d8b73a50b2dd155c97b870017ad21afe6ab300326b0371b3b05138"},
+    {file = "tenacity-9.1.2.tar.gz", hash = "sha256:1169d376c297e7de388d18b4481760d478b0e99a777cad3a9c86e556f4b697cb"},
+]
+
+[package.extras]
+doc = ["reno", "sphinx"]
+test = ["pytest", "tornado (>=4.5)", "typeguard"]
+
 [[package]]
 name = "tomli"
 version = "2.2.1"
@@ -1929,4 +1945,4 @@ type = ["pytest-mypy"]
 [metadata]
 lock-version = "2.1"
 python-versions = ">=3.10,<4.0"
-content-hash = "0c40b63c3c921846cf05ccfb4e685d4959854b29c2c302245f9832e20aac6954"
+content-hash = "5ec9e6cd2ef7524a356586354755215699e7b37b9bbdfbabc9c73b43085915f4"
autogpt_platform/autogpt_libs/pyproject.toml:
@@ -19,6 +19,7 @@ pydantic-settings = "^2.10.1"
 pyjwt = { version = "^2.10.1", extras = ["crypto"] }
 redis = "^6.2.0"
 supabase = "^2.16.0"
+tenacity = "^9.1.2"
 uvicorn = "^0.35.0"
 
 [tool.poetry.group.dev.dependencies]
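The lockfile describes tenacity as "Retry code until it succeeds". A hedged sketch of typical usage; the wrapper below is illustrative and not code from this changeset:

from tenacity import retry, stop_after_attempt, wait_exponential


# Illustrative only: retry a flaky call up to 3 times with exponential backoff.
@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=0.5, max=5))
def ping_service(client) -> bool:
    return client.ping()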
@@ -5,7 +5,7 @@ import re
 from pathlib import Path
 from typing import TYPE_CHECKING, TypeVar
 
-from autogpt_libs.utils.cache import cached
+from backend.util.cache import cached
 
 logger = logging.getLogger(__name__)

@@ -16,7 +16,7 @@ if TYPE_CHECKING:
 T = TypeVar("T")
 
 
-@cached()
+@cached(ttl_seconds=3600)  # Cache blocks for 1 hour
 def load_all_blocks() -> dict[str, type["Block"]]:
     from backend.data.block import Block
     from backend.util.settings import Config
autogpt_platform/backend/backend/data/block.py:
@@ -20,7 +20,6 @@ from typing import (
 
 import jsonref
 import jsonschema
-from autogpt_libs.utils.cache import cached
 from prisma.models import AgentBlock
 from prisma.types import AgentBlockCreateInput
 from pydantic import BaseModel

@@ -28,6 +27,7 @@ from pydantic import BaseModel
 from backend.data.model import NodeExecutionStats
 from backend.integrations.providers import ProviderName
 from backend.util import json
+from backend.util.cache import cached
 from backend.util.settings import Config
 
 from .model import (

@@ -722,7 +722,7 @@ def get_block(block_id: str) -> Block[BlockSchema, BlockSchema] | None:
     return cls() if cls else None
 
 
-@cached()
+@cached(ttl_seconds=3600)
 def get_webhook_block_ids() -> Sequence[str]:
     return [
         id

@@ -731,7 +731,7 @@ def get_webhook_block_ids() -> Sequence[str]:
     ]
 
 
-@cached()
+@cached(ttl_seconds=3600)
 def get_io_block_ids() -> Sequence[str]:
     return [
         id
autogpt_platform/backend/backend/data/redis_client.py:
@@ -1,29 +1,24 @@
 import logging
-import os
 
-from autogpt_libs.utils.cache import cached, thread_cached
 from dotenv import load_dotenv
 from redis import Redis
 from redis.asyncio import Redis as AsyncRedis
 
+from backend.util.cache import cached, thread_cached
 from backend.util.retry import conn_retry
 from backend.util.settings import Settings
 
 load_dotenv()
 
-HOST = os.getenv("REDIS_HOST", "localhost")
-PORT = int(os.getenv("REDIS_PORT", "6379"))
-PASSWORD = os.getenv("REDIS_PASSWORD", None)
+settings = Settings()
 
 logger = logging.getLogger(__name__)
 
 
 @conn_retry("Redis", "Acquiring connection")
-def connect() -> Redis:
+def connect(decode_responses: bool = True) -> Redis:
     c = Redis(
-        host=HOST,
-        port=PORT,
-        password=PASSWORD,
-        decode_responses=True,
+        host=settings.config.redis_host,
+        port=settings.config.redis_port,
+        password=settings.config.redis_password or None,
+        decode_responses=decode_responses,
     )
     c.ping()
     return c

@@ -34,7 +29,7 @@ def disconnect():
     get_redis().close()
 
 
-@cached()
+@cached(ttl_seconds=3600)
 def get_redis() -> Redis:
     return connect()

@@ -42,9 +37,9 @@ def get_redis() -> Redis:
 @conn_retry("AsyncRedis", "Acquiring connection")
 async def connect_async() -> AsyncRedis:
     c = AsyncRedis(
-        host=HOST,
-        port=PORT,
-        password=PASSWORD,
+        host=settings.config.redis_host,
+        port=settings.config.redis_port,
+        password=settings.config.redis_password or None,
         decode_responses=True,
     )
     await c.ping()
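A short sketch of why decode_responses became a parameter, assuming the new connect() signature shown above. Callers that compare raw byte tokens (the ClusterLock case in the test fixture below) must opt out of UTF-8 decoding:

from backend.data.redis_client import connect

text_client = connect()  # default decode_responses=True: values come back as str
raw_client = connect(decode_responses=False)  # values stay bytes, e.g. for lock tokens

raw_client.set("lock:example", b"owner-token")
token = raw_client.get("lock:example")  # b"owner-token": byte-wise comparable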
autogpt_platform/backend/backend/data/user.py:
@@ -7,7 +7,6 @@ from typing import Optional, cast
 from urllib.parse import quote_plus
 
 from autogpt_libs.auth.models import DEFAULT_USER_ID
-from autogpt_libs.utils.cache import cached
 from fastapi import HTTPException
 from prisma.enums import NotificationType
 from prisma.models import User as PrismaUser

@@ -17,6 +16,7 @@ from backend.data.db import prisma
 from backend.data.model import User, UserIntegrations, UserMetadata
 from backend.data.notifications import NotificationPreference, NotificationPreferenceDTO
 from backend.server.v2.store.exceptions import DatabaseError
+from backend.util.cache import cached
 from backend.util.encryption import JSONCryptor
 from backend.util.json import SafeJson
 from backend.util.settings import Settings
@@ -22,13 +22,15 @@ logger = logging.getLogger(__name__)
 @pytest.fixture
 def redis_client():
     """Get Redis client for testing using same config as backend."""
-    from backend.data.redis_client import HOST, PASSWORD, PORT
+    from backend.util.settings import Settings
 
+    settings = Settings()
+
     # Use same config as backend but without decode_responses since ClusterLock needs raw bytes
     client = redis.Redis(
-        host=HOST,
-        port=PORT,
-        password=PASSWORD,
+        host=settings.config.redis_host,
+        port=settings.config.redis_port,
+        password=settings.config.redis_password or None,
         decode_responses=False,  # ClusterLock needs raw bytes for ownership verification
     )
autogpt_platform/backend/backend/integrations/webhooks/__init__.py:
@@ -1,6 +1,6 @@
 from typing import TYPE_CHECKING
 
-from autogpt_libs.utils.cache import cached
+from backend.util.cache import cached
 
 if TYPE_CHECKING:
     from ..providers import ProviderName

@@ -8,7 +8,7 @@ if TYPE_CHECKING:
 
 
 # --8<-- [start:load_webhook_managers]
-@cached()
+@cached(ttl_seconds=3600)  # Cache webhook managers for 1 hour
 def load_webhook_managers() -> dict["ProviderName", type["BaseWebhooksManager"]]:
     webhook_managers = {}
autogpt_platform/backend/backend/server/cache_config.py (new file): 86 lines
@@ -0,0 +1,86 @@
"""
Shared cache configuration constants.

This module defines all page_size defaults used across the application.
By centralizing these values, we ensure that cache invalidation always
uses the same page_size as the routes that populate the cache.

CRITICAL: If you change any of these values, the tests in
test_cache_invalidation_consistency.py will fail to remind you to
update all dependent code.
"""

# V1 API (legacy) page sizes
V1_GRAPHS_PAGE_SIZE = 250
"""Default page size for listing user graphs in v1 API."""

V1_LIBRARY_AGENTS_PAGE_SIZE = 10
"""Default page size for library agents in v1 API."""

V1_GRAPH_EXECUTIONS_PAGE_SIZE = 25
"""Default page size for graph executions in v1 API."""

# V2 Store API page sizes
V2_STORE_AGENTS_PAGE_SIZE = 20
"""Default page size for store agents listing."""

V2_STORE_CREATORS_PAGE_SIZE = 20
"""Default page size for store creators listing."""

V2_STORE_SUBMISSIONS_PAGE_SIZE = 20
"""Default page size for user submissions listing."""

V2_MY_AGENTS_PAGE_SIZE = 20
"""Default page size for user's own agents listing."""

# V2 Library API page sizes
V2_LIBRARY_AGENTS_PAGE_SIZE = 10
"""Default page size for library agents listing in v2 API."""

V2_LIBRARY_PRESETS_PAGE_SIZE = 20
"""Default page size for library presets listing."""

# Alternative page sizes (for backward compatibility or special cases)
V2_LIBRARY_PRESETS_ALT_PAGE_SIZE = 10
"""
Alternative page size for library presets.
Some clients may use this smaller page size, so cache clearing must handle both.
"""

V2_GRAPH_EXECUTIONS_ALT_PAGE_SIZE = 10
"""
Alternative page size for graph executions.
Some clients may use this smaller page size, so cache clearing must handle both.
"""

# Cache clearing configuration
MAX_PAGES_TO_CLEAR = 20
"""
Maximum number of pages to clear when invalidating paginated caches.
This bounds the clearing sweep while ensuring we clear most cached pages.
For users with more than 20 pages, those pages will expire naturally via TTL.
"""


def get_page_sizes_for_clearing(
    primary_page_size: int, alt_page_size: int | None = None
) -> list[int]:
    """
    Get all page_size values that should be cleared for a given cache.

    Args:
        primary_page_size: The main page_size used by the route
        alt_page_size: Optional alternative page_size if multiple clients use different sizes

    Returns:
        List of page_size values to clear

    Example:
        >>> get_page_sizes_for_clearing(20)
        [20]
        >>> get_page_sizes_for_clearing(20, 10)
        [20, 10]
    """
    if alt_page_size is None:
        return [primary_page_size]
    return [primary_page_size, alt_page_size]
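A sketch of how these constants are meant to combine when invalidating a paginated cache. The helper name and its (user_id, page, page_size) argument order are taken from the library cache audited later in this diff; the wiring itself is illustrative:

import backend.server.cache_config as cache_config
import backend.server.v2.library.cache as library_cache


def clear_presets_caches(user_id: str) -> None:
    # Clear every page size that clients are known to request
    for page_size in cache_config.get_page_sizes_for_clearing(
        cache_config.V2_LIBRARY_PRESETS_PAGE_SIZE,
        cache_config.V2_LIBRARY_PRESETS_ALT_PAGE_SIZE,
    ):
        # Bounded sweep: pages beyond MAX_PAGES_TO_CLEAR expire via their TTL
        for page in range(1, cache_config.MAX_PAGES_TO_CLEAR):
            library_cache.get_cached_library_presets.cache_delete(
                user_id, page, page_size
            )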
autogpt_platform/backend/backend/server/routers/cache.py (new file): 154 lines
@@ -0,0 +1,154 @@
"""
Cache functions for main V1 API endpoints.

This module contains all caching decorators and helpers for the V1 API,
separated from the main routes for better organization and maintainability.
"""

from typing import Sequence

from backend.data import execution as execution_db
from backend.data import graph as graph_db
from backend.data import user as user_db
from backend.data.block import get_blocks
from backend.util.cache import cached

# ===== Block Caches =====


# Cache block definitions with costs - they rarely change
@cached(maxsize=1, ttl_seconds=3600, shared_cache=True)
def get_cached_blocks() -> Sequence[dict]:
    """
    Get cached blocks with thundering herd protection.

    Uses the cached decorator to prevent multiple concurrent requests
    from all executing the expensive block loading operation.
    """
    from backend.data.credit import get_block_cost

    block_classes = get_blocks()
    result = []

    for block_class in block_classes.values():
        block_instance = block_class()
        if not block_instance.disabled:
            # Get costs for this specific block class without creating another instance
            costs = get_block_cost(block_instance)
            result.append({**block_instance.to_dict(), "costs": costs})

    return result


# ===== Graph Caches =====


# Cache user's graphs list for 15 minutes
@cached(maxsize=1000, ttl_seconds=900, shared_cache=True)
async def get_cached_graphs(
    user_id: str,
    page: int,
    page_size: int,
):
    """Cached helper to get user's graphs."""
    return await graph_db.list_graphs_paginated(
        user_id=user_id,
        page=page,
        page_size=page_size,
    )


# Cache individual graph details for 30 minutes
@cached(maxsize=500, ttl_seconds=1800, shared_cache=True)
async def get_cached_graph(
    graph_id: str,
    version: int | None,
    user_id: str,
):
    """Cached helper to get graph details."""
    return await graph_db.get_graph(
        graph_id=graph_id,
        version=version,
        user_id=user_id,
        include_subgraphs=True,  # needed to construct full credentials input schema
    )


# Cache graph versions for 30 minutes
@cached(maxsize=500, ttl_seconds=1800, shared_cache=True)
async def get_cached_graph_all_versions(
    graph_id: str,
    user_id: str,
) -> Sequence[graph_db.GraphModel]:
    """Cached helper to get all versions of a graph."""
    return await graph_db.get_graph_all_versions(
        graph_id=graph_id,
        user_id=user_id,
    )


# ===== Execution Caches =====


# Cache graph executions for 10 seconds.
@cached(maxsize=1000, ttl_seconds=10, shared_cache=True)
async def get_cached_graph_executions(
    graph_id: str,
    user_id: str,
    page: int,
    page_size: int,
):
    """Cached helper to get graph executions."""
    return await execution_db.get_graph_executions_paginated(
        graph_id=graph_id,
        user_id=user_id,
        page=page,
        page_size=page_size,
    )


# Cache all user executions for 10 seconds.
@cached(maxsize=500, ttl_seconds=10, shared_cache=True)
async def get_cached_graphs_executions(
    user_id: str,
    page: int,
    page_size: int,
):
    """Cached helper to get all user's graph executions."""
    return await execution_db.get_graph_executions_paginated(
        user_id=user_id,
        page=page,
        page_size=page_size,
    )


# Cache individual execution details for 10 seconds.
@cached(maxsize=1000, ttl_seconds=10, shared_cache=True)
async def get_cached_graph_execution(
    graph_exec_id: str,
    user_id: str,
):
    """Cached helper to get graph execution details."""
    return await execution_db.get_graph_execution(
        user_id=user_id,
        execution_id=graph_exec_id,
        include_node_executions=False,
    )


# ===== User Preference Caches =====


# Cache user timezone for 1 hour
@cached(maxsize=1000, ttl_seconds=3600, shared_cache=True)
async def get_cached_user_timezone(user_id: str):
    """Cached helper to get user timezone."""
    user = await user_db.get_user_by_id(user_id)
    return {"timezone": user.timezone if user else "UTC"}


# Cache user preferences for 30 minutes
@cached(maxsize=1000, ttl_seconds=1800, shared_cache=True)
async def get_cached_user_preferences(user_id: str):
    """Cached helper to get user notification preferences."""
    return await user_db.get_user_notification_preference(user_id)
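One subtlety these helpers inherit from the key builder: positional and keyword arguments hash to different cache keys, so cache_delete must mirror the exact call shape that populated the entry (the tests below call this out). A minimal sketch, assuming the in-process mode of the decorator:

from backend.util.cache import cached


@cached(maxsize=8)
def lookup(user_id: str, page: int) -> str:
    return f"{user_id}:{page}"


lookup("u1", 1)  # cached under a positional key
assert lookup.cache_delete("u1", 1)  # same shape: entry found and removed
lookup(user_id="u1", page=1)  # cached again, under a distinct keyword key
assert not lookup.cache_delete("u1", 1)  # positional shape misses the keyword entry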
autogpt_platform/backend/backend/server/routers/cache_test.py (new file): 376 lines
@@ -0,0 +1,376 @@
"""
Tests for cache invalidation in V1 API routes.

This module tests that caches are properly invalidated when data is modified
through POST, PUT, PATCH, and DELETE operations.
"""

import uuid
from unittest.mock import AsyncMock, patch

import pytest

import backend.server.routers.cache as cache
from backend.data import graph as graph_db


@pytest.fixture
def mock_user_id():
    """Generate a mock user ID for testing."""
    return str(uuid.uuid4())


@pytest.fixture
def mock_graph_id():
    """Generate a mock graph ID for testing."""
    return str(uuid.uuid4())


class TestGraphCacheInvalidation:
    """Test cache invalidation for graph operations."""

    @pytest.mark.asyncio
    async def test_create_graph_clears_list_cache(self, mock_user_id):
        """Test that creating a graph clears the graphs list cache."""
        # Setup
        cache.get_cached_graphs.cache_clear()

        # Pre-populate cache
        with patch.object(
            graph_db, "list_graphs_paginated", new_callable=AsyncMock
        ) as mock_list:
            # Use a simple dict instead of MagicMock to make it pickleable
            mock_list.return_value = {
                "graphs": [],
                "total_count": 0,
                "page": 1,
                "page_size": 250,
            }

            # First call should hit the database
            await cache.get_cached_graphs(mock_user_id, 1, 250)
            assert mock_list.call_count == 1

            # Second call should use cache
            await cache.get_cached_graphs(mock_user_id, 1, 250)
            assert mock_list.call_count == 1  # Still 1, used cache

            # Simulate cache invalidation (what happens in create_new_graph)
            cache.get_cached_graphs.cache_delete(mock_user_id, 1, 250)

            # Next call should hit database again
            await cache.get_cached_graphs(mock_user_id, 1, 250)
            assert mock_list.call_count == 2  # Incremented, cache was cleared

    @pytest.mark.asyncio
    async def test_delete_graph_clears_multiple_caches(
        self, mock_user_id, mock_graph_id
    ):
        """Test that deleting a graph clears all related caches."""
        # Clear all caches first
        cache.get_cached_graphs.cache_clear()
        cache.get_cached_graph.cache_clear()
        cache.get_cached_graph_all_versions.cache_clear()
        cache.get_cached_graph_executions.cache_clear()

        # Setup mocks
        with (
            patch.object(
                graph_db, "list_graphs_paginated", new_callable=AsyncMock
            ) as mock_list,
            patch.object(graph_db, "get_graph", new_callable=AsyncMock) as mock_get,
            patch.object(
                graph_db, "get_graph_all_versions", new_callable=AsyncMock
            ) as mock_versions,
        ):
            mock_list.return_value = {
                "graphs": [],
                "total_count": 0,
                "page": 1,
                "page_size": 250,
            }
            mock_get.return_value = {"id": mock_graph_id}
            mock_versions.return_value = []

            # Pre-populate all caches (use consistent argument style)
            await cache.get_cached_graphs(mock_user_id, 1, 250)
            await cache.get_cached_graph(mock_graph_id, None, mock_user_id)
            await cache.get_cached_graph_all_versions(mock_graph_id, mock_user_id)

            initial_calls = {
                "list": mock_list.call_count,
                "get": mock_get.call_count,
                "versions": mock_versions.call_count,
            }

            # Use cached values (no additional DB calls)
            await cache.get_cached_graphs(mock_user_id, 1, 250)
            await cache.get_cached_graph(mock_graph_id, None, mock_user_id)
            await cache.get_cached_graph_all_versions(mock_graph_id, mock_user_id)

            # Verify cache was used
            assert mock_list.call_count == initial_calls["list"]
            assert mock_get.call_count == initial_calls["get"]
            assert mock_versions.call_count == initial_calls["versions"]

            # Simulate delete_graph cache invalidation
            # Use positional arguments for cache_delete to match how we called the functions
            result1 = cache.get_cached_graphs.cache_delete(mock_user_id, 1, 250)
            result2 = cache.get_cached_graph.cache_delete(
                mock_graph_id, None, mock_user_id
            )
            result3 = cache.get_cached_graph_all_versions.cache_delete(
                mock_graph_id, mock_user_id
            )

            # Verify that the cache entries were actually deleted
            assert result1, "Failed to delete graphs cache entry"
            assert result2, "Failed to delete graph cache entry"
            assert result3, "Failed to delete graph versions cache entry"

            # Next calls should hit database
            await cache.get_cached_graphs(mock_user_id, 1, 250)
            await cache.get_cached_graph(mock_graph_id, None, mock_user_id)
            await cache.get_cached_graph_all_versions(mock_graph_id, mock_user_id)

            # Verify database was called again
            assert mock_list.call_count == initial_calls["list"] + 1
            assert mock_get.call_count == initial_calls["get"] + 1
            assert mock_versions.call_count == initial_calls["versions"] + 1

    @pytest.mark.asyncio
    async def test_update_graph_clears_caches(self, mock_user_id, mock_graph_id):
        """Test that updating a graph clears the appropriate caches."""
        # Clear caches
        cache.get_cached_graph.cache_clear()
        cache.get_cached_graph_all_versions.cache_clear()
        cache.get_cached_graphs.cache_clear()

        with (
            patch.object(graph_db, "get_graph", new_callable=AsyncMock) as mock_get,
            patch.object(
                graph_db, "get_graph_all_versions", new_callable=AsyncMock
            ) as mock_versions,
            patch.object(
                graph_db, "list_graphs_paginated", new_callable=AsyncMock
            ) as mock_list,
        ):
            mock_get.return_value = {"id": mock_graph_id, "version": 1}
            mock_versions.return_value = [{"version": 1}]
            mock_list.return_value = {
                "graphs": [],
                "total_count": 0,
                "page": 1,
                "page_size": 250,
            }

            # Populate caches
            await cache.get_cached_graph(mock_graph_id, None, mock_user_id)
            await cache.get_cached_graph_all_versions(mock_graph_id, mock_user_id)
            await cache.get_cached_graphs(mock_user_id, 1, 250)

            initial_calls = {
                "get": mock_get.call_count,
                "versions": mock_versions.call_count,
                "list": mock_list.call_count,
            }

            # Verify cache is being used
            await cache.get_cached_graph(mock_graph_id, None, mock_user_id)
            await cache.get_cached_graph_all_versions(mock_graph_id, mock_user_id)
            await cache.get_cached_graphs(mock_user_id, 1, 250)

            assert mock_get.call_count == initial_calls["get"]
            assert mock_versions.call_count == initial_calls["versions"]
            assert mock_list.call_count == initial_calls["list"]

            # Simulate update_graph cache invalidation
            cache.get_cached_graph.cache_delete(mock_graph_id, None, mock_user_id)
            cache.get_cached_graph_all_versions.cache_delete(
                mock_graph_id, mock_user_id
            )
            cache.get_cached_graphs.cache_delete(mock_user_id, 1, 250)

            # Next calls should hit database
            await cache.get_cached_graph(mock_graph_id, None, mock_user_id)
            await cache.get_cached_graph_all_versions(mock_graph_id, mock_user_id)
            await cache.get_cached_graphs(mock_user_id, 1, 250)

            assert mock_get.call_count == initial_calls["get"] + 1
            assert mock_versions.call_count == initial_calls["versions"] + 1
            assert mock_list.call_count == initial_calls["list"] + 1


class TestUserPreferencesCacheInvalidation:
    """Test cache invalidation for user preferences operations."""

    @pytest.mark.asyncio
    async def test_update_preferences_clears_cache(self, mock_user_id):
        """Test that updating preferences clears the preferences cache."""
        # Clear cache
        cache.get_cached_user_preferences.cache_clear()

        with patch.object(
            cache.user_db, "get_user_notification_preference", new_callable=AsyncMock
        ) as mock_get_prefs:
            mock_prefs = {"email_notifications": True, "push_notifications": False}
            mock_get_prefs.return_value = mock_prefs

            # First call hits database
            result1 = await cache.get_cached_user_preferences(mock_user_id)
            assert mock_get_prefs.call_count == 1
            assert result1 == mock_prefs

            # Second call uses cache
            result2 = await cache.get_cached_user_preferences(mock_user_id)
            assert mock_get_prefs.call_count == 1  # Still 1
            assert result2 == mock_prefs

            # Simulate update_preferences cache invalidation
            cache.get_cached_user_preferences.cache_delete(mock_user_id)

            # Change the mock return value to simulate updated preferences
            mock_prefs_updated = {
                "email_notifications": False,
                "push_notifications": True,
            }
            mock_get_prefs.return_value = mock_prefs_updated

            # Next call should hit database and get new value
            result3 = await cache.get_cached_user_preferences(mock_user_id)
            assert mock_get_prefs.call_count == 2
            assert result3 == mock_prefs_updated

    @pytest.mark.asyncio
    async def test_timezone_cache_operations(self, mock_user_id):
        """Test timezone cache and its operations."""
        # Clear cache
        cache.get_cached_user_timezone.cache_clear()

        with patch.object(
            cache.user_db, "get_user_by_id", new_callable=AsyncMock
        ) as mock_get_user:
            # Use a simple object that supports attribute access
            class MockUser:
                def __init__(self, timezone):
                    self.timezone = timezone

            mock_user = MockUser("America/New_York")
            mock_get_user.return_value = mock_user

            # First call hits database
            result1 = await cache.get_cached_user_timezone(mock_user_id)
            assert mock_get_user.call_count == 1
            assert result1["timezone"] == "America/New_York"

            # Second call uses cache
            result2 = await cache.get_cached_user_timezone(mock_user_id)
            assert mock_get_user.call_count == 1  # Still 1
            assert result2["timezone"] == "America/New_York"

            # Clear cache manually (simulating what would happen after update)
            cache.get_cached_user_timezone.cache_delete(mock_user_id)

            # Change timezone
            mock_user_updated = MockUser("Europe/London")
            mock_get_user.return_value = mock_user_updated

            # Next call should hit database
            result3 = await cache.get_cached_user_timezone(mock_user_id)
            assert mock_get_user.call_count == 2
            assert result3["timezone"] == "Europe/London"


class TestExecutionCacheInvalidation:
    """Test cache invalidation for execution operations."""

    @pytest.mark.asyncio
    async def test_execution_cache_cleared_on_graph_delete(
        self, mock_user_id, mock_graph_id
    ):
        """Test that execution caches are cleared when a graph is deleted."""
        # Clear cache
        cache.get_cached_graph_executions.cache_clear()

        with patch.object(
            cache.execution_db, "get_graph_executions_paginated", new_callable=AsyncMock
        ) as mock_exec:
            mock_exec.return_value = {
                "executions": [],
                "total_count": 0,
                "page": 1,
                "page_size": 25,
            }

            # Populate cache for multiple pages
            for page in range(1, 4):
                await cache.get_cached_graph_executions(
                    mock_graph_id, mock_user_id, page, 25
                )

            initial_calls = mock_exec.call_count

            # Verify cache is used
            for page in range(1, 4):
                await cache.get_cached_graph_executions(
                    mock_graph_id, mock_user_id, page, 25
                )

            assert mock_exec.call_count == initial_calls  # No new calls

            # Simulate graph deletion clearing execution caches
            for page in range(1, 10):  # Clear more pages as done in delete_graph
                cache.get_cached_graph_executions.cache_delete(
                    mock_graph_id, mock_user_id, page, 25
                )

            # Next calls should hit database
            for page in range(1, 4):
                await cache.get_cached_graph_executions(
                    mock_graph_id, mock_user_id, page, 25
                )

            assert mock_exec.call_count == initial_calls + 3  # 3 new calls


class TestCacheInfo:
    """Test cache information and metrics."""

    def test_cache_info_returns_correct_metrics(self):
        """Test that cache_info returns correct metrics."""
        # Clear all caches
        cache.get_cached_graphs.cache_clear()
        cache.get_cached_graph.cache_clear()

        # Get initial info
        info_graphs = cache.get_cached_graphs.cache_info()
        info_graph = cache.get_cached_graph.cache_info()

        assert info_graphs["size"] == 0
        assert info_graph["size"] == 0

        # Note: We can't directly test cache population without real async context,
        # but we can verify the cache_info structure
        assert "size" in info_graphs
        assert "maxsize" in info_graphs
        assert "ttl_seconds" in info_graphs

    def test_cache_clear_removes_all_entries(self):
        """Test that cache_clear removes all entries."""
        # This test verifies the cache_clear method exists and can be called
        cache.get_cached_graphs.cache_clear()
        cache.get_cached_graph.cache_clear()
        cache.get_cached_graph_all_versions.cache_clear()
        cache.get_cached_graph_executions.cache_clear()
        cache.get_cached_graphs_executions.cache_clear()
        cache.get_cached_user_preferences.cache_clear()
        cache.get_cached_user_timezone.cache_clear()

        # After clear, all caches should be empty
        assert cache.get_cached_graphs.cache_info()["size"] == 0
        assert cache.get_cached_graph.cache_info()["size"] == 0
        assert cache.get_cached_graph_all_versions.cache_info()["size"] == 0
        assert cache.get_cached_graph_executions.cache_info()["size"] == 0
        assert cache.get_cached_graphs_executions.cache_info()["size"] == 0
        assert cache.get_cached_user_preferences.cache_info()["size"] == 0
        assert cache.get_cached_user_timezone.cache_info()["size"] == 0
autogpt_platform/backend/backend/server/routers/v1.py:
@@ -11,7 +11,6 @@ import pydantic
 import stripe
 from autogpt_libs.auth import get_user_id, requires_user
 from autogpt_libs.auth.jwt_utils import get_jwt_payload
-from autogpt_libs.utils.cache import cached
 from fastapi import (
     APIRouter,
     Body,
@@ -29,8 +28,11 @@ from pydantic import BaseModel
 from starlette.status import HTTP_204_NO_CONTENT, HTTP_404_NOT_FOUND
 from typing_extensions import Optional, TypedDict
 
+import backend.server.cache_config as cache_config
 import backend.server.integrations.router
 import backend.server.routers.analytics
+import backend.server.routers.cache as cache
+import backend.server.v2.library.cache as library_cache
 import backend.server.v2.library.db as library_db
 from backend.data import api_key as api_key_db
 from backend.data import execution as execution_db
@@ -57,7 +59,6 @@ from backend.data.onboarding import (
 from backend.data.user import (
     get_or_create_user,
     get_user_by_id,
-    get_user_notification_preference,
     update_user_email,
     update_user_notification_preference,
     update_user_timezone,
@@ -168,7 +169,9 @@ async def get_user_timezone_route(
 ) -> TimezoneResponse:
     """Get user timezone setting."""
     user = await get_or_create_user(user_data)
-    return TimezoneResponse(timezone=user.timezone)
+    # Use cached timezone for subsequent calls
+    result = await cache.get_cached_user_timezone(user.id)
+    return TimezoneResponse(timezone=result["timezone"])
 
 
 @v1_router.post(
@@ -182,6 +185,7 @@ async def update_user_timezone_route(
 ) -> TimezoneResponse:
     """Update user timezone. The timezone should be a valid IANA timezone identifier."""
     user = await update_user_timezone(user_id, str(request.timezone))
+    cache.get_cached_user_timezone.cache_delete(user_id)
     return TimezoneResponse(timezone=user.timezone)
@@ -194,7 +198,7 @@
 async def get_preferences(
     user_id: Annotated[str, Security(get_user_id)],
 ) -> NotificationPreference:
-    preferences = await get_user_notification_preference(user_id)
+    preferences = await cache.get_cached_user_preferences(user_id)
     return preferences
@@ -209,6 +213,10 @@ async def update_preferences(
     preferences: NotificationPreferenceDTO = Body(...),
 ) -> NotificationPreference:
     output = await update_user_notification_preference(user_id, preferences)
+
+    # Clear preferences cache after update
+    cache.get_cached_user_preferences.cache_delete(user_id)
+
     return output
@@ -291,7 +299,7 @@ def _compute_blocks_sync() -> str:
     return dumps(result)
 
 
-@cached()
+@cached(ttl_seconds=3600)
 async def _get_cached_blocks() -> str:
     """
     Async cached function with thundering herd protection.
@@ -668,11 +676,10 @@ class DeleteGraphResponse(TypedDict):
 async def list_graphs(
     user_id: Annotated[str, Security(get_user_id)],
 ) -> Sequence[graph_db.GraphMeta]:
-    paginated_result = await graph_db.list_graphs_paginated(
+    paginated_result = await cache.get_cached_graphs(
         user_id=user_id,
         page=1,
         page_size=250,
-        filter_by="active",
     )
     return paginated_result.graphs
@@ -695,13 +702,26 @@ async def get_graph(
     version: int | None = None,
     for_export: bool = False,
 ) -> graph_db.GraphModel:
-    graph = await graph_db.get_graph(
-        graph_id,
-        version,
-        user_id=user_id,
-        for_export=for_export,
-        include_subgraphs=True,  # needed to construct full credentials input schema
-    )
+    # Use cache for non-export requests
+    if not for_export:
+        graph = await cache.get_cached_graph(
+            graph_id=graph_id,
+            version=version,
+            user_id=user_id,
+        )
+        # If graph not found, clear cache entry as permissions may have changed
+        if not graph:
+            cache.get_cached_graph.cache_delete(
+                graph_id=graph_id, version=version, user_id=user_id
+            )
+    else:
+        graph = await graph_db.get_graph(
+            graph_id,
+            version,
+            user_id=user_id,
+            for_export=for_export,
+            include_subgraphs=True,  # needed to construct full credentials input schema
+        )
     if not graph:
         raise HTTPException(status_code=404, detail=f"Graph #{graph_id} not found.")
     return graph
@@ -716,7 +736,7 @@ async def get_graph(
 async def get_graph_all_versions(
     graph_id: str, user_id: Annotated[str, Security(get_user_id)]
 ) -> Sequence[graph_db.GraphModel]:
-    graphs = await graph_db.get_graph_all_versions(graph_id, user_id=user_id)
+    graphs = await cache.get_cached_graph_all_versions(graph_id, user_id=user_id)
     if not graphs:
         raise HTTPException(status_code=404, detail=f"Graph #{graph_id} not found.")
     return graphs
@@ -740,6 +760,26 @@ async def create_new_graph(
     # as the graph is already valid and no sub-graphs are returned back.
     await graph_db.create_graph(graph, user_id=user_id)
     await library_db.create_library_agent(graph, user_id=user_id)
+
+    # Clear graphs list cache after creating new graph
+    cache.get_cached_graphs.cache_delete(
+        user_id=user_id,
+        page=1,
+        page_size=cache_config.V1_GRAPHS_PAGE_SIZE,
+    )
+    for page in range(1, cache_config.MAX_PAGES_TO_CLEAR):
+        library_cache.get_cached_library_agents.cache_delete(
+            user_id=user_id,
+            page=page,
+            page_size=cache_config.V1_LIBRARY_AGENTS_PAGE_SIZE,
+        )
+
+
+    # Clear my agents cache so user sees new agent immediately
+    import backend.server.v2.store.cache
+
+    backend.server.v2.store.cache._clear_my_agents_cache(user_id)
+
     return await on_graph_activate(graph, user_id=user_id)
@@ -755,7 +795,32 @@ async def delete_graph(
     if active_version := await graph_db.get_graph(graph_id, user_id=user_id):
         await on_graph_deactivate(active_version, user_id=user_id)
 
-    return {"version_counts": await graph_db.delete_graph(graph_id, user_id=user_id)}
+    result = DeleteGraphResponse(
+        version_counts=await graph_db.delete_graph(graph_id, user_id=user_id)
+    )
+
+    # Clear caches after deleting graph
+    cache.get_cached_graphs.cache_delete(
+        user_id=user_id,
+        page=1,
+        page_size=cache_config.V1_GRAPHS_PAGE_SIZE,
+    )
+    cache.get_cached_graph.cache_delete(
+        graph_id=graph_id, version=None, user_id=user_id
+    )
+    cache.get_cached_graph_all_versions.cache_delete(graph_id, user_id=user_id)
+
+    # Clear my agents cache so user sees agent removed immediately
+    import backend.server.v2.store.cache
+
+    backend.server.v2.store.cache._clear_my_agents_cache(user_id)
+
+    # Clear library agent by graph_id cache
+    library_cache.get_cached_library_agent_by_graph_id.cache_delete(
+        graph_id=graph_id, user_id=user_id
+    )
+
+    return result
 
 
 @v1_router.put(
@@ -811,6 +876,18 @@ async def update_graph(
         include_subgraphs=True,
     )
     assert new_graph_version_with_subgraphs  # make type checker happy
+
+    # Clear caches after updating graph
+    cache.get_cached_graph.cache_delete(
+        graph_id=graph_id, version=None, user_id=user_id
+    )
+    cache.get_cached_graph_all_versions.cache_delete(graph_id, user_id=user_id)
+    cache.get_cached_graphs.cache_delete(
+        user_id=user_id,
+        page=1,
+        page_size=cache_config.V1_GRAPHS_PAGE_SIZE,
+    )
 
     return new_graph_version_with_subgraphs
@@ -876,6 +953,29 @@ async def execute_graph(
         detail="Insufficient balance to execute the agent. Please top up your account.",
     )
 
+    # Invalidate caches before execution starts so frontend sees fresh data
+    cache.get_cached_graphs_executions.cache_delete(
+        user_id=user_id,
+        page=1,
+        page_size=cache_config.V1_GRAPHS_PAGE_SIZE,
+    )
+    for page in range(1, cache_config.MAX_PAGES_TO_CLEAR):
+        cache.get_cached_graph_execution.cache_delete(
+            graph_id=graph_id, user_id=user_id, version=graph_version
+        )
+
+        cache.get_cached_graph_executions.cache_delete(
+            graph_id=graph_id,
+            user_id=user_id,
+            page=page,
+            page_size=cache_config.V1_GRAPH_EXECUTIONS_PAGE_SIZE,
+        )
+        library_cache.get_cached_library_agents.cache_delete(
+            user_id=user_id,
+            page=page,
+            page_size=cache_config.V1_LIBRARY_AGENTS_PAGE_SIZE,
+        )
+
     try:
         result = await execution_utils.add_graph_execution(
             graph_id=graph_id,

@@ -888,6 +988,7 @@
         # Record successful graph execution
         record_graph_execution(graph_id=graph_id, status="success", user_id=user_id)
         record_graph_operation(operation="execute", status="success")
+
         return result
     except GraphValidationError as e:
         # Record failed graph execution
@@ -963,7 +1064,7 @@ async def _stop_graph_run(
 async def list_graphs_executions(
     user_id: Annotated[str, Security(get_user_id)],
 ) -> list[execution_db.GraphExecutionMeta]:
-    paginated_result = await execution_db.get_graph_executions_paginated(
+    paginated_result = await cache.get_cached_graphs_executions(
         user_id=user_id,
         page=1,
         page_size=250,
@@ -985,7 +1086,7 @@ async def list_graph_executions(
         25, ge=1, le=100, description="Number of executions per page"
     ),
 ) -> execution_db.GraphExecutionsPaginated:
-    return await execution_db.get_graph_executions_paginated(
+    return await cache.get_cached_graph_executions(
         graph_id=graph_id,
         user_id=user_id,
         page=page,
@@ -102,13 +102,13 @@ def test_get_graph_blocks(
     mock_block.id = "test-block"
     mock_block.disabled = False
 
-    # Mock get_blocks
+    # Mock get_blocks where it's imported at the top of v1.py
     mocker.patch(
         "backend.server.routers.v1.get_blocks",
         return_value={"test-block": lambda: mock_block},
     )
 
-    # Mock block costs
+    # Mock block costs where it's imported inside the function
     mocker.patch(
         "backend.data.credit.get_block_cost",
         return_value=[{"cost": 10, "type": "credit"}],
autogpt_platform/backend/backend/server/test_cache_audit.py (new file): 299 lines
@@ -0,0 +1,299 @@
#!/usr/bin/env python3
"""
Complete audit of all @cached functions to verify proper cache invalidation.

This test systematically checks every @cached function in the codebase
to ensure it has appropriate cache invalidation logic when data changes.
"""

import pytest


class TestCacheInvalidationAudit:
    """Audit all @cached functions for proper invalidation."""

    def test_v1_router_caches(self):
        """
        V1 Router cached functions:
        - _get_cached_blocks(): ✓ NEVER CHANGES (blocks are static in code)
        """
        # No invalidation needed for static data
        pass

    def test_v1_cache_module_graph_caches(self):
        """
        V1 Cache module graph-related caches:
        - get_cached_graphs(user_id, page, page_size): ✓ HAS INVALIDATION
          Cleared in: v1.py create_graph(), delete_graph(), update_graph_metadata(), stop_graph_execution()

        - get_cached_graph(graph_id, version, user_id): ✓ HAS INVALIDATION
          Cleared in: v1.py delete_graph(), update_graph(), delete_graph_execution()

        - get_cached_graph_all_versions(graph_id, user_id): ✓ HAS INVALIDATION
          Cleared in: v1.py delete_graph(), update_graph(), delete_graph_execution()

        - get_cached_graph_executions(graph_id, user_id, page, page_size): ✓ HAS INVALIDATION
          Cleared in: v1.py stop_graph_execution()
          Also cleared in: v2/library/routes/presets.py

        - get_cached_graphs_executions(user_id, page, page_size): ✓ HAS INVALIDATION
          Cleared in: v1.py stop_graph_execution()

        - get_cached_graph_execution(graph_exec_id, user_id): ✓ HAS INVALIDATION
          Cleared in: v1.py stop_graph_execution()

        ISSUE: All use hardcoded page_size values instead of cache_config constants!
        """
        # Document that v1 routes should migrate to use cache_config
        pass

    def test_v1_cache_module_user_caches(self):
        """
        V1 Cache module user-related caches:
        - get_cached_user_timezone(user_id): ✓ HAS INVALIDATION
          Cleared in: v1.py update_user_profile()

        - get_cached_user_preferences(user_id): ✓ HAS INVALIDATION
          Cleared in: v1.py update_user_notification_preferences()
        """
        pass

    def test_v2_store_cache_functions(self):
        """
        V2 Store cached functions:
        - _get_cached_user_profile(user_id): ✓ HAS INVALIDATION
          Cleared in: v2/store/routes.py update_or_create_profile()

        - _get_cached_store_agents(...): ⚠️ PARTIAL INVALIDATION
          Cleared in: v2/admin/store_admin_routes.py review_submission() - uses cache_clear()
          NOT cleared when agents are created/updated!

        - _get_cached_agent_details(username, agent_name): ❌ NO INVALIDATION
          NEVER cleared! Relies only on TTL (15 min)

        - _get_cached_agent_graph(store_listing_version_id): ❌ NO INVALIDATION
          NEVER cleared! Relies only on TTL (1 hour)

        - _get_cached_store_agent_by_version(store_listing_version_id): ❌ NO INVALIDATION
          NEVER cleared! Relies only on TTL (1 hour)

        - _get_cached_store_creators(...): ❌ NO INVALIDATION
          NEVER cleared! Relies only on TTL (1 hour)

        - _get_cached_creator_details(username): ❌ NO INVALIDATION
          NEVER cleared! Relies only on TTL (1 hour)

        - _get_cached_my_agents(user_id, page, page_size): ❌ NO INVALIDATION
          NEVER cleared! Users won't see new agents for 5 minutes!
          CRITICAL BUG: Should be cleared when user creates/deletes agents

        - _get_cached_submissions(user_id, page, page_size): ✓ HAS INVALIDATION
          Cleared via: _clear_submissions_cache() helper
          Called in: create_submission(), edit_submission(), delete_submission()
          Called in: v2/admin/store_admin_routes.py review_submission()
        """
        # Document critical issues
        CRITICAL_MISSING_INVALIDATION = [
            "_get_cached_my_agents - users won't see new agents immediately",
        ]

        # Acceptable TTL-only caches (documented, not asserted):
        # - _get_cached_agent_details (public data, 15min TTL acceptable)
        # - _get_cached_agent_graph (immutable data, 1hr TTL acceptable)
        # - _get_cached_store_agent_by_version (immutable version, 1hr TTL acceptable)
        # - _get_cached_store_creators (public data, 1hr TTL acceptable)
        # - _get_cached_creator_details (public data, 1hr TTL acceptable)

        assert (
            len(CRITICAL_MISSING_INVALIDATION) == 1
        ), "These caches need invalidation logic:\n" + "\n".join(
            CRITICAL_MISSING_INVALIDATION
        )

    def test_v2_library_cache_functions(self):
        """
        V2 Library cached functions:
        - get_cached_library_agents(user_id, page, page_size, ...): ✓ HAS INVALIDATION
          Cleared in: v1.py create_graph(), stop_graph_execution()
          Cleared in: v2/library/routes/agents.py add_library_agent(), remove_library_agent()

        - get_cached_library_agent_favorites(user_id, page, page_size): ✓ HAS INVALIDATION
          Cleared in: v2/library/routes/agents.py favorite/unfavorite endpoints

        - get_cached_library_agent(library_agent_id, user_id): ✓ HAS INVALIDATION
          Cleared in: v2/library/routes/agents.py remove_library_agent()

        - get_cached_library_agent_by_graph_id(graph_id, user_id): ❌ NO INVALIDATION
          NEVER cleared! Relies only on TTL (30 min)
          Should be cleared when graph is deleted

        - get_cached_library_agent_by_store_version(store_listing_version_id, user_id): ❌ NO INVALIDATION
          NEVER cleared! Relies only on TTL (1 hour)
          Probably acceptable as store versions are immutable

        - get_cached_library_presets(user_id, page, page_size): ✓ HAS INVALIDATION
          Cleared via: _clear_presets_list_cache() helper
          Called in: v2/library/routes/presets.py preset mutations

        - get_cached_library_preset(preset_id, user_id): ✓ HAS INVALIDATION
          Cleared in: v2/library/routes/presets.py preset mutations

        ISSUE: Clearing uses hardcoded page_size values (10 and 20) instead of cache_config!
        """
        pass

    def test_immutable_singleton_caches(self):
        """
        Caches that never need invalidation (singleton or immutable):
        - get_webhook_block_ids(): ✓ STATIC (blocks in code)
        - get_io_block_ids(): ✓ STATIC (blocks in code)
        - get_supabase(): ✓ CLIENT INSTANCE (no invalidation needed)
        - get_async_supabase(): ✓ CLIENT INSTANCE (no invalidation needed)
        - _get_all_providers(): ✓ STATIC CONFIG (providers in code)
        - get_redis(): ✓ CLIENT INSTANCE (no invalidation needed)
        - load_webhook_managers(): ✓ STATIC (managers in code)
        - load_all_blocks(): ✓ STATIC (blocks in code)
        - get_cached_blocks(): ✓ STATIC (blocks in code)
        """
        pass

    def test_feature_flag_cache(self):
        """
        Feature flag cache:
        - _fetch_user_context_data(user_id): ⚠️ LONG TTL
          TTL: 24 hours
          NO INVALIDATION

        This is probably acceptable as user context changes infrequently.
        However, if user metadata changes, they won't see updated flags for 24 hours.
        """
        pass

    def test_onboarding_cache(self):
        """
        Onboarding cache:
        - onboarding_enabled(): ⚠️ NO INVALIDATION
          TTL: 5 minutes
          NO INVALIDATION

        Should probably be cleared when store agents are added/removed,
        but the 5-minute TTL is acceptable for this use case.
        """
        pass


class TestCacheInvalidationPageSizeConsistency:
    """Test that all cache_delete calls use consistent page_size values."""

    def test_v1_routes_hardcoded_page_sizes(self):
        """
        V1 routes use hardcoded page_size values that should migrate to cache_config:

        ❌ page_size=250 for graphs:
        - v1.py line 765: cache.get_cached_graphs.cache_delete(user_id, page=1, page_size=250)
        - v1.py line 791: cache.get_cached_graphs.cache_delete(user_id, page=1, page_size=250)
        - v1.py line 859: cache.get_cached_graphs.cache_delete(user_id, page=1, page_size=250)
        - v1.py line 929: cache.get_cached_graphs_executions.cache_delete(user_id, page=1, page_size=250)

        ❌ page_size=10 for library agents:
        - v1.py line 768: library_cache.get_cached_library_agents.cache_delete(..., page_size=10)
        - v1.py line 940: library_cache.get_cached_library_agents.cache_delete(..., page_size=10)

        ❌ page_size=25 for graph executions:
        - v1.py line 937: cache.get_cached_graph_executions.cache_delete(..., page_size=25)

        RECOMMENDATION: Create constants in cache_config and migrate v1 routes to use them.
        """
        from backend.server import cache_config

        # These constants exist but aren't used in v1 routes yet
        assert cache_config.V1_GRAPHS_PAGE_SIZE == 250
        assert cache_config.V1_LIBRARY_AGENTS_PAGE_SIZE == 10
        assert cache_config.V1_GRAPH_EXECUTIONS_PAGE_SIZE == 25

    def test_v2_library_routes_hardcoded_page_sizes(self):
        """
        V2 library routes use hardcoded page_size values:

        ❌ v2/library/routes/agents.py:
        - line 233: cache_delete(..., page_size=10)

        ❌ v2/library/routes/presets.py _clear_presets_list_cache():
        - Clears BOTH page_size=10 AND page_size=20
        - This suggests different consumers use different page sizes

        ❌ v2/library/routes/presets.py:
        - line 449: cache_delete(..., page_size=10)
        - line 452: cache_delete(..., page_size=25)

        RECOMMENDATION: Migrate to use cache_config constants.
        """
        from backend.server import cache_config

        # Constants exist for library
        assert cache_config.V2_LIBRARY_AGENTS_PAGE_SIZE == 10
        assert cache_config.V2_LIBRARY_PRESETS_PAGE_SIZE == 20
        assert cache_config.V2_LIBRARY_PRESETS_ALT_PAGE_SIZE == 10

    def test_only_page_1_cleared_risk(self):
        """
        Document cache_delete calls that only clear page=1.

        RISKY PATTERN: Many cache_delete calls only clear page=1:
        - v1.py create_graph(): Only clears page=1 of graphs
        - v1.py delete_graph(): Only clears page=1 of graphs
        - v1.py update_graph_metadata(): Only clears page=1 of graphs
        - v1.py stop_graph_execution(): Only clears page=1 of executions

        PROBLEM: If the user has more than one page, subsequent pages show stale data until the TTL expires.

        SOLUTIONS:
        1. Use cache_clear() to clear all pages (nuclear option)
        2. Loop through multiple pages like _clear_submissions_cache does
        3. Accept TTL-based expiry for pages 2+ (current approach)

        The current approach is probably acceptable given the TTL values are reasonable.
        """
        pass


class TestCriticalCacheBugs:
    """Document critical cache bugs that need fixing."""

    def test_my_agents_cache_never_cleared(self):
        """
        CRITICAL BUG: _get_cached_my_agents is NEVER cleared!

        Impact:
        - User creates a new agent → won't see it in "My Agents" for 5 minutes
        - User deletes an agent → still sees it in "My Agents" for 5 minutes

        Fix needed:
        1. Create _clear_my_agents_cache() helper (like _clear_submissions_cache)
        2. Call it from v1.py create_graph() and delete_graph()
        3. Use cache_config.V2_MY_AGENTS_PAGE_SIZE constant

        Location: v2/store/cache.py line 120
        """
        # This documents the bug
        NEEDS_CACHE_CLEARING = "_get_cached_my_agents"
        assert NEEDS_CACHE_CLEARING == "_get_cached_my_agents"

    def test_library_agent_by_graph_id_never_cleared(self):
        """
        BUG: get_cached_library_agent_by_graph_id is NEVER cleared!

        Impact:
        - User deletes a graph → Library still shows it as available for 30 minutes

        Fix needed:
        - Clear in v1.py delete_graph()
        - Clear in v2/library/routes/agents.py remove_library_agent()

        Location: v2/library/cache.py line 59
        """
        pass


if __name__ == "__main__":
    pytest.main([__file__, "-v"])
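The page=1-only risk documented in test_only_page_1_cleared_risk above can be seen with a hypothetical in-memory cache keyed by (user_id, page, page_size); this is an illustrative sketch, not the platform's actual cache:

    # Hypothetical cache contents after a user lists two pages of graphs.
    cache = {
        ("user-1", 1, 250): ["graph-a", "graph-b"],
        ("user-1", 2, 250): ["graph-c"],
    }
    # A page=1-only invalidation, as v1.py create_graph() currently does:
    cache.pop(("user-1", 1, 250), None)
    # Page 2 still holds the pre-mutation result until its TTL expires.
    assert ("user-1", 2, 250) in cache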
@@ -0,0 +1,95 @@
#!/usr/bin/env python3
"""
Test suite to verify cache_config constants are being used correctly.

This ensures that the centralized cache_config.py constants are actually
used throughout the codebase, not just defined.
"""

import pytest

from backend.server import cache_config


class TestCacheConfigConstants:
    """Verify cache_config constants have expected values."""

    def test_v2_store_page_sizes(self):
        """Test V2 Store API page size constants."""
        assert cache_config.V2_STORE_AGENTS_PAGE_SIZE == 20
        assert cache_config.V2_STORE_CREATORS_PAGE_SIZE == 20
        assert cache_config.V2_STORE_SUBMISSIONS_PAGE_SIZE == 20
        assert cache_config.V2_MY_AGENTS_PAGE_SIZE == 20

    def test_v2_library_page_sizes(self):
        """Test V2 Library API page size constants."""
        assert cache_config.V2_LIBRARY_AGENTS_PAGE_SIZE == 10
        assert cache_config.V2_LIBRARY_PRESETS_PAGE_SIZE == 20
        assert cache_config.V2_LIBRARY_PRESETS_ALT_PAGE_SIZE == 10

    def test_v1_page_sizes(self):
        """Test V1 API page size constants."""
        assert cache_config.V1_GRAPHS_PAGE_SIZE == 250
        assert cache_config.V1_LIBRARY_AGENTS_PAGE_SIZE == 10
        assert cache_config.V1_GRAPH_EXECUTIONS_PAGE_SIZE == 25

    def test_cache_clearing_config(self):
        """Test cache clearing configuration."""
        assert cache_config.MAX_PAGES_TO_CLEAR == 20

    def test_get_page_sizes_for_clearing_helper(self):
        """Test the helper function for getting page sizes to clear."""
        # Single page size
        result = cache_config.get_page_sizes_for_clearing(20)
        assert result == [20]

        # Multiple page sizes
        result = cache_config.get_page_sizes_for_clearing(20, 10)
        assert result == [20, 10]

        # With None alt_page_size
        result = cache_config.get_page_sizes_for_clearing(20, None)
        assert result == [20]


class TestCacheConfigUsage:
    """Test that cache_config constants are actually used in the code."""

    def test_store_routes_import_cache_config(self):
        """Verify store routes imports cache_config."""
        import backend.server.v2.store.routes as store_routes

        # Check that cache_config is imported
        assert hasattr(store_routes, "backend")
        assert hasattr(store_routes.backend.server, "cache_config")

    def test_store_cache_uses_constants(self):
        """Verify store cache module uses cache_config constants."""
        import backend.server.v2.store.cache as store_cache

        # Check the module imports cache_config
        assert hasattr(store_cache, "backend")
        assert hasattr(store_cache.backend.server, "cache_config")

        # The _clear_submissions_cache function should use the constant
        import inspect

        source = inspect.getsource(store_cache._clear_submissions_cache)
        assert (
            "cache_config.V2_STORE_SUBMISSIONS_PAGE_SIZE" in source
        ), "_clear_submissions_cache must use cache_config.V2_STORE_SUBMISSIONS_PAGE_SIZE"
        assert (
            "cache_config.MAX_PAGES_TO_CLEAR" in source
        ), "_clear_submissions_cache must use cache_config.MAX_PAGES_TO_CLEAR"

    def test_admin_routes_use_constants(self):
        """Verify admin routes use cache_config constants."""
        import backend.server.v2.admin.store_admin_routes as admin_routes

        # Check that cache_config is imported
        assert hasattr(admin_routes, "backend")
        assert hasattr(admin_routes.backend.server, "cache_config")


if __name__ == "__main__":
    pytest.main([__file__, "-v"])
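A minimal implementation consistent with the three cases asserted in test_get_page_sizes_for_clearing_helper above; the real helper lives in backend/server/cache_config.py, and the first parameter name here is an assumption:

    def get_page_sizes_for_clearing(
        page_size: int, alt_page_size: int | None = None
    ) -> list[int]:
        """Return the list of page sizes whose cached pages should be deleted."""
        if alt_page_size is None:
            return [page_size]
        return [page_size, alt_page_size]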
@@ -0,0 +1,263 @@
#!/usr/bin/env python3
"""
Comprehensive test suite for cache invalidation consistency across the entire backend.

This test file identifies ALL locations where cache_delete is called with hardcoded
parameters (especially page_size) and ensures they match the corresponding route defaults.

CRITICAL: If any test in this file fails, it means cache invalidation will be broken
and users will see stale data after mutations.

Key problem areas identified:
1. v1.py routes: Uses page_size=250 for graphs, and cache clearing uses page_size=250 ✓
2. v1.py routes: Uses page_size=10 for library agents clearing
3. v2/library routes: Uses page_size=10 for library agents clearing
4. v2/store routes: Uses page_size=20 for submissions clearing (in _clear_submissions_cache)
5. v2/library presets: Uses page_size=10 AND page_size=20 for presets (dual clearing)
"""

import pytest


class TestCacheInvalidationConsistency:
    """Test that all cache_delete calls use correct parameters matching route defaults."""

    def test_v1_graphs_cache_page_size_consistency(self):
        """
        Test v1 graphs routes use consistent page_size.

        Locations that must match:
        - routes/v1.py line 682: default page_size=250
        - routes/v1.py line 765: cache_delete with page_size=250
        - routes/v1.py line 791: cache_delete with page_size=250
        - routes/v1.py line 859: cache_delete with page_size=250
        - routes/v1.py line 929: cache_delete with page_size=250
        - routes/v1.py line 1034: default page_size=250
        """
        V1_GRAPHS_DEFAULT_PAGE_SIZE = 250

        # This is the expected value - if this test fails, check all the above locations
        assert V1_GRAPHS_DEFAULT_PAGE_SIZE == 250, (
            "If you changed the default page_size for v1 graphs, you must update:\n"
            "1. routes/v1.py list_graphs() default parameter\n"
            "2. routes/v1.py create_graph() cache_delete call\n"
            "3. routes/v1.py delete_graph() cache_delete call\n"
            "4. routes/v1.py update_graph_metadata() cache_delete call\n"
            "5. routes/v1.py stop_graph_execution() cache_delete call\n"
            "6. routes/v1.py list_graph_run_events() default parameter"
        )

    def test_v1_library_agents_cache_page_size_consistency(self):
        """
        Test v1 library agents cache clearing uses consistent page_size.

        Locations that must match:
        - routes/v1.py line 768: cache_delete with page_size=10
        - routes/v1.py line 940: cache_delete with page_size=10
        - v2/library/routes/agents.py line 233: cache_delete with page_size=10

        WARNING: These hardcode page_size=10, but we need to verify this matches
        the actual page_size used when fetching library agents!
        """
        V1_LIBRARY_AGENTS_CLEARING_PAGE_SIZE = 10

        assert V1_LIBRARY_AGENTS_CLEARING_PAGE_SIZE == 10, (
            "If you changed the library agents clearing page_size, you must update:\n"
            "1. routes/v1.py create_graph() cache clearing loop\n"
            "2. routes/v1.py stop_graph_execution() cache clearing loop\n"
            "3. v2/library/routes/agents.py add_library_agent() cache clearing loop"
        )

        # TODO: This should be verified against the actual default used in library routes

    def test_v1_graph_executions_cache_page_size_consistency(self):
        """
        Test v1 graph executions cache clearing uses consistent page_size.

        Locations:
        - routes/v1.py line 937: cache_delete with page_size=25
        - v2/library/routes/presets.py line 449: cache_delete with page_size=10
        - v2/library/routes/presets.py line 452: cache_delete with page_size=25
        """
        V1_GRAPH_EXECUTIONS_CLEARING_PAGE_SIZE = 25

        # Note: presets.py clears BOTH page_size=10 AND page_size=25.
        # This suggests there may be multiple consumers with different page sizes.
        assert V1_GRAPH_EXECUTIONS_CLEARING_PAGE_SIZE == 25

    def test_v2_store_submissions_cache_page_size_consistency(self):
        """
        Test v2 store submissions use consistent page_size.

        Locations that must match:
        - v2/store/routes.py line 484: default page_size=20
        - v2/store/cache.py line 18: _clear_submissions_cache uses page_size=20

        This is already tested in test_cache_delete.py but documented here for completeness.
        """
        V2_STORE_SUBMISSIONS_DEFAULT_PAGE_SIZE = 20
        V2_STORE_SUBMISSIONS_CLEARING_PAGE_SIZE = 20

        assert (
            V2_STORE_SUBMISSIONS_DEFAULT_PAGE_SIZE
            == V2_STORE_SUBMISSIONS_CLEARING_PAGE_SIZE
        ), (
            "The default page_size for store submissions must match the hardcoded value in _clear_submissions_cache!\n"
            "Update both:\n"
            "1. v2/store/routes.py get_submissions() default parameter\n"
            "2. v2/store/cache.py _clear_submissions_cache() hardcoded page_size"
        )

    def test_v2_library_presets_cache_page_size_consistency(self):
        """
        Test v2 library presets cache clearing uses consistent page_size.

        Locations:
        - v2/library/routes/presets.py line 36: cache_delete with page_size=10
        - v2/library/routes/presets.py line 39: cache_delete with page_size=20

        This route clears BOTH page_size=10 and page_size=20, suggesting multiple consumers.
        """
        V2_LIBRARY_PRESETS_CLEARING_PAGE_SIZES = [10, 20]

        assert 10 in V2_LIBRARY_PRESETS_CLEARING_PAGE_SIZES
        assert 20 in V2_LIBRARY_PRESETS_CLEARING_PAGE_SIZES

        # TODO: Verify these match the actual page_size defaults used in preset routes

    def test_cache_clearing_helper_functions_documented(self):
        """
        Document all cache clearing helper functions and their hardcoded parameters.

        Helper functions that wrap cache_delete with hardcoded params:
        1. v2/store/cache.py::_clear_submissions_cache() - hardcodes page_size=20, num_pages=20
        2. v2/library/routes/presets.py::_clear_presets_list_cache() - hardcodes page_size=10 AND 20, num_pages=20

        These helpers are DANGEROUS because:
        - They hide the hardcoded parameters
        - They loop through multiple pages with hardcoded page_size
        - If the route default changes, these won't clear the right cache entries
        """
        HELPER_FUNCTIONS = {
            "_clear_submissions_cache": {
                "file": "v2/store/cache.py",
                "page_size": 20,
                "num_pages": 20,
                "risk": "HIGH - single page_size, could miss entries if default changes",
            },
            "_clear_presets_list_cache": {
                "file": "v2/library/routes/presets.py",
                "page_size": [10, 20],
                "num_pages": 20,
                "risk": "MEDIUM - clears multiple page_sizes, but could still miss new ones",
            },
        }

        assert (
            len(HELPER_FUNCTIONS) == 2
        ), "If you add new cache clearing helper functions, document them here!"

    def test_cache_delete_without_page_loops_are_risky(self):
        """
        Document cache_delete calls that clear only page=1 (risky if there are multiple pages).

        Single page cache_delete calls:
        - routes/v1.py line 765: Only clears page=1 with page_size=250
        - routes/v1.py line 791: Only clears page=1 with page_size=250
        - routes/v1.py line 859: Only clears page=1 with page_size=250

        These are RISKY because:
        - If a user has more than one page of graphs, pages 2+ won't be invalidated
        - The user could see stale data on pagination

        RECOMMENDATION: Use cache_clear() or loop through multiple pages like
        _clear_submissions_cache does.
        """
        SINGLE_PAGE_CLEARS = [
            "routes/v1.py line 765: create_graph clears only page=1",
            "routes/v1.py line 791: delete_graph clears only page=1",
            "routes/v1.py line 859: update_graph_metadata clears only page=1",
        ]

        # This test documents the issue but doesn't fail.
        # Consider this a TODO to fix these cache clearing strategies.
        assert (
            len(SINGLE_PAGE_CLEARS) >= 3
        ), "These cache_delete calls should probably loop through multiple pages"

    def test_all_cached_functions_have_proper_invalidation(self):
        """
        Verify all @cached functions have corresponding cache_delete calls.

        Functions with proper invalidation:
        ✓ get_cached_user_profile - cleared on profile update
        ✓ get_cached_store_agents - cleared on admin review (cache_clear)
        ✓ get_cached_submissions - cleared via _clear_submissions_cache helper
        ✓ get_cached_graphs - cleared on graph mutations
        ✓ get_cached_library_agents - cleared on library changes

        Functions that might not have proper invalidation:
        ? get_cached_agent_details - not explicitly cleared
        ? get_cached_store_creators - not explicitly cleared
        ? get_cached_my_agents - not explicitly cleared (no helper function exists!)

        This is a documentation test - actual verification requires code analysis.
        """
        NEEDS_VERIFICATION = [
            "get_cached_agent_details",
            "get_cached_store_creators",
            "get_cached_my_agents",  # NO CLEARING FUNCTION EXISTS!
        ]

        assert "get_cached_my_agents" in NEEDS_VERIFICATION, (
            "get_cached_my_agents has no cache clearing logic - this is a BUG!\n"
            "When a user creates/deletes an agent, their 'my agents' list won't update."
        )


class TestCacheKeyParameterOrdering:
    """
    Test that cache_delete calls use the same parameter order as the @cached function.

    The @cached decorator uses function signature order to create cache keys.
    cache_delete must use the exact same order or it won't find the cached entry!
    """

    def test_cached_function_parameter_order_matters(self):
        """
        Document that parameter order in cache_delete must match the @cached function signature.

        Example from v2/store/cache.py:

            @cached(...)
            async def _get_cached_submissions(user_id: str, page: int, page_size: int):
                ...

        CORRECT: _get_cached_submissions.cache_delete(user_id, page=1, page_size=20)
        WRONG:   _get_cached_submissions.cache_delete(page=1, user_id=user_id, page_size=20)

        The cached decorator generates keys based on the POSITIONAL order, so parameter
        order must match between the function definition and the cache_delete call.
        """
        # This is a documentation test - no assertion needed.
        # Real verification requires inspecting each cache_delete call.
        pass

    def test_named_parameters_vs_positional_in_cache_delete(self):
        """
        Document best practice: use named parameters in cache_delete for safety.

        Good practice seen in the codebase:
        - cache.get_cached_graphs.cache_delete(user_id=user_id, page=1, page_size=250)
        - library_cache.get_cached_library_agents.cache_delete(user_id=user_id, page=page, page_size=10)

        This is safer than positional arguments because it is:
        1. More readable
        2. Less likely to get the order wrong
        3. Self-documenting about what each parameter means
        """
        pass


if __name__ == "__main__":
    pytest.main([__file__, "-v"])
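To see why TestCacheKeyParameterOrdering above matters, consider a toy key builder in the same spirit as the one this codebase derives from the (args, kwargs) pair; this is a sketch only, not the real key function:

    def make_key(args: tuple, kwargs: dict) -> tuple:
        # Toy sketch: positional and keyword arguments land in different
        # parts of the key, so the same logical call can produce two keys.
        return (args, tuple(sorted(kwargs.items())))

    key_a = make_key(("user-1", 1, 20), {})                      # all positional
    key_b = make_key(("user-1",), {"page": 1, "page_size": 20})  # mixed form
    assert key_a != key_b  # a cache_delete using the wrong form misses the entry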
@@ -457,7 +457,8 @@ async def test_api_key_with_unicode_characters_normalization_attack(mock_request
    """Test that Unicode normalization doesn't bypass validation."""
    # Create auth with composed Unicode character
    auth = APIKeyAuthenticator(
        header_name="X-API-Key", expected_token="café"  # é is composed
        header_name="X-API-Key",
        expected_token="café",  # é is composed
    )

    # Try with decomposed version (c + a + f + e + ´)
@@ -522,8 +523,8 @@ async def test_api_keys_with_newline_variations(mock_request):
        "valid\r\ntoken",  # Windows newline
        "valid\rtoken",  # Mac newline
        "valid\x85token",  # NEL (Next Line)
        "valid\x0Btoken",  # Vertical Tab
        "valid\x0Ctoken",  # Form Feed
        "valid\x0btoken",  # Vertical Tab
        "valid\x0ctoken",  # Form Feed
    ]

    for api_key in newline_variations:

@@ -23,7 +23,6 @@ logger = logging.getLogger(__name__)


class AutoModManager:

    def __init__(self):
        self.config = self._load_config()


@@ -7,6 +7,8 @@ import fastapi
import fastapi.responses
import prisma.enums

import backend.server.cache_config
import backend.server.v2.store.cache
import backend.server.v2.store.db
import backend.server.v2.store.model
import backend.util.json
@@ -29,7 +31,7 @@ async def get_admin_listings_with_versions(
    status: typing.Optional[prisma.enums.SubmissionStatus] = None,
    search: typing.Optional[str] = None,
    page: int = 1,
    page_size: int = 20,
    page_size: int = backend.server.cache_config.V2_STORE_SUBMISSIONS_PAGE_SIZE,
):
    """
    Get store listings with their version history for admins.
@@ -93,6 +95,8 @@ async def review_submission(
            internal_comments=request.internal_comments or "",
            reviewer_id=user_id,
        )
        backend.server.v2.store.cache._clear_submissions_cache(submission.user_id)
        backend.server.v2.store.cache._get_cached_store_agents.cache_clear()
        return submission
    except Exception as e:
        logger.exception("Error reviewing submission: %s", e)

@@ -2,7 +2,6 @@ import logging
from datetime import datetime, timedelta, timezone

import prisma
from autogpt_libs.utils.cache import cached

import backend.data.block
from backend.blocks import load_all_blocks
@@ -18,6 +17,7 @@ from backend.server.v2.builder.model import (
    ProviderResponse,
    SearchBlocksResponse,
)
from backend.util.cache import cached
from backend.util.models import Pagination

logger = logging.getLogger(__name__)
@@ -296,7 +296,7 @@ def _matches_llm_model(schema_cls: type[BlockSchema], query: str) -> bool:
    return False


@cached()
@cached(ttl_seconds=3600)
def _get_all_providers() -> dict[ProviderName, Provider]:
    providers: dict[ProviderName, Provider] = {}


111 autogpt_platform/backend/backend/server/v2/library/cache.py Normal file
@@ -0,0 +1,111 @@
"""
Cache functions for Library API endpoints.

This module contains all caching decorators and helpers for the Library API,
separated from the main routes for better organization and maintainability.
"""

import backend.server.v2.library.db
from backend.util.cache import cached

# ===== Library Agent Caches =====


# Cache library agents list for 10 minutes
@cached(maxsize=1000, ttl_seconds=600, shared_cache=True)
async def get_cached_library_agents(
    user_id: str,
    page: int = 1,
    page_size: int = 20,
):
    """Cached helper to get library agents list."""
    return await backend.server.v2.library.db.list_library_agents(
        user_id=user_id,
        page=page,
        page_size=page_size,
    )


# Cache user's favorite agents for 5 minutes - favorites change more frequently
@cached(maxsize=500, ttl_seconds=300, shared_cache=True)
async def get_cached_library_agent_favorites(
    user_id: str,
    page: int = 1,
    page_size: int = 20,
):
    """Cached helper to get user's favorite library agents."""
    return await backend.server.v2.library.db.list_favorite_library_agents(
        user_id=user_id,
        page=page,
        page_size=page_size,
    )


# Cache individual library agent details for 30 minutes
@cached(maxsize=1000, ttl_seconds=1800, shared_cache=True)
async def get_cached_library_agent(
    library_agent_id: str,
    user_id: str,
):
    """Cached helper to get library agent details."""
    return await backend.server.v2.library.db.get_library_agent(
        id=library_agent_id,
        user_id=user_id,
    )


# Cache library agent by graph ID for 30 minutes
@cached(maxsize=1000, ttl_seconds=1800, shared_cache=True)
async def get_cached_library_agent_by_graph_id(
    graph_id: str,
    user_id: str,
):
    """Cached helper to get library agent by graph ID."""
    return await backend.server.v2.library.db.get_library_agent_by_graph_id(
        graph_id=graph_id,
        user_id=user_id,
    )


# Cache library agent by store version ID for 1 hour - marketplace agents are more stable
@cached(maxsize=500, ttl_seconds=3600, shared_cache=True)
async def get_cached_library_agent_by_store_version(
    store_listing_version_id: str,
    user_id: str,
):
    """Cached helper to get library agent by store version ID."""
    return await backend.server.v2.library.db.get_library_agent_by_store_version_id(
        store_listing_version_id=store_listing_version_id,
        user_id=user_id,
    )


# ===== Library Preset Caches =====


# Cache library presets list for 30 minutes
@cached(maxsize=500, ttl_seconds=1800, shared_cache=True)
async def get_cached_library_presets(
    user_id: str,
    page: int = 1,
    page_size: int = 20,
):
    """Cached helper to get library presets list."""
    return await backend.server.v2.library.db.list_presets(
        user_id=user_id,
        page=page,
        page_size=page_size,
    )


# Cache individual preset details for 30 minutes
@cached(maxsize=1000, ttl_seconds=1800, shared_cache=True)
async def get_cached_library_preset(
    preset_id: str,
    user_id: str,
):
    """Cached helper to get library preset details."""
    return await backend.server.v2.library.db.get_preset(
        preset_id=preset_id,
        user_id=user_id,
    )
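Invalidation for the caches above has to mirror the decorated signatures; a usage sketch with hypothetical IDs, following the keyword-argument style the routes in this PR use:

    # Drop one agent's detail entry after it is deleted.
    library_cache.get_cached_library_agent.cache_delete(
        library_agent_id="agent-123", user_id="user-1"  # hypothetical IDs
    )
    # Drop the first 20 list pages at the page size the clearing code uses.
    for page in range(1, 21):
        library_cache.get_cached_library_agents.cache_delete(
            user_id="user-1", page=page, page_size=10
        )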
286 autogpt_platform/backend/backend/server/v2/library/cache_test.py Normal file
@@ -0,0 +1,286 @@
"""
Tests for cache invalidation in Library API routes.

This module tests that library caches are properly invalidated when data is modified.
"""

import uuid
from unittest.mock import AsyncMock, patch

import pytest

import backend.server.v2.library.cache as library_cache
import backend.server.v2.library.db as library_db


@pytest.fixture
def mock_user_id():
    """Generate a mock user ID for testing."""
    return str(uuid.uuid4())


@pytest.fixture
def mock_library_agent_id():
    """Generate a mock library agent ID for testing."""
    return str(uuid.uuid4())


class TestLibraryAgentCacheInvalidation:
    """Test cache invalidation for library agent operations."""

    @pytest.mark.asyncio
    async def test_add_agent_clears_list_cache(self, mock_user_id):
        """Test that adding an agent clears the library agents list cache."""
        # Clear cache
        library_cache.get_cached_library_agents.cache_clear()

        with patch.object(
            library_db, "list_library_agents", new_callable=AsyncMock
        ) as mock_list:
            mock_response = {"agents": [], "total_count": 0, "page": 1, "page_size": 20}
            mock_list.return_value = mock_response

            # First call hits database
            await library_cache.get_cached_library_agents(mock_user_id, 1, 20)
            assert mock_list.call_count == 1

            # Second call uses cache
            await library_cache.get_cached_library_agents(mock_user_id, 1, 20)
            assert mock_list.call_count == 1  # Still 1, cache used

            # Simulate adding an agent (cache invalidation)
            for page in range(1, 5):
                library_cache.get_cached_library_agents.cache_delete(
                    mock_user_id, page, 15
                )
                library_cache.get_cached_library_agents.cache_delete(
                    mock_user_id, page, 20
                )

            # Next call should hit database
            await library_cache.get_cached_library_agents(mock_user_id, 1, 20)
            assert mock_list.call_count == 2

    @pytest.mark.asyncio
    async def test_delete_agent_clears_multiple_caches(
        self, mock_user_id, mock_library_agent_id
    ):
        """Test that deleting an agent clears both specific and list caches."""
        # Clear caches
        library_cache.get_cached_library_agent.cache_clear()
        library_cache.get_cached_library_agents.cache_clear()

        with (
            patch.object(
                library_db, "get_library_agent", new_callable=AsyncMock
            ) as mock_get,
            patch.object(
                library_db, "list_library_agents", new_callable=AsyncMock
            ) as mock_list,
        ):
            mock_agent = {"id": mock_library_agent_id, "name": "Test Agent"}
            mock_get.return_value = mock_agent
            mock_list.return_value = {
                "agents": [mock_agent],
                "total_count": 1,
                "page": 1,
                "page_size": 20,
            }

            # Populate caches
            await library_cache.get_cached_library_agent(
                mock_library_agent_id, mock_user_id
            )
            await library_cache.get_cached_library_agents(mock_user_id, 1, 20)

            initial_calls = {
                "get": mock_get.call_count,
                "list": mock_list.call_count,
            }

            # Verify cache is used
            await library_cache.get_cached_library_agent(
                mock_library_agent_id, mock_user_id
            )
            await library_cache.get_cached_library_agents(mock_user_id, 1, 20)

            assert mock_get.call_count == initial_calls["get"]
            assert mock_list.call_count == initial_calls["list"]

            # Simulate delete_library_agent cache invalidation
            library_cache.get_cached_library_agent.cache_delete(
                mock_library_agent_id, mock_user_id
            )
            for page in range(1, 5):
                library_cache.get_cached_library_agents.cache_delete(
                    mock_user_id, page, 15
                )
                library_cache.get_cached_library_agents.cache_delete(
                    mock_user_id, page, 20
                )

            # Next calls should hit database
            await library_cache.get_cached_library_agent(
                mock_library_agent_id, mock_user_id
            )
            await library_cache.get_cached_library_agents(mock_user_id, 1, 20)

            assert mock_get.call_count == initial_calls["get"] + 1
            assert mock_list.call_count == initial_calls["list"] + 1

    @pytest.mark.asyncio
    async def test_favorites_cache_operations(self, mock_user_id):
        """Test that favorites cache works independently."""
        # Clear cache
        library_cache.get_cached_library_agent_favorites.cache_clear()

        with patch.object(
            library_db, "list_favorite_library_agents", new_callable=AsyncMock
        ) as mock_favs:
            mock_response = {"agents": [], "total_count": 0, "page": 1, "page_size": 20}
            mock_favs.return_value = mock_response

            # First call hits database
            await library_cache.get_cached_library_agent_favorites(mock_user_id, 1, 20)
            assert mock_favs.call_count == 1

            # Second call uses cache
            await library_cache.get_cached_library_agent_favorites(mock_user_id, 1, 20)
            assert mock_favs.call_count == 1  # Cache used

            # Clear cache
            library_cache.get_cached_library_agent_favorites.cache_delete(
                mock_user_id, 1, 20
            )

            # Next call hits database
            await library_cache.get_cached_library_agent_favorites(mock_user_id, 1, 20)
            assert mock_favs.call_count == 2


class TestLibraryPresetCacheInvalidation:
    """Test cache invalidation for library preset operations."""

    @pytest.mark.asyncio
    async def test_preset_cache_operations(self, mock_user_id):
        """Test preset cache and invalidation."""
        # Clear cache
        library_cache.get_cached_library_presets.cache_clear()
        library_cache.get_cached_library_preset.cache_clear()

        preset_id = str(uuid.uuid4())

        with (
            patch.object(
                library_db, "list_presets", new_callable=AsyncMock
            ) as mock_list,
            patch.object(library_db, "get_preset", new_callable=AsyncMock) as mock_get,
        ):
            mock_preset = {"id": preset_id, "name": "Test Preset"}
            mock_list.return_value = {
                "presets": [mock_preset],
                "total_count": 1,
                "page": 1,
                "page_size": 20,
            }
            mock_get.return_value = mock_preset

            # Populate caches
            await library_cache.get_cached_library_presets(mock_user_id, 1, 20)
            await library_cache.get_cached_library_preset(preset_id, mock_user_id)

            initial_calls = {
                "list": mock_list.call_count,
                "get": mock_get.call_count,
            }

            # Verify cache is used
            await library_cache.get_cached_library_presets(mock_user_id, 1, 20)
            await library_cache.get_cached_library_preset(preset_id, mock_user_id)

            assert mock_list.call_count == initial_calls["list"]
            assert mock_get.call_count == initial_calls["get"]

            # Clear specific preset cache
            library_cache.get_cached_library_preset.cache_delete(
                preset_id, mock_user_id
            )

            # Clear list cache
            library_cache.get_cached_library_presets.cache_delete(mock_user_id, 1, 20)

            # Next calls should hit database
            await library_cache.get_cached_library_presets(mock_user_id, 1, 20)
            await library_cache.get_cached_library_preset(preset_id, mock_user_id)

            assert mock_list.call_count == initial_calls["list"] + 1
            assert mock_get.call_count == initial_calls["get"] + 1


class TestLibraryCacheMetrics:
    """Test library cache metrics and management."""

    def test_cache_info_structure(self):
        """Test that cache_info returns the expected structure."""
        info = library_cache.get_cached_library_agents.cache_info()

        assert "size" in info
        assert "maxsize" in info
        assert "ttl_seconds" in info
        assert (
            info["maxsize"] is None
        )  # Redis manages its own size with shared_cache=True
        assert info["ttl_seconds"] == 600  # 10 minutes

    def test_all_library_caches_can_be_cleared(self):
        """Test that all library caches can be cleared."""
        # Clear all library caches
        library_cache.get_cached_library_agents.cache_clear()
        library_cache.get_cached_library_agent_favorites.cache_clear()
        library_cache.get_cached_library_agent.cache_clear()
        library_cache.get_cached_library_agent_by_graph_id.cache_clear()
        library_cache.get_cached_library_agent_by_store_version.cache_clear()
        library_cache.get_cached_library_presets.cache_clear()
        library_cache.get_cached_library_preset.cache_clear()

        # Verify all are empty
        assert library_cache.get_cached_library_agents.cache_info()["size"] == 0
        assert (
            library_cache.get_cached_library_agent_favorites.cache_info()["size"] == 0
        )
        assert library_cache.get_cached_library_agent.cache_info()["size"] == 0
        assert (
            library_cache.get_cached_library_agent_by_graph_id.cache_info()["size"] == 0
        )
        assert (
            library_cache.get_cached_library_agent_by_store_version.cache_info()["size"]
            == 0
        )
        assert library_cache.get_cached_library_presets.cache_info()["size"] == 0
        assert library_cache.get_cached_library_preset.cache_info()["size"] == 0

    def test_cache_ttl_values(self):
        """Test that cache TTL values are set correctly."""
        # Library agents - 10 minutes
        assert (
            library_cache.get_cached_library_agents.cache_info()["ttl_seconds"] == 600
        )

        # Favorites - 5 minutes (more dynamic)
        assert (
            library_cache.get_cached_library_agent_favorites.cache_info()["ttl_seconds"]
            == 300
        )

        # Individual agent - 30 minutes
        assert (
            library_cache.get_cached_library_agent.cache_info()["ttl_seconds"] == 1800
        )

        # Presets - 30 minutes
        assert (
            library_cache.get_cached_library_presets.cache_info()["ttl_seconds"] == 1800
        )
        assert (
            library_cache.get_cached_library_preset.cache_info()["ttl_seconds"] == 1800
        )
@@ -5,6 +5,8 @@ import autogpt_libs.auth as autogpt_auth_lib
from fastapi import APIRouter, Body, HTTPException, Query, Security, status
from fastapi.responses import Response

import backend.server.cache_config
import backend.server.v2.library.cache as library_cache
import backend.server.v2.library.db as library_db
import backend.server.v2.library.model as library_model
import backend.server.v2.store.exceptions as store_exceptions
@@ -64,13 +66,22 @@ async def list_library_agents(
        HTTPException: If a server/database error occurs.
    """
    try:
        return await library_db.list_library_agents(
            user_id=user_id,
            search_term=search_term,
            sort_by=sort_by,
            page=page,
            page_size=page_size,
        )
        # Use cache for default queries (no search term, default sort)
        if search_term is None and sort_by == library_model.LibraryAgentSort.UPDATED_AT:
            return await library_cache.get_cached_library_agents(
                user_id=user_id,
                page=page,
                page_size=page_size,
            )
        else:
            # Direct DB query for searches and custom sorts
            return await library_db.list_library_agents(
                user_id=user_id,
                search_term=search_term,
                sort_by=sort_by,
                page=page,
                page_size=page_size,
            )
    except Exception as e:
        logger.error(f"Could not list library agents for user #{user_id}: {e}")
        raise HTTPException(
@@ -114,7 +125,7 @@ async def list_favorite_library_agents(
        HTTPException: If a server/database error occurs.
    """
    try:
        return await library_db.list_favorite_library_agents(
        return await library_cache.get_cached_library_agent_favorites(
            user_id=user_id,
            page=page,
            page_size=page_size,
@@ -132,7 +143,9 @@ async def get_library_agent(
    library_agent_id: str,
    user_id: str = Security(autogpt_auth_lib.get_user_id),
) -> library_model.LibraryAgent:
    return await library_db.get_library_agent(id=library_agent_id, user_id=user_id)
    return await library_cache.get_cached_library_agent(
        library_agent_id=library_agent_id, user_id=user_id
    )


@router.get("/by-graph/{graph_id}")
@@ -210,11 +223,21 @@ async def add_marketplace_agent_to_library(
        HTTPException(500): If a server/database error occurs.
    """
    try:
        return await library_db.add_store_agent_to_library(
        result = await library_db.add_store_agent_to_library(
            store_listing_version_id=store_listing_version_id,
            user_id=user_id,
        )

        # Clear library caches after adding new agent
        for page in range(1, backend.server.cache_config.MAX_PAGES_TO_CLEAR):
            library_cache.get_cached_library_agents.cache_delete(
                user_id=user_id,
                page=page,
                page_size=backend.server.cache_config.V2_LIBRARY_AGENTS_PAGE_SIZE,
            )

        return result

    except store_exceptions.AgentNotFoundError as e:
        logger.warning(
            f"Could not find store listing version {store_listing_version_id} "
@@ -263,13 +286,22 @@ async def update_library_agent(
        HTTPException(500): If a server/database error occurs.
    """
    try:
        return await library_db.update_library_agent(
        result = await library_db.update_library_agent(
            library_agent_id=library_agent_id,
            user_id=user_id,
            auto_update_version=payload.auto_update_version,
            is_favorite=payload.is_favorite,
            is_archived=payload.is_archived,
        )

        for page in range(1, backend.server.cache_config.MAX_PAGES_TO_CLEAR):
            library_cache.get_cached_library_agent_favorites.cache_delete(
                user_id=user_id,
                page=page,
                page_size=backend.server.cache_config.V2_LIBRARY_AGENTS_PAGE_SIZE,
            )

        return result
    except NotFoundError as e:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
@@ -320,6 +352,18 @@ async def delete_library_agent(
        await library_db.delete_library_agent(
            library_agent_id=library_agent_id, user_id=user_id
        )

        # Clear caches after deleting agent
        library_cache.get_cached_library_agent.cache_delete(
            library_agent_id=library_agent_id, user_id=user_id
        )
        for page in range(1, backend.server.cache_config.MAX_PAGES_TO_CLEAR):
            library_cache.get_cached_library_agents.cache_delete(
                user_id=user_id,
                page=page,
                page_size=backend.server.cache_config.V2_LIBRARY_AGENTS_PAGE_SIZE,
            )

        return Response(status_code=status.HTTP_204_NO_CONTENT)
    except NotFoundError as e:
        raise HTTPException(

@@ -4,6 +4,9 @@ from typing import Any, Optional
import autogpt_libs.auth as autogpt_auth_lib
from fastapi import APIRouter, Body, HTTPException, Query, Security, status

import backend.server.cache_config
import backend.server.routers.cache as cache
import backend.server.v2.library.cache as library_cache
import backend.server.v2.library.db as db
import backend.server.v2.library.model as models
from backend.data.execution import GraphExecutionMeta
@@ -25,6 +28,24 @@ router = APIRouter(
)


def _clear_presets_list_cache(
    user_id: str, num_pages: int = backend.server.cache_config.MAX_PAGES_TO_CLEAR
):
    """
    Clear the presets list cache for the given user.
    Clears both primary and alternative page sizes for backward compatibility.
    """
    page_sizes = backend.server.cache_config.get_page_sizes_for_clearing(
        backend.server.cache_config.V2_LIBRARY_PRESETS_PAGE_SIZE,
        backend.server.cache_config.V2_LIBRARY_PRESETS_ALT_PAGE_SIZE,
    )
    for page in range(1, num_pages + 1):
        for page_size in page_sizes:
            library_cache.get_cached_library_presets.cache_delete(
                user_id=user_id, page=page, page_size=page_size
            )


@router.get(
    "/presets",
    summary="List presets",
@@ -51,12 +72,21 @@ async def list_presets(
        models.LibraryAgentPresetResponse: A response containing the list of presets.
    """
    try:
        return await db.list_presets(
            user_id=user_id,
            graph_id=graph_id,
            page=page,
            page_size=page_size,
        )
        # Use cache only for default queries (no filter)
        if graph_id is None:
            return await library_cache.get_cached_library_presets(
                user_id=user_id,
                page=page,
                page_size=page_size,
            )
        else:
            # Direct DB query for filtered requests
            return await db.list_presets(
                user_id=user_id,
                graph_id=graph_id,
                page=page,
                page_size=page_size,
            )
    except Exception as e:
        logger.exception("Failed to list presets for user %s: %s", user_id, e)
        raise HTTPException(
@@ -87,7 +117,7 @@ async def get_preset(
        HTTPException: If the preset is not found or an error occurs.
    """
    try:
        preset = await db.get_preset(user_id, preset_id)
        preset = await library_cache.get_cached_library_preset(preset_id, user_id)
    except Exception as e:
        logger.exception(
            "Error retrieving preset %s for user %s: %s", preset_id, user_id, e
@@ -131,9 +161,13 @@ async def create_preset(
    """
    try:
        if isinstance(preset, models.LibraryAgentPresetCreatable):
            return await db.create_preset(user_id, preset)
            result = await db.create_preset(user_id, preset)
        else:
            return await db.create_preset_from_graph_execution(user_id, preset)
            result = await db.create_preset_from_graph_execution(user_id, preset)

        _clear_presets_list_cache(user_id)

        return result
    except NotFoundError as e:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
    except Exception as e:
@@ -200,6 +234,9 @@ async def setup_trigger(
            is_active=True,
        ),
    )

    _clear_presets_list_cache(user_id)

    return new_preset


@@ -278,6 +315,13 @@ async def update_preset(
            description=preset.description,
            is_active=preset.is_active,
        )

        # Clear caches after updating preset
        library_cache.get_cached_library_preset.cache_delete(
            preset_id=preset_id, user_id=user_id
        )
        _clear_presets_list_cache(user_id)

    except Exception as e:
        logger.exception("Preset update failed for user %s: %s", user_id, e)
        raise HTTPException(
@@ -351,6 +395,12 @@ async def delete_preset(

    try:
        await db.delete_preset(user_id, preset_id)

        # Clear caches after deleting preset
        library_cache.get_cached_library_preset.cache_delete(
            preset_id=preset_id, user_id=user_id
        )
        _clear_presets_list_cache(user_id)
    except Exception as e:
        logger.exception(
            "Error deleting preset %s for user %s: %s", preset_id, user_id, e
@@ -401,6 +451,33 @@ async def execute_preset(
    merged_node_input = preset.inputs | inputs
    merged_credential_inputs = preset.credentials | credential_inputs

    # Clear graph executions cache - use both page sizes for compatibility
    for page in range(1, 10):
        # Clear with alternative page size
        cache.get_cached_graph_executions.cache_delete(
            graph_id=preset.graph_id,
            user_id=user_id,
            page=page,
            page_size=backend.server.cache_config.V2_GRAPH_EXECUTIONS_ALT_PAGE_SIZE,
        )
        cache.get_cached_graph_executions.cache_delete(
            user_id=user_id,
            page=page,
            page_size=backend.server.cache_config.V2_GRAPH_EXECUTIONS_ALT_PAGE_SIZE,
        )
        # Clear with v1 page size (25)
        cache.get_cached_graph_executions.cache_delete(
            graph_id=preset.graph_id,
            user_id=user_id,
            page=page,
            page_size=backend.server.cache_config.V1_GRAPH_EXECUTIONS_PAGE_SIZE,
        )
        cache.get_cached_graph_executions.cache_delete(
            user_id=user_id,
            page=page,
            page_size=backend.server.cache_config.V1_GRAPH_EXECUTIONS_PAGE_SIZE,
        )

    return await add_graph_execution(
        user_id=user_id,
        graph_id=preset.graph_id,
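The four cache_delete calls per page in execute_preset above could be folded into one helper; a hedged sketch only (the helper name is hypothetical, the constants are the cache_config ones this PR uses, and num_pages=9 mirrors the range(1, 10) loop above):

    def _clear_graph_executions_cache(user_id: str, graph_id: str, num_pages: int = 9):
        # Sketch: covers both page sizes and the with/without-graph_id key
        # variants that execute_preset currently clears one call at a time.
        page_sizes = (
            backend.server.cache_config.V2_GRAPH_EXECUTIONS_ALT_PAGE_SIZE,
            backend.server.cache_config.V1_GRAPH_EXECUTIONS_PAGE_SIZE,
        )
        for page in range(1, num_pages + 1):
            for page_size in page_sizes:
                cache.get_cached_graph_executions.cache_delete(
                    graph_id=graph_id, user_id=user_id, page=page, page_size=page_size
                )
                cache.get_cached_graph_executions.cache_delete(
                    user_id=user_id, page=page, page_size=page_size
                )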
@@ -179,14 +179,15 @@ async def test_get_favorite_library_agents_success(
def test_get_favorite_library_agents_error(
    mocker: pytest_mock.MockFixture, test_user_id: str
):
    mock_db_call = mocker.patch(
        "backend.server.v2.library.db.list_favorite_library_agents"
    # Mock the cache function instead of the DB directly since routes now use cache
    mock_cache_call = mocker.patch(
        "backend.server.v2.library.routes.agents.library_cache.get_cached_library_agent_favorites"
    )
    mock_db_call.side_effect = Exception("Test error")
    mock_cache_call.side_effect = Exception("Test error")

    response = client.get("/agents/favorites")
    assert response.status_code == 500
    mock_db_call.assert_called_once_with(
    mock_cache_call.assert_called_once_with(
        user_id=test_user_id,
        page=1,
        page_size=15,

152 autogpt_platform/backend/backend/server/v2/store/cache.py Normal file
@@ -0,0 +1,152 @@
"""
Cache functions for Store API endpoints.

This module contains all caching decorators and helpers for the Store API,
separated from the main routes for better organization and maintainability.
"""

import backend.server.cache_config
import backend.server.v2.store.db
from backend.util.cache import cached


def _clear_submissions_cache(
    user_id: str, num_pages: int = backend.server.cache_config.MAX_PAGES_TO_CLEAR
):
    """
    Clear the submissions cache for the given user.

    Args:
        user_id: User ID whose cache should be cleared
        num_pages: Number of pages to clear (default from cache_config)
    """
    for page in range(1, num_pages + 1):
        _get_cached_submissions.cache_delete(
            user_id=user_id,
            page=page,
            page_size=backend.server.cache_config.V2_STORE_SUBMISSIONS_PAGE_SIZE,
        )


def _clear_my_agents_cache(
    user_id: str, num_pages: int = backend.server.cache_config.MAX_PAGES_TO_CLEAR
):
    """
    Clear the my agents cache for the given user.

    Args:
        user_id: User ID whose cache should be cleared
        num_pages: Number of pages to clear (default from cache_config)
    """
    for page in range(1, num_pages + 1):
        _get_cached_my_agents.cache_delete(
            user_id=user_id,
            page=page,
            page_size=backend.server.cache_config.V2_MY_AGENTS_PAGE_SIZE,
        )


# Cache user profiles for 1 hour per user
@cached(maxsize=1000, ttl_seconds=3600, shared_cache=True)
async def _get_cached_user_profile(user_id: str):
    """Cached helper to get user profile."""
    return await backend.server.v2.store.db.get_user_profile(user_id)


# Cache store agents list for 15 minutes
# Different cache entries for different query combinations
@cached(maxsize=5000, ttl_seconds=900, shared_cache=True)
async def _get_cached_store_agents(
    featured: bool,
    creator: str | None,
    sorted_by: str | None,
    search_query: str | None,
    category: str | None,
    page: int,
    page_size: int,
):
    """Cached helper to get store agents."""
    return await backend.server.v2.store.db.get_store_agents(
        featured=featured,
        creators=[creator] if creator else None,
        sorted_by=sorted_by,
        search_query=search_query,
        category=category,
        page=page,
        page_size=page_size,
    )


# Cache individual agent details for 15 minutes
@cached(maxsize=200, ttl_seconds=900, shared_cache=True)
async def _get_cached_agent_details(username: str, agent_name: str):
    """Cached helper to get agent details."""
    return await backend.server.v2.store.db.get_store_agent_details(
        username=username, agent_name=agent_name
    )


# Cache agent graphs for 1 hour
@cached(maxsize=200, ttl_seconds=3600, shared_cache=True)
async def _get_cached_agent_graph(store_listing_version_id: str):
    """Cached helper to get agent graph."""
    return await backend.server.v2.store.db.get_available_graph(
        store_listing_version_id
    )


# Cache agent by version for 1 hour
@cached(maxsize=200, ttl_seconds=3600, shared_cache=True)
async def _get_cached_store_agent_by_version(store_listing_version_id: str):
    """Cached helper to get store agent by version ID."""
    return await backend.server.v2.store.db.get_store_agent_by_version_id(
        store_listing_version_id
    )


# Cache creators list for 1 hour
@cached(maxsize=200, ttl_seconds=3600, shared_cache=True)
async def _get_cached_store_creators(
    featured: bool,
    search_query: str | None,
    sorted_by: str | None,
    page: int,
    page_size: int,
):
    """Cached helper to get store creators."""
    return await backend.server.v2.store.db.get_store_creators(
        featured=featured,
        search_query=search_query,
        sorted_by=sorted_by,
        page=page,
        page_size=page_size,
    )


# Cache individual creator details for 1 hour
@cached(maxsize=100, ttl_seconds=3600, shared_cache=True)
async def _get_cached_creator_details(username: str):
    """Cached helper to get creator details."""
    return await backend.server.v2.store.db.get_store_creator_details(
        username=username.lower()
    )


# Cache user's own agents for 5 mins (shorter TTL as this changes more frequently)
@cached(maxsize=500, ttl_seconds=300, shared_cache=True)
async def _get_cached_my_agents(user_id: str, page: int, page_size: int):
    """Cached helper to get user's agents."""
    return await backend.server.v2.store.db.get_my_agents(
        user_id, page=page, page_size=page_size
    )


# Cache user's submissions for 1 hour
@cached(maxsize=500, ttl_seconds=3600, shared_cache=True)
async def _get_cached_submissions(user_id: str, page: int, page_size: int):
    """Cached helper to get user's submissions."""
    return await backend.server.v2.store.db.get_store_submissions(
        user_id=user_id,
        page=page,
        page_size=page_size,
    )
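How these helpers are meant to be used by the routes: reads go through the _get_cached_* functions, and any mutation that changes a user's submissions calls _clear_submissions_cache so every plausible cached page is dropped. The sketch below is illustrative only; the route paths, the missing auth handling, and the body of the POST handler are assumptions, not code from this PR.

import fastapi

import backend.server.cache_config
from backend.server.v2.store.cache import (
    _clear_submissions_cache,
    _get_cached_submissions,
)

router = fastapi.APIRouter()


@router.get("/submissions")
async def get_submissions_sketch(
    user_id: str,
    page: int = 1,
    page_size: int = backend.server.cache_config.V2_STORE_SUBMISSIONS_PAGE_SIZE,
):
    # Read path: warm entries are served from Redis (shared_cache=True).
    return await _get_cached_submissions(
        user_id=user_id, page=page, page_size=page_size
    )


@router.post("/submissions")
async def create_submission_sketch(user_id: str) -> dict:
    # ...perform the write via backend.server.v2.store.db here...
    # Write path: invalidate every cached page for this user so stale
    # submission lists disappear on the next read.
    _clear_submissions_cache(user_id)
    return {"status": "ok"}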
@@ -493,6 +493,7 @@ async def get_store_submissions(
        submission_models = []
        for sub in submissions:
            submission_model = backend.server.v2.store.model.StoreSubmission(
+               user_id=sub.user_id,
                agent_id=sub.agent_id,
                agent_version=sub.agent_version,
                name=sub.name,
@@ -710,6 +711,7 @@ async def create_store_submission(
        logger.debug(f"Created store listing for agent {agent_id}")
        # Return submission details
        return backend.server.v2.store.model.StoreSubmission(
+           user_id=user_id,
            agent_id=agent_id,
            agent_version=agent_version,
            name=name,
@@ -860,6 +862,7 @@ async def edit_store_submission(
                "Failed to update store listing version"
            )
        return backend.server.v2.store.model.StoreSubmission(
+           user_id=user_id,
            agent_id=current_version.agentGraphId,
            agent_version=current_version.agentGraphVersion,
            name=name,
@@ -993,6 +996,7 @@ async def create_store_version(
        )
        # Return submission details
        return backend.server.v2.store.model.StoreSubmission(
+           user_id=user_id,
            agent_id=agent_id,
            agent_version=agent_version,
            name=name,
@@ -1493,7 +1497,7 @@ async def review_store_submission(
            include={"StoreListing": True},
        )

-       if not submission:
+       if not submission or not submission.StoreListing:
            raise backend.server.v2.store.exceptions.DatabaseError(
                f"Failed to update store listing version {store_listing_version_id}"
            )
@@ -1583,6 +1587,7 @@ async def review_store_submission(

        # Convert to Pydantic model for consistency
        return backend.server.v2.store.model.StoreSubmission(
+           user_id=submission.StoreListing.owningUserId,
            agent_id=submission.agentGraphId,
            agent_version=submission.agentGraphVersion,
            name=submission.name,
@@ -1715,14 +1720,17 @@ async def get_admin_listings_with_versions(
        # Get total count for pagination
        total = await prisma.models.StoreListing.prisma().count(where=where)
        total_pages = (total + page_size - 1) // page_size

        # Convert to response models
        listings_with_versions = []
        for listing in listings:
            versions: list[backend.server.v2.store.model.StoreSubmission] = []
+           if not listing.OwningUser:
+               logger.error(f"Listing {listing.id} has no owning user")
+               continue
            # If we have versions, turn them into StoreSubmission models
            for version in listing.Versions or []:
                version_model = backend.server.v2.store.model.StoreSubmission(
+                   user_id=listing.OwningUser.id,
                    agent_id=version.agentGraphId,
                    agent_version=version.agentGraphVersion,
                    name=version.name,
@@ -98,6 +98,7 @@ class Profile(pydantic.BaseModel):


class StoreSubmission(pydantic.BaseModel):
+   user_id: str = pydantic.Field(default="", exclude=True)
    agent_id: str
    agent_version: int
    name: str
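The new user_id field is declared with exclude=True, which means it travels with the model inside the backend (handy for cache invalidation) but is stripped from serialized responses. A minimal, self-contained demonstration of that pydantic v2 behavior, not the PR's full model:

import pydantic


class StoreSubmission(pydantic.BaseModel):
    user_id: str = pydantic.Field(default="", exclude=True)
    agent_id: str


sub = StoreSubmission(user_id="user123", agent_id="agent123")
assert sub.user_id == "user123"            # still accessible in Python
assert "user_id" not in sub.model_dump()   # but excluded from dumps
print(sub.model_dump_json())               # {"agent_id":"agent123"}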
@@ -135,6 +135,7 @@ def test_creator_details():

def test_store_submission():
    submission = backend.server.v2.store.model.StoreSubmission(
+       user_id="user123",
        agent_id="agent123",
        agent_version=1,
        sub_heading="Test subheading",
@@ -156,6 +157,7 @@ def test_store_submissions_response():
    response = backend.server.v2.store.model.StoreSubmissionsResponse(
        submissions=[
            backend.server.v2.store.model.StoreSubmission(
+               user_id="user123",
                agent_id="agent123",
                agent_version=1,
                sub_heading="Test subheading",
@@ -6,132 +6,33 @@ import urllib.parse
import autogpt_libs.auth
import fastapi
import fastapi.responses
-from autogpt_libs.utils.cache import cached

import backend.data.graph
+import backend.server.cache_config
import backend.server.v2.store.db
import backend.server.v2.store.exceptions
import backend.server.v2.store.image_gen
import backend.server.v2.store.media
import backend.server.v2.store.model
import backend.util.json
+from backend.server.v2.store.cache import (
+    _clear_submissions_cache,
+    _get_cached_agent_details,
+    _get_cached_agent_graph,
+    _get_cached_creator_details,
+    _get_cached_my_agents,
+    _get_cached_store_agent_by_version,
+    _get_cached_store_agents,
+    _get_cached_store_creators,
+    _get_cached_submissions,
+    _get_cached_user_profile,
+)

logger = logging.getLogger(__name__)

router = fastapi.APIRouter()


-##############################################
-############### Caches #######################
-##############################################
-
-
-# Cache user profiles for 1 hour per user
-@cached(maxsize=1000, ttl_seconds=3600)
-async def _get_cached_user_profile(user_id: str):
-    """Cached helper to get user profile."""
-    return await backend.server.v2.store.db.get_user_profile(user_id)
-
-
-# Cache store agents list for 15 minutes
-# Different cache entries for different query combinations
-@cached(maxsize=5000, ttl_seconds=900)
-async def _get_cached_store_agents(
-    featured: bool,
-    creator: str | None,
-    sorted_by: str | None,
-    search_query: str | None,
-    category: str | None,
-    page: int,
-    page_size: int,
-):
-    """Cached helper to get store agents."""
-    return await backend.server.v2.store.db.get_store_agents(
-        featured=featured,
-        creators=[creator] if creator else None,
-        sorted_by=sorted_by,
-        search_query=search_query,
-        category=category,
-        page=page,
-        page_size=page_size,
-    )
-
-
-# Cache individual agent details for 15 minutes
-@cached(maxsize=200, ttl_seconds=900)
-async def _get_cached_agent_details(username: str, agent_name: str):
-    """Cached helper to get agent details."""
-    return await backend.server.v2.store.db.get_store_agent_details(
-        username=username, agent_name=agent_name
-    )
-
-
-# Cache agent graphs for 1 hour
-@cached(maxsize=200, ttl_seconds=3600)
-async def _get_cached_agent_graph(store_listing_version_id: str):
-    """Cached helper to get agent graph."""
-    return await backend.server.v2.store.db.get_available_graph(
-        store_listing_version_id
-    )
-
-
-# Cache agent by version for 1 hour
-@cached(maxsize=200, ttl_seconds=3600)
-async def _get_cached_store_agent_by_version(store_listing_version_id: str):
-    """Cached helper to get store agent by version ID."""
-    return await backend.server.v2.store.db.get_store_agent_by_version_id(
-        store_listing_version_id
-    )
-
-
-# Cache creators list for 1 hour
-@cached(maxsize=200, ttl_seconds=3600)
-async def _get_cached_store_creators(
-    featured: bool,
-    search_query: str | None,
-    sorted_by: str | None,
-    page: int,
-    page_size: int,
-):
-    """Cached helper to get store creators."""
-    return await backend.server.v2.store.db.get_store_creators(
-        featured=featured,
-        search_query=search_query,
-        sorted_by=sorted_by,
-        page=page,
-        page_size=page_size,
-    )
-
-
-# Cache individual creator details for 1 hour
-@cached(maxsize=100, ttl_seconds=3600)
-async def _get_cached_creator_details(username: str):
-    """Cached helper to get creator details."""
-    return await backend.server.v2.store.db.get_store_creator_details(
-        username=username.lower()
-    )
-
-
-# Cache user's own agents for 5 mins (shorter TTL as this changes more frequently)
-@cached(maxsize=500, ttl_seconds=300)
-async def _get_cached_my_agents(user_id: str, page: int, page_size: int):
-    """Cached helper to get user's agents."""
-    return await backend.server.v2.store.db.get_my_agents(
-        user_id, page=page, page_size=page_size
-    )
-
-
-# Cache user's submissions for 1 hour (shorter TTL as this changes frequently)
-@cached(maxsize=500, ttl_seconds=3600)
-async def _get_cached_submissions(user_id: str, page: int, page_size: int):
-    """Cached helper to get user's submissions."""
-    return await backend.server.v2.store.db.get_store_submissions(
-        user_id=user_id,
-        page=page,
-        page_size=page_size,
-    )
-
-
##############################################
############### Profile Endpoints ############
##############################################
@@ -230,7 +131,7 @@ async def get_agents(
    search_query: str | None = None,
    category: str | None = None,
    page: int = 1,
-   page_size: int = 20,
+   page_size: int = backend.server.cache_config.V2_STORE_AGENTS_PAGE_SIZE,
):
    """
    Get a paginated list of agents from the store with optional filtering and sorting.
@@ -428,7 +329,7 @@ async def get_creators(
    search_query: str | None = None,
    sorted_by: str | None = None,
    page: int = 1,
-   page_size: int = 20,
+   page_size: int = backend.server.cache_config.V2_STORE_CREATORS_PAGE_SIZE,
):
    """
    This is needed for:
@@ -514,7 +415,9 @@ async def get_creator(
async def get_my_agents(
    user_id: str = fastapi.Security(autogpt_libs.auth.get_user_id),
    page: typing.Annotated[int, fastapi.Query(ge=1)] = 1,
-   page_size: typing.Annotated[int, fastapi.Query(ge=1)] = 20,
+   page_size: typing.Annotated[
+       int, fastapi.Query(ge=1)
+   ] = backend.server.cache_config.V2_MY_AGENTS_PAGE_SIZE,
):
    """
    Get user's own agents.
@@ -560,10 +463,7 @@ async def delete_submission(

        # Clear submissions cache for this specific user after deletion
        if result:
-           # Clear user's own agents cache - we don't know all page/size combinations
-           for page in range(1, 20):
-               # Clear user's submissions cache for common defaults
-               _get_cached_submissions.cache_delete(user_id, page=page, page_size=20)
+           _clear_submissions_cache(user_id)

        return result
    except Exception:
@@ -584,7 +484,7 @@ async def delete_submission(
async def get_submissions(
    user_id: str = fastapi.Security(autogpt_libs.auth.get_user_id),
    page: int = 1,
-   page_size: int = 20,
+   page_size: int = backend.server.cache_config.V2_STORE_SUBMISSIONS_PAGE_SIZE,
):
    """
    Get a paginated list of store submissions for the authenticated user.
@@ -666,10 +566,7 @@ async def create_submission(
            recommended_schedule_cron=submission_request.recommended_schedule_cron,
        )

-       # Clear user's own agents cache - we don't know all page/size combinations
-       for page in range(1, 20):
-           # Clear user's submissions cache for common defaults
-           _get_cached_submissions.cache_delete(user_id, page=page, page_size=20)
+       _clear_submissions_cache(user_id)

        return result
    except Exception:
@@ -720,10 +617,7 @@ async def edit_submission(
            recommended_schedule_cron=submission_request.recommended_schedule_cron,
        )

-       # Clear user's own agents cache - we don't know all page/size combinations
-       for page in range(1, 20):
-           # Clear user's submissions cache for common defaults
-           _get_cached_submissions.cache_delete(user_id, page=page, page_size=20)
+       _clear_submissions_cache(user_id)

        return result
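The backend.server.cache_config module these defaults come from is not shown in this compare view. Judging from the constant names used above and the values that pagination-config.ts (further down) says must stay in sync with it, it plausibly looks like the sketch below; the MAX_PAGES_TO_CLEAR value in particular is an assumption, since the diff only shows it used as a default argument.

# Hypothetical reconstruction of backend/server/cache_config.py (not in this diff).

# Page-size defaults; the frontend's pagination-config.ts must mirror these.
V2_STORE_AGENTS_PAGE_SIZE = 20
V2_STORE_CREATORS_PAGE_SIZE = 20
V2_STORE_SUBMISSIONS_PAGE_SIZE = 20
V2_MY_AGENTS_PAGE_SIZE = 20
V2_LIBRARY_AGENTS_PAGE_SIZE = 10

# How many pages the _clear_*_cache helpers walk during invalidation.
MAX_PAGES_TO_CLEAR = 20  # assumed value; matches num_pages=20 used in the tests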
@@ -534,6 +534,7 @@ def test_get_submissions_success(
    mocked_value = backend.server.v2.store.model.StoreSubmissionsResponse(
        submissions=[
            backend.server.v2.store.model.StoreSubmission(
+               user_id="user123",
                name="Test Agent",
                description="Test agent description",
                image_urls=["test.jpg"],
@@ -345,6 +345,150 @@ class TestCacheDeletion:
        )
        assert deleted is False  # Different parameters, not in cache

    @pytest.mark.asyncio
    async def test_clear_submissions_cache_page_size_consistency(self):
        """
        Test that _clear_submissions_cache uses the correct page_size.
        This test ensures that if the default page_size in routes changes,
        the hardcoded value in _clear_submissions_cache must also change.
        """
        from backend.server.v2.store.model import StoreSubmissionsResponse

        mock_response = StoreSubmissionsResponse(
            submissions=[],
            pagination=Pagination(
                total_items=0,
                total_pages=1,
                current_page=1,
                page_size=20,
            ),
        )

        with patch(
            "backend.server.v2.store.db.get_store_submissions",
            new_callable=AsyncMock,
            return_value=mock_response,
        ):
            # Clear cache first
            routes._get_cached_submissions.cache_clear()

            # Populate cache with multiple pages using the default page_size
            DEFAULT_PAGE_SIZE = 20  # This should match the default in routes.py
            user_id = "test_user"

            # Add entries for pages 1-5
            for page in range(1, 6):
                await routes._get_cached_submissions(
                    user_id=user_id, page=page, page_size=DEFAULT_PAGE_SIZE
                )

            # Verify cache has entries
            cache_info_before = routes._get_cached_submissions.cache_info()
            assert cache_info_before["size"] == 5

            # Call _clear_submissions_cache
            routes._clear_submissions_cache(user_id, num_pages=20)

            # All entries should be cleared
            cache_info_after = routes._get_cached_submissions.cache_info()
            assert (
                cache_info_after["size"] == 0
            ), "Cache should be empty after _clear_submissions_cache"

    @pytest.mark.asyncio
    async def test_clear_submissions_cache_detects_page_size_mismatch(self):
        """
        Test that detects if _clear_submissions_cache is using wrong page_size.
        If this test fails, it means the hardcoded page_size in _clear_submissions_cache
        doesn't match the default page_size used in the routes.
        """
        from backend.server.v2.store.model import StoreSubmissionsResponse

        mock_response = StoreSubmissionsResponse(
            submissions=[],
            pagination=Pagination(
                total_items=0,
                total_pages=1,
                current_page=1,
                page_size=20,
            ),
        )

        with patch(
            "backend.server.v2.store.db.get_store_submissions",
            new_callable=AsyncMock,
            return_value=mock_response,
        ):
            # Clear cache first
            routes._get_cached_submissions.cache_clear()

            # WRONG_PAGE_SIZE simulates what happens if someone changes
            # the default page_size in routes but forgets to update _clear_submissions_cache
            WRONG_PAGE_SIZE = 25  # Different from the hardcoded value in cache.py
            user_id = "test_user"

            # Populate cache with the "wrong" page_size
            for page in range(1, 6):
                await routes._get_cached_submissions(
                    user_id=user_id, page=page, page_size=WRONG_PAGE_SIZE
                )

            # Verify cache has entries
            cache_info_before = routes._get_cached_submissions.cache_info()
            assert cache_info_before["size"] == 5

            # Call _clear_submissions_cache (which uses page_size=20 hardcoded)
            routes._clear_submissions_cache(user_id, num_pages=20)

            # If page_size is mismatched, entries won't be cleared
            cache_info_after = routes._get_cached_submissions.cache_info()

            # This assertion will FAIL if _clear_submissions_cache uses wrong page_size
            assert (
                cache_info_after["size"] == 5
            ), "Cache entries with different page_size should NOT be cleared (this is expected)"

    @pytest.mark.asyncio
    async def test_my_agents_cache_needs_clearing_too(self):
        """
        Test that demonstrates _get_cached_my_agents also needs cache clearing.
        Currently there's no _clear_my_agents_cache function, but there should be.
        """
        from backend.server.v2.store.model import MyAgentsResponse

        mock_response = MyAgentsResponse(
            agents=[],
            pagination=Pagination(
                total_items=0,
                total_pages=1,
                current_page=1,
                page_size=20,
            ),
        )

        with patch(
            "backend.server.v2.store.db.get_my_agents",
            new_callable=AsyncMock,
            return_value=mock_response,
        ):
            routes._get_cached_my_agents.cache_clear()

            DEFAULT_PAGE_SIZE = 20
            user_id = "test_user"

            # Populate cache
            for page in range(1, 6):
                await routes._get_cached_my_agents(
                    user_id=user_id, page=page, page_size=DEFAULT_PAGE_SIZE
                )

            cache_info = routes._get_cached_my_agents.cache_info()
            assert cache_info["size"] == 5

            # NOTE: Currently there's no _clear_my_agents_cache function
            # If we implement one, it should clear all pages consistently
            # For now we document this as a TODO


if __name__ == "__main__":
    # Run the tests
autogpt_platform/backend/backend/util/cache.py (new file, 461 lines)
@@ -0,0 +1,461 @@
"""
Caching utilities for the AutoGPT platform.

Provides decorators for caching function results with support for:
- In-memory caching with TTL
- Shared Redis-backed caching across processes
- Thread-local caching for request-scoped data
- Thundering herd protection
- LRU eviction with optional TTL refresh
"""

import asyncio
import inspect
import logging
import threading
import time
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable, ParamSpec, Protocol, TypeVar, cast, runtime_checkable

from redis import ConnectionPool, Redis

from backend.util.retry import conn_retry
from backend.util.settings import Settings

P = ParamSpec("P")
R = TypeVar("R")
R_co = TypeVar("R_co", covariant=True)

logger = logging.getLogger(__name__)
settings = Settings()

# RECOMMENDED REDIS CONFIGURATION FOR PRODUCTION:
# Configure Redis with the following settings for optimal caching performance:
#   maxmemory-policy allkeys-lru  # Evict least recently used keys when memory limit reached
#   maxmemory 2gb                 # Set memory limit (adjust based on your needs)
#   save ""                       # Disable persistence if using Redis purely for caching

# Create a dedicated Redis connection pool for caching (binary mode for pickle)
_cache_pool: ConnectionPool | None = None


@conn_retry("Redis", "Acquiring cache connection pool")
def _get_cache_pool() -> ConnectionPool:
    """Get or create a connection pool for cache operations."""
    global _cache_pool
    if _cache_pool is None:
        _cache_pool = ConnectionPool(
            host=settings.config.redis_host,
            port=settings.config.redis_port,
            password=settings.config.redis_password or None,
            decode_responses=False,  # Binary mode for pickle
            max_connections=50,
            socket_keepalive=True,
            socket_connect_timeout=5,
            retry_on_timeout=True,
        )
    return _cache_pool


def _get_redis_client() -> Redis:
    """Get a Redis client from the connection pool."""
    return Redis(connection_pool=_get_cache_pool())


@dataclass
class CachedValue:
    """Wrapper for cached values with timestamp to avoid tuple ambiguity."""

    result: Any
    timestamp: float


def _make_hashable_key(
    args: tuple[Any, ...], kwargs: dict[str, Any]
) -> tuple[Any, ...]:
    """
    Convert args and kwargs into a hashable cache key.

    Handles unhashable types like dict, list, set by converting them to
    their sorted string representations.
    """

    def make_hashable(obj: Any) -> Any:
        """Recursively convert an object to a hashable representation."""
        if isinstance(obj, dict):
            # Sort dict items to ensure consistent ordering
            return (
                "__dict__",
                tuple(sorted((k, make_hashable(v)) for k, v in obj.items())),
            )
        elif isinstance(obj, (list, tuple)):
            return ("__list__", tuple(make_hashable(item) for item in obj))
        elif isinstance(obj, set):
            return ("__set__", tuple(sorted(make_hashable(item) for item in obj)))
        elif hasattr(obj, "__dict__"):
            # Handle objects with __dict__ attribute
            return ("__obj__", obj.__class__.__name__, make_hashable(obj.__dict__))
        else:
            # For basic hashable types (str, int, bool, None, etc.)
            try:
                hash(obj)
                return obj
            except TypeError:
                # Fallback: convert to string representation
                return ("__str__", str(obj))

    hashable_args = tuple(make_hashable(arg) for arg in args)
    hashable_kwargs = tuple(sorted((k, make_hashable(v)) for k, v in kwargs.items()))
    return (hashable_args, hashable_kwargs)


def _make_redis_key(key: tuple[Any, ...]) -> str:
    """Convert a hashable key tuple to a Redis key string."""
    # Ensure key is already hashable
    hashable_key = key if isinstance(key, tuple) else (key,)
    return f"cache:{hash(hashable_key)}"
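Why _make_hashable_key matters: plain functools.lru_cache rejects dict, list, and set arguments outright, while this helper normalizes them into order-independent tuples. A small illustrative check, assuming backend.util.cache is importable:

from backend.util.cache import _make_hashable_key

# Identical call shapes yield identical keys even with unhashable arguments...
k1 = _make_hashable_key(({"b": 2, "a": 1},), {"tags": [1, 2]})
k2 = _make_hashable_key(({"a": 1, "b": 2},), {"tags": [1, 2]})
assert k1 == k2              # dict ordering is normalized by sorting items
assert hash(k1) == hash(k2)  # the resulting tuple is itself hashable

# ...while different argument values yield different keys.
k3 = _make_hashable_key(({"a": 1, "b": 3},), {"tags": [1, 2]})
assert k1 != k3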
@runtime_checkable
class CachedFunction(Protocol[P, R_co]):
    """Protocol for cached functions with cache management methods."""

    def cache_clear(self, pattern: str | None = None) -> None:
        """Clear cached entries. If pattern provided, clear matching entries only."""
        return None

    def cache_info(self) -> dict[str, int | None]:
        """Get cache statistics."""
        return {}

    def cache_delete(self, *args: P.args, **kwargs: P.kwargs) -> bool:
        """Delete a specific cache entry by its arguments. Returns True if entry existed."""
        return False

    def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R_co:
        """Call the cached function."""
        return None  # type: ignore
def cached(
    *,
    maxsize: int = 128,
    ttl_seconds: int,
    shared_cache: bool = False,
    refresh_ttl_on_get: bool = False,
) -> Callable[[Callable], CachedFunction]:
    """
    Thundering herd safe cache decorator for both sync and async functions.

    Uses double-checked locking to prevent multiple threads/coroutines from
    executing the expensive operation simultaneously during cache misses.

    Args:
        maxsize: Maximum number of cached entries (only for in-memory cache)
        ttl_seconds: Time to live in seconds. Required - entries must expire.
        shared_cache: If True, use Redis for cross-process caching
        refresh_ttl_on_get: If True, refresh TTL when cache entry is accessed (LRU behavior)

    Returns:
        Decorated function with caching capabilities

    Example:
        @cached(ttl_seconds=300)  # 5 minute TTL
        def expensive_sync_operation(param: str) -> dict:
            return {"result": param}

        @cached(ttl_seconds=600, shared_cache=True, refresh_ttl_on_get=True)
        async def expensive_async_operation(param: str) -> dict:
            return {"result": param}
    """

    def decorator(target_func):
        cache_storage: dict[tuple, CachedValue] = {}
        _event_loop_locks: dict[Any, asyncio.Lock] = {}

        def _get_from_redis(redis_key: str) -> Any | None:
            """Get value from Redis, optionally refreshing TTL."""
            try:
                import pickle

                redis = _get_redis_client()
                if refresh_ttl_on_get:
                    # Use GETEX to get value and refresh expiry atomically
                    cached_bytes = redis.getex(redis_key, ex=ttl_seconds)
                else:
                    cached_bytes = redis.get(redis_key)

                if cached_bytes and isinstance(cached_bytes, bytes):
                    return pickle.loads(cached_bytes)
            except Exception as e:
                logger.error(
                    f"Redis error during cache check for {target_func.__name__}: {e}"
                )
            return None

        def _set_to_redis(redis_key: str, value: Any) -> None:
            """Set value in Redis with TTL."""
            try:
                import pickle

                redis = _get_redis_client()
                pickled_value = pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL)
                redis.setex(redis_key, ttl_seconds, pickled_value)
            except Exception as e:
                logger.error(
                    f"Redis error storing cache for {target_func.__name__}: {e}"
                )

        def _get_from_memory(key: tuple) -> Any | None:
            """Get value from in-memory cache, checking TTL."""
            if key in cache_storage:
                cached_data = cache_storage[key]
                if time.time() - cached_data.timestamp < ttl_seconds:
                    logger.debug(
                        f"Cache hit for {target_func.__name__} args: {key[0]} kwargs: {key[1]}"
                    )
                    return cached_data.result
            return None

        def _set_to_memory(key: tuple, value: Any) -> None:
            """Set value in in-memory cache with timestamp."""
            cache_storage[key] = CachedValue(result=value, timestamp=time.time())

            # Cleanup if needed
            if len(cache_storage) > maxsize:
                cutoff = maxsize // 2
                oldest_keys = list(cache_storage.keys())[:-cutoff] if cutoff > 0 else []
                for old_key in oldest_keys:
                    cache_storage.pop(old_key, None)

        if inspect.iscoroutinefunction(target_func):

            def _get_cache_lock():
                """Get or create an asyncio.Lock for the current event loop."""
                try:
                    loop = asyncio.get_running_loop()
                except RuntimeError:
                    loop = None

                if loop not in _event_loop_locks:
                    _event_loop_locks[loop] = asyncio.Lock()
                return _event_loop_locks[loop]

            @wraps(target_func)
            async def async_wrapper(*args: P.args, **kwargs: P.kwargs):
                key = _make_hashable_key(args, kwargs)
                redis_key = _make_redis_key(key) if shared_cache else ""

                # Fast path: check cache without lock
                if shared_cache:
                    result = _get_from_redis(redis_key)
                    if result is not None:
                        return result
                else:
                    result = _get_from_memory(key)
                    if result is not None:
                        return result

                # Slow path: acquire lock for cache miss/expiry
                async with _get_cache_lock():
                    # Double-check: another coroutine might have populated cache
                    if shared_cache:
                        result = _get_from_redis(redis_key)
                        if result is not None:
                            return result
                    else:
                        result = _get_from_memory(key)
                        if result is not None:
                            return result

                    # Cache miss - execute function
                    logger.debug(f"Cache miss for {target_func.__name__}")
                    result = await target_func(*args, **kwargs)

                    # Store result
                    if shared_cache:
                        _set_to_redis(redis_key, result)
                    else:
                        _set_to_memory(key, result)

                    return result

            wrapper = async_wrapper

        else:
            # Sync function with threading.Lock
            cache_lock = threading.Lock()

            @wraps(target_func)
            def sync_wrapper(*args: P.args, **kwargs: P.kwargs):
                key = _make_hashable_key(args, kwargs)
                redis_key = _make_redis_key(key) if shared_cache else ""

                # Fast path: check cache without lock
                if shared_cache:
                    result = _get_from_redis(redis_key)
                    if result is not None:
                        return result
                else:
                    result = _get_from_memory(key)
                    if result is not None:
                        return result

                # Slow path: acquire lock for cache miss/expiry
                with cache_lock:
                    # Double-check: another thread might have populated cache
                    if shared_cache:
                        result = _get_from_redis(redis_key)
                        if result is not None:
                            return result
                    else:
                        result = _get_from_memory(key)
                        if result is not None:
                            return result

                    # Cache miss - execute function
                    logger.debug(f"Cache miss for {target_func.__name__}")
                    result = target_func(*args, **kwargs)

                    # Store result
                    if shared_cache:
                        _set_to_redis(redis_key, result)
                    else:
                        _set_to_memory(key, result)

                    return result

            wrapper = sync_wrapper

        # Add cache management methods
        def cache_clear(pattern: str | None = None) -> None:
            """Clear cache entries. If pattern provided, clear matching entries."""
            if shared_cache:
                redis = _get_redis_client()
                if pattern:
                    # Clear entries matching pattern
                    keys = list(redis.scan_iter(f"cache:{pattern}", count=100))
                else:
                    # Clear all cache keys
                    keys = list(redis.scan_iter("cache:*", count=100))

                if keys:
                    pipeline = redis.pipeline()
                    for key in keys:
                        pipeline.delete(key)
                    pipeline.execute()
            else:
                if pattern:
                    # For in-memory cache, pattern matching not supported
                    logger.warning(
                        "Pattern-based clearing not supported for in-memory cache"
                    )
                else:
                    cache_storage.clear()

        def cache_info() -> dict[str, int | None]:
            if shared_cache:
                redis = _get_redis_client()
                cache_keys = list(redis.scan_iter("cache:*"))
                return {
                    "size": len(cache_keys),
                    "maxsize": None,  # Redis manages its own size
                    "ttl_seconds": ttl_seconds,
                }
            else:
                return {
                    "size": len(cache_storage),
                    "maxsize": maxsize,
                    "ttl_seconds": ttl_seconds,
                }

        def cache_delete(*args, **kwargs) -> bool:
            """Delete a specific cache entry. Returns True if entry existed."""
            key = _make_hashable_key(args, kwargs)
            if shared_cache:
                redis = _get_redis_client()
                redis_key = _make_redis_key(key)
                if redis.exists(redis_key):
                    redis.delete(redis_key)
                    return True
                return False
            else:
                if key in cache_storage:
                    del cache_storage[key]
                    return True
                return False

        setattr(wrapper, "cache_clear", cache_clear)
        setattr(wrapper, "cache_info", cache_info)
        setattr(wrapper, "cache_delete", cache_delete)

        return cast(CachedFunction, wrapper)

    return decorator
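Putting the management surface together: a small in-memory usage sketch (no Redis needed, since shared_cache defaults to False).

import time

from backend.util.cache import cached


@cached(maxsize=32, ttl_seconds=2)
def slow_square(x: int) -> int:
    time.sleep(0.1)  # stand-in for an expensive call
    return x * x


slow_square(4)                   # miss: executes the function
slow_square(4)                   # hit: served from the in-memory store
print(slow_square.cache_info())  # {'size': 1, 'maxsize': 32, 'ttl_seconds': 2}

slow_square.cache_delete(4)      # evict one entry by its call arguments
slow_square.cache_clear()        # or drop everything at once
# Entries also lapse on their own once ttl_seconds have passed.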
def thread_cached(func):
    """
    Thread-local cache decorator for both sync and async functions.

    Each thread gets its own cache, which is useful for request-scoped caching
    in web applications where you want to cache within a single request but
    not across requests.

    Args:
        func: The function to cache

    Returns:
        Decorated function with thread-local caching

    Example:
        @thread_cached
        def expensive_operation(param: str) -> dict:
            return {"result": param}

        @thread_cached  # Works with async too
        async def expensive_async_operation(param: str) -> dict:
            return {"result": param}
    """
    thread_local = threading.local()

    def _clear():
        if hasattr(thread_local, "cache"):
            del thread_local.cache

    if inspect.iscoroutinefunction(func):

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            cache = getattr(thread_local, "cache", None)
            if cache is None:
                cache = thread_local.cache = {}
            key = _make_hashable_key(args, kwargs)
            if key not in cache:
                cache[key] = await func(*args, **kwargs)
            return cache[key]

        setattr(async_wrapper, "clear_cache", _clear)
        return async_wrapper

    else:

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            cache = getattr(thread_local, "cache", None)
            if cache is None:
                cache = thread_local.cache = {}
            key = _make_hashable_key(args, kwargs)
            if key not in cache:
                cache[key] = func(*args, **kwargs)
            return cache[key]

        setattr(sync_wrapper, "clear_cache", _clear)
        return sync_wrapper


def clear_thread_cache(func: Callable) -> None:
    """Clear thread-local cache for a function."""
    if clear := getattr(func, "clear_cache", None):
        clear()
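A quick sketch of the thread-local behavior: each thread computes and caches independently, and clear_thread_cache resets only the calling thread's entries (useful between requests).

import threading

from backend.util.cache import clear_thread_cache, thread_cached


@thread_cached
def doubled(x: int) -> int:
    print(f"computing in {threading.current_thread().name}")
    return x * 2


doubled(1)  # computes
doubled(1)  # cached for this thread, no print

# A different thread has its own empty cache, so it computes again.
worker = threading.Thread(target=doubled, args=(1,))
worker.start()
worker.join()

clear_thread_cache(doubled)  # reset the current thread's cache
doubled(1)                   # computes again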
@@ -4,8 +4,7 @@ Centralized service client helpers with thread caching.

from typing import TYPE_CHECKING

-from autogpt_libs.utils.cache import cached, thread_cached
-
+from backend.util.cache import cached, thread_cached
from backend.util.settings import Settings

settings = Settings()
@@ -118,7 +117,7 @@ def get_integration_credentials_store() -> "IntegrationCredentialsStore":
# ============ Supabase Clients ============ #


-@cached()
+@cached(ttl_seconds=3600)
def get_supabase() -> "Client":
    """Get a process-cached synchronous Supabase client instance."""
    from supabase import create_client
@@ -128,7 +127,7 @@ def get_supabase() -> "Client":
    )


-@cached()
+@cached(ttl_seconds=3600)
async def get_async_supabase() -> "AClient":
    """Get a process-cached asynchronous Supabase client instance."""
    from supabase import create_async_client
@@ -5,12 +5,12 @@ from functools import wraps
from typing import Any, Awaitable, Callable, TypeVar

import ldclient
-from autogpt_libs.utils.cache import cached
from fastapi import HTTPException
from ldclient import Context, LDClient
from ldclient.config import Config
from typing_extensions import ParamSpec

+from backend.util.cache import cached
from backend.util.settings import Settings

logger = logging.getLogger(__name__)
@@ -258,6 +258,19 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
        description="The vhost for the RabbitMQ server",
    )

+   redis_host: str = Field(
+       default="localhost",
+       description="The host for the Redis server",
+   )
+   redis_port: int = Field(
+       default=6379,
+       description="The port for the Redis server",
+   )
+   redis_password: str = Field(
+       default="",
+       description="The password for the Redis server (empty string if no password)",
+   )
+
    postmark_sender_email: str = Field(
        default="invalid@invalid.com",
        description="The email address to use for sending emails",
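Since Config is a pydantic BaseSettings subclass, the new fields should be overridable through the environment using the library's default field-name mapping; the exact env-var names below are an assumption based on that default behavior.

# Example environment overrides (assumed names, derived from the field names):
#   REDIS_HOST=redis.internal
#   REDIS_PORT=6380
#   REDIS_PASSWORD=s3cret
from backend.util.settings import Settings

settings = Settings()
print(settings.config.redis_host, settings.config.redis_port)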
autogpt_platform/backend/poetry.lock (generated, 1 line)
@@ -413,6 +413,7 @@ pydantic-settings = "^2.10.1"
pyjwt = {version = "^2.10.1", extras = ["crypto"]}
+redis = "^6.2.0"
supabase = "^2.16.0"
tenacity = "^9.1.2"
uvicorn = "^0.35.0"

[package.source]
@@ -12,11 +12,11 @@ import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor
-from unittest.mock import Mock
+from unittest.mock import Mock, patch

import pytest

-from autogpt_libs.utils.cache import cached, clear_thread_cache, thread_cached
+from backend.util.cache import cached, clear_thread_cache, thread_cached


class TestThreadCached:
@@ -332,7 +332,7 @@ class TestCache:
        """Test basic sync caching functionality."""
        call_count = 0

-       @cached()
+       @cached(ttl_seconds=300)
        def expensive_sync_function(x: int, y: int = 0) -> int:
            nonlocal call_count
            call_count += 1
@@ -358,7 +358,7 @@ class TestCache:
        """Test basic async caching functionality."""
        call_count = 0

-       @cached()
+       @cached(ttl_seconds=300)
        async def expensive_async_function(x: int, y: int = 0) -> int:
            nonlocal call_count
            call_count += 1
@@ -385,7 +385,7 @@ class TestCache:
        call_count = 0
        results = []

-       @cached()
+       @cached(ttl_seconds=300)
        def slow_function(x: int) -> int:
            nonlocal call_count
            call_count += 1
@@ -412,7 +412,7 @@ class TestCache:
        """Test that concurrent async calls don't cause thundering herd."""
        call_count = 0

-       @cached()
+       @cached(ttl_seconds=300)
        async def slow_async_function(x: int) -> int:
            nonlocal call_count
            call_count += 1
@@ -508,7 +508,7 @@ class TestCache:
        """Test cache clearing functionality."""
        call_count = 0

-       @cached()
+       @cached(ttl_seconds=300)
        def clearable_function(x: int) -> int:
            nonlocal call_count
            call_count += 1
@@ -537,7 +537,7 @@ class TestCache:
        """Test cache clearing functionality with async function."""
        call_count = 0

-       @cached()
+       @cached(ttl_seconds=300)
        async def async_clearable_function(x: int) -> int:
            nonlocal call_count
            call_count += 1
@@ -567,7 +567,7 @@ class TestCache:
        """Test that cached async functions return actual results, not coroutines."""
        call_count = 0

-       @cached()
+       @cached(ttl_seconds=300)
        async def async_result_function(x: int) -> str:
            nonlocal call_count
            call_count += 1
@@ -593,7 +593,7 @@ class TestCache:
        """Test selective cache deletion functionality."""
        call_count = 0

-       @cached()
+       @cached(ttl_seconds=300)
        def deletable_function(x: int) -> int:
            nonlocal call_count
            call_count += 1
@@ -636,7 +636,7 @@ class TestCache:
        """Test selective cache deletion functionality with async function."""
        call_count = 0

-       @cached()
+       @cached(ttl_seconds=300)
        async def async_deletable_function(x: int) -> int:
            nonlocal call_count
            call_count += 1
@@ -674,3 +674,333 @@ class TestCache:
        # Try to delete non-existent entry
        was_deleted = async_deletable_function.cache_delete(99)
        assert was_deleted is False


class TestSharedCache:
    """Tests for shared_cache functionality using Redis."""

    @pytest.fixture(autouse=True)
    def setup_redis_mock(self):
        """Mock Redis client for testing."""
        with patch("backend.util.cache._get_redis_client") as mock_redis_func:
            # Configure mock to behave like Redis
            mock_redis = Mock()
            self.mock_redis = mock_redis
            self.redis_storage = {}

            def mock_get(key):
                return self.redis_storage.get(key)

            def mock_getex(key, ex=None):
                # GETEX returns value and optionally refreshes TTL
                return self.redis_storage.get(key)

            def mock_set(key, value):
                self.redis_storage[key] = value
                return True

            def mock_setex(key, ttl, value):
                self.redis_storage[key] = value
                return True

            def mock_exists(key):
                return 1 if key in self.redis_storage else 0

            def mock_delete(key):
                if key in self.redis_storage:
                    del self.redis_storage[key]
                    return 1
                return 0

            def mock_scan_iter(pattern, count=None):
                # Pattern is a string like "cache:*", keys in storage are strings
                prefix = pattern.rstrip("*")
                return [
                    k
                    for k in self.redis_storage.keys()
                    if isinstance(k, str) and k.startswith(prefix)
                ]

            def mock_pipeline():
                pipe = Mock()
                deleted_keys = []

                def pipe_delete(key):
                    deleted_keys.append(key)
                    return pipe

                def pipe_execute():
                    # Actually delete the keys when pipeline executes
                    for key in deleted_keys:
                        self.redis_storage.pop(key, None)
                    deleted_keys.clear()
                    return []

                pipe.delete = Mock(side_effect=pipe_delete)
                pipe.execute = Mock(side_effect=pipe_execute)
                return pipe

            mock_redis.get = Mock(side_effect=mock_get)
            mock_redis.getex = Mock(side_effect=mock_getex)
            mock_redis.set = Mock(side_effect=mock_set)
            mock_redis.setex = Mock(side_effect=mock_setex)
            mock_redis.exists = Mock(side_effect=mock_exists)
            mock_redis.delete = Mock(side_effect=mock_delete)
            mock_redis.scan_iter = Mock(side_effect=mock_scan_iter)
            mock_redis.pipeline = Mock(side_effect=mock_pipeline)

            # Make _get_redis_client return the mock
            mock_redis_func.return_value = mock_redis

            yield mock_redis

            # Cleanup
            self.redis_storage.clear()

    def test_sync_shared_cache_basic(self):
        """Test basic shared cache functionality with sync function."""
        call_count = 0

        @cached(shared_cache=True, ttl_seconds=300)
        def shared_function(x: int) -> int:
            nonlocal call_count
            call_count += 1
            return x * 10

        # First call - should miss cache
        result1 = shared_function(5)
        assert result1 == 50
        assert call_count == 1
        assert self.mock_redis.get.called
        assert self.mock_redis.setex.called  # setex is used for TTL

        # Second call - should hit cache
        result2 = shared_function(5)
        assert result2 == 50
        assert call_count == 1  # Function not called again

    @pytest.mark.asyncio
    async def test_async_shared_cache_basic(self):
        """Test basic shared cache functionality with async function."""
        call_count = 0

        @cached(shared_cache=True, ttl_seconds=300)
        async def async_shared_function(x: int) -> int:
            nonlocal call_count
            call_count += 1
            await asyncio.sleep(0.01)
            return x * 20

        # First call - should miss cache
        result1 = await async_shared_function(3)
        assert result1 == 60
        assert call_count == 1
        assert self.mock_redis.get.called
        assert self.mock_redis.setex.called  # setex is used for TTL

        # Second call - should hit cache
        result2 = await async_shared_function(3)
        assert result2 == 60
        assert call_count == 1  # Function not called again

    def test_sync_shared_cache_with_ttl(self):
        """Test shared cache with TTL using sync function."""
        call_count = 0

        @cached(shared_cache=True, ttl_seconds=60)
        def shared_ttl_function(x: int) -> int:
            nonlocal call_count
            call_count += 1
            return x * 30

        # First call
        result1 = shared_ttl_function(2)
        assert result1 == 60
        assert call_count == 1
        assert self.mock_redis.setex.called

        # Second call - should use cache
        result2 = shared_ttl_function(2)
        assert result2 == 60
        assert call_count == 1

    @pytest.mark.asyncio
    async def test_async_shared_cache_with_ttl(self):
        """Test shared cache with TTL using async function."""
        call_count = 0

        @cached(shared_cache=True, ttl_seconds=120)
        async def async_shared_ttl_function(x: int) -> int:
            nonlocal call_count
            call_count += 1
            await asyncio.sleep(0.01)
            return x * 40

        # First call
        result1 = await async_shared_ttl_function(4)
        assert result1 == 160
        assert call_count == 1
        assert self.mock_redis.setex.called

        # Second call - should use cache
        result2 = await async_shared_ttl_function(4)
        assert result2 == 160
        assert call_count == 1

    def test_shared_cache_clear(self):
        """Test clearing shared cache."""
        call_count = 0

        @cached(shared_cache=True, ttl_seconds=300)
        def clearable_shared_function(x: int) -> int:
            nonlocal call_count
            call_count += 1
            return x * 50

        # First call
        result1 = clearable_shared_function(1)
        assert result1 == 50
        assert call_count == 1

        # Second call - should use cache
        result2 = clearable_shared_function(1)
        assert result2 == 50
        assert call_count == 1

        # Clear cache
        clearable_shared_function.cache_clear()
        assert self.mock_redis.pipeline.called

        # Third call - should execute function again
        result3 = clearable_shared_function(1)
        assert result3 == 50
        assert call_count == 2

    def test_shared_cache_delete(self):
        """Test deleting specific shared cache entry."""
        call_count = 0

        @cached(shared_cache=True, ttl_seconds=300)
        def deletable_shared_function(x: int) -> int:
            nonlocal call_count
            call_count += 1
            return x * 60

        # First call for x=1
        result1 = deletable_shared_function(1)
        assert result1 == 60
        assert call_count == 1

        # First call for x=2
        result2 = deletable_shared_function(2)
        assert result2 == 120
        assert call_count == 2

        # Delete entry for x=1
        was_deleted = deletable_shared_function.cache_delete(1)
        assert was_deleted is True

        # Call with x=1 should execute function again
        result3 = deletable_shared_function(1)
        assert result3 == 60
        assert call_count == 3

        # Call with x=2 should still use cache
        result4 = deletable_shared_function(2)
        assert result4 == 120
        assert call_count == 3

    def test_shared_cache_error_handling(self):
        """Test that Redis errors are handled gracefully."""
        call_count = 0

        @cached(shared_cache=True, ttl_seconds=300)
        def error_prone_function(x: int) -> int:
            nonlocal call_count
            call_count += 1
            return x * 70

        # Simulate Redis error
        self.mock_redis.get.side_effect = Exception("Redis connection error")

        # Function should still work
        result = error_prone_function(1)
        assert result == 70
        assert call_count == 1

    @pytest.mark.asyncio
    async def test_async_shared_cache_error_handling(self):
        """Test that Redis errors are handled gracefully in async functions."""
        call_count = 0

        @cached(shared_cache=True, ttl_seconds=300)
        async def async_error_prone_function(x: int) -> int:
            nonlocal call_count
            call_count += 1
            await asyncio.sleep(0.01)
            return x * 80

        # Simulate Redis error
        self.mock_redis.get.side_effect = Exception("Redis connection error")

        # Function should still work
        result = await async_error_prone_function(1)
        assert result == 80
        assert call_count == 1

    def test_shared_cache_with_complex_types(self):
        """Test shared cache with complex return types (lists, dicts)."""
        call_count = 0

        @cached(shared_cache=True, ttl_seconds=300)
        def complex_return_function(x: int) -> dict:
            nonlocal call_count
            call_count += 1
            return {"value": x, "squared": x * x, "list": [1, 2, 3]}

        # First call
        result1 = complex_return_function(5)
        assert result1 == {"value": 5, "squared": 25, "list": [1, 2, 3]}
        assert call_count == 1

        # Second call - should use cache
        result2 = complex_return_function(5)
        assert result2 == {"value": 5, "squared": 25, "list": [1, 2, 3]}
        assert call_count == 1

    @pytest.mark.asyncio
    async def test_async_thundering_herd_shared_cache(self):
        """Test thundering herd protection with shared cache."""
        call_count = 0

        @cached(shared_cache=True, ttl_seconds=300)
        async def slow_shared_function(x: int) -> int:
            nonlocal call_count
            call_count += 1
            await asyncio.sleep(0.1)
            return x * x

        # Launch concurrent coroutines
        tasks = [slow_shared_function(9) for _ in range(5)]
        results = await asyncio.gather(*tasks)

        # All results should be the same
        assert all(result == 81 for result in results)
        # Only one coroutine should have executed the function
        assert call_count == 1

    def test_shared_cache_info(self):
        """Test cache_info with shared cache."""

        @cached(shared_cache=True, maxsize=100, ttl_seconds=300)
        def info_function(x: int) -> int:
            return x * 90

        # Call the function to populate cache
        info_function(1)

        # Get cache info
        info = info_function.cache_info()
        assert "size" in info
        assert info["maxsize"] is None  # Redis manages its own size
        assert info["ttl_seconds"] == 300
@@ -2,6 +2,7 @@

import { useGetV2ListLibraryAgentsInfinite } from "@/app/api/__generated__/endpoints/library/library";
import { LibraryAgentResponse } from "@/app/api/__generated__/models/libraryAgentResponse";
+import { LIBRARY_AGENTS_PAGE_SIZE } from "@/lib/pagination-config";
import { useLibraryPageContext } from "../state-provider";

export const useLibraryAgentList = () => {
@@ -15,7 +16,7 @@ export const useLibraryAgentList = () => {
  } = useGetV2ListLibraryAgentsInfinite(
    {
      page: 1,
-     page_size: 8,
+     page_size: LIBRARY_AGENTS_PAGE_SIZE,
      search_term: searchTerm || undefined,
      sort_by: librarySort,
    },
@@ -4,6 +4,7 @@ import {
} from "@/app/api/__generated__/endpoints/store/store";
import { StoreAgentsResponse } from "@/app/api/__generated__/models/storeAgentsResponse";
import { CreatorsResponse } from "@/app/api/__generated__/models/creatorsResponse";
+import { LARGE_PAGE_SIZE } from "@/lib/pagination-config";

const queryConfig = {
  staleTime: 60 * 1000, // 60 seconds - match server cache
@@ -37,7 +38,7 @@ export const useMainMarketplacePage = () => {
  } = useGetV2ListStoreAgents(
    {
      sorted_by: "runs",
-     page_size: 1000,
+     page_size: LARGE_PAGE_SIZE,
    },
    {
      query: {
@@ -38,7 +38,7 @@ export function useAgentActivityDropdown() {
    data: agents,
    isSuccess: agentsSuccess,
    error: agentsError,
- } = useGetV2ListLibraryAgents();
+ } = useGetV2ListLibraryAgents({ page_size: 10 });

  const {
    data: executions,
autogpt_platform/frontend/src/lib/pagination-config.ts (new file, 65 lines)
@@ -0,0 +1,65 @@
/**
 * Shared pagination configuration constants.
 *
 * These values MUST match the backend's cache_config.py to ensure
 * proper cache invalidation when data is mutated.
 *
 * CRITICAL: If you change any of these values:
 * 1. Update backend/server/cache_config.py
 * 2. Update cache invalidation logic
 * 3. Run tests to ensure consistency
 */

/**
 * Default page size for store agents listing
 * Backend: V2_STORE_AGENTS_PAGE_SIZE
 */
export const STORE_AGENTS_PAGE_SIZE = 20;

/**
 * Default page size for store creators listing
 * Backend: V2_STORE_CREATORS_PAGE_SIZE
 */
export const STORE_CREATORS_PAGE_SIZE = 20;

/**
 * Default page size for user submissions listing
 * Backend: V2_STORE_SUBMISSIONS_PAGE_SIZE
 */
export const STORE_SUBMISSIONS_PAGE_SIZE = 20;

/**
 * Default page size for user's own agents listing
 * Backend: V2_MY_AGENTS_PAGE_SIZE
 */
export const MY_AGENTS_PAGE_SIZE = 20;

/**
 * Default page size for library agents listing
 * Backend: V2_LIBRARY_AGENTS_PAGE_SIZE
 */
export const LIBRARY_AGENTS_PAGE_SIZE = 10;

/**
 * Default page size for library presets listing
 * Backend: V2_LIBRARY_PRESETS_PAGE_SIZE
 */
export const LIBRARY_PRESETS_PAGE_SIZE = 20;

/**
 * Default page size for agent runs/executions
 * Backend: V1_GRAPH_EXECUTIONS_PAGE_SIZE (note: this is from v1 API)
 */
export const AGENT_RUNS_PAGE_SIZE = 20;

/**
 * Large page size for fetching "all" items (marketplace top agents)
 * Used when we want to fetch a comprehensive list without pagination UI
 */
export const LARGE_PAGE_SIZE = 1000;

/**
 * Very large page size for specific use cases
 * Used in agent runs view for comprehensive listing
 */
export const EXTRA_LARGE_PAGE_SIZE = 100;
autogpt_platform/frontend/src/tests/cache-invalidation.test.ts (new file, 195 lines)
@@ -0,0 +1,195 @@
/**
 * Test suite for cache invalidation consistency.
 *
 * These tests ensure that when we invalidate query caches, the parameters
 * used match what the backend expects. If the default page_size changes
 * in the backend API, these tests will catch mismatches.
 *
 * NOTE: These are unit tests for cache key generation.
 * They use Playwright's test framework but don't require a browser.
 */

import { test, expect } from "@playwright/test";
import { getGetV2ListMySubmissionsQueryKey } from "@/app/api/__generated__/endpoints/store/store";
import * as PaginationConfig from "@/lib/pagination-config";

test.describe("Cache Invalidation Tests", () => {
  test.describe("getGetV2ListMySubmissionsQueryKey", () => {
    test("should generate correct query key without params", () => {
      const key = getGetV2ListMySubmissionsQueryKey();
      expect(key).toEqual(["/api/store/submissions"]);
    });

    test("should generate correct query key with params", () => {
      const key = getGetV2ListMySubmissionsQueryKey({
        page: 1,
        page_size: 20,
      });
      expect(key).toEqual([
        "/api/store/submissions",
        { page: 1, page_size: 20 },
      ]);
    });

    test("should generate different keys for different page_size values", () => {
      const key1 = getGetV2ListMySubmissionsQueryKey({
        page: 1,
        page_size: 20,
      });
      const key2 = getGetV2ListMySubmissionsQueryKey({
        page: 1,
        page_size: 25,
      });

      expect(key1).not.toEqual(key2);
    });
  });

  test.describe("Cache invalidation page_size consistency", () => {
    /**
     * This test documents the current default page_size used in the backend.
     * If this test fails, it means:
     * 1. The backend default page_size has changed, OR
     * 2. The frontend is using a different page_size than the backend
     *
     * When invalidating queries without params, we're invalidating ALL
     * submissions queries regardless of page_size. This is correct behavior.
     */
    test("should use page_size matching backend default when invalidating specific pages", () => {
      // Use the shared constant that matches backend's cache_config.V2_STORE_SUBMISSIONS_PAGE_SIZE
      const BACKEND_DEFAULT_PAGE_SIZE =
        PaginationConfig.STORE_SUBMISSIONS_PAGE_SIZE;

      // When we call invalidateQueries without params, it invalidates all variations
      const invalidateAllKey = getGetV2ListMySubmissionsQueryKey();
      expect(invalidateAllKey).toEqual(["/api/store/submissions"]);

      // When we call invalidateQueries with specific params, it should match the backend
      const invalidateSpecificKey = getGetV2ListMySubmissionsQueryKey({
        page: 1,
        page_size: BACKEND_DEFAULT_PAGE_SIZE,
      });
      expect(invalidateSpecificKey).toEqual([
        "/api/store/submissions",
        { page: 1, page_size: PaginationConfig.STORE_SUBMISSIONS_PAGE_SIZE },
      ]);
    });

    /**
     * This test verifies that invalidating without parameters will match
     * all cached queries regardless of their page_size.
     * This is the behavior when calling:
     * queryClient.invalidateQueries({ queryKey: getGetV2ListMySubmissionsQueryKey() })
     */
    test("should invalidate all submissions when using base key", () => {
      const baseKey = getGetV2ListMySubmissionsQueryKey();

      // These are examples of keys that would be cached
      const cachedKey1 = getGetV2ListMySubmissionsQueryKey({
        page: 1,
        page_size: 20,
      });
      const cachedKey2 = getGetV2ListMySubmissionsQueryKey({
        page: 2,
        page_size: 20,
      });
      const cachedKey3 = getGetV2ListMySubmissionsQueryKey({
        page: 1,
        page_size: 25,
      });

      // The base key should be a prefix of all cached keys
      expect(cachedKey1[0]).toBe(baseKey[0]);
      expect(cachedKey2[0]).toBe(baseKey[0]);
      expect(cachedKey3[0]).toBe(baseKey[0]);

      // This confirms that invalidating with the base key will match all variations,
      // because TanStack Query does prefix matching by default
    });

    /**
     * This test documents a potential issue:
     * If the backend's _clear_submissions_cache hardcodes page_size=20,
     * but the frontend uses a different page_size, the caches won't sync.
     *
     * The frontend should ALWAYS call invalidateQueries without params
     * to ensure all pages are invalidated, not just specific page_size values.
     */
    test("should document the cache invalidation strategy", () => {
      // CORRECT: This invalidates ALL submissions queries
      const correctInvalidation = getGetV2ListMySubmissionsQueryKey();
      expect(correctInvalidation).toEqual(["/api/store/submissions"]);

      // INCORRECT: This would only invalidate queries with page_size=20
      const incorrectInvalidation = getGetV2ListMySubmissionsQueryKey({
        page: 1,
        page_size: 20,
      });
      expect(incorrectInvalidation).toEqual([
        "/api/store/submissions",
        { page: 1, page_size: 20 },
      ]);

      // Verify current usage in the codebase uses the correct approach
      // (This is a documentation test - it will always pass)
      // Real verification requires checking actual invalidateQueries calls
    });
  });

  test.describe("Integration with backend cache clearing", () => {
    /**
     * This test documents how the backend's _clear_submissions_cache works
     * and what the frontend needs to do to stay in sync.
     */
    test("should document backend cache clearing behavior", () => {
      const BACKEND_HARDCODED_PAGE_SIZE = 20; // From cache.py line 18
      const BACKEND_NUM_PAGES_TO_CLEAR = 20; // From cache.py line 13

      // The backend clears pages 1-19 with page_size=20;
      // the frontend should invalidate ALL queries to ensure sync

      const frontendInvalidationKey = getGetV2ListMySubmissionsQueryKey();

      // Document what gets invalidated
      const expectedInvalidations = Array.from(
        { length: BACKEND_NUM_PAGES_TO_CLEAR - 1 },
        (_, i) =>
          getGetV2ListMySubmissionsQueryKey({
            page: i + 1,
            page_size: BACKEND_HARDCODED_PAGE_SIZE,
          }),
      );

      // All backend-cleared pages should have the same base key
      expectedInvalidations.forEach((key) => {
        expect(key[0]).toBe(frontendInvalidationKey[0]);
      });

      // This confirms that using the base key for invalidation
      // will catch all the entries the backend cleared
    });

    /**
     * CRITICAL TEST: This test will fail if someone changes the page_size
     * in the frontend components but the backend still uses page_size=20.
     */
    test("should fail if frontend default page_size differs from backend", () => {
      // Both frontend and backend now use shared constants:
      // Frontend: STORE_SUBMISSIONS_PAGE_SIZE from pagination-config.ts
      // Backend: V2_STORE_SUBMISSIONS_PAGE_SIZE from cache_config.py
      // These MUST be kept in sync manually (no cross-language constant sharing is possible)

      const EXPECTED_PAGE_SIZE = 20;

      expect(PaginationConfig.STORE_SUBMISSIONS_PAGE_SIZE).toBe(
        EXPECTED_PAGE_SIZE,
      );

      // If this test fails, you must:
      // 1. Update backend/server/cache_config.py V2_STORE_SUBMISSIONS_PAGE_SIZE
      // 2. Update frontend/lib/pagination-config.ts STORE_SUBMISSIONS_PAGE_SIZE
      // 3. Update all routes and cache clearing logic to use the constants
      // 4. Update this test with the new expected value
    });
  });
});
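The strategy these tests document boils down to one call shape. A minimal usage sketch, assuming a TanStack Query client obtained via useQueryClient(); only the base-key call itself is what the tests above verify.

import { useQueryClient } from "@tanstack/react-query";
import { getGetV2ListMySubmissionsQueryKey } from "@/app/api/__generated__/endpoints/store/store";

// Inside a component or mutation callback: invalidate ALL cached
// submissions pages, regardless of page/page_size, by passing the base
// key and relying on TanStack Query's default prefix matching.
const queryClient = useQueryClient();
queryClient.invalidateQueries({
  queryKey: getGetV2ListMySubmissionsQueryKey(),
});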