Implement Precise Ticker For Slot Interval (#635)

This commit is contained in:
Yutaro Mori
2018-10-11 01:17:48 +09:00
committed by Raul Jordan
parent efeb6976d1
commit 724ae3c999
14 changed files with 282 additions and 102 deletions

View File

@@ -7,7 +7,6 @@ go_library(
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/db:go_default_library",
"//beacon-chain/params:go_default_library",
"//beacon-chain/powchain:go_default_library",
"//beacon-chain/types:go_default_library",
"//beacon-chain/utils:go_default_library",

View File

@@ -8,7 +8,6 @@ import (
"time"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/params"
"github.com/prysmaticlabs/prysm/beacon-chain/powchain"
"github.com/prysmaticlabs/prysm/beacon-chain/types"
"github.com/prysmaticlabs/prysm/beacon-chain/utils"
@@ -31,8 +30,8 @@ type ChainService struct {
canonicalCrystallizedStateFeed *event.Feed
blocksPendingProcessing [][32]byte
lock sync.Mutex
genesisTimestamp time.Time
slotAlignmentDuration uint64
genesisTime time.Time
slotTicker utils.SlotTicker
enableCrossLinks bool
enableRewardChecking bool
enableAttestationValidity bool
@@ -70,7 +69,6 @@ func NewChainService(ctx context.Context, cfg *Config) (*ChainService, error) {
enableCrossLinks: cfg.EnableCrossLinks,
enableRewardChecking: cfg.EnableRewardChecking,
enableAttestationValidity: cfg.EnableAttestationValidity,
slotAlignmentDuration: params.GetConfig().SlotDuration,
}, nil
}
@@ -80,32 +78,23 @@ func (c *ChainService) Start() {
// to truly continue across sessions.
log.Info("Starting service")
genesis, err := c.beaconDB.GetCanonicalBlockForSlot(0)
var err error
c.genesisTime, err = c.beaconDB.GetGenesisTime()
if err != nil {
log.Fatalf("Could not get genesis block: %v", err)
}
c.genesisTimestamp, err = genesis.Timestamp()
if err != nil {
log.Fatalf("Could not get genesis timestamp: %v", err)
log.Fatal(err)
return
}
// If the genesis time was at 12:00:00PM and the current time is 12:00:03PM,
// the next slot should tick at 12:00:08PM. We can accomplish this
// using utils.BlockingWait and passing in the desired
// slot duration.
//
// Instead of utilizing SlotDuration from config, we utilize a property of
// RPC service struct so this value can be set to 0 seconds
// as a parameter in tests. Otherwise, tests would sleep.
utils.BlockingWait(time.Duration(c.slotAlignmentDuration) * time.Second)
go c.updateHead(time.NewTicker(time.Second * time.Duration(params.GetConfig().SlotDuration)).C)
c.slotTicker = utils.GetSlotTicker(c.genesisTime)
go c.updateHead(c.slotTicker.C())
go c.blockProcessing()
}
// Stop the blockchain service's main event loop and associated goroutines.
func (c *ChainService) Stop() error {
defer c.cancel()
c.slotTicker.Done()
log.Info("Stopping service")
return nil
}
@@ -144,13 +133,13 @@ func (c *ChainService) doesPoWBlockExist(block *types.Block) bool {
// at an in-memory slice of block hashes pending processing and
// selects the best block according to the in-protocol fork choice
// rule as canonical. This block is then persisted to storage.
func (c *ChainService) updateHead(slotInterval <-chan time.Time) {
func (c *ChainService) updateHead(slotInterval <-chan uint64) {
for {
select {
case <-c.ctx.Done():
return
case <-slotInterval:
log.WithField("slotNumber", utils.CurrentSlot(c.genesisTimestamp)).Info("New beacon slot")
case slot := <-slotInterval:
log.WithField("slotNumber", slot).Info("New beacon slot")
// First, we check if there were any blocks processed in the previous slot.
// If there is, we fetch the first one from the DB.
@@ -299,7 +288,7 @@ func (c *ChainService) blockProcessing() {
cState,
parent.SlotNumber(),
c.enableAttestationValidity,
c.genesisTimestamp,
c.genesisTime,
); !valid {
log.Debugf("Block failed validity conditions: %v", err)
continue

View File

@@ -267,9 +267,9 @@ func TestUpdateHeadEmpty(t *testing.T) {
})
exitRoutine := make(chan bool)
timeChan := make(chan time.Time)
slotChan := make(chan uint64)
go func() {
chainService.updateHead(timeChan)
chainService.updateHead(slotChan)
<-exitRoutine
}()
@@ -279,7 +279,7 @@ func TestUpdateHeadEmpty(t *testing.T) {
// If blocks pending processing is empty, the updateHead routine does nothing.
chainService.blocksPendingProcessing = [][32]byte{}
timeChan <- time.Now()
slotChan <- 0
chainService.cancel()
exitRoutine <- true
@@ -292,9 +292,9 @@ func TestUpdateHeadNoBlock(t *testing.T) {
defer chainService.beaconDB.Close()
exitRoutine := make(chan bool)
timeChan := make(chan time.Time)
slotChan := make(chan uint64)
go func() {
chainService.updateHead(timeChan)
chainService.updateHead(slotChan)
<-exitRoutine
}()
@@ -309,7 +309,7 @@ func TestUpdateHeadNoBlock(t *testing.T) {
}
chainService.blocksPendingProcessing = [][32]byte{}
chainService.blocksPendingProcessing = append(chainService.blocksPendingProcessing, fakeBlockHash)
timeChan <- time.Now()
slotChan <- 0
chainService.cancel()
exitRoutine <- true
@@ -322,9 +322,9 @@ func TestUpdateHeadNoParent(t *testing.T) {
defer chainService.beaconDB.Close()
exitRoutine := make(chan bool)
timeChan := make(chan time.Time)
slotChan := make(chan uint64)
go func() {
chainService.updateHead(timeChan)
chainService.updateHead(slotChan)
<-exitRoutine
}()
@@ -345,7 +345,7 @@ func TestUpdateHeadNoParent(t *testing.T) {
chainService.blocksPendingProcessing = [][32]byte{}
chainService.blocksPendingProcessing = append(chainService.blocksPendingProcessing, noParentBlockHash)
timeChan <- time.Now()
slotChan <- 0
chainService.cancel()
exitRoutine <- true
@@ -384,9 +384,9 @@ func TestUpdateHead(t *testing.T) {
}
exitRoutine := make(chan bool)
timeChan := make(chan time.Time)
slotChan := make(chan uint64)
go func() {
chainService.updateHead(timeChan)
chainService.updateHead(slotChan)
<-exitRoutine
}()
@@ -397,7 +397,7 @@ func TestUpdateHead(t *testing.T) {
// If blocks pending processing is empty, the updateHead routine does nothing.
chainService.blocksPendingProcessing = [][32]byte{}
chainService.blocksPendingProcessing = append(chainService.blocksPendingProcessing, hash)
timeChan <- time.Now()
slotChan <- 0
chainService.cancel()
exitRoutine <- true

View File

@@ -3,6 +3,7 @@ package db
import (
"errors"
"fmt"
"time"
"github.com/gogo/protobuf/proto"
"github.com/prysmaticlabs/prysm/beacon-chain/types"
@@ -112,6 +113,20 @@ func (db *BeaconDB) GetCanonicalBlockForSlot(slotNumber uint64) (*types.Block, e
return block, err
}
// GetGenesisTime returns the timestamp for the genesis block
func (db *BeaconDB) GetGenesisTime() (time.Time, error) {
genesis, err := db.GetCanonicalBlockForSlot(0)
if err != nil {
return time.Time{}, fmt.Errorf("Could not get genesis block: %v", err)
}
genesisTime, err := genesis.Timestamp()
if err != nil {
return time.Time{}, fmt.Errorf("Could not get genesis timestamp: %v", err)
}
return genesisTime, nil
}
// GetSimulatedBlock retrieves the last block broadcast by the simulator.
func (db *BeaconDB) GetSimulatedBlock() (*types.Block, error) {
enc, err := db.get(simulatedBlockKey)

View File

@@ -173,3 +173,43 @@ func TestGetBlockBySlotNumber(t *testing.T) {
t.Fatal("there should be an error because block does not exist in the db")
}
}
func TestGetGenesisTime(t *testing.T) {
beaconDB := startInMemoryBeaconDB(t)
defer beaconDB.Close()
time, err := beaconDB.GetGenesisTime()
if err != nil {
t.Fatalf("GetGenesisTime failed: %v", err)
}
time2, err := beaconDB.GetGenesisTime()
if err != nil {
t.Fatalf("GetGenesisTime failed on second attempt: %v", err)
}
if time != time2 {
t.Fatalf("Expected GetGenesisTime to always return same value: %v %v", time, time2)
}
}
func TestGetGenesisTimeFailure(t *testing.T) {
beaconDB := startInMemoryBeaconDB(t)
defer beaconDB.Close()
genesisBlock, err := beaconDB.GetCanonicalBlockForSlot(0)
if err != nil {
t.Fatalf("Failed to get genesis block: %v", err)
}
hash, _ := genesisBlock.Hash()
err = beaconDB.removeBlock(hash)
if err != nil {
t.Fatalf("Failed to remove genesis block: %v", err)
}
if _, err := beaconDB.GetGenesisTime(); err == nil {
t.Fatalf("Expected GetGenesisTime to fail")
}
}

View File

@@ -36,7 +36,7 @@ type Simulator struct {
enablePOWChain bool
delay time.Duration
slotNum uint64
genesisTimestamp time.Time
slotTicker utils.SlotTicker
broadcastedBlocks map[[32]byte]*types.Block
broadcastedBlockHashes [][32]byte
blockRequestChan chan p2p.Message
@@ -60,6 +60,7 @@ type beaconDB interface {
GetCrystallizedState() *types.CrystallizedState
GetCanonicalBlockForSlot(uint64) (*types.Block, error)
GetCanonicalBlock() (*types.Block, error)
GetGenesisTime() (time.Time, error)
}
// DefaultConfig options for the simulator.
@@ -91,22 +92,21 @@ func NewSimulator(ctx context.Context, cfg *Config) *Simulator {
// Start the sim.
func (sim *Simulator) Start() {
log.Info("Starting service")
genesis, err := sim.beaconDB.GetCanonicalBlockForSlot(0)
genesisTime, err := sim.beaconDB.GetGenesisTime()
if err != nil {
log.Fatalf("Could not get genesis block: %v", err)
}
sim.genesisTimestamp, err = genesis.Timestamp()
if err != nil {
log.Fatalf("Could not get genesis timestamp: %v", err)
log.Fatal(err)
return
}
go sim.run(time.NewTicker(sim.delay).C)
sim.slotTicker = utils.GetSlotTicker(genesisTime)
go sim.run(sim.slotTicker.C())
}
// Stop the sim.
func (sim *Simulator) Stop() error {
defer sim.cancel()
log.Info("Stopping service")
sim.slotTicker.Done()
// Persist the last simulated block in the DB for future sessions
// to continue from the last simulated slot number.
if len(sim.broadcastedBlockHashes) > 0 {
@@ -133,7 +133,7 @@ func (sim *Simulator) lastSimulatedSessionBlock() (*types.Block, error) {
return simulatedBlock, nil
}
func (sim *Simulator) run(delayChan <-chan time.Time) {
func (sim *Simulator) run(slotInterval <-chan uint64) {
blockReqSub := sim.p2p.Subscribe(&pb.BeaconBlockRequest{}, sim.blockRequestChan)
defer blockReqSub.Unsubscribe()
@@ -158,7 +158,7 @@ func (sim *Simulator) run(delayChan <-chan time.Time) {
case <-sim.ctx.Done():
log.Debug("Simulator context closed, exiting goroutine")
return
case <-delayChan:
case slot := <-slotInterval:
activeStateHash, err := sim.beaconDB.GetActiveState().Hash()
if err != nil {
log.Errorf("Could not fetch active state hash: %v", err)
@@ -198,14 +198,8 @@ func (sim *Simulator) run(delayChan <-chan time.Time) {
powChainRef = []byte{byte(sim.slotNum)}
}
blockSlot := utils.CurrentSlot(sim.genesisTimestamp)
if blockSlot == 0 {
// cannot process a genesis block, so we start from 1
blockSlot = 1
}
block := types.NewBlock(&pb.BeaconBlock{
Slot: blockSlot,
Slot: slot,
Timestamp: ptypes.TimestampNow(),
PowChainRef: powChainRef,
ActiveStateRoot: activeStateHash[:],

View File

@@ -80,7 +80,7 @@ func TestBroadcastBlockHash(t *testing.T) {
hook := logTest.NewGlobal()
sim := setupSimulator(t)
delayChan := make(chan time.Time)
delayChan := make(chan uint64)
exitRoutine := make(chan bool)
go func() {
@@ -88,7 +88,7 @@ func TestBroadcastBlockHash(t *testing.T) {
<-exitRoutine
}()
delayChan <- time.Time{}
delayChan <- 0
sim.cancel()
exitRoutine <- true
@@ -104,7 +104,7 @@ func TestBlockRequest(t *testing.T) {
hook := logTest.NewGlobal()
sim := setupSimulator(t)
delayChan := make(chan time.Time)
delayChan := make(chan uint64)
exitRoutine := make(chan bool)
go func() {

View File

@@ -140,9 +140,9 @@ func (b *Block) Timestamp() (time.Time, error) {
}
// isSlotValid compares the slot to the system clock to determine if the block is valid.
func (b *Block) isSlotValid(genesisTimestamp time.Time) bool {
func (b *Block) isSlotValid(genesisTime time.Time) bool {
slotDuration := time.Duration(b.SlotNumber()*params.GetConfig().SlotDuration) * time.Second
validTimeThreshold := genesisTimestamp.Add(slotDuration)
validTimeThreshold := genesisTime.Add(slotDuration)
return clock.Now().After(validTimeThreshold)
}
@@ -155,7 +155,7 @@ func (b *Block) IsValid(
cState *CrystallizedState,
parentSlot uint64,
enableAttestationValidity bool,
genesisTimestamp time.Time) bool {
genesisTime time.Time) bool {
_, err := b.Hash()
if err != nil {
log.Errorf("Could not hash incoming block: %v", err)
@@ -167,7 +167,7 @@ func (b *Block) IsValid(
return false
}
if !b.isSlotValid(genesisTimestamp) {
if !b.isSlotValid(genesisTime) {
log.Errorf("Slot of block is too high: %d", b.SlotNumber())
return false
}

View File

@@ -6,7 +6,7 @@ go_library(
"clock.go",
"flags.go",
"shuffle.go",
"slot_interval.go",
"slot_ticker.go",
],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/utils",
visibility = ["//beacon-chain:__subpackages__"],
@@ -23,6 +23,7 @@ go_test(
srcs = [
"clock_test.go",
"shuffle_test.go",
"slot_ticker_test.go",
],
embed = [":go_default_library"],
deps = [

View File

@@ -1,10 +1,7 @@
package utils
import (
"math"
"time"
"github.com/prysmaticlabs/prysm/beacon-chain/params"
)
// Clock represents a time providing interface that can be mocked for testing.
@@ -19,19 +16,3 @@ type RealClock struct{}
func (RealClock) Now() time.Time {
return time.Now()
}
// CurrentBeaconSlot based on the seconds since genesis.
func CurrentBeaconSlot() uint64 {
secondsSinceGenesis := time.Since(params.GetConfig().GenesisTime).Seconds()
return uint64(math.Floor(secondsSinceGenesis/float64(params.GetConfig().SlotDuration))) - 1
}
// BlockingWait sleeps until a specific time is reached after
// a certain duration. For example, if the genesis block
// was at 12:00:00PM and the current time is 12:00:03PM,
// we want the next slot to tick at 12:00:08PM so we can use
// this helper method to achieve that purpose.
func BlockingWait(duration time.Duration) {
d := time.Until(time.Now().Add(duration).Truncate(duration))
time.Sleep(d)
}

View File

@@ -13,7 +13,4 @@ func TestRealClockIsAccurate(t *testing.T) {
if clockTime != actualTime {
t.Errorf("The time from the Clock interface should equal the actual time. Got: %v, Expected: %v", clockTime, actualTime)
}
BlockingWait(0)
CurrentBeaconSlot()
}

View File

@@ -1,17 +0,0 @@
package utils
import (
"time"
"github.com/prysmaticlabs/prysm/beacon-chain/params"
)
// CurrentSlot returns slot number based on the genesis timestamp.
func CurrentSlot(genesisTime time.Time) uint64 {
secondsSinceGenesis := uint64(time.Since(genesisTime).Seconds())
currentSlot := secondsSinceGenesis / params.GetConfig().SlotDuration
if currentSlot < 1 {
return 0
}
return currentSlot - 1
}

View File

@@ -0,0 +1,79 @@
package utils
import (
"time"
"github.com/prysmaticlabs/prysm/beacon-chain/params"
)
// SlotTicker is a special ticker for the beacon chain block.
// The channel emits over the slot interval, and ensures that
// the ticks are in line with the genesis time. This means that
// the duration between the ticks and the genesis time are always a
// multiple of the slot duration.
// In addition, the channel returns the new slot number.
type SlotTicker struct {
c chan uint64
done chan struct{}
}
// C returns the ticker channel. Call Cancel afterwards to ensure
// that the goroutine exits cleanly.
func (s *SlotTicker) C() <-chan uint64 {
return s.c
}
// Done should be called to clean up the ticker.
func (s *SlotTicker) Done() {
go func() {
s.done <- struct{}{}
}()
}
// GetSlotTicker is the constructor for SlotTicker
func GetSlotTicker(genesisTime time.Time) SlotTicker {
ticker := SlotTicker{
c: make(chan uint64),
done: make(chan struct{}),
}
ticker.start(genesisTime, params.GetConfig().SlotDuration, time.Since, time.Until, time.After)
return ticker
}
func (s *SlotTicker) start(
genesisTime time.Time,
slotDuration uint64,
since func(time.Time) time.Duration,
until func(time.Time) time.Duration,
after func(time.Duration) <-chan time.Time) {
d := time.Duration(slotDuration) * time.Second
go func() {
sinceGenesis := since(genesisTime)
var nextTickTime time.Time
var slot uint64
if sinceGenesis < 0 {
// Handle when the current time is before the genesis time
nextTickTime = genesisTime
slot = 0
} else {
nextTick := sinceGenesis.Truncate(d) + d
nextTickTime = genesisTime.Add(nextTick)
slot = uint64(nextTick / d)
}
for {
waitTime := until(nextTickTime)
select {
case <-after(waitTime):
s.c <- slot
slot++
nextTickTime = nextTickTime.Add(d)
case <-s.done:
return
}
}
}()
}

View File

@@ -0,0 +1,102 @@
package utils
import (
"testing"
"time"
)
func TestSlotTicker(t *testing.T) {
ticker := SlotTicker{
c: make(chan uint64),
done: make(chan struct{}),
}
defer ticker.Done()
var sinceDuration time.Duration
since := func(time.Time) time.Duration {
return sinceDuration
}
var untilDuration time.Duration
until := func(time.Time) time.Duration {
return untilDuration
}
var tick chan time.Time
after := func(time.Duration) <-chan time.Time {
return tick
}
genesisTime := time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)
slotDuration := uint64(8)
// Test when the ticker starts immediately after genesis time.
sinceDuration = 1 * time.Second
untilDuration = 7 * time.Second
// Make this a buffered channel to prevent a deadlock since
// the other goroutine calls a function in this goroutine.
tick = make(chan time.Time, 2)
ticker.start(genesisTime, slotDuration, since, until, after)
// Tick once
tick <- time.Now()
slot := <-ticker.C()
if slot != 1 {
t.Fatalf("Expected 1, got %d", slot)
}
// Tick twice
tick <- time.Now()
slot = <-ticker.C()
if slot != 2 {
t.Fatalf("Expected 2, got %d", slot)
}
}
func TestSlotTickerGenesis(t *testing.T) {
ticker := SlotTicker{
c: make(chan uint64),
done: make(chan struct{}),
}
defer ticker.Done()
var sinceDuration time.Duration
since := func(time.Time) time.Duration {
return sinceDuration
}
var untilDuration time.Duration
until := func(time.Time) time.Duration {
return untilDuration
}
var tick chan time.Time
after := func(time.Duration) <-chan time.Time {
return tick
}
genesisTime := time.Date(2018, 1, 1, 0, 0, 0, 0, time.UTC)
slotDuration := uint64(8)
// Test when the ticker starts before genesis time.
sinceDuration = -1 * time.Second
untilDuration = 1 * time.Second
// Make this a buffered channel to prevent a deadlock since
// the other goroutine calls a function in this goroutine.
tick = make(chan time.Time, 2)
ticker.start(genesisTime, slotDuration, since, until, after)
// Tick once
tick <- time.Now()
slot := <-ticker.C()
if slot != 0 {
t.Fatalf("Expected 0, got %d", slot)
}
// Tick twice
tick <- time.Now()
slot = <-ticker.C()
if slot != 1 {
t.Fatalf("Expected 1, got %d", slot)
}
}