refactor(backend): update get_microagent_management_conversations API to support V1 (#11313)

Co-authored-by: Tim O'Farrell <tofarr@gmail.com>
Co-authored-by: openhands <openhands@all-hands.dev>
Co-authored-by: sp.wack <83104063+amanape@users.noreply.github.com>
Co-authored-by: Engel Nyst <enyst@users.noreply.github.com>
This commit is contained in:
Hiep Le
2025-11-04 17:44:44 +07:00
committed by GitHub
parent 2fc8ab2601
commit fa431fb956
2 changed files with 421 additions and 113 deletions

View File

@@ -1104,47 +1104,154 @@ def add_experiment_config_for_conversation(
return False
@app.get('/microagent-management/conversations')
async def get_microagent_management_conversations(
selected_repository: str,
page_id: str | None = None,
limit: int = 20,
conversation_store: ConversationStore = Depends(get_conversation_store),
provider_tokens: PROVIDER_TOKEN_TYPE = Depends(get_provider_tokens),
) -> ConversationInfoResultSet:
"""Get conversations for the microagent management page with pagination support.
This endpoint returns conversations with conversation_trigger = 'microagent_management'
and only includes conversations with active PRs. Pagination is supported.
def _parse_combined_page_id(page_id: str | None) -> tuple[str | None, str | None]:
"""Parse combined page_id to extract separate V0 and V1 page_ids.
Args:
page_id: Optional page ID for pagination
limit: Maximum number of results per page (default: 20)
selected_repository: Optional repository filter to limit results to a specific repository
conversation_store: Conversation store dependency
provider_tokens: Provider tokens for checking PR status
"""
conversation_metadata_result_set = await conversation_store.search(page_id, limit)
page_id: Combined page_id (base64-encoded JSON) or legacy V0 page_id
# Apply age filter first using common function
filtered_results = _filter_conversations_by_age(
conversation_metadata_result_set.results, config.conversation_max_age_seconds
Returns:
Tuple of (v0_page_id, v1_page_id)
"""
v0_page_id = None
v1_page_id = None
if page_id:
try:
# Try to parse as JSON first
page_data = json.loads(base64.b64decode(page_id))
v0_page_id = page_data.get('v0')
v1_page_id = page_data.get('v1')
except (json.JSONDecodeError, TypeError, Exception):
# Fallback: treat as v0 page_id for backward compatibility
# This catches base64 decode errors and any other parsing issues
v0_page_id = page_id
return v0_page_id, v1_page_id
async def _fetch_v1_conversations_safe(
app_conversation_service: AppConversationService,
v1_page_id: str | None,
limit: int,
) -> tuple[list[ConversationInfo], str | None]:
"""Safely fetch V1 conversations with error handling.
Args:
app_conversation_service: App conversation service for V1
v1_page_id: Page ID for V1 pagination
limit: Maximum number of results
Returns:
Tuple of (v1_conversations, v1_next_page_id)
"""
v1_conversations = []
v1_next_page_id = None
try:
age_filter_date = None
if config.conversation_max_age_seconds:
age_filter_date = datetime.now(timezone.utc) - timedelta(
seconds=config.conversation_max_age_seconds
)
app_conversation_page = await app_conversation_service.search_app_conversations(
page_id=v1_page_id,
limit=limit,
created_at__gte=age_filter_date,
)
v1_conversations = [
_to_conversation_info(app_conv) for app_conv in app_conversation_page.items
]
v1_next_page_id = app_conversation_page.next_page_id
except Exception as e:
# V1 system might not be available or initialized yet
logger.debug(f'V1 conversation service not available: {str(e)}')
return v1_conversations, v1_next_page_id
async def _process_v0_conversations(
conversation_metadata_result_set,
) -> list[ConversationInfo]:
"""Process V0 conversations with age filtering and agent loop info.
Args:
conversation_metadata_result_set: Result set from V0 conversation store
Returns:
List of processed ConversationInfo objects
"""
# Apply age filter to V0 conversations
v0_filtered_results = _filter_conversations_by_age(
conversation_metadata_result_set.results,
config.conversation_max_age_seconds,
)
# Check if the last PR is active (not closed/merged)
provider_handler = ProviderHandler(provider_tokens)
v0_conversation_ids = set(
conversation.conversation_id for conversation in v0_filtered_results
)
# Apply additional filters
final_filtered_results = []
for conversation in filtered_results:
# Get agent loop info for V0 conversations
await conversation_manager.get_connections(filter_to_sids=v0_conversation_ids)
v0_agent_loop_info = await conversation_manager.get_agent_loop_info(
filter_to_sids=v0_conversation_ids
)
v0_agent_loop_info_by_conversation_id = {
info.conversation_id: info for info in v0_agent_loop_info
}
# Convert to ConversationInfo objects
v0_conversations = await wait_all(
_get_conversation_info(
conversation=conversation,
num_connections=sum(
1
for conversation_id in v0_agent_loop_info_by_conversation_id.values()
if conversation_id == conversation.conversation_id
),
agent_loop_info=v0_agent_loop_info_by_conversation_id.get(
conversation.conversation_id
),
)
for conversation in v0_filtered_results
)
return v0_conversations
async def _apply_microagent_filters(
conversations: list[ConversationInfo],
selected_repository: str,
provider_handler: ProviderHandler,
) -> list[ConversationInfo]:
"""Apply microagent management specific filters to conversations.
Filters conversations by:
- Trigger type (MICROAGENT_MANAGEMENT)
- Repository match
- PR status (only open PRs)
Args:
conversations: List of conversations to filter
selected_repository: Repository to filter by
provider_handler: Handler for checking PR status
Returns:
Filtered list of conversations
"""
filtered = []
for conversation in conversations:
# Only include microagent_management conversations
if conversation.trigger != ConversationTrigger.MICROAGENT_MANAGEMENT:
continue
# Apply repository filter if specified
# Apply repository filter
if conversation.selected_repository != selected_repository:
continue
# Check if PR is still open
if (
conversation.pr_number
and len(conversation.pr_number) > 0
@@ -1159,12 +1266,101 @@ async def get_microagent_management_conversations(
# Skip this conversation if the PR is closed/merged
continue
final_filtered_results.append(conversation)
filtered.append(conversation)
return await _build_conversation_result_set(
final_filtered_results, conversation_metadata_result_set.next_page_id
return filtered
def _create_combined_page_id(
v0_next_page_id: str | None, v1_next_page_id: str | None
) -> str | None:
"""Create a combined page_id from V0 and V1 page_ids.
Args:
v0_next_page_id: Next page ID for V0 conversations
v1_next_page_id: Next page ID for V1 conversations
Returns:
Base64-encoded JSON combining both page_ids, or None if no next pages
"""
if not v0_next_page_id and not v1_next_page_id:
return None
next_page_data = {
'v0': v0_next_page_id,
'v1': v1_next_page_id,
}
return base64.b64encode(json.dumps(next_page_data).encode()).decode()
@app.get('/microagent-management/conversations')
async def get_microagent_management_conversations(
selected_repository: str,
page_id: str | None = None,
limit: int = 20,
conversation_store: ConversationStore = Depends(get_conversation_store),
provider_tokens: PROVIDER_TOKEN_TYPE = Depends(get_provider_tokens),
app_conversation_service: AppConversationService = app_conversation_service_dependency,
) -> ConversationInfoResultSet:
"""Get conversations for the microagent management page with pagination support.
This endpoint returns conversations with conversation_trigger = 'microagent_management'
and only includes conversations with active PRs. Pagination is supported.
Args:
page_id: Optional page ID for pagination
limit: Maximum number of results per page (default: 20)
selected_repository: Repository filter to limit results to a specific repository
conversation_store: Conversation store dependency
provider_tokens: Provider tokens for checking PR status
app_conversation_service: App conversation service for V1 conversations
Returns:
ConversationInfoResultSet with filtered and paginated results
"""
# Parse page_id to extract V0 and V1 components
v0_page_id, v1_page_id = _parse_combined_page_id(page_id)
# Fetch V0 conversations
conversation_metadata_result_set = await conversation_store.search(
v0_page_id, limit
)
# Fetch V1 conversations (with graceful error handling)
v1_conversations, v1_next_page_id = await _fetch_v1_conversations_safe(
app_conversation_service, v1_page_id, limit
)
# Process V0 conversations
v0_conversations = await _process_v0_conversations(conversation_metadata_result_set)
# Apply microagent-specific filters
provider_handler = ProviderHandler(provider_tokens)
v0_filtered = await _apply_microagent_filters(
v0_conversations, selected_repository, provider_handler
)
v1_filtered = await _apply_microagent_filters(
v1_conversations, selected_repository, provider_handler
)
# Combine and sort results
all_conversations = v0_filtered + v1_filtered
all_conversations.sort(
key=lambda x: x.created_at or datetime.min.replace(tzinfo=timezone.utc),
reverse=True,
)
# Limit to requested number of results
final_results = all_conversations[:limit]
# Create combined page_id for pagination
next_page_id = _create_combined_page_id(
conversation_metadata_result_set.next_page_id, v1_next_page_id
)
return ConversationInfoResultSet(results=final_results, next_page_id=next_page_id)
def _to_conversation_info(app_conversation: AppConversation) -> ConversationInfo:
"""Convert a V1 AppConversation into an old style ConversationInfo"""