Files
prysm/beacon-chain/sync/rpc_light_client_test.go
Bastin 5c1d827335 Unify LC API 1/2 (bootstrap) (#15476)
* add versionToForkEpoch map

* Unify LC API (bootstrap)
2025-07-10 13:07:46 +00:00

521 lines
19 KiB
Go

package sync
import (
"sync"
"testing"
"time"
"github.com/OffchainLabs/prysm/v6/async/abool"
mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing"
lightClient "github.com/OffchainLabs/prysm/v6/beacon-chain/core/light-client"
db "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/p2p"
p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v6/beacon-chain/startup"
mockSync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync/initial-sync/testing"
"github.com/OffchainLabs/prysm/v6/config/features"
"github.com/OffchainLabs/prysm/v6/config/params"
leakybucket "github.com/OffchainLabs/prysm/v6/container/leaky-bucket"
"github.com/OffchainLabs/prysm/v6/network/forks"
pb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v6/runtime/version"
"github.com/OffchainLabs/prysm/v6/testing/require"
"github.com/OffchainLabs/prysm/v6/testing/util"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/protocol"
)
// TestRPC_LightClientBootstrap runs lightClientBootstrapRPCHandler once for
// every fork version from Altair through Electra. Each subtest saves a
// bootstrap in the light client store, calls the handler with a stream opened
// from one test host to another, and lets the receiving host's stream handler
// check the success code, the 4-byte fork digest and the SSZ payload.
func TestRPC_LightClientBootstrap(t *testing.T) {
	// Enable the light client flag; the returned reset function is deferred.
	defer features.InitWithReset(&features.Flags{
		EnableLightClient: true,
	})()

	ctx := t.Context()

	// Within this test, encodingHost is referenced directly only for its
	// Encoding(); the stream under test runs from sender to receiver.
	encodingHost := p2ptest.NewTestP2P(t)
	sender := p2ptest.NewTestP2P(t)
	receiver := p2ptest.NewTestP2P(t)
	sender.Connect(receiver)
	require.Equal(t, 1, len(sender.BHost.Network().Peers()), "Expected peers to be connected")

	chainSvc := &mockChain.ChainService{
		ValidatorsRoot: [32]byte{'A'},
		Genesis:        time.Unix(time.Now().Unix(), 0),
	}
	testDB := db.SetupDB(t)

	svc := Service{
		ctx: ctx,
		cfg: &config{
			p2p:           encodingHost,
			initialSync:   &mockSync.Sync{IsSyncing: false},
			chain:         chainSvc,
			beaconDB:      testDB,
			clock:         startup.NewClock(chainSvc.Genesis, chainSvc.ValidatorsRoot),
			stateNotifier: &mockChain.MockStateNotifier{},
		},
		chainStarted: abool.New(),
		lcStore:      lightClient.NewLightClientStore(testDB),
		subHandler:   newSubTopicHandler(),
		rateLimiter:  newRateLimiter(sender),
	}

	protocolID := protocol.ID(p2p.RPCLightClientBootstrapTopicV1)
	// Register a leaky-bucket collector for this topic on the rate limiter.
	svc.rateLimiter.limiterMap[string(protocolID)] = leakybucket.NewCollector(10000, 10000, time.Second, false)

	// Expected fork digests, one per fork epoch, all derived from the mock
	// chain's validators root.
	beaconCfg := params.BeaconConfig()
	valRoot := chainSvc.ValidatorsRoot[:]
	altairDigest, err := forks.ForkDigestFromEpoch(beaconCfg.AltairForkEpoch, valRoot)
	require.NoError(t, err)
	bellatrixDigest, err := forks.ForkDigestFromEpoch(beaconCfg.BellatrixForkEpoch, valRoot)
	require.NoError(t, err)
	capellaDigest, err := forks.ForkDigestFromEpoch(beaconCfg.CapellaForkEpoch, valRoot)
	require.NoError(t, err)
	denebDigest, err := forks.ForkDigestFromEpoch(beaconCfg.DenebForkEpoch, valRoot)
	require.NoError(t, err)
	electraDigest, err := forks.ForkDigestFromEpoch(beaconCfg.ElectraForkEpoch, valRoot)
	require.NoError(t, err)

	for forkVersion := version.Altair; forkVersion <= version.Electra; forkVersion++ {
		t.Run(version.String(forkVersion), func(t *testing.T) {
			// Build a bootstrap for this fork and store it under its block root.
			lc := util.NewTestLightClient(t, forkVersion)
			wantBootstrap, err := lightClient.NewLightClientBootstrapFromBeaconState(ctx, lc.State.Slot(), lc.State, lc.Block)
			require.NoError(t, err)
			root, err := lc.Block.Block().HashTreeRoot()
			require.NoError(t, err)
			require.NoError(t, svc.lcStore.SaveLightClientBootstrap(ctx, root, wantBootstrap))

			var received sync.WaitGroup
			received.Add(1)
			receiver.BHost.SetStreamHandler(protocolID, func(stream network.Stream) {
				defer received.Done()
				expectSuccess(t, stream)
				gotDigest, err := readContextFromStream(stream)
				require.NoError(t, err)
				require.Equal(t, 4, len(gotDigest))

				// Check the digest for this fork, then decode the payload into
				// the matching container and re-marshal it for comparison.
				// Bellatrix responses are decoded into the Altair container.
				var gotSSZ []byte
				switch forkVersion {
				case version.Altair:
					require.DeepSSZEqual(t, altairDigest[:], gotDigest)
					msg := &pb.LightClientBootstrapAltair{}
					require.NoError(t, svc.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg))
					gotSSZ, err = msg.MarshalSSZ()
					require.NoError(t, err)
				case version.Bellatrix:
					require.DeepSSZEqual(t, bellatrixDigest[:], gotDigest)
					msg := &pb.LightClientBootstrapAltair{}
					require.NoError(t, svc.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg))
					gotSSZ, err = msg.MarshalSSZ()
					require.NoError(t, err)
				case version.Capella:
					require.DeepSSZEqual(t, capellaDigest[:], gotDigest)
					msg := &pb.LightClientBootstrapCapella{}
					require.NoError(t, svc.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg))
					gotSSZ, err = msg.MarshalSSZ()
					require.NoError(t, err)
				case version.Deneb:
					require.DeepSSZEqual(t, denebDigest[:], gotDigest)
					msg := &pb.LightClientBootstrapDeneb{}
					require.NoError(t, svc.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg))
					gotSSZ, err = msg.MarshalSSZ()
					require.NoError(t, err)
				case version.Electra:
					require.DeepSSZEqual(t, electraDigest[:], gotDigest)
					msg := &pb.LightClientBootstrapElectra{}
					require.NoError(t, svc.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg))
					gotSSZ, err = msg.MarshalSSZ()
					require.NoError(t, err)
				default:
					t.Fatalf("unsupported version %d", forkVersion)
				}

				wantSSZ, err := wantBootstrap.MarshalSSZ()
				require.NoError(t, err)
				require.DeepSSZEqual(t, gotSSZ, wantSSZ)
			})

			// Open the stream and invoke the handler under test directly,
			// passing the block root as the request message.
			outbound, err := sender.BHost.NewStream(t.Context(), receiver.BHost.ID(), protocolID)
			require.NoError(t, err)
			require.NoError(t, svc.lightClientBootstrapRPCHandler(ctx, &root, outbound))

			if timedOut := util.WaitTimeout(&received, time.Second); timedOut {
				t.Fatal("Did not receive stream within 1 sec")
			}
		})
	}
}
// TestRPC_LightClientOptimisticUpdate runs
// lightClientOptimisticUpdateRPCHandler once for every fork version from
// Altair through Electra. Each subtest sets the store's last optimistic
// update, calls the handler with a stream opened from one test host to
// another, and lets the receiving host's stream handler check the success
// code, the 4-byte fork digest and the SSZ payload.
func TestRPC_LightClientOptimisticUpdate(t *testing.T) {
	// Enable the light client flag; the returned reset function is deferred.
	defer features.InitWithReset(&features.Flags{
		EnableLightClient: true,
	})()

	ctx := t.Context()

	// Within this test, encodingHost is referenced directly only for its
	// Encoding(); the stream under test runs from sender to receiver.
	encodingHost := p2ptest.NewTestP2P(t)
	sender := p2ptest.NewTestP2P(t)
	receiver := p2ptest.NewTestP2P(t)
	sender.Connect(receiver)
	require.Equal(t, 1, len(sender.BHost.Network().Peers()), "Expected peers to be connected")

	chainSvc := &mockChain.ChainService{
		ValidatorsRoot: [32]byte{'A'},
		Genesis:        time.Unix(time.Now().Unix(), 0),
	}
	testDB := db.SetupDB(t)

	svc := Service{
		ctx: ctx,
		cfg: &config{
			p2p:           encodingHost,
			initialSync:   &mockSync.Sync{IsSyncing: false},
			chain:         chainSvc,
			beaconDB:      testDB,
			clock:         startup.NewClock(chainSvc.Genesis, chainSvc.ValidatorsRoot),
			stateNotifier: &mockChain.MockStateNotifier{},
		},
		chainStarted: abool.New(),
		lcStore:      &lightClient.Store{},
		subHandler:   newSubTopicHandler(),
		rateLimiter:  newRateLimiter(sender),
	}

	protocolID := protocol.ID(p2p.RPCLightClientOptimisticUpdateTopicV1)
	// Register a leaky-bucket collector for this topic on the rate limiter.
	svc.rateLimiter.limiterMap[string(protocolID)] = leakybucket.NewCollector(10000, 10000, time.Second, false)

	// Expected fork digests, one per fork epoch, all derived from the mock
	// chain's validators root.
	beaconCfg := params.BeaconConfig()
	valRoot := chainSvc.ValidatorsRoot[:]
	altairDigest, err := forks.ForkDigestFromEpoch(beaconCfg.AltairForkEpoch, valRoot)
	require.NoError(t, err)
	bellatrixDigest, err := forks.ForkDigestFromEpoch(beaconCfg.BellatrixForkEpoch, valRoot)
	require.NoError(t, err)
	capellaDigest, err := forks.ForkDigestFromEpoch(beaconCfg.CapellaForkEpoch, valRoot)
	require.NoError(t, err)
	denebDigest, err := forks.ForkDigestFromEpoch(beaconCfg.DenebForkEpoch, valRoot)
	require.NoError(t, err)
	electraDigest, err := forks.ForkDigestFromEpoch(beaconCfg.ElectraForkEpoch, valRoot)
	require.NoError(t, err)

	for forkVersion := version.Altair; forkVersion <= version.Electra; forkVersion++ {
		t.Run(version.String(forkVersion), func(t *testing.T) {
			// Build an optimistic update for this fork and make it the
			// store's latest one.
			lc := util.NewTestLightClient(t, forkVersion)
			wantUpdate, err := lightClient.NewLightClientOptimisticUpdateFromBeaconState(ctx, lc.State.Slot(), lc.State, lc.Block, lc.AttestedState, lc.AttestedBlock)
			require.NoError(t, err)
			svc.lcStore.SetLastOptimisticUpdate(wantUpdate)

			var received sync.WaitGroup
			received.Add(1)
			receiver.BHost.SetStreamHandler(protocolID, func(stream network.Stream) {
				defer received.Done()
				expectSuccess(t, stream)
				var gotSSZ []byte
				gotDigest, err := readContextFromStream(stream)
				require.NoError(t, err)
				require.Equal(t, 4, len(gotDigest))

				// Check the digest for this fork, then decode the payload and
				// re-marshal it for comparison. Bellatrix responses are
				// decoded into the Altair container and Electra responses
				// into the Deneb container.
				// NOTE(review): presumably the optimistic update layout is
				// unchanged across those forks — confirm against the protos.
				switch forkVersion {
				case version.Altair:
					require.DeepSSZEqual(t, altairDigest[:], gotDigest)
					msg := &pb.LightClientOptimisticUpdateAltair{}
					require.NoError(t, svc.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg))
					gotSSZ, err = msg.MarshalSSZ()
					require.NoError(t, err)
				case version.Bellatrix:
					require.DeepSSZEqual(t, bellatrixDigest[:], gotDigest)
					msg := &pb.LightClientOptimisticUpdateAltair{}
					require.NoError(t, svc.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg))
					gotSSZ, err = msg.MarshalSSZ()
					require.NoError(t, err)
				case version.Capella:
					require.DeepSSZEqual(t, capellaDigest[:], gotDigest)
					msg := &pb.LightClientOptimisticUpdateCapella{}
					require.NoError(t, svc.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg))
					gotSSZ, err = msg.MarshalSSZ()
					require.NoError(t, err)
				case version.Deneb:
					require.DeepSSZEqual(t, denebDigest[:], gotDigest)
					msg := &pb.LightClientOptimisticUpdateDeneb{}
					require.NoError(t, svc.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg))
					gotSSZ, err = msg.MarshalSSZ()
					require.NoError(t, err)
				case version.Electra:
					require.DeepSSZEqual(t, electraDigest[:], gotDigest)
					msg := &pb.LightClientOptimisticUpdateDeneb{}
					require.NoError(t, svc.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg))
					gotSSZ, err = msg.MarshalSSZ()
					require.NoError(t, err)
				default:
					t.Fatalf("unsupported version %d", forkVersion)
				}

				wantSSZ, err := wantUpdate.MarshalSSZ()
				require.NoError(t, err)
				require.DeepSSZEqual(t, gotSSZ, wantSSZ)
			})

			// Open the stream and invoke the handler under test directly; it
			// is given a nil request message.
			outbound, err := sender.BHost.NewStream(t.Context(), receiver.BHost.ID(), protocolID)
			require.NoError(t, err)
			require.NoError(t, svc.lightClientOptimisticUpdateRPCHandler(ctx, nil, outbound))

			if timedOut := util.WaitTimeout(&received, time.Second); timedOut {
				t.Fatal("Did not receive stream within 1 sec")
			}
		})
	}
}
// TestRPC_LightClientFinalityUpdate runs lightClientFinalityUpdateRPCHandler
// once for every fork version from Altair through Electra. Each subtest sets
// the store's last finality update, calls the handler with a stream opened
// from one test host to another, and lets the receiving host's stream handler
// check the success code, the 4-byte fork digest and the SSZ payload.
func TestRPC_LightClientFinalityUpdate(t *testing.T) {
	// Enable the light client flag; the returned reset function is deferred.
	defer features.InitWithReset(&features.Flags{
		EnableLightClient: true,
	})()

	ctx := t.Context()

	// Within this test, encodingHost is referenced directly only for its
	// Encoding(); the stream under test runs from sender to receiver.
	encodingHost := p2ptest.NewTestP2P(t)
	sender := p2ptest.NewTestP2P(t)
	receiver := p2ptest.NewTestP2P(t)
	sender.Connect(receiver)
	require.Equal(t, 1, len(sender.BHost.Network().Peers()), "Expected peers to be connected")

	chainSvc := &mockChain.ChainService{
		ValidatorsRoot: [32]byte{'A'},
		Genesis:        time.Unix(time.Now().Unix(), 0),
	}
	testDB := db.SetupDB(t)

	svc := Service{
		ctx: ctx,
		cfg: &config{
			p2p:           encodingHost,
			initialSync:   &mockSync.Sync{IsSyncing: false},
			chain:         chainSvc,
			beaconDB:      testDB,
			clock:         startup.NewClock(chainSvc.Genesis, chainSvc.ValidatorsRoot),
			stateNotifier: &mockChain.MockStateNotifier{},
		},
		chainStarted: abool.New(),
		lcStore:      &lightClient.Store{},
		subHandler:   newSubTopicHandler(),
		rateLimiter:  newRateLimiter(sender),
	}

	protocolID := protocol.ID(p2p.RPCLightClientFinalityUpdateTopicV1)
	// Register a leaky-bucket collector for this topic on the rate limiter.
	svc.rateLimiter.limiterMap[string(protocolID)] = leakybucket.NewCollector(10000, 10000, time.Second, false)

	// Expected fork digests, one per fork epoch, all derived from the mock
	// chain's validators root.
	beaconCfg := params.BeaconConfig()
	valRoot := chainSvc.ValidatorsRoot[:]
	altairDigest, err := forks.ForkDigestFromEpoch(beaconCfg.AltairForkEpoch, valRoot)
	require.NoError(t, err)
	bellatrixDigest, err := forks.ForkDigestFromEpoch(beaconCfg.BellatrixForkEpoch, valRoot)
	require.NoError(t, err)
	capellaDigest, err := forks.ForkDigestFromEpoch(beaconCfg.CapellaForkEpoch, valRoot)
	require.NoError(t, err)
	denebDigest, err := forks.ForkDigestFromEpoch(beaconCfg.DenebForkEpoch, valRoot)
	require.NoError(t, err)
	electraDigest, err := forks.ForkDigestFromEpoch(beaconCfg.ElectraForkEpoch, valRoot)
	require.NoError(t, err)

	for forkVersion := version.Altair; forkVersion <= version.Electra; forkVersion++ {
		t.Run(version.String(forkVersion), func(t *testing.T) {
			// Build a finality update for this fork and make it the store's
			// latest one.
			lc := util.NewTestLightClient(t, forkVersion)
			wantUpdate, err := lightClient.NewLightClientFinalityUpdateFromBeaconState(ctx, lc.State.Slot(), lc.State, lc.Block, lc.AttestedState, lc.AttestedBlock, lc.FinalizedBlock)
			require.NoError(t, err)
			svc.lcStore.SetLastFinalityUpdate(wantUpdate)

			var received sync.WaitGroup
			received.Add(1)
			receiver.BHost.SetStreamHandler(protocolID, func(stream network.Stream) {
				defer received.Done()
				expectSuccess(t, stream)
				var gotSSZ []byte
				gotDigest, err := readContextFromStream(stream)
				require.NoError(t, err)
				require.Equal(t, 4, len(gotDigest))

				// Check the digest for this fork, then decode the payload into
				// the matching container and re-marshal it for comparison.
				// Bellatrix responses are decoded into the Altair container.
				switch forkVersion {
				case version.Altair:
					require.DeepSSZEqual(t, altairDigest[:], gotDigest)
					msg := &pb.LightClientFinalityUpdateAltair{}
					require.NoError(t, svc.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg))
					gotSSZ, err = msg.MarshalSSZ()
					require.NoError(t, err)
				case version.Bellatrix:
					require.DeepSSZEqual(t, bellatrixDigest[:], gotDigest)
					msg := &pb.LightClientFinalityUpdateAltair{}
					require.NoError(t, svc.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg))
					gotSSZ, err = msg.MarshalSSZ()
					require.NoError(t, err)
				case version.Capella:
					require.DeepSSZEqual(t, capellaDigest[:], gotDigest)
					msg := &pb.LightClientFinalityUpdateCapella{}
					require.NoError(t, svc.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg))
					gotSSZ, err = msg.MarshalSSZ()
					require.NoError(t, err)
				case version.Deneb:
					require.DeepSSZEqual(t, denebDigest[:], gotDigest)
					msg := &pb.LightClientFinalityUpdateDeneb{}
					require.NoError(t, svc.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg))
					gotSSZ, err = msg.MarshalSSZ()
					require.NoError(t, err)
				case version.Electra:
					require.DeepSSZEqual(t, electraDigest[:], gotDigest)
					msg := &pb.LightClientFinalityUpdateElectra{}
					require.NoError(t, svc.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg))
					gotSSZ, err = msg.MarshalSSZ()
					require.NoError(t, err)
				default:
					t.Fatalf("unsupported version %d", forkVersion)
				}

				wantSSZ, err := wantUpdate.MarshalSSZ()
				require.NoError(t, err)
				require.DeepSSZEqual(t, gotSSZ, wantSSZ)
			})

			// Open the stream and invoke the handler under test directly; it
			// is given a nil request message.
			outbound, err := sender.BHost.NewStream(t.Context(), receiver.BHost.ID(), protocolID)
			require.NoError(t, err)
			require.NoError(t, svc.lightClientFinalityUpdateRPCHandler(ctx, nil, outbound))

			if timedOut := util.WaitTimeout(&received, time.Second); timedOut {
				t.Fatal("Did not receive stream within 1 sec")
			}
		})
	}
}
// TestRPC_LightClientUpdatesByRange runs lightClientUpdatesByRangeRPCHandler
// once for every fork version from Altair through Electra. Each subtest saves
// updateCount updates in the beacon DB under periods 0..updateCount-1,
// requests that whole range, and lets the receiving host's stream handler
// check every response chunk on the stream: success code, 4-byte fork digest
// and SSZ payload, compared against the update stored for that period.
//
// The stream handler is invoked once per stream, so it must read all chunks
// itself; reading a single chunk would leave all but the first update
// unverified.
func TestRPC_LightClientUpdatesByRange(t *testing.T) {
	// Number of consecutive periods, starting at 0, that are stored, requested
	// and expected back on the stream.
	const updateCount = 5

	// Enable the light client flag; the returned reset function is deferred.
	defer features.InitWithReset(&features.Flags{
		EnableLightClient: true,
	})()

	ctx := t.Context()

	// Within this test, encodingHost is referenced directly only for its
	// Encoding(); the stream under test runs from sender to receiver.
	encodingHost := p2ptest.NewTestP2P(t)
	sender := p2ptest.NewTestP2P(t)
	receiver := p2ptest.NewTestP2P(t)
	sender.Connect(receiver)
	require.Equal(t, 1, len(sender.BHost.Network().Peers()), "Expected peers to be connected")

	chainSvc := &mockChain.ChainService{
		ValidatorsRoot: [32]byte{'A'},
		Genesis:        time.Unix(time.Now().Unix(), 0),
	}
	testDB := db.SetupDB(t)

	svc := Service{
		ctx: ctx,
		cfg: &config{
			p2p:           encodingHost,
			initialSync:   &mockSync.Sync{IsSyncing: false},
			chain:         chainSvc,
			beaconDB:      testDB,
			clock:         startup.NewClock(chainSvc.Genesis, chainSvc.ValidatorsRoot),
			stateNotifier: &mockChain.MockStateNotifier{},
		},
		chainStarted: abool.New(),
		lcStore:      &lightClient.Store{},
		subHandler:   newSubTopicHandler(),
		rateLimiter:  newRateLimiter(sender),
	}

	protocolID := protocol.ID(p2p.RPCLightClientUpdatesByRangeTopicV1)
	// Register a leaky-bucket collector for this topic on the rate limiter.
	svc.rateLimiter.limiterMap[string(protocolID)] = leakybucket.NewCollector(10000, 10000, time.Second, false)

	// Expected fork digests, one per fork epoch, all derived from the mock
	// chain's validators root.
	beaconCfg := params.BeaconConfig()
	valRoot := chainSvc.ValidatorsRoot[:]
	altairDigest, err := forks.ForkDigestFromEpoch(beaconCfg.AltairForkEpoch, valRoot)
	require.NoError(t, err)
	bellatrixDigest, err := forks.ForkDigestFromEpoch(beaconCfg.BellatrixForkEpoch, valRoot)
	require.NoError(t, err)
	capellaDigest, err := forks.ForkDigestFromEpoch(beaconCfg.CapellaForkEpoch, valRoot)
	require.NoError(t, err)
	denebDigest, err := forks.ForkDigestFromEpoch(beaconCfg.DenebForkEpoch, valRoot)
	require.NoError(t, err)
	electraDigest, err := forks.ForkDigestFromEpoch(beaconCfg.ElectraForkEpoch, valRoot)
	require.NoError(t, err)

	for forkVersion := version.Altair; forkVersion <= version.Electra; forkVersion++ {
		t.Run(version.String(forkVersion), func(t *testing.T) {
			// Store one update per period; each uses a different attested
			// slot increase so the stored updates are not all built from the
			// same inputs.
			for period := uint64(0); period < updateCount; period++ {
				lc := util.NewTestLightClient(t, forkVersion, util.WithIncreasedAttestedSlot(period))
				update, err := lightClient.NewLightClientUpdateFromBeaconState(ctx, lc.State.Slot(), lc.State, lc.Block, lc.AttestedState, lc.AttestedBlock, lc.FinalizedBlock)
				require.NoError(t, err)
				require.NoError(t, svc.cfg.beaconDB.SaveLightClientUpdate(ctx, period, update))
			}

			// Load the expected updates once, on the test goroutine, instead
			// of querying the DB from inside the stream handler.
			wantUpdates, err := svc.cfg.beaconDB.LightClientUpdates(ctx, 0, updateCount-1)
			require.NoError(t, err)
			require.Equal(t, updateCount, len(wantUpdates))

			var received sync.WaitGroup
			received.Add(1)
			receiver.BHost.SetStreamHandler(protocolID, func(stream network.Stream) {
				defer received.Done()

				// Read one response chunk per requested period, in order.
				for period := uint64(0); period < updateCount; period++ {
					expectSuccess(t, stream)
					gotDigest, err := readContextFromStream(stream)
					require.NoError(t, err)
					require.Equal(t, 4, len(gotDigest))

					// Check the digest for this fork, then decode the payload
					// into the matching container and re-marshal it.
					// Bellatrix responses are decoded into the Altair
					// container.
					var gotSSZ []byte
					switch forkVersion {
					case version.Altair:
						require.DeepSSZEqual(t, altairDigest[:], gotDigest)
						msg := &pb.LightClientUpdateAltair{}
						require.NoError(t, svc.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg))
						gotSSZ, err = msg.MarshalSSZ()
						require.NoError(t, err)
					case version.Bellatrix:
						require.DeepSSZEqual(t, bellatrixDigest[:], gotDigest)
						msg := &pb.LightClientUpdateAltair{}
						require.NoError(t, svc.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg))
						gotSSZ, err = msg.MarshalSSZ()
						require.NoError(t, err)
					case version.Capella:
						require.DeepSSZEqual(t, capellaDigest[:], gotDigest)
						msg := &pb.LightClientUpdateCapella{}
						require.NoError(t, svc.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg))
						gotSSZ, err = msg.MarshalSSZ()
						require.NoError(t, err)
					case version.Deneb:
						require.DeepSSZEqual(t, denebDigest[:], gotDigest)
						msg := &pb.LightClientUpdateDeneb{}
						require.NoError(t, svc.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg))
						gotSSZ, err = msg.MarshalSSZ()
						require.NoError(t, err)
					case version.Electra:
						require.DeepSSZEqual(t, electraDigest[:], gotDigest)
						msg := &pb.LightClientUpdateElectra{}
						require.NoError(t, svc.cfg.p2p.Encoding().DecodeWithMaxLength(stream, msg))
						gotSSZ, err = msg.MarshalSSZ()
						require.NoError(t, err)
					default:
						t.Fatalf("unsupported version %d", forkVersion)
					}

					wantSSZ, err := wantUpdates[period].MarshalSSZ()
					require.NoError(t, err)
					require.DeepSSZEqual(t, gotSSZ, wantSSZ)
				}
			})

			// Open the stream and invoke the handler under test directly with
			// a request covering every stored period.
			outbound, err := sender.BHost.NewStream(t.Context(), receiver.BHost.ID(), protocolID)
			require.NoError(t, err)
			req := &pb.LightClientUpdatesByRangeRequest{
				StartPeriod: 0,
				Count:       updateCount,
			}
			require.NoError(t, svc.lightClientUpdatesByRangeRPCHandler(ctx, req, outbound))

			if timedOut := util.WaitTimeout(&received, time.Second); timedOut {
				t.Fatal("Did not receive stream within 1 sec")
			}
		})
	}
}