mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 13:58:09 -05:00
Compare commits
1 Commits
rm-interop
...
backfill-v
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c7cd813761 |
@@ -141,7 +141,7 @@ func ValidateBLSToExecutionChange(st state.ReadOnlyBeaconState, signed *ethpb.Si
|
||||
// next_validator_index = ValidatorIndex((expected_withdrawals[-1].validator_index + 1) % len(state.validators))
|
||||
// state.next_withdrawal_validator_index = next_validator_index
|
||||
// else:
|
||||
// # Advance sweep by the max length of the sweep if there was not a full set of withdrawals
|
||||
// # FillFwd sweep by the max length of the sweep if there was not a full set of withdrawals
|
||||
// next_index = state.next_withdrawal_validator_index + MAX_VALIDATORS_PER_WITHDRAWALS_SWEEP
|
||||
// next_validator_index = ValidatorIndex(next_index % len(state.validators))
|
||||
// state.next_withdrawal_validator_index = next_validator_index
|
||||
|
||||
@@ -16,6 +16,7 @@ go_library(
|
||||
"//consensus-types/interfaces:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"//monitoring/backup:go_default_library",
|
||||
"//proto/dbval:go_default_library",
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"@com_github_ethereum_go_ethereum//common:go_default_library",
|
||||
],
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v4/monitoring/backup"
|
||||
"github.com/prysmaticlabs/prysm/v4/proto/dbval"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
)
|
||||
|
||||
@@ -56,7 +57,7 @@ type ReadOnlyDatabase interface {
|
||||
RegistrationByValidatorID(ctx context.Context, id primitives.ValidatorIndex) (*ethpb.ValidatorRegistrationV1, error)
|
||||
// origin checkpoint sync support
|
||||
OriginCheckpointBlockRoot(ctx context.Context) ([32]byte, error)
|
||||
BackfillBlockRoot(ctx context.Context) ([32]byte, error)
|
||||
BackfillStatus(context.Context) (*dbval.BackfillStatus, error)
|
||||
}
|
||||
|
||||
// NoHeadAccessDatabase defines a struct without access to chain head data.
|
||||
@@ -107,7 +108,7 @@ type HeadAccessDatabase interface {
|
||||
|
||||
// initialization method needed for origin checkpoint sync
|
||||
SaveOrigin(ctx context.Context, serState, serBlock []byte) error
|
||||
SaveBackfillBlockRoot(ctx context.Context, blockRoot [32]byte) error
|
||||
SaveBackfillStatus(context.Context, *dbval.BackfillStatus) error
|
||||
}
|
||||
|
||||
// SlasherDatabase interface for persisting data related to detecting slashable offenses on Ethereum.
|
||||
|
||||
@@ -4,6 +4,7 @@ go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"archived_point.go",
|
||||
"backfill.go",
|
||||
"backup.go",
|
||||
"blocks.go",
|
||||
"checkpoint.go",
|
||||
@@ -48,6 +49,7 @@ go_library(
|
||||
"//io/file:go_default_library",
|
||||
"//monitoring/progress:go_default_library",
|
||||
"//monitoring/tracing:go_default_library",
|
||||
"//proto/dbval:go_default_library",
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"//runtime/version:go_default_library",
|
||||
"//time:go_default_library",
|
||||
@@ -73,6 +75,7 @@ go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"archived_point_test.go",
|
||||
"backfill_test.go",
|
||||
"backup_test.go",
|
||||
"blocks_test.go",
|
||||
"checkpoint_test.go",
|
||||
@@ -107,6 +110,7 @@ go_test(
|
||||
"//consensus-types/interfaces:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"//encoding/bytesutil:go_default_library",
|
||||
"//proto/dbval:go_default_library",
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"//proto/testing:go_default_library",
|
||||
"//testing/assert:go_default_library",
|
||||
|
||||
39
beacon-chain/db/kv/backfill.go
Normal file
39
beacon-chain/db/kv/backfill.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package kv
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v4/proto/dbval"
|
||||
bolt "go.etcd.io/bbolt"
|
||||
"go.opencensus.io/trace"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
func (s *Store) SaveBackfillStatus(ctx context.Context, bf *dbval.BackfillStatus) error {
|
||||
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveBackfillStatus")
|
||||
defer span.End()
|
||||
bfb, err := proto.Marshal(bf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s.db.Update(func(tx *bolt.Tx) error {
|
||||
bucket := tx.Bucket(blocksBucket)
|
||||
return bucket.Put(backfillStatusKey, bfb)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Store) BackfillStatus(ctx context.Context) (*dbval.BackfillStatus, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveBackfillStatus")
|
||||
defer span.End()
|
||||
bf := &dbval.BackfillStatus{}
|
||||
err := s.db.View(func(tx *bolt.Tx) error {
|
||||
bucket := tx.Bucket(blocksBucket)
|
||||
bs := bucket.Get(backfillStatusKey)
|
||||
if len(bs) == 0 {
|
||||
return errors.Wrap(ErrNotFound, "BackfillStatus not found")
|
||||
}
|
||||
return proto.Unmarshal(bs, bf)
|
||||
})
|
||||
return bf, err
|
||||
}
|
||||
37
beacon-chain/db/kv/backfill_test.go
Normal file
37
beacon-chain/db/kv/backfill_test.go
Normal file
@@ -0,0 +1,37 @@
|
||||
package kv
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/v4/proto/dbval"
|
||||
"github.com/prysmaticlabs/prysm/v4/testing/require"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
func TestBackfillRoundtrip(t *testing.T) {
|
||||
db := setupDB(t)
|
||||
b := &dbval.BackfillStatus{}
|
||||
b.HighSlot = 23
|
||||
b.LowSlot = 13
|
||||
b.HighRoot = bytesutil.PadTo([]byte("high"), 42)
|
||||
b.LowRoot = bytesutil.PadTo([]byte("low"), 24)
|
||||
m, err := proto.Marshal(b)
|
||||
require.NoError(t, err)
|
||||
ub := &dbval.BackfillStatus{}
|
||||
require.NoError(t, proto.Unmarshal(m, ub))
|
||||
require.Equal(t, b.HighSlot, ub.HighSlot)
|
||||
require.DeepEqual(t, b.HighRoot, ub.HighRoot)
|
||||
require.Equal(t, b.LowSlot, ub.LowSlot)
|
||||
require.DeepEqual(t, b.LowRoot, ub.LowRoot)
|
||||
|
||||
ctx := context.Background()
|
||||
require.NoError(t, db.SaveBackfillStatus(ctx, b))
|
||||
dbub, err := db.BackfillStatus(ctx)
|
||||
|
||||
require.Equal(t, b.HighSlot, dbub.HighSlot)
|
||||
require.DeepEqual(t, b.HighRoot, dbub.HighRoot)
|
||||
require.Equal(t, b.LowSlot, dbub.LowSlot)
|
||||
require.DeepEqual(t, b.LowRoot, dbub.LowRoot)
|
||||
}
|
||||
@@ -70,25 +70,6 @@ func (s *Store) OriginCheckpointBlockRoot(ctx context.Context) ([32]byte, error)
|
||||
return root, err
|
||||
}
|
||||
|
||||
// BackfillBlockRoot keeps track of the highest block available before the OriginCheckpointBlockRoot
|
||||
func (s *Store) BackfillBlockRoot(ctx context.Context) ([32]byte, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "BeaconDB.BackfillBlockRoot")
|
||||
defer span.End()
|
||||
|
||||
var root [32]byte
|
||||
err := s.db.View(func(tx *bolt.Tx) error {
|
||||
bkt := tx.Bucket(blocksBucket)
|
||||
rootSlice := bkt.Get(backfillBlockRootKey)
|
||||
if len(rootSlice) == 0 {
|
||||
return ErrNotFoundBackfillBlockRoot
|
||||
}
|
||||
root = bytesutil.ToBytes32(rootSlice)
|
||||
return nil
|
||||
})
|
||||
|
||||
return root, err
|
||||
}
|
||||
|
||||
// HeadBlock returns the latest canonical block in the Ethereum Beacon Chain.
|
||||
func (s *Store) HeadBlock(ctx context.Context) (interfaces.ReadOnlySignedBeaconBlock, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "BeaconDB.HeadBlock")
|
||||
@@ -417,17 +398,6 @@ func (s *Store) SaveOriginCheckpointBlockRoot(ctx context.Context, blockRoot [32
|
||||
})
|
||||
}
|
||||
|
||||
// SaveBackfillBlockRoot is used to keep track of the most recently backfilled block root when
|
||||
// the node was initialized via checkpoint sync.
|
||||
func (s *Store) SaveBackfillBlockRoot(ctx context.Context, blockRoot [32]byte) error {
|
||||
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveBackfillBlockRoot")
|
||||
defer span.End()
|
||||
return s.db.Update(func(tx *bolt.Tx) error {
|
||||
bucket := tx.Bucket(blocksBucket)
|
||||
return bucket.Put(backfillBlockRootKey, blockRoot[:])
|
||||
})
|
||||
}
|
||||
|
||||
// HighestRootsBelowSlot returns roots from the database slot index from the highest slot below the input slot.
|
||||
// The slot value at the beginning of the return list is the slot where the roots were found. This is helpful so that
|
||||
// calling code can make decisions based on the slot without resolving the blocks to discover their slot (for instance
|
||||
|
||||
@@ -92,23 +92,6 @@ var blockTests = []struct {
|
||||
},
|
||||
}
|
||||
|
||||
func TestStore_SaveBackfillBlockRoot(t *testing.T) {
|
||||
db := setupDB(t)
|
||||
ctx := context.Background()
|
||||
|
||||
_, err := db.BackfillBlockRoot(ctx)
|
||||
require.ErrorIs(t, err, ErrNotFoundBackfillBlockRoot)
|
||||
|
||||
var expected [32]byte
|
||||
copy(expected[:], []byte{0x23})
|
||||
err = db.SaveBackfillBlockRoot(ctx, expected)
|
||||
require.NoError(t, err)
|
||||
actual, err := db.BackfillBlockRoot(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, expected, actual)
|
||||
|
||||
}
|
||||
|
||||
func TestStore_SaveBlock_NoDuplicates(t *testing.T) {
|
||||
BlockCacheSize = 1
|
||||
slot := primitives.Slot(20)
|
||||
|
||||
@@ -57,8 +57,8 @@ var (
|
||||
saveBlindedBeaconBlocksKey = []byte("save-blinded-beacon-blocks")
|
||||
// block root included in the beacon state used by weak subjectivity initial sync
|
||||
originCheckpointBlockRootKey = []byte("origin-checkpoint-block-root")
|
||||
// block root tracking the progress of backfill, or pointing at genesis if backfill has not been initiated
|
||||
backfillBlockRootKey = []byte("backfill-block-root")
|
||||
// tracking data about an ongoing backfill
|
||||
backfillStatusKey = []byte("backfill-status")
|
||||
|
||||
// Deprecated: This index key was migrated in PR 6461. Do not use, except for migrations.
|
||||
lastArchivedIndexKey = []byte("last-archived")
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v4/encoding/ssz/detect"
|
||||
"github.com/prysmaticlabs/prysm/v4/proto/dbval"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v4/runtime/version"
|
||||
)
|
||||
@@ -24,11 +25,6 @@ func (s *Store) SaveOrigin(ctx context.Context, serState, serBlock []byte) error
|
||||
}
|
||||
return errors.Wrap(err, "genesis block root query error: checkpoint sync must verify genesis to proceed")
|
||||
}
|
||||
err = s.SaveBackfillBlockRoot(ctx, genesisRoot)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "unable to save genesis root as initial backfill starting point for checkpoint sync")
|
||||
}
|
||||
|
||||
cf, err := detect.FromState(serState)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not sniff config+fork for origin state bytes")
|
||||
@@ -50,11 +46,24 @@ func (s *Store) SaveOrigin(ctx context.Context, serState, serBlock []byte) error
|
||||
}
|
||||
blk := wblk.Block()
|
||||
|
||||
// save block
|
||||
blockRoot, err := blk.HashTreeRoot()
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not compute HashTreeRoot of checkpoint block")
|
||||
}
|
||||
|
||||
bf := &dbval.BackfillStatus{
|
||||
HighSlot: uint64(wblk.Block().Slot()),
|
||||
HighRoot: blockRoot[:],
|
||||
LowSlot: 0,
|
||||
LowRoot: genesisRoot[:],
|
||||
OriginRoot: blockRoot[:],
|
||||
OriginSlot: uint64(wblk.Block().Slot()),
|
||||
}
|
||||
|
||||
if err = s.SaveBackfillStatus(ctx, bf); err != nil {
|
||||
return errors.Wrap(err, "unable to save backfill status data to db for checkpoint sync.")
|
||||
}
|
||||
|
||||
log.Infof("saving checkpoint block to db, w/ root=%#x", blockRoot)
|
||||
if err := s.SaveBlock(ctx, wblk); err != nil {
|
||||
return errors.Wrap(err, "could not save checkpoint block")
|
||||
|
||||
@@ -209,6 +209,18 @@ func New(cliCtx *cli.Context, opts ...Option) (*BeaconNode, error) {
|
||||
if err := bfs.Reload(ctx); err != nil {
|
||||
return nil, errors.Wrap(err, "backfill status initialization error")
|
||||
}
|
||||
bf, err := backfill.NewService(ctx,
|
||||
backfill.WithGenesisWaiter(beacon.genesisWaiter),
|
||||
backfill.WithStatusUpdater(bfs),
|
||||
backfill.WithBackfillDB(beacon.db),
|
||||
backfill.WithP2P(beacon.fetchP2P()),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "backfill service initialization error")
|
||||
}
|
||||
if err := beacon.services.RegisterService(bf); err != nil {
|
||||
return nil, errors.Wrap(err, "error registering backfill service")
|
||||
}
|
||||
|
||||
log.Debugln("Starting State Gen")
|
||||
if err := beacon.startStateGen(ctx, bfs, beacon.forkChoicer); err != nil {
|
||||
@@ -496,7 +508,7 @@ func (b *BeaconNode) startSlasherDB(cliCtx *cli.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *BeaconNode) startStateGen(ctx context.Context, bfs *backfill.Status, fc forkchoice.ForkChoicer) error {
|
||||
func (b *BeaconNode) startStateGen(ctx context.Context, bfs *backfill.StatusUpdater, fc forkchoice.ForkChoicer) error {
|
||||
opts := []stategen.StateGenOption{stategen.WithBackfillStatus(bfs)}
|
||||
sg := stategen.New(b.db, fc, opts...)
|
||||
|
||||
|
||||
@@ -50,7 +50,7 @@ type State struct {
|
||||
finalizedInfo *finalizedInfo
|
||||
epochBoundaryStateCache *epochBoundaryState
|
||||
saveHotStateDB *saveHotStateDbConfig
|
||||
backfillStatus *backfill.Status
|
||||
backfillStatus *backfill.StatusUpdater
|
||||
migrationLock *sync.Mutex
|
||||
fc forkchoice.ForkChoicer
|
||||
}
|
||||
@@ -77,7 +77,7 @@ type finalizedInfo struct {
|
||||
// StateGenOption is a functional option for controlling the initialization of a *State value
|
||||
type StateGenOption func(*State)
|
||||
|
||||
func WithBackfillStatus(bfs *backfill.Status) StateGenOption {
|
||||
func WithBackfillStatus(bfs *backfill.StatusUpdater) StateGenOption {
|
||||
return func(sg *State) {
|
||||
sg.backfillStatus = bfs
|
||||
}
|
||||
|
||||
@@ -2,15 +2,27 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["status.go"],
|
||||
srcs = [
|
||||
"batch.go",
|
||||
"batcher.go",
|
||||
"service.go",
|
||||
"status.go",
|
||||
"worker.go",
|
||||
],
|
||||
importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/backfill",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//beacon-chain/db:go_default_library",
|
||||
"//beacon-chain/p2p:go_default_library",
|
||||
"//beacon-chain/startup:go_default_library",
|
||||
"//consensus-types/blocks:go_default_library",
|
||||
"//consensus-types/interfaces:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"//proto/dbval:go_default_library",
|
||||
"//runtime:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
"@org_golang_google_protobuf//proto:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
@@ -19,11 +31,13 @@ go_test(
|
||||
srcs = ["status_test.go"],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//beacon-chain/db:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//consensus-types/blocks:go_default_library",
|
||||
"//consensus-types/blocks/testing:go_default_library",
|
||||
"//consensus-types/interfaces:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"//proto/dbval:go_default_library",
|
||||
"//testing/require:go_default_library",
|
||||
"//testing/util:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
|
||||
34
beacon-chain/sync/backfill/batch.go
Normal file
34
beacon-chain/sync/backfill/batch.go
Normal file
@@ -0,0 +1,34 @@
|
||||
package backfill
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type batchId string
|
||||
|
||||
type batch struct {
|
||||
scheduled time.Time
|
||||
retries int
|
||||
begin primitives.Slot
|
||||
end primitives.Slot // half-open interval, [begin, end), ie >= start, < end.
|
||||
results []blocks.ROBlock
|
||||
err error
|
||||
succeeded bool
|
||||
}
|
||||
|
||||
func (b batch) logFields() log.Fields {
|
||||
return map[string]interface{}{
|
||||
"batch_id": b.id(),
|
||||
"scheduled": b.scheduled.String(),
|
||||
"retries": b.retries,
|
||||
}
|
||||
}
|
||||
|
||||
func (b batch) id() batchId {
|
||||
return batchId(fmt.Sprintf("%d:%d", b.begin, b.end))
|
||||
}
|
||||
109
beacon-chain/sync/backfill/batcher.go
Normal file
109
beacon-chain/sync/backfill/batcher.go
Normal file
@@ -0,0 +1,109 @@
|
||||
package backfill
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||
)
|
||||
|
||||
var ErrRetryLimitExceeded = errors.New("Unable to retrieve backfill batch")
|
||||
|
||||
type batcher struct {
|
||||
nWorkers int
|
||||
size primitives.Slot
|
||||
su *StatusUpdater
|
||||
todo chan batch
|
||||
done chan batch
|
||||
errc chan error
|
||||
// outstanding is keyed by the id of the batch that is relied on
|
||||
// ie if batch id 2 relies on batch id 1, and 1 is head
|
||||
outstanding map[batchId]*batch
|
||||
nextId batchId
|
||||
lastId batchId
|
||||
}
|
||||
|
||||
func (br *batcher) run(ctx context.Context) {
|
||||
status := br.su.Status()
|
||||
// Set min at bottom of backfill range. Add 1 because range is inclusive.
|
||||
min := primitives.Slot(status.LowSlot) + 1
|
||||
initial := br.next(min, primitives.Slot(status.HighSlot))
|
||||
br.nextId, br.lastId = initial.id(), initial.id()
|
||||
br.outstanding[initial.id()] = &initial
|
||||
br.todo <- initial
|
||||
for {
|
||||
for i := 0; i < br.nWorkers-len(br.outstanding); i++ {
|
||||
last := br.outstanding[br.lastId]
|
||||
newLast := br.next(min, last.begin)
|
||||
br.outstanding[newLast.id()] = &newLast
|
||||
br.lastId = newLast.id()
|
||||
br.todo <- newLast
|
||||
}
|
||||
select {
|
||||
case b := <-br.done:
|
||||
if err := br.completeBatch(b); err != nil {
|
||||
br.errc <- err
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (br *batcher) completeBatch(b batch) error {
|
||||
// if the batch failed, send it back to the work queue.
|
||||
// we have no limit on the number of retries, because all batches are necessary.
|
||||
if b.err != nil {
|
||||
b.err = nil
|
||||
br.outstanding[b.id()] = &b
|
||||
br.todo <- b
|
||||
return nil
|
||||
}
|
||||
|
||||
br.outstanding[b.id()] = &b
|
||||
if err := br.includeCompleted(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (br *batcher) includeCompleted() error {
|
||||
for len(br.outstanding) > 0 {
|
||||
b := br.outstanding[br.nextId]
|
||||
if !b.succeeded {
|
||||
return nil
|
||||
}
|
||||
if err := br.updateDB(*b); err != nil {
|
||||
return err
|
||||
}
|
||||
status := br.su.Status()
|
||||
min := primitives.Slot(status.LowSlot)
|
||||
promote := br.outstanding[br.next(min, b.begin).id()]
|
||||
br.nextId = promote.id()
|
||||
delete(br.outstanding, b.id())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (br *batcher) updateDB(b batch) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (br *batcher) next(min, upper primitives.Slot) batch {
|
||||
n := batch{begin: min}
|
||||
n.end = upper // Batches don't overlap because end is exclusive, begin is inclusive.
|
||||
if upper > br.size+min {
|
||||
n.begin = upper - br.size
|
||||
}
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
func newBatcher(size primitives.Slot, su *StatusUpdater, todo, done chan batch) *batcher {
|
||||
return &batcher{
|
||||
size: size,
|
||||
su: su,
|
||||
todo: todo,
|
||||
done: done,
|
||||
}
|
||||
}
|
||||
140
beacon-chain/sync/backfill/service.go
Normal file
140
beacon-chain/sync/backfill/service.go
Normal file
@@ -0,0 +1,140 @@
|
||||
package backfill
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v4/runtime"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const defaultWorkerCount = 1
|
||||
|
||||
type Service struct {
|
||||
ctx context.Context
|
||||
genesisWaiter startup.GenesisWaiter
|
||||
genesis *startup.Genesis
|
||||
clock startup.Clock
|
||||
su *StatusUpdater
|
||||
db BackfillDB
|
||||
p2p p2p.P2P
|
||||
nWorkers int
|
||||
todo chan batch
|
||||
done chan batch
|
||||
errChan chan error
|
||||
workers map[workerId]*p2pWorker
|
||||
batcher *batcher
|
||||
batchSize uint64
|
||||
}
|
||||
|
||||
var _ runtime.Service = (*Service)(nil)
|
||||
|
||||
type ServiceOption func(*Service) error
|
||||
|
||||
func WithStatusUpdater(su *StatusUpdater) ServiceOption {
|
||||
return func(s *Service) error {
|
||||
s.su = su
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithGenesisWaiter(gw startup.GenesisWaiter) ServiceOption {
|
||||
return func(s *Service) error {
|
||||
s.genesisWaiter = gw
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithBackfillDB(db BackfillDB) ServiceOption {
|
||||
return func(s *Service) error {
|
||||
s.db = db
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithP2P(p p2p.P2P) ServiceOption {
|
||||
return func(s *Service) error {
|
||||
s.p2p = p
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithWorkerCount(n int) ServiceOption {
|
||||
return func(s *Service) error {
|
||||
s.nWorkers = n
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func NewService(ctx context.Context, opts ...ServiceOption) (*Service, error) {
|
||||
s := &Service{
|
||||
ctx: ctx,
|
||||
}
|
||||
for _, o := range opts {
|
||||
if err := o(s); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if s.nWorkers == 0 {
|
||||
s.nWorkers = defaultWorkerCount
|
||||
}
|
||||
if s.todo == nil {
|
||||
s.todo = make(chan batch)
|
||||
}
|
||||
if s.done == nil {
|
||||
s.done = make(chan batch)
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *Service) Start() {
|
||||
genesis, err := s.genesisWaiter.WaitForGenesis(s.ctx)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("backfill service failed to start while waiting for genesis data")
|
||||
}
|
||||
s.clock = genesis.Clock()
|
||||
if err := s.spawnBatcher(); err != nil {
|
||||
log.WithError(err).Fatal("error starting backfill service")
|
||||
}
|
||||
s.spawnWorkers()
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
case err := <-s.errChan:
|
||||
if err := s.tryRecover(err); err != nil {
|
||||
log.WithError(err).Fatal("Non-recoverable error in backfill service, quitting.")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) tryRecover(err error) error {
|
||||
log.WithError(err).Error("error from the batcher")
|
||||
// If error is not recoverable, reply with an error, which will shut down the service.
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) Stop() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) Status() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) spawnWorkers() {
|
||||
for i := 0; i < s.nWorkers; i++ {
|
||||
id := workerId(i)
|
||||
s.workers[id] = newP2pWorker(id, s.p2p, s.todo, s.done)
|
||||
go s.workers[id].run(s.ctx)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) spawnBatcher() error {
|
||||
s.batcher = newBatcher(primitives.Slot(s.batchSize), s.su, s.todo, s.done)
|
||||
go s.batcher.run(s.ctx)
|
||||
return nil
|
||||
}
|
||||
@@ -2,121 +2,152 @@ package backfill
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v4/proto/dbval"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
// NewStatus correctly initializes a Status value with the required database value.
|
||||
func NewStatus(store BackfillDB) *Status {
|
||||
return &Status{
|
||||
// NewStatus correctly initializes a StatusUpdater value with the required database value.
|
||||
func NewStatus(store BackfillDB) *StatusUpdater {
|
||||
return &StatusUpdater{
|
||||
store: store,
|
||||
}
|
||||
}
|
||||
|
||||
// Status provides a way to update and query the status of a backfill process that may be necessary to track when
|
||||
// StatusUpdater provides a way to update and query the status of a backfill process that may be necessary to track when
|
||||
// a node was initialized via checkpoint sync. With checkpoint sync, there will be a gap in node history from genesis
|
||||
// until the checkpoint sync origin block. Status provides the means to update the value keeping track of the lower
|
||||
// end of the missing block range via the Advance() method, to check whether a Slot is missing from the database
|
||||
// until the checkpoint sync origin block. StatusUpdater provides the means to update the value keeping track of the lower
|
||||
// end of the missing block range via the FillFwd() method, to check whether a Slot is missing from the database
|
||||
// via the SlotCovered() method, and to see the current StartGap() and EndGap().
|
||||
type Status struct {
|
||||
start primitives.Slot
|
||||
end primitives.Slot
|
||||
type StatusUpdater struct {
|
||||
sync.RWMutex
|
||||
store BackfillDB
|
||||
genesisSync bool
|
||||
status *dbval.BackfillStatus
|
||||
}
|
||||
|
||||
// SlotCovered uses StartGap() and EndGap() to determine if the given slot is covered by the current chain history.
|
||||
// If the slot is <= StartGap(), or >= EndGap(), the result is true.
|
||||
// If the slot is between StartGap() and EndGap(), the result is false.
|
||||
func (s *Status) SlotCovered(sl primitives.Slot) bool {
|
||||
// SlotCovered determines if the given slot is covered by the current chain history.
|
||||
// If the slot is <= backfill low slot, or >= backfill high slot, the result is true.
|
||||
// If the slot is between the backfill low and high slots, the result is false.
|
||||
func (s *StatusUpdater) SlotCovered(sl primitives.Slot) bool {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
// short circuit if the node was synced from genesis
|
||||
if s.genesisSync {
|
||||
return true
|
||||
}
|
||||
if s.StartGap() < sl && sl < s.EndGap() {
|
||||
if s.status.LowSlot < uint64(sl) && uint64(sl) < s.status.HighSlot {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// StartGap returns the slot at the beginning of the range that needs to be backfilled.
|
||||
func (s *Status) StartGap() primitives.Slot {
|
||||
return s.start
|
||||
}
|
||||
var ErrFillFwdPastUpper = errors.New("cannot move backfill StatusUpdater above upper bound of backfill")
|
||||
var ErrFillBackPastLower = errors.New("cannot move backfill StatusUpdater below lower bound of backfill")
|
||||
|
||||
// EndGap returns the slot at the end of the range that needs to be backfilled.
|
||||
func (s *Status) EndGap() primitives.Slot {
|
||||
return s.end
|
||||
}
|
||||
|
||||
var ErrAdvancePastOrigin = errors.New("cannot advance backfill Status beyond the origin checkpoint slot")
|
||||
|
||||
// Advance advances the backfill position to the given slot & root.
|
||||
// It updates the backfill block root entry in the database,
|
||||
// and also updates the Status value's copy of the backfill position slot.
|
||||
func (s *Status) Advance(ctx context.Context, upTo primitives.Slot, root [32]byte) error {
|
||||
if upTo > s.end {
|
||||
return errors.Wrapf(ErrAdvancePastOrigin, "advance slot=%d, origin slot=%d", upTo, s.end)
|
||||
// FillFwd moves the lower bound of the backfill status to the given slot & root,
|
||||
// saving the new state to the database and then updating StatusUpdater's in-memory copy with the saved value.
|
||||
func (s *StatusUpdater) FillFwd(ctx context.Context, newLow primitives.Slot, root [32]byte) error {
|
||||
status := s.Status()
|
||||
unl := uint64(newLow)
|
||||
if unl > status.HighSlot {
|
||||
return errors.Wrapf(ErrFillFwdPastUpper, "advance slot=%d, origin slot=%d", unl, status.HighSlot)
|
||||
}
|
||||
s.start = upTo
|
||||
return s.store.SaveBackfillBlockRoot(ctx, root)
|
||||
status.LowSlot = unl
|
||||
status.LowRoot = root[:]
|
||||
return s.updateStatus(ctx, status)
|
||||
}
|
||||
|
||||
// Reload queries the database for backfill status, initializing the internal data and validating the database state.
|
||||
func (s *Status) Reload(ctx context.Context) error {
|
||||
cpRoot, err := s.store.OriginCheckpointBlockRoot(ctx)
|
||||
// FillBack moves the upper bound of the backfill status to the given slot & root,
|
||||
// saving the new state to the database and then updating StatusUpdater's in-memory copy with the saved value.
|
||||
func (s *StatusUpdater) FillBack(ctx context.Context, newHigh primitives.Slot, root [32]byte) error {
|
||||
status := s.Status()
|
||||
unh := uint64(newHigh)
|
||||
if unh < status.LowSlot {
|
||||
return errors.Wrapf(ErrFillBackPastLower, "advance slot=%d, origin slot=%d", unh, status.LowSlot)
|
||||
}
|
||||
status.HighSlot = unh
|
||||
status.HighRoot = root[:]
|
||||
return s.updateStatus(ctx, status)
|
||||
}
|
||||
|
||||
// recover will check to see if the db is from a legacy checkpoint sync and either build a new BackfillStatus
|
||||
// or label the node as synced from genesis.
|
||||
func (s *StatusUpdater) recoverLegacy(ctx context.Context) error {
|
||||
cpr, err := s.store.OriginCheckpointBlockRoot(ctx)
|
||||
if errors.Is(err, db.ErrNotFoundOriginBlockRoot) {
|
||||
s.genesisSync = true
|
||||
return nil
|
||||
}
|
||||
|
||||
cpb, err := s.store.Block(ctx, cpr)
|
||||
if err != nil {
|
||||
// mark genesis sync and short circuit further lookups
|
||||
if errors.Is(err, db.ErrNotFoundOriginBlockRoot) {
|
||||
s.genesisSync = true
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
return errors.Wrapf(err, "error retrieving block for origin checkpoint root=%#x", cpr)
|
||||
}
|
||||
cpBlock, err := s.store.Block(ctx, cpRoot)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error retrieving block for origin checkpoint root=%#x", cpRoot)
|
||||
if err := blocks.BeaconBlockIsNil(cpb); err != nil {
|
||||
return errors.Wrapf(err, "nil block found for origin checkpoint root=%#x", cpr)
|
||||
}
|
||||
if err := blocks.BeaconBlockIsNil(cpBlock); err != nil {
|
||||
return err
|
||||
}
|
||||
s.end = cpBlock.Block().Slot()
|
||||
|
||||
_, err = s.store.GenesisBlockRoot(ctx)
|
||||
gbr, err := s.store.GenesisBlockRoot(ctx)
|
||||
if err != nil {
|
||||
if errors.Is(err, db.ErrNotFoundGenesisBlockRoot) {
|
||||
return errors.Wrap(err, "genesis block root required for checkpoint sync")
|
||||
}
|
||||
return err
|
||||
}
|
||||
os := uint64(cpb.Block().Slot())
|
||||
bs := &dbval.BackfillStatus{
|
||||
HighSlot: os,
|
||||
HighRoot: cpr[:],
|
||||
LowSlot: 0,
|
||||
LowRoot: gbr[:],
|
||||
OriginSlot: os,
|
||||
OriginRoot: cpr[:],
|
||||
}
|
||||
return s.updateStatus(ctx, bs)
|
||||
}
|
||||
|
||||
bfRoot, err := s.store.BackfillBlockRoot(ctx)
|
||||
// Reload queries the database for backfill status, initializing the internal data and validating the database state.
|
||||
func (s *StatusUpdater) Reload(ctx context.Context) error {
|
||||
status, err := s.store.BackfillStatus(ctx)
|
||||
if err != nil {
|
||||
if errors.Is(err, db.ErrNotFoundBackfillBlockRoot) {
|
||||
return errors.Wrap(err, "found origin checkpoint block root, but no backfill block root")
|
||||
if errors.Is(err, db.ErrNotFound) {
|
||||
return s.recoverLegacy(ctx)
|
||||
}
|
||||
}
|
||||
return s.updateStatus(ctx, status)
|
||||
}
|
||||
|
||||
func (s *StatusUpdater) updateStatus(ctx context.Context, bs *dbval.BackfillStatus) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
if proto.Equal(s.status, bs) {
|
||||
return nil
|
||||
}
|
||||
if err := s.store.SaveBackfillStatus(ctx, bs); err != nil {
|
||||
return err
|
||||
}
|
||||
bfBlock, err := s.store.Block(ctx, bfRoot)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "error retrieving block for backfill root=%#x", bfRoot)
|
||||
}
|
||||
if err := blocks.BeaconBlockIsNil(bfBlock); err != nil {
|
||||
return err
|
||||
}
|
||||
s.start = bfBlock.Block().Slot()
|
||||
|
||||
s.status = bs
|
||||
return nil
|
||||
}
|
||||
|
||||
// BackfillDB describes the set of DB methods that the Status type needs to function.
|
||||
type BackfillDB interface {
|
||||
SaveBackfillBlockRoot(ctx context.Context, blockRoot [32]byte) error
|
||||
GenesisBlockRoot(ctx context.Context) ([32]byte, error)
|
||||
OriginCheckpointBlockRoot(ctx context.Context) ([32]byte, error)
|
||||
BackfillBlockRoot(ctx context.Context) ([32]byte, error)
|
||||
Block(ctx context.Context, blockRoot [32]byte) (interfaces.ReadOnlySignedBeaconBlock, error)
|
||||
func (s *StatusUpdater) Status() *dbval.BackfillStatus {
|
||||
s.RLock()
|
||||
defer s.RUnlock()
|
||||
return proto.Clone(s.status).(*dbval.BackfillStatus)
|
||||
}
|
||||
|
||||
// BackfillDB describes the set of DB methods that the StatusUpdater type needs to function.
|
||||
type BackfillDB interface {
|
||||
SaveBackfillStatus(context.Context, *dbval.BackfillStatus) error
|
||||
BackfillStatus(context.Context) (*dbval.BackfillStatus, error)
|
||||
OriginCheckpointBlockRoot(context.Context) ([32]byte, error)
|
||||
Block(context.Context, [32]byte) (interfaces.ReadOnlySignedBeaconBlock, error)
|
||||
GenesisBlockRoot(context.Context) ([32]byte, error)
|
||||
}
|
||||
|
||||
@@ -4,9 +4,11 @@ import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
|
||||
blocktest "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks/testing"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
|
||||
"github.com/prysmaticlabs/prysm/v4/proto/dbval"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
@@ -21,17 +23,28 @@ type mockBackfillDB struct {
|
||||
saveBackfillBlockRoot func(ctx context.Context, blockRoot [32]byte) error
|
||||
genesisBlockRoot func(ctx context.Context) ([32]byte, error)
|
||||
originCheckpointBlockRoot func(ctx context.Context) ([32]byte, error)
|
||||
backfillBlockRoot func(ctx context.Context) ([32]byte, error)
|
||||
block func(ctx context.Context, blockRoot [32]byte) (interfaces.ReadOnlySignedBeaconBlock, error)
|
||||
saveBackfillStatus func(ctx context.Context, status *dbval.BackfillStatus) error
|
||||
backfillStatus func(context.Context) (*dbval.BackfillStatus, error)
|
||||
status *dbval.BackfillStatus
|
||||
err error
|
||||
}
|
||||
|
||||
var _ BackfillDB = &mockBackfillDB{}
|
||||
|
||||
func (db *mockBackfillDB) SaveBackfillBlockRoot(ctx context.Context, blockRoot [32]byte) error {
|
||||
if db.saveBackfillBlockRoot != nil {
|
||||
return db.saveBackfillBlockRoot(ctx, blockRoot)
|
||||
func (db *mockBackfillDB) SaveBackfillStatus(ctx context.Context, status *dbval.BackfillStatus) error {
|
||||
if db.saveBackfillStatus != nil {
|
||||
return db.saveBackfillStatus(ctx, status)
|
||||
}
|
||||
return errEmptyMockDBMethod
|
||||
db.status = status
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *mockBackfillDB) BackfillStatus(ctx context.Context) (*dbval.BackfillStatus, error) {
|
||||
if db.backfillStatus != nil {
|
||||
return db.backfillStatus(ctx)
|
||||
}
|
||||
return db.status, nil
|
||||
}
|
||||
|
||||
func (db *mockBackfillDB) GenesisBlockRoot(ctx context.Context) ([32]byte, error) {
|
||||
@@ -48,13 +61,6 @@ func (db *mockBackfillDB) OriginCheckpointBlockRoot(ctx context.Context) ([32]by
|
||||
return [32]byte{}, errEmptyMockDBMethod
|
||||
}
|
||||
|
||||
func (db *mockBackfillDB) BackfillBlockRoot(ctx context.Context) ([32]byte, error) {
|
||||
if db.backfillBlockRoot != nil {
|
||||
return db.backfillBlockRoot(ctx)
|
||||
}
|
||||
return [32]byte{}, errEmptyMockDBMethod
|
||||
}
|
||||
|
||||
func (db *mockBackfillDB) Block(ctx context.Context, blockRoot [32]byte) (interfaces.ReadOnlySignedBeaconBlock, error) {
|
||||
if db.block != nil {
|
||||
return db.block(ctx, blockRoot)
|
||||
@@ -66,42 +72,42 @@ func TestSlotCovered(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
slot primitives.Slot
|
||||
status *Status
|
||||
status *StatusUpdater
|
||||
result bool
|
||||
}{
|
||||
{
|
||||
name: "below start true",
|
||||
status: &Status{start: 1},
|
||||
status: &StatusUpdater{status: &dbval.BackfillStatus{LowSlot: 1}},
|
||||
slot: 0,
|
||||
result: true,
|
||||
},
|
||||
{
|
||||
name: "above end true",
|
||||
status: &Status{end: 1},
|
||||
status: &StatusUpdater{status: &dbval.BackfillStatus{HighSlot: 1}},
|
||||
slot: 2,
|
||||
result: true,
|
||||
},
|
||||
{
|
||||
name: "equal end true",
|
||||
status: &Status{end: 1},
|
||||
status: &StatusUpdater{status: &dbval.BackfillStatus{HighSlot: 1}},
|
||||
slot: 1,
|
||||
result: true,
|
||||
},
|
||||
{
|
||||
name: "equal start true",
|
||||
status: &Status{start: 2},
|
||||
status: &StatusUpdater{status: &dbval.BackfillStatus{LowSlot: 2}},
|
||||
slot: 2,
|
||||
result: true,
|
||||
},
|
||||
{
|
||||
name: "between false",
|
||||
status: &Status{start: 1, end: 3},
|
||||
status: &StatusUpdater{status: &dbval.BackfillStatus{LowSlot: 1, HighSlot: 3}},
|
||||
slot: 2,
|
||||
result: false,
|
||||
},
|
||||
{
|
||||
name: "genesisSync always true",
|
||||
status: &Status{genesisSync: true},
|
||||
status: &StatusUpdater{genesisSync: true},
|
||||
slot: 100,
|
||||
result: true,
|
||||
},
|
||||
@@ -121,17 +127,17 @@ func TestAdvance(t *testing.T) {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
s := &Status{end: 100, store: mdb}
|
||||
s := &StatusUpdater{status: &dbval.BackfillStatus{HighSlot: 100}, store: mdb}
|
||||
var root [32]byte
|
||||
copy(root[:], []byte{0x23, 0x23})
|
||||
require.NoError(t, s.Advance(ctx, 90, root))
|
||||
require.NoError(t, s.FillFwd(ctx, 90, root))
|
||||
require.Equal(t, root, saveBackfillBuf[0])
|
||||
not := s.SlotCovered(95)
|
||||
require.Equal(t, false, not)
|
||||
|
||||
// this should still be len 1 after failing to advance
|
||||
require.Equal(t, 1, len(saveBackfillBuf))
|
||||
require.ErrorIs(t, s.Advance(ctx, s.end+1, root), ErrAdvancePastOrigin)
|
||||
require.ErrorIs(t, s.FillFwd(ctx, primitives.Slot(s.status.HighSlot)+1, root), ErrFillFwdPastUpper)
|
||||
// this has an element in it from the previous test, there shouldn't be an additional one
|
||||
require.Equal(t, 1, len(saveBackfillBuf))
|
||||
}
|
||||
@@ -171,7 +177,7 @@ func TestReload(t *testing.T) {
|
||||
name string
|
||||
db BackfillDB
|
||||
err error
|
||||
expected *Status
|
||||
expected *StatusUpdater
|
||||
}{
|
||||
/*{
|
||||
name: "origin not found, implying genesis sync ",
|
||||
@@ -180,7 +186,7 @@ func TestReload(t *testing.T) {
|
||||
originCheckpointBlockRoot: func(ctx context.Context) ([32]byte, error) {
|
||||
return [32]byte{}, db.ErrNotFoundOriginBlockRoot
|
||||
}},
|
||||
expected: &Status{genesisSync: true},
|
||||
expected: &StatusUpdater{genesisSync: true},
|
||||
},
|
||||
{
|
||||
name: "genesis not found error",
|
||||
@@ -318,7 +324,7 @@ func TestReload(t *testing.T) {
|
||||
err: derp,
|
||||
},*/
|
||||
{
|
||||
name: "complete happy path",
|
||||
name: "legacy recovery",
|
||||
db: &mockBackfillDB{
|
||||
genesisBlockRoot: goodBlockRoot(params.BeaconConfig().ZeroHash),
|
||||
originCheckpointBlockRoot: goodBlockRoot(originRoot),
|
||||
@@ -331,15 +337,15 @@ func TestReload(t *testing.T) {
|
||||
}
|
||||
return nil, errors.New("not derp")
|
||||
},
|
||||
backfillBlockRoot: goodBlockRoot(backfillRoot),
|
||||
backfillStatus: func(context.Context) (*dbval.BackfillStatus, error) { return nil, db.ErrNotFound },
|
||||
},
|
||||
err: derp,
|
||||
expected: &Status{genesisSync: false, start: backfillSlot, end: originSlot},
|
||||
expected: &StatusUpdater{genesisSync: false, status: &dbval.BackfillStatus{LowSlot: 0, HighSlot: uint64(originSlot)}},
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
s := &Status{
|
||||
s := &StatusUpdater{
|
||||
store: c.db,
|
||||
}
|
||||
err := s.Reload(ctx)
|
||||
@@ -352,7 +358,7 @@ func TestReload(t *testing.T) {
|
||||
continue
|
||||
}
|
||||
require.Equal(t, c.expected.genesisSync, s.genesisSync)
|
||||
require.Equal(t, c.expected.start, s.start)
|
||||
require.Equal(t, c.expected.end, s.end)
|
||||
require.Equal(t, c.expected.status.LowSlot, s.status.LowSlot)
|
||||
require.Equal(t, c.expected.status.HighSlot, s.status.HighSlot)
|
||||
}
|
||||
}
|
||||
|
||||
44
beacon-chain/sync/backfill/worker.go
Normal file
44
beacon-chain/sync/backfill/worker.go
Normal file
@@ -0,0 +1,44 @@
|
||||
package backfill
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
type workerId int
|
||||
|
||||
type p2pWorker struct {
|
||||
id workerId
|
||||
p2p p2p.P2P
|
||||
todo chan batch
|
||||
done chan batch
|
||||
}
|
||||
|
||||
func (w *p2pWorker) run(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case b := <-w.todo:
|
||||
log.WithFields(b.logFields()).Debug("Backfill worker received batch.")
|
||||
w.done <- w.handle(b)
|
||||
case <-ctx.Done():
|
||||
log.WithField("worker_id", w.id).Info("Backfill worker exiting after context canceled.")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (w *p2pWorker) handle(b batch) batch {
|
||||
// if the batch is not successfully fetched and validated, increment the attempts counter
|
||||
return b
|
||||
}
|
||||
|
||||
func newP2pWorker(id workerId, p p2p.P2P, todo, done chan batch) *p2pWorker {
|
||||
return &p2pWorker{
|
||||
id: id,
|
||||
p2p: p,
|
||||
todo: todo,
|
||||
done: done,
|
||||
}
|
||||
}
|
||||
@@ -7,6 +7,7 @@ go_library(
|
||||
"factory.go",
|
||||
"getters.go",
|
||||
"proto.go",
|
||||
"roblock.go",
|
||||
"setters.go",
|
||||
"types.go",
|
||||
],
|
||||
|
||||
63
consensus-types/blocks/roblock.go
Normal file
63
consensus-types/blocks/roblock.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package blocks
|
||||
|
||||
import (
|
||||
"sort"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
|
||||
)
|
||||
|
||||
var ErrRootLength error = errors.New("incorrect length for hash_tree_root")
|
||||
|
||||
type ROBlock struct {
|
||||
interfaces.ReadOnlySignedBeaconBlock
|
||||
root [32]byte
|
||||
}
|
||||
|
||||
func (b ROBlock) Root() [32]byte {
|
||||
return b.root
|
||||
}
|
||||
|
||||
func NewROBlock(b interfaces.ReadOnlySignedBeaconBlock, root [32]byte) ROBlock {
|
||||
return ROBlock{ReadOnlySignedBeaconBlock: b, root: root}
|
||||
}
|
||||
|
||||
// ROBlockSlice implements sort.Interface so that slices of ROBlocks can be easily sorted
|
||||
type ROBlockSlice []ROBlock
|
||||
|
||||
var _ sort.Interface = ROBlockSlice{}
|
||||
|
||||
// Less reports whether the element with index i must sort before the element with index j.
|
||||
func (s ROBlockSlice) Less(i, j int) bool {
|
||||
si, sj := s[i].Block().Slot(), s[j].Block().Slot()
|
||||
|
||||
// lower slot wins
|
||||
if si != sj {
|
||||
return s[i].Block().Slot() < s[j].Block().Slot()
|
||||
}
|
||||
|
||||
// break slot tie lexicographically comparing roots byte for byte
|
||||
ri, rj := s[i].Root(), s[j].Root()
|
||||
k := 0
|
||||
for ; k < fieldparams.RootLength; k++ {
|
||||
// advance the byte offset until you hit the end
|
||||
if ri[k] == rj[k] {
|
||||
continue
|
||||
}
|
||||
}
|
||||
if k == fieldparams.RootLength {
|
||||
return false
|
||||
}
|
||||
return ri[k] < rj[k]
|
||||
}
|
||||
|
||||
// Swap swaps the elements with indexes i and j.
|
||||
func (s ROBlockSlice) Swap(i, j int) {
|
||||
s[i], s[j] = s[j], s[i]
|
||||
}
|
||||
|
||||
// Len is the number of elements in the collection.
|
||||
func (s ROBlockSlice) Len() int {
|
||||
return len(s)
|
||||
}
|
||||
@@ -18,8 +18,8 @@ searchstring="prysmaticlabs/prysm/v4/"
|
||||
for ((i = 0; i < arraylength; i++)); do
|
||||
color "34" "$destination"
|
||||
destination=${file_list[i]#*$searchstring}
|
||||
chmod 755 "$destination"
|
||||
cp -R -L "${file_list[i]}" "$destination"
|
||||
chmod 755 "$destination"
|
||||
done
|
||||
|
||||
# Run goimports on newly generated protos
|
||||
|
||||
23
proto/dbval/BUILD.bazel
Normal file
23
proto/dbval/BUILD.bazel
Normal file
@@ -0,0 +1,23 @@
|
||||
load("@prysm//tools/go:def.bzl", "go_library")
|
||||
load("@rules_proto//proto:defs.bzl", "proto_library")
|
||||
load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
|
||||
|
||||
proto_library(
|
||||
name = "dbval_proto",
|
||||
srcs = ["dbval.proto"],
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
||||
|
||||
go_proto_library(
|
||||
name = "dbval_go_proto",
|
||||
importpath = "github.com/prysmaticlabs/prysm/v4/proto/dbval",
|
||||
proto = ":dbval_proto",
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
embed = [":dbval_go_proto"],
|
||||
importpath = "github.com/prysmaticlabs/prysm/v4/proto/dbval",
|
||||
visibility = ["//visibility:public"],
|
||||
)
|
||||
197
proto/dbval/dbval.pb.go
generated
Executable file
197
proto/dbval/dbval.pb.go
generated
Executable file
@@ -0,0 +1,197 @@
|
||||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// versions:
|
||||
// protoc-gen-go v1.28.1
|
||||
// protoc v3.15.8
|
||||
// source: proto/dbval/dbval.proto
|
||||
|
||||
package dbval
|
||||
|
||||
import (
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
)
|
||||
|
||||
const (
|
||||
// Verify that this generated code is sufficiently up-to-date.
|
||||
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
|
||||
// Verify that runtime/protoimpl is sufficiently up-to-date.
|
||||
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
|
||||
)
|
||||
|
||||
type BackfillStatus struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
HighSlot uint64 `protobuf:"varint,1,opt,name=high_slot,json=highSlot,proto3" json:"high_slot,omitempty"`
|
||||
HighRoot []byte `protobuf:"bytes,2,opt,name=high_root,json=highRoot,proto3" json:"high_root,omitempty"`
|
||||
LowSlot uint64 `protobuf:"varint,3,opt,name=low_slot,json=lowSlot,proto3" json:"low_slot,omitempty"`
|
||||
LowRoot []byte `protobuf:"bytes,4,opt,name=low_root,json=lowRoot,proto3" json:"low_root,omitempty"`
|
||||
OriginSlot uint64 `protobuf:"varint,5,opt,name=origin_slot,json=originSlot,proto3" json:"origin_slot,omitempty"`
|
||||
OriginRoot []byte `protobuf:"bytes,6,opt,name=origin_root,json=originRoot,proto3" json:"origin_root,omitempty"`
|
||||
}
|
||||
|
||||
func (x *BackfillStatus) Reset() {
|
||||
*x = BackfillStatus{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_proto_dbval_dbval_proto_msgTypes[0]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
}
|
||||
|
||||
func (x *BackfillStatus) String() string {
|
||||
return protoimpl.X.MessageStringOf(x)
|
||||
}
|
||||
|
||||
func (*BackfillStatus) ProtoMessage() {}
|
||||
|
||||
func (x *BackfillStatus) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_proto_dbval_dbval_proto_msgTypes[0]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
return mi.MessageOf(x)
|
||||
}
|
||||
|
||||
// Deprecated: Use BackfillStatus.ProtoReflect.Descriptor instead.
|
||||
func (*BackfillStatus) Descriptor() ([]byte, []int) {
|
||||
return file_proto_dbval_dbval_proto_rawDescGZIP(), []int{0}
|
||||
}
|
||||
|
||||
func (x *BackfillStatus) GetHighSlot() uint64 {
|
||||
if x != nil {
|
||||
return x.HighSlot
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *BackfillStatus) GetHighRoot() []byte {
|
||||
if x != nil {
|
||||
return x.HighRoot
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *BackfillStatus) GetLowSlot() uint64 {
|
||||
if x != nil {
|
||||
return x.LowSlot
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *BackfillStatus) GetLowRoot() []byte {
|
||||
if x != nil {
|
||||
return x.LowRoot
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *BackfillStatus) GetOriginSlot() uint64 {
|
||||
if x != nil {
|
||||
return x.OriginSlot
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *BackfillStatus) GetOriginRoot() []byte {
|
||||
if x != nil {
|
||||
return x.OriginRoot
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
var File_proto_dbval_dbval_proto protoreflect.FileDescriptor
|
||||
|
||||
var file_proto_dbval_dbval_proto_rawDesc = []byte{
|
||||
0x0a, 0x17, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x64, 0x62, 0x76, 0x61, 0x6c, 0x2f, 0x64, 0x62,
|
||||
0x76, 0x61, 0x6c, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x12, 0x65, 0x74, 0x68, 0x65, 0x72,
|
||||
0x65, 0x75, 0x6d, 0x2e, 0x65, 0x74, 0x68, 0x2e, 0x64, 0x62, 0x76, 0x61, 0x6c, 0x22, 0xc2, 0x01,
|
||||
0x0a, 0x0e, 0x42, 0x61, 0x63, 0x6b, 0x66, 0x69, 0x6c, 0x6c, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73,
|
||||
0x12, 0x1b, 0x0a, 0x09, 0x68, 0x69, 0x67, 0x68, 0x5f, 0x73, 0x6c, 0x6f, 0x74, 0x18, 0x01, 0x20,
|
||||
0x01, 0x28, 0x04, 0x52, 0x08, 0x68, 0x69, 0x67, 0x68, 0x53, 0x6c, 0x6f, 0x74, 0x12, 0x1b, 0x0a,
|
||||
0x09, 0x68, 0x69, 0x67, 0x68, 0x5f, 0x72, 0x6f, 0x6f, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c,
|
||||
0x52, 0x08, 0x68, 0x69, 0x67, 0x68, 0x52, 0x6f, 0x6f, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x6c, 0x6f,
|
||||
0x77, 0x5f, 0x73, 0x6c, 0x6f, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x6c, 0x6f,
|
||||
0x77, 0x53, 0x6c, 0x6f, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x6c, 0x6f, 0x77, 0x5f, 0x72, 0x6f, 0x6f,
|
||||
0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x6c, 0x6f, 0x77, 0x52, 0x6f, 0x6f, 0x74,
|
||||
0x12, 0x1f, 0x0a, 0x0b, 0x6f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x5f, 0x73, 0x6c, 0x6f, 0x74, 0x18,
|
||||
0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x6f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x53, 0x6c, 0x6f,
|
||||
0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x6f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x5f, 0x72, 0x6f, 0x6f, 0x74,
|
||||
0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0a, 0x6f, 0x72, 0x69, 0x67, 0x69, 0x6e, 0x52, 0x6f,
|
||||
0x6f, 0x74, 0x42, 0x35, 0x5a, 0x33, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d,
|
||||
0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, 0x61, 0x74, 0x69, 0x63, 0x6c, 0x61, 0x62, 0x73, 0x2f, 0x70,
|
||||
0x72, 0x79, 0x73, 0x6d, 0x2f, 0x76, 0x34, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x64, 0x62,
|
||||
0x76, 0x61, 0x6c, 0x3b, 0x64, 0x62, 0x76, 0x61, 0x6c, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
|
||||
0x33,
|
||||
}
|
||||
|
||||
var (
|
||||
file_proto_dbval_dbval_proto_rawDescOnce sync.Once
|
||||
file_proto_dbval_dbval_proto_rawDescData = file_proto_dbval_dbval_proto_rawDesc
|
||||
)
|
||||
|
||||
func file_proto_dbval_dbval_proto_rawDescGZIP() []byte {
|
||||
file_proto_dbval_dbval_proto_rawDescOnce.Do(func() {
|
||||
file_proto_dbval_dbval_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_dbval_dbval_proto_rawDescData)
|
||||
})
|
||||
return file_proto_dbval_dbval_proto_rawDescData
|
||||
}
|
||||
|
||||
var file_proto_dbval_dbval_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
|
||||
var file_proto_dbval_dbval_proto_goTypes = []interface{}{
|
||||
(*BackfillStatus)(nil), // 0: ethereum.eth.dbval.BackfillStatus
|
||||
}
|
||||
var file_proto_dbval_dbval_proto_depIdxs = []int32{
|
||||
0, // [0:0] is the sub-list for method output_type
|
||||
0, // [0:0] is the sub-list for method input_type
|
||||
0, // [0:0] is the sub-list for extension type_name
|
||||
0, // [0:0] is the sub-list for extension extendee
|
||||
0, // [0:0] is the sub-list for field type_name
|
||||
}
|
||||
|
||||
func init() { file_proto_dbval_dbval_proto_init() }
|
||||
func file_proto_dbval_dbval_proto_init() {
|
||||
if File_proto_dbval_dbval_proto != nil {
|
||||
return
|
||||
}
|
||||
if !protoimpl.UnsafeEnabled {
|
||||
file_proto_dbval_dbval_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
|
||||
switch v := v.(*BackfillStatus); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
case 1:
|
||||
return &v.sizeCache
|
||||
case 2:
|
||||
return &v.unknownFields
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
type x struct{}
|
||||
out := protoimpl.TypeBuilder{
|
||||
File: protoimpl.DescBuilder{
|
||||
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
|
||||
RawDescriptor: file_proto_dbval_dbval_proto_rawDesc,
|
||||
NumEnums: 0,
|
||||
NumMessages: 1,
|
||||
NumExtensions: 0,
|
||||
NumServices: 0,
|
||||
},
|
||||
GoTypes: file_proto_dbval_dbval_proto_goTypes,
|
||||
DependencyIndexes: file_proto_dbval_dbval_proto_depIdxs,
|
||||
MessageInfos: file_proto_dbval_dbval_proto_msgTypes,
|
||||
}.Build()
|
||||
File_proto_dbval_dbval_proto = out.File
|
||||
file_proto_dbval_dbval_proto_rawDesc = nil
|
||||
file_proto_dbval_dbval_proto_goTypes = nil
|
||||
file_proto_dbval_dbval_proto_depIdxs = nil
|
||||
}
|
||||
14
proto/dbval/dbval.proto
Normal file
14
proto/dbval/dbval.proto
Normal file
@@ -0,0 +1,14 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package ethereum.eth.dbval;
|
||||
|
||||
option go_package = "github.com/prysmaticlabs/prysm/v4/proto/dbval;dbval";
|
||||
|
||||
message BackfillStatus {
|
||||
uint64 high_slot = 1;
|
||||
bytes high_root = 2;
|
||||
uint64 low_slot = 3;
|
||||
bytes low_root = 4;
|
||||
uint64 origin_slot = 5;
|
||||
bytes origin_root = 6;
|
||||
}
|
||||
Reference in New Issue
Block a user