Part 4 of caching improvement - Use Cache (#3625)

This commit is contained in:
terence tsao
2019-09-27 15:56:08 -07:00
committed by GitHub
parent 00e68c6cc7
commit 4c8269aca3
8 changed files with 175 additions and 5 deletions

View File

@@ -27,6 +27,7 @@ go_library(
"//proto/eth/v1alpha1:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/event:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/params:go_default_library",
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_pkg_errors//:go_default_library",

View File

@@ -22,6 +22,7 @@ go_library(
"//proto/beacon/p2p/v1:go_default_library",
"//proto/eth/v1alpha1:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",
"//shared/roughtime:go_default_library",

View File

@@ -14,6 +14,7 @@ import (
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/roughtime"
@@ -117,6 +118,13 @@ func (s *Store) OnBlock(ctx context.Context, b *ethpb.BeaconBlock) error {
if helpers.IsEpochStart(postState.Slot) {
logEpochData(postState)
reportEpochMetrics(postState)
// Update committee shuffled indices at the end of every epoch
if featureconfig.FeatureConfig().EnableNewCache {
if err := helpers.UpdateCommitteeCache(postState); err != nil {
return err
}
}
}
return nil

View File

@@ -16,6 +16,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain/forkchoice"
"github.com/prysmaticlabs/prysm/beacon-chain/cache/depositcache"
"github.com/prysmaticlabs/prysm/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
"github.com/prysmaticlabs/prysm/beacon-chain/operations"
@@ -25,6 +26,7 @@ import (
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/event"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
@@ -172,6 +174,13 @@ func (s *Service) initializeBeaconChain(
return errors.Wrap(err, "could not save genesis data")
}
// Update committee shuffled indices for genesis epoch.
if featureconfig.FeatureConfig().EnableNewCache {
if err := helpers.UpdateCommitteeCache(genesisState); err != nil {
return err
}
}
return nil
}

View File

@@ -18,8 +18,8 @@ var (
ErrNotCommittee = errors.New("object is not a committee struct")
// maxShuffledIndicesSize defines the max number of shuffled indices list can cache.
// 2 for current epoch and next epoch.
maxShuffledIndicesSize = 2
// 3 for previous, current epoch and next epoch.
maxShuffledIndicesSize = 3
// CommitteeCacheMiss tracks the number of committee requests that aren't present in the cache.
CommitteeCacheMiss = promauto.NewCounter(prometheus.CounterOpts{
@@ -105,6 +105,9 @@ func (c *CommitteeCache) AddCommitteeShuffledList(committee *Committee) error {
// Epochs returns the epochs stored in the committee cache. These are the keys to the cache.
func (c *CommitteeCache) Epochs() ([]uint64, error) {
c.lock.RLock()
defer c.lock.RUnlock()
epochs := make([]uint64, len(c.CommitteeCache.ListKeys()))
for i, s := range c.CommitteeCache.ListKeys() {
epoch, err := strconv.Atoi(s)
@@ -118,6 +121,9 @@ func (c *CommitteeCache) Epochs() ([]uint64, error) {
// EpochInCache returns true if an input epoch is part of keys in cache.
func (c *CommitteeCache) EpochInCache(wantedEpoch uint64) (bool, error) {
c.lock.RLock()
defer c.lock.RUnlock()
for _, s := range c.CommitteeCache.ListKeys() {
epoch, err := strconv.Atoi(s)
if err != nil {
@@ -130,6 +136,54 @@ func (c *CommitteeCache) EpochInCache(wantedEpoch uint64) (bool, error) {
return false, nil
}
// CommitteeCount returns the total number of committees in a given epoch as stored in cache.
func (c *CommitteeCache) CommitteeCount(epoch uint64) (uint64, bool, error) {
c.lock.RLock()
defer c.lock.RUnlock()
obj, exists, err := c.CommitteeCache.GetByKey(strconv.Itoa(int(epoch)))
if err != nil {
return 0, false, err
}
if exists {
CommitteeCacheHit.Inc()
} else {
CommitteeCacheMiss.Inc()
return 0, false, nil
}
item, ok := obj.(*Committee)
if !ok {
return 0, false, ErrNotCommittee
}
return item.CommitteeCount, true, nil
}
// StartShard returns the start shard number in a given epoch as stored in cache.
func (c *CommitteeCache) StartShard(epoch uint64) (uint64, bool, error) {
c.lock.RLock()
defer c.lock.RUnlock()
obj, exists, err := c.CommitteeCache.GetByKey(strconv.Itoa(int(epoch)))
if err != nil {
return 0, false, err
}
if exists {
CommitteeCacheHit.Inc()
} else {
CommitteeCacheMiss.Inc()
return 0, false, nil
}
item, ok := obj.(*Committee)
if !ok {
return 0, false, ErrNotCommittee
}
return item.StartShard, true, nil
}
func startEndIndices(c *Committee, wantedShard uint64) (uint64, uint64) {
shardCount := params.BeaconConfig().ShardCount
currentShard := (wantedShard + shardCount - c.StartShard) % shardCount

View File

@@ -94,7 +94,7 @@ func TestCommitteeCache_CanRotate(t *testing.T) {
if err != nil {
t.Fatal(err)
}
wanted = item2.Epoch + item3.Epoch
wanted = item1.Epoch + item2.Epoch + item3.Epoch
if sum(epochs) != wanted {
t.Errorf("Wanted: %v, got: %v", wanted, sum(epochs))
}
@@ -107,7 +107,7 @@ func TestCommitteeCache_CanRotate(t *testing.T) {
if err != nil {
t.Fatal(err)
}
wanted = item3.Epoch + item4.Epoch
wanted = item2.Epoch + item3.Epoch + item4.Epoch
if sum(epochs) != wanted {
t.Errorf("Wanted: %v, got: %v", wanted, sum(epochs))
}
@@ -124,6 +124,9 @@ func TestCommitteeCache_EpochInCache(t *testing.T) {
if err := cache.AddCommitteeShuffledList(&Committee{Epoch: 99}); err != nil {
t.Fatal(err)
}
if err := cache.AddCommitteeShuffledList(&Committee{Epoch: 100}); err != nil {
t.Fatal(err)
}
inCache, err := cache.EpochInCache(1)
if err != nil {
t.Fatal(err)
@@ -131,7 +134,7 @@ func TestCommitteeCache_EpochInCache(t *testing.T) {
if inCache {
t.Error("Epoch shouldn't be in cache")
}
inCache, err = cache.EpochInCache(99)
inCache, err = cache.EpochInCache(100)
if err != nil {
t.Fatal(err)
}
@@ -140,6 +143,68 @@ func TestCommitteeCache_EpochInCache(t *testing.T) {
}
}
func TestCommitteeCache_CommitteesCount(t *testing.T) {
cache := NewCommitteeCache()
committeeCount := uint64(3)
epoch := uint64(10)
item := &Committee{Epoch: epoch, CommitteeCount: committeeCount}
_, exists, err := cache.CommitteeCount(1)
if err != nil {
t.Fatal(err)
}
if exists {
t.Error("Expected committee count not to exist in empty cache")
}
if err := cache.AddCommitteeShuffledList(item); err != nil {
t.Fatal(err)
}
count, exists, err := cache.CommitteeCount(epoch)
if err != nil {
t.Fatal(err)
}
if !exists {
t.Error("Expected committee count to be in cache")
}
if count != committeeCount {
t.Errorf("wanted: %d, got: %d", committeeCount, count)
}
}
func TestCommitteeCache_ShardCount(t *testing.T) {
cache := NewCommitteeCache()
startShard := uint64(7)
epoch := uint64(3)
item := &Committee{Epoch: epoch, StartShard: startShard}
_, exists, err := cache.StartShard(1)
if err != nil {
t.Fatal(err)
}
if exists {
t.Error("Expected start shard not to exist in empty cache")
}
if err := cache.AddCommitteeShuffledList(item); err != nil {
t.Fatal(err)
}
shard, exists, err := cache.StartShard(epoch)
if err != nil {
t.Fatal(err)
}
if !exists {
t.Error("Expected start shard to be in cache")
}
if shard != startShard {
t.Errorf("wanted: %d, got: %d", startShard, shard)
}
}
func sum(values []uint64) uint64 {
sum := uint64(0)
for _, v := range values {

View File

@@ -25,6 +25,7 @@ go_library(
"//proto/eth/v1alpha1:go_default_library",
"//shared/bls:go_default_library",
"//shared/bytesutil:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",
"//shared/sliceutil:go_default_library",

View File

@@ -10,6 +10,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/cache"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
ethpb "github.com/prysmaticlabs/prysm/proto/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/featureconfig"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/sliceutil"
"google.golang.org/grpc/codes"
@@ -32,6 +33,16 @@ var committeeCache = cache.NewCommitteeCache()
// ))
// return committees_per_slot * SLOTS_PER_EPOCH
func CommitteeCount(state *pb.BeaconState, epoch uint64) (uint64, error) {
if featureconfig.FeatureConfig().EnableNewCache {
count, exists, err := committeeCache.CommitteeCount(epoch)
if err != nil {
return 0, errors.Wrap(err, "could not interface with committee cache")
}
if exists {
return count, nil
}
}
minCommitteePerSlot := uint64(1)
// Max committee count per slot will be 0 when shard count is less than epoch length, this
// covers the special case to ensure there's always 1 max committee count per slot.
@@ -69,6 +80,16 @@ func CommitteeCount(state *pb.BeaconState, epoch uint64) (uint64, error) {
// count=get_committee_count(state, epoch),
// )
func CrosslinkCommittee(state *pb.BeaconState, epoch uint64, shard uint64) ([]uint64, error) {
if featureconfig.FeatureConfig().EnableNewCache {
indices, err := committeeCache.ShuffledIndices(epoch, shard)
if err != nil {
return nil, errors.Wrap(err, "could not interface with committee cache")
}
if indices != nil {
return indices, nil
}
}
seed, err := Seed(state, epoch)
if err != nil {
return nil, errors.Wrap(err, "could not get seed")
@@ -317,6 +338,16 @@ func shardDeltaFromCommitteeCount(committeeCount uint64) uint64 {
// shard = Shard((shard + SHARD_COUNT - get_shard_delta(state, check_epoch)) % SHARD_COUNT)
// return shard
func StartShard(state *pb.BeaconState, epoch uint64) (uint64, error) {
if featureconfig.FeatureConfig().EnableNewCache {
startShard, exists, err := committeeCache.StartShard(epoch)
if err != nil {
return 0, errors.Wrap(err, "could not interface with committee cache")
}
if exists {
return startShard, nil
}
}
currentEpoch := CurrentEpoch(state)
checkEpoch := currentEpoch + 1