mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 07:58:22 -05:00
Refactor subnets subscriptions. (#14711)
* Refactor subnets subscriptions. * Remove totally static/dynamic distinction. * Unsubscribing from topic: Use INFO instead of log. ==> So we have something symmetrical with subscriptions. * Address Nishant's comment.
This commit is contained in:
@@ -16,6 +16,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
|
||||
### Changed
|
||||
|
||||
- Process light client finality updates only for new finalized epochs instead of doing it for every block.
|
||||
- Refactor subnets subscriptions.
|
||||
|
||||
### Deprecated
|
||||
|
||||
|
||||
@@ -53,6 +53,30 @@ func (s *Service) noopValidator(_ context.Context, _ peer.ID, msg *pubsub.Messag
|
||||
return pubsub.ValidationAccept, nil
|
||||
}
|
||||
|
||||
func sliceFromCount(count uint64) []uint64 {
|
||||
result := make([]uint64, 0, count)
|
||||
|
||||
for item := range count {
|
||||
result = append(result, item)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (s *Service) activeSyncSubnetIndices(currentSlot primitives.Slot) []uint64 {
|
||||
if flags.Get().SubscribeToAllSubnets {
|
||||
return sliceFromCount(params.BeaconConfig().SyncCommitteeSubnetCount)
|
||||
}
|
||||
|
||||
// Get the current epoch.
|
||||
currentEpoch := slots.ToEpoch(currentSlot)
|
||||
|
||||
// Retrieve the subnets we want to subscribe to.
|
||||
subs := cache.SyncSubnetIDs.GetAllSubnets(currentEpoch)
|
||||
|
||||
return slice.SetUint64(subs)
|
||||
}
|
||||
|
||||
// Register PubSub subscribers
|
||||
func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
|
||||
s.subscribe(
|
||||
@@ -85,22 +109,14 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
|
||||
s.attesterSlashingSubscriber,
|
||||
digest,
|
||||
)
|
||||
if flags.Get().SubscribeToAllSubnets {
|
||||
s.subscribeStaticWithSubnets(
|
||||
p2p.AttestationSubnetTopicFormat,
|
||||
s.validateCommitteeIndexBeaconAttestation, /* validator */
|
||||
s.committeeIndexBeaconAttestationSubscriber, /* message handler */
|
||||
digest,
|
||||
params.BeaconConfig().AttestationSubnetCount,
|
||||
)
|
||||
} else {
|
||||
s.subscribeDynamicWithSubnets(
|
||||
p2p.AttestationSubnetTopicFormat,
|
||||
s.validateCommitteeIndexBeaconAttestation, /* validator */
|
||||
s.committeeIndexBeaconAttestationSubscriber, /* message handler */
|
||||
digest,
|
||||
)
|
||||
}
|
||||
s.subscribeWithParameters(
|
||||
p2p.AttestationSubnetTopicFormat,
|
||||
s.validateCommitteeIndexBeaconAttestation,
|
||||
s.committeeIndexBeaconAttestationSubscriber,
|
||||
digest,
|
||||
s.persistentAndAggregatorSubnetIndices,
|
||||
s.attesterSubnetIndices,
|
||||
)
|
||||
// Altair Fork Version
|
||||
if epoch >= params.BeaconConfig().AltairForkEpoch {
|
||||
s.subscribe(
|
||||
@@ -109,21 +125,14 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
|
||||
s.syncContributionAndProofSubscriber,
|
||||
digest,
|
||||
)
|
||||
if flags.Get().SubscribeToAllSubnets {
|
||||
s.subscribeStaticWithSyncSubnets(
|
||||
p2p.SyncCommitteeSubnetTopicFormat,
|
||||
s.validateSyncCommitteeMessage, /* validator */
|
||||
s.syncCommitteeMessageSubscriber, /* message handler */
|
||||
digest,
|
||||
)
|
||||
} else {
|
||||
s.subscribeDynamicWithSyncSubnets(
|
||||
p2p.SyncCommitteeSubnetTopicFormat,
|
||||
s.validateSyncCommitteeMessage, /* validator */
|
||||
s.syncCommitteeMessageSubscriber, /* message handler */
|
||||
digest,
|
||||
)
|
||||
}
|
||||
s.subscribeWithParameters(
|
||||
p2p.SyncCommitteeSubnetTopicFormat,
|
||||
s.validateSyncCommitteeMessage,
|
||||
s.syncCommitteeMessageSubscriber,
|
||||
digest,
|
||||
s.activeSyncSubnetIndices,
|
||||
func(currentSlot primitives.Slot) []uint64 { return []uint64{} },
|
||||
)
|
||||
}
|
||||
|
||||
// New Gossip Topic in Capella
|
||||
@@ -138,12 +147,13 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
|
||||
|
||||
// New Gossip Topic in Deneb
|
||||
if epoch >= params.BeaconConfig().DenebForkEpoch {
|
||||
s.subscribeStaticWithSubnets(
|
||||
s.subscribeWithParameters(
|
||||
p2p.BlobSubnetTopicFormat,
|
||||
s.validateBlob, /* validator */
|
||||
s.blobSubscriber, /* message handler */
|
||||
s.validateBlob,
|
||||
s.blobSubscriber,
|
||||
digest,
|
||||
params.BeaconConfig().BlobsidecarSubnetCount,
|
||||
func(primitives.Slot) []uint64 { return sliceFromCount(params.BeaconConfig().BlobsidecarSubnetCount) },
|
||||
func(currentSlot primitives.Slot) []uint64 { return []uint64{} },
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -324,132 +334,6 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p
|
||||
}
|
||||
}
|
||||
|
||||
// subscribe to a static subnet with the given topic and index. A given validator and subscription handler is
|
||||
// used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding.
|
||||
func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte, subnetCount uint64) {
|
||||
genRoot := s.cfg.clock.GenesisValidatorsRoot()
|
||||
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
|
||||
if err != nil {
|
||||
// Impossible condition as it would mean digest does not exist.
|
||||
panic(err)
|
||||
}
|
||||
base := p2p.GossipTopicMappings(topic, e)
|
||||
if base == nil {
|
||||
// Impossible condition as it would mean topic does not exist.
|
||||
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic))
|
||||
}
|
||||
for i := uint64(0); i < subnetCount; i++ {
|
||||
s.subscribeWithBase(s.addDigestAndIndexToTopic(topic, digest, i), validator, handle)
|
||||
}
|
||||
genesis := s.cfg.clock.GenesisTime()
|
||||
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
ticker.Done()
|
||||
return
|
||||
case <-ticker.C():
|
||||
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
|
||||
continue
|
||||
}
|
||||
valid, err := isDigestValid(digest, genesis, genRoot)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
if !valid {
|
||||
log.Warnf("Attestation subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
|
||||
// Unsubscribes from all our current subnets.
|
||||
for i := uint64(0); i < subnetCount; i++ {
|
||||
fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
s.unSubscribeFromTopic(fullTopic)
|
||||
}
|
||||
ticker.Done()
|
||||
return
|
||||
}
|
||||
// Check every slot that there are enough peers
|
||||
for i := uint64(0); i < subnetCount; i++ {
|
||||
if !s.enoughPeersAreConnected(s.addDigestAndIndexToTopic(topic, digest, i)) {
|
||||
_, err := s.cfg.p2p.FindPeersWithSubnet(
|
||||
s.ctx,
|
||||
s.addDigestAndIndexToTopic(topic, digest, i),
|
||||
i,
|
||||
flags.Get().MinimumPeersPerSubnet,
|
||||
)
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Could not search for peers")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// subscribe to a dynamically changing list of subnets. This method expects a fmt compatible
|
||||
// string for the topic name and the list of subnets for subscribed topics that should be
|
||||
// maintained.
|
||||
func (s *Service) subscribeDynamicWithSubnets(
|
||||
topicFormat string,
|
||||
validate wrappedVal,
|
||||
handle subHandler,
|
||||
digest [4]byte,
|
||||
) {
|
||||
genRoot := s.cfg.clock.GenesisValidatorsRoot()
|
||||
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
|
||||
if err != nil {
|
||||
// Impossible condition as it would mean digest does not exist.
|
||||
panic(err)
|
||||
}
|
||||
base := p2p.GossipTopicMappings(topicFormat, e)
|
||||
if base == nil {
|
||||
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat))
|
||||
}
|
||||
subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().MaxCommitteesPerSlot)
|
||||
genesis := s.cfg.clock.GenesisTime()
|
||||
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
ticker.Done()
|
||||
return
|
||||
case currentSlot := <-ticker.C():
|
||||
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
|
||||
continue
|
||||
}
|
||||
valid, err := isDigestValid(digest, genesis, genRoot)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
if !valid {
|
||||
log.Warnf("Attestation subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
|
||||
// Unsubscribes from all our current subnets.
|
||||
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
|
||||
ticker.Done()
|
||||
return
|
||||
}
|
||||
wantedSubs := s.retrievePersistentSubs(currentSlot)
|
||||
s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest)
|
||||
|
||||
for _, idx := range wantedSubs {
|
||||
s.subscribeAggregatorSubnet(subscriptions, idx, digest, validate, handle)
|
||||
}
|
||||
// find desired subs for attesters
|
||||
attesterSubs := s.attesterSubnetIndices(currentSlot)
|
||||
for _, idx := range attesterSubs {
|
||||
s.lookupAttesterSubnets(digest, idx)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// reValidateSubscriptions unsubscribe from topics we are currently subscribed to but that are
|
||||
// not in the list of wanted subnets.
|
||||
// TODO: Rename this functions as it does not only revalidate subscriptions.
|
||||
@@ -477,96 +361,44 @@ func (s *Service) reValidateSubscriptions(
|
||||
}
|
||||
}
|
||||
|
||||
// subscribe missing subnets for our aggregators.
|
||||
func (s *Service) subscribeAggregatorSubnet(
|
||||
subscriptions map[uint64]*pubsub.Subscription,
|
||||
idx uint64,
|
||||
// searchForPeers searches for peers in the given subnets.
|
||||
func (s *Service) searchForPeers(
|
||||
ctx context.Context,
|
||||
topicFormat string,
|
||||
digest [4]byte,
|
||||
validate wrappedVal,
|
||||
handle subHandler,
|
||||
currentSlot primitives.Slot,
|
||||
getSubnetsToSubscribe func(currentSlot primitives.Slot) []uint64,
|
||||
getSubnetsToFindPeersOnly func(currentSlot primitives.Slot) []uint64,
|
||||
) {
|
||||
// do not subscribe if we have no peers in the same
|
||||
// subnet
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.Attestation{})]
|
||||
subnetTopic := fmt.Sprintf(topic, digest, idx)
|
||||
// check if subscription exists and if not subscribe the relevant subnet.
|
||||
if _, exists := subscriptions[idx]; !exists {
|
||||
subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle)
|
||||
}
|
||||
if !s.enoughPeersAreConnected(subnetTopic) {
|
||||
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet)
|
||||
// Retrieve the subnets we want to subscribe to.
|
||||
subnetsToSubscribeIndex := getSubnetsToSubscribe(currentSlot)
|
||||
|
||||
// Retrieve the subnets we want to find peers for.
|
||||
subnetsToFindPeersOnlyIndex := getSubnetsToFindPeersOnly(currentSlot)
|
||||
|
||||
// Combine the subnets to subscribe and the subnets to find peers for.
|
||||
subnetsToFindPeersIndex := slice.SetUint64(append(subnetsToSubscribeIndex, subnetsToFindPeersOnlyIndex...))
|
||||
|
||||
// Find new peers for wanted subnets if needed.
|
||||
for _, subnetIndex := range subnetsToFindPeersIndex {
|
||||
topic := fmt.Sprintf(topicFormat, digest, subnetIndex)
|
||||
|
||||
// Check if we have enough peers in the subnet. Skip if we do.
|
||||
if s.enoughPeersAreConnected(topic) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Not enough peers in the subnet, we need to search for more.
|
||||
_, err := s.cfg.p2p.FindPeersWithSubnet(ctx, topic, subnetIndex, flags.Get().MinimumPeersPerSubnet)
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Could not search for peers")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// subscribe to a static subnet with the given topic and index. A given validator and subscription handler is
|
||||
// used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding.
|
||||
func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte) {
|
||||
genRoot := s.cfg.clock.GenesisValidatorsRoot()
|
||||
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
base := p2p.GossipTopicMappings(topic, e)
|
||||
if base == nil {
|
||||
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic))
|
||||
}
|
||||
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
|
||||
s.subscribeWithBase(s.addDigestAndIndexToTopic(topic, digest, i), validator, handle)
|
||||
}
|
||||
genesis := s.cfg.clock.GenesisTime()
|
||||
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
ticker.Done()
|
||||
return
|
||||
case <-ticker.C():
|
||||
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
|
||||
continue
|
||||
}
|
||||
valid, err := isDigestValid(digest, genesis, genRoot)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
if !valid {
|
||||
log.Warnf("Sync subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
|
||||
// Unsubscribes from all our current subnets.
|
||||
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
|
||||
fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
s.unSubscribeFromTopic(fullTopic)
|
||||
}
|
||||
ticker.Done()
|
||||
return
|
||||
}
|
||||
// Check every slot that there are enough peers
|
||||
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
|
||||
if !s.enoughPeersAreConnected(s.addDigestAndIndexToTopic(topic, digest, i)) {
|
||||
_, err := s.cfg.p2p.FindPeersWithSubnet(
|
||||
s.ctx,
|
||||
s.addDigestAndIndexToTopic(topic, digest, i),
|
||||
i,
|
||||
flags.Get().MinimumPeersPerSubnet,
|
||||
)
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Could not search for peers")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// subscribeToSyncSubnets subscribes to needed sync subnets, unsubscribe from unneeded ones and search for more peers if needed.
|
||||
// subscribeToSubnets subscribes to needed subnets, unsubscribe from unneeded ones and search for more peers if needed.
|
||||
// Returns `true` if the digest is valid (wrt. the current epoch), `false` otherwise.
|
||||
func (s *Service) subscribeToSyncSubnets(
|
||||
func (s *Service) subscribeToSubnets(
|
||||
topicFormat string,
|
||||
digest [4]byte,
|
||||
genesisValidatorsRoot [fieldparams.RootLength]byte,
|
||||
@@ -575,16 +407,15 @@ func (s *Service) subscribeToSyncSubnets(
|
||||
currentSlot primitives.Slot,
|
||||
validate wrappedVal,
|
||||
handle subHandler,
|
||||
getSubnetsToSubscribe func(currentSlot primitives.Slot) []uint64,
|
||||
getSubnetsToFindPeersOnly func(currentSlot primitives.Slot) []uint64,
|
||||
) bool {
|
||||
// Get sync subnets topic.
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.SyncCommitteeMessage{})]
|
||||
|
||||
// Do not subscribe if not synced.
|
||||
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
|
||||
return true
|
||||
}
|
||||
|
||||
// Do not subscribe is the digest is not valid.
|
||||
// Check the validity of the digest.
|
||||
valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
@@ -593,23 +424,25 @@ func (s *Service) subscribeToSyncSubnets(
|
||||
|
||||
// Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork.
|
||||
if !valid {
|
||||
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn("Sync subnets with this digest are no longer valid, unsubscribing from all of them.")
|
||||
description := topicFormat
|
||||
if pos := strings.LastIndex(topicFormat, "/"); pos != -1 {
|
||||
description = topicFormat[pos+1:]
|
||||
}
|
||||
|
||||
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warningf("%s subnets with this digest are no longer valid, unsubscribing from all of them.", description)
|
||||
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
|
||||
return false
|
||||
}
|
||||
|
||||
// Get the current epoch.
|
||||
currentEpoch := slots.ToEpoch(currentSlot)
|
||||
|
||||
// Retrieve the subnets we want to subscribe to.
|
||||
wantedSubnetsIndex := s.retrieveActiveSyncSubnets(currentEpoch)
|
||||
subnetsToSubscribeIndex := getSubnetsToSubscribe(currentSlot)
|
||||
|
||||
// Remove subscriptions that are no longer wanted.
|
||||
s.reValidateSubscriptions(subscriptions, wantedSubnetsIndex, topicFormat, digest)
|
||||
s.reValidateSubscriptions(subscriptions, subnetsToSubscribeIndex, topicFormat, digest)
|
||||
|
||||
// Subscribe to wanted subnets.
|
||||
for _, subnetIndex := range wantedSubnetsIndex {
|
||||
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)
|
||||
for _, subnetIndex := range subnetsToSubscribeIndex {
|
||||
subnetTopic := fmt.Sprintf(topicFormat, digest, subnetIndex)
|
||||
|
||||
// Check if subscription exists.
|
||||
if _, exists := subscriptions[subnetIndex]; exists {
|
||||
@@ -620,38 +453,20 @@ func (s *Service) subscribeToSyncSubnets(
|
||||
subscription := s.subscribeWithBase(subnetTopic, validate, handle)
|
||||
subscriptions[subnetIndex] = subscription
|
||||
}
|
||||
|
||||
// Find new peers for wanted subnets if needed.
|
||||
for _, subnetIndex := range wantedSubnetsIndex {
|
||||
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)
|
||||
|
||||
// Check if we have enough peers in the subnet. Skip if we do.
|
||||
if s.enoughPeersAreConnected(subnetTopic) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Not enough peers in the subnet, we need to search for more.
|
||||
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, subnetIndex, flags.Get().MinimumPeersPerSubnet)
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Could not search for peers")
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// subscribeDynamicWithSyncSubnets subscribes to a dynamically changing list of subnets.
|
||||
func (s *Service) subscribeDynamicWithSyncSubnets(
|
||||
// subscribeWithParameters subscribes to a list of subnets.
|
||||
func (s *Service) subscribeWithParameters(
|
||||
topicFormat string,
|
||||
validate wrappedVal,
|
||||
handle subHandler,
|
||||
digest [4]byte,
|
||||
getSubnetsToSubscribe func(currentSlot primitives.Slot) []uint64,
|
||||
getSubnetsToFindPeersOnly func(currentSlot primitives.Slot) []uint64,
|
||||
) {
|
||||
// Retrieve the number of committee subnets we need to subscribe to.
|
||||
syncCommiteeSubnetsCount := params.BeaconConfig().SyncCommitteeSubnetCount
|
||||
|
||||
// Initialize the subscriptions map.
|
||||
subscriptions := make(map[uint64]*pubsub.Subscription, syncCommiteeSubnetsCount)
|
||||
subscriptions := make(map[uint64]*pubsub.Subscription)
|
||||
|
||||
// Retrieve the genesis validators root.
|
||||
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
|
||||
@@ -678,14 +493,20 @@ func (s *Service) subscribeDynamicWithSyncSubnets(
|
||||
// Retrieve the current slot.
|
||||
currentSlot := s.cfg.clock.CurrentSlot()
|
||||
|
||||
// Subscribe to subnets.
|
||||
s.subscribeToSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle, getSubnetsToSubscribe, getSubnetsToFindPeersOnly)
|
||||
|
||||
// Derive a new context and cancel function.
|
||||
ctx, cancel := context.WithCancel(s.ctx)
|
||||
|
||||
go func() {
|
||||
// Subscribe to the sync subnets.
|
||||
s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)
|
||||
// Search for peers.
|
||||
s.searchForPeers(ctx, topicFormat, digest, currentSlot, getSubnetsToSubscribe, getSubnetsToFindPeersOnly)
|
||||
|
||||
for {
|
||||
select {
|
||||
case currentSlot := <-ticker.C():
|
||||
isDigestValid := s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)
|
||||
isDigestValid := s.subscribeToSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle, getSubnetsToSubscribe, getSubnetsToFindPeersOnly)
|
||||
|
||||
// Stop the ticker if the digest is not valid. Likely to happen after a hard fork.
|
||||
if !isDigestValid {
|
||||
@@ -693,7 +514,11 @@ func (s *Service) subscribeDynamicWithSyncSubnets(
|
||||
return
|
||||
}
|
||||
|
||||
// Search for peers.
|
||||
s.searchForPeers(ctx, topicFormat, digest, currentSlot, getSubnetsToSubscribe, getSubnetsToFindPeersOnly)
|
||||
|
||||
case <-s.ctx.Done():
|
||||
cancel()
|
||||
ticker.Done()
|
||||
return
|
||||
}
|
||||
@@ -701,21 +526,8 @@ func (s *Service) subscribeDynamicWithSyncSubnets(
|
||||
}()
|
||||
}
|
||||
|
||||
// lookup peers for attester specific subnets.
|
||||
func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.Attestation{})]
|
||||
subnetTopic := fmt.Sprintf(topic, digest, idx)
|
||||
if !s.enoughPeersAreConnected(subnetTopic) {
|
||||
// perform a search for peers with the desired committee index.
|
||||
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet)
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Could not search for peers")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) unSubscribeFromTopic(topic string) {
|
||||
log.WithField("topic", topic).Debug("Unsubscribing from topic")
|
||||
log.WithField("topic", topic).Info("Unsubscribed from")
|
||||
if err := s.cfg.p2p.PubSub().UnregisterTopicValidator(topic); err != nil {
|
||||
log.WithError(err).Error("Could not unregister topic validator")
|
||||
}
|
||||
@@ -740,19 +552,16 @@ func (s *Service) enoughPeersAreConnected(subnetTopic string) bool {
|
||||
return peersWithSubnetCount >= threshold
|
||||
}
|
||||
|
||||
func (s *Service) retrievePersistentSubs(currSlot primitives.Slot) []uint64 {
|
||||
// Persistent subscriptions from validators
|
||||
persistentSubs := s.persistentSubnetIndices()
|
||||
// Update desired topic indices for aggregator
|
||||
wantedSubs := s.aggregatorSubnetIndices(currSlot)
|
||||
func (s *Service) persistentAndAggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 {
|
||||
if flags.Get().SubscribeToAllSubnets {
|
||||
return sliceFromCount(params.BeaconConfig().AttestationSubnetCount)
|
||||
}
|
||||
|
||||
// Combine subscriptions to get all requested subscriptions
|
||||
return slice.SetUint64(append(persistentSubs, wantedSubs...))
|
||||
}
|
||||
persistentSubnetIndices := s.persistentSubnetIndices()
|
||||
aggregatorSubnetIndices := s.aggregatorSubnetIndices(currentSlot)
|
||||
|
||||
func (*Service) retrieveActiveSyncSubnets(currEpoch primitives.Epoch) []uint64 {
|
||||
subs := cache.SyncSubnetIDs.GetAllSubnets(currEpoch)
|
||||
return slice.SetUint64(subs)
|
||||
// Combine subscriptions to get all requested subscriptions.
|
||||
return slice.SetUint64(append(persistentSubnetIndices, aggregatorSubnetIndices...))
|
||||
}
|
||||
|
||||
// filters out required peers for the node to function, not
|
||||
@@ -768,7 +577,7 @@ func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID {
|
||||
return pids
|
||||
}
|
||||
currSlot := s.cfg.clock.CurrentSlot()
|
||||
wantedSubs := s.retrievePersistentSubs(currSlot)
|
||||
wantedSubs := s.persistentAndAggregatorSubnetIndices(currSlot)
|
||||
wantedSubs = slice.SetUint64(append(wantedSubs, s.attesterSubnetIndices(currSlot)...))
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.Attestation{})]
|
||||
|
||||
|
||||
@@ -312,37 +312,6 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
|
||||
require.LogsDoNotContain(t, hook, "Could not unregister topic validator")
|
||||
}
|
||||
|
||||
func TestStaticSubnets(t *testing.T) {
|
||||
p := p2ptest.NewTestP2P(t)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
chain := &mockChain.ChainService{
|
||||
Genesis: time.Now(),
|
||||
ValidatorsRoot: [32]byte{'A'},
|
||||
}
|
||||
r := Service{
|
||||
ctx: ctx,
|
||||
cfg: &config{
|
||||
chain: chain,
|
||||
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
|
||||
p2p: p,
|
||||
},
|
||||
chainStarted: abool.New(),
|
||||
subHandler: newSubTopicHandler(),
|
||||
}
|
||||
defaultTopic := "/eth2/%x/beacon_attestation_%d"
|
||||
d, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
r.subscribeStaticWithSubnets(defaultTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
|
||||
// no-op
|
||||
return nil
|
||||
}, d, params.BeaconConfig().AttestationSubnetCount)
|
||||
topics := r.cfg.p2p.PubSub().GetTopics()
|
||||
if uint64(len(topics)) != params.BeaconConfig().AttestationSubnetCount {
|
||||
t.Errorf("Wanted the number of subnet topics registered to be %d but got %d", params.BeaconConfig().AttestationSubnetCount, len(topics))
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
|
||||
func Test_wrapAndReportValidation(t *testing.T) {
|
||||
mChain := &mockChain.ChainService{
|
||||
Genesis: time.Now(),
|
||||
@@ -539,37 +508,6 @@ func TestFilterSubnetPeers(t *testing.T) {
|
||||
assert.Equal(t, 1, len(recPeers), "expected at least 1 suitable peer to prune")
|
||||
}
|
||||
|
||||
func TestSubscribeWithSyncSubnets_StaticOK(t *testing.T) {
|
||||
params.SetupTestConfigCleanup(t)
|
||||
cfg := params.MainnetTestConfig().Copy()
|
||||
cfg.SecondsPerSlot = 1
|
||||
params.OverrideBeaconConfig(cfg)
|
||||
|
||||
p := p2ptest.NewTestP2P(t)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
chain := &mockChain.ChainService{
|
||||
Genesis: time.Now(),
|
||||
ValidatorsRoot: [32]byte{'A'},
|
||||
}
|
||||
r := Service{
|
||||
ctx: ctx,
|
||||
cfg: &config{
|
||||
chain: chain,
|
||||
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
|
||||
p2p: p,
|
||||
},
|
||||
chainStarted: abool.New(),
|
||||
subHandler: newSubTopicHandler(),
|
||||
}
|
||||
// Empty cache at the end of the test.
|
||||
defer cache.SyncSubnetIDs.EmptyAllCaches()
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
r.subscribeStaticWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest)
|
||||
assert.Equal(t, int(params.BeaconConfig().SyncCommitteeSubnetCount), len(r.cfg.p2p.PubSub().GetTopics()))
|
||||
cancel()
|
||||
}
|
||||
|
||||
func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) {
|
||||
params.SetupTestConfigCleanup(t)
|
||||
cfg := params.MainnetConfig().Copy()
|
||||
@@ -600,7 +538,7 @@ func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) {
|
||||
cache.SyncSubnetIDs.AddSyncCommitteeSubnets([]byte("pubkey"), currEpoch, []uint64{0, 1}, 10*time.Second)
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
r.subscribeDynamicWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest)
|
||||
r.subscribeWithParameters(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, r.activeSyncSubnetIndices, func(currentSlot primitives.Slot) []uint64 { return []uint64{} })
|
||||
time.Sleep(2 * time.Second)
|
||||
assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics()))
|
||||
topicMap := map[string]bool{}
|
||||
@@ -615,46 +553,6 @@ func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) {
|
||||
cancel()
|
||||
}
|
||||
|
||||
func TestSubscribeWithSyncSubnets_StaticSwitchFork(t *testing.T) {
|
||||
p := p2ptest.NewTestP2P(t)
|
||||
params.SetupTestConfigCleanup(t)
|
||||
cfg := params.BeaconConfig()
|
||||
cfg.AltairForkEpoch = 1
|
||||
cfg.SecondsPerSlot = 1
|
||||
params.OverrideBeaconConfig(cfg)
|
||||
params.BeaconConfig().InitializeForkSchedule()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
currSlot := primitives.Slot(100)
|
||||
chain := &mockChain.ChainService{
|
||||
Genesis: time.Now().Add(-time.Duration(uint64(params.BeaconConfig().SlotsPerEpoch)*params.BeaconConfig().SecondsPerSlot) * time.Second),
|
||||
ValidatorsRoot: [32]byte{'A'},
|
||||
Slot: &currSlot,
|
||||
}
|
||||
r := Service{
|
||||
ctx: ctx,
|
||||
cfg: &config{
|
||||
chain: chain,
|
||||
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
|
||||
p2p: p,
|
||||
},
|
||||
chainStarted: abool.New(),
|
||||
subHandler: newSubTopicHandler(),
|
||||
}
|
||||
// Empty cache at the end of the test.
|
||||
defer cache.SyncSubnetIDs.EmptyAllCaches()
|
||||
genRoot := r.cfg.clock.GenesisValidatorsRoot()
|
||||
digest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, genRoot[:])
|
||||
assert.NoError(t, err)
|
||||
r.subscribeStaticWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest)
|
||||
assert.Equal(t, int(params.BeaconConfig().SyncCommitteeSubnetCount), len(r.cfg.p2p.PubSub().GetTopics()))
|
||||
|
||||
// Expect that all old topics will be unsubscribed.
|
||||
time.Sleep(2 * time.Second)
|
||||
assert.Equal(t, 0, len(r.cfg.p2p.PubSub().GetTopics()))
|
||||
|
||||
cancel()
|
||||
}
|
||||
|
||||
func TestSubscribeWithSyncSubnets_DynamicSwitchFork(t *testing.T) {
|
||||
params.SetupTestConfigCleanup(t)
|
||||
p := p2ptest.NewTestP2P(t)
|
||||
@@ -689,7 +587,7 @@ func TestSubscribeWithSyncSubnets_DynamicSwitchFork(t *testing.T) {
|
||||
digest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, genRoot[:])
|
||||
assert.NoError(t, err)
|
||||
|
||||
r.subscribeDynamicWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest)
|
||||
r.subscribeWithParameters(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, r.activeSyncSubnetIndices, func(currentSlot primitives.Slot) []uint64 { return []uint64{} })
|
||||
time.Sleep(2 * time.Second)
|
||||
assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics()))
|
||||
topicMap := map[string]bool{}
|
||||
|
||||
Reference in New Issue
Block a user