Compare commits

...

1 Commits

Author SHA1 Message Date
nisdas
7cbb59cd3e Enable Batch Publishing 2025-03-14 21:34:15 +08:00
6 changed files with 69 additions and 19 deletions

View File

@@ -7,6 +7,7 @@ import (
"reflect"
"time"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/pkg/errors"
ssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/altair"
@@ -27,7 +28,7 @@ var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub top
// Broadcast a message to the p2p network, the message is assumed to be
// broadcasted to the current fork.
func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
func (s *Service) Broadcast(ctx context.Context, msg proto.Message, pubOpts ...pubsub.PubOpt) error {
ctx, span := trace.StartSpan(ctx, "p2p.Broadcast")
defer span.End()
@@ -51,7 +52,7 @@ func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
if !ok {
return errors.Errorf("message of %T does not support marshaller interface", msg)
}
return s.broadcastObject(ctx, castMsg, fmt.Sprintf(topic, forkDigest))
return s.broadcastObject(ctx, castMsg, fmt.Sprintf(topic, forkDigest), pubOpts...)
}
// BroadcastAttestation broadcasts an attestation to the p2p network, the message is assumed to be
@@ -209,7 +210,7 @@ func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMs
// BroadcastBlob broadcasts a blob to the p2p network, the message is assumed to be
// broadcasted to the current fork and to the input subnet.
func (s *Service) BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error {
func (s *Service) BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar, pubOpts ...pubsub.PubOpt) error {
ctx, span := trace.StartSpan(ctx, "p2p.BroadcastBlob")
defer span.End()
if blob == nil {
@@ -223,12 +224,12 @@ func (s *Service) BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.
}
// Non-blocking broadcast, with attempts to discover a subnet peer if none available.
go s.internalBroadcastBlob(ctx, subnet, blob, forkDigest)
go s.internalBroadcastBlob(ctx, subnet, blob, forkDigest, pubOpts...)
return nil
}
func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.BlobSidecar, forkDigest [4]byte) {
func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.BlobSidecar, forkDigest [4]byte, pubOpts ...pubsub.PubOpt) {
_, span := trace.StartSpan(ctx, "p2p.internalBroadcastBlob")
defer span.End()
ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline.
@@ -262,14 +263,14 @@ func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blob
}
}
if err := s.broadcastObject(ctx, blobSidecar, blobSubnetToTopic(subnet, forkDigest)); err != nil {
if err := s.broadcastObject(ctx, blobSidecar, blobSubnetToTopic(subnet, forkDigest), pubOpts...); err != nil {
log.WithError(err).Error("Failed to broadcast blob sidecar")
tracing.AnnotateError(span, err)
}
}
// method to broadcast messages to other peers in our gossip mesh.
func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic string) error {
func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic string, pubOpts ...pubsub.PubOpt) error {
ctx, span := trace.StartSpan(ctx, "p2p.broadcastObject")
defer span.End()
@@ -289,7 +290,7 @@ func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic
iid := int64(id)
span = trace.AddMessageSendEvent(span, iid, messageLen /*uncompressed*/, messageLen /*compressed*/)
}
if err := s.PublishToTopic(ctx, topic+s.Encoding().ProtocolSuffix(), buf.Bytes()); err != nil {
if err := s.PublishToTopic(ctx, topic+s.Encoding().ProtocolSuffix(), buf.Bytes(), pubOpts...); err != nil {
err := errors.Wrap(err, "could not publish message")
tracing.AnnotateError(span, err)
return err

View File

@@ -32,10 +32,10 @@ type P2P interface {
// Broadcaster broadcasts messages to peers over the p2p pubsub protocol.
type Broadcaster interface {
Broadcast(context.Context, proto.Message) error
Broadcast(context.Context, proto.Message, ...pubsub.PubOpt) error
BroadcastAttestation(ctx context.Context, subnet uint64, att ethpb.Att) error
BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage) error
BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error
BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar, pubOpts ...pubsub.PubOpt) error
}
// SetStreamHandler configures p2p to handle streams of a certain topic ID.

View File

@@ -58,6 +58,7 @@ go_library(
"//beacon-chain/operations/synccommittee:go_default_library",
"//beacon-chain/operations/voluntaryexits:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/rpc/core:go_default_library",
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state:go_default_library",
@@ -94,6 +95,8 @@ go_library(
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_golang_protobuf//ptypes/empty",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",

View File

@@ -1,6 +1,7 @@
package validator
import (
"bytes"
"context"
"fmt"
"strings"
@@ -10,7 +11,10 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
emptypb "github.com/golang/protobuf/ptypes/empty"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/pkg/errors"
ssz "github.com/prysmaticlabs/fastssz"
builderapi "github.com/prysmaticlabs/prysm/v5/api/client/builder"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/builder"
@@ -21,12 +25,15 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/kv"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
"github.com/prysmaticlabs/prysm/v5/network/forks"
enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
@@ -297,21 +304,24 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not hash tree root: %v", err)
}
bOpt, err := vs.createBatchOption(block, sidecars)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not create option: %v", err)
}
var wg sync.WaitGroup
errChan := make(chan error, 1)
wg.Add(1)
go func() {
defer wg.Done()
if err := vs.broadcastReceiveBlock(ctx, block, root); err != nil {
if err := vs.broadcastReceiveBlock(ctx, block, root, bOpt); err != nil {
errChan <- errors.Wrap(err, "broadcast/receive block failed")
return
}
errChan <- nil
}()
if err := vs.broadcastAndReceiveBlobs(ctx, sidecars, root); err != nil {
if err := vs.broadcastAndReceiveBlobs(ctx, sidecars, root, bOpt); err != nil {
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive blobs: %v", err)
}
@@ -323,6 +333,39 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
return &ethpb.ProposeResponse{BlockRoot: root[:]}, nil
}
func (vs *Server) createBatchOption(block interfaces.SignedBeaconBlock, blobs []*ethpb.BlobSidecar) (pubsub.PubOpt, error) {
sszEnc := &encoder.SszNetworkEncoder{}
pblk, err := block.Proto()
if err != nil {
return nil, err
}
buf := bytes.NewBuffer([]byte{})
if _, err := sszEnc.EncodeGossip(buf, pblk.(ssz.Marshaler)); err != nil {
return nil, err
}
genValRoot := vs.GenesisFetcher.GenesisValidatorsRoot()
currDigest, err := forks.CreateForkDigest(vs.TimeFetcher.GenesisTime(), genValRoot[:])
if err != nil {
return nil, err
}
topicStr := fmt.Sprintf(p2p.BlockSubnetTopicFormat, currDigest) + sszEnc.ProtocolSuffix()
blockID := p2p.MsgID(genValRoot[:], &pubsub_pb.Message{Data: buf.Bytes(), Topic: &topicStr})
bm := pubsub.NewBatchMessage()
bm.AddMessage(blockID)
for i, b := range blobs {
blobTopicStr := fmt.Sprintf(p2p.BlobSubnetTopicFormat, currDigest, i) + sszEnc.ProtocolSuffix()
buf = bytes.NewBuffer([]byte{})
if _, err := sszEnc.EncodeGossip(buf, b); err != nil {
return nil, err
}
blobID := p2p.MsgID(genValRoot[:], &pubsub_pb.Message{Data: buf.Bytes(), Topic: &blobTopicStr})
bm.AddMessage(blobID)
}
return pubsub.WithBatchPublishing(bm), nil
}
// handleBlindedBlock processes blinded beacon blocks.
func (vs *Server) handleBlindedBlock(ctx context.Context, block interfaces.SignedBeaconBlock) (interfaces.SignedBeaconBlock, []*ethpb.BlobSidecar, error) {
if block.Version() < version.Bellatrix {
@@ -363,12 +406,12 @@ func (vs *Server) blobSidecarsFromUnblindedBlock(block interfaces.SignedBeaconBl
}
// broadcastReceiveBlock broadcasts a block and handles its reception.
func (vs *Server) broadcastReceiveBlock(ctx context.Context, block interfaces.SignedBeaconBlock, root [32]byte) error {
func (vs *Server) broadcastReceiveBlock(ctx context.Context, block interfaces.SignedBeaconBlock, root [32]byte, pubOpts ...pubsub.PubOpt) error {
protoBlock, err := block.Proto()
if err != nil {
return errors.Wrap(err, "protobuf conversion failed")
}
if err := vs.P2P.Broadcast(ctx, protoBlock); err != nil {
if err := vs.P2P.Broadcast(ctx, protoBlock, pubOpts...); err != nil {
return errors.Wrap(err, "broadcast failed")
}
vs.BlockNotifier.BlockFeed().Send(&feed.Event{
@@ -379,7 +422,7 @@ func (vs *Server) broadcastReceiveBlock(ctx context.Context, block interfaces.Si
}
// broadcastAndReceiveBlobs handles the broadcasting and reception of blob sidecars.
func (vs *Server) broadcastAndReceiveBlobs(ctx context.Context, sidecars []*ethpb.BlobSidecar, root [32]byte) error {
func (vs *Server) broadcastAndReceiveBlobs(ctx context.Context, sidecars []*ethpb.BlobSidecar, root [32]byte, pubOpts ...pubsub.PubOpt) error {
eg, eCtx := errgroup.WithContext(ctx)
for i, sc := range sidecars {
// Copy the iteration instance to a local variable to give each go-routine its own copy to play with.
@@ -387,7 +430,7 @@ func (vs *Server) broadcastAndReceiveBlobs(ctx context.Context, sidecars []*ethp
subIdx := i
sCar := sc
eg.Go(func() error {
if err := vs.P2P.BroadcastBlob(eCtx, uint64(subIdx), sCar); err != nil {
if err := vs.P2P.BroadcastBlob(eCtx, uint64(subIdx), sCar, pubOpts...); err != nil {
return errors.Wrap(err, "broadcast blob failed")
}
readOnlySc, err := blocks.NewROBlobWithRoot(sCar, root)

View File

@@ -1987,8 +1987,9 @@ def prysm_deps():
name = "com_github_libp2p_go_libp2p_pubsub",
build_file_proto_mode = "disable_global",
importpath = "github.com/libp2p/go-libp2p-pubsub",
sum = "h1:RmFQ2XAy3zQtbt2iNPy7Tt0/3fwTnHpCQSSnmGnt1Ps=",
version = "v0.13.0",
replace = "github.com/nisdas/go-libp2p-pubsub",
sum = "h1:s0BSHd/oXPBk16u84LYrXmmRXzoK2GqXqr4XD7dzHSU=",
version = "v0.3.3-0.20250312092335-b8aab45386f7",
)
go_repository(
name = "com_github_libp2p_go_libp2p_testing",

2
go.mod
View File

@@ -290,3 +290,5 @@ require (
)
replace github.com/json-iterator/go => github.com/prestonvanloon/go v1.1.7-0.20190722034630-4f2e55fcf87b
replace github.com/libp2p/go-libp2p-pubsub => github.com/nisdas/go-libp2p-pubsub v0.3.3-0.20250312092335-b8aab45386f7