streaming WIP

This commit is contained in:
Kosta Petan
2024-02-19 14:48:17 +00:00
parent 4c98edc176
commit d55c2fa4b9
5 changed files with 85 additions and 53 deletions

View File

@@ -67,8 +67,7 @@ builder.Host.UseOrleans(siloBuilder =>
if (builder.Environment.IsDevelopment())
{
var connectionString = builder.Configuration.GetValue<string>("AzureOptions:CosmosConnectionString");
siloBuilder.AddMemoryStreams("StreamProvider")
.AddMemoryGrainStorage("PubSubStore");
siloBuilder.AddMemoryStreams("StreamProvider");
siloBuilder.UseCosmosReminderService( o =>
{
o.ConfigureCosmosClient(connectionString);

View File

@@ -24,22 +24,30 @@ public sealed class GithubWebHookProcessor : WebhookEventProcessor
}
protected override async Task ProcessIssuesWebhookAsync(WebhookHeaders headers, IssuesEvent issuesEvent, IssuesAction action)
{
var org = issuesEvent.Organization.Login;
var repo = issuesEvent.Repository.Name;
var issueNumber = issuesEvent.Issue.Number;
var input = issuesEvent.Issue.Body;
// Assumes the label follows the following convention: Skill.Function example: PM.Readme
var labels = issuesEvent.Issue.Labels.First().Name.Split(".");
var skillName = labels[0];
var functionName = labels[1];
var suffix = $"{org}-{repo}";
if (issuesEvent.Action == IssuesAction.Opened)
try
{
await HandleNewAsk(issueNumber, skillName, functionName, suffix, input, org, repo);
_logger.LogInformation("Processing issue event");
var org = issuesEvent.Organization.Login;
var repo = issuesEvent.Repository.Name;
var issueNumber = issuesEvent.Issue.Number;
var input = issuesEvent.Issue.Body;
// Assumes the label follows the following convention: Skill.Function example: PM.Readme
var labels = issuesEvent.Issue.Labels.First().Name.Split(".");
var skillName = labels[0];
var functionName = labels[1];
var suffix = $"{org}-{repo}";
if (issuesEvent.Action == IssuesAction.Opened)
{
await HandleNewAsk(issueNumber, skillName, functionName, suffix, input, org, repo);
}
else if (issuesEvent.Action == IssuesAction.Closed && issuesEvent.Issue.User.Type.Value == UserType.Bot)
{
await HandleClosingIssue(issueNumber, skillName, functionName, suffix, org, repo);
}
}
else if (issuesEvent.Action == IssuesAction.Closed && issuesEvent.Issue.User.Type.Value == UserType.Bot)
catch (System.Exception)
{
await HandleClosingIssue(issueNumber, skillName, functionName, suffix, org, repo);
_logger.LogError("Processing issue event");
}
}
@@ -48,20 +56,29 @@ public sealed class GithubWebHookProcessor : WebhookEventProcessor
IssueCommentEvent issueCommentEvent,
IssueCommentAction action)
{
var org = issueCommentEvent.Organization.Login;
var repo = issueCommentEvent.Repository.Name;
var issueNumber = issueCommentEvent.Issue.Number;
var input = issueCommentEvent.Issue.Body;
// Assumes the label follows the following convention: Skill.Function example: PM.Readme
var labels = issueCommentEvent.Issue.Labels.First().Name.Split(".");
var skillName = labels[0];
var functionName = labels[1];
var suffix = $"{org}-{repo}";
// we only resond to non-bot comments
if (issueCommentEvent.Sender.Type.Value != UserType.Bot)
try
{
await HandleNewAsk(issueNumber, skillName, functionName, suffix, input, org, repo);
_logger.LogInformation("Processing issue comment event");
var org = issueCommentEvent.Organization.Login;
var repo = issueCommentEvent.Repository.Name;
var issueNumber = issueCommentEvent.Issue.Number;
var input = issueCommentEvent.Issue.Body;
// Assumes the label follows the following convention: Skill.Function example: PM.Readme
var labels = issueCommentEvent.Issue.Labels.First().Name.Split(".");
var skillName = labels[0];
var functionName = labels[1];
var suffix = $"{org}-{repo}";
// we only resond to non-bot comments
if (issueCommentEvent.Sender.Type.Value != UserType.Bot)
{
await HandleNewAsk(issueNumber, skillName, functionName, suffix, input, org, repo);
}
}
catch (System.Exception)
{
_logger.LogError("Processing issue comment event");
}
}
private async Task HandleClosingIssue(long issueNumber, string skillName, string functionName, string suffix, string org, string repo)
@@ -81,32 +98,41 @@ public sealed class GithubWebHookProcessor : WebhookEventProcessor
private async Task HandleNewAsk(long issueNumber, string skillName, string functionName, string suffix, string input, string org, string repo)
{
var streamProvider = _client.GetStreamProvider("StreamProvider");
var streamId = StreamId.Create("DevPersonas", suffix+issueNumber.ToString());
var stream = streamProvider.GetStream<Event>(streamId);
var eventType = (skillName, functionName) switch
try
{
("Do", "It") => EventType.NewAsk,
(nameof(PM), nameof(PM.Readme)) => EventType.NewAskReadme,
(nameof(DevLead), nameof(DevLead.Plan)) => EventType.NewAskPlan,
(nameof(Developer), nameof(Developer.Implement)) => EventType.NewAskImplement,
_ => EventType.NewAsk
};
await stream.OnNextAsync(new Event
{
Type = eventType,
Message = input,
Org = org,
Repo = repo,
IssueNumber = issueNumber
});
_logger.LogInformation("Handling new ask");
var streamProvider = _client.GetStreamProvider("StreamProvider");
var streamId = StreamId.Create("DevPersonas", suffix+issueNumber.ToString());
var stream = streamProvider.GetStream<Event>(streamId);
// else if (skillName == "Repo" && functionName == "Ingest")
// {
// var ingestor = _grains.GetGrain<IIngestRepo>(suffix);
// await ingestor.IngestionFlow(org, repo, "main");
// }
var eventType = (skillName, functionName) switch
{
("Do", "It") => EventType.NewAsk,
(nameof(PM), nameof(PM.Readme)) => EventType.NewAskReadme,
(nameof(DevLead), nameof(DevLead.Plan)) => EventType.NewAskPlan,
(nameof(Developer), nameof(Developer.Implement)) => EventType.NewAskImplement,
_ => EventType.NewAsk
};
await stream.OnNextAsync(new Event
{
Type = eventType,
Message = input,
Org = org,
Repo = repo,
IssueNumber = issueNumber
});
// else if (skillName == "Repo" && functionName == "Ingest")
// {
// var ingestor = _grains.GetGrain<IIngestRepo>(suffix);
// await ingestor.IngestionFlow(org, repo, "main");
// }
}
catch (System.Exception)
{
_logger.LogError("Handling new ask");
}
}
}

View File

@@ -30,7 +30,7 @@ public class DeveloperLead : SemanticPersona
public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider("StreamProvider");
var streamId = StreamId.Create("DevPersonas", this.GetPrimaryKey());
var streamId = StreamId.Create("DevPersonas", this.GetPrimaryKeyString());
var stream = streamProvider.GetStream<Event>(streamId);
await stream.SubscribeAsync(HandleEvent);

View File

@@ -31,7 +31,7 @@ public class ProductManager : SemanticPersona
public async override Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider("StreamProvider");
var streamId = StreamId.Create("DevPersonas", this.GetPrimaryKey());
var streamId = StreamId.Create("DevPersonas", this.GetPrimaryKeyString());
var stream = streamProvider.GetStream<Event>(streamId);
await stream.SubscribeAsync(HandleEvent);

View File

@@ -1,9 +1,16 @@
[GenerateSerializer]
public class Event
{
[Id(0)]
public EventType Type { get; set; }
[Id(1)]
public string Message { get; set; }
[Id(2)]
public string Org { get; set; }
[Id(3)]
public string Repo { get; set; }
[Id(4)]
public long IssueNumber { get; set; }
}