rework orchestration WIP

This commit is contained in:
Kosta Petan
2024-02-01 21:03:33 +00:00
parent 5f67942126
commit 8fcd2f43ff
6 changed files with 203 additions and 192 deletions

View File

@@ -2,7 +2,6 @@ using Microsoft.AI.DevTeam;
using Microsoft.AI.DevTeam.Skills;
using Octokit.Webhooks;
using Octokit.Webhooks.Events;
using Octokit.Webhooks.Events.CommitComment;
using Octokit.Webhooks.Events.IssueComment;
using Octokit.Webhooks.Events.Issues;
using Octokit.Webhooks.Models;
@@ -12,16 +11,16 @@ public sealed class GithubWebHookProcessor : WebhookEventProcessor
{
private readonly ILogger<GithubWebHookProcessor> _logger;
private readonly IGrainFactory _grains;
private readonly IClusterClient client;
private readonly IClusterClient _client;
private readonly IManageGithub _ghService;
private readonly IManageAzure _azService;
public GithubWebHookProcessor(ILogger<GithubWebHookProcessor> logger, IGrainFactory grains,
public GithubWebHookProcessor(ILogger<GithubWebHookProcessor> logger, IGrainFactory grains,
IClusterClient client, IManageGithub ghService, IManageAzure azService)
{
_logger = logger;
_grains = grains;
this.client = client;
_client = client;
_ghService = ghService;
_azService = azService;
}
@@ -67,194 +66,49 @@ public sealed class GithubWebHookProcessor : WebhookEventProcessor
}
}
// TODO: implement
protected override Task ProcessPushWebhookAsync(WebhookHeaders headers, PushEvent pushEvent)
{
var org = pushEvent.Organization.Login;
var repo = pushEvent.Repository.Name;
// Assumes the label follows the following convention: Skill.Function example: PM.Readme
var suffix = $"{org}-{repo}";
var ingester = _grains.GetGrain<IIngestRepo>(suffix);
return Task.CompletedTask;
}
private async Task HandleClosingIssue(long issueNumber, string skillName, string functionName, string suffix, string org, string repo)
{
if (skillName == nameof(PM) && functionName == nameof(PM.Readme))
{
await HandleClosingReadme(issueNumber, suffix, org, repo);
}
else if (skillName == nameof(DevLead) && functionName == nameof(DevLead.Plan))
{
await HandleClosingDevPlan(issueNumber, suffix, org, repo);
}
else if (skillName == nameof(Developer) && functionName == nameof(Developer.Implement))
{
await HandleClosingDevImplement(issueNumber, suffix, org, repo);
}
else { } // something went wrong
}
var streamProvider = _client.GetStreamProvider("StreamProvider");
var streamId = StreamId.Create(suffix, issueNumber.ToString());
var stream = streamProvider.GetStream<Event>(streamId);
private async Task HandleClosingDevImplement(long issueNumber, string suffix, string org, string repo)
{
var dev = _grains.GetGrain<IDevelopCode>(issueNumber, suffix);
var code = await dev.GetLastMessage();
var lookup = _grains.GetGrain<ILookupMetadata>(suffix);
var parentIssue = await lookup.GetMetadata((int)issueNumber);
await _azService.Store(new SaveOutputRequest
{
ParentIssueNumber = parentIssue.IssueNumber,
IssueNumber = (int)issueNumber,
Output = code,
Extension = "sh",
Directory = "output",
FileName = "run",
Org = org,
Repo = repo
});
var sandboxRequest = new SandboxRequest
await stream.OnNextAsync(new Event
{
Type = EventType.ChainClosed,
Org = org,
Repo = repo,
IssueNumber = (int)issueNumber,
ParentIssueNumber = parentIssue.IssueNumber
};
await _azService.RunInSandbox(sandboxRequest);
var commitRequest = new CommitRequest
{
Dir = "output",
Org = org,
Repo = repo,
ParentNumber = parentIssue.IssueNumber,
Number = (int)issueNumber,
Branch = $"sk-{parentIssue.IssueNumber}"
};
var markTaskCompleteRequest = new MarkTaskCompleteRequest
{
Org = org,
Repo = repo,
CommentId = parentIssue.CommentId
};
var sandbox = _grains.GetGrain<IManageSandbox>(issueNumber, suffix);
await sandbox.ScheduleCommitSandboxRun(commitRequest, markTaskCompleteRequest, sandboxRequest);
}
private async Task HandleClosingDevPlan(long issueNumber, string suffix, string org, string repo)
{
var devLead = _grains.GetGrain<ILeadDevelopment>(issueNumber, suffix);
var lookup = _grains.GetGrain<ILookupMetadata>(suffix);
var parentIssue = await lookup.GetMetadata((int)issueNumber);
var conductor = _grains.GetGrain<IOrchestrateWorkflows>(parentIssue.IssueNumber, suffix);
var plan = await devLead.GetLatestPlan();
await conductor.ImplementationFlow(plan, org, repo, parentIssue.IssueNumber);
await _ghService.MarkTaskComplete(new MarkTaskCompleteRequest
{
Org = org,
Repo = repo,
CommentId = parentIssue.CommentId
});
}
private async Task HandleClosingReadme(long issueNumber, string suffix, string org, string repo)
{
var pm = _grains.GetGrain<IManageProduct>(issueNumber, suffix);
var readme = await pm.GetLastMessage();
var lookup = _grains.GetGrain<ILookupMetadata>(suffix);
var parentIssue = await lookup.GetMetadata((int)issueNumber);
await _azService.Store(new SaveOutputRequest
{
ParentIssueNumber = parentIssue.IssueNumber,
IssueNumber = (int)issueNumber,
Output = readme,
Extension = "md",
Directory = "output",
FileName = "readme",
Org = org,
Repo = repo
});
await _ghService.CommitToBranch(new CommitRequest
{
Dir = "output",
Org = org,
Repo = repo,
ParentNumber = parentIssue.IssueNumber,
Number = (int)issueNumber,
Branch = $"sk-{parentIssue.IssueNumber}"
});
await _ghService.MarkTaskComplete(new MarkTaskCompleteRequest
{
Org = org,
Repo = repo,
CommentId = parentIssue.CommentId
IssueNumber = issueNumber
});
}
private async Task HandleNewAsk(long issueNumber, string skillName, string functionName, string suffix, string input, string org, string repo)
{
if (skillName == "Do" && functionName == "It")
var streamProvider = _client.GetStreamProvider("StreamProvider");
var streamId = StreamId.Create(suffix, issueNumber.ToString());
var stream = streamProvider.GetStream<Event>(streamId);
var eventType = (skillName, functionName) switch
{
var conductor = _grains.GetGrain<IOrchestrateWorkflows>(issueNumber, suffix);
await conductor.InitialFlow(input, org, repo, issueNumber);
var streamProvider = client.GetStreamProvider("StreamProvider");
var streamId = StreamId.Create(suffix, issueNumber.ToString());
var stream = streamProvider.GetStream<Event>(streamId);
await stream.OnNextAsync(new Event{
Type = EventType.NewAsk,
Message = input,
Org = org,
Repo = repo,
IssueNumber = issueNumber
});
}
else if (skillName == "Repo" && functionName == "Ingest")
("Do", "It") => EventType.NewAsk,
(nameof(PM), nameof(PM.Readme)) => EventType.NewAskReadme,
(nameof(DevLead), nameof(DevLead.Plan)) => EventType.NewAskPlan,
(nameof(Developer), nameof(Developer.Implement)) => EventType.NewAskImplement,
_ => EventType.NewAsk
};
await stream.OnNextAsync(new Event
{
var ingestor = _grains.GetGrain<IIngestRepo>(suffix);
await ingestor.IngestionFlow(org, repo, "main");
}
else if (skillName == nameof(PM) && functionName == nameof(PM.Readme))
{
var pm = _grains.GetGrain<IManageProduct>(issueNumber, suffix);
var readme = await pm.CreateReadme(input);
await _ghService.PostComment(new PostCommentRequest
{
Org = org,
Repo = repo,
Number = (int)issueNumber,
Content = string.IsNullOrEmpty(readme)? "Sorry, something went wrong": readme
});
}
else if (skillName == nameof(DevLead) && functionName == nameof(DevLead.Plan))
{
var devLead = _grains.GetGrain<ILeadDevelopment>(issueNumber, suffix);
var plan = await devLead.CreatePlan(input);
await _ghService.PostComment(new PostCommentRequest
{
Org = org,
Repo = repo,
Number = (int)issueNumber,
Content = string.IsNullOrEmpty(plan)? "Sorry, something went wrong":plan
});
}
else if (skillName == nameof(Developer) && functionName == nameof(Developer.Implement))
{
var dev = _grains.GetGrain<IDevelopCode>(issueNumber, suffix);
var code = await dev.GenerateCode(input);
await _ghService.PostComment(new PostCommentRequest
{
Org = org,
Repo = repo,
Number = (int)issueNumber,
Content = string.IsNullOrEmpty(code)? "Sorry, something went wrong":code
});
}
else { }// something went wrong
Type = eventType,
Message = input,
Org = org,
Repo = repo,
IssueNumber = issueNumber
});
// else if (skillName == "Repo" && functionName == "Ingest")
// {
// var ingestor = _grains.GetGrain<IIngestRepo>(suffix);
// await ingestor.IngestionFlow(org, repo, "main");
// }
}
}

View File

@@ -15,13 +15,31 @@ public class DeveloperLead : SemanticPersona, ILeadDevelopment
private readonly ISemanticTextMemory _memory;
private readonly ILogger<DeveloperLead> _logger;
private readonly IManageGithub _ghService;
protected override string MemorySegment => "dev-lead-memory";
public DeveloperLead([PersistentState("state", "messages")] IPersistentState<SemanticPersonaState> state,IKernel kernel, ISemanticTextMemory memory, ILogger<DeveloperLead> logger) : base(state)
public DeveloperLead([PersistentState("state", "messages")] IPersistentState<SemanticPersonaState> state, IKernel kernel, ISemanticTextMemory memory, ILogger<DeveloperLead> logger, IManageGithub ghService) : base(state)
{
_kernel = kernel;
_memory = memory;
_logger = logger;
_ghService = ghService;
}
public async Task CreateIssue(string org, string repo, long parentNumber, string input)
{
var devLeadIssue = await _ghService.CreateIssue(new CreateIssueRequest
{
Label = $"{nameof(DevLead)}.{nameof(DevLead.Plan)}",
Org = org,
Repo = repo,
Input = input,
ParentNumber = parentNumber
});
_state.State.ParentIssueNumber = parentNumber;
await _state.WriteStateAsync();
}
public async Task<string> CreatePlan(string ask)
{
@@ -49,7 +67,7 @@ public class DeveloperLead : SemanticPersona, ILeadDevelopment
UserType = ChatUserType.Agent
});
await _state.WriteStateAsync();
return resultMessage;
}
catch (Exception ex)
@@ -59,6 +77,23 @@ public class DeveloperLead : SemanticPersona, ILeadDevelopment
}
}
public async Task ClosePlan()
{
// var devLead = _grains.GetGrain<ILeadDevelopment>(issueNumber, suffix);
// var lookup = _grains.GetGrain<ILookupMetadata>(suffix);
// var parentIssue = await lookup.GetMetadata((int)issueNumber);
// var conductor = _grains.GetGrain<IOrchestrateWorkflows>(parentIssue.IssueNumber, suffix);
// var plan = await devLead.GetLatestPlan();
// await conductor.ImplementationFlow(plan, org, repo, parentIssue.IssueNumber);
// await _ghService.MarkTaskComplete(new MarkTaskCompleteRequest
// {
// Org = org,
// Repo = repo,
// CommentId = parentIssue.CommentId
// });
}
public Task<DevLeadPlanResponse> GetLatestPlan()
{
var plan = _state.State.History.Last().Message;
@@ -71,6 +106,7 @@ public class DeveloperLead : SemanticPersona, ILeadDevelopment
switch (item.Type)
{
case EventType.NewAsk:
await CreateIssue(item.Org, item.Repo, item.IssueNumber, item.Message);
break;
default:
break;

View File

@@ -14,16 +14,32 @@ public class Dev : SemanticPersona, IDevelopCode
private readonly IKernel _kernel;
private readonly ISemanticTextMemory _memory;
private readonly ILogger<Dev> _logger;
private readonly IManageGithub _ghService;
protected override string MemorySegment => "dev-memory";
public Dev([PersistentState("state", "messages")] IPersistentState<SemanticPersonaState> state, IKernel kernel, ISemanticTextMemory memory, ILogger<Dev> logger) : base(state)
public Dev([PersistentState("state", "messages")] IPersistentState<SemanticPersonaState> state, IKernel kernel, ISemanticTextMemory memory, ILogger<Dev> logger, IManageGithub ghService) : base(state)
{
_kernel = kernel;
_memory = memory;
_logger = logger;
_ghService = ghService;
}
public async Task CreateIssue(string org, string repo, long parentNumber, string input)
{
var devLeadIssue = await _ghService.CreateIssue(new CreateIssueRequest
{
Label = $"{nameof(Developer)}.{nameof(Developer.Implement)}",
Org = org,
Repo = repo,
Input = input,
ParentNumber = parentNumber
});
_state.State.ParentIssueNumber = parentNumber;
await _state.WriteStateAsync();
}
public async Task<string> GenerateCode(string ask)
{
try
@@ -76,6 +92,52 @@ public class Dev : SemanticPersona, IDevelopCode
throw new NotImplementedException();
}
public async Task CloseImplementation()
{
// var dev = _grains.GetGrain<IDevelopCode>(issueNumber, suffix);
// var code = await dev.GetLastMessage();
// var lookup = _grains.GetGrain<ILookupMetadata>(suffix);
// var parentIssue = await lookup.GetMetadata((int)issueNumber);
// await _azService.Store(new SaveOutputRequest
// {
// ParentIssueNumber = parentIssue.IssueNumber,
// IssueNumber = (int)issueNumber,
// Output = code,
// Extension = "sh",
// Directory = "output",
// FileName = "run",
// Org = org,
// Repo = repo
// });
// var sandboxRequest = new SandboxRequest
// {
// Org = org,
// Repo = repo,
// IssueNumber = (int)issueNumber,
// ParentIssueNumber = parentIssue.IssueNumber
// };
// await _azService.RunInSandbox(sandboxRequest);
// var commitRequest = new CommitRequest
// {
// Dir = "output",
// Org = org,
// Repo = repo,
// ParentNumber = parentIssue.IssueNumber,
// Number = (int)issueNumber,
// Branch = $"sk-{parentIssue.IssueNumber}"
// };
// var markTaskCompleteRequest = new MarkTaskCompleteRequest
// {
// Org = org,
// Repo = repo,
// CommentId = parentIssue.CommentId
// };
// var sandbox = _grains.GetGrain<IManageSandbox>(issueNumber, suffix);
// await sandbox.ScheduleCommitSandboxRun(commitRequest, markTaskCompleteRequest, sandboxRequest);
}
public async Task<UnderstandingResult> BuildUnderstanding(string content)
{
try

View File

@@ -14,13 +14,45 @@ public class ProductManager : SemanticPersona, IManageProduct
private readonly ISemanticTextMemory _memory;
private readonly ILogger<ProductManager> _logger;
private readonly IManageGithub _ghService;
protected override string MemorySegment => "pm-memory";
public ProductManager([PersistentState("state", "messages")] IPersistentState<SemanticPersonaState> state, IKernel kernel, ISemanticTextMemory memory, ILogger<ProductManager> logger) : base(state)
public ProductManager([PersistentState("state", "messages")] IPersistentState<SemanticPersonaState> state, IKernel kernel, ISemanticTextMemory memory, ILogger<ProductManager> logger, IManageGithub ghService) : base(state)
{
_kernel = kernel;
_memory = memory;
_logger = logger;
_ghService = ghService;
}
public async override Task HandleEvent(Event item, StreamSequenceToken? token)
{
switch (item.Type)
{
case EventType.NewAsk:
await CreateIssue(item.Org, item.Repo, item.IssueNumber, item.Message);
break;
default:
break;
}
}
public async Task CreateIssue(string org, string repo, long parentNumber, string input)
{
// TODO: Create branch and PR
var pmIssue = await _ghService.CreateIssue(new CreateIssueRequest
{
Label = $"{nameof(PM)}.{nameof(PM.Readme)}",
Org = org,
Repo = repo,
Input = input,
ParentNumber = parentNumber
});
_state.State.ParentIssueNumber = parentNumber;
await _state.WriteStateAsync();
}
public async Task<string> CreateReadme(string ask)
{
@@ -57,14 +89,37 @@ public class ProductManager : SemanticPersona, IManageProduct
}
}
public async override Task HandleEvent(Event item, StreamSequenceToken? token)
public async Task CloseReadme()
{
switch (item.Type)
{
case EventType.NewAsk:
break;
default:
break;
}
// var pm = _grains.GetGrain<IManageProduct>(issueNumber, suffix);
// var readme = await pm.GetLastMessage();
// var lookup = _grains.GetGrain<ILookupMetadata>(suffix);
// var parentIssue = await lookup.GetMetadata((int)issueNumber);
// await _azService.Store(new SaveOutputRequest
// {
// ParentIssueNumber = parentIssue.IssueNumber,
// IssueNumber = (int)issueNumber,
// Output = readme,
// Extension = "md",
// Directory = "output",
// FileName = "readme",
// Org = org,
// Repo = repo
// });
// await _ghService.CommitToBranch(new CommitRequest
// {
// Dir = "output",
// Org = org,
// Repo = repo,
// ParentNumber = parentIssue.IssueNumber,
// Number = (int)issueNumber,
// Branch = $"sk-{parentIssue.IssueNumber}"
// });
// await _ghService.MarkTaskComplete(new MarkTaskCompleteRequest
// {
// Org = org,
// Repo = repo,
// CommentId = parentIssue.CommentId
// });
}
}

View File

@@ -86,6 +86,7 @@ public class SemanticPersonaState
{
public List<ChatHistoryItem> History { get; set; }
public string Understanding { get; set; }
public long ParentIssueNumber { get; set; }
}
public enum ChatUserType

View File

@@ -10,8 +10,11 @@ public class Event
public enum EventType
{
NewAsk,
NewAskReadme,
NewAskPlan,
NewAskImplement,
ChainClosed,
ReadmeCreated,
PlanSubstemCreated,
PlanSubstepCreated,
CodeCreated
}