From f66af20000bbed3a9ee84bdd2f9d53fc8bb604d1 Mon Sep 17 00:00:00 2001 From: Reinier van der Leer Date: Mon, 2 Feb 2026 17:43:54 +0100 Subject: [PATCH] improve IPv6 support & port matching logic --- .../backend/backend/data/model.py | 30 +++++++++++-------- .../backend/backend/data/model_test.py | 29 ++++++++++++++++-- .../backend/backend/util/request.py | 10 +++---- 3 files changed, 49 insertions(+), 20 deletions(-) diff --git a/autogpt_platform/backend/backend/data/model.py b/autogpt_platform/backend/backend/data/model.py index 0ca1b24a4b..5a09c591c9 100644 --- a/autogpt_platform/backend/backend/data/model.py +++ b/autogpt_platform/backend/backend/data/model.py @@ -397,21 +397,25 @@ class HostScopedCredentials(_BaseCredentials): def matches_url(self, url: str) -> bool: """Check if this credential should be applied to the given URL.""" - request_host = _extract_host_from_url(url) + request_host, request_port = _extract_host_from_url(url) + cred_scope_host, cred_scope_port = _extract_host_from_url(self.host) if not request_host: return False - if ":" not in self.host: - # No port specified in credential host, ignore port from request host - request_host = request_host.rsplit(":", 1)[0] + # If a port is specified in credential host, the request host port must match + if cred_scope_port is not None and request_port != cred_scope_port: + return False + # Non-standard ports are only allowed if explicitly specified in credential host + elif cred_scope_port is None and request_port not in (80, 443, None): + return False - # Simple host matching - exact match or wildcard subdomain match - if self.host == request_host: + # Simple host matching + if cred_scope_host == request_host: return True # Support wildcard matching (e.g., "*.example.com" matches "api.example.com") - if self.host.startswith("*."): - domain = self.host[2:] # Remove "*." + if cred_scope_host.startswith("*."): + domain = cred_scope_host[2:] # Remove "*." return request_host.endswith(f".{domain}") or request_host == domain return False @@ -553,13 +557,13 @@ class CredentialsMetaInput(BaseModel, Generic[CP, CT]): ) -def _extract_host_from_url(url: str) -> str: - """Extract host from URL for grouping host-scoped credentials.""" +def _extract_host_from_url(url: str) -> tuple[str, int | None]: + """Extract host and port from URL for grouping host-scoped credentials.""" try: parsed = parse_url(url) - return parsed.netloc or url + return parsed.hostname or url, parsed.port except Exception: - return "" + return "", None class CredentialsFieldInfo(BaseModel, Generic[CP, CT]): @@ -608,7 +612,7 @@ class CredentialsFieldInfo(BaseModel, Generic[CP, CT]): providers = frozenset( [cast(CP, "http")] + [ - cast(CP, _extract_host_from_url(str(value))) + cast(CP, parse_url(str(value)).netloc) for value in field.discriminator_values ] ) diff --git a/autogpt_platform/backend/backend/data/model_test.py b/autogpt_platform/backend/backend/data/model_test.py index 37ec6be82f..e8e2ddfa35 100644 --- a/autogpt_platform/backend/backend/data/model_test.py +++ b/autogpt_platform/backend/backend/data/model_test.py @@ -79,10 +79,23 @@ class TestHostScopedCredentials: headers={"Authorization": SecretStr("Bearer token")}, ) - assert creds.matches_url("http://localhost:8080/api/v1") + # Non-standard ports require explicit port in credential host + assert not creds.matches_url("http://localhost:8080/api/v1") assert creds.matches_url("https://localhost:443/secure/endpoint") assert creds.matches_url("http://localhost/simple") + def test_matches_url_with_explicit_port(self): + """Test URL matching with explicit port in credential host.""" + creds = HostScopedCredentials( + provider="custom", + host="localhost:8080", + headers={"Authorization": SecretStr("Bearer token")}, + ) + + assert creds.matches_url("http://localhost:8080/api/v1") + assert not creds.matches_url("http://localhost:3000/api/v1") + assert not creds.matches_url("http://localhost/simple") + def test_empty_headers_dict(self): """Test HostScopedCredentials with empty headers.""" creds = HostScopedCredentials( @@ -128,8 +141,20 @@ class TestHostScopedCredentials: ("*.example.com", "https://sub.api.example.com/test", True), ("*.example.com", "https://example.com/test", True), ("*.example.com", "https://example.org/test", False), - ("localhost", "http://localhost:3000/test", True), + # Non-standard ports require explicit port in credential host + ("localhost", "http://localhost:3000/test", False), + ("localhost:3000", "http://localhost:3000/test", True), ("localhost", "http://127.0.0.1:3000/test", False), + # IPv6 addresses (frontend stores with brackets via URL.hostname) + ("[::1]", "http://[::1]/test", True), + ("[::1]", "http://[::1]:80/test", True), + ("[::1]", "https://[::1]:443/test", True), + ("[::1]", "http://[::1]:8080/test", False), # Non-standard port + ("[::1]:8080", "http://[::1]:8080/test", True), + ("[::1]:8080", "http://[::1]:9090/test", False), + ("[2001:db8::1]", "http://[2001:db8::1]/path", True), + ("[2001:db8::1]", "https://[2001:db8::1]:443/path", True), + ("[2001:db8::1]", "http://[2001:db8::ff]/path", False), ], ) def test_url_matching_parametrized(self, host: str, test_url: str, expected: bool): diff --git a/autogpt_platform/backend/backend/util/request.py b/autogpt_platform/backend/backend/util/request.py index d55bec702c..95e5ee32f7 100644 --- a/autogpt_platform/backend/backend/util/request.py +++ b/autogpt_platform/backend/backend/util/request.py @@ -218,12 +218,12 @@ async def validate_url( def parse_url(url: str) -> URL: """Canonicalizes and parses a URL string.""" url = url.strip("/ ").replace("\\", "/") - parsed = urlparse(url) - if not parsed.scheme: - url = f"http://{url}" - parsed = urlparse(url) - return parsed + # Ensure scheme is present for proper parsing + if not re.match(r"[a-z0-9+.\-]+://", url): + url = f"http://{url}" + + return urlparse(url) def pin_url(url: URL, ip_addresses: Optional[list[str]] = None) -> URL: