mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 21:38:05 -05:00
Compare commits
1 Commits
hashtree-i
...
runBatchPu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7cbb59cd3e |
@@ -7,6 +7,7 @@ import (
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
"github.com/pkg/errors"
|
||||
ssz "github.com/prysmaticlabs/fastssz"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/altair"
|
||||
@@ -27,7 +28,7 @@ var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub top
|
||||
|
||||
// Broadcast a message to the p2p network, the message is assumed to be
|
||||
// broadcasted to the current fork.
|
||||
func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
|
||||
func (s *Service) Broadcast(ctx context.Context, msg proto.Message, pubOpts ...pubsub.PubOpt) error {
|
||||
ctx, span := trace.StartSpan(ctx, "p2p.Broadcast")
|
||||
defer span.End()
|
||||
|
||||
@@ -51,7 +52,7 @@ func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
|
||||
if !ok {
|
||||
return errors.Errorf("message of %T does not support marshaller interface", msg)
|
||||
}
|
||||
return s.broadcastObject(ctx, castMsg, fmt.Sprintf(topic, forkDigest))
|
||||
return s.broadcastObject(ctx, castMsg, fmt.Sprintf(topic, forkDigest), pubOpts...)
|
||||
}
|
||||
|
||||
// BroadcastAttestation broadcasts an attestation to the p2p network, the message is assumed to be
|
||||
@@ -209,7 +210,7 @@ func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMs
|
||||
|
||||
// BroadcastBlob broadcasts a blob to the p2p network, the message is assumed to be
|
||||
// broadcasted to the current fork and to the input subnet.
|
||||
func (s *Service) BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error {
|
||||
func (s *Service) BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar, pubOpts ...pubsub.PubOpt) error {
|
||||
ctx, span := trace.StartSpan(ctx, "p2p.BroadcastBlob")
|
||||
defer span.End()
|
||||
if blob == nil {
|
||||
@@ -223,12 +224,12 @@ func (s *Service) BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.
|
||||
}
|
||||
|
||||
// Non-blocking broadcast, with attempts to discover a subnet peer if none available.
|
||||
go s.internalBroadcastBlob(ctx, subnet, blob, forkDigest)
|
||||
go s.internalBroadcastBlob(ctx, subnet, blob, forkDigest, pubOpts...)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.BlobSidecar, forkDigest [4]byte) {
|
||||
func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.BlobSidecar, forkDigest [4]byte, pubOpts ...pubsub.PubOpt) {
|
||||
_, span := trace.StartSpan(ctx, "p2p.internalBroadcastBlob")
|
||||
defer span.End()
|
||||
ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline.
|
||||
@@ -262,14 +263,14 @@ func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blob
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.broadcastObject(ctx, blobSidecar, blobSubnetToTopic(subnet, forkDigest)); err != nil {
|
||||
if err := s.broadcastObject(ctx, blobSidecar, blobSubnetToTopic(subnet, forkDigest), pubOpts...); err != nil {
|
||||
log.WithError(err).Error("Failed to broadcast blob sidecar")
|
||||
tracing.AnnotateError(span, err)
|
||||
}
|
||||
}
|
||||
|
||||
// method to broadcast messages to other peers in our gossip mesh.
|
||||
func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic string) error {
|
||||
func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic string, pubOpts ...pubsub.PubOpt) error {
|
||||
ctx, span := trace.StartSpan(ctx, "p2p.broadcastObject")
|
||||
defer span.End()
|
||||
|
||||
@@ -289,7 +290,7 @@ func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic
|
||||
iid := int64(id)
|
||||
span = trace.AddMessageSendEvent(span, iid, messageLen /*uncompressed*/, messageLen /*compressed*/)
|
||||
}
|
||||
if err := s.PublishToTopic(ctx, topic+s.Encoding().ProtocolSuffix(), buf.Bytes()); err != nil {
|
||||
if err := s.PublishToTopic(ctx, topic+s.Encoding().ProtocolSuffix(), buf.Bytes(), pubOpts...); err != nil {
|
||||
err := errors.Wrap(err, "could not publish message")
|
||||
tracing.AnnotateError(span, err)
|
||||
return err
|
||||
|
||||
@@ -32,10 +32,10 @@ type P2P interface {
|
||||
|
||||
// Broadcaster broadcasts messages to peers over the p2p pubsub protocol.
|
||||
type Broadcaster interface {
|
||||
Broadcast(context.Context, proto.Message) error
|
||||
Broadcast(context.Context, proto.Message, ...pubsub.PubOpt) error
|
||||
BroadcastAttestation(ctx context.Context, subnet uint64, att ethpb.Att) error
|
||||
BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage) error
|
||||
BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error
|
||||
BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar, pubOpts ...pubsub.PubOpt) error
|
||||
}
|
||||
|
||||
// SetStreamHandler configures p2p to handle streams of a certain topic ID.
|
||||
|
||||
@@ -58,6 +58,7 @@ go_library(
|
||||
"//beacon-chain/operations/synccommittee:go_default_library",
|
||||
"//beacon-chain/operations/voluntaryexits:go_default_library",
|
||||
"//beacon-chain/p2p:go_default_library",
|
||||
"//beacon-chain/p2p/encoder:go_default_library",
|
||||
"//beacon-chain/rpc/core:go_default_library",
|
||||
"//beacon-chain/startup:go_default_library",
|
||||
"//beacon-chain/state:go_default_library",
|
||||
@@ -94,6 +95,8 @@ go_library(
|
||||
"@com_github_ethereum_go_ethereum//common:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
|
||||
"@com_github_golang_protobuf//ptypes/empty",
|
||||
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
|
||||
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_prometheus_client_golang//prometheus:go_default_library",
|
||||
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package validator
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
@@ -10,7 +11,10 @@ import (
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
emptypb "github.com/golang/protobuf/ptypes/empty"
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
||||
"github.com/pkg/errors"
|
||||
ssz "github.com/prysmaticlabs/fastssz"
|
||||
builderapi "github.com/prysmaticlabs/prysm/v5/api/client/builder"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/builder"
|
||||
@@ -21,12 +25,15 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/kv"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
|
||||
"github.com/prysmaticlabs/prysm/v5/network/forks"
|
||||
enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/runtime/version"
|
||||
@@ -297,21 +304,24 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "Could not hash tree root: %v", err)
|
||||
}
|
||||
|
||||
bOpt, err := vs.createBatchOption(block, sidecars)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "Could not create option: %v", err)
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
errChan := make(chan error, 1)
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if err := vs.broadcastReceiveBlock(ctx, block, root); err != nil {
|
||||
if err := vs.broadcastReceiveBlock(ctx, block, root, bOpt); err != nil {
|
||||
errChan <- errors.Wrap(err, "broadcast/receive block failed")
|
||||
return
|
||||
}
|
||||
errChan <- nil
|
||||
}()
|
||||
|
||||
if err := vs.broadcastAndReceiveBlobs(ctx, sidecars, root); err != nil {
|
||||
if err := vs.broadcastAndReceiveBlobs(ctx, sidecars, root, bOpt); err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive blobs: %v", err)
|
||||
}
|
||||
|
||||
@@ -323,6 +333,39 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
|
||||
return ðpb.ProposeResponse{BlockRoot: root[:]}, nil
|
||||
}
|
||||
|
||||
func (vs *Server) createBatchOption(block interfaces.SignedBeaconBlock, blobs []*ethpb.BlobSidecar) (pubsub.PubOpt, error) {
|
||||
sszEnc := &encoder.SszNetworkEncoder{}
|
||||
pblk, err := block.Proto()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
buf := bytes.NewBuffer([]byte{})
|
||||
if _, err := sszEnc.EncodeGossip(buf, pblk.(ssz.Marshaler)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
genValRoot := vs.GenesisFetcher.GenesisValidatorsRoot()
|
||||
currDigest, err := forks.CreateForkDigest(vs.TimeFetcher.GenesisTime(), genValRoot[:])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
topicStr := fmt.Sprintf(p2p.BlockSubnetTopicFormat, currDigest) + sszEnc.ProtocolSuffix()
|
||||
blockID := p2p.MsgID(genValRoot[:], &pubsub_pb.Message{Data: buf.Bytes(), Topic: &topicStr})
|
||||
bm := pubsub.NewBatchMessage()
|
||||
bm.AddMessage(blockID)
|
||||
for i, b := range blobs {
|
||||
blobTopicStr := fmt.Sprintf(p2p.BlobSubnetTopicFormat, currDigest, i) + sszEnc.ProtocolSuffix()
|
||||
buf = bytes.NewBuffer([]byte{})
|
||||
if _, err := sszEnc.EncodeGossip(buf, b); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
blobID := p2p.MsgID(genValRoot[:], &pubsub_pb.Message{Data: buf.Bytes(), Topic: &blobTopicStr})
|
||||
bm.AddMessage(blobID)
|
||||
}
|
||||
return pubsub.WithBatchPublishing(bm), nil
|
||||
|
||||
}
|
||||
|
||||
// handleBlindedBlock processes blinded beacon blocks.
|
||||
func (vs *Server) handleBlindedBlock(ctx context.Context, block interfaces.SignedBeaconBlock) (interfaces.SignedBeaconBlock, []*ethpb.BlobSidecar, error) {
|
||||
if block.Version() < version.Bellatrix {
|
||||
@@ -363,12 +406,12 @@ func (vs *Server) blobSidecarsFromUnblindedBlock(block interfaces.SignedBeaconBl
|
||||
}
|
||||
|
||||
// broadcastReceiveBlock broadcasts a block and handles its reception.
|
||||
func (vs *Server) broadcastReceiveBlock(ctx context.Context, block interfaces.SignedBeaconBlock, root [32]byte) error {
|
||||
func (vs *Server) broadcastReceiveBlock(ctx context.Context, block interfaces.SignedBeaconBlock, root [32]byte, pubOpts ...pubsub.PubOpt) error {
|
||||
protoBlock, err := block.Proto()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "protobuf conversion failed")
|
||||
}
|
||||
if err := vs.P2P.Broadcast(ctx, protoBlock); err != nil {
|
||||
if err := vs.P2P.Broadcast(ctx, protoBlock, pubOpts...); err != nil {
|
||||
return errors.Wrap(err, "broadcast failed")
|
||||
}
|
||||
vs.BlockNotifier.BlockFeed().Send(&feed.Event{
|
||||
@@ -379,7 +422,7 @@ func (vs *Server) broadcastReceiveBlock(ctx context.Context, block interfaces.Si
|
||||
}
|
||||
|
||||
// broadcastAndReceiveBlobs handles the broadcasting and reception of blob sidecars.
|
||||
func (vs *Server) broadcastAndReceiveBlobs(ctx context.Context, sidecars []*ethpb.BlobSidecar, root [32]byte) error {
|
||||
func (vs *Server) broadcastAndReceiveBlobs(ctx context.Context, sidecars []*ethpb.BlobSidecar, root [32]byte, pubOpts ...pubsub.PubOpt) error {
|
||||
eg, eCtx := errgroup.WithContext(ctx)
|
||||
for i, sc := range sidecars {
|
||||
// Copy the iteration instance to a local variable to give each go-routine its own copy to play with.
|
||||
@@ -387,7 +430,7 @@ func (vs *Server) broadcastAndReceiveBlobs(ctx context.Context, sidecars []*ethp
|
||||
subIdx := i
|
||||
sCar := sc
|
||||
eg.Go(func() error {
|
||||
if err := vs.P2P.BroadcastBlob(eCtx, uint64(subIdx), sCar); err != nil {
|
||||
if err := vs.P2P.BroadcastBlob(eCtx, uint64(subIdx), sCar, pubOpts...); err != nil {
|
||||
return errors.Wrap(err, "broadcast blob failed")
|
||||
}
|
||||
readOnlySc, err := blocks.NewROBlobWithRoot(sCar, root)
|
||||
|
||||
5
deps.bzl
5
deps.bzl
@@ -1987,8 +1987,9 @@ def prysm_deps():
|
||||
name = "com_github_libp2p_go_libp2p_pubsub",
|
||||
build_file_proto_mode = "disable_global",
|
||||
importpath = "github.com/libp2p/go-libp2p-pubsub",
|
||||
sum = "h1:RmFQ2XAy3zQtbt2iNPy7Tt0/3fwTnHpCQSSnmGnt1Ps=",
|
||||
version = "v0.13.0",
|
||||
replace = "github.com/nisdas/go-libp2p-pubsub",
|
||||
sum = "h1:s0BSHd/oXPBk16u84LYrXmmRXzoK2GqXqr4XD7dzHSU=",
|
||||
version = "v0.3.3-0.20250312092335-b8aab45386f7",
|
||||
)
|
||||
go_repository(
|
||||
name = "com_github_libp2p_go_libp2p_testing",
|
||||
|
||||
Reference in New Issue
Block a user