Compare commits

...

1 Commits

Author SHA1 Message Date
rauljordan
61b663d2ad fix up faulty slasher ticker 2021-10-03 00:10:26 -04:00
3 changed files with 49 additions and 23 deletions

View File

@@ -76,10 +76,14 @@ func (s *Service) receiveBlocks(ctx context.Context, beaconBlockHeadersChan chan
// these attestations from a queue, then group them all by validator chunk index.
// This grouping will allow us to perform detection on batches of attestations
// per validator chunk index which can be done concurrently.
func (s *Service) processQueuedAttestations(ctx context.Context, slotTicker <-chan types.Slot) {
func (s *Service) processQueuedAttestations(ctx context.Context) {
tick := make(chan types.Slot, 1)
defer close(tick)
sub := s.slotTickFeed.Subscribe(tick)
defer sub.Unsubscribe()
for {
select {
case currentSlot := <-slotTicker:
case currentSlot := <-tick:
attestations := s.attsQueue.dequeue()
currentEpoch := slots.ToEpoch(currentSlot)
// We take all the attestations in the queue and filter out
@@ -134,10 +138,14 @@ func (s *Service) processQueuedAttestations(ctx context.Context, slotTicker <-ch
// Process queued blocks every time an epoch ticker fires. We retrieve
// these blocks from a queue, then perform double proposal detection.
func (s *Service) processQueuedBlocks(ctx context.Context, slotTicker <-chan types.Slot) {
func (s *Service) processQueuedBlocks(ctx context.Context) {
tick := make(chan types.Slot, 1)
defer close(tick)
sub := s.slotTickFeed.Subscribe(tick)
defer sub.Unsubscribe()
for {
select {
case currentSlot := <-slotTicker:
case currentSlot := <-tick:
blocks := s.blksQueue.dequeue()
currentEpoch := slots.ToEpoch(currentSlot)
@@ -174,10 +182,14 @@ func (s *Service) processQueuedBlocks(ctx context.Context, slotTicker <-chan typ
}
// Prunes slasher data on each slot tick to prevent unnecessary build-up of disk space usage.
func (s *Service) pruneSlasherData(ctx context.Context, slotTicker <-chan types.Slot) {
func (s *Service) pruneSlasherData(ctx context.Context) {
tick := make(chan types.Slot, 1)
defer close(tick)
sub := s.slotTickFeed.Subscribe(tick)
defer sub.Unsubscribe()
for {
select {
case <-slotTicker:
case <-tick:
headEpoch := slots.ToEpoch(s.serviceCfg.HeadStateFetcher.HeadSlot())
if err := s.pruneSlasherDataWithinSlidingWindow(ctx, headEpoch); err != nil {
log.WithError(err).Error("Could not prune slasher data")

View File

@@ -57,8 +57,8 @@ type Service struct {
blksQueue *blocksQueue
ctx context.Context
cancel context.CancelFunc
slotTicker *slots.SlotTicker
genesisTime time.Time
slotTickFeed *event.Feed
}
// New instantiates a new slasher from configuration values.
@@ -67,12 +67,13 @@ func New(ctx context.Context, srvCfg *ServiceConfig) (*Service, error) {
return &Service{
params: DefaultParams(),
serviceCfg: srvCfg,
indexedAttsChan: make(chan *ethpb.IndexedAttestation, 1),
beaconBlockHeadersChan: make(chan *ethpb.SignedBeaconBlockHeader, 1),
indexedAttsChan: make(chan *ethpb.IndexedAttestation, 100),
beaconBlockHeadersChan: make(chan *ethpb.SignedBeaconBlockHeader, 100),
attsQueue: newAttestationsQueue(),
blksQueue: newBlocksQueue(),
ctx: ctx,
cancel: cancel,
slotTickFeed: new(event.Feed),
}, nil
}
@@ -112,28 +113,26 @@ func (s *Service) run() {
return
}
stateSub.Unsubscribe()
secondsPerSlot := params.BeaconConfig().SecondsPerSlot
s.slotTicker = slots.NewSlotTicker(s.genesisTime, secondsPerSlot)
go s.tickSlots()
stateSub.Unsubscribe()
s.waitForSync(s.genesisTime)
log.Info("Completed chain sync, starting slashing detection")
indexedAttsChan := make(chan *ethpb.IndexedAttestation, 1)
beaconBlockHeadersChan := make(chan *ethpb.SignedBeaconBlockHeader, 1)
log.Info("Completed chain sync, starting slashing detection")
go s.processQueuedAttestations(s.ctx, s.slotTicker.C())
go s.processQueuedBlocks(s.ctx, s.slotTicker.C())
go s.receiveAttestations(s.ctx, indexedAttsChan)
go s.receiveBlocks(s.ctx, beaconBlockHeadersChan)
go s.pruneSlasherData(s.ctx, s.slotTicker.C())
go s.processQueuedAttestations(s.ctx)
go s.processQueuedBlocks(s.ctx)
go s.pruneSlasherData(s.ctx)
}
// Stop the slasher service.
func (s *Service) Stop() error {
s.cancel()
if s.slotTicker != nil {
s.slotTicker.Done()
}
return nil
}
@@ -142,13 +141,31 @@ func (s *Service) Status() error {
return nil
}
// Tick slots and notify over event feed.
func (s *Service) tickSlots() {
slotsTicker := slots.NewSlotTicker(s.genesisTime, params.BeaconConfig().SecondsPerSlot)
defer slotsTicker.Done()
for {
select {
case slot := <-slotsTicker.C():
s.slotTickFeed.Send(slot)
case <-s.ctx.Done():
return
}
}
}
func (s *Service) waitForSync(genesisTime time.Time) {
if slots.SinceGenesis(genesisTime) == 0 || !s.serviceCfg.SyncChecker.Syncing() {
return
}
tick := make(chan time.Time, 1)
defer close(tick)
sub := s.slotTickFeed.Subscribe(tick)
defer sub.Unsubscribe()
for {
select {
case <-s.slotTicker.C():
case <-tick:
// If node is still syncing, do not operate slasher.
if s.serviceCfg.SyncChecker.Syncing() {
continue

View File

@@ -15,7 +15,6 @@ import (
mockSync "github.com/prysmaticlabs/prysm/beacon-chain/sync/initial-sync/testing"
"github.com/prysmaticlabs/prysm/testing/require"
"github.com/prysmaticlabs/prysm/testing/util"
"github.com/prysmaticlabs/prysm/time/slots"
"github.com/sirupsen/logrus"
logTest "github.com/sirupsen/logrus/hooks/test"
)
@@ -59,7 +58,6 @@ func TestService_StartStop_ChainStartEvent(t *testing.T) {
Data: &statefeed.ChainStartedData{StartTime: time.Now()},
})
time.Sleep(time.Millisecond * 100)
srv.slotTicker = &slots.SlotTicker{}
require.NoError(t, srv.Stop())
require.NoError(t, srv.Status())
require.LogsContain(t, hook, "received chain start event")
@@ -92,7 +90,6 @@ func TestService_StartStop_ChainAlreadyInitialized(t *testing.T) {
Data: &statefeed.InitializedData{StartTime: time.Now()},
})
time.Sleep(time.Millisecond * 100)
srv.slotTicker = &slots.SlotTicker{}
require.NoError(t, srv.Stop())
require.NoError(t, srv.Status())
require.LogsContain(t, hook, "chain already initialized")