mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-05-02 03:02:54 -04:00
Archived point retrieval and recovery (#5075)
This commit is contained in:
@@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"cold.go",
|
||||
"epoch_boundary_root.go",
|
||||
"errors.go",
|
||||
"hot.go",
|
||||
@@ -33,6 +34,7 @@ go_library(
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = [
|
||||
"cold_test.go",
|
||||
"epoch_boundary_root_test.go",
|
||||
"hot_test.go",
|
||||
"replay_test.go",
|
||||
|
||||
53
beacon-chain/state/stategen/cold.go
Normal file
53
beacon-chain/state/stategen/cold.go
Normal file
@@ -0,0 +1,53 @@
|
||||
package stategen
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/state"
|
||||
"go.opencensus.io/trace"
|
||||
)
|
||||
|
||||
// Given the archive index, this returns the archived cold state in the DB.
|
||||
// If the archived state does not exist in the state, it'll compute it and save it.
|
||||
func (s *State) archivedPointByIndex(ctx context.Context, archiveIndex uint64) (*state.BeaconState, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "stateGen.loadArchivedPointByIndex")
|
||||
defer span.End()
|
||||
if s.beaconDB.HasArchivedPoint(ctx, archiveIndex) {
|
||||
return s.beaconDB.ArchivedPointState(ctx, archiveIndex)
|
||||
}
|
||||
|
||||
// If for certain reasons, archived point does not exist in DB,
|
||||
// a node should regenerate it and save it.
|
||||
return s.recoverArchivedPointByIndex(ctx, archiveIndex)
|
||||
}
|
||||
|
||||
// This recovers an archived point by index. For certain reasons (ex. user toggles feature flag),
|
||||
// an archived point may not be present in the DB. This regenerates the archived point state via
|
||||
// playback and saves the archived root/state to the DB.
|
||||
func (s *State) recoverArchivedPointByIndex(ctx context.Context, archiveIndex uint64) (*state.BeaconState, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "stateGen.recoverArchivedPointByIndex")
|
||||
defer span.End()
|
||||
|
||||
archivedSlot := archiveIndex * s.slotsPerArchivedPoint
|
||||
archivedState, err := s.ComputeStateUpToSlot(ctx, archivedSlot)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not compute state up to archived index slot")
|
||||
}
|
||||
if archivedState == nil {
|
||||
return nil, errUnknownArchivedState
|
||||
}
|
||||
lastRoot, _, err := s.lastSavedBlock(ctx, archivedSlot)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "could not get last valid block up to archived index slot")
|
||||
}
|
||||
|
||||
if err := s.beaconDB.SaveArchivedPointRoot(ctx, lastRoot, archiveIndex); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := s.beaconDB.SaveArchivedPointState(ctx, archivedState, archiveIndex); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return archivedState, nil
|
||||
}
|
||||
112
beacon-chain/state/stategen/cold_test.go
Normal file
112
beacon-chain/state/stategen/cold_test.go
Normal file
@@ -0,0 +1,112 @@
|
||||
package stategen
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/go-ssz"
|
||||
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
|
||||
"github.com/prysmaticlabs/prysm/shared/testutil"
|
||||
)
|
||||
|
||||
func TestArchivedPointByIndex_HasPoint(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
db := testDB.SetupDB(t)
|
||||
defer testDB.TeardownDB(t, db)
|
||||
|
||||
service := New(db)
|
||||
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
|
||||
index := uint64(999)
|
||||
if err := service.beaconDB.SaveArchivedPointState(ctx, beaconState, index); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := service.beaconDB.SaveArchivedPointRoot(ctx, [32]byte{'A'}, index); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
savedArchivedState, err := service.archivedPointByIndex(ctx, index)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !proto.Equal(beaconState.InnerStateUnsafe(), savedArchivedState.InnerStateUnsafe()) {
|
||||
t.Error("Diff saved state")
|
||||
}
|
||||
}
|
||||
|
||||
func TestArchivedPointByIndex_DoesntHavePoint(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
db := testDB.SetupDB(t)
|
||||
defer testDB.TeardownDB(t, db)
|
||||
|
||||
service := New(db)
|
||||
|
||||
gBlk := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{}}
|
||||
gRoot, err := ssz.HashTreeRoot(gBlk.Block)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := service.beaconDB.SaveBlock(ctx, gBlk); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
|
||||
if err := service.beaconDB.SaveState(ctx, beaconState, gRoot); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
service.slotsPerArchivedPoint = 32
|
||||
recoveredState, err := service.archivedPointByIndex(ctx, 2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if recoveredState.Slot() != service.slotsPerArchivedPoint*2 {
|
||||
t.Error("Diff state slot")
|
||||
}
|
||||
savedArchivedState, err := service.beaconDB.ArchivedPointState(ctx, 2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !proto.Equal(recoveredState.InnerStateUnsafe(), savedArchivedState.InnerStateUnsafe()) {
|
||||
t.Error("Diff saved archived state")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRecoverArchivedPointByIndex_CanRecover(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
db := testDB.SetupDB(t)
|
||||
defer testDB.TeardownDB(t, db)
|
||||
|
||||
service := New(db)
|
||||
|
||||
gBlk := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{}}
|
||||
gRoot, err := ssz.HashTreeRoot(gBlk.Block)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := service.beaconDB.SaveBlock(ctx, gBlk); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
|
||||
if err := service.beaconDB.SaveState(ctx, beaconState, gRoot); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
service.slotsPerArchivedPoint = 32
|
||||
recoveredState, err := service.recoverArchivedPointByIndex(ctx, 1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if recoveredState.Slot() != service.slotsPerArchivedPoint {
|
||||
t.Error("Diff state slot")
|
||||
}
|
||||
savedArchivedState, err := service.beaconDB.ArchivedPointState(ctx, 1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !proto.Equal(recoveredState.InnerStateUnsafe(), savedArchivedState.InnerStateUnsafe()) {
|
||||
t.Error("Diff saved state")
|
||||
}
|
||||
}
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
type State struct {
|
||||
beaconDB db.NoHeadAccessDatabase
|
||||
lastArchivedSlot uint64
|
||||
slotsPerArchivedPoint uint64
|
||||
epochBoundarySlotToRoot map[uint64][32]byte
|
||||
epochBoundaryLock sync.RWMutex
|
||||
hotStateCache *cache.HotStateCache
|
||||
|
||||
@@ -44,8 +44,8 @@ func TestService_ReceiveAttestations(t *testing.T) {
|
||||
client := mock.NewMockBeaconChainClient(ctrl)
|
||||
|
||||
bs := Service{
|
||||
beaconClient: client,
|
||||
blockFeed: new(event.Feed),
|
||||
beaconClient: client,
|
||||
blockFeed: new(event.Feed),
|
||||
receivedAttestationsBuffer: make(chan *ethpb.IndexedAttestation, 1),
|
||||
collectedAttestationsBuffer: make(chan []*ethpb.IndexedAttestation, 1),
|
||||
}
|
||||
@@ -70,17 +70,16 @@ func TestService_ReceiveAttestations(t *testing.T) {
|
||||
bs.receiveAttestations(ctx)
|
||||
}
|
||||
|
||||
|
||||
func TestService_ReceiveAttestations_Batched(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
client := mock.NewMockBeaconChainClient(ctrl)
|
||||
|
||||
bs := Service{
|
||||
beaconClient: client,
|
||||
blockFeed: new(event.Feed),
|
||||
slasherDB: testDB.SetupSlasherDB(t, false),
|
||||
attestationFeed: new(event.Feed),
|
||||
beaconClient: client,
|
||||
blockFeed: new(event.Feed),
|
||||
slasherDB: testDB.SetupSlasherDB(t, false),
|
||||
attestationFeed: new(event.Feed),
|
||||
receivedAttestationsBuffer: make(chan *ethpb.IndexedAttestation, 1),
|
||||
collectedAttestationsBuffer: make(chan []*ethpb.IndexedAttestation, 1),
|
||||
}
|
||||
@@ -104,7 +103,7 @@ func TestService_ReceiveAttestations_Batched(t *testing.T) {
|
||||
att,
|
||||
nil,
|
||||
).Do(func() {
|
||||
time.Sleep(2*time.Second)
|
||||
time.Sleep(2 * time.Second)
|
||||
cancel()
|
||||
})
|
||||
|
||||
@@ -114,7 +113,7 @@ func TestService_ReceiveAttestations_Batched(t *testing.T) {
|
||||
bs.receivedAttestationsBuffer <- att
|
||||
att.Data.Target.Epoch = 8
|
||||
bs.receivedAttestationsBuffer <- att
|
||||
atts := <- bs.collectedAttestationsBuffer
|
||||
atts := <-bs.collectedAttestationsBuffer
|
||||
if len(atts) != 3 {
|
||||
t.Fatalf("Expected %d received attestations to be batched", len(atts))
|
||||
}
|
||||
|
||||
@@ -39,21 +39,21 @@ type ChainFetcher interface {
|
||||
|
||||
// Service struct for the beaconclient service of the slasher.
|
||||
type Service struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
cert string
|
||||
conn *grpc.ClientConn
|
||||
provider string
|
||||
beaconClient ethpb.BeaconChainClient
|
||||
slasherDB db.Database
|
||||
nodeClient ethpb.NodeClient
|
||||
clientFeed *event.Feed
|
||||
blockFeed *event.Feed
|
||||
attestationFeed *event.Feed
|
||||
proposerSlashingsChan chan *ethpb.ProposerSlashing
|
||||
attesterSlashingsChan chan *ethpb.AttesterSlashing
|
||||
attesterSlashingsFeed *event.Feed
|
||||
proposerSlashingsFeed *event.Feed
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
cert string
|
||||
conn *grpc.ClientConn
|
||||
provider string
|
||||
beaconClient ethpb.BeaconChainClient
|
||||
slasherDB db.Database
|
||||
nodeClient ethpb.NodeClient
|
||||
clientFeed *event.Feed
|
||||
blockFeed *event.Feed
|
||||
attestationFeed *event.Feed
|
||||
proposerSlashingsChan chan *ethpb.ProposerSlashing
|
||||
attesterSlashingsChan chan *ethpb.AttesterSlashing
|
||||
attesterSlashingsFeed *event.Feed
|
||||
proposerSlashingsFeed *event.Feed
|
||||
receivedAttestationsBuffer chan *ethpb.IndexedAttestation
|
||||
collectedAttestationsBuffer chan []*ethpb.IndexedAttestation
|
||||
}
|
||||
@@ -71,18 +71,18 @@ type Config struct {
|
||||
func NewBeaconClientService(ctx context.Context, cfg *Config) *Service {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
return &Service{
|
||||
cert: cfg.BeaconCert,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
provider: cfg.BeaconProvider,
|
||||
blockFeed: new(event.Feed),
|
||||
clientFeed: new(event.Feed),
|
||||
attestationFeed: new(event.Feed),
|
||||
slasherDB: cfg.SlasherDB,
|
||||
proposerSlashingsChan: make(chan *ethpb.ProposerSlashing, 1),
|
||||
attesterSlashingsChan: make(chan *ethpb.AttesterSlashing, 1),
|
||||
attesterSlashingsFeed: cfg.AttesterSlashingsFeed,
|
||||
proposerSlashingsFeed: cfg.ProposerSlashingsFeed,
|
||||
cert: cfg.BeaconCert,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
provider: cfg.BeaconProvider,
|
||||
blockFeed: new(event.Feed),
|
||||
clientFeed: new(event.Feed),
|
||||
attestationFeed: new(event.Feed),
|
||||
slasherDB: cfg.SlasherDB,
|
||||
proposerSlashingsChan: make(chan *ethpb.ProposerSlashing, 1),
|
||||
attesterSlashingsChan: make(chan *ethpb.AttesterSlashing, 1),
|
||||
attesterSlashingsFeed: cfg.AttesterSlashingsFeed,
|
||||
proposerSlashingsFeed: cfg.ProposerSlashingsFeed,
|
||||
receivedAttestationsBuffer: make(chan *ethpb.IndexedAttestation, 1),
|
||||
collectedAttestationsBuffer: make(chan []*ethpb.IndexedAttestation, 1),
|
||||
}
|
||||
|
||||
@@ -245,7 +245,7 @@ func (s *SpanDetector) updateMaxSpan(ctx context.Context, att *ethpb.IndexedAtte
|
||||
defer traceSpan.End()
|
||||
source := att.Data.Source.Epoch
|
||||
target := att.Data.Target.Epoch
|
||||
latestMaxSpanDistanceObserved.Set(float64(target-source))
|
||||
latestMaxSpanDistanceObserved.Set(float64(target - source))
|
||||
valIndices := make([]uint64, len(att.AttestingIndices))
|
||||
copy(valIndices, att.AttestingIndices)
|
||||
for epoch := source + 1; epoch < target; epoch++ {
|
||||
|
||||
Reference in New Issue
Block a user