mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 05:47:59 -05:00
Recover skipped archived point (#5950)
This commit is contained in:
@@ -30,6 +30,7 @@ go_library(
|
||||
"//shared/params:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
|
||||
"@com_github_prysmaticlabs_go_ssz//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
"@io_opencensus_go//trace:go_default_library",
|
||||
],
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
|
||||
"github.com/prysmaticlabs/go-ssz"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
"github.com/sirupsen/logrus"
|
||||
@@ -55,6 +56,15 @@ func (s *State) MigrateToCold(ctx context.Context, finalizedSlot uint64, finaliz
|
||||
nextArchivedPointSlot := (lastArchivedIndex + 1) * s.slotsPerArchivedPoint
|
||||
// Only migrate if current slot is equal to or greater than next archived point slot.
|
||||
if stateSummary.Slot >= nextArchivedPointSlot {
|
||||
// If was a skipped archival index. The node should recover previous last archived index and state.
|
||||
if skippedArchivedPoint(archivedPointIndex, lastArchivedIndex) {
|
||||
recoveredIndex, err := s.recoverArchivedPoint(ctx, archivedPointIndex)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
lastArchivedIndex = recoveredIndex
|
||||
}
|
||||
|
||||
if !s.beaconDB.HasState(ctx, r) {
|
||||
continue
|
||||
}
|
||||
@@ -99,3 +109,48 @@ func (s *State) MigrateToCold(ctx context.Context, finalizedSlot uint64, finaliz
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// This recovers the last archived point. By passing in the current archived point, this recomputes
|
||||
// the state of last skipped archived point and save the missing state, archived point root, archived index to the DB.
|
||||
func (s *State) recoverArchivedPoint(ctx context.Context, currentArchivedPoint uint64) (uint64, error) {
|
||||
missingIndex := currentArchivedPoint - 1
|
||||
missingIndexSlot := missingIndex * s.slotsPerArchivedPoint
|
||||
blks, err := s.beaconDB.HighestSlotBlocksBelow(ctx, missingIndexSlot)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if len(blks) != 1 {
|
||||
return 0, errUnknownBlock
|
||||
}
|
||||
missingRoot, err := ssz.HashTreeRoot(blks[0].Block)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
missingState, err := s.StateByRoot(ctx, missingRoot)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
if err := s.beaconDB.SaveState(ctx, missingState, missingRoot); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if err := s.beaconDB.SaveArchivedPointRoot(ctx, missingRoot, missingIndex); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if err := s.beaconDB.SaveLastArchivedIndex(ctx, missingIndex); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
log.WithFields(logrus.Fields{
|
||||
"slot": blks[0].Block.Slot,
|
||||
"archiveIndex": missingIndex,
|
||||
"root": hex.EncodeToString(bytesutil.Trunc(missingRoot[:])),
|
||||
}).Info("Saved recovered archived point during state migration")
|
||||
|
||||
return missingIndex, nil
|
||||
}
|
||||
|
||||
// This returns true if the last archived point was skipped.
|
||||
func skippedArchivedPoint(currentArchivedPoint uint64, lastArchivedPoint uint64) bool {
|
||||
return currentArchivedPoint-lastArchivedPoint > 1
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"testing"
|
||||
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/go-ssz"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
|
||||
testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
|
||||
@@ -173,3 +174,77 @@ func TestMigrateToCold_CantDeleteCurrentArchivedIndex(t *testing.T) {
|
||||
t.Error("State should not be deleted")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSkippedArchivedPoint_CanRecover(t *testing.T) {
|
||||
db := testDB.SetupDB(t)
|
||||
ctx := context.Background()
|
||||
service := New(db, cache.NewStateSummaryCache())
|
||||
service.slotsPerArchivedPoint = 32
|
||||
|
||||
b := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: 31}}
|
||||
if err := service.beaconDB.SaveBlock(ctx, b); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
r, err := ssz.HashTreeRoot(b.Block)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
beaconState, _ := testutil.DeterministicGenesisState(t, 32)
|
||||
if err := beaconState.SetSlot(31); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := service.beaconDB.SaveState(ctx, beaconState, r); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
currentArchivedPoint := uint64(2)
|
||||
lastPoint, err := service.recoverArchivedPoint(ctx, currentArchivedPoint)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if lastPoint != currentArchivedPoint-1 {
|
||||
t.Error("Did not get wanted point")
|
||||
}
|
||||
if !service.beaconDB.HasArchivedPoint(ctx, lastPoint) {
|
||||
t.Error("Did not save archived point index")
|
||||
}
|
||||
if service.beaconDB.ArchivedPointRoot(ctx, lastPoint) != r {
|
||||
t.Error("Did not get wanted archived index root")
|
||||
}
|
||||
}
|
||||
|
||||
func TestSkippedArchivedPoint(t *testing.T) {
|
||||
tests := []struct {
|
||||
a uint64
|
||||
b uint64
|
||||
c bool
|
||||
}{
|
||||
{
|
||||
a: 0,
|
||||
b: 1,
|
||||
c: false,
|
||||
},
|
||||
{
|
||||
a: 1,
|
||||
b: 1,
|
||||
c: false,
|
||||
},
|
||||
{
|
||||
a: 1,
|
||||
b: 2,
|
||||
c: false,
|
||||
},
|
||||
{
|
||||
a: 1,
|
||||
b: 3,
|
||||
c: true,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
if tt.c != skippedArchivedPoint(tt.b, tt.a) {
|
||||
t.Fatalf("skippedArchivedPoint(%d, %d) = %v, wanted: %v", tt.b, tt.a, skippedArchivedPoint(tt.b, tt.a), tt.c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -189,7 +189,7 @@ var (
|
||||
// GrpcMaxCallRecvMsgSizeFlag defines the max call message size for GRPC
|
||||
GrpcMaxCallRecvMsgSizeFlag = &cli.IntFlag{
|
||||
Name: "grpc-max-msg-size",
|
||||
Usage: "Integer to define max recieve message call size (default: 4194304 (for 40MB))",
|
||||
Usage: "Integer to define max recieve message call size (default: 4194304 (for 4MB))",
|
||||
Value: 1 << 22,
|
||||
}
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user