From 00204ffa6aabb579c1ad12330bddfad122869144 Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Mon, 26 May 2025 00:31:09 +0200 Subject: [PATCH] PeerDAS: Implement the validation pipeline for data column sidecars received via gossip (#15310) * PeerDAS: Implement the validation pipeline for data column sidecars received via gossip * Fix Terence's comment * Fix Terence's comment. * Fix Terence's comment. --- beacon-chain/core/peerdas/metrics.go | 2 +- beacon-chain/p2p/gossip_topic_mappings.go | 1 + beacon-chain/p2p/interfaces.go | 2 + beacon-chain/p2p/pubsub_filter_test.go | 9 +- beacon-chain/p2p/service.go | 9 + beacon-chain/p2p/testing/fuzz_p2p.go | 6 + beacon-chain/p2p/testing/mock_peermanager.go | 6 + beacon-chain/p2p/topics.go | 5 + beacon-chain/sync/BUILD.bazel | 4 + beacon-chain/sync/decode_pubsub.go | 2 + beacon-chain/sync/metrics.go | 19 ++ beacon-chain/sync/service.go | 13 + beacon-chain/sync/subscriber.go | 23 +- beacon-chain/sync/validate_data_column.go | 299 ++++++++++++++++++ .../sync/validate_data_column_test.go | 224 +++++++++++++ beacon-chain/verification/mock.go | 79 ++++- changelog/manu-peerdas-validation.md | 2 + consensus-types/blocks/rodatacolumn.go | 2 + 18 files changed, 699 insertions(+), 8 deletions(-) create mode 100644 beacon-chain/sync/validate_data_column.go create mode 100644 beacon-chain/sync/validate_data_column_test.go create mode 100644 changelog/manu-peerdas-validation.md diff --git a/beacon-chain/core/peerdas/metrics.go b/beacon-chain/core/peerdas/metrics.go index cf8d73254b..caa92ebc56 100644 --- a/beacon-chain/core/peerdas/metrics.go +++ b/beacon-chain/core/peerdas/metrics.go @@ -9,6 +9,6 @@ var dataColumnComputationTime = promauto.NewHistogram( prometheus.HistogramOpts{ Name: "beacon_data_column_sidecar_computation_milliseconds", Help: "Captures the time taken to compute data column sidecars from blobs.", - Buckets: []float64{100, 250, 500, 750, 1000, 1500, 2000, 4000, 8000, 12000, 16000}, + Buckets: []float64{25, 50, 100, 250, 500, 750, 1000}, }, ) diff --git a/beacon-chain/p2p/gossip_topic_mappings.go b/beacon-chain/p2p/gossip_topic_mappings.go index 918cd8f1cb..eac93eef57 100644 --- a/beacon-chain/p2p/gossip_topic_mappings.go +++ b/beacon-chain/p2p/gossip_topic_mappings.go @@ -24,6 +24,7 @@ var gossipTopicMappings = map[string]func() proto.Message{ BlobSubnetTopicFormat: func() proto.Message { return ðpb.BlobSidecar{} }, LightClientOptimisticUpdateTopicFormat: func() proto.Message { return ðpb.LightClientOptimisticUpdateAltair{} }, LightClientFinalityUpdateTopicFormat: func() proto.Message { return ðpb.LightClientFinalityUpdateAltair{} }, + DataColumnSubnetTopicFormat: func() proto.Message { return ðpb.DataColumnSidecar{} }, } // GossipTopicMappings is a function to return the assigned data type diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go index ee0142a04f..d8aa5bf6cb 100644 --- a/beacon-chain/p2p/interfaces.go +++ b/beacon-chain/p2p/interfaces.go @@ -8,6 +8,7 @@ import ( "github.com/OffchainLabs/prysm/v6/consensus-types/interfaces" ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1/metadata" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/connmgr" @@ -84,6 +85,7 @@ type PeerManager interface { PeerID() peer.ID Host() host.Host ENR() *enr.Record + NodeID() enode.ID DiscoveryAddresses() ([]multiaddr.Multiaddr, error) RefreshPersistentSubnets() FindPeersWithSubnet(ctx context.Context, topic string, subIndex uint64, threshold int) (bool, error) diff --git a/beacon-chain/p2p/pubsub_filter_test.go b/beacon-chain/p2p/pubsub_filter_test.go index 9b867031c5..d52c342d31 100644 --- a/beacon-chain/p2p/pubsub_filter_test.go +++ b/beacon-chain/p2p/pubsub_filter_test.go @@ -90,7 +90,14 @@ func TestService_CanSubscribe(t *testing.T) { formatting := []interface{}{digest} // Special case for attestation subnets which have a second formatting placeholder. - if topic == AttestationSubnetTopicFormat || topic == SyncCommitteeSubnetTopicFormat || topic == BlobSubnetTopicFormat { + topics := map[string]bool{ + AttestationSubnetTopicFormat: true, + SyncCommitteeSubnetTopicFormat: true, + BlobSubnetTopicFormat: true, + DataColumnSubnetTopicFormat: true, + } + + if topics[topic] { formatting = append(formatting, 0 /* some subnet ID */) } diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index ab27a28aaf..1ec67b456d 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -363,6 +363,15 @@ func (s *Service) ENR() *enr.Record { return s.dv5Listener.Self().Record() } +// NodeID returns the local node's node ID for discovery. +func (s *Service) NodeID() enode.ID { + if s.dv5Listener == nil { + return enode.ID{} + } + + return s.dv5Listener.Self().ID() +} + // DiscoveryAddresses represents our enr addresses as multiaddresses. func (s *Service) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { if s.dv5Listener == nil { diff --git a/beacon-chain/p2p/testing/fuzz_p2p.go b/beacon-chain/p2p/testing/fuzz_p2p.go index 5e0b808d34..d72aa34f2c 100644 --- a/beacon-chain/p2p/testing/fuzz_p2p.go +++ b/beacon-chain/p2p/testing/fuzz_p2p.go @@ -8,6 +8,7 @@ import ( "github.com/OffchainLabs/prysm/v6/consensus-types/interfaces" ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1/metadata" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/control" @@ -56,6 +57,11 @@ func (*FakeP2P) ENR() *enr.Record { return new(enr.Record) } +// NodeID returns the node id of the local peer. +func (*FakeP2P) NodeID() enode.ID { + return [32]byte{} +} + // DiscoveryAddresses -- fake func (*FakeP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { return nil, nil diff --git a/beacon-chain/p2p/testing/mock_peermanager.go b/beacon-chain/p2p/testing/mock_peermanager.go index a59c76c173..5b1d488323 100644 --- a/beacon-chain/p2p/testing/mock_peermanager.go +++ b/beacon-chain/p2p/testing/mock_peermanager.go @@ -4,6 +4,7 @@ import ( "context" "errors" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" @@ -39,6 +40,11 @@ func (m *MockPeerManager) ENR() *enr.Record { return m.Enr } +// NodeID . +func (m MockPeerManager) NodeID() enode.ID { + return [32]byte{} +} + // DiscoveryAddresses . func (m *MockPeerManager) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) { if m.FailDiscoveryAddr { diff --git a/beacon-chain/p2p/topics.go b/beacon-chain/p2p/topics.go index 966c5c4183..c84e092f24 100644 --- a/beacon-chain/p2p/topics.go +++ b/beacon-chain/p2p/topics.go @@ -34,6 +34,9 @@ const ( GossipLightClientFinalityUpdateMessage = "light_client_finality_update" // GossipLightClientOptimisticUpdateMessage is the name for the light client optimistic update message type. GossipLightClientOptimisticUpdateMessage = "light_client_optimistic_update" + // GossipDataColumnSidecarMessage is the name for the data column sidecar message type. + GossipDataColumnSidecarMessage = "data_column_sidecar" + // Topic Formats // // AttestationSubnetTopicFormat is the topic format for the attestation subnet. @@ -60,4 +63,6 @@ const ( LightClientFinalityUpdateTopicFormat = GossipProtocolAndDigest + GossipLightClientFinalityUpdateMessage // LightClientOptimisticUpdateTopicFormat is the topic format for the light client optimistic update subnet. LightClientOptimisticUpdateTopicFormat = GossipProtocolAndDigest + GossipLightClientOptimisticUpdateMessage + // DataColumnSubnetTopicFormat is the topic format for the data column subnet. + DataColumnSubnetTopicFormat = GossipProtocolAndDigest + GossipDataColumnSidecarMessage + "_%d" ) diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index e4966bc1ff..42666faf02 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -49,6 +49,7 @@ go_library( "validate_beacon_blocks.go", "validate_blob.go", "validate_bls_to_execution_change.go", + "validate_data_column.go", "validate_light_client.go", "validate_proposer_slashing.go", "validate_sync_committee_message.go", @@ -123,6 +124,7 @@ go_library( "//proto/prysm/v1alpha1/attestation:go_default_library", "//proto/prysm/v1alpha1/metadata:go_default_library", "//runtime:go_default_library", + "//runtime/logging:go_default_library", "//runtime/messagehandler:go_default_library", "//runtime/version:go_default_library", "//time:go_default_library", @@ -189,6 +191,7 @@ go_test( "validate_beacon_blocks_test.go", "validate_blob_test.go", "validate_bls_to_execution_change_test.go", + "validate_data_column_test.go", "validate_light_client_test.go", "validate_proposer_slashing_test.go", "validate_sync_committee_message_test.go", @@ -273,6 +276,7 @@ go_test( "@com_github_libp2p_go_libp2p_pubsub//pb:go_default_library", "@com_github_patrickmn_go_cache//:go_default_library", "@com_github_pkg_errors//:go_default_library", + "@com_github_prysmaticlabs_fastssz//:go_default_library", "@com_github_prysmaticlabs_go_bitfield//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", "@com_github_sirupsen_logrus//hooks/test:go_default_library", diff --git a/beacon-chain/sync/decode_pubsub.go b/beacon-chain/sync/decode_pubsub.go index 2886891d9f..9aa9c09b27 100644 --- a/beacon-chain/sync/decode_pubsub.go +++ b/beacon-chain/sync/decode_pubsub.go @@ -45,6 +45,8 @@ func (s *Service) decodePubsubMessage(msg *pubsub.Message) (ssz.Unmarshaler, err topic = p2p.GossipTypeMapping[reflect.TypeOf(ðpb.SyncCommitteeMessage{})] case strings.Contains(topic, p2p.GossipBlobSidecarMessage): topic = p2p.GossipTypeMapping[reflect.TypeOf(ðpb.BlobSidecar{})] + case strings.Contains(topic, p2p.GossipDataColumnSidecarMessage): + topic = p2p.GossipTypeMapping[reflect.TypeOf(ðpb.DataColumnSidecar{})] } base := p2p.GossipTopicMappings(topic, 0) diff --git a/beacon-chain/sync/metrics.go b/beacon-chain/sync/metrics.go index 4b376a6c0f..a632c2665a 100644 --- a/beacon-chain/sync/metrics.go +++ b/beacon-chain/sync/metrics.go @@ -184,6 +184,25 @@ var ( Help: "Count the number of times blobs have been found in the database.", }, ) + + // Data column sidecar validation, beacon metrics specs + dataColumnSidecarVerificationRequestsCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "beacon_data_column_sidecar_processing_requests_total", + Help: "Count the number of data column sidecars submitted for verification", + }) + + dataColumnSidecarVerificationSuccessesCounter = promauto.NewCounter(prometheus.CounterOpts{ + Name: "beacon_data_column_sidecar_processing_successes_total", + Help: "Count the number of data column sidecars verified for gossip", + }) + + dataColumnSidecarVerificationGossipHistogram = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "beacon_data_column_sidecar_gossip_verification_milliseconds", + Help: "Captures the time taken to verify data column sidecars.", + Buckets: []float64{100, 250, 500, 750, 1000, 1500, 2000, 4000, 8000, 12000, 16000}, + }, + ) ) func (s *Service) updateMetrics() { diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index bcdc41f92b..bffb2fc8e1 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -140,6 +140,7 @@ type Service struct { seenBlockCache *lru.Cache seenBlobLock sync.RWMutex seenBlobCache *lru.Cache + seenDataColumnCache *lru.Cache seenAggregatedAttestationLock sync.RWMutex seenAggregatedAttestationCache *lru.Cache seenUnAggregatedAttestationLock sync.RWMutex @@ -163,10 +164,12 @@ type Service struct { initialSyncComplete chan struct{} verifierWaiter *verification.InitializerWaiter newBlobVerifier verification.NewBlobVerifier + newColumnsVerifier verification.NewDataColumnsVerifier availableBlocker coverage.AvailableBlocker ctxMap ContextByteVersions slasherEnabled bool lcStore *lightClient.Store + dataColumnLogCh chan dataColumnLogEntry } // NewService initializes new regular sync service. @@ -181,6 +184,7 @@ func NewService(ctx context.Context, opts ...Option) *Service { seenPendingBlocks: make(map[[32]byte]bool), blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof), signatureChan: make(chan *signatureVerifier, verifierLimit), + dataColumnLogCh: make(chan dataColumnLogEntry, 1000), } for _, opt := range opts { @@ -223,6 +227,12 @@ func newBlobVerifierFromInitializer(ini *verification.Initializer) verification. } } +func newDataColumnsVerifierFromInitializer(ini *verification.Initializer) verification.NewDataColumnsVerifier { + return func(roDataColumns []blocks.RODataColumn, reqs []verification.Requirement) verification.DataColumnsVerifier { + return ini.NewDataColumnsVerifier(roDataColumns, reqs) + } +} + // Start the regular sync service. func (s *Service) Start() { v, err := s.verifierWaiter.WaitForInitializer(s.ctx) @@ -231,9 +241,11 @@ func (s *Service) Start() { return } s.newBlobVerifier = newBlobVerifierFromInitializer(v) + s.newColumnsVerifier = newDataColumnsVerifierFromInitializer(v) go s.verifierRoutine() go s.startTasksPostInitialSync() + go s.processDataColumnLogs() s.cfg.p2p.AddConnectionHandler(s.reValidatePeer, s.sendGoodbye) s.cfg.p2p.AddDisconnectionHandler(func(_ context.Context, _ peer.ID) error { @@ -285,6 +297,7 @@ func (s *Service) Status() error { func (s *Service) initCaches() { s.seenBlockCache = lruwrpr.New(seenBlockSize) s.seenBlobCache = lruwrpr.New(seenBlobSize) + s.seenDataColumnCache = lruwrpr.New(seenDataColumnSize) s.seenAggregatedAttestationCache = lruwrpr.New(seenAggregatedAttSize) s.seenUnAggregatedAttestationCache = lruwrpr.New(seenUnaggregatedAttSize) s.seenSyncMessageCache = lruwrpr.New(seenSyncMsgSize) diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index a44e47ce43..730731eba4 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -117,7 +117,8 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { s.persistentAndAggregatorSubnetIndices, s.attesterSubnetIndices, ) - // Altair fork version + + // New gossip topic in Altair if params.BeaconConfig().AltairForkEpoch <= epoch { s.subscribe( p2p.SyncContributionAndProofSubnetTopicFormat, @@ -159,7 +160,7 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { ) } - // New gossip topic in Deneb, modified in Electra + // New gossip topic in Deneb, removed in Electra if params.BeaconConfig().DenebForkEpoch <= epoch && epoch < params.BeaconConfig().ElectraForkEpoch { s.subscribeWithParameters( p2p.BlobSubnetTopicFormat, @@ -173,8 +174,8 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { ) } - // Modified gossip topic in Electra - if params.BeaconConfig().ElectraForkEpoch <= epoch { + // New gossip topic in Electra, removed in Fulu + if params.BeaconConfig().ElectraForkEpoch <= epoch && epoch < params.BeaconConfig().FuluForkEpoch { s.subscribeWithParameters( p2p.BlobSubnetTopicFormat, s.validateBlob, @@ -186,6 +187,18 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { func(currentSlot primitives.Slot) []uint64 { return []uint64{} }, ) } + + // New gossip topic in Fulu + if params.BeaconConfig().FuluForkEpoch <= epoch { + s.subscribeWithParameters( + p2p.DataColumnSubnetTopicFormat, + s.validateDataColumn, + func(context.Context, proto.Message) error { return nil }, + digest, + func(primitives.Slot) []uint64 { return nil }, + func(currentSlot primitives.Slot) []uint64 { return []uint64{} }, + ) + } } // subscribe to a given topic with a given validator and subscription handler. @@ -345,7 +358,7 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p if features.Get().EnableFullSSZDataLogging { fields["message"] = hexutil.Encode(msg.Data) } - log.WithError(err).WithFields(fields).Debugf("Gossip message was rejected") + log.WithError(err).WithFields(fields).Debug("Gossip message was rejected") messageFailedValidationCounter.WithLabelValues(topic).Inc() } if b == pubsub.ValidationIgnore { diff --git a/beacon-chain/sync/validate_data_column.go b/beacon-chain/sync/validate_data_column.go new file mode 100644 index 0000000000..1cc4662821 --- /dev/null +++ b/beacon-chain/sync/validate_data_column.go @@ -0,0 +1,299 @@ +package sync + +import ( + "context" + "fmt" + "math" + "time" + + "github.com/OffchainLabs/prysm/v6/beacon-chain/verification" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" + "github.com/OffchainLabs/prysm/v6/consensus-types/blocks" + "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" + "github.com/OffchainLabs/prysm/v6/crypto/rand" + "github.com/OffchainLabs/prysm/v6/encoding/bytesutil" + eth "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" + "github.com/OffchainLabs/prysm/v6/runtime/logging" + prysmTime "github.com/OffchainLabs/prysm/v6/time" + "github.com/OffchainLabs/prysm/v6/time/slots" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/pkg/errors" + + "github.com/sirupsen/logrus" +) + +// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#the-gossip-domain-gossipsub +func (s *Service) validateDataColumn(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) { + const dataColumnSidecarSubTopic = "/data_column_sidecar_%d/" + + dataColumnSidecarVerificationRequestsCounter.Inc() + receivedTime := prysmTime.Now() + + // Always accept messages our own messages. + if pid == s.cfg.p2p.PeerID() { + return pubsub.ValidationAccept, nil + } + + // Ignore messages during initial sync. + if s.cfg.initialSync.Syncing() { + return pubsub.ValidationIgnore, nil + } + + // Reject messages with a nil topic. + if msg.Topic == nil { + return pubsub.ValidationReject, errInvalidTopic + } + + // Decode the message, reject if it fails. + m, err := s.decodePubsubMessage(msg) + if err != nil { + log.WithError(err).Error("Failed to decode message") + return pubsub.ValidationReject, err + } + + // Reject messages that are not of the expected type. + dcsc, ok := m.(*eth.DataColumnSidecar) + if !ok { + log.WithField("message", m).Error("Message is not of type *eth.DataColumnSidecar") + return pubsub.ValidationReject, errWrongMessage + } + + // Convert to a read-only data column sidecar. + roDataColumn, err := blocks.NewRODataColumn(dcsc) + if err != nil { + return pubsub.ValidationReject, errors.Wrap(err, "roDataColumn conversion failure") + } + + // Compute a batch of only one data column sidecar. + roDataColumns := []blocks.RODataColumn{roDataColumn} + + // Create the verifier. + verifier := s.newColumnsVerifier(roDataColumns, verification.GossipDataColumnSidecarRequirements) + + // Start the verification process. + // https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/p2p-interface.md#the-gossip-domain-gossipsub + + // [REJECT] The sidecar is valid as verified by `verify_data_column_sidecar(sidecar)`. + if err := verifier.ValidFields(); err != nil { + return pubsub.ValidationReject, err + } + + // [REJECT] The sidecar is for the correct subnet -- i.e. `compute_subnet_for_data_column_sidecar(sidecar.index) == subnet_id`. + if err := verifier.CorrectSubnet(dataColumnSidecarSubTopic, []string{*msg.Topic}); err != nil { + return pubsub.ValidationReject, err + } + + // [IGNORE] The sidecar is not from a future slot (with a `MAXIMUM_GOSSIP_CLOCK_DISPARITY`` allowance + // -- i.e. validate that `block_header.slot <= current_slot` (a client MAY queue future sidecars for processing at the appropriate slot). + if err := verifier.NotFromFutureSlot(); err != nil { + return pubsub.ValidationIgnore, err + } + + // [IGNORE] The sidecar is from a slot greater than the latest finalized slot + // -- i.e. validate that `block_header.slot > compute_start_slot_at_epoch(state.finalized_checkpoint.epoch)` + if err := verifier.SlotAboveFinalized(); err != nil { + return pubsub.ValidationIgnore, err + } + + // [IGNORE] The sidecar's block's parent (defined by `block_header.parent_root`) has been seen (via gossip or non-gossip sources + // (a client MAY queue sidecars for processing once the parent block is retrieved). + if err := verifier.SidecarParentSeen(s.hasBadBlock); err != nil { + // If we haven't seen the parent, request it asynchronously. + go func() { + customCtx := context.Background() + parentRoot := roDataColumn.ParentRoot() + roots := [][fieldparams.RootLength]byte{parentRoot} + randGenerator := rand.NewGenerator() + if err := s.sendBatchRootRequest(customCtx, roots, randGenerator); err != nil { + log.WithError(err).WithFields(logging.DataColumnFields(roDataColumn)).Debug("Failed to send batch root request") + } + }() + + return pubsub.ValidationIgnore, err + } + + // [REJECT] The sidecar's block's parent (defined by `block_header.parent_root`) passes validation. + if err := verifier.SidecarParentValid(s.hasBadBlock); err != nil { + return pubsub.ValidationReject, err + } + + // [REJECT] The proposer signature of `sidecar.signed_block_header`, is valid with respect to the `block_header.proposer_index` pubkey. + // We do not strictly respect the spec ordering here. This is necessary because signature verification depends on the parent root, + // which is only available if the parent block is known. + if err := verifier.ValidProposerSignature(ctx); err != nil { + return pubsub.ValidationReject, err + } + + // [REJECT] The sidecar is from a higher slot than the sidecar's block's parent (defined by `block_header.parent_root`). + if err := verifier.SidecarParentSlotLower(); err != nil { + return pubsub.ValidationReject, err + } + + // [REJECT] The current finalized_checkpoint is an ancestor of the sidecar's block + // -- i.e. `get_checkpoint_block(store, block_header.parent_root, store.finalized_checkpoint.epoch) == store.finalized_checkpoint.root`. + if err := verifier.SidecarDescendsFromFinalized(); err != nil { + return pubsub.ValidationReject, err + } + + // [REJECT] The sidecar's kzg_commitments field inclusion proof is valid as verified by `verify_data_column_sidecar_inclusion_proof(sidecar)`. + if err := verifier.SidecarInclusionProven(); err != nil { + return pubsub.ValidationReject, err + } + + // [REJECT] The sidecar's column data is valid as verified by `verify_data_column_sidecar_kzg_proofs(sidecar)`. + if err := verifier.SidecarKzgProofVerified(); err != nil { + return pubsub.ValidationReject, err + } + + // [IGNORE] The sidecar is the first sidecar for the tuple `(block_header.slot, block_header.proposer_index, sidecar.index)` + // with valid header signature, sidecar inclusion proof, and kzg proof. + if s.hasSeenDataColumnIndex(roDataColumn.Slot(), roDataColumn.ProposerIndex(), roDataColumn.DataColumnSidecar.Index) { + return pubsub.ValidationIgnore, nil + } + + // [REJECT] The sidecar is proposed by the expected `proposer_index` for the block's slot in the context of the current shuffling (defined by block_header.parent_root/block_header.slot). + // If the `proposer_index` cannot immediately be verified against the expected shuffling, the sidecar MAY be queued for later processing while proposers for the block's branch are calculated + // -- in such a case do not REJECT, instead IGNORE this message. + if err := verifier.SidecarProposerExpected(ctx); err != nil { + return pubsub.ValidationReject, err + } + + verifiedRODataColumns, err := verifier.VerifiedRODataColumns() + if err != nil { + // This should never happen. + log.WithError(err).WithFields(logging.DataColumnFields(roDataColumn)).Error("Failed to get verified data columns") + return pubsub.ValidationIgnore, err + } + + verifiedRODataColumnsCount := len(verifiedRODataColumns) + + if verifiedRODataColumnsCount != 1 { + // This should never happen. + log.WithField("verifiedRODataColumnsCount", verifiedRODataColumnsCount).Error("Verified data columns count is not 1") + return pubsub.ValidationIgnore, errors.New("Wrong number of verified data columns") + } + + msg.ValidatorData = verifiedRODataColumns[0] + dataColumnSidecarVerificationSuccessesCounter.Inc() + + // Get the time at slot start. + startTime, err := slots.ToTime(uint64(s.cfg.chain.GenesisTime().Unix()), roDataColumn.SignedBlockHeader.Header.Slot) + if err != nil { + return pubsub.ValidationIgnore, err + } + + sinceSlotStartTime := receivedTime.Sub(startTime) + validationTime := s.cfg.clock.Now().Sub(receivedTime) + dataColumnSidecarVerificationGossipHistogram.Observe(float64(validationTime.Milliseconds())) + + peerGossipScore := s.cfg.p2p.Peers().Scorers().GossipScorer().Score(pid) + + select { + case s.dataColumnLogCh <- dataColumnLogEntry{ + Slot: roDataColumn.Slot(), + ColIdx: roDataColumn.Index, + PropIdx: roDataColumn.ProposerIndex(), + BlockRoot: roDataColumn.BlockRoot(), + ParentRoot: roDataColumn.ParentRoot(), + PeerSuffix: pid.String()[len(pid.String())-6:], + PeerGossipScore: peerGossipScore, + validationTime: validationTime, + sinceStartTime: sinceSlotStartTime, + }: + default: + log.WithField("slot", roDataColumn.Slot()).Warn("Failed to send data column log entry") + } + + return pubsub.ValidationAccept, nil +} + +// Returns true if the column with the same slot, proposer index, and column index has been seen before. +func (s *Service) hasSeenDataColumnIndex(slot primitives.Slot, proposerIndex primitives.ValidatorIndex, index uint64) bool { + key := computeCacheKey(slot, proposerIndex, index) + _, seen := s.seenDataColumnCache.Get(key) + return seen +} + +// Sets the data column with the same slot, proposer index, and data column index as seen. +func (s *Service) setSeenDataColumnIndex(slot primitives.Slot, proposerIndex primitives.ValidatorIndex, index uint64) { + key := computeCacheKey(slot, proposerIndex, index) + s.seenDataColumnCache.Add(key, true) +} + +func computeCacheKey(slot primitives.Slot, proposerIndex primitives.ValidatorIndex, index uint64) string { + key := make([]byte, 0, 96) + + key = append(key, bytesutil.Bytes32(uint64(slot))...) + key = append(key, bytesutil.Bytes32(uint64(proposerIndex))...) + key = append(key, bytesutil.Bytes32(index)...) + + return string(key) +} + +type dataColumnLogEntry struct { + Slot primitives.Slot + ColIdx uint64 + PropIdx primitives.ValidatorIndex + BlockRoot [32]byte + ParentRoot [32]byte + PeerSuffix string + PeerGossipScore float64 + validationTime time.Duration + sinceStartTime time.Duration +} + +func (s *Service) processDataColumnLogs() { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + slotStats := make(map[primitives.Slot][fieldparams.NumberOfColumns]dataColumnLogEntry) + + for { + select { + case entry := <-s.dataColumnLogCh: + cols := slotStats[entry.Slot] + cols[entry.ColIdx] = entry + slotStats[entry.Slot] = cols + case <-ticker.C: + for slot, columns := range slotStats { + var ( + colIndices = make([]uint64, 0, fieldparams.NumberOfColumns) + peers = make([]string, 0, fieldparams.NumberOfColumns) + gossipScores = make([]float64, 0, fieldparams.NumberOfColumns) + validationTimes = make([]string, 0, fieldparams.NumberOfColumns) + sinceStartTimes = make([]string, 0, fieldparams.NumberOfColumns) + ) + + totalReceived := 0 + for _, entry := range columns { + if entry.PeerSuffix == "" { + continue + } + colIndices = append(colIndices, entry.ColIdx) + peers = append(peers, entry.PeerSuffix) + gossipScores = append(gossipScores, roundFloat(entry.PeerGossipScore, 2)) + validationTimes = append(validationTimes, fmt.Sprintf("%.2fms", float64(entry.validationTime.Milliseconds()))) + sinceStartTimes = append(sinceStartTimes, fmt.Sprintf("%.2fms", float64(entry.sinceStartTime.Milliseconds()))) + totalReceived++ + } + + log.WithFields(logrus.Fields{ + "slot": slot, + "receivedCount": totalReceived, + "columnIndices": colIndices, + "peers": peers, + "gossipScores": gossipScores, + "validationTimes": validationTimes, + "sinceStartTimes": sinceStartTimes, + }).Debug("Accepted data column sidecars summary") + } + slotStats = make(map[primitives.Slot][fieldparams.NumberOfColumns]dataColumnLogEntry) + } + } +} + +func roundFloat(f float64, decimals int) float64 { + mult := math.Pow(10, float64(decimals)) + return math.Round(f*mult) / mult +} diff --git a/beacon-chain/sync/validate_data_column_test.go b/beacon-chain/sync/validate_data_column_test.go new file mode 100644 index 0000000000..8cbe0df4a3 --- /dev/null +++ b/beacon-chain/sync/validate_data_column_test.go @@ -0,0 +1,224 @@ +package sync + +import ( + "bytes" + "context" + "errors" + "reflect" + "testing" + "time" + + mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing" + "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p" + p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing" + "github.com/OffchainLabs/prysm/v6/beacon-chain/startup" + mockSync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync/initial-sync/testing" + "github.com/OffchainLabs/prysm/v6/beacon-chain/verification" + lruwrpr "github.com/OffchainLabs/prysm/v6/cache/lru" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" + "github.com/OffchainLabs/prysm/v6/config/params" + "github.com/OffchainLabs/prysm/v6/consensus-types/blocks" + ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" + "github.com/OffchainLabs/prysm/v6/testing/require" + "github.com/OffchainLabs/prysm/v6/testing/util" + pubsub "github.com/libp2p/go-libp2p-pubsub" + pb "github.com/libp2p/go-libp2p-pubsub/pb" + ssz "github.com/prysmaticlabs/fastssz" +) + +func TestValidateDataColumn(t *testing.T) { + ctx := context.Background() + + t.Run("from self", func(t *testing.T) { + p := p2ptest.NewTestP2P(t) + s := &Service{cfg: &config{p2p: p}} + + result, err := s.validateDataColumn(ctx, s.cfg.p2p.PeerID(), nil) + require.NoError(t, err) + require.Equal(t, result, pubsub.ValidationAccept) + }) + + t.Run("syncing", func(t *testing.T) { + p := p2ptest.NewTestP2P(t) + s := &Service{cfg: &config{p2p: p, initialSync: &mockSync.Sync{IsSyncing: true}}} + + result, err := s.validateDataColumn(ctx, "", nil) + require.NoError(t, err) + require.Equal(t, result, pubsub.ValidationIgnore) + }) + + t.Run("invalid topic", func(t *testing.T) { + p := p2ptest.NewTestP2P(t) + s := &Service{cfg: &config{p2p: p, initialSync: &mockSync.Sync{}}} + + result, err := s.validateDataColumn(ctx, "", &pubsub.Message{Message: &pb.Message{}}) + require.ErrorIs(t, errInvalidTopic, err) + require.Equal(t, result, pubsub.ValidationReject) + }) + + serviceAndMessage := func(t *testing.T, newDataColumnsVerifier verification.NewDataColumnsVerifier, msg ssz.Marshaler) (*Service, *pubsub.Message) { + const genesisNSec = 0 + + p := p2ptest.NewTestP2P(t) + genesisSec := time.Now().Unix() - int64(params.BeaconConfig().SecondsPerSlot) + chainService := &mock.ChainService{Genesis: time.Unix(genesisSec, genesisNSec)} + + clock := startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot) + service := &Service{ + cfg: &config{p2p: p, initialSync: &mockSync.Sync{}, clock: clock, chain: chainService}, + newColumnsVerifier: newDataColumnsVerifier, + seenDataColumnCache: lruwrpr.New(seenDataColumnSize), + } + + // Encode a `beaconBlock` message instead of expected. + buf := new(bytes.Buffer) + _, err := p.Encoding().EncodeGossip(buf, msg) + require.NoError(t, err) + + topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] + digest, err := service.currentForkDigest() + require.NoError(t, err) + + topic = service.addDigestToTopic(topic, digest) + + message := &pubsub.Message{Message: &pb.Message{Data: buf.Bytes(), Topic: &topic}} + + return service, message + } + + t.Run("invalid message type", func(t *testing.T) { + // Encode a `beaconBlock` message instead of expected. + service, message := serviceAndMessage(t, nil, util.NewBeaconBlock()) + result, err := service.validateDataColumn(ctx, "", message) + require.ErrorIs(t, errWrongMessage, err) + require.Equal(t, pubsub.ValidationReject, result) + }) + + genericError := errors.New("generic error") + + dataColumnSidecarMsg := ðpb.DataColumnSidecar{ + SignedBlockHeader: ðpb.SignedBeaconBlockHeader{ + Header: ðpb.BeaconBlockHeader{ + ParentRoot: make([]byte, fieldparams.RootLength), + StateRoot: make([]byte, fieldparams.RootLength), + BodyRoot: make([]byte, fieldparams.RootLength), + }, + Signature: make([]byte, fieldparams.BLSSignatureLength), + }, + KzgCommitmentsInclusionProof: [][]byte{ + make([]byte, 32), + make([]byte, 32), + make([]byte, 32), + make([]byte, 32), + }, + } + + testCases := []struct { + name string + verifier verification.NewDataColumnsVerifier + expectedResult pubsub.ValidationResult + expectedError error + }{ + { + name: "valid fields", + verifier: testNewDataColumnSidecarsVerifier(verification.MockDataColumnsVerifier{ErrValidFields: genericError}), + expectedResult: pubsub.ValidationReject, + expectedError: genericError, + }, + { + name: "correct subnet", + verifier: testNewDataColumnSidecarsVerifier(verification.MockDataColumnsVerifier{ErrCorrectSubnet: genericError}), + expectedResult: pubsub.ValidationReject, + expectedError: genericError, + }, + { + name: "not for future slot", + verifier: testNewDataColumnSidecarsVerifier(verification.MockDataColumnsVerifier{ErrNotFromFutureSlot: genericError}), + expectedResult: pubsub.ValidationIgnore, + expectedError: genericError, + }, + { + name: "slot above finalized", + verifier: testNewDataColumnSidecarsVerifier(verification.MockDataColumnsVerifier{ErrSlotAboveFinalized: genericError}), + expectedResult: pubsub.ValidationIgnore, + expectedError: genericError, + }, + { + name: "sidecar parent seen", + verifier: testNewDataColumnSidecarsVerifier(verification.MockDataColumnsVerifier{ErrSidecarParentSeen: genericError}), + expectedResult: pubsub.ValidationIgnore, + expectedError: genericError, + }, + { + name: "sidecar parent valid", + verifier: testNewDataColumnSidecarsVerifier(verification.MockDataColumnsVerifier{ErrSidecarParentValid: genericError}), + expectedResult: pubsub.ValidationReject, + expectedError: genericError, + }, + { + name: "valid proposer signature", + verifier: testNewDataColumnSidecarsVerifier(verification.MockDataColumnsVerifier{ErrValidProposerSignature: genericError}), + expectedResult: pubsub.ValidationReject, + expectedError: genericError, + }, + { + name: "sidecar parent slot lower", + verifier: testNewDataColumnSidecarsVerifier(verification.MockDataColumnsVerifier{ErrSidecarParentSlotLower: genericError}), + expectedResult: pubsub.ValidationReject, + expectedError: genericError, + }, + { + name: "sidecar descends from finalized", + verifier: testNewDataColumnSidecarsVerifier(verification.MockDataColumnsVerifier{ErrSidecarDescendsFromFinalized: genericError}), + expectedResult: pubsub.ValidationReject, + expectedError: genericError, + }, + { + name: "sidecar inclusion proven", + verifier: testNewDataColumnSidecarsVerifier(verification.MockDataColumnsVerifier{ErrSidecarInclusionProven: genericError}), + expectedResult: pubsub.ValidationReject, + expectedError: genericError, + }, + { + name: "sidecar kzg proof verified", + verifier: testNewDataColumnSidecarsVerifier(verification.MockDataColumnsVerifier{ErrSidecarKzgProofVerified: genericError}), + expectedResult: pubsub.ValidationReject, + expectedError: genericError, + }, + { + name: "sidecar proposer expected", + verifier: testNewDataColumnSidecarsVerifier(verification.MockDataColumnsVerifier{ErrSidecarProposerExpected: genericError}), + expectedResult: pubsub.ValidationReject, + expectedError: genericError, + }, + { + name: "nominal", + verifier: testNewDataColumnSidecarsVerifier(verification.MockDataColumnsVerifier{}), + expectedResult: pubsub.ValidationAccept, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + service, message := serviceAndMessage(t, tc.verifier, dataColumnSidecarMsg) + result, err := service.validateDataColumn(ctx, "aDummyPID", message) + require.ErrorIs(t, tc.expectedError, err) + require.Equal(t, tc.expectedResult, result) + }) + } + + t.Run("seen data column", func(t *testing.T) { + service, message := serviceAndMessage(t, testNewDataColumnSidecarsVerifier(verification.MockDataColumnsVerifier{}), dataColumnSidecarMsg) + service.setSeenDataColumnIndex(0, 0, 0) + result, err := service.validateDataColumn(ctx, "aDummyPID", message) + require.NoError(t, err) + require.Equal(t, pubsub.ValidationIgnore, result) + }) + +} + +func testNewDataColumnSidecarsVerifier(verifier verification.MockDataColumnsVerifier) verification.NewDataColumnsVerifier { + return func([]blocks.RODataColumn, []verification.Requirement) verification.DataColumnsVerifier { + return &verifier + } +} diff --git a/beacon-chain/verification/mock.go b/beacon-chain/verification/mock.go index de695fa0fb..78f56fe545 100644 --- a/beacon-chain/verification/mock.go +++ b/beacon-chain/verification/mock.go @@ -3,9 +3,12 @@ package verification import ( "context" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/consensus-types/blocks" ) +// Blob sidecars +// ------------- type MockBlobVerifier struct { ErrBlobIndexInBounds error ErrSlotTooEarly error @@ -21,6 +24,8 @@ type MockBlobVerifier struct { CbVerifiedROBlob func() (blocks.VerifiedROBlob, error) } +var _ BlobVerifier = &MockBlobVerifier{} + func (m *MockBlobVerifier) VerifiedROBlob() (blocks.VerifiedROBlob, error) { return m.CbVerifiedROBlob() } @@ -71,4 +76,76 @@ func (m *MockBlobVerifier) SidecarProposerExpected(_ context.Context) (err error func (*MockBlobVerifier) SatisfyRequirement(_ Requirement) {} -var _ BlobVerifier = &MockBlobVerifier{} +// Data column sidecars +// -------------------- + +type MockDataColumnsVerifier struct { + ErrValidFields error + ErrCorrectSubnet error + ErrNotFromFutureSlot error + ErrSlotAboveFinalized error + ErrSidecarParentSeen error + ErrSidecarParentValid error + ErrValidProposerSignature error + ErrSidecarParentSlotLower error + ErrSidecarDescendsFromFinalized error + ErrSidecarInclusionProven error + ErrSidecarKzgProofVerified error + ErrSidecarProposerExpected error +} + +var _ DataColumnsVerifier = &MockDataColumnsVerifier{} + +func (m *MockDataColumnsVerifier) VerifiedRODataColumns() ([]blocks.VerifiedRODataColumn, error) { + return []blocks.VerifiedRODataColumn{{}}, nil +} + +func (m *MockDataColumnsVerifier) SatisfyRequirement(_ Requirement) {} + +func (m *MockDataColumnsVerifier) ValidFields() error { + return m.ErrValidFields +} + +func (m *MockDataColumnsVerifier) CorrectSubnet(_ string, _ []string) error { + return m.ErrCorrectSubnet +} + +func (m *MockDataColumnsVerifier) NotFromFutureSlot() error { + return m.ErrNotFromFutureSlot +} + +func (m *MockDataColumnsVerifier) SlotAboveFinalized() error { + return m.ErrSlotAboveFinalized +} + +func (m *MockDataColumnsVerifier) ValidProposerSignature(_ context.Context) error { + return m.ErrValidProposerSignature +} + +func (m *MockDataColumnsVerifier) SidecarParentSeen(_ func([fieldparams.RootLength]byte) bool) error { + return m.ErrSidecarParentSeen +} + +func (m *MockDataColumnsVerifier) SidecarParentValid(_ func([fieldparams.RootLength]byte) bool) error { + return m.ErrSidecarParentValid +} + +func (m *MockDataColumnsVerifier) SidecarParentSlotLower() error { + return m.ErrSidecarParentSlotLower +} + +func (m *MockDataColumnsVerifier) SidecarDescendsFromFinalized() error { + return m.ErrSidecarDescendsFromFinalized +} + +func (m *MockDataColumnsVerifier) SidecarInclusionProven() error { + return m.ErrSidecarInclusionProven +} + +func (m *MockDataColumnsVerifier) SidecarKzgProofVerified() error { + return m.ErrSidecarKzgProofVerified +} + +func (m *MockDataColumnsVerifier) SidecarProposerExpected(_ context.Context) error { + return m.ErrSidecarProposerExpected +} diff --git a/changelog/manu-peerdas-validation.md b/changelog/manu-peerdas-validation.md new file mode 100644 index 0000000000..e217365fdb --- /dev/null +++ b/changelog/manu-peerdas-validation.md @@ -0,0 +1,2 @@ +### Added +- PeerDAS: Validation pipeline for data column sidecars received via gossip. diff --git a/consensus-types/blocks/rodatacolumn.go b/consensus-types/blocks/rodatacolumn.go index 4a018bbd22..0ac8cb1f7b 100644 --- a/consensus-types/blocks/rodatacolumn.go +++ b/consensus-types/blocks/rodatacolumn.go @@ -37,10 +37,12 @@ func NewRODataColumn(dc *ethpb.DataColumnSidecar) (RODataColumn, error) { if err := roDataColumnNilCheck(dc); err != nil { return RODataColumn{}, err } + root, err := dc.SignedBlockHeader.Header.HashTreeRoot() if err != nil { return RODataColumn{}, err } + return RODataColumn{DataColumnSidecar: dc, root: root}, nil }