Compare commits

...

3 Commits

Author SHA1 Message Date
Nishant Das
6f0eb4750e Change Default Timeout To 30 Seconds (#11487) 2022-09-22 22:59:36 +08:00
Nishant Das
653ea3b030 Fix Gossipsub Parameter (#11425)
(cherry picked from commit fbe591c363)
2022-09-09 10:47:29 -05:00
Potuz
17e1986063 Prune during on_tick (#11387)
* Prune during on_tick

* add unit test

* fix tests

Co-authored-by: terencechain <terence@prysmaticlabs.com>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
(cherry picked from commit fc509cc220)
2022-09-09 08:50:57 -05:00
13 changed files with 49 additions and 110 deletions

View File

@@ -1170,6 +1170,7 @@ func TestOnBlock_CanFinalize_WithOnTick(t *testing.T) {
gs, keys := util.DeterministicGenesisState(t, 32)
require.NoError(t, service.saveGenesisData(ctx, gs))
require.NoError(t, fcs.UpdateFinalizedCheckpoint(&forkchoicetypes.Checkpoint{Root: service.originBlockRoot}))
testState := gs.Copy()
for i := types.Slot(1); i <= 4*params.BeaconConfig().SlotsPerEpoch; i++ {

View File

@@ -36,7 +36,6 @@ func New() *ForkChoice {
nodeByRoot: make(map[[fieldparams.RootLength]byte]*Node),
nodeByPayload: make(map[[fieldparams.RootLength]byte]*Node),
slashedIndices: make(map[types.ValidatorIndex]bool),
pruneThreshold: defaultPruneThreshold,
receivedBlocksLastEpoch: [fieldparams.SlotsPerEpoch]types.Slot{},
}

View File

@@ -11,15 +11,6 @@ import (
v1 "github.com/prysmaticlabs/prysm/v3/proto/eth/v1"
)
// depth returns the length of the path to the root of Fork Choice
func (n *Node) depth() uint64 {
ret := uint64(0)
for node := n.parent; node != nil; node = node.parent {
ret += 1
}
return ret
}
// applyWeightChanges recomputes the weight of the node passed as an argument and all of its descendants,
// using the current balance stored in each node. This function requires a lock
// in Store.nodesLock

View File

@@ -140,25 +140,6 @@ func TestNode_UpdateBestDescendant_LowerWeightChild(t *testing.T) {
assert.Equal(t, s.treeRootNode.children[0], s.treeRootNode.bestDescendant)
}
func TestNode_TestDepth(t *testing.T) {
f := setup(1, 1)
ctx := context.Background()
// Input child is best descendant
state, blkRoot, err := prepareForkchoiceState(ctx, 1, indexToHash(1), params.BeaconConfig().ZeroHash, params.BeaconConfig().ZeroHash, 1, 1)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, state, blkRoot))
state, blkRoot, err = prepareForkchoiceState(ctx, 2, indexToHash(2), indexToHash(1), params.BeaconConfig().ZeroHash, 1, 1)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, state, blkRoot))
state, blkRoot, err = prepareForkchoiceState(ctx, 3, indexToHash(3), params.BeaconConfig().ZeroHash, params.BeaconConfig().ZeroHash, 1, 1)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, state, blkRoot))
s := f.store
require.Equal(t, s.nodeByRoot[indexToHash(2)].depth(), uint64(2))
require.Equal(t, s.nodeByRoot[indexToHash(3)].depth(), uint64(1))
}
func TestNode_ViableForHead(t *testing.T) {
tests := []struct {
n *Node

View File

@@ -70,5 +70,5 @@ func (f *ForkChoice) NewSlot(ctx context.Context, slot types.Slot) error {
if !features.Get().DisablePullTips {
f.updateUnrealizedCheckpoints()
}
return nil
return f.store.prune(ctx)
}

View File

@@ -12,10 +12,6 @@ import (
"go.opencensus.io/trace"
)
// This defines the minimal number of block nodes that can be in the tree
// before getting pruned upon new finalization.
const defaultPruneThreshold = 256
// applyProposerBoostScore applies the current proposer boost scores to the
// relevant nodes. This function requires a lock in Store.nodesLock.
func (s *Store) applyProposerBoostScore(newBalances []uint64) error {
@@ -59,11 +55,6 @@ func (s *Store) proposerBoost() [fieldparams.RootLength]byte {
return s.proposerBoostRoot
}
// PruneThreshold of fork choice store.
func (s *Store) PruneThreshold() uint64 {
return s.pruneThreshold
}
// head starts from justified root and then follows the best descendant links
// to find the best block for head. This function assumes a lock on s.nodesLock
func (s *Store) head(ctx context.Context) ([32]byte, error) {
@@ -215,8 +206,7 @@ func (s *Store) pruneFinalizedNodeByRootMap(ctx context.Context, node, finalized
return nil
}
// prune prunes the fork choice store with the new finalized root. The store is only pruned if the input
// root is different than the current store finalized root, and the number of the store has met prune threshold.
// prune prunes the fork choice store. It removes all nodes that compete with the finalized root.
// This function does not prune for invalid optimistically synced nodes, it deals only with pruning upon finalization
func (s *Store) prune(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "doublyLinkedForkchoice.Prune")
@@ -232,10 +222,8 @@ func (s *Store) prune(ctx context.Context) error {
if !ok || finalizedNode == nil {
return errUnknownFinalizedRoot
}
// The number of the nodes has not met the prune threshold.
// Pruning at small numbers incurs more cost than benefit.
if finalizedNode.depth() < s.pruneThreshold {
// return early if we haven't changed the finalized checkpoint
if finalizedNode.parent == nil {
return nil
}

View File

@@ -12,15 +12,6 @@ import (
"github.com/prysmaticlabs/prysm/v3/testing/require"
)
func TestStore_PruneThreshold(t *testing.T) {
s := &Store{
pruneThreshold: defaultPruneThreshold,
}
if got := s.PruneThreshold(); got != defaultPruneThreshold {
t.Errorf("PruneThreshold() = %v, want %v", got, defaultPruneThreshold)
}
}
func TestStore_JustifiedEpoch(t *testing.T) {
j := types.Epoch(100)
f := setup(j, j)
@@ -154,30 +145,6 @@ func TestStore_Insert(t *testing.T) {
assert.Equal(t, indexToHash(100), child.root, "Incorrect root")
}
func TestStore_Prune_LessThanThreshold(t *testing.T) {
// Define 100 nodes in store.
numOfNodes := uint64(100)
f := setup(0, 0)
ctx := context.Background()
state, blkRoot, err := prepareForkchoiceState(ctx, 1, indexToHash(1), params.BeaconConfig().ZeroHash, params.BeaconConfig().ZeroHash, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, state, blkRoot))
for i := uint64(2); i < numOfNodes; i++ {
state, blkRoot, err = prepareForkchoiceState(ctx, types.Slot(i), indexToHash(i), indexToHash(i-1), params.BeaconConfig().ZeroHash, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, state, blkRoot))
}
s := f.store
s.pruneThreshold = 100
// Finalized root has depth 99 so everything before it should be pruned,
// but PruneThreshold is at 100 so nothing will be pruned.
s.finalizedCheckpoint.Root = indexToHash(99)
require.NoError(t, s.prune(context.Background()))
assert.Equal(t, 100, len(s.nodeByRoot), "Incorrect nodes count")
}
func TestStore_Prune_MoreThanThreshold(t *testing.T) {
// Define 100 nodes in store.
numOfNodes := uint64(100)
@@ -193,7 +160,6 @@ func TestStore_Prune_MoreThanThreshold(t *testing.T) {
}
s := f.store
s.pruneThreshold = 0
// Finalized root is at index 99 so everything before 99 should be pruned.
s.finalizedCheckpoint.Root = indexToHash(99)
@@ -216,7 +182,6 @@ func TestStore_Prune_MoreThanOnce(t *testing.T) {
}
s := f.store
s.pruneThreshold = 0
// Finalized root is at index 11 so everything before 11 should be pruned.
s.finalizedCheckpoint.Root = indexToHash(10)
@@ -229,6 +194,25 @@ func TestStore_Prune_MoreThanOnce(t *testing.T) {
assert.Equal(t, 80, len(s.nodeByRoot), "Incorrect nodes count")
}
func TestStore_Prune_ReturnEarly(t *testing.T) {
// Define 100 nodes in store.
numOfNodes := uint64(100)
f := setup(0, 0)
ctx := context.Background()
state, blkRoot, err := prepareForkchoiceState(ctx, 1, indexToHash(1), params.BeaconConfig().ZeroHash, params.BeaconConfig().ZeroHash, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, state, blkRoot))
for i := uint64(2); i < numOfNodes; i++ {
state, blkRoot, err = prepareForkchoiceState(ctx, types.Slot(i), indexToHash(i), indexToHash(i-1), params.BeaconConfig().ZeroHash, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, state, blkRoot))
}
require.NoError(t, f.store.prune(ctx))
nodeCount := f.NodeCount()
require.NoError(t, f.store.prune(ctx))
require.Equal(t, nodeCount, f.NodeCount())
}
// This unit tests starts with a simple branch like this
//
// - 1
@@ -245,7 +229,6 @@ func TestStore_Prune_NoDanglingBranch(t *testing.T) {
state, blkRoot, err = prepareForkchoiceState(ctx, 2, indexToHash(2), params.BeaconConfig().ZeroHash, params.BeaconConfig().ZeroHash, 0, 0)
require.NoError(t, err)
require.NoError(t, f.InsertNode(ctx, state, blkRoot))
f.store.pruneThreshold = 0
s := f.store
s.finalizedCheckpoint.Root = indexToHash(1)
@@ -330,7 +313,6 @@ func TestStore_PruneMapsNodes(t *testing.T) {
require.NoError(t, f.InsertNode(ctx, state, blkRoot))
s := f.store
s.pruneThreshold = 0
s.finalizedCheckpoint.Root = indexToHash(1)
require.NoError(t, s.prune(context.Background()))
require.Equal(t, len(s.nodeByRoot), 1)

View File

@@ -24,7 +24,6 @@ type Store struct {
unrealizedFinalizedCheckpoint *forkchoicetypes.Checkpoint // best unrealized finalized checkpoint in store.
prevJustifiedCheckpoint *forkchoicetypes.Checkpoint // previous justified checkpoint in store.
finalizedCheckpoint *forkchoicetypes.Checkpoint // latest finalized epoch in store.
pruneThreshold uint64 // do not prune tree unless threshold is reached.
proposerBoostRoot [fieldparams.RootLength]byte // latest block root that was boosted after being received in a timely manner.
previousProposerBoostRoot [fieldparams.RootLength]byte // previous block root that was boosted after being received in a timely manner.
previousProposerBoostScore uint64 // previous proposer boosted root score.

View File

@@ -261,14 +261,6 @@ func TestVotes_CanFindHead(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, indexToHash(10), r, "Incorrect head for with justified epoch at 3")
// Verify pruning below the prune threshold does not affect head.
f.store.pruneThreshold = 1000
prevRoot := f.store.finalizedCheckpoint.Root
f.store.finalizedCheckpoint.Root = indexToHash(5)
require.NoError(t, f.store.prune(context.Background()))
assert.Equal(t, 11, len(f.store.nodeByRoot), "Incorrect nodes length after prune")
f.store.finalizedCheckpoint.Root = prevRoot
r, err = f.Head(context.Background(), balances)
require.NoError(t, err)
assert.Equal(t, indexToHash(10), r, "Incorrect head for with justified epoch at 3")
@@ -289,13 +281,11 @@ func TestVotes_CanFindHead(t *testing.T) {
// 8
// / \
// 9 10
f.store.pruneThreshold = 1
f.store.finalizedCheckpoint.Root = indexToHash(5)
require.NoError(t, f.store.prune(context.Background()))
assert.Equal(t, 5, len(f.store.nodeByRoot), "Incorrect nodes length after prune")
// we pruned artificially the justified root.
f.store.justifiedCheckpoint.Root = indexToHash(5)
f.store.finalizedCheckpoint.Root = prevRoot
r, err = f.Head(context.Background(), balances)
require.NoError(t, err)

View File

@@ -44,5 +44,5 @@ func TestHeartbeatParameters(t *testing.T) {
func TestMiscParameters(t *testing.T) {
params.SetupTestConfigCleanup(t)
setPubSubParameters()
assert.Equal(t, randomSubD, pubsub.RandomSubD, "randomSubD")
assert.Equal(t, rSubD, 8, "rSubD")
}

View File

@@ -8,8 +8,10 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
pbrpc "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
)
@@ -32,7 +34,7 @@ const (
gossipSubHeartbeatInterval = 700 * time.Millisecond // frequency of heartbeat, milliseconds
// misc
randomSubD = 6 // random gossip target
rSubD = 8 // random gossip target
)
var errInvalidTopic = errors.New("invalid topic format")
@@ -128,6 +130,25 @@ func (s *Service) peerInspector(peerMap map[peer.ID]*pubsub.PeerScoreSnapshot) {
}
}
// Creates a list of pubsub options to configure out router with.
func (s *Service) pubsubOptions() []pubsub.Option {
psOpts := []pubsub.Option{
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
pubsub.WithNoAuthor(),
pubsub.WithMessageIdFn(func(pmsg *pubsubpb.Message) string {
return MsgID(s.genesisValidatorsRoot, pmsg)
}),
pubsub.WithSubscriptionFilter(s),
pubsub.WithPeerOutboundQueueSize(pubsubQueueSize),
pubsub.WithMaxMessageSize(int(params.BeaconNetworkConfig().GossipMaxSizeBellatrix)),
pubsub.WithValidateQueueSize(pubsubQueueSize),
pubsub.WithPeerScore(peerScoringParams()),
pubsub.WithPeerScoreInspect(s.peerInspector, time.Minute),
pubsub.WithGossipSubParams(pubsubGossipParam()),
}
return psOpts
}
// creates a custom gossipsub parameter set.
func pubsubGossipParam() pubsub.GossipSubParams {
gParams := pubsub.DefaultGossipSubParams()

View File

@@ -18,7 +18,6 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsubpb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/p2p/protocol/identify"
"github.com/multiformats/go-multiaddr"
"github.com/pkg/errors"
@@ -139,19 +138,7 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
// due to libp2p's gossipsub implementation not taking into
// account previously added peers when creating the gossipsub
// object.
psOpts := []pubsub.Option{
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
pubsub.WithNoAuthor(),
pubsub.WithMessageIdFn(func(pmsg *pubsubpb.Message) string {
return MsgID(s.genesisValidatorsRoot, pmsg)
}),
pubsub.WithSubscriptionFilter(s),
pubsub.WithPeerOutboundQueueSize(pubsubQueueSize),
pubsub.WithValidateQueueSize(pubsubQueueSize),
pubsub.WithPeerScore(peerScoringParams()),
pubsub.WithPeerScoreInspect(s.peerInspector, time.Minute),
pubsub.WithGossipSubParams(pubsubGossipParam()),
}
psOpts := s.pubsubOptions()
// Set the pubsub global parameters that we require.
setPubSubParameters()
// Reinitialize them in the event we are running a custom config.

View File

@@ -9,7 +9,7 @@ import (
)
// DefaultRPCHTTPTimeout for HTTP requests via an RPC connection to an execution node.
const DefaultRPCHTTPTimeout = time.Second * 6
const DefaultRPCHTTPTimeout = time.Second * 30
// This creates a custom HTTP transport which we can attach to our HTTP client
// in order to inject JWT auth strings into our HTTP request headers. Authentication