mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-31 08:08:18 -05:00
Compare commits
11 Commits
e2e-debugg
...
block-prop
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
73c63a358a | ||
|
|
293a37bef1 | ||
|
|
b99cf362a5 | ||
|
|
9b551959c4 | ||
|
|
8f9026bed8 | ||
|
|
2568e2e087 | ||
|
|
62a4fca4d5 | ||
|
|
7059cf4cf2 | ||
|
|
5c64cb9eb6 | ||
|
|
1bf2188f81 | ||
|
|
d78cb1fd67 |
@@ -5,6 +5,7 @@ go_library(
|
|||||||
srcs = [
|
srcs = [
|
||||||
"das_core.go",
|
"das_core.go",
|
||||||
"info.go",
|
"info.go",
|
||||||
|
"log.go",
|
||||||
"metrics.go",
|
"metrics.go",
|
||||||
"p2p_interface.go",
|
"p2p_interface.go",
|
||||||
"reconstruction.go",
|
"reconstruction.go",
|
||||||
@@ -33,6 +34,7 @@ go_library(
|
|||||||
"@com_github_pkg_errors//:go_default_library",
|
"@com_github_pkg_errors//:go_default_library",
|
||||||
"@com_github_prometheus_client_golang//prometheus:go_default_library",
|
"@com_github_prometheus_client_golang//prometheus:go_default_library",
|
||||||
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
|
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
|
||||||
|
"@com_github_sirupsen_logrus//:go_default_library",
|
||||||
"@org_golang_x_sync//errgroup:go_default_library",
|
"@org_golang_x_sync//errgroup:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
|
|||||||
5
beacon-chain/core/peerdas/log.go
Normal file
5
beacon-chain/core/peerdas/log.go
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
package peerdas
|
||||||
|
|
||||||
|
import "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
var log = logrus.WithField("prefix", "peerdas")
|
||||||
@@ -205,18 +205,25 @@ func ComputeCellsAndProofsFromFlat(blobs [][]byte, cellProofs [][]byte) ([]kzg.C
|
|||||||
}
|
}
|
||||||
|
|
||||||
cellsAndProofs := make([]kzg.CellsAndProofs, 0, blobCount)
|
cellsAndProofs := make([]kzg.CellsAndProofs, 0, blobCount)
|
||||||
|
log.Debug("TIME MARKER ZETA")
|
||||||
for i, blob := range blobs {
|
for i, blob := range blobs {
|
||||||
|
log.WithField("index", i).Debug("TIME MARKER EPSILON")
|
||||||
|
|
||||||
var kzgBlob kzg.Blob
|
var kzgBlob kzg.Blob
|
||||||
if copy(kzgBlob[:], blob) != len(kzgBlob) {
|
if copy(kzgBlob[:], blob) != len(kzgBlob) {
|
||||||
return nil, errors.New("wrong blob size - should never happen")
|
return nil, errors.New("wrong blob size - should never happen")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithField("index", i).Debug("TIME MARKER ETA")
|
||||||
|
|
||||||
// Compute the extended cells from the (non-extended) blob.
|
// Compute the extended cells from the (non-extended) blob.
|
||||||
cells, err := kzg.ComputeCells(&kzgBlob)
|
cells, err := kzg.ComputeCells(&kzgBlob)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "compute cells")
|
return nil, errors.Wrap(err, "compute cells")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithField("index", i).Debug("TIME MARKER THETA")
|
||||||
|
|
||||||
var proofs []kzg.Proof
|
var proofs []kzg.Proof
|
||||||
for idx := uint64(i) * numberOfColumns; idx < (uint64(i)+1)*numberOfColumns; idx++ {
|
for idx := uint64(i) * numberOfColumns; idx < (uint64(i)+1)*numberOfColumns; idx++ {
|
||||||
var kzgProof kzg.Proof
|
var kzgProof kzg.Proof
|
||||||
@@ -227,10 +234,17 @@ func ComputeCellsAndProofsFromFlat(blobs [][]byte, cellProofs [][]byte) ([]kzg.C
|
|||||||
proofs = append(proofs, kzgProof)
|
proofs = append(proofs, kzgProof)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithField("index", i).Debug("TIME MARKER IOTA")
|
||||||
|
|
||||||
cellsProofs := kzg.CellsAndProofs{Cells: cells, Proofs: proofs}
|
cellsProofs := kzg.CellsAndProofs{Cells: cells, Proofs: proofs}
|
||||||
cellsAndProofs = append(cellsAndProofs, cellsProofs)
|
cellsAndProofs = append(cellsAndProofs, cellsProofs)
|
||||||
|
|
||||||
|
log.WithField("index", i).Debug("TIME MARKER KAPPA")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Debug("TIME MARKER LAMBDA")
|
||||||
|
|
||||||
return cellsAndProofs, nil
|
return cellsAndProofs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -109,15 +109,14 @@ func DataColumnSidecars(rows []kzg.CellsAndProofs, src ConstructionPopulator) ([
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrap(err, "rotate cells and proofs")
|
return nil, errors.Wrap(err, "rotate cells and proofs")
|
||||||
}
|
}
|
||||||
|
info, err := src.extract()
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "extract block info")
|
||||||
|
}
|
||||||
|
|
||||||
maxIdx := params.BeaconConfig().NumberOfColumns
|
maxIdx := params.BeaconConfig().NumberOfColumns
|
||||||
roSidecars := make([]blocks.RODataColumn, 0, maxIdx)
|
roSidecars := make([]blocks.RODataColumn, 0, maxIdx)
|
||||||
for idx := range maxIdx {
|
for idx := range maxIdx {
|
||||||
info, err := src.extract()
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrap(err, "extract block info")
|
|
||||||
}
|
|
||||||
|
|
||||||
sidecar := ðpb.DataColumnSidecar{
|
sidecar := ðpb.DataColumnSidecar{
|
||||||
Index: idx,
|
Index: idx,
|
||||||
Column: cells[idx],
|
Column: cells[idx],
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/altair"
|
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/altair"
|
||||||
@@ -52,6 +53,7 @@ func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
|
|||||||
tracing.AnnotateError(span, ErrMessageNotMapped)
|
tracing.AnnotateError(span, ErrMessageNotMapped)
|
||||||
return ErrMessageNotMapped
|
return ErrMessageNotMapped
|
||||||
}
|
}
|
||||||
|
|
||||||
castMsg, ok := msg.(ssz.Marshaler)
|
castMsg, ok := msg.(ssz.Marshaler)
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.Errorf("message of %T does not support marshaller interface", msg)
|
return errors.Errorf("message of %T does not support marshaller interface", msg)
|
||||||
@@ -381,6 +383,7 @@ func (s *Service) internalBroadcastDataColumnSidecar(
|
|||||||
"timeSinceSlotStart": time.Since(slotStartTime),
|
"timeSinceSlotStart": time.Since(slotStartTime),
|
||||||
"root": fmt.Sprintf("%#x", dataColumnSidecar.BlockRoot()),
|
"root": fmt.Sprintf("%#x", dataColumnSidecar.BlockRoot()),
|
||||||
"columnSubnet": columnSubnet,
|
"columnSubnet": columnSubnet,
|
||||||
|
"blobCount": len(dataColumnSidecar.Column),
|
||||||
}).Debug("Broadcasted data column sidecar")
|
}).Debug("Broadcasted data column sidecar")
|
||||||
|
|
||||||
// Increase the number of successful broadcasts.
|
// Increase the number of successful broadcasts.
|
||||||
@@ -409,6 +412,10 @@ func (s *Service) findPeersIfNeeded(
|
|||||||
|
|
||||||
// method to broadcast messages to other peers in our gossip mesh.
|
// method to broadcast messages to other peers in our gossip mesh.
|
||||||
func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic string) error {
|
func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic string) error {
|
||||||
|
if strings.Contains(topic, "beacon_block") {
|
||||||
|
log.Debug("TIME MARKER D")
|
||||||
|
}
|
||||||
|
|
||||||
ctx, span := trace.StartSpan(ctx, "p2p.broadcastObject")
|
ctx, span := trace.StartSpan(ctx, "p2p.broadcastObject")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
@@ -421,6 +428,10 @@ func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if strings.Contains(topic, "beacon_block") {
|
||||||
|
log.Debug("TIME MARKER E")
|
||||||
|
}
|
||||||
|
|
||||||
if span.IsRecording() {
|
if span.IsRecording() {
|
||||||
id := hash.FastSum64(buf.Bytes())
|
id := hash.FastSum64(buf.Bytes())
|
||||||
messageLen := int64(buf.Len())
|
messageLen := int64(buf.Len())
|
||||||
@@ -433,6 +444,10 @@ func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic
|
|||||||
tracing.AnnotateError(span, err)
|
tracing.AnnotateError(span, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if strings.Contains(topic, "beacon_block") {
|
||||||
|
log.Debug("TIME MARKER L")
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -81,20 +81,46 @@ func (s *Service) LeaveTopic(topic string) error {
|
|||||||
|
|
||||||
// PublishToTopic joins (if necessary) and publishes a message to a PubSub topic.
|
// PublishToTopic joins (if necessary) and publishes a message to a PubSub topic.
|
||||||
func (s *Service) PublishToTopic(ctx context.Context, topic string, data []byte, opts ...pubsub.PubOpt) error {
|
func (s *Service) PublishToTopic(ctx context.Context, topic string, data []byte, opts ...pubsub.PubOpt) error {
|
||||||
|
if strings.Contains(topic, "beacon_block") {
|
||||||
|
log.Debug("TIME MARKER F")
|
||||||
|
}
|
||||||
topicHandle, err := s.JoinTopic(topic)
|
topicHandle, err := s.JoinTopic(topic)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if strings.Contains(topic, "beacon_block") {
|
||||||
|
log.Debug("TIME MARKER G")
|
||||||
|
}
|
||||||
|
|
||||||
// Wait for at least 1 peer to be available to receive the published message.
|
// Wait for at least 1 peer to be available to receive the published message.
|
||||||
for {
|
for {
|
||||||
if len(topicHandle.ListPeers()) > 0 || flags.Get().MinimumSyncPeers == 0 {
|
if strings.Contains(topic, "beacon_block") {
|
||||||
|
log.Debug("TIME MARKER H <--- main suspect after")
|
||||||
|
}
|
||||||
|
|
||||||
|
peers := topicHandle.ListPeers()
|
||||||
|
|
||||||
|
if strings.Contains(topic, "beacon_block") {
|
||||||
|
log.Debug("TIME MARKER I <--- main suspect before")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(peers) > 0 || flags.Get().MinimumSyncPeers == 0 {
|
||||||
return topicHandle.Publish(ctx, data, opts...)
|
return topicHandle.Publish(ctx, data, opts...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if strings.Contains(topic, "beacon_block") {
|
||||||
|
log.Debug("TIME MARKER J")
|
||||||
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return errors.Wrapf(ctx.Err(), "unable to find requisite number of peers for topic %s, 0 peers found to publish to", topic)
|
return errors.Wrapf(ctx.Err(), "unable to find requisite number of peers for topic %s, 0 peers found to publish to", topic)
|
||||||
default:
|
default:
|
||||||
|
if strings.Contains(topic, "beacon_block") {
|
||||||
|
log.Debug("TIME MARKER K")
|
||||||
|
}
|
||||||
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -284,6 +284,8 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
|
|||||||
dataColumnSidecars []blocks.RODataColumn
|
dataColumnSidecars []blocks.RODataColumn
|
||||||
)
|
)
|
||||||
|
|
||||||
|
log.Debug("TIME MARKER 01")
|
||||||
|
|
||||||
ctx, span := trace.StartSpan(ctx, "ProposerServer.ProposeBeaconBlock")
|
ctx, span := trace.StartSpan(ctx, "ProposerServer.ProposeBeaconBlock")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
@@ -291,15 +293,22 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
|
|||||||
return nil, status.Errorf(codes.InvalidArgument, "empty request")
|
return nil, status.Errorf(codes.InvalidArgument, "empty request")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Debug("TIME MARKER 02")
|
||||||
|
|
||||||
block, err := blocks.NewSignedBeaconBlock(req.Block)
|
block, err := blocks.NewSignedBeaconBlock(req.Block)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, status.Errorf(codes.InvalidArgument, "%s: %v", "decode block failed", err)
|
return nil, status.Errorf(codes.InvalidArgument, "%s: %v", "decode block failed", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Debug("TIME MARKER 03")
|
||||||
|
|
||||||
root, err := block.Block().HashTreeRoot()
|
root, err := block.Block().HashTreeRoot()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, status.Errorf(codes.Internal, "Could not hash tree root: %v", err)
|
return nil, status.Errorf(codes.Internal, "Could not hash tree root: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Debug("TIME MARKER 04")
|
||||||
|
|
||||||
// For post-Fulu blinded blocks, submit to relay and return early
|
// For post-Fulu blinded blocks, submit to relay and return early
|
||||||
if block.IsBlinded() && slots.ToEpoch(block.Block().Slot()) >= params.BeaconConfig().FuluForkEpoch {
|
if block.IsBlinded() && slots.ToEpoch(block.Block().Slot()) >= params.BeaconConfig().FuluForkEpoch {
|
||||||
err := vs.BlockBuilder.SubmitBlindedBlockPostFulu(ctx, block)
|
err := vs.BlockBuilder.SubmitBlindedBlockPostFulu(ctx, block)
|
||||||
@@ -309,37 +318,50 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
|
|||||||
return ðpb.ProposeResponse{BlockRoot: root[:]}, nil
|
return ðpb.ProposeResponse{BlockRoot: root[:]}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Debug("TIME MARKER 05")
|
||||||
|
|
||||||
rob, err := blocks.NewROBlockWithRoot(block, root)
|
rob, err := blocks.NewROBlockWithRoot(block, root)
|
||||||
if block.IsBlinded() {
|
if block.IsBlinded() {
|
||||||
block, blobSidecars, err = vs.handleBlindedBlock(ctx, block)
|
block, blobSidecars, err = vs.handleBlindedBlock(ctx, block)
|
||||||
} else if block.Version() >= version.Deneb {
|
} else if block.Version() >= version.Deneb {
|
||||||
|
log.Debug("TIME MARKER 05A")
|
||||||
blobSidecars, dataColumnSidecars, err = vs.handleUnblindedBlock(rob, req)
|
blobSidecars, dataColumnSidecars, err = vs.handleUnblindedBlock(rob, req)
|
||||||
|
log.Debug("TIME MARKER 05B")
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, status.Errorf(codes.Internal, "%s: %v", "handle block failed", err)
|
return nil, status.Errorf(codes.Internal, "%s: %v", "handle block failed", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Debug("TIME MARKER 06")
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
errChan := make(chan error, 1)
|
errChan := make(chan error, 1)
|
||||||
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
if err := vs.broadcastReceiveBlock(ctx, &wg, block, root); err != nil {
|
||||||
if err := vs.broadcastReceiveBlock(ctx, block, root); err != nil {
|
|
||||||
errChan <- errors.Wrap(err, "broadcast/receive block failed")
|
errChan <- errors.Wrap(err, "broadcast/receive block failed")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
errChan <- nil
|
errChan <- nil
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
log.Debug("TIME MARKER 07")
|
||||||
|
|
||||||
if err := vs.broadcastAndReceiveSidecars(ctx, block, root, blobSidecars, dataColumnSidecars); err != nil {
|
if err := vs.broadcastAndReceiveSidecars(ctx, block, root, blobSidecars, dataColumnSidecars); err != nil {
|
||||||
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive sidecars: %v", err)
|
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive sidecars: %v", err)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
|
||||||
|
log.Debug("TIME MARKER 08")
|
||||||
|
|
||||||
if err := <-errChan; err != nil {
|
if err := <-errChan; err != nil {
|
||||||
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive block: %v", err)
|
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive block: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Debug("TIME MARKER 09")
|
||||||
|
|
||||||
return ðpb.ProposeResponse{BlockRoot: root[:]}, nil
|
return ðpb.ProposeResponse{BlockRoot: root[:]}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -402,24 +424,32 @@ func (vs *Server) handleUnblindedBlock(
|
|||||||
block blocks.ROBlock,
|
block blocks.ROBlock,
|
||||||
req *ethpb.GenericSignedBeaconBlock,
|
req *ethpb.GenericSignedBeaconBlock,
|
||||||
) ([]*ethpb.BlobSidecar, []blocks.RODataColumn, error) {
|
) ([]*ethpb.BlobSidecar, []blocks.RODataColumn, error) {
|
||||||
|
log.Debug("TIME MARKER ALPHA")
|
||||||
rawBlobs, proofs, err := blobsAndProofs(req)
|
rawBlobs, proofs, err := blobsAndProofs(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
log.Debug("TIME MARKER BETA")
|
||||||
|
|
||||||
if block.Version() >= version.Fulu {
|
if block.Version() >= version.Fulu {
|
||||||
|
log.Debug("TIME MARKER GAMMA")
|
||||||
|
|
||||||
// Compute cells and proofs from the blobs and cell proofs.
|
// Compute cells and proofs from the blobs and cell proofs.
|
||||||
cellsAndProofs, err := peerdas.ComputeCellsAndProofsFromFlat(rawBlobs, proofs)
|
cellsAndProofs, err := peerdas.ComputeCellsAndProofsFromFlat(rawBlobs, proofs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, errors.Wrap(err, "compute cells and proofs")
|
return nil, nil, errors.Wrap(err, "compute cells and proofs")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Debug("TIME MARKER DELTA")
|
||||||
|
|
||||||
// Construct data column sidecars from the signed block and cells and proofs.
|
// Construct data column sidecars from the signed block and cells and proofs.
|
||||||
roDataColumnSidecars, err := peerdas.DataColumnSidecars(cellsAndProofs, peerdas.PopulateFromBlock(block))
|
roDataColumnSidecars, err := peerdas.DataColumnSidecars(cellsAndProofs, peerdas.PopulateFromBlock(block))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, errors.Wrap(err, "data column sidcars")
|
return nil, nil, errors.Wrap(err, "data column sidcars")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Debug("TIME MARKER EPSILON")
|
||||||
|
|
||||||
return nil, roDataColumnSidecars, nil
|
return nil, roDataColumnSidecars, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -432,19 +462,46 @@ func (vs *Server) handleUnblindedBlock(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// broadcastReceiveBlock broadcasts a block and handles its reception.
|
// broadcastReceiveBlock broadcasts a block and handles its reception.
|
||||||
func (vs *Server) broadcastReceiveBlock(ctx context.Context, block interfaces.SignedBeaconBlock, root [fieldparams.RootLength]byte) error {
|
func (vs *Server) broadcastReceiveBlock(ctx context.Context, wg *sync.WaitGroup, block interfaces.SignedBeaconBlock, root [fieldparams.RootLength]byte) error {
|
||||||
protoBlock, err := block.Proto()
|
log.Debug("TIME MARKER A")
|
||||||
if err != nil {
|
if err := vs.broadcastBlock(ctx, wg, block, root); err != nil {
|
||||||
return errors.Wrap(err, "protobuf conversion failed")
|
return errors.Wrap(err, "broadcast block")
|
||||||
}
|
|
||||||
if err := vs.P2P.Broadcast(ctx, protoBlock); err != nil {
|
|
||||||
return errors.Wrap(err, "broadcast failed")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
vs.BlockNotifier.BlockFeed().Send(&feed.Event{
|
vs.BlockNotifier.BlockFeed().Send(&feed.Event{
|
||||||
Type: blockfeed.ReceivedBlock,
|
Type: blockfeed.ReceivedBlock,
|
||||||
Data: &blockfeed.ReceivedBlockData{SignedBlock: block},
|
Data: &blockfeed.ReceivedBlockData{SignedBlock: block},
|
||||||
})
|
})
|
||||||
return vs.BlockReceiver.ReceiveBlock(ctx, block, root, nil)
|
|
||||||
|
if err := vs.BlockReceiver.ReceiveBlock(ctx, block, root, nil); err != nil {
|
||||||
|
return errors.Wrap(err, "receive block")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (vs *Server) broadcastBlock(ctx context.Context, wg *sync.WaitGroup, block interfaces.SignedBeaconBlock, root [fieldparams.RootLength]byte) error {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
log.Debug("TIME MARKER B")
|
||||||
|
|
||||||
|
protoBlock, err := block.Proto()
|
||||||
|
if err != nil {
|
||||||
|
return errors.Wrap(err, "protobuf conversion failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debug("TIME MARKER C")
|
||||||
|
|
||||||
|
if err := vs.P2P.Broadcast(ctx, protoBlock); err != nil {
|
||||||
|
return errors.Wrap(err, "broadcast failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
log.WithFields(logrus.Fields{
|
||||||
|
"slot": block.Block().Slot(),
|
||||||
|
"root": fmt.Sprintf("%#x", root),
|
||||||
|
}).Debug("Broadcasted block")
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// broadcastAndReceiveBlobs handles the broadcasting and reception of blob sidecars.
|
// broadcastAndReceiveBlobs handles the broadcasting and reception of blob sidecars.
|
||||||
@@ -498,10 +555,6 @@ func (vs *Server) broadcastAndReceiveDataColumns(
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := eg.Wait(); err != nil {
|
|
||||||
return errors.Wrap(err, "wait for data columns to be broadcasted")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := vs.DataColumnReceiver.ReceiveDataColumns(verifiedRODataColumns); err != nil {
|
if err := vs.DataColumnReceiver.ReceiveDataColumns(verifiedRODataColumns); err != nil {
|
||||||
return errors.Wrap(err, "receive data column")
|
return errors.Wrap(err, "receive data column")
|
||||||
}
|
}
|
||||||
@@ -512,6 +565,11 @@ func (vs *Server) broadcastAndReceiveDataColumns(
|
|||||||
Data: &operation.DataColumnSidecarReceivedData{DataColumn: &verifiedRODataColumn}, // #nosec G601
|
Data: &operation.DataColumnSidecarReceivedData{DataColumn: &verifiedRODataColumn}, // #nosec G601
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := eg.Wait(); err != nil {
|
||||||
|
return errors.Wrap(err, "wait for data columns to be broadcasted")
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
4
changelog/manu-broadcast.md
Normal file
4
changelog/manu-broadcast.md
Normal file
@@ -0,0 +1,4 @@
|
|||||||
|
### Changed
|
||||||
|
- Broadcast block then sidecars, instead block and sidecars concurrently
|
||||||
|
- Broadcast and receive sidecars in concurrently, instead sequentially
|
||||||
|
|
||||||
Reference in New Issue
Block a user