Merge pull request #687 from Pythagora-io/development

Development
This commit is contained in:
LeonOstrez
2024-02-26 22:21:35 -08:00
committed by GitHub
21 changed files with 286 additions and 87 deletions

View File

@@ -13,7 +13,7 @@ GPT Pilot is the core technology for the [VS Code extension](https://bit.ly/3IeZ
---
📫 If you would like to get updates on future releases or just get in touch, you [can add your email here](http://eepurl.com/iD6Mpo). 📬
📫 If you would like to get updates on future releases or just get in touch, join our [Discord server](https://discord.gg/HaqXugmxr9) or you [can add your email here](http://eepurl.com/iD6Mpo). 📬
---
@@ -40,13 +40,7 @@ GPT Pilot aims to research how much GPT-4 can be utilized to generate fully work
**The main idea is that AI can write most of the code for an app (maybe 95%), but for the rest, 5%, a developer is and will be needed until we get full AGI**.
I've broken down the idea behind GPT Pilot and how it works in the following blog posts:
**[[Part 1/3] High-level concepts + GPT Pilot workflow until the coding part](https://blog.pythagora.ai/2023/08/23/430/)**
**_[[Part 2/3] GPT Pilot coding workflow](https://blog.pythagora.ai/2023/09/04/gpt-pilot-coding-workflow-part-2-3/)_**
**_[Part 3/3] Other important concepts and future plans (COMING UP)_**
If you are interested in our learnings during this project, you can check [our latest blog posts](https://blog.pythagora.ai/2024/02/19/gpt-pilot-what-did-we-learn-in-6-months-of-working-on-a-codegen-pair-programmer/).
---
@@ -109,13 +103,13 @@ This will start two containers, one being a new image built by the `Dockerfile`
# 🧑‍💻️ CLI arguments
## `app_type` and `name`
If not provided, the ProductOwner will ask for these values:
`app_type` is used as a hint to the LLM as to what kind of architecture, language options and conventions would apply. If not provided, `prompts.prompts.ask_for_app_type()` will ask for it.
See `const.common.APP_TYPES`: 'Web App', 'Script', 'Mobile App', 'Chrome Extension'
## `--get-created-apps-with-steps`
Lists all existing apps.
```bash
python main.py --get-created-apps-with-steps
```
## `app_id` and `workspace`
Continue working on an existing app using **`app_id`**
@@ -132,22 +126,8 @@ python main.py workspace=<PATH_TO_PROJECT_WORKSPACE>
Each user can have their own workspace path for each App.
## `user_id`, `email`, and `password`
These values will be saved to the User table in the DB.
```bash
python main.py user_id=me_at_work
```
If not specified, `user_id` defaults to the OS username but can be provided explicitly if your OS username differs from your GitHub or work username. This value is used to load the `App` config when the `workspace` arg is provided.
If not specified `email` will be parsed from `~/.gitconfig` if the file exists.
See also [What's the purpose of arguments.password / User.password?](https://github.com/Pythagora-io/gpt-pilot/discussions/55)
## `step`
Continue working on an existing app from a specific **`step`** (eg: `user_tasks`)
Continue working on an existing app from a specific **`step`** (eg: `development_planning`)
```bash
python main.py app_id=<ID_OF_THE_APP> step=<STEP_FROM_CONST_COMMON>
```
@@ -179,14 +159,6 @@ python main.py theme=dark
- Dark mode.
![屏幕截图 2023-10-15 104120](https://github.com/Pythagora-io/gpt-pilot/assets/138990495/942cd1c9-b774-498e-b72a-677b01be1ac3)
## `delete_unrelated_steps`
## `update_files_before_start`
# 🔎 Examples
### Backend system for billing, admin, and user management
- 💬 [Full initial prompt + additional features prompts](https://github.com/Pythagora-io/credit-based-backend-gpt-pilot-example/tree/main/prompts)
@@ -232,15 +204,16 @@ Here are the steps GPT Pilot takes to create an app:
![GPT Pilot workflow](https://github.com/Pythagora-io/gpt-pilot/assets/10895136/d89ba1d4-1208-4b7f-b3d4-76e3ccea584e)
1. You enter the app name and the description.
2. **Product Owner agent** asks a couple of questions to understand the requirements better.
3. **Product Owner agent** writes user stories and asks you if they are all correct (this helps it create code later on).
4. **Architect agent** writes up technologies that will be used for the app.
5. **DevOps agent** checks if all technologies are installed on the machine and installs them if not.
6. **Tech Lead agent** writes up development tasks that the Developer must implement. This is an important part because, for each step, the Tech Lead needs to specify how the user (real-world developer) can review if the task is done (e.g. open localhost:3000 and do something).
7. **Developer agent** takes each task and writes up what needs to be done to implement it. The description is in human-readable form.
8. Finally, **Code Monkey agent** takes the Developer's description and the existing file and implements the changes. We realized this works much better than giving it to the Developer right away to implement changes.
For more details on the roles of agents employed by GPT Pilot, please take a look at [AGENTS.md](https://github.com/Pythagora-io/gpt-pilot/blob/main/pilot/helpers/agents/AGENTS.md)
2. **Product Owner agent** like in real life, does nothing. :)
3. **Specification Writer agent** asks a couple of questions to understand the requirements better if project description is not good enough.
4. **Architect agent** writes up technologies that will be used for the app and checks if all technologies are installed on the machine and installs them if not.
5. **Tech Lead agent** writes up development tasks that the Developer must implement.
6. **Developer agent** takes each task and writes up what needs to be done to implement it. The description is in human-readable form.
7. **Code Monkey agent** takes the Developer's description and the existing file and implements the changes.
8. **Reviewer agent** reviews every step of the task and if something is done wrong Reviewer sends it back to Code Monkey.
9. **Troubleshooter agent** helps you to give good feedback to GPT Pilot when something is wrong.
10. **Debugger agent** hate to see him, but he is your best friend when things go south.
11. **Technical Writer agent** writes documentation for the project.
![GPT Pilot Coding Workflow](https://github.com/Pythagora-io/gpt-pilot/assets/10895136/53ea246c-cefe-401c-8ba0-8e4dd49c987b)

View File

@@ -14,6 +14,8 @@ MESSAGE_TYPE = {
'openFile': 'openFile', # Open a file in extension
'loadingFinished': 'loadingFinished', # Marks end of loading project
'loopTrigger': 'loopTrigger', # Trigger loop feedback popup in extension
'progress': 'progress', # Progress bar for extension only
'projectStats': 'projectStats', # Project stats for extension only
}
LOCAL_IGNORE_MESSAGE_TYPES = [
@@ -27,4 +29,6 @@ LOCAL_IGNORE_MESSAGE_TYPES = [
'openFile',
'loadingFinished',
'loopTrigger',
'progress',
'projectStats',
]

View File

@@ -58,22 +58,83 @@ def get_created_apps_with_steps():
for app in apps:
app['id'] = str(app['id'])
app['steps'] = [step for step in STEPS[:STEPS.index(app['status']) + 1]] if app['status'] is not None else []
app['development_steps'] = get_all_app_development_steps(app['id'])
# TODO this is a quick way to remove the unnecessary fields from the response
app['development_steps'] = [{k: v for k, v in dev_step.items() if k in {'id', 'created_at'}} for dev_step in
app['development_steps']]
app['development_steps'] = get_all_app_development_steps(app['id'], loading_steps_only=True)
task_counter = 1
troubleshooting_counter = 1
feature_counter = 1
feature_end_counter = 1
new_development_steps = []
for dev_step in app['development_steps']:
# Filter out unwanted keys first
filtered_step = {k: v for k, v in dev_step.items() if k in {'id', 'prompt_path'}}
if 'breakdown' in filtered_step['prompt_path']:
filtered_step['name'] = f"Task {task_counter}"
task_counter += 1
# Reset troubleshooting counter on finding 'breakdown'
troubleshooting_counter = 1
elif 'iteration' in filtered_step['prompt_path']:
filtered_step['name'] = f"Troubleshooting {troubleshooting_counter}"
troubleshooting_counter += 1
elif 'feature_plan' in filtered_step['prompt_path']:
filtered_step['name'] = f"Feature {feature_counter}"
feature_counter += 1
# Reset task and troubleshooting counters on finding 'feature_plan'
task_counter = 1
troubleshooting_counter = 1
elif 'feature_summary' in filtered_step['prompt_path']:
filtered_step['name'] = f"Feature {feature_end_counter} end"
feature_end_counter += 1
# Update the dev_step in the list
new_development_steps.append(filtered_step)
last_step = get_last_development_step(app['id'])
if last_step:
new_development_steps.append({
'id': last_step['id'],
'prompt_path': last_step['prompt_path'],
'name': 'Latest Step',
})
app['development_steps'] = new_development_steps
return apps
def get_all_app_development_steps(app_id, last_step=None):
def get_all_app_development_steps(app_id, last_step=None, loading_steps_only=False):
query = DevelopmentSteps.select().where(DevelopmentSteps.app == app_id)
if last_step is not None:
query = query.where(DevelopmentSteps.id <= last_step)
if loading_steps_only:
query = query.where((DevelopmentSteps.prompt_path.contains('breakdown')) |
# (DevelopmentSteps.prompt_path.contains('parse_task')) | Not needed for extension users but we can load this steps if needed
(DevelopmentSteps.prompt_path.contains('iteration')) |
# (DevelopmentSteps.prompt_path.contains('create_readme')) | Not needed for extension users but we can load this steps if needed
(DevelopmentSteps.prompt_path.contains('feature_plan')) |
(DevelopmentSteps.prompt_path.contains('feature_summary')))
return [model_to_dict(dev_step) for dev_step in query]
def get_last_development_step(app_id, last_step=None):
last_dev_step_query = DevelopmentSteps.select().where(DevelopmentSteps.app == app_id)
if last_step is not None:
last_dev_step_query = last_dev_step_query.where(DevelopmentSteps.id <= last_step)
# Order by ID in descending order to get the last step and fetch the first result
last_dev_step = last_dev_step_query.order_by(DevelopmentSteps.id.desc()).first()
# If a step is found, convert it to a dictionary, otherwise return None
return model_to_dict(last_dev_step) if last_dev_step else None
def save_user(user_id, email, password):
try:
user = User.get(User.id == user_id)

View File

@@ -14,6 +14,7 @@ from logger.logger import logger
from prompts.prompts import ask_user
from const.llm import END_RESPONSE
from helpers.cli import running_processes
from utils.telemetry import telemetry
class AgentConvo:
@@ -102,6 +103,8 @@ class AgentConvo:
if should_log_message:
self.log_message(message_content)
if self.agent.project.check_ipc():
telemetry.send_project_stats()
return response
def format_message_content(self, response, function_calls):
@@ -235,7 +238,8 @@ class AgentConvo:
if self.log_to_user:
if self.agent.project.checkpoints['last_development_step'] is not None:
dev_step_msg = f'\nDev step {str(self.agent.project.checkpoints["last_development_step"]["id"])}\n'
print(color_yellow_bold(dev_step_msg), end='')
if not self.agent.project.check_ipc():
print(color_yellow_bold(dev_step_msg), end='')
logger.info(dev_step_msg)
print(f"\n{content}\n", type='local')
logger.info(f"{print_msg}: {content}\n")

View File

@@ -12,6 +12,7 @@ from helpers.exceptions import TooDeepRecursionError
from logger.logger import logger
from prompts.prompts import ask_user
from utils.exit import trace_code_event
from utils.print import print_task_progress
class Debugger:
@@ -65,6 +66,7 @@ class Debugger:
user_input = answer
self.agent.project.current_task.add_user_input_to_debugging_task(user_input)
print('', type='verbose', category='agent:debugger')
llm_response = convo.send_message('dev_ops/debug.prompt',
{
'command': command['command'] if command is not None else None,
@@ -77,6 +79,7 @@ class Debugger:
DEBUG_STEPS_BREAKDOWN)
completed_steps = []
print_task_progress(i+1, i+1, user_input, 'debugger', 'in_progress')
try:
while True:
@@ -90,7 +93,8 @@ class Debugger:
test_after_code_changes=True,
continue_development=False,
is_root_task=is_root_task,
continue_from_step=len(completed_steps)
continue_from_step=len(completed_steps),
task_source='debugger',
)
# in case one step failed or llm wants to see the output to determine the next steps

View File

@@ -34,7 +34,7 @@ from utils.ignore import IgnoreMatcher
from utils.telemetry import telemetry
from utils.task import Task
from utils.utils import remove_lines_with_string
from utils.utils import remove_lines_with_string, is_extension_old_version
class Project:
@@ -58,6 +58,8 @@ class Project:
current_step (str, optional): Current step in the project. Default is None.
"""
self.args = args
# TODO remove everything related to is_extension_old_version once new version is released and everybody has to update core
self.is_extension_old_version = is_extension_old_version(args)
self.llm_req_num = 0
self.command_runs_count = 0
self.user_inputs_count = 0
@@ -102,8 +104,6 @@ class Project:
self.tasks_to_load = []
self.features_to_load = []
self.dev_steps_to_load = []
if self.continuing_project:
self.setup_loading()
# end loading of project
def set_root_path(self, root_path: str):
@@ -167,20 +167,27 @@ class Project:
if not test_api_access(self):
return False
if self.continuing_project:
self.setup_loading()
self.project_manager = ProductOwner(self)
self.spec_writer = SpecWriter(self)
print('', type='verbose', category='agent:product-owner')
self.project_manager.get_project_description(self.spec_writer)
self.project_manager.get_user_stories()
# self.user_tasks = self.project_manager.get_user_tasks()
print('', type='verbose', category='agent:architect')
self.architect = Architect(self)
self.architect.get_architecture()
print('', type='verbose', category='agent:developer')
self.developer = Developer(self)
self.developer.set_up_environment()
self.technical_writer = TechnicalWriter(self)
print('', type='verbose', category='agent:tech-lead')
self.tech_lead = TechLead(self)
self.tech_lead.create_development_plan()
@@ -194,7 +201,7 @@ class Project:
print(json.dumps({
"project_stage": "coding"
}), type='info')
self.developer.start_coding()
self.developer.start_coding('app')
return True
def finish(self):
@@ -208,6 +215,7 @@ class Project:
self.previous_features = get_features_by_app_id(self.args['app_id'])
if not self.skip_steps:
print('', type='verbose', category='pythagora')
feature_description = ask_user(self, "Project is finished! Do you want to add any features or changes? "
"If yes, describe it here and if no, just press ENTER",
require_some_input=False)
@@ -215,6 +223,7 @@ class Project:
if feature_description == '':
return
print('', type='verbose', category='agent:tech-lead')
self.tech_lead.create_feature_plan(feature_description)
# loading of features
@@ -245,7 +254,8 @@ class Project:
self.features_to_load = []
self.current_feature = feature_description
self.developer.start_coding()
self.developer.start_coding('feature')
print('', type='verbose', category='agent:tech-lead')
self.tech_lead.create_feature_summary(feature_description)
def get_directory_tree(self, with_descriptions=False):
@@ -542,7 +552,9 @@ class Project:
delete_unconnected_steps_from(self.checkpoints['last_command_run'], 'previous_step')
delete_unconnected_steps_from(self.checkpoints['last_user_input'], 'previous_step')
def ask_for_human_intervention(self, message, description=None, cbs={}, convo=None, is_root_task=False, add_loop_button=False):
def ask_for_human_intervention(self, message, description=None, cbs={}, convo=None, is_root_task=False,
add_loop_button=False, category='human-intervention'):
print('', type='verbose', category=category)
answer = ''
question = color_yellow_bold(message)

View File

@@ -103,7 +103,9 @@ class CodeMonkey(Agent):
# There are no changes or there was problem talking with the LLM, we're done here
break
print('', type='verbose', category='agent:reviewer')
content, rework_feedback = self.review_change(convo, code_change_description, file_name, file_content, content)
print('', type='verbose', category='agent:code-monkey')
if not rework_feedback:
# No rework needed, we're done here
break

View File

@@ -31,6 +31,7 @@ from const.function_calls import (EXECUTE_COMMANDS, GET_TEST_TYPE, IMPLEMENT_TAS
from database.database import save_progress, get_progress_steps, update_app_status
from utils.telemetry import telemetry
from prompts.prompts import ask_user
from utils.print import print_task_progress, print_step_progress
ENVIRONMENT_SETUP_STEP = 'environment_setup'
@@ -43,7 +44,8 @@ class Developer(Agent):
self.save_dev_steps = True
self.debugger = Debugger(self)
def start_coding(self):
def start_coding(self, task_source):
print('', type='verbose', category='agent:developer')
if not self.project.finished:
self.project.current_step = 'coding'
update_app_status(self.project.args['app_id'], self.project.current_step)
@@ -66,6 +68,7 @@ class Developer(Agent):
if current_progress_percent > threshold and threshold not in documented_thresholds:
if not self.project.skip_steps:
self.project.technical_writer.document_project(current_progress_percent)
print('', type='verbose', category='agent:developer')
documented_thresholds.add(threshold)
if self.project.tasks_to_load:
@@ -85,7 +88,9 @@ class Developer(Agent):
continue
self.project.current_task.start_new_task(dev_task['description'], i + 1)
self.implement_task(i, dev_task)
print_task_progress(i+1, len(self.project.development_plan), dev_task['description'], task_source, 'in_progress')
self.implement_task(i, task_source, dev_task)
print_task_progress(i+1, len(self.project.development_plan), dev_task['description'], task_source, 'done')
telemetry.inc("num_tasks")
# DEVELOPMENT END
@@ -98,20 +103,27 @@ class Developer(Agent):
update_app_status(self.project.args['app_id'], self.project.current_step)
message = 'The app is DONE!!! Yay...you can use it now.\n'
logger.info(message)
print(color_green_bold(message))
print(color_green_bold(message), category='success')
if not self.project.skip_steps:
telemetry.set("end_result", "success:initial-project")
telemetry.send()
else:
message = 'Feature complete!\n'
logger.info(message)
print(color_green_bold(message))
print(color_green_bold(message), category='success')
if not self.project.skip_steps:
telemetry.set("end_result", "success:feature")
telemetry.send()
def implement_task(self, i, development_task=None):
print(color_green_bold(f'Implementing task #{i + 1}: ') + color_green(f' {development_task["description"]}\n'))
def implement_task(self, i, task_source, development_task=None):
"""
Implement a single development task.
:param i: The index of the task in the development plan.
:param task_source: The source of the task, one of: 'app', 'feature', 'debugger', 'iteration'.
:param development_task: The task to implement.
"""
print(color_green_bold(f'Implementing task #{i + 1}: ') + color_green(f' {development_task["description"]}\n'), category='agent:developer')
self.project.dot_pilot_gpt.chat_log_folder(i + 1)
convo_dev_task = AgentConvo(self)
@@ -205,7 +217,8 @@ class Developer(Agent):
development_task=development_task,
continue_development=True,
is_root_task=True,
continue_from_step=len(completed_steps))
continue_from_step=len(completed_steps),
task_source=task_source)
if result['success']:
break
@@ -464,11 +477,36 @@ class Developer(Agent):
def execute_task(self, convo, task_steps, test_command=None, reset_convo=True,
test_after_code_changes=True, continue_development=False,
development_task=None, is_root_task=False, continue_from_step=0):
development_task=None, is_root_task=False, continue_from_step=0, task_source=None):
"""
Execute a list of development steps.
:param convo: The conversation to use.
:param task_steps: The steps to execute.
:param test_command: The command to test after the steps are executed.
:param reset_convo: True if the conversation should be reset before executing the steps.
:param test_after_code_changes: True if the test command should be run after code changes.
:param continue_development: If True we reach iteration.
:param development_task: The development task to execute.
:param is_root_task: True if this is the root task.
:param continue_from_step: The index of the step to continue from.
:param task_source: The source of the task, one of: 'app', 'feature', 'debugger', 'troubleshooting', 'review'.
Task source should never be None.
:return: The result of the task execution.
"""
function_uuid = str(uuid.uuid4())
convo.save_branch(function_uuid)
agent_map = {
'app': 'agent:developer',
'feature': 'agent:developer',
'debugger': 'agent:debugger',
'troubleshooting': 'agent:troubleshooter',
'review': 'agent:reviewer',
}
for (i, step) in enumerate(task_steps):
print_step_progress(i+1, len(task_steps), step, task_source)
if (step['type'] in ['save_file', 'code_change', 'modify_file'] and
'path' in step[step['type']] and
step[step['type']]['path'] not in self.modified_files):
@@ -501,12 +539,14 @@ class Developer(Agent):
# result['user_input'] = result['cli_response']
elif step['type'] in ['save_file', 'modify_file', 'code_change']:
print('', type='verbose', category='agent:code-monkey')
result = self.step_save_file(convo, step, i, test_after_code_changes)
elif step['type'] == 'delete_file':
result = self.step_delete_file(convo, step, i, test_after_code_changes)
elif step['type'] == 'human_intervention':
print('', type='verbose', category='human-intervention')
result = self.step_human_intervention(convo, task_steps, i)
# TODO background_command - if we run commands in background we should have way to kill processes
@@ -515,6 +555,7 @@ class Developer(Agent):
# terminate_named_process(step['kill_process'])
# result = {'success': True}
print(f'Finishing up {step["type"]} step.', type='verbose', category=agent_map[task_source])
logger.info(' step result: %s', result)
if (not result['success']) or (need_to_see_output and result.get("user_input") != "SKIP"):
@@ -595,7 +636,8 @@ class Developer(Agent):
return_cli_response=True, is_root_task=True)},
convo=iteration_convo,
is_root_task=True,
add_loop_button=iteration_count > 3)
add_loop_button=iteration_count > 3,
category='human-test')
logger.info('response: %s', response)
self.review_count = 0
@@ -605,6 +647,7 @@ class Developer(Agent):
return {"success": True, "user_input": user_feedback}
if user_feedback is not None:
print('', type='verbose', category='agent:troubleshooter')
user_feedback = self.bug_report_generator(user_feedback)
stuck_in_loop = user_feedback.startswith(STUCK_IN_LOOP)
if stuck_in_loop:
@@ -624,6 +667,7 @@ class Developer(Agent):
tried_alternative_solutions_to_current_issue.append(next_solution_to_try)
print_task_progress(1, 1, development_task['description'], 'troubleshooting', 'in_progress')
iteration_convo = AgentConvo(self)
iteration_description = iteration_convo.send_message('development/iteration.prompt', {
"name": self.project.args['name'],
@@ -664,7 +708,8 @@ class Developer(Agent):
iteration_convo.remove_last_x_messages(2)
task_steps = llm_response['tasks']
self.execute_task(iteration_convo, task_steps, is_root_task=True)
self.execute_task(iteration_convo, task_steps, is_root_task=True, task_source='troubleshooting')
print_task_progress(1, 1, development_task['description'], 'troubleshooting', 'done')
def bug_report_generator(self, user_feedback):
"""
@@ -726,6 +771,7 @@ class Developer(Agent):
Review all task changes and refactor big files.
:return: bool - True if the task changes passed review, False if not
"""
print('', type='verbose', category='agent:reviewer')
self.review_count += 1
review_result = self.review_code_changes()
refactoring_done = self.refactor_code()
@@ -781,7 +827,7 @@ class Developer(Agent):
}, IMPLEMENT_TASK)
task_steps = llm_response['tasks']
result = self.execute_task(review_convo, task_steps)
result = self.execute_task(review_convo, task_steps, task_source='review')
return {
'success': result['success'] if 'success' in result else False,
'implementation_needed': True,

View File

@@ -23,7 +23,7 @@ class ProductOwner(Agent):
def get_project_description(self, spec_writer):
print(json.dumps({
"project_stage": "project_description"
}), type='info')
}), type='info', category='agent:product-owner')
self.project.app = get_app(self.project.args['app_id'], error_if_not_found=False)

View File

@@ -90,6 +90,7 @@ class SpecWriter(Agent):
if len(initial_prompt) > 1500:
return initial_prompt
print('', type='verbose', category='agent:spec-writer')
spec = self.analyze_project(initial_prompt)
missing_info = self.review_spec(initial_prompt, spec)
if missing_info:

View File

@@ -33,7 +33,7 @@ class TechLead(Agent):
existing_summary = apply_project_template(self.project)
# DEVELOPMENT PLANNING
print(color_green_bold("Starting to create the action plan for development...\n"))
print(color_green_bold("Starting to create the action plan for development...\n"), category='agent:tech-lead')
logger.info("Starting to create the action plan for development...")
llm_response = self.convo_development_plan.send_message('development/plan.prompt',

View File

@@ -12,12 +12,13 @@ class TechnicalWriter(Agent):
def document_project(self, percent):
files = self.project.get_all_coded_files()
print(f'{color_green_bold("CONGRATULATIONS!!!")}')
print(f'{color_green_bold("CONGRATULATIONS!!!")}', category='success')
print(f'You reached {color_green(str(percent) + "%")} of your project generation!\n\n')
print('For now, you have created:\n')
print(f'{color_green(len(files))} files\n')
print(f'{color_green(count_lines_of_code(files))} lines of code\n\n')
print('Before continuing, GPT Pilot will create some documentation for the project...\n')
print('', type='verbose', category='agent:tech-writer')
self.create_license()
self.create_readme()
self.create_api_documentation()

View File

@@ -68,7 +68,7 @@ class TestDeveloper:
developer.execute_task.return_value = {'success': True}
# When
developer.implement_task(0, {'description': 'Do stuff'})
developer.implement_task(0, 'test', {'description': 'Do stuff'})
# Then we parse the response correctly and send list of steps to execute_task()
assert developer.execute_task.call_count == 1
@@ -97,7 +97,7 @@ class TestDeveloper:
]
# When
developer.implement_task(0, {'description': 'Do stuff'})
developer.implement_task(0, 'test', {'description': 'Do stuff'})
# Then we include the user input in the conversation to update the task list
assert mock_completion.call_count == 3

View File

@@ -194,6 +194,7 @@ def execute_command(project, command, timeout=None, success_message=None, comman
If `cli_response` not None: 'was interrupted by user', 'timed out' or `None` - caller should send `cli_response` to LLM
exit_code (int): The exit code of the process.
"""
print('', type='verbose', category='exec-command')
project.finish_loading()
if timeout is not None:
if timeout < 0:
@@ -216,7 +217,6 @@ def execute_command(project, command, timeout=None, success_message=None, comman
logger.info('--------- EXECUTE COMMAND ---------- : %s', question)
answer = ask_user(project, question, False, hint='If yes, just press ENTER. Otherwise, type "no" but it will be processed as successfully executed.')
# TODO can we use .confirm(question, default='yes').ask() https://questionary.readthedocs.io/en/stable/pages/types.html#confirmation
print('answer: ' + answer)
if answer.lower() in NEGATIVE_ANSWERS:
return None, 'SKIP', None
elif answer.lower() not in AFFIRMATIVE_ANSWERS:

View File

@@ -126,6 +126,7 @@ if __name__ == "__main__":
print('Exit', type='exit')
except Exception as err:
print('', type='verbose', category='error')
print(color_red('---------- GPT PILOT EXITING WITH ERROR ----------'))
traceback.print_exc()
print(color_red('--------------------------------------------------'))

View File

@@ -55,7 +55,10 @@ def ask_for_main_app_definition(project):
def ask_user(project, question: str, require_some_input=True, hint: str = None, ignore_user_input_count: bool = False):
while True:
if hint is not None:
print(color_white_bold(hint) + '\n', type='hint')
if project.is_extension_old_version:
print(color_white_bold(hint) + '\n', type='hint')
else:
print(color_white_bold(hint) + '\n')
project.finish_loading()
answer = styled_text(project, question, hint=hint, ignore_user_input_count=ignore_user_input_count)

View File

@@ -1,9 +1,12 @@
import builtins
from helpers.ipc import IPCClient
from const.ipc import MESSAGE_TYPE, LOCAL_IGNORE_MESSAGE_TYPES
from utils.print import remove_ansi_codes
from utils.utils import is_extension_old_version
def get_custom_print(args):
is_old_version = is_extension_old_version(args)
built_in_print = builtins.print
def print_to_external_process(*args, **kwargs):
@@ -16,9 +19,13 @@ def get_custom_print(args):
local_print(*args, **kwargs)
return
if is_old_version and not message and kwargs['type'] != MESSAGE_TYPE['loadingFinished']:
return
ipc_client_instance.send({
'type': MESSAGE_TYPE[kwargs['type']],
'content': message,
'category': kwargs['category'] if 'category' in kwargs else '',
'content': message if is_old_version else remove_ansi_codes(message),
})
if kwargs['type'] == MESSAGE_TYPE['user_input_request']:
return ipc_client_instance.listen()
@@ -30,6 +37,9 @@ def get_custom_print(args):
return
del kwargs['type']
if 'category' in kwargs:
del kwargs['category']
built_in_print(message, **kwargs)
ipc_client_instance = None

51
pilot/utils/print.py Normal file
View File

@@ -0,0 +1,51 @@
import re
def print_task_progress(index, num_of_tasks, description, task_source, status):
"""
Print task progress in extension.
:param index: Index of the task.
:param num_of_tasks: Number of tasks.
:param description: Description of the task.
:param task_source: Source of the task, one of: 'app', 'feature', 'debugger', 'troubleshooting', 'review'.
:param status: Status of the task, can be 'in_progress' or 'done'.
:return: None
"""
print({'task': {
'index': index,
'num_of_tasks': num_of_tasks,
'description': description,
'source': task_source,
'status': status,
}}, type='progress')
def print_step_progress(index, num_of_steps, step, task_source):
"""
Print step progress in extension.
:param index: Index of the step.
:param num_of_steps: Number of steps.
:param step: Name of the step.
:param task_source: Source of the task, one of: 'app', 'feature', 'debugger', 'troubleshooting', 'review'.
:return: None
"""
print({'step': {
'index': index,
'num_of_steps': num_of_steps,
'step': step,
'source': task_source,
}}, type='progress')
def remove_ansi_codes(s: str) -> str:
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
# Check if the input is a string
if isinstance(s, str):
return ansi_escape.sub('', s)
else:
# If the input is not a string, return the input as is
return s

View File

@@ -1,14 +1,9 @@
import platform
import questionary
import re
import sys
from database.database import save_user_input
from utils.style import style_config
def remove_ansi_codes(s: str) -> str:
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
return ansi_escape.sub('', s)
from utils.print import remove_ansi_codes
def styled_select(*args, **kwargs):
@@ -29,7 +24,8 @@ def styled_text(project, question, ignore_user_input_count=False, style=None, hi
if project is not None and project.check_ipc():
response = print(question, type='user_input_request')
print(response)
if project.is_extension_old_version:
print(response)
else:
used_style = style if style is not None else style_config.get_style()
question = remove_ansi_codes(question) # Colorama and questionary are not compatible and styling doesn't work
@@ -39,7 +35,8 @@ def styled_text(project, question, ignore_user_input_count=False, style=None, hi
if not ignore_user_input_count:
save_user_input(project, question, response, hint)
print('\n\n', end='')
if project is not None and not project.check_ipc():
print('\n\n', end='')
return response

View File

@@ -377,5 +377,20 @@ class Telemetry:
f"Telemetry.send(): failed to send telemetry data: {e}", exc_info=True
)
def send_project_stats(self):
"""
Send project statistics to the extension.
Note: this method does not clear any telemetry data.
"""
if not self.enabled:
return
print({
"num_lines": self.data["num_lines"],
"num_files": self.data["num_files"],
"num_tokens": self.data["num_llm_tokens"],
}, type='projectStats')
telemetry = Telemetry()

View File

@@ -84,12 +84,12 @@ def get_prompt_components(data):
return data.update(prompts_components)
def get_sys_message(role,args=None):
def get_sys_message(role, args=None):
"""
:param role: 'product_owner', 'architect', 'dev_ops', 'tech_lead', 'full_stack_developer', 'code_monkey'
:return: { "role": "system", "content": "You are a {role}... You do..." }
"""
content = get_prompt(f'system_messages/{role}.prompt',args)
content = get_prompt(f'system_messages/{role}.prompt', args)
return {
"role": "system",
@@ -195,6 +195,7 @@ def clean_filename(filename):
return cleaned_filename
def json_serial(obj):
"""JSON serializer for objects not serializable by default json code"""
if isinstance(obj, (datetime.datetime, datetime.date)):
@@ -204,7 +205,20 @@ def json_serial(obj):
else:
return str(obj)
def remove_lines_with_string(file_content, matching_string):
lines = file_content.split('\n')
new_lines = [line for line in lines if matching_string not in line.lower()]
return '\n'.join(new_lines)
def is_extension_old_version(args):
is_old_version = True
if "extension_version" in args:
arg_version_tuple = tuple(map(int, args["extension_version"].split('.')))
compare_version_tuple = (0, 0, 45)
is_old_version = arg_version_tuple <= compare_version_tuple
print(is_old_version)
return is_old_version