Use SlotTickerWithOffset for StreamIndexedAttestations (#5999)

* Change streamindexed to slot ticker with offset
* Make 2/3rds
* Add test
* Merge branch 'master' of github.com:prysmaticlabs/prysm into ticker-offset
* Add check for offset
* Merge branch 'master' of github.com:prysmaticlabs/prysm into ticker-offset
* Fix test
* Merge refs/heads/master into ticker-offset
This commit is contained in:
Ivan Martinez
2020-05-26 15:34:09 -04:00
committed by GitHub
parent 0f7cf212a2
commit 5afee4ba3a
4 changed files with 51 additions and 4 deletions

View File

@@ -5,7 +5,6 @@ import (
"sort"
"strconv"
"strings"
"time"
ptypes "github.com/gogo/protobuf/types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
@@ -332,11 +331,11 @@ func (bs *Server) StreamIndexedAttestations(
// already being done by the attestation pool in the operations service.
func (bs *Server) collectReceivedAttestations(ctx context.Context) {
attsByRoot := make(map[[32]byte][]*ethpb.Attestation)
halfASlot := slotutil.DivideSlotBy(2 /* 1/2 slot duration */)
ticker := time.NewTicker(halfASlot)
twoThirdsASlot := 2 * slotutil.DivideSlotBy(3) /* 2/3 slot duration */
ticker := slotutil.GetSlotTickerWithOffset(bs.GenesisTimeFetcher.GenesisTime(), twoThirdsASlot, params.BeaconConfig().SecondsPerSlot)
for {
select {
case <-ticker.C:
case _ = <-ticker.C():
aggregatedAttsByTarget := make(map[[32]byte][]*ethpb.Attestation)
for root, atts := range attsByRoot {
// We aggregate the received attestations, we know they all have the same data root.

View File

@@ -983,6 +983,9 @@ func TestServer_StreamIndexedAttestations_ContextCanceled(t *testing.T) {
server := &Server{
Ctx: ctx,
AttestationNotifier: chainService.OperationNotifier(),
GenesisTimeFetcher: &chainMock.ChainService{
Genesis: time.Now(),
},
}
exitRoutine := make(chan bool)

View File

@@ -51,6 +51,23 @@ func GetSlotTicker(genesisTime time.Time, secondsPerSlot uint64) *SlotTicker {
return ticker
}
// GetSlotTickerWithOffset is a constructor for SlotTicker that allows a offset of time from genesis,
// entering a offset greater than secondsPerSlot is not allowed.
func GetSlotTickerWithOffset(genesisTime time.Time, offset time.Duration, secondsPerSlot uint64) *SlotTicker {
if genesisTime.Unix() == 0 {
panic("zero genesis time")
}
if offset > time.Duration(secondsPerSlot)*time.Second {
panic("invalid ticker offset")
}
ticker := &SlotTicker{
c: make(chan uint64),
done: make(chan struct{}),
}
ticker.start(genesisTime.Add(offset), secondsPerSlot, roughtime.Since, roughtime.Until, time.After)
return ticker
}
func (s *SlotTicker) start(
genesisTime time.Time,
secondsPerSlot uint64,

View File

@@ -109,3 +109,31 @@ func TestSlotTickerGenesis(t *testing.T) {
t.Fatalf("Expected %d, got %d", 1, slot)
}
}
func TestGetSlotTickerWithOffset_OK(t *testing.T) {
genesisTime := time.Now()
secondsPerSlot := uint64(4)
offset := time.Duration(secondsPerSlot/2) * time.Second
offsetTicker := GetSlotTickerWithOffset(genesisTime, offset, secondsPerSlot)
normalTicker := GetSlotTicker(genesisTime, secondsPerSlot)
firstTicked := 0
for {
select {
case _ = <-offsetTicker.C():
if firstTicked != 1 {
t.Fatal("Expected other ticker to tick first")
}
firstTicked = 2
return
case _ = <-normalTicker.C():
if firstTicked != 0 {
t.Fatal("Expected normal ticker to tick first")
}
firstTicked = 1
break
}
}
}