From e3ac8b7745cfc6da75fed8edf1eba47e3fdcc808 Mon Sep 17 00:00:00 2001 From: terencechain Date: Thu, 18 May 2023 09:13:18 -0700 Subject: [PATCH] Add Blob Gossip (#12413) --- beacon-chain/blockchain/receive_block.go | 6 + beacon-chain/blockchain/testing/mock.go | 6 + beacon-chain/p2p/gossip_topic_mappings.go | 1 + beacon-chain/p2p/pubsub_filter.go | 7 +- beacon-chain/p2p/pubsub_filter_test.go | 2 +- beacon-chain/p2p/topics.go | 5 +- beacon-chain/sync/BUILD.bazel | 3 + beacon-chain/sync/decode_pubsub.go | 2 + beacon-chain/sync/pending_blocks_queue.go | 33 ++ beacon-chain/sync/service.go | 7 + beacon-chain/sync/subscriber.go | 18 +- beacon-chain/sync/subscriber_blob_sidecar.go | 24 ++ beacon-chain/sync/subscriber_test.go | 2 +- beacon-chain/sync/validate_blob.go | 190 +++++++++ beacon-chain/sync/validate_blob_test.go | 415 +++++++++++++++++++ config/fieldparams/mainnet.go | 1 + config/fieldparams/minimal.go | 1 + config/params/mainnet_config.go | 2 + config/params/network_config.go | 1 + testing/util/block.go | 13 + 20 files changed, 732 insertions(+), 7 deletions(-) create mode 100644 beacon-chain/sync/subscriber_blob_sidecar.go create mode 100644 beacon-chain/sync/validate_blob.go create mode 100644 beacon-chain/sync/validate_blob_test.go diff --git a/beacon-chain/blockchain/receive_block.go b/beacon-chain/blockchain/receive_block.go index 16470eabd3..40fbfd26e1 100644 --- a/beacon-chain/blockchain/receive_block.go +++ b/beacon-chain/blockchain/receive_block.go @@ -34,6 +34,7 @@ type BlockReceiver interface { ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error ReceiveBlockBatch(ctx context.Context, blocks []interfaces.ReadOnlySignedBeaconBlock, blkRoots [][32]byte) error HasBlock(ctx context.Context, root [32]byte) bool + RecentBlockSlot(root [32]byte) (primitives.Slot, error) } // SlashingReceiver interface defines the methods of chain service for receiving validated slashing over the wire. @@ -237,6 +238,11 @@ func (s *Service) HasBlock(ctx context.Context, root [32]byte) bool { return s.hasBlockInInitSyncOrDB(ctx, root) } +// RecentBlockSlot returns block slot form fork choice store +func (s *Service) RecentBlockSlot(root [32]byte) (primitives.Slot, error) { + return s.cfg.ForkChoiceStore.Slot(root) +} + // ReceiveAttesterSlashing receives an attester slashing and inserts it to forkchoice func (s *Service) ReceiveAttesterSlashing(ctx context.Context, slashing *ethpb.AttesterSlashing) { s.cfg.ForkChoiceStore.Lock() diff --git a/beacon-chain/blockchain/testing/mock.go b/beacon-chain/blockchain/testing/mock.go index bf17d450ac..8c6fbd6f67 100644 --- a/beacon-chain/blockchain/testing/mock.go +++ b/beacon-chain/blockchain/testing/mock.go @@ -69,6 +69,7 @@ type ChainService struct { OptimisticCheckRootReceived [32]byte FinalizedRoots map[[32]byte]bool OptimisticRoots map[[32]byte]bool + BlockSlot primitives.Slot } func (s *ChainService) Ancestor(ctx context.Context, root []byte, slot primitives.Slot) ([]byte, error) { @@ -389,6 +390,11 @@ func (s *ChainService) HasBlock(ctx context.Context, rt [32]byte) bool { return s.InitSyncBlockRoots[rt] } +// RecentBlockSlot mocks the same method in the chain service. +func (s *ChainService) RecentBlockSlot([32]byte) (primitives.Slot, error) { + return s.BlockSlot, nil +} + // HeadGenesisValidatorsRoot mocks HeadGenesisValidatorsRoot method in chain service. func (_ *ChainService) HeadGenesisValidatorsRoot() [32]byte { return [32]byte{} diff --git a/beacon-chain/p2p/gossip_topic_mappings.go b/beacon-chain/p2p/gossip_topic_mappings.go index 354530efba..7bffaa454d 100644 --- a/beacon-chain/p2p/gossip_topic_mappings.go +++ b/beacon-chain/p2p/gossip_topic_mappings.go @@ -21,6 +21,7 @@ var gossipTopicMappings = map[string]proto.Message{ SyncContributionAndProofSubnetTopicFormat: ðpb.SignedContributionAndProof{}, SyncCommitteeSubnetTopicFormat: ðpb.SyncCommitteeMessage{}, BlsToExecutionChangeSubnetTopicFormat: ðpb.SignedBLSToExecutionChange{}, + BlobSubnetTopicFormat: ðpb.SignedBlobSidecar{}, } // GossipTopicMappings is a function to return the assigned data type diff --git a/beacon-chain/p2p/pubsub_filter.go b/beacon-chain/p2p/pubsub_filter.go index b9f42c1e3b..866c2ba8f1 100644 --- a/beacon-chain/p2p/pubsub_filter.go +++ b/beacon-chain/p2p/pubsub_filter.go @@ -57,12 +57,17 @@ func (s *Service) CanSubscribe(topic string) bool { log.WithError(err).Error("Could not determine Capella fork digest") return false } - + denebForkDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().DenebForkEpoch, s.genesisValidatorsRoot) + if err != nil { + log.WithError(err).Error("Could not determine Deneb fork digest") + return false + } switch parts[2] { case fmt.Sprintf("%x", phase0ForkDigest): case fmt.Sprintf("%x", altairForkDigest): case fmt.Sprintf("%x", bellatrixForkDigest): case fmt.Sprintf("%x", capellaForkDigest): + case fmt.Sprintf("%x", denebForkDigest): default: return false } diff --git a/beacon-chain/p2p/pubsub_filter_test.go b/beacon-chain/p2p/pubsub_filter_test.go index 18d542b02f..d3a59f9366 100644 --- a/beacon-chain/p2p/pubsub_filter_test.go +++ b/beacon-chain/p2p/pubsub_filter_test.go @@ -90,7 +90,7 @@ func TestService_CanSubscribe(t *testing.T) { formatting := []interface{}{digest} // Special case for attestation subnets which have a second formatting placeholder. - if topic == AttestationSubnetTopicFormat || topic == SyncCommitteeSubnetTopicFormat { + if topic == AttestationSubnetTopicFormat || topic == SyncCommitteeSubnetTopicFormat || topic == BlobSubnetTopicFormat { formatting = append(formatting, 0 /* some subnet ID */) } diff --git a/beacon-chain/p2p/topics.go b/beacon-chain/p2p/topics.go index ee9af95f96..3187e36a5c 100644 --- a/beacon-chain/p2p/topics.go +++ b/beacon-chain/p2p/topics.go @@ -28,7 +28,8 @@ const ( GossipContributionAndProofMessage = "sync_committee_contribution_and_proof" // GossipBlsToExecutionChangeMessage is the name for the bls to execution change message type. GossipBlsToExecutionChangeMessage = "bls_to_execution_change" - + // GossipBlobSidecarMessage is the name for the blob sidecar message type. + GossipBlobSidecarMessage = "blob_sidecar" // Topic Formats // // AttestationSubnetTopicFormat is the topic format for the attestation subnet. @@ -49,4 +50,6 @@ const ( SyncContributionAndProofSubnetTopicFormat = GossipProtocolAndDigest + GossipContributionAndProofMessage // BlsToExecutionChangeSubnetTopicFormat is the topic format for the bls to execution change subnet. BlsToExecutionChangeSubnetTopicFormat = GossipProtocolAndDigest + GossipBlsToExecutionChangeMessage + // BlobSubnetTopicFormat is the topic format for the blob subnet. + BlobSubnetTopicFormat = GossipProtocolAndDigest + GossipBlobSidecarMessage + "_%d" ) diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 17c1c503d3..7c5ac46b28 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -33,6 +33,7 @@ go_library( "subscriber_beacon_aggregate_proof.go", "subscriber_beacon_attestation.go", "subscriber_beacon_blocks.go", + "subscriber_blob_sidecar.go", "subscriber_bls_to_execution_change.go", "subscriber_handlers.go", "subscriber_sync_committee_message.go", @@ -42,6 +43,7 @@ go_library( "validate_attester_slashing.go", "validate_beacon_attestation.go", "validate_beacon_blocks.go", + "validate_blob.go", "validate_bls_to_execution_change.go", "validate_proposer_slashing.go", "validate_sync_committee_message.go", @@ -164,6 +166,7 @@ go_test( "validate_attester_slashing_test.go", "validate_beacon_attestation_test.go", "validate_beacon_blocks_test.go", + "validate_blob_test.go", "validate_bls_to_execution_change_test.go", "validate_proposer_slashing_test.go", "validate_sync_committee_message_test.go", diff --git a/beacon-chain/sync/decode_pubsub.go b/beacon-chain/sync/decode_pubsub.go index 1f2bb32bcc..a6d18bb245 100644 --- a/beacon-chain/sync/decode_pubsub.go +++ b/beacon-chain/sync/decode_pubsub.go @@ -37,6 +37,8 @@ func (s *Service) decodePubsubMessage(msg *pubsub.Message) (ssz.Unmarshaler, err // differentiate them below. case strings.Contains(topic, p2p.GossipSyncCommitteeMessage) && !strings.Contains(topic, p2p.SyncContributionAndProofSubnetTopicFormat): topic = p2p.GossipTypeMapping[reflect.TypeOf(ðpb.SyncCommitteeMessage{})] + case strings.Contains(topic, p2p.GossipBlobSidecarMessage): + topic = p2p.GossipTypeMapping[reflect.TypeOf(ðpb.SignedBlobSidecar{})] } base := p2p.GossipTopicMappings(topic, 0) diff --git a/beacon-chain/sync/pending_blocks_queue.go b/beacon-chain/sync/pending_blocks_queue.go index ba99277357..ea1796c32d 100644 --- a/beacon-chain/sync/pending_blocks_queue.go +++ b/beacon-chain/sync/pending_blocks_queue.go @@ -19,6 +19,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v4/encoding/ssz/equality" "github.com/prysmaticlabs/prysm/v4/monitoring/tracing" + eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v4/time/slots" "github.com/sirupsen/logrus" "github.com/trailofbits/go-mutexasserts" @@ -427,6 +428,21 @@ func (s *Service) pendingBlocksInCache(slot primitives.Slot) []interfaces.ReadOn return blks } +// This returns signed blob sidecar given input key from slotToPendingBlobs. +func (s *Service) pendingBlobsInCache(slot primitives.Slot) []*eth.SignedBlobSidecar { + k := slotToCacheKey(slot) + value, ok := s.slotToPendingBlobs.Get(k) + if !ok { + return []*eth.SignedBlobSidecar{} + } + bs, ok := value.([]*eth.SignedBlobSidecar) + if !ok { + log.Debug("pendingBlobsInCache: value is not of type []*eth.SignedBlobSidecar") + return []*eth.SignedBlobSidecar{} + } + return bs +} + // This adds input signed beacon block to slotToPendingBlocks cache. func (s *Service) addPendingBlockToCache(b interfaces.ReadOnlySignedBeaconBlock) error { if err := blocks.BeaconBlockIsNil(b); err != nil { @@ -445,6 +461,23 @@ func (s *Service) addPendingBlockToCache(b interfaces.ReadOnlySignedBeaconBlock) return nil } +// This adds blob to slotToPendingBlobs cache. +func (s *Service) addPendingBlobToCache(b *eth.SignedBlobSidecar) error { + blobs := s.pendingBlobsInCache(b.Message.Slot) + + // If we already have seen the index. Ignore it. + for _, blob := range blobs { + if blob.Message.Index == b.Message.Index { + return nil + } + } + + blobs = append(blobs, b) + k := slotToCacheKey(b.Message.Slot) + s.slotToPendingBlobs.Set(k, blobs, pendingBlockExpTime) + return nil +} + // This converts input string to slot. func cacheKeyToSlot(s string) primitives.Slot { b := []byte(s) diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 5ed48b5cdb..f212e2f85d 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -44,6 +44,7 @@ var _ runtime.Service = (*Service)(nil) const rangeLimit uint64 = 1024 const seenBlockSize = 1000 +const seenBlobSize = seenBlockSize * 4 // Each block can have max 4 blobs. Worst case 164kB for cache. const seenUnaggregatedAttSize = 20000 const seenAggregatedAttSize = 1024 const seenSyncMsgSize = 1000 // Maximum of 512 sync committee members, 1000 is a safe amount. @@ -110,6 +111,7 @@ type Service struct { ctx context.Context cancel context.CancelFunc slotToPendingBlocks *gcache.Cache + slotToPendingBlobs *gcache.Cache seenPendingBlocks map[[32]byte]bool blkRootToPendingAtts map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof subHandler *subTopicHandler @@ -120,6 +122,8 @@ type Service struct { rateLimiter *limiter seenBlockLock sync.RWMutex seenBlockCache *lru.Cache + seenBlobLock sync.RWMutex + seenBlobCache *lru.Cache seenAggregatedAttestationLock sync.RWMutex seenAggregatedAttestationCache *lru.Cache seenUnAggregatedAttestationLock sync.RWMutex @@ -146,6 +150,7 @@ type Service struct { // NewService initializes new regular sync service. func NewService(ctx context.Context, opts ...Option) *Service { c := gcache.New(pendingBlockExpTime /* exp time */, 2*pendingBlockExpTime /* prune time */) + pendingBlobCache := gcache.New(0 /* exp time */, 0 /* prune time */) ctx, cancel := context.WithCancel(ctx) r := &Service{ ctx: ctx, @@ -153,6 +158,7 @@ func NewService(ctx context.Context, opts ...Option) *Service { chainStarted: abool.New(), cfg: &config{clock: startup.NewClock(time.Unix(0, 0), [32]byte{})}, slotToPendingBlocks: c, + slotToPendingBlobs: pendingBlobCache, seenPendingBlocks: make(map[[32]byte]bool), blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof), signatureChan: make(chan *signatureVerifier, verifierLimit), @@ -223,6 +229,7 @@ func (s *Service) Status() error { // and prevent DoS. func (s *Service) initCaches() { s.seenBlockCache = lruwrpr.New(seenBlockSize) + s.seenBlobCache = lruwrpr.New(seenBlobSize) s.seenAggregatedAttestationCache = lruwrpr.New(seenAggregatedAttSize) s.seenUnAggregatedAttestationCache = lruwrpr.New(seenUnaggregatedAttSize) s.seenSyncMessageCache = lruwrpr.New(seenSyncMsgSize) diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index d3ba37f7e0..91e22dc6d0 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -90,6 +90,7 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { s.validateCommitteeIndexBeaconAttestation, /* validator */ s.committeeIndexBeaconAttestationSubscriber, /* message handler */ digest, + params.BeaconNetworkConfig().AttestationSubnetCount, ) } else { s.subscribeDynamicWithSubnets( @@ -133,6 +134,17 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { digest, ) } + + // New Gossip Topic in Deneb + if epoch >= params.BeaconConfig().DenebForkEpoch { + s.subscribeStaticWithSubnets( + p2p.BlobSubnetTopicFormat, + s.validateBlob, /* validator */ + s.blobSubscriber, /* message handler */ + digest, + params.BeaconNetworkConfig().BlobsidecarSubnetCount, + ) + } } // subscribe to a given topic with a given validator and subscription handler. @@ -303,7 +315,7 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p // subscribe to a static subnet with the given topic and index.A given validator and subscription handler is // used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding. -func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte) { +func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte, subnetCount uint64) { genRoot := s.cfg.clock.GenesisValidatorsRoot() _, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:]) if err != nil { @@ -339,7 +351,7 @@ func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal, if !valid { log.Warnf("Attestation subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest) // Unsubscribes from all our current subnets. - for i := uint64(0); i < params.BeaconNetworkConfig().AttestationSubnetCount; i++ { + for i := uint64(0); i < subnetCount; i++ { fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.p2p.Encoding().ProtocolSuffix() s.unSubscribeFromTopic(fullTopic) } @@ -347,7 +359,7 @@ func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal, return } // Check every slot that there are enough peers - for i := uint64(0); i < params.BeaconNetworkConfig().AttestationSubnetCount; i++ { + for i := uint64(0); i < subnetCount; i++ { if !s.validPeersExist(s.addDigestAndIndexToTopic(topic, digest, i)) { log.Debugf("No peers found subscribed to attestation gossip subnet with "+ "committee index %d. Searching network for peers subscribed to the subnet.", i) diff --git a/beacon-chain/sync/subscriber_blob_sidecar.go b/beacon-chain/sync/subscriber_blob_sidecar.go new file mode 100644 index 0000000000..1b86b84125 --- /dev/null +++ b/beacon-chain/sync/subscriber_blob_sidecar.go @@ -0,0 +1,24 @@ +package sync + +import ( + "context" + "fmt" + + eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" + "google.golang.org/protobuf/proto" +) + +func (s *Service) blobSubscriber(ctx context.Context, msg proto.Message) error { + b, ok := msg.(*eth.SignedBlobSidecar) + if !ok { + return fmt.Errorf("message was not type *eth.SignedBlobSidecar, type=%T", msg) + } + + log.WithFields(blobFields(b.Message)).Debug("Received blob sidecar") + + s.setSeenBlobIndex(b.Message.Blob, b.Message.Index) + + // TODO: Store blobs in cache. Will be addressed in subsequent PR. + + return nil +} diff --git a/beacon-chain/sync/subscriber_test.go b/beacon-chain/sync/subscriber_test.go index 878e69a748..59a65637a9 100644 --- a/beacon-chain/sync/subscriber_test.go +++ b/beacon-chain/sync/subscriber_test.go @@ -335,7 +335,7 @@ func TestStaticSubnets(t *testing.T) { r.subscribeStaticWithSubnets(defaultTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error { // no-op return nil - }, d) + }, d, params.BeaconNetworkConfig().AttestationSubnetCount) topics := r.cfg.p2p.PubSub().GetTopics() if uint64(len(topics)) != params.BeaconNetworkConfig().AttestationSubnetCount { t.Errorf("Wanted the number of subnet topics registered to be %d but got %d", params.BeaconNetworkConfig().AttestationSubnetCount, len(topics)) diff --git a/beacon-chain/sync/validate_blob.go b/beacon-chain/sync/validate_blob.go new file mode 100644 index 0000000000..5847860fa2 --- /dev/null +++ b/beacon-chain/sync/validate_blob.go @@ -0,0 +1,190 @@ +package sync + +import ( + "context" + "fmt" + "strings" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/signing" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/state" + "github.com/prysmaticlabs/prysm/v4/config/params" + "github.com/prysmaticlabs/prysm/v4/crypto/bls" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" + "github.com/prysmaticlabs/prysm/v4/network/forks" + eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v4/time/slots" + "github.com/sirupsen/logrus" +) + +func (s *Service) validateBlob(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) { + if pid == s.cfg.p2p.PeerID() { + return pubsub.ValidationAccept, nil + } + if s.cfg.initialSync.Syncing() { + return pubsub.ValidationIgnore, nil + } + if msg.Topic == nil { + return pubsub.ValidationReject, errInvalidTopic + } + m, err := s.decodePubsubMessage(msg) + if err != nil { + log.WithError(err).Error("Failed to decode message") + return pubsub.ValidationReject, err + } + + sBlob, ok := m.(*eth.SignedBlobSidecar) + if !ok { + log.WithField("message", m).Error("Message is not of type *eth.SignedBlobSidecar") + return pubsub.ValidationReject, errWrongMessage + } + blob := sBlob.Message + + // [REJECT] The sidecar is for the correct topic -- i.e. sidecar.index matches the topic {index}. + want := fmt.Sprintf("blob_sidecar_%d", blob.Index) + if !strings.Contains(*msg.Topic, want) { + log.WithFields(blobFields(blob)).Debug("Sidecar blob does not match topic") + return pubsub.ValidationReject, fmt.Errorf("wrong topic name: %s", *msg.Topic) + } + + // [IGNORE] The sidecar is not from a future slot (with a MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) -- + // i.e. validate that sidecar.slot <= current_slot (a client MAY queue future blocks for processing at the appropriate slot). + genesisTime := uint64(s.cfg.chain.GenesisTime().Unix()) + if err := slots.VerifyTime(genesisTime, blob.Slot, earlyBlockProcessingTolerance); err != nil { + log.WithError(err).WithFields(blobFields(blob)).Debug("Ignored blob: too far into future") + return pubsub.ValidationIgnore, errors.Wrap(err, "blob too far into future") + } + + // [IGNORE] The sidecar is from a slot greater than the latest finalized slot -- + // i.e. validate that sidecar.slot > compute_start_slot_at_epoch(state.finalized_checkpoint.epoch) + startSlot, err := slots.EpochStart(s.cfg.chain.FinalizedCheckpt().Epoch) + if err != nil { + return pubsub.ValidationIgnore, err + } + if startSlot >= blob.Slot { + err := fmt.Errorf("finalized slot %d greater or equal to blob slot %d", startSlot, blob.Slot) + log.WithFields(blobFields(blob)).Debug(err) + return pubsub.ValidationIgnore, err + } + + // [IGNORE] The sidecar's block's parent (defined by sidecar.block_parent_root) has been seen (via both gossip and non-gossip sources) + parentRoot := bytesutil.ToBytes32(blob.BlockParentRoot) + if !s.cfg.chain.HasBlock(ctx, parentRoot) { + if err := s.addPendingBlobToCache(sBlob); err != nil { + log.WithError(err).WithFields(blobFields(blob)).Error("Failed to add blob to cache") + return pubsub.ValidationIgnore, err + } + log.WithFields(blobFields(blob)).Debug("Ignored blob: parent block not found") + return pubsub.ValidationIgnore, nil + } + + // [REJECT] The sidecar's block's parent (defined by sidecar.block_parent_root) passes validation. + parentSlot, err := s.cfg.chain.RecentBlockSlot(parentRoot) + if err != nil { + return pubsub.ValidationIgnore, err + } + // [REJECT] The sidecar is from a higher slot than the sidecar's block's parent (defined by sidecar.block_parent_root). + if parentSlot >= blob.Slot { + err := fmt.Errorf("parent block slot %d greater or equal to blob slot %d", parentSlot, blob.Slot) + log.WithFields(blobFields(blob)).Debug(err) + return pubsub.ValidationReject, err + } + + // [REJECT] The proposer signature, signed_blob_sidecar.signature, + // is valid with respect to the sidecar.proposer_index pubkey. + parentState, err := s.cfg.stateGen.StateByRoot(ctx, parentRoot) + if err != nil { + return pubsub.ValidationIgnore, err + } + if err := verifyBlobSignature(parentState, sBlob); err != nil { + log.WithError(err).WithFields(blobFields(blob)).Debug("Failed to verify blob signature") + return pubsub.ValidationReject, err + } + + // [IGNORE] The sidecar is the only sidecar with valid signature received for the tuple (sidecar.block_root, sidecar.index). + if s.hasSeenBlobIndex(blob.BlockRoot, blob.Index) { + return pubsub.ValidationIgnore, nil + } + + // [REJECT] The sidecar is proposed by the expected proposer_index for the block's slot in the context of the current shuffling (defined by block_parent_root/slot) + parentState, err = transition.ProcessSlotsUsingNextSlotCache(ctx, parentState, parentRoot[:], blob.Slot) + if err != nil { + return pubsub.ValidationIgnore, err + } + idx, err := helpers.BeaconProposerIndex(ctx, parentState) + if err != nil { + return pubsub.ValidationIgnore, err + } + if blob.ProposerIndex != idx { + err := fmt.Errorf("expected proposer index %d, got %d", idx, blob.ProposerIndex) + log.WithFields(blobFields(blob)).Debug(err) + return pubsub.ValidationReject, err + } + + msg.ValidatorData = sBlob + + return pubsub.ValidationAccept, nil +} + +func verifyBlobSignature(st state.BeaconState, blob *eth.SignedBlobSidecar) error { + currentEpoch := slots.ToEpoch(blob.Message.Slot) + fork, err := forks.Fork(currentEpoch) + if err != nil { + return err + } + domain, err := signing.Domain(fork, currentEpoch, params.BeaconConfig().DomainBlobSidecar, st.GenesisValidatorsRoot()) + if err != nil { + return err + } + proposer, err := st.ValidatorAtIndex(blob.Message.ProposerIndex) + if err != nil { + return err + } + pb, err := bls.PublicKeyFromBytes(proposer.PublicKey) + if err != nil { + return err + } + sig, err := bls.SignatureFromBytes(blob.Signature) + if err != nil { + return err + } + sr, err := signing.ComputeSigningRoot(blob.Message, domain) + if err != nil { + return err + } + if !sig.Verify(pb, sr[:]) { + return signing.ErrSigFailedToVerify + } + + return nil +} + +// Returns true if the blob with the same root and index has been seen before. +func (s *Service) hasSeenBlobIndex(root []byte, index uint64) bool { + s.seenBlobLock.RLock() + defer s.seenBlobLock.RUnlock() + b := append(root, bytesutil.Bytes32(index)...) + _, seen := s.seenBlobCache.Get(string(b)) + return seen +} + +// Set blob index and root as seen. +func (s *Service) setSeenBlobIndex(root []byte, index uint64) { + s.seenBlobLock.Lock() + defer s.seenBlobLock.Unlock() + b := append(root, bytesutil.Bytes32(index)...) + s.seenBlobCache.Add(string(b), true) +} + +func blobFields(b *eth.BlobSidecar) logrus.Fields { + return logrus.Fields{ + "slot": b.Slot, + "proposerIndex": b.ProposerIndex, + "blockRoot": fmt.Sprintf("%#x", b.BlockRoot), + "index": b.Index, + } +} diff --git a/beacon-chain/sync/validate_blob_test.go b/beacon-chain/sync/validate_blob_test.go new file mode 100644 index 0000000000..3afd2d6a05 --- /dev/null +++ b/beacon-chain/sync/validate_blob_test.go @@ -0,0 +1,415 @@ +package sync + +import ( + "bytes" + "context" + "reflect" + "testing" + "time" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + pb "github.com/libp2p/go-libp2p-pubsub/pb" + gcache "github.com/patrickmn/go-cache" + mock "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/signing" + dbtest "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing" + doublylinkedtree "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/doubly-linked-tree" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" + p2ptest "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen" + mockSync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/initial-sync/testing" + lruwrpr "github.com/prysmaticlabs/prysm/v4/cache/lru" + "github.com/prysmaticlabs/prysm/v4/config/params" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v4/crypto/bls" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" + eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v4/testing/require" + "github.com/prysmaticlabs/prysm/v4/testing/util" + logTest "github.com/sirupsen/logrus/hooks/test" +) + +func TestValidateBlob_FromSelf(t *testing.T) { + ctx := context.Background() + p := p2ptest.NewTestP2P(t) + s := &Service{cfg: &config{p2p: p}} + result, err := s.validateBlob(ctx, s.cfg.p2p.PeerID(), nil) + require.NoError(t, err) + require.Equal(t, result, pubsub.ValidationAccept) +} + +func TestValidateBlob_InitSync(t *testing.T) { + ctx := context.Background() + p := p2ptest.NewTestP2P(t) + s := &Service{cfg: &config{p2p: p, initialSync: &mockSync.Sync{IsSyncing: true}}} + result, err := s.validateBlob(ctx, "", nil) + require.NoError(t, err) + require.Equal(t, result, pubsub.ValidationIgnore) +} + +func TestValidateBlob_InvalidTopic(t *testing.T) { + ctx := context.Background() + p := p2ptest.NewTestP2P(t) + s := &Service{cfg: &config{p2p: p, initialSync: &mockSync.Sync{}}} + result, err := s.validateBlob(ctx, "", &pubsub.Message{ + Message: &pb.Message{}, + }) + require.ErrorIs(t, errInvalidTopic, err) + require.Equal(t, result, pubsub.ValidationReject) +} + +func TestValidateBlob_InvalidMessageType(t *testing.T) { + ctx := context.Background() + p := p2ptest.NewTestP2P(t) + chainService := &mock.ChainService{Genesis: time.Unix(time.Now().Unix()-int64(params.BeaconConfig().SecondsPerSlot), 0)} + s := &Service{cfg: &config{p2p: p, initialSync: &mockSync.Sync{}, clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}} + + msg := util.NewBeaconBlock() + buf := new(bytes.Buffer) + _, err := p.Encoding().EncodeGossip(buf, msg) + require.NoError(t, err) + + topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] + digest, err := s.currentForkDigest() + require.NoError(t, err) + topic = s.addDigestToTopic(topic, digest) + result, err := s.validateBlob(ctx, "", &pubsub.Message{ + Message: &pb.Message{ + Data: buf.Bytes(), + Topic: &topic, + }}) + require.ErrorIs(t, errWrongMessage, err) + require.Equal(t, result, pubsub.ValidationReject) +} + +func TestValidateBlob_InvalidTopicIndex(t *testing.T) { + ctx := context.Background() + p := p2ptest.NewTestP2P(t) + chainService := &mock.ChainService{Genesis: time.Unix(time.Now().Unix()-int64(params.BeaconConfig().SecondsPerSlot), 0)} + s := &Service{cfg: &config{p2p: p, initialSync: &mockSync.Sync{}, clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}} + + msg := util.NewBlobsidecar() + buf := new(bytes.Buffer) + _, err := p.Encoding().EncodeGossip(buf, msg) + require.NoError(t, err) + + topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)] + digest, err := s.currentForkDigest() + require.NoError(t, err) + topic = s.addDigestAndIndexToTopic(topic, digest, 1) + result, err := s.validateBlob(ctx, "", &pubsub.Message{ + Message: &pb.Message{ + Data: buf.Bytes(), + Topic: &topic, + }}) + require.ErrorContains(t, "/eth2/f5a5fd42/blob_sidecar_1", err) + require.Equal(t, result, pubsub.ValidationReject) +} + +func TestValidateBlob_OlderThanFinalizedEpoch(t *testing.T) { + ctx := context.Background() + p := p2ptest.NewTestP2P(t) + chainService := &mock.ChainService{Genesis: time.Now(), FinalizedCheckPoint: ð.Checkpoint{Epoch: 1}} + s := &Service{cfg: &config{ + p2p: p, + initialSync: &mockSync.Sync{}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}} + + b := util.NewBlobsidecar() + b.Message.Slot = chainService.CurrentSlot() + 1 + buf := new(bytes.Buffer) + _, err := p.Encoding().EncodeGossip(buf, b) + require.NoError(t, err) + + topic := p2p.GossipTypeMapping[reflect.TypeOf(b)] + digest, err := s.currentForkDigest() + require.NoError(t, err) + topic = s.addDigestAndIndexToTopic(topic, digest, 0) + result, err := s.validateBlob(ctx, "", &pubsub.Message{ + Message: &pb.Message{ + Data: buf.Bytes(), + Topic: &topic, + }}) + require.ErrorContains(t, "finalized slot 32 greater or equal to blob slot 1", err) + require.Equal(t, result, pubsub.ValidationIgnore) +} + +func TestValidateBlob_UnknownParentBlock(t *testing.T) { + hook := logTest.NewGlobal() + ctx := context.Background() + p := p2ptest.NewTestP2P(t) + chainService := &mock.ChainService{Genesis: time.Now(), FinalizedCheckPoint: ð.Checkpoint{}} + s := &Service{ + slotToPendingBlobs: gcache.New(time.Second, 2*time.Second), + cfg: &config{ + p2p: p, + initialSync: &mockSync.Sync{}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}} + + b := util.NewBlobsidecar() + b.Message.Slot = chainService.CurrentSlot() + 1 + buf := new(bytes.Buffer) + _, err := p.Encoding().EncodeGossip(buf, b) + require.NoError(t, err) + + topic := p2p.GossipTypeMapping[reflect.TypeOf(b)] + digest, err := s.currentForkDigest() + require.NoError(t, err) + topic = s.addDigestAndIndexToTopic(topic, digest, 0) + result, err := s.validateBlob(ctx, "", &pubsub.Message{ + Message: &pb.Message{ + Data: buf.Bytes(), + Topic: &topic, + }}) + require.NoError(t, err) + require.Equal(t, result, pubsub.ValidationIgnore) + require.LogsContain(t, hook, "Ignored blob: parent block not found") + + pendingBlobs := s.pendingBlobsInCache(b.Message.Slot) + require.Equal(t, 1, len(pendingBlobs)) +} + +func TestValidateBlob_HigherThanParentSlot(t *testing.T) { + db := dbtest.SetupDB(t) + ctx := context.Background() + p := p2ptest.NewTestP2P(t) + chainService := &mock.ChainService{Genesis: time.Now(), FinalizedCheckPoint: ð.Checkpoint{}, DB: db} + s := &Service{ + slotToPendingBlobs: gcache.New(time.Second, 2*time.Second), + cfg: &config{ + p2p: p, + initialSync: &mockSync.Sync{}, + chain: chainService, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}} + + b := util.NewBlobsidecar() + b.Message.Slot = chainService.CurrentSlot() + 1 + chainService.BlockSlot = chainService.CurrentSlot() + 1 + bb := util.NewBeaconBlock() + bb.Block.Slot = b.Message.Slot + signedBb, err := blocks.NewSignedBeaconBlock(bb) + require.NoError(t, err) + require.NoError(t, db.SaveBlock(ctx, signedBb)) + r, err := signedBb.Block().HashTreeRoot() + require.NoError(t, err) + + b.Message.BlockParentRoot = r[:] + + buf := new(bytes.Buffer) + _, err = p.Encoding().EncodeGossip(buf, b) + require.NoError(t, err) + + topic := p2p.GossipTypeMapping[reflect.TypeOf(b)] + digest, err := s.currentForkDigest() + require.NoError(t, err) + topic = s.addDigestAndIndexToTopic(topic, digest, 0) + result, err := s.validateBlob(ctx, "", &pubsub.Message{ + Message: &pb.Message{ + Data: buf.Bytes(), + Topic: &topic, + }}) + require.ErrorContains(t, "parent block slot 1 greater or equal to blob slot 1", err) + require.Equal(t, result, pubsub.ValidationReject) +} + +func TestValidateBlob_InvalidProposerSignature(t *testing.T) { + db := dbtest.SetupDB(t) + ctx := context.Background() + p := p2ptest.NewTestP2P(t) + chainService := &mock.ChainService{Genesis: time.Now(), FinalizedCheckPoint: ð.Checkpoint{}, DB: db} + stateGen := stategen.New(db, doublylinkedtree.New()) + s := &Service{ + slotToPendingBlobs: gcache.New(time.Second, 2*time.Second), + cfg: &config{ + p2p: p, + initialSync: &mockSync.Sync{}, + chain: chainService, + stateGen: stateGen, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}} + + b := util.NewBlobsidecar() + b.Message.Slot = chainService.CurrentSlot() + 1 + sk, err := bls.SecretKeyFromBytes(bytesutil.PadTo([]byte("sk"), 32)) + require.NoError(t, err) + b.Signature = sk.Sign([]byte("data")).Marshal() + + bb := util.NewBeaconBlock() + signedBb, err := blocks.NewSignedBeaconBlock(bb) + require.NoError(t, err) + require.NoError(t, db.SaveBlock(ctx, signedBb)) + r, err := signedBb.Block().HashTreeRoot() + require.NoError(t, err) + + b.Message.BlockParentRoot = r[:] + + buf := new(bytes.Buffer) + _, err = p.Encoding().EncodeGossip(buf, b) + require.NoError(t, err) + + topic := p2p.GossipTypeMapping[reflect.TypeOf(b)] + digest, err := s.currentForkDigest() + require.NoError(t, err) + topic = s.addDigestAndIndexToTopic(topic, digest, 0) + result, err := s.validateBlob(ctx, "", &pubsub.Message{ + Message: &pb.Message{ + Data: buf.Bytes(), + Topic: &topic, + }}) + require.ErrorIs(t, signing.ErrSigFailedToVerify, err) + require.Equal(t, result, pubsub.ValidationReject) +} + +func TestValidateBlob_AlreadySeenInCache(t *testing.T) { + db := dbtest.SetupDB(t) + ctx := context.Background() + p := p2ptest.NewTestP2P(t) + chainService := &mock.ChainService{Genesis: time.Now(), FinalizedCheckPoint: ð.Checkpoint{}, DB: db} + stateGen := stategen.New(db, doublylinkedtree.New()) + s := &Service{ + slotToPendingBlobs: gcache.New(time.Second, 2*time.Second), + seenBlobCache: lruwrpr.New(10), + cfg: &config{ + p2p: p, + initialSync: &mockSync.Sync{}, + chain: chainService, + stateGen: stateGen, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}} + + b := util.NewBlobsidecar() + b.Message.Slot = chainService.CurrentSlot() + 1 + beaconState, privKeys := util.DeterministicGenesisState(t, 100) + + bb := util.NewBeaconBlock() + signedBb, err := blocks.NewSignedBeaconBlock(bb) + require.NoError(t, err) + require.NoError(t, db.SaveBlock(ctx, signedBb)) + r, err := signedBb.Block().HashTreeRoot() + require.NoError(t, err) + require.NoError(t, db.SaveState(ctx, beaconState, r)) + + b.Message.BlockParentRoot = r[:] + b.Signature, err = signing.ComputeDomainAndSign(beaconState, 0, b.Message, params.BeaconConfig().DomainBlobSidecar, privKeys[0]) + require.NoError(t, err) + + buf := new(bytes.Buffer) + _, err = p.Encoding().EncodeGossip(buf, b) + require.NoError(t, err) + + topic := p2p.GossipTypeMapping[reflect.TypeOf(b)] + digest, err := s.currentForkDigest() + require.NoError(t, err) + topic = s.addDigestAndIndexToTopic(topic, digest, 0) + + s.setSeenBlobIndex(bytesutil.PadTo([]byte{}, 32), 0) + result, err := s.validateBlob(ctx, "", &pubsub.Message{ + Message: &pb.Message{ + Data: buf.Bytes(), + Topic: &topic, + }}) + require.NoError(t, err) + require.Equal(t, result, pubsub.ValidationIgnore) +} + +func TestValidateBlob_IncorrectProposerIndex(t *testing.T) { + db := dbtest.SetupDB(t) + ctx := context.Background() + p := p2ptest.NewTestP2P(t) + chainService := &mock.ChainService{Genesis: time.Now(), FinalizedCheckPoint: ð.Checkpoint{}, DB: db} + stateGen := stategen.New(db, doublylinkedtree.New()) + s := &Service{ + slotToPendingBlobs: gcache.New(time.Second, 2*time.Second), + seenBlobCache: lruwrpr.New(10), + cfg: &config{ + p2p: p, + initialSync: &mockSync.Sync{}, + chain: chainService, + stateGen: stateGen, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}} + + b := util.NewBlobsidecar() + b.Message.Slot = chainService.CurrentSlot() + 1 + beaconState, privKeys := util.DeterministicGenesisState(t, 100) + + bb := util.NewBeaconBlock() + signedBb, err := blocks.NewSignedBeaconBlock(bb) + require.NoError(t, err) + require.NoError(t, db.SaveBlock(ctx, signedBb)) + r, err := signedBb.Block().HashTreeRoot() + require.NoError(t, err) + require.NoError(t, db.SaveState(ctx, beaconState, r)) + + b.Message.BlockParentRoot = r[:] + b.Signature, err = signing.ComputeDomainAndSign(beaconState, 0, b.Message, params.BeaconConfig().DomainBlobSidecar, privKeys[0]) + require.NoError(t, err) + + buf := new(bytes.Buffer) + _, err = p.Encoding().EncodeGossip(buf, b) + require.NoError(t, err) + + topic := p2p.GossipTypeMapping[reflect.TypeOf(b)] + digest, err := s.currentForkDigest() + require.NoError(t, err) + topic = s.addDigestAndIndexToTopic(topic, digest, 0) + + result, err := s.validateBlob(ctx, "", &pubsub.Message{ + Message: &pb.Message{ + Data: buf.Bytes(), + Topic: &topic, + }}) + require.ErrorContains(t, "expected proposer index 21, got 0", err) + require.Equal(t, result, pubsub.ValidationReject) +} + +func TestValidateBlob_EverythingPasses(t *testing.T) { + db := dbtest.SetupDB(t) + ctx := context.Background() + p := p2ptest.NewTestP2P(t) + chainService := &mock.ChainService{Genesis: time.Now(), FinalizedCheckPoint: ð.Checkpoint{}, DB: db} + stateGen := stategen.New(db, doublylinkedtree.New()) + s := &Service{ + slotToPendingBlobs: gcache.New(time.Second, 2*time.Second), + seenBlobCache: lruwrpr.New(10), + cfg: &config{ + p2p: p, + initialSync: &mockSync.Sync{}, + chain: chainService, + stateGen: stateGen, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}} + + b := util.NewBlobsidecar() + b.Message.Slot = chainService.CurrentSlot() + 1 + beaconState, privKeys := util.DeterministicGenesisState(t, 100) + + bb := util.NewBeaconBlock() + signedBb, err := blocks.NewSignedBeaconBlock(bb) + require.NoError(t, err) + require.NoError(t, db.SaveBlock(ctx, signedBb)) + r, err := signedBb.Block().HashTreeRoot() + require.NoError(t, err) + require.NoError(t, db.SaveState(ctx, beaconState, r)) + + b.Message.BlockParentRoot = r[:] + b.Message.ProposerIndex = 21 + b.Signature, err = signing.ComputeDomainAndSign(beaconState, 0, b.Message, params.BeaconConfig().DomainBlobSidecar, privKeys[21]) + require.NoError(t, err) + + buf := new(bytes.Buffer) + _, err = p.Encoding().EncodeGossip(buf, b) + require.NoError(t, err) + + topic := p2p.GossipTypeMapping[reflect.TypeOf(b)] + digest, err := s.currentForkDigest() + require.NoError(t, err) + topic = s.addDigestAndIndexToTopic(topic, digest, 0) + + result, err := s.validateBlob(ctx, "", &pubsub.Message{ + Message: &pb.Message{ + Data: buf.Bytes(), + Topic: &topic, + }}) + require.NoError(t, err) + require.Equal(t, result, pubsub.ValidationAccept) +} diff --git a/config/fieldparams/mainnet.go b/config/fieldparams/mainnet.go index c04017796c..85e1a62d97 100644 --- a/config/fieldparams/mainnet.go +++ b/config/fieldparams/mainnet.go @@ -26,4 +26,5 @@ const ( SyncCommitteeAggregationBytesLength = 16 // SyncCommitteeAggregationBytesLength defines the length of sync committee aggregate bytes. SyncAggregateSyncCommitteeBytesLength = 64 // SyncAggregateSyncCommitteeBytesLength defines the length of sync committee bytes in a sync aggregate. MaxWithdrawalsPerPayload = 16 // MaxWithdrawalsPerPayloadLength defines the maximum number of withdrawals that can be included in a payload. + BlobLength = 131072 // BlobLength defines the byte length of a blob. ) diff --git a/config/fieldparams/minimal.go b/config/fieldparams/minimal.go index f48ba4e741..255c364714 100644 --- a/config/fieldparams/minimal.go +++ b/config/fieldparams/minimal.go @@ -26,4 +26,5 @@ const ( SyncCommitteeAggregationBytesLength = 1 // SyncCommitteeAggregationBytesLength defines the sync committee aggregate bytes. SyncAggregateSyncCommitteeBytesLength = 4 // SyncAggregateSyncCommitteeBytesLength defines the length of sync committee bytes in a sync aggregate. MaxWithdrawalsPerPayload = 4 // MaxWithdrawalsPerPayloadLength defines the maximum number of withdrawals that can be included in a payload. + BlobLength = 4 // BlobLength defines the byte length of a blob. ) diff --git a/config/params/mainnet_config.go b/config/params/mainnet_config.go index c0bfe58304..5f6f8ef1ad 100644 --- a/config/params/mainnet_config.go +++ b/config/params/mainnet_config.go @@ -31,6 +31,7 @@ var mainnetNetworkConfig = &NetworkConfig{ MaxChunkSize: 1 << 20, // 1 MiB MaxChunkSizeBellatrix: 10 * 1 << 20, // 10 MiB AttestationSubnetCount: 64, + BlobsidecarSubnetCount: 4, AttestationPropagationSlotRange: 32, MaxRequestBlocks: 1 << 10, // 1024 TtfbTimeout: 5 * time.Second, @@ -176,6 +177,7 @@ var mainnetBeaconConfig = &BeaconChainConfig{ DomainApplicationMask: bytesutil.Uint32ToBytes4(0x00000001), DomainApplicationBuilder: bytesutil.Uint32ToBytes4(0x00000001), DomainBLSToExecutionChange: bytesutil.Uint32ToBytes4(0x0A000000), + DomainBlobSidecar: bytesutil.Uint32ToBytes4(0x0B000000), // Prysm constants. GweiPerEth: 1000000000, diff --git a/config/params/network_config.go b/config/params/network_config.go index 7cfddff8fb..1dbcbf4b90 100644 --- a/config/params/network_config.go +++ b/config/params/network_config.go @@ -14,6 +14,7 @@ type NetworkConfig struct { MaxChunkSize uint64 `yaml:"MAX_CHUNK_SIZE"` // MaxChunkSize is the maximum allowed size of uncompressed req/resp chunked responses. MaxChunkSizeBellatrix uint64 `yaml:"MAX_CHUNK_SIZE_BELLATRIX"` // MaxChunkSizeBellatrix is the maximum allowed size of uncompressed req/resp chunked responses after the bellatrix epoch. AttestationSubnetCount uint64 `yaml:"ATTESTATION_SUBNET_COUNT"` // AttestationSubnetCount is the number of attestation subnets used in the gossipsub protocol. + BlobsidecarSubnetCount uint64 `yaml:"BLOBSIDECAR_SUBNET_COUNT"` // BlobsidecarSubnetCount is the number of blobsidecar subnets used in the gossipsub protocol. AttestationPropagationSlotRange primitives.Slot `yaml:"ATTESTATION_PROPAGATION_SLOT_RANGE"` // AttestationPropagationSlotRange is the maximum number of slots during which an attestation can be propagated. MaxRequestBlocks uint64 `yaml:"MAX_REQUEST_BLOCKS"` // MaxRequestBlocks is the maximum number of blocks in a single request. TtfbTimeout time.Duration `yaml:"TTFB_TIMEOUT"` // TtfbTimeout is the maximum time to wait for first byte of request response (time-to-first-byte). diff --git a/testing/util/block.go b/testing/util/block.go index ceeedae2bb..33da22d14e 100644 --- a/testing/util/block.go +++ b/testing/util/block.go @@ -77,6 +77,19 @@ func NewBeaconBlock() *ethpb.SignedBeaconBlock { } } +func NewBlobsidecar() *ethpb.SignedBlobSidecar { + return ðpb.SignedBlobSidecar{ + Message: ðpb.BlobSidecar{ + BlockRoot: make([]byte, fieldparams.RootLength), + BlockParentRoot: make([]byte, fieldparams.RootLength), + Blob: make([]byte, fieldparams.BlobLength), + KzgCommitment: make([]byte, fieldparams.BLSPubkeyLength), + KzgProof: make([]byte, fieldparams.BLSPubkeyLength), + }, + Signature: make([]byte, fieldparams.BLSSignatureLength), + } +} + // GenerateFullBlock generates a fully valid block with the requested parameters. // Use BlockGenConfig to declare the conditions you would like the block generated under. func GenerateFullBlock(