Compare commits

...

10 Commits

Author SHA1 Message Date
Kasey Kirkham
9493af1cdc consistently use slot instead of version 2025-09-18 15:24:26 -05:00
Kasey Kirkham
8a270d2a5c build sidecars during block publish 2025-09-18 15:07:14 -05:00
Manu NALEPA
9b551959c4 Take notifier out of the critical section 2025-09-18 18:57:12 +02:00
Manu NALEPA
8f9026bed8 Fix deadlock-option 2. 2025-09-18 18:53:27 +02:00
Manu NALEPA
2568e2e087 Fix deadlock-option 1. 2025-09-18 18:50:00 +02:00
Manu NALEPA
62a4fca4d5 Add changelog 2025-09-18 18:16:12 +02:00
Manu NALEPA
7059cf4cf2 broadcastReceiveBlock: Add log. 2025-09-18 18:14:09 +02:00
Manu NALEPA
5c64cb9eb6 ProposeBeaconBlock: First broadcast/receive block, and then sidecars. 2025-09-18 18:07:03 +02:00
Manu NALEPA
1bf2188f81 broadcastAndReceiveDataColumns: Broadcast and receive data columns in parallel. 2025-09-18 16:03:59 +02:00
Manu NALEPA
d78cb1fd67 Broadcasted data column sidecar log: Add blobCount. 2025-09-18 16:03:33 +02:00
4 changed files with 210 additions and 90 deletions

View File

@@ -381,6 +381,7 @@ func (s *Service) internalBroadcastDataColumnSidecar(
"timeSinceSlotStart": time.Since(slotStartTime),
"root": fmt.Sprintf("%#x", dataColumnSidecar.BlockRoot()),
"columnSubnet": columnSubnet,
"blobCount": len(dataColumnSidecar.Column),
}).Debug("Broadcasted data column sidecar")
// Increase the number of successful broadcasts.

View File

@@ -279,86 +279,218 @@ func (vs *Server) BuildBlockParallel(ctx context.Context, sBlk interfaces.Signed
//
// ProposeBeaconBlock handles the proposal of beacon blocks.
func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSignedBeaconBlock) (*ethpb.ProposeResponse, error) {
var (
blobSidecars []*ethpb.BlobSidecar
dataColumnSidecars []blocks.RODataColumn
)
ctx, span := trace.StartSpan(ctx, "ProposerServer.ProposeBeaconBlock")
defer span.End()
if req == nil {
return nil, status.Errorf(codes.InvalidArgument, "empty request")
}
block, err := blocks.NewSignedBeaconBlock(req.Block)
prop, err := unblindProposalRequest(ctx, req, vs.BlockBuilder)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%s: %v", "decode block failed", err)
}
root, err := block.Block().HashTreeRoot()
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not hash tree root: %v", err)
return nil, status.Errorf(codes.Internal, "could not unblind block: %v", err)
}
// For post-Fulu blinded blocks, submit to relay and return early
if block.IsBlinded() && slots.ToEpoch(block.Block().Slot()) >= params.BeaconConfig().FuluForkEpoch {
err := vs.BlockBuilder.SubmitBlindedBlockPostFulu(ctx, block)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not submit blinded block post-Fulu: %v", err)
}
return &ethpb.ProposeResponse{BlockRoot: root[:]}, nil
if !prop.shouldBroadcastBlock() {
return &ethpb.ProposeResponse{BlockRoot: prop.block.RootSlice()}, nil
}
rob, err := blocks.NewROBlockWithRoot(block, root)
if block.IsBlinded() {
block, blobSidecars, err = vs.handleBlindedBlock(ctx, block)
} else if block.Version() >= version.Deneb {
blobSidecars, dataColumnSidecars, err = vs.handleUnblindedBlock(rob, req)
}
if err != nil {
return nil, status.Errorf(codes.Internal, "%s: %v", "handle block failed", err)
}
var wg sync.WaitGroup
errChan := make(chan error, 1)
wg.Add(1)
ec := make(chan error)
go func() {
defer wg.Done()
if err := vs.broadcastReceiveBlock(ctx, block, root); err != nil {
errChan <- errors.Wrap(err, "broadcast/receive block failed")
return
if err := vs.broadcastBlock(ctx, prop.block); err != nil {
ec <- errors.Wrap(err, "broadcast block")
} else {
ec <- nil
}
errChan <- nil
}()
if err := vs.broadcastAndReceiveSidecars(ctx, block, root, blobSidecars, dataColumnSidecars); err != nil {
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive sidecars: %v", err)
// build sidecars concurrently with block broadcast
if err := prop.buildSidecars(); err != nil {
return nil, status.Errorf(codes.Internal, "could not build sidecars: %v", err)
}
wg.Wait()
if err := <-errChan; err != nil {
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive block: %v", err)
// wait for result of block broadcast
if err := <-ec; err != nil {
return nil, status.Errorf(codes.Internal, "broadcast block failed: %v", err)
}
return &ethpb.ProposeResponse{BlockRoot: root[:]}, nil
if prop.shouldBroadcastColumns() {
cols, err := prop.buildColumns()
if err != nil {
return nil, status.Errorf(codes.Internal, "could not construct data columns: %v", err)
}
if err := vs.broadcastAndReceiveDataColumns(ctx, cols, prop.block.Root()); err != nil {
return nil, status.Errorf(codes.Internal, "could not broadcast and receive data columns: %v", err)
}
} else if prop.shouldBroadcastBlobs() {
blobs, err := prop.buildBlobs()
if err != nil {
return nil, status.Errorf(codes.Internal, "could not construct blob sidecars: %v", err)
}
if err := vs.broadcastAndReceiveBlobs(ctx, blobs, prop.block.Root()); err != nil {
return nil, status.Errorf(codes.Internal, "could not broadcast and receive blob sidecars: %v", err)
}
}
vs.BlockNotifier.BlockFeed().Send(&feed.Event{
Type: blockfeed.ReceivedBlock,
Data: &blockfeed.ReceivedBlockData{SignedBlock: prop.block},
})
if err := vs.BlockReceiver.ReceiveBlock(ctx, prop.block, prop.block.Root(), nil); err != nil {
return nil, status.Errorf(codes.Internal, "receive block failed: %v", err)
}
return &ethpb.ProposeResponse{BlockRoot: prop.block.RootSlice()}, nil
}
type proposal struct {
block blocks.ROBlock
epoch primitives.Epoch
req *ethpb.GenericSignedBeaconBlock
blobs []*ethpb.BlobSidecar
columns []blocks.RODataColumn
}
func parsedProposal(block interfaces.SignedBeaconBlock, req *ethpb.GenericSignedBeaconBlock) (*proposal, error) {
rob, err := blocks.NewROBlock(block)
if err != nil {
return nil, errors.Wrap(err, "could not create read-only block")
}
return &proposal{
block: rob,
epoch: slots.ToEpoch(rob.Block().Slot()),
req: req,
}, nil
}
func unblindProposalRequest(ctx context.Context, req *ethpb.GenericSignedBeaconBlock, bldr builder.BlockBuilder) (*proposal, error) {
block, err := blocks.NewSignedBeaconBlock(req.Block)
if err != nil {
return nil, errors.Wrap(err, "decode block failed")
}
if slots.ToEpoch(block.Block().Slot()) < params.BeaconConfig().BellatrixForkEpoch || !block.IsBlinded() {
return parsedProposal(block, req)
}
if bldr == nil || !bldr.Configured() {
return nil, errors.New("can not propose blinded block without block builder configured")
}
if slots.ToEpoch(block.Block().Slot()) >= params.BeaconConfig().FuluForkEpoch {
if err := bldr.SubmitBlindedBlockPostFulu(ctx, block); err != nil {
return nil, status.Errorf(codes.Internal, "could not submit blinded block post-Fulu: %v", err)
}
return parsedProposal(block, req)
}
payload, bundle, err := bldr.SubmitBlindedBlock(ctx, block)
if err != nil {
return nil, errors.Wrap(err, "submit blinded block failed")
}
ublock, err := block.Copy()
if err != nil {
return nil, errors.Wrap(err, "copy block")
}
if err := ublock.Unblind(payload); err != nil {
return nil, errors.Wrap(err, "unblind failed")
}
unblinded, err := parsedProposal(ublock, req)
if err != nil {
return nil, errors.Wrap(err, "could not create unblinder")
}
sidecars, err := unblindBlobsSidecars(ublock, bundle)
if err != nil {
return nil, errors.Wrap(err, "unblind blobs sidecars: commitment value doesn't match block")
}
unblinded.blobs = sidecars
return unblinded, nil
}
func (u *proposal) buildSidecars() error {
var err error
if u.shouldBroadcastBlobs() {
u.blobs, err = u.buildBlobs()
if err != nil {
return errors.Wrap(err, "could not construct blob sidecars")
}
}
if u.shouldBroadcastColumns() {
u.columns, err = u.buildColumns()
if err != nil {
return errors.Wrap(err, "could not construct data columns")
}
}
return nil
}
func (u *proposal) buildColumns() ([]blocks.RODataColumn, error) {
if u.epoch < params.BeaconConfig().FuluForkEpoch {
return nil, nil // No data columns before fulu.
}
if len(u.columns) > 0 {
return u.columns, nil
}
rawBlobs, proofs, err := blobsAndProofs(u.req)
if err != nil {
return nil, err
}
// Compute cells and proofs from the blobs and cell proofs.
cellsAndProofs, err := peerdas.ComputeCellsAndProofsFromFlat(rawBlobs, proofs)
if err != nil {
return nil, errors.Wrap(err, "compute cells and proofs")
}
// Construct data column sidecars from the signed block and cells and proofs.
roDataColumnSidecars, err := peerdas.DataColumnSidecars(cellsAndProofs, peerdas.PopulateFromBlock(u.block))
if err != nil {
return nil, errors.Wrap(err, "data column sidcars")
}
return roDataColumnSidecars, nil
}
func (u *proposal) buildBlobs() ([]*ethpb.BlobSidecar, error) {
if u.epoch < params.BeaconConfig().DenebForkEpoch || u.epoch >= params.BeaconConfig().FuluForkEpoch {
return nil, nil // No blobs before deneb, data column sidecars after fulu
}
if len(u.blobs) > 0 {
return u.blobs, nil
}
rawBlobs, proofs, err := blobsAndProofs(u.req)
if err != nil {
return nil, err
}
return BuildBlobSidecars(u.block, rawBlobs, proofs)
}
func (u *proposal) shouldBroadcastBlock() bool {
// we should broadcast if the block is unblinded, or blinded and pre-fulu
return !u.block.IsBlinded() || u.epoch < params.BeaconConfig().FuluForkEpoch
}
func (u *proposal) shouldBroadcastBlobs() bool {
return u.epoch >= params.BeaconConfig().DenebForkEpoch && u.epoch < params.BeaconConfig().FuluForkEpoch
}
func (u *proposal) shouldBroadcastColumns() bool {
return u.epoch >= params.BeaconConfig().FuluForkEpoch
}
// broadcastAndReceiveSidecars broadcasts and receives sidecars.
func (vs *Server) broadcastAndReceiveSidecars(
ctx context.Context,
block interfaces.SignedBeaconBlock,
root [fieldparams.RootLength]byte,
block blocks.ROBlock,
blobSidecars []*ethpb.BlobSidecar,
dataColumnSidecars []blocks.RODataColumn,
) error {
if block.Version() >= version.Fulu {
if err := vs.broadcastAndReceiveDataColumns(ctx, dataColumnSidecars, root); err != nil {
if err := vs.broadcastAndReceiveDataColumns(ctx, dataColumnSidecars, block.Root()); err != nil {
return errors.Wrap(err, "broadcast and receive data columns")
}
return nil
}
if err := vs.broadcastAndReceiveBlobs(ctx, blobSidecars, root); err != nil {
if err := vs.broadcastAndReceiveBlobs(ctx, blobSidecars, block.Root()); err != nil {
return errors.Wrap(err, "broadcast and receive blobs")
}
@@ -398,41 +530,13 @@ func (vs *Server) handleBlindedBlock(ctx context.Context, block interfaces.Signe
return copiedBlock, sidecars, nil
}
func (vs *Server) handleUnblindedBlock(
block blocks.ROBlock,
req *ethpb.GenericSignedBeaconBlock,
) ([]*ethpb.BlobSidecar, []blocks.RODataColumn, error) {
rawBlobs, proofs, err := blobsAndProofs(req)
if err != nil {
return nil, nil, err
}
// broadcastReceiveBlock broadcasts a block and handles its reception.
func (vs *Server) broadcastReceiveBlock(ctx context.Context, block blocks.ROBlock) error {
if block.Version() >= version.Fulu {
// Compute cells and proofs from the blobs and cell proofs.
cellsAndProofs, err := peerdas.ComputeCellsAndProofsFromFlat(rawBlobs, proofs)
if err != nil {
return nil, nil, errors.Wrap(err, "compute cells and proofs")
}
// Construct data column sidecars from the signed block and cells and proofs.
roDataColumnSidecars, err := peerdas.DataColumnSidecars(cellsAndProofs, peerdas.PopulateFromBlock(block))
if err != nil {
return nil, nil, errors.Wrap(err, "data column sidcars")
}
return nil, roDataColumnSidecars, nil
}
blobSidecars, err := BuildBlobSidecars(block, rawBlobs, proofs)
if err != nil {
return nil, nil, errors.Wrap(err, "build blob sidecars")
}
return blobSidecars, nil, nil
return nil
}
// broadcastReceiveBlock broadcasts a block and handles its reception.
func (vs *Server) broadcastReceiveBlock(ctx context.Context, block interfaces.SignedBeaconBlock, root [fieldparams.RootLength]byte) error {
func (vs *Server) broadcastBlock(ctx context.Context, block blocks.ROBlock) error {
protoBlock, err := block.Proto()
if err != nil {
return errors.Wrap(err, "protobuf conversion failed")
@@ -440,15 +544,20 @@ func (vs *Server) broadcastReceiveBlock(ctx context.Context, block interfaces.Si
if err := vs.P2P.Broadcast(ctx, protoBlock); err != nil {
return errors.Wrap(err, "broadcast failed")
}
vs.BlockNotifier.BlockFeed().Send(&feed.Event{
Type: blockfeed.ReceivedBlock,
Data: &blockfeed.ReceivedBlockData{SignedBlock: block},
})
return vs.BlockReceiver.ReceiveBlock(ctx, block, root, nil)
log.WithFields(logrus.Fields{
"slot": block.Block().Slot(),
"root": fmt.Sprintf("%#x", block.Root()),
}).Debug("Broadcasted block")
return nil
}
// broadcastAndReceiveBlobs handles the broadcasting and reception of blob sidecars.
func (vs *Server) broadcastAndReceiveBlobs(ctx context.Context, sidecars []*ethpb.BlobSidecar, root [fieldparams.RootLength]byte) error {
if len(sidecars) == 0 {
return nil
}
eg, eCtx := errgroup.WithContext(ctx)
for subIdx, sc := range sidecars {
eg.Go(func() error {
@@ -479,6 +588,9 @@ func (vs *Server) broadcastAndReceiveDataColumns(
roSidecars []blocks.RODataColumn,
root [fieldparams.RootLength]byte,
) error {
if len(roSidecars) == 0 {
return nil
}
verifiedRODataColumns := make([]blocks.VerifiedRODataColumn, 0, len(roSidecars))
eg, _ := errgroup.WithContext(ctx)
for _, roSidecar := range roSidecars {
@@ -498,10 +610,6 @@ func (vs *Server) broadcastAndReceiveDataColumns(
})
}
if err := eg.Wait(); err != nil {
return errors.Wrap(err, "wait for data columns to be broadcasted")
}
if err := vs.DataColumnReceiver.ReceiveDataColumns(verifiedRODataColumns); err != nil {
return errors.Wrap(err, "receive data column")
}
@@ -512,6 +620,11 @@ func (vs *Server) broadcastAndReceiveDataColumns(
Data: &operation.DataColumnSidecarReceivedData{DataColumn: &verifiedRODataColumn}, // #nosec G601
})
}
if err := eg.Wait(); err != nil {
return errors.Wrap(err, "wait for data columns to be broadcasted")
}
return nil
}

View File

@@ -0,0 +1,2 @@
### Changed
- Build DataColumnSidecars concurrently with block publishing.

View File

@@ -0,0 +1,4 @@
### Changed
- Broadcast block then sidecars, instead block and sidecars concurrently
- Broadcast and receive sidecars in concurrently, instead sequentially