Compare commits

..

3 Commits

Author SHA1 Message Date
openhands
3a85a357c3 Add LLMSecurityAnalyzer with tests 2025-05-28 22:16:43 +00:00
openhands
4a76459066 Add PushoverSecurityAnalyzer and BullySecurityAnalyzer with tests 2025-05-28 22:10:44 +00:00
openhands
d48b04e7a2 Add integration tests for security analyzer 2025-05-28 22:02:11 +00:00
8 changed files with 965 additions and 19 deletions

View File

@@ -35,11 +35,7 @@ from openhands.utils.tenacity_stop import stop_if_should_exit
class RemoteRuntime(ActionExecutionClient):
"""
This runtime will connect to a remote oh-runtime-client.
The attach_to_existing option is ignored, `connect` will always resume if possible.
"""
"""This runtime will connect to a remote oh-runtime-client."""
port: int = 60000 # default port for the remote runtime client
runtime_id: str | None = None
@@ -124,20 +120,28 @@ class RemoteRuntime(ActionExecutionClient):
self._runtime_initialized = True
def _start_or_attach_to_runtime(self) -> None:
self.send_status_message('STATUS$STARTING_CONTAINER')
if self.config.sandbox.runtime_container_image is None:
self.log(
'info',
f'Building remote runtime with base image: {self.config.sandbox.base_container_image}',
existing_runtime = self._check_existing_runtime()
if existing_runtime:
self.log('debug', f'Using existing runtime with ID: {self.runtime_id}')
elif self.attach_to_existing:
raise AgentRuntimeNotFoundError(
f'Could not find existing runtime for SID: {self.sid}'
)
self._build_runtime()
else:
self.log(
'info',
f'Starting remote runtime with image: {self.config.sandbox.runtime_container_image}',
)
self.container_image = self.config.sandbox.runtime_container_image
self._start_runtime()
self.send_status_message('STATUS$STARTING_CONTAINER')
if self.config.sandbox.runtime_container_image is None:
self.log(
'info',
f'Building remote runtime with base image: {self.config.sandbox.base_container_image}',
)
self._build_runtime()
else:
self.log(
'info',
f'Starting remote runtime with image: {self.config.sandbox.runtime_container_image}',
)
self.container_image = self.config.sandbox.runtime_container_image
self._start_runtime()
assert self.runtime_id is not None, (
'Runtime ID is not set. This should never happen.'
)
@@ -145,9 +149,11 @@ class RemoteRuntime(ActionExecutionClient):
'Runtime URL is not set. This should never happen.'
)
self.send_status_message('STATUS$WAITING_FOR_CLIENT')
self.log('info', 'Waiting for runtime to be alive...')
if not self.attach_to_existing:
self.log('info', 'Waiting for runtime to be alive...')
self._wait_until_alive()
self.log('info', 'Runtime is ready.')
if not self.attach_to_existing:
self.log('info', 'Runtime is ready.')
self.send_status_message(' ')
def _check_existing_runtime(self) -> bool:

View File

@@ -0,0 +1,56 @@
from typing import Any
from fastapi import Request
from openhands.events.action.action import (
Action,
ActionConfirmationStatus,
ActionSecurityRisk,
)
from openhands.events.event import Event
from openhands.events.stream import EventStream
from openhands.security.analyzer import SecurityAnalyzer
class BullySecurityAnalyzer(SecurityAnalyzer):
"""A security analyzer that blocks all actions by marking them as high risk."""
settings: dict[str, Any] = {}
def __init__(self, event_stream: EventStream) -> None:
"""Initializes a new instance of the BullySecurityAnalyzer class.
Args:
event_stream: The event stream to listen for events.
"""
super().__init__(event_stream)
self.settings = {}
async def handle_api_request(self, request: Request) -> Any:
"""Handles the incoming API request."""
raise NotImplementedError('API requests not supported in BullySecurityAnalyzer')
async def security_risk(self, event: Action) -> ActionSecurityRisk:
"""Evaluates the Action for security risks and returns the risk level.
Always returns HIGH risk for all actions.
"""
return ActionSecurityRisk.HIGH
async def should_confirm(self, event: Event) -> bool:
"""Determines if the event should be confirmed based on its security risk.
Args:
event: The event to check.
Returns:
True if the event should be confirmed, False otherwise.
"""
risk = event.security_risk if hasattr(event, 'security_risk') else None # type: ignore [attr-defined]
return (
risk is not None
and risk < self.settings.get('RISK_SEVERITY', ActionSecurityRisk.MEDIUM)
and hasattr(event, 'confirmation_state')
and event.confirmation_state
== ActionConfirmationStatus.AWAITING_CONFIRMATION
)

View File

@@ -0,0 +1,135 @@
from typing import Any
from fastapi import Request
from openhands.core.config import LLMConfig
from openhands.core.message import Message, TextContent
from openhands.events.action.action import (
Action,
ActionConfirmationStatus,
ActionSecurityRisk,
)
from openhands.events.event import Event
from openhands.events.stream import EventStream
from openhands.llm.llm import LLM
from openhands.security.analyzer import SecurityAnalyzer
class LLMSecurityAnalyzer(SecurityAnalyzer):
"""A security analyzer that uses an LLM to evaluate actions for security risks."""
settings: dict[str, Any] = {}
llm: LLM | None = None
def __init__(
self, event_stream: EventStream, llm_config: LLMConfig | None = None
) -> None:
"""Initializes a new instance of the LLMSecurityAnalyzer class.
Args:
event_stream: The event stream to listen for events.
llm_config: The configuration for the LLM to use for security analysis.
If None, a default configuration will be used.
"""
super().__init__(event_stream)
self.settings = {}
# Initialize the LLM with the provided config or a default config
if llm_config is None:
llm_config = LLMConfig(
model='gpt-4o',
temperature=0.0,
max_output_tokens=10, # We only need a short YES/NO response
)
self.llm = LLM(config=llm_config)
async def handle_api_request(self, request: Request) -> Any:
"""Handles the incoming API request."""
raise NotImplementedError('API requests not supported in LLMSecurityAnalyzer')
async def security_risk(self, event: Action) -> ActionSecurityRisk:
"""Evaluates the Action for security risks and returns the risk level.
Uses an LLM to determine if the action is safe or not.
Args:
event: The action to evaluate.
Returns:
HIGH if the LLM determines the action is unsafe, LOW otherwise.
"""
if self.llm is None:
# If LLM is not available, default to LOW risk
return ActionSecurityRisk.LOW
# Create a system message that instructs the LLM on how to evaluate actions
system_message = Message(
role='system',
content=[
TextContent(
text=(
'You are a security analyzer that evaluates actions for security risks. '
'You will be given an action and should determine if it poses a security risk. '
"Respond with ONLY 'YES' if the action is potentially harmful or risky, "
"or 'NO' if the action appears safe. "
'Consider the following as potentially harmful: '
'- Destructive file system operations (rm -rf, etc.) '
'- Downloading and executing untrusted code '
'- Accessing sensitive information '
'- Network operations that could exfiltrate data '
'- Any command that could compromise system integrity'
)
)
],
)
# Create a user message with the action details
action_type = type(event).__name__
action_details = str(event)
user_message = Message(
role='user',
content=[
TextContent(
text=(
f'Action type: {action_type}\n'
f'Action details: {action_details}\n\n'
'Is this action potentially harmful? Answer with ONLY YES or NO.'
)
)
],
)
# Send the messages to the LLM
messages = [system_message, user_message]
response = self.llm.completion(
messages=self.llm.format_messages_for_llm(messages),
max_tokens=10, # Keep it short
)
# Extract the response text
response_text = response['choices'][0]['message']['content'].strip().upper()
# Determine the risk level based on the response
if 'YES' in response_text:
return ActionSecurityRisk.HIGH
else:
return ActionSecurityRisk.LOW
async def should_confirm(self, event: Event) -> bool:
"""Determines if the event should be confirmed based on its security risk.
Args:
event: The event to check.
Returns:
True if the event should be confirmed, False otherwise.
"""
risk = event.security_risk if hasattr(event, 'security_risk') else None # type: ignore [attr-defined]
return (
risk is not None
and risk < self.settings.get('RISK_SEVERITY', ActionSecurityRisk.MEDIUM)
and hasattr(event, 'confirmation_state')
and event.confirmation_state
== ActionConfirmationStatus.AWAITING_CONFIRMATION
)

View File

@@ -1,6 +1,12 @@
from openhands.security.analyzer import SecurityAnalyzer
from openhands.security.bully import BullySecurityAnalyzer
from openhands.security.invariant.analyzer import InvariantAnalyzer
from openhands.security.llm_analyzer import LLMSecurityAnalyzer
from openhands.security.pushover import PushoverSecurityAnalyzer
SecurityAnalyzers: dict[str, type[SecurityAnalyzer]] = {
'invariant': InvariantAnalyzer,
'pushover': PushoverSecurityAnalyzer,
'bully': BullySecurityAnalyzer,
'llm': LLMSecurityAnalyzer,
}

View File

@@ -0,0 +1,58 @@
from typing import Any
from fastapi import Request
from openhands.events.action.action import (
Action,
ActionConfirmationStatus,
ActionSecurityRisk,
)
from openhands.events.event import Event
from openhands.events.stream import EventStream
from openhands.security.analyzer import SecurityAnalyzer
class PushoverSecurityAnalyzer(SecurityAnalyzer):
"""A security analyzer that allows all actions by marking them as low risk."""
settings: dict[str, Any] = {}
def __init__(self, event_stream: EventStream) -> None:
"""Initializes a new instance of the PushoverSecurityAnalyzer class.
Args:
event_stream: The event stream to listen for events.
"""
super().__init__(event_stream)
self.settings = {}
async def handle_api_request(self, request: Request) -> Any:
"""Handles the incoming API request."""
raise NotImplementedError(
'API requests not supported in PushoverSecurityAnalyzer'
)
async def security_risk(self, event: Action) -> ActionSecurityRisk:
"""Evaluates the Action for security risks and returns the risk level.
Always returns LOW risk for all actions.
"""
return ActionSecurityRisk.LOW
async def should_confirm(self, event: Event) -> bool:
"""Determines if the event should be confirmed based on its security risk.
Args:
event: The event to check.
Returns:
True if the event should be confirmed, False otherwise.
"""
risk = event.security_risk if hasattr(event, 'security_risk') else None # type: ignore [attr-defined]
return (
risk is not None
and risk < self.settings.get('RISK_SEVERITY', ActionSecurityRisk.MEDIUM)
and hasattr(event, 'confirmation_state')
and event.confirmation_state
== ActionConfirmationStatus.AWAITING_CONFIRMATION
)

View File

@@ -0,0 +1,182 @@
import pathlib
import tempfile
import pytest
from openhands.events.action import (
CmdRunAction,
IPythonRunCellAction,
MessageAction,
)
from openhands.events.action.action import ActionConfirmationStatus, ActionSecurityRisk
from openhands.events.event import EventSource
from openhands.events.stream import EventStream
from openhands.security.bully import BullySecurityAnalyzer
from openhands.security.pushover import PushoverSecurityAnalyzer
from openhands.storage import get_file_store
@pytest.fixture
def temp_dir():
# get a temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
pathlib.Path(temp_dir).mkdir(parents=True, exist_ok=True)
yield temp_dir
@pytest.mark.asyncio
async def test_pushover_security_analyzer_allows_all_actions(temp_dir):
"""Test that the PushoverSecurityAnalyzer allows all actions by marking them as low risk."""
# Set up the event stream
file_store = get_file_store('local', temp_dir)
event_stream = EventStream('main', file_store)
# Create the security analyzer
security_analyzer = PushoverSecurityAnalyzer(event_stream)
# Create a potentially dangerous command action
dangerous_action = CmdRunAction(
command='rm -rf /', thought='Deleting root directory'
)
dangerous_action._source = EventSource.AGENT
# Add the action to the event stream
event_stream.add_event(dangerous_action, EventSource.AGENT)
# Manually call the security analyzer's on_event method
await security_analyzer.on_event(dangerous_action)
# Verify that the security risk was set to LOW regardless of the action
assert hasattr(dangerous_action, 'security_risk')
assert dangerous_action.security_risk == ActionSecurityRisk.LOW
# Create a safe command action
safe_action = CmdRunAction(command='ls -la', thought='Listing files')
safe_action._source = EventSource.AGENT
# Add the action to the event stream
event_stream.add_event(safe_action, EventSource.AGENT)
# Manually call the security analyzer's on_event method
await security_analyzer.on_event(safe_action)
# Verify that the security risk was set to LOW
assert hasattr(safe_action, 'security_risk')
assert safe_action.security_risk == ActionSecurityRisk.LOW
# Create a Python action
python_action = IPythonRunCellAction(
code="import os\nos.system('echo hello')", thought='Running system command'
)
python_action._source = EventSource.AGENT
# Add the action to the event stream
event_stream.add_event(python_action, EventSource.AGENT)
# Manually call the security analyzer's on_event method
await security_analyzer.on_event(python_action)
# Verify that the security risk was set to LOW
assert hasattr(python_action, 'security_risk')
assert python_action.security_risk == ActionSecurityRisk.LOW
@pytest.mark.asyncio
async def test_bully_security_analyzer_blocks_all_actions(temp_dir):
"""Test that the BullySecurityAnalyzer blocks all actions by marking them as high risk."""
# Set up the event stream
file_store = get_file_store('local', temp_dir)
event_stream = EventStream('main', file_store)
# Create the security analyzer
security_analyzer = BullySecurityAnalyzer(event_stream)
# Create a safe command action
safe_action = CmdRunAction(command='ls -la', thought='Listing files')
safe_action._source = EventSource.AGENT
# Add the action to the event stream
event_stream.add_event(safe_action, EventSource.AGENT)
# Manually call the security analyzer's on_event method
await security_analyzer.on_event(safe_action)
# Verify that the security risk was set to HIGH regardless of the action
assert hasattr(safe_action, 'security_risk')
assert safe_action.security_risk == ActionSecurityRisk.HIGH
# Create a harmless Python action
python_action = IPythonRunCellAction(
code="print('Hello, world!')", thought='Printing a message'
)
python_action._source = EventSource.AGENT
# Add the action to the event stream
event_stream.add_event(python_action, EventSource.AGENT)
# Manually call the security analyzer's on_event method
await security_analyzer.on_event(python_action)
# Verify that the security risk was set to HIGH
assert hasattr(python_action, 'security_risk')
assert python_action.security_risk == ActionSecurityRisk.HIGH
# Create a message action (which should be harmless)
message_action = MessageAction(content='Hello, world!')
message_action._source = EventSource.AGENT
# Add the action to the event stream
event_stream.add_event(message_action, EventSource.AGENT)
# Manually call the security analyzer's on_event method
await security_analyzer.on_event(message_action)
# Verify that the security risk was set to HIGH
assert hasattr(message_action, 'security_risk')
assert message_action.security_risk == ActionSecurityRisk.HIGH
@pytest.mark.asyncio
async def test_security_analyzer_confirmation_behavior(temp_dir):
"""Test that the security analyzers correctly handle confirmation based on risk level."""
# Set up the event stream
file_store = get_file_store('local', temp_dir)
event_stream = EventStream('main', file_store)
# Create the security analyzers
pushover_analyzer = PushoverSecurityAnalyzer(event_stream)
bully_analyzer = BullySecurityAnalyzer(event_stream)
# Create a command action
action = CmdRunAction(command='echo test', thought='Running echo')
action._source = EventSource.AGENT
# Set the action to awaiting confirmation
action.confirmation_state = ActionConfirmationStatus.AWAITING_CONFIRMATION
# Set the RISK_SEVERITY setting to HIGH (so MEDIUM and LOW risk actions need confirmation)
pushover_analyzer.settings = {'RISK_SEVERITY': ActionSecurityRisk.HIGH}
bully_analyzer.settings = {'RISK_SEVERITY': ActionSecurityRisk.HIGH}
# Process with pushover analyzer
await pushover_analyzer.on_event(action)
# Verify that the security risk was set to LOW
assert hasattr(action, 'security_risk')
assert action.security_risk == ActionSecurityRisk.LOW
# Verify that the pushover analyzer would confirm the action
assert await pushover_analyzer.should_confirm(action)
# Reset the action
action.security_risk = None
# Process with bully analyzer
await bully_analyzer.on_event(action)
# Verify that the security risk was set to HIGH
assert hasattr(action, 'security_risk')
assert action.security_risk == ActionSecurityRisk.HIGH
# Verify that the bully analyzer would not confirm the action
assert not await bully_analyzer.should_confirm(action)

View File

@@ -0,0 +1,197 @@
import pathlib
import tempfile
from unittest.mock import patch
import pytest
from openhands.core.config import LLMConfig
from openhands.events.action import (
CmdRunAction,
IPythonRunCellAction,
)
from openhands.events.action.action import ActionConfirmationStatus, ActionSecurityRisk
from openhands.events.event import EventSource
from openhands.events.stream import EventStream
from openhands.security.llm_analyzer import LLMSecurityAnalyzer
from openhands.storage import get_file_store
@pytest.fixture
def temp_dir():
# get a temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
pathlib.Path(temp_dir).mkdir(parents=True, exist_ok=True)
yield temp_dir
@pytest.fixture
def mock_llm_response():
# Mock the LLM response for security analysis
return {
'choices': [
{
'message': {
'content': 'NO',
'role': 'assistant',
}
}
]
}
@pytest.fixture
def mock_llm_dangerous_response():
# Mock the LLM response for security analysis of dangerous actions
return {
'choices': [
{
'message': {
'content': 'YES',
'role': 'assistant',
}
}
]
}
@pytest.mark.asyncio
async def test_llm_security_analyzer_safe_action(temp_dir, mock_llm_response):
"""Test that the LLMSecurityAnalyzer correctly identifies safe actions."""
# Set up the event stream
file_store = get_file_store('local', temp_dir)
event_stream = EventStream('main', file_store)
# Create a mock LLM config
llm_config = LLMConfig(model='gpt-4o', temperature=0.0)
# Create a safe command action
safe_action = CmdRunAction(command='ls -la', thought='Listing files')
safe_action._source = EventSource.AGENT
# Mock the LLM completion method
with patch('openhands.llm.llm.litellm_completion', return_value=mock_llm_response):
# Create the security analyzer
security_analyzer = LLMSecurityAnalyzer(event_stream, llm_config)
# Add the action to the event stream
event_stream.add_event(safe_action, EventSource.AGENT)
# Manually call the security analyzer's on_event method
await security_analyzer.on_event(safe_action)
# Verify that the security risk was set to LOW
assert hasattr(safe_action, 'security_risk')
assert safe_action.security_risk == ActionSecurityRisk.LOW
# Set the action to awaiting confirmation
safe_action.confirmation_state = ActionConfirmationStatus.AWAITING_CONFIRMATION
# Set the RISK_SEVERITY setting to HIGH (so MEDIUM and LOW risk actions need confirmation)
security_analyzer.settings = {'RISK_SEVERITY': ActionSecurityRisk.HIGH}
# Verify that the security analyzer would confirm the action
assert await security_analyzer.should_confirm(safe_action)
@pytest.mark.asyncio
async def test_llm_security_analyzer_dangerous_action(
temp_dir, mock_llm_dangerous_response
):
"""Test that the LLMSecurityAnalyzer correctly identifies dangerous actions."""
# Set up the event stream
file_store = get_file_store('local', temp_dir)
event_stream = EventStream('main', file_store)
# Create a mock LLM config
llm_config = LLMConfig(model='gpt-4o', temperature=0.0)
# Create a dangerous command action
dangerous_action = CmdRunAction(
command='rm -rf /', thought='Deleting root directory'
)
dangerous_action._source = EventSource.AGENT
# Mock the LLM completion method
with patch(
'openhands.llm.llm.litellm_completion', return_value=mock_llm_dangerous_response
):
# Create the security analyzer
security_analyzer = LLMSecurityAnalyzer(event_stream, llm_config)
# Add the action to the event stream
event_stream.add_event(dangerous_action, EventSource.AGENT)
# Manually call the security analyzer's on_event method
await security_analyzer.on_event(dangerous_action)
# Verify that the security risk was set to HIGH
assert hasattr(dangerous_action, 'security_risk')
assert dangerous_action.security_risk == ActionSecurityRisk.HIGH
# Set the action to awaiting confirmation
dangerous_action.confirmation_state = (
ActionConfirmationStatus.AWAITING_CONFIRMATION
)
# Set the RISK_SEVERITY setting to HIGH (so MEDIUM and LOW risk actions need confirmation)
security_analyzer.settings = {'RISK_SEVERITY': ActionSecurityRisk.HIGH}
# Verify that the security analyzer would not confirm the action
assert not await security_analyzer.should_confirm(dangerous_action)
@pytest.mark.asyncio
async def test_llm_security_analyzer_python_code(
temp_dir, mock_llm_response, mock_llm_dangerous_response
):
"""Test that the LLMSecurityAnalyzer correctly evaluates Python code."""
# Set up the event stream
file_store = get_file_store('local', temp_dir)
event_stream = EventStream('main', file_store)
# Create a mock LLM config
llm_config = LLMConfig(model='gpt-4o', temperature=0.0)
# Create a safe Python action
safe_code = IPythonRunCellAction(
code="print('Hello, world!')", thought='Printing a message'
)
safe_code._source = EventSource.AGENT
# Create a dangerous Python action
dangerous_code = IPythonRunCellAction(
code="import os\nos.system('rm -rf /')", thought='Running system command'
)
dangerous_code._source = EventSource.AGENT
# Mock the LLM completion method for safe code
with patch('openhands.llm.llm.litellm_completion', return_value=mock_llm_response):
# Create the security analyzer
security_analyzer = LLMSecurityAnalyzer(event_stream, llm_config)
# Add the action to the event stream
event_stream.add_event(safe_code, EventSource.AGENT)
# Manually call the security analyzer's on_event method
await security_analyzer.on_event(safe_code)
# Verify that the security risk was set to LOW
assert hasattr(safe_code, 'security_risk')
assert safe_code.security_risk == ActionSecurityRisk.LOW
# Mock the LLM completion method for dangerous code
with patch(
'openhands.llm.llm.litellm_completion', return_value=mock_llm_dangerous_response
):
# Create a new security analyzer
security_analyzer = LLMSecurityAnalyzer(event_stream, llm_config)
# Add the action to the event stream
event_stream.add_event(dangerous_code, EventSource.AGENT)
# Manually call the security analyzer's on_event method
await security_analyzer.on_event(dangerous_code)
# Verify that the security risk was set to HIGH
assert hasattr(dangerous_code, 'security_risk')
assert dangerous_code.security_risk == ActionSecurityRisk.HIGH

View File

@@ -0,0 +1,306 @@
import pathlib
import tempfile
from unittest.mock import MagicMock, patch
import pytest
from openhands.core.schema import AgentState
from openhands.events.action import (
ChangeAgentStateAction,
CmdRunAction,
IPythonRunCellAction,
)
from openhands.events.action.action import ActionConfirmationStatus, ActionSecurityRisk
from openhands.events.event import EventSource
from openhands.events.stream import EventStream
from openhands.security.invariant import InvariantAnalyzer
from openhands.security.invariant.client import InvariantClient
from openhands.storage import get_file_store
@pytest.fixture
def temp_dir():
# get a temporary directory
with tempfile.TemporaryDirectory() as temp_dir:
pathlib.Path(temp_dir).mkdir(parents=True, exist_ok=True)
yield temp_dir
@pytest.mark.asyncio
async def test_security_analyzer_blocks_high_risk_action(temp_dir):
"""Test that high-risk actions are blocked by the security analyzer."""
# Set up the event stream
file_store = get_file_store('local', temp_dir)
event_stream = EventStream('main', file_store)
# Create a dangerous command action
dangerous_action = CmdRunAction(
command='rm -rf /', thought='Deleting root directory'
)
dangerous_action._source = EventSource.AGENT
# Mock the Docker and HTTP clients
mock_container = MagicMock()
mock_container.status = 'running'
mock_container.attrs = {
'NetworkSettings': {'Ports': {'8000/tcp': [{'HostPort': '34567'}]}}
}
mock_docker = MagicMock()
mock_docker.from_env().containers.list.return_value = [mock_container]
mock_httpx = MagicMock()
mock_httpx.get().json.return_value = {'id': 'mock-session-id'}
mock_httpx.post().json.side_effect = [
{'monitor_id': 'mock-monitor-id'},
[
'PolicyViolation(Disallow rm -rf [risk=high], ranges=[<2 ranges>])'
], # Security check response
]
# Create a policy that marks 'rm -rf' commands as high risk
policy = """
raise "Disallow rm -rf [risk=high]" if:
(call: ToolCall)
call is tool:run
match("rm -rf", call.function.arguments.command)
"""
with (
patch(f'{InvariantAnalyzer.__module__}.docker', mock_docker),
patch(f'{InvariantClient.__module__}.httpx', mock_httpx),
):
# Create the security analyzer
security_analyzer = InvariantAnalyzer(event_stream, policy)
# Add the action to the event stream
event_stream.add_event(dangerous_action, EventSource.AGENT)
# Manually call the security analyzer's on_event method
await security_analyzer.on_event(dangerous_action)
# Verify that the security risk was set to HIGH
assert hasattr(dangerous_action, 'security_risk')
assert dangerous_action.security_risk == ActionSecurityRisk.HIGH
# Set the action to awaiting confirmation
dangerous_action.confirmation_state = (
ActionConfirmationStatus.AWAITING_CONFIRMATION
)
# Verify that the security analyzer would confirm the action
assert not await security_analyzer.should_confirm(dangerous_action)
@pytest.mark.asyncio
async def test_security_analyzer_allows_low_risk_action(temp_dir):
"""Test that low-risk actions are allowed by the security analyzer."""
# Set up the event stream
file_store = get_file_store('local', temp_dir)
event_stream = EventStream('main', file_store)
# Create a safe command action
safe_action = CmdRunAction(command='ls -la', thought='Listing files')
safe_action._source = EventSource.AGENT
# Mock the Docker and HTTP clients
mock_container = MagicMock()
mock_container.status = 'running'
mock_container.attrs = {
'NetworkSettings': {'Ports': {'8000/tcp': [{'HostPort': '34567'}]}}
}
mock_docker = MagicMock()
mock_docker.from_env().containers.list.return_value = [mock_container]
mock_httpx = MagicMock()
mock_httpx.get().json.return_value = {'id': 'mock-session-id'}
mock_httpx.post().json.side_effect = [
{'monitor_id': 'mock-monitor-id'},
[], # Security check response - no violations
]
# Create a policy that marks 'rm -rf' commands as high risk (but we'll use a safe command)
policy = """
raise "Disallow rm -rf [risk=high]" if:
(call: ToolCall)
call is tool:run
match("rm -rf", call.function.arguments.command)
"""
with (
patch(f'{InvariantAnalyzer.__module__}.docker', mock_docker),
patch(f'{InvariantClient.__module__}.httpx', mock_httpx),
):
# Create the security analyzer
security_analyzer = InvariantAnalyzer(event_stream, policy)
# Add the action to the event stream
event_stream.add_event(safe_action, EventSource.AGENT)
# Manually call the security analyzer's on_event method
await security_analyzer.on_event(safe_action)
# Verify that the security risk was set to LOW
assert hasattr(safe_action, 'security_risk')
assert safe_action.security_risk == ActionSecurityRisk.LOW
@pytest.mark.asyncio
async def test_security_analyzer_medium_risk_with_confirmation(temp_dir):
"""Test that medium-risk actions require confirmation when confirmation mode is enabled."""
# Set up the event stream
file_store = get_file_store('local', temp_dir)
event_stream = EventStream('main', file_store)
# Create a medium-risk Python action
medium_risk_action = IPythonRunCellAction(
code="import os\nos.system('echo hello')", thought='Running system command'
)
medium_risk_action._source = EventSource.AGENT
# Mock the Docker and HTTP clients
mock_container = MagicMock()
mock_container.status = 'running'
mock_container.attrs = {
'NetworkSettings': {'Ports': {'8000/tcp': [{'HostPort': '34567'}]}}
}
mock_docker = MagicMock()
mock_docker.from_env().containers.list.return_value = [mock_container]
mock_httpx = MagicMock()
mock_httpx.get().json.return_value = {'id': 'mock-session-id'}
mock_httpx.post().json.side_effect = [
{'monitor_id': 'mock-monitor-id'},
[
'PolicyViolation(Potentially unsafe code [risk=medium], ranges=[<2 ranges>])'
], # Security check response
]
# Create a policy that marks certain Python code as medium risk
policy = """
raise "Potentially unsafe code [risk=medium]" if:
(call: ToolCall)
call is tool:run_ipython
match("os.system", call.function.arguments.code)
"""
with (
patch(f'{InvariantAnalyzer.__module__}.docker', mock_docker),
patch(f'{InvariantClient.__module__}.httpx', mock_httpx),
):
# Create the security analyzer
security_analyzer = InvariantAnalyzer(event_stream, policy)
# Set the RISK_SEVERITY setting to HIGH (so MEDIUM risk actions need confirmation)
security_analyzer.settings = {'RISK_SEVERITY': ActionSecurityRisk.HIGH}
# Add the action to the event stream
event_stream.add_event(medium_risk_action, EventSource.AGENT)
# Manually call the security analyzer's on_event method
await security_analyzer.on_event(medium_risk_action)
# Verify that the security risk was set to MEDIUM
assert hasattr(medium_risk_action, 'security_risk')
assert medium_risk_action.security_risk == ActionSecurityRisk.MEDIUM
# Set the action to awaiting confirmation
medium_risk_action.confirmation_state = (
ActionConfirmationStatus.AWAITING_CONFIRMATION
)
# Verify that the security analyzer would confirm the action
assert await security_analyzer.should_confirm(medium_risk_action)
# Mock the add_event method to capture the confirmation action
original_add_event = event_stream.add_event
confirmation_actions = []
def mock_add_event(event, source):
if isinstance(event, ChangeAgentStateAction):
confirmation_actions.append(event)
return original_add_event(event, source)
event_stream.add_event = mock_add_event
# Call the confirm method
await security_analyzer.confirm(medium_risk_action)
# Verify that a confirmation action was added
assert len(confirmation_actions) > 0
assert isinstance(confirmation_actions[0], ChangeAgentStateAction)
assert confirmation_actions[0].agent_state == AgentState.USER_CONFIRMED
@pytest.mark.asyncio
async def test_security_analyzer_user_rejection(temp_dir):
"""Test that actions can be rejected by the user."""
# Set up the event stream
file_store = get_file_store('local', temp_dir)
event_stream = EventStream('main', file_store)
# Create a medium-risk command action
medium_risk_action = CmdRunAction(
command='wget https://example.com/file.sh -O /tmp/file.sh',
thought='Downloading a file',
)
medium_risk_action._source = EventSource.AGENT
# Mock the Docker and HTTP clients
mock_container = MagicMock()
mock_container.status = 'running'
mock_container.attrs = {
'NetworkSettings': {'Ports': {'8000/tcp': [{'HostPort': '34567'}]}}
}
mock_docker = MagicMock()
mock_docker.from_env().containers.list.return_value = [mock_container]
mock_httpx = MagicMock()
mock_httpx.get().json.return_value = {'id': 'mock-session-id'}
mock_httpx.post().json.side_effect = [
{'monitor_id': 'mock-monitor-id'},
[
'PolicyViolation(Potentially unsafe command [risk=medium], ranges=[<2 ranges>])'
], # Security check response
]
# Create a policy that marks certain commands as medium risk
policy = """
raise "Potentially unsafe command [risk=medium]" if:
(call: ToolCall)
call is tool:run
match("wget", call.function.arguments.command)
"""
with (
patch(f'{InvariantAnalyzer.__module__}.docker', mock_docker),
patch(f'{InvariantClient.__module__}.httpx', mock_httpx),
):
# Create the security analyzer
# Set the RISK_SEVERITY setting to HIGH (so MEDIUM risk actions need confirmation)
security_analyzer = InvariantAnalyzer(event_stream, policy)
# Set the RISK_SEVERITY setting to HIGH (so MEDIUM risk actions need confirmation)
security_analyzer.settings = {'RISK_SEVERITY': ActionSecurityRisk.HIGH}
# Add the action to the event stream
event_stream.add_event(medium_risk_action, EventSource.AGENT)
# Manually call the security analyzer's on_event method
await security_analyzer.on_event(medium_risk_action)
# Verify that the security risk was set to MEDIUM
assert hasattr(medium_risk_action, 'security_risk')
assert medium_risk_action.security_risk == ActionSecurityRisk.MEDIUM
# Set the action to awaiting confirmation
medium_risk_action.confirmation_state = (
ActionConfirmationStatus.AWAITING_CONFIRMATION
)
# Verify that the security analyzer would confirm the action
assert await security_analyzer.should_confirm(medium_risk_action)
# Set the action to rejected
medium_risk_action.confirmation_state = ActionConfirmationStatus.REJECTED
# Verify that the security analyzer would not confirm the action
assert not await security_analyzer.should_confirm(medium_risk_action)