mirror of
https://github.com/All-Hands-AI/OpenHands.git
synced 2026-04-29 03:00:45 -04:00
Compare commits
64 Commits
fix/git-di
...
alona/fix-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1dfbe9390c | ||
|
|
b442637aad | ||
|
|
61e966f877 | ||
|
|
846a9eed7d | ||
|
|
9741207cea | ||
|
|
cb7293b5e0 | ||
|
|
caf3852f59 | ||
|
|
e1e53ca5b6 | ||
|
|
c000f15b0c | ||
|
|
59553e943b | ||
|
|
eb7e800166 | ||
|
|
bcdd410105 | ||
|
|
419eae1878 | ||
|
|
04b3b8a035 | ||
|
|
f98a00def9 | ||
|
|
7a6a759ec6 | ||
|
|
673c455007 | ||
|
|
d5a77719fa | ||
|
|
a77d016a28 | ||
|
|
67f6881ab0 | ||
|
|
072bb0e29e | ||
|
|
aee4f6bcc7 | ||
|
|
04ca22ee1d | ||
|
|
c321c4fc19 | ||
|
|
345cd03ec2 | ||
|
|
045a78e2d7 | ||
|
|
90db91ca84 | ||
|
|
08d1d78525 | ||
|
|
cbe02a405b | ||
|
|
e1874a9b80 | ||
|
|
43a3f38cf8 | ||
|
|
be14d3275f | ||
|
|
8fea63bdef | ||
|
|
3fd0311eac | ||
|
|
6f8fc1142b | ||
|
|
9c9a3f1d5c | ||
|
|
33253292e5 | ||
|
|
8757476ff4 | ||
|
|
676632987d | ||
|
|
4cd33de9b3 | ||
|
|
a85e6e7202 | ||
|
|
a42e7e6283 | ||
|
|
7f413b27ee | ||
|
|
8bb1df6d01 | ||
|
|
92d194286b | ||
|
|
ed52f1c740 | ||
|
|
d078baa918 | ||
|
|
a21db6b863 | ||
|
|
7a07520654 | ||
|
|
60d52b91e6 | ||
|
|
44078776c3 | ||
|
|
85ee18ca0b | ||
|
|
da4c3a4ec0 | ||
|
|
84f5dd0332 | ||
|
|
8e397de517 | ||
|
|
fc2a9158b0 | ||
|
|
437cbc57c0 | ||
|
|
bce166f7a1 | ||
|
|
13fe3589dd | ||
|
|
8a0355ede5 | ||
|
|
44a42a2ee2 | ||
|
|
6a87f1857d | ||
|
|
6c4af040bd | ||
|
|
ba4c593049 |
@@ -27,7 +27,7 @@ repos:
|
||||
- id: ruff
|
||||
entry: ruff check --config enterprise/dev_config/python/ruff.toml
|
||||
types_or: [python, pyi, jupyter]
|
||||
args: [--fix]
|
||||
args: [--fix, --show-fixes]
|
||||
files: ^enterprise/
|
||||
# Run the formatter.
|
||||
- id: ruff-format
|
||||
|
||||
@@ -227,12 +227,23 @@ class SaasUserAuth(UserAuth):
|
||||
def get_api_key_from_header(request: Request):
|
||||
auth_header = request.headers.get('Authorization')
|
||||
if auth_header and auth_header.startswith('Bearer '):
|
||||
return auth_header.replace('Bearer ', '')
|
||||
api_key = auth_header.replace('Bearer ', '')
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Got API key from Authorization header: '
|
||||
f'key_preview={api_key[:10] if api_key else "None"}...'
|
||||
)
|
||||
return api_key
|
||||
|
||||
# This is a temp hack
|
||||
# Streamable HTTP MCP Client works via redirect requests, but drops the Authorization header for reason
|
||||
# We include `X-Session-API-Key` header by default due to nested runtimes, so it used as a drop in replacement here
|
||||
return request.headers.get('X-Session-API-Key')
|
||||
session_api_key = request.headers.get('X-Session-API-Key')
|
||||
if session_api_key:
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Got API key from X-Session-API-Key header: '
|
||||
f'key_preview={session_api_key[:10] if session_api_key else "None"}...'
|
||||
)
|
||||
return session_api_key
|
||||
|
||||
|
||||
async def saas_user_auth_from_bearer(request: Request) -> SaasUserAuth | None:
|
||||
@@ -259,7 +270,11 @@ async def saas_user_auth_from_cookie(request: Request) -> SaasUserAuth | None:
|
||||
try:
|
||||
signed_token = request.cookies.get('keycloak_auth')
|
||||
if not signed_token:
|
||||
logger.info('[TOKEN_DEBUG] No keycloak_auth cookie found in request')
|
||||
return None
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Found keycloak_auth cookie, size={len(signed_token)}'
|
||||
)
|
||||
return await saas_user_auth_from_signed_token(signed_token)
|
||||
except Exception as exc:
|
||||
raise CookieError from exc
|
||||
@@ -272,6 +287,10 @@ async def saas_user_auth_from_signed_token(signed_token: str) -> SaasUserAuth:
|
||||
logger.debug('saas_user_auth_from_signed_token:decoded')
|
||||
access_token = decoded['access_token']
|
||||
refresh_token = decoded['refresh_token']
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Cookie tokens: '
|
||||
f'refresh_token_preview={refresh_token[:20] if refresh_token else "None"}...'
|
||||
)
|
||||
logger.debug(
|
||||
'saas_user_auth_from_signed_token',
|
||||
extra={
|
||||
@@ -287,6 +306,35 @@ async def saas_user_auth_from_signed_token(signed_token: str) -> SaasUserAuth:
|
||||
user_id = access_token_payload['sub']
|
||||
email = access_token_payload['email']
|
||||
email_verified = access_token_payload['email_verified']
|
||||
|
||||
# Check if we have an offline token in the database
|
||||
logger.info(f'[TOKEN_DEBUG] Checking for offline token for user {user_id}')
|
||||
try:
|
||||
offline_token = await token_manager.load_offline_token(user_id)
|
||||
if offline_token:
|
||||
# Compare tokens definitively
|
||||
tokens_match = offline_token == refresh_token
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Token comparison: '
|
||||
f'TOKENS_ARE_{"SAME" if tokens_match else "DIFFERENT"}! '
|
||||
f'Cookie len={len(refresh_token) if refresh_token else 0}, '
|
||||
f'DB len={len(offline_token) if offline_token else 0}'
|
||||
)
|
||||
if not tokens_match:
|
||||
# Log first 50 chars for better comparison
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Cookie token: {refresh_token[:50] if refresh_token else "None"}...'
|
||||
)
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] DB offline token: {offline_token[:50] if offline_token else "None"}...'
|
||||
)
|
||||
# TODO: Consider using offline_token instead of refresh_token
|
||||
# refresh_token = offline_token
|
||||
else:
|
||||
logger.info('[TOKEN_DEBUG] No offline token in DB, using cookie token')
|
||||
except Exception as e:
|
||||
logger.error(f'[TOKEN_DEBUG] Error loading offline token: {e}')
|
||||
|
||||
logger.debug('saas_user_auth_from_signed_token:return')
|
||||
|
||||
return SaasUserAuth(
|
||||
@@ -304,6 +352,11 @@ async def get_user_auth_from_keycloak_id(keycloak_user_id: str) -> UserAuth:
|
||||
offline_token = await token_manager.load_offline_token(keycloak_user_id)
|
||||
if offline_token is None:
|
||||
logger.info('no_offline_token_found')
|
||||
else:
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Using offline token from DB for user {keycloak_user_id}: '
|
||||
f'{offline_token[:20] if offline_token else "None"}...'
|
||||
)
|
||||
|
||||
user_auth = SaasUserAuth(
|
||||
user_id=keycloak_user_id,
|
||||
|
||||
@@ -266,6 +266,10 @@ class TokenManager:
|
||||
user_id = user_info.get('sub')
|
||||
username = user_info.get('preferred_username')
|
||||
logger.info(f'Getting token for user {username} and IDP {idp}')
|
||||
logger.info(
|
||||
'[TOKEN_SOURCE_DEBUG] get_idp_token called with access_token '
|
||||
'(from cookie/request), will check DB for stored tokens'
|
||||
)
|
||||
token_store = await AuthTokenStore.get_instance(
|
||||
keycloak_user_id=user_id, idp=idp
|
||||
)
|
||||
@@ -439,8 +443,16 @@ class TokenManager:
|
||||
0 if refresh_expires_in == 0 else current_time + refresh_expires_in
|
||||
)
|
||||
|
||||
# Log detailed expiration info for debugging
|
||||
access_expires_hours = expires_in / 3600 if expires_in > 0 else 'Never'
|
||||
refresh_expires_days = (
|
||||
refresh_expires_in / 86400 if refresh_expires_in > 0 else 'Never'
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f'Token refresh successful. New access token expires at: {access_token_expires_at}, refresh token expires at: {refresh_token_expires_at}'
|
||||
f'[TOKEN_DEBUG] Token refresh successful. Access token expires in: {access_expires_hours} hours, '
|
||||
f'Refresh token expires in: {refresh_expires_days} days. '
|
||||
f'Raw values - expires_in: {expires_in}s, refresh_expires_in: {refresh_expires_in}s'
|
||||
)
|
||||
return {
|
||||
'access_token': access_token,
|
||||
@@ -457,15 +469,32 @@ class TokenManager:
|
||||
async def get_idp_token_from_offline_token(
|
||||
self, offline_token: str, idp: ProviderType
|
||||
) -> str:
|
||||
logger.info('Getting IDP token from offline token')
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Getting {idp} token from offline token. '
|
||||
f'Token preview: {offline_token[:20] if offline_token else "None"}...'
|
||||
)
|
||||
logger.info(
|
||||
f'[TOKEN_SOURCE_DEBUG] Using OFFLINE token (from DB) to refresh {idp} token, '
|
||||
f'token_length={len(offline_token) if offline_token else 0}'
|
||||
)
|
||||
|
||||
try:
|
||||
logger.info('[TOKEN_DEBUG] Calling Keycloak to refresh offline token...')
|
||||
tokens = await get_keycloak_openid(self.external).a_refresh_token(
|
||||
offline_token
|
||||
)
|
||||
logger.info('[TOKEN_DEBUG] Keycloak refresh successful!')
|
||||
return await self.get_idp_token(tokens['access_token'], idp)
|
||||
except KeycloakConnectionError:
|
||||
logger.exception('KeycloakConnectionError when refreshing token')
|
||||
except KeycloakConnectionError as e:
|
||||
logger.error(
|
||||
f'[TOKEN_DEBUG] KeycloakConnectionError when refreshing token: {e}'
|
||||
)
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f'[TOKEN_DEBUG] Unexpected error refreshing Keycloak token: '
|
||||
f'{type(e).__name__}: {e}'
|
||||
)
|
||||
raise
|
||||
|
||||
@retry(
|
||||
|
||||
@@ -8,6 +8,7 @@ from server.clustered_conversation_manager import ClusteredConversationManager
|
||||
from server.saas_nested_conversation_manager import SaasNestedConversationManager
|
||||
|
||||
from openhands.core.config import LLMConfig, OpenHandsConfig
|
||||
from openhands.core.logger import openhands_logger as logger
|
||||
from openhands.events.action import MessageAction
|
||||
from openhands.server.config.server_config import ServerConfig
|
||||
from openhands.server.conversation_manager.conversation_manager import (
|
||||
@@ -33,8 +34,8 @@ class LegacyCacheEntry:
|
||||
|
||||
@dataclass
|
||||
class LegacyConversationManager(ConversationManager):
|
||||
"""
|
||||
Conversation manager for use while migrating - since existing conversations are not nested!
|
||||
"""Conversation manager for use while migrating - since existing conversations are not nested.
|
||||
|
||||
Separate class from SaasNestedConversationManager so it can be easliy removed in a few weeks.
|
||||
(As of 2025-07-23)
|
||||
"""
|
||||
@@ -187,10 +188,33 @@ class LegacyConversationManager(ConversationManager):
|
||||
initial_user_msg: MessageAction | None = None,
|
||||
replay_json: str | None = None,
|
||||
) -> AgentLoopInfo:
|
||||
try:
|
||||
has_tokens = bool(
|
||||
settings
|
||||
and hasattr(settings, 'provider_tokens')
|
||||
and settings.provider_tokens
|
||||
)
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] LegacyManager.maybe_start_agent_loop ENTRY: '
|
||||
f'sid={sid}, user_id={user_id}, has_provider_tokens={has_tokens}'
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f'[TOKEN_DEBUG] Error logging entry: {e}')
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] LegacyManager.maybe_start_agent_loop ENTRY: sid={sid}, user_id={user_id}'
|
||||
)
|
||||
|
||||
if await self.should_start_in_legacy_mode(sid):
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] LegacyManager: Routing {sid} to ClusteredConversationManager (legacy mode)'
|
||||
)
|
||||
return await self.legacy_conversation_manager.maybe_start_agent_loop(
|
||||
sid, settings, user_id, initial_user_msg, replay_json
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] LegacyManager: Routing {sid} to SaasNestedConversationManager (new mode)'
|
||||
)
|
||||
return await self.conversation_manager.maybe_start_agent_loop(
|
||||
sid, settings, user_id, initial_user_msg, replay_json
|
||||
)
|
||||
@@ -270,8 +294,8 @@ class LegacyConversationManager(ConversationManager):
|
||||
del self._legacy_cache[key]
|
||||
|
||||
async def should_start_in_legacy_mode(self, conversation_id: str) -> bool:
|
||||
"""
|
||||
Check if a conversation should run in legacy mode by directly checking the runtime.
|
||||
"""Check if a conversation should run in legacy mode by directly checking the runtime.
|
||||
|
||||
The /list method does not include stopped conversations even though the PVC for these
|
||||
may not yet have been deleted, so we need to check /sessions/{session_id} directly.
|
||||
"""
|
||||
@@ -283,11 +307,32 @@ class LegacyConversationManager(ConversationManager):
|
||||
cached_entry = self._legacy_cache[conversation_id]
|
||||
# Check if the cached value is still valid
|
||||
if time.time() - cached_entry.timestamp <= _LEGACY_ENTRY_TIMEOUT_SECONDS:
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] LegacyManager: Using cached legacy status for {conversation_id}: '
|
||||
f'is_legacy={cached_entry.is_legacy}'
|
||||
)
|
||||
return cached_entry.is_legacy
|
||||
|
||||
# If not in cache or expired, check the runtime directly
|
||||
runtime = await self.conversation_manager._get_runtime(conversation_id)
|
||||
|
||||
# Log runtime details for debugging
|
||||
if runtime:
|
||||
logger.info(
|
||||
f"[TOKEN_DEBUG] LegacyManager: Runtime check for {conversation_id}: "
|
||||
f"status={runtime.get('status')}, has_command={bool(runtime.get('command'))}, "
|
||||
f"command_preview={str(runtime.get('command', ''))[:100]}"
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] LegacyManager: No runtime found for {conversation_id}'
|
||||
)
|
||||
|
||||
is_legacy = self.is_legacy_runtime(runtime)
|
||||
logger.info(
|
||||
f"[TOKEN_DEBUG] LegacyManager: Determined legacy status for {conversation_id}: "
|
||||
f"is_legacy={is_legacy}, will use {'ClusteredConversationManager' if is_legacy else 'SaasNestedConversationManager'}"
|
||||
)
|
||||
|
||||
# Cache the result with current timestamp
|
||||
self._legacy_cache[conversation_id] = LegacyCacheEntry(is_legacy, time.time())
|
||||
@@ -295,8 +340,7 @@ class LegacyConversationManager(ConversationManager):
|
||||
return is_legacy
|
||||
|
||||
def is_legacy_runtime(self, runtime: dict | None) -> bool:
|
||||
"""
|
||||
Determine if a runtime is a legacy runtime based on its command.
|
||||
"""Determine if a runtime is a legacy runtime based on its command.
|
||||
|
||||
Args:
|
||||
runtime: The runtime dictionary or None if not found
|
||||
@@ -304,9 +348,25 @@ class LegacyConversationManager(ConversationManager):
|
||||
Returns:
|
||||
bool: True if this is a legacy runtime, False otherwise
|
||||
"""
|
||||
if runtime is None:
|
||||
# Ensure runtime is actually a dict (not None, mock, or other object)
|
||||
if not isinstance(runtime, dict):
|
||||
return False
|
||||
return 'openhands.server' not in runtime['command']
|
||||
|
||||
# Handle case where command field might not exist (e.g., paused runtimes)
|
||||
command = runtime.get('command', '')
|
||||
if not command:
|
||||
# If no command field, check if this is a paused runtime
|
||||
# Paused runtimes should use the new conversation manager
|
||||
if runtime.get('status', '').lower() == 'paused':
|
||||
return False
|
||||
# Unknown state - default to False (use new manager)
|
||||
return False
|
||||
|
||||
# Ensure command is a string before checking substring
|
||||
if not isinstance(command, str):
|
||||
return False
|
||||
|
||||
return 'openhands.server' not in command
|
||||
|
||||
@classmethod
|
||||
def get_instance(
|
||||
|
||||
@@ -417,12 +417,35 @@ async def refresh_tokens(
|
||||
x_session_api_key: Annotated[str | None, Header(alias='X-Session-API-Key')],
|
||||
) -> TokenResponse:
|
||||
"""Return the latest token for a given provider."""
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] /api/refresh-tokens called: provider={provider}, sid={sid}, '
|
||||
f'has_session_key={bool(x_session_api_key)}'
|
||||
)
|
||||
|
||||
user_id = _get_user_id(sid)
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Got user_id: {user_id[:8]}...' if user_id else 'No user_id'
|
||||
)
|
||||
|
||||
session_api_key = await _get_session_api_key(user_id, sid)
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Session key validation: '
|
||||
f'expected={session_api_key[:8] if session_api_key else None}..., '
|
||||
f'received={x_session_api_key[:8] if x_session_api_key else None}..., '
|
||||
f'match={session_api_key == x_session_api_key}'
|
||||
)
|
||||
|
||||
if session_api_key != x_session_api_key:
|
||||
logger.error(
|
||||
f'[TOKEN_DEBUG] Session key mismatch! Returning 403. '
|
||||
f'Expected: {session_api_key[:8] if session_api_key else "None"}..., '
|
||||
f'Got: {x_session_api_key[:8] if x_session_api_key else "None"}...'
|
||||
)
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail='Forbidden')
|
||||
|
||||
logger.info(f'Refreshing token for conversation {sid}')
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Session validated. Refreshing {provider} token for {sid}'
|
||||
)
|
||||
provider_handler = ProviderHandler(
|
||||
create_provider_tokens_object([provider]), external_auth_id=user_id
|
||||
)
|
||||
|
||||
@@ -174,37 +174,121 @@ class SaasNestedConversationManager(ConversationManager):
|
||||
initial_user_msg: MessageAction | None = None,
|
||||
replay_json: str | None = None,
|
||||
) -> AgentLoopInfo:
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] SaasNestedConversationManager.maybe_start_agent_loop ENTRY: '
|
||||
f'sid={sid}, user_id={user_id}'
|
||||
)
|
||||
|
||||
# First we check redis to see if we are already starting - or the runtime will tell us the session is stopped
|
||||
redis = self._get_redis_client()
|
||||
key = self._get_redis_conversation_key(user_id, sid)
|
||||
starting = await redis.get(key)
|
||||
|
||||
logger.info(f'[TOKEN_DEBUG] Getting runtime for sid={sid}...')
|
||||
runtime = await self._get_runtime(sid)
|
||||
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Runtime info for {sid}: '
|
||||
f'exists={runtime is not None}, '
|
||||
f'runtime_id={runtime.get("runtime_id") if runtime else None}, '
|
||||
f'status={runtime.get("status") if runtime else None}, '
|
||||
f'session_api_key_exists={bool(runtime.get("session_api_key")) if runtime else False}'
|
||||
)
|
||||
|
||||
# Get raw runtime status for branching decisions
|
||||
raw_runtime_status = (runtime.get('status') or '').lower() if runtime else ''
|
||||
|
||||
# Use _parse_status() only to compute the UI-facing ConversationStatus
|
||||
status = self._parse_status(runtime) if runtime else ConversationStatus.STOPPED
|
||||
|
||||
nested_url = None
|
||||
session_api_key = None
|
||||
status = ConversationStatus.STOPPED
|
||||
event_store = EventStore(sid, self.file_store, user_id)
|
||||
|
||||
if runtime:
|
||||
nested_url = self._get_nested_url_for_runtime(runtime['runtime_id'], sid)
|
||||
session_api_key = runtime.get('session_api_key')
|
||||
status_str = (runtime.get('status') or 'stopped').upper()
|
||||
if status_str in ConversationStatus:
|
||||
status = ConversationStatus[status_str]
|
||||
if status is ConversationStatus.STOPPED and starting:
|
||||
status = ConversationStatus.STARTING
|
||||
|
||||
if status is ConversationStatus.STOPPED:
|
||||
# Mark the agentloop as starting in redis
|
||||
await redis.set(key, 1, ex=_REDIS_ENTRY_TIMEOUT_SECONDS)
|
||||
|
||||
# Start the agent loop in the background
|
||||
asyncio.create_task(
|
||||
self._start_agent_loop(
|
||||
sid, settings, user_id, initial_user_msg, replay_json
|
||||
)
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Retrieved from runtime: '
|
||||
f'key_preview={session_api_key[:10] if session_api_key else "None"}..., '
|
||||
f'runtime_id={runtime.get("runtime_id")}, '
|
||||
f'raw_status={raw_runtime_status}, '
|
||||
f'parsed_status={status}'
|
||||
)
|
||||
|
||||
# Determine if we need to start/resume the conversation
|
||||
# Key insight: We should only skip starting if:
|
||||
# 1. Runtime is running AND
|
||||
# 2. We're already starting (redis flag) OR conversation already exists
|
||||
|
||||
should_schedule_work = False
|
||||
is_resume = False
|
||||
|
||||
if raw_runtime_status == 'paused':
|
||||
# Always resume paused conversations
|
||||
should_schedule_work = True
|
||||
is_resume = True
|
||||
logger.info(f'[TOKEN_DEBUG] Will resume paused conversation {sid}')
|
||||
elif raw_runtime_status in ('stopped', ''):
|
||||
# Start new for stopped or non-existent runtimes
|
||||
should_schedule_work = True
|
||||
logger.info(f'[TOKEN_DEBUG] Will start new conversation {sid} (status={raw_runtime_status})')
|
||||
elif raw_runtime_status == 'running':
|
||||
# For running, only start if not already starting
|
||||
if starting:
|
||||
logger.info(f'[TOKEN_DEBUG] Already starting {sid} per Redis, returning STARTING')
|
||||
return AgentLoopInfo(
|
||||
conversation_id=sid,
|
||||
url=nested_url,
|
||||
session_api_key=session_api_key,
|
||||
event_store=event_store,
|
||||
status=ConversationStatus.STARTING,
|
||||
)
|
||||
else:
|
||||
# Runtime is running but we're not starting - this means conversation should exist
|
||||
# Return RUNNING status
|
||||
logger.info(f'[TOKEN_DEBUG] Runtime running and not starting, returning RUNNING')
|
||||
return AgentLoopInfo(
|
||||
conversation_id=sid,
|
||||
url=nested_url,
|
||||
session_api_key=session_api_key,
|
||||
event_store=event_store,
|
||||
status=ConversationStatus.RUNNING,
|
||||
)
|
||||
|
||||
if should_schedule_work:
|
||||
if not starting:
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Scheduling {"resume" if is_resume else "start"} '
|
||||
f'for sid={sid} (raw_status={raw_runtime_status})'
|
||||
)
|
||||
await redis.set(key, 1, ex=_REDIS_ENTRY_TIMEOUT_SECONDS)
|
||||
|
||||
asyncio.create_task(
|
||||
self._start_agent_loop(
|
||||
sid, settings, user_id, initial_user_msg, replay_json, is_resume
|
||||
)
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Already starting {sid} according to Redis, not scheduling again'
|
||||
)
|
||||
|
||||
# Return STARTING when work is scheduled or in progress
|
||||
return AgentLoopInfo(
|
||||
conversation_id=sid,
|
||||
url=nested_url,
|
||||
session_api_key=session_api_key,
|
||||
event_store=event_store,
|
||||
status=ConversationStatus.STARTING,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Returning from maybe_start_agent_loop: '
|
||||
f'sid={sid}, status={status}, '
|
||||
f'has_url={bool(nested_url)}, has_api_key={bool(session_api_key)}'
|
||||
)
|
||||
|
||||
return AgentLoopInfo(
|
||||
conversation_id=sid,
|
||||
url=nested_url,
|
||||
@@ -214,14 +298,29 @@ class SaasNestedConversationManager(ConversationManager):
|
||||
)
|
||||
|
||||
async def _start_agent_loop(
|
||||
self, sid, settings, user_id, initial_user_msg=None, replay_json=None
|
||||
self,
|
||||
sid,
|
||||
settings,
|
||||
user_id,
|
||||
initial_user_msg=None,
|
||||
replay_json=None,
|
||||
is_resume=False,
|
||||
):
|
||||
try:
|
||||
logger.info(f'starting_agent_loop:{sid}', extra={'session_id': sid})
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] SaaS _start_agent_loop: sid={sid}, is_resume={is_resume}'
|
||||
)
|
||||
|
||||
if is_resume:
|
||||
logger.info(f'[RESUME_DEBUG] Resuming existing runtime for sid={sid}')
|
||||
else:
|
||||
logger.info(f'[RESUME_DEBUG] Creating new runtime for sid={sid}')
|
||||
|
||||
await self.ensure_num_conversations_below_limit(sid, user_id)
|
||||
provider_handler = self._get_provider_handler(settings)
|
||||
runtime = await self._create_runtime(
|
||||
sid, user_id, settings, provider_handler
|
||||
sid, user_id, settings, provider_handler, is_resume
|
||||
)
|
||||
await runtime.connect()
|
||||
|
||||
@@ -233,16 +332,51 @@ class SaasNestedConversationManager(ConversationManager):
|
||||
)
|
||||
|
||||
session_api_key = runtime.session.headers['X-Session-API-Key']
|
||||
|
||||
await self._start_conversation(
|
||||
sid,
|
||||
user_id,
|
||||
settings,
|
||||
initial_user_msg,
|
||||
replay_json,
|
||||
runtime.runtime_url,
|
||||
session_api_key,
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Got session_api_key from runtime: '
|
||||
f'key_preview={session_api_key[:10] if session_api_key else "None"}...'
|
||||
)
|
||||
|
||||
# Check if we should skip conversation creation on resume
|
||||
if is_resume:
|
||||
# Get the existing runtime to check if we already have a session_api_key
|
||||
existing_runtime = await self._get_runtime(sid)
|
||||
if existing_runtime and existing_runtime.get('session_api_key'):
|
||||
logger.info(
|
||||
'[RESUME_DEBUG] Skipping conversation creation for resume '
|
||||
'(using existing session_api_key)'
|
||||
)
|
||||
# Use the EXISTING session_api_key for resume, not the new one
|
||||
existing_session_key = existing_runtime.get('session_api_key')
|
||||
# Just wait for the runtime to be ready
|
||||
async with httpx.AsyncClient(
|
||||
headers={'X-Session-API-Key': existing_session_key}
|
||||
) as client:
|
||||
await self._wait_for_conversation_ready(
|
||||
client, runtime.runtime_url, sid
|
||||
)
|
||||
else:
|
||||
# Resume but no existing session, create new conversation
|
||||
await self._start_conversation(
|
||||
sid,
|
||||
user_id,
|
||||
settings,
|
||||
initial_user_msg,
|
||||
replay_json,
|
||||
runtime.runtime_url,
|
||||
session_api_key,
|
||||
)
|
||||
else:
|
||||
# Not a resume, normal start
|
||||
await self._start_conversation(
|
||||
sid,
|
||||
user_id,
|
||||
settings,
|
||||
initial_user_msg,
|
||||
replay_json,
|
||||
runtime.runtime_url,
|
||||
session_api_key,
|
||||
)
|
||||
finally:
|
||||
# remove the starting entry from redis
|
||||
redis = self._get_redis_client()
|
||||
@@ -260,6 +394,11 @@ class SaasNestedConversationManager(ConversationManager):
|
||||
session_api_key: str,
|
||||
):
|
||||
logger.info('starting_nested_conversation', extra={'sid': sid})
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] _start_conversation with session_api_key: '
|
||||
f'key_preview={session_api_key[:10] if session_api_key else "None"}..., '
|
||||
f'api_url={api_url[:50] if api_url else "None"}...'
|
||||
)
|
||||
async with httpx.AsyncClient(
|
||||
headers={
|
||||
'X-Session-API-Key': session_api_key,
|
||||
@@ -418,16 +557,35 @@ class SaasNestedConversationManager(ConversationManager):
|
||||
):
|
||||
"""Wait for the conversation to be ready by checking the events endpoint."""
|
||||
# TODO: Find out why /api/conversations/{sid} returns RUNNING when events are not available
|
||||
for _ in range(5):
|
||||
logger.info(
|
||||
f'[WEBSOCKET_DEBUG] Starting _wait_for_conversation_ready for sid={sid}, '
|
||||
f'will check events endpoint up to 5 times'
|
||||
)
|
||||
for attempt in range(5):
|
||||
try:
|
||||
logger.info('checking_events_endpoint_running', extra={'sid': sid})
|
||||
logger.info(
|
||||
f'[WEBSOCKET_DEBUG] Attempt {attempt+1}/5: Checking {api_url}/api/conversations/{sid}/events'
|
||||
)
|
||||
response = await client.get(f'{api_url}/api/conversations/{sid}/events')
|
||||
if response.is_success:
|
||||
logger.info('events_endpoint_is_running', extra={'sid': sid})
|
||||
logger.info(
|
||||
f'[WEBSOCKET_DEBUG] Events endpoint ready after {attempt+1} attempts. '
|
||||
f'Frontend should now be able to connect via websocket.'
|
||||
)
|
||||
break
|
||||
except Exception:
|
||||
except Exception as e:
|
||||
logger.warning('events_endpoint_not_ready', extra={'sid': sid})
|
||||
logger.warning(
|
||||
f'[WEBSOCKET_DEBUG] Events endpoint not ready (attempt {attempt+1}/5): {e}'
|
||||
)
|
||||
await asyncio.sleep(5)
|
||||
else:
|
||||
logger.error(
|
||||
f'[WEBSOCKET_DEBUG] CRITICAL: Events endpoint never became ready after 5 attempts! '
|
||||
f'Frontend will not receive events for sid={sid}'
|
||||
)
|
||||
|
||||
async def send_to_event_stream(self, connection_id: str, data: dict):
|
||||
# Not supported - clients should connect directly to the nested server!
|
||||
@@ -462,10 +620,17 @@ class SaasNestedConversationManager(ConversationManager):
|
||||
|
||||
async def close_session(self, sid: str):
|
||||
logger.info('close_session', extra={'sid': sid})
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] close_session called for {sid}, about to pause runtime'
|
||||
)
|
||||
runtime = await self._get_runtime(sid)
|
||||
if runtime is None:
|
||||
logger.info('no_session_to_close', extra={'sid': sid})
|
||||
logger.info(f'[TOKEN_DEBUG] No runtime found to close for {sid}')
|
||||
return
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Pausing runtime {runtime.get("runtime_id")} for session {sid}'
|
||||
)
|
||||
async with self._httpx_client() as client:
|
||||
response = await client.post(
|
||||
f'{self.remote_runtime_api_url}/pause',
|
||||
@@ -691,6 +856,15 @@ class SaasNestedConversationManager(ConversationManager):
|
||||
provider_tokens = None
|
||||
if isinstance(settings, ConversationInitData):
|
||||
provider_tokens = settings.git_provider_tokens
|
||||
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Getting provider handler: '
|
||||
f'has_settings={settings is not None}, '
|
||||
f'is_ConversationInitData={isinstance(settings, ConversationInitData)}, '
|
||||
f'has_provider_tokens={bool(provider_tokens)}, '
|
||||
f'token_count={len(provider_tokens) if provider_tokens else 0}'
|
||||
)
|
||||
|
||||
provider_handler = ProviderHandler(
|
||||
provider_tokens=provider_tokens
|
||||
or cast(PROVIDER_TOKEN_TYPE, MappingProxyType({}))
|
||||
@@ -703,7 +877,40 @@ class SaasNestedConversationManager(ConversationManager):
|
||||
user_id: str,
|
||||
settings: Settings,
|
||||
provider_handler: ProviderHandler,
|
||||
is_resume: bool = False,
|
||||
):
|
||||
# Check if we have an existing runtime to understand the context
|
||||
logger.info(
|
||||
f'[RESUME_DEBUG] _create_runtime called for sid={sid}, is_resume={is_resume}'
|
||||
)
|
||||
existing_runtime = await self._get_runtime(sid)
|
||||
|
||||
# Determine if we should attach to existing runtime
|
||||
attach_to_existing = False
|
||||
if existing_runtime:
|
||||
raw_status = (existing_runtime.get('status') or '').lower()
|
||||
logger.info(
|
||||
f'[RESUME_DEBUG] Found existing runtime: '
|
||||
f'runtime_id={existing_runtime.get("runtime_id")}, '
|
||||
f'status={raw_status}, '
|
||||
f'has_api_key={bool(existing_runtime.get("session_api_key"))}'
|
||||
)
|
||||
|
||||
# Attach to existing runtime if it's paused or running
|
||||
if raw_status in ('paused', 'running'):
|
||||
attach_to_existing = True
|
||||
logger.info(
|
||||
f'[RESUME_DEBUG] Will attach to existing {raw_status} runtime'
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
f'[RESUME_DEBUG] Will create new runtime (existing is {raw_status})'
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
'[RESUME_DEBUG] No existing runtime found, creating fresh runtime'
|
||||
)
|
||||
|
||||
llm_registry, conversation_stats, config = (
|
||||
create_registry_and_conversation_stats(self.config, sid, user_id, settings)
|
||||
)
|
||||
@@ -764,6 +971,29 @@ class SaasNestedConversationManager(ConversationManager):
|
||||
if self._runtime_container_image:
|
||||
config.sandbox.runtime_container_image = self._runtime_container_image
|
||||
|
||||
# Log the attach_to_existing decision
|
||||
logger.info(
|
||||
f'[ATTACH_DEBUG] Making attach_to_existing decision: '
|
||||
f'sid={sid}, attach_to_existing={attach_to_existing}, '
|
||||
f'reasoning={"attach to paused/running" if attach_to_existing else "create new"}'
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Creating RemoteRuntime: '
|
||||
f'sid={sid}, attach_to_existing={attach_to_existing}, '
|
||||
f'user_id={user_id}, '
|
||||
f'has_provider_tokens={bool(provider_handler and provider_handler.provider_tokens)}'
|
||||
)
|
||||
|
||||
# Log the state of tokens before runtime creation
|
||||
if provider_handler and provider_handler.provider_tokens:
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Provider tokens before runtime creation: '
|
||||
f'{list(provider_handler.provider_tokens.keys())}'
|
||||
)
|
||||
else:
|
||||
logger.info('[TOKEN_DEBUG] No provider tokens before runtime creation')
|
||||
|
||||
runtime = RemoteRuntime(
|
||||
config=config,
|
||||
event_stream=None, # type: ignore[arg-type]
|
||||
@@ -771,7 +1001,7 @@ class SaasNestedConversationManager(ConversationManager):
|
||||
plugins=agent.sandbox_plugins,
|
||||
# env_vars=env_vars,
|
||||
# status_callback: Callable[..., None] | None = None,
|
||||
attach_to_existing=False,
|
||||
attach_to_existing=attach_to_existing,
|
||||
headless_mode=False,
|
||||
user_id=user_id,
|
||||
# git_provider_tokens: PROVIDER_TOKEN_TYPE | None = None,
|
||||
@@ -779,6 +1009,10 @@ class SaasNestedConversationManager(ConversationManager):
|
||||
llm_registry=llm_registry,
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] RemoteRuntime created: runtime_id={runtime.runtime_id if hasattr(runtime, "runtime_id") else "N/A"}'
|
||||
)
|
||||
|
||||
# TODO: This is a hack. The setup_initial_env method directly calls the methods on the action
|
||||
# execution server, even though there are not any variables to set. In the nested env, there
|
||||
# is currently no direct access to the action execution server, so we should either add a
|
||||
@@ -826,10 +1060,27 @@ class SaasNestedConversationManager(ConversationManager):
|
||||
|
||||
async def _get_runtime(self, sid: str) -> dict | None:
|
||||
async with self._httpx_client() as client:
|
||||
response = await client.get(f'{self.remote_runtime_api_url}/sessions/{sid}')
|
||||
url = f'{self.remote_runtime_api_url}/sessions/{sid}'
|
||||
logger.info(f'[TOKEN_DEBUG] Fetching runtime from: {url}')
|
||||
|
||||
response = await client.get(url)
|
||||
|
||||
if not response.is_success:
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Runtime fetch failed: '
|
||||
f'status_code={response.status_code}, '
|
||||
f'sid={sid}'
|
||||
)
|
||||
return None
|
||||
|
||||
response_json = response.json()
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Runtime fetched successfully: '
|
||||
f'sid={sid}, '
|
||||
f'runtime_id={response_json.get("runtime_id")}, '
|
||||
f'status={response_json.get("status")}, '
|
||||
f'has_api_key={bool(response_json.get("session_api_key"))}'
|
||||
)
|
||||
|
||||
# Hack: This endpoint doesn't return the session_id
|
||||
response_json['session_id'] = sid
|
||||
@@ -839,12 +1090,26 @@ class SaasNestedConversationManager(ConversationManager):
|
||||
def _parse_status(self, runtime: dict):
|
||||
# status is one of running, stoppped, paused, error, starting
|
||||
status = (runtime.get('status') or '').upper()
|
||||
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] _parse_status: input_status="{runtime.get("status")}", '
|
||||
f'uppercase_status="{status}", '
|
||||
f'is_paused={status == "PAUSED"}, '
|
||||
f'is_stopped={status == "STOPPED"}'
|
||||
)
|
||||
|
||||
if status == 'PAUSED':
|
||||
logger.info('[TOKEN_DEBUG] Mapping PAUSED -> ConversationStatus.STOPPED')
|
||||
return ConversationStatus.STOPPED
|
||||
elif status == 'STOPPED':
|
||||
logger.info('[TOKEN_DEBUG] Mapping STOPPED -> ConversationStatus.ARCHIVED')
|
||||
return ConversationStatus.ARCHIVED
|
||||
if status in ConversationStatus:
|
||||
logger.info(f'[TOKEN_DEBUG] Direct mapping to ConversationStatus.{status}')
|
||||
return ConversationStatus[status]
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Unknown status "{status}", defaulting to ConversationStatus.STOPPED'
|
||||
)
|
||||
return ConversationStatus.STOPPED
|
||||
|
||||
def _get_nested_url_for_runtime(self, runtime_id: str, conversation_id: str):
|
||||
|
||||
@@ -37,6 +37,14 @@ class ApiKeyStore:
|
||||
"""
|
||||
api_key = self.generate_api_key()
|
||||
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Creating API key: '
|
||||
f'user_id={user_id}, '
|
||||
f'name={name}, '
|
||||
f'expires_at={expires_at}, '
|
||||
f'key_preview={api_key[:10] if api_key else "None"}...'
|
||||
)
|
||||
|
||||
with self.session_maker() as session:
|
||||
key_record = ApiKey(
|
||||
key=api_key, user_id=user_id, name=name, expires_at=expires_at
|
||||
@@ -44,21 +52,43 @@ class ApiKeyStore:
|
||||
session.add(key_record)
|
||||
session.commit()
|
||||
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] API key created successfully: '
|
||||
f'key_id={key_record.id}, '
|
||||
f'user_id={user_id}, '
|
||||
f'name={name}'
|
||||
)
|
||||
|
||||
return api_key
|
||||
|
||||
def validate_api_key(self, api_key: str) -> str | None:
|
||||
"""Validate an API key and return the associated user_id if valid."""
|
||||
now = datetime.now(UTC)
|
||||
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Validating API key: '
|
||||
f'key_preview={api_key[:10] if api_key else "None"}...'
|
||||
)
|
||||
|
||||
with self.session_maker() as session:
|
||||
key_record = session.query(ApiKey).filter(ApiKey.key == api_key).first()
|
||||
|
||||
if not key_record:
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] API key not found in database: '
|
||||
f'key_preview={api_key[:10] if api_key else "None"}...'
|
||||
)
|
||||
return None
|
||||
|
||||
# Check if the key has expired
|
||||
if key_record.expires_at and key_record.expires_at < now:
|
||||
logger.info(f'API key has expired: {key_record.id}')
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] API key expired: '
|
||||
f'key_id={key_record.id}, '
|
||||
f'expires_at={key_record.expires_at}, '
|
||||
f'now={now}'
|
||||
)
|
||||
return None
|
||||
|
||||
# Update last_used_at timestamp
|
||||
@@ -69,6 +99,13 @@ class ApiKeyStore:
|
||||
)
|
||||
session.commit()
|
||||
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] API key validated successfully: '
|
||||
f'key_id={key_record.id}, '
|
||||
f'user_id={key_record.user_id}, '
|
||||
f'name={key_record.name}'
|
||||
)
|
||||
|
||||
return key_record.user_id
|
||||
|
||||
def delete_api_key(self, api_key: str) -> bool:
|
||||
|
||||
@@ -175,17 +175,17 @@ class TestIsLegacyRuntime:
|
||||
assert result is True
|
||||
|
||||
def test_is_legacy_runtime_empty_command(self, legacy_manager):
|
||||
"""Test with empty command."""
|
||||
"""Test with empty command - should use new manager."""
|
||||
runtime = {'command': ''}
|
||||
result = legacy_manager.is_legacy_runtime(runtime)
|
||||
assert result is True
|
||||
assert result is False # Empty command means use new manager
|
||||
|
||||
def test_is_legacy_runtime_missing_command_key(self, legacy_manager):
|
||||
"""Test with runtime missing command key."""
|
||||
"""Test with runtime missing command key - should use new manager."""
|
||||
runtime = {'other_key': 'value'}
|
||||
# This should raise a KeyError
|
||||
with pytest.raises(KeyError):
|
||||
legacy_manager.is_legacy_runtime(runtime)
|
||||
# Should not raise KeyError, returns False (use new manager)
|
||||
result = legacy_manager.is_legacy_runtime(runtime)
|
||||
assert result is False
|
||||
|
||||
|
||||
class TestShouldStartInLegacyMode:
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { useState, useEffect } from "react";
|
||||
import { isFileImage } from "#/utils/is-file-image";
|
||||
import { displayErrorToast } from "#/utils/custom-toast-handlers";
|
||||
import { validateFiles } from "#/utils/file-validation";
|
||||
@@ -32,6 +33,25 @@ export function InteractiveChatBox({
|
||||
const { curAgentState } = useAgentStore();
|
||||
const { data: conversation } = useActiveConversation();
|
||||
|
||||
// Track if agent has reached AWAITING_USER_INPUT state for first time
|
||||
const [hasSeenAwaitingUserInput, setHasSeenAwaitingUserInput] = useState(false);
|
||||
|
||||
// Reset the flag when conversation is STARTING (new or restart)
|
||||
useEffect(() => {
|
||||
if (conversation?.status === "STARTING") {
|
||||
console.log("[CHAT_INPUT_DEBUG] Conversation STARTING - resetting hasSeenAwaitingUserInput to false");
|
||||
setHasSeenAwaitingUserInput(false);
|
||||
}
|
||||
}, [conversation?.status]);
|
||||
|
||||
// Set flag when we see AWAITING_USER_INPUT for the first time
|
||||
useEffect(() => {
|
||||
if (curAgentState === AgentState.AWAITING_USER_INPUT && !hasSeenAwaitingUserInput) {
|
||||
console.log("[CHAT_INPUT_DEBUG] Agent reached AWAITING_USER_INPUT - enabling input");
|
||||
setHasSeenAwaitingUserInput(true);
|
||||
}
|
||||
}, [curAgentState, hasSeenAwaitingUserInput]);
|
||||
|
||||
// Helper function to validate and filter files
|
||||
const validateAndFilterFiles = (selectedFiles: File[]) => {
|
||||
const validation = validateFiles(selectedFiles, [...images, ...files]);
|
||||
@@ -137,9 +157,18 @@ export function InteractiveChatBox({
|
||||
};
|
||||
|
||||
const isDisabled =
|
||||
!hasSeenAwaitingUserInput || // Block until first AWAITING_USER_INPUT
|
||||
curAgentState === AgentState.LOADING ||
|
||||
curAgentState === AgentState.AWAITING_USER_CONFIRMATION;
|
||||
|
||||
// Debug logging for disable state
|
||||
console.log("[CHAT_INPUT_DEBUG] Input disabled state:", {
|
||||
isDisabled,
|
||||
hasSeenAwaitingUserInput,
|
||||
curAgentState,
|
||||
conversationStatus: conversation?.status
|
||||
});
|
||||
|
||||
return (
|
||||
<div data-testid="interactive-chat-box">
|
||||
<CustomChatInput
|
||||
|
||||
@@ -135,6 +135,7 @@ export function WsClientProvider({
|
||||
const { setErrorMessage, removeErrorMessage } = useWSErrorMessage();
|
||||
const queryClient = useQueryClient();
|
||||
const sioRef = React.useRef<Socket | null>(null);
|
||||
const connectErrorCountRef = React.useRef<number>(0);
|
||||
const [webSocketStatus, setWebSocketStatus] =
|
||||
React.useState<WebSocketStatus>("DISCONNECTED");
|
||||
const [events, setEvents] = React.useState<Record<string, unknown>[]>([]);
|
||||
@@ -159,9 +160,33 @@ export function WsClientProvider({
|
||||
function handleConnect() {
|
||||
setWebSocketStatus("CONNECTED");
|
||||
removeErrorMessage();
|
||||
// Reset error count on successful connection
|
||||
connectErrorCountRef.current = 0;
|
||||
console.log('[WS_DEBUG] Connection error count reset to 0');
|
||||
}
|
||||
|
||||
function handleMessage(event: Record<string, unknown>) {
|
||||
// Log important message events for debugging
|
||||
if (event.event_type === "message" || event.action === "message") {
|
||||
const sender = (event as any).sender || "unknown";
|
||||
const content = (event as any).content || (event as any).message || "";
|
||||
console.log('[MESSAGE_DEBUG] Message received:', {
|
||||
sender,
|
||||
content: content.substring(0, 100),
|
||||
eventId: (event as any).id,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
// Log agent state changes
|
||||
if (isAgentStateChangeObservation(event)) {
|
||||
console.log('[AGENT_DEBUG] Agent state changed:', {
|
||||
newState: event.extras.agent_state,
|
||||
eventId: event.id,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
}
|
||||
|
||||
handleAssistantMessage(event);
|
||||
|
||||
if (isOpenHandsEvent(event)) {
|
||||
@@ -202,6 +227,11 @@ export function WsClientProvider({
|
||||
}
|
||||
|
||||
if (isUserMessage(event)) {
|
||||
console.log('[MESSAGE_DEBUG] User message confirmed:', {
|
||||
content: (event as any).content?.substring(0, 100),
|
||||
eventId: (event as any).id,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
removeOptimisticUserMessage();
|
||||
}
|
||||
|
||||
@@ -268,23 +298,50 @@ export function WsClientProvider({
|
||||
setErrorMessage(hasValidMessageProperty(data) ? data.message : "");
|
||||
}
|
||||
|
||||
function handleError(data: unknown) {
|
||||
function handleError(data: unknown, isConnectionError = false) {
|
||||
// set status
|
||||
setWebSocketStatus("DISCONNECTED");
|
||||
updateStatusWhenErrorMessagePresent(data);
|
||||
|
||||
setErrorMessage(
|
||||
hasValidMessageProperty(data)
|
||||
? data.message
|
||||
: "An unknown error occurred on the WebSocket connection.",
|
||||
);
|
||||
// For connection errors during STARTING, use retry logic
|
||||
if (isConnectionError) {
|
||||
connectErrorCountRef.current += 1;
|
||||
const errorCount = connectErrorCountRef.current;
|
||||
const conversationStatus = conversation?.status;
|
||||
|
||||
console.log('[WS_DEBUG] Connection error handling:', {
|
||||
errorCount,
|
||||
conversationStatus,
|
||||
willShowError: errorCount > 3 && conversationStatus !== "STARTING",
|
||||
});
|
||||
|
||||
// Only show error banner if:
|
||||
// 1. We've failed more than 3 times (persistent failure)
|
||||
// 2. AND we're not in STARTING state (where failures are expected)
|
||||
if (errorCount > 3 && conversationStatus !== "STARTING") {
|
||||
setErrorMessage(
|
||||
hasValidMessageProperty(data)
|
||||
? data.message
|
||||
: "Unable to establish WebSocket connection. Messages will be sent via HTTP.",
|
||||
);
|
||||
}
|
||||
} else {
|
||||
// Non-connection errors always show immediately
|
||||
setErrorMessage(
|
||||
hasValidMessageProperty(data)
|
||||
? data.message
|
||||
: "An unknown error occurred on the WebSocket connection.",
|
||||
);
|
||||
}
|
||||
|
||||
// check if something went wrong with the conversation.
|
||||
refetchConversation();
|
||||
}
|
||||
|
||||
React.useEffect(() => {
|
||||
console.log('[WS_DEBUG] Conversation ID changed:', conversationId);
|
||||
lastEventRef.current = null;
|
||||
connectErrorCountRef.current = 0; // Reset error count for new conversation
|
||||
|
||||
// reset events when conversationId changes
|
||||
setEvents([]);
|
||||
@@ -293,34 +350,70 @@ export function WsClientProvider({
|
||||
}, [conversationId]);
|
||||
|
||||
React.useEffect(() => {
|
||||
console.log('[WS_DEBUG] WebSocket connection effect triggered:', {
|
||||
conversationId,
|
||||
conversation: conversation ? {
|
||||
id: conversation.id,
|
||||
status: conversation.status,
|
||||
runtime_status: conversation.runtime_status,
|
||||
session_api_key: conversation.session_api_key ? 'present' : 'missing',
|
||||
url: conversation.url
|
||||
} : 'null',
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
|
||||
if (!conversationId) {
|
||||
throw new Error("No conversation ID provided");
|
||||
}
|
||||
|
||||
// Clear error messages when conversation is intentionally stopped
|
||||
if (conversation && conversation.status === "STOPPED") {
|
||||
console.log('[WS_DEBUG] Conversation STOPPED, disconnecting WebSocket');
|
||||
removeErrorMessage();
|
||||
setWebSocketStatus("DISCONNECTED");
|
||||
return () => undefined; // conversation intentionally stopped
|
||||
}
|
||||
|
||||
// Set connecting status when conversation is starting
|
||||
// Allow WebSocket connection during STARTING status to receive real-time updates
|
||||
if (conversation && conversation.status === "STARTING") {
|
||||
console.log('[WS_DEBUG] Conversation STARTING, will attempt WebSocket connection');
|
||||
removeErrorMessage();
|
||||
setWebSocketStatus("CONNECTING");
|
||||
return () => undefined; // conversation is starting, will connect when ready
|
||||
// Don't return early - let it continue to establish connection
|
||||
}
|
||||
|
||||
// Only connect when conversation is fully loaded and running
|
||||
// Only connect when conversation exists and is not stopped
|
||||
if (
|
||||
!conversation ||
|
||||
conversation.status !== "RUNNING" ||
|
||||
!conversation.runtime_status ||
|
||||
(conversation.status !== "RUNNING" && conversation.status !== "STARTING") ||
|
||||
conversation.runtime_status === "STATUS$STOPPED"
|
||||
) {
|
||||
console.log('[WS_DEBUG] NOT connecting WebSocket because:', {
|
||||
hasConversation: !!conversation,
|
||||
status: conversation?.status,
|
||||
runtime_status: conversation?.runtime_status,
|
||||
reason: !conversation ? 'no conversation' :
|
||||
(conversation.status !== "RUNNING" && conversation.status !== "STARTING") ? `status is ${conversation.status}, not RUNNING/STARTING` :
|
||||
'runtime is stopped'
|
||||
});
|
||||
return () => undefined; // conversation not ready for WebSocket connection
|
||||
}
|
||||
|
||||
// Check if session_api_key is available - required for WebSocket authentication
|
||||
if (!conversation.session_api_key) {
|
||||
console.log('[WS_DEBUG] No session_api_key yet, skipping WebSocket connection', {
|
||||
conversationId,
|
||||
status: conversation.status,
|
||||
hasUrl: !!conversation.url,
|
||||
});
|
||||
// This effect runs whenever conversation object changes. Since we're not setting any state that
|
||||
// would cause re-renders, we rely on the existing polling to refetch conversation and trigger this
|
||||
// effect again when session_api_key becomes available.
|
||||
return () => undefined; // Wait for session_api_key to become available
|
||||
}
|
||||
|
||||
console.log('[WS_DEBUG] ESTABLISHING WebSocket connection for conversation with status:', conversation.status);
|
||||
|
||||
let sio = sioRef.current;
|
||||
|
||||
if (sio?.connected) {
|
||||
@@ -338,6 +431,17 @@ export function WsClientProvider({
|
||||
session_api_key: conversation.session_api_key, // Have to set here because socketio doesn't support custom headers. :(
|
||||
};
|
||||
|
||||
// Debug: Check critical connection parameters
|
||||
console.log('[WS_DEBUG] WebSocket connection parameters:', {
|
||||
hasUrl: !!conversation.url,
|
||||
url: conversation.url,
|
||||
hasSessionApiKey: !!conversation.session_api_key,
|
||||
sessionApiKeyLength: conversation.session_api_key?.length,
|
||||
conversationStatus: conversation.status,
|
||||
runtimeStatus: conversation.runtime_status,
|
||||
query,
|
||||
});
|
||||
|
||||
let baseUrl: string | null = null;
|
||||
let socketPath: string;
|
||||
if (conversation.url && !conversation.url.startsWith("/")) {
|
||||
@@ -359,11 +463,51 @@ export function WsClientProvider({
|
||||
query,
|
||||
});
|
||||
|
||||
sio.on("connect", handleConnect);
|
||||
console.log('[WS_DEBUG] Attempting WebSocket connection:', {
|
||||
baseUrl,
|
||||
socketPath,
|
||||
fullUrl: `ws://${baseUrl}${socketPath}`,
|
||||
hasSessionApiKey: !!query.session_api_key,
|
||||
conversationId: query.conversation_id,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
|
||||
sio.on("connect", () => {
|
||||
console.log('[WS_DEBUG] ✅ WebSocket CONNECTED successfully!', {
|
||||
socketId: sio.id,
|
||||
connected: sio.connected,
|
||||
conversationStatus: conversation.status,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
handleConnect();
|
||||
});
|
||||
sio.on("oh_event", handleMessage);
|
||||
sio.on("connect_error", handleError);
|
||||
sio.on("connect_failed", handleError);
|
||||
sio.on("disconnect", handleDisconnect);
|
||||
sio.on("connect_error", (error) => {
|
||||
console.log('[WS_DEBUG] ❌ WebSocket connect_error:', {
|
||||
errorMessage: error.message,
|
||||
errorType: error.type,
|
||||
errorData: error.data,
|
||||
conversationStatus: conversation.status,
|
||||
hasUrl: !!conversation.url,
|
||||
hasSessionApiKey: !!conversation.session_api_key,
|
||||
errorCount: connectErrorCountRef.current + 1,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
handleError(error, true); // true = isConnectionError
|
||||
});
|
||||
sio.on("connect_failed", (error) => {
|
||||
console.log('[WS_DEBUG] WebSocket connect_failed:', error);
|
||||
handleError(error, true); // true = isConnectionError
|
||||
});
|
||||
sio.on("disconnect", (reason) => {
|
||||
console.log('[WS_DEBUG] ⚠️ WebSocket disconnected:', {
|
||||
reason,
|
||||
wasConnected: sio.connected,
|
||||
conversationStatus: conversation.status,
|
||||
timestamp: new Date().toISOString(),
|
||||
});
|
||||
handleDisconnect(reason);
|
||||
});
|
||||
|
||||
sioRef.current = sio;
|
||||
|
||||
|
||||
@@ -6,11 +6,21 @@ import ConversationService from "#/api/conversation-service/conversation-service
|
||||
export const useActiveConversation = () => {
|
||||
const { conversationId } = useConversationId();
|
||||
const userConversation = useUserConversation(conversationId, (query) => {
|
||||
if (query.state.data?.status === "STARTING") {
|
||||
const status = query.state.data?.status;
|
||||
console.log('[CONVERSATION_DEBUG] Polling conversation:', {
|
||||
conversationId,
|
||||
status,
|
||||
runtime_status: query.state.data?.runtime_status,
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
|
||||
if (status === "STARTING") {
|
||||
console.log('[CONVERSATION_DEBUG] Status is STARTING, polling every 3s');
|
||||
return 3000; // 3 seconds
|
||||
}
|
||||
// TODO: Return conversation title as a WS event to avoid polling
|
||||
// This was changed from 5 minutes to 30 seconds to poll for updated conversation title after an auto update
|
||||
console.log('[CONVERSATION_DEBUG] Status is not STARTING, polling every 30s');
|
||||
return 30000; // 30 seconds
|
||||
});
|
||||
|
||||
|
||||
@@ -104,8 +104,19 @@ export function getStatusCode(
|
||||
runtimeStatus: RuntimeStatus | null,
|
||||
agentState: AgentState | null,
|
||||
) {
|
||||
// Debug logging for status determination
|
||||
console.log('[STATUS_DEBUG] getStatusCode called:', {
|
||||
statusMessage,
|
||||
webSocketStatus,
|
||||
conversationStatus,
|
||||
runtimeStatus,
|
||||
agentState,
|
||||
timestamp: new Date().toISOString()
|
||||
});
|
||||
|
||||
// Handle conversation and runtime stopped states
|
||||
if (conversationStatus === "STOPPED" || runtimeStatus === "STATUS$STOPPED") {
|
||||
console.log('[STATUS_DEBUG] Returning STOPPED status');
|
||||
return I18nKey.CHAT_INTERFACE$STOPPED;
|
||||
}
|
||||
|
||||
@@ -134,11 +145,24 @@ export function getStatusCode(
|
||||
return runtimeStatus;
|
||||
}
|
||||
|
||||
// Handle conversation starting state BEFORE WebSocket states
|
||||
// This ensures users see "Initializing agent..." instead of "Connecting..."
|
||||
if (conversationStatus === "STARTING") {
|
||||
console.log('[STATUS_DEBUG] Conversation STARTING, showing initializing status');
|
||||
return I18nKey.AGENT_STATUS$INITIALIZING;
|
||||
}
|
||||
|
||||
// Handle WebSocket connection states
|
||||
if (webSocketStatus === "DISCONNECTED") {
|
||||
console.log('[STATUS_DEBUG] WebSocket DISCONNECTED, returning disconnected status');
|
||||
return I18nKey.CHAT_INTERFACE$DISCONNECTED;
|
||||
}
|
||||
if (webSocketStatus === "CONNECTING") {
|
||||
console.log('[STATUS_DEBUG] WebSocket CONNECTING - now only shown if conversation not STARTING', {
|
||||
conversationStatus,
|
||||
runtimeStatus,
|
||||
agentState
|
||||
});
|
||||
return I18nKey.CHAT_INTERFACE$CONNECTING;
|
||||
}
|
||||
|
||||
|
||||
@@ -174,7 +174,17 @@ class ProviderHandler:
|
||||
) -> SecretStr | None:
|
||||
"""Get latest token from service"""
|
||||
try:
|
||||
async with httpx.AsyncClient() as client:
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Attempting to fetch latest {provider} token from '
|
||||
f'{self.REFRESH_TOKEN_URL} for session {self.sid}'
|
||||
)
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Using session API key: '
|
||||
f'{self.session_api_key[:10] if self.session_api_key else "None"}..., '
|
||||
f'provider={provider.value}, sid={self.sid}'
|
||||
)
|
||||
|
||||
async with httpx.AsyncClient(follow_redirects=False) as client:
|
||||
resp = await client.get(
|
||||
self.REFRESH_TOKEN_URL,
|
||||
headers={
|
||||
@@ -183,13 +193,72 @@ class ProviderHandler:
|
||||
params={'provider': provider.value, 'sid': self.sid},
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Response status: {resp.status_code} for {provider}'
|
||||
)
|
||||
|
||||
# Log response headers for debugging
|
||||
logger.info(f'[TOKEN_DEBUG] Response headers: {dict(resp.headers)}')
|
||||
|
||||
# Check for redirect (expired Keycloak session)
|
||||
if resp.status_code == 302:
|
||||
redirect_url = resp.headers.get('Location', 'Unknown')
|
||||
# Check if this is OAuth2 proxy CSRF issue vs actual token expiry
|
||||
is_csrf_issue = '_oauth2_proxy_csrf' in resp.headers.get(
|
||||
'set-cookie', ''
|
||||
)
|
||||
logger.error(
|
||||
f'[TOKEN_DEBUG] Got 302 redirect for {provider} token refresh. '
|
||||
f'{"OAuth2 Proxy CSRF validation failed" if is_csrf_issue else "Keycloak session expired"}. '
|
||||
f'Redirect URL: {redirect_url[:200]}... '
|
||||
f'User needs to re-authenticate.'
|
||||
)
|
||||
# Log OAuth2 proxy cookie details
|
||||
set_cookie = resp.headers.get('set-cookie', 'N/A')
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] OAuth2 proxy CSRF cookie in redirect: {set_cookie[:150]}...'
|
||||
)
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] This appears to be {"a CSRF validation issue (pod changed?)" if is_csrf_issue else "a token expiry issue"}'
|
||||
)
|
||||
# Don't try to parse JSON from a redirect response
|
||||
return None
|
||||
|
||||
# Check for forbidden (wrong session key)
|
||||
if resp.status_code == 403:
|
||||
logger.error(
|
||||
f'[TOKEN_DEBUG] Got 403 Forbidden for {provider} token refresh. '
|
||||
f'Session key mismatch or invalid. Session API key: '
|
||||
f'{self.session_api_key[:10] if self.session_api_key else "None"}... '
|
||||
f'Response body: {resp.text[:200]}'
|
||||
)
|
||||
return None
|
||||
|
||||
# Check for unauthorized
|
||||
if resp.status_code == 401:
|
||||
logger.error(
|
||||
f'[TOKEN_DEBUG] Got 401 Unauthorized for {provider} token refresh. '
|
||||
f'Authentication failed. Response body: {resp.text[:200]}'
|
||||
)
|
||||
return None
|
||||
|
||||
resp.raise_for_status()
|
||||
data = TokenResponse.model_validate_json(resp.text)
|
||||
|
||||
# Log token info (safely)
|
||||
token_str = data.token
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Successfully fetched {provider} token. '
|
||||
f'Token prefix: {token_str[:10] if len(token_str) > 10 else "SHORT"}, '
|
||||
f'Length: {len(token_str)}'
|
||||
)
|
||||
|
||||
return SecretStr(data.token)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f'Failed to fetch latest token for provider {provider}: {e}',
|
||||
f'[TOKEN_DEBUG] Failed to fetch latest token for provider {provider}: '
|
||||
f'{type(e).__name__}: {e}',
|
||||
exc_info=True,
|
||||
)
|
||||
|
||||
|
||||
@@ -209,15 +209,26 @@ class Runtime(FileEditRuntimeMixin):
|
||||
return self._runtime_initialized
|
||||
|
||||
def setup_initial_env(self) -> None:
|
||||
logger.debug(
|
||||
f'[ENV_SETUP_DEBUG] setup_initial_env called with attach_to_existing={self.attach_to_existing}'
|
||||
)
|
||||
if self.attach_to_existing:
|
||||
logger.debug(
|
||||
'[ENV_SETUP_DEBUG] Skipping environment setup - attach_to_existing is True'
|
||||
)
|
||||
return
|
||||
logger.debug(
|
||||
'[ENV_SETUP_DEBUG] Performing full environment setup - attach_to_existing is False'
|
||||
)
|
||||
logger.debug(f'Adding env vars: {self.initial_env_vars.keys()}')
|
||||
self.add_env_vars(self.initial_env_vars)
|
||||
if self.config.sandbox.runtime_startup_env_vars:
|
||||
self.add_env_vars(self.config.sandbox.runtime_startup_env_vars)
|
||||
|
||||
# Configure git settings
|
||||
logger.debug('[ENV_SETUP_DEBUG] Configuring git settings')
|
||||
self._setup_git_config()
|
||||
logger.debug('[ENV_SETUP_DEBUG] Environment setup complete')
|
||||
|
||||
def close(self) -> None:
|
||||
"""This should only be called by conversation manager or closing the session.
|
||||
|
||||
@@ -77,6 +77,14 @@ class RemoteRuntime(ActionExecutionClient):
|
||||
git_provider_tokens,
|
||||
)
|
||||
logger.debug(f'RemoteRuntime.init user_id {user_id}')
|
||||
# Debug logging for initialization parameters
|
||||
self.log(
|
||||
'info',
|
||||
f'[TOKEN_DEBUG] RemoteRuntime.__init__ called with: '
|
||||
f'sid={sid}, attach_to_existing={attach_to_existing}, '
|
||||
f'has_tokens={git_provider_tokens is not None}, '
|
||||
f'user_id={user_id}',
|
||||
)
|
||||
if self.config.sandbox.api_key is None:
|
||||
raise ValueError(
|
||||
'API key is required to use the remote runtime. '
|
||||
@@ -134,6 +142,11 @@ class RemoteRuntime(ActionExecutionClient):
|
||||
|
||||
def _start_or_attach_to_runtime(self) -> None:
|
||||
self.log('info', 'Starting or attaching to runtime')
|
||||
self.log(
|
||||
'info',
|
||||
f'[TOKEN_DEBUG] _start_or_attach_to_runtime: attach_to_existing={self.attach_to_existing}, '
|
||||
f'has_tokens={self.git_provider_tokens is not None}',
|
||||
)
|
||||
existing_runtime = self._check_existing_runtime()
|
||||
if existing_runtime:
|
||||
self.log('info', f'Using existing runtime with ID: {self.runtime_id}')
|
||||
@@ -163,15 +176,38 @@ class RemoteRuntime(ActionExecutionClient):
|
||||
assert self.runtime_url is not None, (
|
||||
'Runtime URL is not set. This should never happen.'
|
||||
)
|
||||
# Log initialization path differences
|
||||
if not self.attach_to_existing:
|
||||
self.log('info', 'Waiting for runtime to be alive...')
|
||||
self.log(
|
||||
'info',
|
||||
'[INIT_PATH_DEBUG] Following NEW runtime path: will wait for runtime and show "Runtime is ready"',
|
||||
)
|
||||
else:
|
||||
self.log(
|
||||
'info',
|
||||
'[INIT_PATH_DEBUG] Following ATTACH path: skipping "Waiting" message and "Runtime is ready" message',
|
||||
)
|
||||
|
||||
self._wait_until_alive()
|
||||
|
||||
if not self.attach_to_existing:
|
||||
self.log('info', 'Runtime is ready.')
|
||||
self.log('info', '[INIT_PATH_DEBUG] Completed NEW runtime initialization')
|
||||
else:
|
||||
self.log(
|
||||
'info',
|
||||
'[INIT_PATH_DEBUG] Completed ATTACH runtime initialization (minimal setup)',
|
||||
)
|
||||
self.set_runtime_status(RuntimeStatus.READY)
|
||||
|
||||
def _check_existing_runtime(self) -> bool:
|
||||
self.log('info', f'Checking for existing runtime with session ID: {self.sid}')
|
||||
self.log(
|
||||
'info',
|
||||
f'[TOKEN_DEBUG] _check_existing_runtime: attach_to_existing={self.attach_to_existing}, '
|
||||
f'has_tokens={self.git_provider_tokens is not None}',
|
||||
)
|
||||
try:
|
||||
response = self._send_runtime_api_request(
|
||||
'GET',
|
||||
@@ -180,6 +216,11 @@ class RemoteRuntime(ActionExecutionClient):
|
||||
data = response.json()
|
||||
status = data.get('status')
|
||||
self.log('info', f'Found runtime with status: {status}')
|
||||
self.log(
|
||||
'info',
|
||||
f'[TOKEN_DEBUG] Runtime response data: runtime_id={data.get("runtime_id")}, '
|
||||
f'status={status}, session_id={data.get("session_id")}',
|
||||
)
|
||||
if status == 'running' or status == 'paused':
|
||||
self._parse_runtime_response(response)
|
||||
except httpx.HTTPError as e:
|
||||
@@ -199,22 +240,37 @@ class RemoteRuntime(ActionExecutionClient):
|
||||
|
||||
if status == 'running':
|
||||
self.log('info', 'Found existing runtime in running state')
|
||||
self.log(
|
||||
'info', '[STATE_TRANSITION] Runtime state: running → (no change needed)'
|
||||
)
|
||||
return True
|
||||
elif status == 'stopped':
|
||||
self.log('info', 'Found existing runtime, but it is stopped')
|
||||
self.log(
|
||||
'info', '[STATE_TRANSITION] Runtime state: stopped → (cannot resume)'
|
||||
)
|
||||
return False
|
||||
elif status == 'paused':
|
||||
self.log(
|
||||
'info', 'Found existing runtime in paused state, attempting to resume'
|
||||
)
|
||||
self.log('info', '[STATE_TRANSITION] Runtime state: paused → resuming')
|
||||
try:
|
||||
self._resume_runtime()
|
||||
self.log('info', 'Successfully resumed paused runtime')
|
||||
self.log(
|
||||
'info',
|
||||
'[STATE_TRANSITION] Runtime state: paused → running (resume successful)',
|
||||
)
|
||||
return True
|
||||
except Exception as e:
|
||||
self.log(
|
||||
'error', f'Failed to resume paused runtime: {e}', exc_info=True
|
||||
)
|
||||
self.log(
|
||||
'error',
|
||||
'[STATE_TRANSITION] Runtime state: paused → failed (resume failed)',
|
||||
)
|
||||
# Return false to indicate we couldn't use the existing runtime
|
||||
return False
|
||||
else:
|
||||
@@ -313,6 +369,21 @@ class RemoteRuntime(ActionExecutionClient):
|
||||
4. Update env vars
|
||||
"""
|
||||
self.log('info', f'Attempting to resume runtime with ID: {self.runtime_id}')
|
||||
# Debug logging for token refresh investigation
|
||||
self.log(
|
||||
'info',
|
||||
f'[TOKEN_DEBUG] Starting resume process for runtime {self.runtime_id}',
|
||||
)
|
||||
self.log(
|
||||
'info',
|
||||
f'[TOKEN_DEBUG] attach_to_existing={self.attach_to_existing}, '
|
||||
f'has git_provider_tokens={self.git_provider_tokens is not None}',
|
||||
)
|
||||
self.log(
|
||||
'info',
|
||||
f'[RESUME_PATH_DEBUG] _resume_runtime called with attach_to_existing={self.attach_to_existing}. '
|
||||
f'This is problematic if False - we are resuming but not attaching!',
|
||||
)
|
||||
self.set_runtime_status(RuntimeStatus.STARTING_RUNTIME)
|
||||
try:
|
||||
response = self._send_runtime_api_request(
|
||||
@@ -343,6 +414,11 @@ class RemoteRuntime(ActionExecutionClient):
|
||||
raise
|
||||
|
||||
try:
|
||||
self.log(
|
||||
'info',
|
||||
'[ENV_SETUP_DEBUG] Calling setup_initial_env() after resume. '
|
||||
f'attach_to_existing={self.attach_to_existing} - if False, this might re-initialize things!',
|
||||
)
|
||||
self.setup_initial_env()
|
||||
self.log('info', 'Successfully set up initial environment after resume')
|
||||
except Exception as e:
|
||||
@@ -354,6 +430,10 @@ class RemoteRuntime(ActionExecutionClient):
|
||||
raise
|
||||
|
||||
self.log('info', 'Runtime successfully resumed and alive.')
|
||||
self.log(
|
||||
'info',
|
||||
f'[RESUME_COMPLETE_DEBUG] Resume completed with attach_to_existing={self.attach_to_existing}',
|
||||
)
|
||||
|
||||
def _parse_runtime_response(self, response: httpx.Response) -> None:
|
||||
start_response = response.json()
|
||||
|
||||
@@ -78,8 +78,16 @@ class ConversationManager(ABC):
|
||||
|
||||
async def is_agent_loop_running(self, sid: str) -> bool:
|
||||
"""Check if an agent loop is running for the given session ID."""
|
||||
from openhands.core.logger import openhands_logger as logger
|
||||
|
||||
sids = await self.get_running_agent_loops(filter_to_sids={sid})
|
||||
return bool(sids)
|
||||
is_running = bool(sids)
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] is_agent_loop_running check: '
|
||||
f'sid={sid}, result={is_running}, '
|
||||
f'found_sids={sids}'
|
||||
)
|
||||
return is_running
|
||||
|
||||
@abstractmethod
|
||||
async def get_running_agent_loops(
|
||||
|
||||
@@ -159,6 +159,12 @@ class StandaloneConversationManager(ConversationManager):
|
||||
f'join_conversation:{sid}:{connection_id}',
|
||||
extra={'session_id': sid, 'user_id': user_id},
|
||||
)
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] ConversationManager.join_conversation: '
|
||||
f'sid={sid}, connection_id={connection_id}, '
|
||||
f'has_settings={settings is not None}, '
|
||||
f'SOURCE=conversation_manager (entry point for joins)'
|
||||
)
|
||||
await self.sio.enter_room(connection_id, ROOM_KEY.format(sid=sid))
|
||||
self._local_connection_id_to_session_id[connection_id] = sid
|
||||
agent_loop_info = await self.maybe_start_agent_loop(sid, settings, user_id)
|
||||
@@ -251,6 +257,13 @@ class StandaloneConversationManager(ConversationManager):
|
||||
# Get all items and convert to list for sorting
|
||||
items: Iterable[tuple[str, Session]] = self._local_agent_loops_by_sid.items()
|
||||
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Standalone.get_running_agent_loops: '
|
||||
f'found {len(self._local_agent_loops_by_sid)} sessions, '
|
||||
f'filter_to_sids={filter_to_sids}, '
|
||||
f'session_ids={list(self._local_agent_loops_by_sid.keys())}'
|
||||
)
|
||||
|
||||
# Filter items if needed
|
||||
if filter_to_sids is not None:
|
||||
items = (item for item in items if item[0] in filter_to_sids)
|
||||
@@ -286,11 +299,25 @@ class StandaloneConversationManager(ConversationManager):
|
||||
replay_json: str | None = None,
|
||||
) -> AgentLoopInfo:
|
||||
logger.info(f'maybe_start_agent_loop:{sid}', extra={'session_id': sid})
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] StandaloneConversationManager.maybe_start_agent_loop ENTRY: '
|
||||
f'sid={sid}, user_id={user_id}'
|
||||
)
|
||||
session = self._local_agent_loops_by_sid.get(sid)
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] maybe_start_agent_loop: '
|
||||
f'sid={sid}, session_exists={session is not None}, '
|
||||
f'will_start_new={session is None}'
|
||||
)
|
||||
if not session:
|
||||
logger.info(f'[TOKEN_DEBUG] Starting NEW agent loop for sid={sid}')
|
||||
session = await self._start_agent_loop(
|
||||
sid, settings, user_id, initial_user_msg, replay_json
|
||||
)
|
||||
else:
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Using EXISTING agent loop for sid={sid} - THIS IS RESUME!'
|
||||
)
|
||||
return self._agent_loop_info_from_session(session)
|
||||
|
||||
async def _start_agent_loop(
|
||||
@@ -302,6 +329,14 @@ class StandaloneConversationManager(ConversationManager):
|
||||
replay_json: str | None = None,
|
||||
) -> Session:
|
||||
logger.info(f'starting_agent_loop:{sid}', extra={'session_id': sid})
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] StandaloneConversationManager._start_agent_loop CALLED: '
|
||||
f'sid={sid}, user_id={user_id}, '
|
||||
f'has_settings={settings is not None}, '
|
||||
f'has_initial_msg={initial_user_msg is not None}, '
|
||||
f'has_replay={replay_json is not None}, '
|
||||
f'SOURCE=standalone._start_agent_loop'
|
||||
)
|
||||
|
||||
response_ids = await self.get_running_agent_loops(user_id)
|
||||
if len(response_ids) >= self.config.max_concurrent_conversations:
|
||||
|
||||
@@ -49,6 +49,11 @@ async def connect(connection_id: str, environ: dict) -> None:
|
||||
logger.info(
|
||||
f'Socket request for conversation {conversation_id} with connection_id {connection_id}'
|
||||
)
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] SocketIO connect: conversation_id={conversation_id}, '
|
||||
f'connection_id={connection_id}, latest_event_id={latest_event_id}, '
|
||||
f'SOURCE=listen_socket.py (SocketIO entry point)'
|
||||
)
|
||||
raw_list = query_params.get('providers_set', [])
|
||||
providers_list = []
|
||||
for item in raw_list:
|
||||
@@ -121,6 +126,11 @@ async def connect(connection_id: str, environ: dict) -> None:
|
||||
user_id, conversation_id, providers_set
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] About to join conversation: conversation_id={conversation_id}, '
|
||||
f'has_conversation_init_data={conversation_init_data is not None}'
|
||||
)
|
||||
|
||||
agent_loop_info = await conversation_manager.join_conversation(
|
||||
conversation_id,
|
||||
connection_id,
|
||||
|
||||
@@ -341,9 +341,25 @@ async def get_conversation(
|
||||
filter_to_sids={conversation_id}
|
||||
)
|
||||
agent_loop_info = agent_loop_infos[0] if agent_loop_infos else None
|
||||
|
||||
# Add debug logging
|
||||
logger.info(
|
||||
f'[FRONTEND_DEBUG] GET /conversations/{conversation_id}: '
|
||||
f'agent_loop_status={agent_loop_info.status if agent_loop_info else None}, '
|
||||
f'num_connections={num_connections}, '
|
||||
f'has_runtime={agent_loop_info is not None}'
|
||||
)
|
||||
|
||||
conversation_info = await _get_conversation_info(
|
||||
metadata, num_connections, agent_loop_info
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f'[FRONTEND_DEBUG] Returning conversation_info to frontend: '
|
||||
f'status={conversation_info.status if conversation_info else None}, '
|
||||
f'runtime_status={conversation_info.runtime_status if conversation_info else None}'
|
||||
)
|
||||
|
||||
return conversation_info
|
||||
except FileNotFoundError:
|
||||
return None
|
||||
@@ -485,6 +501,13 @@ async def start_conversation(
|
||||
return the existing agent loop info.
|
||||
"""
|
||||
logger.info(f'Starting conversation: {conversation_id}')
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] REST API /start endpoint: '
|
||||
f'conversation_id={conversation_id}, '
|
||||
f'user_id={user_id}, '
|
||||
f'has_providers={providers_set.providers_set is not None and len(providers_set.providers_set) > 0}, '
|
||||
f'SOURCE=manage_conversations.py (REST API start)'
|
||||
)
|
||||
|
||||
try:
|
||||
# Check that the conversation exists
|
||||
@@ -505,11 +528,33 @@ async def start_conversation(
|
||||
)
|
||||
|
||||
# Start the agent loop
|
||||
# Log the actual conversation manager type
|
||||
manager_class = type(conversation_manager).__name__
|
||||
manager_module = type(conversation_manager).__module__
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] Conversation manager type: class={manager_class}, '
|
||||
f'module={manager_module}, SOURCE=manage_conversations.py'
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] About to call maybe_start_agent_loop from REST API: '
|
||||
f'conversation_id={conversation_id}'
|
||||
)
|
||||
agent_loop_info = await conversation_manager.maybe_start_agent_loop(
|
||||
sid=conversation_id,
|
||||
settings=conversation_init_data,
|
||||
user_id=user_id,
|
||||
)
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] maybe_start_agent_loop returned: status={agent_loop_info.status}'
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f'[FRONTEND_DEBUG] Returning to frontend from /start endpoint: '
|
||||
f'status=ok, conversation_status={agent_loop_info.status}, '
|
||||
f'conversation_id={conversation_id}. '
|
||||
f'Frontend will receive this status and should handle it appropriately.'
|
||||
)
|
||||
|
||||
return ConversationResponse(
|
||||
status='ok',
|
||||
|
||||
@@ -114,7 +114,27 @@ class AgentSession:
|
||||
- agent_to_llm_config:
|
||||
- agent_configs:
|
||||
"""
|
||||
self.logger.info(
|
||||
f'[TOKEN_DEBUG] AgentSession.start() called: '
|
||||
f'sid={self.sid}, '
|
||||
f'runtime_name={runtime_name}, '
|
||||
f'has_git_tokens={git_provider_tokens is not None}, '
|
||||
f'selected_repository={selected_repository}, '
|
||||
f'has_initial_message={initial_message is not None}, '
|
||||
f'has_replay_json={replay_json is not None}, '
|
||||
f'SOURCE=agent_session.py (WebSocket API)'
|
||||
)
|
||||
|
||||
if git_provider_tokens:
|
||||
self.logger.info(
|
||||
f'[TOKEN_DEBUG] Git provider tokens present: {list(git_provider_tokens.keys())}'
|
||||
)
|
||||
|
||||
if self.controller or self.runtime:
|
||||
self.logger.info(
|
||||
f'[TOKEN_DEBUG] Session already started - controller exists: {self.controller is not None}, '
|
||||
f'runtime exists: {self.runtime is not None}'
|
||||
)
|
||||
raise RuntimeError(
|
||||
'Session already started. You need to close this session and start a new one.'
|
||||
)
|
||||
@@ -132,6 +152,11 @@ class AgentSession:
|
||||
custom_secrets=custom_secrets if custom_secrets else {} # type: ignore[arg-type]
|
||||
)
|
||||
try:
|
||||
self.logger.info(
|
||||
f'[TOKEN_DEBUG] About to call _create_runtime: '
|
||||
f'runtime_name={runtime_name}, '
|
||||
f'has_tokens={git_provider_tokens is not None}'
|
||||
)
|
||||
runtime_connected = await self._create_runtime(
|
||||
runtime_name=runtime_name,
|
||||
config=config,
|
||||
@@ -141,18 +166,40 @@ class AgentSession:
|
||||
selected_repository=selected_repository,
|
||||
selected_branch=selected_branch,
|
||||
)
|
||||
self.logger.info(
|
||||
f'[TOKEN_DEBUG] _create_runtime returned: runtime_connected={runtime_connected}'
|
||||
)
|
||||
|
||||
if not runtime_connected:
|
||||
self.logger.error(
|
||||
'[TOKEN_DEBUG] Runtime connection failed but continuing with session start'
|
||||
)
|
||||
|
||||
repo_directory = None
|
||||
if self.runtime and runtime_connected and selected_repository:
|
||||
repo_directory = selected_repository.split('/')[-1]
|
||||
|
||||
self.logger.info(
|
||||
'[TOKEN_DEBUG] Setting up provider handler and event stream secrets'
|
||||
)
|
||||
if git_provider_tokens:
|
||||
self.logger.info(
|
||||
f'[TOKEN_DEBUG] Creating ProviderHandler with tokens: {list(git_provider_tokens.keys())}'
|
||||
)
|
||||
provider_handler = ProviderHandler(provider_tokens=git_provider_tokens)
|
||||
await provider_handler.set_event_stream_secrets(self.event_stream)
|
||||
self.logger.info(
|
||||
'[TOKEN_DEBUG] Provider handler event stream secrets set successfully'
|
||||
)
|
||||
else:
|
||||
self.logger.info('[TOKEN_DEBUG] No git provider tokens to set up')
|
||||
|
||||
if custom_secrets:
|
||||
self.logger.info('[TOKEN_DEBUG] Setting custom secrets in event stream')
|
||||
custom_secrets_handler.set_event_stream_secrets(self.event_stream)
|
||||
self.logger.info('[TOKEN_DEBUG] Custom secrets set successfully')
|
||||
|
||||
self.logger.info('[TOKEN_DEBUG] About to create memory')
|
||||
self.memory = await self._create_memory(
|
||||
selected_repository=selected_repository,
|
||||
repo_directory=repo_directory,
|
||||
@@ -161,13 +208,24 @@ class AgentSession:
|
||||
custom_secrets_descriptions=custom_secrets_handler.get_custom_secrets_descriptions(),
|
||||
working_dir=config.workspace_mount_path_in_sandbox,
|
||||
)
|
||||
self.logger.info('[TOKEN_DEBUG] Memory created successfully')
|
||||
|
||||
# NOTE: this needs to happen before controller is created
|
||||
# so MCP tools can be included into the SystemMessageAction
|
||||
self.logger.info(
|
||||
'[TOKEN_DEBUG] Checking if MCP tools should be added to agent'
|
||||
)
|
||||
if self.runtime and runtime_connected and agent.config.enable_mcp:
|
||||
self.logger.info('[TOKEN_DEBUG] Adding MCP tools to agent')
|
||||
await add_mcp_tools_to_agent(agent, self.runtime, self.memory)
|
||||
self.logger.info('[TOKEN_DEBUG] MCP tools added successfully')
|
||||
else:
|
||||
self.logger.info(
|
||||
'[TOKEN_DEBUG] Skipping MCP tools - either no runtime, not connected, or MCP disabled'
|
||||
)
|
||||
|
||||
if replay_json:
|
||||
self.logger.info('[TOKEN_DEBUG] Running replay scenario')
|
||||
initial_message = self._run_replay(
|
||||
initial_message,
|
||||
replay_json,
|
||||
@@ -178,7 +236,9 @@ class AgentSession:
|
||||
agent_to_llm_config,
|
||||
agent_configs,
|
||||
)
|
||||
self.logger.info('[TOKEN_DEBUG] Replay completed successfully')
|
||||
else:
|
||||
self.logger.info('[TOKEN_DEBUG] About to create agent controller')
|
||||
self.controller, restored_state = self._create_controller(
|
||||
agent,
|
||||
config.security.confirmation_mode,
|
||||
@@ -187,19 +247,41 @@ class AgentSession:
|
||||
agent_to_llm_config=agent_to_llm_config,
|
||||
agent_configs=agent_configs,
|
||||
)
|
||||
self.logger.info(
|
||||
f'[TOKEN_DEBUG] Agent controller created successfully: restored_state={restored_state}'
|
||||
)
|
||||
|
||||
self.logger.info(
|
||||
f'[TOKEN_DEBUG] Setting up initial agent state: closed={self._closed}, has_initial_message={initial_message is not None}'
|
||||
)
|
||||
if not self._closed:
|
||||
if initial_message:
|
||||
self.logger.info(
|
||||
'[TOKEN_DEBUG] Adding initial message to event stream and setting agent to RUNNING'
|
||||
)
|
||||
self.event_stream.add_event(initial_message, EventSource.USER)
|
||||
self.event_stream.add_event(
|
||||
ChangeAgentStateAction(AgentState.RUNNING),
|
||||
EventSource.ENVIRONMENT,
|
||||
)
|
||||
self.logger.info(
|
||||
'[TOKEN_DEBUG] Agent state set to RUNNING successfully'
|
||||
)
|
||||
else:
|
||||
self.logger.info(
|
||||
'[TOKEN_DEBUG] No initial message - setting agent to AWAITING_USER_INPUT'
|
||||
)
|
||||
self.event_stream.add_event(
|
||||
ChangeAgentStateAction(AgentState.AWAITING_USER_INPUT),
|
||||
EventSource.ENVIRONMENT,
|
||||
)
|
||||
self.logger.info(
|
||||
'[TOKEN_DEBUG] Agent state set to AWAITING_USER_INPUT successfully'
|
||||
)
|
||||
|
||||
self.logger.info(
|
||||
'[TOKEN_DEBUG] Agent session start completed successfully!'
|
||||
)
|
||||
finished = True
|
||||
finally:
|
||||
self._starting = False
|
||||
@@ -321,6 +403,49 @@ class AgentSession:
|
||||
|
||||
self.logger.debug(f'Initializing runtime `{runtime_name}` now...')
|
||||
runtime_cls = get_runtime_cls(runtime_name)
|
||||
|
||||
self.logger.info(
|
||||
f'[TOKEN_DEBUG] In _create_runtime: '
|
||||
f'runtime_cls={runtime_cls.__name__ if runtime_cls else None}, '
|
||||
f'is_RemoteRuntime={runtime_cls == RemoteRuntime}, '
|
||||
f'sid={self.sid}'
|
||||
)
|
||||
|
||||
# Log whether we should be attaching to existing or not
|
||||
# This is where we would check if runtime exists, but for now just log
|
||||
self.logger.info(
|
||||
f'[TOKEN_DEBUG] DECISION POINT in agent_session.py: Should we attach_to_existing? '
|
||||
f'Currently hardcoded to False. Session ID: {self.sid}'
|
||||
)
|
||||
|
||||
# Check if runtime exists for debugging
|
||||
if runtime_cls == RemoteRuntime:
|
||||
import httpx
|
||||
|
||||
try:
|
||||
url = f'{config.sandbox.remote_runtime_api_url}/sessions/{self.sid}'
|
||||
headers = {}
|
||||
if config.sandbox.api_key:
|
||||
headers['X-API-Key'] = config.sandbox.api_key
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(url, headers=headers, timeout=30)
|
||||
if response.status_code == 404:
|
||||
self.logger.info(
|
||||
f'[TOKEN_DEBUG] No existing runtime found for session {self.sid} - '
|
||||
f'should create new (attach_to_existing=False)'
|
||||
)
|
||||
else:
|
||||
data = response.json()
|
||||
status = data.get('status', '')
|
||||
self.logger.info(
|
||||
f'[TOKEN_DEBUG] Found existing runtime for session {self.sid} with status={status} - '
|
||||
f'should attach (attach_to_existing=True) but we are using False!'
|
||||
)
|
||||
except Exception as e:
|
||||
self.logger.warning(
|
||||
f'[TOKEN_DEBUG] Error checking runtime existence: {e}'
|
||||
)
|
||||
|
||||
if runtime_cls == RemoteRuntime:
|
||||
# If provider tokens is passed in custom secrets, then remove provider from provider tokens
|
||||
# We prioritize provider tokens set in custom secrets
|
||||
@@ -328,6 +453,17 @@ class AgentSession:
|
||||
git_provider_tokens, custom_secrets
|
||||
)
|
||||
|
||||
self.logger.info(
|
||||
f'[TOKEN_DEBUG] Creating RemoteRuntime: sid={self.sid}, '
|
||||
f'attach_to_existing=False (HARDCODED!), '
|
||||
f'has_tokens={overrided_tokens is not None}, '
|
||||
f'user_id={self.user_id}'
|
||||
)
|
||||
if overrided_tokens:
|
||||
self.logger.info(
|
||||
f'[TOKEN_DEBUG] Provider tokens present: {list(overrided_tokens.keys())}'
|
||||
)
|
||||
|
||||
self.runtime = runtime_cls(
|
||||
config=config,
|
||||
event_stream=self.event_stream,
|
||||
@@ -362,21 +498,47 @@ class AgentSession:
|
||||
git_provider_tokens=git_provider_tokens,
|
||||
)
|
||||
|
||||
self.logger.info('[TOKEN_DEBUG] About to connect to runtime')
|
||||
try:
|
||||
await self.runtime.connect()
|
||||
self.logger.info('[TOKEN_DEBUG] Runtime connected successfully')
|
||||
except AgentRuntimeUnavailableError as e:
|
||||
self.logger.error(f'Runtime initialization failed: {e}')
|
||||
self.logger.error(f'[TOKEN_DEBUG] Runtime connection failed: {e}')
|
||||
if self._status_callback:
|
||||
self._status_callback(
|
||||
'error', RuntimeStatus.ERROR_RUNTIME_DISCONNECTED, str(e)
|
||||
)
|
||||
return False
|
||||
|
||||
await self.runtime.clone_or_init_repo(
|
||||
git_provider_tokens, selected_repository, selected_branch
|
||||
self.logger.info(
|
||||
f'[TOKEN_DEBUG] About to clone/init repo: repository={selected_repository}, branch={selected_branch}'
|
||||
)
|
||||
await call_sync_from_async(self.runtime.maybe_run_setup_script)
|
||||
await call_sync_from_async(self.runtime.maybe_setup_git_hooks)
|
||||
try:
|
||||
await self.runtime.clone_or_init_repo(
|
||||
git_provider_tokens, selected_repository, selected_branch
|
||||
)
|
||||
self.logger.info(
|
||||
'[TOKEN_DEBUG] Repository clone/init completed successfully'
|
||||
)
|
||||
except Exception as e:
|
||||
self.logger.error(f'[TOKEN_DEBUG] Repository clone/init failed: {e}')
|
||||
raise
|
||||
|
||||
self.logger.info('[TOKEN_DEBUG] Running setup script')
|
||||
try:
|
||||
await call_sync_from_async(self.runtime.maybe_run_setup_script)
|
||||
self.logger.info('[TOKEN_DEBUG] Setup script completed successfully')
|
||||
except Exception as e:
|
||||
self.logger.error(f'[TOKEN_DEBUG] Setup script failed: {e}')
|
||||
raise
|
||||
|
||||
self.logger.info('[TOKEN_DEBUG] Setting up git hooks')
|
||||
try:
|
||||
await call_sync_from_async(self.runtime.maybe_setup_git_hooks)
|
||||
self.logger.info('[TOKEN_DEBUG] Git hooks setup completed successfully')
|
||||
except Exception as e:
|
||||
self.logger.error(f'[TOKEN_DEBUG] Git hooks setup failed: {e}')
|
||||
raise
|
||||
|
||||
self.logger.debug(
|
||||
f'Runtime initialized with plugins: {[plugin.name for plugin in self.runtime.plugins]}'
|
||||
@@ -423,7 +585,13 @@ class AgentSession:
|
||||
'-------------------------------------------------------------------------------------------'
|
||||
)
|
||||
self.logger.debug(msg)
|
||||
self.logger.info('[TOKEN_DEBUG] About to restore state from previous session')
|
||||
initial_state = self._maybe_restore_state()
|
||||
self.logger.info(
|
||||
f'[TOKEN_DEBUG] State restoration completed: has_initial_state={initial_state is not None}'
|
||||
)
|
||||
|
||||
self.logger.info('[TOKEN_DEBUG] Creating AgentController instance')
|
||||
controller = AgentController(
|
||||
sid=self.sid,
|
||||
user_id=self.user_id,
|
||||
@@ -442,6 +610,7 @@ class AgentSession:
|
||||
replay_events=replay_events,
|
||||
security_analyzer=self.runtime.security_analyzer if self.runtime else None,
|
||||
)
|
||||
self.logger.info('[TOKEN_DEBUG] AgentController instance created successfully')
|
||||
|
||||
return (controller, initial_state is not None)
|
||||
|
||||
|
||||
@@ -26,6 +26,15 @@ class ServerConversation:
|
||||
event_stream: EventStream | None = None,
|
||||
runtime: Runtime | None = None,
|
||||
):
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] ServerConversation.__init__ called: '
|
||||
f'sid={sid}, '
|
||||
f'has_runtime={runtime is not None}, '
|
||||
f'SOURCE=conversation.py (REST API)'
|
||||
)
|
||||
self.sid = sid
|
||||
self.config = config
|
||||
self.file_store = file_store
|
||||
@@ -37,8 +46,17 @@ class ServerConversation:
|
||||
|
||||
if runtime:
|
||||
self._attach_to_existing = True
|
||||
logger.info(
|
||||
'[TOKEN_DEBUG] ServerConversation using provided runtime, _attach_to_existing=True'
|
||||
)
|
||||
else:
|
||||
runtime_cls = get_runtime_cls(self.config.runtime)
|
||||
logger.info(
|
||||
f'[TOKEN_DEBUG] ServerConversation creating runtime: '
|
||||
f'runtime_cls={runtime_cls.__name__ if runtime_cls else None}, '
|
||||
f'attach_to_existing=True (HARDCODED in conversation.py!), '
|
||||
f'sid={self.sid}'
|
||||
)
|
||||
runtime = runtime_cls(
|
||||
llm_registry=LLMRegistry(self.config),
|
||||
config=config,
|
||||
|
||||
Reference in New Issue
Block a user