mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
Refactored github validation code
This commit is contained in:
68
autogpt_platform/backend/backend/blocks/github/_api.py
Normal file
68
autogpt_platform/backend/backend/blocks/github/_api.py
Normal file
@@ -0,0 +1,68 @@
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import requests
|
||||
|
||||
from ._auth import GithubCredentials
|
||||
|
||||
|
||||
class GitHubAPI:
|
||||
def __init__(self, credentials: GithubCredentials):
|
||||
self.credentials = credentials
|
||||
|
||||
@staticmethod
|
||||
def _validate_github_url(url: str) -> None:
|
||||
parsed_url = urlparse(url)
|
||||
if parsed_url.netloc != "github.com":
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
|
||||
@staticmethod
|
||||
def _convert_to_api_url(url: str) -> str:
|
||||
"""
|
||||
Converts a standard GitHub URL to the corresponding GitHub API URL.
|
||||
Handles repository URLs, issue URLs, pull request URLs, and more.
|
||||
"""
|
||||
GitHubAPI._validate_github_url(url)
|
||||
parsed_url = urlparse(url)
|
||||
path_parts = parsed_url.path.strip("/").split("/")
|
||||
|
||||
if len(path_parts) >= 2:
|
||||
owner, repo = path_parts[0], path_parts[1]
|
||||
api_base = f"https://api.github.com/repos/{owner}/{repo}"
|
||||
|
||||
if len(path_parts) > 2:
|
||||
additional_path = "/".join(path_parts[2:])
|
||||
api_url = f"{api_base}/{additional_path}"
|
||||
else:
|
||||
# Repository base URL
|
||||
api_url = api_base
|
||||
else:
|
||||
raise ValueError("Invalid GitHub URL format.")
|
||||
|
||||
return api_url
|
||||
|
||||
def _get_headers(self) -> dict:
|
||||
return {
|
||||
"Authorization": self.credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
def get(self, url: str, **kwargs) -> requests.Response:
|
||||
api_url = self._convert_to_api_url(url)
|
||||
headers = self._get_headers()
|
||||
response = requests.get(api_url, headers=headers, **kwargs)
|
||||
response.raise_for_status()
|
||||
return response
|
||||
|
||||
def post(self, url: str, json=None, **kwargs) -> requests.Response:
|
||||
api_url = self._convert_to_api_url(url)
|
||||
headers = self._get_headers()
|
||||
response = requests.post(api_url, headers=headers, json=json, **kwargs)
|
||||
response.raise_for_status()
|
||||
return response
|
||||
|
||||
def delete(self, url: str, json=None, **kwargs) -> requests.Response:
|
||||
api_url = self._convert_to_api_url(url)
|
||||
headers = self._get_headers()
|
||||
response = requests.delete(api_url, headers=headers, json=json, **kwargs)
|
||||
response.raise_for_status()
|
||||
return response
|
||||
@@ -1,11 +1,11 @@
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import requests
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
|
||||
from backend.data.model import SchemaField
|
||||
|
||||
from ._api import GitHubAPI
|
||||
from ._auth import (
|
||||
TEST_CREDENTIALS,
|
||||
TEST_CREDENTIALS_INPUT,
|
||||
@@ -68,31 +68,10 @@ class GithubCommentBlock(Block):
|
||||
def post_comment(
|
||||
credentials: GithubCredentials, issue_url: str, body_text: str
|
||||
) -> tuple[int, str]:
|
||||
|
||||
if is_github_url(issue_url) is False:
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
|
||||
if "/pull/" in issue_url:
|
||||
api_url = (
|
||||
issue_url.replace("github.com", "api.github.com/repos").replace(
|
||||
"/pull/", "/issues/"
|
||||
)
|
||||
+ "/comments"
|
||||
)
|
||||
else:
|
||||
api_url = (
|
||||
issue_url.replace("github.com", "api.github.com/repos") + "/comments"
|
||||
)
|
||||
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
api = GitHubAPI(credentials)
|
||||
data = {"body": body_text}
|
||||
|
||||
response = requests.post(api_url, headers=headers, json=data)
|
||||
response.raise_for_status()
|
||||
|
||||
comments_url = issue_url + "/comments"
|
||||
response = api.post(comments_url, json=data)
|
||||
comment = response.json()
|
||||
return comment["id"], comment["html_url"]
|
||||
|
||||
@@ -166,19 +145,10 @@ class GithubMakeIssueBlock(Block):
|
||||
def create_issue(
|
||||
credentials: GithubCredentials, repo_url: str, title: str, body: str
|
||||
) -> tuple[int, str]:
|
||||
if is_github_url(repo_url) is False:
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
|
||||
api_url = repo_url.replace("github.com", "api.github.com/repos") + "/issues"
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
api = GitHubAPI(credentials)
|
||||
data = {"title": title, "body": body}
|
||||
|
||||
response = requests.post(api_url, headers=headers, json=data)
|
||||
response.raise_for_status()
|
||||
|
||||
issues_url = repo_url + "/issues"
|
||||
response = api.post(issues_url, json=data)
|
||||
issue = response.json()
|
||||
return issue["number"], issue["html_url"]
|
||||
|
||||
@@ -245,25 +215,12 @@ class GithubReadIssueBlock(Block):
|
||||
def read_issue(
|
||||
credentials: GithubCredentials, issue_url: str
|
||||
) -> tuple[str, str, str]:
|
||||
|
||||
if not is_github_url(issue_url):
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
|
||||
api_url = issue_url.replace("github.com", "api.github.com/repos")
|
||||
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = GitHubAPI(credentials)
|
||||
response = api.get(issue_url)
|
||||
data = response.json()
|
||||
title = data.get("title", "No title found")
|
||||
body = data.get("body", "No body content found")
|
||||
user = data.get("user", {}).get("login", "No user found")
|
||||
|
||||
return title, body, user
|
||||
|
||||
def run(
|
||||
@@ -335,24 +292,13 @@ class GithubListIssuesBlock(Block):
|
||||
def list_issues(
|
||||
credentials: GithubCredentials, repo_url: str
|
||||
) -> list[Output.IssueItem]:
|
||||
|
||||
if not is_github_url(repo_url):
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
|
||||
api_url = repo_url.replace("github.com", "api.github.com/repos") + "/issues"
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = GitHubAPI(credentials)
|
||||
issues_url = repo_url + "/issues"
|
||||
response = api.get(issues_url)
|
||||
data = response.json()
|
||||
issues: list[GithubListIssuesBlock.Output.IssueItem] = [
|
||||
{"title": issue["title"], "url": issue["html_url"]} for issue in data
|
||||
]
|
||||
|
||||
return issues
|
||||
|
||||
def run(
|
||||
@@ -406,32 +352,11 @@ class GithubAddLabelBlock(Block):
|
||||
|
||||
@staticmethod
|
||||
def add_label(credentials: GithubCredentials, issue_url: str, label: str) -> str:
|
||||
|
||||
if is_github_url(issue_url) is False:
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
|
||||
# Convert the provided GitHub URL to the API URL
|
||||
if "/pull/" in issue_url:
|
||||
api_url = (
|
||||
issue_url.replace("github.com", "api.github.com/repos").replace(
|
||||
"/pull/", "/issues/"
|
||||
)
|
||||
+ "/labels"
|
||||
)
|
||||
else:
|
||||
api_url = (
|
||||
issue_url.replace("github.com", "api.github.com/repos") + "/labels"
|
||||
)
|
||||
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
api = GitHubAPI(credentials)
|
||||
data = {"labels": [label]}
|
||||
|
||||
response = requests.post(api_url, headers=headers, json=data)
|
||||
labels_url = issue_url + "/labels"
|
||||
response = api.post(labels_url, json=data)
|
||||
response.raise_for_status()
|
||||
|
||||
return "Label added successfully"
|
||||
|
||||
def run(
|
||||
@@ -488,34 +413,10 @@ class GithubRemoveLabelBlock(Block):
|
||||
|
||||
@staticmethod
|
||||
def remove_label(credentials: GithubCredentials, issue_url: str, label: str) -> str:
|
||||
if is_github_url(issue_url) is False:
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
|
||||
# Convert the provided GitHub URL to the API URL
|
||||
if "/pull/" in issue_url:
|
||||
api_url = (
|
||||
issue_url.replace("github.com", "api.github.com/repos").replace(
|
||||
"/pull/", "/issues/"
|
||||
)
|
||||
+ f"/labels/{label}"
|
||||
)
|
||||
else:
|
||||
api_url = (
|
||||
issue_url.replace("github.com", "api.github.com/repos")
|
||||
+ f"/labels/{label}"
|
||||
)
|
||||
|
||||
# Log the constructed API URL for debugging
|
||||
print(f"Constructed API URL: {api_url}")
|
||||
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.delete(api_url, headers=headers)
|
||||
api = GitHubAPI(credentials)
|
||||
label_url = issue_url + f"/labels/{label}"
|
||||
response = api.delete(label_url)
|
||||
response.raise_for_status()
|
||||
|
||||
return "Label removed successfully"
|
||||
|
||||
def run(
|
||||
@@ -578,26 +479,11 @@ class GithubAssignIssueBlock(Block):
|
||||
issue_url: str,
|
||||
assignee: str,
|
||||
) -> str:
|
||||
if is_github_url(issue_url) is False:
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
|
||||
# Extracting repo path and issue number from the issue URL
|
||||
repo_path, issue_number = issue_url.replace("https://github.com/", "").split(
|
||||
"/issues/"
|
||||
)
|
||||
api_url = (
|
||||
f"https://api.github.com/repos/{repo_path}/issues/{issue_number}/assignees"
|
||||
)
|
||||
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
api = GitHubAPI(credentials)
|
||||
assignees_url = issue_url + "/assignees"
|
||||
data = {"assignees": [assignee]}
|
||||
|
||||
response = requests.post(api_url, headers=headers, json=data)
|
||||
response = api.post(assignees_url, json=data)
|
||||
response.raise_for_status()
|
||||
|
||||
return "Issue assigned successfully"
|
||||
|
||||
def run(
|
||||
@@ -660,26 +546,11 @@ class GithubUnassignIssueBlock(Block):
|
||||
issue_url: str,
|
||||
assignee: str,
|
||||
) -> str:
|
||||
if is_github_url(issue_url) is False:
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
|
||||
# Extracting repo path and issue number from the issue URL
|
||||
repo_path, issue_number = issue_url.replace("https://github.com/", "").split(
|
||||
"/issues/"
|
||||
)
|
||||
api_url = (
|
||||
f"https://api.github.com/repos/{repo_path}/issues/{issue_number}/assignees"
|
||||
)
|
||||
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
api = GitHubAPI(credentials)
|
||||
assignees_url = issue_url + "/assignees"
|
||||
data = {"assignees": [assignee]}
|
||||
|
||||
response = requests.delete(api_url, headers=headers, json=data)
|
||||
response = api.delete(assignees_url, json=data)
|
||||
response.raise_for_status()
|
||||
|
||||
return "Issue unassigned successfully"
|
||||
|
||||
def run(
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import requests
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
|
||||
from backend.data.model import SchemaField
|
||||
|
||||
from ._api import GitHubAPI
|
||||
from ._auth import (
|
||||
TEST_CREDENTIALS,
|
||||
TEST_CREDENTIALS_INPUT,
|
||||
@@ -15,10 +13,6 @@ from ._auth import (
|
||||
)
|
||||
|
||||
|
||||
def is_github_url(url: str) -> bool:
|
||||
return urlparse(url).netloc == "github.com"
|
||||
|
||||
|
||||
class GithubListPullRequestsBlock(Block):
|
||||
class Input(BlockSchema):
|
||||
credentials: GithubCredentialsInput = GithubCredentialsField("repo")
|
||||
@@ -70,23 +64,13 @@ class GithubListPullRequestsBlock(Block):
|
||||
|
||||
@staticmethod
|
||||
def list_prs(credentials: GithubCredentials, repo_url: str) -> list[Output.PRItem]:
|
||||
if is_github_url(repo_url) is False:
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
|
||||
api_url = repo_url.replace("github.com", "api.github.com/repos") + "/pulls"
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = GitHubAPI(credentials)
|
||||
pulls_url = repo_url + "/pulls"
|
||||
response = api.get(pulls_url)
|
||||
data = response.json()
|
||||
pull_requests: list[GithubListPullRequestsBlock.Output.PRItem] = [
|
||||
{"title": pr["title"], "url": pr["html_url"]} for pr in data
|
||||
]
|
||||
|
||||
return pull_requests
|
||||
|
||||
def run(
|
||||
@@ -119,7 +103,11 @@ class GithubMakePullRequestBlock(Block):
|
||||
placeholder="Enter the pull request body",
|
||||
)
|
||||
head: str = SchemaField(
|
||||
description="The name of the branch where your changes are implemented. For cross-repository pull requests in the same network, namespace head with a user like this: username:branch.",
|
||||
description=(
|
||||
"The name of the branch where your changes are implemented. "
|
||||
"For cross-repository pull requests in the same network, "
|
||||
"namespace head with a user like this: username:branch."
|
||||
),
|
||||
placeholder="Enter the head branch",
|
||||
)
|
||||
base: str = SchemaField(
|
||||
@@ -171,19 +159,10 @@ class GithubMakePullRequestBlock(Block):
|
||||
head: str,
|
||||
base: str,
|
||||
) -> tuple[int, str]:
|
||||
if is_github_url(repo_url) is False:
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
repo_path = repo_url.replace("https://github.com/", "")
|
||||
api_url = f"https://api.github.com/repos/{repo_path}/pulls"
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
api = GitHubAPI(credentials)
|
||||
pulls_url = repo_url + "/pulls"
|
||||
data = {"title": title, "body": body, "head": head, "base": base}
|
||||
|
||||
response = requests.post(api_url, headers=headers, json=data)
|
||||
response.raise_for_status()
|
||||
|
||||
response = api.post(pulls_url, json=data)
|
||||
pr_data = response.json()
|
||||
return pr_data["number"], pr_data["html_url"]
|
||||
|
||||
@@ -205,13 +184,8 @@ class GithubMakePullRequestBlock(Block):
|
||||
)
|
||||
yield "number", number
|
||||
yield "url", url
|
||||
except requests.exceptions.HTTPError as http_err:
|
||||
if http_err.response.status_code == 422:
|
||||
error_details = http_err.response.json()
|
||||
error_message = error_details.get("message", "Unknown error")
|
||||
else:
|
||||
error_message = str(http_err)
|
||||
raise RuntimeError(f"Failed to create pull request: {error_message}")
|
||||
except Exception as e:
|
||||
yield "error", str(e)
|
||||
|
||||
|
||||
class GithubReadPullRequestBlock(Block):
|
||||
@@ -266,45 +240,21 @@ class GithubReadPullRequestBlock(Block):
|
||||
|
||||
@staticmethod
|
||||
def read_pr(credentials: GithubCredentials, pr_url: str) -> tuple[str, str, str]:
|
||||
if is_github_url(pr_url) is False:
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
|
||||
api_url = pr_url.replace("github.com", "api.github.com/repos").replace(
|
||||
"/pull/", "/issues/"
|
||||
)
|
||||
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = GitHubAPI(credentials)
|
||||
# Adjust the URL to access the issue endpoint for PR metadata
|
||||
issue_url = pr_url.replace("/pull/", "/issues/")
|
||||
response = api.get(issue_url)
|
||||
data = response.json()
|
||||
title = data.get("title", "No title found")
|
||||
body = data.get("body", "No body content found")
|
||||
author = data.get("user", {}).get("login", "No user found")
|
||||
|
||||
return title, body, author
|
||||
|
||||
@staticmethod
|
||||
def read_pr_changes(credentials: GithubCredentials, pr_url: str) -> str:
|
||||
api_url = (
|
||||
pr_url.replace("github.com", "api.github.com/repos").replace(
|
||||
"/pull/", "/pulls/"
|
||||
)
|
||||
+ "/files"
|
||||
)
|
||||
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = GitHubAPI(credentials)
|
||||
files_url = pr_url + "/files"
|
||||
response = api.get(files_url)
|
||||
files = response.json()
|
||||
changes = []
|
||||
for file in files:
|
||||
@@ -312,7 +262,6 @@ class GithubReadPullRequestBlock(Block):
|
||||
patch = file.get("patch")
|
||||
if filename and patch:
|
||||
changes.append(f"File: {filename}\n{patch}")
|
||||
|
||||
return "\n\n".join(changes)
|
||||
|
||||
def run(
|
||||
@@ -381,26 +330,10 @@ class GithubAssignPRReviewerBlock(Block):
|
||||
def assign_reviewer(
|
||||
credentials: GithubCredentials, pr_url: str, reviewer: str
|
||||
) -> str:
|
||||
if is_github_url(pr_url) is False:
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
|
||||
# Convert the PR URL to the appropriate API endpoint
|
||||
api_url = (
|
||||
pr_url.replace("github.com", "api.github.com/repos").replace(
|
||||
"/pull/", "/pulls/"
|
||||
)
|
||||
+ "/requested_reviewers"
|
||||
)
|
||||
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
api = GitHubAPI(credentials)
|
||||
reviewers_url = pr_url + "/requested_reviewers"
|
||||
data = {"reviewers": [reviewer]}
|
||||
|
||||
response = requests.post(api_url, headers=headers, json=data)
|
||||
response.raise_for_status()
|
||||
|
||||
api.post(reviewers_url, json=data)
|
||||
return "Reviewer assigned successfully"
|
||||
|
||||
def run(
|
||||
@@ -417,17 +350,8 @@ class GithubAssignPRReviewerBlock(Block):
|
||||
input_data.reviewer,
|
||||
)
|
||||
yield "status", status
|
||||
except requests.exceptions.HTTPError as http_err:
|
||||
if http_err.response.status_code == 422:
|
||||
error_msg = (
|
||||
"Failed to assign reviewer: "
|
||||
f"The reviewer '{input_data.reviewer}' may not have permission "
|
||||
"or the pull request is not in a valid state. "
|
||||
f"Detailed error: {http_err.response.text}"
|
||||
)
|
||||
else:
|
||||
error_msg = f"HTTP error: {http_err} - {http_err.response.text}"
|
||||
raise RuntimeError(error_msg)
|
||||
except Exception as e:
|
||||
yield "error", str(e)
|
||||
|
||||
|
||||
class GithubUnassignPRReviewerBlock(Block):
|
||||
@@ -473,24 +397,10 @@ class GithubUnassignPRReviewerBlock(Block):
|
||||
def unassign_reviewer(
|
||||
credentials: GithubCredentials, pr_url: str, reviewer: str
|
||||
) -> str:
|
||||
if is_github_url(pr_url) is False:
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
|
||||
api_url = (
|
||||
pr_url.replace("github.com", "api.github.com/repos").replace(
|
||||
"/pull/", "/pulls/"
|
||||
)
|
||||
+ "/requested_reviewers"
|
||||
)
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
api = GitHubAPI(credentials)
|
||||
reviewers_url = pr_url + "/requested_reviewers"
|
||||
data = {"reviewers": [reviewer]}
|
||||
|
||||
response = requests.delete(api_url, headers=headers, json=data)
|
||||
response.raise_for_status()
|
||||
|
||||
api.delete(reviewers_url, json=data)
|
||||
return "Reviewer unassigned successfully"
|
||||
|
||||
def run(
|
||||
@@ -500,12 +410,15 @@ class GithubUnassignPRReviewerBlock(Block):
|
||||
credentials: GithubCredentials,
|
||||
**kwargs,
|
||||
) -> BlockOutput:
|
||||
status = self.unassign_reviewer(
|
||||
credentials,
|
||||
input_data.pr_url,
|
||||
input_data.reviewer,
|
||||
)
|
||||
yield "status", status
|
||||
try:
|
||||
status = self.unassign_reviewer(
|
||||
credentials,
|
||||
input_data.pr_url,
|
||||
input_data.reviewer,
|
||||
)
|
||||
yield "status", status
|
||||
except Exception as e:
|
||||
yield "error", str(e)
|
||||
|
||||
|
||||
class GithubListPRReviewersBlock(Block):
|
||||
@@ -564,29 +477,14 @@ class GithubListPRReviewersBlock(Block):
|
||||
def list_reviewers(
|
||||
credentials: GithubCredentials, pr_url: str
|
||||
) -> list[Output.ReviewerItem]:
|
||||
if is_github_url(pr_url) is False:
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
|
||||
api_url = (
|
||||
pr_url.replace("github.com", "api.github.com/repos").replace(
|
||||
"/pull/", "/pulls/"
|
||||
)
|
||||
+ "/requested_reviewers"
|
||||
)
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = GitHubAPI(credentials)
|
||||
reviewers_url = pr_url + "/requested_reviewers"
|
||||
response = api.get(reviewers_url)
|
||||
data = response.json()
|
||||
reviewers: list[GithubListPRReviewersBlock.Output.ReviewerItem] = [
|
||||
{"username": reviewer["login"], "url": reviewer["html_url"]}
|
||||
for reviewer in data.get("users", [])
|
||||
]
|
||||
|
||||
return reviewers
|
||||
|
||||
def run(
|
||||
|
||||
@@ -1,12 +1,11 @@
|
||||
import base64
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import requests
|
||||
from typing_extensions import TypedDict
|
||||
|
||||
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
|
||||
from backend.data.model import SchemaField
|
||||
|
||||
from ._api import GitHubAPI
|
||||
from ._auth import (
|
||||
TEST_CREDENTIALS,
|
||||
TEST_CREDENTIALS_INPUT,
|
||||
@@ -16,10 +15,6 @@ from ._auth import (
|
||||
)
|
||||
|
||||
|
||||
def is_github_url(url: str) -> bool:
|
||||
return urlparse(url).netloc == "github.com"
|
||||
|
||||
|
||||
class GithubListTagsBlock(Block):
|
||||
class Input(BlockSchema):
|
||||
credentials: GithubCredentialsInput = GithubCredentialsField("repo")
|
||||
@@ -73,20 +68,11 @@ class GithubListTagsBlock(Block):
|
||||
def list_tags(
|
||||
credentials: GithubCredentials, repo_url: str
|
||||
) -> list[Output.TagItem]:
|
||||
if is_github_url(repo_url) is False:
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
|
||||
repo_path = repo_url.replace("https://github.com/", "")
|
||||
api_url = f"https://api.github.com/repos/{repo_path}/tags"
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = GitHubAPI(credentials)
|
||||
tags_url = repo_url + "/tags"
|
||||
response = api.get(tags_url)
|
||||
data = response.json()
|
||||
repo_path = repo_url.replace("https://github.com/", "")
|
||||
tags: list[GithubListTagsBlock.Output.TagItem] = [
|
||||
{
|
||||
"name": tag["name"],
|
||||
@@ -94,7 +80,6 @@ class GithubListTagsBlock(Block):
|
||||
}
|
||||
for tag in data
|
||||
]
|
||||
|
||||
return tags
|
||||
|
||||
def run(
|
||||
@@ -165,23 +150,18 @@ class GithubListBranchesBlock(Block):
|
||||
def list_branches(
|
||||
credentials: GithubCredentials, repo_url: str
|
||||
) -> list[Output.BranchItem]:
|
||||
if is_github_url(repo_url) is False:
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
|
||||
api_url = repo_url.replace("github.com", "api.github.com/repos") + "/branches"
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = GitHubAPI(credentials)
|
||||
branches_url = repo_url + "/branches"
|
||||
response = api.get(branches_url)
|
||||
data = response.json()
|
||||
repo_path = repo_url.replace("https://github.com/", "")
|
||||
branches: list[GithubListBranchesBlock.Output.BranchItem] = [
|
||||
{"name": branch["name"], "url": branch["commit"]["url"]} for branch in data
|
||||
{
|
||||
"name": branch["name"],
|
||||
"url": f"https://github.com/{repo_path}/tree/{branch['name']}",
|
||||
}
|
||||
for branch in data
|
||||
]
|
||||
|
||||
return branches
|
||||
|
||||
def run(
|
||||
@@ -257,8 +237,8 @@ class GithubListDiscussionsBlock(Block):
|
||||
def list_discussions(
|
||||
credentials: GithubCredentials, repo_url: str, num_discussions: int
|
||||
) -> list[Output.DiscussionItem]:
|
||||
if is_github_url(repo_url) is False:
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
api = GitHubAPI(credentials)
|
||||
# GitHub GraphQL API endpoint is different; we'll use api.post with custom URL
|
||||
repo_path = repo_url.replace("https://github.com/", "")
|
||||
owner, repo = repo_path.split("/")
|
||||
query = """
|
||||
@@ -274,24 +254,15 @@ class GithubListDiscussionsBlock(Block):
|
||||
}
|
||||
"""
|
||||
variables = {"owner": owner, "repo": repo, "num": num_discussions}
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.post(
|
||||
response = api.post(
|
||||
"https://api.github.com/graphql",
|
||||
json={"query": query, "variables": variables},
|
||||
headers=headers,
|
||||
)
|
||||
response.raise_for_status()
|
||||
|
||||
data = response.json()
|
||||
discussions: list[GithubListDiscussionsBlock.Output.DiscussionItem] = [
|
||||
{"title": discussion["title"], "url": discussion["url"]}
|
||||
for discussion in data["data"]["repository"]["discussions"]["nodes"]
|
||||
]
|
||||
|
||||
return discussions
|
||||
|
||||
def run(
|
||||
@@ -361,23 +332,13 @@ class GithubListReleasesBlock(Block):
|
||||
def list_releases(
|
||||
credentials: GithubCredentials, repo_url: str
|
||||
) -> list[Output.ReleaseItem]:
|
||||
if is_github_url(repo_url) is False:
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
repo_path = repo_url.replace("https://github.com/", "")
|
||||
api_url = f"https://api.github.com/repos/{repo_path}/releases"
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = GitHubAPI(credentials)
|
||||
releases_url = repo_url + "/releases"
|
||||
response = api.get(releases_url)
|
||||
data = response.json()
|
||||
releases: list[GithubListReleasesBlock.Output.ReleaseItem] = [
|
||||
{"name": release["name"], "url": release["html_url"]} for release in data
|
||||
]
|
||||
|
||||
return releases
|
||||
|
||||
def run(
|
||||
@@ -447,18 +408,9 @@ class GithubReadFileBlock(Block):
|
||||
def read_file(
|
||||
credentials: GithubCredentials, repo_url: str, file_path: str, branch: str
|
||||
) -> tuple[str, int]:
|
||||
if is_github_url(repo_url) is False:
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
repo_path = repo_url.replace("https://github.com/", "")
|
||||
api_url = f"https://api.github.com/repos/{repo_path}/contents/{file_path}?ref={branch}"
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = GitHubAPI(credentials)
|
||||
content_url = repo_url + f"/contents/{file_path}?ref={branch}"
|
||||
response = api.get(content_url)
|
||||
content = response.json()
|
||||
|
||||
if isinstance(content, list):
|
||||
@@ -566,48 +518,33 @@ class GithubReadFolderBlock(Block):
|
||||
def read_folder(
|
||||
credentials: GithubCredentials, repo_url: str, folder_path: str, branch: str
|
||||
) -> tuple[list[Output.FileEntry], list[Output.DirEntry]]:
|
||||
if is_github_url(repo_url) is False:
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
repo_path = repo_url.replace("https://github.com/", "")
|
||||
api_url = f"https://api.github.com/repos/{repo_path}/contents/{folder_path}?ref={branch}"
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = GitHubAPI(credentials)
|
||||
contents_url = repo_url + f"/contents/{folder_path}?ref={branch}"
|
||||
response = api.get(contents_url)
|
||||
content = response.json()
|
||||
|
||||
if isinstance(content, list):
|
||||
# Multiple entries of different types exist at this path
|
||||
if not (dir := next((d for d in content if d["type"] == "dir"), None)):
|
||||
raise TypeError("Not a folder")
|
||||
content = dir
|
||||
|
||||
if content["type"] != "dir":
|
||||
if not isinstance(content, list):
|
||||
raise TypeError("Not a folder")
|
||||
|
||||
return (
|
||||
[
|
||||
GithubReadFolderBlock.Output.FileEntry(
|
||||
name=entry["name"],
|
||||
path=entry["path"],
|
||||
size=entry["size"],
|
||||
)
|
||||
for entry in content["entries"]
|
||||
if entry["type"] == "file"
|
||||
],
|
||||
[
|
||||
GithubReadFolderBlock.Output.DirEntry(
|
||||
name=entry["name"],
|
||||
path=entry["path"],
|
||||
)
|
||||
for entry in content["entries"]
|
||||
if entry["type"] == "dir"
|
||||
],
|
||||
)
|
||||
files = [
|
||||
GithubReadFolderBlock.Output.FileEntry(
|
||||
name=entry["name"],
|
||||
path=entry["path"],
|
||||
size=entry["size"],
|
||||
)
|
||||
for entry in content
|
||||
if entry["type"] == "file"
|
||||
]
|
||||
dirs = [
|
||||
GithubReadFolderBlock.Output.DirEntry(
|
||||
name=entry["name"],
|
||||
path=entry["path"],
|
||||
)
|
||||
for entry in content
|
||||
if entry["type"] == "dir"
|
||||
]
|
||||
|
||||
return files, dirs
|
||||
|
||||
def run(
|
||||
self,
|
||||
@@ -675,28 +612,16 @@ class GithubMakeBranchBlock(Block):
|
||||
new_branch: str,
|
||||
source_branch: str,
|
||||
) -> str:
|
||||
if is_github_url(repo_url) is False:
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
repo_path = repo_url.replace("https://github.com/", "")
|
||||
ref_api_url = (
|
||||
f"https://api.github.com/repos/{repo_path}/git/refs/heads/{source_branch}"
|
||||
)
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.get(ref_api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = GitHubAPI(credentials)
|
||||
# Get the SHA of the source branch
|
||||
ref_url = repo_url + f"/git/refs/heads/{source_branch}"
|
||||
response = api.get(ref_url)
|
||||
sha = response.json()["object"]["sha"]
|
||||
|
||||
create_branch_api_url = f"https://api.github.com/repos/{repo_path}/git/refs"
|
||||
# Create the new branch
|
||||
create_ref_url = repo_url + "/git/refs"
|
||||
data = {"ref": f"refs/heads/{new_branch}", "sha": sha}
|
||||
|
||||
response = requests.post(create_branch_api_url, headers=headers, json=data)
|
||||
response.raise_for_status()
|
||||
|
||||
response = api.post(create_ref_url, json=data)
|
||||
return "Branch created successfully"
|
||||
|
||||
def run(
|
||||
@@ -756,18 +681,9 @@ class GithubDeleteBranchBlock(Block):
|
||||
def delete_branch(
|
||||
credentials: GithubCredentials, repo_url: str, branch: str
|
||||
) -> str:
|
||||
if is_github_url(repo_url) is False:
|
||||
raise ValueError("The input URL must be a valid GitHub URL.")
|
||||
repo_path = repo_url.replace("https://github.com/", "")
|
||||
api_url = f"https://api.github.com/repos/{repo_path}/git/refs/heads/{branch}"
|
||||
headers = {
|
||||
"Authorization": credentials.bearer(),
|
||||
"Accept": "application/vnd.github.v3+json",
|
||||
}
|
||||
|
||||
response = requests.delete(api_url, headers=headers)
|
||||
response.raise_for_status()
|
||||
|
||||
api = GitHubAPI(credentials)
|
||||
ref_url = repo_url + f"/git/refs/heads/{branch}"
|
||||
api.delete(ref_url)
|
||||
return "Branch deleted successfully"
|
||||
|
||||
def run(
|
||||
|
||||
@@ -22,7 +22,7 @@ class GetRequest:
|
||||
return response.json() if json else response.text
|
||||
|
||||
@classmethod
|
||||
def validate_url(self, url: str) -> str:
|
||||
def validate_url(cls, url: str) -> str:
|
||||
"""
|
||||
To avoid SSRF attacks, the URL should not be a private IP address
|
||||
unless it is whitelisted in TRUST_ENDPOINTS_FOR_REQUESTS config.
|
||||
|
||||
Reference in New Issue
Block a user