Compare commits

...

1 Commits

Author SHA1 Message Date
Kasey Kirkham
60cfd6a5a6 non-blocking write to validator client stream 2025-10-08 15:03:02 -05:00

View File

@@ -1,6 +1,8 @@
package validator
import (
"time"
"github.com/OffchainLabs/prysm/v6/async/event"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/blocks"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed"
@@ -53,51 +55,100 @@ func (vs *Server) StreamBlocksAltair(req *ethpb.StreamBlocksRequest, stream ethp
//
// StreamSlots sends a the block's slot and dependent roots to clients every single time a block is received by the beacon node.
func (vs *Server) StreamSlots(req *ethpb.StreamSlotsRequest, stream ethpb.BeaconNodeValidator_StreamSlotsServer) error {
ch := make(chan *feed.Event, 1)
bufchan := make(chan *feed.Event)
errchan := make(chan error)
defer func() {
select {
case err := <-errchan:
log.WithError(err).Debug("error from sending goroutine after StreamSlots timeout")
default:
return
}
}() // drain errchan to make sure the goroutine can't get stuck
go func() {
for {
select {
case ev := <-bufchan:
if ev == nil {
// channel closed
return
}
var s primitives.Slot
var currDependentRoot, prevDependentRoot [32]byte
if req.VerifiedOnly {
if ev.Type != statefeed.BlockProcessed {
continue
}
data, ok := ev.Data.(*statefeed.BlockProcessedData)
if !ok || data == nil {
continue
}
s = data.Slot
currDependentRoot = data.CurrDependentRoot
prevDependentRoot = data.PrevDependentRoot
} else {
if ev.Type != blockfeed.ReceivedBlock {
continue
}
data, ok := ev.Data.(*blockfeed.ReceivedBlockData)
if !ok || data == nil {
continue
}
s = data.SignedBlock.Block().Slot()
currDependentRoot = data.CurrDependentRoot
prevDependentRoot = data.PrevDependentRoot
}
if err := stream.Send(
&ethpb.StreamSlotsResponse{
Slot: s,
PreviousDutyDependentRoot: prevDependentRoot[:],
CurrentDutyDependentRoot: currDependentRoot[:],
}); err != nil {
errchan <- status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
return
}
}
}
}()
subchan := make(chan *feed.Event, 1)
var sub event.Subscription
if req.VerifiedOnly {
sub = vs.StateNotifier.StateFeed().Subscribe(ch)
sub = vs.StateNotifier.StateFeed().Subscribe(subchan)
} else {
sub = vs.BlockNotifier.BlockFeed().Subscribe(ch)
sub = vs.BlockNotifier.BlockFeed().Subscribe(subchan)
}
defer func() {
// drain the subchan because non-blocking send and unsubscribe can be racy
for {
select {
case ev := <-subchan:
if ev == nil {
return
}
default:
return
}
}
}()
defer close(bufchan)
defer sub.Unsubscribe()
for {
select {
case ev := <-ch:
var s primitives.Slot
var currDependentRoot, prevDependentRoot [32]byte
if req.VerifiedOnly {
if ev.Type != statefeed.BlockProcessed {
continue
}
data, ok := ev.Data.(*statefeed.BlockProcessedData)
if !ok || data == nil {
continue
}
s = data.Slot
currDependentRoot = data.CurrDependentRoot
prevDependentRoot = data.PrevDependentRoot
} else {
if ev.Type != blockfeed.ReceivedBlock {
continue
}
data, ok := ev.Data.(*blockfeed.ReceivedBlockData)
if !ok || data == nil {
continue
}
s = data.SignedBlock.Block().Slot()
currDependentRoot = data.CurrDependentRoot
prevDependentRoot = data.PrevDependentRoot
}
if err := stream.Send(
&ethpb.StreamSlotsResponse{
Slot: s,
PreviousDutyDependentRoot: prevDependentRoot[:],
CurrentDutyDependentRoot: currDependentRoot[:],
}); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
case ev := <-subchan:
select {
// This select implements a non-blocking channel send with a timeout to prevent blocking the subscription
case bufchan <- ev:
continue
case <-time.After(time.Second):
return status.Error(codes.ResourceExhausted, "Could not keep up with block events, exiting stream")
case <-vs.Ctx.Done():
return status.Error(codes.Canceled, "Context canceled")
}
case err := <-errchan:
return err
case <-sub.Err():
return status.Error(codes.Aborted, "Subscriber closed, exiting goroutine")
case <-vs.Ctx.Done():