From 60f506add91e0ee8b3e0359a3ede582c2b85ee1a Mon Sep 17 00:00:00 2001 From: Nicholas Tindle Date: Tue, 3 Feb 2026 18:26:15 -0600 Subject: [PATCH] feat(classic): add Agent Skills (SKILL.md) support Implement the open Agent Skills standard for Classic AutoGPT, enabling modular, progressively-loaded capabilities via SKILL.md files. Skills are discovered from workspace (.autogpt/skills) and global (~/.autogpt/skills) directories with three-level progressive disclosure to minimize token usage. Co-Authored-By: Claude Opus 4.5 --- .../forge/forge/components/skills/__init__.py | 15 + .../components/skills/skill_component.py | 322 +++++++++ .../forge/components/skills/skill_model.py | 124 ++++ .../forge/components/skills/skill_parser.py | 234 +++++++ .../forge/components/skills/test_skills.py | 656 ++++++++++++++++++ .../original_autogpt/autogpt/agents/agent.py | 12 + 6 files changed, 1363 insertions(+) create mode 100644 classic/forge/forge/components/skills/__init__.py create mode 100644 classic/forge/forge/components/skills/skill_component.py create mode 100644 classic/forge/forge/components/skills/skill_model.py create mode 100644 classic/forge/forge/components/skills/skill_parser.py create mode 100644 classic/forge/forge/components/skills/test_skills.py diff --git a/classic/forge/forge/components/skills/__init__.py b/classic/forge/forge/components/skills/__init__.py new file mode 100644 index 0000000000..a8efa00d82 --- /dev/null +++ b/classic/forge/forge/components/skills/__init__.py @@ -0,0 +1,15 @@ +"""Agent Skills (SKILL.md) support for Classic AutoGPT.""" + +from .skill_component import SkillComponent +from .skill_model import Skill, SkillConfiguration, SkillLoadLevel, SkillMetadata +from .skill_parser import SkillParseError, discover_skills + +__all__ = [ + "SkillComponent", + "SkillConfiguration", + "Skill", + "SkillLoadLevel", + "SkillMetadata", + "SkillParseError", + "discover_skills", +] diff --git a/classic/forge/forge/components/skills/skill_component.py b/classic/forge/forge/components/skills/skill_component.py new file mode 100644 index 0000000000..a85720cf99 --- /dev/null +++ b/classic/forge/forge/components/skills/skill_component.py @@ -0,0 +1,322 @@ +""" +SkillComponent - Provides Agent Skills (SKILL.md) support for Classic AutoGPT. + +This component implements the open Agent Skills standard, enabling modular, +progressively-loaded capabilities via markdown-based skill files. + +See: https://platform.claude.com/docs/en/agents-and-tools/agent-skills/overview +""" + +import logging +from typing import Iterator, Optional + +from forge.agent.components import ConfigurableComponent +from forge.agent.protocols import CommandProvider, DirectiveProvider, MessageProvider +from forge.command import Command, command +from forge.llm.providers import ChatMessage +from forge.models.json_schema import JSONSchema + +from .skill_model import Skill, SkillConfiguration, SkillLoadLevel +from .skill_parser import ( + SkillParseError, + discover_skills, + load_skill_content, + load_skill_file, +) + +logger = logging.getLogger(__name__) + + +class SkillComponent( + DirectiveProvider, + MessageProvider, + CommandProvider, + ConfigurableComponent[SkillConfiguration], +): + """ + Component that provides Agent Skills support. + + Skills are modular capabilities defined by SKILL.md files. They use + progressive disclosure to minimize token usage: + - Level 1: Metadata always loaded (~100 tokens/skill) + - Level 2: Full SKILL.md loaded when triggered (~500-5000 tokens) + - Level 3: Additional files loaded on demand + """ + + config_class = SkillConfiguration + + def __init__(self, config: Optional[SkillConfiguration] = None): + ConfigurableComponent.__init__(self, config) + # All discovered skills (Level 1 - metadata only) + self._available_skills: dict[str, Skill] = {} + # Skills with full content loaded (Level 2+) + self._loaded_skills: dict[str, Skill] = {} + # Discover skills on initialization + self._discover_skills() + + def _discover_skills(self) -> None: + """Discover all skills in configured directories.""" + skills = discover_skills(self.config.skill_directories) + self._available_skills = {skill.metadata.name: skill for skill in skills} + logger.info( + f"Discovered {len(self._available_skills)} skills: " + f"{list(self._available_skills.keys())}" + ) + + # ------------------------------------------------------------------------- + # DirectiveProvider methods + # ------------------------------------------------------------------------- + + def get_resources(self) -> Iterator[str]: + if self._available_skills: + yield ( + "You have access to modular skills that provide specialized " + "capabilities. Use `list_skills` to see available skills, " + "and `load_skill` to activate one when needed." + ) + + def get_best_practices(self) -> Iterator[str]: + if self._available_skills: + yield ( + "Only load skills when you actually need their capabilities. " + "Unload skills when you're done to conserve context space." + ) + yield ( + "Before implementing complex functionality, check if a skill " + "already provides the capability you need." + ) + + # ------------------------------------------------------------------------- + # MessageProvider methods + # ------------------------------------------------------------------------- + + def get_messages(self) -> Iterator[ChatMessage]: + # Always provide skill catalog if skills are available + if self._available_skills: + catalog_lines = ["## Available Skills"] + for name, skill in self._available_skills.items(): + loaded_marker = " [LOADED]" if name in self._loaded_skills else "" + catalog_lines.append( + f"- **{name}**{loaded_marker}: {skill.metadata.description}" + ) + yield ChatMessage.system("\n".join(catalog_lines)) + + # Provide loaded skill content + for name, skill in self._loaded_skills.items(): + if skill.load_level >= SkillLoadLevel.FULL_CONTENT and skill.content: + skill_content = [f"## Skill: {name}"] + skill_content.append("") + skill_content.append(skill.content) + + # Show available additional files + additional_files = skill.list_additional_files() + if additional_files: + skill_content.append("") + skill_content.append("### Additional Files Available") + for f in additional_files: + loaded = " [loaded]" if f in skill.additional_files else "" + skill_content.append(f"- `{f}`{loaded}") + + yield ChatMessage.system("\n".join(skill_content)) + + # ------------------------------------------------------------------------- + # CommandProvider methods + # ------------------------------------------------------------------------- + + def get_commands(self) -> Iterator[Command]: + if self._available_skills: + yield self.list_skills + yield self.load_skill + if self._loaded_skills: + yield self.unload_skill + yield self.read_skill_file + + @command( + names=["list_skills"], + description="List all available skills with their descriptions", + parameters={}, + ) + def list_skills(self) -> str: + """List all available skills with their metadata. + + Returns: + str: Formatted list of available skills + """ + if not self._available_skills: + return "No skills available. Skills can be added to .autogpt/skills/" + + lines = ["Available Skills:", ""] + for name, skill in self._available_skills.items(): + loaded = " [LOADED]" if name in self._loaded_skills else "" + lines.append(f"**{name}**{loaded}") + lines.append(f" Description: {skill.metadata.description}") + if skill.metadata.author: + lines.append(f" Author: {skill.metadata.author}") + if skill.metadata.version: + lines.append(f" Version: {skill.metadata.version}") + if skill.metadata.tags: + lines.append(f" Tags: {', '.join(skill.metadata.tags)}") + lines.append("") + + return "\n".join(lines) + + @command( + names=["load_skill"], + description="Load a skill's full content to use its capabilities", + parameters={ + "skill_name": JSONSchema( + type=JSONSchema.Type.STRING, + description="The name of the skill to load", + required=True, + ) + }, + ) + def load_skill(self, skill_name: str) -> str: + """Load a skill's full content (Level 2). + + Args: + skill_name: The name of the skill to load + + Returns: + str: Status message indicating success or failure + """ + if skill_name not in self._available_skills: + available = ", ".join(self._available_skills.keys()) + return f"Skill '{skill_name}' not found. Available skills: {available}" + + if skill_name in self._loaded_skills: + return f"Skill '{skill_name}' is already loaded." + + # Check if we've hit the max loaded skills limit + if len(self._loaded_skills) >= self.config.max_loaded_skills: + loaded = ", ".join(self._loaded_skills.keys()) + return ( + f"Cannot load skill '{skill_name}': maximum of " + f"{self.config.max_loaded_skills} skills already loaded ({loaded}). " + f"Unload a skill first using `unload_skill`." + ) + + skill = self._available_skills[skill_name] + try: + loaded_skill = load_skill_content(skill) + self._loaded_skills[skill_name] = loaded_skill + self._available_skills[skill_name] = loaded_skill + + return ( + f"Skill '{skill_name}' loaded successfully. " + f"Its instructions are now available in the context." + ) + except SkillParseError as e: + logger.error(f"Failed to load skill '{skill_name}': {e}") + return f"Failed to load skill '{skill_name}': {e}" + + @command( + names=["unload_skill"], + description="Unload a skill to free up context space", + parameters={ + "skill_name": JSONSchema( + type=JSONSchema.Type.STRING, + description="The name of the skill to unload", + required=True, + ) + }, + ) + def unload_skill(self, skill_name: str) -> str: + """Unload a skill to free context space. + + Args: + skill_name: The name of the skill to unload + + Returns: + str: Status message indicating success or failure + """ + if skill_name not in self._loaded_skills: + if skill_name in self._available_skills: + return f"Skill '{skill_name}' is not currently loaded." + return f"Skill '{skill_name}' not found." + + del self._loaded_skills[skill_name] + + # Reset the skill to metadata-only state + skill = self._available_skills[skill_name] + skill.content = None + skill.additional_files.clear() + skill.load_level = SkillLoadLevel.METADATA + + return f"Skill '{skill_name}' has been unloaded." + + @command( + names=["read_skill_file"], + description="Read an additional file from a loaded skill", + parameters={ + "skill_name": JSONSchema( + type=JSONSchema.Type.STRING, + description="The name of the skill", + required=True, + ), + "filename": JSONSchema( + type=JSONSchema.Type.STRING, + description="The name of the file to read", + required=True, + ), + }, + ) + def read_skill_file(self, skill_name: str, filename: str) -> str: + """Read an additional file from a skill (Level 3). + + Args: + skill_name: The name of the skill + filename: The name of the file to read + + Returns: + str: The content of the file or an error message + """ + if skill_name not in self._loaded_skills: + if skill_name in self._available_skills: + return ( + f"Skill '{skill_name}' must be loaded first. " + f'Use `load_skill("{skill_name}")` to load it.' + ) + return f"Skill '{skill_name}' not found." + + skill = self._loaded_skills[skill_name] + + # Check if already loaded + if filename in skill.additional_files: + return skill.additional_files[filename] + + # List available files for better error messages + available_files = skill.list_additional_files() + if filename not in available_files: + if available_files: + return ( + f"File '{filename}' not found in skill '{skill_name}'. " + f"Available files: {', '.join(available_files)}" + ) + return f"Skill '{skill_name}' has no additional files." + + try: + content = load_skill_file(skill, filename) + return content + except (FileNotFoundError, ValueError, SkillParseError) as e: + logger.error(f"Failed to read file '{filename}' from skill: {e}") + return f"Failed to read file '{filename}': {e}" + + # ------------------------------------------------------------------------- + # Public API + # ------------------------------------------------------------------------- + + def refresh_skills(self) -> None: + """Re-discover skills from configured directories.""" + self._loaded_skills.clear() + self._discover_skills() + + @property + def available_skills(self) -> dict[str, Skill]: + """Get all discovered skills.""" + return self._available_skills.copy() + + @property + def loaded_skills(self) -> dict[str, Skill]: + """Get currently loaded skills.""" + return self._loaded_skills.copy() diff --git a/classic/forge/forge/components/skills/skill_model.py b/classic/forge/forge/components/skills/skill_model.py new file mode 100644 index 0000000000..6fa7bca23e --- /dev/null +++ b/classic/forge/forge/components/skills/skill_model.py @@ -0,0 +1,124 @@ +"""Data models for Agent Skills (SKILL.md) support.""" + +import re +from enum import IntEnum +from pathlib import Path +from typing import Optional + +from pydantic import BaseModel, ConfigDict, Field, field_validator + + +class SkillLoadLevel(IntEnum): + """Progressive disclosure levels for skill loading.""" + + METADATA = 1 # Always loaded (~100 tokens/skill) + FULL_CONTENT = 2 # Loaded when triggered (~500-5000 tokens) + ADDITIONAL = 3 # On-demand files + + +class SkillMetadata(BaseModel): + """Metadata parsed from SKILL.md frontmatter.""" + + model_config = ConfigDict(populate_by_name=True) + + name: str = Field( + ..., + description="Skill name (lowercase, alphanumeric, hyphens, max 64 chars)", + max_length=64, + ) + description: str = Field( + ..., + description="Skill description (max 1024 chars)", + max_length=1024, + ) + license: Optional[str] = Field( + default=None, + description="License for the skill", + ) + allowed_tools: Optional[list[str]] = Field( + default=None, + alias="allowed-tools", + description="List of tools this skill is allowed to use", + ) + author: Optional[str] = Field( + default=None, + description="Skill author", + ) + version: Optional[str] = Field( + default=None, + description="Skill version", + ) + tags: Optional[list[str]] = Field( + default=None, + description="Tags for categorizing the skill", + ) + + @field_validator("name") + @classmethod + def validate_name(cls, v: str) -> str: + """Validate skill name follows spec: lowercase, alphanumeric, hyphens.""" + if not re.match(r"^[a-z0-9-]+$", v): + raise ValueError( + "Skill name must be lowercase, alphanumeric, " + "and may contain hyphens only" + ) + return v + + +class Skill(BaseModel): + """Represents a loaded skill with its metadata and content.""" + + model_config = ConfigDict(arbitrary_types_allowed=True) + + path: Path = Field( + ..., + description="Path to the skill directory", + ) + metadata: SkillMetadata = Field( + ..., + description="Parsed skill metadata from frontmatter", + ) + content: Optional[str] = Field( + default=None, + description="Full SKILL.md content (body, excluding frontmatter)", + ) + additional_files: dict[str, str] = Field( + default_factory=dict, + description="Additional files loaded on demand", + ) + load_level: SkillLoadLevel = Field( + default=SkillLoadLevel.METADATA, + description="Current load level of the skill", + ) + + @property + def skill_md_path(self) -> Path: + """Path to the SKILL.md file.""" + return self.path / "SKILL.md" + + def list_additional_files(self) -> list[str]: + """List available additional files in the skill directory.""" + files = [] + if self.path.exists(): + for item in self.path.iterdir(): + if item.is_file() and item.name != "SKILL.md": + files.append(item.name) + return files + + +class SkillConfiguration(BaseModel): + """Configuration for the SkillComponent.""" + + skill_directories: list[Path] = Field( + default_factory=lambda: [ + Path(".autogpt/skills"), + Path.home() / ".autogpt/skills", + ], + description="Directories to search for skills", + ) + max_loaded_skills: int = Field( + default=5, + description="Maximum number of skills that can be fully loaded at once", + ge=1, + le=20, + ) diff --git a/classic/forge/forge/components/skills/skill_parser.py b/classic/forge/forge/components/skills/skill_parser.py new file mode 100644 index 0000000000..2967e0a657 --- /dev/null +++ b/classic/forge/forge/components/skills/skill_parser.py @@ -0,0 +1,234 @@ +"""Parsing utilities for SKILL.md files.""" + +import logging +import re +from pathlib import Path +from typing import Optional + +import yaml +from pydantic import ValidationError + +from .skill_model import Skill, SkillLoadLevel, SkillMetadata + +logger = logging.getLogger(__name__) + + +class SkillParseError(Exception): + """Raised when a SKILL.md file cannot be parsed.""" + + pass + + +def _extract_frontmatter(content: str) -> tuple[Optional[str], str]: + """ + Extract YAML frontmatter from markdown content. + + Args: + content: Full markdown content with optional frontmatter + + Returns: + Tuple of (frontmatter_yaml, body_content). + frontmatter_yaml is None if no frontmatter found. + """ + # Match frontmatter: starts with ---, ends with --- + pattern = r"^---\s*\n(.*?)\n---\s*\n?(.*)" + match = re.match(pattern, content, re.DOTALL) + + if match: + return match.group(1), match.group(2) + return None, content + + +def parse_skill_md(skill_path: Path) -> Skill: + """ + Parse a SKILL.md file and return a Skill with metadata only (Level 1). + + Args: + skill_path: Path to the skill directory containing SKILL.md + + Returns: + Skill object with metadata loaded + + Raises: + SkillParseError: If the SKILL.md file cannot be parsed + FileNotFoundError: If SKILL.md doesn't exist + """ + skill_md_file = skill_path / "SKILL.md" + + if not skill_md_file.exists(): + raise FileNotFoundError(f"SKILL.md not found at {skill_md_file}") + + try: + content = skill_md_file.read_text(encoding="utf-8") + except Exception as e: + raise SkillParseError(f"Failed to read SKILL.md: {e}") from e + + frontmatter_yaml, body = _extract_frontmatter(content) + + if frontmatter_yaml is None: + raise SkillParseError( + f"SKILL.md at {skill_path} missing required YAML frontmatter" + ) + + try: + frontmatter_data = yaml.safe_load(frontmatter_yaml) + except yaml.YAMLError as e: + raise SkillParseError(f"Invalid YAML frontmatter in {skill_path}: {e}") from e + + if not isinstance(frontmatter_data, dict): + raise SkillParseError( + f"YAML frontmatter in {skill_path} must be a mapping, " + f"got {type(frontmatter_data).__name__}" + ) + + # Handle nested metadata field if present + if "metadata" in frontmatter_data: + metadata_section = frontmatter_data.pop("metadata") + if isinstance(metadata_section, dict): + # Merge metadata fields into top level (author, version, tags) + for key in ["author", "version", "tags"]: + if key in metadata_section and key not in frontmatter_data: + frontmatter_data[key] = metadata_section[key] + + try: + metadata = SkillMetadata(**frontmatter_data) + except ValidationError as e: + raise SkillParseError( + f"Invalid metadata in SKILL.md at {skill_path}: {e}" + ) from e + + return Skill( + path=skill_path, + metadata=metadata, + content=None, # Content not loaded at Level 1 + load_level=SkillLoadLevel.METADATA, + ) + + +def load_skill_content(skill: Skill) -> Skill: + """ + Load the full content of a skill's SKILL.md (Level 2). + + Args: + skill: Skill object with metadata loaded + + Returns: + Updated Skill object with full content loaded + + Raises: + SkillParseError: If content cannot be loaded + """ + if skill.load_level == SkillLoadLevel.FULL_CONTENT: + return skill # Already loaded + + skill_md_file = skill.path / "SKILL.md" + + try: + content = skill_md_file.read_text(encoding="utf-8") + except Exception as e: + raise SkillParseError(f"Failed to read SKILL.md: {e}") from e + + _, body = _extract_frontmatter(content) + + skill.content = body.strip() + skill.load_level = SkillLoadLevel.FULL_CONTENT + + return skill + + +def load_skill_file(skill: Skill, filename: str) -> str: + """ + Load an additional file from a skill directory (Level 3). + + Args: + skill: Skill object + filename: Name of the file to load + + Returns: + Content of the file + + Raises: + FileNotFoundError: If file doesn't exist + SkillParseError: If file cannot be read + ValueError: If filename is invalid (attempts path traversal) + """ + # Security: prevent path traversal + if ".." in filename or filename.startswith("/"): + raise ValueError(f"Invalid filename: {filename}") + + file_path = skill.path / filename + + # Verify the resolved path is still within the skill directory + try: + file_path.resolve().relative_to(skill.path.resolve()) + except ValueError: + raise ValueError(f"Invalid filename: {filename}") + + if not file_path.exists(): + raise FileNotFoundError(f"File not found: {filename}") + + if not file_path.is_file(): + raise ValueError(f"Not a file: {filename}") + + try: + content = file_path.read_text(encoding="utf-8") + except Exception as e: + raise SkillParseError(f"Failed to read {filename}: {e}") from e + + # Cache the loaded file + skill.additional_files[filename] = content + skill.load_level = SkillLoadLevel.ADDITIONAL + + return content + + +def discover_skills(directories: list[Path]) -> list[Skill]: + """ + Discover all skills in the given directories. + + Args: + directories: List of directories to search for skills + + Returns: + List of Skill objects with metadata loaded (Level 1) + """ + skills: list[Skill] = [] + seen_names: set[str] = set() + + for directory in directories: + if not directory.exists(): + logger.debug(f"Skill directory does not exist: {directory}") + continue + + if not directory.is_dir(): + logger.warning(f"Skill path is not a directory: {directory}") + continue + + for item in directory.iterdir(): + if not item.is_dir(): + continue + + skill_md = item / "SKILL.md" + if not skill_md.exists(): + continue + + try: + skill = parse_skill_md(item) + + # Skip duplicates (first occurrence wins) + if skill.metadata.name in seen_names: + logger.warning( + f"Duplicate skill name '{skill.metadata.name}' " + f"found at {item}, skipping" + ) + continue + + seen_names.add(skill.metadata.name) + skills.append(skill) + logger.debug(f"Discovered skill: {skill.metadata.name} at {item}") + + except (SkillParseError, FileNotFoundError) as e: + logger.warning(f"Failed to parse skill at {item}: {e}") + continue + + return skills diff --git a/classic/forge/forge/components/skills/test_skills.py b/classic/forge/forge/components/skills/test_skills.py new file mode 100644 index 0000000000..d58e73da71 --- /dev/null +++ b/classic/forge/forge/components/skills/test_skills.py @@ -0,0 +1,656 @@ +"""Tests for Agent Skills (SKILL.md) support.""" + +from pathlib import Path +from tempfile import TemporaryDirectory + +import pytest + +from .skill_component import SkillComponent +from .skill_model import SkillConfiguration, SkillLoadLevel, SkillMetadata +from .skill_parser import ( + SkillParseError, + _extract_frontmatter, + discover_skills, + load_skill_content, + load_skill_file, + parse_skill_md, +) + + +class TestSkillMetadata: + """Tests for SkillMetadata model validation.""" + + def test_valid_metadata(self): + """Test creating valid metadata.""" + meta = SkillMetadata( + name="test-skill", + description="A test skill", + author="Test Author", + version="1.0.0", + tags=["test", "example"], + ) + assert meta.name == "test-skill" + assert meta.description == "A test skill" + + def test_name_validation_lowercase(self): + """Test that name must be lowercase.""" + with pytest.raises(ValueError, match="lowercase"): + SkillMetadata(name="Test-Skill", description="A test") + + def test_name_validation_no_spaces(self): + """Test that name cannot contain spaces.""" + with pytest.raises(ValueError, match="lowercase"): + SkillMetadata(name="test skill", description="A test") + + def test_name_validation_alphanumeric_hyphens(self): + """Test that name can only have alphanumeric and hyphens.""" + with pytest.raises(ValueError, match="alphanumeric"): + SkillMetadata(name="test_skill", description="A test") + + def test_name_max_length(self): + """Test that name cannot exceed 64 characters.""" + with pytest.raises(ValueError): + SkillMetadata(name="a" * 65, description="A test") + + def test_description_max_length(self): + """Test that description cannot exceed 1024 characters.""" + with pytest.raises(ValueError): + SkillMetadata(name="test", description="a" * 1025) + + def test_allowed_tools_alias(self): + """Test that allowed-tools YAML field maps to allowed_tools.""" + # Simulate what pydantic does when parsing YAML with the alias field name + data = { + "name": "test", + "description": "test", + "allowed-tools": ["tool1", "tool2"], + } + meta = SkillMetadata.model_validate(data) + assert meta.allowed_tools == ["tool1", "tool2"] + + +class TestFrontmatterExtraction: + """Tests for YAML frontmatter extraction.""" + + def test_extract_valid_frontmatter(self): + """Test extracting valid frontmatter.""" + content = """--- +name: test-skill +description: A test skill +--- +# Skill Content +Here is the body.""" + + yaml_part, body = _extract_frontmatter(content) + assert yaml_part is not None + assert "name: test-skill" in yaml_part + assert body.strip().startswith("# Skill Content") + + def test_no_frontmatter(self): + """Test content without frontmatter.""" + content = "# Just markdown\nNo frontmatter here." + + yaml_part, body = _extract_frontmatter(content) + assert yaml_part is None + assert body == content + + def test_incomplete_frontmatter(self): + """Test frontmatter without closing ---.""" + content = """--- +name: test +# No closing delimiter +Body content""" + + yaml_part, _ = _extract_frontmatter(content) + # Without the closing delimiter, it's not valid frontmatter + assert yaml_part is None + + +class TestSkillParser: + """Tests for SKILL.md parsing.""" + + def test_parse_valid_skill(self): + """Test parsing a valid SKILL.md file.""" + with TemporaryDirectory() as tmp_dir: + skill_dir = Path(tmp_dir) / "test-skill" + skill_dir.mkdir() + + skill_md = skill_dir / "SKILL.md" + skill_md.write_text( + """--- +name: test-skill +description: A test skill for validation +author: Test Author +version: 1.0.0 +tags: + - test + - example +--- +# Test Skill + +This is the skill content. +""" + ) + + skill = parse_skill_md(skill_dir) + assert skill.metadata.name == "test-skill" + assert skill.metadata.description == "A test skill for validation" + assert skill.metadata.author == "Test Author" + assert skill.load_level == SkillLoadLevel.METADATA + assert skill.content is None # Not loaded at level 1 + + def test_parse_missing_skill_md(self): + """Test parsing when SKILL.md doesn't exist.""" + with TemporaryDirectory() as tmp_dir: + skill_dir = Path(tmp_dir) / "empty-skill" + skill_dir.mkdir() + + with pytest.raises(FileNotFoundError): + parse_skill_md(skill_dir) + + def test_parse_missing_frontmatter(self): + """Test parsing SKILL.md without frontmatter.""" + with TemporaryDirectory() as tmp_dir: + skill_dir = Path(tmp_dir) / "no-frontmatter" + skill_dir.mkdir() + + skill_md = skill_dir / "SKILL.md" + skill_md.write_text("# Just content, no frontmatter") + + with pytest.raises(SkillParseError, match="frontmatter"): + parse_skill_md(skill_dir) + + def test_parse_invalid_yaml(self): + """Test parsing SKILL.md with invalid YAML.""" + with TemporaryDirectory() as tmp_dir: + skill_dir = Path(tmp_dir) / "bad-yaml" + skill_dir.mkdir() + + skill_md = skill_dir / "SKILL.md" + skill_md.write_text( + """--- +name: [invalid: yaml +description: broken +--- +Content""" + ) + + with pytest.raises(SkillParseError, match="Invalid YAML"): + parse_skill_md(skill_dir) + + def test_parse_missing_required_fields(self): + """Test parsing SKILL.md with missing required fields.""" + with TemporaryDirectory() as tmp_dir: + skill_dir = Path(tmp_dir) / "missing-fields" + skill_dir.mkdir() + + skill_md = skill_dir / "SKILL.md" + skill_md.write_text( + """--- +name: test-skill +--- +Missing description""" + ) + + with pytest.raises(SkillParseError, match="Invalid metadata"): + parse_skill_md(skill_dir) + + def test_parse_nested_metadata(self): + """Test parsing SKILL.md with nested metadata section.""" + with TemporaryDirectory() as tmp_dir: + skill_dir = Path(tmp_dir) / "nested-meta" + skill_dir.mkdir() + + skill_md = skill_dir / "SKILL.md" + skill_md.write_text( + """--- +name: nested-skill +description: Has nested metadata +metadata: + author: Nested Author + version: 2.0.0 +--- +Content""" + ) + + skill = parse_skill_md(skill_dir) + assert skill.metadata.name == "nested-skill" + assert skill.metadata.author == "Nested Author" + assert skill.metadata.version == "2.0.0" + + +class TestSkillLoading: + """Tests for progressive skill loading.""" + + def test_load_skill_content(self): + """Test loading full skill content (Level 2).""" + with TemporaryDirectory() as tmp_dir: + skill_dir = Path(tmp_dir) / "load-test" + skill_dir.mkdir() + + skill_md = skill_dir / "SKILL.md" + skill_md.write_text( + """--- +name: load-test +description: Test loading +--- +# Full Content + +This should be loaded.""" + ) + + skill = parse_skill_md(skill_dir) + assert skill.content is None + assert skill.load_level == SkillLoadLevel.METADATA + + loaded = load_skill_content(skill) + assert loaded.content is not None + assert "Full Content" in loaded.content + assert loaded.load_level == SkillLoadLevel.FULL_CONTENT + + def test_load_skill_content_idempotent(self): + """Test that loading content twice doesn't change anything.""" + with TemporaryDirectory() as tmp_dir: + skill_dir = Path(tmp_dir) / "idempotent" + skill_dir.mkdir() + + skill_md = skill_dir / "SKILL.md" + skill_md.write_text( + """--- +name: idempotent +description: Test +--- +Content""" + ) + + skill = parse_skill_md(skill_dir) + loaded1 = load_skill_content(skill) + loaded2 = load_skill_content(loaded1) + assert loaded1.content == loaded2.content + + def test_load_skill_file(self): + """Test loading additional files (Level 3).""" + with TemporaryDirectory() as tmp_dir: + skill_dir = Path(tmp_dir) / "extra-files" + skill_dir.mkdir() + + skill_md = skill_dir / "SKILL.md" + skill_md.write_text( + """--- +name: extra-files +description: Has extra files +--- +Content""" + ) + + reference_md = skill_dir / "reference.md" + reference_md.write_text("# Reference\nExtra content here.") + + skill = parse_skill_md(skill_dir) + content = load_skill_file(skill, "reference.md") + assert "Reference" in content + assert "reference.md" in skill.additional_files + + def test_load_skill_file_path_traversal(self): + """Test that path traversal is prevented.""" + with TemporaryDirectory() as tmp_dir: + skill_dir = Path(tmp_dir) / "traversal-test" + skill_dir.mkdir() + + skill_md = skill_dir / "SKILL.md" + skill_md.write_text( + """--- +name: traversal-test +description: Test +--- +Content""" + ) + + skill = parse_skill_md(skill_dir) + + with pytest.raises(ValueError, match="Invalid filename"): + load_skill_file(skill, "../../../etc/passwd") + + with pytest.raises(ValueError, match="Invalid filename"): + load_skill_file(skill, "/etc/passwd") + + def test_list_additional_files(self): + """Test listing additional files in skill directory.""" + with TemporaryDirectory() as tmp_dir: + skill_dir = Path(tmp_dir) / "list-files" + skill_dir.mkdir() + + skill_md = skill_dir / "SKILL.md" + skill_md.write_text( + """--- +name: list-files +description: Test +--- +Content""" + ) + + (skill_dir / "file1.md").write_text("File 1") + (skill_dir / "file2.txt").write_text("File 2") + + skill = parse_skill_md(skill_dir) + files = skill.list_additional_files() + + assert "file1.md" in files + assert "file2.txt" in files + assert "SKILL.md" not in files # Excluded + + +class TestSkillDiscovery: + """Tests for skill discovery.""" + + def test_discover_skills(self): + """Test discovering skills in directories.""" + with TemporaryDirectory() as tmp_dir: + skills_dir = Path(tmp_dir) / "skills" + skills_dir.mkdir() + + # Create two valid skills + for name in ["skill-a", "skill-b"]: + skill_dir = skills_dir / name + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text( + f"""--- +name: {name} +description: Skill {name} +--- +Content for {name}""" + ) + + # Create a directory without SKILL.md (should be ignored) + (skills_dir / "not-a-skill").mkdir() + + skills = discover_skills([skills_dir]) + names = [s.metadata.name for s in skills] + + assert len(skills) == 2 + assert "skill-a" in names + assert "skill-b" in names + + def test_discover_skills_duplicates(self): + """Test that duplicate skill names are handled (first wins).""" + with TemporaryDirectory() as tmp_dir: + dir1 = Path(tmp_dir) / "dir1" + dir2 = Path(tmp_dir) / "dir2" + dir1.mkdir() + dir2.mkdir() + + for d, desc in [(dir1, "First"), (dir2, "Second")]: + skill_dir = d / "duplicate-skill" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text( + f"""--- +name: duplicate-skill +description: {desc} skill +--- +Content""" + ) + + skills = discover_skills([dir1, dir2]) + assert len(skills) == 1 + assert skills[0].metadata.description == "First skill" + + def test_discover_skills_nonexistent_directory(self): + """Test that nonexistent directories are handled gracefully.""" + skills = discover_skills([Path("/nonexistent/path")]) + assert skills == [] + + +class TestSkillComponent: + """Tests for the SkillComponent.""" + + def test_component_initialization(self): + """Test component initializes correctly.""" + with TemporaryDirectory() as tmp_dir: + config = SkillConfiguration( + skill_directories=[Path(tmp_dir)], + max_loaded_skills=3, + ) + component = SkillComponent(config) + assert component._available_skills == {} + assert component._loaded_skills == {} + + def test_list_skills_command(self): + """Test the list_skills command.""" + with TemporaryDirectory() as tmp_dir: + skill_dir = Path(tmp_dir) / "test-skill" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text( + """--- +name: test-skill +description: A test skill +author: Test Author +--- +Content""" + ) + + config = SkillConfiguration(skill_directories=[Path(tmp_dir)]) + component = SkillComponent(config) + + result = component.list_skills() + assert "test-skill" in result + assert "A test skill" in result + assert "Test Author" in result + + def test_load_skill_command(self): + """Test the load_skill command.""" + with TemporaryDirectory() as tmp_dir: + skill_dir = Path(tmp_dir) / "loadable" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text( + """--- +name: loadable +description: A loadable skill +--- +# Instructions + +Use this skill for testing.""" + ) + + config = SkillConfiguration(skill_directories=[Path(tmp_dir)]) + component = SkillComponent(config) + + result = component.load_skill("loadable") + assert "loaded successfully" in result + assert "loadable" in component._loaded_skills + + def test_load_skill_max_limit(self): + """Test that max_loaded_skills limit is enforced.""" + with TemporaryDirectory() as tmp_dir: + for i in range(3): + skill_dir = Path(tmp_dir) / f"skill-{i}" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text( + f"""--- +name: skill-{i} +description: Skill {i} +--- +Content""" + ) + + config = SkillConfiguration( + skill_directories=[Path(tmp_dir)], + max_loaded_skills=2, + ) + component = SkillComponent(config) + + component.load_skill("skill-0") + component.load_skill("skill-1") + result = component.load_skill("skill-2") + + assert "Cannot load skill" in result + assert "maximum" in result + assert len(component._loaded_skills) == 2 + + def test_unload_skill_command(self): + """Test the unload_skill command.""" + with TemporaryDirectory() as tmp_dir: + skill_dir = Path(tmp_dir) / "unloadable" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text( + """--- +name: unloadable +description: Test +--- +Content""" + ) + + config = SkillConfiguration(skill_directories=[Path(tmp_dir)]) + component = SkillComponent(config) + + component.load_skill("unloadable") + assert "unloadable" in component._loaded_skills + + result = component.unload_skill("unloadable") + assert "unloaded" in result + assert "unloadable" not in component._loaded_skills + + def test_read_skill_file_command(self): + """Test the read_skill_file command.""" + with TemporaryDirectory() as tmp_dir: + skill_dir = Path(tmp_dir) / "with-files" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text( + """--- +name: with-files +description: Has extra files +--- +Main content""" + ) + (skill_dir / "extra.md").write_text("Extra file content") + + config = SkillConfiguration(skill_directories=[Path(tmp_dir)]) + component = SkillComponent(config) + + # Must load skill first + result = component.read_skill_file("with-files", "extra.md") + assert "must be loaded first" in result + + component.load_skill("with-files") + result = component.read_skill_file("with-files", "extra.md") + assert "Extra file content" in result + + def test_get_messages_includes_catalog(self): + """Test that get_messages includes skill catalog.""" + with TemporaryDirectory() as tmp_dir: + skill_dir = Path(tmp_dir) / "catalog-test" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text( + """--- +name: catalog-test +description: Test skill +--- +Content""" + ) + + config = SkillConfiguration(skill_directories=[Path(tmp_dir)]) + component = SkillComponent(config) + + messages = list(component.get_messages()) + assert len(messages) >= 1 + assert "Available Skills" in messages[0].content + assert "catalog-test" in messages[0].content + + def test_get_messages_includes_loaded_content(self): + """Test that get_messages includes loaded skill content.""" + with TemporaryDirectory() as tmp_dir: + skill_dir = Path(tmp_dir) / "content-test" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text( + """--- +name: content-test +description: Test skill +--- +# Skill Instructions + +Use this for testing purposes.""" + ) + + config = SkillConfiguration(skill_directories=[Path(tmp_dir)]) + component = SkillComponent(config) + component.load_skill("content-test") + + messages = list(component.get_messages()) + # First message is catalog, second should be loaded skill content + assert len(messages) >= 2 + full_content = "\n".join(m.content for m in messages) + assert "Skill Instructions" in full_content + + def test_get_commands(self): + """Test that get_commands returns appropriate commands.""" + with TemporaryDirectory() as tmp_dir: + skill_dir = Path(tmp_dir) / "cmd-test" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text( + """--- +name: cmd-test +description: Test +--- +Content""" + ) + + config = SkillConfiguration(skill_directories=[Path(tmp_dir)]) + component = SkillComponent(config) + + commands = list(component.get_commands()) + cmd_names = [c.names[0] for c in commands] + + assert "list_skills" in cmd_names + assert "load_skill" in cmd_names + # unload_skill and read_skill_file only when skills are loaded + assert "unload_skill" not in cmd_names + + component.load_skill("cmd-test") + commands = list(component.get_commands()) + cmd_names = [c.names[0] for c in commands] + assert "unload_skill" in cmd_names + assert "read_skill_file" in cmd_names + + def test_get_resources(self): + """Test that get_resources returns skill resource info.""" + with TemporaryDirectory() as tmp_dir: + skill_dir = Path(tmp_dir) / "resource-test" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text( + """--- +name: resource-test +description: Test +--- +Content""" + ) + + config = SkillConfiguration(skill_directories=[Path(tmp_dir)]) + component = SkillComponent(config) + + resources = list(component.get_resources()) + assert len(resources) >= 1 + assert any("skill" in r.lower() for r in resources) + + def test_refresh_skills(self): + """Test the refresh_skills method.""" + with TemporaryDirectory() as tmp_dir: + config = SkillConfiguration(skill_directories=[Path(tmp_dir)]) + component = SkillComponent(config) + + # Initially empty + assert len(component._available_skills) == 0 + + # Add a skill + skill_dir = Path(tmp_dir) / "new-skill" + skill_dir.mkdir() + (skill_dir / "SKILL.md").write_text( + """--- +name: new-skill +description: New +--- +Content""" + ) + + # Refresh + component.refresh_skills() + assert len(component._available_skills) == 1 + assert "new-skill" in component._available_skills diff --git a/classic/original_autogpt/autogpt/agents/agent.py b/classic/original_autogpt/autogpt/agents/agent.py index 73b7f07e69..03f944501b 100644 --- a/classic/original_autogpt/autogpt/agents/agent.py +++ b/classic/original_autogpt/autogpt/agents/agent.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio import inspect import logging +from pathlib import Path from typing import TYPE_CHECKING, Any, ClassVar, Optional import sentry_sdk @@ -37,6 +38,7 @@ from forge.components.http_client import HTTPClientComponent from forge.components.image_gen import ImageGeneratorComponent from forge.components.math_utils import MathUtilsComponent from forge.components.platform_blocks import PlatformBlocksComponent +from forge.components.skills import SkillComponent, SkillConfiguration from forge.components.system import SystemComponent from forge.components.text_utils import TextUtilsComponent from forge.components.todo import TodoComponent @@ -215,6 +217,16 @@ class Agent(BaseAgent[AnyActionProposal], Configurable[AgentSettings]): # Platform blocks (enabled only if PLATFORM_API_KEY is set) self.platform_blocks = PlatformBlocksComponent() + # Skills (SKILL.md support) + self.skills = SkillComponent( + SkillConfiguration( + skill_directories=[ + app_config.workspace / ".autogpt/skills", + Path.home() / ".autogpt/skills", + ] + ) + ) + self.event_history = settings.history def _create_root_execution_context(