mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
Fix orphaned blocks (#15720)
* `Broadcasted data column sidecar` log: Add `blobCount`. * `broadcastAndReceiveDataColumns`: Broadcast and receive data columns in parallel. * `ProposeBeaconBlock`: First broadcast/receive block, and then sidecars. * `broadcastReceiveBlock`: Add log. * Add changelog * Fix deadlock-option 1. * Fix deadlock-option 2. * Take notifier out of the critical section * only compute common info once, for all sidecars --------- Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
This commit is contained in:
@@ -109,15 +109,14 @@ func DataColumnSidecars(rows []kzg.CellsAndProofs, src ConstructionPopulator) ([
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "rotate cells and proofs")
|
return nil, errors.Wrap(err, "rotate cells and proofs")
|
||||||
}
|
}
|
||||||
|
info, err := src.extract()
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "extract block info")
|
||||||
|
}
|
||||||
|
|
||||||
maxIdx := params.BeaconConfig().NumberOfColumns
|
maxIdx := params.BeaconConfig().NumberOfColumns
|
||||||
roSidecars := make([]blocks.RODataColumn, 0, maxIdx)
|
roSidecars := make([]blocks.RODataColumn, 0, maxIdx)
|
||||||
for idx := range maxIdx {
|
for idx := range maxIdx {
|
||||||
info, err := src.extract()
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "extract block info")
|
|
||||||
}
|
|
||||||
|
|
||||||
sidecar := ðpb.DataColumnSidecar{
|
sidecar := ðpb.DataColumnSidecar{
|
||||||
Index: idx,
|
Index: idx,
|
||||||
Column: cells[idx],
|
Column: cells[idx],
|
||||||
|
|||||||
@@ -381,6 +381,7 @@ func (s *Service) internalBroadcastDataColumnSidecar(
|
|||||||
"timeSinceSlotStart": time.Since(slotStartTime),
|
"timeSinceSlotStart": time.Since(slotStartTime),
|
||||||
"root": fmt.Sprintf("%#x", dataColumnSidecar.BlockRoot()),
|
"root": fmt.Sprintf("%#x", dataColumnSidecar.BlockRoot()),
|
||||||
"columnSubnet": columnSubnet,
|
"columnSubnet": columnSubnet,
|
||||||
|
"blobCount": len(dataColumnSidecar.Column),
|
||||||
}).Debug("Broadcasted data column sidecar")
|
}).Debug("Broadcasted data column sidecar")
|
||||||
|
|
||||||
// Increase the number of successful broadcasts.
|
// Increase the number of successful broadcasts.
|
||||||
|
|||||||
@@ -324,18 +324,18 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
|
|||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
if err := vs.broadcastReceiveBlock(ctx, &wg, block, root); err != nil {
|
||||||
if err := vs.broadcastReceiveBlock(ctx, block, root); err != nil {
|
|
||||||
errChan <- errors.Wrap(err, "broadcast/receive block failed")
|
errChan <- errors.Wrap(err, "broadcast/receive block failed")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
errChan <- nil
|
errChan <- nil
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
if err := vs.broadcastAndReceiveSidecars(ctx, block, root, blobSidecars, dataColumnSidecars); err != nil {
|
if err := vs.broadcastAndReceiveSidecars(ctx, block, root, blobSidecars, dataColumnSidecars); err != nil {
|
||||||
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive sidecars: %v", err)
|
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive sidecars: %v", err)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
|
||||||
if err := <-errChan; err != nil {
|
if err := <-errChan; err != nil {
|
||||||
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive block: %v", err)
|
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive block: %v", err)
|
||||||
}
|
}
|
||||||
@@ -432,7 +432,26 @@ func (vs *Server) handleUnblindedBlock(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// broadcastReceiveBlock broadcasts a block and handles its reception.
|
// broadcastReceiveBlock broadcasts a block and handles its reception.
|
||||||
func (vs *Server) broadcastReceiveBlock(ctx context.Context, block interfaces.SignedBeaconBlock, root [fieldparams.RootLength]byte) error {
|
func (vs *Server) broadcastReceiveBlock(ctx context.Context, wg *sync.WaitGroup, block interfaces.SignedBeaconBlock, root [fieldparams.RootLength]byte) error {
|
||||||
|
if err := vs.broadcastBlock(ctx, wg, block, root); err != nil {
|
||||||
|
return errors.Wrap(err, "broadcast block")
|
||||||
|
}
|
||||||
|
|
||||||
|
vs.BlockNotifier.BlockFeed().Send(&feed.Event{
|
||||||
|
Type: blockfeed.ReceivedBlock,
|
||||||
|
Data: &blockfeed.ReceivedBlockData{SignedBlock: block},
|
||||||
|
})
|
||||||
|
|
||||||
|
if err := vs.BlockReceiver.ReceiveBlock(ctx, block, root, nil); err != nil {
|
||||||
|
return errors.Wrap(err, "receive block")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (vs *Server) broadcastBlock(ctx context.Context, wg *sync.WaitGroup, block interfaces.SignedBeaconBlock, root [fieldparams.RootLength]byte) error {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
protoBlock, err := block.Proto()
|
protoBlock, err := block.Proto()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrap(err, "protobuf conversion failed")
|
return errors.Wrap(err, "protobuf conversion failed")
|
||||||
@@ -440,11 +459,13 @@ func (vs *Server) broadcastReceiveBlock(ctx context.Context, block interfaces.Si
|
|||||||
if err := vs.P2P.Broadcast(ctx, protoBlock); err != nil {
|
if err := vs.P2P.Broadcast(ctx, protoBlock); err != nil {
|
||||||
return errors.Wrap(err, "broadcast failed")
|
return errors.Wrap(err, "broadcast failed")
|
||||||
}
|
}
|
||||||
vs.BlockNotifier.BlockFeed().Send(&feed.Event{
|
|
||||||
Type: blockfeed.ReceivedBlock,
|
log.WithFields(logrus.Fields{
|
||||||
Data: &blockfeed.ReceivedBlockData{SignedBlock: block},
|
"slot": block.Block().Slot(),
|
||||||
})
|
"root": fmt.Sprintf("%#x", root),
|
||||||
return vs.BlockReceiver.ReceiveBlock(ctx, block, root, nil)
|
}).Debug("Broadcasted block")
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// broadcastAndReceiveBlobs handles the broadcasting and reception of blob sidecars.
|
// broadcastAndReceiveBlobs handles the broadcasting and reception of blob sidecars.
|
||||||
@@ -498,10 +519,6 @@ func (vs *Server) broadcastAndReceiveDataColumns(
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := eg.Wait(); err != nil {
|
|
||||||
return errors.Wrap(err, "wait for data columns to be broadcasted")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := vs.DataColumnReceiver.ReceiveDataColumns(verifiedRODataColumns); err != nil {
|
if err := vs.DataColumnReceiver.ReceiveDataColumns(verifiedRODataColumns); err != nil {
|
||||||
return errors.Wrap(err, "receive data column")
|
return errors.Wrap(err, "receive data column")
|
||||||
}
|
}
|
||||||
@@ -512,6 +529,11 @@ func (vs *Server) broadcastAndReceiveDataColumns(
|
|||||||
Data: &operation.DataColumnSidecarReceivedData{DataColumn: &verifiedRODataColumn}, // #nosec G601
|
Data: &operation.DataColumnSidecarReceivedData{DataColumn: &verifiedRODataColumn}, // #nosec G601
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := eg.Wait(); err != nil {
|
||||||
|
return errors.Wrap(err, "wait for data columns to be broadcasted")
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
4
changelog/manu-broadcast.md
Normal file
4
changelog/manu-broadcast.md
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
### Changed
|
||||||
|
- Broadcast block then sidecars, instead block and sidecars concurrently
|
||||||
|
- Broadcast and receive sidecars in concurrently, instead sequentially
|
||||||
|
|
||||||
Reference in New Issue
Block a user