Implement Validator DB Methods (#3172)

* begin db interface

* define the database interface

* interface definition simplifications

* include latest message proto

* modify pbs

* rem kv folder

* add filter interface

* lint

* ctx package is great

* interface getting better

* ctx everywhere...it's everywhere!

* block roots method

* new kv store initialization

* comments

* gaz

* implement interface

* refactor for proper naming conventions

* add todos

* proper comments

* rem unused

* add schema

* implementation simplicity

* has validator latest vote func impl

* retrieve validator latest vote

* has idx

* implement missing validator methods

* missing validator methods and test helpers

* validator index crud tests

* validator tests

* all buckets

* refactor with ok bool

* all tests passing, fmt, imports
This commit is contained in:
Raul Jordan
2019-08-12 14:33:07 -05:00
committed by GitHub
parent 715b9cd5ba
commit 6bd8ae8f67
7 changed files with 224 additions and 21 deletions

View File

@@ -9,11 +9,10 @@ import (
"github.com/boltdb/bolt"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/sirupsen/logrus"
)
var log = logrus.WithField("prefix", "beacondb")
@@ -43,7 +42,7 @@ type Database interface {
State(ctx context.Context, f *filters.QueryFilter) (*pb.BeaconState, error)
HeadState(ctx context.Context) (*pb.BeaconState, error)
SaveState(ctx context.Context, state *pb.BeaconState, blockRoot [32]byte) error
ValidatorIndex(ctx context.Context, publicKey [48]byte) (uint64, error)
ValidatorIndex(ctx context.Context, publicKey [48]byte) (uint64, bool, error)
HasValidatorIndex(ctx context.Context, publicKey [48]byte) bool
DeleteValidatorIndex(ctx context.Context, publicKey [48]byte) error
SaveValidatorIndex(ctx context.Context, publicKey [48]byte, validatorIdx uint64) error

View File

@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
@@ -6,6 +6,7 @@ go_library(
"attestations.go",
"blocks.go",
"kv.go",
"schema.go",
"state.go",
"validators.go",
],
@@ -16,6 +17,21 @@ go_library(
"//proto/beacon/p2p/v1:go_default_library",
"//proto/eth/v1alpha1:go_default_library",
"@com_github_boltdb_bolt//:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_pkg_errors//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"kv_test.go",
"validators_test.go",
],
embed = [":go_default_library"],
deps = [
"//proto/beacon/p2p/v1:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
],
)

View File

@@ -35,8 +35,7 @@ func NewKVStore(dirPath string) (*Store, error) {
kv := &Store{db: boltDB, DatabasePath: dirPath}
if err := kv.db.Update(func(tx *bolt.Tx) error {
// TODO(#3164): Update buckets.
return nil
return createBuckets(tx, validatorsBucket, attestationsBucket, blocksBucket, stateBucket)
}); err != nil {
return nil, err
}
@@ -56,3 +55,12 @@ func (k *Store) ClearDB() error {
func (k *Store) Close() error {
return k.db.Close()
}
func createBuckets(tx *bolt.Tx, buckets ...[]byte) error {
for _, bucket := range buckets {
if _, err := tx.CreateBucketIfNotExists(bucket); err != nil {
return err
}
}
return nil
}

View File

@@ -0,0 +1,39 @@
package kv
import (
"crypto/rand"
"fmt"
"math/big"
"os"
"path"
"testing"
"github.com/prysmaticlabs/prysm/shared/testutil"
)
// setupDB instantiates and returns a Store instance.
func setupDB(t testing.TB) *Store {
randPath, err := rand.Int(rand.Reader, big.NewInt(1000000))
if err != nil {
t.Fatalf("Could not generate random file path: %v", err)
}
path := path.Join(testutil.TempDir(), fmt.Sprintf("/%d", randPath))
if err := os.RemoveAll(path); err != nil {
t.Fatalf("Failed to remove directory: %v", err)
}
db, err := NewKVStore(path)
if err != nil {
t.Fatalf("Failed to instantiate DB: %v", err)
}
return db
}
// teardownDB cleans up a test Store instance.
func teardownDB(t testing.TB, db *Store) {
if err := db.Close(); err != nil {
t.Fatalf("Failed to close database: %v", err)
}
if err := os.RemoveAll(db.DatabasePath); err != nil {
t.Fatalf("Failed to remove directory: %v", err)
}
}

View File

@@ -0,0 +1,14 @@
package kv
// The schema will define how to store and retrieve data from the db.
// we can prefix or suffix certain values such as `block` with attributes
// for prefix-wide scans across the underlying BoltDB buckets when filtering data.
// For example, we might store attestations as shard + attestation_root -> attestation, making
// it easy to scan for keys that have a certain shard number as a prefix and return those
// corresponding attestations.
var (
attestationsBucket = []byte("attestations")
blocksBucket = []byte("blocks")
validatorsBucket = []byte("validators")
stateBucket = []byte("state")
)

View File

@@ -1,49 +1,109 @@
package kv
import (
"bytes"
"context"
"encoding/binary"
"github.com/boltdb/bolt"
"github.com/gogo/protobuf/proto"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
)
// ValidatorLatestVote retrieval by validator index.
// TODO(#3164): Implement.
func (k *Store) ValidatorLatestVote(ctx context.Context, validatorIdx uint64) (*pb.ValidatorLatestVote, error) {
return nil, nil
buf := uint64ToBytes(validatorIdx)
latestVote := &pb.ValidatorLatestVote{}
err := k.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(validatorsBucket)
enc := bkt.Get(buf)
if enc == nil {
return nil
}
return proto.Unmarshal(enc, latestVote)
})
return latestVote, err
}
// HasValidatorLatestVote verifies if a validator index has a latest vote stored in the db.
// TODO(#3164): Implement.
func (k *Store) HasValidatorLatestVote(ctx context.Context, validatorIdx uint64) bool {
return false
buf := uint64ToBytes(validatorIdx)
exists := false
// #nosec G104. Always returns nil.
k.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(validatorsBucket)
exists = bkt.Get(buf) != nil
return nil
})
return exists
}
// SaveValidatorLatestVote by validator index.
// TODO(#3164): Implement.
func (k *Store) SaveValidatorLatestVote(ctx context.Context, validatorIdx uint64, vote *pb.ValidatorLatestVote) error {
return nil
buf := uint64ToBytes(validatorIdx)
enc, err := proto.Marshal(vote)
if err != nil {
return err
}
return k.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(validatorsBucket)
return bucket.Put(buf, enc)
})
}
// ValidatorIndex by public key.
// TODO(#3164): Implement.
func (k *Store) ValidatorIndex(ctx context.Context, publicKey [48]byte) (uint64, error) {
return 0, nil
func (k *Store) ValidatorIndex(ctx context.Context, publicKey [48]byte) (uint64, bool, error) {
var validatorIdx uint64
var ok bool
err := k.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(validatorsBucket)
enc := bkt.Get(publicKey[:])
if enc == nil {
return nil
}
var err error
buf := bytes.NewBuffer(enc)
validatorIdx, err = binary.ReadUvarint(buf)
if err != nil {
return err
}
ok = true
return nil
})
return validatorIdx, ok, err
}
// HasValidatorIndex verifies if a validator's index by public key exists in the db.
// TODO(#3164): Implement.
func (k *Store) HasValidatorIndex(ctx context.Context, publicKey [48]byte) bool {
return false
exists := false
// #nosec G104. Always returns nil.
k.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(validatorsBucket)
exists = bkt.Get(publicKey[:]) != nil
return nil
})
return exists
}
// DeleteValidatorIndex clears a validator index from the db by the validator's public key.
// TODO(#3164): Implement.
func (k *Store) DeleteValidatorIndex(ctx context.Context, publicKey [48]byte) error {
return nil
return k.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(validatorsBucket)
return bucket.Delete(publicKey[:])
})
}
// SaveValidatorIndex by public key in the db.
// TODO(#3164): Implement.
func (k *Store) SaveValidatorIndex(ctx context.Context, publicKey [48]byte, validatorIdx uint64) error {
return nil
return k.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(validatorsBucket)
buf := uint64ToBytes(validatorIdx)
return bucket.Put(publicKey[:], buf)
})
}
func uint64ToBytes(i uint64) []byte {
buf := make([]byte, 8)
binary.LittleEndian.PutUint64(buf, i)
return buf
}

View File

@@ -0,0 +1,67 @@
package kv
import (
"context"
"testing"
"github.com/gogo/protobuf/proto"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
)
func TestStore_ValidatorIndexCRUD(t *testing.T) {
db := setupDB(t)
defer teardownDB(t, db)
validatorIdx := uint64(100)
pubKey := [48]byte{1, 2, 3, 4}
ctx := context.Background()
_, ok, err := db.ValidatorIndex(ctx, pubKey)
if err != nil {
t.Fatal(err)
}
if ok {
t.Fatal("Expected validator index to not exist")
}
if err := db.SaveValidatorIndex(ctx, pubKey, validatorIdx); err != nil {
t.Fatal(err)
}
retrievedIdx, ok, err := db.ValidatorIndex(ctx, pubKey)
if err != nil {
t.Fatal(err)
}
if !ok {
t.Fatal("Expected validator index to have been properly retrieved")
}
if retrievedIdx != validatorIdx {
t.Errorf("Wanted %d, received %d", validatorIdx, retrievedIdx)
}
if err := db.DeleteValidatorIndex(ctx, pubKey); err != nil {
t.Fatal(err)
}
if db.HasValidatorIndex(ctx, pubKey) {
t.Error("Expected validator index to have been deleted from the db")
}
}
func TestStore_ValidatorLatestVoteCRUD(t *testing.T) {
db := setupDB(t)
defer teardownDB(t, db)
validatorIdx := uint64(100)
latestVote := &pb.ValidatorLatestVote{
Epoch: 1,
Root: []byte("root"),
}
ctx := context.Background()
if err := db.SaveValidatorLatestVote(ctx, validatorIdx, latestVote); err != nil {
t.Fatal(err)
}
if !db.HasValidatorLatestVote(ctx, validatorIdx) {
t.Error("Expected validator latest vote to exist in the db")
}
retrievedVote, err := db.ValidatorLatestVote(ctx, validatorIdx)
if err != nil {
t.Fatal(err)
}
if !proto.Equal(latestVote, retrievedVote) {
t.Errorf("Wanted %d, received %d", latestVote, retrievedVote)
}
}