Diverse log improvements, comment additions and small refactors. (#14658)

* `logProposedBlock`: Fix log.

Before, the value of the pointer to the function were printed for `blockNumber`
instead of the block number itself.

* Add blob prefix before sidecars.

In order to prepare for data columns sidecars.

* Verification: Add log prefix.

* `validate_aggregate_proof.go`: Add comments.

* `blobSubscriber`: Fix error message.

* `registerHandlers`: Rename, add comments and little refactor.

* Remove duplicate `pb` vs. `ethpb` import.

* `rpc_ping.go`: Factorize / Add comments.

* `blobSidecarsByRangeRPCHandler`: Do not write error response if rate limited.

* `sendRecentBeaconBlocksRequest` ==> `sendBeaconBlocksRequest`.

The function itself does not know anything about the age of the beacon block.

* `beaconBlocksByRangeRPCHandler`: Refactor and add logs.

* `retentionSeconds` ==> `retentionDuration`.

* `oneEpoch`: Add documentation.

* `TestProposer_ProposeBlock_OK`: Improve error message.

* `getLocalPayloadFromEngine`: Tiny refactor.

* `eth1DataMajorityVote`: Improve log message.

* Implement `ConvertPeerIDToNodeID`and do note generate random private key if peerDAS is enabled.

* Remove useless `_`.

* `parsePeersEnr`: Fix error mesages.

* `ShouldOverrideFCU`: Fix error message.

* `blocks.go`: Minor comments improvements.

* CI: Upgrade golanci and enable spancheck.

* `ConvertPeerIDToNodeID`: Add godoc comment.

* Update CHANGELOG.md

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

* Update beacon-chain/sync/initial-sync/service_test.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

* Update beacon-chain/sync/rpc_beacon_blocks_by_range.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

* Update beacon-chain/sync/rpc_blob_sidecars_by_range.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

* Update beacon-chain/sync/rpc_ping.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

* Remove trailing whitespace in godoc.

---------

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>
This commit is contained in:
Manu NALEPA
2024-11-25 10:22:33 +01:00
committed by GitHub
parent 415a42a4aa
commit 258908d50e
42 changed files with 421 additions and 246 deletions

View File

@@ -54,7 +54,7 @@ jobs:
- name: Golangci-lint
uses: golangci/golangci-lint-action@v5
with:
version: v1.55.2
version: v1.56.1
args: --config=.golangci.yml --out-${NO_FUTURE}format colored-line-number
build:

View File

@@ -73,6 +73,7 @@ linters:
- promlinter
- protogetter
- revive
- spancheck
- staticcheck
- stylecheck
- tagalign

View File

@@ -90,6 +90,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- Fix panic on attestation interface since we call data before validation
- corrects nil check on some interface attestation types
- temporary solution to handling electra attesation and attester_slashing events. [pr](14655)
- Diverse log improvements and comment additions.
### Security

View File

@@ -23,10 +23,10 @@ import (
bolt "go.etcd.io/bbolt"
)
// used to represent errors for inconsistent slot ranges.
// Used to represent errors for inconsistent slot ranges.
var errInvalidSlotRange = errors.New("invalid end slot and start slot provided")
// Block retrieval by root.
// Block retrieval by root. Return nil if block is not found.
func (s *Store) Block(ctx context.Context, blockRoot [32]byte) (interfaces.ReadOnlySignedBeaconBlock, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.Block")
defer span.End()

View File

@@ -53,7 +53,7 @@ func (f *ForkChoice) ShouldOverrideFCU() (override bool) {
// Only reorg blocks that arrive late
early, err := head.arrivedEarly(f.store.genesisTime)
if err != nil {
log.WithError(err).Error("could not check if block arrived early")
log.WithError(err).Error("Could not check if block arrived early")
return
}
if early {

View File

@@ -75,6 +75,8 @@ go_library(
"//runtime/version:go_default_library",
"//time:go_default_library",
"//time/slots:go_default_library",
"@com_github_btcsuite_btcd_btcec_v2//:go_default_library",
"@com_github_ethereum_go_ethereum//crypto:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/discover:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",

View File

@@ -165,14 +165,14 @@ func (s *Service) pubsubOptions() []pubsub.Option {
func parsePeersEnr(peers []string) ([]peer.AddrInfo, error) {
addrs, err := PeersFromStringAddrs(peers)
if err != nil {
return nil, fmt.Errorf("Cannot convert peers raw ENRs into multiaddresses: %w", err)
return nil, fmt.Errorf("cannot convert peers raw ENRs into multiaddresses: %w", err)
}
if len(addrs) == 0 {
return nil, fmt.Errorf("Converting peers raw ENRs into multiaddresses resulted in an empty list")
return nil, fmt.Errorf("converting peers raw ENRs into multiaddresses resulted in an empty list")
}
directAddrInfos, err := peer.AddrInfosFromP2pAddrs(addrs...)
if err != nil {
return nil, fmt.Errorf("Cannot convert peers multiaddresses into AddrInfos: %w", err)
return nil, fmt.Errorf("cannot convert peers multiaddresses into AddrInfos: %w", err)
}
return directAddrInfos, nil
}

View File

@@ -27,148 +27,148 @@ func NewFuzzTestP2P() *FakeP2P {
}
// Encoding -- fake.
func (_ *FakeP2P) Encoding() encoder.NetworkEncoding {
func (*FakeP2P) Encoding() encoder.NetworkEncoding {
return &encoder.SszNetworkEncoder{}
}
// AddConnectionHandler -- fake.
func (_ *FakeP2P) AddConnectionHandler(_, _ func(ctx context.Context, id peer.ID) error) {
func (*FakeP2P) AddConnectionHandler(_, _ func(ctx context.Context, id peer.ID) error) {
}
// AddDisconnectionHandler -- fake.
func (_ *FakeP2P) AddDisconnectionHandler(_ func(ctx context.Context, id peer.ID) error) {
func (*FakeP2P) AddDisconnectionHandler(_ func(ctx context.Context, id peer.ID) error) {
}
// AddPingMethod -- fake.
func (_ *FakeP2P) AddPingMethod(_ func(ctx context.Context, id peer.ID) error) {
func (*FakeP2P) AddPingMethod(_ func(ctx context.Context, id peer.ID) error) {
}
// PeerID -- fake.
func (_ *FakeP2P) PeerID() peer.ID {
func (*FakeP2P) PeerID() peer.ID {
return "fake"
}
// ENR returns the enr of the local peer.
func (_ *FakeP2P) ENR() *enr.Record {
func (*FakeP2P) ENR() *enr.Record {
return new(enr.Record)
}
// DiscoveryAddresses -- fake
func (_ *FakeP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
func (*FakeP2P) DiscoveryAddresses() ([]multiaddr.Multiaddr, error) {
return nil, nil
}
// FindPeersWithSubnet mocks the p2p func.
func (_ *FakeP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) {
func (*FakeP2P) FindPeersWithSubnet(_ context.Context, _ string, _ uint64, _ int) (bool, error) {
return false, nil
}
// RefreshENR mocks the p2p func.
func (_ *FakeP2P) RefreshENR() {}
func (*FakeP2P) RefreshENR() {}
// LeaveTopic -- fake.
func (_ *FakeP2P) LeaveTopic(_ string) error {
func (*FakeP2P) LeaveTopic(_ string) error {
return nil
}
// Metadata -- fake.
func (_ *FakeP2P) Metadata() metadata.Metadata {
func (*FakeP2P) Metadata() metadata.Metadata {
return nil
}
// Peers -- fake.
func (_ *FakeP2P) Peers() *peers.Status {
func (*FakeP2P) Peers() *peers.Status {
return nil
}
// PublishToTopic -- fake.
func (_ *FakeP2P) PublishToTopic(_ context.Context, _ string, _ []byte, _ ...pubsub.PubOpt) error {
func (*FakeP2P) PublishToTopic(_ context.Context, _ string, _ []byte, _ ...pubsub.PubOpt) error {
return nil
}
// Send -- fake.
func (_ *FakeP2P) Send(_ context.Context, _ interface{}, _ string, _ peer.ID) (network.Stream, error) {
func (*FakeP2P) Send(_ context.Context, _ interface{}, _ string, _ peer.ID) (network.Stream, error) {
return nil, nil
}
// PubSub -- fake.
func (_ *FakeP2P) PubSub() *pubsub.PubSub {
func (*FakeP2P) PubSub() *pubsub.PubSub {
return nil
}
// MetadataSeq -- fake.
func (_ *FakeP2P) MetadataSeq() uint64 {
func (*FakeP2P) MetadataSeq() uint64 {
return 0
}
// SetStreamHandler -- fake.
func (_ *FakeP2P) SetStreamHandler(_ string, _ network.StreamHandler) {
func (*FakeP2P) SetStreamHandler(_ string, _ network.StreamHandler) {
}
// SubscribeToTopic -- fake.
func (_ *FakeP2P) SubscribeToTopic(_ string, _ ...pubsub.SubOpt) (*pubsub.Subscription, error) {
func (*FakeP2P) SubscribeToTopic(_ string, _ ...pubsub.SubOpt) (*pubsub.Subscription, error) {
return nil, nil
}
// JoinTopic -- fake.
func (_ *FakeP2P) JoinTopic(_ string, _ ...pubsub.TopicOpt) (*pubsub.Topic, error) {
func (*FakeP2P) JoinTopic(_ string, _ ...pubsub.TopicOpt) (*pubsub.Topic, error) {
return nil, nil
}
// Host -- fake.
func (_ *FakeP2P) Host() host.Host {
func (*FakeP2P) Host() host.Host {
return nil
}
// Disconnect -- fake.
func (_ *FakeP2P) Disconnect(_ peer.ID) error {
func (*FakeP2P) Disconnect(_ peer.ID) error {
return nil
}
// Broadcast -- fake.
func (_ *FakeP2P) Broadcast(_ context.Context, _ proto.Message) error {
func (*FakeP2P) Broadcast(_ context.Context, _ proto.Message) error {
return nil
}
// BroadcastAttestation -- fake.
func (_ *FakeP2P) BroadcastAttestation(_ context.Context, _ uint64, _ ethpb.Att) error {
func (*FakeP2P) BroadcastAttestation(_ context.Context, _ uint64, _ ethpb.Att) error {
return nil
}
// BroadcastSyncCommitteeMessage -- fake.
func (_ *FakeP2P) BroadcastSyncCommitteeMessage(_ context.Context, _ uint64, _ *ethpb.SyncCommitteeMessage) error {
func (*FakeP2P) BroadcastSyncCommitteeMessage(_ context.Context, _ uint64, _ *ethpb.SyncCommitteeMessage) error {
return nil
}
// BroadcastBlob -- fake.
func (_ *FakeP2P) BroadcastBlob(_ context.Context, _ uint64, _ *ethpb.BlobSidecar) error {
func (*FakeP2P) BroadcastBlob(_ context.Context, _ uint64, _ *ethpb.BlobSidecar) error {
return nil
}
// InterceptPeerDial -- fake.
func (_ *FakeP2P) InterceptPeerDial(peer.ID) (allow bool) {
func (*FakeP2P) InterceptPeerDial(peer.ID) (allow bool) {
return true
}
// InterceptAddrDial -- fake.
func (_ *FakeP2P) InterceptAddrDial(peer.ID, multiaddr.Multiaddr) (allow bool) {
func (*FakeP2P) InterceptAddrDial(peer.ID, multiaddr.Multiaddr) (allow bool) {
return true
}
// InterceptAccept -- fake.
func (_ *FakeP2P) InterceptAccept(_ network.ConnMultiaddrs) (allow bool) {
func (*FakeP2P) InterceptAccept(_ network.ConnMultiaddrs) (allow bool) {
return true
}
// InterceptSecured -- fake.
func (_ *FakeP2P) InterceptSecured(network.Direction, peer.ID, network.ConnMultiaddrs) (allow bool) {
func (*FakeP2P) InterceptSecured(network.Direction, peer.ID, network.ConnMultiaddrs) (allow bool) {
return true
}
// InterceptUpgraded -- fake.
func (_ *FakeP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.DisconnectReason) {
func (*FakeP2P) InterceptUpgraded(network.Conn) (allow bool, reason control.DisconnectReason) {
return true, 0
}

View File

@@ -18,12 +18,12 @@ type MockHost struct {
}
// ID --
func (_ *MockHost) ID() peer.ID {
func (*MockHost) ID() peer.ID {
return ""
}
// Peerstore --
func (_ *MockHost) Peerstore() peerstore.Peerstore {
func (*MockHost) Peerstore() peerstore.Peerstore {
return nil
}
@@ -33,46 +33,46 @@ func (m *MockHost) Addrs() []ma.Multiaddr {
}
// Network --
func (_ *MockHost) Network() network.Network {
func (*MockHost) Network() network.Network {
return nil
}
// Mux --
func (_ *MockHost) Mux() protocol.Switch {
func (*MockHost) Mux() protocol.Switch {
return nil
}
// Connect --
func (_ *MockHost) Connect(_ context.Context, _ peer.AddrInfo) error {
func (*MockHost) Connect(_ context.Context, _ peer.AddrInfo) error {
return nil
}
// SetStreamHandler --
func (_ *MockHost) SetStreamHandler(_ protocol.ID, _ network.StreamHandler) {}
func (*MockHost) SetStreamHandler(_ protocol.ID, _ network.StreamHandler) {}
// SetStreamHandlerMatch --
func (_ *MockHost) SetStreamHandlerMatch(protocol.ID, func(id protocol.ID) bool, network.StreamHandler) {
func (*MockHost) SetStreamHandlerMatch(protocol.ID, func(id protocol.ID) bool, network.StreamHandler) {
}
// RemoveStreamHandler --
func (_ *MockHost) RemoveStreamHandler(_ protocol.ID) {}
func (*MockHost) RemoveStreamHandler(_ protocol.ID) {}
// NewStream --
func (_ *MockHost) NewStream(_ context.Context, _ peer.ID, _ ...protocol.ID) (network.Stream, error) {
func (*MockHost) NewStream(_ context.Context, _ peer.ID, _ ...protocol.ID) (network.Stream, error) {
return nil, nil
}
// Close --
func (_ *MockHost) Close() error {
func (*MockHost) Close() error {
return nil
}
// ConnManager --
func (_ *MockHost) ConnManager() connmgr.ConnManager {
func (*MockHost) ConnManager() connmgr.ConnManager {
return nil
}
// EventBus --
func (_ *MockHost) EventBus() event.Bus {
func (*MockHost) EventBus() event.Bus {
return nil
}

View File

@@ -12,10 +12,15 @@ import (
"path"
"time"
"github.com/btcsuite/btcd/btcec/v2"
gCrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper"
ecdsaprysm "github.com/prysmaticlabs/prysm/v5/crypto/ecdsa"
"github.com/prysmaticlabs/prysm/v5/io/file"
@@ -62,6 +67,7 @@ func privKey(cfg *Config) (*ecdsa.PrivateKey, error) {
}
if defaultKeysExist {
log.WithField("filePath", defaultKeyPath).Info("Reading static P2P private key from a file. To generate a new random private key at every start, please remove this file.")
return privKeyFromFile(defaultKeyPath)
}
@@ -71,8 +77,8 @@ func privKey(cfg *Config) (*ecdsa.PrivateKey, error) {
return nil, err
}
// If the StaticPeerID flag is not set, return the private key.
if !cfg.StaticPeerID {
// If the StaticPeerID flag is not set and if peerDAS is not enabled, return the private key.
if !(cfg.StaticPeerID || params.PeerDASEnabled()) {
return ecdsaprysm.ConvertFromInterfacePrivKey(priv)
}
@@ -89,7 +95,7 @@ func privKey(cfg *Config) (*ecdsa.PrivateKey, error) {
return nil, err
}
log.Info("Wrote network key to file")
log.WithField("path", defaultKeyPath).Info("Wrote network key to file")
// Read the key from the defaultKeyPath file just written
// for the strongest guarantee that the next start will be the same as this one.
return privKeyFromFile(defaultKeyPath)
@@ -173,3 +179,27 @@ func verifyConnectivity(addr string, port uint, protocol string) {
}
}
}
// ConvertPeerIDToNodeID converts a peer ID (libp2p) to a node ID (devp2p).
func ConvertPeerIDToNodeID(pid peer.ID) (enode.ID, error) {
// Retrieve the public key object of the peer under "crypto" form.
pubkeyObjCrypto, err := pid.ExtractPublicKey()
if err != nil {
return [32]byte{}, errors.Wrapf(err, "extract public key from peer ID `%s`", pid)
}
// Extract the bytes representation of the public key.
compressedPubKeyBytes, err := pubkeyObjCrypto.Raw()
if err != nil {
return [32]byte{}, errors.Wrap(err, "public key raw")
}
// Retrieve the public key object of the peer under "SECP256K1" form.
pubKeyObjSecp256k1, err := btcec.ParsePubKey(compressedPubKeyBytes)
if err != nil {
return [32]byte{}, errors.Wrap(err, "parse public key")
}
newPubkey := &ecdsa.PublicKey{Curve: gCrypto.S256(), X: pubKeyObjSecp256k1.X(), Y: pubKeyObjSecp256k1.Y()}
return enode.PubkeyToIDV4(newPubkey), nil
}

View File

@@ -6,6 +6,7 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/testing/assert"
"github.com/prysmaticlabs/prysm/v5/testing/require"
@@ -64,3 +65,19 @@ func TestSerializeENR(t *testing.T) {
assert.ErrorContains(t, "could not serialize nil record", err)
})
}
func TestConvertPeerIDToNodeID(t *testing.T) {
const (
peerIDStr = "16Uiu2HAmRrhnqEfybLYimCiAYer2AtZKDGamQrL1VwRCyeh2YiFc"
expectedNodeIDStr = "eed26c5d2425ab95f57246a5dca87317c41cacee4bcafe8bbe57e5965527c290"
)
peerID, err := peer.Decode(peerIDStr)
require.NoError(t, err)
actualNodeID, err := ConvertPeerIDToNodeID(peerID)
require.NoError(t, err)
actualNodeIDStr := actualNodeID.String()
require.Equal(t, expectedNodeIDStr, actualNodeIDStr)
}

View File

@@ -339,7 +339,7 @@ func (vs *Server) handleBlindedBlock(ctx context.Context, block interfaces.Signe
sidecars, err := unblindBlobsSidecars(copiedBlock, bundle)
if err != nil {
return nil, nil, errors.Wrap(err, "unblind sidecars failed")
return nil, nil, errors.Wrap(err, "unblind blobs sidecars: commitment value doesn't match block")
}
return copiedBlock, sidecars, nil

View File

@@ -54,7 +54,7 @@ func (vs *Server) eth1DataMajorityVote(ctx context.Context, beaconState state.Be
// by ETH1_FOLLOW_DISTANCE. The head state should maintain the same ETH1Data until this condition has passed, so
// trust the existing head for the right eth1 vote until we can get a meaningful value from the deposit contract.
if latestValidTime < genesisTime+followDistanceSeconds {
log.WithField("genesisTime", genesisTime).WithField("latestValidTime", latestValidTime).Warn("voting period before genesis + follow distance, using eth1data from head")
log.WithField("genesisTime", genesisTime).WithField("latestValidTime", latestValidTime).Warn("Voting period before genesis + follow distance, using eth1data from head")
return vs.HeadFetcher.HeadETH1Data(), nil
}

View File

@@ -84,7 +84,6 @@ func (vs *Server) getLocalPayloadFromEngine(
}
setFeeRecipientIfBurnAddress(&val)
var err error
if ok && payloadId != [8]byte{} {
// Payload ID is cache hit. Return the cached payload ID.
var pid primitives.PayloadID
@@ -102,7 +101,7 @@ func (vs *Server) getLocalPayloadFromEngine(
return nil, errors.Wrap(err, "could not get cached payload from execution client")
}
}
log.WithFields(logFields).Debug("payload ID cache miss")
log.WithFields(logFields).Debug("Payload ID cache miss")
parentHash, err := vs.getParentBlockHash(ctx, st, slot)
switch {
case errors.Is(err, errActivationNotReached) || errors.Is(err, errNoTerminalBlockHash):
@@ -191,7 +190,7 @@ func (vs *Server) getLocalPayloadFromEngine(
}
warnIfFeeRecipientDiffers(val.FeeRecipient[:], res.ExecutionData.FeeRecipient())
log.WithField("value", res.Bid).Debug("received execution payload from local engine")
log.WithField("value", res.Bid).Debug("Received execution payload from local engine")
return res, nil
}

View File

@@ -912,7 +912,7 @@ func TestProposer_ProposeBlock_OK(t *testing.T) {
return &ethpb.GenericSignedBeaconBlock{Block: blk}
},
useBuilder: true,
err: "unblind sidecars failed: commitment value doesn't match block",
err: "unblind blobs sidecars: commitment value doesn't match block",
},
{
name: "electra block no blob",

View File

@@ -107,7 +107,7 @@ type blobBatchVerifier struct {
func (bbv *blobBatchVerifier) newVerifier(rb blocks.ROBlob) verification.BlobVerifier {
m := bbv.verifiers[rb.BlockRoot()]
m[rb.Index] = bbv.newBlobVerifier(rb, verification.BackfillSidecarRequirements)
m[rb.Index] = bbv.newBlobVerifier(rb, verification.BackfillBlobSidecarRequirements)
bbv.verifiers[rb.BlockRoot()] = m
return m[rb.Index]
}

View File

@@ -388,6 +388,7 @@ func TestService_CheckForPreviousEpochFork(t *testing.T) {
}
}
// oneEpoch returns the duration of one epoch.
func oneEpoch() time.Duration {
return time.Duration(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot)) * time.Second
}

View File

@@ -172,7 +172,7 @@ func (s *Service) processFetchedDataRegSync(
if len(bwb) == 0 {
return
}
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncSidecarRequirements)
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncBlobSidecarRequirements)
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
batchFields := logrus.Fields{
"firstSlot": data.bwb[0].Block.Block().Slot(),
@@ -331,7 +331,7 @@ func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time,
errParentDoesNotExist, first.Block().ParentRoot(), first.Block().Slot())
}
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncSidecarRequirements)
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncBlobSidecarRequirements)
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
s.logBatchSyncStatus(genesis, first, len(bwb))
for _, bb := range bwb {

View File

@@ -340,7 +340,7 @@ func (s *Service) fetchOriginBlobs(pids []peer.ID) error {
if len(sidecars) != len(req) {
continue
}
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncSidecarRequirements)
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.InitsyncBlobSidecarRequirements)
avs := das.NewLazilyPersistentStore(s.cfg.BlobStorage, bv)
current := s.clock.CurrentSlot()
if err := avs.Persist(current, sidecars...); err != nil {

View File

@@ -495,8 +495,8 @@ func TestOriginOutsideRetention(t *testing.T) {
bdb := dbtest.SetupDB(t)
genesis := time.Unix(0, 0)
secsPerEpoch := params.BeaconConfig().SecondsPerSlot * uint64(params.BeaconConfig().SlotsPerEpoch)
retentionSeconds := time.Second * time.Duration(uint64(params.BeaconConfig().MinEpochsForBlobsSidecarsRequest+1)*secsPerEpoch)
outsideRetention := genesis.Add(retentionSeconds)
retentionPeriod := time.Second * time.Duration(uint64(params.BeaconConfig().MinEpochsForBlobsSidecarsRequest+1)*secsPerEpoch)
outsideRetention := genesis.Add(retentionPeriod)
now := func() time.Time {
return outsideRetention
}

View File

@@ -315,7 +315,7 @@ func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, ra
if uint64(len(roots)) > maxReqBlock {
req = roots[:maxReqBlock]
}
if err := s.sendRecentBeaconBlocksRequest(ctx, &req, pid); err != nil {
if err := s.sendBeaconBlocksRequest(ctx, &req, pid); err != nil {
tracing.AnnotateError(span, err)
log.WithError(err).Debug("Could not send recent block request")
}

View File

@@ -16,6 +16,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
pb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
)
// beaconBlocksByRangeRPCHandler looks up the request blocks from the database from a given start block.
@@ -26,15 +27,23 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
defer cancel()
SetRPCStreamDeadlines(stream)
remotePeer := stream.Conn().RemotePeer()
m, ok := msg.(*pb.BeaconBlocksByRangeRequest)
if !ok {
return errors.New("message is not type *pb.BeaconBlockByRangeRequest")
}
log.WithField("startSlot", m.StartSlot).WithField("count", m.Count).Debug("Serving block by range request")
log.WithFields(logrus.Fields{
"startSlot": m.StartSlot,
"count": m.Count,
"peer": remotePeer,
}).Debug("Serving block by range request")
rp, err := validateRangeRequest(m, s.cfg.clock.CurrentSlot())
if err != nil {
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(remotePeer)
tracing.AnnotateError(span, err)
return err
}
@@ -50,12 +59,12 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
if err != nil {
return err
}
remainingBucketCapacity := blockLimiter.Remaining(stream.Conn().RemotePeer().String())
remainingBucketCapacity := blockLimiter.Remaining(remotePeer.String())
span.SetAttributes(
trace.Int64Attribute("start", int64(rp.start)), // lint:ignore uintcast -- This conversion is OK for tracing.
trace.Int64Attribute("end", int64(rp.end)), // lint:ignore uintcast -- This conversion is OK for tracing.
trace.Int64Attribute("count", int64(m.Count)),
trace.StringAttribute("peer", stream.Conn().RemotePeer().String()),
trace.StringAttribute("peer", remotePeer.String()),
trace.Int64Attribute("remaining_capacity", remainingBucketCapacity),
)
@@ -82,12 +91,19 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
}
rpcBlocksByRangeResponseLatency.Observe(float64(time.Since(batchStart).Milliseconds()))
}
if err := batch.error(); err != nil {
log.WithError(err).Debug("error in BlocksByRange batch")
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
log.WithError(err).Debug("Serving block by range request - BlocksByRange batch")
// If a rate limit is hit, it means an error response has already been sent and the stream has been closed.
if !errors.Is(err, p2ptypes.ErrRateLimited) {
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
}
tracing.AnnotateError(span, err)
return err
}
closeStream(stream, log)
return nil
}

View File

@@ -20,9 +20,9 @@ import (
"github.com/prysmaticlabs/prysm/v5/time/slots"
)
// sendRecentBeaconBlocksRequest sends a recent beacon blocks request to a peer to get
// sendBeaconBlocksRequest sends a recent beacon blocks request to a peer to get
// those corresponding blocks from that peer.
func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, requests *types.BeaconBlockByRootsReq, id peer.ID) error {
func (s *Service) sendBeaconBlocksRequest(ctx context.Context, requests *types.BeaconBlockByRootsReq, id peer.ID) error {
ctx, cancel := context.WithTimeout(ctx, respTimeout)
defer cancel()
@@ -151,7 +151,7 @@ func (s *Service) sendAndSaveBlobSidecars(ctx context.Context, request types.Blo
if len(sidecars) != len(request) {
return fmt.Errorf("received %d blob sidecars, expected %d for RPC", len(sidecars), len(request))
}
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.PendingQueueSidecarRequirements)
bv := verification.NewBlobBatchVerifier(s.newBlobVerifier, verification.PendingQueueBlobSidecarRequirements)
for _, sidecar := range sidecars {
if err := verify.BlobAlignsWithBlock(sidecar, RoBlock); err != nil {
return err

View File

@@ -253,7 +253,7 @@ func TestRecentBeaconBlocks_RPCRequestSent(t *testing.T) {
})
p1.Connect(p2)
require.NoError(t, r.sendRecentBeaconBlocksRequest(context.Background(), &expectedRoots, p2.PeerID()))
require.NoError(t, r.sendBeaconBlocksRequest(context.Background(), &expectedRoots, p2.PeerID()))
if util.WaitTimeout(&wg, 1*time.Second) {
t.Fatal("Did not receive stream within 1 sec")
@@ -328,7 +328,7 @@ func TestRecentBeaconBlocks_RPCRequestSent_IncorrectRoot(t *testing.T) {
})
p1.Connect(p2)
require.ErrorContains(t, "received unexpected block with root", r.sendRecentBeaconBlocksRequest(context.Background(), &expectedRoots, p2.PeerID()))
require.ErrorContains(t, "received unexpected block with root", r.sendBeaconBlocksRequest(context.Background(), &expectedRoots, p2.PeerID()))
}
func TestRecentBeaconBlocksRPCHandler_HandleZeroBlocks(t *testing.T) {

View File

@@ -99,6 +99,7 @@ func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interfa
}
var batch blockBatch
wQuota := params.BeaconConfig().MaxRequestBlobSidecars
for batch, ok = batcher.next(ctx, stream); ok; batch, ok = batcher.next(ctx, stream) {
batchStart := time.Now()
@@ -114,7 +115,12 @@ func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interfa
}
if err := batch.error(); err != nil {
log.WithError(err).Debug("error in BlobSidecarsByRange batch")
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
// If a rate limit is hit, it means an error response has already been sent and the stream has been closed.
if !errors.Is(err, p2ptypes.ErrRateLimited) {
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
}
tracing.AnnotateError(span, err)
return err
}

View File

@@ -2,12 +2,12 @@ package sync
import (
"context"
"errors"
"fmt"
"strings"
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
@@ -16,127 +16,191 @@ import (
)
// pingHandler reads the incoming ping rpc message from the peer.
// If the peer's sequence number is higher than the one stored locally,
// a METADATA request is sent to the peer to retrieve and update the latest metadata.
// Note: This function is misnamed, as it performs more than just reading a ping message.
func (s *Service) pingHandler(_ context.Context, msg interface{}, stream libp2pcore.Stream) error {
SetRPCStreamDeadlines(stream)
// Convert the message to SSW Uint64 type.
m, ok := msg.(*primitives.SSZUint64)
if !ok {
return fmt.Errorf("wrong message type for ping, got %T, wanted *uint64", msg)
}
// Validate the incoming request regarding rate limiting.
if err := s.rateLimiter.validateRequest(stream, 1); err != nil {
return err
return errors.Wrap(err, "validate request")
}
s.rateLimiter.add(stream, 1)
valid, err := s.validateSequenceNum(*m, stream.Conn().RemotePeer())
// Retrieve the peer ID.
peerID := stream.Conn().RemotePeer()
// Check if the peer sequence number is higher than the one we have in our store.
valid, err := s.validateSequenceNum(*m, peerID)
if err != nil {
// Descore peer for giving us a bad sequence number.
if errors.Is(err, p2ptypes.ErrInvalidSequenceNum) {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
s.writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrInvalidSequenceNum.Error(), stream)
}
return err
return errors.Wrap(err, "validate sequence number")
}
// We can already prepare a success response to the peer.
if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
return err
return errors.Wrap(err, "write response")
}
sq := primitives.SSZUint64(s.cfg.p2p.MetadataSeq())
if _, err := s.cfg.p2p.Encoding().EncodeWithMaxLength(stream, &sq); err != nil {
// Retrieve our own sequence number.
seqNumber := s.cfg.p2p.MetadataSeq()
// SSZ encode our sequence number.
seqNumberSSZ := primitives.SSZUint64(seqNumber)
// Send our sequence number back to the peer.
if _, err := s.cfg.p2p.Encoding().EncodeWithMaxLength(stream, &seqNumberSSZ); err != nil {
return err
}
closeStream(stream, log)
if valid {
// If the sequence number was valid we're done.
// If the peer's sequence numberwas valid we're done.
return nil
}
// The sequence number was not valid. Start our own ping back to the peer.
// The peer's sequence number was not valid. We ask the peer for its metadata.
go func() {
// New context so the calling function doesn't cancel on us.
// Define a new context so the calling function doesn't cancel on us.
ctx, cancel := context.WithTimeout(context.Background(), ttfbTimeout)
defer cancel()
md, err := s.sendMetaDataRequest(ctx, stream.Conn().RemotePeer())
// Send a METADATA request to the peer.
peerMetadata, err := s.sendMetaDataRequest(ctx, peerID)
if err != nil {
// We cannot compare errors directly as the stream muxer error
// type isn't compatible with the error we have, so a direct
// equality checks fails.
if !strings.Contains(err.Error(), p2ptypes.ErrIODeadline.Error()) {
log.WithField("peer", stream.Conn().RemotePeer()).WithError(err).Debug("Could not send metadata request")
log.WithField("peer", peerID).WithError(err).Debug("Could not send metadata request")
}
return
}
// update metadata if there is no error
s.cfg.p2p.Peers().SetMetadata(stream.Conn().RemotePeer(), md)
// Update peer's metadata.
s.cfg.p2p.Peers().SetMetadata(peerID, peerMetadata)
}()
return nil
}
func (s *Service) sendPingRequest(ctx context.Context, id peer.ID) error {
// sendPingRequest first sends a PING request to the peer.
// If the peer responds with a sequence number higher than latest one for it we have in our store,
// then this function sends a METADATA request to the peer, and stores the metadata received.
// This function is actually poorly named, since it does more than just sending a ping request.
func (s *Service) sendPingRequest(ctx context.Context, peerID peer.ID) error {
ctx, cancel := context.WithTimeout(ctx, respTimeout)
defer cancel()
metadataSeq := primitives.SSZUint64(s.cfg.p2p.MetadataSeq())
topic, err := p2p.TopicFromMessage(p2p.PingMessageName, slots.ToEpoch(s.cfg.clock.CurrentSlot()))
// Get the current epoch.
currentSlot := s.cfg.clock.CurrentSlot()
currentEpoch := slots.ToEpoch(currentSlot)
// SSZ encode our metadata sequence number.
metadataSeq := s.cfg.p2p.MetadataSeq()
encodedMetadataSeq := primitives.SSZUint64(metadataSeq)
// Get the PING topic for the current epoch.
topic, err := p2p.TopicFromMessage(p2p.PingMessageName, currentEpoch)
if err != nil {
return err
return errors.Wrap(err, "topic from message")
}
stream, err := s.cfg.p2p.Send(ctx, &metadataSeq, topic, id)
// Send the PING request to the peer.
stream, err := s.cfg.p2p.Send(ctx, &encodedMetadataSeq, topic, peerID)
if err != nil {
return err
return errors.Wrap(err, "send ping request")
}
currentTime := time.Now()
defer closeStream(stream, log)
startTime := time.Now()
// Read the response from the peer.
code, errMsg, err := ReadStatusCode(stream, s.cfg.p2p.Encoding())
if err != nil {
return err
return errors.Wrap(err, "read status code")
}
// Records the latency of the ping request for that peer.
s.cfg.p2p.Host().Peerstore().RecordLatency(id, time.Now().Sub(currentTime))
// Record the latency of the ping request for that peer.
s.cfg.p2p.Host().Peerstore().RecordLatency(peerID, time.Now().Sub(startTime))
// If the peer responded with an error, increment the bad responses scorer.
if code != 0 {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
return errors.New(errMsg)
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
return errors.Errorf("code: %d - %s", code, errMsg)
}
// Decode the sequence number from the peer.
msg := new(primitives.SSZUint64)
if err := s.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
return err
return errors.Wrap(err, "decode sequence number")
}
valid, err := s.validateSequenceNum(*msg, stream.Conn().RemotePeer())
// Determine if the peer's sequence number returned by the peer is higher than the one we have in our store.
valid, err := s.validateSequenceNum(*msg, peerID)
if err != nil {
// Descore peer for giving us a bad sequence number.
if errors.Is(err, p2ptypes.ErrInvalidSequenceNum) {
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(peerID)
}
return err
return errors.Wrap(err, "validate sequence number")
}
// The sequence number have in our store for this peer is the same as the one returned by the peer, all good.
if valid {
return nil
}
md, err := s.sendMetaDataRequest(ctx, stream.Conn().RemotePeer())
// We need to send a METADATA request to the peer to get its latest metadata.
md, err := s.sendMetaDataRequest(ctx, peerID)
if err != nil {
// do not increment bad responses, as its
// already done in the request method.
return err
// do not increment bad responses, as its already done in the request method.
return errors.Wrap(err, "send metadata request")
}
s.cfg.p2p.Peers().SetMetadata(stream.Conn().RemotePeer(), md)
// Update the metadata for the peer.
s.cfg.p2p.Peers().SetMetadata(peerID, md)
return nil
}
// validates the peer's sequence number.
// validateSequenceNum validates the peer's sequence number.
// - If the peer's sequence number is greater than the sequence number we have in our store for the peer, return false.
// - If the peer's sequence number is equal to the sequence number we have in our store for the peer, return true.
// - If the peer's sequence number is less than the sequence number we have in our store for the peer, return an error.
func (s *Service) validateSequenceNum(seq primitives.SSZUint64, id peer.ID) (bool, error) {
// Retrieve the metadata for the peer we got in our store.
md, err := s.cfg.p2p.Peers().Metadata(id)
if err != nil {
return false, err
return false, errors.Wrap(err, "get metadata")
}
// If we have no metadata for the peer, return false.
if md == nil || md.IsNil() {
return false, nil
}
// Return error on invalid sequence number.
// The peer's sequence number must be less than or equal to the sequence number we have in our store.
if md.SequenceNumber() > uint64(seq) {
return false, p2ptypes.ErrInvalidSequenceNum
}
// Return true if the peer's sequence number is equal to the sequence number we have in our store.
return md.SequenceNumber() == uint64(seq), nil
}

View File

@@ -19,7 +19,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
pb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
@@ -49,7 +48,7 @@ type BeaconBlockProcessor func(block interfaces.ReadOnlySignedBeaconBlock) error
// SendBeaconBlocksByRangeRequest sends BeaconBlocksByRange and returns fetched blocks, if any.
func SendBeaconBlocksByRangeRequest(
ctx context.Context, tor blockchain.TemporalOracle, p2pProvider p2p.SenderEncoder, pid peer.ID,
req *pb.BeaconBlocksByRangeRequest, blockProcessor BeaconBlockProcessor,
req *ethpb.BeaconBlocksByRangeRequest, blockProcessor BeaconBlockProcessor,
) ([]interfaces.ReadOnlySignedBeaconBlock, error) {
topic, err := p2p.TopicFromMessage(p2p.BeaconBlocksByRangeMessageName, slots.ToEpoch(tor.CurrentSlot()))
if err != nil {
@@ -155,7 +154,7 @@ func SendBeaconBlocksByRootRequest(
return blocks, nil
}
func SendBlobsByRangeRequest(ctx context.Context, tor blockchain.TemporalOracle, p2pApi p2p.SenderEncoder, pid peer.ID, ctxMap ContextByteVersions, req *pb.BlobSidecarsByRangeRequest, bvs ...BlobResponseValidation) ([]blocks.ROBlob, error) {
func SendBlobsByRangeRequest(ctx context.Context, tor blockchain.TemporalOracle, p2pApi p2p.SenderEncoder, pid peer.ID, ctxMap ContextByteVersions, req *ethpb.BlobSidecarsByRangeRequest, bvs ...BlobResponseValidation) ([]blocks.ROBlob, error) {
topic, err := p2p.TopicFromMessage(p2p.BlobSidecarsByRangeName, slots.ToEpoch(tor.CurrentSlot()))
if err != nil {
return nil, err
@@ -298,7 +297,7 @@ func blobValidatorFromRootReq(req *p2ptypes.BlobSidecarsByRootReq) BlobResponseV
}
}
func blobValidatorFromRangeReq(req *pb.BlobSidecarsByRangeRequest) BlobResponseValidation {
func blobValidatorFromRangeReq(req *ethpb.BlobSidecarsByRangeRequest) BlobResponseValidation {
end := req.StartSlot + primitives.Slot(req.Count)
return func(sc blocks.ROBlob) error {
if sc.Slot() < req.StartSlot || sc.Slot() >= end {

View File

@@ -15,6 +15,8 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
gcache "github.com/patrickmn/go-cache"
"github.com/pkg/errors"
"github.com/trailofbits/go-mutexasserts"
"github.com/prysmaticlabs/prysm/v5/async"
"github.com/prysmaticlabs/prysm/v5/async/abool"
"github.com/prysmaticlabs/prysm/v5/async/event"
@@ -44,22 +46,24 @@ import (
"github.com/prysmaticlabs/prysm/v5/runtime"
prysmTime "github.com/prysmaticlabs/prysm/v5/time"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/trailofbits/go-mutexasserts"
)
var _ runtime.Service = (*Service)(nil)
const rangeLimit uint64 = 1024
const seenBlockSize = 1000
const seenBlobSize = seenBlockSize * 4 // Each block can have max 4 blobs. Worst case 164kB for cache.
const seenUnaggregatedAttSize = 20000
const seenAggregatedAttSize = 16384
const seenSyncMsgSize = 1000 // Maximum of 512 sync committee members, 1000 is a safe amount.
const seenSyncContributionSize = 512 // Maximum of SYNC_COMMITTEE_SIZE as specified by the spec.
const seenExitSize = 100
const seenProposerSlashingSize = 100
const badBlockSize = 1000
const syncMetricsInterval = 10 * time.Second
const (
rangeLimit uint64 = 1024
seenBlockSize = 1000
seenBlobSize = seenBlockSize * 6 // Each block can have max 6 blobs.
seenDataColumnSize = seenBlockSize * 128 // Each block can have max 128 data columns.
seenUnaggregatedAttSize = 20000
seenAggregatedAttSize = 16384
seenSyncMsgSize = 1000 // Maximum of 512 sync committee members, 1000 is a safe amount.
seenSyncContributionSize = 512 // Maximum of SYNC_COMMITTEE_SIZE as specified by the spec.
seenExitSize = 100
seenProposerSlashingSize = 100
badBlockSize = 1000
syncMetricsInterval = 10 * time.Second
)
var (
// Seconds in one epoch.
@@ -162,18 +166,18 @@ type Service struct {
// NewService initializes new regular sync service.
func NewService(ctx context.Context, opts ...Option) *Service {
c := gcache.New(pendingBlockExpTime /* exp time */, 0 /* disable janitor */)
ctx, cancel := context.WithCancel(ctx)
r := &Service{
ctx: ctx,
cancel: cancel,
chainStarted: abool.New(),
cfg: &config{clock: startup.NewClock(time.Unix(0, 0), [32]byte{})},
slotToPendingBlocks: c,
slotToPendingBlocks: gcache.New(pendingBlockExpTime /* exp time */, 0 /* disable janitor */),
seenPendingBlocks: make(map[[32]byte]bool),
blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof),
signatureChan: make(chan *signatureVerifier, verifierLimit),
}
for _, opt := range opts {
if err := opt(r); err != nil {
return nil
@@ -224,7 +228,7 @@ func (s *Service) Start() {
s.newBlobVerifier = newBlobVerifierFromInitializer(v)
go s.verifierRoutine()
go s.registerHandlers()
go s.startTasksPostInitialSync()
s.cfg.p2p.AddConnectionHandler(s.reValidatePeer, s.sendGoodbye)
s.cfg.p2p.AddDisconnectionHandler(func(_ context.Context, _ peer.ID) error {
@@ -315,23 +319,31 @@ func (s *Service) waitForChainStart() {
s.markForChainStart()
}
func (s *Service) registerHandlers() {
func (s *Service) startTasksPostInitialSync() {
// Wait for the chain to start.
s.waitForChainStart()
select {
case <-s.initialSyncComplete:
// Register respective pubsub handlers at state synced event.
digest, err := s.currentForkDigest()
// Compute the current epoch.
currentSlot := slots.CurrentSlot(uint64(s.cfg.clock.GenesisTime().Unix()))
currentEpoch := slots.ToEpoch(currentSlot)
// Compute the current fork forkDigest.
forkDigest, err := s.currentForkDigest()
if err != nil {
log.WithError(err).Error("Could not retrieve current fork digest")
return
}
currentEpoch := slots.ToEpoch(slots.CurrentSlot(uint64(s.cfg.clock.GenesisTime().Unix())))
s.registerSubscribers(currentEpoch, digest)
// Register respective pubsub handlers at state synced event.
s.registerSubscribers(currentEpoch, forkDigest)
// Start the fork watcher.
go s.forkWatcher()
return
case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")
return
}
}

View File

@@ -62,7 +62,7 @@ func TestSyncHandlers_WaitToSync(t *testing.T) {
}
topic := "/eth2/%x/beacon_block"
go r.registerHandlers()
go r.startTasksPostInitialSync()
time.Sleep(100 * time.Millisecond)
var vr [32]byte
@@ -143,7 +143,7 @@ func TestSyncHandlers_WaitTillSynced(t *testing.T) {
syncCompleteCh := make(chan bool)
go func() {
r.registerHandlers()
r.startTasksPostInitialSync()
syncCompleteCh <- true
}()
@@ -200,7 +200,7 @@ func TestSyncService_StopCleanly(t *testing.T) {
initialSyncComplete: make(chan struct{}),
}
go r.registerHandlers()
go r.startTasksPostInitialSync()
var vr [32]byte
require.NoError(t, gs.SetClock(startup.NewClock(time.Now(), vr)))
r.waitForChainStart()

View File

@@ -13,7 +13,7 @@ import (
func (s *Service) blobSubscriber(ctx context.Context, msg proto.Message) error {
b, ok := msg.(blocks.VerifiedROBlob)
if !ok {
return fmt.Errorf("message was not type blocks.ROBlob, type=%T", msg)
return fmt.Errorf("message was not type blocks.VerifiedROBlob, type=%T", msg)
}
return s.subscribeBlob(ctx, b)

View File

@@ -117,6 +117,9 @@ func (s *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
if seen {
return pubsub.ValidationIgnore, nil
}
// Verify the block being voted on is in the beacon chain.
// If not, store this attestation in the map of pending attestations.
if !s.validateBlockInAttestation(ctx, m) {
return pubsub.ValidationIgnore, nil
}
@@ -222,6 +225,8 @@ func (s *Service) validateAggregatedAtt(ctx context.Context, signed ethpb.Signed
return s.validateWithBatchVerifier(ctx, "aggregate", set)
}
// validateBlocksInAttestation checks if the block being voted on is in the beaconDB.
// If not, it store this attestation in the map of pending attestations.
func (s *Service) validateBlockInAttestation(ctx context.Context, satt ethpb.SignedAggregateAttAndProof) bool {
// Verify the block being voted and the processed state is in beaconDB. The block should have passed validation if it's in the beaconDB.
blockRoot := bytesutil.ToBytes32(satt.AggregateAttestationAndProof().AggregateVal().GetData().BeaconBlockRoot)

View File

@@ -51,7 +51,7 @@ func (s *Service) validateBlob(ctx context.Context, pid peer.ID, msg *pubsub.Mes
if err != nil {
return pubsub.ValidationReject, errors.Wrap(err, "roblob conversion failure")
}
vf := s.newBlobVerifier(blob, verification.GossipSidecarRequirements)
vf := s.newBlobVerifier(blob, verification.GossipBlobSidecarRequirements)
if err := vf.BlobIndexInBounds(); err != nil {
return pubsub.ValidationReject, err

View File

@@ -10,6 +10,7 @@ go_library(
"fake.go",
"initializer.go",
"interface.go",
"log.go",
"metrics.go",
"mock.go",
"result.go",

View File

@@ -169,7 +169,7 @@ func TestBatchVerifier(t *testing.T) {
blk, blbs := c.bandb(t, c.nblobs)
reqs := c.reqs
if reqs == nil {
reqs = InitsyncSidecarRequirements
reqs = InitsyncBlobSidecarRequirements
}
bbv := NewBlobBatchVerifier(c.nv(), reqs)
if c.cv == nil {

View File

@@ -2,6 +2,7 @@ package verification
import (
"context"
goError "errors"
"github.com/pkg/errors"
forkchoicetypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice/types"
@@ -12,7 +13,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/runtime/logging"
"github.com/prysmaticlabs/prysm/v5/time/slots"
log "github.com/sirupsen/logrus"
)
const (
@@ -29,7 +29,7 @@ const (
RequireSidecarProposerExpected
)
var allSidecarRequirements = []Requirement{
var allBlobSidecarRequirements = []Requirement{
RequireBlobIndexInBounds,
RequireNotFromFutureSlot,
RequireSlotAboveFinalized,
@@ -43,21 +43,21 @@ var allSidecarRequirements = []Requirement{
RequireSidecarProposerExpected,
}
// GossipSidecarRequirements defines the set of requirements that BlobSidecars received on gossip
// GossipBlobSidecarRequirements defines the set of requirements that BlobSidecars received on gossip
// must satisfy in order to upgrade an ROBlob to a VerifiedROBlob.
var GossipSidecarRequirements = requirementList(allSidecarRequirements).excluding()
var GossipBlobSidecarRequirements = requirementList(allBlobSidecarRequirements).excluding()
// SpectestSidecarRequirements is used by the forkchoice spectests when verifying blobs used in the on_block tests.
// SpectestBlobSidecarRequirements is used by the forkchoice spectests when verifying blobs used in the on_block tests.
// The only requirements we exclude for these tests are the parent validity and seen tests, as these are specific to
// gossip processing and require the bad block cache that we only use there.
var SpectestSidecarRequirements = requirementList(GossipSidecarRequirements).excluding(
var SpectestBlobSidecarRequirements = requirementList(GossipBlobSidecarRequirements).excluding(
RequireSidecarParentSeen, RequireSidecarParentValid)
// InitsyncSidecarRequirements is the list of verification requirements to be used by the init-sync service
// InitsyncBlobSidecarRequirements is the list of verification requirements to be used by the init-sync service
// for batch-mode syncing. Because we only perform batch verification as part of the IsDataAvailable method
// for blobs after the block has been verified, and the blobs to be verified are keyed in the cache by the
// block root, the list of required verifications is much shorter than gossip.
var InitsyncSidecarRequirements = requirementList(GossipSidecarRequirements).excluding(
var InitsyncBlobSidecarRequirements = requirementList(GossipBlobSidecarRequirements).excluding(
RequireNotFromFutureSlot,
RequireSlotAboveFinalized,
RequireSidecarParentSeen,
@@ -71,36 +71,16 @@ var InitsyncSidecarRequirements = requirementList(GossipSidecarRequirements).exc
// execution layer mempool. Only the KZG proof verification is required.
var ELMemPoolRequirements = []Requirement{RequireSidecarKzgProofVerified}
// BackfillSidecarRequirements is the same as InitsyncSidecarRequirements.
var BackfillSidecarRequirements = requirementList(InitsyncSidecarRequirements).excluding()
// BackfillBlobSidecarRequirements is the same as InitsyncBlobSidecarRequirements.
var BackfillBlobSidecarRequirements = requirementList(InitsyncBlobSidecarRequirements).excluding()
// PendingQueueSidecarRequirements is the same as InitsyncSidecarRequirements, used by the pending blocks queue.
var PendingQueueSidecarRequirements = requirementList(InitsyncSidecarRequirements).excluding()
// PendingQueueBlobSidecarRequirements is the same as InitsyncBlobSidecarRequirements, used by the pending blocks queue.
var PendingQueueBlobSidecarRequirements = requirementList(InitsyncBlobSidecarRequirements).excluding()
var (
ErrBlobInvalid = errors.New("blob failed verification")
// ErrBlobIndexInvalid means RequireBlobIndexInBounds failed.
ErrBlobIndexInvalid = errors.Wrap(ErrBlobInvalid, "incorrect blob sidecar index")
// ErrFromFutureSlot means RequireSlotNotTooEarly failed.
ErrFromFutureSlot = errors.Wrap(ErrBlobInvalid, "slot is too far in the future")
// ErrSlotNotAfterFinalized means RequireSlotAboveFinalized failed.
ErrSlotNotAfterFinalized = errors.Wrap(ErrBlobInvalid, "slot <= finalized checkpoint")
// ErrInvalidProposerSignature means RequireValidProposerSignature failed.
ErrInvalidProposerSignature = errors.Wrap(ErrBlobInvalid, "proposer signature could not be verified")
// ErrSidecarParentNotSeen means RequireSidecarParentSeen failed.
ErrSidecarParentNotSeen = errors.Wrap(ErrBlobInvalid, "parent root has not been seen")
// ErrSidecarParentInvalid means RequireSidecarParentValid failed.
ErrSidecarParentInvalid = errors.Wrap(ErrBlobInvalid, "parent block is not valid")
// ErrSlotNotAfterParent means RequireSidecarParentSlotLower failed.
ErrSlotNotAfterParent = errors.Wrap(ErrBlobInvalid, "slot <= slot")
// ErrSidecarNotFinalizedDescendent means RequireSidecarDescendsFromFinalized failed.
ErrSidecarNotFinalizedDescendent = errors.Wrap(ErrBlobInvalid, "blob parent is not descended from the finalized block")
// ErrSidecarInclusionProofInvalid means RequireSidecarInclusionProven failed.
ErrSidecarInclusionProofInvalid = errors.Wrap(ErrBlobInvalid, "sidecar inclusion proof verification failed")
// ErrSidecarKzgProofInvalid means RequireSidecarKzgProofVerified failed.
ErrSidecarKzgProofInvalid = errors.Wrap(ErrBlobInvalid, "sidecar kzg commitment proof verification failed")
// ErrSidecarUnexpectedProposer means RequireSidecarProposerExpected failed.
ErrSidecarUnexpectedProposer = errors.Wrap(ErrBlobInvalid, "sidecar was not proposed by the expected proposer_index")
ErrBlobIndexInvalid = errors.New("incorrect blob sidecar index")
)
type ROBlobVerifier struct {
@@ -149,7 +129,7 @@ func (bv *ROBlobVerifier) BlobIndexInBounds() (err error) {
defer bv.recordResult(RequireBlobIndexInBounds, &err)
if bv.blob.Index >= fieldparams.MaxBlobsPerBlock {
log.WithFields(logging.BlobFields(bv.blob)).Debug("Sidecar index >= MAX_BLOBS_PER_BLOCK")
return ErrBlobIndexInvalid
return blobErrBuilder(ErrBlobIndexInvalid)
}
return nil
}
@@ -168,7 +148,7 @@ func (bv *ROBlobVerifier) NotFromFutureSlot() (err error) {
// If the system time is still before earliestStart, we consider the blob from a future slot and return an error.
if bv.clock.Now().Before(earliestStart) {
log.WithFields(logging.BlobFields(bv.blob)).Debug("sidecar slot is too far in the future")
return ErrFromFutureSlot
return blobErrBuilder(ErrFromFutureSlot)
}
return nil
}
@@ -181,11 +161,11 @@ func (bv *ROBlobVerifier) SlotAboveFinalized() (err error) {
fcp := bv.fc.FinalizedCheckpoint()
fSlot, err := slots.EpochStart(fcp.Epoch)
if err != nil {
return errors.Wrapf(ErrSlotNotAfterFinalized, "error computing epoch start slot for finalized checkpoint (%d) %s", fcp.Epoch, err.Error())
return errors.Wrapf(blobErrBuilder(ErrSlotNotAfterFinalized), "error computing epoch start slot for finalized checkpoint (%d) %s", fcp.Epoch, err.Error())
}
if bv.blob.Slot() <= fSlot {
log.WithFields(logging.BlobFields(bv.blob)).Debug("sidecar slot is not after finalized checkpoint")
return ErrSlotNotAfterFinalized
return blobErrBuilder(ErrSlotNotAfterFinalized)
}
return nil
}
@@ -203,7 +183,7 @@ func (bv *ROBlobVerifier) ValidProposerSignature(ctx context.Context) (err error
if err != nil {
log.WithFields(logging.BlobFields(bv.blob)).WithError(err).Debug("reusing failed proposer signature validation from cache")
blobVerificationProposerSignatureCache.WithLabelValues("hit-invalid").Inc()
return ErrInvalidProposerSignature
return blobErrBuilder(ErrInvalidProposerSignature)
}
return nil
}
@@ -213,12 +193,12 @@ func (bv *ROBlobVerifier) ValidProposerSignature(ctx context.Context) (err error
parent, err := bv.parentState(ctx)
if err != nil {
log.WithFields(logging.BlobFields(bv.blob)).WithError(err).Debug("could not replay parent state for blob signature verification")
return ErrInvalidProposerSignature
return blobErrBuilder(ErrInvalidProposerSignature)
}
// Full verification, which will subsequently be cached for anything sharing the signature cache.
if err = bv.sc.VerifySignature(sd, parent); err != nil {
log.WithFields(logging.BlobFields(bv.blob)).WithError(err).Debug("signature verification failed")
return ErrInvalidProposerSignature
return blobErrBuilder(ErrInvalidProposerSignature)
}
return nil
}
@@ -235,7 +215,7 @@ func (bv *ROBlobVerifier) SidecarParentSeen(parentSeen func([32]byte) bool) (err
return nil
}
log.WithFields(logging.BlobFields(bv.blob)).Debug("parent root has not been seen")
return ErrSidecarParentNotSeen
return blobErrBuilder(ErrSidecarParentNotSeen)
}
// SidecarParentValid represents the spec verification:
@@ -244,7 +224,7 @@ func (bv *ROBlobVerifier) SidecarParentValid(badParent func([32]byte) bool) (err
defer bv.recordResult(RequireSidecarParentValid, &err)
if badParent != nil && badParent(bv.blob.ParentRoot()) {
log.WithFields(logging.BlobFields(bv.blob)).Debug("parent root is invalid")
return ErrSidecarParentInvalid
return blobErrBuilder(ErrSidecarParentInvalid)
}
return nil
}
@@ -255,10 +235,10 @@ func (bv *ROBlobVerifier) SidecarParentSlotLower() (err error) {
defer bv.recordResult(RequireSidecarParentSlotLower, &err)
parentSlot, err := bv.fc.Slot(bv.blob.ParentRoot())
if err != nil {
return errors.Wrap(ErrSlotNotAfterParent, "parent root not in forkchoice")
return errors.Wrap(blobErrBuilder(ErrSlotNotAfterParent), "parent root not in forkchoice")
}
if parentSlot >= bv.blob.Slot() {
return ErrSlotNotAfterParent
return blobErrBuilder(ErrSlotNotAfterParent)
}
return nil
}
@@ -270,7 +250,7 @@ func (bv *ROBlobVerifier) SidecarDescendsFromFinalized() (err error) {
defer bv.recordResult(RequireSidecarDescendsFromFinalized, &err)
if !bv.fc.HasNode(bv.blob.ParentRoot()) {
log.WithFields(logging.BlobFields(bv.blob)).Debug("parent root not in forkchoice")
return ErrSidecarNotFinalizedDescendent
return blobErrBuilder(ErrSidecarNotFinalizedDescendent)
}
return nil
}
@@ -281,7 +261,7 @@ func (bv *ROBlobVerifier) SidecarInclusionProven() (err error) {
defer bv.recordResult(RequireSidecarInclusionProven, &err)
if err = blocks.VerifyKZGInclusionProof(bv.blob); err != nil {
log.WithError(err).WithFields(logging.BlobFields(bv.blob)).Debug("sidecar inclusion proof verification failed")
return ErrSidecarInclusionProofInvalid
return blobErrBuilder(ErrSidecarInclusionProofInvalid)
}
return nil
}
@@ -293,7 +273,7 @@ func (bv *ROBlobVerifier) SidecarKzgProofVerified() (err error) {
defer bv.recordResult(RequireSidecarKzgProofVerified, &err)
if err = bv.verifyBlobCommitment(bv.blob); err != nil {
log.WithError(err).WithFields(logging.BlobFields(bv.blob)).Debug("kzg commitment proof verification failed")
return ErrSidecarKzgProofInvalid
return blobErrBuilder(ErrSidecarKzgProofInvalid)
}
return nil
}
@@ -311,7 +291,7 @@ func (bv *ROBlobVerifier) SidecarProposerExpected(ctx context.Context) (err erro
}
r, err := bv.fc.TargetRootForEpoch(bv.blob.ParentRoot(), e)
if err != nil {
return ErrSidecarUnexpectedProposer
return blobErrBuilder(ErrSidecarUnexpectedProposer)
}
c := &forkchoicetypes.Checkpoint{Root: r, Epoch: e}
idx, cached := bv.pc.Proposer(c, bv.blob.Slot())
@@ -319,19 +299,19 @@ func (bv *ROBlobVerifier) SidecarProposerExpected(ctx context.Context) (err erro
pst, err := bv.parentState(ctx)
if err != nil {
log.WithError(err).WithFields(logging.BlobFields(bv.blob)).Debug("state replay to parent_root failed")
return ErrSidecarUnexpectedProposer
return blobErrBuilder(ErrSidecarUnexpectedProposer)
}
idx, err = bv.pc.ComputeProposer(ctx, bv.blob.ParentRoot(), bv.blob.Slot(), pst)
if err != nil {
log.WithError(err).WithFields(logging.BlobFields(bv.blob)).Debug("error computing proposer index from parent state")
return ErrSidecarUnexpectedProposer
return blobErrBuilder(ErrSidecarUnexpectedProposer)
}
}
if idx != bv.blob.ProposerIndex() {
log.WithError(ErrSidecarUnexpectedProposer).
log.WithError(blobErrBuilder(ErrSidecarUnexpectedProposer)).
WithFields(logging.BlobFields(bv.blob)).WithField("expectedProposer", idx).
Debug("unexpected blob proposer")
return ErrSidecarUnexpectedProposer
return blobErrBuilder(ErrSidecarUnexpectedProposer)
}
return nil
}
@@ -357,3 +337,7 @@ func blobToSignatureData(b blocks.ROBlob) SignatureData {
Slot: b.Slot(),
}
}
func blobErrBuilder(baseErr error) error {
return goError.Join(ErrBlobInvalid, baseErr)
}

View File

@@ -27,13 +27,13 @@ func TestBlobIndexInBounds(t *testing.T) {
_, blobs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 1)
b := blobs[0]
// set Index to a value that is out of bounds
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.NoError(t, v.BlobIndexInBounds())
require.Equal(t, true, v.results.executed(RequireBlobIndexInBounds))
require.NoError(t, v.results.result(RequireBlobIndexInBounds))
b.Index = fieldparams.MaxBlobsPerBlock
v = ini.NewBlobVerifier(b, GossipSidecarRequirements)
v = ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.ErrorIs(t, v.BlobIndexInBounds(), ErrBlobIndexInvalid)
require.Equal(t, true, v.results.executed(RequireBlobIndexInBounds))
require.NotNil(t, v.results.result(RequireBlobIndexInBounds))
@@ -52,7 +52,7 @@ func TestSlotNotTooEarly(t *testing.T) {
// This clock will give a current slot of 1 on the nose
happyClock := startup.NewClock(genesis, [32]byte{}, startup.WithNower(func() time.Time { return now }))
ini := Initializer{shared: &sharedResources{clock: happyClock}}
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.NoError(t, v.NotFromFutureSlot())
require.Equal(t, true, v.results.executed(RequireNotFromFutureSlot))
require.NoError(t, v.results.result(RequireNotFromFutureSlot))
@@ -61,7 +61,7 @@ func TestSlotNotTooEarly(t *testing.T) {
// but still in the previous slot.
closeClock := startup.NewClock(genesis, [32]byte{}, startup.WithNower(func() time.Time { return now.Add(-1 * params.BeaconConfig().MaximumGossipClockDisparityDuration() / 2) }))
ini = Initializer{shared: &sharedResources{clock: closeClock}}
v = ini.NewBlobVerifier(b, GossipSidecarRequirements)
v = ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.NoError(t, v.NotFromFutureSlot())
// This clock will give a current slot of 0, with now coming more than max clock disparity before slot 1
@@ -69,7 +69,7 @@ func TestSlotNotTooEarly(t *testing.T) {
dispClock := startup.NewClock(genesis, [32]byte{}, startup.WithNower(func() time.Time { return disparate }))
// Set up initializer to use the clock that will set now to a little to far before slot 1
ini = Initializer{shared: &sharedResources{clock: dispClock}}
v = ini.NewBlobVerifier(b, GossipSidecarRequirements)
v = ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.ErrorIs(t, v.NotFromFutureSlot(), ErrFromFutureSlot)
require.Equal(t, true, v.results.executed(RequireNotFromFutureSlot))
require.NotNil(t, v.results.result(RequireNotFromFutureSlot))
@@ -114,7 +114,7 @@ func TestSlotAboveFinalized(t *testing.T) {
_, blobs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 1)
b := blobs[0]
b.SignedBlockHeader.Header.Slot = c.slot
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
err := v.SlotAboveFinalized()
require.Equal(t, true, v.results.executed(RequireSlotAboveFinalized))
if c.err == nil {
@@ -146,7 +146,7 @@ func TestValidProposerSignature_Cached(t *testing.T) {
},
}
ini := Initializer{shared: &sharedResources{sc: sc, sr: &mockStateByRooter{sbr: sbrErrorIfCalled(t)}}}
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.NoError(t, v.ValidProposerSignature(ctx))
require.Equal(t, true, v.results.executed(RequireValidProposerSignature))
require.NoError(t, v.results.result(RequireValidProposerSignature))
@@ -159,7 +159,7 @@ func TestValidProposerSignature_Cached(t *testing.T) {
return true, errors.New("derp")
}
ini = Initializer{shared: &sharedResources{sc: sc, sr: &mockStateByRooter{sbr: sbrErrorIfCalled(t)}}}
v = ini.NewBlobVerifier(b, GossipSidecarRequirements)
v = ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.ErrorIs(t, v.ValidProposerSignature(ctx), ErrInvalidProposerSignature)
require.Equal(t, true, v.results.executed(RequireValidProposerSignature))
require.NotNil(t, v.results.result(RequireValidProposerSignature))
@@ -182,14 +182,14 @@ func TestValidProposerSignature_CacheMiss(t *testing.T) {
},
}
ini := Initializer{shared: &sharedResources{sc: sc, sr: sbrForValOverride(b.ProposerIndex(), &ethpb.Validator{})}}
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.NoError(t, v.ValidProposerSignature(ctx))
require.Equal(t, true, v.results.executed(RequireValidProposerSignature))
require.NoError(t, v.results.result(RequireValidProposerSignature))
// simulate state not found
ini = Initializer{shared: &sharedResources{sc: sc, sr: sbrNotFound(t, expectedSd.Parent)}}
v = ini.NewBlobVerifier(b, GossipSidecarRequirements)
v = ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.ErrorIs(t, v.ValidProposerSignature(ctx), ErrInvalidProposerSignature)
require.Equal(t, true, v.results.executed(RequireValidProposerSignature))
require.NotNil(t, v.results.result(RequireValidProposerSignature))
@@ -206,7 +206,7 @@ func TestValidProposerSignature_CacheMiss(t *testing.T) {
},
}
ini = Initializer{shared: &sharedResources{sc: sc, sr: sbr}}
v = ini.NewBlobVerifier(b, GossipSidecarRequirements)
v = ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
// make sure all the histories are clean before calling the method
// so we don't get polluted by previous usages
@@ -255,14 +255,14 @@ func TestSidecarParentSeen(t *testing.T) {
t.Run("happy path", func(t *testing.T) {
ini := Initializer{shared: &sharedResources{fc: fcHas}}
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.NoError(t, v.SidecarParentSeen(nil))
require.Equal(t, true, v.results.executed(RequireSidecarParentSeen))
require.NoError(t, v.results.result(RequireSidecarParentSeen))
})
t.Run("HasNode false, no badParent cb, expected error", func(t *testing.T) {
ini := Initializer{shared: &sharedResources{fc: fcLacks}}
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.ErrorIs(t, v.SidecarParentSeen(nil), ErrSidecarParentNotSeen)
require.Equal(t, true, v.results.executed(RequireSidecarParentSeen))
require.NotNil(t, v.results.result(RequireSidecarParentSeen))
@@ -270,14 +270,14 @@ func TestSidecarParentSeen(t *testing.T) {
t.Run("HasNode false, badParent true", func(t *testing.T) {
ini := Initializer{shared: &sharedResources{fc: fcLacks}}
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.NoError(t, v.SidecarParentSeen(badParentCb(t, b.ParentRoot(), true)))
require.Equal(t, true, v.results.executed(RequireSidecarParentSeen))
require.NoError(t, v.results.result(RequireSidecarParentSeen))
})
t.Run("HasNode false, badParent false", func(t *testing.T) {
ini := Initializer{shared: &sharedResources{fc: fcLacks}}
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.ErrorIs(t, v.SidecarParentSeen(badParentCb(t, b.ParentRoot(), false)), ErrSidecarParentNotSeen)
require.Equal(t, true, v.results.executed(RequireSidecarParentSeen))
require.NotNil(t, v.results.result(RequireSidecarParentSeen))
@@ -289,14 +289,14 @@ func TestSidecarParentValid(t *testing.T) {
b := blobs[0]
t.Run("parent valid", func(t *testing.T) {
ini := Initializer{shared: &sharedResources{}}
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.NoError(t, v.SidecarParentValid(badParentCb(t, b.ParentRoot(), false)))
require.Equal(t, true, v.results.executed(RequireSidecarParentValid))
require.NoError(t, v.results.result(RequireSidecarParentValid))
})
t.Run("parent not valid", func(t *testing.T) {
ini := Initializer{shared: &sharedResources{}}
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.ErrorIs(t, v.SidecarParentValid(badParentCb(t, b.ParentRoot(), true)), ErrSidecarParentInvalid)
require.Equal(t, true, v.results.executed(RequireSidecarParentValid))
require.NotNil(t, v.results.result(RequireSidecarParentValid))
@@ -340,7 +340,7 @@ func TestSidecarParentSlotLower(t *testing.T) {
}
return c.fcSlot, c.fcErr
}}}}
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
err := v.SidecarParentSlotLower()
require.Equal(t, true, v.results.executed(RequireSidecarParentSlotLower))
if c.err == nil {
@@ -364,7 +364,7 @@ func TestSidecarDescendsFromFinalized(t *testing.T) {
}
return false
}}}}
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.ErrorIs(t, v.SidecarDescendsFromFinalized(), ErrSidecarNotFinalizedDescendent)
require.Equal(t, true, v.results.executed(RequireSidecarDescendsFromFinalized))
require.NotNil(t, v.results.result(RequireSidecarDescendsFromFinalized))
@@ -376,7 +376,7 @@ func TestSidecarDescendsFromFinalized(t *testing.T) {
}
return true
}}}}
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.NoError(t, v.SidecarDescendsFromFinalized())
require.Equal(t, true, v.results.executed(RequireSidecarDescendsFromFinalized))
require.NoError(t, v.results.result(RequireSidecarDescendsFromFinalized))
@@ -389,7 +389,7 @@ func TestSidecarInclusionProven(t *testing.T) {
b := blobs[0]
ini := Initializer{}
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.NoError(t, v.SidecarInclusionProven())
require.Equal(t, true, v.results.executed(RequireSidecarInclusionProven))
require.NoError(t, v.results.result(RequireSidecarInclusionProven))
@@ -397,7 +397,7 @@ func TestSidecarInclusionProven(t *testing.T) {
// Invert bits of the first byte of the body root to mess up the proof
byte0 := b.SignedBlockHeader.Header.BodyRoot[0]
b.SignedBlockHeader.Header.BodyRoot[0] = byte0 ^ 255
v = ini.NewBlobVerifier(b, GossipSidecarRequirements)
v = ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.ErrorIs(t, v.SidecarInclusionProven(), ErrSidecarInclusionProofInvalid)
require.Equal(t, true, v.results.executed(RequireSidecarInclusionProven))
require.NotNil(t, v.results.result(RequireSidecarInclusionProven))
@@ -409,7 +409,7 @@ func TestSidecarInclusionProvenElectra(t *testing.T) {
b := blobs[0]
ini := Initializer{}
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.NoError(t, v.SidecarInclusionProven())
require.Equal(t, true, v.results.executed(RequireSidecarInclusionProven))
require.NoError(t, v.results.result(RequireSidecarInclusionProven))
@@ -417,7 +417,7 @@ func TestSidecarInclusionProvenElectra(t *testing.T) {
// Invert bits of the first byte of the body root to mess up the proof
byte0 := b.SignedBlockHeader.Header.BodyRoot[0]
b.SignedBlockHeader.Header.BodyRoot[0] = byte0 ^ 255
v = ini.NewBlobVerifier(b, GossipSidecarRequirements)
v = ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.ErrorIs(t, v.SidecarInclusionProven(), ErrSidecarInclusionProofInvalid)
require.Equal(t, true, v.results.executed(RequireSidecarInclusionProven))
require.NotNil(t, v.results.result(RequireSidecarInclusionProven))
@@ -452,21 +452,21 @@ func TestSidecarProposerExpected(t *testing.T) {
b := blobs[0]
t.Run("cached, matches", func(t *testing.T) {
ini := Initializer{shared: &sharedResources{pc: &mockProposerCache{ProposerCB: pcReturnsIdx(b.ProposerIndex())}, fc: &mockForkchoicer{TargetRootForEpochCB: fcReturnsTargetRoot([32]byte{})}}}
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.NoError(t, v.SidecarProposerExpected(ctx))
require.Equal(t, true, v.results.executed(RequireSidecarProposerExpected))
require.NoError(t, v.results.result(RequireSidecarProposerExpected))
})
t.Run("cached, does not match", func(t *testing.T) {
ini := Initializer{shared: &sharedResources{pc: &mockProposerCache{ProposerCB: pcReturnsIdx(b.ProposerIndex() + 1)}, fc: &mockForkchoicer{TargetRootForEpochCB: fcReturnsTargetRoot([32]byte{})}}}
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.ErrorIs(t, v.SidecarProposerExpected(ctx), ErrSidecarUnexpectedProposer)
require.Equal(t, true, v.results.executed(RequireSidecarProposerExpected))
require.NotNil(t, v.results.result(RequireSidecarProposerExpected))
})
t.Run("not cached, state lookup failure", func(t *testing.T) {
ini := Initializer{shared: &sharedResources{sr: sbrNotFound(t, b.ParentRoot()), pc: &mockProposerCache{ProposerCB: pcReturnsNotFound()}, fc: &mockForkchoicer{TargetRootForEpochCB: fcReturnsTargetRoot([32]byte{})}}}
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.ErrorIs(t, v.SidecarProposerExpected(ctx), ErrSidecarUnexpectedProposer)
require.Equal(t, true, v.results.executed(RequireSidecarProposerExpected))
require.NotNil(t, v.results.result(RequireSidecarProposerExpected))
@@ -475,14 +475,14 @@ func TestSidecarProposerExpected(t *testing.T) {
t.Run("not cached, proposer matches", func(t *testing.T) {
pc := &mockProposerCache{
ProposerCB: pcReturnsNotFound(),
ComputeProposerCB: func(ctx context.Context, root [32]byte, slot primitives.Slot, pst state.BeaconState) (primitives.ValidatorIndex, error) {
ComputeProposerCB: func(_ context.Context, root [32]byte, slot primitives.Slot, _ state.BeaconState) (primitives.ValidatorIndex, error) {
require.Equal(t, b.ParentRoot(), root)
require.Equal(t, b.Slot(), slot)
return b.ProposerIndex(), nil
},
}
ini := Initializer{shared: &sharedResources{sr: sbrForValOverride(b.ProposerIndex(), &ethpb.Validator{}), pc: pc, fc: &mockForkchoicer{TargetRootForEpochCB: fcReturnsTargetRoot([32]byte{})}}}
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.NoError(t, v.SidecarProposerExpected(ctx))
require.Equal(t, true, v.results.executed(RequireSidecarProposerExpected))
require.NoError(t, v.results.result(RequireSidecarProposerExpected))
@@ -490,14 +490,14 @@ func TestSidecarProposerExpected(t *testing.T) {
t.Run("not cached, proposer does not match", func(t *testing.T) {
pc := &mockProposerCache{
ProposerCB: pcReturnsNotFound(),
ComputeProposerCB: func(ctx context.Context, root [32]byte, slot primitives.Slot, pst state.BeaconState) (primitives.ValidatorIndex, error) {
ComputeProposerCB: func(_ context.Context, root [32]byte, slot primitives.Slot, _ state.BeaconState) (primitives.ValidatorIndex, error) {
require.Equal(t, b.ParentRoot(), root)
require.Equal(t, b.Slot(), slot)
return b.ProposerIndex() + 1, nil
},
}
ini := Initializer{shared: &sharedResources{sr: sbrForValOverride(b.ProposerIndex(), &ethpb.Validator{}), pc: pc, fc: &mockForkchoicer{TargetRootForEpochCB: fcReturnsTargetRoot([32]byte{})}}}
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.ErrorIs(t, v.SidecarProposerExpected(ctx), ErrSidecarUnexpectedProposer)
require.Equal(t, true, v.results.executed(RequireSidecarProposerExpected))
require.NotNil(t, v.results.result(RequireSidecarProposerExpected))
@@ -505,14 +505,14 @@ func TestSidecarProposerExpected(t *testing.T) {
t.Run("not cached, ComputeProposer fails", func(t *testing.T) {
pc := &mockProposerCache{
ProposerCB: pcReturnsNotFound(),
ComputeProposerCB: func(ctx context.Context, root [32]byte, slot primitives.Slot, pst state.BeaconState) (primitives.ValidatorIndex, error) {
ComputeProposerCB: func(_ context.Context, root [32]byte, slot primitives.Slot, _ state.BeaconState) (primitives.ValidatorIndex, error) {
require.Equal(t, b.ParentRoot(), root)
require.Equal(t, b.Slot(), slot)
return 0, errors.New("ComputeProposer failed")
},
}
ini := Initializer{shared: &sharedResources{sr: sbrForValOverride(b.ProposerIndex(), &ethpb.Validator{}), pc: pc, fc: &mockForkchoicer{TargetRootForEpochCB: fcReturnsTargetRoot([32]byte{})}}}
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
require.ErrorIs(t, v.SidecarProposerExpected(ctx), ErrSidecarUnexpectedProposer)
require.Equal(t, true, v.results.executed(RequireSidecarProposerExpected))
require.NotNil(t, v.results.result(RequireSidecarProposerExpected))
@@ -523,7 +523,7 @@ func TestRequirementSatisfaction(t *testing.T) {
_, blobs := util.GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 1, 1)
b := blobs[0]
ini := Initializer{}
v := ini.NewBlobVerifier(b, GossipSidecarRequirements)
v := ini.NewBlobVerifier(b, GossipBlobSidecarRequirements)
_, err := v.VerifiedROBlob()
require.ErrorIs(t, err, ErrBlobInvalid)
@@ -537,7 +537,7 @@ func TestRequirementSatisfaction(t *testing.T) {
}
// satisfy everything through the backdoor and ensure we get the verified ro blob at the end
for _, r := range GossipSidecarRequirements {
for _, r := range GossipBlobSidecarRequirements {
v.results.record(r, nil)
}
require.Equal(t, true, v.results.allSatisfied())

View File

@@ -17,7 +17,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/network/forks"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/time/slots"
log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus"
)
const (
@@ -50,8 +50,8 @@ type SignatureData struct {
Slot primitives.Slot
}
func (d SignatureData) logFields() log.Fields {
return log.Fields{
func (d SignatureData) logFields() logrus.Fields {
return logrus.Fields{
"root": fmt.Sprintf("%#x", d.Root),
"parentRoot": fmt.Sprintf("%#x", d.Parent),
"signature": fmt.Sprintf("%#x", d.Signature),

View File

@@ -2,8 +2,40 @@ package verification
import "github.com/pkg/errors"
// ErrMissingVerification indicates that the given verification function was never performed on the value.
var ErrMissingVerification = errors.New("verification was not performed for requirement")
var (
// ErrFromFutureSlot means RequireSlotNotTooEarly failed.
ErrFromFutureSlot = errors.New("slot is too far in the future")
// ErrSlotNotAfterFinalized means RequireSlotAboveFinalized failed.
ErrSlotNotAfterFinalized = errors.New("slot <= finalized checkpoint")
// ErrInvalidProposerSignature means RequireValidProposerSignature failed.
ErrInvalidProposerSignature = errors.New("proposer signature could not be verified")
// ErrSidecarParentNotSeen means RequireSidecarParentSeen failed.
ErrSidecarParentNotSeen = errors.New("parent root has not been seen")
// ErrSidecarParentInvalid means RequireSidecarParentValid failed.
ErrSidecarParentInvalid = errors.New("parent block is not valid")
// ErrSlotNotAfterParent means RequireSidecarParentSlotLower failed.
ErrSlotNotAfterParent = errors.New("slot <= slot")
// ErrSidecarNotFinalizedDescendent means RequireSidecarDescendsFromFinalized failed.
ErrSidecarNotFinalizedDescendent = errors.New("parent is not descended from the finalized block")
// ErrSidecarInclusionProofInvalid means RequireSidecarInclusionProven failed.
ErrSidecarInclusionProofInvalid = errors.New("sidecar inclusion proof verification failed")
// ErrSidecarKzgProofInvalid means RequireSidecarKzgProofVerified failed.
ErrSidecarKzgProofInvalid = errors.New("sidecar kzg commitment proof verification failed")
// ErrSidecarUnexpectedProposer means RequireSidecarProposerExpected failed.
ErrSidecarUnexpectedProposer = errors.New("sidecar was not proposed by the expected proposer_index")
// ErrMissingVerification indicates that the given verification function was never performed on the value.
ErrMissingVerification = errors.New("verification was not performed for requirement")
)
// VerificationMultiError is a custom error that can be used to access individual verification failures.
type VerificationMultiError struct {

View File

@@ -0,0 +1,5 @@
package verification
import "github.com/sirupsen/logrus"
var log = logrus.WithField("prefix", "verification")

View File

@@ -39,7 +39,7 @@ func TestResultList(t *testing.T) {
func TestExportedBlobSanityCheck(t *testing.T) {
// make sure all requirement lists contain the bare minimum checks
sanity := []Requirement{RequireValidProposerSignature, RequireSidecarKzgProofVerified, RequireBlobIndexInBounds, RequireSidecarInclusionProven}
reqs := [][]Requirement{GossipSidecarRequirements, SpectestSidecarRequirements, InitsyncSidecarRequirements, BackfillSidecarRequirements, PendingQueueSidecarRequirements}
reqs := [][]Requirement{GossipBlobSidecarRequirements, SpectestBlobSidecarRequirements, InitsyncBlobSidecarRequirements, BackfillBlobSidecarRequirements, PendingQueueBlobSidecarRequirements}
for i := range reqs {
r := reqs[i]
reqMap := make(map[Requirement]struct{})
@@ -51,13 +51,13 @@ func TestExportedBlobSanityCheck(t *testing.T) {
require.Equal(t, true, ok)
}
}
require.DeepEqual(t, allSidecarRequirements, GossipSidecarRequirements)
require.DeepEqual(t, allBlobSidecarRequirements, GossipBlobSidecarRequirements)
}
func TestAllBlobRequirementsHaveStrings(t *testing.T) {
var derp Requirement = math.MaxInt
require.Equal(t, unknownRequirementName, derp.String())
for i := range allSidecarRequirements {
require.NotEqual(t, unknownRequirementName, allSidecarRequirements[i].String())
for i := range allBlobSidecarRequirements {
require.NotEqual(t, unknownRequirementName, allBlobSidecarRequirements[i].String())
}
}

View File

@@ -372,7 +372,7 @@ func runBlobStep(t *testing.T,
require.NoError(t, err)
ini, err := builder.vwait.WaitForInitializer(context.Background())
require.NoError(t, err)
bv := ini.NewBlobVerifier(ro, verification.SpectestSidecarRequirements)
bv := ini.NewBlobVerifier(ro, verification.SpectestBlobSidecarRequirements)
ctx := context.Background()
if err := bv.BlobIndexInBounds(); err != nil {
t.Logf("BlobIndexInBounds error: %s", err.Error())

View File

@@ -195,7 +195,7 @@ func logProposedBlock(log *logrus.Entry, blk interfaces.SignedBeaconBlock, blkRo
log = log.WithFields(logrus.Fields{
"payloadHash": fmt.Sprintf("%#x", bytesutil.Trunc(p.BlockHash())),
"parentHash": fmt.Sprintf("%#x", bytesutil.Trunc(p.ParentHash())),
"blockNumber": p.BlockNumber,
"blockNumber": p.BlockNumber(),
})
if !blk.IsBlinded() {
txs, err := p.Transactions()