Compare commits

...

4 Commits

Author SHA1 Message Date
aarshkshah1992
9cfddb8f4d fix changelog CI 2025-10-20 14:09:49 +04:00
aarshkshah1992
e7be659cb8 add changelog 2025-10-20 13:57:24 +04:00
Aarsh Shah
aebfd8dd80 Merge branch 'develop' into feat/use-subscribe-for-blobs 2025-10-20 13:34:30 +04:00
aarshkshah1992
8fb539d3b2 use static subscribe for blob sidecar topics 2025-10-20 13:32:07 +04:00
12 changed files with 112 additions and 105 deletions

View File

@@ -79,11 +79,6 @@ func (p subscribeParameters) logFields() logrus.Fields {
}
}
// fullTopic is the fully qualified topic string, given to gossipsub.
func (p subscribeParameters) fullTopic(subnet uint64, suffix string) string {
return fmt.Sprintf(p.topicFormat, p.nse.ForkDigest, subnet) + suffix
}
// subnetTracker keeps track of which subnets we are subscribed to, out of the set of
// possible subnets described by a `subscribeParameters`.
type subnetTracker struct {
@@ -210,19 +205,20 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
return false
}
s.spawn(func() {
s.subscribe(p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, nse)
s.subscribe(p2p.BlockSubnetTopicFormat, s.buildTopicWithoutSubnet(p2p.BlockSubnetTopicFormat,
nse.ForkDigest), s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, nse)
})
s.spawn(func() {
s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, nse)
s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.buildTopicWithoutSubnet(p2p.AggregateAndProofSubnetTopicFormat, nse.ForkDigest), s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, nse)
})
s.spawn(func() {
s.subscribe(p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber, nse)
s.subscribe(p2p.ExitSubnetTopicFormat, s.buildTopicWithoutSubnet(p2p.ExitSubnetTopicFormat, nse.ForkDigest), s.validateVoluntaryExit, s.voluntaryExitSubscriber, nse)
})
s.spawn(func() {
s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber, nse)
s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.buildTopicWithoutSubnet(p2p.ProposerSlashingSubnetTopicFormat, nse.ForkDigest), s.validateProposerSlashing, s.proposerSlashingSubscriber, nse)
})
s.spawn(func() {
s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber, nse)
s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.buildTopicWithoutSubnet(p2p.AttesterSlashingSubnetTopicFormat, nse.ForkDigest), s.validateAttesterSlashing, s.attesterSlashingSubscriber, nse)
})
s.spawn(func() {
s.subscribeWithParameters(subscribeParameters{
@@ -240,6 +236,7 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
s.spawn(func() {
s.subscribe(
p2p.SyncContributionAndProofSubnetTopicFormat,
s.buildTopicWithoutSubnet(p2p.SyncContributionAndProofSubnetTopicFormat, nse.ForkDigest),
s.validateSyncContributionAndProof,
s.syncContributionAndProofSubscriber,
nse,
@@ -259,6 +256,7 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
s.spawn(func() {
s.subscribe(
p2p.LightClientOptimisticUpdateTopicFormat,
s.buildTopicWithoutSubnet(p2p.LightClientOptimisticUpdateTopicFormat, nse.ForkDigest),
s.validateLightClientOptimisticUpdate,
noopHandler,
nse,
@@ -267,6 +265,7 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
s.spawn(func() {
s.subscribe(
p2p.LightClientFinalityUpdateTopicFormat,
s.buildTopicWithoutSubnet(p2p.LightClientFinalityUpdateTopicFormat, nse.ForkDigest),
s.validateLightClientFinalityUpdate,
noopHandler,
nse,
@@ -280,6 +279,7 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
s.spawn(func() {
s.subscribe(
p2p.BlsToExecutionChangeSubnetTopicFormat,
s.buildTopicWithoutSubnet(p2p.BlsToExecutionChangeSubnetTopicFormat, nse.ForkDigest),
s.validateBlsToExecutionChange,
s.blsToExecutionChangeSubscriber,
nse,
@@ -289,32 +289,36 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
// New gossip topic in Deneb, removed in Electra
if params.BeaconConfig().DenebForkEpoch <= nse.Epoch && nse.Epoch < params.BeaconConfig().ElectraForkEpoch {
s.spawn(func() {
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.BlobSubnetTopicFormat,
validate: s.validateBlob,
handle: s.blobSubscriber,
nse: nse,
getSubnetsToJoin: func(primitives.Slot) map[uint64]bool {
return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCount)
},
for i := uint64(0); i < params.BeaconConfig().BlobsidecarSubnetCount; i++ {
subnet := i
s.spawn(func() {
topic := s.buildTopicWithSubnet(p2p.BlobSubnetTopicFormat, nse.ForkDigest, subnet)
s.subscribe(
p2p.BlobSubnetTopicFormat,
topic,
s.validateBlob,
s.blobSubscriber,
nse,
)
})
})
}
}
// New gossip topic in Electra, removed in Fulu
if params.BeaconConfig().ElectraForkEpoch <= nse.Epoch && nse.Epoch < params.BeaconConfig().FuluForkEpoch {
s.spawn(func() {
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.BlobSubnetTopicFormat,
validate: s.validateBlob,
handle: s.blobSubscriber,
nse: nse,
getSubnetsToJoin: func(currentSlot primitives.Slot) map[uint64]bool {
return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCountElectra)
},
for i := uint64(0); i < params.BeaconConfig().BlobsidecarSubnetCountElectra; i++ {
subnet := i
s.spawn(func() {
topic := s.buildTopicWithSubnet(p2p.BlobSubnetTopicFormat, nse.ForkDigest, subnet)
s.subscribe(
p2p.BlobSubnetTopicFormat,
topic,
s.validateBlob,
s.blobSubscriber,
nse,
)
})
})
}
}
// New gossip topic in Fulu.
@@ -349,27 +353,40 @@ func (s *Service) subscribeLogFields(topic string, nse params.NetworkScheduleEnt
// subscribe to a given topic with a given validator and subscription handler.
// The base protobuf message is used to initialize new messages for decoding.
func (s *Service) subscribe(topic string, validator wrappedVal, handle subHandler, nse params.NetworkScheduleEntry) {
func (s *Service) subscribe(topicFormat string, fullTopic string, validator wrappedVal, handle subHandler, nse params.NetworkScheduleEntry) {
if err := s.waitForInitialSync(s.ctx); err != nil {
log.WithFields(s.subscribeLogFields(topic, nse)).WithError(err).Debug("Context cancelled while waiting for initial sync, not subscribing to topic")
log.WithFields(s.subscribeLogFields(topicFormat, nse)).WithError(err).Debug("Context cancelled while waiting for initial sync, not subscribing to topic")
return
}
// Check if this subscribe request is still valid - we may have crossed another fork epoch while waiting for initial sync.
if s.subscriptionRequestExpired(nse) {
// If we are already past the next fork epoch, do not subscribe to this topic.
log.WithFields(s.subscribeLogFields(topic, nse)).Debug("Not subscribing to topic as we are already past the next fork epoch")
log.WithFields(s.subscribeLogFields(topicFormat, nse)).Debug("Not subscribing to topic as we are already past the next fork epoch")
return
}
base := p2p.GossipTopicMappings(topic, nse.Epoch)
base := p2p.GossipTopicMappings(topicFormat, nse.Epoch)
if base == nil {
// Impossible condition as it would mean topic does not exist.
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) // lint:nopanic -- Impossible condition.
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat)) // lint:nopanic -- Impossible condition.
}
s.subscribeWithBase(s.addDigestToTopic(topic, nse.ForkDigest), validator, handle)
s.subscribeWithBase(fullTopic, validator, handle)
}
func (s *Service) buildTopicWithoutSubnet(topicFormat string, digest [4]byte) string {
if !strings.Contains(topicFormat, "%x") {
log.Error("Topic does not have appropriate formatter for digest")
}
return (fmt.Sprintf(topicFormat, digest)) + s.cfg.p2p.Encoding().ProtocolSuffix()
}
func (s *Service) buildTopicWithSubnet(topicFormat string, digest [4]byte, subnet uint64) string {
if !strings.Contains(topicFormat, "%x") {
log.Error("Topic does not have appropriate formatter for digest")
}
return (fmt.Sprintf(topicFormat, digest, subnet)) + s.cfg.p2p.Encoding().ProtocolSuffix()
}
func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle subHandler) *pubsub.Subscription {
topic += s.cfg.p2p.Encoding().ProtocolSuffix()
log := log.WithField("topic", topic)
// Do not resubscribe already seen subscriptions.
@@ -532,7 +549,8 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p
func (s *Service) pruneNotWanted(t *subnetTracker, wantedSubnets map[uint64]bool) {
for _, subnet := range t.unwanted(wantedSubnets) {
t.cancelSubscription(subnet)
s.unSubscribeFromTopic(t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix()))
fullTopic := s.buildTopicWithSubnet(t.topicFormat, t.nse.ForkDigest, subnet)
s.unSubscribeFromTopic(fullTopic)
}
}
@@ -579,8 +597,7 @@ func (s *Service) trySubscribeSubnets(t *subnetTracker) {
subnetsToJoin := t.getSubnetsToJoin(s.cfg.clock.CurrentSlot())
s.pruneNotWanted(t, subnetsToJoin)
for _, subnet := range t.missing(subnetsToJoin) {
// TODO: subscribeWithBase appends the protocol suffix, other methods don't. Make this consistent.
topic := t.fullTopic(subnet, "")
topic := s.buildTopicWithSubnet(t.topicFormat, t.nse.ForkDigest, subnet)
t.track(subnet, s.subscribeWithBase(topic, t.validate, t.handle))
}
}
@@ -782,22 +799,6 @@ func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID {
return newPeers
}
// Add fork digest to topic.
func (*Service) addDigestToTopic(topic string, digest [4]byte) string {
if !strings.Contains(topic, "%x") {
log.Error("Topic does not have appropriate formatter for digest")
}
return fmt.Sprintf(topic, digest)
}
// Add the digest and index to subnet topic.
func (*Service) addDigestAndIndexToTopic(topic string, digest [4]byte, idx uint64) string {
if !strings.Contains(topic, "%x") {
log.Error("Topic does not have appropriate formatter for digest")
}
return fmt.Sprintf(topic, digest, idx)
}
func (s *Service) currentForkDigest() ([4]byte, error) {
return params.ForkDigest(s.cfg.clock.CurrentEpoch()), nil
}

View File

@@ -64,7 +64,8 @@ func TestSubscribe_ReceivesValidMessage(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
r.subscribe(topic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
fullTopic := r.buildTopicWithoutSubnet(topic, nse.ForkDigest)
r.subscribe(topic, fullTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
m, ok := msg.(*pb.SignedVoluntaryExit)
assert.Equal(t, true, ok, "Object is not of type *pb.SignedVoluntaryExit")
if m.Exit == nil || m.Exit.Epoch != 55 {
@@ -110,12 +111,12 @@ func TestSubscribe_UnsubscribeTopic(t *testing.T) {
p2pService.Digest = nse.ForkDigest
topic := "/eth2/%x/voluntary_exit"
r.subscribe(topic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
fullTopic := r.buildTopicWithoutSubnet(topic, nse.ForkDigest)
r.subscribe(topic, fullTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
return nil
}, nse)
r.markForChainStart()
fullTopic := fmt.Sprintf(topic, p2pService.Digest) + p2pService.Encoding().ProtocolSuffix()
assert.Equal(t, true, r.subHandler.topicExists(fullTopic))
topics := p2pService.PubSub().GetTopics()
assert.Equal(t, fullTopic, topics[0])
@@ -162,7 +163,8 @@ func TestSubscribe_ReceivesAttesterSlashing(t *testing.T) {
wg.Add(1)
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
p2pService.Digest = nse.ForkDigest
r.subscribe(topic, r.noopValidator, func(ctx context.Context, msg proto.Message) error {
fullTopic := r.buildTopicWithoutSubnet(topic, nse.ForkDigest)
r.subscribe(topic, fullTopic, r.noopValidator, func(ctx context.Context, msg proto.Message) error {
require.NoError(t, r.attesterSlashingSubscriber(ctx, msg))
wg.Done()
return nil
@@ -217,7 +219,8 @@ func TestSubscribe_ReceivesProposerSlashing(t *testing.T) {
params.OverrideBeaconConfig(params.MainnetConfig())
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
p2pService.Digest = nse.ForkDigest
r.subscribe(topic, r.noopValidator, func(ctx context.Context, msg proto.Message) error {
fullTopic := r.buildTopicWithoutSubnet(topic, nse.ForkDigest)
r.subscribe(topic, fullTopic, r.noopValidator, func(ctx context.Context, msg proto.Message) error {
require.NoError(t, r.proposerSlashingSubscriber(ctx, msg))
wg.Done()
return nil
@@ -266,7 +269,8 @@ func TestSubscribe_HandlesPanic(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
r.subscribe(topic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
fullTopic := r.buildTopicWithoutSubnet(topic, nse.ForkDigest)
r.subscribe(topic, fullTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
defer wg.Done()
panic("bad")
}, nse)
@@ -305,7 +309,7 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
// committee index 1
c1 := uint64(1)
fullTopic := params.fullTopic(c1, r.cfg.p2p.Encoding().ProtocolSuffix())
fullTopic := r.buildTopicWithSubnet(params.topicFormat, params.nse.ForkDigest, c1)
_, topVal := r.wrapAndReportValidation(fullTopic, r.noopValidator)
require.NoError(t, r.cfg.p2p.PubSub().RegisterTopicValidator(fullTopic, topVal))
sub1, err := r.cfg.p2p.SubscribeToTopic(fullTopic)
@@ -314,7 +318,7 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
// committee index 2
c2 := uint64(2)
fullTopic = params.fullTopic(c2, r.cfg.p2p.Encoding().ProtocolSuffix())
fullTopic = r.buildTopicWithSubnet(params.topicFormat, params.nse.ForkDigest, c2)
_, topVal = r.wrapAndReportValidation(fullTopic, r.noopValidator)
err = r.cfg.p2p.PubSub().RegisterTopicValidator(fullTopic, topVal)
require.NoError(t, err)
@@ -484,11 +488,11 @@ func TestFilterSubnetPeers(t *testing.T) {
defer cache.SubnetIDs.EmptyAllCaches()
digest, err := r.currentForkDigest()
assert.NoError(t, err)
defaultTopic := "/eth2/%x/beacon_attestation_%d" + r.cfg.p2p.Encoding().ProtocolSuffix()
subnet10 := r.addDigestAndIndexToTopic(defaultTopic, digest, 10)
defaultTopic := "/eth2/%x/beacon_attestation_%d"
subnet10 := r.buildTopicWithSubnet(defaultTopic, digest, 10)
cache.SubnetIDs.AddAggregatorSubnetID(currSlot, 10)
subnet20 := r.addDigestAndIndexToTopic(defaultTopic, digest, 20)
subnet20 := r.buildTopicWithSubnet(defaultTopic, digest, 20)
cache.SubnetIDs.AddAttesterSubnetID(currSlot, 20)
p1 := createPeer(t, subnet10)

View File

@@ -82,7 +82,7 @@ func FuzzValidateBeaconBlockPubSub_Phase0(f *testing.F) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := r.currentForkDigest()
assert.NoError(f, err)
topic = r.addDigestToTopic(topic, digest)
topic = r.buildTopicWithoutSubnet(topic, digest)
f.Add("junk", []byte("junk"), buf.Bytes(), []byte(topic))
f.Fuzz(func(t *testing.T, pid string, from, data, topic []byte) {
@@ -166,7 +166,7 @@ func FuzzValidateBeaconBlockPubSub_Altair(f *testing.F) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := r.currentForkDigest()
assert.NoError(f, err)
topic = r.addDigestToTopic(topic, digest)
topic = r.buildTopicWithoutSubnet(topic, digest)
f.Add("junk", []byte("junk"), buf.Bytes(), []byte(topic))
f.Fuzz(func(t *testing.T, pid string, from, data, topic []byte) {
@@ -250,7 +250,7 @@ func FuzzValidateBeaconBlockPubSub_Bellatrix(f *testing.F) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := r.currentForkDigest()
assert.NoError(f, err)
topic = r.addDigestToTopic(topic, digest)
topic = r.buildTopicWithoutSubnet(topic, digest)
f.Add("junk", []byte("junk"), buf.Bytes(), []byte(topic))
f.Fuzz(func(t *testing.T, pid string, from, data, topic []byte) {

View File

@@ -488,7 +488,7 @@ func TestValidateAggregateAndProof_CanValidate(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)]
d, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, d)
topic = r.buildTopicWithoutSubnet(topic, d)
msg := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -591,7 +591,7 @@ func TestVerifyIndexInCommittee_SeenAggregatorEpoch(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)]
d, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, d)
topic = r.buildTopicWithoutSubnet(topic, d)
msg := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),

View File

@@ -101,7 +101,7 @@ func TestValidateAttesterSlashing_ValidSlashing(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)]
d, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, d)
topic = r.buildTopicWithoutSubnet(topic, d)
msg := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -146,7 +146,7 @@ func TestValidateAttesterSlashing_ValidOldSlashing(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)]
d, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, d)
topic = r.buildTopicWithoutSubnet(topic, d)
msg := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -191,7 +191,7 @@ func TestValidateAttesterSlashing_InvalidSlashing_WithdrawableEpoch(t *testing.T
topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)]
d, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, d)
topic = r.buildTopicWithoutSubnet(topic, d)
msg := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -240,7 +240,7 @@ func TestValidateAttesterSlashing_CanFilter(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.AttesterSlashing{})]
d, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, d)
topic = r.buildTopicWithoutSubnet(topic, d)
buf := new(bytes.Buffer)
_, err = p.Encoding().EncodeGossip(buf, &ethpb.AttesterSlashing{
Attestation_1: util.HydrateIndexedAttestation(&ethpb.IndexedAttestation{

View File

@@ -95,7 +95,7 @@ func TestValidateBeaconBlockPubSub_InvalidSignature(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, digest)
topic = r.buildTopicWithoutSubnet(topic, digest)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -165,7 +165,7 @@ func TestValidateBeaconBlockPubSub_InvalidSignature_MarksBlockAsBad(t *testing.T
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, digest)
topic = r.buildTopicWithoutSubnet(topic, digest)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -212,7 +212,7 @@ func TestValidateBeaconBlockPubSub_BlockAlreadyPresentInDB(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, digest)
topic = r.buildTopicWithoutSubnet(topic, digest)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -275,7 +275,7 @@ func TestValidateBeaconBlockPubSub_CanRecoverStateSummary(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, digest)
topic = r.buildTopicWithoutSubnet(topic, digest)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -341,7 +341,7 @@ func TestValidateBeaconBlockPubSub_IsInCache(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, digest)
topic = r.buildTopicWithoutSubnet(topic, digest)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -407,7 +407,7 @@ func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, digest)
topic = r.buildTopicWithoutSubnet(topic, digest)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -476,7 +476,7 @@ func TestValidateBeaconBlockPubSub_WithLookahead(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, digest)
topic = r.buildTopicWithoutSubnet(topic, digest)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -544,7 +544,7 @@ func TestValidateBeaconBlockPubSub_AdvanceEpochsForState(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, digest)
topic = r.buildTopicWithoutSubnet(topic, digest)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -653,7 +653,7 @@ func TestValidateBeaconBlockPubSub_IgnoreAndQueueBlocksFromNearFuture(t *testing
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, digest)
topic = r.buildTopicWithoutSubnet(topic, digest)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -704,7 +704,7 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromFuture(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, digest)
topic = r.buildTopicWithoutSubnet(topic, digest)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -755,7 +755,7 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromThePast(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, digest)
topic = r.buildTopicWithoutSubnet(topic, digest)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -834,7 +834,7 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(msgClone)]
digest, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, digest)
topic = r.buildTopicWithoutSubnet(topic, digest)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -975,7 +975,7 @@ func TestValidateBeaconBlockPubSub_ParentNotFinalizedDescendant(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, digest)
topic = r.buildTopicWithoutSubnet(topic, digest)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -1041,7 +1041,7 @@ func TestValidateBeaconBlockPubSub_InvalidParentBlock(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, digest)
topic = r.buildTopicWithoutSubnet(topic, digest)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -1135,7 +1135,7 @@ func TestValidateBeaconBlockPubSub_InsertValidPendingBlock(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, digest)
topic = r.buildTopicWithoutSubnet(topic, digest)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -1220,7 +1220,7 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromBadParent(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, digest)
topic = r.buildTopicWithoutSubnet(topic, digest)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -1323,7 +1323,7 @@ func TestValidateBeaconBlockPubSub_ValidExecutionPayload(t *testing.T) {
genesisValidatorsRoot := r.cfg.clock.GenesisValidatorsRoot()
BellatrixDigest, err := signing.ComputeForkDigest(params.BeaconConfig().BellatrixForkVersion, genesisValidatorsRoot[:])
require.NoError(t, err)
topic = r.addDigestToTopic(topic, BellatrixDigest)
topic = r.buildTopicWithoutSubnet(topic, BellatrixDigest)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -1395,7 +1395,7 @@ func TestValidateBeaconBlockPubSub_InvalidPayloadTimestamp(t *testing.T) {
genesisValidatorsRoot := r.cfg.clock.GenesisValidatorsRoot()
BellatrixDigest, err := signing.ComputeForkDigest(params.BeaconConfig().BellatrixForkVersion, genesisValidatorsRoot[:])
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, BellatrixDigest)
topic = r.buildTopicWithoutSubnet(topic, BellatrixDigest)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -1558,7 +1558,7 @@ func Test_validateBeaconBlockProcessingWhenParentIsOptimistic(t *testing.T) {
genesisValidatorsRoot := r.cfg.clock.GenesisValidatorsRoot()
BellatrixDigest, err := signing.ComputeForkDigest(params.BeaconConfig().BellatrixForkVersion, genesisValidatorsRoot[:])
require.NoError(t, err)
topic = r.addDigestToTopic(topic, BellatrixDigest)
topic = r.buildTopicWithoutSubnet(topic, BellatrixDigest)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),

View File

@@ -70,7 +70,7 @@ func TestValidateBlob_InvalidMessageType(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := s.currentForkDigest()
require.NoError(t, err)
topic = s.addDigestToTopic(topic, digest)
topic = s.buildTopicWithoutSubnet(topic, digest)
result, err := s.validateBlob(ctx, "", &pubsub.Message{
Message: &pb.Message{
Data: buf.Bytes(),
@@ -129,7 +129,7 @@ func TestValidateBlob_AlreadySeenInCache(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(b)]
digest, err := s.currentForkDigest()
require.NoError(t, err)
topic = s.addDigestAndIndexToTopic(topic, digest, 0)
topic = s.buildTopicWithSubnet(topic, digest, 0)
s.setSeenBlobIndex(sc.Slot(), sc.SignedBlockHeader.Header.ProposerIndex, 0)
result, err := s.validateBlob(ctx, "", &pubsub.Message{
@@ -159,7 +159,7 @@ func TestValidateBlob_InvalidTopicIndex(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := s.currentForkDigest()
require.NoError(t, err)
topic = s.addDigestAndIndexToTopic(topic, digest, 1)
topic = s.buildTopicWithSubnet(topic, digest, 1)
result, err := s.validateBlob(ctx, "", &pubsub.Message{
Message: &pb.Message{
Data: buf.Bytes(),
@@ -274,7 +274,7 @@ func TestValidateBlob_ErrorPathsWithMock(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
digest, err := s.currentForkDigest()
require.NoError(t, err)
topic = s.addDigestAndIndexToTopic(topic, digest, 0)
topic = s.buildTopicWithSubnet(topic, digest, 0)
result, err := s.validateBlob(ctx, "", &pubsub.Message{
Message: &pb.Message{
Data: buf.Bytes(),

View File

@@ -85,7 +85,7 @@ func TestValidateDataColumn(t *testing.T) {
digest, err := service.currentForkDigest()
require.NoError(t, err)
topic = service.addDigestToTopic(topic, digest)
topic = service.buildTopicWithoutSubnet(topic, digest)
message := &pubsub.Message{Message: &pb.Message{Data: buf.Bytes(), Topic: &topic}}

View File

@@ -133,7 +133,7 @@ func TestValidateLightClientOptimisticUpdate(t *testing.T) {
topic := p2p.LightClientOptimisticUpdateTopicFormat
digest, err := s.currentForkDigest()
require.NoError(t, err)
topic = s.addDigestToTopic(topic, digest)
topic = s.buildTopicWithoutSubnet(topic, digest)
r, err := s.validateLightClientOptimisticUpdate(ctx, "", &pubsub.Message{
Message: &pb.Message{
@@ -259,7 +259,7 @@ func TestValidateLightClientFinalityUpdate(t *testing.T) {
topic := p2p.LightClientFinalityUpdateTopicFormat
digest, err := s.currentForkDigest()
require.NoError(t, err)
topic = s.addDigestToTopic(topic, digest)
topic = s.buildTopicWithoutSubnet(topic, digest)
r, err := s.validateLightClientFinalityUpdate(ctx, "", &pubsub.Message{
Message: &pb.Message{

View File

@@ -132,7 +132,7 @@ func TestValidateProposerSlashing_ValidSlashing(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)]
d, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, d)
topic = r.buildTopicWithoutSubnet(topic, d)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
@@ -175,7 +175,7 @@ func TestValidateProposerSlashing_ValidOldSlashing(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)]
d, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, d)
topic = r.buildTopicWithoutSubnet(topic, d)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),

View File

@@ -104,7 +104,7 @@ func TestValidateVoluntaryExit_ValidExit(t *testing.T) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(exit)]
d, err := r.currentForkDigest()
assert.NoError(t, err)
topic = r.addDigestToTopic(topic, d)
topic = r.buildTopicWithoutSubnet(topic, d)
m := &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),

View File

@@ -0,0 +1,2 @@
### Changed
- Use static subscription for blob sidecar topics instead of dynamic subnet management.