|
|
|
|
@@ -85,12 +85,14 @@ type subnetTracker struct {
|
|
|
|
|
subscribeParameters
|
|
|
|
|
mu sync.RWMutex
|
|
|
|
|
subscriptions map[uint64]*pubsub.Subscription
|
|
|
|
|
trackedTopics map[uint64]string // Maps subnet to full topic string for unsubscribing
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func newSubnetTracker(p subscribeParameters) *subnetTracker {
|
|
|
|
|
return &subnetTracker{
|
|
|
|
|
subscribeParameters: p,
|
|
|
|
|
subscriptions: make(map[uint64]*pubsub.Subscription),
|
|
|
|
|
trackedTopics: make(map[uint64]string),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -117,31 +119,84 @@ func (t *subnetTracker) missing(wanted map[uint64]bool) []uint64 {
|
|
|
|
|
missing = append(missing, subnet)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if len(missing) > 0 {
|
|
|
|
|
log.WithFields(logrus.Fields{
|
|
|
|
|
"wanted": len(wanted),
|
|
|
|
|
"tracked": len(t.subscriptions),
|
|
|
|
|
"missing": missing,
|
|
|
|
|
"trackedSubnets": func() []uint64 {
|
|
|
|
|
var tracked []uint64
|
|
|
|
|
for k := range t.subscriptions {
|
|
|
|
|
tracked = append(tracked, k)
|
|
|
|
|
}
|
|
|
|
|
return tracked
|
|
|
|
|
}(),
|
|
|
|
|
}).Debug("Subnet tracker missing analysis")
|
|
|
|
|
}
|
|
|
|
|
return missing
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// tryReserveSubnets atomically checks for missing subnets and reserves them for subscription.
|
|
|
|
|
// Returns a list of subnets that were successfully reserved by this call.
|
|
|
|
|
func (t *subnetTracker) tryReserveSubnets(wanted map[uint64]bool) []uint64 {
|
|
|
|
|
t.mu.Lock()
|
|
|
|
|
defer t.mu.Unlock()
|
|
|
|
|
var reserved []uint64
|
|
|
|
|
for subnet := range wanted {
|
|
|
|
|
if _, ok := t.subscriptions[subnet]; !ok {
|
|
|
|
|
// Reserve by marking as nil (pending subscription)
|
|
|
|
|
t.subscriptions[subnet] = nil
|
|
|
|
|
reserved = append(reserved, subnet)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return reserved
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// cancelReservation removes a reservation for a subnet that failed to subscribe.
|
|
|
|
|
func (t *subnetTracker) cancelReservation(subnet uint64) {
|
|
|
|
|
t.mu.Lock()
|
|
|
|
|
defer t.mu.Unlock()
|
|
|
|
|
if sub, ok := t.subscriptions[subnet]; ok && sub == nil {
|
|
|
|
|
delete(t.subscriptions, subnet)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// cancelSubscription cancels and removes the subscription for a given subnet.
|
|
|
|
|
func (t *subnetTracker) cancelSubscription(subnet uint64) {
|
|
|
|
|
func (t *subnetTracker) cancelSubscription(subnet uint64) string {
|
|
|
|
|
t.mu.Lock()
|
|
|
|
|
defer t.mu.Unlock()
|
|
|
|
|
defer delete(t.subscriptions, subnet)
|
|
|
|
|
defer delete(t.trackedTopics, subnet)
|
|
|
|
|
|
|
|
|
|
topic := t.trackedTopics[subnet]
|
|
|
|
|
sub := t.subscriptions[subnet]
|
|
|
|
|
if sub == nil {
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
// Debug: Check for inconsistent state
|
|
|
|
|
if sub != nil && topic == "" {
|
|
|
|
|
log.WithField("subnet", subnet).Warn("Found subscription but no tracked topic")
|
|
|
|
|
} else if sub == nil && topic != "" {
|
|
|
|
|
log.WithField("subnet", subnet).WithField("topic", topic).Debug("Found tracked topic but no subscription object")
|
|
|
|
|
}
|
|
|
|
|
sub.Cancel()
|
|
|
|
|
|
|
|
|
|
if sub != nil {
|
|
|
|
|
sub.Cancel()
|
|
|
|
|
}
|
|
|
|
|
return topic
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// track asks subscriptionTracker to hold on to the subscription for a given subnet so
|
|
|
|
|
// that we can remember that it is tracked and cancel its context when it's time to unsubscribe.
|
|
|
|
|
func (t *subnetTracker) track(subnet uint64, sub *pubsub.Subscription) {
|
|
|
|
|
if sub == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
func (t *subnetTracker) track(subnet uint64, sub *pubsub.Subscription, expectedTopic string) {
|
|
|
|
|
t.mu.Lock()
|
|
|
|
|
defer t.mu.Unlock()
|
|
|
|
|
t.subscriptions[subnet] = sub
|
|
|
|
|
// Use the topic from the subscription if available, otherwise use the expected one
|
|
|
|
|
// (when sub is nil, it means the subscription already existed)
|
|
|
|
|
if sub != nil {
|
|
|
|
|
t.trackedTopics[subnet] = sub.Topic()
|
|
|
|
|
} else {
|
|
|
|
|
t.trackedTopics[subnet] = expectedTopic
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// noopValidator is a no-op that only decodes the message, but does not check its contents.
|
|
|
|
|
@@ -200,21 +255,27 @@ func (s *Service) spawn(f func()) {
|
|
|
|
|
|
|
|
|
|
// Register PubSub subscribers
|
|
|
|
|
func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
|
|
|
|
|
s.spawn(func() {
|
|
|
|
|
s.subscribe(p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber, digest)
|
|
|
|
|
})
|
|
|
|
|
s.spawn(func() {
|
|
|
|
|
s.subscribe(p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber, digest)
|
|
|
|
|
})
|
|
|
|
|
s.spawn(func() {
|
|
|
|
|
s.subscribe(p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber, digest)
|
|
|
|
|
})
|
|
|
|
|
s.spawn(func() {
|
|
|
|
|
s.subscribe(p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber, digest)
|
|
|
|
|
})
|
|
|
|
|
s.spawn(func() {
|
|
|
|
|
s.subscribe(p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber, digest)
|
|
|
|
|
})
|
|
|
|
|
// Idempotent fixed-topic subscriptions: skip if already active.
|
|
|
|
|
fixed := []struct {
|
|
|
|
|
topicFormat string
|
|
|
|
|
val wrappedVal
|
|
|
|
|
handle subHandler
|
|
|
|
|
}{
|
|
|
|
|
{p2p.BlockSubnetTopicFormat, s.validateBeaconBlockPubSub, s.beaconBlockSubscriber},
|
|
|
|
|
{p2p.AggregateAndProofSubnetTopicFormat, s.validateAggregateAndProof, s.beaconAggregateProofSubscriber},
|
|
|
|
|
{p2p.ExitSubnetTopicFormat, s.validateVoluntaryExit, s.voluntaryExitSubscriber},
|
|
|
|
|
{p2p.ProposerSlashingSubnetTopicFormat, s.validateProposerSlashing, s.proposerSlashingSubscriber},
|
|
|
|
|
{p2p.AttesterSlashingSubnetTopicFormat, s.validateAttesterSlashing, s.attesterSlashingSubscriber},
|
|
|
|
|
}
|
|
|
|
|
for _, f := range fixed {
|
|
|
|
|
full := s.addDigestToTopic(f.topicFormat, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
|
|
|
|
|
if s.subHandler.topicExists(full) {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
s.spawn(func(fmt string, val wrappedVal, handle subHandler) func() {
|
|
|
|
|
return func() { s.subscribe(fmt, val, handle, digest) }
|
|
|
|
|
}(f.topicFormat, f.val, f.handle))
|
|
|
|
|
}
|
|
|
|
|
s.spawn(func() {
|
|
|
|
|
s.subscribeWithParameters(subscribeParameters{
|
|
|
|
|
topicFormat: p2p.AttestationSubnetTopicFormat,
|
|
|
|
|
@@ -228,14 +289,20 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
|
|
|
|
|
|
|
|
|
|
// New gossip topic in Altair
|
|
|
|
|
if params.BeaconConfig().AltairForkEpoch <= epoch {
|
|
|
|
|
s.spawn(func() {
|
|
|
|
|
s.subscribe(
|
|
|
|
|
p2p.SyncContributionAndProofSubnetTopicFormat,
|
|
|
|
|
s.validateSyncContributionAndProof,
|
|
|
|
|
s.syncContributionAndProofSubscriber,
|
|
|
|
|
digest,
|
|
|
|
|
)
|
|
|
|
|
})
|
|
|
|
|
// SyncContributionAndProof fixed topic
|
|
|
|
|
{
|
|
|
|
|
full := s.addDigestToTopic(p2p.SyncContributionAndProofSubnetTopicFormat, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
|
|
|
|
|
if !s.subHandler.topicExists(full) {
|
|
|
|
|
s.spawn(func() {
|
|
|
|
|
s.subscribe(
|
|
|
|
|
p2p.SyncContributionAndProofSubnetTopicFormat,
|
|
|
|
|
s.validateSyncContributionAndProof,
|
|
|
|
|
s.syncContributionAndProofSubscriber,
|
|
|
|
|
digest,
|
|
|
|
|
)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
s.spawn(func() {
|
|
|
|
|
s.subscribeWithParameters(subscribeParameters{
|
|
|
|
|
topicFormat: p2p.SyncCommitteeSubnetTopicFormat,
|
|
|
|
|
@@ -247,35 +314,49 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
if features.Get().EnableLightClient {
|
|
|
|
|
s.spawn(func() {
|
|
|
|
|
s.subscribe(
|
|
|
|
|
p2p.LightClientOptimisticUpdateTopicFormat,
|
|
|
|
|
s.validateLightClientOptimisticUpdate,
|
|
|
|
|
s.lightClientOptimisticUpdateSubscriber,
|
|
|
|
|
digest,
|
|
|
|
|
)
|
|
|
|
|
})
|
|
|
|
|
s.spawn(func() {
|
|
|
|
|
s.subscribe(
|
|
|
|
|
p2p.LightClientFinalityUpdateTopicFormat,
|
|
|
|
|
s.validateLightClientFinalityUpdate,
|
|
|
|
|
s.lightClientFinalityUpdateSubscriber,
|
|
|
|
|
digest,
|
|
|
|
|
)
|
|
|
|
|
})
|
|
|
|
|
// Light client fixed topics
|
|
|
|
|
{
|
|
|
|
|
full := s.addDigestToTopic(p2p.LightClientOptimisticUpdateTopicFormat, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
|
|
|
|
|
if !s.subHandler.topicExists(full) {
|
|
|
|
|
s.spawn(func() {
|
|
|
|
|
s.subscribe(
|
|
|
|
|
p2p.LightClientOptimisticUpdateTopicFormat,
|
|
|
|
|
s.validateLightClientOptimisticUpdate,
|
|
|
|
|
s.lightClientOptimisticUpdateSubscriber,
|
|
|
|
|
digest,
|
|
|
|
|
)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
{
|
|
|
|
|
full := s.addDigestToTopic(p2p.LightClientFinalityUpdateTopicFormat, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
|
|
|
|
|
if !s.subHandler.topicExists(full) {
|
|
|
|
|
s.spawn(func() {
|
|
|
|
|
s.subscribe(
|
|
|
|
|
p2p.LightClientFinalityUpdateTopicFormat,
|
|
|
|
|
s.validateLightClientFinalityUpdate,
|
|
|
|
|
s.lightClientFinalityUpdateSubscriber,
|
|
|
|
|
digest,
|
|
|
|
|
)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// New gossip topic in Capella
|
|
|
|
|
if params.BeaconConfig().CapellaForkEpoch <= epoch {
|
|
|
|
|
s.spawn(func() {
|
|
|
|
|
s.subscribe(
|
|
|
|
|
p2p.BlsToExecutionChangeSubnetTopicFormat,
|
|
|
|
|
s.validateBlsToExecutionChange,
|
|
|
|
|
s.blsToExecutionChangeSubscriber,
|
|
|
|
|
digest,
|
|
|
|
|
)
|
|
|
|
|
})
|
|
|
|
|
full := s.addDigestToTopic(p2p.BlsToExecutionChangeSubnetTopicFormat, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
|
|
|
|
|
if !s.subHandler.topicExists(full) {
|
|
|
|
|
s.spawn(func() {
|
|
|
|
|
s.subscribe(
|
|
|
|
|
p2p.BlsToExecutionChangeSubnetTopicFormat,
|
|
|
|
|
s.validateBlsToExecutionChange,
|
|
|
|
|
s.blsToExecutionChangeSubscriber,
|
|
|
|
|
digest,
|
|
|
|
|
)
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// New gossip topic in Deneb, removed in Electra
|
|
|
|
|
@@ -337,22 +418,32 @@ func (s *Service) subscribe(topic string, validator wrappedVal, handle subHandle
|
|
|
|
|
// Impossible condition as it would mean topic does not exist.
|
|
|
|
|
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic)) // lint:nopanic -- Impossible condition.
|
|
|
|
|
}
|
|
|
|
|
s.subscribeWithBase(s.addDigestToTopic(topic, digest), validator, handle)
|
|
|
|
|
fullTopic := s.addDigestToTopic(topic, digest) + s.cfg.p2p.Encoding().ProtocolSuffix()
|
|
|
|
|
_ = s.subscribeWithBase(fullTopic, validator, handle)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// subscribeWithBase subscribes to a topic that already includes the protocol suffix.
|
|
|
|
|
func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle subHandler) *pubsub.Subscription {
|
|
|
|
|
topic += s.cfg.p2p.Encoding().ProtocolSuffix()
|
|
|
|
|
log := log.WithField("topic", topic)
|
|
|
|
|
|
|
|
|
|
// Do not resubscribe already seen subscriptions.
|
|
|
|
|
ok := s.subHandler.topicExists(topic)
|
|
|
|
|
if ok {
|
|
|
|
|
// Fast-path: avoid attempting if already actively subscribed
|
|
|
|
|
if s.subHandler.topicExists(topic) {
|
|
|
|
|
log.WithField("topic", topic).Debug("Provided topic already has an active subscription running")
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Atomically check if topic exists and reserve it for subscription if it doesn't
|
|
|
|
|
if !s.subHandler.tryReserveTopic(topic) {
|
|
|
|
|
// Another goroutine may have just subscribed or reserved it; report and skip
|
|
|
|
|
log.WithField("topic", topic).Debug("Provided topic already has an active subscription running")
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Register validator after successfully reserving the topic
|
|
|
|
|
if err := s.cfg.p2p.PubSub().RegisterTopicValidator(s.wrapAndReportValidation(topic, validator)); err != nil {
|
|
|
|
|
log.WithError(err).Error("Could not register validator for topic")
|
|
|
|
|
// Cancel the reservation since validator registration failed
|
|
|
|
|
s.subHandler.cancelReservation(topic)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -362,6 +453,8 @@ func (s *Service) subscribeWithBase(topic string, validator wrappedVal, handle s
|
|
|
|
|
// libp2p PubSub library or a subscription request to a topic that fails to match the topic
|
|
|
|
|
// subscription filter.
|
|
|
|
|
log.WithError(err).Error("Could not subscribe topic")
|
|
|
|
|
// Cancel the reservation since subscription failed
|
|
|
|
|
s.subHandler.cancelReservation(topic)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -504,8 +597,8 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p
|
|
|
|
|
// This function mutates the `subscriptionBySubnet` map, which is used to keep track of the current subscriptions.
|
|
|
|
|
func (s *Service) pruneSubscriptions(t *subnetTracker, wantedSubnets map[uint64]bool) {
|
|
|
|
|
for _, subnet := range t.unwanted(wantedSubnets) {
|
|
|
|
|
t.cancelSubscription(subnet)
|
|
|
|
|
s.unSubscribeFromTopic(t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix()))
|
|
|
|
|
topic := t.cancelSubscription(subnet)
|
|
|
|
|
s.unSubscribeFromTopic(topic)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -530,10 +623,50 @@ func (s *Service) subscribeToSubnets(t *subnetTracker) error {
|
|
|
|
|
|
|
|
|
|
subnetsToJoin := t.getSubnetsToJoin(s.cfg.clock.CurrentSlot())
|
|
|
|
|
s.pruneSubscriptions(t, subnetsToJoin)
|
|
|
|
|
for _, subnet := range t.missing(subnetsToJoin) {
|
|
|
|
|
// TODO: subscribeWithBase appends the protocol suffix, other methods don't. Make this consistent.
|
|
|
|
|
topic := t.fullTopic(subnet, "")
|
|
|
|
|
t.track(subnet, s.subscribeWithBase(topic, t.validate, t.handle))
|
|
|
|
|
|
|
|
|
|
// Filter out subnets already globally subscribed and mark them tracked to prevent re-reserve.
|
|
|
|
|
// Work on a copy to avoid mutating the original map while iterating later code paths.
|
|
|
|
|
wanted := make(map[uint64]bool, len(subnetsToJoin))
|
|
|
|
|
for subnet := range subnetsToJoin {
|
|
|
|
|
full := t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix())
|
|
|
|
|
if s.subHandler.topicExists(full) {
|
|
|
|
|
t.track(subnet, nil, full)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
wanted[subnet] = true
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Atomically reserve subnets that still need subscription
|
|
|
|
|
reservedSubnets := t.tryReserveSubnets(wanted)
|
|
|
|
|
if len(reservedSubnets) > 0 {
|
|
|
|
|
log.WithFields(logrus.Fields{
|
|
|
|
|
"reservedSubnets": reservedSubnets,
|
|
|
|
|
"totalWanted": len(subnetsToJoin),
|
|
|
|
|
}).Debug("Reserved missing subnets for subscription")
|
|
|
|
|
}
|
|
|
|
|
for _, subnet := range reservedSubnets {
|
|
|
|
|
fullTopic := t.fullTopic(subnet, s.cfg.p2p.Encoding().ProtocolSuffix())
|
|
|
|
|
// Skip if already subscribed globally (e.g., another goroutine won the race).
|
|
|
|
|
// Mark as tracked so we don't repeatedly reserve and skip on subsequent ticks.
|
|
|
|
|
if s.subHandler.topicExists(fullTopic) {
|
|
|
|
|
t.track(subnet, nil, fullTopic)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
log.WithFields(logrus.Fields{
|
|
|
|
|
"subnet": subnet,
|
|
|
|
|
"topic": fullTopic,
|
|
|
|
|
}).Debug("Attempting to subscribe to reserved subnet")
|
|
|
|
|
sub := s.subscribeWithBase(fullTopic, t.validate, t.handle)
|
|
|
|
|
if sub != nil {
|
|
|
|
|
t.track(subnet, sub, fullTopic)
|
|
|
|
|
} else {
|
|
|
|
|
// Subscription failed, cancel the reservation
|
|
|
|
|
t.cancelReservation(subnet)
|
|
|
|
|
log.WithFields(logrus.Fields{
|
|
|
|
|
"subnet": subnet,
|
|
|
|
|
"topic": fullTopic,
|
|
|
|
|
}).Debug("Subscription failed, canceled reservation")
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
|