mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
fix(backend): Add stricter URL validation for block requests (#8890)
We need stricter URL validation for the hostname we can request in the block code. ### Changes 🏗️ * Canonicalization: Ensures \ are converted to /, adds http:// if missing, and normalizes the input URL. * Scheme Check: Only http or https are allowed. * Hostname Validation: - Ensures a hostname exists. - Converts it to an IDNA ASCII form to prevent Unicode spoofing. - Verifies that the hostname matches a safe DNS pattern. * Trusted Origins Check: Allows certain hostnames explicitly if needed. * IP Resolution and Blocking: - Resolves the hostname to its IP addresses. - Checks against a list of private/reserved IP networks to prevent SSRF to internal services. ### Checklist 📋 #### For code changes: - [ ] I have clearly listed my changes in the PR description - [ ] I have made a test plan - [ ] I have tested my changes according to the test plan: <!-- Put your test plan here: --> - [ ] ... <details> <summary>Example test plan</summary> - [ ] Create from scratch and execute an agent with at least 3 blocks - [ ] Import an agent from file upload, and confirm it executes correctly - [ ] Upload agent to marketplace - [ ] Import an agent from marketplace and confirm it executes correctly - [ ] Edit an agent from monitor, and confirm it executes correctly </details> #### For configuration changes: - [ ] `.env.example` is updated or already compatible with my changes - [ ] `docker-compose.yml` is updated or already compatible with my changes - [ ] I have included a list of my configuration changes in the PR description (under **Changes**) <details> <summary>Examples of configuration changes</summary> - Changing ports - Adding new services that need to communicate with each other - Secrets or environment variable changes - New or infrastructure changes such as databases </details>
This commit is contained in:
@@ -1,8 +1,10 @@
|
||||
import ipaddress
|
||||
import re
|
||||
import socket
|
||||
from typing import Callable
|
||||
from urllib.parse import urlparse
|
||||
from urllib.parse import urlparse, urlunparse
|
||||
|
||||
import idna
|
||||
import requests as req
|
||||
|
||||
from backend.util.settings import Config
|
||||
@@ -21,8 +23,23 @@ BLOCKED_IP_NETWORKS = [
|
||||
# --8<-- [end:BLOCKED_IP_NETWORKS]
|
||||
]
|
||||
|
||||
ALLOWED_SCHEMES = ["http", "https"]
|
||||
HOSTNAME_REGEX = re.compile(r"^[A-Za-z0-9.-]+$") # Basic DNS-safe hostname pattern
|
||||
|
||||
def is_ip_blocked(ip: str) -> bool:
|
||||
|
||||
def _canonicalize_url(url: str) -> str:
|
||||
# Strip spaces and trailing slashes
|
||||
url = url.strip().strip("/")
|
||||
# Ensure the URL starts with http:// or https://
|
||||
if not url.startswith(("http://", "https://")):
|
||||
url = "http://" + url
|
||||
|
||||
# Replace backslashes with forward slashes to avoid parsing ambiguities
|
||||
url = url.replace("\\", "/")
|
||||
return url
|
||||
|
||||
|
||||
def _is_ip_blocked(ip: str) -> bool:
|
||||
"""
|
||||
Checks if the IP address is in a blocked network.
|
||||
"""
|
||||
@@ -35,29 +52,51 @@ def validate_url(url: str, trusted_origins: list[str]) -> str:
|
||||
Validates the URL to prevent SSRF attacks by ensuring it does not point to a private
|
||||
or untrusted IP address, unless whitelisted.
|
||||
"""
|
||||
url = url.strip().strip("/")
|
||||
if not url.startswith(("http://", "https://")):
|
||||
url = "http://" + url
|
||||
url = _canonicalize_url(url)
|
||||
parsed = urlparse(url)
|
||||
|
||||
parsed_url = urlparse(url)
|
||||
hostname = parsed_url.hostname
|
||||
# Check scheme
|
||||
if parsed.scheme not in ALLOWED_SCHEMES:
|
||||
raise ValueError(
|
||||
f"Scheme '{parsed.scheme}' is not allowed. Only HTTP/HTTPS are supported."
|
||||
)
|
||||
|
||||
if not hostname:
|
||||
raise ValueError(f"Invalid URL: Unable to determine hostname from {url}")
|
||||
# Validate and IDNA encode the hostname
|
||||
if not parsed.hostname:
|
||||
raise ValueError("Invalid URL: No hostname found.")
|
||||
|
||||
if any(hostname == origin for origin in trusted_origins):
|
||||
# IDNA encode to prevent Unicode domain attacks
|
||||
try:
|
||||
ascii_hostname = idna.encode(parsed.hostname).decode("ascii")
|
||||
except idna.IDNAError:
|
||||
raise ValueError("Invalid hostname with unsupported characters.")
|
||||
|
||||
# Check hostname characters
|
||||
if not HOSTNAME_REGEX.match(ascii_hostname):
|
||||
raise ValueError("Hostname contains invalid characters.")
|
||||
|
||||
# Rebuild the URL with the normalized, IDNA-encoded hostname
|
||||
parsed = parsed._replace(netloc=ascii_hostname)
|
||||
url = str(urlunparse(parsed))
|
||||
|
||||
# Check if hostname is a trusted origin (exact match)
|
||||
if ascii_hostname in trusted_origins:
|
||||
return url
|
||||
|
||||
# Resolve all IP addresses for the hostname
|
||||
ip_addresses = {result[4][0] for result in socket.getaddrinfo(hostname, None)}
|
||||
if not ip_addresses:
|
||||
raise ValueError(f"Unable to resolve IP address for {hostname}")
|
||||
try:
|
||||
ip_addresses = {res[4][0] for res in socket.getaddrinfo(ascii_hostname, None)}
|
||||
except socket.gaierror:
|
||||
raise ValueError(f"Unable to resolve IP address for hostname {ascii_hostname}")
|
||||
|
||||
# Check if all IP addresses are global
|
||||
if not ip_addresses:
|
||||
raise ValueError(f"No IP addresses found for {ascii_hostname}")
|
||||
|
||||
# Check if any resolved IP address falls into blocked ranges
|
||||
for ip in ip_addresses:
|
||||
if is_ip_blocked(ip):
|
||||
if _is_ip_blocked(ip):
|
||||
raise ValueError(
|
||||
f"Access to private IP address at {hostname}: {ip} is not allowed."
|
||||
f"Access to private IP address {ip} for hostname {ascii_hostname} is not allowed."
|
||||
)
|
||||
|
||||
return url
|
||||
|
||||
@@ -4,6 +4,7 @@ from backend.util.request import validate_url
|
||||
|
||||
|
||||
def test_validate_url():
|
||||
# Rejected IP ranges
|
||||
with pytest.raises(ValueError):
|
||||
validate_url("localhost", [])
|
||||
|
||||
@@ -16,6 +17,63 @@ def test_validate_url():
|
||||
with pytest.raises(ValueError):
|
||||
validate_url("0.0.0.0", [])
|
||||
|
||||
validate_url("google.com", [])
|
||||
validate_url("github.com", [])
|
||||
validate_url("http://github.com", [])
|
||||
# Normal URLs
|
||||
assert validate_url("google.com/a?b=c", []) == "http://google.com/a?b=c"
|
||||
assert validate_url("github.com?key=!@!@", []) == "http://github.com?key=!@!@"
|
||||
|
||||
# Scheme Enforcement
|
||||
with pytest.raises(ValueError):
|
||||
validate_url("ftp://example.com", [])
|
||||
with pytest.raises(ValueError):
|
||||
validate_url("file://example.com", [])
|
||||
|
||||
# International domain that converts to punycode - should be allowed if public
|
||||
assert validate_url("http://xn--exmple-cua.com", []) == "http://xn--exmple-cua.com"
|
||||
# If the domain fails IDNA encoding or is invalid, it should raise an error
|
||||
with pytest.raises(ValueError):
|
||||
validate_url("http://exa◌mple.com", [])
|
||||
|
||||
# IPv6 Addresses
|
||||
with pytest.raises(ValueError):
|
||||
validate_url("::1", []) # IPv6 loopback should be blocked
|
||||
with pytest.raises(ValueError):
|
||||
validate_url("http://[::1]", []) # IPv6 loopback in URL form
|
||||
|
||||
# Suspicious Characters in Hostname
|
||||
with pytest.raises(ValueError):
|
||||
validate_url("http://example_underscore.com", [])
|
||||
with pytest.raises(ValueError):
|
||||
validate_url("http://exa mple.com", []) # Space in hostname
|
||||
|
||||
# Malformed URLs
|
||||
with pytest.raises(ValueError):
|
||||
validate_url("http://", []) # No hostname
|
||||
with pytest.raises(ValueError):
|
||||
validate_url("://missing-scheme", []) # Missing proper scheme
|
||||
|
||||
# Trusted Origins
|
||||
trusted = ["internal-api.company.com", "10.0.0.5"]
|
||||
assert (
|
||||
validate_url("internal-api.company.com", trusted)
|
||||
== "http://internal-api.company.com"
|
||||
)
|
||||
assert validate_url("10.0.0.5", ["10.0.0.5"]) == "http://10.0.0.5"
|
||||
|
||||
# Special Characters in Path or Query
|
||||
assert (
|
||||
validate_url("example.com/path%20with%20spaces", [])
|
||||
== "http://example.com/path%20with%20spaces"
|
||||
)
|
||||
|
||||
# Backslashes should be replaced with forward slashes
|
||||
assert (
|
||||
validate_url("http://example.com\\backslash", [])
|
||||
== "http://example.com/backslash"
|
||||
)
|
||||
|
||||
# Check defaulting scheme behavior for valid domains
|
||||
assert validate_url("example.com", []) == "http://example.com"
|
||||
assert validate_url("https://secure.com", []) == "https://secure.com"
|
||||
|
||||
# Non-ASCII Characters in Query/Fragment
|
||||
assert validate_url("example.com?param=äöü", []) == "http://example.com?param=äöü"
|
||||
|
||||
Reference in New Issue
Block a user