Compare commits

..

4 Commits

Author SHA1 Message Date
Swifty
894e3600fb add other specs 2025-08-01 14:21:57 +02:00
Swifty
9de4b09f20 mv to sub dir 2025-08-01 13:19:42 +02:00
Swifty
62e41d409a websocket server running well now 2025-08-01 13:17:45 +02:00
Swifty
9f03e3af47 added websocket service 2025-08-01 11:19:29 +02:00
2184 changed files with 62382 additions and 231796 deletions

View File

@@ -1,37 +0,0 @@
{
"worktreeCopyPatterns": [
".env*",
".vscode/**",
".auth/**",
".claude/**",
"autogpt_platform/.env*",
"autogpt_platform/backend/.env*",
"autogpt_platform/frontend/.env*",
"autogpt_platform/frontend/.auth/**",
"autogpt_platform/db/docker/.env*"
],
"worktreeCopyIgnores": [
"**/node_modules/**",
"**/dist/**",
"**/.git/**",
"**/Thumbs.db",
"**/.DS_Store",
"**/.next/**",
"**/__pycache__/**",
"**/.ruff_cache/**",
"**/.pytest_cache/**",
"**/*.pyc",
"**/playwright-report/**",
"**/logs/**",
"**/site/**"
],
"worktreePathTemplate": "$BASE_PATH.worktree",
"postCreateCmd": [
"cd autogpt_platform/autogpt_libs && poetry install",
"cd autogpt_platform/backend && poetry install && poetry run prisma generate",
"cd autogpt_platform/frontend && pnpm install",
"cd docs && pip install -r requirements.txt"
],
"terminalCommand": "code .",
"deleteBranchWithWorktree": false
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,125 +0,0 @@
---
name: vercel-react-best-practices
description: React and Next.js performance optimization guidelines from Vercel Engineering. This skill should be used when writing, reviewing, or refactoring React/Next.js code to ensure optimal performance patterns. Triggers on tasks involving React components, Next.js pages, data fetching, bundle optimization, or performance improvements.
license: MIT
metadata:
author: vercel
version: "1.0.0"
---
# Vercel React Best Practices
Comprehensive performance optimization guide for React and Next.js applications, maintained by Vercel. Contains 45 rules across 8 categories, prioritized by impact to guide automated refactoring and code generation.
## When to Apply
Reference these guidelines when:
- Writing new React components or Next.js pages
- Implementing data fetching (client or server-side)
- Reviewing code for performance issues
- Refactoring existing React/Next.js code
- Optimizing bundle size or load times
## Rule Categories by Priority
| Priority | Category | Impact | Prefix |
|----------|----------|--------|--------|
| 1 | Eliminating Waterfalls | CRITICAL | `async-` |
| 2 | Bundle Size Optimization | CRITICAL | `bundle-` |
| 3 | Server-Side Performance | HIGH | `server-` |
| 4 | Client-Side Data Fetching | MEDIUM-HIGH | `client-` |
| 5 | Re-render Optimization | MEDIUM | `rerender-` |
| 6 | Rendering Performance | MEDIUM | `rendering-` |
| 7 | JavaScript Performance | LOW-MEDIUM | `js-` |
| 8 | Advanced Patterns | LOW | `advanced-` |
## Quick Reference
### 1. Eliminating Waterfalls (CRITICAL)
- `async-defer-await` - Move await into branches where actually used
- `async-parallel` - Use Promise.all() for independent operations
- `async-dependencies` - Use better-all for partial dependencies
- `async-api-routes` - Start promises early, await late in API routes
- `async-suspense-boundaries` - Use Suspense to stream content
### 2. Bundle Size Optimization (CRITICAL)
- `bundle-barrel-imports` - Import directly, avoid barrel files
- `bundle-dynamic-imports` - Use next/dynamic for heavy components
- `bundle-defer-third-party` - Load analytics/logging after hydration
- `bundle-conditional` - Load modules only when feature is activated
- `bundle-preload` - Preload on hover/focus for perceived speed
### 3. Server-Side Performance (HIGH)
- `server-cache-react` - Use React.cache() for per-request deduplication
- `server-cache-lru` - Use LRU cache for cross-request caching
- `server-serialization` - Minimize data passed to client components
- `server-parallel-fetching` - Restructure components to parallelize fetches
- `server-after-nonblocking` - Use after() for non-blocking operations
### 4. Client-Side Data Fetching (MEDIUM-HIGH)
- `client-swr-dedup` - Use SWR for automatic request deduplication
- `client-event-listeners` - Deduplicate global event listeners
### 5. Re-render Optimization (MEDIUM)
- `rerender-defer-reads` - Don't subscribe to state only used in callbacks
- `rerender-memo` - Extract expensive work into memoized components
- `rerender-dependencies` - Use primitive dependencies in effects
- `rerender-derived-state` - Subscribe to derived booleans, not raw values
- `rerender-functional-setstate` - Use functional setState for stable callbacks
- `rerender-lazy-state-init` - Pass function to useState for expensive values
- `rerender-transitions` - Use startTransition for non-urgent updates
### 6. Rendering Performance (MEDIUM)
- `rendering-animate-svg-wrapper` - Animate div wrapper, not SVG element
- `rendering-content-visibility` - Use content-visibility for long lists
- `rendering-hoist-jsx` - Extract static JSX outside components
- `rendering-svg-precision` - Reduce SVG coordinate precision
- `rendering-hydration-no-flicker` - Use inline script for client-only data
- `rendering-activity` - Use Activity component for show/hide
- `rendering-conditional-render` - Use ternary, not && for conditionals
### 7. JavaScript Performance (LOW-MEDIUM)
- `js-batch-dom-css` - Group CSS changes via classes or cssText
- `js-index-maps` - Build Map for repeated lookups
- `js-cache-property-access` - Cache object properties in loops
- `js-cache-function-results` - Cache function results in module-level Map
- `js-cache-storage` - Cache localStorage/sessionStorage reads
- `js-combine-iterations` - Combine multiple filter/map into one loop
- `js-length-check-first` - Check array length before expensive comparison
- `js-early-exit` - Return early from functions
- `js-hoist-regexp` - Hoist RegExp creation outside loops
- `js-min-max-loop` - Use loop for min/max instead of sort
- `js-set-map-lookups` - Use Set/Map for O(1) lookups
- `js-tosorted-immutable` - Use toSorted() for immutability
### 8. Advanced Patterns (LOW)
- `advanced-event-handler-refs` - Store event handlers in refs
- `advanced-use-latest` - useLatest for stable callback refs
## How to Use
Read individual rule files for detailed explanations and code examples:
```
rules/async-parallel.md
rules/bundle-barrel-imports.md
rules/_sections.md
```
Each rule file contains:
- Brief explanation of why it matters
- Incorrect code example with explanation
- Correct code example with explanation
- Additional context and references
## Full Compiled Document
For the complete guide with all rules expanded: `AGENTS.md`

View File

@@ -1,55 +0,0 @@
---
title: Store Event Handlers in Refs
impact: LOW
impactDescription: stable subscriptions
tags: advanced, hooks, refs, event-handlers, optimization
---
## Store Event Handlers in Refs
Store callbacks in refs when used in effects that shouldn't re-subscribe on callback changes.
**Incorrect (re-subscribes on every render):**
```tsx
function useWindowEvent(event: string, handler: () => void) {
useEffect(() => {
window.addEventListener(event, handler)
return () => window.removeEventListener(event, handler)
}, [event, handler])
}
```
**Correct (stable subscription):**
```tsx
function useWindowEvent(event: string, handler: () => void) {
const handlerRef = useRef(handler)
useEffect(() => {
handlerRef.current = handler
}, [handler])
useEffect(() => {
const listener = () => handlerRef.current()
window.addEventListener(event, listener)
return () => window.removeEventListener(event, listener)
}, [event])
}
```
**Alternative: use `useEffectEvent` if you're on latest React:**
```tsx
import { useEffectEvent } from 'react'
function useWindowEvent(event: string, handler: () => void) {
const onEvent = useEffectEvent(handler)
useEffect(() => {
window.addEventListener(event, onEvent)
return () => window.removeEventListener(event, onEvent)
}, [event])
}
```
`useEffectEvent` provides a cleaner API for the same pattern: it creates a stable function reference that always calls the latest version of the handler.

View File

@@ -1,49 +0,0 @@
---
title: useLatest for Stable Callback Refs
impact: LOW
impactDescription: prevents effect re-runs
tags: advanced, hooks, useLatest, refs, optimization
---
## useLatest for Stable Callback Refs
Access latest values in callbacks without adding them to dependency arrays. Prevents effect re-runs while avoiding stale closures.
**Implementation:**
```typescript
function useLatest<T>(value: T) {
const ref = useRef(value)
useEffect(() => {
ref.current = value
}, [value])
return ref
}
```
**Incorrect (effect re-runs on every callback change):**
```tsx
function SearchInput({ onSearch }: { onSearch: (q: string) => void }) {
const [query, setQuery] = useState('')
useEffect(() => {
const timeout = setTimeout(() => onSearch(query), 300)
return () => clearTimeout(timeout)
}, [query, onSearch])
}
```
**Correct (stable effect, fresh callback):**
```tsx
function SearchInput({ onSearch }: { onSearch: (q: string) => void }) {
const [query, setQuery] = useState('')
const onSearchRef = useLatest(onSearch)
useEffect(() => {
const timeout = setTimeout(() => onSearchRef.current(query), 300)
return () => clearTimeout(timeout)
}, [query])
}
```

View File

@@ -1,38 +0,0 @@
---
title: Prevent Waterfall Chains in API Routes
impact: CRITICAL
impactDescription: 2-10× improvement
tags: api-routes, server-actions, waterfalls, parallelization
---
## Prevent Waterfall Chains in API Routes
In API routes and Server Actions, start independent operations immediately, even if you don't await them yet.
**Incorrect (config waits for auth, data waits for both):**
```typescript
export async function GET(request: Request) {
const session = await auth()
const config = await fetchConfig()
const data = await fetchData(session.user.id)
return Response.json({ data, config })
}
```
**Correct (auth and config start immediately):**
```typescript
export async function GET(request: Request) {
const sessionPromise = auth()
const configPromise = fetchConfig()
const session = await sessionPromise
const [config, data] = await Promise.all([
configPromise,
fetchData(session.user.id)
])
return Response.json({ data, config })
}
```
For operations with more complex dependency chains, use `better-all` to automatically maximize parallelism (see Dependency-Based Parallelization).

View File

@@ -1,80 +0,0 @@
---
title: Defer Await Until Needed
impact: HIGH
impactDescription: avoids blocking unused code paths
tags: async, await, conditional, optimization
---
## Defer Await Until Needed
Move `await` operations into the branches where they're actually used to avoid blocking code paths that don't need them.
**Incorrect (blocks both branches):**
```typescript
async function handleRequest(userId: string, skipProcessing: boolean) {
const userData = await fetchUserData(userId)
if (skipProcessing) {
// Returns immediately but still waited for userData
return { skipped: true }
}
// Only this branch uses userData
return processUserData(userData)
}
```
**Correct (only blocks when needed):**
```typescript
async function handleRequest(userId: string, skipProcessing: boolean) {
if (skipProcessing) {
// Returns immediately without waiting
return { skipped: true }
}
// Fetch only when needed
const userData = await fetchUserData(userId)
return processUserData(userData)
}
```
**Another example (early return optimization):**
```typescript
// Incorrect: always fetches permissions
async function updateResource(resourceId: string, userId: string) {
const permissions = await fetchPermissions(userId)
const resource = await getResource(resourceId)
if (!resource) {
return { error: 'Not found' }
}
if (!permissions.canEdit) {
return { error: 'Forbidden' }
}
return await updateResourceData(resource, permissions)
}
// Correct: fetches only when needed
async function updateResource(resourceId: string, userId: string) {
const resource = await getResource(resourceId)
if (!resource) {
return { error: 'Not found' }
}
const permissions = await fetchPermissions(userId)
if (!permissions.canEdit) {
return { error: 'Forbidden' }
}
return await updateResourceData(resource, permissions)
}
```
This optimization is especially valuable when the skipped branch is frequently taken, or when the deferred operation is expensive.

View File

@@ -1,36 +0,0 @@
---
title: Dependency-Based Parallelization
impact: CRITICAL
impactDescription: 2-10× improvement
tags: async, parallelization, dependencies, better-all
---
## Dependency-Based Parallelization
For operations with partial dependencies, use `better-all` to maximize parallelism. It automatically starts each task at the earliest possible moment.
**Incorrect (profile waits for config unnecessarily):**
```typescript
const [user, config] = await Promise.all([
fetchUser(),
fetchConfig()
])
const profile = await fetchProfile(user.id)
```
**Correct (config and profile run in parallel):**
```typescript
import { all } from 'better-all'
const { user, config, profile } = await all({
async user() { return fetchUser() },
async config() { return fetchConfig() },
async profile() {
return fetchProfile((await this.$.user).id)
}
})
```
Reference: [https://github.com/shuding/better-all](https://github.com/shuding/better-all)

View File

@@ -1,28 +0,0 @@
---
title: Promise.all() for Independent Operations
impact: CRITICAL
impactDescription: 2-10× improvement
tags: async, parallelization, promises, waterfalls
---
## Promise.all() for Independent Operations
When async operations have no interdependencies, execute them concurrently using `Promise.all()`.
**Incorrect (sequential execution, 3 round trips):**
```typescript
const user = await fetchUser()
const posts = await fetchPosts()
const comments = await fetchComments()
```
**Correct (parallel execution, 1 round trip):**
```typescript
const [user, posts, comments] = await Promise.all([
fetchUser(),
fetchPosts(),
fetchComments()
])
```

View File

@@ -1,99 +0,0 @@
---
title: Strategic Suspense Boundaries
impact: HIGH
impactDescription: faster initial paint
tags: async, suspense, streaming, layout-shift
---
## Strategic Suspense Boundaries
Instead of awaiting data in async components before returning JSX, use Suspense boundaries to show the wrapper UI faster while data loads.
**Incorrect (wrapper blocked by data fetching):**
```tsx
async function Page() {
const data = await fetchData() // Blocks entire page
return (
<div>
<div>Sidebar</div>
<div>Header</div>
<div>
<DataDisplay data={data} />
</div>
<div>Footer</div>
</div>
)
}
```
The entire layout waits for data even though only the middle section needs it.
**Correct (wrapper shows immediately, data streams in):**
```tsx
function Page() {
return (
<div>
<div>Sidebar</div>
<div>Header</div>
<div>
<Suspense fallback={<Skeleton />}>
<DataDisplay />
</Suspense>
</div>
<div>Footer</div>
</div>
)
}
async function DataDisplay() {
const data = await fetchData() // Only blocks this component
return <div>{data.content}</div>
}
```
Sidebar, Header, and Footer render immediately. Only DataDisplay waits for data.
**Alternative (share promise across components):**
```tsx
function Page() {
// Start fetch immediately, but don't await
const dataPromise = fetchData()
return (
<div>
<div>Sidebar</div>
<div>Header</div>
<Suspense fallback={<Skeleton />}>
<DataDisplay dataPromise={dataPromise} />
<DataSummary dataPromise={dataPromise} />
</Suspense>
<div>Footer</div>
</div>
)
}
function DataDisplay({ dataPromise }: { dataPromise: Promise<Data> }) {
const data = use(dataPromise) // Unwraps the promise
return <div>{data.content}</div>
}
function DataSummary({ dataPromise }: { dataPromise: Promise<Data> }) {
const data = use(dataPromise) // Reuses the same promise
return <div>{data.summary}</div>
}
```
Both components share the same promise, so only one fetch occurs. Layout renders immediately while both components wait together.
**When NOT to use this pattern:**
- Critical data needed for layout decisions (affects positioning)
- SEO-critical content above the fold
- Small, fast queries where suspense overhead isn't worth it
- When you want to avoid layout shift (loading → content jump)
**Trade-off:** Faster initial paint vs potential layout shift. Choose based on your UX priorities.

View File

@@ -1,59 +0,0 @@
---
title: Avoid Barrel File Imports
impact: CRITICAL
impactDescription: 200-800ms import cost, slow builds
tags: bundle, imports, tree-shaking, barrel-files, performance
---
## Avoid Barrel File Imports
Import directly from source files instead of barrel files to avoid loading thousands of unused modules. **Barrel files** are entry points that re-export multiple modules (e.g., `index.js` that does `export * from './module'`).
Popular icon and component libraries can have **up to 10,000 re-exports** in their entry file. For many React packages, **it takes 200-800ms just to import them**, affecting both development speed and production cold starts.
**Why tree-shaking doesn't help:** When a library is marked as external (not bundled), the bundler can't optimize it. If you bundle it to enable tree-shaking, builds become substantially slower analyzing the entire module graph.
**Incorrect (imports entire library):**
```tsx
import { Check, X, Menu } from 'lucide-react'
// Loads 1,583 modules, takes ~2.8s extra in dev
// Runtime cost: 200-800ms on every cold start
import { Button, TextField } from '@mui/material'
// Loads 2,225 modules, takes ~4.2s extra in dev
```
**Correct (imports only what you need):**
```tsx
import Check from 'lucide-react/dist/esm/icons/check'
import X from 'lucide-react/dist/esm/icons/x'
import Menu from 'lucide-react/dist/esm/icons/menu'
// Loads only 3 modules (~2KB vs ~1MB)
import Button from '@mui/material/Button'
import TextField from '@mui/material/TextField'
// Loads only what you use
```
**Alternative (Next.js 13.5+):**
```js
// next.config.js - use optimizePackageImports
module.exports = {
experimental: {
optimizePackageImports: ['lucide-react', '@mui/material']
}
}
// Then you can keep the ergonomic barrel imports:
import { Check, X, Menu } from 'lucide-react'
// Automatically transformed to direct imports at build time
```
Direct imports provide 15-70% faster dev boot, 28% faster builds, 40% faster cold starts, and significantly faster HMR.
Libraries commonly affected: `lucide-react`, `@mui/material`, `@mui/icons-material`, `@tabler/icons-react`, `react-icons`, `@headlessui/react`, `@radix-ui/react-*`, `lodash`, `ramda`, `date-fns`, `rxjs`, `react-use`.
Reference: [How we optimized package imports in Next.js](https://vercel.com/blog/how-we-optimized-package-imports-in-next-js)

View File

@@ -1,31 +0,0 @@
---
title: Conditional Module Loading
impact: HIGH
impactDescription: loads large data only when needed
tags: bundle, conditional-loading, lazy-loading
---
## Conditional Module Loading
Load large data or modules only when a feature is activated.
**Example (lazy-load animation frames):**
```tsx
function AnimationPlayer({ enabled }: { enabled: boolean }) {
const [frames, setFrames] = useState<Frame[] | null>(null)
useEffect(() => {
if (enabled && !frames && typeof window !== 'undefined') {
import('./animation-frames.js')
.then(mod => setFrames(mod.frames))
.catch(() => setEnabled(false))
}
}, [enabled, frames])
if (!frames) return <Skeleton />
return <Canvas frames={frames} />
}
```
The `typeof window !== 'undefined'` check prevents bundling this module for SSR, optimizing server bundle size and build speed.

View File

@@ -1,49 +0,0 @@
---
title: Defer Non-Critical Third-Party Libraries
impact: MEDIUM
impactDescription: loads after hydration
tags: bundle, third-party, analytics, defer
---
## Defer Non-Critical Third-Party Libraries
Analytics, logging, and error tracking don't block user interaction. Load them after hydration.
**Incorrect (blocks initial bundle):**
```tsx
import { Analytics } from '@vercel/analytics/react'
export default function RootLayout({ children }) {
return (
<html>
<body>
{children}
<Analytics />
</body>
</html>
)
}
```
**Correct (loads after hydration):**
```tsx
import dynamic from 'next/dynamic'
const Analytics = dynamic(
() => import('@vercel/analytics/react').then(m => m.Analytics),
{ ssr: false }
)
export default function RootLayout({ children }) {
return (
<html>
<body>
{children}
<Analytics />
</body>
</html>
)
}
```

View File

@@ -1,35 +0,0 @@
---
title: Dynamic Imports for Heavy Components
impact: CRITICAL
impactDescription: directly affects TTI and LCP
tags: bundle, dynamic-import, code-splitting, next-dynamic
---
## Dynamic Imports for Heavy Components
Use `next/dynamic` to lazy-load large components not needed on initial render.
**Incorrect (Monaco bundles with main chunk ~300KB):**
```tsx
import { MonacoEditor } from './monaco-editor'
function CodePanel({ code }: { code: string }) {
return <MonacoEditor value={code} />
}
```
**Correct (Monaco loads on demand):**
```tsx
import dynamic from 'next/dynamic'
const MonacoEditor = dynamic(
() => import('./monaco-editor').then(m => m.MonacoEditor),
{ ssr: false }
)
function CodePanel({ code }: { code: string }) {
return <MonacoEditor value={code} />
}
```

View File

@@ -1,50 +0,0 @@
---
title: Preload Based on User Intent
impact: MEDIUM
impactDescription: reduces perceived latency
tags: bundle, preload, user-intent, hover
---
## Preload Based on User Intent
Preload heavy bundles before they're needed to reduce perceived latency.
**Example (preload on hover/focus):**
```tsx
function EditorButton({ onClick }: { onClick: () => void }) {
const preload = () => {
if (typeof window !== 'undefined') {
void import('./monaco-editor')
}
}
return (
<button
onMouseEnter={preload}
onFocus={preload}
onClick={onClick}
>
Open Editor
</button>
)
}
```
**Example (preload when feature flag is enabled):**
```tsx
function FlagsProvider({ children, flags }: Props) {
useEffect(() => {
if (flags.editorEnabled && typeof window !== 'undefined') {
void import('./monaco-editor').then(mod => mod.init())
}
}, [flags.editorEnabled])
return <FlagsContext.Provider value={flags}>
{children}
</FlagsContext.Provider>
}
```
The `typeof window !== 'undefined'` check prevents bundling preloaded modules for SSR, optimizing server bundle size and build speed.

View File

@@ -1,74 +0,0 @@
---
title: Deduplicate Global Event Listeners
impact: LOW
impactDescription: single listener for N components
tags: client, swr, event-listeners, subscription
---
## Deduplicate Global Event Listeners
Use `useSWRSubscription()` to share global event listeners across component instances.
**Incorrect (N instances = N listeners):**
```tsx
function useKeyboardShortcut(key: string, callback: () => void) {
useEffect(() => {
const handler = (e: KeyboardEvent) => {
if (e.metaKey && e.key === key) {
callback()
}
}
window.addEventListener('keydown', handler)
return () => window.removeEventListener('keydown', handler)
}, [key, callback])
}
```
When using the `useKeyboardShortcut` hook multiple times, each instance will register a new listener.
**Correct (N instances = 1 listener):**
```tsx
import useSWRSubscription from 'swr/subscription'
// Module-level Map to track callbacks per key
const keyCallbacks = new Map<string, Set<() => void>>()
function useKeyboardShortcut(key: string, callback: () => void) {
// Register this callback in the Map
useEffect(() => {
if (!keyCallbacks.has(key)) {
keyCallbacks.set(key, new Set())
}
keyCallbacks.get(key)!.add(callback)
return () => {
const set = keyCallbacks.get(key)
if (set) {
set.delete(callback)
if (set.size === 0) {
keyCallbacks.delete(key)
}
}
}
}, [key, callback])
useSWRSubscription('global-keydown', () => {
const handler = (e: KeyboardEvent) => {
if (e.metaKey && keyCallbacks.has(e.key)) {
keyCallbacks.get(e.key)!.forEach(cb => cb())
}
}
window.addEventListener('keydown', handler)
return () => window.removeEventListener('keydown', handler)
})
}
function Profile() {
// Multiple shortcuts will share the same listener
useKeyboardShortcut('p', () => { /* ... */ })
useKeyboardShortcut('k', () => { /* ... */ })
// ...
}
```

View File

@@ -1,56 +0,0 @@
---
title: Use SWR for Automatic Deduplication
impact: MEDIUM-HIGH
impactDescription: automatic deduplication
tags: client, swr, deduplication, data-fetching
---
## Use SWR for Automatic Deduplication
SWR enables request deduplication, caching, and revalidation across component instances.
**Incorrect (no deduplication, each instance fetches):**
```tsx
function UserList() {
const [users, setUsers] = useState([])
useEffect(() => {
fetch('/api/users')
.then(r => r.json())
.then(setUsers)
}, [])
}
```
**Correct (multiple instances share one request):**
```tsx
import useSWR from 'swr'
function UserList() {
const { data: users } = useSWR('/api/users', fetcher)
}
```
**For immutable data:**
```tsx
import { useImmutableSWR } from '@/lib/swr'
function StaticContent() {
const { data } = useImmutableSWR('/api/config', fetcher)
}
```
**For mutations:**
```tsx
import { useSWRMutation } from 'swr/mutation'
function UpdateButton() {
const { trigger } = useSWRMutation('/api/user', updateUser)
return <button onClick={() => trigger()}>Update</button>
}
```
Reference: [https://swr.vercel.app](https://swr.vercel.app)

View File

@@ -1,82 +0,0 @@
---
title: Batch DOM CSS Changes
impact: MEDIUM
impactDescription: reduces reflows/repaints
tags: javascript, dom, css, performance, reflow
---
## Batch DOM CSS Changes
Avoid changing styles one property at a time. Group multiple CSS changes together via classes or `cssText` to minimize browser reflows.
**Incorrect (multiple reflows):**
```typescript
function updateElementStyles(element: HTMLElement) {
// Each line triggers a reflow
element.style.width = '100px'
element.style.height = '200px'
element.style.backgroundColor = 'blue'
element.style.border = '1px solid black'
}
```
**Correct (add class - single reflow):**
```typescript
// CSS file
.highlighted-box {
width: 100px;
height: 200px;
background-color: blue;
border: 1px solid black;
}
// JavaScript
function updateElementStyles(element: HTMLElement) {
element.classList.add('highlighted-box')
}
```
**Correct (change cssText - single reflow):**
```typescript
function updateElementStyles(element: HTMLElement) {
element.style.cssText = `
width: 100px;
height: 200px;
background-color: blue;
border: 1px solid black;
`
}
```
**React example:**
```tsx
// Incorrect: changing styles one by one
function Box({ isHighlighted }: { isHighlighted: boolean }) {
const ref = useRef<HTMLDivElement>(null)
useEffect(() => {
if (ref.current && isHighlighted) {
ref.current.style.width = '100px'
ref.current.style.height = '200px'
ref.current.style.backgroundColor = 'blue'
}
}, [isHighlighted])
return <div ref={ref}>Content</div>
}
// Correct: toggle class
function Box({ isHighlighted }: { isHighlighted: boolean }) {
return (
<div className={isHighlighted ? 'highlighted-box' : ''}>
Content
</div>
)
}
```
Prefer CSS classes over inline styles when possible. Classes are cached by the browser and provide better separation of concerns.

View File

@@ -1,80 +0,0 @@
---
title: Cache Repeated Function Calls
impact: MEDIUM
impactDescription: avoid redundant computation
tags: javascript, cache, memoization, performance
---
## Cache Repeated Function Calls
Use a module-level Map to cache function results when the same function is called repeatedly with the same inputs during render.
**Incorrect (redundant computation):**
```typescript
function ProjectList({ projects }: { projects: Project[] }) {
return (
<div>
{projects.map(project => {
// slugify() called 100+ times for same project names
const slug = slugify(project.name)
return <ProjectCard key={project.id} slug={slug} />
})}
</div>
)
}
```
**Correct (cached results):**
```typescript
// Module-level cache
const slugifyCache = new Map<string, string>()
function cachedSlugify(text: string): string {
if (slugifyCache.has(text)) {
return slugifyCache.get(text)!
}
const result = slugify(text)
slugifyCache.set(text, result)
return result
}
function ProjectList({ projects }: { projects: Project[] }) {
return (
<div>
{projects.map(project => {
// Computed only once per unique project name
const slug = cachedSlugify(project.name)
return <ProjectCard key={project.id} slug={slug} />
})}
</div>
)
}
```
**Simpler pattern for single-value functions:**
```typescript
let isLoggedInCache: boolean | null = null
function isLoggedIn(): boolean {
if (isLoggedInCache !== null) {
return isLoggedInCache
}
isLoggedInCache = document.cookie.includes('auth=')
return isLoggedInCache
}
// Clear cache when auth changes
function onAuthChange() {
isLoggedInCache = null
}
```
Use a Map (not a hook) so it works everywhere: utilities, event handlers, not just React components.
Reference: [How we made the Vercel Dashboard twice as fast](https://vercel.com/blog/how-we-made-the-vercel-dashboard-twice-as-fast)

View File

@@ -1,28 +0,0 @@
---
title: Cache Property Access in Loops
impact: LOW-MEDIUM
impactDescription: reduces lookups
tags: javascript, loops, optimization, caching
---
## Cache Property Access in Loops
Cache object property lookups in hot paths.
**Incorrect (3 lookups × N iterations):**
```typescript
for (let i = 0; i < arr.length; i++) {
process(obj.config.settings.value)
}
```
**Correct (1 lookup total):**
```typescript
const value = obj.config.settings.value
const len = arr.length
for (let i = 0; i < len; i++) {
process(value)
}
```

View File

@@ -1,70 +0,0 @@
---
title: Cache Storage API Calls
impact: LOW-MEDIUM
impactDescription: reduces expensive I/O
tags: javascript, localStorage, storage, caching, performance
---
## Cache Storage API Calls
`localStorage`, `sessionStorage`, and `document.cookie` are synchronous and expensive. Cache reads in memory.
**Incorrect (reads storage on every call):**
```typescript
function getTheme() {
return localStorage.getItem('theme') ?? 'light'
}
// Called 10 times = 10 storage reads
```
**Correct (Map cache):**
```typescript
const storageCache = new Map<string, string | null>()
function getLocalStorage(key: string) {
if (!storageCache.has(key)) {
storageCache.set(key, localStorage.getItem(key))
}
return storageCache.get(key)
}
function setLocalStorage(key: string, value: string) {
localStorage.setItem(key, value)
storageCache.set(key, value) // keep cache in sync
}
```
Use a Map (not a hook) so it works everywhere: utilities, event handlers, not just React components.
**Cookie caching:**
```typescript
let cookieCache: Record<string, string> | null = null
function getCookie(name: string) {
if (!cookieCache) {
cookieCache = Object.fromEntries(
document.cookie.split('; ').map(c => c.split('='))
)
}
return cookieCache[name]
}
```
**Important (invalidate on external changes):**
If storage can change externally (another tab, server-set cookies), invalidate cache:
```typescript
window.addEventListener('storage', (e) => {
if (e.key) storageCache.delete(e.key)
})
document.addEventListener('visibilitychange', () => {
if (document.visibilityState === 'visible') {
storageCache.clear()
}
})
```

View File

@@ -1,32 +0,0 @@
---
title: Combine Multiple Array Iterations
impact: LOW-MEDIUM
impactDescription: reduces iterations
tags: javascript, arrays, loops, performance
---
## Combine Multiple Array Iterations
Multiple `.filter()` or `.map()` calls iterate the array multiple times. Combine into one loop.
**Incorrect (3 iterations):**
```typescript
const admins = users.filter(u => u.isAdmin)
const testers = users.filter(u => u.isTester)
const inactive = users.filter(u => !u.isActive)
```
**Correct (1 iteration):**
```typescript
const admins: User[] = []
const testers: User[] = []
const inactive: User[] = []
for (const user of users) {
if (user.isAdmin) admins.push(user)
if (user.isTester) testers.push(user)
if (!user.isActive) inactive.push(user)
}
```

View File

@@ -1,50 +0,0 @@
---
title: Early Return from Functions
impact: LOW-MEDIUM
impactDescription: avoids unnecessary computation
tags: javascript, functions, optimization, early-return
---
## Early Return from Functions
Return early when result is determined to skip unnecessary processing.
**Incorrect (processes all items even after finding answer):**
```typescript
function validateUsers(users: User[]) {
let hasError = false
let errorMessage = ''
for (const user of users) {
if (!user.email) {
hasError = true
errorMessage = 'Email required'
}
if (!user.name) {
hasError = true
errorMessage = 'Name required'
}
// Continues checking all users even after error found
}
return hasError ? { valid: false, error: errorMessage } : { valid: true }
}
```
**Correct (returns immediately on first error):**
```typescript
function validateUsers(users: User[]) {
for (const user of users) {
if (!user.email) {
return { valid: false, error: 'Email required' }
}
if (!user.name) {
return { valid: false, error: 'Name required' }
}
}
return { valid: true }
}
```

View File

@@ -1,45 +0,0 @@
---
title: Hoist RegExp Creation
impact: LOW-MEDIUM
impactDescription: avoids recreation
tags: javascript, regexp, optimization, memoization
---
## Hoist RegExp Creation
Don't create RegExp inside render. Hoist to module scope or memoize with `useMemo()`.
**Incorrect (new RegExp every render):**
```tsx
function Highlighter({ text, query }: Props) {
const regex = new RegExp(`(${query})`, 'gi')
const parts = text.split(regex)
return <>{parts.map((part, i) => ...)}</>
}
```
**Correct (memoize or hoist):**
```tsx
const EMAIL_REGEX = /^[^\s@]+@[^\s@]+\.[^\s@]+$/
function Highlighter({ text, query }: Props) {
const regex = useMemo(
() => new RegExp(`(${escapeRegex(query)})`, 'gi'),
[query]
)
const parts = text.split(regex)
return <>{parts.map((part, i) => ...)}</>
}
```
**Warning (global regex has mutable state):**
Global regex (`/g`) has mutable `lastIndex` state:
```typescript
const regex = /foo/g
regex.test('foo') // true, lastIndex = 3
regex.test('foo') // false, lastIndex = 0
```

View File

@@ -1,37 +0,0 @@
---
title: Build Index Maps for Repeated Lookups
impact: LOW-MEDIUM
impactDescription: 1M ops to 2K ops
tags: javascript, map, indexing, optimization, performance
---
## Build Index Maps for Repeated Lookups
Multiple `.find()` calls by the same key should use a Map.
**Incorrect (O(n) per lookup):**
```typescript
function processOrders(orders: Order[], users: User[]) {
return orders.map(order => ({
...order,
user: users.find(u => u.id === order.userId)
}))
}
```
**Correct (O(1) per lookup):**
```typescript
function processOrders(orders: Order[], users: User[]) {
const userById = new Map(users.map(u => [u.id, u]))
return orders.map(order => ({
...order,
user: userById.get(order.userId)
}))
}
```
Build map once (O(n)), then all lookups are O(1).
For 1000 orders × 1000 users: 1M ops → 2K ops.

View File

@@ -1,49 +0,0 @@
---
title: Early Length Check for Array Comparisons
impact: MEDIUM-HIGH
impactDescription: avoids expensive operations when lengths differ
tags: javascript, arrays, performance, optimization, comparison
---
## Early Length Check for Array Comparisons
When comparing arrays with expensive operations (sorting, deep equality, serialization), check lengths first. If lengths differ, the arrays cannot be equal.
In real-world applications, this optimization is especially valuable when the comparison runs in hot paths (event handlers, render loops).
**Incorrect (always runs expensive comparison):**
```typescript
function hasChanges(current: string[], original: string[]) {
// Always sorts and joins, even when lengths differ
return current.sort().join() !== original.sort().join()
}
```
Two O(n log n) sorts run even when `current.length` is 5 and `original.length` is 100. There is also overhead of joining the arrays and comparing the strings.
**Correct (O(1) length check first):**
```typescript
function hasChanges(current: string[], original: string[]) {
// Early return if lengths differ
if (current.length !== original.length) {
return true
}
// Only sort/join when lengths match
const currentSorted = current.toSorted()
const originalSorted = original.toSorted()
for (let i = 0; i < currentSorted.length; i++) {
if (currentSorted[i] !== originalSorted[i]) {
return true
}
}
return false
}
```
This new approach is more efficient because:
- It avoids the overhead of sorting and joining the arrays when lengths differ
- It avoids consuming memory for the joined strings (especially important for large arrays)
- It avoids mutating the original arrays
- It returns early when a difference is found

View File

@@ -1,82 +0,0 @@
---
title: Use Loop for Min/Max Instead of Sort
impact: LOW
impactDescription: O(n) instead of O(n log n)
tags: javascript, arrays, performance, sorting, algorithms
---
## Use Loop for Min/Max Instead of Sort
Finding the smallest or largest element only requires a single pass through the array. Sorting is wasteful and slower.
**Incorrect (O(n log n) - sort to find latest):**
```typescript
interface Project {
id: string
name: string
updatedAt: number
}
function getLatestProject(projects: Project[]) {
const sorted = [...projects].sort((a, b) => b.updatedAt - a.updatedAt)
return sorted[0]
}
```
Sorts the entire array just to find the maximum value.
**Incorrect (O(n log n) - sort for oldest and newest):**
```typescript
function getOldestAndNewest(projects: Project[]) {
const sorted = [...projects].sort((a, b) => a.updatedAt - b.updatedAt)
return { oldest: sorted[0], newest: sorted[sorted.length - 1] }
}
```
Still sorts unnecessarily when only min/max are needed.
**Correct (O(n) - single loop):**
```typescript
function getLatestProject(projects: Project[]) {
if (projects.length === 0) return null
let latest = projects[0]
for (let i = 1; i < projects.length; i++) {
if (projects[i].updatedAt > latest.updatedAt) {
latest = projects[i]
}
}
return latest
}
function getOldestAndNewest(projects: Project[]) {
if (projects.length === 0) return { oldest: null, newest: null }
let oldest = projects[0]
let newest = projects[0]
for (let i = 1; i < projects.length; i++) {
if (projects[i].updatedAt < oldest.updatedAt) oldest = projects[i]
if (projects[i].updatedAt > newest.updatedAt) newest = projects[i]
}
return { oldest, newest }
}
```
Single pass through the array, no copying, no sorting.
**Alternative (Math.min/Math.max for small arrays):**
```typescript
const numbers = [5, 2, 8, 1, 9]
const min = Math.min(...numbers)
const max = Math.max(...numbers)
```
This works for small arrays but can be slower for very large arrays due to spread operator limitations. Use the loop approach for reliability.

View File

@@ -1,24 +0,0 @@
---
title: Use Set/Map for O(1) Lookups
impact: LOW-MEDIUM
impactDescription: O(n) to O(1)
tags: javascript, set, map, data-structures, performance
---
## Use Set/Map for O(1) Lookups
Convert arrays to Set/Map for repeated membership checks.
**Incorrect (O(n) per check):**
```typescript
const allowedIds = ['a', 'b', 'c', ...]
items.filter(item => allowedIds.includes(item.id))
```
**Correct (O(1) per check):**
```typescript
const allowedIds = new Set(['a', 'b', 'c', ...])
items.filter(item => allowedIds.has(item.id))
```

View File

@@ -1,57 +0,0 @@
---
title: Use toSorted() Instead of sort() for Immutability
impact: MEDIUM-HIGH
impactDescription: prevents mutation bugs in React state
tags: javascript, arrays, immutability, react, state, mutation
---
## Use toSorted() Instead of sort() for Immutability
`.sort()` mutates the array in place, which can cause bugs with React state and props. Use `.toSorted()` to create a new sorted array without mutation.
**Incorrect (mutates original array):**
```typescript
function UserList({ users }: { users: User[] }) {
// Mutates the users prop array!
const sorted = useMemo(
() => users.sort((a, b) => a.name.localeCompare(b.name)),
[users]
)
return <div>{sorted.map(renderUser)}</div>
}
```
**Correct (creates new array):**
```typescript
function UserList({ users }: { users: User[] }) {
// Creates new sorted array, original unchanged
const sorted = useMemo(
() => users.toSorted((a, b) => a.name.localeCompare(b.name)),
[users]
)
return <div>{sorted.map(renderUser)}</div>
}
```
**Why this matters in React:**
1. Props/state mutations break React's immutability model - React expects props and state to be treated as read-only
2. Causes stale closure bugs - Mutating arrays inside closures (callbacks, effects) can lead to unexpected behavior
**Browser support (fallback for older browsers):**
`.toSorted()` is available in all modern browsers (Chrome 110+, Safari 16+, Firefox 115+, Node.js 20+). For older environments, use spread operator:
```typescript
// Fallback for older browsers
const sorted = [...items].sort((a, b) => a.value - b.value)
```
**Other immutable array methods:**
- `.toSorted()` - immutable sort
- `.toReversed()` - immutable reverse
- `.toSpliced()` - immutable splice
- `.with()` - immutable element replacement

View File

@@ -1,26 +0,0 @@
---
title: Use Activity Component for Show/Hide
impact: MEDIUM
impactDescription: preserves state/DOM
tags: rendering, activity, visibility, state-preservation
---
## Use Activity Component for Show/Hide
Use React's `<Activity>` to preserve state/DOM for expensive components that frequently toggle visibility.
**Usage:**
```tsx
import { Activity } from 'react'
function Dropdown({ isOpen }: Props) {
return (
<Activity mode={isOpen ? 'visible' : 'hidden'}>
<ExpensiveMenu />
</Activity>
)
}
```
Avoids expensive re-renders and state loss.

View File

@@ -1,47 +0,0 @@
---
title: Animate SVG Wrapper Instead of SVG Element
impact: LOW
impactDescription: enables hardware acceleration
tags: rendering, svg, css, animation, performance
---
## Animate SVG Wrapper Instead of SVG Element
Many browsers don't have hardware acceleration for CSS3 animations on SVG elements. Wrap SVG in a `<div>` and animate the wrapper instead.
**Incorrect (animating SVG directly - no hardware acceleration):**
```tsx
function LoadingSpinner() {
return (
<svg
className="animate-spin"
width="24"
height="24"
viewBox="0 0 24 24"
>
<circle cx="12" cy="12" r="10" stroke="currentColor" />
</svg>
)
}
```
**Correct (animating wrapper div - hardware accelerated):**
```tsx
function LoadingSpinner() {
return (
<div className="animate-spin">
<svg
width="24"
height="24"
viewBox="0 0 24 24"
>
<circle cx="12" cy="12" r="10" stroke="currentColor" />
</svg>
</div>
)
}
```
This applies to all CSS transforms and transitions (`transform`, `opacity`, `translate`, `scale`, `rotate`). The wrapper div allows browsers to use GPU acceleration for smoother animations.

View File

@@ -1,40 +0,0 @@
---
title: Use Explicit Conditional Rendering
impact: LOW
impactDescription: prevents rendering 0 or NaN
tags: rendering, conditional, jsx, falsy-values
---
## Use Explicit Conditional Rendering
Use explicit ternary operators (`? :`) instead of `&&` for conditional rendering when the condition can be `0`, `NaN`, or other falsy values that render.
**Incorrect (renders "0" when count is 0):**
```tsx
function Badge({ count }: { count: number }) {
return (
<div>
{count && <span className="badge">{count}</span>}
</div>
)
}
// When count = 0, renders: <div>0</div>
// When count = 5, renders: <div><span class="badge">5</span></div>
```
**Correct (renders nothing when count is 0):**
```tsx
function Badge({ count }: { count: number }) {
return (
<div>
{count > 0 ? <span className="badge">{count}</span> : null}
</div>
)
}
// When count = 0, renders: <div></div>
// When count = 5, renders: <div><span class="badge">5</span></div>
```

View File

@@ -1,38 +0,0 @@
---
title: CSS content-visibility for Long Lists
impact: HIGH
impactDescription: faster initial render
tags: rendering, css, content-visibility, long-lists
---
## CSS content-visibility for Long Lists
Apply `content-visibility: auto` to defer off-screen rendering.
**CSS:**
```css
.message-item {
content-visibility: auto;
contain-intrinsic-size: 0 80px;
}
```
**Example:**
```tsx
function MessageList({ messages }: { messages: Message[] }) {
return (
<div className="overflow-y-auto h-screen">
{messages.map(msg => (
<div key={msg.id} className="message-item">
<Avatar user={msg.author} />
<div>{msg.content}</div>
</div>
))}
</div>
)
}
```
For 1000 messages, browser skips layout/paint for ~990 off-screen items (10× faster initial render).

View File

@@ -1,46 +0,0 @@
---
title: Hoist Static JSX Elements
impact: LOW
impactDescription: avoids re-creation
tags: rendering, jsx, static, optimization
---
## Hoist Static JSX Elements
Extract static JSX outside components to avoid re-creation.
**Incorrect (recreates element every render):**
```tsx
function LoadingSkeleton() {
return <div className="animate-pulse h-20 bg-gray-200" />
}
function Container() {
return (
<div>
{loading && <LoadingSkeleton />}
</div>
)
}
```
**Correct (reuses same element):**
```tsx
const loadingSkeleton = (
<div className="animate-pulse h-20 bg-gray-200" />
)
function Container() {
return (
<div>
{loading && loadingSkeleton}
</div>
)
}
```
This is especially helpful for large and static SVG nodes, which can be expensive to recreate on every render.
**Note:** If your project has [React Compiler](https://react.dev/learn/react-compiler) enabled, the compiler automatically hoists static JSX elements and optimizes component re-renders, making manual hoisting unnecessary.

View File

@@ -1,82 +0,0 @@
---
title: Prevent Hydration Mismatch Without Flickering
impact: MEDIUM
impactDescription: avoids visual flicker and hydration errors
tags: rendering, ssr, hydration, localStorage, flicker
---
## Prevent Hydration Mismatch Without Flickering
When rendering content that depends on client-side storage (localStorage, cookies), avoid both SSR breakage and post-hydration flickering by injecting a synchronous script that updates the DOM before React hydrates.
**Incorrect (breaks SSR):**
```tsx
function ThemeWrapper({ children }: { children: ReactNode }) {
// localStorage is not available on server - throws error
const theme = localStorage.getItem('theme') || 'light'
return (
<div className={theme}>
{children}
</div>
)
}
```
Server-side rendering will fail because `localStorage` is undefined.
**Incorrect (visual flickering):**
```tsx
function ThemeWrapper({ children }: { children: ReactNode }) {
const [theme, setTheme] = useState('light')
useEffect(() => {
// Runs after hydration - causes visible flash
const stored = localStorage.getItem('theme')
if (stored) {
setTheme(stored)
}
}, [])
return (
<div className={theme}>
{children}
</div>
)
}
```
Component first renders with default value (`light`), then updates after hydration, causing a visible flash of incorrect content.
**Correct (no flicker, no hydration mismatch):**
```tsx
function ThemeWrapper({ children }: { children: ReactNode }) {
return (
<>
<div id="theme-wrapper">
{children}
</div>
<script
dangerouslySetInnerHTML={{
__html: `
(function() {
try {
var theme = localStorage.getItem('theme') || 'light';
var el = document.getElementById('theme-wrapper');
if (el) el.className = theme;
} catch (e) {}
})();
`,
}}
/>
</>
)
}
```
The inline script executes synchronously before showing the element, ensuring the DOM already has the correct value. No flickering, no hydration mismatch.
This pattern is especially useful for theme toggles, user preferences, authentication states, and any client-only data that should render immediately without flashing default values.

View File

@@ -1,28 +0,0 @@
---
title: Optimize SVG Precision
impact: LOW
impactDescription: reduces file size
tags: rendering, svg, optimization, svgo
---
## Optimize SVG Precision
Reduce SVG coordinate precision to decrease file size. The optimal precision depends on the viewBox size, but in general reducing precision should be considered.
**Incorrect (excessive precision):**
```svg
<path d="M 10.293847 20.847362 L 30.938472 40.192837" />
```
**Correct (1 decimal place):**
```svg
<path d="M 10.3 20.8 L 30.9 40.2" />
```
**Automate with SVGO:**
```bash
npx svgo --precision=1 --multipass icon.svg
```

View File

@@ -1,39 +0,0 @@
---
title: Defer State Reads to Usage Point
impact: MEDIUM
impactDescription: avoids unnecessary subscriptions
tags: rerender, searchParams, localStorage, optimization
---
## Defer State Reads to Usage Point
Don't subscribe to dynamic state (searchParams, localStorage) if you only read it inside callbacks.
**Incorrect (subscribes to all searchParams changes):**
```tsx
function ShareButton({ chatId }: { chatId: string }) {
const searchParams = useSearchParams()
const handleShare = () => {
const ref = searchParams.get('ref')
shareChat(chatId, { ref })
}
return <button onClick={handleShare}>Share</button>
}
```
**Correct (reads on demand, no subscription):**
```tsx
function ShareButton({ chatId }: { chatId: string }) {
const handleShare = () => {
const params = new URLSearchParams(window.location.search)
const ref = params.get('ref')
shareChat(chatId, { ref })
}
return <button onClick={handleShare}>Share</button>
}
```

View File

@@ -1,45 +0,0 @@
---
title: Narrow Effect Dependencies
impact: LOW
impactDescription: minimizes effect re-runs
tags: rerender, useEffect, dependencies, optimization
---
## Narrow Effect Dependencies
Specify primitive dependencies instead of objects to minimize effect re-runs.
**Incorrect (re-runs on any user field change):**
```tsx
useEffect(() => {
console.log(user.id)
}, [user])
```
**Correct (re-runs only when id changes):**
```tsx
useEffect(() => {
console.log(user.id)
}, [user.id])
```
**For derived state, compute outside effect:**
```tsx
// Incorrect: runs on width=767, 766, 765...
useEffect(() => {
if (width < 768) {
enableMobileMode()
}
}, [width])
// Correct: runs only on boolean transition
const isMobile = width < 768
useEffect(() => {
if (isMobile) {
enableMobileMode()
}
}, [isMobile])
```

View File

@@ -1,29 +0,0 @@
---
title: Subscribe to Derived State
impact: MEDIUM
impactDescription: reduces re-render frequency
tags: rerender, derived-state, media-query, optimization
---
## Subscribe to Derived State
Subscribe to derived boolean state instead of continuous values to reduce re-render frequency.
**Incorrect (re-renders on every pixel change):**
```tsx
function Sidebar() {
const width = useWindowWidth() // updates continuously
const isMobile = width < 768
return <nav className={isMobile ? 'mobile' : 'desktop'}>
}
```
**Correct (re-renders only when boolean changes):**
```tsx
function Sidebar() {
const isMobile = useMediaQuery('(max-width: 767px)')
return <nav className={isMobile ? 'mobile' : 'desktop'}>
}
```

View File

@@ -1,74 +0,0 @@
---
title: Use Functional setState Updates
impact: MEDIUM
impactDescription: prevents stale closures and unnecessary callback recreations
tags: react, hooks, useState, useCallback, callbacks, closures
---
## Use Functional setState Updates
When updating state based on the current state value, use the functional update form of setState instead of directly referencing the state variable. This prevents stale closures, eliminates unnecessary dependencies, and creates stable callback references.
**Incorrect (requires state as dependency):**
```tsx
function TodoList() {
const [items, setItems] = useState(initialItems)
// Callback must depend on items, recreated on every items change
const addItems = useCallback((newItems: Item[]) => {
setItems([...items, ...newItems])
}, [items]) // ❌ items dependency causes recreations
// Risk of stale closure if dependency is forgotten
const removeItem = useCallback((id: string) => {
setItems(items.filter(item => item.id !== id))
}, []) // ❌ Missing items dependency - will use stale items!
return <ItemsEditor items={items} onAdd={addItems} onRemove={removeItem} />
}
```
The first callback is recreated every time `items` changes, which can cause child components to re-render unnecessarily. The second callback has a stale closure bug—it will always reference the initial `items` value.
**Correct (stable callbacks, no stale closures):**
```tsx
function TodoList() {
const [items, setItems] = useState(initialItems)
// Stable callback, never recreated
const addItems = useCallback((newItems: Item[]) => {
setItems(curr => [...curr, ...newItems])
}, []) // ✅ No dependencies needed
// Always uses latest state, no stale closure risk
const removeItem = useCallback((id: string) => {
setItems(curr => curr.filter(item => item.id !== id))
}, []) // ✅ Safe and stable
return <ItemsEditor items={items} onAdd={addItems} onRemove={removeItem} />
}
```
**Benefits:**
1. **Stable callback references** - Callbacks don't need to be recreated when state changes
2. **No stale closures** - Always operates on the latest state value
3. **Fewer dependencies** - Simplifies dependency arrays and reduces memory leaks
4. **Prevents bugs** - Eliminates the most common source of React closure bugs
**When to use functional updates:**
- Any setState that depends on the current state value
- Inside useCallback/useMemo when state is needed
- Event handlers that reference state
- Async operations that update state
**When direct updates are fine:**
- Setting state to a static value: `setCount(0)`
- Setting state from props/arguments only: `setName(newName)`
- State doesn't depend on previous value
**Note:** If your project has [React Compiler](https://react.dev/learn/react-compiler) enabled, the compiler can automatically optimize some cases, but functional updates are still recommended for correctness and to prevent stale closure bugs.

View File

@@ -1,58 +0,0 @@
---
title: Use Lazy State Initialization
impact: MEDIUM
impactDescription: wasted computation on every render
tags: react, hooks, useState, performance, initialization
---
## Use Lazy State Initialization
Pass a function to `useState` for expensive initial values. Without the function form, the initializer runs on every render even though the value is only used once.
**Incorrect (runs on every render):**
```tsx
function FilteredList({ items }: { items: Item[] }) {
// buildSearchIndex() runs on EVERY render, even after initialization
const [searchIndex, setSearchIndex] = useState(buildSearchIndex(items))
const [query, setQuery] = useState('')
// When query changes, buildSearchIndex runs again unnecessarily
return <SearchResults index={searchIndex} query={query} />
}
function UserProfile() {
// JSON.parse runs on every render
const [settings, setSettings] = useState(
JSON.parse(localStorage.getItem('settings') || '{}')
)
return <SettingsForm settings={settings} onChange={setSettings} />
}
```
**Correct (runs only once):**
```tsx
function FilteredList({ items }: { items: Item[] }) {
// buildSearchIndex() runs ONLY on initial render
const [searchIndex, setSearchIndex] = useState(() => buildSearchIndex(items))
const [query, setQuery] = useState('')
return <SearchResults index={searchIndex} query={query} />
}
function UserProfile() {
// JSON.parse runs only on initial render
const [settings, setSettings] = useState(() => {
const stored = localStorage.getItem('settings')
return stored ? JSON.parse(stored) : {}
})
return <SettingsForm settings={settings} onChange={setSettings} />
}
```
Use lazy initialization when computing initial values from localStorage/sessionStorage, building data structures (indexes, maps), reading from the DOM, or performing heavy transformations.
For simple primitives (`useState(0)`), direct references (`useState(props.value)`), or cheap literals (`useState({})`), the function form is unnecessary.

View File

@@ -1,44 +0,0 @@
---
title: Extract to Memoized Components
impact: MEDIUM
impactDescription: enables early returns
tags: rerender, memo, useMemo, optimization
---
## Extract to Memoized Components
Extract expensive work into memoized components to enable early returns before computation.
**Incorrect (computes avatar even when loading):**
```tsx
function Profile({ user, loading }: Props) {
const avatar = useMemo(() => {
const id = computeAvatarId(user)
return <Avatar id={id} />
}, [user])
if (loading) return <Skeleton />
return <div>{avatar}</div>
}
```
**Correct (skips computation when loading):**
```tsx
const UserAvatar = memo(function UserAvatar({ user }: { user: User }) {
const id = useMemo(() => computeAvatarId(user), [user])
return <Avatar id={id} />
})
function Profile({ user, loading }: Props) {
if (loading) return <Skeleton />
return (
<div>
<UserAvatar user={user} />
</div>
)
}
```
**Note:** If your project has [React Compiler](https://react.dev/learn/react-compiler) enabled, manual memoization with `memo()` and `useMemo()` is not necessary. The compiler automatically optimizes re-renders.

View File

@@ -1,40 +0,0 @@
---
title: Use Transitions for Non-Urgent Updates
impact: MEDIUM
impactDescription: maintains UI responsiveness
tags: rerender, transitions, startTransition, performance
---
## Use Transitions for Non-Urgent Updates
Mark frequent, non-urgent state updates as transitions to maintain UI responsiveness.
**Incorrect (blocks UI on every scroll):**
```tsx
function ScrollTracker() {
const [scrollY, setScrollY] = useState(0)
useEffect(() => {
const handler = () => setScrollY(window.scrollY)
window.addEventListener('scroll', handler, { passive: true })
return () => window.removeEventListener('scroll', handler)
}, [])
}
```
**Correct (non-blocking updates):**
```tsx
import { startTransition } from 'react'
function ScrollTracker() {
const [scrollY, setScrollY] = useState(0)
useEffect(() => {
const handler = () => {
startTransition(() => setScrollY(window.scrollY))
}
window.addEventListener('scroll', handler, { passive: true })
return () => window.removeEventListener('scroll', handler)
}, [])
}
```

View File

@@ -1,73 +0,0 @@
---
title: Use after() for Non-Blocking Operations
impact: MEDIUM
impactDescription: faster response times
tags: server, async, logging, analytics, side-effects
---
## Use after() for Non-Blocking Operations
Use Next.js's `after()` to schedule work that should execute after a response is sent. This prevents logging, analytics, and other side effects from blocking the response.
**Incorrect (blocks response):**
```tsx
import { logUserAction } from '@/app/utils'
export async function POST(request: Request) {
// Perform mutation
await updateDatabase(request)
// Logging blocks the response
const userAgent = request.headers.get('user-agent') || 'unknown'
await logUserAction({ userAgent })
return new Response(JSON.stringify({ status: 'success' }), {
status: 200,
headers: { 'Content-Type': 'application/json' }
})
}
```
**Correct (non-blocking):**
```tsx
import { after } from 'next/server'
import { headers, cookies } from 'next/headers'
import { logUserAction } from '@/app/utils'
export async function POST(request: Request) {
// Perform mutation
await updateDatabase(request)
// Log after response is sent
after(async () => {
const userAgent = (await headers()).get('user-agent') || 'unknown'
const sessionCookie = (await cookies()).get('session-id')?.value || 'anonymous'
logUserAction({ sessionCookie, userAgent })
})
return new Response(JSON.stringify({ status: 'success' }), {
status: 200,
headers: { 'Content-Type': 'application/json' }
})
}
```
The response is sent immediately while logging happens in the background.
**Common use cases:**
- Analytics tracking
- Audit logging
- Sending notifications
- Cache invalidation
- Cleanup tasks
**Important notes:**
- `after()` runs even if the response fails or redirects
- Works in Server Actions, Route Handlers, and Server Components
Reference: [https://nextjs.org/docs/app/api-reference/functions/after](https://nextjs.org/docs/app/api-reference/functions/after)

View File

@@ -1,41 +0,0 @@
---
title: Cross-Request LRU Caching
impact: HIGH
impactDescription: caches across requests
tags: server, cache, lru, cross-request
---
## Cross-Request LRU Caching
`React.cache()` only works within one request. For data shared across sequential requests (user clicks button A then button B), use an LRU cache.
**Implementation:**
```typescript
import { LRUCache } from 'lru-cache'
const cache = new LRUCache<string, any>({
max: 1000,
ttl: 5 * 60 * 1000 // 5 minutes
})
export async function getUser(id: string) {
const cached = cache.get(id)
if (cached) return cached
const user = await db.user.findUnique({ where: { id } })
cache.set(id, user)
return user
}
// Request 1: DB query, result cached
// Request 2: cache hit, no DB query
```
Use when sequential user actions hit multiple endpoints needing the same data within seconds.
**With Vercel's [Fluid Compute](https://vercel.com/docs/fluid-compute):** LRU caching is especially effective because multiple concurrent requests can share the same function instance and cache. This means the cache persists across requests without needing external storage like Redis.
**In traditional serverless:** Each invocation runs in isolation, so consider Redis for cross-process caching.
Reference: [https://github.com/isaacs/node-lru-cache](https://github.com/isaacs/node-lru-cache)

View File

@@ -1,26 +0,0 @@
---
title: Per-Request Deduplication with React.cache()
impact: MEDIUM
impactDescription: deduplicates within request
tags: server, cache, react-cache, deduplication
---
## Per-Request Deduplication with React.cache()
Use `React.cache()` for server-side request deduplication. Authentication and database queries benefit most.
**Usage:**
```typescript
import { cache } from 'react'
export const getCurrentUser = cache(async () => {
const session = await auth()
if (!session?.user?.id) return null
return await db.user.findUnique({
where: { id: session.user.id }
})
})
```
Within a single request, multiple calls to `getCurrentUser()` execute the query only once.

View File

@@ -1,79 +0,0 @@
---
title: Parallel Data Fetching with Component Composition
impact: CRITICAL
impactDescription: eliminates server-side waterfalls
tags: server, rsc, parallel-fetching, composition
---
## Parallel Data Fetching with Component Composition
React Server Components execute sequentially within a tree. Restructure with composition to parallelize data fetching.
**Incorrect (Sidebar waits for Page's fetch to complete):**
```tsx
export default async function Page() {
const header = await fetchHeader()
return (
<div>
<div>{header}</div>
<Sidebar />
</div>
)
}
async function Sidebar() {
const items = await fetchSidebarItems()
return <nav>{items.map(renderItem)}</nav>
}
```
**Correct (both fetch simultaneously):**
```tsx
async function Header() {
const data = await fetchHeader()
return <div>{data}</div>
}
async function Sidebar() {
const items = await fetchSidebarItems()
return <nav>{items.map(renderItem)}</nav>
}
export default function Page() {
return (
<div>
<Header />
<Sidebar />
</div>
)
}
```
**Alternative with children prop:**
```tsx
async function Layout({ children }: { children: ReactNode }) {
const header = await fetchHeader()
return (
<div>
<div>{header}</div>
{children}
</div>
)
}
async function Sidebar() {
const items = await fetchSidebarItems()
return <nav>{items.map(renderItem)}</nav>
}
export default function Page() {
return (
<Layout>
<Sidebar />
</Layout>
)
}
```

View File

@@ -1,38 +0,0 @@
---
title: Minimize Serialization at RSC Boundaries
impact: HIGH
impactDescription: reduces data transfer size
tags: server, rsc, serialization, props
---
## Minimize Serialization at RSC Boundaries
The React Server/Client boundary serializes all object properties into strings and embeds them in the HTML response and subsequent RSC requests. This serialized data directly impacts page weight and load time, so **size matters a lot**. Only pass fields that the client actually uses.
**Incorrect (serializes all 50 fields):**
```tsx
async function Page() {
const user = await fetchUser() // 50 fields
return <Profile user={user} />
}
'use client'
function Profile({ user }: { user: User }) {
return <div>{user.name}</div> // uses 1 field
}
```
**Correct (serializes only 1 field):**
```tsx
async function Page() {
const user = await fetchUser()
return <Profile name={user.name} />
}
'use client'
function Profile({ name }: { name: string }) {
return <div>{name}</div>
}
```

View File

@@ -1,9 +1,6 @@
# Ignore everything by default, selectively add things to context
*
# Documentation (for embeddings/search)
!docs/
# Platform - Libs
!autogpt_platform/autogpt_libs/autogpt_libs/
!autogpt_platform/autogpt_libs/pyproject.toml
@@ -18,8 +15,6 @@
!autogpt_platform/backend/pyproject.toml
!autogpt_platform/backend/poetry.lock
!autogpt_platform/backend/README.md
!autogpt_platform/backend/.env
!autogpt_platform/backend/gen_prisma_types_stub.py
# Platform - Market
!autogpt_platform/market/market/
@@ -32,7 +27,6 @@
# Platform - Frontend
!autogpt_platform/frontend/src/
!autogpt_platform/frontend/public/
!autogpt_platform/frontend/scripts/
!autogpt_platform/frontend/package.json
!autogpt_platform/frontend/pnpm-lock.yaml
!autogpt_platform/frontend/tsconfig.json
@@ -40,7 +34,6 @@
## config
!autogpt_platform/frontend/*.config.*
!autogpt_platform/frontend/.env.*
!autogpt_platform/frontend/.env
# Classic - AutoGPT
!classic/original_autogpt/autogpt/

View File

@@ -24,8 +24,7 @@
</details>
#### For configuration changes:
- [ ] `.env.default` is updated or already compatible with my changes
- [ ] `.env.example` is updated or already compatible with my changes
- [ ] `docker-compose.yml` is updated or already compatible with my changes
- [ ] I have included a list of my configuration changes in the PR description (under **Changes**)

View File

@@ -1,322 +0,0 @@
# GitHub Copilot Instructions for AutoGPT
This file provides comprehensive onboarding information for GitHub Copilot coding agent to work efficiently with the AutoGPT repository.
## Repository Overview
**AutoGPT** is a powerful platform for creating, deploying, and managing continuous AI agents that automate complex workflows. This is a large monorepo (~150MB) containing multiple components:
- **AutoGPT Platform** (`autogpt_platform/`) - Main focus: Modern AI agent platform (Polyform Shield License)
- **Classic AutoGPT** (`classic/`) - Legacy agent system (MIT License)
- **Documentation** (`docs/`) - MkDocs-based documentation site
- **Infrastructure** - Docker configurations, CI/CD, and development tools
**Primary Languages & Frameworks:**
- **Backend**: Python 3.10-3.13, FastAPI, Prisma ORM, PostgreSQL, RabbitMQ
- **Frontend**: TypeScript, Next.js 15, React, Tailwind CSS, Radix UI
- **Development**: Docker, Poetry, pnpm, Playwright, Storybook
## Build and Validation Instructions
### Essential Setup Commands
**Always run these commands in the correct directory and in this order:**
1. **Initial Setup** (required once):
```bash
# Clone and enter repository
git clone <repo> && cd AutoGPT
# Start all services (database, redis, rabbitmq, clamav)
cd autogpt_platform && docker compose --profile local up deps --build --detach
```
2. **Backend Setup** (always run before backend development):
```bash
cd autogpt_platform/backend
poetry install # Install dependencies
poetry run prisma migrate dev # Run database migrations
poetry run prisma generate # Generate Prisma client
```
3. **Frontend Setup** (always run before frontend development):
```bash
cd autogpt_platform/frontend
pnpm install # Install dependencies
```
### Runtime Requirements
**Critical:** Always ensure Docker services are running before starting development:
```bash
cd autogpt_platform && docker compose --profile local up deps --build --detach
```
**Python Version:** Use Python 3.11 (required; managed by Poetry via pyproject.toml)
**Node.js Version:** Use Node.js 21+ with pnpm package manager
### Development Commands
**Backend Development:**
```bash
cd autogpt_platform/backend
poetry run serve # Start development server (port 8000)
poetry run test # Run all tests (requires ~5 minutes)
poetry run pytest path/to/test.py # Run specific test
poetry run format # Format code (Black + isort) - always run first
poetry run lint # Lint code (ruff) - run after format
```
**Frontend Development:**
```bash
cd autogpt_platform/frontend
pnpm dev # Start development server (port 3000) - use for active development
pnpm build # Build for production (only needed for E2E tests or deployment)
pnpm test # Run Playwright E2E tests (requires build first)
pnpm test-ui # Run tests with UI
pnpm format # Format and lint code
pnpm storybook # Start component development server
```
### Testing Strategy
**Backend Tests:**
- **Block Tests**: `poetry run pytest backend/blocks/test/test_block.py -xvs` (validates all blocks)
- **Specific Block**: `poetry run pytest 'backend/blocks/test/test_block.py::test_available_blocks[BlockName]' -xvs`
- **Snapshot Tests**: Use `--snapshot-update` when output changes, always review with `git diff`
**Frontend Tests:**
- **E2E Tests**: Always run `pnpm dev` before `pnpm test` (Playwright requires running instance)
- **Component Tests**: Use Storybook for isolated component development
### Critical Validation Steps
**Before committing changes:**
1. Run `poetry run format` (backend) and `pnpm format` (frontend)
2. Ensure all tests pass in modified areas
3. Verify Docker services are still running
4. Check that database migrations apply cleanly
**Common Issues & Workarounds:**
- **Prisma issues**: Run `poetry run prisma generate` after schema changes
- **Permission errors**: Ensure Docker has proper permissions
- **Port conflicts**: Check the `docker-compose.yml` file for the current list of exposed ports. You can list all mapped ports with:
- **Test timeouts**: Backend tests can take 5+ minutes, use `-x` flag to stop on first failure
## Project Layout & Architecture
### Core Architecture
**AutoGPT Platform** (`autogpt_platform/`):
- `backend/` - FastAPI server with async support
- `backend/backend/` - Core API logic
- `backend/blocks/` - Agent execution blocks
- `backend/data/` - Database models and schemas
- `schema.prisma` - Database schema definition
- `frontend/` - Next.js application
- `src/app/` - App Router pages and layouts
- `src/components/` - Reusable React components
- `src/lib/` - Utilities and configurations
- `autogpt_libs/` - Shared Python utilities
- `docker-compose.yml` - Development stack orchestration
**Key Configuration Files:**
- `pyproject.toml` - Python dependencies and tooling
- `package.json` - Node.js dependencies and scripts
- `schema.prisma` - Database schema and migrations
- `next.config.mjs` - Next.js configuration
- `tailwind.config.ts` - Styling configuration
### Security & Middleware
**Cache Protection**: Backend includes middleware preventing sensitive data caching in browsers/proxies
**Authentication**: JWT-based with Supabase integration
**User ID Validation**: All data access requires user ID checks - verify this for any `data/*.py` changes
### Development Workflow
**GitHub Actions**: Multiple CI/CD workflows in `.github/workflows/`
- `platform-backend-ci.yml` - Backend testing and validation
- `platform-frontend-ci.yml` - Frontend testing and validation
- `platform-fullstack-ci.yml` - End-to-end integration tests
**Pre-commit Hooks**: Run linting and formatting checks
**Conventional Commits**: Use format `type(scope): description` (e.g., `feat(backend): add API`)
### Key Source Files
**Backend Entry Points:**
- `backend/backend/server/server.py` - FastAPI application setup
- `backend/backend/data/` - Database models and user management
- `backend/blocks/` - Agent execution blocks and logic
**Frontend Entry Points:**
- `frontend/src/app/layout.tsx` - Root application layout
- `frontend/src/app/page.tsx` - Home page
- `frontend/src/lib/supabase/` - Authentication and database client
**Protected Routes**: Update `frontend/lib/supabase/middleware.ts` when adding protected routes
### Agent Block System
Agents are built using a visual block-based system where each block performs a single action. Blocks are defined in `backend/blocks/` and must include:
- Block definition with input/output schemas
- Execution logic with proper error handling
- Tests validating functionality
### Database & ORM
**Prisma ORM** with PostgreSQL backend including pgvector for embeddings:
- Schema in `schema.prisma`
- Migrations in `backend/migrations/`
- Always run `prisma migrate dev` and `prisma generate` after schema changes
## Environment Configuration
### Configuration Files Priority Order
1. **Backend**: `/backend/.env.default` → `/backend/.env` (user overrides)
2. **Frontend**: `/frontend/.env.default` → `/frontend/.env` (user overrides)
3. **Platform**: `/.env.default` (Supabase/shared) → `/.env` (user overrides)
4. Docker Compose `environment:` sections override file-based config
5. Shell environment variables have highest precedence
### Docker Environment Setup
- All services use hardcoded defaults (no `${VARIABLE}` substitutions)
- The `env_file` directive loads variables INTO containers at runtime
- Backend/Frontend services use YAML anchors for consistent configuration
- Copy `.env.default` files to `.env` for local development customization
## Advanced Development Patterns
### Adding New Blocks
1. Create file in `/backend/backend/blocks/`
2. Inherit from `Block` base class with input/output schemas
3. Implement `run` method with proper error handling
4. Generate block UUID using `uuid.uuid4()`
5. Register in block registry
6. Write tests alongside block implementation
7. Consider how inputs/outputs connect with other blocks in graph editor
### API Development
1. Update routes in `/backend/backend/server/routers/`
2. Add/update Pydantic models in same directory
3. Write tests alongside route files
4. For `data/*.py` changes, validate user ID checks
5. Run `poetry run test` to verify changes
### Frontend Development
**📖 Complete Frontend Guide**: See `autogpt_platform/frontend/CONTRIBUTING.md` and `autogpt_platform/frontend/.cursorrules` for comprehensive patterns and conventions.
**Quick Reference:**
**Component Structure:**
- Separate render logic from data/behavior
- Structure: `ComponentName/ComponentName.tsx` + `useComponentName.ts` + `helpers.ts`
- Exception: Small components (3-4 lines of logic) can be inline
- Render-only components can be direct files without folders
**Data Fetching:**
- Use generated API hooks from `@/app/api/__generated__/endpoints/`
- Generated via Orval from backend OpenAPI spec
- Pattern: `use{Method}{Version}{OperationName}`
- Example: `useGetV2ListLibraryAgents`
- Regenerate with: `pnpm generate:api`
- **Never** use deprecated `BackendAPI` or `src/lib/autogpt-server-api/*`
**Code Conventions:**
- Use function declarations for components and handlers (not arrow functions)
- Only arrow functions for small inline lambdas (map, filter, etc.)
- Components: `PascalCase`, Hooks: `camelCase` with `use` prefix
- No barrel files or `index.ts` re-exports
- Minimal comments (code should be self-documenting)
**Styling:**
- Use Tailwind CSS utilities only
- Use design system components from `src/components/` (atoms, molecules, organisms)
- Never use `src/components/__legacy__/*`
- Only use Phosphor Icons (`@phosphor-icons/react`)
- Prefer design tokens over hardcoded values
**Error Handling:**
- Render errors: Use `<ErrorCard />` component
- Mutation errors: Display with toast notifications
- Manual exceptions: Use `Sentry.captureException()`
- Global error boundaries already configured
**Testing:**
- Add/update Storybook stories for UI components (`pnpm storybook`)
- Run Playwright E2E tests with `pnpm test`
- Verify in Chromatic after PR
**Architecture:**
- Default to client components ("use client")
- Server components only for SEO or extreme TTFB needs
- Use React Query for server state (via generated hooks)
- Co-locate UI state in components/hooks
### Security Guidelines
**Cache Protection Middleware** (`/backend/backend/server/middleware/security.py`):
- Default: Disables caching for ALL endpoints with `Cache-Control: no-store, no-cache, must-revalidate, private`
- Uses allow list approach for cacheable paths (static assets, health checks, public pages)
- Prevents sensitive data caching in browsers/proxies
- Add new cacheable endpoints to `CACHEABLE_PATHS`
### CI/CD Alignment
The repository has comprehensive CI workflows that test:
- **Backend**: Python 3.11-3.13, services (Redis/RabbitMQ/ClamAV), Prisma migrations, Poetry lock validation
- **Frontend**: Node.js 21, pnpm, Playwright with Docker Compose stack, API schema validation
- **Integration**: Full-stack type checking and E2E testing
Match these patterns when developing locally - the copilot setup environment mirrors these CI configurations.
## Collaboration with Other AI Assistants
This repository is actively developed with assistance from Claude (via CLAUDE.md files). When working on this codebase:
- Check for existing CLAUDE.md files that provide additional context
- Follow established patterns and conventions already in the codebase
- Maintain consistency with existing code style and architecture
- Consider that changes may be reviewed and extended by both human developers and AI assistants
## Trust These Instructions
These instructions are comprehensive and tested. Only perform additional searches if:
1. Information here is incomplete for your specific task
2. You encounter errors not covered by the workarounds
3. You need to understand implementation details not covered above
For detailed platform development patterns, refer to `autogpt_platform/CLAUDE.md` and `AGENTS.md` in the repository root.

View File

@@ -1,97 +0,0 @@
name: Auto Fix CI Failures
on:
workflow_run:
workflows: ["CI"]
types:
- completed
permissions:
contents: write
pull-requests: write
actions: read
issues: write
id-token: write # Required for OIDC token exchange
jobs:
auto-fix:
if: |
github.event.workflow_run.conclusion == 'failure' &&
github.event.workflow_run.pull_requests[0] &&
!startsWith(github.event.workflow_run.head_branch, 'claude-auto-fix-ci-')
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
ref: ${{ github.event.workflow_run.head_branch }}
fetch-depth: 0
token: ${{ secrets.GITHUB_TOKEN }}
- name: Setup git identity
run: |
git config --global user.email "claude[bot]@users.noreply.github.com"
git config --global user.name "claude[bot]"
- name: Create fix branch
id: branch
run: |
BRANCH_NAME="claude-auto-fix-ci-${{ github.event.workflow_run.head_branch }}-${{ github.run_id }}"
git checkout -b "$BRANCH_NAME"
echo "branch_name=$BRANCH_NAME" >> $GITHUB_OUTPUT
- name: Get CI failure details
id: failure_details
uses: actions/github-script@v7
with:
script: |
const run = await github.rest.actions.getWorkflowRun({
owner: context.repo.owner,
repo: context.repo.repo,
run_id: ${{ github.event.workflow_run.id }}
});
const jobs = await github.rest.actions.listJobsForWorkflowRun({
owner: context.repo.owner,
repo: context.repo.repo,
run_id: ${{ github.event.workflow_run.id }}
});
const failedJobs = jobs.data.jobs.filter(job => job.conclusion === 'failure');
let errorLogs = [];
for (const job of failedJobs) {
const logs = await github.rest.actions.downloadJobLogsForWorkflowRun({
owner: context.repo.owner,
repo: context.repo.repo,
job_id: job.id
});
errorLogs.push({
jobName: job.name,
logs: logs.data
});
}
return {
runUrl: run.data.html_url,
failedJobs: failedJobs.map(j => j.name),
errorLogs: errorLogs
};
- name: Fix CI failures with Claude
id: claude
uses: anthropics/claude-code-action@v1
with:
prompt: |
/fix-ci
Failed CI Run: ${{ fromJSON(steps.failure_details.outputs.result).runUrl }}
Failed Jobs: ${{ join(fromJSON(steps.failure_details.outputs.result).failedJobs, ', ') }}
PR Number: ${{ github.event.workflow_run.pull_requests[0].number }}
Branch Name: ${{ steps.branch.outputs.branch_name }}
Base Branch: ${{ github.event.workflow_run.head_branch }}
Repository: ${{ github.repository }}
Error logs:
${{ toJSON(fromJSON(steps.failure_details.outputs.result).errorLogs) }}
anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }}
claude_args: "--allowedTools 'Edit,MultiEdit,Write,Read,Glob,Grep,LS,Bash(git:*),Bash(bun:*),Bash(npm:*),Bash(npx:*),Bash(gh:*)'"

View File

@@ -1,379 +0,0 @@
# Claude Dependabot PR Review Workflow
#
# This workflow automatically runs Claude analysis on Dependabot PRs to:
# - Identify dependency changes and their versions
# - Look up changelogs for updated packages
# - Assess breaking changes and security impacts
# - Provide actionable recommendations for the development team
#
# Triggered on: Dependabot PRs (opened, synchronize)
# Requirements: ANTHROPIC_API_KEY secret must be configured
name: Claude Dependabot PR Review
on:
pull_request:
types: [opened, synchronize]
jobs:
dependabot-review:
# Only run on Dependabot PRs
if: github.actor == 'dependabot[bot]'
runs-on: ubuntu-latest
timeout-minutes: 30
permissions:
contents: write
pull-requests: read
issues: read
id-token: write
actions: read # Required for CI access
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 1
# Backend Python/Poetry setup (mirrors platform-backend-ci.yml)
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11" # Use standard version matching CI
- name: Set up Python dependency cache
uses: actions/cache@v4
with:
path: ~/.cache/pypoetry
key: poetry-${{ runner.os }}-${{ hashFiles('autogpt_platform/backend/poetry.lock') }}
- name: Install Poetry
run: |
# Extract Poetry version from backend/poetry.lock (matches CI)
cd autogpt_platform/backend
HEAD_POETRY_VERSION=$(python3 ../../.github/workflows/scripts/get_package_version_from_lockfile.py poetry)
echo "Found Poetry version ${HEAD_POETRY_VERSION} in backend/poetry.lock"
# Install Poetry
curl -sSL https://install.python-poetry.org | POETRY_VERSION=$HEAD_POETRY_VERSION python3 -
# Add Poetry to PATH
echo "$HOME/.local/bin" >> $GITHUB_PATH
- name: Check poetry.lock
working-directory: autogpt_platform/backend
run: |
poetry lock
if ! git diff --quiet --ignore-matching-lines="^# " poetry.lock; then
echo "Warning: poetry.lock not up to date, but continuing for setup"
git checkout poetry.lock # Reset for clean setup
fi
- name: Install Python dependencies
working-directory: autogpt_platform/backend
run: poetry install
- name: Generate Prisma Client
working-directory: autogpt_platform/backend
run: poetry run prisma generate && poetry run gen-prisma-stub
# Frontend Node.js/pnpm setup (mirrors platform-frontend-ci.yml)
- name: Set up Node.js
uses: actions/setup-node@v4
with:
node-version: "22"
- name: Enable corepack
run: corepack enable
- name: Set pnpm store directory
run: |
pnpm config set store-dir ~/.pnpm-store
echo "PNPM_HOME=$HOME/.pnpm-store" >> $GITHUB_ENV
- name: Cache frontend dependencies
uses: actions/cache@v4
with:
path: ~/.pnpm-store
key: ${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml', 'autogpt_platform/frontend/package.json') }}
restore-keys: |
${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml') }}
${{ runner.os }}-pnpm-
- name: Install JavaScript dependencies
working-directory: autogpt_platform/frontend
run: pnpm install --frozen-lockfile
# Install Playwright browsers for frontend testing
# NOTE: Disabled to save ~1 minute of setup time. Re-enable if Copilot needs browser automation (e.g., for MCP)
# - name: Install Playwright browsers
# working-directory: autogpt_platform/frontend
# run: pnpm playwright install --with-deps chromium
# Docker setup for development environment
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Copy default environment files
working-directory: autogpt_platform
run: |
# Copy default environment files for development
cp .env.default .env
cp backend/.env.default backend/.env
cp frontend/.env.default frontend/.env
# Phase 1: Cache and load Docker images for faster setup
- name: Set up Docker image cache
id: docker-cache
uses: actions/cache@v4
with:
path: ~/docker-cache
# Use a versioned key for cache invalidation when image list changes
key: docker-images-v2-${{ runner.os }}-${{ hashFiles('.github/workflows/copilot-setup-steps.yml') }}
restore-keys: |
docker-images-v2-${{ runner.os }}-
docker-images-v1-${{ runner.os }}-
- name: Load or pull Docker images
working-directory: autogpt_platform
run: |
mkdir -p ~/docker-cache
# Define image list for easy maintenance
IMAGES=(
"redis:latest"
"rabbitmq:management"
"clamav/clamav-debian:latest"
"busybox:latest"
"kong:2.8.1"
"supabase/gotrue:v2.170.0"
"supabase/postgres:15.8.1.049"
"supabase/postgres-meta:v0.86.1"
"supabase/studio:20250224-d10db0f"
)
# Check if any cached tar files exist (more reliable than cache-hit)
if ls ~/docker-cache/*.tar 1> /dev/null 2>&1; then
echo "Docker cache found, loading images in parallel..."
for image in "${IMAGES[@]}"; do
# Convert image name to filename (replace : and / with -)
filename=$(echo "$image" | tr ':/' '--')
if [ -f ~/docker-cache/${filename}.tar ]; then
echo "Loading $image..."
docker load -i ~/docker-cache/${filename}.tar || echo "Warning: Failed to load $image from cache" &
fi
done
wait
echo "All cached images loaded"
else
echo "No Docker cache found, pulling images in parallel..."
# Pull all images in parallel
for image in "${IMAGES[@]}"; do
docker pull "$image" &
done
wait
# Only save cache on main branches (not PRs) to avoid cache pollution
if [[ "${{ github.ref }}" == "refs/heads/master" ]] || [[ "${{ github.ref }}" == "refs/heads/dev" ]]; then
echo "Saving Docker images to cache in parallel..."
for image in "${IMAGES[@]}"; do
# Convert image name to filename (replace : and / with -)
filename=$(echo "$image" | tr ':/' '--')
echo "Saving $image..."
docker save -o ~/docker-cache/${filename}.tar "$image" || echo "Warning: Failed to save $image" &
done
wait
echo "Docker image cache saved"
else
echo "Skipping cache save for PR/feature branch"
fi
fi
echo "Docker images ready for use"
# Phase 2: Build migrate service with GitHub Actions cache
- name: Build migrate Docker image with cache
working-directory: autogpt_platform
run: |
# Build the migrate image with buildx for GHA caching
docker buildx build \
--cache-from type=gha \
--cache-to type=gha,mode=max \
--target migrate \
--tag autogpt_platform-migrate:latest \
--load \
-f backend/Dockerfile \
..
# Start services using pre-built images
- name: Start Docker services for development
working-directory: autogpt_platform
run: |
# Start essential services (migrate image already built with correct tag)
docker compose --profile local up deps --no-build --detach
echo "Waiting for services to be ready..."
# Wait for database to be ready
echo "Checking database readiness..."
timeout 30 sh -c 'until docker compose exec -T db pg_isready -U postgres 2>/dev/null; do
echo " Waiting for database..."
sleep 2
done' && echo "✅ Database is ready" || echo "⚠️ Database ready check timeout after 30s, continuing..."
# Check migrate service status
echo "Checking migration status..."
docker compose ps migrate || echo " Migrate service not visible in ps output"
# Wait for migrate service to complete
echo "Waiting for migrations to complete..."
timeout 30 bash -c '
ATTEMPTS=0
while [ $ATTEMPTS -lt 15 ]; do
ATTEMPTS=$((ATTEMPTS + 1))
# Check using docker directly (more reliable than docker compose ps)
CONTAINER_STATUS=$(docker ps -a --filter "label=com.docker.compose.service=migrate" --format "{{.Status}}" | head -1)
if [ -z "$CONTAINER_STATUS" ]; then
echo " Attempt $ATTEMPTS: Migrate container not found yet..."
elif echo "$CONTAINER_STATUS" | grep -q "Exited (0)"; then
echo "✅ Migrations completed successfully"
docker compose logs migrate --tail=5 2>/dev/null || true
exit 0
elif echo "$CONTAINER_STATUS" | grep -q "Exited ([1-9]"; then
EXIT_CODE=$(echo "$CONTAINER_STATUS" | grep -oE "Exited \([0-9]+\)" | grep -oE "[0-9]+")
echo "❌ Migrations failed with exit code: $EXIT_CODE"
echo "Migration logs:"
docker compose logs migrate --tail=20 2>/dev/null || true
exit 1
elif echo "$CONTAINER_STATUS" | grep -q "Up"; then
echo " Attempt $ATTEMPTS: Migrate container is running... ($CONTAINER_STATUS)"
else
echo " Attempt $ATTEMPTS: Migrate container status: $CONTAINER_STATUS"
fi
sleep 2
done
echo "⚠️ Timeout: Could not determine migration status after 30 seconds"
echo "Final container check:"
docker ps -a --filter "label=com.docker.compose.service=migrate" || true
echo "Migration logs (if available):"
docker compose logs migrate --tail=10 2>/dev/null || echo " No logs available"
' || echo "⚠️ Migration check completed with warnings, continuing..."
# Brief wait for other services to stabilize
echo "Waiting 5 seconds for other services to stabilize..."
sleep 5
# Verify installations and provide environment info
- name: Verify setup and show environment info
run: |
echo "=== Python Setup ==="
python --version
poetry --version
echo "=== Node.js Setup ==="
node --version
pnpm --version
echo "=== Additional Tools ==="
docker --version
docker compose version
gh --version || true
echo "=== Services Status ==="
cd autogpt_platform
docker compose ps || true
echo "=== Backend Dependencies ==="
cd backend
poetry show | head -10 || true
echo "=== Frontend Dependencies ==="
cd ../frontend
pnpm list --depth=0 | head -10 || true
echo "=== Environment Files ==="
ls -la ../.env* || true
ls -la .env* || true
ls -la ../backend/.env* || true
echo "✅ AutoGPT Platform development environment setup complete!"
echo "🚀 Ready for development with Docker services running"
echo "📝 Backend server: poetry run serve (port 8000)"
echo "🌐 Frontend server: pnpm dev (port 3000)"
- name: Run Claude Dependabot Analysis
id: claude_review
uses: anthropics/claude-code-action@v1
with:
anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }}
claude_args: |
--allowedTools "Bash(npm:*),Bash(pnpm:*),Bash(poetry:*),Bash(git:*),Edit,Replace,NotebookEditCell,mcp__github_inline_comment__create_inline_comment,Bash(gh pr comment:*), Bash(gh pr diff:*), Bash(gh pr view:*)"
prompt: |
You are Claude, an AI assistant specialized in reviewing Dependabot dependency update PRs.
Your primary tasks are:
1. **Analyze the dependency changes** in this Dependabot PR
2. **Look up changelogs** for all updated dependencies to understand what changed
3. **Identify breaking changes** and assess potential impact on the AutoGPT codebase
4. **Provide actionable recommendations** for the development team
## Analysis Process:
1. **Identify Changed Dependencies**:
- Use git diff to see what dependencies were updated
- Parse package.json, poetry.lock, requirements files, etc.
- List all package versions: old → new
2. **Changelog Research**:
- For each updated dependency, look up its changelog/release notes
- Use WebFetch to access GitHub releases, NPM package pages, PyPI project pages. The pr should also have some details
- Focus on versions between the old and new versions
- Identify: breaking changes, deprecations, security fixes, new features
3. **Breaking Change Assessment**:
- Categorize changes: BREAKING, MAJOR, MINOR, PATCH, SECURITY
- Assess impact on AutoGPT's usage patterns
- Check if AutoGPT uses affected APIs/features
- Look for migration guides or upgrade instructions
4. **Codebase Impact Analysis**:
- Search the AutoGPT codebase for usage of changed APIs
- Identify files that might be affected by breaking changes
- Check test files for deprecated usage patterns
- Look for configuration changes needed
## Output Format:
Provide a comprehensive review comment with:
### 🔍 Dependency Analysis Summary
- List of updated packages with version changes
- Overall risk assessment (LOW/MEDIUM/HIGH)
### 📋 Detailed Changelog Review
For each updated dependency:
- **Package**: name (old_version → new_version)
- **Changes**: Summary of key changes
- **Breaking Changes**: List any breaking changes
- **Security Fixes**: Note security improvements
- **Migration Notes**: Any upgrade steps needed
### ⚠️ Impact Assessment
- **Breaking Changes Found**: Yes/No with details
- **Affected Files**: List AutoGPT files that may need updates
- **Test Impact**: Any tests that may need updating
- **Configuration Changes**: Required config updates
### 🛠️ Recommendations
- **Action Required**: What the team should do
- **Testing Focus**: Areas to test thoroughly
- **Follow-up Tasks**: Any additional work needed
- **Merge Recommendation**: APPROVE/REVIEW_NEEDED/HOLD
### 📚 Useful Links
- Links to relevant changelogs, migration guides, documentation
Be thorough but concise. Focus on actionable insights that help the development team make informed decisions about the dependency updates.

View File

@@ -30,302 +30,18 @@ jobs:
github.event.issue.author_association == 'COLLABORATOR'
)
runs-on: ubuntu-latest
timeout-minutes: 45
permissions:
contents: write
contents: read
pull-requests: read
issues: read
id-token: write
actions: read # Required for CI access
steps:
- name: Checkout code
- name: Checkout repository
uses: actions/checkout@v4
with:
fetch-depth: 1
- name: Free Disk Space (Ubuntu)
uses: jlumbroso/free-disk-space@v1.3.1
with:
large-packages: false # slow
docker-images: false # limited benefit
# Backend Python/Poetry setup (mirrors platform-backend-ci.yml)
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11" # Use standard version matching CI
- name: Set up Python dependency cache
uses: actions/cache@v4
with:
path: ~/.cache/pypoetry
key: poetry-${{ runner.os }}-${{ hashFiles('autogpt_platform/backend/poetry.lock') }}
- name: Install Poetry
run: |
# Extract Poetry version from backend/poetry.lock (matches CI)
cd autogpt_platform/backend
HEAD_POETRY_VERSION=$(python3 ../../.github/workflows/scripts/get_package_version_from_lockfile.py poetry)
echo "Found Poetry version ${HEAD_POETRY_VERSION} in backend/poetry.lock"
# Install Poetry
curl -sSL https://install.python-poetry.org | POETRY_VERSION=$HEAD_POETRY_VERSION python3 -
# Add Poetry to PATH
echo "$HOME/.local/bin" >> $GITHUB_PATH
- name: Check poetry.lock
working-directory: autogpt_platform/backend
run: |
poetry lock
if ! git diff --quiet --ignore-matching-lines="^# " poetry.lock; then
echo "Warning: poetry.lock not up to date, but continuing for setup"
git checkout poetry.lock # Reset for clean setup
fi
- name: Install Python dependencies
working-directory: autogpt_platform/backend
run: poetry install
- name: Generate Prisma Client
working-directory: autogpt_platform/backend
run: poetry run prisma generate && poetry run gen-prisma-stub
# Frontend Node.js/pnpm setup (mirrors platform-frontend-ci.yml)
- name: Set up Node.js
uses: actions/setup-node@v4
with:
node-version: "22"
- name: Enable corepack
run: corepack enable
- name: Set pnpm store directory
run: |
pnpm config set store-dir ~/.pnpm-store
echo "PNPM_HOME=$HOME/.pnpm-store" >> $GITHUB_ENV
- name: Cache frontend dependencies
uses: actions/cache@v4
with:
path: ~/.pnpm-store
key: ${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml', 'autogpt_platform/frontend/package.json') }}
restore-keys: |
${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml') }}
${{ runner.os }}-pnpm-
- name: Install JavaScript dependencies
working-directory: autogpt_platform/frontend
run: pnpm install --frozen-lockfile
# Install Playwright browsers for frontend testing
# NOTE: Disabled to save ~1 minute of setup time. Re-enable if Copilot needs browser automation (e.g., for MCP)
# - name: Install Playwright browsers
# working-directory: autogpt_platform/frontend
# run: pnpm playwright install --with-deps chromium
# Docker setup for development environment
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Copy default environment files
working-directory: autogpt_platform
run: |
# Copy default environment files for development
cp .env.default .env
cp backend/.env.default backend/.env
cp frontend/.env.default frontend/.env
# Phase 1: Cache and load Docker images for faster setup
- name: Set up Docker image cache
id: docker-cache
uses: actions/cache@v4
with:
path: ~/docker-cache
# Use a versioned key for cache invalidation when image list changes
key: docker-images-v2-${{ runner.os }}-${{ hashFiles('.github/workflows/copilot-setup-steps.yml') }}
restore-keys: |
docker-images-v2-${{ runner.os }}-
docker-images-v1-${{ runner.os }}-
- name: Load or pull Docker images
working-directory: autogpt_platform
run: |
mkdir -p ~/docker-cache
# Define image list for easy maintenance
IMAGES=(
"redis:latest"
"rabbitmq:management"
"clamav/clamav-debian:latest"
"busybox:latest"
"kong:2.8.1"
"supabase/gotrue:v2.170.0"
"supabase/postgres:15.8.1.049"
"supabase/postgres-meta:v0.86.1"
"supabase/studio:20250224-d10db0f"
)
# Check if any cached tar files exist (more reliable than cache-hit)
if ls ~/docker-cache/*.tar 1> /dev/null 2>&1; then
echo "Docker cache found, loading images in parallel..."
for image in "${IMAGES[@]}"; do
# Convert image name to filename (replace : and / with -)
filename=$(echo "$image" | tr ':/' '--')
if [ -f ~/docker-cache/${filename}.tar ]; then
echo "Loading $image..."
docker load -i ~/docker-cache/${filename}.tar || echo "Warning: Failed to load $image from cache" &
fi
done
wait
echo "All cached images loaded"
else
echo "No Docker cache found, pulling images in parallel..."
# Pull all images in parallel
for image in "${IMAGES[@]}"; do
docker pull "$image" &
done
wait
# Only save cache on main branches (not PRs) to avoid cache pollution
if [[ "${{ github.ref }}" == "refs/heads/master" ]] || [[ "${{ github.ref }}" == "refs/heads/dev" ]]; then
echo "Saving Docker images to cache in parallel..."
for image in "${IMAGES[@]}"; do
# Convert image name to filename (replace : and / with -)
filename=$(echo "$image" | tr ':/' '--')
echo "Saving $image..."
docker save -o ~/docker-cache/${filename}.tar "$image" || echo "Warning: Failed to save $image" &
done
wait
echo "Docker image cache saved"
else
echo "Skipping cache save for PR/feature branch"
fi
fi
echo "Docker images ready for use"
# Phase 2: Build migrate service with GitHub Actions cache
- name: Build migrate Docker image with cache
working-directory: autogpt_platform
run: |
# Build the migrate image with buildx for GHA caching
docker buildx build \
--cache-from type=gha \
--cache-to type=gha,mode=max \
--target migrate \
--tag autogpt_platform-migrate:latest \
--load \
-f backend/Dockerfile \
..
# Start services using pre-built images
- name: Start Docker services for development
working-directory: autogpt_platform
run: |
# Start essential services (migrate image already built with correct tag)
docker compose --profile local up deps --no-build --detach
echo "Waiting for services to be ready..."
# Wait for database to be ready
echo "Checking database readiness..."
timeout 30 sh -c 'until docker compose exec -T db pg_isready -U postgres 2>/dev/null; do
echo " Waiting for database..."
sleep 2
done' && echo "✅ Database is ready" || echo "⚠️ Database ready check timeout after 30s, continuing..."
# Check migrate service status
echo "Checking migration status..."
docker compose ps migrate || echo " Migrate service not visible in ps output"
# Wait for migrate service to complete
echo "Waiting for migrations to complete..."
timeout 30 bash -c '
ATTEMPTS=0
while [ $ATTEMPTS -lt 15 ]; do
ATTEMPTS=$((ATTEMPTS + 1))
# Check using docker directly (more reliable than docker compose ps)
CONTAINER_STATUS=$(docker ps -a --filter "label=com.docker.compose.service=migrate" --format "{{.Status}}" | head -1)
if [ -z "$CONTAINER_STATUS" ]; then
echo " Attempt $ATTEMPTS: Migrate container not found yet..."
elif echo "$CONTAINER_STATUS" | grep -q "Exited (0)"; then
echo "✅ Migrations completed successfully"
docker compose logs migrate --tail=5 2>/dev/null || true
exit 0
elif echo "$CONTAINER_STATUS" | grep -q "Exited ([1-9]"; then
EXIT_CODE=$(echo "$CONTAINER_STATUS" | grep -oE "Exited \([0-9]+\)" | grep -oE "[0-9]+")
echo "❌ Migrations failed with exit code: $EXIT_CODE"
echo "Migration logs:"
docker compose logs migrate --tail=20 2>/dev/null || true
exit 1
elif echo "$CONTAINER_STATUS" | grep -q "Up"; then
echo " Attempt $ATTEMPTS: Migrate container is running... ($CONTAINER_STATUS)"
else
echo " Attempt $ATTEMPTS: Migrate container status: $CONTAINER_STATUS"
fi
sleep 2
done
echo "⚠️ Timeout: Could not determine migration status after 30 seconds"
echo "Final container check:"
docker ps -a --filter "label=com.docker.compose.service=migrate" || true
echo "Migration logs (if available):"
docker compose logs migrate --tail=10 2>/dev/null || echo " No logs available"
' || echo "⚠️ Migration check completed with warnings, continuing..."
# Brief wait for other services to stabilize
echo "Waiting 5 seconds for other services to stabilize..."
sleep 5
# Verify installations and provide environment info
- name: Verify setup and show environment info
run: |
echo "=== Python Setup ==="
python --version
poetry --version
echo "=== Node.js Setup ==="
node --version
pnpm --version
echo "=== Additional Tools ==="
docker --version
docker compose version
gh --version || true
echo "=== Services Status ==="
cd autogpt_platform
docker compose ps || true
echo "=== Backend Dependencies ==="
cd backend
poetry show | head -10 || true
echo "=== Frontend Dependencies ==="
cd ../frontend
pnpm list --depth=0 | head -10 || true
echo "=== Environment Files ==="
ls -la ../.env* || true
ls -la .env* || true
ls -la ../backend/.env* || true
echo "✅ AutoGPT Platform development environment setup complete!"
echo "🚀 Ready for development with Docker services running"
echo "📝 Backend server: poetry run serve (port 8000)"
echo "🌐 Frontend server: pnpm dev (port 3000)"
- name: Run Claude Code
id: claude
uses: anthropics/claude-code-action@v1
uses: anthropics/claude-code-action@beta
with:
anthropic_api_key: ${{ secrets.ANTHROPIC_API_KEY }}
claude_args: |
--allowedTools "Bash(npm:*),Bash(pnpm:*),Bash(poetry:*),Bash(git:*),Edit,Replace,NotebookEditCell,mcp__github_inline_comment__create_inline_comment,Bash(gh pr comment:*), Bash(gh pr diff:*), Bash(gh pr view:*), Bash(gh pr edit:*)"
--model opus
additional_permissions: |
actions: read

View File

@@ -1,312 +0,0 @@
name: "Copilot Setup Steps"
# Automatically run the setup steps when they are changed to allow for easy validation, and
# allow manual testing through the repository's "Actions" tab
on:
workflow_dispatch:
push:
paths:
- .github/workflows/copilot-setup-steps.yml
pull_request:
paths:
- .github/workflows/copilot-setup-steps.yml
jobs:
# The job MUST be called `copilot-setup-steps` or it will not be picked up by Copilot.
copilot-setup-steps:
runs-on: ubuntu-latest
timeout-minutes: 45
# Set the permissions to the lowest permissions possible needed for your steps.
# Copilot will be given its own token for its operations.
permissions:
# If you want to clone the repository as part of your setup steps, for example to install dependencies, you'll need the `contents: read` permission. If you don't clone the repository in your setup steps, Copilot will do this for you automatically after the steps complete.
contents: read
# You can define any steps you want, and they will run before the agent starts.
# If you do not check out your code, Copilot will do this for you.
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 0
submodules: true
# Backend Python/Poetry setup (mirrors platform-backend-ci.yml)
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11" # Use standard version matching CI
- name: Set up Python dependency cache
uses: actions/cache@v4
with:
path: ~/.cache/pypoetry
key: poetry-${{ runner.os }}-${{ hashFiles('autogpt_platform/backend/poetry.lock') }}
- name: Install Poetry
run: |
# Extract Poetry version from backend/poetry.lock (matches CI)
cd autogpt_platform/backend
HEAD_POETRY_VERSION=$(python3 ../../.github/workflows/scripts/get_package_version_from_lockfile.py poetry)
echo "Found Poetry version ${HEAD_POETRY_VERSION} in backend/poetry.lock"
# Install Poetry
curl -sSL https://install.python-poetry.org | POETRY_VERSION=$HEAD_POETRY_VERSION python3 -
# Add Poetry to PATH
echo "$HOME/.local/bin" >> $GITHUB_PATH
- name: Check poetry.lock
working-directory: autogpt_platform/backend
run: |
poetry lock
if ! git diff --quiet --ignore-matching-lines="^# " poetry.lock; then
echo "Warning: poetry.lock not up to date, but continuing for setup"
git checkout poetry.lock # Reset for clean setup
fi
- name: Install Python dependencies
working-directory: autogpt_platform/backend
run: poetry install
- name: Generate Prisma Client
working-directory: autogpt_platform/backend
run: poetry run prisma generate && poetry run gen-prisma-stub
# Frontend Node.js/pnpm setup (mirrors platform-frontend-ci.yml)
- name: Set up Node.js
uses: actions/setup-node@v4
with:
node-version: "22"
- name: Enable corepack
run: corepack enable
- name: Set pnpm store directory
run: |
pnpm config set store-dir ~/.pnpm-store
echo "PNPM_HOME=$HOME/.pnpm-store" >> $GITHUB_ENV
- name: Cache frontend dependencies
uses: actions/cache@v4
with:
path: ~/.pnpm-store
key: ${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml', 'autogpt_platform/frontend/package.json') }}
restore-keys: |
${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml') }}
${{ runner.os }}-pnpm-
- name: Install JavaScript dependencies
working-directory: autogpt_platform/frontend
run: pnpm install --frozen-lockfile
# Install Playwright browsers for frontend testing
# NOTE: Disabled to save ~1 minute of setup time. Re-enable if Copilot needs browser automation (e.g., for MCP)
# - name: Install Playwright browsers
# working-directory: autogpt_platform/frontend
# run: pnpm playwright install --with-deps chromium
# Docker setup for development environment
- name: Free up disk space
run: |
# Remove large unused tools to free disk space for Docker builds
sudo rm -rf /usr/share/dotnet
sudo rm -rf /usr/local/lib/android
sudo rm -rf /opt/ghc
sudo rm -rf /opt/hostedtoolcache/CodeQL
sudo docker system prune -af
df -h
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Copy default environment files
working-directory: autogpt_platform
run: |
# Copy default environment files for development
cp .env.default .env
cp backend/.env.default backend/.env
cp frontend/.env.default frontend/.env
# Phase 1: Cache and load Docker images for faster setup
- name: Set up Docker image cache
id: docker-cache
uses: actions/cache@v4
with:
path: ~/docker-cache
# Use a versioned key for cache invalidation when image list changes
key: docker-images-v2-${{ runner.os }}-${{ hashFiles('.github/workflows/copilot-setup-steps.yml') }}
restore-keys: |
docker-images-v2-${{ runner.os }}-
docker-images-v1-${{ runner.os }}-
- name: Load or pull Docker images
working-directory: autogpt_platform
run: |
mkdir -p ~/docker-cache
# Define image list for easy maintenance
IMAGES=(
"redis:latest"
"rabbitmq:management"
"clamav/clamav-debian:latest"
"busybox:latest"
"kong:2.8.1"
"supabase/gotrue:v2.170.0"
"supabase/postgres:15.8.1.049"
"supabase/postgres-meta:v0.86.1"
"supabase/studio:20250224-d10db0f"
)
# Check if any cached tar files exist (more reliable than cache-hit)
if ls ~/docker-cache/*.tar 1> /dev/null 2>&1; then
echo "Docker cache found, loading images in parallel..."
for image in "${IMAGES[@]}"; do
# Convert image name to filename (replace : and / with -)
filename=$(echo "$image" | tr ':/' '--')
if [ -f ~/docker-cache/${filename}.tar ]; then
echo "Loading $image..."
docker load -i ~/docker-cache/${filename}.tar || echo "Warning: Failed to load $image from cache" &
fi
done
wait
echo "All cached images loaded"
else
echo "No Docker cache found, pulling images in parallel..."
# Pull all images in parallel
for image in "${IMAGES[@]}"; do
docker pull "$image" &
done
wait
# Only save cache on main branches (not PRs) to avoid cache pollution
if [[ "${{ github.ref }}" == "refs/heads/master" ]] || [[ "${{ github.ref }}" == "refs/heads/dev" ]]; then
echo "Saving Docker images to cache in parallel..."
for image in "${IMAGES[@]}"; do
# Convert image name to filename (replace : and / with -)
filename=$(echo "$image" | tr ':/' '--')
echo "Saving $image..."
docker save -o ~/docker-cache/${filename}.tar "$image" || echo "Warning: Failed to save $image" &
done
wait
echo "Docker image cache saved"
else
echo "Skipping cache save for PR/feature branch"
fi
fi
echo "Docker images ready for use"
# Phase 2: Build migrate service with GitHub Actions cache
- name: Build migrate Docker image with cache
working-directory: autogpt_platform
run: |
# Build the migrate image with buildx for GHA caching
docker buildx build \
--cache-from type=gha \
--cache-to type=gha,mode=max \
--target migrate \
--tag autogpt_platform-migrate:latest \
--load \
-f backend/Dockerfile \
..
# Start services using pre-built images
- name: Start Docker services for development
working-directory: autogpt_platform
run: |
# Start essential services (migrate image already built with correct tag)
docker compose --profile local up deps --no-build --detach
echo "Waiting for services to be ready..."
# Wait for database to be ready
echo "Checking database readiness..."
timeout 30 sh -c 'until docker compose exec -T db pg_isready -U postgres 2>/dev/null; do
echo " Waiting for database..."
sleep 2
done' && echo "✅ Database is ready" || echo "⚠️ Database ready check timeout after 30s, continuing..."
# Check migrate service status
echo "Checking migration status..."
docker compose ps migrate || echo " Migrate service not visible in ps output"
# Wait for migrate service to complete
echo "Waiting for migrations to complete..."
timeout 30 bash -c '
ATTEMPTS=0
while [ $ATTEMPTS -lt 15 ]; do
ATTEMPTS=$((ATTEMPTS + 1))
# Check using docker directly (more reliable than docker compose ps)
CONTAINER_STATUS=$(docker ps -a --filter "label=com.docker.compose.service=migrate" --format "{{.Status}}" | head -1)
if [ -z "$CONTAINER_STATUS" ]; then
echo " Attempt $ATTEMPTS: Migrate container not found yet..."
elif echo "$CONTAINER_STATUS" | grep -q "Exited (0)"; then
echo "✅ Migrations completed successfully"
docker compose logs migrate --tail=5 2>/dev/null || true
exit 0
elif echo "$CONTAINER_STATUS" | grep -q "Exited ([1-9]"; then
EXIT_CODE=$(echo "$CONTAINER_STATUS" | grep -oE "Exited \([0-9]+\)" | grep -oE "[0-9]+")
echo "❌ Migrations failed with exit code: $EXIT_CODE"
echo "Migration logs:"
docker compose logs migrate --tail=20 2>/dev/null || true
exit 1
elif echo "$CONTAINER_STATUS" | grep -q "Up"; then
echo " Attempt $ATTEMPTS: Migrate container is running... ($CONTAINER_STATUS)"
else
echo " Attempt $ATTEMPTS: Migrate container status: $CONTAINER_STATUS"
fi
sleep 2
done
echo "⚠️ Timeout: Could not determine migration status after 30 seconds"
echo "Final container check:"
docker ps -a --filter "label=com.docker.compose.service=migrate" || true
echo "Migration logs (if available):"
docker compose logs migrate --tail=10 2>/dev/null || echo " No logs available"
' || echo "⚠️ Migration check completed with warnings, continuing..."
# Brief wait for other services to stabilize
echo "Waiting 5 seconds for other services to stabilize..."
sleep 5
# Verify installations and provide environment info
- name: Verify setup and show environment info
run: |
echo "=== Python Setup ==="
python --version
poetry --version
echo "=== Node.js Setup ==="
node --version
pnpm --version
echo "=== Additional Tools ==="
docker --version
docker compose version
gh --version || true
echo "=== Services Status ==="
cd autogpt_platform
docker compose ps || true
echo "=== Backend Dependencies ==="
cd backend
poetry show | head -10 || true
echo "=== Frontend Dependencies ==="
cd ../frontend
pnpm list --depth=0 | head -10 || true
echo "=== Environment Files ==="
ls -la ../.env* || true
ls -la .env* || true
ls -la ../backend/.env* || true
echo "✅ AutoGPT Platform development environment setup complete!"
echo "🚀 Ready for development with Docker services running"
echo "📝 Backend server: poetry run serve (port 8000)"
echo "🌐 Frontend server: pnpm dev (port 3000)"

View File

@@ -3,7 +3,6 @@ name: AutoGPT Platform - Deploy Prod Environment
on:
release:
types: [published]
workflow_dispatch:
permissions:
contents: 'read'
@@ -18,8 +17,6 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
ref: ${{ github.ref_name || 'master' }}
- name: Set up Python
uses: actions/setup-python@v5
@@ -39,7 +36,7 @@ jobs:
DATABASE_URL: ${{ secrets.BACKEND_DATABASE_URL }}
DIRECT_URL: ${{ secrets.BACKEND_DATABASE_URL }}
trigger:
needs: migrate
runs-on: ubuntu-latest
@@ -50,5 +47,4 @@ jobs:
token: ${{ secrets.DEPLOY_TOKEN }}
repository: Significant-Gravitas/AutoGPT_cloud_infrastructure
event-type: build_deploy_prod
client-payload: |
{"ref": "${{ github.ref_name || 'master' }}", "repository": "${{ github.repository }}"}
client-payload: '{"ref": "${{ github.ref }}", "sha": "${{ github.sha }}", "repository": "${{ github.repository }}"}'

View File

@@ -5,13 +5,6 @@ on:
branches: [ dev ]
paths:
- 'autogpt_platform/**'
workflow_dispatch:
inputs:
git_ref:
description: 'Git ref (branch/tag) of AutoGPT to deploy'
required: true
default: 'master'
type: string
permissions:
contents: 'read'
@@ -26,8 +19,6 @@ jobs:
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
ref: ${{ github.event.inputs.git_ref || github.ref_name }}
- name: Set up Python
uses: actions/setup-python@v5
@@ -57,4 +48,4 @@ jobs:
token: ${{ secrets.DEPLOY_TOKEN }}
repository: Significant-Gravitas/AutoGPT_cloud_infrastructure
event-type: build_deploy_dev
client-payload: '{"ref": "${{ github.event.inputs.git_ref || github.ref }}", "repository": "${{ github.repository }}"}'
client-payload: '{"ref": "${{ github.ref }}", "sha": "${{ github.sha }}", "repository": "${{ github.repository }}"}'

View File

@@ -32,12 +32,14 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: ["3.11", "3.12", "3.13"]
python-version: ["3.11"]
runs-on: ubuntu-latest
services:
redis:
image: redis:latest
image: bitnami/redis:6.2
env:
REDIS_PASSWORD: testpassword
ports:
- 6379:6379
rabbitmq:
@@ -134,7 +136,7 @@ jobs:
run: poetry install
- name: Generate Prisma Client
run: poetry run prisma generate && poetry run gen-prisma-stub
run: poetry run prisma generate
- id: supabase
name: Start Supabase
@@ -176,7 +178,7 @@ jobs:
}
- name: Run Database Migrations
run: poetry run prisma migrate deploy
run: poetry run prisma migrate dev --name updates
env:
DATABASE_URL: ${{ steps.supabase.outputs.DB_URL }}
DIRECT_URL: ${{ steps.supabase.outputs.DB_URL }}
@@ -199,9 +201,10 @@ jobs:
DIRECT_URL: ${{ steps.supabase.outputs.DB_URL }}
SUPABASE_URL: ${{ steps.supabase.outputs.API_URL }}
SUPABASE_SERVICE_ROLE_KEY: ${{ steps.supabase.outputs.SERVICE_ROLE_KEY }}
JWT_VERIFY_KEY: ${{ steps.supabase.outputs.JWT_SECRET }}
SUPABASE_JWT_SECRET: ${{ steps.supabase.outputs.JWT_SECRET }}
REDIS_HOST: "localhost"
REDIS_PORT: "6379"
REDIS_PASSWORD: "testpassword"
ENCRYPTION_KEY: "dvziYgz0KSK8FENhju0ZYi8-fRTfAdlz6YLhdB_jhNw=" # DO NOT USE IN PRODUCTION!!
env:

View File

@@ -11,11 +11,6 @@ on:
- ".github/workflows/platform-frontend-ci.yml"
- "autogpt_platform/frontend/**"
merge_group:
workflow_dispatch:
concurrency:
group: ${{ github.workflow }}-${{ github.event_name == 'merge_group' && format('merge-queue-{0}', github.ref) || format('{0}-{1}', github.ref, github.event.pull_request.number || github.sha) }}
cancel-in-progress: ${{ github.event_name == 'pull_request' }}
defaults:
run:
@@ -35,7 +30,7 @@ jobs:
- name: Set up Node.js
uses: actions/setup-node@v4
with:
node-version: "22.18.0"
node-version: "21"
- name: Enable corepack
run: corepack enable
@@ -67,7 +62,7 @@ jobs:
- name: Set up Node.js
uses: actions/setup-node@v4
with:
node-version: "22.18.0"
node-version: "21"
- name: Enable corepack
run: corepack enable
@@ -87,6 +82,37 @@ jobs:
- name: Run lint
run: pnpm lint
type-check:
runs-on: ubuntu-latest
needs: setup
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up Node.js
uses: actions/setup-node@v4
with:
node-version: "21"
- name: Enable corepack
run: corepack enable
- name: Restore dependencies cache
uses: actions/cache@v4
with:
path: ~/.pnpm-store
key: ${{ needs.setup.outputs.cache-key }}
restore-keys: |
${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml') }}
${{ runner.os }}-pnpm-
- name: Install dependencies
run: pnpm install --frozen-lockfile
- name: Run tsc check
run: pnpm type-check
chromatic:
runs-on: ubuntu-latest
needs: setup
@@ -102,7 +128,7 @@ jobs:
- name: Set up Node.js
uses: actions/setup-node@v4
with:
node-version: "22.18.0"
node-version: "21"
- name: Enable corepack
run: corepack enable
@@ -143,22 +169,18 @@ jobs:
- name: Set up Node.js
uses: actions/setup-node@v4
with:
node-version: "22.18.0"
node-version: "21"
- name: Enable corepack
run: corepack enable
- name: Copy default supabase .env
run: |
cp ../.env.default ../.env
cp ../.env.example ../.env
- name: Copy backend .env and set OpenAI API key
- name: Copy backend .env
run: |
cp ../backend/.env.default ../backend/.env
echo "OPENAI_INTERNAL_API_KEY=${{ secrets.OPENAI_API_KEY }}" >> ../backend/.env
env:
# Used by E2E test data script to generate embeddings for approved store agents
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
cp ../backend/.env.example ../backend/.env
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
@@ -173,7 +195,7 @@ jobs:
- name: Run docker compose
run: |
NEXT_PUBLIC_PW_TEST=true docker compose -f ../docker-compose.yml up -d
docker compose -f ../docker-compose.yml up -d
env:
DOCKER_BUILDKIT: 1
BUILDX_CACHE_FROM: type=local,src=/tmp/.buildx-cache
@@ -230,30 +252,27 @@ jobs:
- name: Install dependencies
run: pnpm install --frozen-lockfile
- name: Setup .env
run: cp .env.example .env
- name: Build frontend
run: pnpm build --turbo
# uses Turbopack, much faster and safe enough for a test pipeline
env:
NEXT_PUBLIC_PW_TEST: true
- name: Install Browser 'chromium'
run: pnpm playwright install --with-deps chromium
- name: Run Playwright tests
run: pnpm test:no-build
continue-on-error: false
- name: Upload Playwright report
if: always()
- name: Upload Playwright artifacts
if: failure()
uses: actions/upload-artifact@v4
with:
name: playwright-report
path: playwright-report
if-no-files-found: ignore
retention-days: 3
- name: Upload Playwright test results
if: always()
uses: actions/upload-artifact@v4
with:
name: playwright-test-results
path: test-results
if-no-files-found: ignore
retention-days: 3
- name: Print Final Docker Compose logs
if: always()

View File

@@ -1,136 +0,0 @@
name: AutoGPT Platform - Frontend CI
on:
push:
branches: [master, dev]
paths:
- ".github/workflows/platform-fullstack-ci.yml"
- "autogpt_platform/**"
pull_request:
paths:
- ".github/workflows/platform-fullstack-ci.yml"
- "autogpt_platform/**"
merge_group:
concurrency:
group: ${{ github.workflow }}-${{ github.event_name == 'merge_group' && format('merge-queue-{0}', github.ref) || github.head_ref && format('pr-{0}', github.event.pull_request.number) || github.sha }}
cancel-in-progress: ${{ github.event_name == 'pull_request' }}
defaults:
run:
shell: bash
working-directory: autogpt_platform/frontend
jobs:
setup:
runs-on: ubuntu-latest
outputs:
cache-key: ${{ steps.cache-key.outputs.key }}
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up Node.js
uses: actions/setup-node@v4
with:
node-version: "22.18.0"
- name: Enable corepack
run: corepack enable
- name: Generate cache key
id: cache-key
run: echo "key=${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml', 'autogpt_platform/frontend/package.json') }}" >> $GITHUB_OUTPUT
- name: Cache dependencies
uses: actions/cache@v4
with:
path: ~/.pnpm-store
key: ${{ steps.cache-key.outputs.key }}
restore-keys: |
${{ runner.os }}-pnpm-${{ hashFiles('autogpt_platform/frontend/pnpm-lock.yaml') }}
${{ runner.os }}-pnpm-
- name: Install dependencies
run: pnpm install --frozen-lockfile
types:
runs-on: ubuntu-latest
needs: setup
strategy:
fail-fast: false
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
submodules: recursive
- name: Set up Node.js
uses: actions/setup-node@v4
with:
node-version: "22.18.0"
- name: Enable corepack
run: corepack enable
- name: Copy default supabase .env
run: |
cp ../.env.default ../.env
- name: Copy backend .env
run: |
cp ../backend/.env.default ../backend/.env
- name: Run docker compose
run: |
docker compose -f ../docker-compose.yml --profile local --profile deps_backend up -d
- name: Restore dependencies cache
uses: actions/cache@v4
with:
path: ~/.pnpm-store
key: ${{ needs.setup.outputs.cache-key }}
restore-keys: |
${{ runner.os }}-pnpm-
- name: Install dependencies
run: pnpm install --frozen-lockfile
- name: Setup .env
run: cp .env.default .env
- name: Wait for services to be ready
run: |
echo "Waiting for rest_server to be ready..."
timeout 60 sh -c 'until curl -f http://localhost:8006/health 2>/dev/null; do sleep 2; done' || echo "Rest server health check timeout, continuing..."
echo "Waiting for database to be ready..."
timeout 60 sh -c 'until docker compose -f ../docker-compose.yml exec -T db pg_isready -U postgres 2>/dev/null; do sleep 2; done' || echo "Database ready check timeout, continuing..."
- name: Generate API queries
run: pnpm generate:api:force
- name: Check for API schema changes
run: |
if ! git diff --exit-code src/app/api/openapi.json; then
echo "❌ API schema changes detected in src/app/api/openapi.json"
echo ""
echo "The openapi.json file has been modified after running 'pnpm generate:api-all'."
echo "This usually means changes have been made in the BE endpoints without updating the Frontend."
echo "The API schema is now out of sync with the Front-end queries."
echo ""
echo "To fix this:"
echo "1. Pull the backend 'docker compose pull && docker compose up -d --build --force-recreate'"
echo "2. Run 'pnpm generate:api' locally"
echo "3. Run 'pnpm types' locally"
echo "4. Fix any TypeScript errors that may have been introduced"
echo "5. Commit and push your changes"
echo ""
exit 1
else
echo "✅ No API schema changes detected"
fi
- name: Run Typescript checks
run: pnpm types

View File

@@ -11,7 +11,7 @@ jobs:
stale:
runs-on: ubuntu-latest
steps:
- uses: actions/stale@v10
- uses: actions/stale@v9
with:
# operations-per-run: 5000
stale-issue-message: >

View File

@@ -61,6 +61,6 @@ jobs:
pull-requests: write
runs-on: ubuntu-latest
steps:
- uses: actions/labeler@v6
- uses: actions/labeler@v5
with:
sync-labels: true

4
.gitignore vendored
View File

@@ -5,8 +5,6 @@ classic/original_autogpt/*.json
auto_gpt_workspace/*
*.mpeg
.env
# Root .env files
/.env
azure.yaml
.vscode
.idea/*
@@ -123,6 +121,7 @@ celerybeat.pid
# Environments
.direnv/
.env
.venv
env/
venv*/
@@ -178,4 +177,3 @@ autogpt_platform/backend/settings.py
*.ign.*
.test-contents
.claude/settings.local.json
/autogpt_platform/backend/logs

View File

@@ -1,3 +1,6 @@
[pr_reviewer]
num_code_suggestions=0
[pr_code_suggestions]
commitable_code_suggestions=false
num_code_suggestions=0

View File

@@ -235,7 +235,7 @@ repos:
hooks:
- id: tsc
name: Typecheck - AutoGPT Platform - Frontend
entry: bash -c 'cd autogpt_platform/frontend && pnpm types'
entry: bash -c 'cd autogpt_platform/frontend && pnpm type-check'
files: ^autogpt_platform/frontend/
types: [file]
language: system

View File

@@ -3,16 +3,6 @@
[![Discord Follow](https://img.shields.io/badge/dynamic/json?url=https%3A%2F%2Fdiscord.com%2Fapi%2Finvites%2Fautogpt%3Fwith_counts%3Dtrue&query=%24.approximate_member_count&label=total%20members&logo=discord&logoColor=white&color=7289da)](https://discord.gg/autogpt) &ensp;
[![Twitter Follow](https://img.shields.io/twitter/follow/Auto_GPT?style=social)](https://twitter.com/Auto_GPT) &ensp;
<!-- Keep these links. Translations will automatically update with the README. -->
[Deutsch](https://zdoc.app/de/Significant-Gravitas/AutoGPT) |
[Español](https://zdoc.app/es/Significant-Gravitas/AutoGPT) |
[français](https://zdoc.app/fr/Significant-Gravitas/AutoGPT) |
[日本語](https://zdoc.app/ja/Significant-Gravitas/AutoGPT) |
[한국어](https://zdoc.app/ko/Significant-Gravitas/AutoGPT) |
[Português](https://zdoc.app/pt/Significant-Gravitas/AutoGPT) |
[Русский](https://zdoc.app/ru/Significant-Gravitas/AutoGPT) |
[中文](https://zdoc.app/zh/Significant-Gravitas/AutoGPT)
**AutoGPT** is a powerful platform that allows you to create, deploy, and manage continuous AI agents that automate complex workflows.
## Hosting Options

View File

@@ -1,11 +1,9 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
## Repository Overview
AutoGPT Platform is a monorepo containing:
- **Backend** (`/backend`): Python FastAPI server with async support
- **Frontend** (`/frontend`): Next.js React application
- **Shared Libraries** (`/autogpt_libs`): Common Python utilities
@@ -13,7 +11,6 @@ AutoGPT Platform is a monorepo containing:
## Essential Commands
### Backend Development
```bash
# Install dependencies
cd backend && poetry install
@@ -33,18 +30,11 @@ poetry run test
# Run specific test
poetry run pytest path/to/test_file.py::test_function_name
# Run block tests (tests that validate all blocks work correctly)
poetry run pytest backend/blocks/test/test_block.py -xvs
# Run tests for a specific block (e.g., GetCurrentTimeBlock)
poetry run pytest 'backend/blocks/test/test_block.py::test_available_blocks[GetCurrentTimeBlock]' -xvs
# Lint and format
# prefer format if you want to just "fix" it and only get the errors that can't be autofixed
poetry run format # Black + isort
poetry run lint # ruff
```
More details can be found in TESTING.md
#### Creating/Updating Snapshots
@@ -57,49 +47,31 @@ poetry run pytest path/to/test.py --snapshot-update
⚠️ **Important**: Always review snapshot changes before committing! Use `git diff` to verify the changes are expected.
### Frontend Development
### Frontend Development
```bash
# Install dependencies
cd frontend && pnpm i
# Generate API client from OpenAPI spec
pnpm generate:api
cd frontend && npm install
# Start development server
pnpm dev
npm run dev
# Run E2E tests
pnpm test
npm run test
# Run Storybook for component development
pnpm storybook
npm run storybook
# Build production
pnpm build
# Format and lint
pnpm format
npm run build
# Type checking
pnpm types
npm run type-check
```
**📖 Complete Guide**: See `/frontend/CONTRIBUTING.md` and `/frontend/.cursorrules` for comprehensive frontend patterns.
**Key Frontend Conventions:**
- Separate render logic from data/behavior in components
- Use generated API hooks from `@/app/api/__generated__/endpoints/`
- Use function declarations (not arrow functions) for components/handlers
- Use design system components from `src/components/` (atoms, molecules, organisms)
- Only use Phosphor Icons
- Never use `src/components/__legacy__/*` or deprecated `BackendAPI`
## Architecture Overview
### Backend Architecture
- **API Layer**: FastAPI with REST and WebSocket endpoints
- **Database**: PostgreSQL with Prisma ORM, includes pgvector for embeddings
- **Queue System**: RabbitMQ for async task processing
@@ -108,20 +80,13 @@ pnpm types
- **Security**: Cache protection middleware prevents sensitive data caching in browsers/proxies
### Frontend Architecture
- **Framework**: Next.js 15 App Router (client-first approach)
- **Data Fetching**: Type-safe generated API hooks via Orval + React Query
- **State Management**: React Query for server state, co-located UI state in components/hooks
- **Component Structure**: Separate render logic (`.tsx`) from business logic (`use*.ts` hooks)
- **Framework**: Next.js App Router with React Server Components
- **State Management**: React hooks + Supabase client for real-time updates
- **Workflow Builder**: Visual graph editor using @xyflow/react
- **UI Components**: shadcn/ui (Radix UI primitives) with Tailwind CSS styling
- **Icons**: Phosphor Icons only
- **UI Components**: Radix UI primitives with Tailwind CSS styling
- **Feature Flags**: LaunchDarkly integration
- **Error Handling**: ErrorCard for render errors, toast for mutations, Sentry for exceptions
- **Testing**: Playwright for E2E, Storybook for component development
### Key Concepts
1. **Agent Graphs**: Workflow definitions stored as JSON, executed by the backend
2. **Blocks**: Reusable components in `/backend/blocks/` that perform specific tasks
3. **Integrations**: OAuth and API connections stored per user
@@ -129,16 +94,13 @@ pnpm types
5. **Virus Scanning**: ClamAV integration for file upload security
### Testing Approach
- Backend uses pytest with snapshot testing for API responses
- Test files are colocated with source files (`*_test.py`)
- Frontend uses Playwright for E2E tests
- Component testing via Storybook
### Database Schema
Key models (defined in `/backend/schema.prisma`):
- `User`: Authentication and profile data
- `AgentGraph`: Workflow definitions with version control
- `AgentGraphExecution`: Execution history and results
@@ -146,82 +108,35 @@ Key models (defined in `/backend/schema.prisma`):
- `StoreListing`: Marketplace listings for sharing agents
### Environment Configuration
#### Configuration Files
- **Backend**: `/backend/.env.default` (defaults) → `/backend/.env` (user overrides)
- **Frontend**: `/frontend/.env.default` (defaults) → `/frontend/.env` (user overrides)
- **Platform**: `/.env.default` (Supabase/shared defaults) → `/.env` (user overrides)
#### Docker Environment Loading Order
1. `.env.default` files provide base configuration (tracked in git)
2. `.env` files provide user-specific overrides (gitignored)
3. Docker Compose `environment:` sections provide service-specific overrides
4. Shell environment variables have highest precedence
#### Key Points
- All services use hardcoded defaults in docker-compose files (no `${VARIABLE}` substitutions)
- The `env_file` directive loads variables INTO containers at runtime
- Backend/Frontend services use YAML anchors for consistent configuration
- Supabase services (`db/docker/docker-compose.yml`) follow the same pattern
- Backend: `.env` file in `/backend`
- Frontend: `.env.local` file in `/frontend`
- Both require Supabase credentials and API keys for various services
### Common Development Tasks
**Adding a new block:**
Follow the comprehensive [Block SDK Guide](../../../docs/content/platform/block-sdk-guide.md) which covers:
- Provider configuration with `ProviderBuilder`
- Block schema definition
- Authentication (API keys, OAuth, webhooks)
- Testing and validation
- File organization
Quick steps:
1. Create new file in `/backend/backend/blocks/`
2. Configure provider using `ProviderBuilder` in `_config.py`
3. Inherit from `Block` base class
4. Define input/output schemas using `BlockSchema`
5. Implement async `run` method
6. Generate unique block ID using `uuid.uuid4()`
7. Test with `poetry run pytest backend/blocks/test/test_block.py`
Note: when making many new blocks analyze the interfaces for each of these blocks and picture if they would go well together in a graph based editor or would they struggle to connect productively?
ex: do the inputs and outputs tie well together?
If you get any pushback or hit complex block conditions check the new_blocks guide in the docs.
2. Inherit from `Block` base class
3. Define input/output schemas
4. Implement `run` method
5. Register in block registry
6. Generate the block uuid using `uuid.uuid4()`
**Modifying the API:**
1. Update route in `/backend/backend/server/routers/`
2. Add/update Pydantic models in same directory
3. Write tests alongside the route file
4. Run `poetry run test` to verify
**Frontend feature development:**
See `/frontend/CONTRIBUTING.md` for complete patterns. Quick reference:
1. **Pages**: Create in `src/app/(platform)/feature-name/page.tsx`
- Add `usePageName.ts` hook for logic
- Put sub-components in local `components/` folder
2. **Components**: Structure as `ComponentName/ComponentName.tsx` + `useComponentName.ts` + `helpers.ts`
- Use design system components from `src/components/` (atoms, molecules, organisms)
- Never use `src/components/__legacy__/*`
3. **Data fetching**: Use generated API hooks from `@/app/api/__generated__/endpoints/`
- Regenerate with `pnpm generate:api`
- Pattern: `use{Method}{Version}{OperationName}`
4. **Styling**: Tailwind CSS only, use design tokens, Phosphor Icons only
5. **Testing**: Add Storybook stories for new components, Playwright for E2E
6. **Code conventions**: Function declarations (not arrow functions) for components/handlers
1. Components go in `/frontend/src/components/`
2. Use existing UI components from `/frontend/src/components/ui/`
3. Add Storybook stories for new components
4. Test with Playwright if user-facing
### Security Implementation
**Cache Protection Middleware:**
- Located in `/backend/backend/server/middleware/security.py`
- Default behavior: Disables caching for ALL endpoints with `Cache-Control: no-store, no-cache, must-revalidate, private`
- Uses an allow list approach - only explicitly permitted paths can be cached
@@ -229,47 +144,3 @@ See `/frontend/CONTRIBUTING.md` for complete patterns. Quick reference:
- Prevents sensitive data (auth tokens, API keys, user data) from being cached by browsers/proxies
- To allow caching for a new endpoint, add it to `CACHEABLE_PATHS` in the middleware
- Applied to both main API server and external API applications
### Creating Pull Requests
- Create the PR aginst the `dev` branch of the repository.
- Ensure the branch name is descriptive (e.g., `feature/add-new-block`)/
- Use conventional commit messages (see below)/
- Fill out the .github/PULL_REQUEST_TEMPLATE.md template as the PR description/
- Run the github pre-commit hooks to ensure code quality.
### Reviewing/Revising Pull Requests
- When the user runs /pr-comments or tries to fetch them, also run gh api /repos/Significant-Gravitas/AutoGPT/pulls/[issuenum]/reviews to get the reviews
- Use gh api /repos/Significant-Gravitas/AutoGPT/pulls/[issuenum]/reviews/[review_id]/comments to get the review contents
- Use gh api /repos/Significant-Gravitas/AutoGPT/issues/9924/comments to get the pr specific comments
### Conventional Commits
Use this format for commit messages and Pull Request titles:
**Conventional Commit Types:**
- `feat`: Introduces a new feature to the codebase
- `fix`: Patches a bug in the codebase
- `refactor`: Code change that neither fixes a bug nor adds a feature; also applies to removing features
- `ci`: Changes to CI configuration
- `docs`: Documentation-only changes
- `dx`: Improvements to the developer experience
**Recommended Base Scopes:**
- `platform`: Changes affecting both frontend and backend
- `frontend`
- `backend`
- `infra`
- `blocks`: Modifications/additions of individual blocks
**Subscope Examples:**
- `backend/executor`
- `backend/db`
- `frontend/builder` (includes changes to the block UI component)
- `infra/prod`
Use these scopes and subscopes for clarity and consistency in commit messages.

View File

@@ -1,64 +0,0 @@
.PHONY: start-core stop-core logs-core format lint migrate run-backend run-frontend load-store-agents
# Run just Supabase + Redis + RabbitMQ
start-core:
docker compose up -d deps
# Stop core services
stop-core:
docker compose stop
reset-db:
docker compose stop db
rm -rf db/docker/volumes/db/data
cd backend && poetry run prisma migrate deploy
cd backend && poetry run prisma generate
cd backend && poetry run gen-prisma-stub
# View logs for core services
logs-core:
docker compose logs -f deps
# Run formatting and linting for backend and frontend
format:
cd backend && poetry run format
cd frontend && pnpm format
cd frontend && pnpm lint
init-env:
cp -n .env.default .env || true
cd backend && cp -n .env.default .env || true
cd frontend && cp -n .env.default .env || true
# Run migrations for backend
migrate:
cd backend && poetry run prisma migrate deploy
cd backend && poetry run prisma generate
cd backend && poetry run gen-prisma-stub
run-backend:
cd backend && poetry run app
run-frontend:
cd frontend && pnpm dev
test-data:
cd backend && poetry run python test/test_data_creator.py
load-store-agents:
cd backend && poetry run load-store-agents
help:
@echo "Usage: make <target>"
@echo "Targets:"
@echo " start-core - Start just the core services (Supabase, Redis, RabbitMQ) in background"
@echo " stop-core - Stop the core services"
@echo " reset-db - Reset the database by deleting the volume"
@echo " logs-core - Tail the logs for core services"
@echo " format - Format & lint backend (Python) and frontend (TypeScript) code"
@echo " migrate - Run backend database migrations"
@echo " run-backend - Run the backend FastAPI server"
@echo " run-frontend - Run the frontend Next.js development server"
@echo " test-data - Run the test data creator"
@echo " load-store-agents - Load store agents from agents/ folder into test database"

View File

@@ -8,6 +8,7 @@ Welcome to the AutoGPT Platform - a powerful system for creating and running AI
- Docker
- Docker Compose V2 (comes with Docker Desktop, or can be installed separately)
- Node.js & NPM (for running the frontend application)
### Running the System
@@ -23,10 +24,10 @@ To run the AutoGPT Platform, follow these steps:
2. Run the following command:
```
cp .env.default .env
cp .env.example .env
```
This command will copy the `.env.default` file to `.env`. You can modify the `.env` file to add your own environment variables.
This command will copy the `.env.example` file to `.env`. You can modify the `.env` file to add your own environment variables.
3. Run the following command:
@@ -36,38 +37,44 @@ To run the AutoGPT Platform, follow these steps:
This command will start all the necessary backend services defined in the `docker-compose.yml` file in detached mode.
4. After all the services are in ready state, open your browser and navigate to `http://localhost:3000` to access the AutoGPT Platform frontend.
4. Navigate to `frontend` within the `autogpt_platform` directory:
### Running Just Core services
```
cd frontend
```
You can now run the following to enable just the core services.
You will need to run your frontend application separately on your local machine.
```
# For help
make help
5. Run the following command:
# Run just Supabase + Redis + RabbitMQ
make start-core
```
cp .env.example .env.local
```
# Stop core services
make stop-core
This command will copy the `.env.example` file to `.env.local` in the `frontend` directory. You can modify the `.env.local` within this folder to add your own environment variables for the frontend application.
# View logs from core services
make logs-core
6. Run the following command:
# Run formatting and linting for backend and frontend
make format
Enable corepack and install dependencies by running:
# Run migrations for backend database
make migrate
```
corepack enable
pnpm i
```
# Run backend server
make run-backend
Generate the API client (this step is required before running the frontend):
# Run frontend development server
make run-frontend
```
pnpm generate:api-client
```
```
Then start the frontend application in development mode:
```
pnpm dev
```
7. Open your browser and navigate to `http://localhost:3000` to access the AutoGPT Platform frontend.
### Docker Compose Commands
@@ -170,21 +177,20 @@ The platform includes scripts for generating and managing the API client:
- `pnpm fetch:openapi`: Fetches the OpenAPI specification from the backend service (requires backend to be running on port 8006)
- `pnpm generate:api-client`: Generates the TypeScript API client from the OpenAPI specification using Orval
- `pnpm generate:api`: Runs both fetch and generate commands in sequence
- `pnpm generate:api-all`: Runs both fetch and generate commands in sequence
#### Manual API Client Updates
If you need to update the API client after making changes to the backend API:
1. Ensure the backend services are running:
```
docker compose up -d
```
2. Generate the updated API client:
```
pnpm generate:api
pnpm generate:api-all
```
This will fetch the latest OpenAPI specification and regenerate the TypeScript client code.

View File

@@ -0,0 +1,802 @@
# DatabaseManager Technical Specification
## Executive Summary
This document provides a complete technical specification for implementing a drop-in replacement for the AutoGPT Platform's DatabaseManager service. The replacement must maintain 100% API compatibility while preserving all functional behaviors, security requirements, and performance characteristics.
## 1. System Overview
### 1.1 Purpose
The DatabaseManager is a centralized service that provides database access for the AutoGPT Platform's executor system. It encapsulates all database operations behind a service interface, enabling distributed execution while maintaining data consistency and security.
### 1.2 Architecture Pattern
- **Service Type**: HTTP-based microservice using FastAPI
- **Communication**: RPC-style over HTTP with JSON serialization
- **Base Class**: Inherits from `AppService` (backend.util.service)
- **Client Classes**: `DatabaseManagerClient` (sync) and `DatabaseManagerAsyncClient` (async)
- **Port**: Configurable via `config.database_api_port`
### 1.3 Critical Requirements
1. **API Compatibility**: All 40+ exposed methods must maintain exact signatures
2. **Type Safety**: Full type preservation across service boundaries
3. **User Isolation**: All operations must respect user_id boundaries
4. **Transaction Support**: Maintain ACID properties for critical operations
5. **Event Publishing**: Maintain Redis event bus integration for real-time updates
## 2. Service Implementation Requirements
### 2.1 Base Service Class
```python
from backend.util.service import AppService, expose
from backend.util.settings import Config
from backend.data import db
import logging
class DatabaseManager(AppService):
"""
REQUIRED: Inherit from AppService to get:
- Automatic endpoint generation via @expose decorator
- Built-in health checks at /health
- Request/response serialization
- Error handling and logging
"""
def run_service(self) -> None:
"""REQUIRED: Initialize database connection before starting service"""
logger.info(f"[{self.service_name}] ⏳ Connecting to Database...")
self.run_and_wait(db.connect()) # CRITICAL: Must connect to database
super().run_service() # Start HTTP server
def cleanup(self):
"""REQUIRED: Clean disconnect on shutdown"""
super().cleanup()
logger.info(f"[{self.service_name}] ⏳ Disconnecting Database...")
self.run_and_wait(db.disconnect()) # CRITICAL: Must disconnect cleanly
@classmethod
def get_port(cls) -> int:
"""REQUIRED: Return configured port"""
return config.database_api_port
```
### 2.2 Method Exposure Pattern
```python
@staticmethod
def _(f: Callable[P, R], name: str | None = None) -> Callable[Concatenate[object, P], R]:
"""
REQUIRED: Helper to expose methods with proper signatures
- Preserves function name for endpoint generation
- Maintains type information
- Adds 'self' parameter for instance binding
"""
if name is not None:
f.__name__ = name
return cast(Callable[Concatenate[object, P], R], expose(f))
```
### 2.3 Database Connection Management
**REQUIRED: Use Prisma ORM with these exact configurations:**
```python
from prisma import Prisma
prisma = Prisma(
auto_register=True,
http={"timeout": HTTP_TIMEOUT}, # Default: 120 seconds
datasource={"url": DATABASE_URL}
)
# Connection lifecycle
async def connect():
await prisma.connect()
async def disconnect():
await prisma.disconnect()
```
### 2.4 Transaction Support
**REQUIRED: Implement both regular and locked transactions:**
```python
async def transaction(timeout: float | None = None):
"""Regular database transaction"""
async with prisma.tx(timeout=timeout) as tx:
yield tx
async def locked_transaction(key: str, timeout: float | None = None):
"""Transaction with PostgreSQL advisory lock"""
lock_key = zlib.crc32(key.encode("utf-8"))
async with transaction(timeout=timeout) as tx:
await tx.execute_raw("SELECT pg_advisory_xact_lock($1)", lock_key)
yield tx
```
## 3. Complete API Specification
### 3.1 Execution Management APIs
#### get_graph_execution
```python
async def get_graph_execution(
user_id: str,
execution_id: str,
*,
include_node_executions: bool = False
) -> GraphExecution | GraphExecutionWithNodes | None
```
**Behavior**:
- Returns execution only if user_id matches
- Optionally includes all node executions
- Returns None if not found or unauthorized
#### get_graph_executions
```python
async def get_graph_executions(
user_id: str,
graph_id: str | None = None,
*,
limit: int = 50,
graph_version: int | None = None,
cursor: str | None = None,
preset_id: str | None = None
) -> tuple[list[GraphExecution], str | None]
```
**Behavior**:
- Paginated results with cursor
- Filter by graph_id, version, or preset_id
- Returns (executions, next_cursor)
#### create_graph_execution
```python
async def create_graph_execution(
graph_id: str,
graph_version: int,
starting_nodes_input: dict[str, dict[str, Any]],
user_id: str,
preset_id: str | None = None
) -> GraphExecutionWithNodes
```
**Behavior**:
- Creates execution with status "QUEUED"
- Initializes all nodes with "PENDING" status
- Publishes creation event to Redis
- Uses locked transaction on graph_id
#### update_graph_execution_start_time
```python
async def update_graph_execution_start_time(
graph_exec_id: str
) -> None
```
**Behavior**:
- Sets start_time to current timestamp
- Only updates if currently NULL
#### update_graph_execution_stats
```python
async def update_graph_execution_stats(
graph_exec_id: str,
status: AgentExecutionStatus | None = None,
stats: dict[str, Any] | None = None
) -> GraphExecution | None
```
**Behavior**:
- Updates status and/or stats atomically
- Sets end_time if status is terminal (COMPLETED/FAILED)
- Publishes update event to Redis
- Returns updated execution
#### get_node_execution
```python
async def get_node_execution(
node_exec_id: str
) -> NodeExecutionResult | None
```
**Behavior**:
- No user_id check (relies on graph execution security)
- Includes all input/output data
#### get_node_executions
```python
async def get_node_executions(
graph_exec_id: str
) -> list[NodeExecutionResult]
```
**Behavior**:
- Returns all node executions for graph
- Ordered by creation time
#### get_latest_node_execution
```python
async def get_latest_node_execution(
graph_exec_id: str,
node_id: str
) -> NodeExecutionResult | None
```
**Behavior**:
- Returns most recent execution of specific node
- Used for retry/rerun scenarios
#### update_node_execution_status
```python
async def update_node_execution_status(
node_exec_id: str,
status: AgentExecutionStatus,
execution_data: dict[str, Any] | None = None,
stats: dict[str, Any] | None = None
) -> NodeExecutionResult
```
**Behavior**:
- Updates status atomically
- Sets end_time for terminal states
- Optionally updates stats/data
- Publishes event to Redis
- Returns updated execution
#### update_node_execution_status_batch
```python
async def update_node_execution_status_batch(
execution_updates: list[NodeExecutionUpdate]
) -> list[NodeExecutionResult]
```
**Behavior**:
- Batch update multiple nodes in single transaction
- Each update can have different status/stats
- Publishes events for all updates
- Returns all updated executions
#### update_node_execution_stats
```python
async def update_node_execution_stats(
node_exec_id: str,
stats: dict[str, Any]
) -> NodeExecutionResult
```
**Behavior**:
- Updates only stats field
- Merges with existing stats
- Does not affect status
#### upsert_execution_input
```python
async def upsert_execution_input(
node_id: str,
graph_exec_id: str,
input_name: str,
input_data: Any,
node_exec_id: str | None = None
) -> tuple[str, BlockInput]
```
**Behavior**:
- Creates or updates input data
- If node_exec_id not provided, creates node execution
- Serializes input_data to JSON
- Returns (node_exec_id, input_object)
#### upsert_execution_output
```python
async def upsert_execution_output(
node_exec_id: str,
output_name: str,
output_data: Any
) -> None
```
**Behavior**:
- Creates or updates output data
- Serializes output_data to JSON
- No return value
#### get_execution_kv_data
```python
async def get_execution_kv_data(
user_id: str,
key: str
) -> Any | None
```
**Behavior**:
- User-scoped key-value storage
- Returns deserialized JSON data
- Returns None if key not found
#### set_execution_kv_data
```python
async def set_execution_kv_data(
user_id: str,
node_exec_id: str,
key: str,
data: Any
) -> Any | None
```
**Behavior**:
- Sets user-scoped key-value data
- Associates with node execution
- Serializes data to JSON
- Returns previous value or None
#### get_block_error_stats
```python
async def get_block_error_stats() -> list[BlockErrorStats]
```
**Behavior**:
- Aggregates error counts by block_id
- Last 7 days of data
- Groups by error type
### 3.2 Graph Management APIs
#### get_node
```python
async def get_node(
node_id: str
) -> AgentNode | None
```
**Behavior**:
- Returns node with block data
- No user_id check (public blocks)
#### get_graph
```python
async def get_graph(
graph_id: str,
version: int | None = None,
user_id: str | None = None,
for_export: bool = False,
include_subgraphs: bool = False
) -> GraphModel | None
```
**Behavior**:
- Returns latest version if version=None
- Checks user_id for private graphs
- for_export=True excludes internal fields
- include_subgraphs=True loads nested graphs
#### get_connected_output_nodes
```python
async def get_connected_output_nodes(
node_id: str,
output_name: str
) -> list[tuple[AgentNode, AgentNodeLink]]
```
**Behavior**:
- Returns downstream nodes connected to output
- Includes link metadata
- Used for execution flow
#### get_graph_metadata
```python
async def get_graph_metadata(
graph_id: str,
user_id: str
) -> GraphMetadata | None
```
**Behavior**:
- Returns graph metadata without full definition
- User must own or have access to graph
### 3.3 Credit System APIs
#### get_credits
```python
async def get_credits(
user_id: str
) -> int
```
**Behavior**:
- Returns current credit balance
- Always non-negative
#### spend_credits
```python
async def spend_credits(
user_id: str,
cost: int,
metadata: UsageTransactionMetadata
) -> int
```
**Behavior**:
- Deducts credits atomically
- Creates transaction record
- Throws InsufficientCredits if balance too low
- Returns new balance
- metadata includes: block_id, node_exec_id, context
### 3.4 User Management APIs
#### get_user_metadata
```python
async def get_user_metadata(
user_id: str
) -> UserMetadata
```
**Behavior**:
- Returns user preferences and settings
- Creates default if not exists
#### update_user_metadata
```python
async def update_user_metadata(
user_id: str,
data: UserMetadataDTO
) -> UserMetadata
```
**Behavior**:
- Partial update of metadata
- Validates against schema
- Returns updated metadata
#### get_user_integrations
```python
async def get_user_integrations(
user_id: str
) -> UserIntegrations
```
**Behavior**:
- Returns OAuth credentials
- Decrypts sensitive data
- Creates empty if not exists
#### update_user_integrations
```python
async def update_user_integrations(
user_id: str,
data: UserIntegrations
) -> None
```
**Behavior**:
- Updates integration credentials
- Encrypts sensitive data
- No return value
### 3.5 User Communication APIs
#### get_active_user_ids_in_timerange
```python
async def get_active_user_ids_in_timerange(
start_time: datetime,
end_time: datetime
) -> list[str]
```
**Behavior**:
- Returns users with graph executions in range
- Used for analytics/notifications
#### get_user_email_by_id
```python
async def get_user_email_by_id(
user_id: str
) -> str | None
```
**Behavior**:
- Returns user's email address
- None if user not found
#### get_user_email_verification
```python
async def get_user_email_verification(
user_id: str
) -> UserEmailVerification
```
**Behavior**:
- Returns email and verification status
- Used for notification filtering
#### get_user_notification_preference
```python
async def get_user_notification_preference(
user_id: str
) -> NotificationPreference
```
**Behavior**:
- Returns notification settings
- Creates default if not exists
### 3.6 Notification APIs
#### create_or_add_to_user_notification_batch
```python
async def create_or_add_to_user_notification_batch(
user_id: str,
notification_type: NotificationType,
notification_data: NotificationEvent
) -> UserNotificationBatchDTO
```
**Behavior**:
- Adds to existing batch or creates new
- Batches by type for efficiency
- Returns updated batch
#### empty_user_notification_batch
```python
async def empty_user_notification_batch(
user_id: str,
notification_type: NotificationType
) -> None
```
**Behavior**:
- Clears all notifications of type
- Used after sending batch
#### get_all_batches_by_type
```python
async def get_all_batches_by_type(
notification_type: NotificationType
) -> list[UserNotificationBatchDTO]
```
**Behavior**:
- Returns all user batches of type
- Used by notification service
#### get_user_notification_batch
```python
async def get_user_notification_batch(
user_id: str,
notification_type: NotificationType
) -> UserNotificationBatchDTO | None
```
**Behavior**:
- Returns user's batch for type
- None if no batch exists
#### get_user_notification_oldest_message_in_batch
```python
async def get_user_notification_oldest_message_in_batch(
user_id: str,
notification_type: NotificationType
) -> NotificationEvent | None
```
**Behavior**:
- Returns oldest notification in batch
- Used for batch timing decisions
## 4. Client Implementation Requirements
### 4.1 Synchronous Client
```python
class DatabaseManagerClient(AppServiceClient):
"""
REQUIRED: Synchronous client that:
- Converts async methods to sync using endpoint_to_sync
- Maintains exact method signatures
- Handles connection pooling
- Implements retry logic
"""
@classmethod
def get_service_type(cls):
return DatabaseManager
# Example method mapping
get_graph_execution = endpoint_to_sync(DatabaseManager.get_graph_execution)
```
### 4.2 Asynchronous Client
```python
class DatabaseManagerAsyncClient(AppServiceClient):
"""
REQUIRED: Async client that:
- Directly references async methods
- No conversion needed
- Shares connection pool
"""
@classmethod
def get_service_type(cls):
return DatabaseManager
# Direct method reference
get_graph_execution = DatabaseManager.get_graph_execution
```
## 5. Data Models
### 5.1 Core Enums
```python
class AgentExecutionStatus(str, Enum):
PENDING = "PENDING"
QUEUED = "QUEUED"
RUNNING = "RUNNING"
COMPLETED = "COMPLETED"
FAILED = "FAILED"
CANCELED = "CANCELED"
class NotificationType(str, Enum):
SYSTEM = "SYSTEM"
REVIEW = "REVIEW"
EXECUTION = "EXECUTION"
MARKETING = "MARKETING"
```
### 5.2 Key Data Models
All models must exactly match the Prisma schema definitions. Key models include:
- `GraphExecution`: Execution metadata with stats
- `GraphExecutionWithNodes`: Includes all node executions
- `NodeExecutionResult`: Node execution with I/O data
- `GraphModel`: Complete graph definition
- `UserIntegrations`: OAuth credentials
- `UsageTransactionMetadata`: Credit usage context
- `NotificationEvent`: Individual notification data
## 6. Security Requirements
### 6.1 User Isolation
- **CRITICAL**: All user-scoped operations MUST filter by user_id
- Never expose data across user boundaries
- Use database-level row security where possible
### 6.2 Authentication
- Service assumes authentication handled by API gateway
- user_id parameter is trusted after authentication
- No additional auth checks within service
### 6.3 Data Protection
- Encrypt sensitive integration credentials
- Use HMAC for unsubscribe tokens
- Never log sensitive data
## 7. Performance Requirements
### 7.1 Connection Management
- Maintain persistent database connection
- Use connection pooling (default: 10 connections)
- Implement exponential backoff for retries
### 7.2 Query Optimization
- Use indexes for all WHERE clauses
- Batch operations where possible
- Limit default result sets (50 items)
### 7.3 Event Publishing
- Publish events asynchronously
- Don't block on event delivery
- Use fire-and-forget pattern
## 8. Error Handling
### 8.1 Standard Exceptions
```python
class InsufficientCredits(Exception):
"""Raised when user lacks credits"""
class NotFoundError(Exception):
"""Raised when entity not found"""
class AuthorizationError(Exception):
"""Raised when user lacks access"""
```
### 8.2 Error Response Format
```json
{
"error": "error_type",
"message": "Human readable message",
"details": {} // Optional additional context
}
```
## 9. Testing Requirements
### 9.1 Unit Tests
- Test each method in isolation
- Mock database calls
- Verify user_id filtering
### 9.2 Integration Tests
- Test with real database
- Verify transaction boundaries
- Test concurrent operations
### 9.3 Service Tests
- Test HTTP endpoint generation
- Verify serialization/deserialization
- Test error handling
## 10. Implementation Checklist
### Phase 1: Core Service Setup
- [ ] Create DatabaseManager class inheriting from AppService
- [ ] Implement run_service() with database connection
- [ ] Implement cleanup() with proper disconnect
- [ ] Configure port from settings
- [ ] Set up method exposure helper
### Phase 2: Execution APIs (15 methods)
- [ ] get_graph_execution
- [ ] get_graph_executions
- [ ] get_graph_execution_meta
- [ ] create_graph_execution
- [ ] update_graph_execution_start_time
- [ ] update_graph_execution_stats
- [ ] get_node_execution
- [ ] get_node_executions
- [ ] get_latest_node_execution
- [ ] update_node_execution_status
- [ ] update_node_execution_status_batch
- [ ] update_node_execution_stats
- [ ] upsert_execution_input
- [ ] upsert_execution_output
- [ ] get_execution_kv_data
- [ ] set_execution_kv_data
- [ ] get_block_error_stats
### Phase 3: Graph APIs (4 methods)
- [ ] get_node
- [ ] get_graph
- [ ] get_connected_output_nodes
- [ ] get_graph_metadata
### Phase 4: Credit APIs (2 methods)
- [ ] get_credits
- [ ] spend_credits
### Phase 5: User APIs (4 methods)
- [ ] get_user_metadata
- [ ] update_user_metadata
- [ ] get_user_integrations
- [ ] update_user_integrations
### Phase 6: Communication APIs (4 methods)
- [ ] get_active_user_ids_in_timerange
- [ ] get_user_email_by_id
- [ ] get_user_email_verification
- [ ] get_user_notification_preference
### Phase 7: Notification APIs (5 methods)
- [ ] create_or_add_to_user_notification_batch
- [ ] empty_user_notification_batch
- [ ] get_all_batches_by_type
- [ ] get_user_notification_batch
- [ ] get_user_notification_oldest_message_in_batch
### Phase 8: Client Implementation
- [ ] Create DatabaseManagerClient with sync methods
- [ ] Create DatabaseManagerAsyncClient with async methods
- [ ] Test client method generation
- [ ] Verify type preservation
### Phase 9: Integration Testing
- [ ] Test all methods with real database
- [ ] Verify user isolation
- [ ] Test error scenarios
- [ ] Performance testing
- [ ] Event publishing verification
### Phase 10: Deployment Validation
- [ ] Deploy to test environment
- [ ] Run integration test suite
- [ ] Verify backward compatibility
- [ ] Performance benchmarking
- [ ] Production deployment
## 11. Success Criteria
The implementation is successful when:
1. **All 40+ methods** produce identical outputs to the original
2. **Performance** is within 10% of original implementation
3. **All tests** pass without modification
4. **No breaking changes** to any client code
5. **Security boundaries** are maintained
6. **Event publishing** works identically
7. **Error handling** matches original behavior
## 12. Critical Implementation Notes
1. **DO NOT** modify any function signatures
2. **DO NOT** change any return types
3. **DO NOT** add new required parameters
4. **DO NOT** remove any functionality
5. **ALWAYS** maintain user_id isolation
6. **ALWAYS** publish events for state changes
7. **ALWAYS** use transactions for multi-step operations
8. **ALWAYS** handle errors exactly as original
This specification, when implemented correctly, will produce a drop-in replacement for the DatabaseManager that maintains 100% compatibility with the existing system.

View File

@@ -0,0 +1,765 @@
# Notification Service Technical Specification
## Overview
The AutoGPT Platform Notification Service is a RabbitMQ-based asynchronous notification system that handles various types of user notifications including real-time alerts, batched notifications, and scheduled summaries. The service supports email delivery via Postmark and system alerts via Discord.
## Architecture Overview
### Core Components
1. **NotificationManager Service** (`notifications.py`)
- AppService implementation with RabbitMQ integration
- Processes notification queues asynchronously
- Manages batching strategies and delivery timing
- Handles email templating and sending
2. **RabbitMQ Message Broker**
- Multiple queues for different notification strategies
- Dead letter exchange for failed messages
- Topic-based routing for message distribution
3. **Email Sender** (`email.py`)
- Postmark integration for email delivery
- Jinja2 template rendering
- HTML email composition with unsubscribe headers
4. **Database Storage**
- Notification batching tables
- User preference storage
- Email verification tracking
## Service Exposure Mechanism
### AppService Framework
The NotificationManager extends `AppService` which automatically exposes methods decorated with `@expose` as HTTP endpoints:
```python
class NotificationManager(AppService):
@expose
def queue_weekly_summary(self):
# Implementation
@expose
def process_existing_batches(self, notification_types: list[NotificationType]):
# Implementation
@expose
async def discord_system_alert(self, content: str):
# Implementation
```
### Automatic HTTP Endpoint Creation
When the service starts, the AppService base class:
1. Scans for methods with `@expose` decorator
2. Creates FastAPI routes for each exposed method:
- Route path: `/{method_name}`
- HTTP method: POST
- Endpoint handler: Generated via `_create_fastapi_endpoint()`
### Service Client Access
#### NotificationManagerClient
```python
class NotificationManagerClient(AppServiceClient):
@classmethod
def get_service_type(cls):
return NotificationManager
# Direct method references (sync)
process_existing_batches = NotificationManager.process_existing_batches
queue_weekly_summary = NotificationManager.queue_weekly_summary
# Async-to-sync conversion
discord_system_alert = endpoint_to_sync(NotificationManager.discord_system_alert)
```
#### Client Usage Pattern
```python
# Get client instance
client = get_service_client(NotificationManagerClient)
# Call exposed methods via HTTP
client.process_existing_batches([NotificationType.AGENT_RUN])
client.queue_weekly_summary()
client.discord_system_alert("System alert message")
```
### HTTP Communication Details
1. **Service URL**: `http://{host}:{notification_service_port}`
- Default port: 8007
- Host: Configurable via settings
2. **Request Format**:
- Method: POST
- Path: `/{method_name}`
- Body: JSON with method parameters
3. **Client Implementation**:
- Uses `httpx` for HTTP requests
- Automatic retry on connection failures
- Configurable timeout (default from api_call_timeout)
### Direct Function Calls
The service also exposes two functions that can be called directly without going through the service client:
```python
# Sync version - used by ExecutionManager
def queue_notification(event: NotificationEventModel) -> NotificationResult
# Async version - used by credit system
async def queue_notification_async(event: NotificationEventModel) -> NotificationResult
```
These functions:
- Connect directly to RabbitMQ
- Publish messages to appropriate queues
- Return success/failure status
- Are NOT exposed via HTTP
## Message Queuing Architecture
### RabbitMQ Configuration
#### Exchanges
```python
NOTIFICATION_EXCHANGE = Exchange(name="notifications", type=ExchangeType.TOPIC)
DEAD_LETTER_EXCHANGE = Exchange(name="dead_letter", type=ExchangeType.TOPIC)
```
#### Queues
1. **immediate_notifications**
- Routing Key: `notification.immediate.#`
- Dead Letter: `failed.immediate`
- For: Critical alerts, errors
2. **admin_notifications**
- Routing Key: `notification.admin.#`
- Dead Letter: `failed.admin`
- For: Refund requests, system alerts
3. **summary_notifications**
- Routing Key: `notification.summary.#`
- Dead Letter: `failed.summary`
- For: Daily/weekly summaries
4. **batch_notifications**
- Routing Key: `notification.batch.#`
- Dead Letter: `failed.batch`
- For: Agent runs, batched events
5. **failed_notifications**
- Routing Key: `failed.#`
- For: All failed messages
### Queue Strategies (QueueType enum)
1. **IMMEDIATE**: Send right away (errors, critical notifications)
2. **BATCH**: Batch for configured delay (agent runs)
3. **SUMMARY**: Scheduled digest (daily/weekly summaries)
4. **BACKOFF**: Exponential backoff strategy (defined but not fully implemented)
5. **ADMIN**: Admin-only notifications
## Notification Types
### Enum Values (NotificationType)
```python
AGENT_RUN # Batch strategy, 1 day delay
ZERO_BALANCE # Backoff strategy, 60 min delay
LOW_BALANCE # Immediate strategy
BLOCK_EXECUTION_FAILED # Backoff strategy, 60 min delay
CONTINUOUS_AGENT_ERROR # Backoff strategy, 60 min delay
DAILY_SUMMARY # Summary strategy
WEEKLY_SUMMARY # Summary strategy
MONTHLY_SUMMARY # Summary strategy
REFUND_REQUEST # Admin strategy
REFUND_PROCESSED # Admin strategy
```
## Integration Points
### 1. Scheduler Integration
The scheduler service (`backend.executor.scheduler`) imports monitoring functions that call the NotificationManagerClient:
```python
from backend.monitoring import (
process_existing_batches,
process_weekly_summary,
)
# These are scheduled as cron jobs
```
### 2. Execution Manager Integration
The ExecutionManager directly calls `queue_notification()` for:
- Agent run completions
- Low balance alerts
```python
from backend.notifications.notifications import queue_notification
# Called after graph execution completes
queue_notification(NotificationEventModel(
user_id=graph_exec.user_id,
type=NotificationType.AGENT_RUN,
data=AgentRunData(...)
))
```
### 3. Credit System Integration
The credit system uses `queue_notification_async()` for:
- Refund requests
- Refund processed notifications
```python
from backend.notifications.notifications import queue_notification_async
await queue_notification_async(NotificationEventModel(
user_id=user_id,
type=NotificationType.REFUND_REQUEST,
data=RefundRequestData(...)
))
```
### 4. Monitoring Module Wrappers
The monitoring module provides wrapper functions that are used by the scheduler:
```python
# backend/monitoring/notification_monitor.py
def process_existing_batches(**kwargs):
args = NotificationJobArgs(**kwargs)
get_notification_manager_client().process_existing_batches(
args.notification_types
)
def process_weekly_summary(**kwargs):
get_notification_manager_client().queue_weekly_summary()
```
## Data Models
### Base Event Model
```typescript
interface BaseEventModel {
type: NotificationType;
user_id: string;
created_at: string; // ISO datetime with timezone
}
```
### Notification Event Model
```typescript
interface NotificationEventModel<T> extends BaseEventModel {
data: T;
}
```
### Notification Data Types
#### AgentRunData
```typescript
interface AgentRunData {
agent_name: string;
credits_used: number;
execution_time: number;
node_count: number;
graph_id: string;
outputs: Array<Record<string, any>>;
}
```
#### ZeroBalanceData
```typescript
interface ZeroBalanceData {
last_transaction: number;
last_transaction_time: string; // ISO datetime with timezone
top_up_link: string;
}
```
#### LowBalanceData
```typescript
interface LowBalanceData {
agent_name: string;
current_balance: number; // credits (100 = $1)
billing_page_link: string;
shortfall: number;
}
```
#### BlockExecutionFailedData
```typescript
interface BlockExecutionFailedData {
block_name: string;
block_id: string;
error_message: string;
graph_id: string;
node_id: string;
execution_id: string;
}
```
#### ContinuousAgentErrorData
```typescript
interface ContinuousAgentErrorData {
agent_name: string;
error_message: string;
graph_id: string;
execution_id: string;
start_time: string; // ISO datetime with timezone
error_time: string; // ISO datetime with timezone
attempts: number;
}
```
#### Summary Data Types
```typescript
interface BaseSummaryData {
total_credits_used: number;
total_executions: number;
most_used_agent: string;
total_execution_time: number;
successful_runs: number;
failed_runs: number;
average_execution_time: number;
cost_breakdown: Record<string, number>;
}
interface DailySummaryData extends BaseSummaryData {
date: string; // ISO datetime with timezone
}
interface WeeklySummaryData extends BaseSummaryData {
start_date: string; // ISO datetime with timezone
end_date: string; // ISO datetime with timezone
}
```
#### RefundRequestData
```typescript
interface RefundRequestData {
user_id: string;
user_name: string;
user_email: string;
transaction_id: string;
refund_request_id: string;
reason: string;
amount: number;
balance: number;
}
```
### Summary Parameters
```typescript
interface BaseSummaryParams {
start_date: string; // ISO datetime with timezone
end_date: string; // ISO datetime with timezone
}
interface DailySummaryParams extends BaseSummaryParams {
date: string; // ISO datetime with timezone
}
interface WeeklySummaryParams extends BaseSummaryParams {
start_date: string; // ISO datetime with timezone
end_date: string; // ISO datetime with timezone
}
```
## Database Schema
### NotificationEvent Table
```sql
model NotificationEvent {
id String @id @default(uuid())
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
UserNotificationBatch UserNotificationBatch? @relation
userNotificationBatchId String?
type NotificationType
data Json
@@index([userNotificationBatchId])
}
```
### UserNotificationBatch Table
```sql
model UserNotificationBatch {
id String @id @default(uuid())
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
userId String
User User @relation
type NotificationType
Notifications NotificationEvent[]
@@unique([userId, type])
}
```
## API Methods
### Exposed Service Methods (via HTTP)
#### queue_weekly_summary()
- **HTTP Endpoint**: `POST /queue_weekly_summary`
- **Purpose**: Triggers weekly summary generation for all active users
- **Process**:
1. Runs in background executor
2. Queries users active in last 7 days
3. Queues summary notification for each user
- **Used by**: Scheduler service (via cron)
#### process_existing_batches(notification_types: list[NotificationType])
- **HTTP Endpoint**: `POST /process_existing_batches`
- **Purpose**: Processes aged-out batches for specified notification types
- **Process**:
1. Runs in background executor
2. Retrieves all batches for given types
3. Checks if oldest message exceeds max delay
4. Sends batched email if aged out
5. Clears processed batches
- **Used by**: Scheduler service (via cron)
#### discord_system_alert(content: str)
- **HTTP Endpoint**: `POST /discord_system_alert`
- **Purpose**: Sends system alerts to Discord channel
- **Async**: Yes (converted to sync by client)
- **Used by**: Monitoring services
### Direct Queue Functions (not via HTTP)
#### queue_notification(event: NotificationEventModel) -> NotificationResult
- **Purpose**: Queue a notification (sync version)
- **Used by**: ExecutionManager (same process)
- **Direct RabbitMQ**: Yes
#### queue_notification_async(event: NotificationEventModel) -> NotificationResult
- **Purpose**: Queue a notification (async version)
- **Used by**: Credit system (async context)
- **Direct RabbitMQ**: Yes
## Message Processing Flow
### 1. Message Routing
```python
def get_routing_key(event_type: NotificationType) -> str:
strategy = NotificationTypeOverride(event_type).strategy
if strategy == QueueType.IMMEDIATE:
return f"notification.immediate.{event_type.value}"
elif strategy == QueueType.BATCH:
return f"notification.batch.{event_type.value}"
# ... etc
```
### 2. Queue Processing Methods
#### _process_immediate(message: str) -> bool
1. Parse message to NotificationEventModel
2. Retrieve user email
3. Check user preferences and email verification
4. Send email immediately via EmailSender
5. Return True if successful
#### _process_batch(message: str) -> bool
1. Parse message to NotificationEventModel
2. Add to user's notification batch
3. Check if batch is old enough (based on delay)
4. If aged out:
- Retrieve all batch messages
- Send combined email
- Clear batch
5. Return True if processed or batched
#### _process_summary(message: str) -> bool
1. Parse message to SummaryParamsEventModel
2. Gather summary data (credits, executions, etc.)
- **Note**: Currently returns hardcoded placeholder data
3. Format and send summary email
4. Return True if successful
#### _process_admin_message(message: str) -> bool
1. Parse message
2. Send to configured admin email
3. No user preference checks
4. Return True if successful
## Email Delivery
### EmailSender Class
#### Template Loading
- Base template: `templates/base.html.jinja2`
- Notification templates: `templates/{notification_type}.html.jinja2`
- Subject templates from NotificationTypeOverride
- **Note**: Templates use `.html.jinja2` extension, not just `.html`
#### Email Composition
```python
def send_templated(
notification: NotificationType,
user_email: str,
data: NotificationEventModel | list[NotificationEventModel],
user_unsub_link: str | None = None
)
```
#### Postmark Integration
- API Token: `settings.secrets.postmark_server_api_token`
- Sender Email: `settings.config.postmark_sender_email`
- Headers:
- `List-Unsubscribe-Post: List-Unsubscribe=One-Click`
- `List-Unsubscribe: <{unsubscribe_link}>`
## User Preferences and Permissions
### Email Verification Check
```python
validated_email = get_db().get_user_email_verification(user_id)
```
### Notification Preferences
```python
preferences = get_db().get_user_notification_preference(user_id).preferences
# Returns dict[NotificationType, bool]
```
### Preference Fields in User Model
- `notifyOnAgentRun`
- `notifyOnZeroBalance`
- `notifyOnLowBalance`
- `notifyOnBlockExecutionFailed`
- `notifyOnContinuousAgentError`
- `notifyOnDailySummary`
- `notifyOnWeeklySummary`
- `notifyOnMonthlySummary`
### Unsubscribe Link Generation
```python
def generate_unsubscribe_link(user_id: str) -> str:
# HMAC-SHA256 signed token
# Format: base64(user_id:signature_hex)
# URL: {platform_base_url}/api/email/unsubscribe?token={token}
```
## Batching Logic
### Batch Delays (get_batch_delay)
**Note**: The delay configuration exists for multiple notification types, but only notifications with `QueueType.BATCH` strategy actually use batching. Others use different strategies:
- `AGENT_RUN`: 1 day (Strategy: BATCH - actually uses batching)
- `ZERO_BALANCE`: 60 minutes configured (Strategy: BACKOFF - not batched)
- `LOW_BALANCE`: 60 minutes configured (Strategy: IMMEDIATE - sent immediately)
- `BLOCK_EXECUTION_FAILED`: 60 minutes configured (Strategy: BACKOFF - not batched)
- `CONTINUOUS_AGENT_ERROR`: 60 minutes configured (Strategy: BACKOFF - not batched)
### Batch Processing
1. Messages added to UserNotificationBatch
2. Oldest message timestamp tracked
3. When `oldest_timestamp + delay < now()`:
- Batch is processed
- All messages sent in single email
- Batch cleared
## Service Lifecycle
### Startup
1. Initialize FastAPI app with exposed endpoints
2. Start HTTP server on port 8007
3. Initialize RabbitMQ connection
4. Create/verify exchanges and queues
5. Set up queue consumers
6. Start processing loop
### Main Loop
```python
while self.running:
await self._run_queue(immediate_queue, self._process_immediate, ...)
await self._run_queue(admin_queue, self._process_admin_message, ...)
await self._run_queue(batch_queue, self._process_batch, ...)
await self._run_queue(summary_queue, self._process_summary, ...)
await asyncio.sleep(0.1)
```
### Shutdown
1. Set `running = False`
2. Disconnect RabbitMQ
3. Cleanup resources
## Configuration
### Environment Variables
```python
# Service Configuration
notification_service_port: int = 8007
# Email Configuration
postmark_sender_email: str = "invalid@invalid.com"
refund_notification_email: str = "refund@agpt.co"
# Security
unsubscribe_secret_key: str = ""
# Secrets
postmark_server_api_token: str = ""
postmark_webhook_token: str = ""
discord_bot_token: str = ""
# Platform URLs
platform_base_url: str
frontend_base_url: str
```
## Error Handling
### Message Processing Errors
- Failed messages sent to dead letter queue
- Validation errors logged but don't crash service
- Connection errors trigger retry with `@continuous_retry()`
### RabbitMQ ACK/NACK Protocol
- Success: `message.ack()`
- Failure: `message.reject(requeue=False)`
- Timeout/Queue empty: Continue loop
### HTTP Endpoint Errors
- Wrapped in RemoteCallError for client
- Automatic retry available via client configuration
- Connection failures tracked and logged
## System Integrations
### DatabaseManagerClient
- User email retrieval
- Email verification status
- Notification preferences
- Batch management
- Active user queries
### Discord Integration
- Uses SendDiscordMessageBlock
- Configured via discord_bot_token
- For system alerts only
## Implementation Checklist
1. **Core Service**
- [ ] AppService implementation with @expose decorators
- [ ] FastAPI endpoint generation
- [ ] RabbitMQ connection management
- [ ] Queue consumer setup
- [ ] Message routing logic
2. **Service Client**
- [ ] NotificationManagerClient implementation
- [ ] HTTP client configuration
- [ ] Method mapping to service endpoints
- [ ] Async-to-sync conversions
3. **Message Processing**
- [ ] Parse and validate all notification types
- [ ] Implement all queue strategies
- [ ] Batch management with delays
- [ ] Summary data gathering
4. **Email Delivery**
- [ ] Postmark integration
- [ ] Template loading and rendering
- [ ] Unsubscribe header support
- [ ] HTML email composition
5. **User Management**
- [ ] Preference checking
- [ ] Email verification
- [ ] Unsubscribe link generation
- [ ] Daily limit tracking
6. **Batching System**
- [ ] Database batch operations
- [ ] Age-out checking
- [ ] Batch clearing after send
- [ ] Oldest message tracking
7. **Error Handling**
- [ ] Dead letter queue routing
- [ ] Message rejection on failure
- [ ] Continuous retry wrapper
- [ ] Validation error logging
8. **Scheduled Operations**
- [ ] Weekly summary generation
- [ ] Batch processing triggers
- [ ] Background executor usage
## Security Considerations
1. **Service-to-Service Communication**:
- HTTP endpoints only accessible internally
- No authentication on service endpoints (internal network only)
- Service discovery via host/port configuration
2. **User Security**:
- Email verification required for all user notifications
- Unsubscribe tokens HMAC-signed
- User preferences enforced
3. **Admin Notifications**:
- Separate queue, no user preference checks
- Fixed admin email configuration
## Testing Considerations
1. **Unit Tests**
- Message parsing and validation
- Routing key generation
- Batch delay calculations
- Template rendering
2. **Integration Tests**
- HTTP endpoint accessibility
- Service client method calls
- RabbitMQ message flow
- Database batch operations
- Email sending (mock Postmark)
3. **Load Tests**
- High volume message processing
- Concurrent HTTP requests
- Batch accumulation limits
- Memory usage under load
## Implementation Status Notes
1. **Backoff Strategy**: While `QueueType.BACKOFF` is defined and used by several notification types (ZERO_BALANCE, BLOCK_EXECUTION_FAILED, CONTINUOUS_AGENT_ERROR), the actual exponential backoff processing logic is not implemented. These messages are routed to immediate queue.
2. **Summary Data**: The `_gather_summary_data()` method currently returns hardcoded placeholder values rather than querying actual execution data from the database.
3. **Batch Processing**: Only `AGENT_RUN` notifications actually use batch processing. Other notification types with configured delays use different strategies (IMMEDIATE or BACKOFF).
## Future Enhancements
1. **Additional Channels**
- SMS notifications (not implemented)
- Webhook notifications (not implemented)
- In-app notifications
2. **Advanced Batching**
- Dynamic batch sizes
- Priority-based processing
- Custom delay configurations
3. **Analytics**
- Delivery tracking
- Open/click rates
- Notification effectiveness metrics
4. **Service Improvements**
- Authentication for HTTP endpoints
- Rate limiting per user
- Circuit breaker patterns
- Implement actual backoff processing for BACKOFF strategy
- Implement real summary data gathering

View File

@@ -0,0 +1,474 @@
# AutoGPT Platform Scheduler Technical Specification
## Executive Summary
This document provides a comprehensive technical specification for the AutoGPT Platform Scheduler service. The scheduler is responsible for managing scheduled graph executions, system monitoring tasks, and periodic maintenance operations. This specification is designed to enable a complete reimplementation that maintains 100% compatibility with the existing system.
## Table of Contents
1. [System Architecture](#system-architecture)
2. [Service Implementation](#service-implementation)
3. [Data Models](#data-models)
4. [API Endpoints](#api-endpoints)
5. [Database Schema](#database-schema)
6. [External Dependencies](#external-dependencies)
7. [Authentication & Authorization](#authentication--authorization)
8. [Process Management](#process-management)
9. [Error Handling](#error-handling)
10. [Configuration](#configuration)
11. [Testing Strategy](#testing-strategy)
## System Architecture
### Overview
The scheduler operates as an independent microservice within the AutoGPT platform, implementing the `AppService` base class pattern. It runs on a dedicated port (default: 8003) and exposes HTTP/JSON-RPC endpoints for communication with other services.
### Core Components
1. **Scheduler Service** (`backend/executor/scheduler.py:156`)
- Extends `AppService` base class
- Manages APScheduler instance with multiple jobstores
- Handles lifecycle management and graceful shutdown
2. **Scheduler Client** (`backend/executor/scheduler.py:354`)
- Extends `AppServiceClient` base class
- Provides async/sync method wrappers for RPC calls
- Implements automatic retry and connection pooling
3. **Entry Points**
- Main executable: `backend/scheduler.py`
- Service launcher: `backend/app.py`
## Service Implementation
### Base Service Pattern
```python
class Scheduler(AppService):
scheduler: BlockingScheduler
def __init__(self, register_system_tasks: bool = True):
self.register_system_tasks = register_system_tasks
@classmethod
def get_port(cls) -> int:
return config.execution_scheduler_port # Default: 8003
@classmethod
def db_pool_size(cls) -> int:
return config.scheduler_db_pool_size # Default: 3
def run_service(self):
# Initialize scheduler with jobstores
# Register system tasks if enabled
# Start scheduler blocking loop
def cleanup(self):
# Graceful shutdown of scheduler
# Wait=False for immediate termination
```
### Jobstore Configuration
The scheduler uses three distinct jobstores:
1. **EXECUTION** (`Jobstores.EXECUTION.value`)
- Type: SQLAlchemyJobStore
- Table: `apscheduler_jobs`
- Purpose: Graph execution schedules
- Persistence: Required
2. **BATCHED_NOTIFICATIONS** (`Jobstores.BATCHED_NOTIFICATIONS.value`)
- Type: SQLAlchemyJobStore
- Table: `apscheduler_jobs_batched_notifications`
- Purpose: Batched notification processing
- Persistence: Required
3. **WEEKLY_NOTIFICATIONS** (`Jobstores.WEEKLY_NOTIFICATIONS.value`)
- Type: MemoryJobStore
- Purpose: Weekly summary notifications
- Persistence: Not required
### System Tasks
When `register_system_tasks=True`, the following monitoring tasks are registered:
1. **Weekly Summary Processing**
- Job ID: `process_weekly_summary`
- Schedule: `0 * * * *` (hourly)
- Function: `monitoring.process_weekly_summary`
- Jobstore: WEEKLY_NOTIFICATIONS
2. **Late Execution Monitoring**
- Job ID: `report_late_executions`
- Schedule: Interval (config.execution_late_notification_threshold_secs)
- Function: `monitoring.report_late_executions`
- Jobstore: EXECUTION
3. **Block Error Rate Monitoring**
- Job ID: `report_block_error_rates`
- Schedule: Interval (config.block_error_rate_check_interval_secs)
- Function: `monitoring.report_block_error_rates`
- Jobstore: EXECUTION
4. **Cloud Storage Cleanup**
- Job ID: `cleanup_expired_files`
- Schedule: Interval (config.cloud_storage_cleanup_interval_hours * 3600)
- Function: `cleanup_expired_files`
- Jobstore: EXECUTION
## Data Models
### GraphExecutionJobArgs
```python
class GraphExecutionJobArgs(BaseModel):
user_id: str
graph_id: str
graph_version: int
cron: str
input_data: BlockInput
input_credentials: dict[str, CredentialsMetaInput] = Field(default_factory=dict)
```
### GraphExecutionJobInfo
```python
class GraphExecutionJobInfo(GraphExecutionJobArgs):
id: str
name: str
next_run_time: str
@staticmethod
def from_db(job_args: GraphExecutionJobArgs, job_obj: JobObj) -> "GraphExecutionJobInfo":
return GraphExecutionJobInfo(
id=job_obj.id,
name=job_obj.name,
next_run_time=job_obj.next_run_time.isoformat(),
**job_args.model_dump(),
)
```
### NotificationJobArgs
```python
class NotificationJobArgs(BaseModel):
notification_types: list[NotificationType]
cron: str
```
### CredentialsMetaInput
```python
class CredentialsMetaInput(BaseModel, Generic[CP, CT]):
id: str
title: Optional[str] = None
provider: CP
type: CT
```
## API Endpoints
All endpoints are exposed via the `@expose` decorator and follow HTTP POST JSON-RPC pattern.
### 1. Add Graph Execution Schedule
**Endpoint**: `/add_graph_execution_schedule`
**Request Body**:
```json
{
"user_id": "string",
"graph_id": "string",
"graph_version": "integer",
"cron": "string (crontab format)",
"input_data": {},
"input_credentials": {},
"name": "string (optional)"
}
```
**Response**: `GraphExecutionJobInfo`
**Behavior**:
- Creates APScheduler job with CronTrigger
- Uses job kwargs to store GraphExecutionJobArgs
- Sets `replace_existing=True` to allow updates
- Returns job info with generated ID and next run time
### 2. Delete Graph Execution Schedule
**Endpoint**: `/delete_graph_execution_schedule`
**Request Body**:
```json
{
"schedule_id": "string",
"user_id": "string"
}
```
**Response**: `GraphExecutionJobInfo`
**Behavior**:
- Validates schedule exists in EXECUTION jobstore
- Verifies user_id matches job's user_id
- Removes job from scheduler
- Returns deleted job info
**Errors**:
- `NotFoundError`: If job doesn't exist
- `NotAuthorizedError`: If user_id doesn't match
### 3. Get Graph Execution Schedules
**Endpoint**: `/get_graph_execution_schedules`
**Request Body**:
```json
{
"graph_id": "string (optional)",
"user_id": "string (optional)"
}
```
**Response**: `list[GraphExecutionJobInfo]`
**Behavior**:
- Retrieves all jobs from EXECUTION jobstore
- Filters by graph_id and/or user_id if provided
- Validates job kwargs as GraphExecutionJobArgs
- Skips invalid jobs (ValidationError)
- Only returns jobs with next_run_time set
### 4. System Task Endpoints
- `/execute_process_existing_batches` - Trigger batch processing
- `/execute_process_weekly_summary` - Trigger weekly summary
- `/execute_report_late_executions` - Trigger late execution report
- `/execute_report_block_error_rates` - Trigger error rate report
- `/execute_cleanup_expired_files` - Trigger file cleanup
### 5. Health Check
**Endpoints**: `/health_check`, `/health_check_async`
**Methods**: POST, GET
**Response**: "OK"
## Database Schema
### APScheduler Tables
The scheduler relies on APScheduler's SQLAlchemy jobstore schema:
1. **apscheduler_jobs**
- id: VARCHAR (PRIMARY KEY)
- next_run_time: FLOAT
- job_state: BLOB/BYTEA (pickled job data)
2. **apscheduler_jobs_batched_notifications**
- Same schema as above
- Separate table for notification jobs
### Database Configuration
- URL extraction from `DIRECT_URL` environment variable
- Schema extraction from URL query parameter
- Connection pooling: `pool_size=db_pool_size()`, `max_overflow=0`
- Metadata schema binding for multi-schema support
## External Dependencies
### Required Services
1. **PostgreSQL Database**
- Connection via `DIRECT_URL` environment variable
- Schema support via URL parameter
- APScheduler job persistence
2. **ExecutionManager** (via execution_utils)
- Function: `add_graph_execution`
- Called by: `execute_graph` job function
- Purpose: Create graph execution entries
3. **NotificationManager** (via monitoring module)
- Functions: `process_existing_batches`, `queue_weekly_summary`
- Purpose: Notification processing
4. **Cloud Storage** (via util.cloud_storage)
- Function: `cleanup_expired_files_async`
- Purpose: File expiration management
### Python Dependencies
```
apscheduler>=3.10.0
sqlalchemy
pydantic>=2.0
httpx
uvicorn
fastapi
python-dotenv
tenacity
```
## Authentication & Authorization
### Service-Level Authentication
- No authentication required between internal services
- Services communicate via trusted internal network
- Host/port configuration via environment variables
### User-Level Authorization
- Authorization check in `delete_graph_execution_schedule`:
- Validates `user_id` matches job's `user_id`
- Raises `NotAuthorizedError` on mismatch
- No authorization for read operations (security consideration)
## Process Management
### Startup Sequence
1. Load environment variables via `dotenv.load_dotenv()`
2. Extract database URL and schema
3. Initialize BlockingScheduler with configured jobstores
4. Register system tasks (if enabled)
5. Add job execution listener
6. Start scheduler (blocking)
### Shutdown Sequence
1. Receive SIGTERM/SIGINT signal
2. Call `cleanup()` method
3. Shutdown scheduler with `wait=False`
4. Terminate process
### Multi-Process Architecture
- Runs as independent process via `AppProcess`
- Started by `run_processes()` in app.py
- Can run in foreground or background mode
- Automatic signal handling for graceful shutdown
## Error Handling
### Job Execution Errors
- Listener on `EVENT_JOB_ERROR` logs failures
- Errors in job functions are caught and logged
- Jobs continue to run on schedule despite failures
### RPC Communication Errors
- Automatic retry via `@conn_retry` decorator
- Configurable retry count and timeout
- Connection pooling with self-healing
### Database Connection Errors
- APScheduler handles reconnection automatically
- Pool exhaustion prevented by `max_overflow=0`
- Connection errors logged but don't crash service
## Configuration
### Environment Variables
- `DIRECT_URL`: PostgreSQL connection string (required)
- `{SERVICE_NAME}_HOST`: Override service host
- Standard logging configuration
### Config Settings (via Config class)
```python
execution_scheduler_port: int = 8003
scheduler_db_pool_size: int = 3
execution_late_notification_threshold_secs: int
block_error_rate_check_interval_secs: int
cloud_storage_cleanup_interval_hours: int
pyro_host: str = "localhost"
pyro_client_comm_timeout: float = 15
pyro_client_comm_retry: int = 3
rpc_client_call_timeout: int = 300
```
## Testing Strategy
### Unit Tests
1. Mock APScheduler for job management tests
2. Mock database connections
3. Test each RPC endpoint independently
4. Verify job serialization/deserialization
### Integration Tests
1. Test with real PostgreSQL instance
2. Verify job persistence across restarts
3. Test concurrent job execution
4. Validate cron expression parsing
### Critical Test Cases
1. **Job Persistence**: Jobs survive scheduler restart
2. **User Isolation**: Users can only delete their own jobs
3. **Concurrent Access**: Multiple clients can add/remove jobs
4. **Error Recovery**: Service recovers from database outages
5. **Resource Cleanup**: No memory/connection leaks
## Implementation Notes
### Key Design Decisions
1. **BlockingScheduler vs AsyncIOScheduler**: Uses BlockingScheduler for simplicity and compatibility with multiprocessing architecture
2. **Job Storage**: All job arguments stored in kwargs, not in job name/id
3. **Separate Jobstores**: Isolation between execution and notification jobs
4. **No Authentication**: Relies on network isolation for security
### Migration Considerations
1. APScheduler job format must be preserved exactly
2. Database schema cannot change without migration
3. RPC protocol must maintain compatibility
4. Environment variables must match existing deployment
### Performance Considerations
1. Database pool size limited to prevent exhaustion
2. No job result storage (fire-and-forget pattern)
3. Minimal logging in hot paths
4. Connection reuse via pooling
## Appendix: Critical Implementation Details
### Event Loop Management
```python
@thread_cached
def get_event_loop():
return asyncio.new_event_loop()
def execute_graph(**kwargs):
get_event_loop().run_until_complete(_execute_graph(**kwargs))
```
### Job Function Execution Context
- Jobs run in scheduler's process space
- Each job gets fresh event loop
- No shared state between job executions
- Exceptions logged but don't affect scheduler
### Cron Expression Format
- Uses standard crontab format via `CronTrigger.from_crontab()`
- Supports: minute hour day month day_of_week
- Special strings: @yearly, @monthly, @weekly, @daily, @hourly
This specification provides all necessary details to reimplement the scheduler service while maintaining 100% compatibility with the existing system. Any deviation from these specifications may result in system incompatibility.

View File

@@ -0,0 +1,85 @@
name: CI
on:
push:
branches: [ main, master ]
pull_request:
branches: [ main, master ]
env:
CARGO_TERM_COLOR: always
RUSTFLAGS: "-D warnings"
jobs:
test:
name: Test
runs-on: ubuntu-latest
services:
redis:
image: redis:7
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 6379:6379
steps:
- uses: actions/checkout@v3
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
- name: Run tests
run: cargo test
env:
REDIS_URL: redis://localhost:6379
clippy:
name: Clippy
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: dtolnay/rust-toolchain@stable
with:
components: clippy
- uses: Swatinem/rust-cache@v2
- name: Run clippy
run: |
cargo clippy -- \
-D warnings \
-D clippy::unwrap_used \
-D clippy::panic \
-D clippy::unimplemented \
-D clippy::todo
fmt:
name: Format
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: dtolnay/rust-toolchain@stable
with:
components: rustfmt
- name: Check formatting
run: cargo fmt -- --check
bench:
name: Benchmarks
runs-on: ubuntu-latest
services:
redis:
image: redis:7
options: >-
--health-cmd "redis-cli ping"
--health-interval 10s
--health-timeout 5s
--health-retries 5
ports:
- 6379:6379
steps:
- uses: actions/checkout@v3
- uses: dtolnay/rust-toolchain@stable
- uses: Swatinem/rust-cache@v2
- name: Build benchmarks
run: cargo bench --no-run
env:
REDIS_URL: redis://localhost:6379

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,60 @@
[package]
name = "websocket"
authors = ["AutoGPT Team"]
description = "WebSocket server for AutoGPT Platform"
version = "0.1.0"
edition = "2021"
[lib]
name = "websocket"
path = "src/lib.rs"
[[bin]]
name = "websocket"
path = "src/main.rs"
[dependencies]
axum = { version = "0.7.5", features = ["ws"] }
jsonwebtoken = "9.3.0"
redis = { version = "0.25.4", features = ["aio", "tokio-comp"] }
serde = { version = "1.0.204", features = ["derive"] }
serde_json = "1.0.120"
tokio = { version = "1.38.1", features = ["rt-multi-thread", "macros", "net", "sync", "time", "io-util"] }
tower-http = { version = "0.5.2", features = ["cors"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
futures = "0.3"
dotenvy = "0.15"
clap = { version = "4.5.4", features = ["derive"] }
toml = "0.8"
[dev-dependencies]
# Load testing and profiling
tokio-console = "0.1"
criterion = { version = "0.5", features = ["async_tokio"] }
pprof = { version = "0.13", features = ["flamegraph", "criterion"] }
# Dependencies for benchmarks
tokio-tungstenite = "0.24"
futures-util = "0.3"
chrono = "0.4"
[[bench]]
name = "websocket_bench"
harness = false
[[example]]
name = "ws_client_example"
required-features = []
[profile.release]
opt-level = 3 # Maximum optimization
lto = true # Enable link-time optimization
codegen-units = 1 # Reduce parallel code generation units to increase optimization
panic = "abort" # Remove panic unwinding to reduce binary size
strip = true # Strip symbols from binary
[profile.bench]
opt-level = 3 # Maximum optimization
lto = true # Enable link-time optimization
codegen-units = 1 # Reduce parallel code generation units to increase optimization
debug = true # Keep debug symbols for profiling

View File

@@ -0,0 +1,412 @@
# WebSocket API Technical Specification
## Overview
This document provides a complete technical specification for the AutoGPT Platform WebSocket API (`ws_api.py`). The WebSocket API provides real-time updates for graph and node execution events, enabling clients to monitor workflow execution progress.
## Architecture Overview
### Core Components
1. **WebSocket Server** (`ws_api.py`)
- FastAPI application with WebSocket endpoint
- Handles client connections and message routing
- Authenticates clients via JWT tokens
- Manages subscriptions to execution events
2. **Connection Manager** (`conn_manager.py`)
- Maintains active WebSocket connections
- Manages channel subscriptions
- Routes execution events to subscribed clients
- Handles connection lifecycle
3. **Event Broadcasting System**
- Redis Pub/Sub based event bus
- Asynchronous event broadcaster
- Execution event propagation from backend services
## API Endpoint
### WebSocket Endpoint
- **URL**: `/ws`
- **Protocol**: WebSocket (ws:// or wss://)
- **Query Parameters**:
- `token` (required when auth enabled): JWT authentication token
## Authentication
### JWT Token Authentication
- **When Required**: When `settings.config.enable_auth` is `True`
- **Token Location**: Query parameter `?token=<JWT_TOKEN>`
- **Token Validation**:
```python
payload = parse_jwt_token(token)
user_id = payload.get("sub")
```
- **JWT Requirements**:
- Algorithm: Configured via `settings.JWT_ALGORITHM`
- Secret Key: Configured via `settings.JWT_SECRET_KEY`
- Audience: Must be "authenticated"
- Claims: Must contain `sub` (user ID)
### Authentication Failures
- **4001**: Missing authentication token
- **4002**: Invalid token (missing user ID)
- **4003**: Invalid token (parsing error or expired)
### No-Auth Mode
- When `settings.config.enable_auth` is `False`
- Uses `DEFAULT_USER_ID` from `backend.data.user`
## Message Protocol
### Message Format
All messages use JSON format with the following structure:
```typescript
interface WSMessage {
method: WSMethod;
data?: Record<string, any> | any[] | string;
success?: boolean;
channel?: string;
error?: string;
}
```
### Message Methods (WSMethod enum)
1. **Client-to-Server Methods**:
- `SUBSCRIBE_GRAPH_EXEC`: Subscribe to specific graph execution
- `SUBSCRIBE_GRAPH_EXECS`: Subscribe to all executions of a graph
- `UNSUBSCRIBE`: Unsubscribe from a channel
- `HEARTBEAT`: Keep-alive ping
2. **Server-to-Client Methods**:
- `GRAPH_EXECUTION_EVENT`: Graph execution status update
- `NODE_EXECUTION_EVENT`: Node execution status update
- `ERROR`: Error message
- `HEARTBEAT`: Keep-alive pong
## Subscription Models
### Subscribe to Specific Graph Execution
```typescript
interface WSSubscribeGraphExecutionRequest {
graph_exec_id: string;
}
```
**Channel Key Format**: `{user_id}|graph_exec#{graph_exec_id}`
### Subscribe to All Graph Executions
```typescript
interface WSSubscribeGraphExecutionsRequest {
graph_id: string;
}
```
**Channel Key Format**: `{user_id}|graph#{graph_id}|executions`
## Event Models
### Graph Execution Event
```typescript
interface GraphExecutionEvent {
event_type: "graph_execution_update";
id: string; // graph_exec_id
user_id: string;
graph_id: string;
graph_version: number;
preset_id?: string;
status: ExecutionStatus;
started_at: string; // ISO datetime
ended_at: string; // ISO datetime
inputs: Record<string, any>;
outputs: Record<string, any>;
stats?: {
cost: number; // cents
duration: number; // seconds
duration_cpu_only: number;
node_exec_time: number;
node_exec_time_cpu_only: number;
node_exec_count: number;
node_error_count: number;
error?: string;
};
}
```
### Node Execution Event
```typescript
interface NodeExecutionEvent {
event_type: "node_execution_update";
user_id: string;
graph_id: string;
graph_version: number;
graph_exec_id: string;
node_exec_id: string;
node_id: string;
block_id: string;
status: ExecutionStatus;
input_data: Record<string, any>;
output_data: Record<string, any>;
add_time: string; // ISO datetime
queue_time?: string; // ISO datetime
start_time?: string; // ISO datetime
end_time?: string; // ISO datetime
}
```
### Execution Status Enum
```typescript
enum ExecutionStatus {
INCOMPLETE = "INCOMPLETE",
QUEUED = "QUEUED",
RUNNING = "RUNNING",
COMPLETED = "COMPLETED",
FAILED = "FAILED"
}
```
## Message Flow Examples
### 1. Subscribe to Graph Execution
```json
// Client → Server
{
"method": "subscribe_graph_execution",
"data": {
"graph_exec_id": "exec-123"
}
}
// Server → Client (Success)
{
"method": "subscribe_graph_execution",
"success": true,
"channel": "user-456|graph_exec#exec-123"
}
```
### 2. Receive Execution Updates
```json
// Server → Client (Graph Update)
{
"method": "graph_execution_event",
"channel": "user-456|graph_exec#exec-123",
"data": {
"event_type": "graph_execution_update",
"id": "exec-123",
"user_id": "user-456",
"graph_id": "graph-789",
"status": "RUNNING",
// ... other fields
}
}
// Server → Client (Node Update)
{
"method": "node_execution_event",
"channel": "user-456|graph_exec#exec-123",
"data": {
"event_type": "node_execution_update",
"node_exec_id": "node-exec-111",
"status": "COMPLETED",
// ... other fields
}
}
```
### 3. Heartbeat
```json
// Client → Server
{
"method": "heartbeat",
"data": "ping"
}
// Server → Client
{
"method": "heartbeat",
"data": "pong",
"success": true
}
```
### 4. Error Handling
```json
// Server → Client (Invalid Message)
{
"method": "error",
"success": false,
"error": "Invalid message format. Review the schema and retry"
}
```
## Event Broadcasting Architecture
### Redis Pub/Sub Integration
1. **Event Bus Name**: Configured via `config.execution_event_bus_name`
2. **Channel Pattern**: `{event_bus_name}/{channel_key}`
3. **Event Flow**:
- Execution services publish events to Redis
- Event broadcaster listens to Redis pattern `*`
- Events are routed to WebSocket connections based on subscriptions
### Event Broadcaster
- Runs as continuous async task using `@continuous_retry()` decorator
- Listens to all execution events via `AsyncRedisExecutionEventBus`
- Calls `ConnectionManager.send_execution_update()` for each event
## Connection Lifecycle
### Connection Establishment
1. Client connects to `/ws` endpoint
2. Authentication performed (JWT validation)
3. WebSocket accepted via `manager.connect_socket()`
4. Connection added to active connections set
### Message Processing Loop
1. Receive text message from client
2. Parse and validate as `WSMessage`
3. Route to appropriate handler based on `method`
4. Send response or error back to client
### Connection Termination
1. `WebSocketDisconnect` exception caught
2. `manager.disconnect_socket()` called
3. Connection removed from active connections
4. All subscriptions for that connection removed
## Error Handling
### Validation Errors
- **Invalid Message Format**: Returns error with method "error"
- **Invalid Message Data**: Returns error with specific validation message
- **Unknown Message Type**: Returns error indicating unsupported method
### Connection Errors
- WebSocket disconnections handled gracefully
- Failed event parsing logged but doesn't crash connection
- Handler exceptions logged and connection continues
## Configuration
### Environment Variables
```python
# WebSocket Server Configuration
websocket_server_host: str = "0.0.0.0"
websocket_server_port: int = 8001
# Authentication
enable_auth: bool = True
# CORS
backend_cors_allow_origins: List[str] = []
# Redis Event Bus
execution_event_bus_name: str = "autogpt:execution_event_bus"
# Message Size Limits
max_message_size_limit: int = 512000 # 512KB
```
### Security Headers
- CORS middleware applied with configured origins
- Credentials allowed for authenticated requests
- All methods and headers allowed (configurable)
## Deployment Requirements
### Dependencies
1. **FastAPI**: Web framework with WebSocket support
2. **Redis**: For pub/sub event broadcasting
3. **JWT Libraries**: For token validation
4. **Prisma**: Database ORM (for future graph access validation)
### Process Management
- Implements `AppProcess` interface for service lifecycle
- Runs via `uvicorn` ASGI server
- Graceful shutdown handling in `cleanup()` method
### Concurrent Connections
- No hard limit on WebSocket connections
- Memory usage scales with active connections
- Each connection maintains subscription set
## Implementation Checklist
To implement a compatible WebSocket API:
1. **Authentication**
- [ ] JWT token validation from query parameters
- [ ] Support for no-auth mode with default user ID
- [ ] Proper error codes for auth failures
2. **Message Handling**
- [ ] Parse and validate WSMessage format
- [ ] Implement all client-to-server methods
- [ ] Support all server-to-client event types
- [ ] Proper error responses for invalid messages
3. **Subscription Management**
- [ ] Channel key generation matching exact format
- [ ] Support for both execution and graph-level subscriptions
- [ ] Unsubscribe functionality
- [ ] Clean up subscriptions on disconnect
4. **Event Broadcasting**
- [ ] Listen to Redis pub/sub for execution events
- [ ] Route events to correct subscribed connections
- [ ] Handle both graph and node execution events
- [ ] Maintain event order and completeness
5. **Connection Management**
- [ ] Track active WebSocket connections
- [ ] Handle graceful disconnections
- [ ] Implement heartbeat/keepalive
- [ ] Memory-efficient subscription storage
6. **Configuration**
- [ ] Support all environment variables
- [ ] CORS configuration for allowed origins
- [ ] Configurable host/port binding
- [ ] Redis connection configuration
7. **Error Handling**
- [ ] Graceful handling of malformed messages
- [ ] Logging of errors without dropping connections
- [ ] Specific error messages for debugging
- [ ] Recovery from Redis connection issues
## Testing Considerations
1. **Unit Tests**
- Message parsing and validation
- Channel key generation
- Subscription management logic
2. **Integration Tests**
- Full WebSocket connection flow
- Event broadcasting from Redis
- Multi-client subscription scenarios
- Authentication success/failure cases
3. **Load Tests**
- Many concurrent connections
- High-frequency event broadcasting
- Memory usage under load
- Connection/disconnection cycles
## Security Considerations
1. **Authentication**: JWT tokens transmitted via query parameters (consider upgrading to headers)
2. **Authorization**: Currently no graph-level access validation (commented out in code)
3. **Rate Limiting**: No rate limiting implemented
4. **Message Size**: Limited by `max_message_size_limit` configuration
5. **Input Validation**: All inputs validated via Pydantic models
## Future Enhancements (Currently Commented Out)
1. **Graph Access Validation**: Verify user has read access to subscribed graphs
2. **Message Compression**: For large execution payloads
3. **Batch Updates**: Aggregate multiple events in single message
4. **Selective Field Subscription**: Subscribe to specific fields only

View File

@@ -0,0 +1,93 @@
# WebSocket Server Benchmarks
This directory contains performance benchmarks for the AutoGPT WebSocket server.
## Prerequisites
1. Redis must be running locally or set `REDIS_URL` environment variable:
```bash
docker run -d -p 6379:6379 redis:latest
```
2. Build the project in release mode:
```bash
cargo build --release
```
## Running Benchmarks
Run all benchmarks:
```bash
cargo bench
```
Run specific benchmark group:
```bash
cargo bench connection_establishment
cargo bench subscriptions
cargo bench message_throughput
cargo bench concurrent_connections
cargo bench message_parsing
cargo bench redis_event_processing
```
## Benchmark Categories
### Connection Establishment
Tests the performance of establishing WebSocket connections with different authentication scenarios:
- No authentication
- Valid JWT authentication
- Invalid JWT authentication (connection rejection)
### Subscriptions
Measures the performance of subscription operations:
- Subscribing to graph execution events
- Unsubscribing from channels
### Message Throughput
Tests how many messages the server can process per second with varying message counts (10, 100, 1000).
### Concurrent Connections
Benchmarks the server's ability to handle multiple simultaneous connections (10, 50, 100, 500 clients).
### Message Parsing
Tests JSON parsing performance with different message sizes (100B to 100KB).
### Redis Event Processing
Benchmarks the parsing of execution events received from Redis.
## Profiling
To generate flamegraphs for CPU profiling:
1. Install flamegraph tools:
```bash
cargo install flamegraph
```
2. Run benchmarks with profiling:
```bash
cargo bench --bench websocket_bench -- --profile-time=10
```
## Interpreting Results
- **Throughput**: Higher is better (operations/second or elements/second)
- **Time**: Lower is better (nanoseconds per operation)
- **Error margins**: Look for stable results with low standard deviation
## Optimizing Performance
Based on benchmark results, consider:
1. **Connection pooling** for Redis connections
2. **Message batching** for high-throughput scenarios
3. **Async task tuning** for concurrent connection handling
4. **JSON parsing optimization** using simd-json or other fast parsers
5. **Memory allocation** optimization using arena allocators
## Notes
- Benchmarks create actual WebSocket servers on random ports
- Each benchmark iteration properly cleans up resources
- Results may vary based on system resources and Redis performance

View File

@@ -0,0 +1,406 @@
#![allow(clippy::unwrap_used)] // Benchmarks can panic on setup errors
use axum::{routing::get, Router};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use futures_util::{SinkExt, StreamExt};
use serde_json::json;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::runtime::Runtime;
use tokio_tungstenite::{connect_async, tungstenite::Message};
// Import the actual websocket server components
use websocket::{models, ws_handler, AppState, Config, ConnectionManager, Stats};
// Helper to create a test server
async fn create_test_server(enable_auth: bool) -> (String, tokio::task::JoinHandle<()>) {
// Set environment variables for test config
std::env::set_var("WEBSOCKET_SERVER_HOST", "127.0.0.1");
std::env::set_var("WEBSOCKET_SERVER_PORT", "0");
std::env::set_var("ENABLE_AUTH", enable_auth.to_string());
std::env::set_var("SUPABASE_JWT_SECRET", "test_secret");
std::env::set_var("DEFAULT_USER_ID", "test_user");
if std::env::var("REDIS_URL").is_err() {
std::env::set_var("REDIS_URL", "redis://localhost:6379");
}
let mut config = Config::load(None);
config.port = 0; // Force OS to assign port
let redis_client =
redis::Client::open(config.redis_url.clone()).expect("Failed to connect to Redis");
let stats = Arc::new(Stats::default());
let mgr = Arc::new(ConnectionManager::new(
redis_client,
config.execution_event_bus_name.clone(),
stats.clone(),
));
// Start broadcaster
let mgr_clone = mgr.clone();
tokio::spawn(async move {
mgr_clone.run_broadcaster().await;
});
let state = AppState {
mgr,
config: Arc::new(config),
stats,
};
let app = Router::new()
.route("/ws", get(ws_handler))
.layer(axum::Extension(state));
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let server_url = format!("ws://{addr}");
let server_handle = tokio::spawn(async move {
axum::serve(listener, app.into_make_service())
.await
.unwrap();
});
// Give server time to start
tokio::time::sleep(Duration::from_millis(100)).await;
(server_url, server_handle)
}
// Helper to create a valid JWT token
fn create_jwt_token(user_id: &str) -> String {
use jsonwebtoken::{encode, Algorithm, EncodingKey, Header};
use serde::Serialize;
#[derive(Serialize)]
struct Claims {
sub: String,
aud: Vec<String>,
exp: usize,
}
let claims = Claims {
sub: user_id.to_string(),
aud: vec!["authenticated".to_string()],
exp: (chrono::Utc::now() + chrono::Duration::hours(1)).timestamp() as usize,
};
encode(
&Header::new(Algorithm::HS256),
&claims,
&EncodingKey::from_secret(b"test_secret"),
)
.unwrap()
}
// Benchmark connection establishment
fn benchmark_connection_establishment(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("connection_establishment");
group.measurement_time(Duration::from_secs(30));
// Test without auth
group.bench_function("no_auth", |b| {
b.to_async(&rt).iter_with_large_drop(|| async {
let (server_url, server_handle) = create_test_server(false).await;
let url = format!("{server_url}/ws");
let (ws_stream, _) = connect_async(&url).await.unwrap();
drop(ws_stream);
server_handle.abort();
});
});
// Test with valid auth
group.bench_function("valid_auth", |b| {
b.to_async(&rt).iter_with_large_drop(|| async {
let (server_url, server_handle) = create_test_server(true).await;
let token = create_jwt_token("test_user");
let url = format!("{server_url}/ws?token={token}");
let (ws_stream, _) = connect_async(&url).await.unwrap();
drop(ws_stream);
server_handle.abort();
});
});
// Test with invalid auth
group.bench_function("invalid_auth", |b| {
b.to_async(&rt).iter_with_large_drop(|| async {
let (server_url, server_handle) = create_test_server(true).await;
let url = format!("{server_url}/ws?token=invalid");
let result = connect_async(&url).await;
assert!(
result.is_err() || {
if let Ok((mut ws_stream, _)) = result {
// Should receive close frame
matches!(ws_stream.next().await, Some(Ok(Message::Close(_))))
} else {
false
}
}
);
server_handle.abort();
});
});
group.finish();
}
// Benchmark subscription operations
fn benchmark_subscriptions(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("subscriptions");
group.measurement_time(Duration::from_secs(20));
group.bench_function("subscribe_graph_execution", |b| {
b.to_async(&rt).iter_with_large_drop(|| async {
let (server_url, server_handle) = create_test_server(false).await;
let url = format!("{server_url}/ws");
let (mut ws_stream, _) = connect_async(&url).await.unwrap();
let msg = json!({
"method": "subscribe_graph_execution",
"data": {
"graph_exec_id": "test_exec_123"
}
});
ws_stream
.send(Message::Text(msg.to_string()))
.await
.unwrap();
// Wait for response
if let Some(Ok(Message::Text(response))) = ws_stream.next().await {
let resp: serde_json::Value = serde_json::from_str(&response).unwrap();
assert_eq!(resp["success"], true);
}
server_handle.abort();
});
});
group.bench_function("unsubscribe", |b| {
b.to_async(&rt).iter_with_large_drop(|| async {
let (server_url, server_handle) = create_test_server(false).await;
let url = format!("{server_url}/ws");
let (mut ws_stream, _) = connect_async(&url).await.unwrap();
// First subscribe
let msg = json!({
"method": "subscribe_graph_execution",
"data": {
"graph_exec_id": "test_exec_123"
}
});
ws_stream
.send(Message::Text(msg.to_string()))
.await
.unwrap();
ws_stream.next().await; // Consume response
let msg = json!({
"method": "unsubscribe",
"data": {
"channel": "test_user|graph_exec#test_exec_123"
}
});
ws_stream
.send(Message::Text(msg.to_string()))
.await
.unwrap();
// Wait for response
if let Some(Ok(Message::Text(response))) = ws_stream.next().await {
let resp: serde_json::Value = serde_json::from_str(&response).unwrap();
assert_eq!(resp["success"], true);
}
server_handle.abort();
});
});
group.finish();
}
// Benchmark message throughput
fn benchmark_message_throughput(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("message_throughput");
group.measurement_time(Duration::from_secs(30));
for msg_count in [10, 100, 1000].iter() {
group.throughput(Throughput::Elements(*msg_count as u64));
group.bench_with_input(
BenchmarkId::from_parameter(msg_count),
msg_count,
|b, &msg_count| {
b.to_async(&rt).iter_with_large_drop(|| async {
let (server_url, server_handle) = create_test_server(false).await;
let url = format!("{server_url}/ws");
let (mut ws_stream, _) = connect_async(&url).await.unwrap();
// Send multiple heartbeat messages
for _ in 0..msg_count {
let msg = json!({
"method": "heartbeat",
"data": "ping"
});
ws_stream
.send(Message::Text(msg.to_string()))
.await
.unwrap();
}
// Receive all responses
for _ in 0..msg_count {
ws_stream.next().await;
}
server_handle.abort();
});
},
);
}
group.finish();
}
// Benchmark concurrent connections
fn benchmark_concurrent_connections(c: &mut Criterion) {
let rt = Runtime::new().unwrap();
let mut group = c.benchmark_group("concurrent_connections");
group.measurement_time(Duration::from_secs(60));
group.sample_size(10);
for num_clients in [100, 500, 1000].iter() {
group.throughput(Throughput::Elements(*num_clients as u64));
group.bench_with_input(
BenchmarkId::from_parameter(num_clients),
num_clients,
|b, &num_clients| {
b.to_async(&rt).iter_with_large_drop(|| async {
let (server_url, server_handle) = create_test_server(false).await;
let url = format!("{server_url}/ws");
// Create multiple concurrent connections
let mut handles = vec![];
for i in 0..num_clients {
let url = url.clone();
let handle = tokio::spawn(async move {
let (mut ws_stream, _) = connect_async(&url).await.unwrap();
// Subscribe to a unique channel
let msg = json!({
"method": "subscribe_graph_execution",
"data": {
"graph_exec_id": format!("exec_{}", i)
}
});
ws_stream
.send(Message::Text(msg.to_string()))
.await
.unwrap();
ws_stream.next().await; // Wait for response
// Send a heartbeat
let msg = json!({
"method": "heartbeat",
"data": "ping"
});
ws_stream
.send(Message::Text(msg.to_string()))
.await
.unwrap();
ws_stream.next().await; // Wait for response
ws_stream
});
handles.push(handle);
}
// Wait for all connections to complete
for handle in handles {
let _ = handle.await;
}
server_handle.abort();
});
},
);
}
group.finish();
}
// Benchmark message parsing
fn benchmark_message_parsing(c: &mut Criterion) {
let mut group = c.benchmark_group("message_parsing");
// Test different message sizes
for msg_size in [100, 1000, 10000].iter() {
group.throughput(Throughput::Bytes(*msg_size as u64));
group.bench_with_input(
BenchmarkId::new("parse_json", msg_size),
msg_size,
|b, &msg_size| {
let data_str = "x".repeat(msg_size);
let json_msg = json!({
"method": "subscribe_graph_execution",
"data": {
"graph_exec_id": data_str
}
});
let json_str = json_msg.to_string();
b.iter(|| {
let _: models::WSMessage = serde_json::from_str(&json_str).unwrap();
});
},
);
}
group.finish();
}
// Benchmark Redis event processing
fn benchmark_redis_event_processing(c: &mut Criterion) {
let mut group = c.benchmark_group("redis_event_processing");
group.bench_function("parse_execution_event", |b| {
let event = json!({
"payload": {
"event_type": "graph_execution_update",
"id": "exec_123",
"graph_id": "graph_456",
"graph_version": 1,
"user_id": "user_789",
"status": "RUNNING",
"started_at": "2024-01-01T00:00:00Z",
"inputs": {"test": "data"},
"outputs": {}
}
});
let event_str = event.to_string();
b.iter(|| {
let _: models::RedisEventWrapper = serde_json::from_str(&event_str).unwrap();
});
});
group.finish();
}
criterion_group!(
benches,
benchmark_connection_establishment,
benchmark_subscriptions,
benchmark_message_throughput,
benchmark_concurrent_connections,
benchmark_message_parsing,
benchmark_redis_event_processing
);
criterion_main!(benches);

View File

@@ -0,0 +1,10 @@
# Clippy configuration for robust error handling
# Set the maximum cognitive complexity allowed
cognitive-complexity-threshold = 30
# Warn on TODO/FIXME comments
allow-dbg-in-tests = false
# Enforce documentation
missing-docs-in-crate-items = true

View File

@@ -0,0 +1,23 @@
# WebSocket API Configuration
# Server settings
host = "0.0.0.0"
port = 8001
# Authentication
enable_auth = true
jwt_secret = "your-super-secret-jwt-token-with-at-least-32-characters-long"
jwt_algorithm = "HS256"
default_user_id = "default"
# Redis configuration
redis_url = "redis://:password@localhost:6379/"
# Event bus
execution_event_bus_name = "execution_event"
# Message size limit (in bytes)
max_message_size_limit = 512000
# CORS allowed origins
backend_cors_allow_origins = ["http://localhost:3000", "https://559f69c159ef.ngrok.app"]

View File

@@ -0,0 +1,75 @@
use futures_util::{SinkExt, StreamExt};
use serde_json::json;
use tokio_tungstenite::{connect_async, tungstenite::Message};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let url = "ws://localhost:8001/ws";
println!("Connecting to {url}");
let (mut ws_stream, _) = connect_async(url).await?;
println!("Connected!");
// Subscribe to a graph execution
let subscribe_msg = json!({
"method": "subscribe_graph_execution",
"data": {
"graph_exec_id": "test_exec_123"
}
});
println!("Sending subscription request...");
ws_stream
.send(Message::Text(subscribe_msg.to_string()))
.await?;
// Wait for response
if let Some(msg) = ws_stream.next().await {
if let Message::Text(text) = msg? {
println!("Received: {text}");
}
}
// Send heartbeat
let heartbeat_msg = json!({
"method": "heartbeat",
"data": "ping"
});
println!("Sending heartbeat...");
ws_stream
.send(Message::Text(heartbeat_msg.to_string()))
.await?;
// Wait for pong
if let Some(msg) = ws_stream.next().await {
if let Message::Text(text) = msg? {
println!("Received: {text}");
}
}
// Unsubscribe
let unsubscribe_msg = json!({
"method": "unsubscribe",
"data": {
"channel": "default|graph_exec#test_exec_123"
}
});
println!("Sending unsubscribe request...");
ws_stream
.send(Message::Text(unsubscribe_msg.to_string()))
.await?;
// Wait for response
if let Some(msg) = ws_stream.next().await {
if let Message::Text(text) = msg? {
println!("Received: {text}");
}
}
println!("Closing connection...");
ws_stream.close(None).await?;
Ok(())
}

View File

@@ -0,0 +1,99 @@
use jsonwebtoken::Algorithm;
use serde::Deserialize;
use std::env;
use std::fs;
use std::path::Path;
use std::str::FromStr;
#[derive(Clone, Debug, Deserialize)]
pub struct Config {
pub host: String,
pub port: u16,
pub enable_auth: bool,
pub jwt_secret: String,
pub jwt_algorithm: Algorithm,
pub execution_event_bus_name: String,
pub redis_url: String,
pub default_user_id: String,
pub max_message_size_limit: usize,
pub backend_cors_allow_origins: Vec<String>,
}
impl Config {
pub fn load(config_path: Option<&Path>) -> Self {
let path = config_path.unwrap_or(Path::new("config.toml"));
let toml_result = fs::read_to_string(path)
.ok()
.and_then(|s| toml::from_str::<Config>(&s).ok());
let mut config = match toml_result {
Some(config) => config,
None => Config {
host: env::var("WEBSOCKET_SERVER_HOST").unwrap_or_else(|_| "0.0.0.0".to_string()),
port: env::var("WEBSOCKET_SERVER_PORT")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(8001),
enable_auth: env::var("ENABLE_AUTH")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(true),
jwt_secret: env::var("SUPABASE_JWT_SECRET")
.unwrap_or_else(|_| "dummy_secret_for_no_auth".to_string()),
jwt_algorithm: Algorithm::HS256,
execution_event_bus_name: env::var("EXECUTION_EVENT_BUS_NAME")
.unwrap_or_else(|_| "execution_event".to_string()),
redis_url: env::var("REDIS_URL")
.unwrap_or_else(|_| "redis://localhost/".to_string()),
default_user_id: "default".to_string(),
max_message_size_limit: env::var("MAX_MESSAGE_SIZE_LIMIT")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(512000),
backend_cors_allow_origins: env::var("BACKEND_CORS_ALLOW_ORIGINS")
.unwrap_or_default()
.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect(),
},
};
if let Ok(v) = env::var("WEBSOCKET_SERVER_HOST") {
config.host = v;
}
if let Ok(v) = env::var("WEBSOCKET_SERVER_PORT") {
config.port = v.parse().unwrap_or(8001);
}
if let Ok(v) = env::var("ENABLE_AUTH") {
config.enable_auth = v.parse().unwrap_or(true);
}
if let Ok(v) = env::var("SUPABASE_JWT_SECRET") {
config.jwt_secret = v;
}
if let Ok(v) = env::var("JWT_ALGORITHM") {
config.jwt_algorithm = Algorithm::from_str(&v).unwrap_or(Algorithm::HS256);
}
if let Ok(v) = env::var("EXECUTION_EVENT_BUS_NAME") {
config.execution_event_bus_name = v;
}
if let Ok(v) = env::var("REDIS_URL") {
config.redis_url = v;
}
if let Ok(v) = env::var("DEFAULT_USER_ID") {
config.default_user_id = v;
}
if let Ok(v) = env::var("MAX_MESSAGE_SIZE_LIMIT") {
config.max_message_size_limit = v.parse().unwrap_or(512000);
}
if let Ok(v) = env::var("BACKEND_CORS_ALLOW_ORIGINS") {
config.backend_cors_allow_origins = v
.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect();
}
config
}
}

View File

@@ -0,0 +1,277 @@
use futures::StreamExt;
use redis::Client as RedisClient;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tracing::{debug, error, info, warn};
use crate::models::{ExecutionEvent, RedisEventWrapper, WSMessage};
use crate::stats::Stats;
pub struct ConnectionManager {
pub subscribers: RwLock<HashMap<String, HashSet<u64>>>,
pub clients: RwLock<HashMap<u64, (String, mpsc::Sender<String>)>>,
pub client_channels: RwLock<HashMap<u64, HashSet<String>>>,
pub next_id: AtomicU64,
pub redis_client: RedisClient,
pub bus_name: String,
pub stats: Arc<Stats>,
}
impl ConnectionManager {
pub fn new(redis_client: RedisClient, bus_name: String, stats: Arc<Stats>) -> Self {
Self {
subscribers: RwLock::new(HashMap::new()),
clients: RwLock::new(HashMap::new()),
client_channels: RwLock::new(HashMap::new()),
next_id: AtomicU64::new(0),
redis_client,
bus_name,
stats,
}
}
pub async fn run_broadcaster(self: Arc<Self>) {
info!("🚀 Starting Redis event broadcaster");
loop {
match self.run_broadcaster_inner().await {
Ok(_) => {
warn!("⚠️ Event broadcaster stopped unexpectedly, restarting in 5 seconds");
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
Err(e) => {
error!("❌ Event broadcaster error: {}, restarting in 5 seconds", e);
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}
}
}
async fn run_broadcaster_inner(
self: &Arc<Self>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut pubsub = self.redis_client.get_async_pubsub().await?;
pubsub.psubscribe("*").await?;
info!(
"📡 Listening to all Redis events, filtering for bus: {}",
self.bus_name
);
let mut pubsub_stream = pubsub.on_message();
loop {
let msg = pubsub_stream.next().await;
match msg {
Some(msg) => {
let channel: String = msg.get_channel_name().to_string();
debug!("📨 Received message on Redis channel: {}", channel);
self.stats
.redis_messages_received
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let payload: String = match msg.get_payload() {
Ok(p) => p,
Err(e) => {
warn!("⚠️ Failed to get payload from Redis message: {}", e);
self.stats
.errors_total
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
continue;
}
};
// Parse the channel format: execution_event/{user_id}/{graph_id}/{graph_exec_id}
let parts: Vec<&str> = channel.split('/').collect();
// Check if this is an execution event channel
if parts.len() != 4 || parts[0] != self.bus_name {
debug!(
"🚫 Ignoring non-execution event channel: {} (parts: {:?}, bus_name: {})",
channel, parts, self.bus_name
);
self.stats
.redis_messages_ignored
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
continue;
}
let user_id = parts[1];
let graph_id = parts[2];
let graph_exec_id = parts[3];
debug!(
"📥 Received event - user: {}, graph: {}, exec: {}",
user_id, graph_id, graph_exec_id
);
// Parse the wrapped event
let wrapped_event = match RedisEventWrapper::parse(&payload) {
Ok(e) => e,
Err(e) => {
warn!("⚠️ Failed to parse event JSON: {}, payload: {}", e, payload);
self.stats
.errors_json_parse
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.stats
.errors_total
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
continue;
}
};
let event = wrapped_event.payload;
debug!("📦 Event received: {:?}", event);
let (method, event_json) = match &event {
ExecutionEvent::GraphExecutionUpdate(graph_event) => {
self.stats
.graph_execution_events
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.stats
.events_received_total
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
(
"graph_execution_event",
match serde_json::to_value(graph_event) {
Ok(v) => v,
Err(e) => {
error!("❌ Failed to serialize graph event: {}", e);
continue;
}
},
)
}
ExecutionEvent::NodeExecutionUpdate(node_event) => {
self.stats
.node_execution_events
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
self.stats
.events_received_total
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
(
"node_execution_event",
match serde_json::to_value(node_event) {
Ok(v) => v,
Err(e) => {
error!("❌ Failed to serialize node event: {}", e);
continue;
}
},
)
}
};
// Create the channel keys in the format expected by WebSocket clients
let mut channels_to_notify = Vec::new();
// For both event types, notify the specific execution channel
let exec_channel = format!("{user_id}|graph_exec#{graph_exec_id}");
channels_to_notify.push(exec_channel.clone());
// For graph execution events, also notify the graph executions channel
if matches!(&event, ExecutionEvent::GraphExecutionUpdate(_)) {
let graph_channel = format!("{user_id}|graph#{graph_id}|executions");
channels_to_notify.push(graph_channel);
}
debug!(
"📢 Broadcasting {} event to channels: {:?}",
method, channels_to_notify
);
let subs = self.subscribers.read().await;
// Log current subscriber state
debug!("📊 Current subscribers count: {}", subs.len());
for channel_key in channels_to_notify {
let ws_msg = WSMessage {
method: method.to_string(),
channel: Some(channel_key.clone()),
data: Some(event_json.clone()),
..Default::default()
};
let json_msg = match serde_json::to_string(&ws_msg) {
Ok(j) => {
debug!("📤 Sending WebSocket message: {}", j);
j
}
Err(e) => {
error!("❌ Failed to serialize WebSocket message: {}", e);
continue;
}
};
if let Some(client_ids) = subs.get(&channel_key) {
let clients = self.clients.read().await;
let client_count = client_ids.len();
debug!(
"📣 Broadcasting to {} clients on channel: {}",
client_count, channel_key
);
for &cid in client_ids {
if let Some((user_id, tx)) = clients.get(&cid) {
match tx.try_send(json_msg.clone()) {
Ok(_) => {
debug!(
"✅ Message sent immediately to client {} (user: {})",
cid, user_id
);
self.stats
.messages_sent_total
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
Err(mpsc::error::TrySendError::Full(_)) => {
// Channel is full, try with a small timeout
let tx_clone = tx.clone();
let msg_clone = json_msg.clone();
let stats_clone = self.stats.clone();
tokio::spawn(async move {
match tokio::time::timeout(
std::time::Duration::from_millis(100),
tx_clone.send(msg_clone),
)
.await {
Ok(Ok(_)) => {
stats_clone
.messages_sent_total
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
_ => {
stats_clone
.messages_failed_total
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
});
warn!("⚠️ Channel full for client {} (user: {}), sending async", cid, user_id);
}
Err(mpsc::error::TrySendError::Closed(_)) => {
warn!(
"⚠️ Channel closed for client {} (user: {})",
cid, user_id
);
self.stats
.messages_failed_total
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
} else {
warn!("⚠️ Client {} not found in clients map", cid);
}
}
} else {
info!("📭 No subscribers for channel: {}", channel_key);
}
}
}
None => {
return Err("❌ Redis pubsub stream ended".into());
}
}
}
}
}

View File

@@ -0,0 +1,442 @@
use axum::extract::ws::{CloseFrame, Message, WebSocket};
use axum::{
extract::{Query, WebSocketUpgrade},
http::HeaderMap,
response::IntoResponse,
Extension,
};
use jsonwebtoken::{decode, DecodingKey, Validation};
use serde_json::{json, Value};
use std::collections::HashMap;
use tokio::sync::mpsc;
use tracing::{debug, error, info, warn};
use crate::connection_manager::ConnectionManager;
use crate::models::{Claims, WSMessage};
use crate::AppState;
// Helper function to safely serialize messages
fn serialize_message(msg: &WSMessage) -> String {
serde_json::to_string(msg).unwrap_or_else(|e| {
error!("❌ Failed to serialize WebSocket message: {}", e);
json!({"method": "error", "success": false, "error": "Internal serialization error"})
.to_string()
})
}
pub async fn ws_handler(
ws: WebSocketUpgrade,
query: Query<HashMap<String, String>>,
_headers: HeaderMap,
Extension(state): Extension<AppState>,
) -> impl IntoResponse {
let token = query.0.get("token").cloned();
let mut user_id = state.config.default_user_id.clone();
let mut auth_error_code: Option<u16> = None;
if state.config.enable_auth {
match token {
Some(token_str) => {
debug!("🔐 Authenticating WebSocket connection");
let mut validation = Validation::new(state.config.jwt_algorithm);
validation.set_audience(&["authenticated"]);
let key = DecodingKey::from_secret(state.config.jwt_secret.as_bytes());
match decode::<Claims>(&token_str, &key, &validation) {
Ok(token_data) => {
user_id = token_data.claims.sub.clone();
debug!("✅ WebSocket authenticated for user: {}", user_id);
}
Err(e) => {
warn!("⚠️ JWT validation failed: {}", e);
auth_error_code = Some(4003);
}
}
}
None => {
warn!("⚠️ Missing authentication token in WebSocket connection");
auth_error_code = Some(4001);
}
}
} else {
debug!("🔓 WebSocket connection without auth (auth disabled)");
}
if let Some(code) = auth_error_code {
error!("❌ WebSocket authentication failed with code: {}", code);
state
.mgr
.stats
.connections_failed_auth
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
state
.mgr
.stats
.connections_total
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
return ws
.on_upgrade(move |mut socket: WebSocket| async move {
let close_frame = Some(CloseFrame {
code,
reason: "Authentication failed".into(),
});
let _ = socket.send(Message::Close(close_frame)).await;
let _ = socket.close().await;
})
.into_response();
}
debug!("✅ WebSocket connection established for user: {}", user_id);
ws.on_upgrade(move |socket| {
handle_socket(
socket,
user_id,
state.mgr.clone(),
state.config.max_message_size_limit,
)
})
}
async fn update_subscription_stats(mgr: &ConnectionManager, channel: &str, add: bool) {
if add {
mgr.stats
.subscriptions_total
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
mgr.stats
.subscriptions_active
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let mut channel_stats = mgr.stats.channels_active.write().await;
let count = channel_stats.entry(channel.to_string()).or_insert(0);
*count += 1;
} else {
mgr.stats
.unsubscriptions_total
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
mgr.stats
.subscriptions_active
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
let mut channel_stats = mgr.stats.channels_active.write().await;
if let Some(count) = channel_stats.get_mut(channel) {
*count = count.saturating_sub(1);
if *count == 0 {
channel_stats.remove(channel);
}
}
}
}
pub async fn handle_socket(
mut socket: WebSocket,
user_id: String,
mgr: std::sync::Arc<ConnectionManager>,
max_size: usize,
) {
let client_id = mgr
.next_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let (tx, mut rx) = mpsc::channel::<String>(10);
info!("👋 New WebSocket client {} for user: {}", client_id, user_id);
// Update connection stats
mgr.stats
.connections_total
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
mgr.stats
.connections_active
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
// Update active users
{
let mut active_users = mgr.stats.active_users.write().await;
let count = active_users.entry(user_id.clone()).or_insert(0);
*count += 1;
}
{
let mut clients = mgr.clients.write().await;
clients.insert(client_id, (user_id.clone(), tx));
}
{
let mut client_channels = mgr.client_channels.write().await;
client_channels.insert(client_id, std::collections::HashSet::new());
}
loop {
tokio::select! {
msg = rx.recv() => {
if let Some(msg) = msg {
if socket.send(Message::Text(msg)).await.is_err() {
break;
}
} else {
break;
}
}
incoming = socket.recv() => {
let msg = match incoming {
Some(Ok(msg)) => msg,
_ => break,
};
match msg {
Message::Text(text) => {
if text.len() > max_size {
warn!("⚠️ Message from client {} exceeds size limit: {} > {}", client_id, text.len(), max_size);
let err_resp = serialize_message(&WSMessage {
method: "error".to_string(),
success: Some(false),
error: Some("Message exceeds size limit".to_string()),
..Default::default()
});
if socket.send(Message::Text(err_resp)).await.is_err() {
break;
}
continue;
}
mgr.stats.messages_received_total.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let ws_msg: WSMessage = match serde_json::from_str(&text) {
Ok(m) => m,
Err(e) => {
warn!("⚠️ Invalid message format from client {}: {}", client_id, e);
mgr.stats.errors_json_parse.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
mgr.stats.errors_total.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let err_resp = serialize_message(&WSMessage {
method: "error".to_string(),
success: Some(false),
error: Some("Invalid message format. Review the schema and retry".to_string()),
..Default::default()
});
if socket.send(Message::Text(err_resp)).await.is_err() {
break;
}
continue;
}
};
debug!("📥 Received {} message from client {}", ws_msg.method, client_id);
match ws_msg.method.as_str() {
"subscribe_graph_execution" => {
let graph_exec_id = match &ws_msg.data {
Some(Value::Object(map)) => map.get("graph_exec_id").and_then(|v| v.as_str()),
_ => None,
};
let Some(graph_exec_id) = graph_exec_id else {
warn!("⚠️ Missing graph_exec_id in subscribe_graph_execution from client {}", client_id);
let err_resp = json!({"method": "error", "success": false, "error": "Missing graph_exec_id"});
if socket.send(Message::Text(err_resp.to_string())).await.is_err() {
break;
}
continue;
};
let channel = format!("{user_id}|graph_exec#{graph_exec_id}");
debug!("📌 Client {} subscribing to channel: {}", client_id, channel);
{
let mut subs = mgr.subscribers.write().await;
subs.entry(channel.clone()).or_insert(std::collections::HashSet::new()).insert(client_id);
}
{
let mut chs = mgr.client_channels.write().await;
if let Some(set) = chs.get_mut(&client_id) {
set.insert(channel.clone());
}
}
// Update subscription stats
update_subscription_stats(&mgr, &channel, true).await;
let resp = WSMessage {
method: "subscribe_graph_execution".to_string(),
success: Some(true),
channel: Some(channel),
..Default::default()
};
if socket.send(Message::Text(serialize_message(&resp))).await.is_err() {
break;
}
}
"subscribe_graph_executions" => {
let graph_id = match &ws_msg.data {
Some(Value::Object(map)) => map.get("graph_id").and_then(|v| v.as_str()),
_ => None,
};
let Some(graph_id) = graph_id else {
let err_resp = json!({"method": "error", "success": false, "error": "Missing graph_id"});
if socket.send(Message::Text(err_resp.to_string())).await.is_err() {
break;
}
continue;
};
let channel = format!("{user_id}|graph#{graph_id}|executions");
{
let mut subs = mgr.subscribers.write().await;
subs.entry(channel.clone()).or_insert(std::collections::HashSet::new()).insert(client_id);
}
{
let mut chs = mgr.client_channels.write().await;
if let Some(set) = chs.get_mut(&client_id) {
set.insert(channel.clone());
}
}
debug!("📌 Client {} subscribing to channel: {}", client_id, channel);
// Update subscription stats
update_subscription_stats(&mgr, &channel, true).await;
let resp = WSMessage {
method: "subscribe_graph_executions".to_string(),
success: Some(true),
channel: Some(channel),
..Default::default()
};
if socket.send(Message::Text(serialize_message(&resp))).await.is_err() {
break;
}
}
"unsubscribe" => {
let channel = match &ws_msg.data {
Some(Value::String(s)) => Some(s.as_str()),
Some(Value::Object(map)) => map.get("channel").and_then(|v| v.as_str()),
_ => None,
};
let Some(channel) = channel else {
let err_resp = json!({"method": "error", "success": false, "error": "Missing channel"});
if socket.send(Message::Text(err_resp.to_string())).await.is_err() {
break;
}
continue;
};
let channel = channel.to_string();
if !channel.starts_with(&format!("{user_id}|")) {
let err_resp = json!({"method": "error", "success": false, "error": "Unauthorized channel"});
if socket.send(Message::Text(err_resp.to_string())).await.is_err() {
break;
}
continue;
}
{
let mut subs = mgr.subscribers.write().await;
if let Some(set) = subs.get_mut(&channel) {
set.remove(&client_id);
if set.is_empty() {
subs.remove(&channel);
}
}
}
{
let mut chs = mgr.client_channels.write().await;
if let Some(set) = chs.get_mut(&client_id) {
set.remove(&channel);
}
}
// Update subscription stats
update_subscription_stats(&mgr, &channel, false).await;
let resp = WSMessage {
method: "unsubscribe".to_string(),
success: Some(true),
channel: Some(channel),
..Default::default()
};
if socket.send(Message::Text(serialize_message(&resp))).await.is_err() {
break;
}
}
"heartbeat" => {
if ws_msg.data == Some(Value::String("ping".to_string())) {
let resp = WSMessage {
method: "heartbeat".to_string(),
data: Some(Value::String("pong".to_string())),
success: Some(true),
..Default::default()
};
if socket.send(Message::Text(serialize_message(&resp))).await.is_err() {
break;
}
} else {
let err_resp = json!({"method": "error", "success": false, "error": "Invalid heartbeat"});
if socket.send(Message::Text(err_resp.to_string())).await.is_err() {
break;
}
}
}
_ => {
warn!("❓ Unknown method '{}' from client {}", ws_msg.method, client_id);
let err_resp = json!({"method": "error", "success": false, "error": "Unknown method"});
if socket.send(Message::Text(err_resp.to_string())).await.is_err() {
break;
}
}
}
}
Message::Close(_) => break,
Message::Ping(_) => {
if socket.send(Message::Pong(vec![])).await.is_err() {
break;
}
}
Message::Pong(_) => {}
_ => {}
}
}
else => break,
}
}
// Cleanup
debug!("👋 WebSocket client {} disconnected, cleaning up", client_id);
// Update connection stats
mgr.stats
.connections_active
.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
// Update active users
{
let mut active_users = mgr.stats.active_users.write().await;
if let Some(count) = active_users.get_mut(&user_id) {
*count = count.saturating_sub(1);
if *count == 0 {
active_users.remove(&user_id);
}
}
}
let channels = {
let mut client_channels = mgr.client_channels.write().await;
client_channels.remove(&client_id).unwrap_or_default()
};
{
let mut subs = mgr.subscribers.write().await;
for channel in &channels {
if let Some(set) = subs.get_mut(channel) {
set.remove(&client_id);
if set.is_empty() {
subs.remove(channel);
}
}
}
}
// Update subscription stats for all channels the client was subscribed to
for channel in &channels {
update_subscription_stats(&mgr, channel, false).await;
}
{
let mut clients = mgr.clients.write().await;
clients.remove(&client_id);
}
debug!("✨ Cleanup completed for client {}", client_id);
}

View File

@@ -0,0 +1,26 @@
#![deny(warnings)]
#![deny(clippy::unwrap_used)]
#![deny(clippy::panic)]
#![deny(clippy::unimplemented)]
#![deny(clippy::todo)]
pub mod config;
pub mod connection_manager;
pub mod handlers;
pub mod models;
pub mod stats;
pub use config::Config;
pub use connection_manager::ConnectionManager;
pub use handlers::ws_handler;
pub use stats::Stats;
use std::sync::Arc;
#[derive(Clone)]
pub struct AppState {
pub mgr: Arc<ConnectionManager>,
pub config: Arc<Config>,
pub stats: Arc<Stats>,
}

View File

@@ -0,0 +1,172 @@
use axum::{
body::Body,
http::{header, StatusCode},
response::Response,
routing::get,
Router,
};
use clap::Parser;
use std::sync::Arc;
use tokio::net::TcpListener;
use tower_http::cors::{Any, CorsLayer};
use tracing::{debug, error, info};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use crate::config::Config;
use crate::connection_manager::ConnectionManager;
use crate::handlers::ws_handler;
async fn stats_handler(
axum::Extension(state): axum::Extension<AppState>,
) -> Result<axum::response::Json<stats::StatsSnapshot>, StatusCode> {
let snapshot = state.stats.snapshot().await;
Ok(axum::response::Json(snapshot))
}
async fn prometheus_handler(
axum::Extension(state): axum::Extension<AppState>,
) -> Result<Response, StatusCode> {
let snapshot = state.stats.snapshot().await;
let prometheus_text = state.stats.to_prometheus_format(&snapshot);
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "text/plain; version=0.0.4")
.body(Body::from(prometheus_text))
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
}
mod config;
mod connection_manager;
mod handlers;
mod models;
mod stats;
#[derive(Parser, Debug)]
#[command(author, version, about)]
struct Cli {
/// Path to a TOML configuration file
#[arg(short = 'c', long = "config", value_name = "FILE")]
config: Option<std::path::PathBuf>,
}
#[derive(Clone)]
pub struct AppState {
mgr: Arc<ConnectionManager>,
config: Arc<Config>,
stats: Arc<stats::Stats>,
}
#[tokio::main]
async fn main() {
// Initialize tracing
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "websocket=info,tower_http=debug".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
info!("🚀 Starting WebSocket API server");
let cli = Cli::parse();
let config = Arc::new(Config::load(cli.config.as_deref()));
info!(
"⚙️ Configuration loaded - host: {}, port: {}, auth: {}",
config.host, config.port, config.enable_auth
);
let redis_client = match redis::Client::open(config.redis_url.clone()) {
Ok(client) => {
debug!("✅ Redis client created successfully");
client
}
Err(e) => {
error!(
"❌ Failed to create Redis client: {}. Please check REDIS_URL environment variable",
e
);
std::process::exit(1);
}
};
let stats = Arc::new(stats::Stats::default());
let mgr = Arc::new(ConnectionManager::new(
redis_client,
config.execution_event_bus_name.clone(),
stats.clone(),
));
let mgr_clone = mgr.clone();
tokio::spawn(async move {
debug!("📡 Starting event broadcaster task");
mgr_clone.run_broadcaster().await;
});
let state = AppState {
mgr,
config: config.clone(),
stats,
};
let app = Router::new()
.route("/ws", get(ws_handler))
.route("/stats", get(stats_handler))
.route("/metrics", get(prometheus_handler))
.layer(axum::Extension(state));
let cors = if config.backend_cors_allow_origins.is_empty() {
// If no specific origins configured, allow any origin but without credentials
CorsLayer::new()
.allow_methods(Any)
.allow_headers(Any)
.allow_origin(Any)
} else {
// If specific origins configured, allow credentials
CorsLayer::new()
.allow_methods([
axum::http::Method::GET,
axum::http::Method::POST,
axum::http::Method::PUT,
axum::http::Method::DELETE,
axum::http::Method::OPTIONS,
])
.allow_headers(vec![
axum::http::header::CONTENT_TYPE,
axum::http::header::AUTHORIZATION,
])
.allow_credentials(true)
.allow_origin(
config
.backend_cors_allow_origins
.iter()
.filter_map(|o| o.parse::<axum::http::HeaderValue>().ok())
.collect::<Vec<_>>(),
)
};
let app = app.layer(cors);
let addr = format!("{}:{}", config.host, config.port);
let listener = match TcpListener::bind(&addr).await {
Ok(listener) => {
info!("🎧 WebSocket server listening on: {}", addr);
listener
}
Err(e) => {
error!(
"❌ Failed to bind to {}: {}. Please check if the port is already in use",
addr, e
);
std::process::exit(1);
}
};
info!("✨ WebSocket API server ready to accept connections");
if let Err(e) = axum::serve(listener, app.into_make_service()).await {
error!("💥 Server error: {}", e);
std::process::exit(1);
}
}

View File

@@ -0,0 +1,103 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct WSMessage {
pub method: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub data: Option<Value>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub success: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub channel: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
}
#[derive(Deserialize)]
pub struct Claims {
pub sub: String,
}
// Event models moved from events.rs
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "event_type")]
pub enum ExecutionEvent {
#[serde(rename = "graph_execution_update")]
GraphExecutionUpdate(GraphExecutionEvent),
#[serde(rename = "node_execution_update")]
NodeExecutionUpdate(NodeExecutionEvent),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphExecutionEvent {
pub id: String,
pub graph_id: String,
pub graph_version: u32,
pub user_id: String,
pub status: ExecutionStatus,
pub started_at: Option<String>,
pub ended_at: Option<String>,
pub preset_id: Option<String>,
pub stats: Option<ExecutionStats>,
// Keep these as JSON since they vary by graph
pub inputs: Value,
pub outputs: Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeExecutionEvent {
pub node_exec_id: String,
pub node_id: String,
pub graph_exec_id: String,
pub graph_id: String,
pub graph_version: u32,
pub user_id: String,
pub block_id: String,
pub status: ExecutionStatus,
pub add_time: String,
pub queue_time: Option<String>,
pub start_time: Option<String>,
pub end_time: Option<String>,
// Keep these as JSON since they vary by node type
pub input_data: Value,
pub output_data: Value,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutionStats {
pub cost: f64,
pub duration: f64,
pub duration_cpu_only: f64,
pub error: Option<String>,
pub node_error_count: u32,
pub node_exec_count: u32,
pub node_exec_time: f64,
pub node_exec_time_cpu_only: f64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum ExecutionStatus {
Queued,
Running,
Completed,
Failed,
Incomplete,
Terminated,
}
// Wrapper for the Redis event that includes the payload
#[derive(Debug, Deserialize)]
pub struct RedisEventWrapper {
pub payload: ExecutionEvent,
}
impl RedisEventWrapper {
pub fn parse(json_str: &str) -> Result<Self, serde_json::Error> {
serde_json::from_str(json_str)
}
}

View File

@@ -0,0 +1,238 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::RwLock;
#[derive(Default)]
pub struct Stats {
// Connection metrics
pub connections_total: AtomicU64,
pub connections_active: AtomicU64,
pub connections_failed_auth: AtomicU64,
// Message metrics
pub messages_received_total: AtomicU64,
pub messages_sent_total: AtomicU64,
pub messages_failed_total: AtomicU64,
// Subscription metrics
pub subscriptions_total: AtomicU64,
pub subscriptions_active: AtomicU64,
pub unsubscriptions_total: AtomicU64,
// Event metrics by type
pub events_received_total: AtomicU64,
pub graph_execution_events: AtomicU64,
pub node_execution_events: AtomicU64,
// Redis metrics
pub redis_messages_received: AtomicU64,
pub redis_messages_ignored: AtomicU64,
// Channel metrics
pub channels_active: RwLock<HashMap<String, usize>>, // channel -> subscriber count
// User metrics
pub active_users: RwLock<HashMap<String, usize>>, // user_id -> connection count
// Error metrics
pub errors_total: AtomicU64,
pub errors_json_parse: AtomicU64,
pub errors_message_size: AtomicU64,
}
#[derive(Serialize, Deserialize)]
pub struct StatsSnapshot {
// Connection metrics
pub connections_total: u64,
pub connections_active: u64,
pub connections_failed_auth: u64,
// Message metrics
pub messages_received_total: u64,
pub messages_sent_total: u64,
pub messages_failed_total: u64,
// Subscription metrics
pub subscriptions_total: u64,
pub subscriptions_active: u64,
pub unsubscriptions_total: u64,
// Event metrics
pub events_received_total: u64,
pub graph_execution_events: u64,
pub node_execution_events: u64,
// Redis metrics
pub redis_messages_received: u64,
pub redis_messages_ignored: u64,
// Channel metrics
pub channels_active_count: usize,
pub total_subscribers: usize,
// User metrics
pub active_users_count: usize,
// Error metrics
pub errors_total: u64,
pub errors_json_parse: u64,
pub errors_message_size: u64,
}
impl Stats {
pub async fn snapshot(&self) -> StatsSnapshot {
// Take read locks for HashMap data - it's ok if this is slightly stale
let channels = self.channels_active.read().await;
let total_subscribers: usize = channels.values().sum();
let channels_active_count = channels.len();
drop(channels); // Release lock early
let users = self.active_users.read().await;
let active_users_count = users.len();
drop(users); // Release lock early
StatsSnapshot {
connections_total: self.connections_total.load(Ordering::Relaxed),
connections_active: self.connections_active.load(Ordering::Relaxed),
connections_failed_auth: self.connections_failed_auth.load(Ordering::Relaxed),
messages_received_total: self.messages_received_total.load(Ordering::Relaxed),
messages_sent_total: self.messages_sent_total.load(Ordering::Relaxed),
messages_failed_total: self.messages_failed_total.load(Ordering::Relaxed),
subscriptions_total: self.subscriptions_total.load(Ordering::Relaxed),
subscriptions_active: self.subscriptions_active.load(Ordering::Relaxed),
unsubscriptions_total: self.unsubscriptions_total.load(Ordering::Relaxed),
events_received_total: self.events_received_total.load(Ordering::Relaxed),
graph_execution_events: self.graph_execution_events.load(Ordering::Relaxed),
node_execution_events: self.node_execution_events.load(Ordering::Relaxed),
redis_messages_received: self.redis_messages_received.load(Ordering::Relaxed),
redis_messages_ignored: self.redis_messages_ignored.load(Ordering::Relaxed),
channels_active_count,
total_subscribers,
active_users_count,
errors_total: self.errors_total.load(Ordering::Relaxed),
errors_json_parse: self.errors_json_parse.load(Ordering::Relaxed),
errors_message_size: self.errors_message_size.load(Ordering::Relaxed),
}
}
pub fn to_prometheus_format(&self, snapshot: &StatsSnapshot) -> String {
let mut output = String::new();
// Connection metrics
output.push_str("# HELP ws_connections_total Total number of WebSocket connections\n");
output.push_str("# TYPE ws_connections_total counter\n");
output.push_str(&format!(
"ws_connections_total {}\n\n",
snapshot.connections_total
));
output.push_str(
"# HELP ws_connections_active Current number of active WebSocket connections\n",
);
output.push_str("# TYPE ws_connections_active gauge\n");
output.push_str(&format!(
"ws_connections_active {}\n\n",
snapshot.connections_active
));
output
.push_str("# HELP ws_connections_failed_auth Total number of failed authentications\n");
output.push_str("# TYPE ws_connections_failed_auth counter\n");
output.push_str(&format!(
"ws_connections_failed_auth {}\n\n",
snapshot.connections_failed_auth
));
// Message metrics
output.push_str(
"# HELP ws_messages_received_total Total number of messages received from clients\n",
);
output.push_str("# TYPE ws_messages_received_total counter\n");
output.push_str(&format!(
"ws_messages_received_total {}\n\n",
snapshot.messages_received_total
));
output.push_str("# HELP ws_messages_sent_total Total number of messages sent to clients\n");
output.push_str("# TYPE ws_messages_sent_total counter\n");
output.push_str(&format!(
"ws_messages_sent_total {}\n\n",
snapshot.messages_sent_total
));
// Subscription metrics
output.push_str("# HELP ws_subscriptions_active Current number of active subscriptions\n");
output.push_str("# TYPE ws_subscriptions_active gauge\n");
output.push_str(&format!(
"ws_subscriptions_active {}\n\n",
snapshot.subscriptions_active
));
// Event metrics
output.push_str(
"# HELP ws_events_received_total Total number of events received from Redis\n",
);
output.push_str("# TYPE ws_events_received_total counter\n");
output.push_str(&format!(
"ws_events_received_total {}\n\n",
snapshot.events_received_total
));
output.push_str(
"# HELP ws_graph_execution_events_total Total number of graph execution events\n",
);
output.push_str("# TYPE ws_graph_execution_events_total counter\n");
output.push_str(&format!(
"ws_graph_execution_events_total {}\n\n",
snapshot.graph_execution_events
));
output.push_str(
"# HELP ws_node_execution_events_total Total number of node execution events\n",
);
output.push_str("# TYPE ws_node_execution_events_total counter\n");
output.push_str(&format!(
"ws_node_execution_events_total {}\n\n",
snapshot.node_execution_events
));
// Channel metrics
output.push_str("# HELP ws_channels_active Number of active channels\n");
output.push_str("# TYPE ws_channels_active gauge\n");
output.push_str(&format!(
"ws_channels_active {}\n\n",
snapshot.channels_active_count
));
output.push_str(
"# HELP ws_total_subscribers Total number of subscribers across all channels\n",
);
output.push_str("# TYPE ws_total_subscribers gauge\n");
output.push_str(&format!(
"ws_total_subscribers {}\n\n",
snapshot.total_subscribers
));
// User metrics
output.push_str("# HELP ws_active_users Number of unique users with active connections\n");
output.push_str("# TYPE ws_active_users gauge\n");
output.push_str(&format!(
"ws_active_users {}\n\n",
snapshot.active_users_count
));
// Error metrics
output.push_str("# HELP ws_errors_total Total number of errors\n");
output.push_str("# TYPE ws_errors_total counter\n");
output.push_str(&format!("ws_errors_total {}\n", snapshot.errors_total));
output
}
}

View File

@@ -0,0 +1,35 @@
import hashlib
import secrets
from typing import NamedTuple
class APIKeyContainer(NamedTuple):
"""Container for API key parts."""
raw: str
prefix: str
postfix: str
hash: str
class APIKeyManager:
PREFIX: str = "agpt_"
PREFIX_LENGTH: int = 8
POSTFIX_LENGTH: int = 8
def generate_api_key(self) -> APIKeyContainer:
"""Generate a new API key with all its parts."""
raw_key = f"{self.PREFIX}{secrets.token_urlsafe(32)}"
return APIKeyContainer(
raw=raw_key,
prefix=raw_key[: self.PREFIX_LENGTH],
postfix=raw_key[-self.POSTFIX_LENGTH :],
hash=hashlib.sha256(raw_key.encode()).hexdigest(),
)
def verify_api_key(self, provided_key: str, stored_hash: str) -> bool:
"""Verify if a provided API key matches the stored hash."""
if not provided_key.startswith(self.PREFIX):
return False
provided_hash = hashlib.sha256(provided_key.encode()).hexdigest()
return secrets.compare_digest(provided_hash, stored_hash)

View File

@@ -1,81 +0,0 @@
import hashlib
import secrets
from typing import NamedTuple
from cryptography.hazmat.primitives.kdf.scrypt import Scrypt
class APIKeyContainer(NamedTuple):
"""Container for API key parts."""
key: str
head: str
tail: str
hash: str
salt: str
class APIKeySmith:
PREFIX: str = "agpt_"
HEAD_LENGTH: int = 8
TAIL_LENGTH: int = 8
def generate_key(self) -> APIKeyContainer:
"""Generate a new API key with secure hashing."""
raw_key = f"{self.PREFIX}{secrets.token_urlsafe(32)}"
hash, salt = self.hash_key(raw_key)
return APIKeyContainer(
key=raw_key,
head=raw_key[: self.HEAD_LENGTH],
tail=raw_key[-self.TAIL_LENGTH :],
hash=hash,
salt=salt,
)
def verify_key(
self, provided_key: str, known_hash: str, known_salt: str | None = None
) -> bool:
"""
Verify an API key against a known hash (+ salt).
Supports verifying both legacy SHA256 and secure Scrypt hashes.
"""
if not provided_key.startswith(self.PREFIX):
return False
# Handle legacy SHA256 hashes (migration support)
if known_salt is None:
legacy_hash = hashlib.sha256(provided_key.encode()).hexdigest()
return secrets.compare_digest(legacy_hash, known_hash)
try:
salt_bytes = bytes.fromhex(known_salt)
provided_hash = self._hash_key_with_salt(provided_key, salt_bytes)
return secrets.compare_digest(provided_hash, known_hash)
except (ValueError, TypeError):
return False
def hash_key(self, raw_key: str) -> tuple[str, str]:
"""Migrate a legacy hash to secure hash format."""
if not raw_key.startswith(self.PREFIX):
raise ValueError("Key without 'agpt_' prefix would fail validation")
salt = self._generate_salt()
hash = self._hash_key_with_salt(raw_key, salt)
return hash, salt.hex()
def _generate_salt(self) -> bytes:
"""Generate a random salt for hashing."""
return secrets.token_bytes(32)
def _hash_key_with_salt(self, raw_key: str, salt: bytes) -> str:
"""Hash API key using Scrypt with salt."""
kdf = Scrypt(
length=32,
salt=salt,
n=2**14, # CPU/memory cost parameter
r=8, # Block size parameter
p=1, # Parallelization parameter
)
key_hash = kdf.derive(raw_key.encode())
return key_hash.hex()

View File

@@ -1,79 +0,0 @@
import hashlib
from autogpt_libs.api_key.keysmith import APIKeySmith
def test_generate_api_key():
keysmith = APIKeySmith()
key = keysmith.generate_key()
assert key.key.startswith(keysmith.PREFIX)
assert key.head == key.key[: keysmith.HEAD_LENGTH]
assert key.tail == key.key[-keysmith.TAIL_LENGTH :]
assert len(key.hash) == 64 # 32 bytes hex encoded
assert len(key.salt) == 64 # 32 bytes hex encoded
def test_verify_new_secure_key():
keysmith = APIKeySmith()
key = keysmith.generate_key()
# Test correct key validates
assert keysmith.verify_key(key.key, key.hash, key.salt) is True
# Test wrong key fails
wrong_key = f"{keysmith.PREFIX}wrongkey123"
assert keysmith.verify_key(wrong_key, key.hash, key.salt) is False
def test_verify_legacy_key():
keysmith = APIKeySmith()
legacy_key = f"{keysmith.PREFIX}legacykey123"
legacy_hash = hashlib.sha256(legacy_key.encode()).hexdigest()
# Test legacy key validates without salt
assert keysmith.verify_key(legacy_key, legacy_hash) is True
# Test wrong legacy key fails
wrong_key = f"{keysmith.PREFIX}wronglegacy"
assert keysmith.verify_key(wrong_key, legacy_hash) is False
def test_rehash_existing_key():
keysmith = APIKeySmith()
legacy_key = f"{keysmith.PREFIX}migratekey123"
# Migrate the legacy key
new_hash, new_salt = keysmith.hash_key(legacy_key)
# Verify migrated key works
assert keysmith.verify_key(legacy_key, new_hash, new_salt) is True
# Verify different key fails with migrated hash
wrong_key = f"{keysmith.PREFIX}wrongkey"
assert keysmith.verify_key(wrong_key, new_hash, new_salt) is False
def test_invalid_key_prefix():
keysmith = APIKeySmith()
key = keysmith.generate_key()
# Test key without proper prefix fails
invalid_key = "invalid_prefix_key"
assert keysmith.verify_key(invalid_key, key.hash, key.salt) is False
def test_secure_hash_requires_salt():
keysmith = APIKeySmith()
key = keysmith.generate_key()
# Secure hash without salt should fail
assert keysmith.verify_key(key.key, key.hash) is False
def test_invalid_salt_format():
keysmith = APIKeySmith()
key = keysmith.generate_key()
# Invalid salt format should fail gracefully
assert keysmith.verify_key(key.key, key.hash, "invalid_hex") is False

View File

@@ -1,19 +1,13 @@
from .config import verify_settings
from .dependencies import (
get_optional_user_id,
get_user_id,
requires_admin_user,
requires_user,
)
from .helpers import add_auth_responses_to_openapi
from .depends import requires_admin_user, requires_user
from .jwt_utils import parse_jwt_token
from .middleware import APIKeyValidator, auth_middleware
from .models import User
__all__ = [
"verify_settings",
"get_user_id",
"requires_admin_user",
"parse_jwt_token",
"requires_user",
"get_optional_user_id",
"add_auth_responses_to_openapi",
"requires_admin_user",
"APIKeyValidator",
"auth_middleware",
"User",
]

View File

@@ -1,90 +1,15 @@
import logging
import os
from jwt.algorithms import get_default_algorithms, has_crypto
logger = logging.getLogger(__name__)
class AuthConfigError(ValueError):
"""Raised when authentication configuration is invalid."""
pass
ALGO_RECOMMENDATION = (
"We highly recommend using an asymmetric algorithm such as ES256, "
"because when leaked, a shared secret would allow anyone to "
"forge valid tokens and impersonate users. "
"More info: https://supabase.com/docs/guides/auth/signing-keys#choosing-the-right-signing-algorithm" # noqa
)
class Settings:
def __init__(self):
self.JWT_VERIFY_KEY: str = os.getenv(
"JWT_VERIFY_KEY", os.getenv("SUPABASE_JWT_SECRET", "")
).strip()
self.JWT_ALGORITHM: str = os.getenv("JWT_SIGN_ALGORITHM", "HS256").strip()
self.JWT_SECRET_KEY: str = os.getenv("SUPABASE_JWT_SECRET", "")
self.ENABLE_AUTH: bool = os.getenv("ENABLE_AUTH", "false").lower() == "true"
self.JWT_ALGORITHM: str = "HS256"
self.validate()
def validate(self):
if not self.JWT_VERIFY_KEY:
raise AuthConfigError(
"JWT_VERIFY_KEY must be set. "
"An empty JWT secret would allow anyone to forge valid tokens."
)
if len(self.JWT_VERIFY_KEY) < 32:
logger.warning(
"⚠️ JWT_VERIFY_KEY appears weak (less than 32 characters). "
"Consider using a longer, cryptographically secure secret."
)
supported_algorithms = get_default_algorithms().keys()
if not has_crypto:
logger.warning(
"⚠️ Asymmetric JWT verification is not available "
"because the 'cryptography' package is not installed. "
+ ALGO_RECOMMENDATION
)
if (
self.JWT_ALGORITHM not in supported_algorithms
or self.JWT_ALGORITHM == "none"
):
raise AuthConfigError(
f"Invalid JWT_SIGN_ALGORITHM: '{self.JWT_ALGORITHM}'. "
"Supported algorithms are listed on "
"https://pyjwt.readthedocs.io/en/stable/algorithms.html"
)
if self.JWT_ALGORITHM.startswith("HS"):
logger.warning(
f"⚠️ JWT_SIGN_ALGORITHM is set to '{self.JWT_ALGORITHM}', "
"a symmetric shared-key signature algorithm. " + ALGO_RECOMMENDATION
)
@property
def is_configured(self) -> bool:
return bool(self.JWT_SECRET_KEY)
_settings: Settings = None # type: ignore
def get_settings() -> Settings:
global _settings
if not _settings:
_settings = Settings()
return _settings
def verify_settings() -> None:
global _settings
if not _settings:
_settings = Settings() # calls validation indirectly
return
_settings.validate()
settings = Settings()

View File

@@ -1,306 +0,0 @@
"""
Comprehensive tests for auth configuration to ensure 100% line and branch coverage.
These tests verify critical security checks preventing JWT token forgery.
"""
import logging
import os
import pytest
from pytest_mock import MockerFixture
from autogpt_libs.auth.config import AuthConfigError, Settings
def test_environment_variable_precedence(mocker: MockerFixture):
"""Test that environment variables take precedence over defaults."""
secret = "environment-secret-key-with-proper-length-123456"
mocker.patch.dict(os.environ, {"JWT_VERIFY_KEY": secret}, clear=True)
settings = Settings()
assert settings.JWT_VERIFY_KEY == secret
def test_environment_variable_backwards_compatible(mocker: MockerFixture):
"""Test that SUPABASE_JWT_SECRET is read if JWT_VERIFY_KEY is not set."""
secret = "environment-secret-key-with-proper-length-123456"
mocker.patch.dict(os.environ, {"SUPABASE_JWT_SECRET": secret}, clear=True)
settings = Settings()
assert settings.JWT_VERIFY_KEY == secret
def test_auth_config_error_inheritance():
"""Test that AuthConfigError is properly defined as an Exception."""
assert issubclass(AuthConfigError, Exception)
error = AuthConfigError("test message")
assert str(error) == "test message"
def test_settings_static_after_creation(mocker: MockerFixture):
"""Test that settings maintain their values after creation."""
secret = "immutable-secret-key-with-proper-length-12345"
mocker.patch.dict(os.environ, {"JWT_VERIFY_KEY": secret}, clear=True)
settings = Settings()
original_secret = settings.JWT_VERIFY_KEY
# Changing environment after creation shouldn't affect settings
os.environ["JWT_VERIFY_KEY"] = "different-secret"
assert settings.JWT_VERIFY_KEY == original_secret
def test_settings_load_with_valid_secret(mocker: MockerFixture):
"""Test auth enabled with a valid JWT secret."""
valid_secret = "a" * 32 # 32 character secret
mocker.patch.dict(os.environ, {"JWT_VERIFY_KEY": valid_secret}, clear=True)
settings = Settings()
assert settings.JWT_VERIFY_KEY == valid_secret
def test_settings_load_with_strong_secret(mocker: MockerFixture):
"""Test auth enabled with a cryptographically strong secret."""
strong_secret = "super-secret-jwt-token-with-at-least-32-characters-long"
mocker.patch.dict(os.environ, {"JWT_VERIFY_KEY": strong_secret}, clear=True)
settings = Settings()
assert settings.JWT_VERIFY_KEY == strong_secret
assert len(settings.JWT_VERIFY_KEY) >= 32
def test_secret_empty_raises_error(mocker: MockerFixture):
"""Test that auth enabled with empty secret raises AuthConfigError."""
mocker.patch.dict(os.environ, {"JWT_VERIFY_KEY": ""}, clear=True)
with pytest.raises(Exception) as exc_info:
Settings()
assert "JWT_VERIFY_KEY" in str(exc_info.value)
def test_secret_missing_raises_error(mocker: MockerFixture):
"""Test that auth enabled without secret env var raises AuthConfigError."""
mocker.patch.dict(os.environ, {}, clear=True)
with pytest.raises(Exception) as exc_info:
Settings()
assert "JWT_VERIFY_KEY" in str(exc_info.value)
@pytest.mark.parametrize("secret", [" ", " ", "\t", "\n", " \t\n "])
def test_secret_only_whitespace_raises_error(mocker: MockerFixture, secret: str):
"""Test that auth enabled with whitespace-only secret raises error."""
mocker.patch.dict(os.environ, {"JWT_VERIFY_KEY": secret}, clear=True)
with pytest.raises(ValueError):
Settings()
def test_secret_weak_logs_warning(
mocker: MockerFixture, caplog: pytest.LogCaptureFixture
):
"""Test that weak JWT secret triggers warning log."""
weak_secret = "short" # Less than 32 characters
mocker.patch.dict(os.environ, {"JWT_VERIFY_KEY": weak_secret}, clear=True)
with caplog.at_level(logging.WARNING):
settings = Settings()
assert settings.JWT_VERIFY_KEY == weak_secret
assert "key appears weak" in caplog.text.lower()
assert "less than 32 characters" in caplog.text
def test_secret_31_char_logs_warning(
mocker: MockerFixture, caplog: pytest.LogCaptureFixture
):
"""Test that 31-character secret triggers warning (boundary test)."""
secret_31 = "a" * 31 # Exactly 31 characters
mocker.patch.dict(os.environ, {"JWT_VERIFY_KEY": secret_31}, clear=True)
with caplog.at_level(logging.WARNING):
settings = Settings()
assert len(settings.JWT_VERIFY_KEY) == 31
assert "key appears weak" in caplog.text.lower()
def test_secret_32_char_no_warning(
mocker: MockerFixture, caplog: pytest.LogCaptureFixture
):
"""Test that 32-character secret does not trigger warning (boundary test)."""
secret_32 = "a" * 32 # Exactly 32 characters
mocker.patch.dict(os.environ, {"JWT_VERIFY_KEY": secret_32}, clear=True)
with caplog.at_level(logging.WARNING):
settings = Settings()
assert len(settings.JWT_VERIFY_KEY) == 32
assert "JWT secret appears weak" not in caplog.text
def test_secret_whitespace_stripped(mocker: MockerFixture):
"""Test that JWT secret whitespace is stripped."""
secret = "a" * 32
mocker.patch.dict(os.environ, {"JWT_VERIFY_KEY": f" {secret} "}, clear=True)
settings = Settings()
assert settings.JWT_VERIFY_KEY == secret
def test_secret_with_special_characters(mocker: MockerFixture):
"""Test JWT secret with special characters."""
special_secret = "!@#$%^&*()_+-=[]{}|;:,.<>?`~" + "a" * 10 # 40 chars total
mocker.patch.dict(os.environ, {"JWT_VERIFY_KEY": special_secret}, clear=True)
settings = Settings()
assert settings.JWT_VERIFY_KEY == special_secret
def test_secret_with_unicode(mocker: MockerFixture):
"""Test JWT secret with unicode characters."""
unicode_secret = "秘密🔐キー" + "a" * 25 # Ensure >32 bytes
mocker.patch.dict(os.environ, {"JWT_VERIFY_KEY": unicode_secret}, clear=True)
settings = Settings()
assert settings.JWT_VERIFY_KEY == unicode_secret
def test_secret_very_long(mocker: MockerFixture):
"""Test JWT secret with excessive length."""
long_secret = "a" * 1000 # 1000 character secret
mocker.patch.dict(os.environ, {"JWT_VERIFY_KEY": long_secret}, clear=True)
settings = Settings()
assert settings.JWT_VERIFY_KEY == long_secret
assert len(settings.JWT_VERIFY_KEY) == 1000
def test_secret_with_newline(mocker: MockerFixture):
"""Test JWT secret containing newlines."""
multiline_secret = "secret\nwith\nnewlines" + "a" * 20
mocker.patch.dict(os.environ, {"JWT_VERIFY_KEY": multiline_secret}, clear=True)
settings = Settings()
assert settings.JWT_VERIFY_KEY == multiline_secret
def test_secret_base64_encoded(mocker: MockerFixture):
"""Test JWT secret that looks like base64."""
base64_secret = "dGhpc19pc19hX3NlY3JldF9rZXlfd2l0aF9wcm9wZXJfbGVuZ3Ro"
mocker.patch.dict(os.environ, {"JWT_VERIFY_KEY": base64_secret}, clear=True)
settings = Settings()
assert settings.JWT_VERIFY_KEY == base64_secret
def test_secret_numeric_only(mocker: MockerFixture):
"""Test JWT secret with only numbers."""
numeric_secret = "1234567890" * 4 # 40 character numeric secret
mocker.patch.dict(os.environ, {"JWT_VERIFY_KEY": numeric_secret}, clear=True)
settings = Settings()
assert settings.JWT_VERIFY_KEY == numeric_secret
def test_algorithm_default_hs256(mocker: MockerFixture):
"""Test that JWT algorithm defaults to HS256."""
mocker.patch.dict(os.environ, {"JWT_VERIFY_KEY": "a" * 32}, clear=True)
settings = Settings()
assert settings.JWT_ALGORITHM == "HS256"
def test_algorithm_whitespace_stripped(mocker: MockerFixture):
"""Test that JWT algorithm whitespace is stripped."""
secret = "a" * 32
mocker.patch.dict(
os.environ,
{"JWT_VERIFY_KEY": secret, "JWT_SIGN_ALGORITHM": " HS256 "},
clear=True,
)
settings = Settings()
assert settings.JWT_ALGORITHM == "HS256"
def test_no_crypto_warning(mocker: MockerFixture, caplog: pytest.LogCaptureFixture):
"""Test warning when crypto package is not available."""
secret = "a" * 32
mocker.patch.dict(os.environ, {"JWT_VERIFY_KEY": secret}, clear=True)
# Mock has_crypto to return False
mocker.patch("autogpt_libs.auth.config.has_crypto", False)
with caplog.at_level(logging.WARNING):
Settings()
assert "Asymmetric JWT verification is not available" in caplog.text
assert "cryptography" in caplog.text
def test_algorithm_invalid_raises_error(mocker: MockerFixture):
"""Test that invalid JWT algorithm raises AuthConfigError."""
secret = "a" * 32
mocker.patch.dict(
os.environ,
{"JWT_VERIFY_KEY": secret, "JWT_SIGN_ALGORITHM": "INVALID_ALG"},
clear=True,
)
with pytest.raises(AuthConfigError) as exc_info:
Settings()
assert "Invalid JWT_SIGN_ALGORITHM" in str(exc_info.value)
assert "INVALID_ALG" in str(exc_info.value)
def test_algorithm_none_raises_error(mocker: MockerFixture):
"""Test that 'none' algorithm raises AuthConfigError."""
secret = "a" * 32
mocker.patch.dict(
os.environ,
{"JWT_VERIFY_KEY": secret, "JWT_SIGN_ALGORITHM": "none"},
clear=True,
)
with pytest.raises(AuthConfigError) as exc_info:
Settings()
assert "Invalid JWT_SIGN_ALGORITHM" in str(exc_info.value)
@pytest.mark.parametrize("algorithm", ["HS256", "HS384", "HS512"])
def test_algorithm_symmetric_warning(
mocker: MockerFixture, caplog: pytest.LogCaptureFixture, algorithm: str
):
"""Test warning for symmetric algorithms (HS256, HS384, HS512)."""
secret = "a" * 32
mocker.patch.dict(
os.environ,
{"JWT_VERIFY_KEY": secret, "JWT_SIGN_ALGORITHM": algorithm},
clear=True,
)
with caplog.at_level(logging.WARNING):
settings = Settings()
assert algorithm in caplog.text
assert "symmetric shared-key signature algorithm" in caplog.text
assert settings.JWT_ALGORITHM == algorithm
@pytest.mark.parametrize(
"algorithm",
["ES256", "ES384", "ES512", "RS256", "RS384", "RS512", "PS256", "PS384", "PS512"],
)
def test_algorithm_asymmetric_no_warning(
mocker: MockerFixture, caplog: pytest.LogCaptureFixture, algorithm: str
):
"""Test that asymmetric algorithms do not trigger warning."""
secret = "a" * 32
mocker.patch.dict(
os.environ,
{"JWT_VERIFY_KEY": secret, "JWT_SIGN_ALGORITHM": algorithm},
clear=True,
)
with caplog.at_level(logging.WARNING):
settings = Settings()
# Should not contain the symmetric algorithm warning
assert "symmetric shared-key signature algorithm" not in caplog.text
assert settings.JWT_ALGORITHM == algorithm

View File

@@ -1,117 +0,0 @@
"""
FastAPI dependency functions for JWT-based authentication and authorization.
These are the high-level dependency functions used in route definitions.
"""
import logging
import fastapi
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from .jwt_utils import get_jwt_payload, verify_user
from .models import User
optional_bearer = HTTPBearer(auto_error=False)
# Header name for admin impersonation
IMPERSONATION_HEADER_NAME = "X-Act-As-User-Id"
logger = logging.getLogger(__name__)
def get_optional_user_id(
credentials: HTTPAuthorizationCredentials | None = fastapi.Security(
optional_bearer
),
) -> str | None:
"""
Attempts to extract the user ID ("sub" claim) from a Bearer JWT if provided.
This dependency allows for both authenticated and anonymous access. If a valid bearer token is
supplied, it parses the JWT and extracts the user ID. If the token is missing or invalid, it returns None,
treating the request as anonymous.
Args:
credentials: Optional HTTPAuthorizationCredentials object from FastAPI Security dependency.
Returns:
The user ID (str) extracted from the JWT "sub" claim, or None if no valid token is present.
"""
if not credentials:
return None
try:
# Parse JWT token to get user ID
from autogpt_libs.auth.jwt_utils import parse_jwt_token
payload = parse_jwt_token(credentials.credentials)
return payload.get("sub")
except Exception as e:
logger.debug(f"Auth token validation failed (anonymous access): {e}")
return None
async def requires_user(jwt_payload: dict = fastapi.Security(get_jwt_payload)) -> User:
"""
FastAPI dependency that requires a valid authenticated user.
Raises:
HTTPException: 401 for authentication failures
"""
return verify_user(jwt_payload, admin_only=False)
async def requires_admin_user(
jwt_payload: dict = fastapi.Security(get_jwt_payload),
) -> User:
"""
FastAPI dependency that requires a valid admin user.
Raises:
HTTPException: 401 for authentication failures, 403 for insufficient permissions
"""
return verify_user(jwt_payload, admin_only=True)
async def get_user_id(
request: fastapi.Request, jwt_payload: dict = fastapi.Security(get_jwt_payload)
) -> str:
"""
FastAPI dependency that returns the ID of the authenticated user.
Supports admin impersonation via X-Act-As-User-Id header:
- If the header is present and user is admin, returns the impersonated user ID
- Otherwise returns the authenticated user's own ID
- Logs all impersonation actions for audit trail
Raises:
HTTPException: 401 for authentication failures or missing user ID
HTTPException: 403 if non-admin tries to use impersonation
"""
# Get the authenticated user's ID from JWT
user_id = jwt_payload.get("sub")
if not user_id:
raise fastapi.HTTPException(
status_code=401, detail="User ID not found in token"
)
# Check for admin impersonation header
impersonate_header = request.headers.get(IMPERSONATION_HEADER_NAME, "").strip()
if impersonate_header:
# Verify the authenticated user is an admin
authenticated_user = verify_user(jwt_payload, admin_only=False)
if authenticated_user.role != "admin":
raise fastapi.HTTPException(
status_code=403, detail="Only admin users can impersonate other users"
)
# Log the impersonation for audit trail
logger.info(
f"Admin impersonation: {authenticated_user.user_id} ({authenticated_user.email}) "
f"acting as user {impersonate_header} for requesting {request.method} {request.url}"
)
return impersonate_header
return user_id

View File

@@ -1,554 +0,0 @@
"""
Comprehensive integration tests for authentication dependencies.
Tests the full authentication flow from HTTP requests to user validation.
"""
import os
from unittest.mock import Mock
import pytest
from fastapi import FastAPI, HTTPException, Request, Security
from fastapi.testclient import TestClient
from pytest_mock import MockerFixture
from autogpt_libs.auth.dependencies import (
get_user_id,
requires_admin_user,
requires_user,
)
from autogpt_libs.auth.models import User
class TestAuthDependencies:
"""Test suite for authentication dependency functions."""
@pytest.fixture
def app(self):
"""Create a test FastAPI application."""
app = FastAPI()
@app.get("/user")
def get_user_endpoint(user: User = Security(requires_user)):
return {"user_id": user.user_id, "role": user.role}
@app.get("/admin")
def get_admin_endpoint(user: User = Security(requires_admin_user)):
return {"user_id": user.user_id, "role": user.role}
@app.get("/user-id")
def get_user_id_endpoint(user_id: str = Security(get_user_id)):
return {"user_id": user_id}
return app
@pytest.fixture
def client(self, app):
"""Create a test client."""
return TestClient(app)
@pytest.mark.asyncio
async def test_requires_user_with_valid_jwt_payload(self, mocker: MockerFixture):
"""Test requires_user with valid JWT payload."""
jwt_payload = {"sub": "user-123", "role": "user", "email": "user@example.com"}
# Mock get_jwt_payload to return our test payload
mocker.patch(
"autogpt_libs.auth.dependencies.get_jwt_payload", return_value=jwt_payload
)
user = await requires_user(jwt_payload)
assert isinstance(user, User)
assert user.user_id == "user-123"
assert user.role == "user"
@pytest.mark.asyncio
async def test_requires_user_with_admin_jwt_payload(self, mocker: MockerFixture):
"""Test requires_user accepts admin users."""
jwt_payload = {
"sub": "admin-456",
"role": "admin",
"email": "admin@example.com",
}
mocker.patch(
"autogpt_libs.auth.dependencies.get_jwt_payload", return_value=jwt_payload
)
user = await requires_user(jwt_payload)
assert user.user_id == "admin-456"
assert user.role == "admin"
@pytest.mark.asyncio
async def test_requires_user_missing_sub(self):
"""Test requires_user with missing user ID."""
jwt_payload = {"role": "user", "email": "user@example.com"}
with pytest.raises(HTTPException) as exc_info:
await requires_user(jwt_payload)
assert exc_info.value.status_code == 401
assert "User ID not found" in exc_info.value.detail
@pytest.mark.asyncio
async def test_requires_user_empty_sub(self):
"""Test requires_user with empty user ID."""
jwt_payload = {"sub": "", "role": "user"}
with pytest.raises(HTTPException) as exc_info:
await requires_user(jwt_payload)
assert exc_info.value.status_code == 401
@pytest.mark.asyncio
async def test_requires_admin_user_with_admin(self, mocker: MockerFixture):
"""Test requires_admin_user with admin role."""
jwt_payload = {
"sub": "admin-789",
"role": "admin",
"email": "admin@example.com",
}
mocker.patch(
"autogpt_libs.auth.dependencies.get_jwt_payload", return_value=jwt_payload
)
user = await requires_admin_user(jwt_payload)
assert user.user_id == "admin-789"
assert user.role == "admin"
@pytest.mark.asyncio
async def test_requires_admin_user_with_regular_user(self):
"""Test requires_admin_user rejects regular users."""
jwt_payload = {"sub": "user-123", "role": "user", "email": "user@example.com"}
with pytest.raises(HTTPException) as exc_info:
await requires_admin_user(jwt_payload)
assert exc_info.value.status_code == 403
assert "Admin access required" in exc_info.value.detail
@pytest.mark.asyncio
async def test_requires_admin_user_missing_role(self):
"""Test requires_admin_user with missing role."""
jwt_payload = {"sub": "user-123", "email": "user@example.com"}
with pytest.raises(KeyError):
await requires_admin_user(jwt_payload)
@pytest.mark.asyncio
async def test_get_user_id_with_valid_payload(self, mocker: MockerFixture):
"""Test get_user_id extracts user ID correctly."""
request = Mock(spec=Request)
request.headers = {}
jwt_payload = {"sub": "user-id-xyz", "role": "user"}
mocker.patch(
"autogpt_libs.auth.dependencies.get_jwt_payload", return_value=jwt_payload
)
user_id = await get_user_id(request, jwt_payload)
assert user_id == "user-id-xyz"
@pytest.mark.asyncio
async def test_get_user_id_missing_sub(self):
"""Test get_user_id with missing user ID."""
request = Mock(spec=Request)
request.headers = {}
jwt_payload = {"role": "user"}
with pytest.raises(HTTPException) as exc_info:
await get_user_id(request, jwt_payload)
assert exc_info.value.status_code == 401
assert "User ID not found" in exc_info.value.detail
@pytest.mark.asyncio
async def test_get_user_id_none_sub(self):
"""Test get_user_id with None user ID."""
request = Mock(spec=Request)
request.headers = {}
jwt_payload = {"sub": None, "role": "user"}
with pytest.raises(HTTPException) as exc_info:
await get_user_id(request, jwt_payload)
assert exc_info.value.status_code == 401
class TestAuthDependenciesIntegration:
"""Integration tests for auth dependencies with FastAPI."""
acceptable_jwt_secret = "test-secret-with-proper-length-123456"
@pytest.fixture
def create_token(self, mocker: MockerFixture):
"""Helper to create JWT tokens."""
import jwt
mocker.patch.dict(
os.environ,
{"JWT_VERIFY_KEY": self.acceptable_jwt_secret},
clear=True,
)
def _create_token(payload, secret=self.acceptable_jwt_secret):
return jwt.encode(payload, secret, algorithm="HS256")
return _create_token
@pytest.mark.asyncio
async def test_endpoint_auth_enabled_no_token(self):
"""Test endpoints require token when auth is enabled."""
app = FastAPI()
@app.get("/test")
def test_endpoint(user: User = Security(requires_user)):
return {"user_id": user.user_id}
client = TestClient(app)
# Should fail without auth header
response = client.get("/test")
assert response.status_code == 401
@pytest.mark.asyncio
async def test_endpoint_with_valid_token(self, create_token):
"""Test endpoint with valid JWT token."""
app = FastAPI()
@app.get("/test")
def test_endpoint(user: User = Security(requires_user)):
return {"user_id": user.user_id, "role": user.role}
client = TestClient(app)
token = create_token(
{"sub": "test-user", "role": "user", "aud": "authenticated"},
secret=self.acceptable_jwt_secret,
)
response = client.get("/test", headers={"Authorization": f"Bearer {token}"})
assert response.status_code == 200
assert response.json()["user_id"] == "test-user"
@pytest.mark.asyncio
async def test_admin_endpoint_requires_admin_role(self, create_token):
"""Test admin endpoint rejects non-admin users."""
app = FastAPI()
@app.get("/admin")
def admin_endpoint(user: User = Security(requires_admin_user)):
return {"user_id": user.user_id}
client = TestClient(app)
# Regular user token
user_token = create_token(
{"sub": "regular-user", "role": "user", "aud": "authenticated"},
secret=self.acceptable_jwt_secret,
)
response = client.get(
"/admin", headers={"Authorization": f"Bearer {user_token}"}
)
assert response.status_code == 403
# Admin token
admin_token = create_token(
{"sub": "admin-user", "role": "admin", "aud": "authenticated"},
secret=self.acceptable_jwt_secret,
)
response = client.get(
"/admin", headers={"Authorization": f"Bearer {admin_token}"}
)
assert response.status_code == 200
assert response.json()["user_id"] == "admin-user"
class TestAuthDependenciesEdgeCases:
"""Edge case tests for authentication dependencies."""
@pytest.mark.asyncio
async def test_dependency_with_complex_payload(self):
"""Test dependencies handle complex JWT payloads."""
complex_payload = {
"sub": "user-123",
"role": "admin",
"email": "test@example.com",
"app_metadata": {"provider": "email", "providers": ["email"]},
"user_metadata": {
"full_name": "Test User",
"avatar_url": "https://example.com/avatar.jpg",
},
"aud": "authenticated",
"iat": 1234567890,
"exp": 9999999999,
}
user = await requires_user(complex_payload)
assert user.user_id == "user-123"
assert user.email == "test@example.com"
admin = await requires_admin_user(complex_payload)
assert admin.role == "admin"
@pytest.mark.asyncio
async def test_dependency_with_unicode_in_payload(self):
"""Test dependencies handle unicode in JWT payloads."""
unicode_payload = {
"sub": "user-😀-123",
"role": "user",
"email": "测试@example.com",
"name": "日本語",
}
user = await requires_user(unicode_payload)
assert "😀" in user.user_id
assert user.email == "测试@example.com"
@pytest.mark.asyncio
async def test_dependency_with_null_values(self):
"""Test dependencies handle null values in payload."""
null_payload = {
"sub": "user-123",
"role": "user",
"email": None,
"phone": None,
"metadata": None,
}
user = await requires_user(null_payload)
assert user.user_id == "user-123"
assert user.email is None
@pytest.mark.asyncio
async def test_concurrent_requests_isolation(self):
"""Test that concurrent requests don't interfere with each other."""
payload1 = {"sub": "user-1", "role": "user"}
payload2 = {"sub": "user-2", "role": "admin"}
# Simulate concurrent processing
user1 = await requires_user(payload1)
user2 = await requires_admin_user(payload2)
assert user1.user_id == "user-1"
assert user2.user_id == "user-2"
assert user1.role == "user"
assert user2.role == "admin"
@pytest.mark.parametrize(
"payload,expected_error,admin_only",
[
(None, "Authorization header is missing", False),
({}, "User ID not found", False),
({"sub": ""}, "User ID not found", False),
({"role": "user"}, "User ID not found", False),
({"sub": "user", "role": "user"}, "Admin access required", True),
],
)
@pytest.mark.asyncio
async def test_dependency_error_cases(
self, payload, expected_error: str, admin_only: bool
):
"""Test that errors propagate correctly through dependencies."""
# Import verify_user to test it directly since dependencies use FastAPI Security
from autogpt_libs.auth.jwt_utils import verify_user
with pytest.raises(HTTPException) as exc_info:
verify_user(payload, admin_only=admin_only)
assert expected_error in exc_info.value.detail
@pytest.mark.asyncio
async def test_dependency_valid_user(self):
"""Test valid user case for dependency."""
# Import verify_user to test it directly since dependencies use FastAPI Security
from autogpt_libs.auth.jwt_utils import verify_user
# Valid case
user = verify_user({"sub": "user", "role": "user"}, admin_only=False)
assert user.user_id == "user"
class TestAdminImpersonation:
"""Test suite for admin user impersonation functionality."""
@pytest.mark.asyncio
async def test_admin_impersonation_success(self, mocker: MockerFixture):
"""Test admin successfully impersonating another user."""
request = Mock(spec=Request)
request.headers = {"X-Act-As-User-Id": "target-user-123"}
jwt_payload = {
"sub": "admin-456",
"role": "admin",
"email": "admin@example.com",
}
# Mock verify_user to return admin user data
mock_verify_user = mocker.patch("autogpt_libs.auth.dependencies.verify_user")
mock_verify_user.return_value = Mock(
user_id="admin-456", email="admin@example.com", role="admin"
)
# Mock logger to verify audit logging
mock_logger = mocker.patch("autogpt_libs.auth.dependencies.logger")
mocker.patch(
"autogpt_libs.auth.dependencies.get_jwt_payload", return_value=jwt_payload
)
user_id = await get_user_id(request, jwt_payload)
# Should return the impersonated user ID
assert user_id == "target-user-123"
# Should log the impersonation attempt
mock_logger.info.assert_called_once()
log_call = mock_logger.info.call_args[0][0]
assert "Admin impersonation:" in log_call
assert "admin@example.com" in log_call
assert "target-user-123" in log_call
@pytest.mark.asyncio
async def test_non_admin_impersonation_attempt(self, mocker: MockerFixture):
"""Test non-admin user attempting impersonation returns 403."""
request = Mock(spec=Request)
request.headers = {"X-Act-As-User-Id": "target-user-123"}
jwt_payload = {
"sub": "regular-user",
"role": "user",
"email": "user@example.com",
}
# Mock verify_user to return regular user data
mock_verify_user = mocker.patch("autogpt_libs.auth.dependencies.verify_user")
mock_verify_user.return_value = Mock(
user_id="regular-user", email="user@example.com", role="user"
)
mocker.patch(
"autogpt_libs.auth.dependencies.get_jwt_payload", return_value=jwt_payload
)
with pytest.raises(HTTPException) as exc_info:
await get_user_id(request, jwt_payload)
assert exc_info.value.status_code == 403
assert "Only admin users can impersonate other users" in exc_info.value.detail
@pytest.mark.asyncio
async def test_impersonation_empty_header(self, mocker: MockerFixture):
"""Test impersonation with empty header falls back to regular user ID."""
request = Mock(spec=Request)
request.headers = {"X-Act-As-User-Id": ""}
jwt_payload = {
"sub": "admin-456",
"role": "admin",
"email": "admin@example.com",
}
mocker.patch(
"autogpt_libs.auth.dependencies.get_jwt_payload", return_value=jwt_payload
)
user_id = await get_user_id(request, jwt_payload)
# Should fall back to the admin's own user ID
assert user_id == "admin-456"
@pytest.mark.asyncio
async def test_impersonation_missing_header(self, mocker: MockerFixture):
"""Test normal behavior when impersonation header is missing."""
request = Mock(spec=Request)
request.headers = {} # No impersonation header
jwt_payload = {
"sub": "admin-456",
"role": "admin",
"email": "admin@example.com",
}
mocker.patch(
"autogpt_libs.auth.dependencies.get_jwt_payload", return_value=jwt_payload
)
user_id = await get_user_id(request, jwt_payload)
# Should return the admin's own user ID
assert user_id == "admin-456"
@pytest.mark.asyncio
async def test_impersonation_audit_logging_details(self, mocker: MockerFixture):
"""Test that impersonation audit logging includes all required details."""
request = Mock(spec=Request)
request.headers = {"X-Act-As-User-Id": "victim-user-789"}
jwt_payload = {
"sub": "admin-999",
"role": "admin",
"email": "superadmin@company.com",
}
# Mock verify_user to return admin user data
mock_verify_user = mocker.patch("autogpt_libs.auth.dependencies.verify_user")
mock_verify_user.return_value = Mock(
user_id="admin-999", email="superadmin@company.com", role="admin"
)
# Mock logger to capture audit trail
mock_logger = mocker.patch("autogpt_libs.auth.dependencies.logger")
mocker.patch(
"autogpt_libs.auth.dependencies.get_jwt_payload", return_value=jwt_payload
)
user_id = await get_user_id(request, jwt_payload)
# Verify all audit details are logged
assert user_id == "victim-user-789"
mock_logger.info.assert_called_once()
log_message = mock_logger.info.call_args[0][0]
assert "Admin impersonation:" in log_message
assert "superadmin@company.com" in log_message
assert "victim-user-789" in log_message
@pytest.mark.asyncio
async def test_impersonation_header_case_sensitivity(self, mocker: MockerFixture):
"""Test that impersonation header is case-sensitive."""
request = Mock(spec=Request)
# Use wrong case - should not trigger impersonation
request.headers = {"x-act-as-user-id": "target-user-123"}
jwt_payload = {
"sub": "admin-456",
"role": "admin",
"email": "admin@example.com",
}
mocker.patch(
"autogpt_libs.auth.dependencies.get_jwt_payload", return_value=jwt_payload
)
user_id = await get_user_id(request, jwt_payload)
# Should fall back to admin's own ID (header case mismatch)
assert user_id == "admin-456"
@pytest.mark.asyncio
async def test_impersonation_with_whitespace_header(self, mocker: MockerFixture):
"""Test impersonation with whitespace in header value."""
request = Mock(spec=Request)
request.headers = {"X-Act-As-User-Id": " target-user-123 "}
jwt_payload = {
"sub": "admin-456",
"role": "admin",
"email": "admin@example.com",
}
# Mock verify_user to return admin user data
mock_verify_user = mocker.patch("autogpt_libs.auth.dependencies.verify_user")
mock_verify_user.return_value = Mock(
user_id="admin-456", email="admin@example.com", role="admin"
)
# Mock logger
mock_logger = mocker.patch("autogpt_libs.auth.dependencies.logger")
mocker.patch(
"autogpt_libs.auth.dependencies.get_jwt_payload", return_value=jwt_payload
)
user_id = await get_user_id(request, jwt_payload)
# Should strip whitespace and impersonate successfully
assert user_id == "target-user-123"
mock_logger.info.assert_called_once()

View File

@@ -0,0 +1,46 @@
import fastapi
from .config import settings
from .middleware import auth_middleware
from .models import DEFAULT_USER_ID, User
def requires_user(payload: dict = fastapi.Depends(auth_middleware)) -> User:
return verify_user(payload, admin_only=False)
def requires_admin_user(
payload: dict = fastapi.Depends(auth_middleware),
) -> User:
return verify_user(payload, admin_only=True)
def verify_user(payload: dict | None, admin_only: bool) -> User:
if not payload:
if settings.ENABLE_AUTH:
raise fastapi.HTTPException(
status_code=401, detail="Authorization header is missing"
)
# This handles the case when authentication is disabled
payload = {"sub": DEFAULT_USER_ID, "role": "admin"}
user_id = payload.get("sub")
if not user_id:
raise fastapi.HTTPException(
status_code=401, detail="User ID not found in token"
)
if admin_only and payload["role"] != "admin":
raise fastapi.HTTPException(status_code=403, detail="Admin access required")
return User.from_payload(payload)
def get_user_id(payload: dict = fastapi.Depends(auth_middleware)) -> str:
user_id = payload.get("sub")
if not user_id:
raise fastapi.HTTPException(
status_code=401, detail="User ID not found in token"
)
return user_id

View File

@@ -0,0 +1,68 @@
import pytest
from .depends import requires_admin_user, requires_user, verify_user
def test_verify_user_no_payload():
user = verify_user(None, admin_only=False)
assert user.user_id == "3e53486c-cf57-477e-ba2a-cb02dc828e1a"
assert user.role == "admin"
def test_verify_user_no_user_id():
with pytest.raises(Exception):
verify_user({"role": "admin"}, admin_only=False)
def test_verify_user_not_admin():
with pytest.raises(Exception):
verify_user(
{"sub": "3e53486c-cf57-477e-ba2a-cb02dc828e1a", "role": "user"},
admin_only=True,
)
def test_verify_user_with_admin_role():
user = verify_user(
{"sub": "3e53486c-cf57-477e-ba2a-cb02dc828e1a", "role": "admin"},
admin_only=True,
)
assert user.user_id == "3e53486c-cf57-477e-ba2a-cb02dc828e1a"
assert user.role == "admin"
def test_verify_user_with_user_role():
user = verify_user(
{"sub": "3e53486c-cf57-477e-ba2a-cb02dc828e1a", "role": "user"},
admin_only=False,
)
assert user.user_id == "3e53486c-cf57-477e-ba2a-cb02dc828e1a"
assert user.role == "user"
def test_requires_user():
user = requires_user(
{"sub": "3e53486c-cf57-477e-ba2a-cb02dc828e1a", "role": "user"}
)
assert user.user_id == "3e53486c-cf57-477e-ba2a-cb02dc828e1a"
assert user.role == "user"
def test_requires_user_no_user_id():
with pytest.raises(Exception):
requires_user({"role": "user"})
def test_requires_admin_user():
user = requires_admin_user(
{"sub": "3e53486c-cf57-477e-ba2a-cb02dc828e1a", "role": "admin"}
)
assert user.user_id == "3e53486c-cf57-477e-ba2a-cb02dc828e1a"
assert user.role == "admin"
def test_requires_admin_user_not_admin():
with pytest.raises(Exception):
requires_admin_user(
{"sub": "3e53486c-cf57-477e-ba2a-cb02dc828e1a", "role": "user"}
)

View File

@@ -1,64 +0,0 @@
from fastapi import FastAPI
from .jwt_utils import bearer_jwt_auth
def add_auth_responses_to_openapi(app: FastAPI) -> None:
"""
Patch a FastAPI instance's `openapi()` method to add 401 responses
to all authenticated endpoints.
This is needed when using HTTPBearer with auto_error=False to get proper
401 responses instead of 403, but FastAPI only automatically adds security
responses when auto_error=True.
"""
# Wrap current method to allow stacking OpenAPI schema modifiers like this
wrapped_openapi = app.openapi
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
openapi_schema = wrapped_openapi()
# Add 401 response to all endpoints that have security requirements
for path, methods in openapi_schema["paths"].items():
for method, details in methods.items():
security_schemas = [
schema
for auth_option in details.get("security", [])
for schema in auth_option.keys()
]
if bearer_jwt_auth.scheme_name not in security_schemas:
continue
if "responses" not in details:
details["responses"] = {}
details["responses"]["401"] = {
"$ref": "#/components/responses/HTTP401NotAuthenticatedError"
}
# Ensure #/components/responses exists
if "components" not in openapi_schema:
openapi_schema["components"] = {}
if "responses" not in openapi_schema["components"]:
openapi_schema["components"]["responses"] = {}
# Define 401 response
openapi_schema["components"]["responses"]["HTTP401NotAuthenticatedError"] = {
"description": "Authentication required",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {"detail": {"type": "string"}},
}
}
},
}
app.openapi_schema = openapi_schema
return app.openapi_schema
app.openapi = custom_openapi

Some files were not shown because too many files have changed in this diff Show More