Compare commits

...

1 Commits

Author SHA1 Message Date
openhands 11def95da0 CLI: Implement /clear command to start new conversations
- Modified /clear command to create new conversation and runner instances instead of just clearing screen
- Updated /new command to match /clear functionality for consistency
- Updated help text: '/clear': 'Start a new conversation from scratch'
- Added error handling for conversation setup failures
- Added test to verify /clear command description is correct
- Applied code formatting with ruff

Fixes #11121

Co-authored-by: openhands <openhands@all-hands.dev>
2025-09-25 16:01:29 +00:00
12 changed files with 156 additions and 140 deletions
+32 -4
View File
@@ -91,7 +91,22 @@ def run_cli_entry() -> None:
continue
elif command == "/clear":
display_welcome(session_id)
print_formatted_text(
HTML("<yellow>Starting new conversation...</yellow>")
)
# Create a new conversation to clear context
try:
conversation = setup_conversation()
runner = ConversationRunner(conversation)
session_id = str(uuid.uuid4())[:8]
display_welcome(session_id)
print_formatted_text(
HTML("<green>✓ New conversation started</green>")
)
except Exception as e:
print_formatted_text(
HTML(f"<red>Error creating new conversation: {e}</red>")
)
continue
elif command == "/help":
display_help()
@@ -108,7 +123,9 @@ def run_cli_entry() -> None:
continue
elif command == "/confirm":
runner.toggle_confirmation_mode()
new_status = "enabled" if runner.is_confirmation_mode_enabled else "disabled"
new_status = (
"enabled" if runner.is_confirmation_mode_enabled else "disabled"
)
print_formatted_text(
HTML(f"<yellow>Confirmation mode {new_status}</yellow>")
)
@@ -117,8 +134,19 @@ def run_cli_entry() -> None:
print_formatted_text(
HTML("<yellow>Starting new conversation...</yellow>")
)
session_id = str(uuid.uuid4())[:8]
display_welcome(session_id)
# Create a new conversation to clear context
try:
conversation = setup_conversation()
runner = ConversationRunner(conversation)
session_id = str(uuid.uuid4())[:8]
display_welcome(session_id)
print_formatted_text(
HTML("<green>✓ New conversation started</green>")
)
except Exception as e:
print_formatted_text(
HTML(f"<red>Error creating new conversation: {e}</red>")
)
continue
elif command == "/resume":
if not (
+5 -14
View File
@@ -3,7 +3,7 @@ from openhands.sdk.security.confirmation_policy import (
AlwaysConfirm,
NeverConfirm,
ConfirmRisky,
ConfirmationPolicyBase
ConfirmationPolicyBase,
)
from openhands.sdk.conversation.state import AgentExecutionStatus
from openhands.sdk.event.utils import get_unmatched_actions
@@ -30,11 +30,11 @@ class ConversationRunner:
else:
self.set_confirmation_policy(AlwaysConfirm())
def set_confirmation_policy(self, confirmation_policy: ConfirmationPolicyBase) -> None:
def set_confirmation_policy(
self, confirmation_policy: ConfirmationPolicyBase
) -> None:
self.conversation.set_confirmation_policy(confirmation_policy)
def _start_listener(self) -> None:
self.listener = PauseListener(on_pause=self.conversation.pause)
self.listener.start()
@@ -121,28 +121,23 @@ class ConversationRunner:
if not pending_actions:
return UserConfirmation.ACCEPT
result = ask_user_confirmation(
pending_actions,
isinstance(self.conversation.state.confirmation_policy, ConfirmRisky)
isinstance(self.conversation.state.confirmation_policy, ConfirmRisky),
)
decision = result.decision
policy_change = result.policy_change
if decision == UserConfirmation.REJECT:
self.conversation.reject_pending_actions(
result.reason or "User rejected the actions"
)
return decision
if decision == UserConfirmation.DEFER:
self.conversation.pause()
return decision
if isinstance(policy_change, NeverConfirm):
print_formatted_text(
HTML(
@@ -152,8 +147,6 @@ class ConversationRunner:
self.set_confirmation_policy(policy_change)
return decision
if isinstance(policy_change, ConfirmRisky):
print_formatted_text(
HTML(
@@ -164,8 +157,6 @@ class ConversationRunner:
self.set_confirmation_policy(policy_change)
return decision
# Accept action without changing existing policies
assert decision == UserConfirmation.ACCEPT
return decision
+7 -6
View File
@@ -1,7 +1,4 @@
from openhands.sdk import (
Conversation,
BaseConversation
)
from openhands.sdk import Conversation, BaseConversation
from openhands_cli.tui.settings.store import AgentStore
from prompt_toolkit import HTML, print_formatted_text
from openhands.tools.execute_bash import BashTool
@@ -16,12 +13,14 @@ register_tool("TaskTrackerTool", TaskTrackerTool)
class MissingAgentSpec(Exception):
"""Raised when agent specification is not found or invalid."""
pass
def setup_conversation() -> BaseConversation:
"""
Setup the conversation with agent.
Raises:
MissingAgentSpec: If agent specification is not found or invalid.
"""
@@ -29,7 +28,9 @@ def setup_conversation() -> BaseConversation:
agent_store = AgentStore()
agent = agent_store.load()
if not agent:
raise MissingAgentSpec("Agent specification not found. Please configure your agent settings.")
raise MissingAgentSpec(
"Agent specification not found. Please configure your agent settings."
)
# Create conversation - agent context is now set in AgentStore.load()
conversation = Conversation(agent=agent)
@@ -35,6 +35,7 @@ def main() -> None:
except Exception as e:
print_formatted_text(HTML(f"<red>Error starting agent chat: {e}</red>"))
import traceback
traceback.print_exc()
raise
@@ -21,6 +21,7 @@ from prompt_toolkit.widgets import Frame, TextArea
from openhands_cli.pt_style import COLOR_GREY
class SettingsScreen:
def __init__(self, conversation: Conversation | None = None):
self.file_store = LocalFileStore(PERSISTENCE_DIR)
@@ -39,7 +40,7 @@ class SettingsScreen:
labels_and_values = []
if not advanced_llm_settings:
# Attempt to determine provider, fallback if not directly available
provider = llm.model.split('/')[0] if '/' in llm.model else 'Unknown'
provider = llm.model.split("/")[0] if "/" in llm.model else "Unknown"
labels_and_values.extend(
[
@@ -52,15 +53,27 @@ class SettingsScreen:
[
(" Custom Model", llm.model),
(" Base URL", llm.base_url),
]
)
labels_and_values.extend([
(" API Key", "********" if llm.api_key else "Not Set"),
(" Confirmation Mode", "Enabled" if self.conversation.confirmation_policy_active else "Disabled"),
(" Memory Condensation", "Enabled" if agent_spec.condenser else "Disabled"),
(" Configuration File", os.path.join(PERSISTENCE_DIR, AGENT_SETTINGS_PATH))
])
labels_and_values.extend(
[
(" API Key", "********" if llm.api_key else "Not Set"),
(
" Confirmation Mode",
"Enabled"
if self.conversation.confirmation_policy_active
else "Disabled",
),
(
" Memory Condensation",
"Enabled" if agent_spec.condenser else "Disabled",
),
(
" Configuration File",
os.path.join(PERSISTENCE_DIR, AGENT_SETTINGS_PATH),
),
]
)
# Calculate max widths for alignment
# Ensure values are strings for len() calculation
@@ -115,11 +128,11 @@ class SettingsScreen:
step_counter,
provider,
self.conversation.agent.llm.api_key if self.conversation else None,
escapable=escapable
escapable=escapable,
)
save_settings_confirmation()
except KeyboardInterrupt:
print_formatted_text(HTML('\n<red>Cancelled settings change.</red>'))
print_formatted_text(HTML("\n<red>Cancelled settings change.</red>"))
return
# Store the collected settings for persistence
@@ -133,74 +146,43 @@ class SettingsScreen:
base_url = prompt_base_url(step_counter)
api_key = prompt_api_key(
step_counter,
custom_model.split('/')[0] if len(custom_model.split('/')) > 1 else '',
custom_model.split("/")[0] if len(custom_model.split("/")) > 1 else "",
self.conversation.agent.llm.api_key if self.conversation else None,
escapable=escapable
escapable=escapable,
)
memory_condensation = choose_memory_condensation(step_counter)
# Confirm save
save_settings_confirmation()
except KeyboardInterrupt:
print_formatted_text(HTML('\n<red>Cancelled settings change.</red>'))
print_formatted_text(HTML("\n<red>Cancelled settings change.</red>"))
return
# Store the collected settings for persistence
self._save_advanced_settings(
custom_model,
base_url,
api_key,
memory_condensation
custom_model, base_url, api_key, memory_condensation
)
def _save_llm_settings(
self,
model,
api_key,
base_url: str | None = None
) -> None:
llm = LLM(
model=model,
api_key=api_key,
base_url=base_url,
service_id="agent"
)
def _save_llm_settings(self, model, api_key, base_url: str | None = None) -> None:
llm = LLM(model=model, api_key=api_key, base_url=base_url, service_id="agent")
agent = self.agent_store.load()
if not agent:
agent = get_default_agent(
llm=llm,
working_dir=WORK_DIR,
cli_mode=True
)
agent = get_default_agent(llm=llm, working_dir=WORK_DIR, cli_mode=True)
agent = agent.model_copy(update={"llm": llm})
self.agent_store.save(agent)
def _save_advanced_settings(
self,
custom_model: str,
base_url: str,
api_key: str,
memory_condensation: bool
self, custom_model: str, base_url: str, api_key: str, memory_condensation: bool
):
self._save_llm_settings(
custom_model,
api_key,
base_url=base_url
)
self._save_llm_settings(custom_model, api_key, base_url=base_url)
agent_spec = self.agent_store.load()
if not agent_spec:
return
if not memory_condensation:
agent_spec.model_copy(update={"condenser": None})
self.agent_store.save(agent_spec)
@@ -9,6 +9,7 @@ from prompt_toolkit import HTML, print_formatted_text
class AgentStore:
"""Single source of truth for persisting/retrieving AgentSpec."""
def __init__(self) -> None:
self.file_store = LocalFileStore(root=PERSISTENCE_DIR)
@@ -21,27 +22,27 @@ class AgentStore:
updated_tools = get_default_tools(
working_dir=WORK_DIR,
persistence_dir=PERSISTENCE_DIR,
enable_browser=False
enable_browser=False,
)
# Create agent context with current working directory
agent_context = AgentContext(
system_message_suffix=f"You current working directory is: {WORK_DIR}",
)
agent = agent.model_copy(update={
"tools": updated_tools,
"agent_context": agent_context
})
agent = agent.model_copy(
update={"tools": updated_tools, "agent_context": agent_context}
)
return agent
except FileNotFoundError:
return None
except Exception:
print_formatted_text(HTML("\n<red>Agent configuration file is corrupted!</red>"))
print_formatted_text(
HTML("\n<red>Agent configuration file is corrupted!</red>")
)
return None
def save(self, agent: Agent) -> None:
serialized_spec = agent.model_dump_json(context={"expose_secrets": True})
self.file_store.write(AGENT_SETTINGS_PATH, serialized_spec)
+1 -1
View File
@@ -15,7 +15,7 @@ DEFAULT_STYLE = get_cli_style()
COMMANDS = {
"/exit": "Exit the application",
"/help": "Display available commands",
"/clear": "Clear the screen",
"/clear": "Start a new conversation from scratch",
"/status": "Display conversation details",
"/confirm": "Toggle confirmation mode on/off",
"/new": "Create a new conversation",
@@ -9,9 +9,9 @@ from openhands_cli.user_actions.settings_action import (
from openhands_cli.user_actions.types import UserConfirmation
__all__ = [
'ask_user_confirmation',
'exit_session_confirmation',
'UserConfirmation',
'settings_type_confirmation',
'choose_llm_provider',
"ask_user_confirmation",
"exit_session_confirmation",
"UserConfirmation",
"settings_type_confirmation",
"choose_llm_provider",
]
@@ -2,7 +2,7 @@ from prompt_toolkit import HTML, print_formatted_text
from openhands.sdk.security.confirmation_policy import (
ConfirmRisky,
SecurityRisk,
NeverConfirm
NeverConfirm,
)
from openhands_cli.user_actions.types import UserConfirmation, ConfirmationResult
@@ -10,8 +10,7 @@ from openhands_cli.user_actions.utils import cli_confirm, cli_text_input
def ask_user_confirmation(
pending_actions: list,
using_risk_based_policy: bool = False
pending_actions: list, using_risk_based_policy: bool = False
) -> ConfirmationResult:
"""Ask user to confirm pending actions.
@@ -65,7 +64,7 @@ def ask_user_confirmation(
elif index == 2:
try:
reason_result = cli_text_input(
'Please enter your reason for rejecting these actions: '
"Please enter your reason for rejecting these actions: "
)
except Exception:
return ConfirmationResult(decision=UserConfirmation.DEFER)
@@ -73,10 +72,10 @@ def ask_user_confirmation(
# Support both string return and (reason, cancelled) tuple for tests
cancelled = False
if isinstance(reason_result, tuple) and len(reason_result) >= 1:
reason = reason_result[0] or ''
reason = reason_result[0] or ""
cancelled = bool(reason_result[1]) if len(reason_result) > 1 else False
else:
reason = str(reason_result or '').strip()
reason = str(reason_result or "").strip()
if cancelled:
return ConfirmationResult(decision=UserConfirmation.DEFER)
@@ -84,13 +83,12 @@ def ask_user_confirmation(
return ConfirmationResult(decision=UserConfirmation.REJECT, reason=reason)
elif index == 3:
return ConfirmationResult(
decision=UserConfirmation.ACCEPT,
policy_change=NeverConfirm()
decision=UserConfirmation.ACCEPT, policy_change=NeverConfirm()
)
elif index == 4:
return ConfirmationResult(
decision=UserConfirmation.ACCEPT,
policy_change=ConfirmRisky(threshold=SecurityRisk.HIGH)
policy_change=ConfirmRisky(threshold=SecurityRisk.HIGH),
)
return ConfirmationResult(decision=UserConfirmation.REJECT)
@@ -5,10 +5,7 @@ from prompt_toolkit.completion import FuzzyWordCompleter
from pydantic import SecretStr
from openhands.sdk.llm import (
VERIFIED_MODELS,
UNVERIFIED_MODELS_EXCLUDING_BEDROCK
)
from openhands.sdk.llm import VERIFIED_MODELS, UNVERIFIED_MODELS_EXCLUDING_BEDROCK
from openhands_cli.user_actions.utils import cli_confirm, cli_text_input
from prompt_toolkit.validation import Validator, ValidationError
@@ -24,35 +21,37 @@ class NonEmptyValueValidator(Validator):
class SettingsType(Enum):
BASIC = 'basic'
ADVANCED = 'advanced'
BASIC = "basic"
ADVANCED = "advanced"
def settings_type_confirmation() -> SettingsType:
question = 'Which settings would you like to modify?'
question = "Which settings would you like to modify?"
choices = [
'LLM (Basic)',
'LLM (Advanced)',
'Go back',
"LLM (Basic)",
"LLM (Advanced)",
"Go back",
]
index = cli_confirm(question, choices)
if choices[index] == 'Go back':
if choices[index] == "Go back":
raise KeyboardInterrupt
options_map = {
0: SettingsType.BASIC,
1: SettingsType.ADVANCED
}
options_map = {0: SettingsType.BASIC, 1: SettingsType.ADVANCED}
return options_map.get(index)
def choose_llm_provider(step_counter: StepCounter, escapable=True) -> str:
question = step_counter.next_step('Select LLM Provider (TAB for options, CTRL-c to cancel): ')
options = list(VERIFIED_MODELS.keys()).copy() + list(UNVERIFIED_MODELS_EXCLUDING_BEDROCK.keys()).copy()
alternate_option = 'Select another provider'
question = step_counter.next_step(
"Select LLM Provider (TAB for options, CTRL-c to cancel): "
)
options = (
list(VERIFIED_MODELS.keys()).copy()
+ list(UNVERIFIED_MODELS_EXCLUDING_BEDROCK.keys()).copy()
)
alternate_option = "Select another provider"
display_options = options[:4] + [alternate_option]
@@ -61,7 +60,9 @@ def choose_llm_provider(step_counter: StepCounter, escapable=True) -> str:
if display_options[index] != alternate_option:
return chosen_option
question = step_counter.existing_step('Type LLM Provider (TAB to complete, CTRL-c to cancel): ')
question = step_counter.existing_step(
"Type LLM Provider (TAB to complete, CTRL-c to cancel): "
)
return cli_text_input(
question, escapable=True, completer=FuzzyWordCompleter(options, WORD=True)
)
@@ -70,16 +71,20 @@ def choose_llm_provider(step_counter: StepCounter, escapable=True) -> str:
def choose_llm_model(step_counter: StepCounter, provider: str, escapable=True) -> str:
"""Choose LLM model using spec-driven approach. Return (model, deferred)."""
models = VERIFIED_MODELS.get(provider, []) + UNVERIFIED_MODELS_EXCLUDING_BEDROCK.get(provider, [])
models = VERIFIED_MODELS.get(
provider, []
) + UNVERIFIED_MODELS_EXCLUDING_BEDROCK.get(provider, [])
if provider == 'openhands':
if provider == "openhands":
question = (
step_counter.next_step('Select Available OpenHands Model:\n')
+ 'LLM usage is billed at the providers rates with no markup. Details: https://docs.all-hands.dev/usage/llms/openhands-llms'
step_counter.next_step("Select Available OpenHands Model:\n")
+ "LLM usage is billed at the providers rates with no markup. Details: https://docs.all-hands.dev/usage/llms/openhands-llms"
)
else:
question = step_counter.next_step('Select LLM Model (TAB for options, CTRL-c to cancel): ')
alternate_option = 'Select another model'
question = step_counter.next_step(
"Select LLM Model (TAB for options, CTRL-c to cancel): "
)
alternate_option = "Select another model"
display_options = models[:4] + [alternate_option]
index = cli_confirm(question, display_options, escapable=escapable)
chosen_option = display_options[index]
@@ -87,19 +92,20 @@ def choose_llm_model(step_counter: StepCounter, provider: str, escapable=True) -
if chosen_option != alternate_option:
return chosen_option
question = step_counter.existing_step('Type model id (TAB to complete, CTRL-c to cancel): ')
question = step_counter.existing_step(
"Type model id (TAB to complete, CTRL-c to cancel): "
)
return cli_text_input(
question, escapable=True, completer=FuzzyWordCompleter(models, WORD=True)
)
def prompt_api_key(
step_counter: StepCounter,
provider: str,
existing_api_key: SecretStr | None = None,
escapable=True
escapable=True,
) -> str:
helper_text = (
"\nYou can find your OpenHands LLM API Key in the API Keys tab of OpenHands Cloud: "
@@ -109,17 +115,19 @@ def prompt_api_key(
)
if existing_api_key:
masked_key = existing_api_key.get_secret_value()[:3] + '***'
question = f'Enter API Key [{masked_key}] (CTRL-c to cancel, ENTER to keep current, type new to change): '
masked_key = existing_api_key.get_secret_value()[:3] + "***"
question = f"Enter API Key [{masked_key}] (CTRL-c to cancel, ENTER to keep current, type new to change): "
# For existing keys, allow empty input to keep current key
validator = None
else:
question = 'Enter API Key (CTRL-c to cancel): '
question = "Enter API Key (CTRL-c to cancel): "
# For new keys, require non-empty input
validator = NonEmptyValueValidator()
question = helper_text + step_counter.next_step(question)
return cli_text_input(question, escapable=escapable, validator=validator, is_password=True)
return cli_text_input(
question, escapable=escapable, validator=validator, is_password=True
)
# Advanced settings functions
@@ -132,13 +140,15 @@ def prompt_custom_model(step_counter: StepCounter, escapable=True) -> str:
def prompt_base_url(step_counter: StepCounter, escapable=True) -> str:
"""Prompt for base URL."""
question = step_counter.next_step("Base URL (CTRL-c to cancel): ")
return cli_text_input(question, escapable=escapable, validator=NonEmptyValueValidator())
return cli_text_input(
question, escapable=escapable, validator=NonEmptyValueValidator()
)
def choose_memory_condensation(step_counter: StepCounter, escapable=True) -> bool:
"""Choose memory condensation setting."""
question = step_counter.next_step("Memory Condensation (CTRL-c to cancel): ")
choices = ['Enable', 'Disable']
choices = ["Enable", "Disable"]
index = cli_confirm(question, choices, escapable=escapable)
return index == 0 # True for Enable, False for Disable
@@ -146,9 +156,9 @@ def choose_memory_condensation(step_counter: StepCounter, escapable=True) -> boo
def save_settings_confirmation() -> bool:
"""Prompt user to confirm saving settings."""
question = 'Save new settings? (They will take effect after restart)'
discard = 'No, discard'
options = ['Yes, save', discard]
question = "Save new settings? (They will take effect after restart)"
discard = "No, discard"
options = ["Yes, save", discard]
index = cli_confirm(question, options)
if options[index] == discard:
@@ -106,7 +106,7 @@ def cli_text_input(
escapable: bool = True,
completer: Completer | None = None,
validator: Validator = None,
is_password: bool = False
is_password: bool = False,
) -> str:
"""Prompt user to enter text input with optional validation.
@@ -126,15 +126,14 @@ def cli_text_input(
if escapable:
@kb.add('c-c')
@kb.add("c-c")
def _(event: KeyPressEvent) -> None:
raise KeyboardInterrupt()
@kb.add('c-p')
@kb.add("c-p")
def _(event: KeyPressEvent) -> None:
raise KeyboardInterrupt()
reason = str(
prompt(
question,
@@ -142,7 +141,7 @@ def cli_text_input(
key_bindings=kb,
completer=completer,
is_password=is_password,
validator=validator
validator=validator,
)
)
return reason.strip()
+5
View File
@@ -92,3 +92,8 @@ def test_commands_dict() -> None:
assert command.startswith('/')
assert isinstance(description, str)
assert len(description) > 0
def test_clear_command_description() -> None:
"""Test that /clear command has the correct description."""
assert COMMANDS['/clear'] == "Start a new conversation from scratch"