Add DB To P2P Service (#8676)

* add files

* goimports
This commit is contained in:
Nishant Das
2021-03-26 22:51:58 +08:00
committed by GitHub
parent 961a012502
commit 7f0c92504f
7 changed files with 114 additions and 22 deletions

View File

@@ -304,7 +304,7 @@ func (s *Store) HighestSlotStatesBelow(ctx context.Context, slot types.Slot) ([]
return []iface.ReadOnlyBeaconState{st}, nil
}
// createBlockIndicesFromBlock takes in a beacon block and returns
// createStateIndicesFromStateSlot takes in a state slot and returns
// a map of bolt DB index buckets corresponding to each particular key for indices for
// data, such as (shard indices bucket -> shard 5).
func createStateIndicesFromStateSlot(ctx context.Context, slot types.Slot) map[string][]byte {

View File

@@ -430,6 +430,7 @@ func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error {
EnableUPnP: cliCtx.Bool(cmd.EnableUPnPFlag.Name),
DisableDiscv5: cliCtx.Bool(flags.DisableDiscv5.Name),
StateNotifier: b,
DB: b.db,
})
if err != nil {
return err

View File

@@ -43,6 +43,7 @@ go_library(
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/peerdata:go_default_library",
@@ -106,6 +107,7 @@ go_test(
"dial_relay_node_test.go",
"discovery_test.go",
"fork_test.go",
"gossip_scoring_params_test.go",
"gossip_topic_mappings_test.go",
"options_test.go",
"parameter_test.go",
@@ -125,6 +127,7 @@ go_test(
"//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/peerdata:go_default_library",

View File

@@ -2,6 +2,7 @@ package p2p
import (
statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/beacon-chain/db"
)
// Config for the p2p service. These parameters are set from application level flags
@@ -26,4 +27,5 @@ type Config struct {
AllowListCIDR string
DenyListCIDR []string
StateNotifier statefeed.Notifier
DB db.ReadOnlyDatabase
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/params"
)
@@ -54,19 +55,45 @@ func peerScoringParams() (*pubsub.PeerScoreParams, *pubsub.PeerScoreThresholds)
return scoreParams, thresholds
}
func topicScoreParams(topic string) *pubsub.TopicScoreParams {
func (s *Service) topicScoreParams(topic string) (*pubsub.TopicScoreParams, error) {
activeValidators, err := s.retrieveActiveValidators()
if err != nil {
return nil, err
}
switch {
case strings.Contains(topic, "beacon_block"):
return defaultBlockTopicParams()
return defaultBlockTopicParams(), nil
case strings.Contains(topic, "beacon_aggregate_and_proof"):
return defaultAggregateTopicParams()
return defaultAggregateTopicParams(activeValidators), nil
case strings.Contains(topic, "beacon_attestation"):
return defaultAggregateSubnetTopicParams()
return defaultAggregateSubnetTopicParams(activeValidators), nil
default:
return nil
return nil, nil
}
}
func (s *Service) retrieveActiveValidators() (uint64, error) {
rt := s.cfg.DB.LastArchivedRoot(s.ctx)
if rt == params.BeaconConfig().ZeroHash {
genState, err := s.cfg.DB.GenesisState(s.ctx)
if err != nil {
return 0, err
}
if genState == nil {
return 0, errors.New("no genesis state exists")
}
return helpers.ActiveValidatorCount(genState, helpers.CurrentEpoch(genState))
}
bState, err := s.cfg.DB.State(s.ctx, rt)
if err != nil {
return 0, err
}
if bState == nil {
return 0, errors.Errorf("no state with root %#x exists", rt)
}
return helpers.ActiveValidatorCount(bState, helpers.CurrentEpoch(bState))
}
// Based on Ben's tested parameters for lighthouse.
// https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c
@@ -94,8 +121,8 @@ func defaultBlockTopicParams() *pubsub.TopicScoreParams {
}
}
func defaultAggregateTopicParams() *pubsub.TopicScoreParams {
aggPerEpoch := aggregatorsPerSlot() * uint64(params.BeaconConfig().SlotsPerEpoch)
func defaultAggregateTopicParams(activeValidators uint64) *pubsub.TopicScoreParams {
aggPerEpoch := aggregatorsPerSlot(activeValidators) * uint64(params.BeaconConfig().SlotsPerEpoch)
return &pubsub.TopicScoreParams{
TopicWeight: aggregateWeight,
TimeInMeshWeight: 0.0324,
@@ -117,12 +144,12 @@ func defaultAggregateTopicParams() *pubsub.TopicScoreParams {
}
}
func defaultAggregateSubnetTopicParams() *pubsub.TopicScoreParams {
func defaultAggregateSubnetTopicParams(activeValidators uint64) *pubsub.TopicScoreParams {
topicWeight := attestationTotalWeight / float64(params.BeaconNetworkConfig().AttestationSubnetCount)
subnetWeight := activeValidators() / params.BeaconNetworkConfig().AttestationSubnetCount
subnetWeight := activeValidators / params.BeaconNetworkConfig().AttestationSubnetCount
minimumWeight := subnetWeight / 50
numPerSlot := time.Duration(subnetWeight / uint64(params.BeaconConfig().SlotsPerEpoch))
comsPerSlot := committeeCountPerSlot()
comsPerSlot := committeeCountPerSlot(activeValidators)
exceedsThreshold := comsPerSlot >= 2*params.BeaconNetworkConfig().AttestationSubnetCount/uint64(params.BeaconConfig().SlotsPerEpoch)
firstDecay := time.Duration(1)
meshDecay := time.Duration(4)
@@ -164,22 +191,16 @@ func scoreDecay(totalDurationDecay time.Duration) float64 {
return math.Pow(decayToZero, 1/float64(numOfTimes))
}
// Default to the min-genesis for the current moment, as p2p service
// has no access to the chain service.
func activeValidators() uint64 {
return params.BeaconConfig().MinGenesisActiveValidatorCount
}
func committeeCountPerSlot() uint64 {
func committeeCountPerSlot(activeValidators uint64) uint64 {
// Use a static parameter for now rather than a dynamic one, we can use
// the actual parameter later when we have figured out how to fix a circular
// dependency in service startup order.
return helpers.SlotCommitteeCount(activeValidators())
return helpers.SlotCommitteeCount(activeValidators)
}
// Uses a very rough gauge for total aggregator size per slot.
func aggregatorsPerSlot() uint64 {
comms := committeeCountPerSlot()
func aggregatorsPerSlot(activeValidators uint64) uint64 {
comms := committeeCountPerSlot(activeValidators)
totalAggs := comms * params.BeaconConfig().TargetAggregatorsPerCommittee
return totalAggs
}

View File

@@ -0,0 +1,62 @@
package p2p
import (
"context"
"testing"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
dbutil "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
func TestCorrect_ActiveValidatorsCount(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.MainnetConfig()
cfg.ConfigName = "test"
params.OverrideBeaconConfig(cfg)
db := dbutil.SetupDB(t)
s := &Service{
ctx: context.Background(),
cfg: &Config{DB: db},
}
bState, err := testutil.NewBeaconState(func(state *pb.BeaconState) error {
validators := make([]*ethpb.Validator, params.BeaconConfig().MinGenesisActiveValidatorCount)
for i := 0; i < len(validators); i++ {
validators[i] = &ethpb.Validator{
PublicKey: make([]byte, 48),
WithdrawalCredentials: make([]byte, 32),
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
Slashed: false,
}
}
state.Validators = validators
return nil
})
require.NoError(t, err)
require.NoError(t, db.SaveGenesisData(s.ctx, bState))
vals, err := s.retrieveActiveValidators()
assert.NoError(t, err, "genesis state not retrieved")
assert.Equal(t, int(params.BeaconConfig().MinGenesisActiveValidatorCount), int(vals), "mainnet genesis active count isn't accurate")
for i := 0; i < 100; i++ {
require.NoError(t, bState.AppendValidator(&ethpb.Validator{
PublicKey: make([]byte, 48),
WithdrawalCredentials: make([]byte, 32),
ExitEpoch: params.BeaconConfig().FarFutureEpoch,
Slashed: false,
}))
}
require.NoError(t, bState.SetSlot(10000))
require.NoError(t, db.SaveState(s.ctx, bState, [32]byte{'a'}))
// Retrieve last archived state.
vals, err = s.retrieveActiveValidators()
assert.NoError(t, err, "genesis state not retrieved")
assert.Equal(t, int(params.BeaconConfig().MinGenesisActiveValidatorCount)+100, int(vals), "mainnet genesis active count isn't accurate")
}

View File

@@ -76,7 +76,10 @@ func (s *Service) SubscribeToTopic(topic string, opts ...pubsub.SubOpt) (*pubsub
return nil, err
}
if featureconfig.Get().EnablePeerScorer {
scoringParams := topicScoreParams(topic)
scoringParams, err := s.topicScoreParams(topic)
if err != nil {
return nil, err
}
if scoringParams != nil {
if err := topicHandle.SetScoreParams(scoringParams); err != nil {
return nil, err