PeerDAS: find data column peers when tracking validators (#15654)

* Subscription: Get fanout peers in all data column subnets when at least one validator is connected

* Apply suggestion from @nalepae

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>

* Apply suggestion from @nalepae

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>

* Apply suggestion from @nalepae

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>

* Apply suggestion from @nalepae

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>

* Updated test structure to @nalepae suggestion

---------

Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
This commit is contained in:
Preston Van Loon
2025-09-08 19:31:15 -05:00
committed by GitHub
parent d335a52c49
commit 7191a5bcdf
5 changed files with 105 additions and 5 deletions

View File

@@ -194,6 +194,7 @@ go_test(
"slot_aware_cache_test.go",
"subscriber_beacon_aggregate_proof_test.go",
"subscriber_beacon_blocks_test.go",
"subscriber_data_column_sidecar_test.go",
"subscriber_test.go",
"subscription_topic_handler_test.go",
"sync_fuzz_test.go",

View File

@@ -225,11 +225,12 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
// New gossip topic in Fulu.
if params.BeaconConfig().FuluForkEpoch <= epoch {
s.subscribeWithParameters(subscribeParameters{
topicFormat: p2p.DataColumnSubnetTopicFormat,
validate: s.validateDataColumn,
handle: s.dataColumnSubscriber,
digest: digest,
getSubnetsToJoin: s.dataColumnSubnetIndices,
topicFormat: p2p.DataColumnSubnetTopicFormat,
validate: s.validateDataColumn,
handle: s.dataColumnSubscriber,
digest: digest,
getSubnetsToJoin: s.dataColumnSubnetIndices,
getSubnetsRequiringPeers: s.allDataColumnSubnets,
})
}
}

View File

@@ -6,7 +6,9 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed"
opfeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/operation"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
)
@@ -52,3 +54,31 @@ func (s *Service) receiveDataColumnSidecar(ctx context.Context, sidecar blocks.V
return nil
}
// allDataColumnSubnets returns the data column subnets for which we need to find peers
// but don't need to subscribe to. This is used to ensure we have peers available in all subnets
// when we are serving validators. When a validator proposes a block, they need to publish data
// column sidecars on all subnets. This method returns a nil map when there is no validators custody
// requirement.
func (s *Service) allDataColumnSubnets(_ primitives.Slot) map[uint64]bool {
validatorsCustodyRequirement, err := s.validatorsCustodyRequirement()
if err != nil {
log.WithError(err).Error("Could not retrieve validators custody requirement")
return nil
}
// If no validators are tracked, return early
if validatorsCustodyRequirement == 0 {
return nil
}
// When we have validators with custody requirements, we need peers in all subnets
// because validators need to be able to publish data columns to all subnets when proposing
dataColumnSidecarSubnetCount := params.BeaconConfig().DataColumnSidecarSubnetCount
allSubnets := make(map[uint64]bool, dataColumnSidecarSubnetCount)
for i := range dataColumnSidecarSubnetCount {
allSubnets[i] = true
}
return allSubnets
}

View File

@@ -0,0 +1,65 @@
package sync
import (
"testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/cache"
dbtest "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
doublylinkedtree "github.com/OffchainLabs/prysm/v6/beacon-chain/forkchoice/doubly-linked-tree"
"github.com/OffchainLabs/prysm/v6/beacon-chain/state/stategen"
"github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v6/testing/assert"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
)
func TestAllDataColumnSubnets(t *testing.T) {
t.Run("returns nil when no validators tracked", func(t *testing.T) {
// Service with no tracked validators
svc := &Service{
ctx: t.Context(),
trackedValidatorsCache: cache.NewTrackedValidatorsCache(),
}
result := svc.allDataColumnSubnets(primitives.Slot(0))
assert.Equal(t, true, len(result) == 0, "Expected nil or empty map when no validators are tracked")
})
t.Run("returns all subnets logic test", func(t *testing.T) {
params.SetupTestConfigCleanup(t)
ctx := t.Context()
db := dbtest.SetupDB(t)
// Create and save genesis state
genesisState, _ := util.DeterministicGenesisState(t, 64)
require.NoError(t, db.SaveGenesisData(ctx, genesisState))
// Create stategen and initialize with genesis state
stateGen := stategen.New(db, doublylinkedtree.New())
_, err := stateGen.Resume(ctx, genesisState)
require.NoError(t, err)
// At least one tracked validator.
tvc := cache.NewTrackedValidatorsCache()
tvc.Set(cache.TrackedValidator{Active: true, Index: 1})
svc := &Service{
ctx: ctx,
trackedValidatorsCache: tvc,
cfg: &config{
stateGen: stateGen,
beaconDB: db,
},
}
dataColumnSidecarSubnetCount := params.BeaconConfig().DataColumnSidecarSubnetCount
result := svc.allDataColumnSubnets(0)
assert.Equal(t, dataColumnSidecarSubnetCount, uint64(len(result)))
for i := range dataColumnSidecarSubnetCount {
assert.Equal(t, true, result[i])
}
})
}

View File

@@ -0,0 +1,3 @@
### Added
- Configured the beacon node to seek peers when we have validator custody requirements. If one or more validators are connected to the beacon node, then the beacon node should seek a diverse set of peers such that broadcasting to all data column subnets for a block proposal is more efficient.