Compare commits

...

43 Commits

Author SHA1 Message Date
Swifty
01bab66f5c Merge branch 'swiftyos/caching-pt2' into swiftyos/shared-cache 2025-10-07 14:35:52 +02:00
Swifty
30b2d6b50d update the frontend ci to generate the api files before running the tests 2025-10-07 14:13:33 +02:00
Swifty
97e77339fd Merge branch 'dev' into swiftyos/caching-pt2 2025-10-07 14:06:44 +02:00
Swifty
6a360a49b1 Merge branch 'dev' into swiftyos/caching-pt2 2025-10-07 02:40:13 +02:00
Swifty
045009a84a fix test 2025-10-03 20:36:27 +02:00
Swifty
70cb7824fd Merge branch 'swiftyos/caching-pt2' into swiftyos/shared-cache 2025-10-03 20:30:40 +02:00
Swifty
df1d15fcfe added more tests and central place to set page_size to make the caching more robust 2025-10-03 17:56:35 +02:00
Swifty
eb022e50a7 moved clear cache helper function to cache 2025-10-03 15:21:18 +02:00
Swifty
431042a391 fix cache clear helper function 2025-10-03 15:10:34 +02:00
Swifty
7c6a9146f0 Merge branch 'dev' into swiftyos/caching-pt2 2025-10-03 15:06:39 +02:00
Swifty
0264cb56d3 update page_size invalidation 2025-10-03 12:17:24 +02:00
Swifty
ef552c189f feat(cache): refactor caching system and address PR review feedback
Addresses all 14 review comments from @majdyz on PR #11030

Major architectural improvements:
- Move cache.py from autogpt_libs to backend/util for proper dependency management
- Add Redis configuration to Settings class for centralized config management
- Remove duplicate retry.py from autogpt_libs (use backend.util.retry)
- Implement dedicated Redis connection pool with 50 max connections

Cache API enhancements:
- Make ttl_seconds a required parameter (no infinite TTLs allowed)
- Add CachedValue dataclass to eliminate tuple ambiguity when caching tuple results
- Implement LRU with TTL refresh using Redis GETEX command
- Add pattern-based cache clearing: cache_clear(pattern="user:*")
- Simplify wrapper logic by extracting helper functions

Redis integration:
- Create separate connection pool for cache (binary mode for pickle)
- Add recommended Redis production configuration in comments
- Use Settings class for Redis config instead of environment variables directly
- Update both cache.py and redis_client.py to use centralized settings

Test improvements:
- Move test file from autogpt_libs to backend/test
- Fix tests to use pickleable data structures instead of MagicMock objects
- Update all @cached() decorators to include ttl_seconds parameter
- All 51 cache tests passing

Breaking changes:
- @cached() decorator now requires ttl_seconds parameter
- Import path changed: autogpt_libs.utils.cache -> backend.util.cache
- Tests using shared_cache=True must return pickleable objects
2025-10-02 15:45:43 +02:00
Swifty
e75cf2b765 create helper functions for clearing caches 2025-10-02 11:09:31 +02:00
Swifty
0f6d1f54ee update lock file 2025-10-01 16:21:58 +02:00
Swifty
b3fe2b84ce added shared caching 2025-10-01 16:17:26 +02:00
Swifty
e13861ad33 revet over logging in test 2025-10-01 15:38:27 +02:00
Swifty
e2c24bd463 Merge branch 'swiftyos/caching-pt2' of github.com:Significant-Gravitas/AutoGPT into swiftyos/caching-pt2 2025-10-01 15:25:18 +02:00
Swifty
9f5afff83e update caching rules 2025-10-01 15:24:59 +02:00
Swifty
ced61e2640 Merge branch 'dev' into swiftyos/caching-pt2 2025-10-01 15:07:12 +02:00
Swifty
c9a7cc63da invalidate more caches - we have many with very similar names... 2025-10-01 14:52:58 +02:00
Swifty
7afa01a168 add detailed log messages 2025-10-01 11:54:57 +02:00
Swifty
2f9aba0420 moved cache invalidation earlier in flow to avoid race condition 2025-10-01 11:23:53 +02:00
Swifty
ff4b0929e1 fix cache invalidation for agent activity dropdown 2025-10-01 10:26:37 +02:00
Swifty
2230c76863 Merge branch 'dev' into swiftyos/caching-pt2 2025-10-01 10:18:32 +02:00
Swifty
b3443e0549 fixing tests 2025-09-29 10:58:23 +02:00
Swifty
b1364b1701 fixed test 2025-09-29 10:45:55 +02:00
Swifty
95d66a035c invalidating cache for favoriates 2025-09-29 10:40:02 +02:00
Swifty
30cdf9f0d9 fix merge error 2025-09-29 09:55:34 +02:00
Swifty
e47f0e7f2f Merge branch 'swiftyos/caching-pt2' of https://github.com/Significant-Gravitas/AutoGPT into swiftyos/caching-pt2 2025-09-29 09:46:55 +02:00
Swifty
13d71464a0 Merge branch 'dev' into swiftyos/caching-pt2 2025-09-29 09:46:34 +02:00
Swifty
be1947f6d1 Merge branch 'dev' into swiftyos/caching-pt2 2025-09-26 10:37:42 +02:00
Swifty
1afebcf96b fix test 2025-09-25 15:34:47 +02:00
Swifty
d124c93ff8 Merge branch 'dev' into swiftyos/caching-pt2 2025-09-25 15:34:21 +02:00
Swifty
bc5eb8a8a5 updated caching invalidation rules 2025-09-25 15:26:51 +02:00
Swifty
872ef5fdfb fmt 2025-09-25 13:31:18 +02:00
Swifty
12382e7990 fix duplicated caching code 2025-09-25 13:31:13 +02:00
Swifty
d68a3a1b53 fixed caching when saving an agent 2025-09-25 12:57:52 +02:00
Swifty
863e213af3 fixed caching when adding agent from library 2025-09-25 12:57:40 +02:00
Swifty
c61af53a74 Merge remote-tracking branch 'origin/dev' into swiftyos/caching-pt2 2025-09-25 12:04:07 +02:00
Swifty
eb94503de8 fix(backend): prevent caching of None graph results to handle dynamic permissions
When a graph is not found/accessible, we now clear the cache entry rather than
caching the None result. This prevents issues with store listing permissions
where a graph becomes accessible after approval but the cache still returns
the old 'not found' result.
2025-09-24 18:08:13 +02:00
Swifty
ee4feff8c2 fix(backend): include subgraphs in cached graph retrieval
The cached graph function was missing include_subgraphs=True parameter which
is needed to construct full credentials input schema. This was causing
test_access_store_listing_graph to fail.
2025-09-24 16:51:20 +02:00
Swifty
9147c2d6c8 fix(backend): update library favorites test to mock cache function instead of db
The test was failing because routes now use cached functions. Updated the mock
to patch the cache function which is what the route actually calls.
2025-09-24 16:38:07 +02:00
Swifty
a3af430c69 feat(backend): implement comprehensive caching layer for all GET endpoints (Part 2)
- Created separate cache.py modules for better code organization
  - backend/server/routers/cache.py for V1 API endpoints
  - backend/server/v2/library/cache.py for library endpoints
  - backend/server/v2/store/cache.py (refactored from routes)

- Added caching to all major GET endpoints:
  - Graphs list/details with 15-30 min TTL
  - Graph executions with 5 min TTL
  - User preferences/timezone with 30-60 min TTL
  - Library agents/favorites/presets with 10-30 min TTL
  - Store listings/profiles with 5-60 min TTL

- Implemented intelligent cache invalidation:
  - Clears relevant caches on CREATE/UPDATE/DELETE operations
  - Uses positional arguments for cache_delete to match function calls
  - Selective caching only for default queries (bypasses cache for filtered/searched results)

- Added comprehensive test coverage:
  - 20 cache-specific tests all passing
  - Validates cache hit/miss behavior
  - Verifies invalidation on mutations

- Performance improvements:
  - Reduces database load for frequently accessed data
  - Built-in thundering herd protection via @cached decorator
  - Configurable TTLs based on data volatility
2025-09-24 16:20:19 +02:00
45 changed files with 3412 additions and 569 deletions

View File

@@ -217,6 +217,9 @@ jobs:
- name: Install dependencies
run: pnpm install --frozen-lockfile
- name: Generate API client
run: pnpm generate:api
- name: Install Browser 'chromium'
run: pnpm playwright install --with-deps chromium

View File

@@ -1,339 +0,0 @@
import asyncio
import inspect
import logging
import threading
import time
from functools import wraps
from typing import (
Any,
Callable,
ParamSpec,
Protocol,
TypeVar,
cast,
runtime_checkable,
)
P = ParamSpec("P")
R = TypeVar("R")
R_co = TypeVar("R_co", covariant=True)
logger = logging.getLogger(__name__)
def _make_hashable_key(
args: tuple[Any, ...], kwargs: dict[str, Any]
) -> tuple[Any, ...]:
"""
Convert args and kwargs into a hashable cache key.
Handles unhashable types like dict, list, set by converting them to
their sorted string representations.
"""
def make_hashable(obj: Any) -> Any:
"""Recursively convert an object to a hashable representation."""
if isinstance(obj, dict):
# Sort dict items to ensure consistent ordering
return (
"__dict__",
tuple(sorted((k, make_hashable(v)) for k, v in obj.items())),
)
elif isinstance(obj, (list, tuple)):
return ("__list__", tuple(make_hashable(item) for item in obj))
elif isinstance(obj, set):
return ("__set__", tuple(sorted(make_hashable(item) for item in obj)))
elif hasattr(obj, "__dict__"):
# Handle objects with __dict__ attribute
return ("__obj__", obj.__class__.__name__, make_hashable(obj.__dict__))
else:
# For basic hashable types (str, int, bool, None, etc.)
try:
hash(obj)
return obj
except TypeError:
# Fallback: convert to string representation
return ("__str__", str(obj))
hashable_args = tuple(make_hashable(arg) for arg in args)
hashable_kwargs = tuple(sorted((k, make_hashable(v)) for k, v in kwargs.items()))
return (hashable_args, hashable_kwargs)
@runtime_checkable
class CachedFunction(Protocol[P, R_co]):
"""Protocol for cached functions with cache management methods."""
def cache_clear(self) -> None:
"""Clear all cached entries."""
return None
def cache_info(self) -> dict[str, int | None]:
"""Get cache statistics."""
return {}
def cache_delete(self, *args: P.args, **kwargs: P.kwargs) -> bool:
"""Delete a specific cache entry by its arguments. Returns True if entry existed."""
return False
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R_co:
"""Call the cached function."""
return None # type: ignore
def cached(
*,
maxsize: int = 128,
ttl_seconds: int | None = None,
) -> Callable[[Callable], CachedFunction]:
"""
Thundering herd safe cache decorator for both sync and async functions.
Uses double-checked locking to prevent multiple threads/coroutines from
executing the expensive operation simultaneously during cache misses.
Args:
func: The function to cache (when used without parentheses)
maxsize: Maximum number of cached entries
ttl_seconds: Time to live in seconds. If None, entries never expire
Returns:
Decorated function or decorator
Example:
@cache() # Default: maxsize=128, no TTL
def expensive_sync_operation(param: str) -> dict:
return {"result": param}
@cache() # Works with async too
async def expensive_async_operation(param: str) -> dict:
return {"result": param}
@cache(maxsize=1000, ttl_seconds=300) # Custom maxsize and TTL
def another_operation(param: str) -> dict:
return {"result": param}
"""
def decorator(target_func):
# Cache storage and per-event-loop locks
cache_storage = {}
_event_loop_locks = {} # Maps event loop to its asyncio.Lock
if inspect.iscoroutinefunction(target_func):
def _get_cache_lock():
"""Get or create an asyncio.Lock for the current event loop."""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
# No event loop, use None as default key
loop = None
if loop not in _event_loop_locks:
return _event_loop_locks.setdefault(loop, asyncio.Lock())
return _event_loop_locks[loop]
@wraps(target_func)
async def async_wrapper(*args: P.args, **kwargs: P.kwargs):
key = _make_hashable_key(args, kwargs)
current_time = time.time()
# Fast path: check cache without lock
if key in cache_storage:
if ttl_seconds is None:
logger.debug(f"Cache hit for {target_func.__name__}")
return cache_storage[key]
else:
cached_data = cache_storage[key]
if isinstance(cached_data, tuple):
result, timestamp = cached_data
if current_time - timestamp < ttl_seconds:
logger.debug(f"Cache hit for {target_func.__name__}")
return result
# Slow path: acquire lock for cache miss/expiry
async with _get_cache_lock():
# Double-check: another coroutine might have populated cache
if key in cache_storage:
if ttl_seconds is None:
return cache_storage[key]
else:
cached_data = cache_storage[key]
if isinstance(cached_data, tuple):
result, timestamp = cached_data
if current_time - timestamp < ttl_seconds:
return result
# Cache miss - execute function
logger.debug(f"Cache miss for {target_func.__name__}")
result = await target_func(*args, **kwargs)
# Store result
if ttl_seconds is None:
cache_storage[key] = result
else:
cache_storage[key] = (result, current_time)
# Cleanup if needed
if len(cache_storage) > maxsize:
cutoff = maxsize // 2
oldest_keys = (
list(cache_storage.keys())[:-cutoff] if cutoff > 0 else []
)
for old_key in oldest_keys:
cache_storage.pop(old_key, None)
return result
wrapper = async_wrapper
else:
# Sync function with threading.Lock
cache_lock = threading.Lock()
@wraps(target_func)
def sync_wrapper(*args: P.args, **kwargs: P.kwargs):
key = _make_hashable_key(args, kwargs)
current_time = time.time()
# Fast path: check cache without lock
if key in cache_storage:
if ttl_seconds is None:
logger.debug(f"Cache hit for {target_func.__name__}")
return cache_storage[key]
else:
cached_data = cache_storage[key]
if isinstance(cached_data, tuple):
result, timestamp = cached_data
if current_time - timestamp < ttl_seconds:
logger.debug(f"Cache hit for {target_func.__name__}")
return result
# Slow path: acquire lock for cache miss/expiry
with cache_lock:
# Double-check: another thread might have populated cache
if key in cache_storage:
if ttl_seconds is None:
return cache_storage[key]
else:
cached_data = cache_storage[key]
if isinstance(cached_data, tuple):
result, timestamp = cached_data
if current_time - timestamp < ttl_seconds:
return result
# Cache miss - execute function
logger.debug(f"Cache miss for {target_func.__name__}")
result = target_func(*args, **kwargs)
# Store result
if ttl_seconds is None:
cache_storage[key] = result
else:
cache_storage[key] = (result, current_time)
# Cleanup if needed
if len(cache_storage) > maxsize:
cutoff = maxsize // 2
oldest_keys = (
list(cache_storage.keys())[:-cutoff] if cutoff > 0 else []
)
for old_key in oldest_keys:
cache_storage.pop(old_key, None)
return result
wrapper = sync_wrapper
# Add cache management methods
def cache_clear() -> None:
cache_storage.clear()
def cache_info() -> dict[str, int | None]:
return {
"size": len(cache_storage),
"maxsize": maxsize,
"ttl_seconds": ttl_seconds,
}
def cache_delete(*args, **kwargs) -> bool:
"""Delete a specific cache entry. Returns True if entry existed."""
key = _make_hashable_key(args, kwargs)
if key in cache_storage:
del cache_storage[key]
return True
return False
setattr(wrapper, "cache_clear", cache_clear)
setattr(wrapper, "cache_info", cache_info)
setattr(wrapper, "cache_delete", cache_delete)
return cast(CachedFunction, wrapper)
return decorator
def thread_cached(func):
"""
Thread-local cache decorator for both sync and async functions.
Each thread gets its own cache, which is useful for request-scoped caching
in web applications where you want to cache within a single request but
not across requests.
Args:
func: The function to cache
Returns:
Decorated function with thread-local caching
Example:
@thread_cached
def expensive_operation(param: str) -> dict:
return {"result": param}
@thread_cached # Works with async too
async def expensive_async_operation(param: str) -> dict:
return {"result": param}
"""
thread_local = threading.local()
def _clear():
if hasattr(thread_local, "cache"):
del thread_local.cache
if inspect.iscoroutinefunction(func):
@wraps(func)
async def async_wrapper(*args, **kwargs):
cache = getattr(thread_local, "cache", None)
if cache is None:
cache = thread_local.cache = {}
key = _make_hashable_key(args, kwargs)
if key not in cache:
cache[key] = await func(*args, **kwargs)
return cache[key]
setattr(async_wrapper, "clear_cache", _clear)
return async_wrapper
else:
@wraps(func)
def sync_wrapper(*args, **kwargs):
cache = getattr(thread_local, "cache", None)
if cache is None:
cache = thread_local.cache = {}
key = _make_hashable_key(args, kwargs)
if key not in cache:
cache[key] = func(*args, **kwargs)
return cache[key]
setattr(sync_wrapper, "clear_cache", _clear)
return sync_wrapper
def clear_thread_cache(func: Callable) -> None:
"""Clear thread-local cache for a function."""
if clear := getattr(func, "clear_cache", None):
clear()

View File

@@ -1719,6 +1719,22 @@ files = [
httpx = {version = ">=0.26,<0.29", extras = ["http2"]}
strenum = ">=0.4.15,<0.5.0"
[[package]]
name = "tenacity"
version = "9.1.2"
description = "Retry code until it succeeds"
optional = false
python-versions = ">=3.9"
groups = ["main"]
files = [
{file = "tenacity-9.1.2-py3-none-any.whl", hash = "sha256:f77bf36710d8b73a50b2dd155c97b870017ad21afe6ab300326b0371b3b05138"},
{file = "tenacity-9.1.2.tar.gz", hash = "sha256:1169d376c297e7de388d18b4481760d478b0e99a777cad3a9c86e556f4b697cb"},
]
[package.extras]
doc = ["reno", "sphinx"]
test = ["pytest", "tornado (>=4.5)", "typeguard"]
[[package]]
name = "tomli"
version = "2.2.1"
@@ -1929,4 +1945,4 @@ type = ["pytest-mypy"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<4.0"
content-hash = "0c40b63c3c921846cf05ccfb4e685d4959854b29c2c302245f9832e20aac6954"
content-hash = "5ec9e6cd2ef7524a356586354755215699e7b37b9bbdfbabc9c73b43085915f4"

View File

@@ -19,6 +19,7 @@ pydantic-settings = "^2.10.1"
pyjwt = { version = "^2.10.1", extras = ["crypto"] }
redis = "^6.2.0"
supabase = "^2.16.0"
tenacity = "^9.1.2"
uvicorn = "^0.35.0"
[tool.poetry.group.dev.dependencies]

View File

@@ -5,7 +5,7 @@ import re
from pathlib import Path
from typing import TYPE_CHECKING, TypeVar
from autogpt_libs.utils.cache import cached
from backend.util.cache import cached
logger = logging.getLogger(__name__)
@@ -16,7 +16,7 @@ if TYPE_CHECKING:
T = TypeVar("T")
@cached()
@cached(ttl_seconds=3600) # Cache blocks for 1 hour
def load_all_blocks() -> dict[str, type["Block"]]:
from backend.data.block import Block
from backend.util.settings import Config

View File

@@ -20,7 +20,6 @@ from typing import (
import jsonref
import jsonschema
from autogpt_libs.utils.cache import cached
from prisma.models import AgentBlock
from prisma.types import AgentBlockCreateInput
from pydantic import BaseModel
@@ -28,6 +27,7 @@ from pydantic import BaseModel
from backend.data.model import NodeExecutionStats
from backend.integrations.providers import ProviderName
from backend.util import json
from backend.util.cache import cached
from backend.util.settings import Config
from .model import (
@@ -722,7 +722,7 @@ def get_block(block_id: str) -> Block[BlockSchema, BlockSchema] | None:
return cls() if cls else None
@cached()
@cached(ttl_seconds=3600)
def get_webhook_block_ids() -> Sequence[str]:
return [
id
@@ -731,7 +731,7 @@ def get_webhook_block_ids() -> Sequence[str]:
]
@cached()
@cached(ttl_seconds=3600)
def get_io_block_ids() -> Sequence[str]:
return [
id

View File

@@ -1,29 +1,24 @@
import logging
import os
from autogpt_libs.utils.cache import cached, thread_cached
from dotenv import load_dotenv
from redis import Redis
from redis.asyncio import Redis as AsyncRedis
from backend.util.cache import cached, thread_cached
from backend.util.retry import conn_retry
from backend.util.settings import Settings
load_dotenv()
HOST = os.getenv("REDIS_HOST", "localhost")
PORT = int(os.getenv("REDIS_PORT", "6379"))
PASSWORD = os.getenv("REDIS_PASSWORD", None)
settings = Settings()
logger = logging.getLogger(__name__)
@conn_retry("Redis", "Acquiring connection")
def connect() -> Redis:
def connect(decode_responses: bool = True) -> Redis:
c = Redis(
host=HOST,
port=PORT,
password=PASSWORD,
decode_responses=True,
host=settings.config.redis_host,
port=settings.config.redis_port,
password=settings.config.redis_password or None,
decode_responses=decode_responses,
)
c.ping()
return c
@@ -34,7 +29,7 @@ def disconnect():
get_redis().close()
@cached()
@cached(ttl_seconds=3600)
def get_redis() -> Redis:
return connect()
@@ -42,9 +37,9 @@ def get_redis() -> Redis:
@conn_retry("AsyncRedis", "Acquiring connection")
async def connect_async() -> AsyncRedis:
c = AsyncRedis(
host=HOST,
port=PORT,
password=PASSWORD,
host=settings.config.redis_host,
port=settings.config.redis_port,
password=settings.config.redis_password or None,
decode_responses=True,
)
await c.ping()

View File

@@ -7,7 +7,6 @@ from typing import Optional, cast
from urllib.parse import quote_plus
from autogpt_libs.auth.models import DEFAULT_USER_ID
from autogpt_libs.utils.cache import cached
from fastapi import HTTPException
from prisma.enums import NotificationType
from prisma.models import User as PrismaUser
@@ -17,6 +16,7 @@ from backend.data.db import prisma
from backend.data.model import User, UserIntegrations, UserMetadata
from backend.data.notifications import NotificationPreference, NotificationPreferenceDTO
from backend.server.v2.store.exceptions import DatabaseError
from backend.util.cache import cached
from backend.util.encryption import JSONCryptor
from backend.util.json import SafeJson
from backend.util.settings import Settings

View File

@@ -22,13 +22,15 @@ logger = logging.getLogger(__name__)
@pytest.fixture
def redis_client():
"""Get Redis client for testing using same config as backend."""
from backend.data.redis_client import HOST, PASSWORD, PORT
from backend.util.settings import Settings
settings = Settings()
# Use same config as backend but without decode_responses since ClusterLock needs raw bytes
client = redis.Redis(
host=HOST,
port=PORT,
password=PASSWORD,
host=settings.config.redis_host,
port=settings.config.redis_port,
password=settings.config.redis_password or None,
decode_responses=False, # ClusterLock needs raw bytes for ownership verification
)

View File

@@ -1,6 +1,6 @@
from typing import TYPE_CHECKING
from autogpt_libs.utils.cache import cached
from backend.util.cache import cached
if TYPE_CHECKING:
from ..providers import ProviderName
@@ -8,7 +8,7 @@ if TYPE_CHECKING:
# --8<-- [start:load_webhook_managers]
@cached()
@cached(ttl_seconds=3600) # Cache webhook managers for 1 hour
def load_webhook_managers() -> dict["ProviderName", type["BaseWebhooksManager"]]:
webhook_managers = {}

View File

@@ -0,0 +1,86 @@
"""
Shared cache configuration constants.
This module defines all page_size defaults used across the application.
By centralizing these values, we ensure that cache invalidation always
uses the same page_size as the routes that populate the cache.
CRITICAL: If you change any of these values, the tests in
test_cache_invalidation_consistency.py will fail to remind you to
update all dependent code.
"""
# V1 API (legacy) page sizes
V1_GRAPHS_PAGE_SIZE = 250
"""Default page size for listing user graphs in v1 API."""
V1_LIBRARY_AGENTS_PAGE_SIZE = 10
"""Default page size for library agents in v1 API."""
V1_GRAPH_EXECUTIONS_PAGE_SIZE = 25
"""Default page size for graph executions in v1 API."""
# V2 Store API page sizes
V2_STORE_AGENTS_PAGE_SIZE = 20
"""Default page size for store agents listing."""
V2_STORE_CREATORS_PAGE_SIZE = 20
"""Default page size for store creators listing."""
V2_STORE_SUBMISSIONS_PAGE_SIZE = 20
"""Default page size for user submissions listing."""
V2_MY_AGENTS_PAGE_SIZE = 20
"""Default page size for user's own agents listing."""
# V2 Library API page sizes
V2_LIBRARY_AGENTS_PAGE_SIZE = 10
"""Default page size for library agents listing in v2 API."""
V2_LIBRARY_PRESETS_PAGE_SIZE = 20
"""Default page size for library presets listing."""
# Alternative page sizes (for backward compatibility or special cases)
V2_LIBRARY_PRESETS_ALT_PAGE_SIZE = 10
"""
Alternative page size for library presets.
Some clients may use this smaller page size, so cache clearing must handle both.
"""
V2_GRAPH_EXECUTIONS_ALT_PAGE_SIZE = 10
"""
Alternative page size for graph executions.
Some clients may use this smaller page size, so cache clearing must handle both.
"""
# Cache clearing configuration
MAX_PAGES_TO_CLEAR = 20
"""
Maximum number of pages to clear when invalidating paginated caches.
This prevents infinite loops while ensuring we clear most cached pages.
For users with more than 20 pages, those pages will expire naturally via TTL.
"""
def get_page_sizes_for_clearing(
primary_page_size: int, alt_page_size: int | None = None
) -> list[int]:
"""
Get all page_size values that should be cleared for a given cache.
Args:
primary_page_size: The main page_size used by the route
alt_page_size: Optional alternative page_size if multiple clients use different sizes
Returns:
List of page_size values to clear
Example:
>>> get_page_sizes_for_clearing(20)
[20]
>>> get_page_sizes_for_clearing(20, 10)
[20, 10]
"""
if alt_page_size is None:
return [primary_page_size]
return [primary_page_size, alt_page_size]

View File

@@ -0,0 +1,154 @@
"""
Cache functions for main V1 API endpoints.
This module contains all caching decorators and helpers for the V1 API,
separated from the main routes for better organization and maintainability.
"""
from typing import Sequence
from backend.data import execution as execution_db
from backend.data import graph as graph_db
from backend.data import user as user_db
from backend.data.block import get_blocks
from backend.util.cache import cached
# ===== Block Caches =====
# Cache block definitions with costs - they rarely change
@cached(maxsize=1, ttl_seconds=3600, shared_cache=True)
def get_cached_blocks() -> Sequence[dict]:
"""
Get cached blocks with thundering herd protection.
Uses cached decorator to prevent multiple concurrent requests
from all executing the expensive block loading operation.
"""
from backend.data.credit import get_block_cost
block_classes = get_blocks()
result = []
for block_class in block_classes.values():
block_instance = block_class()
if not block_instance.disabled:
# Get costs for this specific block class without creating another instance
costs = get_block_cost(block_instance)
result.append({**block_instance.to_dict(), "costs": costs})
return result
# ===== Graph Caches =====
# Cache user's graphs list for 15 minutes
@cached(maxsize=1000, ttl_seconds=900, shared_cache=True)
async def get_cached_graphs(
user_id: str,
page: int,
page_size: int,
):
"""Cached helper to get user's graphs."""
return await graph_db.list_graphs_paginated(
user_id=user_id,
page=page,
page_size=page_size,
)
# Cache individual graph details for 30 minutes
@cached(maxsize=500, ttl_seconds=1800, shared_cache=True)
async def get_cached_graph(
graph_id: str,
version: int | None,
user_id: str,
):
"""Cached helper to get graph details."""
return await graph_db.get_graph(
graph_id=graph_id,
version=version,
user_id=user_id,
include_subgraphs=True, # needed to construct full credentials input schema
)
# Cache graph versions for 30 minutes
@cached(maxsize=500, ttl_seconds=1800, shared_cache=True)
async def get_cached_graph_all_versions(
graph_id: str,
user_id: str,
) -> Sequence[graph_db.GraphModel]:
"""Cached helper to get all versions of a graph."""
return await graph_db.get_graph_all_versions(
graph_id=graph_id,
user_id=user_id,
)
# ===== Execution Caches =====
# Cache graph executions for 10 seconds.
@cached(maxsize=1000, ttl_seconds=10, shared_cache=True)
async def get_cached_graph_executions(
graph_id: str,
user_id: str,
page: int,
page_size: int,
):
"""Cached helper to get graph executions."""
return await execution_db.get_graph_executions_paginated(
graph_id=graph_id,
user_id=user_id,
page=page,
page_size=page_size,
)
# Cache all user executions for 10 seconds.
@cached(maxsize=500, ttl_seconds=10, shared_cache=True)
async def get_cached_graphs_executions(
user_id: str,
page: int,
page_size: int,
):
"""Cached helper to get all user's graph executions."""
return await execution_db.get_graph_executions_paginated(
user_id=user_id,
page=page,
page_size=page_size,
)
# Cache individual execution details for 10 seconds.
@cached(maxsize=1000, ttl_seconds=10, shared_cache=True)
async def get_cached_graph_execution(
graph_exec_id: str,
user_id: str,
):
"""Cached helper to get graph execution details."""
return await execution_db.get_graph_execution(
user_id=user_id,
execution_id=graph_exec_id,
include_node_executions=False,
)
# ===== User Preference Caches =====
# Cache user timezone for 1 hour
@cached(maxsize=1000, ttl_seconds=3600, shared_cache=True)
async def get_cached_user_timezone(user_id: str):
"""Cached helper to get user timezone."""
user = await user_db.get_user_by_id(user_id)
return {"timezone": user.timezone if user else "UTC"}
# Cache user preferences for 30 minutes
@cached(maxsize=1000, ttl_seconds=1800, shared_cache=True)
async def get_cached_user_preferences(user_id: str):
"""Cached helper to get user notification preferences."""
return await user_db.get_user_notification_preference(user_id)

View File

@@ -0,0 +1,376 @@
"""
Tests for cache invalidation in V1 API routes.
This module tests that caches are properly invalidated when data is modified
through POST, PUT, PATCH, and DELETE operations.
"""
import uuid
from unittest.mock import AsyncMock, patch
import pytest
import backend.server.routers.cache as cache
from backend.data import graph as graph_db
@pytest.fixture
def mock_user_id():
"""Generate a mock user ID for testing."""
return str(uuid.uuid4())
@pytest.fixture
def mock_graph_id():
"""Generate a mock graph ID for testing."""
return str(uuid.uuid4())
class TestGraphCacheInvalidation:
"""Test cache invalidation for graph operations."""
@pytest.mark.asyncio
async def test_create_graph_clears_list_cache(self, mock_user_id):
"""Test that creating a graph clears the graphs list cache."""
# Setup
cache.get_cached_graphs.cache_clear()
# Pre-populate cache
with patch.object(
graph_db, "list_graphs_paginated", new_callable=AsyncMock
) as mock_list:
# Use a simple dict instead of MagicMock to make it pickleable
mock_list.return_value = {
"graphs": [],
"total_count": 0,
"page": 1,
"page_size": 250,
}
# First call should hit the database
await cache.get_cached_graphs(mock_user_id, 1, 250)
assert mock_list.call_count == 1
# Second call should use cache
await cache.get_cached_graphs(mock_user_id, 1, 250)
assert mock_list.call_count == 1 # Still 1, used cache
# Simulate cache invalidation (what happens in create_new_graph)
cache.get_cached_graphs.cache_delete(mock_user_id, 1, 250)
# Next call should hit database again
await cache.get_cached_graphs(mock_user_id, 1, 250)
assert mock_list.call_count == 2 # Incremented, cache was cleared
@pytest.mark.asyncio
async def test_delete_graph_clears_multiple_caches(
self, mock_user_id, mock_graph_id
):
"""Test that deleting a graph clears all related caches."""
# Clear all caches first
cache.get_cached_graphs.cache_clear()
cache.get_cached_graph.cache_clear()
cache.get_cached_graph_all_versions.cache_clear()
cache.get_cached_graph_executions.cache_clear()
# Setup mocks
with (
patch.object(
graph_db, "list_graphs_paginated", new_callable=AsyncMock
) as mock_list,
patch.object(graph_db, "get_graph", new_callable=AsyncMock) as mock_get,
patch.object(
graph_db, "get_graph_all_versions", new_callable=AsyncMock
) as mock_versions,
):
mock_list.return_value = {
"graphs": [],
"total_count": 0,
"page": 1,
"page_size": 250,
}
mock_get.return_value = {"id": mock_graph_id}
mock_versions.return_value = []
# Pre-populate all caches (use consistent argument style)
await cache.get_cached_graphs(mock_user_id, 1, 250)
await cache.get_cached_graph(mock_graph_id, None, mock_user_id)
await cache.get_cached_graph_all_versions(mock_graph_id, mock_user_id)
initial_calls = {
"list": mock_list.call_count,
"get": mock_get.call_count,
"versions": mock_versions.call_count,
}
# Use cached values (no additional DB calls)
await cache.get_cached_graphs(mock_user_id, 1, 250)
await cache.get_cached_graph(mock_graph_id, None, mock_user_id)
await cache.get_cached_graph_all_versions(mock_graph_id, mock_user_id)
# Verify cache was used
assert mock_list.call_count == initial_calls["list"]
assert mock_get.call_count == initial_calls["get"]
assert mock_versions.call_count == initial_calls["versions"]
# Simulate delete_graph cache invalidation
# Use positional arguments for cache_delete to match how we called the functions
result1 = cache.get_cached_graphs.cache_delete(mock_user_id, 1, 250)
result2 = cache.get_cached_graph.cache_delete(
mock_graph_id, None, mock_user_id
)
result3 = cache.get_cached_graph_all_versions.cache_delete(
mock_graph_id, mock_user_id
)
# Verify that the cache entries were actually deleted
assert result1, "Failed to delete graphs cache entry"
assert result2, "Failed to delete graph cache entry"
assert result3, "Failed to delete graph versions cache entry"
# Next calls should hit database
await cache.get_cached_graphs(mock_user_id, 1, 250)
await cache.get_cached_graph(mock_graph_id, None, mock_user_id)
await cache.get_cached_graph_all_versions(mock_graph_id, mock_user_id)
# Verify database was called again
assert mock_list.call_count == initial_calls["list"] + 1
assert mock_get.call_count == initial_calls["get"] + 1
assert mock_versions.call_count == initial_calls["versions"] + 1
@pytest.mark.asyncio
async def test_update_graph_clears_caches(self, mock_user_id, mock_graph_id):
"""Test that updating a graph clears the appropriate caches."""
# Clear caches
cache.get_cached_graph.cache_clear()
cache.get_cached_graph_all_versions.cache_clear()
cache.get_cached_graphs.cache_clear()
with (
patch.object(graph_db, "get_graph", new_callable=AsyncMock) as mock_get,
patch.object(
graph_db, "get_graph_all_versions", new_callable=AsyncMock
) as mock_versions,
patch.object(
graph_db, "list_graphs_paginated", new_callable=AsyncMock
) as mock_list,
):
mock_get.return_value = {"id": mock_graph_id, "version": 1}
mock_versions.return_value = [{"version": 1}]
mock_list.return_value = {
"graphs": [],
"total_count": 0,
"page": 1,
"page_size": 250,
}
# Populate caches
await cache.get_cached_graph(mock_graph_id, None, mock_user_id)
await cache.get_cached_graph_all_versions(mock_graph_id, mock_user_id)
await cache.get_cached_graphs(mock_user_id, 1, 250)
initial_calls = {
"get": mock_get.call_count,
"versions": mock_versions.call_count,
"list": mock_list.call_count,
}
# Verify cache is being used
await cache.get_cached_graph(mock_graph_id, None, mock_user_id)
await cache.get_cached_graph_all_versions(mock_graph_id, mock_user_id)
await cache.get_cached_graphs(mock_user_id, 1, 250)
assert mock_get.call_count == initial_calls["get"]
assert mock_versions.call_count == initial_calls["versions"]
assert mock_list.call_count == initial_calls["list"]
# Simulate update_graph cache invalidation
cache.get_cached_graph.cache_delete(mock_graph_id, None, mock_user_id)
cache.get_cached_graph_all_versions.cache_delete(
mock_graph_id, mock_user_id
)
cache.get_cached_graphs.cache_delete(mock_user_id, 1, 250)
# Next calls should hit database
await cache.get_cached_graph(mock_graph_id, None, mock_user_id)
await cache.get_cached_graph_all_versions(mock_graph_id, mock_user_id)
await cache.get_cached_graphs(mock_user_id, 1, 250)
assert mock_get.call_count == initial_calls["get"] + 1
assert mock_versions.call_count == initial_calls["versions"] + 1
assert mock_list.call_count == initial_calls["list"] + 1
class TestUserPreferencesCacheInvalidation:
"""Test cache invalidation for user preferences operations."""
@pytest.mark.asyncio
async def test_update_preferences_clears_cache(self, mock_user_id):
"""Test that updating preferences clears the preferences cache."""
# Clear cache
cache.get_cached_user_preferences.cache_clear()
with patch.object(
cache.user_db, "get_user_notification_preference", new_callable=AsyncMock
) as mock_get_prefs:
mock_prefs = {"email_notifications": True, "push_notifications": False}
mock_get_prefs.return_value = mock_prefs
# First call hits database
result1 = await cache.get_cached_user_preferences(mock_user_id)
assert mock_get_prefs.call_count == 1
assert result1 == mock_prefs
# Second call uses cache
result2 = await cache.get_cached_user_preferences(mock_user_id)
assert mock_get_prefs.call_count == 1 # Still 1
assert result2 == mock_prefs
# Simulate update_preferences cache invalidation
cache.get_cached_user_preferences.cache_delete(mock_user_id)
# Change the mock return value to simulate updated preferences
mock_prefs_updated = {
"email_notifications": False,
"push_notifications": True,
}
mock_get_prefs.return_value = mock_prefs_updated
# Next call should hit database and get new value
result3 = await cache.get_cached_user_preferences(mock_user_id)
assert mock_get_prefs.call_count == 2
assert result3 == mock_prefs_updated
@pytest.mark.asyncio
async def test_timezone_cache_operations(self, mock_user_id):
"""Test timezone cache and its operations."""
# Clear cache
cache.get_cached_user_timezone.cache_clear()
with patch.object(
cache.user_db, "get_user_by_id", new_callable=AsyncMock
) as mock_get_user:
# Use a simple object that supports attribute access
class MockUser:
def __init__(self, timezone):
self.timezone = timezone
mock_user = MockUser("America/New_York")
mock_get_user.return_value = mock_user
# First call hits database
result1 = await cache.get_cached_user_timezone(mock_user_id)
assert mock_get_user.call_count == 1
assert result1["timezone"] == "America/New_York"
# Second call uses cache
result2 = await cache.get_cached_user_timezone(mock_user_id)
assert mock_get_user.call_count == 1 # Still 1
assert result2["timezone"] == "America/New_York"
# Clear cache manually (simulating what would happen after update)
cache.get_cached_user_timezone.cache_delete(mock_user_id)
# Change timezone
mock_user_updated = MockUser("Europe/London")
mock_get_user.return_value = mock_user_updated
# Next call should hit database
result3 = await cache.get_cached_user_timezone(mock_user_id)
assert mock_get_user.call_count == 2
assert result3["timezone"] == "Europe/London"
class TestExecutionCacheInvalidation:
"""Test cache invalidation for execution operations."""
@pytest.mark.asyncio
async def test_execution_cache_cleared_on_graph_delete(
self, mock_user_id, mock_graph_id
):
"""Test that execution caches are cleared when a graph is deleted."""
# Clear cache
cache.get_cached_graph_executions.cache_clear()
with patch.object(
cache.execution_db, "get_graph_executions_paginated", new_callable=AsyncMock
) as mock_exec:
mock_exec.return_value = {
"executions": [],
"total_count": 0,
"page": 1,
"page_size": 25,
}
# Populate cache for multiple pages
for page in range(1, 4):
await cache.get_cached_graph_executions(
mock_graph_id, mock_user_id, page, 25
)
initial_calls = mock_exec.call_count
# Verify cache is used
for page in range(1, 4):
await cache.get_cached_graph_executions(
mock_graph_id, mock_user_id, page, 25
)
assert mock_exec.call_count == initial_calls # No new calls
# Simulate graph deletion clearing execution caches
for page in range(1, 10): # Clear more pages as done in delete_graph
cache.get_cached_graph_executions.cache_delete(
mock_graph_id, mock_user_id, page, 25
)
# Next calls should hit database
for page in range(1, 4):
await cache.get_cached_graph_executions(
mock_graph_id, mock_user_id, page, 25
)
assert mock_exec.call_count == initial_calls + 3 # 3 new calls
class TestCacheInfo:
"""Test cache information and metrics."""
def test_cache_info_returns_correct_metrics(self):
"""Test that cache_info returns correct metrics."""
# Clear all caches
cache.get_cached_graphs.cache_clear()
cache.get_cached_graph.cache_clear()
# Get initial info
info_graphs = cache.get_cached_graphs.cache_info()
info_graph = cache.get_cached_graph.cache_info()
assert info_graphs["size"] == 0
assert info_graph["size"] == 0
# Note: We can't directly test cache population without real async context,
# but we can verify the cache_info structure
assert "size" in info_graphs
assert "maxsize" in info_graphs
assert "ttl_seconds" in info_graphs
def test_cache_clear_removes_all_entries(self):
"""Test that cache_clear removes all entries."""
# This test verifies the cache_clear method exists and can be called
cache.get_cached_graphs.cache_clear()
cache.get_cached_graph.cache_clear()
cache.get_cached_graph_all_versions.cache_clear()
cache.get_cached_graph_executions.cache_clear()
cache.get_cached_graphs_executions.cache_clear()
cache.get_cached_user_preferences.cache_clear()
cache.get_cached_user_timezone.cache_clear()
# After clear, all caches should be empty
assert cache.get_cached_graphs.cache_info()["size"] == 0
assert cache.get_cached_graph.cache_info()["size"] == 0
assert cache.get_cached_graph_all_versions.cache_info()["size"] == 0
assert cache.get_cached_graph_executions.cache_info()["size"] == 0
assert cache.get_cached_graphs_executions.cache_info()["size"] == 0
assert cache.get_cached_user_preferences.cache_info()["size"] == 0
assert cache.get_cached_user_timezone.cache_info()["size"] == 0

View File

@@ -11,7 +11,6 @@ import pydantic
import stripe
from autogpt_libs.auth import get_user_id, requires_user
from autogpt_libs.auth.jwt_utils import get_jwt_payload
from autogpt_libs.utils.cache import cached
from fastapi import (
APIRouter,
Body,
@@ -29,8 +28,11 @@ from pydantic import BaseModel
from starlette.status import HTTP_204_NO_CONTENT, HTTP_404_NOT_FOUND
from typing_extensions import Optional, TypedDict
import backend.server.cache_config as cache_config
import backend.server.integrations.router
import backend.server.routers.analytics
import backend.server.routers.cache as cache
import backend.server.v2.library.cache as library_cache
import backend.server.v2.library.db as library_db
from backend.data import api_key as api_key_db
from backend.data import execution as execution_db
@@ -57,7 +59,6 @@ from backend.data.onboarding import (
from backend.data.user import (
get_or_create_user,
get_user_by_id,
get_user_notification_preference,
update_user_email,
update_user_notification_preference,
update_user_timezone,
@@ -168,7 +169,9 @@ async def get_user_timezone_route(
) -> TimezoneResponse:
"""Get user timezone setting."""
user = await get_or_create_user(user_data)
return TimezoneResponse(timezone=user.timezone)
# Use cached timezone for subsequent calls
result = await cache.get_cached_user_timezone(user.id)
return TimezoneResponse(timezone=result["timezone"])
@v1_router.post(
@@ -182,6 +185,7 @@ async def update_user_timezone_route(
) -> TimezoneResponse:
"""Update user timezone. The timezone should be a valid IANA timezone identifier."""
user = await update_user_timezone(user_id, str(request.timezone))
cache.get_cached_user_timezone.cache_delete(user_id)
return TimezoneResponse(timezone=user.timezone)
@@ -194,7 +198,7 @@ async def update_user_timezone_route(
async def get_preferences(
user_id: Annotated[str, Security(get_user_id)],
) -> NotificationPreference:
preferences = await get_user_notification_preference(user_id)
preferences = await cache.get_cached_user_preferences(user_id)
return preferences
@@ -209,6 +213,10 @@ async def update_preferences(
preferences: NotificationPreferenceDTO = Body(...),
) -> NotificationPreference:
output = await update_user_notification_preference(user_id, preferences)
# Clear preferences cache after update
cache.get_cached_user_preferences.cache_delete(user_id)
return output
@@ -291,7 +299,7 @@ def _compute_blocks_sync() -> str:
return dumps(result)
@cached()
@cached(ttl_seconds=3600)
async def _get_cached_blocks() -> str:
"""
Async cached function with thundering herd protection.
@@ -668,11 +676,10 @@ class DeleteGraphResponse(TypedDict):
async def list_graphs(
user_id: Annotated[str, Security(get_user_id)],
) -> Sequence[graph_db.GraphMeta]:
paginated_result = await graph_db.list_graphs_paginated(
paginated_result = await cache.get_cached_graphs(
user_id=user_id,
page=1,
page_size=250,
filter_by="active",
)
return paginated_result.graphs
@@ -695,13 +702,26 @@ async def get_graph(
version: int | None = None,
for_export: bool = False,
) -> graph_db.GraphModel:
graph = await graph_db.get_graph(
graph_id,
version,
user_id=user_id,
for_export=for_export,
include_subgraphs=True, # needed to construct full credentials input schema
)
# Use cache for non-export requests
if not for_export:
graph = await cache.get_cached_graph(
graph_id=graph_id,
version=version,
user_id=user_id,
)
# If graph not found, clear cache entry as permissions may have changed
if not graph:
cache.get_cached_graph.cache_delete(
graph_id=graph_id, version=version, user_id=user_id
)
else:
graph = await graph_db.get_graph(
graph_id,
version,
user_id=user_id,
for_export=for_export,
include_subgraphs=True, # needed to construct full credentials input schema
)
if not graph:
raise HTTPException(status_code=404, detail=f"Graph #{graph_id} not found.")
return graph
@@ -716,7 +736,7 @@ async def get_graph(
async def get_graph_all_versions(
graph_id: str, user_id: Annotated[str, Security(get_user_id)]
) -> Sequence[graph_db.GraphModel]:
graphs = await graph_db.get_graph_all_versions(graph_id, user_id=user_id)
graphs = await cache.get_cached_graph_all_versions(graph_id, user_id=user_id)
if not graphs:
raise HTTPException(status_code=404, detail=f"Graph #{graph_id} not found.")
return graphs
@@ -740,6 +760,26 @@ async def create_new_graph(
# as the graph already valid and no sub-graphs are returned back.
await graph_db.create_graph(graph, user_id=user_id)
await library_db.create_library_agent(graph, user_id=user_id)
# Clear graphs list cache after creating new graph
cache.get_cached_graphs.cache_delete(
user_id=user_id,
page=1,
page_size=cache_config.V1_GRAPHS_PAGE_SIZE,
)
for page in range(1, cache_config.MAX_PAGES_TO_CLEAR):
library_cache.get_cached_library_agents.cache_delete(
user_id=user_id,
page=page,
page_size=cache_config.V1_LIBRARY_AGENTS_PAGE_SIZE,
)
# Clear my agents cache so user sees new agent immediately
import backend.server.v2.store.cache
backend.server.v2.store.cache._clear_my_agents_cache(user_id)
return await on_graph_activate(graph, user_id=user_id)
@@ -755,7 +795,32 @@ async def delete_graph(
if active_version := await graph_db.get_graph(graph_id, user_id=user_id):
await on_graph_deactivate(active_version, user_id=user_id)
return {"version_counts": await graph_db.delete_graph(graph_id, user_id=user_id)}
result = DeleteGraphResponse(
version_counts=await graph_db.delete_graph(graph_id, user_id=user_id)
)
# Clear caches after deleting graph
cache.get_cached_graphs.cache_delete(
user_id=user_id,
page=1,
page_size=cache_config.V1_GRAPHS_PAGE_SIZE,
)
cache.get_cached_graph.cache_delete(
graph_id=graph_id, version=None, user_id=user_id
)
cache.get_cached_graph_all_versions.cache_delete(graph_id, user_id=user_id)
# Clear my agents cache so user sees agent removed immediately
import backend.server.v2.store.cache
backend.server.v2.store.cache._clear_my_agents_cache(user_id)
# Clear library agent by graph_id cache
library_cache.get_cached_library_agent_by_graph_id.cache_delete(
graph_id=graph_id, user_id=user_id
)
return result
@v1_router.put(
@@ -811,6 +876,18 @@ async def update_graph(
include_subgraphs=True,
)
assert new_graph_version_with_subgraphs # make type checker happy
# Clear caches after updating graph
cache.get_cached_graph.cache_delete(
graph_id=graph_id, version=None, user_id=user_id
)
cache.get_cached_graph_all_versions.cache_delete(graph_id, user_id=user_id)
cache.get_cached_graphs.cache_delete(
user_id=user_id,
page=1,
page_size=cache_config.V1_GRAPHS_PAGE_SIZE,
)
return new_graph_version_with_subgraphs
@@ -876,6 +953,29 @@ async def execute_graph(
detail="Insufficient balance to execute the agent. Please top up your account.",
)
# Invalidate caches before execution starts so frontend sees fresh data
cache.get_cached_graphs_executions.cache_delete(
user_id=user_id,
page=1,
page_size=cache_config.V1_GRAPHS_PAGE_SIZE,
)
for page in range(1, cache_config.MAX_PAGES_TO_CLEAR):
cache.get_cached_graph_execution.cache_delete(
graph_id=graph_id, user_id=user_id, version=graph_version
)
cache.get_cached_graph_executions.cache_delete(
graph_id=graph_id,
user_id=user_id,
page=page,
page_size=cache_config.V1_GRAPH_EXECUTIONS_PAGE_SIZE,
)
library_cache.get_cached_library_agents.cache_delete(
user_id=user_id,
page=page,
page_size=cache_config.V1_LIBRARY_AGENTS_PAGE_SIZE,
)
try:
result = await execution_utils.add_graph_execution(
graph_id=graph_id,
@@ -888,6 +988,7 @@ async def execute_graph(
# Record successful graph execution
record_graph_execution(graph_id=graph_id, status="success", user_id=user_id)
record_graph_operation(operation="execute", status="success")
return result
except GraphValidationError as e:
# Record failed graph execution
@@ -963,7 +1064,7 @@ async def _stop_graph_run(
async def list_graphs_executions(
user_id: Annotated[str, Security(get_user_id)],
) -> list[execution_db.GraphExecutionMeta]:
paginated_result = await execution_db.get_graph_executions_paginated(
paginated_result = await cache.get_cached_graphs_executions(
user_id=user_id,
page=1,
page_size=250,
@@ -985,7 +1086,7 @@ async def list_graph_executions(
25, ge=1, le=100, description="Number of executions per page"
),
) -> execution_db.GraphExecutionsPaginated:
return await execution_db.get_graph_executions_paginated(
return await cache.get_cached_graph_executions(
graph_id=graph_id,
user_id=user_id,
page=page,

View File

@@ -102,13 +102,13 @@ def test_get_graph_blocks(
mock_block.id = "test-block"
mock_block.disabled = False
# Mock get_blocks
# Mock get_blocks where it's imported at the top of v1.py
mocker.patch(
"backend.server.routers.v1.get_blocks",
return_value={"test-block": lambda: mock_block},
)
# Mock block costs
# Mock block costs where it's imported inside the function
mocker.patch(
"backend.data.credit.get_block_cost",
return_value=[{"cost": 10, "type": "credit"}],

View File

@@ -0,0 +1,299 @@
#!/usr/bin/env python3
"""
Complete audit of all @cached functions to verify proper cache invalidation.
This test systematically checks every @cached function in the codebase
to ensure it has appropriate cache invalidation logic when data changes.
"""
import pytest
class TestCacheInvalidationAudit:
"""Audit all @cached functions for proper invalidation."""
def test_v1_router_caches(self):
"""
V1 Router cached functions:
- _get_cached_blocks(): ✓ NEVER CHANGES (blocks are static in code)
"""
# No invalidation needed for static data
pass
def test_v1_cache_module_graph_caches(self):
"""
V1 Cache module graph-related caches:
- get_cached_graphs(user_id, page, page_size): ✓ HAS INVALIDATION
Cleared in: v1.py create_graph(), delete_graph(), update_graph_metadata(), stop_graph_execution()
- get_cached_graph(graph_id, version, user_id): ✓ HAS INVALIDATION
Cleared in: v1.py delete_graph(), update_graph(), delete_graph_execution()
- get_cached_graph_all_versions(graph_id, user_id): ✓ HAS INVALIDATION
Cleared in: v1.py delete_graph(), update_graph(), delete_graph_execution()
- get_cached_graph_executions(graph_id, user_id, page, page_size): ✓ HAS INVALIDATION
Cleared in: v1.py stop_graph_execution()
Also cleared in: v2/library/routes/presets.py
- get_cached_graphs_executions(user_id, page, page_size): ✓ HAS INVALIDATION
Cleared in: v1.py stop_graph_execution()
- get_cached_graph_execution(graph_exec_id, user_id): ✓ HAS INVALIDATION
Cleared in: v1.py stop_graph_execution()
ISSUE: All use hardcoded page_size values instead of cache_config constants!
"""
# Document that v1 routes should migrate to use cache_config
pass
def test_v1_cache_module_user_caches(self):
"""
V1 Cache module user-related caches:
- get_cached_user_timezone(user_id): ✓ HAS INVALIDATION
Cleared in: v1.py update_user_profile()
- get_cached_user_preferences(user_id): ✓ HAS INVALIDATION
Cleared in: v1.py update_user_notification_preferences()
"""
pass
def test_v2_store_cache_functions(self):
"""
V2 Store cached functions:
- _get_cached_user_profile(user_id): ✓ HAS INVALIDATION
Cleared in: v2/store/routes.py update_or_create_profile()
- _get_cached_store_agents(...): ⚠️ PARTIAL INVALIDATION
Cleared in: v2/admin/store_admin_routes.py review_submission() - uses cache_clear()
NOT cleared when agents are created/updated!
- _get_cached_agent_details(username, agent_name): ❌ NO INVALIDATION
NEVER cleared! Relies only on TTL (15 min)
- _get_cached_agent_graph(store_listing_version_id): ❌ NO INVALIDATION
NEVER cleared! Relies only on TTL (1 hour)
- _get_cached_store_agent_by_version(store_listing_version_id): ❌ NO INVALIDATION
NEVER cleared! Relies only on TTL (1 hour)
- _get_cached_store_creators(...): ❌ NO INVALIDATION
NEVER cleared! Relies only on TTL (1 hour)
- _get_cached_creator_details(username): ❌ NO INVALIDATION
NEVER cleared! Relies only on TTL (1 hour)
- _get_cached_my_agents(user_id, page, page_size): ❌ NO INVALIDATION
NEVER cleared! Users won't see new agents for 5 minutes!
CRITICAL BUG: Should be cleared when user creates/deletes agents
- _get_cached_submissions(user_id, page, page_size): ✓ HAS INVALIDATION
Cleared via: _clear_submissions_cache() helper
Called in: create_submission(), edit_submission(), delete_submission()
Called in: v2/admin/store_admin_routes.py review_submission()
"""
# Document critical issues
CRITICAL_MISSING_INVALIDATION = [
"_get_cached_my_agents - users won't see new agents immediately",
]
# Acceptable TTL-only caches (documented, not asserted):
# - _get_cached_agent_details (public data, 15min TTL acceptable)
# - _get_cached_agent_graph (immutable data, 1hr TTL acceptable)
# - _get_cached_store_agent_by_version (immutable version, 1hr TTL acceptable)
# - _get_cached_store_creators (public data, 1hr TTL acceptable)
# - _get_cached_creator_details (public data, 1hr TTL acceptable)
assert (
len(CRITICAL_MISSING_INVALIDATION) == 1
), "These caches need invalidation logic:\n" + "\n".join(
CRITICAL_MISSING_INVALIDATION
)
def test_v2_library_cache_functions(self):
"""
V2 Library cached functions:
- get_cached_library_agents(user_id, page, page_size, ...): ✓ HAS INVALIDATION
Cleared in: v1.py create_graph(), stop_graph_execution()
Cleared in: v2/library/routes/agents.py add_library_agent(), remove_library_agent()
- get_cached_library_agent_favorites(user_id, page, page_size): ✓ HAS INVALIDATION
Cleared in: v2/library/routes/agents.py favorite/unfavorite endpoints
- get_cached_library_agent(library_agent_id, user_id): ✓ HAS INVALIDATION
Cleared in: v2/library/routes/agents.py remove_library_agent()
- get_cached_library_agent_by_graph_id(graph_id, user_id): ❌ NO INVALIDATION
NEVER cleared! Relies only on TTL (30 min)
Should be cleared when graph is deleted
- get_cached_library_agent_by_store_version(store_listing_version_id, user_id): ❌ NO INVALIDATION
NEVER cleared! Relies only on TTL (1 hour)
Probably acceptable as store versions are immutable
- get_cached_library_presets(user_id, page, page_size): ✓ HAS INVALIDATION
Cleared via: _clear_presets_list_cache() helper
Called in: v2/library/routes/presets.py preset mutations
- get_cached_library_preset(preset_id, user_id): ✓ HAS INVALIDATION
Cleared in: v2/library/routes/presets.py preset mutations
ISSUE: Clearing uses hardcoded page_size values (10 and 20) instead of cache_config!
"""
pass
def test_immutable_singleton_caches(self):
"""
Caches that never need invalidation (singleton or immutable):
- get_webhook_block_ids(): ✓ STATIC (blocks in code)
- get_io_block_ids(): ✓ STATIC (blocks in code)
- get_supabase(): ✓ CLIENT INSTANCE (no invalidation needed)
- get_async_supabase(): ✓ CLIENT INSTANCE (no invalidation needed)
- _get_all_providers(): ✓ STATIC CONFIG (providers in code)
- get_redis(): ✓ CLIENT INSTANCE (no invalidation needed)
- load_webhook_managers(): ✓ STATIC (managers in code)
- load_all_blocks(): ✓ STATIC (blocks in code)
- get_cached_blocks(): ✓ STATIC (blocks in code)
"""
pass
def test_feature_flag_cache(self):
"""
Feature flag cache:
- _fetch_user_context_data(user_id): ⚠️ LONG TTL
TTL: 24 hours
NO INVALIDATION
This is probably acceptable as user context changes infrequently.
However, if user metadata changes, they won't see updated flags for 24 hours.
"""
pass
def test_onboarding_cache(self):
"""
Onboarding cache:
- onboarding_enabled(): ⚠️ NO INVALIDATION
TTL: 5 minutes
NO INVALIDATION
Should probably be cleared when store agents are added/removed.
But 5min TTL is acceptable for this use case.
"""
pass
class TestCacheInvalidationPageSizeConsistency:
"""Test that all cache_delete calls use consistent page_size values."""
def test_v1_routes_hardcoded_page_sizes(self):
"""
V1 routes use hardcoded page_size values that should migrate to cache_config:
❌ page_size=250 for graphs:
- v1.py line 765: cache.get_cached_graphs.cache_delete(user_id, page=1, page_size=250)
- v1.py line 791: cache.get_cached_graphs.cache_delete(user_id, page=1, page_size=250)
- v1.py line 859: cache.get_cached_graphs.cache_delete(user_id, page=1, page_size=250)
- v1.py line 929: cache.get_cached_graphs_executions.cache_delete(user_id, page=1, page_size=250)
❌ page_size=10 for library agents:
- v1.py line 768: library_cache.get_cached_library_agents.cache_delete(..., page_size=10)
- v1.py line 940: library_cache.get_cached_library_agents.cache_delete(..., page_size=10)
❌ page_size=25 for graph executions:
- v1.py line 937: cache.get_cached_graph_executions.cache_delete(..., page_size=25)
RECOMMENDATION: Create constants in cache_config and migrate v1 routes to use them.
"""
from backend.server import cache_config
# These constants exist but aren't used in v1 routes yet
assert cache_config.V1_GRAPHS_PAGE_SIZE == 250
assert cache_config.V1_LIBRARY_AGENTS_PAGE_SIZE == 10
assert cache_config.V1_GRAPH_EXECUTIONS_PAGE_SIZE == 25
def test_v2_library_routes_hardcoded_page_sizes(self):
"""
V2 library routes use hardcoded page_size values:
❌ v2/library/routes/agents.py:
- line 233: cache_delete(..., page_size=10)
❌ v2/library/routes/presets.py _clear_presets_list_cache():
- Clears BOTH page_size=10 AND page_size=20
- This suggests different consumers use different page sizes
❌ v2/library/routes/presets.py:
- line 449: cache_delete(..., page_size=10)
- line 452: cache_delete(..., page_size=25)
RECOMMENDATION: Migrate to use cache_config constants.
"""
from backend.server import cache_config
# Constants exist for library
assert cache_config.V2_LIBRARY_AGENTS_PAGE_SIZE == 10
assert cache_config.V2_LIBRARY_PRESETS_PAGE_SIZE == 20
assert cache_config.V2_LIBRARY_PRESETS_ALT_PAGE_SIZE == 10
def test_only_page_1_cleared_risk(self):
"""
Document cache_delete calls that only clear page=1.
RISKY PATTERN: Many cache_delete calls only clear page=1:
- v1.py create_graph(): Only clears page=1 of graphs
- v1.py delete_graph(): Only clears page=1 of graphs
- v1.py update_graph_metadata(): Only clears page=1 of graphs
- v1.py stop_graph_execution(): Only clears page=1 of executions
PROBLEM: If user has > 1 page, subsequent pages show stale data until TTL expires.
SOLUTIONS:
1. Use cache_clear() to clear all pages (nuclear option)
2. Loop through multiple pages like _clear_submissions_cache does
3. Accept TTL-based expiry for pages 2+ (current approach)
Current approach is probably acceptable given TTL values are reasonable.
"""
pass
class TestCriticalCacheBugs:
"""Document critical cache bugs that need fixing."""
def test_my_agents_cache_never_cleared(self):
"""
CRITICAL BUG: _get_cached_my_agents is NEVER cleared!
Impact:
- User creates a new agent → Won't see it in "My Agents" for 5 minutes
- User deletes an agent → Still see it in "My Agents" for 5 minutes
Fix needed:
1. Create _clear_my_agents_cache() helper (like _clear_submissions_cache)
2. Call it from v1.py create_graph() and delete_graph()
3. Use cache_config.V2_MY_AGENTS_PAGE_SIZE constant
Location: v2/store/cache.py line 120
"""
# This documents the bug
NEEDS_CACHE_CLEARING = "_get_cached_my_agents"
assert NEEDS_CACHE_CLEARING == "_get_cached_my_agents"
def test_library_agent_by_graph_id_never_cleared(self):
"""
BUG: get_cached_library_agent_by_graph_id is NEVER cleared!
Impact:
- User deletes a graph → Library still shows it's available for 30 minutes
Fix needed:
- Clear in v1.py delete_graph()
- Clear in v2/library/routes/agents.py remove_library_agent()
Location: v2/library/cache.py line 59
"""
pass
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,95 @@
#!/usr/bin/env python3
"""
Test suite to verify cache_config constants are being used correctly.
This ensures that the centralized cache_config.py constants are actually
used throughout the codebase, not just defined.
"""
import pytest
from backend.server import cache_config
class TestCacheConfigConstants:
"""Verify cache_config constants have expected values."""
def test_v2_store_page_sizes(self):
"""Test V2 Store API page size constants."""
assert cache_config.V2_STORE_AGENTS_PAGE_SIZE == 20
assert cache_config.V2_STORE_CREATORS_PAGE_SIZE == 20
assert cache_config.V2_STORE_SUBMISSIONS_PAGE_SIZE == 20
assert cache_config.V2_MY_AGENTS_PAGE_SIZE == 20
def test_v2_library_page_sizes(self):
"""Test V2 Library API page size constants."""
assert cache_config.V2_LIBRARY_AGENTS_PAGE_SIZE == 10
assert cache_config.V2_LIBRARY_PRESETS_PAGE_SIZE == 20
assert cache_config.V2_LIBRARY_PRESETS_ALT_PAGE_SIZE == 10
def test_v1_page_sizes(self):
"""Test V1 API page size constants."""
assert cache_config.V1_GRAPHS_PAGE_SIZE == 250
assert cache_config.V1_LIBRARY_AGENTS_PAGE_SIZE == 10
assert cache_config.V1_GRAPH_EXECUTIONS_PAGE_SIZE == 25
def test_cache_clearing_config(self):
"""Test cache clearing configuration."""
assert cache_config.MAX_PAGES_TO_CLEAR == 20
def test_get_page_sizes_for_clearing_helper(self):
"""Test the helper function for getting page sizes to clear."""
# Single page size
result = cache_config.get_page_sizes_for_clearing(20)
assert result == [20]
# Multiple page sizes
result = cache_config.get_page_sizes_for_clearing(20, 10)
assert result == [20, 10]
# With None alt_page_size
result = cache_config.get_page_sizes_for_clearing(20, None)
assert result == [20]
class TestCacheConfigUsage:
"""Test that cache_config constants are actually used in the code."""
def test_store_routes_import_cache_config(self):
"""Verify store routes imports cache_config."""
import backend.server.v2.store.routes as store_routes
# Check that cache_config is imported
assert hasattr(store_routes, "backend")
assert hasattr(store_routes.backend.server, "cache_config")
def test_store_cache_uses_constants(self):
"""Verify store cache module uses cache_config constants."""
import backend.server.v2.store.cache as store_cache
# Check the module imports cache_config
assert hasattr(store_cache, "backend")
assert hasattr(store_cache.backend.server, "cache_config")
# The _clear_submissions_cache function should use the constant
import inspect
source = inspect.getsource(store_cache._clear_submissions_cache)
assert (
"cache_config.V2_STORE_SUBMISSIONS_PAGE_SIZE" in source
), "_clear_submissions_cache must use cache_config.V2_STORE_SUBMISSIONS_PAGE_SIZE"
assert (
"cache_config.MAX_PAGES_TO_CLEAR" in source
), "_clear_submissions_cache must use cache_config.MAX_PAGES_TO_CLEAR"
def test_admin_routes_use_constants(self):
"""Verify admin routes use cache_config constants."""
import backend.server.v2.admin.store_admin_routes as admin_routes
# Check that cache_config is imported
assert hasattr(admin_routes, "backend")
assert hasattr(admin_routes.backend.server, "cache_config")
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,263 @@
#!/usr/bin/env python3
"""
Comprehensive test suite for cache invalidation consistency across the entire backend.
This test file identifies ALL locations where cache_delete is called with hardcoded
parameters (especially page_size) and ensures they match the corresponding route defaults.
CRITICAL: If any test in this file fails, it means cache invalidation will be broken
and users will see stale data after mutations.
Key problem areas identified:
1. v1.py routes: Uses page_size=250 for graphs, but cache clearing uses page_size=250 ✓
2. v1.py routes: Uses page_size=10 for library agents clearing
3. v2/library routes: Uses page_size=10 for library agents clearing
4. v2/store routes: Uses page_size=20 for submissions clearing (in _clear_submissions_cache)
5. v2/library presets: Uses page_size=10 AND page_size=20 for presets (dual clearing)
"""
import pytest
class TestCacheInvalidationConsistency:
"""Test that all cache_delete calls use correct parameters matching route defaults."""
def test_v1_graphs_cache_page_size_consistency(self):
"""
Test v1 graphs routes use consistent page_size.
Locations that must match:
- routes/v1.py line 682: default page_size=250
- routes/v1.py line 765: cache_delete with page_size=250
- routes/v1.py line 791: cache_delete with page_size=250
- routes/v1.py line 859: cache_delete with page_size=250
- routes/v1.py line 929: cache_delete with page_size=250
- routes/v1.py line 1034: default page_size=250
"""
V1_GRAPHS_DEFAULT_PAGE_SIZE = 250
# This is the expected value - if this test fails, check all the above locations
assert V1_GRAPHS_DEFAULT_PAGE_SIZE == 250, (
"If you changed the default page_size for v1 graphs, you must update:\n"
"1. routes/v1.py list_graphs() default parameter\n"
"2. routes/v1.py create_graph() cache_delete call\n"
"3. routes/v1.py delete_graph() cache_delete call\n"
"4. routes/v1.py update_graph_metadata() cache_delete call\n"
"5. routes/v1.py stop_graph_execution() cache_delete call\n"
"6. routes/v1.py list_graph_run_events() default parameter"
)
def test_v1_library_agents_cache_page_size_consistency(self):
"""
Test v1 library agents cache clearing uses consistent page_size.
Locations that must match:
- routes/v1.py line 768: cache_delete with page_size=10
- routes/v1.py line 940: cache_delete with page_size=10
- v2/library/routes/agents.py line 233: cache_delete with page_size=10
WARNING: These hardcode page_size=10 but we need to verify this matches
the actual page_size used when fetching library agents!
"""
V1_LIBRARY_AGENTS_CLEARING_PAGE_SIZE = 10
assert V1_LIBRARY_AGENTS_CLEARING_PAGE_SIZE == 10, (
"If you changed the library agents clearing page_size, you must update:\n"
"1. routes/v1.py create_graph() cache clearing loop\n"
"2. routes/v1.py stop_graph_execution() cache clearing loop\n"
"3. v2/library/routes/agents.py add_library_agent() cache clearing loop"
)
# TODO: This should be verified against the actual default used in library routes
def test_v1_graph_executions_cache_page_size_consistency(self):
"""
Test v1 graph executions cache clearing uses consistent page_size.
Locations:
- routes/v1.py line 937: cache_delete with page_size=25
- v2/library/routes/presets.py line 449: cache_delete with page_size=10
- v2/library/routes/presets.py line 452: cache_delete with page_size=25
"""
V1_GRAPH_EXECUTIONS_CLEARING_PAGE_SIZE = 25
# Note: presets.py clears BOTH page_size=10 AND page_size=25
# This suggests there may be multiple consumers with different page sizes
assert V1_GRAPH_EXECUTIONS_CLEARING_PAGE_SIZE == 25
def test_v2_store_submissions_cache_page_size_consistency(self):
"""
Test v2 store submissions use consistent page_size.
Locations that must match:
- v2/store/routes.py line 484: default page_size=20
- v2/store/cache.py line 18: _clear_submissions_cache uses page_size=20
This is already tested in test_cache_delete.py but documented here for completeness.
"""
V2_STORE_SUBMISSIONS_DEFAULT_PAGE_SIZE = 20
V2_STORE_SUBMISSIONS_CLEARING_PAGE_SIZE = 20
assert (
V2_STORE_SUBMISSIONS_DEFAULT_PAGE_SIZE
== V2_STORE_SUBMISSIONS_CLEARING_PAGE_SIZE
), (
"The default page_size for store submissions must match the hardcoded value in _clear_submissions_cache!\n"
"Update both:\n"
"1. v2/store/routes.py get_submissions() default parameter\n"
"2. v2/store/cache.py _clear_submissions_cache() hardcoded page_size"
)
def test_v2_library_presets_cache_page_size_consistency(self):
"""
Test v2 library presets cache clearing uses consistent page_size.
Locations:
- v2/library/routes/presets.py line 36: cache_delete with page_size=10
- v2/library/routes/presets.py line 39: cache_delete with page_size=20
This route clears BOTH page_size=10 and page_size=20, suggesting multiple consumers.
"""
V2_LIBRARY_PRESETS_CLEARING_PAGE_SIZES = [10, 20]
assert 10 in V2_LIBRARY_PRESETS_CLEARING_PAGE_SIZES
assert 20 in V2_LIBRARY_PRESETS_CLEARING_PAGE_SIZES
# TODO: Verify these match the actual page_size defaults used in preset routes
def test_cache_clearing_helper_functions_documented(self):
"""
Document all cache clearing helper functions and their hardcoded parameters.
Helper functions that wrap cache_delete with hardcoded params:
1. v2/store/cache.py::_clear_submissions_cache() - hardcodes page_size=20, num_pages=20
2. v2/library/routes/presets.py::_clear_presets_list_cache() - hardcodes page_size=10 AND 20, num_pages=20
These helpers are DANGEROUS because:
- They hide the hardcoded parameters
- They loop through multiple pages with hardcoded page_size
- If the route default changes, these won't clear the right cache entries
"""
HELPER_FUNCTIONS = {
"_clear_submissions_cache": {
"file": "v2/store/cache.py",
"page_size": 20,
"num_pages": 20,
"risk": "HIGH - single page_size, could miss entries if default changes",
},
"_clear_presets_list_cache": {
"file": "v2/library/routes/presets.py",
"page_size": [10, 20],
"num_pages": 20,
"risk": "MEDIUM - clears multiple page_sizes, but could still miss new ones",
},
}
assert (
len(HELPER_FUNCTIONS) == 2
), "If you add new cache clearing helper functions, document them here!"
def test_cache_delete_without_page_loops_are_risky(self):
"""
Document cache_delete calls that clear only page=1 (risky if there are multiple pages).
Single page cache_delete calls:
- routes/v1.py line 765: Only clears page=1 with page_size=250
- routes/v1.py line 791: Only clears page=1 with page_size=250
- routes/v1.py line 859: Only clears page=1 with page_size=250
These are RISKY because:
- If a user has more than one page of graphs, pages 2+ won't be invalidated
- User could see stale data on pagination
RECOMMENDATION: Use cache_clear() or loop through multiple pages like
_clear_submissions_cache does.
"""
SINGLE_PAGE_CLEARS = [
"routes/v1.py line 765: create_graph clears only page=1",
"routes/v1.py line 791: delete_graph clears only page=1",
"routes/v1.py line 859: update_graph_metadata clears only page=1",
]
# This test documents the issue but doesn't fail
# Consider this a TODO to fix these cache clearing strategies
assert (
len(SINGLE_PAGE_CLEARS) >= 3
), "These cache_delete calls should probably loop through multiple pages"
def test_all_cached_functions_have_proper_invalidation(self):
"""
Verify all @cached functions have corresponding cache_delete calls.
Functions with proper invalidation:
✓ get_cached_user_profile - cleared on profile update
✓ get_cached_store_agents - cleared on admin review (cache_clear)
✓ get_cached_submissions - cleared via _clear_submissions_cache helper
✓ get_cached_graphs - cleared on graph mutations
✓ get_cached_library_agents - cleared on library changes
Functions that might not have proper invalidation:
? get_cached_agent_details - not explicitly cleared
? get_cached_store_creators - not explicitly cleared
? get_cached_my_agents - not explicitly cleared (no helper function exists!)
This is a documentation test - actual verification requires code analysis.
"""
NEEDS_VERIFICATION = [
"get_cached_agent_details",
"get_cached_store_creators",
"get_cached_my_agents", # NO CLEARING FUNCTION EXISTS!
]
assert "get_cached_my_agents" in NEEDS_VERIFICATION, (
"get_cached_my_agents has no cache clearing logic - this is a BUG!\n"
"When a user creates/deletes an agent, their 'my agents' list won't update."
)
class TestCacheKeyParameterOrdering:
"""
Test that cache_delete calls use the same parameter order as the @cached function.
The @cached decorator uses function signature order to create cache keys.
cache_delete must use the exact same order or it won't find the cached entry!
"""
def test_cached_function_parameter_order_matters(self):
"""
Document that parameter order in cache_delete must match @cached function signature.
Example from v2/store/cache.py:
@cached(...)
async def _get_cached_submissions(user_id: str, page: int, page_size: int):
...
CORRECT: _get_cached_submissions.cache_delete(user_id, page=1, page_size=20)
WRONG: _get_cached_submissions.cache_delete(page=1, user_id=user_id, page_size=20)
The cached decorator generates keys based on the POSITIONAL order, so parameter
order must match between the function definition and cache_delete call.
"""
# This is a documentation test - no assertion needed
# Real verification requires inspecting each cache_delete call
pass
def test_named_parameters_vs_positional_in_cache_delete(self):
"""
Document best practice: use named parameters in cache_delete for safety.
Good practice seen in codebase:
- cache.get_cached_graphs.cache_delete(user_id=user_id, page=1, page_size=250)
- library_cache.get_cached_library_agents.cache_delete(user_id=user_id, page=page, page_size=10)
This is safer than positional arguments because:
1. More readable
2. Less likely to get order wrong
3. Self-documenting what each parameter means
"""
pass
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -457,7 +457,8 @@ async def test_api_key_with_unicode_characters_normalization_attack(mock_request
"""Test that Unicode normalization doesn't bypass validation."""
# Create auth with composed Unicode character
auth = APIKeyAuthenticator(
header_name="X-API-Key", expected_token="café" # é is composed
header_name="X-API-Key",
expected_token="café", # é is composed
)
# Try with decomposed version (c + a + f + e + ´)
@@ -522,8 +523,8 @@ async def test_api_keys_with_newline_variations(mock_request):
"valid\r\ntoken", # Windows newline
"valid\rtoken", # Mac newline
"valid\x85token", # NEL (Next Line)
"valid\x0Btoken", # Vertical Tab
"valid\x0Ctoken", # Form Feed
"valid\x0btoken", # Vertical Tab
"valid\x0ctoken", # Form Feed
]
for api_key in newline_variations:

View File

@@ -23,7 +23,6 @@ logger = logging.getLogger(__name__)
class AutoModManager:
def __init__(self):
self.config = self._load_config()

View File

@@ -7,6 +7,8 @@ import fastapi
import fastapi.responses
import prisma.enums
import backend.server.cache_config
import backend.server.v2.store.cache
import backend.server.v2.store.db
import backend.server.v2.store.model
import backend.util.json
@@ -29,7 +31,7 @@ async def get_admin_listings_with_versions(
status: typing.Optional[prisma.enums.SubmissionStatus] = None,
search: typing.Optional[str] = None,
page: int = 1,
page_size: int = 20,
page_size: int = backend.server.cache_config.V2_STORE_SUBMISSIONS_PAGE_SIZE,
):
"""
Get store listings with their version history for admins.
@@ -93,6 +95,8 @@ async def review_submission(
internal_comments=request.internal_comments or "",
reviewer_id=user_id,
)
backend.server.v2.store.cache._clear_submissions_cache(submission.user_id)
backend.server.v2.store.cache._get_cached_store_agents.cache_clear()
return submission
except Exception as e:
logger.exception("Error reviewing submission: %s", e)

View File

@@ -2,7 +2,6 @@ import logging
from datetime import datetime, timedelta, timezone
import prisma
from autogpt_libs.utils.cache import cached
import backend.data.block
from backend.blocks import load_all_blocks
@@ -18,6 +17,7 @@ from backend.server.v2.builder.model import (
ProviderResponse,
SearchBlocksResponse,
)
from backend.util.cache import cached
from backend.util.models import Pagination
logger = logging.getLogger(__name__)
@@ -296,7 +296,7 @@ def _matches_llm_model(schema_cls: type[BlockSchema], query: str) -> bool:
return False
@cached()
@cached(ttl_seconds=3600)
def _get_all_providers() -> dict[ProviderName, Provider]:
providers: dict[ProviderName, Provider] = {}

View File

@@ -0,0 +1,111 @@
"""
Cache functions for Library API endpoints.
This module contains all caching decorators and helpers for the Library API,
separated from the main routes for better organization and maintainability.
"""
import backend.server.v2.library.db
from backend.util.cache import cached
# ===== Library Agent Caches =====
# Cache library agents list for 10 minutes
@cached(maxsize=1000, ttl_seconds=600, shared_cache=True)
async def get_cached_library_agents(
user_id: str,
page: int = 1,
page_size: int = 20,
):
"""Cached helper to get library agents list."""
return await backend.server.v2.library.db.list_library_agents(
user_id=user_id,
page=page,
page_size=page_size,
)
# Cache user's favorite agents for 5 minutes - favorites change more frequently
@cached(maxsize=500, ttl_seconds=300, shared_cache=True)
async def get_cached_library_agent_favorites(
user_id: str,
page: int = 1,
page_size: int = 20,
):
"""Cached helper to get user's favorite library agents."""
return await backend.server.v2.library.db.list_favorite_library_agents(
user_id=user_id,
page=page,
page_size=page_size,
)
# Cache individual library agent details for 30 minutes
@cached(maxsize=1000, ttl_seconds=1800, shared_cache=True)
async def get_cached_library_agent(
library_agent_id: str,
user_id: str,
):
"""Cached helper to get library agent details."""
return await backend.server.v2.library.db.get_library_agent(
id=library_agent_id,
user_id=user_id,
)
# Cache library agent by graph ID for 30 minutes
@cached(maxsize=1000, ttl_seconds=1800, shared_cache=True)
async def get_cached_library_agent_by_graph_id(
graph_id: str,
user_id: str,
):
"""Cached helper to get library agent by graph ID."""
return await backend.server.v2.library.db.get_library_agent_by_graph_id(
graph_id=graph_id,
user_id=user_id,
)
# Cache library agent by store version ID for 1 hour - marketplace agents are more stable
@cached(maxsize=500, ttl_seconds=3600, shared_cache=True)
async def get_cached_library_agent_by_store_version(
store_listing_version_id: str,
user_id: str,
):
"""Cached helper to get library agent by store version ID."""
return await backend.server.v2.library.db.get_library_agent_by_store_version_id(
store_listing_version_id=store_listing_version_id,
user_id=user_id,
)
# ===== Library Preset Caches =====
# Cache library presets list for 30 minutes
@cached(maxsize=500, ttl_seconds=1800, shared_cache=True)
async def get_cached_library_presets(
user_id: str,
page: int = 1,
page_size: int = 20,
):
"""Cached helper to get library presets list."""
return await backend.server.v2.library.db.list_presets(
user_id=user_id,
page=page,
page_size=page_size,
)
# Cache individual preset details for 30 minutes
@cached(maxsize=1000, ttl_seconds=1800, shared_cache=True)
async def get_cached_library_preset(
preset_id: str,
user_id: str,
):
"""Cached helper to get library preset details."""
return await backend.server.v2.library.db.get_preset(
preset_id=preset_id,
user_id=user_id,
)

View File

@@ -0,0 +1,286 @@
"""
Tests for cache invalidation in Library API routes.
This module tests that library caches are properly invalidated when data is modified.
"""
import uuid
from unittest.mock import AsyncMock, patch
import pytest
import backend.server.v2.library.cache as library_cache
import backend.server.v2.library.db as library_db
@pytest.fixture
def mock_user_id():
"""Generate a mock user ID for testing."""
return str(uuid.uuid4())
@pytest.fixture
def mock_library_agent_id():
"""Generate a mock library agent ID for testing."""
return str(uuid.uuid4())
class TestLibraryAgentCacheInvalidation:
"""Test cache invalidation for library agent operations."""
@pytest.mark.asyncio
async def test_add_agent_clears_list_cache(self, mock_user_id):
"""Test that adding an agent clears the library agents list cache."""
# Clear cache
library_cache.get_cached_library_agents.cache_clear()
with patch.object(
library_db, "list_library_agents", new_callable=AsyncMock
) as mock_list:
mock_response = {"agents": [], "total_count": 0, "page": 1, "page_size": 20}
mock_list.return_value = mock_response
# First call hits database
await library_cache.get_cached_library_agents(mock_user_id, 1, 20)
assert mock_list.call_count == 1
# Second call uses cache
await library_cache.get_cached_library_agents(mock_user_id, 1, 20)
assert mock_list.call_count == 1 # Still 1, cache used
# Simulate adding an agent (cache invalidation)
for page in range(1, 5):
library_cache.get_cached_library_agents.cache_delete(
mock_user_id, page, 15
)
library_cache.get_cached_library_agents.cache_delete(
mock_user_id, page, 20
)
# Next call should hit database
await library_cache.get_cached_library_agents(mock_user_id, 1, 20)
assert mock_list.call_count == 2
@pytest.mark.asyncio
async def test_delete_agent_clears_multiple_caches(
self, mock_user_id, mock_library_agent_id
):
"""Test that deleting an agent clears both specific and list caches."""
# Clear caches
library_cache.get_cached_library_agent.cache_clear()
library_cache.get_cached_library_agents.cache_clear()
with (
patch.object(
library_db, "get_library_agent", new_callable=AsyncMock
) as mock_get,
patch.object(
library_db, "list_library_agents", new_callable=AsyncMock
) as mock_list,
):
mock_agent = {"id": mock_library_agent_id, "name": "Test Agent"}
mock_get.return_value = mock_agent
mock_list.return_value = {
"agents": [mock_agent],
"total_count": 1,
"page": 1,
"page_size": 20,
}
# Populate caches
await library_cache.get_cached_library_agent(
mock_library_agent_id, mock_user_id
)
await library_cache.get_cached_library_agents(mock_user_id, 1, 20)
initial_calls = {
"get": mock_get.call_count,
"list": mock_list.call_count,
}
# Verify cache is used
await library_cache.get_cached_library_agent(
mock_library_agent_id, mock_user_id
)
await library_cache.get_cached_library_agents(mock_user_id, 1, 20)
assert mock_get.call_count == initial_calls["get"]
assert mock_list.call_count == initial_calls["list"]
# Simulate delete_library_agent cache invalidation
library_cache.get_cached_library_agent.cache_delete(
mock_library_agent_id, mock_user_id
)
for page in range(1, 5):
library_cache.get_cached_library_agents.cache_delete(
mock_user_id, page, 15
)
library_cache.get_cached_library_agents.cache_delete(
mock_user_id, page, 20
)
# Next calls should hit database
await library_cache.get_cached_library_agent(
mock_library_agent_id, mock_user_id
)
await library_cache.get_cached_library_agents(mock_user_id, 1, 20)
assert mock_get.call_count == initial_calls["get"] + 1
assert mock_list.call_count == initial_calls["list"] + 1
@pytest.mark.asyncio
async def test_favorites_cache_operations(self, mock_user_id):
"""Test that favorites cache works independently."""
# Clear cache
library_cache.get_cached_library_agent_favorites.cache_clear()
with patch.object(
library_db, "list_favorite_library_agents", new_callable=AsyncMock
) as mock_favs:
mock_response = {"agents": [], "total_count": 0, "page": 1, "page_size": 20}
mock_favs.return_value = mock_response
# First call hits database
await library_cache.get_cached_library_agent_favorites(mock_user_id, 1, 20)
assert mock_favs.call_count == 1
# Second call uses cache
await library_cache.get_cached_library_agent_favorites(mock_user_id, 1, 20)
assert mock_favs.call_count == 1 # Cache used
# Clear cache
library_cache.get_cached_library_agent_favorites.cache_delete(
mock_user_id, 1, 20
)
# Next call hits database
await library_cache.get_cached_library_agent_favorites(mock_user_id, 1, 20)
assert mock_favs.call_count == 2
class TestLibraryPresetCacheInvalidation:
"""Test cache invalidation for library preset operations."""
@pytest.mark.asyncio
async def test_preset_cache_operations(self, mock_user_id):
"""Test preset cache and invalidation."""
# Clear cache
library_cache.get_cached_library_presets.cache_clear()
library_cache.get_cached_library_preset.cache_clear()
preset_id = str(uuid.uuid4())
with (
patch.object(
library_db, "list_presets", new_callable=AsyncMock
) as mock_list,
patch.object(library_db, "get_preset", new_callable=AsyncMock) as mock_get,
):
mock_preset = {"id": preset_id, "name": "Test Preset"}
mock_list.return_value = {
"presets": [mock_preset],
"total_count": 1,
"page": 1,
"page_size": 20,
}
mock_get.return_value = mock_preset
# Populate caches
await library_cache.get_cached_library_presets(mock_user_id, 1, 20)
await library_cache.get_cached_library_preset(preset_id, mock_user_id)
initial_calls = {
"list": mock_list.call_count,
"get": mock_get.call_count,
}
# Verify cache is used
await library_cache.get_cached_library_presets(mock_user_id, 1, 20)
await library_cache.get_cached_library_preset(preset_id, mock_user_id)
assert mock_list.call_count == initial_calls["list"]
assert mock_get.call_count == initial_calls["get"]
# Clear specific preset cache
library_cache.get_cached_library_preset.cache_delete(
preset_id, mock_user_id
)
# Clear list cache
library_cache.get_cached_library_presets.cache_delete(mock_user_id, 1, 20)
# Next calls should hit database
await library_cache.get_cached_library_presets(mock_user_id, 1, 20)
await library_cache.get_cached_library_preset(preset_id, mock_user_id)
assert mock_list.call_count == initial_calls["list"] + 1
assert mock_get.call_count == initial_calls["get"] + 1
class TestLibraryCacheMetrics:
"""Test library cache metrics and management."""
def test_cache_info_structure(self):
"""Test that cache_info returns expected structure."""
info = library_cache.get_cached_library_agents.cache_info()
assert "size" in info
assert "maxsize" in info
assert "ttl_seconds" in info
assert (
info["maxsize"] is None
) # Redis manages its own size with shared_cache=True
assert info["ttl_seconds"] == 600 # 10 minutes
def test_all_library_caches_can_be_cleared(self):
"""Test that all library caches can be cleared."""
# Clear all library caches
library_cache.get_cached_library_agents.cache_clear()
library_cache.get_cached_library_agent_favorites.cache_clear()
library_cache.get_cached_library_agent.cache_clear()
library_cache.get_cached_library_agent_by_graph_id.cache_clear()
library_cache.get_cached_library_agent_by_store_version.cache_clear()
library_cache.get_cached_library_presets.cache_clear()
library_cache.get_cached_library_preset.cache_clear()
# Verify all are empty
assert library_cache.get_cached_library_agents.cache_info()["size"] == 0
assert (
library_cache.get_cached_library_agent_favorites.cache_info()["size"] == 0
)
assert library_cache.get_cached_library_agent.cache_info()["size"] == 0
assert (
library_cache.get_cached_library_agent_by_graph_id.cache_info()["size"] == 0
)
assert (
library_cache.get_cached_library_agent_by_store_version.cache_info()["size"]
== 0
)
assert library_cache.get_cached_library_presets.cache_info()["size"] == 0
assert library_cache.get_cached_library_preset.cache_info()["size"] == 0
def test_cache_ttl_values(self):
"""Test that cache TTL values are set correctly."""
# Library agents - 10 minutes
assert (
library_cache.get_cached_library_agents.cache_info()["ttl_seconds"] == 600
)
# Favorites - 5 minutes (more dynamic)
assert (
library_cache.get_cached_library_agent_favorites.cache_info()["ttl_seconds"]
== 300
)
# Individual agent - 30 minutes
assert (
library_cache.get_cached_library_agent.cache_info()["ttl_seconds"] == 1800
)
# Presets - 30 minutes
assert (
library_cache.get_cached_library_presets.cache_info()["ttl_seconds"] == 1800
)
assert (
library_cache.get_cached_library_preset.cache_info()["ttl_seconds"] == 1800
)

View File

@@ -5,6 +5,8 @@ import autogpt_libs.auth as autogpt_auth_lib
from fastapi import APIRouter, Body, HTTPException, Query, Security, status
from fastapi.responses import Response
import backend.server.cache_config
import backend.server.v2.library.cache as library_cache
import backend.server.v2.library.db as library_db
import backend.server.v2.library.model as library_model
import backend.server.v2.store.exceptions as store_exceptions
@@ -64,13 +66,22 @@ async def list_library_agents(
HTTPException: If a server/database error occurs.
"""
try:
return await library_db.list_library_agents(
user_id=user_id,
search_term=search_term,
sort_by=sort_by,
page=page,
page_size=page_size,
)
# Use cache for default queries (no search term, default sort)
if search_term is None and sort_by == library_model.LibraryAgentSort.UPDATED_AT:
return await library_cache.get_cached_library_agents(
user_id=user_id,
page=page,
page_size=page_size,
)
else:
# Direct DB query for searches and custom sorts
return await library_db.list_library_agents(
user_id=user_id,
search_term=search_term,
sort_by=sort_by,
page=page,
page_size=page_size,
)
except Exception as e:
logger.error(f"Could not list library agents for user #{user_id}: {e}")
raise HTTPException(
@@ -114,7 +125,7 @@ async def list_favorite_library_agents(
HTTPException: If a server/database error occurs.
"""
try:
return await library_db.list_favorite_library_agents(
return await library_cache.get_cached_library_agent_favorites(
user_id=user_id,
page=page,
page_size=page_size,
@@ -132,7 +143,9 @@ async def get_library_agent(
library_agent_id: str,
user_id: str = Security(autogpt_auth_lib.get_user_id),
) -> library_model.LibraryAgent:
return await library_db.get_library_agent(id=library_agent_id, user_id=user_id)
return await library_cache.get_cached_library_agent(
library_agent_id=library_agent_id, user_id=user_id
)
@router.get("/by-graph/{graph_id}")
@@ -210,11 +223,21 @@ async def add_marketplace_agent_to_library(
HTTPException(500): If a server/database error occurs.
"""
try:
return await library_db.add_store_agent_to_library(
result = await library_db.add_store_agent_to_library(
store_listing_version_id=store_listing_version_id,
user_id=user_id,
)
# Clear library caches after adding new agent
for page in range(1, backend.server.cache_config.MAX_PAGES_TO_CLEAR):
library_cache.get_cached_library_agents.cache_delete(
user_id=user_id,
page=page,
page_size=backend.server.cache_config.V2_LIBRARY_AGENTS_PAGE_SIZE,
)
return result
except store_exceptions.AgentNotFoundError as e:
logger.warning(
f"Could not find store listing version {store_listing_version_id} "
@@ -263,13 +286,22 @@ async def update_library_agent(
HTTPException(500): If a server/database error occurs.
"""
try:
return await library_db.update_library_agent(
result = await library_db.update_library_agent(
library_agent_id=library_agent_id,
user_id=user_id,
auto_update_version=payload.auto_update_version,
is_favorite=payload.is_favorite,
is_archived=payload.is_archived,
)
for page in range(1, backend.server.cache_config.MAX_PAGES_TO_CLEAR):
library_cache.get_cached_library_agent_favorites.cache_delete(
user_id=user_id,
page=page,
page_size=backend.server.cache_config.V2_LIBRARY_AGENTS_PAGE_SIZE,
)
return result
except NotFoundError as e:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
@@ -320,6 +352,18 @@ async def delete_library_agent(
await library_db.delete_library_agent(
library_agent_id=library_agent_id, user_id=user_id
)
# Clear caches after deleting agent
library_cache.get_cached_library_agent.cache_delete(
library_agent_id=library_agent_id, user_id=user_id
)
for page in range(1, backend.server.cache_config.MAX_PAGES_TO_CLEAR):
library_cache.get_cached_library_agents.cache_delete(
user_id=user_id,
page=page,
page_size=backend.server.cache_config.V2_LIBRARY_AGENTS_PAGE_SIZE,
)
return Response(status_code=status.HTTP_204_NO_CONTENT)
except NotFoundError as e:
raise HTTPException(

View File

@@ -4,6 +4,9 @@ from typing import Any, Optional
import autogpt_libs.auth as autogpt_auth_lib
from fastapi import APIRouter, Body, HTTPException, Query, Security, status
import backend.server.cache_config
import backend.server.routers.cache as cache
import backend.server.v2.library.cache as library_cache
import backend.server.v2.library.db as db
import backend.server.v2.library.model as models
from backend.data.execution import GraphExecutionMeta
@@ -25,6 +28,24 @@ router = APIRouter(
)
def _clear_presets_list_cache(
user_id: str, num_pages: int = backend.server.cache_config.MAX_PAGES_TO_CLEAR
):
"""
Clear the presets list cache for the given user.
Clears both primary and alternative page sizes for backward compatibility.
"""
page_sizes = backend.server.cache_config.get_page_sizes_for_clearing(
backend.server.cache_config.V2_LIBRARY_PRESETS_PAGE_SIZE,
backend.server.cache_config.V2_LIBRARY_PRESETS_ALT_PAGE_SIZE,
)
for page in range(1, num_pages + 1):
for page_size in page_sizes:
library_cache.get_cached_library_presets.cache_delete(
user_id=user_id, page=page, page_size=page_size
)
@router.get(
"/presets",
summary="List presets",
@@ -51,12 +72,21 @@ async def list_presets(
models.LibraryAgentPresetResponse: A response containing the list of presets.
"""
try:
return await db.list_presets(
user_id=user_id,
graph_id=graph_id,
page=page,
page_size=page_size,
)
# Use cache only for default queries (no filter)
if graph_id is None:
return await library_cache.get_cached_library_presets(
user_id=user_id,
page=page,
page_size=page_size,
)
else:
# Direct DB query for filtered requests
return await db.list_presets(
user_id=user_id,
graph_id=graph_id,
page=page,
page_size=page_size,
)
except Exception as e:
logger.exception("Failed to list presets for user %s: %s", user_id, e)
raise HTTPException(
@@ -87,7 +117,7 @@ async def get_preset(
HTTPException: If the preset is not found or an error occurs.
"""
try:
preset = await db.get_preset(user_id, preset_id)
preset = await library_cache.get_cached_library_preset(preset_id, user_id)
except Exception as e:
logger.exception(
"Error retrieving preset %s for user %s: %s", preset_id, user_id, e
@@ -131,9 +161,13 @@ async def create_preset(
"""
try:
if isinstance(preset, models.LibraryAgentPresetCreatable):
return await db.create_preset(user_id, preset)
result = await db.create_preset(user_id, preset)
else:
return await db.create_preset_from_graph_execution(user_id, preset)
result = await db.create_preset_from_graph_execution(user_id, preset)
_clear_presets_list_cache(user_id)
return result
except NotFoundError as e:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
except Exception as e:
@@ -200,6 +234,9 @@ async def setup_trigger(
is_active=True,
),
)
_clear_presets_list_cache(user_id)
return new_preset
@@ -278,6 +315,13 @@ async def update_preset(
description=preset.description,
is_active=preset.is_active,
)
# Clear caches after updating preset
library_cache.get_cached_library_preset.cache_delete(
preset_id=preset_id, user_id=user_id
)
_clear_presets_list_cache(user_id)
except Exception as e:
logger.exception("Preset update failed for user %s: %s", user_id, e)
raise HTTPException(
@@ -351,6 +395,12 @@ async def delete_preset(
try:
await db.delete_preset(user_id, preset_id)
# Clear caches after deleting preset
library_cache.get_cached_library_preset.cache_delete(
preset_id=preset_id, user_id=user_id
)
_clear_presets_list_cache(user_id)
except Exception as e:
logger.exception(
"Error deleting preset %s for user %s: %s", preset_id, user_id, e
@@ -401,6 +451,33 @@ async def execute_preset(
merged_node_input = preset.inputs | inputs
merged_credential_inputs = preset.credentials | credential_inputs
# Clear graph executions cache - use both page sizes for compatibility
for page in range(1, 10):
# Clear with alternative page size
cache.get_cached_graph_executions.cache_delete(
graph_id=preset.graph_id,
user_id=user_id,
page=page,
page_size=backend.server.cache_config.V2_GRAPH_EXECUTIONS_ALT_PAGE_SIZE,
)
cache.get_cached_graph_executions.cache_delete(
user_id=user_id,
page=page,
page_size=backend.server.cache_config.V2_GRAPH_EXECUTIONS_ALT_PAGE_SIZE,
)
# Clear with v1 page size (25)
cache.get_cached_graph_executions.cache_delete(
graph_id=preset.graph_id,
user_id=user_id,
page=page,
page_size=backend.server.cache_config.V1_GRAPH_EXECUTIONS_PAGE_SIZE,
)
cache.get_cached_graph_executions.cache_delete(
user_id=user_id,
page=page,
page_size=backend.server.cache_config.V1_GRAPH_EXECUTIONS_PAGE_SIZE,
)
return await add_graph_execution(
user_id=user_id,
graph_id=preset.graph_id,

View File

@@ -179,14 +179,15 @@ async def test_get_favorite_library_agents_success(
def test_get_favorite_library_agents_error(
mocker: pytest_mock.MockFixture, test_user_id: str
):
mock_db_call = mocker.patch(
"backend.server.v2.library.db.list_favorite_library_agents"
# Mock the cache function instead of the DB directly since routes now use cache
mock_cache_call = mocker.patch(
"backend.server.v2.library.routes.agents.library_cache.get_cached_library_agent_favorites"
)
mock_db_call.side_effect = Exception("Test error")
mock_cache_call.side_effect = Exception("Test error")
response = client.get("/agents/favorites")
assert response.status_code == 500
mock_db_call.assert_called_once_with(
mock_cache_call.assert_called_once_with(
user_id=test_user_id,
page=1,
page_size=15,

View File

@@ -0,0 +1,152 @@
"""
Cache functions for Store API endpoints.
This module contains all caching decorators and helpers for the Store API,
separated from the main routes for better organization and maintainability.
"""
import backend.server.cache_config
import backend.server.v2.store.db
from backend.util.cache import cached
def _clear_submissions_cache(
user_id: str, num_pages: int = backend.server.cache_config.MAX_PAGES_TO_CLEAR
):
"""
Clear the submissions cache for the given user.
Args:
user_id: User ID whose cache should be cleared
num_pages: Number of pages to clear (default from cache_config)
"""
for page in range(1, num_pages + 1):
_get_cached_submissions.cache_delete(
user_id=user_id,
page=page,
page_size=backend.server.cache_config.V2_STORE_SUBMISSIONS_PAGE_SIZE,
)
def _clear_my_agents_cache(
user_id: str, num_pages: int = backend.server.cache_config.MAX_PAGES_TO_CLEAR
):
"""
Clear the my agents cache for the given user.
Args:
user_id: User ID whose cache should be cleared
num_pages: Number of pages to clear (default from cache_config)
"""
for page in range(1, num_pages + 1):
_get_cached_my_agents.cache_delete(
user_id=user_id,
page=page,
page_size=backend.server.cache_config.V2_MY_AGENTS_PAGE_SIZE,
)
# Cache user profiles for 1 hour per user
@cached(maxsize=1000, ttl_seconds=3600, shared_cache=True)
async def _get_cached_user_profile(user_id: str):
"""Cached helper to get user profile."""
return await backend.server.v2.store.db.get_user_profile(user_id)
# Cache store agents list for 15 minutes
# Different cache entries for different query combinations
@cached(maxsize=5000, ttl_seconds=900, shared_cache=True)
async def _get_cached_store_agents(
featured: bool,
creator: str | None,
sorted_by: str | None,
search_query: str | None,
category: str | None,
page: int,
page_size: int,
):
"""Cached helper to get store agents."""
return await backend.server.v2.store.db.get_store_agents(
featured=featured,
creators=[creator] if creator else None,
sorted_by=sorted_by,
search_query=search_query,
category=category,
page=page,
page_size=page_size,
)
# Cache individual agent details for 15 minutes
@cached(maxsize=200, ttl_seconds=900, shared_cache=True)
async def _get_cached_agent_details(username: str, agent_name: str):
"""Cached helper to get agent details."""
return await backend.server.v2.store.db.get_store_agent_details(
username=username, agent_name=agent_name
)
# Cache agent graphs for 1 hour
@cached(maxsize=200, ttl_seconds=3600, shared_cache=True)
async def _get_cached_agent_graph(store_listing_version_id: str):
"""Cached helper to get agent graph."""
return await backend.server.v2.store.db.get_available_graph(
store_listing_version_id
)
# Cache agent by version for 1 hour
@cached(maxsize=200, ttl_seconds=3600, shared_cache=True)
async def _get_cached_store_agent_by_version(store_listing_version_id: str):
"""Cached helper to get store agent by version ID."""
return await backend.server.v2.store.db.get_store_agent_by_version_id(
store_listing_version_id
)
# Cache creators list for 1 hour
@cached(maxsize=200, ttl_seconds=3600, shared_cache=True)
async def _get_cached_store_creators(
featured: bool,
search_query: str | None,
sorted_by: str | None,
page: int,
page_size: int,
):
"""Cached helper to get store creators."""
return await backend.server.v2.store.db.get_store_creators(
featured=featured,
search_query=search_query,
sorted_by=sorted_by,
page=page,
page_size=page_size,
)
# Cache individual creator details for 1 hour
@cached(maxsize=100, ttl_seconds=3600, shared_cache=True)
async def _get_cached_creator_details(username: str):
"""Cached helper to get creator details."""
return await backend.server.v2.store.db.get_store_creator_details(
username=username.lower()
)
# Cache user's own agents for 5 mins (shorter TTL as this changes more frequently)
@cached(maxsize=500, ttl_seconds=300, shared_cache=True)
async def _get_cached_my_agents(user_id: str, page: int, page_size: int):
"""Cached helper to get user's agents."""
return await backend.server.v2.store.db.get_my_agents(
user_id, page=page, page_size=page_size
)
# Cache user's submissions for 1 hour (shorter TTL as this changes frequently)
@cached(maxsize=500, ttl_seconds=3600, shared_cache=True)
async def _get_cached_submissions(user_id: str, page: int, page_size: int):
"""Cached helper to get user's submissions."""
return await backend.server.v2.store.db.get_store_submissions(
user_id=user_id,
page=page,
page_size=page_size,
)

View File

@@ -493,6 +493,7 @@ async def get_store_submissions(
submission_models = []
for sub in submissions:
submission_model = backend.server.v2.store.model.StoreSubmission(
user_id=sub.user_id,
agent_id=sub.agent_id,
agent_version=sub.agent_version,
name=sub.name,
@@ -710,6 +711,7 @@ async def create_store_submission(
logger.debug(f"Created store listing for agent {agent_id}")
# Return submission details
return backend.server.v2.store.model.StoreSubmission(
user_id=user_id,
agent_id=agent_id,
agent_version=agent_version,
name=name,
@@ -860,6 +862,7 @@ async def edit_store_submission(
"Failed to update store listing version"
)
return backend.server.v2.store.model.StoreSubmission(
user_id=user_id,
agent_id=current_version.agentGraphId,
agent_version=current_version.agentGraphVersion,
name=name,
@@ -993,6 +996,7 @@ async def create_store_version(
)
# Return submission details
return backend.server.v2.store.model.StoreSubmission(
user_id=user_id,
agent_id=agent_id,
agent_version=agent_version,
name=name,
@@ -1493,7 +1497,7 @@ async def review_store_submission(
include={"StoreListing": True},
)
if not submission:
if not submission or not submission.StoreListing:
raise backend.server.v2.store.exceptions.DatabaseError(
f"Failed to update store listing version {store_listing_version_id}"
)
@@ -1583,6 +1587,7 @@ async def review_store_submission(
# Convert to Pydantic model for consistency
return backend.server.v2.store.model.StoreSubmission(
user_id=submission.StoreListing.owningUserId,
agent_id=submission.agentGraphId,
agent_version=submission.agentGraphVersion,
name=submission.name,
@@ -1715,14 +1720,17 @@ async def get_admin_listings_with_versions(
# Get total count for pagination
total = await prisma.models.StoreListing.prisma().count(where=where)
total_pages = (total + page_size - 1) // page_size
# Convert to response models
listings_with_versions = []
for listing in listings:
versions: list[backend.server.v2.store.model.StoreSubmission] = []
if not listing.OwningUser:
logger.error(f"Listing {listing.id} has no owning user")
continue
# If we have versions, turn them into StoreSubmission models
for version in listing.Versions or []:
version_model = backend.server.v2.store.model.StoreSubmission(
user_id=listing.OwningUser.id,
agent_id=version.agentGraphId,
agent_version=version.agentGraphVersion,
name=version.name,

View File

@@ -98,6 +98,7 @@ class Profile(pydantic.BaseModel):
class StoreSubmission(pydantic.BaseModel):
user_id: str = pydantic.Field(default="", exclude=True)
agent_id: str
agent_version: int
name: str

View File

@@ -135,6 +135,7 @@ def test_creator_details():
def test_store_submission():
submission = backend.server.v2.store.model.StoreSubmission(
user_id="user123",
agent_id="agent123",
agent_version=1,
sub_heading="Test subheading",
@@ -156,6 +157,7 @@ def test_store_submissions_response():
response = backend.server.v2.store.model.StoreSubmissionsResponse(
submissions=[
backend.server.v2.store.model.StoreSubmission(
user_id="user123",
agent_id="agent123",
agent_version=1,
sub_heading="Test subheading",

View File

@@ -6,132 +6,33 @@ import urllib.parse
import autogpt_libs.auth
import fastapi
import fastapi.responses
from autogpt_libs.utils.cache import cached
import backend.data.graph
import backend.server.cache_config
import backend.server.v2.store.db
import backend.server.v2.store.exceptions
import backend.server.v2.store.image_gen
import backend.server.v2.store.media
import backend.server.v2.store.model
import backend.util.json
from backend.server.v2.store.cache import (
_clear_submissions_cache,
_get_cached_agent_details,
_get_cached_agent_graph,
_get_cached_creator_details,
_get_cached_my_agents,
_get_cached_store_agent_by_version,
_get_cached_store_agents,
_get_cached_store_creators,
_get_cached_submissions,
_get_cached_user_profile,
)
logger = logging.getLogger(__name__)
router = fastapi.APIRouter()
##############################################
############### Caches #######################
##############################################
# Cache user profiles for 1 hour per user
@cached(maxsize=1000, ttl_seconds=3600)
async def _get_cached_user_profile(user_id: str):
"""Cached helper to get user profile."""
return await backend.server.v2.store.db.get_user_profile(user_id)
# Cache store agents list for 15 minutes
# Different cache entries for different query combinations
@cached(maxsize=5000, ttl_seconds=900)
async def _get_cached_store_agents(
featured: bool,
creator: str | None,
sorted_by: str | None,
search_query: str | None,
category: str | None,
page: int,
page_size: int,
):
"""Cached helper to get store agents."""
return await backend.server.v2.store.db.get_store_agents(
featured=featured,
creators=[creator] if creator else None,
sorted_by=sorted_by,
search_query=search_query,
category=category,
page=page,
page_size=page_size,
)
# Cache individual agent details for 15 minutes
@cached(maxsize=200, ttl_seconds=900)
async def _get_cached_agent_details(username: str, agent_name: str):
"""Cached helper to get agent details."""
return await backend.server.v2.store.db.get_store_agent_details(
username=username, agent_name=agent_name
)
# Cache agent graphs for 1 hour
@cached(maxsize=200, ttl_seconds=3600)
async def _get_cached_agent_graph(store_listing_version_id: str):
"""Cached helper to get agent graph."""
return await backend.server.v2.store.db.get_available_graph(
store_listing_version_id
)
# Cache agent by version for 1 hour
@cached(maxsize=200, ttl_seconds=3600)
async def _get_cached_store_agent_by_version(store_listing_version_id: str):
"""Cached helper to get store agent by version ID."""
return await backend.server.v2.store.db.get_store_agent_by_version_id(
store_listing_version_id
)
# Cache creators list for 1 hour
@cached(maxsize=200, ttl_seconds=3600)
async def _get_cached_store_creators(
featured: bool,
search_query: str | None,
sorted_by: str | None,
page: int,
page_size: int,
):
"""Cached helper to get store creators."""
return await backend.server.v2.store.db.get_store_creators(
featured=featured,
search_query=search_query,
sorted_by=sorted_by,
page=page,
page_size=page_size,
)
# Cache individual creator details for 1 hour
@cached(maxsize=100, ttl_seconds=3600)
async def _get_cached_creator_details(username: str):
"""Cached helper to get creator details."""
return await backend.server.v2.store.db.get_store_creator_details(
username=username.lower()
)
# Cache user's own agents for 5 mins (shorter TTL as this changes more frequently)
@cached(maxsize=500, ttl_seconds=300)
async def _get_cached_my_agents(user_id: str, page: int, page_size: int):
"""Cached helper to get user's agents."""
return await backend.server.v2.store.db.get_my_agents(
user_id, page=page, page_size=page_size
)
# Cache user's submissions for 1 hour (shorter TTL as this changes frequently)
@cached(maxsize=500, ttl_seconds=3600)
async def _get_cached_submissions(user_id: str, page: int, page_size: int):
"""Cached helper to get user's submissions."""
return await backend.server.v2.store.db.get_store_submissions(
user_id=user_id,
page=page,
page_size=page_size,
)
##############################################
############### Profile Endpoints ############
##############################################
@@ -230,7 +131,7 @@ async def get_agents(
search_query: str | None = None,
category: str | None = None,
page: int = 1,
page_size: int = 20,
page_size: int = backend.server.cache_config.V2_STORE_AGENTS_PAGE_SIZE,
):
"""
Get a paginated list of agents from the store with optional filtering and sorting.
@@ -428,7 +329,7 @@ async def get_creators(
search_query: str | None = None,
sorted_by: str | None = None,
page: int = 1,
page_size: int = 20,
page_size: int = backend.server.cache_config.V2_STORE_CREATORS_PAGE_SIZE,
):
"""
This is needed for:
@@ -514,7 +415,9 @@ async def get_creator(
async def get_my_agents(
user_id: str = fastapi.Security(autogpt_libs.auth.get_user_id),
page: typing.Annotated[int, fastapi.Query(ge=1)] = 1,
page_size: typing.Annotated[int, fastapi.Query(ge=1)] = 20,
page_size: typing.Annotated[
int, fastapi.Query(ge=1)
] = backend.server.cache_config.V2_MY_AGENTS_PAGE_SIZE,
):
"""
Get user's own agents.
@@ -560,10 +463,7 @@ async def delete_submission(
# Clear submissions cache for this specific user after deletion
if result:
# Clear user's own agents cache - we don't know all page/size combinations
for page in range(1, 20):
# Clear user's submissions cache for common defaults
_get_cached_submissions.cache_delete(user_id, page=page, page_size=20)
_clear_submissions_cache(user_id)
return result
except Exception:
@@ -584,7 +484,7 @@ async def delete_submission(
async def get_submissions(
user_id: str = fastapi.Security(autogpt_libs.auth.get_user_id),
page: int = 1,
page_size: int = 20,
page_size: int = backend.server.cache_config.V2_STORE_SUBMISSIONS_PAGE_SIZE,
):
"""
Get a paginated list of store submissions for the authenticated user.
@@ -666,10 +566,7 @@ async def create_submission(
recommended_schedule_cron=submission_request.recommended_schedule_cron,
)
# Clear user's own agents cache - we don't know all page/size combinations
for page in range(1, 20):
# Clear user's submissions cache for common defaults
_get_cached_submissions.cache_delete(user_id, page=page, page_size=20)
_clear_submissions_cache(user_id)
return result
except Exception:
@@ -720,10 +617,7 @@ async def edit_submission(
recommended_schedule_cron=submission_request.recommended_schedule_cron,
)
# Clear user's own agents cache - we don't know all page/size combinations
for page in range(1, 20):
# Clear user's submissions cache for common defaults
_get_cached_submissions.cache_delete(user_id, page=page, page_size=20)
_clear_submissions_cache(user_id)
return result

View File

@@ -534,6 +534,7 @@ def test_get_submissions_success(
mocked_value = backend.server.v2.store.model.StoreSubmissionsResponse(
submissions=[
backend.server.v2.store.model.StoreSubmission(
user_id="user123",
name="Test Agent",
description="Test agent description",
image_urls=["test.jpg"],

View File

@@ -345,6 +345,150 @@ class TestCacheDeletion:
)
assert deleted is False # Different parameters, not in cache
@pytest.mark.asyncio
async def test_clear_submissions_cache_page_size_consistency(self):
"""
Test that _clear_submissions_cache uses the correct page_size.
This test ensures that if the default page_size in routes changes,
the hardcoded value in _clear_submissions_cache must also change.
"""
from backend.server.v2.store.model import StoreSubmissionsResponse
mock_response = StoreSubmissionsResponse(
submissions=[],
pagination=Pagination(
total_items=0,
total_pages=1,
current_page=1,
page_size=20,
),
)
with patch(
"backend.server.v2.store.db.get_store_submissions",
new_callable=AsyncMock,
return_value=mock_response,
):
# Clear cache first
routes._get_cached_submissions.cache_clear()
# Populate cache with multiple pages using the default page_size
DEFAULT_PAGE_SIZE = 20 # This should match the default in routes.py
user_id = "test_user"
# Add entries for pages 1-5
for page in range(1, 6):
await routes._get_cached_submissions(
user_id=user_id, page=page, page_size=DEFAULT_PAGE_SIZE
)
# Verify cache has entries
cache_info_before = routes._get_cached_submissions.cache_info()
assert cache_info_before["size"] == 5
# Call _clear_submissions_cache
routes._clear_submissions_cache(user_id, num_pages=20)
# All entries should be cleared
cache_info_after = routes._get_cached_submissions.cache_info()
assert (
cache_info_after["size"] == 0
), "Cache should be empty after _clear_submissions_cache"
@pytest.mark.asyncio
async def test_clear_submissions_cache_detects_page_size_mismatch(self):
"""
Test that detects if _clear_submissions_cache is using wrong page_size.
If this test fails, it means the hardcoded page_size in _clear_submissions_cache
doesn't match the default page_size used in the routes.
"""
from backend.server.v2.store.model import StoreSubmissionsResponse
mock_response = StoreSubmissionsResponse(
submissions=[],
pagination=Pagination(
total_items=0,
total_pages=1,
current_page=1,
page_size=20,
),
)
with patch(
"backend.server.v2.store.db.get_store_submissions",
new_callable=AsyncMock,
return_value=mock_response,
):
# Clear cache first
routes._get_cached_submissions.cache_clear()
# WRONG_PAGE_SIZE simulates what happens if someone changes
# the default page_size in routes but forgets to update _clear_submissions_cache
WRONG_PAGE_SIZE = 25 # Different from the hardcoded value in cache.py
user_id = "test_user"
# Populate cache with the "wrong" page_size
for page in range(1, 6):
await routes._get_cached_submissions(
user_id=user_id, page=page, page_size=WRONG_PAGE_SIZE
)
# Verify cache has entries
cache_info_before = routes._get_cached_submissions.cache_info()
assert cache_info_before["size"] == 5
# Call _clear_submissions_cache (which uses page_size=20 hardcoded)
routes._clear_submissions_cache(user_id, num_pages=20)
# If page_size is mismatched, entries won't be cleared
cache_info_after = routes._get_cached_submissions.cache_info()
# This assertion will FAIL if _clear_submissions_cache uses wrong page_size
assert (
cache_info_after["size"] == 5
), "Cache entries with different page_size should NOT be cleared (this is expected)"
@pytest.mark.asyncio
async def test_my_agents_cache_needs_clearing_too(self):
"""
Test that demonstrates _get_cached_my_agents also needs cache clearing.
Currently there's no _clear_my_agents_cache function, but there should be.
"""
from backend.server.v2.store.model import MyAgentsResponse
mock_response = MyAgentsResponse(
agents=[],
pagination=Pagination(
total_items=0,
total_pages=1,
current_page=1,
page_size=20,
),
)
with patch(
"backend.server.v2.store.db.get_my_agents",
new_callable=AsyncMock,
return_value=mock_response,
):
routes._get_cached_my_agents.cache_clear()
DEFAULT_PAGE_SIZE = 20
user_id = "test_user"
# Populate cache
for page in range(1, 6):
await routes._get_cached_my_agents(
user_id=user_id, page=page, page_size=DEFAULT_PAGE_SIZE
)
cache_info = routes._get_cached_my_agents.cache_info()
assert cache_info["size"] == 5
# NOTE: Currently there's no _clear_my_agents_cache function
# If we implement one, it should clear all pages consistently
# For now we document this as a TODO
if __name__ == "__main__":
# Run the tests

View File

@@ -0,0 +1,461 @@
"""
Caching utilities for the AutoGPT platform.
Provides decorators for caching function results with support for:
- In-memory caching with TTL
- Shared Redis-backed caching across processes
- Thread-local caching for request-scoped data
- Thundering herd protection
- LRU eviction with optional TTL refresh
"""
import asyncio
import inspect
import logging
import threading
import time
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable, ParamSpec, Protocol, TypeVar, cast, runtime_checkable
from redis import ConnectionPool, Redis
from backend.util.retry import conn_retry
from backend.util.settings import Settings
P = ParamSpec("P")
R = TypeVar("R")
R_co = TypeVar("R_co", covariant=True)
logger = logging.getLogger(__name__)
settings = Settings()
# RECOMMENDED REDIS CONFIGURATION FOR PRODUCTION:
# Configure Redis with the following settings for optimal caching performance:
# maxmemory-policy allkeys-lru # Evict least recently used keys when memory limit reached
# maxmemory 2gb # Set memory limit (adjust based on your needs)
# save "" # Disable persistence if using Redis purely for caching
# Create a dedicated Redis connection pool for caching (binary mode for pickle)
_cache_pool: ConnectionPool | None = None
@conn_retry("Redis", "Acquiring cache connection pool")
def _get_cache_pool() -> ConnectionPool:
"""Get or create a connection pool for cache operations."""
global _cache_pool
if _cache_pool is None:
_cache_pool = ConnectionPool(
host=settings.config.redis_host,
port=settings.config.redis_port,
password=settings.config.redis_password or None,
decode_responses=False, # Binary mode for pickle
max_connections=50,
socket_keepalive=True,
socket_connect_timeout=5,
retry_on_timeout=True,
)
return _cache_pool
def _get_redis_client() -> Redis:
"""Get a Redis client from the connection pool."""
return Redis(connection_pool=_get_cache_pool())
@dataclass
class CachedValue:
"""Wrapper for cached values with timestamp to avoid tuple ambiguity."""
result: Any
timestamp: float
def _make_hashable_key(
args: tuple[Any, ...], kwargs: dict[str, Any]
) -> tuple[Any, ...]:
"""
Convert args and kwargs into a hashable cache key.
Handles unhashable types like dict, list, set by converting them to
their sorted string representations.
"""
def make_hashable(obj: Any) -> Any:
"""Recursively convert an object to a hashable representation."""
if isinstance(obj, dict):
# Sort dict items to ensure consistent ordering
return (
"__dict__",
tuple(sorted((k, make_hashable(v)) for k, v in obj.items())),
)
elif isinstance(obj, (list, tuple)):
return ("__list__", tuple(make_hashable(item) for item in obj))
elif isinstance(obj, set):
return ("__set__", tuple(sorted(make_hashable(item) for item in obj)))
elif hasattr(obj, "__dict__"):
# Handle objects with __dict__ attribute
return ("__obj__", obj.__class__.__name__, make_hashable(obj.__dict__))
else:
# For basic hashable types (str, int, bool, None, etc.)
try:
hash(obj)
return obj
except TypeError:
# Fallback: convert to string representation
return ("__str__", str(obj))
hashable_args = tuple(make_hashable(arg) for arg in args)
hashable_kwargs = tuple(sorted((k, make_hashable(v)) for k, v in kwargs.items()))
return (hashable_args, hashable_kwargs)
def _make_redis_key(key: tuple[Any, ...]) -> str:
"""Convert a hashable key tuple to a Redis key string."""
# Ensure key is already hashable
hashable_key = key if isinstance(key, tuple) else (key,)
return f"cache:{hash(hashable_key)}"
@runtime_checkable
class CachedFunction(Protocol[P, R_co]):
"""Protocol for cached functions with cache management methods."""
def cache_clear(self, pattern: str | None = None) -> None:
"""Clear cached entries. If pattern provided, clear matching entries only."""
return None
def cache_info(self) -> dict[str, int | None]:
"""Get cache statistics."""
return {}
def cache_delete(self, *args: P.args, **kwargs: P.kwargs) -> bool:
"""Delete a specific cache entry by its arguments. Returns True if entry existed."""
return False
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R_co:
"""Call the cached function."""
return None # type: ignore
def cached(
*,
maxsize: int = 128,
ttl_seconds: int,
shared_cache: bool = False,
refresh_ttl_on_get: bool = False,
) -> Callable[[Callable], CachedFunction]:
"""
Thundering herd safe cache decorator for both sync and async functions.
Uses double-checked locking to prevent multiple threads/coroutines from
executing the expensive operation simultaneously during cache misses.
Args:
maxsize: Maximum number of cached entries (only for in-memory cache)
ttl_seconds: Time to live in seconds. Required - entries must expire.
shared_cache: If True, use Redis for cross-process caching
refresh_ttl_on_get: If True, refresh TTL when cache entry is accessed (LRU behavior)
Returns:
Decorated function with caching capabilities
Example:
@cached(ttl_seconds=300) # 5 minute TTL
def expensive_sync_operation(param: str) -> dict:
return {"result": param}
@cached(ttl_seconds=600, shared_cache=True, refresh_ttl_on_get=True)
async def expensive_async_operation(param: str) -> dict:
return {"result": param}
"""
def decorator(target_func):
cache_storage: dict[tuple, CachedValue] = {}
_event_loop_locks: dict[Any, asyncio.Lock] = {}
def _get_from_redis(redis_key: str) -> Any | None:
"""Get value from Redis, optionally refreshing TTL."""
try:
import pickle
redis = _get_redis_client()
if refresh_ttl_on_get:
# Use GETEX to get value and refresh expiry atomically
cached_bytes = redis.getex(redis_key, ex=ttl_seconds)
else:
cached_bytes = redis.get(redis_key)
if cached_bytes and isinstance(cached_bytes, bytes):
return pickle.loads(cached_bytes)
except Exception as e:
logger.error(
f"Redis error during cache check for {target_func.__name__}: {e}"
)
return None
def _set_to_redis(redis_key: str, value: Any) -> None:
"""Set value in Redis with TTL."""
try:
import pickle
redis = _get_redis_client()
pickled_value = pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL)
redis.setex(redis_key, ttl_seconds, pickled_value)
except Exception as e:
logger.error(
f"Redis error storing cache for {target_func.__name__}: {e}"
)
def _get_from_memory(key: tuple) -> Any | None:
"""Get value from in-memory cache, checking TTL."""
if key in cache_storage:
cached_data = cache_storage[key]
if time.time() - cached_data.timestamp < ttl_seconds:
logger.debug(
f"Cache hit for {target_func.__name__} args: {key[0]} kwargs: {key[1]}"
)
return cached_data.result
return None
def _set_to_memory(key: tuple, value: Any) -> None:
"""Set value in in-memory cache with timestamp."""
cache_storage[key] = CachedValue(result=value, timestamp=time.time())
# Cleanup if needed
if len(cache_storage) > maxsize:
cutoff = maxsize // 2
oldest_keys = list(cache_storage.keys())[:-cutoff] if cutoff > 0 else []
for old_key in oldest_keys:
cache_storage.pop(old_key, None)
if inspect.iscoroutinefunction(target_func):
def _get_cache_lock():
"""Get or create an asyncio.Lock for the current event loop."""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop not in _event_loop_locks:
_event_loop_locks[loop] = asyncio.Lock()
return _event_loop_locks[loop]
@wraps(target_func)
async def async_wrapper(*args: P.args, **kwargs: P.kwargs):
key = _make_hashable_key(args, kwargs)
redis_key = _make_redis_key(key) if shared_cache else ""
# Fast path: check cache without lock
if shared_cache:
result = _get_from_redis(redis_key)
if result is not None:
return result
else:
result = _get_from_memory(key)
if result is not None:
return result
# Slow path: acquire lock for cache miss/expiry
async with _get_cache_lock():
# Double-check: another coroutine might have populated cache
if shared_cache:
result = _get_from_redis(redis_key)
if result is not None:
return result
else:
result = _get_from_memory(key)
if result is not None:
return result
# Cache miss - execute function
logger.debug(f"Cache miss for {target_func.__name__}")
result = await target_func(*args, **kwargs)
# Store result
if shared_cache:
_set_to_redis(redis_key, result)
else:
_set_to_memory(key, result)
return result
wrapper = async_wrapper
else:
# Sync function with threading.Lock
cache_lock = threading.Lock()
@wraps(target_func)
def sync_wrapper(*args: P.args, **kwargs: P.kwargs):
key = _make_hashable_key(args, kwargs)
redis_key = _make_redis_key(key) if shared_cache else ""
# Fast path: check cache without lock
if shared_cache:
result = _get_from_redis(redis_key)
if result is not None:
return result
else:
result = _get_from_memory(key)
if result is not None:
return result
# Slow path: acquire lock for cache miss/expiry
with cache_lock:
# Double-check: another thread might have populated cache
if shared_cache:
result = _get_from_redis(redis_key)
if result is not None:
return result
else:
result = _get_from_memory(key)
if result is not None:
return result
# Cache miss - execute function
logger.debug(f"Cache miss for {target_func.__name__}")
result = target_func(*args, **kwargs)
# Store result
if shared_cache:
_set_to_redis(redis_key, result)
else:
_set_to_memory(key, result)
return result
wrapper = sync_wrapper
# Add cache management methods
def cache_clear(pattern: str | None = None) -> None:
"""Clear cache entries. If pattern provided, clear matching entries."""
if shared_cache:
redis = _get_redis_client()
if pattern:
# Clear entries matching pattern
keys = list(redis.scan_iter(f"cache:{pattern}", count=100))
else:
# Clear all cache keys
keys = list(redis.scan_iter("cache:*", count=100))
if keys:
pipeline = redis.pipeline()
for key in keys:
pipeline.delete(key)
pipeline.execute()
else:
if pattern:
# For in-memory cache, pattern matching not supported
logger.warning(
"Pattern-based clearing not supported for in-memory cache"
)
else:
cache_storage.clear()
def cache_info() -> dict[str, int | None]:
if shared_cache:
redis = _get_redis_client()
cache_keys = list(redis.scan_iter("cache:*"))
return {
"size": len(cache_keys),
"maxsize": None, # Redis manages its own size
"ttl_seconds": ttl_seconds,
}
else:
return {
"size": len(cache_storage),
"maxsize": maxsize,
"ttl_seconds": ttl_seconds,
}
def cache_delete(*args, **kwargs) -> bool:
"""Delete a specific cache entry. Returns True if entry existed."""
key = _make_hashable_key(args, kwargs)
if shared_cache:
redis = _get_redis_client()
redis_key = _make_redis_key(key)
if redis.exists(redis_key):
redis.delete(redis_key)
return True
return False
else:
if key in cache_storage:
del cache_storage[key]
return True
return False
setattr(wrapper, "cache_clear", cache_clear)
setattr(wrapper, "cache_info", cache_info)
setattr(wrapper, "cache_delete", cache_delete)
return cast(CachedFunction, wrapper)
return decorator
def thread_cached(func):
"""
Thread-local cache decorator for both sync and async functions.
Each thread gets its own cache, which is useful for request-scoped caching
in web applications where you want to cache within a single request but
not across requests.
Args:
func: The function to cache
Returns:
Decorated function with thread-local caching
Example:
@thread_cached
def expensive_operation(param: str) -> dict:
return {"result": param}
@thread_cached # Works with async too
async def expensive_async_operation(param: str) -> dict:
return {"result": param}
"""
thread_local = threading.local()
def _clear():
if hasattr(thread_local, "cache"):
del thread_local.cache
if inspect.iscoroutinefunction(func):
@wraps(func)
async def async_wrapper(*args, **kwargs):
cache = getattr(thread_local, "cache", None)
if cache is None:
cache = thread_local.cache = {}
key = _make_hashable_key(args, kwargs)
if key not in cache:
cache[key] = await func(*args, **kwargs)
return cache[key]
setattr(async_wrapper, "clear_cache", _clear)
return async_wrapper
else:
@wraps(func)
def sync_wrapper(*args, **kwargs):
cache = getattr(thread_local, "cache", None)
if cache is None:
cache = thread_local.cache = {}
key = _make_hashable_key(args, kwargs)
if key not in cache:
cache[key] = func(*args, **kwargs)
return cache[key]
setattr(sync_wrapper, "clear_cache", _clear)
return sync_wrapper
def clear_thread_cache(func: Callable) -> None:
"""Clear thread-local cache for a function."""
if clear := getattr(func, "clear_cache", None):
clear()

View File

@@ -4,8 +4,7 @@ Centralized service client helpers with thread caching.
from typing import TYPE_CHECKING
from autogpt_libs.utils.cache import cached, thread_cached
from backend.util.cache import cached, thread_cached
from backend.util.settings import Settings
settings = Settings()
@@ -118,7 +117,7 @@ def get_integration_credentials_store() -> "IntegrationCredentialsStore":
# ============ Supabase Clients ============ #
@cached()
@cached(ttl_seconds=3600)
def get_supabase() -> "Client":
"""Get a process-cached synchronous Supabase client instance."""
from supabase import create_client
@@ -128,7 +127,7 @@ def get_supabase() -> "Client":
)
@cached()
@cached(ttl_seconds=3600)
async def get_async_supabase() -> "AClient":
"""Get a process-cached asynchronous Supabase client instance."""
from supabase import create_async_client

View File

@@ -5,12 +5,12 @@ from functools import wraps
from typing import Any, Awaitable, Callable, TypeVar
import ldclient
from autogpt_libs.utils.cache import cached
from fastapi import HTTPException
from ldclient import Context, LDClient
from ldclient.config import Config
from typing_extensions import ParamSpec
from backend.util.cache import cached
from backend.util.settings import Settings
logger = logging.getLogger(__name__)

View File

@@ -258,6 +258,19 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
description="The vhost for the RabbitMQ server",
)
redis_host: str = Field(
default="localhost",
description="The host for the Redis server",
)
redis_port: int = Field(
default=6379,
description="The port for the Redis server",
)
redis_password: str = Field(
default="",
description="The password for the Redis server (empty string if no password)",
)
postmark_sender_email: str = Field(
default="invalid@invalid.com",
description="The email address to use for sending emails",

View File

@@ -413,6 +413,7 @@ pydantic-settings = "^2.10.1"
pyjwt = {version = "^2.10.1", extras = ["crypto"]}
redis = "^6.2.0"
supabase = "^2.16.0"
tenacity = "^9.1.2"
uvicorn = "^0.35.0"
[package.source]

View File

@@ -12,11 +12,11 @@ import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from unittest.mock import Mock
from unittest.mock import Mock, patch
import pytest
from autogpt_libs.utils.cache import cached, clear_thread_cache, thread_cached
from backend.util.cache import cached, clear_thread_cache, thread_cached
class TestThreadCached:
@@ -332,7 +332,7 @@ class TestCache:
"""Test basic sync caching functionality."""
call_count = 0
@cached()
@cached(ttl_seconds=300)
def expensive_sync_function(x: int, y: int = 0) -> int:
nonlocal call_count
call_count += 1
@@ -358,7 +358,7 @@ class TestCache:
"""Test basic async caching functionality."""
call_count = 0
@cached()
@cached(ttl_seconds=300)
async def expensive_async_function(x: int, y: int = 0) -> int:
nonlocal call_count
call_count += 1
@@ -385,7 +385,7 @@ class TestCache:
call_count = 0
results = []
@cached()
@cached(ttl_seconds=300)
def slow_function(x: int) -> int:
nonlocal call_count
call_count += 1
@@ -412,7 +412,7 @@ class TestCache:
"""Test that concurrent async calls don't cause thundering herd."""
call_count = 0
@cached()
@cached(ttl_seconds=300)
async def slow_async_function(x: int) -> int:
nonlocal call_count
call_count += 1
@@ -508,7 +508,7 @@ class TestCache:
"""Test cache clearing functionality."""
call_count = 0
@cached()
@cached(ttl_seconds=300)
def clearable_function(x: int) -> int:
nonlocal call_count
call_count += 1
@@ -537,7 +537,7 @@ class TestCache:
"""Test cache clearing functionality with async function."""
call_count = 0
@cached()
@cached(ttl_seconds=300)
async def async_clearable_function(x: int) -> int:
nonlocal call_count
call_count += 1
@@ -567,7 +567,7 @@ class TestCache:
"""Test that cached async functions return actual results, not coroutines."""
call_count = 0
@cached()
@cached(ttl_seconds=300)
async def async_result_function(x: int) -> str:
nonlocal call_count
call_count += 1
@@ -593,7 +593,7 @@ class TestCache:
"""Test selective cache deletion functionality."""
call_count = 0
@cached()
@cached(ttl_seconds=300)
def deletable_function(x: int) -> int:
nonlocal call_count
call_count += 1
@@ -636,7 +636,7 @@ class TestCache:
"""Test selective cache deletion functionality with async function."""
call_count = 0
@cached()
@cached(ttl_seconds=300)
async def async_deletable_function(x: int) -> int:
nonlocal call_count
call_count += 1
@@ -674,3 +674,333 @@ class TestCache:
# Try to delete non-existent entry
was_deleted = async_deletable_function.cache_delete(99)
assert was_deleted is False
class TestSharedCache:
"""Tests for shared_cache functionality using Redis."""
@pytest.fixture(autouse=True)
def setup_redis_mock(self):
"""Mock Redis client for testing."""
with patch("backend.util.cache._get_redis_client") as mock_redis_func:
# Configure mock to behave like Redis
mock_redis = Mock()
self.mock_redis = mock_redis
self.redis_storage = {}
def mock_get(key):
return self.redis_storage.get(key)
def mock_getex(key, ex=None):
# GETEX returns value and optionally refreshes TTL
return self.redis_storage.get(key)
def mock_set(key, value):
self.redis_storage[key] = value
return True
def mock_setex(key, ttl, value):
self.redis_storage[key] = value
return True
def mock_exists(key):
return 1 if key in self.redis_storage else 0
def mock_delete(key):
if key in self.redis_storage:
del self.redis_storage[key]
return 1
return 0
def mock_scan_iter(pattern, count=None):
# Pattern is a string like "cache:*", keys in storage are strings
prefix = pattern.rstrip("*")
return [
k
for k in self.redis_storage.keys()
if isinstance(k, str) and k.startswith(prefix)
]
def mock_pipeline():
pipe = Mock()
deleted_keys = []
def pipe_delete(key):
deleted_keys.append(key)
return pipe
def pipe_execute():
# Actually delete the keys when pipeline executes
for key in deleted_keys:
self.redis_storage.pop(key, None)
deleted_keys.clear()
return []
pipe.delete = Mock(side_effect=pipe_delete)
pipe.execute = Mock(side_effect=pipe_execute)
return pipe
mock_redis.get = Mock(side_effect=mock_get)
mock_redis.getex = Mock(side_effect=mock_getex)
mock_redis.set = Mock(side_effect=mock_set)
mock_redis.setex = Mock(side_effect=mock_setex)
mock_redis.exists = Mock(side_effect=mock_exists)
mock_redis.delete = Mock(side_effect=mock_delete)
mock_redis.scan_iter = Mock(side_effect=mock_scan_iter)
mock_redis.pipeline = Mock(side_effect=mock_pipeline)
# Make _get_redis_client return the mock
mock_redis_func.return_value = mock_redis
yield mock_redis
# Cleanup
self.redis_storage.clear()
def test_sync_shared_cache_basic(self):
"""Test basic shared cache functionality with sync function."""
call_count = 0
@cached(shared_cache=True, ttl_seconds=300)
def shared_function(x: int) -> int:
nonlocal call_count
call_count += 1
return x * 10
# First call - should miss cache
result1 = shared_function(5)
assert result1 == 50
assert call_count == 1
assert self.mock_redis.get.called
assert self.mock_redis.setex.called # setex is used for TTL
# Second call - should hit cache
result2 = shared_function(5)
assert result2 == 50
assert call_count == 1 # Function not called again
@pytest.mark.asyncio
async def test_async_shared_cache_basic(self):
"""Test basic shared cache functionality with async function."""
call_count = 0
@cached(shared_cache=True, ttl_seconds=300)
async def async_shared_function(x: int) -> int:
nonlocal call_count
call_count += 1
await asyncio.sleep(0.01)
return x * 20
# First call - should miss cache
result1 = await async_shared_function(3)
assert result1 == 60
assert call_count == 1
assert self.mock_redis.get.called
assert self.mock_redis.setex.called # setex is used for TTL
# Second call - should hit cache
result2 = await async_shared_function(3)
assert result2 == 60
assert call_count == 1 # Function not called again
def test_sync_shared_cache_with_ttl(self):
"""Test shared cache with TTL using sync function."""
call_count = 0
@cached(shared_cache=True, ttl_seconds=60)
def shared_ttl_function(x: int) -> int:
nonlocal call_count
call_count += 1
return x * 30
# First call
result1 = shared_ttl_function(2)
assert result1 == 60
assert call_count == 1
assert self.mock_redis.setex.called
# Second call - should use cache
result2 = shared_ttl_function(2)
assert result2 == 60
assert call_count == 1
@pytest.mark.asyncio
async def test_async_shared_cache_with_ttl(self):
"""Test shared cache with TTL using async function."""
call_count = 0
@cached(shared_cache=True, ttl_seconds=120)
async def async_shared_ttl_function(x: int) -> int:
nonlocal call_count
call_count += 1
await asyncio.sleep(0.01)
return x * 40
# First call
result1 = await async_shared_ttl_function(4)
assert result1 == 160
assert call_count == 1
assert self.mock_redis.setex.called
# Second call - should use cache
result2 = await async_shared_ttl_function(4)
assert result2 == 160
assert call_count == 1
def test_shared_cache_clear(self):
"""Test clearing shared cache."""
call_count = 0
@cached(shared_cache=True, ttl_seconds=300)
def clearable_shared_function(x: int) -> int:
nonlocal call_count
call_count += 1
return x * 50
# First call
result1 = clearable_shared_function(1)
assert result1 == 50
assert call_count == 1
# Second call - should use cache
result2 = clearable_shared_function(1)
assert result2 == 50
assert call_count == 1
# Clear cache
clearable_shared_function.cache_clear()
assert self.mock_redis.pipeline.called
# Third call - should execute function again
result3 = clearable_shared_function(1)
assert result3 == 50
assert call_count == 2
def test_shared_cache_delete(self):
"""Test deleting specific shared cache entry."""
call_count = 0
@cached(shared_cache=True, ttl_seconds=300)
def deletable_shared_function(x: int) -> int:
nonlocal call_count
call_count += 1
return x * 60
# First call for x=1
result1 = deletable_shared_function(1)
assert result1 == 60
assert call_count == 1
# First call for x=2
result2 = deletable_shared_function(2)
assert result2 == 120
assert call_count == 2
# Delete entry for x=1
was_deleted = deletable_shared_function.cache_delete(1)
assert was_deleted is True
# Call with x=1 should execute function again
result3 = deletable_shared_function(1)
assert result3 == 60
assert call_count == 3
# Call with x=2 should still use cache
result4 = deletable_shared_function(2)
assert result4 == 120
assert call_count == 3
def test_shared_cache_error_handling(self):
"""Test that Redis errors are handled gracefully."""
call_count = 0
@cached(shared_cache=True, ttl_seconds=300)
def error_prone_function(x: int) -> int:
nonlocal call_count
call_count += 1
return x * 70
# Simulate Redis error
self.mock_redis.get.side_effect = Exception("Redis connection error")
# Function should still work
result = error_prone_function(1)
assert result == 70
assert call_count == 1
@pytest.mark.asyncio
async def test_async_shared_cache_error_handling(self):
"""Test that Redis errors are handled gracefully in async functions."""
call_count = 0
@cached(shared_cache=True, ttl_seconds=300)
async def async_error_prone_function(x: int) -> int:
nonlocal call_count
call_count += 1
await asyncio.sleep(0.01)
return x * 80
# Simulate Redis error
self.mock_redis.get.side_effect = Exception("Redis connection error")
# Function should still work
result = await async_error_prone_function(1)
assert result == 80
assert call_count == 1
def test_shared_cache_with_complex_types(self):
"""Test shared cache with complex return types (lists, dicts)."""
call_count = 0
@cached(shared_cache=True, ttl_seconds=300)
def complex_return_function(x: int) -> dict:
nonlocal call_count
call_count += 1
return {"value": x, "squared": x * x, "list": [1, 2, 3]}
# First call
result1 = complex_return_function(5)
assert result1 == {"value": 5, "squared": 25, "list": [1, 2, 3]}
assert call_count == 1
# Second call - should use cache
result2 = complex_return_function(5)
assert result2 == {"value": 5, "squared": 25, "list": [1, 2, 3]}
assert call_count == 1
@pytest.mark.asyncio
async def test_async_thundering_herd_shared_cache(self):
"""Test thundering herd protection with shared cache."""
call_count = 0
@cached(shared_cache=True, ttl_seconds=300)
async def slow_shared_function(x: int) -> int:
nonlocal call_count
call_count += 1
await asyncio.sleep(0.1)
return x * x
# Launch concurrent coroutines
tasks = [slow_shared_function(9) for _ in range(5)]
results = await asyncio.gather(*tasks)
# All results should be the same
assert all(result == 81 for result in results)
# Only one coroutine should have executed the function
assert call_count == 1
def test_shared_cache_info(self):
"""Test cache_info with shared cache."""
@cached(shared_cache=True, maxsize=100, ttl_seconds=300)
def info_function(x: int) -> int:
return x * 90
# Call the function to populate cache
info_function(1)
# Get cache info
info = info_function.cache_info()
assert "size" in info
assert info["maxsize"] is None # Redis manages its own size
assert info["ttl_seconds"] == 300

View File

@@ -2,6 +2,7 @@
import { useGetV2ListLibraryAgentsInfinite } from "@/app/api/__generated__/endpoints/library/library";
import { LibraryAgentResponse } from "@/app/api/__generated__/models/libraryAgentResponse";
import { LIBRARY_AGENTS_PAGE_SIZE } from "@/lib/pagination-config";
import { useLibraryPageContext } from "../state-provider";
export const useLibraryAgentList = () => {
@@ -15,7 +16,7 @@ export const useLibraryAgentList = () => {
} = useGetV2ListLibraryAgentsInfinite(
{
page: 1,
page_size: 8,
page_size: LIBRARY_AGENTS_PAGE_SIZE,
search_term: searchTerm || undefined,
sort_by: librarySort,
},

View File

@@ -4,6 +4,7 @@ import {
} from "@/app/api/__generated__/endpoints/store/store";
import { StoreAgentsResponse } from "@/app/api/__generated__/models/storeAgentsResponse";
import { CreatorsResponse } from "@/app/api/__generated__/models/creatorsResponse";
import { LARGE_PAGE_SIZE } from "@/lib/pagination-config";
const queryConfig = {
staleTime: 60 * 1000, // 60 seconds - match server cache
@@ -37,7 +38,7 @@ export const useMainMarketplacePage = () => {
} = useGetV2ListStoreAgents(
{
sorted_by: "runs",
page_size: 1000,
page_size: LARGE_PAGE_SIZE,
},
{
query: {

View File

@@ -38,7 +38,7 @@ export function useAgentActivityDropdown() {
data: agents,
isSuccess: agentsSuccess,
error: agentsError,
} = useGetV2ListLibraryAgents();
} = useGetV2ListLibraryAgents({ page_size: 10 });
const {
data: executions,

View File

@@ -0,0 +1,65 @@
/**
* Shared pagination configuration constants.
*
* These values MUST match the backend's cache_config.py to ensure
* proper cache invalidation when data is mutated.
*
* CRITICAL: If you change any of these values:
* 1. Update backend/server/cache_config.py
* 2. Update cache invalidation logic
* 3. Run tests to ensure consistency
*/
/**
* Default page size for store agents listing
* Backend: V2_STORE_AGENTS_PAGE_SIZE
*/
export const STORE_AGENTS_PAGE_SIZE = 20;
/**
* Default page size for store creators listing
* Backend: V2_STORE_CREATORS_PAGE_SIZE
*/
export const STORE_CREATORS_PAGE_SIZE = 20;
/**
* Default page size for user submissions listing
* Backend: V2_STORE_SUBMISSIONS_PAGE_SIZE
*/
export const STORE_SUBMISSIONS_PAGE_SIZE = 20;
/**
* Default page size for user's own agents listing
* Backend: V2_MY_AGENTS_PAGE_SIZE
*/
export const MY_AGENTS_PAGE_SIZE = 20;
/**
* Default page size for library agents listing
* Backend: V2_LIBRARY_AGENTS_PAGE_SIZE
*/
export const LIBRARY_AGENTS_PAGE_SIZE = 10;
/**
* Default page size for library presets listing
* Backend: V2_LIBRARY_PRESETS_PAGE_SIZE
*/
export const LIBRARY_PRESETS_PAGE_SIZE = 20;
/**
* Default page size for agent runs/executions
* Backend: V1_GRAPH_EXECUTIONS_PAGE_SIZE (note: this is from v1 API)
*/
export const AGENT_RUNS_PAGE_SIZE = 20;
/**
* Large page size for fetching "all" items (marketplace top agents)
* Used when we want to fetch a comprehensive list without pagination UI
*/
export const LARGE_PAGE_SIZE = 1000;
/**
* Very large page size for specific use cases
* Used in agent runs view for comprehensive listing
*/
export const EXTRA_LARGE_PAGE_SIZE = 100;

View File

@@ -0,0 +1,195 @@
/**
* Test suite for cache invalidation consistency.
*
* These tests ensure that when we invalidate query caches, the parameters
* used match what the backend expects. If the default page_size changes
* in the backend API, these tests will catch mismatches.
*
* NOTE: These are unit tests for cache key generation.
* They use Playwright's test framework but don't require a browser.
*/
import { test, expect } from "@playwright/test";
import { getGetV2ListMySubmissionsQueryKey } from "@/app/api/__generated__/endpoints/store/store";
import * as PaginationConfig from "@/lib/pagination-config";
test.describe("Cache Invalidation Tests", () => {
test.describe("getGetV2ListMySubmissionsQueryKey", () => {
test("should generate correct query key without params", () => {
const key = getGetV2ListMySubmissionsQueryKey();
expect(key).toEqual(["/api/store/submissions"]);
});
test("should generate correct query key with params", () => {
const key = getGetV2ListMySubmissionsQueryKey({
page: 1,
page_size: 20,
});
expect(key).toEqual([
"/api/store/submissions",
{ page: 1, page_size: 20 },
]);
});
test("should generate different keys for different page_size values", () => {
const key1 = getGetV2ListMySubmissionsQueryKey({
page: 1,
page_size: 20,
});
const key2 = getGetV2ListMySubmissionsQueryKey({
page: 1,
page_size: 25,
});
expect(key1).not.toEqual(key2);
});
});
test.describe("Cache invalidation page_size consistency", () => {
/**
* This test documents the current default page_size used in the backend.
* If this test fails, it means:
* 1. The backend default page_size has changed, OR
* 2. The frontend is using a different page_size than the backend
*
* When invalidating queries without params, we're invalidating ALL
* submissions queries regardless of page_size. This is correct behavior.
*/
test("should use page_size matching backend default when invalidating specific pages", () => {
// Use the shared constant that matches backend's cache_config.V2_STORE_SUBMISSIONS_PAGE_SIZE
const BACKEND_DEFAULT_PAGE_SIZE =
PaginationConfig.STORE_SUBMISSIONS_PAGE_SIZE;
// When we call invalidateQueries without params, it invalidates all variations
const invalidateAllKey = getGetV2ListMySubmissionsQueryKey();
expect(invalidateAllKey).toEqual(["/api/store/submissions"]);
// When we call invalidateQueries with specific params, it should match backend
const invalidateSpecificKey = getGetV2ListMySubmissionsQueryKey({
page: 1,
page_size: BACKEND_DEFAULT_PAGE_SIZE,
});
expect(invalidateSpecificKey).toEqual([
"/api/store/submissions",
{ page: 1, page_size: PaginationConfig.STORE_SUBMISSIONS_PAGE_SIZE },
]);
});
/**
* This test verifies that invalidating without parameters will match
* all cached queries regardless of their page_size.
* This is the behavior when calling:
* queryClient.invalidateQueries({ queryKey: getGetV2ListMySubmissionsQueryKey() })
*/
test("should invalidate all submissions when using base key", () => {
const baseKey = getGetV2ListMySubmissionsQueryKey();
// These are examples of keys that would be cached
const cachedKey1 = getGetV2ListMySubmissionsQueryKey({
page: 1,
page_size: 20,
});
const cachedKey2 = getGetV2ListMySubmissionsQueryKey({
page: 2,
page_size: 20,
});
const cachedKey3 = getGetV2ListMySubmissionsQueryKey({
page: 1,
page_size: 25,
});
// Base key should be a prefix of all cached keys
expect(cachedKey1[0]).toBe(baseKey[0]);
expect(cachedKey2[0]).toBe(baseKey[0]);
expect(cachedKey3[0]).toBe(baseKey[0]);
// This confirms that invalidating with base key will match all variations
// because TanStack Query does prefix matching by default
});
/**
* This test documents a potential issue:
* If the backend's _clear_submissions_cache hardcodes page_size=20,
* but the frontend uses a different page_size, the caches won't sync.
*
* The frontend should ALWAYS call invalidateQueries without params
* to ensure all pages are invalidated, not just specific page_size values.
*/
test("should document the cache invalidation strategy", () => {
// CORRECT: This invalidates ALL submissions queries
const correctInvalidation = getGetV2ListMySubmissionsQueryKey();
expect(correctInvalidation).toEqual(["/api/store/submissions"]);
// INCORRECT: This would only invalidate queries with page_size=20
const incorrectInvalidation = getGetV2ListMySubmissionsQueryKey({
page: 1,
page_size: 20,
});
expect(incorrectInvalidation).toEqual([
"/api/store/submissions",
{ page: 1, page_size: 20 },
]);
// Verify current usage in codebase uses correct approach
// (This is a documentation test - it will always pass)
// Real verification requires checking actual invalidateQueries calls
});
});
test.describe("Integration with backend cache clearing", () => {
/**
* This test documents how the backend's _clear_submissions_cache works
* and what the frontend needs to do to stay in sync.
*/
test("should document backend cache clearing behavior", () => {
const BACKEND_HARDCODED_PAGE_SIZE = 20; // From cache.py line 18
const BACKEND_NUM_PAGES_TO_CLEAR = 20; // From cache.py line 13
// Backend clears pages 1-19 with page_size=20
// Frontend should invalidate ALL queries to ensure sync
const frontendInvalidationKey = getGetV2ListMySubmissionsQueryKey();
// Document what gets invalidated
const expectedInvalidations = Array.from(
{ length: BACKEND_NUM_PAGES_TO_CLEAR - 1 },
(_, i) =>
getGetV2ListMySubmissionsQueryKey({
page: i + 1,
page_size: BACKEND_HARDCODED_PAGE_SIZE,
}),
);
// All backend-cleared pages should have the same base key
expectedInvalidations.forEach((key) => {
expect(key[0]).toBe(frontendInvalidationKey[0]);
});
// This confirms that using the base key for invalidation
// will catch all the entries the backend cleared
});
/**
* CRITICAL TEST: This test will fail if someone changes the page_size
* in the frontend components but the backend still uses page_size=20.
*/
test("should fail if frontend default page_size differs from backend", () => {
// Both frontend and backend now use shared constants
// Frontend: STORE_SUBMISSIONS_PAGE_SIZE from pagination-config.ts
// Backend: V2_STORE_SUBMISSIONS_PAGE_SIZE from cache_config.py
// These MUST be kept in sync manually (no cross-language constant sharing possible)
const EXPECTED_PAGE_SIZE = 20;
expect(PaginationConfig.STORE_SUBMISSIONS_PAGE_SIZE).toBe(
EXPECTED_PAGE_SIZE,
);
// If this test fails, you must:
// 1. Update backend/server/cache_config.py V2_STORE_SUBMISSIONS_PAGE_SIZE
// 2. Update frontend/lib/pagination-config.ts STORE_SUBMISSIONS_PAGE_SIZE
// 3. Update all routes and cache clearing logic to use the constants
// 4. Update this test with the new expected value
});
});
});