mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
Reduce blocklist ip range
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import json
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
|
||||
from backend.data.model import SchemaField
|
||||
@@ -30,9 +31,14 @@ class SendWebRequestBlock(Block):
|
||||
description="The headers to include in the request",
|
||||
default={},
|
||||
)
|
||||
body: object = SchemaField(
|
||||
json_format: bool = SchemaField(
|
||||
title="JSON format",
|
||||
description="Whether to send and receive body as JSON",
|
||||
default=True,
|
||||
)
|
||||
body: Any = SchemaField(
|
||||
description="The body of the request",
|
||||
default={},
|
||||
default=None,
|
||||
)
|
||||
|
||||
class Output(BlockSchema):
|
||||
@@ -57,14 +63,16 @@ class SendWebRequestBlock(Block):
|
||||
input_data.method.value,
|
||||
input_data.url,
|
||||
headers=input_data.headers,
|
||||
json=input_data.body,
|
||||
allow_redirects=False,
|
||||
json=input_data.body if input_data.json_format else None,
|
||||
data=input_data.body if not input_data.json_format else None,
|
||||
)
|
||||
result = response.json() if input_data.json_format else response.text
|
||||
|
||||
if response.status_code // 100 == 2:
|
||||
yield "response", response.json()
|
||||
yield "response", result
|
||||
elif response.status_code // 100 == 4:
|
||||
yield "client_error", response.json()
|
||||
yield "client_error", result
|
||||
elif response.status_code // 100 == 5:
|
||||
yield "server_error", response.json()
|
||||
yield "server_error", result
|
||||
else:
|
||||
raise ValueError(f"Unexpected status code: {response.status_code}")
|
||||
|
||||
@@ -7,13 +7,25 @@ import requests as req
|
||||
|
||||
from backend.util.settings import Config
|
||||
|
||||
# List of IP networks to block
|
||||
BLOCKED_IP_NETWORKS = [
|
||||
ipaddress.ip_network("0.0.0.0/8"), # "This" Network
|
||||
ipaddress.ip_network("10.0.0.0/8"), # Private-Use
|
||||
ipaddress.ip_network("127.0.0.0/8"), # Loopback
|
||||
ipaddress.ip_network("169.254.0.0/16"), # Link Local
|
||||
ipaddress.ip_network("172.16.0.0/12"), # Private-Use
|
||||
ipaddress.ip_network("192.168.0.0/16"), # Private-Use
|
||||
ipaddress.ip_network("224.0.0.0/4"), # Multicast
|
||||
ipaddress.ip_network("240.0.0.0/4"), # Reserved for Future Use
|
||||
]
|
||||
|
||||
def is_ip_allowed(ip: str) -> bool:
|
||||
|
||||
def is_ip_blocked(ip: str) -> bool:
|
||||
"""
|
||||
Checks if the IP address is allowed (i.e., it's a global IP address).
|
||||
Checks if the IP address is in a blocked network.
|
||||
"""
|
||||
ip_addr = ipaddress.ip_address(ip)
|
||||
return ip_addr.is_global
|
||||
return any(ip_addr in network for network in BLOCKED_IP_NETWORKS)
|
||||
|
||||
|
||||
def validate_url(url: str, trusted_origins: list[str]) -> str:
|
||||
@@ -21,26 +33,28 @@ def validate_url(url: str, trusted_origins: list[str]) -> str:
|
||||
Validates the URL to prevent SSRF attacks by ensuring it does not point to a private
|
||||
or untrusted IP address, unless whitelisted.
|
||||
"""
|
||||
if any(url.startswith(origin) for origin in trusted_origins):
|
||||
return url
|
||||
|
||||
parsed_url = urlparse(url)
|
||||
hostname = parsed_url.hostname
|
||||
|
||||
if not hostname:
|
||||
raise ValueError(f"Invalid URL: Unable to determine hostname from {url}")
|
||||
|
||||
try:
|
||||
# Resolve all IP addresses for the hostname
|
||||
ip_addresses = {result[4][0] for result in socket.getaddrinfo(hostname, None)}
|
||||
# Check if all IP addresses are global
|
||||
if all(is_ip_allowed(ip) for ip in ip_addresses):
|
||||
return url
|
||||
else:
|
||||
if any(hostname == origin for origin in trusted_origins):
|
||||
return url
|
||||
|
||||
# Resolve all IP addresses for the hostname
|
||||
ip_addresses = {result[4][0] for result in socket.getaddrinfo(hostname, None)}
|
||||
if not ip_addresses:
|
||||
raise ValueError(f"Unable to resolve IP address for {hostname}")
|
||||
|
||||
# Check if all IP addresses are global
|
||||
for ip in ip_addresses:
|
||||
if is_ip_blocked(ip):
|
||||
raise ValueError(
|
||||
f"Access to private or untrusted IP address at {hostname} is not allowed."
|
||||
f"Access to private IP address at {hostname}: {ip} is not allowed."
|
||||
)
|
||||
except Exception as e:
|
||||
raise ValueError(f"Invalid or unresolvable URL: {url}") from e
|
||||
|
||||
return url
|
||||
|
||||
|
||||
class Requests:
|
||||
@@ -51,18 +65,24 @@ class Requests:
|
||||
def __init__(
|
||||
self,
|
||||
trusted_origins: list[str] | None = None,
|
||||
allow_redirects: bool = False,
|
||||
raise_for_status: bool = True,
|
||||
extra_url_validator: Callable[[str], str] | None = None,
|
||||
extra_headers: dict[str, str] | None = None,
|
||||
):
|
||||
self.trusted_origins = trusted_origins or []
|
||||
self.allow_redirects = allow_redirects
|
||||
self.trusted_origins = []
|
||||
for url in trusted_origins or []:
|
||||
hostname = urlparse(url).hostname
|
||||
if not hostname:
|
||||
raise ValueError(f"Invalid URL: Unable to determine hostname of {url}")
|
||||
self.trusted_origins.append(hostname)
|
||||
|
||||
self.raise_for_status = raise_for_status
|
||||
self.extra_url_validator = extra_url_validator
|
||||
self.extra_headers = extra_headers
|
||||
|
||||
def request(self, method, url, headers=None, *args, **kwargs) -> req.Response:
|
||||
def request(
|
||||
self, method, url, headers=None, allow_redirects=False, *args, **kwargs
|
||||
) -> req.Response:
|
||||
if self.extra_headers is not None:
|
||||
headers = {**(headers or {}), **self.extra_headers}
|
||||
|
||||
@@ -74,7 +94,7 @@ class Requests:
|
||||
method,
|
||||
url,
|
||||
headers=headers,
|
||||
allow_redirects=self.allow_redirects,
|
||||
allow_redirects=allow_redirects,
|
||||
*args,
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
@@ -751,7 +751,7 @@ const NodeBooleanInput: FC<{
|
||||
<div className={className}>
|
||||
<div className="nodrag flex items-center">
|
||||
<Switch
|
||||
checked={value}
|
||||
defaultChecked={value}
|
||||
onCheckedChange={(v) => handleInputChange(selfKey, v)}
|
||||
/>
|
||||
<span className="ml-3">{displayName}</span>
|
||||
|
||||
Reference in New Issue
Block a user