mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 13:58:09 -05:00
Compare commits
4 Commits
backfill-d
...
feat/use-s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9cfddb8f4d | ||
|
|
e7be659cb8 | ||
|
|
aebfd8dd80 | ||
|
|
8fb539d3b2 |
@@ -79,11 +79,6 @@ func (p subscribeParameters) logFields() logrus.Fields {
|
||||
}
|
||||
}
|
||||
|
||||
// fullTopic is the fully qualified topic string, given to gossipsub.
|
||||
func (p subscribeParameters) fullTopic(subnet uint64, suffix string) string {
|
||||
return fmt.Sprintf(p.topicFormat, p.nse.ForkDigest, subnet) + suffix
|
||||
}
|
||||
|
||||
// subnetTracker keeps track of which subnets we are subscribed to, out of the set of
|
||||
// possible subnets described by a `subscribeParameters`.
|
||||
type subnetTracker struct {
|
||||
@@ -210,19 +205,20 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
|
||||
return false
|
||||
}
|
||||
s.spawn(func() {
|
||||
s.subscribe(p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, nse)
|
||||
s.subscribe(p2p.BlockSubnetTopicFormat, s.buildTopicWithoutSubnet(p2p.BlockSubnetTopicFormat,
|
||||
nse.ForkDigest), s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, nse)
|
||||
})
|
||||
s.spawn(func() {
|
||||
s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, nse)
|
||||
s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.buildTopicWithoutSubnet(p2p.AggregateAndProofSubnetTopicFormat, nse.ForkDigest), s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, nse)
|
||||
})
|
||||
s.spawn(func() {
|
||||
s.subscribe(p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber, nse)
|
||||
s.subscribe(p2p.ExitSubnetTopicFormat, s.buildTopicWithoutSubnet(p2p.ExitSubnetTopicFormat, nse.ForkDigest), s.validateVoluntaryExit, s.voluntaryExitSubscriber, nse)
|
||||
})
|
||||
s.spawn(func() {
|
||||
s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber, nse)
|
||||
s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.buildTopicWithoutSubnet(p2p.ProposerSlashingSubnetTopicFormat, nse.ForkDigest), s.validateProposerSlashing, s.proposerSlashingSubscriber, nse)
|
||||
})
|
||||
s.spawn(func() {
|
||||
s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber, nse)
|
||||
s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.buildTopicWithoutSubnet(p2p.AttesterSlashingSubnetTopicFormat, nse.ForkDigest), s.validateAttesterSlashing, s.attesterSlashingSubscriber, nse)
|
||||
})
|
||||
s.spawn(func() {
|
||||
s.subscribeWithParameters(subscribeParameters{
|
||||
@@ -240,6 +236,7 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
|
||||
s.spawn(func() {
|
||||
s.subscribe(
|
||||
p2p.SyncContributionAndProofSubnetTopicFormat,
|
||||
s.buildTopicWithoutSubnet(p2p.SyncContributionAndProofSubnetTopicFormat, nse.ForkDigest),
|
||||
s.validateSyncContributionAndProof,
|
||||
s.syncContributionAndProofSubscriber,
|
||||
nse,
|
||||
@@ -259,6 +256,7 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
|
||||
s.spawn(func() {
|
||||
s.subscribe(
|
||||
p2p.LightClientOptimisticUpdateTopicFormat,
|
||||
s.buildTopicWithoutSubnet(p2p.LightClientOptimisticUpdateTopicFormat, nse.ForkDigest),
|
||||
s.validateLightClientOptimisticUpdate,
|
||||
noopHandler,
|
||||
nse,
|
||||
@@ -267,6 +265,7 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
|
||||
s.spawn(func() {
|
||||
s.subscribe(
|
||||
p2p.LightClientFinalityUpdateTopicFormat,
|
||||
s.buildTopicWithoutSubnet(p2p.LightClientFinalityUpdateTopicFormat, nse.ForkDigest),
|
||||
s.validateLightClientFinalityUpdate,
|
||||
noopHandler,
|
||||
nse,
|
||||
@@ -280,6 +279,7 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
|
||||
s.spawn(func() {
|
||||
s.subscribe(
|
||||
p2p.BlsToExecutionChangeSubnetTopicFormat,
|
||||
s.buildTopicWithoutSubnet(p2p.BlsToExecutionChangeSubnetTopicFormat, nse.ForkDigest),
|
||||
s.validateBlsToExecutionChange,
|
||||
s.blsToExecutionChangeSubscriber,
|
||||
nse,
|
||||
@@ -289,32 +289,36 @@ func (s *Service) registerSubscribers(nse params.NetworkScheduleEntry) bool {
|
||||
|
||||
// New gossip topic in Deneb, removed in Electra
|
||||
if params.BeaconConfig().DenebForkEpoch <= nse.Epoch && nse.Epoch < params.BeaconConfig().ElectraForkEpoch {
|
||||
s.spawn(func() {
|
||||
s.subscribeWithParameters(subscribeParameters{
|
||||
topicFormat: p2p.BlobSubnetTopicFormat,
|
||||
validate: s.validateBlob,
|
||||
handle: s.blobSubscriber,
|
||||
nse: nse,
|
||||
getSubnetsToJoin: func(primitives.Slot) map[uint64]bool {
|
||||
return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCount)
|
||||
},
|
||||
for i := uint64(0); i < params.BeaconConfig().BlobsidecarSubnetCount; i++ {
|
||||
subnet := i
|
||||
s.spawn(func() {
|
||||
topic := s.buildTopicWithSubnet(p2p.BlobSubnetTopicFormat, nse.ForkDigest, subnet)
|
||||
s.subscribe(
|
||||
p2p.BlobSubnetTopicFormat,
|
||||
topic,
|
||||
s.validateBlob,
|
||||
s.blobSubscriber,
|
||||
nse,
|
||||
)
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// New gossip topic in Electra, removed in Fulu
|
||||
if params.BeaconConfig().ElectraForkEpoch <= nse.Epoch && nse.Epoch < params.BeaconConfig().FuluForkEpoch {
|
||||
s.spawn(func() {
|
||||
s.subscribeWithParameters(subscribeParameters{
|
||||
topicFormat: p2p.BlobSubnetTopicFormat,
|
||||
validate: s.validateBlob,
|
||||
handle: s.blobSubscriber,
|
||||
nse: nse,
|
||||
getSubnetsToJoin: func(currentSlot primitives.Slot) map[uint64]bool {
|
||||
return mapFromCount(params.BeaconConfig().BlobsidecarSubnetCountElectra)
|
||||
},
|
||||
for i := uint64(0); i < params.BeaconConfig().BlobsidecarSubnetCountElectra; i++ {
|
||||
subnet := i
|
||||
s.spawn(func() {
|
||||
topic := s.buildTopicWithSubnet(p2p.BlobSubnetTopicFormat, nse.ForkDigest, subnet)
|
||||
s.subscribe(
|
||||
p2p.BlobSubnetTopicFormat,
|
||||
topic,
|
||||
s.validateBlob,
|
||||
s.blobSubscriber,
|
||||
nse,
|
||||
)
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// New gossip topic in Fulu.
|
||||
@@ -349,27 +353,40 @@ func (s *Service) subscribeLogFields(topic string, nse params.NetworkScheduleEnt
|
||||
|
||||
// subscribe to a given topic with a given validator and subscription handler.
|
||||
// The base protobuf message is used to initialize new messages for decoding.
|
||||
func (s *Service) subscribe(topic string, validator wrappedVal, handle subHandler, nse params.NetworkScheduleEntry) {
|
||||
func (s *Service) subscribe(topicFormat string, fullTopic string, validator wrappedVal, handle subHandler, nse params.NetworkScheduleEntry) {
|
||||
if err := s.waitForInitialSync(s.ctx); err != nil {
|
||||
log.WithFields(s.subscribeLogFields(topic, nse)).WithError(err).Debug("Context cancelled while waiting for initial sync, not subscribing to topic")
|
||||
log.WithFields(s.subscribeLogFields(topicFormat, nse)).WithError(err).Debug("Context cancelled while waiting for initial sync, not subscribing to topic")
|
||||
return
|
||||
}
|
||||
// Check if this subscribe request is still valid - we may have crossed another fork epoch while waiting for initial sync.
|
||||
if s.subscriptionRequestExpired(nse) {
|
||||
// If we are already past the next fork epoch, do not subscribe to this topic.
|
||||
log.WithFields(s.subscribeLogFields(topic, nse)).Debug("Not subscribing to topic as we are already past the next fork epoch")
|
||||
log.WithFields(s.subscribeLogFields(topicFormat, nse)).Debug("Not subscribing to topic as we are already past the next fork epoch")
|
||||
return
|
||||
}
|
||||
base := p2p.GossipTopicMappings(topic, nse.Epoch)
|
||||
base := p2p.GossipTopicMappings(topicFormat, nse.Epoch)
|
||||
if base == nil {
|
||||
// Impossible condition as it would mean topic does not exist.
|
||||
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) // lint:nopanic -- Impossible condition.
|
||||
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat)) // lint:nopanic -- Impossible condition.
|
||||
}
|
||||
s.subscribeWithBase(s.addDigestToTopic(topic, nse.ForkDigest), validator, handle)
|
||||
s.subscribeWithBase(fullTopic, validator, handle)
|
||||
}
|
||||
|
||||
func (s *Service) buildTopicWithoutSubnet(topicFormat string, digest [4]byte) string {
|
||||
if !strings.Contains(topicFormat, "%x") {
|
||||
log.Error("Topic does not have appropriate formatter for digest")
|
||||
}
|
||||
return (fmt.Sprintf(topicFormat, digest)) + s.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
}
|
||||
|
||||
func (s *Service) buildTopicWithSubnet(topicFormat string, digest [4]byte, subnet uint64) string {
|
||||
if !strings.Contains(topicFormat, "%x") {
|
||||
log.Error("Topic does not have appropriate formatter for digest")
|
||||
}
|
||||
return (fmt.Sprintf(topicFormat, digest, subnet)) + s.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
}
|
||||
|
||||
func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle subHandler) *pubsub.Subscription {
|
||||
topic += s.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
log := log.WithField("topic", topic)
|
||||
|
||||
// Do not resubscribe already seen subscriptions.
|
||||
@@ -532,7 +549,8 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p
|
||||
func (s *Service) pruneNotWanted(t *subnetTracker, wantedSubnets map[uint64]bool) {
|
||||
for _, subnet := range t.unwanted(wantedSubnets) {
|
||||
t.cancelSubscription(subnet)
|
||||
s.unSubscribeFromTopic(t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix()))
|
||||
fullTopic := s.buildTopicWithSubnet(t.topicFormat, t.nse.ForkDigest, subnet)
|
||||
s.unSubscribeFromTopic(fullTopic)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -579,8 +597,7 @@ func (s *Service) trySubscribeSubnets(t *subnetTracker) {
|
||||
subnetsToJoin := t.getSubnetsToJoin(s.cfg.clock.CurrentSlot())
|
||||
s.pruneNotWanted(t, subnetsToJoin)
|
||||
for _, subnet := range t.missing(subnetsToJoin) {
|
||||
// TODO: subscribeWithBase appends the protocol suffix, other methods don't. Make this consistent.
|
||||
topic := t.fullTopic(subnet, "")
|
||||
topic := s.buildTopicWithSubnet(t.topicFormat, t.nse.ForkDigest, subnet)
|
||||
t.track(subnet, s.subscribeWithBase(topic, t.validate, t.handle))
|
||||
}
|
||||
}
|
||||
@@ -782,22 +799,6 @@ func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID {
|
||||
return newPeers
|
||||
}
|
||||
|
||||
// Add fork digest to topic.
|
||||
func (*Service) addDigestToTopic(topic string, digest [4]byte) string {
|
||||
if !strings.Contains(topic, "%x") {
|
||||
log.Error("Topic does not have appropriate formatter for digest")
|
||||
}
|
||||
return fmt.Sprintf(topic, digest)
|
||||
}
|
||||
|
||||
// Add the digest and index to subnet topic.
|
||||
func (*Service) addDigestAndIndexToTopic(topic string, digest [4]byte, idx uint64) string {
|
||||
if !strings.Contains(topic, "%x") {
|
||||
log.Error("Topic does not have appropriate formatter for digest")
|
||||
}
|
||||
return fmt.Sprintf(topic, digest, idx)
|
||||
}
|
||||
|
||||
func (s *Service) currentForkDigest() ([4]byte, error) {
|
||||
return params.ForkDigest(s.cfg.clock.CurrentEpoch()), nil
|
||||
}
|
||||
|
||||
@@ -64,7 +64,8 @@ func TestSubscribe_ReceivesValidMessage(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
|
||||
r.subscribe(topic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
|
||||
fullTopic := r.buildTopicWithoutSubnet(topic, nse.ForkDigest)
|
||||
r.subscribe(topic, fullTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
|
||||
m, ok := msg.(*pb.SignedVoluntaryExit)
|
||||
assert.Equal(t, true, ok, "Object is not of type *pb.SignedVoluntaryExit")
|
||||
if m.Exit == nil || m.Exit.Epoch != 55 {
|
||||
@@ -110,12 +111,12 @@ func TestSubscribe_UnsubscribeTopic(t *testing.T) {
|
||||
p2pService.Digest = nse.ForkDigest
|
||||
topic := "/eth2/%x/voluntary_exit"
|
||||
|
||||
r.subscribe(topic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
|
||||
fullTopic := r.buildTopicWithoutSubnet(topic, nse.ForkDigest)
|
||||
r.subscribe(topic, fullTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
|
||||
return nil
|
||||
}, nse)
|
||||
r.markForChainStart()
|
||||
|
||||
fullTopic := fmt.Sprintf(topic, p2pService.Digest) + p2pService.Encoding().ProtocolSuffix()
|
||||
assert.Equal(t, true, r.subHandler.topicExists(fullTopic))
|
||||
topics := p2pService.PubSub().GetTopics()
|
||||
assert.Equal(t, fullTopic, topics[0])
|
||||
@@ -162,7 +163,8 @@ func TestSubscribe_ReceivesAttesterSlashing(t *testing.T) {
|
||||
wg.Add(1)
|
||||
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
|
||||
p2pService.Digest = nse.ForkDigest
|
||||
r.subscribe(topic, r.noopValidator, func(ctx context.Context, msg proto.Message) error {
|
||||
fullTopic := r.buildTopicWithoutSubnet(topic, nse.ForkDigest)
|
||||
r.subscribe(topic, fullTopic, r.noopValidator, func(ctx context.Context, msg proto.Message) error {
|
||||
require.NoError(t, r.attesterSlashingSubscriber(ctx, msg))
|
||||
wg.Done()
|
||||
return nil
|
||||
@@ -217,7 +219,8 @@ func TestSubscribe_ReceivesProposerSlashing(t *testing.T) {
|
||||
params.OverrideBeaconConfig(params.MainnetConfig())
|
||||
nse := params.GetNetworkScheduleEntry(r.cfg.clock.CurrentEpoch())
|
||||
p2pService.Digest = nse.ForkDigest
|
||||
r.subscribe(topic, r.noopValidator, func(ctx context.Context, msg proto.Message) error {
|
||||
fullTopic := r.buildTopicWithoutSubnet(topic, nse.ForkDigest)
|
||||
r.subscribe(topic, fullTopic, r.noopValidator, func(ctx context.Context, msg proto.Message) error {
|
||||
require.NoError(t, r.proposerSlashingSubscriber(ctx, msg))
|
||||
wg.Done()
|
||||
return nil
|
||||
@@ -266,7 +269,8 @@ func TestSubscribe_HandlesPanic(t *testing.T) {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
|
||||
r.subscribe(topic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
|
||||
fullTopic := r.buildTopicWithoutSubnet(topic, nse.ForkDigest)
|
||||
r.subscribe(topic, fullTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
|
||||
defer wg.Done()
|
||||
panic("bad")
|
||||
}, nse)
|
||||
@@ -305,7 +309,7 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
|
||||
|
||||
// committee index 1
|
||||
c1 := uint64(1)
|
||||
fullTopic := params.fullTopic(c1, r.cfg.p2p.Encoding().ProtocolSuffix())
|
||||
fullTopic := r.buildTopicWithSubnet(params.topicFormat, params.nse.ForkDigest, c1)
|
||||
_, topVal := r.wrapAndReportValidation(fullTopic, r.noopValidator)
|
||||
require.NoError(t, r.cfg.p2p.PubSub().RegisterTopicValidator(fullTopic, topVal))
|
||||
sub1, err := r.cfg.p2p.SubscribeToTopic(fullTopic)
|
||||
@@ -314,7 +318,7 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
|
||||
|
||||
// committee index 2
|
||||
c2 := uint64(2)
|
||||
fullTopic = params.fullTopic(c2, r.cfg.p2p.Encoding().ProtocolSuffix())
|
||||
fullTopic = r.buildTopicWithSubnet(params.topicFormat, params.nse.ForkDigest, c2)
|
||||
_, topVal = r.wrapAndReportValidation(fullTopic, r.noopValidator)
|
||||
err = r.cfg.p2p.PubSub().RegisterTopicValidator(fullTopic, topVal)
|
||||
require.NoError(t, err)
|
||||
@@ -484,11 +488,11 @@ func TestFilterSubnetPeers(t *testing.T) {
|
||||
defer cache.SubnetIDs.EmptyAllCaches()
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
defaultTopic := "/eth2/%x/beacon_attestation_%d" + r.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
subnet10 := r.addDigestAndIndexToTopic(defaultTopic, digest, 10)
|
||||
defaultTopic := "/eth2/%x/beacon_attestation_%d"
|
||||
subnet10 := r.buildTopicWithSubnet(defaultTopic, digest, 10)
|
||||
cache.SubnetIDs.AddAggregatorSubnetID(currSlot, 10)
|
||||
|
||||
subnet20 := r.addDigestAndIndexToTopic(defaultTopic, digest, 20)
|
||||
subnet20 := r.buildTopicWithSubnet(defaultTopic, digest, 20)
|
||||
cache.SubnetIDs.AddAttesterSubnetID(currSlot, 20)
|
||||
|
||||
p1 := createPeer(t, subnet10)
|
||||
|
||||
@@ -82,7 +82,7 @@ func FuzzValidateBeaconBlockPubSub_Phase0(f *testing.F) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(f, err)
|
||||
topic = r.addDigestToTopic(topic, digest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, digest)
|
||||
|
||||
f.Add("junk", []byte("junk"), buf.Bytes(), []byte(topic))
|
||||
f.Fuzz(func(t *testing.T, pid string, from, data, topic []byte) {
|
||||
@@ -166,7 +166,7 @@ func FuzzValidateBeaconBlockPubSub_Altair(f *testing.F) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(f, err)
|
||||
topic = r.addDigestToTopic(topic, digest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, digest)
|
||||
|
||||
f.Add("junk", []byte("junk"), buf.Bytes(), []byte(topic))
|
||||
f.Fuzz(func(t *testing.T, pid string, from, data, topic []byte) {
|
||||
@@ -250,7 +250,7 @@ func FuzzValidateBeaconBlockPubSub_Bellatrix(f *testing.F) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(f, err)
|
||||
topic = r.addDigestToTopic(topic, digest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, digest)
|
||||
|
||||
f.Add("junk", []byte("junk"), buf.Bytes(), []byte(topic))
|
||||
f.Fuzz(func(t *testing.T, pid string, from, data, topic []byte) {
|
||||
|
||||
@@ -488,7 +488,7 @@ func TestValidateAggregateAndProof_CanValidate(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)]
|
||||
d, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, d)
|
||||
topic = r.buildTopicWithoutSubnet(topic, d)
|
||||
msg := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -591,7 +591,7 @@ func TestVerifyIndexInCommittee_SeenAggregatorEpoch(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)]
|
||||
d, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, d)
|
||||
topic = r.buildTopicWithoutSubnet(topic, d)
|
||||
msg := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
|
||||
@@ -101,7 +101,7 @@ func TestValidateAttesterSlashing_ValidSlashing(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)]
|
||||
d, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, d)
|
||||
topic = r.buildTopicWithoutSubnet(topic, d)
|
||||
msg := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -146,7 +146,7 @@ func TestValidateAttesterSlashing_ValidOldSlashing(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)]
|
||||
d, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, d)
|
||||
topic = r.buildTopicWithoutSubnet(topic, d)
|
||||
msg := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -191,7 +191,7 @@ func TestValidateAttesterSlashing_InvalidSlashing_WithdrawableEpoch(t *testing.T
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)]
|
||||
d, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, d)
|
||||
topic = r.buildTopicWithoutSubnet(topic, d)
|
||||
msg := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -240,7 +240,7 @@ func TestValidateAttesterSlashing_CanFilter(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.AttesterSlashing{})]
|
||||
d, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, d)
|
||||
topic = r.buildTopicWithoutSubnet(topic, d)
|
||||
buf := new(bytes.Buffer)
|
||||
_, err = p.Encoding().EncodeGossip(buf, ðpb.AttesterSlashing{
|
||||
Attestation_1: util.HydrateIndexedAttestation(ðpb.IndexedAttestation{
|
||||
|
||||
@@ -95,7 +95,7 @@ func TestValidateBeaconBlockPubSub_InvalidSignature(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, digest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, digest)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -165,7 +165,7 @@ func TestValidateBeaconBlockPubSub_InvalidSignature_MarksBlockAsBad(t *testing.T
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, digest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, digest)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -212,7 +212,7 @@ func TestValidateBeaconBlockPubSub_BlockAlreadyPresentInDB(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, digest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, digest)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -275,7 +275,7 @@ func TestValidateBeaconBlockPubSub_CanRecoverStateSummary(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, digest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, digest)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -341,7 +341,7 @@ func TestValidateBeaconBlockPubSub_IsInCache(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, digest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, digest)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -407,7 +407,7 @@ func TestValidateBeaconBlockPubSub_ValidProposerSignature(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, digest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, digest)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -476,7 +476,7 @@ func TestValidateBeaconBlockPubSub_WithLookahead(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, digest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, digest)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -544,7 +544,7 @@ func TestValidateBeaconBlockPubSub_AdvanceEpochsForState(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, digest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, digest)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -653,7 +653,7 @@ func TestValidateBeaconBlockPubSub_IgnoreAndQueueBlocksFromNearFuture(t *testing
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, digest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, digest)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -704,7 +704,7 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromFuture(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, digest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, digest)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -755,7 +755,7 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromThePast(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, digest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, digest)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -834,7 +834,7 @@ func TestValidateBeaconBlockPubSub_SeenProposerSlot(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msgClone)]
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, digest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, digest)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -975,7 +975,7 @@ func TestValidateBeaconBlockPubSub_ParentNotFinalizedDescendant(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, digest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, digest)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -1041,7 +1041,7 @@ func TestValidateBeaconBlockPubSub_InvalidParentBlock(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, digest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, digest)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -1135,7 +1135,7 @@ func TestValidateBeaconBlockPubSub_InsertValidPendingBlock(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, digest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, digest)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -1220,7 +1220,7 @@ func TestValidateBeaconBlockPubSub_RejectBlocksFromBadParent(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, digest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, digest)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -1323,7 +1323,7 @@ func TestValidateBeaconBlockPubSub_ValidExecutionPayload(t *testing.T) {
|
||||
genesisValidatorsRoot := r.cfg.clock.GenesisValidatorsRoot()
|
||||
BellatrixDigest, err := signing.ComputeForkDigest(params.BeaconConfig().BellatrixForkVersion, genesisValidatorsRoot[:])
|
||||
require.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, BellatrixDigest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, BellatrixDigest)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -1395,7 +1395,7 @@ func TestValidateBeaconBlockPubSub_InvalidPayloadTimestamp(t *testing.T) {
|
||||
genesisValidatorsRoot := r.cfg.clock.GenesisValidatorsRoot()
|
||||
BellatrixDigest, err := signing.ComputeForkDigest(params.BeaconConfig().BellatrixForkVersion, genesisValidatorsRoot[:])
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, BellatrixDigest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, BellatrixDigest)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -1558,7 +1558,7 @@ func Test_validateBeaconBlockProcessingWhenParentIsOptimistic(t *testing.T) {
|
||||
genesisValidatorsRoot := r.cfg.clock.GenesisValidatorsRoot()
|
||||
BellatrixDigest, err := signing.ComputeForkDigest(params.BeaconConfig().BellatrixForkVersion, genesisValidatorsRoot[:])
|
||||
require.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, BellatrixDigest)
|
||||
topic = r.buildTopicWithoutSubnet(topic, BellatrixDigest)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
|
||||
@@ -70,7 +70,7 @@ func TestValidateBlob_InvalidMessageType(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := s.currentForkDigest()
|
||||
require.NoError(t, err)
|
||||
topic = s.addDigestToTopic(topic, digest)
|
||||
topic = s.buildTopicWithoutSubnet(topic, digest)
|
||||
result, err := s.validateBlob(ctx, "", &pubsub.Message{
|
||||
Message: &pb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -129,7 +129,7 @@ func TestValidateBlob_AlreadySeenInCache(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(b)]
|
||||
digest, err := s.currentForkDigest()
|
||||
require.NoError(t, err)
|
||||
topic = s.addDigestAndIndexToTopic(topic, digest, 0)
|
||||
topic = s.buildTopicWithSubnet(topic, digest, 0)
|
||||
|
||||
s.setSeenBlobIndex(sc.Slot(), sc.SignedBlockHeader.Header.ProposerIndex, 0)
|
||||
result, err := s.validateBlob(ctx, "", &pubsub.Message{
|
||||
@@ -159,7 +159,7 @@ func TestValidateBlob_InvalidTopicIndex(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := s.currentForkDigest()
|
||||
require.NoError(t, err)
|
||||
topic = s.addDigestAndIndexToTopic(topic, digest, 1)
|
||||
topic = s.buildTopicWithSubnet(topic, digest, 1)
|
||||
result, err := s.validateBlob(ctx, "", &pubsub.Message{
|
||||
Message: &pb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -274,7 +274,7 @@ func TestValidateBlob_ErrorPathsWithMock(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := s.currentForkDigest()
|
||||
require.NoError(t, err)
|
||||
topic = s.addDigestAndIndexToTopic(topic, digest, 0)
|
||||
topic = s.buildTopicWithSubnet(topic, digest, 0)
|
||||
result, err := s.validateBlob(ctx, "", &pubsub.Message{
|
||||
Message: &pb.Message{
|
||||
Data: buf.Bytes(),
|
||||
|
||||
@@ -85,7 +85,7 @@ func TestValidateDataColumn(t *testing.T) {
|
||||
digest, err := service.currentForkDigest()
|
||||
require.NoError(t, err)
|
||||
|
||||
topic = service.addDigestToTopic(topic, digest)
|
||||
topic = service.buildTopicWithoutSubnet(topic, digest)
|
||||
|
||||
message := &pubsub.Message{Message: &pb.Message{Data: buf.Bytes(), Topic: &topic}}
|
||||
|
||||
|
||||
@@ -133,7 +133,7 @@ func TestValidateLightClientOptimisticUpdate(t *testing.T) {
|
||||
topic := p2p.LightClientOptimisticUpdateTopicFormat
|
||||
digest, err := s.currentForkDigest()
|
||||
require.NoError(t, err)
|
||||
topic = s.addDigestToTopic(topic, digest)
|
||||
topic = s.buildTopicWithoutSubnet(topic, digest)
|
||||
|
||||
r, err := s.validateLightClientOptimisticUpdate(ctx, "", &pubsub.Message{
|
||||
Message: &pb.Message{
|
||||
@@ -259,7 +259,7 @@ func TestValidateLightClientFinalityUpdate(t *testing.T) {
|
||||
topic := p2p.LightClientFinalityUpdateTopicFormat
|
||||
digest, err := s.currentForkDigest()
|
||||
require.NoError(t, err)
|
||||
topic = s.addDigestToTopic(topic, digest)
|
||||
topic = s.buildTopicWithoutSubnet(topic, digest)
|
||||
|
||||
r, err := s.validateLightClientFinalityUpdate(ctx, "", &pubsub.Message{
|
||||
Message: &pb.Message{
|
||||
|
||||
@@ -132,7 +132,7 @@ func TestValidateProposerSlashing_ValidSlashing(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)]
|
||||
d, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, d)
|
||||
topic = r.buildTopicWithoutSubnet(topic, d)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
@@ -175,7 +175,7 @@ func TestValidateProposerSlashing_ValidOldSlashing(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(slashing)]
|
||||
d, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, d)
|
||||
topic = r.buildTopicWithoutSubnet(topic, d)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
|
||||
@@ -104,7 +104,7 @@ func TestValidateVoluntaryExit_ValidExit(t *testing.T) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(exit)]
|
||||
d, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
topic = r.addDigestToTopic(topic, d)
|
||||
topic = r.buildTopicWithoutSubnet(topic, d)
|
||||
m := &pubsub.Message{
|
||||
Message: &pubsubpb.Message{
|
||||
Data: buf.Bytes(),
|
||||
|
||||
@@ -0,0 +1,2 @@
|
||||
### Changed
|
||||
- Use static subscription for blob sidecar topics instead of dynamic subnet management.
|
||||
Reference in New Issue
Block a user