Aggregate logs when broadcasting data column sidecars (#15748)

* Aggregate logs when broadcasting data column sidecars

* Fix James' comment.
This commit is contained in:
Manu NALEPA
2025-10-01 10:21:10 +02:00
committed by GitHub
parent 08c855fd4b
commit 45d6002411
11 changed files with 160 additions and 114 deletions

View File

@@ -89,7 +89,7 @@ func (mb *mockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context,
return nil
}
func (mb *mockBroadcaster) BroadcastDataColumnSidecar(_ uint64, _ blocks.VerifiedRODataColumn) error {
func (mb *mockBroadcaster) BroadcastDataColumnSidecars(_ context.Context, _ []blocks.VerifiedRODataColumn) error {
mb.broadcastCalled = true
return nil
}

View File

@@ -162,6 +162,7 @@ go_test(
"//cmd/beacon-chain/flags:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//consensus-types/wrapper:go_default_library",

View File

@@ -5,14 +5,18 @@ import (
"context"
"fmt"
"reflect"
"slices"
"sync"
"time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/altair"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/crypto/hash"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace"
@@ -306,86 +310,150 @@ func (s *Service) BroadcastLightClientFinalityUpdate(ctx context.Context, update
return nil
}
// BroadcastDataColumnSidecar broadcasts a data column to the p2p network, the message is assumed to be
// broadcasted to the current fork and to the input column subnet.
func (s *Service) BroadcastDataColumnSidecar(
dataColumnSubnet uint64,
dataColumnSidecar blocks.VerifiedRODataColumn,
) error {
// Add tracing to the function.
ctx, span := trace.StartSpan(s.ctx, "p2p.BroadcastDataColumnSidecar")
defer span.End()
// BroadcastDataColumnSidecars broadcasts multiple data column sidecars to the p2p network, after ensuring
// there is at least one peer in each needed subnet. If not, it will attempt to find one before broadcasting.
// This function is non-blocking. It stops trying to broadcast a given sidecar when more than one slot has passed, or the context is
// cancelled (whichever comes first).
func (s *Service) BroadcastDataColumnSidecars(ctx context.Context, sidecars []blocks.VerifiedRODataColumn) error {
// Increase the number of broadcast attempts.
dataColumnSidecarBroadcastAttempts.Add(float64(len(sidecars)))
// Retrieve the current fork digest.
forkDigest, err := s.currentForkDigest()
if err != nil {
err := errors.Wrap(err, "current fork digest")
tracing.AnnotateError(span, err)
return err
return errors.Wrap(err, "current fork digest")
}
// Non-blocking broadcast, with attempts to discover a column subnet peer if none available.
go s.internalBroadcastDataColumnSidecar(ctx, dataColumnSubnet, dataColumnSidecar, forkDigest)
go s.broadcastDataColumnSidecars(ctx, forkDigest, sidecars)
return nil
}
func (s *Service) internalBroadcastDataColumnSidecar(
ctx context.Context,
columnSubnet uint64,
dataColumnSidecar blocks.VerifiedRODataColumn,
forkDigest [fieldparams.VersionLength]byte,
) {
// Add tracing to the function.
_, span := trace.StartSpan(ctx, "p2p.internalBroadcastDataColumnSidecar")
defer span.End()
// Increase the number of broadcast attempts.
dataColumnSidecarBroadcastAttempts.Inc()
// Define a one-slot length context timeout.
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
oneSlot := time.Duration(secondsPerSlot) * time.Second
ctx, cancel := context.WithTimeout(ctx, oneSlot)
defer cancel()
// Build the topic corresponding to this column subnet and this fork digest.
topic := dataColumnSubnetToTopic(columnSubnet, forkDigest)
// Compute the wrapped subnet index.
wrappedSubIdx := columnSubnet + dataColumnSubnetVal
// Find peers if needed.
if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, DataColumnSubnetTopicFormat, forkDigest, columnSubnet); err != nil {
log.WithError(err).Error("Failed to find peers for data column subnet")
tracing.AnnotateError(span, err)
// broadcastDataColumnSidecars broadcasts multiple data column sidecars to the p2p network, after ensuring
// there is at least one peer in each needed subnet. If not, it will attempt to find one before broadcasting.
// It returns when all broadcasts are complete, or the context is cancelled (whichever comes first).
func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [fieldparams.VersionLength]byte, sidecars []blocks.VerifiedRODataColumn) {
type rootAndIndex struct {
root [fieldparams.RootLength]byte
index uint64
}
// Broadcast the data column sidecar to the network.
if err := s.broadcastObject(ctx, dataColumnSidecar, topic); err != nil {
log.WithError(err).Error("Failed to broadcast data column sidecar")
tracing.AnnotateError(span, err)
var (
wg sync.WaitGroup
timings sync.Map
)
logLevel := logrus.GetLevel()
slotPerRoot := make(map[[fieldparams.RootLength]byte]primitives.Slot, 1)
for _, sidecar := range sidecars {
slotPerRoot[sidecar.BlockRoot()] = sidecar.Slot()
wg.Go(func() {
// Add tracing to the function.
ctx, span := trace.StartSpan(s.ctx, "p2p.broadcastDataColumnSidecars")
defer span.End()
// Compute the subnet for this data column sidecar.
subnet := peerdas.ComputeSubnetForDataColumnSidecar(sidecar.Index)
// Build the topic corresponding to subnet column subnet and this fork digest.
topic := dataColumnSubnetToTopic(subnet, forkDigest)
// Compute the wrapped subnet index.
wrappedSubIdx := subnet + dataColumnSubnetVal
// Find peers if needed.
if err := s.findPeersIfNeeded(ctx, wrappedSubIdx, DataColumnSubnetTopicFormat, forkDigest, subnet); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Error("Cannot find peers if needed")
return
}
// Broadcast the data column sidecar to the network.
if err := s.broadcastObject(ctx, sidecar, topic); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Error("Cannot broadcast data column sidecar")
return
}
// Increase the number of successful broadcasts.
dataColumnSidecarBroadcasts.Inc()
// Record the timing for log purposes.
if logLevel >= logrus.DebugLevel {
root := sidecar.BlockRoot()
timings.Store(rootAndIndex{root: root, index: sidecar.Index}, time.Now())
}
})
}
// Wait for all broadcasts to finish.
wg.Wait()
// The rest of this function is only for debug logging purposes.
if logLevel < logrus.DebugLevel {
return
}
header := dataColumnSidecar.SignedBlockHeader.GetHeader()
slot := header.GetSlot()
slotStartTime, err := slots.StartTime(s.genesisTime, slot)
if err != nil {
log.WithError(err).Error("Failed to convert slot to time")
type logInfo struct {
durationMin time.Duration
durationMax time.Duration
indices []uint64
}
log.WithFields(logrus.Fields{
"slot": slot,
"timeSinceSlotStart": time.Since(slotStartTime),
"root": fmt.Sprintf("%#x", dataColumnSidecar.BlockRoot()),
"columnSubnet": columnSubnet,
"blobCount": len(dataColumnSidecar.Column),
}).Debug("Broadcasted data column sidecar")
logInfoPerRoot := make(map[[fieldparams.RootLength]byte]*logInfo, 1)
// Increase the number of successful broadcasts.
dataColumnSidecarBroadcasts.Inc()
timings.Range(func(key any, value any) bool {
rootAndIndex, ok := key.(rootAndIndex)
if !ok {
log.Error("Could not cast key to rootAndIndex")
return true
}
broadcastTime, ok := value.(time.Time)
if !ok {
log.Error("Could not cast value to time.Time")
return true
}
slot, ok := slotPerRoot[rootAndIndex.root]
if !ok {
log.WithField("root", fmt.Sprintf("%#x", rootAndIndex.root)).Error("Could not find slot for root")
return true
}
duration, err := slots.SinceSlotStart(slot, s.genesisTime, broadcastTime)
if err != nil {
log.WithError(err).Error("Could not compute duration since slot start")
return true
}
info, ok := logInfoPerRoot[rootAndIndex.root]
if !ok {
logInfoPerRoot[rootAndIndex.root] = &logInfo{durationMin: duration, durationMax: duration, indices: []uint64{rootAndIndex.index}}
return true
}
info.durationMin = min(info.durationMin, duration)
info.durationMax = max(info.durationMax, duration)
info.indices = append(info.indices, rootAndIndex.index)
return true
})
for root, info := range logInfoPerRoot {
slices.Sort(info.indices)
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"slot": slotPerRoot[root],
"count": len(info.indices),
"indices": helpers.PrettySlice(info.indices),
"timeSinceSlotStartMin": info.durationMin,
"timeSinceSlotStartMax": info.durationMax,
}).Debug("Broadcasted data column sidecars")
}
}
func (s *Service) findPeersIfNeeded(

View File

@@ -19,6 +19,7 @@ import (
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v6/consensus-types/wrapper"
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
@@ -664,6 +665,8 @@ func TestService_BroadcastDataColumn(t *testing.T) {
topicFormat = DataColumnSubnetTopicFormat
)
ctx := t.Context()
// Load the KZG trust setup.
err := kzg.Start()
require.NoError(t, err)
@@ -686,7 +689,7 @@ func TestService_BroadcastDataColumn(t *testing.T) {
_, pkey, ipAddr := createHost(t, port)
service := &Service{
ctx: t.Context(),
ctx: ctx,
host: p1.BHost,
pubsub: p1.PubSub(),
joinedTopics: map[string]*pubsub.Topic{},
@@ -695,7 +698,7 @@ func TestService_BroadcastDataColumn(t *testing.T) {
genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),
subnetsLock: make(map[uint64]*sync.RWMutex),
subnetsLockLock: sync.Mutex{},
peers: peers.NewStatus(t.Context(), &peers.StatusConfig{ScorerParams: &scorers.Config{}}),
peers: peers.NewStatus(ctx, &peers.StatusConfig{ScorerParams: &scorers.Config{}}),
custodyInfo: &custodyInfo{},
}
@@ -722,7 +725,7 @@ func TestService_BroadcastDataColumn(t *testing.T) {
time.Sleep(50 * time.Millisecond)
// Broadcast to peers and wait.
err = service.BroadcastDataColumnSidecar(subnet, verifiedRoSidecar)
err = service.BroadcastDataColumnSidecars(ctx, []blocks.VerifiedRODataColumn{verifiedRoSidecar})
require.NoError(t, err)
// Receive the message.

View File

@@ -52,7 +52,7 @@ type (
BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error
BroadcastLightClientOptimisticUpdate(ctx context.Context, update interfaces.LightClientOptimisticUpdate) error
BroadcastLightClientFinalityUpdate(ctx context.Context, update interfaces.LightClientFinalityUpdate) error
BroadcastDataColumnSidecar(columnSubnet uint64, dataColumnSidecar blocks.VerifiedRODataColumn) error
BroadcastDataColumnSidecars(ctx context.Context, sidecars []blocks.VerifiedRODataColumn) error
}
// SetStreamHandler configures p2p to handle streams of a certain topic ID.

View File

@@ -169,7 +169,7 @@ func (*FakeP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interfac
}
// BroadcastDataColumnSidecar -- fake.
func (*FakeP2P) BroadcastDataColumnSidecar(_ uint64, _ blocks.VerifiedRODataColumn) error {
func (*FakeP2P) BroadcastDataColumnSidecars(_ context.Context, _ []blocks.VerifiedRODataColumn) error {
return nil
}

View File

@@ -63,7 +63,7 @@ func (m *MockBroadcaster) BroadcastLightClientFinalityUpdate(_ context.Context,
}
// BroadcastDataColumnSidecar broadcasts a data column for mock.
func (m *MockBroadcaster) BroadcastDataColumnSidecar(uint64, blocks.VerifiedRODataColumn) error {
func (m *MockBroadcaster) BroadcastDataColumnSidecars(context.Context, []blocks.VerifiedRODataColumn) error {
m.BroadcastCalled.Store(true)
return nil
}

View File

@@ -233,7 +233,7 @@ func (p *TestP2P) BroadcastLightClientFinalityUpdate(_ context.Context, _ interf
}
// BroadcastDataColumnSidecar broadcasts a data column for mock.
func (p *TestP2P) BroadcastDataColumnSidecar(uint64, blocks.VerifiedRODataColumn) error {
func (p *TestP2P) BroadcastDataColumnSidecars(context.Context, []blocks.VerifiedRODataColumn) error {
p.BroadcastCalled.Store(true)
return nil
}

View File

@@ -352,7 +352,7 @@ func (vs *Server) broadcastAndReceiveSidecars(
dataColumnSidecars []blocks.RODataColumn,
) error {
if block.Version() >= version.Fulu {
if err := vs.broadcastAndReceiveDataColumns(ctx, dataColumnSidecars, root); err != nil {
if err := vs.broadcastAndReceiveDataColumns(ctx, dataColumnSidecars); err != nil {
return errors.Wrap(err, "broadcast and receive data columns")
}
return nil
@@ -495,43 +495,22 @@ func (vs *Server) broadcastAndReceiveBlobs(ctx context.Context, sidecars []*ethp
}
// broadcastAndReceiveDataColumns handles the broadcasting and reception of data columns sidecars.
func (vs *Server) broadcastAndReceiveDataColumns(
ctx context.Context,
roSidecars []blocks.RODataColumn,
root [fieldparams.RootLength]byte,
) error {
verifiedRODataColumns := make([]blocks.VerifiedRODataColumn, 0, len(roSidecars))
eg, _ := errgroup.WithContext(ctx)
for _, roSidecar := range roSidecars {
// We build this block ourselves, so we can upgrade the read only data column sidecar into a verified one.
verifiedRODataColumn := blocks.NewVerifiedRODataColumn(roSidecar)
verifiedRODataColumns = append(verifiedRODataColumns, verifiedRODataColumn)
eg.Go(func() error {
// Compute the subnet index based on the column index.
subnet := peerdas.ComputeSubnetForDataColumnSidecar(roSidecar.Index)
if err := vs.P2P.BroadcastDataColumnSidecar(subnet, verifiedRODataColumn); err != nil {
return errors.Wrap(err, "broadcast data column")
}
return nil
})
func (vs *Server) broadcastAndReceiveDataColumns(ctx context.Context, roSidecars []blocks.RODataColumn) error {
// We built this block ourselves, so we can upgrade the read only data column sidecar into a verified one.
verifiedSidecars := make([]blocks.VerifiedRODataColumn, 0, len(roSidecars))
for _, sidecar := range roSidecars {
verifiedSidecar := blocks.NewVerifiedRODataColumn(sidecar)
verifiedSidecars = append(verifiedSidecars, verifiedSidecar)
}
if err := vs.DataColumnReceiver.ReceiveDataColumns(verifiedRODataColumns); err != nil {
return errors.Wrap(err, "receive data column")
// Broadcast sidecars (non blocking).
if err := vs.P2P.BroadcastDataColumnSidecars(ctx, verifiedSidecars); err != nil {
return errors.Wrap(err, "broadcast data column sidecars")
}
for _, verifiedRODataColumn := range verifiedRODataColumns {
vs.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.DataColumnSidecarReceived,
Data: &operation.DataColumnSidecarReceivedData{DataColumn: &verifiedRODataColumn}, // #nosec G601
})
}
if err := eg.Wait(); err != nil {
return errors.Wrap(err, "wait for data columns to be broadcasted")
// In parallel, receive sidecars.
if err := vs.DataColumnReceiver.ReceiveDataColumns(verifiedSidecars); err != nil {
return errors.Wrap(err, "receive data columns")
}
return nil

View File

@@ -267,16 +267,9 @@ func (s *Service) broadcastAndReceiveUnseenDataColumnSidecars(
unseenIndices[sidecar.Index] = true
}
// Broadcast all the data column sidecars we reconstructed but did not see via gossip.
for _, sidecar := range unseenSidecars {
// Compute the subnet for this data column sidecar.
subnet := peerdas.ComputeSubnetForDataColumnSidecar(sidecar.Index)
// Broadcast the data column sidecar.
if err := s.cfg.p2p.BroadcastDataColumnSidecar(subnet, sidecar); err != nil {
// Don't return on error on broadcast failure, just log it.
log.WithError(err).Error("Broadcast data column")
}
// Broadcast all the data column sidecars we reconstructed but did not see via gossip (non blocking).
if err := s.cfg.p2p.BroadcastDataColumnSidecars(ctx, unseenSidecars); err != nil {
return nil, errors.Wrap(err, "broadcast data column sidecars")
}
// Receive data column sidecars.

View File

@@ -0,0 +1,2 @@
### Changed
- Aggregate logs when broadcasting data column sidecars (one per root instead of one per sidecar)