Persist metadata sequence number using Beacon DB (#15554)

* Add entry for sequence number in chain-metadata bucket & Basic getter/setter

* Mark p2p-metadata flag as deprecated

* Fix metaDataFromConfig: use DB instead to get seqnum

* Save sequence number after updating the metadata

* Fix beacon-chain/p2p unit tests: add DB in config

* Add changelog

* Add ReadOnlyDatabaseWithSeqNum

* Code suggestion from Manu

* Remove seqnum getter at interface

---------

Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com>
This commit is contained in:
Jun Song
2025-08-07 05:18:33 +09:00
committed by GitHub
parent 0f6070a866
commit b00aaef202
27 changed files with 239 additions and 65 deletions

View File

@@ -10,6 +10,11 @@ type ReadOnlyDatabase = iface.ReadOnlyDatabase
// about head info. For head info, use github.com/prysmaticlabs/prysm/blockchain.HeadFetcher. // about head info. For head info, use github.com/prysmaticlabs/prysm/blockchain.HeadFetcher.
type NoHeadAccessDatabase = iface.NoHeadAccessDatabase type NoHeadAccessDatabase = iface.NoHeadAccessDatabase
// ReadOnlyDatabaseWithSeqNum exposes Prysm's Ethereum data backend for read access only, no information about
// head info, but with read/write access to the p2p metadata sequence number.
// This is used for the p2p service.
type ReadOnlyDatabaseWithSeqNum = iface.ReadOnlyDatabaseWithSeqNum
// HeadAccessDatabase exposes Prysm's Ethereum backend for read/write access with information about // HeadAccessDatabase exposes Prysm's Ethereum backend for read/write access with information about
// chain head information. This interface should be used sparingly as the HeadFetcher is the source // chain head information. This interface should be used sparingly as the HeadFetcher is the source
// of truth around chain head information while this interface serves as persistent storage for the // of truth around chain head information while this interface serves as persistent storage for the

View File

@@ -64,6 +64,18 @@ type ReadOnlyDatabase interface {
// Origin checkpoint sync support // Origin checkpoint sync support
OriginCheckpointBlockRoot(ctx context.Context) ([32]byte, error) OriginCheckpointBlockRoot(ctx context.Context) ([32]byte, error)
BackfillStatus(context.Context) (*dbval.BackfillStatus, error) BackfillStatus(context.Context) (*dbval.BackfillStatus, error)
// P2P Metadata operations.
MetadataSeqNum(ctx context.Context) (uint64, error)
}
// ReadOnlyDatabaseWithSeqNum defines a struct which has read access to database methods
// and also has read/write access to the p2p metadata sequence number.
// Only used for the p2p service.
type ReadOnlyDatabaseWithSeqNum interface {
ReadOnlyDatabase
SaveMetadataSeqNum(ctx context.Context, seqNum uint64) error
} }
// NoHeadAccessDatabase defines a struct without access to chain head data. // NoHeadAccessDatabase defines a struct without access to chain head data.
@@ -106,6 +118,9 @@ type NoHeadAccessDatabase interface {
// Custody operations. // Custody operations.
UpdateSubscribedToAllDataSubnets(ctx context.Context, subscribed bool) (bool, error) UpdateSubscribedToAllDataSubnets(ctx context.Context, subscribed bool) (bool, error)
UpdateCustodyInfo(ctx context.Context, earliestAvailableSlot primitives.Slot, custodyGroupCount uint64) (primitives.Slot, uint64, error) UpdateCustodyInfo(ctx context.Context, earliestAvailableSlot primitives.Slot, custodyGroupCount uint64) (primitives.Slot, uint64, error)
// P2P Metadata operations.
SaveMetadataSeqNum(ctx context.Context, seqNum uint64) error
} }
// HeadAccessDatabase defines a struct with access to reading chain head data. // HeadAccessDatabase defines a struct with access to reading chain head data.

View File

@@ -24,6 +24,7 @@ go_library(
"migration_block_slot_index.go", "migration_block_slot_index.go",
"migration_finalized_parent.go", "migration_finalized_parent.go",
"migration_state_validators.go", "migration_state_validators.go",
"p2p.go",
"schema.go", "schema.go",
"state.go", "state.go",
"state_summary.go", "state_summary.go",
@@ -96,6 +97,7 @@ go_test(
"migration_archived_index_test.go", "migration_archived_index_test.go",
"migration_block_slot_index_test.go", "migration_block_slot_index_test.go",
"migration_state_validators_test.go", "migration_state_validators_test.go",
"p2p_test.go",
"state_summary_test.go", "state_summary_test.go",
"state_test.go", "state_test.go",
"utils_test.go", "utils_test.go",

View File

@@ -19,6 +19,9 @@ var ErrNotFoundGenesisBlockRoot = errors.Wrap(ErrNotFound, "OriginGenesisRoot")
// ErrNotFoundFeeRecipient is a not found error specifically for the fee recipient getter // ErrNotFoundFeeRecipient is a not found error specifically for the fee recipient getter
var ErrNotFoundFeeRecipient = errors.Wrap(ErrNotFound, "fee recipient") var ErrNotFoundFeeRecipient = errors.Wrap(ErrNotFound, "fee recipient")
// ErrNotFoundMetadataSeqNum is a not found error specifically for the metadata sequence number getter
var ErrNotFoundMetadataSeqNum = errors.Wrap(ErrNotFound, "metadata sequence number")
var errEmptyBlockSlice = errors.New("[]blocks.ROBlock is empty") var errEmptyBlockSlice = errors.New("[]blocks.ROBlock is empty")
var errIncorrectBlockParent = errors.New("unexpected missing or forked blocks in a []ROBlock") var errIncorrectBlockParent = errors.New("unexpected missing or forked blocks in a []ROBlock")
var errFinalizedChildNotFound = errors.New("unable to find finalized root descending from backfill batch") var errFinalizedChildNotFound = errors.New("unable to find finalized root descending from backfill batch")

42
beacon-chain/db/kv/p2p.go Normal file
View File

@@ -0,0 +1,42 @@
package kv
import (
"context"
"github.com/OffchainLabs/prysm/v6/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace"
bolt "go.etcd.io/bbolt"
)
// MetadataSeqNum retrieves the p2p metadata sequence number from the database.
// It returns 0 and ErrNotFoundMetadataSeqNum if the key does not exist.
func (s *Store) MetadataSeqNum(ctx context.Context) (uint64, error) {
	_, span := trace.StartSpan(ctx, "BeaconDB.MetadataSeqNum")
	defer span.End()

	var stored uint64
	viewErr := s.db.View(func(tx *bolt.Tx) error {
		raw := tx.Bucket(chainMetadataBucket).Get(metadataSequenceNumberKey)
		if raw == nil {
			// No entry has been persisted yet; surface the sentinel so
			// callers can fall back to a default sequence number of 0.
			return ErrNotFoundMetadataSeqNum
		}
		stored = bytesutil.BytesToUint64BigEndian(raw)
		return nil
	})
	return stored, viewErr
}
// SaveMetadataSeqNum saves the p2p metadata sequence number to the database.
func (s *Store) SaveMetadataSeqNum(ctx context.Context, seqNum uint64) error {
	_, span := trace.StartSpan(ctx, "BeaconDB.SaveMetadataSeqNum")
	defer span.End()

	// Encode once outside the transaction; big-endian matches the getter.
	encoded := bytesutil.Uint64ToBytesBigEndian(seqNum)
	return s.db.Update(func(tx *bolt.Tx) error {
		return tx.Bucket(chainMetadataBucket).Put(metadataSequenceNumberKey, encoded)
	})
}

View File

@@ -0,0 +1,33 @@
package kv
import (
"testing"
"github.com/OffchainLabs/prysm/v6/testing/assert"
"github.com/OffchainLabs/prysm/v6/testing/require"
)
// TestStore_MetadataSeqNum exercises the metadata sequence number round trip:
// a miss on an empty database, an initial write, and an overwrite.
func TestStore_MetadataSeqNum(t *testing.T) {
	ctx := t.Context()
	db := setupDB(t)

	// An empty database yields the sentinel error and the zero value.
	got, err := db.MetadataSeqNum(ctx)
	require.ErrorIs(t, err, ErrNotFoundMetadataSeqNum)
	assert.Equal(t, uint64(0), got)

	// Writing then reading back must round-trip, including overwriting
	// a previously stored value.
	for _, want := range []uint64{42, 43} {
		require.NoError(t, db.SaveMetadataSeqNum(ctx, want))
		got, err = db.MetadataSeqNum(ctx)
		require.NoError(t, err)
		assert.Equal(t, want, got)
	}
}

View File

@@ -42,6 +42,7 @@ var (
finalizedCheckpointKey = []byte("finalized-checkpoint") finalizedCheckpointKey = []byte("finalized-checkpoint")
powchainDataKey = []byte("powchain-data") powchainDataKey = []byte("powchain-data")
lastValidatedCheckpointKey = []byte("last-validated-checkpoint") lastValidatedCheckpointKey = []byte("last-validated-checkpoint")
metadataSequenceNumberKey = []byte("metadata-seq-number")
// Below keys are used to identify objects are to be fork compatible. // Below keys are used to identify objects are to be fork compatible.
// Objects that are only compatible with specific forks should be prefixed with such keys. // Objects that are only compatible with specific forks should be prefixed with such keys.

View File

@@ -702,7 +702,6 @@ func (b *BeaconNode) registerP2P(cliCtx *cli.Context) error {
HostDNS: cliCtx.String(cmd.P2PHostDNS.Name), HostDNS: cliCtx.String(cmd.P2PHostDNS.Name),
PrivateKey: cliCtx.String(cmd.P2PPrivKey.Name), PrivateKey: cliCtx.String(cmd.P2PPrivKey.Name),
StaticPeerID: cliCtx.Bool(cmd.P2PStaticID.Name), StaticPeerID: cliCtx.Bool(cmd.P2PStaticID.Name),
MetaDataDir: cliCtx.String(cmd.P2PMetadata.Name),
QUICPort: cliCtx.Uint(cmd.P2PQUICPort.Name), QUICPort: cliCtx.Uint(cmd.P2PQUICPort.Name),
TCPPort: cliCtx.Uint(cmd.P2PTCPPort.Name), TCPPort: cliCtx.Uint(cmd.P2PTCPPort.Name),
UDPPort: cliCtx.Uint(cmd.P2PUDPPort.Name), UDPPort: cliCtx.Uint(cmd.P2PUDPPort.Name),

View File

@@ -49,6 +49,7 @@ go_library(
"//beacon-chain/core/peerdas:go_default_library", "//beacon-chain/core/peerdas:go_default_library",
"//beacon-chain/core/time:go_default_library", "//beacon-chain/core/time:go_default_library",
"//beacon-chain/db:go_default_library", "//beacon-chain/db:go_default_library",
"//beacon-chain/db/kv:go_default_library",
"//beacon-chain/p2p/encoder:go_default_library", "//beacon-chain/p2p/encoder:go_default_library",
"//beacon-chain/p2p/peers:go_default_library", "//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/peers/peerdata:go_default_library", "//beacon-chain/p2p/peers/peerdata:go_default_library",

View File

@@ -265,7 +265,8 @@ func TestService_BroadcastAttestationWithDiscoveryAttempts(t *testing.T) {
s.metaData = wrapper.WrappedMetadataV0(new(ethpb.MetaDataV0)) s.metaData = wrapper.WrappedMetadataV0(new(ethpb.MetaDataV0))
bitV := bitfield.NewBitvector64() bitV := bitfield.NewBitvector64()
bitV.SetBitAt(subnet, true) bitV.SetBitAt(subnet, true)
s.updateSubnetRecordWithMetadata(bitV) err := s.updateSubnetRecordWithMetadata(bitV)
require.NoError(t, err)
} }
assert.NoError(t, err, "Could not start discovery for node") assert.NoError(t, err, "Could not start discovery for node")
listeners = append(listeners, listener) listeners = append(listeners, listener)

View File

@@ -27,7 +27,6 @@ type Config struct {
PrivateKey string PrivateKey string
DataDir string DataDir string
DiscoveryDir string DiscoveryDir string
MetaDataDir string
QUICPort uint QUICPort uint
TCPPort uint TCPPort uint
UDPPort uint UDPPort uint
@@ -37,7 +36,7 @@ type Config struct {
AllowListCIDR string AllowListCIDR string
DenyListCIDR []string DenyListCIDR []string
StateNotifier statefeed.Notifier StateNotifier statefeed.Notifier
DB db.ReadOnlyDatabase DB db.ReadOnlyDatabaseWithSeqNum
ClockWaiter startup.ClockWaiter ClockWaiter startup.ClockWaiter
} }

View File

@@ -211,7 +211,10 @@ func (s *Service) RefreshPersistentSubnets() {
} }
// Some data changed. Update the record and the metadata. // Some data changed. Update the record and the metadata.
s.updateSubnetRecordWithMetadata(bitV) // Not returning early here because the error comes from saving the metadata sequence number.
if err := s.updateSubnetRecordWithMetadata(bitV); err != nil {
log.WithError(err).Error("Failed to update subnet record with metadata")
}
// Ping all peers. // Ping all peers.
s.pingPeersAndLogEnr() s.pingPeersAndLogEnr()
@@ -269,7 +272,10 @@ func (s *Service) RefreshPersistentSubnets() {
} }
// Some data have changed, update our record and metadata. // Some data have changed, update our record and metadata.
s.updateSubnetRecordWithMetadataV2(bitV, bitS, custodyGroupCount) // Not returning early here because the error comes from saving the metadata sequence number.
if err := s.updateSubnetRecordWithMetadataV2(bitV, bitS, custodyGroupCount); err != nil {
log.WithError(err).Error("Failed to update subnet record with metadata")
}
// Ping all peers to inform them of new metadata // Ping all peers to inform them of new metadata
s.pingPeersAndLogEnr() s.pingPeersAndLogEnr()
@@ -289,7 +295,10 @@ func (s *Service) RefreshPersistentSubnets() {
} }
// Some data changed. Update the record and the metadata. // Some data changed. Update the record and the metadata.
s.updateSubnetRecordWithMetadataV3(bitV, bitS, custodyGroupCount) // Not returning early here because the error comes from saving the metadata sequence number.
if err := s.updateSubnetRecordWithMetadataV3(bitV, bitS, custodyGroupCount); err != nil {
log.WithError(err).Error("Failed to update subnet record with metadata")
}
// Ping all peers. // Ping all peers.
s.pingPeersAndLogEnr() s.pingPeersAndLogEnr()

View File

@@ -16,6 +16,7 @@ import (
mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing" mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/cache" "github.com/OffchainLabs/prysm/v6/beacon-chain/cache"
testDB "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/peerdata" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/peerdata"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers"
@@ -361,6 +362,8 @@ func TestStaticPeering_PeersAreAdded(t *testing.T) {
cfg.StaticPeers = staticPeers cfg.StaticPeers = staticPeers
cfg.StateNotifier = &mock.MockStateNotifier{} cfg.StateNotifier = &mock.MockStateNotifier{}
cfg.NoDiscovery = true cfg.NoDiscovery = true
cfg.DB = testDB.SetupDB(t)
s, err := NewService(t.Context(), cfg) s, err := NewService(t.Context(), cfg)
require.NoError(t, err) require.NoError(t, err)
@@ -828,7 +831,7 @@ func TestRefreshPersistentSubnets(t *testing.T) {
actualPingCount++ actualPingCount++
return nil return nil
}, },
cfg: &Config{UDPPort: 2000}, cfg: &Config{UDPPort: 2000, DB: testDB.SetupDB(t)},
peers: p2p.Peers(), peers: p2p.Peers(),
genesisTime: time.Now().Add(-time.Duration(tc.epochSinceGenesis*secondsPerEpoch) * time.Second), genesisTime: time.Now().Add(-time.Duration(tc.epochSinceGenesis*secondsPerEpoch) * time.Second),
genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32), genesisValidatorsRoot: bytesutil.PadTo([]byte{'A'}, 32),

View File

@@ -10,6 +10,7 @@ import (
mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing" mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/signing" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/signing"
testDB "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams"
"github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/primitives" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives"
@@ -31,12 +32,15 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) {
ipAddr, pkey := createAddrAndPrivKey(t) ipAddr, pkey := createAddrAndPrivKey(t)
genesisTime := time.Now() genesisTime := time.Now()
genesisValidatorsRoot := make([]byte, fieldparams.RootLength) genesisValidatorsRoot := make([]byte, fieldparams.RootLength)
db := testDB.SetupDB(t)
s := &Service{ s := &Service{
cfg: &Config{ cfg: &Config{
UDPPort: uint(port), UDPPort: uint(port),
StateNotifier: &mock.MockStateNotifier{}, StateNotifier: &mock.MockStateNotifier{},
PingInterval: testPingInterval, PingInterval: testPingInterval,
DisableLivenessCheck: true, DisableLivenessCheck: true,
DB: db,
}, },
genesisTime: genesisTime, genesisTime: genesisTime,
genesisValidatorsRoot: genesisValidatorsRoot, genesisValidatorsRoot: genesisValidatorsRoot,
@@ -57,6 +61,7 @@ func TestStartDiscv5_DifferentForkDigests(t *testing.T) {
StateNotifier: &mock.MockStateNotifier{}, StateNotifier: &mock.MockStateNotifier{},
PingInterval: testPingInterval, PingInterval: testPingInterval,
DisableLivenessCheck: true, DisableLivenessCheck: true,
DB: db,
} }
var listeners []*listenerWrapper var listeners []*listenerWrapper
@@ -132,8 +137,10 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) {
ipAddr, pkey := createAddrAndPrivKey(t) ipAddr, pkey := createAddrAndPrivKey(t)
genesisTime := time.Now() genesisTime := time.Now()
genesisValidatorsRoot := make([]byte, 32) genesisValidatorsRoot := make([]byte, 32)
db := testDB.SetupDB(t)
s := &Service{ s := &Service{
cfg: &Config{UDPPort: uint(port), PingInterval: testPingInterval, DisableLivenessCheck: true}, cfg: &Config{UDPPort: uint(port), PingInterval: testPingInterval, DisableLivenessCheck: true, DB: db},
genesisTime: genesisTime, genesisTime: genesisTime,
genesisValidatorsRoot: genesisValidatorsRoot, genesisValidatorsRoot: genesisValidatorsRoot,
custodyInfo: &custodyInfo{}, custodyInfo: &custodyInfo{},
@@ -152,6 +159,7 @@ func TestStartDiscv5_SameForkDigests_DifferentNextForkData(t *testing.T) {
UDPPort: uint(port), UDPPort: uint(port),
PingInterval: testPingInterval, PingInterval: testPingInterval,
DisableLivenessCheck: true, DisableLivenessCheck: true,
DB: db,
} }
var listeners []*listenerWrapper var listeners []*listenerWrapper

View File

@@ -7,6 +7,7 @@ import (
"testing" "testing"
"time" "time"
testDB "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup" "github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
"github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/config/params"
@@ -343,7 +344,7 @@ func TestService_MonitorsStateForkUpdates(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), 3*time.Second) ctx, cancel := context.WithTimeout(t.Context(), 3*time.Second)
defer cancel() defer cancel()
cs := startup.NewClockSynchronizer() cs := startup.NewClockSynchronizer()
s, err := NewService(ctx, &Config{ClockWaiter: cs}) s, err := NewService(ctx, &Config{ClockWaiter: cs, DB: testDB.SetupDB(t)})
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, false, s.isInitialized()) require.Equal(t, false, s.isInitialized())

View File

@@ -8,6 +8,7 @@ import (
"time" "time"
mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing" mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
testDB "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder"
testp2p "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing" testp2p "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup" "github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
@@ -21,6 +22,7 @@ func TestService_PublishToTopicConcurrentMapWrite(t *testing.T) {
s, err := NewService(t.Context(), &Config{ s, err := NewService(t.Context(), &Config{
StateNotifier: &mock.MockStateNotifier{}, StateNotifier: &mock.MockStateNotifier{},
ClockWaiter: cs, ClockWaiter: cs,
DB: testDB.SetupDB(t),
}) })
require.NoError(t, err) require.NoError(t, err)
ctx, cancel := context.WithTimeout(t.Context(), 3*time.Second) ctx, cancel := context.WithTimeout(t.Context(), 3*time.Second)

View File

@@ -112,7 +112,7 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
return nil, errors.Wrapf(err, "failed to generate p2p private key") return nil, errors.Wrapf(err, "failed to generate p2p private key")
} }
metaData, err := metaDataFromConfig(cfg) metaData, err := metaDataFromDB(ctx, cfg.DB)
if err != nil { if err != nil {
log.WithError(err).Error("Failed to create peer metadata") log.WithError(err).Error("Failed to create peer metadata")
return nil, err return nil, err

View File

@@ -9,6 +9,7 @@ import (
"time" "time"
mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing" mock "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
testDB "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/encoder"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers" "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/peers/scorers"
@@ -85,7 +86,7 @@ func createHost(t *testing.T, port uint) (host.Host, *ecdsa.PrivateKey, net.IP)
func TestService_Stop_SetsStartedToFalse(t *testing.T) { func TestService_Stop_SetsStartedToFalse(t *testing.T) {
params.SetupTestConfigCleanup(t) params.SetupTestConfigCleanup(t)
s, err := NewService(t.Context(), &Config{StateNotifier: &mock.MockStateNotifier{}}) s, err := NewService(t.Context(), &Config{StateNotifier: &mock.MockStateNotifier{}, DB: testDB.SetupDB(t)})
require.NoError(t, err) require.NoError(t, err)
s.started = true s.started = true
s.dv5Listener = &mockListener{} s.dv5Listener = &mockListener{}
@@ -95,7 +96,7 @@ func TestService_Stop_SetsStartedToFalse(t *testing.T) {
func TestService_Stop_DontPanicIfDv5ListenerIsNotInited(t *testing.T) { func TestService_Stop_DontPanicIfDv5ListenerIsNotInited(t *testing.T) {
params.SetupTestConfigCleanup(t) params.SetupTestConfigCleanup(t)
s, err := NewService(t.Context(), &Config{StateNotifier: &mock.MockStateNotifier{}}) s, err := NewService(t.Context(), &Config{StateNotifier: &mock.MockStateNotifier{}, DB: testDB.SetupDB(t)})
require.NoError(t, err) require.NoError(t, err)
assert.NoError(t, s.Stop()) assert.NoError(t, s.Stop())
} }
@@ -110,6 +111,7 @@ func TestService_Start_OnlyStartsOnce(t *testing.T) {
TCPPort: 3000, TCPPort: 3000,
QUICPort: 3000, QUICPort: 3000,
ClockWaiter: cs, ClockWaiter: cs,
DB: testDB.SetupDB(t),
} }
s, err := NewService(t.Context(), cfg) s, err := NewService(t.Context(), cfg)
require.NoError(t, err) require.NoError(t, err)
@@ -159,6 +161,7 @@ func TestService_Start_NoDiscoverFlag(t *testing.T) {
StateNotifier: &mock.MockStateNotifier{}, StateNotifier: &mock.MockStateNotifier{},
NoDiscovery: true, // <-- no s.dv5Listener is created NoDiscovery: true, // <-- no s.dv5Listener is created
ClockWaiter: cs, ClockWaiter: cs,
DB: testDB.SetupDB(t),
} }
s, err := NewService(t.Context(), cfg) s, err := NewService(t.Context(), cfg)
require.NoError(t, err) require.NoError(t, err)
@@ -194,6 +197,7 @@ func TestListenForNewNodes(t *testing.T) {
) )
params.SetupTestConfigCleanup(t) params.SetupTestConfigCleanup(t)
db := testDB.SetupDB(t)
// Setup bootnode. // Setup bootnode.
cfg := &Config{ cfg := &Config{
@@ -201,6 +205,7 @@ func TestListenForNewNodes(t *testing.T) {
PingInterval: testPingInterval, PingInterval: testPingInterval,
DisableLivenessCheck: true, DisableLivenessCheck: true,
UDPPort: port, UDPPort: port,
DB: db,
} }
_, pkey := createAddrAndPrivKey(t) _, pkey := createAddrAndPrivKey(t)
@@ -246,6 +251,7 @@ func TestListenForNewNodes(t *testing.T) {
ClockWaiter: cs, ClockWaiter: cs,
UDPPort: port + i, UDPPort: port + i,
TCPPort: port + i, TCPPort: port + i,
DB: db,
} }
h, pkey, ipAddr := createHost(t, port+i) h, pkey, ipAddr := createHost(t, port+i)
@@ -343,7 +349,7 @@ func TestService_JoinLeaveTopic(t *testing.T) {
ctx, cancel := context.WithTimeout(t.Context(), 3*time.Second) ctx, cancel := context.WithTimeout(t.Context(), 3*time.Second)
defer cancel() defer cancel()
gs := startup.NewClockSynchronizer() gs := startup.NewClockSynchronizer()
s, err := NewService(ctx, &Config{StateNotifier: &mock.MockStateNotifier{}, ClockWaiter: gs}) s, err := NewService(ctx, &Config{StateNotifier: &mock.MockStateNotifier{}, ClockWaiter: gs, DB: testDB.SetupDB(t)})
require.NoError(t, err) require.NoError(t, err)
go s.awaitStateInitialized() go s.awaitStateInitialized()

View File

@@ -57,6 +57,8 @@ const blobSubnetLockerVal = 110
// chosen more than sync, attestation and blob subnet (6) combined. // chosen more than sync, attestation and blob subnet (6) combined.
const dataColumnSubnetVal = 150 const dataColumnSubnetVal = 150
const errSavingSequenceNumber = "saving sequence number after updating subnets: %w"
// nodeFilter returns a function that filters nodes based on the subnet topic and subnet index. // nodeFilter returns a function that filters nodes based on the subnet topic and subnet index.
func (s *Service) nodeFilter(topic string, indices map[uint64]int) (func(node *enode.Node) (map[uint64]bool, error), error) { func (s *Service) nodeFilter(topic string, indices map[uint64]int) (func(node *enode.Node) (map[uint64]bool, error), error) {
switch { switch {
@@ -377,13 +379,18 @@ func (s *Service) hasPeerWithSubnet(subnetTopic string) bool {
// with a new value for a bitfield of subnets tracked. It also updates // with a new value for a bitfield of subnets tracked. It also updates
// the node's metadata by increasing the sequence number and the // the node's metadata by increasing the sequence number and the
// subnets tracked by the node. // subnets tracked by the node.
func (s *Service) updateSubnetRecordWithMetadata(bitV bitfield.Bitvector64) { func (s *Service) updateSubnetRecordWithMetadata(bitV bitfield.Bitvector64) error {
entry := enr.WithEntry(attSubnetEnrKey, &bitV) entry := enr.WithEntry(attSubnetEnrKey, &bitV)
s.dv5Listener.LocalNode().Set(entry) s.dv5Listener.LocalNode().Set(entry)
s.metaData = wrapper.WrappedMetadataV0(&pb.MetaDataV0{ s.metaData = wrapper.WrappedMetadataV0(&pb.MetaDataV0{
SeqNumber: s.metaData.SequenceNumber() + 1, SeqNumber: s.metaData.SequenceNumber() + 1,
Attnets: bitV, Attnets: bitV,
}) })
if err := s.saveSequenceNumberIfNeeded(); err != nil {
return fmt.Errorf(errSavingSequenceNumber, err)
}
return nil
} }
// Updates the service's discv5 listener record's attestation subnet // Updates the service's discv5 listener record's attestation subnet
@@ -394,7 +401,7 @@ func (s *Service) updateSubnetRecordWithMetadataV2(
bitVAtt bitfield.Bitvector64, bitVAtt bitfield.Bitvector64,
bitVSync bitfield.Bitvector4, bitVSync bitfield.Bitvector4,
custodyGroupCount uint64, custodyGroupCount uint64,
) { ) error {
entry := enr.WithEntry(attSubnetEnrKey, &bitVAtt) entry := enr.WithEntry(attSubnetEnrKey, &bitVAtt)
subEntry := enr.WithEntry(syncCommsSubnetEnrKey, &bitVSync) subEntry := enr.WithEntry(syncCommsSubnetEnrKey, &bitVSync)
@@ -412,6 +419,11 @@ func (s *Service) updateSubnetRecordWithMetadataV2(
Attnets: bitVAtt, Attnets: bitVAtt,
Syncnets: bitVSync, Syncnets: bitVSync,
}) })
if err := s.saveSequenceNumberIfNeeded(); err != nil {
return fmt.Errorf(errSavingSequenceNumber, err)
}
return nil
} }
// updateSubnetRecordWithMetadataV3 updates: // updateSubnetRecordWithMetadataV3 updates:
@@ -423,7 +435,7 @@ func (s *Service) updateSubnetRecordWithMetadataV3(
bitVAtt bitfield.Bitvector64, bitVAtt bitfield.Bitvector64,
bitVSync bitfield.Bitvector4, bitVSync bitfield.Bitvector4,
custodyGroupCount uint64, custodyGroupCount uint64,
) { ) error {
attSubnetsEntry := enr.WithEntry(attSubnetEnrKey, &bitVAtt) attSubnetsEntry := enr.WithEntry(attSubnetEnrKey, &bitVAtt)
syncSubnetsEntry := enr.WithEntry(syncCommsSubnetEnrKey, &bitVSync) syncSubnetsEntry := enr.WithEntry(syncCommsSubnetEnrKey, &bitVSync)
custodyGroupCountEntry := enr.WithEntry(custodyGroupCountEnrKey, custodyGroupCount) custodyGroupCountEntry := enr.WithEntry(custodyGroupCountEnrKey, custodyGroupCount)
@@ -439,6 +451,23 @@ func (s *Service) updateSubnetRecordWithMetadataV3(
Syncnets: bitVSync, Syncnets: bitVSync,
CustodyGroupCount: custodyGroupCount, CustodyGroupCount: custodyGroupCount,
}) })
if err := s.saveSequenceNumberIfNeeded(); err != nil {
return fmt.Errorf(errSavingSequenceNumber, err)
}
return nil
}
// saveSequenceNumberIfNeeded saves the sequence number in DB if either of the following conditions is met:
// - the static peer ID flag is set
// - the fulu epoch is set
func (s *Service) saveSequenceNumberIfNeeded() error {
// Short-circuit if we don't need to save the sequence number.
if !(s.cfg.StaticPeerID || params.FuluEnabled()) {
return nil
}
return s.cfg.DB.SaveMetadataSeqNum(s.ctx, s.metaData.SequenceNumber())
} }
func initializePersistentSubnets(id enode.ID, epoch primitives.Epoch) error { func initializePersistentSubnets(id enode.ID, epoch primitives.Epoch) error {

View File

@@ -10,6 +10,7 @@ import (
"github.com/OffchainLabs/prysm/v6/beacon-chain/cache" "github.com/OffchainLabs/prysm/v6/beacon-chain/cache"
"github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas" "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas"
testDB "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
"github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags" "github.com/OffchainLabs/prysm/v6/cmd/beacon-chain/flags"
"github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/config/params"
ecdsaprysm "github.com/OffchainLabs/prysm/v6/crypto/ecdsa" ecdsaprysm "github.com/OffchainLabs/prysm/v6/crypto/ecdsa"
@@ -93,6 +94,7 @@ func TestStartDiscV5_FindAndDialPeersWithSubnet(t *testing.T) {
// Create 3 nodes, each subscribed to a different subnet. // Create 3 nodes, each subscribed to a different subnet.
// Each node is connected to the bootstrap node. // Each node is connected to the bootstrap node.
services := make([]*Service, 0, subnetCount) services := make([]*Service, 0, subnetCount)
db := testDB.SetupDB(t)
for i := uint64(1); i <= subnetCount; i++ { for i := uint64(1); i <= subnetCount; i++ {
service, err := NewService(ctx, &Config{ service, err := NewService(ctx, &Config{
@@ -103,6 +105,7 @@ func TestStartDiscV5_FindAndDialPeersWithSubnet(t *testing.T) {
QUICPort: uint(3000 + i), QUICPort: uint(3000 + i),
PingInterval: testPingInterval, PingInterval: testPingInterval,
DisableLivenessCheck: true, DisableLivenessCheck: true,
DB: db,
}) })
require.NoError(t, err) require.NoError(t, err)
@@ -152,6 +155,7 @@ func TestStartDiscV5_FindAndDialPeersWithSubnet(t *testing.T) {
UDPPort: 2010, UDPPort: 2010,
TCPPort: 3010, TCPPort: 3010,
QUICPort: 3010, QUICPort: 3010,
DB: db,
} }
service, err := NewService(ctx, cfg) service, err := NewService(ctx, cfg)

View File

@@ -2,6 +2,7 @@ package p2p
import ( import (
"bytes" "bytes"
"context"
"crypto/ecdsa" "crypto/ecdsa"
"crypto/rand" "crypto/rand"
"encoding/base64" "encoding/base64"
@@ -12,6 +13,8 @@ import (
"path" "path"
"time" "time"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db"
"github.com/OffchainLabs/prysm/v6/beacon-chain/db/kv"
"github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/consensus-types/wrapper" "github.com/OffchainLabs/prysm/v6/consensus-types/wrapper"
ecdsaprysm "github.com/OffchainLabs/prysm/v6/crypto/ecdsa" ecdsaprysm "github.com/OffchainLabs/prysm/v6/crypto/ecdsa"
@@ -27,11 +30,9 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield" "github.com/prysmaticlabs/go-bitfield"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"google.golang.org/protobuf/proto"
) )
const keyPath = "network-keys" const keyPath = "network-keys"
const metaDataPath = "metaData"
const dialTimeout = 1 * time.Second const dialTimeout = 1 * time.Second
@@ -121,45 +122,24 @@ func privKeyFromFile(path string) (*ecdsa.PrivateKey, error) {
return ecdsaprysm.ConvertFromInterfacePrivKey(unmarshalledKey) return ecdsaprysm.ConvertFromInterfacePrivKey(unmarshalledKey)
} }
// Retrieves node p2p metadata from a set of configuration values // Retrieves metadata sequence number from DB and returns a Metadata(V0) object
// from the p2p service. func metaDataFromDB(ctx context.Context, db db.ReadOnlyDatabaseWithSeqNum) (metadata.Metadata, error) {
// TODO: Figure out how to do a v1/v2 check. seqNum, err := db.MetadataSeqNum(ctx)
func metaDataFromConfig(cfg *Config) (metadata.Metadata, error) { // We can proceed if error is `kv.ErrNotFoundMetadataSeqNum` by using default value of 0 for sequence number.
defaultKeyPath := path.Join(cfg.DataDir, metaDataPath) if err != nil && !errors.Is(err, kv.ErrNotFoundMetadataSeqNum) {
metaDataPath := cfg.MetaDataDir return nil, err
}
_, err := os.Stat(defaultKeyPath) // NOTE: Load V0 metadata because:
defaultMetadataExist := !os.IsNotExist(err) // - As the p2p service accesses metadata as an interface, and all versions implement the interface,
if err != nil && defaultMetadataExist { // there is no error in calling the fields of higher versions. It just returns the default value.
return nil, err // - This approach allows us to avoid unnecessary code changes when the metadata version bumps.
} // - `RefreshPersistentSubnets` runs twice every slot and it manages updating and saving metadata.
if metaDataPath == "" && !defaultMetadataExist { metadata := wrapper.WrappedMetadataV0(&pb.MetaDataV0{
metaData := &pb.MetaDataV0{ SeqNumber: seqNum,
SeqNumber: 0, Attnets: bitfield.NewBitvector64(),
Attnets: bitfield.NewBitvector64(), })
} return metadata, nil
dst, err := proto.Marshal(metaData)
if err != nil {
return nil, err
}
if err := file.WriteFile(defaultKeyPath, dst); err != nil {
return nil, err
}
return wrapper.WrappedMetadataV0(metaData), nil
}
if defaultMetadataExist && metaDataPath == "" {
metaDataPath = defaultKeyPath
}
src, err := os.ReadFile(metaDataPath) // #nosec G304
if err != nil {
log.WithError(err).Error("Error reading metadata from file")
return nil, err
}
metaData := &pb.MetaDataV0{}
if err := proto.Unmarshal(src, metaData); err != nil {
return nil, err
}
return wrapper.WrappedMetadataV0(metaData), nil
} }
// Attempt to dial an address to verify its connectivity // Attempt to dial an address to verify its connectivity

View File

@@ -1,8 +1,10 @@
package p2p package p2p
import ( import (
"context"
"testing" "testing"
testDB "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
"github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/config/params"
"github.com/OffchainLabs/prysm/v6/testing/assert" "github.com/OffchainLabs/prysm/v6/testing/assert"
"github.com/OffchainLabs/prysm/v6/testing/require" "github.com/OffchainLabs/prysm/v6/testing/require"
@@ -80,3 +82,27 @@ func TestConvertPeerIDToNodeID(t *testing.T) {
actualNodeIDStr := actualNodeID.String() actualNodeIDStr := actualNodeID.String()
require.Equal(t, expectedNodeIDStr, actualNodeIDStr) require.Equal(t, expectedNodeIDStr, actualNodeIDStr)
} }
func TestMetadataFromDB(t *testing.T) {
params.SetupTestConfigCleanup(t)
t.Run("Metadata from DB", func(t *testing.T) {
beaconDB := testDB.SetupDB(t)
err := beaconDB.SaveMetadataSeqNum(t.Context(), 42)
require.NoError(t, err)
metaData, err := metaDataFromDB(context.Background(), beaconDB)
require.NoError(t, err)
assert.Equal(t, uint64(42), metaData.SequenceNumber())
})
t.Run("Use default sequence number (=0) as Metadata not found on DB", func(t *testing.T) {
beaconDB := testDB.SetupDB(t)
metaData, err := metaDataFromDB(context.Background(), beaconDB)
require.NoError(t, err)
assert.Equal(t, uint64(0), metaData.SequenceNumber())
})
}

View File

@@ -0,0 +1,7 @@
### Fixed
- Fixed [#15544](https://github.com/OffchainLabs/prysm/issues/15544): Persist metadata sequence number if it is needed (e.g., use static peer ID option or Fulu enabled).
### Deprecated
- Deprecated `p2p-metadata` flag.

View File

@@ -100,7 +100,6 @@ var appFlags = []cli.Flag{
cmd.P2PMaxPeers, cmd.P2PMaxPeers,
cmd.P2PPrivKey, cmd.P2PPrivKey,
cmd.P2PStaticID, cmd.P2PStaticID,
cmd.P2PMetadata,
cmd.P2PAllowList, cmd.P2PAllowList,
cmd.P2PDenyList, cmd.P2PDenyList,
cmd.PubsubQueueSize, cmd.PubsubQueueSize,

View File

@@ -88,7 +88,6 @@ var appHelpFlagGroups = []flagGroup{
cmd.P2PHostDNS, cmd.P2PHostDNS,
cmd.P2PIP, cmd.P2PIP,
cmd.P2PMaxPeers, cmd.P2PMaxPeers,
cmd.P2PMetadata,
cmd.P2PPrivKey, cmd.P2PPrivKey,
cmd.P2PQUICPort, cmd.P2PQUICPort,
cmd.P2PStaticID, cmd.P2PStaticID,

View File

@@ -158,12 +158,6 @@ var (
Usage: "Enables the peer id of the node to be fixed by saving the generated network key to the default key path.", Usage: "Enables the peer id of the node to be fixed by saving the generated network key to the default key path.",
Value: false, Value: false,
} }
// P2PMetadata defines a flag to specify the location of the peer metadata file.
P2PMetadata = &cli.StringFlag{
Name: "p2p-metadata",
Usage: "The file containing the metadata to communicate with other peers.",
Value: "",
}
// P2PMaxPeers defines a flag to specify the max number of peers in libp2p. // P2PMaxPeers defines a flag to specify the max number of peers in libp2p.
P2PMaxPeers = &cli.IntFlag{ P2PMaxPeers = &cli.IntFlag{
Name: "p2p-max-peers", Name: "p2p-max-peers",

View File

@@ -108,6 +108,11 @@ var (
Usage: deprecatedUsage, Usage: deprecatedUsage,
Hidden: true, Hidden: true,
} }
deprecatedP2PMetadata = &cli.StringFlag{
Name: "p2p-metadata",
Usage: deprecatedUsage,
Hidden: true,
}
) )
// Deprecated flags for both the beacon node and validator client. // Deprecated flags for both the beacon node and validator client.
@@ -130,6 +135,7 @@ var deprecatedFlags = []cli.Flag{
deprecatedEnableQuic, deprecatedEnableQuic,
deprecatedAttestTimely, deprecatedAttestTimely,
deprecatedDisableExperimentalState, deprecatedDisableExperimentalState,
deprecatedP2PMetadata,
} }
var upcomingDeprecation = []cli.Flag{ var upcomingDeprecation = []cli.Flag{