Compare commits

...

2 Commits

Author SHA1 Message Date
Manu NALEPA
9acc2c6601 Refactor dynamic subnet subscriptions. 2024-12-01 08:15:56 +01:00
Manu NALEPA
ecc8aa5829 Refactor static subnet subscriptions.
Before this commit, we had 2 functions:
- `subscribeStaticWithSubnets`, and
- `subscribeStaticWithSyncSubnets`.

These two functions were very similar.
This commit merge these two functions into one.
2024-11-29 19:49:57 +01:00
3 changed files with 246 additions and 306 deletions

View File

@@ -69,6 +69,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- Modified `ListAttestationsV2`, `GetAttesterSlashingsV2` and `GetAggregateAttestationV2` endpoints to use slot to determine fork version.
- Improvements to HTTP response handling. [pr](https://github.com/prysmaticlabs/prysm/pull/14673)
- Updated `Blobs` endpoint to return additional metadata fields.
- Refactor static and dynamic subnets subscription.
### Deprecated

View File

@@ -55,52 +55,32 @@ func (s *Service) noopValidator(_ context.Context, _ peer.ID, msg *pubsub.Messag
// Register PubSub subscribers
func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
s.subscribe(
p2p.BlockSubnetTopicFormat,
s.validateBeaconBlockPubSub,
s.beaconBlockSubscriber,
digest,
)
s.subscribe(
p2p.AggregateAndProofSubnetTopicFormat,
s.validateAggregateAndProof,
s.beaconAggregateProofSubscriber,
digest,
)
s.subscribe(
p2p.ExitSubnetTopicFormat,
s.validateVoluntaryExit,
s.voluntaryExitSubscriber,
digest,
)
s.subscribe(
p2p.ProposerSlashingSubnetTopicFormat,
s.validateProposerSlashing,
s.proposerSlashingSubscriber,
digest,
)
s.subscribe(
p2p.AttesterSlashingSubnetTopicFormat,
s.validateAttesterSlashing,
s.attesterSlashingSubscriber,
digest,
)
s.subscribe(p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, digest)
s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, digest)
s.subscribe(p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber, digest)
s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber, digest)
s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber, digest)
if flags.Get().SubscribeToAllSubnets {
s.subscribeStaticWithSubnets(
p2p.AttestationSubnetTopicFormat,
s.validateCommitteeIndexBeaconAttestation, /* validator */
s.committeeIndexBeaconAttestationSubscriber, /* message handler */
s.validateCommitteeIndexBeaconAttestation,
s.committeeIndexBeaconAttestationSubscriber,
digest,
"Attestation",
params.BeaconConfig().AttestationSubnetCount,
)
} else {
s.subscribeDynamicWithSubnets(
p2p.AttestationSubnetTopicFormat,
s.validateCommitteeIndexBeaconAttestation, /* validator */
s.committeeIndexBeaconAttestationSubscriber, /* message handler */
s.validateCommitteeIndexBeaconAttestation,
s.committeeIndexBeaconAttestationSubscriber,
digest,
params.BeaconConfig().MaxCommitteesPerSlot,
s.subscribeToAttestationsSubnetsDynamic,
)
}
// Altair Fork Version
if epoch >= params.BeaconConfig().AltairForkEpoch {
s.subscribe(
@@ -109,19 +89,24 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
s.syncContributionAndProofSubscriber,
digest,
)
if flags.Get().SubscribeToAllSubnets {
s.subscribeStaticWithSyncSubnets(
s.subscribeStaticWithSubnets(
p2p.SyncCommitteeSubnetTopicFormat,
s.validateSyncCommitteeMessage, /* validator */
s.syncCommitteeMessageSubscriber, /* message handler */
s.validateSyncCommitteeMessage,
s.syncCommitteeMessageSubscriber,
digest,
"Sync committee",
params.BeaconConfig().SyncCommitteeSubnetCount,
)
} else {
s.subscribeDynamicWithSyncSubnets(
s.subscribeDynamicWithSubnets(
p2p.SyncCommitteeSubnetTopicFormat,
s.validateSyncCommitteeMessage, /* validator */
s.syncCommitteeMessageSubscriber, /* message handler */
s.validateSyncCommitteeMessage,
s.syncCommitteeMessageSubscriber,
digest,
params.BeaconConfig().SyncCommitteeSubnetCount,
s.subscribeToSyncSubnetsDynamic,
)
}
}
@@ -143,6 +128,7 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
s.validateBlob, /* validator */
s.blobSubscriber, /* message handler */
digest,
"Blob sidecar",
params.BeaconConfig().BlobsidecarSubnetCount,
)
}
@@ -326,130 +312,283 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p
// subscribe to a static subnet with the given topic and index. A given validator and subscription handler is
// used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding.
func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte, subnetCount uint64) {
genRoot := s.cfg.clock.GenesisValidatorsRoot()
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
func (s *Service) subscribeStaticWithSubnets(
topic string,
validator wrappedVal,
handle subHandler,
digest [4]byte,
humanDescription string,
subnetCount uint64,
) {
// Retrieve the genesis validators root.
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
// Retrieve the epoch of the fork corresponding to the digest.
_, epoch, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:])
if err != nil {
// Impossible condition as it would mean digest does not exist.
panic(err)
}
base := p2p.GossipTopicMappings(topic, e)
// Retrieve the base protobuf message.
base := p2p.GossipTopicMappings(topic, epoch)
if base == nil {
// Impossible condition as it would mean topic does not exist.
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic))
}
for i := uint64(0); i < subnetCount; i++ {
s.subscribeWithBase(s.addDigestAndIndexToTopic(topic, digest, i), validator, handle)
// Define a ticker ticking every slot.
genesisTime := s.cfg.clock.GenesisTime()
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot)
minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet
// Subscribe to all subnets.
for i := range subnetCount {
fullTopic := s.addDigestAndIndexToTopic(topic, digest, i)
s.subscribeWithBase(fullTopic, validator, handle)
}
genesis := s.cfg.clock.GenesisTime()
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)
go func() {
for {
select {
case <-s.ctx.Done():
ticker.Done()
return
case <-ticker.C():
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
continue
}
valid, err := isDigestValid(digest, genesis, genRoot)
valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot)
if err != nil {
log.Error(err)
continue
}
if !valid {
log.Warnf("Attestation subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
message := fmt.Sprintf("%s subnets with this digest are no longer valid, unsubscribing from all of them.", humanDescription)
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warning(message)
// Unsubscribes from all our current subnets.
for i := uint64(0); i < subnetCount; i++ {
for i := range subnetCount {
fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.p2p.Encoding().ProtocolSuffix()
s.unSubscribeFromTopic(fullTopic)
}
ticker.Done()
return
}
// Check every slot that there are enough peers
for i := uint64(0); i < subnetCount; i++ {
if !s.enoughPeersAreConnected(s.addDigestAndIndexToTopic(topic, digest, i)) {
_, err := s.cfg.p2p.FindPeersWithSubnet(
s.ctx,
s.addDigestAndIndexToTopic(topic, digest, i),
i,
flags.Get().MinimumPeersPerSubnet,
)
if err != nil {
// Check that all subnets have enough peers.
for i := range subnetCount {
fullTopic := s.addDigestAndIndexToTopic(topic, digest, i)
if !s.enoughPeersAreConnected(fullTopic) {
if _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, fullTopic, i, minimumPeersPerSubnet); err != nil {
log.WithError(err).Debug("Could not search for peers")
return
}
}
}
case <-s.ctx.Done():
ticker.Done()
return
}
}
}()
}
// subscribe to a dynamically changing list of subnets. This method expects a fmt compatible
// string for the topic name and the list of subnets for subscribed topics that should be
// maintained.
type specificSubscribeFunc func(
topic string,
digest [4]byte,
genesisValidatorsRoot [fieldparams.RootLength]byte,
genesisTime time.Time,
subscriptions map[uint64]*pubsub.Subscription,
currentSlot primitives.Slot,
validate wrappedVal,
handle subHandler,
) bool
// subscribeDynamicWithSubnets subscribes to a dynamically changing list of subnets.
func (s *Service) subscribeDynamicWithSubnets(
topicFormat string,
validate wrappedVal,
handle subHandler,
digest [4]byte,
subnetCount uint64,
specificSubscribe specificSubscribeFunc,
) {
genRoot := s.cfg.clock.GenesisValidatorsRoot()
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
// Initialize the subscriptions map.
subscriptions := make(map[uint64]*pubsub.Subscription, subnetCount)
// Retrieve the genesis validators root.
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
// Retrieve the epoch of the fork corresponding to the digest.
_, epoch, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:])
if err != nil {
// Impossible condition as it would mean digest does not exist.
panic(err)
}
base := p2p.GossipTopicMappings(topicFormat, e)
// Retrieve the base protobuf message.
base := p2p.GossipTopicMappings(topicFormat, epoch)
if base == nil {
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat))
}
subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().MaxCommitteesPerSlot)
genesis := s.cfg.clock.GenesisTime()
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)
// Retrieve the genesis time.
genesisTime := s.cfg.clock.GenesisTime()
// Define a ticker ticking every slot.
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot)
// Retrieve the current slot.
currentSlot := s.cfg.clock.CurrentSlot()
go func() {
// Subscribe to the sync subnets.
specificSubscribe(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)
for {
select {
case <-s.ctx.Done():
ticker.Done()
return
case currentSlot := <-ticker.C():
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
continue
}
valid, err := isDigestValid(digest, genesis, genRoot)
if err != nil {
log.Error(err)
continue
}
if !valid {
log.Warnf("Attestation subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
// Unsubscribes from all our current subnets.
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
isDigestValid := specificSubscribe(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)
// Stop the ticker if the digest is not valid. Likely to happen after a hard fork.
if !isDigestValid {
ticker.Done()
return
}
wantedSubs := s.retrievePersistentSubs(currentSlot)
s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest)
for _, idx := range wantedSubs {
s.subscribeAggregatorSubnet(subscriptions, idx, digest, validate, handle)
}
// find desired subs for attesters
attesterSubs := s.attesterSubnetIndices(currentSlot)
for _, idx := range attesterSubs {
s.lookupAttesterSubnets(digest, idx)
}
case <-s.ctx.Done():
ticker.Done()
return
}
}
}()
}
func (s *Service) subscribeToAttestationsSubnetsDynamic(
topicFormat string,
digest [4]byte,
genesisValidatorsRoot [fieldparams.RootLength]byte,
genesisTime time.Time,
subscriptions map[uint64]*pubsub.Subscription,
currentSlot primitives.Slot,
validate wrappedVal,
handle subHandler,
) bool {
// Do not subscribe if not synced.
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
return true
}
// Do not subscribe is the digest is not valid.
valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot)
if err != nil {
log.Error(err)
return true
}
// Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork.
if !valid {
const message = "Attestation subnets with this digest are no longer valid, unsubscribing from all of them."
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn(message)
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
return false
}
wantedSubnetsIndex := s.retrievePersistentSubs(currentSlot)
s.reValidateSubscriptions(subscriptions, wantedSubnetsIndex, topicFormat, digest)
for _, index := range wantedSubnetsIndex {
s.subscribeAggregatorSubnet(subscriptions, index, digest, validate, handle)
}
// Find desired subnets for attesters.
attesterSubnets := s.attesterSubnetIndices(currentSlot)
for _, index := range attesterSubnets {
s.lookupAttesterSubnets(digest, index)
}
return true
}
func (s *Service) subscribeToSyncSubnetsDynamic(
topicFormat string,
digest [4]byte,
genesisValidatorsRoot [fieldparams.RootLength]byte,
genesisTime time.Time,
subscriptions map[uint64]*pubsub.Subscription,
currentSlot primitives.Slot,
validate wrappedVal,
handle subHandler,
) bool {
// Get sync subnets topic.
topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.SyncCommitteeMessage{})]
// Do not subscribe if not synced.
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
return true
}
// Do not subscribe is the digest is not valid.
valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot)
if err != nil {
log.Error(err)
return true
}
// Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork.
if !valid {
const message = "Sync subnets with this digest are no longer valid, unsubscribing from all of them."
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn(message)
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
return false
}
// Get the current epoch.
currentEpoch := slots.ToEpoch(currentSlot)
// Retrieve the subnets we want to subscribe to.
wantedSubnetsIndex := s.retrieveActiveSyncSubnets(currentEpoch)
// Remove subscriptions that are no longer wanted.
s.reValidateSubscriptions(subscriptions, wantedSubnetsIndex, topicFormat, digest)
// Subscribe to wanted subnets.
for _, subnetIndex := range wantedSubnetsIndex {
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)
// Check if subscription exists.
if _, exists := subscriptions[subnetIndex]; exists {
continue
}
// We need to subscribe to the subnet.
subscription := s.subscribeWithBase(subnetTopic, validate, handle)
subscriptions[subnetIndex] = subscription
}
// Find new peers for wanted subnets if needed.
for _, subnetIndex := range wantedSubnetsIndex {
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)
// Check if we have enough peers in the subnet. Skip if we do.
if s.enoughPeersAreConnected(subnetTopic) {
continue
}
// Not enough peers in the subnet, we need to search for more.
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, subnetIndex, flags.Get().MinimumPeersPerSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
}
}
return true
}
// reValidateSubscriptions unsubscribe from topics we are currently subscribed to but that are
// not in the list of wanted subnets.
// TODO: Rename this functions as it does not only revalidate subscriptions.
@@ -501,206 +640,6 @@ func (s *Service) subscribeAggregatorSubnet(
}
}
// subscribe to a static subnet with the given topic and index. A given validator and subscription handler is
// used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding.
func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte) {
genRoot := s.cfg.clock.GenesisValidatorsRoot()
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
if err != nil {
panic(err)
}
base := p2p.GossipTopicMappings(topic, e)
if base == nil {
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic))
}
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
s.subscribeWithBase(s.addDigestAndIndexToTopic(topic, digest, i), validator, handle)
}
genesis := s.cfg.clock.GenesisTime()
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)
go func() {
for {
select {
case <-s.ctx.Done():
ticker.Done()
return
case <-ticker.C():
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
continue
}
valid, err := isDigestValid(digest, genesis, genRoot)
if err != nil {
log.Error(err)
continue
}
if !valid {
log.Warnf("Sync subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
// Unsubscribes from all our current subnets.
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.p2p.Encoding().ProtocolSuffix()
s.unSubscribeFromTopic(fullTopic)
}
ticker.Done()
return
}
// Check every slot that there are enough peers
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
if !s.enoughPeersAreConnected(s.addDigestAndIndexToTopic(topic, digest, i)) {
_, err := s.cfg.p2p.FindPeersWithSubnet(
s.ctx,
s.addDigestAndIndexToTopic(topic, digest, i),
i,
flags.Get().MinimumPeersPerSubnet,
)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
return
}
}
}
}
}
}()
}
// subscribeToSyncSubnets subscribes to needed sync subnets, unsubscribe from unneeded ones and search for more peers if needed.
// Returns `true` if the digest is valid (wrt. the current epoch), `false` otherwise.
func (s *Service) subscribeToSyncSubnets(
topicFormat string,
digest [4]byte,
genesisValidatorsRoot [fieldparams.RootLength]byte,
genesisTime time.Time,
subscriptions map[uint64]*pubsub.Subscription,
currentSlot primitives.Slot,
validate wrappedVal,
handle subHandler,
) bool {
// Get sync subnets topic.
topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.SyncCommitteeMessage{})]
// Do not subscribe if not synced.
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
return true
}
// Do not subscribe is the digest is not valid.
valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot)
if err != nil {
log.Error(err)
return true
}
// Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork.
if !valid {
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn("Sync subnets with this digest are no longer valid, unsubscribing from all of them.")
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
return false
}
// Get the current epoch.
currentEpoch := slots.ToEpoch(currentSlot)
// Retrieve the subnets we want to subscribe to.
wantedSubnetsIndex := s.retrieveActiveSyncSubnets(currentEpoch)
// Remove subscriptions that are no longer wanted.
s.reValidateSubscriptions(subscriptions, wantedSubnetsIndex, topicFormat, digest)
// Subscribe to wanted subnets.
for _, subnetIndex := range wantedSubnetsIndex {
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)
// Check if subscription exists.
if _, exists := subscriptions[subnetIndex]; exists {
continue
}
// We need to subscribe to the subnet.
subscription := s.subscribeWithBase(subnetTopic, validate, handle)
subscriptions[subnetIndex] = subscription
}
// Find new peers for wanted subnets if needed.
for _, subnetIndex := range wantedSubnetsIndex {
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)
// Check if we have enough peers in the subnet. Skip if we do.
if s.enoughPeersAreConnected(subnetTopic) {
continue
}
// Not enough peers in the subnet, we need to search for more.
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, subnetIndex, flags.Get().MinimumPeersPerSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
}
}
return true
}
// subscribeDynamicWithSyncSubnets subscribes to a dynamically changing list of subnets.
func (s *Service) subscribeDynamicWithSyncSubnets(
topicFormat string,
validate wrappedVal,
handle subHandler,
digest [4]byte,
) {
// Retrieve the number of committee subnets we need to subscribe to.
syncCommiteeSubnetsCount := params.BeaconConfig().SyncCommitteeSubnetCount
// Initialize the subscriptions map.
subscriptions := make(map[uint64]*pubsub.Subscription, syncCommiteeSubnetsCount)
// Retrieve the genesis validators root.
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
// Retrieve the epoch of the fork corresponding to the digest.
_, epoch, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:])
if err != nil {
panic(err)
}
// Retrieve the base protobuf message.
base := p2p.GossipTopicMappings(topicFormat, epoch)
if base == nil {
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat))
}
// Retrieve the genesis time.
genesisTime := s.cfg.clock.GenesisTime()
// Define a ticker ticking every slot.
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot)
// Retrieve the current slot.
currentSlot := s.cfg.clock.CurrentSlot()
go func() {
// Subscribe to the sync subnets.
s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)
for {
select {
case currentSlot := <-ticker.C():
isDigestValid := s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)
// Stop the ticker if the digest is not valid. Likely to happen after a hard fork.
if !isDigestValid {
ticker.Done()
return
}
case <-s.ctx.Done():
ticker.Done()
return
}
}
}()
}
// lookup peers for attester specific subnets.
func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.Attestation{})]

View File

@@ -335,7 +335,7 @@ func TestStaticSubnets(t *testing.T) {
r.subscribeStaticWithSubnets(defaultTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
// no-op
return nil
}, d, params.BeaconConfig().AttestationSubnetCount)
}, d, "Attestation", params.BeaconConfig().AttestationSubnetCount)
topics := r.cfg.p2p.PubSub().GetTopics()
if uint64(len(topics)) != params.BeaconConfig().AttestationSubnetCount {
t.Errorf("Wanted the number of subnet topics registered to be %d but got %d", params.BeaconConfig().AttestationSubnetCount, len(topics))
@@ -496,6 +496,7 @@ func TestFilterSubnetPeers(t *testing.T) {
chainStarted: abool.New(),
subHandler: newSubTopicHandler(),
}
// Empty cache at the end of the test.
defer cache.SubnetIDs.EmptyAllCaches()
digest, err := r.currentForkDigest()
@@ -511,8 +512,7 @@ func TestFilterSubnetPeers(t *testing.T) {
p2 := createPeer(t, subnet10, subnet20)
p3 := createPeer(t)
// Connect to all
// peers.
// Connect to all peers.
p.Connect(p1)
p.Connect(p2)
p.Connect(p3)
@@ -565,7 +565,7 @@ func TestSubscribeWithSyncSubnets_StaticOK(t *testing.T) {
defer cache.SyncSubnetIDs.EmptyAllCaches()
digest, err := r.currentForkDigest()
assert.NoError(t, err)
r.subscribeStaticWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest)
r.subscribeStaticWithSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, "Sync committee", params.BeaconConfig().SyncCommitteeSubnetCount)
assert.Equal(t, int(params.BeaconConfig().SyncCommitteeSubnetCount), len(r.cfg.p2p.PubSub().GetTopics()))
cancel()
}
@@ -600,7 +600,7 @@ func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) {
cache.SyncSubnetIDs.AddSyncCommitteeSubnets([]byte("pubkey"), currEpoch, []uint64{0, 1}, 10*time.Second)
digest, err := r.currentForkDigest()
assert.NoError(t, err)
r.subscribeDynamicWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest)
r.subscribeDynamicWithSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, params.BeaconConfig().SyncCommitteeSubnetCount, r.subscribeToSyncSubnetsDynamic)
time.Sleep(2 * time.Second)
assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics()))
topicMap := map[string]bool{}
@@ -645,7 +645,7 @@ func TestSubscribeWithSyncSubnets_StaticSwitchFork(t *testing.T) {
genRoot := r.cfg.clock.GenesisValidatorsRoot()
digest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, genRoot[:])
assert.NoError(t, err)
r.subscribeStaticWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest)
r.subscribeStaticWithSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, "Sync committee", params.BeaconConfig().SyncCommitteeSubnetCount)
assert.Equal(t, int(params.BeaconConfig().SyncCommitteeSubnetCount), len(r.cfg.p2p.PubSub().GetTopics()))
// Expect that all old topics will be unsubscribed.
@@ -689,7 +689,7 @@ func TestSubscribeWithSyncSubnets_DynamicSwitchFork(t *testing.T) {
digest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, genRoot[:])
assert.NoError(t, err)
r.subscribeDynamicWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest)
r.subscribeDynamicWithSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, params.BeaconConfig().SyncCommitteeSubnetCount, r.subscribeToSyncSubnetsDynamic)
time.Sleep(2 * time.Second)
assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics()))
topicMap := map[string]bool{}