mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-06 20:13:59 -05:00
adding semi-supernode feature (#16029)
**What type of PR is this?**

Feature

**What does this PR do? Why is it needed?**

| Feature | Semi-Supernode | Supernode |
| ----------------------- | ------------------------- | ------------------------ |
| **Custody Groups** | 64 | 128 |
| **Data Columns** | 64 | 128 |
| **Storage** | ~50% | ~100% |
| **Blob Reconstruction** | Yes (via Reed-Solomon) | No reconstruction needed |
| **Flag** | `--semi-supernode` | `--supernode` |
| **Can serve all blobs** | Yes (with reconstruction) | Yes (directly) |

**Note:** if your validators' total effective balance results in a custody requirement higher than the semi-supernode's (cgc=64), that higher requirement overrides the semi-supernode value.

From @nalepae:

Pro:
- We are useful to the network
- Lower likelihood of disconnection
- Straightforward to implement

Con:
- We cannot revert to a full node
- We have to serve incoming RPC requests corresponding to 64 columns

Tested the following using this kurtosis setup:

```yaml
participants:
  # Super-nodes
  - el_type: geth
    el_image: ethpandaops/geth:master
    cl_type: prysm
    vc_image: gcr.io/offchainlabs/prysm/validator:latest
    cl_image: gcr.io/offchainlabs/prysm/beacon-chain:latest
    count: 2
    cl_extra_params:
      - --supernode
    vc_extra_params:
      - --verbosity=debug
  # Full-nodes
  - el_type: geth
    el_image: ethpandaops/geth:master
    cl_type: prysm
    vc_image: gcr.io/offchainlabs/prysm/validator:latest
    cl_image: gcr.io/offchainlabs/prysm/beacon-chain:latest
    count: 2
    validator_count: 1
    cl_extra_params:
      - --semi-supernode
    vc_extra_params:
      - --verbosity=debug
additional_services:
  - dora
  - spamoor
spamoor_params:
  image: ethpandaops/spamoor:master
  max_mem: 4000
  spammers:
    - scenario: eoatx
      config:
        throughput: 200
    - scenario: blobs
      config:
        throughput: 20
network_params:
  fulu_fork_epoch: 0
  withdrawal_type: "0x02"
  preset: mainnet
global_log_level: debug
```

```
curl -H "Accept: application/json" http://127.0.0.1:32961/eth/v1/node/identity
{"data":{"peer_id":"16Uiu2HAm7xzhnGwea8gkcxRSC6fzUkvryP6d9HdWNkoeTkj6RSqw","enr":"enr:-Ni4QIH5u2NQz17_pTe9DcCfUyG8TidDJJjIeBpJRRm4ACQzGBpCJdyUP9eGZzwwZ2HS1TnB9ACxFMQ5LP5njnMDLm-GAZqZEXjih2F0dG5ldHOIAAAAAAAwAACDY2djQIRldGgykLZy_whwAAA4__________-CaWSCdjSCaXCErBAAE4NuZmSEAAAAAIRxdWljgjLIiXNlY3AyNTZrMaECulJrXpSOBmCsQWcGYzQsst7r3-Owlc9iZbEcJTDkB6qIc3luY25ldHMFg3RjcIIyyIN1ZHCCLuA","p2p_addresses":["/ip4/172.16.0.19/tcp/13000/p2p/16Uiu2HAm7xzhnGwea8gkcxRSC6fzUkvryP6d9HdWNkoeTkj6RSqw","/ip4/172.16.0.19/udp/13000/quic-v1/p2p/16Uiu2HAm7xzhnGwea8gkcxRSC6fzUkvryP6d9HdWNkoeTkj6RSqw"],"discovery_addresses":["/ip4/172.16.0.19/udp/12000/p2p/16Uiu2HAm7xzhnGwea8gkcxRSC6fzUkvryP6d9HdWNkoeTkj6RSqw"],"metadata":{"seq_number":"3","attnets":"0x0000000000300000","syncnets":"0x05","custody_group_count":"64"}}}
```

```
curl -s http://127.0.0.1:32961/eth/v1/debug/beacon/data_column_sidecars/head | jq '.data | length'
64
```

```
curl -X 'GET' \
  'http://127.0.0.1:32961/eth/v1/beacon/blobs/head' \
  -H 'accept: application/json'
```

**Which issue(s) does this PR fix?**

Fixes #

**Other notes for review**

**Acknowledgements**

- [x] I have read [CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description to this PR with sufficient context for reviewers to understand this PR.

---------

Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>
Co-authored-by: james-prysm <jhe@offchainlabs.com>
Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
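For orientation before the diff, here is a hedged sketch (not code from this PR) of how the effective custody group count is chosen under the rules described above; all names and values are illustrative, and the real logic lives in `updateCustodyInfoInDB` and `custodyGroupCount` below.

```go
package main

import "fmt"

// effectiveCustody models the selection rule: supernode custodies everything,
// semi-supernode custodies at least the reconstruction minimum, and everyone
// respects the validator-driven custody requirement.
func effectiveCustody(supernode, semiSupernode bool, numberOfCustodyGroups, custodyRequirement, validatorsCustody, reconstructionMinimum uint64) uint64 {
	if supernode {
		return numberOfCustodyGroups // 128 on mainnet: no reconstruction needed
	}
	effective := max(custodyRequirement, validatorsCustody)
	if semiSupernode {
		// At least the reconstruction minimum (64 on mainnet), or more if validators need it.
		return max(effective, reconstructionMinimum)
	}
	return effective
}

func main() {
	fmt.Println(effectiveCustody(false, true, 128, 4, 8, 64))  // 64: semi-supernode with few validators
	fmt.Println(effectiveCustody(false, true, 128, 4, 80, 64)) // 80: validator requirement overrides the 64
}
```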
@@ -14,6 +14,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/cache"
statefeed "github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/state"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/helpers"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
coreTime "github.com/OffchainLabs/prysm/v7/beacon-chain/core/time"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/transition"
"github.com/OffchainLabs/prysm/v7/beacon-chain/db"
@@ -470,30 +471,35 @@ func (s *Service) removeStartupState() {
// UpdateCustodyInfoInDB updates the custody information in the database.
// It returns the (potentially updated) custody group count and the earliest available slot.
func (s *Service) updateCustodyInfoInDB(slot primitives.Slot) (primitives.Slot, uint64, error) {
isSubscribedToAllDataSubnets := flags.Get().SubscribeAllDataSubnets
isSupernode := flags.Get().Supernode
isSemiSupernode := flags.Get().SemiSupernode

cfg := params.BeaconConfig()
custodyRequirement := cfg.CustodyRequirement

// Check if the node was previously subscribed to all data subnets, and if so,
// store the new status accordingly.
wasSubscribedToAllDataSubnets, err := s.cfg.BeaconDB.UpdateSubscribedToAllDataSubnets(s.ctx, isSubscribedToAllDataSubnets)
wasSupernode, err := s.cfg.BeaconDB.UpdateSubscribedToAllDataSubnets(s.ctx, isSupernode)
if err != nil {
log.WithError(err).Error("Could not update subscription status to all data subnets")
return 0, 0, errors.Wrap(err, "update subscribed to all data subnets")
}

// Warn the user if the node was previously subscribed to all data subnets and is not any more.
if wasSubscribedToAllDataSubnets && !isSubscribedToAllDataSubnets {
log.Warnf(
"Because the flag `--%s` was previously used, the node will still subscribe to all data subnets.",
flags.SubscribeAllDataSubnets.Name,
)
// Compute the target custody group count based on current flag configuration.
targetCustodyGroupCount := custodyRequirement

// Supernode: custody all groups (either currently set or previously enabled)
if isSupernode {
targetCustodyGroupCount = cfg.NumberOfCustodyGroups
}

// Compute the custody group count.
custodyGroupCount := custodyRequirement
if isSubscribedToAllDataSubnets {
custodyGroupCount = cfg.NumberOfCustodyGroups
// Semi-supernode: custody minimum needed for reconstruction, or custody requirement if higher
if isSemiSupernode {
semiSupernodeCustody, err := peerdas.MinimumCustodyGroupCountToReconstruct()
if err != nil {
return 0, 0, errors.Wrap(err, "minimum custody group count")
}

targetCustodyGroupCount = max(custodyRequirement, semiSupernodeCustody)
}

// Safely compute the fulu fork slot.
@@ -510,12 +516,23 @@ func (s *Service) updateCustodyInfoInDB(slot primitives.Slot,
}
}

earliestAvailableSlot, custodyGroupCount, err := s.cfg.BeaconDB.UpdateCustodyInfo(s.ctx, slot, custodyGroupCount)
earliestAvailableSlot, actualCustodyGroupCount, err := s.cfg.BeaconDB.UpdateCustodyInfo(s.ctx, slot, targetCustodyGroupCount)
if err != nil {
return 0, 0, errors.Wrap(err, "update custody info")
}

return earliestAvailableSlot, custodyGroupCount, nil
if isSupernode {
log.WithFields(logrus.Fields{
"current": actualCustodyGroupCount,
"target": cfg.NumberOfCustodyGroups,
}).Info("Supernode mode enabled. Will custody all data columns going forward.")
}

if wasSupernode && !isSupernode {
log.Warningf("Because the `--%s` flag was previously used, the node will continue to act as a super node.", flags.Supernode.Name)
}

return earliestAvailableSlot, actualCustodyGroupCount, nil
}

func spawnCountdownIfPreGenesis(ctx context.Context, genesisTime time.Time, db db.HeadAccessDatabase) {
@@ -642,7 +642,7 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {

resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.SubscribeAllDataSubnets = true
gFlags.Supernode = true
flags.Init(gFlags)
defer flags.Init(resetFlags)

@@ -680,7 +680,7 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
// ----------
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.SubscribeAllDataSubnets = true
gFlags.Supernode = true
flags.Init(gFlags)
defer flags.Init(resetFlags)

@@ -695,4 +695,121 @@ func TestUpdateCustodyInfoInDB(t *testing.T) {
require.Equal(t, slot, actualEas)
require.Equal(t, numberOfCustodyGroups, actualCgc)
})

t.Run("Supernode downgrade prevented", func(t *testing.T) {
service, requirements := minimalTestService(t)
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)

// Enable supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = true
flags.Init(gFlags)

slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, numberOfCustodyGroups, actualCgc)

// Try to downgrade by removing flag
gFlags.Supernode = false
flags.Init(gFlags)
defer flags.Init(resetFlags)

// Should still be supernode
actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot + 2)
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, numberOfCustodyGroups, actualCgc) // Still 64, not downgraded
})

t.Run("Semi-supernode downgrade prevented", func(t *testing.T) {
service, requirements := minimalTestService(t)
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)

// Enable semi-supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.SemiSupernode = true
flags.Init(gFlags)

slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
require.NoError(t, err)
require.Equal(t, slot, actualEas)
semiSupernodeCustody := numberOfCustodyGroups / 2 // 64
require.Equal(t, semiSupernodeCustody, actualCgc) // Semi-supernode custodies 64 groups

// Try to downgrade by removing flag
gFlags.SemiSupernode = false
flags.Init(gFlags)
defer flags.Init(resetFlags)

// UpdateCustodyInfo should prevent downgrade - custody count should remain at 64
actualEas, actualCgc, err = service.updateCustodyInfoInDB(slot + 2)
require.NoError(t, err)
require.Equal(t, slot, actualEas)
require.Equal(t, semiSupernodeCustody, actualCgc) // Still 64 due to downgrade prevention by UpdateCustodyInfo
})

t.Run("Semi-supernode to supernode upgrade allowed", func(t *testing.T) {
service, requirements := minimalTestService(t)
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)

// Start with semi-supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.SemiSupernode = true
flags.Init(gFlags)

slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
require.NoError(t, err)
require.Equal(t, slot, actualEas)
semiSupernodeCustody := numberOfCustodyGroups / 2 // 64
require.Equal(t, semiSupernodeCustody, actualCgc) // Semi-supernode custodies 64 groups

// Upgrade to full supernode
gFlags.SemiSupernode = false
gFlags.Supernode = true
flags.Init(gFlags)
defer flags.Init(resetFlags)

// Should upgrade to full supernode
upgradeSlot := slot + 2
actualEas, actualCgc, err = service.updateCustodyInfoInDB(upgradeSlot)
require.NoError(t, err)
require.Equal(t, upgradeSlot, actualEas) // Earliest slot updates when upgrading
require.Equal(t, numberOfCustodyGroups, actualCgc) // Upgraded to 128
})

t.Run("Semi-supernode with high validator requirements uses higher custody", func(t *testing.T) {
service, requirements := minimalTestService(t)
err = requirements.db.SaveBlock(ctx, roBlock)
require.NoError(t, err)

// Enable semi-supernode
resetFlags := flags.Get()
gFlags := new(flags.GlobalFlags)
gFlags.SemiSupernode = true
flags.Init(gFlags)
defer flags.Init(resetFlags)

// Mock a high custody requirement (simulating many validators)
// We need to override the custody requirement calculation
// For this test, we'll verify the logic by checking if custodyRequirement > 64
// Since custodyRequirement in minimalTestService is 4, we can't test the high case here
// This would require a different test setup with actual validators
slot := fuluForkEpoch*primitives.Slot(cfg.SlotsPerEpoch) + 1
actualEas, actualCgc, err := service.updateCustodyInfoInDB(slot)
require.NoError(t, err)
require.Equal(t, slot, actualEas)
semiSupernodeCustody := numberOfCustodyGroups / 2 // 64
// With low validator requirements (4), should use semi-supernode minimum (64)
require.Equal(t, semiSupernodeCustody, actualCgc)
})
}
@@ -45,12 +45,13 @@ go_test(
"p2p_interface_test.go",
"reconstruction_helpers_test.go",
"reconstruction_test.go",
"semi_supernode_test.go",
"utils_test.go",
"validator_test.go",
"verification_test.go",
],
embed = [":go_default_library"],
deps = [
":go_default_library",
"//beacon-chain/blockchain/kzg:go_default_library",
"//beacon-chain/state/state-native:go_default_library",
"//config/fieldparams:go_default_library",
@@ -29,6 +29,38 @@ func MinimumColumnCountToReconstruct() uint64 {
return (params.BeaconConfig().NumberOfColumns + 1) / 2
}

// MinimumCustodyGroupCountToReconstruct returns the minimum number of custody groups needed to
// custody enough data columns for reconstruction. This accounts for the relationship between
// custody groups and columns, making it future-proof if these values change.
// Returns an error if the configuration values are invalid (zero or would cause division by zero).
func MinimumCustodyGroupCountToReconstruct() (uint64, error) {
cfg := params.BeaconConfig()

// Validate configuration values
if cfg.NumberOfColumns == 0 {
return 0, errors.New("NumberOfColumns cannot be zero")
}
if cfg.NumberOfCustodyGroups == 0 {
return 0, errors.New("NumberOfCustodyGroups cannot be zero")
}

minimumColumnCount := MinimumColumnCountToReconstruct()

// Calculate how many columns each custody group represents
columnsPerGroup := cfg.NumberOfColumns / cfg.NumberOfCustodyGroups

// If there are more groups than columns (columnsPerGroup = 0), this is an invalid configuration
// for reconstruction purposes as we cannot determine a meaningful custody group count
if columnsPerGroup == 0 {
return 0, errors.Errorf("invalid configuration: NumberOfCustodyGroups (%d) exceeds NumberOfColumns (%d)",
cfg.NumberOfCustodyGroups, cfg.NumberOfColumns)
}

// Use ceiling division to ensure we have enough groups to cover the minimum columns
// ceiling(a/b) = (a + b - 1) / b
return (minimumColumnCount + columnsPerGroup - 1) / columnsPerGroup, nil
}

// recoverCellsForBlobs reconstructs cells for specified blobs from the given data column sidecars.
// This is optimized to only recover cells without computing proofs.
// Returns a map from blob index to recovered cells.
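For reference, a standalone sketch of the ceiling-division arithmetic used above; the configuration values are illustrative and mirror the test cases in the new test file below.

```go
package main

import "fmt"

// minimumGroupsToReconstruct reproduces the formula: half the columns (rounded up)
// are needed for reconstruction, then ceiling-divide by columns per custody group.
func minimumGroupsToReconstruct(numberOfColumns, numberOfCustodyGroups uint64) uint64 {
	minimumColumnCount := (numberOfColumns + 1) / 2
	columnsPerGroup := numberOfColumns / numberOfCustodyGroups
	return (minimumColumnCount + columnsPerGroup - 1) / columnsPerGroup
}

func main() {
	fmt.Println(minimumGroupsToReconstruct(128, 128)) // 64
	fmt.Println(minimumGroupsToReconstruct(128, 64))  // 32
	fmt.Println(minimumGroupsToReconstruct(100, 30))  // 17 = ceil(50/3)
}
```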
beacon-chain/core/peerdas/semi_supernode_test.go (new file, 160 lines)
@@ -0,0 +1,160 @@
package peerdas

import (
"testing"

"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/testing/require"
"github.com/ethereum/go-ethereum/p2p/enode"
)

func TestSemiSupernodeCustody(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.NumberOfCustodyGroups = 128
cfg.NumberOfColumns = 128
params.OverrideBeaconConfig(cfg)

// Create a test node ID
nodeID := enode.ID([32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32})

t.Run("semi-supernode custodies exactly 64 columns", func(t *testing.T) {
// Semi-supernode uses 64 custody groups (half of 128)
const semiSupernodeCustodyGroupCount = 64

// Get custody groups for semi-supernode
custodyGroups, err := CustodyGroups(nodeID, semiSupernodeCustodyGroupCount)
require.NoError(t, err)
require.Equal(t, semiSupernodeCustodyGroupCount, len(custodyGroups))

// Verify we get exactly 64 custody columns
custodyColumns, err := CustodyColumns(custodyGroups)
require.NoError(t, err)
require.Equal(t, semiSupernodeCustodyGroupCount, len(custodyColumns))

// Verify the columns are valid (within 0-127 range)
for columnIndex := range custodyColumns {
if columnIndex >= cfg.NumberOfColumns {
t.Fatalf("Invalid column index %d, should be less than %d", columnIndex, cfg.NumberOfColumns)
}
}
})

t.Run("64 columns is exactly the minimum for reconstruction", func(t *testing.T) {
minimumCount := MinimumColumnCountToReconstruct()
require.Equal(t, uint64(64), minimumCount)
})

t.Run("semi-supernode vs supernode custody", func(t *testing.T) {
// Semi-supernode (64 custody groups)
semiSupernodeGroups, err := CustodyGroups(nodeID, 64)
require.NoError(t, err)
semiSupernodeColumns, err := CustodyColumns(semiSupernodeGroups)
require.NoError(t, err)

// Supernode (128 custody groups = all groups)
supernodeGroups, err := CustodyGroups(nodeID, 128)
require.NoError(t, err)
supernodeColumns, err := CustodyColumns(supernodeGroups)
require.NoError(t, err)

// Verify semi-supernode has exactly half the columns of supernode
require.Equal(t, 64, len(semiSupernodeColumns))
require.Equal(t, 128, len(supernodeColumns))
require.Equal(t, len(supernodeColumns)/2, len(semiSupernodeColumns))

// Verify all semi-supernode columns are a subset of supernode columns
for columnIndex := range semiSupernodeColumns {
if !supernodeColumns[columnIndex] {
t.Fatalf("Semi-supernode column %d not found in supernode columns", columnIndex)
}
}
})
}

func TestMinimumCustodyGroupCountToReconstruct(t *testing.T) {
tests := []struct {
name string
numberOfColumns uint64
numberOfGroups uint64
expectedResult uint64
}{
{
name: "Standard 1:1 ratio (128 columns, 128 groups)",
numberOfColumns: 128,
numberOfGroups: 128,
expectedResult: 64, // Need half of 128 groups
},
{
name: "2 columns per group (128 columns, 64 groups)",
numberOfColumns: 128,
numberOfGroups: 64,
expectedResult: 32, // Need 64 columns, which is 32 groups (64/2)
},
{
name: "4 columns per group (128 columns, 32 groups)",
numberOfColumns: 128,
numberOfGroups: 32,
expectedResult: 16, // Need 64 columns, which is 16 groups (64/4)
},
{
name: "Odd number requiring ceiling division (100 columns, 30 groups)",
numberOfColumns: 100,
numberOfGroups: 30,
expectedResult: 17, // Need 50 columns, 3 columns per group (100/30), ceiling(50/3) = 17
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.NumberOfColumns = tt.numberOfColumns
cfg.NumberOfCustodyGroups = tt.numberOfGroups
params.OverrideBeaconConfig(cfg)

result, err := MinimumCustodyGroupCountToReconstruct()
require.NoError(t, err)
require.Equal(t, tt.expectedResult, result)
})
}
}

func TestMinimumCustodyGroupCountToReconstruct_ErrorCases(t *testing.T) {
t.Run("Returns error when NumberOfColumns is zero", func(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.NumberOfColumns = 0
cfg.NumberOfCustodyGroups = 128
params.OverrideBeaconConfig(cfg)

_, err := MinimumCustodyGroupCountToReconstruct()
require.NotNil(t, err)
require.Equal(t, true, err.Error() == "NumberOfColumns cannot be zero")
})

t.Run("Returns error when NumberOfCustodyGroups is zero", func(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.NumberOfColumns = 128
cfg.NumberOfCustodyGroups = 0
params.OverrideBeaconConfig(cfg)

_, err := MinimumCustodyGroupCountToReconstruct()
require.NotNil(t, err)
require.Equal(t, true, err.Error() == "NumberOfCustodyGroups cannot be zero")
})

t.Run("Returns error when NumberOfCustodyGroups exceeds NumberOfColumns", func(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.NumberOfColumns = 128
cfg.NumberOfCustodyGroups = 256
params.OverrideBeaconConfig(cfg)

_, err := MinimumCustodyGroupCountToReconstruct()
require.NotNil(t, err)
// Just check that we got an error about the configuration
require.Equal(t, true, len(err.Error()) > 0)
})
}
@@ -128,9 +128,9 @@ type NoHeadAccessDatabase interface {
BackfillFinalizedIndex(ctx context.Context, blocks []blocks.ROBlock, finalizedChildRoot [32]byte) error

// Custody operations.
UpdateSubscribedToAllDataSubnets(ctx context.Context, subscribed bool) (bool, error)
UpdateCustodyInfo(ctx context.Context, earliestAvailableSlot primitives.Slot, custodyGroupCount uint64) (primitives.Slot, uint64, error)
UpdateEarliestAvailableSlot(ctx context.Context, earliestAvailableSlot primitives.Slot) error
UpdateSubscribedToAllDataSubnets(ctx context.Context, subscribed bool) (bool, error)

// P2P Metadata operations.
SaveMetadataSeqNum(ctx context.Context, seqNum uint64) error
@@ -146,9 +146,9 @@ func (s *Store) UpdateEarliestAvailableSlot(ctx context.Context, earliestAvailab
return nil
}

// UpdateSubscribedToAllDataSubnets updates the "subscribed to all data subnets" status in the database
// only if `subscribed` is `true`.
// It returns the previous subscription status.
// UpdateSubscribedToAllDataSubnets updates whether the node is subscribed to all data subnets (supernode mode).
// This is a one-way flag - once set to true, it cannot be reverted to false.
// Returns the previous state.
func (s *Store) UpdateSubscribedToAllDataSubnets(ctx context.Context, subscribed bool) (bool, error) {
_, span := trace.StartSpan(ctx, "BeaconDB.UpdateSubscribedToAllDataSubnets")
defer span.End()
@@ -156,13 +156,11 @@ func (s *Store) UpdateSubscribedToAllDataSubnets(ctx context.Context, subscribed
result := false
if !subscribed {
if err := s.db.View(func(tx *bolt.Tx) error {
// Retrieve the custody bucket.
bucket := tx.Bucket(custodyBucket)
if bucket == nil {
return nil
}

// Retrieve the subscribe all data subnets flag.
bytes := bucket.Get(subscribeAllDataSubnetsKey)
if len(bytes) == 0 {
return nil
@@ -181,7 +179,6 @@ func (s *Store) UpdateSubscribedToAllDataSubnets(ctx context.Context, subscribed
}

if err := s.db.Update(func(tx *bolt.Tx) error {
// Retrieve the custody bucket.
bucket, err := tx.CreateBucketIfNotExists(custodyBucket)
if err != nil {
return errors.Wrap(err, "create custody bucket")
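To make the one-way semantics concrete, here is a minimal model (not the real `Store`, which is bolt-backed) of the latch behavior documented above; the type and field names are illustrative only.

```go
package main

import "fmt"

type custodyRecord struct{ subscribedToAllDataSubnets bool }

// update mirrors the documented contract: it returns the previous value and
// only ever flips the stored value from false to true, never back.
func (r *custodyRecord) update(subscribed bool) (previous bool) {
	previous = r.subscribedToAllDataSubnets
	if subscribed {
		r.subscribedToAllDataSubnets = true
	}
	return previous
}

func main() {
	var r custodyRecord
	fmt.Println(r.update(true))               // false: first call, value becomes true
	fmt.Println(r.update(false))              // true: attempt to revert is ignored
	fmt.Println(r.subscribedToAllDataSubnets) // still true
}
```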
@@ -67,6 +67,7 @@ func getSubscriptionStatusFromDB(t *testing.T, db *Store) bool {
return subscribed
}

func TestUpdateCustodyInfo(t *testing.T) {
ctx := t.Context()

@@ -274,6 +275,17 @@ func TestUpdateSubscribedToAllDataSubnets(t *testing.T) {
require.Equal(t, false, stored)
})

t.Run("initial update with empty database - set to true", func(t *testing.T) {
db := setupDB(t)

prev, err := db.UpdateSubscribedToAllDataSubnets(ctx, true)
require.NoError(t, err)
require.Equal(t, false, prev)

stored := getSubscriptionStatusFromDB(t, db)
require.Equal(t, true, stored)
})

t.Run("attempt to update from true to false (should not change)", func(t *testing.T) {
db := setupDB(t)

@@ -288,7 +300,7 @@ func TestUpdateSubscribedToAllDataSubnets(t *testing.T) {
require.Equal(t, true, stored)
})

t.Run("attempt to update from true to false (should not change)", func(t *testing.T) {
t.Run("update from true to true (no change)", func(t *testing.T) {
db := setupDB(t)

_, err := db.UpdateSubscribedToAllDataSubnets(ctx, true)
@@ -450,7 +450,7 @@ func (p *BeaconDbBlocker) blobsDataFromStoredDataColumns(root [fieldparams.RootL
if count < peerdas.MinimumColumnCountToReconstruct() {
// There is no way to reconstruct the data columns.
return nil, &core.RpcError{
Err: errors.Errorf("the node does not custody enough data columns to reconstruct blobs - please start the beacon node with the `--%s` flag to ensure this call to succeed, or retry later if it is already the case", flags.SubscribeAllDataSubnets.Name),
Err: errors.Errorf("the node does not custody enough data columns to reconstruct blobs - please start the beacon node with the `--%s` flag to ensure this call to succeed, or retry later if it is already the case", flags.Supernode.Name),
Reason: core.NotFound,
}
}
@@ -555,7 +555,7 @@ func (p *BeaconDbBlocker) blobSidecarsFromStoredDataColumns(block blocks.ROBlock
if count < peerdas.MinimumColumnCountToReconstruct() {
// There is no way to reconstruct the data columns.
return nil, &core.RpcError{
Err: errors.Errorf("the node does not custody enough data columns to reconstruct blobs - please start the beacon node with the `--%s` flag to ensure this call to succeed, or retry later if it is already the case", flags.SubscribeAllDataSubnets.Name),
Err: errors.Errorf("the node does not custody enough data columns to reconstruct blobs - please start the beacon node with the `--%s` flag to ensure this call to succeed, or retry later if it is already the case", flags.Supernode.Name),
Reason: core.NotFound,
}
}
@@ -92,16 +92,30 @@ func (s *Service) updateCustodyInfoIfNeeded() error {
func (s *Service) custodyGroupCount(context.Context) (uint64, error) {
cfg := params.BeaconConfig()

if flags.Get().SubscribeAllDataSubnets {
if flags.Get().Supernode {
return cfg.NumberOfCustodyGroups, nil
}

// Calculate validator custody requirements
validatorsCustodyRequirement, err := s.validatorsCustodyRequirement()
if err != nil {
return 0, errors.Wrap(err, "validators custody requirement")
}

return max(cfg.CustodyRequirement, validatorsCustodyRequirement), nil
effectiveCustodyRequirement := max(cfg.CustodyRequirement, validatorsCustodyRequirement)

// If we're not in semi-supernode mode, just use the effective requirement.
if !flags.Get().SemiSupernode {
return effectiveCustodyRequirement, nil
}

// Semi-supernode mode custodies the minimum custody groups required for reconstruction.
// This is future-proof and works correctly even if custody groups != columns.
semiSupernodeTarget, err := peerdas.MinimumCustodyGroupCountToReconstruct()
if err != nil {
return 0, errors.Wrap(err, "minimum custody group count")
}
return max(effectiveCustodyRequirement, semiSupernodeTarget), nil
}

// validatorsCustodyRequirements computes the custody requirements based on the
@@ -8,6 +8,7 @@ import (

mock "github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v7/beacon-chain/cache"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/db"
dbtesting "github.com/OffchainLabs/prysm/v7/beacon-chain/db/testing"
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
@@ -106,11 +107,20 @@ func (ts *testSetup) assertCustodyInfo(t *testing.T, expectedSlot primitives.Slo
}

func withSubscribeAllDataSubnets(t *testing.T, fn func()) {
originalFlag := flags.Get().SubscribeAllDataSubnets
originalFlag := flags.Get().Supernode
defer func() {
flags.Get().SubscribeAllDataSubnets = originalFlag
flags.Get().Supernode = originalFlag
}()
flags.Get().SubscribeAllDataSubnets = true
flags.Get().Supernode = true
fn()
}

func withSemiSupernode(t *testing.T, fn func()) {
originalFlag := flags.Get().SemiSupernode
defer func() {
flags.Get().SemiSupernode = originalFlag
}()
flags.Get().SemiSupernode = true
fn()
}

@@ -195,4 +205,150 @@ func TestCustodyGroupCount(t *testing.T) {
require.NoError(t, err)
require.Equal(t, config.CustodyRequirement, result)
})

t.Run("SemiSupernode enabled returns half of NumberOfCustodyGroups", func(t *testing.T) {
withSemiSupernode(t, func() {
service := &Service{
ctx: context.Background(),
}

result, err := service.custodyGroupCount(ctx)
require.NoError(t, err)
expected, err := peerdas.MinimumCustodyGroupCountToReconstruct()
require.NoError(t, err)
require.Equal(t, expected, result)
})
})

t.Run("Supernode takes precedence over SemiSupernode", func(t *testing.T) {
// Test that when both flags are set, supernode takes precedence
originalSupernode := flags.Get().Supernode
originalSemiSupernode := flags.Get().SemiSupernode
defer func() {
flags.Get().Supernode = originalSupernode
flags.Get().SemiSupernode = originalSemiSupernode
}()
flags.Get().Supernode = true
flags.Get().SemiSupernode = true

service := &Service{
ctx: context.Background(),
}

result, err := service.custodyGroupCount(ctx)
require.NoError(t, err)
require.Equal(t, config.NumberOfCustodyGroups, result)
})

t.Run("SemiSupernode with no tracked validators returns semi-supernode target", func(t *testing.T) {
withSemiSupernode(t, func() {
service := &Service{
ctx: context.Background(),
trackedValidatorsCache: cache.NewTrackedValidatorsCache(),
}

result, err := service.custodyGroupCount(ctx)
require.NoError(t, err)
expected, err := peerdas.MinimumCustodyGroupCountToReconstruct()
require.NoError(t, err)
require.Equal(t, expected, result)
})
})
}

func TestSemiSupernodeValidatorCustodyOverride(t *testing.T) {
params.SetupTestConfigCleanup(t)
config := params.BeaconConfig()
config.NumberOfCustodyGroups = 128
config.CustodyRequirement = 4
config.ValidatorCustodyRequirement = 8
config.BalancePerAdditionalCustodyGroup = 1000000000 // 1 ETH in Gwei
params.OverrideBeaconConfig(config)

ctx := t.Context()

t.Run("Semi-supernode returns target when validator requirement is lower", func(t *testing.T) {
// When validators require less custody than semi-supernode provides,
// use the semi-supernode target (64)
withSemiSupernode(t, func() {
// Setup with validators requiring only 32 groups (less than 64)
service := &Service{
ctx: context.Background(),
trackedValidatorsCache: cache.NewTrackedValidatorsCache(),
}

result, err := service.custodyGroupCount(ctx)
require.NoError(t, err)

// Should return semi-supernode target (64) since it's higher than validator requirement
require.Equal(t, uint64(64), result)
})
})

t.Run("Validator requirement calculation respects minimum and maximum bounds", func(t *testing.T) {
// Verify that the validator custody requirement respects:
// - Minimum: ValidatorCustodyRequirement (8 in our config)
// - Maximum: NumberOfCustodyGroups (128 in our config)

// This ensures the formula works correctly:
// result = min(max(count, ValidatorCustodyRequirement), NumberOfCustodyGroups)

require.Equal(t, uint64(8), config.ValidatorCustodyRequirement)
require.Equal(t, uint64(128), config.NumberOfCustodyGroups)

// Semi-supernode target should be 64 (half of 128)
semiSupernodeTarget, err := peerdas.MinimumCustodyGroupCountToReconstruct()
require.NoError(t, err)
require.Equal(t, uint64(64), semiSupernodeTarget)
})

t.Run("Semi-supernode respects base CustodyRequirement", func(t *testing.T) {
// Test that semi-supernode respects max(CustodyRequirement, validatorsCustodyRequirement)
// even when both are below the semi-supernode target
params.SetupTestConfigCleanup(t)
// Setup with high base custody requirement (but still less than 64)
testConfig := params.BeaconConfig()
testConfig.NumberOfCustodyGroups = 128
testConfig.CustodyRequirement = 32 // Higher than validator requirement
testConfig.ValidatorCustodyRequirement = 8
params.OverrideBeaconConfig(testConfig)

withSemiSupernode(t, func() {
service := &Service{
ctx: context.Background(),
trackedValidatorsCache: cache.NewTrackedValidatorsCache(),
}

result, err := service.custodyGroupCount(ctx)
require.NoError(t, err)

// Should return semi-supernode target (64) since
// max(CustodyRequirement=32, validatorsCustodyRequirement=0) = 32 < 64
require.Equal(t, uint64(64), result)
})
})

t.Run("Semi-supernode uses higher custody when base requirement exceeds target", func(t *testing.T) {
// Set CustodyRequirement higher than semi-supernode target (64)
params.SetupTestConfigCleanup(t)
testConfig := params.BeaconConfig()
testConfig.NumberOfCustodyGroups = 128
testConfig.CustodyRequirement = 80 // Higher than semi-supernode target of 64
testConfig.ValidatorCustodyRequirement = 8
params.OverrideBeaconConfig(testConfig)

withSemiSupernode(t, func() {
service := &Service{
ctx: context.Background(),
trackedValidatorsCache: cache.NewTrackedValidatorsCache(),
}

result, err := service.custodyGroupCount(ctx)
require.NoError(t, err)

// Should return CustodyRequirement (80) since it's higher than semi-supernode target (64)
// effectiveCustodyRequirement = max(80, 0) = 80 > 64
require.Equal(t, uint64(80), result)
})
})
}
@@ -2,17 +2,17 @@ package sync

import (
"fmt"
"math/rand"
"testing"
"time"

"math/rand"

"github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/kzg"
mockChain "github.com/OffchainLabs/prysm/v7/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/peerdas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/db/filesystem"
p2ptest "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/testing/require"
"github.com/OffchainLabs/prysm/v7/testing/util"
)
@@ -136,3 +136,117 @@ func TestComputeRandomDelay(t *testing.T) {
fmt.Print(waitingTime)
require.Equal(t, expected, waitingTime)
}

func TestSemiSupernodeReconstruction(t *testing.T) {
const blobCount = 4
numberOfColumns := params.BeaconConfig().NumberOfColumns

ctx := t.Context()

// Start the trusted setup.
err := kzg.Start()
require.NoError(t, err)

roBlock, _, verifiedRoDataColumns := util.GenerateTestFuluBlockWithSidecars(t, blobCount)
require.Equal(t, numberOfColumns, uint64(len(verifiedRoDataColumns)))

minimumCount := peerdas.MinimumColumnCountToReconstruct()

t.Run("semi-supernode reconstruction with exactly 64 columns", func(t *testing.T) {
// Test that reconstruction works with exactly the minimum number of columns (64).
// This simulates semi-supernode mode which custodies exactly 64 columns.
require.Equal(t, uint64(64), minimumCount, "Expected minimum column count to be 64")

chainService := &mockChain.ChainService{}
p2p := p2ptest.NewTestP2P(t)
storage := filesystem.NewEphemeralDataColumnStorage(t)

service := NewService(
ctx,
WithP2P(p2p),
WithDataColumnStorage(storage),
WithChainService(chainService),
WithOperationNotifier(chainService.OperationNotifier()),
)

// Use exactly 64 columns (minimum for reconstruction) to simulate semi-supernode mode.
// Select the first 64 columns.
semiSupernodeColumns := verifiedRoDataColumns[:minimumCount]

err = service.receiveDataColumnSidecars(ctx, semiSupernodeColumns)
require.NoError(t, err)

err = storage.Save(semiSupernodeColumns)
require.NoError(t, err)

require.Equal(t, false, p2p.BroadcastCalled.Load())

// Check received indices before reconstruction.
require.Equal(t, minimumCount, uint64(len(chainService.DataColumns)))
for i, actual := range chainService.DataColumns {
require.Equal(t, uint64(i), actual.Index)
}

// Run the reconstruction.
err = service.processDataColumnSidecarsFromReconstruction(ctx, verifiedRoDataColumns[0])
require.NoError(t, err)

// Verify we can reconstruct all columns from just 64.
// The node should have received the initial 64 columns.
if len(chainService.DataColumns) < int(minimumCount) {
t.Fatalf("Expected at least %d columns but got %d", minimumCount, len(chainService.DataColumns))
}

block := roBlock.Block()
slot := block.Slot()
proposerIndex := block.ProposerIndex()

// Verify that we have seen at least the minimum number of columns.
seenCount := 0
for i := range numberOfColumns {
if service.hasSeenDataColumnIndex(slot, proposerIndex, i) {
seenCount++
}
}
if seenCount < int(minimumCount) {
t.Fatalf("Expected to see at least %d columns but saw %d", minimumCount, seenCount)
}
})

t.Run("semi-supernode reconstruction with random 64 columns", func(t *testing.T) {
// Test reconstruction with 64 non-contiguous columns to simulate a real scenario.
chainService := &mockChain.ChainService{}
p2p := p2ptest.NewTestP2P(t)
storage := filesystem.NewEphemeralDataColumnStorage(t)

service := NewService(
ctx,
WithP2P(p2p),
WithDataColumnStorage(storage),
WithChainService(chainService),
WithOperationNotifier(chainService.OperationNotifier()),
)

// Select every other column to get 64 non-contiguous columns.
semiSupernodeColumns := make([]blocks.VerifiedRODataColumn, 0, minimumCount)
for i := uint64(0); i < numberOfColumns && uint64(len(semiSupernodeColumns)) < minimumCount; i += 2 {
semiSupernodeColumns = append(semiSupernodeColumns, verifiedRoDataColumns[i])
}
require.Equal(t, minimumCount, uint64(len(semiSupernodeColumns)))

err = service.receiveDataColumnSidecars(ctx, semiSupernodeColumns)
require.NoError(t, err)

err = storage.Save(semiSupernodeColumns)
require.NoError(t, err)

// Run the reconstruction.
err = service.processDataColumnSidecarsFromReconstruction(ctx, semiSupernodeColumns[0])
require.NoError(t, err)

// Verify we received the columns.
if len(chainService.DataColumns) < int(minimumCount) {
t.Fatalf("Expected at least %d columns but got %d", minimumCount, len(chainService.DataColumns))
}
})
}
|
||||
@@ -692,26 +692,28 @@ func (s *Service) dataColumnSubnetIndices(primitives.Slot) map[uint64]bool {
|
||||
}
|
||||
|
||||
// samplingSize computes the sampling size based on the samples per slot value,
|
||||
// the validators custody requirement, and whether the node is subscribed to all data subnets.
|
||||
// the validators custody requirement, and the custody group count.
|
||||
// The custody group count is the source of truth and already includes supernode/semi-supernode logic.
|
||||
// https://github.com/ethereum/consensus-specs/blob/master/specs/fulu/das-core.md#custody-sampling
|
||||
func (s *Service) samplingSize() (uint64, error) {
|
||||
cfg := params.BeaconConfig()
|
||||
|
||||
if flags.Get().SubscribeAllDataSubnets {
|
||||
return cfg.DataColumnSidecarSubnetCount, nil
|
||||
}
|
||||
|
||||
// Compute the validators custody requirement.
|
||||
validatorsCustodyRequirement, err := s.validatorsCustodyRequirement()
|
||||
if err != nil {
|
||||
return 0, errors.Wrap(err, "validators custody requirement")
|
||||
}
|
||||
|
||||
// Get custody group count - this is the source of truth and already reflects:
|
||||
// - Supernode mode: NUMBER_OF_CUSTODY_GROUPS
|
||||
// - Semi-supernode mode: half of NUMBER_OF_CUSTODY_GROUPS (or more if validators require)
|
||||
// - Regular mode: validator custody requirement
|
||||
custodyGroupCount, err := s.cfg.p2p.CustodyGroupCount(s.ctx)
|
||||
if err != nil {
|
||||
return 0, errors.Wrap(err, "custody group count")
|
||||
}
|
||||
|
||||
// Sampling size should match custody to ensure we can serve what we advertise
|
||||
return max(cfg.SamplesPerSlot, validatorsCustodyRequirement, custodyGroupCount), nil
|
||||
}
|
||||
|
||||
|
||||
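An illustrative sketch of the sampling-size rule above, with made-up inputs; in the real code the values come from the beacon config, the tracked validators, and the p2p custody group count.

```go
package main

import "fmt"

// samplingSize mirrors the rule: sampling must cover whatever the node
// advertises custody for, and never drops below the per-slot sample count.
func samplingSize(samplesPerSlot, validatorsCustodyRequirement, custodyGroupCount uint64) uint64 {
	return max(samplesPerSlot, validatorsCustodyRequirement, custodyGroupCount)
}

func main() {
	fmt.Println(samplingSize(8, 8, 64))  // semi-supernode: 64
	fmt.Println(samplingSize(8, 8, 128)) // supernode: 128
	fmt.Println(samplingSize(8, 16, 16)) // regular node: 16
}
```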
@@ -652,6 +652,140 @@ func TestIsDigestValid(t *testing.T) {
assert.Equal(t, false, valid)
}

func TestSamplingSize(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
params.OverrideBeaconConfig(cfg)

ctx := context.Background()
d := db.SetupDB(t)
p2pService := p2ptest.NewTestP2P(t)

t.Run("regular node returns validator requirements", func(t *testing.T) {
resetFlags := flags.Get()
defer flags.Init(resetFlags)

// Disable all special modes
gFlags := new(flags.GlobalFlags)
gFlags.Supernode = false
gFlags.SemiSupernode = false
flags.Init(gFlags)

custodyCount := uint64(16)
_, _, err := p2pService.UpdateCustodyInfo(0, custodyCount)
require.NoError(t, err)

s := &Service{
ctx: ctx,
cfg: &config{
beaconDB: d,
p2p: p2pService,
},
}

size, err := s.samplingSize()
require.NoError(t, err)
// Should return max(SamplesPerSlot, validatorsCustodyRequirement, custodyGroupCount)
// For this test, custodyGroupCount (16) should be the max
expectedSize := max(cfg.SamplesPerSlot, custodyCount)
require.Equal(t, expectedSize, size)
})

t.Run("supernode mode returns all subnets", func(t *testing.T) {
resetFlags := flags.Get()
defer flags.Init(resetFlags)

// Set custody count to all groups (simulating what updateCustodyInfoInDB() does for supernode)
_, _, err := p2pService.UpdateCustodyInfo(0, cfg.NumberOfCustodyGroups)
require.NoError(t, err)

s := &Service{
ctx: ctx,
cfg: &config{
beaconDB: d,
p2p: p2pService,
},
}

size, err := s.samplingSize()
require.NoError(t, err)
require.Equal(t, cfg.DataColumnSidecarSubnetCount, size) // Should be 128 based on custody count
})

t.Run("semi-supernode with low validator requirements returns 64", func(t *testing.T) {
resetFlags := flags.Get()
defer flags.Init(resetFlags)

// Set custody count to semi-supernode minimum (64)
// This simulates what updateCustodyInfoInDB() does for semi-supernode with low validator count
semiSupernodeCustody := cfg.DataColumnSidecarSubnetCount / 2
_, _, err := p2pService.UpdateCustodyInfo(0, semiSupernodeCustody)
require.NoError(t, err)

s := &Service{
ctx: ctx,
cfg: &config{
beaconDB: d,
p2p: p2pService,
},
}

size, err := s.samplingSize()
require.NoError(t, err)
require.Equal(t, semiSupernodeCustody, size) // Should be 64 based on custody count
})

t.Run("semi-supernode with high validator requirements returns higher value", func(t *testing.T) {
resetFlags := flags.Get()
defer flags.Init(resetFlags)

// Set custody count to a high value (e.g., 100)
// This simulates what updateCustodyInfoInDB() would set after determining
// that validator requirements exceed the semi-supernode minimum
highCustodyCount := uint64(100)
_, _, err := p2pService.UpdateCustodyInfo(0, highCustodyCount)
require.NoError(t, err)

s := &Service{
ctx: ctx,
cfg: &config{
beaconDB: d,
p2p: p2pService,
},
}

size, err := s.samplingSize()
require.NoError(t, err)
require.Equal(t, highCustodyCount, size) // Should return the higher custody count based on custody
// Note: Warning is logged in updateCustodyInfoInDB(), not here
})

t.Run("custody count is source of truth", func(t *testing.T) {
resetFlags := flags.Get()
defer flags.Init(resetFlags)

// Set custody count directly (simulating what updateCustodyInfoInDB() does)
// For semi-supernode mode, this would be 64
semiSupernodeCustody := cfg.DataColumnSidecarSubnetCount / 2
_, _, err := p2pService.UpdateCustodyInfo(0, semiSupernodeCustody)
require.NoError(t, err)

s := &Service{
ctx: ctx,
cfg: &config{
beaconDB: d,
p2p: p2pService,
},
}

// samplingSize() should use custody count regardless of flags
size, err := s.samplingSize()
require.NoError(t, err)
require.Equal(t, semiSupernodeCustody, size) // Should be 64 based on custody count
// Note: Downgrade prevention is handled in updateCustodyInfoInDB(), not here
})
}

// Create peer and register them to provided topics.
func createPeer(t *testing.T, topics ...string) *p2ptest.TestP2P {
p := p2ptest.NewTestP2P(t)
changelog/james-prysm_semi-supernode.md (new file, 2 lines)
@@ -0,0 +1,2 @@
### Added
- Added `--semi-supernode` flag to custody half of a super node's datacolumn requirements but allowing for reconstruction for blob retrieval
@@ -36,5 +36,6 @@ go_test(
deps = [
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"@com_github_urfave_cli_v2//:go_default_library",
],
)
@@ -333,11 +333,16 @@ var (
Usage: "Specifies the retention period for the pruner service in terms of epochs. " +
"If this value is less than MIN_EPOCHS_FOR_BLOCK_REQUESTS, it will be ignored.",
}
// SubscribeAllDataSubnets enables subscription to all data subnets.
SubscribeAllDataSubnets = &cli.BoolFlag{
// Supernode custodies all data.
Supernode = &cli.BoolFlag{
Name: "supernode",
Aliases: []string{"subscribe-all-data-subnets"},
Usage: "Enable subscription to all data subnets and store all blob columns, serving them over RPC. Required post-Fusaka for full blob reconstruction. This is effectively one-way: once enabled, the node keeps storing and serving all columns even if the flag is later unset.",
Usage: "Custodies all data. Cannot be used with --semi-supernode.",
}
// SemiSupernode custodies just enough data to serve the blobs and blob sidecars beacon API.
SemiSupernode = &cli.BoolFlag{
Name: "semi-supernode",
Usage: "Custodies just enough data to serve the blobs and blob sidecars beacon API. Cannot be used with --supernode.",
}
// BatchVerifierLimit sets the maximum number of signatures to batch verify at once.
BatchVerifierLimit = &cli.IntFlag{
@@ -1,6 +1,8 @@
package flags

import (
"fmt"

"github.com/OffchainLabs/prysm/v7/cmd"
"github.com/OffchainLabs/prysm/v7/config/features"
"github.com/pkg/errors"
@@ -13,7 +15,8 @@ const maxStateDiffExponents = 30
// beacon node.
type GlobalFlags struct {
SubscribeToAllSubnets bool
SubscribeAllDataSubnets bool
Supernode bool
SemiSupernode bool
MinimumSyncPeers int
MinimumPeersPerSubnet int
MaxConcurrentDials int
@@ -51,9 +54,22 @@ func ConfigureGlobalFlags(ctx *cli.Context) error {
cfg.SubscribeToAllSubnets = true
}

if ctx.Bool(SubscribeAllDataSubnets.Name) {
log.Warning("Subscribing to all data subnets")
cfg.SubscribeAllDataSubnets = true
supernodeSet := ctx.Bool(Supernode.Name)
semiSupernodeSet := ctx.Bool(SemiSupernode.Name)

// Ensure mutual exclusivity between supernode and semi-supernode modes
if supernodeSet && semiSupernodeSet {
return fmt.Errorf("cannot set both --%s and --%s flags; choose one mode", Supernode.Name, SemiSupernode.Name)
}

if supernodeSet {
log.Info("Operating in supernode mode")
cfg.Supernode = true
}

if semiSupernodeSet {
log.Info("Operating in semi-supernode mode (custody just enough data to serve the blobs and blob sidecars beacon API)")
cfg.SemiSupernode = true
}

// State-diff-exponents
@@ -1,10 +1,12 @@
package flags

import (
"flag"
"strconv"
"testing"

"github.com/OffchainLabs/prysm/v7/testing/require"
"github.com/urfave/cli/v2"
)

func TestValidateStateDiffExponents(t *testing.T) {
@@ -37,3 +39,57 @@ func TestValidateStateDiffExponents(t *testing.T) {
})
}
}

func TestConfigureGlobalFlags_SupernodeMutualExclusivity(t *testing.T) {
tests := []struct {
name string
supernodeSet bool
semiSupernodeSet bool
wantErr bool
errMsg string
}{
{
name: "both flags not set",
supernodeSet: false,
semiSupernodeSet: false,
wantErr: false,
},
{
name: "only supernode set",
supernodeSet: true,
semiSupernodeSet: false,
wantErr: false,
},
{
name: "only semi-supernode set",
supernodeSet: false,
semiSupernodeSet: true,
wantErr: false,
},
{
name: "both flags set - should error",
supernodeSet: true,
semiSupernodeSet: true,
wantErr: true,
errMsg: "cannot set both --supernode and --semi-supernode",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create a flag set and app for testing
app := cli.NewApp()
set := flag.NewFlagSet("test", 0)
set.Bool(Supernode.Name, tt.supernodeSet, "")
set.Bool(SemiSupernode.Name, tt.semiSupernodeSet, "")
ctx := cli.NewContext(app, set, nil)

err := ConfigureGlobalFlags(ctx)
if tt.wantErr {
require.ErrorContains(t, tt.errMsg, err)
} else {
require.NoError(t, err)
}
})
}
}
@@ -65,7 +65,8 @@ var appFlags = []cli.Flag{
flags.SlotsPerArchivedPoint,
flags.DisableDebugRPCEndpoints,
flags.SubscribeToAllSubnets,
flags.SubscribeAllDataSubnets,
flags.Supernode,
flags.SemiSupernode,
flags.HistoricalSlasherNode,
flags.ChainID,
flags.NetworkID,

@@ -107,7 +107,8 @@ var appHelpFlagGroups = []flagGroup{
flags.MinPeersPerSubnet,
flags.MinSyncPeers,
flags.SubscribeToAllSubnets,
flags.SubscribeAllDataSubnets,
flags.Supernode,
flags.SemiSupernode,
},
},
{ // Flags relevant to storing data on disk and configuring the beacon chain database.