Use Unique Slot Time Tickers for Slasher (#9723)

This commit is contained in:
Raul Jordan
2021-10-03 02:49:01 -05:00
committed by GitHub
parent c21e43e4c5
commit f7845afa57
2 changed files with 28 additions and 12 deletions

View File

@@ -57,8 +57,10 @@ type Service struct {
blksQueue *blocksQueue
ctx context.Context
cancel context.CancelFunc
slotTicker *slots.SlotTicker
genesisTime time.Time
attsSlotTicker *slots.SlotTicker
blocksSlotTicker *slots.SlotTicker
pruningSlotTicker *slots.SlotTicker
}
// New instantiates a new slasher from configuration values.
@@ -113,26 +115,34 @@ func (s *Service) run() {
}
stateSub.Unsubscribe()
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
s.slotTicker = slots.NewSlotTicker(s.genesisTime, secondsPerSlot)
s.waitForSync(s.genesisTime)
indexedAttsChan := make(chan *ethpb.IndexedAttestation, 1)
beaconBlockHeadersChan := make(chan *ethpb.SignedBeaconBlockHeader, 1)
log.Info("Completed chain sync, starting slashing detection")
go s.processQueuedAttestations(s.ctx, s.slotTicker.C())
go s.processQueuedBlocks(s.ctx, s.slotTicker.C())
go s.receiveAttestations(s.ctx, indexedAttsChan)
go s.receiveBlocks(s.ctx, beaconBlockHeadersChan)
go s.pruneSlasherData(s.ctx, s.slotTicker.C())
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
s.attsSlotTicker = slots.NewSlotTicker(s.genesisTime, secondsPerSlot)
s.blocksSlotTicker = slots.NewSlotTicker(s.genesisTime, secondsPerSlot)
s.pruningSlotTicker = slots.NewSlotTicker(s.genesisTime, secondsPerSlot)
go s.processQueuedAttestations(s.ctx, s.attsSlotTicker.C())
go s.processQueuedBlocks(s.ctx, s.blocksSlotTicker.C())
go s.pruneSlasherData(s.ctx, s.pruningSlotTicker.C())
}
// Stop the slasher service.
func (s *Service) Stop() error {
s.cancel()
if s.slotTicker != nil {
s.slotTicker.Done()
if s.attsSlotTicker != nil {
s.attsSlotTicker.Done()
}
if s.blocksSlotTicker != nil {
s.blocksSlotTicker.Done()
}
if s.pruningSlotTicker != nil {
s.pruningSlotTicker.Done()
}
return nil
}
@@ -146,9 +156,11 @@ func (s *Service) waitForSync(genesisTime time.Time) {
if slots.SinceGenesis(genesisTime) == 0 || !s.serviceCfg.SyncChecker.Syncing() {
return
}
slotTicker := slots.NewSlotTicker(s.genesisTime, params.BeaconConfig().SecondsPerSlot)
defer slotTicker.Done()
for {
select {
case <-s.slotTicker.C():
case <-slotTicker.C():
// If node is still syncing, do not operate slasher.
if s.serviceCfg.SyncChecker.Syncing() {
continue

View File

@@ -59,7 +59,9 @@ func TestService_StartStop_ChainStartEvent(t *testing.T) {
Data: &statefeed.ChainStartedData{StartTime: time.Now()},
})
time.Sleep(time.Millisecond * 100)
srv.slotTicker = &slots.SlotTicker{}
srv.attsSlotTicker = &slots.SlotTicker{}
srv.blocksSlotTicker = &slots.SlotTicker{}
srv.pruningSlotTicker = &slots.SlotTicker{}
require.NoError(t, srv.Stop())
require.NoError(t, srv.Status())
require.LogsContain(t, hook, "received chain start event")
@@ -92,7 +94,9 @@ func TestService_StartStop_ChainAlreadyInitialized(t *testing.T) {
Data: &statefeed.InitializedData{StartTime: time.Now()},
})
time.Sleep(time.Millisecond * 100)
srv.slotTicker = &slots.SlotTicker{}
srv.attsSlotTicker = &slots.SlotTicker{}
srv.blocksSlotTicker = &slots.SlotTicker{}
srv.pruningSlotTicker = &slots.SlotTicker{}
require.NoError(t, srv.Stop())
require.NoError(t, srv.Status())
require.LogsContain(t, hook, "chain already initialized")