diff --git a/autogpt_platform/backend/backend/api/features/chat/tools/utils.py b/autogpt_platform/backend/backend/api/features/chat/tools/utils.py index 0046d0b249..bd25594b8a 100644 --- a/autogpt_platform/backend/backend/api/features/chat/tools/utils.py +++ b/autogpt_platform/backend/backend/api/features/chat/tools/utils.py @@ -8,7 +8,12 @@ from backend.api.features.library import model as library_model from backend.api.features.store import db as store_db from backend.data import graph as graph_db from backend.data.graph import GraphModel -from backend.data.model import Credentials, CredentialsFieldInfo, CredentialsMetaInput +from backend.data.model import ( + CredentialsFieldInfo, + CredentialsMetaInput, + HostScopedCredentials, + OAuth2Credentials, +) from backend.integrations.creds_manager import IntegrationCredentialsManager from backend.util.exceptions import NotFoundError @@ -273,7 +278,14 @@ async def match_user_credentials_to_graph( for cred in available_creds if cred.provider in credential_requirements.provider and cred.type in credential_requirements.supported_types - and _credential_has_required_scopes(cred, credential_requirements) + and ( + cred.type != "oauth2" + or _credential_has_required_scopes(cred, credential_requirements) + ) + and ( + cred.type != "host_scoped" + or _credential_is_for_host(cred, credential_requirements) + ) ), None, ) @@ -318,19 +330,10 @@ async def match_user_credentials_to_graph( def _credential_has_required_scopes( - credential: Credentials, + credential: OAuth2Credentials, requirements: CredentialsFieldInfo, ) -> bool: - """ - Check if a credential has all the scopes required by the block. - - For OAuth2 credentials, verifies that the credential's scopes are a superset - of the required scopes. For other credential types, returns True (no scope check). - """ - # Only OAuth2 credentials have scopes to check - if credential.type != "oauth2": - return True - + """Check if an OAuth2 credential has all the scopes required by the input.""" # If no scopes are required, any credential matches if not requirements.required_scopes: return True @@ -339,6 +342,22 @@ def _credential_has_required_scopes( return set(credential.scopes).issuperset(requirements.required_scopes) +def _credential_is_for_host( + credential: HostScopedCredentials, + requirements: CredentialsFieldInfo, +) -> bool: + """Check if a host-scoped credential matches the host required by the input.""" + # We need to know the host to match host-scoped credentials to. + # Graph.aggregate_credentials_inputs() adds the node's set URL value (if any) + # to discriminator_values. No discriminator_values -> no host to match against. + if not requirements.discriminator_values: + return True + + # Check that credential host matches required host. + # Host-scoped credential inputs are grouped by host, so any item from the set works. + return credential.matches_url(list(requirements.discriminator_values)[0]) + + async def check_user_has_required_credentials( user_id: str, required_credentials: list[CredentialsMetaInput], diff --git a/autogpt_platform/backend/backend/data/model.py b/autogpt_platform/backend/backend/data/model.py index 331126fbd6..5a09c591c9 100644 --- a/autogpt_platform/backend/backend/data/model.py +++ b/autogpt_platform/backend/backend/data/model.py @@ -19,7 +19,6 @@ from typing import ( cast, get_args, ) -from urllib.parse import urlparse from uuid import uuid4 from prisma.enums import CreditTransactionType, OnboardingStep @@ -42,6 +41,7 @@ from typing_extensions import TypedDict from backend.integrations.providers import ProviderName from backend.util.json import loads as json_loads +from backend.util.request import parse_url from backend.util.settings import Secrets # Type alias for any provider name (including custom ones) @@ -397,19 +397,25 @@ class HostScopedCredentials(_BaseCredentials): def matches_url(self, url: str) -> bool: """Check if this credential should be applied to the given URL.""" - parsed_url = urlparse(url) - # Extract hostname without port - request_host = parsed_url.hostname + request_host, request_port = _extract_host_from_url(url) + cred_scope_host, cred_scope_port = _extract_host_from_url(self.host) if not request_host: return False - # Simple host matching - exact match or wildcard subdomain match - if self.host == request_host: + # If a port is specified in credential host, the request host port must match + if cred_scope_port is not None and request_port != cred_scope_port: + return False + # Non-standard ports are only allowed if explicitly specified in credential host + elif cred_scope_port is None and request_port not in (80, 443, None): + return False + + # Simple host matching + if cred_scope_host == request_host: return True # Support wildcard matching (e.g., "*.example.com" matches "api.example.com") - if self.host.startswith("*."): - domain = self.host[2:] # Remove "*." + if cred_scope_host.startswith("*."): + domain = cred_scope_host[2:] # Remove "*." return request_host.endswith(f".{domain}") or request_host == domain return False @@ -551,13 +557,13 @@ class CredentialsMetaInput(BaseModel, Generic[CP, CT]): ) -def _extract_host_from_url(url: str) -> str: - """Extract host from URL for grouping host-scoped credentials.""" +def _extract_host_from_url(url: str) -> tuple[str, int | None]: + """Extract host and port from URL for grouping host-scoped credentials.""" try: - parsed = urlparse(url) - return parsed.hostname or url + parsed = parse_url(url) + return parsed.hostname or url, parsed.port except Exception: - return "" + return "", None class CredentialsFieldInfo(BaseModel, Generic[CP, CT]): @@ -606,7 +612,7 @@ class CredentialsFieldInfo(BaseModel, Generic[CP, CT]): providers = frozenset( [cast(CP, "http")] + [ - cast(CP, _extract_host_from_url(str(value))) + cast(CP, parse_url(str(value)).netloc) for value in field.discriminator_values ] ) diff --git a/autogpt_platform/backend/backend/data/model_test.py b/autogpt_platform/backend/backend/data/model_test.py index 37ec6be82f..e8e2ddfa35 100644 --- a/autogpt_platform/backend/backend/data/model_test.py +++ b/autogpt_platform/backend/backend/data/model_test.py @@ -79,10 +79,23 @@ class TestHostScopedCredentials: headers={"Authorization": SecretStr("Bearer token")}, ) - assert creds.matches_url("http://localhost:8080/api/v1") + # Non-standard ports require explicit port in credential host + assert not creds.matches_url("http://localhost:8080/api/v1") assert creds.matches_url("https://localhost:443/secure/endpoint") assert creds.matches_url("http://localhost/simple") + def test_matches_url_with_explicit_port(self): + """Test URL matching with explicit port in credential host.""" + creds = HostScopedCredentials( + provider="custom", + host="localhost:8080", + headers={"Authorization": SecretStr("Bearer token")}, + ) + + assert creds.matches_url("http://localhost:8080/api/v1") + assert not creds.matches_url("http://localhost:3000/api/v1") + assert not creds.matches_url("http://localhost/simple") + def test_empty_headers_dict(self): """Test HostScopedCredentials with empty headers.""" creds = HostScopedCredentials( @@ -128,8 +141,20 @@ class TestHostScopedCredentials: ("*.example.com", "https://sub.api.example.com/test", True), ("*.example.com", "https://example.com/test", True), ("*.example.com", "https://example.org/test", False), - ("localhost", "http://localhost:3000/test", True), + # Non-standard ports require explicit port in credential host + ("localhost", "http://localhost:3000/test", False), + ("localhost:3000", "http://localhost:3000/test", True), ("localhost", "http://127.0.0.1:3000/test", False), + # IPv6 addresses (frontend stores with brackets via URL.hostname) + ("[::1]", "http://[::1]/test", True), + ("[::1]", "http://[::1]:80/test", True), + ("[::1]", "https://[::1]:443/test", True), + ("[::1]", "http://[::1]:8080/test", False), # Non-standard port + ("[::1]:8080", "http://[::1]:8080/test", True), + ("[::1]:8080", "http://[::1]:9090/test", False), + ("[2001:db8::1]", "http://[2001:db8::1]/path", True), + ("[2001:db8::1]", "https://[2001:db8::1]:443/path", True), + ("[2001:db8::1]", "http://[2001:db8::ff]/path", False), ], ) def test_url_matching_parametrized(self, host: str, test_url: str, expected: bool): diff --git a/autogpt_platform/backend/backend/util/request.py b/autogpt_platform/backend/backend/util/request.py index 9744372b15..95e5ee32f7 100644 --- a/autogpt_platform/backend/backend/util/request.py +++ b/autogpt_platform/backend/backend/util/request.py @@ -157,12 +157,7 @@ async def validate_url( is_trusted: Boolean indicating if the hostname is in trusted_origins ip_addresses: List of IP addresses for the host; empty if the host is trusted """ - # Canonicalize URL - url = url.strip("/ ").replace("\\", "/") - parsed = urlparse(url) - if not parsed.scheme: - url = f"http://{url}" - parsed = urlparse(url) + parsed = parse_url(url) # Check scheme if parsed.scheme not in ALLOWED_SCHEMES: @@ -220,6 +215,17 @@ async def validate_url( ) +def parse_url(url: str) -> URL: + """Canonicalizes and parses a URL string.""" + url = url.strip("/ ").replace("\\", "/") + + # Ensure scheme is present for proper parsing + if not re.match(r"[a-z0-9+.\-]+://", url): + url = f"http://{url}" + + return urlparse(url) + + def pin_url(url: URL, ip_addresses: Optional[list[str]] = None) -> URL: """ Pins a URL to a specific IP address to prevent DNS rebinding attacks. diff --git a/autogpt_platform/frontend/src/components/contextual/CredentialsInput/components/HotScopedCredentialsModal/HotScopedCredentialsModal.tsx b/autogpt_platform/frontend/src/components/contextual/CredentialsInput/components/HotScopedCredentialsModal/HotScopedCredentialsModal.tsx index 547952841b..63d2ae1ac5 100644 --- a/autogpt_platform/frontend/src/components/contextual/CredentialsInput/components/HotScopedCredentialsModal/HotScopedCredentialsModal.tsx +++ b/autogpt_platform/frontend/src/components/contextual/CredentialsInput/components/HotScopedCredentialsModal/HotScopedCredentialsModal.tsx @@ -41,7 +41,17 @@ export function HostScopedCredentialsModal({ const currentHost = currentUrl ? getHostFromUrl(currentUrl) : ""; const formSchema = z.object({ - host: z.string().min(1, "Host is required"), + host: z + .string() + .min(1, "Host is required") + .refine((val) => !/^[a-zA-Z][a-zA-Z\d+\-.]*:\/\//.test(val), { + message: "Enter only the host (e.g. api.example.com), not a full URL", + }) + .refine((val) => !val.includes("/"), { + message: + "Enter only the host (e.g. api.example.com), without a trailing path. " + + "You may specify a port (e.g. api.example.com:8080) if needed.", + }), title: z.string().optional(), headers: z.record(z.string()).optional(), });