style: reorder credential functions in proper top-down order

High-level functions first, helpers below:
1. match_credentials_to_requirements (main entry point)
2. get_user_credentials
3. find_matching_credential
4. create_credential_meta_from_match
5. _credential_has_required_scopes (lowest-level helper)
This commit is contained in:
Otto
2026-02-03 18:43:52 +00:00
parent 5c2cae3eb3
commit 85bd23cea4

View File

@@ -225,52 +225,6 @@ async def get_or_create_library_agent(
return library_agents[0]
async def get_user_credentials(user_id: str) -> list[Credentials]:
"""Get all available credentials for a user."""
creds_manager = IntegrationCredentialsManager()
return await creds_manager.store.get_all_creds(user_id)
def _credential_has_required_scopes(
credential: Credentials,
requirements: CredentialsFieldInfo,
) -> bool:
"""Check if a credential has all the scopes required by the block."""
if credential.type != "oauth2":
return True
if not requirements.required_scopes:
return True
return set(credential.scopes).issuperset(requirements.required_scopes)
def find_matching_credential(
available_creds: list[Credentials],
field_info: CredentialsFieldInfo,
) -> Credentials | None:
"""Find a credential that matches the required provider, type, and scopes."""
for cred in available_creds:
if cred.provider not in field_info.provider:
continue
if cred.type not in field_info.supported_types:
continue
if not _credential_has_required_scopes(cred, field_info):
continue
return cred
return None
def create_credential_meta_from_match(
matching_cred: Credentials,
) -> CredentialsMetaInput:
"""Create a CredentialsMetaInput from a matched credential."""
return CredentialsMetaInput(
id=matching_cred.id,
provider=matching_cred.provider, # type: ignore
type=matching_cred.type,
title=matching_cred.title,
)
async def match_credentials_to_requirements(
user_id: str,
requirements: dict[str, CredentialsFieldInfo],
@@ -326,6 +280,52 @@ async def match_credentials_to_requirements(
return matched, missing
async def get_user_credentials(user_id: str) -> list[Credentials]:
"""Get all available credentials for a user."""
creds_manager = IntegrationCredentialsManager()
return await creds_manager.store.get_all_creds(user_id)
def find_matching_credential(
available_creds: list[Credentials],
field_info: CredentialsFieldInfo,
) -> Credentials | None:
"""Find a credential that matches the required provider, type, and scopes."""
for cred in available_creds:
if cred.provider not in field_info.provider:
continue
if cred.type not in field_info.supported_types:
continue
if not _credential_has_required_scopes(cred, field_info):
continue
return cred
return None
def create_credential_meta_from_match(
matching_cred: Credentials,
) -> CredentialsMetaInput:
"""Create a CredentialsMetaInput from a matched credential."""
return CredentialsMetaInput(
id=matching_cred.id,
provider=matching_cred.provider, # type: ignore
type=matching_cred.type,
title=matching_cred.title,
)
def _credential_has_required_scopes(
credential: Credentials,
requirements: CredentialsFieldInfo,
) -> bool:
"""Check if a credential has all the scopes required by the block."""
if credential.type != "oauth2":
return True
if not requirements.required_scopes:
return True
return set(credential.scopes).issuperset(requirements.required_scopes)
async def match_user_credentials_to_graph(
user_id: str,
graph: GraphModel,