mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-11 23:35:25 -05:00
feat(classic): add Agent Skills (SKILL.md) support
Implement the open Agent Skills standard for Classic AutoGPT, enabling modular, progressively-loaded capabilities via SKILL.md files. Skills are discovered from workspace (.autogpt/skills) and global (~/.autogpt/skills) directories with three-level progressive disclosure to minimize token usage. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
15
classic/forge/forge/components/skills/__init__.py
Normal file
15
classic/forge/forge/components/skills/__init__.py
Normal file
@@ -0,0 +1,15 @@
|
||||
"""Agent Skills (SKILL.md) support for Classic AutoGPT."""
|
||||
|
||||
from .skill_component import SkillComponent
|
||||
from .skill_model import Skill, SkillConfiguration, SkillLoadLevel, SkillMetadata
|
||||
from .skill_parser import SkillParseError, discover_skills
|
||||
|
||||
__all__ = [
|
||||
"SkillComponent",
|
||||
"SkillConfiguration",
|
||||
"Skill",
|
||||
"SkillLoadLevel",
|
||||
"SkillMetadata",
|
||||
"SkillParseError",
|
||||
"discover_skills",
|
||||
]
|
||||
322
classic/forge/forge/components/skills/skill_component.py
Normal file
322
classic/forge/forge/components/skills/skill_component.py
Normal file
@@ -0,0 +1,322 @@
|
||||
"""
|
||||
SkillComponent - Provides Agent Skills (SKILL.md) support for Classic AutoGPT.
|
||||
|
||||
This component implements the open Agent Skills standard, enabling modular,
|
||||
progressively-loaded capabilities via markdown-based skill files.
|
||||
|
||||
See: https://platform.claude.com/docs/en/agents-and-tools/agent-skills/overview
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Iterator, Optional
|
||||
|
||||
from forge.agent.components import ConfigurableComponent
|
||||
from forge.agent.protocols import CommandProvider, DirectiveProvider, MessageProvider
|
||||
from forge.command import Command, command
|
||||
from forge.llm.providers import ChatMessage
|
||||
from forge.models.json_schema import JSONSchema
|
||||
|
||||
from .skill_model import Skill, SkillConfiguration, SkillLoadLevel
|
||||
from .skill_parser import (
|
||||
SkillParseError,
|
||||
discover_skills,
|
||||
load_skill_content,
|
||||
load_skill_file,
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SkillComponent(
|
||||
DirectiveProvider,
|
||||
MessageProvider,
|
||||
CommandProvider,
|
||||
ConfigurableComponent[SkillConfiguration],
|
||||
):
|
||||
"""
|
||||
Component that provides Agent Skills support.
|
||||
|
||||
Skills are modular capabilities defined by SKILL.md files. They use
|
||||
progressive disclosure to minimize token usage:
|
||||
- Level 1: Metadata always loaded (~100 tokens/skill)
|
||||
- Level 2: Full SKILL.md loaded when triggered (~500-5000 tokens)
|
||||
- Level 3: Additional files loaded on demand
|
||||
"""
|
||||
|
||||
config_class = SkillConfiguration
|
||||
|
||||
def __init__(self, config: Optional[SkillConfiguration] = None):
|
||||
ConfigurableComponent.__init__(self, config)
|
||||
# All discovered skills (Level 1 - metadata only)
|
||||
self._available_skills: dict[str, Skill] = {}
|
||||
# Skills with full content loaded (Level 2+)
|
||||
self._loaded_skills: dict[str, Skill] = {}
|
||||
# Discover skills on initialization
|
||||
self._discover_skills()
|
||||
|
||||
def _discover_skills(self) -> None:
|
||||
"""Discover all skills in configured directories."""
|
||||
skills = discover_skills(self.config.skill_directories)
|
||||
self._available_skills = {skill.metadata.name: skill for skill in skills}
|
||||
logger.info(
|
||||
f"Discovered {len(self._available_skills)} skills: "
|
||||
f"{list(self._available_skills.keys())}"
|
||||
)
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# DirectiveProvider methods
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
def get_resources(self) -> Iterator[str]:
|
||||
if self._available_skills:
|
||||
yield (
|
||||
"You have access to modular skills that provide specialized "
|
||||
"capabilities. Use `list_skills` to see available skills, "
|
||||
"and `load_skill` to activate one when needed."
|
||||
)
|
||||
|
||||
def get_best_practices(self) -> Iterator[str]:
|
||||
if self._available_skills:
|
||||
yield (
|
||||
"Only load skills when you actually need their capabilities. "
|
||||
"Unload skills when you're done to conserve context space."
|
||||
)
|
||||
yield (
|
||||
"Before implementing complex functionality, check if a skill "
|
||||
"already provides the capability you need."
|
||||
)
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# MessageProvider methods
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
def get_messages(self) -> Iterator[ChatMessage]:
|
||||
# Always provide skill catalog if skills are available
|
||||
if self._available_skills:
|
||||
catalog_lines = ["## Available Skills"]
|
||||
for name, skill in self._available_skills.items():
|
||||
loaded_marker = " [LOADED]" if name in self._loaded_skills else ""
|
||||
catalog_lines.append(
|
||||
f"- **{name}**{loaded_marker}: {skill.metadata.description}"
|
||||
)
|
||||
yield ChatMessage.system("\n".join(catalog_lines))
|
||||
|
||||
# Provide loaded skill content
|
||||
for name, skill in self._loaded_skills.items():
|
||||
if skill.load_level >= SkillLoadLevel.FULL_CONTENT and skill.content:
|
||||
skill_content = [f"## Skill: {name}"]
|
||||
skill_content.append("")
|
||||
skill_content.append(skill.content)
|
||||
|
||||
# Show available additional files
|
||||
additional_files = skill.list_additional_files()
|
||||
if additional_files:
|
||||
skill_content.append("")
|
||||
skill_content.append("### Additional Files Available")
|
||||
for f in additional_files:
|
||||
loaded = " [loaded]" if f in skill.additional_files else ""
|
||||
skill_content.append(f"- `{f}`{loaded}")
|
||||
|
||||
yield ChatMessage.system("\n".join(skill_content))
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# CommandProvider methods
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
def get_commands(self) -> Iterator[Command]:
|
||||
if self._available_skills:
|
||||
yield self.list_skills
|
||||
yield self.load_skill
|
||||
if self._loaded_skills:
|
||||
yield self.unload_skill
|
||||
yield self.read_skill_file
|
||||
|
||||
@command(
|
||||
names=["list_skills"],
|
||||
description="List all available skills with their descriptions",
|
||||
parameters={},
|
||||
)
|
||||
def list_skills(self) -> str:
|
||||
"""List all available skills with their metadata.
|
||||
|
||||
Returns:
|
||||
str: Formatted list of available skills
|
||||
"""
|
||||
if not self._available_skills:
|
||||
return "No skills available. Skills can be added to .autogpt/skills/"
|
||||
|
||||
lines = ["Available Skills:", ""]
|
||||
for name, skill in self._available_skills.items():
|
||||
loaded = " [LOADED]" if name in self._loaded_skills else ""
|
||||
lines.append(f"**{name}**{loaded}")
|
||||
lines.append(f" Description: {skill.metadata.description}")
|
||||
if skill.metadata.author:
|
||||
lines.append(f" Author: {skill.metadata.author}")
|
||||
if skill.metadata.version:
|
||||
lines.append(f" Version: {skill.metadata.version}")
|
||||
if skill.metadata.tags:
|
||||
lines.append(f" Tags: {', '.join(skill.metadata.tags)}")
|
||||
lines.append("")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
@command(
|
||||
names=["load_skill"],
|
||||
description="Load a skill's full content to use its capabilities",
|
||||
parameters={
|
||||
"skill_name": JSONSchema(
|
||||
type=JSONSchema.Type.STRING,
|
||||
description="The name of the skill to load",
|
||||
required=True,
|
||||
)
|
||||
},
|
||||
)
|
||||
def load_skill(self, skill_name: str) -> str:
|
||||
"""Load a skill's full content (Level 2).
|
||||
|
||||
Args:
|
||||
skill_name: The name of the skill to load
|
||||
|
||||
Returns:
|
||||
str: Status message indicating success or failure
|
||||
"""
|
||||
if skill_name not in self._available_skills:
|
||||
available = ", ".join(self._available_skills.keys())
|
||||
return f"Skill '{skill_name}' not found. Available skills: {available}"
|
||||
|
||||
if skill_name in self._loaded_skills:
|
||||
return f"Skill '{skill_name}' is already loaded."
|
||||
|
||||
# Check if we've hit the max loaded skills limit
|
||||
if len(self._loaded_skills) >= self.config.max_loaded_skills:
|
||||
loaded = ", ".join(self._loaded_skills.keys())
|
||||
return (
|
||||
f"Cannot load skill '{skill_name}': maximum of "
|
||||
f"{self.config.max_loaded_skills} skills already loaded ({loaded}). "
|
||||
f"Unload a skill first using `unload_skill`."
|
||||
)
|
||||
|
||||
skill = self._available_skills[skill_name]
|
||||
try:
|
||||
loaded_skill = load_skill_content(skill)
|
||||
self._loaded_skills[skill_name] = loaded_skill
|
||||
self._available_skills[skill_name] = loaded_skill
|
||||
|
||||
return (
|
||||
f"Skill '{skill_name}' loaded successfully. "
|
||||
f"Its instructions are now available in the context."
|
||||
)
|
||||
except SkillParseError as e:
|
||||
logger.error(f"Failed to load skill '{skill_name}': {e}")
|
||||
return f"Failed to load skill '{skill_name}': {e}"
|
||||
|
||||
@command(
|
||||
names=["unload_skill"],
|
||||
description="Unload a skill to free up context space",
|
||||
parameters={
|
||||
"skill_name": JSONSchema(
|
||||
type=JSONSchema.Type.STRING,
|
||||
description="The name of the skill to unload",
|
||||
required=True,
|
||||
)
|
||||
},
|
||||
)
|
||||
def unload_skill(self, skill_name: str) -> str:
|
||||
"""Unload a skill to free context space.
|
||||
|
||||
Args:
|
||||
skill_name: The name of the skill to unload
|
||||
|
||||
Returns:
|
||||
str: Status message indicating success or failure
|
||||
"""
|
||||
if skill_name not in self._loaded_skills:
|
||||
if skill_name in self._available_skills:
|
||||
return f"Skill '{skill_name}' is not currently loaded."
|
||||
return f"Skill '{skill_name}' not found."
|
||||
|
||||
del self._loaded_skills[skill_name]
|
||||
|
||||
# Reset the skill to metadata-only state
|
||||
skill = self._available_skills[skill_name]
|
||||
skill.content = None
|
||||
skill.additional_files.clear()
|
||||
skill.load_level = SkillLoadLevel.METADATA
|
||||
|
||||
return f"Skill '{skill_name}' has been unloaded."
|
||||
|
||||
@command(
|
||||
names=["read_skill_file"],
|
||||
description="Read an additional file from a loaded skill",
|
||||
parameters={
|
||||
"skill_name": JSONSchema(
|
||||
type=JSONSchema.Type.STRING,
|
||||
description="The name of the skill",
|
||||
required=True,
|
||||
),
|
||||
"filename": JSONSchema(
|
||||
type=JSONSchema.Type.STRING,
|
||||
description="The name of the file to read",
|
||||
required=True,
|
||||
),
|
||||
},
|
||||
)
|
||||
def read_skill_file(self, skill_name: str, filename: str) -> str:
|
||||
"""Read an additional file from a skill (Level 3).
|
||||
|
||||
Args:
|
||||
skill_name: The name of the skill
|
||||
filename: The name of the file to read
|
||||
|
||||
Returns:
|
||||
str: The content of the file or an error message
|
||||
"""
|
||||
if skill_name not in self._loaded_skills:
|
||||
if skill_name in self._available_skills:
|
||||
return (
|
||||
f"Skill '{skill_name}' must be loaded first. "
|
||||
f'Use `load_skill("{skill_name}")` to load it.'
|
||||
)
|
||||
return f"Skill '{skill_name}' not found."
|
||||
|
||||
skill = self._loaded_skills[skill_name]
|
||||
|
||||
# Check if already loaded
|
||||
if filename in skill.additional_files:
|
||||
return skill.additional_files[filename]
|
||||
|
||||
# List available files for better error messages
|
||||
available_files = skill.list_additional_files()
|
||||
if filename not in available_files:
|
||||
if available_files:
|
||||
return (
|
||||
f"File '{filename}' not found in skill '{skill_name}'. "
|
||||
f"Available files: {', '.join(available_files)}"
|
||||
)
|
||||
return f"Skill '{skill_name}' has no additional files."
|
||||
|
||||
try:
|
||||
content = load_skill_file(skill, filename)
|
||||
return content
|
||||
except (FileNotFoundError, ValueError, SkillParseError) as e:
|
||||
logger.error(f"Failed to read file '{filename}' from skill: {e}")
|
||||
return f"Failed to read file '{filename}': {e}"
|
||||
|
||||
# -------------------------------------------------------------------------
|
||||
# Public API
|
||||
# -------------------------------------------------------------------------
|
||||
|
||||
def refresh_skills(self) -> None:
|
||||
"""Re-discover skills from configured directories."""
|
||||
self._loaded_skills.clear()
|
||||
self._discover_skills()
|
||||
|
||||
@property
|
||||
def available_skills(self) -> dict[str, Skill]:
|
||||
"""Get all discovered skills."""
|
||||
return self._available_skills.copy()
|
||||
|
||||
@property
|
||||
def loaded_skills(self) -> dict[str, Skill]:
|
||||
"""Get currently loaded skills."""
|
||||
return self._loaded_skills.copy()
|
||||
124
classic/forge/forge/components/skills/skill_model.py
Normal file
124
classic/forge/forge/components/skills/skill_model.py
Normal file
@@ -0,0 +1,124 @@
|
||||
"""Data models for Agent Skills (SKILL.md) support."""
|
||||
|
||||
import re
|
||||
from enum import IntEnum
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field, field_validator
|
||||
|
||||
|
||||
class SkillLoadLevel(IntEnum):
|
||||
"""Progressive disclosure levels for skill loading."""
|
||||
|
||||
METADATA = 1 # Always loaded (~100 tokens/skill)
|
||||
FULL_CONTENT = 2 # Loaded when triggered (~500-5000 tokens)
|
||||
ADDITIONAL = 3 # On-demand files
|
||||
|
||||
|
||||
class SkillMetadata(BaseModel):
|
||||
"""Metadata parsed from SKILL.md frontmatter."""
|
||||
|
||||
model_config = ConfigDict(populate_by_name=True)
|
||||
|
||||
name: str = Field(
|
||||
...,
|
||||
description="Skill name (lowercase, alphanumeric, hyphens, max 64 chars)",
|
||||
max_length=64,
|
||||
)
|
||||
description: str = Field(
|
||||
...,
|
||||
description="Skill description (max 1024 chars)",
|
||||
max_length=1024,
|
||||
)
|
||||
license: Optional[str] = Field(
|
||||
default=None,
|
||||
description="License for the skill",
|
||||
)
|
||||
allowed_tools: Optional[list[str]] = Field(
|
||||
default=None,
|
||||
alias="allowed-tools",
|
||||
description="List of tools this skill is allowed to use",
|
||||
)
|
||||
author: Optional[str] = Field(
|
||||
default=None,
|
||||
description="Skill author",
|
||||
)
|
||||
version: Optional[str] = Field(
|
||||
default=None,
|
||||
description="Skill version",
|
||||
)
|
||||
tags: Optional[list[str]] = Field(
|
||||
default=None,
|
||||
description="Tags for categorizing the skill",
|
||||
)
|
||||
|
||||
@field_validator("name")
|
||||
@classmethod
|
||||
def validate_name(cls, v: str) -> str:
|
||||
"""Validate skill name follows spec: lowercase, alphanumeric, hyphens."""
|
||||
if not re.match(r"^[a-z0-9-]+$", v):
|
||||
raise ValueError(
|
||||
"Skill name must be lowercase, alphanumeric, "
|
||||
"and may contain hyphens only"
|
||||
)
|
||||
return v
|
||||
|
||||
|
||||
class Skill(BaseModel):
|
||||
"""Represents a loaded skill with its metadata and content."""
|
||||
|
||||
model_config = ConfigDict(arbitrary_types_allowed=True)
|
||||
|
||||
path: Path = Field(
|
||||
...,
|
||||
description="Path to the skill directory",
|
||||
)
|
||||
metadata: SkillMetadata = Field(
|
||||
...,
|
||||
description="Parsed skill metadata from frontmatter",
|
||||
)
|
||||
content: Optional[str] = Field(
|
||||
default=None,
|
||||
description="Full SKILL.md content (body, excluding frontmatter)",
|
||||
)
|
||||
additional_files: dict[str, str] = Field(
|
||||
default_factory=dict,
|
||||
description="Additional files loaded on demand",
|
||||
)
|
||||
load_level: SkillLoadLevel = Field(
|
||||
default=SkillLoadLevel.METADATA,
|
||||
description="Current load level of the skill",
|
||||
)
|
||||
|
||||
@property
|
||||
def skill_md_path(self) -> Path:
|
||||
"""Path to the SKILL.md file."""
|
||||
return self.path / "SKILL.md"
|
||||
|
||||
def list_additional_files(self) -> list[str]:
|
||||
"""List available additional files in the skill directory."""
|
||||
files = []
|
||||
if self.path.exists():
|
||||
for item in self.path.iterdir():
|
||||
if item.is_file() and item.name != "SKILL.md":
|
||||
files.append(item.name)
|
||||
return files
|
||||
|
||||
|
||||
class SkillConfiguration(BaseModel):
|
||||
"""Configuration for the SkillComponent."""
|
||||
|
||||
skill_directories: list[Path] = Field(
|
||||
default_factory=lambda: [
|
||||
Path(".autogpt/skills"),
|
||||
Path.home() / ".autogpt/skills",
|
||||
],
|
||||
description="Directories to search for skills",
|
||||
)
|
||||
max_loaded_skills: int = Field(
|
||||
default=5,
|
||||
description="Maximum number of skills that can be fully loaded at once",
|
||||
ge=1,
|
||||
le=20,
|
||||
)
|
||||
234
classic/forge/forge/components/skills/skill_parser.py
Normal file
234
classic/forge/forge/components/skills/skill_parser.py
Normal file
@@ -0,0 +1,234 @@
|
||||
"""Parsing utilities for SKILL.md files."""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
import yaml
|
||||
from pydantic import ValidationError
|
||||
|
||||
from .skill_model import Skill, SkillLoadLevel, SkillMetadata
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SkillParseError(Exception):
|
||||
"""Raised when a SKILL.md file cannot be parsed."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
def _extract_frontmatter(content: str) -> tuple[Optional[str], str]:
|
||||
"""
|
||||
Extract YAML frontmatter from markdown content.
|
||||
|
||||
Args:
|
||||
content: Full markdown content with optional frontmatter
|
||||
|
||||
Returns:
|
||||
Tuple of (frontmatter_yaml, body_content).
|
||||
frontmatter_yaml is None if no frontmatter found.
|
||||
"""
|
||||
# Match frontmatter: starts with ---, ends with ---
|
||||
pattern = r"^---\s*\n(.*?)\n---\s*\n?(.*)"
|
||||
match = re.match(pattern, content, re.DOTALL)
|
||||
|
||||
if match:
|
||||
return match.group(1), match.group(2)
|
||||
return None, content
|
||||
|
||||
|
||||
def parse_skill_md(skill_path: Path) -> Skill:
|
||||
"""
|
||||
Parse a SKILL.md file and return a Skill with metadata only (Level 1).
|
||||
|
||||
Args:
|
||||
skill_path: Path to the skill directory containing SKILL.md
|
||||
|
||||
Returns:
|
||||
Skill object with metadata loaded
|
||||
|
||||
Raises:
|
||||
SkillParseError: If the SKILL.md file cannot be parsed
|
||||
FileNotFoundError: If SKILL.md doesn't exist
|
||||
"""
|
||||
skill_md_file = skill_path / "SKILL.md"
|
||||
|
||||
if not skill_md_file.exists():
|
||||
raise FileNotFoundError(f"SKILL.md not found at {skill_md_file}")
|
||||
|
||||
try:
|
||||
content = skill_md_file.read_text(encoding="utf-8")
|
||||
except Exception as e:
|
||||
raise SkillParseError(f"Failed to read SKILL.md: {e}") from e
|
||||
|
||||
frontmatter_yaml, body = _extract_frontmatter(content)
|
||||
|
||||
if frontmatter_yaml is None:
|
||||
raise SkillParseError(
|
||||
f"SKILL.md at {skill_path} missing required YAML frontmatter"
|
||||
)
|
||||
|
||||
try:
|
||||
frontmatter_data = yaml.safe_load(frontmatter_yaml)
|
||||
except yaml.YAMLError as e:
|
||||
raise SkillParseError(f"Invalid YAML frontmatter in {skill_path}: {e}") from e
|
||||
|
||||
if not isinstance(frontmatter_data, dict):
|
||||
raise SkillParseError(
|
||||
f"YAML frontmatter in {skill_path} must be a mapping, "
|
||||
f"got {type(frontmatter_data).__name__}"
|
||||
)
|
||||
|
||||
# Handle nested metadata field if present
|
||||
if "metadata" in frontmatter_data:
|
||||
metadata_section = frontmatter_data.pop("metadata")
|
||||
if isinstance(metadata_section, dict):
|
||||
# Merge metadata fields into top level (author, version, tags)
|
||||
for key in ["author", "version", "tags"]:
|
||||
if key in metadata_section and key not in frontmatter_data:
|
||||
frontmatter_data[key] = metadata_section[key]
|
||||
|
||||
try:
|
||||
metadata = SkillMetadata(**frontmatter_data)
|
||||
except ValidationError as e:
|
||||
raise SkillParseError(
|
||||
f"Invalid metadata in SKILL.md at {skill_path}: {e}"
|
||||
) from e
|
||||
|
||||
return Skill(
|
||||
path=skill_path,
|
||||
metadata=metadata,
|
||||
content=None, # Content not loaded at Level 1
|
||||
load_level=SkillLoadLevel.METADATA,
|
||||
)
|
||||
|
||||
|
||||
def load_skill_content(skill: Skill) -> Skill:
|
||||
"""
|
||||
Load the full content of a skill's SKILL.md (Level 2).
|
||||
|
||||
Args:
|
||||
skill: Skill object with metadata loaded
|
||||
|
||||
Returns:
|
||||
Updated Skill object with full content loaded
|
||||
|
||||
Raises:
|
||||
SkillParseError: If content cannot be loaded
|
||||
"""
|
||||
if skill.load_level == SkillLoadLevel.FULL_CONTENT:
|
||||
return skill # Already loaded
|
||||
|
||||
skill_md_file = skill.path / "SKILL.md"
|
||||
|
||||
try:
|
||||
content = skill_md_file.read_text(encoding="utf-8")
|
||||
except Exception as e:
|
||||
raise SkillParseError(f"Failed to read SKILL.md: {e}") from e
|
||||
|
||||
_, body = _extract_frontmatter(content)
|
||||
|
||||
skill.content = body.strip()
|
||||
skill.load_level = SkillLoadLevel.FULL_CONTENT
|
||||
|
||||
return skill
|
||||
|
||||
|
||||
def load_skill_file(skill: Skill, filename: str) -> str:
|
||||
"""
|
||||
Load an additional file from a skill directory (Level 3).
|
||||
|
||||
Args:
|
||||
skill: Skill object
|
||||
filename: Name of the file to load
|
||||
|
||||
Returns:
|
||||
Content of the file
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If file doesn't exist
|
||||
SkillParseError: If file cannot be read
|
||||
ValueError: If filename is invalid (attempts path traversal)
|
||||
"""
|
||||
# Security: prevent path traversal
|
||||
if ".." in filename or filename.startswith("/"):
|
||||
raise ValueError(f"Invalid filename: {filename}")
|
||||
|
||||
file_path = skill.path / filename
|
||||
|
||||
# Verify the resolved path is still within the skill directory
|
||||
try:
|
||||
file_path.resolve().relative_to(skill.path.resolve())
|
||||
except ValueError:
|
||||
raise ValueError(f"Invalid filename: {filename}")
|
||||
|
||||
if not file_path.exists():
|
||||
raise FileNotFoundError(f"File not found: {filename}")
|
||||
|
||||
if not file_path.is_file():
|
||||
raise ValueError(f"Not a file: {filename}")
|
||||
|
||||
try:
|
||||
content = file_path.read_text(encoding="utf-8")
|
||||
except Exception as e:
|
||||
raise SkillParseError(f"Failed to read {filename}: {e}") from e
|
||||
|
||||
# Cache the loaded file
|
||||
skill.additional_files[filename] = content
|
||||
skill.load_level = SkillLoadLevel.ADDITIONAL
|
||||
|
||||
return content
|
||||
|
||||
|
||||
def discover_skills(directories: list[Path]) -> list[Skill]:
|
||||
"""
|
||||
Discover all skills in the given directories.
|
||||
|
||||
Args:
|
||||
directories: List of directories to search for skills
|
||||
|
||||
Returns:
|
||||
List of Skill objects with metadata loaded (Level 1)
|
||||
"""
|
||||
skills: list[Skill] = []
|
||||
seen_names: set[str] = set()
|
||||
|
||||
for directory in directories:
|
||||
if not directory.exists():
|
||||
logger.debug(f"Skill directory does not exist: {directory}")
|
||||
continue
|
||||
|
||||
if not directory.is_dir():
|
||||
logger.warning(f"Skill path is not a directory: {directory}")
|
||||
continue
|
||||
|
||||
for item in directory.iterdir():
|
||||
if not item.is_dir():
|
||||
continue
|
||||
|
||||
skill_md = item / "SKILL.md"
|
||||
if not skill_md.exists():
|
||||
continue
|
||||
|
||||
try:
|
||||
skill = parse_skill_md(item)
|
||||
|
||||
# Skip duplicates (first occurrence wins)
|
||||
if skill.metadata.name in seen_names:
|
||||
logger.warning(
|
||||
f"Duplicate skill name '{skill.metadata.name}' "
|
||||
f"found at {item}, skipping"
|
||||
)
|
||||
continue
|
||||
|
||||
seen_names.add(skill.metadata.name)
|
||||
skills.append(skill)
|
||||
logger.debug(f"Discovered skill: {skill.metadata.name} at {item}")
|
||||
|
||||
except (SkillParseError, FileNotFoundError) as e:
|
||||
logger.warning(f"Failed to parse skill at {item}: {e}")
|
||||
continue
|
||||
|
||||
return skills
|
||||
656
classic/forge/forge/components/skills/test_skills.py
Normal file
656
classic/forge/forge/components/skills/test_skills.py
Normal file
@@ -0,0 +1,656 @@
|
||||
"""Tests for Agent Skills (SKILL.md) support."""
|
||||
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
import pytest
|
||||
|
||||
from .skill_component import SkillComponent
|
||||
from .skill_model import SkillConfiguration, SkillLoadLevel, SkillMetadata
|
||||
from .skill_parser import (
|
||||
SkillParseError,
|
||||
_extract_frontmatter,
|
||||
discover_skills,
|
||||
load_skill_content,
|
||||
load_skill_file,
|
||||
parse_skill_md,
|
||||
)
|
||||
|
||||
|
||||
class TestSkillMetadata:
|
||||
"""Tests for SkillMetadata model validation."""
|
||||
|
||||
def test_valid_metadata(self):
|
||||
"""Test creating valid metadata."""
|
||||
meta = SkillMetadata(
|
||||
name="test-skill",
|
||||
description="A test skill",
|
||||
author="Test Author",
|
||||
version="1.0.0",
|
||||
tags=["test", "example"],
|
||||
)
|
||||
assert meta.name == "test-skill"
|
||||
assert meta.description == "A test skill"
|
||||
|
||||
def test_name_validation_lowercase(self):
|
||||
"""Test that name must be lowercase."""
|
||||
with pytest.raises(ValueError, match="lowercase"):
|
||||
SkillMetadata(name="Test-Skill", description="A test")
|
||||
|
||||
def test_name_validation_no_spaces(self):
|
||||
"""Test that name cannot contain spaces."""
|
||||
with pytest.raises(ValueError, match="lowercase"):
|
||||
SkillMetadata(name="test skill", description="A test")
|
||||
|
||||
def test_name_validation_alphanumeric_hyphens(self):
|
||||
"""Test that name can only have alphanumeric and hyphens."""
|
||||
with pytest.raises(ValueError, match="alphanumeric"):
|
||||
SkillMetadata(name="test_skill", description="A test")
|
||||
|
||||
def test_name_max_length(self):
|
||||
"""Test that name cannot exceed 64 characters."""
|
||||
with pytest.raises(ValueError):
|
||||
SkillMetadata(name="a" * 65, description="A test")
|
||||
|
||||
def test_description_max_length(self):
|
||||
"""Test that description cannot exceed 1024 characters."""
|
||||
with pytest.raises(ValueError):
|
||||
SkillMetadata(name="test", description="a" * 1025)
|
||||
|
||||
def test_allowed_tools_alias(self):
|
||||
"""Test that allowed-tools YAML field maps to allowed_tools."""
|
||||
# Simulate what pydantic does when parsing YAML with the alias field name
|
||||
data = {
|
||||
"name": "test",
|
||||
"description": "test",
|
||||
"allowed-tools": ["tool1", "tool2"],
|
||||
}
|
||||
meta = SkillMetadata.model_validate(data)
|
||||
assert meta.allowed_tools == ["tool1", "tool2"]
|
||||
|
||||
|
||||
class TestFrontmatterExtraction:
|
||||
"""Tests for YAML frontmatter extraction."""
|
||||
|
||||
def test_extract_valid_frontmatter(self):
|
||||
"""Test extracting valid frontmatter."""
|
||||
content = """---
|
||||
name: test-skill
|
||||
description: A test skill
|
||||
---
|
||||
# Skill Content
|
||||
Here is the body."""
|
||||
|
||||
yaml_part, body = _extract_frontmatter(content)
|
||||
assert yaml_part is not None
|
||||
assert "name: test-skill" in yaml_part
|
||||
assert body.strip().startswith("# Skill Content")
|
||||
|
||||
def test_no_frontmatter(self):
|
||||
"""Test content without frontmatter."""
|
||||
content = "# Just markdown\nNo frontmatter here."
|
||||
|
||||
yaml_part, body = _extract_frontmatter(content)
|
||||
assert yaml_part is None
|
||||
assert body == content
|
||||
|
||||
def test_incomplete_frontmatter(self):
|
||||
"""Test frontmatter without closing ---."""
|
||||
content = """---
|
||||
name: test
|
||||
# No closing delimiter
|
||||
Body content"""
|
||||
|
||||
yaml_part, _ = _extract_frontmatter(content)
|
||||
# Without the closing delimiter, it's not valid frontmatter
|
||||
assert yaml_part is None
|
||||
|
||||
|
||||
class TestSkillParser:
|
||||
"""Tests for SKILL.md parsing."""
|
||||
|
||||
def test_parse_valid_skill(self):
|
||||
"""Test parsing a valid SKILL.md file."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
skill_dir = Path(tmp_dir) / "test-skill"
|
||||
skill_dir.mkdir()
|
||||
|
||||
skill_md = skill_dir / "SKILL.md"
|
||||
skill_md.write_text(
|
||||
"""---
|
||||
name: test-skill
|
||||
description: A test skill for validation
|
||||
author: Test Author
|
||||
version: 1.0.0
|
||||
tags:
|
||||
- test
|
||||
- example
|
||||
---
|
||||
# Test Skill
|
||||
|
||||
This is the skill content.
|
||||
"""
|
||||
)
|
||||
|
||||
skill = parse_skill_md(skill_dir)
|
||||
assert skill.metadata.name == "test-skill"
|
||||
assert skill.metadata.description == "A test skill for validation"
|
||||
assert skill.metadata.author == "Test Author"
|
||||
assert skill.load_level == SkillLoadLevel.METADATA
|
||||
assert skill.content is None # Not loaded at level 1
|
||||
|
||||
def test_parse_missing_skill_md(self):
|
||||
"""Test parsing when SKILL.md doesn't exist."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
skill_dir = Path(tmp_dir) / "empty-skill"
|
||||
skill_dir.mkdir()
|
||||
|
||||
with pytest.raises(FileNotFoundError):
|
||||
parse_skill_md(skill_dir)
|
||||
|
||||
def test_parse_missing_frontmatter(self):
|
||||
"""Test parsing SKILL.md without frontmatter."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
skill_dir = Path(tmp_dir) / "no-frontmatter"
|
||||
skill_dir.mkdir()
|
||||
|
||||
skill_md = skill_dir / "SKILL.md"
|
||||
skill_md.write_text("# Just content, no frontmatter")
|
||||
|
||||
with pytest.raises(SkillParseError, match="frontmatter"):
|
||||
parse_skill_md(skill_dir)
|
||||
|
||||
def test_parse_invalid_yaml(self):
|
||||
"""Test parsing SKILL.md with invalid YAML."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
skill_dir = Path(tmp_dir) / "bad-yaml"
|
||||
skill_dir.mkdir()
|
||||
|
||||
skill_md = skill_dir / "SKILL.md"
|
||||
skill_md.write_text(
|
||||
"""---
|
||||
name: [invalid: yaml
|
||||
description: broken
|
||||
---
|
||||
Content"""
|
||||
)
|
||||
|
||||
with pytest.raises(SkillParseError, match="Invalid YAML"):
|
||||
parse_skill_md(skill_dir)
|
||||
|
||||
def test_parse_missing_required_fields(self):
|
||||
"""Test parsing SKILL.md with missing required fields."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
skill_dir = Path(tmp_dir) / "missing-fields"
|
||||
skill_dir.mkdir()
|
||||
|
||||
skill_md = skill_dir / "SKILL.md"
|
||||
skill_md.write_text(
|
||||
"""---
|
||||
name: test-skill
|
||||
---
|
||||
Missing description"""
|
||||
)
|
||||
|
||||
with pytest.raises(SkillParseError, match="Invalid metadata"):
|
||||
parse_skill_md(skill_dir)
|
||||
|
||||
def test_parse_nested_metadata(self):
|
||||
"""Test parsing SKILL.md with nested metadata section."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
skill_dir = Path(tmp_dir) / "nested-meta"
|
||||
skill_dir.mkdir()
|
||||
|
||||
skill_md = skill_dir / "SKILL.md"
|
||||
skill_md.write_text(
|
||||
"""---
|
||||
name: nested-skill
|
||||
description: Has nested metadata
|
||||
metadata:
|
||||
author: Nested Author
|
||||
version: 2.0.0
|
||||
---
|
||||
Content"""
|
||||
)
|
||||
|
||||
skill = parse_skill_md(skill_dir)
|
||||
assert skill.metadata.name == "nested-skill"
|
||||
assert skill.metadata.author == "Nested Author"
|
||||
assert skill.metadata.version == "2.0.0"
|
||||
|
||||
|
||||
class TestSkillLoading:
|
||||
"""Tests for progressive skill loading."""
|
||||
|
||||
def test_load_skill_content(self):
|
||||
"""Test loading full skill content (Level 2)."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
skill_dir = Path(tmp_dir) / "load-test"
|
||||
skill_dir.mkdir()
|
||||
|
||||
skill_md = skill_dir / "SKILL.md"
|
||||
skill_md.write_text(
|
||||
"""---
|
||||
name: load-test
|
||||
description: Test loading
|
||||
---
|
||||
# Full Content
|
||||
|
||||
This should be loaded."""
|
||||
)
|
||||
|
||||
skill = parse_skill_md(skill_dir)
|
||||
assert skill.content is None
|
||||
assert skill.load_level == SkillLoadLevel.METADATA
|
||||
|
||||
loaded = load_skill_content(skill)
|
||||
assert loaded.content is not None
|
||||
assert "Full Content" in loaded.content
|
||||
assert loaded.load_level == SkillLoadLevel.FULL_CONTENT
|
||||
|
||||
def test_load_skill_content_idempotent(self):
|
||||
"""Test that loading content twice doesn't change anything."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
skill_dir = Path(tmp_dir) / "idempotent"
|
||||
skill_dir.mkdir()
|
||||
|
||||
skill_md = skill_dir / "SKILL.md"
|
||||
skill_md.write_text(
|
||||
"""---
|
||||
name: idempotent
|
||||
description: Test
|
||||
---
|
||||
Content"""
|
||||
)
|
||||
|
||||
skill = parse_skill_md(skill_dir)
|
||||
loaded1 = load_skill_content(skill)
|
||||
loaded2 = load_skill_content(loaded1)
|
||||
assert loaded1.content == loaded2.content
|
||||
|
||||
def test_load_skill_file(self):
|
||||
"""Test loading additional files (Level 3)."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
skill_dir = Path(tmp_dir) / "extra-files"
|
||||
skill_dir.mkdir()
|
||||
|
||||
skill_md = skill_dir / "SKILL.md"
|
||||
skill_md.write_text(
|
||||
"""---
|
||||
name: extra-files
|
||||
description: Has extra files
|
||||
---
|
||||
Content"""
|
||||
)
|
||||
|
||||
reference_md = skill_dir / "reference.md"
|
||||
reference_md.write_text("# Reference\nExtra content here.")
|
||||
|
||||
skill = parse_skill_md(skill_dir)
|
||||
content = load_skill_file(skill, "reference.md")
|
||||
assert "Reference" in content
|
||||
assert "reference.md" in skill.additional_files
|
||||
|
||||
def test_load_skill_file_path_traversal(self):
|
||||
"""Test that path traversal is prevented."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
skill_dir = Path(tmp_dir) / "traversal-test"
|
||||
skill_dir.mkdir()
|
||||
|
||||
skill_md = skill_dir / "SKILL.md"
|
||||
skill_md.write_text(
|
||||
"""---
|
||||
name: traversal-test
|
||||
description: Test
|
||||
---
|
||||
Content"""
|
||||
)
|
||||
|
||||
skill = parse_skill_md(skill_dir)
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid filename"):
|
||||
load_skill_file(skill, "../../../etc/passwd")
|
||||
|
||||
with pytest.raises(ValueError, match="Invalid filename"):
|
||||
load_skill_file(skill, "/etc/passwd")
|
||||
|
||||
def test_list_additional_files(self):
|
||||
"""Test listing additional files in skill directory."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
skill_dir = Path(tmp_dir) / "list-files"
|
||||
skill_dir.mkdir()
|
||||
|
||||
skill_md = skill_dir / "SKILL.md"
|
||||
skill_md.write_text(
|
||||
"""---
|
||||
name: list-files
|
||||
description: Test
|
||||
---
|
||||
Content"""
|
||||
)
|
||||
|
||||
(skill_dir / "file1.md").write_text("File 1")
|
||||
(skill_dir / "file2.txt").write_text("File 2")
|
||||
|
||||
skill = parse_skill_md(skill_dir)
|
||||
files = skill.list_additional_files()
|
||||
|
||||
assert "file1.md" in files
|
||||
assert "file2.txt" in files
|
||||
assert "SKILL.md" not in files # Excluded
|
||||
|
||||
|
||||
class TestSkillDiscovery:
|
||||
"""Tests for skill discovery."""
|
||||
|
||||
def test_discover_skills(self):
|
||||
"""Test discovering skills in directories."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
skills_dir = Path(tmp_dir) / "skills"
|
||||
skills_dir.mkdir()
|
||||
|
||||
# Create two valid skills
|
||||
for name in ["skill-a", "skill-b"]:
|
||||
skill_dir = skills_dir / name
|
||||
skill_dir.mkdir()
|
||||
(skill_dir / "SKILL.md").write_text(
|
||||
f"""---
|
||||
name: {name}
|
||||
description: Skill {name}
|
||||
---
|
||||
Content for {name}"""
|
||||
)
|
||||
|
||||
# Create a directory without SKILL.md (should be ignored)
|
||||
(skills_dir / "not-a-skill").mkdir()
|
||||
|
||||
skills = discover_skills([skills_dir])
|
||||
names = [s.metadata.name for s in skills]
|
||||
|
||||
assert len(skills) == 2
|
||||
assert "skill-a" in names
|
||||
assert "skill-b" in names
|
||||
|
||||
def test_discover_skills_duplicates(self):
|
||||
"""Test that duplicate skill names are handled (first wins)."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
dir1 = Path(tmp_dir) / "dir1"
|
||||
dir2 = Path(tmp_dir) / "dir2"
|
||||
dir1.mkdir()
|
||||
dir2.mkdir()
|
||||
|
||||
for d, desc in [(dir1, "First"), (dir2, "Second")]:
|
||||
skill_dir = d / "duplicate-skill"
|
||||
skill_dir.mkdir()
|
||||
(skill_dir / "SKILL.md").write_text(
|
||||
f"""---
|
||||
name: duplicate-skill
|
||||
description: {desc} skill
|
||||
---
|
||||
Content"""
|
||||
)
|
||||
|
||||
skills = discover_skills([dir1, dir2])
|
||||
assert len(skills) == 1
|
||||
assert skills[0].metadata.description == "First skill"
|
||||
|
||||
def test_discover_skills_nonexistent_directory(self):
|
||||
"""Test that nonexistent directories are handled gracefully."""
|
||||
skills = discover_skills([Path("/nonexistent/path")])
|
||||
assert skills == []
|
||||
|
||||
|
||||
class TestSkillComponent:
|
||||
"""Tests for the SkillComponent."""
|
||||
|
||||
def test_component_initialization(self):
|
||||
"""Test component initializes correctly."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
config = SkillConfiguration(
|
||||
skill_directories=[Path(tmp_dir)],
|
||||
max_loaded_skills=3,
|
||||
)
|
||||
component = SkillComponent(config)
|
||||
assert component._available_skills == {}
|
||||
assert component._loaded_skills == {}
|
||||
|
||||
def test_list_skills_command(self):
|
||||
"""Test the list_skills command."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
skill_dir = Path(tmp_dir) / "test-skill"
|
||||
skill_dir.mkdir()
|
||||
(skill_dir / "SKILL.md").write_text(
|
||||
"""---
|
||||
name: test-skill
|
||||
description: A test skill
|
||||
author: Test Author
|
||||
---
|
||||
Content"""
|
||||
)
|
||||
|
||||
config = SkillConfiguration(skill_directories=[Path(tmp_dir)])
|
||||
component = SkillComponent(config)
|
||||
|
||||
result = component.list_skills()
|
||||
assert "test-skill" in result
|
||||
assert "A test skill" in result
|
||||
assert "Test Author" in result
|
||||
|
||||
def test_load_skill_command(self):
|
||||
"""Test the load_skill command."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
skill_dir = Path(tmp_dir) / "loadable"
|
||||
skill_dir.mkdir()
|
||||
(skill_dir / "SKILL.md").write_text(
|
||||
"""---
|
||||
name: loadable
|
||||
description: A loadable skill
|
||||
---
|
||||
# Instructions
|
||||
|
||||
Use this skill for testing."""
|
||||
)
|
||||
|
||||
config = SkillConfiguration(skill_directories=[Path(tmp_dir)])
|
||||
component = SkillComponent(config)
|
||||
|
||||
result = component.load_skill("loadable")
|
||||
assert "loaded successfully" in result
|
||||
assert "loadable" in component._loaded_skills
|
||||
|
||||
def test_load_skill_max_limit(self):
|
||||
"""Test that max_loaded_skills limit is enforced."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
for i in range(3):
|
||||
skill_dir = Path(tmp_dir) / f"skill-{i}"
|
||||
skill_dir.mkdir()
|
||||
(skill_dir / "SKILL.md").write_text(
|
||||
f"""---
|
||||
name: skill-{i}
|
||||
description: Skill {i}
|
||||
---
|
||||
Content"""
|
||||
)
|
||||
|
||||
config = SkillConfiguration(
|
||||
skill_directories=[Path(tmp_dir)],
|
||||
max_loaded_skills=2,
|
||||
)
|
||||
component = SkillComponent(config)
|
||||
|
||||
component.load_skill("skill-0")
|
||||
component.load_skill("skill-1")
|
||||
result = component.load_skill("skill-2")
|
||||
|
||||
assert "Cannot load skill" in result
|
||||
assert "maximum" in result
|
||||
assert len(component._loaded_skills) == 2
|
||||
|
||||
def test_unload_skill_command(self):
|
||||
"""Test the unload_skill command."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
skill_dir = Path(tmp_dir) / "unloadable"
|
||||
skill_dir.mkdir()
|
||||
(skill_dir / "SKILL.md").write_text(
|
||||
"""---
|
||||
name: unloadable
|
||||
description: Test
|
||||
---
|
||||
Content"""
|
||||
)
|
||||
|
||||
config = SkillConfiguration(skill_directories=[Path(tmp_dir)])
|
||||
component = SkillComponent(config)
|
||||
|
||||
component.load_skill("unloadable")
|
||||
assert "unloadable" in component._loaded_skills
|
||||
|
||||
result = component.unload_skill("unloadable")
|
||||
assert "unloaded" in result
|
||||
assert "unloadable" not in component._loaded_skills
|
||||
|
||||
def test_read_skill_file_command(self):
|
||||
"""Test the read_skill_file command."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
skill_dir = Path(tmp_dir) / "with-files"
|
||||
skill_dir.mkdir()
|
||||
(skill_dir / "SKILL.md").write_text(
|
||||
"""---
|
||||
name: with-files
|
||||
description: Has extra files
|
||||
---
|
||||
Main content"""
|
||||
)
|
||||
(skill_dir / "extra.md").write_text("Extra file content")
|
||||
|
||||
config = SkillConfiguration(skill_directories=[Path(tmp_dir)])
|
||||
component = SkillComponent(config)
|
||||
|
||||
# Must load skill first
|
||||
result = component.read_skill_file("with-files", "extra.md")
|
||||
assert "must be loaded first" in result
|
||||
|
||||
component.load_skill("with-files")
|
||||
result = component.read_skill_file("with-files", "extra.md")
|
||||
assert "Extra file content" in result
|
||||
|
||||
def test_get_messages_includes_catalog(self):
|
||||
"""Test that get_messages includes skill catalog."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
skill_dir = Path(tmp_dir) / "catalog-test"
|
||||
skill_dir.mkdir()
|
||||
(skill_dir / "SKILL.md").write_text(
|
||||
"""---
|
||||
name: catalog-test
|
||||
description: Test skill
|
||||
---
|
||||
Content"""
|
||||
)
|
||||
|
||||
config = SkillConfiguration(skill_directories=[Path(tmp_dir)])
|
||||
component = SkillComponent(config)
|
||||
|
||||
messages = list(component.get_messages())
|
||||
assert len(messages) >= 1
|
||||
assert "Available Skills" in messages[0].content
|
||||
assert "catalog-test" in messages[0].content
|
||||
|
||||
def test_get_messages_includes_loaded_content(self):
|
||||
"""Test that get_messages includes loaded skill content."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
skill_dir = Path(tmp_dir) / "content-test"
|
||||
skill_dir.mkdir()
|
||||
(skill_dir / "SKILL.md").write_text(
|
||||
"""---
|
||||
name: content-test
|
||||
description: Test skill
|
||||
---
|
||||
# Skill Instructions
|
||||
|
||||
Use this for testing purposes."""
|
||||
)
|
||||
|
||||
config = SkillConfiguration(skill_directories=[Path(tmp_dir)])
|
||||
component = SkillComponent(config)
|
||||
component.load_skill("content-test")
|
||||
|
||||
messages = list(component.get_messages())
|
||||
# First message is catalog, second should be loaded skill content
|
||||
assert len(messages) >= 2
|
||||
full_content = "\n".join(m.content for m in messages)
|
||||
assert "Skill Instructions" in full_content
|
||||
|
||||
def test_get_commands(self):
|
||||
"""Test that get_commands returns appropriate commands."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
skill_dir = Path(tmp_dir) / "cmd-test"
|
||||
skill_dir.mkdir()
|
||||
(skill_dir / "SKILL.md").write_text(
|
||||
"""---
|
||||
name: cmd-test
|
||||
description: Test
|
||||
---
|
||||
Content"""
|
||||
)
|
||||
|
||||
config = SkillConfiguration(skill_directories=[Path(tmp_dir)])
|
||||
component = SkillComponent(config)
|
||||
|
||||
commands = list(component.get_commands())
|
||||
cmd_names = [c.names[0] for c in commands]
|
||||
|
||||
assert "list_skills" in cmd_names
|
||||
assert "load_skill" in cmd_names
|
||||
# unload_skill and read_skill_file only when skills are loaded
|
||||
assert "unload_skill" not in cmd_names
|
||||
|
||||
component.load_skill("cmd-test")
|
||||
commands = list(component.get_commands())
|
||||
cmd_names = [c.names[0] for c in commands]
|
||||
assert "unload_skill" in cmd_names
|
||||
assert "read_skill_file" in cmd_names
|
||||
|
||||
def test_get_resources(self):
|
||||
"""Test that get_resources returns skill resource info."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
skill_dir = Path(tmp_dir) / "resource-test"
|
||||
skill_dir.mkdir()
|
||||
(skill_dir / "SKILL.md").write_text(
|
||||
"""---
|
||||
name: resource-test
|
||||
description: Test
|
||||
---
|
||||
Content"""
|
||||
)
|
||||
|
||||
config = SkillConfiguration(skill_directories=[Path(tmp_dir)])
|
||||
component = SkillComponent(config)
|
||||
|
||||
resources = list(component.get_resources())
|
||||
assert len(resources) >= 1
|
||||
assert any("skill" in r.lower() for r in resources)
|
||||
|
||||
def test_refresh_skills(self):
|
||||
"""Test the refresh_skills method."""
|
||||
with TemporaryDirectory() as tmp_dir:
|
||||
config = SkillConfiguration(skill_directories=[Path(tmp_dir)])
|
||||
component = SkillComponent(config)
|
||||
|
||||
# Initially empty
|
||||
assert len(component._available_skills) == 0
|
||||
|
||||
# Add a skill
|
||||
skill_dir = Path(tmp_dir) / "new-skill"
|
||||
skill_dir.mkdir()
|
||||
(skill_dir / "SKILL.md").write_text(
|
||||
"""---
|
||||
name: new-skill
|
||||
description: New
|
||||
---
|
||||
Content"""
|
||||
)
|
||||
|
||||
# Refresh
|
||||
component.refresh_skills()
|
||||
assert len(component._available_skills) == 1
|
||||
assert "new-skill" in component._available_skills
|
||||
@@ -3,6 +3,7 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import inspect
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Any, ClassVar, Optional
|
||||
|
||||
import sentry_sdk
|
||||
@@ -37,6 +38,7 @@ from forge.components.http_client import HTTPClientComponent
|
||||
from forge.components.image_gen import ImageGeneratorComponent
|
||||
from forge.components.math_utils import MathUtilsComponent
|
||||
from forge.components.platform_blocks import PlatformBlocksComponent
|
||||
from forge.components.skills import SkillComponent, SkillConfiguration
|
||||
from forge.components.system import SystemComponent
|
||||
from forge.components.text_utils import TextUtilsComponent
|
||||
from forge.components.todo import TodoComponent
|
||||
@@ -215,6 +217,16 @@ class Agent(BaseAgent[AnyActionProposal], Configurable[AgentSettings]):
|
||||
# Platform blocks (enabled only if PLATFORM_API_KEY is set)
|
||||
self.platform_blocks = PlatformBlocksComponent()
|
||||
|
||||
# Skills (SKILL.md support)
|
||||
self.skills = SkillComponent(
|
||||
SkillConfiguration(
|
||||
skill_directories=[
|
||||
app_config.workspace / ".autogpt/skills",
|
||||
Path.home() / ".autogpt/skills",
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
self.event_history = settings.history
|
||||
|
||||
def _create_root_execution_context(
|
||||
|
||||
Reference in New Issue
Block a user