Compare commits

...

18 Commits

Author SHA1 Message Date
Zamil Majdy
86c3f50506 Remove replicated code 2024-11-08 07:19:56 +07:00
Zamil Majdy
8729ca9aab Merge branch 'master' of github.com:jackfromeast/AutoGPT into contributor/master 2024-11-08 07:18:20 +07:00
Zamil Majdy
6747c056f0 Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into contributor/master 2024-11-08 07:17:51 +07:00
Zamil Majdy
6e28227c42 Merge branch 'dev' into master 2024-11-08 05:57:36 +07:00
Zamil Majdy
13285ea15c Address comments 2024-11-08 05:54:22 +07:00
Zamil Majdy
5fe1366825 Revert invalid config 2024-11-06 15:51:48 +07:00
Zamil Majdy
84a54af9dc Reduce blocklist ip range 2024-11-06 15:42:08 +07:00
Zamil Majdy
31675c9fb7 Cleanup 2024-11-06 11:41:42 +07:00
Zamil Majdy
4908f4633d Apply secure request to all blocks 2024-11-06 11:33:14 +07:00
Zamil Majdy
6ac4132e64 Refactored github validation code 2024-11-06 10:43:21 +07:00
Zamil Majdy
da5bf6755d Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into HEAD 2024-11-06 10:15:03 +07:00
Zamil Majdy
a52f7a1efb Re-format 2024-11-04 21:44:29 +07:00
Nicholas Tindle
23d1c0010c Merge branch 'dev' into master 2024-11-04 06:48:38 -06:00
jackfromeast
51f9336e9e [URL Validation] Add the URL validation on http and search blocks to avoid SSRF attack 2024-11-03 15:03:58 -05:00
jackfromeast
22b4e8b8c0 [Github] Update the URL validation for all the github-related blocks 2024-11-03 12:03:00 -05:00
jackfromeast
038041f467 [Github] Add domain check for the input url 2024-11-02 23:45:09 -04:00
SwiftyOS
952f6f58ef docs(platform): correct readme 2024-11-01 09:13:55 +00:00
Swifty
151fad5ced docs(platform): Update frontend instructions (#8514)
update readme
2024-10-31 13:41:54 +00:00
20 changed files with 362 additions and 406 deletions

View File

@@ -3,12 +3,12 @@ import time
from enum import Enum from enum import Enum
from typing import Literal from typing import Literal
import requests
from autogpt_libs.supabase_integration_credentials_store.types import APIKeyCredentials from autogpt_libs.supabase_integration_credentials_store.types import APIKeyCredentials
from pydantic import SecretStr from pydantic import SecretStr
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import CredentialsField, CredentialsMetaInput, SchemaField from backend.data.model import CredentialsField, CredentialsMetaInput, SchemaField
from backend.util.request import requests
TEST_CREDENTIALS = APIKeyCredentials( TEST_CREDENTIALS = APIKeyCredentials(
id="01234567-89ab-cdef-0123-456789abcdef", id="01234567-89ab-cdef-0123-456789abcdef",
@@ -217,7 +217,6 @@ class AIShortformVideoCreatorBlock(Block):
url = "https://webhook.site/token" url = "https://webhook.site/token"
headers = {"Accept": "application/json", "Content-Type": "application/json"} headers = {"Accept": "application/json", "Content-Type": "application/json"}
response = requests.post(url, headers=headers) response = requests.post(url, headers=headers)
response.raise_for_status()
webhook_data = response.json() webhook_data = response.json()
return webhook_data["uuid"], f"https://webhook.site/{webhook_data['uuid']}" return webhook_data["uuid"], f"https://webhook.site/{webhook_data['uuid']}"
@@ -228,14 +227,12 @@ class AIShortformVideoCreatorBlock(Block):
logger.debug( logger.debug(
f"API Response Status Code: {response.status_code}, Content: {response.text}" f"API Response Status Code: {response.status_code}, Content: {response.text}"
) )
response.raise_for_status()
return response.json() return response.json()
def check_video_status(self, api_key: SecretStr, pid: str) -> dict: def check_video_status(self, api_key: SecretStr, pid: str) -> dict:
url = f"https://www.revid.ai/api/public/v2/status?pid={pid}" url = f"https://www.revid.ai/api/public/v2/status?pid={pid}"
headers = {"key": api_key.get_secret_value()} headers = {"key": api_key.get_secret_value()}
response = requests.get(url, headers=headers) response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json() return response.json()
def wait_for_video( def wait_for_video(

View File

@@ -0,0 +1,43 @@
from urllib.parse import urlparse
from backend.blocks.github._auth import GithubCredentials
from backend.util.request import Requests
def _convert_to_api_url(url: str) -> str:
"""
Converts a standard GitHub URL to the corresponding GitHub API URL.
Handles repository URLs, issue URLs, pull request URLs, and more.
"""
parsed_url = urlparse(url)
path_parts = parsed_url.path.strip("/").split("/")
if len(path_parts) >= 2:
owner, repo = path_parts[0], path_parts[1]
api_base = f"https://api.github.com/repos/{owner}/{repo}"
if len(path_parts) > 2:
additional_path = "/".join(path_parts[2:])
api_url = f"{api_base}/{additional_path}"
else:
# Repository base URL
api_url = api_base
else:
raise ValueError("Invalid GitHub URL format.")
return api_url
def _get_headers(credentials: GithubCredentials) -> dict[str, str]:
return {
"Authorization": credentials.bearer(),
"Accept": "application/vnd.github.v3+json",
}
def get_api(credentials: GithubCredentials) -> Requests:
return Requests(
trusted_origins=["https://api.github.com", "https://github.com"],
extra_url_validator=_convert_to_api_url,
extra_headers=_get_headers(credentials),
)

View File

@@ -1,9 +1,11 @@
import requests from urllib.parse import urlparse
from typing_extensions import TypedDict from typing_extensions import TypedDict
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField from backend.data.model import SchemaField
from ._api import get_api
from ._auth import ( from ._auth import (
TEST_CREDENTIALS, TEST_CREDENTIALS,
TEST_CREDENTIALS_INPUT, TEST_CREDENTIALS_INPUT,
@@ -13,6 +15,10 @@ from ._auth import (
) )
def is_github_url(url: str) -> bool:
return urlparse(url).netloc == "github.com"
# --8<-- [start:GithubCommentBlockExample] # --8<-- [start:GithubCommentBlockExample]
class GithubCommentBlock(Block): class GithubCommentBlock(Block):
class Input(BlockSchema): class Input(BlockSchema):
@@ -62,27 +68,10 @@ class GithubCommentBlock(Block):
def post_comment( def post_comment(
credentials: GithubCredentials, issue_url: str, body_text: str credentials: GithubCredentials, issue_url: str, body_text: str
) -> tuple[int, str]: ) -> tuple[int, str]:
if "/pull/" in issue_url: api = get_api(credentials)
api_url = (
issue_url.replace("github.com", "api.github.com/repos").replace(
"/pull/", "/issues/"
)
+ "/comments"
)
else:
api_url = (
issue_url.replace("github.com", "api.github.com/repos") + "/comments"
)
headers = {
"Authorization": credentials.bearer(),
"Accept": "application/vnd.github.v3+json",
}
data = {"body": body_text} data = {"body": body_text}
comments_url = issue_url + "/comments"
response = requests.post(api_url, headers=headers, json=data) response = api.post(comments_url, json=data)
response.raise_for_status()
comment = response.json() comment = response.json()
return comment["id"], comment["html_url"] return comment["id"], comment["html_url"]
@@ -156,16 +145,10 @@ class GithubMakeIssueBlock(Block):
def create_issue( def create_issue(
credentials: GithubCredentials, repo_url: str, title: str, body: str credentials: GithubCredentials, repo_url: str, title: str, body: str
) -> tuple[int, str]: ) -> tuple[int, str]:
api_url = repo_url.replace("github.com", "api.github.com/repos") + "/issues" api = get_api(credentials)
headers = {
"Authorization": credentials.bearer(),
"Accept": "application/vnd.github.v3+json",
}
data = {"title": title, "body": body} data = {"title": title, "body": body}
issues_url = repo_url + "/issues"
response = requests.post(api_url, headers=headers, json=data) response = api.post(issues_url, json=data)
response.raise_for_status()
issue = response.json() issue = response.json()
return issue["number"], issue["html_url"] return issue["number"], issue["html_url"]
@@ -232,21 +215,12 @@ class GithubReadIssueBlock(Block):
def read_issue( def read_issue(
credentials: GithubCredentials, issue_url: str credentials: GithubCredentials, issue_url: str
) -> tuple[str, str, str]: ) -> tuple[str, str, str]:
api_url = issue_url.replace("github.com", "api.github.com/repos") api = get_api(credentials)
response = api.get(issue_url)
headers = {
"Authorization": credentials.bearer(),
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(api_url, headers=headers)
response.raise_for_status()
data = response.json() data = response.json()
title = data.get("title", "No title found") title = data.get("title", "No title found")
body = data.get("body", "No body content found") body = data.get("body", "No body content found")
user = data.get("user", {}).get("login", "No user found") user = data.get("user", {}).get("login", "No user found")
return title, body, user return title, body, user
def run( def run(
@@ -318,20 +292,13 @@ class GithubListIssuesBlock(Block):
def list_issues( def list_issues(
credentials: GithubCredentials, repo_url: str credentials: GithubCredentials, repo_url: str
) -> list[Output.IssueItem]: ) -> list[Output.IssueItem]:
api_url = repo_url.replace("github.com", "api.github.com/repos") + "/issues" api = get_api(credentials)
headers = { issues_url = repo_url + "/issues"
"Authorization": credentials.bearer(), response = api.get(issues_url)
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(api_url, headers=headers)
response.raise_for_status()
data = response.json() data = response.json()
issues: list[GithubListIssuesBlock.Output.IssueItem] = [ issues: list[GithubListIssuesBlock.Output.IssueItem] = [
{"title": issue["title"], "url": issue["html_url"]} for issue in data {"title": issue["title"], "url": issue["html_url"]} for issue in data
] ]
return issues return issues
def run( def run(
@@ -385,28 +352,10 @@ class GithubAddLabelBlock(Block):
@staticmethod @staticmethod
def add_label(credentials: GithubCredentials, issue_url: str, label: str) -> str: def add_label(credentials: GithubCredentials, issue_url: str, label: str) -> str:
# Convert the provided GitHub URL to the API URL api = get_api(credentials)
if "/pull/" in issue_url:
api_url = (
issue_url.replace("github.com", "api.github.com/repos").replace(
"/pull/", "/issues/"
)
+ "/labels"
)
else:
api_url = (
issue_url.replace("github.com", "api.github.com/repos") + "/labels"
)
headers = {
"Authorization": credentials.bearer(),
"Accept": "application/vnd.github.v3+json",
}
data = {"labels": [label]} data = {"labels": [label]}
labels_url = issue_url + "/labels"
response = requests.post(api_url, headers=headers, json=data) api.post(labels_url, json=data)
response.raise_for_status()
return "Label added successfully" return "Label added successfully"
def run( def run(
@@ -463,31 +412,9 @@ class GithubRemoveLabelBlock(Block):
@staticmethod @staticmethod
def remove_label(credentials: GithubCredentials, issue_url: str, label: str) -> str: def remove_label(credentials: GithubCredentials, issue_url: str, label: str) -> str:
# Convert the provided GitHub URL to the API URL api = get_api(credentials)
if "/pull/" in issue_url: label_url = issue_url + f"/labels/{label}"
api_url = ( api.delete(label_url)
issue_url.replace("github.com", "api.github.com/repos").replace(
"/pull/", "/issues/"
)
+ f"/labels/{label}"
)
else:
api_url = (
issue_url.replace("github.com", "api.github.com/repos")
+ f"/labels/{label}"
)
# Log the constructed API URL for debugging
print(f"Constructed API URL: {api_url}")
headers = {
"Authorization": credentials.bearer(),
"Accept": "application/vnd.github.v3+json",
}
response = requests.delete(api_url, headers=headers)
response.raise_for_status()
return "Label removed successfully" return "Label removed successfully"
def run( def run(
@@ -550,23 +477,10 @@ class GithubAssignIssueBlock(Block):
issue_url: str, issue_url: str,
assignee: str, assignee: str,
) -> str: ) -> str:
# Extracting repo path and issue number from the issue URL api = get_api(credentials)
repo_path, issue_number = issue_url.replace("https://github.com/", "").split( assignees_url = issue_url + "/assignees"
"/issues/"
)
api_url = (
f"https://api.github.com/repos/{repo_path}/issues/{issue_number}/assignees"
)
headers = {
"Authorization": credentials.bearer(),
"Accept": "application/vnd.github.v3+json",
}
data = {"assignees": [assignee]} data = {"assignees": [assignee]}
api.post(assignees_url, json=data)
response = requests.post(api_url, headers=headers, json=data)
response.raise_for_status()
return "Issue assigned successfully" return "Issue assigned successfully"
def run( def run(
@@ -629,23 +543,10 @@ class GithubUnassignIssueBlock(Block):
issue_url: str, issue_url: str,
assignee: str, assignee: str,
) -> str: ) -> str:
# Extracting repo path and issue number from the issue URL api = get_api(credentials)
repo_path, issue_number = issue_url.replace("https://github.com/", "").split( assignees_url = issue_url + "/assignees"
"/issues/"
)
api_url = (
f"https://api.github.com/repos/{repo_path}/issues/{issue_number}/assignees"
)
headers = {
"Authorization": credentials.bearer(),
"Accept": "application/vnd.github.v3+json",
}
data = {"assignees": [assignee]} data = {"assignees": [assignee]}
api.delete(assignees_url, json=data)
response = requests.delete(api_url, headers=headers, json=data)
response.raise_for_status()
return "Issue unassigned successfully" return "Issue unassigned successfully"
def run( def run(

View File

@@ -1,9 +1,9 @@
import requests
from typing_extensions import TypedDict from typing_extensions import TypedDict
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField from backend.data.model import SchemaField
from ._api import get_api
from ._auth import ( from ._auth import (
TEST_CREDENTIALS, TEST_CREDENTIALS,
TEST_CREDENTIALS_INPUT, TEST_CREDENTIALS_INPUT,
@@ -64,20 +64,13 @@ class GithubListPullRequestsBlock(Block):
@staticmethod @staticmethod
def list_prs(credentials: GithubCredentials, repo_url: str) -> list[Output.PRItem]: def list_prs(credentials: GithubCredentials, repo_url: str) -> list[Output.PRItem]:
api_url = repo_url.replace("github.com", "api.github.com/repos") + "/pulls" api = get_api(credentials)
headers = { pulls_url = repo_url + "/pulls"
"Authorization": credentials.bearer(), response = api.get(pulls_url)
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(api_url, headers=headers)
response.raise_for_status()
data = response.json() data = response.json()
pull_requests: list[GithubListPullRequestsBlock.Output.PRItem] = [ pull_requests: list[GithubListPullRequestsBlock.Output.PRItem] = [
{"title": pr["title"], "url": pr["html_url"]} for pr in data {"title": pr["title"], "url": pr["html_url"]} for pr in data
] ]
return pull_requests return pull_requests
def run( def run(
@@ -110,7 +103,11 @@ class GithubMakePullRequestBlock(Block):
placeholder="Enter the pull request body", placeholder="Enter the pull request body",
) )
head: str = SchemaField( head: str = SchemaField(
description="The name of the branch where your changes are implemented. For cross-repository pull requests in the same network, namespace head with a user like this: username:branch.", description=(
"The name of the branch where your changes are implemented. "
"For cross-repository pull requests in the same network, "
"namespace head with a user like this: username:branch."
),
placeholder="Enter the head branch", placeholder="Enter the head branch",
) )
base: str = SchemaField( base: str = SchemaField(
@@ -162,17 +159,10 @@ class GithubMakePullRequestBlock(Block):
head: str, head: str,
base: str, base: str,
) -> tuple[int, str]: ) -> tuple[int, str]:
repo_path = repo_url.replace("https://github.com/", "") api = get_api(credentials)
api_url = f"https://api.github.com/repos/{repo_path}/pulls" pulls_url = repo_url + "/pulls"
headers = {
"Authorization": credentials.bearer(),
"Accept": "application/vnd.github.v3+json",
}
data = {"title": title, "body": body, "head": head, "base": base} data = {"title": title, "body": body, "head": head, "base": base}
response = api.post(pulls_url, json=data)
response = requests.post(api_url, headers=headers, json=data)
response.raise_for_status()
pr_data = response.json() pr_data = response.json()
return pr_data["number"], pr_data["html_url"] return pr_data["number"], pr_data["html_url"]
@@ -194,13 +184,8 @@ class GithubMakePullRequestBlock(Block):
) )
yield "number", number yield "number", number
yield "url", url yield "url", url
except requests.exceptions.HTTPError as http_err: except Exception as e:
if http_err.response.status_code == 422: yield "error", str(e)
error_details = http_err.response.json()
error_message = error_details.get("message", "Unknown error")
else:
error_message = str(http_err)
raise RuntimeError(f"Failed to create pull request: {error_message}")
class GithubReadPullRequestBlock(Block): class GithubReadPullRequestBlock(Block):
@@ -255,42 +240,21 @@ class GithubReadPullRequestBlock(Block):
@staticmethod @staticmethod
def read_pr(credentials: GithubCredentials, pr_url: str) -> tuple[str, str, str]: def read_pr(credentials: GithubCredentials, pr_url: str) -> tuple[str, str, str]:
api_url = pr_url.replace("github.com", "api.github.com/repos").replace( api = get_api(credentials)
"/pull/", "/issues/" # Adjust the URL to access the issue endpoint for PR metadata
) issue_url = pr_url.replace("/pull/", "/issues/")
response = api.get(issue_url)
headers = {
"Authorization": credentials.bearer(),
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(api_url, headers=headers)
response.raise_for_status()
data = response.json() data = response.json()
title = data.get("title", "No title found") title = data.get("title", "No title found")
body = data.get("body", "No body content found") body = data.get("body", "No body content found")
author = data.get("user", {}).get("login", "No user found") author = data.get("user", {}).get("login", "No user found")
return title, body, author return title, body, author
@staticmethod @staticmethod
def read_pr_changes(credentials: GithubCredentials, pr_url: str) -> str: def read_pr_changes(credentials: GithubCredentials, pr_url: str) -> str:
api_url = ( api = get_api(credentials)
pr_url.replace("github.com", "api.github.com/repos").replace( files_url = pr_url + "/files"
"/pull/", "/pulls/" response = api.get(files_url)
)
+ "/files"
)
headers = {
"Authorization": credentials.bearer(),
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(api_url, headers=headers)
response.raise_for_status()
files = response.json() files = response.json()
changes = [] changes = []
for file in files: for file in files:
@@ -298,7 +262,6 @@ class GithubReadPullRequestBlock(Block):
patch = file.get("patch") patch = file.get("patch")
if filename and patch: if filename and patch:
changes.append(f"File: {filename}\n{patch}") changes.append(f"File: {filename}\n{patch}")
return "\n\n".join(changes) return "\n\n".join(changes)
def run( def run(
@@ -367,23 +330,10 @@ class GithubAssignPRReviewerBlock(Block):
def assign_reviewer( def assign_reviewer(
credentials: GithubCredentials, pr_url: str, reviewer: str credentials: GithubCredentials, pr_url: str, reviewer: str
) -> str: ) -> str:
# Convert the PR URL to the appropriate API endpoint api = get_api(credentials)
api_url = ( reviewers_url = pr_url + "/requested_reviewers"
pr_url.replace("github.com", "api.github.com/repos").replace(
"/pull/", "/pulls/"
)
+ "/requested_reviewers"
)
headers = {
"Authorization": credentials.bearer(),
"Accept": "application/vnd.github.v3+json",
}
data = {"reviewers": [reviewer]} data = {"reviewers": [reviewer]}
api.post(reviewers_url, json=data)
response = requests.post(api_url, headers=headers, json=data)
response.raise_for_status()
return "Reviewer assigned successfully" return "Reviewer assigned successfully"
def run( def run(
@@ -400,17 +350,8 @@ class GithubAssignPRReviewerBlock(Block):
input_data.reviewer, input_data.reviewer,
) )
yield "status", status yield "status", status
except requests.exceptions.HTTPError as http_err: except Exception as e:
if http_err.response.status_code == 422: yield "error", str(e)
error_msg = (
"Failed to assign reviewer: "
f"The reviewer '{input_data.reviewer}' may not have permission "
"or the pull request is not in a valid state. "
f"Detailed error: {http_err.response.text}"
)
else:
error_msg = f"HTTP error: {http_err} - {http_err.response.text}"
raise RuntimeError(error_msg)
class GithubUnassignPRReviewerBlock(Block): class GithubUnassignPRReviewerBlock(Block):
@@ -456,21 +397,10 @@ class GithubUnassignPRReviewerBlock(Block):
def unassign_reviewer( def unassign_reviewer(
credentials: GithubCredentials, pr_url: str, reviewer: str credentials: GithubCredentials, pr_url: str, reviewer: str
) -> str: ) -> str:
api_url = ( api = get_api(credentials)
pr_url.replace("github.com", "api.github.com/repos").replace( reviewers_url = pr_url + "/requested_reviewers"
"/pull/", "/pulls/"
)
+ "/requested_reviewers"
)
headers = {
"Authorization": credentials.bearer(),
"Accept": "application/vnd.github.v3+json",
}
data = {"reviewers": [reviewer]} data = {"reviewers": [reviewer]}
api.delete(reviewers_url, json=data)
response = requests.delete(api_url, headers=headers, json=data)
response.raise_for_status()
return "Reviewer unassigned successfully" return "Reviewer unassigned successfully"
def run( def run(
@@ -480,12 +410,15 @@ class GithubUnassignPRReviewerBlock(Block):
credentials: GithubCredentials, credentials: GithubCredentials,
**kwargs, **kwargs,
) -> BlockOutput: ) -> BlockOutput:
status = self.unassign_reviewer( try:
credentials, status = self.unassign_reviewer(
input_data.pr_url, credentials,
input_data.reviewer, input_data.pr_url,
) input_data.reviewer,
yield "status", status )
yield "status", status
except Exception as e:
yield "error", str(e)
class GithubListPRReviewersBlock(Block): class GithubListPRReviewersBlock(Block):
@@ -544,26 +477,14 @@ class GithubListPRReviewersBlock(Block):
def list_reviewers( def list_reviewers(
credentials: GithubCredentials, pr_url: str credentials: GithubCredentials, pr_url: str
) -> list[Output.ReviewerItem]: ) -> list[Output.ReviewerItem]:
api_url = ( api = get_api(credentials)
pr_url.replace("github.com", "api.github.com/repos").replace( reviewers_url = pr_url + "/requested_reviewers"
"/pull/", "/pulls/" response = api.get(reviewers_url)
)
+ "/requested_reviewers"
)
headers = {
"Authorization": credentials.bearer(),
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(api_url, headers=headers)
response.raise_for_status()
data = response.json() data = response.json()
reviewers: list[GithubListPRReviewersBlock.Output.ReviewerItem] = [ reviewers: list[GithubListPRReviewersBlock.Output.ReviewerItem] = [
{"username": reviewer["login"], "url": reviewer["html_url"]} {"username": reviewer["login"], "url": reviewer["html_url"]}
for reviewer in data.get("users", []) for reviewer in data.get("users", [])
] ]
return reviewers return reviewers
def run( def run(

View File

@@ -1,11 +1,11 @@
import base64 import base64
import requests
from typing_extensions import TypedDict from typing_extensions import TypedDict
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField from backend.data.model import SchemaField
from ._api import get_api
from ._auth import ( from ._auth import (
TEST_CREDENTIALS, TEST_CREDENTIALS,
TEST_CREDENTIALS_INPUT, TEST_CREDENTIALS_INPUT,
@@ -68,17 +68,11 @@ class GithubListTagsBlock(Block):
def list_tags( def list_tags(
credentials: GithubCredentials, repo_url: str credentials: GithubCredentials, repo_url: str
) -> list[Output.TagItem]: ) -> list[Output.TagItem]:
repo_path = repo_url.replace("https://github.com/", "") api = get_api(credentials)
api_url = f"https://api.github.com/repos/{repo_path}/tags" tags_url = repo_url + "/tags"
headers = { response = api.get(tags_url)
"Authorization": credentials.bearer(),
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(api_url, headers=headers)
response.raise_for_status()
data = response.json() data = response.json()
repo_path = repo_url.replace("https://github.com/", "")
tags: list[GithubListTagsBlock.Output.TagItem] = [ tags: list[GithubListTagsBlock.Output.TagItem] = [
{ {
"name": tag["name"], "name": tag["name"],
@@ -86,7 +80,6 @@ class GithubListTagsBlock(Block):
} }
for tag in data for tag in data
] ]
return tags return tags
def run( def run(
@@ -157,20 +150,18 @@ class GithubListBranchesBlock(Block):
def list_branches( def list_branches(
credentials: GithubCredentials, repo_url: str credentials: GithubCredentials, repo_url: str
) -> list[Output.BranchItem]: ) -> list[Output.BranchItem]:
api_url = repo_url.replace("github.com", "api.github.com/repos") + "/branches" api = get_api(credentials)
headers = { branches_url = repo_url + "/branches"
"Authorization": credentials.bearer(), response = api.get(branches_url)
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(api_url, headers=headers)
response.raise_for_status()
data = response.json() data = response.json()
repo_path = repo_url.replace("https://github.com/", "")
branches: list[GithubListBranchesBlock.Output.BranchItem] = [ branches: list[GithubListBranchesBlock.Output.BranchItem] = [
{"name": branch["name"], "url": branch["commit"]["url"]} for branch in data {
"name": branch["name"],
"url": f"https://github.com/{repo_path}/tree/{branch['name']}",
}
for branch in data
] ]
return branches return branches
def run( def run(
@@ -246,6 +237,8 @@ class GithubListDiscussionsBlock(Block):
def list_discussions( def list_discussions(
credentials: GithubCredentials, repo_url: str, num_discussions: int credentials: GithubCredentials, repo_url: str, num_discussions: int
) -> list[Output.DiscussionItem]: ) -> list[Output.DiscussionItem]:
api = get_api(credentials)
# GitHub GraphQL API endpoint is different; we'll use api.post with custom URL
repo_path = repo_url.replace("https://github.com/", "") repo_path = repo_url.replace("https://github.com/", "")
owner, repo = repo_path.split("/") owner, repo = repo_path.split("/")
query = """ query = """
@@ -261,24 +254,15 @@ class GithubListDiscussionsBlock(Block):
} }
""" """
variables = {"owner": owner, "repo": repo, "num": num_discussions} variables = {"owner": owner, "repo": repo, "num": num_discussions}
headers = { response = api.post(
"Authorization": credentials.bearer(),
"Accept": "application/vnd.github.v3+json",
}
response = requests.post(
"https://api.github.com/graphql", "https://api.github.com/graphql",
json={"query": query, "variables": variables}, json={"query": query, "variables": variables},
headers=headers,
) )
response.raise_for_status()
data = response.json() data = response.json()
discussions: list[GithubListDiscussionsBlock.Output.DiscussionItem] = [ discussions: list[GithubListDiscussionsBlock.Output.DiscussionItem] = [
{"title": discussion["title"], "url": discussion["url"]} {"title": discussion["title"], "url": discussion["url"]}
for discussion in data["data"]["repository"]["discussions"]["nodes"] for discussion in data["data"]["repository"]["discussions"]["nodes"]
] ]
return discussions return discussions
def run( def run(
@@ -348,21 +332,13 @@ class GithubListReleasesBlock(Block):
def list_releases( def list_releases(
credentials: GithubCredentials, repo_url: str credentials: GithubCredentials, repo_url: str
) -> list[Output.ReleaseItem]: ) -> list[Output.ReleaseItem]:
repo_path = repo_url.replace("https://github.com/", "") api = get_api(credentials)
api_url = f"https://api.github.com/repos/{repo_path}/releases" releases_url = repo_url + "/releases"
headers = { response = api.get(releases_url)
"Authorization": credentials.bearer(),
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(api_url, headers=headers)
response.raise_for_status()
data = response.json() data = response.json()
releases: list[GithubListReleasesBlock.Output.ReleaseItem] = [ releases: list[GithubListReleasesBlock.Output.ReleaseItem] = [
{"name": release["name"], "url": release["html_url"]} for release in data {"name": release["name"], "url": release["html_url"]} for release in data
] ]
return releases return releases
def run( def run(
@@ -432,16 +408,9 @@ class GithubReadFileBlock(Block):
def read_file( def read_file(
credentials: GithubCredentials, repo_url: str, file_path: str, branch: str credentials: GithubCredentials, repo_url: str, file_path: str, branch: str
) -> tuple[str, int]: ) -> tuple[str, int]:
repo_path = repo_url.replace("https://github.com/", "") api = get_api(credentials)
api_url = f"https://api.github.com/repos/{repo_path}/contents/{file_path}?ref={branch}" content_url = repo_url + f"/contents/{file_path}?ref={branch}"
headers = { response = api.get(content_url)
"Authorization": credentials.bearer(),
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(api_url, headers=headers)
response.raise_for_status()
content = response.json() content = response.json()
if isinstance(content, list): if isinstance(content, list):
@@ -549,46 +518,33 @@ class GithubReadFolderBlock(Block):
def read_folder( def read_folder(
credentials: GithubCredentials, repo_url: str, folder_path: str, branch: str credentials: GithubCredentials, repo_url: str, folder_path: str, branch: str
) -> tuple[list[Output.FileEntry], list[Output.DirEntry]]: ) -> tuple[list[Output.FileEntry], list[Output.DirEntry]]:
repo_path = repo_url.replace("https://github.com/", "") api = get_api(credentials)
api_url = f"https://api.github.com/repos/{repo_path}/contents/{folder_path}?ref={branch}" contents_url = repo_url + f"/contents/{folder_path}?ref={branch}"
headers = { response = api.get(contents_url)
"Authorization": credentials.bearer(),
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(api_url, headers=headers)
response.raise_for_status()
content = response.json() content = response.json()
if isinstance(content, list): if not isinstance(content, list):
# Multiple entries of different types exist at this path
if not (dir := next((d for d in content if d["type"] == "dir"), None)):
raise TypeError("Not a folder")
content = dir
if content["type"] != "dir":
raise TypeError("Not a folder") raise TypeError("Not a folder")
return ( files = [
[ GithubReadFolderBlock.Output.FileEntry(
GithubReadFolderBlock.Output.FileEntry( name=entry["name"],
name=entry["name"], path=entry["path"],
path=entry["path"], size=entry["size"],
size=entry["size"], )
) for entry in content
for entry in content["entries"] if entry["type"] == "file"
if entry["type"] == "file" ]
], dirs = [
[ GithubReadFolderBlock.Output.DirEntry(
GithubReadFolderBlock.Output.DirEntry( name=entry["name"],
name=entry["name"], path=entry["path"],
path=entry["path"], )
) for entry in content
for entry in content["entries"] if entry["type"] == "dir"
if entry["type"] == "dir" ]
],
) return files, dirs
def run( def run(
self, self,
@@ -656,26 +612,16 @@ class GithubMakeBranchBlock(Block):
new_branch: str, new_branch: str,
source_branch: str, source_branch: str,
) -> str: ) -> str:
repo_path = repo_url.replace("https://github.com/", "") api = get_api(credentials)
ref_api_url = ( # Get the SHA of the source branch
f"https://api.github.com/repos/{repo_path}/git/refs/heads/{source_branch}" ref_url = repo_url + f"/git/refs/heads/{source_branch}"
) response = api.get(ref_url)
headers = {
"Authorization": credentials.bearer(),
"Accept": "application/vnd.github.v3+json",
}
response = requests.get(ref_api_url, headers=headers)
response.raise_for_status()
sha = response.json()["object"]["sha"] sha = response.json()["object"]["sha"]
create_branch_api_url = f"https://api.github.com/repos/{repo_path}/git/refs" # Create the new branch
create_ref_url = repo_url + "/git/refs"
data = {"ref": f"refs/heads/{new_branch}", "sha": sha} data = {"ref": f"refs/heads/{new_branch}", "sha": sha}
response = api.post(create_ref_url, json=data)
response = requests.post(create_branch_api_url, headers=headers, json=data)
response.raise_for_status()
return "Branch created successfully" return "Branch created successfully"
def run( def run(
@@ -735,16 +681,9 @@ class GithubDeleteBranchBlock(Block):
def delete_branch( def delete_branch(
credentials: GithubCredentials, repo_url: str, branch: str credentials: GithubCredentials, repo_url: str, branch: str
) -> str: ) -> str:
repo_path = repo_url.replace("https://github.com/", "") api = get_api(credentials)
api_url = f"https://api.github.com/repos/{repo_path}/git/refs/heads/{branch}" ref_url = repo_url + f"/git/refs/heads/{branch}"
headers = { api.delete(ref_url)
"Authorization": credentials.bearer(),
"Accept": "application/vnd.github.v3+json",
}
response = requests.delete(api_url, headers=headers)
response.raise_for_status()
return "Branch deleted successfully" return "Branch deleted successfully"
def run( def run(

View File

@@ -1,6 +1,6 @@
from typing import Any, Optional from typing import Any, Optional
import requests from backend.util.request import requests
class GetRequest: class GetRequest:
@@ -11,5 +11,4 @@ class GetRequest:
if headers is None: if headers is None:
headers = {} headers = {}
response = requests.get(url, headers=headers) response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json() if json else response.text return response.json() if json else response.text

View File

@@ -1,10 +1,10 @@
import json import json
from enum import Enum from enum import Enum
from typing import Any
import requests
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField from backend.data.model import SchemaField
from backend.util.request import requests
class HttpMethod(Enum): class HttpMethod(Enum):
@@ -31,9 +31,14 @@ class SendWebRequestBlock(Block):
description="The headers to include in the request", description="The headers to include in the request",
default={}, default={},
) )
body: object = SchemaField( json_format: bool = SchemaField(
title="JSON format",
description="Whether to send and receive body as JSON",
default=True,
)
body: Any = SchemaField(
description="The body of the request", description="The body of the request",
default={}, default=None,
) )
class Output(BlockSchema): class Output(BlockSchema):
@@ -58,13 +63,16 @@ class SendWebRequestBlock(Block):
input_data.method.value, input_data.method.value,
input_data.url, input_data.url,
headers=input_data.headers, headers=input_data.headers,
json=input_data.body, json=input_data.body if input_data.json_format else None,
data=input_data.body if not input_data.json_format else None,
) )
result = response.json() if input_data.json_format else response.text
if response.status_code // 100 == 2: if response.status_code // 100 == 2:
yield "response", response.json() yield "response", result
elif response.status_code // 100 == 4: elif response.status_code // 100 == 4:
yield "client_error", response.json() yield "client_error", result
elif response.status_code // 100 == 5: elif response.status_code // 100 == 5:
yield "server_error", response.json() yield "server_error", result
else: else:
raise ValueError(f"Unexpected status code: {response.status_code}") raise ValueError(f"Unexpected status code: {response.status_code}")

View File

@@ -1,12 +1,13 @@
from enum import Enum from enum import Enum
from typing import Any, Dict, Literal, Optional from typing import Any, Dict, Literal, Optional
import requests
from autogpt_libs.supabase_integration_credentials_store.types import APIKeyCredentials from autogpt_libs.supabase_integration_credentials_store.types import APIKeyCredentials
from pydantic import SecretStr from pydantic import SecretStr
from requests.exceptions import RequestException
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import CredentialsField, CredentialsMetaInput, SchemaField from backend.data.model import CredentialsField, CredentialsMetaInput, SchemaField
from backend.util.request import requests
TEST_CREDENTIALS = APIKeyCredentials( TEST_CREDENTIALS = APIKeyCredentials(
id="01234567-89ab-cdef-0123-456789abcdef", id="01234567-89ab-cdef-0123-456789abcdef",
@@ -242,9 +243,8 @@ class IdeogramModelBlock(Block):
try: try:
response = requests.post(url, json=data, headers=headers) response = requests.post(url, json=data, headers=headers)
response.raise_for_status()
return response.json()["data"][0]["url"] return response.json()["data"][0]["url"]
except requests.exceptions.RequestException as e: except RequestException as e:
raise Exception(f"Failed to fetch image: {str(e)}") raise Exception(f"Failed to fetch image: {str(e)}")
def upscale_image(self, api_key: SecretStr, image_url: str): def upscale_image(self, api_key: SecretStr, image_url: str):
@@ -256,7 +256,6 @@ class IdeogramModelBlock(Block):
try: try:
# Step 1: Download the image from the provided URL # Step 1: Download the image from the provided URL
image_response = requests.get(image_url) image_response = requests.get(image_url)
image_response.raise_for_status()
# Step 2: Send the downloaded image to the upscale API # Step 2: Send the downloaded image to the upscale API
files = { files = {
@@ -272,8 +271,7 @@ class IdeogramModelBlock(Block):
files=files, files=files,
) )
response.raise_for_status()
return response.json()["data"][0]["url"] return response.json()["data"][0]["url"]
except requests.exceptions.RequestException as e: except RequestException as e:
raise Exception(f"Failed to upscale image: {str(e)}") raise Exception(f"Failed to upscale image: {str(e)}")

View File

@@ -1,5 +1,3 @@
import requests
from backend.blocks.jina._auth import ( from backend.blocks.jina._auth import (
JinaCredentials, JinaCredentials,
JinaCredentialsField, JinaCredentialsField,
@@ -7,6 +5,7 @@ from backend.blocks.jina._auth import (
) )
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField from backend.data.model import SchemaField
from backend.util.request import requests
class JinaChunkingBlock(Block): class JinaChunkingBlock(Block):
@@ -57,7 +56,6 @@ class JinaChunkingBlock(Block):
} }
response = requests.post(url, headers=headers, json=data) response = requests.post(url, headers=headers, json=data)
response.raise_for_status()
result = response.json() result = response.json()
all_chunks.extend(result.get("chunks", [])) all_chunks.extend(result.get("chunks", []))

View File

@@ -1,5 +1,3 @@
import requests
from backend.blocks.jina._auth import ( from backend.blocks.jina._auth import (
JinaCredentials, JinaCredentials,
JinaCredentialsField, JinaCredentialsField,
@@ -7,6 +5,7 @@ from backend.blocks.jina._auth import (
) )
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField from backend.data.model import SchemaField
from backend.util.request import requests
class JinaEmbeddingBlock(Block): class JinaEmbeddingBlock(Block):

View File

@@ -1,7 +1,6 @@
from enum import Enum from enum import Enum
from typing import List, Literal from typing import List, Literal
import requests
from autogpt_libs.supabase_integration_credentials_store.types import APIKeyCredentials from autogpt_libs.supabase_integration_credentials_store.types import APIKeyCredentials
from pydantic import SecretStr from pydantic import SecretStr
@@ -13,6 +12,7 @@ from backend.data.model import (
SchemaField, SchemaField,
SecretField, SecretField,
) )
from backend.util.request import requests
TEST_CREDENTIALS = APIKeyCredentials( TEST_CREDENTIALS = APIKeyCredentials(
id="01234567-89ab-cdef-0123-456789abcdef", id="01234567-89ab-cdef-0123-456789abcdef",

View File

@@ -1,12 +1,12 @@
import time import time
from typing import Literal from typing import Literal
import requests
from autogpt_libs.supabase_integration_credentials_store.types import APIKeyCredentials from autogpt_libs.supabase_integration_credentials_store.types import APIKeyCredentials
from pydantic import SecretStr from pydantic import SecretStr
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import CredentialsField, CredentialsMetaInput, SchemaField from backend.data.model import CredentialsField, CredentialsMetaInput, SchemaField
from backend.util.request import requests
TEST_CREDENTIALS = APIKeyCredentials( TEST_CREDENTIALS = APIKeyCredentials(
id="01234567-89ab-cdef-0123-456789abcdef", id="01234567-89ab-cdef-0123-456789abcdef",
@@ -118,7 +118,6 @@ class CreateTalkingAvatarVideoBlock(Block):
"authorization": f"Basic {api_key.get_secret_value()}", "authorization": f"Basic {api_key.get_secret_value()}",
} }
response = requests.post(url, json=payload, headers=headers) response = requests.post(url, json=payload, headers=headers)
response.raise_for_status()
return response.json() return response.json()
def get_clip_status(self, api_key: SecretStr, clip_id: str) -> dict: def get_clip_status(self, api_key: SecretStr, clip_id: str) -> dict:
@@ -128,7 +127,6 @@ class CreateTalkingAvatarVideoBlock(Block):
"authorization": f"Basic {api_key.get_secret_value()}", "authorization": f"Basic {api_key.get_secret_value()}",
} }
response = requests.get(url, headers=headers) response = requests.get(url, headers=headers)
response.raise_for_status()
return response.json() return response.json()
def run( def run(

View File

@@ -1,11 +1,11 @@
from typing import Any, Literal from typing import Any, Literal
import requests
from autogpt_libs.supabase_integration_credentials_store.types import APIKeyCredentials from autogpt_libs.supabase_integration_credentials_store.types import APIKeyCredentials
from pydantic import SecretStr from pydantic import SecretStr
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import CredentialsField, CredentialsMetaInput, SchemaField from backend.data.model import CredentialsField, CredentialsMetaInput, SchemaField
from backend.util.request import requests
TEST_CREDENTIALS = APIKeyCredentials( TEST_CREDENTIALS = APIKeyCredentials(
id="01234567-89ab-cdef-0123-456789abcdef", id="01234567-89ab-cdef-0123-456789abcdef",
@@ -86,7 +86,6 @@ class UnrealTextToSpeechBlock(Block):
} }
response = requests.post(url, headers=headers, json=data) response = requests.post(url, headers=headers, json=data)
response.raise_for_status()
return response.json() return response.json()
def run( def run(

View File

@@ -2,9 +2,10 @@ import time
from typing import Optional from typing import Optional
from urllib.parse import urlencode from urllib.parse import urlencode
import requests
from autogpt_libs.supabase_integration_credentials_store import OAuth2Credentials from autogpt_libs.supabase_integration_credentials_store import OAuth2Credentials
from backend.util.request import requests
from .base import BaseOAuthHandler from .base import BaseOAuthHandler
@@ -56,13 +57,12 @@ class GitHubOAuthHandler(BaseOAuthHandler):
"X-GitHub-Api-Version": "2022-11-28", "X-GitHub-Api-Version": "2022-11-28",
} }
response = requests.delete( requests.delete(
url=self.revoke_url.format(client_id=self.client_id), url=self.revoke_url.format(client_id=self.client_id),
auth=(self.client_id, self.client_secret), auth=(self.client_id, self.client_secret),
headers=headers, headers=headers,
json={"access_token": credentials.access_token.get_secret_value()}, json={"access_token": credentials.access_token.get_secret_value()},
) )
response.raise_for_status()
return True return True
def _refresh_tokens(self, credentials: OAuth2Credentials) -> OAuth2Credentials: def _refresh_tokens(self, credentials: OAuth2Credentials) -> OAuth2Credentials:
@@ -88,7 +88,6 @@ class GitHubOAuthHandler(BaseOAuthHandler):
} }
headers = {"Accept": "application/json"} headers = {"Accept": "application/json"}
response = requests.post(self.token_url, data=request_body, headers=headers) response = requests.post(self.token_url, data=request_body, headers=headers)
response.raise_for_status()
token_data: dict = response.json() token_data: dict = response.json()
username = self._request_username(token_data["access_token"]) username = self._request_username(token_data["access_token"])

View File

@@ -103,12 +103,11 @@ class GoogleOAuthHandler(BaseOAuthHandler):
def revoke_tokens(self, credentials: OAuth2Credentials) -> bool: def revoke_tokens(self, credentials: OAuth2Credentials) -> bool:
session = AuthorizedSession(credentials) session = AuthorizedSession(credentials)
response = session.post( session.post(
self.revoke_uri, self.revoke_uri,
params={"token": credentials.access_token.get_secret_value()}, params={"token": credentials.access_token.get_secret_value()},
headers={"content-type": "application/x-www-form-urlencoded"}, headers={"content-type": "application/x-www-form-urlencoded"},
) )
response.raise_for_status()
return True return True
def _request_email( def _request_email(

View File

@@ -1,9 +1,10 @@
from base64 import b64encode from base64 import b64encode
from urllib.parse import urlencode from urllib.parse import urlencode
import requests
from autogpt_libs.supabase_integration_credentials_store import OAuth2Credentials from autogpt_libs.supabase_integration_credentials_store import OAuth2Credentials
from backend.util.request import requests
from .base import BaseOAuthHandler from .base import BaseOAuthHandler
@@ -49,7 +50,6 @@ class NotionOAuthHandler(BaseOAuthHandler):
"Accept": "application/json", "Accept": "application/json",
} }
response = requests.post(self.token_url, json=request_body, headers=headers) response = requests.post(self.token_url, json=request_body, headers=headers)
response.raise_for_status()
token_data = response.json() token_data = response.json()
# Email is only available for non-bot users # Email is only available for non-bot users
email = ( email = (

View File

@@ -0,0 +1,132 @@
import ipaddress
import socket
from typing import Callable
from urllib.parse import urlparse
import requests as req
from backend.util.settings import Config
# List of IP networks to block
BLOCKED_IP_NETWORKS = [
ipaddress.ip_network("0.0.0.0/8"), # "This" Network
ipaddress.ip_network("10.0.0.0/8"), # Private-Use
ipaddress.ip_network("127.0.0.0/8"), # Loopback
ipaddress.ip_network("169.254.0.0/16"), # Link Local
ipaddress.ip_network("172.16.0.0/12"), # Private-Use
ipaddress.ip_network("192.168.0.0/16"), # Private-Use
ipaddress.ip_network("224.0.0.0/4"), # Multicast
ipaddress.ip_network("240.0.0.0/4"), # Reserved for Future Use
]
def is_ip_blocked(ip: str) -> bool:
"""
Checks if the IP address is in a blocked network.
"""
ip_addr = ipaddress.ip_address(ip)
return any(ip_addr in network for network in BLOCKED_IP_NETWORKS)
def validate_url(url: str, trusted_origins: list[str]) -> str:
"""
Validates the URL to prevent SSRF attacks by ensuring it does not point to a private
or untrusted IP address, unless whitelisted.
"""
url = url.strip().strip("/")
if not url.startswith(("http://", "https://")):
url = "http://" + url
parsed_url = urlparse(url)
hostname = parsed_url.hostname
if not hostname:
raise ValueError(f"Invalid URL: Unable to determine hostname from {url}")
if any(hostname == origin for origin in trusted_origins):
return url
# Resolve all IP addresses for the hostname
ip_addresses = {result[4][0] for result in socket.getaddrinfo(hostname, None)}
if not ip_addresses:
raise ValueError(f"Unable to resolve IP address for {hostname}")
# Check if all IP addresses are global
for ip in ip_addresses:
if is_ip_blocked(ip):
raise ValueError(
f"Access to private IP address at {hostname}: {ip} is not allowed."
)
return url
class Requests:
"""
A wrapper around the requests library that validates URLs before making requests.
"""
def __init__(
self,
trusted_origins: list[str] | None = None,
raise_for_status: bool = True,
extra_url_validator: Callable[[str], str] | None = None,
extra_headers: dict[str, str] | None = None,
):
self.trusted_origins = []
for url in trusted_origins or []:
hostname = urlparse(url).hostname
if not hostname:
raise ValueError(f"Invalid URL: Unable to determine hostname of {url}")
self.trusted_origins.append(hostname)
self.raise_for_status = raise_for_status
self.extra_url_validator = extra_url_validator
self.extra_headers = extra_headers
def request(
self, method, url, headers=None, allow_redirects=False, *args, **kwargs
) -> req.Response:
if self.extra_headers is not None:
headers = {**(headers or {}), **self.extra_headers}
url = validate_url(url, self.trusted_origins)
if self.extra_url_validator is not None:
url = self.extra_url_validator(url)
response = req.request(
method,
url,
headers=headers,
allow_redirects=allow_redirects,
*args,
**kwargs,
)
if self.raise_for_status:
response.raise_for_status()
return response
def get(self, url, *args, **kwargs) -> req.Response:
return self.request("GET", url, *args, **kwargs)
def post(self, url, *args, **kwargs) -> req.Response:
return self.request("POST", url, *args, **kwargs)
def put(self, url, *args, **kwargs) -> req.Response:
return self.request("PUT", url, *args, **kwargs)
def delete(self, url, *args, **kwargs) -> req.Response:
return self.request("DELETE", url, *args, **kwargs)
def head(self, url, *args, **kwargs) -> req.Response:
return self.request("HEAD", url, *args, **kwargs)
def options(self, url, *args, **kwargs) -> req.Response:
return self.request("OPTIONS", url, *args, **kwargs)
def patch(self, url, *args, **kwargs) -> req.Response:
return self.request("PATCH", url, *args, **kwargs)
requests = Requests(trusted_origins=Config().trust_endpoints_for_requests)

View File

@@ -161,6 +161,11 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
description="Name of the event bus", description="Name of the event bus",
) )
trust_endpoints_for_requests: List[str] = Field(
default_factory=list,
description="A whitelist of trusted internal endpoints for the backend to make requests to.",
)
backend_cors_allow_origins: List[str] = Field(default_factory=list) backend_cors_allow_origins: List[str] = Field(default_factory=list)
@field_validator("backend_cors_allow_origins") @field_validator("backend_cors_allow_origins")

View File

@@ -0,0 +1,21 @@
import pytest
from backend.util.request import validate_url
def test_validate_url():
with pytest.raises(ValueError):
validate_url("localhost", [])
with pytest.raises(ValueError):
validate_url("192.168.1.1", [])
with pytest.raises(ValueError):
validate_url("127.0.0.1", [])
with pytest.raises(ValueError):
validate_url("0.0.0.0", [])
validate_url("google.com", [])
validate_url("github.com", [])
validate_url("http://github.com", [])

View File

@@ -768,7 +768,7 @@ const NodeBooleanInput: FC<{
<div className={className}> <div className={className}>
<div className="nodrag flex items-center"> <div className="nodrag flex items-center">
<Switch <Switch
checked={value} defaultChecked={value}
onCheckedChange={(v) => handleInputChange(selfKey, v)} onCheckedChange={(v) => handleInputChange(selfKey, v)}
/> />
<span className="ml-3">{displayName}</span> <span className="ml-3">{displayName}</span>