Refactor for super clear agent protocol implemenation (#15)

This commit is contained in:
Swifty
2023-08-18 19:02:24 +02:00
committed by GitHub
parent 13ac319c0a
commit c6a1fc44a7
10 changed files with 757 additions and 84 deletions

View File

@@ -1,6 +1,6 @@
[flake8]
max-line-length = 88
select = "E303, W293, W291, W292, E305, E231, E302"
select = "E303, W293, W292, E305, E231, E302"
exclude =
.tox,
__pycache__,

View File

@@ -18,6 +18,6 @@ if __name__ == "__main__":
port = os.getenv("PORT")
database = autogpt.db.AgentDB(database_name)
agent = autogpt.agent.AutoGPT(db=database, workspace=workspace)
agent = autogpt.agent.Agent(database=database, workspace=workspace)
agent.start(port=port, router=router)

View File

@@ -1,64 +1,220 @@
import asyncio
import os
import typing
import autogpt.utils
from autogpt.agent_protocol import Agent, Artifact, Step, Task, TaskDB
from fastapi import APIRouter, FastAPI, Response, UploadFile
from fastapi.responses import FileResponse, JSONResponse
from hypercorn.asyncio import serve
from hypercorn.config import Config
from .db import AgentDB
from .middlewares import AgentMiddleware
from .routes.agent_protocol import base_router
from .schema import Artifact, Status, Step, StepRequestBody, Task, TaskRequestBody
from .utils import run
from .workspace import Workspace
class AutoGPT(Agent):
def __init__(self, db: TaskDB, workspace: Workspace) -> None:
super().__init__(db)
class Agent:
def __init__(self, database: AgentDB, workspace: Workspace):
self.db = database
self.workspace = workspace
async def create_task(self, task: Task) -> None:
print(f"task: {task.input}")
def start(self, port: int = 8000, router: APIRouter = base_router):
"""
Start the agent server.
"""
config = Config()
config.bind = [f"localhost:{port}"]
app = FastAPI(
title="Auto-GPT Forge",
description="Modified version of The Agent Protocol.",
version="v0.4",
)
app.include_router(router)
app.add_middleware(AgentMiddleware, agent=self)
asyncio.run(serve(app, config))
async def create_task(self, task_request: TaskRequestBody) -> Task:
"""
Create a task for the agent.
"""
try:
task = await self.db.create_task(
input=task_request.input if task_request.input else None,
additional_input=task_request.additional_input
if task_request.additional_input
else None,
)
print(task)
except Exception as e:
return Response(status_code=500, content=str(e))
print(task)
return task
async def run_step(self, step: Step) -> Step:
artifacts = autogpt.utils.run(step.input)
for artifact in artifacts:
art = await self.db.create_artifact(
task_id=step.task_id,
file_name=artifact["file_name"],
uri=artifact["uri"],
agent_created=True,
step_id=step.step_id,
async def list_tasks(self) -> typing.List[str]:
"""
List the IDs of all tasks that the agent has created.
"""
try:
task_ids = [task.task_id for task in await self.db.list_tasks()]
except Exception as e:
return Response(status_code=500, content=str(e))
return task_ids
async def get_task(self, task_id: str) -> Task:
"""
Get a task by ID.
"""
if not task_id:
return Response(status_code=400, content="Task ID is required.")
if not isinstance(task_id, str):
return Response(status_code=400, content="Task ID must be a string.")
try:
task = await self.db.get_task(task_id)
except Exception as e:
return Response(status_code=500, content=str(e))
return task
async def list_steps(self, task_id: str) -> typing.List[str]:
"""
List the IDs of all steps that the task has created.
"""
if not task_id:
return Response(status_code=400, content="Task ID is required.")
if not isinstance(task_id, str):
return Response(status_code=400, content="Task ID must be a string.")
try:
steps_ids = [step.step_id for step in await self.db.list_steps(task_id)]
except Exception as e:
return Response(status_code=500, content=str(e))
return steps_ids
async def create_and_execute_step(
self, task_id: str, step_request: StepRequestBody
) -> Step:
"""
Create a step for the task.
"""
if step_request.input != "y":
step = await self.db.create_step(
task_id=task_id,
input=step_request.input if step_request else None,
additional_properties=step_request.additional_input
if step_request
else None,
)
assert isinstance(
art, Artifact
), f"Artifact not isntance of Artifact {type(art)}"
step.artifacts.append(art)
step.status = "completed"
# utils.run
artifacts = run(step.input)
for artifact in artifacts:
art = await self.db.create_artifact(
task_id=step.task_id,
file_name=artifact["file_name"],
uri=artifact["uri"],
agent_created=True,
step_id=step.step_id,
)
assert isinstance(
art, Artifact
), f"Artifact not instance of Artifact {type(art)}"
step.artifacts.append(art)
step.status = "completed"
else:
steps = await self.db.list_steps(task_id)
artifacts = await self.db.list_artifacts(task_id)
step = steps[-1]
step.artifacts = artifacts
step.output = "No more steps to run."
step.is_last = True
if isinstance(step.status, Status):
step.status = step.status.value
step.output = "Done some work"
return JSONResponse(content=step.dict(), status_code=200)
async def get_step(self, task_id: str, step_id: str) -> Step:
"""
Get a step by ID.
"""
if not task_id or not step_id:
return Response(
status_code=400, content="Task ID and step ID are required."
)
if not isinstance(task_id, str) or not isinstance(step_id, str):
return Response(
status_code=400, content="Task ID and step ID must be strings."
)
try:
step = await self.db.get_step(task_id, step_id)
except Exception as e:
return Response(status_code=500, content=str(e))
return step
async def retrieve_artifact(self, task_id: str, artifact: Artifact) -> bytes:
async def list_artifacts(self, task_id: str) -> typing.List[Artifact]:
"""
Retrieve the artifact data from wherever it is stored and return it as bytes.
List the artifacts that the task has created.
"""
if not artifact.uri.startswith("file://"):
raise NotImplementedError("Loading from uri not implemented")
file_path = artifact.uri.split("file://")[1]
if not self.workspace.exists(file_path):
raise FileNotFoundError(f"File {file_path} not found in workspace")
return self.workspace.read(file_path)
if not task_id:
return Response(status_code=400, content="Task ID is required.")
if not isinstance(task_id, str):
return Response(status_code=400, content="Task ID must be a string.")
try:
artifacts = await self.db.list_artifacts(task_id)
except Exception as e:
return Response(status_code=500, content=str(e))
return artifacts
async def save_artifact(
self, task_id: str, artifact: Artifact, data: bytes
async def create_artifact(
self,
task_id: str,
file: UploadFile | None = None,
uri: str | None = None,
) -> Artifact:
"""
Save the artifact data to the agent's workspace, loading from uri if bytes are not available.
Create an artifact for the task.
"""
assert (
data is not None and artifact.uri is not None
), "Data or Artifact uri must be set"
if not file and not uri:
return Response(status_code=400, content="No file or uri provided")
data = None
if not uri:
file_name = file.filename or str(uuid4())
try:
data = b""
while contents := file.file.read(1024 * 1024):
data += contents
except Exception as e:
return Response(status_code=500, content=str(e))
if data is not None:
file_path = os.path.join(task_id / artifact.file_name)
file_path = os.path.join(task_id / file_name)
self.write(file_path, data)
artifact.uri = f"file://{file_path}"
self.db.save_artifact(task_id, artifact)
else:
raise NotImplementedError("Loading from uri not implemented")
artifact = await self.create_artifact(
task_id=task_id,
file_name=file_name,
uri=f"file://{file_path}",
agent_created=False,
)
return artifact
async def get_artifact(self, task_id: str, artifact_id: str) -> Artifact:
"""
Get an artifact by ID.
"""
artifact = await self.db.get_artifact(task_id, artifact_id)
if not artifact.uri.startswith("file://"):
return Response(
status_code=500, content="Loading from none file uri not implemented"
)
file_path = artifact.uri.split("file://")[1]
if not self.workspace.exists(file_path):
return Response(status_code=500, content="File not found")
retrieved_artifact = self.workspace.read(file_path)
path = artifact.file_name
with open(path, "wb") as f:
f.write(retrieved_artifact)
return FileResponse(
# Note: mimetype is guessed in the FileResponse constructor
path=path,
filename=artifact.file_name,
)

View File

@@ -5,7 +5,7 @@ from fastapi import (
)
from fastapi.responses import FileResponse
from autogpt.agent_protocol.agent import base_router
from autogpt.routes.agent_protocol import base_router
def add_benchmark_routes():

View File

@@ -9,8 +9,7 @@ from typing import Dict, List, Optional
from sqlalchemy import Boolean, Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import DeclarativeBase, joinedload, relationship, sessionmaker
from autogpt.agent_protocol import Artifact, Step, Task, TaskDB
from autogpt.agent_protocol.models import Status, TaskInput
from .schema import Artifact, Status, Step, Task, TaskInput
class Base(DeclarativeBase):
@@ -28,7 +27,6 @@ class TaskModel(Base):
input = Column(String)
additional_input = Column(String)
steps = relationship("StepModel", back_populates="task")
artifacts = relationship("ArtifactModel", back_populates="task")
@@ -41,9 +39,9 @@ class StepModel(Base):
input = Column(String)
status = Column(String)
is_last = Column(Boolean, default=False)
additional_properties = Column(String)
task = relationship("TaskModel", back_populates="steps")
additional_properties = Column(String)
artifacts = relationship("ArtifactModel", back_populates="step")
class ArtifactModel(Base):
@@ -56,29 +54,16 @@ class ArtifactModel(Base):
file_name = Column(String)
uri = Column(String)
step = relationship("StepModel", back_populates="artifacts")
task = relationship("TaskModel", back_populates="artifacts")
def convert_to_task(task_obj: TaskModel) -> Task:
steps_list = []
for step in task_obj.steps:
status = Status.completed if step.status == "completed" else Status.created
steps_list.append(
Step(
task_id=step.task_id,
step_id=step.step_id,
name=step.name,
status=status,
is_last=step.is_last == 1,
additional_properties=step.additional_properties,
)
)
return Task(
task_id=task_obj.task_id,
input=task_obj.input,
additional_input=task_obj.additional_input,
artifacts=[],
steps=steps_list,
)
@@ -91,7 +76,7 @@ def convert_to_step(step_model: StepModel) -> Step:
agent_created=artifact.agent_created,
uri=artifact.uri,
)
for artifact in step_model.task.artifacts
for artifact in step_model.artifacts
if artifact.step_id == step_model.step_id
]
status = Status.completed if step_model.status == "completed" else Status.created
@@ -108,7 +93,7 @@ def convert_to_step(step_model: StepModel) -> Step:
# sqlite:///{database_name}
class AgentDB(TaskDB):
class AgentDB:
def __init__(self, database_string) -> None:
super().__init__()
self.engine = create_engine(database_string)
@@ -137,18 +122,21 @@ class AgentDB(TaskDB):
is_last: bool = False,
additional_properties: Optional[Dict[str, str]] = None,
) -> Step:
session = self.Session()
new_step = StepModel(
task_id=task_id,
name=name,
input=input,
status="created",
is_last=is_last,
additional_properties=additional_properties,
)
session.add(new_step)
session.commit()
session.refresh(new_step)
try:
session = self.Session()
new_step = StepModel(
task_id=task_id,
name=name,
input=input,
status="created",
is_last=is_last,
additional_properties=additional_properties,
)
session.add(new_step)
session.commit()
session.refresh(new_step)
except Exception as e:
print(e)
return convert_to_step(new_step)
async def create_artifact(
@@ -163,7 +151,12 @@ class AgentDB(TaskDB):
if existing_artifact := session.query(ArtifactModel).filter_by(uri=uri).first():
session.close()
return existing_artifact
return Artifact(
artifact_id=str(existing_artifact.artifact_id),
file_name=existing_artifact.file_name,
agent_created=existing_artifact.agent_created,
uri=existing_artifact.uri,
)
new_artifact = ArtifactModel(
task_id=task_id,
@@ -175,18 +168,22 @@ class AgentDB(TaskDB):
session.add(new_artifact)
session.commit()
session.refresh(new_artifact)
return await self.get_artifact(task_id, new_artifact.artifact_id)
return Artifact(
artifact_id=str(new_artifact.artifact_id),
file_name=new_artifact.file_name,
agent_created=new_artifact.agent_created,
uri=new_artifact.uri,
)
async def get_task(self, task_id: int) -> Task:
"""Get a task by its id"""
session = self.Session()
task_obj = (
if task_obj := (
session.query(TaskModel)
.options(joinedload(TaskModel.steps))
.options(joinedload(TaskModel.artifacts))
.filter_by(task_id=task_id)
.first()
)
if task_obj:
):
return convert_to_task(task_obj)
else:
raise DataNotFoundError("Task not found")
@@ -195,7 +192,7 @@ class AgentDB(TaskDB):
session = self.Session()
if step := (
session.query(StepModel)
.options(joinedload(StepModel.task).joinedload(TaskModel.artifacts))
.options(joinedload(StepModel.artifacts))
.filter(StepModel.step_id == step_id)
.first()
):
@@ -248,6 +245,8 @@ class AgentDB(TaskDB):
task_id=task.task_id,
input=task.input,
additional_input=task.additional_input,
artifacts=[],
steps=[],
)
for task in tasks
]

34
autogpt/middlewares.py Normal file
View File

@@ -0,0 +1,34 @@
from fastapi import FastAPI
class AgentMiddleware:
"""
Middleware that injects the agent instance into the request scope.
"""
def __init__(self, app: FastAPI, agent: "Agent"):
"""
Args:
app: The FastAPI app - automatically injected by FastAPI.
agent: The agent instance to inject into the request scope.
Examples:
>>> from fastapi import FastAPI, Request
>>> from agent_protocol.agent import Agent
>>> from agent_protocol.middlewares import AgentMiddleware
>>> app = FastAPI()
>>> @app.get("/")
>>> async def root(request: Request):
>>> agent = request["agent"]
>>> task = agent.db.create_task("Do something.")
>>> return {"task_id": a.task_id}
>>> agent = Agent()
>>> app.add_middleware(AgentMiddleware, agent=agent)
"""
self.app = app
self.agent = agent
async def __call__(self, scope, receive, send):
scope["agent"] = self.agent
await self.app(scope, receive, send)

View File

View File

@@ -0,0 +1,353 @@
"""
Routes for the Agent Service.
This module defines the API routes for the Agent service. While there are multiple endpoints provided by the service,
the ones that require special attention due to their complexity are:
1. `execute_agent_task_step`:
This route is significant because this is where the agent actually performs the work. The function handles
executing the next step for a task based on its current state, and it requires careful implementation to ensure
all scenarios (like the presence or absence of steps or a step marked as `last_step`) are handled correctly.
2. `upload_agent_task_artifacts`:
This route allows for the upload of artifacts, supporting various URI types (e.g., s3, gcs, ftp, http).
The support for different URI types makes it a bit more complex, and it's important to ensure that all
supported URI types are correctly managed. NOTE: The Auto-GPT team will eventually handle the most common
uri types for you.
3. `create_agent_task`:
While this is a simpler route, it plays a crucial role in the workflow, as it's responsible for the creation
of a new task.
Developers and contributors should be especially careful when making modifications to these routes to ensure
consistency and correctness in the system's behavior.
"""
from typing import List
from fastapi import APIRouter, Request, UploadFile
from fastapi.responses import FileResponse
from autogpt.schema import Artifact, Step, StepRequestBody, Task, TaskRequestBody
base_router = APIRouter()
@base_router.post("/agent/tasks", tags=["agent"], response_model=Task)
async def create_agent_task(request: Request, task_request: TaskRequestBody) -> Task:
"""
Creates a new task using the provided TaskRequestBody and returns a Task.
Args:
request (Request): FastAPI request object.
task (TaskRequestBody): The task request containing input and additional input data.
Returns:
Task: A new task with task_id, input, additional_input, and empty lists for artifacts and steps.
Example:
Request (TaskRequestBody defined in schema.py):
{
"input": "Write the words you receive to the file 'output.txt'.",
"additional_input": "python/code"
}
Response (Task defined in schema.py):
{
"task_id": "50da533e-3904-4401-8a07-c49adf88b5eb",
"input": "Write the word 'Washington' to a .txt file",
"additional_input": "python/code",
"artifacts": [],
"steps": []
}
"""
agent = request["agent"]
task_request = await agent.create_task(task_request)
return task_request
@base_router.get("/agent/tasks", tags=["agent"], response_model=List[str])
async def list_agent_tasks_ids(request: Request) -> List[str]:
"""
Gets a list of all task IDs.
Args:
request (Request): FastAPI request object.
Returns:
List[str]: A list of all task IDs.
Example:
Request:
GET /agent/tasks
Response:
[
"50da533e-3904-4401-8a07-c49adf88b5eb",
"b7d3c70a-7266-4b3a-818e-1327679f0117",
...
]
"""
agent = request["agent"]
return await agent.list_tasks()
@base_router.get("/agent/tasks/{task_id}", tags=["agent"], response_model=Task)
async def get_agent_task(request: Request, task_id: str):
"""
Gets the details of a task by ID.
Args:
request (Request): FastAPI request object.
task_id (str): The ID of the task.
Returns:
Task: The task with the given ID.
Example:
Request:
GET /agent/tasks/50da533e-3904-4401-8a07-c49adf88b5eb
Response (Task defined in schema.py):
{
"input": "Write the word 'Washington' to a .txt file",
"additional_input": null,
"task_id": "50da533e-3904-4401-8a07-c49adf88b5eb",
"artifacts": [
{
"artifact_id": "7a49f31c-f9c6-4346-a22c-e32bc5af4d8e",
"file_name": "output.txt",
"agent_created": true,
"uri": "file://50da533e-3904-4401-8a07-c49adf88b5eb/output.txt"
}
],
"steps": [
{
"task_id": "50da533e-3904-4401-8a07-c49adf88b5eb",
"step_id": "6bb1801a-fd80-45e8-899a-4dd723cc602e",
"input": "Write the word 'Washington' to a .txt file",
"additional_input": "challenge:write_to_file",
"name": "Write to file",
"status": "completed",
"output": "I am going to use the write_to_file command and write Washington to a file called output.txt <write_to_file('output.txt', 'Washington')>",
"additional_output": "Do you want me to continue?",
"artifacts": [
{
"artifact_id": "7a49f31c-f9c6-4346-a22c-e32bc5af4d8e",
"file_name": "output.txt",
"agent_created": true,
"uri": "file://50da533e-3904-4401-8a07-c49adf88b5eb/output.txt"
}
],
"is_last": true
}
]
}
"""
agent = request["agent"]
return await agent.get_task(task_id)
@base_router.get(
"/agent/tasks/{task_id}/steps", tags=["agent"], response_model=List[str]
)
async def list_agent_task_steps(request: Request, task_id: str) -> List[str]:
"""
Retrieves a list of step IDs associated with a specific task.
Args:
request (Request): FastAPI request object.
task_id (str): The ID of the task.
Returns:
List[str]: A list of step IDs.
Example:
Request:
GET /agent/tasks/50da533e-3904-4401-8a07-c49adf88b5eb/steps
Response:
["step1_id", "step2_id", ...]
"""
agent = request["agent"]
return await agent.list_steps(task_id)
@base_router.post("/agent/tasks/{task_id}/steps", tags=["agent"], response_model=Step)
async def execute_agent_task_step(
request: Request, task_id: str, step: StepRequestBody
) -> Step:
"""
Executes the next step for a specified task based on the current task status and returns the
executed step with additional feedback fields.
Depending on the current state of the task, the following scenarios are supported:
1. No steps exist for the task.
2. There is at least one step already for the task, and the task does not have a completed step marked as `last_step`.
3. There is a completed step marked as `last_step` already on the task.
In each of these scenarios, a step object will be returned with two additional fields: `output` and `additional_output`.
- `output`: Provides the primary response or feedback to the user.
- `additional_output`: Supplementary information or data. Its specific content is not strictly defined and can vary based on the step or agent's implementation.
Args:
request (Request): FastAPI request object.
task_id (str): The ID of the task.
step (StepRequestBody): The details for executing the step.
Returns:
Step: Details of the executed step with additional feedback.
Example:
Request:
POST /agent/tasks/50da533e-3904-4401-8a07-c49adf88b5eb/steps
{
"input": "Step input details...",
...
}
Response:
{
"task_id": "50da533e-3904-4401-8a07-c49adf88b5eb",
"step_id": "step1_id",
"output": "Primary feedback...",
"additional_output": "Supplementary details...",
...
}
"""
agent = request["agent"]
return await agent.create_and_execute_step(task_id, step)
@base_router.get(
"/agent/tasks/{task_id}/steps/{step_id}", tags=["agent"], response_model=Step
)
async def get_agent_task_step(request: Request, task_id: str, step_id: str) -> Step:
"""
Retrieves the details of a specific step for a given task.
Args:
request (Request): FastAPI request object.
task_id (str): The ID of the task.
step_id (str): The ID of the step.
Returns:
Step: Details of the specific step.
Example:
Request:
GET /agent/tasks/50da533e-3904-4401-8a07-c49adf88b5eb/steps/step1_id
Response:
{
"task_id": "50da533e-3904-4401-8a07-c49adf88b5eb",
"step_id": "step1_id",
...
}
"""
agent = request["agent"]
return await agent.get_step(task_id, step_id)
@base_router.get(
"/agent/tasks/{task_id}/artifacts", tags=["agent"], response_model=List[Artifact]
)
async def list_agent_task_artifacts(request: Request, task_id: str) -> List[Artifact]:
"""
Retrieves a list of artifacts associated with a specific task.
Args:
request (Request): FastAPI request object.
task_id (str): The ID of the task.
Returns:
List[Artifact]: A list of artifacts.
Example:
Request:
GET /agent/tasks/50da533e-3904-4401-8a07-c49adf88b5eb/artifacts
Response:
[
{"artifact_id": "artifact1_id", ...},
{"artifact_id": "artifact2_id", ...},
...
]
"""
agent = request["agent"]
return await agent.list_artifacts(task_id)
@base_router.post(
"/agent/tasks/{task_id}/artifacts", tags=["agent"], response_model=Artifact
)
async def upload_agent_task_artifacts(
request: Request,
task_id: str,
file: UploadFile | None = None,
uri: str | None = None,
) -> Artifact:
"""
Uploads an artifact for a specific task using either a provided file or a URI.
At least one of the parameters, `file` or `uri`, must be specified. The `uri` can point to
cloud storage resources such as S3, GCS, etc., or to other resources like FTP or HTTP.
To check the supported URI types for the agent, use the `/agent/artifacts/uris` endpoint.
Args:
request (Request): FastAPI request object.
task_id (str): The ID of the task.
file (UploadFile, optional): The uploaded file. Defaults to None.
uri (str, optional): The URI pointing to the resource. Defaults to None.
Returns:
Artifact: Details of the uploaded artifact.
Note:
Either `file` or `uri` must be provided. If both are provided, the behavior depends on
the agent's implementation. If neither is provided, the function will return an error.
Example:
Request:
POST /agent/tasks/50da533e-3904-4401-8a07-c49adf88b5eb/artifacts
File: <uploaded_file>
OR
URI: "s3://path/to/artifact"
Response:
{
"artifact_id": "artifact1_id",
...
}
"""
agent = request["agent"]
return await agent.create_artifact(task_id, file, uri)
@base_router.get(
"/agent/tasks/{task_id}/artifacts/{artifact_id}", tags=["agent"], response_model=str
)
async def download_agent_task_artifact(
request: Request, task_id: str, artifact_id: str
) -> FileResponse:
"""
Downloads an artifact associated with a specific task.
Args:
request (Request): FastAPI request object.
task_id (str): The ID of the task.
artifact_id (str): The ID of the artifact.
Returns:
FileResponse: The downloaded artifact file.
Example:
Request:
GET /agent/tasks/50da533e-3904-4401-8a07-c49adf88b5eb/artifacts/artifact1_id
Response:
<file_content_of_artifact>
"""
agent = request["agent"]
print(f"task_id: {task_id}, artifact_id: {artifact_id}")
return await agent.get_artifact(task_id, artifact_id)

131
autogpt/schema.py Normal file
View File

@@ -0,0 +1,131 @@
# generated by fastapi-codegen:
# filename: ../../openapi.yml
# timestamp: 2023-08-17T11:26:07+00:00
from __future__ import annotations
from enum import Enum
from typing import Any, List, Optional
from pydantic import BaseModel, Field
class TaskInput(BaseModel):
__root__: Any = Field(
...,
description="Input parameters for the task. Any value is allowed.",
example='{\n"debug": false,\n"mode": "benchmarks"\n}',
)
class Artifact(BaseModel):
artifact_id: str = Field(
...,
description="ID of the artifact.",
example="b225e278-8b4c-4f99-a696-8facf19f0e56",
)
file_name: str = Field(
..., description="Filename of the artifact.", example="main.py"
)
agent_created: Optional[bool] = Field(
None,
description="Whether the artifact has been created by the agent.",
example=False,
)
uri: Optional[str] = Field(
None,
description="URI of the artifact.",
example="file://home/bob/workspace/bucket/main.py",
)
class ArtifactUpload(BaseModel):
file: bytes = Field(..., description="File to upload.")
relative_path: Optional[str] = Field(
None,
description="Relative path of the artifact in the agent's workspace.",
example="python/code",
)
class StepInput(BaseModel):
__root__: Any = Field(
...,
description="Input parameters for the task step. Any value is allowed.",
example='{\n"file_to_refactor": "models.py"\n}',
)
class StepOutput(BaseModel):
__root__: Any = Field(
...,
description="Output that the task step has produced. Any value is allowed.",
example='{\n"tokens": 7894,\n"estimated_cost": "0,24$"\n}',
)
class TaskRequestBody(BaseModel):
input: Optional[str] = Field(
None,
description="Input prompt for the task.",
example="Write the words you receive to the file 'output.txt'.",
)
additional_input: Optional[TaskInput] = None
class Task(TaskRequestBody):
task_id: str = Field(
...,
description="The ID of the task.",
example="50da533e-3904-4401-8a07-c49adf88b5eb",
)
artifacts: Optional[List[Artifact]] = Field(
...,
description="A list of artifacts that the task has produced.",
example=[
"7a49f31c-f9c6-4346-a22c-e32bc5af4d8e",
"ab7b4091-2560-4692-a4fe-d831ea3ca7d6",
],
)
class StepRequestBody(BaseModel):
input: Optional[str] = Field(
None, description="Input prompt for the step.", example="Washington"
)
additional_input: Optional[StepInput] = None
class Status(Enum):
created = "created"
running = "running"
completed = "completed"
class Step(StepRequestBody):
task_id: str = Field(
...,
description="The ID of the task this step belongs to.",
example="50da533e-3904-4401-8a07-c49adf88b5eb",
)
step_id: str = Field(
...,
description="The ID of the task step.",
example="6bb1801a-fd80-45e8-899a-4dd723cc602e",
)
name: Optional[str] = Field(
None, description="The name of the task step.", example="Write to file"
)
status: Status = Field(..., description="The status of the task step.")
output: Optional[str] = Field(
None,
description="Output of the task step.",
example="I am going to use the write_to_file command and write Washington to a file called output.txt <write_to_file('output.txt', 'Washington')",
)
additional_output: Optional[StepOutput] = None
artifacts: Optional[List[Artifact]] = Field(
[], description="A list of artifacts that the step has produced."
)
is_last: Optional[bool] = Field(
False, description="Whether this is the last step in the task."
)

View File

@@ -43,4 +43,4 @@ def test_local_list(setup_local_workspace):
workspace.write("test2.txt", TEST_FILE_CONTENT)
files = workspace.list(".")
assert set(files) == set(["test1.txt", "test2.txt"])
assert set(files) == {"test1.txt", "test2.txt"}