From 5bbcfe5237550422b42b170aae5ddd24fced2d7f Mon Sep 17 00:00:00 2001 From: Bastin <43618253+Inspector-Butters@users.noreply.github.com> Date: Mon, 19 May 2025 17:15:13 +0200 Subject: [PATCH] Enable Light Client req/resp domain (#15281) * add rpc toppic mappings and types * placeholder funcs * bootstrap write chunk * add rpc toppic mappings and types * placeholder funcs * bootstrap write chunk * deps * add messageMapping entries * add handlers and register RPC * deps * tests * read context in tests * add tests * add flag and changelog entry * fix linter * deps * increase topic count in ratelimiter test * handle flag --- beacon-chain/p2p/BUILD.bazel | 1 + beacon-chain/p2p/rpc_topic_mappings.go | 48 +- beacon-chain/sync/BUILD.bazel | 4 + beacon-chain/sync/rate_limiter.go | 6 + beacon-chain/sync/rate_limiter_test.go | 2 +- beacon-chain/sync/rpc.go | 20 +- beacon-chain/sync/rpc_chunked_response.go | 77 +++ beacon-chain/sync/rpc_light_client.go | 222 +++++++++ beacon-chain/sync/rpc_light_client_test.go | 521 +++++++++++++++++++++ changelog/bastin_add-lc-req-resp.md | 3 + proto/prysm/v1alpha1/p2p_messages.pb.go | 100 +++- proto/prysm/v1alpha1/p2p_messages.proto | 24 +- proto/testing/test.pb.go | 4 +- 13 files changed, 998 insertions(+), 34 deletions(-) create mode 100644 beacon-chain/sync/rpc_light_client.go create mode 100644 beacon-chain/sync/rpc_light_client_test.go create mode 100644 changelog/bastin_add-lc-req-resp.md diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel index 05bc42e1f4..6bc5619f87 100644 --- a/beacon-chain/p2p/BUILD.bazel +++ b/beacon-chain/p2p/BUILD.bazel @@ -55,6 +55,7 @@ go_library( "//beacon-chain/startup:go_default_library", "//cmd/beacon-chain/flags:go_default_library", "//config/features:go_default_library", + "//config/fieldparams:go_default_library", "//config/params:go_default_library", "//consensus-types/interfaces:go_default_library", "//consensus-types/primitives:go_default_library", diff --git a/beacon-chain/p2p/rpc_topic_mappings.go b/beacon-chain/p2p/rpc_topic_mappings.go index e19d3faba2..fc35fc2d2b 100644 --- a/beacon-chain/p2p/rpc_topic_mappings.go +++ b/beacon-chain/p2p/rpc_topic_mappings.go @@ -4,6 +4,7 @@ import ( "reflect" p2ptypes "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" pb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" @@ -43,6 +44,18 @@ const BlobSidecarsByRangeName = "/blob_sidecars_by_range" // BlobSidecarsByRootName is the name for the BlobSidecarsByRoot v1 message topic. const BlobSidecarsByRootName = "/blob_sidecars_by_root" +// LightClientBootstrapName is the name for the LightClientBootstrap message topic, +const LightClientBootstrapName = "/light_client_bootstrap" + +// LightClientUpdatesByRangeName is the name for the LightClientUpdatesByRange topic. +const LightClientUpdatesByRangeName = "/light_client_updates_by_range" + +// LightClientFinalityUpdateName is the name for the LightClientFinalityUpdate topic. +const LightClientFinalityUpdateName = "/light_client_finality_update" + +// LightClientOptimisticUpdateName is the name for the LightClientOptimisticUpdate topic. +const LightClientOptimisticUpdateName = "/light_client_optimistic_update" + const ( // V1 RPC Topics // RPCStatusTopicV1 defines the v1 topic for the status rpc method. @@ -66,6 +79,15 @@ const ( // /eth2/beacon_chain/req/blob_sidecars_by_root/1/ RPCBlobSidecarsByRootTopicV1 = protocolPrefix + BlobSidecarsByRootName + SchemaVersionV1 + // RPCLightClientBootstrapTopicV1 is a topic for requesting a light client bootstrap. + RPCLightClientBootstrapTopicV1 = protocolPrefix + LightClientBootstrapName + SchemaVersionV1 + // RPCLightClientUpdatesByRangeTopicV1 is a topic for requesting light client updates by range. + RPCLightClientUpdatesByRangeTopicV1 = protocolPrefix + LightClientUpdatesByRangeName + SchemaVersionV1 + // RPCLightClientFinalityUpdateTopicV1 is a topic for requesting a light client finality update. + RPCLightClientFinalityUpdateTopicV1 = protocolPrefix + LightClientFinalityUpdateName + SchemaVersionV1 + // RPCLightClientOptimisticUpdateTopicV1 is a topic for requesting a light client Optimistic update. + RPCLightClientOptimisticUpdateTopicV1 = protocolPrefix + LightClientOptimisticUpdateName + SchemaVersionV1 + // V2 RPC Topics // RPCBlocksByRangeTopicV2 defines v2 the topic for the blocks by range rpc method. RPCBlocksByRangeTopicV2 = protocolPrefix + BeaconBlocksByRangeMessageName + SchemaVersionV2 @@ -101,6 +123,12 @@ var RPCTopicMappings = map[string]interface{}{ RPCBlobSidecarsByRangeTopicV1: new(pb.BlobSidecarsByRangeRequest), // BlobSidecarsByRoot v1 Message RPCBlobSidecarsByRootTopicV1: new(p2ptypes.BlobSidecarsByRootReq), + + // Light client + RPCLightClientBootstrapTopicV1: new([fieldparams.RootLength]byte), + RPCLightClientUpdatesByRangeTopicV1: new(pb.LightClientUpdatesByRangeRequest), + RPCLightClientFinalityUpdateTopicV1: new(interface{}), + RPCLightClientOptimisticUpdateTopicV1: new(interface{}), } // Maps all registered protocol prefixes. @@ -111,14 +139,18 @@ var protocolMapping = map[string]bool{ // Maps all the protocol message names for the different rpc // topics. var messageMapping = map[string]bool{ - StatusMessageName: true, - GoodbyeMessageName: true, - BeaconBlocksByRangeMessageName: true, - BeaconBlocksByRootsMessageName: true, - PingMessageName: true, - MetadataMessageName: true, - BlobSidecarsByRangeName: true, - BlobSidecarsByRootName: true, + StatusMessageName: true, + GoodbyeMessageName: true, + BeaconBlocksByRangeMessageName: true, + BeaconBlocksByRootsMessageName: true, + PingMessageName: true, + MetadataMessageName: true, + BlobSidecarsByRangeName: true, + BlobSidecarsByRootName: true, + LightClientBootstrapName: true, + LightClientUpdatesByRangeName: true, + LightClientFinalityUpdateName: true, + LightClientOptimisticUpdateName: true, } // Maps all the RPC messages which are to updated in altair. diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 0131a474e8..e4966bc1ff 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -26,6 +26,7 @@ go_library( "rpc_blob_sidecars_by_root.go", "rpc_chunked_response.go", "rpc_goodbye.go", + "rpc_light_client.go", "rpc_metadata.go", "rpc_ping.go", "rpc_send_request.go", @@ -114,6 +115,7 @@ go_library( "//encoding/bytesutil:go_default_library", "//encoding/ssz/equality:go_default_library", "//io/file:go_default_library", + "//math:go_default_library", "//monitoring/tracing:go_default_library", "//monitoring/tracing/trace:go_default_library", "//network/forks:go_default_library", @@ -168,6 +170,7 @@ go_test( "rpc_blob_sidecars_by_root_test.go", "rpc_goodbye_test.go", "rpc_handler_test.go", + "rpc_light_client_test.go", "rpc_metadata_test.go", "rpc_ping_test.go", "rpc_send_request_test.go", @@ -231,6 +234,7 @@ go_test( "//beacon-chain/verification:go_default_library", "//cache/lru:go_default_library", "//cmd/beacon-chain/flags:go_default_library", + "//config/features:go_default_library", "//config/fieldparams:go_default_library", "//config/params:go_default_library", "//consensus-types/blocks:go_default_library", diff --git a/beacon-chain/sync/rate_limiter.go b/beacon-chain/sync/rate_limiter.go index c026ac574e..685c9d5eb4 100644 --- a/beacon-chain/sync/rate_limiter.go +++ b/beacon-chain/sync/rate_limiter.go @@ -80,6 +80,12 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter { // BlobSidecarsByRangeV1 topicMap[addEncoding(p2p.RPCBlobSidecarsByRangeTopicV1)] = blobCollector + // Light client requests + topicMap[addEncoding(p2p.RPCLightClientBootstrapTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */) + topicMap[addEncoding(p2p.RPCLightClientUpdatesByRangeTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */) + topicMap[addEncoding(p2p.RPCLightClientOptimisticUpdateTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */) + topicMap[addEncoding(p2p.RPCLightClientFinalityUpdateTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, leakyBucketPeriod, false /* deleteEmptyBuckets */) + // General topic for all rpc requests. topicMap[rpcLimiterTopic] = leakybucket.NewCollector(5, defaultBurstLimit*2, leakyBucketPeriod, false /* deleteEmptyBuckets */) diff --git a/beacon-chain/sync/rate_limiter_test.go b/beacon-chain/sync/rate_limiter_test.go index 88d9bb5ab2..434b5dd7aa 100644 --- a/beacon-chain/sync/rate_limiter_test.go +++ b/beacon-chain/sync/rate_limiter_test.go @@ -18,7 +18,7 @@ import ( func TestNewRateLimiter(t *testing.T) { rlimiter := newRateLimiter(mockp2p.NewTestP2P(t)) - assert.Equal(t, len(rlimiter.limiterMap), 12, "correct number of topics not registered") + assert.Equal(t, len(rlimiter.limiterMap), 16, "correct number of topics not registered") } func TestNewRateLimiter_FreeCorrectly(t *testing.T) { diff --git a/beacon-chain/sync/rpc.go b/beacon-chain/sync/rpc.go index 75b369f32b..a25125bd80 100644 --- a/beacon-chain/sync/rpc.go +++ b/beacon-chain/sync/rpc.go @@ -9,6 +9,7 @@ import ( "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p" p2ptypes "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types" + "github.com/OffchainLabs/prysm/v6/config/features" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" "github.com/OffchainLabs/prysm/v6/monitoring/tracing" @@ -70,14 +71,23 @@ func (s *Service) rpcHandlerByTopicFromFork(forkIndex int) (map[string]rpcHandle // Bellatrix: https://github.com/ethereum/consensus-specs/blob/dev/specs/bellatrix/p2p-interface.md#messages // Altair: https://github.com/ethereum/consensus-specs/blob/dev/specs/altair/p2p-interface.md#messages if forkIndex >= version.Altair { - return map[string]rpcHandler{ + handler := map[string]rpcHandler{ p2p.RPCStatusTopicV1: s.statusRPCHandler, p2p.RPCGoodByeTopicV1: s.goodbyeRPCHandler, p2p.RPCBlocksByRangeTopicV2: s.beaconBlocksByRangeRPCHandler, // Updated in Altair and modified in Capella p2p.RPCBlocksByRootTopicV2: s.beaconBlocksRootRPCHandler, // Updated in Altair and modified in Capella p2p.RPCPingTopicV1: s.pingHandler, p2p.RPCMetaDataTopicV2: s.metaDataHandler, // Updated in Altair - }, nil + } + + if features.Get().EnableLightClient { + handler[p2p.RPCLightClientBootstrapTopicV1] = s.lightClientBootstrapRPCHandler + handler[p2p.RPCLightClientUpdatesByRangeTopicV1] = s.lightClientUpdatesByRangeRPCHandler + handler[p2p.RPCLightClientFinalityUpdateTopicV1] = s.lightClientFinalityUpdateRPCHandler + handler[p2p.RPCLightClientOptimisticUpdateTopicV1] = s.lightClientOptimisticUpdateRPCHandler + } + + return handler, nil } // PhaseO: https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/p2p-interface.md#messages @@ -246,9 +256,11 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) { // Increment message received counter. messageReceivedCounter.WithLabelValues(topic).Inc() - // since metadata requests do not have any data in the payload, we + // since some requests do not have any data in the payload, we // do not decode anything. - if baseTopic == p2p.RPCMetaDataTopicV1 || baseTopic == p2p.RPCMetaDataTopicV2 { + if baseTopic == p2p.RPCMetaDataTopicV1 || baseTopic == p2p.RPCMetaDataTopicV2 || + baseTopic == p2p.RPCLightClientOptimisticUpdateTopicV1 || + baseTopic == p2p.RPCLightClientFinalityUpdateTopicV1 { if err := handle(ctx, base, stream); err != nil { messageFailedProcessingCounter.WithLabelValues(topic).Inc() if !errors.Is(err, p2ptypes.ErrWrongForkDigestVersion) { diff --git a/beacon-chain/sync/rpc_chunked_response.go b/beacon-chain/sync/rpc_chunked_response.go index 3d595d64c9..49d16593a0 100644 --- a/beacon-chain/sync/rpc_chunked_response.go +++ b/beacon-chain/sync/rpc_chunked_response.go @@ -161,3 +161,80 @@ func WriteBlobSidecarChunk(stream libp2pcore.Stream, tor blockchain.TemporalOrac _, err = encoding.EncodeWithMaxLength(stream, sidecar) return err } + +func WriteLightClientBootstrapChunk(stream libp2pcore.Stream, tor blockchain.TemporalOracle, encoding encoder.NetworkEncoding, bootstrap interfaces.LightClientBootstrap) error { + if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { + return err + } + + valRoot := tor.GenesisValidatorsRoot() + digest, err := forks.ForkDigestFromEpoch(slots.ToEpoch(bootstrap.Header().Beacon().Slot), valRoot[:]) + if err != nil { + return err + } + + obtainedCtx := digest[:] + if err = writeContextToStream(obtainedCtx, stream); err != nil { + return err + } + + _, err = encoding.EncodeWithMaxLength(stream, bootstrap) + return err +} + +func WriteLightClientUpdateChunk(stream libp2pcore.Stream, tor blockchain.TemporalOracle, encoding encoder.NetworkEncoding, update interfaces.LightClientUpdate) error { + if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { + return err + } + + valRoot := tor.GenesisValidatorsRoot() + digest, err := forks.ForkDigestFromEpoch(slots.ToEpoch(update.AttestedHeader().Beacon().Slot), valRoot[:]) + if err != nil { + return err + } + obtainedCtx := digest[:] + + if err = writeContextToStream(obtainedCtx, stream); err != nil { + return err + } + _, err = encoding.EncodeWithMaxLength(stream, update) + return err +} + +func WriteLightClientOptimisticUpdateChunk(stream libp2pcore.Stream, tor blockchain.TemporalOracle, encoding encoder.NetworkEncoding, update interfaces.LightClientOptimisticUpdate) error { + if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { + return err + } + + valRoot := tor.GenesisValidatorsRoot() + digest, err := forks.ForkDigestFromEpoch(slots.ToEpoch(update.AttestedHeader().Beacon().Slot), valRoot[:]) + if err != nil { + return err + } + obtainedCtx := digest[:] + + if err = writeContextToStream(obtainedCtx, stream); err != nil { + return err + } + _, err = encoding.EncodeWithMaxLength(stream, update) + return err +} + +func WriteLightClientFinalityUpdateChunk(stream libp2pcore.Stream, tor blockchain.TemporalOracle, encoding encoder.NetworkEncoding, update interfaces.LightClientFinalityUpdate) error { + if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil { + return err + } + + valRoot := tor.GenesisValidatorsRoot() + digest, err := forks.ForkDigestFromEpoch(slots.ToEpoch(update.AttestedHeader().Beacon().Slot), valRoot[:]) + if err != nil { + return err + } + obtainedCtx := digest[:] + + if err = writeContextToStream(obtainedCtx, stream); err != nil { + return err + } + _, err = encoding.EncodeWithMaxLength(stream, update) + return err +} diff --git a/beacon-chain/sync/rpc_light_client.go b/beacon-chain/sync/rpc_light_client.go new file mode 100644 index 0000000000..f63a1044f2 --- /dev/null +++ b/beacon-chain/sync/rpc_light_client.go @@ -0,0 +1,222 @@ +package sync + +import ( + "context" + "fmt" + + "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p" + "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/types" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" + "github.com/OffchainLabs/prysm/v6/config/params" + "github.com/OffchainLabs/prysm/v6/math" + "github.com/OffchainLabs/prysm/v6/monitoring/tracing" + "github.com/OffchainLabs/prysm/v6/monitoring/tracing/trace" + eth "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" + libp2pcore "github.com/libp2p/go-libp2p/core" +) + +// lightClientBootstrapRPCHandler handles the /eth2/beacon_chain/req/light_client_bootstrap/1/ RPC request. +func (s *Service) lightClientBootstrapRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { + ctx, span := trace.StartSpan(ctx, "sync.lightClientBootstrapRPCHandler") + defer span.End() + ctx, cancel := context.WithTimeout(ctx, ttfbTimeout) + defer cancel() + + logger := log.WithField("handler", p2p.LightClientBootstrapName[1:]) + + SetRPCStreamDeadlines(stream) + if err := s.rateLimiter.validateRequest(stream, 1); err != nil { + logger.WithError(err).Error("s.rateLimiter.validateRequest") + return err + } + s.rateLimiter.add(stream, 1) + + rawMsg, ok := msg.(*[fieldparams.RootLength]byte) + if !ok { + logger.Error("Message is not *types.LightClientBootstrapReq") + return fmt.Errorf("message is not type %T", &[fieldparams.RootLength]byte{}) + } + blkRoot := *rawMsg + + bootstrap, err := s.cfg.beaconDB.LightClientBootstrap(ctx, blkRoot[:]) + if err != nil { + s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) + tracing.AnnotateError(span, err) + logger.WithError(err).Error("s.cfg.beaconDB.LightClientBootstrap") + return err + } + if bootstrap == nil { + s.writeErrorResponseToStream(responseCodeResourceUnavailable, types.ErrResourceUnavailable.Error(), stream) + tracing.AnnotateError(span, err) + logger.WithError(err).Error(fmt.Sprintf("nil bootstrap for root %#x", blkRoot)) + return err + } + + SetStreamWriteDeadline(stream, defaultWriteDuration) + if err = WriteLightClientBootstrapChunk(stream, s.cfg.clock, s.cfg.p2p.Encoding(), bootstrap); err != nil { + s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) + tracing.AnnotateError(span, err) + logger.WithError(err).Error("WriteLightClientBootstrapChunk") + return err + } + + logger.Info("lightClientBootstrapRPCHandler completed") + + closeStream(stream, logger) + return nil +} + +// lightClientUpdatesByRangeRPCHandler handles the /eth2/beacon_chain/req/light_client_updates_by_range/1/ RPC request. +func (s *Service) lightClientUpdatesByRangeRPCHandler(ctx context.Context, msg interface{}, stream libp2pcore.Stream) error { + ctx, span := trace.StartSpan(ctx, "sync.lightClientUpdatesByRangeRPCHandler") + defer span.End() + ctx, cancel := context.WithTimeout(ctx, ttfbTimeout) + defer cancel() + + logger := log.WithField("handler", p2p.LightClientUpdatesByRangeName[1:]) + + SetRPCStreamDeadlines(stream) + if err := s.rateLimiter.validateRequest(stream, 1); err != nil { + logger.WithError(err).Error("s.rateLimiter.validateRequest") + return err + } + s.rateLimiter.add(stream, 1) + + r, ok := msg.(*eth.LightClientUpdatesByRangeRequest) + if !ok { + logger.Error("Message is not *eth.LightClientUpdatesByRangeReq") + return fmt.Errorf("message is not type %T", ð.LightClientUpdatesByRangeRequest{}) + } + + if r.Count == 0 { + s.writeErrorResponseToStream(responseCodeInvalidRequest, "count is 0", stream) + s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer()) + logger.Error("Count is 0") + return nil + } + + if r.Count > params.BeaconConfig().MaxRequestLightClientUpdates { + r.Count = params.BeaconConfig().MaxRequestLightClientUpdates + } + + endPeriod, err := math.Add64(r.StartPeriod, r.Count-1) + if err != nil { + s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream) + s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer()) + tracing.AnnotateError(span, err) + logger.WithError(err).Error("End period overflows") + return err + } + + logger.Infof("LC: requesting updates by range (StartPeriod: %d, EndPeriod: %d)", r.StartPeriod, r.StartPeriod+r.Count-1) + + updates, err := s.cfg.beaconDB.LightClientUpdates(ctx, r.StartPeriod, endPeriod) + if err != nil { + s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) + tracing.AnnotateError(span, err) + logger.WithError(err).Error("s.cfg.beaconDB.LightClientUpdates") + return err + } + + if len(updates) == 0 { + s.writeErrorResponseToStream(responseCodeResourceUnavailable, types.ErrResourceUnavailable.Error(), stream) + tracing.AnnotateError(span, err) + logger.Debugf("No update available for start period %d", r.StartPeriod) + return nil + } + + for i := r.StartPeriod; i <= endPeriod; i++ { + u, ok := updates[i] + if !ok { + break + } + + SetStreamWriteDeadline(stream, defaultWriteDuration) + if err = WriteLightClientUpdateChunk(stream, s.cfg.clock, s.cfg.p2p.Encoding(), u); err != nil { + s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) + tracing.AnnotateError(span, err) + logger.WithError(err).Error("WriteLightClientUpdateChunk") + return err + } + s.rateLimiter.add(stream, 1) + } + + logger.Info("lightClientUpdatesByRangeRPCHandler completed") + + closeStream(stream, logger) + return nil +} + +// lightClientFinalityUpdateRPCHandler handles the /eth2/beacon_chain/req/light_client_finality_update/1/ RPC request. +func (s *Service) lightClientFinalityUpdateRPCHandler(ctx context.Context, _ interface{}, stream libp2pcore.Stream) error { + ctx, span := trace.StartSpan(ctx, "sync.lightClientFinalityUpdateRPCHandler") + defer span.End() + _, cancel := context.WithTimeout(ctx, ttfbTimeout) + defer cancel() + + logger := log.WithField("handler", p2p.LightClientFinalityUpdateName[1:]) + + SetRPCStreamDeadlines(stream) + if err := s.rateLimiter.validateRequest(stream, 1); err != nil { + logger.WithError(err).Error("s.rateLimiter.validateRequest") + return err + } + s.rateLimiter.add(stream, 1) + + if s.lcStore.LastFinalityUpdate() == nil { + s.writeErrorResponseToStream(responseCodeResourceUnavailable, types.ErrResourceUnavailable.Error(), stream) + logger.Error("No finality update available") + return nil + } + + SetStreamWriteDeadline(stream, defaultWriteDuration) + if err := WriteLightClientFinalityUpdateChunk(stream, s.cfg.clock, s.cfg.p2p.Encoding(), s.lcStore.LastFinalityUpdate()); err != nil { + s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) + tracing.AnnotateError(span, err) + logger.WithError(err).Error("WriteLightClientFinalityUpdateChunk") + return err + } + + logger.Info("lightClientFinalityUpdateRPCHandler completed") + + closeStream(stream, logger) + return nil +} + +// lightClientOptimisticUpdateRPCHandler handles the /eth2/beacon_chain/req/light_client_optimistic_update/1/ RPC request. +func (s *Service) lightClientOptimisticUpdateRPCHandler(ctx context.Context, _ interface{}, stream libp2pcore.Stream) error { + ctx, span := trace.StartSpan(ctx, "sync.lightClientOptimisticUpdateRPCHandler") + defer span.End() + _, cancel := context.WithTimeout(ctx, ttfbTimeout) + defer cancel() + + logger := log.WithField("handler", p2p.LightClientOptimisticUpdateName[1:]) + + logger.Info("lightClientOptimisticUpdateRPCHandler invoked") + + SetRPCStreamDeadlines(stream) + if err := s.rateLimiter.validateRequest(stream, 1); err != nil { + logger.WithError(err).Error("s.rateLimiter.validateRequest") + return err + } + s.rateLimiter.add(stream, 1) + + if s.lcStore.LastOptimisticUpdate() == nil { + s.writeErrorResponseToStream(responseCodeResourceUnavailable, types.ErrResourceUnavailable.Error(), stream) + logger.Error("No optimistic update available") + return nil + } + + SetStreamWriteDeadline(stream, defaultWriteDuration) + if err := WriteLightClientOptimisticUpdateChunk(stream, s.cfg.clock, s.cfg.p2p.Encoding(), s.lcStore.LastOptimisticUpdate()); err != nil { + s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) + tracing.AnnotateError(span, err) + logger.WithError(err).Error("WriteLightClientOptimisticUpdateChunk") + return err + } + + logger.Info("lightClientOptimisticUpdateRPCHandler completed") + + closeStream(stream, logger) + return nil +} diff --git a/beacon-chain/sync/rpc_light_client_test.go b/beacon-chain/sync/rpc_light_client_test.go new file mode 100644 index 0000000000..6e5922bf72 --- /dev/null +++ b/beacon-chain/sync/rpc_light_client_test.go @@ -0,0 +1,521 @@ +package sync + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/OffchainLabs/prysm/v6/async/abool" + mockChain "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/testing" + lightClient "github.com/OffchainLabs/prysm/v6/beacon-chain/core/light-client" + db "github.com/OffchainLabs/prysm/v6/beacon-chain/db/testing" + "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p" + p2ptest "github.com/OffchainLabs/prysm/v6/beacon-chain/p2p/testing" + "github.com/OffchainLabs/prysm/v6/beacon-chain/startup" + mockSync "github.com/OffchainLabs/prysm/v6/beacon-chain/sync/initial-sync/testing" + "github.com/OffchainLabs/prysm/v6/config/features" + "github.com/OffchainLabs/prysm/v6/config/params" + leakybucket "github.com/OffchainLabs/prysm/v6/container/leaky-bucket" + "github.com/OffchainLabs/prysm/v6/network/forks" + pb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" + "github.com/OffchainLabs/prysm/v6/runtime/version" + "github.com/OffchainLabs/prysm/v6/testing/require" + "github.com/OffchainLabs/prysm/v6/testing/util" + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/protocol" +) + +func TestRPC_LightClientBootstrap(t *testing.T) { + resetFn := features.InitWithReset(&features.Flags{ + EnableLightClient: true, + }) + defer resetFn() + + ctx := context.Background() + p2pService := p2ptest.NewTestP2P(t) + p1 := p2ptest.NewTestP2P(t) + p2 := p2ptest.NewTestP2P(t) + p1.Connect(p2) + require.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") + + chainService := &mockChain.ChainService{ + ValidatorsRoot: [32]byte{'A'}, + Genesis: time.Unix(time.Now().Unix(), 0), + } + d := db.SetupDB(t) + r := Service{ + ctx: ctx, + cfg: &config{ + p2p: p2pService, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + beaconDB: d, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + stateNotifier: &mockChain.MockStateNotifier{}, + }, + chainStarted: abool.New(), + lcStore: &lightClient.Store{}, + subHandler: newSubTopicHandler(), + rateLimiter: newRateLimiter(p1), + } + pcl := protocol.ID(p2p.RPCLightClientBootstrapTopicV1) + topic := string(pcl) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, time.Second, false) + + altairDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().AltairForkEpoch, chainService.ValidatorsRoot[:]) + require.NoError(t, err) + bellatrixDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().BellatrixForkEpoch, chainService.ValidatorsRoot[:]) + require.NoError(t, err) + capellaDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().CapellaForkEpoch, chainService.ValidatorsRoot[:]) + require.NoError(t, err) + denebDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().DenebForkEpoch, chainService.ValidatorsRoot[:]) + require.NoError(t, err) + electraDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().ElectraForkEpoch, chainService.ValidatorsRoot[:]) + require.NoError(t, err) + + for i := 1; i <= 5; i++ { + t.Run(version.String(i), func(t *testing.T) { + l := util.NewTestLightClient(t, i) + bootstrap, err := lightClient.NewLightClientBootstrapFromBeaconState(ctx, l.State.Slot(), l.State, l.Block) + require.NoError(t, err) + blockRoot, err := l.Block.Block().HashTreeRoot() + require.NoError(t, err) + + require.NoError(t, r.cfg.beaconDB.SaveLightClientBootstrap(ctx, blockRoot[:], bootstrap)) + + var wg sync.WaitGroup + wg.Add(1) + p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { + defer wg.Done() + expectSuccess(t, stream) + rpcCtx, err := readContextFromStream(stream) + require.NoError(t, err) + require.Equal(t, 4, len(rpcCtx)) + + var resSSZ []byte + + switch i { + case version.Altair: + require.DeepSSZEqual(t, altairDigest[:], rpcCtx) + var res pb.LightClientBootstrapAltair + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, &res)) + resSSZ, err = res.MarshalSSZ() + require.NoError(t, err) + case version.Bellatrix: + require.DeepSSZEqual(t, bellatrixDigest[:], rpcCtx) + var res pb.LightClientBootstrapAltair + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, &res)) + resSSZ, err = res.MarshalSSZ() + require.NoError(t, err) + case version.Capella: + require.DeepSSZEqual(t, capellaDigest[:], rpcCtx) + var res pb.LightClientBootstrapCapella + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, &res)) + resSSZ, err = res.MarshalSSZ() + require.NoError(t, err) + case version.Deneb: + require.DeepSSZEqual(t, denebDigest[:], rpcCtx) + var res pb.LightClientBootstrapDeneb + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, &res)) + resSSZ, err = res.MarshalSSZ() + require.NoError(t, err) + case version.Electra: + require.DeepSSZEqual(t, electraDigest[:], rpcCtx) + var res pb.LightClientBootstrapElectra + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, &res)) + resSSZ, err = res.MarshalSSZ() + require.NoError(t, err) + default: + t.Fatalf("unsupported version %d", i) + } + + bootstrapSSZ, err := bootstrap.MarshalSSZ() + require.NoError(t, err) + require.DeepSSZEqual(t, resSSZ, bootstrapSSZ) + }) + + stream1, err := p1.BHost.NewStream(context.Background(), p2.BHost.ID(), pcl) + require.NoError(t, err) + err = r.lightClientBootstrapRPCHandler(ctx, &blockRoot, stream1) + require.NoError(t, err) + + if util.WaitTimeout(&wg, 1*time.Second) { + t.Fatal("Did not receive stream within 1 sec") + } + }) + } + +} + +func TestRPC_LightClientOptimisticUpdate(t *testing.T) { + resetFn := features.InitWithReset(&features.Flags{ + EnableLightClient: true, + }) + defer resetFn() + + ctx := context.Background() + p2pService := p2ptest.NewTestP2P(t) + p1 := p2ptest.NewTestP2P(t) + p2 := p2ptest.NewTestP2P(t) + p1.Connect(p2) + require.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") + + chainService := &mockChain.ChainService{ + ValidatorsRoot: [32]byte{'A'}, + Genesis: time.Unix(time.Now().Unix(), 0), + } + d := db.SetupDB(t) + r := Service{ + ctx: ctx, + cfg: &config{ + p2p: p2pService, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + beaconDB: d, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + stateNotifier: &mockChain.MockStateNotifier{}, + }, + chainStarted: abool.New(), + lcStore: &lightClient.Store{}, + subHandler: newSubTopicHandler(), + rateLimiter: newRateLimiter(p1), + } + pcl := protocol.ID(p2p.RPCLightClientOptimisticUpdateTopicV1) + topic := string(pcl) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, time.Second, false) + + altairDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().AltairForkEpoch, chainService.ValidatorsRoot[:]) + require.NoError(t, err) + bellatrixDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().BellatrixForkEpoch, chainService.ValidatorsRoot[:]) + require.NoError(t, err) + capellaDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().CapellaForkEpoch, chainService.ValidatorsRoot[:]) + require.NoError(t, err) + denebDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().DenebForkEpoch, chainService.ValidatorsRoot[:]) + require.NoError(t, err) + electraDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().ElectraForkEpoch, chainService.ValidatorsRoot[:]) + require.NoError(t, err) + + for i := 1; i <= 5; i++ { + t.Run(version.String(i), func(t *testing.T) { + l := util.NewTestLightClient(t, i) + + update, err := lightClient.NewLightClientOptimisticUpdateFromBeaconState(ctx, l.State.Slot(), l.State, l.Block, l.AttestedState, l.AttestedBlock) + require.NoError(t, err) + + r.lcStore.SetLastOptimisticUpdate(update) + + var wg sync.WaitGroup + wg.Add(1) + p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { + defer wg.Done() + expectSuccess(t, stream) + var resSSZ []byte + + rpcCtx, err := readContextFromStream(stream) + require.NoError(t, err) + require.Equal(t, 4, len(rpcCtx)) + + switch i { + case version.Altair: + require.DeepSSZEqual(t, altairDigest[:], rpcCtx) + var res pb.LightClientOptimisticUpdateAltair + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, &res)) + resSSZ, err = res.MarshalSSZ() + require.NoError(t, err) + case version.Bellatrix: + require.DeepSSZEqual(t, bellatrixDigest[:], rpcCtx) + var res pb.LightClientOptimisticUpdateAltair + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, &res)) + resSSZ, err = res.MarshalSSZ() + require.NoError(t, err) + case version.Capella: + require.DeepSSZEqual(t, capellaDigest[:], rpcCtx) + var res pb.LightClientOptimisticUpdateCapella + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, &res)) + resSSZ, err = res.MarshalSSZ() + require.NoError(t, err) + case version.Deneb: + require.DeepSSZEqual(t, denebDigest[:], rpcCtx) + var res pb.LightClientOptimisticUpdateDeneb + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, &res)) + resSSZ, err = res.MarshalSSZ() + require.NoError(t, err) + case version.Electra: + require.DeepSSZEqual(t, electraDigest[:], rpcCtx) + var res pb.LightClientOptimisticUpdateDeneb + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, &res)) + resSSZ, err = res.MarshalSSZ() + require.NoError(t, err) + default: + t.Fatalf("unsupported version %d", i) + } + + updateSSZ, err := update.MarshalSSZ() + require.NoError(t, err) + require.DeepSSZEqual(t, resSSZ, updateSSZ) + }) + + stream1, err := p1.BHost.NewStream(context.Background(), p2.BHost.ID(), pcl) + require.NoError(t, err) + err = r.lightClientOptimisticUpdateRPCHandler(ctx, nil, stream1) + require.NoError(t, err) + + if util.WaitTimeout(&wg, 1*time.Second) { + t.Fatal("Did not receive stream within 1 sec") + } + }) + } +} + +func TestRPC_LightClientFinalityUpdate(t *testing.T) { + resetFn := features.InitWithReset(&features.Flags{ + EnableLightClient: true, + }) + defer resetFn() + + ctx := context.Background() + p2pService := p2ptest.NewTestP2P(t) + p1 := p2ptest.NewTestP2P(t) + p2 := p2ptest.NewTestP2P(t) + p1.Connect(p2) + require.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") + + chainService := &mockChain.ChainService{ + ValidatorsRoot: [32]byte{'A'}, + Genesis: time.Unix(time.Now().Unix(), 0), + } + d := db.SetupDB(t) + r := Service{ + ctx: ctx, + cfg: &config{ + p2p: p2pService, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + beaconDB: d, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + stateNotifier: &mockChain.MockStateNotifier{}, + }, + chainStarted: abool.New(), + lcStore: &lightClient.Store{}, + subHandler: newSubTopicHandler(), + rateLimiter: newRateLimiter(p1), + } + pcl := protocol.ID(p2p.RPCLightClientFinalityUpdateTopicV1) + topic := string(pcl) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, time.Second, false) + + altairDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().AltairForkEpoch, chainService.ValidatorsRoot[:]) + require.NoError(t, err) + bellatrixDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().BellatrixForkEpoch, chainService.ValidatorsRoot[:]) + require.NoError(t, err) + capellaDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().CapellaForkEpoch, chainService.ValidatorsRoot[:]) + require.NoError(t, err) + denebDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().DenebForkEpoch, chainService.ValidatorsRoot[:]) + require.NoError(t, err) + electraDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().ElectraForkEpoch, chainService.ValidatorsRoot[:]) + require.NoError(t, err) + + for i := 1; i <= 5; i++ { + t.Run(version.String(i), func(t *testing.T) { + l := util.NewTestLightClient(t, i) + + update, err := lightClient.NewLightClientFinalityUpdateFromBeaconState(ctx, l.State.Slot(), l.State, l.Block, l.AttestedState, l.AttestedBlock, l.FinalizedBlock) + require.NoError(t, err) + + r.lcStore.SetLastFinalityUpdate(update) + + var wg sync.WaitGroup + wg.Add(1) + p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { + defer wg.Done() + expectSuccess(t, stream) + var resSSZ []byte + + rpcCtx, err := readContextFromStream(stream) + require.NoError(t, err) + require.Equal(t, 4, len(rpcCtx)) + + switch i { + case version.Altair: + require.DeepSSZEqual(t, altairDigest[:], rpcCtx) + var res pb.LightClientFinalityUpdateAltair + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, &res)) + resSSZ, err = res.MarshalSSZ() + require.NoError(t, err) + case version.Bellatrix: + require.DeepSSZEqual(t, bellatrixDigest[:], rpcCtx) + var res pb.LightClientFinalityUpdateAltair + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, &res)) + resSSZ, err = res.MarshalSSZ() + require.NoError(t, err) + case version.Capella: + require.DeepSSZEqual(t, capellaDigest[:], rpcCtx) + var res pb.LightClientFinalityUpdateCapella + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, &res)) + resSSZ, err = res.MarshalSSZ() + require.NoError(t, err) + case version.Deneb: + require.DeepSSZEqual(t, denebDigest[:], rpcCtx) + var res pb.LightClientFinalityUpdateDeneb + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, &res)) + resSSZ, err = res.MarshalSSZ() + require.NoError(t, err) + case version.Electra: + require.DeepSSZEqual(t, electraDigest[:], rpcCtx) + var res pb.LightClientFinalityUpdateElectra + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, &res)) + resSSZ, err = res.MarshalSSZ() + require.NoError(t, err) + default: + t.Fatalf("unsupported version %d", i) + } + + updateSSZ, err := update.MarshalSSZ() + require.NoError(t, err) + require.DeepSSZEqual(t, resSSZ, updateSSZ) + }) + + stream1, err := p1.BHost.NewStream(context.Background(), p2.BHost.ID(), pcl) + require.NoError(t, err) + err = r.lightClientFinalityUpdateRPCHandler(ctx, nil, stream1) + require.NoError(t, err) + + if util.WaitTimeout(&wg, 1*time.Second) { + t.Fatal("Did not receive stream within 1 sec") + } + }) + } +} + +func TestRPC_LightClientUpdatesByRange(t *testing.T) { + resetFn := features.InitWithReset(&features.Flags{ + EnableLightClient: true, + }) + defer resetFn() + + ctx := context.Background() + p2pService := p2ptest.NewTestP2P(t) + p1 := p2ptest.NewTestP2P(t) + p2 := p2ptest.NewTestP2P(t) + p1.Connect(p2) + require.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") + + chainService := &mockChain.ChainService{ + ValidatorsRoot: [32]byte{'A'}, + Genesis: time.Unix(time.Now().Unix(), 0), + } + d := db.SetupDB(t) + r := Service{ + ctx: ctx, + cfg: &config{ + p2p: p2pService, + initialSync: &mockSync.Sync{IsSyncing: false}, + chain: chainService, + beaconDB: d, + clock: startup.NewClock(chainService.Genesis, chainService.ValidatorsRoot), + stateNotifier: &mockChain.MockStateNotifier{}, + }, + chainStarted: abool.New(), + lcStore: &lightClient.Store{}, + subHandler: newSubTopicHandler(), + rateLimiter: newRateLimiter(p1), + } + pcl := protocol.ID(p2p.RPCLightClientUpdatesByRangeTopicV1) + topic := string(pcl) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, time.Second, false) + + altairDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().AltairForkEpoch, chainService.ValidatorsRoot[:]) + require.NoError(t, err) + bellatrixDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().BellatrixForkEpoch, chainService.ValidatorsRoot[:]) + require.NoError(t, err) + capellaDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().CapellaForkEpoch, chainService.ValidatorsRoot[:]) + require.NoError(t, err) + denebDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().DenebForkEpoch, chainService.ValidatorsRoot[:]) + require.NoError(t, err) + electraDigest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().ElectraForkEpoch, chainService.ValidatorsRoot[:]) + require.NoError(t, err) + + for i := 1; i <= 5; i++ { + t.Run(version.String(i), func(t *testing.T) { + for j := 0; j < 5; j++ { + l := util.NewTestLightClient(t, i, util.WithIncreasedAttestedSlot(uint64(j))) + update, err := lightClient.NewLightClientUpdateFromBeaconState(ctx, l.State.Slot(), l.State, l.Block, l.AttestedState, l.AttestedBlock, l.FinalizedBlock) + require.NoError(t, err) + require.NoError(t, r.cfg.beaconDB.SaveLightClientUpdate(ctx, uint64(j), update)) + } + + var wg sync.WaitGroup + wg.Add(1) + + responseCounter := 0 + + p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { + defer wg.Done() + expectSuccess(t, stream) + rpcCtx, err := readContextFromStream(stream) + require.NoError(t, err) + require.Equal(t, 4, len(rpcCtx)) + + var resSSZ []byte + + switch i { + case version.Altair: + require.DeepSSZEqual(t, altairDigest[:], rpcCtx) + var res pb.LightClientUpdateAltair + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, &res)) + resSSZ, err = res.MarshalSSZ() + require.NoError(t, err) + case version.Bellatrix: + require.DeepSSZEqual(t, bellatrixDigest[:], rpcCtx) + var res pb.LightClientUpdateAltair + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, &res)) + resSSZ, err = res.MarshalSSZ() + require.NoError(t, err) + case version.Capella: + require.DeepSSZEqual(t, capellaDigest[:], rpcCtx) + var res pb.LightClientUpdateCapella + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, &res)) + resSSZ, err = res.MarshalSSZ() + require.NoError(t, err) + case version.Deneb: + require.DeepSSZEqual(t, denebDigest[:], rpcCtx) + var res pb.LightClientUpdateDeneb + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, &res)) + resSSZ, err = res.MarshalSSZ() + require.NoError(t, err) + case version.Electra: + require.DeepSSZEqual(t, electraDigest[:], rpcCtx) + var res pb.LightClientUpdateElectra + require.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, &res)) + resSSZ, err = res.MarshalSSZ() + require.NoError(t, err) + default: + t.Fatalf("unsupported version %d", i) + } + + update, err := r.cfg.beaconDB.LightClientUpdates(ctx, 0, 4) + require.NoError(t, err) + bootstrapSSZ, err := update[uint64(responseCounter)].MarshalSSZ() + require.NoError(t, err) + require.DeepSSZEqual(t, resSSZ, bootstrapSSZ) + responseCounter++ + }) + + stream1, err := p1.BHost.NewStream(context.Background(), p2.BHost.ID(), pcl) + require.NoError(t, err) + + msg := pb.LightClientUpdatesByRangeRequest{ + StartPeriod: 0, + Count: 5, + } + err = r.lightClientUpdatesByRangeRPCHandler(ctx, &msg, stream1) + require.NoError(t, err) + + if util.WaitTimeout(&wg, 1*time.Second) { + t.Fatal("Did not receive stream within 1 sec") + } + }) + } + +} diff --git a/changelog/bastin_add-lc-req-resp.md b/changelog/bastin_add-lc-req-resp.md new file mode 100644 index 0000000000..0c6cc3ec9c --- /dev/null +++ b/changelog/bastin_add-lc-req-resp.md @@ -0,0 +1,3 @@ +### Added + +- Add support for light client req/resp domain. \ No newline at end of file diff --git a/proto/prysm/v1alpha1/p2p_messages.pb.go b/proto/prysm/v1alpha1/p2p_messages.pb.go index 8a199779b6..2dd04b0d3d 100755 --- a/proto/prysm/v1alpha1/p2p_messages.pb.go +++ b/proto/prysm/v1alpha1/p2p_messages.pb.go @@ -537,6 +537,61 @@ func (x *DataColumnSidecarsByRangeRequest) GetColumns() []uint64 { return nil } +type LightClientUpdatesByRangeRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + StartPeriod uint64 `protobuf:"varint,1,opt,name=start_period,json=startPeriod,proto3" json:"start_period,omitempty"` + Count uint64 `protobuf:"varint,2,opt,name=count,proto3" json:"count,omitempty"` +} + +func (x *LightClientUpdatesByRangeRequest) Reset() { + *x = LightClientUpdatesByRangeRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_prysm_v1alpha1_p2p_messages_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *LightClientUpdatesByRangeRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*LightClientUpdatesByRangeRequest) ProtoMessage() {} + +func (x *LightClientUpdatesByRangeRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_prysm_v1alpha1_p2p_messages_proto_msgTypes[8] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use LightClientUpdatesByRangeRequest.ProtoReflect.Descriptor instead. +func (*LightClientUpdatesByRangeRequest) Descriptor() ([]byte, []int) { + return file_proto_prysm_v1alpha1_p2p_messages_proto_rawDescGZIP(), []int{8} +} + +func (x *LightClientUpdatesByRangeRequest) GetStartPeriod() uint64 { + if x != nil { + return x.StartPeriod + } + return 0 +} + +func (x *LightClientUpdatesByRangeRequest) GetCount() uint64 { + if x != nil { + return x.Count + } + return 0 +} + var File_proto_prysm_v1alpha1_p2p_messages_proto protoreflect.FileDescriptor var file_proto_prysm_v1alpha1_p2p_messages_proto_rawDesc = []byte{ @@ -655,17 +710,23 @@ var file_proto_prysm_v1alpha1_p2p_messages_proto_rawDesc = []byte{ 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x21, 0x0a, 0x07, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x04, 0x42, 0x07, 0x92, 0xb5, 0x18, 0x03, 0x31, 0x32, 0x38, 0x52, 0x07, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x73, - 0x42, 0x9a, 0x01, 0x0a, 0x19, 0x6f, 0x72, 0x67, 0x2e, 0x65, 0x74, 0x68, 0x65, 0x72, 0x65, 0x75, - 0x6d, 0x2e, 0x65, 0x74, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x42, 0x10, - 0x50, 0x32, 0x50, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x50, 0x72, 0x6f, 0x74, 0x6f, - 0x50, 0x01, 0x5a, 0x39, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4f, - 0x66, 0x66, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x70, 0x72, 0x79, 0x73, - 0x6d, 0x2f, 0x76, 0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, - 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x3b, 0x65, 0x74, 0x68, 0xaa, 0x02, 0x15, - 0x45, 0x74, 0x68, 0x65, 0x72, 0x65, 0x75, 0x6d, 0x2e, 0x45, 0x74, 0x68, 0x2e, 0x56, 0x31, 0x61, - 0x6c, 0x70, 0x68, 0x61, 0x31, 0xca, 0x02, 0x15, 0x45, 0x74, 0x68, 0x65, 0x72, 0x65, 0x75, 0x6d, - 0x5c, 0x45, 0x74, 0x68, 0x5c, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x22, 0x5b, 0x0a, 0x20, 0x4c, 0x69, 0x67, 0x68, 0x74, 0x43, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x55, + 0x70, 0x64, 0x61, 0x74, 0x65, 0x73, 0x42, 0x79, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x21, 0x0a, 0x0c, 0x73, 0x74, 0x61, 0x72, 0x74, 0x5f, 0x70, 0x65, + 0x72, 0x69, 0x6f, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x73, 0x74, 0x61, 0x72, + 0x74, 0x50, 0x65, 0x72, 0x69, 0x6f, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x42, 0x9a, 0x01, + 0x0a, 0x19, 0x6f, 0x72, 0x67, 0x2e, 0x65, 0x74, 0x68, 0x65, 0x72, 0x65, 0x75, 0x6d, 0x2e, 0x65, + 0x74, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x42, 0x10, 0x50, 0x32, 0x50, + 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, + 0x39, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x4f, 0x66, 0x66, 0x63, + 0x68, 0x61, 0x69, 0x6e, 0x4c, 0x61, 0x62, 0x73, 0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, 0x2f, 0x76, + 0x36, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x70, 0x72, 0x79, 0x73, 0x6d, 0x2f, 0x76, 0x31, + 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x3b, 0x65, 0x74, 0x68, 0xaa, 0x02, 0x15, 0x45, 0x74, 0x68, + 0x65, 0x72, 0x65, 0x75, 0x6d, 0x2e, 0x45, 0x74, 0x68, 0x2e, 0x56, 0x31, 0x61, 0x6c, 0x70, 0x68, + 0x61, 0x31, 0xca, 0x02, 0x15, 0x45, 0x74, 0x68, 0x65, 0x72, 0x65, 0x75, 0x6d, 0x5c, 0x45, 0x74, + 0x68, 0x5c, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x33, } var ( @@ -680,7 +741,7 @@ func file_proto_prysm_v1alpha1_p2p_messages_proto_rawDescGZIP() []byte { return file_proto_prysm_v1alpha1_p2p_messages_proto_rawDescData } -var file_proto_prysm_v1alpha1_p2p_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 8) +var file_proto_prysm_v1alpha1_p2p_messages_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_proto_prysm_v1alpha1_p2p_messages_proto_goTypes = []interface{}{ (*Status)(nil), // 0: ethereum.eth.v1alpha1.Status (*BeaconBlocksByRangeRequest)(nil), // 1: ethereum.eth.v1alpha1.BeaconBlocksByRangeRequest @@ -690,6 +751,7 @@ var file_proto_prysm_v1alpha1_p2p_messages_proto_goTypes = []interface{}{ (*MetaDataV2)(nil), // 5: ethereum.eth.v1alpha1.MetaDataV2 (*BlobSidecarsByRangeRequest)(nil), // 6: ethereum.eth.v1alpha1.BlobSidecarsByRangeRequest (*DataColumnSidecarsByRangeRequest)(nil), // 7: ethereum.eth.v1alpha1.DataColumnSidecarsByRangeRequest + (*LightClientUpdatesByRangeRequest)(nil), // 8: ethereum.eth.v1alpha1.LightClientUpdatesByRangeRequest } var file_proto_prysm_v1alpha1_p2p_messages_proto_depIdxs = []int32{ 0, // [0:0] is the sub-list for method output_type @@ -801,6 +863,18 @@ func file_proto_prysm_v1alpha1_p2p_messages_proto_init() { return nil } } + file_proto_prysm_v1alpha1_p2p_messages_proto_msgTypes[8].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*LightClientUpdatesByRangeRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -808,7 +882,7 @@ func file_proto_prysm_v1alpha1_p2p_messages_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_proto_prysm_v1alpha1_p2p_messages_proto_rawDesc, NumEnums: 0, - NumMessages: 8, + NumMessages: 9, NumExtensions: 0, NumServices: 0, }, diff --git a/proto/prysm/v1alpha1/p2p_messages.proto b/proto/prysm/v1alpha1/p2p_messages.proto index 5032f46b2d..9ac0a0fff0 100644 --- a/proto/prysm/v1alpha1/p2p_messages.proto +++ b/proto/prysm/v1alpha1/p2p_messages.proto @@ -13,13 +13,13 @@ option java_package = "org.ethereum.eth.v1alpha1"; option php_namespace = "Ethereum\\Eth\\v1alpha1"; message Status { - bytes fork_digest = 1 [ (ethereum.eth.ext.ssz_size) = "4" ]; - bytes finalized_root = 2 [ (ethereum.eth.ext.ssz_size) = "32" ]; + bytes fork_digest = 1 [(ethereum.eth.ext.ssz_size) = "4"]; + bytes finalized_root = 2 [(ethereum.eth.ext.ssz_size) = "32"]; uint64 finalized_epoch = 3 [ (ethereum.eth.ext.cast_type) = "github.com/OffchainLabs/prysm/v6/consensus-types/primitives.Epoch" ]; - bytes head_root = 4 [ (ethereum.eth.ext.ssz_size) = "32" ]; + bytes head_root = 4 [(ethereum.eth.ext.ssz_size) = "32"]; uint64 head_slot = 5 [ (ethereum.eth.ext.cast_type) = "github.com/OffchainLabs/prysm/v6/consensus-types/primitives.Slot" @@ -36,8 +36,8 @@ message BeaconBlocksByRangeRequest { } message ENRForkID { - bytes current_fork_digest = 1 [ (ethereum.eth.ext.ssz_size) = "4" ]; - bytes next_fork_version = 2 [ (ethereum.eth.ext.ssz_size) = "4" ]; + bytes current_fork_digest = 1 [(ethereum.eth.ext.ssz_size) = "4"]; + bytes next_fork_version = 2 [(ethereum.eth.ext.ssz_size) = "4"]; uint64 next_fork_epoch = 3 [ (ethereum.eth.ext.cast_type) = "github.com/OffchainLabs/prysm/v6/consensus-types/primitives.Epoch" @@ -138,5 +138,17 @@ message DataColumnSidecarsByRangeRequest { "github.com/OffchainLabs/prysm/v6/consensus-types/primitives.Slot" ]; uint64 count = 2; - repeated uint64 columns = 3 [ (ethereum.eth.ext.ssz_max) = "128" ]; + repeated uint64 columns = 3 [(ethereum.eth.ext.ssz_max) = "128"]; +} + +/* + Spec Definition: + ( + start_period: uint64 + count: uint64 + ) +*/ +message LightClientUpdatesByRangeRequest { + uint64 start_period = 1; + uint64 count = 2; } \ No newline at end of file diff --git a/proto/testing/test.pb.go b/proto/testing/test.pb.go index 6d3e0aa6a9..b5891dc142 100755 --- a/proto/testing/test.pb.go +++ b/proto/testing/test.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.31.0 -// protoc v4.25.1 +// protoc-gen-go v1.33.0 +// protoc v3.21.7 // source: proto/testing/test.proto package testing