Compare commits

...

9 Commits

Author SHA1 Message Date
Potuz
4798c1866a changelog 2024-12-19 10:54:01 -03:00
Potuz
10173af7e7 Merge remote-tracking branch 'origin/develop' into err_on_invalid 2024-12-19 10:52:43 -03:00
Potuz
72eb0fbbfe Return an error when attempting to invalidate the finalized checkpoint 2024-12-19 10:46:57 -03:00
Rupam Dey
0b7c005d7d add light client flag check to rpc handlers (#14736)
* add lc flag check to rpc handlers

* deps

* changelog

* update tests

* deps
2024-12-19 08:06:37 +00:00
Manu NALEPA
65e8c37b48 Refactor RPC handlers subscription. (#14732)
* Refactor RPC handlers subscription.

* Fix Sammy's comments.

* Update beacon-chain/sync/fork_watcher.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

* Update beacon-chain/sync/fork_watcher.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

* Update beacon-chain/sync/fork_watcher.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

* Update beacon-chain/sync/fork_watcher.go

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>

---------

Co-authored-by: Sammy Rosso <15244892+saolyn@users.noreply.github.com>
2024-12-18 13:05:20 +00:00
Bastin
689015ff01 Add tests for save and read LC Bootstrap DB functions (#14724)
* add tests for bootstrapSaveAndRetreive

* not available bootstrap

---------

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
2024-12-18 09:17:32 +00:00
Bastin
08c14f02f6 clean up the updates by range handler (#14719)
* clean up the updates by range handler

* remove redundant HandleErrors

---------

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
2024-12-18 08:08:29 +00:00
Bastin
4bb0b44f16 Add tests for saving light client updates (#14717)
* move flag settings out of individual tests

* add tests with better or worst old update

* remove whitespace

* add zero fields for default update at deneb block in electra

---------

Co-authored-by: Radosław Kapka <rkapka@wp.pl>
2024-12-18 08:08:25 +00:00
Manu NALEPA
29237cb0bc Refactor subnets subscriptions. (#14711)
* Refactor subnets subscriptions.

* Remove totally static/dynamic distinction.

* Unsubscribing from topic: Use INFO instead of log.

==> So we have something symmetrical with subscriptions.

* Address Nishant's comment.
2024-12-17 12:47:29 +00:00
18 changed files with 1388 additions and 779 deletions

View File

@@ -12,10 +12,13 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- Added an error field to log `Finished building block`.
- Implemented a new `EmptyExecutionPayloadHeader` function.
- `Finished building block`: Display error only if not nil.
- Return an error instead of panic when invalidating the finalized checkpoint.
### Changed
- Process light client finality updates only for new finalized epochs instead of doing it for every block.
- Refactor subnets subscriptions.
- Refactor RPC handlers subscriptions.
### Deprecated
@@ -75,6 +78,7 @@ Notable features:
- Added a Prometheus error counter metric for SSE requests.
- Save light client updates and bootstraps in DB.
- Added more comprehensive tests for `BlockToLightClientHeader`. [PR](https://github.com/prysmaticlabs/prysm/pull/14699)
- Added light client feature flag check to RPC handlers. [PR](https://github.com/prysmaticlabs/prysm/pull/14736)
### Changed
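The "light client feature flag check" entry above is implemented as an early-return guard in each light client RPC handler (the full diff appears further down). As a minimal, self-contained sketch of that gating pattern — plain net/http and a hypothetical route, not Prysm's actual server wiring or flag plumbing:

package main

import "net/http"

// enableLightClient stands in for Prysm's features.Get().EnableLightClient flag.
var enableLightClient = false

// getLightClientBootstrap illustrates the added guard: the handler returns
// 404 Not Found until the feature flag is enabled.
func getLightClientBootstrap(w http.ResponseWriter, _ *http.Request) {
    if !enableLightClient {
        http.Error(w, "Light client feature flag is not enabled", http.StatusNotFound)
        return
    }
    w.WriteHeader(http.StatusOK)
}

func main() {
    http.HandleFunc("/eth/v1/beacon/light_client/bootstrap/", getLightClientBootstrap)
    _ = http.ListenAndServe("localhost:8080", nil)
}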

View File

@@ -140,6 +140,7 @@ go_test(
"//beacon-chain/core/blocks:go_default_library",
"//beacon-chain/core/feed/state:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/light-client:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/core/transition:go_default_library",
"//beacon-chain/das:go_default_library",

View File

@@ -14,6 +14,7 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/blocks"
lightClient "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/light-client"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/signing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
@@ -2505,173 +2506,500 @@ func fakeResult(missing []uint64) map[uint64]struct{} {
}
func TestSaveLightClientUpdate(t *testing.T) {
featCfg := &features.Flags{}
featCfg.EnableLightClient = true
reset := features.InitWithReset(featCfg)
s, tr := minimalTestService(t)
ctx := tr.ctx
t.Run("Altair", func(t *testing.T) {
featCfg := &features.Flags{}
featCfg.EnableLightClient = true
reset := features.InitWithReset(featCfg)
t.Run("No old update", func(t *testing.T) {
l := util.NewTestLightClient(t).SetupTestAltair()
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().AltairForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
err := s.cfg.BeaconDB.SaveBlock(ctx, l.AttestedBlock)
require.NoError(t, err)
attestedBlockRoot, err := l.AttestedBlock.Block().HashTreeRoot()
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveState(ctx, l.AttestedState, attestedBlockRoot)
require.NoError(t, err)
currentBlockRoot, err := l.Block.Block().HashTreeRoot()
require.NoError(t, err)
roblock, err := consensusblocks.NewROBlockWithRoot(l.Block, currentBlockRoot)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveBlock(ctx, roblock)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveState(ctx, l.State, currentBlockRoot)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveBlock(ctx, l.FinalizedBlock)
require.NoError(t, err)
cfg := &postBlockProcessConfig{
ctx: ctx,
roblock: roblock,
postState: l.State,
isValidPayload: true,
}
s.saveLightClientUpdate(cfg)
// Check that the light client update is saved
period := slots.SyncCommitteePeriod(slots.ToEpoch(l.AttestedState.Slot()))
u, err := s.cfg.BeaconDB.LightClientUpdate(ctx, period)
require.NoError(t, err)
require.NotNil(t, u)
attestedStateRoot, err := l.AttestedState.HashTreeRoot(ctx)
require.NoError(t, err)
require.Equal(t, attestedStateRoot, [32]byte(u.AttestedHeader().Beacon().StateRoot))
require.Equal(t, u.Version(), version.Altair)
})
t.Run("New update is better", func(t *testing.T) {
l := util.NewTestLightClient(t).SetupTestAltair()
reset()
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().AltairForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
err := s.cfg.BeaconDB.SaveBlock(ctx, l.AttestedBlock)
require.NoError(t, err)
attestedBlockRoot, err := l.AttestedBlock.Block().HashTreeRoot()
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveState(ctx, l.AttestedState, attestedBlockRoot)
require.NoError(t, err)
currentBlockRoot, err := l.Block.Block().HashTreeRoot()
require.NoError(t, err)
roblock, err := consensusblocks.NewROBlockWithRoot(l.Block, currentBlockRoot)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveBlock(ctx, roblock)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveState(ctx, l.State, currentBlockRoot)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveBlock(ctx, l.FinalizedBlock)
require.NoError(t, err)
cfg := &postBlockProcessConfig{
ctx: ctx,
roblock: roblock,
postState: l.State,
isValidPayload: true,
}
period := slots.SyncCommitteePeriod(slots.ToEpoch(l.AttestedState.Slot()))
// create and save old update
oldUpdate, err := lightClient.CreateDefaultLightClientUpdate(s.CurrentSlot(), l.AttestedState)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveLightClientUpdate(ctx, period, oldUpdate)
require.NoError(t, err)
s.saveLightClientUpdate(cfg)
u, err := s.cfg.BeaconDB.LightClientUpdate(ctx, period)
require.NoError(t, err)
require.NotNil(t, u)
attestedStateRoot, err := l.AttestedState.HashTreeRoot(ctx)
require.NoError(t, err)
require.Equal(t, attestedStateRoot, [32]byte(u.AttestedHeader().Beacon().StateRoot))
require.Equal(t, u.Version(), version.Altair)
})
t.Run("Old update is better", func(t *testing.T) {
l := util.NewTestLightClient(t).SetupTestAltair()
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().AltairForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
err := s.cfg.BeaconDB.SaveBlock(ctx, l.AttestedBlock)
require.NoError(t, err)
attestedBlockRoot, err := l.AttestedBlock.Block().HashTreeRoot()
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveState(ctx, l.AttestedState, attestedBlockRoot)
require.NoError(t, err)
currentBlockRoot, err := l.Block.Block().HashTreeRoot()
require.NoError(t, err)
roblock, err := consensusblocks.NewROBlockWithRoot(l.Block, currentBlockRoot)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveBlock(ctx, roblock)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveState(ctx, l.State, currentBlockRoot)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveBlock(ctx, l.FinalizedBlock)
require.NoError(t, err)
cfg := &postBlockProcessConfig{
ctx: ctx,
roblock: roblock,
postState: l.State,
isValidPayload: true,
}
period := slots.SyncCommitteePeriod(slots.ToEpoch(l.AttestedState.Slot()))
// create and save old update
oldUpdate, err := lightClient.CreateDefaultLightClientUpdate(s.CurrentSlot(), l.AttestedState)
require.NoError(t, err)
scb := make([]byte, 64)
for i := 0; i < 5; i++ {
scb[i] = 0x01
}
oldUpdate.SetSyncAggregate(&ethpb.SyncAggregate{
SyncCommitteeBits: scb,
SyncCommitteeSignature: make([]byte, 96),
})
err = s.cfg.BeaconDB.SaveLightClientUpdate(ctx, period, oldUpdate)
require.NoError(t, err)
s.saveLightClientUpdate(cfg)
u, err := s.cfg.BeaconDB.LightClientUpdate(ctx, period)
require.NoError(t, err)
require.NotNil(t, u)
require.DeepEqual(t, oldUpdate, u)
require.Equal(t, u.Version(), version.Altair)
})
})
t.Run("Capella", func(t *testing.T) {
featCfg := &features.Flags{}
featCfg.EnableLightClient = true
reset := features.InitWithReset(featCfg)
t.Run("No old update", func(t *testing.T) {
l := util.NewTestLightClient(t).SetupTestCapella(false)
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().CapellaForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
err := s.cfg.BeaconDB.SaveBlock(ctx, l.AttestedBlock)
require.NoError(t, err)
attestedBlockRoot, err := l.AttestedBlock.Block().HashTreeRoot()
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveState(ctx, l.AttestedState, attestedBlockRoot)
require.NoError(t, err)
currentBlockRoot, err := l.Block.Block().HashTreeRoot()
require.NoError(t, err)
roblock, err := consensusblocks.NewROBlockWithRoot(l.Block, currentBlockRoot)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveBlock(ctx, roblock)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveState(ctx, l.State, currentBlockRoot)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveBlock(ctx, l.FinalizedBlock)
require.NoError(t, err)
cfg := &postBlockProcessConfig{
ctx: ctx,
roblock: roblock,
postState: l.State,
isValidPayload: true,
}
s.saveLightClientUpdate(cfg)
// Check that the light client update is saved
period := slots.SyncCommitteePeriod(slots.ToEpoch(l.AttestedState.Slot()))
u, err := s.cfg.BeaconDB.LightClientUpdate(ctx, period)
require.NoError(t, err)
require.NotNil(t, u)
attestedStateRoot, err := l.AttestedState.HashTreeRoot(ctx)
require.NoError(t, err)
require.Equal(t, attestedStateRoot, [32]byte(u.AttestedHeader().Beacon().StateRoot))
require.Equal(t, u.Version(), version.Capella)
})
t.Run("New update is better", func(t *testing.T) {
l := util.NewTestLightClient(t).SetupTestCapella(false)
reset()
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().CapellaForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
err := s.cfg.BeaconDB.SaveBlock(ctx, l.AttestedBlock)
require.NoError(t, err)
attestedBlockRoot, err := l.AttestedBlock.Block().HashTreeRoot()
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveState(ctx, l.AttestedState, attestedBlockRoot)
require.NoError(t, err)
currentBlockRoot, err := l.Block.Block().HashTreeRoot()
require.NoError(t, err)
roblock, err := consensusblocks.NewROBlockWithRoot(l.Block, currentBlockRoot)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveBlock(ctx, roblock)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveState(ctx, l.State, currentBlockRoot)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveBlock(ctx, l.FinalizedBlock)
require.NoError(t, err)
cfg := &postBlockProcessConfig{
ctx: ctx,
roblock: roblock,
postState: l.State,
isValidPayload: true,
}
period := slots.SyncCommitteePeriod(slots.ToEpoch(l.AttestedState.Slot()))
// create and save old update
oldUpdate, err := lightClient.CreateDefaultLightClientUpdate(s.CurrentSlot(), l.AttestedState)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveLightClientUpdate(ctx, period, oldUpdate)
require.NoError(t, err)
s.saveLightClientUpdate(cfg)
u, err := s.cfg.BeaconDB.LightClientUpdate(ctx, period)
require.NoError(t, err)
require.NotNil(t, u)
attestedStateRoot, err := l.AttestedState.HashTreeRoot(ctx)
require.NoError(t, err)
require.Equal(t, attestedStateRoot, [32]byte(u.AttestedHeader().Beacon().StateRoot))
require.Equal(t, u.Version(), version.Capella)
})
t.Run("Old update is better", func(t *testing.T) {
l := util.NewTestLightClient(t).SetupTestCapella(false)
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().CapellaForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
err := s.cfg.BeaconDB.SaveBlock(ctx, l.AttestedBlock)
require.NoError(t, err)
attestedBlockRoot, err := l.AttestedBlock.Block().HashTreeRoot()
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveState(ctx, l.AttestedState, attestedBlockRoot)
require.NoError(t, err)
currentBlockRoot, err := l.Block.Block().HashTreeRoot()
require.NoError(t, err)
roblock, err := consensusblocks.NewROBlockWithRoot(l.Block, currentBlockRoot)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveBlock(ctx, roblock)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveState(ctx, l.State, currentBlockRoot)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveBlock(ctx, l.FinalizedBlock)
require.NoError(t, err)
cfg := &postBlockProcessConfig{
ctx: ctx,
roblock: roblock,
postState: l.State,
isValidPayload: true,
}
period := slots.SyncCommitteePeriod(slots.ToEpoch(l.AttestedState.Slot()))
// create and save old update
oldUpdate, err := lightClient.CreateDefaultLightClientUpdate(s.CurrentSlot(), l.AttestedState)
require.NoError(t, err)
scb := make([]byte, 64)
for i := 0; i < 5; i++ {
scb[i] = 0x01
}
oldUpdate.SetSyncAggregate(&ethpb.SyncAggregate{
SyncCommitteeBits: scb,
SyncCommitteeSignature: make([]byte, 96),
})
err = s.cfg.BeaconDB.SaveLightClientUpdate(ctx, period, oldUpdate)
require.NoError(t, err)
s.saveLightClientUpdate(cfg)
u, err := s.cfg.BeaconDB.LightClientUpdate(ctx, period)
require.NoError(t, err)
require.NotNil(t, u)
require.DeepEqual(t, oldUpdate, u)
require.Equal(t, u.Version(), version.Capella)
})
})
t.Run("Deneb", func(t *testing.T) {
featCfg := &features.Flags{}
featCfg.EnableLightClient = true
reset := features.InitWithReset(featCfg)
t.Run("No old update", func(t *testing.T) {
l := util.NewTestLightClient(t).SetupTestDeneb(false)
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().DenebForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
err := s.cfg.BeaconDB.SaveBlock(ctx, l.AttestedBlock)
require.NoError(t, err)
attestedBlockRoot, err := l.AttestedBlock.Block().HashTreeRoot()
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveState(ctx, l.AttestedState, attestedBlockRoot)
require.NoError(t, err)
currentBlockRoot, err := l.Block.Block().HashTreeRoot()
require.NoError(t, err)
roblock, err := consensusblocks.NewROBlockWithRoot(l.Block, currentBlockRoot)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveBlock(ctx, roblock)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveState(ctx, l.State, currentBlockRoot)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveBlock(ctx, l.FinalizedBlock)
require.NoError(t, err)
cfg := &postBlockProcessConfig{
ctx: ctx,
roblock: roblock,
postState: l.State,
isValidPayload: true,
}
s.saveLightClientUpdate(cfg)
// Check that the light client update is saved
period := slots.SyncCommitteePeriod(slots.ToEpoch(l.AttestedState.Slot()))
u, err := s.cfg.BeaconDB.LightClientUpdate(ctx, period)
require.NoError(t, err)
require.NotNil(t, u)
attestedStateRoot, err := l.AttestedState.HashTreeRoot(ctx)
require.NoError(t, err)
require.Equal(t, attestedStateRoot, [32]byte(u.AttestedHeader().Beacon().StateRoot))
require.Equal(t, u.Version(), version.Deneb)
})
t.Run("New update is better", func(t *testing.T) {
l := util.NewTestLightClient(t).SetupTestDeneb(false)
reset()
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().DenebForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
err := s.cfg.BeaconDB.SaveBlock(ctx, l.AttestedBlock)
require.NoError(t, err)
attestedBlockRoot, err := l.AttestedBlock.Block().HashTreeRoot()
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveState(ctx, l.AttestedState, attestedBlockRoot)
require.NoError(t, err)
currentBlockRoot, err := l.Block.Block().HashTreeRoot()
require.NoError(t, err)
roblock, err := consensusblocks.NewROBlockWithRoot(l.Block, currentBlockRoot)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveBlock(ctx, roblock)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveState(ctx, l.State, currentBlockRoot)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveBlock(ctx, l.FinalizedBlock)
require.NoError(t, err)
cfg := &postBlockProcessConfig{
ctx: ctx,
roblock: roblock,
postState: l.State,
isValidPayload: true,
}
period := slots.SyncCommitteePeriod(slots.ToEpoch(l.AttestedState.Slot()))
// create and save old update
oldUpdate, err := lightClient.CreateDefaultLightClientUpdate(s.CurrentSlot(), l.AttestedState)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveLightClientUpdate(ctx, period, oldUpdate)
require.NoError(t, err)
s.saveLightClientUpdate(cfg)
u, err := s.cfg.BeaconDB.LightClientUpdate(ctx, period)
require.NoError(t, err)
require.NotNil(t, u)
attestedStateRoot, err := l.AttestedState.HashTreeRoot(ctx)
require.NoError(t, err)
require.Equal(t, attestedStateRoot, [32]byte(u.AttestedHeader().Beacon().StateRoot))
require.Equal(t, u.Version(), version.Deneb)
})
t.Run("Old update is better", func(t *testing.T) {
l := util.NewTestLightClient(t).SetupTestDeneb(false)
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().DenebForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
err := s.cfg.BeaconDB.SaveBlock(ctx, l.AttestedBlock)
require.NoError(t, err)
attestedBlockRoot, err := l.AttestedBlock.Block().HashTreeRoot()
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveState(ctx, l.AttestedState, attestedBlockRoot)
require.NoError(t, err)
currentBlockRoot, err := l.Block.Block().HashTreeRoot()
require.NoError(t, err)
roblock, err := consensusblocks.NewROBlockWithRoot(l.Block, currentBlockRoot)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveBlock(ctx, roblock)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveState(ctx, l.State, currentBlockRoot)
require.NoError(t, err)
err = s.cfg.BeaconDB.SaveBlock(ctx, l.FinalizedBlock)
require.NoError(t, err)
cfg := &postBlockProcessConfig{
ctx: ctx,
roblock: roblock,
postState: l.State,
isValidPayload: true,
}
period := slots.SyncCommitteePeriod(slots.ToEpoch(l.AttestedState.Slot()))
// create and save old update
oldUpdate, err := lightClient.CreateDefaultLightClientUpdate(s.CurrentSlot(), l.AttestedState)
require.NoError(t, err)
scb := make([]byte, 64)
for i := 0; i < 5; i++ {
scb[i] = 0x01
}
oldUpdate.SetSyncAggregate(&ethpb.SyncAggregate{
SyncCommitteeBits: scb,
SyncCommitteeSignature: make([]byte, 96),
})
err = s.cfg.BeaconDB.SaveLightClientUpdate(ctx, period, oldUpdate)
require.NoError(t, err)
s.saveLightClientUpdate(cfg)
u, err := s.cfg.BeaconDB.LightClientUpdate(ctx, period)
require.NoError(t, err)
require.NotNil(t, u)
require.DeepEqual(t, oldUpdate, u)
require.Equal(t, u.Version(), version.Deneb)
})
})
reset()
}
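The "New update is better" / "Old update is better" cases above hinge on the update-comparison rule: when an update is already stored for a sync committee period, it is only replaced if the incoming one is better, and one of the key criteria is how many sync committee members signed. A simplified, hypothetical illustration of that decision — not Prysm's actual comparison implementation:

package main

import "fmt"

// participantCount counts set bits in a sync committee bitfield.
func participantCount(bits []byte) int {
    n := 0
    for _, b := range bits {
        for ; b != 0; b &= b - 1 {
            n++
        }
    }
    return n
}

// shouldReplace is a simplified stand-in for the comparison exercised by the
// tests above: keep the stored update unless the new one has more participants.
func shouldReplace(newBits, oldBits []byte) bool {
    return participantCount(newBits) > participantCount(oldBits)
}

func main() {
    oldBits := make([]byte, 64) // 512-bit sync committee bitfield
    newBits := make([]byte, 64)
    for i := 0; i < 5; i++ {
        oldBits[i] = 0x01 // 5 participants, as in the "Old update is better" setup
    }
    newBits[0] = 0xFF // 8 participants
    fmt.Println(shouldReplace(newBits, oldBits)) // true: the new update wins
}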
func TestSaveLightClientBootstrap(t *testing.T) {
featCfg := &features.Flags{}
featCfg.EnableLightClient = true
reset := features.InitWithReset(featCfg)
s, tr := minimalTestService(t)
ctx := tr.ctx
t.Run("Altair", func(t *testing.T) {
featCfg := &features.Flags{}
featCfg.EnableLightClient = true
reset := features.InitWithReset(featCfg)
l := util.NewTestLightClient(t).SetupTestAltair()
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().AltairForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
@@ -2704,15 +3032,9 @@ func TestSaveLightClientBootstrap(t *testing.T) {
require.NoError(t, err)
require.Equal(t, stateRoot, [32]byte(b.Header().Beacon().StateRoot))
require.Equal(t, b.Version(), version.Altair)
reset()
})
t.Run("Capella", func(t *testing.T) {
featCfg := &features.Flags{}
featCfg.EnableLightClient = true
reset := features.InitWithReset(featCfg)
l := util.NewTestLightClient(t).SetupTestCapella(false)
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().CapellaForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
@@ -2745,15 +3067,9 @@ func TestSaveLightClientBootstrap(t *testing.T) {
require.NoError(t, err)
require.Equal(t, stateRoot, [32]byte(b.Header().Beacon().StateRoot))
require.Equal(t, b.Version(), version.Capella)
reset()
})
t.Run("Deneb", func(t *testing.T) {
featCfg := &features.Flags{}
featCfg.EnableLightClient = true
reset := features.InitWithReset(featCfg)
l := util.NewTestLightClient(t).SetupTestDeneb(false)
s.genesisTime = time.Unix(time.Now().Unix()-(int64(params.BeaconConfig().DenebForkEpoch)*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)), 0)
@@ -2786,7 +3102,7 @@ func TestSaveLightClientBootstrap(t *testing.T) {
require.NoError(t, err)
require.Equal(t, stateRoot, [32]byte(b.Header().Beacon().StateRoot))
require.Equal(t, b.Version(), version.Deneb)
reset()
})
reset()
}

View File

@@ -282,56 +282,247 @@ func CreateDefaultLightClientUpdate(currentSlot primitives.Slot, attestedState s
if currentEpoch < params.BeaconConfig().CapellaForkEpoch {
m = &pb.LightClientUpdateAltair{
AttestedHeader: &pb.LightClientHeaderAltair{
Beacon: &pb.BeaconBlockHeader{},
Beacon: &pb.BeaconBlockHeader{
ParentRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
},
},
NextSyncCommittee: nextSyncCommittee,
NextSyncCommitteeBranch: nextSyncCommitteeBranch,
FinalityBranch: finalityBranch,
FinalizedHeader: &pb.LightClientHeaderAltair{
Beacon: &pb.BeaconBlockHeader{
ParentRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
},
},
SyncAggregate: &pb.SyncAggregate{
SyncCommitteeBits: make([]byte, 64),
SyncCommitteeSignature: make([]byte, 96),
},
}
} else if currentEpoch < params.BeaconConfig().DenebForkEpoch {
m = &pb.LightClientUpdateCapella{
AttestedHeader: &pb.LightClientHeaderCapella{
Beacon: &pb.BeaconBlockHeader{},
Execution: &enginev1.ExecutionPayloadHeaderCapella{},
Beacon: &pb.BeaconBlockHeader{
ParentRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
},
Execution: &enginev1.ExecutionPayloadHeaderCapella{
ParentHash: make([]byte, fieldparams.RootLength),
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
StateRoot: make([]byte, fieldparams.RootLength),
ReceiptsRoot: make([]byte, fieldparams.RootLength),
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
PrevRandao: make([]byte, fieldparams.RootLength),
ExtraData: make([]byte, 0),
BaseFeePerGas: make([]byte, fieldparams.RootLength),
BlockHash: make([]byte, fieldparams.RootLength),
TransactionsRoot: make([]byte, fieldparams.RootLength),
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
},
ExecutionBranch: executionBranch,
},
NextSyncCommittee: nextSyncCommittee,
NextSyncCommitteeBranch: nextSyncCommitteeBranch,
FinalityBranch: finalityBranch,
FinalizedHeader: &pb.LightClientHeaderCapella{
Beacon: &pb.BeaconBlockHeader{
ParentRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
},
Execution: &enginev1.ExecutionPayloadHeaderCapella{
ParentHash: make([]byte, fieldparams.RootLength),
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
StateRoot: make([]byte, fieldparams.RootLength),
ReceiptsRoot: make([]byte, fieldparams.RootLength),
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
PrevRandao: make([]byte, fieldparams.RootLength),
ExtraData: make([]byte, 0),
BaseFeePerGas: make([]byte, fieldparams.RootLength),
BlockHash: make([]byte, fieldparams.RootLength),
TransactionsRoot: make([]byte, fieldparams.RootLength),
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
},
ExecutionBranch: executionBranch,
},
SyncAggregate: &pb.SyncAggregate{
SyncCommitteeBits: make([]byte, 64),
SyncCommitteeSignature: make([]byte, 96),
},
}
} else if currentEpoch < params.BeaconConfig().ElectraForkEpoch {
m = &pb.LightClientUpdateDeneb{
AttestedHeader: &pb.LightClientHeaderDeneb{
Beacon: &pb.BeaconBlockHeader{},
Execution: &enginev1.ExecutionPayloadHeaderDeneb{},
Beacon: &pb.BeaconBlockHeader{
ParentRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
},
Execution: &enginev1.ExecutionPayloadHeaderDeneb{
ParentHash: make([]byte, fieldparams.RootLength),
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
StateRoot: make([]byte, fieldparams.RootLength),
ReceiptsRoot: make([]byte, fieldparams.RootLength),
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
PrevRandao: make([]byte, fieldparams.RootLength),
ExtraData: make([]byte, 0),
BaseFeePerGas: make([]byte, fieldparams.RootLength),
BlockHash: make([]byte, fieldparams.RootLength),
TransactionsRoot: make([]byte, fieldparams.RootLength),
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
GasLimit: 0,
GasUsed: 0,
},
ExecutionBranch: executionBranch,
},
NextSyncCommittee: nextSyncCommittee,
NextSyncCommitteeBranch: nextSyncCommitteeBranch,
FinalityBranch: finalityBranch,
FinalizedHeader: &pb.LightClientHeaderDeneb{
Beacon: &pb.BeaconBlockHeader{
ParentRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
},
Execution: &enginev1.ExecutionPayloadHeaderDeneb{
ParentHash: make([]byte, fieldparams.RootLength),
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
StateRoot: make([]byte, fieldparams.RootLength),
ReceiptsRoot: make([]byte, fieldparams.RootLength),
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
PrevRandao: make([]byte, fieldparams.RootLength),
ExtraData: make([]byte, 0),
BaseFeePerGas: make([]byte, fieldparams.RootLength),
BlockHash: make([]byte, fieldparams.RootLength),
TransactionsRoot: make([]byte, fieldparams.RootLength),
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
GasLimit: 0,
GasUsed: 0,
},
ExecutionBranch: executionBranch,
},
SyncAggregate: &pb.SyncAggregate{
SyncCommitteeBits: make([]byte, 64),
SyncCommitteeSignature: make([]byte, 96),
},
}
} else {
if attestedState.Version() >= version.Electra {
m = &pb.LightClientUpdateElectra{
AttestedHeader: &pb.LightClientHeaderDeneb{
Beacon: &pb.BeaconBlockHeader{},
Execution: &enginev1.ExecutionPayloadHeaderDeneb{},
Beacon: &pb.BeaconBlockHeader{
ParentRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
},
Execution: &enginev1.ExecutionPayloadHeaderDeneb{
ParentHash: make([]byte, fieldparams.RootLength),
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
StateRoot: make([]byte, fieldparams.RootLength),
ReceiptsRoot: make([]byte, fieldparams.RootLength),
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
PrevRandao: make([]byte, fieldparams.RootLength),
ExtraData: make([]byte, 0),
BaseFeePerGas: make([]byte, fieldparams.RootLength),
BlockHash: make([]byte, fieldparams.RootLength),
TransactionsRoot: make([]byte, fieldparams.RootLength),
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
GasLimit: 0,
GasUsed: 0,
},
ExecutionBranch: executionBranch,
},
NextSyncCommittee: nextSyncCommittee,
NextSyncCommitteeBranch: nextSyncCommitteeBranch,
FinalityBranch: finalityBranch,
FinalizedHeader: &pb.LightClientHeaderDeneb{
Beacon: &pb.BeaconBlockHeader{
ParentRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
},
Execution: &enginev1.ExecutionPayloadHeaderDeneb{
ParentHash: make([]byte, fieldparams.RootLength),
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
StateRoot: make([]byte, fieldparams.RootLength),
ReceiptsRoot: make([]byte, fieldparams.RootLength),
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
PrevRandao: make([]byte, fieldparams.RootLength),
ExtraData: make([]byte, 0),
BaseFeePerGas: make([]byte, fieldparams.RootLength),
BlockHash: make([]byte, fieldparams.RootLength),
TransactionsRoot: make([]byte, fieldparams.RootLength),
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
GasLimit: 0,
GasUsed: 0,
},
ExecutionBranch: executionBranch,
},
SyncAggregate: &pb.SyncAggregate{
SyncCommitteeBits: make([]byte, 64),
SyncCommitteeSignature: make([]byte, 96),
},
}
} else {
m = &pb.LightClientUpdateDeneb{
AttestedHeader: &pb.LightClientHeaderDeneb{
Beacon: &pb.BeaconBlockHeader{},
Execution: &enginev1.ExecutionPayloadHeaderDeneb{},
Beacon: &pb.BeaconBlockHeader{
ParentRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
},
Execution: &enginev1.ExecutionPayloadHeaderDeneb{
ParentHash: make([]byte, fieldparams.RootLength),
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
StateRoot: make([]byte, fieldparams.RootLength),
ReceiptsRoot: make([]byte, fieldparams.RootLength),
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
PrevRandao: make([]byte, fieldparams.RootLength),
ExtraData: make([]byte, 0),
BaseFeePerGas: make([]byte, fieldparams.RootLength),
BlockHash: make([]byte, fieldparams.RootLength),
TransactionsRoot: make([]byte, fieldparams.RootLength),
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
GasLimit: 0,
GasUsed: 0,
},
ExecutionBranch: executionBranch,
},
NextSyncCommittee: nextSyncCommittee,
NextSyncCommitteeBranch: nextSyncCommitteeBranch,
FinalityBranch: finalityBranch,
FinalizedHeader: &pb.LightClientHeaderDeneb{
Beacon: &pb.BeaconBlockHeader{
ParentRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
},
Execution: &enginev1.ExecutionPayloadHeaderDeneb{
ParentHash: make([]byte, fieldparams.RootLength),
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
StateRoot: make([]byte, fieldparams.RootLength),
ReceiptsRoot: make([]byte, fieldparams.RootLength),
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
PrevRandao: make([]byte, fieldparams.RootLength),
ExtraData: make([]byte, 0),
BaseFeePerGas: make([]byte, fieldparams.RootLength),
BlockHash: make([]byte, fieldparams.RootLength),
TransactionsRoot: make([]byte, fieldparams.RootLength),
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
GasLimit: 0,
GasUsed: 0,
},
ExecutionBranch: executionBranch,
},
SyncAggregate: &pb.SyncAggregate{
SyncCommitteeBits: make([]byte, 64),
SyncCommitteeSignature: make([]byte, 96),
},
}
}
}

View File

@@ -93,7 +93,7 @@ func (e *cacheEntry) filter(root [32]byte, kc safeCommitmentArray) ([]blocks.ROB
return nil, nil
}
scs := make([]blocks.ROBlob, 0, kc.count())
for i := uint64(0); i < fieldparams.MaxBlobsPerBlock; i++ {
for i := range uint64(fieldparams.MaxBlobsPerBlock) {
// We already have this blob, we don't need to write it or validate it.
if e.diskSummary.HasIndex(i) {
continue
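The one-line change above swaps the classic counted loop for Go 1.22's range-over-integer form; both iterate i = 0 … MaxBlobsPerBlock-1 with i typed as uint64. A standalone sketch of the equivalence, using a placeholder constant rather than the Prysm field parameter:

package main

import "fmt"

const maxBlobsPerBlock = 6 // placeholder value for fieldparams.MaxBlobsPerBlock

func main() {
    // Classic three-clause form.
    for i := uint64(0); i < maxBlobsPerBlock; i++ {
        fmt.Print(i, " ")
    }
    fmt.Println()

    // Go 1.22+ range-over-integer form: i takes the values 0..5 as uint64.
    for i := range uint64(maxBlobsPerBlock) {
        fmt.Print(i, " ")
    }
    fmt.Println()
}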

View File

@@ -539,3 +539,231 @@ func createDefaultLightClientUpdate(currentSlot primitives.Slot, attestedState s
return light_client.NewWrappedUpdate(m)
}
func TestStore_LightClientBootstrap_CanSaveRetrieve(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.AltairForkEpoch = 0
cfg.CapellaForkEpoch = 1
cfg.DenebForkEpoch = 2
cfg.ElectraForkEpoch = 3
cfg.EpochsPerSyncCommitteePeriod = 1
params.OverrideBeaconConfig(cfg)
db := setupDB(t)
ctx := context.Background()
t.Run("Nil", func(t *testing.T) {
retrievedBootstrap, err := db.LightClientBootstrap(ctx, []byte("NilBlockRoot"))
require.NoError(t, err)
require.IsNil(t, retrievedBootstrap)
})
t.Run("Altair", func(t *testing.T) {
bootstrap, err := createDefaultLightClientBootstrap(primitives.Slot(uint64(params.BeaconConfig().AltairForkEpoch) * uint64(params.BeaconConfig().SlotsPerEpoch)))
require.NoError(t, err)
err = bootstrap.SetCurrentSyncCommittee(createRandomSyncCommittee())
require.NoError(t, err)
err = db.SaveLightClientBootstrap(ctx, []byte("blockRootAltair"), bootstrap)
require.NoError(t, err)
retrievedBootstrap, err := db.LightClientBootstrap(ctx, []byte("blockRootAltair"))
require.NoError(t, err)
require.DeepEqual(t, bootstrap, retrievedBootstrap, "retrieved bootstrap does not match saved bootstrap")
})
t.Run("Capella", func(t *testing.T) {
bootstrap, err := createDefaultLightClientBootstrap(primitives.Slot(uint64(params.BeaconConfig().CapellaForkEpoch) * uint64(params.BeaconConfig().SlotsPerEpoch)))
require.NoError(t, err)
err = bootstrap.SetCurrentSyncCommittee(createRandomSyncCommittee())
require.NoError(t, err)
err = db.SaveLightClientBootstrap(ctx, []byte("blockRootCapella"), bootstrap)
require.NoError(t, err)
retrievedBootstrap, err := db.LightClientBootstrap(ctx, []byte("blockRootCapella"))
require.NoError(t, err)
require.DeepEqual(t, bootstrap, retrievedBootstrap, "retrieved bootstrap does not match saved bootstrap")
})
t.Run("Deneb", func(t *testing.T) {
bootstrap, err := createDefaultLightClientBootstrap(primitives.Slot(uint64(params.BeaconConfig().DenebForkEpoch) * uint64(params.BeaconConfig().SlotsPerEpoch)))
require.NoError(t, err)
err = bootstrap.SetCurrentSyncCommittee(createRandomSyncCommittee())
require.NoError(t, err)
err = db.SaveLightClientBootstrap(ctx, []byte("blockRootDeneb"), bootstrap)
require.NoError(t, err)
retrievedBootstrap, err := db.LightClientBootstrap(ctx, []byte("blockRootDeneb"))
require.NoError(t, err)
require.DeepEqual(t, bootstrap, retrievedBootstrap, "retrieved bootstrap does not match saved bootstrap")
})
t.Run("Electra", func(t *testing.T) {
bootstrap, err := createDefaultLightClientBootstrap(primitives.Slot(uint64(params.BeaconConfig().ElectraForkEpoch) * uint64(params.BeaconConfig().SlotsPerEpoch)))
require.NoError(t, err)
err = bootstrap.SetCurrentSyncCommittee(createRandomSyncCommittee())
require.NoError(t, err)
err = db.SaveLightClientBootstrap(ctx, []byte("blockRootElectra"), bootstrap)
require.NoError(t, err)
retrievedBootstrap, err := db.LightClientBootstrap(ctx, []byte("blockRootElectra"))
require.NoError(t, err)
require.DeepEqual(t, bootstrap, retrievedBootstrap, "retrieved bootstrap does not match saved bootstrap")
})
}
func createDefaultLightClientBootstrap(currentSlot primitives.Slot) (interfaces.LightClientBootstrap, error) {
currentEpoch := slots.ToEpoch(currentSlot)
syncCommitteeSize := params.BeaconConfig().SyncCommitteeSize
pubKeys := make([][]byte, syncCommitteeSize)
for i := uint64(0); i < syncCommitteeSize; i++ {
pubKeys[i] = make([]byte, fieldparams.BLSPubkeyLength)
}
currentSyncCommittee := &pb.SyncCommittee{
Pubkeys: pubKeys,
AggregatePubkey: make([]byte, fieldparams.BLSPubkeyLength),
}
var currentSyncCommitteeBranch [][]byte
if currentEpoch >= params.BeaconConfig().ElectraForkEpoch {
currentSyncCommitteeBranch = make([][]byte, fieldparams.SyncCommitteeBranchDepthElectra)
} else {
currentSyncCommitteeBranch = make([][]byte, fieldparams.SyncCommitteeBranchDepth)
}
for i := 0; i < len(currentSyncCommitteeBranch); i++ {
currentSyncCommitteeBranch[i] = make([]byte, fieldparams.RootLength)
}
executionBranch := make([][]byte, fieldparams.ExecutionBranchDepth)
for i := 0; i < fieldparams.ExecutionBranchDepth; i++ {
executionBranch[i] = make([]byte, 32)
}
// TODO: can this be based on the current epoch?
var m proto.Message
if currentEpoch < params.BeaconConfig().CapellaForkEpoch {
m = &pb.LightClientBootstrapAltair{
Header: &pb.LightClientHeaderAltair{
Beacon: &pb.BeaconBlockHeader{
ParentRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
},
},
CurrentSyncCommittee: currentSyncCommittee,
CurrentSyncCommitteeBranch: currentSyncCommitteeBranch,
}
} else if currentEpoch < params.BeaconConfig().DenebForkEpoch {
m = &pb.LightClientBootstrapCapella{
Header: &pb.LightClientHeaderCapella{
Beacon: &pb.BeaconBlockHeader{
ParentRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
},
Execution: &enginev1.ExecutionPayloadHeaderCapella{
ParentHash: make([]byte, fieldparams.RootLength),
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
StateRoot: make([]byte, fieldparams.RootLength),
ReceiptsRoot: make([]byte, fieldparams.RootLength),
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
PrevRandao: make([]byte, fieldparams.RootLength),
ExtraData: make([]byte, 0),
BaseFeePerGas: make([]byte, fieldparams.RootLength),
BlockHash: make([]byte, fieldparams.RootLength),
TransactionsRoot: make([]byte, fieldparams.RootLength),
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
},
ExecutionBranch: executionBranch,
},
CurrentSyncCommittee: currentSyncCommittee,
CurrentSyncCommitteeBranch: currentSyncCommitteeBranch,
}
} else if currentEpoch < params.BeaconConfig().ElectraForkEpoch {
m = &pb.LightClientBootstrapDeneb{
Header: &pb.LightClientHeaderDeneb{
Beacon: &pb.BeaconBlockHeader{
ParentRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
},
Execution: &enginev1.ExecutionPayloadHeaderDeneb{
ParentHash: make([]byte, fieldparams.RootLength),
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
StateRoot: make([]byte, fieldparams.RootLength),
ReceiptsRoot: make([]byte, fieldparams.RootLength),
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
PrevRandao: make([]byte, fieldparams.RootLength),
ExtraData: make([]byte, 0),
BaseFeePerGas: make([]byte, fieldparams.RootLength),
BlockHash: make([]byte, fieldparams.RootLength),
TransactionsRoot: make([]byte, fieldparams.RootLength),
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
GasLimit: 0,
GasUsed: 0,
},
ExecutionBranch: executionBranch,
},
CurrentSyncCommittee: currentSyncCommittee,
CurrentSyncCommitteeBranch: currentSyncCommitteeBranch,
}
} else {
m = &pb.LightClientBootstrapElectra{
Header: &pb.LightClientHeaderDeneb{
Beacon: &pb.BeaconBlockHeader{
ParentRoot: make([]byte, 32),
StateRoot: make([]byte, 32),
BodyRoot: make([]byte, 32),
},
Execution: &enginev1.ExecutionPayloadHeaderDeneb{
ParentHash: make([]byte, fieldparams.RootLength),
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
StateRoot: make([]byte, fieldparams.RootLength),
ReceiptsRoot: make([]byte, fieldparams.RootLength),
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
PrevRandao: make([]byte, fieldparams.RootLength),
ExtraData: make([]byte, 0),
BaseFeePerGas: make([]byte, fieldparams.RootLength),
BlockHash: make([]byte, fieldparams.RootLength),
TransactionsRoot: make([]byte, fieldparams.RootLength),
WithdrawalsRoot: make([]byte, fieldparams.RootLength),
GasLimit: 0,
GasUsed: 0,
},
ExecutionBranch: executionBranch,
},
CurrentSyncCommittee: currentSyncCommittee,
CurrentSyncCommitteeBranch: currentSyncCommitteeBranch,
}
}
return light_client.NewWrappedBootstrap(m)
}
func createRandomSyncCommittee() *pb.SyncCommittee {
// random number between 2 and 128
base := rand.Int()%127 + 2
syncCom := make([][]byte, params.BeaconConfig().SyncCommitteeSize)
for i := 0; uint64(i) < params.BeaconConfig().SyncCommitteeSize; i++ {
if i%base == 0 {
syncCom[i] = make([]byte, fieldparams.BLSPubkeyLength)
syncCom[i][0] = 1
continue
}
syncCom[i] = make([]byte, fieldparams.BLSPubkeyLength)
}
return &pb.SyncCommittee{
Pubkeys: syncCom,
AggregatePubkey: make([]byte, fieldparams.BLSPubkeyLength),
}
}

View File

@@ -23,6 +23,9 @@ func (s *Store) setOptimisticToInvalid(ctx context.Context, root, parentRoot, la
if node == nil {
return invalidRoots, errors.Wrap(ErrNilNode, "could not set node to invalid")
}
if node.parent == nil {
return invalidRoots, errors.New("attempting to set the root node to invalid")
}
if node.parent.root != parentRoot {
return invalidRoots, errInvalidParentRoot
}
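This guard is what turns the previous nil-pointer panic into a plain error when a caller tries to invalidate the tree's root node (the finalized checkpoint). A toy illustration of the same defensive pattern, with hypothetical types rather than Prysm's forkchoice store:

package main

import (
    "errors"
    "fmt"
)

type node struct {
    parent *node
}

// markInvalid refuses to invalidate the root node (which has no parent)
// and returns an error instead of panicking on a nil parent dereference.
func markInvalid(n *node) error {
    if n == nil {
        return errors.New("nil node")
    }
    if n.parent == nil {
        return errors.New("attempting to set the root node to invalid")
    }
    // ... invalidate n and its descendants ...
    return nil
}

func main() {
    root := &node{}
    child := &node{parent: root}
    fmt.Println(markInvalid(root))  // error: attempting to set the root node to invalid
    fmt.Println(markInvalid(child)) // <nil>
}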

View File

@@ -385,6 +385,13 @@ func TestSetOptimisticToInvalid_ForkAtMerge_bis(t *testing.T) {
require.DeepEqual(t, roots, [][32]byte{{'b'}, {'c'}, {'d'}, {'e'}})
}
func TestSetOptimisticToInvalid_InvalidRoot(t *testing.T) {
ctx := context.Background()
f := setup(1, 1)
_, err := f.SetOptimisticToInvalid(ctx, [32]byte{}, [32]byte{}, [32]byte{})
require.ErrorContains(t, "attempting to set the root node to invalid", err)
}
func TestSetOptimisticToValid(t *testing.T) {
f := setup(1, 1)
op, err := f.IsOptimistic([32]byte{})

View File

@@ -18,6 +18,7 @@ go_library(
"//beacon-chain/rpc/eth/shared:go_default_library",
"//beacon-chain/rpc/lookup:go_default_library",
"//beacon-chain/state:go_default_library",
"//config/features:go_default_library",
"//config/params:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
@@ -45,6 +46,7 @@ go_test(
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/rpc/testutil:go_default_library",
"//beacon-chain/state:go_default_library",
"//config/features:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",

View File

@@ -12,6 +12,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/api/server/structs"
lightclient "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/light-client"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/rpc/eth/shared"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
@@ -22,6 +23,11 @@ import (
// GetLightClientBootstrap - implements https://github.com/ethereum/beacon-APIs/blob/263f4ed6c263c967f13279c7a9f5629b51c5fc55/apis/beacon/light_client/bootstrap.yaml
func (s *Server) GetLightClientBootstrap(w http.ResponseWriter, req *http.Request) {
if !features.Get().EnableLightClient {
httputil.HandleError(w, "Light client feature flag is not enabled", http.StatusNotFound)
return
}
// Prepare
ctx, span := trace.StartSpan(req.Context(), "beacon.GetLightClientBootstrap")
defer span.End()
@@ -76,26 +82,21 @@ func (s *Server) GetLightClientBootstrap(w http.ResponseWriter, req *http.Reques
// GetLightClientUpdatesByRange - implements https://github.com/ethereum/beacon-APIs/blob/263f4ed6c263c967f13279c7a9f5629b51c5fc55/apis/beacon/light_client/updates.yaml
func (s *Server) GetLightClientUpdatesByRange(w http.ResponseWriter, req *http.Request) {
// Prepare
if !features.Get().EnableLightClient {
httputil.HandleError(w, "Light client feature flag is not enabled", http.StatusNotFound)
return
}
ctx, span := trace.StartSpan(req.Context(), "beacon.GetLightClientUpdatesByRange")
defer span.End()
// Determine slots per period
config := params.BeaconConfig()
slotsPerPeriod := uint64(config.EpochsPerSyncCommitteePeriod) * uint64(config.SlotsPerEpoch)
// Adjust count based on configuration
_, count, gotCount := shared.UintFromQuery(w, req, "count", true)
if !gotCount {
return
} else if count == 0 {
httputil.HandleError(w, fmt.Sprintf("got invalid 'count' query variable '%d': count must be greater than 0", count), http.StatusInternalServerError)
return
}
// Determine the start and end periods
_, startPeriod, gotStartPeriod := shared.UintFromQuery(w, req, "start_period", true)
if !gotStartPeriod {
httputil.HandleError(w, fmt.Sprintf("Got invalid 'count' query variable '%d': count must be greater than 0", count), http.StatusBadRequest)
return
}
@@ -103,33 +104,13 @@ func (s *Server) GetLightClientUpdatesByRange(w http.ResponseWriter, req *http.R
count = config.MaxRequestLightClientUpdates
}
// max possible slot is current head
headState, err := s.HeadFetcher.HeadState(ctx)
if err != nil {
httputil.HandleError(w, "could not get head state: "+err.Error(), http.StatusInternalServerError)
_, startPeriod, gotStartPeriod := shared.UintFromQuery(w, req, "start_period", true)
if !gotStartPeriod {
return
}
maxSlot := uint64(headState.Slot())
// min possible slot is Altair fork period
minSlot := uint64(config.AltairForkEpoch) * uint64(config.SlotsPerEpoch)
// Adjust startPeriod, the end of start period must be later than Altair fork epoch, otherwise, can not get the sync committee votes
startPeriodEndSlot := (startPeriod+1)*slotsPerPeriod - 1
if startPeriodEndSlot < minSlot {
startPeriod = minSlot / slotsPerPeriod
}
// Get the initial endPeriod, then we will adjust
endPeriod := startPeriod + count - 1
// Adjust endPeriod, the end of end period must be earlier than current head slot
endPeriodEndSlot := (endPeriod+1)*slotsPerPeriod - 1
if endPeriodEndSlot > maxSlot {
endPeriod = maxSlot / slotsPerPeriod
}
// get updates
updatesMap, err := s.BeaconDB.LightClientUpdates(ctx, startPeriod, endPeriod)
if err != nil {
@@ -162,6 +143,11 @@ func (s *Server) GetLightClientUpdatesByRange(w http.ResponseWriter, req *http.R
// GetLightClientFinalityUpdate - implements https://github.com/ethereum/beacon-APIs/blob/263f4ed6c263c967f13279c7a9f5629b51c5fc55/apis/beacon/light_client/finality_update.yaml
func (s *Server) GetLightClientFinalityUpdate(w http.ResponseWriter, req *http.Request) {
if !features.Get().EnableLightClient {
httputil.HandleError(w, "Light client feature flag is not enabled", http.StatusNotFound)
return
}
ctx, span := trace.StartSpan(req.Context(), "beacon.GetLightClientFinalityUpdate")
defer span.End()
@@ -220,6 +206,11 @@ func (s *Server) GetLightClientFinalityUpdate(w http.ResponseWriter, req *http.R
// GetLightClientOptimisticUpdate - implements https://github.com/ethereum/beacon-APIs/blob/263f4ed6c263c967f13279c7a9f5629b51c5fc55/apis/beacon/light_client/optimistic_update.yaml
func (s *Server) GetLightClientOptimisticUpdate(w http.ResponseWriter, req *http.Request) {
if !features.Get().EnableLightClient {
httputil.HandleError(w, "Light client feature flag is not enabled", http.StatusNotFound)
return
}
ctx, span := trace.StartSpan(req.Context(), "beacon.GetLightClientOptimisticUpdate")
defer span.End()

View File

@@ -19,6 +19,7 @@ import (
dbtesting "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/rpc/testutil"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/config/features"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
@@ -33,6 +34,11 @@ import (
)
func TestLightClientHandler_GetLightClientBootstrap(t *testing.T) {
resetFn := features.InitWithReset(&features.Flags{
EnableLightClient: true,
})
defer resetFn()
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.AltairForkEpoch = 0
@@ -252,6 +258,11 @@ func TestLightClientHandler_GetLightClientBootstrap(t *testing.T) {
// GetLightClientByRange tests
func TestLightClientHandler_GetLightClientUpdatesByRangeAltair(t *testing.T) {
resetFn := features.InitWithReset(&features.Flags{
EnableLightClient: true,
})
defer resetFn()
helpers.ClearCache()
ctx := context.Background()
@@ -301,6 +312,11 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeAltair(t *testing.T) {
}
func TestLightClientHandler_GetLightClientUpdatesByRangeCapella(t *testing.T) {
resetFn := features.InitWithReset(&features.Flags{
EnableLightClient: true,
})
defer resetFn()
helpers.ClearCache()
ctx := context.Background()
params.SetupTestConfigCleanup(t)
@@ -350,6 +366,11 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeCapella(t *testing.T) {
}
func TestLightClientHandler_GetLightClientUpdatesByRangeDeneb(t *testing.T) {
resetFn := features.InitWithReset(&features.Flags{
EnableLightClient: true,
})
defer resetFn()
helpers.ClearCache()
ctx := context.Background()
params.SetupTestConfigCleanup(t)
@@ -399,6 +420,11 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeDeneb(t *testing.T) {
}
func TestLightClientHandler_GetLightClientUpdatesByRangeMultipleAltair(t *testing.T) {
resetFn := features.InitWithReset(&features.Flags{
EnableLightClient: true,
})
defer resetFn()
helpers.ClearCache()
ctx := context.Background()
params.SetupTestConfigCleanup(t)
@@ -458,6 +484,11 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeMultipleAltair(t *testin
}
func TestLightClientHandler_GetLightClientUpdatesByRangeMultipleCapella(t *testing.T) {
resetFn := features.InitWithReset(&features.Flags{
EnableLightClient: true,
})
defer resetFn()
helpers.ClearCache()
ctx := context.Background()
params.SetupTestConfigCleanup(t)
@@ -518,6 +549,11 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeMultipleCapella(t *testi
}
func TestLightClientHandler_GetLightClientUpdatesByRangeMultipleDeneb(t *testing.T) {
resetFn := features.InitWithReset(&features.Flags{
EnableLightClient: true,
})
defer resetFn()
helpers.ClearCache()
ctx := context.Background()
params.SetupTestConfigCleanup(t)
@@ -578,6 +614,11 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeMultipleDeneb(t *testing
}
func TestLightClientHandler_GetLightClientUpdatesByRangeMultipleForksAltairCapella(t *testing.T) {
resetFn := features.InitWithReset(&features.Flags{
EnableLightClient: true,
})
defer resetFn()
helpers.ClearCache()
ctx := context.Background()
params.SetupTestConfigCleanup(t)
@@ -646,6 +687,11 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeMultipleForksAltairCapel
}
func TestLightClientHandler_GetLightClientUpdatesByRangeMultipleForksCapellaDeneb(t *testing.T) {
resetFn := features.InitWithReset(&features.Flags{
EnableLightClient: true,
})
defer resetFn()
helpers.ClearCache()
ctx := context.Background()
params.SetupTestConfigCleanup(t)
@@ -715,6 +761,11 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeMultipleForksCapellaDene
}
func TestLightClientHandler_GetLightClientUpdatesByRangeCountBiggerThanLimit(t *testing.T) {
resetFn := features.InitWithReset(&features.Flags{
EnableLightClient: true,
})
defer resetFn()
helpers.ClearCache()
ctx := context.Background()
params.SetupTestConfigCleanup(t)
@@ -777,6 +828,11 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeCountBiggerThanLimit(t *
}
func TestLightClientHandler_GetLightClientUpdatesByRangeCountBiggerThanMax(t *testing.T) {
resetFn := features.InitWithReset(&features.Flags{
EnableLightClient: true,
})
defer resetFn()
helpers.ClearCache()
ctx := context.Background()
params.SetupTestConfigCleanup(t)
@@ -838,35 +894,22 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeCountBiggerThanMax(t *te
}
func TestLightClientHandler_GetLightClientUpdatesByRangeStartPeriodBeforeAltair(t *testing.T) {
resetFn := features.InitWithReset(&features.Flags{
EnableLightClient: true,
})
defer resetFn()
helpers.ClearCache()
ctx := context.Background()
params.SetupTestConfigCleanup(t)
config := params.BeaconConfig()
config.AltairForkEpoch = 1
config.EpochsPerSyncCommitteePeriod = 1
params.OverrideBeaconConfig(config)
slot := primitives.Slot(config.AltairForkEpoch * primitives.Epoch(config.SlotsPerEpoch)).Add(1)
st, err := util.NewBeaconStateAltair()
require.NoError(t, err)
headSlot := slot.Add(1)
err = st.SetSlot(headSlot)
require.NoError(t, err)
db := dbtesting.SetupDB(t)
updatePeriod := slot.Div(uint64(config.EpochsPerSyncCommitteePeriod)).Div(uint64(config.SlotsPerEpoch))
update, err := createUpdate(t, version.Altair)
require.NoError(t, err)
err = db.SaveLightClientUpdate(ctx, uint64(updatePeriod), update)
require.NoError(t, err)
mockChainService := &mock.ChainService{State: st}
s := &Server{
HeadFetcher: mockChainService,
BeaconDB: db,
}
startPeriod := 0
url := fmt.Sprintf("http://foo.com/?count=2&start_period=%d", startPeriod)
@@ -878,18 +921,17 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeStartPeriodBeforeAltair(
require.Equal(t, http.StatusOK, writer.Code)
var resp structs.LightClientUpdatesByRangeResponse
err = json.Unmarshal(writer.Body.Bytes(), &resp.Updates)
err := json.Unmarshal(writer.Body.Bytes(), &resp.Updates)
require.NoError(t, err)
require.Equal(t, 1, len(resp.Updates))
require.Equal(t, "altair", resp.Updates[0].Version)
updateJson, err := structs.LightClientUpdateFromConsensus(update)
require.NoError(t, err)
require.DeepEqual(t, updateJson, resp.Updates[0].Data)
require.Equal(t, 0, len(resp.Updates))
}
func TestLightClientHandler_GetLightClientUpdatesByRangeMissingUpdates(t *testing.T) {
resetFn := features.InitWithReset(&features.Flags{
EnableLightClient: true,
})
defer resetFn()
helpers.ClearCache()
ctx := context.Background()
params.SetupTestConfigCleanup(t)
@@ -996,6 +1038,11 @@ func TestLightClientHandler_GetLightClientUpdatesByRangeMissingUpdates(t *testin
}
func TestLightClientHandler_GetLightClientFinalityUpdate(t *testing.T) {
resetFn := features.InitWithReset(&features.Flags{
EnableLightClient: true,
})
defer resetFn()
helpers.ClearCache()
ctx := context.Background()
config := params.BeaconConfig()
@@ -1108,6 +1155,11 @@ func TestLightClientHandler_GetLightClientFinalityUpdate(t *testing.T) {
}
func TestLightClientHandler_GetLightClientOptimisticUpdateAltair(t *testing.T) {
resetFn := features.InitWithReset(&features.Flags{
EnableLightClient: true,
})
defer resetFn()
helpers.ClearCache()
ctx := context.Background()
config := params.BeaconConfig()
@@ -1220,6 +1272,11 @@ func TestLightClientHandler_GetLightClientOptimisticUpdateAltair(t *testing.T) {
}
func TestLightClientHandler_GetLightClientOptimisticUpdateCapella(t *testing.T) {
resetFn := features.InitWithReset(&features.Flags{
EnableLightClient: true,
})
defer resetFn()
helpers.ClearCache()
ctx := context.Background()
config := params.BeaconConfig()
@@ -1332,6 +1389,11 @@ func TestLightClientHandler_GetLightClientOptimisticUpdateCapella(t *testing.T)
}
func TestLightClientHandler_GetLightClientOptimisticUpdateDeneb(t *testing.T) {
resetFn := features.InitWithReset(&features.Flags{
EnableLightClient: true,
})
defer resetFn()
helpers.ClearCache()
ctx := context.Background()
config := params.BeaconConfig()

View File

@@ -244,6 +244,7 @@ go_test(
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/attestation:go_default_library",
"//proto/prysm/v1alpha1/metadata:go_default_library",
"//runtime/version:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",

View File

@@ -1,6 +1,7 @@
package sync
import (
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/config/params"
@@ -39,87 +40,128 @@ func (s *Service) forkWatcher() {
}
}
// Checks if there is a fork in the next epoch and if there is
// it registers the appropriate gossip and rpc topics.
func (s *Service) registerForUpcomingFork(currEpoch primitives.Epoch) error {
genRoot := s.cfg.clock.GenesisValidatorsRoot()
isNextForkEpoch, err := forks.IsForkNextEpoch(s.cfg.clock.GenesisTime(), genRoot[:])
// registerForUpcomingFork registers the appropriate gossip and RPC topics if there is a fork in the next epoch.
func (s *Service) registerForUpcomingFork(currentEpoch primitives.Epoch) error {
// Get the genesis validators root.
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
// Check if there is a fork in the next epoch.
isForkNextEpoch, err := forks.IsForkNextEpoch(s.cfg.clock.GenesisTime(), genesisValidatorsRoot[:])
if err != nil {
return errors.Wrap(err, "Could not retrieve next fork epoch")
}
// In preparation for the upcoming fork
// in the following epoch, the node
// will subscribe the new topics in advance.
if isNextForkEpoch {
nextEpoch := currEpoch + 1
digest, err := forks.ForkDigestFromEpoch(nextEpoch, genRoot[:])
if err != nil {
return errors.Wrap(err, "could not retrieve fork digest")
}
if s.subHandler.digestExists(digest) {
return nil
}
s.registerSubscribers(nextEpoch, digest)
if nextEpoch == params.BeaconConfig().AltairForkEpoch {
s.registerRPCHandlersAltair()
}
if nextEpoch == params.BeaconConfig().DenebForkEpoch {
s.registerRPCHandlersDeneb()
}
}
return nil
}
// Checks if there was a fork in the previous epoch, and if there
// was then we deregister the topics from that particular fork.
func (s *Service) deregisterFromPastFork(currEpoch primitives.Epoch) error {
genRoot := s.cfg.clock.GenesisValidatorsRoot()
// This method takes care of the de-registration of
// old gossip pubsub handlers. Once we are at the epoch
// after the fork, we de-register from all the outdated topics.
currFork, err := forks.Fork(currEpoch)
if err != nil {
return err
}
// If we are still in our genesis fork version then
// we simply exit early.
if currFork.Epoch == params.BeaconConfig().GenesisEpoch {
// Exit early if there is no fork in the next epoch.
if !isForkNextEpoch {
return nil
}
epochAfterFork := currFork.Epoch + 1
// If we are in the epoch after the fork, we start de-registering.
if epochAfterFork == currEpoch {
// Look at the previous fork's digest.
epochBeforeFork := currFork.Epoch - 1
prevDigest, err := forks.ForkDigestFromEpoch(epochBeforeFork, genRoot[:])
if err != nil {
return errors.Wrap(err, "Failed to determine previous epoch fork digest")
}
// Exit early if there are no topics with that particular
// digest.
if !s.subHandler.digestExists(prevDigest) {
return nil
}
prevFork, err := forks.Fork(epochBeforeFork)
if err != nil {
return errors.Wrap(err, "failed to determine previous epoch fork data")
}
if prevFork.Epoch == params.BeaconConfig().GenesisEpoch {
s.unregisterPhase0Handlers()
}
// Run through all our current active topics and see
// if there are any subscriptions to be removed.
for _, t := range s.subHandler.allTopics() {
retDigest, err := p2p.ExtractGossipDigest(t)
if err != nil {
log.WithError(err).Error("Could not retrieve digest")
continue
}
if retDigest == prevDigest {
s.unSubscribeFromTopic(t)
}
}
beforeForkEpoch := currentEpoch
forkEpoch := beforeForkEpoch + 1
// Get the fork digest for the next epoch.
afterForkDigest, err := forks.ForkDigestFromEpoch(forkEpoch, genesisValidatorsRoot[:])
if err != nil {
return errors.Wrap(err, "could not retrieve fork digest")
}
// Exit early if the topics for the next epoch are already registered.
// This is likely to be the case for all slots of the epoch other than the first one.
if s.subHandler.digestExists(afterForkDigest) {
return nil
}
// Register the subscribers (gossipsub) for the next epoch.
s.registerSubscribers(forkEpoch, afterForkDigest)
// Get the handlers for the current and next fork.
beforeForkHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(beforeForkEpoch)
if err != nil {
return errors.Wrap(err, "RPC handler by topic from before fork epoch")
}
forkHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(forkEpoch)
if err != nil {
return errors.Wrap(err, "RPC handler by topic from fork epoch")
}
// Compute newly added topics.
newRPCHandlerByTopic := addedRPCHandlerByTopic(beforeForkHandlerByTopic, forkHandlerByTopic)
// Register the new RPC handlers.
for topic, handler := range newRPCHandlerByTopic {
s.registerRPC(topic, handler)
}
return nil
}
// deregisterFromPastFork deregisters the gossip and RPC topics of the previous fork once the node reaches the epoch after the fork epoch.
func (s *Service) deregisterFromPastFork(currentEpoch primitives.Epoch) error {
// Extract the genesis validators root.
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
// Get the fork.
currentFork, err := forks.Fork(currentEpoch)
if err != nil {
return errors.Wrap(err, "genesis validators root")
}
// If we are still in our genesis fork version then exit early.
if currentFork.Epoch == params.BeaconConfig().GenesisEpoch {
return nil
}
// Get the epoch after the fork epoch.
afterForkEpoch := currentFork.Epoch + 1
// Exit early unless the current epoch is exactly the epoch after the fork epoch.
if currentEpoch != afterForkEpoch {
return nil
}
// Look at the previous fork's digest.
beforeForkEpoch := currentFork.Epoch - 1
beforeForkDigest, err := forks.ForkDigestFromEpoch(beforeForkEpoch, genesisValidatorsRoot[:])
if err != nil {
return errors.Wrap(err, "fork digest from epoch")
}
// Exit early if there are no topics with that particular digest.
if !s.subHandler.digestExists(beforeForkDigest) {
return nil
}
// Compute the RPC handlers that are no longer needed.
beforeForkHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(beforeForkEpoch)
if err != nil {
return errors.Wrap(err, "RPC handler by topic from before fork epoch")
}
forkHandlerByTopic, err := s.rpcHandlerByTopicFromEpoch(currentFork.Epoch)
if err != nil {
return errors.Wrap(err, "RPC handler by topic from fork epoch")
}
topicsToRemove := removedRPCTopics(beforeForkHandlerByTopic, forkHandlerByTopic)
for topic := range topicsToRemove {
fullTopic := topic + s.cfg.p2p.Encoding().ProtocolSuffix()
s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullTopic))
}
// Run through all our current active topics and see
// if there are any subscriptions to be removed.
for _, t := range s.subHandler.allTopics() {
retDigest, err := p2p.ExtractGossipDigest(t)
if err != nil {
log.WithError(err).Error("Could not retrieve digest")
continue
}
if retDigest == beforeForkDigest {
s.unSubscribeFromTopic(t)
}
}
return nil
}
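// A minimal worked sketch (hypothetical fork schedule, assuming AltairForkEpoch = 5): the fork
// watcher is expected to call the two helpers above on every epoch tick, roughly as follows.
//
//	_ = s.registerForUpcomingFork(4) // epoch 4: Altair is next epoch, subscribe its topics and add its RPC handlers
//	_ = s.registerForUpcomingFork(5) // epoch 5: no fork in epoch 6, exits early
//	_ = s.deregisterFromPastFork(5)  // epoch 5: still the fork epoch itself, nothing to remove yet
//	_ = s.deregisterFromPastFork(6)  // epoch 6: one epoch past the fork, phase0 topics and RPC handlers are removed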

View File

@@ -14,6 +14,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/network/forks"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/testing/assert"
)
@@ -230,7 +231,8 @@ func TestService_CheckForPreviousEpochFork(t *testing.T) {
chainStarted: abool.New(),
subHandler: newSubTopicHandler(),
}
r.registerRPCHandlers()
err := r.registerRPCHandlers()
assert.NoError(t, err)
return r
},
currEpoch: 10,
@@ -278,10 +280,21 @@ func TestService_CheckForPreviousEpochFork(t *testing.T) {
prevGenesis := chainService.Genesis
// To allow registration of v1 handlers
chainService.Genesis = time.Now().Add(-1 * oneEpoch())
r.registerRPCHandlers()
err := r.registerRPCHandlers()
assert.NoError(t, err)
chainService.Genesis = prevGenesis
r.registerRPCHandlersAltair()
previous, err := r.rpcHandlerByTopicFromFork(version.Phase0)
assert.NoError(t, err)
next, err := r.rpcHandlerByTopicFromFork(version.Altair)
assert.NoError(t, err)
handlerByTopic := addedRPCHandlerByTopic(previous, next)
for topic, handler := range handlerByTopic {
r.registerRPC(topic, handler)
}
genRoot := r.cfg.clock.GenesisValidatorsRoot()
digest, err := forks.ForkDigestFromEpoch(0, genRoot[:])

View File

@@ -9,117 +9,151 @@ import (
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/pkg/errors"
ssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
)
// Time to first byte timeout. The maximum time to wait for first byte of
// request response (time-to-first-byte). The client is expected to give up if
// they don't receive the first byte within 5 seconds.
var ttfbTimeout = params.BeaconConfig().TtfbTimeoutDuration()
var (
// Time to first byte timeout. The maximum time to wait for first byte of
// request response (time-to-first-byte). The client is expected to give up if
// they don't receive the first byte within 5 seconds.
ttfbTimeout = params.BeaconConfig().TtfbTimeoutDuration()
// respTimeout is the maximum time for complete response transfer.
var respTimeout = params.BeaconConfig().RespTimeoutDuration()
// respTimeout is the maximum time for complete response transfer.
respTimeout = params.BeaconConfig().RespTimeoutDuration()
)
// rpcHandler is responsible for handling and responding to any incoming message.
// This method may return an error to internal monitoring, but the error will
// not be relayed to the peer.
type rpcHandler func(context.Context, interface{}, libp2pcore.Stream) error
// registerRPCHandlers for p2p RPC.
func (s *Service) registerRPCHandlers() {
currEpoch := slots.ToEpoch(s.cfg.clock.CurrentSlot())
// Register V2 handlers if we are past altair fork epoch.
if currEpoch >= params.BeaconConfig().AltairForkEpoch {
s.registerRPC(
p2p.RPCStatusTopicV1,
s.statusRPCHandler,
)
s.registerRPC(
p2p.RPCGoodByeTopicV1,
s.goodbyeRPCHandler,
)
s.registerRPC(
p2p.RPCPingTopicV1,
s.pingHandler,
)
s.registerRPCHandlersAltair()
// rpcHandlerByTopicFromFork returns the RPC handlers for a given fork index.
func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandler, error) {
switch forkIndex {
// Phase0: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#messages
case version.Phase0:
return map[string]rpcHandler{
p2p.RPCStatusTopicV1: s.statusRPCHandler,
p2p.RPCGoodByeTopicV1: s.goodbyeRPCHandler,
p2p.RPCBlocksByRangeTopicV1: s.beaconBlocksByRangeRPCHandler,
p2p.RPCBlocksByRootTopicV1: s.beaconBlocksRootRPCHandler,
p2p.RPCPingTopicV1: s.pingHandler,
p2p.RPCMetaDataTopicV1: s.metaDataHandler,
}, nil
if currEpoch >= params.BeaconConfig().DenebForkEpoch {
s.registerRPCHandlersDeneb()
}
return
// Altair: https://github.com/ethereum/consensus-specs/tree/dev/specs/altair#messages
// Bellatrix: https://github.com/ethereum/consensus-specs/tree/dev/specs/bellatrix#messages
// Capella: https://github.com/ethereum/consensus-specs/tree/dev/specs/capella#messages
case version.Altair, version.Bellatrix, version.Capella:
return map[string]rpcHandler{
p2p.RPCStatusTopicV1: s.statusRPCHandler,
p2p.RPCGoodByeTopicV1: s.goodbyeRPCHandler,
p2p.RPCBlocksByRangeTopicV2: s.beaconBlocksByRangeRPCHandler, // Modified in Altair
p2p.RPCBlocksByRootTopicV2: s.beaconBlocksRootRPCHandler, // Modified in Altair
p2p.RPCPingTopicV1: s.pingHandler,
p2p.RPCMetaDataTopicV2: s.metaDataHandler, // Modified in Altair
}, nil
// Deneb: https://github.com/ethereum/consensus-specs/blob/dev/specs/deneb/p2p-interface.md#messages
// Electra: https://github.com/ethereum/consensus-specs/blob/dev/specs/electra/p2p-interface.md#messages
case version.Deneb, version.Electra:
return map[string]rpcHandler{
p2p.RPCStatusTopicV1: s.statusRPCHandler,
p2p.RPCGoodByeTopicV1: s.goodbyeRPCHandler,
p2p.RPCBlocksByRangeTopicV2: s.beaconBlocksByRangeRPCHandler,
p2p.RPCBlocksByRootTopicV2: s.beaconBlocksRootRPCHandler,
p2p.RPCPingTopicV1: s.pingHandler,
p2p.RPCMetaDataTopicV2: s.metaDataHandler,
p2p.RPCBlobSidecarsByRootTopicV1: s.blobSidecarByRootRPCHandler, // Added in Deneb
p2p.RPCBlobSidecarsByRangeTopicV1: s.blobSidecarsByRangeRPCHandler, // Added in Deneb
}, nil
default:
return nil, errors.Errorf("RPC handler not found for fork index %d", forkIndex)
}
s.registerRPC(
p2p.RPCStatusTopicV1,
s.statusRPCHandler,
)
s.registerRPC(
p2p.RPCGoodByeTopicV1,
s.goodbyeRPCHandler,
)
s.registerRPC(
p2p.RPCBlocksByRangeTopicV1,
s.beaconBlocksByRangeRPCHandler,
)
s.registerRPC(
p2p.RPCBlocksByRootTopicV1,
s.beaconBlocksRootRPCHandler,
)
s.registerRPC(
p2p.RPCPingTopicV1,
s.pingHandler,
)
s.registerRPC(
p2p.RPCMetaDataTopicV1,
s.metaDataHandler,
)
}
// registerRPCHandlers for altair.
func (s *Service) registerRPCHandlersAltair() {
s.registerRPC(
p2p.RPCBlocksByRangeTopicV2,
s.beaconBlocksByRangeRPCHandler,
)
s.registerRPC(
p2p.RPCBlocksByRootTopicV2,
s.beaconBlocksRootRPCHandler,
)
s.registerRPC(
p2p.RPCMetaDataTopicV2,
s.metaDataHandler,
)
// rpcHandlerByTopicFromEpoch returns the RPC handlers for a given epoch.
func (s *Service) rpcHandlerByTopicFromEpoch(epoch primitives.Epoch) (map[string]rpcHandler, error) {
// Get the beacon config.
beaconConfig := params.BeaconConfig()
if epoch >= beaconConfig.ElectraForkEpoch {
return s.rpcHandlerByTopicFromFork(version.Electra)
}
if epoch >= beaconConfig.DenebForkEpoch {
return s.rpcHandlerByTopicFromFork(version.Deneb)
}
if epoch >= beaconConfig.CapellaForkEpoch {
return s.rpcHandlerByTopicFromFork(version.Capella)
}
if epoch >= beaconConfig.BellatrixForkEpoch {
return s.rpcHandlerByTopicFromFork(version.Bellatrix)
}
if epoch >= beaconConfig.AltairForkEpoch {
return s.rpcHandlerByTopicFromFork(version.Altair)
}
return s.rpcHandlerByTopicFromFork(version.Phase0)
}
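// A minimal sketch of the expected resolution (hypothetical epochs, assuming CapellaForkEpoch = 10
// and DenebForkEpoch = 20):
//
//	capellaHandlers, _ := s.rpcHandlerByTopicFromEpoch(15) // v2 blocks/metadata topics, no blob topics
//	denebHandlers, _ := s.rpcHandlerByTopicFromEpoch(25)   // additionally includes the blob sidecar topics
//	_, hasBlobs := denebHandlers[p2p.RPCBlobSidecarsByRootTopicV1] // true once the epoch is at or past Deneb
//	_, _ = capellaHandlers, hasBlobs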
func (s *Service) registerRPCHandlersDeneb() {
s.registerRPC(
p2p.RPCBlobSidecarsByRangeTopicV1,
s.blobSidecarsByRangeRPCHandler,
)
s.registerRPC(
p2p.RPCBlobSidecarsByRootTopicV1,
s.blobSidecarByRootRPCHandler,
)
// addedRPCHandlerByTopic returns the RPC handlers that are present in the next map but not in the previous one.
func addedRPCHandlerByTopic(previous, next map[string]rpcHandler) map[string]rpcHandler {
added := make(map[string]rpcHandler)
for topic, handler := range next {
if _, ok := previous[topic]; !ok {
added[topic] = handler
}
}
return added
}
// Remove all v1 Stream handlers that are no longer supported
// from altair onwards.
func (s *Service) unregisterPhase0Handlers() {
fullBlockRangeTopic := p2p.RPCBlocksByRangeTopicV1 + s.cfg.p2p.Encoding().ProtocolSuffix()
fullBlockRootTopic := p2p.RPCBlocksByRootTopicV1 + s.cfg.p2p.Encoding().ProtocolSuffix()
fullMetadataTopic := p2p.RPCMetaDataTopicV1 + s.cfg.p2p.Encoding().ProtocolSuffix()
// removedRPCTopics returns the topics that are present in the previous map but not in the next one.
func removedRPCTopics(previous, next map[string]rpcHandler) map[string]bool {
removed := make(map[string]bool)
s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullBlockRangeTopic))
s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullBlockRootTopic))
s.cfg.p2p.Host().RemoveStreamHandler(protocol.ID(fullMetadataTopic))
for topic := range previous {
if _, ok := next[topic]; !ok {
removed[topic] = true
}
}
return removed
}
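// Both helpers are plain set differences over topic keys. A minimal sketch (nil handlers are fine
// here since only the keys are compared):
//
//	previous := map[string]rpcHandler{p2p.RPCBlocksByRangeTopicV1: nil, p2p.RPCPingTopicV1: nil}
//	next := map[string]rpcHandler{p2p.RPCBlocksByRangeTopicV2: nil, p2p.RPCPingTopicV1: nil}
//	added := addedRPCHandlerByTopic(previous, next) // contains only RPCBlocksByRangeTopicV2
//	removed := removedRPCTopics(previous, next)     // contains only RPCBlocksByRangeTopicV1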
// registerRPCHandlers for p2p RPC.
func (s *Service) registerRPCHandlers() error {
// Get the current epoch.
currentSlot := s.cfg.clock.CurrentSlot()
currentEpoch := slots.ToEpoch(currentSlot)
// Get the RPC handlers for the current epoch.
handlerByTopic, err := s.rpcHandlerByTopicFromEpoch(currentEpoch)
if err != nil {
return errors.Wrap(err, "rpc handler by topic from epoch")
}
// Register the RPC handlers for the current epoch.
for topic, handler := range handlerByTopic {
s.registerRPC(topic, handler)
}
return nil
}
// registerRPC for a given topic with an expected protobuf message type.

View File

@@ -303,14 +303,21 @@ func (s *Service) waitForChainStart() {
ctxMap, err := ContextByteVersionsForValRoot(clock.GenesisValidatorsRoot())
if err != nil {
log.WithError(err).WithField("genesisValidatorRoot", clock.GenesisValidatorsRoot()).
log.
WithError(err).
WithField("genesisValidatorRoot", clock.GenesisValidatorsRoot()).
Error("sync service failed to initialize context version map")
return
}
s.ctxMap = ctxMap
// Register respective rpc handlers at state initialized event.
s.registerRPCHandlers()
err = s.registerRPCHandlers()
if err != nil {
log.WithError(err).Error("Could not register rpc handlers")
return
}
// Wait for chainstart in separate routine.
if startTime.After(prysmTime.Now()) {
time.Sleep(prysmTime.Until(startTime))

View File

@@ -53,6 +53,30 @@ func (s *Service) noopValidator(_ context.Context, _ peer.ID, msg *pubsub.Messag
return pubsub.ValidationAccept, nil
}
func sliceFromCount(count uint64) []uint64 {
result := make([]uint64, 0, count)
for item := range count {
result = append(result, item)
}
return result
}
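// For illustration: sliceFromCount enumerates subnet indices starting at zero, e.g.
// sliceFromCount(4) returns []uint64{0, 1, 2, 3}. The "subscribe to all subnets" paths in this
// file use it with SyncCommitteeSubnetCount, AttestationSubnetCount and BlobsidecarSubnetCount.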
func (s *Service) activeSyncSubnetIndices(currentSlot primitives.Slot) []uint64 {
if flags.Get().SubscribeToAllSubnets {
return sliceFromCount(params.BeaconConfig().SyncCommitteeSubnetCount)
}
// Get the current epoch.
currentEpoch := slots.ToEpoch(currentSlot)
// Retrieve the subnets we want to subscribe to.
subs := cache.SyncSubnetIDs.GetAllSubnets(currentEpoch)
return slice.SetUint64(subs)
}
// Register PubSub subscribers
func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
s.subscribe(
@@ -85,49 +109,34 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
s.attesterSlashingSubscriber,
digest,
)
if flags.Get().SubscribeToAllSubnets {
s.subscribeStaticWithSubnets(
p2p.AttestationSubnetTopicFormat,
s.validateCommitteeIndexBeaconAttestation, /* validator */
s.committeeIndexBeaconAttestationSubscriber, /* message handler */
digest,
params.BeaconConfig().AttestationSubnetCount,
)
} else {
s.subscribeDynamicWithSubnets(
p2p.AttestationSubnetTopicFormat,
s.validateCommitteeIndexBeaconAttestation, /* validator */
s.committeeIndexBeaconAttestationSubscriber, /* message handler */
digest,
)
}
s.subscribeWithParameters(
p2p.AttestationSubnetTopicFormat,
s.validateCommitteeIndexBeaconAttestation,
s.committeeIndexBeaconAttestationSubscriber,
digest,
s.persistentAndAggregatorSubnetIndices,
s.attesterSubnetIndices,
)
// Altair Fork Version
if epoch >= params.BeaconConfig().AltairForkEpoch {
if params.BeaconConfig().AltairForkEpoch <= epoch {
s.subscribe(
p2p.SyncContributionAndProofSubnetTopicFormat,
s.validateSyncContributionAndProof,
s.syncContributionAndProofSubscriber,
digest,
)
if flags.Get().SubscribeToAllSubnets {
s.subscribeStaticWithSyncSubnets(
p2p.SyncCommitteeSubnetTopicFormat,
s.validateSyncCommitteeMessage, /* validator */
s.syncCommitteeMessageSubscriber, /* message handler */
digest,
)
} else {
s.subscribeDynamicWithSyncSubnets(
p2p.SyncCommitteeSubnetTopicFormat,
s.validateSyncCommitteeMessage, /* validator */
s.syncCommitteeMessageSubscriber, /* message handler */
digest,
)
}
s.subscribeWithParameters(
p2p.SyncCommitteeSubnetTopicFormat,
s.validateSyncCommitteeMessage,
s.syncCommitteeMessageSubscriber,
digest,
s.activeSyncSubnetIndices,
func(currentSlot primitives.Slot) []uint64 { return []uint64{} },
)
}
// New Gossip Topic in Capella
if epoch >= params.BeaconConfig().CapellaForkEpoch {
if params.BeaconConfig().CapellaForkEpoch <= epoch {
s.subscribe(
p2p.BlsToExecutionChangeSubnetTopicFormat,
s.validateBlsToExecutionChange,
@@ -137,13 +146,14 @@ func (s *Service) registerSubscribers(epoch primitives.Epoch, digest [4]byte) {
}
// New Gossip Topic in Deneb
if epoch >= params.BeaconConfig().DenebForkEpoch {
s.subscribeStaticWithSubnets(
if params.BeaconConfig().DenebForkEpoch <= epoch {
s.subscribeWithParameters(
p2p.BlobSubnetTopicFormat,
s.validateBlob, /* validator */
s.blobSubscriber, /* message handler */
s.validateBlob,
s.blobSubscriber,
digest,
params.BeaconConfig().BlobsidecarSubnetCount,
func(primitives.Slot) []uint64 { return sliceFromCount(params.BeaconConfig().BlobsidecarSubnetCount) },
func(currentSlot primitives.Slot) []uint64 { return []uint64{} },
)
}
}
@@ -324,132 +334,6 @@ func (s *Service) wrapAndReportValidation(topic string, v wrappedVal) (string, p
}
}
// subscribe to a static subnet with the given topic and index. A given validator and subscription handler is
// used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding.
func (s *Service) subscribeStaticWithSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte, subnetCount uint64) {
genRoot := s.cfg.clock.GenesisValidatorsRoot()
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
if err != nil {
// Impossible condition as it would mean digest does not exist.
panic(err)
}
base := p2p.GossipTopicMappings(topic, e)
if base == nil {
// Impossible condition as it would mean topic does not exist.
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic))
}
for i := uint64(0); i < subnetCount; i++ {
s.subscribeWithBase(s.addDigestAndIndexToTopic(topic, digest, i), validator, handle)
}
genesis := s.cfg.clock.GenesisTime()
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)
go func() {
for {
select {
case <-s.ctx.Done():
ticker.Done()
return
case <-ticker.C():
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
continue
}
valid, err := isDigestValid(digest, genesis, genRoot)
if err != nil {
log.Error(err)
continue
}
if !valid {
log.Warnf("Attestation subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
// Unsubscribes from all our current subnets.
for i := uint64(0); i < subnetCount; i++ {
fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.p2p.Encoding().ProtocolSuffix()
s.unSubscribeFromTopic(fullTopic)
}
ticker.Done()
return
}
// Check every slot that there are enough peers
for i := uint64(0); i < subnetCount; i++ {
if !s.enoughPeersAreConnected(s.addDigestAndIndexToTopic(topic, digest, i)) {
_, err := s.cfg.p2p.FindPeersWithSubnet(
s.ctx,
s.addDigestAndIndexToTopic(topic, digest, i),
i,
flags.Get().MinimumPeersPerSubnet,
)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
return
}
}
}
}
}
}()
}
// subscribe to a dynamically changing list of subnets. This method expects a fmt compatible
// string for the topic name and the list of subnets for subscribed topics that should be
// maintained.
func (s *Service) subscribeDynamicWithSubnets(
topicFormat string,
validate wrappedVal,
handle subHandler,
digest [4]byte,
) {
genRoot := s.cfg.clock.GenesisValidatorsRoot()
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
if err != nil {
// Impossible condition as it would mean digest does not exist.
panic(err)
}
base := p2p.GossipTopicMappings(topicFormat, e)
if base == nil {
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat))
}
subscriptions := make(map[uint64]*pubsub.Subscription, params.BeaconConfig().MaxCommitteesPerSlot)
genesis := s.cfg.clock.GenesisTime()
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)
go func() {
for {
select {
case <-s.ctx.Done():
ticker.Done()
return
case currentSlot := <-ticker.C():
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
continue
}
valid, err := isDigestValid(digest, genesis, genRoot)
if err != nil {
log.Error(err)
continue
}
if !valid {
log.Warnf("Attestation subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
// Unsubscribes from all our current subnets.
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
ticker.Done()
return
}
wantedSubs := s.retrievePersistentSubs(currentSlot)
s.reValidateSubscriptions(subscriptions, wantedSubs, topicFormat, digest)
for _, idx := range wantedSubs {
s.subscribeAggregatorSubnet(subscriptions, idx, digest, validate, handle)
}
// find desired subs for attesters
attesterSubs := s.attesterSubnetIndices(currentSlot)
for _, idx := range attesterSubs {
s.lookupAttesterSubnets(digest, idx)
}
}
}
}()
}
// reValidateSubscriptions unsubscribe from topics we are currently subscribed to but that are
// not in the list of wanted subnets.
// TODO: Rename this function as it does not only revalidate subscriptions.
@@ -477,96 +361,44 @@ func (s *Service) reValidateSubscriptions(
}
}
// subscribe missing subnets for our aggregators.
func (s *Service) subscribeAggregatorSubnet(
subscriptions map[uint64]*pubsub.Subscription,
idx uint64,
// searchForPeers searches for peers in the given subnets.
func (s *Service) searchForPeers(
ctx context.Context,
topicFormat string,
digest [4]byte,
validate wrappedVal,
handle subHandler,
currentSlot primitives.Slot,
getSubnetsToSubscribe func(currentSlot primitives.Slot) []uint64,
getSubnetsToFindPeersOnly func(currentSlot primitives.Slot) []uint64,
) {
// do not subscribe if we have no peers in the same
// subnet
topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.Attestation{})]
subnetTopic := fmt.Sprintf(topic, digest, idx)
// check if subscription exists and if not subscribe the relevant subnet.
if _, exists := subscriptions[idx]; !exists {
subscriptions[idx] = s.subscribeWithBase(subnetTopic, validate, handle)
}
if !s.enoughPeersAreConnected(subnetTopic) {
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet)
// Retrieve the subnets we want to subscribe to.
subnetsToSubscribeIndex := getSubnetsToSubscribe(currentSlot)
// Retrieve the subnets we want to find peers for.
subnetsToFindPeersOnlyIndex := getSubnetsToFindPeersOnly(currentSlot)
// Combine the subnets to subscribe and the subnets to find peers for.
subnetsToFindPeersIndex := slice.SetUint64(append(subnetsToSubscribeIndex, subnetsToFindPeersOnlyIndex...))
// Find new peers for wanted subnets if needed.
for _, subnetIndex := range subnetsToFindPeersIndex {
topic := fmt.Sprintf(topicFormat, digest, subnetIndex)
// Check if we have enough peers in the subnet. Skip if we do.
if s.enoughPeersAreConnected(topic) {
continue
}
// Not enough peers in the subnet, we need to search for more.
_, err := s.cfg.p2p.FindPeersWithSubnet(ctx, topic, subnetIndex, flags.Get().MinimumPeersPerSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
}
}
}
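// A minimal usage sketch: for attestation subnets the service passes
// persistentAndAggregatorSubnetIndices as the "subscribe" set and attesterSubnetIndices as the
// "find peers only" set, so peers are searched for both while subscriptions are only created
// for the former.
//
//	s.searchForPeers(ctx, p2p.AttestationSubnetTopicFormat, digest, currentSlot,
//		s.persistentAndAggregatorSubnetIndices, s.attesterSubnetIndices)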
// subscribe to a static subnet with the given topic and index. A given validator and subscription handler is
// used to handle messages from the subnet. The base protobuf message is used to initialize new messages for decoding.
func (s *Service) subscribeStaticWithSyncSubnets(topic string, validator wrappedVal, handle subHandler, digest [4]byte) {
genRoot := s.cfg.clock.GenesisValidatorsRoot()
_, e, err := forks.RetrieveForkDataFromDigest(digest, genRoot[:])
if err != nil {
panic(err)
}
base := p2p.GossipTopicMappings(topic, e)
if base == nil {
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic))
}
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
s.subscribeWithBase(s.addDigestAndIndexToTopic(topic, digest, i), validator, handle)
}
genesis := s.cfg.clock.GenesisTime()
ticker := slots.NewSlotTicker(genesis, params.BeaconConfig().SecondsPerSlot)
go func() {
for {
select {
case <-s.ctx.Done():
ticker.Done()
return
case <-ticker.C():
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
continue
}
valid, err := isDigestValid(digest, genesis, genRoot)
if err != nil {
log.Error(err)
continue
}
if !valid {
log.Warnf("Sync subnets with digest %#x are no longer valid, unsubscribing from all of them.", digest)
// Unsubscribes from all our current subnets.
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
fullTopic := fmt.Sprintf(topic, digest, i) + s.cfg.p2p.Encoding().ProtocolSuffix()
s.unSubscribeFromTopic(fullTopic)
}
ticker.Done()
return
}
// Check every slot that there are enough peers
for i := uint64(0); i < params.BeaconConfig().SyncCommitteeSubnetCount; i++ {
if !s.enoughPeersAreConnected(s.addDigestAndIndexToTopic(topic, digest, i)) {
_, err := s.cfg.p2p.FindPeersWithSubnet(
s.ctx,
s.addDigestAndIndexToTopic(topic, digest, i),
i,
flags.Get().MinimumPeersPerSubnet,
)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
return
}
}
}
}
}
}()
}
// subscribeToSyncSubnets subscribes to needed sync subnets, unsubscribe from unneeded ones and search for more peers if needed.
// subscribeToSubnets subscribes to the needed subnets and unsubscribes from the ones that are no longer needed.
// Returns `true` if the digest is valid (wrt. the current epoch), `false` otherwise.
func (s *Service) subscribeToSyncSubnets(
func (s *Service) subscribeToSubnets(
topicFormat string,
digest [4]byte,
genesisValidatorsRoot [fieldparams.RootLength]byte,
@@ -575,16 +407,15 @@ func (s *Service) subscribeToSyncSubnets(
currentSlot primitives.Slot,
validate wrappedVal,
handle subHandler,
getSubnetsToSubscribe func(currentSlot primitives.Slot) []uint64,
getSubnetsToFindPeersOnly func(currentSlot primitives.Slot) []uint64,
) bool {
// Get sync subnets topic.
topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.SyncCommitteeMessage{})]
// Do not subscribe if not synced.
if s.chainStarted.IsSet() && s.cfg.initialSync.Syncing() {
return true
}
// Do not subscribe is the digest is not valid.
// Check the validity of the digest.
valid, err := isDigestValid(digest, genesisTime, genesisValidatorsRoot)
if err != nil {
log.Error(err)
@@ -593,23 +424,25 @@ func (s *Service) subscribeToSyncSubnets(
// Unsubscribe from all subnets if the digest is not valid. It's likely to be the case after a hard fork.
if !valid {
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warn("Sync subnets with this digest are no longer valid, unsubscribing from all of them.")
description := topicFormat
if pos := strings.LastIndex(topicFormat, "/"); pos != -1 {
description = topicFormat[pos+1:]
}
log.WithField("digest", fmt.Sprintf("%#x", digest)).Warningf("%s subnets with this digest are no longer valid, unsubscribing from all of them.", description)
s.reValidateSubscriptions(subscriptions, []uint64{}, topicFormat, digest)
return false
}
// Get the current epoch.
currentEpoch := slots.ToEpoch(currentSlot)
// Retrieve the subnets we want to subscribe to.
wantedSubnetsIndex := s.retrieveActiveSyncSubnets(currentEpoch)
subnetsToSubscribeIndex := getSubnetsToSubscribe(currentSlot)
// Remove subscriptions that are no longer wanted.
s.reValidateSubscriptions(subscriptions, wantedSubnetsIndex, topicFormat, digest)
s.reValidateSubscriptions(subscriptions, subnetsToSubscribeIndex, topicFormat, digest)
// Subscribe to wanted subnets.
for _, subnetIndex := range wantedSubnetsIndex {
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)
for _, subnetIndex := range subnetsToSubscribeIndex {
subnetTopic := fmt.Sprintf(topicFormat, digest, subnetIndex)
// Check if subscription exists.
if _, exists := subscriptions[subnetIndex]; exists {
@@ -620,38 +453,20 @@ func (s *Service) subscribeToSyncSubnets(
subscription := s.subscribeWithBase(subnetTopic, validate, handle)
subscriptions[subnetIndex] = subscription
}
// Find new peers for wanted subnets if needed.
for _, subnetIndex := range wantedSubnetsIndex {
subnetTopic := fmt.Sprintf(topic, digest, subnetIndex)
// Check if we have enough peers in the subnet. Skip if we do.
if s.enoughPeersAreConnected(subnetTopic) {
continue
}
// Not enough peers in the subnet, we need to search for more.
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, subnetIndex, flags.Get().MinimumPeersPerSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
}
}
return true
}
// subscribeDynamicWithSyncSubnets subscribes to a dynamically changing list of subnets.
func (s *Service) subscribeDynamicWithSyncSubnets(
// subscribeWithParameters subscribes to a list of subnets.
func (s *Service) subscribeWithParameters(
topicFormat string,
validate wrappedVal,
handle subHandler,
digest [4]byte,
getSubnetsToSubscribe func(currentSlot primitives.Slot) []uint64,
getSubnetsToFindPeersOnly func(currentSlot primitives.Slot) []uint64,
) {
// Retrieve the number of committee subnets we need to subscribe to.
syncCommiteeSubnetsCount := params.BeaconConfig().SyncCommitteeSubnetCount
// Initialize the subscriptions map.
subscriptions := make(map[uint64]*pubsub.Subscription, syncCommiteeSubnetsCount)
subscriptions := make(map[uint64]*pubsub.Subscription)
// Retrieve the genesis validators root.
genesisValidatorsRoot := s.cfg.clock.GenesisValidatorsRoot()
@@ -678,14 +493,20 @@ func (s *Service) subscribeDynamicWithSyncSubnets(
// Retrieve the current slot.
currentSlot := s.cfg.clock.CurrentSlot()
// Subscribe to subnets.
s.subscribeToSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle, getSubnetsToSubscribe, getSubnetsToFindPeersOnly)
// Derive a new context and cancel function.
ctx, cancel := context.WithCancel(s.ctx)
go func() {
// Subscribe to the sync subnets.
s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)
// Search for peers.
s.searchForPeers(ctx, topicFormat, digest, currentSlot, getSubnetsToSubscribe, getSubnetsToFindPeersOnly)
for {
select {
case currentSlot := <-ticker.C():
isDigestValid := s.subscribeToSyncSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle)
isDigestValid := s.subscribeToSubnets(topicFormat, digest, genesisValidatorsRoot, genesisTime, subscriptions, currentSlot, validate, handle, getSubnetsToSubscribe, getSubnetsToFindPeersOnly)
// Stop the ticker if the digest is not valid. Likely to happen after a hard fork.
if !isDigestValid {
@@ -693,7 +514,11 @@ func (s *Service) subscribeDynamicWithSyncSubnets(
return
}
// Search for peers.
s.searchForPeers(ctx, topicFormat, digest, currentSlot, getSubnetsToSubscribe, getSubnetsToFindPeersOnly)
case <-s.ctx.Done():
cancel()
ticker.Done()
return
}
@@ -701,21 +526,8 @@ func (s *Service) subscribeDynamicWithSyncSubnets(
}()
}
// lookup peers for attester specific subnets.
func (s *Service) lookupAttesterSubnets(digest [4]byte, idx uint64) {
topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.Attestation{})]
subnetTopic := fmt.Sprintf(topic, digest, idx)
if !s.enoughPeersAreConnected(subnetTopic) {
// perform a search for peers with the desired committee index.
_, err := s.cfg.p2p.FindPeersWithSubnet(s.ctx, subnetTopic, idx, flags.Get().MinimumPeersPerSubnet)
if err != nil {
log.WithError(err).Debug("Could not search for peers")
}
}
}
func (s *Service) unSubscribeFromTopic(topic string) {
log.WithField("topic", topic).Debug("Unsubscribing from topic")
log.WithField("topic", topic).Info("Unsubscribed from")
if err := s.cfg.p2p.PubSub().UnregisterTopicValidator(topic); err != nil {
log.WithError(err).Error("Could not unregister topic validator")
}
@@ -740,19 +552,16 @@ func (s *Service) enoughPeersAreConnected(subnetTopic string) bool {
return peersWithSubnetCount >= threshold
}
func (s *Service) retrievePersistentSubs(currSlot primitives.Slot) []uint64 {
// Persistent subscriptions from validators
persistentSubs := s.persistentSubnetIndices()
// Update desired topic indices for aggregator
wantedSubs := s.aggregatorSubnetIndices(currSlot)
func (s *Service) persistentAndAggregatorSubnetIndices(currentSlot primitives.Slot) []uint64 {
if flags.Get().SubscribeToAllSubnets {
return sliceFromCount(params.BeaconConfig().AttestationSubnetCount)
}
// Combine subscriptions to get all requested subscriptions
return slice.SetUint64(append(persistentSubs, wantedSubs...))
}
persistentSubnetIndices := s.persistentSubnetIndices()
aggregatorSubnetIndices := s.aggregatorSubnetIndices(currentSlot)
func (*Service) retrieveActiveSyncSubnets(currEpoch primitives.Epoch) []uint64 {
subs := cache.SyncSubnetIDs.GetAllSubnets(currEpoch)
return slice.SetUint64(subs)
// Combine subscriptions to get all requested subscriptions.
return slice.SetUint64(append(persistentSubnetIndices, aggregatorSubnetIndices...))
}
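// For illustration: slice.SetUint64 deduplicates the combined indices, so persistent subnets
// []uint64{1, 2} and aggregator subnets []uint64{2, 5} yield the set {1, 2, 5} (ordering is not
// guaranteed).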
// filters out required peers for the node to function, not
@@ -768,7 +577,7 @@ func (s *Service) filterNeededPeers(pids []peer.ID) []peer.ID {
return pids
}
currSlot := s.cfg.clock.CurrentSlot()
wantedSubs := s.retrievePersistentSubs(currSlot)
wantedSubs := s.persistentAndAggregatorSubnetIndices(currSlot)
wantedSubs = slice.SetUint64(append(wantedSubs, s.attesterSubnetIndices(currSlot)...))
topic := p2p.GossipTypeMapping[reflect.TypeOf(&ethpb.Attestation{})]

View File

@@ -312,37 +312,6 @@ func TestRevalidateSubscription_CorrectlyFormatsTopic(t *testing.T) {
require.LogsDoNotContain(t, hook, "Could not unregister topic validator")
}
func TestStaticSubnets(t *testing.T) {
p := p2ptest.NewTestP2P(t)
ctx, cancel := context.WithCancel(context.Background())
chain := &mockChain.ChainService{
Genesis: time.Now(),
ValidatorsRoot: [32]byte{'A'},
}
r := Service{
ctx: ctx,
cfg: &config{
chain: chain,
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
p2p: p,
},
chainStarted: abool.New(),
subHandler: newSubTopicHandler(),
}
defaultTopic := "/eth2/%x/beacon_attestation_%d"
d, err := r.currentForkDigest()
assert.NoError(t, err)
r.subscribeStaticWithSubnets(defaultTopic, r.noopValidator, func(_ context.Context, msg proto.Message) error {
// no-op
return nil
}, d, params.BeaconConfig().AttestationSubnetCount)
topics := r.cfg.p2p.PubSub().GetTopics()
if uint64(len(topics)) != params.BeaconConfig().AttestationSubnetCount {
t.Errorf("Wanted the number of subnet topics registered to be %d but got %d", params.BeaconConfig().AttestationSubnetCount, len(topics))
}
cancel()
}
func Test_wrapAndReportValidation(t *testing.T) {
mChain := &mockChain.ChainService{
Genesis: time.Now(),
@@ -539,37 +508,6 @@ func TestFilterSubnetPeers(t *testing.T) {
assert.Equal(t, 1, len(recPeers), "expected at least 1 suitable peer to prune")
}
func TestSubscribeWithSyncSubnets_StaticOK(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.MainnetTestConfig().Copy()
cfg.SecondsPerSlot = 1
params.OverrideBeaconConfig(cfg)
p := p2ptest.NewTestP2P(t)
ctx, cancel := context.WithCancel(context.Background())
chain := &mockChain.ChainService{
Genesis: time.Now(),
ValidatorsRoot: [32]byte{'A'},
}
r := Service{
ctx: ctx,
cfg: &config{
chain: chain,
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
p2p: p,
},
chainStarted: abool.New(),
subHandler: newSubTopicHandler(),
}
// Empty cache at the end of the test.
defer cache.SyncSubnetIDs.EmptyAllCaches()
digest, err := r.currentForkDigest()
assert.NoError(t, err)
r.subscribeStaticWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest)
assert.Equal(t, int(params.BeaconConfig().SyncCommitteeSubnetCount), len(r.cfg.p2p.PubSub().GetTopics()))
cancel()
}
func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.MainnetConfig().Copy()
@@ -600,7 +538,7 @@ func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) {
cache.SyncSubnetIDs.AddSyncCommitteeSubnets([]byte("pubkey"), currEpoch, []uint64{0, 1}, 10*time.Second)
digest, err := r.currentForkDigest()
assert.NoError(t, err)
r.subscribeDynamicWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest)
r.subscribeWithParameters(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, r.activeSyncSubnetIndices, func(currentSlot primitives.Slot) []uint64 { return []uint64{} })
time.Sleep(2 * time.Second)
assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics()))
topicMap := map[string]bool{}
@@ -615,46 +553,6 @@ func TestSubscribeWithSyncSubnets_DynamicOK(t *testing.T) {
cancel()
}
func TestSubscribeWithSyncSubnets_StaticSwitchFork(t *testing.T) {
p := p2ptest.NewTestP2P(t)
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.AltairForkEpoch = 1
cfg.SecondsPerSlot = 1
params.OverrideBeaconConfig(cfg)
params.BeaconConfig().InitializeForkSchedule()
ctx, cancel := context.WithCancel(context.Background())
currSlot := primitives.Slot(100)
chain := &mockChain.ChainService{
Genesis: time.Now().Add(-time.Duration(uint64(params.BeaconConfig().SlotsPerEpoch)*params.BeaconConfig().SecondsPerSlot) * time.Second),
ValidatorsRoot: [32]byte{'A'},
Slot: &currSlot,
}
r := Service{
ctx: ctx,
cfg: &config{
chain: chain,
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
p2p: p,
},
chainStarted: abool.New(),
subHandler: newSubTopicHandler(),
}
// Empty cache at the end of the test.
defer cache.SyncSubnetIDs.EmptyAllCaches()
genRoot := r.cfg.clock.GenesisValidatorsRoot()
digest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, genRoot[:])
assert.NoError(t, err)
r.subscribeStaticWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest)
assert.Equal(t, int(params.BeaconConfig().SyncCommitteeSubnetCount), len(r.cfg.p2p.PubSub().GetTopics()))
// Expect that all old topics will be unsubscribed.
time.Sleep(2 * time.Second)
assert.Equal(t, 0, len(r.cfg.p2p.PubSub().GetTopics()))
cancel()
}
func TestSubscribeWithSyncSubnets_DynamicSwitchFork(t *testing.T) {
params.SetupTestConfigCleanup(t)
p := p2ptest.NewTestP2P(t)
@@ -689,7 +587,7 @@ func TestSubscribeWithSyncSubnets_DynamicSwitchFork(t *testing.T) {
digest, err := signing.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, genRoot[:])
assert.NoError(t, err)
r.subscribeDynamicWithSyncSubnets(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest)
r.subscribeWithParameters(p2p.SyncCommitteeSubnetTopicFormat, nil, nil, digest, r.activeSyncSubnetIndices, func(currentSlot primitives.Slot) []uint64 { return []uint64{} })
time.Sleep(2 * time.Second)
assert.Equal(t, 2, len(r.cfg.p2p.PubSub().GetTopics()))
topicMap := map[string]bool{}