From 3c61cc7d8a3c172a55eec729832c5e64d0dffaba Mon Sep 17 00:00:00 2001 From: kasey <489222+kasey@users.noreply.github.com> Date: Thu, 9 Dec 2021 16:23:00 -0600 Subject: [PATCH] allow checkpoint or genesis origin; refactoring (#9976) * allow checkpoint or genesis origin; refactoring some quick readability improvements and simplifying the logic enforcing the startup ordering of the attestation processing routine * address PR feedback * gofmt * Update beacon-chain/blockchain/receive_attestation.go Co-authored-by: Preston Van Loon * Apply suggestions from code review use log.WithError for aggregation friendliness Co-authored-by: Preston Van Loon Co-authored-by: kasey Co-authored-by: Preston Van Loon Co-authored-by: Raul Jordan Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> --- beacon-chain/blockchain/chain_info_test.go | 12 +- beacon-chain/blockchain/head.go | 8 +- beacon-chain/blockchain/head_test.go | 8 +- .../blockchain/process_block_helpers.go | 2 +- beacon-chain/blockchain/process_block_test.go | 6 +- .../blockchain/receive_attestation.go | 72 +-- beacon-chain/blockchain/service.go | 474 ++++++++++-------- beacon-chain/blockchain/service_test.go | 36 +- beacon-chain/db/BUILD.bazel | 1 + beacon-chain/db/errors.go | 9 + beacon-chain/db/iface/interface.go | 6 + beacon-chain/db/kv/BUILD.bazel | 3 + beacon-chain/db/kv/blocks.go | 35 ++ beacon-chain/db/kv/error.go | 46 ++ beacon-chain/db/kv/error_test.go | 24 + beacon-chain/db/kv/schema.go | 2 + beacon-chain/db/kv/wss.go | 89 ++++ beacon-chain/state/v2/state_trie.go | 23 + 18 files changed, 575 insertions(+), 281 deletions(-) create mode 100644 beacon-chain/db/errors.go create mode 100644 beacon-chain/db/kv/error.go create mode 100644 beacon-chain/db/kv/error_test.go create mode 100644 beacon-chain/db/kv/wss.go diff --git a/beacon-chain/blockchain/chain_info_test.go b/beacon-chain/blockchain/chain_info_test.go index 64b1fe046a..9eea64e52a 100644 --- a/beacon-chain/blockchain/chain_info_test.go +++ b/beacon-chain/blockchain/chain_info_test.go @@ -56,8 +56,8 @@ func TestFinalizedCheckpt_GenesisRootOk(t *testing.T) { cp := ðpb.Checkpoint{Root: genesisRoot[:]} c := setupBeaconChain(t, beaconDB) c.finalizedCheckpt = cp - c.genesisRoot = genesisRoot - assert.DeepEqual(t, c.genesisRoot[:], c.FinalizedCheckpt().Root) + c.originBlockRoot = genesisRoot + assert.DeepEqual(t, c.originBlockRoot[:], c.FinalizedCheckpt().Root) } func TestCurrentJustifiedCheckpt_CanRetrieve(t *testing.T) { @@ -77,8 +77,8 @@ func TestJustifiedCheckpt_GenesisRootOk(t *testing.T) { genesisRoot := [32]byte{'B'} cp := ðpb.Checkpoint{Root: genesisRoot[:]} c.justifiedCheckpt = cp - c.genesisRoot = genesisRoot - assert.DeepEqual(t, c.genesisRoot[:], c.CurrentJustifiedCheckpt().Root) + c.originBlockRoot = genesisRoot + assert.DeepEqual(t, c.originBlockRoot[:], c.CurrentJustifiedCheckpt().Root) } func TestPreviousJustifiedCheckpt_CanRetrieve(t *testing.T) { @@ -98,8 +98,8 @@ func TestPrevJustifiedCheckpt_GenesisRootOk(t *testing.T) { cp := ðpb.Checkpoint{Root: genesisRoot[:]} c := setupBeaconChain(t, beaconDB) c.prevJustifiedCheckpt = cp - c.genesisRoot = genesisRoot - assert.DeepEqual(t, c.genesisRoot[:], c.PreviousJustifiedCheckpt().Root) + c.originBlockRoot = genesisRoot + assert.DeepEqual(t, c.originBlockRoot[:], c.PreviousJustifiedCheckpt().Root) } func TestHeadSlot_CanRetrieve(t *testing.T) { diff --git a/beacon-chain/blockchain/head.go b/beacon-chain/blockchain/head.go index 459ae20f8a..852e9335a6 100644 --- a/beacon-chain/blockchain/head.go +++ b/beacon-chain/blockchain/head.go @@ -46,11 +46,11 @@ func (s *Service) updateHead(ctx context.Context, balances []uint64) error { // Get head from the fork choice service. f := s.finalizedCheckpt j := s.justifiedCheckpt - // To get head before the first justified epoch, the fork choice will start with genesis root + // To get head before the first justified epoch, the fork choice will start with origin root // instead of zero hashes. headStartRoot := bytesutil.ToBytes32(j.Root) if headStartRoot == params.BeaconConfig().ZeroHash { - headStartRoot = s.genesisRoot + headStartRoot = s.originBlockRoot } // In order to process head, fork choice store requires justified info. @@ -277,8 +277,8 @@ func (s *Service) notifyNewHeadEvent( newHeadStateRoot, newHeadRoot []byte, ) error { - previousDutyDependentRoot := s.genesisRoot[:] - currentDutyDependentRoot := s.genesisRoot[:] + previousDutyDependentRoot := s.originBlockRoot[:] + currentDutyDependentRoot := s.originBlockRoot[:] var previousDutyEpoch types.Epoch currentDutyEpoch := slots.ToEpoch(newHeadSlot) diff --git a/beacon-chain/blockchain/head_test.go b/beacon-chain/blockchain/head_test.go index faf2ccb95c..dd5baa4e7f 100644 --- a/beacon-chain/blockchain/head_test.go +++ b/beacon-chain/blockchain/head_test.go @@ -158,7 +158,7 @@ func Test_notifyNewHeadEvent(t *testing.T) { cfg: &config{ StateNotifier: notifier, }, - genesisRoot: [32]byte{1}, + originBlockRoot: [32]byte{1}, } newHeadStateRoot := [32]byte{2} newHeadRoot := [32]byte{3} @@ -174,8 +174,8 @@ func Test_notifyNewHeadEvent(t *testing.T) { Block: newHeadRoot[:], State: newHeadStateRoot[:], EpochTransition: false, - PreviousDutyDependentRoot: srv.genesisRoot[:], - CurrentDutyDependentRoot: srv.genesisRoot[:], + PreviousDutyDependentRoot: srv.originBlockRoot[:], + CurrentDutyDependentRoot: srv.originBlockRoot[:], } require.DeepSSZEqual(t, wanted, eventHead) }) @@ -187,7 +187,7 @@ func Test_notifyNewHeadEvent(t *testing.T) { cfg: &config{ StateNotifier: notifier, }, - genesisRoot: genesisRoot, + originBlockRoot: genesisRoot, } epoch1Start, err := slots.EpochStart(1) require.NoError(t, err) diff --git a/beacon-chain/blockchain/process_block_helpers.go b/beacon-chain/blockchain/process_block_helpers.go index be66f7f1c3..8b60e96834 100644 --- a/beacon-chain/blockchain/process_block_helpers.go +++ b/beacon-chain/blockchain/process_block_helpers.go @@ -439,7 +439,7 @@ func (s *Service) deletePoolAtts(atts []*ethpb.Attestation) error { // fork choice justification routine. func (s *Service) ensureRootNotZeros(root [32]byte) [32]byte { if root == params.BeaconConfig().ZeroHash { - return s.genesisRoot + return s.originBlockRoot } return root } diff --git a/beacon-chain/blockchain/process_block_test.go b/beacon-chain/blockchain/process_block_test.go index 0d56be28d1..af8035860b 100644 --- a/beacon-chain/blockchain/process_block_test.go +++ b/beacon-chain/blockchain/process_block_test.go @@ -733,10 +733,10 @@ func TestEnsureRootNotZeroHashes(t *testing.T) { opts := testServiceOptsNoDB() service, err := NewService(ctx, opts...) require.NoError(t, err) - service.genesisRoot = [32]byte{'a'} + service.originBlockRoot = [32]byte{'a'} r := service.ensureRootNotZeros(params.BeaconConfig().ZeroHash) - assert.Equal(t, service.genesisRoot, r, "Did not get wanted justified root") + assert.Equal(t, service.originBlockRoot, r, "Did not get wanted justified root") root := [32]byte{'b'} r = service.ensureRootNotZeros(root) assert.Equal(t, root, r, "Did not get wanted justified root") @@ -917,7 +917,7 @@ func TestUpdateJustifiedInitSync(t *testing.T) { require.NoError(t, service.cfg.BeaconDB.SaveStateSummary(ctx, ðpb.StateSummary{Root: gRoot[:]})) beaconState, _ := util.DeterministicGenesisState(t, 32) require.NoError(t, service.cfg.BeaconDB.SaveState(ctx, beaconState, gRoot)) - service.genesisRoot = gRoot + service.originBlockRoot = gRoot currentCp := ðpb.Checkpoint{Epoch: 1} service.justifiedCheckpt = currentCp newCp := ðpb.Checkpoint{Epoch: 2, Root: gRoot[:]} diff --git a/beacon-chain/blockchain/receive_attestation.go b/beacon-chain/blockchain/receive_attestation.go index 1a2f03057f..e7f4dd0fd6 100644 --- a/beacon-chain/blockchain/receive_attestation.go +++ b/beacon-chain/blockchain/receive_attestation.go @@ -7,6 +7,7 @@ import ( "time" "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/async/event" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/state" @@ -103,45 +104,56 @@ func (s *Service) VerifyFinalizedConsistency(ctx context.Context, root []byte) e } // This routine processes fork choice attestations from the pool to account for validator votes and fork choice. -func (s *Service) processAttestationsRoutine(subscribedToStateEvents chan<- struct{}) { +func (s *Service) spawnProcessAttestationsRoutine(stateFeed *event.Feed) { // Wait for state to be initialized. stateChannel := make(chan *feed.Event, 1) - stateSub := s.cfg.StateNotifier.StateFeed().Subscribe(stateChannel) - subscribedToStateEvents <- struct{}{} - <-stateChannel - stateSub.Unsubscribe() - - if s.genesisTime.IsZero() { - log.Warn("ProcessAttestations routine waiting for genesis time") - for s.genesisTime.IsZero() { - time.Sleep(1 * time.Second) - } - log.Warn("Genesis time received, now available to process attestations") - } - - st := slots.NewSlotTicker(s.genesisTime, params.BeaconConfig().SecondsPerSlot) - for { + stateSub := stateFeed.Subscribe(stateChannel) + go func() { select { case <-s.ctx.Done(): + stateSub.Unsubscribe() return - case <-st.C(): - // Continue when there's no fork choice attestation, there's nothing to process and update head. - // This covers the condition when the node is still initial syncing to the head of the chain. - if s.cfg.AttPool.ForkchoiceAttestationCount() == 0 { - continue - } - s.processAttestations(s.ctx) + case <-stateChannel: + stateSub.Unsubscribe() + break + } - balances, err := s.justifiedBalances.get(s.ctx, bytesutil.ToBytes32(s.justifiedCheckpt.Root)) - if err != nil { - log.Errorf("Unable to get justified balances for root %v w/ error %s", s.justifiedCheckpt.Root, err) - continue + if s.genesisTime.IsZero() { + log.Warn("ProcessAttestations routine waiting for genesis time") + for s.genesisTime.IsZero() { + if err := s.ctx.Err(); err != nil { + log.WithError(err).Error("Giving up waiting for genesis time") + return + } + time.Sleep(1 * time.Second) } - if err := s.updateHead(s.ctx, balances); err != nil { - log.Warnf("Resolving fork due to new attestation: %v", err) + log.Warn("Genesis time received, now available to process attestations") + } + + st := slots.NewSlotTicker(s.genesisTime, params.BeaconConfig().SecondsPerSlot) + for { + select { + case <-s.ctx.Done(): + return + case <-st.C(): + // Continue when there's no fork choice attestation, there's nothing to process and update head. + // This covers the condition when the node is still initial syncing to the head of the chain. + if s.cfg.AttPool.ForkchoiceAttestationCount() == 0 { + continue + } + s.processAttestations(s.ctx) + + balances, err := s.justifiedBalances.get(s.ctx, bytesutil.ToBytes32(s.justifiedCheckpt.Root)) + if err != nil { + log.WithError(err).Errorf("Unable to get justified balances for root %v", s.justifiedCheckpt.Root) + continue + } + if err := s.updateHead(s.ctx, balances); err != nil { + log.WithError(err).Warn("Resolving fork due to new attestation") + } } } - } + }() } // This processes fork choice attestations from the pool to account for validator votes and fork choice. diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index 61f7c586b0..208693ec30 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -33,6 +33,7 @@ import ( "github.com/prysmaticlabs/prysm/encoding/bytesutil" ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block" + prysmTime "github.com/prysmaticlabs/prysm/time" "github.com/prysmaticlabs/prysm/time/slots" "github.com/sirupsen/logrus" "go.opencensus.io/trace" @@ -45,13 +46,14 @@ const headSyncMinEpochsAfterCheckpoint = 128 // Service represents a service that handles the internal // logic of managing the full PoS beacon chain. type Service struct { - cfg *config - ctx context.Context - cancel context.CancelFunc - genesisTime time.Time - head *head - headLock sync.RWMutex - genesisRoot [32]byte + cfg *config + ctx context.Context + cancel context.CancelFunc + genesisTime time.Time + head *head + headLock sync.RWMutex + // originBlockRoot is the genesis root, or weak subjectivity checkpoint root, depending on how the node is initialized + originBlockRoot [32]byte justifiedCheckpt *ethpb.Checkpoint prevJustifiedCheckpt *ethpb.Checkpoint bestJustifiedCheckpt *ethpb.Checkpoint @@ -120,179 +122,17 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) { // Start a blockchain service's main event loop. func (s *Service) Start() { - beaconState := s.cfg.FinalizedStateAtStartUp + saved := s.cfg.FinalizedStateAtStartUp - // Make sure that attestation processor is subscribed and ready for state initializing event. - attestationProcessorSubscribed := make(chan struct{}, 1) - - // If the chain has already been initialized, simply start the block processing routine. - if beaconState != nil && !beaconState.IsNil() { - log.Info("Blockchain data already exists in DB, initializing...") - s.genesisTime = time.Unix(int64(beaconState.GenesisTime()), 0) - s.cfg.AttService.SetGenesisTime(beaconState.GenesisTime()) - if err := s.initializeChainInfo(s.ctx); err != nil { - log.Fatalf("Could not set up chain info: %v", err) + if saved != nil && !saved.IsNil() { + if err := s.startFromSavedState(saved); err != nil { + log.Fatal(err) } - - // We start a counter to genesis, if needed. - gState, err := s.cfg.BeaconDB.GenesisState(s.ctx) - if err != nil { - log.Fatalf("Could not retrieve genesis state: %v", err) - } - gRoot, err := gState.HashTreeRoot(s.ctx) - if err != nil { - log.Fatalf("Could not hash tree root genesis state: %v", err) - } - go slots.CountdownToGenesis(s.ctx, s.genesisTime, uint64(gState.NumValidators()), gRoot) - - justifiedCheckpoint, err := s.cfg.BeaconDB.JustifiedCheckpoint(s.ctx) - if err != nil { - log.Fatalf("Could not get justified checkpoint: %v", err) - } - finalizedCheckpoint, err := s.cfg.BeaconDB.FinalizedCheckpoint(s.ctx) - if err != nil { - log.Fatalf("Could not get finalized checkpoint: %v", err) - } - - // Resume fork choice. - s.justifiedCheckpt = ethpb.CopyCheckpoint(justifiedCheckpoint) - s.prevJustifiedCheckpt = ethpb.CopyCheckpoint(justifiedCheckpoint) - s.bestJustifiedCheckpt = ethpb.CopyCheckpoint(justifiedCheckpoint) - s.finalizedCheckpt = ethpb.CopyCheckpoint(finalizedCheckpoint) - s.prevFinalizedCheckpt = ethpb.CopyCheckpoint(finalizedCheckpoint) - s.resumeForkChoice(justifiedCheckpoint, finalizedCheckpoint) - - ss, err := slots.EpochStart(s.finalizedCheckpt.Epoch) - if err != nil { - log.Fatalf("Could not get start slot of finalized epoch: %v", err) - } - h := s.headBlock().Block() - if h.Slot() > ss { - log.WithFields(logrus.Fields{ - "startSlot": ss, - "endSlot": h.Slot(), - }).Info("Loading blocks to fork choice store, this may take a while.") - if err := s.fillInForkChoiceMissingBlocks(s.ctx, h, s.finalizedCheckpt, s.justifiedCheckpt); err != nil { - log.Fatalf("Could not fill in fork choice store missing blocks: %v", err) - } - } - - // not attempting to save initial sync blocks here, because there shouldn't be until - // after the statefeed.Initialized event is fired (below) - if err := s.wsVerifier.VerifyWeakSubjectivity(s.ctx, s.finalizedCheckpt.Epoch); err != nil { - // Exit run time if the node failed to verify weak subjectivity checkpoint. - log.Fatalf("could not verify initial checkpoint provided for chain sync, with err=: %v", err) - } - - s.cfg.StateNotifier.StateFeed().Send(&feed.Event{ - Type: statefeed.Initialized, - Data: &statefeed.InitializedData{ - StartTime: s.genesisTime, - GenesisValidatorsRoot: beaconState.GenesisValidatorRoot(), - }, - }) } else { - log.Info("Waiting to reach the validator deposit threshold to start the beacon chain...") - if s.cfg.ChainStartFetcher == nil { - log.Fatal("Not configured web3Service for POW chain") - return // return need for TestStartUninitializedChainWithoutConfigPOWChain. + if err := s.startFromPOWChain(); err != nil { + log.Fatal(err) } - go func() { - stateChannel := make(chan *feed.Event, 1) - stateSub := s.cfg.StateNotifier.StateFeed().Subscribe(stateChannel) - defer stateSub.Unsubscribe() - <-attestationProcessorSubscribed - for { - select { - case event := <-stateChannel: - if event.Type == statefeed.ChainStarted { - data, ok := event.Data.(*statefeed.ChainStartedData) - if !ok { - log.Error("event data is not type *statefeed.ChainStartedData") - return - } - log.WithField("starttime", data.StartTime).Debug("Received chain start event") - s.processChainStartTime(s.ctx, data.StartTime) - return - } - case <-s.ctx.Done(): - log.Debug("Context closed, exiting goroutine") - return - case err := <-stateSub.Err(): - log.WithError(err).Error("Subscription to state notifier failed") - return - } - } - }() } - - go s.processAttestationsRoutine(attestationProcessorSubscribed) -} - -// processChainStartTime initializes a series of deposits from the ChainStart deposits in the eth1 -// deposit contract, initializes the beacon chain's state, and kicks off the beacon chain. -func (s *Service) processChainStartTime(ctx context.Context, genesisTime time.Time) { - preGenesisState := s.cfg.ChainStartFetcher.PreGenesisState() - initializedState, err := s.initializeBeaconChain(ctx, genesisTime, preGenesisState, s.cfg.ChainStartFetcher.ChainStartEth1Data()) - if err != nil { - log.Fatalf("Could not initialize beacon chain: %v", err) - } - // We start a counter to genesis, if needed. - gRoot, err := initializedState.HashTreeRoot(s.ctx) - if err != nil { - log.Fatalf("Could not hash tree root genesis state: %v", err) - } - go slots.CountdownToGenesis(ctx, genesisTime, uint64(initializedState.NumValidators()), gRoot) - - // We send out a state initialized event to the rest of the services - // running in the beacon node. - s.cfg.StateNotifier.StateFeed().Send(&feed.Event{ - Type: statefeed.Initialized, - Data: &statefeed.InitializedData{ - StartTime: genesisTime, - GenesisValidatorsRoot: initializedState.GenesisValidatorRoot(), - }, - }) -} - -// initializes the state and genesis block of the beacon chain to persistent storage -// based on a genesis timestamp value obtained from the ChainStart event emitted -// by the ETH1.0 Deposit Contract and the POWChain service of the node. -func (s *Service) initializeBeaconChain( - ctx context.Context, - genesisTime time.Time, - preGenesisState state.BeaconState, - eth1data *ethpb.Eth1Data) (state.BeaconState, error) { - ctx, span := trace.StartSpan(ctx, "beacon-chain.Service.initializeBeaconChain") - defer span.End() - s.genesisTime = genesisTime - unixTime := uint64(genesisTime.Unix()) - - genesisState, err := transition.OptimizedGenesisBeaconState(unixTime, preGenesisState, eth1data) - if err != nil { - return nil, errors.Wrap(err, "could not initialize genesis state") - } - - if err := s.saveGenesisData(ctx, genesisState); err != nil { - return nil, errors.Wrap(err, "could not save genesis data") - } - - log.Info("Initialized beacon chain genesis state") - - // Clear out all pre-genesis data now that the state is initialized. - s.cfg.ChainStartFetcher.ClearPreGenesisData() - - // Update committee shuffled indices for genesis epoch. - if err := helpers.UpdateCommitteeCache(genesisState, 0 /* genesis epoch */); err != nil { - return nil, err - } - if err := helpers.UpdateProposerIndicesInCache(ctx, genesisState); err != nil { - return nil, err - } - - s.cfg.AttService.SetGenesisTime(genesisState.GenesisTime()) - - return genesisState, nil } // Stop the blockchain service's main event loop and associated goroutines. @@ -312,7 +152,7 @@ func (s *Service) Stop() error { // Status always returns nil unless there is an error condition that causes // this service to be unhealthy. func (s *Service) Status() error { - if s.genesisRoot == params.BeaconConfig().ZeroHash { + if s.originBlockRoot == params.BeaconConfig().ZeroHash { return errors.New("genesis state has not been created") } if runtime.NumGoroutine() > s.cfg.MaxRoutines { @@ -321,61 +161,105 @@ func (s *Service) Status() error { return nil } -// This gets called when beacon chain is first initialized to save genesis data (state, block, and more) in db. -func (s *Service) saveGenesisData(ctx context.Context, genesisState state.BeaconState) error { - if err := s.cfg.BeaconDB.SaveGenesisData(ctx, genesisState); err != nil { - return errors.Wrap(err, "could not save genesis data") - } - genesisBlk, err := s.cfg.BeaconDB.GenesisBlock(ctx) - if err != nil || genesisBlk == nil || genesisBlk.IsNil() { - return fmt.Errorf("could not load genesis block: %v", err) - } - genesisBlkRoot, err := genesisBlk.Block().HashTreeRoot() +func (s *Service) startFromSavedState(saved state.BeaconState) error { + log.Info("Blockchain data already exists in DB, initializing...") + s.genesisTime = time.Unix(int64(saved.GenesisTime()), 0) + s.cfg.AttService.SetGenesisTime(saved.GenesisTime()) + + originRoot, err := s.originRootFromSavedState(s.ctx) if err != nil { - return errors.Wrap(err, "could not get genesis block root") + return err + } + s.originBlockRoot = originRoot + + if err := s.initializeHeadFromDB(s.ctx); err != nil { + return errors.Wrap(err, "could not set up chain info") + } + spawnCountdownIfPreGenesis(s.ctx, s.genesisTime, s.cfg.BeaconDB) + + justified, err := s.cfg.BeaconDB.JustifiedCheckpoint(s.ctx) + if err != nil { + return errors.Wrap(err, "could not get justified checkpoint") + } + s.prevJustifiedCheckpt = ethpb.CopyCheckpoint(justified) + s.bestJustifiedCheckpt = ethpb.CopyCheckpoint(justified) + s.justifiedCheckpt = ethpb.CopyCheckpoint(justified) + + finalized, err := s.cfg.BeaconDB.FinalizedCheckpoint(s.ctx) + if err != nil { + return errors.Wrap(err, "could not get finalized checkpoint") + } + s.prevFinalizedCheckpt = ethpb.CopyCheckpoint(finalized) + s.finalizedCheckpt = ethpb.CopyCheckpoint(finalized) + + store := protoarray.New(justified.Epoch, finalized.Epoch, bytesutil.ToBytes32(finalized.Root)) + s.cfg.ForkChoiceStore = store + + ss, err := slots.EpochStart(s.finalizedCheckpt.Epoch) + if err != nil { + return errors.Wrap(err, "could not get start slot of finalized epoch") + } + h := s.headBlock().Block() + if h.Slot() > ss { + log.WithFields(logrus.Fields{ + "startSlot": ss, + "endSlot": h.Slot(), + }).Info("Loading blocks to fork choice store, this may take a while.") + if err := s.fillInForkChoiceMissingBlocks(s.ctx, h, s.finalizedCheckpt, s.justifiedCheckpt); err != nil { + return errors.Wrap(err, "could not fill in fork choice store missing blocks") + } } - s.genesisRoot = genesisBlkRoot - s.cfg.StateGen.SaveFinalizedState(0 /*slot*/, genesisBlkRoot, genesisState) - - // Finalized checkpoint at genesis is a zero hash. - genesisCheckpoint := genesisState.FinalizedCheckpoint() - - s.justifiedCheckpt = ethpb.CopyCheckpoint(genesisCheckpoint) - s.prevJustifiedCheckpt = ethpb.CopyCheckpoint(genesisCheckpoint) - s.bestJustifiedCheckpt = ethpb.CopyCheckpoint(genesisCheckpoint) - s.finalizedCheckpt = ethpb.CopyCheckpoint(genesisCheckpoint) - s.prevFinalizedCheckpt = ethpb.CopyCheckpoint(genesisCheckpoint) - - if err := s.cfg.ForkChoiceStore.ProcessBlock(ctx, - genesisBlk.Block().Slot(), - genesisBlkRoot, - params.BeaconConfig().ZeroHash, - [32]byte{}, - genesisCheckpoint.Epoch, - genesisCheckpoint.Epoch); err != nil { - log.Fatalf("Could not process genesis block for fork choice: %v", err) + // not attempting to save initial sync blocks here, because there shouldn't be any until + // after the statefeed.Initialized event is fired (below) + if err := s.wsVerifier.VerifyWeakSubjectivity(s.ctx, s.finalizedCheckpt.Epoch); err != nil { + // Exit run time if the node failed to verify weak subjectivity checkpoint. + return errors.Wrap(err, "could not verify initial checkpoint provided for chain sync") } - s.setHead(genesisBlkRoot, genesisBlk, genesisState) + s.cfg.StateNotifier.StateFeed().Send(&feed.Event{ + Type: statefeed.Initialized, + Data: &statefeed.InitializedData{ + StartTime: s.genesisTime, + GenesisValidatorsRoot: saved.GenesisValidatorRoot(), + }, + }) + + s.spawnProcessAttestationsRoutine(s.cfg.StateNotifier.StateFeed()) + return nil } -// This gets called to initialize chain info variables using the finalized checkpoint stored in DB -func (s *Service) initializeChainInfo(ctx context.Context) error { +func (s *Service) originRootFromSavedState(ctx context.Context) ([32]byte, error) { + // first check if we have started from checkpoint sync and have a root + originRoot, err := s.cfg.BeaconDB.OriginBlockRoot(ctx) + if err == nil { + return originRoot, nil + } + if !errors.Is(err, db.ErrNotFound) { + return originRoot, errors.Wrap(err, "could not retrieve checkpoint sync chain origin data from db") + } + + // we got here because OriginBlockRoot gave us an ErrNotFound. this means the node was started from a genesis state, + // so we should have a value for GenesisBlock genesisBlock, err := s.cfg.BeaconDB.GenesisBlock(ctx) if err != nil { - return errors.Wrap(err, "could not get genesis block from db") + return originRoot, errors.Wrap(err, "could not get genesis block from db") } if err := helpers.BeaconBlockIsNil(genesisBlock); err != nil { - return err + return originRoot, err } genesisBlkRoot, err := genesisBlock.Block().HashTreeRoot() if err != nil { - return errors.Wrap(err, "could not get signing root of genesis block") + return genesisBlkRoot, errors.Wrap(err, "could not get signing root of genesis block") } - s.genesisRoot = genesisBlkRoot + return genesisBlkRoot, nil +} +// initializeHeadFromDB uses the finalized checkpoint and head block found in the database to set the current head +// note that this may block until stategen replays blocks between the finalized and head blocks +// if the head sync flag was specified and the gap between the finalized and head blocks is at least 128 epochs long +func (s *Service) initializeHeadFromDB(ctx context.Context) error { finalized, err := s.cfg.BeaconDB.FinalizedCheckpoint(ctx) if err != nil { return errors.Wrap(err, "could not get finalized checkpoint from db") @@ -442,11 +326,146 @@ func (s *Service) initializeChainInfo(ctx context.Context) error { return nil } -// This is called when a client starts from non-genesis slot. This passes last justified and finalized -// information to fork choice service to initializes fork choice store. -func (s *Service) resumeForkChoice(justifiedCheckpoint, finalizedCheckpoint *ethpb.Checkpoint) { - store := protoarray.New(justifiedCheckpoint.Epoch, finalizedCheckpoint.Epoch, bytesutil.ToBytes32(finalizedCheckpoint.Root)) - s.cfg.ForkChoiceStore = store +func (s *Service) startFromPOWChain() error { + log.Info("Waiting to reach the validator deposit threshold to start the beacon chain...") + if s.cfg.ChainStartFetcher == nil { + return errors.New("not configured web3Service for POW chain") + } + go func() { + stateChannel := make(chan *feed.Event, 1) + stateSub := s.cfg.StateNotifier.StateFeed().Subscribe(stateChannel) + defer stateSub.Unsubscribe() + s.spawnProcessAttestationsRoutine(s.cfg.StateNotifier.StateFeed()) + for { + select { + case event := <-stateChannel: + if event.Type == statefeed.ChainStarted { + data, ok := event.Data.(*statefeed.ChainStartedData) + if !ok { + log.Error("event data is not type *statefeed.ChainStartedData") + return + } + log.WithField("starttime", data.StartTime).Debug("Received chain start event") + s.onPowchainStart(s.ctx, data.StartTime) + return + } + case <-s.ctx.Done(): + log.Debug("Context closed, exiting goroutine") + return + case err := <-stateSub.Err(): + log.WithError(err).Error("Subscription to state notifier failed") + return + } + } + }() + + return nil +} + +// onPowchainStart initializes a series of deposits from the ChainStart deposits in the eth1 +// deposit contract, initializes the beacon chain's state, and kicks off the beacon chain. +func (s *Service) onPowchainStart(ctx context.Context, genesisTime time.Time) { + preGenesisState := s.cfg.ChainStartFetcher.PreGenesisState() + initializedState, err := s.initializeBeaconChain(ctx, genesisTime, preGenesisState, s.cfg.ChainStartFetcher.ChainStartEth1Data()) + if err != nil { + log.Fatalf("Could not initialize beacon chain: %v", err) + } + // We start a counter to genesis, if needed. + gRoot, err := initializedState.HashTreeRoot(s.ctx) + if err != nil { + log.Fatalf("Could not hash tree root genesis state: %v", err) + } + go slots.CountdownToGenesis(ctx, genesisTime, uint64(initializedState.NumValidators()), gRoot) + + // We send out a state initialized event to the rest of the services + // running in the beacon node. + s.cfg.StateNotifier.StateFeed().Send(&feed.Event{ + Type: statefeed.Initialized, + Data: &statefeed.InitializedData{ + StartTime: genesisTime, + GenesisValidatorsRoot: initializedState.GenesisValidatorRoot(), + }, + }) +} + +// initializes the state and genesis block of the beacon chain to persistent storage +// based on a genesis timestamp value obtained from the ChainStart event emitted +// by the ETH1.0 Deposit Contract and the POWChain service of the node. +func (s *Service) initializeBeaconChain( + ctx context.Context, + genesisTime time.Time, + preGenesisState state.BeaconState, + eth1data *ethpb.Eth1Data) (state.BeaconState, error) { + ctx, span := trace.StartSpan(ctx, "beacon-chain.Service.initializeBeaconChain") + defer span.End() + s.genesisTime = genesisTime + unixTime := uint64(genesisTime.Unix()) + + genesisState, err := transition.OptimizedGenesisBeaconState(unixTime, preGenesisState, eth1data) + if err != nil { + return nil, errors.Wrap(err, "could not initialize genesis state") + } + + if err := s.saveGenesisData(ctx, genesisState); err != nil { + return nil, errors.Wrap(err, "could not save genesis data") + } + + log.Info("Initialized beacon chain genesis state") + + // Clear out all pre-genesis data now that the state is initialized. + s.cfg.ChainStartFetcher.ClearPreGenesisData() + + // Update committee shuffled indices for genesis epoch. + if err := helpers.UpdateCommitteeCache(genesisState, 0 /* genesis epoch */); err != nil { + return nil, err + } + if err := helpers.UpdateProposerIndicesInCache(ctx, genesisState); err != nil { + return nil, err + } + + s.cfg.AttService.SetGenesisTime(genesisState.GenesisTime()) + + return genesisState, nil +} + +// This gets called when beacon chain is first initialized to save genesis data (state, block, and more) in db. +func (s *Service) saveGenesisData(ctx context.Context, genesisState state.BeaconState) error { + if err := s.cfg.BeaconDB.SaveGenesisData(ctx, genesisState); err != nil { + return errors.Wrap(err, "could not save genesis data") + } + genesisBlk, err := s.cfg.BeaconDB.GenesisBlock(ctx) + if err != nil || genesisBlk == nil || genesisBlk.IsNil() { + return fmt.Errorf("could not load genesis block: %v", err) + } + genesisBlkRoot, err := genesisBlk.Block().HashTreeRoot() + if err != nil { + return errors.Wrap(err, "could not get genesis block root") + } + + s.originBlockRoot = genesisBlkRoot + s.cfg.StateGen.SaveFinalizedState(0 /*slot*/, genesisBlkRoot, genesisState) + + // Finalized checkpoint at genesis is a zero hash. + genesisCheckpoint := genesisState.FinalizedCheckpoint() + + s.justifiedCheckpt = ethpb.CopyCheckpoint(genesisCheckpoint) + s.prevJustifiedCheckpt = ethpb.CopyCheckpoint(genesisCheckpoint) + s.bestJustifiedCheckpt = ethpb.CopyCheckpoint(genesisCheckpoint) + s.finalizedCheckpt = ethpb.CopyCheckpoint(genesisCheckpoint) + s.prevFinalizedCheckpt = ethpb.CopyCheckpoint(genesisCheckpoint) + + if err := s.cfg.ForkChoiceStore.ProcessBlock(ctx, + genesisBlk.Block().Slot(), + genesisBlkRoot, + params.BeaconConfig().ZeroHash, + [32]byte{}, + genesisCheckpoint.Epoch, + genesisCheckpoint.Epoch); err != nil { + log.Fatalf("Could not process genesis block for fork choice: %v", err) + } + + s.setHead(genesisBlkRoot, genesisBlk, genesisState) + return nil } // This returns true if block has been processed before. Two ways to verify the block has been processed: @@ -460,3 +479,20 @@ func (s *Service) hasBlock(ctx context.Context, root [32]byte) bool { return s.cfg.BeaconDB.HasBlock(ctx, root) } + +func spawnCountdownIfPreGenesis(ctx context.Context, genesisTime time.Time, db db.HeadAccessDatabase) { + currentTime := prysmTime.Now() + if currentTime.After(genesisTime) { + return + } + + gState, err := db.GenesisState(ctx) + if err != nil { + log.Fatalf("Could not retrieve genesis state: %v", err) + } + gRoot, err := gState.HashTreeRoot(ctx) + if err != nil { + log.Fatalf("Could not hash tree root genesis state: %v", err) + } + go slots.CountdownToGenesis(ctx, genesisTime, uint64(gState.NumValidators()), gRoot) +} diff --git a/beacon-chain/blockchain/service_test.go b/beacon-chain/blockchain/service_test.go index 7d00391508..cef44d63f2 100644 --- a/beacon-chain/blockchain/service_test.go +++ b/beacon-chain/blockchain/service_test.go @@ -9,6 +9,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/prysmaticlabs/prysm/async/event" + mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing" "github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache" b "github.com/prysmaticlabs/prysm/beacon-chain/core/blocks" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" @@ -283,9 +284,11 @@ func TestChainService_InitializeChainInfo(t *testing.T) { require.NoError(t, beaconDB.SaveState(ctx, headState, genesisRoot)) require.NoError(t, beaconDB.SaveBlock(ctx, wrapper.WrappedPhase0SignedBeaconBlock(headBlock))) require.NoError(t, beaconDB.SaveFinalizedCheckpoint(ctx, ðpb.Checkpoint{Epoch: slots.ToEpoch(finalizedSlot), Root: headRoot[:]})) - c := &Service{cfg: &config{BeaconDB: beaconDB, StateGen: stategen.New(beaconDB)}} - c.cfg.FinalizedStateAtStartUp = headState - require.NoError(t, c.initializeChainInfo(ctx)) + attSrv, err := attestations.NewService(ctx, &attestations.Config{}) + require.NoError(t, err) + c, err := NewService(ctx, WithDatabase(beaconDB), WithStateGen(stategen.New(beaconDB)), WithAttestationService(attSrv), WithStateNotifier(&mock.MockStateNotifier{}), WithFinalizedStateAtStartUp(headState)) + require.NoError(t, err) + require.NoError(t, c.startFromSavedState(headState)) headBlk, err := c.HeadBlock(ctx) require.NoError(t, err) assert.DeepEqual(t, headBlock, headBlk.Proto(), "Head block incorrect") @@ -298,7 +301,7 @@ func TestChainService_InitializeChainInfo(t *testing.T) { if !bytes.Equal(headRoot[:], r) { t.Error("head slot incorrect") } - assert.Equal(t, genesisRoot, c.genesisRoot, "Genesis block root incorrect") + assert.Equal(t, genesisRoot, c.originBlockRoot, "Genesis block root incorrect") } func TestChainService_InitializeChainInfo_SetHeadAtGenesis(t *testing.T) { @@ -324,12 +327,15 @@ func TestChainService_InitializeChainInfo_SetHeadAtGenesis(t *testing.T) { require.NoError(t, beaconDB.SaveState(ctx, headState, headRoot)) require.NoError(t, beaconDB.SaveState(ctx, headState, genesisRoot)) require.NoError(t, beaconDB.SaveBlock(ctx, wrapper.WrappedPhase0SignedBeaconBlock(headBlock))) - c := &Service{cfg: &config{BeaconDB: beaconDB, StateGen: stategen.New(beaconDB)}} - require.NoError(t, c.initializeChainInfo(ctx)) + attSrv, err := attestations.NewService(ctx, &attestations.Config{}) + require.NoError(t, err) + c, err := NewService(ctx, WithDatabase(beaconDB), WithStateGen(stategen.New(beaconDB)), WithAttestationService(attSrv), WithStateNotifier(&mock.MockStateNotifier{})) + require.NoError(t, err) + require.NoError(t, c.startFromSavedState(headState)) s, err := c.HeadState(ctx) require.NoError(t, err) assert.DeepSSZEqual(t, headState.InnerStateUnsafe(), s.InnerStateUnsafe(), "Head state incorrect") - assert.Equal(t, genesisRoot, c.genesisRoot, "Genesis block root incorrect") + assert.Equal(t, genesisRoot, c.originBlockRoot, "Genesis block root incorrect") assert.DeepEqual(t, genesis, c.head.block.Proto()) } @@ -381,13 +387,15 @@ func TestChainService_InitializeChainInfo_HeadSync(t *testing.T) { Root: finalizedRoot[:], })) - c := &Service{cfg: &config{BeaconDB: beaconDB, StateGen: stategen.New(beaconDB)}} - c.cfg.FinalizedStateAtStartUp = headState - require.NoError(t, c.initializeChainInfo(ctx)) + attSrv, err := attestations.NewService(ctx, &attestations.Config{}) + require.NoError(t, err) + c, err := NewService(ctx, WithDatabase(beaconDB), WithStateGen(stategen.New(beaconDB)), WithAttestationService(attSrv), WithStateNotifier(&mock.MockStateNotifier{}), WithFinalizedStateAtStartUp(headState)) + require.NoError(t, err) + require.NoError(t, c.startFromSavedState(headState)) s, err := c.HeadState(ctx) require.NoError(t, err) assert.DeepSSZEqual(t, headState.InnerStateUnsafe(), s.InnerStateUnsafe(), "Head state incorrect") - assert.Equal(t, genesisRoot, c.genesisRoot, "Genesis block root incorrect") + assert.Equal(t, genesisRoot, c.originBlockRoot, "Genesis block root incorrect") // Since head sync is not triggered, chain is initialized to the last finalization checkpoint. assert.DeepEqual(t, finalizedBlock, c.head.block.Proto()) assert.LogsContain(t, hook, "resetting head from the checkpoint ('--head-sync' flag is ignored)") @@ -404,11 +412,11 @@ func TestChainService_InitializeChainInfo_HeadSync(t *testing.T) { require.NoError(t, beaconDB.SaveHeadBlockRoot(ctx, headRoot)) hook.Reset() - require.NoError(t, c.initializeChainInfo(ctx)) + require.NoError(t, c.initializeHeadFromDB(ctx)) s, err = c.HeadState(ctx) require.NoError(t, err) assert.DeepSSZEqual(t, headState.InnerStateUnsafe(), s.InnerStateUnsafe(), "Head state incorrect") - assert.Equal(t, genesisRoot, c.genesisRoot, "Genesis block root incorrect") + assert.Equal(t, genesisRoot, c.originBlockRoot, "Genesis block root incorrect") // Head slot is far beyond the latest finalized checkpoint, head sync is triggered. assert.DeepEqual(t, headBlock, c.head.block.Proto()) assert.LogsContain(t, hook, "Regenerating state from the last checkpoint at slot 225") @@ -478,7 +486,7 @@ func TestProcessChainStartTime_ReceivedFeed(t *testing.T) { stateChannel := make(chan *feed.Event, 1) stateSub := service.cfg.StateNotifier.StateFeed().Subscribe(stateChannel) defer stateSub.Unsubscribe() - service.processChainStartTime(context.Background(), time.Now()) + service.onPowchainStart(context.Background(), time.Now()) stateEvent := <-stateChannel require.Equal(t, int(stateEvent.Type), statefeed.Initialized) diff --git a/beacon-chain/db/BUILD.bazel b/beacon-chain/db/BUILD.bazel index 5226616c06..f7977529a8 100644 --- a/beacon-chain/db/BUILD.bazel +++ b/beacon-chain/db/BUILD.bazel @@ -5,6 +5,7 @@ go_library( srcs = [ "alias.go", "db.go", + "errors.go", "log.go", "restore.go", ], diff --git a/beacon-chain/db/errors.go b/beacon-chain/db/errors.go new file mode 100644 index 0000000000..7142e53ad3 --- /dev/null +++ b/beacon-chain/db/errors.go @@ -0,0 +1,9 @@ +package db + +import "github.com/prysmaticlabs/prysm/beacon-chain/db/kv" + +// ErrNotFound can be used to determine if an error from a method in the database package +// represents a "not found" error. These often require different handling than a low-level +// i/o error. This variable copies the value in the kv package to the same scope as the Database interfaces, +// so that it is available to code paths that do not interact directly with the kv package. +var ErrNotFound = kv.ErrNotFound diff --git a/beacon-chain/db/iface/interface.go b/beacon-chain/db/iface/interface.go index 1fda162bf0..ad900097f7 100644 --- a/beacon-chain/db/iface/interface.go +++ b/beacon-chain/db/iface/interface.go @@ -48,6 +48,9 @@ type ReadOnlyDatabase interface { DepositContractAddress(ctx context.Context) ([]byte, error) // Powchain operations. PowchainData(ctx context.Context) (*ethpb.ETH1ChainData, error) + + // origin checkpoint sync support + OriginBlockRoot(ctx context.Context) ([32]byte, error) } // NoHeadAccessDatabase defines a struct without access to chain head data. @@ -90,6 +93,9 @@ type HeadAccessDatabase interface { LoadGenesis(ctx context.Context, r io.Reader) error SaveGenesisData(ctx context.Context, state state.BeaconState) error EnsureEmbeddedGenesis(ctx context.Context) error + + // initialization method needed for origin checkpoint sync + SaveOrigin(ctx context.Context, state io.Reader, block io.Reader) error } // SlasherDatabase interface for persisting data related to detecting slashable offenses on Ethereum. diff --git a/beacon-chain/db/kv/BUILD.bazel b/beacon-chain/db/kv/BUILD.bazel index 9604b2f1ee..c5b9bc55a0 100644 --- a/beacon-chain/db/kv/BUILD.bazel +++ b/beacon-chain/db/kv/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "checkpoint.go", "deposit_contract.go", "encoding.go", + "error.go", "finalized_block_roots.go", "genesis.go", "key.go", @@ -24,6 +25,7 @@ go_library( "state_summary.go", "state_summary_cache.go", "utils.go", + "wss.go", ], importpath = "github.com/prysmaticlabs/prysm/beacon-chain/db/kv", visibility = [ @@ -81,6 +83,7 @@ go_test( "checkpoint_test.go", "deposit_contract_test.go", "encoding_test.go", + "error_test.go", "finalized_block_roots_test.go", "genesis_test.go", "init_test.go", diff --git a/beacon-chain/db/kv/blocks.go b/beacon-chain/db/kv/blocks.go index f17b8d4322..61f94eb819 100644 --- a/beacon-chain/db/kv/blocks.go +++ b/beacon-chain/db/kv/blocks.go @@ -46,6 +46,28 @@ func (s *Store) Block(ctx context.Context, blockRoot [32]byte) (block.SignedBeac return blk, err } +// OriginBlockRoot returns the value written to the db in SaveOriginBlockRoot +// This is the root of a finalized block within the weak subjectivity period +// at the time the chain was started, used to initialize the database and chain +// without syncing from genesis. +func (s *Store) OriginBlockRoot(ctx context.Context) ([32]byte, error) { + _, span := trace.StartSpan(ctx, "BeaconDB.OriginBlockRoot") + defer span.End() + + var root [32]byte + err := s.db.View(func(tx *bolt.Tx) error { + bkt := tx.Bucket(blocksBucket) + rootSlice := bkt.Get(originBlockRootKey) + if rootSlice == nil { + return ErrNotFoundOriginBlockRoot + } + copy(root[:], rootSlice) + return nil + }) + + return root, err +} + // HeadBlock returns the latest canonical block in the Ethereum Beacon Chain. func (s *Store) HeadBlock(ctx context.Context) (block.SignedBeaconBlock, error) { ctx, span := trace.StartSpan(ctx, "BeaconDB.HeadBlock") @@ -336,6 +358,19 @@ func (s *Store) SaveGenesisBlockRoot(ctx context.Context, blockRoot [32]byte) er }) } +// SaveOriginBlockRoot is used to keep track of the block root used for origin sync. +// This should be a finalized block from within the current weak subjectivity period. +// This value is used by a running beacon chain node to locate the state at the beginning +// of the chain history, in places where genesis would typically be used. +func (s *Store) SaveOriginBlockRoot(ctx context.Context, blockRoot [32]byte) error { + _, span := trace.StartSpan(ctx, "BeaconDB.SaveOriginBlockRoot") + defer span.End() + return s.db.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket(blocksBucket) + return bucket.Put(originBlockRootKey, blockRoot[:]) + }) +} + // HighestSlotBlocksBelow returns the block with the highest slot below the input slot from the db. func (s *Store) HighestSlotBlocksBelow(ctx context.Context, slot types.Slot) ([]block.SignedBeaconBlock, error) { ctx, span := trace.StartSpan(ctx, "BeaconDB.HighestSlotBlocksBelow") diff --git a/beacon-chain/db/kv/error.go b/beacon-chain/db/kv/error.go new file mode 100644 index 0000000000..b0cdfca508 --- /dev/null +++ b/beacon-chain/db/kv/error.go @@ -0,0 +1,46 @@ +package kv + +import "errors" + +// ErrNotFound can be used directly, or as a wrapped DBError, whenever a db method needs to +// indicate that a value couldn't be found. +var ErrNotFound = errors.New("not found in db") + +// ErrNotFoundOriginBlockRoot is an error specifically for the origin block root getter +var ErrNotFoundOriginBlockRoot = WrapDBError(ErrNotFound, "OriginBlockRoot") + +// WrapDBError wraps an error in a DBError. See commentary on DBError for more context. +func WrapDBError(e error, outer string) error { + return DBError{ + Wraps: e, + Outer: errors.New(outer), + } +} + +// DBError implements the Error method so that it can be asserted as an error. +// The Unwrap method supports error wrapping, enabling it to be used with errors.Is/As. +// The primary use case is to make it simple for database methods to return errors +// that wrap ErrNotFound, allowing calling code to check for "not found" errors +// like: `error.Is(err, ErrNotFound)`. This is intended to improve error handling +// in db lookup methods that need to differentiate between a missing value and some +// other database error. for more background see: +// https://go.dev/blog/go1.13-errors +type DBError struct { + Wraps error + Outer error +} + +// Error satisfies the error interface, so that DBErrors can be used anywhere that +// expects an `error`. +func (e DBError) Error() string { + es := e.Outer.Error() + if e.Wraps != nil { + es += ": " + e.Wraps.Error() + } + return es +} + +// Unwrap is used by the errors package Is and As methods. +func (e DBError) Unwrap() error { + return e.Wraps +} diff --git a/beacon-chain/db/kv/error_test.go b/beacon-chain/db/kv/error_test.go new file mode 100644 index 0000000000..8b02e8cbcb --- /dev/null +++ b/beacon-chain/db/kv/error_test.go @@ -0,0 +1,24 @@ +package kv + +import ( + "errors" + "testing" +) + +func TestWrappedSentinelError(t *testing.T) { + e := ErrNotFoundOriginBlockRoot + if !errors.Is(e, ErrNotFoundOriginBlockRoot) { + t.Error("expected that a copy of ErrNotFoundOriginBlockRoot would have an is-a relationship") + } + + outer := errors.New("wrapped error") + e2 := DBError{Wraps: ErrNotFoundOriginBlockRoot, Outer: outer} + if !errors.Is(e2, ErrNotFoundOriginBlockRoot) { + t.Error("expected that errors.Is would know DBError wraps ErrNotFoundOriginBlockRoot") + } + + // test that the innermost not found error is detected + if !errors.Is(e2, ErrNotFound) { + t.Error("expected that errors.Is would know ErrNotFoundOriginBlockRoot wraps ErrNotFound") + } +} diff --git a/beacon-chain/db/kv/schema.go b/beacon-chain/db/kv/schema.go index 7fa05bed0f..40f4f4a6d4 100644 --- a/beacon-chain/db/kv/schema.go +++ b/beacon-chain/db/kv/schema.go @@ -48,6 +48,8 @@ var ( // Objects that are only compatible with specific forks should be prefixed with such keys. altairKey = []byte("altair") mergeKey = []byte("merge") + // block root included in the beacon state used by weak subjectivity initial sync + originBlockRootKey = []byte("origin-block-root") // Deprecated: This index key was migrated in PR 6461. Do not use, except for migrations. lastArchivedIndexKey = []byte("last-archived") diff --git a/beacon-chain/db/kv/wss.go b/beacon-chain/db/kv/wss.go new file mode 100644 index 0000000000..82194dc3cb --- /dev/null +++ b/beacon-chain/db/kv/wss.go @@ -0,0 +1,89 @@ +package kv + +import ( + "context" + "io" + "io/ioutil" + + "github.com/pkg/errors" + types "github.com/prysmaticlabs/eth2-types" + statev2 "github.com/prysmaticlabs/prysm/beacon-chain/state/v2" + "github.com/prysmaticlabs/prysm/config/params" + ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper" +) + +// SaveOrigin loads an ssz serialized Block & BeaconState from an io.Reader +// (ex: an open file) prepares the database so that the beacon node can begin +// syncing, using the provided values as their point of origin. This is an alternative +// to syncing from genesis, and should only be run on an empty database. +func (s *Store) SaveOrigin(ctx context.Context, stateReader, blockReader io.Reader) error { + // unmarshal both block and state before trying to save anything + // so that we fail early if there is any issue with the ssz data + blk := ðpb.SignedBeaconBlockAltair{} + bb, err := ioutil.ReadAll(blockReader) + if err != nil { + return errors.Wrap(err, "error reading block given to SaveOrigin") + } + if err := blk.UnmarshalSSZ(bb); err != nil { + return errors.Wrap(err, "could not unmarshal checkpoint block") + } + wblk, err := wrapper.WrappedAltairSignedBeaconBlock(blk) + if err != nil { + return errors.Wrap(err, "could not wrap checkpoint block") + } + bs, err := statev2.InitializeFromSSZReader(stateReader) + if err != nil { + return errors.Wrap(err, "could not initialize checkpoint state from reader") + } + + // save block + if err := s.SaveBlock(ctx, wblk); err != nil { + return errors.Wrap(err, "could not save checkpoint block") + } + blockRoot, err := blk.Block.HashTreeRoot() + if err != nil { + return errors.Wrap(err, "could not compute HashTreeRoot of checkpoint block") + } + + // save state + if err = s.SaveState(ctx, bs, blockRoot); err != nil { + return errors.Wrap(err, "could not save state") + } + if err = s.SaveStateSummary(ctx, ðpb.StateSummary{ + Slot: bs.Slot(), + Root: blockRoot[:], + }); err != nil { + return errors.Wrap(err, "could not save state summary") + } + + // save origin block root in special key, to be used when the canonical + // origin (start of chain, ie alternative to genesis) block or state is needed + if err = s.SaveOriginBlockRoot(ctx, blockRoot); err != nil { + return errors.Wrap(err, "could not save origin block root") + } + + // mark block as head of chain, so that processing will pick up from this point + if err = s.SaveHeadBlockRoot(ctx, blockRoot); err != nil { + return errors.Wrap(err, "could not save head block root") + } + + // rebuild the checkpoint from the block + // use it to mark the block as justified and finalized + slotEpoch, err := blk.Block.Slot.SafeDivSlot(params.BeaconConfig().SlotsPerEpoch) + if err != nil { + return err + } + chkpt := ðpb.Checkpoint{ + Epoch: types.Epoch(slotEpoch), + Root: blockRoot[:], + } + if err = s.SaveJustifiedCheckpoint(ctx, chkpt); err != nil { + return errors.Wrap(err, "could not mark checkpoint sync block as justified") + } + if err = s.SaveFinalizedCheckpoint(ctx, chkpt); err != nil { + return errors.Wrap(err, "could not mark checkpoint sync block as finalized") + } + + return nil +} diff --git a/beacon-chain/state/v2/state_trie.go b/beacon-chain/state/v2/state_trie.go index fc00961abd..2b00c95c40 100644 --- a/beacon-chain/state/v2/state_trie.go +++ b/beacon-chain/state/v2/state_trie.go @@ -2,6 +2,8 @@ package v2 import ( "context" + "io" + "io/ioutil" "runtime" "sort" @@ -35,6 +37,27 @@ func InitializeFromProto(st *ethpb.BeaconStateAltair) (*BeaconState, error) { return InitializeFromProtoUnsafe(proto.Clone(st).(*ethpb.BeaconStateAltair)) } +// InitializeFromSSZReader can be used when the source for a serialized BeaconState object +// is an io.Reader. This allows client code to remain agnostic about whether the data comes +// from the network or a file without needing to read the entire state into mem as a large byte slice. +func InitializeFromSSZReader(r io.Reader) (*BeaconState, error) { + b, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + return InitializeFromSSZBytes(b) +} + +// InitializeFromSSZBytes is a convenience method to obtain a BeaconState by unmarshaling +// a slice of bytes containing the ssz-serialized representation of the state. +func InitializeFromSSZBytes(marshaled []byte) (*BeaconState, error) { + st := ðpb.BeaconStateAltair{} + if err := st.UnmarshalSSZ(marshaled); err != nil { + return nil, err + } + return InitializeFromProtoUnsafe(st) +} + // InitializeFromProtoUnsafe directly uses the beacon state protobuf pointer // and sets it as the inner state of the BeaconState type. func InitializeFromProtoUnsafe(st *ethpb.BeaconStateAltair) (*BeaconState, error) {