mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
**What type of PR is this?** Other **What does this PR do? Why is it needed?** When we receive data column sidecars via gossip, if the sidecar does not respect the validation rules, a scary ERROR log is displayed. We can't do anything about it, since the error comes from an invalid incoming sidecar, so there is no need to print an ERROR message. Note: As for all REJECTED gossip messages, a DEBUG log is also always displayed. Example of ERROR log: ``` [2025-12-18 15:38:26.46] ERROR sync: Failed to decode message error=invalid ssz encoding. first variable element offset indexes into fixed value data [2025-12-18 15:38:26.46] DEBUG sync: Gossip message was rejected agent=erigon/caplin error=invalid ssz encoding. first variable element offset indexes into fixed value data gossipScore=0 multiaddress=/ip4/141.147.32.105/tcp/9000 peerID=16Uiu2HAmHu88k97iBist1vJg7cPNuTjJFRARKvDF7yaH3Pv3Vmso topic=/eth2/c6ecb76c/data_column_sidecar_30/ssz_snappy ``` (After this PR, the DEBUG one will still be printed.) **Acknowledgements** - [x] I have read [CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md). - [x] I have included a uniquely named [changelog fragment file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd). - [x] I have added a description with sufficient context for reviewers to understand this PR. - [x] I have tested that my changes work as expected and I added a testing plan to the PR description (if applicable).
317 lines
12 KiB
Go
317 lines
12 KiB
Go
package sync
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
"time"
|
|
|
|
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed"
|
|
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/operation"
|
|
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
|
|
"github.com/OffchainLabs/prysm/v7/beacon-chain/verification"
|
|
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
|
|
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
|
|
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
|
|
"github.com/OffchainLabs/prysm/v7/crypto/rand"
|
|
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
|
|
eth "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
|
"github.com/OffchainLabs/prysm/v7/runtime/logging"
|
|
prysmTime "github.com/OffchainLabs/prysm/v7/time"
|
|
"github.com/OffchainLabs/prysm/v7/time/slots"
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/sirupsen/logrus"
|
|
)
|
|
|
|
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#the-gossip-domain-gossipsub
|
|
func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
|
|
const dataColumnSidecarSubTopic = "/data_column_sidecar_%d/"
|
|
|
|
dataColumnSidecarVerificationRequestsCounter.Inc()
|
|
receivedTime := prysmTime.Now()
|
|
|
|
// Always accept messages our own messages.
|
|
if pid == s.cfg.p2p.PeerID() {
|
|
return pubsub.ValidationAccept, nil
|
|
}
|
|
|
|
// Ignore messages during initial sync.
|
|
if s.cfg.initialSync.Syncing() {
|
|
return pubsub.ValidationIgnore, nil
|
|
}
|
|
|
|
// Reject messages with a nil topic.
|
|
if msg.Topic == nil {
|
|
return pubsub.ValidationReject, p2p.ErrInvalidTopic
|
|
}
|
|
|
|
// Decode the message, reject if it fails.
|
|
m, err := s.decodePubsubMessage(msg)
|
|
if err != nil {
|
|
return pubsub.ValidationReject, err
|
|
}
|
|
|
|
// Reject messages that are not of the expected type.
|
|
dcsc, ok := m.(*eth.DataColumnSidecar)
|
|
if !ok {
|
|
return pubsub.ValidationReject, errWrongMessage
|
|
}
|
|
|
|
// Convert to a read-only data column sidecar.
|
|
roDataColumn, err := blocks.NewRODataColumn(dcsc)
|
|
if err != nil {
|
|
return pubsub.ValidationReject, errors.Wrap(err, "roDataColumn conversion failure")
|
|
}
|
|
|
|
// Compute a batch of only one data column sidecar.
|
|
roDataColumns := []blocks.RODataColumn{roDataColumn}
|
|
|
|
// Create the verifier.
|
|
verifier := s.newColumnsVerifier(roDataColumns, verification.GossipDataColumnSidecarRequirements)
|
|
|
|
// Start the verification process.
|
|
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/p2p-interface.md#data_column_sidecar_subnet_id
|
|
|
|
// [REJECT] The sidecar is valid as verified by `verify_data_column_sidecar(sidecar)`.
|
|
if err := verifier.ValidFields(); err != nil {
|
|
return pubsub.ValidationReject, err
|
|
}
|
|
|
|
// [REJECT] The sidecar is for the correct subnet -- i.e. `compute_subnet_for_data_column_sidecar(sidecar.index) == subnet_id`.
|
|
if err := verifier.CorrectSubnet(dataColumnSidecarSubTopic, []string{*msg.Topic}); err != nil {
|
|
return pubsub.ValidationReject, err
|
|
}
|
|
|
|
// [IGNORE] The sidecar is not from a future slot (with a `MAXIMUM_GOSSIP_CLOCK_DISPARITY` allowance)
|
|
// -- i.e. validate that `block_header.slot <= current_slot` (a client MAY queue future sidecars for processing at the appropriate slot).
|
|
if err := verifier.NotFromFutureSlot(); err != nil {
|
|
return pubsub.ValidationIgnore, err
|
|
}
|
|
|
|
// [IGNORE] The sidecar is from a slot greater than the latest finalized slot
|
|
// -- i.e. validate that `block_header.slot > compute_start_slot_at_epoch(state.finalized_checkpoint.epoch)`
|
|
if err := verifier.SlotAboveFinalized(); err != nil {
|
|
return pubsub.ValidationIgnore, err
|
|
}
|
|
|
|
// [IGNORE] The sidecar's block's parent (defined by `block_header.parent_root`) has been seen (via gossip or non-gossip sources
|
|
// (a client MAY queue sidecars for processing once the parent block is retrieved).
|
|
if err := verifier.SidecarParentSeen(s.hasBadBlock); err != nil {
|
|
// If we haven't seen the parent, request it asynchronously.
|
|
go func() {
|
|
customCtx := context.Background()
|
|
parentRoot := roDataColumn.ParentRoot()
|
|
roots := [][fieldparams.RootLength]byte{parentRoot}
|
|
randGenerator := rand.NewGenerator()
|
|
if err := s.sendBatchRootRequest(customCtx, roots, randGenerator); err != nil {
|
|
log.WithError(err).WithFields(logging.DataColumnFields(roDataColumn)).Debug("Failed to send batch root request")
|
|
}
|
|
}()
|
|
|
|
return pubsub.ValidationIgnore, err
|
|
}
|
|
|
|
// [REJECT] The sidecar's block's parent (defined by `block_header.parent_root`) passes validation.
|
|
if err := verifier.SidecarParentValid(s.hasBadBlock); err != nil {
|
|
return pubsub.ValidationReject, err
|
|
}
|
|
|
|
// [REJECT] The proposer signature of `sidecar.signed_block_header`, is valid with respect to the `block_header.proposer_index` pubkey.
|
|
// We do not strictly respect the spec ordering here. This is necessary because signature verification depends on the parent root,
|
|
// which is only available if the parent block is known.
|
|
if err := verifier.ValidProposerSignature(ctx); err != nil {
|
|
return pubsub.ValidationReject, err
|
|
}
|
|
|
|
// [REJECT] The sidecar is from a higher slot than the sidecar's block's parent (defined by `block_header.parent_root`).
|
|
if err := verifier.SidecarParentSlotLower(); err != nil {
|
|
return pubsub.ValidationReject, err
|
|
}
|
|
|
|
// [REJECT] The current `finalized_checkpoint` is an ancestor of the sidecar's block
|
|
// -- i.e. `get_checkpoint_block(store, block_header.parent_root, store.finalized_checkpoint.epoch) == store.finalized_checkpoint.root`.
|
|
if err := verifier.SidecarDescendsFromFinalized(); err != nil {
|
|
return pubsub.ValidationReject, err
|
|
}
|
|
|
|
// [REJECT] The sidecar's `kzg_commitments` field inclusion proof is valid as verified by `verify_data_column_sidecar_inclusion_proof(sidecar)`.
|
|
if err := verifier.SidecarInclusionProven(); err != nil {
|
|
return pubsub.ValidationReject, err
|
|
}
|
|
|
|
// [REJECT] The sidecar's column data is valid as verified by `verify_data_column_sidecar_kzg_proofs(sidecar)`.
|
|
validationResult, err := s.validateWithKzgBatchVerifier(ctx, roDataColumns)
|
|
if validationResult != pubsub.ValidationAccept {
|
|
return validationResult, err
|
|
}
|
|
// Mark KZG verification as satisfied since we did it via batch verifier
|
|
verifier.SatisfyRequirement(verification.RequireSidecarKzgProofVerified)
|
|
|
|
// [IGNORE] The sidecar is the first sidecar for the tuple `(block_header.slot, block_header.proposer_index, sidecar.index)`
|
|
// with valid header signature, sidecar inclusion proof, and kzg proof.
|
|
if s.hasSeenDataColumnIndex(roDataColumn.Slot(), roDataColumn.ProposerIndex(), roDataColumn.DataColumnSidecar.Index) {
|
|
return pubsub.ValidationIgnore, nil
|
|
}
|
|
|
|
// [REJECT] The sidecar is proposed by the expected `proposer_index` for the block's slot in the context of the current shuffling (defined by `block_header.parent_root`/`block_header.slot`).
|
|
// If the `proposer_index` cannot immediately be verified against the expected shuffling, the sidecar MAY be queued for later processing while proposers for the block's branch are calculated
|
|
// -- in such a case do not REJECT, instead IGNORE this message.
|
|
if err := verifier.SidecarProposerExpected(ctx); err != nil {
|
|
return pubsub.ValidationReject, err
|
|
}
|
|
|
|
verifiedRODataColumns, err := verifier.VerifiedRODataColumns()
|
|
if err != nil {
|
|
// This should never happen.
|
|
log.WithError(err).WithFields(logging.DataColumnFields(roDataColumn)).Error("Failed to get verified data columns")
|
|
return pubsub.ValidationIgnore, err
|
|
}
|
|
|
|
verifiedRODataColumnsCount := len(verifiedRODataColumns)
|
|
|
|
if verifiedRODataColumnsCount != 1 {
|
|
// This should never happen.
|
|
log.WithField("verifiedRODataColumnsCount", verifiedRODataColumnsCount).Error("Verified data columns count is not 1")
|
|
return pubsub.ValidationIgnore, errors.New("Wrong number of verified data columns")
|
|
}
|
|
|
|
msg.ValidatorData = verifiedRODataColumns[0]
|
|
dataColumnSidecarVerificationSuccessesCounter.Inc()
|
|
|
|
// Get the time at slot start.
|
|
startTime, err := slots.StartTime(s.cfg.clock.GenesisTime(), roDataColumn.SignedBlockHeader.Header.Slot)
|
|
if err != nil {
|
|
return pubsub.ValidationIgnore, err
|
|
}
|
|
|
|
sinceSlotStartTime := receivedTime.Sub(startTime)
|
|
validationTime := s.cfg.clock.Now().Sub(receivedTime)
|
|
dataColumnSidecarArrivalGossipSummary.Observe(float64(sinceSlotStartTime.Milliseconds()))
|
|
dataColumnSidecarVerificationGossipHistogram.Observe(float64(validationTime.Milliseconds()))
|
|
|
|
peerGossipScore := s.cfg.p2p.Peers().Scorers().GossipScorer().Score(pid)
|
|
|
|
select {
|
|
case s.dataColumnLogCh <- dataColumnLogEntry{
|
|
Slot: roDataColumn.Slot(),
|
|
ColIdx: roDataColumn.Index,
|
|
PropIdx: roDataColumn.ProposerIndex(),
|
|
BlockRoot: roDataColumn.BlockRoot(),
|
|
ParentRoot: roDataColumn.ParentRoot(),
|
|
PeerSuffix: pid.String()[len(pid.String())-6:],
|
|
PeerGossipScore: peerGossipScore,
|
|
validationTime: validationTime,
|
|
sinceStartTime: sinceSlotStartTime,
|
|
}:
|
|
default:
|
|
log.WithField("slot", roDataColumn.Slot()).Warn("Failed to send data column log entry")
|
|
}
|
|
|
|
if s.cfg.operationNotifier != nil {
|
|
s.cfg.operationNotifier.OperationFeed().Send(&feed.Event{
|
|
Type: operation.DataColumnReceived,
|
|
Data: &operation.DataColumnReceivedData{
|
|
Slot: roDataColumn.Slot(),
|
|
Index: roDataColumn.Index,
|
|
BlockRoot: roDataColumn.BlockRoot(),
|
|
KzgCommitments: bytesutil.SafeCopy2dBytes(roDataColumn.KzgCommitments),
|
|
},
|
|
})
|
|
}
|
|
|
|
return pubsub.ValidationAccept, nil
|
|
}
|
|
|
|
// Returns true if the column with the same slot, proposer index, and column index has been seen before.
|
|
func (s *Service) hasSeenDataColumnIndex(slot primitives.Slot, proposerIndex primitives.ValidatorIndex, index uint64) bool {
|
|
key := computeCacheKey(slot, proposerIndex, index)
|
|
_, seen := s.seenDataColumnCache.Get(key)
|
|
return seen
|
|
}
|
|
|
|
// Sets the data column with the same slot, proposer index, and data column index as seen.
|
|
func (s *Service) setSeenDataColumnIndex(slot primitives.Slot, proposerIndex primitives.ValidatorIndex, index uint64) {
|
|
key := computeCacheKey(slot, proposerIndex, index)
|
|
s.seenDataColumnCache.Add(slot, key, true)
|
|
}
|
|
|
|
func computeCacheKey(slot primitives.Slot, proposerIndex primitives.ValidatorIndex, index uint64) string {
|
|
key := make([]byte, 0, 96)
|
|
|
|
key = append(key, bytesutil.Bytes32(uint64(slot))...)
|
|
key = append(key, bytesutil.Bytes32(uint64(proposerIndex))...)
|
|
key = append(key, bytesutil.Bytes32(index)...)
|
|
|
|
return string(key)
|
|
}
|
|
|
|
type dataColumnLogEntry struct {
|
|
Slot primitives.Slot
|
|
ColIdx uint64
|
|
PropIdx primitives.ValidatorIndex
|
|
BlockRoot [32]byte
|
|
ParentRoot [32]byte
|
|
PeerSuffix string
|
|
PeerGossipScore float64
|
|
validationTime time.Duration
|
|
sinceStartTime time.Duration
|
|
}
|
|
|
|
func (s *Service) processDataColumnLogs() {
|
|
ticker := time.NewTicker(1 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
slotStats := make(map[primitives.Slot][fieldparams.NumberOfColumns]dataColumnLogEntry)
|
|
|
|
for {
|
|
select {
|
|
case entry := <-s.dataColumnLogCh:
|
|
cols := slotStats[entry.Slot]
|
|
cols[entry.ColIdx] = entry
|
|
slotStats[entry.Slot] = cols
|
|
case <-ticker.C:
|
|
for slot, columns := range slotStats {
|
|
var (
|
|
colIndices = make([]uint64, 0, fieldparams.NumberOfColumns)
|
|
peers = make([]string, 0, fieldparams.NumberOfColumns)
|
|
gossipScores = make([]float64, 0, fieldparams.NumberOfColumns)
|
|
validationTimes = make([]string, 0, fieldparams.NumberOfColumns)
|
|
sinceStartTimes = make([]string, 0, fieldparams.NumberOfColumns)
|
|
)
|
|
|
|
totalReceived := 0
|
|
for _, entry := range columns {
|
|
if entry.PeerSuffix == "" {
|
|
continue
|
|
}
|
|
colIndices = append(colIndices, entry.ColIdx)
|
|
peers = append(peers, entry.PeerSuffix)
|
|
gossipScores = append(gossipScores, roundFloat(entry.PeerGossipScore, 2))
|
|
validationTimes = append(validationTimes, fmt.Sprintf("%.2fms", float64(entry.validationTime.Milliseconds())))
|
|
sinceStartTimes = append(sinceStartTimes, fmt.Sprintf("%.2fms", float64(entry.sinceStartTime.Milliseconds())))
|
|
totalReceived++
|
|
}
|
|
|
|
log.WithFields(logrus.Fields{
|
|
"slot": slot,
|
|
"receivedCount": totalReceived,
|
|
"columnIndices": colIndices,
|
|
"peers": peers,
|
|
"gossipScores": gossipScores,
|
|
"validationTimes": validationTimes,
|
|
"sinceStartTimes": sinceStartTimes,
|
|
}).Debug("Accepted data column sidecars summary")
|
|
}
|
|
slotStats = make(map[primitives.Slot][fieldparams.NumberOfColumns]dataColumnLogEntry)
|
|
}
|
|
}
|
|
}
|
|
|
|
// roundFloat rounds f to the given number of decimal places by scaling with a power of
// ten, rounding to the nearest integer (math.Round), and scaling back.
func roundFloat(f float64, decimals int) float64 {
	scale := math.Pow(10, float64(decimals))
	scaled := math.Round(f * scale)
	return scaled / scale
}
|