From b00aaef2027c6eac382ac738acbe564eb319bc15 Mon Sep 17 00:00:00 2001 From: Jun Song <87601811+syjn99@users.noreply.github.com> Date: Thu, 7 Aug 2025 05:18:33 +0900 Subject: [PATCH] Persist metadata sequence number using Beacon DB (#15554) * Add entry for sequence number in chain-metadata bucket & Basic getter/setter * Mark p2p-metadata flag as deprecated * Fix metaDataFromConfig: use DB instead to get seqnum * Save sequence number after updating the metadata * Fix beacon-chain/p2p unit tests: add DB in config * Add changelog * Add ReadOnlyDatabaseWithSeqNum * Code suggestion from Manu * Remove seqnum getter at interface --------- Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com> --- beacon-chain/db/alias.go | 5 +++ beacon-chain/db/iface/interface.go | 15 +++++++ beacon-chain/db/kv/BUILD.bazel | 2 + beacon-chain/db/kv/error.go | 3 ++ beacon-chain/db/kv/p2p.go | 42 ++++++++++++++++++ beacon-chain/db/kv/p2p_test.go | 33 ++++++++++++++ beacon-chain/db/kv/schema.go | 1 + beacon-chain/node/node.go | 1 - beacon-chain/p2p/BUILD.bazel | 1 + beacon-chain/p2p/broadcaster_test.go | 3 +- beacon-chain/p2p/config.go | 3 +- beacon-chain/p2p/discovery.go | 15 +++++-- beacon-chain/p2p/discovery_test.go | 5 ++- beacon-chain/p2p/fork_test.go | 10 ++++- beacon-chain/p2p/pubsub_filter_test.go | 3 +- beacon-chain/p2p/pubsub_test.go | 2 + beacon-chain/p2p/service.go | 2 +- beacon-chain/p2p/service_test.go | 12 ++++-- beacon-chain/p2p/subnets.go | 35 +++++++++++++-- beacon-chain/p2p/subnets_test.go | 4 ++ beacon-chain/p2p/utils.go | 60 +++++++++----------------- beacon-chain/p2p/utils_test.go | 26 +++++++++++ changelog/syjn99_persist-p2p-seqnum.md | 7 +++ cmd/beacon-chain/main.go | 1 - cmd/beacon-chain/usage.go | 1 - cmd/flags.go | 6 --- config/features/deprecated_flags.go | 6 +++ 27 files changed, 239 insertions(+), 65 deletions(-) create mode 100644 beacon-chain/db/kv/p2p.go create mode 100644 beacon-chain/db/kv/p2p_test.go create mode 100644 changelog/syjn99_persist-p2p-seqnum.md diff --git a/beacon-chain/db/alias.go b/beacon-chain/db/alias.go index 6263a2b991..4bc0351274 100644 --- a/beacon-chain/db/alias.go +++ b/beacon-chain/db/alias.go @@ -10,6 +10,11 @@ type ReadOnlyDatabase = iface.ReadOnlyDatabase // about head info. For head info, use github.com/prysmaticlabs/prysm/blockchain.HeadFetcher. type NoHeadAccessDatabase = iface.NoHeadAccessDatabase +// ReadOnlyDatabaseWithSeqNum exposes Prysm's Ethereum data backend for read access only, no information about +// head info, but with read/write access to the p2p metadata sequence number. +// This is used for the p2p service. +type ReadOnlyDatabaseWithSeqNum = iface.ReadOnlyDatabaseWithSeqNum + // HeadAccessDatabase exposes Prysm's Ethereum backend for read/write access with information about // chain head information. This interface should be used sparingly as the HeadFetcher is the source // of truth around chain head information while this interface serves as persistent storage for the diff --git a/beacon-chain/db/iface/interface.go b/beacon-chain/db/iface/interface.go index 435ecad614..f48253c440 100644 --- a/beacon-chain/db/iface/interface.go +++ b/beacon-chain/db/iface/interface.go @@ -64,6 +64,18 @@ type ReadOnlyDatabase interface { // Origin checkpoint sync support OriginCheckpointBlockRoot(ctx context.Context) ([32]byte, error) BackfillStatus(context.Context) (*dbval.BackfillStatus, error) + + // P2P Metadata operations. + MetadataSeqNum(ctx context.Context) (uint64, error) +} + +// ReadOnlyDatabaseWithSeqNum defines a struct which has read access to database methods +// and also has read/write access to the p2p metadata sequence number. +// Only used for the p2p service. +type ReadOnlyDatabaseWithSeqNum interface { + ReadOnlyDatabase + + SaveMetadataSeqNum(ctx context.Context, seqNum uint64) error } // NoHeadAccessDatabase defines a struct without access to chain head data. @@ -106,6 +118,9 @@ type NoHeadAccessDatabase interface { // Custody operations. UpdateSubscribedToAllDataSubnets(ctx context.Context, subscribed bool) (bool, error) UpdateCustodyInfo(ctx context.Context, earliestAvailableSlot primitives.Slot, custodyGroupCount uint64) (primitives.Slot, uint64, error) + + // P2P Metadata operations. + SaveMetadataSeqNum(ctx context.Context, seqNum uint64) error } // HeadAccessDatabase defines a struct with access to reading chain head data. diff --git a/beacon-chain/db/kv/BUILD.bazel b/beacon-chain/db/kv/BUILD.bazel index febdaa3ccb..aa3db3caaa 100644 --- a/beacon-chain/db/kv/BUILD.bazel +++ b/beacon-chain/db/kv/BUILD.bazel @@ -24,6 +24,7 @@ go_library( "migration_block_slot_index.go", "migration_finalized_parent.go", "migration_state_validators.go", + "p2p.go", "schema.go", "state.go", "state_summary.go", @@ -96,6 +97,7 @@ go_test( "migration_archived_index_test.go", "migration_block_slot_index_test.go", "migration_state_validators_test.go", + "p2p_test.go", "state_summary_test.go", "state_test.go", "utils_test.go", diff --git a/beacon-chain/db/kv/error.go b/beacon-chain/db/kv/error.go index 161378efd9..1233f0e376 100644 --- a/beacon-chain/db/kv/error.go +++ b/beacon-chain/db/kv/error.go @@ -19,6 +19,9 @@ var ErrNotFoundGenesisBlockRoot = errors.Wrap(ErrNotFound, "OriginGenesisRoot") // ErrNotFoundFeeRecipient is a not found error specifically for the fee recipient getter var ErrNotFoundFeeRecipient = errors.Wrap(ErrNotFound, "fee recipient") +// ErrNotFoundMetadataSeqNum is a not found error specifically for the metadata sequence number getter +var ErrNotFoundMetadataSeqNum = errors.Wrap(ErrNotFound, "metadata sequence number") + var errEmptyBlockSlice = errors.New("[]blocks.ROBlock is empty") var errIncorrectBlockParent = errors.New("unexpected missing or forked blocks in a []ROBlock") var errFinalizedChildNotFound = errors.New("unable to find finalized root descending from backfill batch") diff --git a/beacon-chain/db/kv/p2p.go b/beacon-chain/db/kv/p2p.go new file mode 100644 index 0000000000..1748eda100 --- /dev/null +++ b/beacon-chain/db/kv/p2p.go @@ -0,0 +1,42 @@ +package kv + +import ( + "context" + + "github.com/OffchainLabs/prysm/v6/encoding/bytesutil" + "github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace" + bolt "go.etcd.io/bbolt" +) + +// MetadataSeqNum retrieves the p2p metadata sequence number from the database. +// It returns 0 and ErrNotFoundMetadataSeqNum if the key does not exist. +func (s *Store) MetadataSeqNum(ctx context.Context) (uint64, error) { + _, span := trace.StartSpan(ctx, "BeaconDB.MetadataSeqNum") + defer span.End() + + var seqNum uint64 + err := s.db.View(func(tx *bolt.Tx) error { + bkt := tx.Bucket(chainMetadataBucket) + val := bkt.Get(metadataSequenceNumberKey) + if val == nil { + return ErrNotFoundMetadataSeqNum + } + + seqNum = bytesutil.BytesToUint64BigEndian(val) + return nil + }) + + return seqNum, err +} + +// SaveMetadataSeqNum saves the p2p metadata sequence number to the database. +func (s *Store) SaveMetadataSeqNum(ctx context.Context, seqNum uint64) error { + _, span := trace.StartSpan(ctx, "BeaconDB.SaveMetadataSeqNum") + defer span.End() + + return s.db.Update(func(tx *bolt.Tx) error { + bkt := tx.Bucket(chainMetadataBucket) + val := bytesutil.Uint64ToBytesBigEndian(seqNum) + return bkt.Put(metadataSequenceNumberKey, val) + }) +} diff --git a/beacon-chain/db/kv/p2p_test.go b/beacon-chain/db/kv/p2p_test.go new file mode 100644 index 0000000000..435141b8fb --- /dev/null +++ b/beacon-chain/db/kv/p2p_test.go @@ -0,0 +1,33 @@ +package kv + +import ( + "testing" + + "github.com/OffchainLabs/prysm/v6/testing/assert" + "github.com/OffchainLabs/prysm/v6/testing/require" +) + +func TestStore_MetadataSeqNum(t *testing.T) { + ctx := t.Context() + db := setupDB(t) + + seqNum, err := db.MetadataSeqNum(ctx) + require.ErrorIs(t, err, ErrNotFoundMetadataSeqNum) + assert.Equal(t, uint64(0), seqNum) + + initialSeqNum := uint64(42) + err = db.SaveMetadataSeqNum(ctx, initialSeqNum) + require.NoError(t, err) + + retrievedSeqNum, err := db.MetadataSeqNum(ctx) + require.NoError(t, err) + assert.Equal(t, initialSeqNum, retrievedSeqNum) + + updatedSeqNum := uint64(43) + err = db.SaveMetadataSeqNum(ctx, updatedSeqNum) + require.NoError(t, err) + + retrievedSeqNum, err = db.MetadataSeqNum(ctx) + require.NoError(t, err) + assert.Equal(t, updatedSeqNum, retrievedSeqNum) +} diff --git a/beacon-chain/db/kv/schema.go b/beacon-chain/db/kv/schema.go index ab267937c9..111c750060 100644 --- a/beacon-chain/db/kv/schema.go +++ b/beacon-chain/db/kv/schema.go @@ -42,6 +42,7 @@ var ( finalizedCheckpointKey = []byte("finalized-checkpoint") powchainDataKey = []byte("powchain-data") lastValidatedCheckpointKey = []byte("last-validated-checkpoint") + metadataSequenceNumberKey = []byte("metadata-seq-number") // Below keys are used to identify objects are to be fork compatible. // Objects that are only compatible with specific forks should be prefixed with such keys. diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 75380b3af9..de490cb94e 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -702,7 +702,6 @@ func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error { HostDNS: cliCtx.String(cmd.P2PHostDNS.Name), PrivateKey: cliCtx.String(cmd.P2PPrivKey.Name), StaticPeerID: cliCtx.Bool(cmd.P2PStaticID.Name), - MetaDataDir: cliCtx.String(cmd.P2PMetadata.Name), QUICPort: cliCtx.Uint(cmd.P2PQUICPort.Name), TCPPort: cliCtx.Uint(cmd.P2PTCPPort.Name), UDPPort: cliCtx.Uint(cmd.P2PUDPPort.Name), diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel index 9650417b28..2f9c23e221 100644 --- a/beacon-chain/p2p/BUILD.bazel +++ b/beacon-chain/p2p/BUILD.bazel @@ -49,6 +49,7 @@ go_library( "//beacon-chain/core/peerdas:go_default_library", "//beacon-chain/core/time:go_default_library", "//beacon-chain/db:go_default_library", + "//beacon-chain/db/kv:go_default_library", "//beacon-chain/p2p/encoder:go_default_library", "//beacon-chain/p2p/peers:go_default_library", "//beacon-chain/p2p/peers/peerdata:go_default_library", diff --git a/beacon-chain/p2p/broadcaster_test.go b/beacon-chain/p2p/broadcaster_test.go index 219436579c..7106466ff1 100644 --- a/beacon-chain/p2p/broadcaster_test.go +++ b/beacon-chain/p2p/broadcaster_test.go @@ -265,7 +265,8 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) { s.metaData = wrapper.WrappedMetadataV0(new(ethpb.MetaDataV0)) bitV := bitfield.NewBitvector64() bitV.SetBitAt(subnet, true) - s.updateSubnetRecordWithMetadata(bitV) + err := s.updateSubnetRecordWithMetadata(bitV) + require.NoError(t, err) } assert.NoError(t, err, "Could not start discovery for node") listeners = append(listeners, listener) diff --git a/beacon-chain/p2p/config.go b/beacon-chain/p2p/config.go index fa2c28d605..453ef7ce2e 100644 --- a/beacon-chain/p2p/config.go +++ b/beacon-chain/p2p/config.go @@ -27,7 +27,6 @@ type Config struct { PrivateKey string DataDir string DiscoveryDir string - MetaDataDir string QUICPort uint TCPPort uint UDPPort uint @@ -37,7 +36,7 @@ type Config struct { AllowListCIDR string DenyListCIDR []string StateNotifier statefeed.Notifier - DB db.ReadOnlyDatabase + DB db.ReadOnlyDatabaseWithSeqNum ClockWaiter startup.ClockWaiter } diff --git a/beacon-chain/p2p/discovery.go b/beacon-chain/p2p/discovery.go index 78af22e07a..7e4ee48d8f 100644 --- a/beacon-chain/p2p/discovery.go +++ b/beacon-chain/p2p/discovery.go @@ -211,7 +211,10 @@ func (s *Service) RefreshPersistentSubnets() { } // Some data changed. Update the record and the metadata. - s.updateSubnetRecordWithMetadata(bitV) + // Not returning early here because the error comes from saving the metadata sequence number. + if err := s.updateSubnetRecordWithMetadata(bitV); err != nil { + log.WithError(err).Error("Failed to update subnet record with metadata") + } // Ping all peers. s.pingPeersAndLogEnr() @@ -269,7 +272,10 @@ func (s *Service) RefreshPersistentSubnets() { } // Some data have changed, update our record and metadata. - s.updateSubnetRecordWithMetadataV2(bitV, bitS, custodyGroupCount) + // Not returning early here because the error comes from saving the metadata sequence number. + if err := s.updateSubnetRecordWithMetadataV2(bitV, bitS, custodyGroupCount); err != nil { + log.WithError(err).Error("Failed to update subnet record with metadata") + } // Ping all peers to inform them of new metadata s.pingPeersAndLogEnr() @@ -289,7 +295,10 @@ func (s *Service) RefreshPersistentSubnets() { } // Some data changed. Update the record and the metadata. - s.updateSubnetRecordWithMetadataV3(bitV, bitS, custodyGroupCount) + // Not returning early here because the error comes from saving the metadata sequence number. + if err := s.updateSubnetRecordWithMetadataV3(bitV, bitS, custodyGroupCount); err != nil { + log.WithError(err).Error("Failed to update subnet record with metadata") + } // Ping all peers. s.pingPeersAndLogEnr() diff --git a/beacon-chain/p2p/discovery_test.go b/beacon-chain/p2p/discovery_test.go index 1b39baa9da..7f38eafdbd 100644 --- a/beacon-chain/p2p/discovery_test.go +++ b/beacon-chain/p2p/discovery_test.go @@ -16,6 +16,7 @@ import ( mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing" "github.com/OffchainLabs/prysm/v6/beacon-chain/cache" + testDB "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/peerdata" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers" @@ -361,6 +362,8 @@ func TestStaticPeering_PeersAreAdded(t *testing.T) { cfg.StaticPeers = staticPeers cfg.StateNotifier = &mock.MockStateNotifier{} cfg.NoDiscovery = true + cfg.DB = testDB.SetupDB(t) + s, err := NewService(t.Context(), cfg) require.NoError(t, err) @@ -828,7 +831,7 @@ func TestRefreshPersistentSubnets(t *testing.T) { actualPingCount++ return nil }, - cfg: &Config{UDPPort: 2000}, + cfg: &Config{UDPPort: 2000, DB: testDB.SetupDB(t)}, peers: p2p.Peers(), genesisTime: time.Now().Add(-time.Duration(tc.epochSinceGenesis*secondsPerEpoch) * time.Second), genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32), diff --git a/beacon-chain/p2p/fork_test.go b/beacon-chain/p2p/fork_test.go index 3d9e89a57c..ac11b8c47c 100644 --- a/beacon-chain/p2p/fork_test.go +++ b/beacon-chain/p2p/fork_test.go @@ -10,6 +10,7 @@ import ( mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/signing" + testDB "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing" fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" @@ -31,12 +32,15 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) { ipAddr, pkey := createAddrAndPrivKey(t) genesisTime := time.Now() genesisValidatorsRoot := make([]byte, fieldparams.RootLength) + db := testDB.SetupDB(t) + s := &Service{ cfg: &Config{ UDPPort: uint(port), StateNotifier: &mock.MockStateNotifier{}, PingInterval: testPingInterval, DisableLivenessCheck: true, + DB: db, }, genesisTime: genesisTime, genesisValidatorsRoot: genesisValidatorsRoot, @@ -57,6 +61,7 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) { StateNotifier: &mock.MockStateNotifier{}, PingInterval: testPingInterval, DisableLivenessCheck: true, + DB: db, } var listeners []*listenerWrapper @@ -132,8 +137,10 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) { ipAddr, pkey := createAddrAndPrivKey(t) genesisTime := time.Now() genesisValidatorsRoot := make([]byte, 32) + db := testDB.SetupDB(t) + s := &Service{ - cfg: &Config{UDPPort: uint(port), PingInterval: testPingInterval, DisableLivenessCheck: true}, + cfg: &Config{UDPPort: uint(port), PingInterval: testPingInterval, DisableLivenessCheck: true, DB: db}, genesisTime: genesisTime, genesisValidatorsRoot: genesisValidatorsRoot, custodyInfo: &custodyInfo{}, @@ -152,6 +159,7 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) { UDPPort: uint(port), PingInterval: testPingInterval, DisableLivenessCheck: true, + DB: db, } var listeners []*listenerWrapper diff --git a/beacon-chain/p2p/pubsub_filter_test.go b/beacon-chain/p2p/pubsub_filter_test.go index a46ddec0c7..efdf633f13 100644 --- a/beacon-chain/p2p/pubsub_filter_test.go +++ b/beacon-chain/p2p/pubsub_filter_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + testDB "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder" "github.com/OffchainLabs/prysm/v6/beacon-chain/startup" "github.com/OffchainLabs/prysm/v6/config/params" @@ -343,7 +344,7 @@ func TestService_MonitorsStateForkUpdates(t *testing.T) { ctx, cancel := context.WithTimeout(t.Context(), 3*time.Second) defer cancel() cs := startup.NewClockSynchronizer() - s, err := NewService(ctx, &Config{ClockWaiter: cs}) + s, err := NewService(ctx, &Config{ClockWaiter: cs, DB: testDB.SetupDB(t)}) require.NoError(t, err) require.Equal(t, false, s.isInitialized()) diff --git a/beacon-chain/p2p/pubsub_test.go b/beacon-chain/p2p/pubsub_test.go index c895efe1f8..adb4aaa4fb 100644 --- a/beacon-chain/p2p/pubsub_test.go +++ b/beacon-chain/p2p/pubsub_test.go @@ -8,6 +8,7 @@ import ( "time" mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing" + testDB "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder" testp2p "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing" "github.com/OffchainLabs/prysm/v6/beacon-chain/startup" @@ -21,6 +22,7 @@ func TestService_PublishToTopicConcurrentMapWrite(t *testing.T) { s, err := NewService(t.Context(), &Config{ StateNotifier: &mock.MockStateNotifier{}, ClockWaiter: cs, + DB: testDB.SetupDB(t), }) require.NoError(t, err) ctx, cancel := context.WithTimeout(t.Context(), 3*time.Second) diff --git a/beacon-chain/p2p/service.go b/beacon-chain/p2p/service.go index c07c2cde54..0af4456ba4 100644 --- a/beacon-chain/p2p/service.go +++ b/beacon-chain/p2p/service.go @@ -112,7 +112,7 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) { return nil, errors.Wrapf(err, "failed to generate p2p private key") } - metaData, err := metaDataFromConfig(cfg) + metaData, err := metaDataFromDB(ctx, cfg.DB) if err != nil { log.WithError(err).Error("Failed to create peer metadata") return nil, err diff --git a/beacon-chain/p2p/service_test.go b/beacon-chain/p2p/service_test.go index eb04d0586f..3e17e1d1dd 100644 --- a/beacon-chain/p2p/service_test.go +++ b/beacon-chain/p2p/service_test.go @@ -9,6 +9,7 @@ import ( "time" mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing" + testDB "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers" @@ -85,7 +86,7 @@ func createHost(t *testing.T, port uint) (host.Host, *ecdsa.PrivateKey, net.IP) func TestService_Stop_SetsStartedToFalse(t *testing.T) { params.SetupTestConfigCleanup(t) - s, err := NewService(t.Context(), &Config{StateNotifier: &mock.MockStateNotifier{}}) + s, err := NewService(t.Context(), &Config{StateNotifier: &mock.MockStateNotifier{}, DB: testDB.SetupDB(t)}) require.NoError(t, err) s.started = true s.dv5Listener = &mockListener{} @@ -95,7 +96,7 @@ func TestService_Stop_SetsStartedToFalse(t *testing.T) { func TestService_Stop_DontPanicIfDv5ListenerIsNotInited(t *testing.T) { params.SetupTestConfigCleanup(t) - s, err := NewService(t.Context(), &Config{StateNotifier: &mock.MockStateNotifier{}}) + s, err := NewService(t.Context(), &Config{StateNotifier: &mock.MockStateNotifier{}, DB: testDB.SetupDB(t)}) require.NoError(t, err) assert.NoError(t, s.Stop()) } @@ -110,6 +111,7 @@ func TestService_Start_OnlyStartsOnce(t *testing.T) { TCPPort: 3000, QUICPort: 3000, ClockWaiter: cs, + DB: testDB.SetupDB(t), } s, err := NewService(t.Context(), cfg) require.NoError(t, err) @@ -159,6 +161,7 @@ func TestService_Start_NoDiscoverFlag(t *testing.T) { StateNotifier: &mock.MockStateNotifier{}, NoDiscovery: true, // <-- no s.dv5Listener is created ClockWaiter: cs, + DB: testDB.SetupDB(t), } s, err := NewService(t.Context(), cfg) require.NoError(t, err) @@ -194,6 +197,7 @@ func TestListenForNewNodes(t *testing.T) { ) params.SetupTestConfigCleanup(t) + db := testDB.SetupDB(t) // Setup bootnode. cfg := &Config{ @@ -201,6 +205,7 @@ func TestListenForNewNodes(t *testing.T) { PingInterval: testPingInterval, DisableLivenessCheck: true, UDPPort: port, + DB: db, } _, pkey := createAddrAndPrivKey(t) @@ -246,6 +251,7 @@ func TestListenForNewNodes(t *testing.T) { ClockWaiter: cs, UDPPort: port + i, TCPPort: port + i, + DB: db, } h, pkey, ipAddr := createHost(t, port+i) @@ -343,7 +349,7 @@ func TestService_JoinLeaveTopic(t *testing.T) { ctx, cancel := context.WithTimeout(t.Context(), 3*time.Second) defer cancel() gs := startup.NewClockSynchronizer() - s, err := NewService(ctx, &Config{StateNotifier: &mock.MockStateNotifier{}, ClockWaiter: gs}) + s, err := NewService(ctx, &Config{StateNotifier: &mock.MockStateNotifier{}, ClockWaiter: gs, DB: testDB.SetupDB(t)}) require.NoError(t, err) go s.awaitStateInitialized() diff --git a/beacon-chain/p2p/subnets.go b/beacon-chain/p2p/subnets.go index a48c711ae6..152e10914f 100644 --- a/beacon-chain/p2p/subnets.go +++ b/beacon-chain/p2p/subnets.go @@ -57,6 +57,8 @@ const blobSubnetLockerVal = 110 // chosen more than sync, attestation and blob subnet (6) combined. const dataColumnSubnetVal = 150 +const errSavingSequenceNumber = "saving sequence number after updating subnets: %w" + // nodeFilter returns a function that filters nodes based on the subnet topic and subnet index. func (s *Service) nodeFilter(topic string, indices map[uint64]int) (func(node *enode.Node) (map[uint64]bool, error), error) { switch { @@ -377,13 +379,18 @@ func (s *Service) hasPeerWithSubnet(subnetTopic string) bool { // with a new value for a bitfield of subnets tracked. It also updates // the node's metadata by increasing the sequence number and the // subnets tracked by the node. -func (s *Service) updateSubnetRecordWithMetadata(bitV bitfield.Bitvector64) { +func (s *Service) updateSubnetRecordWithMetadata(bitV bitfield.Bitvector64) error { entry := enr.WithEntry(attSubnetEnrKey, &bitV) s.dv5Listener.LocalNode().Set(entry) s.metaData = wrapper.WrappedMetadataV0(&pb.MetaDataV0{ SeqNumber: s.metaData.SequenceNumber() + 1, Attnets: bitV, }) + + if err := s.saveSequenceNumberIfNeeded(); err != nil { + return fmt.Errorf(errSavingSequenceNumber, err) + } + return nil } // Updates the service's discv5 listener record's attestation subnet @@ -394,7 +401,7 @@ func (s *Service) updateSubnetRecordWithMetadataV2( bitVAtt bitfield.Bitvector64, bitVSync bitfield.Bitvector4, custodyGroupCount uint64, -) { +) error { entry := enr.WithEntry(attSubnetEnrKey, &bitVAtt) subEntry := enr.WithEntry(syncCommsSubnetEnrKey, &bitVSync) @@ -412,6 +419,11 @@ func (s *Service) updateSubnetRecordWithMetadataV2( Attnets: bitVAtt, Syncnets: bitVSync, }) + + if err := s.saveSequenceNumberIfNeeded(); err != nil { + return fmt.Errorf(errSavingSequenceNumber, err) + } + return nil } // updateSubnetRecordWithMetadataV3 updates: @@ -423,7 +435,7 @@ func (s *Service) updateSubnetRecordWithMetadataV3( bitVAtt bitfield.Bitvector64, bitVSync bitfield.Bitvector4, custodyGroupCount uint64, -) { +) error { attSubnetsEntry := enr.WithEntry(attSubnetEnrKey, &bitVAtt) syncSubnetsEntry := enr.WithEntry(syncCommsSubnetEnrKey, &bitVSync) custodyGroupCountEntry := enr.WithEntry(custodyGroupCountEnrKey, custodyGroupCount) @@ -439,6 +451,23 @@ func (s *Service) updateSubnetRecordWithMetadataV3( Syncnets: bitVSync, CustodyGroupCount: custodyGroupCount, }) + + if err := s.saveSequenceNumberIfNeeded(); err != nil { + return fmt.Errorf(errSavingSequenceNumber, err) + } + return nil +} + +// saveSequenceNumberIfNeeded saves the sequence number in DB if either of the following conditions is met: +// - the static peer ID flag is set +// - the fulu epoch is set +func (s *Service) saveSequenceNumberIfNeeded() error { + // Short-circuit if we don't need to save the sequence number. + if !(s.cfg.StaticPeerID || params.FuluEnabled()) { + return nil + } + + return s.cfg.DB.SaveMetadataSeqNum(s.ctx, s.metaData.SequenceNumber()) } func initializePersistentSubnets(id enode.ID, epoch primitives.Epoch) error { diff --git a/beacon-chain/p2p/subnets_test.go b/beacon-chain/p2p/subnets_test.go index f0536f8f1b..8fc5f6c535 100644 --- a/beacon-chain/p2p/subnets_test.go +++ b/beacon-chain/p2p/subnets_test.go @@ -10,6 +10,7 @@ import ( "github.com/OffchainLabs/prysm/v6/beacon-chain/cache" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas" + testDB "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing" "github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags" "github.com/OffchainLabs/prysm/v6/config/params" ecdsaprysm "github.com/OffchainLabs/prysm/v6/crypto/ecdsa" @@ -93,6 +94,7 @@ func TestStartDiscV5_FindAndDialPeersWithSubnet(t *testing.T) { // Create 3 nodes, each subscribed to a different subnet. // Each node is connected to the bootstrap node. services := make([]*Service, 0, subnetCount) + db := testDB.SetupDB(t) for i := uint64(1); i <= subnetCount; i++ { service, err := NewService(ctx, &Config{ @@ -103,6 +105,7 @@ func TestStartDiscV5_FindAndDialPeersWithSubnet(t *testing.T) { QUICPort: uint(3000 + i), PingInterval: testPingInterval, DisableLivenessCheck: true, + DB: db, }) require.NoError(t, err) @@ -152,6 +155,7 @@ func TestStartDiscV5_FindAndDialPeersWithSubnet(t *testing.T) { UDPPort: 2010, TCPPort: 3010, QUICPort: 3010, + DB: db, } service, err := NewService(ctx, cfg) diff --git a/beacon-chain/p2p/utils.go b/beacon-chain/p2p/utils.go index 95b73cb4a6..57fe239d9b 100644 --- a/beacon-chain/p2p/utils.go +++ b/beacon-chain/p2p/utils.go @@ -2,6 +2,7 @@ package p2p import ( "bytes" + "context" "crypto/ecdsa" "crypto/rand" "encoding/base64" @@ -12,6 +13,8 @@ import ( "path" "time" + "github.com/OffchainLabs/prysm/v6/beacon-chain/db" + "github.com/OffchainLabs/prysm/v6/beacon-chain/db/kv" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/wrapper" ecdsaprysm "github.com/OffchainLabs/prysm/v6/crypto/ecdsa" @@ -27,11 +30,9 @@ import ( "github.com/pkg/errors" "github.com/prysmaticlabs/go-bitfield" "github.com/sirupsen/logrus" - "google.golang.org/protobuf/proto" ) const keyPath = "network-keys" -const metaDataPath = "metaData" const dialTimeout = 1 * time.Second @@ -121,45 +122,24 @@ func privKeyFromFile(path string) (*ecdsa.PrivateKey, error) { return ecdsaprysm.ConvertFromInterfacePrivKey(unmarshalledKey) } -// Retrieves node p2p metadata from a set of configuration values -// from the p2p service. -// TODO: Figure out how to do a v1/v2 check. -func metaDataFromConfig(cfg *Config) (metadata.Metadata, error) { - defaultKeyPath := path.Join(cfg.DataDir, metaDataPath) - metaDataPath := cfg.MetaDataDir +// Retrieves metadata sequence number from DB and returns a Metadata(V0) object +func metaDataFromDB(ctx context.Context, db db.ReadOnlyDatabaseWithSeqNum) (metadata.Metadata, error) { + seqNum, err := db.MetadataSeqNum(ctx) + // We can proceed if error is `kv.ErrNotFoundMetadataSeqNum` by using default value of 0 for sequence number. + if err != nil && !errors.Is(err, kv.ErrNotFoundMetadataSeqNum) { + return nil, err + } - _, err := os.Stat(defaultKeyPath) - defaultMetadataExist := !os.IsNotExist(err) - if err != nil && defaultMetadataExist { - return nil, err - } - if metaDataPath == "" && !defaultMetadataExist { - metaData := &pb.MetaDataV0{ - SeqNumber: 0, - Attnets: bitfield.NewBitvector64(), - } - dst, err := proto.Marshal(metaData) - if err != nil { - return nil, err - } - if err := file.WriteFile(defaultKeyPath, dst); err != nil { - return nil, err - } - return wrapper.WrappedMetadataV0(metaData), nil - } - if defaultMetadataExist && metaDataPath == "" { - metaDataPath = defaultKeyPath - } - src, err := os.ReadFile(metaDataPath) // #nosec G304 - if err != nil { - log.WithError(err).Error("Error reading metadata from file") - return nil, err - } - metaData := &pb.MetaDataV0{} - if err := proto.Unmarshal(src, metaData); err != nil { - return nil, err - } - return wrapper.WrappedMetadataV0(metaData), nil + // NOTE: Load V0 metadata because: + // - As the p2p service accesses metadata as an interface, and all versions implement the interface, + // there is no error in calling the fields of higher versions. It just returns the default value. + // - This approach allows us to avoid unnecessary code changes when the metadata version bumps. + // - `RefreshPersistentSubnets` runs twice every slot and it manages updating and saving metadata. + metadata := wrapper.WrappedMetadataV0(&pb.MetaDataV0{ + SeqNumber: seqNum, + Attnets: bitfield.NewBitvector64(), + }) + return metadata, nil } // Attempt to dial an address to verify its connectivity diff --git a/beacon-chain/p2p/utils_test.go b/beacon-chain/p2p/utils_test.go index 163239992d..59e5e6fd0c 100644 --- a/beacon-chain/p2p/utils_test.go +++ b/beacon-chain/p2p/utils_test.go @@ -1,8 +1,10 @@ package p2p import ( + "context" "testing" + testDB "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/testing/assert" "github.com/OffchainLabs/prysm/v6/testing/require" @@ -80,3 +82,27 @@ func TestConvertPeerIDToNodeID(t *testing.T) { actualNodeIDStr := actualNodeID.String() require.Equal(t, expectedNodeIDStr, actualNodeIDStr) } + +func TestMetadataFromDB(t *testing.T) { + params.SetupTestConfigCleanup(t) + + t.Run("Metadata from DB", func(t *testing.T) { + beaconDB := testDB.SetupDB(t) + err := beaconDB.SaveMetadataSeqNum(t.Context(), 42) + require.NoError(t, err) + + metaData, err := metaDataFromDB(context.Background(), beaconDB) + require.NoError(t, err) + + assert.Equal(t, uint64(42), metaData.SequenceNumber()) + }) + + t.Run("Use default sequence number (=0) as Metadata not found on DB", func(t *testing.T) { + beaconDB := testDB.SetupDB(t) + + metaData, err := metaDataFromDB(context.Background(), beaconDB) + require.NoError(t, err) + + assert.Equal(t, uint64(0), metaData.SequenceNumber()) + }) +} diff --git a/changelog/syjn99_persist-p2p-seqnum.md b/changelog/syjn99_persist-p2p-seqnum.md new file mode 100644 index 0000000000..9f6a583ee7 --- /dev/null +++ b/changelog/syjn99_persist-p2p-seqnum.md @@ -0,0 +1,7 @@ +### Fixed + +- Fixed [#15544](https://github.com/OffchainLabs/prysm/issues/15544): Persist metadata sequence number if it is needed (e.g., use static peer ID option or Fulu enabled). + +### Deprecated + +- Deprecated `p2p-metadata` flag. \ No newline at end of file diff --git a/cmd/beacon-chain/main.go b/cmd/beacon-chain/main.go index 650b38a7a5..2e63486266 100644 --- a/cmd/beacon-chain/main.go +++ b/cmd/beacon-chain/main.go @@ -100,7 +100,6 @@ var appFlags = []cli.Flag{ cmd.P2PMaxPeers, cmd.P2PPrivKey, cmd.P2PStaticID, - cmd.P2PMetadata, cmd.P2PAllowList, cmd.P2PDenyList, cmd.PubsubQueueSize, diff --git a/cmd/beacon-chain/usage.go b/cmd/beacon-chain/usage.go index d22174b984..eb3ea6cd51 100644 --- a/cmd/beacon-chain/usage.go +++ b/cmd/beacon-chain/usage.go @@ -88,7 +88,6 @@ var appHelpFlagGroups = []flagGroup{ cmd.P2PHostDNS, cmd.P2PIP, cmd.P2PMaxPeers, - cmd.P2PMetadata, cmd.P2PPrivKey, cmd.P2PQUICPort, cmd.P2PStaticID, diff --git a/cmd/flags.go b/cmd/flags.go index 319ff38b4c..5ec952b9be 100644 --- a/cmd/flags.go +++ b/cmd/flags.go @@ -158,12 +158,6 @@ var ( Usage: "Enables the peer id of the node to be fixed by saving the generated network key to the default key path.", Value: false, } - // P2PMetadata defines a flag to specify the location of the peer metadata file. - P2PMetadata = &cli.StringFlag{ - Name: "p2p-metadata", - Usage: "The file containing the metadata to communicate with other peers.", - Value: "", - } // P2PMaxPeers defines a flag to specify the max number of peers in libp2p. P2PMaxPeers = &cli.IntFlag{ Name: "p2p-max-peers", diff --git a/config/features/deprecated_flags.go b/config/features/deprecated_flags.go index 6c6d6b22ba..2cf0e03401 100644 --- a/config/features/deprecated_flags.go +++ b/config/features/deprecated_flags.go @@ -108,6 +108,11 @@ var ( Usage: deprecatedUsage, Hidden: true, } + deprecatedP2PMetadata = &cli.StringFlag{ + Name: "p2p-metadata", + Usage: deprecatedUsage, + Hidden: true, + } ) // Deprecated flags for both the beacon node and validator client. @@ -130,6 +135,7 @@ var deprecatedFlags = []cli.Flag{ deprecatedEnableQuic, deprecatedAttestTimely, deprecatedDisableExperimentalState, + deprecatedP2PMetadata, } var upcomingDeprecation = []cli.Flag{