From a90ffaba49746648c43de3ac4319e7180bd69c4e Mon Sep 17 00:00:00 2001 From: terence tsao Date: Thu, 12 Mar 2020 01:38:30 +0100 Subject: [PATCH] Archived point retrieval and recovery (#5075) --- beacon-chain/state/stategen/BUILD.bazel | 2 + beacon-chain/state/stategen/cold.go | 53 ++++++++++ beacon-chain/state/stategen/cold_test.go | 112 ++++++++++++++++++++++ beacon-chain/state/stategen/service.go | 1 + slasher/beaconclient/receivers_test.go | 17 ++-- slasher/beaconclient/service.go | 54 +++++------ slasher/detection/attestations/spanner.go | 2 +- 7 files changed, 204 insertions(+), 37 deletions(-) create mode 100644 beacon-chain/state/stategen/cold.go create mode 100644 beacon-chain/state/stategen/cold_test.go diff --git a/beacon-chain/state/stategen/BUILD.bazel b/beacon-chain/state/stategen/BUILD.bazel index ae1537be0d..28d49513f6 100644 --- a/beacon-chain/state/stategen/BUILD.bazel +++ b/beacon-chain/state/stategen/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ + "cold.go", "epoch_boundary_root.go", "errors.go", "hot.go", @@ -33,6 +34,7 @@ go_library( go_test( name = "go_default_test", srcs = [ + "cold_test.go", "epoch_boundary_root_test.go", "hot_test.go", "replay_test.go", diff --git a/beacon-chain/state/stategen/cold.go b/beacon-chain/state/stategen/cold.go new file mode 100644 index 0000000000..ac5bc27e72 --- /dev/null +++ b/beacon-chain/state/stategen/cold.go @@ -0,0 +1,53 @@ +package stategen + +import ( + "context" + + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/beacon-chain/state" + "go.opencensus.io/trace" +) + +// Given the archive index, this returns the archived cold state in the DB. +// If the archived state does not exist in the state, it'll compute it and save it. +func (s *State) archivedPointByIndex(ctx context.Context, archiveIndex uint64) (*state.BeaconState, error) { + ctx, span := trace.StartSpan(ctx, "stateGen.loadArchivedPointByIndex") + defer span.End() + if s.beaconDB.HasArchivedPoint(ctx, archiveIndex) { + return s.beaconDB.ArchivedPointState(ctx, archiveIndex) + } + + // If for certain reasons, archived point does not exist in DB, + // a node should regenerate it and save it. + return s.recoverArchivedPointByIndex(ctx, archiveIndex) +} + +// This recovers an archived point by index. For certain reasons (ex. user toggles feature flag), +// an archived point may not be present in the DB. This regenerates the archived point state via +// playback and saves the archived root/state to the DB. +func (s *State) recoverArchivedPointByIndex(ctx context.Context, archiveIndex uint64) (*state.BeaconState, error) { + ctx, span := trace.StartSpan(ctx, "stateGen.recoverArchivedPointByIndex") + defer span.End() + + archivedSlot := archiveIndex * s.slotsPerArchivedPoint + archivedState, err := s.ComputeStateUpToSlot(ctx, archivedSlot) + if err != nil { + return nil, errors.Wrap(err, "could not compute state up to archived index slot") + } + if archivedState == nil { + return nil, errUnknownArchivedState + } + lastRoot, _, err := s.lastSavedBlock(ctx, archivedSlot) + if err != nil { + return nil, errors.Wrap(err, "could not get last valid block up to archived index slot") + } + + if err := s.beaconDB.SaveArchivedPointRoot(ctx, lastRoot, archiveIndex); err != nil { + return nil, err + } + if err := s.beaconDB.SaveArchivedPointState(ctx, archivedState, archiveIndex); err != nil { + return nil, err + } + + return archivedState, nil +} diff --git a/beacon-chain/state/stategen/cold_test.go b/beacon-chain/state/stategen/cold_test.go new file mode 100644 index 0000000000..79476267cd --- /dev/null +++ b/beacon-chain/state/stategen/cold_test.go @@ -0,0 +1,112 @@ +package stategen + +import ( + "context" + "testing" + + "github.com/gogo/protobuf/proto" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-ssz" + testDB "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" + "github.com/prysmaticlabs/prysm/shared/testutil" +) + +func TestArchivedPointByIndex_HasPoint(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + + service := New(db) + beaconState, _ := testutil.DeterministicGenesisState(t, 32) + index := uint64(999) + if err := service.beaconDB.SaveArchivedPointState(ctx, beaconState, index); err != nil { + t.Fatal(err) + } + if err := service.beaconDB.SaveArchivedPointRoot(ctx, [32]byte{'A'}, index); err != nil { + t.Fatal(err) + } + + savedArchivedState, err := service.archivedPointByIndex(ctx, index) + if err != nil { + t.Fatal(err) + } + if !proto.Equal(beaconState.InnerStateUnsafe(), savedArchivedState.InnerStateUnsafe()) { + t.Error("Diff saved state") + } +} + +func TestArchivedPointByIndex_DoesntHavePoint(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + + service := New(db) + + gBlk := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{}} + gRoot, err := ssz.HashTreeRoot(gBlk.Block) + if err != nil { + t.Fatal(err) + } + if err := service.beaconDB.SaveBlock(ctx, gBlk); err != nil { + t.Fatal(err) + } + beaconState, _ := testutil.DeterministicGenesisState(t, 32) + if err := service.beaconDB.SaveState(ctx, beaconState, gRoot); err != nil { + t.Fatal(err) + } + + service.slotsPerArchivedPoint = 32 + recoveredState, err := service.archivedPointByIndex(ctx, 2) + if err != nil { + t.Fatal(err) + } + + if recoveredState.Slot() != service.slotsPerArchivedPoint*2 { + t.Error("Diff state slot") + } + savedArchivedState, err := service.beaconDB.ArchivedPointState(ctx, 2) + if err != nil { + t.Fatal(err) + } + if !proto.Equal(recoveredState.InnerStateUnsafe(), savedArchivedState.InnerStateUnsafe()) { + t.Error("Diff saved archived state") + } +} + +func TestRecoverArchivedPointByIndex_CanRecover(t *testing.T) { + ctx := context.Background() + db := testDB.SetupDB(t) + defer testDB.TeardownDB(t, db) + + service := New(db) + + gBlk := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{}} + gRoot, err := ssz.HashTreeRoot(gBlk.Block) + if err != nil { + t.Fatal(err) + } + if err := service.beaconDB.SaveBlock(ctx, gBlk); err != nil { + t.Fatal(err) + } + beaconState, _ := testutil.DeterministicGenesisState(t, 32) + if err := service.beaconDB.SaveState(ctx, beaconState, gRoot); err != nil { + t.Fatal(err) + } + + service.slotsPerArchivedPoint = 32 + recoveredState, err := service.recoverArchivedPointByIndex(ctx, 1) + if err != nil { + t.Fatal(err) + } + + if recoveredState.Slot() != service.slotsPerArchivedPoint { + t.Error("Diff state slot") + } + savedArchivedState, err := service.beaconDB.ArchivedPointState(ctx, 1) + if err != nil { + t.Fatal(err) + } + if !proto.Equal(recoveredState.InnerStateUnsafe(), savedArchivedState.InnerStateUnsafe()) { + t.Error("Diff saved state") + } +} diff --git a/beacon-chain/state/stategen/service.go b/beacon-chain/state/stategen/service.go index a7887934b7..3a12e5c2f7 100644 --- a/beacon-chain/state/stategen/service.go +++ b/beacon-chain/state/stategen/service.go @@ -13,6 +13,7 @@ import ( type State struct { beaconDB db.NoHeadAccessDatabase lastArchivedSlot uint64 + slotsPerArchivedPoint uint64 epochBoundarySlotToRoot map[uint64][32]byte epochBoundaryLock sync.RWMutex hotStateCache *cache.HotStateCache diff --git a/slasher/beaconclient/receivers_test.go b/slasher/beaconclient/receivers_test.go index 646aa06b80..f2fa07e333 100644 --- a/slasher/beaconclient/receivers_test.go +++ b/slasher/beaconclient/receivers_test.go @@ -44,8 +44,8 @@ func TestService_ReceiveAttestations(t *testing.T) { client := mock.NewMockBeaconChainClient(ctrl) bs := Service{ - beaconClient: client, - blockFeed: new(event.Feed), + beaconClient: client, + blockFeed: new(event.Feed), receivedAttestationsBuffer: make(chan *ethpb.IndexedAttestation, 1), collectedAttestationsBuffer: make(chan []*ethpb.IndexedAttestation, 1), } @@ -70,17 +70,16 @@ func TestService_ReceiveAttestations(t *testing.T) { bs.receiveAttestations(ctx) } - func TestService_ReceiveAttestations_Batched(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() client := mock.NewMockBeaconChainClient(ctrl) bs := Service{ - beaconClient: client, - blockFeed: new(event.Feed), - slasherDB: testDB.SetupSlasherDB(t, false), - attestationFeed: new(event.Feed), + beaconClient: client, + blockFeed: new(event.Feed), + slasherDB: testDB.SetupSlasherDB(t, false), + attestationFeed: new(event.Feed), receivedAttestationsBuffer: make(chan *ethpb.IndexedAttestation, 1), collectedAttestationsBuffer: make(chan []*ethpb.IndexedAttestation, 1), } @@ -104,7 +103,7 @@ func TestService_ReceiveAttestations_Batched(t *testing.T) { att, nil, ).Do(func() { - time.Sleep(2*time.Second) + time.Sleep(2 * time.Second) cancel() }) @@ -114,7 +113,7 @@ func TestService_ReceiveAttestations_Batched(t *testing.T) { bs.receivedAttestationsBuffer <- att att.Data.Target.Epoch = 8 bs.receivedAttestationsBuffer <- att - atts := <- bs.collectedAttestationsBuffer + atts := <-bs.collectedAttestationsBuffer if len(atts) != 3 { t.Fatalf("Expected %d received attestations to be batched", len(atts)) } diff --git a/slasher/beaconclient/service.go b/slasher/beaconclient/service.go index 7e0ccc1533..c38f81bbfe 100644 --- a/slasher/beaconclient/service.go +++ b/slasher/beaconclient/service.go @@ -39,21 +39,21 @@ type ChainFetcher interface { // Service struct for the beaconclient service of the slasher. type Service struct { - ctx context.Context - cancel context.CancelFunc - cert string - conn *grpc.ClientConn - provider string - beaconClient ethpb.BeaconChainClient - slasherDB db.Database - nodeClient ethpb.NodeClient - clientFeed *event.Feed - blockFeed *event.Feed - attestationFeed *event.Feed - proposerSlashingsChan chan *ethpb.ProposerSlashing - attesterSlashingsChan chan *ethpb.AttesterSlashing - attesterSlashingsFeed *event.Feed - proposerSlashingsFeed *event.Feed + ctx context.Context + cancel context.CancelFunc + cert string + conn *grpc.ClientConn + provider string + beaconClient ethpb.BeaconChainClient + slasherDB db.Database + nodeClient ethpb.NodeClient + clientFeed *event.Feed + blockFeed *event.Feed + attestationFeed *event.Feed + proposerSlashingsChan chan *ethpb.ProposerSlashing + attesterSlashingsChan chan *ethpb.AttesterSlashing + attesterSlashingsFeed *event.Feed + proposerSlashingsFeed *event.Feed receivedAttestationsBuffer chan *ethpb.IndexedAttestation collectedAttestationsBuffer chan []*ethpb.IndexedAttestation } @@ -71,18 +71,18 @@ type Config struct { func NewBeaconClientService(ctx context.Context, cfg *Config) *Service { ctx, cancel := context.WithCancel(ctx) return &Service{ - cert: cfg.BeaconCert, - ctx: ctx, - cancel: cancel, - provider: cfg.BeaconProvider, - blockFeed: new(event.Feed), - clientFeed: new(event.Feed), - attestationFeed: new(event.Feed), - slasherDB: cfg.SlasherDB, - proposerSlashingsChan: make(chan *ethpb.ProposerSlashing, 1), - attesterSlashingsChan: make(chan *ethpb.AttesterSlashing, 1), - attesterSlashingsFeed: cfg.AttesterSlashingsFeed, - proposerSlashingsFeed: cfg.ProposerSlashingsFeed, + cert: cfg.BeaconCert, + ctx: ctx, + cancel: cancel, + provider: cfg.BeaconProvider, + blockFeed: new(event.Feed), + clientFeed: new(event.Feed), + attestationFeed: new(event.Feed), + slasherDB: cfg.SlasherDB, + proposerSlashingsChan: make(chan *ethpb.ProposerSlashing, 1), + attesterSlashingsChan: make(chan *ethpb.AttesterSlashing, 1), + attesterSlashingsFeed: cfg.AttesterSlashingsFeed, + proposerSlashingsFeed: cfg.ProposerSlashingsFeed, receivedAttestationsBuffer: make(chan *ethpb.IndexedAttestation, 1), collectedAttestationsBuffer: make(chan []*ethpb.IndexedAttestation, 1), } diff --git a/slasher/detection/attestations/spanner.go b/slasher/detection/attestations/spanner.go index dee89c0df4..f97bfa8214 100644 --- a/slasher/detection/attestations/spanner.go +++ b/slasher/detection/attestations/spanner.go @@ -245,7 +245,7 @@ func (s *SpanDetector) updateMaxSpan(ctx context.Context, att *ethpb.IndexedAtte defer traceSpan.End() source := att.Data.Source.Epoch target := att.Data.Target.Epoch - latestMaxSpanDistanceObserved.Set(float64(target-source)) + latestMaxSpanDistanceObserved.Set(float64(target - source)) valIndices := make([]uint64, len(att.AttestingIndices)) copy(valIndices, att.AttestingIndices) for epoch := source + 1; epoch < target; epoch++ {