p2p: batch publish data column sidecars

A relatively small change to optimize network send order.

Without this, network writes tend to prioritize sending data for one column
to all peers before sending data for later columns (e.g. for 2 columns
and 4 peers per column it would send A,A,A,A,B,B,B,B). With batch
publishing we can change the write order to round robin across columns
(e.g. A,B,A,B,A,B,A,B).

In cases where the process is sending at a rate over the network limit,
this approach allows at least some copies of each column to propagate
through the network. In early simulations with bandwidth limits of
50 Mbps for the publisher, this improved dissemination by ~20-30%.
This commit is contained in:
Marco Munizaga
2025-12-10 09:28:59 -08:00
parent a3210157e2
commit ae5911147e
2 changed files with 46 additions and 3 deletions

View File

@@ -22,6 +22,7 @@ import (
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/time/slots"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/pkg/errors"
ssz "github.com/prysmaticlabs/fastssz"
"github.com/sirupsen/logrus"
@@ -374,6 +375,7 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
logLevel := logrus.GetLevel()
slotPerRoot := make(map[[fieldparams.RootLength]byte]primitives.Slot, 1)
var messageBatch pubsub.MessageBatch
for _, sidecar := range sidecars {
slotPerRoot[sidecar.BlockRoot()] = sidecar.Slot()
@@ -399,7 +401,7 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
}
// Broadcast the data column sidecar to the network.
if err := s.broadcastObject(ctx, sidecar, topic); err != nil {
if err := s.batchObject(ctx, &messageBatch, sidecar, topic); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Error("Cannot broadcast data column sidecar")
return
@@ -416,8 +418,9 @@ func (s *Service) broadcastDataColumnSidecars(ctx context.Context, forkDigest [f
})
}
// Wait for all broadcasts to finish.
// Wait for all messages to be added to the batch.
wg.Wait()
s.pubsub.PublishBatch(&messageBatch)
// The rest of this function is only for debug logging purposes.
if logLevel < logrus.DebugLevel {
@@ -506,6 +509,18 @@ func (s *Service) findPeersIfNeeded(
// broadcastObject broadcasts obj to other peers in our gossip mesh on the given
// topic. It delegates to broadcastOrBatchObject without a batch, which makes
// that helper publish the message directly.
func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic string) error {
	var noBatch *pubsub.MessageBatch // nil: publish now instead of queueing
	return s.broadcastOrBatchObject(ctx, noBatch, obj, topic)
}
// batchObject adds obj, destined for topic, to batch for a future broadcast.
// With a non-nil batch nothing is published here; the caller is responsible
// for publishing the batch once every message has been added.
func (s *Service) batchObject(ctx context.Context, batch *pubsub.MessageBatch, obj ssz.Marshaler, topic string) error {
	err := s.broadcastOrBatchObject(ctx, batch, obj, topic)
	return err
}
// broadcastOrBatchObject broadcasts or batches messages to other peers in our
// gossip mesh. If batch is non-nil the message is added to the batch WITHOUT
// publishing. The caller MUST publish the batch after all messages have been
// added to the batch.
func (s *Service) broadcastOrBatchObject(ctx context.Context, batch *pubsub.MessageBatch, obj ssz.Marshaler, topic string) error {
ctx, span := trace.StartSpan(ctx, "p2p.broadcastObject")
defer span.End()
@@ -525,7 +540,14 @@ func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic
iid := int64(id)
span = trace.AddMessageSendEvent(span, iid, messageLen /*uncompressed*/, messageLen /*compressed*/)
}
if err := s.PublishToTopic(ctx, topic+s.Encoding().ProtocolSuffix(), buf.Bytes()); err != nil {
var err error
// Both paths must address the same topic name, i.e. the topic plus the
// encoding's protocol suffix; otherwise batched messages would be joined and
// queued on a different topic than directly published ones.
if batch != nil {
	err = s.addToBatch(ctx, batch, topic+s.Encoding().ProtocolSuffix(), buf.Bytes())
} else {
	err = s.PublishToTopic(ctx, topic+s.Encoding().ProtocolSuffix(), buf.Bytes())
}
if err != nil {
err := errors.Wrap(err, "could not publish message")
tracing.AnnotateError(span, err)
return err

View File

@@ -99,6 +99,27 @@ func (s *Service) PublishToTopic(ctx context.Context, topic string, data []byte,
}
}
// addToBatch joins (if necessary) a topic and adds the message to a message batch.
//
// It does not publish anything itself: the message is only handed to the topic
// handle's AddToBatch. Before doing so it waits until the topic has at least
// one peer, unless the MinimumSyncPeers flag is 0, in which case it proceeds
// immediately. An error from JoinTopic is returned as-is. If ctx ends while
// waiting for a peer, the context error is returned wrapped with the topic name.
func (s *Service) addToBatch(ctx context.Context, batch *pubsub.MessageBatch, topic string, data []byte, opts ...pubsub.PubOpt) error {
	topicHandle, err := s.JoinTopic(topic)
	if err != nil {
		return err
	}
	// Wait for at least 1 peer to be available to receive the published message.
	for {
		if len(topicHandle.ListPeers()) > 0 || flags.Get().MinimumSyncPeers == 0 {
			return topicHandle.AddToBatch(ctx, batch, data, opts...)
		}
		// Re-check the peer list after a short pause, but react to cancellation
		// right away instead of sleeping through it.
		select {
		case <-ctx.Done():
			return errors.Wrapf(ctx.Err(), "unable to find requisite number of peers for topic %s, 0 peers found to publish to", topic)
		case <-time.After(100 * time.Millisecond):
		}
	}
}
// SubscribeToTopic joins (if necessary) and subscribes to PubSub topic.
func (s *Service) SubscribeToTopic(topic string, opts ...pubsub.SubOpt) (*pubsub.Subscription, error) {
s.awaitStateInitialized() // Genesis time and genesis validators root are required to subscribe.