Compare commits

...

3 Commits

Author SHA1 Message Date
Kasey Kirkham
73c63a358a only compute common info once, for all sidecars 2025-09-19 07:42:31 -06:00
Manu NALEPA
293a37bef1 Add more time markers 2025-09-19 12:28:05 +02:00
Manu NALEPA
b99cf362a5 Add a lot of time markers 2025-09-19 09:21:32 +02:00
7 changed files with 102 additions and 6 deletions

View File

@@ -5,6 +5,7 @@ go_library(
srcs = [
"das_core.go",
"info.go",
"log.go",
"metrics.go",
"p2p_interface.go",
"reconstruction.go",
@@ -33,6 +34,7 @@ go_library(
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)

View File

@@ -0,0 +1,5 @@
package peerdas
import "github.com/sirupsen/logrus"
var log = logrus.WithField("prefix", "peerdas")

View File

@@ -205,18 +205,25 @@ func ComputeCellsAndProofsFromFlat(blobs [][]byte, cellProofs [][]byte) ([]kzg.C
}
cellsAndProofs := make([]kzg.CellsAndProofs, 0, blobCount)
log.Debug("TIME MARKER ZETA")
for i, blob := range blobs {
log.WithField("index", i).Debug("TIME MARKER EPSILON")
var kzgBlob kzg.Blob
if copy(kzgBlob[:], blob) != len(kzgBlob) {
return nil, errors.New("wrong blob size - should never happen")
}
log.WithField("index", i).Debug("TIME MARKER ETA")
// Compute the extended cells from the (non-extended) blob.
cells, err := kzg.ComputeCells(&kzgBlob)
if err != nil {
return nil, errors.Wrap(err, "compute cells")
}
log.WithField("index", i).Debug("TIME MARKER THETA")
var proofs []kzg.Proof
for idx := uint64(i) * numberOfColumns; idx < (uint64(i)+1)*numberOfColumns; idx++ {
var kzgProof kzg.Proof
@@ -227,10 +234,17 @@ func ComputeCellsAndProofsFromFlat(blobs [][]byte, cellProofs [][]byte) ([]kzg.C
proofs = append(proofs, kzgProof)
}
log.WithField("index", i).Debug("TIME MARKER IOTA")
cellsProofs := kzg.CellsAndProofs{Cells: cells, Proofs: proofs}
cellsAndProofs = append(cellsAndProofs, cellsProofs)
log.WithField("index", i).Debug("TIME MARKER KAPPA")
}
log.Debug("TIME MARKER LAMBDA")
return cellsAndProofs, nil
}

View File

@@ -109,15 +109,14 @@ func DataColumnSidecars(rows []kzg.CellsAndProofs, src ConstructionPopulator) ([
if err != nil {
return nil, errors.Wrap(err, "rotate cells and proofs")
}
info, err := src.extract()
if err != nil {
return nil, errors.Wrap(err, "extract block info")
}
maxIdx := params.BeaconConfig().NumberOfColumns
roSidecars := make([]blocks.RODataColumn, 0, maxIdx)
for idx := range maxIdx {
info, err := src.extract()
if err != nil {
return nil, errors.Wrap(err, "extract block info")
}
sidecar := &ethpb.DataColumnSidecar{
Index: idx,
Column: cells[idx],

View File

@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"reflect"
"strings"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/altair"
@@ -52,6 +53,7 @@ func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
tracing.AnnotateError(span, ErrMessageNotMapped)
return ErrMessageNotMapped
}
castMsg, ok := msg.(ssz.Marshaler)
if !ok {
return errors.Errorf("message of %T does not support marshaller interface", msg)
@@ -410,6 +412,10 @@ func (s *Service) findPeersIfNeeded(
// method to broadcast messages to other peers in our gossip mesh.
func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic string) error {
if strings.Contains(topic, "beacon_block") {
log.Debug("TIME MARKER D")
}
ctx, span := trace.StartSpan(ctx, "p2p.broadcastObject")
defer span.End()
@@ -422,6 +428,10 @@ func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic
return err
}
if strings.Contains(topic, "beacon_block") {
log.Debug("TIME MARKER E")
}
if span.IsRecording() {
id := hash.FastSum64(buf.Bytes())
messageLen := int64(buf.Len())
@@ -434,6 +444,10 @@ func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic
tracing.AnnotateError(span, err)
return err
}
if strings.Contains(topic, "beacon_block") {
log.Debug("TIME MARKER L")
}
return nil
}

View File

@@ -81,20 +81,46 @@ func (s *Service) LeaveTopic(topic string) error {
// PublishToTopic joins (if necessary) and publishes a message to a PubSub topic.
func (s *Service) PublishToTopic(ctx context.Context, topic string, data []byte, opts ...pubsub.PubOpt) error {
if strings.Contains(topic, "beacon_block") {
log.Debug("TIME MARKER F")
}
topicHandle, err := s.JoinTopic(topic)
if err != nil {
return err
}
if strings.Contains(topic, "beacon_block") {
log.Debug("TIME MARKER G")
}
// Wait for at least 1 peer to be available to receive the published message.
for {
if len(topicHandle.ListPeers()) > 0 || flags.Get().MinimumSyncPeers == 0 {
if strings.Contains(topic, "beacon_block") {
log.Debug("TIME MARKER H <--- main suspect after")
}
peers := topicHandle.ListPeers()
if strings.Contains(topic, "beacon_block") {
log.Debug("TIME MARKER I <--- main suspect before")
}
if len(peers) > 0 || flags.Get().MinimumSyncPeers == 0 {
return topicHandle.Publish(ctx, data, opts...)
}
if strings.Contains(topic, "beacon_block") {
log.Debug("TIME MARKER J")
}
select {
case <-ctx.Done():
return errors.Wrapf(ctx.Err(), "unable to find requisite number of peers for topic %s, 0 peers found to publish to", topic)
default:
if strings.Contains(topic, "beacon_block") {
log.Debug("TIME MARKER K")
}
time.Sleep(100 * time.Millisecond)
}
}

View File

@@ -284,6 +284,8 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
dataColumnSidecars []blocks.RODataColumn
)
log.Debug("TIME MARKER 01")
ctx, span := trace.StartSpan(ctx, "ProposerServer.ProposeBeaconBlock")
defer span.End()
@@ -291,15 +293,22 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
return nil, status.Errorf(codes.InvalidArgument, "empty request")
}
log.Debug("TIME MARKER 02")
block, err := blocks.NewSignedBeaconBlock(req.Block)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%s: %v", "decode block failed", err)
}
log.Debug("TIME MARKER 03")
root, err := block.Block().HashTreeRoot()
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not hash tree root: %v", err)
}
log.Debug("TIME MARKER 04")
// For post-Fulu blinded blocks, submit to relay and return early
if block.IsBlinded() && slots.ToEpoch(block.Block().Slot()) >= params.BeaconConfig().FuluForkEpoch {
err := vs.BlockBuilder.SubmitBlindedBlockPostFulu(ctx, block)
@@ -309,16 +318,22 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
return &ethpb.ProposeResponse{BlockRoot: root[:]}, nil
}
log.Debug("TIME MARKER 05")
rob, err := blocks.NewROBlockWithRoot(block, root)
if block.IsBlinded() {
block, blobSidecars, err = vs.handleBlindedBlock(ctx, block)
} else if block.Version() >= version.Deneb {
log.Debug("TIME MARKER 05A")
blobSidecars, dataColumnSidecars, err = vs.handleUnblindedBlock(rob, req)
log.Debug("TIME MARKER 05B")
}
if err != nil {
return nil, status.Errorf(codes.Internal, "%s: %v", "handle block failed", err)
}
log.Debug("TIME MARKER 06")
var wg sync.WaitGroup
errChan := make(chan error, 1)
@@ -333,13 +348,20 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
wg.Wait()
log.Debug("TIME MARKER 07")
if err := vs.broadcastAndReceiveSidecars(ctx, block, root, blobSidecars, dataColumnSidecars); err != nil {
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive sidecars: %v", err)
}
log.Debug("TIME MARKER 08")
if err := <-errChan; err != nil {
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive block: %v", err)
}
log.Debug("TIME MARKER 09")
return &ethpb.ProposeResponse{BlockRoot: root[:]}, nil
}
@@ -402,24 +424,32 @@ func (vs *Server) handleUnblindedBlock(
block blocks.ROBlock,
req *ethpb.GenericSignedBeaconBlock,
) ([]*ethpb.BlobSidecar, []blocks.RODataColumn, error) {
log.Debug("TIME MARKER ALPHA")
rawBlobs, proofs, err := blobsAndProofs(req)
if err != nil {
return nil, nil, err
}
log.Debug("TIME MARKER BETA")
if block.Version() >= version.Fulu {
log.Debug("TIME MARKER GAMMA")
// Compute cells and proofs from the blobs and cell proofs.
cellsAndProofs, err := peerdas.ComputeCellsAndProofsFromFlat(rawBlobs, proofs)
if err != nil {
return nil, nil, errors.Wrap(err, "compute cells and proofs")
}
log.Debug("TIME MARKER DELTA")
// Construct data column sidecars from the signed block and cells and proofs.
roDataColumnSidecars, err := peerdas.DataColumnSidecars(cellsAndProofs, peerdas.PopulateFromBlock(block))
if err != nil {
return nil, nil, errors.Wrap(err, "data column sidcars")
}
log.Debug("TIME MARKER EPSILON")
return nil, roDataColumnSidecars, nil
}
@@ -433,6 +463,7 @@ func (vs *Server) handleUnblindedBlock(
// broadcastReceiveBlock broadcasts a block and handles its reception.
func (vs *Server) broadcastReceiveBlock(ctx context.Context, wg *sync.WaitGroup, block interfaces.SignedBeaconBlock, root [fieldparams.RootLength]byte) error {
log.Debug("TIME MARKER A")
if err := vs.broadcastBlock(ctx, wg, block, root); err != nil {
return errors.Wrap(err, "broadcast block")
}
@@ -452,10 +483,15 @@ func (vs *Server) broadcastReceiveBlock(ctx context.Context, wg *sync.WaitGroup,
func (vs *Server) broadcastBlock(ctx context.Context, wg *sync.WaitGroup, block interfaces.SignedBeaconBlock, root [fieldparams.RootLength]byte) error {
defer wg.Done()
log.Debug("TIME MARKER B")
protoBlock, err := block.Proto()
if err != nil {
return errors.Wrap(err, "protobuf conversion failed")
}
log.Debug("TIME MARKER C")
if err := vs.P2P.Broadcast(ctx, protoBlock); err != nil {
return errors.Wrap(err, "broadcast failed")
}