feat(platform): Add long-running agent framework

Implements a framework for long-running agents based on Anthropic's
"Effective Harnesses for Long-Running Agents" research. This enables
agents to work effectively across multiple context windows.

Key components:
- Data models for features, progress tracking, and session state
- Session manager for file-based and database-backed state management
- Blocks for initializer and coding agents:
  - LongRunningInitializerBlock: First-run setup with feature list
  - LongRunningCodingBlock: Incremental progress in coding sessions
  - FeatureCompleteBlock/FeatureFailedBlock: Feature status management
  - SessionEndBlock: Proper session cleanup
  - Progress tracking blocks (LogProgress, GitCommit, etc.)
- Database schema for persistent long-running sessions
- REST API endpoints for session, feature, and progress management

The framework addresses common agent failure modes:
1. Declaring victory too early - Uses feature list tracking
2. Leaving environment in buggy state - Requires commits and testing
3. Marking features complete prematurely - Verification steps
4. Time spent understanding codebase - Progress logs and git history
This commit is contained in:
Claude
2025-12-10 05:08:56 +00:00
parent c4eb7edb65
commit 039a2ba7b5
12 changed files with 3760 additions and 0 deletions

View File

@@ -0,0 +1,3 @@
# Long-running agent framework blocks
# Based on Anthropic's "Effective Harnesses for Long-Running Agents" research
# https://www.anthropic.com/engineering/effective-harnesses-for-long-running-agents

View File

@@ -0,0 +1,537 @@
"""
Long-Running Coding Block.
This block is used for subsequent sessions after initialization.
It follows the pattern of:
1. Getting oriented (reading progress, git logs, feature list)
2. Choosing a feature to work on
3. Making incremental progress
4. Testing the feature
5. Committing changes and updating progress
Based on Anthropic's "Effective Harnesses for Long-Running Agents" research.
"""
import logging
import uuid
from typing import Any, Optional
from backend.data.block import Block, BlockCategory, BlockOutput, BlockType
from backend.data.model import SchemaField
from .models import (
FeatureStatus,
ProgressEntry,
ProgressEntryType,
SessionStatus,
)
from .session_manager import SessionManager
logger = logging.getLogger(__name__)
class LongRunningCodingBlock(Block):
    """
    Start a coding session for a long-running agent project.

    This block is used AFTER initialization to work on features incrementally.
    It provides context about the project state and the next feature to work on.

    The coding agent workflow:
    1. Read the feature list and progress log
    2. Choose the highest-priority incomplete feature
    3. Implement the feature
    4. Test the feature thoroughly
    5. Mark the feature as complete and commit changes
    """

    class Input(Block.Input):
        working_directory: str = SchemaField(
            description="Directory where the project is located"
        )
        verify_basic_functionality: bool = SchemaField(
            default=True,
            description="Run basic verification before starting new work",
        )
        max_context_entries: int = SchemaField(
            default=20,
            description="Maximum number of progress entries to include in context",
        )

    class Output(Block.Output):
        session_id: str = SchemaField(
            description="Unique identifier for this session"
        )
        project_name: str = SchemaField(
            description="Name of the project"
        )
        project_description: str = SchemaField(
            description="Description of the project"
        )
        current_feature: dict = SchemaField(
            description="The feature to work on in this session"
        )
        feature_summary: dict = SchemaField(
            description="Summary of feature statuses (passing, failing, pending, etc.)"
        )
        recent_progress: list[dict] = SchemaField(
            description="Recent progress entries for context"
        )
        recent_commits: list[dict] = SchemaField(
            description="Recent git commits for context"
        )
        init_script_output: str = SchemaField(
            description="Output from running the init script (if verify_basic_functionality is True)",
            default="",
        )
        is_project_complete: bool = SchemaField(
            description="Whether all features are passing"
        )
        status: str = SchemaField(
            description="Status of session initialization"
        )
        error: str = SchemaField(
            description="Error message if session start failed",
            default="",
        )

    def __init__(self):
        super().__init__(
            id="b2c3d4e5-f6a7-8901-bcde-f12345678901",
            description="Start a coding session for a long-running agent project, providing context and the next feature to work on",
            input_schema=LongRunningCodingBlock.Input,
            output_schema=LongRunningCodingBlock.Output,
            categories={BlockCategory.AGENT},
            block_type=BlockType.AGENT,
        )

    @staticmethod
    def _feature_to_dict(feature) -> dict[str, Any]:
        """Flatten a feature item into the plain dict emitted as `current_feature`."""
        return {
            "id": feature.id,
            "category": feature.category.value,
            "description": feature.description,
            "steps": feature.steps,
            "priority": feature.priority,
            "dependencies": feature.dependencies,
            "notes": feature.notes,
        }

    async def run(
        self,
        input_data: Input,
        **kwargs,
    ) -> BlockOutput:
        """
        Open a new coding session and emit the context the agent needs.

        Yields `error` followed by `status="failed"` when the project has no
        session state, is still initializing, or has no feature list. All of
        these preconditions are checked BEFORE any session state is changed,
        so a rejected start does not leave the project marked as WORKING.

        On success yields the session id, project metadata, the selected
        feature (empty dict when none is selectable), the feature summary,
        recent progress entries, recent commits, the init script output and a
        final `status` of "success" or "complete".
        """
        # Short random id; only the first 8 hex characters of a UUID4 are kept.
        session_id = str(uuid.uuid4())[:8]
        try:
            manager = SessionManager(input_data.working_directory)

            # --- Preconditions (read-only) ---------------------------------
            state = manager.load_session_state()
            if not state:
                yield "error", f"No session state found at {input_data.working_directory}. Run initializer first."
                yield "status", "failed"
                return
            if state.status == SessionStatus.INITIALIZING:
                yield "error", "Project initialization not complete. Run initializer first."
                yield "status", "failed"
                return

            # Validate the feature list before mutating anything: previously
            # this check ran after the status update / session log start, so a
            # missing feature list left a half-opened session behind.
            feature_list = manager.load_feature_list()
            if not feature_list:
                yield "error", "No feature list found. Run initializer first."
                yield "status", "failed"
                return

            # --- Open the session ------------------------------------------
            manager.update_session_status(SessionStatus.WORKING, session_id)
            manager.start_session_log(session_id)

            # Optional smoke check: run the init script and record a failure
            # in the progress log. A failing script does not abort the session.
            init_output = ""
            if input_data.verify_basic_functionality:
                success, output = manager.run_init_script()
                init_output = output
                if not success:
                    logger.warning("Init script failed: %s", output)
                    manager.add_progress_entry(
                        ProgressEntry(
                            id=str(uuid.uuid4()),
                            session_id=session_id,
                            entry_type=ProgressEntryType.ERROR,
                            title="Init script failed",
                            description=output[:500],
                        )
                    )

            # --- Select the feature for this session -----------------------
            next_feature = feature_list.get_next_feature()
            current_feature_dict: dict[str, Any] = {}
            if next_feature:
                manager.update_feature_status(
                    next_feature.id,
                    FeatureStatus.IN_PROGRESS,
                    session_id,
                )
                current_feature_dict = self._feature_to_dict(next_feature)
                manager.add_progress_entry(
                    ProgressEntry(
                        id=str(uuid.uuid4()),
                        session_id=session_id,
                        entry_type=ProgressEntryType.FEATURE_START,
                        title=f"Started working on: {next_feature.description[:50]}",
                        feature_id=next_feature.id,
                    )
                )

            # --- Gather context --------------------------------------------
            # NOTE(review): the summary is computed from the list loaded above,
            # i.e. before the IN_PROGRESS update was sent to the manager.
            feature_summary = feature_list.get_progress_summary()
            progress_log = manager.load_progress_log()
            recent_progress = [
                entry.model_dump(mode="json")
                for entry in progress_log.get_recent_entries(
                    input_data.max_context_entries
                )
            ]
            recent_commits = manager.get_git_log(10)
            is_complete = feature_list.is_complete()

            yield "session_id", session_id
            yield "project_name", state.project_name
            yield "project_description", state.project_description
            yield "current_feature", current_feature_dict
            yield "feature_summary", feature_summary
            yield "recent_progress", recent_progress
            yield "recent_commits", recent_commits
            yield "init_script_output", init_output
            yield "is_project_complete", is_complete
            yield "status", "success" if not is_complete else "complete"
        except Exception as e:
            # Block boundary: report any failure through the outputs instead
            # of letting it propagate.
            logger.exception("Failed to start coding session: %s", e)
            yield "error", str(e)
            yield "status", "failed"
class FeatureCompleteBlock(Block):
    """
    Mark a feature as complete and commit changes.

    This block should be used after successfully implementing and testing a feature.
    It updates the feature status, creates a git commit, and logs progress.
    """

    class Input(Block.Input):
        working_directory: str = SchemaField(
            description="Directory where the project is located"
        )
        session_id: str = SchemaField(
            description="Current session ID"
        )
        feature_id: str = SchemaField(
            description="ID of the feature to mark as complete"
        )
        commit_message: str = SchemaField(
            description="Git commit message describing the changes"
        )
        test_results: str = SchemaField(
            default="",
            description="Description of how the feature was tested",
        )
        files_changed: list[str] = SchemaField(
            default=[],
            description="List of files that were modified",
        )

    class Output(Block.Output):
        success: bool = SchemaField(
            description="Whether the feature was marked complete successfully"
        )
        commit_hash: str = SchemaField(
            description="Git commit hash if commit was successful",
            default="",
        )
        next_feature: dict = SchemaField(
            description="The next feature to work on (if any)"
        )
        remaining_features: int = SchemaField(
            description="Number of features still pending"
        )
        error: str = SchemaField(
            description="Error message if operation failed",
            default="",
        )

    def __init__(self):
        super().__init__(
            id="c3d4e5f6-a7b8-9012-cdef-123456789012",
            description="Mark a feature as complete, commit changes, and get the next feature to work on",
            input_schema=FeatureCompleteBlock.Input,
            output_schema=FeatureCompleteBlock.Output,
            categories={BlockCategory.AGENT},
            block_type=BlockType.STANDARD,
        )

    async def run(
        self,
        input_data: Input,
        **kwargs,
    ) -> BlockOutput:
        """
        Mark `feature_id` as PASSING, commit, log the completion and report
        what to do next.

        Yields `error` + `success=False` if the status update is rejected or
        any step raises. Otherwise yields `success=True`, the commit hash
        (empty string when the commit call returned nothing), the next
        feature as a dict (empty when there is none) and the count of
        pending + failing features.
        """
        try:
            manager = SessionManager(input_data.working_directory)

            # Status is updated before the commit is created.
            success = manager.update_feature_status(
                input_data.feature_id,
                FeatureStatus.PASSING,
                input_data.session_id,
                notes=f"Tested: {input_data.test_results}" if input_data.test_results else None,
            )
            if not success:
                yield "error", f"Failed to update feature status for {input_data.feature_id}"
                yield "success", False
                return

            # An empty files_changed list is passed as None.
            # NOTE(review): presumably None means "commit everything" —
            # confirm against SessionManager.git_commit.
            commit_hash = manager.git_commit(
                input_data.commit_message,
                input_data.files_changed if input_data.files_changed else None,
            )

            manager.add_progress_entry(
                ProgressEntry(
                    id=str(uuid.uuid4()),
                    session_id=input_data.session_id,
                    entry_type=ProgressEntryType.FEATURE_COMPLETE,
                    title=f"Completed feature: {input_data.feature_id}",
                    description=input_data.test_results,
                    feature_id=input_data.feature_id,
                    git_commit_hash=commit_hash,
                    files_changed=input_data.files_changed,
                )
            )

            # Re-load the list so the selection reflects the update above.
            feature_list = manager.load_feature_list()
            next_feature = feature_list.get_next_feature() if feature_list else None
            next_feature_dict: dict[str, Any] = {}
            if next_feature:
                # Same shape as LongRunningCodingBlock's `current_feature`:
                # `dependencies` and `notes` were previously omitted here,
                # which hid the recorded failure reason of a failing feature.
                next_feature_dict = {
                    "id": next_feature.id,
                    "category": next_feature.category.value,
                    "description": next_feature.description,
                    "steps": next_feature.steps,
                    "priority": next_feature.priority,
                    "dependencies": next_feature.dependencies,
                    "notes": next_feature.notes,
                }

            # Only pending and failing features are counted as remaining.
            remaining = 0
            if feature_list:
                summary = feature_list.get_progress_summary()
                remaining = summary.get("pending", 0) + summary.get("failing", 0)

            yield "success", True
            yield "commit_hash", commit_hash or ""
            yield "next_feature", next_feature_dict
            yield "remaining_features", remaining
        except Exception as e:
            # Block boundary: surface the failure through the outputs.
            logger.exception("Failed to complete feature: %s", e)
            yield "error", str(e)
            yield "success", False
class FeatureFailedBlock(Block):
    """
    Mark a feature as failed and log the issue.

    Use this block when a feature cannot be completed due to blockers,
    bugs, or other issues. It preserves the work done and notes for the next session.
    """

    class Input(Block.Input):
        working_directory: str = SchemaField(
            description="Directory where the project is located"
        )
        session_id: str = SchemaField(
            description="Current session ID"
        )
        feature_id: str = SchemaField(
            description="ID of the feature that failed"
        )
        failure_reason: str = SchemaField(
            description="Description of why the feature failed"
        )
        commit_partial_work: bool = SchemaField(
            default=True,
            description="Whether to commit any partial work done",
        )
        commit_message: str = SchemaField(
            default="",
            description="Git commit message if committing partial work (a default WIP message is used when empty)",
        )

    class Output(Block.Output):
        success: bool = SchemaField(
            description="Whether the failure was logged successfully"
        )
        commit_hash: str = SchemaField(
            description="Git commit hash if partial work was committed",
            default="",
        )
        error: str = SchemaField(
            description="Error message if operation failed",
            default="",
        )

    def __init__(self):
        super().__init__(
            id="d4e5f6a7-b8c9-0123-def0-123456789abc",
            description="Mark a feature as failed, log the issue, and optionally commit partial work",
            input_schema=FeatureFailedBlock.Input,
            output_schema=FeatureFailedBlock.Output,
            categories={BlockCategory.AGENT},
            block_type=BlockType.STANDARD,
        )

    async def run(
        self,
        input_data: Input,
        **kwargs,
    ) -> BlockOutput:
        """
        Mark `feature_id` as FAILING, optionally commit partial work, and
        record the failure in the progress log.

        Yields `error` + `success=False` if the status update is rejected or
        any step raises; otherwise `success=True` and the commit hash (empty
        string when nothing was committed).
        """
        try:
            manager = SessionManager(input_data.working_directory)

            success = manager.update_feature_status(
                input_data.feature_id,
                FeatureStatus.FAILING,
                input_data.session_id,
                notes=f"Failed: {input_data.failure_reason}",
            )
            if not success:
                yield "error", f"Failed to update feature status for {input_data.feature_id}"
                yield "success", False
                return

            # `commit_partial_work` defaults to True while `commit_message`
            # defaults to "". Requiring a non-empty message meant the default
            # configuration never committed anything, so fall back to a
            # generated message instead of silently skipping the commit.
            commit_hash = ""
            if input_data.commit_partial_work:
                message = (
                    input_data.commit_message
                    or f"WIP: partial work on {input_data.feature_id} (feature marked failing)"
                )
                commit_hash = manager.git_commit(message) or ""

            manager.add_progress_entry(
                ProgressEntry(
                    id=str(uuid.uuid4()),
                    session_id=input_data.session_id,
                    entry_type=ProgressEntryType.FEATURE_FAILED,
                    title=f"Feature failed: {input_data.feature_id}",
                    description=input_data.failure_reason,
                    feature_id=input_data.feature_id,
                    # Store None rather than "" when no commit was made.
                    git_commit_hash=commit_hash if commit_hash else None,
                )
            )

            yield "success", True
            yield "commit_hash", commit_hash
        except Exception as e:
            # Block boundary: surface the failure through the outputs.
            logger.exception("Failed to mark feature as failed: %s", e)
            yield "error", str(e)
            yield "success", False
class SessionEndBlock(Block):
    """
    Close out a coding session.

    Intended as the last block of every coding session. It:
    - optionally commits whatever is still uncommitted
    - writes the session summary to the progress log
    - sets the session status to PAUSED
    """

    class Input(Block.Input):
        working_directory: str = SchemaField(
            description="Directory where the project is located"
        )
        session_id: str = SchemaField(description="Current session ID")
        summary: str = SchemaField(
            description="Summary of what was accomplished in this session"
        )
        commit_remaining: bool = SchemaField(
            description="Whether to commit any uncommitted changes",
            default=True,
        )

    class Output(Block.Output):
        success: bool = SchemaField(
            description="Whether the session was ended successfully"
        )
        final_commit_hash: str = SchemaField(
            default="",
            description="Hash of any final commit made",
        )
        project_status: dict = SchemaField(
            description="Final project status after this session"
        )
        error: str = SchemaField(
            default="",
            description="Error message if operation failed",
        )

    def __init__(self):
        super().__init__(
            id="e5f6a7b8-c9d0-1234-ef01-23456789abcd",
            description="End a coding session with a proper summary and commit any remaining changes",
            input_schema=SessionEndBlock.Input,
            output_schema=SessionEndBlock.Output,
            categories={BlockCategory.AGENT},
            block_type=BlockType.STANDARD,
        )

    async def run(
        self,
        input_data: Input,
        **kwargs,
    ) -> BlockOutput:
        """
        End the session identified by `input_data.session_id`.

        Yields `success=True`, the final commit hash (empty string when no
        commit was made) and the project status; on any exception yields
        `error` and `success=False` instead.
        """
        try:
            session_manager = SessionManager(input_data.working_directory)

            # The commit subject carries at most the first 50 characters of
            # the summary; the commit is created before the log/status update.
            final_hash = ""
            if input_data.commit_remaining:
                subject = f"Session {input_data.session_id}: {input_data.summary[:50]}"
                final_hash = session_manager.git_commit(subject) or ""

            session_manager.end_session_log(input_data.session_id, input_data.summary)
            session_manager.update_session_status(SessionStatus.PAUSED)

            status_snapshot = session_manager.get_project_status()

            yield "success", True
            yield "final_commit_hash", final_hash
            yield "project_status", status_snapshot
        except Exception as exc:
            logger.exception(f"Failed to end session: {exc}")
            yield "error", str(exc)
            yield "success", False

View File

@@ -0,0 +1,324 @@
"""
Long-Running Initializer Block.
This block sets up the initial environment for a long-running agent project.
It is run ONCE at the start of a new project and creates:
- Feature list file with all features marked as pending
- Progress log file
- Init script for environment setup
- Initial git commit
Based on Anthropic's "Effective Harnesses for Long-Running Agents" research.
"""
import json
import logging
import uuid
from typing import Optional
from pydantic import SecretStr
from backend.blocks.llm import AIStructuredResponseGeneratorBlock, LlmModel
from backend.data.block import Block, BlockCategory, BlockOutput, BlockType
from backend.data.model import (
APIKeyCredentials,
CredentialsField,
CredentialsMetaInput,
SchemaField,
)
from backend.integrations.providers import ProviderName
from .models import (
FeatureCategory,
FeatureListItem,
FeatureStatus,
InitializerConfig,
ProgressEntry,
ProgressEntryType,
SessionStatus,
)
from .session_manager import SessionManager
logger = logging.getLogger(__name__)
class LongRunningInitializerBlock(Block):
    """
    Initialize a long-running agent project.

    This block sets up the environment for a project that will span
    multiple agent sessions. It creates:
    - A feature list based on the project specification
    - A progress log for tracking work across sessions
    - An init.sh script for environment setup
    - An initial git repository with the setup

    Use this block ONLY for the first session of a new project.
    """

    # Uses Block.Input like every other block in this framework. The schema
    # previously inherited from CredentialsMetaInput, which is the type of the
    # `credentials` field, not a block input schema.
    class Input(Block.Input):
        project_name: str = SchemaField(
            description="Name of the project to create"
        )
        project_description: str = SchemaField(
            description="Detailed description of what the project should accomplish. "
            "Include all features, requirements, and acceptance criteria."
        )
        working_directory: str = SchemaField(
            description="Directory where the project will be created"
        )
        generate_features_with_ai: bool = SchemaField(
            default=True,
            description="Use AI to generate a comprehensive feature list from the description",
        )
        custom_features: list[dict] = SchemaField(
            default=[],
            description="Custom features to add (each dict should have 'description' and optionally 'category', 'steps', 'priority')",
        )
        init_commands: list[str] = SchemaField(
            default=[],
            description="Shell commands to include in the init.sh script",
        )
        initialize_git: bool = SchemaField(
            default=True,
            description="Whether to initialize a git repository",
        )
        credentials: CredentialsMetaInput[
            ProviderName.OPENAI, ProviderName.ANTHROPIC, ProviderName.GROQ
        ] = CredentialsField(
            description="LLM API credentials for feature generation (if using AI)",
            required=False,
        )

    class Output(Block.Output):
        session_id: str = SchemaField(
            description="Unique identifier for this long-running session"
        )
        feature_count: int = SchemaField(
            description="Number of features generated"
        )
        feature_list_path: str = SchemaField(
            description="Path to the feature list file"
        )
        progress_log_path: str = SchemaField(
            description="Path to the progress log file"
        )
        init_script_path: str = SchemaField(
            description="Path to the init script"
        )
        status: str = SchemaField(
            description="Status of the initialization"
        )
        error: str = SchemaField(
            description="Error message if initialization failed",
            default="",
        )

    def __init__(self):
        super().__init__(
            id="a1b2c3d4-e5f6-7890-abcd-ef1234567890",
            description="Initialize a long-running agent project with feature list, progress tracking, and environment setup",
            input_schema=LongRunningInitializerBlock.Input,
            output_schema=LongRunningInitializerBlock.Output,
            categories={BlockCategory.AGENT},
            block_type=BlockType.AGENT,
        )

    async def run(
        self,
        input_data: Input,
        *,
        credentials: Optional[APIKeyCredentials] = None,
        **kwargs,
    ) -> BlockOutput:
        """
        Create the session state, feature list, init script and (optionally)
        a git repository for a new project.

        Yields `error` + `status="failed"` when:
        - a session state already exists with a status other than INITIALIZING
        - no features could be collected (neither AI-generated nor custom)
        - any step raises

        Otherwise yields the session id, feature count, the three file paths
        and `status="success"`.
        """
        # Short random id; only the first 8 hex characters of a UUID4 are kept.
        session_id = str(uuid.uuid4())[:8]
        try:
            manager = SessionManager(input_data.working_directory)

            # A state still in INITIALIZING is treated as an unfinished
            # earlier attempt and may be re-run.
            existing_state = manager.load_session_state()
            if existing_state and existing_state.status != SessionStatus.INITIALIZING:
                yield "error", f"Project already initialized at {input_data.working_directory}"
                yield "status", "failed"
                return

            manager.create_session_state(
                project_name=input_data.project_name,
                project_description=input_data.project_description,
            )
            manager.start_session_log(session_id)

            # --- Collect features ------------------------------------------
            features: list[dict] = []
            if input_data.generate_features_with_ai:
                if credentials:
                    features.extend(
                        await self._generate_features_with_ai(
                            input_data.project_name,
                            input_data.project_description,
                            credentials,
                        )
                    )
                else:
                    logger.warning(
                        "AI feature generation requested but no credentials were provided; skipping"
                    )

            for custom in input_data.custom_features:
                if "description" in custom:
                    features.append(custom)
                else:
                    logger.warning(
                        "Ignoring custom feature without a 'description': %r", custom
                    )

            # Without features no feature list is written. Marking the project
            # READY anyway would dead-lock it: the coding block rejects a
            # project without a feature list ("Run initializer first"), while
            # this block rejects a project that is no longer INITIALIZING.
            # Fail here, before the status leaves INITIALIZING, so the
            # initializer can simply be run again.
            if not features:
                yield "error", (
                    "No features to track. Provide credentials for AI feature "
                    "generation or at least one custom feature with a 'description'."
                )
                yield "status", "failed"
                return

            manager.create_feature_list(
                project_name=input_data.project_name,
                project_description=input_data.project_description,
                features=features,
            )

            # --- Environment setup -----------------------------------------
            init_commands = input_data.init_commands or [
                "echo 'Project initialized successfully'",
            ]
            manager.create_init_script(
                commands=init_commands,
                description=f"Setup script for {input_data.project_name}",
            )

            if input_data.initialize_git:
                manager.initialize_git()

            manager.update_session_status(SessionStatus.READY, session_id)

            manager.add_progress_entry(
                ProgressEntry(
                    id=str(uuid.uuid4()),
                    session_id=session_id,
                    entry_type=ProgressEntryType.ENVIRONMENT_SETUP,
                    title="Project initialization complete",
                    description=f"Created {len(features)} features, init script, and git repository",
                )
            )

            yield "session_id", session_id
            yield "feature_count", len(features)
            yield "feature_list_path", str(manager.feature_list_path)
            yield "progress_log_path", str(manager.progress_log_path)
            yield "init_script_path", str(manager.init_script_path)
            yield "status", "success"
        except Exception as e:
            # Block boundary: surface the failure through the outputs.
            logger.exception("Failed to initialize long-running project: %s", e)
            yield "error", str(e)
            yield "status", "failed"

    async def _generate_features_with_ai(
        self,
        project_name: str,
        project_description: str,
        credentials: APIKeyCredentials,
    ) -> list[dict]:
        """
        Generate a comprehensive feature list using AI.

        Runs AIStructuredResponseGeneratorBlock and returns the `features`
        entry of its first `response` output (an empty list if that key is
        missing or no `response` is produced). If the call raises, the error
        is logged and two generic placeholder features are returned instead.
        """
        system_prompt = """You are an expert software architect generating a comprehensive feature list for a project.
Your task is to analyze the project description and create a detailed list of features that need to be implemented.
For each feature:
1. Write a clear, testable description of the feature
2. Assign a category: functional, ui, integration, performance, security, documentation, testing, infrastructure
3. Provide verification steps (how to test the feature works)
4. Assign a priority (1=highest, 10=lowest)
Important guidelines:
- Break down complex features into smaller, independently testable features
- Each feature should be completable in a single session
- Include edge cases and error handling as separate features
- Be comprehensive - include features that might be implicit in the description
- Think about the user journey end-to-end"""
        # The requested shape matches what is parsed below (a top-level
        # "features" key). The prompt previously asked for a bare JSON array,
        # contradicting both `expected_format` and the `.get("features")` read.
        user_prompt = f"""Project Name: {project_name}
Project Description:
{project_description}
Generate a comprehensive list of features for this project. Output as a JSON object with a "features" array using the following structure:
{{
"features": [
{{
"id": "feature_001",
"category": "functional",
"description": "Clear description of what the feature does",
"steps": ["Step 1 to verify", "Step 2 to verify"],
"priority": 1
}}
]
}}
Generate at least 20 features covering all aspects of the project."""
        try:
            # Pick a model for the credential's provider; anything that is
            # neither anthropic nor openai falls through to the Llama model.
            if credentials.provider == "anthropic":
                model = LlmModel.CLAUDE_3_5_SONNET
            elif credentials.provider == "openai":
                model = LlmModel.GPT4O
            else:
                model = LlmModel.LLAMA3_70B

            structured_block = AIStructuredResponseGeneratorBlock()
            # NOTE(review): confirm that `expected_format` accepts nested
            # values; if the block's schema rejects them, construction raises
            # and the placeholder features below are always returned.
            structured_input = AIStructuredResponseGeneratorBlock.Input(
                prompt=user_prompt,
                sys_prompt=system_prompt,
                expected_format={
                    "features": [
                        {
                            "id": "string",
                            "category": "string",
                            "description": "string",
                            "steps": ["string"],
                            "priority": 1,
                        }
                    ]
                },
                model=model,
                credentials={
                    "provider": credentials.provider,
                    "id": credentials.id,
                    "type": credentials.type,
                    "title": credentials.title,
                },
                max_tokens=4000,
                retry=3,
            )

            # Only the first "response" output is used.
            async for output_name, output_data in structured_block.run(
                structured_input, credentials=credentials
            ):
                if output_name == "response":
                    return output_data.get("features", [])
            return []
        except Exception as e:
            logger.error("Failed to generate features with AI: %s", e)
            # Deliberate best-effort: fall back to generic placeholder
            # features so initialization can still proceed.
            return [
                {
                    "id": "feature_001",
                    "category": "functional",
                    "description": "Basic project setup and structure",
                    "steps": ["Verify project files exist", "Verify dependencies installed"],
                    "priority": 1,
                },
                {
                    "id": "feature_002",
                    "category": "functional",
                    "description": "Core functionality as described in project spec",
                    "steps": ["Verify main features work end-to-end"],
                    "priority": 2,
                },
            ]

View File

@@ -0,0 +1,325 @@
"""
Data models for long-running agent framework.
Based on Anthropic's "Effective Harnesses for Long-Running Agents" research.
https://www.anthropic.com/engineering/effective-harnesses-for-long-running-agents
The core concepts:
1. Feature List - A comprehensive list of features to be implemented, each with status
2. Progress Log - A log of what each agent session has accomplished
3. Session State - The overall state of a long-running agent project
"""
from datetime import datetime
from enum import Enum
from typing import Any, Optional
from pydantic import BaseModel, Field
class FeatureCategory(str, Enum):
    """Closed set of feature categories; each member's value is its lowercase name."""

    # Product-facing behaviour
    FUNCTIONAL = "functional"
    UI = "ui"
    INTEGRATION = "integration"
    # Non-functional qualities
    PERFORMANCE = "performance"
    SECURITY = "security"
    # Supporting work
    DOCUMENTATION = "documentation"
    TESTING = "testing"
    INFRASTRUCTURE = "infrastructure"
class FeatureStatus(str, Enum):
    """Lifecycle states a feature can be in; values are the serialized strings."""

    PENDING = "pending"  # initial state (default on FeatureListItem)
    IN_PROGRESS = "in_progress"
    PASSING = "passing"  # the only state counted by FeatureList.is_complete()
    FAILING = "failing"
    BLOCKED = "blocked"
    SKIPPED = "skipped"
class FeatureListItem(BaseModel):
    """
    One entry of the project's feature list.

    A feature is an end-to-end, testable description of functionality; its
    `steps` spell out how to verify it.
    """

    # --- What the feature is ---
    id: str = Field(description="Unique identifier for the feature")
    category: FeatureCategory = Field(
        description="Category of the feature",
        default=FeatureCategory.FUNCTIONAL,
    )
    description: str = Field(description="Human-readable description of the feature")
    steps: list[str] = Field(
        description="Verification steps to test the feature",
        default_factory=list,
    )

    # --- Scheduling ---
    status: FeatureStatus = Field(
        description="Current status of the feature",
        default=FeatureStatus.PENDING,
    )
    # Validated to the inclusive range 1..10.
    priority: int = Field(
        description="Priority level (1=highest, 10=lowest)",
        default=5,
        ge=1,
        le=10,
    )
    dependencies: list[str] = Field(
        description="IDs of features this depends on",
        default_factory=list,
    )

    # --- Bookkeeping ---
    notes: str = Field(
        description="Additional notes about the feature or blockers",
        default="",
    )
    # Defaults to the (naive) UTC time at construction.
    last_updated: datetime = Field(
        description="When the feature was last updated",
        default_factory=datetime.utcnow,
    )
    updated_by_session: Optional[str] = Field(
        description="Session ID that last updated this feature",
        default=None,
    )
class FeatureList(BaseModel):
    """
    The complete feature list for a long-running agent project.

    This file is written by the initializer agent and read by coding agents.
    Coding agents should ONLY modify the status field and notes - never remove features.
    """

    project_name: str = Field(description="Name of the project")
    project_description: str = Field(description="Description of the project goal")
    created_at: datetime = Field(
        default_factory=datetime.utcnow, description="When the feature list was created"
    )
    features: list[FeatureListItem] = Field(
        default_factory=list, description="List of all features"
    )

    def get_next_feature(self) -> Optional[FeatureListItem]:
        """
        Return the feature that should be worked on next, or None.

        Selection order:
        1. IN_PROGRESS features - work left unfinished by an earlier session.
           These were previously never selectable, so a feature whose session
           ended without marking it passing/failing stayed stuck forever.
        2. FAILING features
        3. PENDING features

        Within the same status the lowest `priority` number wins (1 is
        highest); remaining ties keep list order. PASSING, BLOCKED and
        SKIPPED features are never returned.
        """
        status_rank = {
            FeatureStatus.IN_PROGRESS: 0,
            FeatureStatus.FAILING: 1,
            FeatureStatus.PENDING: 2,
        }
        # min() returns the first minimal item, matching a stable sort's [0].
        return min(
            (f for f in self.features if f.status in status_rank),
            key=lambda f: (status_rank[f.status], f.priority),
            default=None,
        )

    def get_feature_by_id(self, feature_id: str) -> Optional[FeatureListItem]:
        """Return the first feature whose id equals `feature_id`, or None."""
        return next((f for f in self.features if f.id == feature_id), None)

    def get_progress_summary(self) -> dict[str, int]:
        """
        Count features per status.

        The result has one key per FeatureStatus value (zero when unused).
        """
        summary: dict[str, int] = {status.value: 0 for status in FeatureStatus}
        for f in self.features:
            summary[f.status.value] += 1
        return summary

    def is_complete(self) -> bool:
        """
        Check if all features are passing.

        Note: an empty feature list is reported as complete, because `all()`
        of an empty iterable is True.
        """
        return all(f.status == FeatureStatus.PASSING for f in self.features)
class ProgressEntryType(str, Enum):
    """Kinds of entries that can appear in the progress log."""

    # Session lifecycle
    SESSION_START = "session_start"
    SESSION_END = "session_end"
    # Feature lifecycle
    FEATURE_START = "feature_start"
    FEATURE_COMPLETE = "feature_complete"
    FEATURE_FAILED = "feature_failed"
    # Work performed
    CODE_CHANGE = "code_change"
    TEST_RUN = "test_run"
    BUG_FIX = "bug_fix"
    ENVIRONMENT_SETUP = "environment_setup"
    GIT_COMMIT = "git_commit"
    # Free-form
    NOTE = "note"
    ERROR = "error"
class ProgressEntry(BaseModel):
    """
    One record in the progress log.

    Sessions append entries while they work and a summary when they finish,
    so that the next session can see what was done and why.
    """

    # --- Identity ---
    id: str = Field(description="Unique identifier for the entry")
    session_id: str = Field(description="The session that created this entry")
    entry_type: ProgressEntryType = Field(description="Type of progress entry")
    # Defaults to the (naive) UTC time at construction.
    timestamp: datetime = Field(
        description="When this entry was created",
        default_factory=datetime.utcnow,
    )

    # --- Content ---
    title: str = Field(description="Short title for the entry")
    description: str = Field(description="Detailed description", default="")

    # --- Optional cross-references ---
    feature_id: Optional[str] = Field(
        description="Related feature ID, if applicable",
        default=None,
    )
    git_commit_hash: Optional[str] = Field(
        description="Related git commit, if applicable",
        default=None,
    )
    files_changed: list[str] = Field(
        description="List of files that were changed",
        default_factory=list,
    )
    metadata: dict[str, Any] = Field(
        description="Additional metadata",
        default_factory=dict,
    )
class ProgressLog(BaseModel):
    """
    Full progress log of a long-running agent project.

    Agent sessions keep appending to `entries` to record what they did.
    """

    project_name: str = Field(description="Name of the project")
    entries: list[ProgressEntry] = Field(
        description="All progress entries",
        default_factory=list,
    )

    def get_recent_entries(self, limit: int = 20) -> list[ProgressEntry]:
        """Return up to `limit` entries, newest timestamp first."""
        newest_first = sorted(
            self.entries,
            key=lambda entry: entry.timestamp,
            reverse=True,
        )
        return newest_first[:limit]

    def get_session_entries(self, session_id: str) -> list[ProgressEntry]:
        """Return the entries created by `session_id`, in log order."""
        return [entry for entry in self.entries if entry.session_id == session_id]

    def get_feature_entries(self, feature_id: str) -> list[ProgressEntry]:
        """Return the entries that reference `feature_id`, in log order."""
        return [entry for entry in self.entries if entry.feature_id == feature_id]
class SessionStatus(str, Enum):
    """States of a long-running agent project as tracked in its session state."""

    INITIALIZING = "initializing"  # default for a new LongRunningSessionState
    READY = "ready"
    WORKING = "working"
    PAUSED = "paused"
    COMPLETED = "completed"
    FAILED = "failed"
class LongRunningSessionState(BaseModel):
    """
    Top-level state of a long-running agent project.

    This is the object that follows the project from one session to the next.
    """

    # --- Identity ---
    id: str = Field(description="Unique identifier for the session state")
    project_name: str = Field(description="Name of the project")
    project_description: str = Field(description="Description of the project goal")

    # --- Lifecycle ---
    status: SessionStatus = Field(
        description="Current status",
        default=SessionStatus.INITIALIZING,
    )
    # Both timestamps default to the (naive) UTC time at construction.
    created_at: datetime = Field(
        description="When the project was created",
        default_factory=datetime.utcnow,
    )
    last_updated: datetime = Field(
        description="When the state was last updated",
        default_factory=datetime.utcnow,
    )
    current_session_id: Optional[str] = Field(
        description="ID of the currently active session",
        default=None,
    )
    session_count: int = Field(description="Number of sessions run", default=0)

    # --- Locations ---
    working_directory: str = Field(description="The working directory for the project")
    init_script_path: Optional[str] = Field(
        description="Path to the init.sh script",
        default=None,
    )
    feature_list_path: Optional[str] = Field(
        description="Path to the feature list JSON file",
        default=None,
    )
    progress_log_path: Optional[str] = Field(
        description="Path to the progress log file",
        default=None,
    )
    git_repo_initialized: bool = Field(
        description="Whether git repo is initialized",
        default=False,
    )

    # --- Current work and extras ---
    current_feature_id: Optional[str] = Field(
        description="The feature currently being worked on",
        default=None,
    )
    environment_variables: dict[str, str] = Field(
        description="Environment variables for the project",
        default_factory=dict,
    )
    metadata: dict[str, Any] = Field(
        description="Additional project metadata",
        default_factory=dict,
    )
class InitializerConfig(BaseModel):
    """
    Configuration for the initializer agent.

    The initializer sets up the project environment on the first run.
    The three boolean switches all default to True, so a config carrying
    only the required name/description/directory enables feature-list
    generation, git initialization and init-script creation.
    """

    project_name: str = Field(description="Name of the project to create")
    project_description: str = Field(
        description="Description of what the project should accomplish"
    )
    working_directory: str = Field(
        description="Directory where the project will be created"
    )
    generate_feature_list: bool = Field(
        default=True, description="Whether to generate a feature list from the spec"
    )
    initialize_git: bool = Field(
        default=True, description="Whether to initialize a git repository"
    )
    create_init_script: bool = Field(
        default=True, description="Whether to create an init.sh script"
    )
    custom_init_commands: list[str] = Field(
        default_factory=list, description="Additional commands to run during init"
    )
    # The lambda builds a fresh list per instance, so the default is never shared.
    feature_categories: list[FeatureCategory] = Field(
        default_factory=lambda: [
            FeatureCategory.FUNCTIONAL,
            FeatureCategory.UI,
            FeatureCategory.INTEGRATION,
        ],
        description="Categories of features to generate",
    )
class CodingAgentConfig(BaseModel):
    """
    Configuration for the coding agent.

    The coding agent works on implementing features incrementally.
    Only ``session_state_path`` is required; the remaining options have
    defaults.
    """

    session_state_path: str = Field(
        description="Path to the session state file to load"
    )
    # Validated to the inclusive range 1..5; defaults to one feature per session.
    max_features_per_session: int = Field(
        default=1,
        ge=1,
        le=5,
        description="Maximum features to work on in a single session",
    )
    auto_commit: bool = Field(
        default=True, description="Whether to auto-commit changes after each feature"
    )
    run_tests_before_marking_complete: bool = Field(
        default=True, description="Whether to run verification tests before completion"
    )
    # Browser-based E2E testing is opt-in (defaults to False).
    use_browser_testing: bool = Field(
        default=False, description="Whether to use browser automation for E2E testing"
    )
    verify_basic_functionality_first: bool = Field(
        default=True,
        description="Whether to verify basic functionality before starting new work",
    )

View File

@@ -0,0 +1,460 @@
"""
Progress Tracking Blocks for Long-Running Agents.
These blocks provide utilities for tracking progress during coding sessions,
including logging notes, code changes, test results, and git commits.
Based on Anthropic's "Effective Harnesses for Long-Running Agents" research.
"""
import logging
import uuid
from typing import Any, Optional
from backend.data.block import Block, BlockCategory, BlockOutput, BlockType
from backend.data.model import SchemaField
from .models import (
FeatureList,
ProgressEntry,
ProgressEntryType,
)
from .session_manager import SessionManager
logger = logging.getLogger(__name__)
class LogProgressBlock(Block):
    """
    Append a single progress entry to a project's progress log.

    Intended for recording notable events, decisions, or milestones while
    features are being implemented.
    """

    class Input(Block.Input):
        working_directory: str = SchemaField(
            description="Directory where the project is located"
        )
        session_id: str = SchemaField(description="Current session ID")
        entry_type: str = SchemaField(
            description="Type of progress entry (code_change, test_run, bug_fix, note, error)",
            default="note",
        )
        title: str = SchemaField(description="Short title for the entry")
        description: str = SchemaField(
            default="",
            description="Detailed description of the progress",
        )
        feature_id: Optional[str] = SchemaField(
            default=None,
            description="Related feature ID (if applicable)",
        )
        files_changed: list[str] = SchemaField(
            default=[],
            description="List of files that were changed",
        )

    class Output(Block.Output):
        success: bool = SchemaField(
            description="Whether the entry was logged successfully"
        )
        entry_id: str = SchemaField(description="ID of the created entry")
        error: str = SchemaField(
            description="Error message if logging failed",
            default="",
        )

    def __init__(self):
        super().__init__(
            id="f6a7b8c9-d0e1-2345-f012-3456789abcde",
            description="Log a progress entry during a coding session",
            input_schema=LogProgressBlock.Input,
            output_schema=LogProgressBlock.Output,
            categories={BlockCategory.AGENT},
            block_type=BlockType.STANDARD,
        )

    async def run(
        self,
        input_data: Input,
        **kwargs,
    ) -> BlockOutput:
        """Write the entry via SessionManager and yield ``success`` / ``entry_id``.

        On any exception the error is logged and ``error`` followed by
        ``success=False`` is yielded instead.
        """
        try:
            session_manager = SessionManager(input_data.working_directory)

            # Recognized entry-type names; any other name is recorded as a note.
            types_by_name = {
                "code_change": ProgressEntryType.CODE_CHANGE,
                "test_run": ProgressEntryType.TEST_RUN,
                "bug_fix": ProgressEntryType.BUG_FIX,
                "note": ProgressEntryType.NOTE,
                "error": ProgressEntryType.ERROR,
                "git_commit": ProgressEntryType.GIT_COMMIT,
            }
            requested = input_data.entry_type.lower()
            if requested in types_by_name:
                resolved_type = types_by_name[requested]
            else:
                resolved_type = ProgressEntryType.NOTE

            new_entry_id = str(uuid.uuid4())
            new_entry = ProgressEntry(
                id=new_entry_id,
                session_id=input_data.session_id,
                entry_type=resolved_type,
                title=input_data.title,
                description=input_data.description,
                feature_id=input_data.feature_id,
                files_changed=input_data.files_changed,
            )
            logged = session_manager.add_progress_entry(new_entry)

            yield "success", logged
            yield "entry_id", new_entry_id
        except Exception as exc:
            logger.exception(f"Failed to log progress: {exc}")
            yield "error", str(exc)
            yield "success", False
class GitCommitBlock(Block):
    """
    Commit project changes to git and record the commit in the progress log.

    Use after making code changes so the project state is preserved.
    """

    class Input(Block.Input):
        working_directory: str = SchemaField(
            description="Directory where the project is located"
        )
        session_id: str = SchemaField(description="Current session ID")
        message: str = SchemaField(description="Git commit message")
        files: list[str] = SchemaField(
            default=[],
            description="Specific files to commit (empty for all changes)",
        )
        feature_id: Optional[str] = SchemaField(
            default=None,
            description="Related feature ID (if applicable)",
        )

    class Output(Block.Output):
        success: bool = SchemaField(description="Whether the commit was successful")
        commit_hash: str = SchemaField(
            description="Hash of the created commit",
            default="",
        )
        error: str = SchemaField(
            description="Error message if commit failed",
            default="",
        )

    def __init__(self):
        super().__init__(
            id="a7b8c9d0-e1f2-3456-0123-456789abcdef",
            description="Create a git commit with proper progress logging",
            input_schema=GitCommitBlock.Input,
            output_schema=GitCommitBlock.Output,
            categories={BlockCategory.AGENT},
            block_type=BlockType.STANDARD,
        )

    async def run(
        self,
        input_data: Input,
        **kwargs,
    ) -> BlockOutput:
        """Create the commit and, if one was produced, log it as a GIT_COMMIT entry.

        Yields ``success``/``commit_hash`` when a commit hash is returned,
        ``success=False``/``error`` when none is, and ``error``/``success=False``
        when an exception is raised.
        """
        try:
            session_manager = SessionManager(input_data.working_directory)

            # An empty file list is passed on as None.
            selected_files = input_data.files or None
            new_hash = session_manager.git_commit(input_data.message, selected_files)

            if not new_hash:
                yield "success", False
                yield "error", "No changes to commit or commit failed"
                return

            # Record the commit in the progress log.
            commit_entry = ProgressEntry(
                id=str(uuid.uuid4()),
                session_id=input_data.session_id,
                entry_type=ProgressEntryType.GIT_COMMIT,
                title=f"Git commit: {input_data.message[:50]}",
                description=input_data.message,
                feature_id=input_data.feature_id,
                git_commit_hash=new_hash,
                files_changed=input_data.files,
            )
            session_manager.add_progress_entry(commit_entry)

            yield "success", True
            yield "commit_hash", new_hash
        except Exception as exc:
            logger.exception(f"Failed to create git commit: {exc}")
            yield "error", str(exc)
            yield "success", False
class GetProjectStatusBlock(Block):
    """
    Get the current status of a long-running project.

    This block provides a comprehensive view of the project state,
    including feature progress, recent commits, and session history.
    """

    class Input(Block.Input):
        working_directory: str = SchemaField(
            description="Directory where the project is located"
        )

    class Output(Block.Output):
        exists: bool = SchemaField(
            description="Whether a project exists at this location"
        )
        project_name: str = SchemaField(
            description="Name of the project",
            default="",
        )
        project_description: str = SchemaField(
            description="Description of the project",
            default="",
        )
        status: str = SchemaField(
            description="Current session status",
            default="",
        )
        feature_summary: dict = SchemaField(
            description="Summary of feature statuses"
        )
        is_complete: bool = SchemaField(
            description="Whether all features are passing"
        )
        session_count: int = SchemaField(
            description="Number of sessions run"
        )
        recent_progress: list[dict] = SchemaField(
            description="Recent progress entries"
        )
        recent_commits: list[dict] = SchemaField(
            description="Recent git commits"
        )
        error: str = SchemaField(
            description="Error message if status check failed",
            default="",
        )

    def __init__(self):
        super().__init__(
            id="b8c9d0e1-f2a3-4567-1234-56789abcdef0",
            description="Get the current status of a long-running project",
            input_schema=GetProjectStatusBlock.Input,
            output_schema=GetProjectStatusBlock.Output,
            categories={BlockCategory.AGENT},
            block_type=BlockType.STANDARD,
        )

    async def run(
        self,
        input_data: Input,
        **kwargs,
    ) -> BlockOutput:
        """Query SessionManager.get_project_status() and yield its fields.

        Missing values are replaced by empty defaults of the declared output
        type. On any exception the error is logged and ``error`` followed by
        ``exists=False`` is yielded.
        """
        try:
            manager = SessionManager(input_data.working_directory)
            status = manager.get_project_status()

            # "session_state" is None when no project exists; an empty dict
            # lets every lookup below fall through to its default.
            session_state = status.get("session_state") or {}

            yield "exists", status.get("exists", False)
            yield "project_name", session_state.get("project_name", "")
            yield "project_description", session_state.get("project_description", "")
            yield "status", session_state.get("status", "")
            # get_project_status() stores an explicit None under
            # "feature_summary" when there is no feature list, so a plain
            # .get(key, {}) would yield None for an output declared as dict.
            yield "feature_summary", status.get("feature_summary") or {}
            yield "is_complete", status.get("is_complete", False)
            yield "session_count", session_state.get("session_count", 0)
            yield "recent_progress", status.get("recent_progress") or []
            yield "recent_commits", status.get("recent_commits") or []
        except Exception as e:
            logger.exception(f"Failed to get project status: {e}")
            yield "error", str(e)
            yield "exists", False
class GetFeatureListBlock(Block):
    """
    Return a project's features together with their current statuses.

    Useful when planning which feature to work on next.
    """

    class Input(Block.Input):
        working_directory: str = SchemaField(
            description="Directory where the project is located"
        )
        filter_status: Optional[str] = SchemaField(
            default=None,
            description="Filter by status (pending, in_progress, passing, failing, blocked, skipped)",
        )

    class Output(Block.Output):
        features: list[dict] = SchemaField(description="List of features")
        total_count: int = SchemaField(description="Total number of features")
        summary: dict = SchemaField(description="Summary of feature statuses")
        error: str = SchemaField(
            description="Error message if retrieval failed",
            default="",
        )

    def __init__(self):
        super().__init__(
            id="c9d0e1f2-a3b4-5678-2345-6789abcdef01",
            description="Get the complete feature list for a project",
            input_schema=GetFeatureListBlock.Input,
            output_schema=GetFeatureListBlock.Output,
            categories={BlockCategory.AGENT},
            block_type=BlockType.STANDARD,
        )

    async def run(
        self,
        input_data: Input,
        **kwargs,
    ) -> BlockOutput:
        """Load the feature list and yield ``features``, ``total_count``, ``summary``.

        ``features`` honors ``filter_status`` (exact match on the status
        value); ``total_count`` and ``summary`` always describe the full,
        unfiltered list.
        """
        try:
            loaded = SessionManager(input_data.working_directory).load_feature_list()

            if not loaded:
                yield "error", "No feature list found"
                yield "features", []
                yield "total_count", 0
                yield "summary", {}
                return

            wanted_status = input_data.filter_status
            serialized = []
            for item in loaded.features:
                # Skip features that do not match an active status filter.
                if wanted_status and item.status.value != wanted_status:
                    continue
                serialized.append(
                    {
                        "id": item.id,
                        "category": item.category.value,
                        "description": item.description,
                        "steps": item.steps,
                        "status": item.status.value,
                        "priority": item.priority,
                        "dependencies": item.dependencies,
                        "notes": item.notes,
                        "last_updated": item.last_updated.isoformat(),
                    }
                )

            yield "features", serialized
            yield "total_count", len(loaded.features)
            yield "summary", loaded.get_progress_summary()
        except Exception as exc:
            logger.exception(f"Failed to get feature list: {exc}")
            yield "error", str(exc)
            yield "features", []
class RevertToCommitBlock(Block):
    """
    Reset the project to a given git commit.

    Meant for recovering from bad changes by going back to a known good
    state.
    """

    class Input(Block.Input):
        working_directory: str = SchemaField(
            description="Directory where the project is located"
        )
        session_id: str = SchemaField(description="Current session ID")
        commit_hash: str = SchemaField(description="Git commit hash to revert to")
        reason: str = SchemaField(description="Reason for reverting")

    class Output(Block.Output):
        success: bool = SchemaField(description="Whether the revert was successful")
        error: str = SchemaField(
            description="Error message if revert failed",
            default="",
        )

    def __init__(self):
        super().__init__(
            id="d0e1f2a3-b4c5-6789-3456-789abcdef012",
            description="Revert the project to a specific git commit",
            input_schema=RevertToCommitBlock.Input,
            output_schema=RevertToCommitBlock.Output,
            categories={BlockCategory.AGENT},
            block_type=BlockType.STANDARD,
        )

    async def run(
        self,
        input_data: Input,
        **kwargs,
    ) -> BlockOutput:
        """Revert via SessionManager and, when it succeeds, log a NOTE entry.

        Yields ``success`` with the revert result, or ``error`` followed by
        ``success=False`` when an exception is raised.
        """
        try:
            session_manager = SessionManager(input_data.working_directory)
            target = input_data.commit_hash
            reverted = session_manager.git_revert_to_commit(target)

            if reverted:
                # Keep a record of the revert in the progress log.
                revert_note = ProgressEntry(
                    id=str(uuid.uuid4()),
                    session_id=input_data.session_id,
                    entry_type=ProgressEntryType.NOTE,
                    title=f"Reverted to commit {input_data.commit_hash[:8]}",
                    description=input_data.reason,
                    git_commit_hash=target,
                )
                session_manager.add_progress_entry(revert_note)

            yield "success", reverted
        except Exception as exc:
            logger.exception(f"Failed to revert: {exc}")
            yield "error", str(exc)
            yield "success", False

View File

@@ -0,0 +1,658 @@
"""
Session manager for long-running agent framework.
This module handles:
- Creating and managing session state files
- Reading and writing feature lists
- Managing progress logs
- Git operations for version control
"""
import json
import logging
import os
import subprocess
import uuid
from datetime import datetime
from pathlib import Path
from typing import Optional
from .models import (
FeatureCategory,
FeatureList,
FeatureListItem,
FeatureStatus,
LongRunningSessionState,
ProgressEntry,
ProgressEntryType,
ProgressLog,
SessionStatus,
)
logger = logging.getLogger(__name__)
# Default file names
SESSION_STATE_FILE = ".autogpt_session_state.json"
FEATURE_LIST_FILE = "feature_list.json"
PROGRESS_LOG_FILE = "autogpt_progress.txt"
INIT_SCRIPT_FILE = "init.sh"
class SessionManager:
    """
    Manages long-running agent session state, features, and progress.

    This class provides file-based persistence for agent sessions,
    following the patterns from Anthropic's long-running agent research.
    All files live directly inside the working directory; git operations
    are run as subprocesses with that directory as their cwd.
    """

    def __init__(self, working_directory: str):
        """
        Initialize the session manager.

        Creates the working directory (including parents) if it is missing.

        Args:
            working_directory: The directory where session files will be stored
        """
        self.working_directory = Path(working_directory)
        self.working_directory.mkdir(parents=True, exist_ok=True)
        # Most recently loaded/saved objects.
        self._session_state: Optional[LongRunningSessionState] = None
        self._feature_list: Optional[FeatureList] = None
        self._progress_log: Optional[ProgressLog] = None

    @property
    def session_state_path(self) -> Path:
        """Path of the session state JSON file."""
        return self.working_directory / SESSION_STATE_FILE

    @property
    def feature_list_path(self) -> Path:
        """Path of the feature list JSON file."""
        return self.working_directory / FEATURE_LIST_FILE

    @property
    def progress_log_path(self) -> Path:
        """Path of the text progress log."""
        return self.working_directory / PROGRESS_LOG_FILE

    @property
    def init_script_path(self) -> Path:
        """Path of the init.sh script."""
        return self.working_directory / INIT_SCRIPT_FILE

    # === Session State Management ===

    def load_session_state(self) -> Optional[LongRunningSessionState]:
        """Load session state from file; return None if missing or unreadable."""
        if self.session_state_path.exists():
            try:
                with open(self.session_state_path) as f:
                    data = json.load(f)
                self._session_state = LongRunningSessionState.model_validate(data)
                return self._session_state
            except Exception as e:
                logger.error(f"Failed to load session state: {e}")
                return None
        return None

    def save_session_state(self, state: LongRunningSessionState) -> bool:
        """Save session state to file, refreshing ``state.last_updated`` first.

        Returns:
            True on success, False if writing failed (the error is logged).
        """
        try:
            state.last_updated = datetime.utcnow()
            with open(self.session_state_path, "w") as f:
                json.dump(state.model_dump(mode="json"), f, indent=2, default=str)
            self._session_state = state
            return True
        except Exception as e:
            logger.error(f"Failed to save session state: {e}")
            return False

    def create_session_state(
        self,
        project_name: str,
        project_description: str,
    ) -> LongRunningSessionState:
        """Create, persist and return a new INITIALIZING session state."""
        state = LongRunningSessionState(
            id=str(uuid.uuid4()),
            project_name=project_name,
            project_description=project_description,
            status=SessionStatus.INITIALIZING,
            working_directory=str(self.working_directory),
            feature_list_path=str(self.feature_list_path),
            progress_log_path=str(self.progress_log_path),
            init_script_path=str(self.init_script_path),
        )
        self.save_session_state(state)
        return state

    def get_or_create_session_state(
        self, project_name: str, project_description: str
    ) -> LongRunningSessionState:
        """Get existing session state or create a new one."""
        existing = self.load_session_state()
        if existing:
            return existing
        return self.create_session_state(project_name, project_description)

    def update_session_status(
        self, status: SessionStatus, session_id: Optional[str] = None
    ) -> bool:
        """Update the session status.

        ``session_count`` is incremented every time the status is set to
        WORKING. Returns False when no session state could be loaded.
        """
        state = self.load_session_state()
        if not state:
            return False
        state.status = status
        if session_id:
            state.current_session_id = session_id
        if status == SessionStatus.WORKING:
            state.session_count += 1
        return self.save_session_state(state)

    # === Feature List Management ===

    def load_feature_list(self) -> Optional[FeatureList]:
        """Load feature list from file; return None if missing or unreadable."""
        if self.feature_list_path.exists():
            try:
                with open(self.feature_list_path) as f:
                    data = json.load(f)
                self._feature_list = FeatureList.model_validate(data)
                return self._feature_list
            except Exception as e:
                logger.error(f"Failed to load feature list: {e}")
                return None
        return None

    def save_feature_list(self, feature_list: FeatureList) -> bool:
        """Save feature list to file; return True on success."""
        try:
            with open(self.feature_list_path, "w") as f:
                json.dump(feature_list.model_dump(mode="json"), f, indent=2, default=str)
            self._feature_list = feature_list
            return True
        except Exception as e:
            logger.error(f"Failed to save feature list: {e}")
            return False

    def create_feature_list(
        self,
        project_name: str,
        project_description: str,
        features: list[dict],
    ) -> FeatureList:
        """
        Create a new feature list for a project.

        Every feature starts as PENDING. Missing ids are generated as
        ``feature_001``, ``feature_002``, ... from the list position.

        Args:
            project_name: Name of the project
            project_description: Description of the project
            features: List of feature dictionaries with at minimum 'description'

        Raises:
            KeyError: If a feature dictionary has no 'description'.
        """
        feature_items = []
        for i, f in enumerate(features):
            item = FeatureListItem(
                id=f.get("id", f"feature_{i+1:03d}"),
                category=FeatureCategory(
                    f.get("category", FeatureCategory.FUNCTIONAL.value)
                ),
                description=f["description"],
                steps=f.get("steps", []),
                status=FeatureStatus.PENDING,
                priority=f.get("priority", 5),
                dependencies=f.get("dependencies", []),
            )
            feature_items.append(item)
        feature_list = FeatureList(
            project_name=project_name,
            project_description=project_description,
            features=feature_items,
        )
        self.save_feature_list(feature_list)
        return feature_list

    def update_feature_status(
        self,
        feature_id: str,
        status: FeatureStatus,
        session_id: str,
        notes: Optional[str] = None,
    ) -> bool:
        """
        Update the status of a feature.

        IMPORTANT: This is the ONLY way features should be modified.
        Never remove or edit feature descriptions.

        Returns:
            False if the feature list or the feature cannot be found,
            otherwise the result of saving the list.
        """
        feature_list = self.load_feature_list()
        if not feature_list:
            return False
        feature = feature_list.get_feature_by_id(feature_id)
        if not feature:
            logger.error(f"Feature {feature_id} not found")
            return False
        feature.status = status
        feature.last_updated = datetime.utcnow()
        feature.updated_by_session = session_id
        if notes:
            feature.notes = notes
        return self.save_feature_list(feature_list)

    def get_next_feature_to_work_on(self) -> Optional[FeatureListItem]:
        """Return the feature list's next feature, or None without a list."""
        feature_list = self.load_feature_list()
        if not feature_list:
            return None
        return feature_list.get_next_feature()

    # === Progress Log Management ===

    def load_progress_log(self) -> ProgressLog:
        """Load progress log from file or create a new (empty) one."""
        if self.progress_log_path.exists():
            try:
                # Parse the text-based progress log
                entries = self._parse_progress_log_text()
                state = self.load_session_state()
                project_name = state.project_name if state else "Unknown Project"
                self._progress_log = ProgressLog(
                    project_name=project_name, entries=entries
                )
                return self._progress_log
            except Exception as e:
                logger.error(f"Failed to load progress log: {e}")
        # Return empty log if file doesn't exist or parsing failed
        state = self.load_session_state()
        project_name = state.project_name if state else "Unknown Project"
        return ProgressLog(project_name=project_name, entries=[])

    def _parse_progress_log_text(self) -> list[ProgressEntry]:
        """Parse a text-based progress log into structured entries.

        Lines starting with ``[timestamp]`` open a new entry; following
        lines are appended to that entry's description. ``=== Session: id ===``
        headers set the session id for subsequent entries. Every parsed
        entry gets a fresh id and the NOTE type, because the text format
        does not round-trip those fields.
        """
        entries: list[ProgressEntry] = []
        if not self.progress_log_path.exists():
            return entries
        content = self.progress_log_path.read_text()
        current_entry: dict = {}
        current_session: str = "unknown"
        for line in content.split("\n"):
            line = line.strip()
            if not line:
                continue
            # Parse session headers ("=== Session: <id> ===")
            if line.startswith("=== Session:"):
                # Drop the decoration on both sides, then trim, so the id
                # carries no stray trailing space.
                current_session = (
                    line.removeprefix("=== Session:").removesuffix("===").strip()
                )
                continue
            # Banner lines (the "=====" rules and "=== Started: ... ===" /
            # "=== Session <id> Summary ===" headers) are decoration only and
            # must not leak into the previous entry's description.
            if line.startswith("===") and line.endswith("==="):
                continue
            # Parse timestamp and entry
            if line.startswith("[") and "]" in line:
                # Save previous entry if exists
                if current_entry:
                    entries.append(ProgressEntry(**current_entry))
                # Parse new entry
                try:
                    timestamp_str, rest = line.split("]", 1)
                    timestamp_str = timestamp_str.strip("[")
                    timestamp = datetime.fromisoformat(timestamp_str)
                except (ValueError, IndexError):
                    # Not a parseable timestamp: keep the whole line as text.
                    timestamp = datetime.utcnow()
                    rest = line
                current_entry = {
                    "id": str(uuid.uuid4()),
                    "session_id": current_session,
                    "entry_type": ProgressEntryType.NOTE,
                    "timestamp": timestamp,
                    "title": rest.strip()[:100],
                    "description": rest.strip(),
                }
            elif current_entry:
                # Append to current entry's description
                current_entry["description"] += "\n" + line
        # Save last entry
        if current_entry:
            entries.append(ProgressEntry(**current_entry))
        return entries

    def add_progress_entry(self, entry: ProgressEntry) -> bool:
        """Append a new entry to the progress log; return True on success."""
        try:
            # Append to text file in human-readable format
            with open(self.progress_log_path, "a") as f:
                timestamp = entry.timestamp.isoformat()
                f.write(f"\n[{timestamp}] {entry.entry_type.value.upper()}: {entry.title}\n")
                if entry.description and entry.description != entry.title:
                    f.write(f" {entry.description}\n")
                if entry.feature_id:
                    f.write(f" Feature: {entry.feature_id}\n")
                if entry.git_commit_hash:
                    f.write(f" Commit: {entry.git_commit_hash}\n")
                if entry.files_changed:
                    f.write(f" Files: {', '.join(entry.files_changed)}\n")
            return True
        except Exception as e:
            logger.error(f"Failed to add progress entry: {e}")
            return False

    def start_session_log(self, session_id: str) -> bool:
        """Write a session banner and a SESSION_START entry to the progress log."""
        try:
            with open(self.progress_log_path, "a") as f:
                f.write(f"\n{'='*60}\n")
                f.write(f"=== Session: {session_id} ===\n")
                f.write(f"=== Started: {datetime.utcnow().isoformat()} ===\n")
                f.write(f"{'='*60}\n")
            entry = ProgressEntry(
                id=str(uuid.uuid4()),
                session_id=session_id,
                entry_type=ProgressEntryType.SESSION_START,
                title=f"Session {session_id} started",
            )
            return self.add_progress_entry(entry)
        except Exception as e:
            logger.error(f"Failed to start session log: {e}")
            return False

    def end_session_log(self, session_id: str, summary: str) -> bool:
        """Write a SESSION_END entry and a summary banner to the progress log."""
        try:
            entry = ProgressEntry(
                id=str(uuid.uuid4()),
                session_id=session_id,
                entry_type=ProgressEntryType.SESSION_END,
                title=f"Session {session_id} ended",
                description=summary,
            )
            self.add_progress_entry(entry)
            with open(self.progress_log_path, "a") as f:
                f.write(f"\n{'='*60}\n")
                f.write(f"=== Session {session_id} Summary ===\n")
                f.write(f"{summary}\n")
                f.write(f"{'='*60}\n\n")
            return True
        except Exception as e:
            logger.error(f"Failed to end session log: {e}")
            return False

    # === Git Operations ===

    def initialize_git(self) -> bool:
        """Initialize a git repository in the working directory.

        Does nothing (and returns True) when ``.git`` already exists.
        Otherwise runs ``git init``, writes a ``.gitignore`` and attempts an
        initial commit. A failing ``git add``/``git commit`` is logged but
        does not change the return value: the repository itself exists.

        Returns:
            False if ``git init`` fails or an exception is raised, else True.
        """
        try:
            # Check if already initialized
            git_dir = self.working_directory / ".git"
            if git_dir.exists():
                logger.info("Git repository already initialized")
                return True
            # Initialize git
            result = subprocess.run(
                ["git", "init"],
                cwd=self.working_directory,
                capture_output=True,
                text=True,
            )
            if result.returncode != 0:
                logger.error(f"Git init failed: {result.stderr}")
                return False
            # Create .gitignore
            gitignore_path = self.working_directory / ".gitignore"
            gitignore_content = """# AutoGPT Long-Running Agent Files
.autogpt_session_state.json
*.log
*.pyc
__pycache__/
.env
.venv/
node_modules/
.DS_Store
"""
            gitignore_path.write_text(gitignore_content)
            # Initial commit. Failures (e.g. no git identity configured) are
            # reported instead of being silently discarded.
            add_result = subprocess.run(
                ["git", "add", "."],
                cwd=self.working_directory,
                capture_output=True,
                text=True,
            )
            if add_result.returncode != 0:
                logger.warning(
                    f"Git add failed during initialization: {add_result.stderr}"
                )
            commit_result = subprocess.run(
                ["git", "commit", "-m", "Initial commit - AutoGPT long-running agent setup"],
                cwd=self.working_directory,
                capture_output=True,
                text=True,
            )
            if commit_result.returncode != 0:
                logger.warning(
                    "Initial git commit failed: "
                    f"{commit_result.stderr or commit_result.stdout}"
                )
            # Update session state
            state = self.load_session_state()
            if state:
                state.git_repo_initialized = True
                self.save_session_state(state)
            return True
        except Exception as e:
            logger.error(f"Failed to initialize git: {e}")
            return False

    def git_commit(self, message: str, files: Optional[list[str]] = None) -> Optional[str]:
        """
        Create a git commit with the given message.

        Args:
            message: Commit message
            files: Optional list of specific files to commit (defaults to all changes)

        Returns:
            The commit hash if successful, None otherwise
        """
        try:
            # Add files
            if files:
                for f in files:
                    # "--" ends option parsing, so a file name that starts
                    # with "-" is treated as a path, not as a git option.
                    add_result = subprocess.run(
                        ["git", "add", "--", f],
                        cwd=self.working_directory,
                        capture_output=True,
                        text=True,
                    )
                    if add_result.returncode != 0:
                        logger.warning(
                            f"Git add failed for {f}: {add_result.stderr}"
                        )
            else:
                add_result = subprocess.run(
                    ["git", "add", "-A"],
                    cwd=self.working_directory,
                    capture_output=True,
                    text=True,
                )
                if add_result.returncode != 0:
                    logger.warning(f"Git add failed: {add_result.stderr}")
            # Check if there are changes to commit
            status = subprocess.run(
                ["git", "status", "--porcelain"],
                cwd=self.working_directory,
                capture_output=True,
                text=True,
            )
            if not status.stdout.strip():
                logger.info("No changes to commit")
                return None
            # Commit
            result = subprocess.run(
                ["git", "commit", "-m", message],
                cwd=self.working_directory,
                capture_output=True,
                text=True,
            )
            if result.returncode != 0:
                logger.error(f"Git commit failed: {result.stderr}")
                return None
            # Get commit hash
            hash_result = subprocess.run(
                ["git", "rev-parse", "HEAD"],
                cwd=self.working_directory,
                capture_output=True,
                text=True,
            )
            return hash_result.stdout.strip() if hash_result.returncode == 0 else None
        except Exception as e:
            logger.error(f"Failed to create git commit: {e}")
            return None

    def get_git_log(self, limit: int = 10) -> list[dict]:
        """Get recent git commits.

        Args:
            limit: Maximum number of commits to return.

        Returns:
            A list of dicts with ``hash``, ``message``, ``date`` and
            ``author`` keys, newest first; empty when git fails.
        """
        # Fields are separated by the ASCII unit separator (0x1f) rather than
        # "|": commit subjects and author names may contain "|", which would
        # shift every following field.
        field_sep = "\x1f"
        try:
            result = subprocess.run(
                [
                    "git",
                    "log",
                    f"-{limit}",
                    "--pretty=format:%H%x1f%s%x1f%ai%x1f%an",
                ],
                cwd=self.working_directory,
                capture_output=True,
                text=True,
            )
            if result.returncode != 0:
                return []
            commits = []
            for line in result.stdout.strip().split("\n"):
                if not line:
                    continue
                parts = line.split(field_sep)
                if len(parts) >= 4:
                    commits.append({
                        "hash": parts[0],
                        "message": parts[1],
                        "date": parts[2],
                        "author": parts[3],
                    })
            return commits
        except Exception as e:
            logger.error(f"Failed to get git log: {e}")
            return []

    def git_revert_to_commit(self, commit_hash: str) -> bool:
        """Hard-reset the working tree to a specific commit.

        Uncommitted changes are discarded (``git reset --hard``).

        Args:
            commit_hash: The commit to reset to. Empty values and values
                starting with "-" are rejected so they cannot be parsed as
                git options.

        Returns:
            True if git exited successfully, False otherwise.
        """
        try:
            if not commit_hash or commit_hash.startswith("-"):
                logger.error(f"Refusing to revert to invalid commit: {commit_hash!r}")
                return False
            result = subprocess.run(
                ["git", "reset", "--hard", commit_hash],
                cwd=self.working_directory,
                capture_output=True,
                text=True,
            )
            return result.returncode == 0
        except Exception as e:
            logger.error(f"Failed to revert to commit: {e}")
            return False

    # === Init Script Management ===

    def create_init_script(
        self,
        commands: list[str],
        description: str = "",
    ) -> bool:
        """
        Create an init.sh script for environment setup.

        The script aborts on the first failing command (``set -e``) and
        refuses to run outside a directory containing the feature list.

        Args:
            commands: List of shell commands to include
            description: Description of what the script does

        Returns:
            True if the script was written and made executable.
        """
        import shlex

        try:
            # Keep the description on one comment line; an embedded newline
            # would otherwise turn the rest of it into executable script.
            description = " ".join(description.splitlines())
            script_content = f"""#!/bin/bash
# AutoGPT Long-Running Agent Init Script
# {description}
# Generated: {datetime.utcnow().isoformat()}
set -e # Exit on error
echo "=== AutoGPT Environment Setup ==="
# Check if we're in the right directory
if [ ! -f "{FEATURE_LIST_FILE}" ]; then
echo "Error: {FEATURE_LIST_FILE} not found. Are you in the project directory?"
exit 1
fi
"""
            for cmd in commands:
                # Quote the announcement so quotes, "$" or backticks inside
                # the command are printed literally instead of breaking or
                # being expanded by the echo line.
                announcement = shlex.quote(f"Running: {cmd}")
                script_content += f"echo {announcement}\n"
                script_content += f"{cmd}\n\n"
            script_content += """
echo "=== Environment setup complete ==="
"""
            self.init_script_path.write_text(script_content)
            # Make executable
            os.chmod(self.init_script_path, 0o755)
            return True
        except Exception as e:
            logger.error(f"Failed to create init script: {e}")
            return False

    def run_init_script(self) -> tuple[bool, str]:
        """
        Run the init.sh script with bash (5 minute timeout).

        Returns:
            Tuple of (success, output); output is stdout followed by stderr,
            or an error message when the script is missing, times out or
            cannot be started.
        """
        if not self.init_script_path.exists():
            return False, "Init script not found"
        try:
            result = subprocess.run(
                ["bash", str(self.init_script_path)],
                cwd=self.working_directory,
                capture_output=True,
                text=True,
                timeout=300,  # 5 minute timeout
            )
            output = result.stdout + result.stderr
            return result.returncode == 0, output
        except subprocess.TimeoutExpired:
            return False, "Init script timed out"
        except Exception as e:
            return False, str(e)

    # === Status Queries ===

    def get_project_status(self) -> dict:
        """Get a comprehensive status of the project.

        Returns:
            A dict with ``exists``, ``session_state`` (None without a saved
            state), ``feature_summary`` (None without a feature list),
            ``is_complete``, ``recent_progress`` (last 5 entries) and
            ``recent_commits`` (last 5 commits).
        """
        state = self.load_session_state()
        feature_list = self.load_feature_list()
        progress_log = self.load_progress_log()
        git_log = self.get_git_log(5)
        status = {
            "exists": state is not None,
            "session_state": state.model_dump() if state else None,
            "feature_summary": (
                feature_list.get_progress_summary() if feature_list else None
            ),
            "is_complete": feature_list.is_complete() if feature_list else False,
            "recent_progress": [
                e.model_dump() for e in progress_log.get_recent_entries(5)
            ],
            "recent_commits": git_log,
        }
        return status

    def is_initialized(self) -> bool:
        """Return True if a saved state exists and is past INITIALIZING."""
        state = self.load_session_state()
        return state is not None and state.status != SessionStatus.INITIALIZING

View File

@@ -33,6 +33,7 @@ import backend.server.v2.executions.review.routes
import backend.server.v2.library.db
import backend.server.v2.library.model
import backend.server.v2.library.routes
import backend.server.v2.long_running.routes
import backend.server.v2.otto.routes
import backend.server.v2.store.model
import backend.server.v2.store.routes
@@ -283,6 +284,11 @@ app.include_router(
app.include_router(
backend.server.v2.library.routes.router, tags=["v2"], prefix="/api/library"
)
app.include_router(
backend.server.v2.long_running.routes.router,
tags=["v2", "long-running"],
prefix="/api/long-running",
)
app.include_router(
backend.server.v2.otto.routes.router, tags=["v2", "otto"], prefix="/api/otto"
)

View File

@@ -0,0 +1 @@
# Long-running agent session management API

View File

@@ -0,0 +1,431 @@
"""
Database operations for long-running agent sessions.
"""
import logging
from typing import Any, Optional
from prisma import Prisma
from prisma.models import (
LongRunningFeature,
LongRunningProgress,
LongRunningSession,
)
from backend.util.service import get_service_client
from . import model
logger = logging.getLogger(__name__)
async def get_db() -> Prisma:
    """Return the Prisma client exposed by the service client's ``db_client``."""
    service_client = get_service_client()
    return service_client.db_client
# === Session Operations ===
async def create_session(
    user_id: str,
    project_name: str,
    project_description: str,
    working_directory: str,
    features: Optional[list[dict[str, Any]]] = None,
) -> LongRunningSession:
    """Create a new long-running session and, optionally, its features.

    The session is created with status ``INITIALIZING``; each feature is
    created with status ``PENDING``.

    Args:
        user_id: Owner of the session.
        project_name: Name of the project.
        project_description: Description of the project goal.
        working_directory: Directory the project lives in.
        features: Optional feature dictionaries. Missing keys fall back to
            defaults (generated ``feature_NNN`` id, ``FUNCTIONAL`` category,
            empty description/steps/dependencies, priority 5). ``None``
            (the default) creates no features.

    Returns:
        The created session record.
    """
    db = await get_db()
    session = await db.longrunningsession.create(
        data={
            "userId": user_id,
            "projectName": project_name,
            "projectDescription": project_description,
            "workingDirectory": working_directory,
            "status": "INITIALIZING",
        }
    )
    # Create features if provided. The default is None rather than a shared
    # mutable list literal.
    for i, f in enumerate(features or []):
        await db.longrunningfeature.create(
            data={
                "sessionId": session.id,
                "featureId": f.get("feature_id", f"feature_{i+1:03d}"),
                "category": f.get("category", "FUNCTIONAL").upper(),
                "description": f.get("description", ""),
                "steps": f.get("steps", []),
                "priority": f.get("priority", 5),
                "dependencies": f.get("dependencies", []),
                "status": "PENDING",
            }
        )
    return session
async def get_session(session_id: str, user_id: str) -> Optional[LongRunningSession]:
    """Fetch the session with this ID that belongs to ``user_id``, if any."""
    client = await get_db()
    # Filtering on both fields scopes the lookup to the given user.
    ownership_filter = {"id": session_id, "userId": user_id}
    return await client.longrunningsession.find_first(where=ownership_filter)
async def get_session_with_details(
    session_id: str, user_id: str
) -> Optional[LongRunningSession]:
    """Fetch a session owned by ``user_id`` together with its related rows.

    Features come back ordered by priority, then creation time; the progress
    log is limited to the 50 newest entries, newest first.
    """
    client = await get_db()

    # Relations to load alongside the session row.
    feature_include = {"order_by": [{"priority": "asc"}, {"createdAt": "asc"}]}
    progress_include = {"order_by": {"createdAt": "desc"}, "take": 50}

    return await client.longrunningsession.find_first(
        where={"id": session_id, "userId": user_id},
        include={"Features": feature_include, "ProgressLog": progress_include},
    )
async def list_sessions(
    user_id: str,
    status: Optional[model.SessionStatus] = None,
    page: int = 1,
    page_size: int = 20,
) -> tuple[list[LongRunningSession], int]:
    """Return one page of a user's sessions plus the total match count.

    Sessions are ordered by most recently updated first. ``page`` is 1-based.
    """
    client = await get_db()

    filters: dict[str, Any] = {"userId": user_id}
    if status:
        # API enum values are lowercase; the stored values are uppercase.
        filters["status"] = status.value.upper()

    total = await client.longrunningsession.count(where=filters)
    offset = (page - 1) * page_size
    rows = await client.longrunningsession.find_many(
        where=filters,
        order_by={"updatedAt": "desc"},
        skip=offset,
        take=page_size,
    )
    return rows, total
async def update_session(
    session_id: str,
    user_id: str,
    status: Optional[model.SessionStatus] = None,
    current_session_id: Optional[str] = None,
    current_feature_id: Optional[str] = None,
    metadata: Optional[dict[str, Any]] = None,
) -> Optional[LongRunningSession]:
    """Apply a partial update to a session owned by ``user_id``.

    Only arguments that are not ``None`` are written, so a field cannot be
    reset to null through this function.

    Returns:
        ``None`` if the session does not exist for this user; the unchanged
        session if nothing was supplied; otherwise the updated session.
    """
    # Imported locally so this function stays self-contained.
    from prisma import Json

    db = await get_db()
    # Verify ownership
    existing = await get_session(session_id, user_id)
    if not existing:
        return None
    update_data: dict[str, Any] = {}
    if status is not None:
        update_data["status"] = status.value.upper()
    if current_session_id is not None:
        update_data["currentSessionId"] = current_session_id
    if current_feature_id is not None:
        update_data["currentFeatureId"] = current_feature_id
    if metadata is not None:
        # Json columns take a prisma.Json wrapper rather than a raw dict.
        update_data["metadata"] = Json(metadata)
    if not update_data:
        return existing
    # Count a new agent session whenever the caller sets status to WORKING.
    # The atomic increment avoids losing counts when two updates race, which
    # the previous `existing.sessionCount + 1` read-modify-write could do.
    if status == model.SessionStatus.WORKING:
        update_data["sessionCount"] = {"increment": 1}
    return await db.longrunningsession.update(
        where={"id": session_id},
        data=update_data,
    )
async def delete_session(session_id: str, user_id: str) -> bool:
    """Delete a session owned by ``user_id``.

    Returns ``False`` when the session does not exist for this user, ``True``
    once the delete has been issued.
    """
    client = await get_db()
    # Ownership gate: only the owner's lookup can find the row.
    if not await get_session(session_id, user_id):
        return False
    await client.longrunningsession.delete(where={"id": session_id})
    return True
# === Feature Operations ===
async def create_feature(
    session_id: str,
    user_id: str,
    feature_id: str,
    description: str,
    category: model.FeatureCategory = model.FeatureCategory.FUNCTIONAL,
    steps: Optional[list[str]] = None,
    priority: int = 5,
    dependencies: Optional[list[str]] = None,
) -> Optional[LongRunningFeature]:
    """Create a new feature in a session owned by ``user_id``.

    Args:
        session_id: Session the feature belongs to.
        user_id: Must own the session.
        feature_id: Caller-chosen identifier for the feature.
        description: What the feature does.
        category: Feature category; stored upper-cased.
        steps: Verification steps; ``None`` means no steps.
        priority: Ordering key used when listing features (ascending).
        dependencies: Feature IDs this one depends on; ``None`` means none.

    Returns:
        The created feature, or ``None`` if the session is not found for
        this user.
    """
    db = await get_db()
    # Verify session ownership
    session = await get_session(session_id, user_id)
    if not session:
        return None
    return await db.longrunningfeature.create(
        data={
            "sessionId": session_id,
            "featureId": feature_id,
            "category": category.value.upper(),
            "description": description,
            # Fresh lists per call instead of shared mutable default arguments.
            "steps": steps if steps is not None else [],
            "priority": priority,
            "dependencies": dependencies if dependencies is not None else [],
            "status": "PENDING",
        }
    )
async def get_features(
    session_id: str,
    user_id: str,
    status: Optional[model.FeatureStatus] = None,
) -> list[LongRunningFeature]:
    """List a session's features ordered by priority, then creation time.

    Returns an empty list when the session is not found for ``user_id``.
    An optional ``status`` narrows the result to features in that state.
    """
    client = await get_db()

    # Ownership gate.
    if not await get_session(session_id, user_id):
        return []

    filters: dict[str, Any] = {"sessionId": session_id}
    if status:
        filters["status"] = status.value.upper()

    ordering = [{"priority": "asc"}, {"createdAt": "asc"}]
    return await client.longrunningfeature.find_many(where=filters, order_by=ordering)
async def update_feature(
    session_id: str,
    feature_id: str,
    user_id: str,
    status: Optional[model.FeatureStatus] = None,
    notes: Optional[str] = None,
    updated_by_session: Optional[str] = None,
) -> Optional[LongRunningFeature]:
    """Partially update a feature identified by its per-session ``feature_id``.

    Returns ``None`` when either the session (for this user) or the feature is
    missing, the untouched feature when no field was supplied, and the updated
    feature otherwise.
    """
    client = await get_db()

    # Ownership gate.
    if not await get_session(session_id, user_id):
        return None

    target = await client.longrunningfeature.find_first(
        where={"sessionId": session_id, "featureId": feature_id}
    )
    if not target:
        return None

    # Collect candidate column values, then keep only those actually supplied.
    candidates: dict[str, Any] = {
        "status": None if status is None else status.value.upper(),
        "notes": notes,
        "updatedBySession": updated_by_session,
    }
    changes: dict[str, Any] = {
        column: value for column, value in candidates.items() if value is not None
    }
    if not changes:
        return target

    return await client.longrunningfeature.update(where={"id": target.id}, data=changes)
async def get_next_feature(
    session_id: str,
    user_id: str,
) -> Optional[LongRunningFeature]:
    """Pick the next feature to work on.

    FAILING features are considered before PENDING ones; within a status the
    lowest priority number wins, ties broken by creation time. Returns
    ``None`` when the session is not found for ``user_id`` or nothing is left.
    """
    client = await get_db()

    # Ownership gate.
    if not await get_session(session_id, user_id):
        return None

    ordering = [{"priority": "asc"}, {"createdAt": "asc"}]
    # Statuses are tried in order; the first one with a match wins.
    for wanted_status in ("FAILING", "PENDING"):
        candidate = await client.longrunningfeature.find_first(
            where={"sessionId": session_id, "status": wanted_status},
            order_by=ordering,
        )
        if candidate:
            return candidate
    return None
async def get_feature_summary(
    session_id: str,
    user_id: str,
) -> dict[str, int]:
    """Count a session's features per status.

    The result always contains the six known lowercase status keys; features
    with any other status are ignored. An empty dict is returned when the
    session is not found for ``user_id``.
    """
    client = await get_db()

    # Ownership gate.
    if not await get_session(session_id, user_id):
        return {}

    rows = await client.longrunningfeature.find_many(where={"sessionId": session_id})

    known_statuses = (
        "pending",
        "in_progress",
        "passing",
        "failing",
        "blocked",
        "skipped",
    )
    counts: dict[str, int] = dict.fromkeys(known_statuses, 0)
    for row in rows:
        key = row.status.lower()
        if key in counts:
            counts[key] += 1
    return counts
# === Progress Operations ===
async def create_progress_entry(
    session_id: str,
    user_id: str,
    agent_session_id: str,
    entry_type: model.ProgressEntryType,
    title: str,
    description: Optional[str] = None,
    feature_id: Optional[str] = None,
    git_commit_hash: Optional[str] = None,
    files_changed: Optional[list[str]] = None,
    metadata: Optional[dict[str, Any]] = None,
) -> Optional[LongRunningProgress]:
    """Append a progress entry to a session owned by ``user_id``.

    Args:
        session_id: Session the entry belongs to.
        user_id: Must own the session.
        agent_session_id: Identifier of the agent session writing the entry.
        entry_type: Kind of entry; stored upper-cased.
        title: Short title.
        description: Optional longer text.
        feature_id: Optional per-session feature identifier. If it matches no
            feature in the session, the entry is stored without a feature link.
        git_commit_hash: Optional commit hash.
        files_changed: Optional list of changed files; ``None`` means none.
        metadata: Optional free-form data; ``None`` means an empty object.

    Returns:
        The created entry, or ``None`` if the session is not found for this
        user.
    """
    # Imported locally so this function stays self-contained.
    from prisma import Json

    db = await get_db()
    # Verify session ownership
    session = await get_session(session_id, user_id)
    if not session:
        return None
    # Translate the caller-facing feature identifier into the feature row's
    # primary key, which is what the progress row references.
    db_feature_id = None
    if feature_id:
        feature = await db.longrunningfeature.find_first(
            where={
                "sessionId": session_id,
                "featureId": feature_id,
            }
        )
        if feature:
            db_feature_id = feature.id
    return await db.longrunningprogress.create(
        data={
            "sessionId": session_id,
            "agentSessionId": agent_session_id,
            "entryType": entry_type.value.upper(),
            "title": title,
            "description": description,
            "featureId": db_feature_id,
            "gitCommitHash": git_commit_hash,
            # Fresh containers per call instead of shared mutable defaults.
            "filesChanged": files_changed if files_changed is not None else [],
            # Json columns take a prisma.Json wrapper rather than a raw dict.
            "metadata": Json(metadata if metadata is not None else {}),
        }
    )
async def get_progress_entries(
    session_id: str,
    user_id: str,
    limit: int = 50,
    agent_session_id: Optional[str] = None,
    feature_id: Optional[str] = None,
) -> list[LongRunningProgress]:
    """Return the newest progress entries of a session owned by ``user_id``.

    Args:
        session_id: Session to read from.
        user_id: Must own the session.
        limit: Maximum number of entries returned.
        agent_session_id: Optional filter on the writing agent session.
        feature_id: Optional filter on the per-session feature identifier.

    Returns:
        Entries ordered newest first. Empty when the session is not found for
        this user, or when ``feature_id`` is given but matches no feature.
    """
    db = await get_db()
    # Verify session ownership
    session = await get_session(session_id, user_id)
    if not session:
        return []
    where_clause: dict[str, Any] = {"sessionId": session_id}
    if agent_session_id:
        where_clause["agentSessionId"] = agent_session_id
    if feature_id:
        feature = await db.longrunningfeature.find_first(
            where={
                "sessionId": session_id,
                "featureId": feature_id,
            }
        )
        if not feature:
            # An unknown feature cannot have entries. Previously the filter was
            # silently dropped here and every entry of the session came back.
            return []
        where_clause["featureId"] = feature.id
    return await db.longrunningprogress.find_many(
        where=where_clause,
        order_by={"createdAt": "desc"},
        take=limit,
    )

View File

@@ -0,0 +1,198 @@
"""
Pydantic models for long-running agent session API.
"""
from datetime import datetime
from enum import Enum
from typing import Any, Optional
from pydantic import BaseModel, Field
# API-facing session states. Values are lowercase strings; the database layer
# upper-cases them before writing and the routes lower-case stored values to
# rebuild this enum.
class SessionStatus(str, Enum):
    """Status of a long-running agent session."""

    INITIALIZING = "initializing"  # value given to newly created sessions
    READY = "ready"
    WORKING = "working"  # setting this via update_session bumps the session count
    PAUSED = "paused"
    COMPLETED = "completed"
    FAILED = "failed"
# API-facing feature states. Values are lowercase; stored values are the
# upper-cased form. The same six names are the keys of the feature summary.
class FeatureStatus(str, Enum):
    """Status of a feature."""

    PENDING = "pending"  # value given to newly created features
    IN_PROGRESS = "in_progress"
    PASSING = "passing"
    FAILING = "failing"  # picked before PENDING when choosing the next feature
    BLOCKED = "blocked"
    SKIPPED = "skipped"
# API-facing feature categories. Values are lowercase; stored values are the
# upper-cased form.
class FeatureCategory(str, Enum):
    """Category of a feature."""

    FUNCTIONAL = "functional"  # default category for new features
    UI = "ui"
    INTEGRATION = "integration"
    PERFORMANCE = "performance"
    SECURITY = "security"
    DOCUMENTATION = "documentation"
    TESTING = "testing"
    INFRASTRUCTURE = "infrastructure"
# API-facing kinds of progress-log entries. Values are lowercase; stored
# values are the upper-cased form.
class ProgressEntryType(str, Enum):
    """Type of progress entry."""

    # Session lifecycle
    SESSION_START = "session_start"
    SESSION_END = "session_end"
    # Feature lifecycle
    FEATURE_START = "feature_start"
    FEATURE_COMPLETE = "feature_complete"
    FEATURE_FAILED = "feature_failed"
    # Work performed
    CODE_CHANGE = "code_change"
    TEST_RUN = "test_run"
    BUG_FIX = "bug_fix"
    ENVIRONMENT_SETUP = "environment_setup"
    GIT_COMMIT = "git_commit"
    # Free-form
    NOTE = "note"
    ERROR = "error"
# Request models
# Body of POST /sessions.
class CreateSessionRequest(BaseModel):
    """Request to create a new long-running session."""

    project_name: str = Field(description="Name of the project")
    project_description: str = Field(
        description="Detailed description of the project"
    )
    working_directory: str = Field(
        description="Directory where the project will be created"
    )
    # Free-form dicts; the database layer reads the keys feature_id, category,
    # description, steps, priority and dependencies, defaulting any that are
    # missing. The dicts are not validated by this model.
    features: list[dict[str, Any]] = Field(
        default=[],
        description="Optional list of features to initialize with",
    )
# Body of PATCH /sessions/{session_id}. Every field is optional; the database
# layer only writes fields that are not None, so null cannot clear a value.
class UpdateSessionRequest(BaseModel):
    """Request to update a session."""

    status: Optional[SessionStatus] = None
    current_session_id: Optional[str] = None
    current_feature_id: Optional[str] = None
    metadata: Optional[dict[str, Any]] = None
# Body of POST /sessions/{session_id}/features.
class CreateFeatureRequest(BaseModel):
    """Request to create a new feature."""

    feature_id: str = Field(description="Unique identifier for the feature")
    category: FeatureCategory = Field(default=FeatureCategory.FUNCTIONAL)
    description: str = Field(description="Description of the feature")
    steps: list[str] = Field(default=[], description="Verification steps")
    # Validated to the inclusive range 1..10; features are listed in ascending
    # priority order.
    priority: int = Field(default=5, ge=1, le=10)
    dependencies: list[str] = Field(default=[])
# Body of PATCH /sessions/{session_id}/features/{feature_id}. Only fields that
# are not None are written by the database layer.
class UpdateFeatureRequest(BaseModel):
    """Request to update a feature."""

    status: Optional[FeatureStatus] = None
    notes: Optional[str] = None
    updated_by_session: Optional[str] = None
# Body of POST /sessions/{session_id}/progress.
class CreateProgressEntryRequest(BaseModel):
    """Request to create a progress entry."""

    agent_session_id: str = Field(description="Session ID that created this entry")
    entry_type: ProgressEntryType
    title: str = Field(description="Short title")
    description: Optional[str] = None
    # Per-session feature identifier (the value sent as feature_id when the
    # feature was created), not the feature row's database id.
    feature_id: Optional[str] = None
    git_commit_hash: Optional[str] = None
    files_changed: list[str] = Field(default=[])
    metadata: dict[str, Any] = Field(default={})
# Response models
# Serialized feature as returned by the feature and session-detail endpoints.
class FeatureResponse(BaseModel):
    """Response model for a feature."""

    id: str  # database id of the feature row
    feature_id: str  # per-session identifier chosen by the caller
    category: FeatureCategory
    description: str
    steps: list[str]
    status: FeatureStatus
    priority: int
    dependencies: list[str]
    notes: Optional[str]
    updated_by_session: Optional[str]
    created_at: datetime
    updated_at: datetime
# Serialized progress-log entry.
class ProgressEntryResponse(BaseModel):
    """Response model for a progress entry."""

    id: str
    agent_session_id: str
    entry_type: ProgressEntryType
    title: str
    description: Optional[str]
    # NOTE(review): the routes fill this from the stored row's featureId, which
    # the database layer sets to the feature row's id rather than the
    # caller-chosen identifier sent in the request. Confirm this is intended.
    feature_id: Optional[str]
    git_commit_hash: Optional[str]
    files_changed: list[str]
    metadata: dict[str, Any]
    created_at: datetime
# Serialized session without related features or progress entries.
class SessionResponse(BaseModel):
    """Response model for a session."""

    id: str
    project_name: str
    project_description: str
    status: SessionStatus
    current_session_id: Optional[str]
    session_count: int
    working_directory: str
    feature_list_path: Optional[str]
    progress_log_path: Optional[str]
    init_script_path: Optional[str]
    git_repo_initialized: bool
    current_feature_id: Optional[str]
    # The routes substitute {} when the stored value is empty or missing.
    environment_variables: dict[str, Any]
    metadata: dict[str, Any]
    created_at: datetime
    updated_at: datetime
# SessionResponse extended with the related rows returned by
# GET /sessions/{session_id}.
class SessionDetailResponse(SessionResponse):
    """Detailed session response with features and progress."""

    features: list[FeatureResponse]
    recent_progress: list[ProgressEntryResponse]
    # Count of features per lowercase status name.
    feature_summary: dict[str, int]
# One page of sessions plus paging information.
class SessionListResponse(BaseModel):
    """Response model for listing sessions."""

    sessions: list[SessionResponse]
    total: int  # number of sessions matching the filter, across all pages
    page: int
    page_size: int
# Features of one session plus per-status counts.
class FeatureListResponse(BaseModel):
    """Response model for listing features."""

    features: list[FeatureResponse]
    total: int  # the list route sets this to the length of `features`
    # Count of features per lowercase status name; the list route computes it
    # over the whole session, independent of any status filter.
    summary: dict[str, int]

View File

@@ -0,0 +1,652 @@
"""
API routes for long-running agent session management.
Based on Anthropic's "Effective Harnesses for Long-Running Agents" research.
https://www.anthropic.com/engineering/effective-harnesses-for-long-running-agents
"""
import logging
from typing import Optional
import autogpt_libs.auth as autogpt_auth_lib
from fastapi import APIRouter, HTTPException, Query, Security, status
from . import db, model
logger = logging.getLogger(__name__)

# Router for all long-running session endpoints; every route requires an
# authenticated user.
# NOTE(review): the application mounts this router with
# prefix="/api/long-running" and tags=["v2", "long-running"]. Together with the
# prefix and tags declared here, the endpoints appear to resolve to
# /api/long-running/long-running/... and the "long-running" tag is supplied
# twice. Confirm whether the prefix belongs here or at the mount point.
router = APIRouter(
    prefix="/long-running",
    tags=["long-running", "private"],
    dependencies=[Security(autogpt_auth_lib.requires_user)],
)
# === Session Routes ===
@router.post(
    "/sessions",
    summary="Create Long-Running Session",
    response_model=model.SessionResponse,
    responses={
        201: {"description": "Session created successfully"},
        500: {"description": "Server error"},
    },
    status_code=status.HTTP_201_CREATED,
)
async def create_session(
    request: model.CreateSessionRequest,
    user_id: str = Security(autogpt_auth_lib.get_user_id),
) -> model.SessionResponse:
    """
    Create a new long-running agent session.
    This is typically called by the initializer agent when starting
    a new multi-session project.
    """
    try:
        record = await db.create_session(
            user_id=user_id,
            project_name=request.project_name,
            project_description=request.project_description,
            working_directory=request.working_directory,
            features=request.features,
        )
        # Map the camelCase database row onto the snake_case response fields.
        payload = {
            "id": record.id,
            "project_name": record.projectName,
            "project_description": record.projectDescription,
            "status": model.SessionStatus(record.status.lower()),
            "current_session_id": record.currentSessionId,
            "session_count": record.sessionCount,
            "working_directory": record.workingDirectory,
            "feature_list_path": record.featureListPath,
            "progress_log_path": record.progressLogPath,
            "init_script_path": record.initScriptPath,
            "git_repo_initialized": record.gitRepoInitialized,
            "current_feature_id": record.currentFeatureId,
            "environment_variables": record.environmentVariables or {},
            "metadata": record.metadata or {},
            "created_at": record.createdAt,
            "updated_at": record.updatedAt,
        }
        return model.SessionResponse(**payload)
    except Exception as e:
        # Any failure is reported to the client as a 500.
        logger.error(f"Failed to create session: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
@router.get(
    "/sessions",
    summary="List Long-Running Sessions",
    response_model=model.SessionListResponse,
    responses={
        200: {"description": "List of sessions"},
        500: {"description": "Server error"},
    },
)
async def list_sessions(
    user_id: str = Security(autogpt_auth_lib.get_user_id),
    session_status: Optional[model.SessionStatus] = Query(
        None, alias="status", description="Filter by session status"
    ),
    page: int = Query(1, ge=1, description="Page number"),
    page_size: int = Query(20, ge=1, le=100, description="Page size"),
) -> model.SessionListResponse:
    """List all long-running sessions for the user."""
    try:
        rows, total = await db.list_sessions(
            user_id=user_id,
            status=session_status,
            page=page,
            page_size=page_size,
        )
        # Convert each database row into its API representation.
        items: list[model.SessionResponse] = []
        for row in rows:
            items.append(
                model.SessionResponse(
                    id=row.id,
                    project_name=row.projectName,
                    project_description=row.projectDescription,
                    status=model.SessionStatus(row.status.lower()),
                    current_session_id=row.currentSessionId,
                    session_count=row.sessionCount,
                    working_directory=row.workingDirectory,
                    feature_list_path=row.featureListPath,
                    progress_log_path=row.progressLogPath,
                    init_script_path=row.initScriptPath,
                    git_repo_initialized=row.gitRepoInitialized,
                    current_feature_id=row.currentFeatureId,
                    environment_variables=row.environmentVariables or {},
                    metadata=row.metadata or {},
                    created_at=row.createdAt,
                    updated_at=row.updatedAt,
                )
            )
        return model.SessionListResponse(
            sessions=items,
            total=total,
            page=page,
            page_size=page_size,
        )
    except Exception as e:
        logger.error(f"Failed to list sessions: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
@router.get(
    "/sessions/{session_id}",
    summary="Get Long-Running Session",
    response_model=model.SessionDetailResponse,
    responses={
        200: {"description": "Session details"},
        404: {"description": "Session not found"},
        500: {"description": "Server error"},
    },
)
async def get_session(
    session_id: str,
    user_id: str = Security(autogpt_auth_lib.get_user_id),
) -> model.SessionDetailResponse:
    """Get detailed information about a long-running session."""
    try:
        record = await db.get_session_with_details(session_id, user_id)
        if not record:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Session {session_id} not found",
            )

        # Per-status feature counts for the summary field.
        status_counts = await db.get_feature_summary(session_id, user_id)

        # Related rows may be absent on the record; treat that as empty.
        feature_items: list[model.FeatureResponse] = []
        for feat in record.Features or []:
            feature_items.append(
                model.FeatureResponse(
                    id=feat.id,
                    feature_id=feat.featureId,
                    category=model.FeatureCategory(feat.category.lower()),
                    description=feat.description,
                    steps=feat.steps or [],
                    status=model.FeatureStatus(feat.status.lower()),
                    priority=feat.priority,
                    dependencies=feat.dependencies or [],
                    notes=feat.notes,
                    updated_by_session=feat.updatedBySession,
                    created_at=feat.createdAt,
                    updated_at=feat.updatedAt,
                )
            )

        progress_items: list[model.ProgressEntryResponse] = []
        for entry in record.ProgressLog or []:
            progress_items.append(
                model.ProgressEntryResponse(
                    id=entry.id,
                    agent_session_id=entry.agentSessionId,
                    entry_type=model.ProgressEntryType(entry.entryType.lower()),
                    title=entry.title,
                    description=entry.description,
                    feature_id=entry.featureId,
                    git_commit_hash=entry.gitCommitHash,
                    files_changed=entry.filesChanged or [],
                    metadata=entry.metadata or {},
                    created_at=entry.createdAt,
                )
            )

        return model.SessionDetailResponse(
            id=record.id,
            project_name=record.projectName,
            project_description=record.projectDescription,
            status=model.SessionStatus(record.status.lower()),
            current_session_id=record.currentSessionId,
            session_count=record.sessionCount,
            working_directory=record.workingDirectory,
            feature_list_path=record.featureListPath,
            progress_log_path=record.progressLogPath,
            init_script_path=record.initScriptPath,
            git_repo_initialized=record.gitRepoInitialized,
            current_feature_id=record.currentFeatureId,
            environment_variables=record.environmentVariables or {},
            metadata=record.metadata or {},
            created_at=record.createdAt,
            updated_at=record.updatedAt,
            features=feature_items,
            recent_progress=progress_items,
            feature_summary=status_counts,
        )
    except Exception as e:
        # Let deliberate HTTP errors (the 404 above) pass through untouched.
        if isinstance(e, HTTPException):
            raise
        logger.error(f"Failed to get session: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
@router.patch(
    "/sessions/{session_id}",
    summary="Update Long-Running Session",
    response_model=model.SessionResponse,
    responses={
        200: {"description": "Session updated"},
        404: {"description": "Session not found"},
        500: {"description": "Server error"},
    },
)
async def update_session(
    session_id: str,
    request: model.UpdateSessionRequest,
    user_id: str = Security(autogpt_auth_lib.get_user_id),
) -> model.SessionResponse:
    """Update a long-running session's status or metadata."""
    try:
        record = await db.update_session(
            session_id=session_id,
            user_id=user_id,
            status=request.status,
            current_session_id=request.current_session_id,
            current_feature_id=request.current_feature_id,
            metadata=request.metadata,
        )
        # The database layer returns nothing when the session is not found
        # for this user.
        if not record:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Session {session_id} not found",
            )
        payload = {
            "id": record.id,
            "project_name": record.projectName,
            "project_description": record.projectDescription,
            "status": model.SessionStatus(record.status.lower()),
            "current_session_id": record.currentSessionId,
            "session_count": record.sessionCount,
            "working_directory": record.workingDirectory,
            "feature_list_path": record.featureListPath,
            "progress_log_path": record.progressLogPath,
            "init_script_path": record.initScriptPath,
            "git_repo_initialized": record.gitRepoInitialized,
            "current_feature_id": record.currentFeatureId,
            "environment_variables": record.environmentVariables or {},
            "metadata": record.metadata or {},
            "created_at": record.createdAt,
            "updated_at": record.updatedAt,
        }
        return model.SessionResponse(**payload)
    except Exception as e:
        # Let deliberate HTTP errors (the 404 above) pass through untouched.
        if isinstance(e, HTTPException):
            raise
        logger.error(f"Failed to update session: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
@router.delete(
    "/sessions/{session_id}",
    summary="Delete Long-Running Session",
    responses={
        204: {"description": "Session deleted"},
        404: {"description": "Session not found"},
        500: {"description": "Server error"},
    },
    status_code=status.HTTP_204_NO_CONTENT,
)
async def delete_session(
    session_id: str,
    user_id: str = Security(autogpt_auth_lib.get_user_id),
) -> None:
    """Delete a long-running session and all related data."""
    try:
        deleted = await db.delete_session(session_id, user_id)
    except Exception as e:
        logger.error(f"Failed to delete session: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e

    # A falsy result means the session was not found for this user.
    if not deleted:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Session {session_id} not found",
        )
# === Feature Routes ===
@router.post(
    "/sessions/{session_id}/features",
    summary="Create Feature",
    response_model=model.FeatureResponse,
    responses={
        201: {"description": "Feature created"},
        404: {"description": "Session not found"},
        500: {"description": "Server error"},
    },
    status_code=status.HTTP_201_CREATED,
)
async def create_feature(
    session_id: str,
    request: model.CreateFeatureRequest,
    user_id: str = Security(autogpt_auth_lib.get_user_id),
) -> model.FeatureResponse:
    """Create a new feature for a session."""
    try:
        created = await db.create_feature(
            session_id=session_id,
            user_id=user_id,
            feature_id=request.feature_id,
            description=request.description,
            category=request.category,
            steps=request.steps,
            priority=request.priority,
            dependencies=request.dependencies,
        )
        # Nothing is created when the session is not found for this user.
        if not created:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Session {session_id} not found",
            )
        payload = {
            "id": created.id,
            "feature_id": created.featureId,
            "category": model.FeatureCategory(created.category.lower()),
            "description": created.description,
            "steps": created.steps or [],
            "status": model.FeatureStatus(created.status.lower()),
            "priority": created.priority,
            "dependencies": created.dependencies or [],
            "notes": created.notes,
            "updated_by_session": created.updatedBySession,
            "created_at": created.createdAt,
            "updated_at": created.updatedAt,
        }
        return model.FeatureResponse(**payload)
    except Exception as e:
        # Let deliberate HTTP errors (the 404 above) pass through untouched.
        if isinstance(e, HTTPException):
            raise
        logger.error(f"Failed to create feature: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
@router.get(
    "/sessions/{session_id}/features",
    summary="List Features",
    response_model=model.FeatureListResponse,
    responses={
        200: {"description": "List of features"},
        404: {"description": "Session not found"},
        500: {"description": "Server error"},
    },
)
async def list_features(
    session_id: str,
    user_id: str = Security(autogpt_auth_lib.get_user_id),
    feature_status: Optional[model.FeatureStatus] = Query(
        None, alias="status", description="Filter by feature status"
    ),
) -> model.FeatureListResponse:
    """List all features for a session."""
    try:
        # db.get_features returns [] both for "no features" and for "session
        # not found", so the documented 404 has to be produced explicitly.
        if not await db.get_session(session_id, user_id):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Session {session_id} not found",
            )
        features = await db.get_features(session_id, user_id, feature_status)
        # The summary covers the whole session, regardless of the status filter.
        summary = await db.get_feature_summary(session_id, user_id)
        return model.FeatureListResponse(
            features=[
                model.FeatureResponse(
                    id=f.id,
                    feature_id=f.featureId,
                    category=model.FeatureCategory(f.category.lower()),
                    description=f.description,
                    steps=f.steps or [],
                    status=model.FeatureStatus(f.status.lower()),
                    priority=f.priority,
                    dependencies=f.dependencies or [],
                    notes=f.notes,
                    updated_by_session=f.updatedBySession,
                    created_at=f.createdAt,
                    updated_at=f.updatedAt,
                )
                for f in features
            ],
            total=len(features),
            summary=summary,
        )
    except HTTPException:
        # Keep the 404 above from being rewritten into a 500.
        raise
    except Exception as e:
        logger.error(f"Failed to list features: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
@router.get(
    "/sessions/{session_id}/features/next",
    summary="Get Next Feature",
    response_model=Optional[model.FeatureResponse],
    responses={
        200: {"description": "Next feature to work on"},
        404: {"description": "Session not found or no features available"},
        500: {"description": "Server error"},
    },
)
async def get_next_feature(
    session_id: str,
    user_id: str = Security(autogpt_auth_lib.get_user_id),
) -> Optional[model.FeatureResponse]:
    """Get the next feature to work on (highest priority pending/failing)."""
    try:
        # db.get_next_feature returns None both for "nothing left to do" and
        # for "session not found"; only the latter is turned into a 404 here.
        if not await db.get_session(session_id, user_id):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Session {session_id} not found",
            )
        feature = await db.get_next_feature(session_id, user_id)
        if not feature:
            # No failing or pending feature remains; the response body is null,
            # which the Optional response model permits.
            return None
        return model.FeatureResponse(
            id=feature.id,
            feature_id=feature.featureId,
            category=model.FeatureCategory(feature.category.lower()),
            description=feature.description,
            steps=feature.steps or [],
            status=model.FeatureStatus(feature.status.lower()),
            priority=feature.priority,
            dependencies=feature.dependencies or [],
            notes=feature.notes,
            updated_by_session=feature.updatedBySession,
            created_at=feature.createdAt,
            updated_at=feature.updatedAt,
        )
    except HTTPException:
        # Keep the 404 above from being rewritten into a 500.
        raise
    except Exception as e:
        logger.error(f"Failed to get next feature: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
@router.patch(
    "/sessions/{session_id}/features/{feature_id}",
    summary="Update Feature",
    response_model=model.FeatureResponse,
    responses={
        200: {"description": "Feature updated"},
        404: {"description": "Feature not found"},
        500: {"description": "Server error"},
    },
)
async def update_feature(
    session_id: str,
    feature_id: str,
    request: model.UpdateFeatureRequest,
    user_id: str = Security(autogpt_auth_lib.get_user_id),
) -> model.FeatureResponse:
    """Update a feature's status."""
    try:
        updated = await db.update_feature(
            session_id=session_id,
            feature_id=feature_id,
            user_id=user_id,
            status=request.status,
            notes=request.notes,
            updated_by_session=request.updated_by_session,
        )
        # Covers both a missing session and a missing feature.
        if not updated:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Feature {feature_id} not found in session {session_id}",
            )
        payload = {
            "id": updated.id,
            "feature_id": updated.featureId,
            "category": model.FeatureCategory(updated.category.lower()),
            "description": updated.description,
            "steps": updated.steps or [],
            "status": model.FeatureStatus(updated.status.lower()),
            "priority": updated.priority,
            "dependencies": updated.dependencies or [],
            "notes": updated.notes,
            "updated_by_session": updated.updatedBySession,
            "created_at": updated.createdAt,
            "updated_at": updated.updatedAt,
        }
        return model.FeatureResponse(**payload)
    except Exception as e:
        # Let deliberate HTTP errors (the 404 above) pass through untouched.
        if isinstance(e, HTTPException):
            raise
        logger.error(f"Failed to update feature: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
# === Progress Routes ===
@router.post(
    "/sessions/{session_id}/progress",
    summary="Create Progress Entry",
    response_model=model.ProgressEntryResponse,
    responses={
        201: {"description": "Progress entry created"},
        404: {"description": "Session not found"},
        500: {"description": "Server error"},
    },
    status_code=status.HTTP_201_CREATED,
)
async def create_progress_entry(
    session_id: str,
    request: model.CreateProgressEntryRequest,
    user_id: str = Security(autogpt_auth_lib.get_user_id),
) -> model.ProgressEntryResponse:
    """Create a progress entry for a session."""
    try:
        created = await db.create_progress_entry(
            session_id=session_id,
            user_id=user_id,
            agent_session_id=request.agent_session_id,
            entry_type=request.entry_type,
            title=request.title,
            description=request.description,
            feature_id=request.feature_id,
            git_commit_hash=request.git_commit_hash,
            files_changed=request.files_changed,
            metadata=request.metadata,
        )
        # Nothing is created when the session is not found for this user.
        if not created:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Session {session_id} not found",
            )
        payload = {
            "id": created.id,
            "agent_session_id": created.agentSessionId,
            "entry_type": model.ProgressEntryType(created.entryType.lower()),
            "title": created.title,
            "description": created.description,
            "feature_id": created.featureId,
            "git_commit_hash": created.gitCommitHash,
            "files_changed": created.filesChanged or [],
            "metadata": created.metadata or {},
            "created_at": created.createdAt,
        }
        return model.ProgressEntryResponse(**payload)
    except Exception as e:
        # Let deliberate HTTP errors (the 404 above) pass through untouched.
        if isinstance(e, HTTPException):
            raise
        logger.error(f"Failed to create progress entry: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e
@router.get(
    "/sessions/{session_id}/progress",
    summary="Get Progress Entries",
    response_model=list[model.ProgressEntryResponse],
    responses={
        200: {"description": "List of progress entries"},
        404: {"description": "Session not found"},
        500: {"description": "Server error"},
    },
)
async def get_progress_entries(
    session_id: str,
    user_id: str = Security(autogpt_auth_lib.get_user_id),
    limit: int = Query(50, ge=1, le=200, description="Maximum entries to return"),
    agent_session_id: Optional[str] = Query(
        None, description="Filter by agent session ID"
    ),
    feature_id: Optional[str] = Query(None, description="Filter by feature ID"),
) -> list[model.ProgressEntryResponse]:
    """Get progress entries for a session."""
    try:
        # db.get_progress_entries returns [] both for "no entries" and for
        # "session not found", so the documented 404 is produced explicitly.
        if not await db.get_session(session_id, user_id):
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"Session {session_id} not found",
            )
        entries = await db.get_progress_entries(
            session_id=session_id,
            user_id=user_id,
            limit=limit,
            agent_session_id=agent_session_id,
            feature_id=feature_id,
        )
        # `entry` rather than `e`, to avoid reusing the exception variable's name.
        return [
            model.ProgressEntryResponse(
                id=entry.id,
                agent_session_id=entry.agentSessionId,
                entry_type=model.ProgressEntryType(entry.entryType.lower()),
                title=entry.title,
                description=entry.description,
                feature_id=entry.featureId,
                git_commit_hash=entry.gitCommitHash,
                files_changed=entry.filesChanged or [],
                metadata=entry.metadata or {},
                created_at=entry.createdAt,
            )
            for entry in entries
        ]
    except HTTPException:
        # Keep the 404 above from being rewritten into a 500.
        raise
    except Exception as e:
        logger.error(f"Failed to get progress entries: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e

View File

@@ -60,6 +60,9 @@ model User {
IntegrationWebhooks IntegrationWebhook[]
NotificationBatches UserNotificationBatch[]
PendingHumanReviews PendingHumanReview[]
// Long-running agent sessions
LongRunningSessions LongRunningSession[]
}
enum OnboardingStep {
@@ -959,3 +962,165 @@ enum APIKeyStatus {
REVOKED
SUSPENDED
}
////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////
////////// LONG-RUNNING AGENT SESSION TABLES //////////
////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////
// Status of a long-running agent session.
// Member names are the upper-case form of the values of the Python
// SessionStatus enum used by the API layer.
enum LongRunningSessionStatus {
  INITIALIZING // default for a new LongRunningSession
  READY
  WORKING
  PAUSED
  COMPLETED
  FAILED
}
// Status of a feature in a long-running session.
// Member names are the upper-case form of the values of the Python
// FeatureStatus enum used by the API layer.
enum LongRunningFeatureStatus {
  PENDING // default for a new LongRunningFeature
  IN_PROGRESS
  PASSING
  FAILING
  BLOCKED
  SKIPPED
}
// Category of a feature.
// Member names are the upper-case form of the values of the Python
// FeatureCategory enum used by the API layer.
enum LongRunningFeatureCategory {
  FUNCTIONAL // default for a new LongRunningFeature
  UI
  INTEGRATION
  PERFORMANCE
  SECURITY
  DOCUMENTATION
  TESTING
  INFRASTRUCTURE
}
// Type of progress entry.
// Member names are the upper-case form of the values of the Python
// ProgressEntryType enum used by the API layer.
enum LongRunningProgressEntryType {
  // Session lifecycle
  SESSION_START
  SESSION_END
  // Feature lifecycle
  FEATURE_START
  FEATURE_COMPLETE
  FEATURE_FAILED
  // Work performed
  CODE_CHANGE
  TEST_RUN
  BUG_FIX
  ENVIRONMENT_SETUP
  GIT_COMMIT
  // Free-form
  NOTE
  ERROR
}
// Main session state for long-running agent projects
// Based on Anthropic's "Effective Harnesses for Long-Running Agents" research
// One row per project; features and progress entries hang off it and are
// removed with it (see their onDelete: Cascade relations).
model LongRunningSession {
  id        String   @id @default(uuid())
  createdAt DateTime @default(now())
  updatedAt DateTime @default(now()) @updatedAt

  // Project info
  projectName        String
  projectDescription String @db.Text

  // Status tracking
  status           LongRunningSessionStatus @default(INITIALIZING)
  currentSessionId String?
  // Starts at 0; the API's database layer raises it when a session update
  // sets the status to WORKING.
  sessionCount     Int                      @default(0)

  // File paths (for file-based state management)
  workingDirectory String
  featureListPath  String?
  progressLogPath  String?
  initScriptPath   String?

  // Git tracking
  gitRepoInitialized Boolean @default(false)
  // Plain string, not a relation to LongRunningFeature.
  currentFeatureId   String?

  // Additional metadata
  environmentVariables Json @default("{}")
  metadata             Json @default("{}")

  // Link to User model; deleting the user deletes their sessions.
  userId String
  User   User   @relation(fields: [userId], references: [id], onDelete: Cascade)

  // Relations
  Features    LongRunningFeature[]
  ProgressLog LongRunningProgress[]

  @@index([userId])
  @@index([status])
}
// Features for a long-running session
model LongRunningFeature {
  id        String   @id @default(uuid())
  createdAt DateTime @default(now())
  updatedAt DateTime @default(now()) @updatedAt

  // Feature info
  featureId   String // User-defined feature ID (e.g., "feature_001")
  category    LongRunningFeatureCategory @default(FUNCTIONAL)
  description String                     @db.Text
  steps       String[]                   @default([])

  // Status tracking
  status   LongRunningFeatureStatus @default(PENDING)
  priority Int                      @default(5) // 1 = highest, 10 = lowest

  // Dependencies
  // Stored as plain strings; no relation enforces that they name features.
  dependencies String[] @default([])

  // Notes and tracking
  notes            String?
  updatedBySession String?

  // Link to session; deleting the session deletes its features.
  sessionId String
  Session   LongRunningSession @relation(fields: [sessionId], references: [id], onDelete: Cascade)

  // Progress entries related to this feature
  ProgressEntries LongRunningProgress[]

  // featureId only has to be unique within one session.
  @@unique([sessionId, featureId])
  @@index([sessionId, status])
  @@index([sessionId, priority])
}
// Progress entries for a long-running session
// Rows carry createdAt only; there is no updatedAt column.
model LongRunningProgress {
  id        String   @id @default(uuid())
  createdAt DateTime @default(now())

  // Entry info
  agentSessionId String // The session that created this entry
  entryType      LongRunningProgressEntryType

  // Content
  title       String
  description String? @db.Text

  // Related entities
  gitCommitHash String?
  filesChanged  String[] @default([])
  metadata      Json     @default("{}")

  // Link to session; deleting the session deletes its progress entries.
  sessionId String
  Session   LongRunningSession @relation(fields: [sessionId], references: [id], onDelete: Cascade)

  // Optional link to feature
  // References LongRunningFeature.id (the row id, not the user-defined
  // featureId); set to null when that feature is deleted.
  featureId String?
  Feature   LongRunningFeature? @relation(fields: [featureId], references: [id], onDelete: SetNull)

  @@index([sessionId, createdAt])
  @@index([agentSessionId])
  @@index([featureId])
}