|
|
|
|
@@ -55,52 +55,32 @@ func (s *Service) noopValidator(_ context.Context, _ peer.ID, msg *pubsub.Messag
|
|
|
|
|
|
|
|
|
|
// Register PubSub subscribers
|
|
|
|
|
func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
|
|
|
|
|
s.subscribe(
|
|
|
|
|
p2p.BlockSubnetTopicFormat,
|
|
|
|
|
s.validateBeaconBlockPubSub,
|
|
|
|
|
s.beaconBlockSubscriber,
|
|
|
|
|
digest,
|
|
|
|
|
)
|
|
|
|
|
s.subscribe(
|
|
|
|
|
p2p.AggregateAndProofSubnetTopicFormat,
|
|
|
|
|
s.validateAggregateAndProof,
|
|
|
|
|
s.beaconAggregateProofSubscriber,
|
|
|
|
|
digest,
|
|
|
|
|
)
|
|
|
|
|
s.subscribe(
|
|
|
|
|
p2p.ExitSubnetTopicFormat,
|
|
|
|
|
s.validateVoluntaryExit,
|
|
|
|
|
s.voluntaryExitSubscriber,
|
|
|
|
|
digest,
|
|
|
|
|
)
|
|
|
|
|
s.subscribe(
|
|
|
|
|
p2p.ProposerSlashingSubnetTopicFormat,
|
|
|
|
|
s.validateProposerSlashing,
|
|
|
|
|
s.proposerSlashingSubscriber,
|
|
|
|
|
digest,
|
|
|
|
|
)
|
|
|
|
|
s.subscribe(
|
|
|
|
|
p2p.AttesterSlashingSubnetTopicFormat,
|
|
|
|
|
s.validateAttesterSlashing,
|
|
|
|
|
s.attesterSlashingSubscriber,
|
|
|
|
|
digest,
|
|
|
|
|
)
|
|
|
|
|
s.subscribe(p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, digest)
|
|
|
|
|
s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, digest)
|
|
|
|
|
s.subscribe(p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber, digest)
|
|
|
|
|
s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber, digest)
|
|
|
|
|
s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber, digest)
|
|
|
|
|
|
|
|
|
|
if flags.Get().SubscribeToAllSubnets {
|
|
|
|
|
s.subscribeStaticWithSubnets(
|
|
|
|
|
p2p.AttestationSubnetTopicFormat,
|
|
|
|
|
s.validateCommitteeIndexBeaconAttestation, /* validator */
|
|
|
|
|
s.committeeIndexBeaconAttestationSubscriber, /* message handler */
|
|
|
|
|
s.validateCommitteeIndexBeaconAttestation,
|
|
|
|
|
s.committeeIndexBeaconAttestationSubscriber,
|
|
|
|
|
digest,
|
|
|
|
|
"Attestation",
|
|
|
|
|
params.BeaconConfig().AttestationSubnetCount,
|
|
|
|
|
)
|
|
|
|
|
} else {
|
|
|
|
|
s.subscribeDynamicWithSubnets(
|
|
|
|
|
p2p.AttestationSubnetTopicFormat,
|
|
|
|
|
s.validateCommitteeIndexBeaconAttestation, /* validator */
|
|
|
|
|
s.committeeIndexBeaconAttestationSubscriber, /* message handler */
|
|
|
|
|
s.validateCommitteeIndexBeaconAttestation,
|
|
|
|
|
s.committeeIndexBeaconAttestationSubscriber,
|
|
|
|
|
digest,
|
|
|
|
|
params.BeaconConfig().MaxCommitteesPerSlot,
|
|
|
|
|
s.subscribeToAttestationsSubnetsDynamic,
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Altair Fork Version
|
|
|
|
|
if epoch >= params.BeaconConfig().AltairForkEpoch {
|
|
|
|
|
s.subscribe(
|
|
|
|
|
@@ -109,19 +89,24 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
|
|
|
|
|
s.syncContributionAndProofSubscriber,
|
|
|
|
|
digest,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
if flags.Get().SubscribeToAllSubnets {
|
|
|
|
|
s.subscribeStaticWithSyncSubnets(
|
|
|
|
|
s.subscribeStaticWithSubnets(
|
|
|
|
|
p2p.SyncCommitteeSubnetTopicFormat,
|
|
|
|
|
s.validateSyncCommitteeMessage, /* validator */
|
|
|
|
|
s.syncCommitteeMessageSubscriber, /* message handler */
|
|
|
|
|
s.validateSyncCommitteeMessage,
|
|
|
|
|
s.syncCommitteeMessageSubscriber,
|
|
|
|
|
digest,
|
|
|
|
|
"Sync committee",
|
|
|
|
|
params.BeaconConfig().SyncCommitteeSubnetCount,
|
|
|
|
|
)
|
|
|
|
|
} else {
|
|
|
|
|
s.subscribeDynamicWithSyncSubnets(
|
|
|
|
|
s.subscribeDynamicWithSubnets(
|
|
|
|
|
p2p.SyncCommitteeSubnetTopicFormat,
|
|
|
|
|
s.validateSyncCommitteeMessage, /* validator */
|
|
|
|
|
s.syncCommitteeMessageSubscriber, /* message handler */
|
|
|
|
|
s.validateSyncCommitteeMessage,
|
|
|
|
|
s.syncCommitteeMessageSubscriber,
|
|
|
|
|
digest,
|
|
|
|
|
params.BeaconConfig().SyncCommitteeSubnetCount,
|
|
|
|
|
s.subscribeToSyncSubnetsDynamic,
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -143,6 +128,7 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
|
|
|
|
|
s.validateBlob, /* validator */
|
|
|
|
|
s.blobSubscriber, /* message handler */
|
|
|
|
|
digest,
|
|
|
|
|
"Blob sidecar",
|
|
|
|
|
params.BeaconConfig().BlobsidecarSubnetCount,
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
@@ -326,130 +312,283 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p
|
|
|
|
|
|
|
|
|
|
// subscribe to a static subnet with the given topic and index. A given validator and subscription handler is
|
|
|
|
|
// used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding.
|
|
|
|
|
func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte, subnetCount uint64) {
|
|
|
|
|
genRoot := s.cfg.clock.GenesisValidatorsRoot()
|
|
|
|
|
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
|
|
|
|
|
func (s *Service) subscribeStaticWithSubnets(
|
|
|
|
|
topic string,
|
|
|
|
|
validator wrappedVal,
|
|
|
|
|
handle subHandler,
|
|
|
|
|
digest [4]byte,
|
|
|
|
|
humanDescription string,
|
|
|
|
|
subnetCount uint64,
|
|
|
|
|
) {
|
|
|
|
|
// Retrieve the genesis validators root.
|
|
|
|
|
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
|
|
|
|
|
|
|
|
|
|
// Retrieve the epoch of the fork corresponding to the digest.
|
|
|
|
|
_, epoch, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:])
|
|
|
|
|
if err != nil {
|
|
|
|
|
// Impossible condition as it would mean digest does not exist.
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
base := p2p.GossipTopicMappings(topic, e)
|
|
|
|
|
|
|
|
|
|
// Retrieve the base protobuf message.
|
|
|
|
|
base := p2p.GossipTopicMappings(topic, epoch)
|
|
|
|
|
if base == nil {
|
|
|
|
|
// Impossible condition as it would mean topic does not exist.
|
|
|
|
|
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic))
|
|
|
|
|
}
|
|
|
|
|
for i := uint64(0); i < subnetCount; i++ {
|
|
|
|
|
s.subscribeWithBase(s.addDigestAndIndexToTopic(topic, digest, i), validator, handle)
|
|
|
|
|
|
|
|
|
|
// Define a ticker ticking every slot.
|
|
|
|
|
genesisTime := s.cfg.clock.GenesisTime()
|
|
|
|
|
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
|
|
|
|
|
ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot)
|
|
|
|
|
|
|
|
|
|
minimumPeersPerSubnet := flags.Get().MinimumPeersPerSubnet
|
|
|
|
|
|
|
|
|
|
// Subscribe to all subnets.
|
|
|
|
|
for i := range subnetCount {
|
|
|
|
|
fullTopic := s.addDigestAndIndexToTopic(topic, digest, i)
|
|
|
|
|
s.subscribeWithBase(fullTopic, validator, handle)
|
|
|
|
|
}
|
|
|
|
|
genesis := s.cfg.clock.GenesisTime()
|
|
|
|
|
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)
|
|
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-s.ctx.Done():
|
|
|
|
|
ticker.Done()
|
|
|
|
|
return
|
|
|
|
|
case <-ticker.C():
|
|
|
|
|
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
valid, err := isDigestValid(digest, genesis, genRoot)
|
|
|
|
|
|
|
|
|
|
valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error(err)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if !valid {
|
|
|
|
|
log.Warnf("Attestation subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
|
|
|
|
|
message := fmt.Sprintf("%s subnets with this digest are no longer valid, unsubscribing from all of them.", humanDescription)
|
|
|
|
|
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warning(message)
|
|
|
|
|
|
|
|
|
|
// Unsubscribes from all our current subnets.
|
|
|
|
|
for i := uint64(0); i < subnetCount; i++ {
|
|
|
|
|
for i := range subnetCount {
|
|
|
|
|
fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.p2p.Encoding().ProtocolSuffix()
|
|
|
|
|
s.unSubscribeFromTopic(fullTopic)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ticker.Done()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
// Check every slot that there are enough peers
|
|
|
|
|
for i := uint64(0); i < subnetCount; i++ {
|
|
|
|
|
if !s.enoughPeersAreConnected(s.addDigestAndIndexToTopic(topic, digest, i)) {
|
|
|
|
|
_, err := s.cfg.p2p.FindPeersWithSubnet(
|
|
|
|
|
s.ctx,
|
|
|
|
|
s.addDigestAndIndexToTopic(topic, digest, i),
|
|
|
|
|
i,
|
|
|
|
|
flags.Get().MinimumPeersPerSubnet,
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
|
|
|
|
|
// Check that all subnets have enough peers.
|
|
|
|
|
for i := range subnetCount {
|
|
|
|
|
fullTopic := s.addDigestAndIndexToTopic(topic, digest, i)
|
|
|
|
|
|
|
|
|
|
if !s.enoughPeersAreConnected(fullTopic) {
|
|
|
|
|
if _, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, fullTopic, i, minimumPeersPerSubnet); err != nil {
|
|
|
|
|
log.WithError(err).Debug("Could not search for peers")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
case <-s.ctx.Done():
|
|
|
|
|
ticker.Done()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// subscribe to a dynamically changing list of subnets. This method expects a fmt compatible
|
|
|
|
|
// string for the topic name and the list of subnets for subscribed topics that should be
|
|
|
|
|
// maintained.
|
|
|
|
|
type specificSubscribeFunc func(
|
|
|
|
|
topic string,
|
|
|
|
|
digest [4]byte,
|
|
|
|
|
genesisValidatorsRoot [fieldparams.RootLength]byte,
|
|
|
|
|
genesisTime time.Time,
|
|
|
|
|
subscriptions map[uint64]*pubsub.Subscription,
|
|
|
|
|
currentSlot primitives.Slot,
|
|
|
|
|
validate wrappedVal,
|
|
|
|
|
handle subHandler,
|
|
|
|
|
) bool
|
|
|
|
|
|
|
|
|
|
// subscribeDynamicWithSubnets subscribes to a dynamically changing list of subnets.
|
|
|
|
|
func (s *Service) subscribeDynamicWithSubnets(
|
|
|
|
|
topicFormat string,
|
|
|
|
|
validate wrappedVal,
|
|
|
|
|
handle subHandler,
|
|
|
|
|
digest [4]byte,
|
|
|
|
|
subnetCount uint64,
|
|
|
|
|
specificSubscribe specificSubscribeFunc,
|
|
|
|
|
) {
|
|
|
|
|
genRoot := s.cfg.clock.GenesisValidatorsRoot()
|
|
|
|
|
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
|
|
|
|
|
// Initialize the subscriptions map.
|
|
|
|
|
subscriptions := make(map[uint64]*pubsub.Subscription, subnetCount)
|
|
|
|
|
|
|
|
|
|
// Retrieve the genesis validators root.
|
|
|
|
|
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
|
|
|
|
|
|
|
|
|
|
// Retrieve the epoch of the fork corresponding to the digest.
|
|
|
|
|
_, epoch, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:])
|
|
|
|
|
if err != nil {
|
|
|
|
|
// Impossible condition as it would mean digest does not exist.
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
base := p2p.GossipTopicMappings(topicFormat, e)
|
|
|
|
|
|
|
|
|
|
// Retrieve the base protobuf message.
|
|
|
|
|
base := p2p.GossipTopicMappings(topicFormat, epoch)
|
|
|
|
|
if base == nil {
|
|
|
|
|
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat))
|
|
|
|
|
}
|
|
|
|
|
subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().MaxCommitteesPerSlot)
|
|
|
|
|
genesis := s.cfg.clock.GenesisTime()
|
|
|
|
|
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)
|
|
|
|
|
|
|
|
|
|
// Retrieve the genesis time.
|
|
|
|
|
genesisTime := s.cfg.clock.GenesisTime()
|
|
|
|
|
|
|
|
|
|
// Define a ticker ticking every slot.
|
|
|
|
|
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
|
|
|
|
|
ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot)
|
|
|
|
|
|
|
|
|
|
// Retrieve the current slot.
|
|
|
|
|
currentSlot := s.cfg.clock.CurrentSlot()
|
|
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
|
// Subscribe to the sync subnets.
|
|
|
|
|
specificSubscribe(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-s.ctx.Done():
|
|
|
|
|
ticker.Done()
|
|
|
|
|
return
|
|
|
|
|
case currentSlot := <-ticker.C():
|
|
|
|
|
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
valid, err := isDigestValid(digest, genesis, genRoot)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error(err)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if !valid {
|
|
|
|
|
log.Warnf("Attestation subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
|
|
|
|
|
// Unsubscribes from all our current subnets.
|
|
|
|
|
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
|
|
|
|
|
isDigestValid := specificSubscribe(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)
|
|
|
|
|
|
|
|
|
|
// Stop the ticker if the digest is not valid. Likely to happen after a hard fork.
|
|
|
|
|
if !isDigestValid {
|
|
|
|
|
ticker.Done()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
wantedSubs := s.retrievePersistentSubs(currentSlot)
|
|
|
|
|
s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest)
|
|
|
|
|
|
|
|
|
|
for _, idx := range wantedSubs {
|
|
|
|
|
s.subscribeAggregatorSubnet(subscriptions, idx, digest, validate, handle)
|
|
|
|
|
}
|
|
|
|
|
// find desired subs for attesters
|
|
|
|
|
attesterSubs := s.attesterSubnetIndices(currentSlot)
|
|
|
|
|
for _, idx := range attesterSubs {
|
|
|
|
|
s.lookupAttesterSubnets(digest, idx)
|
|
|
|
|
}
|
|
|
|
|
case <-s.ctx.Done():
|
|
|
|
|
ticker.Done()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Service) subscribeToAttestationsSubnetsDynamic(
|
|
|
|
|
topicFormat string,
|
|
|
|
|
digest [4]byte,
|
|
|
|
|
genesisValidatorsRoot [fieldparams.RootLength]byte,
|
|
|
|
|
genesisTime time.Time,
|
|
|
|
|
subscriptions map[uint64]*pubsub.Subscription,
|
|
|
|
|
currentSlot primitives.Slot,
|
|
|
|
|
validate wrappedVal,
|
|
|
|
|
handle subHandler,
|
|
|
|
|
) bool {
|
|
|
|
|
// Do not subscribe if not synced.
|
|
|
|
|
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Do not subscribe is the digest is not valid.
|
|
|
|
|
valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error(err)
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork.
|
|
|
|
|
if !valid {
|
|
|
|
|
const message = "Attestation subnets with this digest are no longer valid, unsubscribing from all of them."
|
|
|
|
|
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn(message)
|
|
|
|
|
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
wantedSubnetsIndex := s.retrievePersistentSubs(currentSlot)
|
|
|
|
|
s.reValidateSubscriptions(subscriptions, wantedSubnetsIndex, topicFormat, digest)
|
|
|
|
|
|
|
|
|
|
for _, index := range wantedSubnetsIndex {
|
|
|
|
|
s.subscribeAggregatorSubnet(subscriptions, index, digest, validate, handle)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Find desired subnets for attesters.
|
|
|
|
|
attesterSubnets := s.attesterSubnetIndices(currentSlot)
|
|
|
|
|
for _, index := range attesterSubnets {
|
|
|
|
|
s.lookupAttesterSubnets(digest, index)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Service) subscribeToSyncSubnetsDynamic(
|
|
|
|
|
topicFormat string,
|
|
|
|
|
digest [4]byte,
|
|
|
|
|
genesisValidatorsRoot [fieldparams.RootLength]byte,
|
|
|
|
|
genesisTime time.Time,
|
|
|
|
|
subscriptions map[uint64]*pubsub.Subscription,
|
|
|
|
|
currentSlot primitives.Slot,
|
|
|
|
|
validate wrappedVal,
|
|
|
|
|
handle subHandler,
|
|
|
|
|
) bool {
|
|
|
|
|
// Get sync subnets topic.
|
|
|
|
|
topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.SyncCommitteeMessage{})]
|
|
|
|
|
|
|
|
|
|
// Do not subscribe if not synced.
|
|
|
|
|
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Do not subscribe is the digest is not valid.
|
|
|
|
|
valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error(err)
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork.
|
|
|
|
|
if !valid {
|
|
|
|
|
const message = "Sync subnets with this digest are no longer valid, unsubscribing from all of them."
|
|
|
|
|
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn(message)
|
|
|
|
|
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Get the current epoch.
|
|
|
|
|
currentEpoch := slots.ToEpoch(currentSlot)
|
|
|
|
|
|
|
|
|
|
// Retrieve the subnets we want to subscribe to.
|
|
|
|
|
wantedSubnetsIndex := s.retrieveActiveSyncSubnets(currentEpoch)
|
|
|
|
|
|
|
|
|
|
// Remove subscriptions that are no longer wanted.
|
|
|
|
|
s.reValidateSubscriptions(subscriptions, wantedSubnetsIndex, topicFormat, digest)
|
|
|
|
|
|
|
|
|
|
// Subscribe to wanted subnets.
|
|
|
|
|
for _, subnetIndex := range wantedSubnetsIndex {
|
|
|
|
|
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)
|
|
|
|
|
|
|
|
|
|
// Check if subscription exists.
|
|
|
|
|
if _, exists := subscriptions[subnetIndex]; exists {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// We need to subscribe to the subnet.
|
|
|
|
|
subscription := s.subscribeWithBase(subnetTopic, validate, handle)
|
|
|
|
|
subscriptions[subnetIndex] = subscription
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Find new peers for wanted subnets if needed.
|
|
|
|
|
for _, subnetIndex := range wantedSubnetsIndex {
|
|
|
|
|
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)
|
|
|
|
|
|
|
|
|
|
// Check if we have enough peers in the subnet. Skip if we do.
|
|
|
|
|
if s.enoughPeersAreConnected(subnetTopic) {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Not enough peers in the subnet, we need to search for more.
|
|
|
|
|
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, subnetIndex, flags.Get().MinimumPeersPerSubnet)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.WithError(err).Debug("Could not search for peers")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// reValidateSubscriptions unsubscribe from topics we are currently subscribed to but that are
|
|
|
|
|
// not in the list of wanted subnets.
|
|
|
|
|
// TODO: Rename this functions as it does not only revalidate subscriptions.
|
|
|
|
|
@@ -501,206 +640,6 @@ func (s *Service) subscribeAggregatorSubnet(
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// subscribe to a static subnet with the given topic and index. A given validator and subscription handler is
|
|
|
|
|
// used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding.
|
|
|
|
|
func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte) {
|
|
|
|
|
genRoot := s.cfg.clock.GenesisValidatorsRoot()
|
|
|
|
|
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
base := p2p.GossipTopicMappings(topic, e)
|
|
|
|
|
if base == nil {
|
|
|
|
|
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic))
|
|
|
|
|
}
|
|
|
|
|
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
|
|
|
|
|
s.subscribeWithBase(s.addDigestAndIndexToTopic(topic, digest, i), validator, handle)
|
|
|
|
|
}
|
|
|
|
|
genesis := s.cfg.clock.GenesisTime()
|
|
|
|
|
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)
|
|
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-s.ctx.Done():
|
|
|
|
|
ticker.Done()
|
|
|
|
|
return
|
|
|
|
|
case <-ticker.C():
|
|
|
|
|
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
valid, err := isDigestValid(digest, genesis, genRoot)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error(err)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
if !valid {
|
|
|
|
|
log.Warnf("Sync subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
|
|
|
|
|
// Unsubscribes from all our current subnets.
|
|
|
|
|
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
|
|
|
|
|
fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.p2p.Encoding().ProtocolSuffix()
|
|
|
|
|
s.unSubscribeFromTopic(fullTopic)
|
|
|
|
|
}
|
|
|
|
|
ticker.Done()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
// Check every slot that there are enough peers
|
|
|
|
|
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
|
|
|
|
|
if !s.enoughPeersAreConnected(s.addDigestAndIndexToTopic(topic, digest, i)) {
|
|
|
|
|
_, err := s.cfg.p2p.FindPeersWithSubnet(
|
|
|
|
|
s.ctx,
|
|
|
|
|
s.addDigestAndIndexToTopic(topic, digest, i),
|
|
|
|
|
i,
|
|
|
|
|
flags.Get().MinimumPeersPerSubnet,
|
|
|
|
|
)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.WithError(err).Debug("Could not search for peers")
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// subscribeToSyncSubnets subscribes to needed sync subnets, unsubscribe from unneeded ones and search for more peers if needed.
|
|
|
|
|
// Returns `true` if the digest is valid (wrt. the current epoch), `false` otherwise.
|
|
|
|
|
func (s *Service) subscribeToSyncSubnets(
|
|
|
|
|
topicFormat string,
|
|
|
|
|
digest [4]byte,
|
|
|
|
|
genesisValidatorsRoot [fieldparams.RootLength]byte,
|
|
|
|
|
genesisTime time.Time,
|
|
|
|
|
subscriptions map[uint64]*pubsub.Subscription,
|
|
|
|
|
currentSlot primitives.Slot,
|
|
|
|
|
validate wrappedVal,
|
|
|
|
|
handle subHandler,
|
|
|
|
|
) bool {
|
|
|
|
|
// Get sync subnets topic.
|
|
|
|
|
topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.SyncCommitteeMessage{})]
|
|
|
|
|
|
|
|
|
|
// Do not subscribe if not synced.
|
|
|
|
|
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Do not subscribe is the digest is not valid.
|
|
|
|
|
valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.Error(err)
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork.
|
|
|
|
|
if !valid {
|
|
|
|
|
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn("Sync subnets with this digest are no longer valid, unsubscribing from all of them.")
|
|
|
|
|
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Get the current epoch.
|
|
|
|
|
currentEpoch := slots.ToEpoch(currentSlot)
|
|
|
|
|
|
|
|
|
|
// Retrieve the subnets we want to subscribe to.
|
|
|
|
|
wantedSubnetsIndex := s.retrieveActiveSyncSubnets(currentEpoch)
|
|
|
|
|
|
|
|
|
|
// Remove subscriptions that are no longer wanted.
|
|
|
|
|
s.reValidateSubscriptions(subscriptions, wantedSubnetsIndex, topicFormat, digest)
|
|
|
|
|
|
|
|
|
|
// Subscribe to wanted subnets.
|
|
|
|
|
for _, subnetIndex := range wantedSubnetsIndex {
|
|
|
|
|
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)
|
|
|
|
|
|
|
|
|
|
// Check if subscription exists.
|
|
|
|
|
if _, exists := subscriptions[subnetIndex]; exists {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// We need to subscribe to the subnet.
|
|
|
|
|
subscription := s.subscribeWithBase(subnetTopic, validate, handle)
|
|
|
|
|
subscriptions[subnetIndex] = subscription
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Find new peers for wanted subnets if needed.
|
|
|
|
|
for _, subnetIndex := range wantedSubnetsIndex {
|
|
|
|
|
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)
|
|
|
|
|
|
|
|
|
|
// Check if we have enough peers in the subnet. Skip if we do.
|
|
|
|
|
if s.enoughPeersAreConnected(subnetTopic) {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Not enough peers in the subnet, we need to search for more.
|
|
|
|
|
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, subnetIndex, flags.Get().MinimumPeersPerSubnet)
|
|
|
|
|
if err != nil {
|
|
|
|
|
log.WithError(err).Debug("Could not search for peers")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// subscribeDynamicWithSyncSubnets subscribes to a dynamically changing list of subnets.
|
|
|
|
|
func (s *Service) subscribeDynamicWithSyncSubnets(
|
|
|
|
|
topicFormat string,
|
|
|
|
|
validate wrappedVal,
|
|
|
|
|
handle subHandler,
|
|
|
|
|
digest [4]byte,
|
|
|
|
|
) {
|
|
|
|
|
// Retrieve the number of committee subnets we need to subscribe to.
|
|
|
|
|
syncCommiteeSubnetsCount := params.BeaconConfig().SyncCommitteeSubnetCount
|
|
|
|
|
|
|
|
|
|
// Initialize the subscriptions map.
|
|
|
|
|
subscriptions := make(map[uint64]*pubsub.Subscription, syncCommiteeSubnetsCount)
|
|
|
|
|
|
|
|
|
|
// Retrieve the genesis validators root.
|
|
|
|
|
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
|
|
|
|
|
|
|
|
|
|
// Retrieve the epoch of the fork corresponding to the digest.
|
|
|
|
|
_, epoch, err := forks.RetrieveForkDataFromDigest(digest, genesisValidatorsRoot[:])
|
|
|
|
|
if err != nil {
|
|
|
|
|
panic(err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Retrieve the base protobuf message.
|
|
|
|
|
base := p2p.GossipTopicMappings(topicFormat, epoch)
|
|
|
|
|
if base == nil {
|
|
|
|
|
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Retrieve the genesis time.
|
|
|
|
|
genesisTime := s.cfg.clock.GenesisTime()
|
|
|
|
|
|
|
|
|
|
// Define a ticker ticking every slot.
|
|
|
|
|
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
|
|
|
|
|
ticker := slots.NewSlotTicker(genesisTime, secondsPerSlot)
|
|
|
|
|
|
|
|
|
|
// Retrieve the current slot.
|
|
|
|
|
currentSlot := s.cfg.clock.CurrentSlot()
|
|
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
|
// Subscribe to the sync subnets.
|
|
|
|
|
s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case currentSlot := <-ticker.C():
|
|
|
|
|
isDigestValid := s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)
|
|
|
|
|
|
|
|
|
|
// Stop the ticker if the digest is not valid. Likely to happen after a hard fork.
|
|
|
|
|
if !isDigestValid {
|
|
|
|
|
ticker.Done()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
case <-s.ctx.Done():
|
|
|
|
|
ticker.Done()
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// lookup peers for attester specific subnets.
|
|
|
|
|
func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) {
|
|
|
|
|
topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.Attestation{})]
|
|
|
|
|
|