[Refactor, Fix]: Agent controller state/metrics management (#9012)

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
Rohit Malhotra
2025-06-16 11:24:13 -04:00
committed by GitHub
parent cbe32a1a12
commit 2fd1fdcd7e
27 changed files with 1404 additions and 667 deletions

View File

@@ -7,7 +7,6 @@ import time
import traceback
from typing import Callable
import litellm # noqa
from litellm.exceptions import ( # noqa
APIConnectionError,
APIError,
@@ -25,7 +24,8 @@ from litellm.exceptions import ( # noqa
from openhands.controller.agent import Agent
from openhands.controller.replay import ReplayManager
from openhands.controller.state.state import State, TrafficControlState
from openhands.controller.state.state import State
from openhands.controller.state.state_tracker import StateTracker
from openhands.controller.stuck import StuckDetector
from openhands.core.config import AgentConfig, LLMConfig
from openhands.core.exceptions import (
@@ -61,7 +61,6 @@ from openhands.events.action import (
)
from openhands.events.action.agent import CondensationAction, RecallAction
from openhands.events.event import Event
from openhands.events.event_filter import EventFilter
from openhands.events.observation import (
AgentDelegateObservation,
AgentStateChangedObservation,
@@ -69,10 +68,11 @@ from openhands.events.observation import (
NullObservation,
Observation,
)
from openhands.events.serialization.event import event_to_trajectory, truncate_content
from openhands.events.serialization.event import truncate_content
from openhands.llm.llm import LLM
from openhands.llm.metrics import Metrics, TokenUsage
from openhands.memory.view import View
from openhands.storage.files import FileStore
# note: RESUME is only available on web GUI
TRAFFIC_CONTROL_REMINDER = (
@@ -101,11 +101,13 @@ class AgentController:
self,
agent: Agent,
event_stream: EventStream,
max_iterations: int,
max_budget_per_task: float | None = None,
iteration_delta: int,
budget_per_task_delta: float | None = None,
agent_to_llm_config: dict[str, LLMConfig] | None = None,
agent_configs: dict[str, AgentConfig] | None = None,
sid: str | None = None,
file_store: FileStore | None = None,
user_id: str | None = None,
confirmation_mode: bool = False,
initial_state: State | None = None,
is_delegate: bool = False,
@@ -132,7 +134,10 @@ class AgentController:
status_callback: Optional callback function to handle status updates.
replay_events: A list of logs to replay.
"""
self.id = sid or event_stream.sid
self.user_id = user_id
self.file_store = file_store
self.agent = agent
self.headless_mode = headless_mode
self.is_delegate = is_delegate
@@ -146,29 +151,22 @@ class AgentController:
EventStreamSubscriber.AGENT_CONTROLLER, self.on_event, self.id
)
# filter out events that are not relevant to the agent
# so they will not be included in the agent history
self.agent_history_filter = EventFilter(
exclude_types=(
NullAction,
NullObservation,
ChangeAgentStateAction,
AgentStateChangedObservation,
),
exclude_hidden=True,
)
self.state_tracker = StateTracker(sid, file_store, user_id)
# state from the previous session, state from a parent agent, or a fresh state
self.set_initial_state(
state=initial_state,
max_iterations=max_iterations,
max_iterations=iteration_delta,
max_budget_per_task=budget_per_task_delta,
confirmation_mode=confirmation_mode,
)
self.max_budget_per_task = max_budget_per_task
self.state = self.state_tracker.state # TODO: share between manager and controller for backward compatability; we should ideally move all state related logic to the state manager
self.agent_to_llm_config = agent_to_llm_config if agent_to_llm_config else {}
self.agent_configs = agent_configs if agent_configs else {}
self._initial_max_iterations = max_iterations
self._initial_max_budget_per_task = max_budget_per_task
self._initial_max_iterations = iteration_delta
self._initial_max_budget_per_task = budget_per_task_delta
# stuck helper
self._stuck_detector = StuckDetector(self.state)
@@ -214,26 +212,7 @@ class AgentController:
if set_stop_state:
await self.set_agent_state_to(AgentState.STOPPED)
# we made history, now is the time to rewrite it!
# the final state.history will be used by external scripts like evals, tests, etc.
# history will need to be complete WITH delegates events
# like the regular agent history, it does not include:
# - 'hidden' events, events with hidden=True
# - backend events (the default 'filtered out' types, types in self.filter_out)
start_id = self.state.start_id if self.state.start_id >= 0 else 0
end_id = (
self.state.end_id
if self.state.end_id >= 0
else self.event_stream.get_latest_event_id()
)
self.state.history = list(
self.event_stream.search_events(
start_id=start_id,
end_id=end_id,
reverse=False,
filter=self.agent_history_filter,
)
)
self.state_tracker.close(self.event_stream)
# unsubscribe from the event stream
# only the root parent controller subscribes to the event stream
@@ -257,14 +236,6 @@ class AgentController:
extra_merged = {'session_id': self.id, **extra}
getattr(logger, level)(message, extra=extra_merged, stacklevel=2)
def update_state_before_step(self) -> None:
self.state.iteration += 1
self.state.local_iteration += 1
async def update_state_after_step(self) -> None:
# update metrics especially for cost. Use deepcopy to avoid it being modified by agent._reset()
self.state.local_metrics = copy.deepcopy(self.agent.llm.metrics)
async def _react_to_exception(
self,
e: Exception,
@@ -390,10 +361,17 @@ class AgentController:
# If we have a delegate that is not finished or errored, forward events to it
if self.delegate is not None:
delegate_state = self.delegate.get_agent_state()
if delegate_state not in (
AgentState.FINISHED,
AgentState.ERROR,
AgentState.REJECTED,
if (
delegate_state
not in (
AgentState.FINISHED,
AgentState.ERROR,
AgentState.REJECTED,
)
or 'RuntimeError: Agent reached maximum iteration.'
in self.delegate.state.last_error
or 'RuntimeError:Agent reached maximum budget for conversation'
in self.delegate.state.last_error
):
# Forward the event to delegate and skip parent processing
asyncio.get_event_loop().run_until_complete(
@@ -412,9 +390,7 @@ class AgentController:
if hasattr(event, 'hidden') and event.hidden:
return
# if the event is not filtered out, add it to the history
if self.agent_history_filter.include(event):
self.state.history.append(event)
self.state_tracker.add_history(event)
if isinstance(event, Action):
await self._handle_action(event)
@@ -457,11 +433,9 @@ class AgentController:
elif isinstance(action, AgentFinishAction):
self.state.outputs = action.outputs
self.state.metrics.merge(self.state.local_metrics)
await self.set_agent_state_to(AgentState.FINISHED)
elif isinstance(action, AgentRejectAction):
self.state.outputs = action.outputs
self.state.metrics.merge(self.state.local_metrics)
await self.set_agent_state_to(AgentState.REJECTED)
async def _handle_observation(self, observation: Observation) -> None:
@@ -481,8 +455,10 @@ class AgentController:
log_level, str(observation_to_print), extra={'msg_type': 'OBSERVATION'}
)
# TODO: these metrics come from the draft editor, and they get accumulated into controller's state metrics and the agent's llm metrics
# In the future, we should have a more principled way to sharing metrics across all LLM instances for a given conversation
if observation.llm_metrics is not None:
self.agent.llm.metrics.merge(observation.llm_metrics)
self.state_tracker.merge_metrics(observation.llm_metrics)
# this happens for runnable actions and microagent actions
if self._pending_action and self._pending_action.id == observation.cause:
@@ -496,9 +472,6 @@ class AgentController:
if self.state.agent_state == AgentState.USER_REJECTED:
await self.set_agent_state_to(AgentState.AWAITING_USER_INPUT)
return
elif isinstance(observation, ErrorObservation):
if self.state.agent_state == AgentState.ERROR:
self.state.metrics.merge(self.state.local_metrics)
async def _handle_message_action(self, action: MessageAction) -> None:
"""Handles message actions from the event stream.
@@ -516,22 +489,6 @@ class AgentController:
str(action),
extra={'msg_type': 'ACTION', 'event_source': EventSource.USER},
)
# Extend max iterations when the user sends a message (only in non-headless mode)
if self._initial_max_iterations is not None and not self.headless_mode:
self.state.max_iterations = (
self.state.iteration + self._initial_max_iterations
)
if (
self.state.traffic_control_state == TrafficControlState.THROTTLING
or self.state.traffic_control_state == TrafficControlState.PAUSED
):
self.state.traffic_control_state = TrafficControlState.NORMAL
self.log(
'debug',
f'Extended max iterations to {self.state.max_iterations} after user message',
)
# try to retrieve microagents relevant to the user message
# set pending_action while we search for information
# if this is the first user message for this agent, matters for the microagent info type
first_user_message = self._first_user_message()
@@ -605,36 +562,16 @@ class AgentController:
return
if new_state in (AgentState.STOPPED, AgentState.ERROR):
# sync existing metrics BEFORE resetting the agent
await self.update_state_after_step()
self.state.metrics.merge(self.state.local_metrics)
self._reset()
elif (
new_state == AgentState.RUNNING
and self.state.agent_state == AgentState.PAUSED
# TODO: do we really need both THROTTLING and PAUSED states, or can we clean up one of them completely?
and self.state.traffic_control_state == TrafficControlState.THROTTLING
):
# user intends to interrupt traffic control and let the task resume temporarily
self.state.traffic_control_state = TrafficControlState.PAUSED
# User has chosen to deliberately continue - lets double the max iterations
if (
self.state.iteration is not None
and self.state.max_iterations is not None
and self._initial_max_iterations is not None
and not self.headless_mode
):
if self.state.iteration >= self.state.max_iterations:
self.state.max_iterations += self._initial_max_iterations
if (
self.state.metrics.accumulated_cost is not None
and self.max_budget_per_task is not None
and self._initial_max_budget_per_task is not None
):
if self.state.metrics.accumulated_cost >= self.max_budget_per_task:
self.max_budget_per_task += self._initial_max_budget_per_task
elif self._pending_action is not None and (
# User is allowing to check control limits and expand them if applicable
if (
self.state.agent_state == AgentState.ERROR
and new_state == AgentState.RUNNING
):
self.state_tracker.maybe_increase_control_flags_limits(self.headless_mode)
if self._pending_action is not None and (
new_state in (AgentState.USER_CONFIRMED, AgentState.USER_REJECTED)
):
if hasattr(self._pending_action, 'thought'):
@@ -659,6 +596,10 @@ class AgentController:
EventSource.ENVIRONMENT,
)
# Save state whenever agent state changes to ensure we don't lose state
# in case of crashes or unexpected circumstances
self.save_state()
def get_agent_state(self) -> AgentState:
"""Returns the current state of the agent.
@@ -686,19 +627,27 @@ class AgentController:
agent_cls: type[Agent] = Agent.get_cls(action.agent)
agent_config = self.agent_configs.get(action.agent, self.agent.config)
llm_config = self.agent_to_llm_config.get(action.agent, self.agent.llm.config)
llm = LLM(config=llm_config, retry_listener=self._notify_on_llm_retry)
# Make sure metrics are shared between parent and child for global accumulation
llm = LLM(
config=llm_config,
retry_listener=self.agent.llm.retry_listener,
metrics=self.state.metrics,
)
delegate_agent = agent_cls(llm=llm, config=agent_config)
# Take a snapshot of the current metrics before starting the delegate
state = State(
session_id=self.id.removesuffix('-delegate'),
inputs=action.inputs or {},
local_iteration=0,
iteration=self.state.iteration,
max_iterations=self.state.max_iterations,
iteration_flag=self.state.iteration_flag,
budget_flag=self.state.budget_flag,
delegate_level=self.state.delegate_level + 1,
# global metrics should be shared between parent and child
metrics=self.state.metrics,
# start on top of the stream
start_id=self.event_stream.get_latest_event_id() + 1,
parent_metrics_snapshot=self.state_tracker.get_metrics_snapshot(),
parent_iteration=self.state.iteration_flag.current_value,
)
self.log(
'debug',
@@ -708,10 +657,12 @@ class AgentController:
# Create the delegate with is_delegate=True so it does NOT subscribe directly
self.delegate = AgentController(
sid=self.id + '-delegate',
file_store=self.file_store,
user_id=self.user_id,
agent=delegate_agent,
event_stream=self.event_stream,
max_iterations=self.state.max_iterations,
max_budget_per_task=self.max_budget_per_task,
iteration_delta=self._initial_max_iterations,
budget_per_task_delta=self._initial_max_budget_per_task,
agent_to_llm_config=self.agent_to_llm_config,
agent_configs=self.agent_configs,
initial_state=state,
@@ -730,7 +681,13 @@ class AgentController:
delegate_state = self.delegate.get_agent_state()
# update iteration that is shared across agents
self.state.iteration = self.delegate.state.iteration
self.state.iteration_flag.current_value = (
self.delegate.state.iteration_flag.current_value
)
# Calculate delegate-specific metrics before closing the delegate
delegate_metrics = self.state.get_local_metrics()
logger.info(f'Local metrics for delegate: {delegate_metrics}')
# close the delegate controller before adding new events
asyncio.get_event_loop().run_until_complete(self.delegate.close())
@@ -743,8 +700,12 @@ class AgentController:
# prepare delegate result observation
# TODO: replace this with AI-generated summary (#2395)
# Filter out metrics from the formatted output to avoid clutter
display_outputs = {
k: v for k, v in delegate_outputs.items() if k != 'metrics'
}
formatted_output = ', '.join(
f'{key}: {value}' for key, value in delegate_outputs.items()
f'{key}: {value}' for key, value in display_outputs.items()
)
content = (
f'{self.delegate.agent.name} finishes task with {formatted_output}'
@@ -798,24 +759,16 @@ class AgentController:
self.log(
'debug',
f'LEVEL {self.state.delegate_level} LOCAL STEP {self.state.local_iteration} GLOBAL STEP {self.state.iteration}',
f'LEVEL {self.state.delegate_level} LOCAL STEP {self.state.get_local_step()} GLOBAL STEP {self.state.iteration_flag.current_value}',
extra={'msg_type': 'STEP'},
)
stop_step = False
if self.state.iteration >= self.state.max_iterations:
stop_step = await self._handle_traffic_control(
'iteration', self.state.iteration, self.state.max_iterations
)
if self.max_budget_per_task is not None:
current_cost = self.state.metrics.accumulated_cost
if current_cost > self.max_budget_per_task:
stop_step = await self._handle_traffic_control(
'budget', current_cost, self.max_budget_per_task
)
if stop_step:
logger.warning('Stopping agent due to traffic control')
return
# Ensure budget control flag is synchronized with the latest metrics.
# In the future, we should centralized the use of one LLM object per conversation.
# This will help us unify the cost for auto generating titles, running the condensor, etc.
# Before many microservices will touh the same llm cost field, we should sync with the budget flag for the controller
# and check that we haven't exceeded budget BEFORE executing an agent step.
self.state_tracker.sync_budget_flag_with_metrics()
if self._is_stuck():
await self._react_to_exception(
@@ -823,7 +776,13 @@ class AgentController:
)
return
self.update_state_before_step()
try:
self.state_tracker.run_control_flags()
except Exception as e:
logger.warning('Control flag limits hit')
await self._react_to_exception(e)
return
action: Action = NullAction()
if self._replay_manager.should_replay():
@@ -894,60 +853,9 @@ class AgentController:
self.event_stream.add_event(action, action._source) # type: ignore [attr-defined]
await self.update_state_after_step()
log_level = 'info' if LOG_ALL_EVENTS else 'debug'
self.log(log_level, str(action), extra={'msg_type': 'ACTION'})
def _notify_on_llm_retry(self, retries: int, max: int) -> None:
if self.status_callback is not None:
msg_id = 'STATUS$LLM_RETRY'
self.status_callback(
'info', msg_id, f'Retrying LLM request, {retries} / {max}'
)
async def _handle_traffic_control(
self, limit_type: str, current_value: float, max_value: float
) -> bool:
"""Handles agent state after hitting the traffic control limit.
Args:
limit_type (str): The type of limit that was hit.
current_value (float): The current value of the limit.
max_value (float): The maximum value of the limit.
"""
stop_step = False
if self.state.traffic_control_state == TrafficControlState.PAUSED:
self.log(
'debug', 'Hitting traffic control, temporarily resume upon user request'
)
self.state.traffic_control_state = TrafficControlState.NORMAL
else:
self.state.traffic_control_state = TrafficControlState.THROTTLING
# Format values as integers for iterations, keep decimals for budget
if limit_type == 'iteration':
current_str = str(int(current_value))
max_str = str(int(max_value))
else:
current_str = f'{current_value:.2f}'
max_str = f'{max_value:.2f}'
if self.headless_mode:
e = RuntimeError(
f'Agent reached maximum {limit_type} in headless mode. '
f'Current {limit_type}: {current_str}, max {limit_type}: {max_str}'
)
await self._react_to_exception(e)
else:
e = RuntimeError(
f'Agent reached maximum {limit_type}. '
f'Current {limit_type}: {current_str}, max {limit_type}: {max_str}. '
)
# FIXME: this isn't really an exception--we should have a different path
await self._react_to_exception(e)
stop_step = True
return stop_step
@property
def _pending_action(self) -> Action | None:
"""Get the current pending action with time tracking.
@@ -1015,150 +923,26 @@ class AgentController:
self,
state: State | None,
max_iterations: int,
max_budget_per_task: float | None,
confirmation_mode: bool = False,
) -> None:
"""Sets the initial state for the agent, either from the previous session, or from a parent agent, or by creating a new one.
Args:
state: The state to initialize with, or None to create a new state.
max_iterations: The maximum number of iterations allowed for the task.
confirmation_mode: Whether to enable confirmation mode.
"""
# state can come from:
# - the previous session, in which case it has history
# - from a parent agent, in which case it has no history
# - None / a new state
# If state is None, we create a brand new state and still load the event stream so we can restore the history
if state is None:
self.state = State(
session_id=self.id.removesuffix('-delegate'),
inputs={},
max_iterations=max_iterations,
confirmation_mode=confirmation_mode,
)
self.state.start_id = 0
self.log(
'info',
f'AgentController {self.id} - created new state. start_id: {self.state.start_id}',
)
else:
self.state = state
if self.state.start_id <= -1:
self.state.start_id = 0
self.log(
'info',
f'AgentController {self.id} initializing history from event {self.state.start_id}',
)
):
self.state_tracker.set_initial_state(
self.id,
self.agent,
state,
max_iterations,
max_budget_per_task,
confirmation_mode,
)
# Always load from the event stream to avoid losing history
self._init_history()
self.state_tracker._init_history(
self.event_stream,
)
def get_trajectory(self, include_screenshots: bool = False) -> list[dict]:
# state history could be partially hidden/truncated before controller is closed
assert self._closed
return [
event_to_trajectory(event, include_screenshots)
for event in self.state.history
]
def _init_history(self) -> None:
"""Initializes the agent's history from the event stream.
The history is a list of events that:
- Excludes events of types listed in self.filter_out
- Excludes events with hidden=True attribute
- For delegate events (between AgentDelegateAction and AgentDelegateObservation):
- Excludes all events between the action and observation
- Includes the delegate action and observation themselves
"""
# define range of events to fetch
# delegates start with a start_id and initially won't find any events
# otherwise we're restoring a previous session
start_id = self.state.start_id if self.state.start_id >= 0 else 0
end_id = (
self.state.end_id
if self.state.end_id >= 0
else self.event_stream.get_latest_event_id()
)
# sanity check
if start_id > end_id + 1:
self.log(
'warning',
f'start_id {start_id} is greater than end_id + 1 ({end_id + 1}). History will be empty.',
)
self.state.history = []
return
events: list[Event] = []
# Get rest of history
events_to_add = list(
self.event_stream.search_events(
start_id=start_id,
end_id=end_id,
reverse=False,
filter=self.agent_history_filter,
)
)
events.extend(events_to_add)
# Find all delegate action/observation pairs
delegate_ranges: list[tuple[int, int]] = []
delegate_action_ids: list[int] = [] # stack of unmatched delegate action IDs
for event in events:
if isinstance(event, AgentDelegateAction):
delegate_action_ids.append(event.id)
# Note: we can get agent=event.agent and task=event.inputs.get('task','')
# if we need to track these in the future
elif isinstance(event, AgentDelegateObservation):
# Match with most recent unmatched delegate action
if not delegate_action_ids:
self.log(
'warning',
f'Found AgentDelegateObservation without matching action at id={event.id}',
)
continue
action_id = delegate_action_ids.pop()
delegate_ranges.append((action_id, event.id))
# Filter out events between delegate action/observation pairs
if delegate_ranges:
filtered_events: list[Event] = []
current_idx = 0
for start_id, end_id in sorted(delegate_ranges):
# Add events before delegate range
filtered_events.extend(
event for event in events[current_idx:] if event.id < start_id
)
# Add delegate action and observation
filtered_events.extend(
event for event in events if event.id in (start_id, end_id)
)
# Update index to after delegate range
current_idx = next(
(i for i, e in enumerate(events) if e.id > end_id), len(events)
)
# Add any remaining events after last delegate range
filtered_events.extend(events[current_idx:])
self.state.history = filtered_events
else:
self.state.history = events
# make sure history is in sync
self.state.start_id = start_id
return self.state_tracker.get_trajectory(include_screenshots)
def _handle_long_context_error(self) -> None:
# When context window is exceeded, keep roughly half of agent interactions
@@ -1359,7 +1143,7 @@ class AgentController:
action: The action to attach metrics to
"""
# Get metrics from agent LLM
agent_metrics = self.agent.llm.metrics
agent_metrics = self.state.metrics
# Get metrics from condenser LLM if it exists
condenser_metrics: TokenUsage | None = None
@@ -1390,10 +1174,10 @@ class AgentController:
# Log the metrics information for debugging
# Get the latest usage directly from the agent's metrics
latest_usage = None
if self.agent.llm.metrics.token_usages:
latest_usage = self.agent.llm.metrics.token_usages[-1]
if self.state.metrics.token_usages:
latest_usage = self.state.metrics.token_usages[-1]
accumulated_usage = self.agent.llm.metrics.accumulated_token_usage
accumulated_usage = self.state.metrics.accumulated_token_usage
self.log(
'debug',
f'Action metrics - accumulated_cost: {metrics.accumulated_cost}, '
@@ -1481,3 +1265,6 @@ class AgentController:
None,
)
return self._cached_first_user_message
def save_state(self):
self.state_tracker.save_state()