mirror of
https://github.com/microsoft/autogen.git
synced 2026-04-20 03:02:16 -04:00
improve App Host shutdown (#3727)
Improve sequence and exception handling for outpost shut down
This commit is contained in:
committed by
GitHub
parent
498854aed3
commit
dfd239105a
@@ -47,7 +47,6 @@ namespace Hello
|
||||
Message = goodbye
|
||||
}.ToCloudEvent(this.AgentId.Key);
|
||||
await PublishEvent(evt).ConfigureAwait(false);
|
||||
await Task.Delay(60000);
|
||||
await App.ShutdownAsync();
|
||||
}
|
||||
public async Task<string> SayHello(string ask)
|
||||
|
||||
@@ -99,11 +99,21 @@ public sealed class AgentWorkerRuntime : IHostedService, IDisposable, IAgentWork
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Time to shut down.
|
||||
break;
|
||||
}
|
||||
catch (Exception ex) when (!_shutdownCts.IsCancellationRequested)
|
||||
{
|
||||
_logger.LogError(ex, "Error reading from channel.");
|
||||
channel = RecreateChannel(channel);
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Shutdown requested.
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -113,33 +123,38 @@ public sealed class AgentWorkerRuntime : IHostedService, IDisposable, IAgentWork
|
||||
var outboundMessages = _outboundMessagesChannel.Reader;
|
||||
while (!_shutdownCts.IsCancellationRequested)
|
||||
{
|
||||
await outboundMessages.WaitToReadAsync().ConfigureAwait(false);
|
||||
|
||||
// Read the next message if we don't already have an unsent message
|
||||
// waiting to be sent.
|
||||
if (!outboundMessages.TryRead(out var message))
|
||||
try
|
||||
{
|
||||
break;
|
||||
}
|
||||
await outboundMessages.WaitToReadAsync().ConfigureAwait(false);
|
||||
|
||||
while (!_shutdownCts.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
// Read the next message if we don't already have an unsent message
|
||||
// waiting to be sent.
|
||||
if (!outboundMessages.TryRead(out var message))
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
while (!_shutdownCts.IsCancellationRequested)
|
||||
{
|
||||
await channel.RequestStream.WriteAsync(message, _shutdownCts.Token).ConfigureAwait(false);
|
||||
break;
|
||||
}
|
||||
catch (Exception ex) when (!_shutdownCts.IsCancellationRequested)
|
||||
{
|
||||
_logger.LogError(ex, "Error writing to channel.");
|
||||
channel = RecreateChannel(channel);
|
||||
continue;
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Shutdown requested.
|
||||
break;
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
// Time to shut down.
|
||||
break;
|
||||
}
|
||||
catch (Exception ex) when (!_shutdownCts.IsCancellationRequested)
|
||||
{
|
||||
_logger.LogError(ex, "Error writing to channel.");
|
||||
channel = RecreateChannel(channel);
|
||||
continue;
|
||||
}
|
||||
catch
|
||||
{
|
||||
// Shutdown requested.
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -286,10 +301,6 @@ public sealed class AgentWorkerRuntime : IHostedService, IDisposable, IAgentWork
|
||||
public async Task StopAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
_shutdownCts.Cancel();
|
||||
lock (_channelLock)
|
||||
{
|
||||
_channel?.Dispose();
|
||||
}
|
||||
|
||||
_outboundMessagesChannel.Writer.TryComplete();
|
||||
|
||||
@@ -302,6 +313,10 @@ public sealed class AgentWorkerRuntime : IHostedService, IDisposable, IAgentWork
|
||||
{
|
||||
await writeTask.ConfigureAwait(false);
|
||||
}
|
||||
lock (_channelLock)
|
||||
{
|
||||
_channel?.Dispose();
|
||||
}
|
||||
}
|
||||
|
||||
public ValueTask SendRequest(RpcRequest request)
|
||||
|
||||
@@ -49,7 +49,7 @@ public static class App
|
||||
{
|
||||
throw new InvalidOperationException("Client not started");
|
||||
}
|
||||
await RuntimeApp!.StopAsync();
|
||||
await ClientApp.StopAsync();
|
||||
await RuntimeApp!.StopAsync();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,6 +87,9 @@ internal sealed class WorkerProcessConnection : IAsyncDisposable
|
||||
_gateway.OnReceivedMessageAsync(this, message).Ignore();
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
}
|
||||
finally
|
||||
{
|
||||
_shutdownCancellationToken.Cancel();
|
||||
@@ -104,6 +107,9 @@ internal sealed class WorkerProcessConnection : IAsyncDisposable
|
||||
await ResponseStream.WriteAsync(message);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException)
|
||||
{
|
||||
}
|
||||
finally
|
||||
{
|
||||
_shutdownCancellationToken.Cancel();
|
||||
|
||||
Reference in New Issue
Block a user