chore: modernize security module typing (#3469)

- Disable E501, apply Ruff formatting
- Update typing (Self, BeforeValidator), remove dead code
- Convert Fingerprint to Pydantic dataclass and fix serialization/copy behavior
- Add TODO for dynamic namespace config
This commit is contained in:
Greyson LaLonde
2025-09-08 11:52:59 -04:00
committed by GitHub
parent f936e0f69b
commit fa06aea8d5
3 changed files with 129 additions and 150 deletions

View File

@@ -0,0 +1,15 @@
"""Security constants for CrewAI.
This module contains security-related constants used throughout the security module.
Notes:
- TODO: Determine if CREW_AI_NAMESPACE should be made dynamic or configurable
"""
from typing import Annotated
from uuid import UUID
CREW_AI_NAMESPACE: Annotated[
UUID,
"Create a deterministic UUID using v5 (SHA-1). Custom namespace for CrewAI to enhance security.",
] = UUID("f47ac10b-58cc-4372-a567-0e02b2c3d479")

View File

@@ -1,130 +1,123 @@
"""
Fingerprint Module
"""Fingerprint Module
This module provides functionality for generating and validating unique identifiers
for CrewAI agents. These identifiers are used for tracking, auditing, and security.
"""
import uuid
from datetime import datetime
from typing import Any, Dict, Optional
from typing import Annotated, Any
from uuid import UUID, uuid4, uuid5
from pydantic import BaseModel, ConfigDict, Field, field_validator
from pydantic import BaseModel, BeforeValidator, Field, PrivateAttr
from typing_extensions import Self
from crewai.security.constants import CREW_AI_NAMESPACE
def _validate_metadata(v: Any) -> dict[str, Any]:
"""Validate that metadata is a dictionary with string keys and valid values."""
if not isinstance(v, dict):
raise ValueError("Metadata must be a dictionary")
# Validate that all keys are strings
for key, value in v.items():
if not isinstance(key, str):
raise ValueError(f"Metadata keys must be strings, got {type(key)}")
# Validate nested dictionaries (prevent deeply nested structures)
if isinstance(value, dict):
# Check for nested dictionaries (limit depth to 1)
for nested_key, nested_value in value.items():
if not isinstance(nested_key, str):
raise ValueError(
f"Nested metadata keys must be strings, got {type(nested_key)}"
)
if isinstance(nested_value, dict):
raise ValueError("Metadata can only be nested one level deep")
# Check for maximum metadata size (prevent DoS)
if len(str(v)) > 10_000: # Limit metadata size to 10KB
raise ValueError("Metadata size exceeds maximum allowed (10KB)")
return v
class Fingerprint(BaseModel):
"""
A class for generating and managing unique identifiers for agents.
"""A class for generating and managing unique identifiers for agents.
Each agent has dual identifiers:
- Human-readable ID: For debugging and reference (derived from role if not specified)
- Fingerprint UUID: Unique runtime identifier for tracking and auditing
Attributes:
uuid_str (str): String representation of the UUID for this fingerprint, auto-generated
created_at (datetime): When this fingerprint was created, auto-generated
metadata (Dict[str, Any]): Additional metadata associated with this fingerprint
uuid_str: String representation of the UUID for this fingerprint, auto-generated
created_at: When this fingerprint was created, auto-generated
metadata: Additional metadata associated with this fingerprint
"""
uuid_str: str = Field(default_factory=lambda: str(uuid.uuid4()), description="String representation of the UUID")
created_at: datetime = Field(default_factory=datetime.now, description="When this fingerprint was created")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata for this fingerprint")
model_config = ConfigDict(arbitrary_types_allowed=True)
@field_validator('metadata')
@classmethod
def validate_metadata(cls, v):
"""Validate that metadata is a dictionary with string keys and valid values."""
if not isinstance(v, dict):
raise ValueError("Metadata must be a dictionary")
# Validate that all keys are strings
for key, value in v.items():
if not isinstance(key, str):
raise ValueError(f"Metadata keys must be strings, got {type(key)}")
# Validate nested dictionaries (prevent deeply nested structures)
if isinstance(value, dict):
# Check for nested dictionaries (limit depth to 1)
for nested_key, nested_value in value.items():
if not isinstance(nested_key, str):
raise ValueError(f"Nested metadata keys must be strings, got {type(nested_key)}")
if isinstance(nested_value, dict):
raise ValueError("Metadata can only be nested one level deep")
# Check for maximum metadata size (prevent DoS)
if len(str(v)) > 10000: # Limit metadata size to 10KB
raise ValueError("Metadata size exceeds maximum allowed (10KB)")
return v
def __init__(self, **data):
"""Initialize a Fingerprint with auto-generated uuid_str and created_at."""
# Remove uuid_str and created_at from data to ensure they're auto-generated
if 'uuid_str' in data:
data.pop('uuid_str')
if 'created_at' in data:
data.pop('created_at')
# Call the parent constructor with the modified data
super().__init__(**data)
_uuid_str: str = PrivateAttr(default_factory=lambda: str(uuid4()))
_created_at: datetime = PrivateAttr(default_factory=datetime.now)
metadata: Annotated[dict[str, Any], BeforeValidator(_validate_metadata)] = Field(
default_factory=dict
)
@property
def uuid(self) -> uuid.UUID:
def uuid_str(self) -> str:
"""Get the string representation of the UUID for this fingerprint."""
return self._uuid_str
@property
def created_at(self) -> datetime:
"""Get the creation timestamp for this fingerprint."""
return self._created_at
@property
def uuid(self) -> UUID:
"""Get the UUID object for this fingerprint."""
return uuid.UUID(self.uuid_str)
return UUID(self.uuid_str)
@classmethod
def _generate_uuid(cls, seed: str) -> str:
"""
Generate a deterministic UUID based on a seed string.
"""Generate a deterministic UUID based on a seed string.
Args:
seed (str): The seed string to use for UUID generation
seed: The seed string to use for UUID generation
Returns:
str: A string representation of the UUID consistently generated from the seed
A string representation of the UUID consistently generated from the seed
"""
if not isinstance(seed, str):
raise ValueError("Seed must be a string")
if not seed.strip():
raise ValueError("Seed cannot be empty or whitespace")
# Create a deterministic UUID using v5 (SHA-1)
# Custom namespace for CrewAI to enhance security
# Using a unique namespace specific to CrewAI to reduce collision risks
CREW_AI_NAMESPACE = uuid.UUID('f47ac10b-58cc-4372-a567-0e02b2c3d479')
return str(uuid.uuid5(CREW_AI_NAMESPACE, seed))
return str(uuid5(CREW_AI_NAMESPACE, seed))
@classmethod
def generate(cls, seed: Optional[str] = None, metadata: Optional[Dict[str, Any]] = None) -> 'Fingerprint':
"""
Static factory method to create a new Fingerprint.
def generate(
cls, seed: str | None = None, metadata: dict[str, Any] | None = None
) -> Self:
"""Static factory method to create a new Fingerprint.
Args:
seed (Optional[str]): A string to use as seed for the UUID generation.
seed: A string to use as seed for the UUID generation.
If None, a random UUID is generated.
metadata (Optional[Dict[str, Any]]): Additional metadata to store with the fingerprint.
metadata: Additional metadata to store with the fingerprint.
Returns:
Fingerprint: A new Fingerprint instance
A new Fingerprint instance
"""
fingerprint = cls(metadata=metadata or {})
if seed:
# For seed-based generation, we need to manually set the uuid_str after creation
object.__setattr__(fingerprint, 'uuid_str', cls._generate_uuid(seed))
# For seed-based generation, we need to manually set the _uuid_str after creation
fingerprint.__dict__["_uuid_str"] = cls._generate_uuid(seed)
return fingerprint
def __str__(self) -> str:
"""String representation of the fingerprint (the UUID)."""
return self.uuid_str
def __eq__(self, other) -> bool:
def __eq__(self, other: Any) -> bool:
"""Compare fingerprints by their UUID."""
if isinstance(other, Fingerprint):
if type(other) is Fingerprint:
return self.uuid_str == other.uuid_str
return False
@@ -132,29 +125,27 @@ class Fingerprint(BaseModel):
"""Hash of the fingerprint (based on UUID)."""
return hash(self.uuid_str)
def to_dict(self) -> Dict[str, Any]:
"""
Convert the fingerprint to a dictionary representation.
def to_dict(self) -> dict[str, Any]:
"""Convert the fingerprint to a dictionary representation.
Returns:
Dict[str, Any]: Dictionary representation of the fingerprint
Dictionary representation of the fingerprint
"""
return {
"uuid_str": self.uuid_str,
"created_at": self.created_at.isoformat(),
"metadata": self.metadata
"metadata": self.metadata,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Fingerprint':
"""
Create a Fingerprint from a dictionary representation.
def from_dict(cls, data: dict[str, Any]) -> Self:
"""Create a Fingerprint from a dictionary representation.
Args:
data (Dict[str, Any]): Dictionary representation of a fingerprint
data: Dictionary representation of a fingerprint
Returns:
Fingerprint: A new Fingerprint instance
A new Fingerprint instance
"""
if not data:
return cls()
@@ -163,8 +154,10 @@ class Fingerprint(BaseModel):
# For consistency with existing stored fingerprints, we need to manually set these
if "uuid_str" in data:
object.__setattr__(fingerprint, 'uuid_str', data["uuid_str"])
fingerprint.__dict__["_uuid_str"] = data["uuid_str"]
if "created_at" in data and isinstance(data["created_at"], str):
object.__setattr__(fingerprint, 'created_at', datetime.fromisoformat(data["created_at"]))
fingerprint.__dict__["_created_at"] = datetime.fromisoformat(
data["created_at"]
)
return fingerprint

View File

@@ -1,5 +1,4 @@
"""
Security Configuration Module
"""Security Configuration Module
This module provides configuration for CrewAI security features, including:
- Authentication settings
@@ -10,9 +9,10 @@ The SecurityConfig class is the primary interface for managing security settings
in CrewAI applications.
"""
from typing import Any, Dict, Optional
from typing import Any
from pydantic import BaseModel, ConfigDict, Field, model_validator
from pydantic import BaseModel, ConfigDict, Field, field_validator
from typing_extensions import Self
from crewai.security.fingerprint import Fingerprint
@@ -28,7 +28,6 @@ class SecurityConfig(BaseModel):
- Impersonation/delegation tokens *TODO*
Attributes:
version (str): Version of the security configuration
fingerprint (Fingerprint): The unique fingerprint automatically generated for the component
"""
@@ -37,80 +36,52 @@ class SecurityConfig(BaseModel):
# Note: Cannot use frozen=True as existing tests modify the fingerprint property
)
version: str = Field(
default="1.0.0",
description="Version of the security configuration"
)
fingerprint: Fingerprint = Field(
default_factory=Fingerprint,
description="Unique identifier for the component"
default_factory=Fingerprint, description="Unique identifier for the component"
)
def is_compatible(self, min_version: str) -> bool:
"""
Check if this security configuration is compatible with the minimum required version.
Args:
min_version (str): Minimum required version in semver format (e.g., "1.0.0")
Returns:
bool: True if this configuration is compatible, False otherwise
"""
# Simple version comparison (can be enhanced with packaging.version if needed)
current = [int(x) for x in self.version.split(".")]
minimum = [int(x) for x in min_version.split(".")]
# Compare major, minor, patch versions
for c, m in zip(current, minimum):
if c > m:
return True
if c < m:
return False
return True
@model_validator(mode='before')
@field_validator("fingerprint", mode="before")
@classmethod
def validate_fingerprint(cls, values):
def validate_fingerprint(cls, v: Any) -> Fingerprint:
"""Ensure fingerprint is properly initialized."""
if isinstance(values, dict):
# Handle case where fingerprint is not provided or is None
if 'fingerprint' not in values or values['fingerprint'] is None:
values['fingerprint'] = Fingerprint()
# Handle case where fingerprint is a string (seed)
elif isinstance(values['fingerprint'], str):
if not values['fingerprint'].strip():
raise ValueError("Fingerprint seed cannot be empty")
values['fingerprint'] = Fingerprint.generate(seed=values['fingerprint'])
return values
if v is None:
return Fingerprint()
if isinstance(v, str):
if not v.strip():
raise ValueError("Fingerprint seed cannot be empty")
return Fingerprint.generate(seed=v)
if isinstance(v, dict):
return Fingerprint.from_dict(v)
if isinstance(v, Fingerprint):
return v
def to_dict(self) -> Dict[str, Any]:
raise ValueError(f"Invalid fingerprint type: {type(v)}")
def to_dict(self) -> dict[str, Any]:
"""
Convert the security config to a dictionary.
Returns:
Dict[str, Any]: Dictionary representation of the security config
Dictionary representation of the security config
"""
result = {
"fingerprint": self.fingerprint.to_dict()
}
return result
return {"fingerprint": self.fingerprint.to_dict()}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'SecurityConfig':
def from_dict(cls, data: dict[str, Any]) -> Self:
"""
Create a SecurityConfig from a dictionary.
Args:
data (Dict[str, Any]): Dictionary representation of a security config
data: Dictionary representation of a security config
Returns:
SecurityConfig: A new SecurityConfig instance
A new SecurityConfig instance
"""
# Make a copy to avoid modifying the original
data_copy = data.copy()
fingerprint_data = data_copy.pop("fingerprint", None)
fingerprint = Fingerprint.from_dict(fingerprint_data) if fingerprint_data else Fingerprint()
fingerprint_data = data.get("fingerprint")
fingerprint = (
Fingerprint.from_dict(fingerprint_data)
if fingerprint_data
else Fingerprint()
)
return cls(fingerprint=fingerprint)