Compare commits

..

7 Commits

Author SHA1 Message Date
Swifty
a86d750cf5 Merge branch 'fix/integrations-credential-type' into swiftyos/dev 2025-12-04 16:14:51 +01:00
Swifty
13bd648731 Merge branch 'swiftyos/vector-search' into swiftyos/dev 2025-12-04 16:14:47 +01:00
Swifty
3d7ee7cc29 Merge branch 'swiftyos/add-default-agents' into swiftyos/dev 2025-12-04 16:14:44 +01:00
Swifty
1ea52934cd add store agents for seeding test databases 2025-12-04 16:07:58 +01:00
Swifty
7b6db6e260 add vector search 2025-12-04 16:05:47 +01:00
Swifty
2c9563353e formatting 2025-12-04 09:35:53 +01:00
Swifty
fb2a70e2d8 pass credential type 2025-12-04 09:21:12 +01:00
997 changed files with 23077 additions and 81921 deletions

View File

@@ -1,37 +0,0 @@
{
"worktreeCopyPatterns": [
".env*",
".vscode/**",
".auth/**",
".claude/**",
"autogpt_platform/.env*",
"autogpt_platform/backend/.env*",
"autogpt_platform/frontend/.env*",
"autogpt_platform/frontend/.auth/**",
"autogpt_platform/db/docker/.env*"
],
"worktreeCopyIgnores": [
"**/node_modules/**",
"**/dist/**",
"**/.git/**",
"**/Thumbs.db",
"**/.DS_Store",
"**/.next/**",
"**/__pycache__/**",
"**/.ruff_cache/**",
"**/.pytest_cache/**",
"**/*.pyc",
"**/playwright-report/**",
"**/logs/**",
"**/site/**"
],
"worktreePathTemplate": "$BASE_PATH.worktree",
"postCreateCmd": [
"cd autogpt_platform/autogpt_libs && poetry install",
"cd autogpt_platform/backend && poetry install && poetry run prisma generate",
"cd autogpt_platform/frontend && pnpm install",
"cd docs && pip install -r requirements.txt"
],
"terminalCommand": "code .",
"deleteBranchWithWorktree": false
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,125 +0,0 @@
---
name: vercel-react-best-practices
description: React and Next.js performance optimization guidelines from Vercel Engineering. This skill should be used when writing, reviewing, or refactoring React/Next.js code to ensure optimal performance patterns. Triggers on tasks involving React components, Next.js pages, data fetching, bundle optimization, or performance improvements.
license: MIT
metadata:
author: vercel
version: "1.0.0"
---
# Vercel React Best Practices
Comprehensive performance optimization guide for React and Next.js applications, maintained by Vercel. Contains 45 rules across 8 categories, prioritized by impact to guide automated refactoring and code generation.
## When to Apply
Reference these guidelines when:
- Writing new React components or Next.js pages
- Implementing data fetching (client or server-side)
- Reviewing code for performance issues
- Refactoring existing React/Next.js code
- Optimizing bundle size or load times
## Rule Categories by Priority
| Priority | Category | Impact | Prefix |
|----------|----------|--------|--------|
| 1 | Eliminating Waterfalls | CRITICAL | `async-` |
| 2 | Bundle Size Optimization | CRITICAL | `bundle-` |
| 3 | Server-Side Performance | HIGH | `server-` |
| 4 | Client-Side Data Fetching | MEDIUM-HIGH | `client-` |
| 5 | Re-render Optimization | MEDIUM | `rerender-` |
| 6 | Rendering Performance | MEDIUM | `rendering-` |
| 7 | JavaScript Performance | LOW-MEDIUM | `js-` |
| 8 | Advanced Patterns | LOW | `advanced-` |
## Quick Reference
### 1. Eliminating Waterfalls (CRITICAL)
- `async-defer-await` - Move await into branches where actually used
- `async-parallel` - Use Promise.all() for independent operations
- `async-dependencies` - Use better-all for partial dependencies
- `async-api-routes` - Start promises early, await late in API routes
- `async-suspense-boundaries` - Use Suspense to stream content
### 2. Bundle Size Optimization (CRITICAL)
- `bundle-barrel-imports` - Import directly, avoid barrel files
- `bundle-dynamic-imports` - Use next/dynamic for heavy components
- `bundle-defer-third-party` - Load analytics/logging after hydration
- `bundle-conditional` - Load modules only when feature is activated
- `bundle-preload` - Preload on hover/focus for perceived speed
### 3. Server-Side Performance (HIGH)
- `server-cache-react` - Use React.cache() for per-request deduplication
- `server-cache-lru` - Use LRU cache for cross-request caching
- `server-serialization` - Minimize data passed to client components
- `server-parallel-fetching` - Restructure components to parallelize fetches
- `server-after-nonblocking` - Use after() for non-blocking operations
### 4. Client-Side Data Fetching (MEDIUM-HIGH)
- `client-swr-dedup` - Use SWR for automatic request deduplication
- `client-event-listeners` - Deduplicate global event listeners
### 5. Re-render Optimization (MEDIUM)
- `rerender-defer-reads` - Don't subscribe to state only used in callbacks
- `rerender-memo` - Extract expensive work into memoized components
- `rerender-dependencies` - Use primitive dependencies in effects
- `rerender-derived-state` - Subscribe to derived booleans, not raw values
- `rerender-functional-setstate` - Use functional setState for stable callbacks
- `rerender-lazy-state-init` - Pass function to useState for expensive values
- `rerender-transitions` - Use startTransition for non-urgent updates
### 6. Rendering Performance (MEDIUM)
- `rendering-animate-svg-wrapper` - Animate div wrapper, not SVG element
- `rendering-content-visibility` - Use content-visibility for long lists
- `rendering-hoist-jsx` - Extract static JSX outside components
- `rendering-svg-precision` - Reduce SVG coordinate precision
- `rendering-hydration-no-flicker` - Use inline script for client-only data
- `rendering-activity` - Use Activity component for show/hide
- `rendering-conditional-render` - Use ternary, not && for conditionals
### 7. JavaScript Performance (LOW-MEDIUM)
- `js-batch-dom-css` - Group CSS changes via classes or cssText
- `js-index-maps` - Build Map for repeated lookups
- `js-cache-property-access` - Cache object properties in loops
- `js-cache-function-results` - Cache function results in module-level Map
- `js-cache-storage` - Cache localStorage/sessionStorage reads
- `js-combine-iterations` - Combine multiple filter/map into one loop
- `js-length-check-first` - Check array length before expensive comparison
- `js-early-exit` - Return early from functions
- `js-hoist-regexp` - Hoist RegExp creation outside loops
- `js-min-max-loop` - Use loop for min/max instead of sort
- `js-set-map-lookups` - Use Set/Map for O(1) lookups
- `js-tosorted-immutable` - Use toSorted() for immutability
### 8. Advanced Patterns (LOW)
- `advanced-event-handler-refs` - Store event handlers in refs
- `advanced-use-latest` - useLatest for stable callback refs
## How to Use
Read individual rule files for detailed explanations and code examples:
```
rules/async-parallel.md
rules/bundle-barrel-imports.md
rules/_sections.md
```
Each rule file contains:
- Brief explanation of why it matters
- Incorrect code example with explanation
- Correct code example with explanation
- Additional context and references
## Full Compiled Document
For the complete guide with all rules expanded: `AGENTS.md`

View File

@@ -1,55 +0,0 @@
---
title: Store Event Handlers in Refs
impact: LOW
impactDescription: stable subscriptions
tags: advanced, hooks, refs, event-handlers, optimization
---
## Store Event Handlers in Refs
Store callbacks in refs when used in effects that shouldn't re-subscribe on callback changes.
**Incorrect (re-subscribes on every render):**
```tsx
function useWindowEvent(event: string, handler: () => void) {
useEffect(() => {
window.addEventListener(event, handler)
return () => window.removeEventListener(event, handler)
}, [event, handler])
}
```
**Correct (stable subscription):**
```tsx
function useWindowEvent(event: string, handler: () => void) {
const handlerRef = useRef(handler)
useEffect(() => {
handlerRef.current = handler
}, [handler])
useEffect(() => {
const listener = () => handlerRef.current()
window.addEventListener(event, listener)
return () => window.removeEventListener(event, listener)
}, [event])
}
```
**Alternative: use `useEffectEvent` if you're on latest React:**
```tsx
import { useEffectEvent } from 'react'
function useWindowEvent(event: string, handler: () => void) {
const onEvent = useEffectEvent(handler)
useEffect(() => {
window.addEventListener(event, onEvent)
return () => window.removeEventListener(event, onEvent)
}, [event])
}
```
`useEffectEvent` provides a cleaner API for the same pattern: it creates a stable function reference that always calls the latest version of the handler.

View File

@@ -1,49 +0,0 @@
---
title: useLatest for Stable Callback Refs
impact: LOW
impactDescription: prevents effect re-runs
tags: advanced, hooks, useLatest, refs, optimization
---
## useLatest for Stable Callback Refs
Access latest values in callbacks without adding them to dependency arrays. Prevents effect re-runs while avoiding stale closures.
**Implementation:**
```typescript
function useLatest<T>(value: T) {
const ref = useRef(value)
useEffect(() => {
ref.current = value
}, [value])
return ref
}
```
**Incorrect (effect re-runs on every callback change):**
```tsx
function SearchInput({ onSearch }: { onSearch: (q: string) => void }) {
const [query, setQuery] = useState('')
useEffect(() => {
const timeout = setTimeout(() => onSearch(query), 300)
return () => clearTimeout(timeout)
}, [query, onSearch])
}
```
**Correct (stable effect, fresh callback):**
```tsx
function SearchInput({ onSearch }: { onSearch: (q: string) => void }) {
const [query, setQuery] = useState('')
const onSearchRef = useLatest(onSearch)
useEffect(() => {
const timeout = setTimeout(() => onSearchRef.current(query), 300)
return () => clearTimeout(timeout)
}, [query])
}
```

View File

@@ -1,38 +0,0 @@
---
title: Prevent Waterfall Chains in API Routes
impact: CRITICAL
impactDescription: 2-10× improvement
tags: api-routes, server-actions, waterfalls, parallelization
---
## Prevent Waterfall Chains in API Routes
In API routes and Server Actions, start independent operations immediately, even if you don't await them yet.
**Incorrect (config waits for auth, data waits for both):**
```typescript
export async function GET(request: Request) {
const session = await auth()
const config = await fetchConfig()
const data = await fetchData(session.user.id)
return Response.json({ data, config })
}
```
**Correct (auth and config start immediately):**
```typescript
export async function GET(request: Request) {
const sessionPromise = auth()
const configPromise = fetchConfig()
const session = await sessionPromise
const [config, data] = await Promise.all([
configPromise,
fetchData(session.user.id)
])
return Response.json({ data, config })
}
```
For operations with more complex dependency chains, use `better-all` to automatically maximize parallelism (see Dependency-Based Parallelization).

View File

@@ -1,80 +0,0 @@
---
title: Defer Await Until Needed
impact: HIGH
impactDescription: avoids blocking unused code paths
tags: async, await, conditional, optimization
---
## Defer Await Until Needed
Move `await` operations into the branches where they're actually used to avoid blocking code paths that don't need them.
**Incorrect (blocks both branches):**
```typescript
async function handleRequest(userId: string, skipProcessing: boolean) {
const userData = await fetchUserData(userId)
if (skipProcessing) {
// Returns immediately but still waited for userData
return { skipped: true }
}
// Only this branch uses userData
return processUserData(userData)
}
```
**Correct (only blocks when needed):**
```typescript
async function handleRequest(userId: string, skipProcessing: boolean) {
if (skipProcessing) {
// Returns immediately without waiting
return { skipped: true }
}
// Fetch only when needed
const userData = await fetchUserData(userId)
return processUserData(userData)
}
```
**Another example (early return optimization):**
```typescript
// Incorrect: always fetches permissions
async function updateResource(resourceId: string, userId: string) {
const permissions = await fetchPermissions(userId)
const resource = await getResource(resourceId)
if (!resource) {
return { error: 'Not found' }
}
if (!permissions.canEdit) {
return { error: 'Forbidden' }
}
return await updateResourceData(resource, permissions)
}
// Correct: fetches only when needed
async function updateResource(resourceId: string, userId: string) {
const resource = await getResource(resourceId)
if (!resource) {
return { error: 'Not found' }
}
const permissions = await fetchPermissions(userId)
if (!permissions.canEdit) {
return { error: 'Forbidden' }
}
return await updateResourceData(resource, permissions)
}
```
This optimization is especially valuable when the skipped branch is frequently taken, or when the deferred operation is expensive.

View File

@@ -1,36 +0,0 @@
---
title: Dependency-Based Parallelization
impact: CRITICAL
impactDescription: 2-10× improvement
tags: async, parallelization, dependencies, better-all
---
## Dependency-Based Parallelization
For operations with partial dependencies, use `better-all` to maximize parallelism. It automatically starts each task at the earliest possible moment.
**Incorrect (profile waits for config unnecessarily):**
```typescript
const [user, config] = await Promise.all([
fetchUser(),
fetchConfig()
])
const profile = await fetchProfile(user.id)
```
**Correct (config and profile run in parallel):**
```typescript
import { all } from 'better-all'
const { user, config, profile } = await all({
async user() { return fetchUser() },
async config() { return fetchConfig() },
async profile() {
return fetchProfile((await this.$.user).id)
}
})
```
Reference: [https://github.com/shuding/better-all](https://github.com/shuding/better-all)

View File

@@ -1,28 +0,0 @@
---
title: Promise.all() for Independent Operations
impact: CRITICAL
impactDescription: 2-10× improvement
tags: async, parallelization, promises, waterfalls
---
## Promise.all() for Independent Operations
When async operations have no interdependencies, execute them concurrently using `Promise.all()`.
**Incorrect (sequential execution, 3 round trips):**
```typescript
const user = await fetchUser()
const posts = await fetchPosts()
const comments = await fetchComments()
```
**Correct (parallel execution, 1 round trip):**
```typescript
const [user, posts, comments] = await Promise.all([
fetchUser(),
fetchPosts(),
fetchComments()
])
```

View File

@@ -1,99 +0,0 @@
---
title: Strategic Suspense Boundaries
impact: HIGH
impactDescription: faster initial paint
tags: async, suspense, streaming, layout-shift
---
## Strategic Suspense Boundaries
Instead of awaiting data in async components before returning JSX, use Suspense boundaries to show the wrapper UI faster while data loads.
**Incorrect (wrapper blocked by data fetching):**
```tsx
async function Page() {
const data = await fetchData() // Blocks entire page
return (
<div>
<div>Sidebar</div>
<div>Header</div>
<div>
<DataDisplay data={data} />
</div>
<div>Footer</div>
</div>
)
}
```
The entire layout waits for data even though only the middle section needs it.
**Correct (wrapper shows immediately, data streams in):**
```tsx
function Page() {
return (
<div>
<div>Sidebar</div>
<div>Header</div>
<div>
<Suspense fallback={<Skeleton />}>
<DataDisplay />
</Suspense>
</div>
<div>Footer</div>
</div>
)
}
async function DataDisplay() {
const data = await fetchData() // Only blocks this component
return <div>{data.content}</div>
}
```
Sidebar, Header, and Footer render immediately. Only DataDisplay waits for data.
**Alternative (share promise across components):**
```tsx
function Page() {
// Start fetch immediately, but don't await
const dataPromise = fetchData()
return (
<div>
<div>Sidebar</div>
<div>Header</div>
<Suspense fallback={<Skeleton />}>
<DataDisplay dataPromise={dataPromise} />
<DataSummary dataPromise={dataPromise} />
</Suspense>
<div>Footer</div>
</div>
)
}
function DataDisplay({ dataPromise }: { dataPromise: Promise<Data> }) {
const data = use(dataPromise) // Unwraps the promise
return <div>{data.content}</div>
}
function DataSummary({ dataPromise }: { dataPromise: Promise<Data> }) {
const data = use(dataPromise) // Reuses the same promise
return <div>{data.summary}</div>
}
```
Both components share the same promise, so only one fetch occurs. Layout renders immediately while both components wait together.
**When NOT to use this pattern:**
- Critical data needed for layout decisions (affects positioning)
- SEO-critical content above the fold
- Small, fast queries where suspense overhead isn't worth it
- When you want to avoid layout shift (loading → content jump)
**Trade-off:** Faster initial paint vs potential layout shift. Choose based on your UX priorities.

View File

@@ -1,59 +0,0 @@
---
title: Avoid Barrel File Imports
impact: CRITICAL
impactDescription: 200-800ms import cost, slow builds
tags: bundle, imports, tree-shaking, barrel-files, performance
---
## Avoid Barrel File Imports
Import directly from source files instead of barrel files to avoid loading thousands of unused modules. **Barrel files** are entry points that re-export multiple modules (e.g., `index.js` that does `export * from './module'`).
Popular icon and component libraries can have **up to 10,000 re-exports** in their entry file. For many React packages, **it takes 200-800ms just to import them**, affecting both development speed and production cold starts.
**Why tree-shaking doesn't help:** When a library is marked as external (not bundled), the bundler can't optimize it. If you bundle it to enable tree-shaking, builds become substantially slower analyzing the entire module graph.
**Incorrect (imports entire library):**
```tsx
import { Check, X, Menu } from 'lucide-react'
// Loads 1,583 modules, takes ~2.8s extra in dev
// Runtime cost: 200-800ms on every cold start
import { Button, TextField } from '@mui/material'
// Loads 2,225 modules, takes ~4.2s extra in dev
```
**Correct (imports only what you need):**
```tsx
import Check from 'lucide-react/dist/esm/icons/check'
import X from 'lucide-react/dist/esm/icons/x'
import Menu from 'lucide-react/dist/esm/icons/menu'
// Loads only 3 modules (~2KB vs ~1MB)
import Button from '@mui/material/Button'
import TextField from '@mui/material/TextField'
// Loads only what you use
```
**Alternative (Next.js 13.5+):**
```js
// next.config.js - use optimizePackageImports
module.exports = {
experimental: {
optimizePackageImports: ['lucide-react', '@mui/material']
}
}
// Then you can keep the ergonomic barrel imports:
import { Check, X, Menu } from 'lucide-react'
// Automatically transformed to direct imports at build time
```
Direct imports provide 15-70% faster dev boot, 28% faster builds, 40% faster cold starts, and significantly faster HMR.
Libraries commonly affected: `lucide-react`, `@mui/material`, `@mui/icons-material`, `@tabler/icons-react`, `react-icons`, `@headlessui/react`, `@radix-ui/react-*`, `lodash`, `ramda`, `date-fns`, `rxjs`, `react-use`.
Reference: [How we optimized package imports in Next.js](https://vercel.com/blog/how-we-optimized-package-imports-in-next-js)

View File

@@ -1,31 +0,0 @@
---
title: Conditional Module Loading
impact: HIGH
impactDescription: loads large data only when needed
tags: bundle, conditional-loading, lazy-loading
---
## Conditional Module Loading
Load large data or modules only when a feature is activated.
**Example (lazy-load animation frames):**
```tsx
function AnimationPlayer({ enabled }: { enabled: boolean }) {
const [frames, setFrames] = useState<Frame[] | null>(null)
useEffect(() => {
if (enabled && !frames && typeof window !== 'undefined') {
import('./animation-frames.js')
.then(mod => setFrames(mod.frames))
.catch(() => setEnabled(false))
}
}, [enabled, frames])
if (!frames) return <Skeleton />
return <Canvas frames={frames} />
}
```
The `typeof window !== 'undefined'` check prevents bundling this module for SSR, optimizing server bundle size and build speed.

View File

@@ -1,49 +0,0 @@
---
title: Defer Non-Critical Third-Party Libraries
impact: MEDIUM
impactDescription: loads after hydration
tags: bundle, third-party, analytics, defer
---
## Defer Non-Critical Third-Party Libraries
Analytics, logging, and error tracking don't block user interaction. Load them after hydration.
**Incorrect (blocks initial bundle):**
```tsx
import { Analytics } from '@vercel/analytics/react'
export default function RootLayout({ children }) {
return (
<html>
<body>
{children}
<Analytics />
</body>
</html>
)
}
```
**Correct (loads after hydration):**
```tsx
import dynamic from 'next/dynamic'
const Analytics = dynamic(
() => import('@vercel/analytics/react').then(m => m.Analytics),
{ ssr: false }
)
export default function RootLayout({ children }) {
return (
<html>
<body>
{children}
<Analytics />
</body>
</html>
)
}
```

View File

@@ -1,35 +0,0 @@
---
title: Dynamic Imports for Heavy Components
impact: CRITICAL
impactDescription: directly affects TTI and LCP
tags: bundle, dynamic-import, code-splitting, next-dynamic
---
## Dynamic Imports for Heavy Components
Use `next/dynamic` to lazy-load large components not needed on initial render.
**Incorrect (Monaco bundles with main chunk ~300KB):**
```tsx
import { MonacoEditor } from './monaco-editor'
function CodePanel({ code }: { code: string }) {
return <MonacoEditor value={code} />
}
```
**Correct (Monaco loads on demand):**
```tsx
import dynamic from 'next/dynamic'
const MonacoEditor = dynamic(
() => import('./monaco-editor').then(m => m.MonacoEditor),
{ ssr: false }
)
function CodePanel({ code }: { code: string }) {
return <MonacoEditor value={code} />
}
```

View File

@@ -1,50 +0,0 @@
---
title: Preload Based on User Intent
impact: MEDIUM
impactDescription: reduces perceived latency
tags: bundle, preload, user-intent, hover
---
## Preload Based on User Intent
Preload heavy bundles before they're needed to reduce perceived latency.
**Example (preload on hover/focus):**
```tsx
function EditorButton({ onClick }: { onClick: () => void }) {
const preload = () => {
if (typeof window !== 'undefined') {
void import('./monaco-editor')
}
}
return (
<button
onMouseEnter={preload}
onFocus={preload}
onClick={onClick}
>
Open Editor
</button>
)
}
```
**Example (preload when feature flag is enabled):**
```tsx
function FlagsProvider({ children, flags }: Props) {
useEffect(() => {
if (flags.editorEnabled && typeof window !== 'undefined') {
void import('./monaco-editor').then(mod => mod.init())
}
}, [flags.editorEnabled])
return <FlagsContext.Provider value={flags}>
{children}
</FlagsContext.Provider>
}
```
The `typeof window !== 'undefined'` check prevents bundling preloaded modules for SSR, optimizing server bundle size and build speed.

View File

@@ -1,74 +0,0 @@
---
title: Deduplicate Global Event Listeners
impact: LOW
impactDescription: single listener for N components
tags: client, swr, event-listeners, subscription
---
## Deduplicate Global Event Listeners
Use `useSWRSubscription()` to share global event listeners across component instances.
**Incorrect (N instances = N listeners):**
```tsx
function useKeyboardShortcut(key: string, callback: () => void) {
useEffect(() => {
const handler = (e: KeyboardEvent) => {
if (e.metaKey && e.key === key) {
callback()
}
}
window.addEventListener('keydown', handler)
return () => window.removeEventListener('keydown', handler)
}, [key, callback])
}
```
When using the `useKeyboardShortcut` hook multiple times, each instance will register a new listener.
**Correct (N instances = 1 listener):**
```tsx
import useSWRSubscription from 'swr/subscription'
// Module-level Map to track callbacks per key
const keyCallbacks = new Map<string, Set<() => void>>()
function useKeyboardShortcut(key: string, callback: () => void) {
// Register this callback in the Map
useEffect(() => {
if (!keyCallbacks.has(key)) {
keyCallbacks.set(key, new Set())
}
keyCallbacks.get(key)!.add(callback)
return () => {
const set = keyCallbacks.get(key)
if (set) {
set.delete(callback)
if (set.size === 0) {
keyCallbacks.delete(key)
}
}
}
}, [key, callback])
useSWRSubscription('global-keydown', () => {
const handler = (e: KeyboardEvent) => {
if (e.metaKey && keyCallbacks.has(e.key)) {
keyCallbacks.get(e.key)!.forEach(cb => cb())
}
}
window.addEventListener('keydown', handler)
return () => window.removeEventListener('keydown', handler)
})
}
function Profile() {
// Multiple shortcuts will share the same listener
useKeyboardShortcut('p', () => { /* ... */ })
useKeyboardShortcut('k', () => { /* ... */ })
// ...
}
```

View File

@@ -1,56 +0,0 @@
---
title: Use SWR for Automatic Deduplication
impact: MEDIUM-HIGH
impactDescription: automatic deduplication
tags: client, swr, deduplication, data-fetching
---
## Use SWR for Automatic Deduplication
SWR enables request deduplication, caching, and revalidation across component instances.
**Incorrect (no deduplication, each instance fetches):**
```tsx
function UserList() {
const [users, setUsers] = useState([])
useEffect(() => {
fetch('/api/users')
.then(r => r.json())
.then(setUsers)
}, [])
}
```
**Correct (multiple instances share one request):**
```tsx
import useSWR from 'swr'
function UserList() {
const { data: users } = useSWR('/api/users', fetcher)
}
```
**For immutable data:**
```tsx
import { useImmutableSWR } from '@/lib/swr'
function StaticContent() {
const { data } = useImmutableSWR('/api/config', fetcher)
}
```
**For mutations:**
```tsx
import { useSWRMutation } from 'swr/mutation'
function UpdateButton() {
const { trigger } = useSWRMutation('/api/user', updateUser)
return <button onClick={() => trigger()}>Update</button>
}
```
Reference: [https://swr.vercel.app](https://swr.vercel.app)

View File

@@ -1,82 +0,0 @@
---
title: Batch DOM CSS Changes
impact: MEDIUM
impactDescription: reduces reflows/repaints
tags: javascript, dom, css, performance, reflow
---
## Batch DOM CSS Changes
Avoid changing styles one property at a time. Group multiple CSS changes together via classes or `cssText` to minimize browser reflows.
**Incorrect (multiple reflows):**
```typescript
function updateElementStyles(element: HTMLElement) {
// Each line triggers a reflow
element.style.width = '100px'
element.style.height = '200px'
element.style.backgroundColor = 'blue'
element.style.border = '1px solid black'
}
```
**Correct (add class - single reflow):**
```typescript
// CSS file
.highlighted-box {
width: 100px;
height: 200px;
background-color: blue;
border: 1px solid black;
}
// JavaScript
function updateElementStyles(element: HTMLElement) {
element.classList.add('highlighted-box')
}
```
**Correct (change cssText - single reflow):**
```typescript
function updateElementStyles(element: HTMLElement) {
element.style.cssText = `
width: 100px;
height: 200px;
background-color: blue;
border: 1px solid black;
`
}
```
**React example:**
```tsx
// Incorrect: changing styles one by one
function Box({ isHighlighted }: { isHighlighted: boolean }) {
const ref = useRef<HTMLDivElement>(null)
useEffect(() => {
if (ref.current && isHighlighted) {
ref.current.style.width = '100px'
ref.current.style.height = '200px'
ref.current.style.backgroundColor = 'blue'
}
}, [isHighlighted])
return <div ref={ref}>Content</div>
}
// Correct: toggle class
function Box({ isHighlighted }: { isHighlighted: boolean }) {
return (
<div className={isHighlighted ? 'highlighted-box' : ''}>
Content
</div>
)
}
```
Prefer CSS classes over inline styles when possible. Classes are cached by the browser and provide better separation of concerns.

View File

@@ -1,80 +0,0 @@
---
title: Cache Repeated Function Calls
impact: MEDIUM
impactDescription: avoid redundant computation
tags: javascript, cache, memoization, performance
---
## Cache Repeated Function Calls
Use a module-level Map to cache function results when the same function is called repeatedly with the same inputs during render.
**Incorrect (redundant computation):**
```typescript
function ProjectList({ projects }: { projects: Project[] }) {
return (
<div>
{projects.map(project => {
// slugify() called 100+ times for same project names
const slug = slugify(project.name)
return <ProjectCard key={project.id} slug={slug} />
})}
</div>
)
}
```
**Correct (cached results):**
```typescript
// Module-level cache
const slugifyCache = new Map<string, string>()
function cachedSlugify(text: string): string {
if (slugifyCache.has(text)) {
return slugifyCache.get(text)!
}
const result = slugify(text)
slugifyCache.set(text, result)
return result
}
function ProjectList({ projects }: { projects: Project[] }) {
return (
<div>
{projects.map(project => {
// Computed only once per unique project name
const slug = cachedSlugify(project.name)
return <ProjectCard key={project.id} slug={slug} />
})}
</div>
)
}
```
**Simpler pattern for single-value functions:**
```typescript
let isLoggedInCache: boolean | null = null
function isLoggedIn(): boolean {
if (isLoggedInCache !== null) {
return isLoggedInCache
}
isLoggedInCache = document.cookie.includes('auth=')
return isLoggedInCache
}
// Clear cache when auth changes
function onAuthChange() {
isLoggedInCache = null
}
```
Use a Map (not a hook) so it works everywhere: utilities, event handlers, not just React components.
Reference: [How we made the Vercel Dashboard twice as fast](https://vercel.com/blog/how-we-made-the-vercel-dashboard-twice-as-fast)

View File

@@ -1,28 +0,0 @@
---
title: Cache Property Access in Loops
impact: LOW-MEDIUM
impactDescription: reduces lookups
tags: javascript, loops, optimization, caching
---
## Cache Property Access in Loops
Cache object property lookups in hot paths.
**Incorrect (3 lookups × N iterations):**
```typescript
for (let i = 0; i < arr.length; i++) {
process(obj.config.settings.value)
}
```
**Correct (1 lookup total):**
```typescript
const value = obj.config.settings.value
const len = arr.length
for (let i = 0; i < len; i++) {
process(value)
}
```

View File

@@ -1,70 +0,0 @@
---
title: Cache Storage API Calls
impact: LOW-MEDIUM
impactDescription: reduces expensive I/O
tags: javascript, localStorage, storage, caching, performance
---
## Cache Storage API Calls
`localStorage`, `sessionStorage`, and `document.cookie` are synchronous and expensive. Cache reads in memory.
**Incorrect (reads storage on every call):**
```typescript
function getTheme() {
return localStorage.getItem('theme') ?? 'light'
}
// Called 10 times = 10 storage reads
```
**Correct (Map cache):**
```typescript
const storageCache = new Map<string, string | null>()
function getLocalStorage(key: string) {
if (!storageCache.has(key)) {
storageCache.set(key, localStorage.getItem(key))
}
return storageCache.get(key)
}
function setLocalStorage(key: string, value: string) {
localStorage.setItem(key, value)
storageCache.set(key, value) // keep cache in sync
}
```
Use a Map (not a hook) so it works everywhere: utilities, event handlers, not just React components.
**Cookie caching:**
```typescript
let cookieCache: Record<string, string> | null = null
function getCookie(name: string) {
if (!cookieCache) {
cookieCache = Object.fromEntries(
document.cookie.split('; ').map(c => c.split('='))
)
}
return cookieCache[name]
}
```
**Important (invalidate on external changes):**
If storage can change externally (another tab, server-set cookies), invalidate cache:
```typescript
window.addEventListener('storage', (e) => {
if (e.key) storageCache.delete(e.key)
})
document.addEventListener('visibilitychange', () => {
if (document.visibilityState === 'visible') {
storageCache.clear()
}
})
```

View File

@@ -1,32 +0,0 @@
---
title: Combine Multiple Array Iterations
impact: LOW-MEDIUM
impactDescription: reduces iterations
tags: javascript, arrays, loops, performance
---
## Combine Multiple Array Iterations
Multiple `.filter()` or `.map()` calls iterate the array multiple times. Combine into one loop.
**Incorrect (3 iterations):**
```typescript
const admins = users.filter(u => u.isAdmin)
const testers = users.filter(u => u.isTester)
const inactive = users.filter(u => !u.isActive)
```
**Correct (1 iteration):**
```typescript
const admins: User[] = []
const testers: User[] = []
const inactive: User[] = []
for (const user of users) {
if (user.isAdmin) admins.push(user)
if (user.isTester) testers.push(user)
if (!user.isActive) inactive.push(user)
}
```

View File

@@ -1,50 +0,0 @@
---
title: Early Return from Functions
impact: LOW-MEDIUM
impactDescription: avoids unnecessary computation
tags: javascript, functions, optimization, early-return
---
## Early Return from Functions
Return early when result is determined to skip unnecessary processing.
**Incorrect (processes all items even after finding answer):**
```typescript
function validateUsers(users: User[]) {
let hasError = false
let errorMessage = ''
for (const user of users) {
if (!user.email) {
hasError = true
errorMessage = 'Email required'
}
if (!user.name) {
hasError = true
errorMessage = 'Name required'
}
// Continues checking all users even after error found
}
return hasError ? { valid: false, error: errorMessage } : { valid: true }
}
```
**Correct (returns immediately on first error):**
```typescript
function validateUsers(users: User[]) {
for (const user of users) {
if (!user.email) {
return { valid: false, error: 'Email required' }
}
if (!user.name) {
return { valid: false, error: 'Name required' }
}
}
return { valid: true }
}
```

View File

@@ -1,45 +0,0 @@
---
title: Hoist RegExp Creation
impact: LOW-MEDIUM
impactDescription: avoids recreation
tags: javascript, regexp, optimization, memoization
---
## Hoist RegExp Creation
Don't create RegExp inside render. Hoist to module scope or memoize with `useMemo()`.
**Incorrect (new RegExp every render):**
```tsx
function Highlighter({ text, query }: Props) {
const regex = new RegExp(`(${query})`, 'gi')
const parts = text.split(regex)
return <>{parts.map((part, i) => ...)}</>
}
```
**Correct (memoize or hoist):**
```tsx
const EMAIL_REGEX = /^[^\s@]+@[^\s@]+\.[^\s@]+$/
function Highlighter({ text, query }: Props) {
const regex = useMemo(
() => new RegExp(`(${escapeRegex(query)})`, 'gi'),
[query]
)
const parts = text.split(regex)
return <>{parts.map((part, i) => ...)}</>
}
```
**Warning (global regex has mutable state):**
Global regex (`/g`) has mutable `lastIndex` state:
```typescript
const regex = /foo/g
regex.test('foo') // true, lastIndex = 3
regex.test('foo') // false, lastIndex = 0
```

View File

@@ -1,37 +0,0 @@
---
title: Build Index Maps for Repeated Lookups
impact: LOW-MEDIUM
impactDescription: 1M ops to 2K ops
tags: javascript, map, indexing, optimization, performance
---
## Build Index Maps for Repeated Lookups
Multiple `.find()` calls by the same key should use a Map.
**Incorrect (O(n) per lookup):**
```typescript
function processOrders(orders: Order[], users: User[]) {
return orders.map(order => ({
...order,
user: users.find(u => u.id === order.userId)
}))
}
```
**Correct (O(1) per lookup):**
```typescript
function processOrders(orders: Order[], users: User[]) {
const userById = new Map(users.map(u => [u.id, u]))
return orders.map(order => ({
...order,
user: userById.get(order.userId)
}))
}
```
Build map once (O(n)), then all lookups are O(1).
For 1000 orders × 1000 users: 1M ops → 2K ops.

View File

@@ -1,49 +0,0 @@
---
title: Early Length Check for Array Comparisons
impact: MEDIUM-HIGH
impactDescription: avoids expensive operations when lengths differ
tags: javascript, arrays, performance, optimization, comparison
---
## Early Length Check for Array Comparisons
When comparing arrays with expensive operations (sorting, deep equality, serialization), check lengths first. If lengths differ, the arrays cannot be equal.
In real-world applications, this optimization is especially valuable when the comparison runs in hot paths (event handlers, render loops).
**Incorrect (always runs expensive comparison):**
```typescript
function hasChanges(current: string[], original: string[]) {
// Always sorts and joins, even when lengths differ
return current.sort().join() !== original.sort().join()
}
```
Two O(n log n) sorts run even when `current.length` is 5 and `original.length` is 100. There is also overhead of joining the arrays and comparing the strings.
**Correct (O(1) length check first):**
```typescript
function hasChanges(current: string[], original: string[]) {
// Early return if lengths differ
if (current.length !== original.length) {
return true
}
// Only sort/join when lengths match
const currentSorted = current.toSorted()
const originalSorted = original.toSorted()
for (let i = 0; i < currentSorted.length; i++) {
if (currentSorted[i] !== originalSorted[i]) {
return true
}
}
return false
}
```
This new approach is more efficient because:
- It avoids the overhead of sorting and joining the arrays when lengths differ
- It avoids consuming memory for the joined strings (especially important for large arrays)
- It avoids mutating the original arrays
- It returns early when a difference is found

View File

@@ -1,82 +0,0 @@
---
title: Use Loop for Min/Max Instead of Sort
impact: LOW
impactDescription: O(n) instead of O(n log n)
tags: javascript, arrays, performance, sorting, algorithms
---
## Use Loop for Min/Max Instead of Sort
Finding the smallest or largest element only requires a single pass through the array. Sorting is wasteful and slower.
**Incorrect (O(n log n) - sort to find latest):**
```typescript
interface Project {
id: string
name: string
updatedAt: number
}
function getLatestProject(projects: Project[]) {
const sorted = [...projects].sort((a, b) => b.updatedAt - a.updatedAt)
return sorted[0]
}
```
Sorts the entire array just to find the maximum value.
**Incorrect (O(n log n) - sort for oldest and newest):**
```typescript
function getOldestAndNewest(projects: Project[]) {
const sorted = [...projects].sort((a, b) => a.updatedAt - b.updatedAt)
return { oldest: sorted[0], newest: sorted[sorted.length - 1] }
}
```
Still sorts unnecessarily when only min/max are needed.
**Correct (O(n) - single loop):**
```typescript
function getLatestProject(projects: Project[]) {
if (projects.length === 0) return null
let latest = projects[0]
for (let i = 1; i < projects.length; i++) {
if (projects[i].updatedAt > latest.updatedAt) {
latest = projects[i]
}
}
return latest
}
function getOldestAndNewest(projects: Project[]) {
if (projects.length === 0) return { oldest: null, newest: null }
let oldest = projects[0]
let newest = projects[0]
for (let i = 1; i < projects.length; i++) {
if (projects[i].updatedAt < oldest.updatedAt) oldest = projects[i]
if (projects[i].updatedAt > newest.updatedAt) newest = projects[i]
}
return { oldest, newest }
}
```
Single pass through the array, no copying, no sorting.
**Alternative (Math.min/Math.max for small arrays):**
```typescript
const numbers = [5, 2, 8, 1, 9]
const min = Math.min(...numbers)
const max = Math.max(...numbers)
```
This works for small arrays but can be slower for very large arrays due to spread operator limitations. Use the loop approach for reliability.

View File

@@ -1,24 +0,0 @@
---
title: Use Set/Map for O(1) Lookups
impact: LOW-MEDIUM
impactDescription: O(n) to O(1)
tags: javascript, set, map, data-structures, performance
---
## Use Set/Map for O(1) Lookups
Convert arrays to Set/Map for repeated membership checks.
**Incorrect (O(n) per check):**
```typescript
const allowedIds = ['a', 'b', 'c', ...]
items.filter(item => allowedIds.includes(item.id))
```
**Correct (O(1) per check):**
```typescript
const allowedIds = new Set(['a', 'b', 'c', ...])
items.filter(item => allowedIds.has(item.id))
```

View File

@@ -1,57 +0,0 @@
---
title: Use toSorted() Instead of sort() for Immutability
impact: MEDIUM-HIGH
impactDescription: prevents mutation bugs in React state
tags: javascript, arrays, immutability, react, state, mutation
---
## Use toSorted() Instead of sort() for Immutability
`.sort()` mutates the array in place, which can cause bugs with React state and props. Use `.toSorted()` to create a new sorted array without mutation.
**Incorrect (mutates original array):**
```typescript
function UserList({ users }: { users: User[] }) {
// Mutates the users prop array!
const sorted = useMemo(
() => users.sort((a, b) => a.name.localeCompare(b.name)),
[users]
)
return <div>{sorted.map(renderUser)}</div>
}
```
**Correct (creates new array):**
```typescript
function UserList({ users }: { users: User[] }) {
// Creates new sorted array, original unchanged
const sorted = useMemo(
() => users.toSorted((a, b) => a.name.localeCompare(b.name)),
[users]
)
return <div>{sorted.map(renderUser)}</div>
}
```
**Why this matters in React:**
1. Props/state mutations break React's immutability model - React expects props and state to be treated as read-only
2. Causes stale closure bugs - Mutating arrays inside closures (callbacks, effects) can lead to unexpected behavior
**Browser support (fallback for older browsers):**
`.toSorted()` is available in all modern browsers (Chrome 110+, Safari 16+, Firefox 115+, Node.js 20+). For older environments, use spread operator:
```typescript
// Fallback for older browsers
const sorted = [...items].sort((a, b) => a.value - b.value)
```
**Other immutable array methods:**
- `.toSorted()` - immutable sort
- `.toReversed()` - immutable reverse
- `.toSpliced()` - immutable splice
- `.with()` - immutable element replacement

View File

@@ -1,26 +0,0 @@
---
title: Use Activity Component for Show/Hide
impact: MEDIUM
impactDescription: preserves state/DOM
tags: rendering, activity, visibility, state-preservation
---
## Use Activity Component for Show/Hide
Use React's `<Activity>` to preserve state/DOM for expensive components that frequently toggle visibility.
**Usage:**
```tsx
import { Activity } from 'react'
function Dropdown({ isOpen }: Props) {
return (
<Activity mode={isOpen ? 'visible' : 'hidden'}>
<ExpensiveMenu />
</Activity>
)
}
```
Avoids expensive re-renders and state loss.

View File

@@ -1,47 +0,0 @@
---
title: Animate SVG Wrapper Instead of SVG Element
impact: LOW
impactDescription: enables hardware acceleration
tags: rendering, svg, css, animation, performance
---
## Animate SVG Wrapper Instead of SVG Element
Many browsers don't have hardware acceleration for CSS3 animations on SVG elements. Wrap SVG in a `<div>` and animate the wrapper instead.
**Incorrect (animating SVG directly - no hardware acceleration):**
```tsx
function LoadingSpinner() {
return (
<svg
className="animate-spin"
width="24"
height="24"
viewBox="0 0 24 24"
>
<circle cx="12" cy="12" r="10" stroke="currentColor" />
</svg>
)
}
```
**Correct (animating wrapper div - hardware accelerated):**
```tsx
function LoadingSpinner() {
return (
<div className="animate-spin">
<svg
width="24"
height="24"
viewBox="0 0 24 24"
>
<circle cx="12" cy="12" r="10" stroke="currentColor" />
</svg>
</div>
)
}
```
This applies to all CSS transforms and transitions (`transform`, `opacity`, `translate`, `scale`, `rotate`). The wrapper div allows browsers to use GPU acceleration for smoother animations.

View File

@@ -1,40 +0,0 @@
---
title: Use Explicit Conditional Rendering
impact: LOW
impactDescription: prevents rendering 0 or NaN
tags: rendering, conditional, jsx, falsy-values
---
## Use Explicit Conditional Rendering
Use explicit ternary operators (`? :`) instead of `&&` for conditional rendering when the condition can be `0`, `NaN`, or other falsy values that render.
**Incorrect (renders "0" when count is 0):**
```tsx
function Badge({ count }: { count: number }) {
return (
<div>
{count && <span className="badge">{count}</span>}
</div>
)
}
// When count = 0, renders: <div>0</div>
// When count = 5, renders: <div><span class="badge">5</span></div>
```
**Correct (renders nothing when count is 0):**
```tsx
function Badge({ count }: { count: number }) {
return (
<div>
{count > 0 ? <span className="badge">{count}</span> : null}
</div>
)
}
// When count = 0, renders: <div></div>
// When count = 5, renders: <div><span class="badge">5</span></div>
```

View File

@@ -1,38 +0,0 @@
---
title: CSS content-visibility for Long Lists
impact: HIGH
impactDescription: faster initial render
tags: rendering, css, content-visibility, long-lists
---
## CSS content-visibility for Long Lists
Apply `content-visibility: auto` to defer off-screen rendering.
**CSS:**
```css
.message-item {
content-visibility: auto;
contain-intrinsic-size: 0 80px;
}
```
**Example:**
```tsx
function MessageList({ messages }: { messages: Message[] }) {
return (
<div className="overflow-y-auto h-screen">
{messages.map(msg => (
<div key={msg.id} className="message-item">
<Avatar user={msg.author} />
<div>{msg.content}</div>
</div>
))}
</div>
)
}
```
For 1000 messages, browser skips layout/paint for ~990 off-screen items (10× faster initial render).

View File

@@ -1,46 +0,0 @@
---
title: Hoist Static JSX Elements
impact: LOW
impactDescription: avoids re-creation
tags: rendering, jsx, static, optimization
---
## Hoist Static JSX Elements
Extract static JSX outside components to avoid re-creation.
**Incorrect (recreates element every render):**
```tsx
function LoadingSkeleton() {
return <div className="animate-pulse h-20 bg-gray-200" />
}
function Container() {
return (
<div>
{loading && <LoadingSkeleton />}
</div>
)
}
```
**Correct (reuses same element):**
```tsx
const loadingSkeleton = (
<div className="animate-pulse h-20 bg-gray-200" />
)
function Container() {
return (
<div>
{loading && loadingSkeleton}
</div>
)
}
```
This is especially helpful for large and static SVG nodes, which can be expensive to recreate on every render.
**Note:** If your project has [React Compiler](https://react.dev/learn/react-compiler) enabled, the compiler automatically hoists static JSX elements and optimizes component re-renders, making manual hoisting unnecessary.

View File

@@ -1,82 +0,0 @@
---
title: Prevent Hydration Mismatch Without Flickering
impact: MEDIUM
impactDescription: avoids visual flicker and hydration errors
tags: rendering, ssr, hydration, localStorage, flicker
---
## Prevent Hydration Mismatch Without Flickering
When rendering content that depends on client-side storage (localStorage, cookies), avoid both SSR breakage and post-hydration flickering by injecting a synchronous script that updates the DOM before React hydrates.
**Incorrect (breaks SSR):**
```tsx
function ThemeWrapper({ children }: { children: ReactNode }) {
// localStorage is not available on server - throws error
const theme = localStorage.getItem('theme') || 'light'
return (
<div className={theme}>
{children}
</div>
)
}
```
Server-side rendering will fail because `localStorage` is undefined.
**Incorrect (visual flickering):**
```tsx
function ThemeWrapper({ children }: { children: ReactNode }) {
const [theme, setTheme] = useState('light')
useEffect(() => {
// Runs after hydration - causes visible flash
const stored = localStorage.getItem('theme')
if (stored) {
setTheme(stored)
}
}, [])
return (
<div className={theme}>
{children}
</div>
)
}
```
Component first renders with default value (`light`), then updates after hydration, causing a visible flash of incorrect content.
**Correct (no flicker, no hydration mismatch):**
```tsx
function ThemeWrapper({ children }: { children: ReactNode }) {
return (
<>
<div id="theme-wrapper">
{children}
</div>
<script
dangerouslySetInnerHTML={{
__html: `
(function() {
try {
var theme = localStorage.getItem('theme') || 'light';
var el = document.getElementById('theme-wrapper');
if (el) el.className = theme;
} catch (e) {}
})();
`,
}}
/>
</>
)
}
```
The inline script executes synchronously before showing the element, ensuring the DOM already has the correct value. No flickering, no hydration mismatch.
This pattern is especially useful for theme toggles, user preferences, authentication states, and any client-only data that should render immediately without flashing default values.

View File

@@ -1,28 +0,0 @@
---
title: Optimize SVG Precision
impact: LOW
impactDescription: reduces file size
tags: rendering, svg, optimization, svgo
---
## Optimize SVG Precision
Reduce SVG coordinate precision to decrease file size. The optimal precision depends on the viewBox size, but in general reducing precision should be considered.
**Incorrect (excessive precision):**
```svg
<path d="M 10.293847 20.847362 L 30.938472 40.192837" />
```
**Correct (1 decimal place):**
```svg
<path d="M 10.3 20.8 L 30.9 40.2" />
```
**Automate with SVGO:**
```bash
npx svgo --precision=1 --multipass icon.svg
```

View File

@@ -1,39 +0,0 @@
---
title: Defer State Reads to Usage Point
impact: MEDIUM
impactDescription: avoids unnecessary subscriptions
tags: rerender, searchParams, localStorage, optimization
---
## Defer State Reads to Usage Point
Don't subscribe to dynamic state (searchParams, localStorage) if you only read it inside callbacks.
**Incorrect (subscribes to all searchParams changes):**
```tsx
function ShareButton({ chatId }: { chatId: string }) {
const searchParams = useSearchParams()
const handleShare = () => {
const ref = searchParams.get('ref')
shareChat(chatId, { ref })
}
return <button onClick={handleShare}>Share</button>
}
```
**Correct (reads on demand, no subscription):**
```tsx
function ShareButton({ chatId }: { chatId: string }) {
const handleShare = () => {
const params = new URLSearchParams(window.location.search)
const ref = params.get('ref')
shareChat(chatId, { ref })
}
return <button onClick={handleShare}>Share</button>
}
```

View File

@@ -1,45 +0,0 @@
---
title: Narrow Effect Dependencies
impact: LOW
impactDescription: minimizes effect re-runs
tags: rerender, useEffect, dependencies, optimization
---
## Narrow Effect Dependencies
Specify primitive dependencies instead of objects to minimize effect re-runs.
**Incorrect (re-runs on any user field change):**
```tsx
useEffect(() => {
console.log(user.id)
}, [user])
```
**Correct (re-runs only when id changes):**
```tsx
useEffect(() => {
console.log(user.id)
}, [user.id])
```
**For derived state, compute outside effect:**
```tsx
// Incorrect: runs on width=767, 766, 765...
useEffect(() => {
if (width < 768) {
enableMobileMode()
}
}, [width])
// Correct: runs only on boolean transition
const isMobile = width < 768
useEffect(() => {
if (isMobile) {
enableMobileMode()
}
}, [isMobile])
```

View File

@@ -1,29 +0,0 @@
---
title: Subscribe to Derived State
impact: MEDIUM
impactDescription: reduces re-render frequency
tags: rerender, derived-state, media-query, optimization
---
## Subscribe to Derived State
Subscribe to derived boolean state instead of continuous values to reduce re-render frequency.
**Incorrect (re-renders on every pixel change):**
```tsx
function Sidebar() {
const width = useWindowWidth() // updates continuously
const isMobile = width < 768
return <nav className={isMobile ? 'mobile' : 'desktop'}>
}
```
**Correct (re-renders only when boolean changes):**
```tsx
function Sidebar() {
const isMobile = useMediaQuery('(max-width: 767px)')
return <nav className={isMobile ? 'mobile' : 'desktop'}>
}
```

View File

@@ -1,74 +0,0 @@
---
title: Use Functional setState Updates
impact: MEDIUM
impactDescription: prevents stale closures and unnecessary callback recreations
tags: react, hooks, useState, useCallback, callbacks, closures
---
## Use Functional setState Updates
When updating state based on the current state value, use the functional update form of setState instead of directly referencing the state variable. This prevents stale closures, eliminates unnecessary dependencies, and creates stable callback references.
**Incorrect (requires state as dependency):**
```tsx
function TodoList() {
const [items, setItems] = useState(initialItems)
// Callback must depend on items, recreated on every items change
const addItems = useCallback((newItems: Item[]) => {
setItems([...items, ...newItems])
}, [items]) // ❌ items dependency causes recreations
// Risk of stale closure if dependency is forgotten
const removeItem = useCallback((id: string) => {
setItems(items.filter(item => item.id !== id))
}, []) // ❌ Missing items dependency - will use stale items!
return <ItemsEditor items={items} onAdd={addItems} onRemove={removeItem} />
}
```
The first callback is recreated every time `items` changes, which can cause child components to re-render unnecessarily. The second callback has a stale closure bug—it will always reference the initial `items` value.
**Correct (stable callbacks, no stale closures):**
```tsx
function TodoList() {
const [items, setItems] = useState(initialItems)
// Stable callback, never recreated
const addItems = useCallback((newItems: Item[]) => {
setItems(curr => [...curr, ...newItems])
}, []) // ✅ No dependencies needed
// Always uses latest state, no stale closure risk
const removeItem = useCallback((id: string) => {
setItems(curr => curr.filter(item => item.id !== id))
}, []) // ✅ Safe and stable
return <ItemsEditor items={items} onAdd={addItems} onRemove={removeItem} />
}
```
**Benefits:**
1. **Stable callback references** - Callbacks don't need to be recreated when state changes
2. **No stale closures** - Always operates on the latest state value
3. **Fewer dependencies** - Simplifies dependency arrays and reduces memory leaks
4. **Prevents bugs** - Eliminates the most common source of React closure bugs
**When to use functional updates:**
- Any setState that depends on the current state value
- Inside useCallback/useMemo when state is needed
- Event handlers that reference state
- Async operations that update state
**When direct updates are fine:**
- Setting state to a static value: `setCount(0)`
- Setting state from props/arguments only: `setName(newName)`
- State doesn't depend on previous value
**Note:** If your project has [React Compiler](https://react.dev/learn/react-compiler) enabled, the compiler can automatically optimize some cases, but functional updates are still recommended for correctness and to prevent stale closure bugs.

View File

@@ -1,58 +0,0 @@
---
title: Use Lazy State Initialization
impact: MEDIUM
impactDescription: wasted computation on every render
tags: react, hooks, useState, performance, initialization
---
## Use Lazy State Initialization
Pass a function to `useState` for expensive initial values. Without the function form, the initializer runs on every render even though the value is only used once.
**Incorrect (runs on every render):**
```tsx
function FilteredList({ items }: { items: Item[] }) {
// buildSearchIndex() runs on EVERY render, even after initialization
const [searchIndex, setSearchIndex] = useState(buildSearchIndex(items))
const [query, setQuery] = useState('')
// When query changes, buildSearchIndex runs again unnecessarily
return <SearchResults index={searchIndex} query={query} />
}
function UserProfile() {
// JSON.parse runs on every render
const [settings, setSettings] = useState(
JSON.parse(localStorage.getItem('settings') || '{}')
)
return <SettingsForm settings={settings} onChange={setSettings} />
}
```
**Correct (runs only once):**
```tsx
function FilteredList({ items }: { items: Item[] }) {
// buildSearchIndex() runs ONLY on initial render
const [searchIndex, setSearchIndex] = useState(() => buildSearchIndex(items))
const [query, setQuery] = useState('')
return <SearchResults index={searchIndex} query={query} />
}
function UserProfile() {
// JSON.parse runs only on initial render
const [settings, setSettings] = useState(() => {
const stored = localStorage.getItem('settings')
return stored ? JSON.parse(stored) : {}
})
return <SettingsForm settings={settings} onChange={setSettings} />
}
```
Use lazy initialization when computing initial values from localStorage/sessionStorage, building data structures (indexes, maps), reading from the DOM, or performing heavy transformations.
For simple primitives (`useState(0)`), direct references (`useState(props.value)`), or cheap literals (`useState({})`), the function form is unnecessary.

View File

@@ -1,44 +0,0 @@
---
title: Extract to Memoized Components
impact: MEDIUM
impactDescription: enables early returns
tags: rerender, memo, useMemo, optimization
---
## Extract to Memoized Components
Extract expensive work into memoized components to enable early returns before computation.
**Incorrect (computes avatar even when loading):**
```tsx
function Profile({ user, loading }: Props) {
const avatar = useMemo(() => {
const id = computeAvatarId(user)
return <Avatar id={id} />
}, [user])
if (loading) return <Skeleton />
return <div>{avatar}</div>
}
```
**Correct (skips computation when loading):**
```tsx
const UserAvatar = memo(function UserAvatar({ user }: { user: User }) {
const id = useMemo(() => computeAvatarId(user), [user])
return <Avatar id={id} />
})
function Profile({ user, loading }: Props) {
if (loading) return <Skeleton />
return (
<div>
<UserAvatar user={user} />
</div>
)
}
```
**Note:** If your project has [React Compiler](https://react.dev/learn/react-compiler) enabled, manual memoization with `memo()` and `useMemo()` is not necessary. The compiler automatically optimizes re-renders.

View File

@@ -1,40 +0,0 @@
---
title: Use Transitions for Non-Urgent Updates
impact: MEDIUM
impactDescription: maintains UI responsiveness
tags: rerender, transitions, startTransition, performance
---
## Use Transitions for Non-Urgent Updates
Mark frequent, non-urgent state updates as transitions to maintain UI responsiveness.
**Incorrect (blocks UI on every scroll):**
```tsx
function ScrollTracker() {
const [scrollY, setScrollY] = useState(0)
useEffect(() => {
const handler = () => setScrollY(window.scrollY)
window.addEventListener('scroll', handler, { passive: true })
return () => window.removeEventListener('scroll', handler)
}, [])
}
```
**Correct (non-blocking updates):**
```tsx
import { startTransition } from 'react'
function ScrollTracker() {
const [scrollY, setScrollY] = useState(0)
useEffect(() => {
const handler = () => {
startTransition(() => setScrollY(window.scrollY))
}
window.addEventListener('scroll', handler, { passive: true })
return () => window.removeEventListener('scroll', handler)
}, [])
}
```

View File

@@ -1,73 +0,0 @@
---
title: Use after() for Non-Blocking Operations
impact: MEDIUM
impactDescription: faster response times
tags: server, async, logging, analytics, side-effects
---
## Use after() for Non-Blocking Operations
Use Next.js's `after()` to schedule work that should execute after a response is sent. This prevents logging, analytics, and other side effects from blocking the response.
**Incorrect (blocks response):**
```tsx
import { logUserAction } from '@/app/utils'
export async function POST(request: Request) {
// Perform mutation
await updateDatabase(request)
// Logging blocks the response
const userAgent = request.headers.get('user-agent') || 'unknown'
await logUserAction({ userAgent })
return new Response(JSON.stringify({ status: 'success' }), {
status: 200,
headers: { 'Content-Type': 'application/json' }
})
}
```
**Correct (non-blocking):**
```tsx
import { after } from 'next/server'
import { headers, cookies } from 'next/headers'
import { logUserAction } from '@/app/utils'
export async function POST(request: Request) {
// Perform mutation
await updateDatabase(request)
// Log after response is sent
after(async () => {
const userAgent = (await headers()).get('user-agent') || 'unknown'
const sessionCookie = (await cookies()).get('session-id')?.value || 'anonymous'
logUserAction({ sessionCookie, userAgent })
})
return new Response(JSON.stringify({ status: 'success' }), {
status: 200,
headers: { 'Content-Type': 'application/json' }
})
}
```
The response is sent immediately while logging happens in the background.
**Common use cases:**
- Analytics tracking
- Audit logging
- Sending notifications
- Cache invalidation
- Cleanup tasks
**Important notes:**
- `after()` runs even if the response fails or redirects
- Works in Server Actions, Route Handlers, and Server Components
Reference: [https://nextjs.org/docs/app/api-reference/functions/after](https://nextjs.org/docs/app/api-reference/functions/after)

View File

@@ -1,41 +0,0 @@
---
title: Cross-Request LRU Caching
impact: HIGH
impactDescription: caches across requests
tags: server, cache, lru, cross-request
---
## Cross-Request LRU Caching
`React.cache()` only works within one request. For data shared across sequential requests (user clicks button A then button B), use an LRU cache.
**Implementation:**
```typescript
import { LRUCache } from 'lru-cache'
const cache = new LRUCache<string, any>({
max: 1000,
ttl: 5 * 60 * 1000 // 5 minutes
})
export async function getUser(id: string) {
const cached = cache.get(id)
if (cached) return cached
const user = await db.user.findUnique({ where: { id } })
cache.set(id, user)
return user
}
// Request 1: DB query, result cached
// Request 2: cache hit, no DB query
```
Use when sequential user actions hit multiple endpoints needing the same data within seconds.
**With Vercel's [Fluid Compute](https://vercel.com/docs/fluid-compute):** LRU caching is especially effective because multiple concurrent requests can share the same function instance and cache. This means the cache persists across requests without needing external storage like Redis.
**In traditional serverless:** Each invocation runs in isolation, so consider Redis for cross-process caching.
Reference: [https://github.com/isaacs/node-lru-cache](https://github.com/isaacs/node-lru-cache)

View File

@@ -1,26 +0,0 @@
---
title: Per-Request Deduplication with React.cache()
impact: MEDIUM
impactDescription: deduplicates within request
tags: server, cache, react-cache, deduplication
---
## Per-Request Deduplication with React.cache()
Use `React.cache()` for server-side request deduplication. Authentication and database queries benefit most.
**Usage:**
```typescript
import { cache } from 'react'
export const getCurrentUser = cache(async () => {
const session = await auth()
if (!session?.user?.id) return null
return await db.user.findUnique({
where: { id: session.user.id }
})
})
```
Within a single request, multiple calls to `getCurrentUser()` execute the query only once.

View File

@@ -1,79 +0,0 @@
---
title: Parallel Data Fetching with Component Composition
impact: CRITICAL
impactDescription: eliminates server-side waterfalls
tags: server, rsc, parallel-fetching, composition
---
## Parallel Data Fetching with Component Composition
React Server Components execute sequentially within a tree. Restructure with composition to parallelize data fetching.
**Incorrect (Sidebar waits for Page's fetch to complete):**
```tsx
export default async function Page() {
const header = await fetchHeader()
return (
<div>
<div>{header}</div>
<Sidebar />
</div>
)
}
async function Sidebar() {
const items = await fetchSidebarItems()
return <nav>{items.map(renderItem)}</nav>
}
```
**Correct (both fetch simultaneously):**
```tsx
async function Header() {
const data = await fetchHeader()
return <div>{data}</div>
}
async function Sidebar() {
const items = await fetchSidebarItems()
return <nav>{items.map(renderItem)}</nav>
}
export default function Page() {
return (
<div>
<Header />
<Sidebar />
</div>
)
}
```
**Alternative with children prop:**
```tsx
async function Layout({ children }: { children: ReactNode }) {
const header = await fetchHeader()
return (
<div>
<div>{header}</div>
{children}
</div>
)
}
async function Sidebar() {
const items = await fetchSidebarItems()
return <nav>{items.map(renderItem)}</nav>
}
export default function Page() {
return (
<Layout>
<Sidebar />
</Layout>
)
}
```

View File

@@ -1,38 +0,0 @@
---
title: Minimize Serialization at RSC Boundaries
impact: HIGH
impactDescription: reduces data transfer size
tags: server, rsc, serialization, props
---
## Minimize Serialization at RSC Boundaries
The React Server/Client boundary serializes all object properties into strings and embeds them in the HTML response and subsequent RSC requests. This serialized data directly impacts page weight and load time, so **size matters a lot**. Only pass fields that the client actually uses.
**Incorrect (serializes all 50 fields):**
```tsx
async function Page() {
const user = await fetchUser() // 50 fields
return <Profile user={user} />
}
'use client'
function Profile({ user }: { user: User }) {
return <div>{user.name}</div> // uses 1 field
}
```
**Correct (serializes only 1 field):**
```tsx
async function Page() {
const user = await fetchUser()
return <Profile name={user.name} />
}
'use client'
function Profile({ name }: { name: string }) {
return <div>{name}</div>
}
```

View File

@@ -1,9 +1,6 @@
# Ignore everything by default, selectively add things to context
*
# Documentation (for embeddings/search)
!docs/
# Platform - Libs
!autogpt_platform/autogpt_libs/autogpt_libs/
!autogpt_platform/autogpt_libs/pyproject.toml
@@ -19,7 +16,6 @@
!autogpt_platform/backend/poetry.lock
!autogpt_platform/backend/README.md
!autogpt_platform/backend/.env
!autogpt_platform/backend/gen_prisma_types_stub.py
# Platform - Market
!autogpt_platform/market/market/

View File

@@ -74,7 +74,7 @@ jobs:
- name: Generate Prisma Client
working-directory: autogpt_platform/backend
run: poetry run prisma generate && poetry run gen-prisma-stub
run: poetry run prisma generate
# Frontend Node.js/pnpm setup (mirrors platform-frontend-ci.yml)
- name: Set up Node.js

View File

@@ -90,7 +90,7 @@ jobs:
- name: Generate Prisma Client
working-directory: autogpt_platform/backend
run: poetry run prisma generate && poetry run gen-prisma-stub
run: poetry run prisma generate
# Frontend Node.js/pnpm setup (mirrors platform-frontend-ci.yml)
- name: Set up Node.js

View File

@@ -72,7 +72,7 @@ jobs:
- name: Generate Prisma Client
working-directory: autogpt_platform/backend
run: poetry run prisma generate && poetry run gen-prisma-stub
run: poetry run prisma generate
# Frontend Node.js/pnpm setup (mirrors platform-frontend-ci.yml)
- name: Set up Node.js
@@ -108,16 +108,6 @@ jobs:
# run: pnpm playwright install --with-deps chromium
# Docker setup for development environment
- name: Free up disk space
run: |
# Remove large unused tools to free disk space for Docker builds
sudo rm -rf /usr/share/dotnet
sudo rm -rf /usr/local/lib/android
sudo rm -rf /opt/ghc
sudo rm -rf /opt/hostedtoolcache/CodeQL
sudo docker system prune -af
df -h
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

View File

@@ -134,7 +134,7 @@ jobs:
run: poetry install
- name: Generate Prisma Client
run: poetry run prisma generate && poetry run gen-prisma-stub
run: poetry run prisma generate
- id: supabase
name: Start Supabase
@@ -176,7 +176,7 @@ jobs:
}
- name: Run Database Migrations
run: poetry run prisma migrate deploy
run: poetry run prisma migrate dev --name updates
env:
DATABASE_URL: ${{ steps.supabase.outputs.DB_URL }}
DIRECT_URL: ${{ steps.supabase.outputs.DB_URL }}

View File

@@ -11,7 +11,6 @@ on:
- ".github/workflows/platform-frontend-ci.yml"
- "autogpt_platform/frontend/**"
merge_group:
workflow_dispatch:
concurrency:
group: ${{ github.workflow }}-${{ github.event_name == 'merge_group' && format('merge-queue-{0}', github.ref) || format('{0}-{1}', github.ref, github.event.pull_request.number || github.sha) }}
@@ -152,14 +151,6 @@ jobs:
run: |
cp ../.env.default ../.env
- name: Copy backend .env and set OpenAI API key
run: |
cp ../backend/.env.default ../backend/.env
echo "OPENAI_INTERNAL_API_KEY=${{ secrets.OPENAI_API_KEY }}" >> ../backend/.env
env:
# Used by E2E test data script to generate embeddings for approved store agents
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
@@ -235,25 +226,13 @@ jobs:
- name: Run Playwright tests
run: pnpm test:no-build
continue-on-error: false
- name: Upload Playwright report
if: always()
- name: Upload Playwright artifacts
if: failure()
uses: actions/upload-artifact@v4
with:
name: playwright-report
path: playwright-report
if-no-files-found: ignore
retention-days: 3
- name: Upload Playwright test results
if: always()
uses: actions/upload-artifact@v4
with:
name: playwright-test-results
path: test-results
if-no-files-found: ignore
retention-days: 3
- name: Print Final Docker Compose logs
if: always()

View File

@@ -11,7 +11,7 @@ jobs:
stale:
runs-on: ubuntu-latest
steps:
- uses: actions/stale@v10
- uses: actions/stale@v9
with:
# operations-per-run: 5000
stale-issue-message: >

View File

@@ -61,6 +61,6 @@ jobs:
pull-requests: write
runs-on: ubuntu-latest
steps:
- uses: actions/labeler@v6
- uses: actions/labeler@v5
with:
sync-labels: true

View File

@@ -6,14 +6,12 @@ start-core:
# Stop core services
stop-core:
docker compose stop
docker compose stop deps
reset-db:
docker compose stop db
rm -rf db/docker/volumes/db/data
cd backend && poetry run prisma migrate deploy
cd backend && poetry run prisma generate
cd backend && poetry run gen-prisma-stub
# View logs for core services
logs-core:
@@ -35,7 +33,6 @@ init-env:
migrate:
cd backend && poetry run prisma migrate deploy
cd backend && poetry run prisma generate
cd backend && poetry run gen-prisma-stub
run-backend:
cd backend && poetry run app
@@ -47,7 +44,7 @@ test-data:
cd backend && poetry run python test/test_data_creator.py
load-store-agents:
cd backend && poetry run load-store-agents
cd backend && poetry run python test/load_store_agents.py
help:
@echo "Usage: make <target>"
@@ -61,4 +58,4 @@ help:
@echo " run-backend - Run the backend FastAPI server"
@echo " run-frontend - Run the frontend Next.js development server"
@echo " test-data - Run the test data creator"
@echo " load-store-agents - Load store agents from agents/ folder into test database"
@echo " load-store-agents - Load store agents from agents/ folder into test database"

View File

@@ -57,9 +57,6 @@ class APIKeySmith:
def hash_key(self, raw_key: str) -> tuple[str, str]:
"""Migrate a legacy hash to secure hash format."""
if not raw_key.startswith(self.PREFIX):
raise ValueError("Key without 'agpt_' prefix would fail validation")
salt = self._generate_salt()
hash = self._hash_key_with_salt(raw_key, salt)
return hash, salt.hex()

View File

@@ -1,25 +1,29 @@
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
from .jwt_utils import bearer_jwt_auth
def add_auth_responses_to_openapi(app: FastAPI) -> None:
"""
Patch a FastAPI instance's `openapi()` method to add 401 responses
Set up custom OpenAPI schema generation that adds 401 responses
to all authenticated endpoints.
This is needed when using HTTPBearer with auto_error=False to get proper
401 responses instead of 403, but FastAPI only automatically adds security
responses when auto_error=True.
"""
# Wrap current method to allow stacking OpenAPI schema modifiers like this
wrapped_openapi = app.openapi
def custom_openapi():
if app.openapi_schema:
return app.openapi_schema
openapi_schema = wrapped_openapi()
openapi_schema = get_openapi(
title=app.title,
version=app.version,
description=app.description,
routes=app.routes,
)
# Add 401 response to all endpoints that have security requirements
for path, methods in openapi_schema["paths"].items():

View File

@@ -58,13 +58,6 @@ V0_API_KEY=
OPEN_ROUTER_API_KEY=
NVIDIA_API_KEY=
# Langfuse Prompt Management
# Used for managing the CoPilot system prompt externally
# Get credentials from https://cloud.langfuse.com or your self-hosted instance
LANGFUSE_PUBLIC_KEY=
LANGFUSE_SECRET_KEY=
LANGFUSE_HOST=https://cloud.langfuse.com
# OAuth Credentials
# For the OAuth callback URL, use <your_frontend_url>/auth/integrations/oauth_callback,
# e.g. http://localhost:3000/auth/integrations/oauth_callback

View File

@@ -18,4 +18,3 @@ load-tests/results/
load-tests/*.json
load-tests/*.log
load-tests/node_modules/*
migrations/*/rollback*.sql

View File

@@ -48,8 +48,7 @@ RUN poetry install --no-ansi --no-root
# Generate Prisma client
COPY autogpt_platform/backend/schema.prisma ./
COPY autogpt_platform/backend/backend/data/partial_types.py ./backend/data/partial_types.py
COPY autogpt_platform/backend/gen_prisma_types_stub.py ./
RUN poetry run prisma generate && poetry run gen-prisma-stub
RUN poetry run prisma generate
FROM debian:13-slim AS server_dependencies
@@ -100,7 +99,6 @@ COPY autogpt_platform/backend/migrations /app/autogpt_platform/backend/migration
FROM server_dependencies AS server
COPY autogpt_platform/backend /app/autogpt_platform/backend
COPY docs /app/docs
RUN poetry install --no-ansi --only-root
ENV PORT=8000

View File

@@ -108,7 +108,7 @@ import fastapi.testclient
import pytest
from pytest_snapshot.plugin import Snapshot
from backend.api.features.myroute import router
from backend.server.v2.myroute import router
app = fastapi.FastAPI()
app.include_router(router)
@@ -149,7 +149,7 @@ These provide the easiest way to set up authentication mocking in test modules:
import fastapi
import fastapi.testclient
import pytest
from backend.api.features.myroute import router
from backend.server.v2.myroute import router
app = fastapi.FastAPI()
app.include_router(router)

View File

@@ -1,25 +0,0 @@
from fastapi import FastAPI
from backend.api.middleware.security import SecurityHeadersMiddleware
from backend.monitoring.instrumentation import instrument_fastapi
from .v1.routes import v1_router
external_api = FastAPI(
title="AutoGPT External API",
description="External API for AutoGPT integrations",
docs_url="/docs",
version="1.0",
)
external_api.add_middleware(SecurityHeadersMiddleware)
external_api.include_router(v1_router, prefix="/v1")
# Add Prometheus instrumentation
instrument_fastapi(
external_api,
service_name="external-api",
expose_endpoint=True,
endpoint="/metrics",
include_in_schema=True,
)

View File

@@ -1,107 +0,0 @@
from fastapi import HTTPException, Security, status
from fastapi.security import APIKeyHeader, HTTPAuthorizationCredentials, HTTPBearer
from prisma.enums import APIKeyPermission
from backend.data.auth.api_key import APIKeyInfo, validate_api_key
from backend.data.auth.base import APIAuthorizationInfo
from backend.data.auth.oauth import (
InvalidClientError,
InvalidTokenError,
OAuthAccessTokenInfo,
validate_access_token,
)
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
bearer_auth = HTTPBearer(auto_error=False)
async def require_api_key(api_key: str | None = Security(api_key_header)) -> APIKeyInfo:
"""Middleware for API key authentication only"""
if api_key is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Missing API key"
)
api_key_obj = await validate_api_key(api_key)
if not api_key_obj:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key"
)
return api_key_obj
async def require_access_token(
bearer: HTTPAuthorizationCredentials | None = Security(bearer_auth),
) -> OAuthAccessTokenInfo:
"""Middleware for OAuth access token authentication only"""
if bearer is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing Authorization header",
)
try:
token_info, _ = await validate_access_token(bearer.credentials)
except (InvalidClientError, InvalidTokenError) as e:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(e))
return token_info
async def require_auth(
api_key: str | None = Security(api_key_header),
bearer: HTTPAuthorizationCredentials | None = Security(bearer_auth),
) -> APIAuthorizationInfo:
"""
Unified authentication middleware supporting both API keys and OAuth tokens.
Supports two authentication methods, which are checked in order:
1. X-API-Key header (existing API key authentication)
2. Authorization: Bearer <token> header (OAuth access token)
Returns:
APIAuthorizationInfo: base class of both APIKeyInfo and OAuthAccessTokenInfo.
"""
# Try API key first
if api_key is not None:
api_key_info = await validate_api_key(api_key)
if api_key_info:
return api_key_info
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid API key"
)
# Try OAuth bearer token
if bearer is not None:
try:
token_info, _ = await validate_access_token(bearer.credentials)
return token_info
except (InvalidClientError, InvalidTokenError) as e:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=str(e))
# No credentials provided
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing authentication. Provide API key or access token.",
)
def require_permission(permission: APIKeyPermission):
"""
Dependency function for checking specific permissions
(works with API keys and OAuth tokens)
"""
async def check_permission(
auth: APIAuthorizationInfo = Security(require_auth),
) -> APIAuthorizationInfo:
if permission not in auth.scopes:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Missing required permission: {permission.value}",
)
return auth
return check_permission

View File

@@ -1,340 +0,0 @@
"""Tests for analytics API endpoints."""
import json
from unittest.mock import AsyncMock, Mock
import fastapi
import fastapi.testclient
import pytest
import pytest_mock
from pytest_snapshot.plugin import Snapshot
from .analytics import router as analytics_router
app = fastapi.FastAPI()
app.include_router(analytics_router)
client = fastapi.testclient.TestClient(app)
@pytest.fixture(autouse=True)
def setup_app_auth(mock_jwt_user):
"""Setup auth overrides for all tests in this module."""
from autogpt_libs.auth.jwt_utils import get_jwt_payload
app.dependency_overrides[get_jwt_payload] = mock_jwt_user["get_jwt_payload"]
yield
app.dependency_overrides.clear()
# =============================================================================
# /log_raw_metric endpoint tests
# =============================================================================
def test_log_raw_metric_success(
mocker: pytest_mock.MockFixture,
configured_snapshot: Snapshot,
test_user_id: str,
) -> None:
"""Test successful raw metric logging."""
mock_result = Mock(id="metric-123-uuid")
mock_log_metric = mocker.patch(
"backend.data.analytics.log_raw_metric",
new_callable=AsyncMock,
return_value=mock_result,
)
request_data = {
"metric_name": "page_load_time",
"metric_value": 2.5,
"data_string": "/dashboard",
}
response = client.post("/log_raw_metric", json=request_data)
assert response.status_code == 200, f"Unexpected response: {response.text}"
assert response.json() == "metric-123-uuid"
mock_log_metric.assert_called_once_with(
user_id=test_user_id,
metric_name="page_load_time",
metric_value=2.5,
data_string="/dashboard",
)
configured_snapshot.assert_match(
json.dumps({"metric_id": response.json()}, indent=2, sort_keys=True),
"analytics_log_metric_success",
)
@pytest.mark.parametrize(
"metric_value,metric_name,data_string,test_id",
[
(100, "api_calls_count", "external_api", "integer_value"),
(0, "error_count", "no_errors", "zero_value"),
(-5.2, "temperature_delta", "cooling", "negative_value"),
(1.23456789, "precision_test", "float_precision", "float_precision"),
(999999999, "large_number", "max_value", "large_number"),
(0.0000001, "tiny_number", "min_value", "tiny_number"),
],
)
def test_log_raw_metric_various_values(
mocker: pytest_mock.MockFixture,
configured_snapshot: Snapshot,
metric_value: float,
metric_name: str,
data_string: str,
test_id: str,
) -> None:
"""Test raw metric logging with various metric values."""
mock_result = Mock(id=f"metric-{test_id}-uuid")
mocker.patch(
"backend.data.analytics.log_raw_metric",
new_callable=AsyncMock,
return_value=mock_result,
)
request_data = {
"metric_name": metric_name,
"metric_value": metric_value,
"data_string": data_string,
}
response = client.post("/log_raw_metric", json=request_data)
assert response.status_code == 200, f"Failed for {test_id}: {response.text}"
configured_snapshot.assert_match(
json.dumps(
{"metric_id": response.json(), "test_case": test_id},
indent=2,
sort_keys=True,
),
f"analytics_metric_{test_id}",
)
@pytest.mark.parametrize(
"invalid_data,expected_error",
[
({}, "Field required"),
({"metric_name": "test"}, "Field required"),
(
{"metric_name": "test", "metric_value": "not_a_number", "data_string": "x"},
"Input should be a valid number",
),
(
{"metric_name": "", "metric_value": 1.0, "data_string": "test"},
"String should have at least 1 character",
),
(
{"metric_name": "test", "metric_value": 1.0, "data_string": ""},
"String should have at least 1 character",
),
],
ids=[
"empty_request",
"missing_metric_value_and_data_string",
"invalid_metric_value_type",
"empty_metric_name",
"empty_data_string",
],
)
def test_log_raw_metric_validation_errors(
invalid_data: dict,
expected_error: str,
) -> None:
"""Test validation errors for invalid metric requests."""
response = client.post("/log_raw_metric", json=invalid_data)
assert response.status_code == 422
error_detail = response.json()
assert "detail" in error_detail, f"Missing 'detail' in error: {error_detail}"
error_text = json.dumps(error_detail)
assert (
expected_error in error_text
), f"Expected '{expected_error}' in error response: {error_text}"
def test_log_raw_metric_service_error(
mocker: pytest_mock.MockFixture,
test_user_id: str,
) -> None:
"""Test error handling when analytics service fails."""
mocker.patch(
"backend.data.analytics.log_raw_metric",
new_callable=AsyncMock,
side_effect=Exception("Database connection failed"),
)
request_data = {
"metric_name": "test_metric",
"metric_value": 1.0,
"data_string": "test",
}
response = client.post("/log_raw_metric", json=request_data)
assert response.status_code == 500
error_detail = response.json()["detail"]
assert "Database connection failed" in error_detail["message"]
assert "hint" in error_detail
# =============================================================================
# /log_raw_analytics endpoint tests
# =============================================================================
def test_log_raw_analytics_success(
mocker: pytest_mock.MockFixture,
configured_snapshot: Snapshot,
test_user_id: str,
) -> None:
"""Test successful raw analytics logging."""
mock_result = Mock(id="analytics-789-uuid")
mock_log_analytics = mocker.patch(
"backend.data.analytics.log_raw_analytics",
new_callable=AsyncMock,
return_value=mock_result,
)
request_data = {
"type": "user_action",
"data": {
"action": "button_click",
"button_id": "submit_form",
"timestamp": "2023-01-01T00:00:00Z",
"metadata": {"form_type": "registration", "fields_filled": 5},
},
"data_index": "button_click_submit_form",
}
response = client.post("/log_raw_analytics", json=request_data)
assert response.status_code == 200, f"Unexpected response: {response.text}"
assert response.json() == "analytics-789-uuid"
mock_log_analytics.assert_called_once_with(
test_user_id,
"user_action",
request_data["data"],
"button_click_submit_form",
)
configured_snapshot.assert_match(
json.dumps({"analytics_id": response.json()}, indent=2, sort_keys=True),
"analytics_log_analytics_success",
)
def test_log_raw_analytics_complex_data(
mocker: pytest_mock.MockFixture,
configured_snapshot: Snapshot,
) -> None:
"""Test raw analytics logging with complex nested data structures."""
mock_result = Mock(id="analytics-complex-uuid")
mocker.patch(
"backend.data.analytics.log_raw_analytics",
new_callable=AsyncMock,
return_value=mock_result,
)
request_data = {
"type": "agent_execution",
"data": {
"agent_id": "agent_123",
"execution_id": "exec_456",
"status": "completed",
"duration_ms": 3500,
"nodes_executed": 15,
"blocks_used": [
{"block_id": "llm_block", "count": 3},
{"block_id": "http_block", "count": 5},
{"block_id": "code_block", "count": 2},
],
"errors": [],
"metadata": {
"trigger": "manual",
"user_tier": "premium",
"environment": "production",
},
},
"data_index": "agent_123_exec_456",
}
response = client.post("/log_raw_analytics", json=request_data)
assert response.status_code == 200
configured_snapshot.assert_match(
json.dumps(
{"analytics_id": response.json(), "logged_data": request_data["data"]},
indent=2,
sort_keys=True,
),
"analytics_log_analytics_complex_data",
)
@pytest.mark.parametrize(
"invalid_data,expected_error",
[
({}, "Field required"),
({"type": "test"}, "Field required"),
(
{"type": "test", "data": "not_a_dict", "data_index": "test"},
"Input should be a valid dictionary",
),
({"type": "test", "data": {"key": "value"}}, "Field required"),
],
ids=[
"empty_request",
"missing_data_and_data_index",
"invalid_data_type",
"missing_data_index",
],
)
def test_log_raw_analytics_validation_errors(
invalid_data: dict,
expected_error: str,
) -> None:
"""Test validation errors for invalid analytics requests."""
response = client.post("/log_raw_analytics", json=invalid_data)
assert response.status_code == 422
error_detail = response.json()
assert "detail" in error_detail, f"Missing 'detail' in error: {error_detail}"
error_text = json.dumps(error_detail)
assert (
expected_error in error_text
), f"Expected '{expected_error}' in error response: {error_text}"
def test_log_raw_analytics_service_error(
mocker: pytest_mock.MockFixture,
test_user_id: str,
) -> None:
"""Test error handling when analytics service fails."""
mocker.patch(
"backend.data.analytics.log_raw_analytics",
new_callable=AsyncMock,
side_effect=Exception("Analytics DB unreachable"),
)
request_data = {
"type": "test_event",
"data": {"key": "value"},
"data_index": "test_index",
}
response = client.post("/log_raw_analytics", json=request_data)
assert response.status_code == 500
error_detail = response.json()["detail"]
assert "Analytics DB unreachable" in error_detail["message"]
assert "hint" in error_detail

View File

@@ -1,689 +0,0 @@
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from difflib import SequenceMatcher
from typing import Sequence
import prisma
import backend.api.features.library.db as library_db
import backend.api.features.library.model as library_model
import backend.api.features.store.db as store_db
import backend.api.features.store.model as store_model
import backend.data.block
from backend.blocks import load_all_blocks
from backend.blocks.llm import LlmModel
from backend.data.block import AnyBlockSchema, BlockCategory, BlockInfo, BlockSchema
from backend.data.db import query_raw_with_schema
from backend.integrations.providers import ProviderName
from backend.util.cache import cached
from backend.util.models import Pagination
from .model import (
BlockCategoryResponse,
BlockResponse,
BlockType,
CountResponse,
FilterType,
Provider,
ProviderResponse,
SearchEntry,
)
logger = logging.getLogger(__name__)
llm_models = [name.name.lower().replace("_", " ") for name in LlmModel]
MAX_LIBRARY_AGENT_RESULTS = 100
MAX_MARKETPLACE_AGENT_RESULTS = 100
MIN_SCORE_FOR_FILTERED_RESULTS = 10.0
SearchResultItem = BlockInfo | library_model.LibraryAgent | store_model.StoreAgent
@dataclass
class _ScoredItem:
item: SearchResultItem
filter_type: FilterType
score: float
sort_key: str
@dataclass
class _SearchCacheEntry:
items: list[SearchResultItem]
total_items: dict[FilterType, int]
def get_block_categories(category_blocks: int = 3) -> list[BlockCategoryResponse]:
categories: dict[BlockCategory, BlockCategoryResponse] = {}
for block_type in load_all_blocks().values():
block: AnyBlockSchema = block_type()
# Skip disabled blocks
if block.disabled:
continue
# Skip blocks that don't have categories (all should have at least one)
if not block.categories:
continue
# Add block to the categories
for category in block.categories:
if category not in categories:
categories[category] = BlockCategoryResponse(
name=category.name.lower(),
total_blocks=0,
blocks=[],
)
categories[category].total_blocks += 1
# Append if the category has less than the specified number of blocks
if len(categories[category].blocks) < category_blocks:
categories[category].blocks.append(block.get_info())
# Sort categories by name
return sorted(categories.values(), key=lambda x: x.name)
def get_blocks(
*,
category: str | None = None,
type: BlockType | None = None,
provider: ProviderName | None = None,
page: int = 1,
page_size: int = 50,
) -> BlockResponse:
"""
Get blocks based on either category, type or provider.
Providing nothing fetches all block types.
"""
# Only one of category, type, or provider can be specified
if (category and type) or (category and provider) or (type and provider):
raise ValueError("Only one of category, type, or provider can be specified")
blocks: list[AnyBlockSchema] = []
skip = (page - 1) * page_size
take = page_size
total = 0
for block_type in load_all_blocks().values():
block: AnyBlockSchema = block_type()
# Skip disabled blocks
if block.disabled:
continue
# Skip blocks that don't match the category
if category and category not in {c.name.lower() for c in block.categories}:
continue
# Skip blocks that don't match the type
if (
(type == "input" and block.block_type.value != "Input")
or (type == "output" and block.block_type.value != "Output")
or (type == "action" and block.block_type.value in ("Input", "Output"))
):
continue
# Skip blocks that don't match the provider
if provider:
credentials_info = block.input_schema.get_credentials_fields_info().values()
if not any(provider in info.provider for info in credentials_info):
continue
total += 1
if skip > 0:
skip -= 1
continue
if take > 0:
take -= 1
blocks.append(block)
return BlockResponse(
blocks=[b.get_info() for b in blocks],
pagination=Pagination(
total_items=total,
total_pages=(total + page_size - 1) // page_size,
current_page=page,
page_size=page_size,
),
)
def get_block_by_id(block_id: str) -> BlockInfo | None:
"""
Get a specific block by its ID.
"""
for block_type in load_all_blocks().values():
block: AnyBlockSchema = block_type()
if block.id == block_id:
return block.get_info()
return None
async def update_search(user_id: str, search: SearchEntry) -> str:
"""
Upsert a search request for the user and return the search ID.
"""
if search.search_id:
# Update existing search
await prisma.models.BuilderSearchHistory.prisma().update(
where={
"id": search.search_id,
},
data={
"searchQuery": search.search_query or "",
"filter": search.filter or [], # type: ignore
"byCreator": search.by_creator or [],
},
)
return search.search_id
else:
# Create new search
new_search = await prisma.models.BuilderSearchHistory.prisma().create(
data={
"userId": user_id,
"searchQuery": search.search_query or "",
"filter": search.filter or [], # type: ignore
"byCreator": search.by_creator or [],
}
)
return new_search.id
async def get_recent_searches(user_id: str, limit: int = 5) -> list[SearchEntry]:
"""
Get the user's most recent search requests.
"""
searches = await prisma.models.BuilderSearchHistory.prisma().find_many(
where={
"userId": user_id,
},
order={
"updatedAt": "desc",
},
take=limit,
)
return [
SearchEntry(
search_query=s.searchQuery,
filter=s.filter, # type: ignore
by_creator=s.byCreator,
search_id=s.id,
)
for s in searches
]
async def get_sorted_search_results(
*,
user_id: str,
search_query: str | None,
filters: Sequence[FilterType],
by_creator: Sequence[str] | None = None,
) -> _SearchCacheEntry:
normalized_filters: tuple[FilterType, ...] = tuple(sorted(set(filters or [])))
normalized_creators: tuple[str, ...] = tuple(sorted(set(by_creator or [])))
return await _build_cached_search_results(
user_id=user_id,
search_query=search_query or "",
filters=normalized_filters,
by_creator=normalized_creators,
)
@cached(ttl_seconds=300, shared_cache=True)
async def _build_cached_search_results(
user_id: str,
search_query: str,
filters: tuple[FilterType, ...],
by_creator: tuple[str, ...],
) -> _SearchCacheEntry:
normalized_query = (search_query or "").strip().lower()
include_blocks = "blocks" in filters
include_integrations = "integrations" in filters
include_library_agents = "my_agents" in filters
include_marketplace_agents = "marketplace_agents" in filters
scored_items: list[_ScoredItem] = []
total_items: dict[FilterType, int] = {
"blocks": 0,
"integrations": 0,
"marketplace_agents": 0,
"my_agents": 0,
}
block_results, block_total, integration_total = _collect_block_results(
normalized_query=normalized_query,
include_blocks=include_blocks,
include_integrations=include_integrations,
)
scored_items.extend(block_results)
total_items["blocks"] = block_total
total_items["integrations"] = integration_total
if include_library_agents:
library_response = await library_db.list_library_agents(
user_id=user_id,
search_term=search_query or None,
page=1,
page_size=MAX_LIBRARY_AGENT_RESULTS,
)
total_items["my_agents"] = library_response.pagination.total_items
scored_items.extend(
_build_library_items(
agents=library_response.agents,
normalized_query=normalized_query,
)
)
if include_marketplace_agents:
marketplace_response = await store_db.get_store_agents(
creators=list(by_creator) or None,
search_query=search_query or None,
page=1,
page_size=MAX_MARKETPLACE_AGENT_RESULTS,
)
total_items["marketplace_agents"] = marketplace_response.pagination.total_items
scored_items.extend(
_build_marketplace_items(
agents=marketplace_response.agents,
normalized_query=normalized_query,
)
)
sorted_items = sorted(
scored_items,
key=lambda entry: (-entry.score, entry.sort_key, entry.filter_type),
)
return _SearchCacheEntry(
items=[entry.item for entry in sorted_items],
total_items=total_items,
)
def _collect_block_results(
*,
normalized_query: str,
include_blocks: bool,
include_integrations: bool,
) -> tuple[list[_ScoredItem], int, int]:
results: list[_ScoredItem] = []
block_count = 0
integration_count = 0
if not include_blocks and not include_integrations:
return results, block_count, integration_count
for block_type in load_all_blocks().values():
block: AnyBlockSchema = block_type()
if block.disabled:
continue
block_info = block.get_info()
credentials = list(block.input_schema.get_credentials_fields().values())
is_integration = len(credentials) > 0
if is_integration and not include_integrations:
continue
if not is_integration and not include_blocks:
continue
score = _score_block(block, block_info, normalized_query)
if not _should_include_item(score, normalized_query):
continue
filter_type: FilterType = "integrations" if is_integration else "blocks"
if is_integration:
integration_count += 1
else:
block_count += 1
results.append(
_ScoredItem(
item=block_info,
filter_type=filter_type,
score=score,
sort_key=_get_item_name(block_info),
)
)
return results, block_count, integration_count
def _build_library_items(
*,
agents: list[library_model.LibraryAgent],
normalized_query: str,
) -> list[_ScoredItem]:
results: list[_ScoredItem] = []
for agent in agents:
score = _score_library_agent(agent, normalized_query)
if not _should_include_item(score, normalized_query):
continue
results.append(
_ScoredItem(
item=agent,
filter_type="my_agents",
score=score,
sort_key=_get_item_name(agent),
)
)
return results
def _build_marketplace_items(
*,
agents: list[store_model.StoreAgent],
normalized_query: str,
) -> list[_ScoredItem]:
results: list[_ScoredItem] = []
for agent in agents:
score = _score_store_agent(agent, normalized_query)
if not _should_include_item(score, normalized_query):
continue
results.append(
_ScoredItem(
item=agent,
filter_type="marketplace_agents",
score=score,
sort_key=_get_item_name(agent),
)
)
return results
def get_providers(
query: str = "",
page: int = 1,
page_size: int = 50,
) -> ProviderResponse:
providers = []
query = query.lower()
skip = (page - 1) * page_size
take = page_size
all_providers = _get_all_providers()
for provider in all_providers.values():
if (
query not in provider.name.value.lower()
and query not in provider.description.lower()
):
continue
if skip > 0:
skip -= 1
continue
if take > 0:
take -= 1
providers.append(provider)
total = len(all_providers)
return ProviderResponse(
providers=providers,
pagination=Pagination(
total_items=total,
total_pages=(total + page_size - 1) // page_size,
current_page=page,
page_size=page_size,
),
)
async def get_counts(user_id: str) -> CountResponse:
my_agents = await prisma.models.LibraryAgent.prisma().count(
where={
"userId": user_id,
"isDeleted": False,
"isArchived": False,
}
)
counts = await _get_static_counts()
return CountResponse(
my_agents=my_agents,
**counts,
)
@cached(ttl_seconds=3600)
async def _get_static_counts():
"""
Get counts of blocks, integrations, and marketplace agents.
This is cached to avoid unnecessary database queries and calculations.
"""
all_blocks = 0
input_blocks = 0
action_blocks = 0
output_blocks = 0
integrations = 0
for block_type in load_all_blocks().values():
block: AnyBlockSchema = block_type()
if block.disabled:
continue
all_blocks += 1
if block.block_type.value == "Input":
input_blocks += 1
elif block.block_type.value == "Output":
output_blocks += 1
else:
action_blocks += 1
credentials = list(block.input_schema.get_credentials_fields().values())
if len(credentials) > 0:
integrations += 1
marketplace_agents = await prisma.models.StoreAgent.prisma().count()
return {
"all_blocks": all_blocks,
"input_blocks": input_blocks,
"action_blocks": action_blocks,
"output_blocks": output_blocks,
"integrations": integrations,
"marketplace_agents": marketplace_agents,
}
def _matches_llm_model(schema_cls: type[BlockSchema], query: str) -> bool:
for field in schema_cls.model_fields.values():
if field.annotation == LlmModel:
# Check if query matches any value in llm_models
if any(query in name for name in llm_models):
return True
return False
def _score_block(
block: AnyBlockSchema,
block_info: BlockInfo,
normalized_query: str,
) -> float:
if not normalized_query:
return 0.0
name = block_info.name.lower()
description = block_info.description.lower()
score = _score_primary_fields(name, description, normalized_query)
category_text = " ".join(
category.get("category", "").lower() for category in block_info.categories
)
score += _score_additional_field(category_text, normalized_query, 12, 6)
credentials_info = block.input_schema.get_credentials_fields_info().values()
provider_names = [
provider.value.lower()
for info in credentials_info
for provider in info.provider
]
provider_text = " ".join(provider_names)
score += _score_additional_field(provider_text, normalized_query, 15, 6)
if _matches_llm_model(block.input_schema, normalized_query):
score += 20
return score
def _score_library_agent(
agent: library_model.LibraryAgent,
normalized_query: str,
) -> float:
if not normalized_query:
return 0.0
name = agent.name.lower()
description = (agent.description or "").lower()
instructions = (agent.instructions or "").lower()
score = _score_primary_fields(name, description, normalized_query)
score += _score_additional_field(instructions, normalized_query, 15, 6)
score += _score_additional_field(
agent.creator_name.lower(), normalized_query, 10, 5
)
return score
def _score_store_agent(
agent: store_model.StoreAgent,
normalized_query: str,
) -> float:
if not normalized_query:
return 0.0
name = agent.agent_name.lower()
description = agent.description.lower()
sub_heading = agent.sub_heading.lower()
score = _score_primary_fields(name, description, normalized_query)
score += _score_additional_field(sub_heading, normalized_query, 12, 6)
score += _score_additional_field(agent.creator.lower(), normalized_query, 10, 5)
return score
def _score_primary_fields(name: str, description: str, query: str) -> float:
score = 0.0
if name == query:
score += 120
elif name.startswith(query):
score += 90
elif query in name:
score += 60
score += SequenceMatcher(None, name, query).ratio() * 50
if description:
if query in description:
score += 30
score += SequenceMatcher(None, description, query).ratio() * 25
return score
def _score_additional_field(
value: str,
query: str,
contains_weight: float,
similarity_weight: float,
) -> float:
if not value or not query:
return 0.0
score = 0.0
if query in value:
score += contains_weight
score += SequenceMatcher(None, value, query).ratio() * similarity_weight
return score
def _should_include_item(score: float, normalized_query: str) -> bool:
if not normalized_query:
return True
return score >= MIN_SCORE_FOR_FILTERED_RESULTS
def _get_item_name(item: SearchResultItem) -> str:
if isinstance(item, BlockInfo):
return item.name.lower()
if isinstance(item, library_model.LibraryAgent):
return item.name.lower()
return item.agent_name.lower()
@cached(ttl_seconds=3600)
def _get_all_providers() -> dict[ProviderName, Provider]:
providers: dict[ProviderName, Provider] = {}
for block_type in load_all_blocks().values():
block: AnyBlockSchema = block_type()
if block.disabled:
continue
credentials_info = block.input_schema.get_credentials_fields_info().values()
for info in credentials_info:
for provider in info.provider: # provider is a ProviderName enum member
if provider in providers:
providers[provider].integration_count += 1
else:
providers[provider] = Provider(
name=provider, description="", integration_count=1
)
return providers
@cached(ttl_seconds=3600)
async def get_suggested_blocks(count: int = 5) -> list[BlockInfo]:
suggested_blocks = []
# Sum the number of executions for each block type
# Prisma cannot group by nested relations, so we do a raw query
# Calculate the cutoff timestamp
timestamp_threshold = datetime.now(timezone.utc) - timedelta(days=30)
results = await query_raw_with_schema(
"""
SELECT
agent_node."agentBlockId" AS block_id,
COUNT(execution.id) AS execution_count
FROM {schema_prefix}"AgentNodeExecution" execution
JOIN {schema_prefix}"AgentNode" agent_node ON execution."agentNodeId" = agent_node.id
WHERE execution."endedTime" >= $1::timestamp
GROUP BY agent_node."agentBlockId"
ORDER BY execution_count DESC;
""",
timestamp_threshold,
)
# Get the top blocks based on execution count
# But ignore Input and Output blocks
blocks: list[tuple[BlockInfo, int]] = []
for block_type in load_all_blocks().values():
block: AnyBlockSchema = block_type()
if block.disabled or block.block_type in (
backend.data.block.BlockType.INPUT,
backend.data.block.BlockType.OUTPUT,
backend.data.block.BlockType.AGENT,
):
continue
# Find the execution count for this block
execution_count = next(
(row["execution_count"] for row in results if row["block_id"] == block.id),
0,
)
blocks.append((block.get_info(), execution_count))
# Sort blocks by execution count
blocks.sort(key=lambda x: x[1], reverse=True)
suggested_blocks = [block[0] for block in blocks]
# Return the top blocks
return suggested_blocks[:count]

View File

@@ -1,249 +0,0 @@
"""Database operations for chat sessions."""
import asyncio
import logging
from datetime import UTC, datetime
from typing import Any, cast
from prisma.models import ChatMessage as PrismaChatMessage
from prisma.models import ChatSession as PrismaChatSession
from prisma.types import (
ChatMessageCreateInput,
ChatSessionCreateInput,
ChatSessionUpdateInput,
ChatSessionWhereInput,
)
from backend.data.db import transaction
from backend.util.json import SafeJson
logger = logging.getLogger(__name__)
async def get_chat_session(session_id: str) -> PrismaChatSession | None:
"""Get a chat session by ID from the database."""
session = await PrismaChatSession.prisma().find_unique(
where={"id": session_id},
include={"Messages": True},
)
if session and session.Messages:
# Sort messages by sequence in Python - Prisma Python client doesn't support
# order_by in include clauses (unlike Prisma JS), so we sort after fetching
session.Messages.sort(key=lambda m: m.sequence)
return session
async def create_chat_session(
session_id: str,
user_id: str,
) -> PrismaChatSession:
"""Create a new chat session in the database."""
data = ChatSessionCreateInput(
id=session_id,
userId=user_id,
credentials=SafeJson({}),
successfulAgentRuns=SafeJson({}),
successfulAgentSchedules=SafeJson({}),
)
return await PrismaChatSession.prisma().create(
data=data,
include={"Messages": True},
)
async def update_chat_session(
session_id: str,
credentials: dict[str, Any] | None = None,
successful_agent_runs: dict[str, Any] | None = None,
successful_agent_schedules: dict[str, Any] | None = None,
total_prompt_tokens: int | None = None,
total_completion_tokens: int | None = None,
title: str | None = None,
) -> PrismaChatSession | None:
"""Update a chat session's metadata."""
data: ChatSessionUpdateInput = {"updatedAt": datetime.now(UTC)}
if credentials is not None:
data["credentials"] = SafeJson(credentials)
if successful_agent_runs is not None:
data["successfulAgentRuns"] = SafeJson(successful_agent_runs)
if successful_agent_schedules is not None:
data["successfulAgentSchedules"] = SafeJson(successful_agent_schedules)
if total_prompt_tokens is not None:
data["totalPromptTokens"] = total_prompt_tokens
if total_completion_tokens is not None:
data["totalCompletionTokens"] = total_completion_tokens
if title is not None:
data["title"] = title
session = await PrismaChatSession.prisma().update(
where={"id": session_id},
data=data,
include={"Messages": True},
)
if session and session.Messages:
# Sort in Python - Prisma Python doesn't support order_by in include clauses
session.Messages.sort(key=lambda m: m.sequence)
return session
async def add_chat_message(
session_id: str,
role: str,
sequence: int,
content: str | None = None,
name: str | None = None,
tool_call_id: str | None = None,
refusal: str | None = None,
tool_calls: list[dict[str, Any]] | None = None,
function_call: dict[str, Any] | None = None,
) -> PrismaChatMessage:
"""Add a message to a chat session."""
# Build input dict dynamically rather than using ChatMessageCreateInput directly
# because Prisma's TypedDict validation rejects optional fields set to None.
# We only include fields that have values, then cast at the end.
data: dict[str, Any] = {
"Session": {"connect": {"id": session_id}},
"role": role,
"sequence": sequence,
}
# Add optional string fields
if content is not None:
data["content"] = content
if name is not None:
data["name"] = name
if tool_call_id is not None:
data["toolCallId"] = tool_call_id
if refusal is not None:
data["refusal"] = refusal
# Add optional JSON fields only when they have values
if tool_calls is not None:
data["toolCalls"] = SafeJson(tool_calls)
if function_call is not None:
data["functionCall"] = SafeJson(function_call)
# Run message create and session timestamp update in parallel for lower latency
_, message = await asyncio.gather(
PrismaChatSession.prisma().update(
where={"id": session_id},
data={"updatedAt": datetime.now(UTC)},
),
PrismaChatMessage.prisma().create(data=cast(ChatMessageCreateInput, data)),
)
return message
async def add_chat_messages_batch(
session_id: str,
messages: list[dict[str, Any]],
start_sequence: int,
) -> list[PrismaChatMessage]:
"""Add multiple messages to a chat session in a batch.
Uses a transaction for atomicity - if any message creation fails,
the entire batch is rolled back.
"""
if not messages:
return []
created_messages = []
async with transaction() as tx:
for i, msg in enumerate(messages):
# Build input dict dynamically rather than using ChatMessageCreateInput
# directly because Prisma's TypedDict validation rejects optional fields
# set to None. We only include fields that have values, then cast.
data: dict[str, Any] = {
"Session": {"connect": {"id": session_id}},
"role": msg["role"],
"sequence": start_sequence + i,
}
# Add optional string fields
if msg.get("content") is not None:
data["content"] = msg["content"]
if msg.get("name") is not None:
data["name"] = msg["name"]
if msg.get("tool_call_id") is not None:
data["toolCallId"] = msg["tool_call_id"]
if msg.get("refusal") is not None:
data["refusal"] = msg["refusal"]
# Add optional JSON fields only when they have values
if msg.get("tool_calls") is not None:
data["toolCalls"] = SafeJson(msg["tool_calls"])
if msg.get("function_call") is not None:
data["functionCall"] = SafeJson(msg["function_call"])
created = await PrismaChatMessage.prisma(tx).create(
data=cast(ChatMessageCreateInput, data)
)
created_messages.append(created)
# Update session's updatedAt timestamp within the same transaction.
# Note: Token usage (total_prompt_tokens, total_completion_tokens) is updated
# separately via update_chat_session() after streaming completes.
await PrismaChatSession.prisma(tx).update(
where={"id": session_id},
data={"updatedAt": datetime.now(UTC)},
)
return created_messages
async def get_user_chat_sessions(
user_id: str,
limit: int = 50,
offset: int = 0,
) -> list[PrismaChatSession]:
"""Get chat sessions for a user, ordered by most recent."""
return await PrismaChatSession.prisma().find_many(
where={"userId": user_id},
order={"updatedAt": "desc"},
take=limit,
skip=offset,
)
async def get_user_session_count(user_id: str) -> int:
"""Get the total number of chat sessions for a user."""
return await PrismaChatSession.prisma().count(where={"userId": user_id})
async def delete_chat_session(session_id: str, user_id: str | None = None) -> bool:
"""Delete a chat session and all its messages.
Args:
session_id: The session ID to delete.
user_id: If provided, validates that the session belongs to this user
before deletion. This prevents unauthorized deletion of other
users' sessions.
Returns:
True if deleted successfully, False otherwise.
"""
try:
# Build typed where clause with optional user_id validation
where_clause: ChatSessionWhereInput = {"id": session_id}
if user_id is not None:
where_clause["userId"] = user_id
result = await PrismaChatSession.prisma().delete_many(where=where_clause)
if result == 0:
logger.warning(
f"No session deleted for {session_id} "
f"(user_id validation: {user_id is not None})"
)
return False
return True
except Exception as e:
logger.error(f"Failed to delete chat session {session_id}: {e}")
return False
async def get_chat_session_message_count(session_id: str) -> int:
"""Get the number of messages in a chat session."""
count = await PrismaChatMessage.prisma().count(where={"sessionId": session_id})
return count

View File

@@ -1,597 +0,0 @@
import asyncio
import logging
import uuid
from datetime import UTC, datetime
from typing import Any
from weakref import WeakValueDictionary
from openai.types.chat import (
ChatCompletionAssistantMessageParam,
ChatCompletionDeveloperMessageParam,
ChatCompletionFunctionMessageParam,
ChatCompletionMessageParam,
ChatCompletionSystemMessageParam,
ChatCompletionToolMessageParam,
ChatCompletionUserMessageParam,
)
from openai.types.chat.chat_completion_assistant_message_param import FunctionCall
from openai.types.chat.chat_completion_message_tool_call_param import (
ChatCompletionMessageToolCallParam,
Function,
)
from prisma.models import ChatMessage as PrismaChatMessage
from prisma.models import ChatSession as PrismaChatSession
from pydantic import BaseModel
from backend.data.redis_client import get_redis_async
from backend.util import json
from backend.util.exceptions import DatabaseError, RedisError
from . import db as chat_db
from .config import ChatConfig
logger = logging.getLogger(__name__)
config = ChatConfig()
def _parse_json_field(value: str | dict | list | None, default: Any = None) -> Any:
"""Parse a JSON field that may be stored as string or already parsed."""
if value is None:
return default
if isinstance(value, str):
return json.loads(value)
return value
# Redis cache key prefix for chat sessions
CHAT_SESSION_CACHE_PREFIX = "chat:session:"
def _get_session_cache_key(session_id: str) -> str:
"""Get the Redis cache key for a chat session."""
return f"{CHAT_SESSION_CACHE_PREFIX}{session_id}"
# Session-level locks to prevent race conditions during concurrent upserts.
# Uses WeakValueDictionary to automatically garbage collect locks when no longer referenced,
# preventing unbounded memory growth while maintaining lock semantics for active sessions.
# Invalidation: Locks are auto-removed by GC when no coroutine holds a reference (after
# async with lock: completes). Explicit cleanup also occurs in delete_chat_session().
_session_locks: WeakValueDictionary[str, asyncio.Lock] = WeakValueDictionary()
_session_locks_mutex = asyncio.Lock()
async def _get_session_lock(session_id: str) -> asyncio.Lock:
"""Get or create a lock for a specific session to prevent concurrent upserts.
Uses WeakValueDictionary for automatic cleanup: locks are garbage collected
when no coroutine holds a reference to them, preventing memory leaks from
unbounded growth of session locks.
"""
async with _session_locks_mutex:
lock = _session_locks.get(session_id)
if lock is None:
lock = asyncio.Lock()
_session_locks[session_id] = lock
return lock
class ChatMessage(BaseModel):
role: str
content: str | None = None
name: str | None = None
tool_call_id: str | None = None
refusal: str | None = None
tool_calls: list[dict] | None = None
function_call: dict | None = None
class Usage(BaseModel):
prompt_tokens: int
completion_tokens: int
total_tokens: int
class ChatSession(BaseModel):
session_id: str
user_id: str
title: str | None = None
messages: list[ChatMessage]
usage: list[Usage]
credentials: dict[str, dict] = {} # Map of provider -> credential metadata
started_at: datetime
updated_at: datetime
successful_agent_runs: dict[str, int] = {}
successful_agent_schedules: dict[str, int] = {}
@staticmethod
def new(user_id: str) -> "ChatSession":
return ChatSession(
session_id=str(uuid.uuid4()),
user_id=user_id,
title=None,
messages=[],
usage=[],
credentials={},
started_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
@staticmethod
def from_db(
prisma_session: PrismaChatSession,
prisma_messages: list[PrismaChatMessage] | None = None,
) -> "ChatSession":
"""Convert Prisma models to Pydantic ChatSession."""
messages = []
if prisma_messages:
for msg in prisma_messages:
messages.append(
ChatMessage(
role=msg.role,
content=msg.content,
name=msg.name,
tool_call_id=msg.toolCallId,
refusal=msg.refusal,
tool_calls=_parse_json_field(msg.toolCalls),
function_call=_parse_json_field(msg.functionCall),
)
)
# Parse JSON fields from Prisma
credentials = _parse_json_field(prisma_session.credentials, default={})
successful_agent_runs = _parse_json_field(
prisma_session.successfulAgentRuns, default={}
)
successful_agent_schedules = _parse_json_field(
prisma_session.successfulAgentSchedules, default={}
)
# Calculate usage from token counts
usage = []
if prisma_session.totalPromptTokens or prisma_session.totalCompletionTokens:
usage.append(
Usage(
prompt_tokens=prisma_session.totalPromptTokens or 0,
completion_tokens=prisma_session.totalCompletionTokens or 0,
total_tokens=(prisma_session.totalPromptTokens or 0)
+ (prisma_session.totalCompletionTokens or 0),
)
)
return ChatSession(
session_id=prisma_session.id,
user_id=prisma_session.userId,
title=prisma_session.title,
messages=messages,
usage=usage,
credentials=credentials,
started_at=prisma_session.createdAt,
updated_at=prisma_session.updatedAt,
successful_agent_runs=successful_agent_runs,
successful_agent_schedules=successful_agent_schedules,
)
def to_openai_messages(self) -> list[ChatCompletionMessageParam]:
messages = []
for message in self.messages:
if message.role == "developer":
m = ChatCompletionDeveloperMessageParam(
role="developer",
content=message.content or "",
)
if message.name:
m["name"] = message.name
messages.append(m)
elif message.role == "system":
m = ChatCompletionSystemMessageParam(
role="system",
content=message.content or "",
)
if message.name:
m["name"] = message.name
messages.append(m)
elif message.role == "user":
m = ChatCompletionUserMessageParam(
role="user",
content=message.content or "",
)
if message.name:
m["name"] = message.name
messages.append(m)
elif message.role == "assistant":
m = ChatCompletionAssistantMessageParam(
role="assistant",
content=message.content or "",
)
if message.function_call:
m["function_call"] = FunctionCall(
arguments=message.function_call["arguments"],
name=message.function_call["name"],
)
if message.refusal:
m["refusal"] = message.refusal
if message.tool_calls:
t: list[ChatCompletionMessageToolCallParam] = []
for tool_call in message.tool_calls:
# Tool calls are stored with nested structure: {id, type, function: {name, arguments}}
function_data = tool_call.get("function", {})
# Skip tool calls that are missing required fields
if "id" not in tool_call or "name" not in function_data:
logger.warning(
f"Skipping invalid tool call: missing required fields. "
f"Got: {tool_call.keys()}, function keys: {function_data.keys()}"
)
continue
# Arguments are stored as a JSON string
arguments_str = function_data.get("arguments", "{}")
t.append(
ChatCompletionMessageToolCallParam(
id=tool_call["id"],
type="function",
function=Function(
arguments=arguments_str,
name=function_data["name"],
),
)
)
m["tool_calls"] = t
if message.name:
m["name"] = message.name
messages.append(m)
elif message.role == "tool":
messages.append(
ChatCompletionToolMessageParam(
role="tool",
content=message.content or "",
tool_call_id=message.tool_call_id or "",
)
)
elif message.role == "function":
messages.append(
ChatCompletionFunctionMessageParam(
role="function",
content=message.content,
name=message.name or "",
)
)
return messages
async def _get_session_from_cache(session_id: str) -> ChatSession | None:
"""Get a chat session from Redis cache."""
redis_key = _get_session_cache_key(session_id)
async_redis = await get_redis_async()
raw_session: bytes | None = await async_redis.get(redis_key)
if raw_session is None:
return None
try:
session = ChatSession.model_validate_json(raw_session)
logger.info(
f"Loading session {session_id} from cache: "
f"message_count={len(session.messages)}, "
f"roles={[m.role for m in session.messages]}"
)
return session
except Exception as e:
logger.error(f"Failed to deserialize session {session_id}: {e}", exc_info=True)
raise RedisError(f"Corrupted session data for {session_id}") from e
async def _cache_session(session: ChatSession) -> None:
"""Cache a chat session in Redis."""
redis_key = _get_session_cache_key(session.session_id)
async_redis = await get_redis_async()
await async_redis.setex(redis_key, config.session_ttl, session.model_dump_json())
async def _get_session_from_db(session_id: str) -> ChatSession | None:
"""Get a chat session from the database."""
prisma_session = await chat_db.get_chat_session(session_id)
if not prisma_session:
return None
messages = prisma_session.Messages
logger.info(
f"Loading session {session_id} from DB: "
f"has_messages={messages is not None}, "
f"message_count={len(messages) if messages else 0}, "
f"roles={[m.role for m in messages] if messages else []}"
)
return ChatSession.from_db(prisma_session, messages)
async def _save_session_to_db(
session: ChatSession, existing_message_count: int
) -> None:
"""Save or update a chat session in the database."""
# Check if session exists in DB
existing = await chat_db.get_chat_session(session.session_id)
if not existing:
# Create new session
await chat_db.create_chat_session(
session_id=session.session_id,
user_id=session.user_id,
)
existing_message_count = 0
# Calculate total tokens from usage
total_prompt = sum(u.prompt_tokens for u in session.usage)
total_completion = sum(u.completion_tokens for u in session.usage)
# Update session metadata
await chat_db.update_chat_session(
session_id=session.session_id,
credentials=session.credentials,
successful_agent_runs=session.successful_agent_runs,
successful_agent_schedules=session.successful_agent_schedules,
total_prompt_tokens=total_prompt,
total_completion_tokens=total_completion,
)
# Add new messages (only those after existing count)
new_messages = session.messages[existing_message_count:]
if new_messages:
messages_data = []
for msg in new_messages:
messages_data.append(
{
"role": msg.role,
"content": msg.content,
"name": msg.name,
"tool_call_id": msg.tool_call_id,
"refusal": msg.refusal,
"tool_calls": msg.tool_calls,
"function_call": msg.function_call,
}
)
logger.info(
f"Saving {len(new_messages)} new messages to DB for session {session.session_id}: "
f"roles={[m['role'] for m in messages_data]}, "
f"start_sequence={existing_message_count}"
)
await chat_db.add_chat_messages_batch(
session_id=session.session_id,
messages=messages_data,
start_sequence=existing_message_count,
)
async def get_chat_session(
session_id: str,
user_id: str | None = None,
) -> ChatSession | None:
"""Get a chat session by ID.
Checks Redis cache first, falls back to database if not found.
Caches database results back to Redis.
Args:
session_id: The session ID to fetch.
user_id: If provided, validates that the session belongs to this user.
If None, ownership is not validated (admin/system access).
"""
# Try cache first
try:
session = await _get_session_from_cache(session_id)
if session:
# Verify user ownership if user_id was provided for validation
if user_id is not None and session.user_id != user_id:
logger.warning(
f"Session {session_id} user id mismatch: {session.user_id} != {user_id}"
)
return None
return session
except RedisError:
logger.warning(f"Cache error for session {session_id}, trying database")
except Exception as e:
logger.warning(f"Unexpected cache error for session {session_id}: {e}")
# Fall back to database
logger.info(f"Session {session_id} not in cache, checking database")
session = await _get_session_from_db(session_id)
if session is None:
logger.warning(f"Session {session_id} not found in cache or database")
return None
# Verify user ownership if user_id was provided for validation
if user_id is not None and session.user_id != user_id:
logger.warning(
f"Session {session_id} user id mismatch: {session.user_id} != {user_id}"
)
return None
# Cache the session from DB
try:
await _cache_session(session)
logger.info(f"Cached session {session_id} from database")
except Exception as e:
logger.warning(f"Failed to cache session {session_id}: {e}")
return session
async def upsert_chat_session(
session: ChatSession,
) -> ChatSession:
"""Update a chat session in both cache and database.
Uses session-level locking to prevent race conditions when concurrent
operations (e.g., background title update and main stream handler)
attempt to upsert the same session simultaneously.
Raises:
DatabaseError: If the database write fails. The cache is still updated
as a best-effort optimization, but the error is propagated to ensure
callers are aware of the persistence failure.
RedisError: If the cache write fails (after successful DB write).
"""
# Acquire session-specific lock to prevent concurrent upserts
lock = await _get_session_lock(session.session_id)
async with lock:
# Get existing message count from DB for incremental saves
existing_message_count = await chat_db.get_chat_session_message_count(
session.session_id
)
db_error: Exception | None = None
# Save to database (primary storage)
try:
await _save_session_to_db(session, existing_message_count)
except Exception as e:
logger.error(
f"Failed to save session {session.session_id} to database: {e}"
)
db_error = e
# Save to cache (best-effort, even if DB failed)
try:
await _cache_session(session)
except Exception as e:
# If DB succeeded but cache failed, raise cache error
if db_error is None:
raise RedisError(
f"Failed to persist chat session {session.session_id} to Redis: {e}"
) from e
# If both failed, log cache error but raise DB error (more critical)
logger.warning(
f"Cache write also failed for session {session.session_id}: {e}"
)
# Propagate DB error after attempting cache (prevents data loss)
if db_error is not None:
raise DatabaseError(
f"Failed to persist chat session {session.session_id} to database"
) from db_error
return session
async def create_chat_session(user_id: str) -> ChatSession:
"""Create a new chat session and persist it.
Raises:
DatabaseError: If the database write fails. We fail fast to ensure
callers never receive a non-persisted session that only exists
in cache (which would be lost when the cache expires).
"""
session = ChatSession.new(user_id)
# Create in database first - fail fast if this fails
try:
await chat_db.create_chat_session(
session_id=session.session_id,
user_id=user_id,
)
except Exception as e:
logger.error(f"Failed to create session {session.session_id} in database: {e}")
raise DatabaseError(
f"Failed to create chat session {session.session_id} in database"
) from e
# Cache the session (best-effort optimization, DB is source of truth)
try:
await _cache_session(session)
except Exception as e:
logger.warning(f"Failed to cache new session {session.session_id}: {e}")
return session
async def get_user_sessions(
user_id: str,
limit: int = 50,
offset: int = 0,
) -> tuple[list[ChatSession], int]:
"""Get chat sessions for a user from the database with total count.
Returns:
A tuple of (sessions, total_count) where total_count is the overall
number of sessions for the user (not just the current page).
"""
prisma_sessions = await chat_db.get_user_chat_sessions(user_id, limit, offset)
total_count = await chat_db.get_user_session_count(user_id)
sessions = []
for prisma_session in prisma_sessions:
# Convert without messages for listing (lighter weight)
sessions.append(ChatSession.from_db(prisma_session, None))
return sessions, total_count
async def delete_chat_session(session_id: str, user_id: str | None = None) -> bool:
"""Delete a chat session from both cache and database.
Args:
session_id: The session ID to delete.
user_id: If provided, validates that the session belongs to this user
before deletion. This prevents unauthorized deletion.
Returns:
True if deleted successfully, False otherwise.
"""
# Delete from database first (with optional user_id validation)
# This confirms ownership before invalidating cache
deleted = await chat_db.delete_chat_session(session_id, user_id)
if not deleted:
return False
# Only invalidate cache and clean up lock after DB confirms deletion
try:
redis_key = _get_session_cache_key(session_id)
async_redis = await get_redis_async()
await async_redis.delete(redis_key)
except Exception as e:
logger.warning(f"Failed to delete session {session_id} from cache: {e}")
# Clean up session lock (belt-and-suspenders with WeakValueDictionary)
async with _session_locks_mutex:
_session_locks.pop(session_id, None)
return True
async def update_session_title(session_id: str, title: str) -> bool:
"""Update only the title of a chat session.
This is a lightweight operation that doesn't touch messages, avoiding
race conditions with concurrent message updates. Use this for background
title generation instead of upsert_chat_session.
Args:
session_id: The session ID to update.
title: The new title to set.
Returns:
True if updated successfully, False otherwise.
"""
try:
result = await chat_db.update_chat_session(session_id=session_id, title=title)
if result is None:
logger.warning(f"Session {session_id} not found for title update")
return False
# Invalidate cache so next fetch gets updated title
try:
redis_key = _get_session_cache_key(session_id)
async_redis = await get_redis_async()
await async_redis.delete(redis_key)
except Exception as e:
logger.warning(f"Failed to invalidate cache for session {session_id}: {e}")
return True
except Exception as e:
logger.error(f"Failed to update title for session {session_id}: {e}")
return False

View File

@@ -1,119 +0,0 @@
import pytest
from .model import (
ChatMessage,
ChatSession,
Usage,
get_chat_session,
upsert_chat_session,
)
messages = [
ChatMessage(content="Hello, how are you?", role="user"),
ChatMessage(
content="I'm fine, thank you!",
role="assistant",
tool_calls=[
{
"id": "t123",
"type": "function",
"function": {
"name": "get_weather",
"arguments": '{"city": "New York"}',
},
}
],
),
ChatMessage(
content="I'm using the tool to get the weather",
role="tool",
tool_call_id="t123",
),
]
@pytest.mark.asyncio(loop_scope="session")
async def test_chatsession_serialization_deserialization():
s = ChatSession.new(user_id="abc123")
s.messages = messages
s.usage = [Usage(prompt_tokens=100, completion_tokens=200, total_tokens=300)]
serialized = s.model_dump_json()
s2 = ChatSession.model_validate_json(serialized)
assert s2.model_dump() == s.model_dump()
@pytest.mark.asyncio(loop_scope="session")
async def test_chatsession_redis_storage(setup_test_user, test_user_id):
s = ChatSession.new(user_id=test_user_id)
s.messages = messages
s = await upsert_chat_session(s)
s2 = await get_chat_session(
session_id=s.session_id,
user_id=s.user_id,
)
assert s2 == s
@pytest.mark.asyncio(loop_scope="session")
async def test_chatsession_redis_storage_user_id_mismatch(
setup_test_user, test_user_id
):
s = ChatSession.new(user_id=test_user_id)
s.messages = messages
s = await upsert_chat_session(s)
s2 = await get_chat_session(s.session_id, "different_user_id")
assert s2 is None
@pytest.mark.asyncio(loop_scope="session")
async def test_chatsession_db_storage(setup_test_user, test_user_id):
"""Test that messages are correctly saved to and loaded from DB (not cache)."""
from backend.data.redis_client import get_redis_async
# Create session with messages including assistant message
s = ChatSession.new(user_id=test_user_id)
s.messages = messages # Contains user, assistant, and tool messages
assert s.session_id is not None, "Session id is not set"
# Upsert to save to both cache and DB
s = await upsert_chat_session(s)
# Clear the Redis cache to force DB load
redis_key = f"chat:session:{s.session_id}"
async_redis = await get_redis_async()
await async_redis.delete(redis_key)
# Load from DB (cache was cleared)
s2 = await get_chat_session(
session_id=s.session_id,
user_id=s.user_id,
)
assert s2 is not None, "Session not found after loading from DB"
assert len(s2.messages) == len(
s.messages
), f"Message count mismatch: expected {len(s.messages)}, got {len(s2.messages)}"
# Verify all roles are present
roles = [m.role for m in s2.messages]
assert "user" in roles, f"User message missing. Roles found: {roles}"
assert "assistant" in roles, f"Assistant message missing. Roles found: {roles}"
assert "tool" in roles, f"Tool message missing. Roles found: {roles}"
# Verify message content
for orig, loaded in zip(s.messages, s2.messages):
assert orig.role == loaded.role, f"Role mismatch: {orig.role} != {loaded.role}"
assert (
orig.content == loaded.content
), f"Content mismatch for {orig.role}: {orig.content} != {loaded.content}"
if orig.tool_calls:
assert (
loaded.tool_calls is not None
), f"Tool calls missing for {orig.role} message"
assert len(orig.tool_calls) == len(loaded.tool_calls)

View File

@@ -1,144 +0,0 @@
"""
Response models for Vercel AI SDK UI Stream Protocol.
This module implements the AI SDK UI Stream Protocol (v1) for streaming chat responses.
See: https://ai-sdk.dev/docs/ai-sdk-ui/stream-protocol
"""
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field
class ResponseType(str, Enum):
"""Types of streaming responses following AI SDK protocol."""
# Message lifecycle
START = "start"
FINISH = "finish"
# Text streaming
TEXT_START = "text-start"
TEXT_DELTA = "text-delta"
TEXT_END = "text-end"
# Tool interaction
TOOL_INPUT_START = "tool-input-start"
TOOL_INPUT_AVAILABLE = "tool-input-available"
TOOL_OUTPUT_AVAILABLE = "tool-output-available"
# Other
ERROR = "error"
USAGE = "usage"
class StreamBaseResponse(BaseModel):
"""Base response model for all streaming responses."""
type: ResponseType
def to_sse(self) -> str:
"""Convert to SSE format."""
return f"data: {self.model_dump_json()}\n\n"
# ========== Message Lifecycle ==========
class StreamStart(StreamBaseResponse):
"""Start of a new message."""
type: ResponseType = ResponseType.START
messageId: str = Field(..., description="Unique message ID")
class StreamFinish(StreamBaseResponse):
"""End of message/stream."""
type: ResponseType = ResponseType.FINISH
# ========== Text Streaming ==========
class StreamTextStart(StreamBaseResponse):
"""Start of a text block."""
type: ResponseType = ResponseType.TEXT_START
id: str = Field(..., description="Text block ID")
class StreamTextDelta(StreamBaseResponse):
"""Streaming text content delta."""
type: ResponseType = ResponseType.TEXT_DELTA
id: str = Field(..., description="Text block ID")
delta: str = Field(..., description="Text content delta")
class StreamTextEnd(StreamBaseResponse):
"""End of a text block."""
type: ResponseType = ResponseType.TEXT_END
id: str = Field(..., description="Text block ID")
# ========== Tool Interaction ==========
class StreamToolInputStart(StreamBaseResponse):
"""Tool call started notification."""
type: ResponseType = ResponseType.TOOL_INPUT_START
toolCallId: str = Field(..., description="Unique tool call ID")
toolName: str = Field(..., description="Name of the tool being called")
class StreamToolInputAvailable(StreamBaseResponse):
"""Tool input is ready for execution."""
type: ResponseType = ResponseType.TOOL_INPUT_AVAILABLE
toolCallId: str = Field(..., description="Unique tool call ID")
toolName: str = Field(..., description="Name of the tool being called")
input: dict[str, Any] = Field(
default_factory=dict, description="Tool input arguments"
)
class StreamToolOutputAvailable(StreamBaseResponse):
"""Tool execution result."""
type: ResponseType = ResponseType.TOOL_OUTPUT_AVAILABLE
toolCallId: str = Field(..., description="Tool call ID this responds to")
output: str | dict[str, Any] = Field(..., description="Tool execution output")
# Additional fields for internal use (not part of AI SDK spec but useful)
toolName: str | None = Field(
default=None, description="Name of the tool that was executed"
)
success: bool = Field(
default=True, description="Whether the tool execution succeeded"
)
# ========== Other ==========
class StreamUsage(StreamBaseResponse):
"""Token usage statistics."""
type: ResponseType = ResponseType.USAGE
promptTokens: int = Field(..., description="Number of prompt tokens")
completionTokens: int = Field(..., description="Number of completion tokens")
totalTokens: int = Field(..., description="Total number of tokens")
class StreamError(StreamBaseResponse):
"""Error response."""
type: ResponseType = ResponseType.ERROR
errorText: str = Field(..., description="Error message text")
code: str | None = Field(default=None, description="Error code")
details: dict[str, Any] | None = Field(
default=None, description="Additional error details"
)

View File

@@ -1,362 +0,0 @@
"""Chat API routes for chat session management and streaming via SSE."""
import logging
from collections.abc import AsyncGenerator
from typing import Annotated
from autogpt_libs import auth
from fastapi import APIRouter, Depends, Query, Security
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from backend.util.exceptions import NotFoundError
from . import service as chat_service
from .config import ChatConfig
from .model import ChatSession, create_chat_session, get_chat_session, get_user_sessions
config = ChatConfig()
logger = logging.getLogger(__name__)
async def _validate_and_get_session(
session_id: str,
user_id: str | None,
) -> ChatSession:
"""Validate session exists and belongs to user."""
session = await get_chat_session(session_id, user_id)
if not session:
raise NotFoundError(f"Session {session_id} not found.")
return session
router = APIRouter(
tags=["chat"],
)
# ========== Request/Response Models ==========
class StreamChatRequest(BaseModel):
"""Request model for streaming chat with optional context."""
message: str
is_user_message: bool = True
context: dict[str, str] | None = None # {url: str, content: str}
class CreateSessionResponse(BaseModel):
"""Response model containing information on a newly created chat session."""
id: str
created_at: str
user_id: str | None
class SessionDetailResponse(BaseModel):
"""Response model providing complete details for a chat session, including messages."""
id: str
created_at: str
updated_at: str
user_id: str | None
messages: list[dict]
class SessionSummaryResponse(BaseModel):
"""Response model for a session summary (without messages)."""
id: str
created_at: str
updated_at: str
title: str | None = None
class ListSessionsResponse(BaseModel):
"""Response model for listing chat sessions."""
sessions: list[SessionSummaryResponse]
total: int
# ========== Routes ==========
@router.get(
"/sessions",
dependencies=[Security(auth.requires_user)],
)
async def list_sessions(
user_id: Annotated[str, Security(auth.get_user_id)],
limit: int = Query(default=50, ge=1, le=100),
offset: int = Query(default=0, ge=0),
) -> ListSessionsResponse:
"""
List chat sessions for the authenticated user.
Returns a paginated list of chat sessions belonging to the current user,
ordered by most recently updated.
Args:
user_id: The authenticated user's ID.
limit: Maximum number of sessions to return (1-100).
offset: Number of sessions to skip for pagination.
Returns:
ListSessionsResponse: List of session summaries and total count.
"""
sessions, total_count = await get_user_sessions(user_id, limit, offset)
return ListSessionsResponse(
sessions=[
SessionSummaryResponse(
id=session.session_id,
created_at=session.started_at.isoformat(),
updated_at=session.updated_at.isoformat(),
title=session.title,
)
for session in sessions
],
total=total_count,
)
@router.post(
"/sessions",
)
async def create_session(
user_id: Annotated[str, Depends(auth.get_user_id)],
) -> CreateSessionResponse:
"""
Create a new chat session.
Initiates a new chat session for the authenticated user.
Args:
user_id: The authenticated user ID parsed from the JWT (required).
Returns:
CreateSessionResponse: Details of the created session.
"""
logger.info(
f"Creating session with user_id: "
f"...{user_id[-8:] if len(user_id) > 8 else '<redacted>'}"
)
session = await create_chat_session(user_id)
return CreateSessionResponse(
id=session.session_id,
created_at=session.started_at.isoformat(),
user_id=session.user_id,
)
@router.get(
"/sessions/{session_id}",
)
async def get_session(
session_id: str,
user_id: Annotated[str | None, Depends(auth.get_user_id)],
) -> SessionDetailResponse:
"""
Retrieve the details of a specific chat session.
Looks up a chat session by ID for the given user (if authenticated) and returns all session data including messages.
Args:
session_id: The unique identifier for the desired chat session.
user_id: The optional authenticated user ID, or None for anonymous access.
Returns:
SessionDetailResponse: Details for the requested session; raises NotFoundError if not found.
"""
session = await get_chat_session(session_id, user_id)
if not session:
raise NotFoundError(f"Session {session_id} not found")
messages = [message.model_dump() for message in session.messages]
logger.info(
f"Returning session {session_id}: "
f"message_count={len(messages)}, "
f"roles={[m.get('role') for m in messages]}"
)
return SessionDetailResponse(
id=session.session_id,
created_at=session.started_at.isoformat(),
updated_at=session.updated_at.isoformat(),
user_id=session.user_id or None,
messages=messages,
)
@router.post(
"/sessions/{session_id}/stream",
)
async def stream_chat_post(
session_id: str,
request: StreamChatRequest,
user_id: str | None = Depends(auth.get_user_id),
):
"""
Stream chat responses for a session (POST with context support).
Streams the AI/completion responses in real time over Server-Sent Events (SSE), including:
- Text fragments as they are generated
- Tool call UI elements (if invoked)
- Tool execution results
Args:
session_id: The chat session identifier to associate with the streamed messages.
request: Request body containing message, is_user_message, and optional context.
user_id: Optional authenticated user ID.
Returns:
StreamingResponse: SSE-formatted response chunks.
"""
session = await _validate_and_get_session(session_id, user_id)
async def event_generator() -> AsyncGenerator[str, None]:
async for chunk in chat_service.stream_chat_completion(
session_id,
request.message,
is_user_message=request.is_user_message,
user_id=user_id,
session=session, # Pass pre-fetched session to avoid double-fetch
context=request.context,
):
yield chunk.to_sse()
# AI SDK protocol termination
yield "data: [DONE]\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
"x-vercel-ai-ui-message-stream": "v1", # AI SDK protocol header
},
)
@router.get(
"/sessions/{session_id}/stream",
)
async def stream_chat_get(
session_id: str,
message: Annotated[str, Query(min_length=1, max_length=10000)],
user_id: str | None = Depends(auth.get_user_id),
is_user_message: bool = Query(default=True),
):
"""
Stream chat responses for a session (GET - legacy endpoint).
Streams the AI/completion responses in real time over Server-Sent Events (SSE), including:
- Text fragments as they are generated
- Tool call UI elements (if invoked)
- Tool execution results
Args:
session_id: The chat session identifier to associate with the streamed messages.
message: The user's new message to process.
user_id: Optional authenticated user ID.
is_user_message: Whether the message is a user message.
Returns:
StreamingResponse: SSE-formatted response chunks.
"""
session = await _validate_and_get_session(session_id, user_id)
async def event_generator() -> AsyncGenerator[str, None]:
async for chunk in chat_service.stream_chat_completion(
session_id,
message,
is_user_message=is_user_message,
user_id=user_id,
session=session, # Pass pre-fetched session to avoid double-fetch
):
yield chunk.to_sse()
# AI SDK protocol termination
yield "data: [DONE]\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
"x-vercel-ai-ui-message-stream": "v1", # AI SDK protocol header
},
)
@router.patch(
"/sessions/{session_id}/assign-user",
dependencies=[Security(auth.requires_user)],
status_code=200,
)
async def session_assign_user(
session_id: str,
user_id: Annotated[str, Security(auth.get_user_id)],
) -> dict:
"""
Assign an authenticated user to a chat session.
Used (typically post-login) to claim an existing anonymous session as the current authenticated user.
Args:
session_id: The identifier for the (previously anonymous) session.
user_id: The authenticated user's ID to associate with the session.
Returns:
dict: Status of the assignment.
"""
await chat_service.assign_user_to_session(session_id, user_id)
return {"status": "ok"}
# ========== Health Check ==========
@router.get("/health", status_code=200)
async def health_check() -> dict:
"""
Health check endpoint for the chat service.
Performs a full cycle test of session creation and retrieval. Should always return healthy
if the service and data layer are operational.
Returns:
dict: A status dictionary indicating health, service name, and API version.
"""
from backend.data.user import get_or_create_user
# Ensure health check user exists (required for FK constraint)
health_check_user_id = "health-check-user"
await get_or_create_user(
{
"sub": health_check_user_id,
"email": "health-check@system.local",
"user_metadata": {"name": "Health Check User"},
}
)
# Create and retrieve session to verify full data layer
session = await create_chat_session(health_check_user_id)
await get_chat_session(session.session_id, health_check_user_id)
return {
"status": "healthy",
"service": "chat",
"version": "0.1.0",
}

View File

@@ -1,904 +0,0 @@
import asyncio
import logging
from collections.abc import AsyncGenerator
from typing import Any
import orjson
from langfuse import Langfuse
from openai import (
APIConnectionError,
APIError,
APIStatusError,
AsyncOpenAI,
RateLimitError,
)
from openai.types.chat import ChatCompletionChunk, ChatCompletionToolParam
from backend.data.understanding import (
format_understanding_for_prompt,
get_business_understanding,
)
from backend.util.exceptions import NotFoundError
from backend.util.settings import Settings
from . import db as chat_db
from .config import ChatConfig
from .model import (
ChatMessage,
ChatSession,
Usage,
get_chat_session,
update_session_title,
upsert_chat_session,
)
from .response_model import (
StreamBaseResponse,
StreamError,
StreamFinish,
StreamStart,
StreamTextDelta,
StreamTextEnd,
StreamTextStart,
StreamToolInputAvailable,
StreamToolInputStart,
StreamToolOutputAvailable,
StreamUsage,
)
from .tools import execute_tool, tools
logger = logging.getLogger(__name__)
config = ChatConfig()
settings = Settings()
client = AsyncOpenAI(api_key=config.api_key, base_url=config.base_url)
# Langfuse client (lazy initialization)
_langfuse_client: Langfuse | None = None
class LangfuseNotConfiguredError(Exception):
"""Raised when Langfuse is required but not configured."""
pass
def _is_langfuse_configured() -> bool:
"""Check if Langfuse credentials are configured."""
return bool(
settings.secrets.langfuse_public_key and settings.secrets.langfuse_secret_key
)
def _get_langfuse_client() -> Langfuse:
"""Get or create the Langfuse client for prompt management and tracing."""
global _langfuse_client
if _langfuse_client is None:
if not _is_langfuse_configured():
raise LangfuseNotConfiguredError(
"Langfuse is not configured. The chat feature requires Langfuse for prompt management. "
"Please set the LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY environment variables."
)
_langfuse_client = Langfuse(
public_key=settings.secrets.langfuse_public_key,
secret_key=settings.secrets.langfuse_secret_key,
host=settings.secrets.langfuse_host or "https://cloud.langfuse.com",
)
return _langfuse_client
def _get_environment() -> str:
"""Get the current environment name for Langfuse tagging."""
return settings.config.app_env.value
def _get_langfuse_prompt() -> str:
"""Fetch the latest production prompt from Langfuse.
Returns:
The compiled prompt text from Langfuse.
Raises:
Exception: If Langfuse is unavailable or prompt fetch fails.
"""
try:
langfuse = _get_langfuse_client()
# cache_ttl_seconds=0 disables SDK caching to always get the latest prompt
prompt = langfuse.get_prompt(config.langfuse_prompt_name, cache_ttl_seconds=0)
compiled = prompt.compile()
logger.info(
f"Fetched prompt '{config.langfuse_prompt_name}' from Langfuse "
f"(version: {prompt.version})"
)
return compiled
except Exception as e:
logger.error(f"Failed to fetch prompt from Langfuse: {e}")
raise
async def _is_first_session(user_id: str) -> bool:
"""Check if this is the user's first chat session.
Returns True if the user has 1 or fewer sessions (meaning this is their first).
"""
try:
session_count = await chat_db.get_user_session_count(user_id)
return session_count <= 1
except Exception as e:
logger.warning(f"Failed to check session count for user {user_id}: {e}")
return False # Default to non-onboarding if we can't check
async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]:
"""Build the full system prompt including business understanding if available.
Args:
user_id: The user ID for fetching business understanding
If "default" and this is the user's first session, will use "onboarding" instead.
Returns:
Tuple of (compiled prompt string, Langfuse prompt object for tracing)
"""
langfuse = _get_langfuse_client()
# cache_ttl_seconds=0 disables SDK caching to always get the latest prompt
prompt = langfuse.get_prompt(config.langfuse_prompt_name, cache_ttl_seconds=0)
# If user is authenticated, try to fetch their business understanding
understanding = None
if user_id:
try:
understanding = await get_business_understanding(user_id)
except Exception as e:
logger.warning(f"Failed to fetch business understanding: {e}")
understanding = None
if understanding:
context = format_understanding_for_prompt(understanding)
else:
context = "This is the first time you are meeting the user. Greet them and introduce them to the platform"
compiled = prompt.compile(users_information=context)
return compiled, prompt
async def _generate_session_title(message: str) -> str | None:
"""Generate a concise title for a chat session based on the first message.
Args:
message: The first user message in the session
Returns:
A short title (3-6 words) or None if generation fails
"""
try:
response = await client.chat.completions.create(
model=config.title_model,
messages=[
{
"role": "system",
"content": (
"Generate a very short title (3-6 words) for a chat conversation "
"based on the user's first message. The title should capture the "
"main topic or intent. Return ONLY the title, no quotes or punctuation."
),
},
{"role": "user", "content": message[:500]}, # Limit input length
],
max_tokens=20,
)
title = response.choices[0].message.content
if title:
# Clean up the title
title = title.strip().strip("\"'")
# Limit length
if len(title) > 50:
title = title[:47] + "..."
return title
return None
except Exception as e:
logger.warning(f"Failed to generate session title: {e}")
return None
async def assign_user_to_session(
session_id: str,
user_id: str,
) -> ChatSession:
"""
Assign a user to a chat session.
"""
session = await get_chat_session(session_id, None)
if not session:
raise NotFoundError(f"Session {session_id} not found")
session.user_id = user_id
return await upsert_chat_session(session)
async def stream_chat_completion(
session_id: str,
message: str | None = None,
is_user_message: bool = True,
user_id: str | None = None,
retry_count: int = 0,
session: ChatSession | None = None,
context: dict[str, str] | None = None, # {url: str, content: str}
) -> AsyncGenerator[StreamBaseResponse, None]:
"""Main entry point for streaming chat completions with database handling.
This function handles all database operations and delegates streaming
to the internal _stream_chat_chunks function.
Args:
session_id: Chat session ID
user_message: User's input message
user_id: User ID for authentication (None for anonymous)
session: Optional pre-loaded session object (for recursive calls to avoid Redis refetch)
Yields:
StreamBaseResponse objects formatted as SSE
Raises:
NotFoundError: If session_id is invalid
ValueError: If max_context_messages is exceeded
"""
logger.info(
f"Streaming chat completion for session {session_id} for message {message} and user id {user_id}. Message is user message: {is_user_message}"
)
# Check if Langfuse is configured - required for chat functionality
if not _is_langfuse_configured():
logger.error("Chat request failed: Langfuse is not configured")
yield StreamError(
errorText="Chat service is not available. Langfuse must be configured "
"with LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY environment variables."
)
yield StreamFinish()
return
# Langfuse observations will be created after session is loaded (need messages for input)
# Initialize to None so finally block can safely check and end them
trace = None
generation = None
# Only fetch from Redis if session not provided (initial call)
if session is None:
session = await get_chat_session(session_id, user_id)
logger.info(
f"Fetched session from Redis: {session.session_id if session else 'None'}, "
f"message_count={len(session.messages) if session else 0}"
)
else:
logger.info(
f"Using provided session object: {session.session_id}, "
f"message_count={len(session.messages)}"
)
if not session:
raise NotFoundError(
f"Session {session_id} not found. Please create a new session first."
)
if message:
# Build message content with context if provided
message_content = message
if context and context.get("url") and context.get("content"):
context_text = f"Page URL: {context['url']}\n\nPage Content:\n{context['content']}\n\n---\n\nUser Message: {message}"
message_content = context_text
logger.info(
f"Including page context: URL={context['url']}, content_length={len(context['content'])}"
)
session.messages.append(
ChatMessage(
role="user" if is_user_message else "assistant", content=message_content
)
)
logger.info(
f"Appended message (role={'user' if is_user_message else 'assistant'}), "
f"new message_count={len(session.messages)}"
)
logger.info(
f"Upserting session: {session.session_id} with user id {session.user_id}, "
f"message_count={len(session.messages)}"
)
session = await upsert_chat_session(session)
assert session, "Session not found"
# Generate title for new sessions on first user message (non-blocking)
# Check: is_user_message, no title yet, and this is the first user message
if is_user_message and message and not session.title:
user_messages = [m for m in session.messages if m.role == "user"]
if len(user_messages) == 1:
# First user message - generate title in background
import asyncio
# Capture only the values we need (not the session object) to avoid
# stale data issues when the main flow modifies the session
captured_session_id = session_id
captured_message = message
async def _update_title():
try:
title = await _generate_session_title(captured_message)
if title:
# Use dedicated title update function that doesn't
# touch messages, avoiding race conditions
await update_session_title(captured_session_id, title)
logger.info(
f"Generated title for session {captured_session_id}: {title}"
)
except Exception as e:
logger.warning(f"Failed to update session title: {e}")
# Fire and forget - don't block the chat response
asyncio.create_task(_update_title())
# Build system prompt with business understanding
system_prompt, langfuse_prompt = await _build_system_prompt(user_id)
# Build input messages including system prompt for complete Langfuse logging
trace_input_messages = [{"role": "system", "content": system_prompt}] + [
m.model_dump() for m in session.messages
]
# Create Langfuse trace for this LLM call (each call gets its own trace, grouped by session_id)
# Using v3 SDK: start_observation creates a root span, update_trace sets trace-level attributes
try:
langfuse = _get_langfuse_client()
env = _get_environment()
trace = langfuse.start_observation(
name="chat_completion",
input={"messages": trace_input_messages},
metadata={
"environment": env,
"model": config.model,
"message_count": len(session.messages),
"prompt_name": langfuse_prompt.name if langfuse_prompt else None,
"prompt_version": langfuse_prompt.version if langfuse_prompt else None,
},
)
# Set trace-level attributes (session_id, user_id, tags)
trace.update_trace(
session_id=session_id,
user_id=user_id,
tags=[env, "copilot"],
)
except Exception as e:
logger.warning(f"Failed to create Langfuse trace: {e}")
# Initialize variables that will be used in finally block (must be defined before try)
assistant_response = ChatMessage(
role="assistant",
content="",
)
accumulated_tool_calls: list[dict[str, Any]] = []
# Wrap main logic in try/finally to ensure Langfuse observations are always ended
try:
has_yielded_end = False
has_yielded_error = False
has_done_tool_call = False
has_received_text = False
text_streaming_ended = False
tool_response_messages: list[ChatMessage] = []
should_retry = False
# Generate unique IDs for AI SDK protocol
import uuid as uuid_module
message_id = str(uuid_module.uuid4())
text_block_id = str(uuid_module.uuid4())
# Yield message start
yield StreamStart(messageId=message_id)
# Create Langfuse generation for each LLM call, linked to the prompt
# Using v3 SDK: start_observation with as_type="generation"
generation = (
trace.start_observation(
as_type="generation",
name="llm_call",
model=config.model,
input={"messages": trace_input_messages},
prompt=langfuse_prompt,
)
if trace
else None
)
try:
async for chunk in _stream_chat_chunks(
session=session,
tools=tools,
system_prompt=system_prompt,
text_block_id=text_block_id,
):
if isinstance(chunk, StreamTextStart):
# Emit text-start before first text delta
if not has_received_text:
yield chunk
elif isinstance(chunk, StreamTextDelta):
delta = chunk.delta or ""
assert assistant_response.content is not None
assistant_response.content += delta
has_received_text = True
yield chunk
elif isinstance(chunk, StreamTextEnd):
# Emit text-end after text completes
if has_received_text and not text_streaming_ended:
text_streaming_ended = True
yield chunk
elif isinstance(chunk, StreamToolInputStart):
# Emit text-end before first tool call, but only if we've received text
if has_received_text and not text_streaming_ended:
yield StreamTextEnd(id=text_block_id)
text_streaming_ended = True
yield chunk
elif isinstance(chunk, StreamToolInputAvailable):
# Accumulate tool calls in OpenAI format
accumulated_tool_calls.append(
{
"id": chunk.toolCallId,
"type": "function",
"function": {
"name": chunk.toolName,
"arguments": orjson.dumps(chunk.input).decode("utf-8"),
},
}
)
elif isinstance(chunk, StreamToolOutputAvailable):
result_content = (
chunk.output
if isinstance(chunk.output, str)
else orjson.dumps(chunk.output).decode("utf-8")
)
tool_response_messages.append(
ChatMessage(
role="tool",
content=result_content,
tool_call_id=chunk.toolCallId,
)
)
has_done_tool_call = True
# Track if any tool execution failed
if not chunk.success:
logger.warning(
f"Tool {chunk.toolName} (ID: {chunk.toolCallId}) execution failed"
)
yield chunk
elif isinstance(chunk, StreamFinish):
if not has_done_tool_call:
# Emit text-end before finish if we received text but haven't closed it
if has_received_text and not text_streaming_ended:
yield StreamTextEnd(id=text_block_id)
text_streaming_ended = True
has_yielded_end = True
yield chunk
elif isinstance(chunk, StreamError):
has_yielded_error = True
elif isinstance(chunk, StreamUsage):
session.usage.append(
Usage(
prompt_tokens=chunk.promptTokens,
completion_tokens=chunk.completionTokens,
total_tokens=chunk.totalTokens,
)
)
else:
logger.error(f"Unknown chunk type: {type(chunk)}", exc_info=True)
except Exception as e:
logger.error(f"Error during stream: {e!s}", exc_info=True)
# Check if this is a retryable error (JSON parsing, incomplete tool calls, etc.)
is_retryable = isinstance(e, (orjson.JSONDecodeError, KeyError, TypeError))
if is_retryable and retry_count < config.max_retries:
logger.info(
f"Retryable error encountered. Attempt {retry_count + 1}/{config.max_retries}"
)
should_retry = True
else:
# Non-retryable error or max retries exceeded
# Save any partial progress before reporting error
messages_to_save: list[ChatMessage] = []
# Add assistant message if it has content or tool calls
if accumulated_tool_calls:
assistant_response.tool_calls = accumulated_tool_calls
if assistant_response.content or assistant_response.tool_calls:
messages_to_save.append(assistant_response)
# Add tool response messages after assistant message
messages_to_save.extend(tool_response_messages)
session.messages.extend(messages_to_save)
await upsert_chat_session(session)
if not has_yielded_error:
error_message = str(e)
if not is_retryable:
error_message = f"Non-retryable error: {error_message}"
elif retry_count >= config.max_retries:
error_message = f"Max retries ({config.max_retries}) exceeded: {error_message}"
error_response = StreamError(errorText=error_message)
yield error_response
if not has_yielded_end:
yield StreamFinish()
return
# Handle retry outside of exception handler to avoid nesting
if should_retry and retry_count < config.max_retries:
logger.info(
f"Retrying stream_chat_completion for session {session_id}, attempt {retry_count + 1}"
)
async for chunk in stream_chat_completion(
session_id=session.session_id,
user_id=user_id,
retry_count=retry_count + 1,
session=session,
context=context,
):
yield chunk
return # Exit after retry to avoid double-saving in finally block
# Normal completion path - save session and handle tool call continuation
logger.info(
f"Normal completion path: session={session.session_id}, "
f"current message_count={len(session.messages)}"
)
# Build the messages list in the correct order
messages_to_save: list[ChatMessage] = []
# Add assistant message with tool_calls if any
if accumulated_tool_calls:
assistant_response.tool_calls = accumulated_tool_calls
logger.info(
f"Added {len(accumulated_tool_calls)} tool calls to assistant message"
)
if assistant_response.content or assistant_response.tool_calls:
messages_to_save.append(assistant_response)
logger.info(
f"Saving assistant message with content_len={len(assistant_response.content or '')}, tool_calls={len(assistant_response.tool_calls or [])}"
)
# Add tool response messages after assistant message
messages_to_save.extend(tool_response_messages)
logger.info(
f"Saving {len(tool_response_messages)} tool response messages, "
f"total_to_save={len(messages_to_save)}"
)
session.messages.extend(messages_to_save)
logger.info(
f"Extended session messages, new message_count={len(session.messages)}"
)
await upsert_chat_session(session)
# If we did a tool call, stream the chat completion again to get the next response
if has_done_tool_call:
logger.info(
"Tool call executed, streaming chat completion again to get assistant response"
)
async for chunk in stream_chat_completion(
session_id=session.session_id,
user_id=user_id,
session=session, # Pass session object to avoid Redis refetch
context=context,
):
yield chunk
finally:
# Always end Langfuse observations to prevent resource leaks
# Guard against None and catch errors to avoid masking original exceptions
if generation is not None:
try:
latest_usage = session.usage[-1] if session.usage else None
generation.update(
model=config.model,
output={
"content": assistant_response.content,
"tool_calls": accumulated_tool_calls or None,
},
usage_details=(
{
"input": latest_usage.prompt_tokens,
"output": latest_usage.completion_tokens,
"total": latest_usage.total_tokens,
}
if latest_usage
else None
),
)
generation.end()
except Exception as e:
logger.warning(f"Failed to end Langfuse generation: {e}")
if trace is not None:
try:
if accumulated_tool_calls:
trace.update_trace(output={"tool_calls": accumulated_tool_calls})
else:
trace.update_trace(output={"response": assistant_response.content})
trace.end()
except Exception as e:
logger.warning(f"Failed to end Langfuse trace: {e}")
# Retry configuration for OpenAI API calls
MAX_RETRIES = 3
BASE_DELAY_SECONDS = 1.0
MAX_DELAY_SECONDS = 30.0
def _is_retryable_error(error: Exception) -> bool:
"""Determine if an error is retryable."""
if isinstance(error, RateLimitError):
return True
if isinstance(error, APIConnectionError):
return True
if isinstance(error, APIStatusError):
# APIStatusError has a response with status_code
# Retry on 5xx status codes (server errors)
if error.response.status_code >= 500:
return True
if isinstance(error, APIError):
# Retry on overloaded errors or 500 errors (may not have status code)
error_message = str(error).lower()
if "overloaded" in error_message or "internal server error" in error_message:
return True
return False
async def _stream_chat_chunks(
session: ChatSession,
tools: list[ChatCompletionToolParam],
system_prompt: str | None = None,
text_block_id: str | None = None,
) -> AsyncGenerator[StreamBaseResponse, None]:
"""
Pure streaming function for OpenAI chat completions with tool calling.
This function is database-agnostic and focuses only on streaming logic.
Implements exponential backoff retry for transient API errors.
Args:
session: Chat session with conversation history
tools: Available tools for the model
system_prompt: System prompt to prepend to messages
Yields:
SSE formatted JSON response objects
"""
model = config.model
logger.info("Starting pure chat stream")
# Build messages with system prompt prepended
messages = session.to_openai_messages()
if system_prompt:
from openai.types.chat import ChatCompletionSystemMessageParam
system_message = ChatCompletionSystemMessageParam(
role="system",
content=system_prompt,
)
messages = [system_message] + messages
# Loop to handle tool calls and continue conversation
while True:
retry_count = 0
last_error: Exception | None = None
while retry_count <= MAX_RETRIES:
try:
logger.info(
f"Creating OpenAI chat completion stream..."
f"{f' (retry {retry_count}/{MAX_RETRIES})' if retry_count > 0 else ''}"
)
# Create the stream with proper types
stream = await client.chat.completions.create(
model=model,
messages=messages,
tools=tools,
tool_choice="auto",
stream=True,
stream_options={"include_usage": True},
)
# Variables to accumulate tool calls
tool_calls: list[dict[str, Any]] = []
active_tool_call_idx: int | None = None
finish_reason: str | None = None
# Track which tool call indices have had their start event emitted
emitted_start_for_idx: set[int] = set()
# Track if we've started the text block
text_started = False
# Process the stream
chunk: ChatCompletionChunk
async for chunk in stream:
if chunk.usage:
yield StreamUsage(
promptTokens=chunk.usage.prompt_tokens,
completionTokens=chunk.usage.completion_tokens,
totalTokens=chunk.usage.total_tokens,
)
if chunk.choices:
choice = chunk.choices[0]
delta = choice.delta
# Capture finish reason
if choice.finish_reason:
finish_reason = choice.finish_reason
logger.info(f"Finish reason: {finish_reason}")
# Handle content streaming
if delta.content:
# Emit text-start on first text content
if not text_started and text_block_id:
yield StreamTextStart(id=text_block_id)
text_started = True
# Stream the text delta
text_response = StreamTextDelta(
id=text_block_id or "",
delta=delta.content,
)
yield text_response
# Handle tool calls
if delta.tool_calls:
for tc_chunk in delta.tool_calls:
idx = tc_chunk.index
# Update active tool call index if needed
if (
active_tool_call_idx is None
or active_tool_call_idx != idx
):
active_tool_call_idx = idx
# Ensure we have a tool call object at this index
while len(tool_calls) <= idx:
tool_calls.append(
{
"id": "",
"type": "function",
"function": {
"name": "",
"arguments": "",
},
},
)
# Accumulate the tool call data
if tc_chunk.id:
tool_calls[idx]["id"] = tc_chunk.id
if tc_chunk.function:
if tc_chunk.function.name:
tool_calls[idx]["function"][
"name"
] = tc_chunk.function.name
if tc_chunk.function.arguments:
tool_calls[idx]["function"][
"arguments"
] += tc_chunk.function.arguments
# Emit StreamToolInputStart only after we have the tool call ID
if (
idx not in emitted_start_for_idx
and tool_calls[idx]["id"]
and tool_calls[idx]["function"]["name"]
):
yield StreamToolInputStart(
toolCallId=tool_calls[idx]["id"],
toolName=tool_calls[idx]["function"]["name"],
)
emitted_start_for_idx.add(idx)
logger.info(f"Stream complete. Finish reason: {finish_reason}")
# Yield all accumulated tool calls after the stream is complete
# This ensures all tool call arguments have been fully received
for idx, tool_call in enumerate(tool_calls):
try:
async for tc in _yield_tool_call(tool_calls, idx, session):
yield tc
except (orjson.JSONDecodeError, KeyError, TypeError) as e:
logger.error(
f"Failed to parse tool call {idx}: {e}",
exc_info=True,
extra={"tool_call": tool_call},
)
yield StreamError(
errorText=f"Invalid tool call arguments for tool {tool_call.get('function', {}).get('name', 'unknown')}: {e}",
)
# Re-raise to trigger retry logic in the parent function
raise
yield StreamFinish()
return
except Exception as e:
last_error = e
if _is_retryable_error(e) and retry_count < MAX_RETRIES:
retry_count += 1
# Calculate delay with exponential backoff
delay = min(
BASE_DELAY_SECONDS * (2 ** (retry_count - 1)),
MAX_DELAY_SECONDS,
)
logger.warning(
f"Retryable error in stream: {e!s}. "
f"Retrying in {delay:.1f}s (attempt {retry_count}/{MAX_RETRIES})"
)
await asyncio.sleep(delay)
continue # Retry the stream
else:
# Non-retryable error or max retries exceeded
logger.error(
f"Error in stream (not retrying): {e!s}",
exc_info=True,
)
error_response = StreamError(errorText=str(e))
yield error_response
yield StreamFinish()
return
# If we exit the retry loop without returning, it means we exhausted retries
if last_error:
logger.error(
f"Max retries ({MAX_RETRIES}) exceeded. Last error: {last_error!s}",
exc_info=True,
)
yield StreamError(errorText=f"Max retries exceeded: {last_error!s}")
yield StreamFinish()
return
async def _yield_tool_call(
tool_calls: list[dict[str, Any]],
yield_idx: int,
session: ChatSession,
) -> AsyncGenerator[StreamBaseResponse, None]:
"""
Yield a tool call and its execution result.
Raises:
orjson.JSONDecodeError: If tool call arguments cannot be parsed as JSON
KeyError: If expected tool call fields are missing
TypeError: If tool call structure is invalid
"""
tool_name = tool_calls[yield_idx]["function"]["name"]
tool_call_id = tool_calls[yield_idx]["id"]
logger.info(f"Yielding tool call: {tool_calls[yield_idx]}")
# Parse tool call arguments - handle empty arguments gracefully
raw_arguments = tool_calls[yield_idx]["function"]["arguments"]
if raw_arguments:
arguments = orjson.loads(raw_arguments)
else:
arguments = {}
yield StreamToolInputAvailable(
toolCallId=tool_call_id,
toolName=tool_name,
input=arguments,
)
tool_execution_response: StreamToolOutputAvailable = await execute_tool(
tool_name=tool_name,
parameters=arguments,
tool_call_id=tool_call_id,
user_id=session.user_id,
session=session,
)
logger.info(f"Yielding Tool execution response: {tool_execution_response}")
yield tool_execution_response

View File

@@ -1,59 +0,0 @@
from typing import TYPE_CHECKING, Any
from openai.types.chat import ChatCompletionToolParam
from backend.api.features.chat.model import ChatSession
from .add_understanding import AddUnderstandingTool
from .agent_output import AgentOutputTool
from .base import BaseTool
from .create_agent import CreateAgentTool
from .edit_agent import EditAgentTool
from .find_agent import FindAgentTool
from .find_block import FindBlockTool
from .find_library_agent import FindLibraryAgentTool
from .get_doc_page import GetDocPageTool
from .run_agent import RunAgentTool
from .run_block import RunBlockTool
from .search_docs import SearchDocsTool
if TYPE_CHECKING:
from backend.api.features.chat.response_model import StreamToolOutputAvailable
# Single source of truth for all tools
TOOL_REGISTRY: dict[str, BaseTool] = {
"add_understanding": AddUnderstandingTool(),
"create_agent": CreateAgentTool(),
"edit_agent": EditAgentTool(),
"find_agent": FindAgentTool(),
"find_block": FindBlockTool(),
"find_library_agent": FindLibraryAgentTool(),
"run_agent": RunAgentTool(),
"run_block": RunBlockTool(),
"agent_output": AgentOutputTool(),
"search_docs": SearchDocsTool(),
"get_doc_page": GetDocPageTool(),
}
# Export individual tool instances for backwards compatibility
find_agent_tool = TOOL_REGISTRY["find_agent"]
run_agent_tool = TOOL_REGISTRY["run_agent"]
# Generated from registry for OpenAI API
tools: list[ChatCompletionToolParam] = [
tool.as_openai_tool() for tool in TOOL_REGISTRY.values()
]
async def execute_tool(
tool_name: str,
parameters: dict[str, Any],
user_id: str | None,
session: ChatSession,
tool_call_id: str,
) -> "StreamToolOutputAvailable":
"""Execute a tool by name."""
tool = TOOL_REGISTRY.get(tool_name)
if not tool:
raise ValueError(f"Tool {tool_name} not found")
return await tool.execute(user_id, session, tool_call_id, **parameters)

View File

@@ -1,119 +0,0 @@
"""Tool for capturing user business understanding incrementally."""
import logging
from typing import Any
from backend.api.features.chat.model import ChatSession
from backend.data.understanding import (
BusinessUnderstandingInput,
upsert_business_understanding,
)
from .base import BaseTool
from .models import ErrorResponse, ToolResponseBase, UnderstandingUpdatedResponse
logger = logging.getLogger(__name__)
class AddUnderstandingTool(BaseTool):
"""Tool for capturing user's business understanding incrementally."""
@property
def name(self) -> str:
return "add_understanding"
@property
def description(self) -> str:
return """Capture and store information about the user's business context,
workflows, pain points, and automation goals. Call this tool whenever the user
shares information about their business. Each call incrementally adds to the
existing understanding - you don't need to provide all fields at once.
Use this to build a comprehensive profile that helps recommend better agents
and automations for the user's specific needs."""
@property
def parameters(self) -> dict[str, Any]:
# Auto-generate from Pydantic model schema
schema = BusinessUnderstandingInput.model_json_schema()
properties = {}
for field_name, field_schema in schema.get("properties", {}).items():
prop: dict[str, Any] = {"description": field_schema.get("description", "")}
# Handle anyOf for Optional types
if "anyOf" in field_schema:
for option in field_schema["anyOf"]:
if option.get("type") != "null":
prop["type"] = option.get("type", "string")
if "items" in option:
prop["items"] = option["items"]
break
else:
prop["type"] = field_schema.get("type", "string")
if "items" in field_schema:
prop["items"] = field_schema["items"]
properties[field_name] = prop
return {"type": "object", "properties": properties, "required": []}
@property
def requires_auth(self) -> bool:
"""Requires authentication to store user-specific data."""
return True
async def _execute(
self,
user_id: str | None,
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
"""
Capture and store business understanding incrementally.
Each call merges new data with existing understanding:
- String fields are overwritten if provided
- List fields are appended (with deduplication)
"""
session_id = session.session_id
if not user_id:
return ErrorResponse(
message="Authentication required to save business understanding.",
session_id=session_id,
)
# Check if any data was provided
if not any(v is not None for v in kwargs.values()):
return ErrorResponse(
message="Please provide at least one field to update.",
session_id=session_id,
)
# Build input model from kwargs (only include fields defined in the model)
valid_fields = set(BusinessUnderstandingInput.model_fields.keys())
input_data = BusinessUnderstandingInput(
**{k: v for k, v in kwargs.items() if k in valid_fields}
)
# Track which fields were updated
updated_fields = [
k for k, v in kwargs.items() if k in valid_fields and v is not None
]
# Upsert with merge
understanding = await upsert_business_understanding(user_id, input_data)
# Build current understanding summary (filter out empty values)
current_understanding = {
k: v
for k, v in understanding.model_dump(
exclude={"id", "user_id", "created_at", "updated_at"}
).items()
if v is not None and v != [] and v != ""
}
return UnderstandingUpdatedResponse(
message=f"Updated understanding with: {', '.join(updated_fields)}. "
"I now have a better picture of your business context.",
session_id=session_id,
updated_fields=updated_fields,
current_understanding=current_understanding,
)

View File

@@ -1,29 +0,0 @@
"""Agent generator package - Creates agents from natural language."""
from .core import (
apply_agent_patch,
decompose_goal,
generate_agent,
generate_agent_patch,
get_agent_as_json,
save_agent_to_library,
)
from .fixer import apply_all_fixes
from .utils import get_blocks_info
from .validator import validate_agent
__all__ = [
# Core functions
"decompose_goal",
"generate_agent",
"generate_agent_patch",
"apply_agent_patch",
"save_agent_to_library",
"get_agent_as_json",
# Fixer
"apply_all_fixes",
# Validator
"validate_agent",
# Utils
"get_blocks_info",
]

View File

@@ -1,25 +0,0 @@
"""OpenRouter client configuration for agent generation."""
import os
from openai import AsyncOpenAI
# Configuration - use OPEN_ROUTER_API_KEY for consistency with chat/config.py
OPENROUTER_API_KEY = os.getenv("OPEN_ROUTER_API_KEY")
AGENT_GENERATOR_MODEL = os.getenv("AGENT_GENERATOR_MODEL", "anthropic/claude-opus-4.5")
# OpenRouter client (OpenAI-compatible API)
_client: AsyncOpenAI | None = None
def get_client() -> AsyncOpenAI:
"""Get or create the OpenRouter client."""
global _client
if _client is None:
if not OPENROUTER_API_KEY:
raise ValueError("OPENROUTER_API_KEY environment variable is required")
_client = AsyncOpenAI(
base_url="https://openrouter.ai/api/v1",
api_key=OPENROUTER_API_KEY,
)
return _client

View File

@@ -1,390 +0,0 @@
"""Core agent generation functions."""
import copy
import json
import logging
import uuid
from typing import Any
from backend.api.features.library import db as library_db
from backend.data.graph import Graph, Link, Node, create_graph
from .client import AGENT_GENERATOR_MODEL, get_client
from .prompts import DECOMPOSITION_PROMPT, GENERATION_PROMPT, PATCH_PROMPT
from .utils import get_block_summaries, parse_json_from_llm
logger = logging.getLogger(__name__)
async def decompose_goal(description: str, context: str = "") -> dict[str, Any] | None:
"""Break down a goal into steps or return clarifying questions.
Args:
description: Natural language goal description
context: Additional context (e.g., answers to previous questions)
Returns:
Dict with either:
- {"type": "clarifying_questions", "questions": [...]}
- {"type": "instructions", "steps": [...]}
Or None on error
"""
client = get_client()
prompt = DECOMPOSITION_PROMPT.format(block_summaries=get_block_summaries())
full_description = description
if context:
full_description = f"{description}\n\nAdditional context:\n{context}"
try:
response = await client.chat.completions.create(
model=AGENT_GENERATOR_MODEL,
messages=[
{"role": "system", "content": prompt},
{"role": "user", "content": full_description},
],
temperature=0,
)
content = response.choices[0].message.content
if content is None:
logger.error("LLM returned empty content for decomposition")
return None
result = parse_json_from_llm(content)
if result is None:
logger.error(f"Failed to parse decomposition response: {content[:200]}")
return None
return result
except Exception as e:
logger.error(f"Error decomposing goal: {e}")
return None
async def generate_agent(instructions: dict[str, Any]) -> dict[str, Any] | None:
"""Generate agent JSON from instructions.
Args:
instructions: Structured instructions from decompose_goal
Returns:
Agent JSON dict or None on error
"""
client = get_client()
prompt = GENERATION_PROMPT.format(block_summaries=get_block_summaries())
try:
response = await client.chat.completions.create(
model=AGENT_GENERATOR_MODEL,
messages=[
{"role": "system", "content": prompt},
{"role": "user", "content": json.dumps(instructions, indent=2)},
],
temperature=0,
)
content = response.choices[0].message.content
if content is None:
logger.error("LLM returned empty content for agent generation")
return None
result = parse_json_from_llm(content)
if result is None:
logger.error(f"Failed to parse agent JSON: {content[:200]}")
return None
# Ensure required fields
if "id" not in result:
result["id"] = str(uuid.uuid4())
if "version" not in result:
result["version"] = 1
if "is_active" not in result:
result["is_active"] = True
return result
except Exception as e:
logger.error(f"Error generating agent: {e}")
return None
def json_to_graph(agent_json: dict[str, Any]) -> Graph:
"""Convert agent JSON dict to Graph model.
Args:
agent_json: Agent JSON with nodes and links
Returns:
Graph ready for saving
"""
nodes = []
for n in agent_json.get("nodes", []):
node = Node(
id=n.get("id", str(uuid.uuid4())),
block_id=n["block_id"],
input_default=n.get("input_default", {}),
metadata=n.get("metadata", {}),
)
nodes.append(node)
links = []
for link_data in agent_json.get("links", []):
link = Link(
id=link_data.get("id", str(uuid.uuid4())),
source_id=link_data["source_id"],
sink_id=link_data["sink_id"],
source_name=link_data["source_name"],
sink_name=link_data["sink_name"],
is_static=link_data.get("is_static", False),
)
links.append(link)
return Graph(
id=agent_json.get("id", str(uuid.uuid4())),
version=agent_json.get("version", 1),
is_active=agent_json.get("is_active", True),
name=agent_json.get("name", "Generated Agent"),
description=agent_json.get("description", ""),
nodes=nodes,
links=links,
)
def _reassign_node_ids(graph: Graph) -> None:
"""Reassign all node and link IDs to new UUIDs.
This is needed when creating a new version to avoid unique constraint violations.
"""
# Create mapping from old node IDs to new UUIDs
id_map = {node.id: str(uuid.uuid4()) for node in graph.nodes}
# Reassign node IDs
for node in graph.nodes:
node.id = id_map[node.id]
# Update link references to use new node IDs
for link in graph.links:
link.id = str(uuid.uuid4()) # Also give links new IDs
if link.source_id in id_map:
link.source_id = id_map[link.source_id]
if link.sink_id in id_map:
link.sink_id = id_map[link.sink_id]
async def save_agent_to_library(
agent_json: dict[str, Any], user_id: str, is_update: bool = False
) -> tuple[Graph, Any]:
"""Save agent to database and user's library.
Args:
agent_json: Agent JSON dict
user_id: User ID
is_update: Whether this is an update to an existing agent
Returns:
Tuple of (created Graph, LibraryAgent)
"""
from backend.data.graph import get_graph_all_versions
graph = json_to_graph(agent_json)
if is_update:
# For updates, keep the same graph ID but increment version
# and reassign node/link IDs to avoid conflicts
if graph.id:
existing_versions = await get_graph_all_versions(graph.id, user_id)
if existing_versions:
latest_version = max(v.version for v in existing_versions)
graph.version = latest_version + 1
# Reassign node IDs (but keep graph ID the same)
_reassign_node_ids(graph)
logger.info(f"Updating agent {graph.id} to version {graph.version}")
else:
# For new agents, always generate a fresh UUID to avoid collisions
graph.id = str(uuid.uuid4())
graph.version = 1
# Reassign all node IDs as well
_reassign_node_ids(graph)
logger.info(f"Creating new agent with ID {graph.id}")
# Save to database
created_graph = await create_graph(graph, user_id)
# Add to user's library (or update existing library agent)
library_agents = await library_db.create_library_agent(
graph=created_graph,
user_id=user_id,
create_library_agents_for_sub_graphs=False,
)
return created_graph, library_agents[0]
async def get_agent_as_json(
graph_id: str, user_id: str | None
) -> dict[str, Any] | None:
"""Fetch an agent and convert to JSON format for editing.
Args:
graph_id: Graph ID or library agent ID
user_id: User ID
Returns:
Agent as JSON dict or None if not found
"""
from backend.data.graph import get_graph
# Try to get the graph (version=None gets the active version)
graph = await get_graph(graph_id, version=None, user_id=user_id)
if not graph:
return None
# Convert to JSON format
nodes = []
for node in graph.nodes:
nodes.append(
{
"id": node.id,
"block_id": node.block_id,
"input_default": node.input_default,
"metadata": node.metadata,
}
)
links = []
for node in graph.nodes:
for link in node.output_links:
links.append(
{
"id": link.id,
"source_id": link.source_id,
"sink_id": link.sink_id,
"source_name": link.source_name,
"sink_name": link.sink_name,
"is_static": link.is_static,
}
)
return {
"id": graph.id,
"name": graph.name,
"description": graph.description,
"version": graph.version,
"is_active": graph.is_active,
"nodes": nodes,
"links": links,
}
async def generate_agent_patch(
update_request: str, current_agent: dict[str, Any]
) -> dict[str, Any] | None:
"""Generate a patch to update an existing agent.
Args:
update_request: Natural language description of changes
current_agent: Current agent JSON
Returns:
Patch dict or clarifying questions, or None on error
"""
client = get_client()
prompt = PATCH_PROMPT.format(
current_agent=json.dumps(current_agent, indent=2),
block_summaries=get_block_summaries(),
)
try:
response = await client.chat.completions.create(
model=AGENT_GENERATOR_MODEL,
messages=[
{"role": "system", "content": prompt},
{"role": "user", "content": update_request},
],
temperature=0,
)
content = response.choices[0].message.content
if content is None:
logger.error("LLM returned empty content for patch generation")
return None
return parse_json_from_llm(content)
except Exception as e:
logger.error(f"Error generating patch: {e}")
return None
def apply_agent_patch(
current_agent: dict[str, Any], patch: dict[str, Any]
) -> dict[str, Any]:
"""Apply a patch to an existing agent.
Args:
current_agent: Current agent JSON
patch: Patch dict with operations
Returns:
Updated agent JSON
"""
agent = copy.deepcopy(current_agent)
patches = patch.get("patches", [])
for p in patches:
patch_type = p.get("type")
if patch_type == "modify":
node_id = p.get("node_id")
changes = p.get("changes", {})
for node in agent.get("nodes", []):
if node["id"] == node_id:
_deep_update(node, changes)
logger.debug(f"Modified node {node_id}")
break
elif patch_type == "add":
new_nodes = p.get("new_nodes", [])
new_links = p.get("new_links", [])
agent["nodes"] = agent.get("nodes", []) + new_nodes
agent["links"] = agent.get("links", []) + new_links
logger.debug(f"Added {len(new_nodes)} nodes, {len(new_links)} links")
elif patch_type == "remove":
node_ids_to_remove = set(p.get("node_ids", []))
link_ids_to_remove = set(p.get("link_ids", []))
# Remove nodes
agent["nodes"] = [
n for n in agent.get("nodes", []) if n["id"] not in node_ids_to_remove
]
# Remove links (both explicit and those referencing removed nodes)
agent["links"] = [
link
for link in agent.get("links", [])
if link["id"] not in link_ids_to_remove
and link["source_id"] not in node_ids_to_remove
and link["sink_id"] not in node_ids_to_remove
]
logger.debug(
f"Removed {len(node_ids_to_remove)} nodes, {len(link_ids_to_remove)} links"
)
return agent
def _deep_update(target: dict, source: dict) -> None:
"""Recursively update a dict with another dict."""
for key, value in source.items():
if key in target and isinstance(target[key], dict) and isinstance(value, dict):
_deep_update(target[key], value)
else:
target[key] = value

View File

@@ -1,606 +0,0 @@
"""Agent fixer - Fixes common LLM generation errors."""
import logging
import re
import uuid
from typing import Any
from .utils import (
ADDTODICTIONARY_BLOCK_ID,
ADDTOLIST_BLOCK_ID,
CODE_EXECUTION_BLOCK_ID,
CONDITION_BLOCK_ID,
CREATEDICT_BLOCK_ID,
CREATELIST_BLOCK_ID,
DATA_SAMPLING_BLOCK_ID,
DOUBLE_CURLY_BRACES_BLOCK_IDS,
GET_CURRENT_DATE_BLOCK_ID,
STORE_VALUE_BLOCK_ID,
UNIVERSAL_TYPE_CONVERTER_BLOCK_ID,
get_blocks_info,
is_valid_uuid,
)
logger = logging.getLogger(__name__)
def fix_agent_ids(agent: dict[str, Any]) -> dict[str, Any]:
"""Fix invalid UUIDs in agent and link IDs."""
# Fix agent ID
if not is_valid_uuid(agent.get("id", "")):
agent["id"] = str(uuid.uuid4())
logger.debug(f"Fixed agent ID: {agent['id']}")
# Fix node IDs
id_mapping = {} # Old ID -> New ID
for node in agent.get("nodes", []):
if not is_valid_uuid(node.get("id", "")):
old_id = node.get("id", "")
new_id = str(uuid.uuid4())
id_mapping[old_id] = new_id
node["id"] = new_id
logger.debug(f"Fixed node ID: {old_id} -> {new_id}")
# Fix link IDs and update references
for link in agent.get("links", []):
if not is_valid_uuid(link.get("id", "")):
link["id"] = str(uuid.uuid4())
logger.debug(f"Fixed link ID: {link['id']}")
# Update source/sink IDs if they were remapped
if link.get("source_id") in id_mapping:
link["source_id"] = id_mapping[link["source_id"]]
if link.get("sink_id") in id_mapping:
link["sink_id"] = id_mapping[link["sink_id"]]
return agent
def fix_double_curly_braces(agent: dict[str, Any]) -> dict[str, Any]:
"""Fix single curly braces to double in template blocks."""
for node in agent.get("nodes", []):
if node.get("block_id") not in DOUBLE_CURLY_BRACES_BLOCK_IDS:
continue
input_data = node.get("input_default", {})
for key in ("prompt", "format"):
if key in input_data and isinstance(input_data[key], str):
original = input_data[key]
# Fix simple variable references: {var} -> {{var}}
fixed = re.sub(
r"(?<!\{)\{([a-zA-Z_][a-zA-Z0-9_]*)\}(?!\})",
r"{{\1}}",
original,
)
if fixed != original:
input_data[key] = fixed
logger.debug(f"Fixed curly braces in {key}")
return agent
def fix_storevalue_before_condition(agent: dict[str, Any]) -> dict[str, Any]:
"""Add StoreValueBlock before ConditionBlock if needed for value2."""
nodes = agent.get("nodes", [])
links = agent.get("links", [])
# Find all ConditionBlock nodes
condition_node_ids = {
node["id"] for node in nodes if node.get("block_id") == CONDITION_BLOCK_ID
}
if not condition_node_ids:
return agent
new_nodes = []
new_links = []
processed_conditions = set()
for link in links:
sink_id = link.get("sink_id")
sink_name = link.get("sink_name")
# Check if this link goes to a ConditionBlock's value2
if sink_id in condition_node_ids and sink_name == "value2":
source_node = next(
(n for n in nodes if n["id"] == link.get("source_id")), None
)
# Skip if source is already a StoreValueBlock
if source_node and source_node.get("block_id") == STORE_VALUE_BLOCK_ID:
continue
# Skip if we already processed this condition
if sink_id in processed_conditions:
continue
processed_conditions.add(sink_id)
# Create StoreValueBlock
store_node_id = str(uuid.uuid4())
store_node = {
"id": store_node_id,
"block_id": STORE_VALUE_BLOCK_ID,
"input_default": {"data": None},
"metadata": {"position": {"x": 0, "y": -100}},
}
new_nodes.append(store_node)
# Create link: original source -> StoreValueBlock
new_links.append(
{
"id": str(uuid.uuid4()),
"source_id": link["source_id"],
"source_name": link["source_name"],
"sink_id": store_node_id,
"sink_name": "input",
"is_static": False,
}
)
# Update original link: StoreValueBlock -> ConditionBlock
link["source_id"] = store_node_id
link["source_name"] = "output"
logger.debug(f"Added StoreValueBlock before ConditionBlock {sink_id}")
if new_nodes:
agent["nodes"] = nodes + new_nodes
return agent
def fix_addtolist_blocks(agent: dict[str, Any]) -> dict[str, Any]:
"""Fix AddToList blocks by adding prerequisite empty AddToList block.
When an AddToList block is found:
1. Checks if there's a CreateListBlock before it
2. Removes CreateListBlock if linked directly to AddToList
3. Adds an empty AddToList block before the original
4. Ensures the original has a self-referencing link
"""
nodes = agent.get("nodes", [])
links = agent.get("links", [])
new_nodes = []
original_addtolist_ids = set()
nodes_to_remove = set()
links_to_remove = []
# First pass: identify CreateListBlock nodes to remove
for link in links:
source_node = next(
(n for n in nodes if n.get("id") == link.get("source_id")), None
)
sink_node = next((n for n in nodes if n.get("id") == link.get("sink_id")), None)
if (
source_node
and sink_node
and source_node.get("block_id") == CREATELIST_BLOCK_ID
and sink_node.get("block_id") == ADDTOLIST_BLOCK_ID
):
nodes_to_remove.add(source_node.get("id"))
links_to_remove.append(link)
logger.debug(f"Removing CreateListBlock {source_node.get('id')}")
# Second pass: process AddToList blocks
filtered_nodes = []
for node in nodes:
if node.get("id") in nodes_to_remove:
continue
if node.get("block_id") == ADDTOLIST_BLOCK_ID:
original_addtolist_ids.add(node.get("id"))
node_id = node.get("id")
pos = node.get("metadata", {}).get("position", {"x": 0, "y": 0})
# Check if already has prerequisite
has_prereq = any(
link.get("sink_id") == node_id
and link.get("sink_name") == "list"
and link.get("source_name") == "updated_list"
for link in links
)
if not has_prereq:
# Remove links to "list" input (except self-reference)
for link in links:
if (
link.get("sink_id") == node_id
and link.get("sink_name") == "list"
and link.get("source_id") != node_id
and link not in links_to_remove
):
links_to_remove.append(link)
# Create prerequisite AddToList block
prereq_id = str(uuid.uuid4())
prereq_node = {
"id": prereq_id,
"block_id": ADDTOLIST_BLOCK_ID,
"input_default": {"list": [], "entry": None, "entries": []},
"metadata": {
"position": {"x": pos.get("x", 0) - 800, "y": pos.get("y", 0)}
},
}
new_nodes.append(prereq_node)
# Link prerequisite to original
links.append(
{
"id": str(uuid.uuid4()),
"source_id": prereq_id,
"source_name": "updated_list",
"sink_id": node_id,
"sink_name": "list",
"is_static": False,
}
)
logger.debug(f"Added prerequisite AddToList block for {node_id}")
filtered_nodes.append(node)
# Remove marked links
filtered_links = [link for link in links if link not in links_to_remove]
# Add self-referencing links for original AddToList blocks
for node in filtered_nodes + new_nodes:
if (
node.get("block_id") == ADDTOLIST_BLOCK_ID
and node.get("id") in original_addtolist_ids
):
node_id = node.get("id")
has_self_ref = any(
link["source_id"] == node_id
and link["sink_id"] == node_id
and link["source_name"] == "updated_list"
and link["sink_name"] == "list"
for link in filtered_links
)
if not has_self_ref:
filtered_links.append(
{
"id": str(uuid.uuid4()),
"source_id": node_id,
"source_name": "updated_list",
"sink_id": node_id,
"sink_name": "list",
"is_static": False,
}
)
logger.debug(f"Added self-reference for AddToList {node_id}")
agent["nodes"] = filtered_nodes + new_nodes
agent["links"] = filtered_links
return agent
def fix_addtodictionary_blocks(agent: dict[str, Any]) -> dict[str, Any]:
"""Fix AddToDictionary blocks by removing empty CreateDictionary nodes."""
nodes = agent.get("nodes", [])
links = agent.get("links", [])
nodes_to_remove = set()
links_to_remove = []
for link in links:
source_node = next(
(n for n in nodes if n.get("id") == link.get("source_id")), None
)
sink_node = next((n for n in nodes if n.get("id") == link.get("sink_id")), None)
if (
source_node
and sink_node
and source_node.get("block_id") == CREATEDICT_BLOCK_ID
and sink_node.get("block_id") == ADDTODICTIONARY_BLOCK_ID
):
nodes_to_remove.add(source_node.get("id"))
links_to_remove.append(link)
logger.debug(f"Removing CreateDictionary {source_node.get('id')}")
agent["nodes"] = [n for n in nodes if n.get("id") not in nodes_to_remove]
agent["links"] = [link for link in links if link not in links_to_remove]
return agent
def fix_code_execution_output(agent: dict[str, Any]) -> dict[str, Any]:
"""Fix CodeExecutionBlock output: change 'response' to 'stdout_logs'."""
nodes = agent.get("nodes", [])
links = agent.get("links", [])
for link in links:
source_node = next(
(n for n in nodes if n.get("id") == link.get("source_id")), None
)
if (
source_node
and source_node.get("block_id") == CODE_EXECUTION_BLOCK_ID
and link.get("source_name") == "response"
):
link["source_name"] = "stdout_logs"
logger.debug("Fixed CodeExecutionBlock output: response -> stdout_logs")
return agent
def fix_data_sampling_sample_size(agent: dict[str, Any]) -> dict[str, Any]:
"""Fix DataSamplingBlock by setting sample_size to 1 as default."""
nodes = agent.get("nodes", [])
links = agent.get("links", [])
links_to_remove = []
for node in nodes:
if node.get("block_id") == DATA_SAMPLING_BLOCK_ID:
node_id = node.get("id")
input_default = node.get("input_default", {})
# Remove links to sample_size
for link in links:
if (
link.get("sink_id") == node_id
and link.get("sink_name") == "sample_size"
):
links_to_remove.append(link)
# Set default
input_default["sample_size"] = 1
node["input_default"] = input_default
logger.debug(f"Fixed DataSamplingBlock {node_id} sample_size to 1")
if links_to_remove:
agent["links"] = [link for link in links if link not in links_to_remove]
return agent
def fix_node_x_coordinates(agent: dict[str, Any]) -> dict[str, Any]:
"""Fix node x-coordinates to ensure 800+ unit spacing between linked nodes."""
nodes = agent.get("nodes", [])
links = agent.get("links", [])
node_lookup = {n.get("id"): n for n in nodes}
for link in links:
source_id = link.get("source_id")
sink_id = link.get("sink_id")
source_node = node_lookup.get(source_id)
sink_node = node_lookup.get(sink_id)
if not source_node or not sink_node:
continue
source_pos = source_node.get("metadata", {}).get("position", {})
sink_pos = sink_node.get("metadata", {}).get("position", {})
source_x = source_pos.get("x", 0)
sink_x = sink_pos.get("x", 0)
if abs(sink_x - source_x) < 800:
new_x = source_x + 800
if "metadata" not in sink_node:
sink_node["metadata"] = {}
if "position" not in sink_node["metadata"]:
sink_node["metadata"]["position"] = {}
sink_node["metadata"]["position"]["x"] = new_x
logger.debug(f"Fixed node {sink_id} x: {sink_x} -> {new_x}")
return agent
def fix_getcurrentdate_offset(agent: dict[str, Any]) -> dict[str, Any]:
"""Fix GetCurrentDateBlock offset to ensure it's positive."""
for node in agent.get("nodes", []):
if node.get("block_id") == GET_CURRENT_DATE_BLOCK_ID:
input_default = node.get("input_default", {})
if "offset" in input_default:
offset = input_default["offset"]
if isinstance(offset, (int, float)) and offset < 0:
input_default["offset"] = abs(offset)
logger.debug(f"Fixed offset: {offset} -> {abs(offset)}")
return agent
def fix_ai_model_parameter(
agent: dict[str, Any],
blocks_info: list[dict[str, Any]],
default_model: str = "gpt-4o",
) -> dict[str, Any]:
"""Add default model parameter to AI blocks if missing."""
block_map = {b.get("id"): b for b in blocks_info}
for node in agent.get("nodes", []):
block_id = node.get("block_id")
block = block_map.get(block_id)
if not block:
continue
# Check if block has AI category
categories = block.get("categories", [])
is_ai_block = any(
cat.get("category") == "AI" for cat in categories if isinstance(cat, dict)
)
if is_ai_block:
input_default = node.get("input_default", {})
if "model" not in input_default:
input_default["model"] = default_model
node["input_default"] = input_default
logger.debug(
f"Added model '{default_model}' to AI block {node.get('id')}"
)
return agent
def fix_link_static_properties(
agent: dict[str, Any], blocks_info: list[dict[str, Any]]
) -> dict[str, Any]:
"""Fix is_static property based on source block's staticOutput."""
block_map = {b.get("id"): b for b in blocks_info}
node_lookup = {n.get("id"): n for n in agent.get("nodes", [])}
for link in agent.get("links", []):
source_node = node_lookup.get(link.get("source_id"))
if not source_node:
continue
source_block = block_map.get(source_node.get("block_id"))
if not source_block:
continue
static_output = source_block.get("staticOutput", False)
if link.get("is_static") != static_output:
link["is_static"] = static_output
logger.debug(f"Fixed link {link.get('id')} is_static to {static_output}")
return agent
def fix_data_type_mismatch(
agent: dict[str, Any], blocks_info: list[dict[str, Any]]
) -> dict[str, Any]:
"""Fix data type mismatches by inserting UniversalTypeConverterBlock."""
nodes = agent.get("nodes", [])
links = agent.get("links", [])
block_map = {b.get("id"): b for b in blocks_info}
node_lookup = {n.get("id"): n for n in nodes}
def get_property_type(schema: dict, name: str) -> str | None:
if "_#_" in name:
parent, child = name.split("_#_", 1)
parent_schema = schema.get(parent, {})
if "properties" in parent_schema:
return parent_schema["properties"].get(child, {}).get("type")
return None
return schema.get(name, {}).get("type")
def are_types_compatible(src: str, sink: str) -> bool:
if {src, sink} <= {"integer", "number"}:
return True
return src == sink
type_mapping = {
"string": "string",
"text": "string",
"integer": "number",
"number": "number",
"float": "number",
"boolean": "boolean",
"bool": "boolean",
"array": "list",
"list": "list",
"object": "dictionary",
"dict": "dictionary",
"dictionary": "dictionary",
}
new_links = []
nodes_to_add = []
for link in links:
source_node = node_lookup.get(link.get("source_id"))
sink_node = node_lookup.get(link.get("sink_id"))
if not source_node or not sink_node:
new_links.append(link)
continue
source_block = block_map.get(source_node.get("block_id"))
sink_block = block_map.get(sink_node.get("block_id"))
if not source_block or not sink_block:
new_links.append(link)
continue
source_outputs = source_block.get("outputSchema", {}).get("properties", {})
sink_inputs = sink_block.get("inputSchema", {}).get("properties", {})
source_type = get_property_type(source_outputs, link.get("source_name", ""))
sink_type = get_property_type(sink_inputs, link.get("sink_name", ""))
if (
source_type
and sink_type
and not are_types_compatible(source_type, sink_type)
):
# Insert type converter
converter_id = str(uuid.uuid4())
target_type = type_mapping.get(sink_type, sink_type)
converter_node = {
"id": converter_id,
"block_id": UNIVERSAL_TYPE_CONVERTER_BLOCK_ID,
"input_default": {"type": target_type},
"metadata": {"position": {"x": 0, "y": 100}},
}
nodes_to_add.append(converter_node)
# source -> converter
new_links.append(
{
"id": str(uuid.uuid4()),
"source_id": link["source_id"],
"source_name": link["source_name"],
"sink_id": converter_id,
"sink_name": "value",
"is_static": False,
}
)
# converter -> sink
new_links.append(
{
"id": str(uuid.uuid4()),
"source_id": converter_id,
"source_name": "value",
"sink_id": link["sink_id"],
"sink_name": link["sink_name"],
"is_static": False,
}
)
logger.debug(f"Inserted type converter: {source_type} -> {target_type}")
else:
new_links.append(link)
if nodes_to_add:
agent["nodes"] = nodes + nodes_to_add
agent["links"] = new_links
return agent
def apply_all_fixes(
agent: dict[str, Any], blocks_info: list[dict[str, Any]] | None = None
) -> dict[str, Any]:
"""Apply all fixes to an agent JSON.
Args:
agent: Agent JSON dict
blocks_info: Optional list of block info dicts for advanced fixes
Returns:
Fixed agent JSON
"""
# Basic fixes (no block info needed)
agent = fix_agent_ids(agent)
agent = fix_double_curly_braces(agent)
agent = fix_storevalue_before_condition(agent)
agent = fix_addtolist_blocks(agent)
agent = fix_addtodictionary_blocks(agent)
agent = fix_code_execution_output(agent)
agent = fix_data_sampling_sample_size(agent)
agent = fix_node_x_coordinates(agent)
agent = fix_getcurrentdate_offset(agent)
# Advanced fixes (require block info)
if blocks_info is None:
blocks_info = get_blocks_info()
agent = fix_ai_model_parameter(agent, blocks_info)
agent = fix_link_static_properties(agent, blocks_info)
agent = fix_data_type_mismatch(agent, blocks_info)
return agent

View File

@@ -1,225 +0,0 @@
"""Prompt templates for agent generation."""
DECOMPOSITION_PROMPT = """
You are an expert AutoGPT Workflow Decomposer. Your task is to analyze a user's high-level goal and break it down into a clear, step-by-step plan using the available blocks.
Each step should represent a distinct, automatable action suitable for execution by an AI automation system.
---
FIRST: Analyze the user's goal and determine:
1) Design-time configuration (fixed settings that won't change per run)
2) Runtime inputs (values the agent's end-user will provide each time it runs)
For anything that can vary per run (email addresses, names, dates, search terms, etc.):
- DO NOT ask for the actual value
- Instead, define it as an Agent Input with a clear name, type, and description
Only ask clarifying questions about design-time config that affects how you build the workflow:
- Which external service to use (e.g., "Gmail vs Outlook", "Notion vs Google Docs")
- Required formats or structures (e.g., "CSV, JSON, or PDF output?")
- Business rules that must be hard-coded
IMPORTANT CLARIFICATIONS POLICY:
- Ask no more than five essential questions
- Do not ask for concrete values that can be provided at runtime as Agent Inputs
- Do not ask for API keys or credentials; the platform handles those directly
- If there is enough information to infer reasonable defaults, prefer to propose defaults
---
GUIDELINES:
1. List each step as a numbered item
2. Describe the action clearly and specify inputs/outputs
3. Ensure steps are in logical, sequential order
4. Mention block names naturally (e.g., "Use GetWeatherByLocationBlock to...")
5. Help the user reach their goal efficiently
---
RULES:
1. OUTPUT FORMAT: Only output either clarifying questions OR step-by-step instructions, not both
2. USE ONLY THE BLOCKS PROVIDED
3. ALL required_input fields must be provided
4. Data types of linked properties must match
5. Write expert-level prompts for AI-related blocks
---
CRITICAL BLOCK RESTRICTIONS:
1. AddToListBlock: Outputs updated list EVERY addition, not after all additions
2. SendEmailBlock: Draft the email for user review; set SMTP config based on email type
3. ConditionBlock: value2 is reference, value1 is contrast
4. CodeExecutionBlock: DO NOT USE - use AI blocks instead
5. ReadCsvBlock: Only use the 'rows' output, not 'row'
---
OUTPUT FORMAT:
If more information is needed:
```json
{{
"type": "clarifying_questions",
"questions": [
{{
"question": "Which email provider should be used? (Gmail, Outlook, custom SMTP)",
"keyword": "email_provider",
"example": "Gmail"
}}
]
}}
```
If ready to proceed:
```json
{{
"type": "instructions",
"steps": [
{{
"step_number": 1,
"block_name": "AgentShortTextInputBlock",
"description": "Get the URL of the content to analyze.",
"inputs": [{{"name": "name", "value": "URL"}}],
"outputs": [{{"name": "result", "description": "The URL entered by user"}}]
}}
]
}}
```
---
AVAILABLE BLOCKS:
{block_summaries}
"""
GENERATION_PROMPT = """
You are an expert AI workflow builder. Generate a valid agent JSON from the given instructions.
---
NODES:
Each node must include:
- `id`: Unique UUID v4 (e.g. `a8f5b1e2-c3d4-4e5f-8a9b-0c1d2e3f4a5b`)
- `block_id`: The block identifier (must match an Allowed Block)
- `input_default`: Dict of inputs (can be empty if no static inputs needed)
- `metadata`: Must contain:
- `position`: {{"x": number, "y": number}} - adjacent nodes should differ by 800+ in X
- `customized_name`: Clear name describing this block's purpose in the workflow
---
LINKS:
Each link connects a source node's output to a sink node's input:
- `id`: MUST be UUID v4 (NOT "link-1", "link-2", etc.)
- `source_id`: ID of the source node
- `source_name`: Output field name from the source block
- `sink_id`: ID of the sink node
- `sink_name`: Input field name on the sink block
- `is_static`: true only if source block has static_output: true
CRITICAL: All IDs must be valid UUID v4 format!
---
AGENT (GRAPH):
Wrap nodes and links in:
- `id`: UUID of the agent
- `name`: Short, generic name (avoid specific company names, URLs)
- `description`: Short, generic description
- `nodes`: List of all nodes
- `links`: List of all links
- `version`: 1
- `is_active`: true
---
TIPS:
- All required_input fields must be provided via input_default or a valid link
- Ensure consistent source_id and sink_id references
- Avoid dangling links
- Input/output pins must match block schemas
- Do not invent unknown block_ids
---
ALLOWED BLOCKS:
{block_summaries}
---
Generate the complete agent JSON. Output ONLY valid JSON, no explanation.
"""
PATCH_PROMPT = """
You are an expert at modifying AutoGPT agent workflows. Given the current agent and a modification request, generate a JSON patch to update the agent.
CURRENT AGENT:
{current_agent}
AVAILABLE BLOCKS:
{block_summaries}
---
PATCH FORMAT:
Return a JSON object with the following structure:
```json
{{
"type": "patch",
"intent": "Brief description of what the patch does",
"patches": [
{{
"type": "modify",
"node_id": "uuid-of-node-to-modify",
"changes": {{
"input_default": {{"field": "new_value"}},
"metadata": {{"customized_name": "New Name"}}
}}
}},
{{
"type": "add",
"new_nodes": [
{{
"id": "new-uuid",
"block_id": "block-uuid",
"input_default": {{}},
"metadata": {{"position": {{"x": 0, "y": 0}}, "customized_name": "Name"}}
}}
],
"new_links": [
{{
"id": "link-uuid",
"source_id": "source-node-id",
"source_name": "output_field",
"sink_id": "sink-node-id",
"sink_name": "input_field"
}}
]
}},
{{
"type": "remove",
"node_ids": ["uuid-of-node-to-remove"],
"link_ids": ["uuid-of-link-to-remove"]
}}
]
}}
```
If you need more information, return:
```json
{{
"type": "clarifying_questions",
"questions": [
{{
"question": "What specific change do you want?",
"keyword": "change_type",
"example": "Add error handling"
}}
]
}}
```
Generate the minimal patch needed. Output ONLY valid JSON.
"""

View File

@@ -1,213 +0,0 @@
"""Utilities for agent generation."""
import json
import re
from typing import Any
from backend.data.block import get_blocks
# UUID validation regex
UUID_REGEX = re.compile(
r"^[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89ab][a-f0-9]{3}-[a-f0-9]{12}$"
)
# Block IDs for various fixes
STORE_VALUE_BLOCK_ID = "1ff065e9-88e8-4358-9d82-8dc91f622ba9"
CONDITION_BLOCK_ID = "715696a0-e1da-45c8-b209-c2fa9c3b0be6"
ADDTOLIST_BLOCK_ID = "aeb08fc1-2fc1-4141-bc8e-f758f183a822"
ADDTODICTIONARY_BLOCK_ID = "31d1064e-7446-4693-a7d4-65e5ca1180d1"
CREATELIST_BLOCK_ID = "a912d5c7-6e00-4542-b2a9-8034136930e4"
CREATEDICT_BLOCK_ID = "b924ddf4-de4f-4b56-9a85-358930dcbc91"
CODE_EXECUTION_BLOCK_ID = "0b02b072-abe7-11ef-8372-fb5d162dd712"
DATA_SAMPLING_BLOCK_ID = "4a448883-71fa-49cf-91cf-70d793bd7d87"
UNIVERSAL_TYPE_CONVERTER_BLOCK_ID = "95d1b990-ce13-4d88-9737-ba5c2070c97b"
GET_CURRENT_DATE_BLOCK_ID = "b29c1b50-5d0e-4d9f-8f9d-1b0e6fcbf0b1"
DOUBLE_CURLY_BRACES_BLOCK_IDS = [
"44f6c8ad-d75c-4ae1-8209-aad1c0326928", # FillTextTemplateBlock
"6ab085e2-20b3-4055-bc3e-08036e01eca6",
"90f8c45e-e983-4644-aa0b-b4ebe2f531bc",
"363ae599-353e-4804-937e-b2ee3cef3da4", # AgentOutputBlock
"3b191d9f-356f-482d-8238-ba04b6d18381",
"db7d8f02-2f44-4c55-ab7a-eae0941f0c30",
"3a7c4b8d-6e2f-4a5d-b9c1-f8d23c5a9b0e",
"ed1ae7a0-b770-4089-b520-1f0005fad19a",
"a892b8d9-3e4e-4e9c-9c1e-75f8efcf1bfa",
"b29c1b50-5d0e-4d9f-8f9d-1b0e6fcbf0b1",
"716a67b3-6760-42e7-86dc-18645c6e00fc",
"530cf046-2ce0-4854-ae2c-659db17c7a46",
"ed55ac19-356e-4243-a6cb-bc599e9b716f",
"1f292d4a-41a4-4977-9684-7c8d560b9f91", # LLM blocks
"32a87eab-381e-4dd4-bdb8-4c47151be35a",
]
def is_valid_uuid(value: str) -> bool:
"""Check if a string is a valid UUID v4."""
return isinstance(value, str) and UUID_REGEX.match(value) is not None
def _compact_schema(schema: dict) -> dict[str, str]:
"""Extract compact type info from a JSON schema properties dict.
Returns a dict of {field_name: type_string} for essential info only.
"""
props = schema.get("properties", {})
result = {}
for name, prop in props.items():
# Skip internal/complex fields
if name.startswith("_"):
continue
# Get type string
type_str = prop.get("type", "any")
# Handle anyOf/oneOf (optional types)
if "anyOf" in prop:
types = [t.get("type", "?") for t in prop["anyOf"] if t.get("type")]
type_str = "|".join(types) if types else "any"
elif "allOf" in prop:
type_str = "object"
# Add array item type if present
if type_str == "array" and "items" in prop:
items = prop["items"]
if isinstance(items, dict):
item_type = items.get("type", "any")
type_str = f"array[{item_type}]"
result[name] = type_str
return result
def get_block_summaries(include_schemas: bool = True) -> str:
"""Generate compact block summaries for prompts.
Args:
include_schemas: Whether to include input/output type info
Returns:
Formatted string of block summaries (compact format)
"""
blocks = get_blocks()
summaries = []
for block_id, block_cls in blocks.items():
block = block_cls()
name = block.name
desc = getattr(block, "description", "") or ""
# Truncate description
if len(desc) > 150:
desc = desc[:147] + "..."
if not include_schemas:
summaries.append(f"- {name} (id: {block_id}): {desc}")
else:
# Compact format with type info only
inputs = {}
outputs = {}
required = []
if hasattr(block, "input_schema"):
try:
schema = block.input_schema.jsonschema()
inputs = _compact_schema(schema)
required = schema.get("required", [])
except Exception:
pass
if hasattr(block, "output_schema"):
try:
schema = block.output_schema.jsonschema()
outputs = _compact_schema(schema)
except Exception:
pass
# Build compact line format
# Format: NAME (id): desc | in: {field:type, ...} [required] | out: {field:type}
in_str = ", ".join(f"{k}:{v}" for k, v in inputs.items())
out_str = ", ".join(f"{k}:{v}" for k, v in outputs.items())
req_str = f" req=[{','.join(required)}]" if required else ""
static = " [static]" if getattr(block, "static_output", False) else ""
line = f"- {name} (id: {block_id}): {desc}"
if in_str:
line += f"\n in: {{{in_str}}}{req_str}"
if out_str:
line += f"\n out: {{{out_str}}}{static}"
summaries.append(line)
return "\n".join(summaries)
def get_blocks_info() -> list[dict[str, Any]]:
"""Get block information with schemas for validation and fixing."""
blocks = get_blocks()
blocks_info = []
for block_id, block_cls in blocks.items():
block = block_cls()
blocks_info.append(
{
"id": block_id,
"name": block.name,
"description": getattr(block, "description", ""),
"categories": getattr(block, "categories", []),
"staticOutput": getattr(block, "static_output", False),
"inputSchema": (
block.input_schema.jsonschema()
if hasattr(block, "input_schema")
else {}
),
"outputSchema": (
block.output_schema.jsonschema()
if hasattr(block, "output_schema")
else {}
),
}
)
return blocks_info
def parse_json_from_llm(text: str) -> dict[str, Any] | None:
"""Extract JSON from LLM response (handles markdown code blocks)."""
if not text:
return None
# Try fenced code block
match = re.search(r"```(?:json)?\s*([\s\S]*?)```", text, re.IGNORECASE)
if match:
try:
return json.loads(match.group(1).strip())
except json.JSONDecodeError:
pass
# Try raw text
try:
return json.loads(text.strip())
except json.JSONDecodeError:
pass
# Try finding {...} span
start = text.find("{")
end = text.rfind("}")
if start != -1 and end > start:
try:
return json.loads(text[start : end + 1])
except json.JSONDecodeError:
pass
# Try finding [...] span
start = text.find("[")
end = text.rfind("]")
if start != -1 and end > start:
try:
return json.loads(text[start : end + 1])
except json.JSONDecodeError:
pass
return None

View File

@@ -1,279 +0,0 @@
"""Agent validator - Validates agent structure and connections."""
import logging
import re
from typing import Any
from .utils import get_blocks_info
logger = logging.getLogger(__name__)
class AgentValidator:
"""Validator for AutoGPT agents with detailed error reporting."""
def __init__(self):
self.errors: list[str] = []
def add_error(self, error: str) -> None:
"""Add an error message."""
self.errors.append(error)
def validate_block_existence(
self, agent: dict[str, Any], blocks_info: list[dict[str, Any]]
) -> bool:
"""Validate all block IDs exist in the blocks library."""
valid = True
valid_block_ids = {b.get("id") for b in blocks_info if b.get("id")}
for node in agent.get("nodes", []):
block_id = node.get("block_id")
node_id = node.get("id")
if not block_id:
self.add_error(f"Node '{node_id}' is missing 'block_id' field.")
valid = False
continue
if block_id not in valid_block_ids:
self.add_error(
f"Node '{node_id}' references block_id '{block_id}' which does not exist."
)
valid = False
return valid
def validate_link_node_references(self, agent: dict[str, Any]) -> bool:
"""Validate all node IDs referenced in links exist."""
valid = True
valid_node_ids = {n.get("id") for n in agent.get("nodes", []) if n.get("id")}
for link in agent.get("links", []):
link_id = link.get("id", "Unknown")
source_id = link.get("source_id")
sink_id = link.get("sink_id")
if not source_id:
self.add_error(f"Link '{link_id}' is missing 'source_id'.")
valid = False
elif source_id not in valid_node_ids:
self.add_error(
f"Link '{link_id}' references non-existent source_id '{source_id}'."
)
valid = False
if not sink_id:
self.add_error(f"Link '{link_id}' is missing 'sink_id'.")
valid = False
elif sink_id not in valid_node_ids:
self.add_error(
f"Link '{link_id}' references non-existent sink_id '{sink_id}'."
)
valid = False
return valid
def validate_required_inputs(
self, agent: dict[str, Any], blocks_info: list[dict[str, Any]]
) -> bool:
"""Validate required inputs are provided."""
valid = True
block_map = {b.get("id"): b for b in blocks_info}
for node in agent.get("nodes", []):
block_id = node.get("block_id")
block = block_map.get(block_id)
if not block:
continue
required_inputs = block.get("inputSchema", {}).get("required", [])
input_defaults = node.get("input_default", {})
node_id = node.get("id")
# Get linked inputs
linked_inputs = {
link["sink_name"]
for link in agent.get("links", [])
if link.get("sink_id") == node_id
}
for req_input in required_inputs:
if (
req_input not in input_defaults
and req_input not in linked_inputs
and req_input != "credentials"
):
block_name = block.get("name", "Unknown Block")
self.add_error(
f"Node '{node_id}' ({block_name}) is missing required input '{req_input}'."
)
valid = False
return valid
def validate_data_type_compatibility(
self, agent: dict[str, Any], blocks_info: list[dict[str, Any]]
) -> bool:
"""Validate linked data types are compatible."""
valid = True
block_map = {b.get("id"): b for b in blocks_info}
node_lookup = {n.get("id"): n for n in agent.get("nodes", [])}
def get_type(schema: dict, name: str) -> str | None:
if "_#_" in name:
parent, child = name.split("_#_", 1)
parent_schema = schema.get(parent, {})
if "properties" in parent_schema:
return parent_schema["properties"].get(child, {}).get("type")
return None
return schema.get(name, {}).get("type")
def are_compatible(src: str, sink: str) -> bool:
if {src, sink} <= {"integer", "number"}:
return True
return src == sink
for link in agent.get("links", []):
source_node = node_lookup.get(link.get("source_id"))
sink_node = node_lookup.get(link.get("sink_id"))
if not source_node or not sink_node:
continue
source_block = block_map.get(source_node.get("block_id"))
sink_block = block_map.get(sink_node.get("block_id"))
if not source_block or not sink_block:
continue
source_outputs = source_block.get("outputSchema", {}).get("properties", {})
sink_inputs = sink_block.get("inputSchema", {}).get("properties", {})
source_type = get_type(source_outputs, link.get("source_name", ""))
sink_type = get_type(sink_inputs, link.get("sink_name", ""))
if source_type and sink_type and not are_compatible(source_type, sink_type):
self.add_error(
f"Type mismatch: {source_block.get('name')} output '{link['source_name']}' "
f"({source_type}) -> {sink_block.get('name')} input '{link['sink_name']}' ({sink_type})."
)
valid = False
return valid
def validate_nested_sink_links(
self, agent: dict[str, Any], blocks_info: list[dict[str, Any]]
) -> bool:
"""Validate nested sink links (with _#_ notation)."""
valid = True
block_map = {b.get("id"): b for b in blocks_info}
node_lookup = {n.get("id"): n for n in agent.get("nodes", [])}
for link in agent.get("links", []):
sink_name = link.get("sink_name", "")
if "_#_" in sink_name:
parent, child = sink_name.split("_#_", 1)
sink_node = node_lookup.get(link.get("sink_id"))
if not sink_node:
continue
block = block_map.get(sink_node.get("block_id"))
if not block:
continue
input_props = block.get("inputSchema", {}).get("properties", {})
parent_schema = input_props.get(parent)
if not parent_schema:
self.add_error(
f"Invalid nested link '{sink_name}': parent '{parent}' not found."
)
valid = False
continue
if not parent_schema.get("additionalProperties"):
if not (
isinstance(parent_schema, dict)
and "properties" in parent_schema
and child in parent_schema.get("properties", {})
):
self.add_error(
f"Invalid nested link '{sink_name}': child '{child}' not found in '{parent}'."
)
valid = False
return valid
def validate_prompt_spaces(self, agent: dict[str, Any]) -> bool:
"""Validate prompts don't have spaces in template variables."""
valid = True
for node in agent.get("nodes", []):
input_default = node.get("input_default", {})
prompt = input_default.get("prompt", "")
if not isinstance(prompt, str):
continue
# Find {{...}} with spaces
matches = re.finditer(r"\{\{([^}]+)\}\}", prompt)
for match in matches:
content = match.group(1)
if " " in content:
self.add_error(
f"Node '{node.get('id')}' has spaces in template variable: "
f"'{{{{{content}}}}}' should be '{{{{{content.replace(' ', '_')}}}}}'."
)
valid = False
return valid
def validate(
self, agent: dict[str, Any], blocks_info: list[dict[str, Any]] | None = None
) -> tuple[bool, str | None]:
"""Run all validations.
Returns:
Tuple of (is_valid, error_message)
"""
self.errors = []
if blocks_info is None:
blocks_info = get_blocks_info()
checks = [
self.validate_block_existence(agent, blocks_info),
self.validate_link_node_references(agent),
self.validate_required_inputs(agent, blocks_info),
self.validate_data_type_compatibility(agent, blocks_info),
self.validate_nested_sink_links(agent, blocks_info),
self.validate_prompt_spaces(agent),
]
all_passed = all(checks)
if all_passed:
logger.info("Agent validation successful")
return True, None
error_message = "Agent validation failed:\n"
for i, error in enumerate(self.errors, 1):
error_message += f"{i}. {error}\n"
logger.warning(f"Agent validation failed with {len(self.errors)} errors")
return False, error_message
def validate_agent(
agent: dict[str, Any], blocks_info: list[dict[str, Any]] | None = None
) -> tuple[bool, str | None]:
"""Convenience function to validate an agent.
Returns:
Tuple of (is_valid, error_message)
"""
validator = AgentValidator()
return validator.validate(agent, blocks_info)

View File

@@ -1,446 +0,0 @@
"""Tool for retrieving agent execution outputs from user's library."""
import logging
import re
from datetime import datetime, timedelta, timezone
from typing import Any
from pydantic import BaseModel, field_validator
from backend.api.features.chat.model import ChatSession
from backend.api.features.library import db as library_db
from backend.api.features.library.model import LibraryAgent
from backend.data import execution as execution_db
from backend.data.execution import ExecutionStatus, GraphExecution, GraphExecutionMeta
from .base import BaseTool
from .models import (
AgentOutputResponse,
ErrorResponse,
ExecutionOutputInfo,
NoResultsResponse,
ToolResponseBase,
)
from .utils import fetch_graph_from_store_slug
logger = logging.getLogger(__name__)
class AgentOutputInput(BaseModel):
"""Input parameters for the agent_output tool."""
agent_name: str = ""
library_agent_id: str = ""
store_slug: str = ""
execution_id: str = ""
run_time: str = "latest"
@field_validator(
"agent_name",
"library_agent_id",
"store_slug",
"execution_id",
"run_time",
mode="before",
)
@classmethod
def strip_strings(cls, v: Any) -> Any:
"""Strip whitespace from string fields."""
return v.strip() if isinstance(v, str) else v
def parse_time_expression(
time_expr: str | None,
) -> tuple[datetime | None, datetime | None]:
"""
Parse time expression into datetime range (start, end).
Supports: "latest", "yesterday", "today", "last week", "last 7 days",
"last month", "last 30 days", ISO date "YYYY-MM-DD", ISO datetime.
"""
if not time_expr or time_expr.lower() == "latest":
return None, None
now = datetime.now(timezone.utc)
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
expr = time_expr.lower().strip()
# Relative time expressions lookup
relative_times: dict[str, tuple[datetime, datetime]] = {
"yesterday": (today_start - timedelta(days=1), today_start),
"today": (today_start, now),
"last week": (now - timedelta(days=7), now),
"last 7 days": (now - timedelta(days=7), now),
"last month": (now - timedelta(days=30), now),
"last 30 days": (now - timedelta(days=30), now),
}
if expr in relative_times:
return relative_times[expr]
# Try ISO date format (YYYY-MM-DD)
date_match = re.match(r"^(\d{4})-(\d{2})-(\d{2})$", expr)
if date_match:
try:
year, month, day = map(int, date_match.groups())
start = datetime(year, month, day, 0, 0, 0, tzinfo=timezone.utc)
return start, start + timedelta(days=1)
except ValueError:
# Invalid date components (e.g., month=13, day=32)
pass
# Try ISO datetime
try:
parsed = datetime.fromisoformat(expr.replace("Z", "+00:00"))
if parsed.tzinfo is None:
parsed = parsed.replace(tzinfo=timezone.utc)
return parsed - timedelta(hours=1), parsed + timedelta(hours=1)
except ValueError:
return None, None
class AgentOutputTool(BaseTool):
"""Tool for retrieving execution outputs from user's library agents."""
@property
def name(self) -> str:
return "agent_output"
@property
def description(self) -> str:
return """Retrieve execution outputs from agents in the user's library.
Identify the agent using one of:
- agent_name: Fuzzy search in user's library
- library_agent_id: Exact library agent ID
- store_slug: Marketplace format 'username/agent-name'
Select which run to retrieve using:
- execution_id: Specific execution ID
- run_time: 'latest' (default), 'yesterday', 'last week', or ISO date 'YYYY-MM-DD'
"""
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"agent_name": {
"type": "string",
"description": "Agent name to search for in user's library (fuzzy match)",
},
"library_agent_id": {
"type": "string",
"description": "Exact library agent ID",
},
"store_slug": {
"type": "string",
"description": "Marketplace identifier: 'username/agent-slug'",
},
"execution_id": {
"type": "string",
"description": "Specific execution ID to retrieve",
},
"run_time": {
"type": "string",
"description": (
"Time filter: 'latest', 'yesterday', 'last week', or 'YYYY-MM-DD'"
),
},
},
"required": [],
}
@property
def requires_auth(self) -> bool:
return True
async def _resolve_agent(
self,
user_id: str,
agent_name: str | None,
library_agent_id: str | None,
store_slug: str | None,
) -> tuple[LibraryAgent | None, str | None]:
"""
Resolve agent from provided identifiers.
Returns (library_agent, error_message).
"""
# Priority 1: Exact library agent ID
if library_agent_id:
try:
agent = await library_db.get_library_agent(library_agent_id, user_id)
return agent, None
except Exception as e:
logger.warning(f"Failed to get library agent by ID: {e}")
return None, f"Library agent '{library_agent_id}' not found"
# Priority 2: Store slug (username/agent-name)
if store_slug and "/" in store_slug:
username, agent_slug = store_slug.split("/", 1)
graph, _ = await fetch_graph_from_store_slug(username, agent_slug)
if not graph:
return None, f"Agent '{store_slug}' not found in marketplace"
# Find in user's library by graph_id
agent = await library_db.get_library_agent_by_graph_id(user_id, graph.id)
if not agent:
return (
None,
f"Agent '{store_slug}' is not in your library. "
"Add it first to see outputs.",
)
return agent, None
# Priority 3: Fuzzy name search in library
if agent_name:
try:
response = await library_db.list_library_agents(
user_id=user_id,
search_term=agent_name,
page_size=5,
)
if not response.agents:
return (
None,
f"No agents matching '{agent_name}' found in your library",
)
# Return best match (first result from search)
return response.agents[0], None
except Exception as e:
logger.error(f"Error searching library agents: {e}")
return None, f"Error searching for agent: {e}"
return (
None,
"Please specify an agent name, library_agent_id, or store_slug",
)
async def _get_execution(
self,
user_id: str,
graph_id: str,
execution_id: str | None,
time_start: datetime | None,
time_end: datetime | None,
) -> tuple[GraphExecution | None, list[GraphExecutionMeta], str | None]:
"""
Fetch execution(s) based on filters.
Returns (single_execution, available_executions_meta, error_message).
"""
# If specific execution_id provided, fetch it directly
if execution_id:
execution = await execution_db.get_graph_execution(
user_id=user_id,
execution_id=execution_id,
include_node_executions=False,
)
if not execution:
return None, [], f"Execution '{execution_id}' not found"
return execution, [], None
# Get completed executions with time filters
executions = await execution_db.get_graph_executions(
graph_id=graph_id,
user_id=user_id,
statuses=[ExecutionStatus.COMPLETED],
created_time_gte=time_start,
created_time_lte=time_end,
limit=10,
)
if not executions:
return None, [], None # No error, just no executions
# If only one execution, fetch full details
if len(executions) == 1:
full_execution = await execution_db.get_graph_execution(
user_id=user_id,
execution_id=executions[0].id,
include_node_executions=False,
)
return full_execution, [], None
# Multiple executions - return latest with full details, plus list of available
full_execution = await execution_db.get_graph_execution(
user_id=user_id,
execution_id=executions[0].id,
include_node_executions=False,
)
return full_execution, executions, None
def _build_response(
self,
agent: LibraryAgent,
execution: GraphExecution | None,
available_executions: list[GraphExecutionMeta],
session_id: str | None,
) -> AgentOutputResponse:
"""Build the response based on execution data."""
library_agent_link = f"/library/agents/{agent.id}"
if not execution:
return AgentOutputResponse(
message=f"No completed executions found for agent '{agent.name}'",
session_id=session_id,
agent_name=agent.name,
agent_id=agent.graph_id,
library_agent_id=agent.id,
library_agent_link=library_agent_link,
total_executions=0,
)
execution_info = ExecutionOutputInfo(
execution_id=execution.id,
status=execution.status.value,
started_at=execution.started_at,
ended_at=execution.ended_at,
outputs=dict(execution.outputs),
inputs_summary=execution.inputs if execution.inputs else None,
)
available_list = None
if len(available_executions) > 1:
available_list = [
{
"id": e.id,
"status": e.status.value,
"started_at": e.started_at.isoformat() if e.started_at else None,
}
for e in available_executions[:5]
]
message = f"Found execution outputs for agent '{agent.name}'"
if len(available_executions) > 1:
message += (
f". Showing latest of {len(available_executions)} matching executions."
)
return AgentOutputResponse(
message=message,
session_id=session_id,
agent_name=agent.name,
agent_id=agent.graph_id,
library_agent_id=agent.id,
library_agent_link=library_agent_link,
execution=execution_info,
available_executions=available_list,
total_executions=len(available_executions) if available_executions else 1,
)
async def _execute(
self,
user_id: str | None,
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
"""Execute the agent_output tool."""
session_id = session.session_id
# Parse and validate input
try:
input_data = AgentOutputInput(**kwargs)
except Exception as e:
logger.error(f"Invalid input: {e}")
return ErrorResponse(
message="Invalid input parameters",
error=str(e),
session_id=session_id,
)
# Ensure user_id is present (should be guaranteed by requires_auth)
if not user_id:
return ErrorResponse(
message="User authentication required",
session_id=session_id,
)
# Check if at least one identifier is provided
if not any(
[
input_data.agent_name,
input_data.library_agent_id,
input_data.store_slug,
input_data.execution_id,
]
):
return ErrorResponse(
message=(
"Please specify at least one of: agent_name, "
"library_agent_id, store_slug, or execution_id"
),
session_id=session_id,
)
# If only execution_id provided, we need to find the agent differently
if (
input_data.execution_id
and not input_data.agent_name
and not input_data.library_agent_id
and not input_data.store_slug
):
# Fetch execution directly to get graph_id
execution = await execution_db.get_graph_execution(
user_id=user_id,
execution_id=input_data.execution_id,
include_node_executions=False,
)
if not execution:
return ErrorResponse(
message=f"Execution '{input_data.execution_id}' not found",
session_id=session_id,
)
# Find library agent by graph_id
agent = await library_db.get_library_agent_by_graph_id(
user_id, execution.graph_id
)
if not agent:
return NoResultsResponse(
message=(
f"Execution found but agent not in your library. "
f"Graph ID: {execution.graph_id}"
),
session_id=session_id,
suggestions=["Add the agent to your library to see more details"],
)
return self._build_response(agent, execution, [], session_id)
# Resolve agent from identifiers
agent, error = await self._resolve_agent(
user_id=user_id,
agent_name=input_data.agent_name or None,
library_agent_id=input_data.library_agent_id or None,
store_slug=input_data.store_slug or None,
)
if error or not agent:
return NoResultsResponse(
message=error or "Agent not found",
session_id=session_id,
suggestions=[
"Check the agent name or ID",
"Make sure the agent is in your library",
],
)
# Parse time expression
time_start, time_end = parse_time_expression(input_data.run_time)
# Fetch execution(s)
execution, available_executions, exec_error = await self._get_execution(
user_id=user_id,
graph_id=agent.graph_id,
execution_id=input_data.execution_id or None,
time_start=time_start,
time_end=time_end,
)
if exec_error:
return ErrorResponse(
message=exec_error,
session_id=session_id,
)
return self._build_response(agent, execution, available_executions, session_id)

View File

@@ -1,151 +0,0 @@
"""Shared agent search functionality for find_agent and find_library_agent tools."""
import logging
from typing import Literal
from backend.api.features.library import db as library_db
from backend.api.features.store import db as store_db
from backend.util.exceptions import DatabaseError, NotFoundError
from .models import (
AgentInfo,
AgentsFoundResponse,
ErrorResponse,
NoResultsResponse,
ToolResponseBase,
)
logger = logging.getLogger(__name__)
SearchSource = Literal["marketplace", "library"]
async def search_agents(
query: str,
source: SearchSource,
session_id: str | None,
user_id: str | None = None,
) -> ToolResponseBase:
"""
Search for agents in marketplace or user library.
Args:
query: Search query string
source: "marketplace" or "library"
session_id: Chat session ID
user_id: User ID (required for library search)
Returns:
AgentsFoundResponse, NoResultsResponse, or ErrorResponse
"""
if not query:
return ErrorResponse(
message="Please provide a search query", session_id=session_id
)
if source == "library" and not user_id:
return ErrorResponse(
message="User authentication required to search library",
session_id=session_id,
)
agents: list[AgentInfo] = []
try:
if source == "marketplace":
logger.info(f"Searching marketplace for: {query}")
results = await store_db.get_store_agents(search_query=query, page_size=5)
for agent in results.agents:
agents.append(
AgentInfo(
id=f"{agent.creator}/{agent.slug}",
name=agent.agent_name,
description=agent.description or "",
source="marketplace",
in_library=False,
creator=agent.creator,
category="general",
rating=agent.rating,
runs=agent.runs,
is_featured=False,
)
)
else: # library
logger.info(f"Searching user library for: {query}")
results = await library_db.list_library_agents(
user_id=user_id, # type: ignore[arg-type]
search_term=query,
page_size=10,
)
for agent in results.agents:
agents.append(
AgentInfo(
id=agent.id,
name=agent.name,
description=agent.description or "",
source="library",
in_library=True,
creator=agent.creator_name,
status=agent.status.value,
can_access_graph=agent.can_access_graph,
has_external_trigger=agent.has_external_trigger,
new_output=agent.new_output,
graph_id=agent.graph_id,
)
)
logger.info(f"Found {len(agents)} agents in {source}")
except NotFoundError:
pass
except DatabaseError as e:
logger.error(f"Error searching {source}: {e}", exc_info=True)
return ErrorResponse(
message=f"Failed to search {source}. Please try again.",
error=str(e),
session_id=session_id,
)
if not agents:
suggestions = (
[
"Try more general terms",
"Browse categories in the marketplace",
"Check spelling",
]
if source == "marketplace"
else [
"Try different keywords",
"Use find_agent to search the marketplace",
"Check your library at /library",
]
)
no_results_msg = (
f"No agents found matching '{query}'. Try different keywords or browse the marketplace."
if source == "marketplace"
else f"No agents matching '{query}' found in your library."
)
return NoResultsResponse(
message=no_results_msg, session_id=session_id, suggestions=suggestions
)
title = f"Found {len(agents)} agent{'s' if len(agents) != 1 else ''} "
title += (
f"for '{query}'"
if source == "marketplace"
else f"in your library for '{query}'"
)
message = (
"Now you have found some options for the user to choose from. "
"You can add a link to a recommended agent at: /marketplace/agent/agent_id "
"Please ask the user if they would like to use any of these agents."
if source == "marketplace"
else "Found agents in the user's library. You can provide a link to view an agent at: "
"/library/agents/{agent_id}. Use agent_output to get execution results, or run_agent to execute."
)
return AgentsFoundResponse(
message=message,
title=title,
agents=agents,
count=len(agents),
session_id=session_id,
)

View File

@@ -1,279 +0,0 @@
"""CreateAgentTool - Creates agents from natural language descriptions."""
import logging
from typing import Any
from backend.api.features.chat.model import ChatSession
from .agent_generator import (
apply_all_fixes,
decompose_goal,
generate_agent,
get_blocks_info,
save_agent_to_library,
validate_agent,
)
from .base import BaseTool
from .models import (
AgentPreviewResponse,
AgentSavedResponse,
ClarificationNeededResponse,
ClarifyingQuestion,
ErrorResponse,
ToolResponseBase,
)
logger = logging.getLogger(__name__)
# Maximum retries for agent generation with validation feedback
MAX_GENERATION_RETRIES = 2
class CreateAgentTool(BaseTool):
"""Tool for creating agents from natural language descriptions."""
@property
def name(self) -> str:
return "create_agent"
@property
def description(self) -> str:
return (
"Create a new agent workflow from a natural language description. "
"First generates a preview, then saves to library if save=true."
)
@property
def requires_auth(self) -> bool:
return True
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"description": {
"type": "string",
"description": (
"Natural language description of what the agent should do. "
"Be specific about inputs, outputs, and the workflow steps."
),
},
"context": {
"type": "string",
"description": (
"Additional context or answers to previous clarifying questions. "
"Include any preferences or constraints mentioned by the user."
),
},
"save": {
"type": "boolean",
"description": (
"Whether to save the agent to the user's library. "
"Default is true. Set to false for preview only."
),
"default": True,
},
},
"required": ["description"],
}
async def _execute(
self,
user_id: str | None,
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
"""Execute the create_agent tool.
Flow:
1. Decompose the description into steps (may return clarifying questions)
2. Generate agent JSON from the steps
3. Apply fixes to correct common LLM errors
4. Preview or save based on the save parameter
"""
description = kwargs.get("description", "").strip()
context = kwargs.get("context", "")
save = kwargs.get("save", True)
session_id = session.session_id if session else None
if not description:
return ErrorResponse(
message="Please provide a description of what the agent should do.",
error="Missing description parameter",
session_id=session_id,
)
# Step 1: Decompose goal into steps
try:
decomposition_result = await decompose_goal(description, context)
except ValueError as e:
# Handle missing API key or configuration errors
return ErrorResponse(
message=f"Agent generation is not configured: {str(e)}",
error="configuration_error",
session_id=session_id,
)
if decomposition_result is None:
return ErrorResponse(
message="Failed to analyze the goal. Please try rephrasing.",
error="Decomposition failed",
session_id=session_id,
)
# Check if LLM returned clarifying questions
if decomposition_result.get("type") == "clarifying_questions":
questions = decomposition_result.get("questions", [])
return ClarificationNeededResponse(
message=(
"I need some more information to create this agent. "
"Please answer the following questions:"
),
questions=[
ClarifyingQuestion(
question=q.get("question", ""),
keyword=q.get("keyword", ""),
example=q.get("example"),
)
for q in questions
],
session_id=session_id,
)
# Check for unachievable/vague goals
if decomposition_result.get("type") == "unachievable_goal":
suggested = decomposition_result.get("suggested_goal", "")
reason = decomposition_result.get("reason", "")
return ErrorResponse(
message=(
f"This goal cannot be accomplished with the available blocks. "
f"{reason} "
f"Suggestion: {suggested}"
),
error="unachievable_goal",
details={"suggested_goal": suggested, "reason": reason},
session_id=session_id,
)
if decomposition_result.get("type") == "vague_goal":
suggested = decomposition_result.get("suggested_goal", "")
return ErrorResponse(
message=(
f"The goal is too vague to create a specific workflow. "
f"Suggestion: {suggested}"
),
error="vague_goal",
details={"suggested_goal": suggested},
session_id=session_id,
)
# Step 2: Generate agent JSON with retry on validation failure
blocks_info = get_blocks_info()
agent_json = None
validation_errors = None
for attempt in range(MAX_GENERATION_RETRIES + 1):
# Generate agent (include validation errors from previous attempt)
if attempt == 0:
agent_json = await generate_agent(decomposition_result)
else:
# Retry with validation error feedback
logger.info(
f"Retry {attempt}/{MAX_GENERATION_RETRIES} with validation feedback"
)
retry_instructions = {
**decomposition_result,
"previous_errors": validation_errors,
"retry_instructions": (
"The previous generation had validation errors. "
"Please fix these issues in the new generation:\n"
f"{validation_errors}"
),
}
agent_json = await generate_agent(retry_instructions)
if agent_json is None:
if attempt == MAX_GENERATION_RETRIES:
return ErrorResponse(
message="Failed to generate the agent. Please try again.",
error="Generation failed",
session_id=session_id,
)
continue
# Step 3: Apply fixes to correct common errors
agent_json = apply_all_fixes(agent_json, blocks_info)
# Step 4: Validate the agent
is_valid, validation_errors = validate_agent(agent_json, blocks_info)
if is_valid:
logger.info(f"Agent generated successfully on attempt {attempt + 1}")
break
logger.warning(
f"Validation failed on attempt {attempt + 1}: {validation_errors}"
)
if attempt == MAX_GENERATION_RETRIES:
# Return error with validation details
return ErrorResponse(
message=(
f"Generated agent has validation errors after {MAX_GENERATION_RETRIES + 1} attempts. "
f"Please try rephrasing your request or simplify the workflow."
),
error="validation_failed",
details={"validation_errors": validation_errors},
session_id=session_id,
)
agent_name = agent_json.get("name", "Generated Agent")
agent_description = agent_json.get("description", "")
node_count = len(agent_json.get("nodes", []))
link_count = len(agent_json.get("links", []))
# Step 4: Preview or save
if not save:
return AgentPreviewResponse(
message=(
f"I've generated an agent called '{agent_name}' with {node_count} blocks. "
f"Review it and call create_agent with save=true to save it to your library."
),
agent_json=agent_json,
agent_name=agent_name,
description=agent_description,
node_count=node_count,
link_count=link_count,
session_id=session_id,
)
# Save to library
if not user_id:
return ErrorResponse(
message="You must be logged in to save agents.",
error="auth_required",
session_id=session_id,
)
try:
created_graph, library_agent = await save_agent_to_library(
agent_json, user_id
)
return AgentSavedResponse(
message=f"Agent '{created_graph.name}' has been saved to your library!",
agent_id=created_graph.id,
agent_name=created_graph.name,
library_agent_id=library_agent.id,
library_agent_link=f"/library/{library_agent.id}",
agent_page_link=f"/build?flowID={created_graph.id}",
session_id=session_id,
)
except Exception as e:
return ErrorResponse(
message=f"Failed to save the agent: {str(e)}",
error="save_failed",
details={"exception": str(e)},
session_id=session_id,
)

View File

@@ -1,294 +0,0 @@
"""EditAgentTool - Edits existing agents using natural language."""
import logging
from typing import Any
from backend.api.features.chat.model import ChatSession
from .agent_generator import (
apply_agent_patch,
apply_all_fixes,
generate_agent_patch,
get_agent_as_json,
get_blocks_info,
save_agent_to_library,
validate_agent,
)
from .base import BaseTool
from .models import (
AgentPreviewResponse,
AgentSavedResponse,
ClarificationNeededResponse,
ClarifyingQuestion,
ErrorResponse,
ToolResponseBase,
)
logger = logging.getLogger(__name__)
# Maximum retries for patch generation with validation feedback
MAX_GENERATION_RETRIES = 2
class EditAgentTool(BaseTool):
"""Tool for editing existing agents using natural language."""
@property
def name(self) -> str:
return "edit_agent"
@property
def description(self) -> str:
return (
"Edit an existing agent from the user's library using natural language. "
"Generates a patch to update the agent while preserving unchanged parts."
)
@property
def requires_auth(self) -> bool:
return True
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"agent_id": {
"type": "string",
"description": (
"The ID of the agent to edit. "
"Can be a graph ID or library agent ID."
),
},
"changes": {
"type": "string",
"description": (
"Natural language description of what changes to make. "
"Be specific about what to add, remove, or modify."
),
},
"context": {
"type": "string",
"description": (
"Additional context or answers to previous clarifying questions."
),
},
"save": {
"type": "boolean",
"description": (
"Whether to save the changes. "
"Default is true. Set to false for preview only."
),
"default": True,
},
},
"required": ["agent_id", "changes"],
}
async def _execute(
self,
user_id: str | None,
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
"""Execute the edit_agent tool.
Flow:
1. Fetch the current agent
2. Generate a patch based on the requested changes
3. Apply the patch to create an updated agent
4. Preview or save based on the save parameter
"""
agent_id = kwargs.get("agent_id", "").strip()
changes = kwargs.get("changes", "").strip()
context = kwargs.get("context", "")
save = kwargs.get("save", True)
session_id = session.session_id if session else None
if not agent_id:
return ErrorResponse(
message="Please provide the agent ID to edit.",
error="Missing agent_id parameter",
session_id=session_id,
)
if not changes:
return ErrorResponse(
message="Please describe what changes you want to make.",
error="Missing changes parameter",
session_id=session_id,
)
# Step 1: Fetch current agent
current_agent = await get_agent_as_json(agent_id, user_id)
if current_agent is None:
return ErrorResponse(
message=f"Could not find agent with ID '{agent_id}' in your library.",
error="agent_not_found",
session_id=session_id,
)
# Build the update request with context
update_request = changes
if context:
update_request = f"{changes}\n\nAdditional context:\n{context}"
# Step 2: Generate patch with retry on validation failure
blocks_info = get_blocks_info()
updated_agent = None
validation_errors = None
intent = "Applied requested changes"
for attempt in range(MAX_GENERATION_RETRIES + 1):
# Generate patch (include validation errors from previous attempt)
try:
if attempt == 0:
patch_result = await generate_agent_patch(
update_request, current_agent
)
else:
# Retry with validation error feedback
logger.info(
f"Retry {attempt}/{MAX_GENERATION_RETRIES} with validation feedback"
)
retry_request = (
f"{update_request}\n\n"
f"IMPORTANT: The previous edit had validation errors. "
f"Please fix these issues:\n{validation_errors}"
)
patch_result = await generate_agent_patch(
retry_request, current_agent
)
except ValueError as e:
# Handle missing API key or configuration errors
return ErrorResponse(
message=f"Agent generation is not configured: {str(e)}",
error="configuration_error",
session_id=session_id,
)
if patch_result is None:
if attempt == MAX_GENERATION_RETRIES:
return ErrorResponse(
message="Failed to generate changes. Please try rephrasing.",
error="Patch generation failed",
session_id=session_id,
)
continue
# Check if LLM returned clarifying questions
if patch_result.get("type") == "clarifying_questions":
questions = patch_result.get("questions", [])
return ClarificationNeededResponse(
message=(
"I need some more information about the changes. "
"Please answer the following questions:"
),
questions=[
ClarifyingQuestion(
question=q.get("question", ""),
keyword=q.get("keyword", ""),
example=q.get("example"),
)
for q in questions
],
session_id=session_id,
)
# Step 3: Apply patch and fixes
try:
updated_agent = apply_agent_patch(current_agent, patch_result)
updated_agent = apply_all_fixes(updated_agent, blocks_info)
except Exception as e:
if attempt == MAX_GENERATION_RETRIES:
return ErrorResponse(
message=f"Failed to apply changes: {str(e)}",
error="patch_apply_failed",
details={"exception": str(e)},
session_id=session_id,
)
validation_errors = str(e)
continue
# Step 4: Validate the updated agent
is_valid, validation_errors = validate_agent(updated_agent, blocks_info)
if is_valid:
logger.info(f"Agent edited successfully on attempt {attempt + 1}")
intent = patch_result.get("intent", "Applied requested changes")
break
logger.warning(
f"Validation failed on attempt {attempt + 1}: {validation_errors}"
)
if attempt == MAX_GENERATION_RETRIES:
# Return error with validation details
return ErrorResponse(
message=(
f"Updated agent has validation errors after "
f"{MAX_GENERATION_RETRIES + 1} attempts. "
f"Please try rephrasing your request or simplify the changes."
),
error="validation_failed",
details={"validation_errors": validation_errors},
session_id=session_id,
)
# At this point, updated_agent is guaranteed to be set (we return on all failure paths)
assert updated_agent is not None
agent_name = updated_agent.get("name", "Updated Agent")
agent_description = updated_agent.get("description", "")
node_count = len(updated_agent.get("nodes", []))
link_count = len(updated_agent.get("links", []))
# Step 5: Preview or save
if not save:
return AgentPreviewResponse(
message=(
f"I've updated the agent. Changes: {intent}. "
f"The agent now has {node_count} blocks. "
f"Review it and call edit_agent with save=true to save the changes."
),
agent_json=updated_agent,
agent_name=agent_name,
description=agent_description,
node_count=node_count,
link_count=link_count,
session_id=session_id,
)
# Save to library (creates a new version)
if not user_id:
return ErrorResponse(
message="You must be logged in to save agents.",
error="auth_required",
session_id=session_id,
)
try:
created_graph, library_agent = await save_agent_to_library(
updated_agent, user_id, is_update=True
)
return AgentSavedResponse(
message=(
f"Updated agent '{created_graph.name}' has been saved to your library! "
f"Changes: {intent}"
),
agent_id=created_graph.id,
agent_name=created_graph.name,
library_agent_id=library_agent.id,
library_agent_link=f"/library/{library_agent.id}",
agent_page_link=f"/build?flowID={created_graph.id}",
session_id=session_id,
)
except Exception as e:
return ErrorResponse(
message=f"Failed to save the updated agent: {str(e)}",
error="save_failed",
details={"exception": str(e)},
session_id=session_id,
)

View File

@@ -1,46 +0,0 @@
"""Tool for discovering agents from marketplace."""
from typing import Any
from backend.api.features.chat.model import ChatSession
from .agent_search import search_agents
from .base import BaseTool
from .models import ToolResponseBase
class FindAgentTool(BaseTool):
"""Tool for discovering agents from the marketplace."""
@property
def name(self) -> str:
return "find_agent"
@property
def description(self) -> str:
return (
"Discover agents from the marketplace based on capabilities and user needs."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query describing what the user wants to accomplish. Use single keywords for best results.",
},
},
"required": ["query"],
}
async def _execute(
self, user_id: str | None, session: ChatSession, **kwargs
) -> ToolResponseBase:
return await search_agents(
query=kwargs.get("query", "").strip(),
source="marketplace",
session_id=session.session_id,
user_id=user_id,
)

View File

@@ -1,192 +0,0 @@
import logging
from typing import Any
from prisma.enums import ContentType
from backend.api.features.chat.model import ChatSession
from backend.api.features.chat.tools.base import BaseTool, ToolResponseBase
from backend.api.features.chat.tools.models import (
BlockInfoSummary,
BlockInputFieldInfo,
BlockListResponse,
ErrorResponse,
NoResultsResponse,
)
from backend.api.features.store.hybrid_search import unified_hybrid_search
from backend.data.block import get_block
logger = logging.getLogger(__name__)
class FindBlockTool(BaseTool):
"""Tool for searching available blocks."""
@property
def name(self) -> str:
return "find_block"
@property
def description(self) -> str:
return (
"Search for available blocks by name or description. "
"Blocks are reusable components that perform specific tasks like "
"sending emails, making API calls, processing text, etc. "
"IMPORTANT: Use this tool FIRST to get the block's 'id' before calling run_block. "
"The response includes each block's id, required_inputs, and input_schema."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": (
"Search query to find blocks by name or description. "
"Use keywords like 'email', 'http', 'text', 'ai', etc."
),
},
},
"required": ["query"],
}
@property
def requires_auth(self) -> bool:
return True
async def _execute(
self,
user_id: str | None,
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
"""Search for blocks matching the query.
Args:
user_id: User ID (required)
session: Chat session
query: Search query
Returns:
BlockListResponse: List of matching blocks
NoResultsResponse: No blocks found
ErrorResponse: Error message
"""
query = kwargs.get("query", "").strip()
session_id = session.session_id
if not query:
return ErrorResponse(
message="Please provide a search query",
session_id=session_id,
)
try:
# Search for blocks using hybrid search
results, total = await unified_hybrid_search(
query=query,
content_types=[ContentType.BLOCK],
page=1,
page_size=10,
)
if not results:
return NoResultsResponse(
message=f"No blocks found for '{query}'",
suggestions=[
"Try broader keywords like 'email', 'http', 'text', 'ai'",
"Check spelling of technical terms",
],
session_id=session_id,
)
# Enrich results with full block information
blocks: list[BlockInfoSummary] = []
for result in results:
block_id = result["content_id"]
block = get_block(block_id)
if block:
# Get input/output schemas
input_schema = {}
output_schema = {}
try:
input_schema = block.input_schema.jsonschema()
except Exception:
pass
try:
output_schema = block.output_schema.jsonschema()
except Exception:
pass
# Get categories from block instance
categories = []
if hasattr(block, "categories") and block.categories:
categories = [cat.value for cat in block.categories]
# Extract required inputs for easier use
required_inputs: list[BlockInputFieldInfo] = []
if input_schema:
properties = input_schema.get("properties", {})
required_fields = set(input_schema.get("required", []))
# Get credential field names to exclude from required inputs
credentials_fields = set(
block.input_schema.get_credentials_fields().keys()
)
for field_name, field_schema in properties.items():
# Skip credential fields - they're handled separately
if field_name in credentials_fields:
continue
required_inputs.append(
BlockInputFieldInfo(
name=field_name,
type=field_schema.get("type", "string"),
description=field_schema.get("description", ""),
required=field_name in required_fields,
default=field_schema.get("default"),
)
)
blocks.append(
BlockInfoSummary(
id=block_id,
name=block.name,
description=block.description or "",
categories=categories,
input_schema=input_schema,
output_schema=output_schema,
required_inputs=required_inputs,
)
)
if not blocks:
return NoResultsResponse(
message=f"No blocks found for '{query}'",
suggestions=[
"Try broader keywords like 'email', 'http', 'text', 'ai'",
],
session_id=session_id,
)
return BlockListResponse(
message=(
f"Found {len(blocks)} block(s) matching '{query}'. "
"To execute a block, use run_block with the block's 'id' field "
"and provide 'input_data' matching the block's input_schema."
),
blocks=blocks,
count=len(blocks),
query=query,
session_id=session_id,
)
except Exception as e:
logger.error(f"Error searching blocks: {e}", exc_info=True)
return ErrorResponse(
message="Failed to search blocks",
error=str(e),
session_id=session_id,
)

View File

@@ -1,52 +0,0 @@
"""Tool for searching agents in the user's library."""
from typing import Any
from backend.api.features.chat.model import ChatSession
from .agent_search import search_agents
from .base import BaseTool
from .models import ToolResponseBase
class FindLibraryAgentTool(BaseTool):
"""Tool for searching agents in the user's library."""
@property
def name(self) -> str:
return "find_library_agent"
@property
def description(self) -> str:
return (
"Search for agents in the user's library. Use this to find agents "
"the user has already added to their library, including agents they "
"created or added from the marketplace."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query to find agents by name or description.",
},
},
"required": ["query"],
}
@property
def requires_auth(self) -> bool:
return True
async def _execute(
self, user_id: str | None, session: ChatSession, **kwargs
) -> ToolResponseBase:
return await search_agents(
query=kwargs.get("query", "").strip(),
source="library",
session_id=session.session_id,
user_id=user_id,
)

View File

@@ -1,148 +0,0 @@
"""GetDocPageTool - Fetch full content of a documentation page."""
import logging
from pathlib import Path
from typing import Any
from backend.api.features.chat.model import ChatSession
from backend.api.features.chat.tools.base import BaseTool
from backend.api.features.chat.tools.models import (
DocPageResponse,
ErrorResponse,
ToolResponseBase,
)
logger = logging.getLogger(__name__)
# Base URL for documentation (can be configured)
DOCS_BASE_URL = "https://docs.agpt.co"
class GetDocPageTool(BaseTool):
"""Tool for fetching full content of a documentation page."""
@property
def name(self) -> str:
return "get_doc_page"
@property
def description(self) -> str:
return (
"Get the full content of a documentation page by its path. "
"Use this after search_docs to read the complete content of a relevant page."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": (
"The path to the documentation file, as returned by search_docs. "
"Example: 'platform/block-sdk-guide.md'"
),
},
},
"required": ["path"],
}
@property
def requires_auth(self) -> bool:
return False # Documentation is public
def _get_docs_root(self) -> Path:
"""Get the documentation root directory."""
this_file = Path(__file__)
project_root = this_file.parent.parent.parent.parent.parent.parent.parent.parent
return project_root / "docs"
def _extract_title(self, content: str, fallback: str) -> str:
"""Extract title from markdown content."""
lines = content.split("\n")
for line in lines:
if line.startswith("# "):
return line[2:].strip()
return fallback
def _make_doc_url(self, path: str) -> str:
"""Create a URL for a documentation page."""
url_path = path.rsplit(".", 1)[0] if "." in path else path
return f"{DOCS_BASE_URL}/{url_path}"
async def _execute(
self,
user_id: str | None,
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
"""Fetch full content of a documentation page.
Args:
user_id: User ID (not required for docs)
session: Chat session
path: Path to the documentation file
Returns:
DocPageResponse: Full document content
ErrorResponse: Error message
"""
path = kwargs.get("path", "").strip()
session_id = session.session_id if session else None
if not path:
return ErrorResponse(
message="Please provide a documentation path.",
error="Missing path parameter",
session_id=session_id,
)
# Sanitize path to prevent directory traversal
if ".." in path or path.startswith("/"):
return ErrorResponse(
message="Invalid documentation path.",
error="invalid_path",
session_id=session_id,
)
docs_root = self._get_docs_root()
full_path = docs_root / path
if not full_path.exists():
return ErrorResponse(
message=f"Documentation page not found: {path}",
error="not_found",
session_id=session_id,
)
# Ensure the path is within docs root
try:
full_path.resolve().relative_to(docs_root.resolve())
except ValueError:
return ErrorResponse(
message="Invalid documentation path.",
error="invalid_path",
session_id=session_id,
)
try:
content = full_path.read_text(encoding="utf-8")
title = self._extract_title(content, path)
return DocPageResponse(
message=f"Retrieved documentation page: {title}",
title=title,
path=path,
content=content,
doc_url=self._make_doc_url(path),
session_id=session_id,
)
except Exception as e:
logger.error(f"Failed to read documentation page {path}: {e}")
return ErrorResponse(
message=f"Failed to read documentation page: {str(e)}",
error="read_failed",
session_id=session_id,
)

View File

@@ -1,336 +0,0 @@
"""Pydantic models for tool responses."""
from datetime import datetime
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field
from backend.data.model import CredentialsMetaInput
class ResponseType(str, Enum):
"""Types of tool responses."""
AGENTS_FOUND = "agents_found"
AGENT_DETAILS = "agent_details"
SETUP_REQUIREMENTS = "setup_requirements"
EXECUTION_STARTED = "execution_started"
NEED_LOGIN = "need_login"
ERROR = "error"
NO_RESULTS = "no_results"
AGENT_OUTPUT = "agent_output"
UNDERSTANDING_UPDATED = "understanding_updated"
AGENT_PREVIEW = "agent_preview"
AGENT_SAVED = "agent_saved"
CLARIFICATION_NEEDED = "clarification_needed"
BLOCK_LIST = "block_list"
BLOCK_OUTPUT = "block_output"
DOC_SEARCH_RESULTS = "doc_search_results"
DOC_PAGE = "doc_page"
# Base response model
class ToolResponseBase(BaseModel):
"""Base model for all tool responses."""
type: ResponseType
message: str
session_id: str | None = None
# Agent discovery models
class AgentInfo(BaseModel):
"""Information about an agent."""
id: str
name: str
description: str
source: str = Field(description="marketplace or library")
in_library: bool = False
creator: str | None = None
category: str | None = None
rating: float | None = None
runs: int | None = None
is_featured: bool | None = None
status: str | None = None
can_access_graph: bool | None = None
has_external_trigger: bool | None = None
new_output: bool | None = None
graph_id: str | None = None
class AgentsFoundResponse(ToolResponseBase):
"""Response for find_agent tool."""
type: ResponseType = ResponseType.AGENTS_FOUND
title: str = "Available Agents"
agents: list[AgentInfo]
count: int
name: str = "agents_found"
class NoResultsResponse(ToolResponseBase):
"""Response when no agents found."""
type: ResponseType = ResponseType.NO_RESULTS
suggestions: list[str] = []
name: str = "no_results"
# Agent details models
class InputField(BaseModel):
"""Input field specification."""
name: str
type: str = "string"
description: str = ""
required: bool = False
default: Any | None = None
options: list[Any] | None = None
format: str | None = None
class ExecutionOptions(BaseModel):
"""Available execution options for an agent."""
manual: bool = True
scheduled: bool = True
webhook: bool = False
class AgentDetails(BaseModel):
"""Detailed agent information."""
id: str
name: str
description: str
in_library: bool = False
inputs: dict[str, Any] = {}
credentials: list[CredentialsMetaInput] = []
execution_options: ExecutionOptions = Field(default_factory=ExecutionOptions)
trigger_info: dict[str, Any] | None = None
class AgentDetailsResponse(ToolResponseBase):
"""Response for get_details action."""
type: ResponseType = ResponseType.AGENT_DETAILS
agent: AgentDetails
user_authenticated: bool = False
graph_id: str | None = None
graph_version: int | None = None
# Setup info models
class UserReadiness(BaseModel):
"""User readiness status."""
has_all_credentials: bool = False
missing_credentials: dict[str, Any] = {}
ready_to_run: bool = False
class SetupInfo(BaseModel):
"""Complete setup information."""
agent_id: str
agent_name: str
requirements: dict[str, list[Any]] = Field(
default_factory=lambda: {
"credentials": [],
"inputs": [],
"execution_modes": [],
},
)
user_readiness: UserReadiness = Field(default_factory=UserReadiness)
class SetupRequirementsResponse(ToolResponseBase):
"""Response for validate action."""
type: ResponseType = ResponseType.SETUP_REQUIREMENTS
setup_info: SetupInfo
graph_id: str | None = None
graph_version: int | None = None
# Execution models
class ExecutionStartedResponse(ToolResponseBase):
"""Response for run/schedule actions."""
type: ResponseType = ResponseType.EXECUTION_STARTED
execution_id: str
graph_id: str
graph_name: str
library_agent_id: str | None = None
library_agent_link: str | None = None
status: str = "QUEUED"
# Auth/error models
class NeedLoginResponse(ToolResponseBase):
"""Response when login is needed."""
type: ResponseType = ResponseType.NEED_LOGIN
agent_info: dict[str, Any] | None = None
class ErrorResponse(ToolResponseBase):
"""Response for errors."""
type: ResponseType = ResponseType.ERROR
error: str | None = None
details: dict[str, Any] | None = None
# Agent output models
class ExecutionOutputInfo(BaseModel):
"""Summary of a single execution's outputs."""
execution_id: str
status: str
started_at: datetime | None = None
ended_at: datetime | None = None
outputs: dict[str, list[Any]]
inputs_summary: dict[str, Any] | None = None
class AgentOutputResponse(ToolResponseBase):
"""Response for agent_output tool."""
type: ResponseType = ResponseType.AGENT_OUTPUT
agent_name: str
agent_id: str
library_agent_id: str | None = None
library_agent_link: str | None = None
execution: ExecutionOutputInfo | None = None
available_executions: list[dict[str, Any]] | None = None
total_executions: int = 0
# Business understanding models
class UnderstandingUpdatedResponse(ToolResponseBase):
"""Response for add_understanding tool."""
type: ResponseType = ResponseType.UNDERSTANDING_UPDATED
updated_fields: list[str] = Field(default_factory=list)
current_understanding: dict[str, Any] = Field(default_factory=dict)
# Agent generation models
class ClarifyingQuestion(BaseModel):
"""A question that needs user clarification."""
question: str
keyword: str
example: str | None = None
class AgentPreviewResponse(ToolResponseBase):
"""Response for previewing a generated agent before saving."""
type: ResponseType = ResponseType.AGENT_PREVIEW
agent_json: dict[str, Any]
agent_name: str
description: str
node_count: int
link_count: int = 0
class AgentSavedResponse(ToolResponseBase):
"""Response when an agent is saved to the library."""
type: ResponseType = ResponseType.AGENT_SAVED
agent_id: str
agent_name: str
library_agent_id: str
library_agent_link: str
agent_page_link: str # Link to the agent builder/editor page
class ClarificationNeededResponse(ToolResponseBase):
"""Response when the LLM needs more information from the user."""
type: ResponseType = ResponseType.CLARIFICATION_NEEDED
questions: list[ClarifyingQuestion] = Field(default_factory=list)
# Documentation search models
class DocSearchResult(BaseModel):
"""A single documentation search result."""
title: str
path: str
section: str
snippet: str # Short excerpt for UI display
score: float
doc_url: str | None = None
class DocSearchResultsResponse(ToolResponseBase):
"""Response for search_docs tool."""
type: ResponseType = ResponseType.DOC_SEARCH_RESULTS
results: list[DocSearchResult]
count: int
query: str
class DocPageResponse(ToolResponseBase):
"""Response for get_doc_page tool."""
type: ResponseType = ResponseType.DOC_PAGE
title: str
path: str
content: str # Full document content
doc_url: str | None = None
# Block models
class BlockInputFieldInfo(BaseModel):
"""Information about a block input field."""
name: str
type: str
description: str = ""
required: bool = False
default: Any | None = None
class BlockInfoSummary(BaseModel):
"""Summary of a block for search results."""
id: str
name: str
description: str
categories: list[str]
input_schema: dict[str, Any]
output_schema: dict[str, Any]
required_inputs: list[BlockInputFieldInfo] = Field(
default_factory=list,
description="List of required input fields for this block",
)
class BlockListResponse(ToolResponseBase):
"""Response for find_block tool."""
type: ResponseType = ResponseType.BLOCK_LIST
blocks: list[BlockInfoSummary]
count: int
query: str
usage_hint: str = Field(
default="To execute a block, call run_block with block_id set to the block's "
"'id' field and input_data containing the required fields from input_schema."
)
class BlockOutputResponse(ToolResponseBase):
"""Response for run_block tool."""
type: ResponseType = ResponseType.BLOCK_OUTPUT
block_id: str
block_name: str
outputs: dict[str, list[Any]]
success: bool = True

View File

@@ -1,297 +0,0 @@
"""Tool for executing blocks directly."""
import logging
from collections import defaultdict
from typing import Any
from backend.api.features.chat.model import ChatSession
from backend.data.block import get_block
from backend.data.execution import ExecutionContext
from backend.data.model import CredentialsMetaInput
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.util.exceptions import BlockError
from .base import BaseTool
from .models import (
BlockOutputResponse,
ErrorResponse,
SetupInfo,
SetupRequirementsResponse,
ToolResponseBase,
UserReadiness,
)
logger = logging.getLogger(__name__)
class RunBlockTool(BaseTool):
"""Tool for executing a block and returning its outputs."""
@property
def name(self) -> str:
return "run_block"
@property
def description(self) -> str:
return (
"Execute a specific block with the provided input data. "
"IMPORTANT: You MUST call find_block first to get the block's 'id' - "
"do NOT guess or make up block IDs. "
"Use the 'id' from find_block results and provide input_data "
"matching the block's required_inputs."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"block_id": {
"type": "string",
"description": (
"The block's 'id' field from find_block results. "
"NEVER guess this - always get it from find_block first."
),
},
"input_data": {
"type": "object",
"description": (
"Input values for the block. Use the 'required_inputs' field "
"from find_block to see what fields are needed."
),
},
},
"required": ["block_id", "input_data"],
}
@property
def requires_auth(self) -> bool:
return True
async def _check_block_credentials(
self,
user_id: str,
block: Any,
) -> tuple[dict[str, CredentialsMetaInput], list[CredentialsMetaInput]]:
"""
Check if user has required credentials for a block.
Returns:
tuple[matched_credentials, missing_credentials]
"""
matched_credentials: dict[str, CredentialsMetaInput] = {}
missing_credentials: list[CredentialsMetaInput] = []
# Get credential field info from block's input schema
credentials_fields_info = block.input_schema.get_credentials_fields_info()
if not credentials_fields_info:
return matched_credentials, missing_credentials
# Get user's available credentials
creds_manager = IntegrationCredentialsManager()
available_creds = await creds_manager.store.get_all_creds(user_id)
for field_name, field_info in credentials_fields_info.items():
# field_info.provider is a frozenset of acceptable providers
# field_info.supported_types is a frozenset of acceptable types
matching_cred = next(
(
cred
for cred in available_creds
if cred.provider in field_info.provider
and cred.type in field_info.supported_types
),
None,
)
if matching_cred:
matched_credentials[field_name] = CredentialsMetaInput(
id=matching_cred.id,
provider=matching_cred.provider, # type: ignore
type=matching_cred.type,
title=matching_cred.title,
)
else:
# Create a placeholder for the missing credential
provider = next(iter(field_info.provider), "unknown")
cred_type = next(iter(field_info.supported_types), "api_key")
missing_credentials.append(
CredentialsMetaInput(
id=field_name,
provider=provider, # type: ignore
type=cred_type, # type: ignore
title=field_name.replace("_", " ").title(),
)
)
return matched_credentials, missing_credentials
async def _execute(
self,
user_id: str | None,
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
"""Execute a block with the given input data.
Args:
user_id: User ID (required)
session: Chat session
block_id: Block UUID to execute
input_data: Input values for the block
Returns:
BlockOutputResponse: Block execution outputs
SetupRequirementsResponse: Missing credentials
ErrorResponse: Error message
"""
block_id = kwargs.get("block_id", "").strip()
input_data = kwargs.get("input_data", {})
session_id = session.session_id
if not block_id:
return ErrorResponse(
message="Please provide a block_id",
session_id=session_id,
)
if not isinstance(input_data, dict):
return ErrorResponse(
message="input_data must be an object",
session_id=session_id,
)
if not user_id:
return ErrorResponse(
message="Authentication required",
session_id=session_id,
)
# Get the block
block = get_block(block_id)
if not block:
return ErrorResponse(
message=f"Block '{block_id}' not found",
session_id=session_id,
)
logger.info(f"Executing block {block.name} ({block_id}) for user {user_id}")
# Check credentials
creds_manager = IntegrationCredentialsManager()
matched_credentials, missing_credentials = await self._check_block_credentials(
user_id, block
)
if missing_credentials:
# Return setup requirements response with missing credentials
missing_creds_dict = {c.id: c.model_dump() for c in missing_credentials}
return SetupRequirementsResponse(
message=(
f"Block '{block.name}' requires credentials that are not configured. "
"Please set up the required credentials before running this block."
),
session_id=session_id,
setup_info=SetupInfo(
agent_id=block_id,
agent_name=block.name,
user_readiness=UserReadiness(
has_all_credentials=False,
missing_credentials=missing_creds_dict,
ready_to_run=False,
),
requirements={
"credentials": [c.model_dump() for c in missing_credentials],
"inputs": self._get_inputs_list(block),
"execution_modes": ["immediate"],
},
),
graph_id=None,
graph_version=None,
)
try:
# Fetch actual credentials and prepare kwargs for block execution
# Create execution context with defaults (blocks may require it)
exec_kwargs: dict[str, Any] = {
"user_id": user_id,
"execution_context": ExecutionContext(),
}
for field_name, cred_meta in matched_credentials.items():
# Inject metadata into input_data (for validation)
if field_name not in input_data:
input_data[field_name] = cred_meta.model_dump()
# Fetch actual credentials and pass as kwargs (for execution)
actual_credentials = await creds_manager.get(
user_id, cred_meta.id, lock=False
)
if actual_credentials:
exec_kwargs[field_name] = actual_credentials
else:
return ErrorResponse(
message=f"Failed to retrieve credentials for {field_name}",
session_id=session_id,
)
# Execute the block and collect outputs
outputs: dict[str, list[Any]] = defaultdict(list)
async for output_name, output_data in block.execute(
input_data,
**exec_kwargs,
):
outputs[output_name].append(output_data)
return BlockOutputResponse(
message=f"Block '{block.name}' executed successfully",
block_id=block_id,
block_name=block.name,
outputs=dict(outputs),
success=True,
session_id=session_id,
)
except BlockError as e:
logger.warning(f"Block execution failed: {e}")
return ErrorResponse(
message=f"Block execution failed: {e}",
error=str(e),
session_id=session_id,
)
except Exception as e:
logger.error(f"Unexpected error executing block: {e}", exc_info=True)
return ErrorResponse(
message=f"Failed to execute block: {str(e)}",
error=str(e),
session_id=session_id,
)
def _get_inputs_list(self, block: Any) -> list[dict[str, Any]]:
"""Extract non-credential inputs from block schema."""
inputs_list = []
schema = block.input_schema.jsonschema()
properties = schema.get("properties", {})
required_fields = set(schema.get("required", []))
# Get credential field names to exclude
credentials_fields = set(block.input_schema.get_credentials_fields().keys())
for field_name, field_schema in properties.items():
# Skip credential fields
if field_name in credentials_fields:
continue
inputs_list.append(
{
"name": field_name,
"title": field_schema.get("title", field_name),
"type": field_schema.get("type", "string"),
"description": field_schema.get("description", ""),
"required": field_name in required_fields,
}
)
return inputs_list

View File

@@ -1,208 +0,0 @@
"""SearchDocsTool - Search documentation using hybrid search."""
import logging
from typing import Any
from prisma.enums import ContentType
from backend.api.features.chat.model import ChatSession
from backend.api.features.chat.tools.base import BaseTool
from backend.api.features.chat.tools.models import (
DocSearchResult,
DocSearchResultsResponse,
ErrorResponse,
NoResultsResponse,
ToolResponseBase,
)
from backend.api.features.store.hybrid_search import unified_hybrid_search
logger = logging.getLogger(__name__)
# Base URL for documentation (can be configured)
DOCS_BASE_URL = "https://docs.agpt.co"
# Maximum number of results to return
MAX_RESULTS = 5
# Snippet length for preview
SNIPPET_LENGTH = 200
class SearchDocsTool(BaseTool):
"""Tool for searching AutoGPT platform documentation."""
@property
def name(self) -> str:
return "search_docs"
@property
def description(self) -> str:
return (
"Search the AutoGPT platform documentation for information about "
"how to use the platform, build agents, configure blocks, and more. "
"Returns relevant documentation sections. Use get_doc_page to read full content."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": (
"Search query to find relevant documentation. "
"Use natural language to describe what you're looking for."
),
},
},
"required": ["query"],
}
@property
def requires_auth(self) -> bool:
return False # Documentation is public
def _create_snippet(self, content: str, max_length: int = SNIPPET_LENGTH) -> str:
"""Create a short snippet from content for preview."""
# Remove markdown formatting for cleaner snippet
clean_content = content.replace("#", "").replace("*", "").replace("`", "")
# Remove extra whitespace
clean_content = " ".join(clean_content.split())
if len(clean_content) <= max_length:
return clean_content
# Truncate at word boundary
truncated = clean_content[:max_length]
last_space = truncated.rfind(" ")
if last_space > max_length // 2:
truncated = truncated[:last_space]
return truncated + "..."
def _make_doc_url(self, path: str) -> str:
"""Create a URL for a documentation page."""
# Remove file extension for URL
url_path = path.rsplit(".", 1)[0] if "." in path else path
return f"{DOCS_BASE_URL}/{url_path}"
async def _execute(
self,
user_id: str | None,
session: ChatSession,
**kwargs,
) -> ToolResponseBase:
"""Search documentation and return relevant sections.
Args:
user_id: User ID (not required for docs)
session: Chat session
query: Search query
Returns:
DocSearchResultsResponse: List of matching documentation sections
NoResultsResponse: No results found
ErrorResponse: Error message
"""
query = kwargs.get("query", "").strip()
session_id = session.session_id if session else None
if not query:
return ErrorResponse(
message="Please provide a search query.",
error="Missing query parameter",
session_id=session_id,
)
try:
# Search using hybrid search for DOCUMENTATION content type only
results, total = await unified_hybrid_search(
query=query,
content_types=[ContentType.DOCUMENTATION],
page=1,
page_size=MAX_RESULTS * 2, # Fetch extra for deduplication
min_score=0.1, # Lower threshold for docs
)
if not results:
return NoResultsResponse(
message=f"No documentation found for '{query}'.",
suggestions=[
"Try different keywords",
"Use more general terms",
"Check for typos in your query",
],
session_id=session_id,
)
# Deduplicate by document path (keep highest scoring section per doc)
seen_docs: dict[str, dict[str, Any]] = {}
for result in results:
metadata = result.get("metadata", {})
doc_path = metadata.get("path", "")
if not doc_path:
continue
# Keep the highest scoring result for each document
if doc_path not in seen_docs:
seen_docs[doc_path] = result
elif result.get("combined_score", 0) > seen_docs[doc_path].get(
"combined_score", 0
):
seen_docs[doc_path] = result
# Sort by score and take top MAX_RESULTS
deduplicated = sorted(
seen_docs.values(),
key=lambda x: x.get("combined_score", 0),
reverse=True,
)[:MAX_RESULTS]
if not deduplicated:
return NoResultsResponse(
message=f"No documentation found for '{query}'.",
suggestions=[
"Try different keywords",
"Use more general terms",
],
session_id=session_id,
)
# Build response
doc_results: list[DocSearchResult] = []
for result in deduplicated:
metadata = result.get("metadata", {})
doc_path = metadata.get("path", "")
doc_title = metadata.get("doc_title", "")
section_title = metadata.get("section_title", "")
searchable_text = result.get("searchable_text", "")
score = result.get("combined_score", 0)
doc_results.append(
DocSearchResult(
title=doc_title or section_title or doc_path,
path=doc_path,
section=section_title,
snippet=self._create_snippet(searchable_text),
score=round(score, 3),
doc_url=self._make_doc_url(doc_path),
)
)
return DocSearchResultsResponse(
message=f"Found {len(doc_results)} relevant documentation sections.",
results=doc_results,
count=len(doc_results),
query=query,
session_id=session_id,
)
except Exception as e:
logger.error(f"Documentation search failed: {e}")
return ErrorResponse(
message=f"Failed to search documentation: {str(e)}",
error="search_failed",
session_id=session_id,
)

View File

@@ -1,833 +0,0 @@
"""
OAuth 2.0 Provider Endpoints
Implements OAuth 2.0 Authorization Code flow with PKCE support.
Flow:
1. User clicks "Login with AutoGPT" in 3rd party app
2. App redirects user to /auth/authorize with client_id, redirect_uri, scope, state
3. User sees consent screen (if not already logged in, redirects to login first)
4. User approves → backend creates authorization code
5. User redirected back to app with code
6. App exchanges code for access/refresh tokens at /api/oauth/token
7. App uses access token to call external API endpoints
"""
import io
import logging
import os
import uuid
from datetime import datetime
from typing import Literal, Optional
from urllib.parse import urlencode
from autogpt_libs.auth import get_user_id
from fastapi import APIRouter, Body, HTTPException, Security, UploadFile, status
from gcloud.aio import storage as async_storage
from PIL import Image
from prisma.enums import APIKeyPermission
from pydantic import BaseModel, Field
from backend.data.auth.oauth import (
InvalidClientError,
InvalidGrantError,
OAuthApplicationInfo,
TokenIntrospectionResult,
consume_authorization_code,
create_access_token,
create_authorization_code,
create_refresh_token,
get_oauth_application,
get_oauth_application_by_id,
introspect_token,
list_user_oauth_applications,
refresh_tokens,
revoke_access_token,
revoke_refresh_token,
update_oauth_application,
validate_client_credentials,
validate_redirect_uri,
validate_scopes,
)
from backend.util.settings import Settings
from backend.util.virus_scanner import scan_content_safe
settings = Settings()
logger = logging.getLogger(__name__)
router = APIRouter()
# ============================================================================
# Request/Response Models
# ============================================================================
class TokenResponse(BaseModel):
"""OAuth 2.0 token response"""
token_type: Literal["Bearer"] = "Bearer"
access_token: str
access_token_expires_at: datetime
refresh_token: str
refresh_token_expires_at: datetime
scopes: list[str]
class ErrorResponse(BaseModel):
"""OAuth 2.0 error response"""
error: str
error_description: Optional[str] = None
class OAuthApplicationPublicInfo(BaseModel):
"""Public information about an OAuth application (for consent screen)"""
name: str
description: Optional[str] = None
logo_url: Optional[str] = None
scopes: list[str]
# ============================================================================
# Application Info Endpoint
# ============================================================================
@router.get(
"/app/{client_id}",
responses={
404: {"description": "Application not found or disabled"},
},
)
async def get_oauth_app_info(
client_id: str, user_id: str = Security(get_user_id)
) -> OAuthApplicationPublicInfo:
"""
Get public information about an OAuth application.
This endpoint is used by the consent screen to display application details
to the user before they authorize access.
Returns:
- name: Application name
- description: Application description (if provided)
- scopes: List of scopes the application is allowed to request
"""
app = await get_oauth_application(client_id)
if not app or not app.is_active:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Application not found",
)
return OAuthApplicationPublicInfo(
name=app.name,
description=app.description,
logo_url=app.logo_url,
scopes=[s.value for s in app.scopes],
)
# ============================================================================
# Authorization Endpoint
# ============================================================================
class AuthorizeRequest(BaseModel):
"""OAuth 2.0 authorization request"""
client_id: str = Field(description="Client identifier")
redirect_uri: str = Field(description="Redirect URI")
scopes: list[str] = Field(description="List of scopes")
state: str = Field(description="Anti-CSRF token from client")
response_type: str = Field(
default="code", description="Must be 'code' for authorization code flow"
)
code_challenge: str = Field(description="PKCE code challenge (required)")
code_challenge_method: Literal["S256", "plain"] = Field(
default="S256", description="PKCE code challenge method (S256 recommended)"
)
class AuthorizeResponse(BaseModel):
"""OAuth 2.0 authorization response with redirect URL"""
redirect_url: str = Field(description="URL to redirect the user to")
@router.post("/authorize")
async def authorize(
request: AuthorizeRequest = Body(),
user_id: str = Security(get_user_id),
) -> AuthorizeResponse:
"""
OAuth 2.0 Authorization Endpoint
User must be logged in (authenticated with Supabase JWT).
This endpoint creates an authorization code and returns a redirect URL.
PKCE (Proof Key for Code Exchange) is REQUIRED for all authorization requests.
The frontend consent screen should call this endpoint after the user approves,
then redirect the user to the returned `redirect_url`.
Request Body:
- client_id: The OAuth application's client ID
- redirect_uri: Where to redirect after authorization (must match registered URI)
- scopes: List of permissions (e.g., "EXECUTE_GRAPH READ_GRAPH")
- state: Anti-CSRF token provided by client (will be returned in redirect)
- response_type: Must be "code" (for authorization code flow)
- code_challenge: PKCE code challenge (required)
- code_challenge_method: "S256" (recommended) or "plain"
Returns:
- redirect_url: The URL to redirect the user to (includes authorization code)
Error cases return a redirect_url with error parameters, or raise HTTPException
for critical errors (like invalid redirect_uri).
"""
try:
# Validate response_type
if request.response_type != "code":
return _error_redirect_url(
request.redirect_uri,
request.state,
"unsupported_response_type",
"Only 'code' response type is supported",
)
# Get application
app = await get_oauth_application(request.client_id)
if not app:
return _error_redirect_url(
request.redirect_uri,
request.state,
"invalid_client",
"Unknown client_id",
)
if not app.is_active:
return _error_redirect_url(
request.redirect_uri,
request.state,
"invalid_client",
"Application is not active",
)
# Validate redirect URI
if not validate_redirect_uri(app, request.redirect_uri):
# For invalid redirect_uri, we can't redirect safely
# Must return error instead
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
"Invalid redirect_uri. "
f"Must be one of: {', '.join(app.redirect_uris)}"
),
)
# Parse and validate scopes
try:
requested_scopes = [APIKeyPermission(s.strip()) for s in request.scopes]
except ValueError as e:
return _error_redirect_url(
request.redirect_uri,
request.state,
"invalid_scope",
f"Invalid scope: {e}",
)
if not requested_scopes:
return _error_redirect_url(
request.redirect_uri,
request.state,
"invalid_scope",
"At least one scope is required",
)
if not validate_scopes(app, requested_scopes):
return _error_redirect_url(
request.redirect_uri,
request.state,
"invalid_scope",
"Application is not authorized for all requested scopes. "
f"Allowed: {', '.join(s.value for s in app.scopes)}",
)
# Create authorization code
auth_code = await create_authorization_code(
application_id=app.id,
user_id=user_id,
scopes=requested_scopes,
redirect_uri=request.redirect_uri,
code_challenge=request.code_challenge,
code_challenge_method=request.code_challenge_method,
)
# Build redirect URL with authorization code
params = {
"code": auth_code.code,
"state": request.state,
}
redirect_url = f"{request.redirect_uri}?{urlencode(params)}"
logger.info(
f"Authorization code issued for user #{user_id} "
f"and app {app.name} (#{app.id})"
)
return AuthorizeResponse(redirect_url=redirect_url)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error in authorization endpoint: {e}", exc_info=True)
return _error_redirect_url(
request.redirect_uri,
request.state,
"server_error",
"An unexpected error occurred",
)
def _error_redirect_url(
redirect_uri: str,
state: str,
error: str,
error_description: Optional[str] = None,
) -> AuthorizeResponse:
"""Helper to build redirect URL with OAuth error parameters"""
params = {
"error": error,
"state": state,
}
if error_description:
params["error_description"] = error_description
redirect_url = f"{redirect_uri}?{urlencode(params)}"
return AuthorizeResponse(redirect_url=redirect_url)
# ============================================================================
# Token Endpoint
# ============================================================================
class TokenRequestByCode(BaseModel):
grant_type: Literal["authorization_code"]
code: str = Field(description="Authorization code")
redirect_uri: str = Field(
description="Redirect URI (must match authorization request)"
)
client_id: str
client_secret: str
code_verifier: str = Field(description="PKCE code verifier")
class TokenRequestByRefreshToken(BaseModel):
grant_type: Literal["refresh_token"]
refresh_token: str
client_id: str
client_secret: str
@router.post("/token")
async def token(
request: TokenRequestByCode | TokenRequestByRefreshToken = Body(),
) -> TokenResponse:
"""
OAuth 2.0 Token Endpoint
Exchanges authorization code or refresh token for access token.
Grant Types:
1. authorization_code: Exchange authorization code for tokens
- Required: grant_type, code, redirect_uri, client_id, client_secret
- Optional: code_verifier (required if PKCE was used)
2. refresh_token: Exchange refresh token for new access token
- Required: grant_type, refresh_token, client_id, client_secret
Returns:
- access_token: Bearer token for API access (1 hour TTL)
- token_type: "Bearer"
- expires_in: Seconds until access token expires
- refresh_token: Token for refreshing access (30 days TTL)
- scopes: List of scopes
"""
# Validate client credentials
try:
app = await validate_client_credentials(
request.client_id, request.client_secret
)
except InvalidClientError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=str(e),
)
# Handle authorization_code grant
if request.grant_type == "authorization_code":
# Consume authorization code
try:
user_id, scopes = await consume_authorization_code(
code=request.code,
application_id=app.id,
redirect_uri=request.redirect_uri,
code_verifier=request.code_verifier,
)
except InvalidGrantError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e),
)
# Create access and refresh tokens
access_token = await create_access_token(app.id, user_id, scopes)
refresh_token = await create_refresh_token(app.id, user_id, scopes)
logger.info(
f"Access token issued for user #{user_id} and app {app.name} (#{app.id})"
"via authorization code"
)
if not access_token.token or not refresh_token.token:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to generate tokens",
)
return TokenResponse(
token_type="Bearer",
access_token=access_token.token.get_secret_value(),
access_token_expires_at=access_token.expires_at,
refresh_token=refresh_token.token.get_secret_value(),
refresh_token_expires_at=refresh_token.expires_at,
scopes=list(s.value for s in scopes),
)
# Handle refresh_token grant
elif request.grant_type == "refresh_token":
# Refresh access token
try:
new_access_token, new_refresh_token = await refresh_tokens(
request.refresh_token, app.id
)
except InvalidGrantError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e),
)
logger.info(
f"Tokens refreshed for user #{new_access_token.user_id} "
f"by app {app.name} (#{app.id})"
)
if not new_access_token.token or not new_refresh_token.token:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to generate tokens",
)
return TokenResponse(
token_type="Bearer",
access_token=new_access_token.token.get_secret_value(),
access_token_expires_at=new_access_token.expires_at,
refresh_token=new_refresh_token.token.get_secret_value(),
refresh_token_expires_at=new_refresh_token.expires_at,
scopes=list(s.value for s in new_access_token.scopes),
)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Unsupported grant_type: {request.grant_type}. "
"Must be 'authorization_code' or 'refresh_token'",
)
# ============================================================================
# Token Introspection Endpoint
# ============================================================================
@router.post("/introspect")
async def introspect(
token: str = Body(description="Token to introspect"),
token_type_hint: Optional[Literal["access_token", "refresh_token"]] = Body(
None, description="Hint about token type ('access_token' or 'refresh_token')"
),
client_id: str = Body(description="Client identifier"),
client_secret: str = Body(description="Client secret"),
) -> TokenIntrospectionResult:
"""
OAuth 2.0 Token Introspection Endpoint (RFC 7662)
Allows clients to check if a token is valid and get its metadata.
Returns:
- active: Whether the token is currently active
- scopes: List of authorized scopes (if active)
- client_id: The client the token was issued to (if active)
- user_id: The user the token represents (if active)
- exp: Expiration timestamp (if active)
- token_type: "access_token" or "refresh_token" (if active)
"""
# Validate client credentials
try:
await validate_client_credentials(client_id, client_secret)
except InvalidClientError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=str(e),
)
# Introspect the token
return await introspect_token(token, token_type_hint)
# ============================================================================
# Token Revocation Endpoint
# ============================================================================
@router.post("/revoke")
async def revoke(
token: str = Body(description="Token to revoke"),
token_type_hint: Optional[Literal["access_token", "refresh_token"]] = Body(
None, description="Hint about token type ('access_token' or 'refresh_token')"
),
client_id: str = Body(description="Client identifier"),
client_secret: str = Body(description="Client secret"),
):
"""
OAuth 2.0 Token Revocation Endpoint (RFC 7009)
Allows clients to revoke an access or refresh token.
Note: Revoking a refresh token does NOT revoke associated access tokens.
Revoking an access token does NOT revoke the associated refresh token.
"""
# Validate client credentials
try:
app = await validate_client_credentials(client_id, client_secret)
except InvalidClientError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=str(e),
)
# Try to revoke as access token first
# Note: We pass app.id to ensure the token belongs to the authenticated app
if token_type_hint != "refresh_token":
revoked = await revoke_access_token(token, app.id)
if revoked:
logger.info(
f"Access token revoked for app {app.name} (#{app.id}); "
f"user #{revoked.user_id}"
)
return {"status": "ok"}
# Try to revoke as refresh token
revoked = await revoke_refresh_token(token, app.id)
if revoked:
logger.info(
f"Refresh token revoked for app {app.name} (#{app.id}); "
f"user #{revoked.user_id}"
)
return {"status": "ok"}
# Per RFC 7009, revocation endpoint returns 200 even if token not found
# or if token belongs to a different application.
# This prevents token scanning attacks.
logger.warning(f"Unsuccessful token revocation attempt by app {app.name} #{app.id}")
return {"status": "ok"}
# ============================================================================
# Application Management Endpoints (for app owners)
# ============================================================================
@router.get("/apps/mine")
async def list_my_oauth_apps(
user_id: str = Security(get_user_id),
) -> list[OAuthApplicationInfo]:
"""
List all OAuth applications owned by the current user.
Returns a list of OAuth applications with their details including:
- id, name, description, logo_url
- client_id (public identifier)
- redirect_uris, grant_types, scopes
- is_active status
- created_at, updated_at timestamps
Note: client_secret is never returned for security reasons.
"""
return await list_user_oauth_applications(user_id)
@router.patch("/apps/{app_id}/status")
async def update_app_status(
app_id: str,
user_id: str = Security(get_user_id),
is_active: bool = Body(description="Whether the app should be active", embed=True),
) -> OAuthApplicationInfo:
"""
Enable or disable an OAuth application.
Only the application owner can update the status.
When disabled, the application cannot be used for new authorizations
and existing access tokens will fail validation.
Returns the updated application info.
"""
updated_app = await update_oauth_application(
app_id=app_id,
owner_id=user_id,
is_active=is_active,
)
if not updated_app:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Application not found or you don't have permission to update it",
)
action = "enabled" if is_active else "disabled"
logger.info(f"OAuth app {updated_app.name} (#{app_id}) {action} by user #{user_id}")
return updated_app
class UpdateAppLogoRequest(BaseModel):
logo_url: str = Field(description="URL of the uploaded logo image")
@router.patch("/apps/{app_id}/logo")
async def update_app_logo(
app_id: str,
request: UpdateAppLogoRequest = Body(),
user_id: str = Security(get_user_id),
) -> OAuthApplicationInfo:
"""
Update the logo URL for an OAuth application.
Only the application owner can update the logo.
The logo should be uploaded first using the media upload endpoint,
then this endpoint is called with the resulting URL.
Logo requirements:
- Must be square (1:1 aspect ratio)
- Minimum 512x512 pixels
- Maximum 2048x2048 pixels
Returns the updated application info.
"""
if (
not (app := await get_oauth_application_by_id(app_id))
or app.owner_id != user_id
):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="OAuth App not found",
)
# Delete the current app logo file (if any and it's in our cloud storage)
await _delete_app_current_logo_file(app)
updated_app = await update_oauth_application(
app_id=app_id,
owner_id=user_id,
logo_url=request.logo_url,
)
if not updated_app:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Application not found or you don't have permission to update it",
)
logger.info(
f"OAuth app {updated_app.name} (#{app_id}) logo updated by user #{user_id}"
)
return updated_app
# Logo upload constraints
LOGO_MIN_SIZE = 512
LOGO_MAX_SIZE = 2048
LOGO_ALLOWED_TYPES = {"image/jpeg", "image/png", "image/webp"}
LOGO_MAX_FILE_SIZE = 3 * 1024 * 1024 # 3MB
@router.post("/apps/{app_id}/logo/upload")
async def upload_app_logo(
app_id: str,
file: UploadFile,
user_id: str = Security(get_user_id),
) -> OAuthApplicationInfo:
"""
Upload a logo image for an OAuth application.
Requirements:
- Image must be square (1:1 aspect ratio)
- Minimum 512x512 pixels
- Maximum 2048x2048 pixels
- Allowed formats: JPEG, PNG, WebP
- Maximum file size: 3MB
The image is uploaded to cloud storage and the app's logoUrl is updated.
Returns the updated application info.
"""
# Verify ownership to reduce vulnerability to DoS(torage) or DoM(oney) attacks
if (
not (app := await get_oauth_application_by_id(app_id))
or app.owner_id != user_id
):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="OAuth App not found",
)
# Check GCS configuration
if not settings.config.media_gcs_bucket_name:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Media storage is not configured",
)
# Validate content type
content_type = file.content_type
if content_type not in LOGO_ALLOWED_TYPES:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid file type. Allowed: JPEG, PNG, WebP. Got: {content_type}",
)
# Read file content
try:
file_bytes = await file.read()
except Exception as e:
logger.error(f"Error reading logo file: {e}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Failed to read uploaded file",
)
# Check file size
if len(file_bytes) > LOGO_MAX_FILE_SIZE:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=(
"File too large. "
f"Maximum size is {LOGO_MAX_FILE_SIZE // 1024 // 1024}MB"
),
)
# Validate image dimensions
try:
image = Image.open(io.BytesIO(file_bytes))
width, height = image.size
if width != height:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Logo must be square. Got {width}x{height}",
)
if width < LOGO_MIN_SIZE:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Logo too small. Minimum {LOGO_MIN_SIZE}x{LOGO_MIN_SIZE}. "
f"Got {width}x{height}",
)
if width > LOGO_MAX_SIZE:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Logo too large. Maximum {LOGO_MAX_SIZE}x{LOGO_MAX_SIZE}. "
f"Got {width}x{height}",
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error validating logo image: {e}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid image file",
)
# Scan for viruses
filename = file.filename or "logo"
await scan_content_safe(file_bytes, filename=filename)
# Generate unique filename
file_ext = os.path.splitext(filename)[1].lower() or ".png"
unique_filename = f"{uuid.uuid4()}{file_ext}"
storage_path = f"oauth-apps/{app_id}/logo/{unique_filename}"
# Upload to GCS
try:
async with async_storage.Storage() as async_client:
bucket_name = settings.config.media_gcs_bucket_name
await async_client.upload(
bucket_name, storage_path, file_bytes, content_type=content_type
)
logo_url = f"https://storage.googleapis.com/{bucket_name}/{storage_path}"
except Exception as e:
logger.error(f"Error uploading logo to GCS: {e}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to upload logo",
)
# Delete the current app logo file (if any and it's in our cloud storage)
await _delete_app_current_logo_file(app)
# Update the app with the new logo URL
updated_app = await update_oauth_application(
app_id=app_id,
owner_id=user_id,
logo_url=logo_url,
)
if not updated_app:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Application not found or you don't have permission to update it",
)
logger.info(
f"OAuth app {updated_app.name} (#{app_id}) logo uploaded by user #{user_id}"
)
return updated_app
async def _delete_app_current_logo_file(app: OAuthApplicationInfo):
"""
Delete the current logo file for the given app, if there is one in our cloud storage
"""
bucket_name = settings.config.media_gcs_bucket_name
storage_base_url = f"https://storage.googleapis.com/{bucket_name}/"
if app.logo_url and app.logo_url.startswith(storage_base_url):
# Parse blob path from URL: https://storage.googleapis.com/{bucket}/{path}
old_path = app.logo_url.replace(storage_base_url, "")
try:
async with async_storage.Storage() as async_client:
await async_client.delete(bucket_name, old_path)
logger.info(f"Deleted old logo for OAuth app #{app.id}: {old_path}")
except Exception as e:
# Log but don't fail - the new logo was uploaded successfully
logger.warning(
f"Failed to delete old logo for OAuth app #{app.id}: {e}", exc_info=e
)

File diff suppressed because it is too large Load Diff

Some files were not shown because too many files have changed in this diff Show More