Compare commits

...

2 Commits

Author SHA1 Message Date
Manu NALEPA
9acc2c6601 Refactor dynamic subnet subscriptions. 2024-12-01 08:15:56 +01:00
Manu NALEPA
ecc8aa5829 Refactor static subnet subscriptions.
Before this commit, we had 2 functions:
- `subscribeStaticWithSubnets`, and
- `subscribeStaticWithSyncSubnets`.

These two functions were very similar.
This commit merges these two functions into one.
2024-11-29 19:49:57 +01:00
3 changed files with 246 additions and 306 deletions

View File

@@ -69,6 +69,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- Modified `ListAttestationsV2`, `GetAttesterSlashingsV2` and `GetAggregateAttestationV2` endpoints to use slot to determine fork version. - Modified `ListAttestationsV2`, `GetAttesterSlashingsV2` and `GetAggregateAttestationV2` endpoints to use slot to determine fork version.
- Improvements to HTTP response handling. [pr](https://github.com/prysmaticlabs/prysm/pull/14673) - Improvements to HTTP response handling. [pr](https://github.com/prysmaticlabs/prysm/pull/14673)
- Updated `Blobs` endpoint to return additional metadata fields. - Updated `Blobs` endpoint to return additional metadata fields.
- Refactor static and dynamic subnets subscription.
### Deprecated ### Deprecated

View File

@@ -55,52 +55,32 @@ func (s *Service) noopValidator(_ context.Context, _ peer.ID, msg *pubsub.Messag
// Register PubSub subscribers // Register PubSub subscribers
func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
s.subscribe( s.subscribe(p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, digest)
p2p.BlockSubnetTopicFormat, s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, digest)
s.validateBeaconBlockPubSub, s.subscribe(p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber, digest)
s.beaconBlockSubscriber, s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber, digest)
digest, s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber, digest)
)
s.subscribe(
p2p.AggregateAndProofSubnetTopicFormat,
s.validateAggregateAndProof,
s.beaconAggregateProofSubscriber,
digest,
)
s.subscribe(
p2p.ExitSubnetTopicFormat,
s.validateVoluntaryExit,
s.voluntaryExitSubscriber,
digest,
)
s.subscribe(
p2p.ProposerSlashingSubnetTopicFormat,
s.validateProposerSlashing,
s.proposerSlashingSubscriber,
digest,
)
s.subscribe(
p2p.AttesterSlashingSubnetTopicFormat,
s.validateAttesterSlashing,
s.attesterSlashingSubscriber,
digest,
)
if flags.Get().SubscribeToAllSubnets { if flags.Get().SubscribeToAllSubnets {
s.subscribeStaticWithSubnets( s.subscribeStaticWithSubnets(
p2p.AttestationSubnetTopicFormat, p2p.AttestationSubnetTopicFormat,
s.validateCommitteeIndexBeaconAttestation, /* validator */ s.validateCommitteeIndexBeaconAttestation,
s.committeeIndexBeaconAttestationSubscriber, /* message handler */ s.committeeIndexBeaconAttestationSubscriber,
digest, digest,
"Attestation",
params.BeaconConfig().AttestationSubnetCount, params.BeaconConfig().AttestationSubnetCount,
) )
} else { } else {
s.subscribeDynamicWithSubnets( s.subscribeDynamicWithSubnets(
p2p.AttestationSubnetTopicFormat, p2p.AttestationSubnetTopicFormat,
s.validateCommitteeIndexBeaconAttestation, /* validator */ s.validateCommitteeIndexBeaconAttestation,
s.committeeIndexBeaconAttestationSubscriber, /* message handler */ s.committeeIndexBeaconAttestationSubscriber,
digest, digest,
params.BeaconConfig().MaxCommitteesPerSlot,
s.subscribeToAttestationsSubnetsDynamic,
) )
} }
// Altair Fork Version // Altair Fork Version
if epoch >= params.BeaconConfig().AltairForkEpoch { if epoch >= params.BeaconConfig().AltairForkEpoch {
s.subscribe( s.subscribe(
@@ -109,19 +89,24 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
s.syncContributionAndProofSubscriber, s.syncContributionAndProofSubscriber,
digest, digest,
) )
if flags.Get().SubscribeToAllSubnets { if flags.Get().SubscribeToAllSubnets {
s.subscribeStaticWithSyncSubnets( s.subscribeStaticWithSubnets(
p2p.SyncCommitteeSubnetTopicFormat, p2p.SyncCommitteeSubnetTopicFormat,
s.validateSyncCommitteeMessage, /* validator */ s.validateSyncCommitteeMessage,
s.syncCommitteeMessageSubscriber, /* message handler */ s.syncCommitteeMessageSubscriber,
digest, digest,
"Sync committee",
params.BeaconConfig().SyncCommitteeSubnetCount,
) )
} else { } else {
s.subscribeDynamicWithSyncSubnets( s.subscribeDynamicWithSubnets(
p2p.SyncCommitteeSubnetTopicFormat, p2p.SyncCommitteeSubnetTopicFormat,
s.validateSyncCommitteeMessage, /* validator */ s.validateSyncCommitteeMessage,
s.syncCommitteeMessageSubscriber, /* message handler */ s.syncCommitteeMessageSubscriber,
digest, digest,
params.BeaconConfig().SyncCommitteeSubnetCount,
s.subscribeToSyncSubnetsDynamic,
) )
} }
} }
@@ -143,6 +128,7 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
s.validateBlob, /* validator */ s.validateBlob, /* validator */
s.blobSubscriber, /* message handler */ s.blobSubscriber, /* message handler */
digest, digest,
"Blob sidecar",
params.BeaconConfig().BlobsidecarSubnetCount, params.BeaconConfig().BlobsidecarSubnetCount,
) )
} }
@@ -326,130 +312,283 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p
// subscribe to a static subnet with the given topic and index. A given validator and subscription handler is // subscribe to a static subnet with the given topic and index. A given validator and subscription handler is
// used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding. // used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding.
func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte, subnetCount uint64) { func (s *Service) subscribeStaticWithSubnets(
genRoot := s.cfg.clock.GenesisValidatorsRoot() topic string,
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:]) validator wrappedVal,
handle subHandler,
digest [4]byte,
humanDescription string,
subnetCount uint64,
) {
// Retrieve the genesis validators root.
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
// Retrieve the epoch of the fork corresponding to the digest.
_, epoch, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:])
if err != nil { if err != nil {
// Impossible condition as it would mean digest does not exist. // Impossible condition as it would mean digest does not exist.
panic(err) panic(err)
} }
base := p2p.GossipTopicMappings(topic, e)
// Retrieve the base protobuf message.
base := p2p.GossipTopicMappings(topic, epoch)
if base == nil { if base == nil {
// Impossible condition as it would mean topic does not exist. // Impossible condition as it would mean topic does not exist.
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic))
} }
for i := uint64(0); i < subnetCount; i++ {
s.subscribeWithBase(s.addDigestAndIndexToTopic(topic, digest, i), validator, handle) // Define a ticker ticking every slot.
genesisTime := s.cfg.clock.GenesisTime()
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot)
minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet
// Subscribe to all subnets.
for i := range subnetCount {
fullTopic := s.addDigestAndIndexToTopic(topic, digest, i)
s.subscribeWithBase(fullTopic, validator, handle)
} }
genesis := s.cfg.clock.GenesisTime()
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)
go func() { go func() {
for { for {
select { select {
case <-s.ctx.Done():
ticker.Done()
return
case <-ticker.C(): case <-ticker.C():
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
continue continue
} }
valid, err := isDigestValid(digest, genesis, genRoot)
valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot)
if err != nil { if err != nil {
log.Error(err) log.Error(err)
continue continue
} }
if !valid { if !valid {
log.Warnf("Attestation subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest) message := fmt.Sprintf("%s subnets with this digest are no longer valid, unsubscribing from all of them.", humanDescription)
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warning(message)
// Unsubscribes from all our current subnets. // Unsubscribes from all our current subnets.
for i := uint64(0); i < subnetCount; i++ { for i := range subnetCount {
fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.p2p.Encoding().ProtocolSuffix() fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.p2p.Encoding().ProtocolSuffix()
s.unSubscribeFromTopic(fullTopic) s.unSubscribeFromTopic(fullTopic)
} }
ticker.Done() ticker.Done()
return return
} }
// Check every slot that there are enough peers
for i := uint64(0); i < subnetCount; i++ { // Check that all subnets have enough peers.
if !s.enoughPeersAreConnected(s.addDigestAndIndexToTopic(topic, digest, i)) { for i := range subnetCount {
_, err := s.cfg.p2p.FindPeersWithSubnet( fullTopic := s.addDigestAndIndexToTopic(topic, digest, i)
s.ctx,
s.addDigestAndIndexToTopic(topic, digest, i), if !s.enoughPeersAreConnected(fullTopic) {
i, if _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, fullTopic, i, minimumPeersPerSubnet); err != nil {
flags.Get().MinimumPeersPerSubnet,
)
if err != nil {
log.WithError(err).Debug("Could not search for peers") log.WithError(err).Debug("Could not search for peers")
return return
} }
} }
} }
case <-s.ctx.Done():
ticker.Done()
return
} }
} }
}() }()
} }
// subscribe to a dynamically changing list of subnets. This method expects a fmt compatible type specificSubscribeFunc func(
// string for the topic name and the list of subnets for subscribed topics that should be topic string,
// maintained. digest [4]byte,
genesisValidatorsRoot [fieldparams.RootLength]byte,
genesisTime time.Time,
subscriptions map[uint64]*pubsub.Subscription,
currentSlot primitives.Slot,
validate wrappedVal,
handle subHandler,
) bool
// subscribeDynamicWithSubnets subscribes to a dynamically changing list of subnets.
func (s *Service) subscribeDynamicWithSubnets( func (s *Service) subscribeDynamicWithSubnets(
topicFormat string, topicFormat string,
validate wrappedVal, validate wrappedVal,
handle subHandler, handle subHandler,
digest [4]byte, digest [4]byte,
subnetCount uint64,
specificSubscribe specificSubscribeFunc,
) { ) {
genRoot := s.cfg.clock.GenesisValidatorsRoot() // Initialize the subscriptions map.
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:]) subscriptions := make(map[uint64]*pubsub.Subscription, subnetCount)
// Retrieve the genesis validators root.
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
// Retrieve the epoch of the fork corresponding to the digest.
_, epoch, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:])
if err != nil { if err != nil {
// Impossible condition as it would mean digest does not exist.
panic(err) panic(err)
} }
base := p2p.GossipTopicMappings(topicFormat, e)
// Retrieve the base protobuf message.
base := p2p.GossipTopicMappings(topicFormat, epoch)
if base == nil { if base == nil {
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat)) panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat))
} }
subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().MaxCommitteesPerSlot)
genesis := s.cfg.clock.GenesisTime() // Retrieve the genesis time.
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot) genesisTime := s.cfg.clock.GenesisTime()
// Define a ticker ticking every slot.
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot)
// Retrieve the current slot.
currentSlot := s.cfg.clock.CurrentSlot()
go func() { go func() {
// Subscribe to the sync subnets.
specificSubscribe(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)
for { for {
select { select {
case <-s.ctx.Done():
ticker.Done()
return
case currentSlot := <-ticker.C(): case currentSlot := <-ticker.C():
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() { isDigestValid := specificSubscribe(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)
continue
} // Stop the ticker if the digest is not valid. Likely to happen after a hard fork.
valid, err := isDigestValid(digest, genesis, genRoot) if !isDigestValid {
if err != nil {
log.Error(err)
continue
}
if !valid {
log.Warnf("Attestation subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
// Unsubscribes from all our current subnets.
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
ticker.Done() ticker.Done()
return return
} }
wantedSubs := s.retrievePersistentSubs(currentSlot)
s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest)
for _, idx := range wantedSubs { case <-s.ctx.Done():
s.subscribeAggregatorSubnet(subscriptions, idx, digest, validate, handle) ticker.Done()
} return
// find desired subs for attesters
attesterSubs := s.attesterSubnetIndices(currentSlot)
for _, idx := range attesterSubs {
s.lookupAttesterSubnets(digest, idx)
}
} }
} }
}() }()
} }
func (s *Service) subscribeToAttestationsSubnetsDynamic(
topicFormat string,
digest [4]byte,
genesisValidatorsRoot [fieldparams.RootLength]byte,
genesisTime time.Time,
subscriptions map[uint64]*pubsub.Subscription,
currentSlot primitives.Slot,
validate wrappedVal,
handle subHandler,
) bool {
// Do not subscribe if not synced.
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
return true
}
	// Do not subscribe if the digest is not valid.
valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot)
if err != nil {
log.Error(err)
return true
}
// Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork.
if !valid {
const message = "Attestation subnets with this digest are no longer valid, unsubscribing from all of them."
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn(message)
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
return false
}
wantedSubnetsIndex := s.retrievePersistentSubs(currentSlot)
s.reValidateSubscriptions(subscriptions, wantedSubnetsIndex, topicFormat, digest)
for _, index := range wantedSubnetsIndex {
s.subscribeAggregatorSubnet(subscriptions, index, digest, validate, handle)
}
// Find desired subnets for attesters.
attesterSubnets := s.attesterSubnetIndices(currentSlot)
for _, index := range attesterSubnets {
s.lookupAttesterSubnets(digest, index)
}
return true
}
func (s *Service) subscribeToSyncSubnetsDynamic(
topicFormat string,
digest [4]byte,
genesisValidatorsRoot [fieldparams.RootLength]byte,
genesisTime time.Time,
subscriptions map[uint64]*pubsub.Subscription,
currentSlot primitives.Slot,
validate wrappedVal,
handle subHandler,
) bool {
// Get sync subnets topic.
topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.SyncCommitteeMessage{})]
// Do not subscribe if not synced.
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
return true
}
	// Do not subscribe if the digest is not valid.
valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot)
if err != nil {
log.Error(err)
return true
}
// Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork.
if !valid {
const message = "Sync subnets with this digest are no longer valid, unsubscribing from all of them."
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn(message)
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
return false
}
// Get the current epoch.
currentEpoch := slots.ToEpoch(currentSlot)
// Retrieve the subnets we want to subscribe to.
wantedSubnetsIndex := s.retrieveActiveSyncSubnets(currentEpoch)
// Remove subscriptions that are no longer wanted.
s.reValidateSubscriptions(subscriptions, wantedSubnetsIndex, topicFormat, digest)
// Subscribe to wanted subnets.
for _, subnetIndex := range wantedSubnetsIndex {
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)
// Check if subscription exists.
if _, exists := subscriptions[subnetIndex]; exists {
continue
}
// We need to subscribe to the subnet.
subscription := s.subscribeWithBase(subnetTopic, validate, handle)
subscriptions[subnetIndex] = subscription
}
// Find new peers for wanted subnets if needed.
for _, subnetIndex := range wantedSubnetsIndex {
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)
// Check if we have enough peers in the subnet. Skip if we do.
if s.enoughPeersAreConnected(subnetTopic) {
continue
}
// Not enough peers in the subnet, we need to search for more.
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, subnetIndex, flags.Get().MinimumPeersPerSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
}
}
return true
}
// reValidateSubscriptions unsubscribe from topics we are currently subscribed to but that are // reValidateSubscriptions unsubscribe from topics we are currently subscribed to but that are
// not in the list of wanted subnets. // not in the list of wanted subnets.
// TODO: Rename this functions as it does not only revalidate subscriptions. // TODO: Rename this functions as it does not only revalidate subscriptions.
@@ -501,206 +640,6 @@ func (s *Service) subscribeAggregatorSubnet(
} }
} }
// subscribe to a static subnet with the given topic and index. A given validator and subscription handler is
// used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding.
func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte) {
genRoot := s.cfg.clock.GenesisValidatorsRoot()
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
if err != nil {
panic(err)
}
base := p2p.GossipTopicMappings(topic, e)
if base == nil {
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic))
}
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
s.subscribeWithBase(s.addDigestAndIndexToTopic(topic, digest, i), validator, handle)
}
genesis := s.cfg.clock.GenesisTime()
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)
go func() {
for {
select {
case <-s.ctx.Done():
ticker.Done()
return
case <-ticker.C():
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
continue
}
valid, err := isDigestValid(digest, genesis, genRoot)
if err != nil {
log.Error(err)
continue
}
if !valid {
log.Warnf("Sync subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
// Unsubscribes from all our current subnets.
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.p2p.Encoding().ProtocolSuffix()
s.unSubscribeFromTopic(fullTopic)
}
ticker.Done()
return
}
// Check every slot that there are enough peers
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
if !s.enoughPeersAreConnected(s.addDigestAndIndexToTopic(topic, digest, i)) {
_, err := s.cfg.p2p.FindPeersWithSubnet(
s.ctx,
s.addDigestAndIndexToTopic(topic, digest, i),
i,
flags.Get().MinimumPeersPerSubnet,
)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
return
}
}
}
}
}
}()
}
// subscribeToSyncSubnets subscribes to needed sync subnets, unsubscribe from unneeded ones and search for more peers if needed.
// Returns `true` if the digest is valid (wrt. the current epoch), `false` otherwise.
func (s *Service) subscribeToSyncSubnets(
topicFormat string,
digest [4]byte,
genesisValidatorsRoot [fieldparams.RootLength]byte,
genesisTime time.Time,
subscriptions map[uint64]*pubsub.Subscription,
currentSlot primitives.Slot,
validate wrappedVal,
handle subHandler,
) bool {
// Get sync subnets topic.
topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.SyncCommitteeMessage{})]
// Do not subscribe if not synced.
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
return true
}
// Do not subscribe is the digest is not valid.
valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot)
if err != nil {
log.Error(err)
return true
}
// Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork.
if !valid {
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn("Sync subnets with this digest are no longer valid, unsubscribing from all of them.")
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
return false
}
// Get the current epoch.
currentEpoch := slots.ToEpoch(currentSlot)
// Retrieve the subnets we want to subscribe to.
wantedSubnetsIndex := s.retrieveActiveSyncSubnets(currentEpoch)
// Remove subscriptions that are no longer wanted.
s.reValidateSubscriptions(subscriptions, wantedSubnetsIndex, topicFormat, digest)
// Subscribe to wanted subnets.
for _, subnetIndex := range wantedSubnetsIndex {
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)
// Check if subscription exists.
if _, exists := subscriptions[subnetIndex]; exists {
continue
}
// We need to subscribe to the subnet.
subscription := s.subscribeWithBase(subnetTopic, validate, handle)
subscriptions[subnetIndex] = subscription
}
// Find new peers for wanted subnets if needed.
for _, subnetIndex := range wantedSubnetsIndex {
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)
// Check if we have enough peers in the subnet. Skip if we do.
if s.enoughPeersAreConnected(subnetTopic) {
continue
}
// Not enough peers in the subnet, we need to search for more.
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, subnetIndex, flags.Get().MinimumPeersPerSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
}
}
return true
}
// subscribeDynamicWithSyncSubnets subscribes to a dynamically changing list of subnets.
func (s *Service) subscribeDynamicWithSyncSubnets(
topicFormat string,
validate wrappedVal,
handle subHandler,
digest [4]byte,
) {
// Retrieve the number of committee subnets we need to subscribe to.
syncCommiteeSubnetsCount := params.BeaconConfig().SyncCommitteeSubnetCount
// Initialize the subscriptions map.
subscriptions := make(map[uint64]*pubsub.Subscription, syncCommiteeSubnetsCount)
// Retrieve the genesis validators root.
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
// Retrieve the epoch of the fork corresponding to the digest.
_, epoch, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:])
if err != nil {
panic(err)
}
// Retrieve the base protobuf message.
base := p2p.GossipTopicMappings(topicFormat, epoch)
if base == nil {
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat))
}
// Retrieve the genesis time.
genesisTime := s.cfg.clock.GenesisTime()
// Define a ticker ticking every slot.
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot)
// Retrieve the current slot.
currentSlot := s.cfg.clock.CurrentSlot()
go func() {
// Subscribe to the sync subnets.
s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)
for {
select {
case currentSlot := <-ticker.C():
isDigestValid := s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)
// Stop the ticker if the digest is not valid. Likely to happen after a hard fork.
if !isDigestValid {
ticker.Done()
return
}
case <-s.ctx.Done():
ticker.Done()
return
}
}
}()
}
// lookup peers for attester specific subnets. // lookup peers for attester specific subnets.
func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) { func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.Attestation{})] topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.Attestation{})]

View File

@@ -335,7 +335,7 @@ func TestStaticSubnets(t *testing.T) {
r.subscribeStaticWithSubnets(defaultTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error { r.subscribeStaticWithSubnets(defaultTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
// no-op // no-op
return nil return nil
}, d, params.BeaconConfig().AttestationSubnetCount) }, d, "Attestation", params.BeaconConfig().AttestationSubnetCount)
topics := r.cfg.p2p.PubSub().GetTopics() topics := r.cfg.p2p.PubSub().GetTopics()
if uint64(len(topics)) != params.BeaconConfig().AttestationSubnetCount { if uint64(len(topics)) != params.BeaconConfig().AttestationSubnetCount {
t.Errorf("Wanted the number of subnet topics registered to be %d but got %d", params.BeaconConfig().AttestationSubnetCount, len(topics)) t.Errorf("Wanted the number of subnet topics registered to be %d but got %d", params.BeaconConfig().AttestationSubnetCount, len(topics))
@@ -496,6 +496,7 @@ func TestFilterSubnetPeers(t *testing.T) {
chainStarted: abool.New(), chainStarted: abool.New(),
subHandler: newSubTopicHandler(), subHandler: newSubTopicHandler(),
} }
// Empty cache at the end of the test. // Empty cache at the end of the test.
defer cache.SubnetIDs.EmptyAllCaches() defer cache.SubnetIDs.EmptyAllCaches()
digest, err := r.currentForkDigest() digest, err := r.currentForkDigest()
@@ -511,8 +512,7 @@ func TestFilterSubnetPeers(t *testing.T) {
p2 := createPeer(t, subnet10, subnet20) p2 := createPeer(t, subnet10, subnet20)
p3 := createPeer(t) p3 := createPeer(t)
// Connect to all // Connect to all peers.
// peers.
p.Connect(p1) p.Connect(p1)
p.Connect(p2) p.Connect(p2)
p.Connect(p3) p.Connect(p3)
@@ -565,7 +565,7 @@ func TestSubscribeWithSyncSubnets_StaticOK(t *testing.T) {
defer cache.SyncSubnetIDs.EmptyAllCaches() defer cache.SyncSubnetIDs.EmptyAllCaches()
digest, err := r.currentForkDigest() digest, err := r.currentForkDigest()
assert.NoError(t, err) assert.NoError(t, err)
r.subscribeStaticWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest) r.subscribeStaticWithSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, "Sync committee", params.BeaconConfig().SyncCommitteeSubnetCount)
assert.Equal(t, int(params.BeaconConfig().SyncCommitteeSubnetCount), len(r.cfg.p2p.PubSub().GetTopics())) assert.Equal(t, int(params.BeaconConfig().SyncCommitteeSubnetCount), len(r.cfg.p2p.PubSub().GetTopics()))
cancel() cancel()
} }
@@ -600,7 +600,7 @@ func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) {
cache.SyncSubnetIDs.AddSyncCommitteeSubnets([]byte("pubkey"), currEpoch, []uint64{0, 1}, 10*time.Second) cache.SyncSubnetIDs.AddSyncCommitteeSubnets([]byte("pubkey"), currEpoch, []uint64{0, 1}, 10*time.Second)
digest, err := r.currentForkDigest() digest, err := r.currentForkDigest()
assert.NoError(t, err) assert.NoError(t, err)
r.subscribeDynamicWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest) r.subscribeDynamicWithSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, params.BeaconConfig().SyncCommitteeSubnetCount, r.subscribeToSyncSubnetsDynamic)
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics())) assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics()))
topicMap := map[string]bool{} topicMap := map[string]bool{}
@@ -645,7 +645,7 @@ func TestSubscribeWithSyncSubnets_StaticSwitchFork(t *testing.T) {
genRoot := r.cfg.clock.GenesisValidatorsRoot() genRoot := r.cfg.clock.GenesisValidatorsRoot()
digest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, genRoot[:]) digest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, genRoot[:])
assert.NoError(t, err) assert.NoError(t, err)
r.subscribeStaticWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest) r.subscribeStaticWithSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, "Sync committee", params.BeaconConfig().SyncCommitteeSubnetCount)
assert.Equal(t, int(params.BeaconConfig().SyncCommitteeSubnetCount), len(r.cfg.p2p.PubSub().GetTopics())) assert.Equal(t, int(params.BeaconConfig().SyncCommitteeSubnetCount), len(r.cfg.p2p.PubSub().GetTopics()))
// Expect that all old topics will be unsubscribed. // Expect that all old topics will be unsubscribed.
@@ -689,7 +689,7 @@ func TestSubscribeWithSyncSubnets_DynamicSwitchFork(t *testing.T) {
digest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, genRoot[:]) digest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, genRoot[:])
assert.NoError(t, err) assert.NoError(t, err)
r.subscribeDynamicWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest) r.subscribeDynamicWithSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, params.BeaconConfig().SyncCommitteeSubnetCount, r.subscribeToSyncSubnetsDynamic)
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics())) assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics()))
topicMap := map[string]bool{} topicMap := map[string]bool{}