This commit is contained in:
Nishant Das
2020-04-30 03:28:22 +08:00
committed by GitHub
parent 2c1e3aa4ee
commit 4d30519dda

View File

@@ -177,7 +177,7 @@ func (r *Service) subscribeWithBase(base proto.Message, topic string, validator
msg, err := sub.Next(r.ctx)
if err != nil {
// This should only happen when the context is cancelled or subscription is cancelled.
log.WithError(err).Error("Subscription next failed")
log.WithError(err).Warn("Subscription next failed")
return
}
@@ -241,7 +241,7 @@ func (r *Service) subscribeDynamicWithSubnets(
// Update desired topic indices for aggregator
wantedSubs := r.aggregatorCommitteeIndices(currentSlot)
// Resize as appropriate.
r.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat)
r.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest)
for _, idx := range wantedSubs {
if _, exists := subscriptions[idx]; !exists {
@@ -311,7 +311,7 @@ func (r *Service) subscribeDynamic(topicFormat string, determineSubsLen func() i
// revalidate that our currently connected subnets are valid.
func (r *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subscription,
wantedSubs []uint64, topicFormat string) {
wantedSubs []uint64, topicFormat string, digest [4]byte) {
for k, v := range subscriptions {
var wanted bool
for _, idx := range wantedSubs {
@@ -322,7 +322,8 @@ func (r *Service) reValidateSubscriptions(subscriptions map[uint64]*pubsub.Subsc
}
if !wanted && v != nil {
v.Cancel()
if err := r.p2p.PubSub().UnregisterTopicValidator(fmt.Sprintf(topicFormat, k)); err != nil {
fullTopic := fmt.Sprintf(topicFormat, digest, k) + r.p2p.Encoding().ProtocolSuffix()
if err := r.p2p.PubSub().UnregisterTopicValidator(fullTopic); err != nil {
log.WithError(err).Error("Failed to unregister topic validator")
}
delete(subscriptions, k)