prysm/beacon-chain/p2p/pubsub.go
Aarsh Shah cc4510bb77 p2p: batch publish data column sidecars (#16183)
**What type of PR is this?**

Feature

**What does this PR do? Why is it needed?**

This PR takes @MarcoPolo's PR at
https://github.com/OffchainLabs/prysm/pull/16130 to completion with
tests.

The description on his PR:

"""
a relatively small change to optimize network send order.

Without this, network writes tend to prioritize sending data for one
column to all peers before sending data for later columns (e.g. for two
columns and 4 peers per column it would send A,A,A,A,B,B,B,B). With
batch publishing we can change the write order to round robin across
columns (e.g. A,B,A,B,A,B,A,B).

In cases where the process is sending at a rate over the network limit,
this approach allows at least some copies of the column to propagate
through the network. In early simulations with bandwidth limits of
50mbps for the publisher, this improved dissemination by ~20-30%.
"""
See the issue for some more context.
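
As a toy illustration of the write-order change (this is not the actual Prysm/libp2p batching code; the types and function names below are made up for the sketch), compare the naive per-column order with the round-robin order that batch publishing aims for:

```go
package main

import "fmt"

// send represents one network write: one column's payload destined for one peer.
type send struct {
	column string
	peer   int
}

// naiveOrder pushes each column to every peer before moving on to the next
// column, e.g. A,A,A,A,B,B,B,B for two columns and four peers.
func naiveOrder(columns []string, peersPerColumn int) []send {
	var out []send
	for _, c := range columns {
		for p := 0; p < peersPerColumn; p++ {
			out = append(out, send{column: c, peer: p})
		}
	}
	return out
}

// roundRobinOrder interleaves columns across the send queue, e.g. A,B,A,B,...,
// which is the ordering batch publishing targets.
func roundRobinOrder(columns []string, peersPerColumn int) []send {
	var out []send
	for p := 0; p < peersPerColumn; p++ {
		for _, c := range columns {
			out = append(out, send{column: c, peer: p})
		}
	}
	return out
}

func main() {
	cols := []string{"A", "B"}
	fmt.Println("naive:      ", naiveOrder(cols, 4))
	fmt.Println("round-robin:", roundRobinOrder(cols, 4))
}
```

If the publisher's bandwidth runs out partway through the round-robin sequence, every column has still reached some peers, which is what improves dissemination under constrained upload limits.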

**Which issue(s) does this PR fix?**

Fixes https://github.com/OffchainLabs/prysm/issues/16129

**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [ ] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [ ] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).

---------

Co-authored-by: Marco Munizaga <git@marcopolo.io>
Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
Co-authored-by: kasey <489222+kasey@users.noreply.github.com>
Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>
Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
2026-01-05 22:02:06 +00:00


package p2p
import (
"context"
"encoding/hex"
"fmt"
"strings"
"time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v7/cmd/beacon-chain/flags"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
pbrpc "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
)
const (
// overlay parameters
gossipSubD = 8 // topic stable mesh target count
gossipSubDlo = 6 // topic stable mesh low watermark
gossipSubDhi = 12 // topic stable mesh high watermark
// gossip parameters
gossipSubMcacheLen = 6 // number of windows to retain full messages in cache for `IWANT` responses
gossipSubMcacheGossip = 3 // number of windows to gossip about
gossipSubSeenTTL = 768 // number of seconds to retain message IDs (2 epochs)
// fanout ttl
gossipSubFanoutTTL = 60000000000 // TTL for fanout maps for topics we are not subscribed to but have published to, in nanoseconds (60 seconds)
// heartbeat interval
gossipSubHeartbeatInterval = 700 * time.Millisecond // frequency of heartbeat, milliseconds
// misc
rSubD = 8 // random gossip target
)
var ErrInvalidTopic = errors.New("invalid topic format")
// Specifies the fixed size context length.
const digestLength = 4
// Specifies the prefix for any pubsub topic.
const gossipTopicPrefix = "/eth2/"
// JoinTopic joins the given PubSub topic if it has not been joined already.
func (s *Service) JoinTopic(topic string, opts ...pubsub.TopicOpt) (*pubsub.Topic, error) {
s.joinedTopicsLock.Lock()
defer s.joinedTopicsLock.Unlock()
if _, ok := s.joinedTopics[topic]; !ok {
topicHandle, err := s.pubsub.Join(topic, opts...)
if err != nil {
return nil, err
}
s.joinedTopics[topic] = topicHandle
}
return s.joinedTopics[topic], nil
}
// LeaveTopic closes the topic and removes the corresponding handler from the list of joined topics.
// This method will return an error if there are outstanding event handlers or subscriptions.
func (s *Service) LeaveTopic(topic string) error {
s.joinedTopicsLock.Lock()
defer s.joinedTopicsLock.Unlock()
if t, ok := s.joinedTopics[topic]; ok {
if err := t.Close(); err != nil {
return err
}
delete(s.joinedTopics, topic)
}
return nil
}
// PublishToTopic joins (if necessary) and publishes a message to a PubSub topic.
func (s *Service) PublishToTopic(ctx context.Context, topic string, data []byte, opts ...pubsub.PubOpt) error {
topicHandle, err := s.JoinTopic(topic)
if err != nil {
return err
}
// Wait for at least 1 peer to be available to receive the published message.
for {
if len(topicHandle.ListPeers()) > 0 || flags.Get().MinimumSyncPeers == 0 {
return topicHandle.Publish(ctx, data, opts...)
}
select {
case <-ctx.Done():
return errors.Wrapf(ctx.Err(), "unable to find requisite number of peers for topic %s, 0 peers found to publish to", topic)
default:
time.Sleep(100 * time.Millisecond)
}
}
}
// addToBatch joins (if necessary) a topic and adds the message to a message batch.
func (s *Service) addToBatch(ctx context.Context, batch *pubsub.MessageBatch, topic string, data []byte, opts ...pubsub.PubOpt) error {
topicHandle, err := s.JoinTopic(topic)
if err != nil {
return fmt.Errorf("joining topic: %w", err)
}
// Wait for at least 1 peer to be available to receive the published message.
for {
if flags.Get().MinimumSyncPeers == 0 || len(topicHandle.ListPeers()) > 0 {
return topicHandle.AddToBatch(ctx, batch, data, opts...)
}
select {
case <-ctx.Done():
return errors.Wrapf(ctx.Err(), "unable to find requisite number of peers for topic %s, 0 peers found to publish to", topic)
case <-time.After(100 * time.Millisecond):
// reenter the for loop after 100ms
}
}
}
// SubscribeToTopic joins (if necessary) and subscribes to the given PubSub topic.
func (s *Service) SubscribeToTopic(topic string, opts ...pubsub.SubOpt) (*pubsub.Subscription, error) {
s.awaitStateInitialized() // Genesis time and genesis validators root are required to subscribe.
topicHandle, err := s.JoinTopic(topic)
if err != nil {
return nil, err
}
scoringParams, err := s.topicScoreParams(topic)
if err != nil {
return nil, err
}
if scoringParams != nil {
if err := topicHandle.SetScoreParams(scoringParams); err != nil {
return nil, err
}
logGossipParameters(topic, scoringParams)
}
return topicHandle.Subscribe(opts...)
}
// peerInspector will scrape all the relevant scoring data and add it to our
// peer handler.
func (s *Service) peerInspector(peerMap map[peer.ID]*pubsub.PeerScoreSnapshot) {
// Iterate through all the connected peers and through any of their
// relevant topics.
for pid, snap := range peerMap {
s.peers.Scorers().GossipScorer().SetGossipData(pid, snap.Score,
snap.BehaviourPenalty, convertTopicScores(snap.Topics))
}
}
// pubsubOptions creates a list of options to configure our router with.
func (s *Service) pubsubOptions() []pubsub.Option {
filt := pubsub.NewAllowlistSubscriptionFilter(s.allTopicStrings()...)
filt = pubsub.WrapLimitSubscriptionFilter(filt, pubsubSubscriptionRequestLimit)
psOpts := []pubsub.Option{
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
pubsub.WithNoAuthor(),
pubsub.WithMessageIdFn(func(pmsg *pubsubpb.Message) string {
return MsgID(s.genesisValidatorsRoot, pmsg)
}),
pubsub.WithSubscriptionFilter(filt),
pubsub.WithPeerOutboundQueueSize(int(s.cfg.QueueSize)),
pubsub.WithMaxMessageSize(int(MaxMessageSize())), // lint:ignore uintcast -- Max Message Size is a config value and is naturally bounded by networking limitations.
pubsub.WithValidateQueueSize(int(s.cfg.QueueSize)),
pubsub.WithPeerScore(peerScoringParams(s.cfg.IPColocationWhitelist)),
pubsub.WithPeerScoreInspect(s.peerInspector, time.Minute),
pubsub.WithGossipSubParams(pubsubGossipParam()),
pubsub.WithRawTracer(gossipTracer{host: s.host}),
}
if len(s.cfg.StaticPeers) > 0 {
directPeersAddrInfos, err := parsePeersEnr(s.cfg.StaticPeers)
if err != nil {
log.WithError(err).Error("Could not add direct peer option")
return psOpts
}
psOpts = append(psOpts, pubsub.WithDirectPeers(directPeersAddrInfos))
}
return psOpts
}
// parsePeersEnr takes a list of raw ENRs and converts them into a list of AddrInfos.
func parsePeersEnr(peers []string) ([]peer.AddrInfo, error) {
addrs, err := PeersFromStringAddrs(peers)
if err != nil {
return nil, fmt.Errorf("cannot convert peers raw ENRs into multiaddresses: %w", err)
}
if len(addrs) == 0 {
return nil, fmt.Errorf("converting peers raw ENRs into multiaddresses resulted in an empty list")
}
directAddrInfos, err := peer.AddrInfosFromP2pAddrs(addrs...)
if err != nil {
return nil, fmt.Errorf("cannot convert peers multiaddresses into AddrInfos: %w", err)
}
return directAddrInfos, nil
}
// pubsubGossipParam creates a custom gossipsub parameter set.
func pubsubGossipParam() pubsub.GossipSubParams {
gParams := pubsub.DefaultGossipSubParams()
gParams.Dlo = gossipSubDlo
gParams.D = gossipSubD
gParams.Dhi = gossipSubDhi
gParams.HeartbeatInterval = gossipSubHeartbeatInterval
gParams.HistoryLength = gossipSubMcacheLen
gParams.HistoryGossip = gossipSubMcacheGossip
return gParams
}
// We unfortunately have to set this globally in order to configure our
// message ID time-cache, rather than instantiating it with a router instance.
func setPubSubParameters() {
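// Retain seen message IDs for two epochs: 2 * SlotsPerEpoch * SecondsPerSlot seconds.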
seenTtl := 2 * time.Second * time.Duration(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot))
pubsub.TimeCacheDuration = seenTtl
}
// convertTopicScores converts from libp2p's internal schema to a compatible Prysm protobuf format.
func convertTopicScores(topicMap map[string]*pubsub.TopicScoreSnapshot) map[string]*pbrpc.TopicScoreSnapshot {
newMap := make(map[string]*pbrpc.TopicScoreSnapshot, len(topicMap))
for t, s := range topicMap {
newMap[t] = &pbrpc.TopicScoreSnapshot{
TimeInMesh: uint64(s.TimeInMesh.Milliseconds()),
FirstMessageDeliveries: float32(s.FirstMessageDeliveries),
MeshMessageDeliveries: float32(s.MeshMessageDeliveries),
InvalidMessageDeliveries: float32(s.InvalidMessageDeliveries),
}
}
return newMap
}
// ExtractGossipDigest extracts the relevant fork digest from the gossip topic.
// Topics are in the form of /eth2/{fork-digest}/{topic} and this method extracts the
// fork digest from the topic string to a 4 byte array.
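// For example, "/eth2/b5303f2a/beacon_block/ssz_snappy" yields 0xb5303f2a (the digest value here is purely illustrative).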
func ExtractGossipDigest(topic string) ([4]byte, error) {
// Ensure the topic prefix is correct.
if len(topic) < len(gossipTopicPrefix)+1 || topic[:len(gossipTopicPrefix)] != gossipTopicPrefix {
return [4]byte{}, ErrInvalidTopic
}
start := len(gossipTopicPrefix)
end := strings.Index(topic[start:], "/")
if end == -1 { // Ensure a topic suffix exists.
return [4]byte{}, ErrInvalidTopic
}
end += start
strDigest := topic[start:end]
digest, err := hex.DecodeString(strDigest)
if err != nil {
return [4]byte{}, err
}
if len(digest) != digestLength {
return [4]byte{}, errors.Errorf("invalid digest length wanted %d but got %d", digestLength, len(digest))
}
return bytesutil.ToBytes4(digest), nil
}
// MaxMessageSize returns the maximum allowed compressed message size.
//
// Spec pseudocode definition:
// def max_message_size() -> uint64:
//
// # Allow 1024 bytes for framing and encoding overhead but at least 1MiB in case MAX_PAYLOAD_SIZE is small.
// return max(max_compressed_len(MAX_PAYLOAD_SIZE) + 1024, 1024 * 1024)
func MaxMessageSize() uint64 {
return max(encoder.MaxCompressedLen(params.BeaconConfig().MaxPayloadSize)+1024, 1024*1024)
}