RODataColumnsVerifier.ValidProposerSignature: Ensure the expensive signature verification is only performed once for concurrent requests for the same signature data. (#15954)

* `signatureData`: Add `string` function.

* `RODataColumnsVerifier.ValidProposerSignature`: Ensure the expensive signature verification is only performed once for concurrent requests for the same signature data.

Share flight group

* `parentState` ==> `state`.

* `RODataColumnsVerifier.SidecarProposerExpected: Ensure the expensive index computation is only performed once for concurrent requests.`

* Add `wrapAttestationError`

* Fix Kasey's comment.

* Fix Terence's comment.
This commit is contained in:
Manu NALEPA
2025-11-03 15:48:41 +01:00
committed by GitHub
parent b2a9db0826
commit ec524ce99c
7 changed files with 91 additions and 32 deletions

View File

@@ -16,6 +16,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
"github.com/OffchainLabs/prysm/v6/beacon-chain/slasher/types"
"github.com/OffchainLabs/prysm/v6/beacon-chain/state"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing"
@@ -67,7 +68,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(
return pubsub.ValidationReject, errWrongMessage
}
if err := helpers.ValidateNilAttestation(att); err != nil {
return pubsub.ValidationReject, err
return pubsub.ValidationReject, wrapAttestationError(err, att)
}
data := att.GetData()
@@ -84,7 +85,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(
return pubsub.ValidationIgnore, err
}
if err := helpers.ValidateSlotTargetEpoch(data); err != nil {
return pubsub.ValidationReject, err
return pubsub.ValidationReject, wrapAttestationError(err, att)
}
committeeIndex := att.GetCommitteeIndex()
@@ -106,7 +107,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(
s.hasBadBlock(bytesutil.ToBytes32(data.Target.Root)) ||
s.hasBadBlock(bytesutil.ToBytes32(data.Source.Root)) {
attBadBlockCount.Inc()
return pubsub.ValidationReject, errors.New("attestation data references bad block root")
return pubsub.ValidationReject, wrapAttestationError(errors.New("attestation data references bad block root"), att)
}
}
@@ -126,7 +127,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(
if err = s.cfg.chain.VerifyLmdFfgConsistency(ctx, att); err != nil {
tracing.AnnotateError(span, err)
attBadLmdConsistencyCount.Inc()
return pubsub.ValidationReject, err
return pubsub.ValidationReject, wrapAttestationError(err, att)
}
preState, err := s.cfg.chain.AttestationTargetState(ctx, data.Target)
@@ -137,7 +138,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(
validationRes, err := s.validateUnaggregatedAttTopic(ctx, att, preState, *msg.Topic)
if validationRes != pubsub.ValidationAccept {
return validationRes, err
return validationRes, wrapAttestationError(err, att)
}
committee, err := helpers.BeaconCommitteeFromState(ctx, preState, data.Slot, committeeIndex)
@@ -148,7 +149,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(
validationRes, err = validateAttesterData(ctx, att, committee)
if validationRes != pubsub.ValidationAccept {
return validationRes, err
return validationRes, wrapAttestationError(err, att)
}
// Consolidated handling of Electra SingleAttestation vs Phase0 unaggregated attestation
@@ -183,7 +184,7 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(
validationRes, err = s.validateUnaggregatedAttWithState(ctx, attForValidation, preState)
if validationRes != pubsub.ValidationAccept {
return validationRes, err
return validationRes, wrapAttestationError(err, att)
}
if s.slasherEnabled {
@@ -339,7 +340,7 @@ func validateAttestingIndex(
// `attestation.attester_index in get_beacon_committee(state, attestation.data.slot, index)`.
inCommittee := slices.Contains(committee, attestingIndex)
if !inCommittee {
return pubsub.ValidationReject, errors.New("attester is not a member of the committee")
return pubsub.ValidationReject, errors.Errorf("attester %d is not a member of the committee", attestingIndex)
}
return pubsub.ValidationAccept, nil
@@ -392,3 +393,24 @@ func (s *Service) hasBlockAndState(ctx context.Context, blockRoot [32]byte) bool
hasState := hasStateSummary || s.cfg.beaconDB.HasState(ctx, blockRoot)
return hasState && s.cfg.chain.HasBlock(ctx, blockRoot)
}
func wrapAttestationError(err error, att eth.Att) error {
slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch
committeeIndex := att.GetCommitteeIndex()
attData := att.GetData()
slot := attData.Slot
slotInEpoch := slot % slotsPerEpoch
oldCommitteeIndex := attData.CommitteeIndex
blockRoot := fmt.Sprintf("%#x", attData.BeaconBlockRoot)
sourceRoot := fmt.Sprintf("%#x", attData.Source.Root)
sourceEpoch := attData.Source.Epoch
targetEpoch := attData.Target.Epoch
targetRoot := fmt.Sprintf("%#x", attData.Target.Root)
return errors.Wrapf(
err,
"attSlot: %d, attSlotInEpoch: %d, attOldCommitteeIndex: %d, attCommitteeIndex: %d, attBlockRoot: %s, attSource: {root: %s, epoch: %d}, attTarget: {root: %s, epoch: %d}",
slot, slotInEpoch, oldCommitteeIndex, committeeIndex, blockRoot, sourceRoot, sourceEpoch, targetRoot, targetEpoch,
)
}

View File

@@ -44,6 +44,7 @@ go_library(
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_spf13_afero//:go_default_library",
"@org_golang_x_sync//singleflight:go_default_library",
],
)

View File

@@ -50,6 +50,10 @@ type signatureData struct {
Slot primitives.Slot
}
func (d signatureData) concat() string {
return string(d.Root[:]) + string(d.Signature[:])
}
func (d signatureData) logFields() logrus.Fields {
return logrus.Fields{
"root": fmt.Sprintf("%#x", d.Root),

View File

@@ -21,6 +21,18 @@ func testSignedBlockBlobKeys(t *testing.T, valRoot []byte, slot primitives.Slot,
return block, blobs, sks[0], pks[0]
}
func TestSignatureDataString(t *testing.T) {
const expected = "\x01\x02\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x05\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
sigData := signatureData{
Root: [32]byte{1, 2, 3},
Signature: [96]byte{4, 5, 6},
}
actual := sigData.concat()
require.Equal(t, expected, actual)
}
func TestVerifySignature(t *testing.T) {
valRoot := [32]byte{}
_, blobs, _, pk := testSignedBlockBlobKeys(t, valRoot[:], 0, 1)

View File

@@ -257,17 +257,25 @@ func (dv *RODataColumnsVerifier) ValidProposerSignature(ctx context.Context) (er
continue
}
columnVerificationProposerSignatureCache.WithLabelValues("miss").Inc()
// Ensure the expensive signature verification is only performed once for
// concurrent requests for the same signature data.
if _, err, _ = dv.sg.Do(signatureData.concat(), func() (any, error) {
columnVerificationProposerSignatureCache.WithLabelValues("miss").Inc()
// Retrieve the parent state.
parentState, err := dv.parentState(ctx, dataColumn)
if err != nil {
return columnErrBuilder(errors.Wrap(err, "parent state"))
}
// Retrieve the parent state.
parentState, err := dv.state(ctx, dataColumn.ParentRoot())
if err != nil {
return nil, columnErrBuilder(errors.Wrap(err, "parent state"))
}
// Full verification, which will subsequently be cached for anything sharing the signature cache.
if err = dv.sc.VerifySignature(signatureData, parentState); err != nil {
return columnErrBuilder(errors.Wrap(err, "verify signature"))
// Full verification, which will subsequently be cached for anything sharing the signature cache.
if err = dv.sc.VerifySignature(signatureData, parentState); err != nil {
return nil, columnErrBuilder(errors.Wrap(err, "verify signature"))
}
return nil, nil
}); err != nil {
return err
}
}
@@ -470,15 +478,25 @@ func (dv *RODataColumnsVerifier) SidecarProposerExpected(ctx context.Context) (e
idx, cached := dv.pc.Proposer(checkpoint, dataColumnSlot)
if !cached {
// Retrieve the parent state.
parentState, err := dv.parentState(ctx, dataColumn)
if err != nil {
return columnErrBuilder(errors.Wrap(err, "parent state"))
}
parentRoot := dataColumn.ParentRoot()
// Ensure the expensive index computation is only performed once for
// concurrent requests for the same signature data.
if _, err, _ := dv.sg.Do(fmt.Sprintf("%#x", parentRoot), func() (any, error) {
// Retrieve the parent state.
parentState, err := dv.state(ctx, parentRoot)
if err != nil {
return nil, columnErrBuilder(errors.Wrap(err, "parent state"))
}
idx, err = dv.pc.ComputeProposer(ctx, parentRoot, dataColumnSlot, parentState)
if err != nil {
return columnErrBuilder(errors.Wrap(err, "compute proposer"))
// Compute the proposer index.
idx, err = dv.pc.ComputeProposer(ctx, parentRoot, dataColumnSlot, parentState)
if err != nil {
return nil, columnErrBuilder(errors.Wrap(err, "compute proposer"))
}
return nil, nil
}); err != nil {
return err
}
}
@@ -490,23 +508,21 @@ func (dv *RODataColumnsVerifier) SidecarProposerExpected(ctx context.Context) (e
return nil
}
// parentState retrieves the parent state of the data column from the cache if possible, else retrieves it from the state by rooter.
func (dv *RODataColumnsVerifier) parentState(ctx context.Context, dataColumn blocks.RODataColumn) (state.BeaconState, error) {
parentRoot := dataColumn.ParentRoot()
// state retrieves the state of the corresponding root from the cache if possible, else retrieves it from the state by rooter.
func (dv *RODataColumnsVerifier) state(ctx context.Context, root [fieldparams.RootLength]byte) (state.BeaconState, error) {
// If the parent root is already in the cache, return it.
if st, ok := dv.stateByRoot[parentRoot]; ok {
if st, ok := dv.stateByRoot[root]; ok {
return st, nil
}
// Retrieve the parent state from the state by rooter.
st, err := dv.sr.StateByRoot(ctx, parentRoot)
st, err := dv.sr.StateByRoot(ctx, root)
if err != nil {
return nil, errors.Wrap(err, "state by root")
}
// Store the parent state in the cache.
dv.stateByRoot[parentRoot] = st
dv.stateByRoot[root] = st
return st, nil
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"golang.org/x/sync/singleflight"
)
// Forkchoicer represents the forkchoice methods that the verifiers need.
@@ -41,6 +42,7 @@ type sharedResources struct {
pc proposerCache
sr StateByRooter
ic *inclusionProofCache
sg singleflight.Group
}
// Initializer is used to create different Verifiers.

View File

@@ -0,0 +1,2 @@
### Fixed
- `RODataColumnsVerifier.ValidProposerSignature`: Ensure the expensive signature verification is only performed once for concurrent requests for the same signature data.