From 8bffae1316f0d3db85241f9fb85eb0cc2e9c83b2 Mon Sep 17 00:00:00 2001 From: Yutaro Mori Date: Mon, 15 Oct 2018 00:29:57 +0900 Subject: [PATCH] Various cleanup and bugfixes around validator/RBC interaction (#657) --- beacon-chain/blockchain/BUILD.bazel | 1 - beacon-chain/blockchain/service.go | 52 ++--- beacon-chain/blockchain/service_test.go | 146 ++------------ beacon-chain/casper/BUILD.bazel | 1 + beacon-chain/casper/validator.go | 19 +- beacon-chain/casper/validator_test.go | 6 +- beacon-chain/rpc/service.go | 20 +- beacon-chain/simulator/BUILD.bazel | 3 +- beacon-chain/simulator/service.go | 5 +- beacon-chain/sync/service.go | 10 +- beacon-chain/types/crystallized_state.go | 5 +- beacon-chain/utils/BUILD.bazel | 2 - shared/slotticker/BUILD.bazel | 14 ++ .../slotticker/slotticker.go | 26 ++- .../slotticker/slotticker_test.go | 25 ++- shared/testutil/log.go | 29 --- validator/attester/service.go | 8 +- validator/beacon/BUILD.bazel | 3 +- validator/beacon/service.go | 178 +++++++++--------- validator/beacon/service_test.go | 74 ++++---- validator/node/node.go | 5 +- validator/params/config.go | 4 +- validator/proposer/service.go | 3 +- validator/proposer/service_test.go | 4 +- 24 files changed, 251 insertions(+), 392 deletions(-) create mode 100644 shared/slotticker/BUILD.bazel rename beacon-chain/utils/slot_ticker.go => shared/slotticker/slotticker.go (72%) rename beacon-chain/utils/slot_ticker_test.go => shared/slotticker/slotticker_test.go (79%) diff --git a/beacon-chain/blockchain/BUILD.bazel b/beacon-chain/blockchain/BUILD.bazel index 5d19ed78be..8cd2a13263 100644 --- a/beacon-chain/blockchain/BUILD.bazel +++ b/beacon-chain/blockchain/BUILD.bazel @@ -9,7 +9,6 @@ go_library( "//beacon-chain/db:go_default_library", "//beacon-chain/powchain:go_default_library", "//beacon-chain/types:go_default_library", - "//beacon-chain/utils:go_default_library", "//shared/event:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", ], diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index 8408e0bbf2..2c1dc1132d 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -4,13 +4,11 @@ package blockchain import ( "context" "fmt" - "sync" "time" "github.com/prysmaticlabs/prysm/beacon-chain/db" "github.com/prysmaticlabs/prysm/beacon-chain/powchain" "github.com/prysmaticlabs/prysm/beacon-chain/types" - "github.com/prysmaticlabs/prysm/beacon-chain/utils" "github.com/prysmaticlabs/prysm/shared/event" "github.com/sirupsen/logrus" ) @@ -26,12 +24,10 @@ type ChainService struct { web3Service *powchain.Web3Service incomingBlockFeed *event.Feed incomingBlockChan chan *types.Block + processedBlockChan chan *types.Block canonicalBlockFeed *event.Feed canonicalCrystallizedStateFeed *event.Feed - blocksPendingProcessing [][32]byte - lock sync.Mutex genesisTime time.Time - slotTicker utils.SlotTicker enableCrossLinks bool enableRewardChecking bool enableAttestationValidity bool @@ -61,10 +57,10 @@ func NewChainService(ctx context.Context, cfg *Config) (*ChainService, error) { beaconDB: cfg.BeaconDB, web3Service: cfg.Web3Service, incomingBlockChan: make(chan *types.Block, cfg.IncomingBlockBuf), + processedBlockChan: make(chan *types.Block), incomingBlockFeed: new(event.Feed), canonicalBlockFeed: new(event.Feed), canonicalCrystallizedStateFeed: new(event.Feed), - blocksPendingProcessing: [][32]byte{}, enablePOWChain: cfg.EnablePOWChain, enableCrossLinks: cfg.EnableCrossLinks, enableRewardChecking: cfg.EnableRewardChecking, @@ -83,15 +79,13 @@ func (c *ChainService) Start() { return } - c.slotTicker = utils.GetSlotTicker(c.genesisTime) - go c.updateHead(c.slotTicker.C()) - go c.blockProcessing() + go c.updateHead(c.processedBlockChan) + go c.blockProcessing(c.processedBlockChan) } // Stop the blockchain service's main event loop and associated goroutines. func (c *ChainService) Stop() error { defer c.cancel() - c.slotTicker.Done() log.Info("Stopping service") return nil @@ -131,27 +125,13 @@ func (c *ChainService) doesPoWBlockExist(block *types.Block) bool { // at an in-memory slice of block hashes pending processing and // selects the best block according to the in-protocol fork choice // rule as canonical. This block is then persisted to storage. -func (c *ChainService) updateHead(slotInterval <-chan uint64) { +func (c *ChainService) updateHead(processedBlock <-chan *types.Block) { for { select { case <-c.ctx.Done(): return - case slot := <-slotInterval: - log.WithField("slotNumber", slot).Info("New beacon slot") - - // First, we check if there were any blocks processed in the previous slot. - // If there is, we fetch the first one from the DB. - if len(c.blocksPendingProcessing) == 0 { - continue - } - - // Naive fork choice rule: we pick the first block we processed for the previous slot - // as canonical. - block, err := c.beaconDB.GetBlock(c.blocksPendingProcessing[0]) - if err != nil { - log.Errorf("Could not get block: %v", err) - continue - } + case block := <-processedBlock: + log.WithField("slot", block.SlotNumber()).Info("New beacon slot") h, err := block.Hash() if err != nil { @@ -171,7 +151,8 @@ func (c *ChainService) updateHead(slotInterval <-chan uint64) { aState := c.beaconDB.GetActiveState() var stateTransitioned bool - for cState.IsCycleTransition(parentBlock.SlotNumber()) { + for cState.IsCycleTransition(block.SlotNumber()) { + log.Infof("Recalculating crystallized state") cState, aState, err = cState.NewStateRecalculations( aState, block, @@ -229,17 +210,11 @@ func (c *ChainService) updateHead(slotInterval <-chan uint64) { c.canonicalCrystallizedStateFeed.Send(cState) } c.canonicalBlockFeed.Send(block) - - // Clear the blocks pending processing, mutex lock for thread safety - // in updating this slice. - c.lock.Lock() - c.blocksPendingProcessing = [][32]byte{} - c.lock.Unlock() } } } -func (c *ChainService) blockProcessing() { +func (c *ChainService) blockProcessing(processedBlock chan<- *types.Block) { subBlock := c.incomingBlockFeed.Subscribe(c.incomingBlockChan) defer subBlock.Unsubscribe() for { @@ -299,11 +274,8 @@ func (c *ChainService) blockProcessing() { log.Infof("Finished processing received block: %#x", blockHash) - // We push the hash of the block we just stored to a pending processing - // slice the fork choice rule will utilize. - c.lock.Lock() - c.blocksPendingProcessing = append(c.blocksPendingProcessing, blockHash) - c.lock.Unlock() + // Push the block to trigger the fork choice rule + processedBlock <- block } } } diff --git a/beacon-chain/blockchain/service_test.go b/beacon-chain/blockchain/service_test.go index 396ef382d4..2950b596e4 100644 --- a/beacon-chain/blockchain/service_test.go +++ b/beacon-chain/blockchain/service_test.go @@ -3,7 +3,6 @@ package blockchain import ( "context" "errors" - "fmt" "io/ioutil" "math" "math/big" @@ -136,9 +135,10 @@ func TestRunningChainServiceFaultyPOWChain(t *testing.T) { PowChainRef: []byte("a"), }) + blockChan := make(chan *types.Block) exitRoutine := make(chan bool) go func() { - chainService.blockProcessing() + chainService.blockProcessing(blockChan) <-exitRoutine }() @@ -202,10 +202,11 @@ func TestRunningChainService(t *testing.T) { AncestorHashes: [][]byte{{}}, }) + blockChan := make(chan *types.Block) exitRoutine := make(chan bool) t.Log([][]byte{parentHash[:]}) go func() { - chainService.blockProcessing() + chainService.blockProcessing(blockChan) <-exitRoutine }() @@ -215,9 +216,10 @@ func TestRunningChainService(t *testing.T) { chainService.incomingBlockChan <- blockNoParent chainService.incomingBlockChan <- block + <-blockChan chainService.cancel() exitRoutine <- true - testutil.WaitForLog(t, hook, "Chain service context closed, exiting goroutine") + testutil.AssertLogsContain(t, hook, "Chain service context closed, exiting goroutine") testutil.AssertLogsContain(t, hook, "Block points to nil parent") testutil.AssertLogsContain(t, hook, "Finished processing received block") } @@ -239,119 +241,6 @@ func TestDoesPOWBlockExist(t *testing.T) { testutil.AssertLogsContain(t, hook, "fetching PoW block corresponding to mainchain reference failed") } -func TestUpdateHeadEmpty(t *testing.T) { - hook := logTest.NewGlobal() - chainService := setupBeaconChain(t, false) - defer chainService.beaconDB.Close() - - active := types.NewGenesisActiveState() - crystallized, err := types.NewGenesisCrystallizedState("") - if err != nil { - t.Fatalf("Can't generate genesis state: %v", err) - } - ActiveStateRoot, _ := active.Hash() - CrystallizedStateRoot, _ := crystallized.Hash() - - genesis := types.NewGenesisBlock(ActiveStateRoot, CrystallizedStateRoot) - genesisHash, err := genesis.Hash() - if err != nil { - t.Fatalf("Could not get genesis block hash: %v", err) - } - - block := types.NewBlock(&pb.BeaconBlock{ - Slot: 64, - ActiveStateRoot: ActiveStateRoot[:], - CrystallizedStateRoot: CrystallizedStateRoot[:], - AncestorHashes: [][]byte{genesisHash[:]}, - PowChainRef: []byte("a"), - }) - - exitRoutine := make(chan bool) - slotChan := make(chan uint64) - go func() { - chainService.updateHead(slotChan) - <-exitRoutine - }() - - if err := chainService.beaconDB.SaveBlock(block); err != nil { - t.Fatal(err) - } - - // If blocks pending processing is empty, the updateHead routine does nothing. - chainService.blocksPendingProcessing = [][32]byte{} - slotChan <- 0 - chainService.cancel() - exitRoutine <- true - - testutil.AssertLogsDoNotContain(t, hook, "Applying fork choice rule") -} - -func TestUpdateHeadNoBlock(t *testing.T) { - hook := logTest.NewGlobal() - chainService := setupBeaconChain(t, false) - defer chainService.beaconDB.Close() - - exitRoutine := make(chan bool) - slotChan := make(chan uint64) - go func() { - chainService.updateHead(slotChan) - <-exitRoutine - }() - - // If blocks pending processing contains a hash of a block that does not exist - // in persistent storage, we expect an error log to be thrown as - // that is unexpected behavior given the block should have been saved during - // processing. - fakeBlock := types.NewBlock(&pb.BeaconBlock{Slot: 100}) - fakeBlockHash, err := fakeBlock.Hash() - if err != nil { - t.Fatal(err) - } - chainService.blocksPendingProcessing = [][32]byte{} - chainService.blocksPendingProcessing = append(chainService.blocksPendingProcessing, fakeBlockHash) - slotChan <- 0 - chainService.cancel() - exitRoutine <- true - - testutil.AssertLogsContain(t, hook, "Could not get block") -} - -func TestUpdateHeadNoParent(t *testing.T) { - hook := logTest.NewGlobal() - chainService := setupBeaconChain(t, false) - defer chainService.beaconDB.Close() - - exitRoutine := make(chan bool) - slotChan := make(chan uint64) - go func() { - chainService.updateHead(slotChan) - <-exitRoutine - }() - - // non-existent parent hash should log an error in updateHead. - noParentBlock := types.NewBlock(&pb.BeaconBlock{ - Slot: 64, - AncestorHashes: [][]byte{{}}, - }) - noParentBlockHash, err := noParentBlock.Hash() - if err != nil { - t.Fatalf("Could not hash block: %v", err) - } - - if err := chainService.beaconDB.SaveBlock(noParentBlock); err != nil { - t.Fatal(err) - } - - chainService.blocksPendingProcessing = [][32]byte{} - chainService.blocksPendingProcessing = append(chainService.blocksPendingProcessing, noParentBlockHash) - - slotChan <- 0 - chainService.cancel() - exitRoutine <- true - - testutil.AssertLogsContain(t, hook, fmt.Sprintf("Failed to get parent of block %#x", noParentBlockHash)) -} - func TestUpdateHead(t *testing.T) { hook := logTest.NewGlobal() chainService := setupBeaconChain(t, false) @@ -378,15 +267,11 @@ func TestUpdateHead(t *testing.T) { AncestorHashes: [][]byte{genesisHash[:]}, PowChainRef: []byte("a"), }) - hash, err := block.Hash() - if err != nil { - t.Fatalf("Could not hash block: %v", err) - } exitRoutine := make(chan bool) - slotChan := make(chan uint64) + blockChan := make(chan *types.Block) go func() { - chainService.updateHead(slotChan) + chainService.updateHead(blockChan) <-exitRoutine }() @@ -395,9 +280,7 @@ func TestUpdateHead(t *testing.T) { } // If blocks pending processing is empty, the updateHead routine does nothing. - chainService.blocksPendingProcessing = [][32]byte{} - chainService.blocksPendingProcessing = append(chainService.blocksPendingProcessing, hash) - slotChan <- 0 + blockChan <- block chainService.cancel() exitRoutine <- true @@ -443,13 +326,19 @@ func TestProcessBlocksWithCorrectAttestations(t *testing.T) { }}, }) + blockChan := make(chan *types.Block) exitRoutine := make(chan bool) go func() { - chainService.blockProcessing() + chainService.blockProcessing(blockChan) <-exitRoutine }() chainService.incomingBlockChan <- block1 + block1Returned := <-blockChan + + if block1 != block1Returned { + t.Fatalf("expected %v and %v to be the same", block1, block1Returned) + } block1Hash, err := block1.Hash() if err != nil { @@ -484,8 +373,11 @@ func TestProcessBlocksWithCorrectAttestations(t *testing.T) { }}) chainService.incomingBlockChan <- block1 + <-blockChan chainService.incomingBlockChan <- block2 + <-blockChan chainService.incomingBlockChan <- block3 + <-blockChan chainService.cancel() exitRoutine <- true diff --git a/beacon-chain/casper/BUILD.bazel b/beacon-chain/casper/BUILD.bazel index 668d7e6ee4..0ea6779be9 100644 --- a/beacon-chain/casper/BUILD.bazel +++ b/beacon-chain/casper/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "//beacon-chain/params:go_default_library", "//beacon-chain/utils:go_default_library", "//proto/beacon/p2p/v1:go_default_library", + "//proto/beacon/rpc/v1:go_default_library", "//shared/bitutil:go_default_library", "//shared/mathutil:go_default_library", "@com_github_ethereum_go_ethereum//common:go_default_library", diff --git a/beacon-chain/casper/validator.go b/beacon-chain/casper/validator.go index 14971f80d0..10b457f4af 100644 --- a/beacon-chain/casper/validator.go +++ b/beacon-chain/casper/validator.go @@ -6,6 +6,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/params" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" + pbrpc "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1" "github.com/prysmaticlabs/prysm/shared/bitutil" ) @@ -143,27 +144,29 @@ func ValidatorShardID(pubKey []byte, validators []*pb.ValidatorRecord, shardComm return 0, fmt.Errorf("can't find shard ID for validator with public key %#x", pubKey) } -// ValidatorSlotAndResponsibility returns a validator's assingned slot number +// ValidatorSlotAndRole returns a validator's assingned slot number // and whether it should act as an attester or proposer. -func ValidatorSlotAndResponsibility(pubKey []byte, validators []*pb.ValidatorRecord, shardCommittees []*pb.ShardAndCommitteeArray) (uint64, string, error) { +func ValidatorSlotAndRole(pubKey []byte, validators []*pb.ValidatorRecord, shardCommittees []*pb.ShardAndCommitteeArray) (uint64, pbrpc.ValidatorRole, error) { index, err := ValidatorIndex(pubKey, validators) if err != nil { - return 0, "", err + return 0, pbrpc.ValidatorRole_UNKNOWN, err } for slot, slotCommittee := range shardCommittees { for i, committee := range slotCommittee.ArrayShardAndCommittee { for v, validator := range committee.Committee { - if i == 0 && v == slot%len(committee.Committee) && validator == index { - return uint64(slot), "proposer", nil + if validator != index { + continue } - if validator == index { - return uint64(slot), "attester", nil + if i == 0 && v == slot%len(committee.Committee) { + return uint64(slot), pbrpc.ValidatorRole_PROPOSER, nil } + + return uint64(slot), pbrpc.ValidatorRole_ATTESTER, nil } } } - return 0, "", fmt.Errorf("can't find slot number for validator with public key %#x", pubKey) + return 0, pbrpc.ValidatorRole_UNKNOWN, fmt.Errorf("can't find slot number for validator with public key %#x", pubKey) } // TotalActiveValidatorDeposit returns the total deposited amount in Gwei for all active validators. diff --git a/beacon-chain/casper/validator_test.go b/beacon-chain/casper/validator_test.go index 50443de222..9b623b5265 100644 --- a/beacon-chain/casper/validator_test.go +++ b/beacon-chain/casper/validator_test.go @@ -238,12 +238,12 @@ func TestValidatorSlotAndResponsibility(t *testing.T) { {Shard: 8, Committee: []uint32{54, 55, 56, 57, 58, 59}}, }}, } - if _, _, err := ValidatorSlotAndResponsibility([]byte("100"), validators, shardCommittees); err == nil { + if _, _, err := ValidatorSlotAndRole([]byte("100"), validators, shardCommittees); err == nil { t.Fatalf("ValidatorSlot should have failed, there's no validator with pubkey 100") } validators[59].Pubkey = []byte("100") - slot, _, err := ValidatorSlotAndResponsibility([]byte("100"), validators, shardCommittees) + slot, _, err := ValidatorSlotAndRole([]byte("100"), validators, shardCommittees) if err != nil { t.Fatalf("call ValidatorSlot failed: %v", err) } @@ -252,7 +252,7 @@ func TestValidatorSlotAndResponsibility(t *testing.T) { } validators[60].Pubkey = []byte("101") - if _, _, err := ValidatorSlotAndResponsibility([]byte("101"), validators, shardCommittees); err == nil { + if _, _, err := ValidatorSlotAndRole([]byte("101"), validators, shardCommittees); err == nil { t.Fatalf("ValidatorSlot should have failed, validator indexed at 60 is not in the committee") } } diff --git a/beacon-chain/rpc/service.go b/beacon-chain/rpc/service.go index 637ef6e12d..74a984fa7c 100644 --- a/beacon-chain/rpc/service.go +++ b/beacon-chain/rpc/service.go @@ -292,7 +292,6 @@ func (s *Service) ValidatorShardID(ctx context.Context, req *pb.PublicKey) (*pb. shardID, err := casper.ValidatorShardID( req.PublicKey, - cState.Validators(), cState.ShardAndCommitteesForSlots(), ) @@ -308,7 +307,7 @@ func (s *Service) ValidatorShardID(ctx context.Context, req *pb.PublicKey) (*pb. func (s *Service) ValidatorSlotAndResponsibility(ctx context.Context, req *pb.PublicKey) (*pb.SlotResponsibilityResponse, error) { cState := s.beaconDB.GetCrystallizedState() - slot, responsibility, err := casper.ValidatorSlotAndResponsibility( + slot, role, err := casper.ValidatorSlotAndRole( req.PublicKey, cState.Validators(), cState.ShardAndCommitteesForSlots(), @@ -317,13 +316,6 @@ func (s *Service) ValidatorSlotAndResponsibility(ctx context.Context, req *pb.Pu return nil, fmt.Errorf("could not get assigned validator slot for attester/proposer: %v", err) } - var role pb.ValidatorRole - if responsibility == "proposer" { - role = pb.ValidatorRole_PROPOSER - } else if responsibility == "attester" { - role = pb.ValidatorRole_ATTESTER - } - return &pb.SlotResponsibilityResponse{Slot: slot, Role: role}, nil } @@ -350,6 +342,7 @@ func (s *Service) ValidatorIndex(ctx context.Context, req *pb.PublicKey) (*pb.In func (s *Service) ValidatorAssignments( req *pb.ValidatorAssignmentRequest, stream pb.BeaconService_ValidatorAssignmentsServer) error { + sub := s.chainService.CanonicalCrystallizedStateFeed().Subscribe(s.canonicalStateChan) defer sub.Unsubscribe() for { @@ -403,7 +396,7 @@ func assignmentsForPublicKeys(keys []*pb.PublicKey, cState *types.CrystallizedSt // For the corresponding public key and current crystallized state, // we determine the assigned slot for the validator and whether it // should act as a proposer or attester. - assignedSlot, responsibility, err := casper.ValidatorSlotAndResponsibility( + assignedSlot, role, err := casper.ValidatorSlotAndRole( val.GetPublicKey(), cState.Validators(), cState.ShardAndCommitteesForSlots(), @@ -412,13 +405,6 @@ func assignmentsForPublicKeys(keys []*pb.PublicKey, cState *types.CrystallizedSt return nil, err } - var role pb.ValidatorRole - if responsibility == "proposer" { - role = pb.ValidatorRole_PROPOSER - } else { - role = pb.ValidatorRole_ATTESTER - } - // We determine the assigned shard ID for the validator // based on a public key and current crystallized state. shardID, err := casper.ValidatorShardID( diff --git a/beacon-chain/simulator/BUILD.bazel b/beacon-chain/simulator/BUILD.bazel index bd6b9c28a3..68db35071a 100644 --- a/beacon-chain/simulator/BUILD.bazel +++ b/beacon-chain/simulator/BUILD.bazel @@ -6,11 +6,12 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/beacon-chain/simulator", visibility = ["//beacon-chain:__subpackages__"], deps = [ + "//beacon-chain/params:go_default_library", "//beacon-chain/types:go_default_library", - "//beacon-chain/utils:go_default_library", "//proto/beacon/p2p/v1:go_default_library", "//shared/event:go_default_library", "//shared/p2p:go_default_library", + "//shared/slotticker:go_default_library", "@com_github_golang_protobuf//proto:go_default_library", "@com_github_golang_protobuf//ptypes:go_default_library_gen", "@com_github_sirupsen_logrus//:go_default_library", diff --git a/beacon-chain/simulator/service.go b/beacon-chain/simulator/service.go index 83746b8ef8..cef127c2f4 100644 --- a/beacon-chain/simulator/service.go +++ b/beacon-chain/simulator/service.go @@ -9,11 +9,12 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" + "github.com/prysmaticlabs/prysm/beacon-chain/params" "github.com/prysmaticlabs/prysm/beacon-chain/types" - "github.com/prysmaticlabs/prysm/beacon-chain/utils" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/event" "github.com/prysmaticlabs/prysm/shared/p2p" + "github.com/prysmaticlabs/prysm/shared/slotticker" "github.com/sirupsen/logrus" ) @@ -82,7 +83,7 @@ func (sim *Simulator) Start() { return } - slotTicker := utils.GetSlotTicker(genesisTime) + slotTicker := slotticker.GetSlotTicker(genesisTime, params.GetConfig().SlotDuration) go func() { sim.run(slotTicker.C(), sim.blockRequestChan) close(sim.blockRequestChan) diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index a8f380f54e..8f01981b75 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -229,13 +229,7 @@ func (ss *Service) receiveBlock(msg p2p.Message) { // Verify attestation coming from proposer then forward block to the subscribers. attestation := types.NewAttestation(response.Attestation) cState := ss.db.GetCrystallizedState() - parentBlock, err := ss.db.GetBlock(block.ParentHash()) - if err != nil { - log.Errorf("Failed to get parent slot: %v", err) - return - } - parentSlot := parentBlock.SlotNumber() - proposerShardID, _, err := casper.ProposerShardAndIndex(cState.ShardAndCommitteesForSlots(), cState.LastStateRecalculationSlot(), parentSlot) + proposerShardID, _, err := casper.ProposerShardAndIndex(cState.ShardAndCommitteesForSlots(), cState.LastStateRecalculationSlot(), block.SlotNumber()) if err != nil { log.Errorf("Failed to get proposer shard ID: %v", err) return @@ -319,7 +313,7 @@ func (ss *Service) receiveAttestation(msg p2p.Message) { } validatorExists := attestation.ContainsValidator(a.AttesterBitfield()) if validatorExists { - log.Debugf("Received attestation 0x%v already", h) + log.Debugf("Received attestation %#x already", h) return } } diff --git a/beacon-chain/types/crystallized_state.go b/beacon-chain/types/crystallized_state.go index 8878c041c5..fbf0cb0a92 100644 --- a/beacon-chain/types/crystallized_state.go +++ b/beacon-chain/types/crystallized_state.go @@ -196,10 +196,7 @@ func (c *CrystallizedState) DepositsPenalizedInPeriod() []uint32 { // IsCycleTransition checks if a new cycle has been reached. At that point, // a new crystallized state and active state transition will occur. func (c *CrystallizedState) IsCycleTransition(slotNumber uint64) bool { - if c.LastStateRecalculationSlot() == 0 && slotNumber == params.GetConfig().CycleLength-1 { - return true - } - return slotNumber >= c.LastStateRecalculationSlot()+params.GetConfig().CycleLength-1 + return slotNumber >= c.LastStateRecalculationSlot()+params.GetConfig().CycleLength } // isValidatorSetChange checks if a validator set change transition can be processed. At that point, diff --git a/beacon-chain/utils/BUILD.bazel b/beacon-chain/utils/BUILD.bazel index 4a5719830e..e06387a9ce 100644 --- a/beacon-chain/utils/BUILD.bazel +++ b/beacon-chain/utils/BUILD.bazel @@ -6,7 +6,6 @@ go_library( "clock.go", "flags.go", "shuffle.go", - "slot_ticker.go", ], importpath = "github.com/prysmaticlabs/prysm/beacon-chain/utils", visibility = ["//beacon-chain:__subpackages__"], @@ -23,7 +22,6 @@ go_test( srcs = [ "clock_test.go", "shuffle_test.go", - "slot_ticker_test.go", ], embed = [":go_default_library"], deps = [ diff --git a/shared/slotticker/BUILD.bazel b/shared/slotticker/BUILD.bazel new file mode 100644 index 0000000000..a6881343a4 --- /dev/null +++ b/shared/slotticker/BUILD.bazel @@ -0,0 +1,14 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["slotticker.go"], + importpath = "github.com/prysmaticlabs/prysm/shared/slotticker", + visibility = ["//visibility:public"], +) + +go_test( + name = "go_default_test", + srcs = ["slotticker_test.go"], + embed = [":go_default_library"], +) diff --git a/beacon-chain/utils/slot_ticker.go b/shared/slotticker/slotticker.go similarity index 72% rename from beacon-chain/utils/slot_ticker.go rename to shared/slotticker/slotticker.go index 8374d1e738..e487b6f776 100644 --- a/beacon-chain/utils/slot_ticker.go +++ b/shared/slotticker/slotticker.go @@ -1,9 +1,7 @@ -package utils +package slotticker import ( "time" - - "github.com/prysmaticlabs/prysm/beacon-chain/params" ) // SlotTicker is a special ticker for the beacon chain block. @@ -30,23 +28,39 @@ func (s *SlotTicker) Done() { }() } -// GetSlotTicker is the constructor for SlotTicker -func GetSlotTicker(genesisTime time.Time) SlotTicker { +// GetSlotTicker is the constructor for SlotTicker. +func GetSlotTicker(genesisTime time.Time, slotDuration uint64) SlotTicker { ticker := SlotTicker{ c: make(chan uint64), done: make(chan struct{}), } - ticker.start(genesisTime, params.GetConfig().SlotDuration, time.Since, time.Until, time.After) + ticker.start(genesisTime, slotDuration, time.Since, time.Until, time.After) return ticker } +// CurrentSlot accepts the genesis time and returns the current time's slot. +func CurrentSlot( + genesisTime time.Time, + slotDuration uint64, + since func(time.Time) time.Duration) uint64 { + + sinceGenesis := since(genesisTime) + if sinceGenesis < 0 { + return 0 + } + + durationInSeconds := time.Duration(slotDuration) * time.Second + return uint64(sinceGenesis / durationInSeconds) +} + func (s *SlotTicker) start( genesisTime time.Time, slotDuration uint64, since func(time.Time) time.Duration, until func(time.Time) time.Duration, after func(time.Duration) <-chan time.Time) { + d := time.Duration(slotDuration) * time.Second go func() { diff --git a/beacon-chain/utils/slot_ticker_test.go b/shared/slotticker/slotticker_test.go similarity index 79% rename from beacon-chain/utils/slot_ticker_test.go rename to shared/slotticker/slotticker_test.go index 7d5fc1a008..0a84d335bf 100644 --- a/beacon-chain/utils/slot_ticker_test.go +++ b/shared/slotticker/slotticker_test.go @@ -1,4 +1,4 @@ -package utils +package slotticker import ( "testing" @@ -100,3 +100,26 @@ func TestSlotTickerGenesis(t *testing.T) { t.Fatalf("Expected 1, got %d", slot) } } + +func TestCurrentSlot(t *testing.T) { + // Test slot 0 + genesisTime := time.Now() + slot := CurrentSlot(genesisTime, 5, time.Since) + if slot != 0 { + t.Errorf("Expected 0, got: %d", slot) + } + + // Test a future genesis time + genesisTime = genesisTime.Add(3 * time.Second) + slot = CurrentSlot(genesisTime, 5, time.Since) + if slot != 0 { + t.Errorf("Expected 0, got: %d", slot) + } + + // Test slot 3 + genesisTime = genesisTime.Add(-18 * time.Second) + slot = CurrentSlot(genesisTime, 5, time.Since) + if slot != 3 { + t.Errorf("Expected 3, got: %d", slot) + } +} diff --git a/shared/testutil/log.go b/shared/testutil/log.go index 9c081aa59a..8b2054bfbb 100644 --- a/shared/testutil/log.go +++ b/shared/testutil/log.go @@ -4,7 +4,6 @@ package testutil import ( "strings" "testing" - "time" "github.com/sirupsen/logrus/hooks/test" ) @@ -37,31 +36,3 @@ func assertLogs(t *testing.T, hook *test.Hook, want string, flag bool) { t.Fatalf("unwanted log found: %s", want) } } - -// WaitForLog scans for log entries in a way that prevents -// race conditions from causing failed tests. -func WaitForLog(t *testing.T, hook *test.Hook, want string) { - t.Logf("scanning for: %s", want) - - match := false - ticker := time.NewTicker(1 * time.Second) - - for { - entries := hook.AllEntries() - if match { - ticker.Stop() - break - } - if len(ticker.C) != 0 { - ticker.Stop() - t.Fatalf("log not found: %s", want) - break - } - for _, e := range entries { - if strings.Contains(e.Message, want) { - match = true - t.Logf("log: %s", e.Message) - } - } - } -} diff --git a/validator/attester/service.go b/validator/attester/service.go index cc052488a2..f4e817a4c5 100644 --- a/validator/attester/service.go +++ b/validator/attester/service.go @@ -23,7 +23,6 @@ type rpcClientService interface { type beaconClientService interface { AttesterAssignmentFeed() *event.Feed - PublicKey() []byte } // Attester holds functionality required to run a block attester @@ -35,6 +34,7 @@ type Attester struct { rpcClientService rpcClientService assignmentChan chan *pbp2p.BeaconBlock shardID uint64 + publicKey []byte } // Config options for an attester service. @@ -43,6 +43,7 @@ type Config struct { ShardID uint64 Assigner beaconClientService Client rpcClientService + PublicKey []byte } // NewAttester creates a new attester instance. @@ -54,6 +55,7 @@ func NewAttester(ctx context.Context, cfg *Config) *Attester { beaconService: cfg.Assigner, rpcClientService: cfg.Client, shardID: cfg.ShardID, + publicKey: cfg.PublicKey, assignmentChan: make(chan *pbp2p.BeaconBlock, cfg.AssignmentBuf), } } @@ -94,7 +96,7 @@ func (a *Attester) run(attester pb.AttesterServiceClient, validator pb.Validator latestBlockHash := hashutil.Hash(data) pubKeyReq := &pb.PublicKey{ - PublicKey: a.beaconService.PublicKey(), + PublicKey: a.publicKey, } shardID, err := validator.ValidatorShardID(a.ctx, pubKeyReq) if err != nil { @@ -126,7 +128,7 @@ func (a *Attester) run(attester pb.AttesterServiceClient, validator pb.Validator log.Errorf("could not attest head: %v", err) continue } - log.Infof("Attestation proposed successfully with hash 0x%x", res.AttestationHash) + log.Infof("Attestation proposed successfully with hash %#x", res.AttestationHash) } } } diff --git a/validator/beacon/BUILD.bazel b/validator/beacon/BUILD.bazel index 96bad1a33d..adeb9baeeb 100644 --- a/validator/beacon/BUILD.bazel +++ b/validator/beacon/BUILD.bazel @@ -8,8 +8,8 @@ go_library( deps = [ "//proto/beacon/rpc/v1:go_default_library", "//shared/event:go_default_library", + "//shared/slotticker:go_default_library", "//validator/params:go_default_library", - "//validator/utils:go_default_library", "@com_github_golang_protobuf//ptypes:go_default_library_gen", "@com_github_sirupsen_logrus//:go_default_library", "@io_bazel_rules_go//proto/wkt:empty_go_proto", @@ -25,7 +25,6 @@ go_test( "//proto/beacon/rpc/v1:go_default_library", "//shared/testutil:go_default_library", "//validator/internal:go_default_library", - "//validator/params:go_default_library", "@com_github_golang_mock//gomock:go_default_library", "@com_github_golang_protobuf//ptypes:go_default_library_gen", "@com_github_sirupsen_logrus//:go_default_library", diff --git a/validator/beacon/service.go b/validator/beacon/service.go index 8cab49f750..c0e419a532 100644 --- a/validator/beacon/service.go +++ b/validator/beacon/service.go @@ -3,16 +3,16 @@ package beacon import ( "bytes" "context" + "fmt" "io" - "math" "time" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/empty" pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1" "github.com/prysmaticlabs/prysm/shared/event" + "github.com/prysmaticlabs/prysm/shared/slotticker" "github.com/prysmaticlabs/prysm/validator/params" - "github.com/prysmaticlabs/prysm/validator/utils" "github.com/sirupsen/logrus" ) @@ -35,7 +35,6 @@ type Service struct { proposerAssignmentFeed *event.Feed processedAttestationFeed *event.Feed genesisTimestamp time.Time - slotAlignmentDuration time.Duration } // NewBeaconValidator instantiates a service that interacts with a beacon node @@ -50,7 +49,6 @@ func NewBeaconValidator(ctx context.Context, pubKey []byte, rpcClient rpcClientS attesterAssignmentFeed: new(event.Feed), proposerAssignmentFeed: new(event.Feed), processedAttestationFeed: new(event.Feed), - slotAlignmentDuration: time.Duration(params.DefaultConfig().SlotDuration) * time.Second, } } @@ -69,23 +67,21 @@ func (s *Service) Start() { // Note: this does not validate the current system time against a global // NTP server, which will be important to do in production. // currently in a cycle we are supposed to participate in. - s.fetchCurrentAssignmentsAndGenesisTime(beaconServiceClient) - - // Then, we kick off a routine that uses the begins a ticker based on the beacon node's - // genesis timestamp and the validator will use this slot ticker to - // determine when it is assigned to perform proposals or attestations. - // - // We block until the current time is a multiple of params.SlotDuration - // so the validator and beacon node's internal tickers are aligned. - utils.BlockingWait(s.slotAlignmentDuration) + if err := s.fetchCurrentAssignmentsAndGenesisTime(beaconServiceClient); err != nil { + log.Error(err) + return + } // We kick off a routine that listens for stream of validator assignment coming from // beacon node. This will update validator client on which slot, shard ID and what // responsbility to perform. go s.listenForAssignmentChange(beaconServiceClient) - slotTicker := time.NewTicker(s.slotAlignmentDuration) - go s.waitForAssignment(slotTicker.C, beaconServiceClient) + slotTicker := slotticker.GetSlotTicker(s.genesisTimestamp, params.DemoConfig().SlotDuration) + go func() { + s.waitForAssignment(slotTicker.C(), beaconServiceClient) + slotTicker.Done() + }() go s.listenForProcessedAttestations(beaconServiceClient) } @@ -110,7 +106,7 @@ func (s *Service) Stop() error { // // From this, the validator client can deduce what slot interval the beacon // node is in and determine when exactly it is time to propose or attest. -func (s *Service) fetchCurrentAssignmentsAndGenesisTime(client pb.BeaconServiceClient) { +func (s *Service) fetchCurrentAssignmentsAndGenesisTime(client pb.BeaconServiceClient) error { // Currently fetches assignments for all validators. req := &pb.ValidatorAssignmentRequest{ AllValidators: true, @@ -119,32 +115,23 @@ func (s *Service) fetchCurrentAssignmentsAndGenesisTime(client pb.BeaconServiceC if err != nil { // If this RPC request fails, the entire system should fatal as it is critical for // the validator to begin this way. - log.Fatalf("could not fetch genesis time and latest canonical state from beacon node: %v", err) + return fmt.Errorf("could not fetch genesis time and latest canonical state from beacon node: %v", err) } // Determine what slot the beacon node is in by checking the number of seconds // since the genesis block. genesisTimestamp, err := ptypes.Timestamp(res.GetGenesisTimestamp()) if err != nil { - log.Fatalf("cannot compute genesis timestamp: %v", err) + return fmt.Errorf("cannot compute genesis timestamp: %v", err) } - log.Infof("Setting validator genesis time to %s", genesisTimestamp.Format(time.UnixDate)) s.genesisTimestamp = genesisTimestamp - for _, assign := range res.Assignments { - if bytes.Equal(assign.PublicKey.PublicKey, s.pubKey) { - s.role = assign.Role - // + 1 to account for the genesis block being slot 0. - s.assignedSlot = s.CurrentCycleStartSlot(params.DemoConfig().CycleLength) + assign.AssignedSlot + 1 - s.shardID = assign.ShardId - log.Infof("Validator shuffled. Pub key 0x%s re-assigned to shard ID %d for %v duty at slot %d", - string(s.pubKey), - s.shardID, - s.role, - s.assignedSlot) - } + startSlot := s.startSlot() + if err := s.assignRole(res.Assignments, startSlot); err != nil { + return fmt.Errorf("unable to assign a role: %v", err) } + return nil } // listenForAssignmentChange listens for validator assignment changes via a RPC stream. @@ -153,7 +140,7 @@ func (s *Service) listenForAssignmentChange(client pb.BeaconServiceClient) { req := &pb.ValidatorAssignmentRequest{PublicKeys: []*pb.PublicKey{{PublicKey: s.pubKey}}} stream, err := client.ValidatorAssignments(s.ctx, req) if err != nil { - log.Errorf("could not fetch validator assigned slot and responsibility from beacon node: %v", err) + log.Errorf("failed to fetch validator assignments stream: %v", err) return } for { @@ -173,55 +160,43 @@ func (s *Service) listenForAssignmentChange(client pb.BeaconServiceClient) { break } - for _, assign := range assignment.Assignments { - if bytes.Equal(assign.PublicKey.PublicKey, s.pubKey) { - s.role = assign.Role - if s.CurrentCycleStartSlot(params.DemoConfig().CycleLength) == 0 { - // +1 to account for genesis block being slot 0. - s.assignedSlot = params.DemoConfig().CycleLength + assign.AssignedSlot + 1 - } else { - s.assignedSlot = s.CurrentCycleStartSlot(params.DemoConfig().CycleLength) + assign.AssignedSlot + 1 - } - s.shardID = assign.ShardId - - log.Infof("Validator with pub key 0x%s re-assigned to shard ID %d for %v duty at slot %d", - string(s.pubKey), - s.shardID, - s.role, - s.assignedSlot) - } + startSlot := s.startSlot() + if err := s.assignRole(assignment.Assignments, startSlot); err != nil { + log.Errorf("Could not assign a role for validator: %v", err) + break } } } // waitForAssignment waits till it's validator's role to attest or propose. Then it forwards // the canonical block to the proposer service or attester service to process. -func (s *Service) waitForAssignment(ticker <-chan time.Time, client pb.BeaconServiceClient) { +func (s *Service) waitForAssignment(ticker <-chan uint64, client pb.BeaconServiceClient) { for { select { case <-s.ctx.Done(): return - case <-ticker: - currentSlot := s.CurrentBeaconSlot() - log.Infof("role: %v, assigned slot: %d, current slot: %d", s.role, s.assignedSlot, currentSlot) - if s.role == pb.ValidatorRole_ATTESTER && s.assignedSlot == currentSlot { - log.WithField("slotNumber", s.CurrentBeaconSlot()).Info("Assigned attest slot number reached") - block, err := client.CanonicalHead(s.ctx, &empty.Empty{}) - if err != nil { - log.Errorf("Could not fetch canonical head via gRPC from beacon node: %v", err) - continue - } + case slot := <-ticker: + log = log.WithField("slot", slot) + log.Infof("tick") + + // Special case: skip responsibilities if assigned to the genesis block. + if s.assignedSlot != slot || s.assignedSlot == 0 { + continue + } + + block, err := client.CanonicalHead(s.ctx, &empty.Empty{}) + if err != nil { + log.Errorf("Could not fetch canonical head via gRPC from beacon node: %v", err) + continue + } + + if s.role == pb.ValidatorRole_ATTESTER { + log.Info("Assigned attestation slot number reached") // We forward the latest canonical block to the attester service a feed. s.attesterAssignmentFeed.Send(block) - - } else if s.role == pb.ValidatorRole_PROPOSER && s.assignedSlot == currentSlot { - log.WithField("slotNumber", s.CurrentBeaconSlot()).Info("Assigned proposal slot number reached") - block, err := client.CanonicalHead(s.ctx, &empty.Empty{}) - if err != nil { - log.Errorf("Could not fetch canonical head via gRPC from beacon node: %v", err) - continue - } + } else if s.role == pb.ValidatorRole_PROPOSER { + log.Info("Assigned proposal slot number reached") // We forward the latest canonical block to the proposer service a feed. s.proposerAssignmentFeed.Send(block) } @@ -259,6 +234,52 @@ func (s *Service) listenForProcessedAttestations(client pb.BeaconServiceClient) } } +// startSlot returns the first slot of the given slot's cycle. +func (s *Service) startSlot() uint64 { + duration := params.DemoConfig().SlotDuration + cycleLength := params.DemoConfig().CycleLength + slot := slotticker.CurrentSlot(s.genesisTimestamp, duration, time.Since) + return slot - slot%cycleLength +} + +func (s *Service) assignRole(assignments []*pb.Assignment, startSlot uint64) error { + var role pb.ValidatorRole + var assignedSlot uint64 + var shardID uint64 + for _, assign := range assignments { + if !bytes.Equal(assign.PublicKey.PublicKey, s.pubKey) { + continue + } + + role = assign.Role + assignedSlot = startSlot + assign.AssignedSlot + shardID = assign.ShardId + + log.Infof("Validator shuffled. Pub key %#x assigned to shard ID %d for %v duty at slot %d", + s.pubKey, + shardID, + role, + assignedSlot) + + break + } + + if role == pb.ValidatorRole_UNKNOWN { + return fmt.Errorf("validator role was not assigned for key: %x", s.pubKey) + } + + s.role = role + s.assignedSlot = assignedSlot + s.shardID = shardID + + log = log.WithFields(logrus.Fields{ + "role": role, + "assignedSlot": assignedSlot, + "shardID": shardID, + }) + return nil +} + // AttesterAssignmentFeed returns a feed that is written to whenever it is the validator's // slot to perform attestations. func (s *Service) AttesterAssignmentFeed() *event.Feed { @@ -276,24 +297,3 @@ func (s *Service) ProposerAssignmentFeed() *event.Feed { func (s *Service) ProcessedAttestationFeed() *event.Feed { return s.processedAttestationFeed } - -// PublicKey returns validator's public key. -func (s *Service) PublicKey() []byte { - return s.pubKey -} - -// CurrentBeaconSlot based on the genesis timestamp of the protocol. -func (s *Service) CurrentBeaconSlot() uint64 { - secondsSinceGenesis := time.Since(s.genesisTimestamp).Seconds() - if secondsSinceGenesis-params.DefaultConfig().SlotDuration < 0 { - return 0 - } - return uint64(math.Floor(secondsSinceGenesis/params.DefaultConfig().SlotDuration)) - 1 -} - -// CurrentCycleStartSlot returns the slot at which the current cycle started. -func (s *Service) CurrentCycleStartSlot(cycleLength uint64) uint64 { - currentSlot := s.CurrentBeaconSlot() - cycleNum := currentSlot / cycleLength - return uint64(cycleNum) * cycleLength -} diff --git a/validator/beacon/service_test.go b/validator/beacon/service_test.go index 51557e727f..bb0a5292b6 100644 --- a/validator/beacon/service_test.go +++ b/validator/beacon/service_test.go @@ -1,7 +1,6 @@ package beacon import ( - "bytes" "context" "errors" "io" @@ -16,7 +15,6 @@ import ( pb "github.com/prysmaticlabs/prysm/proto/beacon/rpc/v1" "github.com/prysmaticlabs/prysm/shared/testutil" "github.com/prysmaticlabs/prysm/validator/internal" - "github.com/prysmaticlabs/prysm/validator/params" "github.com/sirupsen/logrus" logTest "github.com/sirupsen/logrus/hooks/test" ) @@ -56,6 +54,15 @@ func (fc *mockLifecycleClient) BeaconServiceClient() pb.BeaconServiceClient { gomock.Any(), ).Return(&pb.CurrentAssignmentsResponse{ GenesisTimestamp: ptypes.TimestampNow(), + Assignments: []*pb.Assignment{ + { + PublicKey: &pb.PublicKey{ + PublicKey: []byte{0}, + }, + ShardId: 0, + Role: pb.ValidatorRole_PROPOSER, + }, + }, }, nil) attesterStream := internal.NewMockBeaconService_LatestAttestationClient(fc.ctrl) @@ -90,10 +97,9 @@ func TestLifecycle(t *testing.T) { if b.ProcessedAttestationFeed() == nil { t.Error("ProcessedAttestationFeed empty") } - if !bytes.Equal(b.PublicKey(), []byte{}) { - t.Error("Incorrect public key") - } - b.slotAlignmentDuration = time.Millisecond * 10 + + b.pubKey = []byte{0} + b.Start() time.Sleep(time.Millisecond * 10) testutil.AssertLogsContain(t, hook, "Starting service") @@ -101,16 +107,6 @@ func TestLifecycle(t *testing.T) { testutil.AssertLogsContain(t, hook, "Stopping service") } -func TestCurrentBeaconSlot(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - b := NewBeaconValidator(context.Background(), []byte{}, &mockLifecycleClient{ctrl}) - b.genesisTimestamp = time.Now() - if b.CurrentBeaconSlot() != 0 { - t.Errorf("Expected us to be in the 0th slot, received %v", b.CurrentBeaconSlot()) - } -} - func TestWaitForAssignmentProposer(t *testing.T) { hook := logTest.NewGlobal() ctrl := gomock.NewController(t) @@ -124,16 +120,15 @@ func TestWaitForAssignmentProposer(t *testing.T) { ).Return(nil, nil) exitRoutine := make(chan bool) - timeChan := make(chan time.Time) + slotChan := make(chan uint64) go func() { - b.waitForAssignment(timeChan, mockServiceClient) + b.waitForAssignment(slotChan, mockServiceClient) <-exitRoutine }() b.role = pb.ValidatorRole_PROPOSER - b.genesisTimestamp = time.Now() - b.assignedSlot = 0 - timeChan <- time.Now() + b.assignedSlot = 1 + slotChan <- 1 b.cancel() exitRoutine <- true @@ -153,16 +148,15 @@ func TestWaitForAssignmentProposerError(t *testing.T) { ).Return(nil, errors.New("failed")) exitRoutine := make(chan bool) - timeChan := make(chan time.Time) + slotChan := make(chan uint64) go func() { - b.waitForAssignment(timeChan, mockServiceClient) + b.waitForAssignment(slotChan, mockServiceClient) <-exitRoutine }() b.role = pb.ValidatorRole_PROPOSER - b.genesisTimestamp = time.Now() - b.assignedSlot = 0 - timeChan <- time.Now() + b.assignedSlot = 1 + slotChan <- 1 b.cancel() exitRoutine <- true @@ -182,20 +176,19 @@ func TestWaitForAssignmentAttester(t *testing.T) { ).Return(nil, nil) exitRoutine := make(chan bool) - timeChan := make(chan time.Time) + slotChan := make(chan uint64) go func() { - b.waitForAssignment(timeChan, mockServiceClient) + b.waitForAssignment(slotChan, mockServiceClient) <-exitRoutine }() b.role = pb.ValidatorRole_ATTESTER - b.genesisTimestamp = time.Now() - b.assignedSlot = 0 - timeChan <- time.Now() + b.assignedSlot = 1 + slotChan <- 1 b.cancel() exitRoutine <- true - testutil.AssertLogsContain(t, hook, "Assigned attest slot number reached") + testutil.AssertLogsContain(t, hook, "Assigned attestation slot number reached") } func TestWaitForAssignmentAttesterError(t *testing.T) { @@ -211,16 +204,15 @@ func TestWaitForAssignmentAttesterError(t *testing.T) { ).Return(nil, errors.New("failed")) exitRoutine := make(chan bool) - timeChan := make(chan time.Time) + slotChan := make(chan uint64) go func() { - b.waitForAssignment(timeChan, mockServiceClient) + b.waitForAssignment(slotChan, mockServiceClient) <-exitRoutine }() b.role = pb.ValidatorRole_ATTESTER - b.genesisTimestamp = time.Now() - b.assignedSlot = 0 - timeChan <- time.Now() + b.assignedSlot = 1 + slotChan <- 1 b.cancel() exitRoutine <- true @@ -302,11 +294,10 @@ func TestListenForAssignmentProposer(t *testing.T) { stream := internal.NewMockBeaconService_ValidatorAssignmentsClient(ctrl) // Testing proposer assignment. - assignedSlot := b.CurrentCycleStartSlot(params.DefaultConfig().CycleLength) + 2 stream.EXPECT().Recv().Return(&pb.ValidatorAssignmentResponse{Assignments: []*pb.Assignment{{ PublicKey: &pb.PublicKey{PublicKey: []byte{'A'}}, ShardId: 2, - AssignedSlot: assignedSlot, + AssignedSlot: 2, Role: pb.ValidatorRole_PROPOSER}}}, nil) stream.EXPECT().Recv().Return(&pb.ValidatorAssignmentResponse{}, io.EOF) @@ -316,9 +307,11 @@ func TestListenForAssignmentProposer(t *testing.T) { gomock.Any(), ).Return(stream, nil) + b.genesisTimestamp = time.Now() + b.pubKey = []byte{'A'} b.listenForAssignmentChange(mockServiceValidator) - testutil.AssertLogsContain(t, hook, "Validator with pub key 0xA re-assigned to shard ID 2 for PROPOSER duty") + testutil.AssertLogsContain(t, hook, "Validator shuffled. Pub key 0x41 assigned to shard ID 2 for PROPOSER duty") } func TestListenForAssignmentError(t *testing.T) { @@ -357,7 +350,6 @@ func TestListenForAssignmentClientError(t *testing.T) { b.listenForAssignmentChange(mockServiceValidator) testutil.AssertLogsContain(t, hook, "stream creation failed") - testutil.AssertLogsContain(t, hook, "could not fetch validator assigned slot and responsibility from beacon node") } func TestListenForAssignmentCancelContext(t *testing.T) { diff --git a/validator/node/node.go b/validator/node/node.go index bf1a4cc88f..b8270645a8 100644 --- a/validator/node/node.go +++ b/validator/node/node.go @@ -93,7 +93,7 @@ func NewShardInstance(ctx *cli.Context) (*ShardEthereum, error) { return nil, err } - if err := shardEthereum.registerAttesterService(); err != nil { + if err := shardEthereum.registerAttesterService(pubKey); err != nil { return nil, err } @@ -199,7 +199,7 @@ func (s *ShardEthereum) registerBeaconService(pubKey []byte) error { } // registerAttesterService that listens to assignments from the beacon service. -func (s *ShardEthereum) registerAttesterService() error { +func (s *ShardEthereum) registerAttesterService(pubKey []byte) error { var beaconService *beacon.Service if err := s.services.FetchService(&beaconService); err != nil { return err @@ -214,6 +214,7 @@ func (s *ShardEthereum) registerAttesterService() error { Assigner: beaconService, AssignmentBuf: 100, Client: rpcService, + PublicKey: pubKey, }) return s.services.RegisterService(att) } diff --git a/validator/params/config.go b/validator/params/config.go index 63f697a284..0fb9f01603 100644 --- a/validator/params/config.go +++ b/validator/params/config.go @@ -32,7 +32,7 @@ func DefaultCollationSizeLimit() int64 { // Config contains configs for node to participate in the sharded universe. type Config struct { - CollationSizeLimit int64 // CollationSizeLimit is the maximum size the serialized blobs in a collation can take. - SlotDuration float64 // SlotDuration in seconds. + CollationSizeLimit int64 // CollationSizeLimit is the maximum size the serialized blobs in a collation can take. + SlotDuration uint64 // SlotDuration in seconds. CycleLength uint64 } diff --git a/validator/proposer/service.go b/validator/proposer/service.go index 63618eb4d4..64689528cf 100644 --- a/validator/proposer/service.go +++ b/validator/proposer/service.go @@ -77,7 +77,6 @@ func (p *Proposer) Start() { go p.run(p.ctx.Done(), client) go p.processAttestation(p.ctx.Done()) - } // Stop the main loop. @@ -183,7 +182,7 @@ func (p *Proposer) run(done <-chan struct{}, client pb.ProposerServiceClient) { continue } - log.Infof("Block proposed successfully with hash 0x%x", res.BlockHash) + log.Infof("Block proposed successfully with hash %#x", res.BlockHash) p.pendingAttestation = nil p.lock.Unlock() } diff --git a/validator/proposer/service_test.go b/validator/proposer/service_test.go index ae7df3ffcd..efbd9d13ae 100644 --- a/validator/proposer/service_test.go +++ b/validator/proposer/service_test.go @@ -134,7 +134,7 @@ func TestProposerReceiveBeaconBlock(t *testing.T) { exitRoutine <- true testutil.AssertLogsContain(t, hook, "Performing proposer responsibility") - testutil.AssertLogsContain(t, hook, fmt.Sprintf("Block proposed successfully with hash 0x%x", []byte("hi"))) + testutil.AssertLogsContain(t, hook, fmt.Sprintf("Block proposed successfully with hash %#x", []byte("hi"))) testutil.AssertLogsContain(t, hook, "Proposer context closed") } @@ -226,7 +226,7 @@ func TestFullProposalOfBlock(t *testing.T) { exitRoutine <- true testutil.AssertLogsContain(t, hook, "Performing proposer responsibility") - testutil.AssertLogsContain(t, hook, fmt.Sprintf("Block proposed successfully with hash 0x%x", []byte("hi"))) + testutil.AssertLogsContain(t, hook, fmt.Sprintf("Block proposed successfully with hash %#x", []byte("hi"))) testutil.AssertLogsContain(t, hook, "Proposer context closed") testutil.AssertLogsContain(t, hook, "Attestation stored in memory") testutil.AssertLogsContain(t, hook, "Proposer context closed")