diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index 1bebc5fe6b..8408e0bbf2 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -268,7 +268,7 @@ func (c *ChainService) blockProcessing() { continue } if !parentExists { - log.Debugf("Block points to nil parent: %v", err) + log.Debugf("Block points to nil parent: %#x", block.ParentHash()) continue } parent, err := c.beaconDB.GetBlock(block.ParentHash()) diff --git a/beacon-chain/db/block.go b/beacon-chain/db/block.go index d95fa0429f..87351117ce 100644 --- a/beacon-chain/db/block.go +++ b/beacon-chain/db/block.go @@ -126,34 +126,3 @@ func (db *BeaconDB) GetGenesisTime() (time.Time, error) { return genesisTime, nil } - -// GetSimulatedBlock retrieves the last block broadcast by the simulator. -func (db *BeaconDB) GetSimulatedBlock() (*types.Block, error) { - enc, err := db.get(simulatedBlockKey) - if err != nil { - return nil, err - } - - protoBlock := &pb.BeaconBlock{} - err = proto.Unmarshal(enc, protoBlock) - if err != nil { - return nil, err - } - - return types.NewBlock(protoBlock), nil -} - -// SaveSimulatedBlock saves the last broadcast block to the database. -func (db *BeaconDB) SaveSimulatedBlock(block *types.Block) error { - enc, err := block.Marshal() - if err != nil { - return err - } - - return db.put(simulatedBlockKey, enc) -} - -// HasSimulatedBlock checks if a block was broadcast by the simulator. -func (db *BeaconDB) HasSimulatedBlock() (bool, error) { - return db.has(simulatedBlockKey) -} diff --git a/beacon-chain/db/schema.go b/beacon-chain/db/schema.go index 55c60f6586..d88d18cec4 100644 --- a/beacon-chain/db/schema.go +++ b/beacon-chain/db/schema.go @@ -24,8 +24,6 @@ var ( // crystallizedStateLookupKey tracks the current crystallized state. crystallizedStateLookupKey = []byte("beacon-crystallized-state") - simulatedBlockKey = []byte("last-simulated-block") - // Data item suffixes. blockSuffix = []byte("-block") // blockhash + blockPrefix -> block canonicalSuffix = []byte("-canonical") // num(uint64 big endian) + cannoicalSuffix -> blockhash diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index a081f00b53..77e91426f2 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -301,7 +301,6 @@ func (b *BeaconNode) registerSimulatorService(ctx *cli.Context) error { defaultConf := simulator.DefaultConfig() cfg := &simulator.Config{ - Delay: defaultConf.Delay, BlockRequestBuf: defaultConf.BlockRequestBuf, BeaconDB: b.db, P2P: p2pService, diff --git a/beacon-chain/simulator/BUILD.bazel b/beacon-chain/simulator/BUILD.bazel index f72ac5799b..bd6b9c28a3 100644 --- a/beacon-chain/simulator/BUILD.bazel +++ b/beacon-chain/simulator/BUILD.bazel @@ -6,7 +6,6 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/beacon-chain/simulator", visibility = ["//beacon-chain:__subpackages__"], deps = [ - "//beacon-chain/params:go_default_library", "//beacon-chain/types:go_default_library", "//beacon-chain/utils:go_default_library", "//proto/beacon/p2p/v1:go_default_library", @@ -24,7 +23,6 @@ go_test( embed = [":go_default_library"], deps = [ "//beacon-chain/db:go_default_library", - "//beacon-chain/types:go_default_library", "//proto/beacon/p2p/v1:go_default_library", "//shared/event:go_default_library", "//shared/p2p:go_default_library", diff --git a/beacon-chain/simulator/service.go b/beacon-chain/simulator/service.go index 755b1f96ad..83746b8ef8 100644 --- a/beacon-chain/simulator/service.go +++ b/beacon-chain/simulator/service.go @@ -9,7 +9,6 @@ import ( "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes" - "github.com/prysmaticlabs/prysm/beacon-chain/params" "github.com/prysmaticlabs/prysm/beacon-chain/types" "github.com/prysmaticlabs/prysm/beacon-chain/utils" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" @@ -28,23 +27,17 @@ type p2pAPI interface { // Simulator struct. type Simulator struct { - ctx context.Context - cancel context.CancelFunc - p2p p2pAPI - web3Service types.POWChainService - beaconDB beaconDB - enablePOWChain bool - delay time.Duration - slotNum uint64 - slotTicker utils.SlotTicker - broadcastedBlocks map[[32]byte]*types.Block - broadcastedBlockHashes [][32]byte - blockRequestChan chan p2p.Message + ctx context.Context + cancel context.CancelFunc + p2p p2pAPI + web3Service types.POWChainService + beaconDB beaconDB + enablePOWChain bool + blockRequestChan chan p2p.Message } // Config options for the simulator service. type Config struct { - Delay time.Duration BlockRequestBuf int P2P p2pAPI Web3Service types.POWChainService @@ -53,12 +46,8 @@ type Config struct { } type beaconDB interface { - HasSimulatedBlock() (bool, error) - GetSimulatedBlock() (*types.Block, error) - SaveSimulatedBlock(*types.Block) error GetActiveState() *types.ActiveState GetCrystallizedState() *types.CrystallizedState - GetCanonicalBlockForSlot(uint64) (*types.Block, error) GetCanonicalBlock() (*types.Block, error) GetGenesisTime() (time.Time, error) } @@ -66,7 +55,6 @@ type beaconDB interface { // DefaultConfig options for the simulator. func DefaultConfig() *Config { return &Config{ - Delay: time.Second * time.Duration(params.GetConfig().SlotDuration), BlockRequestBuf: 100, } } @@ -75,17 +63,13 @@ func DefaultConfig() *Config { func NewSimulator(ctx context.Context, cfg *Config) *Simulator { ctx, cancel := context.WithCancel(ctx) return &Simulator{ - ctx: ctx, - cancel: cancel, - p2p: cfg.P2P, - web3Service: cfg.Web3Service, - beaconDB: cfg.BeaconDB, - delay: cfg.Delay, - enablePOWChain: cfg.EnablePOWChain, - slotNum: 1, - broadcastedBlocks: make(map[[32]byte]*types.Block), - broadcastedBlockHashes: [][32]byte{}, - blockRequestChan: make(chan p2p.Message, cfg.BlockRequestBuf), + ctx: ctx, + cancel: cancel, + p2p: cfg.P2P, + web3Service: cfg.Web3Service, + beaconDB: cfg.BeaconDB, + enablePOWChain: cfg.EnablePOWChain, + blockRequestChan: make(chan p2p.Message, cfg.BlockRequestBuf), } } @@ -98,153 +82,116 @@ func (sim *Simulator) Start() { return } - sim.slotTicker = utils.GetSlotTicker(genesisTime) - go sim.run(sim.slotTicker.C()) + slotTicker := utils.GetSlotTicker(genesisTime) + go func() { + sim.run(slotTicker.C(), sim.blockRequestChan) + close(sim.blockRequestChan) + slotTicker.Done() + }() } // Stop the sim. func (sim *Simulator) Stop() error { defer sim.cancel() log.Info("Stopping service") - sim.slotTicker.Done() - // Persist the last simulated block in the DB for future sessions - // to continue from the last simulated slot number. - if len(sim.broadcastedBlockHashes) > 0 { - lastBlockHash := sim.broadcastedBlockHashes[len(sim.broadcastedBlockHashes)-1] - lastBlock := sim.broadcastedBlocks[lastBlockHash] - return sim.beaconDB.SaveSimulatedBlock(lastBlock) - } return nil } -func (sim *Simulator) lastSimulatedSessionBlock() (*types.Block, error) { - hasBlock, err := sim.beaconDB.HasSimulatedBlock() - if err != nil { - return nil, fmt.Errorf("Could not determine if a previous simulation occurred: %v", err) - } - if !hasBlock { - return nil, nil - } - - simulatedBlock, err := sim.beaconDB.GetSimulatedBlock() - if err != nil { - return nil, fmt.Errorf("Could not fetch simulated block from db: %v", err) - } - return simulatedBlock, nil -} - -func (sim *Simulator) run(slotInterval <-chan uint64) { +func (sim *Simulator) run(slotInterval <-chan uint64, requestChan <-chan p2p.Message) { blockReqSub := sim.p2p.Subscribe(&pb.BeaconBlockRequest{}, sim.blockRequestChan) defer blockReqSub.Unsubscribe() - // Check if we saved a simulated block in the DB from a previous session. - // If that is the case, simulator will start from there. - var parentHash []byte - lastSimulatedBlock, err := sim.lastSimulatedSessionBlock() + lastBlock, err := sim.beaconDB.GetCanonicalBlock() if err != nil { - log.Errorf("Could not fetch last simulated session's block: %v", err) + log.Errorf("Could not fetch latest block: %v", err) + return } - if lastSimulatedBlock != nil { - h, err := lastSimulatedBlock.Hash() - if err != nil { - log.Errorf("Could not hash last simulated session's block: %v", err) - } - sim.slotNum = lastSimulatedBlock.SlotNumber() - sim.broadcastedBlockHashes = append(sim.broadcastedBlockHashes, h) + + lastHash, err := lastBlock.Hash() + if err != nil { + log.Errorf("Could not get hash of the latest block: %v", err) } + broadcastedBlocks := map[[32]byte]*types.Block{} + for { select { case <-sim.ctx.Done(): log.Debug("Simulator context closed, exiting goroutine") return case slot := <-slotInterval: - activeStateHash, err := sim.beaconDB.GetActiveState().Hash() + aStateHash, err := sim.beaconDB.GetActiveState().Hash() if err != nil { log.Errorf("Could not fetch active state hash: %v", err) continue } - crystallizedStateHash, err := sim.beaconDB.GetCrystallizedState().Hash() + cStateHash, err := sim.beaconDB.GetCrystallizedState().Hash() if err != nil { - log.Errorf("Could not fetch crystallized state hash: %v", err) + log.Errorf("Failed to fetch crystallized state hash: %v", err) continue } - // If we have not broadcast a simulated block yet, we set parent hash - // to the genesis block. - var hash [32]byte - if sim.slotNum == 1 { - genesisBlock, err := sim.beaconDB.GetCanonicalBlockForSlot(0) - if err != nil { - log.Errorf("Failed to retrieve genesis block: %v", err) - continue - } - hash, err = genesisBlock.Hash() - if err != nil { - log.Errorf("Failed to hash genesis block: %v", err) - continue - } - parentHash = hash[:] - } else { - parentHash = sim.broadcastedBlockHashes[len(sim.broadcastedBlockHashes)-1][:] - } - - log.WithField("currentSlot", sim.slotNum).Debug("Current slot") - var powChainRef []byte if sim.enablePOWChain { powChainRef = sim.web3Service.LatestBlockHash().Bytes() } else { - powChainRef = []byte{byte(sim.slotNum)} + powChainRef = []byte{byte(slot)} } + parentHash := make([]byte, 32) + copy(parentHash, lastHash[:]) block := types.NewBlock(&pb.BeaconBlock{ Slot: slot, Timestamp: ptypes.TimestampNow(), PowChainRef: powChainRef, - ActiveStateRoot: activeStateHash[:], - CrystallizedStateRoot: crystallizedStateHash[:], + ActiveStateRoot: aStateHash[:], + CrystallizedStateRoot: cStateHash[:], AncestorHashes: [][]byte{parentHash}, Attestations: []*pb.AggregatedAttestation{ - {Slot: sim.slotNum - 1, AttesterBitfield: []byte{byte(255)}}, + {Slot: slot - 1, AttesterBitfield: []byte{byte(255)}}, }, }) - sim.slotNum++ - - h, err := block.Hash() + hash, err := block.Hash() if err != nil { log.Errorf("Could not hash simulated block: %v", err) continue } - - log.WithField("announcedBlockHash", fmt.Sprintf("%#x", h)).Debug("Announcing block hash") sim.p2p.Broadcast(&pb.BeaconBlockHashAnnounce{ - Hash: h[:], + Hash: hash[:], }) - // We then store the block in a map for later retrieval upon a request for its full - // data being sent back. - sim.broadcastedBlocks[h] = block - sim.broadcastedBlockHashes = append(sim.broadcastedBlockHashes, h) - case msg := <-sim.blockRequestChan: + log.WithFields(logrus.Fields{ + "hash": fmt.Sprintf("%#x", hash), + "slot": slot, + }).Debug("Broadcast block hash") + + broadcastedBlocks[hash] = block + + lastHash = hash + case msg := <-requestChan: data := msg.Data.(*pb.BeaconBlockRequest) - var h [32]byte - copy(h[:], data.Hash[:32]) + var hash [32]byte + copy(hash[:], data.Hash) - block := sim.broadcastedBlocks[h] - h, err := block.Hash() - if err != nil { - log.Errorf("Could not hash block: %v", err) + block := broadcastedBlocks[hash] + if block == nil { + log.Errorf("Requested block not found: %#x", hash) continue } - log.Debugf("Responding to full block request for hash: %#x", h) + + log.WithFields(logrus.Fields{ + "hash": fmt.Sprintf("%#x", hash), + }).Debug("Responding to full block request") + // Sends the full block body to the requester. res := &pb.BeaconBlockResponse{Block: block.Proto(), Attestation: &pb.AggregatedAttestation{ - Slot: sim.slotNum - 1, + Slot: block.SlotNumber(), AttesterBitfield: []byte{byte(255)}, }} sim.p2p.Send(res, msg.Peer) + + delete(broadcastedBlocks, hash) } } } diff --git a/beacon-chain/simulator/service_test.go b/beacon-chain/simulator/service_test.go index 31a074ba1e..c4bb0c7799 100644 --- a/beacon-chain/simulator/service_test.go +++ b/beacon-chain/simulator/service_test.go @@ -2,15 +2,12 @@ package simulator import ( "context" - "fmt" "io/ioutil" "testing" - "time" "github.com/ethereum/go-ethereum/common" "github.com/golang/protobuf/proto" "github.com/prysmaticlabs/prysm/beacon-chain/db" - "github.com/prysmaticlabs/prysm/beacon-chain/types" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/event" "github.com/prysmaticlabs/prysm/shared/p2p" @@ -24,13 +21,17 @@ func init() { logrus.SetOutput(ioutil.Discard) } -type mockP2P struct{} +type mockP2P struct { + broadcastHash []byte +} func (mp *mockP2P) Subscribe(msg proto.Message, channel chan p2p.Message) event.Subscription { return new(event.Feed).Subscribe(channel) } -func (mp *mockP2P) Broadcast(msg proto.Message) {} +func (mp *mockP2P) Broadcast(msg proto.Message) { + mp.broadcastHash = msg.(*pb.BeaconBlockHashAnnounce).GetHash() +} func (mp *mockP2P) Send(msg proto.Message, peer p2p.Peer) {} @@ -40,7 +41,7 @@ func (mpow *mockPOWChainService) LatestBlockHash() common.Hash { return common.BytesToHash([]byte{}) } -func setupSimulator(t *testing.T) *Simulator { +func setupSimulator(t *testing.T) (*Simulator, *mockP2P) { ctx := context.Background() config := db.Config{Path: "", Name: "", InMemory: true} @@ -49,21 +50,22 @@ func setupSimulator(t *testing.T) *Simulator { t.Fatalf("could not setup beaconDB: %v", err) } + p2pService := &mockP2P{} + cfg := &Config{ - Delay: time.Second, BlockRequestBuf: 0, - P2P: &mockP2P{}, + P2P: p2pService, Web3Service: &mockPOWChainService{}, BeaconDB: db, EnablePOWChain: true, } - return NewSimulator(ctx, cfg) + return NewSimulator(ctx, cfg), p2pService } func TestLifecycle(t *testing.T) { hook := logTest.NewGlobal() - sim := setupSimulator(t) + sim, _ := setupSimulator(t) sim.Start() testutil.AssertLogsContain(t, hook, "Starting service") @@ -78,60 +80,57 @@ func TestLifecycle(t *testing.T) { func TestBroadcastBlockHash(t *testing.T) { hook := logTest.NewGlobal() - sim := setupSimulator(t) + sim, p2pService := setupSimulator(t) - delayChan := make(chan uint64) + slotChan := make(chan uint64) + requestChan := make(chan p2p.Message) exitRoutine := make(chan bool) go func() { - sim.run(delayChan) + sim.run(slotChan, requestChan) <-exitRoutine }() - delayChan <- 0 + // trigger a new block + slotChan <- 1 + + // test an invalid block request + requestChan <- p2p.Message{ + Data: &pb.BeaconBlockRequest{ + Hash: make([]byte, 32), + }, + } + + // test a valid block request + blockHash := p2pService.broadcastHash + requestChan <- p2p.Message{ + Data: &pb.BeaconBlockRequest{ + Hash: blockHash, + }, + } + + // trigger another block + slotChan <- 2 + + testutil.AssertLogsContain(t, hook, "Broadcast block hash") + testutil.AssertLogsContain(t, hook, "Requested block not found") + testutil.AssertLogsContain(t, hook, "Responding to full block request") + + // reset logs + hook.Reset() + + // ensure that another request for the same block can't be made + requestChan <- p2p.Message{ + Data: &pb.BeaconBlockRequest{ + Hash: blockHash, + }, + } + sim.cancel() exitRoutine <- true - testutil.AssertLogsContain(t, hook, "Announcing block hash") + testutil.AssertLogsContain(t, hook, "Requested block not found") + testutil.AssertLogsDoNotContain(t, hook, "Responding to full block request") - if len(sim.broadcastedBlockHashes) != 1 { - t.Error("Did not store the broadcasted block hash") - } hook.Reset() } - -func TestBlockRequest(t *testing.T) { - hook := logTest.NewGlobal() - sim := setupSimulator(t) - - delayChan := make(chan uint64) - exitRoutine := make(chan bool) - - go func() { - sim.run(delayChan) - <-exitRoutine - }() - - block := types.NewBlock(&pb.BeaconBlock{AncestorHashes: make([][]byte, 32)}) - h, err := block.Hash() - if err != nil { - t.Fatal(err) - } - - data := &pb.BeaconBlockRequest{ - Hash: h[:], - } - - msg := p2p.Message{ - Peer: p2p.Peer{}, - Data: data, - } - - sim.broadcastedBlocks[h] = block - - sim.blockRequestChan <- msg - sim.cancel() - exitRoutine <- true - - testutil.AssertLogsContain(t, hook, fmt.Sprintf("Responding to full block request for hash: %#x", h)) -} diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index fa2dd45471..a8f380f54e 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -211,6 +211,8 @@ func (ss *Service) receiveBlock(msg p2p.Message) { log.Errorf("Could not hash received block: %v", err) } + log.Infof("Processing response to block request: %#x", blockHash) + ctx, containsBlockSpan := trace.StartSpan(ctx, "containsBlock") blockExists, err := ss.db.HasBlock(blockHash) containsBlockSpan.End() @@ -219,6 +221,7 @@ func (ss *Service) receiveBlock(msg p2p.Message) { return } if blockExists { + log.Debug("Received a block that already exists. Exiting...") return } diff --git a/validator/beacon/service.go b/validator/beacon/service.go index 5b75454863..8cab49f750 100644 --- a/validator/beacon/service.go +++ b/validator/beacon/service.go @@ -170,7 +170,7 @@ func (s *Service) listenForAssignmentChange(client pb.BeaconServiceClient) { if err != nil { log.Errorf("Could not receive latest validator assignment from stream: %v", err) - continue + break } for _, assign := range assignment.Assignments { diff --git a/validator/beacon/service_test.go b/validator/beacon/service_test.go index 6a1bba591b..51557e727f 100644 --- a/validator/beacon/service_test.go +++ b/validator/beacon/service_test.go @@ -319,13 +319,19 @@ func TestListenForAssignmentProposer(t *testing.T) { b.listenForAssignmentChange(mockServiceValidator) testutil.AssertLogsContain(t, hook, "Validator with pub key 0xA re-assigned to shard ID 2 for PROPOSER duty") +} + +func TestListenForAssignmentError(t *testing.T) { + hook := logTest.NewGlobal() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + b := NewBeaconValidator(context.Background(), []byte{'A'}, &mockClient{ctrl}) // Testing an error coming from the stream. - stream = internal.NewMockBeaconService_ValidatorAssignmentsClient(ctrl) + stream := internal.NewMockBeaconService_ValidatorAssignmentsClient(ctrl) stream.EXPECT().Recv().Return(&pb.ValidatorAssignmentResponse{}, errors.New("stream error")) - stream.EXPECT().Recv().Return(&pb.ValidatorAssignmentResponse{}, io.EOF) - mockServiceValidator = internal.NewMockBeaconServiceClient(ctrl) + mockServiceValidator := internal.NewMockBeaconServiceClient(ctrl) mockServiceValidator.EXPECT().ValidatorAssignments( gomock.Any(), gomock.Any(), @@ -334,24 +340,38 @@ func TestListenForAssignmentProposer(t *testing.T) { b.listenForAssignmentChange(mockServiceValidator) testutil.AssertLogsContain(t, hook, "stream error") +} + +func TestListenForAssignmentClientError(t *testing.T) { + hook := logTest.NewGlobal() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + b := NewBeaconValidator(context.Background(), []byte{'A'}, &mockClient{ctrl}) // Creating a faulty stream will trigger error. - mockServiceValidator = internal.NewMockBeaconServiceClient(ctrl) + mockServiceValidator := internal.NewMockBeaconServiceClient(ctrl) mockServiceValidator.EXPECT().ValidatorAssignments( gomock.Any(), gomock.Any(), - ).Return(stream, errors.New("stream creation failed")) + ).Return(nil, errors.New("stream creation failed")) b.listenForAssignmentChange(mockServiceValidator) testutil.AssertLogsContain(t, hook, "stream creation failed") testutil.AssertLogsContain(t, hook, "could not fetch validator assigned slot and responsibility from beacon node") +} + +func TestListenForAssignmentCancelContext(t *testing.T) { + hook := logTest.NewGlobal() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + b := NewBeaconValidator(context.Background(), []byte{'A'}, &mockClient{ctrl}) // Test that the routine exits when context is closed - stream = internal.NewMockBeaconService_ValidatorAssignmentsClient(ctrl) + stream := internal.NewMockBeaconService_ValidatorAssignmentsClient(ctrl) stream.EXPECT().Recv().Return(&pb.ValidatorAssignmentResponse{}, nil) //mockServiceClient = internal.NewMockBeaconServiceClient(ctrl) - mockServiceValidator = internal.NewMockBeaconServiceClient(ctrl) + mockServiceValidator := internal.NewMockBeaconServiceClient(ctrl) mockServiceValidator.EXPECT().ValidatorAssignments( gomock.Any(), gomock.Any(),