Add spans to Slasher DB functions (#4855)

* Add interface and move slashing types to /types package

* Add spans for all DB functions

* Fix packages

* Fix func call
This commit is contained in:
Ivan Martinez
2020-02-13 12:51:30 -07:00
committed by GitHub
parent c44a30672e
commit 2473680759
23 changed files with 400 additions and 284 deletions

View File

@@ -1,6 +1,7 @@
package iface
import (
"context"
"io"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
@@ -11,64 +12,64 @@ import (
// ReadOnlyDatabase represents a read only database with functions that do not modify the DB.
type ReadOnlyDatabase interface {
// AttesterSlashing related methods.
AttesterSlashings(status types.SlashingStatus) ([]*ethpb.AttesterSlashing, error)
DeleteAttesterSlashing(attesterSlashing *ethpb.AttesterSlashing) error
HasAttesterSlashing(slashing *ethpb.AttesterSlashing) (bool, types.SlashingStatus, error)
GetLatestEpochDetected() (uint64, error)
AttesterSlashings(ctx context.Context, status types.SlashingStatus) ([]*ethpb.AttesterSlashing, error)
DeleteAttesterSlashing(ctx context.Context, attesterSlashing *ethpb.AttesterSlashing) error
HasAttesterSlashing(ctx context.Context, slashing *ethpb.AttesterSlashing) (bool, types.SlashingStatus, error)
GetLatestEpochDetected(ctx context.Context) (uint64, error)
// BlockHeader related methods.
BlockHeaders(epoch uint64, validatorID uint64) ([]*ethpb.SignedBeaconBlockHeader, error)
HasBlockHeader(epoch uint64, validatorID uint64) bool
BlockHeaders(ctx context.Context, epoch uint64, validatorID uint64) ([]*ethpb.SignedBeaconBlockHeader, error)
HasBlockHeader(ctx context.Context, epoch uint64, validatorID uint64) bool
// IndexedAttestations related methods.
IdxAttsForTargetFromID(targetEpoch uint64, validatorID uint64) ([]*ethpb.IndexedAttestation, error)
IdxAttsForTarget(targetEpoch uint64) ([]*ethpb.IndexedAttestation, error)
LatestIndexedAttestationsTargetEpoch() (uint64, error)
LatestValidatorIdx() (uint64, error)
DoubleVotes(validatorIdx uint64, dataRoot []byte, origAtt *ethpb.IndexedAttestation) ([]*ethpb.AttesterSlashing, error)
HasIndexedAttestation(targetEpoch uint64, validatorID uint64) (bool, error)
IdxAttsForTargetFromID(ctx context.Context, targetEpoch uint64, validatorID uint64) ([]*ethpb.IndexedAttestation, error)
IdxAttsForTarget(ctx context.Context, targetEpoch uint64) ([]*ethpb.IndexedAttestation, error)
LatestIndexedAttestationsTargetEpoch(ctx context.Context) (uint64, error)
LatestValidatorIdx(ctx context.Context) (uint64, error)
DoubleVotes(ctx context.Context, validatorIdx uint64, dataRoot []byte, origAtt *ethpb.IndexedAttestation) ([]*ethpb.AttesterSlashing, error)
HasIndexedAttestation(ctx context.Context, targetEpoch uint64, validatorID uint64) (bool, error)
// MinMaxSpan related methods.
ValidatorSpansMap(validatorIdx uint64) (*slashpb.EpochSpanMap, error)
ValidatorSpansMap(ctx context.Context, validatorIdx uint64) (*slashpb.EpochSpanMap, error)
// ProposerSlashing related methods.
ProposalSlashingsByStatus(status types.SlashingStatus) ([]*ethpb.ProposerSlashing, error)
HasProposerSlashing(slashing *ethpb.ProposerSlashing) (bool, types.SlashingStatus, error)
ProposalSlashingsByStatus(ctx context.Context, status types.SlashingStatus) ([]*ethpb.ProposerSlashing, error)
HasProposerSlashing(ctx context.Context, slashing *ethpb.ProposerSlashing) (bool, types.SlashingStatus, error)
// Validator Index -> Pubkey related methods.
ValidatorPubKey(validatorID uint64) ([]byte, error)
ValidatorPubKey(ctx context.Context, validatorID uint64) ([]byte, error)
}
// WriteAccessDatabase represents a write access database with only functions that can modify the DB.
type WriteAccessDatabase interface {
// AttesterSlashing related methods.
SaveAttesterSlashing(status types.SlashingStatus, slashing *ethpb.AttesterSlashing) error
SaveAttesterSlashings(status types.SlashingStatus, slashings []*ethpb.AttesterSlashing) error
SetLatestEpochDetected(epoch uint64) error
SaveAttesterSlashing(ctx context.Context, status types.SlashingStatus, slashing *ethpb.AttesterSlashing) error
SaveAttesterSlashings(ctx context.Context, status types.SlashingStatus, slashings []*ethpb.AttesterSlashing) error
SetLatestEpochDetected(ctx context.Context, epoch uint64) error
// BlockHeader related methods.
SaveBlockHeader(epoch uint64, validatorID uint64, blockHeader *ethpb.SignedBeaconBlockHeader) error
DeleteBlockHeader(epoch uint64, validatorID uint64, blockHeader *ethpb.SignedBeaconBlockHeader) error
PruneBlockHistory(currentEpoch uint64, pruningEpochAge uint64) error
SaveBlockHeader(ctx context.Context, epoch uint64, validatorID uint64, blockHeader *ethpb.SignedBeaconBlockHeader) error
DeleteBlockHeader(ctx context.Context, epoch uint64, validatorID uint64, blockHeader *ethpb.SignedBeaconBlockHeader) error
PruneBlockHistory(ctx context.Context, currentEpoch uint64, pruningEpochAge uint64) error
// IndexedAttestations related methods.
SaveIndexedAttestation(idxAttestation *ethpb.IndexedAttestation) error
DeleteIndexedAttestation(idxAttestation *ethpb.IndexedAttestation) error
PruneAttHistory(currentEpoch uint64, pruningEpochAge uint64) error
SaveIndexedAttestation(ctx context.Context, idxAttestation *ethpb.IndexedAttestation) error
DeleteIndexedAttestation(ctx context.Context, idxAttestation *ethpb.IndexedAttestation) error
PruneAttHistory(ctx context.Context, currentEpoch uint64, pruningEpochAge uint64) error
// MinMaxSpan related methods.
SaveValidatorSpansMap(validatorIdx uint64, spanMap *slashpb.EpochSpanMap) error
SaveCachedSpansMaps() error
DeleteValidatorSpanMap(validatorIdx uint64) error
SaveValidatorSpansMap(ctx context.Context, validatorIdx uint64, spanMap *slashpb.EpochSpanMap) error
SaveCachedSpansMaps(ctx context.Context) error
DeleteValidatorSpanMap(ctx context.Context, validatorIdx uint64) error
// ProposerSlashing related methods.
DeleteProposerSlashing(slashing *ethpb.ProposerSlashing) error
SaveProposerSlashing(status types.SlashingStatus, slashing *ethpb.ProposerSlashing) error
SaveProposerSlashings(status types.SlashingStatus, slashings []*ethpb.ProposerSlashing) error
DeleteProposerSlashing(ctx context.Context, slashing *ethpb.ProposerSlashing) error
SaveProposerSlashing(ctx context.Context, status types.SlashingStatus, slashing *ethpb.ProposerSlashing) error
SaveProposerSlashings(ctx context.Context, status types.SlashingStatus, slashings []*ethpb.ProposerSlashing) error
// Validator Index -> Pubkey related methods.
SavePubKey(validatorID uint64, pubKey []byte) error
DeletePubKey(validatorID uint64) error
SavePubKey(ctx context.Context, validatorID uint64, pubKey []byte) error
DeletePubKey(ctx context.Context, validatorID uint64) error
}
// FullAccessDatabase represents a full access database with only DB interaction functions.

View File

@@ -26,6 +26,7 @@ go_library(
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
],
)

View File

@@ -2,6 +2,7 @@ package kv
import (
"bytes"
"context"
"github.com/boltdb/bolt"
"github.com/gogo/protobuf/proto"
@@ -10,6 +11,7 @@ import (
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/slasher/db/types"
"go.opencensus.io/trace"
)
func unmarshalAttSlashing(enc []byte) (*ethpb.AttesterSlashing, error) {
@@ -35,7 +37,9 @@ func unmarshalAttSlashings(encoded [][]byte) ([]*ethpb.AttesterSlashing, error)
// AttesterSlashings accepts a status and returns all slashings with this status.
// returns empty []*ethpb.AttesterSlashing if no slashing has been found with this status.
func (db *Store) AttesterSlashings(status types.SlashingStatus) ([]*ethpb.AttesterSlashing, error) {
func (db *Store) AttesterSlashings(ctx context.Context, status types.SlashingStatus) ([]*ethpb.AttesterSlashing, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.AttesterSlashings")
defer span.End()
encoded := make([][]byte, 0)
err := db.view(func(tx *bolt.Tx) error {
c := tx.Bucket(slashingBucket).Cursor()
@@ -54,7 +58,9 @@ func (db *Store) AttesterSlashings(status types.SlashingStatus) ([]*ethpb.Attest
}
// DeleteAttesterSlashing deletes an attester slashing proof from db.
func (db *Store) DeleteAttesterSlashing(attesterSlashing *ethpb.AttesterSlashing) error {
func (db *Store) DeleteAttesterSlashing(ctx context.Context, attesterSlashing *ethpb.AttesterSlashing) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.DeleteAttesterSlashing")
defer span.End()
root, err := hashutil.HashProto(attesterSlashing)
if err != nil {
return errors.Wrap(err, "failed to get hash root of attesterSlashing")
@@ -73,7 +79,9 @@ func (db *Store) DeleteAttesterSlashing(attesterSlashing *ethpb.AttesterSlashing
}
// HasAttesterSlashing returns true and slashing status if a slashing is found in the db.
func (db *Store) HasAttesterSlashing(slashing *ethpb.AttesterSlashing) (bool, types.SlashingStatus, error) {
func (db *Store) HasAttesterSlashing(ctx context.Context, slashing *ethpb.AttesterSlashing) (bool, types.SlashingStatus, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.HasAttesterSlashing")
defer span.End()
var status types.SlashingStatus
var found bool
root, err := hashutil.HashProto(slashing)
@@ -94,7 +102,9 @@ func (db *Store) HasAttesterSlashing(slashing *ethpb.AttesterSlashing) (bool, ty
}
// SaveAttesterSlashing accepts a slashing proof and its status and writes it to disk.
func (db *Store) SaveAttesterSlashing(status types.SlashingStatus, slashing *ethpb.AttesterSlashing) error {
func (db *Store) SaveAttesterSlashing(ctx context.Context, status types.SlashingStatus, slashing *ethpb.AttesterSlashing) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveAttesterSlashing")
defer span.End()
enc, err := proto.Marshal(slashing)
if err != nil {
return errors.Wrap(err, "failed to marshal")
@@ -109,7 +119,9 @@ func (db *Store) SaveAttesterSlashing(status types.SlashingStatus, slashing *eth
}
// SaveAttesterSlashings accepts a slice of slashing proof and its status and writes it to disk.
func (db *Store) SaveAttesterSlashings(status types.SlashingStatus, slashings []*ethpb.AttesterSlashing) error {
func (db *Store) SaveAttesterSlashings(ctx context.Context, status types.SlashingStatus, slashings []*ethpb.AttesterSlashing) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveAttesterSlashings")
defer span.End()
enc := make([][]byte, len(slashings))
key := make([][]byte, len(slashings))
var err error
@@ -135,7 +147,9 @@ func (db *Store) SaveAttesterSlashings(status types.SlashingStatus, slashings []
}
// GetLatestEpochDetected returns the latest detected epoch from db.
func (db *Store) GetLatestEpochDetected() (uint64, error) {
func (db *Store) GetLatestEpochDetected(ctx context.Context) (uint64, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.GetLatestEpochDetected")
defer span.End()
var epoch uint64
err := db.view(func(tx *bolt.Tx) error {
b := tx.Bucket(slashingBucket)
@@ -151,7 +165,9 @@ func (db *Store) GetLatestEpochDetected() (uint64, error) {
}
// SetLatestEpochDetected sets the latest slashing detected epoch in db.
func (db *Store) SetLatestEpochDetected(epoch uint64) error {
func (db *Store) SetLatestEpochDetected(ctx context.Context, epoch uint64) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.SetLatestEpochDetected")
defer span.End()
return db.update(func(tx *bolt.Tx) error {
b := tx.Bucket(slashingBucket)
err := b.Put([]byte(latestEpochKey), bytesutil.Bytes8(epoch))

View File

@@ -1,6 +1,7 @@
package kv
import (
"context"
"flag"
"reflect"
"sort"
@@ -14,11 +15,12 @@ import (
func TestStore_AttesterSlashingNilBucket(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
as := &ethpb.AttesterSlashing{Attestation_1: &ethpb.IndexedAttestation{Signature: []byte("hello")}}
has, _, err := db.HasAttesterSlashing(as)
has, _, err := db.HasAttesterSlashing(ctx, as)
if err != nil {
t.Fatalf("HasAttesterSlashing should not return error: %v", err)
}
@@ -26,7 +28,7 @@ func TestStore_AttesterSlashingNilBucket(t *testing.T) {
t.Fatal("HasAttesterSlashing should return false")
}
p, err := db.AttesterSlashings(types.SlashingStatus(types.Active))
p, err := db.AttesterSlashings(ctx, types.SlashingStatus(types.Active))
if err != nil {
t.Fatalf("Failed to get attester slashing: %v", err)
}
@@ -38,9 +40,10 @@ func TestStore_AttesterSlashingNilBucket(t *testing.T) {
func TestStore_SaveAttesterSlashing(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
tests := []struct {
ss types.SlashingStatus
as *ethpb.AttesterSlashing
@@ -60,12 +63,12 @@ func TestStore_SaveAttesterSlashing(t *testing.T) {
}
for _, tt := range tests {
err := db.SaveAttesterSlashing(tt.ss, tt.as)
err := db.SaveAttesterSlashing(ctx, tt.ss, tt.as)
if err != nil {
t.Fatalf("save attester slashing failed: %v", err)
}
attesterSlashings, err := db.AttesterSlashings(tt.ss)
attesterSlashings, err := db.AttesterSlashings(ctx, tt.ss)
if err != nil {
t.Fatalf("failed to get attester slashings: %v", err)
}
@@ -80,19 +83,20 @@ func TestStore_SaveAttesterSlashing(t *testing.T) {
func TestStore_SaveAttesterSlashings(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
as := []*ethpb.AttesterSlashing{
{Attestation_1: &ethpb.IndexedAttestation{Signature: []byte("1")}},
{Attestation_1: &ethpb.IndexedAttestation{Signature: []byte("2")}},
{Attestation_1: &ethpb.IndexedAttestation{Signature: []byte("3")}},
}
err := db.SaveAttesterSlashings(types.Active, as)
err := db.SaveAttesterSlashings(ctx, types.Active, as)
if err != nil {
t.Fatalf("save attester slashing failed: %v", err)
}
attesterSlashings, err := db.AttesterSlashings(types.Active)
attesterSlashings, err := db.AttesterSlashings(ctx, types.Active)
if err != nil {
t.Fatalf("failed to get attester slashings: %v", err)
}
@@ -107,9 +111,10 @@ func TestStore_SaveAttesterSlashings(t *testing.T) {
func TestStore_UpdateAttesterSlashingStatus(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
tests := []struct {
ss types.SlashingStatus
as *ethpb.AttesterSlashing
@@ -129,14 +134,14 @@ func TestStore_UpdateAttesterSlashingStatus(t *testing.T) {
}
for _, tt := range tests {
err := db.SaveAttesterSlashing(tt.ss, tt.as)
err := db.SaveAttesterSlashing(ctx, tt.ss, tt.as)
if err != nil {
t.Fatalf("save attester slashing failed: %v", err)
}
}
for _, tt := range tests {
has, st, err := db.HasAttesterSlashing(tt.as)
has, st, err := db.HasAttesterSlashing(ctx, tt.as)
if err != nil {
t.Fatalf("Failed to get attester slashing: %v", err)
}
@@ -147,8 +152,8 @@ func TestStore_UpdateAttesterSlashingStatus(t *testing.T) {
t.Fatalf("Failed to find attester slashing with the correct status: %v", tt.as)
}
err = db.SaveAttesterSlashing(types.SlashingStatus(types.Included), tt.as)
has, st, err = db.HasAttesterSlashing(tt.as)
err = db.SaveAttesterSlashing(ctx, types.SlashingStatus(types.Included), tt.as)
has, st, err = db.HasAttesterSlashing(ctx, tt.as)
if err != nil {
t.Fatalf("Failed to get attester slashing: %v", err)
}
@@ -158,18 +163,17 @@ func TestStore_UpdateAttesterSlashingStatus(t *testing.T) {
if st != types.Included {
t.Fatalf("Failed to find attester slashing with the correct status: %v", tt.as)
}
}
}
func TestStore_LatestEpochDetected(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
e, err := db.GetLatestEpochDetected()
ctx := context.Background()
e, err := db.GetLatestEpochDetected(ctx)
if err != nil {
t.Fatalf("Get latest epoch detected failed: %v", err)
}
@@ -177,11 +181,11 @@ func TestStore_LatestEpochDetected(t *testing.T) {
t.Fatalf("Latest epoch detected should have been 0 before setting got: %d", e)
}
epoch := uint64(1)
err = db.SetLatestEpochDetected(epoch)
err = db.SetLatestEpochDetected(ctx, epoch)
if err != nil {
t.Fatalf("Set latest epoch detected failed: %v", err)
}
e, err = db.GetLatestEpochDetected()
e, err = db.GetLatestEpochDetected(ctx)
if err != nil {
t.Fatalf("Get latest epoch detected failed: %v", err)
}

View File

@@ -2,6 +2,7 @@ package kv
import (
"bytes"
"context"
"github.com/boltdb/bolt"
"github.com/gogo/protobuf/proto"
@@ -9,9 +10,12 @@ import (
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"go.opencensus.io/trace"
)
func unmarshalBlockHeader(enc []byte) (*ethpb.SignedBeaconBlockHeader, error) {
func unmarshalBlockHeader(ctx context.Context, enc []byte) (*ethpb.SignedBeaconBlockHeader, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.unmarshalBlockHeader")
defer span.End()
protoBlockHeader := &ethpb.SignedBeaconBlockHeader{}
err := proto.Unmarshal(enc, protoBlockHeader)
if err != nil {
@@ -22,13 +26,15 @@ func unmarshalBlockHeader(enc []byte) (*ethpb.SignedBeaconBlockHeader, error) {
// BlockHeaders accepts an epoch and validator id and returns the corresponding block header array.
// Returns nil if the block header for those values does not exist.
func (db *Store) BlockHeaders(epoch uint64, validatorID uint64) ([]*ethpb.SignedBeaconBlockHeader, error) {
func (db *Store) BlockHeaders(ctx context.Context, epoch uint64, validatorID uint64) ([]*ethpb.SignedBeaconBlockHeader, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.BlockHeaders")
defer span.End()
var blockHeaders []*ethpb.SignedBeaconBlockHeader
err := db.view(func(tx *bolt.Tx) error {
c := tx.Bucket(historicBlockHeadersBucket).Cursor()
prefix := encodeEpochValidatorID(epoch, validatorID)
for k, v := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, v = c.Next() {
bh, err := unmarshalBlockHeader(v)
bh, err := unmarshalBlockHeader(ctx, v)
if err != nil {
return err
}
@@ -40,7 +46,9 @@ func (db *Store) BlockHeaders(epoch uint64, validatorID uint64) ([]*ethpb.Signed
}
// HasBlockHeader accepts an epoch and validator id and returns true if the block header exists.
func (db *Store) HasBlockHeader(epoch uint64, validatorID uint64) bool {
func (db *Store) HasBlockHeader(ctx context.Context, epoch uint64, validatorID uint64) bool {
ctx, span := trace.StartSpan(ctx, "SlasherDB.HasBlockHeader")
defer span.End()
prefix := encodeEpochValidatorID(epoch, validatorID)
var hasBlockHeader bool
// #nosec G104
@@ -58,7 +66,9 @@ func (db *Store) HasBlockHeader(epoch uint64, validatorID uint64) bool {
}
// SaveBlockHeader accepts a block header and writes it to disk.
func (db *Store) SaveBlockHeader(epoch uint64, validatorID uint64, blockHeader *ethpb.SignedBeaconBlockHeader) error {
func (db *Store) SaveBlockHeader(ctx context.Context, epoch uint64, validatorID uint64, blockHeader *ethpb.SignedBeaconBlockHeader) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveBlockHeader")
defer span.End()
key := encodeEpochValidatorIDSig(epoch, validatorID, blockHeader.Signature)
enc, err := proto.Marshal(blockHeader)
if err != nil {
@@ -73,18 +83,22 @@ func (db *Store) SaveBlockHeader(epoch uint64, validatorID uint64, blockHeader *
return err
})
if err != nil {
return err
}
// Prune block header history every 10th epoch.
if epoch%params.BeaconConfig().PruneSlasherStoragePeriod == 0 {
err = db.PruneBlockHistory(epoch, params.BeaconConfig().WeakSubjectivityPeriod)
return db.PruneBlockHistory(ctx, epoch, params.BeaconConfig().WeakSubjectivityPeriod)
}
return err
return nil
}
// DeleteBlockHeader deletes a block header using the epoch and validator id.
func (db *Store) DeleteBlockHeader(epoch uint64, validatorID uint64, blockHeader *ethpb.SignedBeaconBlockHeader) error {
func (db *Store) DeleteBlockHeader(ctx context.Context, epoch uint64, validatorID uint64, blockHeader *ethpb.SignedBeaconBlockHeader) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.DeleteBlockHeader")
defer span.End()
key := encodeEpochValidatorIDSig(epoch, validatorID, blockHeader.Signature)
return db.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicBlockHeadersBucket)
if err := bucket.Delete(key); err != nil {
@@ -94,8 +108,10 @@ func (db *Store) DeleteBlockHeader(epoch uint64, validatorID uint64, blockHeader
})
}
// PruneBlockHistory removes all blocks from the DB older than the pruning epoch age.
func (db *Store) PruneBlockHistory(currentEpoch uint64, pruningEpochAge uint64) error {
// PruneBlockHistory leaves only records younger then history size.
func (db *Store) PruneBlockHistory(ctx context.Context, currentEpoch uint64, pruningEpochAge uint64) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.pruneBlockHistory")
defer span.End()
pruneTill := int64(currentEpoch) - int64(pruningEpochAge)
if pruneTill <= 0 {
return nil

View File

@@ -1,6 +1,7 @@
package kv
import (
"context"
"flag"
"reflect"
"testing"
@@ -12,19 +13,19 @@ import (
func TestNilDBHistoryBlkHdr(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
epoch := uint64(1)
validatorID := uint64(1)
hasBlockHeader := db.HasBlockHeader(epoch, validatorID)
hasBlockHeader := db.HasBlockHeader(ctx, epoch, validatorID)
if hasBlockHeader {
t.Fatal("HasBlockHeader should return false")
}
bPrime, err := db.BlockHeaders(epoch, validatorID)
bPrime, err := db.BlockHeaders(ctx, epoch, validatorID)
if err != nil {
t.Fatalf("failed to get block: %v", err)
}
@@ -36,9 +37,9 @@ func TestNilDBHistoryBlkHdr(t *testing.T) {
func TestSaveHistoryBlkHdr(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
defer teardownDB(t, db)
db := setupDB(t, cli.NewContext(app, set, nil))
ctx := context.Background()
tests := []struct {
epoch uint64
vID uint64
@@ -62,12 +63,12 @@ func TestSaveHistoryBlkHdr(t *testing.T) {
}
for _, tt := range tests {
err := db.SaveBlockHeader(tt.epoch, tt.vID, tt.bh)
err := db.SaveBlockHeader(ctx, tt.epoch, tt.vID, tt.bh)
if err != nil {
t.Fatalf("save block failed: %v", err)
}
bha, err := db.BlockHeaders(tt.epoch, tt.vID)
bha, err := db.BlockHeaders(ctx, tt.epoch, tt.vID)
if err != nil {
t.Fatalf("failed to get block: %v", err)
}
@@ -82,9 +83,10 @@ func TestSaveHistoryBlkHdr(t *testing.T) {
func TestDeleteHistoryBlkHdr(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
tests := []struct {
epoch uint64
vID uint64
@@ -108,14 +110,14 @@ func TestDeleteHistoryBlkHdr(t *testing.T) {
}
for _, tt := range tests {
err := db.SaveBlockHeader(tt.epoch, tt.vID, tt.bh)
err := db.SaveBlockHeader(ctx, tt.epoch, tt.vID, tt.bh)
if err != nil {
t.Fatalf("save block failed: %v", err)
}
}
for _, tt := range tests {
bha, err := db.BlockHeaders(tt.epoch, tt.vID)
bha, err := db.BlockHeaders(ctx, tt.epoch, tt.vID)
if err != nil {
t.Fatalf("failed to get block: %v", err)
}
@@ -123,11 +125,11 @@ func TestDeleteHistoryBlkHdr(t *testing.T) {
if bha == nil || !reflect.DeepEqual(bha[0], tt.bh) {
t.Fatalf("get should return bh: %v", bha)
}
err = db.DeleteBlockHeader(tt.epoch, tt.vID, tt.bh)
err = db.DeleteBlockHeader(ctx, tt.epoch, tt.vID, tt.bh)
if err != nil {
t.Fatalf("save block failed: %v", err)
}
bh, err := db.BlockHeaders(tt.epoch, tt.vID)
bh, err := db.BlockHeaders(ctx, tt.epoch, tt.vID)
if err != nil {
t.Fatal(err)
@@ -143,9 +145,10 @@ func TestDeleteHistoryBlkHdr(t *testing.T) {
func TestHasHistoryBlkHdr(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
tests := []struct {
epoch uint64
vID uint64
@@ -169,22 +172,22 @@ func TestHasHistoryBlkHdr(t *testing.T) {
}
for _, tt := range tests {
found := db.HasBlockHeader(tt.epoch, tt.vID)
found := db.HasBlockHeader(ctx, tt.epoch, tt.vID)
if found {
t.Fatal("has block header should return false for block headers that are not in db")
}
err := db.SaveBlockHeader(tt.epoch, tt.vID, tt.bh)
err := db.SaveBlockHeader(ctx, tt.epoch, tt.vID, tt.bh)
if err != nil {
t.Fatalf("save block failed: %v", err)
}
}
for _, tt := range tests {
err := db.SaveBlockHeader(tt.epoch, tt.vID, tt.bh)
err := db.SaveBlockHeader(ctx, tt.epoch, tt.vID, tt.bh)
if err != nil {
t.Fatalf("save block failed: %v", err)
}
found := db.HasBlockHeader(tt.epoch, tt.vID)
found := db.HasBlockHeader(ctx, tt.epoch, tt.vID)
if !found {
t.Fatal("has block header should return true")
@@ -195,9 +198,10 @@ func TestHasHistoryBlkHdr(t *testing.T) {
func TestPruneHistoryBlkHdr(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
tests := []struct {
epoch uint64
vID uint64
@@ -231,12 +235,12 @@ func TestPruneHistoryBlkHdr(t *testing.T) {
}
for _, tt := range tests {
err := db.SaveBlockHeader(tt.epoch, tt.vID, tt.bh)
err := db.SaveBlockHeader(ctx, tt.epoch, tt.vID, tt.bh)
if err != nil {
t.Fatalf("save block header failed: %v", err)
}
bha, err := db.BlockHeaders(tt.epoch, tt.vID)
bha, err := db.BlockHeaders(ctx, tt.epoch, tt.vID)
if err != nil {
t.Fatalf("failed to get block header: %v", err)
}
@@ -247,13 +251,13 @@ func TestPruneHistoryBlkHdr(t *testing.T) {
}
currentEpoch := uint64(3)
historyToKeep := uint64(2)
err := db.PruneBlockHistory(currentEpoch, historyToKeep)
err := db.PruneBlockHistory(ctx, currentEpoch, historyToKeep)
if err != nil {
t.Fatalf("failed to prune: %v", err)
}
for _, tt := range tests {
bha, err := db.BlockHeaders(tt.epoch, tt.vID)
bha, err := db.BlockHeaders(ctx, tt.epoch, tt.vID)
if err != nil {
t.Fatalf("failed to get block header: %v", err)
}
@@ -266,6 +270,5 @@ func TestPruneHistoryBlkHdr(t *testing.T) {
t.Fatalf("block header should have been pruned: %v", bha)
}
}
}
}

View File

@@ -2,6 +2,7 @@ package kv
import (
"bytes"
"context"
"fmt"
"reflect"
"sort"
@@ -14,9 +15,12 @@ import (
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
"go.opencensus.io/trace"
)
func unmarshalIdxAtt(enc []byte) (*ethpb.IndexedAttestation, error) {
func unmarshalIdxAtt(ctx context.Context, enc []byte) (*ethpb.IndexedAttestation, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.unmarshalIdxAtt")
defer span.End()
protoIdxAtt := &ethpb.IndexedAttestation{}
err := proto.Unmarshal(enc, protoIdxAtt)
if err != nil {
@@ -25,7 +29,9 @@ func unmarshalIdxAtt(enc []byte) (*ethpb.IndexedAttestation, error) {
return protoIdxAtt, nil
}
func unmarshalCompressedIdxAttList(enc []byte) (*slashpb.CompressedIdxAttList, error) {
func unmarshalCompressedIdxAttList(ctx context.Context, enc []byte) (*slashpb.CompressedIdxAttList, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.unmarshalCompressedIdxAttList")
defer span.End()
protoIdxAtt := &slashpb.CompressedIdxAttList{}
err := proto.Unmarshal(enc, protoIdxAtt)
if err != nil {
@@ -37,7 +43,9 @@ func unmarshalCompressedIdxAttList(enc []byte) (*slashpb.CompressedIdxAttList, e
// IdxAttsForTargetFromID accepts a epoch and validator index and returns a list of
// indexed attestations from that validator for the given target epoch.
// Returns nil if the indexed attestation does not exist.
func (db *Store) IdxAttsForTargetFromID(targetEpoch uint64, validatorID uint64) ([]*ethpb.IndexedAttestation, error) {
func (db *Store) IdxAttsForTargetFromID(ctx context.Context, targetEpoch uint64, validatorID uint64) ([]*ethpb.IndexedAttestation, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.IdxAttsForTargetFromID")
defer span.End()
var idxAtts []*ethpb.IndexedAttestation
err := db.view(func(tx *bolt.Tx) error {
@@ -46,7 +54,7 @@ func (db *Store) IdxAttsForTargetFromID(targetEpoch uint64, validatorID uint64)
if enc == nil {
return nil
}
idToIdxAttsList, err := unmarshalCompressedIdxAttList(enc)
idToIdxAttsList, err := unmarshalCompressedIdxAttList(ctx, enc)
if err != nil {
return err
}
@@ -62,7 +70,7 @@ func (db *Store) IdxAttsForTargetFromID(targetEpoch uint64, validatorID uint64)
if enc == nil {
continue
}
att, err := unmarshalIdxAtt(enc)
att, err := unmarshalIdxAtt(ctx, enc)
if err != nil {
return err
}
@@ -78,13 +86,15 @@ func (db *Store) IdxAttsForTargetFromID(targetEpoch uint64, validatorID uint64)
// IdxAttsForTarget accepts a target epoch and returns a list of
// indexed attestations.
// Returns nil if the indexed attestation does not exist with that target epoch.
func (db *Store) IdxAttsForTarget(targetEpoch uint64) ([]*ethpb.IndexedAttestation, error) {
func (db *Store) IdxAttsForTarget(ctx context.Context, targetEpoch uint64) ([]*ethpb.IndexedAttestation, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.IdxAttsForTarget")
defer span.End()
var idxAtts []*ethpb.IndexedAttestation
key := bytesutil.Bytes8(targetEpoch)
err := db.view(func(tx *bolt.Tx) error {
c := tx.Bucket(historicIndexedAttestationsBucket).Cursor()
for k, enc := c.Seek(key); k != nil && bytes.Equal(k[:8], key); k, _ = c.Next() {
idxAtt, err := unmarshalIdxAtt(enc)
idxAtt, err := unmarshalIdxAtt(ctx, enc)
if err != nil {
return err
}
@@ -97,7 +107,9 @@ func (db *Store) IdxAttsForTarget(targetEpoch uint64) ([]*ethpb.IndexedAttestati
// LatestIndexedAttestationsTargetEpoch returns latest target epoch in db
// returns 0 if there is no indexed attestations in db.
func (db *Store) LatestIndexedAttestationsTargetEpoch() (uint64, error) {
func (db *Store) LatestIndexedAttestationsTargetEpoch(ctx context.Context) (uint64, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.LatestIndexedAttestationsTargetEpoch")
defer span.End()
var lt uint64
err := db.view(func(tx *bolt.Tx) error {
c := tx.Bucket(historicIndexedAttestationsBucket).Cursor()
@@ -113,7 +125,9 @@ func (db *Store) LatestIndexedAttestationsTargetEpoch() (uint64, error) {
// LatestValidatorIdx returns latest validator id in db
// returns 0 if there is no validators in db.
func (db *Store) LatestValidatorIdx() (uint64, error) {
func (db *Store) LatestValidatorIdx(ctx context.Context) (uint64, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.LatestValidatorIdx")
defer span.End()
var lt uint64
err := db.view(func(tx *bolt.Tx) error {
c := tx.Bucket(compressedIdxAttsBucket).Cursor()
@@ -128,8 +142,10 @@ func (db *Store) LatestValidatorIdx() (uint64, error) {
}
// DoubleVotes looks up db for slashable attesting data that were preformed by the same validator.
func (db *Store) DoubleVotes(validatorIdx uint64, dataRoot []byte, origAtt *ethpb.IndexedAttestation) ([]*ethpb.AttesterSlashing, error) {
idxAtts, err := db.IdxAttsForTargetFromID(origAtt.Data.Target.Epoch, validatorIdx)
func (db *Store) DoubleVotes(ctx context.Context, validatorIdx uint64, dataRoot []byte, origAtt *ethpb.IndexedAttestation) ([]*ethpb.AttesterSlashing, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.DoubleVotes")
defer span.End()
idxAtts, err := db.IdxAttsForTargetFromID(ctx, origAtt.Data.Target.Epoch, validatorIdx)
if err != nil {
return nil, err
}
@@ -162,7 +178,9 @@ func (db *Store) DoubleVotes(validatorIdx uint64, dataRoot []byte, origAtt *ethp
}
// HasIndexedAttestation accepts an epoch and validator id and returns true if the indexed attestation exists.
func (db *Store) HasIndexedAttestation(targetEpoch uint64, validatorID uint64) (bool, error) {
func (db *Store) HasIndexedAttestation(ctx context.Context, targetEpoch uint64, validatorID uint64) (bool, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.HasIndexedAttestation")
defer span.End()
key := bytesutil.Bytes8(targetEpoch)
var hasAttestation bool
// #nosec G104
@@ -172,7 +190,7 @@ func (db *Store) HasIndexedAttestation(targetEpoch uint64, validatorID uint64) (
if enc == nil {
return nil
}
iList, err := unmarshalCompressedIdxAttList(enc)
iList, err := unmarshalCompressedIdxAttList(ctx, enc)
if err != nil {
return err
}
@@ -192,7 +210,9 @@ func (db *Store) HasIndexedAttestation(targetEpoch uint64, validatorID uint64) (
}
// SaveIndexedAttestation accepts epoch and indexed attestation and writes it to disk.
func (db *Store) SaveIndexedAttestation(idxAttestation *ethpb.IndexedAttestation) error {
func (db *Store) SaveIndexedAttestation(ctx context.Context, idxAttestation *ethpb.IndexedAttestation) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveIndexedAttestation")
defer span.End()
key := encodeEpochSig(idxAttestation.Data.Target.Epoch, idxAttestation.Signature)
enc, err := proto.Marshal(idxAttestation)
if err != nil {
@@ -205,7 +225,7 @@ func (db *Store) SaveIndexedAttestation(idxAttestation *ethpb.IndexedAttestation
if val != nil {
return nil
}
if err := saveCompressedIdxAttToEpochList(idxAttestation, tx); err != nil {
if err := saveCompressedIdxAttToEpochList(ctx, idxAttestation, tx); err != nil {
return errors.Wrap(err, "failed to save indices from indexed attestation")
}
if err := bucket.Put(key, enc); err != nil {
@@ -218,14 +238,16 @@ func (db *Store) SaveIndexedAttestation(idxAttestation *ethpb.IndexedAttestation
// Prune history to max size every PruneSlasherStoragePeriod epoch.
if idxAttestation.Data.Source.Epoch%params.BeaconConfig().PruneSlasherStoragePeriod == 0 {
wsPeriod := params.BeaconConfig().WeakSubjectivityPeriod
if err = db.PruneAttHistory(idxAttestation.Data.Source.Epoch, wsPeriod); err != nil {
if err = db.PruneAttHistory(ctx, idxAttestation.Data.Source.Epoch, wsPeriod); err != nil {
return err
}
}
return err
}
func saveCompressedIdxAttToEpochList(idxAttestation *ethpb.IndexedAttestation, tx *bolt.Tx) error {
func saveCompressedIdxAttToEpochList(ctx context.Context, idxAttestation *ethpb.IndexedAttestation, tx *bolt.Tx) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.saveCompressedIdxAttToEpochList")
defer span.End()
dataRoot, err := hashutil.HashProto(idxAttestation.Data)
if err != nil {
return errors.Wrap(err, "failed to hash indexed attestation data.")
@@ -239,7 +261,7 @@ func saveCompressedIdxAttToEpochList(idxAttestation *ethpb.IndexedAttestation, t
key := bytesutil.Bytes8(idxAttestation.Data.Target.Epoch)
bucket := tx.Bucket(compressedIdxAttsBucket)
enc := bucket.Get(key)
compressedIdxAttList, err := unmarshalCompressedIdxAttList(enc)
compressedIdxAttList, err := unmarshalCompressedIdxAttList(ctx, enc)
if err != nil {
return errors.Wrap(err, "failed to decode value into CompressedIdxAtt")
}
@@ -255,7 +277,9 @@ func saveCompressedIdxAttToEpochList(idxAttestation *ethpb.IndexedAttestation, t
}
// DeleteIndexedAttestation deletes a indexed attestation using the slot and its root as keys in their respective buckets.
func (db *Store) DeleteIndexedAttestation(idxAttestation *ethpb.IndexedAttestation) error {
func (db *Store) DeleteIndexedAttestation(ctx context.Context, idxAttestation *ethpb.IndexedAttestation) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.DeleteIndexedAttestation")
defer span.End()
key := encodeEpochSig(idxAttestation.Data.Target.Epoch, idxAttestation.Signature)
return db.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(historicIndexedAttestationsBucket)
@@ -263,7 +287,7 @@ func (db *Store) DeleteIndexedAttestation(idxAttestation *ethpb.IndexedAttestati
if enc == nil {
return nil
}
if err := removeIdxAttIndicesByEpochFromDB(idxAttestation, tx); err != nil {
if err := removeIdxAttIndicesByEpochFromDB(ctx, idxAttestation, tx); err != nil {
return err
}
if err := bucket.Delete(key); err != nil {
@@ -276,7 +300,9 @@ func (db *Store) DeleteIndexedAttestation(idxAttestation *ethpb.IndexedAttestati
})
}
func removeIdxAttIndicesByEpochFromDB(idxAttestation *ethpb.IndexedAttestation, tx *bolt.Tx) error {
func removeIdxAttIndicesByEpochFromDB(ctx context.Context, idxAttestation *ethpb.IndexedAttestation, tx *bolt.Tx) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.removeIdxAttIndicesByEpochFromDB")
defer span.End()
dataRoot, err := hashutil.HashProto(idxAttestation.Data)
if err != nil {
return err
@@ -292,7 +318,7 @@ func removeIdxAttIndicesByEpochFromDB(idxAttestation *ethpb.IndexedAttestation,
if enc == nil {
return errors.New("requested to delete data that is not present")
}
vIdxList, err := unmarshalCompressedIdxAttList(enc)
vIdxList, err := unmarshalCompressedIdxAttList(ctx, enc)
if err != nil {
return errors.Wrap(err, "failed to decode value into ValidatorIDToIndexedAttestationList")
}
@@ -315,7 +341,9 @@ func removeIdxAttIndicesByEpochFromDB(idxAttestation *ethpb.IndexedAttestation,
}
// PruneAttHistory removes all attestations from the DB older than the pruning epoch age.
func (db *Store) PruneAttHistory(currentEpoch uint64, pruningEpochAge uint64) error {
func (db *Store) PruneAttHistory(ctx context.Context, currentEpoch uint64, pruningEpochAge uint64) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.pruneAttHistory")
defer span.End()
pruneFromEpoch := int64(currentEpoch) - int64(pruningEpochAge)
if pruneFromEpoch <= 0 {
return nil

View File

@@ -1,6 +1,7 @@
package kv
import (
"context"
"flag"
"reflect"
"testing"
@@ -41,14 +42,14 @@ func init() {
func TestNilDBHistoryIdxAtt(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
epoch := uint64(1)
validatorID := uint64(1)
hasIdxAtt, err := db.HasIndexedAttestation(epoch, validatorID)
hasIdxAtt, err := db.HasIndexedAttestation(ctx, epoch, validatorID)
if err != nil {
t.Fatal(err)
}
@@ -56,7 +57,7 @@ func TestNilDBHistoryIdxAtt(t *testing.T) {
t.Fatal("HasIndexedAttestation should return false")
}
idxAtts, err := db.IdxAttsForTargetFromID(epoch, validatorID)
idxAtts, err := db.IdxAttsForTargetFromID(ctx, epoch, validatorID)
if err != nil {
t.Fatalf("failed to get indexed attestation: %v", err)
}
@@ -68,17 +69,17 @@ func TestNilDBHistoryIdxAtt(t *testing.T) {
func TestSaveIdxAtt(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
for _, tt := range tests {
err := db.SaveIndexedAttestation(tt.idxAtt)
err := db.SaveIndexedAttestation(ctx, tt.idxAtt)
if err != nil {
t.Fatalf("save indexed attestation failed: %v", err)
}
idxAtts, err := db.IdxAttsForTargetFromID(tt.idxAtt.Data.Target.Epoch, tt.idxAtt.AttestingIndices[0])
idxAtts, err := db.IdxAttsForTargetFromID(ctx, tt.idxAtt.Data.Target.Epoch, tt.idxAtt.AttestingIndices[0])
if err != nil {
t.Fatalf("failed to get indexed attestation: %v", err)
}
@@ -93,20 +94,20 @@ func TestSaveIdxAtt(t *testing.T) {
func TestDeleteHistoryIdxAtt(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
for _, tt := range tests {
err := db.SaveIndexedAttestation(tt.idxAtt)
err := db.SaveIndexedAttestation(ctx, tt.idxAtt)
if err != nil {
t.Fatalf("save indexed attestation failed: %v", err)
}
}
for _, tt := range tests {
idxAtts, err := db.IdxAttsForTargetFromID(tt.idxAtt.Data.Target.Epoch, tt.idxAtt.AttestingIndices[0])
idxAtts, err := db.IdxAttsForTargetFromID(ctx, tt.idxAtt.Data.Target.Epoch, tt.idxAtt.AttestingIndices[0])
if err != nil {
t.Fatalf("failed to get index attestation: %v", err)
}
@@ -114,16 +115,16 @@ func TestDeleteHistoryIdxAtt(t *testing.T) {
if idxAtts == nil || !reflect.DeepEqual(idxAtts[0], tt.idxAtt) {
t.Fatalf("get should return indexed attestation: %v", idxAtts)
}
err = db.DeleteIndexedAttestation(tt.idxAtt)
err = db.DeleteIndexedAttestation(ctx, tt.idxAtt)
if err != nil {
t.Fatalf("delete index attestation failed: %v", err)
}
idxAtts, err = db.IdxAttsForTargetFromID(tt.idxAtt.Data.Target.Epoch, tt.idxAtt.AttestingIndices[0])
idxAtts, err = db.IdxAttsForTargetFromID(ctx, tt.idxAtt.Data.Target.Epoch, tt.idxAtt.AttestingIndices[0])
if err != nil {
t.Fatal(err)
}
hasA, err := db.HasIndexedAttestation(tt.idxAtt.Data.Target.Epoch, tt.idxAtt.AttestingIndices[0])
hasA, err := db.HasIndexedAttestation(ctx, tt.idxAtt.Data.Target.Epoch, tt.idxAtt.AttestingIndices[0])
if err != nil {
t.Fatal(err)
}
@@ -141,12 +142,12 @@ func TestDeleteHistoryIdxAtt(t *testing.T) {
func TestHasIndexedAttestation(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
for _, tt := range tests {
exists, err := db.HasIndexedAttestation(tt.idxAtt.Data.Target.Epoch, tt.idxAtt.AttestingIndices[0])
exists, err := db.HasIndexedAttestation(ctx, tt.idxAtt.Data.Target.Epoch, tt.idxAtt.AttestingIndices[0])
if err != nil {
t.Fatal(err)
}
@@ -154,13 +155,13 @@ func TestHasIndexedAttestation(t *testing.T) {
t.Fatal("has indexed attestation should return false for indexed attestations that are not in db")
}
if err := db.SaveIndexedAttestation(tt.idxAtt); err != nil {
if err := db.SaveIndexedAttestation(ctx, tt.idxAtt); err != nil {
t.Fatalf("save indexed attestation failed: %v", err)
}
}
for _, tt := range tests {
exists, err := db.HasIndexedAttestation(tt.idxAtt.Data.Target.Epoch, tt.idxAtt.AttestingIndices[0])
exists, err := db.HasIndexedAttestation(ctx, tt.idxAtt.Data.Target.Epoch, tt.idxAtt.AttestingIndices[0])
if err != nil {
t.Fatal(err)
}
@@ -173,17 +174,17 @@ func TestHasIndexedAttestation(t *testing.T) {
func TestPruneHistoryIdxAtt(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
for _, tt := range tests {
err := db.SaveIndexedAttestation(tt.idxAtt)
err := db.SaveIndexedAttestation(ctx, tt.idxAtt)
if err != nil {
t.Fatalf("save indexed attestation failed: %v", err)
}
idxAtts, err := db.IdxAttsForTargetFromID(tt.idxAtt.Data.Target.Epoch, tt.idxAtt.AttestingIndices[0])
idxAtts, err := db.IdxAttsForTargetFromID(ctx, tt.idxAtt.Data.Target.Epoch, tt.idxAtt.AttestingIndices[0])
if err != nil {
t.Fatalf("failed to get indexed attestation: %v", err)
}
@@ -194,17 +195,17 @@ func TestPruneHistoryIdxAtt(t *testing.T) {
}
currentEpoch := uint64(3)
historyToKeep := uint64(1)
err := db.PruneAttHistory(currentEpoch, historyToKeep)
err := db.PruneAttHistory(ctx, currentEpoch, historyToKeep)
if err != nil {
t.Fatalf("failed to prune: %v", err)
}
for _, tt := range tests {
idxAtts, err := db.IdxAttsForTargetFromID(tt.idxAtt.Data.Target.Epoch, tt.idxAtt.AttestingIndices[0])
idxAtts, err := db.IdxAttsForTargetFromID(ctx, tt.idxAtt.Data.Target.Epoch, tt.idxAtt.AttestingIndices[0])
if err != nil {
t.Fatalf("failed to get indexed attestation: %v", err)
}
exists, err := db.HasIndexedAttestation(tt.idxAtt.Data.Target.Epoch, tt.idxAtt.AttestingIndices[0])
exists, err := db.HasIndexedAttestation(ctx, tt.idxAtt.Data.Target.Epoch, tt.idxAtt.AttestingIndices[0])
if err != nil {
t.Fatal(err)
}

View File

@@ -1,6 +1,7 @@
package kv
import (
"context"
"fmt"
"github.com/boltdb/bolt"
@@ -9,6 +10,7 @@ import (
slashpb "github.com/prysmaticlabs/prysm/proto/slashing"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
log "github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
var highestValidatorIdx uint64
@@ -35,7 +37,9 @@ func saveToDB(db *Store) func(uint64, uint64, interface{}, int64) {
}
}
func unmarshalEpochSpanMap(enc []byte) (*slashpb.EpochSpanMap, error) {
func unmarshalEpochSpanMap(ctx context.Context, enc []byte) (*slashpb.EpochSpanMap, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.unmarshalEpochSpanMap")
defer span.End()
epochSpanMap := &slashpb.EpochSpanMap{}
err := proto.Unmarshal(enc, epochSpanMap)
if err != nil {
@@ -47,7 +51,9 @@ func unmarshalEpochSpanMap(enc []byte) (*slashpb.EpochSpanMap, error) {
// ValidatorSpansMap accepts validator index and returns the corresponding spans
// map for slashing detection.
// Returns nil if the span map for this validator index does not exist.
func (db *Store) ValidatorSpansMap(validatorIdx uint64) (*slashpb.EpochSpanMap, error) {
func (db *Store) ValidatorSpansMap(ctx context.Context, validatorIdx uint64) (*slashpb.EpochSpanMap, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.ValidatorSpansMap")
defer span.End()
var err error
var spanMap *slashpb.EpochSpanMap
if db.spanCacheEnabled {
@@ -60,7 +66,7 @@ func (db *Store) ValidatorSpansMap(validatorIdx uint64) (*slashpb.EpochSpanMap,
err = db.view(func(tx *bolt.Tx) error {
b := tx.Bucket(validatorsMinMaxSpanBucket)
enc := b.Get(bytesutil.Bytes4(validatorIdx))
spanMap, err = unmarshalEpochSpanMap(enc)
spanMap, err = unmarshalEpochSpanMap(ctx, enc)
if err != nil {
return err
}
@@ -73,7 +79,9 @@ func (db *Store) ValidatorSpansMap(validatorIdx uint64) (*slashpb.EpochSpanMap,
}
// SaveValidatorSpansMap accepts a validator index and span map and writes it to disk.
func (db *Store) SaveValidatorSpansMap(validatorIdx uint64, spanMap *slashpb.EpochSpanMap) error {
func (db *Store) SaveValidatorSpansMap(ctx context.Context, validatorIdx uint64, spanMap *slashpb.EpochSpanMap) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveValidatorSpansMap")
defer span.End()
if db.spanCacheEnabled {
if validatorIdx > highestValidatorIdx {
highestValidatorIdx = validatorIdx
@@ -101,7 +109,9 @@ func (db *Store) SaveValidatorSpansMap(validatorIdx uint64, spanMap *slashpb.Epo
// SaveCachedSpansMaps saves all span map from cache to disk
// if no span maps are in db or cache is disabled it returns nil.
func (db *Store) SaveCachedSpansMaps() error {
func (db *Store) SaveCachedSpansMaps(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveCachedSpansMaps")
defer span.End()
if db.spanCacheEnabled {
err := db.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(validatorsMinMaxSpanBucket)
@@ -126,7 +136,9 @@ func (db *Store) SaveCachedSpansMaps() error {
}
// DeleteValidatorSpanMap deletes a validator span map using a validator index as bucket key.
func (db *Store) DeleteValidatorSpanMap(validatorIdx uint64) error {
func (db *Store) DeleteValidatorSpanMap(ctx context.Context, validatorIdx uint64) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.DeleteValidatorSpanMap")
defer span.End()
if db.spanCacheEnabled {
db.spanCache.Del(validatorIdx)
}

View File

@@ -1,6 +1,7 @@
package kv
import (
"context"
"flag"
"reflect"
"testing"
@@ -57,12 +58,12 @@ func init() {
func TestValidatorSpanMap_NilDB(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
validatorIdx := uint64(1)
vsm, err := db.ValidatorSpansMap(validatorIdx)
vsm, err := db.ValidatorSpansMap(ctx, validatorIdx)
if err != nil {
t.Fatalf("Nil ValidatorSpansMap should not return error: %v", err)
}
@@ -74,16 +75,16 @@ func TestValidatorSpanMap_NilDB(t *testing.T) {
func TestValidatorSpanMap_Save(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
for _, tt := range spanTests {
err := db.SaveValidatorSpansMap(tt.validatorIdx, tt.spanMap)
err := db.SaveValidatorSpansMap(ctx, tt.validatorIdx, tt.spanMap)
if err != nil {
t.Fatalf("Save validator span map failed: %v", err)
}
sm, err := db.ValidatorSpansMap(tt.validatorIdx)
sm, err := db.ValidatorSpansMap(ctx, tt.validatorIdx)
if err != nil {
t.Fatalf("Failed to get validator span map: %v", err)
}
@@ -98,18 +99,18 @@ func TestValidatorSpanMap_SaveWithCache(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
set.Bool(flags.UseSpanCacheFlag.Name, true, "enable span map cache")
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
for _, tt := range spanTests {
err := db.SaveValidatorSpansMap(tt.validatorIdx, tt.spanMap)
err := db.SaveValidatorSpansMap(ctx, tt.validatorIdx, tt.spanMap)
if err != nil {
t.Fatalf("Save validator span map failed: %v", err)
}
// wait for value to pass through cache buffers
time.Sleep(time.Millisecond * 10)
sm, err := db.ValidatorSpansMap(tt.validatorIdx)
sm, err := db.ValidatorSpansMap(ctx, tt.validatorIdx)
if err != nil {
t.Fatalf("Failed to get validator span map: %v", err)
}
@@ -123,30 +124,30 @@ func TestValidatorSpanMap_SaveWithCache(t *testing.T) {
func TestValidatorSpanMap_Delete(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
for _, tt := range spanTests {
err := db.SaveValidatorSpansMap(tt.validatorIdx, tt.spanMap)
err := db.SaveValidatorSpansMap(ctx, tt.validatorIdx, tt.spanMap)
if err != nil {
t.Fatalf("Save validator span map failed: %v", err)
}
}
for _, tt := range spanTests {
sm, err := db.ValidatorSpansMap(tt.validatorIdx)
sm, err := db.ValidatorSpansMap(ctx, tt.validatorIdx)
if err != nil {
t.Fatalf("Failed to get validator span map: %v", err)
}
if sm == nil || !proto.Equal(sm, tt.spanMap) {
t.Fatalf("Get should return validator span map: %v got: %v", tt.spanMap, sm)
}
err = db.DeleteValidatorSpanMap(tt.validatorIdx)
err = db.DeleteValidatorSpanMap(ctx, tt.validatorIdx)
if err != nil {
t.Fatalf("Delete validator span map error: %v", err)
}
sm, err = db.ValidatorSpansMap(tt.validatorIdx)
sm, err = db.ValidatorSpansMap(ctx, tt.validatorIdx)
if err != nil {
t.Fatal(err)
}
@@ -160,12 +161,12 @@ func TestValidatorSpanMap_DeleteWithCache(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
set.Bool(flags.UseSpanCacheFlag.Name, true, "enable span map cache")
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
for _, tt := range spanTests {
err := db.SaveValidatorSpansMap(tt.validatorIdx, tt.spanMap)
err := db.SaveValidatorSpansMap(ctx, tt.validatorIdx, tt.spanMap)
if err != nil {
t.Fatalf("Save validator span map failed: %v", err)
}
@@ -173,20 +174,20 @@ func TestValidatorSpanMap_DeleteWithCache(t *testing.T) {
// wait for value to pass through cache buffers
time.Sleep(time.Millisecond * 10)
for _, tt := range spanTests {
sm, err := db.ValidatorSpansMap(tt.validatorIdx)
sm, err := db.ValidatorSpansMap(ctx, tt.validatorIdx)
if err != nil {
t.Fatalf("Failed to get validator span map: %v", err)
}
if sm == nil || !proto.Equal(sm, tt.spanMap) {
t.Fatalf("Get should return validator span map: %v got: %v", tt.spanMap, sm)
}
err = db.DeleteValidatorSpanMap(tt.validatorIdx)
err = db.DeleteValidatorSpanMap(ctx, tt.validatorIdx)
if err != nil {
t.Fatalf("Delete validator span map error: %v", err)
}
// wait for value to pass through cache buffers
time.Sleep(time.Millisecond * 10)
sm, err = db.ValidatorSpansMap(tt.validatorIdx)
sm, err = db.ValidatorSpansMap(ctx, tt.validatorIdx)
if err != nil {
t.Fatal(err)
}
@@ -199,6 +200,8 @@ func TestValidatorSpanMap_DeleteWithCache(t *testing.T) {
func TestValidatorSpanMap_SaveOnEvict(t *testing.T) {
db := setupDBDiffCacheSize(t, 5, 5)
defer teardownDB(t, db)
ctx := context.Background()
tsm := &spanMapTestStruct{
validatorIdx: 1,
spanMap: &slashpb.EpochSpanMap{
@@ -210,7 +213,7 @@ func TestValidatorSpanMap_SaveOnEvict(t *testing.T) {
},
}
for i := uint64(0); i < 6; i++ {
err := db.SaveValidatorSpansMap(i, tsm.spanMap)
err := db.SaveValidatorSpansMap(ctx, i, tsm.spanMap)
if err != nil {
t.Fatalf("Save validator span map failed: %v", err)
}
@@ -219,7 +222,7 @@ func TestValidatorSpanMap_SaveOnEvict(t *testing.T) {
// Wait for value to pass through cache buffers.
time.Sleep(time.Millisecond * 1000)
for i := uint64(0); i < 6; i++ {
sm, err := db.ValidatorSpansMap(i)
sm, err := db.ValidatorSpansMap(ctx, i)
if err != nil {
t.Fatalf("Failed to get validator span map: %v", err)
}
@@ -233,25 +236,25 @@ func TestValidatorSpanMap_SaveCachedSpansMaps(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
set.Bool(flags.UseSpanCacheFlag.Name, true, "enable span map cache")
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
for _, tt := range spanTests {
err := db.SaveValidatorSpansMap(tt.validatorIdx, tt.spanMap)
err := db.SaveValidatorSpansMap(ctx, tt.validatorIdx, tt.spanMap)
if err != nil {
t.Fatalf("Save validator span map failed: %v", err)
}
}
// wait for value to pass through cache buffers
time.Sleep(time.Millisecond * 10)
err := db.SaveCachedSpansMaps()
err := db.SaveCachedSpansMaps(ctx)
if err != nil {
t.Errorf("Failed to save cached span maps to db: %v", err)
}
db.ClearSpanCache()
db.spanCache.Clear()
for _, tt := range spanTests {
sm, err := db.ValidatorSpansMap(tt.validatorIdx)
sm, err := db.ValidatorSpansMap(ctx, tt.validatorIdx)
if err != nil {
t.Fatalf("Failed to get validator span map: %v", err)
}

View File

@@ -2,6 +2,7 @@ package kv
import (
"bytes"
"context"
"github.com/boltdb/bolt"
"github.com/gogo/protobuf/proto"
@@ -9,9 +10,12 @@ import (
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/slasher/db/types"
"go.opencensus.io/trace"
)
func unmarshalProposerSlashing(enc []byte) (*ethpb.ProposerSlashing, error) {
func unmarshalProposerSlashing(ctx context.Context, enc []byte) (*ethpb.ProposerSlashing, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.unmarshalProposerSlashing")
defer span.End()
protoSlashing := &ethpb.ProposerSlashing{}
if err := proto.Unmarshal(enc, protoSlashing); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal encoded proposer slashing")
@@ -19,10 +23,12 @@ func unmarshalProposerSlashing(enc []byte) (*ethpb.ProposerSlashing, error) {
return protoSlashing, nil
}
func unmarshalProposerSlashingArray(encoded [][]byte) ([]*ethpb.ProposerSlashing, error) {
func unmarshalProposerSlashingArray(ctx context.Context, encoded [][]byte) ([]*ethpb.ProposerSlashing, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.unmarshalProposerSlashingArray")
defer span.End()
proposerSlashings := make([]*ethpb.ProposerSlashing, len(encoded))
for i, enc := range encoded {
ps, err := unmarshalProposerSlashing(enc)
ps, err := unmarshalProposerSlashing(ctx, enc)
if err != nil {
return nil, err
}
@@ -32,7 +38,9 @@ func unmarshalProposerSlashingArray(encoded [][]byte) ([]*ethpb.ProposerSlashing
}
// ProposalSlashingsByStatus returns all the proposal slashing proofs with a certain status.
func (db *Store) ProposalSlashingsByStatus(status types.SlashingStatus) ([]*ethpb.ProposerSlashing, error) {
func (db *Store) ProposalSlashingsByStatus(ctx context.Context, status types.SlashingStatus) ([]*ethpb.ProposerSlashing, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.ProposalSlashingsByStatus")
defer span.End()
encoded := make([][]byte, 0)
err := db.view(func(tx *bolt.Tx) error {
c := tx.Bucket(slashingBucket).Cursor()
@@ -47,11 +55,13 @@ func (db *Store) ProposalSlashingsByStatus(status types.SlashingStatus) ([]*ethp
if err != nil {
return nil, err
}
return unmarshalProposerSlashingArray(encoded)
return unmarshalProposerSlashingArray(ctx, encoded)
}
// DeleteProposerSlashing deletes a proposer slashing proof.
func (db *Store) DeleteProposerSlashing(slashing *ethpb.ProposerSlashing) error {
func (db *Store) DeleteProposerSlashing(ctx context.Context, slashing *ethpb.ProposerSlashing) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.DeleteProposerSlashing")
defer span.End()
root, err := hashutil.HashProto(slashing)
if err != nil {
return errors.Wrap(err, "failed to get hash root of proposerSlashing")
@@ -68,7 +78,9 @@ func (db *Store) DeleteProposerSlashing(slashing *ethpb.ProposerSlashing) error
}
// HasProposerSlashing returns the slashing key if it is found in db.
func (db *Store) HasProposerSlashing(slashing *ethpb.ProposerSlashing) (bool, types.SlashingStatus, error) {
func (db *Store) HasProposerSlashing(ctx context.Context, slashing *ethpb.ProposerSlashing) (bool, types.SlashingStatus, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.HasProposerSlashing")
defer span.End()
var status types.SlashingStatus
var found bool
root, err := hashutil.HashProto(slashing)
@@ -90,7 +102,9 @@ func (db *Store) HasProposerSlashing(slashing *ethpb.ProposerSlashing) (bool, ty
}
// SaveProposerSlashing accepts a proposer slashing and its status header and writes it to disk.
func (db *Store) SaveProposerSlashing(status types.SlashingStatus, slashing *ethpb.ProposerSlashing) error {
func (db *Store) SaveProposerSlashing(ctx context.Context, status types.SlashingStatus, slashing *ethpb.ProposerSlashing) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveProposerSlashing")
defer span.End()
enc, err := proto.Marshal(slashing)
if err != nil {
return errors.Wrap(err, "failed to marshal")
@@ -105,7 +119,9 @@ func (db *Store) SaveProposerSlashing(status types.SlashingStatus, slashing *eth
}
// SaveProposerSlashings accepts a slice of slashing proof and its status and writes it to disk.
func (db *Store) SaveProposerSlashings(status types.SlashingStatus, slashings []*ethpb.ProposerSlashing) error {
func (db *Store) SaveProposerSlashings(ctx context.Context, status types.SlashingStatus, slashings []*ethpb.ProposerSlashing) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.SaveProposerSlashings")
defer span.End()
encSlashings := make([][]byte, len(slashings))
keys := make([][]byte, len(slashings))
var err error

View File

@@ -1,6 +1,7 @@
package kv
import (
"context"
"flag"
"reflect"
"sort"
@@ -14,11 +15,12 @@ import (
func TestStore_ProposerSlashingNilBucket(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
ps := &ethpb.ProposerSlashing{ProposerIndex: 1}
has, _, err := db.HasProposerSlashing(ps)
has, _, err := db.HasProposerSlashing(ctx, ps)
if err != nil {
t.Fatalf("HasProposerSlashing should not return error: %v", err)
}
@@ -26,7 +28,7 @@ func TestStore_ProposerSlashingNilBucket(t *testing.T) {
t.Fatal("HasProposerSlashing should return false")
}
p, err := db.ProposalSlashingsByStatus(types.SlashingStatus(types.Active))
p, err := db.ProposalSlashingsByStatus(ctx, types.SlashingStatus(types.Active))
if err != nil {
t.Fatalf("Failed to get proposer slashing: %v", err)
}
@@ -38,9 +40,10 @@ func TestStore_ProposerSlashingNilBucket(t *testing.T) {
func TestStore_SaveProposerSlashing(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
tests := []struct {
ss types.SlashingStatus
ps *ethpb.ProposerSlashing
@@ -60,12 +63,12 @@ func TestStore_SaveProposerSlashing(t *testing.T) {
}
for _, tt := range tests {
err := db.SaveProposerSlashing(tt.ss, tt.ps)
err := db.SaveProposerSlashing(ctx, tt.ss, tt.ps)
if err != nil {
t.Fatalf("Save proposer slashing failed: %v", err)
}
proposerSlashings, err := db.ProposalSlashingsByStatus(tt.ss)
proposerSlashings, err := db.ProposalSlashingsByStatus(ctx, tt.ss)
if err != nil {
t.Fatalf("Failed to get proposer slashings: %v", err)
}
@@ -80,9 +83,10 @@ func TestStore_SaveProposerSlashing(t *testing.T) {
func TestStore_UpdateProposerSlashingStatus(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
tests := []struct {
ss types.SlashingStatus
ps *ethpb.ProposerSlashing
@@ -102,14 +106,14 @@ func TestStore_UpdateProposerSlashingStatus(t *testing.T) {
}
for _, tt := range tests {
err := db.SaveProposerSlashing(tt.ss, tt.ps)
err := db.SaveProposerSlashing(ctx, tt.ss, tt.ps)
if err != nil {
t.Fatalf("Save proposer slashing failed: %v", err)
}
}
for _, tt := range tests {
has, st, err := db.HasProposerSlashing(tt.ps)
has, st, err := db.HasProposerSlashing(ctx, tt.ps)
if err != nil {
t.Fatalf("Failed to get proposer slashing: %v", err)
}
@@ -120,8 +124,8 @@ func TestStore_UpdateProposerSlashingStatus(t *testing.T) {
t.Fatalf("Failed to find proposer slashing with the correct status: %v", tt.ps)
}
err = db.SaveProposerSlashing(types.SlashingStatus(types.Included), tt.ps)
has, st, err = db.HasProposerSlashing(tt.ps)
err = db.SaveProposerSlashing(ctx, types.SlashingStatus(types.Included), tt.ps)
has, st, err = db.HasProposerSlashing(ctx, tt.ps)
if err != nil {
t.Fatalf("Failed to get proposer slashing: %v", err)
}
@@ -139,19 +143,20 @@ func TestStore_UpdateProposerSlashingStatus(t *testing.T) {
func TestStore_SaveProposerSlashings(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
ps := []*ethpb.ProposerSlashing{
{ProposerIndex: 1},
{ProposerIndex: 2},
{ProposerIndex: 3},
}
err := db.SaveProposerSlashings(types.Active, ps)
err := db.SaveProposerSlashings(ctx, types.Active, ps)
if err != nil {
t.Fatalf("Save proposer slashings failed: %v", err)
}
proposerSlashings, err := db.ProposalSlashingsByStatus(types.Active)
proposerSlashings, err := db.ProposalSlashingsByStatus(ctx, types.Active)
if err != nil {
t.Fatalf("Failed to get proposer slashings: %v", err)
}

View File

@@ -1,14 +1,19 @@
package kv
import (
"context"
"github.com/boltdb/bolt"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"go.opencensus.io/trace"
)
// ValidatorPubKey accepts validator id and returns the corresponding pubkey.
// Returns nil if the pubkey for this validator id does not exist.
func (db *Store) ValidatorPubKey(validatorID uint64) ([]byte, error) {
func (db *Store) ValidatorPubKey(ctx context.Context, validatorID uint64) ([]byte, error) {
ctx, span := trace.StartSpan(ctx, "SlasherDB.ValidatorPubKey")
defer span.End()
var pk []byte
err := db.view(func(tx *bolt.Tx) error {
b := tx.Bucket(validatorsPublicKeysBucket)
@@ -19,7 +24,9 @@ func (db *Store) ValidatorPubKey(validatorID uint64) ([]byte, error) {
}
// SavePubKey accepts a validator id and its public key and writes it to disk.
func (db *Store) SavePubKey(validatorID uint64, pubKey []byte) error {
func (db *Store) SavePubKey(ctx context.Context, validatorID uint64, pubKey []byte) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.SavePubKey")
defer span.End()
err := db.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(validatorsPublicKeysBucket)
key := bytesutil.Bytes4(validatorID)
@@ -32,7 +39,9 @@ func (db *Store) SavePubKey(validatorID uint64, pubKey []byte) error {
}
// DeletePubKey deletes a public key of a validator id.
func (db *Store) DeletePubKey(validatorID uint64) error {
func (db *Store) DeletePubKey(ctx context.Context, validatorID uint64) error {
ctx, span := trace.StartSpan(ctx, "SlasherDB.DeletePubKey")
defer span.End()
return db.update(func(tx *bolt.Tx) error {
bucket := tx.Bucket(validatorsPublicKeysBucket)
key := bytesutil.Bytes4(validatorID)

View File

@@ -2,6 +2,7 @@ package kv
import (
"bytes"
"context"
"flag"
"testing"
@@ -35,13 +36,13 @@ func init() {
func TestNilDBValidatorPublicKey(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
validatorID := uint64(1)
pk, err := db.ValidatorPubKey(validatorID)
pk, err := db.ValidatorPubKey(ctx, validatorID)
if err != nil {
t.Fatal("nil ValidatorPubKey should not return error")
}
@@ -54,17 +55,17 @@ func TestNilDBValidatorPublicKey(t *testing.T) {
func TestSavePubKey(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
for _, tt := range pkTests {
err := db.SavePubKey(tt.validatorID, tt.pk)
err := db.SavePubKey(ctx, tt.validatorID, tt.pk)
if err != nil {
t.Fatalf("save validator public key failed: %v", err)
}
pk, err := db.ValidatorPubKey(tt.validatorID)
pk, err := db.ValidatorPubKey(ctx, tt.validatorID)
if err != nil {
t.Fatalf("failed to get validator public key: %v", err)
}
@@ -79,20 +80,20 @@ func TestSavePubKey(t *testing.T) {
func TestDeletePublicKey(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
db := setupDB(t, ctx)
db := setupDB(t, cli.NewContext(app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()
for _, tt := range pkTests {
err := db.SavePubKey(tt.validatorID, tt.pk)
err := db.SavePubKey(ctx, tt.validatorID, tt.pk)
if err != nil {
t.Fatalf("save validator public key failed: %v", err)
}
}
for _, tt := range pkTests {
pk, err := db.ValidatorPubKey(tt.validatorID)
pk, err := db.ValidatorPubKey(ctx, tt.validatorID)
if err != nil {
t.Fatalf("failed to get validator public key: %v", err)
}
@@ -100,11 +101,11 @@ func TestDeletePublicKey(t *testing.T) {
if pk == nil || !bytes.Equal(pk, tt.pk) {
t.Fatalf("get should return validator public key: %v", pk)
}
err = db.DeletePubKey(tt.validatorID)
err = db.DeletePubKey(ctx, tt.validatorID)
if err != nil {
t.Fatalf("delete validator public key: %v", err)
}
pk, err = db.ValidatorPubKey(tt.validatorID)
pk, err = db.ValidatorPubKey(ctx, tt.validatorID)
if err != nil {
t.Fatal(err)
}

View File

@@ -51,7 +51,7 @@ func SetupSlasherDBDiffCacheSize(t testing.TB, cacheItems int64, maxCacheSize in
return newDB
}
// TeardownSlasherDB cleans up a test BeaconDB instance.
// TeardownSlasherDB cleans up a test SlasherDB instance.
func TeardownSlasherDB(t testing.TB, db *kv.Store) {
if err := db.Close(); err != nil {
t.Fatalf("Failed to close database: %v", err)

View File

@@ -11,8 +11,7 @@ import (
func TestClearDB(t *testing.T) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
ctx := cli.NewContext(app, set, nil)
slasherDB := SetupSlasherDB(t, ctx)
slasherDB := SetupSlasherDB(t, cli.NewContext(app, set, nil))
defer TeardownSlasherDB(t, slasherDB)
if err := slasherDB.ClearDB(); err != nil {
t.Fatal(err)

View File

@@ -77,7 +77,7 @@ func (ss *Server) DetectAndUpdateMaxEpochSpan(
break
}
}
if err := ss.SlasherDB.SaveValidatorSpansMap(validatorIdx, spanMap); err != nil {
if err := ss.SlasherDB.SaveValidatorSpansMap(ctx, validatorIdx, spanMap); err != nil {
return 0, nil, err
}
return 0, spanMap, nil

View File

@@ -200,11 +200,12 @@ func TestServer_UpdateMaxEpochSpan(t *testing.T) {
db := testDB.SetupSlasherDB(t, c)
defer testDB.TeardownSlasherDB(t, db)
ctx := context.Background()
slasherServer := &Server{
SlasherDB: db,
}
for _, tt := range spanTestsMax {
spanMap, err := slasherServer.SlasherDB.ValidatorSpansMap(tt.validatorIdx)
spanMap, err := slasherServer.SlasherDB.ValidatorSpansMap(ctx, tt.validatorIdx)
if err != nil {
t.Fatal(err)
}
@@ -212,13 +213,13 @@ func TestServer_UpdateMaxEpochSpan(t *testing.T) {
if err != nil {
t.Fatalf("Failed to update span: %v", err)
}
if err := slasherServer.SlasherDB.SaveValidatorSpansMap(tt.validatorIdx, spanMap); err != nil {
if err := slasherServer.SlasherDB.SaveValidatorSpansMap(ctx, tt.validatorIdx, spanMap); err != nil {
t.Fatalf("Couldnt save span map for validator id: %d", tt.validatorIdx)
}
if st != tt.slashingTargetEpoch {
t.Fatalf("Expected slashing target: %d got: %d", tt.slashingTargetEpoch, st)
}
sm, err := slasherServer.SlasherDB.ValidatorSpansMap(tt.validatorIdx)
sm, err := slasherServer.SlasherDB.ValidatorSpansMap(ctx, tt.validatorIdx)
if err != nil {
t.Fatalf("Failed to retrieve span: %v", err)
}
@@ -239,7 +240,7 @@ func TestServer_UpdateMinEpochSpan(t *testing.T) {
SlasherDB: db,
}
for _, tt := range spanTestsMin {
spanMap, err := slasherServer.SlasherDB.ValidatorSpansMap(tt.validatorIdx)
spanMap, err := slasherServer.SlasherDB.ValidatorSpansMap(ctx, tt.validatorIdx)
if err != nil {
t.Fatal(err)
}
@@ -247,13 +248,13 @@ func TestServer_UpdateMinEpochSpan(t *testing.T) {
if err != nil {
t.Fatalf("Failed to update span: %v", err)
}
if err := slasherServer.SlasherDB.SaveValidatorSpansMap(tt.validatorIdx, spanMap); err != nil {
if err := slasherServer.SlasherDB.SaveValidatorSpansMap(ctx, tt.validatorIdx, spanMap); err != nil {
t.Fatalf("Couldnt save span map for validator id: %d", tt.validatorIdx)
}
if st != tt.slashingTargetEpoch {
t.Fatalf("Expected slashing target: %d got: %d", tt.slashingTargetEpoch, st)
}
sm, err := slasherServer.SlasherDB.ValidatorSpansMap(tt.validatorIdx)
sm, err := slasherServer.SlasherDB.ValidatorSpansMap(ctx, tt.validatorIdx)
if err != nil {
t.Fatalf("Failed to retrieve span: %v", err)
}
@@ -285,7 +286,7 @@ func TestServer_FailToUpdate(t *testing.T) {
},
},
}
spanMap, err := slasherServer.SlasherDB.ValidatorSpansMap(spanTestsFail.validatorIdx)
spanMap, err := slasherServer.SlasherDB.ValidatorSpansMap(ctx, spanTestsFail.validatorIdx)
if err != nil {
t.Fatal(err)
}

View File

@@ -30,7 +30,7 @@ func (ss *Server) IsSlashableAttestation(ctx context.Context, req *ethpb.Indexed
if req.Data == nil {
return nil, fmt.Errorf("cant hash nil data in indexed attestation")
}
if err := ss.SlasherDB.SaveIndexedAttestation(req); err != nil {
if err := ss.SlasherDB.SaveIndexedAttestation(ctx, req); err != nil {
return nil, err
}
indices := req.AttestingIndices
@@ -49,7 +49,7 @@ func (ss *Server) IsSlashableAttestation(ctx context.Context, req *ethpb.Indexed
}
wg.Add(1)
go func(idx uint64, root [32]byte, req *ethpb.IndexedAttestation) {
atts, err := ss.SlasherDB.DoubleVotes(idx, root[:], req)
atts, err := ss.SlasherDB.DoubleVotes(ctx, idx, root[:], req)
if err != nil {
errorChans <- err
wg.Done()
@@ -99,7 +99,7 @@ func (ss *Server) UpdateSpanMaps(ctx context.Context, req *ethpb.IndexedAttestat
}
wg.Add(1)
go func(i uint64) {
spanMap, err := ss.SlasherDB.ValidatorSpansMap(i)
spanMap, err := ss.SlasherDB.ValidatorSpansMap(ctx, i)
if err != nil {
er <- err
wg.Done()
@@ -122,7 +122,7 @@ func (ss *Server) UpdateSpanMaps(ctx context.Context, req *ethpb.IndexedAttestat
wg.Done()
return
}
if err := ss.SlasherDB.SaveValidatorSpansMap(i, spanMap); err != nil {
if err := ss.SlasherDB.SaveValidatorSpansMap(ctx, i, spanMap); err != nil {
er <- err
wg.Done()
return
@@ -142,7 +142,7 @@ func (ss *Server) UpdateSpanMaps(ctx context.Context, req *ethpb.IndexedAttestat
func (ss *Server) IsSlashableBlock(ctx context.Context, psr *slashpb.ProposerSlashingRequest) (*slashpb.ProposerSlashingResponse, error) {
//TODO(#3133): add signature validation
epoch := helpers.SlotToEpoch(psr.BlockHeader.Header.Slot)
blockHeaders, err := ss.SlasherDB.BlockHeaders(epoch, psr.ValidatorIndex)
blockHeaders, err := ss.SlasherDB.BlockHeaders(ctx, epoch, psr.ValidatorIndex)
if err != nil {
return nil, errors.Wrap(err, "slasher service error while trying to retrieve blocks")
}
@@ -156,7 +156,7 @@ func (ss *Server) IsSlashableBlock(ctx context.Context, psr *slashpb.ProposerSla
pSlashingsResponse.ProposerSlashing = append(pSlashingsResponse.ProposerSlashing, &ethpb.ProposerSlashing{ProposerIndex: psr.ValidatorIndex, Header_1: psr.BlockHeader, Header_2: bh})
}
if len(pSlashingsResponse.ProposerSlashing) == 0 && !presentInDb {
err = ss.SlasherDB.SaveBlockHeader(epoch, psr.ValidatorIndex, psr.BlockHeader)
err = ss.SlasherDB.SaveBlockHeader(ctx, epoch, psr.ValidatorIndex, psr.BlockHeader)
if err != nil {
return nil, err
}
@@ -168,7 +168,7 @@ func (ss *Server) IsSlashableBlock(ctx context.Context, psr *slashpb.ProposerSla
func (ss *Server) ProposerSlashings(ctx context.Context, st *slashpb.SlashingStatusRequest) (*slashpb.ProposerSlashingResponse, error) {
pSlashingsResponse := &slashpb.ProposerSlashingResponse{}
var err error
pSlashingsResponse.ProposerSlashing, err = ss.SlasherDB.ProposalSlashingsByStatus(types.SlashingStatus(st.Status))
pSlashingsResponse.ProposerSlashing, err = ss.SlasherDB.ProposalSlashingsByStatus(ctx, types.SlashingStatus(st.Status))
if err != nil {
return nil, err
}
@@ -179,7 +179,7 @@ func (ss *Server) ProposerSlashings(ctx context.Context, st *slashpb.SlashingSta
func (ss *Server) AttesterSlashings(ctx context.Context, st *slashpb.SlashingStatusRequest) (*slashpb.AttesterSlashingResponse, error) {
aSlashingsResponse := &slashpb.AttesterSlashingResponse{}
var err error
aSlashingsResponse.AttesterSlashing, err = ss.SlasherDB.AttesterSlashings(types.SlashingStatus(st.Status))
aSlashingsResponse.AttesterSlashing, err = ss.SlasherDB.AttesterSlashings(ctx, types.SlashingStatus(st.Status))
if err != nil {
return nil, err
}
@@ -189,7 +189,7 @@ func (ss *Server) AttesterSlashings(ctx context.Context, st *slashpb.SlashingSta
// DetectSurroundVotes is a method used to return the attestation that were detected
// by min max surround detection method.
func (ss *Server) DetectSurroundVotes(ctx context.Context, validatorIdx uint64, req *ethpb.IndexedAttestation) ([]*ethpb.AttesterSlashing, error) {
spanMap, err := ss.SlasherDB.ValidatorSpansMap(validatorIdx)
spanMap, err := ss.SlasherDB.ValidatorSpansMap(ctx, validatorIdx)
if err != nil {
return nil, errors.Wrap(err, "failed to get validator spans map")
}
@@ -201,13 +201,13 @@ func (ss *Server) DetectSurroundVotes(ctx context.Context, validatorIdx uint64,
if err != nil {
return nil, errors.Wrap(err, "failed to update max spans")
}
if err := ss.SlasherDB.SaveValidatorSpansMap(validatorIdx, spanMap); err != nil {
if err := ss.SlasherDB.SaveValidatorSpansMap(ctx, validatorIdx, spanMap); err != nil {
return nil, errors.Wrap(err, "failed to save validator spans map")
}
var as []*ethpb.AttesterSlashing
if minTargetEpoch > 0 {
attestations, err := ss.SlasherDB.IdxAttsForTargetFromID(minTargetEpoch, validatorIdx)
attestations, err := ss.SlasherDB.IdxAttsForTargetFromID(ctx, minTargetEpoch, validatorIdx)
if err != nil {
return nil, err
}
@@ -224,7 +224,7 @@ func (ss *Server) DetectSurroundVotes(ctx context.Context, validatorIdx uint64,
}
}
if maxTargetEpoch > 0 {
attestations, err := ss.SlasherDB.IdxAttsForTargetFromID(maxTargetEpoch, validatorIdx)
attestations, err := ss.SlasherDB.IdxAttsForTargetFromID(ctx, maxTargetEpoch, validatorIdx)
if err != nil {
return nil, err
}

View File

@@ -153,6 +153,7 @@ func TestServer_SameSlotSlashable(t *testing.T) {
db := testDB.SetupSlasherDB(t, c)
defer testDB.TeardownSlasherDB(t, db)
ctx := context.Background()
slasherServer := &Server{
ctx: ctx,
SlasherDB: db,
@@ -196,7 +197,7 @@ func TestServer_SameSlotSlashable(t *testing.T) {
t.Errorf("wanted slashing proof: %v got: %v", want, sr.ProposerSlashing[0])
}
if err := slasherServer.SlasherDB.SaveProposerSlashing(types.Active, sr.ProposerSlashing[0]); err != nil {
if err := slasherServer.SlasherDB.SaveProposerSlashing(ctx, types.Active, sr.ProposerSlashing[0]); err != nil {
t.Errorf("Could not call db method: %v", err)
}
if sr, err = slasherServer.ProposerSlashings(ctx, &slashpb.SlashingStatusRequest{Status: slashpb.SlashingStatusRequest_Active}); err != nil {
@@ -228,6 +229,7 @@ func TestServer_SlashDoubleAttestation(t *testing.T) {
db := testDB.SetupSlasherDB(t, c)
defer testDB.TeardownSlasherDB(t, db)
ctx := context.Background()
slasherServer := &Server{
ctx: ctx,
SlasherDB: db,
@@ -584,7 +586,7 @@ func TestServer_SlashSurroundAttestation(t *testing.T) {
t.Errorf("Wanted slashing proof: %v got: %v", want, sr.AttesterSlashing[0])
}
if err := slasherServer.SlasherDB.SaveAttesterSlashing(types.Active, sr.AttesterSlashing[0]); err != nil {
if err := slasherServer.SlasherDB.SaveAttesterSlashing(ctx, types.Active, sr.AttesterSlashing[0]); err != nil {
t.Errorf("Could not call db method: %v", err)
}
pr, err := slasherServer.ProposerSlashings(ctx, &slashpb.SlashingStatusRequest{Status: slashpb.SlashingStatusRequest_Active})

View File

@@ -26,22 +26,21 @@ func BenchmarkMinSpan(b *testing.B) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
set.Bool(flags.UseSpanCacheFlag.Name, true, "enable span map cache")
ctx := cli.NewContext(app, set, nil)
dbs := testDB.SetupSlasherDB(b, ctx)
defer testDB.TeardownSlasherDB(b, dbs)
db := testDB.SetupSlasherDB(b, cli.NewContext(app, set, nil))
defer testDB.TeardownSlasherDB(b, db)
ctx := context.Background()
context := context.Background()
slasherServer := &Server{
SlasherDB: dbs,
SlasherDB: db,
}
for _, diff := range diffs {
b.Run(fmt.Sprintf("MinSpan_diff_%d", diff), func(ib *testing.B) {
for i := uint64(0); i < uint64(ib.N); i++ {
spanMap, err := slasherServer.SlasherDB.ValidatorSpansMap(i % 10)
spanMap, err := slasherServer.SlasherDB.ValidatorSpansMap(ctx, i%10)
if err != nil {
b.Fatal(err)
}
_, _, err = slasherServer.DetectAndUpdateMinEpochSpan(context, i, i+diff, i%10, spanMap)
_, _, err = slasherServer.DetectAndUpdateMinEpochSpan(ctx, i, i+diff, i%10, spanMap)
if err != nil {
b.Fatal(err)
}
@@ -55,22 +54,21 @@ func BenchmarkMaxSpan(b *testing.B) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
set.Bool(flags.UseSpanCacheFlag.Name, true, "enable span map cache")
ctx := cli.NewContext(app, set, nil)
db := testDB.SetupSlasherDB(b, ctx)
db := testDB.SetupSlasherDB(b, cli.NewContext(app, set, nil))
defer testDB.TeardownSlasherDB(b, db)
ctx := context.Background()
context := context.Background()
slasherServer := &Server{
SlasherDB: db,
}
for _, diff := range diffs {
b.Run(fmt.Sprintf("MaxSpan_diff_%d", diff), func(ib *testing.B) {
for i := uint64(0); i < uint64(ib.N); i++ {
spanMap, err := slasherServer.SlasherDB.ValidatorSpansMap(i % 10)
spanMap, err := slasherServer.SlasherDB.ValidatorSpansMap(ctx, i%10)
if err != nil {
b.Fatal(err)
}
_, _, err = slasherServer.DetectAndUpdateMaxEpochSpan(context, diff, diff+i, i%10, spanMap)
_, _, err = slasherServer.DetectAndUpdateMaxEpochSpan(ctx, diff, diff+i, i%10, spanMap)
if err != nil {
b.Fatal(err)
}
@@ -84,9 +82,9 @@ func BenchmarkDetectSpan(b *testing.B) {
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
set.Bool(flags.UseSpanCacheFlag.Name, true, "enable span map cache")
ctx := cli.NewContext(app, set, nil)
db := testDB.SetupSlasherDB(b, ctx)
db := testDB.SetupSlasherDB(b, cli.NewContext(app, set, nil))
defer testDB.TeardownSlasherDB(b, db)
ctx := context.Background()
slasherServer := &Server{
SlasherDB: db,
@@ -94,7 +92,7 @@ func BenchmarkDetectSpan(b *testing.B) {
for _, diff := range diffs {
b.Run(fmt.Sprintf("Detect_MaxSpan_diff_%d", diff), func(ib *testing.B) {
for i := uint64(0); i < uint64(ib.N); i++ {
spanMap, err := slasherServer.SlasherDB.ValidatorSpansMap(i % 10)
spanMap, err := slasherServer.SlasherDB.ValidatorSpansMap(ctx, i%10)
if err != nil {
b.Fatal(err)
}
@@ -108,7 +106,7 @@ func BenchmarkDetectSpan(b *testing.B) {
for _, diff := range diffs {
b.Run(fmt.Sprintf("Detect_MinSpan_diff_%d", diff), func(ib *testing.B) {
for i := uint64(0); i < uint64(ib.N); i++ {
spanMap, err := slasherServer.SlasherDB.ValidatorSpansMap(i % 10)
spanMap, err := slasherServer.SlasherDB.ValidatorSpansMap(ctx, i%10)
if err != nil {
b.Fatal(err)
}
@@ -126,12 +124,12 @@ func BenchmarkCheckAttestations(b *testing.B) {
set := flag.NewFlagSet("test", 0)
set.Bool(flags.UseSpanCacheFlag.Name, true, "enable span map cache")
ctx := cli.NewContext(app, set, nil)
dbs := testDB.SetupSlasherDB(b, ctx)
defer testDB.TeardownSlasherDB(b, dbs)
db := testDB.SetupSlasherDB(b, ctx)
defer testDB.TeardownSlasherDB(b, db)
context := context.Background()
slasherServer := &Server{
ctx: context,
SlasherDB: dbs,
SlasherDB: db,
}
var cb []uint64
for i := uint64(0); i < 100; i++ {

View File

@@ -48,7 +48,7 @@ func (s *Service) historicalAttestationFeeder(ctx context.Context) error {
continue
}
}
if err := s.slasherDb.SetLatestEpochDetected(epoch); err != nil {
if err := s.slasherDb.SetLatestEpochDetected(ctx, epoch); err != nil {
log.Error(err)
continue
}
@@ -132,7 +132,7 @@ func (s *Service) detectSlashings(ctx context.Context, idxAtt *ethpb.IndexedAtte
}
if len(attSlashingResp.AttesterSlashing) > 0 {
if err := s.slasherDb.SaveAttesterSlashings(types.Active, attSlashingResp.AttesterSlashing); err != nil {
if err := s.slasherDb.SaveAttesterSlashings(ctx, types.Active, attSlashingResp.AttesterSlashing); err != nil {
return errors.Wrap(err, "failed to save attester slashings")
}
for _, as := range attSlashingResp.AttesterSlashing {
@@ -226,7 +226,7 @@ func (s *Service) attsAndCommitteesForEpoch(
}
func (s *Service) getLatestDetectedEpoch() (uint64, error) {
e, err := s.slasherDb.GetLatestEpochDetected()
e, err := s.slasherDb.GetLatestEpochDetected(s.context)
if err != nil {
return 0, err
}

View File

@@ -268,12 +268,12 @@ func (s *Service) startBeaconClient() error {
}
func (s *Service) loadSpanMaps(slasherServer rpc.Server) {
latestTargetEpoch, err := slasherServer.SlasherDB.LatestIndexedAttestationsTargetEpoch()
latestTargetEpoch, err := slasherServer.SlasherDB.LatestIndexedAttestationsTargetEpoch(s.context)
if err != nil {
log.Errorf("Could not extract latest target epoch from indexed attestations store: %v", err)
}
for epoch := uint64(0); epoch < latestTargetEpoch; epoch++ {
idxAtts, err := slasherServer.SlasherDB.IdxAttsForTarget(epoch)
idxAtts, err := slasherServer.SlasherDB.IdxAttsForTarget(s.context, epoch)
if err != nil {
log.Errorf("Got error while trying to retrieve indexed attestations from db: %v", err)
}
@@ -309,7 +309,7 @@ func (s *Service) Close() {
if err := s.Stop(); err != nil {
log.Panicf("Could not stop the slasher service: %v", err)
}
if err := s.slasherDb.SaveCachedSpansMaps(); err != nil {
if err := s.slasherDb.SaveCachedSpansMaps(s.context); err != nil {
log.Fatal("Didn't save span map cache to db. if span cache is enabled please restart with --%s", flags.RebuildSpanMapsFlag.Name)
}
if err := s.slasherDb.Close(); err != nil {