mirror of
https://github.com/Pythagora-io/gpt-pilot.git
synced 2026-01-09 21:27:53 -05:00
add git versioning of projects
This commit is contained in:
140
core/agents/git.py
Normal file
140
core/agents/git.py
Normal file
@@ -0,0 +1,140 @@
|
||||
import os
|
||||
|
||||
from core.agents.convo import AgentConvo
|
||||
from core.ui.base import pythagora_source
|
||||
|
||||
|
||||
class GitMixin:
|
||||
"""
|
||||
Mixin class for git commands
|
||||
"""
|
||||
|
||||
async def check_git_installed(self) -> bool:
|
||||
"""Check if git is installed on the system."""
|
||||
status_code, _, _ = await self.process_manager.run_command("git --version", show_output=False)
|
||||
git_available = status_code == 0
|
||||
self.state_manager.git_available = git_available
|
||||
return git_available
|
||||
|
||||
async def is_git_initialized(self) -> bool:
|
||||
"""Check if git is initialized in the workspace."""
|
||||
workspace_path = self.state_manager.get_full_project_root()
|
||||
|
||||
status_code, _, _ = await self.process_manager.run_command(
|
||||
"git rev-parse --git-dir --is-inside-git-dir",
|
||||
cwd=workspace_path,
|
||||
show_output=False,
|
||||
)
|
||||
# Will return status code 0 only if .git exists in the current directory
|
||||
git_used = status_code == 0 and os.path.exists(os.path.join(workspace_path, ".git"))
|
||||
self.state_manager.git_used = git_used
|
||||
return git_used
|
||||
|
||||
async def init_git_if_needed(self) -> bool:
|
||||
"""
|
||||
Initialize git repository if it hasn't been initialized yet.
|
||||
Returns True if initialization was needed and successful.
|
||||
"""
|
||||
|
||||
workspace_path = self.state_manager.get_full_project_root()
|
||||
if await self.is_git_initialized():
|
||||
return False
|
||||
|
||||
answer = await self.ui.ask_question(
|
||||
"Git is not initialized for this project. Do you want to initialize it now?",
|
||||
buttons={"yes": "Yes", "no": "No"},
|
||||
default="yes",
|
||||
buttons_only=True,
|
||||
source=pythagora_source,
|
||||
)
|
||||
|
||||
if answer.button == "no":
|
||||
return False
|
||||
else:
|
||||
status_code, _, stderr = await self.process_manager.run_command("git init", cwd=workspace_path)
|
||||
if status_code != 0:
|
||||
raise RuntimeError(f"Failed to initialize git repository: {stderr}")
|
||||
|
||||
# Create initial commit if there are files
|
||||
if self.current_state.files:
|
||||
# Stage all files
|
||||
status_code, _, stderr = await self.process_manager.run_command(
|
||||
"git add .",
|
||||
cwd=workspace_path,
|
||||
)
|
||||
if status_code != 0:
|
||||
raise RuntimeError(f"Failed to stage files: {stderr}")
|
||||
|
||||
# Create initial commit
|
||||
status_code, _, stderr = await self.process_manager.run_command(
|
||||
'git commit -m "initial commit"', cwd=workspace_path
|
||||
)
|
||||
if status_code != 0:
|
||||
raise RuntimeError(f"Failed to create initial commit: {stderr}")
|
||||
|
||||
self.state_manager.git_used = True
|
||||
return True
|
||||
|
||||
async def git_commit(self) -> None:
|
||||
"""
|
||||
Create a git commit with the specified message.
|
||||
Raises RuntimeError if the commit fails.
|
||||
"""
|
||||
workspace_path = self.state_manager.get_full_project_root()
|
||||
|
||||
answer = await self.ui.ask_question(
|
||||
"Do you want to create new git commit?",
|
||||
buttons={"yes": "Yes", "no": "No"},
|
||||
default="yes",
|
||||
buttons_only=True,
|
||||
source=pythagora_source,
|
||||
)
|
||||
|
||||
if answer.button == "no":
|
||||
return
|
||||
|
||||
# Stage all changes
|
||||
status_code, _, stderr = await self.process_manager.run_command("git add .", cwd=workspace_path)
|
||||
if status_code != 0:
|
||||
raise RuntimeError(f"Failed to stage changes: {stderr}")
|
||||
|
||||
# Get git diff
|
||||
status_code, git_diff, stderr = await self.process_manager.run_command(
|
||||
"git diff --cached || git diff",
|
||||
cwd=workspace_path,
|
||||
show_output=False,
|
||||
)
|
||||
if status_code != 0:
|
||||
raise RuntimeError(f"Failed to create initial commit: {stderr}")
|
||||
|
||||
llm = self.get_llm()
|
||||
convo = AgentConvo(self).template(
|
||||
"commit",
|
||||
git_diff=git_diff,
|
||||
)
|
||||
commit_message: str = await llm(convo)
|
||||
|
||||
answer = await self.ui.ask_question(
|
||||
f"Do you accept this 'git commit' message? Here is suggested message: '{commit_message}'",
|
||||
buttons={"yes": "Yes", "edit": "Edit", "no": "No, I don't want to commit changes."},
|
||||
default="yes",
|
||||
buttons_only=True,
|
||||
source=pythagora_source,
|
||||
)
|
||||
|
||||
if answer.button == "no":
|
||||
return
|
||||
elif answer.button == "edit":
|
||||
user_message = await self.ui.ask_question(
|
||||
"Please enter the commit message",
|
||||
source=pythagora_source,
|
||||
initial_text=commit_message,
|
||||
)
|
||||
commit_message = user_message.text
|
||||
|
||||
# Create commit
|
||||
status_code, _, stderr = await self.process_manager.run_command(
|
||||
f'git commit -m "{commit_message}"', cwd=workspace_path
|
||||
)
|
||||
if status_code != 0:
|
||||
raise RuntimeError(f"Failed to create commit: {stderr}")
|
||||
@@ -9,6 +9,7 @@ from core.agents.developer import Developer
|
||||
from core.agents.error_handler import ErrorHandler
|
||||
from core.agents.executor import Executor
|
||||
from core.agents.external_docs import ExternalDocumentation
|
||||
from core.agents.git import GitMixin
|
||||
from core.agents.human_input import HumanInput
|
||||
from core.agents.importer import Importer
|
||||
from core.agents.legacy_handler import LegacyHandler
|
||||
@@ -27,7 +28,7 @@ from core.ui.base import ProjectStage
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
class Orchestrator(BaseAgent):
|
||||
class Orchestrator(BaseAgent, GitMixin):
|
||||
"""
|
||||
Main agent that controls the flow of the process.
|
||||
|
||||
@@ -52,6 +53,8 @@ class Orchestrator(BaseAgent):
|
||||
self.executor = Executor(self.state_manager, self.ui)
|
||||
self.process_manager = self.executor.process_manager
|
||||
# self.chat = Chat() TODO
|
||||
if await self.check_git_installed():
|
||||
await self.init_git_if_needed()
|
||||
|
||||
await self.init_ui()
|
||||
await self.offline_changes_check()
|
||||
@@ -244,7 +247,7 @@ class Orchestrator(BaseAgent):
|
||||
return TechnicalWriter(self.state_manager, self.ui)
|
||||
elif current_task_status in [TaskStatus.DOCUMENTED, TaskStatus.SKIPPED]:
|
||||
# Task is fully done or skipped, call TaskCompleter to mark it as completed
|
||||
return TaskCompleter(self.state_manager, self.ui)
|
||||
return TaskCompleter(self.state_manager, self.ui, process_manager=self.process_manager)
|
||||
|
||||
if not state.steps and not state.iterations:
|
||||
# Ask the Developer to break down current task into actionable steps
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
from core.agents.base import BaseAgent
|
||||
from core.agents.git import GitMixin
|
||||
from core.agents.response import AgentResponse
|
||||
from core.log import get_logger
|
||||
from core.telemetry import telemetry
|
||||
@@ -6,11 +7,14 @@ from core.telemetry import telemetry
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
class TaskCompleter(BaseAgent):
|
||||
class TaskCompleter(BaseAgent, GitMixin):
|
||||
agent_type = "pythagora"
|
||||
display_name = "Pythagora"
|
||||
|
||||
async def run(self) -> AgentResponse:
|
||||
if self.state_manager.git_available and self.state_manager.git_used:
|
||||
await self.git_commit()
|
||||
|
||||
current_task_index1 = self.current_state.tasks.index(self.current_state.current_task) + 1
|
||||
self.next_state.action = f"Task #{current_task_index1} complete"
|
||||
self.next_state.complete_task()
|
||||
|
||||
@@ -225,6 +225,7 @@ class ProcessManager:
|
||||
cwd: str = ".",
|
||||
env: Optional[dict[str, str]] = None,
|
||||
timeout: float = MAX_COMMAND_TIMEOUT,
|
||||
show_output: Optional[bool] = True,
|
||||
) -> tuple[Optional[int], str, str]:
|
||||
"""
|
||||
Run command and wait for it to finish.
|
||||
@@ -236,6 +237,7 @@ class ProcessManager:
|
||||
:param cwd: Working directory.
|
||||
:param env: Environment variables.
|
||||
:param timeout: Timeout in seconds.
|
||||
:param show_output: Show output in the ui.
|
||||
:return: Tuple of (status code, stdout, stderr).
|
||||
"""
|
||||
timeout = min(timeout, MAX_COMMAND_TIMEOUT)
|
||||
@@ -245,7 +247,7 @@ class ProcessManager:
|
||||
t0 = time.time()
|
||||
while process.is_running and (time.time() - t0) < timeout:
|
||||
out, err = await process.read_output(BUSY_WAIT_INTERVAL)
|
||||
if self.output_handler and (out or err):
|
||||
if self.output_handler and (out or err) and show_output:
|
||||
await self.output_handler(out, err)
|
||||
|
||||
if process.is_running:
|
||||
@@ -256,7 +258,7 @@ class ProcessManager:
|
||||
await process.wait()
|
||||
|
||||
out, err = await process.read_output()
|
||||
if self.output_handler and (out or err):
|
||||
if self.output_handler and (out or err) and show_output:
|
||||
await self.output_handler(out, err)
|
||||
|
||||
if terminated:
|
||||
|
||||
7
core/prompts/pythagora/commit.prompt
Normal file
7
core/prompts/pythagora/commit.prompt
Normal file
@@ -0,0 +1,7 @@
|
||||
You are working on an app called "{{ state.branch.project.name }}" and you need to generate commit message for next "git commit" command.
|
||||
|
||||
Here are the changes that were made from last commit:
|
||||
{{ git_diff }}
|
||||
|
||||
Respond ONLY with the commit message that you would use for the next "git commit" command, nothing else. Do not use quotes, backticks or anything else, just plain text.
|
||||
Commit message should be short and descriptive of the changes made since last commit.
|
||||
@@ -49,6 +49,8 @@ class StateManager:
|
||||
self.next_state = None
|
||||
self.current_session = None
|
||||
self.blockDb = False
|
||||
self.git_available = False
|
||||
self.git_used = False
|
||||
|
||||
@asynccontextmanager
|
||||
async def db_blocker(self):
|
||||
|
||||
Reference in New Issue
Block a user