mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 21:38:05 -05:00
Compare commits
9 Commits
useNewCach
...
err_on_inv
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4798c1866a | ||
|
|
10173af7e7 | ||
|
|
72eb0fbbfe | ||
|
|
0b7c005d7d | ||
|
|
65e8c37b48 | ||
|
|
689015ff01 | ||
|
|
08c14f02f6 | ||
|
|
4bb0b44f16 | ||
|
|
29237cb0bc |
@@ -12,10 +12,13 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
|
||||
- Added an error field to log `Finished building block`.
|
||||
- Implemented a new `EmptyExecutionPayloadHeader` function.
|
||||
- `Finished building block`: Display error only if not nil.
|
||||
- Return an error instead of panic when invalidating the finalized checkpoint.
|
||||
|
||||
### Changed
|
||||
|
||||
- Process light client finality updates only for new finalized epochs instead of doing it for every block.
|
||||
- Refactor subnets subscriptions.
|
||||
- Refactor RPC handlers subscriptions.
|
||||
|
||||
### Deprecated
|
||||
|
||||
@@ -75,6 +78,7 @@ Notable features:
|
||||
- Added a Prometheus error counter metric for SSE requests.
|
||||
- Save light client updates and bootstraps in DB.
|
||||
- Added more comprehensive tests for `BlockToLightClientHeader`. [PR](https://github.com/prysmaticlabs/prysm/pull/14699)
|
||||
- Added light client feature flag check to RPC handlers. [PR](https://github.com/prysmaticlabs/prysm/pull/14736)
|
||||
|
||||
### Changed
|
||||
|
||||
|
||||
@@ -140,6 +140,7 @@ go_test(
|
||||
"//beacon-chain/core/blocks:go_default_library",
|
||||
"//beacon-chain/core/feed/state:go_default_library",
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/core/light-client:go_default_library",
|
||||
"//beacon-chain/core/signing:go_default_library",
|
||||
"//beacon-chain/core/transition:go_default_library",
|
||||
"//beacon-chain/das:go_default_library",
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/blocks"
|
||||
lightClient "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/light-client"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/signing"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
|
||||
@@ -2505,173 +2506,500 @@ func fakeResult(missing []uint64) map[uint64]struct{} {
|
||||
}
|
||||
|
||||
func TestSaveLightClientUpdate(t *testing.T) {
|
||||
featCfg := &features.Flags{}
|
||||
featCfg.EnableLightClient = true
|
||||
reset := features.InitWithReset(featCfg)
|
||||
|
||||
s, tr := minimalTestService(t)
|
||||
ctx := tr.ctx
|
||||
|
||||
t.Run("Altair", func(t *testing.T) {
|
||||
featCfg := &features.Flags{}
|
||||
featCfg.EnableLightClient = true
|
||||
reset := features.InitWithReset(featCfg)
|
||||
t.Run("No old update", func(t *testing.T) {
|
||||
l := util.NewTestLightClient(t).SetupTestAltair()
|
||||
|
||||
l := util.NewTestLightClient(t).SetupTestAltair()
|
||||
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().AltairForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
|
||||
|
||||
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().AltairForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
|
||||
err := s.cfg.BeaconDB.SaveBlock(ctx, l.AttestedBlock)
|
||||
require.NoError(t, err)
|
||||
attestedBlockRoot, err := l.AttestedBlock.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.AttestedState, attestedBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
err := s.cfg.BeaconDB.SaveBlock(ctx, l.AttestedBlock)
|
||||
require.NoError(t, err)
|
||||
attestedBlockRoot, err := l.AttestedBlock.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.AttestedState, attestedBlockRoot)
|
||||
require.NoError(t, err)
|
||||
currentBlockRoot, err := l.Block.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(l.Block, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
currentBlockRoot, err := l.Block.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(l.Block, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, roblock)
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.State, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, roblock)
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.State, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, l.FinalizedBlock)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, l.FinalizedBlock)
|
||||
require.NoError(t, err)
|
||||
cfg := &postBlockProcessConfig{
|
||||
ctx: ctx,
|
||||
roblock: roblock,
|
||||
postState: l.State,
|
||||
isValidPayload: true,
|
||||
}
|
||||
|
||||
cfg := &postBlockProcessConfig{
|
||||
ctx: ctx,
|
||||
roblock: roblock,
|
||||
postState: l.State,
|
||||
isValidPayload: true,
|
||||
}
|
||||
s.saveLightClientUpdate(cfg)
|
||||
|
||||
s.saveLightClientUpdate(cfg)
|
||||
// Check that the light client update is saved
|
||||
period := slots.SyncCommitteePeriod(slots.ToEpoch(l.AttestedState.Slot()))
|
||||
|
||||
// Check that the light client update is saved
|
||||
period := slots.SyncCommitteePeriod(slots.ToEpoch(l.AttestedState.Slot()))
|
||||
u, err := s.cfg.BeaconDB.LightClientUpdate(ctx, period)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, u)
|
||||
attestedStateRoot, err := l.AttestedState.HashTreeRoot(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, attestedStateRoot, [32]byte(u.AttestedHeader().Beacon().StateRoot))
|
||||
require.Equal(t, u.Version(), version.Altair)
|
||||
})
|
||||
|
||||
u, err := s.cfg.BeaconDB.LightClientUpdate(ctx, period)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, u)
|
||||
attestedStateRoot, err := l.AttestedState.HashTreeRoot(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, attestedStateRoot, [32]byte(u.AttestedHeader().Beacon().StateRoot))
|
||||
require.Equal(t, u.Version(), version.Altair)
|
||||
t.Run("New update is better", func(t *testing.T) {
|
||||
l := util.NewTestLightClient(t).SetupTestAltair()
|
||||
|
||||
reset()
|
||||
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().AltairForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
|
||||
|
||||
err := s.cfg.BeaconDB.SaveBlock(ctx, l.AttestedBlock)
|
||||
require.NoError(t, err)
|
||||
attestedBlockRoot, err := l.AttestedBlock.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.AttestedState, attestedBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
currentBlockRoot, err := l.Block.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(l.Block, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, roblock)
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.State, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, l.FinalizedBlock)
|
||||
require.NoError(t, err)
|
||||
|
||||
cfg := &postBlockProcessConfig{
|
||||
ctx: ctx,
|
||||
roblock: roblock,
|
||||
postState: l.State,
|
||||
isValidPayload: true,
|
||||
}
|
||||
|
||||
period := slots.SyncCommitteePeriod(slots.ToEpoch(l.AttestedState.Slot()))
|
||||
|
||||
// create and save old update
|
||||
oldUpdate, err := lightClient.CreateDefaultLightClientUpdate(s.CurrentSlot(), l.AttestedState)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.cfg.BeaconDB.SaveLightClientUpdate(ctx, period, oldUpdate)
|
||||
require.NoError(t, err)
|
||||
|
||||
s.saveLightClientUpdate(cfg)
|
||||
|
||||
u, err := s.cfg.BeaconDB.LightClientUpdate(ctx, period)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, u)
|
||||
attestedStateRoot, err := l.AttestedState.HashTreeRoot(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, attestedStateRoot, [32]byte(u.AttestedHeader().Beacon().StateRoot))
|
||||
require.Equal(t, u.Version(), version.Altair)
|
||||
})
|
||||
|
||||
t.Run("Old update is better", func(t *testing.T) {
|
||||
l := util.NewTestLightClient(t).SetupTestAltair()
|
||||
|
||||
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().AltairForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
|
||||
|
||||
err := s.cfg.BeaconDB.SaveBlock(ctx, l.AttestedBlock)
|
||||
require.NoError(t, err)
|
||||
attestedBlockRoot, err := l.AttestedBlock.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.AttestedState, attestedBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
currentBlockRoot, err := l.Block.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(l.Block, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, roblock)
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.State, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, l.FinalizedBlock)
|
||||
require.NoError(t, err)
|
||||
|
||||
cfg := &postBlockProcessConfig{
|
||||
ctx: ctx,
|
||||
roblock: roblock,
|
||||
postState: l.State,
|
||||
isValidPayload: true,
|
||||
}
|
||||
|
||||
period := slots.SyncCommitteePeriod(slots.ToEpoch(l.AttestedState.Slot()))
|
||||
|
||||
// create and save old update
|
||||
oldUpdate, err := lightClient.CreateDefaultLightClientUpdate(s.CurrentSlot(), l.AttestedState)
|
||||
require.NoError(t, err)
|
||||
|
||||
scb := make([]byte, 64)
|
||||
for i := 0; i < 5; i++ {
|
||||
scb[i] = 0x01
|
||||
}
|
||||
oldUpdate.SetSyncAggregate(ðpb.SyncAggregate{
|
||||
SyncCommitteeBits: scb,
|
||||
SyncCommitteeSignature: make([]byte, 96),
|
||||
})
|
||||
|
||||
err = s.cfg.BeaconDB.SaveLightClientUpdate(ctx, period, oldUpdate)
|
||||
require.NoError(t, err)
|
||||
|
||||
s.saveLightClientUpdate(cfg)
|
||||
|
||||
u, err := s.cfg.BeaconDB.LightClientUpdate(ctx, period)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, u)
|
||||
require.DeepEqual(t, oldUpdate, u)
|
||||
require.Equal(t, u.Version(), version.Altair)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("Capella", func(t *testing.T) {
|
||||
featCfg := &features.Flags{}
|
||||
featCfg.EnableLightClient = true
|
||||
reset := features.InitWithReset(featCfg)
|
||||
t.Run("No old update", func(t *testing.T) {
|
||||
l := util.NewTestLightClient(t).SetupTestCapella(false)
|
||||
|
||||
l := util.NewTestLightClient(t).SetupTestCapella(false)
|
||||
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().CapellaForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
|
||||
|
||||
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().CapellaForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
|
||||
err := s.cfg.BeaconDB.SaveBlock(ctx, l.AttestedBlock)
|
||||
require.NoError(t, err)
|
||||
attestedBlockRoot, err := l.AttestedBlock.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.AttestedState, attestedBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
err := s.cfg.BeaconDB.SaveBlock(ctx, l.AttestedBlock)
|
||||
require.NoError(t, err)
|
||||
attestedBlockRoot, err := l.AttestedBlock.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.AttestedState, attestedBlockRoot)
|
||||
require.NoError(t, err)
|
||||
currentBlockRoot, err := l.Block.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(l.Block, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
currentBlockRoot, err := l.Block.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(l.Block, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, roblock)
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.State, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, roblock)
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.State, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, l.FinalizedBlock)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, l.FinalizedBlock)
|
||||
require.NoError(t, err)
|
||||
cfg := &postBlockProcessConfig{
|
||||
ctx: ctx,
|
||||
roblock: roblock,
|
||||
postState: l.State,
|
||||
isValidPayload: true,
|
||||
}
|
||||
|
||||
cfg := &postBlockProcessConfig{
|
||||
ctx: ctx,
|
||||
roblock: roblock,
|
||||
postState: l.State,
|
||||
isValidPayload: true,
|
||||
}
|
||||
s.saveLightClientUpdate(cfg)
|
||||
|
||||
s.saveLightClientUpdate(cfg)
|
||||
// Check that the light client update is saved
|
||||
period := slots.SyncCommitteePeriod(slots.ToEpoch(l.AttestedState.Slot()))
|
||||
u, err := s.cfg.BeaconDB.LightClientUpdate(ctx, period)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, u)
|
||||
attestedStateRoot, err := l.AttestedState.HashTreeRoot(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, attestedStateRoot, [32]byte(u.AttestedHeader().Beacon().StateRoot))
|
||||
require.Equal(t, u.Version(), version.Capella)
|
||||
})
|
||||
|
||||
// Check that the light client update is saved
|
||||
period := slots.SyncCommitteePeriod(slots.ToEpoch(l.AttestedState.Slot()))
|
||||
u, err := s.cfg.BeaconDB.LightClientUpdate(ctx, period)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, u)
|
||||
attestedStateRoot, err := l.AttestedState.HashTreeRoot(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, attestedStateRoot, [32]byte(u.AttestedHeader().Beacon().StateRoot))
|
||||
require.Equal(t, u.Version(), version.Capella)
|
||||
t.Run("New update is better", func(t *testing.T) {
|
||||
l := util.NewTestLightClient(t).SetupTestCapella(false)
|
||||
|
||||
reset()
|
||||
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().CapellaForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
|
||||
|
||||
err := s.cfg.BeaconDB.SaveBlock(ctx, l.AttestedBlock)
|
||||
require.NoError(t, err)
|
||||
attestedBlockRoot, err := l.AttestedBlock.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.AttestedState, attestedBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
currentBlockRoot, err := l.Block.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(l.Block, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, roblock)
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.State, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, l.FinalizedBlock)
|
||||
require.NoError(t, err)
|
||||
|
||||
cfg := &postBlockProcessConfig{
|
||||
ctx: ctx,
|
||||
roblock: roblock,
|
||||
postState: l.State,
|
||||
isValidPayload: true,
|
||||
}
|
||||
|
||||
period := slots.SyncCommitteePeriod(slots.ToEpoch(l.AttestedState.Slot()))
|
||||
|
||||
// create and save old update
|
||||
oldUpdate, err := lightClient.CreateDefaultLightClientUpdate(s.CurrentSlot(), l.AttestedState)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.cfg.BeaconDB.SaveLightClientUpdate(ctx, period, oldUpdate)
|
||||
require.NoError(t, err)
|
||||
|
||||
s.saveLightClientUpdate(cfg)
|
||||
|
||||
u, err := s.cfg.BeaconDB.LightClientUpdate(ctx, period)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, u)
|
||||
attestedStateRoot, err := l.AttestedState.HashTreeRoot(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, attestedStateRoot, [32]byte(u.AttestedHeader().Beacon().StateRoot))
|
||||
require.Equal(t, u.Version(), version.Capella)
|
||||
})
|
||||
|
||||
t.Run("Old update is better", func(t *testing.T) {
|
||||
l := util.NewTestLightClient(t).SetupTestCapella(false)
|
||||
|
||||
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().CapellaForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
|
||||
|
||||
err := s.cfg.BeaconDB.SaveBlock(ctx, l.AttestedBlock)
|
||||
require.NoError(t, err)
|
||||
attestedBlockRoot, err := l.AttestedBlock.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.AttestedState, attestedBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
currentBlockRoot, err := l.Block.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(l.Block, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, roblock)
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.State, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, l.FinalizedBlock)
|
||||
require.NoError(t, err)
|
||||
|
||||
cfg := &postBlockProcessConfig{
|
||||
ctx: ctx,
|
||||
roblock: roblock,
|
||||
postState: l.State,
|
||||
isValidPayload: true,
|
||||
}
|
||||
|
||||
period := slots.SyncCommitteePeriod(slots.ToEpoch(l.AttestedState.Slot()))
|
||||
|
||||
// create and save old update
|
||||
oldUpdate, err := lightClient.CreateDefaultLightClientUpdate(s.CurrentSlot(), l.AttestedState)
|
||||
require.NoError(t, err)
|
||||
|
||||
scb := make([]byte, 64)
|
||||
for i := 0; i < 5; i++ {
|
||||
scb[i] = 0x01
|
||||
}
|
||||
oldUpdate.SetSyncAggregate(ðpb.SyncAggregate{
|
||||
SyncCommitteeBits: scb,
|
||||
SyncCommitteeSignature: make([]byte, 96),
|
||||
})
|
||||
|
||||
err = s.cfg.BeaconDB.SaveLightClientUpdate(ctx, period, oldUpdate)
|
||||
require.NoError(t, err)
|
||||
|
||||
s.saveLightClientUpdate(cfg)
|
||||
|
||||
u, err := s.cfg.BeaconDB.LightClientUpdate(ctx, period)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, u)
|
||||
require.DeepEqual(t, oldUpdate, u)
|
||||
require.Equal(t, u.Version(), version.Capella)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("Deneb", func(t *testing.T) {
|
||||
featCfg := &features.Flags{}
|
||||
featCfg.EnableLightClient = true
|
||||
reset := features.InitWithReset(featCfg)
|
||||
t.Run("No old update", func(t *testing.T) {
|
||||
l := util.NewTestLightClient(t).SetupTestDeneb(false)
|
||||
|
||||
l := util.NewTestLightClient(t).SetupTestDeneb(false)
|
||||
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().DenebForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
|
||||
|
||||
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().DenebForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
|
||||
err := s.cfg.BeaconDB.SaveBlock(ctx, l.AttestedBlock)
|
||||
require.NoError(t, err)
|
||||
attestedBlockRoot, err := l.AttestedBlock.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.AttestedState, attestedBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
err := s.cfg.BeaconDB.SaveBlock(ctx, l.AttestedBlock)
|
||||
require.NoError(t, err)
|
||||
attestedBlockRoot, err := l.AttestedBlock.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.AttestedState, attestedBlockRoot)
|
||||
require.NoError(t, err)
|
||||
currentBlockRoot, err := l.Block.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(l.Block, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
currentBlockRoot, err := l.Block.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(l.Block, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, roblock)
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.State, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, roblock)
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.State, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, l.FinalizedBlock)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, l.FinalizedBlock)
|
||||
require.NoError(t, err)
|
||||
cfg := &postBlockProcessConfig{
|
||||
ctx: ctx,
|
||||
roblock: roblock,
|
||||
postState: l.State,
|
||||
isValidPayload: true,
|
||||
}
|
||||
|
||||
cfg := &postBlockProcessConfig{
|
||||
ctx: ctx,
|
||||
roblock: roblock,
|
||||
postState: l.State,
|
||||
isValidPayload: true,
|
||||
}
|
||||
s.saveLightClientUpdate(cfg)
|
||||
|
||||
s.saveLightClientUpdate(cfg)
|
||||
// Check that the light client update is saved
|
||||
period := slots.SyncCommitteePeriod(slots.ToEpoch(l.AttestedState.Slot()))
|
||||
u, err := s.cfg.BeaconDB.LightClientUpdate(ctx, period)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, u)
|
||||
attestedStateRoot, err := l.AttestedState.HashTreeRoot(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, attestedStateRoot, [32]byte(u.AttestedHeader().Beacon().StateRoot))
|
||||
require.Equal(t, u.Version(), version.Deneb)
|
||||
})
|
||||
|
||||
// Check that the light client update is saved
|
||||
period := slots.SyncCommitteePeriod(slots.ToEpoch(l.AttestedState.Slot()))
|
||||
u, err := s.cfg.BeaconDB.LightClientUpdate(ctx, period)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, u)
|
||||
attestedStateRoot, err := l.AttestedState.HashTreeRoot(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, attestedStateRoot, [32]byte(u.AttestedHeader().Beacon().StateRoot))
|
||||
require.Equal(t, u.Version(), version.Deneb)
|
||||
t.Run("New update is better", func(t *testing.T) {
|
||||
l := util.NewTestLightClient(t).SetupTestDeneb(false)
|
||||
|
||||
reset()
|
||||
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().DenebForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
|
||||
|
||||
err := s.cfg.BeaconDB.SaveBlock(ctx, l.AttestedBlock)
|
||||
require.NoError(t, err)
|
||||
attestedBlockRoot, err := l.AttestedBlock.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.AttestedState, attestedBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
currentBlockRoot, err := l.Block.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(l.Block, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, roblock)
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.State, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, l.FinalizedBlock)
|
||||
require.NoError(t, err)
|
||||
|
||||
cfg := &postBlockProcessConfig{
|
||||
ctx: ctx,
|
||||
roblock: roblock,
|
||||
postState: l.State,
|
||||
isValidPayload: true,
|
||||
}
|
||||
|
||||
period := slots.SyncCommitteePeriod(slots.ToEpoch(l.AttestedState.Slot()))
|
||||
|
||||
// create and save old update
|
||||
oldUpdate, err := lightClient.CreateDefaultLightClientUpdate(s.CurrentSlot(), l.AttestedState)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.cfg.BeaconDB.SaveLightClientUpdate(ctx, period, oldUpdate)
|
||||
require.NoError(t, err)
|
||||
|
||||
s.saveLightClientUpdate(cfg)
|
||||
|
||||
u, err := s.cfg.BeaconDB.LightClientUpdate(ctx, period)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, u)
|
||||
attestedStateRoot, err := l.AttestedState.HashTreeRoot(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, attestedStateRoot, [32]byte(u.AttestedHeader().Beacon().StateRoot))
|
||||
require.Equal(t, u.Version(), version.Deneb)
|
||||
})
|
||||
|
||||
t.Run("Old update is better", func(t *testing.T) {
|
||||
l := util.NewTestLightClient(t).SetupTestDeneb(false)
|
||||
|
||||
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().DenebForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
|
||||
|
||||
err := s.cfg.BeaconDB.SaveBlock(ctx, l.AttestedBlock)
|
||||
require.NoError(t, err)
|
||||
attestedBlockRoot, err := l.AttestedBlock.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.AttestedState, attestedBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
currentBlockRoot, err := l.Block.Block().HashTreeRoot()
|
||||
require.NoError(t, err)
|
||||
roblock, err := consensusblocks.NewROBlockWithRoot(l.Block, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, roblock)
|
||||
require.NoError(t, err)
|
||||
err = s.cfg.BeaconDB.SaveState(ctx, l.State, currentBlockRoot)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = s.cfg.BeaconDB.SaveBlock(ctx, l.FinalizedBlock)
|
||||
require.NoError(t, err)
|
||||
|
||||
cfg := &postBlockProcessConfig{
|
||||
ctx: ctx,
|
||||
roblock: roblock,
|
||||
postState: l.State,
|
||||
isValidPayload: true,
|
||||
}
|
||||
|
||||
period := slots.SyncCommitteePeriod(slots.ToEpoch(l.AttestedState.Slot()))
|
||||
|
||||
// create and save old update
|
||||
oldUpdate, err := lightClient.CreateDefaultLightClientUpdate(s.CurrentSlot(), l.AttestedState)
|
||||
require.NoError(t, err)
|
||||
|
||||
scb := make([]byte, 64)
|
||||
for i := 0; i < 5; i++ {
|
||||
scb[i] = 0x01
|
||||
}
|
||||
oldUpdate.SetSyncAggregate(ðpb.SyncAggregate{
|
||||
SyncCommitteeBits: scb,
|
||||
SyncCommitteeSignature: make([]byte, 96),
|
||||
})
|
||||
|
||||
err = s.cfg.BeaconDB.SaveLightClientUpdate(ctx, period, oldUpdate)
|
||||
require.NoError(t, err)
|
||||
|
||||
s.saveLightClientUpdate(cfg)
|
||||
|
||||
u, err := s.cfg.BeaconDB.LightClientUpdate(ctx, period)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, u)
|
||||
require.DeepEqual(t, oldUpdate, u)
|
||||
require.Equal(t, u.Version(), version.Deneb)
|
||||
})
|
||||
})
|
||||
|
||||
reset()
|
||||
}
|
||||
|
||||
func TestSaveLightClientBootstrap(t *testing.T) {
|
||||
featCfg := &features.Flags{}
|
||||
featCfg.EnableLightClient = true
|
||||
reset := features.InitWithReset(featCfg)
|
||||
|
||||
s, tr := minimalTestService(t)
|
||||
ctx := tr.ctx
|
||||
|
||||
t.Run("Altair", func(t *testing.T) {
|
||||
featCfg := &features.Flags{}
|
||||
featCfg.EnableLightClient = true
|
||||
reset := features.InitWithReset(featCfg)
|
||||
|
||||
l := util.NewTestLightClient(t).SetupTestAltair()
|
||||
|
||||
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().AltairForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
|
||||
@@ -2704,15 +3032,9 @@ func TestSaveLightClientBootstrap(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, stateRoot, [32]byte(b.Header().Beacon().StateRoot))
|
||||
require.Equal(t, b.Version(), version.Altair)
|
||||
|
||||
reset()
|
||||
})
|
||||
|
||||
t.Run("Capella", func(t *testing.T) {
|
||||
featCfg := &features.Flags{}
|
||||
featCfg.EnableLightClient = true
|
||||
reset := features.InitWithReset(featCfg)
|
||||
|
||||
l := util.NewTestLightClient(t).SetupTestCapella(false)
|
||||
|
||||
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().CapellaForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
|
||||
@@ -2745,15 +3067,9 @@ func TestSaveLightClientBootstrap(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, stateRoot, [32]byte(b.Header().Beacon().StateRoot))
|
||||
require.Equal(t, b.Version(), version.Capella)
|
||||
|
||||
reset()
|
||||
})
|
||||
|
||||
t.Run("Deneb", func(t *testing.T) {
|
||||
featCfg := &features.Flags{}
|
||||
featCfg.EnableLightClient = true
|
||||
reset := features.InitWithReset(featCfg)
|
||||
|
||||
l := util.NewTestLightClient(t).SetupTestDeneb(false)
|
||||
|
||||
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().DenebForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
|
||||
@@ -2786,7 +3102,7 @@ func TestSaveLightClientBootstrap(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, stateRoot, [32]byte(b.Header().Beacon().StateRoot))
|
||||
require.Equal(t, b.Version(), version.Deneb)
|
||||
|
||||
reset()
|
||||
})
|
||||
|
||||
reset()
|
||||
}
|
||||
|
||||
@@ -282,56 +282,247 @@ func CreateDefaultLightClientUpdate(currentSlot primitives.Slot, attestedState s
|
||||
if currentEpoch < params.BeaconConfig().CapellaForkEpoch {
|
||||
m = &pb.LightClientUpdateAltair{
|
||||
AttestedHeader: &pb.LightClientHeaderAltair{
|
||||
Beacon: &pb.BeaconBlockHeader{},
|
||||
Beacon: &pb.BeaconBlockHeader{
|
||||
ParentRoot: make([]byte, 32),
|
||||
StateRoot: make([]byte, 32),
|
||||
BodyRoot: make([]byte, 32),
|
||||
},
|
||||
},
|
||||
NextSyncCommittee: nextSyncCommittee,
|
||||
NextSyncCommitteeBranch: nextSyncCommitteeBranch,
|
||||
FinalityBranch: finalityBranch,
|
||||
FinalizedHeader: &pb.LightClientHeaderAltair{
|
||||
Beacon: &pb.BeaconBlockHeader{
|
||||
ParentRoot: make([]byte, 32),
|
||||
StateRoot: make([]byte, 32),
|
||||
BodyRoot: make([]byte, 32),
|
||||
},
|
||||
},
|
||||
SyncAggregate: &pb.SyncAggregate{
|
||||
SyncCommitteeBits: make([]byte, 64),
|
||||
SyncCommitteeSignature: make([]byte, 96),
|
||||
},
|
||||
}
|
||||
} else if currentEpoch < params.BeaconConfig().DenebForkEpoch {
|
||||
m = &pb.LightClientUpdateCapella{
|
||||
AttestedHeader: &pb.LightClientHeaderCapella{
|
||||
Beacon: &pb.BeaconBlockHeader{},
|
||||
Execution: &enginev1.ExecutionPayloadHeaderCapella{},
|
||||
Beacon: &pb.BeaconBlockHeader{
|
||||
ParentRoot: make([]byte, 32),
|
||||
StateRoot: make([]byte, 32),
|
||||
BodyRoot: make([]byte, 32),
|
||||
},
|
||||
Execution: &enginev1.ExecutionPayloadHeaderCapella{
|
||||
ParentHash: make([]byte, fieldparams.RootLength),
|
||||
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
|
||||
StateRoot: make([]byte, fieldparams.RootLength),
|
||||
ReceiptsRoot: make([]byte, fieldparams.RootLength),
|
||||
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
|
||||
PrevRandao: make([]byte, fieldparams.RootLength),
|
||||
ExtraData: make([]byte, 0),
|
||||
BaseFeePerGas: make([]byte, fieldparams.RootLength),
|
||||
BlockHash: make([]byte, fieldparams.RootLength),
|
||||
TransactionsRoot: make([]byte, fieldparams.RootLength),
|
||||
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
|
||||
},
|
||||
ExecutionBranch: executionBranch,
|
||||
},
|
||||
NextSyncCommittee: nextSyncCommittee,
|
||||
NextSyncCommitteeBranch: nextSyncCommitteeBranch,
|
||||
FinalityBranch: finalityBranch,
|
||||
FinalizedHeader: &pb.LightClientHeaderCapella{
|
||||
Beacon: &pb.BeaconBlockHeader{
|
||||
ParentRoot: make([]byte, 32),
|
||||
StateRoot: make([]byte, 32),
|
||||
BodyRoot: make([]byte, 32),
|
||||
},
|
||||
Execution: &enginev1.ExecutionPayloadHeaderCapella{
|
||||
ParentHash: make([]byte, fieldparams.RootLength),
|
||||
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
|
||||
StateRoot: make([]byte, fieldparams.RootLength),
|
||||
ReceiptsRoot: make([]byte, fieldparams.RootLength),
|
||||
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
|
||||
PrevRandao: make([]byte, fieldparams.RootLength),
|
||||
ExtraData: make([]byte, 0),
|
||||
BaseFeePerGas: make([]byte, fieldparams.RootLength),
|
||||
BlockHash: make([]byte, fieldparams.RootLength),
|
||||
TransactionsRoot: make([]byte, fieldparams.RootLength),
|
||||
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
|
||||
},
|
||||
ExecutionBranch: executionBranch,
|
||||
},
|
||||
SyncAggregate: &pb.SyncAggregate{
|
||||
SyncCommitteeBits: make([]byte, 64),
|
||||
SyncCommitteeSignature: make([]byte, 96),
|
||||
},
|
||||
}
|
||||
} else if currentEpoch < params.BeaconConfig().ElectraForkEpoch {
|
||||
m = &pb.LightClientUpdateDeneb{
|
||||
AttestedHeader: &pb.LightClientHeaderDeneb{
|
||||
Beacon: &pb.BeaconBlockHeader{},
|
||||
Execution: &enginev1.ExecutionPayloadHeaderDeneb{},
|
||||
Beacon: &pb.BeaconBlockHeader{
|
||||
ParentRoot: make([]byte, 32),
|
||||
StateRoot: make([]byte, 32),
|
||||
BodyRoot: make([]byte, 32),
|
||||
},
|
||||
Execution: &enginev1.ExecutionPayloadHeaderDeneb{
|
||||
ParentHash: make([]byte, fieldparams.RootLength),
|
||||
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
|
||||
StateRoot: make([]byte, fieldparams.RootLength),
|
||||
ReceiptsRoot: make([]byte, fieldparams.RootLength),
|
||||
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
|
||||
PrevRandao: make([]byte, fieldparams.RootLength),
|
||||
ExtraData: make([]byte, 0),
|
||||
BaseFeePerGas: make([]byte, fieldparams.RootLength),
|
||||
BlockHash: make([]byte, fieldparams.RootLength),
|
||||
TransactionsRoot: make([]byte, fieldparams.RootLength),
|
||||
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
|
||||
GasLimit: 0,
|
||||
GasUsed: 0,
|
||||
},
|
||||
ExecutionBranch: executionBranch,
|
||||
},
|
||||
NextSyncCommittee: nextSyncCommittee,
|
||||
NextSyncCommitteeBranch: nextSyncCommitteeBranch,
|
||||
FinalityBranch: finalityBranch,
|
||||
FinalizedHeader: &pb.LightClientHeaderDeneb{
|
||||
Beacon: &pb.BeaconBlockHeader{
|
||||
ParentRoot: make([]byte, 32),
|
||||
StateRoot: make([]byte, 32),
|
||||
BodyRoot: make([]byte, 32),
|
||||
},
|
||||
Execution: &enginev1.ExecutionPayloadHeaderDeneb{
|
||||
ParentHash: make([]byte, fieldparams.RootLength),
|
||||
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
|
||||
StateRoot: make([]byte, fieldparams.RootLength),
|
||||
ReceiptsRoot: make([]byte, fieldparams.RootLength),
|
||||
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
|
||||
PrevRandao: make([]byte, fieldparams.RootLength),
|
||||
ExtraData: make([]byte, 0),
|
||||
BaseFeePerGas: make([]byte, fieldparams.RootLength),
|
||||
BlockHash: make([]byte, fieldparams.RootLength),
|
||||
TransactionsRoot: make([]byte, fieldparams.RootLength),
|
||||
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
|
||||
GasLimit: 0,
|
||||
GasUsed: 0,
|
||||
},
|
||||
ExecutionBranch: executionBranch,
|
||||
},
|
||||
SyncAggregate: &pb.SyncAggregate{
|
||||
SyncCommitteeBits: make([]byte, 64),
|
||||
SyncCommitteeSignature: make([]byte, 96),
|
||||
},
|
||||
}
|
||||
} else {
|
||||
if attestedState.Version() >= version.Electra {
|
||||
m = &pb.LightClientUpdateElectra{
|
||||
AttestedHeader: &pb.LightClientHeaderDeneb{
|
||||
Beacon: &pb.BeaconBlockHeader{},
|
||||
Execution: &enginev1.ExecutionPayloadHeaderDeneb{},
|
||||
Beacon: &pb.BeaconBlockHeader{
|
||||
ParentRoot: make([]byte, 32),
|
||||
StateRoot: make([]byte, 32),
|
||||
BodyRoot: make([]byte, 32),
|
||||
},
|
||||
Execution: &enginev1.ExecutionPayloadHeaderDeneb{
|
||||
ParentHash: make([]byte, fieldparams.RootLength),
|
||||
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
|
||||
StateRoot: make([]byte, fieldparams.RootLength),
|
||||
ReceiptsRoot: make([]byte, fieldparams.RootLength),
|
||||
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
|
||||
PrevRandao: make([]byte, fieldparams.RootLength),
|
||||
ExtraData: make([]byte, 0),
|
||||
BaseFeePerGas: make([]byte, fieldparams.RootLength),
|
||||
BlockHash: make([]byte, fieldparams.RootLength),
|
||||
TransactionsRoot: make([]byte, fieldparams.RootLength),
|
||||
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
|
||||
GasLimit: 0,
|
||||
GasUsed: 0,
|
||||
},
|
||||
ExecutionBranch: executionBranch,
|
||||
},
|
||||
NextSyncCommittee: nextSyncCommittee,
|
||||
NextSyncCommitteeBranch: nextSyncCommitteeBranch,
|
||||
FinalityBranch: finalityBranch,
|
||||
FinalizedHeader: &pb.LightClientHeaderDeneb{
|
||||
Beacon: &pb.BeaconBlockHeader{
|
||||
ParentRoot: make([]byte, 32),
|
||||
StateRoot: make([]byte, 32),
|
||||
BodyRoot: make([]byte, 32),
|
||||
},
|
||||
Execution: &enginev1.ExecutionPayloadHeaderDeneb{
|
||||
ParentHash: make([]byte, fieldparams.RootLength),
|
||||
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
|
||||
StateRoot: make([]byte, fieldparams.RootLength),
|
||||
ReceiptsRoot: make([]byte, fieldparams.RootLength),
|
||||
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
|
||||
PrevRandao: make([]byte, fieldparams.RootLength),
|
||||
ExtraData: make([]byte, 0),
|
||||
BaseFeePerGas: make([]byte, fieldparams.RootLength),
|
||||
BlockHash: make([]byte, fieldparams.RootLength),
|
||||
TransactionsRoot: make([]byte, fieldparams.RootLength),
|
||||
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
|
||||
GasLimit: 0,
|
||||
GasUsed: 0,
|
||||
},
|
||||
ExecutionBranch: executionBranch,
|
||||
},
|
||||
SyncAggregate: &pb.SyncAggregate{
|
||||
SyncCommitteeBits: make([]byte, 64),
|
||||
SyncCommitteeSignature: make([]byte, 96),
|
||||
},
|
||||
}
|
||||
} else {
|
||||
m = &pb.LightClientUpdateDeneb{
|
||||
AttestedHeader: &pb.LightClientHeaderDeneb{
|
||||
Beacon: &pb.BeaconBlockHeader{},
|
||||
Execution: &enginev1.ExecutionPayloadHeaderDeneb{},
|
||||
Beacon: &pb.BeaconBlockHeader{
|
||||
ParentRoot: make([]byte, 32),
|
||||
StateRoot: make([]byte, 32),
|
||||
BodyRoot: make([]byte, 32),
|
||||
},
|
||||
Execution: &enginev1.ExecutionPayloadHeaderDeneb{
|
||||
ParentHash: make([]byte, fieldparams.RootLength),
|
||||
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
|
||||
StateRoot: make([]byte, fieldparams.RootLength),
|
||||
ReceiptsRoot: make([]byte, fieldparams.RootLength),
|
||||
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
|
||||
PrevRandao: make([]byte, fieldparams.RootLength),
|
||||
ExtraData: make([]byte, 0),
|
||||
BaseFeePerGas: make([]byte, fieldparams.RootLength),
|
||||
BlockHash: make([]byte, fieldparams.RootLength),
|
||||
TransactionsRoot: make([]byte, fieldparams.RootLength),
|
||||
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
|
||||
GasLimit: 0,
|
||||
GasUsed: 0,
|
||||
},
|
||||
ExecutionBranch: executionBranch,
|
||||
},
|
||||
NextSyncCommittee: nextSyncCommittee,
|
||||
NextSyncCommitteeBranch: nextSyncCommitteeBranch,
|
||||
FinalityBranch: finalityBranch,
|
||||
FinalizedHeader: &pb.LightClientHeaderDeneb{
|
||||
Beacon: &pb.BeaconBlockHeader{
|
||||
ParentRoot: make([]byte, 32),
|
||||
StateRoot: make([]byte, 32),
|
||||
BodyRoot: make([]byte, 32),
|
||||
},
|
||||
Execution: &enginev1.ExecutionPayloadHeaderDeneb{
|
||||
ParentHash: make([]byte, fieldparams.RootLength),
|
||||
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
|
||||
StateRoot: make([]byte, fieldparams.RootLength),
|
||||
ReceiptsRoot: make([]byte, fieldparams.RootLength),
|
||||
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
|
||||
PrevRandao: make([]byte, fieldparams.RootLength),
|
||||
ExtraData: make([]byte, 0),
|
||||
BaseFeePerGas: make([]byte, fieldparams.RootLength),
|
||||
BlockHash: make([]byte, fieldparams.RootLength),
|
||||
TransactionsRoot: make([]byte, fieldparams.RootLength),
|
||||
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
|
||||
GasLimit: 0,
|
||||
GasUsed: 0,
|
||||
},
|
||||
ExecutionBranch: executionBranch,
|
||||
},
|
||||
SyncAggregate: &pb.SyncAggregate{
|
||||
SyncCommitteeBits: make([]byte, 64),
|
||||
SyncCommitteeSignature: make([]byte, 96),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -93,7 +93,7 @@ func (e *cacheEntry) filter(root [32]byte, kc safeCommitmentArray) ([]blocks.ROB
|
||||
return nil, nil
|
||||
}
|
||||
scs := make([]blocks.ROBlob, 0, kc.count())
|
||||
for i := uint64(0); i < fieldparams.MaxBlobsPerBlock; i++ {
|
||||
for i := range uint64(fieldparams.MaxBlobsPerBlock) {
|
||||
// We already have this blob, we don't need to write it or validate it.
|
||||
if e.diskSummary.HasIndex(i) {
|
||||
continue
|
||||
|
||||
@@ -539,3 +539,231 @@ func createDefaultLightClientUpdate(currentSlot primitives.Slot, attestedState s
|
||||
|
||||
return light_client.NewWrappedUpdate(m)
|
||||
}
|
||||
|
||||
func TestStore_LightClientBootstrap_CanSaveRetrieve(t *testing.T) {
|
||||
params.SetupTestConfigCleanup(t)
|
||||
cfg := params.BeaconConfig()
|
||||
cfg.AltairForkEpoch = 0
|
||||
cfg.CapellaForkEpoch = 1
|
||||
cfg.DenebForkEpoch = 2
|
||||
cfg.ElectraForkEpoch = 3
|
||||
cfg.EpochsPerSyncCommitteePeriod = 1
|
||||
params.OverrideBeaconConfig(cfg)
|
||||
|
||||
db := setupDB(t)
|
||||
ctx := context.Background()
|
||||
|
||||
t.Run("Nil", func(t *testing.T) {
|
||||
retrievedBootstrap, err := db.LightClientBootstrap(ctx, []byte("NilBlockRoot"))
|
||||
require.NoError(t, err)
|
||||
require.IsNil(t, retrievedBootstrap)
|
||||
})
|
||||
|
||||
t.Run("Altair", func(t *testing.T) {
|
||||
bootstrap, err := createDefaultLightClientBootstrap(primitives.Slot(uint64(params.BeaconConfig().AltairForkEpoch) * uint64(params.BeaconConfig().SlotsPerEpoch)))
|
||||
require.NoError(t, err)
|
||||
|
||||
err = bootstrap.SetCurrentSyncCommittee(createRandomSyncCommittee())
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.SaveLightClientBootstrap(ctx, []byte("blockRootAltair"), bootstrap)
|
||||
require.NoError(t, err)
|
||||
|
||||
retrievedBootstrap, err := db.LightClientBootstrap(ctx, []byte("blockRootAltair"))
|
||||
require.NoError(t, err)
|
||||
require.DeepEqual(t, bootstrap, retrievedBootstrap, "retrieved bootstrap does not match saved bootstrap")
|
||||
})
|
||||
|
||||
t.Run("Capella", func(t *testing.T) {
|
||||
bootstrap, err := createDefaultLightClientBootstrap(primitives.Slot(uint64(params.BeaconConfig().CapellaForkEpoch) * uint64(params.BeaconConfig().SlotsPerEpoch)))
|
||||
require.NoError(t, err)
|
||||
|
||||
err = bootstrap.SetCurrentSyncCommittee(createRandomSyncCommittee())
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.SaveLightClientBootstrap(ctx, []byte("blockRootCapella"), bootstrap)
|
||||
require.NoError(t, err)
|
||||
|
||||
retrievedBootstrap, err := db.LightClientBootstrap(ctx, []byte("blockRootCapella"))
|
||||
require.NoError(t, err)
|
||||
require.DeepEqual(t, bootstrap, retrievedBootstrap, "retrieved bootstrap does not match saved bootstrap")
|
||||
})
|
||||
|
||||
t.Run("Deneb", func(t *testing.T) {
|
||||
bootstrap, err := createDefaultLightClientBootstrap(primitives.Slot(uint64(params.BeaconConfig().DenebForkEpoch) * uint64(params.BeaconConfig().SlotsPerEpoch)))
|
||||
require.NoError(t, err)
|
||||
|
||||
err = bootstrap.SetCurrentSyncCommittee(createRandomSyncCommittee())
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.SaveLightClientBootstrap(ctx, []byte("blockRootDeneb"), bootstrap)
|
||||
require.NoError(t, err)
|
||||
|
||||
retrievedBootstrap, err := db.LightClientBootstrap(ctx, []byte("blockRootDeneb"))
|
||||
require.NoError(t, err)
|
||||
require.DeepEqual(t, bootstrap, retrievedBootstrap, "retrieved bootstrap does not match saved bootstrap")
|
||||
})
|
||||
|
||||
t.Run("Electra", func(t *testing.T) {
|
||||
bootstrap, err := createDefaultLightClientBootstrap(primitives.Slot(uint64(params.BeaconConfig().ElectraForkEpoch) * uint64(params.BeaconConfig().SlotsPerEpoch)))
|
||||
require.NoError(t, err)
|
||||
|
||||
err = bootstrap.SetCurrentSyncCommittee(createRandomSyncCommittee())
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.SaveLightClientBootstrap(ctx, []byte("blockRootElectra"), bootstrap)
|
||||
require.NoError(t, err)
|
||||
|
||||
retrievedBootstrap, err := db.LightClientBootstrap(ctx, []byte("blockRootElectra"))
|
||||
require.NoError(t, err)
|
||||
require.DeepEqual(t, bootstrap, retrievedBootstrap, "retrieved bootstrap does not match saved bootstrap")
|
||||
})
|
||||
}
|
||||
|
||||
func createDefaultLightClientBootstrap(currentSlot primitives.Slot) (interfaces.LightClientBootstrap, error) {
|
||||
currentEpoch := slots.ToEpoch(currentSlot)
|
||||
syncCommitteeSize := params.BeaconConfig().SyncCommitteeSize
|
||||
pubKeys := make([][]byte, syncCommitteeSize)
|
||||
for i := uint64(0); i < syncCommitteeSize; i++ {
|
||||
pubKeys[i] = make([]byte, fieldparams.BLSPubkeyLength)
|
||||
}
|
||||
currentSyncCommittee := &pb.SyncCommittee{
|
||||
Pubkeys: pubKeys,
|
||||
AggregatePubkey: make([]byte, fieldparams.BLSPubkeyLength),
|
||||
}
|
||||
|
||||
var currentSyncCommitteeBranch [][]byte
|
||||
if currentEpoch >= params.BeaconConfig().ElectraForkEpoch {
|
||||
currentSyncCommitteeBranch = make([][]byte, fieldparams.SyncCommitteeBranchDepthElectra)
|
||||
} else {
|
||||
currentSyncCommitteeBranch = make([][]byte, fieldparams.SyncCommitteeBranchDepth)
|
||||
}
|
||||
for i := 0; i < len(currentSyncCommitteeBranch); i++ {
|
||||
currentSyncCommitteeBranch[i] = make([]byte, fieldparams.RootLength)
|
||||
}
|
||||
|
||||
executionBranch := make([][]byte, fieldparams.ExecutionBranchDepth)
|
||||
for i := 0; i < fieldparams.ExecutionBranchDepth; i++ {
|
||||
executionBranch[i] = make([]byte, 32)
|
||||
}
|
||||
|
||||
// TODO: can this be based on the current epoch?
|
||||
var m proto.Message
|
||||
if currentEpoch < params.BeaconConfig().CapellaForkEpoch {
|
||||
m = &pb.LightClientBootstrapAltair{
|
||||
Header: &pb.LightClientHeaderAltair{
|
||||
Beacon: &pb.BeaconBlockHeader{
|
||||
ParentRoot: make([]byte, 32),
|
||||
StateRoot: make([]byte, 32),
|
||||
BodyRoot: make([]byte, 32),
|
||||
},
|
||||
},
|
||||
CurrentSyncCommittee: currentSyncCommittee,
|
||||
CurrentSyncCommitteeBranch: currentSyncCommitteeBranch,
|
||||
}
|
||||
} else if currentEpoch < params.BeaconConfig().DenebForkEpoch {
|
||||
m = &pb.LightClientBootstrapCapella{
|
||||
Header: &pb.LightClientHeaderCapella{
|
||||
Beacon: &pb.BeaconBlockHeader{
|
||||
ParentRoot: make([]byte, 32),
|
||||
StateRoot: make([]byte, 32),
|
||||
BodyRoot: make([]byte, 32),
|
||||
},
|
||||
Execution: &enginev1.ExecutionPayloadHeaderCapella{
|
||||
ParentHash: make([]byte, fieldparams.RootLength),
|
||||
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
|
||||
StateRoot: make([]byte, fieldparams.RootLength),
|
||||
ReceiptsRoot: make([]byte, fieldparams.RootLength),
|
||||
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
|
||||
PrevRandao: make([]byte, fieldparams.RootLength),
|
||||
ExtraData: make([]byte, 0),
|
||||
BaseFeePerGas: make([]byte, fieldparams.RootLength),
|
||||
BlockHash: make([]byte, fieldparams.RootLength),
|
||||
TransactionsRoot: make([]byte, fieldparams.RootLength),
|
||||
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
|
||||
},
|
||||
ExecutionBranch: executionBranch,
|
||||
},
|
||||
CurrentSyncCommittee: currentSyncCommittee,
|
||||
CurrentSyncCommitteeBranch: currentSyncCommitteeBranch,
|
||||
}
|
||||
} else if currentEpoch < params.BeaconConfig().ElectraForkEpoch {
|
||||
m = &pb.LightClientBootstrapDeneb{
|
||||
Header: &pb.LightClientHeaderDeneb{
|
||||
Beacon: &pb.BeaconBlockHeader{
|
||||
ParentRoot: make([]byte, 32),
|
||||
StateRoot: make([]byte, 32),
|
||||
BodyRoot: make([]byte, 32),
|
||||
},
|
||||
Execution: &enginev1.ExecutionPayloadHeaderDeneb{
|
||||
ParentHash: make([]byte, fieldparams.RootLength),
|
||||
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
|
||||
StateRoot: make([]byte, fieldparams.RootLength),
|
||||
ReceiptsRoot: make([]byte, fieldparams.RootLength),
|
||||
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
|
||||
PrevRandao: make([]byte, fieldparams.RootLength),
|
||||
ExtraData: make([]byte, 0),
|
||||
BaseFeePerGas: make([]byte, fieldparams.RootLength),
|
||||
BlockHash: make([]byte, fieldparams.RootLength),
|
||||
TransactionsRoot: make([]byte, fieldparams.RootLength),
|
||||
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
|
||||
GasLimit: 0,
|
||||
GasUsed: 0,
|
||||
},
|
||||
ExecutionBranch: executionBranch,
|
||||
},
|
||||
CurrentSyncCommittee: currentSyncCommittee,
|
||||
CurrentSyncCommitteeBranch: currentSyncCommitteeBranch,
|
||||
}
|
||||
} else {
|
||||
m = &pb.LightClientBootstrapElectra{
|
||||
Header: &pb.LightClientHeaderDeneb{
|
||||
Beacon: &pb.BeaconBlockHeader{
|
||||
ParentRoot: make([]byte, 32),
|
||||
StateRoot: make([]byte, 32),
|
||||
BodyRoot: make([]byte, 32),
|
||||
},
|
||||
Execution: &enginev1.ExecutionPayloadHeaderDeneb{
|
||||
ParentHash: make([]byte, fieldparams.RootLength),
|
||||
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
|
||||
StateRoot: make([]byte, fieldparams.RootLength),
|
||||
ReceiptsRoot: make([]byte, fieldparams.RootLength),
|
||||
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
|
||||
PrevRandao: make([]byte, fieldparams.RootLength),
|
||||
ExtraData: make([]byte, 0),
|
||||
BaseFeePerGas: make([]byte, fieldparams.RootLength),
|
||||
BlockHash: make([]byte, fieldparams.RootLength),
|
||||
TransactionsRoot: make([]byte, fieldparams.RootLength),
|
||||
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
|
||||
GasLimit: 0,
|
||||
GasUsed: 0,
|
||||
},
|
||||
ExecutionBranch: executionBranch,
|
||||
},
|
||||
CurrentSyncCommittee: currentSyncCommittee,
|
||||
CurrentSyncCommitteeBranch: currentSyncCommitteeBranch,
|
||||
}
|
||||
}
|
||||
|
||||
return light_client.NewWrappedBootstrap(m)
|
||||
}
|
||||
|
||||
func createRandomSyncCommittee() *pb.SyncCommittee {
|
||||
// random number between 2 and 128
|
||||
base := rand.Int()%127 + 2
|
||||
|
||||
syncCom := make([][]byte, params.BeaconConfig().SyncCommitteeSize)
|
||||
for i := 0; uint64(i) < params.BeaconConfig().SyncCommitteeSize; i++ {
|
||||
if i%base == 0 {
|
||||
syncCom[i] = make([]byte, fieldparams.BLSPubkeyLength)
|
||||
syncCom[i][0] = 1
|
||||
continue
|
||||
}
|
||||
syncCom[i] = make([]byte, fieldparams.BLSPubkeyLength)
|
||||
}
|
||||
|
||||
return &pb.SyncCommittee{
|
||||
Pubkeys: syncCom,
|
||||
AggregatePubkey: make([]byte, fieldparams.BLSPubkeyLength),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,6 +23,9 @@ func (s *Store) setOptimisticToInvalid(ctx context.Context, root, parentRoot, la
|
||||
if node == nil {
|
||||
return invalidRoots, errors.Wrap(ErrNilNode, "could not set node to invalid")
|
||||
}
|
||||
if node.parent == nil {
|
||||
return invalidRoots, errors.New("attempting to set the root node to invalid")
|
||||
}
|
||||
if node.parent.root != parentRoot {
|
||||
return invalidRoots, errInvalidParentRoot
|
||||
}
|
||||
|
||||
@@ -385,6 +385,13 @@ func TestSetOptimisticToInvalid_ForkAtMerge_bis(t *testing.T) {
|
||||
require.DeepEqual(t, roots, [][32]byte{{'b'}, {'c'}, {'d'}, {'e'}})
|
||||
}
|
||||
|
||||
func TestSetOptimisticToInvalid_InvalidRoot(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
f := setup(1, 1)
|
||||
_, err := f.SetOptimisticToInvalid(ctx, [32]byte{}, [32]byte{}, [32]byte{})
|
||||
require.ErrorContains(t, "attempting to set the root node to invalid", err)
|
||||
}
|
||||
|
||||
func TestSetOptimisticToValid(t *testing.T) {
|
||||
f := setup(1, 1)
|
||||
op, err := f.IsOptimistic([32]byte{})
|
||||
|
||||
@@ -18,6 +18,7 @@ go_library(
|
||||
"//beacon-chain/rpc/eth/shared:go_default_library",
|
||||
"//beacon-chain/rpc/lookup:go_default_library",
|
||||
"//beacon-chain/state:go_default_library",
|
||||
"//config/features:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//consensus-types/interfaces:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
@@ -45,6 +46,7 @@ go_test(
|
||||
"//beacon-chain/db/testing:go_default_library",
|
||||
"//beacon-chain/rpc/testutil:go_default_library",
|
||||
"//beacon-chain/state:go_default_library",
|
||||
"//config/features:go_default_library",
|
||||
"//config/fieldparams:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//consensus-types/blocks:go_default_library",
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v5/api/server/structs"
|
||||
lightclient "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/light-client"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/rpc/eth/shared"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/features"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
|
||||
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
|
||||
@@ -22,6 +23,11 @@ import (
|
||||
|
||||
// GetLightClientBootstrap - implements https://github.com/ethereum/beacon-APIs/blob/263f4ed6c263c967f13279c7a9f5629b51c5fc55/apis/beacon/light_client/bootstrap.yaml
|
||||
func (s *Server) GetLightClientBootstrap(w http.ResponseWriter, req *http.Request) {
|
||||
if !features.Get().EnableLightClient {
|
||||
httputil.HandleError(w, "Light client feature flag is not enabled", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
// Prepare
|
||||
ctx, span := trace.StartSpan(req.Context(), "beacon.GetLightClientBootstrap")
|
||||
defer span.End()
|
||||
@@ -76,26 +82,21 @@ func (s *Server) GetLightClientBootstrap(w http.ResponseWriter, req *http.Reques
|
||||
|
||||
// GetLightClientUpdatesByRange - implements https://github.com/ethereum/beacon-APIs/blob/263f4ed6c263c967f13279c7a9f5629b51c5fc55/apis/beacon/light_client/updates.yaml
|
||||
func (s *Server) GetLightClientUpdatesByRange(w http.ResponseWriter, req *http.Request) {
|
||||
// Prepare
|
||||
if !features.Get().EnableLightClient {
|
||||
httputil.HandleError(w, "Light client feature flag is not enabled", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
ctx, span := trace.StartSpan(req.Context(), "beacon.GetLightClientUpdatesByRange")
|
||||
defer span.End()
|
||||
|
||||
// Determine slots per period
|
||||
config := params.BeaconConfig()
|
||||
slotsPerPeriod := uint64(config.EpochsPerSyncCommitteePeriod) * uint64(config.SlotsPerEpoch)
|
||||
|
||||
// Adjust count based on configuration
|
||||
_, count, gotCount := shared.UintFromQuery(w, req, "count", true)
|
||||
if !gotCount {
|
||||
return
|
||||
} else if count == 0 {
|
||||
httputil.HandleError(w, fmt.Sprintf("got invalid 'count' query variable '%d': count must be greater than 0", count), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Determine the start and end periods
|
||||
_, startPeriod, gotStartPeriod := shared.UintFromQuery(w, req, "start_period", true)
|
||||
if !gotStartPeriod {
|
||||
httputil.HandleError(w, fmt.Sprintf("Got invalid 'count' query variable '%d': count must be greater than 0", count), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -103,33 +104,13 @@ func (s *Server) GetLightClientUpdatesByRange(w http.ResponseWriter, req *http.R
|
||||
count = config.MaxRequestLightClientUpdates
|
||||
}
|
||||
|
||||
// max possible slot is current head
|
||||
headState, err := s.HeadFetcher.HeadState(ctx)
|
||||
if err != nil {
|
||||
httputil.HandleError(w, "could not get head state: "+err.Error(), http.StatusInternalServerError)
|
||||
_, startPeriod, gotStartPeriod := shared.UintFromQuery(w, req, "start_period", true)
|
||||
if !gotStartPeriod {
|
||||
return
|
||||
}
|
||||
|
||||
maxSlot := uint64(headState.Slot())
|
||||
|
||||
// min possible slot is Altair fork period
|
||||
minSlot := uint64(config.AltairForkEpoch) * uint64(config.SlotsPerEpoch)
|
||||
|
||||
// Adjust startPeriod, the end of start period must be later than Altair fork epoch, otherwise, can not get the sync committee votes
|
||||
startPeriodEndSlot := (startPeriod+1)*slotsPerPeriod - 1
|
||||
if startPeriodEndSlot < minSlot {
|
||||
startPeriod = minSlot / slotsPerPeriod
|
||||
}
|
||||
|
||||
// Get the initial endPeriod, then we will adjust
|
||||
endPeriod := startPeriod + count - 1
|
||||
|
||||
// Adjust endPeriod, the end of end period must be earlier than current head slot
|
||||
endPeriodEndSlot := (endPeriod+1)*slotsPerPeriod - 1
|
||||
if endPeriodEndSlot > maxSlot {
|
||||
endPeriod = maxSlot / slotsPerPeriod
|
||||
}
|
||||
|
||||
// get updates
|
||||
updatesMap, err := s.BeaconDB.LightClientUpdates(ctx, startPeriod, endPeriod)
|
||||
if err != nil {
|
||||
@@ -162,6 +143,11 @@ func (s *Server) GetLightClientUpdatesByRange(w http.ResponseWriter, req *http.R
|
||||
|
||||
// GetLightClientFinalityUpdate - implements https://github.com/ethereum/beacon-APIs/blob/263f4ed6c263c967f13279c7a9f5629b51c5fc55/apis/beacon/light_client/finality_update.yaml
|
||||
func (s *Server) GetLightClientFinalityUpdate(w http.ResponseWriter, req *http.Request) {
|
||||
if !features.Get().EnableLightClient {
|
||||
httputil.HandleError(w, "Light client feature flag is not enabled", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
ctx, span := trace.StartSpan(req.Context(), "beacon.GetLightClientFinalityUpdate")
|
||||
defer span.End()
|
||||
|
||||
@@ -220,6 +206,11 @@ func (s *Server) GetLightClientFinalityUpdate(w http.ResponseWriter, req *http.R
|
||||
|
||||
// GetLightClientOptimisticUpdate - implements https://github.com/ethereum/beacon-APIs/blob/263f4ed6c263c967f13279c7a9f5629b51c5fc55/apis/beacon/light_client/optimistic_update.yaml
|
||||
func (s *Server) GetLightClientOptimisticUpdate(w http.ResponseWriter, req *http.Request) {
|
||||
if !features.Get().EnableLightClient {
|
||||
httputil.HandleError(w, "Light client feature flag is not enabled", http.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
ctx, span := trace.StartSpan(req.Context(), "beacon.GetLightClientOptimisticUpdate")
|
||||
defer span.End()
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
dbtesting "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/rpc/testutil"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/features"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
|
||||
@@ -33,6 +34,11 @@ import (
|
||||
)
|
||||
|
||||
func TestLightClientHandler_GetLightClientBootstrap(t *testing.T) {
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
EnableLightClient: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
params.SetupTestConfigCleanup(t)
|
||||
cfg := params.BeaconConfig()
|
||||
cfg.AltairForkEpoch = 0
|
||||
@@ -252,6 +258,11 @@ func TestLightClientHandler_GetLightClientBootstrap(t *testing.T) {
|
||||
// GetLightClientByRange tests
|
||||
|
||||
func TestLightClientHandler_GetLightClientUpdatesByRangeAltair(t *testing.T) {
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
EnableLightClient: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
helpers.ClearCache()
|
||||
ctx := context.Background()
|
||||
|
||||
@@ -301,6 +312,11 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeAltair(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLightClientHandler_GetLightClientUpdatesByRangeCapella(t *testing.T) {
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
EnableLightClient: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
helpers.ClearCache()
|
||||
ctx := context.Background()
|
||||
params.SetupTestConfigCleanup(t)
|
||||
@@ -350,6 +366,11 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeCapella(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLightClientHandler_GetLightClientUpdatesByRangeDeneb(t *testing.T) {
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
EnableLightClient: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
helpers.ClearCache()
|
||||
ctx := context.Background()
|
||||
params.SetupTestConfigCleanup(t)
|
||||
@@ -399,6 +420,11 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeDeneb(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLightClientHandler_GetLightClientUpdatesByRangeMultipleAltair(t *testing.T) {
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
EnableLightClient: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
helpers.ClearCache()
|
||||
ctx := context.Background()
|
||||
params.SetupTestConfigCleanup(t)
|
||||
@@ -458,6 +484,11 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeMultipleAltair(t *testin
|
||||
}
|
||||
|
||||
func TestLightClientHandler_GetLightClientUpdatesByRangeMultipleCapella(t *testing.T) {
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
EnableLightClient: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
helpers.ClearCache()
|
||||
ctx := context.Background()
|
||||
params.SetupTestConfigCleanup(t)
|
||||
@@ -518,6 +549,11 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeMultipleCapella(t *testi
|
||||
}
|
||||
|
||||
func TestLightClientHandler_GetLightClientUpdatesByRangeMultipleDeneb(t *testing.T) {
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
EnableLightClient: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
helpers.ClearCache()
|
||||
ctx := context.Background()
|
||||
params.SetupTestConfigCleanup(t)
|
||||
@@ -578,6 +614,11 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeMultipleDeneb(t *testing
|
||||
}
|
||||
|
||||
func TestLightClientHandler_GetLightClientUpdatesByRangeMultipleForksAltairCapella(t *testing.T) {
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
EnableLightClient: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
helpers.ClearCache()
|
||||
ctx := context.Background()
|
||||
params.SetupTestConfigCleanup(t)
|
||||
@@ -646,6 +687,11 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeMultipleForksAltairCapel
|
||||
}
|
||||
|
||||
func TestLightClientHandler_GetLightClientUpdatesByRangeMultipleForksCapellaDeneb(t *testing.T) {
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
EnableLightClient: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
helpers.ClearCache()
|
||||
ctx := context.Background()
|
||||
params.SetupTestConfigCleanup(t)
|
||||
@@ -715,6 +761,11 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeMultipleForksCapellaDene
|
||||
}
|
||||
|
||||
func TestLightClientHandler_GetLightClientUpdatesByRangeCountBiggerThanLimit(t *testing.T) {
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
EnableLightClient: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
helpers.ClearCache()
|
||||
ctx := context.Background()
|
||||
params.SetupTestConfigCleanup(t)
|
||||
@@ -777,6 +828,11 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeCountBiggerThanLimit(t *
|
||||
}
|
||||
|
||||
func TestLightClientHandler_GetLightClientUpdatesByRangeCountBiggerThanMax(t *testing.T) {
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
EnableLightClient: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
helpers.ClearCache()
|
||||
ctx := context.Background()
|
||||
params.SetupTestConfigCleanup(t)
|
||||
@@ -838,35 +894,22 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeCountBiggerThanMax(t *te
|
||||
}
|
||||
|
||||
func TestLightClientHandler_GetLightClientUpdatesByRangeStartPeriodBeforeAltair(t *testing.T) {
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
EnableLightClient: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
helpers.ClearCache()
|
||||
ctx := context.Background()
|
||||
params.SetupTestConfigCleanup(t)
|
||||
config := params.BeaconConfig()
|
||||
config.AltairForkEpoch = 1
|
||||
config.EpochsPerSyncCommitteePeriod = 1
|
||||
params.OverrideBeaconConfig(config)
|
||||
slot := primitives.Slot(config.AltairForkEpoch * primitives.Epoch(config.SlotsPerEpoch)).Add(1)
|
||||
|
||||
st, err := util.NewBeaconStateAltair()
|
||||
require.NoError(t, err)
|
||||
headSlot := slot.Add(1)
|
||||
err = st.SetSlot(headSlot)
|
||||
require.NoError(t, err)
|
||||
|
||||
db := dbtesting.SetupDB(t)
|
||||
|
||||
updatePeriod := slot.Div(uint64(config.EpochsPerSyncCommitteePeriod)).Div(uint64(config.SlotsPerEpoch))
|
||||
|
||||
update, err := createUpdate(t, version.Altair)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = db.SaveLightClientUpdate(ctx, uint64(updatePeriod), update)
|
||||
require.NoError(t, err)
|
||||
|
||||
mockChainService := &mock.ChainService{State: st}
|
||||
s := &Server{
|
||||
HeadFetcher: mockChainService,
|
||||
BeaconDB: db,
|
||||
BeaconDB: db,
|
||||
}
|
||||
startPeriod := 0
|
||||
url := fmt.Sprintf("http://foo.com/?count=2&start_period=%d", startPeriod)
|
||||
@@ -878,18 +921,17 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeStartPeriodBeforeAltair(
|
||||
|
||||
require.Equal(t, http.StatusOK, writer.Code)
|
||||
var resp structs.LightClientUpdatesByRangeResponse
|
||||
err = json.Unmarshal(writer.Body.Bytes(), &resp.Updates)
|
||||
err := json.Unmarshal(writer.Body.Bytes(), &resp.Updates)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, len(resp.Updates))
|
||||
|
||||
require.Equal(t, "altair", resp.Updates[0].Version)
|
||||
updateJson, err := structs.LightClientUpdateFromConsensus(update)
|
||||
require.NoError(t, err)
|
||||
require.DeepEqual(t, updateJson, resp.Updates[0].Data)
|
||||
|
||||
require.Equal(t, 0, len(resp.Updates))
|
||||
}
|
||||
|
||||
func TestLightClientHandler_GetLightClientUpdatesByRangeMissingUpdates(t *testing.T) {
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
EnableLightClient: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
helpers.ClearCache()
|
||||
ctx := context.Background()
|
||||
params.SetupTestConfigCleanup(t)
|
||||
@@ -996,6 +1038,11 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeMissingUpdates(t *testin
|
||||
}
|
||||
|
||||
func TestLightClientHandler_GetLightClientFinalityUpdate(t *testing.T) {
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
EnableLightClient: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
helpers.ClearCache()
|
||||
ctx := context.Background()
|
||||
config := params.BeaconConfig()
|
||||
@@ -1108,6 +1155,11 @@ func TestLightClientHandler_GetLightClientFinalityUpdate(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLightClientHandler_GetLightClientOptimisticUpdateAltair(t *testing.T) {
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
EnableLightClient: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
helpers.ClearCache()
|
||||
ctx := context.Background()
|
||||
config := params.BeaconConfig()
|
||||
@@ -1220,6 +1272,11 @@ func TestLightClientHandler_GetLightClientOptimisticUpdateAltair(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLightClientHandler_GetLightClientOptimisticUpdateCapella(t *testing.T) {
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
EnableLightClient: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
helpers.ClearCache()
|
||||
ctx := context.Background()
|
||||
config := params.BeaconConfig()
|
||||
@@ -1332,6 +1389,11 @@ func TestLightClientHandler_GetLightClientOptimisticUpdateCapella(t *testing.T)
|
||||
}
|
||||
|
||||
func TestLightClientHandler_GetLightClientOptimisticUpdateDeneb(t *testing.T) {
|
||||
resetFn := features.InitWithReset(&features.Flags{
|
||||
EnableLightClient: true,
|
||||
})
|
||||
defer resetFn()
|
||||
|
||||
helpers.ClearCache()
|
||||
ctx := context.Background()
|
||||
config := params.BeaconConfig()
|
||||
|
||||
@@ -244,6 +244,7 @@ go_test(
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"//proto/prysm/v1alpha1/attestation:go_default_library",
|
||||
"//proto/prysm/v1alpha1/metadata:go_default_library",
|
||||
"//runtime/version:go_default_library",
|
||||
"//testing/assert:go_default_library",
|
||||
"//testing/require:go_default_library",
|
||||
"//testing/util:go_default_library",
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"github.com/libp2p/go-libp2p/core/protocol"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
@@ -39,87 +40,128 @@ func (s *Service) forkWatcher() {
|
||||
}
|
||||
}
|
||||
|
||||
// Checks if there is a fork in the next epoch and if there is
|
||||
// it registers the appropriate gossip and rpc topics.
|
||||
func (s *Service) registerForUpcomingFork(currEpoch primitives.Epoch) error {
|
||||
genRoot := s.cfg.clock.GenesisValidatorsRoot()
|
||||
isNextForkEpoch, err := forks.IsForkNextEpoch(s.cfg.clock.GenesisTime(), genRoot[:])
|
||||
// registerForUpcomingFork registers appropriate gossip and RPC topic if there is a fork in the next epoch.
|
||||
func (s *Service) registerForUpcomingFork(currentEpoch primitives.Epoch) error {
|
||||
// Get the genesis validators root.
|
||||
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
|
||||
|
||||
// Check if there is a fork in the next epoch.
|
||||
isForkNextEpoch, err := forks.IsForkNextEpoch(s.cfg.clock.GenesisTime(), genesisValidatorsRoot[:])
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Could not retrieve next fork epoch")
|
||||
}
|
||||
// In preparation for the upcoming fork
|
||||
// in the following epoch, the node
|
||||
// will subscribe the new topics in advance.
|
||||
if isNextForkEpoch {
|
||||
nextEpoch := currEpoch + 1
|
||||
digest, err := forks.ForkDigestFromEpoch(nextEpoch, genRoot[:])
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not retrieve fork digest")
|
||||
}
|
||||
if s.subHandler.digestExists(digest) {
|
||||
return nil
|
||||
}
|
||||
s.registerSubscribers(nextEpoch, digest)
|
||||
if nextEpoch == params.BeaconConfig().AltairForkEpoch {
|
||||
s.registerRPCHandlersAltair()
|
||||
}
|
||||
if nextEpoch == params.BeaconConfig().DenebForkEpoch {
|
||||
s.registerRPCHandlersDeneb()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Checks if there was a fork in the previous epoch, and if there
|
||||
// was then we deregister the topics from that particular fork.
|
||||
func (s *Service) deregisterFromPastFork(currEpoch primitives.Epoch) error {
|
||||
genRoot := s.cfg.clock.GenesisValidatorsRoot()
|
||||
// This method takes care of the de-registration of
|
||||
// old gossip pubsub handlers. Once we are at the epoch
|
||||
// after the fork, we de-register from all the outdated topics.
|
||||
currFork, err := forks.Fork(currEpoch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// If we are still in our genesis fork version then
|
||||
// we simply exit early.
|
||||
if currFork.Epoch == params.BeaconConfig().GenesisEpoch {
|
||||
// Exit early if there is no fork in the next epoch.
|
||||
if !isForkNextEpoch {
|
||||
return nil
|
||||
}
|
||||
epochAfterFork := currFork.Epoch + 1
|
||||
// If we are in the epoch after the fork, we start de-registering.
|
||||
if epochAfterFork == currEpoch {
|
||||
// Look at the previous fork's digest.
|
||||
epochBeforeFork := currFork.Epoch - 1
|
||||
prevDigest, err := forks.ForkDigestFromEpoch(epochBeforeFork, genRoot[:])
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "Failed to determine previous epoch fork digest")
|
||||
}
|
||||
|
||||
// Exit early if there are no topics with that particular
|
||||
// digest.
|
||||
if !s.subHandler.digestExists(prevDigest) {
|
||||
return nil
|
||||
}
|
||||
prevFork, err := forks.Fork(epochBeforeFork)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to determine previous epoch fork data")
|
||||
}
|
||||
if prevFork.Epoch == params.BeaconConfig().GenesisEpoch {
|
||||
s.unregisterPhase0Handlers()
|
||||
}
|
||||
// Run through all our current active topics and see
|
||||
// if there are any subscriptions to be removed.
|
||||
for _, t := range s.subHandler.allTopics() {
|
||||
retDigest, err := p2p.ExtractGossipDigest(t)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not retrieve digest")
|
||||
continue
|
||||
}
|
||||
if retDigest == prevDigest {
|
||||
s.unSubscribeFromTopic(t)
|
||||
}
|
||||
}
|
||||
beforeForkEpoch := currentEpoch
|
||||
forkEpoch := beforeForkEpoch + 1
|
||||
|
||||
// Get the fork afterForkDigest for the next epoch.
|
||||
afterForkDigest, err := forks.ForkDigestFromEpoch(forkEpoch, genesisValidatorsRoot[:])
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not retrieve fork digest")
|
||||
}
|
||||
|
||||
// Exit early if the topics for the next epoch are already registered.
|
||||
// It likely to be the case for all slots of the epoch that are not the first one.
|
||||
if s.subHandler.digestExists(afterForkDigest) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Register the subscribers (gossipsub) for the next epoch.
|
||||
s.registerSubscribers(forkEpoch, afterForkDigest)
|
||||
|
||||
// Get the handlers for the current and next fork.
|
||||
beforeForkHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(beforeForkEpoch)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "RPC handler by topic from before fork epoch")
|
||||
}
|
||||
|
||||
forkHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(forkEpoch)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "RPC handler by topic from fork epoch")
|
||||
}
|
||||
|
||||
// Compute newly added topics.
|
||||
newRPCHandlerByTopic := addedRPCHandlerByTopic(beforeForkHandlerByTopic, forkHandlerByTopic)
|
||||
|
||||
// Register the new RPC handlers.
|
||||
for topic, handler := range newRPCHandlerByTopic {
|
||||
s.registerRPC(topic, handler)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// deregisterFromPastFork deregisters appropriate gossip and RPC topic if there is a fork in the current epoch.
|
||||
func (s *Service) deregisterFromPastFork(currentEpoch primitives.Epoch) error {
|
||||
// Extract the genesis validators root.
|
||||
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
|
||||
|
||||
// Get the fork.
|
||||
currentFork, err := forks.Fork(currentEpoch)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "genesis validators root")
|
||||
}
|
||||
|
||||
// If we are still in our genesis fork version then exit early.
|
||||
if currentFork.Epoch == params.BeaconConfig().GenesisEpoch {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get the epoch after the fork epoch.
|
||||
afterForkEpoch := currentFork.Epoch + 1
|
||||
|
||||
// Start de-registering if the current epoch is after the fork epoch.
|
||||
if currentEpoch != afterForkEpoch {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Look at the previous fork's digest.
|
||||
beforeForkEpoch := currentFork.Epoch - 1
|
||||
|
||||
beforeForkDigest, err := forks.ForkDigestFromEpoch(beforeForkEpoch, genesisValidatorsRoot[:])
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "fork digest from epoch")
|
||||
}
|
||||
|
||||
// Exit early if there are no topics with that particular digest.
|
||||
if !s.subHandler.digestExists(beforeForkDigest) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Compute the RPC handlers that are no longer needed.
|
||||
beforeForkHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(beforeForkEpoch)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "RPC handler by topic from before fork epoch")
|
||||
}
|
||||
|
||||
forkHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(currentFork.Epoch)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "RPC handler by topic from fork epoch")
|
||||
}
|
||||
|
||||
topicsToRemove := removedRPCTopics(beforeForkHandlerByTopic, forkHandlerByTopic)
|
||||
for topic := range topicsToRemove {
|
||||
fullTopic := topic + s.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullTopic))
|
||||
}
|
||||
|
||||
// Run through all our current active topics and see
|
||||
// if there are any subscriptions to be removed.
|
||||
for _, t := range s.subHandler.allTopics() {
|
||||
retDigest, err := p2p.ExtractGossipDigest(t)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not retrieve digest")
|
||||
continue
|
||||
}
|
||||
if retDigest == beforeForkDigest {
|
||||
s.unSubscribeFromTopic(t)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v5/network/forks"
|
||||
"github.com/prysmaticlabs/prysm/v5/runtime/version"
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/assert"
|
||||
)
|
||||
|
||||
@@ -230,7 +231,8 @@ func TestService_CheckForPreviousEpochFork(t *testing.T) {
|
||||
chainStarted: abool.New(),
|
||||
subHandler: newSubTopicHandler(),
|
||||
}
|
||||
r.registerRPCHandlers()
|
||||
err := r.registerRPCHandlers()
|
||||
assert.NoError(t, err)
|
||||
return r
|
||||
},
|
||||
currEpoch: 10,
|
||||
@@ -278,10 +280,21 @@ func TestService_CheckForPreviousEpochFork(t *testing.T) {
|
||||
prevGenesis := chainService.Genesis
|
||||
// To allow registration of v1 handlers
|
||||
chainService.Genesis = time.Now().Add(-1 * oneEpoch())
|
||||
r.registerRPCHandlers()
|
||||
err := r.registerRPCHandlers()
|
||||
assert.NoError(t, err)
|
||||
|
||||
chainService.Genesis = prevGenesis
|
||||
r.registerRPCHandlersAltair()
|
||||
previous, err := r.rpcHandlerByTopicFromFork(version.Phase0)
|
||||
assert.NoError(t, err)
|
||||
|
||||
next, err := r.rpcHandlerByTopicFromFork(version.Altair)
|
||||
assert.NoError(t, err)
|
||||
|
||||
handlerByTopic := addedRPCHandlerByTopic(previous, next)
|
||||
|
||||
for topic, handler := range handlerByTopic {
|
||||
r.registerRPC(topic, handler)
|
||||
}
|
||||
|
||||
genRoot := r.cfg.clock.GenesisValidatorsRoot()
|
||||
digest, err := forks.ForkDigestFromEpoch(0, genRoot[:])
|
||||
|
||||
@@ -9,117 +9,151 @@ import (
|
||||
|
||||
libp2pcore "github.com/libp2p/go-libp2p/core"
|
||||
"github.com/libp2p/go-libp2p/core/network"
|
||||
"github.com/libp2p/go-libp2p/core/protocol"
|
||||
"github.com/pkg/errors"
|
||||
ssz "github.com/prysmaticlabs/fastssz"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
|
||||
p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing"
|
||||
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
|
||||
"github.com/prysmaticlabs/prysm/v5/runtime/version"
|
||||
"github.com/prysmaticlabs/prysm/v5/time/slots"
|
||||
)
|
||||
|
||||
// Time to first byte timeout. The maximum time to wait for first byte of
|
||||
// request response (time-to-first-byte). The client is expected to give up if
|
||||
// they don't receive the first byte within 5 seconds.
|
||||
var ttfbTimeout = params.BeaconConfig().TtfbTimeoutDuration()
|
||||
var (
|
||||
// Time to first byte timeout. The maximum time to wait for first byte of
|
||||
// request response (time-to-first-byte). The client is expected to give up if
|
||||
// they don't receive the first byte within 5 seconds.
|
||||
ttfbTimeout = params.BeaconConfig().TtfbTimeoutDuration()
|
||||
|
||||
// respTimeout is the maximum time for complete response transfer.
|
||||
var respTimeout = params.BeaconConfig().RespTimeoutDuration()
|
||||
// respTimeout is the maximum time for complete response transfer.
|
||||
respTimeout = params.BeaconConfig().RespTimeoutDuration()
|
||||
)
|
||||
|
||||
// rpcHandler is responsible for handling and responding to any incoming message.
|
||||
// This method may return an error to internal monitoring, but the error will
|
||||
// not be relayed to the peer.
|
||||
type rpcHandler func(context.Context, interface{}, libp2pcore.Stream) error
|
||||
|
||||
// registerRPCHandlers for p2p RPC.
|
||||
func (s *Service) registerRPCHandlers() {
|
||||
currEpoch := slots.ToEpoch(s.cfg.clock.CurrentSlot())
|
||||
// Register V2 handlers if we are past altair fork epoch.
|
||||
if currEpoch >= params.BeaconConfig().AltairForkEpoch {
|
||||
s.registerRPC(
|
||||
p2p.RPCStatusTopicV1,
|
||||
s.statusRPCHandler,
|
||||
)
|
||||
s.registerRPC(
|
||||
p2p.RPCGoodByeTopicV1,
|
||||
s.goodbyeRPCHandler,
|
||||
)
|
||||
s.registerRPC(
|
||||
p2p.RPCPingTopicV1,
|
||||
s.pingHandler,
|
||||
)
|
||||
s.registerRPCHandlersAltair()
|
||||
// rpcHandlerByTopicFromFork returns the RPC handlers for a given fork index.
|
||||
func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandler, error) {
|
||||
switch forkIndex {
|
||||
// PhaseO: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#messages
|
||||
case version.Phase0:
|
||||
return map[string]rpcHandler{
|
||||
p2p.RPCStatusTopicV1: s.statusRPCHandler,
|
||||
p2p.RPCGoodByeTopicV1: s.goodbyeRPCHandler,
|
||||
p2p.RPCBlocksByRangeTopicV1: s.beaconBlocksByRangeRPCHandler,
|
||||
p2p.RPCBlocksByRootTopicV1: s.beaconBlocksRootRPCHandler,
|
||||
p2p.RPCPingTopicV1: s.pingHandler,
|
||||
p2p.RPCMetaDataTopicV1: s.metaDataHandler,
|
||||
}, nil
|
||||
|
||||
if currEpoch >= params.BeaconConfig().DenebForkEpoch {
|
||||
s.registerRPCHandlersDeneb()
|
||||
}
|
||||
return
|
||||
// Altair: https://github.com/ethereum/consensus-specs/tree/dev/specs/altair#messages
|
||||
// Bellatrix: https://github.com/ethereum/consensus-specs/tree/dev/specs/bellatrix#messages
|
||||
// Capella: https://github.com/ethereum/consensus-specs/tree/dev/specs/capella#messages
|
||||
case version.Altair, version.Bellatrix, version.Capella:
|
||||
return map[string]rpcHandler{
|
||||
p2p.RPCStatusTopicV1: s.statusRPCHandler,
|
||||
p2p.RPCGoodByeTopicV1: s.goodbyeRPCHandler,
|
||||
p2p.RPCBlocksByRangeTopicV2: s.beaconBlocksByRangeRPCHandler, // Modified in Altair
|
||||
p2p.RPCBlocksByRootTopicV2: s.beaconBlocksRootRPCHandler, // Modified in Altair
|
||||
p2p.RPCPingTopicV1: s.pingHandler,
|
||||
p2p.RPCMetaDataTopicV2: s.metaDataHandler, // Modified in Altair
|
||||
}, nil
|
||||
|
||||
// Deneb: https://github.com/ethereum/consensus-specs/blob/dev/specs/deneb/p2p-interface.md#messages
|
||||
// Electra: https://github.com/ethereum/consensus-specs/blob/dev/specs/electra/p2p-interface.md#messages
|
||||
case version.Deneb, version.Electra:
|
||||
return map[string]rpcHandler{
|
||||
p2p.RPCStatusTopicV1: s.statusRPCHandler,
|
||||
p2p.RPCGoodByeTopicV1: s.goodbyeRPCHandler,
|
||||
p2p.RPCBlocksByRangeTopicV2: s.beaconBlocksByRangeRPCHandler,
|
||||
p2p.RPCBlocksByRootTopicV2: s.beaconBlocksRootRPCHandler,
|
||||
p2p.RPCPingTopicV1: s.pingHandler,
|
||||
p2p.RPCMetaDataTopicV2: s.metaDataHandler,
|
||||
p2p.RPCBlobSidecarsByRootTopicV1: s.blobSidecarByRootRPCHandler, // Added in Deneb
|
||||
p2p.RPCBlobSidecarsByRangeTopicV1: s.blobSidecarsByRangeRPCHandler, // Added in Deneb
|
||||
}, nil
|
||||
|
||||
default:
|
||||
return nil, errors.Errorf("RPC handler not found for fork index %d", forkIndex)
|
||||
}
|
||||
s.registerRPC(
|
||||
p2p.RPCStatusTopicV1,
|
||||
s.statusRPCHandler,
|
||||
)
|
||||
s.registerRPC(
|
||||
p2p.RPCGoodByeTopicV1,
|
||||
s.goodbyeRPCHandler,
|
||||
)
|
||||
s.registerRPC(
|
||||
p2p.RPCBlocksByRangeTopicV1,
|
||||
s.beaconBlocksByRangeRPCHandler,
|
||||
)
|
||||
s.registerRPC(
|
||||
p2p.RPCBlocksByRootTopicV1,
|
||||
s.beaconBlocksRootRPCHandler,
|
||||
)
|
||||
s.registerRPC(
|
||||
p2p.RPCPingTopicV1,
|
||||
s.pingHandler,
|
||||
)
|
||||
s.registerRPC(
|
||||
p2p.RPCMetaDataTopicV1,
|
||||
s.metaDataHandler,
|
||||
)
|
||||
}
|
||||
|
||||
// registerRPCHandlers for altair.
|
||||
func (s *Service) registerRPCHandlersAltair() {
|
||||
s.registerRPC(
|
||||
p2p.RPCBlocksByRangeTopicV2,
|
||||
s.beaconBlocksByRangeRPCHandler,
|
||||
)
|
||||
s.registerRPC(
|
||||
p2p.RPCBlocksByRootTopicV2,
|
||||
s.beaconBlocksRootRPCHandler,
|
||||
)
|
||||
s.registerRPC(
|
||||
p2p.RPCMetaDataTopicV2,
|
||||
s.metaDataHandler,
|
||||
)
|
||||
// rpcHandlerByTopic returns the RPC handlers for a given epoch.
|
||||
func (s *Service) rpcHandlerByTopicFromEpoch(epoch primitives.Epoch) (map[string]rpcHandler, error) {
|
||||
// Get the beacon config.
|
||||
beaconConfig := params.BeaconConfig()
|
||||
|
||||
if epoch >= beaconConfig.ElectraForkEpoch {
|
||||
return s.rpcHandlerByTopicFromFork(version.Electra)
|
||||
}
|
||||
|
||||
if epoch >= beaconConfig.DenebForkEpoch {
|
||||
return s.rpcHandlerByTopicFromFork(version.Deneb)
|
||||
}
|
||||
|
||||
if epoch >= beaconConfig.CapellaForkEpoch {
|
||||
return s.rpcHandlerByTopicFromFork(version.Capella)
|
||||
}
|
||||
|
||||
if epoch >= beaconConfig.BellatrixForkEpoch {
|
||||
return s.rpcHandlerByTopicFromFork(version.Bellatrix)
|
||||
}
|
||||
|
||||
if epoch >= beaconConfig.AltairForkEpoch {
|
||||
return s.rpcHandlerByTopicFromFork(version.Altair)
|
||||
}
|
||||
|
||||
return s.rpcHandlerByTopicFromFork(version.Phase0)
|
||||
}
|
||||
|
||||
func (s *Service) registerRPCHandlersDeneb() {
|
||||
s.registerRPC(
|
||||
p2p.RPCBlobSidecarsByRangeTopicV1,
|
||||
s.blobSidecarsByRangeRPCHandler,
|
||||
)
|
||||
s.registerRPC(
|
||||
p2p.RPCBlobSidecarsByRootTopicV1,
|
||||
s.blobSidecarByRootRPCHandler,
|
||||
)
|
||||
// addedRPCHandlerByTopic returns the RPC handlers that are added in the new map that are not present in the old map.
|
||||
func addedRPCHandlerByTopic(previous, next map[string]rpcHandler) map[string]rpcHandler {
|
||||
added := make(map[string]rpcHandler)
|
||||
|
||||
for topic, handler := range next {
|
||||
if _, ok := previous[topic]; !ok {
|
||||
added[topic] = handler
|
||||
}
|
||||
}
|
||||
|
||||
return added
|
||||
}
|
||||
|
||||
// Remove all v1 Stream handlers that are no longer supported
|
||||
// from altair onwards.
|
||||
func (s *Service) unregisterPhase0Handlers() {
|
||||
fullBlockRangeTopic := p2p.RPCBlocksByRangeTopicV1 + s.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
fullBlockRootTopic := p2p.RPCBlocksByRootTopicV1 + s.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
fullMetadataTopic := p2p.RPCMetaDataTopicV1 + s.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
// removedTopics returns the topics that are removed in the new map that are not present in the old map.
|
||||
func removedRPCTopics(previous, next map[string]rpcHandler) map[string]bool {
|
||||
removed := make(map[string]bool)
|
||||
|
||||
s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullBlockRangeTopic))
|
||||
s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullBlockRootTopic))
|
||||
s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullMetadataTopic))
|
||||
for topic := range previous {
|
||||
if _, ok := next[topic]; !ok {
|
||||
removed[topic] = true
|
||||
}
|
||||
}
|
||||
|
||||
return removed
|
||||
}
|
||||
|
||||
// registerRPCHandlers for p2p RPC.
|
||||
func (s *Service) registerRPCHandlers() error {
|
||||
// Get the current epoch.
|
||||
currentSlot := s.cfg.clock.CurrentSlot()
|
||||
currentEpoch := slots.ToEpoch(currentSlot)
|
||||
|
||||
// Get the RPC handlers for the current epoch.
|
||||
handlerByTopic, err := s.rpcHandlerByTopicFromEpoch(currentEpoch)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "rpc handler by topic from epoch")
|
||||
}
|
||||
|
||||
// Register the RPC handlers for the current epoch.
|
||||
for topic, handler := range handlerByTopic {
|
||||
s.registerRPC(topic, handler)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// registerRPC for a given topic with an expected protobuf message type.
|
||||
|
||||
@@ -303,14 +303,21 @@ func (s *Service) waitForChainStart() {
|
||||
|
||||
ctxMap, err := ContextByteVersionsForValRoot(clock.GenesisValidatorsRoot())
|
||||
if err != nil {
|
||||
log.WithError(err).WithField("genesisValidatorRoot", clock.GenesisValidatorsRoot()).
|
||||
log.
|
||||
WithError(err).
|
||||
WithField("genesisValidatorRoot", clock.GenesisValidatorsRoot()).
|
||||
Error("sync service failed to initialize context version map")
|
||||
return
|
||||
}
|
||||
s.ctxMap = ctxMap
|
||||
|
||||
// Register respective rpc handlers at state initialized event.
|
||||
s.registerRPCHandlers()
|
||||
err = s.registerRPCHandlers()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not register rpc handlers")
|
||||
return
|
||||
}
|
||||
|
||||
// Wait for chainstart in separate routine.
|
||||
if startTime.After(prysmTime.Now()) {
|
||||
time.Sleep(prysmTime.Until(startTime))
|
||||
|
||||
@@ -53,6 +53,30 @@ func (s *Service) noopValidator(_ context.Context, _ peer.ID, msg *pubsub.Messag
|
||||
return pubsub.ValidationAccept, nil
|
||||
}
|
||||
|
||||
func sliceFromCount(count uint64) []uint64 {
|
||||
result := make([]uint64, 0, count)
|
||||
|
||||
for item := range count {
|
||||
result = append(result, item)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (s *Service) activeSyncSubnetIndices(currentSlot primitives.Slot) []uint64 {
|
||||
if flags.Get().SubscribeToAllSubnets {
|
||||
return sliceFromCount(params.BeaconConfig().SyncCommitteeSubnetCount)
|
||||
}
|
||||
|
||||
// Get the current epoch.
|
||||
currentEpoch := slots.ToEpoch(currentSlot)
|
||||
|
||||
// Retrieve the subnets we want to subscribe to.
|
||||
subs := cache.SyncSubnetIDs.GetAllSubnets(currentEpoch)
|
||||
|
||||
return slice.SetUint64(subs)
|
||||
}
|
||||
|
||||
// Register PubSub subscribers
|
||||
func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
|
||||
s.subscribe(
|
||||
@@ -85,49 +109,34 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
|
||||
s.attesterSlashingSubscriber,
|
||||
digest,
|
||||
)
|
||||
if flags.Get().SubscribeToAllSubnets {
|
||||
s.subscribeStaticWithSubnets(
|
||||
p2p.AttestationSubnetTopicFormat,
|
||||
s.validateCommitteeIndexBeaconAttestation, /* validator */
|
||||
s.committeeIndexBeaconAttestationSubscriber, /* message handler */
|
||||
digest,
|
||||
params.BeaconConfig().AttestationSubnetCount,
|
||||
)
|
||||
} else {
|
||||
s.subscribeDynamicWithSubnets(
|
||||
p2p.AttestationSubnetTopicFormat,
|
||||
s.validateCommitteeIndexBeaconAttestation, /* validator */
|
||||
s.committeeIndexBeaconAttestationSubscriber, /* message handler */
|
||||
digest,
|
||||
)
|
||||
}
|
||||
s.subscribeWithParameters(
|
||||
p2p.AttestationSubnetTopicFormat,
|
||||
s.validateCommitteeIndexBeaconAttestation,
|
||||
s.committeeIndexBeaconAttestationSubscriber,
|
||||
digest,
|
||||
s.persistentAndAggregatorSubnetIndices,
|
||||
s.attesterSubnetIndices,
|
||||
)
|
||||
// Altair Fork Version
|
||||
if epoch >= params.BeaconConfig().AltairForkEpoch {
|
||||
if params.BeaconConfig().AltairForkEpoch <= epoch {
|
||||
s.subscribe(
|
||||
p2p.SyncContributionAndProofSubnetTopicFormat,
|
||||
s.validateSyncContributionAndProof,
|
||||
s.syncContributionAndProofSubscriber,
|
||||
digest,
|
||||
)
|
||||
if flags.Get().SubscribeToAllSubnets {
|
||||
s.subscribeStaticWithSyncSubnets(
|
||||
p2p.SyncCommitteeSubnetTopicFormat,
|
||||
s.validateSyncCommitteeMessage, /* validator */
|
||||
s.syncCommitteeMessageSubscriber, /* message handler */
|
||||
digest,
|
||||
)
|
||||
} else {
|
||||
s.subscribeDynamicWithSyncSubnets(
|
||||
p2p.SyncCommitteeSubnetTopicFormat,
|
||||
s.validateSyncCommitteeMessage, /* validator */
|
||||
s.syncCommitteeMessageSubscriber, /* message handler */
|
||||
digest,
|
||||
)
|
||||
}
|
||||
s.subscribeWithParameters(
|
||||
p2p.SyncCommitteeSubnetTopicFormat,
|
||||
s.validateSyncCommitteeMessage,
|
||||
s.syncCommitteeMessageSubscriber,
|
||||
digest,
|
||||
s.activeSyncSubnetIndices,
|
||||
func(currentSlot primitives.Slot) []uint64 { return []uint64{} },
|
||||
)
|
||||
}
|
||||
|
||||
// New Gossip Topic in Capella
|
||||
if epoch >= params.BeaconConfig().CapellaForkEpoch {
|
||||
if params.BeaconConfig().CapellaForkEpoch <= epoch {
|
||||
s.subscribe(
|
||||
p2p.BlsToExecutionChangeSubnetTopicFormat,
|
||||
s.validateBlsToExecutionChange,
|
||||
@@ -137,13 +146,14 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
|
||||
}
|
||||
|
||||
// New Gossip Topic in Deneb
|
||||
if epoch >= params.BeaconConfig().DenebForkEpoch {
|
||||
s.subscribeStaticWithSubnets(
|
||||
if params.BeaconConfig().DenebForkEpoch <= epoch {
|
||||
s.subscribeWithParameters(
|
||||
p2p.BlobSubnetTopicFormat,
|
||||
s.validateBlob, /* validator */
|
||||
s.blobSubscriber, /* message handler */
|
||||
s.validateBlob,
|
||||
s.blobSubscriber,
|
||||
digest,
|
||||
params.BeaconConfig().BlobsidecarSubnetCount,
|
||||
func(primitives.Slot) []uint64 { return sliceFromCount(params.BeaconConfig().BlobsidecarSubnetCount) },
|
||||
func(currentSlot primitives.Slot) []uint64 { return []uint64{} },
|
||||
)
|
||||
}
|
||||
}
|
||||
@@ -324,132 +334,6 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p
|
||||
}
|
||||
}
|
||||
|
||||
// subscribe to a static subnet with the given topic and index. A given validator and subscription handler is
|
||||
// used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding.
|
||||
func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte, subnetCount uint64) {
|
||||
genRoot := s.cfg.clock.GenesisValidatorsRoot()
|
||||
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
|
||||
if err != nil {
|
||||
// Impossible condition as it would mean digest does not exist.
|
||||
panic(err)
|
||||
}
|
||||
base := p2p.GossipTopicMappings(topic, e)
|
||||
if base == nil {
|
||||
// Impossible condition as it would mean topic does not exist.
|
||||
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic))
|
||||
}
|
||||
for i := uint64(0); i < subnetCount; i++ {
|
||||
s.subscribeWithBase(s.addDigestAndIndexToTopic(topic, digest, i), validator, handle)
|
||||
}
|
||||
genesis := s.cfg.clock.GenesisTime()
|
||||
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
ticker.Done()
|
||||
return
|
||||
case <-ticker.C():
|
||||
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
|
||||
continue
|
||||
}
|
||||
valid, err := isDigestValid(digest, genesis, genRoot)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
if !valid {
|
||||
log.Warnf("Attestation subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
|
||||
// Unsubscribes from all our current subnets.
|
||||
for i := uint64(0); i < subnetCount; i++ {
|
||||
fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
s.unSubscribeFromTopic(fullTopic)
|
||||
}
|
||||
ticker.Done()
|
||||
return
|
||||
}
|
||||
// Check every slot that there are enough peers
|
||||
for i := uint64(0); i < subnetCount; i++ {
|
||||
if !s.enoughPeersAreConnected(s.addDigestAndIndexToTopic(topic, digest, i)) {
|
||||
_, err := s.cfg.p2p.FindPeersWithSubnet(
|
||||
s.ctx,
|
||||
s.addDigestAndIndexToTopic(topic, digest, i),
|
||||
i,
|
||||
flags.Get().MinimumPeersPerSubnet,
|
||||
)
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Could not search for peers")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// subscribe to a dynamically changing list of subnets. This method expects a fmt compatible
|
||||
// string for the topic name and the list of subnets for subscribed topics that should be
|
||||
// maintained.
|
||||
func (s *Service) subscribeDynamicWithSubnets(
|
||||
topicFormat string,
|
||||
validate wrappedVal,
|
||||
handle subHandler,
|
||||
digest [4]byte,
|
||||
) {
|
||||
genRoot := s.cfg.clock.GenesisValidatorsRoot()
|
||||
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
|
||||
if err != nil {
|
||||
// Impossible condition as it would mean digest does not exist.
|
||||
panic(err)
|
||||
}
|
||||
base := p2p.GossipTopicMappings(topicFormat, e)
|
||||
if base == nil {
|
||||
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat))
|
||||
}
|
||||
subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().MaxCommitteesPerSlot)
|
||||
genesis := s.cfg.clock.GenesisTime()
|
||||
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
ticker.Done()
|
||||
return
|
||||
case currentSlot := <-ticker.C():
|
||||
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
|
||||
continue
|
||||
}
|
||||
valid, err := isDigestValid(digest, genesis, genRoot)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
if !valid {
|
||||
log.Warnf("Attestation subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
|
||||
// Unsubscribes from all our current subnets.
|
||||
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
|
||||
ticker.Done()
|
||||
return
|
||||
}
|
||||
wantedSubs := s.retrievePersistentSubs(currentSlot)
|
||||
s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest)
|
||||
|
||||
for _, idx := range wantedSubs {
|
||||
s.subscribeAggregatorSubnet(subscriptions, idx, digest, validate, handle)
|
||||
}
|
||||
// find desired subs for attesters
|
||||
attesterSubs := s.attesterSubnetIndices(currentSlot)
|
||||
for _, idx := range attesterSubs {
|
||||
s.lookupAttesterSubnets(digest, idx)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// reValidateSubscriptions unsubscribe from topics we are currently subscribed to but that are
|
||||
// not in the list of wanted subnets.
|
||||
// TODO: Rename this functions as it does not only revalidate subscriptions.
|
||||
@@ -477,96 +361,44 @@ func (s *Service) reValidateSubscriptions(
|
||||
}
|
||||
}
|
||||
|
||||
// subscribe missing subnets for our aggregators.
|
||||
func (s *Service) subscribeAggregatorSubnet(
|
||||
subscriptions map[uint64]*pubsub.Subscription,
|
||||
idx uint64,
|
||||
// searchForPeers searches for peers in the given subnets.
|
||||
func (s *Service) searchForPeers(
|
||||
ctx context.Context,
|
||||
topicFormat string,
|
||||
digest [4]byte,
|
||||
validate wrappedVal,
|
||||
handle subHandler,
|
||||
currentSlot primitives.Slot,
|
||||
getSubnetsToSubscribe func(currentSlot primitives.Slot) []uint64,
|
||||
getSubnetsToFindPeersOnly func(currentSlot primitives.Slot) []uint64,
|
||||
) {
|
||||
// do not subscribe if we have no peers in the same
|
||||
// subnet
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.Attestation{})]
|
||||
subnetTopic := fmt.Sprintf(topic, digest, idx)
|
||||
// check if subscription exists and if not subscribe the relevant subnet.
|
||||
if _, exists := subscriptions[idx]; !exists {
|
||||
subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle)
|
||||
}
|
||||
if !s.enoughPeersAreConnected(subnetTopic) {
|
||||
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet)
|
||||
// Retrieve the subnets we want to subscribe to.
|
||||
subnetsToSubscribeIndex := getSubnetsToSubscribe(currentSlot)
|
||||
|
||||
// Retrieve the subnets we want to find peers for.
|
||||
subnetsToFindPeersOnlyIndex := getSubnetsToFindPeersOnly(currentSlot)
|
||||
|
||||
// Combine the subnets to subscribe and the subnets to find peers for.
|
||||
subnetsToFindPeersIndex := slice.SetUint64(append(subnetsToSubscribeIndex, subnetsToFindPeersOnlyIndex...))
|
||||
|
||||
// Find new peers for wanted subnets if needed.
|
||||
for _, subnetIndex := range subnetsToFindPeersIndex {
|
||||
topic := fmt.Sprintf(topicFormat, digest, subnetIndex)
|
||||
|
||||
// Check if we have enough peers in the subnet. Skip if we do.
|
||||
if s.enoughPeersAreConnected(topic) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Not enough peers in the subnet, we need to search for more.
|
||||
_, err := s.cfg.p2p.FindPeersWithSubnet(ctx, topic, subnetIndex, flags.Get().MinimumPeersPerSubnet)
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Could not search for peers")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// subscribe to a static subnet with the given topic and index. A given validator and subscription handler is
|
||||
// used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding.
|
||||
func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte) {
|
||||
genRoot := s.cfg.clock.GenesisValidatorsRoot()
|
||||
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
base := p2p.GossipTopicMappings(topic, e)
|
||||
if base == nil {
|
||||
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic))
|
||||
}
|
||||
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
|
||||
s.subscribeWithBase(s.addDigestAndIndexToTopic(topic, digest, i), validator, handle)
|
||||
}
|
||||
genesis := s.cfg.clock.GenesisTime()
|
||||
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
ticker.Done()
|
||||
return
|
||||
case <-ticker.C():
|
||||
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
|
||||
continue
|
||||
}
|
||||
valid, err := isDigestValid(digest, genesis, genRoot)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
if !valid {
|
||||
log.Warnf("Sync subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
|
||||
// Unsubscribes from all our current subnets.
|
||||
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
|
||||
fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.p2p.Encoding().ProtocolSuffix()
|
||||
s.unSubscribeFromTopic(fullTopic)
|
||||
}
|
||||
ticker.Done()
|
||||
return
|
||||
}
|
||||
// Check every slot that there are enough peers
|
||||
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
|
||||
if !s.enoughPeersAreConnected(s.addDigestAndIndexToTopic(topic, digest, i)) {
|
||||
_, err := s.cfg.p2p.FindPeersWithSubnet(
|
||||
s.ctx,
|
||||
s.addDigestAndIndexToTopic(topic, digest, i),
|
||||
i,
|
||||
flags.Get().MinimumPeersPerSubnet,
|
||||
)
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Could not search for peers")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// subscribeToSyncSubnets subscribes to needed sync subnets, unsubscribe from unneeded ones and search for more peers if needed.
|
||||
// subscribeToSubnets subscribes to needed subnets, unsubscribe from unneeded ones and search for more peers if needed.
|
||||
// Returns `true` if the digest is valid (wrt. the current epoch), `false` otherwise.
|
||||
func (s *Service) subscribeToSyncSubnets(
|
||||
func (s *Service) subscribeToSubnets(
|
||||
topicFormat string,
|
||||
digest [4]byte,
|
||||
genesisValidatorsRoot [fieldparams.RootLength]byte,
|
||||
@@ -575,16 +407,15 @@ func (s *Service) subscribeToSyncSubnets(
|
||||
currentSlot primitives.Slot,
|
||||
validate wrappedVal,
|
||||
handle subHandler,
|
||||
getSubnetsToSubscribe func(currentSlot primitives.Slot) []uint64,
|
||||
getSubnetsToFindPeersOnly func(currentSlot primitives.Slot) []uint64,
|
||||
) bool {
|
||||
// Get sync subnets topic.
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.SyncCommitteeMessage{})]
|
||||
|
||||
// Do not subscribe if not synced.
|
||||
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
|
||||
return true
|
||||
}
|
||||
|
||||
// Do not subscribe is the digest is not valid.
|
||||
// Check the validity of the digest.
|
||||
valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
@@ -593,23 +424,25 @@ func (s *Service) subscribeToSyncSubnets(
|
||||
|
||||
// Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork.
|
||||
if !valid {
|
||||
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn("Sync subnets with this digest are no longer valid, unsubscribing from all of them.")
|
||||
description := topicFormat
|
||||
if pos := strings.LastIndex(topicFormat, "/"); pos != -1 {
|
||||
description = topicFormat[pos+1:]
|
||||
}
|
||||
|
||||
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warningf("%s subnets with this digest are no longer valid, unsubscribing from all of them.", description)
|
||||
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
|
||||
return false
|
||||
}
|
||||
|
||||
// Get the current epoch.
|
||||
currentEpoch := slots.ToEpoch(currentSlot)
|
||||
|
||||
// Retrieve the subnets we want to subscribe to.
|
||||
wantedSubnetsIndex := s.retrieveActiveSyncSubnets(currentEpoch)
|
||||
subnetsToSubscribeIndex := getSubnetsToSubscribe(currentSlot)
|
||||
|
||||
// Remove subscriptions that are no longer wanted.
|
||||
s.reValidateSubscriptions(subscriptions, wantedSubnetsIndex, topicFormat, digest)
|
||||
s.reValidateSubscriptions(subscriptions, subnetsToSubscribeIndex, topicFormat, digest)
|
||||
|
||||
// Subscribe to wanted subnets.
|
||||
for _, subnetIndex := range wantedSubnetsIndex {
|
||||
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)
|
||||
for _, subnetIndex := range subnetsToSubscribeIndex {
|
||||
subnetTopic := fmt.Sprintf(topicFormat, digest, subnetIndex)
|
||||
|
||||
// Check if subscription exists.
|
||||
if _, exists := subscriptions[subnetIndex]; exists {
|
||||
@@ -620,38 +453,20 @@ func (s *Service) subscribeToSyncSubnets(
|
||||
subscription := s.subscribeWithBase(subnetTopic, validate, handle)
|
||||
subscriptions[subnetIndex] = subscription
|
||||
}
|
||||
|
||||
// Find new peers for wanted subnets if needed.
|
||||
for _, subnetIndex := range wantedSubnetsIndex {
|
||||
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)
|
||||
|
||||
// Check if we have enough peers in the subnet. Skip if we do.
|
||||
if s.enoughPeersAreConnected(subnetTopic) {
|
||||
continue
|
||||
}
|
||||
|
||||
// Not enough peers in the subnet, we need to search for more.
|
||||
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, subnetIndex, flags.Get().MinimumPeersPerSubnet)
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Could not search for peers")
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// subscribeDynamicWithSyncSubnets subscribes to a dynamically changing list of subnets.
|
||||
func (s *Service) subscribeDynamicWithSyncSubnets(
|
||||
// subscribeWithParameters subscribes to a list of subnets.
|
||||
func (s *Service) subscribeWithParameters(
|
||||
topicFormat string,
|
||||
validate wrappedVal,
|
||||
handle subHandler,
|
||||
digest [4]byte,
|
||||
getSubnetsToSubscribe func(currentSlot primitives.Slot) []uint64,
|
||||
getSubnetsToFindPeersOnly func(currentSlot primitives.Slot) []uint64,
|
||||
) {
|
||||
// Retrieve the number of committee subnets we need to subscribe to.
|
||||
syncCommiteeSubnetsCount := params.BeaconConfig().SyncCommitteeSubnetCount
|
||||
|
||||
// Initialize the subscriptions map.
|
||||
subscriptions := make(map[uint64]*pubsub.Subscription, syncCommiteeSubnetsCount)
|
||||
subscriptions := make(map[uint64]*pubsub.Subscription)
|
||||
|
||||
// Retrieve the genesis validators root.
|
||||
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
|
||||
@@ -678,14 +493,20 @@ func (s *Service) subscribeDynamicWithSyncSubnets(
|
||||
// Retrieve the current slot.
|
||||
currentSlot := s.cfg.clock.CurrentSlot()
|
||||
|
||||
// Subscribe to subnets.
|
||||
s.subscribeToSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle, getSubnetsToSubscribe, getSubnetsToFindPeersOnly)
|
||||
|
||||
// Derive a new context and cancel function.
|
||||
ctx, cancel := context.WithCancel(s.ctx)
|
||||
|
||||
go func() {
|
||||
// Subscribe to the sync subnets.
|
||||
s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)
|
||||
// Search for peers.
|
||||
s.searchForPeers(ctx, topicFormat, digest, currentSlot, getSubnetsToSubscribe, getSubnetsToFindPeersOnly)
|
||||
|
||||
for {
|
||||
select {
|
||||
case currentSlot := <-ticker.C():
|
||||
isDigestValid := s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)
|
||||
isDigestValid := s.subscribeToSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle, getSubnetsToSubscribe, getSubnetsToFindPeersOnly)
|
||||
|
||||
// Stop the ticker if the digest is not valid. Likely to happen after a hard fork.
|
||||
if !isDigestValid {
|
||||
@@ -693,7 +514,11 @@ func (s *Service) subscribeDynamicWithSyncSubnets(
|
||||
return
|
||||
}
|
||||
|
||||
// Search for peers.
|
||||
s.searchForPeers(ctx, topicFormat, digest, currentSlot, getSubnetsToSubscribe, getSubnetsToFindPeersOnly)
|
||||
|
||||
case <-s.ctx.Done():
|
||||
cancel()
|
||||
ticker.Done()
|
||||
return
|
||||
}
|
||||
@@ -701,21 +526,8 @@ func (s *Service) subscribeDynamicWithSyncSubnets(
|
||||
}()
|
||||
}
|
||||
|
||||
// lookup peers for attester specific subnets.
|
||||
func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) {
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.Attestation{})]
|
||||
subnetTopic := fmt.Sprintf(topic, digest, idx)
|
||||
if !s.enoughPeersAreConnected(subnetTopic) {
|
||||
// perform a search for peers with the desired committee index.
|
||||
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet)
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("Could not search for peers")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) unSubscribeFromTopic(topic string) {
|
||||
log.WithField("topic", topic).Debug("Unsubscribing from topic")
|
||||
log.WithField("topic", topic).Info("Unsubscribed from")
|
||||
if err := s.cfg.p2p.PubSub().UnregisterTopicValidator(topic); err != nil {
|
||||
log.WithError(err).Error("Could not unregister topic validator")
|
||||
}
|
||||
@@ -740,19 +552,16 @@ func (s *Service) enoughPeersAreConnected(subnetTopic string) bool {
|
||||
return peersWithSubnetCount >= threshold
|
||||
}
|
||||
|
||||
func (s *Service) retrievePersistentSubs(currSlot primitives.Slot) []uint64 {
|
||||
// Persistent subscriptions from validators
|
||||
persistentSubs := s.persistentSubnetIndices()
|
||||
// Update desired topic indices for aggregator
|
||||
wantedSubs := s.aggregatorSubnetIndices(currSlot)
|
||||
func (s *Service) persistentAndAggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 {
|
||||
if flags.Get().SubscribeToAllSubnets {
|
||||
return sliceFromCount(params.BeaconConfig().AttestationSubnetCount)
|
||||
}
|
||||
|
||||
// Combine subscriptions to get all requested subscriptions
|
||||
return slice.SetUint64(append(persistentSubs, wantedSubs...))
|
||||
}
|
||||
persistentSubnetIndices := s.persistentSubnetIndices()
|
||||
aggregatorSubnetIndices := s.aggregatorSubnetIndices(currentSlot)
|
||||
|
||||
func (*Service) retrieveActiveSyncSubnets(currEpoch primitives.Epoch) []uint64 {
|
||||
subs := cache.SyncSubnetIDs.GetAllSubnets(currEpoch)
|
||||
return slice.SetUint64(subs)
|
||||
// Combine subscriptions to get all requested subscriptions.
|
||||
return slice.SetUint64(append(persistentSubnetIndices, aggregatorSubnetIndices...))
|
||||
}
|
||||
|
||||
// filters out required peers for the node to function, not
|
||||
@@ -768,7 +577,7 @@ func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID {
|
||||
return pids
|
||||
}
|
||||
currSlot := s.cfg.clock.CurrentSlot()
|
||||
wantedSubs := s.retrievePersistentSubs(currSlot)
|
||||
wantedSubs := s.persistentAndAggregatorSubnetIndices(currSlot)
|
||||
wantedSubs = slice.SetUint64(append(wantedSubs, s.attesterSubnetIndices(currSlot)...))
|
||||
topic := p2p.GossipTypeMapping[reflect.TypeOf(ðpb.Attestation{})]
|
||||
|
||||
|
||||
@@ -312,37 +312,6 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
|
||||
require.LogsDoNotContain(t, hook, "Could not unregister topic validator")
|
||||
}
|
||||
|
||||
func TestStaticSubnets(t *testing.T) {
|
||||
p := p2ptest.NewTestP2P(t)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
chain := &mockChain.ChainService{
|
||||
Genesis: time.Now(),
|
||||
ValidatorsRoot: [32]byte{'A'},
|
||||
}
|
||||
r := Service{
|
||||
ctx: ctx,
|
||||
cfg: &config{
|
||||
chain: chain,
|
||||
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
|
||||
p2p: p,
|
||||
},
|
||||
chainStarted: abool.New(),
|
||||
subHandler: newSubTopicHandler(),
|
||||
}
|
||||
defaultTopic := "/eth2/%x/beacon_attestation_%d"
|
||||
d, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
r.subscribeStaticWithSubnets(defaultTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
|
||||
// no-op
|
||||
return nil
|
||||
}, d, params.BeaconConfig().AttestationSubnetCount)
|
||||
topics := r.cfg.p2p.PubSub().GetTopics()
|
||||
if uint64(len(topics)) != params.BeaconConfig().AttestationSubnetCount {
|
||||
t.Errorf("Wanted the number of subnet topics registered to be %d but got %d", params.BeaconConfig().AttestationSubnetCount, len(topics))
|
||||
}
|
||||
cancel()
|
||||
}
|
||||
|
||||
func Test_wrapAndReportValidation(t *testing.T) {
|
||||
mChain := &mockChain.ChainService{
|
||||
Genesis: time.Now(),
|
||||
@@ -539,37 +508,6 @@ func TestFilterSubnetPeers(t *testing.T) {
|
||||
assert.Equal(t, 1, len(recPeers), "expected at least 1 suitable peer to prune")
|
||||
}
|
||||
|
||||
func TestSubscribeWithSyncSubnets_StaticOK(t *testing.T) {
|
||||
params.SetupTestConfigCleanup(t)
|
||||
cfg := params.MainnetTestConfig().Copy()
|
||||
cfg.SecondsPerSlot = 1
|
||||
params.OverrideBeaconConfig(cfg)
|
||||
|
||||
p := p2ptest.NewTestP2P(t)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
chain := &mockChain.ChainService{
|
||||
Genesis: time.Now(),
|
||||
ValidatorsRoot: [32]byte{'A'},
|
||||
}
|
||||
r := Service{
|
||||
ctx: ctx,
|
||||
cfg: &config{
|
||||
chain: chain,
|
||||
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
|
||||
p2p: p,
|
||||
},
|
||||
chainStarted: abool.New(),
|
||||
subHandler: newSubTopicHandler(),
|
||||
}
|
||||
// Empty cache at the end of the test.
|
||||
defer cache.SyncSubnetIDs.EmptyAllCaches()
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
r.subscribeStaticWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest)
|
||||
assert.Equal(t, int(params.BeaconConfig().SyncCommitteeSubnetCount), len(r.cfg.p2p.PubSub().GetTopics()))
|
||||
cancel()
|
||||
}
|
||||
|
||||
func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) {
|
||||
params.SetupTestConfigCleanup(t)
|
||||
cfg := params.MainnetConfig().Copy()
|
||||
@@ -600,7 +538,7 @@ func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) {
|
||||
cache.SyncSubnetIDs.AddSyncCommitteeSubnets([]byte("pubkey"), currEpoch, []uint64{0, 1}, 10*time.Second)
|
||||
digest, err := r.currentForkDigest()
|
||||
assert.NoError(t, err)
|
||||
r.subscribeDynamicWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest)
|
||||
r.subscribeWithParameters(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, r.activeSyncSubnetIndices, func(currentSlot primitives.Slot) []uint64 { return []uint64{} })
|
||||
time.Sleep(2 * time.Second)
|
||||
assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics()))
|
||||
topicMap := map[string]bool{}
|
||||
@@ -615,46 +553,6 @@ func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) {
|
||||
cancel()
|
||||
}
|
||||
|
||||
func TestSubscribeWithSyncSubnets_StaticSwitchFork(t *testing.T) {
|
||||
p := p2ptest.NewTestP2P(t)
|
||||
params.SetupTestConfigCleanup(t)
|
||||
cfg := params.BeaconConfig()
|
||||
cfg.AltairForkEpoch = 1
|
||||
cfg.SecondsPerSlot = 1
|
||||
params.OverrideBeaconConfig(cfg)
|
||||
params.BeaconConfig().InitializeForkSchedule()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
currSlot := primitives.Slot(100)
|
||||
chain := &mockChain.ChainService{
|
||||
Genesis: time.Now().Add(-time.Duration(uint64(params.BeaconConfig().SlotsPerEpoch)*params.BeaconConfig().SecondsPerSlot) * time.Second),
|
||||
ValidatorsRoot: [32]byte{'A'},
|
||||
Slot: &currSlot,
|
||||
}
|
||||
r := Service{
|
||||
ctx: ctx,
|
||||
cfg: &config{
|
||||
chain: chain,
|
||||
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
|
||||
p2p: p,
|
||||
},
|
||||
chainStarted: abool.New(),
|
||||
subHandler: newSubTopicHandler(),
|
||||
}
|
||||
// Empty cache at the end of the test.
|
||||
defer cache.SyncSubnetIDs.EmptyAllCaches()
|
||||
genRoot := r.cfg.clock.GenesisValidatorsRoot()
|
||||
digest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, genRoot[:])
|
||||
assert.NoError(t, err)
|
||||
r.subscribeStaticWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest)
|
||||
assert.Equal(t, int(params.BeaconConfig().SyncCommitteeSubnetCount), len(r.cfg.p2p.PubSub().GetTopics()))
|
||||
|
||||
// Expect that all old topics will be unsubscribed.
|
||||
time.Sleep(2 * time.Second)
|
||||
assert.Equal(t, 0, len(r.cfg.p2p.PubSub().GetTopics()))
|
||||
|
||||
cancel()
|
||||
}
|
||||
|
||||
func TestSubscribeWithSyncSubnets_DynamicSwitchFork(t *testing.T) {
|
||||
params.SetupTestConfigCleanup(t)
|
||||
p := p2ptest.NewTestP2P(t)
|
||||
@@ -689,7 +587,7 @@ func TestSubscribeWithSyncSubnets_DynamicSwitchFork(t *testing.T) {
|
||||
digest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, genRoot[:])
|
||||
assert.NoError(t, err)
|
||||
|
||||
r.subscribeDynamicWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest)
|
||||
r.subscribeWithParameters(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, r.activeSyncSubnetIndices, func(currentSlot primitives.Slot) []uint64 { return []uint64{} })
|
||||
time.Sleep(2 * time.Second)
|
||||
assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics()))
|
||||
topicMap := map[string]bool{}
|
||||
|
||||
Reference in New Issue
Block a user