Slasher db (#3270)

* first version of the watchtower api

* first commit

* remove watchtower

* working version

* fix < 0

* gaz

* Update slasher/db/db.go

* remove clear history

* moved constant to config

* gaz

* feedback changes

* compare uint64

* add constant config

* PruneSlasherStoragePeriod change
This commit is contained in:
shayzluf
2019-09-02 18:36:29 +03:00
committed by GitHub
parent 34a163b110
commit b32c19a004
7 changed files with 586 additions and 0 deletions

View File

@@ -96,6 +96,11 @@ type BeaconChainConfig struct {
EmptySignature [96]byte // EmptySignature is used to represent a zeroed out BLS Signature.
DefaultPageSize int // DefaultPageSize defines the default page size for RPC server request.
MaxPageSize int // MaxPageSize defines the max page size for RPC server respond.
// Slasher constants.
WeakSubjectivityPeriod uint64 // WeakSubjectivityPeriod defines the time period expressed in number of epochs were proof of stake network should validate block headers and attestations for slashable events.
PruneSlasherStoragePeriod uint64 // PruneSlasherStoragePeriod defines the time period expressed in number of epochs were proof of stake network should prune attestation and block header store.
}
// DepositContractConfig contains the deposits for
@@ -196,6 +201,10 @@ var defaultBeaconConfig = &BeaconChainConfig{
DefaultPageSize: 250,
MaxPageSize: 500,
// Slasher related values.
WeakSubjectivityPeriod: 54000,
PruneSlasherStoragePeriod: 10,
// Testnet misc values.
TestnetContractEndpoint: "https://beta.prylabs.net/contract", // defines an http endpoint to fetch the testnet contract addr.
}

34
slasher/db/BUILD.bazel Normal file
View File

@@ -0,0 +1,34 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"block_header.go",
"db.go",
"schema.go",
],
importpath = "github.com/prysmaticlabs/prysm/slasher/db",
visibility = ["//slasher:__subpackages__"],
deps = [
"//proto/eth/v1alpha1:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/params:go_default_library",
"@com_github_boltdb_bolt//:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = [
"block_header_test.go",
"db_test.go",
],
embed = [":go_default_library"],
deps = [
"//proto/eth/v1alpha1:go_default_library",
"//shared/testutil:go_default_library",
],
)

113
slasher/db/block_header.go Normal file
View File

@@ -0,0 +1,113 @@
package db
import (
"bytes"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/boltdb/bolt"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
)
func createBlockHeader(enc []byte) (*ethpb.BeaconBlockHeader, error) {
protoBlockHeader := &ethpb.BeaconBlockHeader{}
err := proto.Unmarshal(enc, protoBlockHeader)
if err != nil {
return nil, errors.Wrap(err, "failed to unmarshal encoding")
}
return protoBlockHeader, nil
}
// BlockHeader accepts an epoch and validator id and returns the corresponding block header array.
// Returns nil if the block header for those values does not exist.
func (db *Store) BlockHeader(epoch uint64, validatorID uint64) ([]*ethpb.BeaconBlockHeader, error) {
var bha []*ethpb.BeaconBlockHeader
err := db.view(func(tx *bolt.Tx) error {
c := tx.Bucket(historicBlockHeadersBucket).Cursor()
prefix := encodeEpochValidatorID(epoch, validatorID)
for k, v := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, v = c.Next() {
bh, err := createBlockHeader(v)
if err != nil {
return err
}
bha = append(bha, bh)
}
return nil
})
return bha, err
}
// HasBlockHeader accepts an epoch and validator id and returns true if the block header exists.
func (db *Store) HasBlockHeader(epoch uint64, validatorID uint64) bool {
prefix := encodeEpochValidatorID(epoch, validatorID)
var hasBlockHeader bool
// #nosec G104
_ = db.view(func(tx *bolt.Tx) error {
c := tx.Bucket(historicBlockHeadersBucket).Cursor()
for k, _ := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, _ = c.Next() {
hasBlockHeader = true
return nil
}
hasBlockHeader = false
return nil
})
return hasBlockHeader
}
// SaveBlockHeader accepts a block header and writes it to disk.
func (db *Store) SaveBlockHeader(epoch uint64, validatorID uint64, blockHeader *ethpb.BeaconBlockHeader) error {
key := encodeEpochValidatorIDSig(epoch, validatorID, blockHeader.Signature)
enc, err := proto.Marshal(blockHeader)
if err != nil {
return errors.Wrap(err, "failed to encode block")
}
err = db.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicBlockHeadersBucket)
if err := bucket.Put(key, enc); err != nil {
return errors.Wrap(err, "failed to include the block header in the historic block header bucket")
}
return err
})
// prune history to max size every 10th epoch
if epoch%params.BeaconConfig().PruneSlasherStoragePeriod == 0 {
err = db.pruneHistory(epoch, params.BeaconConfig().WeakSubjectivityPeriod)
}
return err
}
// DeleteBlockHeader deletes a block header using the epoch and validator id.
func (db *Store) DeleteBlockHeader(epoch uint64, validatorID uint64, blockHeader *ethpb.BeaconBlockHeader) error {
key := encodeEpochValidatorIDSig(epoch, validatorID, blockHeader.Signature)
return db.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicBlockHeadersBucket)
if err := bucket.Delete(key); err != nil {
return errors.Wrap(err, "failed to delete the block header from historic block header bucket")
}
return bucket.Delete(key)
})
}
func (db *Store) pruneHistory(currentEpoch uint64, historySize uint64) error {
pruneTill := int64(currentEpoch) - int64(historySize)
if pruneTill <= 0 {
return nil
}
return db.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicBlockHeadersBucket)
c := tx.Bucket(historicBlockHeadersBucket).Cursor()
for k, _ := c.First(); k != nil && bytesutil.FromBytes8(k[:8]) <= uint64(pruneTill); k, _ = c.Next() {
if err := bucket.Delete(k); err != nil {
return errors.Wrap(err, "failed to delete the block header from historic block header bucket")
}
}
return nil
})
}

View File

@@ -0,0 +1,254 @@
package db
import (
"reflect"
"testing"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
)
func TestNilDBHistoryBlkHdr(t *testing.T) {
db := setupDB(t)
defer teardownDB(t, db)
epoch := uint64(1)
validatorID := uint64(1)
hasBlockHeader := db.HasBlockHeader(epoch, validatorID)
if hasBlockHeader {
t.Fatal("HasBlockHeader should return false")
}
bPrime, err := db.BlockHeader(epoch, validatorID)
if err != nil {
t.Fatalf("failed to get block: %v", err)
}
if bPrime != nil {
t.Fatalf("get should return nil for a non existent key")
}
}
func TestSaveHistoryBlkHdr(t *testing.T) {
db := setupDB(t)
defer teardownDB(t, db)
tests := []struct {
epoch uint64
vID uint64
bh *ethpb.BeaconBlockHeader
}{
{
epoch: uint64(0),
vID: uint64(0),
bh: &ethpb.BeaconBlockHeader{Signature: []byte("let me in")},
},
{
epoch: uint64(0),
vID: uint64(1),
bh: &ethpb.BeaconBlockHeader{Signature: []byte("let me in 2nd")},
},
{
epoch: uint64(1),
vID: uint64(0),
bh: &ethpb.BeaconBlockHeader{Signature: []byte("let me in 3rd")},
},
}
for _, tt := range tests {
err := db.SaveBlockHeader(tt.epoch, tt.vID, tt.bh)
if err != nil {
t.Fatalf("save block failed: %v", err)
}
bha, err := db.BlockHeader(tt.epoch, tt.vID)
if err != nil {
t.Fatalf("failed to get block: %v", err)
}
if bha == nil || !reflect.DeepEqual(bha[0], tt.bh) {
t.Fatalf("get should return bh: %v", bha)
}
}
}
func TestDeleteHistoryBlkHdr(t *testing.T) {
db := setupDB(t)
defer teardownDB(t, db)
tests := []struct {
epoch uint64
vID uint64
bh *ethpb.BeaconBlockHeader
}{
{
epoch: uint64(0),
vID: uint64(0),
bh: &ethpb.BeaconBlockHeader{Signature: []byte("let me in")},
},
{
epoch: uint64(0),
vID: uint64(1),
bh: &ethpb.BeaconBlockHeader{Signature: []byte("let me in 2nd")},
},
{
epoch: uint64(1),
vID: uint64(0),
bh: &ethpb.BeaconBlockHeader{Signature: []byte("let me in 3rd")},
},
}
for _, tt := range tests {
err := db.SaveBlockHeader(tt.epoch, tt.vID, tt.bh)
if err != nil {
t.Fatalf("save block failed: %v", err)
}
}
for _, tt := range tests {
bha, err := db.BlockHeader(tt.epoch, tt.vID)
if err != nil {
t.Fatalf("failed to get block: %v", err)
}
if bha == nil || !reflect.DeepEqual(bha[0], tt.bh) {
t.Fatalf("get should return bh: %v", bha)
}
err = db.DeleteBlockHeader(tt.epoch, tt.vID, tt.bh)
if err != nil {
t.Fatalf("save block failed: %v", err)
}
bh, err := db.BlockHeader(tt.epoch, tt.vID)
if err != nil {
t.Fatal(err)
}
if bh != nil {
t.Errorf("Expected block to have been deleted, received: %v", bh)
}
}
}
func TestHasHistoryBlkHdr(t *testing.T) {
db := setupDB(t)
defer teardownDB(t, db)
tests := []struct {
epoch uint64
vID uint64
bh *ethpb.BeaconBlockHeader
}{
{
epoch: uint64(0),
vID: uint64(0),
bh: &ethpb.BeaconBlockHeader{Signature: []byte("let me in")},
},
{
epoch: uint64(0),
vID: uint64(1),
bh: &ethpb.BeaconBlockHeader{Signature: []byte("let me in 2nd")},
},
{
epoch: uint64(1),
vID: uint64(0),
bh: &ethpb.BeaconBlockHeader{Signature: []byte("let me in 3rd")},
},
}
for _, tt := range tests {
found := db.HasBlockHeader(tt.epoch, tt.vID)
if found {
t.Fatal("has block header should return false for block headers that are not in db")
}
err := db.SaveBlockHeader(tt.epoch, tt.vID, tt.bh)
if err != nil {
t.Fatalf("save block failed: %v", err)
}
}
for _, tt := range tests {
err := db.SaveBlockHeader(tt.epoch, tt.vID, tt.bh)
if err != nil {
t.Fatalf("save block failed: %v", err)
}
found := db.HasBlockHeader(tt.epoch, tt.vID)
if !found {
t.Fatal("has block header should return true")
}
}
}
func TestPruneHistoryBlkHdr(t *testing.T) {
db := setupDB(t)
defer teardownDB(t, db)
tests := []struct {
epoch uint64
vID uint64
bh *ethpb.BeaconBlockHeader
}{
{
epoch: uint64(0),
vID: uint64(0),
bh: &ethpb.BeaconBlockHeader{Signature: []byte("let me in")},
},
{
epoch: uint64(0),
vID: uint64(1),
bh: &ethpb.BeaconBlockHeader{Signature: []byte("let me in 2nd")},
},
{
epoch: uint64(1),
vID: uint64(0),
bh: &ethpb.BeaconBlockHeader{Signature: []byte("let me in 3rd")},
},
{
epoch: uint64(2),
vID: uint64(0),
bh: &ethpb.BeaconBlockHeader{Signature: []byte("let me in 4th")},
},
{
epoch: uint64(3),
vID: uint64(0),
bh: &ethpb.BeaconBlockHeader{Signature: []byte("let me in 5th")},
},
}
for _, tt := range tests {
err := db.SaveBlockHeader(tt.epoch, tt.vID, tt.bh)
if err != nil {
t.Fatalf("save block header failed: %v", err)
}
bha, err := db.BlockHeader(tt.epoch, tt.vID)
if err != nil {
t.Fatalf("failed to get block header: %v", err)
}
if bha == nil || !reflect.DeepEqual(bha[0], tt.bh) {
t.Fatalf("get should return bh: %v", bha)
}
}
currentEpoch := uint64(3)
historyToKeep := uint64(2)
err := db.pruneHistory(currentEpoch, historyToKeep)
if err != nil {
t.Fatalf("failed to prune: %v", err)
}
for _, tt := range tests {
bha, err := db.BlockHeader(tt.epoch, tt.vID)
if err != nil {
t.Fatalf("failed to get block header: %v", err)
}
if tt.epoch > currentEpoch-historyToKeep {
if bha == nil || !reflect.DeepEqual(bha[0], tt.bh) {
t.Fatalf("get should return bh: %v", bha)
}
} else {
if bha != nil {
t.Fatalf("block header should have been pruned: %v", bha)
}
}
}
}

103
slasher/db/db.go Normal file
View File

@@ -0,0 +1,103 @@
package db
import (
"io"
"os"
"path"
"time"
"github.com/boltdb/bolt"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
var log = logrus.WithField("prefix", "beacondb")
// Database defines the necessary methods for slasher service which may
// be implemented by any key-value or relational database in practice.
type Database interface {
io.Closer
DatabasePath() string
ClearDB() error
}
// Store defines an implementation of the Prysm Database interface
// using BoltDB as the underlying persistent kv-store for eth2.
type Store struct {
db *bolt.DB
databasePath string
}
// Close closes the underlying boltdb database.
func (db *Store) Close() error {
return db.db.Close()
}
func (db *Store) update(fn func(*bolt.Tx) error) error {
return db.db.Update(fn)
}
func (db *Store) batch(fn func(*bolt.Tx) error) error {
return db.db.Batch(fn)
}
func (db *Store) view(fn func(*bolt.Tx) error) error {
return db.db.View(fn)
}
// NewDB initializes a new DB.
func NewDB(dirPath string) (*Store, error) {
return NewKVStore(dirPath)
}
// ClearDB removes the previously stored directory at the data directory.
func (db *Store) ClearDB() error {
if _, err := os.Stat(db.databasePath); os.IsNotExist(err) {
return nil
}
return os.RemoveAll(db.databasePath)
}
// DatabasePath at which this database writes files.
func (db *Store) DatabasePath() string {
return db.databasePath
}
func createBuckets(tx *bolt.Tx, buckets ...[]byte) error {
for _, bucket := range buckets {
if _, err := tx.CreateBucketIfNotExists(bucket); err != nil {
return err
}
}
return nil
}
// NewKVStore initializes a new boltDB key-value store at the directory
// path specified, creates the kv-buckets based on the schema, and stores
// an open connection db object as a property of the Store struct.
func NewKVStore(dirPath string) (*Store, error) {
if err := os.MkdirAll(dirPath, 0700); err != nil {
return nil, err
}
datafile := path.Join(dirPath, "slasher.db")
boltDB, err := bolt.Open(datafile, 0600, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
if err == bolt.ErrTimeout {
return nil, errors.New("cannot obtain database lock, database may be in use by another process")
}
return nil, err
}
kv := &Store{db: boltDB, databasePath: dirPath}
if err := kv.db.Update(func(tx *bolt.Tx) error {
return createBuckets(
tx,
historicAttestationsBucket,
historicBlockHeadersBucket,
)
}); err != nil {
return nil, err
}
return kv, err
}

54
slasher/db/db_test.go Normal file
View File

@@ -0,0 +1,54 @@
package db
import (
"crypto/rand"
"fmt"
"math/big"
"os"
"path"
"testing"
"github.com/prysmaticlabs/prysm/shared/testutil"
)
var _ = Database(&Store{})
// setupDB instantiates and returns a SlasherDB instance.
func setupDB(t testing.TB) *Store {
randPath, err := rand.Int(rand.Reader, big.NewInt(1000000))
if err != nil {
t.Fatalf("Could not generate random file path: %v", err)
}
path := path.Join(testutil.TempDir(), fmt.Sprintf("/%d", randPath))
if err := os.RemoveAll(path); err != nil {
t.Fatalf("Failed to remove directory: %v", err)
}
db, err := NewDB(path)
if err != nil {
t.Fatalf("Failed to instantiate DB: %v", err)
}
return db
}
// teardownDB cleans up a test BeaconDB instance.
func teardownDB(t testing.TB, db *Store) {
if err := db.Close(); err != nil {
t.Fatalf("Failed to close database: %v", err)
}
if err := os.RemoveAll(db.DatabasePath()); err != nil {
t.Fatalf("Failed to remove directory: %v", err)
}
}
func TestClearDB(t *testing.T) {
slasherDB := setupDB(t)
if err := slasherDB.ClearDB(); err != nil {
t.Fatal(err)
}
if _, err := os.Stat(slasherDB.DatabasePath()); !os.IsNotExist(err) {
t.Fatalf("db wasnt cleared %v", err)
}
}

19
slasher/db/schema.go Normal file
View File

@@ -0,0 +1,19 @@
package db
import (
"github.com/prysmaticlabs/prysm/shared/bytesutil"
)
var (
// Slasher
historicAttestationsBucket = []byte("historic-attestations-bucket")
historicBlockHeadersBucket = []byte("historic-block-headers-bucket")
)
func encodeEpochValidatorID(epoch uint64, validatorID uint64) []byte {
return append(bytesutil.Bytes8(epoch), bytesutil.Bytes8(validatorID)...)
}
func encodeEpochValidatorIDSig(epoch uint64, validatorID uint64, sig []byte) []byte {
return append(append(bytesutil.Bytes8(epoch), bytesutil.Bytes8(validatorID)...), sig...)
}