Compare commits

...

4 Commits

Author SHA1 Message Date
Chuck Butkus
5a726cf068 Don't pass session 2026-01-26 19:08:05 -05:00
Chuck Butkus
d2a9788fe9 Merge branch 'main' into debug-logging 2026-01-26 18:48:14 -05:00
Chuck Butkus
f4889039ae Update to async Rolestore call 2026-01-26 18:40:11 -05:00
Chuck Butkus
f3901ae9ea Add logging 2026-01-26 14:38:09 -05:00
3 changed files with 98 additions and 19 deletions

View File

@@ -96,7 +96,7 @@ class LiteLlmManager:
user_settings: UserSettings,
) -> UserSettings | None:
logger.info(
'SettingsStore:umigrate_lite_llm_entries:start',
'LiteLlmManager:migrate_lite_llm_entries:start',
extra={'org_id': org_id, 'user_id': keycloak_user_id},
)
if LITE_LLM_API_KEY is None or LITE_LLM_API_URL is None:
@@ -141,19 +141,35 @@ class LiteLlmManager:
return None
credits = max(max_budget - spend, 0.0)
logger.info(
'LiteLlmManager:migrate_lite_llm_entries:create_team',
extra={'org_id': org_id, 'user_id': keycloak_user_id},
)
await LiteLlmManager._create_team(
client, keycloak_user_id, org_id, credits
)
logger.info(
'LiteLlmManager:migrate_lite_llm_entries:update_user',
extra={'org_id': org_id, 'user_id': keycloak_user_id},
)
await LiteLlmManager._update_user(
client, keycloak_user_id, max_budget=UNLIMITED_BUDGET_SETTING
)
logger.info(
'LiteLlmManager:migrate_lite_llm_entries:add_user_to_team',
extra={'org_id': org_id, 'user_id': keycloak_user_id},
)
await LiteLlmManager._add_user_to_team(
client, keycloak_user_id, org_id, credits
)
if user_settings.llm_api_key:
logger.info(
'LiteLlmManager:migrate_lite_llm_entries:update_key',
extra={'org_id': org_id, 'user_id': keycloak_user_id},
)
await LiteLlmManager._update_key(
client,
keycloak_user_id,
@@ -162,6 +178,10 @@ class LiteLlmManager:
)
if user_settings.llm_api_key_for_byor:
logger.info(
'LiteLlmManager:migrate_lite_llm_entries:update_byor_key',
extra={'org_id': org_id, 'user_id': keycloak_user_id},
)
await LiteLlmManager._update_key(
client,
keycloak_user_id,
@@ -169,6 +189,10 @@ class LiteLlmManager:
team_id=org_id,
)
logger.info(
'LiteLlmManager:migrate_lite_llm_entries:end',
extra={'org_id': org_id, 'user_id': keycloak_user_id},
)
return user_settings
@staticmethod

View File

@@ -4,7 +4,9 @@ Store class for managing roles.
from typing import List, Optional
from storage.database import session_maker
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from storage.database import a_session_maker, session_maker
from storage.role import Role
@@ -33,6 +35,20 @@ class RoleStore:
with session_maker() as session:
return session.query(Role).filter(Role.name == name).first()
@staticmethod
async def get_role_by_name_async(
name: str,
session: Optional[AsyncSession] = None,
) -> Optional[Role]:
"""Get role by name."""
if session is not None:
result = await session.execute(select(Role).where(Role.name == name))
return result.scalars().first()
async with a_session_maker() as session:
result = await session.execute(select(Role).where(Role.name == name))
return result.scalars().first()
@staticmethod
def list_roles() -> List[Role]:
"""List all roles."""

View File

@@ -14,9 +14,9 @@ from server.constants import (
get_default_litellm_model,
)
from server.logger import logger
from sqlalchemy import text
from sqlalchemy import select, text
from sqlalchemy.orm import joinedload
from storage.database import session_maker
from storage.database import a_session_maker, session_maker
from storage.encrypt_utils import decrypt_legacy_model
from storage.org import Org
from storage.org_member import OrgMember
@@ -116,7 +116,7 @@ class UserStore:
redis_client = UserStore._get_redis_client()
if redis_client is None:
logger.warning(
'saas_settings_store:_acquire_user_creation_lock:no_redis_client',
'user_store:_acquire_user_creation_lock:no_redis_client',
extra={'user_id': user_id},
)
return True # Proceed without locking if Redis is unavailable
@@ -159,12 +159,20 @@ class UserStore:
from storage.lite_llm_manager import LiteLlmManager
logger.info(
'user_store:migrate_user:calling_litellm_migrate_entries',
extra={'user_id': user_id},
)
await LiteLlmManager.migrate_entries(
str(org.id),
user_id,
decrypted_user_settings,
)
logger.info(
'user_store:migrate_user:done_litellm_migrate_entries',
extra={'user_id': user_id},
)
custom_settings = UserStore._has_custom_settings(
decrypted_user_settings, user_settings.user_version
)
@@ -172,7 +180,15 @@ class UserStore:
# avoids circular reference. This migrate method is temprorary until all users are migrated.
from integrations.stripe_service import migrate_customer
logger.info(
'user_store:migrate_user:calling_stripe_migrate_customer',
extra={'user_id': user_id},
)
await migrate_customer(session, user_id, org)
logger.info(
'user_store:migrate_user:done_stripe_migrate_customer',
extra={'user_id': user_id},
)
from storage.org_store import OrgStore
@@ -201,7 +217,15 @@ class UserStore:
)
session.add(user)
role = RoleStore.get_role_by_name('owner')
logger.info(
'user_store:migrate_user:calling_get_role_by_name',
extra={'user_id': user_id},
)
role = await RoleStore.get_role_by_name_async('owner')
logger.info(
'user_store:migrate_user:done_get_role_by_name',
extra={'user_id': user_id},
)
from storage.org_member_store import OrgMemberStore
@@ -229,6 +253,10 @@ class UserStore:
user_settings.already_migrated = True
session.merge(user_settings)
session.flush()
logger.info(
'user_store:migrate_user:session_flush_complete',
extra={'user_id': user_id},
)
# need to migrate conversation metadata
session.execute(
@@ -296,6 +324,10 @@ class UserStore:
session.commit()
session.refresh(user)
user.org_members # load org_members
logger.info(
'user_store:migrate_user:session_committed',
extra={'user_id': user_id},
)
return user
@staticmethod
@@ -322,7 +354,7 @@ class UserStore:
):
# The user is already being created in another thread / process
logger.info(
'saas_settings_store:create_default_settings:waiting_for_lock',
'user_store:create_default_settings:waiting_for_lock',
extra={'user_id': user_id},
)
call_async_from_sync(
@@ -372,13 +404,13 @@ class UserStore:
This is the preferred method when calling from an async context as it
avoids event loop conflicts that can occur with the sync version.
"""
with session_maker() as session:
user = (
session.query(User)
async with a_session_maker() as session:
result = await session.execute(
select(User)
.options(joinedload(User.org_members))
.filter(User.id == uuid.UUID(user_id))
.first()
)
user = result.scalars().first()
if user:
return user
@@ -386,32 +418,39 @@ class UserStore:
while not await UserStore._acquire_user_creation_lock(user_id):
# The user is already being created in another thread / process
logger.info(
'saas_settings_store:create_default_settings:waiting_for_lock',
'user_store:get_user_by_id_async:waiting_for_lock',
extra={'user_id': user_id},
)
await asyncio.sleep(_RETRY_LOAD_DELAY_SECONDS)
# Check for user again as migration could have happened while trying to get the lock.
user = (
session.query(User)
result = await session.execute(
select(User)
.options(joinedload(User.org_members))
.filter(User.id == uuid.UUID(user_id))
.first()
)
user = result.scalars().first()
if user:
return user
user_settings = (
session.query(UserSettings)
.filter(
logger.info(
'user_store:get_user_by_id_async:start_migration',
extra={'user_id': user_id},
)
result = await session.execute(
select(UserSettings).filter(
UserSettings.keycloak_user_id == user_id,
UserSettings.already_migrated.is_(False),
)
.first()
)
user_settings = result.scalars().first()
if user_settings:
token_manager = TokenManager()
user_info = await token_manager.get_user_info_from_user_id(user_id)
logger.info(
'user_store:get_user_by_id_async:calling_migrate_user',
extra={'user_id': user_id},
)
user = await UserStore.migrate_user(
user_id,
user_settings,