New spanner db structure (#6061)

* new spanner db structure

* lint fixes

* go mod fix

* fix iface

* remove unused

* remove extra line

* change from db

* exported field

* exported field

* revert to original

* fix

* ivan feedback

* ivan feedback

* ivan feedback

* revert mod changes

* fix db impl

* gaz

* import fix

* Try to fix tests

* ivan feedback

* new epoch store

* added comment

* fix error

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
Co-authored-by: Ivan Martinez <ivanthegreatdev@gmail.com>
This commit is contained in:
Shay Zluf
2020-06-02 17:41:21 +03:00
committed by GitHub
parent d152b48815
commit fd19fd10a9
11 changed files with 564 additions and 8 deletions

View File

@@ -153,12 +153,12 @@ func (k *Store) SaveAttestation(ctx context.Context, att *ethpb.Attestation) err
return err
}
err := k.db.Update(func(tx *bolt.Tx) error {
attDataRoot, err := ssz.HashTreeRoot(att.Data)
if err != nil {
return err
}
attDataRoot, err := ssz.HashTreeRoot(att.Data)
if err != nil {
return err
}
err = k.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(attestationsBucket)
ac := &dbpb.AttestationContainer{
Data: att.Data,

View File

@@ -6,6 +6,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/slasher/db/iface",
visibility = ["//slasher/db:__subpackages__"],
deps = [
"//slasher/db/kv:go_default_library",
"//slasher/db/types:go_default_library",
"//slasher/detection/attestations/types:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",

View File

@@ -8,6 +8,7 @@ import (
"io"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/slasher/db/kv"
"github.com/prysmaticlabs/prysm/slasher/db/types"
detectionTypes "github.com/prysmaticlabs/prysm/slasher/detection/attestations/types"
)
@@ -31,6 +32,7 @@ type ReadOnlyDatabase interface {
LatestIndexedAttestationsTargetEpoch(ctx context.Context) (uint64, error)
// MinMaxSpan related methods.
EpochSpans(ctx context.Context, epoch uint64) (kv.EpochStore, error)
EpochSpansMap(ctx context.Context, epoch uint64) (map[uint64]detectionTypes.Span, bool, error)
EpochSpanByValidatorIndex(ctx context.Context, validatorIdx uint64, epoch uint64) (detectionTypes.Span, error)
EpochsSpanByValidatorsIndices(ctx context.Context, validatorIndices []uint64, maxEpoch uint64) (map[uint64]map[uint64]detectionTypes.Span, error)
@@ -68,6 +70,7 @@ type WriteAccessDatabase interface {
PruneAttHistory(ctx context.Context, currentEpoch uint64, pruningEpochAge uint64) error
// MinMaxSpan related methods.
SaveEpochSpans(ctx context.Context, epoch uint64, spans kv.EpochStore) error
SaveEpochSpansMap(ctx context.Context, epoch uint64, spanMap map[uint64]detectionTypes.Span) error
SaveValidatorEpochSpan(ctx context.Context, validatorIdx uint64, epoch uint64, spans detectionTypes.Span) error
SaveCachedSpansMaps(ctx context.Context) error
@@ -98,7 +101,12 @@ type FullAccessDatabase interface {
type Database interface {
io.Closer
FullAccessDatabase
DatabasePath() string
ClearDB() error
}
// EpochSpansStore represents a data access layer for marshaling and unmarshaling validator spans for each validator per epoch.
type EpochSpansStore interface {
SetValidatorSpan(ctx context.Context, idx uint64, newSpan detectionTypes.Span) error
GetValidatorSpan(ctx context.Context, idx uint64) (detectionTypes.Span, error)
}

View File

@@ -7,11 +7,13 @@ go_library(
"attester_slashings.go",
"block_header.go",
"chain_data.go",
"epoch_store.go",
"indexed_attestations.go",
"kv.go",
"proposer_slashings.go",
"schema.go",
"spanner.go",
"spanner_new.go",
"validator_id_pubkey.go",
],
importpath = "github.com/prysmaticlabs/prysm/slasher/db/kv",
@@ -39,11 +41,14 @@ go_test(
name = "go_default_test",
srcs = [
"attester_slashings_test.go",
"benchmark_test.go",
"block_header_test.go",
"chain_data_test.go",
"epoch_store_test.go",
"indexed_attestations_test.go",
"kv_test.go",
"proposer_slashings_test.go",
"spanner_new_test.go",
"spanner_test.go",
"validator_id_pubkey_test.go",
],

View File

@@ -0,0 +1,136 @@
package kv
import (
"context"
"flag"
"testing"
"github.com/prysmaticlabs/prysm/slasher/detection/attestations/types"
"github.com/urfave/cli/v2"
)
const (
benchmarkValidator = 300000
)
func BenchmarkStore_SaveEpochSpans(b *testing.B) {
ctx := context.Background()
sigBytes := [2]byte{}
app := cli.App{}
set := flag.NewFlagSet("test", 0)
db := setupDB(b, cli.NewContext(&app, set, nil))
es := EpochStore{}
err := es.SetValidatorSpan(ctx, benchmarkValidator, types.Span{MinSpan: 1, MaxSpan: 2, SigBytes: sigBytes, HasAttested: true})
if err != nil {
b.Error(err)
}
for i := 0; i < benchmarkValidator; i++ {
err = es.SetValidatorSpan(ctx, uint64(i), types.Span{MinSpan: 1, MaxSpan: 2, SigBytes: sigBytes, HasAttested: true})
if err != nil {
b.Error(err)
}
}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := db.SaveEpochSpans(ctx, uint64(i%54000), es)
if err != nil {
b.Fatalf("Save validator span map failed: %v", err)
}
}
}
func BenchmarkStore_EpochSpans(b *testing.B) {
app := cli.App{}
set := flag.NewFlagSet("test", 0)
db := setupDB(b, cli.NewContext(&app, set, nil))
ctx := context.Background()
sigBytes := [2]byte{}
es := EpochStore{}
err := es.SetValidatorSpan(ctx, benchmarkValidator, types.Span{MinSpan: 1, MaxSpan: 2, SigBytes: sigBytes, HasAttested: true})
if err != nil {
b.Error(err)
}
for i := 0; i < benchmarkValidator; i++ {
err = es.SetValidatorSpan(ctx, uint64(i), types.Span{MinSpan: 1, MaxSpan: 2, SigBytes: sigBytes, HasAttested: true})
if err != nil {
b.Error(err)
}
}
b.Log(len(es))
for i := 0; i < 200; i++ {
err := db.SaveEpochSpans(ctx, uint64(i), es)
if err != nil {
b.Fatalf("Save validator span map failed: %v", err)
}
}
b.Log(db.db.Info())
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := db.EpochSpans(ctx, uint64(i%200))
if err != nil {
b.Fatalf("Read validator span map failed: %v", err)
}
}
}
func BenchmarkStore_GetValidatorSpan(b *testing.B) {
ctx := context.Background()
sigBytes := [2]byte{}
es := EpochStore{}
err := es.SetValidatorSpan(ctx, benchmarkValidator, types.Span{MinSpan: 1, MaxSpan: 2, SigBytes: sigBytes, HasAttested: true})
if err != nil {
b.Error(err)
}
for i := 0; i < benchmarkValidator; i++ {
err = es.SetValidatorSpan(ctx, uint64(i), types.Span{MinSpan: uint16(i), MaxSpan: uint16(benchmarkValidator - i), SigBytes: sigBytes, HasAttested: true})
if err != nil {
b.Error(err)
}
}
b.Log(len(es))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := es.GetValidatorSpan(ctx, uint64(i%benchmarkValidator))
if err != nil {
b.Fatalf("Read validator span map failed: %v", err)
}
}
}
func BenchmarkStore_SetValidatorSpan(b *testing.B) {
ctx := context.Background()
sigBytes := [2]byte{}
es := EpochStore{}
err := es.SetValidatorSpan(ctx, benchmarkValidator, types.Span{MinSpan: 1, MaxSpan: 2, SigBytes: sigBytes, HasAttested: true})
if err != nil {
b.Error(err)
}
for i := 0; i < benchmarkValidator; i++ {
err = es.SetValidatorSpan(ctx, uint64(i), types.Span{MinSpan: uint16(i), MaxSpan: uint16(benchmarkValidator - i), SigBytes: sigBytes, HasAttested: true})
if err != nil {
b.Error(err)
}
}
b.Log(len(es))
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
err := es.SetValidatorSpan(ctx, uint64(i%benchmarkValidator), types.Span{MinSpan: uint16(i), MaxSpan: uint16(benchmarkValidator - i), SigBytes: sigBytes, HasAttested: true})
if err != nil {
b.Fatalf("Read validator span map failed: %v", err)
}
}
}

View File

@@ -0,0 +1,77 @@
package kv
import (
"context"
"errors"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/slasher/detection/attestations/types"
)
// EpochStore defines an implementation of the slasher data access interface
// using byte array as data source to extract and put validator spans into.
type EpochStore []byte
// ErrWrongSize appears when attempting to use epoch store byte array with size that
// is not a multiple of spanner encoded length.
var ErrWrongSize = errors.New("wrong data length for min max span byte array")
var highestObservedValidatorIdx uint64
// NewEpochStore initialize epoch store from a byte array
// returns error if byte length is not a multiple of encoded spanner length.
func NewEpochStore(spans []byte) (EpochStore, error) {
if len(spans)%spannerEncodedLength != 0 {
return nil, ErrWrongSize
}
es := EpochStore{}
es = spans
return es, nil
}
// GetValidatorSpan unmarshal a span from an encoded, flattened array.
func (es EpochStore) GetValidatorSpan(ctx context.Context, idx uint64) (types.Span, error) {
r := types.Span{}
if len(es)%spannerEncodedLength != 0 {
return r, ErrWrongSize
}
origLength := uint64(len(es)) / spannerEncodedLength
requestedLength := idx + 1
if origLength < requestedLength {
return r, nil
}
cursor := idx * spannerEncodedLength
r.MinSpan = bytesutil.FromBytes2(es[cursor : cursor+2])
r.MaxSpan = bytesutil.FromBytes2(es[cursor+2 : cursor+4])
sigB := [2]byte{}
copy(sigB[:], es[cursor+4:cursor+6])
r.SigBytes = sigB
r.HasAttested = bytesutil.ToBool(es[cursor+6])
return r, nil
}
// SetValidatorSpan marshal a validator span into an encoded, flattened array.
func (es *EpochStore) SetValidatorSpan(ctx context.Context, idx uint64, newSpan types.Span) error {
if len(*es)%spannerEncodedLength != 0 {
return errors.New("wrong data length for min max span byte array")
}
if highestObservedValidatorIdx < idx {
highestObservedValidatorIdx = idx
}
if len(*es) == 0 {
requestedLength := highestObservedValidatorIdx*spannerEncodedLength + spannerEncodedLength
*es = make([]byte, requestedLength, requestedLength)
}
cursor := idx * spannerEncodedLength
endCursor := cursor + spannerEncodedLength
spansLength := uint64(len(*es))
if endCursor > spansLength {
diff := endCursor - spansLength
b := make([]byte, diff, diff)
*es = append(*es, b...)
}
enc := marshalSpan(newSpan)
ba := *es
copy(ba[cursor:], enc)
return nil
}

View File

@@ -0,0 +1,165 @@
package kv
import (
"context"
"encoding/hex"
"reflect"
"testing"
"github.com/prysmaticlabs/prysm/slasher/detection/attestations/types"
)
type spansValueTests struct {
name string
validatorID uint64
oldSpans string
spansLength uint64
validatorSpan types.Span
err error
}
var exampleSpansValues []spansValueTests
func init() {
exampleSpansValues = []spansValueTests{
{
name: "Validator 0 first time",
validatorSpan: types.Span{
MinSpan: 1,
MaxSpan: 2,
SigBytes: [2]byte{1, 1},
HasAttested: false,
},
spansLength: spannerEncodedLength,
validatorID: 0,
},
{
name: "Validator 300000 first time",
validatorSpan: types.Span{
MinSpan: 256,
MaxSpan: 677,
SigBytes: [2]byte{255, 250},
HasAttested: true,
},
validatorID: 300000,
spansLength: spannerEncodedLength*300000 + spannerEncodedLength,
},
{
name: "Validator 1 with highestObservedValidatorIdx 300000",
validatorSpan: types.Span{
MinSpan: 54000,
MaxSpan: 54001,
SigBytes: [2]byte{250, 255},
HasAttested: true,
},
validatorID: 1,
spansLength: spannerEncodedLength*300000 + spannerEncodedLength,
},
{
name: "Validator 0 not with old spans(disregards the highestObservedValidatorIdx)",
validatorSpan: types.Span{
MinSpan: 65535,
MaxSpan: 65535,
SigBytes: [2]byte{255, 255},
HasAttested: true,
},
validatorID: 0,
oldSpans: "01000000000000",
spansLength: spannerEncodedLength,
},
}
}
func TestStore_GetValidatorSpan(t *testing.T) {
ctx := context.Background()
tooSmall, err := hex.DecodeString("000000")
if err != nil {
t.Fatal(err)
}
es, err := NewEpochStore(tooSmall)
if err != ErrWrongSize {
t.Error("expected error")
}
//nil es
span, err := es.GetValidatorSpan(ctx, 1)
if !reflect.DeepEqual(span, types.Span{}) {
t.Errorf("Expected empty span to be returned: %v", span)
}
tooBig, err := hex.DecodeString("0000000000000000")
if err != nil {
t.Fatal(err)
}
es = tooBig
span, err = es.GetValidatorSpan(ctx, 1)
if !reflect.DeepEqual(span, types.Span{}) {
t.Errorf("Expected empty span to be returned: %v", span)
}
if err != ErrWrongSize {
t.Error("Expected error")
}
oneValidator, err := hex.DecodeString("01010101010101")
if err != nil {
t.Fatal(err)
}
es = oneValidator
span, err = es.GetValidatorSpan(ctx, 0)
if !reflect.DeepEqual(span, types.Span{MinSpan: 257, MaxSpan: 257, SigBytes: [2]byte{1, 1}, HasAttested: true}) {
t.Errorf("Expected types.Span{MinSpan: 1, MaxSpan: 1, SigBytes: [2]byte{1, 1}, HasAttested: true} to be returned: %v", span)
}
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
span, err = es.GetValidatorSpan(ctx, 1)
if !reflect.DeepEqual(span, types.Span{}) {
t.Errorf("Expected empty span to be returned: %v", span)
}
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
twoValidator, err := hex.DecodeString("0101010101010101010101010101")
if err != nil {
t.Fatal(err)
}
es = twoValidator
span, err = es.GetValidatorSpan(ctx, 0)
if !reflect.DeepEqual(span, types.Span{MinSpan: 257, MaxSpan: 257, SigBytes: [2]byte{1, 1}, HasAttested: true}) {
t.Errorf("Expected types.Span{MinSpan: 1, MaxSpan: 1, SigBytes: [2]byte{1, 1}, HasAttested: true} to be returned: %v", span)
}
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
span, err = es.GetValidatorSpan(ctx, 1)
if !reflect.DeepEqual(span, types.Span{MinSpan: 257, MaxSpan: 257, SigBytes: [2]byte{1, 1}, HasAttested: true}) {
t.Errorf("Expected types.Span{MinSpan: 1, MaxSpan: 1, SigBytes: [2]byte{1, 1}, HasAttested: true} to be returned: %v", span)
}
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
}
func TestStore_SetValidatorSpan(t *testing.T) {
ctx := context.Background()
for _, tt := range exampleSpansValues {
oldSpans, err := hex.DecodeString(tt.oldSpans)
if err != nil {
t.Fatal(err)
}
es, err := NewEpochStore(oldSpans)
if err != tt.err {
t.Errorf("Expected error: %v got: %v", tt.err, err)
}
err = es.SetValidatorSpan(ctx, tt.validatorID, tt.validatorSpan)
if uint64(len(es)) != tt.spansLength {
t.Errorf("Expected spans length: %d got: %d", tt.spansLength, len(es))
}
span, err := es.GetValidatorSpan(ctx, tt.validatorID)
if err != nil {
t.Errorf("Got error while trying to get span from spans byte array: %v", err)
}
if !reflect.DeepEqual(span, tt.validatorSpan) {
t.Errorf("Expected validator span: %v got: %v ", tt.validatorSpan, span)
}
}
}

View File

@@ -115,6 +115,7 @@ func NewKVStore(dirPath string, cfg *Config) (*Store, error) {
compressedIdxAttsBucket,
validatorsPublicKeysBucket,
validatorsMinMaxSpanBucket,
validatorsMinMaxSpanBucketNew,
slashingBucket,
chainDataBucket,
)

View File

@@ -8,7 +8,7 @@ import (
const (
latestEpochKey = "LATEST_EPOCH_DETECTED"
chainHeadKey = "CHAIN_HEAD"
// spannerEncodedLength the byte length of validator span data structure.
spannerEncodedLength = 7
)
@@ -25,7 +25,8 @@ var (
// In order to quickly detect surround and surrounded attestations we need to store
// the min and max span for each validator for each epoch.
// see https://github.com/protolambda/eth2-surround/blob/master/README.md#min-max-surround
validatorsMinMaxSpanBucket = []byte("validators-min-max-span-bucket")
validatorsMinMaxSpanBucket = []byte("validators-min-max-span-bucket")
validatorsMinMaxSpanBucketNew = []byte("validators-min-max-span-bucket-new")
)
func encodeSlotValidatorID(slot uint64, validatorID uint64) []byte {

View File

@@ -0,0 +1,50 @@
package kv
import (
"context"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
bolt "go.etcd.io/bbolt"
"go.opencensus.io/trace"
)
// EpochSpans accepts epoch and returns the corresponding spans byte array
// for slashing detection.
// Returns span byte array, and error in case of db error.
// returns empty byte array if no entry for this epoch exists in db.
func (db *Store) EpochSpans(ctx context.Context, epoch uint64) (EpochStore, error) {
ctx, span := trace.StartSpan(ctx, "slasherDB.EpochSpans")
defer span.End()
var err error
var spans []byte
err = db.view(func(tx *bolt.Tx) error {
b := tx.Bucket(validatorsMinMaxSpanBucketNew)
if b == nil {
return nil
}
spans = b.Get(bytesutil.Bytes8(epoch))
return nil
})
if spans == nil {
spans = []byte{}
}
return spans, err
}
// SaveEpochSpans accepts a epoch and span byte array and writes it to disk.
func (db *Store) SaveEpochSpans(ctx context.Context, epoch uint64, es EpochStore) error {
ctx, span := trace.StartSpan(ctx, "slasherDB.SaveEpochSpans")
defer span.End()
if len(es)%spannerEncodedLength != 0 {
return ErrWrongSize
}
return db.update(func(tx *bolt.Tx) error {
b, err := tx.CreateBucketIfNotExists(validatorsMinMaxSpanBucketNew)
if err != nil {
return err
}
return b.Put(bytesutil.Bytes8(epoch), es)
})
}

View File

@@ -0,0 +1,112 @@
package kv
import (
"context"
"encoding/hex"
"flag"
"reflect"
"testing"
"github.com/prysmaticlabs/prysm/slasher/detection/attestations/types"
"github.com/urfave/cli/v2"
)
type spansTestStruct struct {
name string
epoch uint64
spansHex string
spansResultHex string
validator1Span types.Span
err error
}
var spanNewTests []spansTestStruct
func init() {
spanNewTests = []spansTestStruct{
{
name: "span too small",
epoch: 1,
spansHex: "00000000",
spansResultHex: "",
validator1Span: types.Span{},
err: ErrWrongSize,
},
{
name: "no validator 1 in spans",
epoch: 2,
spansHex: "00000000000000",
spansResultHex: "00000000000000",
validator1Span: types.Span{},
err: nil,
},
{
name: "validator 1 in spans",
epoch: 3,
spansHex: "0000000000000001000000000000",
spansResultHex: "0000000000000001000000000000",
validator1Span: types.Span{MinSpan: 1},
err: nil,
},
}
}
func TestValidatorSpans_NilDB(t *testing.T) {
app := cli.App{}
set := flag.NewFlagSet("test", 0)
db := setupDB(t, cli.NewContext(&app, set, nil))
ctx := context.Background()
validatorIdx := uint64(1)
es, err := db.EpochSpans(ctx, validatorIdx)
if err != nil {
t.Fatalf("Nil EpochSpansMap should not return error: %v", err)
}
if !reflect.DeepEqual(es, EpochStore{}) {
t.Fatal("EpochSpans should return empty byte array if no record exists in the db")
}
}
func TestStore_SaveReadEpochSpans(t *testing.T) {
app := cli.App{}
set := flag.NewFlagSet("test", 0)
db := setupDB(t, cli.NewContext(&app, set, nil))
ctx := context.Background()
for _, tt := range spanNewTests {
t.Run(tt.name, func(t *testing.T) {
spans, err := hex.DecodeString(tt.spansHex)
if err != nil {
t.Fatal(err)
}
es := EpochStore{}
es = spans
err = db.SaveEpochSpans(ctx, tt.epoch, es)
if err != tt.err {
t.Fatalf("Failed to get the right error expected: %v got: %v", tt.err, err)
}
sm, err := db.EpochSpans(ctx, tt.epoch)
if err != nil {
t.Fatalf("Failed to get validator spans: %v", err)
}
spansResult, err := hex.DecodeString(tt.spansResultHex)
if err != nil {
t.Fatal(err)
}
esr := EpochStore{}
esr = spansResult
if !reflect.DeepEqual(sm, esr) {
t.Fatalf("Get should return validator spans: %v got: %v", spansResult, sm)
}
s, err := es.GetValidatorSpan(ctx, 1)
if err != tt.err {
t.Fatalf("Failed to get validator 1 span: %v", err)
}
if !reflect.DeepEqual(s, tt.validator1Span) {
t.Fatalf("Get should return validator span for validator 2: %v got: %v", tt.validator1Span, s)
}
})
}
}