diff --git a/beacon-chain/operations/synccommittee/message.go b/beacon-chain/operations/synccommittee/message.go index 5d0eb2c66a..01f50592d7 100644 --- a/beacon-chain/operations/synccommittee/message.go +++ b/beacon-chain/operations/synccommittee/message.go @@ -30,8 +30,22 @@ func (s *Store) SaveSyncCommitteeMessage(msg *ethpb.SyncCommitteeMessage) error return errors.New("not typed []ethpb.SyncCommitteeMessage") } - messages = append(messages, copied) - savedSyncCommitteeMessageTotal.Inc() + idx := -1 + for i, msg := range messages { + if msg.ValidatorIndex == copied.ValidatorIndex { + idx = i + break + } + } + if idx >= 0 { + // Override the existing messages with a new one + messages[idx] = copied + } else { + // Append the new message + messages = append(messages, copied) + savedSyncCommitteeMessageTotal.Inc() + } + return s.messageCache.Push(&queue.Item{ Key: syncCommitteeKey(msg.Slot), Value: messages, diff --git a/beacon-chain/sync/metrics.go b/beacon-chain/sync/metrics.go index ed3e8d89ab..41d160c86f 100644 --- a/beacon-chain/sync/metrics.go +++ b/beacon-chain/sync/metrics.go @@ -125,6 +125,14 @@ var ( Help: "Time to verify gossiped blocks", }, ) + + // Sync committee verification performance. + syncMessagesForUnkownBlocks = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "sync_committee_messages_unnkown_root", + Help: "The number of sync committee messages that are checked against DB to see if there vote is for an unknown root", + }, + ) ) func (s *Service) updateMetrics() { diff --git a/beacon-chain/sync/validate_sync_committee_message.go b/beacon-chain/sync/validate_sync_committee_message.go index 80a0e3c49d..9ffea67c02 100644 --- a/beacon-chain/sync/validate_sync_committee_message.go +++ b/beacon-chain/sync/validate_sync_committee_message.go @@ -90,7 +90,7 @@ func (s *Service) validateSyncCommitteeMessage( ctx, ignoreEmptyCommittee(committeeIndices), s.rejectIncorrectSyncCommittee(committeeIndices, *msg.Topic), - s.ignoreHasSeenSyncMsg(m, committeeIndices), + s.ignoreHasSeenSyncMsg(ctx, m, committeeIndices), s.rejectInvalidSyncCommitteeSignature(m), ); result != pubsub.ValidationAccept { return result, err @@ -123,24 +123,45 @@ func (s *Service) markSyncCommitteeMessagesSeen(committeeIndices []primitives.Co subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount for _, idx := range committeeIndices { subnet := uint64(idx) / subCommitteeSize - s.setSeenSyncMessageIndexSlot(m.Slot, m.ValidatorIndex, subnet) + s.setSeenSyncMessageIndexSlot(m, subnet) } } // Returns true if the node has received sync committee for the validator with index and slot. -func (s *Service) hasSeenSyncMessageIndexSlot(slot primitives.Slot, valIndex primitives.ValidatorIndex, subCommitteeIndex uint64) bool { +func (s *Service) hasSeenSyncMessageIndexSlot(ctx context.Context, m *ethpb.SyncCommitteeMessage, subCommitteeIndex uint64) bool { s.seenSyncMessageLock.RLock() defer s.seenSyncMessageLock.RUnlock() - _, seen := s.seenSyncMessageCache.Get(seenSyncCommitteeKey(slot, valIndex, subCommitteeIndex)) - return seen + rt, seen := s.seenSyncMessageCache.Get(seenSyncCommitteeKey(m.Slot, m.ValidatorIndex, subCommitteeIndex)) + if !seen { + // return early if this is the first message + return false + } + root, ok := rt.([32]byte) + if !ok { + return true // Impossible. Return true to be safe + } + if !s.cfg.chain.InForkchoice(root) && !s.cfg.beaconDB.HasBlock(ctx, root) { + syncMessagesForUnkownBlocks.Inc() + return true + } + msgRoot := [32]byte(m.BlockRoot) + if !s.cfg.chain.InForkchoice(msgRoot) && !s.cfg.beaconDB.HasBlock(ctx, msgRoot) { + syncMessagesForUnkownBlocks.Inc() + return false + } + headRoot := s.cfg.chain.CachedHeadRoot() + if root == headRoot { + return true + } + return msgRoot != headRoot } // Set sync committee message validator index and slot as seen. -func (s *Service) setSeenSyncMessageIndexSlot(slot primitives.Slot, valIndex primitives.ValidatorIndex, subCommitteeIndex uint64) { +func (s *Service) setSeenSyncMessageIndexSlot(m *ethpb.SyncCommitteeMessage, subCommitteeIndex uint64) { s.seenSyncMessageLock.Lock() defer s.seenSyncMessageLock.Unlock() - key := seenSyncCommitteeKey(slot, valIndex, subCommitteeIndex) - s.seenSyncMessageCache.Add(key, true) + key := seenSyncCommitteeKey(m.Slot, m.ValidatorIndex, subCommitteeIndex) + s.seenSyncMessageCache.Add(key, [32]byte(m.BlockRoot)) } // The `subnet_id` is valid for the given validator. This implies the validator is part of the broader @@ -184,7 +205,7 @@ func (s *Service) rejectIncorrectSyncCommittee( // There has been no other valid sync committee signature for the declared `slot`, `validator_index`, // and `subcommittee_index`. In the event of `validator_index` belongs to multiple subnets, as long // as one subnet has not been seen, we should let it in. -func (s *Service) ignoreHasSeenSyncMsg( +func (s *Service) ignoreHasSeenSyncMsg(ctx context.Context, m *ethpb.SyncCommitteeMessage, committeeIndices []primitives.CommitteeIndex, ) validationFn { return func(ctx context.Context) (pubsub.ValidationResult, error) { @@ -192,7 +213,7 @@ func (s *Service) ignoreHasSeenSyncMsg( subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount for _, idx := range committeeIndices { subnet := uint64(idx) / subCommitteeSize - if !s.hasSeenSyncMessageIndexSlot(m.Slot, m.ValidatorIndex, subnet) { + if !s.hasSeenSyncMessageIndexSlot(ctx, m, subnet) { isValid = true break } diff --git a/beacon-chain/sync/validate_sync_committee_message_test.go b/beacon-chain/sync/validate_sync_committee_message_test.go index e091829f56..26c714d62a 100644 --- a/beacon-chain/sync/validate_sync_committee_message_test.go +++ b/beacon-chain/sync/validate_sync_committee_message_test.go @@ -144,8 +144,12 @@ func TestService_ValidateSyncCommitteeMessage(t *testing.T) { s.cfg.stateGen = stategen.New(beaconDB, doublylinkedtree.New()) s.cfg.beaconDB = beaconDB s.initCaches() - - s.setSeenSyncMessageIndexSlot(1, 1, 0) + m := ðpb.SyncCommitteeMessage{ + Slot: 1, + ValidatorIndex: 1, + BlockRoot: params.BeaconConfig().ZeroHash[:], + } + s.setSeenSyncMessageIndexSlot(m, 0) return s, topic, startup.NewClock(time.Now(), [32]byte{}) }, args: args{ @@ -441,10 +445,15 @@ func TestService_ignoreHasSeenSyncMsg(t *testing.T) { name: "has seen", setupSvc: func(s *Service, msg *ethpb.SyncCommitteeMessage, topic string) (*Service, string) { s.initCaches() - s.setSeenSyncMessageIndexSlot(1, 0, 0) + m := ðpb.SyncCommitteeMessage{ + Slot: 1, + BlockRoot: params.BeaconConfig().ZeroHash[:], + } + s.setSeenSyncMessageIndexSlot(m, 0) return s, "" }, - msg: ðpb.SyncCommitteeMessage{ValidatorIndex: 0, Slot: 1}, + msg: ðpb.SyncCommitteeMessage{ValidatorIndex: 0, Slot: 1, + BlockRoot: params.BeaconConfig().ZeroHash[:]}, committee: []primitives.CommitteeIndex{1, 2, 3}, want: pubsub.ValidationIgnore, }, @@ -452,19 +461,26 @@ func TestService_ignoreHasSeenSyncMsg(t *testing.T) { name: "has not seen", setupSvc: func(s *Service, msg *ethpb.SyncCommitteeMessage, topic string) (*Service, string) { s.initCaches() - s.setSeenSyncMessageIndexSlot(1, 0, 0) + m := ðpb.SyncCommitteeMessage{ + Slot: 1, + BlockRoot: params.BeaconConfig().ZeroHash[:], + } + s.setSeenSyncMessageIndexSlot(m, 0) return s, "" }, - msg: ðpb.SyncCommitteeMessage{ValidatorIndex: 1, Slot: 1}, + msg: ðpb.SyncCommitteeMessage{ValidatorIndex: 1, Slot: 1, + BlockRoot: bytesutil.PadTo([]byte{'A'}, 32)}, committee: []primitives.CommitteeIndex{1, 2, 3}, want: pubsub.ValidationAccept, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s := &Service{} + s := &Service{ + cfg: &config{chain: &mockChain.ChainService{}}, + } s, _ = tt.setupSvc(s, tt.msg, "") - f := s.ignoreHasSeenSyncMsg(tt.msg, tt.committee) + f := s.ignoreHasSeenSyncMsg(context.Background(), tt.msg, tt.committee) result, err := f(context.Background()) _ = err require.Equal(t, tt.want, result)