Compare commits

...

3 Commits

Author SHA1 Message Date
nisdas
7cbb59cd3e Enable Batch Publishing 2025-03-14 21:34:15 +08:00
james-prysm
7cef3b0491 adding omitempty to request object (#15031) 2025-03-11 13:43:00 +00:00
Radosław Kapka
15462844f9 Remove error from signatures of UnaggregatedAttestations and pruneAttsFromPool (#15028) 2025-03-10 22:28:51 +00:00
25 changed files with 104 additions and 91 deletions

View File

@@ -78,8 +78,8 @@ type GetBlockHeaderResponse struct {
}
type GetValidatorsRequest struct {
Ids []string `json:"ids"`
Statuses []string `json:"statuses"`
Ids []string `json:"ids,omitempty"`
Statuses []string `json:"statuses,omitempty"`
}
type GetValidatorsResponse struct {

View File

@@ -105,9 +105,7 @@ func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, args *fcuCo
go firePayloadAttributesEvent(ctx, s.cfg.StateNotifier.StateFeed(), s.CurrentSlot()+1)
// Only need to prune attestations from pool if the head has changed.
if err := s.pruneAttsFromPool(s.ctx, args.headState, args.headBlock); err != nil {
log.WithError(err).Error("could not prune attestations from pool")
}
s.pruneAttsFromPool(s.ctx, args.headState, args.headBlock)
return nil
}

View File

@@ -423,13 +423,12 @@ func (s *Service) savePostStateInfo(ctx context.Context, r [32]byte, b interface
// pruneAttsFromPool removes these attestations from the attestation pool
// which are covered by attestations from the received block.
func (s *Service) pruneAttsFromPool(ctx context.Context, headState state.BeaconState, headBlock interfaces.ReadOnlySignedBeaconBlock) error {
func (s *Service) pruneAttsFromPool(ctx context.Context, headState state.BeaconState, headBlock interfaces.ReadOnlySignedBeaconBlock) {
for _, att := range headBlock.Block().Body().Attestations() {
if err := s.pruneCoveredAttsFromPool(ctx, headState, att); err != nil {
log.WithError(err).Warn("Could not prune attestations covered by a received block's attestation")
}
}
return nil
}
func (s *Service) pruneCoveredAttsFromPool(ctx context.Context, headState state.BeaconState, att ethpb.Att) error {

View File

@@ -126,11 +126,10 @@ func Test_pruneAttsFromPool_Electra(t *testing.T) {
// into the correct number of aggregates.
require.Equal(t, 4, len(committees))
require.NoError(t, s.pruneAttsFromPool(ctx, st, rob))
s.pruneAttsFromPool(ctx, st, rob)
require.LogsDoNotContain(t, logHook, "Could not prune attestations")
attsInPool, err := s.cfg.AttPool.UnaggregatedAttestations()
require.NoError(t, err)
attsInPool := s.cfg.AttPool.UnaggregatedAttestations()
assert.Equal(t, 0, len(attsInPool))
attsInPool = s.cfg.AttPool.AggregatedAttestations()
require.Equal(t, 1, len(attsInPool))
@@ -934,7 +933,7 @@ func TestRemoveBlockAttestationsInPool(t *testing.T) {
require.NoError(t, service.cfg.AttPool.SaveAggregatedAttestations(atts))
wsb, err := consensusblocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
require.NoError(t, service.pruneAttsFromPool(context.Background(), nil /* state not needed pre-Electra */, wsb))
service.pruneAttsFromPool(context.Background(), nil /* state not needed pre-Electra */, wsb)
require.LogsDoNotContain(t, logHook, "Could not prune attestations")
require.Equal(t, 0, service.cfg.AttPool.AggregatedAttestationCount())
}

View File

@@ -23,10 +23,7 @@ import (
func (c *AttCaches) AggregateUnaggregatedAttestations(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "operations.attestations.kv.AggregateUnaggregatedAttestations")
defer span.End()
unaggregatedAtts, err := c.UnaggregatedAttestations()
if err != nil {
return err
}
unaggregatedAtts := c.UnaggregatedAttestations()
return c.aggregateUnaggregatedAtts(ctx, unaggregatedAtts)
}

View File

@@ -53,7 +53,7 @@ func (c *AttCaches) SaveUnaggregatedAttestations(atts []ethpb.Att) error {
}
// UnaggregatedAttestations returns all the unaggregated attestations in cache.
func (c *AttCaches) UnaggregatedAttestations() ([]ethpb.Att, error) {
func (c *AttCaches) UnaggregatedAttestations() []ethpb.Att {
c.unAggregateAttLock.RLock()
defer c.unAggregateAttLock.RUnlock()
unAggregatedAtts := c.unAggregatedAtt
@@ -68,7 +68,7 @@ func (c *AttCaches) UnaggregatedAttestations() ([]ethpb.Att, error) {
atts = append(atts, att.Clone())
}
}
return atts, nil
return atts
}
// UnaggregatedAttestationsBySlotIndex returns the unaggregated attestations in cache,

View File

@@ -29,8 +29,7 @@ func TestKV_Unaggregated_UnaggregatedAttestations(t *testing.T) {
// cache a bitlist whose length is different from the attestation bitlist's length
cache.seenAtt.Set(id.String(), []bitfield.Bitlist{{0b1001}}, c.DefaultExpiration)
atts, err := cache.UnaggregatedAttestations()
require.NoError(t, err)
atts := cache.UnaggregatedAttestations()
assert.Equal(t, 0, len(atts))
})
}
@@ -169,8 +168,7 @@ func TestKV_Unaggregated_DeleteUnaggregatedAttestation(t *testing.T) {
for _, att := range atts {
assert.NoError(t, cache.DeleteUnaggregatedAttestation(att))
}
returned, err := cache.UnaggregatedAttestations()
require.NoError(t, err)
returned := cache.UnaggregatedAttestations()
assert.DeepEqual(t, []ethpb.Att{}, returned)
})
@@ -234,11 +232,10 @@ func TestKV_Unaggregated_DeleteSeenUnaggregatedAttestations(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 1, count)
assert.Equal(t, 2, cache.UnaggregatedAttestationCount())
returned, err := cache.UnaggregatedAttestations()
returned := cache.UnaggregatedAttestations()
sort.Slice(returned, func(i, j int) bool {
return bytes.Compare(returned[i].GetAggregationBits(), returned[j].GetAggregationBits()) < 0
})
require.NoError(t, err)
assert.DeepEqual(t, []ethpb.Att{atts[0], atts[2]}, returned)
})
@@ -261,8 +258,7 @@ func TestKV_Unaggregated_DeleteSeenUnaggregatedAttestations(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, 3, count)
assert.Equal(t, 0, cache.UnaggregatedAttestationCount())
returned, err := cache.UnaggregatedAttestations()
require.NoError(t, err)
returned := cache.UnaggregatedAttestations()
assert.DeepEqual(t, []ethpb.Att{}, returned)
})

View File

@@ -79,8 +79,8 @@ func (m *PoolMock) SaveUnaggregatedAttestations(atts []ethpb.Att) error {
}
// UnaggregatedAttestations --
func (m *PoolMock) UnaggregatedAttestations() ([]ethpb.Att, error) {
return m.UnaggregatedAtts, nil
func (m *PoolMock) UnaggregatedAttestations() []ethpb.Att {
return m.UnaggregatedAtts
}
// UnaggregatedAttestationsBySlotIndex --

View File

@@ -26,7 +26,7 @@ type Pool interface {
// For unaggregated attestations.
SaveUnaggregatedAttestation(att ethpb.Att) error
SaveUnaggregatedAttestations(atts []ethpb.Att) error
UnaggregatedAttestations() ([]ethpb.Att, error)
UnaggregatedAttestations() []ethpb.Att
UnaggregatedAttestationsBySlotIndex(ctx context.Context, slot primitives.Slot, committeeIndex primitives.CommitteeIndex) []*ethpb.Attestation
UnaggregatedAttestationsBySlotIndexElectra(ctx context.Context, slot primitives.Slot, committeeIndex primitives.CommitteeIndex) []*ethpb.AttestationElectra
DeleteUnaggregatedAttestation(att ethpb.Att) error

View File

@@ -61,12 +61,8 @@ func (s *Service) pruneExpiredAtts() {
if _, err := s.cfg.Pool.DeleteSeenUnaggregatedAttestations(); err != nil {
log.WithError(err).Error("Cannot delete seen attestations")
}
unAggregatedAtts, err := s.cfg.Pool.UnaggregatedAttestations()
if err != nil {
log.WithError(err).Error("Could not get unaggregated attestations")
return
}
for _, att := range unAggregatedAtts {
for _, att := range s.cfg.Pool.UnaggregatedAttestations() {
if s.expired(att.GetData().Slot) {
if err := s.cfg.Pool.DeleteUnaggregatedAttestation(att); err != nil {
log.WithError(err).Error("Could not delete expired unaggregated attestation")

View File

@@ -54,9 +54,7 @@ func TestPruneExpired_Ticker(t *testing.T) {
done := make(chan struct{}, 1)
async.RunEvery(ctx, 500*time.Millisecond, func() {
atts, err := s.cfg.Pool.UnaggregatedAttestations()
require.NoError(t, err)
for _, attestation := range atts {
for _, attestation := range s.cfg.Pool.UnaggregatedAttestations() {
if attestation.GetData().Slot == 0 {
return
}

View File

@@ -7,6 +7,7 @@ import (
"reflect"
"time"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/pkg/errors"
ssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/altair"
@@ -27,7 +28,7 @@ var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub top
// Broadcast a message to the p2p network, the message is assumed to be
// broadcasted to the current fork.
func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
func (s *Service) Broadcast(ctx context.Context, msg proto.Message, pubOpts ...pubsub.PubOpt) error {
ctx, span := trace.StartSpan(ctx, "p2p.Broadcast")
defer span.End()
@@ -51,7 +52,7 @@ func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
if !ok {
return errors.Errorf("message of %T does not support marshaller interface", msg)
}
return s.broadcastObject(ctx, castMsg, fmt.Sprintf(topic, forkDigest))
return s.broadcastObject(ctx, castMsg, fmt.Sprintf(topic, forkDigest), pubOpts...)
}
// BroadcastAttestation broadcasts an attestation to the p2p network, the message is assumed to be
@@ -209,7 +210,7 @@ func (s *Service) broadcastSyncCommittee(ctx context.Context, subnet uint64, sMs
// BroadcastBlob broadcasts a blob to the p2p network, the message is assumed to be
// broadcasted to the current fork and to the input subnet.
func (s *Service) BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error {
func (s *Service) BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar, pubOpts ...pubsub.PubOpt) error {
ctx, span := trace.StartSpan(ctx, "p2p.BroadcastBlob")
defer span.End()
if blob == nil {
@@ -223,12 +224,12 @@ func (s *Service) BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.
}
// Non-blocking broadcast, with attempts to discover a subnet peer if none available.
go s.internalBroadcastBlob(ctx, subnet, blob, forkDigest)
go s.internalBroadcastBlob(ctx, subnet, blob, forkDigest, pubOpts...)
return nil
}
func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.BlobSidecar, forkDigest [4]byte) {
func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blobSidecar *ethpb.BlobSidecar, forkDigest [4]byte, pubOpts ...pubsub.PubOpt) {
_, span := trace.StartSpan(ctx, "p2p.internalBroadcastBlob")
defer span.End()
ctx = trace.NewContext(context.Background(), span) // clear parent context / deadline.
@@ -262,14 +263,14 @@ func (s *Service) internalBroadcastBlob(ctx context.Context, subnet uint64, blob
}
}
if err := s.broadcastObject(ctx, blobSidecar, blobSubnetToTopic(subnet, forkDigest)); err != nil {
if err := s.broadcastObject(ctx, blobSidecar, blobSubnetToTopic(subnet, forkDigest), pubOpts...); err != nil {
log.WithError(err).Error("Failed to broadcast blob sidecar")
tracing.AnnotateError(span, err)
}
}
// method to broadcast messages to other peers in our gossip mesh.
func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic string) error {
func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic string, pubOpts ...pubsub.PubOpt) error {
ctx, span := trace.StartSpan(ctx, "p2p.broadcastObject")
defer span.End()
@@ -289,7 +290,7 @@ func (s *Service) broadcastObject(ctx context.Context, obj ssz.Marshaler, topic
iid := int64(id)
span = trace.AddMessageSendEvent(span, iid, messageLen /*uncompressed*/, messageLen /*compressed*/)
}
if err := s.PublishToTopic(ctx, topic+s.Encoding().ProtocolSuffix(), buf.Bytes()); err != nil {
if err := s.PublishToTopic(ctx, topic+s.Encoding().ProtocolSuffix(), buf.Bytes(), pubOpts...); err != nil {
err := errors.Wrap(err, "could not publish message")
tracing.AnnotateError(span, err)
return err

View File

@@ -32,10 +32,10 @@ type P2P interface {
// Broadcaster broadcasts messages to peers over the p2p pubsub protocol.
type Broadcaster interface {
Broadcast(context.Context, proto.Message) error
Broadcast(context.Context, proto.Message, ...pubsub.PubOpt) error
BroadcastAttestation(ctx context.Context, subnet uint64, att ethpb.Att) error
BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage) error
BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error
BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar, pubOpts ...pubsub.PubOpt) error
}
// SetStreamHandler configures p2p to handle streams of a certain topic ID.

View File

@@ -55,11 +55,7 @@ func (s *Server) ListAttestations(w http.ResponseWriter, r *http.Request) {
attestations = s.AttestationCache.GetAll()
} else {
attestations = s.AttestationsPool.AggregatedAttestations()
unaggAtts, err := s.AttestationsPool.UnaggregatedAttestations()
if err != nil {
httputil.HandleError(w, "Could not get unaggregated attestations: "+err.Error(), http.StatusInternalServerError)
return
}
unaggAtts := s.AttestationsPool.UnaggregatedAttestations()
attestations = append(attestations, unaggAtts...)
}
@@ -114,11 +110,7 @@ func (s *Server) ListAttestationsV2(w http.ResponseWriter, r *http.Request) {
attestations = s.AttestationCache.GetAll()
} else {
attestations = s.AttestationsPool.AggregatedAttestations()
unaggAtts, err := s.AttestationsPool.UnaggregatedAttestations()
if err != nil {
httputil.HandleError(w, "Could not get unaggregated attestations: "+err.Error(), http.StatusInternalServerError)
return
}
unaggAtts := s.AttestationsPool.UnaggregatedAttestations()
attestations = append(attestations, unaggAtts...)
}

View File

@@ -162,11 +162,7 @@ func (s *Server) aggregatedAttestation(w http.ResponseWriter, slot primitives.Sl
return nil
}
atts, err := s.AttestationsPool.UnaggregatedAttestations()
if err != nil {
httputil.HandleError(w, "Could not get unaggregated attestations: "+err.Error(), http.StatusInternalServerError)
return nil
}
atts := s.AttestationsPool.UnaggregatedAttestations()
match, err = matchingAtts(atts, slot, attDataRoot, index)
if err != nil {
httputil.HandleError(w, "Could not get matching attestations: "+err.Error(), http.StatusInternalServerError)

View File

@@ -118,8 +118,7 @@ func TestGetAggregateAttestation(t *testing.T) {
pool := attestations.NewPool()
require.NoError(t, pool.SaveUnaggregatedAttestations([]ethpbalpha.Att{unaggSlot3_Root1_1, unaggSlot3_Root1_2, unaggSlot3_Root2, unaggSlot4}), "Failed to save unaggregated attestations")
unagg, err := pool.UnaggregatedAttestations()
require.NoError(t, err)
unagg := pool.UnaggregatedAttestations()
require.Equal(t, 4, len(unagg), "Expected 4 unaggregated attestations")
require.NoError(t, pool.SaveAggregatedAttestations([]ethpbalpha.Att{aggSlot1_Root1_1, aggSlot1_Root1_2, aggSlot1_Root2, aggSlot2}), "Failed to save aggregated attestations")
agg := pool.AggregatedAttestations()
@@ -268,8 +267,7 @@ func TestGetAggregateAttestation(t *testing.T) {
pool := attestations.NewPool()
require.NoError(t, pool.SaveUnaggregatedAttestations([]ethpbalpha.Att{unaggSlot3_Root1_1, unaggSlot3_Root1_2, unaggSlot3_Root2, unaggSlot4}), "Failed to save unaggregated attestations")
unagg, err := pool.UnaggregatedAttestations()
require.NoError(t, err)
unagg := pool.UnaggregatedAttestations()
require.Equal(t, 4, len(unagg), "Expected 4 unaggregated attestations")
require.NoError(t, pool.SaveAggregatedAttestations([]ethpbalpha.Att{aggSlot1_Root1_1, aggSlot1_Root1_2, aggSlot1_Root2, aggSlot2, postElectraAtt}), "Failed to save aggregated attestations")
agg := pool.AggregatedAttestations()
@@ -373,8 +371,7 @@ func TestGetAggregateAttestation(t *testing.T) {
pool := attestations.NewPool()
require.NoError(t, pool.SaveUnaggregatedAttestations([]ethpbalpha.Att{unaggSlot3_Root1_1, unaggSlot3_Root1_2, unaggSlot3_Root2, unaggSlot4}), "Failed to save unaggregated attestations")
unagg, err := pool.UnaggregatedAttestations()
require.NoError(t, err)
unagg := pool.UnaggregatedAttestations()
require.Equal(t, 4, len(unagg), "Expected 4 unaggregated attestations")
require.NoError(t, pool.SaveAggregatedAttestations([]ethpbalpha.Att{aggSlot1_Root1_1, aggSlot1_Root1_2, aggSlot1_Root2, aggSlot2, preElectraAtt}), "Failed to save aggregated attestations")
agg := pool.AggregatedAttestations()

View File

@@ -58,6 +58,7 @@ go_library(
"//beacon-chain/operations/synccommittee:go_default_library",
"//beacon-chain/operations/voluntaryexits:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/rpc/core:go_default_library",
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state:go_default_library",
@@ -94,6 +95,8 @@ go_library(
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_golang_protobuf//ptypes/empty",
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",

View File

@@ -1,6 +1,7 @@
package validator
import (
"bytes"
"context"
"fmt"
"strings"
@@ -10,7 +11,10 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
emptypb "github.com/golang/protobuf/ptypes/empty"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/pkg/errors"
ssz "github.com/prysmaticlabs/fastssz"
builderapi "github.com/prysmaticlabs/prysm/v5/api/client/builder"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/builder"
@@ -21,12 +25,15 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/kv"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
"github.com/prysmaticlabs/prysm/v5/network/forks"
enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
@@ -297,21 +304,24 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not hash tree root: %v", err)
}
bOpt, err := vs.createBatchOption(block, sidecars)
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not create option: %v", err)
}
var wg sync.WaitGroup
errChan := make(chan error, 1)
wg.Add(1)
go func() {
defer wg.Done()
if err := vs.broadcastReceiveBlock(ctx, block, root); err != nil {
if err := vs.broadcastReceiveBlock(ctx, block, root, bOpt); err != nil {
errChan <- errors.Wrap(err, "broadcast/receive block failed")
return
}
errChan <- nil
}()
if err := vs.broadcastAndReceiveBlobs(ctx, sidecars, root); err != nil {
if err := vs.broadcastAndReceiveBlobs(ctx, sidecars, root, bOpt); err != nil {
return nil, status.Errorf(codes.Internal, "Could not broadcast/receive blobs: %v", err)
}
@@ -323,6 +333,39 @@ func (vs *Server) ProposeBeaconBlock(ctx context.Context, req *ethpb.GenericSign
return &ethpb.ProposeResponse{BlockRoot: root[:]}, nil
}
func (vs *Server) createBatchOption(block interfaces.SignedBeaconBlock, blobs []*ethpb.BlobSidecar) (pubsub.PubOpt, error) {
sszEnc := &encoder.SszNetworkEncoder{}
pblk, err := block.Proto()
if err != nil {
return nil, err
}
buf := bytes.NewBuffer([]byte{})
if _, err := sszEnc.EncodeGossip(buf, pblk.(ssz.Marshaler)); err != nil {
return nil, err
}
genValRoot := vs.GenesisFetcher.GenesisValidatorsRoot()
currDigest, err := forks.CreateForkDigest(vs.TimeFetcher.GenesisTime(), genValRoot[:])
if err != nil {
return nil, err
}
topicStr := fmt.Sprintf(p2p.BlockSubnetTopicFormat, currDigest) + sszEnc.ProtocolSuffix()
blockID := p2p.MsgID(genValRoot[:], &pubsub_pb.Message{Data: buf.Bytes(), Topic: &topicStr})
bm := pubsub.NewBatchMessage()
bm.AddMessage(blockID)
for i, b := range blobs {
blobTopicStr := fmt.Sprintf(p2p.BlobSubnetTopicFormat, currDigest, i) + sszEnc.ProtocolSuffix()
buf = bytes.NewBuffer([]byte{})
if _, err := sszEnc.EncodeGossip(buf, b); err != nil {
return nil, err
}
blobID := p2p.MsgID(genValRoot[:], &pubsub_pb.Message{Data: buf.Bytes(), Topic: &blobTopicStr})
bm.AddMessage(blobID)
}
return pubsub.WithBatchPublishing(bm), nil
}
// handleBlindedBlock processes blinded beacon blocks.
func (vs *Server) handleBlindedBlock(ctx context.Context, block interfaces.SignedBeaconBlock) (interfaces.SignedBeaconBlock, []*ethpb.BlobSidecar, error) {
if block.Version() < version.Bellatrix {
@@ -363,12 +406,12 @@ func (vs *Server) blobSidecarsFromUnblindedBlock(block interfaces.SignedBeaconBl
}
// broadcastReceiveBlock broadcasts a block and handles its reception.
func (vs *Server) broadcastReceiveBlock(ctx context.Context, block interfaces.SignedBeaconBlock, root [32]byte) error {
func (vs *Server) broadcastReceiveBlock(ctx context.Context, block interfaces.SignedBeaconBlock, root [32]byte, pubOpts ...pubsub.PubOpt) error {
protoBlock, err := block.Proto()
if err != nil {
return errors.Wrap(err, "protobuf conversion failed")
}
if err := vs.P2P.Broadcast(ctx, protoBlock); err != nil {
if err := vs.P2P.Broadcast(ctx, protoBlock, pubOpts...); err != nil {
return errors.Wrap(err, "broadcast failed")
}
vs.BlockNotifier.BlockFeed().Send(&feed.Event{
@@ -379,7 +422,7 @@ func (vs *Server) broadcastReceiveBlock(ctx context.Context, block interfaces.Si
}
// broadcastAndReceiveBlobs handles the broadcasting and reception of blob sidecars.
func (vs *Server) broadcastAndReceiveBlobs(ctx context.Context, sidecars []*ethpb.BlobSidecar, root [32]byte) error {
func (vs *Server) broadcastAndReceiveBlobs(ctx context.Context, sidecars []*ethpb.BlobSidecar, root [32]byte, pubOpts ...pubsub.PubOpt) error {
eg, eCtx := errgroup.WithContext(ctx)
for i, sc := range sidecars {
// Copy the iteration instance to a local variable to give each go-routine its own copy to play with.
@@ -387,7 +430,7 @@ func (vs *Server) broadcastAndReceiveBlobs(ctx context.Context, sidecars []*ethp
subIdx := i
sCar := sc
eg.Go(func() error {
if err := vs.P2P.BroadcastBlob(eCtx, uint64(subIdx), sCar); err != nil {
if err := vs.P2P.BroadcastBlob(eCtx, uint64(subIdx), sCar, pubOpts...); err != nil {
return errors.Wrap(err, "broadcast blob failed")
}
readOnlySc, err := blocks.NewROBlobWithRoot(sCar, root)

View File

@@ -40,10 +40,7 @@ func (vs *Server) packAttestations(ctx context.Context, latestState state.Beacon
atts = vs.AttPool.AggregatedAttestations()
atts = vs.validateAndDeleteAttsInPool(ctx, latestState, atts)
uAtts, err := vs.AttPool.UnaggregatedAttestations()
if err != nil {
return nil, errors.Wrap(err, "could not get unaggregated attestations")
}
uAtts := vs.AttPool.UnaggregatedAttestations()
uAtts = vs.validateAndDeleteAttsInPool(ctx, latestState, uAtts)
atts = append(atts, uAtts...)
}

View File

@@ -2949,8 +2949,7 @@ func TestProposer_DeleteAttsInPool_Aggregated(t *testing.T) {
require.NoError(t, err)
require.NoError(t, s.deleteAttsInPool(context.Background(), append(aa, unaggregatedAtts...)))
assert.Equal(t, 0, len(s.AttPool.AggregatedAttestations()), "Did not delete aggregated attestation")
atts, err := s.AttPool.UnaggregatedAttestations()
require.NoError(t, err)
atts := s.AttPool.UnaggregatedAttestations()
assert.Equal(t, 0, len(atts), "Did not delete unaggregated attestation")
}

View File

@@ -153,8 +153,7 @@ func TestProcessPendingAtts_HasBlockSaveUnAggregatedAtt(t *testing.T) {
}
}
}()
atts, err := r.cfg.attPool.UnaggregatedAttestations()
require.NoError(t, err)
atts := r.cfg.attPool.UnaggregatedAttestations()
assert.Equal(t, 1, len(atts), "Did not save unaggregated att")
assert.DeepEqual(t, att, atts[0], "Incorrect saved att")
assert.Equal(t, 0, len(r.cfg.attPool.AggregatedAttestations()), "Did save aggregated att")
@@ -248,8 +247,7 @@ func TestProcessPendingAtts_HasBlockSaveUnAggregatedAttElectra(t *testing.T) {
}
}
}()
atts, err := r.cfg.attPool.UnaggregatedAttestations()
require.NoError(t, err)
atts := r.cfg.attPool.UnaggregatedAttestations()
require.Equal(t, 1, len(atts), "Did not save unaggregated att")
assert.DeepEqual(t, att.ToAttestationElectra(committee), atts[0], "Incorrect saved att")
assert.Equal(t, 0, len(r.cfg.attPool.AggregatedAttestations()), "Did save aggregated att")
@@ -457,8 +455,7 @@ func TestProcessPendingAtts_HasBlockSaveAggregatedAtt(t *testing.T) {
assert.Equal(t, 1, len(r.cfg.attPool.AggregatedAttestations()), "Did not save aggregated att")
assert.DeepEqual(t, att, r.cfg.attPool.AggregatedAttestations()[0], "Incorrect saved att")
atts, err := r.cfg.attPool.UnaggregatedAttestations()
require.NoError(t, err)
atts := r.cfg.attPool.UnaggregatedAttestations()
assert.Equal(t, 0, len(atts), "Did save aggregated att")
require.LogsContain(t, hook, "Verified and saved pending attestations to pool")
cancel()

View File

@@ -57,7 +57,6 @@ func TestBeaconAggregateProofSubscriber_CanSaveUnaggregatedAttestation(t *testin
}
require.NoError(t, r.beaconAggregateProofSubscriber(context.Background(), a))
atts, err := r.cfg.attPool.UnaggregatedAttestations()
require.NoError(t, err)
atts := r.cfg.attPool.UnaggregatedAttestations()
assert.DeepEqual(t, []ethpb.Att{a.Message.Aggregate}, atts, "Did not save unaggregated attestation")
}

View File

@@ -0,0 +1,3 @@
### Changed
- changed request object for `POST /eth/v1/beacon/states/head/validators` to omit the field if empty for satisfying other clients.

View File

@@ -1987,8 +1987,9 @@ def prysm_deps():
name = "com_github_libp2p_go_libp2p_pubsub",
build_file_proto_mode = "disable_global",
importpath = "github.com/libp2p/go-libp2p-pubsub",
sum = "h1:RmFQ2XAy3zQtbt2iNPy7Tt0/3fwTnHpCQSSnmGnt1Ps=",
version = "v0.13.0",
replace = "github.com/nisdas/go-libp2p-pubsub",
sum = "h1:s0BSHd/oXPBk16u84LYrXmmRXzoK2GqXqr4XD7dzHSU=",
version = "v0.3.3-0.20250312092335-b8aab45386f7",
)
go_repository(
name = "com_github_libp2p_go_libp2p_testing",

2
go.mod
View File

@@ -290,3 +290,5 @@ require (
)
replace github.com/json-iterator/go => github.com/prestonvanloon/go v1.1.7-0.20190722034630-4f2e55fcf87b
replace github.com/libp2p/go-libp2p-pubsub => github.com/nisdas/go-libp2p-pubsub v0.3.3-0.20250312092335-b8aab45386f7