Better Handling of Subscriber Errors in Logs Streams (#8505)

* handle subscriber error and increase buffer sizes

* operation order in unsub and close
This commit is contained in:
Raul Jordan
2021-02-23 15:33:23 -06:00
committed by GitHub
parent e40fba7679
commit 0c599521b1
3 changed files with 13 additions and 5 deletions

View File

@@ -224,9 +224,11 @@ func (ns *Server) ListPeers(ctx context.Context, _ *ptypes.Empty) (*ethpb.Peers,
// StreamBeaconLogs from the beacon node via a gRPC server-side stream.
func (ns *Server) StreamBeaconLogs(_ *ptypes.Empty, stream pb.Health_StreamBeaconLogsServer) error {
ch := make(chan []byte, ns.StreamLogsBufferSize)
defer close(ch)
sub := ns.LogsStreamer.LogsFeed().Subscribe(ch)
defer sub.Unsubscribe()
defer func() {
sub.Unsubscribe()
close(ch)
}()
recentLogs := ns.LogsStreamer.GetLastFewLogs()
logStrings := make([]string, len(recentLogs))
@@ -247,6 +249,8 @@ func (ns *Server) StreamBeaconLogs(_ *ptypes.Empty, stream pb.Health_StreamBeaco
if err := stream.Send(resp); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
}
case err := <-sub.Err():
return status.Errorf(codes.Canceled, "Subscriber error, closing: %v", err)
case <-stream.Context().Done():
return status.Error(codes.Canceled, "Context canceled")
}

View File

@@ -259,7 +259,7 @@ func (s *Service) Start() {
}
nodeServer := &node.Server{
LogsStreamer: logutil.NewStreamServer(),
StreamLogsBufferSize: 100, // Enough to handle bursts of beacon node logs for gRPC streaming.
StreamLogsBufferSize: 1000, // Enough to handle bursts of beacon node logs for gRPC streaming.
BeaconDB: s.beaconDB,
Server: s.grpcServer,
SyncChecker: s.syncService,

View File

@@ -82,9 +82,11 @@ func (s *Server) StreamBeaconLogs(req *ptypes.Empty, stream pb.Health_StreamBeac
// StreamValidatorLogs from the validator client via a gRPC server-side stream.
func (s *Server) StreamValidatorLogs(_ *ptypes.Empty, stream pb.Health_StreamValidatorLogsServer) error {
ch := make(chan []byte, s.streamLogsBufferSize)
defer close(ch)
sub := s.logsStreamer.LogsFeed().Subscribe(ch)
defer sub.Unsubscribe()
defer func() {
sub.Unsubscribe()
defer close(ch)
}()
recentLogs := s.logsStreamer.GetLastFewLogs()
logStrings := make([]string, len(recentLogs))
@@ -107,6 +109,8 @@ func (s *Server) StreamValidatorLogs(_ *ptypes.Empty, stream pb.Health_StreamVal
}
case <-s.ctx.Done():
return status.Error(codes.Canceled, "Context canceled")
case err := <-sub.Err():
return status.Errorf(codes.Canceled, "Subscriber error, closing: %v", err)
case <-stream.Context().Done():
return status.Error(codes.Canceled, "Context canceled")
}