mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 05:47:59 -05:00
Add Blob Gossip (#12413)
This commit is contained in:
committed by
Kasey Kirkham
parent
e7818152d0
commit
d7accd58ec
@@ -34,6 +34,7 @@ type BlockReceiver interface {
|
||||
ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error
|
||||
ReceiveBlockBatch(ctx context.Context, blocks []interfaces.ReadOnlySignedBeaconBlock, blkRoots [][32]byte) error
|
||||
HasBlock(ctx context.Context, root [32]byte) bool
|
||||
RecentBlockSlot(root [32]byte) (primitives.Slot, error)
|
||||
}
|
||||
|
||||
// SlashingReceiver interface defines the methods of chain service for receiving validated slashing over the wire.
|
||||
@@ -237,6 +238,11 @@ func (s *Service) HasBlock(ctx context.Context, root [32]byte) bool {
|
||||
return s.hasBlockInInitSyncOrDB(ctx, root)
|
||||
}
|
||||
|
||||
// RecentBlockSlot returns block slot form fork choice store
|
||||
func (s *Service) RecentBlockSlot(root [32]byte) (primitives.Slot, error) {
|
||||
return s.cfg.ForkChoiceStore.Slot(root)
|
||||
}
|
||||
|
||||
// ReceiveAttesterSlashing receives an attester slashing and inserts it to forkchoice
|
||||
func (s *Service) ReceiveAttesterSlashing(ctx context.Context, slashing *ethpb.AttesterSlashing) {
|
||||
s.cfg.ForkChoiceStore.Lock()
|
||||
|
||||
@@ -69,6 +69,7 @@ type ChainService struct {
|
||||
OptimisticCheckRootReceived [32]byte
|
||||
FinalizedRoots map[[32]byte]bool
|
||||
OptimisticRoots map[[32]byte]bool
|
||||
BlockSlot primitives.Slot
|
||||
}
|
||||
|
||||
func (s *ChainService) Ancestor(ctx context.Context, root []byte, slot primitives.Slot) ([]byte, error) {
|
||||
@@ -389,6 +390,11 @@ func (s *ChainService) HasBlock(ctx context.Context, rt [32]byte) bool {
|
||||
return s.InitSyncBlockRoots[rt]
|
||||
}
|
||||
|
||||
// RecentBlockSlot mocks the same method in the chain service.
|
||||
func (s *ChainService) RecentBlockSlot([32]byte) (primitives.Slot, error) {
|
||||
return s.BlockSlot, nil
|
||||
}
|
||||
|
||||
// HeadGenesisValidatorsRoot mocks HeadGenesisValidatorsRoot method in chain service.
|
||||
func (_ *ChainService) HeadGenesisValidatorsRoot() [32]byte {
|
||||
return [32]byte{}
|
||||
|
||||
@@ -21,6 +21,7 @@ var gossipTopicMappings = map[string]proto.Message{
|
||||
SyncContributionAndProofSubnetTopicFormat: ðpb.SignedContributionAndProof{},
|
||||
SyncCommitteeSubnetTopicFormat: ðpb.SyncCommitteeMessage{},
|
||||
BlsToExecutionChangeSubnetTopicFormat: ðpb.SignedBLSToExecutionChange{},
|
||||
BlobSubnetTopicFormat: ðpb.SignedBlobSidecar{},
|
||||
}
|
||||
|
||||
// GossipTopicMappings is a function to return the assigned data type
|
||||
|
||||
@@ -57,12 +57,17 @@ func (s *Service) CanSubscribe(topic string) bool {
|
||||
log.WithError(err).Error("Could not determine Capella fork digest")
|
||||
return false
|
||||
}
|
||||
|
||||
denebForkDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().DenebForkEpoch, s.genesisValidatorsRoot)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not determine Deneb fork digest")
|
||||
return false
|
||||
}
|
||||
switch parts[2] {
|
||||
case fmt.Sprintf("%x", phase0ForkDigest):
|
||||
case fmt.Sprintf("%x", altairForkDigest):
|
||||
case fmt.Sprintf("%x", bellatrixForkDigest):
|
||||
case fmt.Sprintf("%x", capellaForkDigest):
|
||||
case fmt.Sprintf("%x", denebForkDigest):
|
||||
default:
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -90,7 +90,7 @@ func TestService_CanSubscribe(t *testing.T) {
|
||||
formatting := []interface{}{digest}
|
||||
|
||||
// Special case for attestation subnets which have a second formatting placeholder.
|
||||
if topic == AttestationSubnetTopicFormat || topic == SyncCommitteeSubnetTopicFormat {
|
||||
if topic == AttestationSubnetTopicFormat || topic == SyncCommitteeSubnetTopicFormat || topic == BlobSubnetTopicFormat {
|
||||
formatting = append(formatting, 0 /* some subnet ID */)
|
||||
}
|
||||
|
||||
|
||||
@@ -28,7 +28,8 @@ const (
|
||||
GossipContributionAndProofMessage = "sync_committee_contribution_and_proof"
|
||||
// GossipBlsToExecutionChangeMessage is the name for the bls to execution change message type.
|
||||
GossipBlsToExecutionChangeMessage = "bls_to_execution_change"
|
||||
|
||||
// GossipBlobSidecarMessage is the name for the blob sidecar message type.
|
||||
GossipBlobSidecarMessage = "blob_sidecar"
|
||||
// Topic Formats
|
||||
//
|
||||
// AttestationSubnetTopicFormat is the topic format for the attestation subnet.
|
||||
@@ -49,4 +50,6 @@ const (
|
||||
SyncContributionAndProofSubnetTopicFormat = GossipProtocolAndDigest + GossipContributionAndProofMessage
|
||||
// BlsToExecutionChangeSubnetTopicFormat is the topic format for the bls to execution change subnet.
|
||||
BlsToExecutionChangeSubnetTopicFormat = GossipProtocolAndDigest + GossipBlsToExecutionChangeMessage
|
||||
// BlobSubnetTopicFormat is the topic format for the blob subnet.
|
||||
BlobSubnetTopicFormat = GossipProtocolAndDigest + GossipBlobSidecarMessage + "_%d"
|
||||
)
|
||||
|
||||
@@ -131,6 +131,9 @@ func TestGetSpec(t *testing.T) {
|
||||
var dam [4]byte
|
||||
copy(dam[:], []byte{'1', '0', '0', '0'})
|
||||
config.DomainApplicationMask = dam
|
||||
var dbs [4]byte
|
||||
copy(dam[:], []byte{'2', '0', '0', '0'})
|
||||
config.DomainBlobSidecar = dbs
|
||||
|
||||
params.OverrideBeaconConfig(config)
|
||||
|
||||
@@ -138,7 +141,7 @@ func TestGetSpec(t *testing.T) {
|
||||
resp, err := server.GetSpec(context.Background(), &emptypb.Empty{})
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 108, len(resp.Data))
|
||||
assert.Equal(t, 109, len(resp.Data))
|
||||
for k, v := range resp.Data {
|
||||
switch k {
|
||||
case "CONFIG_NAME":
|
||||
@@ -343,6 +346,8 @@ func TestGetSpec(t *testing.T) {
|
||||
assert.Equal(t, "0x0a000000", v)
|
||||
case "DOMAIN_APPLICATION_BUILDER":
|
||||
assert.Equal(t, "0x00000001", v)
|
||||
case "DOMAIN_BLOB_SIDECAR":
|
||||
assert.Equal(t, "0x00000000", v)
|
||||
case "TRANSITION_TOTAL_DIFFICULTY":
|
||||
assert.Equal(t, "0", v)
|
||||
case "TERMINAL_BLOCK_HASH_ACTIVATION_EPOCH":
|
||||
|
||||
@@ -33,6 +33,7 @@ go_library(
|
||||
"subscriber_beacon_aggregate_proof.go",
|
||||
"subscriber_beacon_attestation.go",
|
||||
"subscriber_beacon_blocks.go",
|
||||
"subscriber_blob_sidecar.go",
|
||||
"subscriber_bls_to_execution_change.go",
|
||||
"subscriber_handlers.go",
|
||||
"subscriber_sync_committee_message.go",
|
||||
@@ -42,6 +43,7 @@ go_library(
|
||||
"validate_attester_slashing.go",
|
||||
"validate_beacon_attestation.go",
|
||||
"validate_beacon_blocks.go",
|
||||
"validate_blob.go",
|
||||
"validate_bls_to_execution_change.go",
|
||||
"validate_proposer_slashing.go",
|
||||
"validate_sync_committee_message.go",
|
||||
@@ -164,6 +166,7 @@ go_test(
|
||||
"validate_attester_slashing_test.go",
|
||||
"validate_beacon_attestation_test.go",
|
||||
"validate_beacon_blocks_test.go",
|
||||
"validate_blob_test.go",
|
||||
"validate_bls_to_execution_change_test.go",
|
||||
"validate_proposer_slashing_test.go",
|
||||
"validate_sync_committee_message_test.go",
|
||||
|
||||
@@ -37,6 +37,8 @@ func (s *Service) decodePubsubMessage(msg *pubsub.Message) (ssz.Unmarshaler, err
|
||||
// differentiate them below.
|
||||
case strings.Contains(topic, p2p.GossipSyncCommitteeMessage) && !strings.Contains(topic, p2p.SyncContributionAndProofSubnetTopicFormat):
|
||||
topic = p2p.GossipTypeMapping[reflect.TypeOf(ðpb.SyncCommitteeMessage{})]
|
||||
case strings.Contains(topic, p2p.GossipBlobSidecarMessage):
|
||||
topic = p2p.GossipTypeMapping[reflect.TypeOf(ðpb.SignedBlobSidecar{})]
|
||||
}
|
||||
|
||||
base := p2p.GossipTopicMappings(topic, 0)
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/v4/encoding/ssz/equality"
|
||||
"github.com/prysmaticlabs/prysm/v4/monitoring/tracing"
|
||||
eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v4/time/slots"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/trailofbits/go-mutexasserts"
|
||||
@@ -427,6 +428,21 @@ func (s *Service) pendingBlocksInCache(slot primitives.Slot) []interfaces.ReadOn
|
||||
return blks
|
||||
}
|
||||
|
||||
// This returns signed blob sidecar given input key from slotToPendingBlobs.
|
||||
func (s *Service) pendingBlobsInCache(slot primitives.Slot) []*eth.SignedBlobSidecar {
|
||||
k := slotToCacheKey(slot)
|
||||
value, ok := s.slotToPendingBlobs.Get(k)
|
||||
if !ok {
|
||||
return []*eth.SignedBlobSidecar{}
|
||||
}
|
||||
bs, ok := value.([]*eth.SignedBlobSidecar)
|
||||
if !ok {
|
||||
log.Debug("pendingBlobsInCache: value is not of type []*eth.SignedBlobSidecar")
|
||||
return []*eth.SignedBlobSidecar{}
|
||||
}
|
||||
return bs
|
||||
}
|
||||
|
||||
// This adds input signed beacon block to slotToPendingBlocks cache.
|
||||
func (s *Service) addPendingBlockToCache(b interfaces.ReadOnlySignedBeaconBlock) error {
|
||||
if err := blocks.BeaconBlockIsNil(b); err != nil {
|
||||
@@ -445,6 +461,23 @@ func (s *Service) addPendingBlockToCache(b interfaces.ReadOnlySignedBeaconBlock)
|
||||
return nil
|
||||
}
|
||||
|
||||
// This adds blob to slotToPendingBlobs cache.
|
||||
func (s *Service) addPendingBlobToCache(b *eth.SignedBlobSidecar) error {
|
||||
blobs := s.pendingBlobsInCache(b.Message.Slot)
|
||||
|
||||
// If we already have seen the index. Ignore it.
|
||||
for _, blob := range blobs {
|
||||
if blob.Message.Index == b.Message.Index {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
blobs = append(blobs, b)
|
||||
k := slotToCacheKey(b.Message.Slot)
|
||||
s.slotToPendingBlobs.Set(k, blobs, pendingBlockExpTime)
|
||||
return nil
|
||||
}
|
||||
|
||||
// This converts input string to slot.
|
||||
func cacheKeyToSlot(s string) primitives.Slot {
|
||||
b := []byte(s)
|
||||
|
||||
@@ -44,6 +44,7 @@ var _ runtime.Service = (*Service)(nil)
|
||||
|
||||
const rangeLimit uint64 = 1024
|
||||
const seenBlockSize = 1000
|
||||
const seenBlobSize = seenBlockSize * 4 // Each block can have max 4 blobs. Worst case 164kB for cache.
|
||||
const seenUnaggregatedAttSize = 20000
|
||||
const seenAggregatedAttSize = 1024
|
||||
const seenSyncMsgSize = 1000 // Maximum of 512 sync committee members, 1000 is a safe amount.
|
||||
@@ -110,6 +111,7 @@ type Service struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
slotToPendingBlocks *gcache.Cache
|
||||
slotToPendingBlobs *gcache.Cache
|
||||
seenPendingBlocks map[[32]byte]bool
|
||||
blkRootToPendingAtts map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof
|
||||
subHandler *subTopicHandler
|
||||
@@ -120,6 +122,8 @@ type Service struct {
|
||||
rateLimiter *limiter
|
||||
seenBlockLock sync.RWMutex
|
||||
seenBlockCache *lru.Cache
|
||||
seenBlobLock sync.RWMutex
|
||||
seenBlobCache *lru.Cache
|
||||
seenAggregatedAttestationLock sync.RWMutex
|
||||
seenAggregatedAttestationCache *lru.Cache
|
||||
seenUnAggregatedAttestationLock sync.RWMutex
|
||||
@@ -146,6 +150,7 @@ type Service struct {
|
||||
// NewService initializes new regular sync service.
|
||||
func NewService(ctx context.Context, opts ...Option) *Service {
|
||||
c := gcache.New(pendingBlockExpTime /* exp time */, 2*pendingBlockExpTime /* prune time */)
|
||||
pendingBlobCache := gcache.New(0 /* exp time */, 0 /* prune time */)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
r := &Service{
|
||||
ctx: ctx,
|
||||
@@ -153,6 +158,7 @@ func NewService(ctx context.Context, opts ...Option) *Service {
|
||||
chainStarted: abool.New(),
|
||||
cfg: &config{clock: startup.NewClock(time.Unix(0, 0), [32]byte{})},
|
||||
slotToPendingBlocks: c,
|
||||
slotToPendingBlobs: pendingBlobCache,
|
||||
seenPendingBlocks: make(map[[32]byte]bool),
|
||||
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof),
|
||||
signatureChan: make(chan *signatureVerifier, verifierLimit),
|
||||
@@ -223,6 +229,7 @@ func (s *Service) Status() error {
|
||||
// and prevent DoS.
|
||||
func (s *Service) initCaches() {
|
||||
s.seenBlockCache = lruwrpr.New(seenBlockSize)
|
||||
s.seenBlobCache = lruwrpr.New(seenBlobSize)
|
||||
s.seenAggregatedAttestationCache = lruwrpr.New(seenAggregatedAttSize)
|
||||
s.seenUnAggregatedAttestationCache = lruwrpr.New(seenUnaggregatedAttSize)
|
||||
s.seenSyncMessageCache = lruwrpr.New(seenSyncMsgSize)
|
||||
|
||||
@@ -90,6 +90,7 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
|
||||
s.validateCommitteeIndexBeaconAttestation, /* validator */
|
||||
s.committeeIndexBeaconAttestationSubscriber, /* message handler */
|
||||
digest,
|
||||
params.BeaconNetworkConfig().AttestationSubnetCount,
|
||||
)
|
||||
} else {
|
||||
s.subscribeDynamicWithSubnets(
|
||||
@@ -133,6 +134,17 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
|
||||
digest,
|
||||
)
|
||||
}
|
||||
|
||||
// New Gossip Topic in Deneb
|
||||
if epoch >= params.BeaconConfig().DenebForkEpoch {
|
||||
s.subscribeStaticWithSubnets(
|
||||
p2p.BlobSubnetTopicFormat,
|
||||
s.validateBlob, /* validator */
|
||||
s.blobSubscriber, /* message handler */
|
||||
digest,
|
||||
params.BeaconNetworkConfig().BlobsidecarSubnetCount,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
// subscribe to a given topic with a given validator and subscription handler.
|
||||
@@ -303,7 +315,7 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p
|
||||
|
||||
// subscribe to a static subnet with the given topic and index.A given validator and subscription handler is
|
||||
// used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding.
|
||||
func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte) {
|
||||
func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte, subnetCount uint64) {
|
||||
genRoot := s.cfg.clock.GenesisValidatorsRoot()
|
||||
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
|
||||
if err != nil {
|
||||
@@ -339,7 +351,7 @@ func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal,
|
||||
if !valid {
|
||||
log.Warnf("Attestation subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
|
||||
// Unsubscribes from all our current subnets.
|
||||
for i := uint64(0); i < params.BeaconNetworkConfig().AttestationSubnetCount; i++ {
|
||||
for i := uint64(0); i < subnetCount; i++ {
|
||||
fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
s.unSubscribeFromTopic(fullTopic)
|
||||
}
|
||||
@@ -347,7 +359,7 @@ func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal,
|
||||
return
|
||||
}
|
||||
// Check every slot that there are enough peers
|
||||
for i := uint64(0); i < params.BeaconNetworkConfig().AttestationSubnetCount; i++ {
|
||||
for i := uint64(0); i < subnetCount; i++ {
|
||||
if !s.validPeersExist(s.addDigestAndIndexToTopic(topic, digest, i)) {
|
||||
log.Debugf("No peers found subscribed to attestation gossip subnet with "+
|
||||
"committee index %d. Searching network for peers subscribed to the subnet.", i)
|
||||
|
||||
24
beacon-chain/sync/subscriber_blob_sidecar.go
Normal file
24
beacon-chain/sync/subscriber_blob_sidecar.go
Normal file
@@ -0,0 +1,24 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
func (s *Service) blobSubscriber(ctx context.Context, msg proto.Message) error {
|
||||
b, ok := msg.(*eth.SignedBlobSidecar)
|
||||
if !ok {
|
||||
return fmt.Errorf("message was not type *eth.SignedBlobSidecar, type=%T", msg)
|
||||
}
|
||||
|
||||
log.WithFields(blobFields(b.Message)).Debug("Received blob sidecar")
|
||||
|
||||
s.setSeenBlobIndex(b.Message.Blob, b.Message.Index)
|
||||
|
||||
// TODO: Store blobs in cache. Will be addressed in subsequent PR.
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -335,7 +335,7 @@ func TestStaticSubnets(t *testing.T) {
|
||||
r.subscribeStaticWithSubnets(defaultTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
|
||||
// no-op
|
||||
return nil
|
||||
}, d)
|
||||
}, d, params.BeaconNetworkConfig().AttestationSubnetCount)
|
||||
topics := r.cfg.p2p.PubSub().GetTopics()
|
||||
if uint64(len(topics)) != params.BeaconNetworkConfig().AttestationSubnetCount {
|
||||
t.Errorf("Wanted the number of subnet topics registered to be %d but got %d", params.BeaconNetworkConfig().AttestationSubnetCount, len(topics))
|
||||
|
||||
190
beacon-chain/sync/validate_blob.go
Normal file
190
beacon-chain/sync/validate_blob.go
Normal file
@@ -0,0 +1,190 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/signing"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v4/crypto/bls"
|
||||
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
|
||||
"github.com/prysmaticlabs/prysm/v4/network/forks"
|
||||
eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v4/time/slots"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
func (s *Service) validateBlob(ctx context.Context, pid peer.ID, msg *pubsub.Message) (pubsub.ValidationResult, error) {
|
||||
if pid == s.cfg.p2p.PeerID() {
|
||||
return pubsub.ValidationAccept, nil
|
||||
}
|
||||
if s.cfg.initialSync.Syncing() {
|
||||
return pubsub.ValidationIgnore, nil
|
||||
}
|
||||
if msg.Topic == nil {
|
||||
return pubsub.ValidationReject, errInvalidTopic
|
||||
}
|
||||
m, err := s.decodePubsubMessage(msg)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to decode message")
|
||||
return pubsub.ValidationReject, err
|
||||
}
|
||||
|
||||
sBlob, ok := m.(*eth.SignedBlobSidecar)
|
||||
if !ok {
|
||||
log.WithField("message", m).Error("Message is not of type *eth.SignedBlobSidecar")
|
||||
return pubsub.ValidationReject, errWrongMessage
|
||||
}
|
||||
blob := sBlob.Message
|
||||
|
||||
// [REJECT] The sidecar is for the correct topic -- i.e. sidecar.index matches the topic {index}.
|
||||
want := fmt.Sprintf("blob_sidecar_%d", blob.Index)
|
||||
if !strings.Contains(*msg.Topic, want) {
|
||||
log.WithFields(blobFields(blob)).Debug("Sidecar blob does not match topic")
|
||||
return pubsub.ValidationReject, fmt.Errorf("wrong topic name: %s", *msg.Topic)
|
||||
}
|
||||
|
||||
// [IGNORE] The sidecar is not from a future slot (with a MAXIMUM_GOSSIP_CLOCK_DISPARITY allowance) --
|
||||
// i.e. validate that sidecar.slot <= current_slot (a client MAY queue future blocks for processing at the appropriate slot).
|
||||
genesisTime := uint64(s.cfg.chain.GenesisTime().Unix())
|
||||
if err := slots.VerifyTime(genesisTime, blob.Slot, earlyBlockProcessingTolerance); err != nil {
|
||||
log.WithError(err).WithFields(blobFields(blob)).Debug("Ignored blob: too far into future")
|
||||
return pubsub.ValidationIgnore, errors.Wrap(err, "blob too far into future")
|
||||
}
|
||||
|
||||
// [IGNORE] The sidecar is from a slot greater than the latest finalized slot --
|
||||
// i.e. validate that sidecar.slot > compute_start_slot_at_epoch(state.finalized_checkpoint.epoch)
|
||||
startSlot, err := slots.EpochStart(s.cfg.chain.FinalizedCheckpt().Epoch)
|
||||
if err != nil {
|
||||
return pubsub.ValidationIgnore, err
|
||||
}
|
||||
if startSlot >= blob.Slot {
|
||||
err := fmt.Errorf("finalized slot %d greater or equal to blob slot %d", startSlot, blob.Slot)
|
||||
log.WithFields(blobFields(blob)).Debug(err)
|
||||
return pubsub.ValidationIgnore, err
|
||||
}
|
||||
|
||||
// [IGNORE] The sidecar's block's parent (defined by sidecar.block_parent_root) has been seen (via both gossip and non-gossip sources)
|
||||
parentRoot := bytesutil.ToBytes32(blob.BlockParentRoot)
|
||||
if !s.cfg.chain.HasBlock(ctx, parentRoot) {
|
||||
if err := s.addPendingBlobToCache(sBlob); err != nil {
|
||||
log.WithError(err).WithFields(blobFields(blob)).Error("Failed to add blob to cache")
|
||||
return pubsub.ValidationIgnore, err
|
||||
}
|
||||
log.WithFields(blobFields(blob)).Debug("Ignored blob: parent block not found")
|
||||
return pubsub.ValidationIgnore, nil
|
||||
}
|
||||
|
||||
// [REJECT] The sidecar's block's parent (defined by sidecar.block_parent_root) passes validation.
|
||||
parentSlot, err := s.cfg.chain.RecentBlockSlot(parentRoot)
|
||||
if err != nil {
|
||||
return pubsub.ValidationIgnore, err
|
||||
}
|
||||
// [REJECT] The sidecar is from a higher slot than the sidecar's block's parent (defined by sidecar.block_parent_root).
|
||||
if parentSlot >= blob.Slot {
|
||||
err := fmt.Errorf("parent block slot %d greater or equal to blob slot %d", parentSlot, blob.Slot)
|
||||
log.WithFields(blobFields(blob)).Debug(err)
|
||||
return pubsub.ValidationReject, err
|
||||
}
|
||||
|
||||
// [REJECT] The proposer signature, signed_blob_sidecar.signature,
|
||||
// is valid with respect to the sidecar.proposer_index pubkey.
|
||||
parentState, err := s.cfg.stateGen.StateByRoot(ctx, parentRoot)
|
||||
if err != nil {
|
||||
return pubsub.ValidationIgnore, err
|
||||
}
|
||||
if err := verifyBlobSignature(parentState, sBlob); err != nil {
|
||||
log.WithError(err).WithFields(blobFields(blob)).Debug("Failed to verify blob signature")
|
||||
return pubsub.ValidationReject, err
|
||||
}
|
||||
|
||||
// [IGNORE] The sidecar is the only sidecar with valid signature received for the tuple (sidecar.block_root, sidecar.index).
|
||||
if s.hasSeenBlobIndex(blob.BlockRoot, blob.Index) {
|
||||
return pubsub.ValidationIgnore, nil
|
||||
}
|
||||
|
||||
// [REJECT] The sidecar is proposed by the expected proposer_index for the block's slot in the context of the current shuffling (defined by block_parent_root/slot)
|
||||
parentState, err = transition.ProcessSlotsUsingNextSlotCache(ctx, parentState, parentRoot[:], blob.Slot)
|
||||
if err != nil {
|
||||
return pubsub.ValidationIgnore, err
|
||||
}
|
||||
idx, err := helpers.BeaconProposerIndex(ctx, parentState)
|
||||
if err != nil {
|
||||
return pubsub.ValidationIgnore, err
|
||||
}
|
||||
if blob.ProposerIndex != idx {
|
||||
err := fmt.Errorf("expected proposer index %d, got %d", idx, blob.ProposerIndex)
|
||||
log.WithFields(blobFields(blob)).Debug(err)
|
||||
return pubsub.ValidationReject, err
|
||||
}
|
||||
|
||||
msg.ValidatorData = sBlob
|
||||
|
||||
return pubsub.ValidationAccept, nil
|
||||
}
|
||||
|
||||
func verifyBlobSignature(st state.BeaconState, blob *eth.SignedBlobSidecar) error {
|
||||
currentEpoch := slots.ToEpoch(blob.Message.Slot)
|
||||
fork, err := forks.Fork(currentEpoch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
domain, err := signing.Domain(fork, currentEpoch, params.BeaconConfig().DomainBlobSidecar, st.GenesisValidatorsRoot())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
proposer, err := st.ValidatorAtIndex(blob.Message.ProposerIndex)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
pb, err := bls.PublicKeyFromBytes(proposer.PublicKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sig, err := bls.SignatureFromBytes(blob.Signature)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sr, err := signing.ComputeSigningRoot(blob.Message, domain)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !sig.Verify(pb, sr[:]) {
|
||||
return signing.ErrSigFailedToVerify
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Returns true if the blob with the same root and index has been seen before.
|
||||
func (s *Service) hasSeenBlobIndex(root []byte, index uint64) bool {
|
||||
s.seenBlobLock.RLock()
|
||||
defer s.seenBlobLock.RUnlock()
|
||||
b := append(root, bytesutil.Bytes32(index)...)
|
||||
_, seen := s.seenBlobCache.Get(string(b))
|
||||
return seen
|
||||
}
|
||||
|
||||
// Set blob index and root as seen.
|
||||
func (s *Service) setSeenBlobIndex(root []byte, index uint64) {
|
||||
s.seenBlobLock.Lock()
|
||||
defer s.seenBlobLock.Unlock()
|
||||
b := append(root, bytesutil.Bytes32(index)...)
|
||||
s.seenBlobCache.Add(string(b), true)
|
||||
}
|
||||
|
||||
func blobFields(b *eth.BlobSidecar) logrus.Fields {
|
||||
return logrus.Fields{
|
||||
"slot": b.Slot,
|
||||
"proposerIndex": b.ProposerIndex,
|
||||
"blockRoot": fmt.Sprintf("%#x", b.BlockRoot),
|
||||
"index": b.Index,
|
||||
}
|
||||
}
|
||||
415
beacon-chain/sync/validate_blob_test.go
Normal file
415
beacon-chain/sync/validate_blob_test.go
Normal file
@@ -0,0 +1,415 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||
pb "github.com/libp2p/go-libp2p-pubsub/pb"
|
||||
gcache "github.com/patrickmn/go-cache"
|
||||
mock "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/signing"
|
||||
dbtest "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing"
|
||||
doublylinkedtree "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/doubly-linked-tree"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
|
||||
p2ptest "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
|
||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
|
||||
mockSync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/initial-sync/testing"
|
||||
lruwrpr "github.com/prysmaticlabs/prysm/v4/cache/lru"
|
||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
|
||||
"github.com/prysmaticlabs/prysm/v4/crypto/bls"
|
||||
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
|
||||
eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v4/testing/require"
|
||||
"github.com/prysmaticlabs/prysm/v4/testing/util"
|
||||
logTest "github.com/sirupsen/logrus/hooks/test"
|
||||
)
|
||||
|
||||
func TestValidateBlob_FromSelf(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
p := p2ptest.NewTestP2P(t)
|
||||
s := &Service{cfg: &config{p2p: p}}
|
||||
result, err := s.validateBlob(ctx, s.cfg.p2p.PeerID(), nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, result, pubsub.ValidationAccept)
|
||||
}
|
||||
|
||||
func TestValidateBlob_InitSync(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
p := p2ptest.NewTestP2P(t)
|
||||
s := &Service{cfg: &config{p2p: p, initialSync: &mockSync.Sync{IsSyncing: true}}}
|
||||
result, err := s.validateBlob(ctx, "", nil)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, result, pubsub.ValidationIgnore)
|
||||
}
|
||||
|
||||
func TestValidateBlob_InvalidTopic(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
p := p2ptest.NewTestP2P(t)
|
||||
s := &Service{cfg: &config{p2p: p, initialSync: &mockSync.Sync{}}}
|
||||
result, err := s.validateBlob(ctx, "", &pubsub.Message{
|
||||
Message: &pb.Message{},
|
||||
})
|
||||
require.ErrorIs(t, errInvalidTopic, err)
|
||||
require.Equal(t, result, pubsub.ValidationReject)
|
||||
}
|
||||
|
||||
func TestValidateBlob_InvalidMessageType(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
p := p2ptest.NewTestP2P(t)
|
||||
chainService := &mock.ChainService{Genesis: time.Unix(time.Now().Unix()-int64(params.BeaconConfig().SecondsPerSlot), 0)}
|
||||
s := &Service{cfg: &config{p2p: p, initialSync: &mockSync.Sync{}, clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}}
|
||||
|
||||
msg := util.NewBeaconBlock()
|
||||
buf := new(bytes.Buffer)
|
||||
_, err := p.Encoding().EncodeGossip(buf, msg)
|
||||
require.NoError(t, err)
|
||||
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := s.currentForkDigest()
|
||||
require.NoError(t, err)
|
||||
topic = s.addDigestToTopic(topic, digest)
|
||||
result, err := s.validateBlob(ctx, "", &pubsub.Message{
|
||||
Message: &pb.Message{
|
||||
Data: buf.Bytes(),
|
||||
Topic: &topic,
|
||||
}})
|
||||
require.ErrorIs(t, errWrongMessage, err)
|
||||
require.Equal(t, result, pubsub.ValidationReject)
|
||||
}
|
||||
|
||||
func TestValidateBlob_InvalidTopicIndex(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
p := p2ptest.NewTestP2P(t)
|
||||
chainService := &mock.ChainService{Genesis: time.Unix(time.Now().Unix()-int64(params.BeaconConfig().SecondsPerSlot), 0)}
|
||||
s := &Service{cfg: &config{p2p: p, initialSync: &mockSync.Sync{}, clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}}
|
||||
|
||||
msg := util.NewBlobsidecar()
|
||||
buf := new(bytes.Buffer)
|
||||
_, err := p.Encoding().EncodeGossip(buf, msg)
|
||||
require.NoError(t, err)
|
||||
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(msg)]
|
||||
digest, err := s.currentForkDigest()
|
||||
require.NoError(t, err)
|
||||
topic = s.addDigestAndIndexToTopic(topic, digest, 1)
|
||||
result, err := s.validateBlob(ctx, "", &pubsub.Message{
|
||||
Message: &pb.Message{
|
||||
Data: buf.Bytes(),
|
||||
Topic: &topic,
|
||||
}})
|
||||
require.ErrorContains(t, "/eth2/f5a5fd42/blob_sidecar_1", err)
|
||||
require.Equal(t, result, pubsub.ValidationReject)
|
||||
}
|
||||
|
||||
func TestValidateBlob_OlderThanFinalizedEpoch(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
p := p2ptest.NewTestP2P(t)
|
||||
chainService := &mock.ChainService{Genesis: time.Now(), FinalizedCheckPoint: ð.Checkpoint{Epoch: 1}}
|
||||
s := &Service{cfg: &config{
|
||||
p2p: p,
|
||||
initialSync: &mockSync.Sync{},
|
||||
chain: chainService,
|
||||
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}}
|
||||
|
||||
b := util.NewBlobsidecar()
|
||||
b.Message.Slot = chainService.CurrentSlot() + 1
|
||||
buf := new(bytes.Buffer)
|
||||
_, err := p.Encoding().EncodeGossip(buf, b)
|
||||
require.NoError(t, err)
|
||||
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(b)]
|
||||
digest, err := s.currentForkDigest()
|
||||
require.NoError(t, err)
|
||||
topic = s.addDigestAndIndexToTopic(topic, digest, 0)
|
||||
result, err := s.validateBlob(ctx, "", &pubsub.Message{
|
||||
Message: &pb.Message{
|
||||
Data: buf.Bytes(),
|
||||
Topic: &topic,
|
||||
}})
|
||||
require.ErrorContains(t, "finalized slot 32 greater or equal to blob slot 1", err)
|
||||
require.Equal(t, result, pubsub.ValidationIgnore)
|
||||
}
|
||||
|
||||
func TestValidateBlob_UnknownParentBlock(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
ctx := context.Background()
|
||||
p := p2ptest.NewTestP2P(t)
|
||||
chainService := &mock.ChainService{Genesis: time.Now(), FinalizedCheckPoint: ð.Checkpoint{}}
|
||||
s := &Service{
|
||||
slotToPendingBlobs: gcache.New(time.Second, 2*time.Second),
|
||||
cfg: &config{
|
||||
p2p: p,
|
||||
initialSync: &mockSync.Sync{},
|
||||
chain: chainService,
|
||||
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}}
|
||||
|
||||
b := util.NewBlobsidecar()
|
||||
b.Message.Slot = chainService.CurrentSlot() + 1
|
||||
buf := new(bytes.Buffer)
|
||||
_, err := p.Encoding().EncodeGossip(buf, b)
|
||||
require.NoError(t, err)
|
||||
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(b)]
|
||||
digest, err := s.currentForkDigest()
|
||||
require.NoError(t, err)
|
||||
topic = s.addDigestAndIndexToTopic(topic, digest, 0)
|
||||
result, err := s.validateBlob(ctx, "", &pubsub.Message{
|
||||
Message: &pb.Message{
|
||||
Data: buf.Bytes(),
|
||||
Topic: &topic,
|
||||
}})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, result, pubsub.ValidationIgnore)
|
||||
require.LogsContain(t, hook, "Ignored blob: parent block not found")
|
||||
|
||||
pendingBlobs := s.pendingBlobsInCache(b.Message.Slot)
|
||||
require.Equal(t, 1, len(pendingBlobs))
|
||||
}
|
||||
|
||||
func TestValidateBlob_HigherThanParentSlot(t *testing.T) {
|
||||
db := dbtest.SetupDB(t)
|
||||
ctx := context.Background()
|
||||
p := p2ptest.NewTestP2P(t)
|
||||
chainService := &mock.ChainService{Genesis: time.Now(), FinalizedCheckPoint: ð.Checkpoint{}, DB: db}
|
||||
s := &Service{
|
||||
slotToPendingBlobs: gcache.New(time.Second, 2*time.Second),
|
||||
cfg: &config{
|
||||
p2p: p,
|
||||
initialSync: &mockSync.Sync{},
|
||||
chain: chainService,
|
||||
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}}
|
||||
|
||||
b := util.NewBlobsidecar()
|
||||
b.Message.Slot = chainService.CurrentSlot() + 1
|
||||
chainService.BlockSlot = chainService.CurrentSlot() + 1
|
||||
bb := util.NewBeaconBlock()
|
||||
bb.Block.Slot = b.Message.Slot
|
||||
signedBb, err := blocks.NewSignedBeaconBlock(bb)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, db.SaveBlock(ctx, signedBb))
|
||||
r, err := signedBb.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
|
||||
b.Message.BlockParentRoot = r[:]
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
_, err = p.Encoding().EncodeGossip(buf, b)
|
||||
require.NoError(t, err)
|
||||
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(b)]
|
||||
digest, err := s.currentForkDigest()
|
||||
require.NoError(t, err)
|
||||
topic = s.addDigestAndIndexToTopic(topic, digest, 0)
|
||||
result, err := s.validateBlob(ctx, "", &pubsub.Message{
|
||||
Message: &pb.Message{
|
||||
Data: buf.Bytes(),
|
||||
Topic: &topic,
|
||||
}})
|
||||
require.ErrorContains(t, "parent block slot 1 greater or equal to blob slot 1", err)
|
||||
require.Equal(t, result, pubsub.ValidationReject)
|
||||
}
|
||||
|
||||
func TestValidateBlob_InvalidProposerSignature(t *testing.T) {
|
||||
db := dbtest.SetupDB(t)
|
||||
ctx := context.Background()
|
||||
p := p2ptest.NewTestP2P(t)
|
||||
chainService := &mock.ChainService{Genesis: time.Now(), FinalizedCheckPoint: ð.Checkpoint{}, DB: db}
|
||||
stateGen := stategen.New(db, doublylinkedtree.New())
|
||||
s := &Service{
|
||||
slotToPendingBlobs: gcache.New(time.Second, 2*time.Second),
|
||||
cfg: &config{
|
||||
p2p: p,
|
||||
initialSync: &mockSync.Sync{},
|
||||
chain: chainService,
|
||||
stateGen: stateGen,
|
||||
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}}
|
||||
|
||||
b := util.NewBlobsidecar()
|
||||
b.Message.Slot = chainService.CurrentSlot() + 1
|
||||
sk, err := bls.SecretKeyFromBytes(bytesutil.PadTo([]byte("sk"), 32))
|
||||
require.NoError(t, err)
|
||||
b.Signature = sk.Sign([]byte("data")).Marshal()
|
||||
|
||||
bb := util.NewBeaconBlock()
|
||||
signedBb, err := blocks.NewSignedBeaconBlock(bb)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, db.SaveBlock(ctx, signedBb))
|
||||
r, err := signedBb.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
|
||||
b.Message.BlockParentRoot = r[:]
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
_, err = p.Encoding().EncodeGossip(buf, b)
|
||||
require.NoError(t, err)
|
||||
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(b)]
|
||||
digest, err := s.currentForkDigest()
|
||||
require.NoError(t, err)
|
||||
topic = s.addDigestAndIndexToTopic(topic, digest, 0)
|
||||
result, err := s.validateBlob(ctx, "", &pubsub.Message{
|
||||
Message: &pb.Message{
|
||||
Data: buf.Bytes(),
|
||||
Topic: &topic,
|
||||
}})
|
||||
require.ErrorIs(t, signing.ErrSigFailedToVerify, err)
|
||||
require.Equal(t, result, pubsub.ValidationReject)
|
||||
}
|
||||
|
||||
func TestValidateBlob_AlreadySeenInCache(t *testing.T) {
|
||||
db := dbtest.SetupDB(t)
|
||||
ctx := context.Background()
|
||||
p := p2ptest.NewTestP2P(t)
|
||||
chainService := &mock.ChainService{Genesis: time.Now(), FinalizedCheckPoint: ð.Checkpoint{}, DB: db}
|
||||
stateGen := stategen.New(db, doublylinkedtree.New())
|
||||
s := &Service{
|
||||
slotToPendingBlobs: gcache.New(time.Second, 2*time.Second),
|
||||
seenBlobCache: lruwrpr.New(10),
|
||||
cfg: &config{
|
||||
p2p: p,
|
||||
initialSync: &mockSync.Sync{},
|
||||
chain: chainService,
|
||||
stateGen: stateGen,
|
||||
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}}
|
||||
|
||||
b := util.NewBlobsidecar()
|
||||
b.Message.Slot = chainService.CurrentSlot() + 1
|
||||
beaconState, privKeys := util.DeterministicGenesisState(t, 100)
|
||||
|
||||
bb := util.NewBeaconBlock()
|
||||
signedBb, err := blocks.NewSignedBeaconBlock(bb)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, db.SaveBlock(ctx, signedBb))
|
||||
r, err := signedBb.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, db.SaveState(ctx, beaconState, r))
|
||||
|
||||
b.Message.BlockParentRoot = r[:]
|
||||
b.Signature, err = signing.ComputeDomainAndSign(beaconState, 0, b.Message, params.BeaconConfig().DomainBlobSidecar, privKeys[0])
|
||||
require.NoError(t, err)
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
_, err = p.Encoding().EncodeGossip(buf, b)
|
||||
require.NoError(t, err)
|
||||
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(b)]
|
||||
digest, err := s.currentForkDigest()
|
||||
require.NoError(t, err)
|
||||
topic = s.addDigestAndIndexToTopic(topic, digest, 0)
|
||||
|
||||
s.setSeenBlobIndex(bytesutil.PadTo([]byte{}, 32), 0)
|
||||
result, err := s.validateBlob(ctx, "", &pubsub.Message{
|
||||
Message: &pb.Message{
|
||||
Data: buf.Bytes(),
|
||||
Topic: &topic,
|
||||
}})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, result, pubsub.ValidationIgnore)
|
||||
}
|
||||
|
||||
func TestValidateBlob_IncorrectProposerIndex(t *testing.T) {
|
||||
db := dbtest.SetupDB(t)
|
||||
ctx := context.Background()
|
||||
p := p2ptest.NewTestP2P(t)
|
||||
chainService := &mock.ChainService{Genesis: time.Now(), FinalizedCheckPoint: ð.Checkpoint{}, DB: db}
|
||||
stateGen := stategen.New(db, doublylinkedtree.New())
|
||||
s := &Service{
|
||||
slotToPendingBlobs: gcache.New(time.Second, 2*time.Second),
|
||||
seenBlobCache: lruwrpr.New(10),
|
||||
cfg: &config{
|
||||
p2p: p,
|
||||
initialSync: &mockSync.Sync{},
|
||||
chain: chainService,
|
||||
stateGen: stateGen,
|
||||
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}}
|
||||
|
||||
b := util.NewBlobsidecar()
|
||||
b.Message.Slot = chainService.CurrentSlot() + 1
|
||||
beaconState, privKeys := util.DeterministicGenesisState(t, 100)
|
||||
|
||||
bb := util.NewBeaconBlock()
|
||||
signedBb, err := blocks.NewSignedBeaconBlock(bb)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, db.SaveBlock(ctx, signedBb))
|
||||
r, err := signedBb.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, db.SaveState(ctx, beaconState, r))
|
||||
|
||||
b.Message.BlockParentRoot = r[:]
|
||||
b.Signature, err = signing.ComputeDomainAndSign(beaconState, 0, b.Message, params.BeaconConfig().DomainBlobSidecar, privKeys[0])
|
||||
require.NoError(t, err)
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
_, err = p.Encoding().EncodeGossip(buf, b)
|
||||
require.NoError(t, err)
|
||||
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(b)]
|
||||
digest, err := s.currentForkDigest()
|
||||
require.NoError(t, err)
|
||||
topic = s.addDigestAndIndexToTopic(topic, digest, 0)
|
||||
|
||||
result, err := s.validateBlob(ctx, "", &pubsub.Message{
|
||||
Message: &pb.Message{
|
||||
Data: buf.Bytes(),
|
||||
Topic: &topic,
|
||||
}})
|
||||
require.ErrorContains(t, "expected proposer index 21, got 0", err)
|
||||
require.Equal(t, result, pubsub.ValidationReject)
|
||||
}
|
||||
|
||||
func TestValidateBlob_EverythingPasses(t *testing.T) {
|
||||
db := dbtest.SetupDB(t)
|
||||
ctx := context.Background()
|
||||
p := p2ptest.NewTestP2P(t)
|
||||
chainService := &mock.ChainService{Genesis: time.Now(), FinalizedCheckPoint: ð.Checkpoint{}, DB: db}
|
||||
stateGen := stategen.New(db, doublylinkedtree.New())
|
||||
s := &Service{
|
||||
slotToPendingBlobs: gcache.New(time.Second, 2*time.Second),
|
||||
seenBlobCache: lruwrpr.New(10),
|
||||
cfg: &config{
|
||||
p2p: p,
|
||||
initialSync: &mockSync.Sync{},
|
||||
chain: chainService,
|
||||
stateGen: stateGen,
|
||||
clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot)}}
|
||||
|
||||
b := util.NewBlobsidecar()
|
||||
b.Message.Slot = chainService.CurrentSlot() + 1
|
||||
beaconState, privKeys := util.DeterministicGenesisState(t, 100)
|
||||
|
||||
bb := util.NewBeaconBlock()
|
||||
signedBb, err := blocks.NewSignedBeaconBlock(bb)
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, db.SaveBlock(ctx, signedBb))
|
||||
r, err := signedBb.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, db.SaveState(ctx, beaconState, r))
|
||||
|
||||
b.Message.BlockParentRoot = r[:]
|
||||
b.Message.ProposerIndex = 21
|
||||
b.Signature, err = signing.ComputeDomainAndSign(beaconState, 0, b.Message, params.BeaconConfig().DomainBlobSidecar, privKeys[21])
|
||||
require.NoError(t, err)
|
||||
|
||||
buf := new(bytes.Buffer)
|
||||
_, err = p.Encoding().EncodeGossip(buf, b)
|
||||
require.NoError(t, err)
|
||||
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(b)]
|
||||
digest, err := s.currentForkDigest()
|
||||
require.NoError(t, err)
|
||||
topic = s.addDigestAndIndexToTopic(topic, digest, 0)
|
||||
|
||||
result, err := s.validateBlob(ctx, "", &pubsub.Message{
|
||||
Message: &pb.Message{
|
||||
Data: buf.Bytes(),
|
||||
Topic: &topic,
|
||||
}})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, result, pubsub.ValidationAccept)
|
||||
}
|
||||
@@ -26,4 +26,5 @@ const (
|
||||
SyncCommitteeAggregationBytesLength = 16 // SyncCommitteeAggregationBytesLength defines the length of sync committee aggregate bytes.
|
||||
SyncAggregateSyncCommitteeBytesLength = 64 // SyncAggregateSyncCommitteeBytesLength defines the length of sync committee bytes in a sync aggregate.
|
||||
MaxWithdrawalsPerPayload = 16 // MaxWithdrawalsPerPayloadLength defines the maximum number of withdrawals that can be included in a payload.
|
||||
BlobLength = 131072 // BlobLength defines the byte length of a blob.
|
||||
)
|
||||
|
||||
@@ -26,4 +26,5 @@ const (
|
||||
SyncCommitteeAggregationBytesLength = 1 // SyncCommitteeAggregationBytesLength defines the sync committee aggregate bytes.
|
||||
SyncAggregateSyncCommitteeBytesLength = 4 // SyncAggregateSyncCommitteeBytesLength defines the length of sync committee bytes in a sync aggregate.
|
||||
MaxWithdrawalsPerPayload = 4 // MaxWithdrawalsPerPayloadLength defines the maximum number of withdrawals that can be included in a payload.
|
||||
BlobLength = 4 // BlobLength defines the byte length of a blob.
|
||||
)
|
||||
|
||||
@@ -121,6 +121,7 @@ type BeaconChainConfig struct {
|
||||
DomainApplicationMask [4]byte `yaml:"DOMAIN_APPLICATION_MASK" spec:"true"` // DomainApplicationMask defines the BLS signature domain for application mask.
|
||||
DomainApplicationBuilder [4]byte `yaml:"DOMAIN_APPLICATION_BUILDER" spec:"true"` // DomainApplicationBuilder defines the BLS signature domain for application builder.
|
||||
DomainBLSToExecutionChange [4]byte `yaml:"DOMAIN_BLS_TO_EXECUTION_CHANGE" spec:"true"` // DomainBLSToExecutionChange defines the BLS signature domain to change withdrawal addresses to ETH1 prefix
|
||||
DomainBlobSidecar [4]byte `yaml:"DOMAIN_BLOB_SIDECAR" spec:"true"` // DomainBlobSidecar defines the BLS signature domain for blob sidecar.
|
||||
|
||||
// Prysm constants.
|
||||
GweiPerEth uint64 // GweiPerEth is the amount of gwei corresponding to 1 eth.
|
||||
|
||||
@@ -31,6 +31,7 @@ var mainnetNetworkConfig = &NetworkConfig{
|
||||
MaxChunkSize: 1 << 20, // 1 MiB
|
||||
MaxChunkSizeBellatrix: 10 * 1 << 20, // 10 MiB
|
||||
AttestationSubnetCount: 64,
|
||||
BlobsidecarSubnetCount: 4,
|
||||
AttestationPropagationSlotRange: 32,
|
||||
MaxRequestBlocks: 1 << 10, // 1024
|
||||
TtfbTimeout: 5 * time.Second,
|
||||
@@ -176,6 +177,7 @@ var mainnetBeaconConfig = &BeaconChainConfig{
|
||||
DomainApplicationMask: bytesutil.Uint32ToBytes4(0x00000001),
|
||||
DomainApplicationBuilder: bytesutil.Uint32ToBytes4(0x00000001),
|
||||
DomainBLSToExecutionChange: bytesutil.Uint32ToBytes4(0x0A000000),
|
||||
DomainBlobSidecar: bytesutil.Uint32ToBytes4(0x0B000000),
|
||||
|
||||
// Prysm constants.
|
||||
GweiPerEth: 1000000000,
|
||||
|
||||
@@ -14,6 +14,7 @@ type NetworkConfig struct {
|
||||
MaxChunkSize uint64 `yaml:"MAX_CHUNK_SIZE"` // MaxChunkSize is the maximum allowed size of uncompressed req/resp chunked responses.
|
||||
MaxChunkSizeBellatrix uint64 `yaml:"MAX_CHUNK_SIZE_BELLATRIX"` // MaxChunkSizeBellatrix is the maximum allowed size of uncompressed req/resp chunked responses after the bellatrix epoch.
|
||||
AttestationSubnetCount uint64 `yaml:"ATTESTATION_SUBNET_COUNT"` // AttestationSubnetCount is the number of attestation subnets used in the gossipsub protocol.
|
||||
BlobsidecarSubnetCount uint64 `yaml:"BLOBSIDECAR_SUBNET_COUNT"` // BlobsidecarSubnetCount is the number of blobsidecar subnets used in the gossipsub protocol.
|
||||
AttestationPropagationSlotRange primitives.Slot `yaml:"ATTESTATION_PROPAGATION_SLOT_RANGE"` // AttestationPropagationSlotRange is the maximum number of slots during which an attestation can be propagated.
|
||||
MaxRequestBlocks uint64 `yaml:"MAX_REQUEST_BLOCKS"` // MaxRequestBlocks is the maximum number of blocks in a single request.
|
||||
TtfbTimeout time.Duration `yaml:"TTFB_TIMEOUT"` // TtfbTimeout is the maximum time to wait for first byte of request response (time-to-first-byte).
|
||||
|
||||
@@ -162,7 +162,7 @@ func (executionPayload) WithdrawalsRoot() ([]byte, error) {
|
||||
|
||||
// ExcessDataGas --
|
||||
func (e executionPayload) ExcessDataGas() ([]byte, error) {
|
||||
return nil, consensus_types.ErrUnsupportedGetter
|
||||
return nil, consensus_types.ErrUnsupportedField
|
||||
}
|
||||
|
||||
// PbBellatrix --
|
||||
@@ -328,7 +328,7 @@ func (executionPayloadHeader) WithdrawalsRoot() ([]byte, error) {
|
||||
|
||||
// ExcessDataGas --
|
||||
func (e executionPayloadHeader) ExcessDataGas() ([]byte, error) {
|
||||
return nil, consensus_types.ErrUnsupportedGetter
|
||||
return nil, consensus_types.ErrUnsupportedField
|
||||
}
|
||||
|
||||
// PbV2 --
|
||||
@@ -523,7 +523,7 @@ func (executionPayloadCapella) WithdrawalsRoot() ([]byte, error) {
|
||||
|
||||
// ExcessDataGas returns error for executionPayloadCapella.
|
||||
func (e executionPayloadCapella) ExcessDataGas() ([]byte, error) {
|
||||
return nil, consensus_types.ErrUnsupportedGetter
|
||||
return nil, consensus_types.ErrUnsupportedField
|
||||
}
|
||||
|
||||
// PbV2 --
|
||||
@@ -690,7 +690,7 @@ func (e executionPayloadHeaderCapella) WithdrawalsRoot() ([]byte, error) {
|
||||
|
||||
// ExcessDataGas returns error for executionPayloadHeaderCapella.
|
||||
func (e executionPayloadHeaderCapella) ExcessDataGas() ([]byte, error) {
|
||||
return nil, consensus_types.ErrUnsupportedGetter
|
||||
return nil, consensus_types.ErrUnsupportedField
|
||||
}
|
||||
|
||||
// PbV2 --
|
||||
@@ -970,7 +970,7 @@ func (e executionPayloadHeaderDeneb) BlockHash() []byte {
|
||||
|
||||
// Transactions --
|
||||
func (executionPayloadHeaderDeneb) Transactions() ([][]byte, error) {
|
||||
return nil, consensus_types.ErrUnsupportedGetter
|
||||
return nil, consensus_types.ErrUnsupportedField
|
||||
}
|
||||
|
||||
// TransactionsRoot --
|
||||
@@ -980,7 +980,7 @@ func (e executionPayloadHeaderDeneb) TransactionsRoot() ([]byte, error) {
|
||||
|
||||
// Withdrawals --
|
||||
func (e executionPayloadHeaderDeneb) Withdrawals() ([]*enginev1.Withdrawal, error) {
|
||||
return nil, consensus_types.ErrUnsupportedGetter
|
||||
return nil, consensus_types.ErrUnsupportedField
|
||||
}
|
||||
|
||||
// WitdrawalsRoot --
|
||||
@@ -994,12 +994,12 @@ func (e executionPayloadHeaderDeneb) ExcessDataGas() ([]byte, error) {
|
||||
|
||||
// PbBellatrix --
|
||||
func (e executionPayloadHeaderDeneb) PbBellatrix() (*enginev1.ExecutionPayload, error) {
|
||||
return nil, consensus_types.ErrUnsupportedGetter
|
||||
return nil, consensus_types.ErrUnsupportedField
|
||||
}
|
||||
|
||||
// PbCapella --
|
||||
func (e executionPayloadHeaderDeneb) PbCapella() (*enginev1.ExecutionPayloadCapella, error) {
|
||||
return nil, consensus_types.ErrUnsupportedGetter
|
||||
return nil, consensus_types.ErrUnsupportedField
|
||||
}
|
||||
|
||||
func (e executionPayloadHeaderDeneb) ValueInGwei() (uint64, error) {
|
||||
@@ -1140,7 +1140,7 @@ func (e executionPayloadDeneb) Transactions() ([][]byte, error) {
|
||||
|
||||
// TransactionsRoot --
|
||||
func (e executionPayloadDeneb) TransactionsRoot() ([]byte, error) {
|
||||
return nil, consensus_types.ErrUnsupportedGetter
|
||||
return nil, consensus_types.ErrUnsupportedField
|
||||
}
|
||||
|
||||
// Withdrawals --
|
||||
@@ -1150,7 +1150,7 @@ func (e executionPayloadDeneb) Withdrawals() ([]*enginev1.Withdrawal, error) {
|
||||
|
||||
// WithdrawalsRoot --
|
||||
func (e executionPayloadDeneb) WithdrawalsRoot() ([]byte, error) {
|
||||
return nil, consensus_types.ErrUnsupportedGetter
|
||||
return nil, consensus_types.ErrUnsupportedField
|
||||
}
|
||||
|
||||
func (e executionPayloadDeneb) ExcessDataGas() ([]byte, error) {
|
||||
@@ -1159,12 +1159,12 @@ func (e executionPayloadDeneb) ExcessDataGas() ([]byte, error) {
|
||||
|
||||
// PbBellatrix --
|
||||
func (e executionPayloadDeneb) PbBellatrix() (*enginev1.ExecutionPayload, error) {
|
||||
return nil, consensus_types.ErrUnsupportedGetter
|
||||
return nil, consensus_types.ErrUnsupportedField
|
||||
}
|
||||
|
||||
// PbCapella --
|
||||
func (e executionPayloadDeneb) PbCapella() (*enginev1.ExecutionPayloadCapella, error) {
|
||||
return nil, consensus_types.ErrUnsupportedGetter
|
||||
return nil, consensus_types.ErrUnsupportedField
|
||||
}
|
||||
|
||||
func (e executionPayloadDeneb) ValueInGwei() (uint64, error) {
|
||||
|
||||
@@ -77,6 +77,19 @@ func NewBeaconBlock() *ethpb.SignedBeaconBlock {
|
||||
}
|
||||
}
|
||||
|
||||
func NewBlobsidecar() *ethpb.SignedBlobSidecar {
|
||||
return ðpb.SignedBlobSidecar{
|
||||
Message: ðpb.BlobSidecar{
|
||||
BlockRoot: make([]byte, fieldparams.RootLength),
|
||||
BlockParentRoot: make([]byte, fieldparams.RootLength),
|
||||
Blob: make([]byte, fieldparams.BlobLength),
|
||||
KzgCommitment: make([]byte, fieldparams.BLSPubkeyLength),
|
||||
KzgProof: make([]byte, fieldparams.BLSPubkeyLength),
|
||||
},
|
||||
Signature: make([]byte, fieldparams.BLSSignatureLength),
|
||||
}
|
||||
}
|
||||
|
||||
// GenerateFullBlock generates a fully valid block with the requested parameters.
|
||||
// Use BlockGenConfig to declare the conditions you would like the block generated under.
|
||||
func GenerateFullBlock(
|
||||
|
||||
Reference in New Issue
Block a user