diff --git a/beacon-chain/attestation/service_test.go b/beacon-chain/attestation/service_test.go index 906ce74421..3977342aba 100644 --- a/beacon-chain/attestation/service_test.go +++ b/beacon-chain/attestation/service_test.go @@ -49,6 +49,7 @@ func TestUpdateLatestAttestation_UpdatesLatest(t *testing.T) { if err := beaconDB.SaveBlock(block); err != nil { t.Fatal(err) } + beaconState.LatestBlock = block if err := beaconDB.UpdateChainHead(ctx, block, beaconState); err != nil { t.Fatal(err) } @@ -76,6 +77,7 @@ func TestUpdateLatestAttestation_UpdatesLatest(t *testing.T) { Slot: params.BeaconConfig().GenesisSlot + 36, ValidatorRegistry: validators, } + beaconState.LatestBlock = block if err := beaconDB.UpdateChainHead(ctx, block, beaconState); err != nil { t.Fatalf("could not save state: %v", err) } @@ -115,6 +117,7 @@ func TestAttestationPool_UpdatesAttestationPool(t *testing.T) { if err := beaconDB.SaveBlock(block); err != nil { t.Fatal(err) } + beaconState.LatestBlock = block if err := beaconDB.UpdateChainHead(ctx, block, beaconState); err != nil { t.Fatal(err) } @@ -140,6 +143,7 @@ func TestLatestAttestationTarget_CantGetAttestation(t *testing.T) { if err := beaconDB.SaveState(ctx, &pb.BeaconState{ ValidatorRegistry: []*pb.Validator{{}}, + LatestBlock: &pb.BeaconBlock{Slot: params.BeaconConfig().GenesisSlot}, }); err != nil { t.Fatalf("could not save state: %v", err) } @@ -164,6 +168,7 @@ func TestLatestAttestationTarget_ReturnsLatestAttestedBlock(t *testing.T) { pubKey := []byte{'A'} if err := beaconDB.SaveState(ctx, &pb.BeaconState{ ValidatorRegistry: []*pb.Validator{{Pubkey: pubKey}}, + LatestBlock: &pb.BeaconBlock{Slot: params.BeaconConfig().GenesisSlot}, }); err != nil { t.Fatalf("could not save state: %v", err) } @@ -233,6 +238,7 @@ func TestUpdateLatestAttestation_CacheEnabledAndMiss(t *testing.T) { if err := beaconDB.SaveBlock(block); err != nil { t.Fatal(err) } + beaconState.LatestBlock = block if err := beaconDB.UpdateChainHead(ctx, block, beaconState); err != nil { t.Fatal(err) } @@ -263,6 +269,7 @@ func TestUpdateLatestAttestation_CacheEnabledAndMiss(t *testing.T) { Slot: params.BeaconConfig().GenesisSlot + 36, ValidatorRegistry: validators, } + beaconState.LatestBlock = block if err := beaconDB.UpdateChainHead(ctx, block, beaconState); err != nil { t.Fatalf("could not save state: %v", err) } @@ -316,6 +323,7 @@ func TestUpdateLatestAttestation_CacheEnabledAndHit(t *testing.T) { if err := beaconDB.SaveBlock(block); err != nil { t.Fatal(err) } + beaconState.LatestBlock = block if err := beaconDB.UpdateChainHead(ctx, block, beaconState); err != nil { t.Fatal(err) } @@ -383,6 +391,7 @@ func TestUpdateLatestAttestation_InvalidIndex(t *testing.T) { if err := beaconDB.SaveBlock(block); err != nil { t.Fatal(err) } + beaconState.LatestBlock = block if err := beaconDB.UpdateChainHead(ctx, block, beaconState); err != nil { t.Fatal(err) } @@ -425,6 +434,7 @@ func TestUpdateLatestAttestation_BatchUpdate(t *testing.T) { if err := beaconDB.SaveBlock(block); err != nil { t.Fatal(err) } + beaconState.LatestBlock = block if err := beaconDB.UpdateChainHead(ctx, block, beaconState); err != nil { t.Fatal(err) } diff --git a/beacon-chain/blockchain/block_processing.go b/beacon-chain/blockchain/block_processing.go index 720f2885cf..d7c1955ca8 100644 --- a/beacon-chain/blockchain/block_processing.go +++ b/beacon-chain/blockchain/block_processing.go @@ -68,7 +68,7 @@ func (c *ChainService) ReceiveBlock(ctx context.Context, block *pb.BeaconBlock) if parent == nil { return nil, errors.New("parent does not exist in DB") } - beaconState, err := c.beaconDB.HistoricalStateFromSlot(ctx, parent.Slot) + beaconState, err := c.beaconDB.HistoricalStateFromSlot(ctx, parent.Slot, parentRoot) if err != nil { return nil, fmt.Errorf("could not retrieve beacon state: %v", err) } @@ -286,8 +286,12 @@ func (c *ChainService) runStateTransition( "slotsSinceGenesis", newState.Slot-params.BeaconConfig().GenesisSlot, ).Info("Block transition successfully processed") + blockRoot, err := hashutil.HashBeaconBlock(block) + if err != nil { + return nil, err + } // Save Historical States. - if err := c.beaconDB.SaveHistoricalState(ctx, beaconState); err != nil { + if err := c.beaconDB.SaveHistoricalState(ctx, beaconState, blockRoot); err != nil { return nil, fmt.Errorf("could not save historical state: %v", err) } } diff --git a/beacon-chain/blockchain/block_processing_test.go b/beacon-chain/blockchain/block_processing_test.go index 3e9c54fd46..59be582a66 100644 --- a/beacon-chain/blockchain/block_processing_test.go +++ b/beacon-chain/blockchain/block_processing_test.go @@ -32,7 +32,7 @@ func initBlockStateRoot(t *testing.T, block *pb.BeaconBlock, chainService *Chain if err != nil { t.Fatal(err) } - beaconState, err := chainService.beaconDB.HistoricalStateFromSlot(context.Background(), parent.Slot) + beaconState, err := chainService.beaconDB.HistoricalStateFromSlot(context.Background(), parent.Slot, parentRoot) if err != nil { t.Fatalf("Unable to retrieve state %v", err) } @@ -116,10 +116,6 @@ func TestReceiveBlock_ProcessCorrectly(t *testing.T) { if err != nil { t.Fatalf("Could not tree hash state: %v", err) } - if err := db.SaveHistoricalState(ctx, beaconState); err != nil { - t.Fatal(err) - } - genesis := b.NewGenesisBlock([]byte{}) if err := chainService.beaconDB.SaveBlock(genesis); err != nil { t.Fatalf("Could not save block to db: %v", err) @@ -128,6 +124,11 @@ func TestReceiveBlock_ProcessCorrectly(t *testing.T) { if err != nil { t.Fatalf("Unable to get tree hash root of canonical head: %v", err) } + + if err := db.SaveHistoricalState(ctx, beaconState, parentHash); err != nil { + t.Fatal(err) + } + if err := chainService.beaconDB.UpdateChainHead(ctx, genesis, beaconState); err != nil { t.Fatal(err) } @@ -183,9 +184,7 @@ func TestReceiveBlock_UsesParentBlockState(t *testing.T) { if err != nil { t.Fatalf("Can't generate genesis state: %v", err) } - if err := chainService.beaconDB.SaveHistoricalState(ctx, beaconState); err != nil { - t.Fatal(err) - } + stateRoot, err := hashutil.HashProto(beaconState) if err != nil { t.Fatalf("Could not tree hash state: %v", err) @@ -195,7 +194,9 @@ func TestReceiveBlock_UsesParentBlockState(t *testing.T) { if err := chainService.beaconDB.UpdateChainHead(ctx, genesisBlock, beaconState); err != nil { t.Fatal(err) } - + if err := chainService.beaconDB.SaveHistoricalState(ctx, beaconState, parentHash); err != nil { + t.Fatal(err) + } // We ensure the block uses the right state parent if its ancestor is not block.Slot-1. block := &pb.BeaconBlock{ Slot: beaconState.Slot + 4, @@ -242,14 +243,14 @@ func TestReceiveBlock_DeletesBadBlock(t *testing.T) { if err != nil { t.Fatalf("Could not tree hash state: %v", err) } - if err := chainService.beaconDB.SaveHistoricalState(ctx, beaconState); err != nil { - t.Fatal(err) - } parentHash, genesisBlock := setupGenesisBlock(t, chainService) if err := chainService.beaconDB.UpdateChainHead(ctx, genesisBlock, beaconState); err != nil { t.Fatal(err) } + if err := chainService.beaconDB.SaveHistoricalState(ctx, beaconState, parentHash); err != nil { + t.Fatal(err) + } beaconState.Slot++ @@ -321,10 +322,11 @@ func TestReceiveBlock_CheckBlockStateRoot_GoodState(t *testing.T) { if err != nil { t.Fatalf("Can't generate genesis state: %v", err) } - if err := chainService.beaconDB.SaveHistoricalState(ctx, beaconState); err != nil { + + parentHash, genesisBlock := setupGenesisBlock(t, chainService) + if err := chainService.beaconDB.SaveHistoricalState(ctx, beaconState, parentHash); err != nil { t.Fatal(err) } - parentHash, genesisBlock := setupGenesisBlock(t, chainService) beaconState.Slot++ if err := chainService.beaconDB.UpdateChainHead(ctx, genesisBlock, beaconState); err != nil { t.Fatal(err) @@ -365,10 +367,10 @@ func TestReceiveBlock_CheckBlockStateRoot_BadState(t *testing.T) { if err != nil { t.Fatalf("Can't generate genesis state: %v", err) } - if err := chainService.beaconDB.SaveHistoricalState(ctx, beaconState); err != nil { + parentHash, genesisBlock := setupGenesisBlock(t, chainService) + if err := chainService.beaconDB.SaveHistoricalState(ctx, beaconState, parentHash); err != nil { t.Fatal(err) } - parentHash, genesisBlock := setupGenesisBlock(t, chainService) beaconState.Slot++ if err := chainService.beaconDB.UpdateChainHead(ctx, genesisBlock, beaconState); err != nil { t.Fatal(err) @@ -453,7 +455,7 @@ func TestReceiveBlock_RemovesPendingDeposits(t *testing.T) { } depositRoot := depositTrie.Root() beaconState.LatestEth1Data.DepositRootHash32 = depositRoot[:] - if err := db.SaveHistoricalState(context.Background(), beaconState); err != nil { + if err := db.SaveHistoricalState(context.Background(), beaconState, parentHash); err != nil { t.Fatal(err) } @@ -471,6 +473,7 @@ func TestReceiveBlock_RemovesPendingDeposits(t *testing.T) { }, } + beaconState.LatestBlock = block beaconState.Slot-- beaconState.DepositIndex = 0 if err := chainService.beaconDB.SaveState(ctx, beaconState); err != nil { @@ -502,7 +505,7 @@ func TestReceiveBlock_RemovesPendingDeposits(t *testing.T) { if err := chainService.beaconDB.SaveState(ctx, beaconState); err != nil { t.Fatal(err) } - if err := db.SaveHistoricalState(context.Background(), beaconState); err != nil { + if err := db.SaveHistoricalState(context.Background(), beaconState, blockRoot); err != nil { t.Fatal(err) } computedState, err := chainService.ReceiveBlock(context.Background(), block) @@ -579,6 +582,7 @@ func TestReceiveBlock_OnChainSplit(t *testing.T) { t.Fatalf("Could not tree hash state: %v", err) } parentHash, genesisBlock := setupGenesisBlock(t, chainService) + beaconState.LatestBlock = genesisBlock if err := db.UpdateChainHead(ctx, genesisBlock, beaconState); err != nil { t.Fatal(err) } @@ -609,6 +613,7 @@ func TestReceiveBlock_OnChainSplit(t *testing.T) { if err = db.SaveBlock(block); err != nil { t.Fatal(err) } + computedState.LatestBlock = block if err = db.UpdateChainHead(ctx, block, computedState); err != nil { t.Fatal(err) } @@ -619,7 +624,7 @@ func TestReceiveBlock_OnChainSplit(t *testing.T) { } // Common ancestor is block at slot 3 - commonAncestor, err := db.BlockBySlot(ctx, genesisSlot+3) + commonAncestor, err := db.CanonicalBlockBySlot(ctx, genesisSlot+3) if err != nil { t.Fatal(err) } @@ -629,7 +634,7 @@ func TestReceiveBlock_OnChainSplit(t *testing.T) { t.Fatal(err) } - beaconState, err = db.HistoricalStateFromSlot(ctx, commonAncestor.Slot) + beaconState, err = db.HistoricalStateFromSlot(ctx, commonAncestor.Slot, parentHash) if err != nil { t.Fatal(err) } @@ -645,6 +650,10 @@ func TestReceiveBlock_OnChainSplit(t *testing.T) { RandaoReveal: createRandaoReveal(t, beaconState, privKeys), Body: &pb.BeaconBlockBody{}, } + rootF, _ := hashutil.HashBeaconBlock(blockF) + if err := db.SaveHistoricalState(ctx, beaconState, rootF); err != nil { + t.Fatal(err) + } initBlockStateRoot(t, blockF, chainService) computedState, err := chainService.ReceiveBlock(ctx, blockF) diff --git a/beacon-chain/blockchain/fork_choice.go b/beacon-chain/blockchain/fork_choice.go index 3a7ffa3712..5267cfc66c 100644 --- a/beacon-chain/blockchain/fork_choice.go +++ b/beacon-chain/blockchain/fork_choice.go @@ -45,7 +45,7 @@ func (c *ChainService) updateFFGCheckPts(ctx context.Context, state *pb.BeaconSt // the slot of justified block saved in DB. if lastJustifiedSlot > savedJustifiedBlock.Slot { // Retrieve the new justified block from DB using the new justified slot and save it. - newJustifiedBlock, err := c.beaconDB.BlockBySlot(ctx, lastJustifiedSlot) + newJustifiedBlock, err := c.beaconDB.CanonicalBlockBySlot(ctx, lastJustifiedSlot) if err != nil { return err } @@ -55,14 +55,18 @@ func (c *ChainService) updateFFGCheckPts(ctx context.Context, state *pb.BeaconSt for newJustifiedBlock == nil { log.WithField("slot", lastAvailBlkSlot-params.BeaconConfig().GenesisSlot).Debug("Missing block in DB, looking one slot back") lastAvailBlkSlot-- - newJustifiedBlock, err = c.beaconDB.BlockBySlot(ctx, lastAvailBlkSlot) + newJustifiedBlock, err = c.beaconDB.CanonicalBlockBySlot(ctx, lastAvailBlkSlot) if err != nil { return err } } + newJustifiedRoot, err := hashutil.HashBeaconBlock(newJustifiedBlock) + if err != nil { + return err + } // Fetch justified state from historical states db. - newJustifiedState, err := c.beaconDB.HistoricalStateFromSlot(ctx, newJustifiedBlock.Slot) + newJustifiedState, err := c.beaconDB.HistoricalStateFromSlot(ctx, newJustifiedBlock.Slot, newJustifiedRoot) if err != nil { return err } @@ -83,7 +87,7 @@ func (c *ChainService) updateFFGCheckPts(ctx context.Context, state *pb.BeaconSt } if lastFinalizedSlot > savedFinalizedBlock.Slot { // Retrieve the new finalized block from DB using the new finalized slot and save it. - newFinalizedBlock, err := c.beaconDB.BlockBySlot(ctx, lastFinalizedSlot) + newFinalizedBlock, err := c.beaconDB.CanonicalBlockBySlot(ctx, lastFinalizedSlot) if err != nil { return err } @@ -93,15 +97,19 @@ func (c *ChainService) updateFFGCheckPts(ctx context.Context, state *pb.BeaconSt for newFinalizedBlock == nil { log.WithField("slot", lastAvailBlkSlot-params.BeaconConfig().GenesisSlot).Debug("Missing block in DB, looking one slot back") lastAvailBlkSlot-- - newFinalizedBlock, err = c.beaconDB.BlockBySlot(ctx, lastAvailBlkSlot) + newFinalizedBlock, err = c.beaconDB.CanonicalBlockBySlot(ctx, lastAvailBlkSlot) if err != nil { return err } } + newFinalizedRoot, err := hashutil.HashBeaconBlock(newFinalizedBlock) + if err != nil { + return err + } // Generate the new finalized state with using new finalized block and // save it. - newFinalizedState, err := c.beaconDB.HistoricalStateFromSlot(ctx, lastFinalizedSlot) + newFinalizedState, err := c.beaconDB.HistoricalStateFromSlot(ctx, lastFinalizedSlot, newFinalizedRoot) if err != nil { return err } @@ -176,7 +184,7 @@ func (c *ChainService) ApplyForkChoiceRule( "newRoot": fmt.Sprintf("%#x", bytesutil.Trunc(newHeadRoot[:])), }).Warn("Reorg happened") // Only regenerate head state if there was a reorg. - newState, err = c.beaconDB.HistoricalStateFromSlot(ctx, newHead.Slot) + newState, err = c.beaconDB.HistoricalStateFromSlot(ctx, newHead.Slot, newHeadRoot) if err != nil { return fmt.Errorf("could not gen state: %v", err) } @@ -189,7 +197,7 @@ func (c *ChainService) ApplyForkChoiceRule( // If we receive forked blocks. if newHead.Slot != newState.Slot { - newState, err = c.beaconDB.HistoricalStateFromSlot(ctx, newHead.Slot) + newState, err = c.beaconDB.HistoricalStateFromSlot(ctx, newHead.Slot, newHeadRoot) if err != nil { return fmt.Errorf("could not gen state: %v", err) } @@ -296,29 +304,28 @@ func (c *ChainService) lmdGhost( // get_children(store: Store, block: BeaconBlock) -> List[BeaconBlock] // returns the child blocks of the given block. func (c *ChainService) blockChildren(ctx context.Context, block *pb.BeaconBlock, highestSlot uint64) ([]*pb.BeaconBlock, error) { - var children []*pb.BeaconBlock - - currentRoot, err := hashutil.HashBeaconBlock(block) + blockRoot, err := hashutil.HashBeaconBlock(block) if err != nil { - return nil, fmt.Errorf("could not tree hash incoming block: %v", err) + return nil, err } + var children []*pb.BeaconBlock startSlot := block.Slot + 1 for i := startSlot; i <= highestSlot; i++ { - block, err := c.beaconDB.BlockBySlot(ctx, i) + kids, err := c.beaconDB.BlocksBySlot(ctx, i) if err != nil { return nil, fmt.Errorf("could not get block by slot: %v", err) } - // Continue if there's a skip block. - if block == nil { - continue - } + children = append(children, kids...) + } - parentRoot := bytesutil.ToBytes32(block.ParentRootHash32) - if currentRoot == parentRoot { - children = append(children, block) + filteredChildren := []*pb.BeaconBlock{} + for _, kid := range children { + parentRoot := bytesutil.ToBytes32(kid.ParentRootHash32) + if blockRoot == parentRoot { + filteredChildren = append(filteredChildren, kid) } } - return children, nil + return filteredChildren, nil } // isDescendant checks if the new head block is a descendant block of the current head. diff --git a/beacon-chain/blockchain/fork_choice_reorg_test.go b/beacon-chain/blockchain/fork_choice_reorg_test.go index 996dd3163f..3795de5529 100644 --- a/beacon-chain/blockchain/fork_choice_reorg_test.go +++ b/beacon-chain/blockchain/fork_choice_reorg_test.go @@ -103,7 +103,7 @@ func TestApplyForkChoice_ChainSplitReorg(t *testing.T) { if err := chainService.beaconDB.SaveBlock(blocks[forkIndex]); err != nil { t.Fatal(err) } - if err := chainService.beaconDB.SaveHistoricalState(ctx, forkState); err != nil { + if err := chainService.beaconDB.SaveHistoricalState(ctx, forkState, roots[forkIndex]); err != nil { t.Fatal(err) } } @@ -127,7 +127,7 @@ func TestApplyForkChoice_ChainSplitReorg(t *testing.T) { } chainService.attsService = attHandler - block4State, err := chainService.beaconDB.HistoricalStateFromSlot(ctx, blocks[4].Slot) + block4State, err := chainService.beaconDB.HistoricalStateFromSlot(ctx, blocks[4].Slot, roots[4]) if err != nil { t.Fatal(err) } diff --git a/beacon-chain/blockchain/fork_choice_test.go b/beacon-chain/blockchain/fork_choice_test.go index e366e9ef8d..e895c57064 100644 --- a/beacon-chain/blockchain/fork_choice_test.go +++ b/beacon-chain/blockchain/fork_choice_test.go @@ -117,9 +117,16 @@ func TestApplyForkChoice_SetsCanonicalHead(t *testing.T) { BlockHash32: []byte("b"), }, } + blockRoot, err := hashutil.HashBeaconBlock(block) + if err != nil { + t.Fatal(err) + } if err := chainService.beaconDB.SaveBlock(block); err != nil { t.Fatal(err) } + if err := chainService.beaconDB.SaveHistoricalState(context.Background(), beaconState, blockRoot); err != nil { + t.Fatal(err) + } if err := chainService.ApplyForkChoiceRule(context.Background(), block, tt.state); err != nil { t.Errorf("Expected head to update, received %v", err) } @@ -333,7 +340,7 @@ func TestBlockChildren_2InARow(t *testing.T) { // When we input block B1, we should get B2 back. wanted := []*pb.BeaconBlock{block2} if !reflect.DeepEqual(wanted, childrenBlock) { - t.Errorf("Wrong children block received") + t.Errorf("Wrong children block received, want %v, received %v", wanted, childrenBlock) } } @@ -519,6 +526,7 @@ func TestLMDGhost_TrivialHeadUpdate(t *testing.T) { if err = chainService.beaconDB.SaveBlock(block2); err != nil { t.Fatalf("Could not save block: %v", err) } + beaconState.LatestBlock = block2 if err = chainService.beaconDB.UpdateChainHead(ctx, block2, beaconState); err != nil { t.Fatalf("Could update chain head: %v", err) } @@ -537,7 +545,7 @@ func TestLMDGhost_TrivialHeadUpdate(t *testing.T) { t.Fatalf("Could not run LMD GHOST: %v", err) } if !reflect.DeepEqual(block2, head) { - t.Errorf("Expected head to equal %v, received %v", block1, head) + t.Errorf("Expected head to equal %v, received %v", block2, head) } } @@ -1491,6 +1499,7 @@ func TestUpdateFFGCheckPts_NewJustifiedSkipSlot(t *testing.T) { if err := chainSvc.beaconDB.SaveBlock(gBlock); err != nil { t.Fatal(err) } + gState.LatestBlock = gBlock if err := chainSvc.beaconDB.UpdateChainHead(ctx, gBlock, gState); err != nil { t.Fatal(err) } @@ -1531,7 +1540,9 @@ func TestUpdateFFGCheckPts_NewJustifiedSkipSlot(t *testing.T) { if err := chainSvc.beaconDB.SaveBlock(block); err != nil { t.Fatal(err) } - if err := chainSvc.beaconDB.SaveState(ctx, &pb.BeaconState{Slot: genesisSlot + lastAvailableSlot}); err != nil { + computedState := &pb.BeaconState{Slot: genesisSlot + lastAvailableSlot} + computedState.LatestBlock = block + if err := chainSvc.beaconDB.SaveState(ctx, computedState); err != nil { t.Fatal(err) } if err := chainSvc.beaconDB.UpdateChainHead(ctx, block, gState); err != nil { @@ -1553,7 +1564,7 @@ func TestUpdateFFGCheckPts_NewJustifiedSkipSlot(t *testing.T) { } if newJustifiedState.Slot-genesisSlot != lastAvailableSlot { t.Errorf("Wanted justification state slot: %d, got: %d", - offset, newJustifiedState.Slot-genesisSlot) + lastAvailableSlot, newJustifiedState.Slot-genesisSlot) } if newJustifiedBlock.Slot-genesisSlot != lastAvailableSlot { t.Errorf("Wanted justification block slot: %d, got: %d", diff --git a/beacon-chain/blockchain/stategenerator/BUILD.bazel b/beacon-chain/blockchain/stategenerator/BUILD.bazel index 749abf25d1..0b7cf1541c 100644 --- a/beacon-chain/blockchain/stategenerator/BUILD.bazel +++ b/beacon-chain/blockchain/stategenerator/BUILD.bazel @@ -24,7 +24,6 @@ go_test( deps = [ "//beacon-chain/chaintest/backend:go_default_library", "//beacon-chain/db:go_default_library", - "//proto/beacon/p2p/v1:go_default_library", "//shared/featureconfig:go_default_library", "//shared/params:go_default_library", "@com_github_gogo_protobuf//proto:go_default_library", diff --git a/beacon-chain/blockchain/stategenerator/state_generator.go b/beacon-chain/blockchain/stategenerator/state_generator.go index e5c86501d8..03a59f0395 100644 --- a/beacon-chain/blockchain/stategenerator/state_generator.go +++ b/beacon-chain/blockchain/stategenerator/state_generator.go @@ -25,7 +25,7 @@ var log = logrus.WithField("prefix", "stategenerator") func GenerateStateFromBlock(ctx context.Context, db *db.BeaconDB, slot uint64) (*pb.BeaconState, error) { ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.stategenerator.GenerateStateFromBlock") defer span.End() - fState, err := db.HistoricalStateFromSlot(ctx, slot) + fState, err := db.HistoricalStateFromSlot(ctx, slot, [32]byte{}) if err != nil { return nil, err } @@ -54,10 +54,11 @@ func GenerateStateFromBlock(ctx context.Context, db *db.BeaconDB, slot uint64) ( } // from input slot, retrieve its corresponding block and call that the most recent block. - mostRecentBlock, err := db.BlockBySlot(ctx, slot) + mostRecentBlocks, err := db.BlocksBySlot(ctx, slot) if err != nil { return nil, err } + mostRecentBlock := mostRecentBlocks[0] // if the most recent block is a skip block, we get its parent block. // ex: @@ -66,10 +67,11 @@ func GenerateStateFromBlock(ctx context.Context, db *db.BeaconDB, slot uint64) ( lastSlot := slot for mostRecentBlock == nil { lastSlot-- - mostRecentBlock, err = db.BlockBySlot(ctx, lastSlot) + blocks, err := db.BlocksBySlot(ctx, lastSlot) if err != nil { return nil, err } + mostRecentBlock = blocks[0] } // retrieve the block list to recompute state of the input slot. diff --git a/beacon-chain/blockchain/stategenerator/state_generator_test.go b/beacon-chain/blockchain/stategenerator/state_generator_test.go index d147de97e4..97363ee2ab 100644 --- a/beacon-chain/blockchain/stategenerator/state_generator_test.go +++ b/beacon-chain/blockchain/stategenerator/state_generator_test.go @@ -2,14 +2,12 @@ package stategenerator_test import ( "context" - "strings" "testing" "github.com/gogo/protobuf/proto" "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/stategenerator" "github.com/prysmaticlabs/prysm/beacon-chain/chaintest/backend" "github.com/prysmaticlabs/prysm/beacon-chain/db" - pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/params" ) @@ -162,28 +160,3 @@ func TestGenerateState_WithNilBlocksOK(t *testing.T) { t.Error("generated and saved states are unequal") } } - -func TestGenerateState_NilLatestFinalizedBlock(t *testing.T) { - b, err := backend.NewSimulatedBackend() - if err != nil { - t.Fatalf("Could not create a new simulated backend %v", err) - } - beaconDB := b.DB() - defer b.Shutdown() - defer db.TeardownDB(beaconDB) - beaconState := &pb.BeaconState{ - Slot: params.BeaconConfig().GenesisSlot + params.BeaconConfig().SlotsPerEpoch*4, - } - if err := beaconDB.SaveFinalizedState(beaconState); err != nil { - t.Fatalf("Unable to save finalized state") - } - if err := beaconDB.SaveHistoricalState(context.Background(), beaconState); err != nil { - t.Fatalf("Unable to save finalized state") - } - - slot := params.BeaconConfig().GenesisSlot + 1 + params.BeaconConfig().SlotsPerEpoch*4 - want := "latest head in state is nil" - if _, err := stategenerator.GenerateStateFromBlock(context.Background(), beaconDB, slot); !strings.Contains(err.Error(), want) { - t.Errorf("Expected %v, received %v", want, err) - } -} diff --git a/beacon-chain/db/block.go b/beacon-chain/db/block.go index a6152b757d..cb9efededa 100644 --- a/beacon-chain/db/block.go +++ b/beacon-chain/db/block.go @@ -1,6 +1,7 @@ package db import ( + "bytes" "context" "errors" "fmt" @@ -100,7 +101,7 @@ func (db *BeaconDB) SaveBlock(block *pb.BeaconBlock) error { if err != nil { return fmt.Errorf("failed to encode block: %v", err) } - slotBinary := encodeSlotNumber(block.Slot) + slotRootBinary := encodeSlotNumberRoot(block.Slot, root) if block.Slot > db.highestBlockSlot { db.highestBlockSlot = block.Slot @@ -108,8 +109,7 @@ func (db *BeaconDB) SaveBlock(block *pb.BeaconBlock) error { return db.update(func(tx *bolt.Tx) error { bucket := tx.Bucket(blockBucket) - mainChain := tx.Bucket(mainChainBucket) - if err := mainChain.Put(slotBinary, root[:]); err != nil { + if err := bucket.Put(slotRootBinary, enc); err != nil { return fmt.Errorf("failed to include the block in the main chain bucket: %v", err) } return bucket.Put(root[:], enc) @@ -122,12 +122,11 @@ func (db *BeaconDB) DeleteBlock(block *pb.BeaconBlock) error { if err != nil { return fmt.Errorf("failed to tree hash block: %v", err) } - slotBinary := encodeSlotNumber(block.Slot) + slotRootBinary := encodeSlotNumberRoot(block.Slot, root) return db.update(func(tx *bolt.Tx) error { bucket := tx.Bucket(blockBucket) - mainChain := tx.Bucket(mainChainBucket) - if err := mainChain.Delete(slotBinary); err != nil { + if err := bucket.Delete(slotRootBinary); err != nil { return fmt.Errorf("failed to include the block in the main chain bucket: %v", err) } return bucket.Delete(root[:]) @@ -197,7 +196,6 @@ func (db *BeaconDB) ChainHead() (*pb.BeaconBlock, error) { var block *pb.BeaconBlock err := db.view(func(tx *bolt.Tx) error { chainInfo := tx.Bucket(chainInfoBucket) - mainChain := tx.Bucket(mainChainBucket) blockBkt := tx.Bucket(blockBucket) height := chainInfo.Get(mainChainHeightKey) @@ -205,7 +203,7 @@ func (db *BeaconDB) ChainHead() (*pb.BeaconBlock, error) { return errors.New("unable to determine chain height") } - blockRoot := mainChain.Get(height) + blockRoot := chainInfo.Get(canonicalHeadKey) if blockRoot == nil { return fmt.Errorf("root at the current height not found: %d", height) } @@ -243,20 +241,29 @@ func (db *BeaconDB) UpdateChainHead(ctx context.Context, block *pb.BeaconBlock, return fmt.Errorf("failed to save beacon state as canonical: %v", err) } + blockEnc, err := proto.Marshal(block) + if err != nil { + return err + } + return db.update(func(tx *bolt.Tx) error { blockBucket := tx.Bucket(blockBucket) chainInfo := tx.Bucket(chainInfoBucket) - mainChain := tx.Bucket(mainChainBucket) + mainChainBucket := tx.Bucket(mainChainBucket) if blockBucket.Get(blockRoot[:]) == nil { return fmt.Errorf("expected block %#x to have already been saved before updating head: %v", blockRoot, err) } - if err := mainChain.Put(slotBinary, blockRoot[:]); err != nil { - return fmt.Errorf("failed to include the block in the main chain bucket: %v", err) + if err := chainInfo.Put(mainChainHeightKey, slotBinary); err != nil { + return err } - if err := chainInfo.Put(mainChainHeightKey, slotBinary); err != nil { + if err := mainChainBucket.Put(slotBinary, blockEnc); err != nil { + return err + } + + if err := chainInfo.Put(canonicalHeadKey, blockRoot[:]); err != nil { return fmt.Errorf("failed to record the block as the head of the main chain: %v", err) } @@ -264,10 +271,9 @@ func (db *BeaconDB) UpdateChainHead(ctx context.Context, block *pb.BeaconBlock, }) } -// BlockBySlot accepts a slot number and returns the corresponding block in the main chain. -// Returns nil if a block was not recorded for the given slot. -func (db *BeaconDB) BlockBySlot(ctx context.Context, slot uint64) (*pb.BeaconBlock, error) { - _, span := trace.StartSpan(ctx, "BeaconDB.BlockBySlot") +// CanonicalBlockBySlot accepts a slot number and returns the corresponding canonical block. +func (db *BeaconDB) CanonicalBlockBySlot(ctx context.Context, slot uint64) (*pb.BeaconBlock, error) { + _, span := trace.StartSpan(ctx, "BeaconDB.CanonicalBlockBySlot") defer span.End() span.AddAttributes(trace.Int64Attribute("slot", int64(slot-params.BeaconConfig().GenesisSlot))) @@ -275,27 +281,47 @@ func (db *BeaconDB) BlockBySlot(ctx context.Context, slot uint64) (*pb.BeaconBlo slotEnc := encodeSlotNumber(slot) err := db.view(func(tx *bolt.Tx) error { - mainChain := tx.Bucket(mainChainBucket) - blockBkt := tx.Bucket(blockBucket) - - blockRoot := mainChain.Get(slotEnc) - if blockRoot == nil { - return nil - } - - enc := blockBkt.Get(blockRoot) - if enc == nil { - return nil - } - + bkt := tx.Bucket(mainChainBucket) + blockEnc := bkt.Get(slotEnc) var err error - block, err = createBlock(enc) + if blockEnc != nil { + block, err = createBlock(blockEnc) + } return err }) return block, err } +// BlocksBySlot accepts a slot number and returns the corresponding blocks in the db. +// Returns empty list if no blocks were recorded for the given slot. +func (db *BeaconDB) BlocksBySlot(ctx context.Context, slot uint64) ([]*pb.BeaconBlock, error) { + _, span := trace.StartSpan(ctx, "BeaconDB.BlocksBySlot") + defer span.End() + span.AddAttributes(trace.Int64Attribute("slot", int64(slot-params.BeaconConfig().GenesisSlot))) + + blocks := []*pb.BeaconBlock{} + slotEnc := encodeSlotNumber(slot) + + err := db.view(func(tx *bolt.Tx) error { + c := tx.Bucket(blockBucket).Cursor() + + var err error + prefix := slotEnc + for k, v := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, v = c.Next() { + block, err := createBlock(v) + if err != nil { + return err + } + blocks = append(blocks, block) + } + + return err + }) + + return blocks, err +} + // HighestBlockSlot returns the in-memory value for the highest block we've // seen in the database. func (db *BeaconDB) HighestBlockSlot() uint64 { diff --git a/beacon-chain/db/block_test.go b/beacon-chain/db/block_test.go index 285193da5a..9a35528c60 100644 --- a/beacon-chain/db/block_test.go +++ b/beacon-chain/db/block_test.go @@ -113,13 +113,13 @@ func TestDeleteBlock_OK(t *testing.T) { } } -func TestBlockBySlotEmptyChain_OK(t *testing.T) { +func TestBlocksBySlotEmptyChain_OK(t *testing.T) { db := setupDB(t) defer teardownDB(t, db) ctx := context.Background() - block, _ := db.BlockBySlot(ctx, 0) - if block != nil { + blocks, _ := db.BlocksBySlot(ctx, 0) + if len(blocks) > 0 { t.Error("BlockBySlot should return nil for an empty chain") } } @@ -158,7 +158,7 @@ func TestUpdateChainHead_OK(t *testing.T) { t.Fatalf("failed to initialize state: %v", err) } - block, err := db.BlockBySlot(ctx, 0) + block, err := db.ChainHead() if err != nil { t.Fatalf("failed to get genesis block: %v", err) } @@ -173,7 +173,7 @@ func TestUpdateChainHead_OK(t *testing.T) { } block2 := &pb.BeaconBlock{ - Slot: 1, + Slot: params.BeaconConfig().GenesisSlot + 1, ParentRootHash32: bHash[:], } b2Hash, err := hashutil.HashBeaconBlock(block2) @@ -187,7 +187,7 @@ func TestUpdateChainHead_OK(t *testing.T) { t.Fatalf("failed to record the new head of the main chain: %v", err) } - b2Prime, err := db.BlockBySlot(ctx, 1) + b2Prime, err := db.CanonicalBlockBySlot(ctx, params.BeaconConfig().GenesisSlot+1) if err != nil { t.Fatalf("failed to retrieve slot 1: %v", err) } diff --git a/beacon-chain/db/schema.go b/beacon-chain/db/schema.go index 33d0790934..14e7228bd2 100644 --- a/beacon-chain/db/schema.go +++ b/beacon-chain/db/schema.go @@ -25,6 +25,7 @@ var ( validatorBucket = []byte("validator") mainChainHeightKey = []byte("chain-height") + canonicalHeadKey = []byte("canonical-head") stateLookupKey = []byte("state") finalizedStateLookupKey = []byte("finalized-state") justifiedStateLookupKey = []byte("justified-state") @@ -35,6 +36,10 @@ var ( cleanupHistoryBucket = []byte("cleanup-history-bucket") ) +func encodeSlotNumberRoot(number uint64, root [32]byte) []byte { + return append(bytesutil.Bytes8(number), root[:]...) +} + // encodeSlotNumber encodes a slot number as little-endian uint32. func encodeSlotNumber(number uint64) []byte { return bytesutil.Bytes8(number) diff --git a/beacon-chain/db/state.go b/beacon-chain/db/state.go index 4abe486a63..bdc56b36b0 100644 --- a/beacon-chain/db/state.go +++ b/beacon-chain/db/state.go @@ -1,11 +1,14 @@ package db import ( + "bytes" "context" "encoding/binary" "errors" "fmt" + "github.com/prysmaticlabs/prysm/shared/params" + "github.com/boltdb/bolt" "github.com/gogo/protobuf/proto" "github.com/prometheus/client_golang/prometheus" @@ -44,7 +47,7 @@ func (db *BeaconDB) InitializeState(ctx context.Context, genesisTime uint64, dep blockRoot, _ := hashutil.HashBeaconBlock(genesisBlock) // #nosec G104 blockEnc, _ := proto.Marshal(genesisBlock) - zeroBinary := encodeSlotNumber(0) + zeroBinary := encodeSlotNumberRoot(params.BeaconConfig().GenesisSlot, blockRoot) db.serializedState = stateEnc db.stateHash = stateHash @@ -63,10 +66,14 @@ func (db *BeaconDB) InitializeState(ctx context.Context, genesisTime uint64, dep return fmt.Errorf("failed to record block height: %v", err) } - if err := mainChain.Put(zeroBinary, blockRoot[:]); err != nil { + if err := mainChain.Put(zeroBinary, blockEnc); err != nil { return fmt.Errorf("failed to record block hash: %v", err) } + if err := chainInfo.Put(canonicalHeadKey, blockRoot[:]); err != nil { + return fmt.Errorf("failed to record block as canonical: %v", err) + } + if err := blockBkt.Put(blockRoot[:], blockEnc); err != nil { return err } @@ -168,8 +175,15 @@ func (db *BeaconDB) SaveState(ctx context.Context, beaconState *pb.BeaconState) db.serializedState = enc db.stateHash = stateHash - if err := db.SaveHistoricalState(ctx, beaconState); err != nil { - return err + if beaconState.LatestBlock != nil { + blockRoot, err := hashutil.HashBeaconBlock(beaconState.LatestBlock) + if err != nil { + return err + } + + if err := db.SaveHistoricalState(ctx, beaconState, blockRoot); err != nil { + return err + } } return db.update(func(tx *bolt.Tx) error { @@ -211,11 +225,11 @@ func (db *BeaconDB) SaveFinalizedState(beaconState *pb.BeaconState) error { } // SaveHistoricalState saves the last finalized state in the db. -func (db *BeaconDB) SaveHistoricalState(ctx context.Context, beaconState *pb.BeaconState) error { +func (db *BeaconDB) SaveHistoricalState(ctx context.Context, beaconState *pb.BeaconState, blockRoot [32]byte) error { ctx, span := trace.StartSpan(ctx, "beacon-chain.db.SaveHistoricalState") defer span.End() - slotBinary := encodeSlotNumber(beaconState.Slot) + slotRootBinary := encodeSlotNumberRoot(beaconState.Slot, blockRoot) stateHash, err := hashutil.HashProto(beaconState) if err != nil { return err @@ -224,7 +238,7 @@ func (db *BeaconDB) SaveHistoricalState(ctx context.Context, beaconState *pb.Bea return db.update(func(tx *bolt.Tx) error { histState := tx.Bucket(histStateBucket) chainInfo := tx.Bucket(chainInfoBucket) - if err := histState.Put(slotBinary, stateHash[:]); err != nil { + if err := histState.Put(slotRootBinary, stateHash[:]); err != nil { return err } beaconStateEnc, err := proto.Marshal(beaconState) @@ -271,11 +285,10 @@ func (db *BeaconDB) FinalizedState() (*pb.BeaconState, error) { // HistoricalStateFromSlot retrieves the state that is closest to the input slot, // while being smaller than or equal to the input slot. -func (db *BeaconDB) HistoricalStateFromSlot(ctx context.Context, slot uint64) (*pb.BeaconState, error) { +func (db *BeaconDB) HistoricalStateFromSlot(ctx context.Context, slot uint64, blockRoot [32]byte) (*pb.BeaconState, error) { if ctx.Err() != nil { return nil, ctx.Err() } - _, span := trace.StartSpan(ctx, "BeaconDB.HistoricalStateFromSlot") defer span.End() span.AddAttributes(trace.Int64Attribute("slotSinceGenesis", int64(slot))) @@ -291,8 +304,11 @@ func (db *BeaconDB) HistoricalStateFromSlot(ctx context.Context, slot uint64) (* hsCursor := histState.Cursor() for k, v := hsCursor.First(); k != nil; k, v = hsCursor.Next() { - slotNumber := decodeToSlotNumber(k) - if slotNumber == slot { + slotBinary := k[:8] + blockRootBinary := k[8:] + slotNumber := decodeToSlotNumber(slotBinary) + + if slotNumber == slot && bytes.Equal(blockRootBinary, blockRoot[:]) { stateExists = true highestStateSlot = slotNumber histStateKey = v @@ -303,7 +319,8 @@ func (db *BeaconDB) HistoricalStateFromSlot(ctx context.Context, slot uint64) (* // If no historical state exists, retrieve and decode the finalized state. if !stateExists { for k, v := hsCursor.First(); k != nil; k, v = hsCursor.Next() { - slotNumber := decodeToSlotNumber(k) + slotBinary := k[:8] + slotNumber := decodeToSlotNumber(slotBinary) // find the state with slot closest to the requested slot if slotNumber > highestStateSlot && slotNumber <= slot { stateExists = true @@ -462,7 +479,8 @@ func (db *BeaconDB) deleteHistoricalStates(slot uint64) error { hsCursor := histState.Cursor() for k, v := hsCursor.First(); k != nil; k, v = hsCursor.Next() { - keySlotNumber := decodeToSlotNumber(k) + slotBinary := k[:8] + keySlotNumber := decodeToSlotNumber(slotBinary) if keySlotNumber < slot { if err := histState.Delete(k); err != nil { return err diff --git a/beacon-chain/db/state_test.go b/beacon-chain/db/state_test.go index 4a78b68e5e..d7c5461d6b 100644 --- a/beacon-chain/db/state_test.go +++ b/beacon-chain/db/state_test.go @@ -266,11 +266,11 @@ func TestHistoricalState_CanSaveRetrieve(t *testing.T) { if err := db.SaveFinalizedState(tt.state); err != nil { t.Fatalf("could not save finalized state: %v", err) } - if err := db.SaveHistoricalState(context.Background(), tt.state); err != nil { + if err := db.SaveHistoricalState(context.Background(), tt.state, [32]byte{}); err != nil { t.Fatalf("could not save historical state: %v", err) } - retState, err := db.HistoricalStateFromSlot(ctx, tt.state.Slot) + retState, err := db.HistoricalStateFromSlot(ctx, tt.state.Slot, [32]byte{}) if err != nil { t.Fatalf("Unable to retrieve state %v", err) } @@ -346,10 +346,11 @@ func TestHistoricalState_Pruning(t *testing.T) { } for _, tt := range tests { - if err := db.SaveHistoricalState(context.Background(), tt.histState1); err != nil { + root := [32]byte{} + if err := db.SaveHistoricalState(context.Background(), tt.histState1, root); err != nil { t.Fatalf("could not save historical state: %v", err) } - if err := db.SaveHistoricalState(context.Background(), tt.histState2); err != nil { + if err := db.SaveHistoricalState(context.Background(), tt.histState2, root); err != nil { t.Fatalf("could not save historical state: %v", err) } @@ -359,11 +360,11 @@ func TestHistoricalState_Pruning(t *testing.T) { } // Save a dummy genesis state so that db doesnt return an error. - if err := db.SaveHistoricalState(context.Background(), &pb.BeaconState{Slot: slotGen(0), FinalizedEpoch: 1}); err != nil { + if err := db.SaveHistoricalState(context.Background(), &pb.BeaconState{Slot: slotGen(0), FinalizedEpoch: 1}, root); err != nil { t.Fatalf("could not save historical state: %v", err) } - retState, err := db.HistoricalStateFromSlot(ctx, tt.histState1.Slot) + retState, err := db.HistoricalStateFromSlot(ctx, tt.histState1.Slot, root) if err != nil { t.Fatalf("Unable to retrieve state %v", err) } @@ -372,7 +373,7 @@ func TestHistoricalState_Pruning(t *testing.T) { t.Errorf("Saved and retrieved states are equal when they supposed to be different %d", tt.histState1.Slot-params.BeaconConfig().GenesisSlot) } - retState, err = db.HistoricalStateFromSlot(ctx, tt.histState2.Slot) + retState, err = db.HistoricalStateFromSlot(ctx, tt.histState2.Slot, root) if err != nil { t.Fatalf("Unable to retrieve state %v", err) } diff --git a/beacon-chain/operations/service_test.go b/beacon-chain/operations/service_test.go index 6c355ac233..c8608b19a3 100644 --- a/beacon-chain/operations/service_test.go +++ b/beacon-chain/operations/service_test.go @@ -136,7 +136,8 @@ func TestRetrieveAttestations_OK(t *testing.T) { } } if err := beaconDB.SaveState(context.Background(), &pb.BeaconState{ - Slot: params.BeaconConfig().GenesisSlot + 64, + Slot: params.BeaconConfig().GenesisSlot + 64, + LatestBlock: &pb.BeaconBlock{Slot: params.BeaconConfig().GenesisSlot}, LatestCrosslinks: []*pb.Crosslink{{ Epoch: params.BeaconConfig().GenesisEpoch, CrosslinkDataRootHash32: params.BeaconConfig().ZeroHash[:]}}}); err != nil { diff --git a/beacon-chain/sync/initial-sync/service_test.go b/beacon-chain/sync/initial-sync/service_test.go index 087afa6e56..b0a9fac840 100644 --- a/beacon-chain/sync/initial-sync/service_test.go +++ b/beacon-chain/sync/initial-sync/service_test.go @@ -177,11 +177,11 @@ func TestProcessingBlocks_SkippedSlots(t *testing.T) { batchSize := 20 expectedSlot := params.BeaconConfig().GenesisSlot + uint64(batchSize) ss.highestObservedSlot = expectedSlot - blk, err := ss.db.BlockBySlot(ctx, params.BeaconConfig().GenesisSlot) + blks, err := ss.db.BlocksBySlot(ctx, params.BeaconConfig().GenesisSlot) if err != nil { t.Fatalf("Unable to get genesis block %v", err) } - h, err := hashutil.HashBeaconBlock(blk) + h, err := hashutil.HashBeaconBlock(blks[0]) if err != nil { t.Fatalf("Unable to hash block %v", err) } diff --git a/beacon-chain/sync/initial-sync/sync_state.go b/beacon-chain/sync/initial-sync/sync_state.go index 72080fb573..2a7802f6f4 100644 --- a/beacon-chain/sync/initial-sync/sync_state.go +++ b/beacon-chain/sync/initial-sync/sync_state.go @@ -25,11 +25,6 @@ func (s *InitialSync) processState(msg p2p.Message) { return } - if err := s.db.SaveHistoricalState(ctx, finalizedState); err != nil { - log.Errorf("Could not save new historical state: %v", err) - return - } - if err := s.db.SaveFinalizedBlock(finalizedState.LatestBlock); err != nil { log.Errorf("Could not save finalized block %v", err) return @@ -45,6 +40,12 @@ func (s *InitialSync) processState(msg p2p.Message) { log.Errorf("Could not hash finalized block %v", err) return } + + if err := s.db.SaveHistoricalState(ctx, finalizedState, root); err != nil { + log.Errorf("Could not save new historical state: %v", err) + return + } + if err := s.db.SaveAttestationTarget(ctx, &pb.AttestationTarget{ Slot: finalizedState.LatestBlock.Slot, BlockRoot: root[:], diff --git a/beacon-chain/sync/metrics.go b/beacon-chain/sync/metrics.go index 7a96cf94a6..76db10793a 100644 --- a/beacon-chain/sync/metrics.go +++ b/beacon-chain/sync/metrics.go @@ -12,10 +12,6 @@ var ( Name: "regsync_batched_block_req", Help: "The number of received batch block requests", }) - blockReqSlot = promauto.NewCounter(prometheus.CounterOpts{ - Name: "regsync_block_req_by_slot", - Help: "The number of received block requests by slot", - }) blockReqHash = promauto.NewCounter(prometheus.CounterOpts{ Name: "regsync_block_req_by_hash", Help: "The number of received block requests by hash", diff --git a/beacon-chain/sync/receive_block.go b/beacon-chain/sync/receive_block.go index de045e1b02..1a66422ae4 100644 --- a/beacon-chain/sync/receive_block.go +++ b/beacon-chain/sync/receive_block.go @@ -193,7 +193,7 @@ func (rs *RegularSync) validateAndProcessBlock( "slot": block.Slot, "root": fmt.Sprintf("%#x", bytesutil.Trunc(blockRoot[:]))}, ).Warn("Received Block from a forked chain") - if err := rs.db.SaveHistoricalState(ctx, beaconState); err != nil { + if err := rs.db.SaveHistoricalState(ctx, beaconState, blockRoot); err != nil { log.Errorf("Could not save historical state %v", err) return nil, nil, false, err } diff --git a/beacon-chain/sync/regular_sync.go b/beacon-chain/sync/regular_sync.go index dd0d921b57..272a6531a6 100644 --- a/beacon-chain/sync/regular_sync.go +++ b/beacon-chain/sync/regular_sync.go @@ -74,7 +74,6 @@ type RegularSync struct { blockAnnouncementFeed *event.Feed announceBlockBuf chan p2p.Message blockBuf chan p2p.Message - blockRequestBySlot chan p2p.Message blockRequestByHash chan p2p.Message batchedRequestBuf chan p2p.Message stateRequestBuf chan p2p.Message @@ -96,7 +95,6 @@ type RegularSync struct { type RegularSyncConfig struct { BlockAnnounceBufferSize int BlockBufferSize int - BlockReqSlotBufferSize int BlockReqHashBufferSize int BatchedBufferSize int StateReqBufferSize int @@ -118,7 +116,6 @@ func DefaultRegularSyncConfig() *RegularSyncConfig { return &RegularSyncConfig{ BlockAnnounceBufferSize: params.BeaconConfig().DefaultBufferSize, BlockBufferSize: params.BeaconConfig().DefaultBufferSize, - BlockReqSlotBufferSize: params.BeaconConfig().DefaultBufferSize, BlockReqHashBufferSize: params.BeaconConfig().DefaultBufferSize, BatchedBufferSize: params.BeaconConfig().DefaultBufferSize, StateReqBufferSize: params.BeaconConfig().DefaultBufferSize, @@ -145,7 +142,6 @@ func NewRegularSyncService(ctx context.Context, cfg *RegularSyncConfig) *Regular blockAnnouncementFeed: new(event.Feed), announceBlockBuf: make(chan p2p.Message, cfg.BlockAnnounceBufferSize), blockBuf: make(chan p2p.Message, cfg.BlockBufferSize), - blockRequestBySlot: make(chan p2p.Message, cfg.BlockReqSlotBufferSize), blockRequestByHash: make(chan p2p.Message, cfg.BlockReqHashBufferSize), batchedRequestBuf: make(chan p2p.Message, cfg.BatchedBufferSize), stateRequestBuf: make(chan p2p.Message, cfg.StateReqBufferSize), @@ -187,7 +183,6 @@ func (rs *RegularSync) BlockAnnouncementFeed() *event.Feed { func (rs *RegularSync) run() { announceBlockSub := rs.p2p.Subscribe(&pb.BeaconBlockAnnounce{}, rs.announceBlockBuf) blockSub := rs.p2p.Subscribe(&pb.BeaconBlockResponse{}, rs.blockBuf) - blockRequestSub := rs.p2p.Subscribe(&pb.BeaconBlockRequestBySlotNumber{}, rs.blockRequestBySlot) blockRequestHashSub := rs.p2p.Subscribe(&pb.BeaconBlockRequest{}, rs.blockRequestByHash) batchedBlockRequestSub := rs.p2p.Subscribe(&pb.BatchedBeaconBlockRequest{}, rs.batchedRequestBuf) stateRequestSub := rs.p2p.Subscribe(&pb.BeaconStateRequest{}, rs.stateRequestBuf) @@ -200,7 +195,6 @@ func (rs *RegularSync) run() { defer announceBlockSub.Unsubscribe() defer blockSub.Unsubscribe() - defer blockRequestSub.Unsubscribe() defer blockRequestHashSub.Unsubscribe() defer batchedBlockRequestSub.Unsubscribe() defer stateRequestSub.Unsubscribe() @@ -228,8 +222,6 @@ func (rs *RegularSync) run() { go safelyHandleMessage(rs.receiveExitRequest, msg) case msg := <-rs.blockBuf: go safelyHandleMessage(rs.receiveBlock, msg) - case msg := <-rs.blockRequestBySlot: - go safelyHandleMessage(rs.handleBlockRequestBySlot, msg) case msg := <-rs.blockRequestByHash: go safelyHandleMessage(rs.handleBlockRequestByHash, msg) case msg := <-rs.batchedRequestBuf: @@ -289,43 +281,6 @@ func safelyHandleMessage(fn func(p2p.Message) error, msg p2p.Message) { } } -// handleBlockRequestBySlot processes a block request from the p2p layer. -// if found, the block is sent to the requesting peer. -func (rs *RegularSync) handleBlockRequestBySlot(msg p2p.Message) error { - ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.handleBlockRequestBySlot") - defer span.End() - blockReqSlot.Inc() - - request, ok := msg.Data.(*pb.BeaconBlockRequestBySlotNumber) - if !ok { - log.Error("Received malformed beacon block request p2p message") - return errors.New("incoming message is not type *pb.BeaconBlockRequestBySlotNumber") - } - - block, err := rs.db.BlockBySlot(ctx, request.SlotNumber) - if err != nil || block == nil { - if block == nil { - log.WithField("slot", request.SlotNumber-params.BeaconConfig().GenesisSlot).Debug( - "block does not exist") - return errors.New("block does not exist") - } - log.Errorf("Error retrieving block from db: %v", err) - return err - } - - log.WithField("slot", - fmt.Sprintf("%d", request.SlotNumber-params.BeaconConfig().GenesisSlot)).Debug("Sending requested block to peer") - - defer sentBlocks.Inc() - if err := rs.p2p.Send(ctx, &pb.BeaconBlockResponse{ - Block: block, - }, msg.Peer); err != nil { - log.Error(err) - return err - } - return nil -} - func (rs *RegularSync) handleStateRequest(msg p2p.Message) error { ctx, span := trace.StartSpan(msg.Ctx, "beacon-chain.sync.handleStateRequest") defer span.End() @@ -545,7 +500,6 @@ func (rs *RegularSync) handleBatchedBlockRequest(msg p2p.Message) error { if err != nil { return fmt.Errorf("could not build canonical block list %v", err) } - log.WithField("peer", msg.Peer).Debug("Sending response for batch blocks") defer sentBatchedBlocks.Inc() diff --git a/beacon-chain/sync/regular_sync_test.go b/beacon-chain/sync/regular_sync_test.go index 12eef07b9b..5330e268c2 100644 --- a/beacon-chain/sync/regular_sync_test.go +++ b/beacon-chain/sync/regular_sync_test.go @@ -346,56 +346,6 @@ func TestProcessBlock_MultipleBlocksProcessedOK(t *testing.T) { hook.Reset() } -func TestBlockRequest_InvalidMsg(t *testing.T) { - hook := logTest.NewGlobal() - - db := internal.SetupDB(t) - defer internal.TeardownDB(t, db) - ss := setupService(db) - - malformedRequest := &pb.BeaconBlockAnnounce{ - Hash: []byte{'t', 'e', 's', 't'}, - } - - invalidmsg := p2p.Message{ - Ctx: context.Background(), - Data: malformedRequest, - Peer: "", - } - - if err := ss.handleBlockRequestBySlot(invalidmsg); err == nil { - t.Error("Expected error, received nil") - } - testutil.AssertLogsContain(t, hook, "Received malformed beacon block request p2p message") -} - -func TestBlockRequest_OK(t *testing.T) { - hook := logTest.NewGlobal() - - db := internal.SetupDB(t) - defer internal.TeardownDB(t, db) - ss := setupService(db) - - request1 := &pb.BeaconBlockRequestBySlotNumber{ - SlotNumber: 20, - } - - msg1 := p2p.Message{ - Ctx: context.Background(), - Data: request1, - Peer: "", - } - if err := db.SaveBlock(&pb.BeaconBlock{Slot: 20}); err != nil { - t.Fatal(err) - } - - if err := ss.handleBlockRequestBySlot(msg1); err != nil { - t.Error(err) - } - - testutil.AssertLogsContain(t, hook, "Sending requested block to peer") -} - func TestReceiveAttestation_OK(t *testing.T) { hook := logTest.NewGlobal() ms := &mockChainService{}