From ec524ce99c56b37e237d6187942a59d71c8a4360 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Mon, 3 Nov 2025 15:48:41 +0100 Subject: [PATCH] `RODataColumnsVerifier.ValidProposerSignature`: Ensure the expensive signature verification is only performed once for concurrent requests for the same signature data. (#15954) * `signatureData`: Add `string` function. * `RODataColumnsVerifier.ValidProposerSignature`: Ensure the expensive signature verification is only performed once for concurrent requests for the same signature data. Share flight group * `parentState` ==> `state`. * `RODataColumnsVerifier.SidecarProposerExpected: Ensure the expensive index computation is only performed once for concurrent requests.` * Add `wrapAttestationError` * Fix Kasey's comment. * Fix Terence's comment. --- .../sync/validate_beacon_attestation.go | 38 ++++++++--- beacon-chain/verification/BUILD.bazel | 1 + beacon-chain/verification/cache.go | 4 ++ beacon-chain/verification/cache_test.go | 12 ++++ beacon-chain/verification/data_column.go | 64 ++++++++++++------- beacon-chain/verification/initializer.go | 2 + changelog/manu-singleflight.md | 2 + 7 files changed, 91 insertions(+), 32 deletions(-) create mode 100644 changelog/manu-singleflight.md diff --git a/beacon-chain/sync/validate_beacon_attestation.go b/beacon-chain/sync/validate_beacon_attestation.go index 0d6237d81d..aee654cf0f 100644 --- a/beacon-chain/sync/validate_beacon_attestation.go +++ b/beacon-chain/sync/validate_beacon_attestation.go @@ -16,6 +16,7 @@ import ( "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p" "github.com/OffchainLabs/prysm/v6/beacon-chain/slasher/types" "github.com/OffchainLabs/prysm/v6/beacon-chain/state" + "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" "github.com/OffchainLabs/prysm/v6/encoding/bytesutil" "github.com/OffchainLabs/prysm/v6/monitoring/tracing" @@ -67,7 +68,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation( return pubsub.ValidationReject, errWrongMessage } if err := helpers.ValidateNilAttestation(att); err != nil { - return pubsub.ValidationReject, err + return pubsub.ValidationReject, wrapAttestationError(err, att) } data := att.GetData() @@ -84,7 +85,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation( return pubsub.ValidationIgnore, err } if err := helpers.ValidateSlotTargetEpoch(data); err != nil { - return pubsub.ValidationReject, err + return pubsub.ValidationReject, wrapAttestationError(err, att) } committeeIndex := att.GetCommitteeIndex() @@ -106,7 +107,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation( s.hasBadBlock(bytesutil.ToBytes32(data.Target.Root)) || s.hasBadBlock(bytesutil.ToBytes32(data.Source.Root)) { attBadBlockCount.Inc() - return pubsub.ValidationReject, errors.New("attestation data references bad block root") + return pubsub.ValidationReject, wrapAttestationError(errors.New("attestation data references bad block root"), att) } } @@ -126,7 +127,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation( if err = s.cfg.chain.VerifyLmdFfgConsistency(ctx, att); err != nil { tracing.AnnotateError(span, err) attBadLmdConsistencyCount.Inc() - return pubsub.ValidationReject, err + return pubsub.ValidationReject, wrapAttestationError(err, att) } preState, err := s.cfg.chain.AttestationTargetState(ctx, data.Target) @@ -137,7 +138,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation( validationRes, err := s.validateUnaggregatedAttTopic(ctx, att, preState, *msg.Topic) if validationRes != pubsub.ValidationAccept { - return validationRes, err + return validationRes, wrapAttestationError(err, att) } committee, err := helpers.BeaconCommitteeFromState(ctx, preState, data.Slot, committeeIndex) @@ -148,7 +149,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation( validationRes, err = validateAttesterData(ctx, att, committee) if validationRes != pubsub.ValidationAccept { - return validationRes, err + return validationRes, wrapAttestationError(err, att) } // Consolidated handling of Electra SingleAttestation vs Phase0 unaggregated attestation @@ -183,7 +184,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation( validationRes, err = s.validateUnaggregatedAttWithState(ctx, attForValidation, preState) if validationRes != pubsub.ValidationAccept { - return validationRes, err + return validationRes, wrapAttestationError(err, att) } if s.slasherEnabled { @@ -339,7 +340,7 @@ func validateAttestingIndex( // `attestation.attester_index in get_beacon_committee(state, attestation.data.slot, index)`. inCommittee := slices.Contains(committee, attestingIndex) if !inCommittee { - return pubsub.ValidationReject, errors.New("attester is not a member of the committee") + return pubsub.ValidationReject, errors.Errorf("attester %d is not a member of the committee", attestingIndex) } return pubsub.ValidationAccept, nil @@ -392,3 +393,24 @@ func (s *Service) hasBlockAndState(ctx context.Context, blockRoot [32]byte) bool hasState := hasStateSummary || s.cfg.beaconDB.HasState(ctx, blockRoot) return hasState && s.cfg.chain.HasBlock(ctx, blockRoot) } + +func wrapAttestationError(err error, att eth.Att) error { + slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch + committeeIndex := att.GetCommitteeIndex() + + attData := att.GetData() + slot := attData.Slot + slotInEpoch := slot % slotsPerEpoch + oldCommitteeIndex := attData.CommitteeIndex + blockRoot := fmt.Sprintf("%#x", attData.BeaconBlockRoot) + sourceRoot := fmt.Sprintf("%#x", attData.Source.Root) + sourceEpoch := attData.Source.Epoch + targetEpoch := attData.Target.Epoch + targetRoot := fmt.Sprintf("%#x", attData.Target.Root) + + return errors.Wrapf( + err, + "attSlot: %d, attSlotInEpoch: %d, attOldCommitteeIndex: %d, attCommitteeIndex: %d, attBlockRoot: %s, attSource: {root: %s, epoch: %d}, attTarget: {root: %s, epoch: %d}", + slot, slotInEpoch, oldCommitteeIndex, committeeIndex, blockRoot, sourceRoot, sourceEpoch, targetRoot, targetEpoch, + ) +} diff --git a/beacon-chain/verification/BUILD.bazel b/beacon-chain/verification/BUILD.bazel index fc48a5f9bb..d2d6621da6 100644 --- a/beacon-chain/verification/BUILD.bazel +++ b/beacon-chain/verification/BUILD.bazel @@ -44,6 +44,7 @@ go_library( "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", "@com_github_spf13_afero//:go_default_library", + "@org_golang_x_sync//singleflight:go_default_library", ], ) diff --git a/beacon-chain/verification/cache.go b/beacon-chain/verification/cache.go index c8801c9e29..71931d002c 100644 --- a/beacon-chain/verification/cache.go +++ b/beacon-chain/verification/cache.go @@ -50,6 +50,10 @@ type signatureData struct { Slot primitives.Slot } +func (d signatureData) concat() string { + return string(d.Root[:]) + string(d.Signature[:]) +} + func (d signatureData) logFields() logrus.Fields { return logrus.Fields{ "root": fmt.Sprintf("%#x", d.Root), diff --git a/beacon-chain/verification/cache_test.go b/beacon-chain/verification/cache_test.go index 06fba601dc..91dd4fc658 100644 --- a/beacon-chain/verification/cache_test.go +++ b/beacon-chain/verification/cache_test.go @@ -21,6 +21,18 @@ func testSignedBlockBlobKeys(t *testing.T, valRoot []byte, slot primitives.Slot, return block, blobs, sks[0], pks[0] } +func TestSignatureDataString(t *testing.T) { + const expected = "\x01\x02\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + sigData := signatureData{ + Root: [32]byte{1, 2, 3}, + Signature: [96]byte{4, 5, 6}, + } + + actual := sigData.concat() + require.Equal(t, expected, actual) +} + func TestVerifySignature(t *testing.T) { valRoot := [32]byte{} _, blobs, _, pk := testSignedBlockBlobKeys(t, valRoot[:], 0, 1) diff --git a/beacon-chain/verification/data_column.go b/beacon-chain/verification/data_column.go index c6ae4891c3..e094a7365b 100644 --- a/beacon-chain/verification/data_column.go +++ b/beacon-chain/verification/data_column.go @@ -257,17 +257,25 @@ func (dv *RODataColumnsVerifier) ValidProposerSignature(ctx context.Context) (er continue } - columnVerificationProposerSignatureCache.WithLabelValues("miss").Inc() + // Ensure the expensive signature verification is only performed once for + // concurrent requests for the same signature data. + if _, err, _ = dv.sg.Do(signatureData.concat(), func() (any, error) { + columnVerificationProposerSignatureCache.WithLabelValues("miss").Inc() - // Retrieve the parent state. - parentState, err := dv.parentState(ctx, dataColumn) - if err != nil { - return columnErrBuilder(errors.Wrap(err, "parent state")) - } + // Retrieve the parent state. + parentState, err := dv.state(ctx, dataColumn.ParentRoot()) + if err != nil { + return nil, columnErrBuilder(errors.Wrap(err, "parent state")) + } - // Full verification, which will subsequently be cached for anything sharing the signature cache. - if err = dv.sc.VerifySignature(signatureData, parentState); err != nil { - return columnErrBuilder(errors.Wrap(err, "verify signature")) + // Full verification, which will subsequently be cached for anything sharing the signature cache. + if err = dv.sc.VerifySignature(signatureData, parentState); err != nil { + return nil, columnErrBuilder(errors.Wrap(err, "verify signature")) + } + + return nil, nil + }); err != nil { + return err } } @@ -470,15 +478,25 @@ func (dv *RODataColumnsVerifier) SidecarProposerExpected(ctx context.Context) (e idx, cached := dv.pc.Proposer(checkpoint, dataColumnSlot) if !cached { - // Retrieve the parent state. - parentState, err := dv.parentState(ctx, dataColumn) - if err != nil { - return columnErrBuilder(errors.Wrap(err, "parent state")) - } + parentRoot := dataColumn.ParentRoot() + // Ensure the expensive index computation is only performed once for + // concurrent requests for the same signature data. + if _, err, _ := dv.sg.Do(fmt.Sprintf("%#x", parentRoot), func() (any, error) { + // Retrieve the parent state. + parentState, err := dv.state(ctx, parentRoot) + if err != nil { + return nil, columnErrBuilder(errors.Wrap(err, "parent state")) + } - idx, err = dv.pc.ComputeProposer(ctx, parentRoot, dataColumnSlot, parentState) - if err != nil { - return columnErrBuilder(errors.Wrap(err, "compute proposer")) + // Compute the proposer index. + idx, err = dv.pc.ComputeProposer(ctx, parentRoot, dataColumnSlot, parentState) + if err != nil { + return nil, columnErrBuilder(errors.Wrap(err, "compute proposer")) + } + + return nil, nil + }); err != nil { + return err } } @@ -490,23 +508,21 @@ func (dv *RODataColumnsVerifier) SidecarProposerExpected(ctx context.Context) (e return nil } -// parentState retrieves the parent state of the data column from the cache if possible, else retrieves it from the state by rooter. -func (dv *RODataColumnsVerifier) parentState(ctx context.Context, dataColumn blocks.RODataColumn) (state.BeaconState, error) { - parentRoot := dataColumn.ParentRoot() - +// state retrieves the state of the corresponding root from the cache if possible, else retrieves it from the state by rooter. +func (dv *RODataColumnsVerifier) state(ctx context.Context, root [fieldparams.RootLength]byte) (state.BeaconState, error) { // If the parent root is already in the cache, return it. - if st, ok := dv.stateByRoot[parentRoot]; ok { + if st, ok := dv.stateByRoot[root]; ok { return st, nil } // Retrieve the parent state from the state by rooter. - st, err := dv.sr.StateByRoot(ctx, parentRoot) + st, err := dv.sr.StateByRoot(ctx, root) if err != nil { return nil, errors.Wrap(err, "state by root") } // Store the parent state in the cache. - dv.stateByRoot[parentRoot] = st + dv.stateByRoot[root] = st return st, nil } diff --git a/beacon-chain/verification/initializer.go b/beacon-chain/verification/initializer.go index 79511e4a22..a949690055 100644 --- a/beacon-chain/verification/initializer.go +++ b/beacon-chain/verification/initializer.go @@ -14,6 +14,7 @@ import ( "github.com/OffchainLabs/prysm/v6/consensus-types/blocks" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" + "golang.org/x/sync/singleflight" ) // Forkchoicer represents the forkchoice methods that the verifiers need. @@ -41,6 +42,7 @@ type sharedResources struct { pc proposerCache sr StateByRooter ic *inclusionProofCache + sg singleflight.Group } // Initializer is used to create different Verifiers. diff --git a/changelog/manu-singleflight.md b/changelog/manu-singleflight.md new file mode 100644 index 0000000000..0b0e7174e1 --- /dev/null +++ b/changelog/manu-singleflight.md @@ -0,0 +1,2 @@ +### Fixed +- `RODataColumnsVerifier.ValidProposerSignature`: Ensure the expensive signature verification is only performed once for concurrent requests for the same signature data. \ No newline at end of file