diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go
index ee20349c7b..31a1bbbab4 100644
--- a/beacon-chain/node/node.go
+++ b/beacon-chain/node/node.go
@@ -112,7 +112,7 @@ func (b *BeaconNode) Start() {
 				log.Info("Already shutting down, interrupt more to panic", "times", i-1)
 			}
 		}
-		debug.Exit() // Ensure trace and CPU profile data are flushed.
+		debug.Exit(b.ctx) // Ensure trace and CPU profile data are flushed.
 		panic("Panic closing the beacon node")
 	}()

diff --git a/shared/debug/debug.go b/shared/debug/debug.go
index e28c9d6e47..e5cbc4026a 100644
--- a/shared/debug/debug.go
+++ b/shared/debug/debug.go
@@ -364,11 +364,15 @@ func startPProf(address string) {

 // Exit stops all running profiles, flushing their output to the
 // respective file.
-func Exit() {
-	if err := Handler.StopCPUProfile(); err != nil {
-		log.Errorf("Failed to stop CPU profiling: %v", err)
+func Exit(ctx *cli.Context) {
+	if traceFile := ctx.GlobalString(TraceFlag.Name); traceFile != "" {
+		if err := Handler.StopGoTrace(); err != nil {
+			log.Errorf("Failed to stop go tracing: %v", err)
+		}
 	}
-	if err := Handler.StopGoTrace(); err != nil {
-		log.Errorf("Failed to stop go tracing: %v", err)
+	if cpuFile := ctx.GlobalString(CPUProfileFlag.Name); cpuFile != "" {
+		if err := Handler.StopCPUProfile(); err != nil {
+			log.Errorf("Failed to stop CPU profiling: %v", err)
+		}
 	}
 }
diff --git a/validator/beacon/service.go b/validator/beacon/service.go
index 135c64a5aa..9e31cf622b 100644
--- a/validator/beacon/service.go
+++ b/validator/beacon/service.go
@@ -182,6 +182,11 @@ func (s *Service) listenForCrystallizedStates(client pb.BeaconServiceClient) {
 		if err == io.EOF {
 			break
 		}
+		// If context is canceled we stop the loop.
+		if s.ctx.Err() != nil {
+			log.Debugf("Context has been canceled so shutting down the loop: %v", s.ctx.Err())
+			break
+		}
 		if err != nil {
 			log.Errorf("Could not receive latest crystallized beacon state from stream: %v", err)
 			continue
@@ -247,6 +252,11 @@ func (s *Service) listenForProcessedAttestations(client pb.BeaconServiceClient) {
 		if err == io.EOF {
 			break
 		}
+		// If context is canceled we stop the loop.
+		if s.ctx.Err() != nil {
+			log.Debugf("Context has been canceled so shutting down the loop: %v", s.ctx.Err())
+			break
+		}
 		if err != nil {
 			log.Errorf("Could not receive latest attestation from stream: %v", err)
 			continue
diff --git a/validator/beacon/service_test.go b/validator/beacon/service_test.go
index b23f997599..6ea8e779ad 100644
--- a/validator/beacon/service_test.go
+++ b/validator/beacon/service_test.go
@@ -305,6 +305,22 @@ func TestListenForCrystallizedStates(t *testing.T) {

 	b.listenForCrystallizedStates(mockServiceClient)
 	testutil.AssertLogsContain(t, hook, "Validator selected as proposer")
+
+	// Test that the routine exits when context is closed
+	stream = internal.NewMockBeaconService_LatestCrystallizedStateClient(ctrl)
+
+	stream.EXPECT().Recv().Return(&pbp2p.CrystallizedState{}, nil)
+
+	mockServiceClient = internal.NewMockBeaconServiceClient(ctrl)
+	mockServiceClient.EXPECT().LatestCrystallizedState(
+		gomock.Any(),
+		gomock.Any(),
+	).Return(stream, nil)
+	b.cancel()
+
+	b.listenForCrystallizedStates(mockServiceClient)
+	testutil.AssertLogsContain(t, hook, "Context has been canceled so shutting down the loop")
+
 }

 func TestListenForProcessedAttestations(t *testing.T) {
@@ -355,4 +371,19 @@
 	b.listenForProcessedAttestations(mockServiceClient)
 	testutil.AssertLogsContain(t, hook, "stream creation failed")
 	testutil.AssertLogsContain(t, hook, "Could not receive latest attestation from stream")
+
+	// Test that the routine exits when context is closed
+	stream = internal.NewMockBeaconService_LatestAttestationClient(ctrl)
+
+	stream.EXPECT().Recv().Return(&pbp2p.AggregatedAttestation{}, nil)
+
+	mockServiceClient = internal.NewMockBeaconServiceClient(ctrl)
+	mockServiceClient.EXPECT().LatestAttestation(
+		gomock.Any(),
+		gomock.Any(),
+	).Return(stream, nil)
+	b.cancel()
+
+	b.listenForProcessedAttestations(mockServiceClient)
+	testutil.AssertLogsContain(t, hook, "Context has been canceled so shutting down the loop")
 }
diff --git a/validator/main.go b/validator/main.go
index 925f6fd4b7..d6d05b1894 100644
--- a/validator/main.go
+++ b/validator/main.go
@@ -82,7 +82,7 @@ VERSION:
 	}

 	app.After = func(ctx *cli.Context) error {
-		debug.Exit()
+		debug.Exit(ctx)
 		return nil
 	}

diff --git a/validator/node/node.go b/validator/node/node.go
index d2d2d4ca7e..a8e1f20e23 100644
--- a/validator/node/node.go
+++ b/validator/node/node.go
@@ -33,6 +33,7 @@ const shardChainDBName = "shardchaindata"
 // the entire lifecycle of services attached to it participating in
 // Ethereum 2.0.
 type ShardEthereum struct {
+	ctx      *cli.Context
 	services *shared.ServiceRegistry // Lifecycle and service store.
 	lock     sync.RWMutex
 	stop     chan struct{} // Channel to wait for termination notifications.
@@ -43,6 +44,7 @@ type ShardEthereum struct {
 func NewShardInstance(ctx *cli.Context) (*ShardEthereum, error) {
 	registry := shared.NewServiceRegistry()
 	shardEthereum := &ShardEthereum{
+		ctx:      ctx,
 		services: registry,
 		stop:     make(chan struct{}),
 	}
@@ -102,7 +104,7 @@ func (s *ShardEthereum) Start() {
 				log.Info("Already shutting down, interrupt more to panic.", "times", i-1)
 			}
 		}
-		debug.Exit() // Ensure trace and CPU profile data are flushed.
+		debug.Exit(s.ctx) // Ensure trace and CPU profile data are flushed.
 		panic("Panic closing the sharding validator")
 	}()
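
A minimal sketch of the idea behind the new debug.Exit(ctx *cli.Context), assuming urfave/cli v1 and the standard library profilers rather than the repository's shared/debug handler: consult the same CLI flags that started the profilers so that only the ones actually running are stopped, instead of unconditionally stopping both and logging errors for profiles that were never started. The flag names and the omitted profile-file handling below are illustrative, not the project's.

// Sketch only: flag-gated profiler shutdown, assuming urfave/cli v1 and the
// standard runtime profilers (not the repository's shared/debug package).
package main

import (
	"os"
	"runtime/pprof"
	"runtime/trace"

	"gopkg.in/urfave/cli.v1"
)

var (
	cpuProfileFlag = cli.StringFlag{Name: "cpuprofile", Usage: "write a CPU profile to this file"}
	traceFlag      = cli.StringFlag{Name: "trace", Usage: "write an execution trace to this file"}
)

// exit stops tracing only if --trace was given and CPU profiling only if
// --cpuprofile was given, mirroring how a setup step would have started them
// from the same flags.
func exit(ctx *cli.Context) error {
	if ctx.GlobalString(traceFlag.Name) != "" {
		trace.Stop()
	}
	if ctx.GlobalString(cpuProfileFlag.Name) != "" {
		pprof.StopCPUProfile()
	}
	return nil
}

func main() {
	app := cli.NewApp()
	app.Flags = []cli.Flag{cpuProfileFlag, traceFlag}
	app.Action = func(ctx *cli.Context) error { return nil }
	app.After = exit // Flush profile data after the action returns.
	if err := app.Run(os.Args); err != nil {
		panic(err)
	}
}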
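
The same pattern appears in both listenFor* loops: check s.ctx.Err() after each Recv so a canceled service context actually terminates the goroutine instead of leaving it spinning on stream errors. Below is a minimal, self-contained sketch of that loop shape and of how the new tests exercise it by canceling the context before invoking the listener; fakeStream and listen are hypothetical stand-ins, not the repository's generated gRPC client or its methods.

// Sketch only: a stream-receive loop that exits once its context is canceled.
package main

import (
	"context"
	"fmt"
	"io"
)

// fakeStream stands in for a gRPC streaming client; Recv always succeeds here.
type fakeStream struct{}

func (fakeStream) Recv() (string, error) { return "message", nil }

// listen mirrors the loop shape in the diff: io.EOF ends the stream, a
// canceled context ends the loop, and any other error is logged and skipped
// so the loop keeps waiting for the next message.
func listen(ctx context.Context, stream interface{ Recv() (string, error) }) {
	for {
		msg, err := stream.Recv()
		if err == io.EOF {
			break
		}
		// Without this check, a stream that keeps returning data (or errors)
		// would never let the goroutine observe shutdown.
		if ctx.Err() != nil {
			fmt.Printf("Context has been canceled so shutting down the loop: %v\n", ctx.Err())
			break
		}
		if err != nil {
			fmt.Printf("Could not receive from stream: %v\n", err)
			continue
		}
		_ = msg // Process the received message here.
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // Simulate the service shutting down, as b.cancel() does in the tests.
	listen(ctx, fakeStream{})
}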