From d55c2fa4b9fc94b236eb51bb82eb3068770b5e82 Mon Sep 17 00:00:00 2001 From: Kosta Petan Date: Mon, 19 Feb 2024 14:48:17 +0000 Subject: [PATCH] streaming WIP --- src/apps/gh-flow/Program.cs | 3 +- .../Services/GithubWebHookProcessor.cs | 124 +++++++++++------- .../Actors/DevLead/DeveloperLead.cs | 2 +- .../Actors/ProductManager/ProductManager.cs | 2 +- src/libs/Microsoft.AI.DevTeam/Events/Event.cs | 7 + 5 files changed, 85 insertions(+), 53 deletions(-) diff --git a/src/apps/gh-flow/Program.cs b/src/apps/gh-flow/Program.cs index b841bba5e..ea154c051 100644 --- a/src/apps/gh-flow/Program.cs +++ b/src/apps/gh-flow/Program.cs @@ -67,8 +67,7 @@ builder.Host.UseOrleans(siloBuilder => if (builder.Environment.IsDevelopment()) { var connectionString = builder.Configuration.GetValue("AzureOptions:CosmosConnectionString"); - siloBuilder.AddMemoryStreams("StreamProvider") - .AddMemoryGrainStorage("PubSubStore"); + siloBuilder.AddMemoryStreams("StreamProvider"); siloBuilder.UseCosmosReminderService( o => { o.ConfigureCosmosClient(connectionString); diff --git a/src/apps/gh-flow/Services/GithubWebHookProcessor.cs b/src/apps/gh-flow/Services/GithubWebHookProcessor.cs index 4f470aff9..376fe14dc 100644 --- a/src/apps/gh-flow/Services/GithubWebHookProcessor.cs +++ b/src/apps/gh-flow/Services/GithubWebHookProcessor.cs @@ -24,22 +24,30 @@ public sealed class GithubWebHookProcessor : WebhookEventProcessor } protected override async Task ProcessIssuesWebhookAsync(WebhookHeaders headers, IssuesEvent issuesEvent, IssuesAction action) { - var org = issuesEvent.Organization.Login; - var repo = issuesEvent.Repository.Name; - var issueNumber = issuesEvent.Issue.Number; - var input = issuesEvent.Issue.Body; - // Assumes the label follows the following convention: Skill.Function example: PM.Readme - var labels = issuesEvent.Issue.Labels.First().Name.Split("."); - var skillName = labels[0]; - var functionName = labels[1]; - var suffix = $"{org}-{repo}"; - if (issuesEvent.Action == IssuesAction.Opened) + try { - await HandleNewAsk(issueNumber, skillName, functionName, suffix, input, org, repo); + _logger.LogInformation("Processing issue event"); + var org = issuesEvent.Organization.Login; + var repo = issuesEvent.Repository.Name; + var issueNumber = issuesEvent.Issue.Number; + var input = issuesEvent.Issue.Body; + // Assumes the label follows the following convention: Skill.Function example: PM.Readme + var labels = issuesEvent.Issue.Labels.First().Name.Split("."); + var skillName = labels[0]; + var functionName = labels[1]; + var suffix = $"{org}-{repo}"; + if (issuesEvent.Action == IssuesAction.Opened) + { + await HandleNewAsk(issueNumber, skillName, functionName, suffix, input, org, repo); + } + else if (issuesEvent.Action == IssuesAction.Closed && issuesEvent.Issue.User.Type.Value == UserType.Bot) + { + await HandleClosingIssue(issueNumber, skillName, functionName, suffix, org, repo); + } } - else if (issuesEvent.Action == IssuesAction.Closed && issuesEvent.Issue.User.Type.Value == UserType.Bot) + catch (System.Exception) { - await HandleClosingIssue(issueNumber, skillName, functionName, suffix, org, repo); + _logger.LogError("Processing issue event"); } } @@ -48,20 +56,29 @@ public sealed class GithubWebHookProcessor : WebhookEventProcessor IssueCommentEvent issueCommentEvent, IssueCommentAction action) { - var org = issueCommentEvent.Organization.Login; - var repo = issueCommentEvent.Repository.Name; - var issueNumber = issueCommentEvent.Issue.Number; - var input = issueCommentEvent.Issue.Body; - // Assumes the label follows the following convention: Skill.Function example: PM.Readme - var labels = issueCommentEvent.Issue.Labels.First().Name.Split("."); - var skillName = labels[0]; - var functionName = labels[1]; - var suffix = $"{org}-{repo}"; - // we only resond to non-bot comments - if (issueCommentEvent.Sender.Type.Value != UserType.Bot) + try { - await HandleNewAsk(issueNumber, skillName, functionName, suffix, input, org, repo); + _logger.LogInformation("Processing issue comment event"); + var org = issueCommentEvent.Organization.Login; + var repo = issueCommentEvent.Repository.Name; + var issueNumber = issueCommentEvent.Issue.Number; + var input = issueCommentEvent.Issue.Body; + // Assumes the label follows the following convention: Skill.Function example: PM.Readme + var labels = issueCommentEvent.Issue.Labels.First().Name.Split("."); + var skillName = labels[0]; + var functionName = labels[1]; + var suffix = $"{org}-{repo}"; + // we only resond to non-bot comments + if (issueCommentEvent.Sender.Type.Value != UserType.Bot) + { + await HandleNewAsk(issueNumber, skillName, functionName, suffix, input, org, repo); + } } + catch (System.Exception) + { + _logger.LogError("Processing issue comment event"); + } + } private async Task HandleClosingIssue(long issueNumber, string skillName, string functionName, string suffix, string org, string repo) @@ -81,32 +98,41 @@ public sealed class GithubWebHookProcessor : WebhookEventProcessor private async Task HandleNewAsk(long issueNumber, string skillName, string functionName, string suffix, string input, string org, string repo) { - var streamProvider = _client.GetStreamProvider("StreamProvider"); - var streamId = StreamId.Create("DevPersonas", suffix+issueNumber.ToString()); - var stream = streamProvider.GetStream(streamId); - - var eventType = (skillName, functionName) switch + try { - ("Do", "It") => EventType.NewAsk, - (nameof(PM), nameof(PM.Readme)) => EventType.NewAskReadme, - (nameof(DevLead), nameof(DevLead.Plan)) => EventType.NewAskPlan, - (nameof(Developer), nameof(Developer.Implement)) => EventType.NewAskImplement, - _ => EventType.NewAsk - }; - await stream.OnNextAsync(new Event - { - Type = eventType, - Message = input, - Org = org, - Repo = repo, - IssueNumber = issueNumber - }); + _logger.LogInformation("Handling new ask"); + var streamProvider = _client.GetStreamProvider("StreamProvider"); + var streamId = StreamId.Create("DevPersonas", suffix+issueNumber.ToString()); + var stream = streamProvider.GetStream(streamId); - // else if (skillName == "Repo" && functionName == "Ingest") - // { - // var ingestor = _grains.GetGrain(suffix); - // await ingestor.IngestionFlow(org, repo, "main"); - // } + var eventType = (skillName, functionName) switch + { + ("Do", "It") => EventType.NewAsk, + (nameof(PM), nameof(PM.Readme)) => EventType.NewAskReadme, + (nameof(DevLead), nameof(DevLead.Plan)) => EventType.NewAskPlan, + (nameof(Developer), nameof(Developer.Implement)) => EventType.NewAskImplement, + _ => EventType.NewAsk + }; + await stream.OnNextAsync(new Event + { + Type = eventType, + Message = input, + Org = org, + Repo = repo, + IssueNumber = issueNumber + }); + + // else if (skillName == "Repo" && functionName == "Ingest") + // { + // var ingestor = _grains.GetGrain(suffix); + // await ingestor.IngestionFlow(org, repo, "main"); + // } + } + catch (System.Exception) + { + _logger.LogError("Handling new ask"); + } + } } diff --git a/src/libs/Microsoft.AI.DevTeam/Actors/DevLead/DeveloperLead.cs b/src/libs/Microsoft.AI.DevTeam/Actors/DevLead/DeveloperLead.cs index c6408737e..0a49b4881 100644 --- a/src/libs/Microsoft.AI.DevTeam/Actors/DevLead/DeveloperLead.cs +++ b/src/libs/Microsoft.AI.DevTeam/Actors/DevLead/DeveloperLead.cs @@ -30,7 +30,7 @@ public class DeveloperLead : SemanticPersona public async override Task OnActivateAsync(CancellationToken cancellationToken) { var streamProvider = this.GetStreamProvider("StreamProvider"); - var streamId = StreamId.Create("DevPersonas", this.GetPrimaryKey()); + var streamId = StreamId.Create("DevPersonas", this.GetPrimaryKeyString()); var stream = streamProvider.GetStream(streamId); await stream.SubscribeAsync(HandleEvent); diff --git a/src/libs/Microsoft.AI.DevTeam/Actors/ProductManager/ProductManager.cs b/src/libs/Microsoft.AI.DevTeam/Actors/ProductManager/ProductManager.cs index 5e5624a71..581551064 100644 --- a/src/libs/Microsoft.AI.DevTeam/Actors/ProductManager/ProductManager.cs +++ b/src/libs/Microsoft.AI.DevTeam/Actors/ProductManager/ProductManager.cs @@ -31,7 +31,7 @@ public class ProductManager : SemanticPersona public async override Task OnActivateAsync(CancellationToken cancellationToken) { var streamProvider = this.GetStreamProvider("StreamProvider"); - var streamId = StreamId.Create("DevPersonas", this.GetPrimaryKey()); + var streamId = StreamId.Create("DevPersonas", this.GetPrimaryKeyString()); var stream = streamProvider.GetStream(streamId); await stream.SubscribeAsync(HandleEvent); diff --git a/src/libs/Microsoft.AI.DevTeam/Events/Event.cs b/src/libs/Microsoft.AI.DevTeam/Events/Event.cs index 18e71f42e..1dd3aaf8c 100644 --- a/src/libs/Microsoft.AI.DevTeam/Events/Event.cs +++ b/src/libs/Microsoft.AI.DevTeam/Events/Event.cs @@ -1,9 +1,16 @@ + +[GenerateSerializer] public class Event { + [Id(0)] public EventType Type { get; set; } + [Id(1)] public string Message { get; set; } + [Id(2)] public string Org { get; set; } + [Id(3)] public string Repo { get; set; } + [Id(4)] public long IssueNumber { get; set; } }