streaming rewrite WIP

This commit is contained in:
Kosta Petan
2024-02-26 21:09:58 +01:00
parent 35325c0d51
commit 9c2da17ee7
7 changed files with 84 additions and 90 deletions

View File

@@ -49,7 +49,7 @@ public sealed class GithubWebHookProcessor : WebhookEventProcessor
else if (issuesEvent.Action == IssuesAction.Closed && issuesEvent.Issue.User.Type.Value == UserType.Bot)
{
_logger.LogInformation("Processing HandleClosingIssue");
await HandleClosingIssue(issueNumber, parentNumber, suffix, org, repo);
await HandleClosingIssue(issueNumber, parentNumber,skillName, labels[skillName], suffix, org, repo);
}
}
catch (System.Exception)
@@ -91,11 +91,18 @@ public sealed class GithubWebHookProcessor : WebhookEventProcessor
}
private async Task HandleClosingIssue(long issueNumber, long? parentNumber, string suffix, string org, string repo)
private async Task HandleClosingIssue(long issueNumber, long? parentNumber, string skillName, string functionName, string suffix, string org, string repo)
{
var streamProvider = _client.GetStreamProvider("StreamProvider");
var streamId = StreamId.Create(Consts.MainNamespace, suffix+issueNumber.ToString());
var stream = streamProvider.GetStream<Event>(streamId);
var eventType = (skillName, functionName) switch
{
(nameof(PM), nameof(PM.Readme)) => EventType.ReadmeChainClosed,
(nameof(DevLead), nameof(DevLead.Plan)) => EventType.DevPlanChainClosed,
(nameof(Developer), nameof(Developer.Implement)) => EventType.CodeChainClosed,
_ => EventType.NewAsk
};
var data = new Dictionary<string, string>
{
{ "org", org },
@@ -106,7 +113,7 @@ public sealed class GithubWebHookProcessor : WebhookEventProcessor
await stream.OnNextAsync(new Event
{
Type = EventType.ChainClosed,
Type = eventType,
Data = data
});
}

View File

@@ -26,11 +26,11 @@ public class AzureGenie : Agent
{
switch (item.Type)
{
case EventType.ReadmeChainClosed:
case EventType.ReadmeCreated:
//_azureService.Store();
// postEvent ReadmeStored
break;
case EventType.CodeChainClosed:
case EventType.CodeCreated:
// _azureService.Store();
// _azureService.RunInSandbox();
break;

View File

@@ -42,7 +42,7 @@ public class Dev : AiAgent
//await _ghService.PostComment(item.Data["org"], item.Data["repo"], long.Parse(item.Data["issueNumber"]), code);
// postEvent EventType.CodeGenerated
break;
case EventType.ChainClosed:
case EventType.CodeChainClosed:
await CloseImplementation();
// postEvent EventType.CodeFinished
break;

View File

@@ -38,20 +38,32 @@ public class DeveloperLead : AiAgent
{
case EventType.DevPlanRequested:
var plan = await CreatePlan(item.Message);
await PublishEvent(Consts.MainNamespace, this.GetPrimaryKeyString(), new Event {
Type = EventType.DevPlanGenerated,
Data = new Dictionary<string, string> {
await PublishEvent(Consts.MainNamespace, this.GetPrimaryKeyString(), new Event
{
Type = EventType.DevPlanGenerated,
Data = new Dictionary<string, string> {
{ "org", item.Data["org"] },
{ "repo", item.Data["repo"] },
{ "issueNumber", item.Data["issueNumber"] },
{ "plan", plan }
},
Message = plan
Message = plan
});
break;
case EventType.ChainClosed:
await ClosePlan(item.Data["org"], item.Data["repo"], long.Parse(item.Data["issueNumber"]), long.Parse(item.Data["parentNumber"]));
// postEvent EventType.DevPlanFinished
case EventType.DevPlanChainClosed:
var latestPlan = _state.State.History.Last().Message;
await PublishEvent(Consts.MainNamespace, this.GetPrimaryKeyString(), new Event
{
Type = EventType.DevPlanCreated,
Data = new Dictionary<string, string> {
{ "org", item.Data["org"] },
{ "repo", item.Data["repo"] },
{ "issueNumber", item.Data["issueNumber"] },
{"parentNumber", item.Data["parentNumber"]},
{ "plan", latestPlan }
},
Message = latestPlan
});
break;
default:
break;
@@ -69,43 +81,6 @@ public class DeveloperLead : AiAgent
return default;
}
}
public async Task ClosePlan(string org, string repo, long issueNumber, long parentNumber)
{
var plan = await GetLatestPlan();
var suffix = $"{org}-{repo}";
var streamProvider = this.GetStreamProvider("StreamProvider");
var streamId = StreamId.Create(Consts.MainNamespace, suffix+parentNumber.ToString());
var stream = streamProvider.GetStream<Event>(streamId);
var eventTasks = plan.steps.SelectMany(s => s.subtasks.Select(st => stream.OnNextAsync(new Event {
Type = EventType.NewAsk,
Data = new Dictionary<string, string>
{
{ "org", org },
{ "repo", repo },
{ "parentNumber", parentNumber.ToString()}
},
Message = st.prompt
})));
Task.WaitAll(eventTasks.ToArray());
//await conductor.ImplementationFlow(plan, org, repo, parentIssue.IssueNumber);
// await _ghService.MarkTaskComplete(new MarkTaskCompleteRequest
// {
// Org = org,
// Repo = repo,
// CommentId = _state.State.CommentId
// });
}
public Task<DevLeadPlanResponse> GetLatestPlan()
{
var plan = _state.State.History.Last().Message;
var response = JsonSerializer.Deserialize<DevLeadPlanResponse>(plan);
return Task.FromResult(response);
}
}
[GenerateSerializer]

View File

@@ -41,7 +41,7 @@ public class Hubber : Agent
case EventType.CodeGenerated:
await PostComment(item.Data["org"], item.Data["repo"], long.Parse(item.Data["issueNumber"]), item.Message);
break;
case EventType.DevPlanChainClosed:
case EventType.DevPlanCreated:
var plan = JsonSerializer.Deserialize<DevLeadPlanResponse>(item.Data["plan"]);
var devTasks = plan.steps.SelectMany(s => s.subtasks.Select(st => st.prompt)).Select(p =>
CreateIssue(item.Data["org"], item.Data["repo"], p, $"{nameof(Developer)}.{nameof(Developer.Implement)}", long.Parse(item.Data["parentNumber"])));

View File

@@ -45,17 +45,26 @@ public class ProductManager : AiAgent
},
Message = readme
});
//await _ghService.PostComment(item.Data["org"], item.Data["repo"], long.Parse(item.Data["issueNumber"]), readme);
// postEvent ReadmeGenerated
break;
case EventType.ChainClosed:
await CloseReadme();
// postEvent ReadmeFinished
case EventType.ReadmeChainClosed:
var lastReadme = _state.State.History.Last().Message;
await PublishEvent(Consts.MainNamespace, this.GetPrimaryKeyString(), new Event {
Type = EventType.ReadmeCreated,
Data = new Dictionary<string, string> {
{ "org", item.Data["org"] },
{ "repo", item.Data["repo"] },
{ "issueNumber", item.Data["issueNumber"] },
{ "readme", lastReadme },
{ "parentNumber", item.Data["parentNumber"] }
},
Message = lastReadme
});
break;
default:
break;
}
}
public async Task<string> CreateReadme(string ask)
{
try
@@ -69,37 +78,37 @@ public class ProductManager : AiAgent
}
}
public async Task CloseReadme()
{
// var pm = _grains.GetGrain<IManageProduct>(issueNumber, suffix);
// var readme = await pm.GetLastMessage();
// var lookup = _grains.GetGrain<ILookupMetadata>(suffix);
// var parentIssue = await lookup.GetMetadata((int)issueNumber);
// await _azService.Store(new SaveOutputRequest
// {
// ParentIssueNumber = parentIssue.IssueNumber,
// IssueNumber = (int)issueNumber,
// Output = readme,
// Extension = "md",
// Directory = "output",
// FileName = "readme",
// Org = org,
// Repo = repo
// });
// await _ghService.CommitToBranch(new CommitRequest
// {
// Dir = "output",
// Org = org,
// Repo = repo,
// ParentNumber = parentIssue.IssueNumber,
// Number = (int)issueNumber,
// Branch = $"sk-{parentIssue.IssueNumber}"
// });
// await _ghService.MarkTaskComplete(new MarkTaskCompleteRequest
// {
// Org = org,
// Repo = repo,
// CommentId = parentIssue.CommentId
// });
}
// public async Task CloseReadme()
// {
// var pm = _grains.GetGrain<IManageProduct>(issueNumber, suffix);
// var readme = await pm.GetLastMessage();
// var lookup = _grains.GetGrain<ILookupMetadata>(suffix);
// var parentIssue = await lookup.GetMetadata((int)issueNumber);
// await _azService.Store(new SaveOutputRequest
// {
// ParentIssueNumber = parentIssue.IssueNumber,
// IssueNumber = (int)issueNumber,
// Output = readme,
// Extension = "md",
// Directory = "output",
// FileName = "readme",
// Org = org,
// Repo = repo
// });
// await _ghService.CommitToBranch(new CommitRequest
// {
// Dir = "output",
// Org = org,
// Repo = repo,
// ParentNumber = parentIssue.IssueNumber,
// Number = (int)issueNumber,
// Branch = $"sk-{parentIssue.IssueNumber}"
// });
// await _ghService.MarkTaskComplete(new MarkTaskCompleteRequest
// {
// Org = org,
// Repo = repo,
// CommentId = parentIssue.CommentId
// });
// }
}

View File

@@ -24,5 +24,8 @@ public enum EventType
DevPlanChainClosed,
ReadmeRequested,
ReadmeStored,
SandboxRunFinished
SandboxRunFinished,
ReadmeCreated,
CodeCreated,
DevPlanCreated
}