streaming working

This commit is contained in:
Kosta Petan
2024-02-20 13:18:57 +01:00
parent a3a3fcab40
commit 2158e8cc84
3 changed files with 22 additions and 111 deletions

View File

@@ -64,100 +64,10 @@ builder.Services.AddSingleton<IAnalyzeCode, CodeAnalyzer>();
builder.Host.UseOrleans(siloBuilder =>
{
if (builder.Environment.IsDevelopment())
{
var connectionString = builder.Configuration.GetValue<string>("AzureOptions:CosmosConnectionString");
//siloBuilder.AddMemoryStreams("StreamProvider");
siloBuilder.UseCosmosReminderService(o =>
{
o.ConfigureCosmosClient(connectionString);
o.ContainerName = "reminders";
o.DatabaseName = "devteam";
o.IsResourceCreationEnabled = true;
});
siloBuilder.AddCosmosGrainStorage(
name: "PubSubStore",
configureOptions: o =>
{
o.ConfigureCosmosClient(connectionString);
o.ContainerName = "pubsubstore";
o.DatabaseName = "pubsub";
o.IsResourceCreationEnabled = true;
});
siloBuilder.AddCosmosGrainStorage(
name: "messages",
configureOptions: o =>
{
o.ConfigureCosmosClient(connectionString);
o.ContainerName = "persistence";
o.DatabaseName = "devteam";
o.IsResourceCreationEnabled = true;
});
var ehConnectionString = builder.Configuration.GetValue<string>("AzureOptions:EventHubsConnectionString");
var storageConnectionString = builder.Configuration.GetValue<string>("AzureOptions:StorageAccountConnectionString");
siloBuilder.UseLocalhostClustering()
.AddEventHubStreams("StreamProvider", (ISiloEventHubStreamConfigurator configurator) =>
{
configurator.ConfigureEventHub(builder => builder.Configure(options =>
{
options.ConfigureEventHubConnection(
ehConnectionString,
"sk-dev-team",
"$Default");
}));
configurator.UseAzureTableCheckpointer(
builder => builder.Configure(options =>
{
options.ConfigureTableServiceClient(storageConnectionString);
options.PersistInterval = TimeSpan.FromSeconds(10);
}));
}); ;
}
else
{
var cosmosDbconnectionString = builder.Configuration.GetValue<string>("AzureOptions:CosmosConnectionString");
siloBuilder.Configure<ClusterOptions>(options =>
{
options.ClusterId = "ai-dev-cluster";
options.ServiceId = "ai-dev-cluster";
});
siloBuilder.Configure<SiloMessagingOptions>(options =>
{
options.ResponseTimeout = TimeSpan.FromMinutes(3);
options.SystemResponseTimeout = TimeSpan.FromMinutes(3);
});
siloBuilder.Configure<ClientMessagingOptions>(options =>
{
options.ResponseTimeout = TimeSpan.FromMinutes(3);
});
siloBuilder.UseCosmosClustering(o =>
{
o.ConfigureCosmosClient(cosmosDbconnectionString);
o.ContainerName = "devteam";
o.DatabaseName = "clustering";
o.IsResourceCreationEnabled = true;
});
siloBuilder.UseCosmosReminderService(o =>
{
o.ConfigureCosmosClient(cosmosDbconnectionString);
o.ContainerName = "devteam";
o.DatabaseName = "reminders";
o.IsResourceCreationEnabled = true;
});
siloBuilder.AddCosmosGrainStorage(
name: "messages",
configureOptions: o =>
{
o.ConfigureCosmosClient(cosmosDbconnectionString);
o.ContainerName = "devteam";
o.DatabaseName = "persistence";
o.IsResourceCreationEnabled = true;
});
//TODO: Add streaming here
}
siloBuilder.UseLocalhostClustering()
.AddMemoryStreams("StreamProvider")
.AddMemoryGrainStorage("PubSubStore")
.AddMemoryGrainStorage("messages");
});
@@ -167,9 +77,8 @@ builder.Services.Configure<JsonSerializerOptions>(options =>
});
var app = builder.Build();
app.UseRouting();
app.UseEndpoints(endpoints =>
app.UseRouting()
.UseEndpoints(endpoints =>
{
endpoints.MapGitHubWebhooks();
});

View File

@@ -27,7 +27,7 @@ public sealed class GithubWebHookProcessor : WebhookEventProcessor
try
{
_logger.LogInformation("Processing issue event");
var org = issuesEvent.Organization.Login;
var org = issuesEvent.Organization.Login;
var repo = issuesEvent.Repository.Name;
var issueNumber = issuesEvent.Issue.Number;
var input = issuesEvent.Issue.Body;
@@ -38,10 +38,12 @@ public sealed class GithubWebHookProcessor : WebhookEventProcessor
var suffix = $"{org}-{repo}";
if (issuesEvent.Action == IssuesAction.Opened)
{
_logger.LogInformation("Processing HandleNewAsk");
await HandleNewAsk(issueNumber, skillName, functionName, suffix, input, org, repo);
}
else if (issuesEvent.Action == IssuesAction.Closed && issuesEvent.Issue.User.Type.Value == UserType.Bot)
{
_logger.LogInformation("Processing HandleClosingIssue");
await HandleClosingIssue(issueNumber, skillName, functionName, suffix, org, repo);
}
}
@@ -59,7 +61,7 @@ public sealed class GithubWebHookProcessor : WebhookEventProcessor
try
{
_logger.LogInformation("Processing issue comment event");
var org = issueCommentEvent.Organization.Login;
var org = issueCommentEvent.Organization.Login;
var repo = issueCommentEvent.Repository.Name;
var issueNumber = issueCommentEvent.Issue.Number;
var input = issueCommentEvent.Issue.Body;

View File

@@ -44,7 +44,7 @@ public class ProductManager : SemanticPersona
case EventType.NewAsk:
await CreateIssue(item.Org, item.Repo, item.IssueNumber, item.Message);
break;
case EventType.NewAskReadme:
case EventType.NewAskReadme:
await CreateReadme(item.Message);
break;
case EventType.ChainClosed:
@@ -57,18 +57,18 @@ public class ProductManager : SemanticPersona
public async Task CreateIssue(string org, string repo, long parentNumber, string input)
{
// TODO: Create branch and PR
var pmIssue = await _ghService.CreateIssue(new CreateIssueRequest
{
Label = $"{nameof(PM)}.{nameof(PM.Readme)}",
Org = org,
Repo = repo,
Input = input,
ParentNumber = parentNumber
});
//TODO: Create branch and PR
var pmIssue = await _ghService.CreateIssue(new CreateIssueRequest
{
Label = $"{nameof(PM)}.{nameof(PM.Readme)}",
Org = org,
Repo = repo,
Input = input,
ParentNumber = parentNumber
});
_state.State.ParentIssueNumber = parentNumber;
await _state.WriteStateAsync();
_state.State.ParentIssueNumber = parentNumber;
await _state.WriteStateAsync();
}
public async Task<string> CreateReadme(string ask)