diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 348bef3787..d4951d6df1 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -194,6 +194,7 @@ go_test( "slot_aware_cache_test.go", "subscriber_beacon_aggregate_proof_test.go", "subscriber_beacon_blocks_test.go", + "subscriber_data_column_sidecar_test.go", "subscriber_test.go", "subscription_topic_handler_test.go", "sync_fuzz_test.go", diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 8c2cc8d74c..c18db509f4 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -225,11 +225,12 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) { // New gossip topic in Fulu. if params.BeaconConfig().FuluForkEpoch <= epoch { s.subscribeWithParameters(subscribeParameters{ - topicFormat: p2p.DataColumnSubnetTopicFormat, - validate: s.validateDataColumn, - handle: s.dataColumnSubscriber, - digest: digest, - getSubnetsToJoin: s.dataColumnSubnetIndices, + topicFormat: p2p.DataColumnSubnetTopicFormat, + validate: s.validateDataColumn, + handle: s.dataColumnSubscriber, + digest: digest, + getSubnetsToJoin: s.dataColumnSubnetIndices, + getSubnetsRequiringPeers: s.allDataColumnSubnets, }) } } diff --git a/beacon-chain/sync/subscriber_data_column_sidecar.go b/beacon-chain/sync/subscriber_data_column_sidecar.go index c983a95685..49d4a9fc82 100644 --- a/beacon-chain/sync/subscriber_data_column_sidecar.go +++ b/beacon-chain/sync/subscriber_data_column_sidecar.go @@ -6,7 +6,9 @@ import ( "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed" opfeed "github.com/OffchainLabs/prysm/v6/beacon-chain/core/feed/operation" + "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/blocks" + "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" "github.com/pkg/errors" "google.golang.org/protobuf/proto" ) @@ -52,3 +54,31 @@ func (s *Service) receiveDataColumnSidecar(ctx context.Context, sidecar blocks.V return nil } + +// allDataColumnSubnets returns the data column subnets for which we need to find peers +// but don't need to subscribe to. This is used to ensure we have peers available in all subnets +// when we are serving validators. When a validator proposes a block, they need to publish data +// column sidecars on all subnets. This method returns a nil map when there is no validators custody +// requirement. +func (s *Service) allDataColumnSubnets(_ primitives.Slot) map[uint64]bool { + validatorsCustodyRequirement, err := s.validatorsCustodyRequirement() + if err != nil { + log.WithError(err).Error("Could not retrieve validators custody requirement") + return nil + } + + // If no validators are tracked, return early + if validatorsCustodyRequirement == 0 { + return nil + } + + // When we have validators with custody requirements, we need peers in all subnets + // because validators need to be able to publish data columns to all subnets when proposing + dataColumnSidecarSubnetCount := params.BeaconConfig().DataColumnSidecarSubnetCount + allSubnets := make(map[uint64]bool, dataColumnSidecarSubnetCount) + for i := range dataColumnSidecarSubnetCount { + allSubnets[i] = true + } + + return allSubnets +} diff --git a/beacon-chain/sync/subscriber_data_column_sidecar_test.go b/beacon-chain/sync/subscriber_data_column_sidecar_test.go new file mode 100644 index 0000000000..15526e1f2d --- /dev/null +++ b/beacon-chain/sync/subscriber_data_column_sidecar_test.go @@ -0,0 +1,65 @@ +package sync + +import ( + "testing" + + "github.com/OffchainLabs/prysm/v6/beacon-chain/cache" + dbtest "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing" + doublylinkedtree "github.com/OffchainLabs/prysm/v6/beacon-chain/forkchoice/doubly-linked-tree" + "github.com/OffchainLabs/prysm/v6/beacon-chain/state/stategen" + "github.com/OffchainLabs/prysm/v6/config/params" + "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" + "github.com/OffchainLabs/prysm/v6/testing/assert" + "github.com/OffchainLabs/prysm/v6/testing/require" + "github.com/OffchainLabs/prysm/v6/testing/util" +) + +func TestAllDataColumnSubnets(t *testing.T) { + t.Run("returns nil when no validators tracked", func(t *testing.T) { + // Service with no tracked validators + svc := &Service{ + ctx: t.Context(), + trackedValidatorsCache: cache.NewTrackedValidatorsCache(), + } + + result := svc.allDataColumnSubnets(primitives.Slot(0)) + assert.Equal(t, true, len(result) == 0, "Expected nil or empty map when no validators are tracked") + }) + + t.Run("returns all subnets logic test", func(t *testing.T) { + params.SetupTestConfigCleanup(t) + ctx := t.Context() + + db := dbtest.SetupDB(t) + + // Create and save genesis state + genesisState, _ := util.DeterministicGenesisState(t, 64) + require.NoError(t, db.SaveGenesisData(ctx, genesisState)) + + // Create stategen and initialize with genesis state + stateGen := stategen.New(db, doublylinkedtree.New()) + _, err := stateGen.Resume(ctx, genesisState) + require.NoError(t, err) + + // At least one tracked validator. + tvc := cache.NewTrackedValidatorsCache() + tvc.Set(cache.TrackedValidator{Active: true, Index: 1}) + + svc := &Service{ + ctx: ctx, + trackedValidatorsCache: tvc, + cfg: &config{ + stateGen: stateGen, + beaconDB: db, + }, + } + + dataColumnSidecarSubnetCount := params.BeaconConfig().DataColumnSidecarSubnetCount + result := svc.allDataColumnSubnets(0) + assert.Equal(t, dataColumnSidecarSubnetCount, uint64(len(result))) + + for i := range dataColumnSidecarSubnetCount { + assert.Equal(t, true, result[i]) + } + }) +} diff --git a/changelog/pvl-peerdas-peer-fanout.md b/changelog/pvl-peerdas-peer-fanout.md new file mode 100644 index 0000000000..a4e03fdbb1 --- /dev/null +++ b/changelog/pvl-peerdas-peer-fanout.md @@ -0,0 +1,3 @@ +### Added + +- Configured the beacon node to seek peers when we have validator custody requirements. If one or more validators are connected to the beacon node, then the beacon node should seek a diverse set of peers such that broadcasting to all data column subnets for a block proposal is more efficient.