diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/utils.py b/autogpt_platform/backend/backend/api/features/chat/tools/utils.py index 5b802d2456..71d6ea0619 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/utils.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/utils.py @@ -225,52 +225,6 @@ async def get_or_create_library_agent( return library_agents[0] -async def get_user_credentials(user_id: str) -> list[Credentials]: - """Get all available credentials for a user.""" - creds_manager = IntegrationCredentialsManager() - return await creds_manager.store.get_all_creds(user_id) - - -def _credential_has_required_scopes( - credential: Credentials, - requirements: CredentialsFieldInfo, -) -> bool: - """Check if a credential has all the scopes required by the block.""" - if credential.type != "oauth2": - return True - if not requirements.required_scopes: - return True - return set(credential.scopes).issuperset(requirements.required_scopes) - - -def find_matching_credential( - available_creds: list[Credentials], - field_info: CredentialsFieldInfo, -) -> Credentials | None: - """Find a credential that matches the required provider, type, and scopes.""" - for cred in available_creds: - if cred.provider not in field_info.provider: - continue - if cred.type not in field_info.supported_types: - continue - if not _credential_has_required_scopes(cred, field_info): - continue - return cred - return None - - -def create_credential_meta_from_match( - matching_cred: Credentials, -) -> CredentialsMetaInput: - """Create a CredentialsMetaInput from a matched credential.""" - return CredentialsMetaInput( - id=matching_cred.id, - provider=matching_cred.provider, # type: ignore - type=matching_cred.type, - title=matching_cred.title, - ) - - async def match_credentials_to_requirements( user_id: str, requirements: dict[str, CredentialsFieldInfo], @@ -326,6 +280,52 @@ async def match_credentials_to_requirements( return matched, missing +async def get_user_credentials(user_id: str) -> list[Credentials]: + """Get all available credentials for a user.""" + creds_manager = IntegrationCredentialsManager() + return await creds_manager.store.get_all_creds(user_id) + + +def find_matching_credential( + available_creds: list[Credentials], + field_info: CredentialsFieldInfo, +) -> Credentials | None: + """Find a credential that matches the required provider, type, and scopes.""" + for cred in available_creds: + if cred.provider not in field_info.provider: + continue + if cred.type not in field_info.supported_types: + continue + if not _credential_has_required_scopes(cred, field_info): + continue + return cred + return None + + +def create_credential_meta_from_match( + matching_cred: Credentials, +) -> CredentialsMetaInput: + """Create a CredentialsMetaInput from a matched credential.""" + return CredentialsMetaInput( + id=matching_cred.id, + provider=matching_cred.provider, # type: ignore + type=matching_cred.type, + title=matching_cred.title, + ) + + +def _credential_has_required_scopes( + credential: Credentials, + requirements: CredentialsFieldInfo, +) -> bool: + """Check if a credential has all the scopes required by the block.""" + if credential.type != "oauth2": + return True + if not requirements.required_scopes: + return True + return set(credential.scopes).issuperset(requirements.required_scopes) + + async def match_user_credentials_to_graph( user_id: str, graph: GraphModel,