From ce2344301c7f44bfbb0ea59032baf371022643e0 Mon Sep 17 00:00:00 2001 From: kasey <489222+kasey@users.noreply.github.com> Date: Mon, 4 Dec 2023 15:01:39 -0600 Subject: [PATCH] forkchoice.Getter wrapper with locking wrappers (#13244) * forkchoice.Getter wrapper with locking wrappers * comments * lint * only expose fast fc getters * potuz feedback re rlock * update mocks for new fc method * appease deepsource * add missing exported func comment * yeet errors to make the linter happy * even more devious _discard * rm TargetRoot * derp * handle nil error in _discard * deep source --------- Co-authored-by: Kasey Kirkham Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com> --- beacon-chain/forkchoice/BUILD.bazel | 15 +- beacon-chain/forkchoice/interfaces.go | 43 +-- beacon-chain/forkchoice/ro.go | 164 ++++++++++ beacon-chain/forkchoice/ro_test.go | 282 ++++++++++++++++++ .../sync/initial-sync/blocks_fetcher.go | 1 + 5 files changed, 487 insertions(+), 18 deletions(-) create mode 100644 beacon-chain/forkchoice/ro.go create mode 100644 beacon-chain/forkchoice/ro_test.go diff --git a/beacon-chain/forkchoice/BUILD.bazel b/beacon-chain/forkchoice/BUILD.bazel index c6b2fd2fb8..29cc3677ae 100644 --- a/beacon-chain/forkchoice/BUILD.bazel +++ b/beacon-chain/forkchoice/BUILD.bazel @@ -1,4 +1,4 @@ -load("@prysm//tools/go:def.bzl", "go_library") +load("@prysm//tools/go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", @@ -6,6 +6,7 @@ go_library( "doc.go", "error.go", "interfaces.go", + "ro.go", ], importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice", visibility = [ @@ -22,3 +23,15 @@ go_library( "@com_github_pkg_errors//:go_default_library", ], ) + +go_test( + name = "go_default_test", + srcs = ["ro_test.go"], + embed = [":go_default_library"], + deps = [ + "//beacon-chain/forkchoice/types:go_default_library", + "//config/fieldparams:go_default_library", + "//consensus-types/primitives:go_default_library", + "//testing/require:go_default_library", + ], +) diff --git a/beacon-chain/forkchoice/interfaces.go b/beacon-chain/forkchoice/interfaces.go index 062b7706b7..8ce0a80504 100644 --- a/beacon-chain/forkchoice/interfaces.go +++ b/beacon-chain/forkchoice/interfaces.go @@ -16,10 +16,9 @@ type BalancesByRooter func(context.Context, [32]byte) ([]uint64, error) // ForkChoicer represents the full fork choice interface composed of all the sub-interfaces. type ForkChoicer interface { + RLocker // separate interface isolates read locking for ROForkChoice. Lock() Unlock() - RLock() - RUnlock() HeadRetriever // to compute head. BlockProcessor // to track new block for fork choice. AttestationProcessor // to track new attestation for fork choice. @@ -27,6 +26,12 @@ type ForkChoicer interface { Setter // to set fork choice information. } +// RLocker represents forkchoice's internal RWMutex read-only lock/unlock methods. +type RLocker interface { + RLock() + RUnlock() +} + // HeadRetriever retrieves head root and optimistic info of the current chain. type HeadRetriever interface { Head(context.Context) ([32]byte, error) @@ -47,29 +52,33 @@ type AttestationProcessor interface { // Getter returns fork choice related information. type Getter interface { - HasNode([32]byte) bool - ProposerBoost() [fieldparams.RootLength]byte + FastGetter AncestorRoot(ctx context.Context, root [32]byte, slot primitives.Slot) ([32]byte, error) CommonAncestor(ctx context.Context, root1 [32]byte, root2 [32]byte) ([32]byte, primitives.Slot, error) - IsCanonical(root [32]byte) bool - FinalizedCheckpoint() *forkchoicetypes.Checkpoint - IsViableForCheckpoint(*forkchoicetypes.Checkpoint) (bool, error) - FinalizedPayloadBlockHash() [32]byte - JustifiedCheckpoint() *forkchoicetypes.Checkpoint - PreviousJustifiedCheckpoint() *forkchoicetypes.Checkpoint - JustifiedPayloadBlockHash() [32]byte - UnrealizedJustifiedPayloadBlockHash() [32]byte - NodeCount() int - HighestReceivedBlockSlot() primitives.Slot - ReceivedBlocksLastEpoch() (uint64, error) ForkChoiceDump(context.Context) (*forkchoice2.Dump, error) - Weight(root [32]byte) (uint64, error) Tips() ([][32]byte, []primitives.Slot) +} + +type FastGetter interface { + FinalizedCheckpoint() *forkchoicetypes.Checkpoint + FinalizedPayloadBlockHash() [32]byte + HasNode([32]byte) bool + HighestReceivedBlockSlot() primitives.Slot + IsCanonical(root [32]byte) bool IsOptimistic(root [32]byte) (bool, error) + IsViableForCheckpoint(*forkchoicetypes.Checkpoint) (bool, error) + JustifiedCheckpoint() *forkchoicetypes.Checkpoint + JustifiedPayloadBlockHash() [32]byte + LastRoot(primitives.Epoch) [32]byte + NodeCount() int + PreviousJustifiedCheckpoint() *forkchoicetypes.Checkpoint + ProposerBoost() [fieldparams.RootLength]byte + ReceivedBlocksLastEpoch() (uint64, error) ShouldOverrideFCU() bool Slot([32]byte) (primitives.Slot, error) - LastRoot(primitives.Epoch) [32]byte TargetRootForEpoch([32]byte, primitives.Epoch) ([32]byte, error) + UnrealizedJustifiedPayloadBlockHash() [32]byte + Weight(root [32]byte) (uint64, error) } // Setter allows to set forkchoice information diff --git a/beacon-chain/forkchoice/ro.go b/beacon-chain/forkchoice/ro.go new file mode 100644 index 0000000000..995bcd33e9 --- /dev/null +++ b/beacon-chain/forkchoice/ro.go @@ -0,0 +1,164 @@ +package forkchoice + +import ( + forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types" + fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" +) + +// ROForkChoice is an implementation of forkchoice.Getter which calls `Rlock`/`RUnlock` +// around a delegated method call to the underlying Getter implementation. +type ROForkChoice struct { + getter FastGetter + l RLocker +} + +var _ FastGetter = &ROForkChoice{} + +// ROWrappable represents the subset of ForkChoicer a type needs to support +// in order for ROForkChoice to wrap it. This simplifies the creation of a mock +// type that can be used to assert that all of the wrapped methods are correctly +// called between mutex acquire/release. +type ROWrappable interface { + RLocker + FastGetter +} + +// NewROForkChoice returns an ROForkChoice that delegates forkchoice.Getter calls to the +// given value after first using its Locker methods to make sure it is correctly locked. +func NewROForkChoice(w ROWrappable) *ROForkChoice { + return &ROForkChoice{getter: w, l: w} +} + +// HasNode delegates to the underlying forkchoice call, under a lock. +func (ro *ROForkChoice) HasNode(root [32]byte) bool { + ro.l.RLock() + defer ro.l.RUnlock() + return ro.getter.HasNode(root) +} + +// ProposerBoost delegates to the underlying forkchoice call, under a lock. +func (ro *ROForkChoice) ProposerBoost() [fieldparams.RootLength]byte { + ro.l.RLock() + defer ro.l.RUnlock() + return ro.getter.ProposerBoost() +} + +// IsCanonical delegates to the underlying forkchoice call, under a lock. +func (ro *ROForkChoice) IsCanonical(root [32]byte) bool { + ro.l.RLock() + defer ro.l.RUnlock() + return ro.getter.IsCanonical(root) +} + +// FinalizedCheckpoint delegates to the underlying forkchoice call, under a lock. +func (ro *ROForkChoice) FinalizedCheckpoint() *forkchoicetypes.Checkpoint { + ro.l.RLock() + defer ro.l.RUnlock() + return ro.getter.FinalizedCheckpoint() +} + +// IsViableForCheckpoint delegates to the underlying forkchoice call, under a lock. +func (ro *ROForkChoice) IsViableForCheckpoint(cp *forkchoicetypes.Checkpoint) (bool, error) { + ro.l.RLock() + defer ro.l.RUnlock() + return ro.getter.IsViableForCheckpoint(cp) +} + +// FinalizedPayloadBlockHash delegates to the underlying forkchoice call, under a lock. +func (ro *ROForkChoice) FinalizedPayloadBlockHash() [32]byte { + ro.l.RLock() + defer ro.l.RUnlock() + return ro.getter.FinalizedPayloadBlockHash() +} + +// JustifiedCheckpoint delegates to the underlying forkchoice call, under a lock. +func (ro *ROForkChoice) JustifiedCheckpoint() *forkchoicetypes.Checkpoint { + ro.l.RLock() + defer ro.l.RUnlock() + return ro.getter.JustifiedCheckpoint() +} + +// PreviousJustifiedCheckpoint delegates to the underlying forkchoice call, under a lock. +func (ro *ROForkChoice) PreviousJustifiedCheckpoint() *forkchoicetypes.Checkpoint { + ro.l.RLock() + defer ro.l.RUnlock() + return ro.getter.PreviousJustifiedCheckpoint() +} + +// JustifiedPayloadBlockHash delegates to the underlying forkchoice call, under a lock. +func (ro *ROForkChoice) JustifiedPayloadBlockHash() [32]byte { + ro.l.RLock() + defer ro.l.RUnlock() + return ro.getter.JustifiedPayloadBlockHash() +} + +// UnrealizedJustifiedPayloadBlockHash delegates to the underlying forkchoice call, under a lock. +func (ro *ROForkChoice) UnrealizedJustifiedPayloadBlockHash() [32]byte { + ro.l.RLock() + defer ro.l.RUnlock() + return ro.getter.UnrealizedJustifiedPayloadBlockHash() +} + +// NodeCount delegates to the underlying forkchoice call, under a lock. +func (ro *ROForkChoice) NodeCount() int { + ro.l.RLock() + defer ro.l.RUnlock() + return ro.getter.NodeCount() +} + +// HighestReceivedBlockSlot delegates to the underlying forkchoice call, under a lock. +func (ro *ROForkChoice) HighestReceivedBlockSlot() primitives.Slot { + ro.l.RLock() + defer ro.l.RUnlock() + return ro.getter.HighestReceivedBlockSlot() +} + +// ReceivedBlocksLastEpoch delegates to the underlying forkchoice call, under a lock. +func (ro *ROForkChoice) ReceivedBlocksLastEpoch() (uint64, error) { + ro.l.RLock() + defer ro.l.RUnlock() + return ro.getter.ReceivedBlocksLastEpoch() +} + +// Weight delegates to the underlying forkchoice call, under a lock. +func (ro *ROForkChoice) Weight(root [32]byte) (uint64, error) { + ro.l.RLock() + defer ro.l.RUnlock() + return ro.getter.Weight(root) +} + +// IsOptimistic delegates to the underlying forkchoice call, under a lock. +func (ro *ROForkChoice) IsOptimistic(root [32]byte) (bool, error) { + ro.l.RLock() + defer ro.l.RUnlock() + return ro.getter.IsOptimistic(root) +} + +// ShouldOverrideFCU delegates to the underlying forkchoice call, under a lock. +func (ro *ROForkChoice) ShouldOverrideFCU() bool { + ro.l.RLock() + defer ro.l.RUnlock() + return ro.getter.ShouldOverrideFCU() +} + +// Slot delegates to the underlying forkchoice call, under a lock. +func (ro *ROForkChoice) Slot(root [32]byte) (primitives.Slot, error) { + ro.l.RLock() + defer ro.l.RUnlock() + return ro.getter.Slot(root) +} + +// LastRoot delegates to the underlying forkchoice call, under a lock. +func (ro *ROForkChoice) LastRoot(e primitives.Epoch) [32]byte { + ro.l.RLock() + defer ro.l.RUnlock() + return ro.getter.LastRoot(e) +} + +// TargetRootForEpoch delegates to the underlying forkchoice call, under a lock. +func (ro *ROForkChoice) TargetRootForEpoch(root [32]byte, epoch primitives.Epoch) ([32]byte, error) { + ro.l.RLock() + defer ro.l.RUnlock() + return ro.getter.TargetRootForEpoch(root, epoch) +} diff --git a/beacon-chain/forkchoice/ro_test.go b/beacon-chain/forkchoice/ro_test.go new file mode 100644 index 0000000000..5cbbb34ef4 --- /dev/null +++ b/beacon-chain/forkchoice/ro_test.go @@ -0,0 +1,282 @@ +package forkchoice + +import ( + "io" + "testing" + + forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types" + fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v4/testing/require" +) + +type mockCall int + +const ( + lockCalled mockCall = iota + unlockCalled + rlockCalled + runlockCalled + hasNodeCalled + proposerBoostCalled + isCanonicalCalled + finalizedCheckpointCalled + isViableForCheckpointCalled + finalizedPayloadBlockHashCalled + justifiedCheckpointCalled + previousJustifiedCheckpointCalled + justifiedPayloadBlockHashCalled + unrealizedJustifiedPayloadBlockHashCalled + nodeCountCalled + highestReceivedBlockSlotCalled + receivedBlocksLastEpochCalled + weightCalled + isOptimisticCalled + shouldOverrideFCUCalled + slotCalled + lastRootCalled + targetRootForEpochCalled +) + +func _discard(t *testing.T, e error) { + if e != nil { + _, err := io.Discard.Write([]byte(e.Error())) + require.NoError(t, err) + } +} + +// ensures that the correct func was called with the correct lock pattern +// for each method in the interface. +func TestROLocking(t *testing.T) { + cases := []struct { + name string + call mockCall + cb func(FastGetter) + }{ + { + name: "hasNodeCalled", + call: hasNodeCalled, + cb: func(g FastGetter) { g.HasNode([32]byte{}) }, + }, + { + name: "proposerBoostCalled", + call: proposerBoostCalled, + cb: func(g FastGetter) { g.ProposerBoost() }, + }, + { + name: "isCanonicalCalled", + call: isCanonicalCalled, + cb: func(g FastGetter) { g.IsCanonical([32]byte{}) }, + }, + { + name: "finalizedCheckpointCalled", + call: finalizedCheckpointCalled, + cb: func(g FastGetter) { g.FinalizedCheckpoint() }, + }, + { + name: "isViableForCheckpointCalled", + call: isViableForCheckpointCalled, + cb: func(g FastGetter) { _, err := g.IsViableForCheckpoint(nil); _discard(t, err) }, + }, + { + name: "finalizedPayloadBlockHashCalled", + call: finalizedPayloadBlockHashCalled, + cb: func(g FastGetter) { g.FinalizedPayloadBlockHash() }, + }, + { + name: "justifiedCheckpointCalled", + call: justifiedCheckpointCalled, + cb: func(g FastGetter) { g.JustifiedCheckpoint() }, + }, + { + name: "previousJustifiedCheckpointCalled", + call: previousJustifiedCheckpointCalled, + cb: func(g FastGetter) { g.PreviousJustifiedCheckpoint() }, + }, + { + name: "justifiedPayloadBlockHashCalled", + call: justifiedPayloadBlockHashCalled, + cb: func(g FastGetter) { g.JustifiedPayloadBlockHash() }, + }, + { + name: "unrealizedJustifiedPayloadBlockHashCalled", + call: unrealizedJustifiedPayloadBlockHashCalled, + cb: func(g FastGetter) { g.UnrealizedJustifiedPayloadBlockHash() }, + }, + { + name: "nodeCountCalled", + call: nodeCountCalled, + cb: func(g FastGetter) { g.NodeCount() }, + }, + { + name: "highestReceivedBlockSlotCalled", + call: highestReceivedBlockSlotCalled, + cb: func(g FastGetter) { g.HighestReceivedBlockSlot() }, + }, + { + name: "receivedBlocksLastEpochCalled", + call: receivedBlocksLastEpochCalled, + cb: func(g FastGetter) { _, err := g.ReceivedBlocksLastEpoch(); _discard(t, err) }, + }, + { + name: "weightCalled", + call: weightCalled, + cb: func(g FastGetter) { _, err := g.Weight([32]byte{}); _discard(t, err) }, + }, + { + name: "isOptimisticCalled", + call: isOptimisticCalled, + cb: func(g FastGetter) { _, err := g.IsOptimistic([32]byte{}); _discard(t, err) }, + }, + { + name: "shouldOverrideFCUCalled", + call: shouldOverrideFCUCalled, + cb: func(g FastGetter) { g.ShouldOverrideFCU() }, + }, + { + name: "slotCalled", + call: slotCalled, + cb: func(g FastGetter) { _, err := g.Slot([32]byte{}); _discard(t, err) }, + }, + { + name: "lastRootCalled", + call: lastRootCalled, + cb: func(g FastGetter) { g.LastRoot(0) }, + }, + { + name: "targetRootForEpochCalled", + call: targetRootForEpochCalled, + cb: func(g FastGetter) { _, err := g.TargetRootForEpoch([32]byte{}, 0); _discard(t, err) }, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + m := &mockROForkchoice{} + ro := NewROForkChoice(m) + c.cb(ro) + require.Equal(t, rlockCalled, m.calls[0]) + require.Equal(t, c.call, m.calls[1]) + require.Equal(t, runlockCalled, m.calls[2]) + }) + } +} + +type mockROForkchoice struct { + calls []mockCall +} + +var _ FastGetter = &mockROForkchoice{} + +var _ RLocker = &mockROForkchoice{} + +func (ro *mockROForkchoice) Lock() { + ro.calls = append(ro.calls, lockCalled) +} + +func (ro *mockROForkchoice) RLock() { + ro.calls = append(ro.calls, rlockCalled) +} + +func (ro *mockROForkchoice) Unlock() { + ro.calls = append(ro.calls, unlockCalled) +} + +func (ro *mockROForkchoice) RUnlock() { + ro.calls = append(ro.calls, runlockCalled) +} + +func (ro *mockROForkchoice) HasNode(_ [32]byte) bool { + ro.calls = append(ro.calls, hasNodeCalled) + return false +} + +func (ro *mockROForkchoice) ProposerBoost() [fieldparams.RootLength]byte { + ro.calls = append(ro.calls, proposerBoostCalled) + return [fieldparams.RootLength]byte{} +} + +func (ro *mockROForkchoice) IsCanonical(_ [32]byte) bool { + ro.calls = append(ro.calls, isCanonicalCalled) + return false +} + +func (ro *mockROForkchoice) FinalizedCheckpoint() *forkchoicetypes.Checkpoint { + ro.calls = append(ro.calls, finalizedCheckpointCalled) + return nil +} + +func (ro *mockROForkchoice) IsViableForCheckpoint(_ *forkchoicetypes.Checkpoint) (bool, error) { + ro.calls = append(ro.calls, isViableForCheckpointCalled) + return false, nil +} + +func (ro *mockROForkchoice) FinalizedPayloadBlockHash() [32]byte { + ro.calls = append(ro.calls, finalizedPayloadBlockHashCalled) + return [32]byte{} +} + +func (ro *mockROForkchoice) JustifiedCheckpoint() *forkchoicetypes.Checkpoint { + ro.calls = append(ro.calls, justifiedCheckpointCalled) + return nil +} + +func (ro *mockROForkchoice) PreviousJustifiedCheckpoint() *forkchoicetypes.Checkpoint { + ro.calls = append(ro.calls, previousJustifiedCheckpointCalled) + return nil +} + +func (ro *mockROForkchoice) JustifiedPayloadBlockHash() [32]byte { + ro.calls = append(ro.calls, justifiedPayloadBlockHashCalled) + return [32]byte{} +} + +func (ro *mockROForkchoice) UnrealizedJustifiedPayloadBlockHash() [32]byte { + ro.calls = append(ro.calls, unrealizedJustifiedPayloadBlockHashCalled) + return [32]byte{} +} + +func (ro *mockROForkchoice) NodeCount() int { + ro.calls = append(ro.calls, nodeCountCalled) + return 0 +} + +func (ro *mockROForkchoice) HighestReceivedBlockSlot() primitives.Slot { + ro.calls = append(ro.calls, highestReceivedBlockSlotCalled) + return 0 +} + +func (ro *mockROForkchoice) ReceivedBlocksLastEpoch() (uint64, error) { + ro.calls = append(ro.calls, receivedBlocksLastEpochCalled) + return 0, nil +} + +func (ro *mockROForkchoice) Weight(_ [32]byte) (uint64, error) { + ro.calls = append(ro.calls, weightCalled) + return 0, nil +} + +func (ro *mockROForkchoice) IsOptimistic(_ [32]byte) (bool, error) { + ro.calls = append(ro.calls, isOptimisticCalled) + return false, nil +} + +func (ro *mockROForkchoice) ShouldOverrideFCU() bool { + ro.calls = append(ro.calls, shouldOverrideFCUCalled) + return false +} + +func (ro *mockROForkchoice) Slot(_ [32]byte) (primitives.Slot, error) { + ro.calls = append(ro.calls, slotCalled) + return 0, nil +} + +func (ro *mockROForkchoice) LastRoot(_ primitives.Epoch) [32]byte { + ro.calls = append(ro.calls, lastRootCalled) + return [32]byte{} +} + +// TargetRootForEpoch implements FastGetter. +func (ro *mockROForkchoice) TargetRootForEpoch(_ [32]byte, _ primitives.Epoch) ([32]byte, error) { + ro.calls = append(ro.calls, targetRootForEpochCalled) + return [32]byte{}, nil +} diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 397bac69ae..a01292aa99 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -33,6 +33,7 @@ import ( ) const ( + // maxPendingRequests limits how many concurrent fetch request one can initiate. maxPendingRequests = 64 // peersPercentagePerRequest caps percentage of peers to be used in a request.