Resolves issues from the best practices in beacon-chain/db (#6358)

* fixes 51
* fixes 52
* fixes 53
* fixes 54
* fixes 56
* Merge branch 'master' into db-best-practices
* fixes 62
* fixes 64
* fixes 65
* fixes 68
* fixes 69
This commit is contained in:
Victor Farazdagi
2020-06-23 16:51:00 +03:00
committed by GitHub
parent 22b3123ebe
commit 078c157f56
9 changed files with 82 additions and 49 deletions

View File

@@ -16,31 +16,31 @@ package filters
// FilterType defines an enum which is used as the keys in a map that tracks
// set attribute filters for data as part of the `FilterQuery` struct type.
type FilterType int
type FilterType uint8
const (
// ParentRoot defines a filter for parent roots of blocks using Simple Serialize (SSZ).
ParentRoot FilterType = 0
ParentRoot FilterType = iota
// StartSlot is used for range filters of objects by their slot (inclusive).
StartSlot FilterType = 1
StartSlot
// EndSlot is used for range filters of objects by their slot (inclusive).
EndSlot FilterType = 2
EndSlot
// StartEpoch is used for range filters of objects by their epoch (inclusive).
StartEpoch FilterType = 3
StartEpoch
// EndEpoch is used for range filters of objects by their epoch (inclusive).
EndEpoch FilterType = 4
EndEpoch
// HeadBlockRoot defines a filter for the head block root attribute of objects.
HeadBlockRoot FilterType = 5
HeadBlockRoot
// SourceEpoch defines a filter for the source epoch attribute of objects.
SourceEpoch FilterType = 6
SourceEpoch
// SourceRoot defines a filter for the source root attribute of objects.
SourceRoot FilterType = 7
SourceRoot
// TargetEpoch defines a filter for the target epoch attribute of objects.
TargetEpoch FilterType = 8
TargetEpoch
// TargetRoot defines a filter for the target root attribute of objects.
TargetRoot FilterType = 9
TargetRoot
// SlotStep is used for range filters of objects by their slot in step increments.
SlotStep FilterType = 10
SlotStep
)
// QueryFilter defines a generic interface for type-asserting

View File

@@ -33,6 +33,7 @@ type Exporter struct {
// the database, but returns the underlying database pointer itself.
func Wrap(db iface.Database) (iface.Database, error) {
if featureconfig.Get().KafkaBootstrapServers == "" {
log.Debug("Empty Kafka bootstrap servers list, database was not wrapped with Kafka exporter")
return db, nil
}

View File

@@ -35,7 +35,7 @@ func (kv *Store) LastArchivedIndex(ctx context.Context) (uint64, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.LastArchivedIndex")
defer span.End()
var index uint64
err := kv.db.Update(func(tx *bolt.Tx) error {
err := kv.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket(archivedIndexRootBucket)
b := bucket.Get(lastArchivedIndexKey)
if b == nil {

View File

@@ -56,7 +56,7 @@ func (kv *Store) Attestations(ctx context.Context, f *filters.QueryFilter) ([]*e
// Creates a list of indices from the passed in filter values, such as:
// []byte("parent-root-0x2093923"), etc. to be used for looking up
// block roots that were stored under each of those indices for O(1) lookup.
indicesByBucket, err := createAttestationIndicesFromFilters(f)
indicesByBucket, err := createAttestationIndicesFromFilters(ctx, f)
if err != nil {
return errors.Wrap(err, "could not determine lookup indices")
}
@@ -64,7 +64,7 @@ func (kv *Store) Attestations(ctx context.Context, f *filters.QueryFilter) ([]*e
// lookup index, we find the intersection across all of them and use
// that list of roots to lookup the attestations. These attestations will
// meet the filter criteria.
keys := sliceutil.IntersectionByteSlices(lookupValuesForIndices(indicesByBucket, tx)...)
keys := sliceutil.IntersectionByteSlices(lookupValuesForIndices(ctx, indicesByBucket, tx)...)
for i := 0; i < len(keys); i++ {
encoded := bkt.Get(keys[i])
ac := &dbpb.AttestationContainer{}
@@ -107,8 +107,8 @@ func (kv *Store) DeleteAttestation(ctx context.Context, attDataRoot [32]byte) er
if err := decode(enc, ac); err != nil {
return err
}
indicesByBucket := createAttestationIndicesFromData(ac.Data)
if err := deleteValueForIndices(indicesByBucket, attDataRoot[:], tx); err != nil {
indicesByBucket := createAttestationIndicesFromData(ctx, ac.Data)
if err := deleteValueForIndices(ctx, indicesByBucket, attDataRoot[:], tx); err != nil {
return errors.Wrap(err, "could not delete root for DB indices")
}
return bkt.Delete(attDataRoot[:])
@@ -128,8 +128,8 @@ func (kv *Store) DeleteAttestations(ctx context.Context, attDataRoots [][32]byte
if err := decode(enc, ac); err != nil {
return err
}
indicesByBucket := createAttestationIndicesFromData(ac.Data)
if err := deleteValueForIndices(indicesByBucket, attDataRoot[:], tx); err != nil {
indicesByBucket := createAttestationIndicesFromData(ctx, ac.Data)
if err := deleteValueForIndices(ctx, indicesByBucket, attDataRoot[:], tx); err != nil {
return errors.Wrap(err, "could not delete root for DB indices")
}
if err := bkt.Delete(attDataRoot[:]); err != nil {
@@ -177,8 +177,8 @@ func (kv *Store) SaveAttestation(ctx context.Context, att *ethpb.Attestation) er
return err
}
indicesByBucket := createAttestationIndicesFromData(att.Data)
if err := updateValueForIndices(indicesByBucket, attDataRoot[:], tx); err != nil {
indicesByBucket := createAttestationIndicesFromData(ctx, att.Data)
if err := updateValueForIndices(ctx, indicesByBucket, attDataRoot[:], tx); err != nil {
return errors.Wrap(err, "could not update DB indices")
}
return bkt.Put(attDataRoot[:], enc)
@@ -219,8 +219,8 @@ func (kv *Store) SaveAttestations(ctx context.Context, atts []*ethpb.Attestation
return err
}
indicesByBucket := createAttestationIndicesFromData(att.Data)
if err := updateValueForIndices(indicesByBucket, attDataRoot[:], tx); err != nil {
indicesByBucket := createAttestationIndicesFromData(ctx, att.Data)
if err := updateValueForIndices(ctx, indicesByBucket, attDataRoot[:], tx); err != nil {
return errors.Wrap(err, "could not update DB indices")
}
@@ -240,7 +240,10 @@ func (kv *Store) SaveAttestations(ctx context.Context, atts []*ethpb.Attestation
// createAttestationIndicesFromData takes in attestation data and returns
// a map of bolt DB index buckets corresponding to each particular key for indices for
// data, such as (shard indices bucket -> shard 5).
func createAttestationIndicesFromData(attData *ethpb.AttestationData) map[string][]byte {
func createAttestationIndicesFromData(ctx context.Context, attData *ethpb.AttestationData) map[string][]byte {
ctx, span := trace.StartSpan(ctx, "BeaconDB.createAttestationIndicesFromData")
defer span.End()
indicesByBucket := make(map[string][]byte)
buckets := make([][]byte, 0)
indices := make([][]byte, 0)
@@ -277,7 +280,10 @@ func createAttestationIndicesFromData(attData *ethpb.AttestationData) map[string
// For attestations, these are list of hash tree roots of attestation.Data
// objects. If a certain filter criterion does not apply to
// attestations, an appropriate error is returned.
func createAttestationIndicesFromFilters(f *filters.QueryFilter) (map[string][]byte, error) {
func createAttestationIndicesFromFilters(ctx context.Context, f *filters.QueryFilter) (map[string][]byte, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.createAttestationIndicesFromFilters")
defer span.End()
indicesByBucket := make(map[string][]byte)
for k, v := range f.Filters() {
switch k {

View File

@@ -144,8 +144,8 @@ func (kv *Store) DeleteBlock(ctx context.Context, blockRoot [32]byte) error {
if err := decode(enc, block); err != nil {
return err
}
indicesByBucket := createBlockIndicesFromBlock(block.Block)
if err := deleteValueForIndices(indicesByBucket, blockRoot[:], tx); err != nil {
indicesByBucket := createBlockIndicesFromBlock(ctx, block.Block)
if err := deleteValueForIndices(ctx, indicesByBucket, blockRoot[:], tx); err != nil {
return errors.Wrap(err, "could not delete root for DB indices")
}
kv.blockCache.Del(string(blockRoot[:]))
@@ -172,8 +172,8 @@ func (kv *Store) DeleteBlocks(ctx context.Context, blockRoots [][32]byte) error
if err := decode(enc, block); err != nil {
return err
}
indicesByBucket := createBlockIndicesFromBlock(block.Block)
if err := deleteValueForIndices(indicesByBucket, blockRoot[:], tx); err != nil {
indicesByBucket := createBlockIndicesFromBlock(ctx, block.Block)
if err := deleteValueForIndices(ctx, indicesByBucket, blockRoot[:], tx); err != nil {
return errors.Wrap(err, "could not delete root for DB indices")
}
kv.blockCache.Del(string(blockRoot[:]))
@@ -212,8 +212,8 @@ func (kv *Store) SaveBlock(ctx context.Context, signed *ethpb.SignedBeaconBlock)
if err != nil {
return err
}
indicesByBucket := createBlockIndicesFromBlock(signed.Block)
if err := updateValueForIndices(indicesByBucket, blockRoot[:], tx); err != nil {
indicesByBucket := createBlockIndicesFromBlock(ctx, signed.Block)
if err := updateValueForIndices(ctx, indicesByBucket, blockRoot[:], tx); err != nil {
return errors.Wrap(err, "could not update DB indices")
}
kv.blockCache.Set(string(blockRoot[:]), signed, int64(len(enc)))
@@ -244,8 +244,8 @@ func (kv *Store) SaveBlocks(ctx context.Context, blocks []*ethpb.SignedBeaconBlo
if err != nil {
return err
}
indicesByBucket := createBlockIndicesFromBlock(block.Block)
if err := updateValueForIndices(indicesByBucket, blockRoot[:], tx); err != nil {
indicesByBucket := createBlockIndicesFromBlock(ctx, block.Block)
if err := updateValueForIndices(ctx, indicesByBucket, blockRoot[:], tx); err != nil {
return errors.Wrap(err, "could not update DB indices")
}
kv.blockCache.Set(string(blockRoot[:]), block, int64(len(enc)))
@@ -452,7 +452,7 @@ func getBlockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFil
// Creates a list of indices from the passed in filter values, such as:
// []byte("0x2093923") in the parent root indices bucket to be used for looking up
// block roots that were stored under each of those indices for O(1) lookup.
indicesByBucket, err := createBlockIndicesFromFilters(f)
indicesByBucket, err := createBlockIndicesFromFilters(ctx, f)
if err != nil {
return nil, errors.Wrap(err, "could not determine lookup indices")
}
@@ -460,6 +460,7 @@ func getBlockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFil
// We retrieve block roots that match a filter criteria of slot ranges, if specified.
filtersMap := f.Filters()
rootsBySlotRange := fetchBlockRootsBySlotRange(
ctx,
tx.Bucket(blockSlotIndicesBucket),
filtersMap[filters.StartSlot],
filtersMap[filters.EndSlot],
@@ -472,7 +473,7 @@ func getBlockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFil
// lookup index, we find the intersection across all of them and use
// that list of roots to lookup the block. These block will
// meet the filter criteria.
indices := lookupValuesForIndices(indicesByBucket, tx)
indices := lookupValuesForIndices(ctx, indicesByBucket, tx)
keys := rootsBySlotRange
if len(indices) > 0 {
// If we have found indices that meet the filter criteria, and there are also
@@ -496,6 +497,7 @@ func getBlockRootsByFilter(ctx context.Context, tx *bolt.Tx, f *filters.QueryFil
// range scan using sorted left-padded byte keys using a start slot and an end slot.
// If both the start and end slot are the same, and are 0, the function returns nil.
func fetchBlockRootsBySlotRange(
ctx context.Context,
bkt *bolt.Bucket,
startSlotEncoded interface{},
endSlotEncoded interface{},
@@ -503,6 +505,9 @@ func fetchBlockRootsBySlotRange(
endEpochEncoded interface{},
slotStepEncoded interface{},
) [][]byte {
ctx, span := trace.StartSpan(ctx, "BeaconDB.fetchBlockRootsBySlotRange")
defer span.End()
var startSlot, endSlot, step uint64
var ok bool
if startSlot, ok = startSlotEncoded.(uint64); !ok {
@@ -524,12 +529,12 @@ func fetchBlockRootsBySlotRange(
max := []byte(fmt.Sprintf("%07d", endSlot))
var conditional func(key, max []byte) bool
if endSlot == 0 {
conditional = func(k, max []byte) bool {
return k != nil
conditional = func(key, max []byte) bool {
return key != nil
}
} else {
conditional = func(k, max []byte) bool {
return k != nil && bytes.Compare(k, max) <= 0
conditional = func(key, max []byte) bool {
return key != nil && bytes.Compare(key, max) <= 0
}
}
rootsRange := (endSlot - startSlot) / step
@@ -562,7 +567,9 @@ func fetchBlockRootsBySlotRange(
// createBlockIndicesFromBlock takes in a beacon block and returns
// a map of bolt DB index buckets corresponding to each particular key for indices for
// data, such as (shard indices bucket -> shard 5).
func createBlockIndicesFromBlock(block *ethpb.BeaconBlock) map[string][]byte {
func createBlockIndicesFromBlock(ctx context.Context, block *ethpb.BeaconBlock) map[string][]byte {
ctx, span := trace.StartSpan(ctx, "BeaconDB.createBlockIndicesFromBlock")
defer span.End()
indicesByBucket := make(map[string][]byte)
// Every index has a unique bucket for fast, binary-search
// range scans for filtering across keys.
@@ -589,7 +596,9 @@ func createBlockIndicesFromBlock(block *ethpb.BeaconBlock) map[string][]byte {
// For blocks, these are list of signing roots of block
// objects. If a certain filter criterion does not apply to
// blocks, an appropriate error is returned.
func createBlockIndicesFromFilters(f *filters.QueryFilter) (map[string][]byte, error) {
func createBlockIndicesFromFilters(ctx context.Context, f *filters.QueryFilter) (map[string][]byte, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.createBlockIndicesFromFilters")
defer span.End()
indicesByBucket := make(map[string][]byte)
for k, v := range f.Filters() {
switch k {

View File

@@ -10,6 +10,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/params"
log "github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
)
var historicalStateDeletedKey = []byte("historical-states-deleted")
@@ -17,6 +18,8 @@ var archivedSlotsPerPointKey = []byte("slots-per-archived-point")
// HistoricalStatesDeleted verifies historical states exist in DB.
func (kv *Store) HistoricalStatesDeleted(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.HistoricalStatesDeleted")
defer span.End()
if !featureconfig.Get().NewStateMgmt {
return kv.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(newStateServiceCompatibleBucket)

View File

@@ -25,7 +25,7 @@ import (
const historicalStatesSize = 80
func (kv *Store) regenHistoricalStates(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "db.regenHistoricalStates")
ctx, span := trace.StartSpan(ctx, "BeaconDB.regenHistoricalStates")
defer span.End()
genesisState, err := kv.GenesisState(ctx)
@@ -148,7 +148,7 @@ func regenHistoricalStateTransition(
if signed == nil || signed.Block == nil {
return nil, errors.New("block can't be nil")
}
ctx, span := trace.StartSpan(ctx, "db.regenHistoricalStateTransition")
ctx, span := trace.StartSpan(ctx, "BeaconDB.regenHistoricalStateTransition")
defer span.End()
var err error
state, err = regenHistoricalStateProcessSlots(ctx, state, signed.Block.Slot)
@@ -164,7 +164,7 @@ func regenHistoricalStateTransition(
// This runs slot transition to recompute historical state.
func regenHistoricalStateProcessSlots(ctx context.Context, state *stateTrie.BeaconState, slot uint64) (*stateTrie.BeaconState, error) {
ctx, span := trace.StartSpan(ctx, "db.regenHistoricalStateProcessSlots")
ctx, span := trace.StartSpan(ctx, "BeaconDB.regenHistoricalStateProcessSlots")
defer span.End()
if state == nil {
return nil, errors.New("state can't be nil")
@@ -196,6 +196,8 @@ func regenHistoricalStateProcessSlots(ctx context.Context, state *stateTrie.Beac
// This retrieves the last saved block's archived index.
func (kv *Store) lastSavedBlockArchivedIndex(ctx context.Context) (uint64, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.lastSavedBlockArchivedIndex")
defer span.End()
b, err := kv.HighestSlotBlocks(ctx)
if err != nil {
return 0, err
@@ -217,7 +219,10 @@ func (kv *Store) lastSavedBlockArchivedIndex(ctx context.Context) (uint64, error
func (kv *Store) saveArchivedInfo(ctx context.Context,
currentState *stateTrie.BeaconState,
blocks []*ethpb.SignedBeaconBlock,
archivedIndex uint64) error {
archivedIndex uint64,
) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.saveArchivedInfo")
defer span.End()
lastBlocksRoot, err := stateutil.BlockRoot(blocks[len(blocks)-1].Block)
if err != nil {
return nil

View File

@@ -2,8 +2,10 @@ package kv
import (
"bytes"
"context"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
)
// lookupValuesForIndices takes in a list of indices and looks up
@@ -13,7 +15,9 @@ import (
// attestations and we have an index `[]byte("5")` under the shard indices bucket,
// we might find roots `0x23` and `0x45` stored under that index. We can then
// do a batch read for attestations corresponding to those roots.
func lookupValuesForIndices(indicesByBucket map[string][]byte, tx *bolt.Tx) [][][]byte {
func lookupValuesForIndices(ctx context.Context, indicesByBucket map[string][]byte, tx *bolt.Tx) [][][]byte {
ctx, span := trace.StartSpan(ctx, "BeaconDB.lookupValuesForIndices")
defer span.End()
values := make([][][]byte, 0)
for k, v := range indicesByBucket {
bkt := tx.Bucket([]byte(k))
@@ -30,7 +34,9 @@ func lookupValuesForIndices(indicesByBucket map[string][]byte, tx *bolt.Tx) [][]
// updateValueForIndices updates the value for each index by appending it to the previous
// values stored at said index. Typically, indices are roots of data that can then
// be used for reads or batch reads from the DB.
func updateValueForIndices(indicesByBucket map[string][]byte, root []byte, tx *bolt.Tx) error {
func updateValueForIndices(ctx context.Context, indicesByBucket map[string][]byte, root []byte, tx *bolt.Tx) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.updateValueForIndices")
defer span.End()
for k, idx := range indicesByBucket {
bkt := tx.Bucket([]byte(k))
valuesAtIndex := bkt.Get(idx)
@@ -54,7 +60,9 @@ func updateValueForIndices(indicesByBucket map[string][]byte, root []byte, tx *b
}
// deleteValueForIndices clears a root stored at each index.
func deleteValueForIndices(indicesByBucket map[string][]byte, root []byte, tx *bolt.Tx) error {
func deleteValueForIndices(ctx context.Context, indicesByBucket map[string][]byte, root []byte, tx *bolt.Tx) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.deleteValueForIndices")
defer span.End()
for k, idx := range indicesByBucket {
bkt := tx.Bucket([]byte(k))
valuesAtIndex := bkt.Get(idx)

View File

@@ -1,6 +1,7 @@
package kv
import (
"context"
"crypto/rand"
"reflect"
"testing"
@@ -128,7 +129,7 @@ func Test_deleteValueForIndices(t *testing.T) {
t.Fatal(err)
}
}
if err := deleteValueForIndices(tt.inputIndices, tt.root, tx); (err != nil) != tt.wantErr {
if err := deleteValueForIndices(context.Background(), tt.inputIndices, tt.root, tx); (err != nil) != tt.wantErr {
t.Errorf("deleteValueForIndices() error = %v, wantErr %v", err, tt.wantErr)
}
// Check updated indices.