mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-01-11 16:18:07 -05:00
Compare commits
18 Commits
hackathon-
...
contributo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
86c3f50506 | ||
|
|
8729ca9aab | ||
|
|
6747c056f0 | ||
|
|
6e28227c42 | ||
|
|
13285ea15c | ||
|
|
5fe1366825 | ||
|
|
84a54af9dc | ||
|
|
31675c9fb7 | ||
|
|
4908f4633d | ||
|
|
6ac4132e64 | ||
|
|
da5bf6755d | ||
|
|
a52f7a1efb | ||
|
|
23d1c0010c | ||
|
|
51f9336e9e | ||
|
|
22b4e8b8c0 | ||
|
|
038041f467 | ||
|
|
952f6f58ef | ||
|
|
151fad5ced |
@@ -3,12 +3,12 @@ import time
|
||||
from enum import Enum
|
||||
from typing import Literal
|
||||
|
||||
import requests
|
||||
from autogpt_libs.supabase_integration_credentials_store.types import APIKeyCredentials
|
||||
from pydantic import SecretStr
|
||||
|
||||
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
|
||||
from backend.data.model import CredentialsField, CredentialsMetaInput, SchemaField
|
||||
from backend.util.request import requests
|
||||
|
||||
TEST_CREDENTIALS = APIKeyCredentials(
|
||||
id="01234567-89ab-cdef-0123-456789abcdef",
|
||||
@@ -217,7 +217,6 @@ class AIShortformVideoCreatorBlock(Block):
|
||||
url = "https://webhook.site/token"
|
||||
headers = {"Accept": "application/json", "Content-Type": "application/json"}
|
||||
response = requests.post(url, headers=headers)
|
||||
response.raise_for_status()
|
||||
webhook_data = response.json()
|
||||
return webhook_data["uuid"], f"https://webhook.site/{webhook_data['uuid']}"
|
||||
|
||||
@@ -228,14 +227,12 @@ class AIShortformVideoCreatorBlock(Block):
|
||||
logger.debug(
|
||||
f"API Response Status Code: {response.status_code}, Content: {response.text}"
|
||||
)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def check_video_status(self, api_key: SecretStr, pid: str) -> dict:
|
||||
url = f"https://www.revid.ai/api/public/v2/status?pid={pid}"
|
||||
headers = {"key": api_key.get_secret_value()}
|
||||
response = requests.get(url, headers=headers)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def wait_for_video(
|
||||
|
||||
43
autogpt_platform/backend/backend/blocks/github/_api.py
Normal file
43
autogpt_platform/backend/backend/blocks/github/_api.py
Normal file
@@ -0,0 +1,43 @@
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from backend.blocks.github._auth import GithubCredentials
|
||||
from backend.util.request import Requests
|
||||
|
||||
|
||||
def _convert_to_api_url(url: str) -> str:
|
||||
"""
|
||||
Converts a standard GitHub URL to the corresponding GitHub API URL.
|
||||
Handles repository URLs, issue URLs, pull request URLs, and more.
|
||||
"""
|
||||
parsed_url = urlparse(url)
|
||||
path_parts = parsed_url.path.strip("/").split("/")
|
||||
|
||||
if len(path_parts) >= 2:
|
||||
owner, repo = path_parts[0], path_parts[1]
|
||||
api_base = f"https://api.github.com/repos/{owner}/{repo}"
|
||||
|
||||
if len(path_parts) > 2:
|
||||
additional_path = "/".join(path_parts[2:])
|
||||
api_url = f"{api_base}/{additional_path}"
|
||||
else:
|
||||
# Repository base URL
|
||||
api_url = api_base
|
||||
else:
|
||||
raise ValueError("Invalid GitHub URL format.")
|
||||
|
||||
return api_url
|
||||
|
||||
|
||||
def _get_headers(credentials: GithubCredentials) -> dict[str, str]:
|
||||
return {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
|
||||
def get_api(credentials: GithubCredentials) -> Requests:
|
||||
return Requests(
|
||||
trusted_origins=["https://api.github.com", "https://github.com"],
|
||||
extra_url_validator=_convert_to_api_url,
|
||||
extra_headers=_get_headers(credentials),
|
||||
)
|
||||
@@ -1,9 +1,11 @@
|
||||
import requests
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
|
||||
from backend.data.model import SchemaField
|
||||
|
||||
from ._api import get_api
|
||||
from ._auth import (
|
||||
TEST_CREDENTIALS,
|
||||
TEST_CREDENTIALS_INPUT,
|
||||
@@ -13,6 +15,10 @@ from ._auth import (
|
||||
)
|
||||
|
||||
|
||||
def is_github_url(url: str) -> bool:
|
||||
return urlparse(url).netloc == "github.com"
|
||||
|
||||
|
||||
# --8<-- [start:GithubCommentBlockExample]
|
||||
class GithubCommentBlock(Block):
|
||||
class Input(BlockSchema):
|
||||
@@ -62,27 +68,10 @@ class GithubCommentBlock(Block):
|
||||
def post_comment(
|
||||
credentials: GithubCredentials, issue_url: str, body_text: str
|
||||
) -> tuple[int, str]:
|
||||
if "/pull/" in issue_url:
|
||||
api_url = (
|
||||
issue_url.replace("github.com", "api.github.com/repos").replace(
|
||||
"/pull/", "/issues/"
|
||||
)
|
||||
+ "/comments"
|
||||
)
|
||||
else:
|
||||
api_url = (
|
||||
issue_url.replace("github.com", "api.github.com/repos") + "/comments"
|
||||
)
|
||||
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
api = get_api(credentials)
|
||||
data = {"body": body_text}
|
||||
|
||||
response = requests.post(api_url, headers=headers, json=data)
|
||||
response.raise_for_status()
|
||||
|
||||
comments_url = issue_url + "/comments"
|
||||
response = api.post(comments_url, json=data)
|
||||
comment = response.json()
|
||||
return comment["id"], comment["html_url"]
|
||||
|
||||
@@ -156,16 +145,10 @@ class GithubMakeIssueBlock(Block):
|
||||
def create_issue(
|
||||
credentials: GithubCredentials, repo_url: str, title: str, body: str
|
||||
) -> tuple[int, str]:
|
||||
api_url = repo_url.replace("github.com", "api.github.com/repos") + "/issues"
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
api = get_api(credentials)
|
||||
data = {"title": title, "body": body}
|
||||
|
||||
response = requests.post(api_url, headers=headers, json=data)
|
||||
response.raise_for_status()
|
||||
|
||||
issues_url = repo_url + "/issues"
|
||||
response = api.post(issues_url, json=data)
|
||||
issue = response.json()
|
||||
return issue["number"], issue["html_url"]
|
||||
|
||||
@@ -232,21 +215,12 @@ class GithubReadIssueBlock(Block):
|
||||
def read_issue(
|
||||
credentials: GithubCredentials, issue_url: str
|
||||
) -> tuple[str, str, str]:
|
||||
api_url = issue_url.replace("github.com", "api.github.com/repos")
|
||||
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = get_api(credentials)
|
||||
response = api.get(issue_url)
|
||||
data = response.json()
|
||||
title = data.get("title", "No title found")
|
||||
body = data.get("body", "No body content found")
|
||||
user = data.get("user", {}).get("login", "No user found")
|
||||
|
||||
return title, body, user
|
||||
|
||||
def run(
|
||||
@@ -318,20 +292,13 @@ class GithubListIssuesBlock(Block):
|
||||
def list_issues(
|
||||
credentials: GithubCredentials, repo_url: str
|
||||
) -> list[Output.IssueItem]:
|
||||
api_url = repo_url.replace("github.com", "api.github.com/repos") + "/issues"
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = get_api(credentials)
|
||||
issues_url = repo_url + "/issues"
|
||||
response = api.get(issues_url)
|
||||
data = response.json()
|
||||
issues: list[GithubListIssuesBlock.Output.IssueItem] = [
|
||||
{"title": issue["title"], "url": issue["html_url"]} for issue in data
|
||||
]
|
||||
|
||||
return issues
|
||||
|
||||
def run(
|
||||
@@ -385,28 +352,10 @@ class GithubAddLabelBlock(Block):
|
||||
|
||||
@staticmethod
|
||||
def add_label(credentials: GithubCredentials, issue_url: str, label: str) -> str:
|
||||
# Convert the provided GitHub URL to the API URL
|
||||
if "/pull/" in issue_url:
|
||||
api_url = (
|
||||
issue_url.replace("github.com", "api.github.com/repos").replace(
|
||||
"/pull/", "/issues/"
|
||||
)
|
||||
+ "/labels"
|
||||
)
|
||||
else:
|
||||
api_url = (
|
||||
issue_url.replace("github.com", "api.github.com/repos") + "/labels"
|
||||
)
|
||||
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
api = get_api(credentials)
|
||||
data = {"labels": [label]}
|
||||
|
||||
response = requests.post(api_url, headers=headers, json=data)
|
||||
response.raise_for_status()
|
||||
|
||||
labels_url = issue_url + "/labels"
|
||||
api.post(labels_url, json=data)
|
||||
return "Label added successfully"
|
||||
|
||||
def run(
|
||||
@@ -463,31 +412,9 @@ class GithubRemoveLabelBlock(Block):
|
||||
|
||||
@staticmethod
|
||||
def remove_label(credentials: GithubCredentials, issue_url: str, label: str) -> str:
|
||||
# Convert the provided GitHub URL to the API URL
|
||||
if "/pull/" in issue_url:
|
||||
api_url = (
|
||||
issue_url.replace("github.com", "api.github.com/repos").replace(
|
||||
"/pull/", "/issues/"
|
||||
)
|
||||
+ f"/labels/{label}"
|
||||
)
|
||||
else:
|
||||
api_url = (
|
||||
issue_url.replace("github.com", "api.github.com/repos")
|
||||
+ f"/labels/{label}"
|
||||
)
|
||||
|
||||
# Log the constructed API URL for debugging
|
||||
print(f"Constructed API URL: {api_url}")
|
||||
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.delete(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = get_api(credentials)
|
||||
label_url = issue_url + f"/labels/{label}"
|
||||
api.delete(label_url)
|
||||
return "Label removed successfully"
|
||||
|
||||
def run(
|
||||
@@ -550,23 +477,10 @@ class GithubAssignIssueBlock(Block):
|
||||
issue_url: str,
|
||||
assignee: str,
|
||||
) -> str:
|
||||
# Extracting repo path and issue number from the issue URL
|
||||
repo_path, issue_number = issue_url.replace("https://github.com/", "").split(
|
||||
"/issues/"
|
||||
)
|
||||
api_url = (
|
||||
f"https://api.github.com/repos/{repo_path}/issues/{issue_number}/assignees"
|
||||
)
|
||||
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
api = get_api(credentials)
|
||||
assignees_url = issue_url + "/assignees"
|
||||
data = {"assignees": [assignee]}
|
||||
|
||||
response = requests.post(api_url, headers=headers, json=data)
|
||||
response.raise_for_status()
|
||||
|
||||
api.post(assignees_url, json=data)
|
||||
return "Issue assigned successfully"
|
||||
|
||||
def run(
|
||||
@@ -629,23 +543,10 @@ class GithubUnassignIssueBlock(Block):
|
||||
issue_url: str,
|
||||
assignee: str,
|
||||
) -> str:
|
||||
# Extracting repo path and issue number from the issue URL
|
||||
repo_path, issue_number = issue_url.replace("https://github.com/", "").split(
|
||||
"/issues/"
|
||||
)
|
||||
api_url = (
|
||||
f"https://api.github.com/repos/{repo_path}/issues/{issue_number}/assignees"
|
||||
)
|
||||
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
api = get_api(credentials)
|
||||
assignees_url = issue_url + "/assignees"
|
||||
data = {"assignees": [assignee]}
|
||||
|
||||
response = requests.delete(api_url, headers=headers, json=data)
|
||||
response.raise_for_status()
|
||||
|
||||
api.delete(assignees_url, json=data)
|
||||
return "Issue unassigned successfully"
|
||||
|
||||
def run(
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import requests
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
|
||||
from backend.data.model import SchemaField
|
||||
|
||||
from ._api import get_api
|
||||
from ._auth import (
|
||||
TEST_CREDENTIALS,
|
||||
TEST_CREDENTIALS_INPUT,
|
||||
@@ -64,20 +64,13 @@ class GithubListPullRequestsBlock(Block):
|
||||
|
||||
@staticmethod
|
||||
def list_prs(credentials: GithubCredentials, repo_url: str) -> list[Output.PRItem]:
|
||||
api_url = repo_url.replace("github.com", "api.github.com/repos") + "/pulls"
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = get_api(credentials)
|
||||
pulls_url = repo_url + "/pulls"
|
||||
response = api.get(pulls_url)
|
||||
data = response.json()
|
||||
pull_requests: list[GithubListPullRequestsBlock.Output.PRItem] = [
|
||||
{"title": pr["title"], "url": pr["html_url"]} for pr in data
|
||||
]
|
||||
|
||||
return pull_requests
|
||||
|
||||
def run(
|
||||
@@ -110,7 +103,11 @@ class GithubMakePullRequestBlock(Block):
|
||||
placeholder="Enter the pull request body",
|
||||
)
|
||||
head: str = SchemaField(
|
||||
description="The name of the branch where your changes are implemented. For cross-repository pull requests in the same network, namespace head with a user like this: username:branch.",
|
||||
description=(
|
||||
"The name of the branch where your changes are implemented. "
|
||||
"For cross-repository pull requests in the same network, "
|
||||
"namespace head with a user like this: username:branch."
|
||||
),
|
||||
placeholder="Enter the head branch",
|
||||
)
|
||||
base: str = SchemaField(
|
||||
@@ -162,17 +159,10 @@ class GithubMakePullRequestBlock(Block):
|
||||
head: str,
|
||||
base: str,
|
||||
) -> tuple[int, str]:
|
||||
repo_path = repo_url.replace("https://github.com/", "")
|
||||
api_url = f"https://api.github.com/repos/{repo_path}/pulls"
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
api = get_api(credentials)
|
||||
pulls_url = repo_url + "/pulls"
|
||||
data = {"title": title, "body": body, "head": head, "base": base}
|
||||
|
||||
response = requests.post(api_url, headers=headers, json=data)
|
||||
response.raise_for_status()
|
||||
|
||||
response = api.post(pulls_url, json=data)
|
||||
pr_data = response.json()
|
||||
return pr_data["number"], pr_data["html_url"]
|
||||
|
||||
@@ -194,13 +184,8 @@ class GithubMakePullRequestBlock(Block):
|
||||
)
|
||||
yield "number", number
|
||||
yield "url", url
|
||||
except requests.exceptions.HTTPError as http_err:
|
||||
if http_err.response.status_code == 422:
|
||||
error_details = http_err.response.json()
|
||||
error_message = error_details.get("message", "Unknown error")
|
||||
else:
|
||||
error_message = str(http_err)
|
||||
raise RuntimeError(f"Failed to create pull request: {error_message}")
|
||||
except Exception as e:
|
||||
yield "error", str(e)
|
||||
|
||||
|
||||
class GithubReadPullRequestBlock(Block):
|
||||
@@ -255,42 +240,21 @@ class GithubReadPullRequestBlock(Block):
|
||||
|
||||
@staticmethod
|
||||
def read_pr(credentials: GithubCredentials, pr_url: str) -> tuple[str, str, str]:
|
||||
api_url = pr_url.replace("github.com", "api.github.com/repos").replace(
|
||||
"/pull/", "/issues/"
|
||||
)
|
||||
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = get_api(credentials)
|
||||
# Adjust the URL to access the issue endpoint for PR metadata
|
||||
issue_url = pr_url.replace("/pull/", "/issues/")
|
||||
response = api.get(issue_url)
|
||||
data = response.json()
|
||||
title = data.get("title", "No title found")
|
||||
body = data.get("body", "No body content found")
|
||||
author = data.get("user", {}).get("login", "No user found")
|
||||
|
||||
return title, body, author
|
||||
|
||||
@staticmethod
|
||||
def read_pr_changes(credentials: GithubCredentials, pr_url: str) -> str:
|
||||
api_url = (
|
||||
pr_url.replace("github.com", "api.github.com/repos").replace(
|
||||
"/pull/", "/pulls/"
|
||||
)
|
||||
+ "/files"
|
||||
)
|
||||
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = get_api(credentials)
|
||||
files_url = pr_url + "/files"
|
||||
response = api.get(files_url)
|
||||
files = response.json()
|
||||
changes = []
|
||||
for file in files:
|
||||
@@ -298,7 +262,6 @@ class GithubReadPullRequestBlock(Block):
|
||||
patch = file.get("patch")
|
||||
if filename and patch:
|
||||
changes.append(f"File: {filename}\n{patch}")
|
||||
|
||||
return "\n\n".join(changes)
|
||||
|
||||
def run(
|
||||
@@ -367,23 +330,10 @@ class GithubAssignPRReviewerBlock(Block):
|
||||
def assign_reviewer(
|
||||
credentials: GithubCredentials, pr_url: str, reviewer: str
|
||||
) -> str:
|
||||
# Convert the PR URL to the appropriate API endpoint
|
||||
api_url = (
|
||||
pr_url.replace("github.com", "api.github.com/repos").replace(
|
||||
"/pull/", "/pulls/"
|
||||
)
|
||||
+ "/requested_reviewers"
|
||||
)
|
||||
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
api = get_api(credentials)
|
||||
reviewers_url = pr_url + "/requested_reviewers"
|
||||
data = {"reviewers": [reviewer]}
|
||||
|
||||
response = requests.post(api_url, headers=headers, json=data)
|
||||
response.raise_for_status()
|
||||
|
||||
api.post(reviewers_url, json=data)
|
||||
return "Reviewer assigned successfully"
|
||||
|
||||
def run(
|
||||
@@ -400,17 +350,8 @@ class GithubAssignPRReviewerBlock(Block):
|
||||
input_data.reviewer,
|
||||
)
|
||||
yield "status", status
|
||||
except requests.exceptions.HTTPError as http_err:
|
||||
if http_err.response.status_code == 422:
|
||||
error_msg = (
|
||||
"Failed to assign reviewer: "
|
||||
f"The reviewer '{input_data.reviewer}' may not have permission "
|
||||
"or the pull request is not in a valid state. "
|
||||
f"Detailed error: {http_err.response.text}"
|
||||
)
|
||||
else:
|
||||
error_msg = f"HTTP error: {http_err} - {http_err.response.text}"
|
||||
raise RuntimeError(error_msg)
|
||||
except Exception as e:
|
||||
yield "error", str(e)
|
||||
|
||||
|
||||
class GithubUnassignPRReviewerBlock(Block):
|
||||
@@ -456,21 +397,10 @@ class GithubUnassignPRReviewerBlock(Block):
|
||||
def unassign_reviewer(
|
||||
credentials: GithubCredentials, pr_url: str, reviewer: str
|
||||
) -> str:
|
||||
api_url = (
|
||||
pr_url.replace("github.com", "api.github.com/repos").replace(
|
||||
"/pull/", "/pulls/"
|
||||
)
|
||||
+ "/requested_reviewers"
|
||||
)
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
api = get_api(credentials)
|
||||
reviewers_url = pr_url + "/requested_reviewers"
|
||||
data = {"reviewers": [reviewer]}
|
||||
|
||||
response = requests.delete(api_url, headers=headers, json=data)
|
||||
response.raise_for_status()
|
||||
|
||||
api.delete(reviewers_url, json=data)
|
||||
return "Reviewer unassigned successfully"
|
||||
|
||||
def run(
|
||||
@@ -480,12 +410,15 @@ class GithubUnassignPRReviewerBlock(Block):
|
||||
credentials: GithubCredentials,
|
||||
**kwargs,
|
||||
) -> BlockOutput:
|
||||
status = self.unassign_reviewer(
|
||||
credentials,
|
||||
input_data.pr_url,
|
||||
input_data.reviewer,
|
||||
)
|
||||
yield "status", status
|
||||
try:
|
||||
status = self.unassign_reviewer(
|
||||
credentials,
|
||||
input_data.pr_url,
|
||||
input_data.reviewer,
|
||||
)
|
||||
yield "status", status
|
||||
except Exception as e:
|
||||
yield "error", str(e)
|
||||
|
||||
|
||||
class GithubListPRReviewersBlock(Block):
|
||||
@@ -544,26 +477,14 @@ class GithubListPRReviewersBlock(Block):
|
||||
def list_reviewers(
|
||||
credentials: GithubCredentials, pr_url: str
|
||||
) -> list[Output.ReviewerItem]:
|
||||
api_url = (
|
||||
pr_url.replace("github.com", "api.github.com/repos").replace(
|
||||
"/pull/", "/pulls/"
|
||||
)
|
||||
+ "/requested_reviewers"
|
||||
)
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = get_api(credentials)
|
||||
reviewers_url = pr_url + "/requested_reviewers"
|
||||
response = api.get(reviewers_url)
|
||||
data = response.json()
|
||||
reviewers: list[GithubListPRReviewersBlock.Output.ReviewerItem] = [
|
||||
{"username": reviewer["login"], "url": reviewer["html_url"]}
|
||||
for reviewer in data.get("users", [])
|
||||
]
|
||||
|
||||
return reviewers
|
||||
|
||||
def run(
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
import base64
|
||||
|
||||
import requests
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
|
||||
from backend.data.model import SchemaField
|
||||
|
||||
from ._api import get_api
|
||||
from ._auth import (
|
||||
TEST_CREDENTIALS,
|
||||
TEST_CREDENTIALS_INPUT,
|
||||
@@ -68,17 +68,11 @@ class GithubListTagsBlock(Block):
|
||||
def list_tags(
|
||||
credentials: GithubCredentials, repo_url: str
|
||||
) -> list[Output.TagItem]:
|
||||
repo_path = repo_url.replace("https://github.com/", "")
|
||||
api_url = f"https://api.github.com/repos/{repo_path}/tags"
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = get_api(credentials)
|
||||
tags_url = repo_url + "/tags"
|
||||
response = api.get(tags_url)
|
||||
data = response.json()
|
||||
repo_path = repo_url.replace("https://github.com/", "")
|
||||
tags: list[GithubListTagsBlock.Output.TagItem] = [
|
||||
{
|
||||
"name": tag["name"],
|
||||
@@ -86,7 +80,6 @@ class GithubListTagsBlock(Block):
|
||||
}
|
||||
for tag in data
|
||||
]
|
||||
|
||||
return tags
|
||||
|
||||
def run(
|
||||
@@ -157,20 +150,18 @@ class GithubListBranchesBlock(Block):
|
||||
def list_branches(
|
||||
credentials: GithubCredentials, repo_url: str
|
||||
) -> list[Output.BranchItem]:
|
||||
api_url = repo_url.replace("github.com", "api.github.com/repos") + "/branches"
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = get_api(credentials)
|
||||
branches_url = repo_url + "/branches"
|
||||
response = api.get(branches_url)
|
||||
data = response.json()
|
||||
repo_path = repo_url.replace("https://github.com/", "")
|
||||
branches: list[GithubListBranchesBlock.Output.BranchItem] = [
|
||||
{"name": branch["name"], "url": branch["commit"]["url"]} for branch in data
|
||||
{
|
||||
"name": branch["name"],
|
||||
"url": f"https://github.com/{repo_path}/tree/{branch['name']}",
|
||||
}
|
||||
for branch in data
|
||||
]
|
||||
|
||||
return branches
|
||||
|
||||
def run(
|
||||
@@ -246,6 +237,8 @@ class GithubListDiscussionsBlock(Block):
|
||||
def list_discussions(
|
||||
credentials: GithubCredentials, repo_url: str, num_discussions: int
|
||||
) -> list[Output.DiscussionItem]:
|
||||
api = get_api(credentials)
|
||||
# GitHub GraphQL API endpoint is different; we'll use api.post with custom URL
|
||||
repo_path = repo_url.replace("https://github.com/", "")
|
||||
owner, repo = repo_path.split("/")
|
||||
query = """
|
||||
@@ -261,24 +254,15 @@ class GithubListDiscussionsBlock(Block):
|
||||
}
|
||||
"""
|
||||
variables = {"owner": owner, "repo": repo, "num": num_discussions}
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
response = api.post(
|
||||
"https://api.github.com/graphql",
|
||||
json={"query": query, "variables": variables},
|
||||
headers=headers,
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
data = response.json()
|
||||
discussions: list[GithubListDiscussionsBlock.Output.DiscussionItem] = [
|
||||
{"title": discussion["title"], "url": discussion["url"]}
|
||||
for discussion in data["data"]["repository"]["discussions"]["nodes"]
|
||||
]
|
||||
|
||||
return discussions
|
||||
|
||||
def run(
|
||||
@@ -348,21 +332,13 @@ class GithubListReleasesBlock(Block):
|
||||
def list_releases(
|
||||
credentials: GithubCredentials, repo_url: str
|
||||
) -> list[Output.ReleaseItem]:
|
||||
repo_path = repo_url.replace("https://github.com/", "")
|
||||
api_url = f"https://api.github.com/repos/{repo_path}/releases"
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = get_api(credentials)
|
||||
releases_url = repo_url + "/releases"
|
||||
response = api.get(releases_url)
|
||||
data = response.json()
|
||||
releases: list[GithubListReleasesBlock.Output.ReleaseItem] = [
|
||||
{"name": release["name"], "url": release["html_url"]} for release in data
|
||||
]
|
||||
|
||||
return releases
|
||||
|
||||
def run(
|
||||
@@ -432,16 +408,9 @@ class GithubReadFileBlock(Block):
|
||||
def read_file(
|
||||
credentials: GithubCredentials, repo_url: str, file_path: str, branch: str
|
||||
) -> tuple[str, int]:
|
||||
repo_path = repo_url.replace("https://github.com/", "")
|
||||
api_url = f"https://api.github.com/repos/{repo_path}/contents/{file_path}?ref={branch}"
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = get_api(credentials)
|
||||
content_url = repo_url + f"/contents/{file_path}?ref={branch}"
|
||||
response = api.get(content_url)
|
||||
content = response.json()
|
||||
|
||||
if isinstance(content, list):
|
||||
@@ -549,46 +518,33 @@ class GithubReadFolderBlock(Block):
|
||||
def read_folder(
|
||||
credentials: GithubCredentials, repo_url: str, folder_path: str, branch: str
|
||||
) -> tuple[list[Output.FileEntry], list[Output.DirEntry]]:
|
||||
repo_path = repo_url.replace("https://github.com/", "")
|
||||
api_url = f"https://api.github.com/repos/{repo_path}/contents/{folder_path}?ref={branch}"
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = get_api(credentials)
|
||||
contents_url = repo_url + f"/contents/{folder_path}?ref={branch}"
|
||||
response = api.get(contents_url)
|
||||
content = response.json()
|
||||
|
||||
if isinstance(content, list):
|
||||
# Multiple entries of different types exist at this path
|
||||
if not (dir := next((d for d in content if d["type"] == "dir"), None)):
|
||||
raise TypeError("Not a folder")
|
||||
content = dir
|
||||
|
||||
if content["type"] != "dir":
|
||||
if not isinstance(content, list):
|
||||
raise TypeError("Not a folder")
|
||||
|
||||
return (
|
||||
[
|
||||
GithubReadFolderBlock.Output.FileEntry(
|
||||
name=entry["name"],
|
||||
path=entry["path"],
|
||||
size=entry["size"],
|
||||
)
|
||||
for entry in content["entries"]
|
||||
if entry["type"] == "file"
|
||||
],
|
||||
[
|
||||
GithubReadFolderBlock.Output.DirEntry(
|
||||
name=entry["name"],
|
||||
path=entry["path"],
|
||||
)
|
||||
for entry in content["entries"]
|
||||
if entry["type"] == "dir"
|
||||
],
|
||||
)
|
||||
files = [
|
||||
GithubReadFolderBlock.Output.FileEntry(
|
||||
name=entry["name"],
|
||||
path=entry["path"],
|
||||
size=entry["size"],
|
||||
)
|
||||
for entry in content
|
||||
if entry["type"] == "file"
|
||||
]
|
||||
dirs = [
|
||||
GithubReadFolderBlock.Output.DirEntry(
|
||||
name=entry["name"],
|
||||
path=entry["path"],
|
||||
)
|
||||
for entry in content
|
||||
if entry["type"] == "dir"
|
||||
]
|
||||
|
||||
return files, dirs
|
||||
|
||||
def run(
|
||||
self,
|
||||
@@ -656,26 +612,16 @@ class GithubMakeBranchBlock(Block):
|
||||
new_branch: str,
|
||||
source_branch: str,
|
||||
) -> str:
|
||||
repo_path = repo_url.replace("https://github.com/", "")
|
||||
ref_api_url = (
|
||||
f"https://api.github.com/repos/{repo_path}/git/refs/heads/{source_branch}"
|
||||
)
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(ref_api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = get_api(credentials)
|
||||
# Get the SHA of the source branch
|
||||
ref_url = repo_url + f"/git/refs/heads/{source_branch}"
|
||||
response = api.get(ref_url)
|
||||
sha = response.json()["object"]["sha"]
|
||||
|
||||
create_branch_api_url = f"https://api.github.com/repos/{repo_path}/git/refs"
|
||||
# Create the new branch
|
||||
create_ref_url = repo_url + "/git/refs"
|
||||
data = {"ref": f"refs/heads/{new_branch}", "sha": sha}
|
||||
|
||||
response = requests.post(create_branch_api_url, headers=headers, json=data)
|
||||
response.raise_for_status()
|
||||
|
||||
response = api.post(create_ref_url, json=data)
|
||||
return "Branch created successfully"
|
||||
|
||||
def run(
|
||||
@@ -735,16 +681,9 @@ class GithubDeleteBranchBlock(Block):
|
||||
def delete_branch(
|
||||
credentials: GithubCredentials, repo_url: str, branch: str
|
||||
) -> str:
|
||||
repo_path = repo_url.replace("https://github.com/", "")
|
||||
api_url = f"https://api.github.com/repos/{repo_path}/git/refs/heads/{branch}"
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.delete(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = get_api(credentials)
|
||||
ref_url = repo_url + f"/git/refs/heads/{branch}"
|
||||
api.delete(ref_url)
|
||||
return "Branch deleted successfully"
|
||||
|
||||
def run(
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from typing import Any, Optional
|
||||
|
||||
import requests
|
||||
from backend.util.request import requests
|
||||
|
||||
|
||||
class GetRequest:
|
||||
@@ -11,5 +11,4 @@ class GetRequest:
|
||||
if headers is None:
|
||||
headers = {}
|
||||
response = requests.get(url, headers=headers)
|
||||
response.raise_for_status()
|
||||
return response.json() if json else response.text
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import json
|
||||
from enum import Enum
|
||||
|
||||
import requests
|
||||
from typing import Any
|
||||
|
||||
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
|
||||
from backend.data.model import SchemaField
|
||||
from backend.util.request import requests
|
||||
|
||||
|
||||
class HttpMethod(Enum):
|
||||
@@ -31,9 +31,14 @@ class SendWebRequestBlock(Block):
|
||||
description="The headers to include in the request",
|
||||
default={},
|
||||
)
|
||||
body: object = SchemaField(
|
||||
json_format: bool = SchemaField(
|
||||
title="JSON format",
|
||||
description="Whether to send and receive body as JSON",
|
||||
default=True,
|
||||
)
|
||||
body: Any = SchemaField(
|
||||
description="The body of the request",
|
||||
default={},
|
||||
default=None,
|
||||
)
|
||||
|
||||
class Output(BlockSchema):
|
||||
@@ -58,13 +63,16 @@ class SendWebRequestBlock(Block):
|
||||
input_data.method.value,
|
||||
input_data.url,
|
||||
headers=input_data.headers,
|
||||
json=input_data.body,
|
||||
json=input_data.body if input_data.json_format else None,
|
||||
data=input_data.body if not input_data.json_format else None,
|
||||
)
|
||||
result = response.json() if input_data.json_format else response.text
|
||||
|
||||
if response.status_code // 100 == 2:
|
||||
yield "response", response.json()
|
||||
yield "response", result
|
||||
elif response.status_code // 100 == 4:
|
||||
yield "client_error", response.json()
|
||||
yield "client_error", result
|
||||
elif response.status_code // 100 == 5:
|
||||
yield "server_error", response.json()
|
||||
yield "server_error", result
|
||||
else:
|
||||
raise ValueError(f"Unexpected status code: {response.status_code}")
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, Literal, Optional
|
||||
|
||||
import requests
|
||||
from autogpt_libs.supabase_integration_credentials_store.types import APIKeyCredentials
|
||||
from pydantic import SecretStr
|
||||
from requests.exceptions import RequestException
|
||||
|
||||
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
|
||||
from backend.data.model import CredentialsField, CredentialsMetaInput, SchemaField
|
||||
from backend.util.request import requests
|
||||
|
||||
TEST_CREDENTIALS = APIKeyCredentials(
|
||||
id="01234567-89ab-cdef-0123-456789abcdef",
|
||||
@@ -242,9 +243,8 @@ class IdeogramModelBlock(Block):
|
||||
|
||||
try:
|
||||
response = requests.post(url, json=data, headers=headers)
|
||||
response.raise_for_status()
|
||||
return response.json()["data"][0]["url"]
|
||||
except requests.exceptions.RequestException as e:
|
||||
except RequestException as e:
|
||||
raise Exception(f"Failed to fetch image: {str(e)}")
|
||||
|
||||
def upscale_image(self, api_key: SecretStr, image_url: str):
|
||||
@@ -256,7 +256,6 @@ class IdeogramModelBlock(Block):
|
||||
try:
|
||||
# Step 1: Download the image from the provided URL
|
||||
image_response = requests.get(image_url)
|
||||
image_response.raise_for_status()
|
||||
|
||||
# Step 2: Send the downloaded image to the upscale API
|
||||
files = {
|
||||
@@ -272,8 +271,7 @@ class IdeogramModelBlock(Block):
|
||||
files=files,
|
||||
)
|
||||
|
||||
response.raise_for_status()
|
||||
return response.json()["data"][0]["url"]
|
||||
|
||||
except requests.exceptions.RequestException as e:
|
||||
except RequestException as e:
|
||||
raise Exception(f"Failed to upscale image: {str(e)}")
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
import requests
|
||||
|
||||
from backend.blocks.jina._auth import (
|
||||
JinaCredentials,
|
||||
JinaCredentialsField,
|
||||
@@ -7,6 +5,7 @@ from backend.blocks.jina._auth import (
|
||||
)
|
||||
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
|
||||
from backend.data.model import SchemaField
|
||||
from backend.util.request import requests
|
||||
|
||||
|
||||
class JinaChunkingBlock(Block):
|
||||
@@ -57,7 +56,6 @@ class JinaChunkingBlock(Block):
|
||||
}
|
||||
|
||||
response = requests.post(url, headers=headers, json=data)
|
||||
response.raise_for_status()
|
||||
result = response.json()
|
||||
|
||||
all_chunks.extend(result.get("chunks", []))
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
import requests
|
||||
|
||||
from backend.blocks.jina._auth import (
|
||||
JinaCredentials,
|
||||
JinaCredentialsField,
|
||||
@@ -7,6 +5,7 @@ from backend.blocks.jina._auth import (
|
||||
)
|
||||
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
|
||||
from backend.data.model import SchemaField
|
||||
from backend.util.request import requests
|
||||
|
||||
|
||||
class JinaEmbeddingBlock(Block):
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
from enum import Enum
|
||||
from typing import List, Literal
|
||||
|
||||
import requests
|
||||
from autogpt_libs.supabase_integration_credentials_store.types import APIKeyCredentials
|
||||
from pydantic import SecretStr
|
||||
|
||||
@@ -13,6 +12,7 @@ from backend.data.model import (
|
||||
SchemaField,
|
||||
SecretField,
|
||||
)
|
||||
from backend.util.request import requests
|
||||
|
||||
TEST_CREDENTIALS = APIKeyCredentials(
|
||||
id="01234567-89ab-cdef-0123-456789abcdef",
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
import time
|
||||
from typing import Literal
|
||||
|
||||
import requests
|
||||
from autogpt_libs.supabase_integration_credentials_store.types import APIKeyCredentials
|
||||
from pydantic import SecretStr
|
||||
|
||||
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
|
||||
from backend.data.model import CredentialsField, CredentialsMetaInput, SchemaField
|
||||
from backend.util.request import requests
|
||||
|
||||
TEST_CREDENTIALS = APIKeyCredentials(
|
||||
id="01234567-89ab-cdef-0123-456789abcdef",
|
||||
@@ -118,7 +118,6 @@ class CreateTalkingAvatarVideoBlock(Block):
|
||||
"authorization": f"Basic {api_key.get_secret_value()}",
|
||||
}
|
||||
response = requests.post(url, json=payload, headers=headers)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def get_clip_status(self, api_key: SecretStr, clip_id: str) -> dict:
|
||||
@@ -128,7 +127,6 @@ class CreateTalkingAvatarVideoBlock(Block):
|
||||
"authorization": f"Basic {api_key.get_secret_value()}",
|
||||
}
|
||||
response = requests.get(url, headers=headers)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def run(
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
from typing import Any, Literal
|
||||
|
||||
import requests
|
||||
from autogpt_libs.supabase_integration_credentials_store.types import APIKeyCredentials
|
||||
from pydantic import SecretStr
|
||||
|
||||
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
|
||||
from backend.data.model import CredentialsField, CredentialsMetaInput, SchemaField
|
||||
from backend.util.request import requests
|
||||
|
||||
TEST_CREDENTIALS = APIKeyCredentials(
|
||||
id="01234567-89ab-cdef-0123-456789abcdef",
|
||||
@@ -86,7 +86,6 @@ class UnrealTextToSpeechBlock(Block):
|
||||
}
|
||||
|
||||
response = requests.post(url, headers=headers, json=data)
|
||||
response.raise_for_status()
|
||||
return response.json()
|
||||
|
||||
def run(
|
||||
|
||||
@@ -2,9 +2,10 @@ import time
|
||||
from typing import Optional
|
||||
from urllib.parse import urlencode
|
||||
|
||||
import requests
|
||||
from autogpt_libs.supabase_integration_credentials_store import OAuth2Credentials
|
||||
|
||||
from backend.util.request import requests
|
||||
|
||||
from .base import BaseOAuthHandler
|
||||
|
||||
|
||||
@@ -56,13 +57,12 @@ class GitHubOAuthHandler(BaseOAuthHandler):
|
||||
"X-GitHub-Api-Version": "2022-11-28",
|
||||
}
|
||||
|
||||
response = requests.delete(
|
||||
requests.delete(
|
||||
url=self.revoke_url.format(client_id=self.client_id),
|
||||
auth=(self.client_id, self.client_secret),
|
||||
headers=headers,
|
||||
json={"access_token": credentials.access_token.get_secret_value()},
|
||||
)
|
||||
response.raise_for_status()
|
||||
return True
|
||||
|
||||
def _refresh_tokens(self, credentials: OAuth2Credentials) -> OAuth2Credentials:
|
||||
@@ -88,7 +88,6 @@ class GitHubOAuthHandler(BaseOAuthHandler):
|
||||
}
|
||||
headers = {"Accept": "application/json"}
|
||||
response = requests.post(self.token_url, data=request_body, headers=headers)
|
||||
response.raise_for_status()
|
||||
token_data: dict = response.json()
|
||||
|
||||
username = self._request_username(token_data["access_token"])
|
||||
|
||||
@@ -103,12 +103,11 @@ class GoogleOAuthHandler(BaseOAuthHandler):
|
||||
|
||||
def revoke_tokens(self, credentials: OAuth2Credentials) -> bool:
|
||||
session = AuthorizedSession(credentials)
|
||||
response = session.post(
|
||||
session.post(
|
||||
self.revoke_uri,
|
||||
params={"token": credentials.access_token.get_secret_value()},
|
||||
headers={"content-type": "application/x-www-form-urlencoded"},
|
||||
)
|
||||
response.raise_for_status()
|
||||
return True
|
||||
|
||||
def _request_email(
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
from base64 import b64encode
|
||||
from urllib.parse import urlencode
|
||||
|
||||
import requests
|
||||
from autogpt_libs.supabase_integration_credentials_store import OAuth2Credentials
|
||||
|
||||
from backend.util.request import requests
|
||||
|
||||
from .base import BaseOAuthHandler
|
||||
|
||||
|
||||
@@ -49,7 +50,6 @@ class NotionOAuthHandler(BaseOAuthHandler):
|
||||
"Accept": "application/json",
|
||||
}
|
||||
response = requests.post(self.token_url, json=request_body, headers=headers)
|
||||
response.raise_for_status()
|
||||
token_data = response.json()
|
||||
# Email is only available for non-bot users
|
||||
email = (
|
||||
|
||||
132
autogpt_platform/backend/backend/util/request.py
Normal file
132
autogpt_platform/backend/backend/util/request.py
Normal file
@@ -0,0 +1,132 @@
|
||||
import ipaddress
|
||||
import socket
|
||||
from typing import Callable
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import requests as req
|
||||
|
||||
from backend.util.settings import Config
|
||||
|
||||
# List of IP networks to block
|
||||
BLOCKED_IP_NETWORKS = [
|
||||
ipaddress.ip_network("0.0.0.0/8"), # "This" Network
|
||||
ipaddress.ip_network("10.0.0.0/8"), # Private-Use
|
||||
ipaddress.ip_network("127.0.0.0/8"), # Loopback
|
||||
ipaddress.ip_network("169.254.0.0/16"), # Link Local
|
||||
ipaddress.ip_network("172.16.0.0/12"), # Private-Use
|
||||
ipaddress.ip_network("192.168.0.0/16"), # Private-Use
|
||||
ipaddress.ip_network("224.0.0.0/4"), # Multicast
|
||||
ipaddress.ip_network("240.0.0.0/4"), # Reserved for Future Use
|
||||
]
|
||||
|
||||
|
||||
def is_ip_blocked(ip: str) -> bool:
|
||||
"""
|
||||
Checks if the IP address is in a blocked network.
|
||||
"""
|
||||
ip_addr = ipaddress.ip_address(ip)
|
||||
return any(ip_addr in network for network in BLOCKED_IP_NETWORKS)
|
||||
|
||||
|
||||
def validate_url(url: str, trusted_origins: list[str]) -> str:
|
||||
"""
|
||||
Validates the URL to prevent SSRF attacks by ensuring it does not point to a private
|
||||
or untrusted IP address, unless whitelisted.
|
||||
"""
|
||||
url = url.strip().strip("/")
|
||||
if not url.startswith(("http://", "https://")):
|
||||
url = "http://" + url
|
||||
|
||||
parsed_url = urlparse(url)
|
||||
hostname = parsed_url.hostname
|
||||
|
||||
if not hostname:
|
||||
raise ValueError(f"Invalid URL: Unable to determine hostname from {url}")
|
||||
|
||||
if any(hostname == origin for origin in trusted_origins):
|
||||
return url
|
||||
|
||||
# Resolve all IP addresses for the hostname
|
||||
ip_addresses = {result[4][0] for result in socket.getaddrinfo(hostname, None)}
|
||||
if not ip_addresses:
|
||||
raise ValueError(f"Unable to resolve IP address for {hostname}")
|
||||
|
||||
# Check if all IP addresses are global
|
||||
for ip in ip_addresses:
|
||||
if is_ip_blocked(ip):
|
||||
raise ValueError(
|
||||
f"Access to private IP address at {hostname}: {ip} is not allowed."
|
||||
)
|
||||
|
||||
return url
|
||||
|
||||
|
||||
class Requests:
|
||||
"""
|
||||
A wrapper around the requests library that validates URLs before making requests.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
trusted_origins: list[str] | None = None,
|
||||
raise_for_status: bool = True,
|
||||
extra_url_validator: Callable[[str], str] | None = None,
|
||||
extra_headers: dict[str, str] | None = None,
|
||||
):
|
||||
self.trusted_origins = []
|
||||
for url in trusted_origins or []:
|
||||
hostname = urlparse(url).hostname
|
||||
if not hostname:
|
||||
raise ValueError(f"Invalid URL: Unable to determine hostname of {url}")
|
||||
self.trusted_origins.append(hostname)
|
||||
|
||||
self.raise_for_status = raise_for_status
|
||||
self.extra_url_validator = extra_url_validator
|
||||
self.extra_headers = extra_headers
|
||||
|
||||
def request(
|
||||
self, method, url, headers=None, allow_redirects=False, *args, **kwargs
|
||||
) -> req.Response:
|
||||
if self.extra_headers is not None:
|
||||
headers = {**(headers or {}), **self.extra_headers}
|
||||
|
||||
url = validate_url(url, self.trusted_origins)
|
||||
if self.extra_url_validator is not None:
|
||||
url = self.extra_url_validator(url)
|
||||
|
||||
response = req.request(
|
||||
method,
|
||||
url,
|
||||
headers=headers,
|
||||
allow_redirects=allow_redirects,
|
||||
*args,
|
||||
**kwargs,
|
||||
)
|
||||
if self.raise_for_status:
|
||||
response.raise_for_status()
|
||||
|
||||
return response
|
||||
|
||||
def get(self, url, *args, **kwargs) -> req.Response:
|
||||
return self.request("GET", url, *args, **kwargs)
|
||||
|
||||
def post(self, url, *args, **kwargs) -> req.Response:
|
||||
return self.request("POST", url, *args, **kwargs)
|
||||
|
||||
def put(self, url, *args, **kwargs) -> req.Response:
|
||||
return self.request("PUT", url, *args, **kwargs)
|
||||
|
||||
def delete(self, url, *args, **kwargs) -> req.Response:
|
||||
return self.request("DELETE", url, *args, **kwargs)
|
||||
|
||||
def head(self, url, *args, **kwargs) -> req.Response:
|
||||
return self.request("HEAD", url, *args, **kwargs)
|
||||
|
||||
def options(self, url, *args, **kwargs) -> req.Response:
|
||||
return self.request("OPTIONS", url, *args, **kwargs)
|
||||
|
||||
def patch(self, url, *args, **kwargs) -> req.Response:
|
||||
return self.request("PATCH", url, *args, **kwargs)
|
||||
|
||||
|
||||
requests = Requests(trusted_origins=Config().trust_endpoints_for_requests)
|
||||
@@ -161,6 +161,11 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
|
||||
description="Name of the event bus",
|
||||
)
|
||||
|
||||
trust_endpoints_for_requests: List[str] = Field(
|
||||
default_factory=list,
|
||||
description="A whitelist of trusted internal endpoints for the backend to make requests to.",
|
||||
)
|
||||
|
||||
backend_cors_allow_origins: List[str] = Field(default_factory=list)
|
||||
|
||||
@field_validator("backend_cors_allow_origins")
|
||||
|
||||
21
autogpt_platform/backend/test/util/test_request.py
Normal file
21
autogpt_platform/backend/test/util/test_request.py
Normal file
@@ -0,0 +1,21 @@
|
||||
import pytest
|
||||
|
||||
from backend.util.request import validate_url
|
||||
|
||||
|
||||
def test_validate_url():
|
||||
with pytest.raises(ValueError):
|
||||
validate_url("localhost", [])
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
validate_url("192.168.1.1", [])
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
validate_url("127.0.0.1", [])
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
validate_url("0.0.0.0", [])
|
||||
|
||||
validate_url("google.com", [])
|
||||
validate_url("github.com", [])
|
||||
validate_url("http://github.com", [])
|
||||
@@ -768,7 +768,7 @@ const NodeBooleanInput: FC<{
|
||||
<div className={className}>
|
||||
<div className="nodrag flex items-center">
|
||||
<Switch
|
||||
checked={value}
|
||||
defaultChecked={value}
|
||||
onCheckedChange={(v) => handleInputChange(selfKey, v)}
|
||||
/>
|
||||
<span className="ml-3">{displayName}</span>
|
||||
|
||||
Reference in New Issue
Block a user