diff --git a/beacon-chain/cache/subnet_ids.go b/beacon-chain/cache/subnet_ids.go index e47bf8288f..20273ade01 100644 --- a/beacon-chain/cache/subnet_ids.go +++ b/beacon-chain/cache/subnet_ids.go @@ -24,6 +24,8 @@ type subnetIDs struct { // SubnetIDs for attester and aggregator. var SubnetIDs = newSubnetIDs() +var subnetKey = "persistent-subnets" + func newSubnetIDs() *subnetIDs { // Given a node can calculate committee assignments of current epoch and next epoch. // Max size is set to 2 epoch length. @@ -91,11 +93,11 @@ func (s *subnetIDs) GetAggregatorSubnetIDs(slot primitives.Slot) []uint64 { // GetPersistentSubnets retrieves the persistent subnet and expiration time of that validator's // subscription. -func (s *subnetIDs) GetPersistentSubnets(pubkey []byte) ([]uint64, bool, time.Time) { +func (s *subnetIDs) GetPersistentSubnets() ([]uint64, bool, time.Time) { s.subnetsLock.RLock() defer s.subnetsLock.RUnlock() - id, duration, ok := s.persistentSubnets.GetWithExpiration(string(pubkey)) + id, duration, ok := s.persistentSubnets.GetWithExpiration(subnetKey) if !ok { return []uint64{}, ok, time.Time{} } @@ -122,11 +124,11 @@ func (s *subnetIDs) GetAllSubnets() []uint64 { // AddPersistentCommittee adds the relevant committee for that particular validator along with its // expiration period. -func (s *subnetIDs) AddPersistentCommittee(pubkey []byte, comIndex []uint64, duration time.Duration) { +func (s *subnetIDs) AddPersistentCommittee(comIndex []uint64, duration time.Duration) { s.subnetsLock.Lock() defer s.subnetsLock.Unlock() - s.persistentSubnets.Set(string(pubkey), comIndex, duration) + s.persistentSubnets.Set(subnetKey, comIndex, duration) } // EmptyAllCaches empties out all the related caches and flushes any stored diff --git a/beacon-chain/cache/subnet_ids_test.go b/beacon-chain/cache/subnet_ids_test.go index c94d8ffa52..67652c3d81 100644 --- a/beacon-chain/cache/subnet_ids_test.go +++ b/beacon-chain/cache/subnet_ids_test.go @@ -3,10 +3,8 @@ package cache import ( "testing" - fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/testing/assert" - "github.com/prysmaticlabs/prysm/v4/testing/require" ) func TestSubnetIDsCache_RoundTrip(t *testing.T) { @@ -46,21 +44,8 @@ func TestSubnetIDsCache_RoundTrip(t *testing.T) { func TestSubnetIDsCache_PersistentCommitteeRoundtrip(t *testing.T) { c := newSubnetIDs() - for i := 0; i < 20; i++ { - pubkey := [fieldparams.BLSPubkeyLength]byte{byte(i)} - c.AddPersistentCommittee(pubkey[:], []uint64{uint64(i)}, 0) - } + c.AddPersistentCommittee([]uint64{0, 1, 2, 7, 8}, 0) - for i := uint64(0); i < 20; i++ { - pubkey := [fieldparams.BLSPubkeyLength]byte{byte(i)} - - idxs, ok, _ := c.GetPersistentSubnets(pubkey[:]) - if !ok { - t.Errorf("Couldn't find entry in cache for pubkey %#x", pubkey) - continue - } - require.Equal(t, i, idxs[0]) - } coms := c.GetAllSubnets() - assert.Equal(t, 20, len(coms)) + assert.Equal(t, 5, len(coms)) } diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel index 6ed282db9e..2a0e6ea45e 100644 --- a/beacon-chain/p2p/BUILD.bazel +++ b/beacon-chain/p2p/BUILD.bazel @@ -77,6 +77,7 @@ go_library( "@com_github_ethereum_go_ethereum//p2p/discover:go_default_library", "@com_github_ethereum_go_ethereum//p2p/enode:go_default_library", "@com_github_ethereum_go_ethereum//p2p/enr:go_default_library", + "@com_github_holiman_uint256//:go_default_library", "@com_github_kr_pretty//:go_default_library", "@com_github_libp2p_go_libp2p//:go_default_library", "@com_github_libp2p_go_libp2p//config:go_default_library", @@ -164,6 +165,7 @@ go_test( "//testing/require:go_default_library", "//testing/util:go_default_library", "//time:go_default_library", + "//time/slots:go_default_library", "@com_github_ethereum_go_ethereum//crypto:go_default_library", "@com_github_ethereum_go_ethereum//p2p/discover:go_default_library", "@com_github_ethereum_go_ethereum//p2p/enode:go_default_library", diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index a56806e438..fa05250567 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -42,6 +42,12 @@ func (s *Service) RefreshENR() { if s.dv5Listener == nil || !s.isInitialized() { return } + currEpoch := slots.ToEpoch(slots.CurrentSlot(uint64(s.genesisTime.Unix()))) + if err := initializePersistentSubnets(s.dv5Listener.LocalNode().ID(), currEpoch); err != nil { + log.WithError(err).Error("Could not initialize persistent subnets") + return + } + bitV := bitfield.NewBitvector64() committees := cache.SubnetIDs.GetAllSubnets() for _, idx := range committees { @@ -52,8 +58,8 @@ func (s *Service) RefreshENR() { log.WithError(err).Error("Could not retrieve att bitfield") return } + // Compare current epoch with our fork epochs - currEpoch := slots.ToEpoch(slots.CurrentSlot(uint64(s.genesisTime.Unix()))) altairForkEpoch := params.BeaconConfig().AltairForkEpoch switch { // Altair Behaviour diff --git a/beacon-chain/p2p/discovery_test.go b/beacon-chain/p2p/discovery_test.go index 9a2701884b..eafa409cf7 100644 --- a/beacon-chain/p2p/discovery_test.go +++ b/beacon-chain/p2p/discovery_test.go @@ -36,6 +36,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/runtime/version" "github.com/prysmaticlabs/prysm/v4/testing/assert" "github.com/prysmaticlabs/prysm/v4/testing/require" + "github.com/prysmaticlabs/prysm/v4/time/slots" logTest "github.com/sirupsen/logrus/hooks/test" ) @@ -364,7 +365,15 @@ func TestRefreshENR_ForkBoundaries(t *testing.T) { return s }, postValidation: func(t *testing.T, s *Service) { - assert.DeepEqual(t, bitfield.NewBitvector64(), s.metaData.AttnetsBitfield()) + currEpoch := slots.ToEpoch(slots.CurrentSlot(uint64(s.genesisTime.Unix()))) + subs, err := computeSubscribedSubnets(s.dv5Listener.LocalNode().ID(), currEpoch) + assert.NoError(t, err) + + bitV := bitfield.NewBitvector64() + for _, idx := range subs { + bitV.SetBitAt(idx, true) + } + assert.DeepEqual(t, bitV, s.metaData.AttnetsBitfield()) }, }, { @@ -382,7 +391,7 @@ func TestRefreshENR_ForkBoundaries(t *testing.T) { s.dv5Listener = listener s.metaData = wrapper.WrappedMetadataV0(new(ethpb.MetaDataV0)) s.updateSubnetRecordWithMetadata([]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}) - cache.SubnetIDs.AddPersistentCommittee([]byte{'A'}, []uint64{1, 2, 3, 23}, 0) + cache.SubnetIDs.AddPersistentCommittee([]uint64{1, 2, 3, 23}, 0) return s }, postValidation: func(t *testing.T, s *Service) { @@ -411,7 +420,7 @@ func TestRefreshENR_ForkBoundaries(t *testing.T) { s.dv5Listener = listener s.metaData = wrapper.WrappedMetadataV0(new(ethpb.MetaDataV0)) s.updateSubnetRecordWithMetadata([]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}) - cache.SubnetIDs.AddPersistentCommittee([]byte{'A'}, []uint64{1, 2, 3, 23}, 0) + cache.SubnetIDs.AddPersistentCommittee([]uint64{1, 2, 3, 23}, 0) return s }, postValidation: func(t *testing.T, s *Service) { @@ -447,7 +456,15 @@ func TestRefreshENR_ForkBoundaries(t *testing.T) { postValidation: func(t *testing.T, s *Service) { assert.Equal(t, version.Altair, s.metaData.Version()) assert.DeepEqual(t, bitfield.Bitvector4{0x00}, s.metaData.MetadataObjV1().Syncnets) - assert.DeepEqual(t, bitfield.Bitvector64{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}, s.metaData.AttnetsBitfield()) + currEpoch := slots.ToEpoch(slots.CurrentSlot(uint64(s.genesisTime.Unix()))) + subs, err := computeSubscribedSubnets(s.dv5Listener.LocalNode().ID(), currEpoch) + assert.NoError(t, err) + + bitV := bitfield.NewBitvector64() + for _, idx := range subs { + bitV.SetBitAt(idx, true) + } + assert.DeepEqual(t, bitV, s.metaData.AttnetsBitfield()) }, }, { @@ -472,7 +489,7 @@ func TestRefreshENR_ForkBoundaries(t *testing.T) { s.dv5Listener = listener s.metaData = wrapper.WrappedMetadataV0(new(ethpb.MetaDataV0)) s.updateSubnetRecordWithMetadata([]byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}) - cache.SubnetIDs.AddPersistentCommittee([]byte{'A'}, []uint64{1, 2, 3, 23}, 0) + cache.SubnetIDs.AddPersistentCommittee([]uint64{1, 2, 3, 23}, 0) cache.SyncSubnetIDs.AddSyncCommitteeSubnets([]byte{'A'}, 0, []uint64{0, 1}, 0) return s }, diff --git a/beacon-chain/p2p/subnets.go b/beacon-chain/p2p/subnets.go index 1cba687dd4..66b9252b95 100644 --- a/beacon-chain/p2p/subnets.go +++ b/beacon-chain/p2p/subnets.go @@ -4,13 +4,20 @@ import ( "context" "strings" "sync" + "time" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" + "github.com/holiman/uint256" "github.com/pkg/errors" "github.com/prysmaticlabs/go-bitfield" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/cache" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags" + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/consensus-types/wrapper" + "github.com/prysmaticlabs/prysm/v4/crypto/hash" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" mathutil "github.com/prysmaticlabs/prysm/v4/math" "go.opencensus.io/trace" @@ -178,6 +185,82 @@ func (s *Service) updateSubnetRecordWithMetadataV2(bitVAtt bitfield.Bitvector64, }) } +func initializePersistentSubnets(id enode.ID, epoch primitives.Epoch) error { + _, ok, expTime := cache.SubnetIDs.GetPersistentSubnets() + if ok && expTime.After(time.Now()) { + return nil + } + subs, err := computeSubscribedSubnets(id, epoch) + if err != nil { + return err + } + newExpTime := computeSubscriptionExpirationTime(id, epoch) + cache.SubnetIDs.AddPersistentCommittee(subs, newExpTime) + return nil +} + +// Spec pseudocode definition: +// +// def compute_subscribed_subnets(node_id: NodeID, epoch: Epoch) -> Sequence[SubnetID]: +// +// return [compute_subscribed_subnet(node_id, epoch, index) for index in range(SUBNETS_PER_NODE)] +func computeSubscribedSubnets(nodeID enode.ID, epoch primitives.Epoch) ([]uint64, error) { + subs := []uint64{} + for i := uint64(0); i < params.BeaconConfig().SubnetsPerNode; i++ { + sub, err := computeSubscribedSubnet(nodeID, epoch, i) + if err != nil { + return nil, err + } + subs = append(subs, sub) + } + return subs, nil +} + +// Spec pseudocode definition: +// +// def compute_subscribed_subnet(node_id: NodeID, epoch: Epoch, index: int) -> SubnetID: +// +// node_id_prefix = node_id >> (NODE_ID_BITS - ATTESTATION_SUBNET_PREFIX_BITS) +// node_offset = node_id % EPOCHS_PER_SUBNET_SUBSCRIPTION +// permutation_seed = hash(uint_to_bytes(uint64((epoch + node_offset) // EPOCHS_PER_SUBNET_SUBSCRIPTION))) +// permutated_prefix = compute_shuffled_index( +// node_id_prefix, +// 1 << ATTESTATION_SUBNET_PREFIX_BITS, +// permutation_seed, +// ) +// return SubnetID((permutated_prefix + index) % ATTESTATION_SUBNET_COUNT) +func computeSubscribedSubnet(nodeID enode.ID, epoch primitives.Epoch, index uint64) (uint64, error) { + nodeOffset, nodeIdPrefix := computeOffsetAndPrefix(nodeID) + seedInput := (nodeOffset + uint64(epoch)) / params.BeaconConfig().EpochsPerSubnetSubscription + permSeed := hash.Hash(bytesutil.Bytes8(seedInput)) + permutatedPrefix, err := helpers.ComputeShuffledIndex(primitives.ValidatorIndex(nodeIdPrefix), 1<