mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 16:08:26 -05:00
Add HighestSlotState Getter for db (#5192)
This commit is contained in:
@@ -345,7 +345,7 @@ func (k *Store) HighestSlotBlock(ctx context.Context) (*ethpb.SignedBeaconBlock,
|
||||
// setBlockSlotBitField sets the block slot bit in DB.
|
||||
// This helps to track which slot has a saved block in db.
|
||||
func (k *Store) setBlockSlotBitField(ctx context.Context, tx *bolt.Tx, slot uint64) error {
|
||||
ctx, span := trace.StartSpan(ctx, "BeaconDB.updateSavedBlockSlot")
|
||||
ctx, span := trace.StartSpan(ctx, "BeaconDB.setBlockSlotBitField")
|
||||
defer span.End()
|
||||
|
||||
bucket := tx.Bucket(slotsHasObjectBucket)
|
||||
@@ -359,7 +359,7 @@ func (k *Store) setBlockSlotBitField(ctx context.Context, tx *bolt.Tx, slot uint
|
||||
// clearBlockSlotBitField clears the block slot bit in DB.
|
||||
// This helps to track which slot has a saved block in db.
|
||||
func (k *Store) clearBlockSlotBitField(ctx context.Context, tx *bolt.Tx, slot uint64) error {
|
||||
ctx, span := trace.StartSpan(ctx, "BeaconDB.updateSavedBlockSlot")
|
||||
ctx, span := trace.StartSpan(ctx, "BeaconDB.clearBlockSlotBitField")
|
||||
defer span.End()
|
||||
|
||||
bucket := tx.Bucket(slotsHasObjectBucket)
|
||||
|
||||
@@ -3,6 +3,7 @@ package kv
|
||||
import (
|
||||
"os"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/dgraph-io/ristretto"
|
||||
@@ -35,6 +36,7 @@ type Store struct {
|
||||
databasePath string
|
||||
blockCache *ristretto.Cache
|
||||
validatorIndexCache *ristretto.Cache
|
||||
stateSlotBitLock sync.Mutex
|
||||
}
|
||||
|
||||
// NewKVStore initializes a new boltDB key-value store at the directory
|
||||
|
||||
@@ -3,9 +3,11 @@ package kv
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"math"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/state"
|
||||
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
|
||||
"github.com/prysmaticlabs/prysm/shared/bytesutil"
|
||||
@@ -118,7 +120,10 @@ func (k *Store) SaveState(ctx context.Context, state *state.BeaconState, blockRo
|
||||
|
||||
return k.db.Update(func(tx *bolt.Tx) error {
|
||||
bucket := tx.Bucket(stateBucket)
|
||||
return bucket.Put(blockRoot[:], enc)
|
||||
if err := bucket.Put(blockRoot[:], enc); err != nil {
|
||||
return err
|
||||
}
|
||||
return k.setStateSlotBitField(ctx, tx, state.Slot())
|
||||
})
|
||||
}
|
||||
|
||||
@@ -141,6 +146,9 @@ func (k *Store) SaveStates(ctx context.Context, states []*state.BeaconState, blo
|
||||
return k.db.Update(func(tx *bolt.Tx) error {
|
||||
bucket := tx.Bucket(stateBucket)
|
||||
for i, rt := range blockRoots {
|
||||
if err := k.setStateSlotBitField(ctx, tx, states[i].Slot()); err != nil {
|
||||
return err
|
||||
}
|
||||
err = bucket.Put(rt[:], multipleEncs[i])
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -196,6 +204,14 @@ func (k *Store) DeleteState(ctx context.Context, blockRoot [32]byte) error {
|
||||
}
|
||||
}
|
||||
|
||||
slot, err := slotByBlockRoot(ctx, tx, blockRoot[:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := k.clearStateSlotBitField(ctx, tx, slot); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bkt = tx.Bucket(stateBucket)
|
||||
return bkt.Delete(blockRoot[:])
|
||||
})
|
||||
@@ -229,8 +245,8 @@ func (k *Store) DeleteStates(ctx context.Context, blockRoots [][32]byte) error {
|
||||
return err
|
||||
}
|
||||
|
||||
bkt = tx.Bucket(blocksBucket)
|
||||
headBlkRoot := bkt.Get(headBlockRootKey)
|
||||
blockBkt := tx.Bucket(blocksBucket)
|
||||
headBlkRoot := blockBkt.Get(headBlockRootKey)
|
||||
bkt = tx.Bucket(stateBucket)
|
||||
c := bkt.Cursor()
|
||||
|
||||
@@ -246,6 +262,15 @@ func (k *Store) DeleteStates(ctx context.Context, blockRoots [][32]byte) error {
|
||||
return errors.New("cannot delete genesis, finalized, or head state")
|
||||
}
|
||||
}
|
||||
|
||||
slot, err := slotByBlockRoot(ctx, tx, blockRoot)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := k.clearStateSlotBitField(ctx, tx, slot); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := c.Delete(); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -264,3 +289,136 @@ func createState(enc []byte) (*pb.BeaconState, error) {
|
||||
}
|
||||
return protoState, nil
|
||||
}
|
||||
|
||||
// slotByBlockRoot retrieves the corresponding slot of the input block root.
|
||||
func slotByBlockRoot(ctx context.Context, tx *bolt.Tx, blockRoot []byte) (uint64, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "BeaconDB.slotByBlockRoot")
|
||||
defer span.End()
|
||||
|
||||
if featureconfig.Get().NewStateMgmt {
|
||||
bkt := tx.Bucket(stateSummaryBucket)
|
||||
enc := bkt.Get(blockRoot)
|
||||
if enc == nil {
|
||||
return 0, errors.New("state summary enc can't be nil")
|
||||
}
|
||||
stateSummary := &pb.StateSummary{}
|
||||
if err := decode(enc, stateSummary); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return stateSummary.Slot, nil
|
||||
}
|
||||
|
||||
bkt := tx.Bucket(stateBucket)
|
||||
enc := bkt.Get(blockRoot)
|
||||
if enc == nil {
|
||||
return 0, errors.New("state enc can't be nil")
|
||||
}
|
||||
|
||||
s, err := createState(enc)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
if s == nil {
|
||||
return 0, errors.New("state can't be nil")
|
||||
}
|
||||
return s.Slot, nil
|
||||
}
|
||||
|
||||
// HighestSlotStates returns the states with the highest slot from the db.
|
||||
// Ideally there should just be one state per slot, but given validator
|
||||
// can double propose, a single slot could have multiple block roots and
|
||||
// reuslts states. This returns a list of states.
|
||||
func (k *Store) HighestSlotStates(ctx context.Context) ([]*state.BeaconState, error) {
|
||||
ctx, span := trace.StartSpan(ctx, "BeaconDB.HighestSlotState")
|
||||
defer span.End()
|
||||
var states []*state.BeaconState
|
||||
err := k.db.View(func(tx *bolt.Tx) error {
|
||||
slotBkt := tx.Bucket(slotsHasObjectBucket)
|
||||
savedSlots := slotBkt.Get(savedStateSlotsKey)
|
||||
highestIndex, err := bytesutil.HighestBitIndex(savedSlots)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
highestSlot := highestIndex - 1
|
||||
highestSlot = int(math.Max(0, float64(highestSlot)))
|
||||
f := filters.NewFilter().SetStartSlot(uint64(highestSlot)).SetEndSlot(uint64(highestSlot))
|
||||
|
||||
keys, err := getBlockRootsByFilter(ctx, tx, f)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(keys) == 0 {
|
||||
return errors.New("could not get one block root to get state")
|
||||
}
|
||||
|
||||
stateBkt := tx.Bucket(stateBucket)
|
||||
for i := range keys {
|
||||
enc := stateBkt.Get(keys[i][:])
|
||||
if enc == nil {
|
||||
continue
|
||||
}
|
||||
pbState, err := createState(enc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s, err := state.InitializeFromProtoUnsafe(pbState)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
states = append(states, s)
|
||||
}
|
||||
|
||||
return err
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(states) == 0 {
|
||||
return nil, errors.New("could not get one state")
|
||||
}
|
||||
|
||||
return states, nil
|
||||
}
|
||||
|
||||
// setStateSlotBitField sets the state slot bit in DB.
|
||||
// This helps to track which slot has a saved state in db.
|
||||
func (k *Store) setStateSlotBitField(ctx context.Context, tx *bolt.Tx, slot uint64) error {
|
||||
ctx, span := trace.StartSpan(ctx, "BeaconDB.setStateSlotBitField")
|
||||
defer span.End()
|
||||
|
||||
k.stateSlotBitLock.Lock()
|
||||
defer k.stateSlotBitLock.Unlock()
|
||||
|
||||
bucket := tx.Bucket(slotsHasObjectBucket)
|
||||
slotBitfields := bucket.Get(savedStateSlotsKey)
|
||||
|
||||
// Copy is needed to avoid unsafe pointer conversions.
|
||||
// See: https://github.com/etcd-io/bbolt/pull/201
|
||||
tmp := make([]byte, len(slotBitfields))
|
||||
copy(tmp, slotBitfields)
|
||||
slotBitfields = bytesutil.SetBit(tmp, int(slot))
|
||||
return bucket.Put(savedStateSlotsKey, slotBitfields)
|
||||
}
|
||||
|
||||
// clearStateSlotBitField clears the state slot bit in DB.
|
||||
// This helps to track which slot has a saved state in db.
|
||||
func (k *Store) clearStateSlotBitField(ctx context.Context, tx *bolt.Tx, slot uint64) error {
|
||||
ctx, span := trace.StartSpan(ctx, "BeaconDB.clearStateSlotBitField")
|
||||
defer span.End()
|
||||
|
||||
k.stateSlotBitLock.Lock()
|
||||
defer k.stateSlotBitLock.Unlock()
|
||||
|
||||
bucket := tx.Bucket(slotsHasObjectBucket)
|
||||
slotBitfields := bucket.Get(savedStateSlotsKey)
|
||||
|
||||
// Copy is needed to avoid unsafe pointer conversions.
|
||||
// See: https://github.com/etcd-io/bbolt/pull/201
|
||||
tmp := make([]byte, len(slotBitfields))
|
||||
copy(tmp, slotBitfields)
|
||||
slotBitfields = bytesutil.ClearBit(tmp, int(slot))
|
||||
return bucket.Put(savedStateSlotsKey, slotBitfields)
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/gogo/protobuf/proto"
|
||||
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
|
||||
"github.com/prysmaticlabs/go-ssz"
|
||||
"github.com/prysmaticlabs/prysm/beacon-chain/state"
|
||||
@@ -286,3 +287,84 @@ func TestStore_DeleteHeadState(t *testing.T) {
|
||||
t.Error("Did not receive wanted error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStore_SaveDeleteState_CanGetHighest(t *testing.T) {
|
||||
db := setupDB(t)
|
||||
defer teardownDB(t, db)
|
||||
|
||||
s0 := &pb.BeaconState{Slot: 1}
|
||||
b := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: 1}}
|
||||
r, _ := ssz.HashTreeRoot(b.Block)
|
||||
if err := db.SaveBlock(context.Background(), b); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
st, err := state.InitializeFromProto(s0)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := db.SaveState(context.Background(), st, r); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s1 := &pb.BeaconState{Slot: 999}
|
||||
b = ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: 999}}
|
||||
r1, _ := ssz.HashTreeRoot(b.Block)
|
||||
if err := db.SaveBlock(context.Background(), b); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
st, err = state.InitializeFromProto(s1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := db.SaveState(context.Background(), st, r1); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
highest, err := db.HighestSlotStates(context.Background())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !proto.Equal(highest[0].InnerStateUnsafe(), s1) {
|
||||
t.Errorf("Did not retrieve saved state: %v != %v", highest, s1)
|
||||
}
|
||||
|
||||
s2 := &pb.BeaconState{Slot: 1000}
|
||||
b = ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: 1000}}
|
||||
r2, _ := ssz.HashTreeRoot(b.Block)
|
||||
if err := db.SaveBlock(context.Background(), b); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
st, err = state.InitializeFromProto(s2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := db.SaveState(context.Background(), st, r2); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
highest, err = db.HighestSlotStates(context.Background())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !proto.Equal(highest[0].InnerStateUnsafe(), s2) {
|
||||
t.Errorf("Did not retrieve saved state: %v != %v", highest, s2)
|
||||
}
|
||||
|
||||
db.DeleteState(context.Background(), r2)
|
||||
highest, err = db.HighestSlotStates(context.Background())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !proto.Equal(highest[0].InnerStateUnsafe(), s1) {
|
||||
t.Errorf("Did not retrieve saved state: %v != %v", highest, s1)
|
||||
}
|
||||
|
||||
db.DeleteState(context.Background(), r1)
|
||||
highest, err = db.HighestSlotStates(context.Background())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !proto.Equal(highest[0].InnerStateUnsafe(), s0) {
|
||||
t.Errorf("Did not retrieve saved state: %v != %v", highest, s1)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -109,7 +109,7 @@ func (s *Service) Start() {
|
||||
|
||||
go func() {
|
||||
if s.listener == nil {
|
||||
return
|
||||
return
|
||||
}
|
||||
if err := s.grpcServer.Serve(s.listener); err != nil {
|
||||
log.Errorf("Could not serve gRPC: %v", err)
|
||||
|
||||
Reference in New Issue
Block a user