Compare commits

...

9 Commits

Author SHA1 Message Date
Manu NALEPA
2c56c650e6 jimmy 2025-02-25 17:43:13 +01:00
Manu NALEPA
6c0bba7197 Implement validator custody. 2025-02-22 17:15:34 +01:00
Manu NALEPA
35a2a32106 blobsFromStoredDataColumns: Simplify.
Do not make any more a difference between "can theoretically reconstruct" and "can actually reconstruct".
2025-02-21 16:03:49 +01:00
Manu NALEPA
2a72703d3e dataColumnSidecarByRangeRPCHandler: Remove custody columns in logs. 2025-02-21 16:03:49 +01:00
Manu NALEPA
3b5a6b5e2f dataColumnSidecarByRootRPCHandler: Remove custody columns in logs. 2025-02-21 16:03:49 +01:00
Manu NALEPA
36958b552d Sync service: Add tracked validators cache. 2025-02-21 16:03:49 +01:00
Manu NALEPA
ae1a6be8a3 Implement ValidatorsCustodyRequirement. 2025-02-21 16:03:49 +01:00
Manu NALEPA
4f146f9a30 Add VALIDATOR_CUSTODY_REQUIREMENT and BALANCE_PER_ADDITIONAL_CUSTODY_GROUP. 2025-02-21 16:03:49 +01:00
Manu NALEPA
f07036ab3c Node info: Rename cache and mutex. 2025-02-21 16:03:48 +01:00
33 changed files with 778 additions and 200 deletions

View File

@@ -682,8 +682,12 @@ func (s *Service) areDataColumnsAvailable(ctx context.Context, root [32]byte, si
// https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/fulu/das-core.md#custody-sampling
nodeID := s.cfg.P2P.NodeID()
// Prevent custody group count to change during the rest of the function.
peerdas.CustodyGroupCountMut.RLock()
defer peerdas.CustodyGroupCountMut.RUnlock()
// Get the custody group sampling size for the node.
custodyGroupSamplingSize := peerdas.CustodyGroupSamplingSize()
custodyGroupSamplingSize := peerdas.CustodyGroupSamplingSize(peerdas.Actual)
peerInfo, _, err := peerdas.Info(nodeID, custodyGroupSamplingSize)
if err != nil {
return errors.Wrap(err, "peer info")

View File

@@ -14,11 +14,13 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//beacon-chain/blockchain/kzg:go_default_library",
"//beacon-chain/state:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//crypto/hash:go_default_library",
"//encoding/bytesutil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
@@ -46,10 +48,12 @@ go_test(
deps = [
":go_default_library",
"//beacon-chain/blockchain/kzg:go_default_library",
"//beacon-chain/state/state-native:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/primitives:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",

View File

@@ -11,10 +11,12 @@ import (
"github.com/holiman/uint256"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/kzg"
beaconState "github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/crypto/hash"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
@@ -30,6 +32,13 @@ var (
maxUint256 = &uint256.Int{math.MaxUint64, math.MaxUint64, math.MaxUint64, math.MaxUint64}
)
type CustodyType int
const (
Target CustodyType = iota
Actual
)
// CustodyGroups computes the custody groups the node should participate in for custody.
// https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/fulu/das-core.md#get_custody_groups
func CustodyGroups(nodeId enode.ID, custodyGroupCount uint64) (map[uint64]bool, error) {
@@ -193,10 +202,14 @@ func DataColumnSidecars(signedBlock interfaces.ReadOnlySignedBeaconBlock, blobs
// CustodyGroupSamplingSize returns the number of custody groups the node should sample from.
// https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/fulu/das-core.md#custody-sampling
func CustodyGroupSamplingSize() uint64 {
samplesPerSlot := params.BeaconConfig().SamplesPerSlot
custodyGroupCount := CustodyGroupCount()
func CustodyGroupSamplingSize(ct CustodyType) uint64 {
custodyGroupCount := TargetCustodyGroupCount.Get()
if ct == Actual {
custodyGroupCount = ActualCustodyGroupCount()
}
samplesPerSlot := params.BeaconConfig().SamplesPerSlot
return max(samplesPerSlot, custodyGroupCount)
}
@@ -226,6 +239,28 @@ func CustodyColumns(custodyGroups map[uint64]bool) (map[uint64]bool, error) {
return columns, nil
}
// ValidatorsCustodyRequirement returns the number of custody groups regarding the validator indices attached to the beacon node.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/fulu/das-core.md#validator-custody
func ValidatorsCustodyRequirement(state beaconState.BeaconState, validatorsIndex map[primitives.ValidatorIndex]bool) (uint64, error) {
totalNodeBalance := uint64(0)
for index := range validatorsIndex {
balance, err := state.BalanceAtIndex(index)
if err != nil {
return 0, errors.Wrapf(err, "balance at index for validator index %v", index)
}
totalNodeBalance += balance
}
beaconConfig := params.BeaconConfig()
numberOfCustodyGroup := beaconConfig.NumberOfCustodyGroups
validatorCustodyRequirement := beaconConfig.ValidatorCustodyRequirement
balancePerAdditionalCustodyGroup := beaconConfig.BalancePerAdditionalCustodyGroup
count := totalNodeBalance / balancePerAdditionalCustodyGroup
return min(max(count, validatorCustodyRequirement), numberOfCustodyGroup), nil
}
// Blobs extract blobs from `dataColumnsSidecar`.
// This can be seen as the reciprocal function of DataColumnSidecars.
// `dataColumnsSidecar` needs to contain the datacolumns corresponding to the non-extended matrix,

View File

@@ -6,8 +6,10 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/kzg"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
state_native "github.com/prysmaticlabs/prysm/v5/beacon-chain/state/state-native"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/testing/util"
@@ -147,3 +149,96 @@ func TestDataColumnsSidecarsBlobsRoundtrip(t *testing.T) {
// Check that the blobs are the same.
require.DeepSSZEqual(t, verifiedROBlobs, roundtripBlobs)
}
func TestValidatorsCustodyRequirement(t *testing.T) {
testCases := []struct {
name string
count uint64
expected uint64
}{
{name: "0 validators", count: 0, expected: 8},
{name: "1 validator", count: 1, expected: 8},
{name: "8 validators", count: 8, expected: 8},
{name: "9 validators", count: 9, expected: 9},
{name: "100 validators", count: 100, expected: 100},
{name: "128 validators", count: 128, expected: 128},
{name: "129 validators", count: 129, expected: 128},
{name: "1000 validators", count: 1000, expected: 128},
}
const balance = uint64(32_000_000_000)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
balances := make([]uint64, 0, tc.count)
for range tc.count {
balances = append(balances, balance)
}
validatorsIndex := make(map[primitives.ValidatorIndex]bool)
for i := range tc.count {
validatorsIndex[primitives.ValidatorIndex(i)] = true
}
beaconState, err := state_native.InitializeFromProtoFulu(&ethpb.BeaconStateElectra{Balances: balances})
require.NoError(t, err)
actual, err := peerdas.ValidatorsCustodyRequirement(beaconState, validatorsIndex)
require.NoError(t, err)
require.Equal(t, tc.expected, actual)
})
}
}
func TestCustodyGroupSamplingSize(t *testing.T) {
testCases := []struct {
name string
custodyType peerdas.CustodyType
validatorsCustodyRequirement uint64
toAdvertiseCustodyGroupCount uint64
expected uint64
}{
{
name: "target, lower than samples per slot",
custodyType: peerdas.Target,
validatorsCustodyRequirement: 2,
expected: 8,
},
{
name: "target, higher than samples per slot",
custodyType: peerdas.Target,
validatorsCustodyRequirement: 100,
expected: 100,
},
{
name: "actual, lower than samples per slot",
custodyType: peerdas.Actual,
validatorsCustodyRequirement: 3,
toAdvertiseCustodyGroupCount: 4,
expected: 8,
},
{
name: "actual, higher than samples per slot",
custodyType: peerdas.Actual,
validatorsCustodyRequirement: 100,
toAdvertiseCustodyGroupCount: 101,
expected: 100,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Set the validators custody requirement for target custody group count.
peerdas.TargetCustodyGroupCount.SetValidatorsCustodyRequirement(tc.validatorsCustodyRequirement)
// Set the to advertise custody group count.
peerdas.ToAdvertiseCustodyGroupCount.Set(tc.toAdvertiseCustodyGroupCount)
// Compute the custody group sampling size.
actual := peerdas.CustodyGroupSamplingSize(tc.custodyType)
// Check the result.
require.Equal(t, tc.expected, actual)
})
}
}

View File

@@ -7,38 +7,64 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
lru "github.com/hashicorp/golang-lru"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/params"
)
// info contains all useful peerDAS related information regarding a peer.
type info struct {
CustodyGroups map[uint64]bool
CustodyColumns map[uint64]bool
DataColumnsSubnets map[uint64]bool
}
type (
info struct {
CustodyGroups map[uint64]bool
CustodyColumns map[uint64]bool
DataColumnsSubnets map[uint64]bool
}
targetCustodyGroupCount struct {
mut sync.RWMutex
validatorsCustodyRequirement uint64
}
toAdverstiseCustodyGroupCount struct {
mut sync.RWMutex
value uint64
}
)
const (
cacheSize = 200
keySize = 32 + 8
nodeInfoCacheSize = 200
nodeInfoCachKeySize = 32 + 8
)
var (
mut sync.Mutex
cache *lru.Cache
// CustodyGroupCountMut is a mutex to be used by caller to ensure neither
// TargetCustodyGroupCount nor ToAdvertiseCustodyGroupCount are being modified.
// (This is not necessary to use this mutex for any data protection.)
CustodyGroupCountMut sync.RWMutex
// TargetCustodyGroupCount represents the target number of custody groups we should custody
// regarding the validators we are tracking.
TargetCustodyGroupCount targetCustodyGroupCount
// ToAdvertiseCustodyGroupCount represents the number of custody groups to advertise to the network.
ToAdvertiseCustodyGroupCount toAdverstiseCustodyGroupCount
nodeInfoCacheMut sync.Mutex
nodeInfoCache *lru.Cache
)
// Info returns the peerDAS information for a given nodeID and custodyGroupCount.
// It returns a boolean indicating if the peer info was already in the cache and an error if any.
func Info(nodeID enode.ID, custodyGroupCount uint64) (*info, bool, error) {
// Create a new cache if it doesn't exist.
if err := createCacheIfNeeded(); err != nil {
if err := createInfoCacheIfNeeded(); err != nil {
return nil, false, errors.Wrap(err, "create cache if needed")
}
// Compute the key.
key := computeKey(nodeID, custodyGroupCount)
key := computeInfoCacheKey(nodeID, custodyGroupCount)
// If the value is already in the cache, return it.
if value, ok := cache.Get(key); ok {
if value, ok := nodeInfoCache.Get(key); ok {
peerInfo, ok := value.(*info)
if !ok {
return nil, false, errors.New("failed to cast peer info (should never happen)")
@@ -70,34 +96,89 @@ func Info(nodeID enode.ID, custodyGroupCount uint64) (*info, bool, error) {
}
// Add the result to the cache.
cache.Add(key, result)
nodeInfoCache.Add(key, result)
return result, false, nil
}
// createCacheIfNeeded creates a new cache if it doesn't exist.
func createCacheIfNeeded() error {
mut.Lock()
defer mut.Unlock()
// createInfoCacheIfNeeded creates a new cache if it doesn't exist.
func createInfoCacheIfNeeded() error {
nodeInfoCacheMut.Lock()
defer nodeInfoCacheMut.Unlock()
if cache == nil {
c, err := lru.New(cacheSize)
if nodeInfoCache == nil {
c, err := lru.New(nodeInfoCacheSize)
if err != nil {
return errors.Wrap(err, "lru new")
}
cache = c
nodeInfoCache = c
}
return nil
}
// computeKey returns a unique key for a node and its custodyGroupCount.
func computeKey(nodeID enode.ID, custodyGroupCount uint64) [keySize]byte {
var key [keySize]byte
// computeInfoCacheKey returns a unique key for a node and its custodyGroupCount.
func computeInfoCacheKey(nodeID enode.ID, custodyGroupCount uint64) [nodeInfoCachKeySize]byte {
var key [nodeInfoCachKeySize]byte
copy(key[:32], nodeID[:])
binary.BigEndian.PutUint64(key[32:], custodyGroupCount)
return key
}
// setValidatorsCustodyRequirement sets the validators custody requirement.
func (tcgc *targetCustodyGroupCount) SetValidatorsCustodyRequirement(value uint64) {
tcgc.mut.Lock()
defer tcgc.mut.Unlock()
tcgc.validatorsCustodyRequirement = value
}
// CustodyGroupCount returns the number of groups we should participate in for custody.
func (tcgc *targetCustodyGroupCount) Get() uint64 {
// If subscribed to all subnets, return the number of custody groups.
if flags.Get().SubscribeToAllSubnets {
return params.BeaconConfig().NumberOfCustodyGroups
}
tcgc.mut.RLock()
defer tcgc.mut.RUnlock()
// If no validators are tracked, return the default custody requirement.
if tcgc.validatorsCustodyRequirement == 0 {
return params.BeaconConfig().CustodyRequirement
}
// Return the validators custody requirement.
return tcgc.validatorsCustodyRequirement
}
// Set sets the to advertise custody group count.
func (tacgc *toAdverstiseCustodyGroupCount) Set(value uint64) {
tacgc.mut.Lock()
defer tacgc.mut.Unlock()
tacgc.value = value
}
// Get returns the to advertise custody group count.
func (tacgc *toAdverstiseCustodyGroupCount) Get() uint64 {
// If subscribed to all subnets, return the number of custody groups.
if flags.Get().SubscribeToAllSubnets {
return params.BeaconConfig().NumberOfCustodyGroups
}
custodyRequirement := params.BeaconConfig().CustodyRequirement
tacgc.mut.RLock()
defer tacgc.mut.RUnlock()
return max(tacgc.value, custodyRequirement)
}
// ActualCustodyGroupCount returns the actual custody group count.
func ActualCustodyGroupCount() uint64 {
return min(TargetCustodyGroupCount.Get(), ToAdvertiseCustodyGroupCount.Get())
}

View File

@@ -5,6 +5,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/testing/require"
)
@@ -25,3 +26,103 @@ func TestInfo(t *testing.T) {
require.DeepEqual(t, expectedDataColumnsSubnets, actual.DataColumnsSubnets)
}
}
func TestTargetCustodyGroupCount(t *testing.T) {
testCases := []struct {
name string
subscribeToAllSubnets bool
validatorsCustodyRequirement uint64
expected uint64
}{
{
name: "subscribed to all subnets",
subscribeToAllSubnets: true,
validatorsCustodyRequirement: 100,
expected: 128,
},
{
name: "no validators attached",
subscribeToAllSubnets: false,
validatorsCustodyRequirement: 0,
expected: 4,
},
{
name: "some validators attached",
subscribeToAllSubnets: false,
validatorsCustodyRequirement: 100,
expected: 100,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Subscribe to all subnets if needed.
if tc.subscribeToAllSubnets {
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.SubscribeToAllSubnets = true
flags.Init(gFlags)
defer flags.Init(resetFlags)
}
// Set the validators custody requirement.
peerdas.TargetCustodyGroupCount.SetValidatorsCustodyRequirement(tc.validatorsCustodyRequirement)
// Get the target custody group count.
actual := peerdas.TargetCustodyGroupCount.Get()
// Compare the expected and actual values.
require.Equal(t, tc.expected, actual)
})
}
}
func TestToAdvertiseCustodyGroupCount(t *testing.T) {
testCases := []struct {
name string
subscribeToAllSubnets bool
toAdvertiseCustodyGroupCount uint64
expected uint64
}{
{
name: "subscribed to all subnets",
subscribeToAllSubnets: true,
toAdvertiseCustodyGroupCount: 100,
expected: 128,
},
{
name: "higher than custody requirement",
subscribeToAllSubnets: false,
toAdvertiseCustodyGroupCount: 100,
expected: 100,
},
{
name: "lower than custody requirement",
subscribeToAllSubnets: false,
toAdvertiseCustodyGroupCount: 1,
expected: 4,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Subscribe to all subnets if needed.
if tc.subscribeToAllSubnets {
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.SubscribeToAllSubnets = true
flags.Init(gFlags)
defer flags.Init(resetFlags)
}
// Set the to advertise custody group count.
peerdas.ToAdvertiseCustodyGroupCount.Set(tc.toAdvertiseCustodyGroupCount)
// Get the to advertise custody group count.
actual := peerdas.ToAdvertiseCustodyGroupCount.Get()
// Compare the expected and actual values.
require.Equal(t, tc.expected, actual)
})
}
}

View File

@@ -4,7 +4,6 @@ import (
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/kzg"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
)
@@ -111,15 +110,6 @@ func ComputeCustodyGroupForColumn(columnIndex uint64) (uint64, error) {
return columnIndex / columnsPerGroup, nil
}
// CustodyGroupCount returns the number of groups we should participate in for custody.
func CustodyGroupCount() uint64 {
if flags.Get().SubscribeToAllSubnets {
return params.BeaconConfig().NumberOfCustodyGroups
}
return params.BeaconConfig().CustodyRequirement
}
// CustodyGroupCountFromRecord extracts the custody group count from an ENR record.
func CustodyGroupCountFromRecord(record *enr.Record) (uint64, error) {
if record == nil {

View File

@@ -7,51 +7,11 @@ import (
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/kzg"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/testing/util"
)
func TestCustodyGroupCount(t *testing.T) {
testCases := []struct {
name string
subscribeToAllSubnets bool
expected uint64
}{
{
name: "subscribeToAllSubnets=false",
subscribeToAllSubnets: false,
expected: params.BeaconConfig().CustodyRequirement,
},
{
name: "subscribeToAllSubnets=true",
subscribeToAllSubnets: true,
expected: params.BeaconConfig().DataColumnSidecarSubnetCount,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Set flags.
resetFlags := flags.Get()
defer func() {
flags.Init(resetFlags)
}()
params.SetupTestConfigCleanup(t)
gFlags := new(flags.GlobalFlags)
gFlags.SubscribeToAllSubnets = tc.subscribeToAllSubnets
flags.Init(gFlags)
// Get the custody subnet count.
actual := peerdas.CustodyGroupCount()
require.Equal(t, tc.expected, actual)
})
}
}
func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
dbBlock := util.NewBeaconBlockDeneb()
require.NoError(t, kzg.Start())

View File

@@ -64,7 +64,7 @@ func (s *LazilyPersistentStoreColumn) PersistColumns(current primitives.Slot, sc
}
// IsDataAvailable returns nil if all the commitments in the given block are persisted to the db and have been verified.
// BlobSidecars already in the db are assumed to have been previously verified against the block.
// DataColumnsSidecars already in the db are assumed to have been previously verified against the block.
func (s *LazilyPersistentStoreColumn) IsDataAvailable(
ctx context.Context,
nodeID enode.ID,
@@ -154,7 +154,7 @@ func fullCommitmentsToCheck(nodeID enode.ID, block blocks.ROBlock, currentSlot p
}
// Retrieve the groups count.
custodyGroupCount := peerdas.CustodyGroupCount()
custodyGroupCount := peerdas.ActualCustodyGroupCount()
// Retrieve peer info.
peerInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)

View File

@@ -859,6 +859,7 @@ func (b *BeaconNode) registerSyncService(initialSyncComplete chan struct{}, bFil
regularsync.WithBlobStorage(b.BlobStorage),
regularsync.WithVerifierWaiter(b.verifyInitWaiter),
regularsync.WithAvailableBlocker(bFillStore),
regularsync.WithTrackedValidatorsCache(b.trackedValidatorsCache),
)
return b.services.RegisterService(rs)
}

View File

@@ -79,6 +79,7 @@ go_library(
"//time/slots:go_default_library",
"@com_github_btcsuite_btcd_btcec_v2//:go_default_library",
"@com_github_ethereum_go_ethereum//crypto:go_default_library",
"@com_github_ethereum_go_ethereum//log:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/discover:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",

View File

@@ -11,13 +11,13 @@ import (
// AdmissibleCustodyGroupsPeers returns a list of peers that custody a super set of the local node's custody groups.
func (s *Service) AdmissibleCustodyGroupsPeers(peers []peer.ID) ([]peer.ID, error) {
localCustodyGroupCount := peerdas.CustodyGroupCount()
localCustodyGroupCount := peerdas.ActualCustodyGroupCount()
return s.custodyGroupsAdmissiblePeers(peers, localCustodyGroupCount)
}
// AdmissibleCustodySamplingPeers returns a list of peers that custody a super set of the local node's sampling columns.
func (s *Service) AdmissibleCustodySamplingPeers(peers []peer.ID) ([]peer.ID, error) {
localSubnetSamplingSize := peerdas.CustodyGroupSamplingSize()
localSubnetSamplingSize := peerdas.CustodyGroupSamplingSize(peerdas.Actual)
return s.custodyGroupsAdmissiblePeers(peers, localSubnetSamplingSize)
}

View File

@@ -3,10 +3,12 @@ package p2p
import (
"bytes"
"crypto/ecdsa"
"io"
"net"
"sync"
"time"
glog "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/discover"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
@@ -242,7 +244,7 @@ func (s *Service) RefreshPersistentSubnets() {
}
// Get the current custody group count.
custodyGroupCount := peerdas.CustodyGroupCount()
custodyGroupCount := peerdas.ActualCustodyGroupCount()
// Get the custody group count we store in our record.
inRecordCustodyGroupCount, err := peerdas.CustodyGroupCountFromRecord(record)
@@ -456,6 +458,7 @@ func (s *Service) createListener(
Bootnodes: bootNodes,
PingInterval: s.cfg.PingInterval,
NoFindnodeLivenessCheck: s.cfg.DisableLivenessCheck,
Log: glog.NewLogger(glog.NewTerminalHandlerWithLevel(io.Discard, -12, true)),
}
listener, err := discover.ListenV5(conn, localNode, dv5Cfg)
@@ -492,7 +495,7 @@ func (s *Service) createLocalNode(
}
if params.FuluEnabled() {
custodyGroupCount := peerdas.CustodyGroupCount()
custodyGroupCount := peerdas.ActualCustodyGroupCount()
localNode.Set(peerdas.Cgc(custodyGroupCount))
}

View File

@@ -198,7 +198,7 @@ func TestGetSpec(t *testing.T) {
data, ok := resp.Data.(map[string]interface{})
require.Equal(t, true, ok)
assert.Equal(t, 171, len(data))
assert.Equal(t, 173, len(data))
for k, v := range data {
t.Run(k, func(t *testing.T) {
switch k {
@@ -547,10 +547,14 @@ func TestGetSpec(t *testing.T) {
assert.Equal(t, "1152", v)
case "NUMBER_OF_CUSTODY_GROUPS":
assert.Equal(t, "128", v)
case "BALANCE_PER_ADDITIONAL_CUSTODY_GROUP":
assert.Equal(t, "32000000000", v)
case "CUSTODY_REQUIREMENT":
assert.Equal(t, "4", v)
case "SAMPLES_PER_SLOT":
assert.Equal(t, "8", v)
case "VALIDATOR_CUSTODY_REQUIREMENT":
assert.Equal(t, "8", v)
case "MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS":
assert.Equal(t, "4096", v)
case "MAX_BLOB_COMMITMENTS_PER_BLOCK":

View File

@@ -51,7 +51,6 @@ go_test(
"//beacon-chain/state/stategen:go_default_library",
"//beacon-chain/state/stategen/mock:go_default_library",
"//beacon-chain/verification:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",

View File

@@ -294,12 +294,6 @@ func (p *BeaconDbBlocker) blobsFromStoredDataColumns(indices map[uint64]bool, ro
root := bytesutil.ToBytes32(rootBytes)
// Get the number of groups we should custody.
custodyGroupCount := peerdas.CustodyGroupCount()
// Determine if we are theoretically able to reconstruct the data columns.
canTheoreticallyReconstruct := peerdas.CanSelfReconstruct(custodyGroupCount)
// Retrieve the data columns indice actually we store.
summary := p.BlobStorage.Summary(root)
@@ -313,30 +307,19 @@ func (p *BeaconDbBlocker) blobsFromStoredDataColumns(indices map[uint64]bool, ro
storedDataColumnCount := uint64(len(storedDataColumnsIndices))
storedGroupCount := storedDataColumnCount / columnsPerGroup
// Determine is we acually able to reconstruct the data columns.
canActuallyReconstruct := peerdas.CanSelfReconstruct(storedGroupCount)
// Determine is we are able to reconstruct the data columns.
canReconstruct := peerdas.CanSelfReconstruct(storedGroupCount)
if !canTheoreticallyReconstruct && !canActuallyReconstruct {
if !canReconstruct {
// There is no way to reconstruct the data columns.
return nil, &core.RpcError{
Err: errors.Errorf("the node does not custody enough data columns to reconstruct blobs. Please start the beacon node with the `--%s` flag to ensure this call to success.", flags.SubscribeToAllSubnets.Name),
Err: errors.Errorf("the node does not custody enough data columns to reconstruct blobs. Please start the beacon node with the `--%s` flag to ensure this call to success, or retry later if it already the case.", flags.SubscribeToAllSubnets.Name),
Reason: core.NotFound,
}
}
nonExtendedColumnsCount := uint64(fieldparams.NumberOfColumns / 2)
if canTheoreticallyReconstruct && !canActuallyReconstruct {
// This case may happen if the node started recently with a big enough custody count, but did not (yet) backfill all the columns.
return nil, &core.RpcError{
Err: errors.Errorf("not all data columns are available for this blob. Wanted: %d, got: %d. Please retry later.", nonExtendedColumnsCount, storedDataColumnCount),
Reason: core.NotFound}
}
// - The case !canTheoreticallyReconstruct && canActuallyReconstruct may happen if the node used to custody enough columns,
// but do not custody enough columns anymore. We are still able to reconstruct the data columns.
// - The case canTheoreticallyReconstruct && canActuallyReconstruct is the happy path.
// Check if we store all the non extended columns. If so, we can respond without reconstructing.
missingColumns := make(map[uint64]bool)
for columnIndex := range nonExtendedColumnsCount {

View File

@@ -22,7 +22,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/rpc/core"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/rpc/testutil"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
@@ -306,37 +305,27 @@ func TestBlobsFromStoredDataColumns(t *testing.T) {
params.OverrideBeaconConfig(cfg)
testCases := []struct {
errorReason core.ErrorReason
isError bool
subscribeToAllSubnets bool
storedColumnsIndice []int
name string
errorReason core.ErrorReason
isError bool
storedColumnsIndice []int
name string
}{
{
name: "Cannot theoretically nor actually reconstruct",
subscribeToAllSubnets: false,
storedColumnsIndice: noDataColumnsIndice,
isError: true,
errorReason: core.NotFound,
name: "Cannot reconstruct",
storedColumnsIndice: noDataColumnsIndice,
isError: true,
errorReason: core.NotFound,
},
{
name: "Can theoretically but not actually reconstruct",
subscribeToAllSubnets: true,
storedColumnsIndice: noDataColumnsIndice,
isError: true,
errorReason: core.NotFound,
name: "No need to reconstruct",
storedColumnsIndice: originalColumnsIndice,
isError: false,
},
{
name: "No need to reconstruct",
subscribeToAllSubnets: true,
storedColumnsIndice: originalColumnsIndice,
isError: false,
},
{
name: "Reconstruction needed",
subscribeToAllSubnets: false,
storedColumnsIndice: extendedColumnsIndice,
isError: false,
name: "Reconstruction needed",
storedColumnsIndice: extendedColumnsIndice,
isError: false,
},
}
@@ -374,13 +363,6 @@ func TestBlobsFromStoredDataColumns(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Set the subscription to all subnets flags.
resetFlags := flags.Get()
params.SetupTestConfigCleanup(t)
gFlags := new(flags.GlobalFlags)
gFlags.SubscribeToAllSubnets = tc.subscribeToAllSubnets
flags.Init(gFlags)
// Define a blob storage.
blobStorage := filesystem.NewEphemeralBlobStorage(t)
@@ -405,9 +387,6 @@ func TestBlobsFromStoredDataColumns(t *testing.T) {
expected := verifiedRoBlobs
require.DeepSSZEqual(t, expected, actual)
}
// Reset flags.
flags.Init(resetFlags)
})
}
}

View File

@@ -57,6 +57,7 @@ go_library(
"validate_sync_committee_message.go",
"validate_sync_contribution_proof.go",
"validate_voluntary_exit.go",
"validators_custody.go",
],
importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync",
visibility = [
@@ -197,6 +198,7 @@ go_test(
"validate_sync_committee_message_test.go",
"validate_sync_contribution_proof_test.go",
"validate_voluntary_exit_test.go",
"validators_custody_test.go",
],
embed = [":go_default_library"],
shard_count = 4,

View File

@@ -54,8 +54,12 @@ func (s *Service) reconstructDataColumns(ctx context.Context, verifiedRODataColu
// Retrieve the node ID.
nodeID := s.cfg.p2p.NodeID()
// Prevent custody group count to change during the rest of the function.
peerdas.CustodyGroupCountMut.RLock()
defer peerdas.CustodyGroupCountMut.RUnlock()
// Compute the custody group count.
custodyGroupCount := peerdas.CustodyGroupCount()
custodyGroupCount := peerdas.ActualCustodyGroupCount()
// Retrieve our local node info.
localNodeInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
@@ -167,8 +171,12 @@ func (s *Service) scheduleReconstructedDataColumnsBroadcast(
// Get the node ID.
nodeID := s.cfg.p2p.NodeID()
// Prevent custody group count to change during the rest of the function.
peerdas.CustodyGroupCountMut.RLock()
defer peerdas.CustodyGroupCountMut.RUnlock()
// Get the custody group count.
custodyGroupCount := peerdas.CustodyGroupCount()
custodyGroupCount := peerdas.ActualCustodyGroupCount()
// Retrieve the local node info.
localNodeInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)

View File

@@ -99,7 +99,8 @@ func (d *dataColumnSampler1D) Run(ctx context.Context) {
nodeID := d.p2p.NodeID()
// Verify if we need to run sampling or not, if not, return directly.
custodyGroupCount := peerdas.CustodyGroupCount()
// TODO: Rework this part to take into account dynamic custody group count with peer sampling.
custodyGroupCount := peerdas.ActualCustodyGroupCount()
// Retrieve our local node info.
localNodeInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)

View File

@@ -771,7 +771,7 @@ func (f *blocksFetcher) custodyColumns() (map[uint64]bool, error) {
localNodeID := f.p2p.NodeID()
// Retrieve the number of groups we should custody.
localCustodyGroupCount := peerdas.CustodyGroupCount()
localCustodyGroupCount := peerdas.ActualCustodyGroupCount()
// Retrieve the local node info.
localNodeInfo, _, err := peerdas.Info(localNodeID, localCustodyGroupCount)

View File

@@ -352,7 +352,7 @@ func (s *Service) missingColumnRequest(roBlock blocks.ROBlock, store *filesystem
nodeID := s.cfg.P2P.NodeID()
// Get the custody group count.
custodyGroupsCount := peerdas.CustodyGroupCount()
custodyGroupsCount := peerdas.ActualCustodyGroupCount()
// Retrieve the peer info.
peerInfo, _, err := peerdas.Info(nodeID, custodyGroupsCount)
@@ -492,7 +492,7 @@ func (s *Service) fetchOriginColumns(pids []peer.ID) error {
log.WithField("root", fmt.Sprintf("%#x", r)).WithField("peerID", pids[i]).Warn("Columns from peer for origin block were unusable")
continue
}
log.WithField("nColumns", len(sidecars)).WithField("root", fmt.Sprintf("%#x", r)).Info("Successfully downloaded blobs for checkpoint sync block")
log.WithField("nColumns", len(sidecars)).WithField("root", fmt.Sprintf("%#x", r)).Info("Successfully downloaded data columns for checkpoint sync block")
return nil
}
return fmt.Errorf("no connected peer able to provide columns for checkpoint sync block %#x", r)

View File

@@ -188,3 +188,11 @@ func WithAvailableBlocker(avb coverage.AvailableBlocker) Option {
return nil
}
}
// WithTrackedValidatorsCache for tracked validators cache.
func WithTrackedValidatorsCache(c *cache.TrackedValidatorsCache) Option {
return func(s *Service) error {
s.trackedValidatorsCache = c
return nil
}
}

View File

@@ -313,7 +313,7 @@ func (s *Service) buildRequestsForMissingDataColumns(root [32]byte, block interf
nodeID := s.cfg.p2p.NodeID()
// Retrieve the number of groups we should sample from.
samplingGroupSize := peerdas.CustodyGroupSamplingSize()
samplingGroupSize := peerdas.CustodyGroupSamplingSize(peerdas.Actual)
// Retrieve the peer info.
peerInfo, _, err := peerdas.Info(nodeID, samplingGroupSize)

View File

@@ -8,7 +8,6 @@ import (
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
@@ -94,36 +93,14 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i
return errors.New("message is not type *pb.DataColumnSidecarsByRangeRequest")
}
// Get our node ID.
nodeID := s.cfg.p2p.NodeID()
numberOfColumns := params.BeaconConfig().NumberOfColumns
// Get the number of groups we should custody.
custodyGroupCount := peerdas.CustodyGroupCount()
// Retrieve the peer info.
peerInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
if err != nil {
s.writeErrorResponseToStream(responseCodeServerError, err.Error(), stream)
return errors.Wrap(err, "peer info")
}
custodyColumns := peerInfo.CustodyColumns
custodyColumnsCount := uint64(len(custodyColumns))
// Compute requested columns.
requestedColumns := r.Columns
requestedColumnsCount := uint64(len(requestedColumns))
// Format log fields.
var (
custodyColumnsLog interface{} = "all"
requestedColumnsLog interface{} = "all"
)
if custodyColumnsCount != numberOfColumns {
custodyColumnsLog = uint64MapToSortedSlice(custodyColumns)
}
var requestedColumnsLog interface{} = "all"
if requestedColumnsCount != numberOfColumns {
requestedColumnsLog = requestedColumns
@@ -134,7 +111,6 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i
log.WithFields(logrus.Fields{
"remotePeer": remotePeer,
"custodyColumns": custodyColumnsLog,
"requestedColumns": requestedColumnsLog,
"startSlot": r.StartSlot,
"count": r.Count,

View File

@@ -10,7 +10,6 @@ import (
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
@@ -103,32 +102,9 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
return errors.Wrapf(err, "unexpected error computing min valid blob request slot, current_slot=%d", cs)
}
// Retrieve our node ID.
nodeID := s.cfg.p2p.NodeID()
// Retrieve the number of groups we should custody.
custodyGroupCount := peerdas.CustodyGroupCount()
// Retrieve the peer info.
peerInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
if err != nil {
s.writeErrorResponseToStream(responseCodeServerError, err.Error(), stream)
return errors.Wrap(err, "peer info")
}
custodyColumns := peerInfo.CustodyColumns
custodyColumnsCount := uint64(len(custodyColumns))
var custody interface{} = "all"
if custodyColumnsCount != numberOfColumns {
custody = uint64MapToSortedSlice(custodyColumns)
}
remotePeer := stream.Conn().RemotePeer()
log := log.WithFields(logrus.Fields{
"peer": remotePeer,
"custody": custody,
"columns": requestedColumnsByRootLog,
})

View File

@@ -167,6 +167,7 @@ type Service struct {
newBlobVerifier verification.NewBlobVerifier
newColumnsVerifier verification.NewDataColumnsVerifier
availableBlocker coverage.AvailableBlocker
trackedValidatorsCache *cache.TrackedValidatorsCache
dataColumsnReconstructionLock sync.Mutex
receivedDataColumnsFromRoot *gcache.Cache
receivedDataColumnsFromRootLock sync.RWMutex
@@ -265,6 +266,7 @@ func (s *Service) Start() {
s.processPendingAttsQueue()
s.maintainPeerStatuses()
s.resyncIfBehind()
go s.maintainValidatorsCustody()
// Update sync metrics.
async.RunEvery(s.ctx, syncMetricsInterval, s.updateMetrics)

View File

@@ -592,7 +592,7 @@ func (s *Service) enoughPeersAreConnected(subnetTopic string) bool {
func (s *Service) dataColumnSubnetIndices(_ primitives.Slot) []uint64 {
nodeID := s.cfg.p2p.NodeID()
custodyGroupCount := peerdas.CustodyGroupSamplingSize()
custodyGroupCount := peerdas.CustodyGroupSamplingSize(peerdas.Target)
nodeInfo, _, err := peerdas.Info(nodeID, custodyGroupCount)
if err != nil {

View File

@@ -0,0 +1,147 @@
package sync
import (
"fmt"
"strings"
"time"
"github.com/prysmaticlabs/prysm/v5/async"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/time/slots"
)
const updateToAdvertiseCustodyGroupCountPeriod = 1 * time.Minute
func (s *Service) maintainValidatorsCustody() {
async.RunEvery(s.ctx, updateToAdvertiseCustodyGroupCountPeriod, s.updateToAdvertiseCustodyGroupCount)
stateCh := make(chan *feed.Event, 1)
stateSub := s.cfg.stateNotifier.StateFeed().Subscribe(stateCh)
defer stateSub.Unsubscribe()
latestProcessedEpoch := params.BeaconConfig().FarFutureEpoch
for {
select {
case event := <-stateCh:
latestProcessedEpoch = s.handleEvent(event, latestProcessedEpoch)
case err := <-stateSub.Err():
log.WithError(err).Error("DataColumnSampler1D subscription to state feed failed")
case <-s.ctx.Done():
log.Debug("Context canceled, exiting data column sampling loop.")
return
}
}
}
// updateToAdvertiseCustodyGroupCount updates the custody group count to advertise.
func (s *Service) updateToAdvertiseCustodyGroupCount() {
// Retrieve the registered topics, and store them in a map for quick lookup.
registeredTopicsSlice := s.subHandler.allTopics()
registeredTopics := make(map[string]bool, len(registeredTopicsSlice))
for _, topic := range registeredTopicsSlice {
topicMessage := extractGossipMessage(topic)
registeredTopics[topicMessage] = true
}
// Get the node ID.
nodeID := s.cfg.p2p.NodeID()
peerdas.CustodyGroupCountMut.Lock()
defer peerdas.CustodyGroupCountMut.Unlock()
// Get the custody group count.
targetCustodyGroupCount := peerdas.TargetCustodyGroupCount.Get()
// Get the peerDAS info.
info, _, err := peerdas.Info(nodeID, targetCustodyGroupCount)
if err != nil {
log.WithError(err).Error("Failed to get peerDAS info")
return
}
for column := range info.CustodyColumns {
topicMessage := fmt.Sprintf(p2p.GossipDataColumnSidecarMessage+"_%d", column)
if !registeredTopics[topicMessage] {
// At least one data column subnet we should be subscribed to is not.
return
}
}
// All data column subnets we should be subscribed to are.
peerdas.ToAdvertiseCustodyGroupCount.Set(targetCustodyGroupCount)
}
// handleEvent handles a state feed event.
func (s *Service) handleEvent(event *feed.Event, latestProcessedEpoch primitives.Epoch) primitives.Epoch {
// Ignore events that are not block processed events.
if event.Type != state.BlockProcessed {
return latestProcessedEpoch
}
// Ignore events that do not have the correct data type.
data, ok := event.Data.(*state.BlockProcessedData)
if !ok {
log.Error("Event feed data is not of type *statefeed.BlockProcessedData")
return latestProcessedEpoch
}
// Ignore events that are not verified.
if !data.Verified {
return latestProcessedEpoch
}
// Return early if this epoch has already been processed.
epoch := slots.ToEpoch(data.Slot)
if epoch == latestProcessedEpoch {
return latestProcessedEpoch
}
// Get the indices of the tracked validators.
indices := s.trackedValidatorsCache.Indices()
// Write lock custody group count.
peerdas.CustodyGroupCountMut.Lock()
defer peerdas.CustodyGroupCountMut.Unlock()
// Set the validators custody requirement if there are no tracked validators.
if len(indices) == 0 {
peerdas.TargetCustodyGroupCount.SetValidatorsCustodyRequirement(0)
return epoch
}
// Get the state for the block root.
state := s.cfg.stateGen.StateByRootIfCachedNoCopy(data.BlockRoot)
if state == nil {
return latestProcessedEpoch
}
// Get the validators custody requirement.
validatorsCustodyRequirement, err := peerdas.ValidatorsCustodyRequirement(state, indices)
if err != nil {
log.WithError(err).Error("Failed to get validators custody requirement")
return latestProcessedEpoch
}
// Set the validators custody requirement.
peerdas.TargetCustodyGroupCount.SetValidatorsCustodyRequirement(validatorsCustodyRequirement)
return epoch
}
// extractGossipMessage extracts the gossip data column sidecar message from a topic.
func extractGossipMessage(s string) string {
parts := strings.SplitN(s, "/", 5)
if len(parts) < 4 {
return ""
}
return parts[3]
}

View File

@@ -0,0 +1,215 @@
package sync
import (
"context"
"testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed"
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
testDB "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing"
doublylinkedtree "github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice/doubly-linked-tree"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/testing/util"
)
func TestUpdateToAdvertiseCustodyGroupCount(t *testing.T) {
testCases := []struct {
name string
topics []string
validatorsCustodyRequirement uint64
expected uint64
}{
{
name: "some missing registered topics",
topics: []string{"/eth2/ae729ef4/data_column_sidecar_1"},
validatorsCustodyRequirement: 9,
expected: 4,
},
{
name: "all registered topics",
topics: []string{
"/eth2/ae729ef4/data_column_sidecar_1",
"/eth2/ae729ef4/data_column_sidecar_6",
"/eth2/ae729ef4/data_column_sidecar_17",
"/eth2/ae729ef4/data_column_sidecar_19",
"/eth2/ae729ef4/data_column_sidecar_42",
"/eth2/ae729ef4/data_column_sidecar_75",
"/eth2/ae729ef4/data_column_sidecar_87",
"/eth2/ae729ef4/data_column_sidecar_102",
"/eth2/ae729ef4/data_column_sidecar_117",
},
validatorsCustodyRequirement: 9,
expected: 9,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Create a new subTopicHandler and add the topics.
subHandler := newSubTopicHandler()
for _, topic := range tc.topics {
subHandler.addTopic(topic, nil)
}
// Create a new service.
service := &Service{
cfg: &config{
p2p: &p2p.Service{},
},
subHandler: subHandler,
}
// Set the target custody group count.
peerdas.TargetCustodyGroupCount.SetValidatorsCustodyRequirement(tc.validatorsCustodyRequirement)
// Update the custody group count to advertise.
service.updateToAdvertiseCustodyGroupCount()
// Get the custody group count to advertise.
actual := peerdas.ToAdvertiseCustodyGroupCount.Get()
// Check if the custody group count to advertise is as expected.
require.Equal(t, tc.expected, actual)
})
}
}
func TestHandleEvent(t *testing.T) {
verifiedEvent := &feed.Event{
Type: statefeed.BlockProcessed,
Data: &statefeed.BlockProcessedData{
Slot: 33,
BlockRoot: [32]byte{},
Verified: true,
},
}
testCases := []struct {
name string
event *feed.Event
latestProcessedEpoch primitives.Epoch
validatorsBalance []uint64
expectedTargetCustodyGroupCount uint64
expectedEpoch primitives.Epoch
}{
{
name: "wrong event type",
event: &feed.Event{Type: statefeed.ChainStarted},
latestProcessedEpoch: 42,
expectedTargetCustodyGroupCount: 4,
expectedEpoch: 42,
},
{
name: "wrong data type",
event: &feed.Event{
Type: statefeed.BlockProcessed,
Data: "wrong data type",
},
latestProcessedEpoch: 42,
expectedTargetCustodyGroupCount: 4,
expectedEpoch: 42,
},
{
name: "data not verified",
event: &feed.Event{
Type: statefeed.BlockProcessed,
Data: &statefeed.BlockProcessedData{
Verified: false,
},
},
latestProcessedEpoch: 42,
expectedTargetCustodyGroupCount: 4,
expectedEpoch: 42,
},
{
name: "epoch already processed",
event: verifiedEvent,
latestProcessedEpoch: 1,
expectedTargetCustodyGroupCount: 4,
expectedEpoch: 1,
},
{
name: "no tracked validators",
event: verifiedEvent,
latestProcessedEpoch: 0,
expectedTargetCustodyGroupCount: 4,
expectedEpoch: 1,
},
{
name: "some tracked validators",
event: verifiedEvent,
latestProcessedEpoch: 0,
validatorsBalance: []uint64{64_000_000_000, 64_000_000_000, 64_000_000_000, 64_000_000_000, 33_000_000_000},
expectedTargetCustodyGroupCount: 9,
expectedEpoch: 1,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
beaconDB := testDB.SetupDB(t)
stateGen := stategen.New(beaconDB, doublylinkedtree.New())
state, _ := util.DeterministicGenesisState(t, 32)
err := state.SetBalances(tc.validatorsBalance)
require.NoError(t, err)
err = stateGen.SaveState(ctx, [32]byte{}, state)
require.NoError(t, err)
service := &Service{
trackedValidatorsCache: cache.NewTrackedValidatorsCache(),
cfg: &config{
stateGen: stateGen,
},
}
for index := range tc.validatorsBalance {
validator := cache.TrackedValidator{
Active: true,
Index: primitives.ValidatorIndex(index),
}
service.trackedValidatorsCache.Set(validator)
}
actualEpoch := service.handleEvent(tc.event, tc.latestProcessedEpoch)
require.Equal(t, tc.expectedEpoch, actualEpoch)
actualTargetCustodyGroup := peerdas.TargetCustodyGroupCount.Get()
require.Equal(t, tc.expectedTargetCustodyGroupCount, actualTargetCustodyGroup)
})
}
}
func TestExtractGossipMessage(t *testing.T) {
testCases := []struct {
name string
expected string
}{
{
name: "/eth2/ae729ef4/beacon_attestation_28/ssz_snappy",
expected: "beacon_attestation_28",
},
{
name: "/eth2/ae729ef4/beacon_attestation_28",
expected: "beacon_attestation_28",
},
{
name: "/eth2/ae729ef4",
expected: "",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actual := extractGossipMessage(tc.name)
require.Equal(t, tc.expected, actual)
})
}
}

View File

@@ -274,6 +274,8 @@ type BeaconChainConfig struct {
MaxCellsInExtendedMatrix uint64 `yaml:"MAX_CELLS_IN_EXTENDED_MATRIX"` // MaxCellsInExtendedMatrix is the full data of one-dimensional erasure coding extended blobs (in row major format).
DataColumnSidecarSubnetCount uint64 `yaml:"DATA_COLUMN_SIDECAR_SUBNET_COUNT" spec:"true"` // DataColumnSidecarSubnetCount is the number of data column sidecar subnets used in the gossipsub protocol
MaxRequestDataColumnSidecars uint64 `yaml:"MAX_REQUEST_DATA_COLUMN_SIDECARS" spec:"true"` // MaxRequestDataColumnSidecars is the maximum number of data column sidecars in a single request
ValidatorCustodyRequirement uint64 `yaml:"VALIDATOR_CUSTODY_REQUIREMENT" spec:"true"` // ValidatorCustodyRequirement is the minimum number of custody groups an honest node with validators attached custodies and serves samples from
BalancePerAdditionalCustodyGroup uint64 `yaml:"BALANCE_PER_ADDITIONAL_CUSTODY_GROUP" spec:"true"` // BalancePerAdditionalCustodyGroup is the balance increment corresponding to one additional group to custody.
// Networking Specific Parameters
GossipMaxSize uint64 `yaml:"GOSSIP_MAX_SIZE" spec:"true"` // GossipMaxSize is the maximum allowed size of uncompressed gossip messages.

View File

@@ -24,7 +24,6 @@ import (
// These are variables that we don't use in Prysm. (i.e. future hardfork, light client... etc)
// IMPORTANT: Use one field per line and sort these alphabetically to reduce conflicts.
var placeholderFields = []string{
"BALANCE_PER_ADDITIONAL_CUSTODY_GROUP",
"BLOB_SIDECAR_SUBNET_COUNT_FULU",
"EIP6110_FORK_EPOCH",
"EIP6110_FORK_VERSION",

View File

@@ -315,6 +315,8 @@ var mainnetBeaconConfig = &BeaconChainConfig{
CustodyRequirement: 4,
MinEpochsForDataColumnSidecarsRequest: 4096,
MaxCellsInExtendedMatrix: 768,
ValidatorCustodyRequirement: 8,
BalancePerAdditionalCustodyGroup: 32_000_000_000,
// Values related to networking parameters.
GossipMaxSize: 10 * 1 << 20, // 10 MiB