Files
OpenHands/opendevin/security/invariant/client.py
adragos e0b67ad2f1 feat: add Security Analyzer functionality (#3058)
* feat: Initial work on security analyzer

* feat: Add remote invariant client

* chore: improve fault tolerance of client

* feat: Add button to enable Invariant Security Analyzer

* [feat] confirmation mode for bash actions

* feat: Add Invariant Tab with security risk outputs

* feat: Add modal setting for Confirmation Mode

* fix: frontend tests for confirmation mode switch

* fix: add missing CONFIRMATION_MODE value in SettingsModal.test.tsx

* fix: update test to integrate new setting

* feat: Initial work on security analyzer

* feat: Add remote invariant client

* chore: improve fault tolerance of client

* feat: Add button to enable Invariant Security Analyzer

* feat: Add Invariant Tab with security risk outputs

* feat: integrate security analyzer with confirmation mode

* feat: improve invariant analyzer tab

* feat: Implement user confirmation for running bash/python code

* fix: don't display rejected actions

* fix: make confirmation show only on assistant messages

* feat: download traces, update policy, implement settings, auto-approve based on defined risk

* Fix: low risk not being shown because it's 0

* fix: duplicate logs in tab

* fix: log duplication

* chore: prepare for merge, remove logging

* Merge confirmation_mode from OpenDevin main

* test: update tests to pass

* chore: finish merging changes, security analyzer now operational again

* feat: document Security Analyzers

* refactor: api, monitor

* chore: lint, fix risk None, revert policy

* fix: check security_risk for None

* refactor: rename instances of invariant to security analyzer

* feat: add /api/options/security-analyzers endpoint

* Move security analyzer from tab to modal

* Temporary fix lock when security analyzer is not chosen

* feat: don't show lock at all when security analyzer is not enabled

* refactor:
- Frontend:
* change type of SECURITY_ANALYZER from bool to string
* add combobox to select SECURITY_ANALYZER, current options are "invariant and "" (no security analyzer)
* Security is now a modal, lock in bottom right is visible only if there's a security analyzer selected
- Backend:
* add close to SecurityAnalyzer
* instantiate SecurityAnalyzer based on provided string from frontend

* fix: update close to be async, to be consistent with other close on resources

* fix: max height of modal (prevent overflow)

* feat: add logo

* small fixes

* update docs for creating a security analyzer module

* fix linting

* update timeout for http client

* fix: move security_analyzer config from agent to session

* feat: add security_risk to browser actions

* add optional remark on combobox

* fix: asdict not called on dataclass, remove invariant dependency

* fix: exclude None values when serializing

* feat: take default policy from invariant-server instead of being hardcoded

* fix: check if policy is None

* update image name

* test: fix some failing runs

* fix: security analyzer tests

* refactor: merge confirmation_mode and security_analyzer into SecurityConfig. Change invariant error message for docker

* test: add tests for invariant parsing actions / observations

* fix: python linting for test_security.py

* Apply suggestions from code review

Co-authored-by: Engel Nyst <enyst@users.noreply.github.com>

* use ActionSecurityRisk | None intead of Optional

* refactor action parsing

* add extra check

* lint parser.py

* test: add field keep_prompt to test_security

* docs: add information about how to enable the analyzer

* test: Remove trailing whitespace in README.md text

---------

Co-authored-by: Mislav Balunovic <mislav.balunovic@gmail.com>
Co-authored-by: Engel Nyst <enyst@users.noreply.github.com>
Co-authored-by: Xingyao Wang <xingyao6@illinois.edu>
2024-08-13 11:29:41 +00:00

136 lines
5.0 KiB
Python

import time
from typing import Any, Optional, Tuple, Union, List, Dict
import requests
from requests.exceptions import ConnectionError, HTTPError, Timeout
class InvariantClient:
timeout: int = 120
def __init__(self, server_url: str, session_id: Optional[str] = None):
self.server = server_url
self.session_id, err = self._create_session(session_id)
if err:
raise RuntimeError(f'Failed to create session: {err}')
self.Policy = self._Policy(self)
self.Monitor = self._Monitor(self)
def _create_session(
self, session_id: Optional[str] = None
) -> Tuple[Optional[str], Optional[Exception]]:
elapsed = 0
while elapsed < self.timeout:
try:
if session_id:
response = requests.get(
f'{self.server}/session/new?session_id={session_id}', timeout=60
)
else:
response = requests.get(f'{self.server}/session/new', timeout=60)
response.raise_for_status()
return response.json().get('id'), None
except (ConnectionError, Timeout):
elapsed += 1
time.sleep(1)
except HTTPError as http_err:
return None, http_err
except Exception as err:
return None, err
return None, ConnectionError('Connection timed out')
def close_session(self) -> Union[None, Exception]:
try:
response = requests.delete(
f'{self.server}/session/?session_id={self.session_id}', timeout=60
)
response.raise_for_status()
except (ConnectionError, Timeout, HTTPError) as err:
return err
return None
class _Policy:
def __init__(self, invariant):
self.server = invariant.server
self.session_id = invariant.session_id
def _create_policy(self, rule: str) -> Tuple[Optional[str], Optional[Exception]]:
try:
response = requests.post(
f'{self.server}/policy/new?session_id={self.session_id}',
json={'rule': rule},
timeout=60,
)
response.raise_for_status()
return response.json().get('policy_id'), None
except (ConnectionError, Timeout, HTTPError) as err:
return None, err
def get_template(self) -> Tuple[Optional[str], Optional[Exception]]:
try:
response = requests.get(
f'{self.server}/policy/template',
timeout=60,
)
response.raise_for_status()
return response.json(), None
except (ConnectionError, Timeout, HTTPError) as err:
return None, err
def from_string(self, rule: str):
policy_id, err = self._create_policy(rule)
if err:
raise err
self.policy_id = policy_id
return self
def analyze(self, trace: List[Dict]) -> Union[Any, Exception]:
try:
response = requests.post(
f'{self.server}/policy/{self.policy_id}/analyze?session_id={self.session_id}',
json={'trace': trace},
timeout=60,
)
response.raise_for_status()
return response.json(), None
except (ConnectionError, Timeout, HTTPError) as err:
return None, err
class _Monitor:
def __init__(self, invariant):
self.server = invariant.server
self.session_id = invariant.session_id
self.policy = ''
def _create_monitor(self, rule: str) -> Tuple[Optional[str], Optional[Exception]]:
try:
response = requests.post(
f'{self.server}/monitor/new?session_id={self.session_id}',
json={'rule': rule},
timeout=60,
)
response.raise_for_status()
return response.json().get('monitor_id'), None
except (ConnectionError, Timeout, HTTPError) as err:
return None, err
def from_string(self, rule: str):
monitor_id, err = self._create_monitor(rule)
if err:
raise err
self.monitor_id = monitor_id
self.policy = rule
return self
def check(self, past_events: List[Dict], pending_events: List[Dict]) -> Union[Any, Exception]:
try:
response = requests.post(
f'{self.server}/monitor/{self.monitor_id}/check?session_id={self.session_id}',
json={"past_events": past_events, "pending_events": pending_events},
timeout=60,
)
response.raise_for_status()
return response.json(), None
except (ConnectionError, Timeout, HTTPError) as err:
return None, err