Remove Old References to Block Vote Cache (#1585)

* rem references to vote cache

* fix build
This commit is contained in:
Raul Jordan
2019-02-14 08:54:29 -06:00
committed by GitHub
parent 20a340bfcc
commit e052e457df
15 changed files with 2 additions and 537 deletions

View File

@@ -36,7 +36,6 @@ go_test(
embed = [":go_default_library"],
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/utils:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/hashutil:go_default_library",

View File

@@ -7,7 +7,6 @@ import (
"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/prysmaticlabs/prysm/beacon-chain/utils"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
@@ -18,18 +17,13 @@ func init() {
}
type mockDB struct {
hasBlock bool
blockVoteCache utils.BlockVoteCache
hasBlock bool
}
func (f *mockDB) HasBlock(h [32]byte) bool {
return f.hasBlock
}
func (f *mockDB) ReadBlockVoteCache(blockHashes [][32]byte) (utils.BlockVoteCache, error) {
return f.blockVoteCache, nil
}
type mockPOWClient struct {
blockExists bool
}

View File

@@ -6,7 +6,6 @@ go_library(
"attestation.go",
"block.go",
"block_operations.go",
"block_vote_cache.go",
"cleanup_history.go",
"db.go",
"pending_deposits.go",
@@ -19,7 +18,6 @@ go_library(
"//beacon-chain/core/attestations:go_default_library",
"//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/core/state:go_default_library",
"//beacon-chain/utils:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/hashutil:go_default_library",
@@ -38,7 +36,6 @@ go_test(
"attestation_test.go",
"block_operations_test.go",
"block_test.go",
"block_vote_cache_test.go",
"cleanup_history_test.go",
"db_test.go",
"pending_deposits_test.go",
@@ -49,7 +46,6 @@ go_test(
"//beacon-chain/core/attestations:go_default_library",
"//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/core/validators:go_default_library",
"//beacon-chain/utils:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",

View File

@@ -1,61 +0,0 @@
package db
import (
"fmt"
"github.com/boltdb/bolt"
"github.com/prysmaticlabs/prysm/beacon-chain/utils"
)
// ReadBlockVoteCache read block vote cache from DB.
func (db *BeaconDB) ReadBlockVoteCache(blockHashes [][32]byte) (utils.BlockVoteCache, error) {
blockVoteCache := utils.NewBlockVoteCache()
err := db.view(func(tx *bolt.Tx) error {
blockVoteCacheInfo := tx.Bucket(blockVoteCacheBucket)
for _, h := range blockHashes {
blob := blockVoteCacheInfo.Get(h[:])
if blob == nil {
continue
}
vote := new(utils.BlockVote)
if err := vote.Unmarshal(blob); err != nil {
return fmt.Errorf("failed to decode block vote cache for block hash %x", h)
}
blockVoteCache[h] = vote
}
return nil
})
return blockVoteCache, err
}
// DeleteBlockVoteCache removes vote cache for specified blocks from DB.
func (db *BeaconDB) DeleteBlockVoteCache(blockHashes [][32]byte) error {
err := db.update(func(tx *bolt.Tx) error {
blockVoteCacheInfo := tx.Bucket(blockVoteCacheBucket)
for _, h := range blockHashes {
if err := blockVoteCacheInfo.Delete(h[:]); err != nil {
return fmt.Errorf("failed to delete block vote cache for block hash %x: %v", h, err)
}
}
return nil
})
return err
}
// WriteBlockVoteCache write block vote cache object into DB.
func (db *BeaconDB) WriteBlockVoteCache(blockVoteCache utils.BlockVoteCache) error {
err := db.update(func(tx *bolt.Tx) error {
blockVoteCacheInfo := tx.Bucket(blockVoteCacheBucket)
for h := range blockVoteCache {
blob, err := blockVoteCache[h].Marshal()
if err != nil {
return fmt.Errorf("failed to encode block vote cache for block hash %x", h)
}
if err = blockVoteCacheInfo.Put(h[:], blob); err != nil {
return fmt.Errorf("failed to store block vote cache into DB")
}
}
return nil
})
return err
}

View File

@@ -1,66 +0,0 @@
package db
import (
"reflect"
"testing"
"github.com/prysmaticlabs/prysm/beacon-chain/utils"
)
func TestBlockVoteCacheReadWrite(t *testing.T) {
db := setupDB(t)
defer teardownDB(t, db)
blockVoteCache := utils.NewBlockVoteCache()
blockVote1 := &utils.BlockVote{VoterIndices: []uint32{1, 2, 3}, VoteTotalDeposit: 6}
blockVote2 := &utils.BlockVote{VoterIndices: []uint32{4, 5, 6}, VoteTotalDeposit: 15}
blockHash1 := [32]byte{1}
blockHash2 := [32]byte{2}
blockVoteCache[blockHash1] = blockVote1
blockVoteCache[blockHash2] = blockVote2
var err error
if err = db.WriteBlockVoteCache(blockVoteCache); err != nil {
t.Fatalf("failed to write block vote cache to DB: %v", err)
}
blockVoteCache2, err := db.ReadBlockVoteCache([][32]byte{blockHash1, blockHash2})
if err != nil {
t.Fatalf("failed to read block vote cache from DB: %v", err)
}
if !reflect.DeepEqual(blockVoteCache, blockVoteCache2) {
t.Error("block vote cache read write don't match")
}
}
func TestBlockVoteCacheDelete(t *testing.T) {
db := setupDB(t)
defer teardownDB(t, db)
blockVoteCache := utils.NewBlockVoteCache()
blockVote1 := &utils.BlockVote{VoterIndices: []uint32{1, 2, 3}, VoteTotalDeposit: 6}
blockVote2 := &utils.BlockVote{VoterIndices: []uint32{4, 5, 6}, VoteTotalDeposit: 15}
blockHash1 := [32]byte{1}
blockHash2 := [32]byte{2}
blockVoteCache[blockHash1] = blockVote1
blockVoteCache[blockHash2] = blockVote2
var err error
if err = db.WriteBlockVoteCache(blockVoteCache); err != nil {
t.Fatalf("failed to write block vote cache to DB: %v", err)
}
if err = db.DeleteBlockVoteCache([][32]byte{blockHash1, blockHash2}); err != nil {
t.Fatalf("failed to delete block vote cache from DB: %v", err)
}
var voteCache utils.BlockVoteCache
voteCache, err = db.ReadBlockVoteCache([][32]byte{blockHash1})
if err != nil {
t.Error("should not expect error when reading already deleted block vote cache")
}
if len(voteCache) != 0 {
t.Error("should expect empty result when reading already deleted block vote cache")
}
}

View File

@@ -68,7 +68,7 @@ func NewDB(dirPath string) (*BeaconDB, error) {
if err := db.update(func(tx *bolt.Tx) error {
return createBuckets(tx, blockBucket, attestationBucket, mainChainBucket,
chainInfoBucket, blockVoteCacheBucket, cleanupHistoryBucket, blockOperationsBucket)
chainInfoBucket, cleanupHistoryBucket, blockOperationsBucket)
}); err != nil {
return nil, err

View File

@@ -18,7 +18,6 @@ var (
blockBucket = []byte("block-bucket")
mainChainBucket = []byte("main-chain-bucket")
chainInfoBucket = []byte("chain-info")
blockVoteCacheBucket = []byte("block-vote-cache")
mainChainHeightKey = []byte("chain-height")
stateLookupKey = []byte("state")

View File

@@ -1,34 +0,0 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["service.go"],
importpath = "github.com/prysmaticlabs/prysm/beacon-chain/dbcleanup",
visibility = ["//beacon-chain:__subpackages__"],
deps = [
"//beacon-chain/db:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/event:go_default_library",
"//shared/hashutil:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["service_test.go"],
embed = [":go_default_library"],
deps = [
"//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/core/validators:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/internal:go_default_library",
"//beacon-chain/utils:go_default_library",
"//proto/beacon/p2p/v1:go_default_library",
"//shared/event:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",
"//shared/testutil:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)

View File

@@ -1,125 +0,0 @@
// Package dbcleanup defines the life cycle and logic of beacon DB cleanup routine.
package dbcleanup
import (
"context"
"fmt"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/sirupsen/logrus"
)
var log = logrus.WithField("prefix", "dbcleaner")
type chainService interface {
CanonicalStateFeed() *event.Feed
}
// CleanupService represents a service that handles routine task for cleaning up
// beacon DB so our DB won't grow infinitely.
// Currently it only cleans up block vote cache. In future, it could add more tasks
// such as cleaning up historical beacon states.
type CleanupService struct {
ctx context.Context
cancel context.CancelFunc
beaconDB *db.BeaconDB
chainService chainService
canonicalStateChan chan *pb.BeaconState
}
// Config defines the needed fields for creating a new cleanup service.
type Config struct {
SubscriptionBuf int
BeaconDB *db.BeaconDB
ChainService chainService
}
// NewCleanupService creates a new cleanup service instance.
func NewCleanupService(ctx context.Context, cfg *Config) *CleanupService {
ctx, cancel := context.WithCancel(ctx)
return &CleanupService{
ctx: ctx,
cancel: cancel,
beaconDB: cfg.BeaconDB,
chainService: cfg.ChainService,
canonicalStateChan: make(chan *pb.BeaconState, cfg.SubscriptionBuf),
}
}
// Start a cleanup service.
func (d *CleanupService) Start() {
log.Info("Starting service")
go d.cleanDB()
}
// Stop a cleanup service.
func (d *CleanupService) Stop() error {
defer d.cancel()
log.Info("Stopping service")
return nil
}
// Status always returns nil.
// TODO(1203): Add service health checks.
func (d *CleanupService) Status() error {
return nil
}
func (d *CleanupService) cleanDB() {
cStateSub := d.chainService.CanonicalStateFeed().Subscribe(d.canonicalStateChan)
defer cStateSub.Unsubscribe()
for {
select {
case <-d.ctx.Done():
log.Debug("Cleanup service context closed, exiting goroutine")
return
case cState := <-d.canonicalStateChan:
if err := d.cleanBlockVoteCache(cState.FinalizedEpoch); err != nil {
log.Errorf("Failed to clean block vote cache: %v", err)
}
}
}
}
func (d *CleanupService) cleanBlockVoteCache(latestFinalizedSlot uint64) error {
var lastCleanedFinalizedSlot uint64
var err error
lastCleanedFinalizedSlot, err = d.beaconDB.CleanedFinalizedSlot()
if err != nil {
return fmt.Errorf("failed to read cleaned finalized slot from DB: %v", err)
}
log.Infof("Finalized slot: latest: %d, last cleaned: %d, %d blocks' vote cache will be cleaned",
latestFinalizedSlot, lastCleanedFinalizedSlot, latestFinalizedSlot-lastCleanedFinalizedSlot)
var blockHashes [][32]byte
for slot := lastCleanedFinalizedSlot + 1; slot <= latestFinalizedSlot; slot++ {
var block *pb.BeaconBlock
block, err = d.beaconDB.BlockBySlot(slot)
if err != nil {
return fmt.Errorf("failed to read block at slot %d: %v", slot, err)
}
if block != nil {
var blockHash [32]byte
blockHash, err = hashutil.HashBeaconBlock(block)
if err != nil {
return fmt.Errorf("failed to get hash of block: %v", err)
}
blockHashes = append(blockHashes, blockHash)
}
}
if err = d.beaconDB.DeleteBlockVoteCache(blockHashes); err != nil {
return fmt.Errorf("failed to delete block vote cache: %v", err)
}
if err = d.beaconDB.SaveCleanedFinalizedSlot(latestFinalizedSlot); err != nil {
return fmt.Errorf("failed to update cleaned finalized slot: %v", err)
}
return nil
}

View File

@@ -1,135 +0,0 @@
package dbcleanup
import (
"context"
"testing"
"time"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/validators"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/internal"
"github.com/prysmaticlabs/prysm/beacon-chain/utils"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/testutil"
logTest "github.com/sirupsen/logrus/hooks/test"
)
type mockChainService struct {
stateFeed *event.Feed
}
func (m *mockChainService) CanonicalStateFeed() *event.Feed {
return m.stateFeed
}
func newMockChainService() *mockChainService {
return &mockChainService{
stateFeed: new(event.Feed),
}
}
func setupInitialDeposits(t *testing.T) []*pb.Deposit {
genesisValidatorRegistry := validators.InitialValidatorRegistry()
deposits := make([]*pb.Deposit, len(genesisValidatorRegistry))
for i := 0; i < len(deposits); i++ {
depositInput := &pb.DepositInput{
Pubkey: genesisValidatorRegistry[i].Pubkey,
}
balance := params.BeaconConfig().MaxDepositAmount
depositData, err := blocks.EncodeDepositData(depositInput, balance, time.Now().Unix())
if err != nil {
t.Fatalf("Cannot encode data: %v", err)
}
deposits[i] = &pb.Deposit{DepositData: depositData}
}
return deposits
}
func createCleanupService(beaconDB *db.BeaconDB) *CleanupService {
chainService := newMockChainService()
cleanupService := NewCleanupService(context.Background(), &Config{
SubscriptionBuf: 100,
BeaconDB: beaconDB,
ChainService: chainService,
})
return cleanupService
}
func TestLifecycle(t *testing.T) {
hook := logTest.NewGlobal()
beaconDB := internal.SetupDB(t)
defer internal.TeardownDB(t, beaconDB)
cleanupService := createCleanupService(beaconDB)
cleanupService.Start()
testutil.AssertLogsContain(t, hook, "Starting service")
if err := cleanupService.Stop(); err != nil {
t.Fatalf("failed to stop cleanup service: %v", err)
}
testutil.AssertLogsContain(t, hook, "Stopping service")
}
func TestCleanBlockVoteCache(t *testing.T) {
beaconDB := internal.SetupDB(t)
defer internal.TeardownDB(t, beaconDB)
var err error
// Pre-fill block vote cache in DB
genesisTime := uint64(time.Now().Unix())
deposits := setupInitialDeposits(t)
if err = beaconDB.InitializeState(genesisTime, deposits); err != nil {
t.Fatalf("failed to initialize DB: %v", err)
}
oldBlock := &pb.BeaconBlock{Slot: 1}
oldBlockHash, _ := hashutil.HashBeaconBlock(oldBlock)
if err = beaconDB.SaveBlock(oldBlock); err != nil {
t.Fatalf("failed to write block int DB: %v", err)
}
oldState := &pb.BeaconState{}
if err = beaconDB.SaveState(oldState); err != nil {
t.Fatalf("failed to pre-fill DB: %v", err)
}
if err := beaconDB.UpdateChainHead(oldBlock, oldState); err != nil {
t.Fatalf("failed to update chain head: %v", err)
}
oldBlockVoteCache := utils.NewBlockVoteCache()
oldBlockVoteCache[oldBlockHash] = utils.NewBlockVote()
if err = beaconDB.WriteBlockVoteCache(oldBlockVoteCache); err != nil {
t.Fatalf("failed to write block vote cache into DB: %v", err)
}
// Verify block vote cache is not cleaned before running the cleanup service
blockHashes := [][32]byte{oldBlockHash}
var blockVoteCache utils.BlockVoteCache
if blockVoteCache, err = beaconDB.ReadBlockVoteCache(blockHashes); err != nil {
t.Fatalf("failed to read block vote cache from DB: %v", err)
}
if len(blockVoteCache) != 1 {
t.Fatalf("failed to reach pre-filled block vote cache status")
}
// Now let the cleanup service do its job
cleanupService := createCleanupService(beaconDB)
state := &pb.BeaconState{FinalizedEpoch: 1}
if err = cleanupService.cleanBlockVoteCache(state.FinalizedEpoch); err != nil {
t.Fatalf("failed to clean block vote cache")
}
// Check the block vote cache has been cleaned up
if blockVoteCache, err = beaconDB.ReadBlockVoteCache(blockHashes); err != nil {
t.Errorf("failed to read block vote cache from DB: %v", err)
}
if len(blockVoteCache) != 0 {
t.Error("block vote cache is expected to be cleaned up")
}
}

View File

@@ -11,7 +11,6 @@ go_library(
deps = [
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/dbcleanup:go_default_library",
"//beacon-chain/operations:go_default_library",
"//beacon-chain/powchain:go_default_library",
"//beacon-chain/rpc:go_default_library",

View File

@@ -15,7 +15,6 @@ import (
gethRPC "github.com/ethereum/go-ethereum/rpc"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/dbcleanup"
"github.com/prysmaticlabs/prysm/beacon-chain/operations"
"github.com/prysmaticlabs/prysm/beacon-chain/powchain"
"github.com/prysmaticlabs/prysm/beacon-chain/rpc"
@@ -78,10 +77,6 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) {
return nil, err
}
if err := beacon.registerDBCleanService(ctx); err != nil {
return nil, err
}
if err := beacon.registerOperationService(); err != nil {
return nil, err
}
@@ -193,25 +188,6 @@ func (b *BeaconNode) registerBlockchainService(ctx *cli.Context) error {
return b.services.RegisterService(blockchainService)
}
func (b *BeaconNode) registerDBCleanService(ctx *cli.Context) error {
if !ctx.GlobalBool(utils.EnableDBCleanup.Name) {
return nil
}
var chainService *blockchain.ChainService
if err := b.services.FetchService(&chainService); err != nil {
return err
}
dbCleanService := dbcleanup.NewCleanupService(context.TODO(), &dbcleanup.Config{
SubscriptionBuf: 100,
BeaconDB: b.db,
ChainService: chainService,
})
return b.services.RegisterService(dbCleanService)
}
func (b *BeaconNode) registerOperationService() error {
operationService := operations.NewOperationService(context.TODO(), &operations.Config{
BeaconDB: b.db,

View File

@@ -3,7 +3,6 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"block_vote_cache.go",
"clock.go",
"flags.go",
"shuffle.go",
@@ -21,7 +20,6 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"block_vote_cache_test.go",
"clock_test.go",
"shuffle_test.go",
],

View File

@@ -1,49 +0,0 @@
package utils
import (
"bytes"
"encoding/gob"
)
// BlockVote is for tracking which validators voted for a certain block hash
// and total deposit supported for such block hash.
type BlockVote struct {
VoterIndices []uint32
VoteTotalDeposit uint32
}
// BlockVoteCache is a map from hash to BlockVote object.
type BlockVoteCache map[[32]byte]*BlockVote
// NewBlockVote generates a fresh new BlockVote.
func NewBlockVote() *BlockVote {
return &BlockVote{VoterIndices: []uint32{}, VoteTotalDeposit: 0}
}
// Marshal serializes a BlockVote.
func (v *BlockVote) Marshal() ([]byte, error) {
var buf bytes.Buffer
encoder := gob.NewEncoder(&buf)
if err := encoder.Encode(v); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// Unmarshal deserializes a BlockVote.
func (v *BlockVote) Unmarshal(blob []byte) error {
buf := bytes.NewBuffer(blob)
decoder := gob.NewDecoder(buf)
return decoder.Decode(v)
}
// NewBlockVoteCache creates a new BlockVoteCache.
func NewBlockVoteCache() BlockVoteCache {
return make(BlockVoteCache)
}
// IsVoteCacheExist looks up a BlockVote with a hash.
func (blockVoteCache BlockVoteCache) IsVoteCacheExist(blockHash [32]byte) bool {
_, ok := blockVoteCache[blockHash]
return ok
}

View File

@@ -1,26 +0,0 @@
package utils
import (
"reflect"
"testing"
)
func TestBlockVoteMarshalUnmarshall(t *testing.T) {
v1 := NewBlockVote()
v1.VoterIndices = []uint32{1, 2, 3}
v1.VoteTotalDeposit = 10
blob, err := v1.Marshal()
if err != nil {
t.Fatalf("fail to serialize block vote: %v", err)
}
v2 := new(BlockVote)
if err = v2.Unmarshal(blob); err != nil {
t.Fatalf("fail to deserialize block vote: %v", err)
}
if !reflect.DeepEqual(v1, v2) {
t.Error("block vote cache serialization and deserialization don't match")
}
}