Implement Attestations DB Methods (#3183)

* begin db interface

* define the database interface

* interface definition simplifications

* include latest message proto

* modify pbs

* rem kv folder

* add filter interface

* lint

* ctx package is great

* interface getting better

* ctx everywhere...it's everywhere!

* block roots method

* new kv store initialization

* comments

* gaz

* implement interface

* refactor for proper naming conventions

* add todos

* proper comments

* rem unused

* add schema

* implementation simplicity

* has validator latest vote func impl

* retrieve validator latest vote

* has idx

* implement missing validator methods

* missing validator methods and test helpers

* validator index crud tests

* validator tests

* save attestation implementation

* attestation basic methods

* batch  save

* all buckets

* refactor with ok bool

* retrieval by root working

* todo for has attestations

* all tests passing, fmt, imports

* generate key use helper

* most att methods complete

* crud tests passing

* closer and closer to filtering all atts

* default no filter

* filter criteria functioning

* simplified conditional

* filter criteria func

* filter criteria

* filter criteria for atts there

* query filter map strategy

* internal filter api complete

* comments

* complete the passing of all other tests using criteria met

* imports

* fix broken build:

* breaking arg

* import sort groups

* keygen outside tx

* address feedback
This commit is contained in:
Raul Jordan
2019-08-13 11:04:33 -05:00
committed by GitHub
parent fa0ef76561
commit 8d8849feed
5 changed files with 297 additions and 17 deletions

View File

@@ -59,13 +59,13 @@ func (q *QueryFilter) Filters() map[FilterType]interface{} {
}
// SetRoot --
func (q *QueryFilter) SetRoot(val [32]byte) *QueryFilter {
func (q *QueryFilter) SetRoot(val []byte) *QueryFilter {
q.queries[Root] = val
return q
}
// SetParentRoot --
func (q *QueryFilter) SetParentRoot(val [32]byte) *QueryFilter {
func (q *QueryFilter) SetParentRoot(val []byte) *QueryFilter {
q.queries[ParentRoot] = val
return q
}

View File

@@ -8,8 +8,8 @@ func TestQueryFilter_ChainsCorrectly(t *testing.T) {
f := NewFilter().
SetStartSlot(2).
SetEndSlot(4).
SetParentRoot([32]byte{3, 4, 5}).
SetRoot([32]byte{}).
SetParentRoot([]byte{3, 4, 5}).
SetRoot([]byte{}).
SetShard(0)
filterSet := f.Filters()
if len(filterSet) != 5 {
@@ -22,7 +22,7 @@ func TestQueryFilter_ChainsCorrectly(t *testing.T) {
case EndSlot:
t.Log(v.(uint64))
case ParentRoot:
t.Log(v.([32]byte))
t.Log(v.([]byte))
case Shard:
t.Log(v.(uint64))
default:

View File

@@ -19,19 +19,24 @@ go_library(
"@com_github_boltdb_bolt//:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_go_ssz//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"attestations_test.go",
"kv_test.go",
"validators_test.go",
],
embed = [":go_default_library"],
deps = [
"//beacon-chain/db/filters:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//proto/eth/v1alpha1:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_prysmaticlabs_go_ssz//:go_default_library",
],
)

View File

@@ -1,44 +1,186 @@
package kv
import (
"bytes"
"context"
"reflect"
"github.com/boltdb/bolt"
"github.com/gogo/protobuf/proto"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
)
// Attestation retrieval by root.
// TODO(#3164): Implement.
func (k *Store) Attestation(ctx context.Context, attRoot [32]byte) (*ethpb.Attestation, error) {
return nil, nil
att := &ethpb.Attestation{}
err := k.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(attestationsBucket)
c := bkt.Cursor()
for k, v := c.Seek(attRoot[:]); k != nil && bytes.Contains(k, attRoot[:]); k, v = c.Next() {
if v != nil {
return proto.Unmarshal(v, att)
}
}
return nil
})
return att, err
}
// Attestations retrieves a list of attestations by filter criteria.
// TODO(#3164): Implement.
func (k *Store) Attestations(ctx context.Context, f *filters.QueryFilter) ([]*ethpb.Attestation, error) {
return nil, nil
atts := make([]*ethpb.Attestation, 0)
hasFilterSpecified := !reflect.DeepEqual(f, &filters.QueryFilter{}) && f != nil
err := k.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(attestationsBucket)
c := bkt.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
if v != nil && (!hasFilterSpecified || ensureAttestationFilterCriteria(k, f)) {
att := &ethpb.Attestation{}
if err := proto.Unmarshal(v, att); err != nil {
return err
}
atts = append(atts, att)
}
}
return nil
})
return atts, err
}
// HasAttestation checks if an attestation by root exists in the db.
// TODO(#3164): Implement.
func (k *Store) HasAttestation(ctx context.Context, attRoot [32]byte) bool {
return false
exists := false
// #nosec G104. Always returns nil.
k.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(attestationsBucket)
c := bkt.Cursor()
for k, v := c.Seek(attRoot[:]); k != nil && bytes.Contains(k, attRoot[:]); k, v = c.Next() {
if v != nil {
exists = true
return nil
}
}
return nil
})
return exists
}
// DeleteAttestation by root.
// TODO(#3164): Implement.
func (k *Store) DeleteAttestation(ctx context.Context, attRoot [32]byte) error {
return nil
return k.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(attestationsBucket)
c := bkt.Cursor()
for k, v := c.Seek(attRoot[:]); k != nil && bytes.Contains(k, attRoot[:]); k, v = c.Next() {
if v != nil {
return bkt.Delete(k)
}
}
return nil
})
}
// SaveAttestation to the db.
// TODO(#3164): Implement.
func (k *Store) SaveAttestation(ctx context.Context, att *ethpb.Attestation) error {
return nil
key, err := generateAttestationKey(att)
if err != nil {
return err
}
enc, err := proto.Marshal(att)
if err != nil {
return err
}
return k.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(attestationsBucket)
return bucket.Put(key, enc)
})
}
// SaveAttestations via batch updates to the db.
// TODO(#3164): Implement.
func (k *Store) SaveAttestations(ctx context.Context, atts []*ethpb.Attestation) error {
return nil
encodedValues := make([][]byte, len(atts))
keys := make([][]byte, len(atts))
for i := 0; i < len(atts); i++ {
enc, err := proto.Marshal(atts[i])
if err != nil {
return err
}
key, err := generateAttestationKey(atts[i])
if err != nil {
return err
}
encodedValues[i] = enc
keys[i] = key
}
return k.db.Batch(func(tx *bolt.Tx) error {
bucket := tx.Bucket(attestationsBucket)
for i := 0; i < len(atts); i++ {
if err := bucket.Put(keys[i], encodedValues[i]); err != nil {
return err
}
}
return nil
})
}
func generateAttestationKey(att *ethpb.Attestation) ([]byte, error) {
buf := make([]byte, 0)
buf = append(buf, []byte("shard")...)
buf = append(buf, uint64ToBytes(att.Data.Crosslink.Shard)...)
buf = append(buf, []byte("parent-root")...)
buf = append(buf, att.Data.Crosslink.ParentRoot...)
buf = append(buf, []byte("start-epoch")...)
buf = append(buf, uint64ToBytes(att.Data.Crosslink.StartEpoch)...)
buf = append(buf, []byte("end-epoch")...)
buf = append(buf, uint64ToBytes(att.Data.Crosslink.EndEpoch)...)
buf = append(buf, []byte("root")...)
attRoot, err := ssz.HashTreeRoot(att)
if err != nil {
return nil, err
}
buf = append(buf, attRoot[:]...)
return buf, nil
}
// ensureAttestationFilterCriteria uses a set of specified filters
// to ensure the byte key used for db lookups contains the correct values
// requested by the filter. For example, if a key looks like:
// root-0x23923-parent-root-0x49349-start-epoch-3-end-epoch-4-shard-5
// and our filter criteria wants the key to contain shard 5 and
// start epoch 5, the key will NOT meet all the filter criteria and this
// function will return false.
func ensureAttestationFilterCriteria(key []byte, f *filters.QueryFilter) bool {
numCriteriaMet := 0
for k, v := range f.Filters() {
switch k {
case filters.Root:
root := v.([]byte)
if bytes.Contains(key, append([]byte("root"), root[:]...)) {
numCriteriaMet++
}
case filters.ParentRoot:
root := v.([]byte)
if bytes.Contains(key, append([]byte("parent-root"), root[:]...)) {
numCriteriaMet++
}
case filters.StartEpoch:
if bytes.Contains(key, append([]byte("start-epoch"), uint64ToBytes(v.(uint64))...)) {
numCriteriaMet++
}
case filters.EndEpoch:
if bytes.Contains(key, append([]byte("end-epoch"), uint64ToBytes(v.(uint64))...)) {
numCriteriaMet++
}
case filters.Shard:
if bytes.Contains(key, append([]byte("shard"), uint64ToBytes(v.(uint64))...)) {
numCriteriaMet++
}
}
}
return numCriteriaMet == len(f.Filters())
}

View File

@@ -0,0 +1,133 @@
package kv
import (
"context"
"testing"
"github.com/gogo/protobuf/proto"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/db/filters"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
)
func TestStore_AttestationCRUD(t *testing.T) {
db := setupDB(t)
defer teardownDB(t, db)
att := &ethpb.Attestation{
Data: &ethpb.AttestationData{
Crosslink: &ethpb.Crosslink{
Shard: 5,
ParentRoot: []byte("parent"),
StartEpoch: 1,
EndEpoch: 2,
},
},
}
ctx := context.Background()
if err := db.SaveAttestation(ctx, att); err != nil {
t.Fatal(err)
}
attRoot, err := ssz.HashTreeRoot(att)
if err != nil {
t.Fatal(err)
}
if !db.HasAttestation(ctx, attRoot) {
t.Error("Expected attestation to exist in the db")
}
retrievedAtt, err := db.Attestation(ctx, attRoot)
if err != nil {
t.Fatal(err)
}
if !proto.Equal(att, retrievedAtt) {
t.Errorf("Wanted %v, received %v", att, retrievedAtt)
}
if err := db.DeleteAttestation(ctx, attRoot); err != nil {
t.Fatal(err)
}
if db.HasAttestation(ctx, attRoot) {
t.Error("Expected attestation to have been deleted from the db")
}
}
func TestStore_Attestations_FiltersCorrectly(t *testing.T) {
db := setupDB(t)
defer teardownDB(t, db)
atts := []*ethpb.Attestation{
{
Data: &ethpb.AttestationData{
Crosslink: &ethpb.Crosslink{
Shard: 5,
ParentRoot: []byte("parent"),
StartEpoch: 1,
EndEpoch: 2,
},
},
},
{
Data: &ethpb.AttestationData{
Crosslink: &ethpb.Crosslink{
Shard: 5,
ParentRoot: []byte("parent2"),
StartEpoch: 10,
EndEpoch: 11,
},
},
},
{
Data: &ethpb.AttestationData{
Crosslink: &ethpb.Crosslink{
Shard: 4,
ParentRoot: []byte("parent3"),
StartEpoch: 1,
EndEpoch: 20,
},
},
},
}
ctx := context.Background()
if err := db.SaveAttestations(ctx, atts); err != nil {
t.Fatal(err)
}
tests := []struct {
filter *filters.QueryFilter
expectedNumAtt int
}{
{
filter: filters.NewFilter().SetShard(5),
expectedNumAtt: 2,
},
{
filter: filters.NewFilter().SetStartEpoch(1),
expectedNumAtt: 2,
},
{
filter: filters.NewFilter().SetParentRoot([]byte("parent3")),
expectedNumAtt: 1,
},
{
// Only a single attestation in the list meets the composite filter criteria above.
filter: filters.NewFilter().SetShard(5).SetStartEpoch(1),
expectedNumAtt: 1,
},
{
// No specified filter should return all attestations.
filter: nil,
expectedNumAtt: 3,
},
{
// No attestation meets the criteria below.
filter: filters.NewFilter().SetShard(1000),
expectedNumAtt: 0,
},
}
for _, tt := range tests {
retrievedAtts, err := db.Attestations(ctx, tt.filter)
if err != nil {
t.Fatal(err)
}
if len(retrievedAtts) != tt.expectedNumAtt {
t.Errorf("Expected %d attestations, received %d", tt.expectedNumAtt, len(retrievedAtts))
}
}
}