From 4be8de2476daf8a4e6d837d8877d410e27c323dc Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Fri, 4 Jul 2025 00:35:28 +0200 Subject: [PATCH] PeerDAS: Implement reconstruction of data column sidecars retrieved from the execution client. (#15469) * `BestFinalized`: No functional change. Improve comments and reduce scope. * PeerDAS execution: Implement engine method `GetBlobsV2` and `ReconstructDataColumnSidecars`. * Fix James' comment. * Fix James' comment. --- beacon-chain/execution/BUILD.bazel | 2 + beacon-chain/execution/engine_client.go | 160 +++++++++++--- beacon-chain/execution/engine_client_test.go | 208 +++++++++++++++++- beacon-chain/execution/testing/BUILD.bazel | 1 + .../execution/testing/mock_engine_client.go | 9 +- beacon-chain/p2p/peers/status.go | 84 ++++--- .../validator/construct_generic_block.go | 29 ++- .../rpc/prysm/v1alpha1/validator/proposer.go | 2 +- .../v1alpha1/validator/proposer_bellatrix.go | 26 +-- .../validator/proposer_bellatrix_test.go | 4 +- .../prysm/v1alpha1/validator/proposer_test.go | 2 +- changelog/manu-peerdas-get-blobs-V2.md | 3 + consensus-types/blocks/execution.go | 6 +- consensus-types/blocks/get_payload.go | 20 +- 14 files changed, 465 insertions(+), 91 deletions(-) create mode 100644 changelog/manu-peerdas-get-blobs-V2.md diff --git a/beacon-chain/execution/BUILD.bazel b/beacon-chain/execution/BUILD.bazel index ca38175597..e8133d404b 100644 --- a/beacon-chain/execution/BUILD.bazel +++ b/beacon-chain/execution/BUILD.bazel @@ -31,6 +31,7 @@ go_library( "//beacon-chain/core/feed:go_default_library", "//beacon-chain/core/feed/state:go_default_library", "//beacon-chain/core/helpers:go_default_library", + "//beacon-chain/core/peerdas:go_default_library", "//beacon-chain/core/transition:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/execution/types:go_default_library", @@ -97,6 +98,7 @@ go_test( embed = [":go_default_library"], deps = [ "//async/event:go_default_library", + "//beacon-chain/blockchain/kzg:go_default_library", "//beacon-chain/cache/depositsnapshot:go_default_library", "//beacon-chain/core/feed:go_default_library", "//beacon-chain/core/feed/state:go_default_library", diff --git a/beacon-chain/execution/engine_client.go b/beacon-chain/execution/engine_client.go index 7147836099..cd2c2df187 100644 --- a/beacon-chain/execution/engine_client.go +++ b/beacon-chain/execution/engine_client.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/OffchainLabs/prysm/v6/beacon-chain/core/peerdas" "github.com/OffchainLabs/prysm/v6/beacon-chain/execution/types" "github.com/OffchainLabs/prysm/v6/beacon-chain/verification" fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" @@ -44,11 +45,18 @@ var ( GetPayloadMethodV3, GetPayloadBodiesByHashV1, GetPayloadBodiesByRangeV1, + GetBlobsV1, } + electraEngineEndpoints = []string{ NewPayloadMethodV4, GetPayloadMethodV4, } + + fuluEngineEndpoints = []string{ + GetPayloadMethodV5, + GetBlobsV2, + } ) const ( @@ -73,6 +81,8 @@ const ( GetPayloadMethodV3 = "engine_getPayloadV3" // GetPayloadMethodV4 is the get payload method added for electra GetPayloadMethodV4 = "engine_getPayloadV4" + // GetPayloadMethodV5 is the get payload method added for fulu + GetPayloadMethodV5 = "engine_getPayloadV5" // BlockByHashMethod request string for JSON-RPC. BlockByHashMethod = "eth_getBlockByHash" // BlockByNumberMethod request string for JSON-RPC. @@ -85,11 +95,16 @@ const ( ExchangeCapabilities = "engine_exchangeCapabilities" // GetBlobsV1 request string for JSON-RPC. GetBlobsV1 = "engine_getBlobsV1" + // GetBlobsV2 request string for JSON-RPC. + GetBlobsV2 = "engine_getBlobsV2" // Defines the seconds before timing out engine endpoints with non-block execution semantics. defaultEngineTimeout = time.Second ) -var errInvalidPayloadBodyResponse = errors.New("engine api payload body response is invalid") +var ( + errInvalidPayloadBodyResponse = errors.New("engine api payload body response is invalid") + errMissingBlobsAndProofsFromEL = errors.New("engine api payload body response is missing blobs and proofs") +) // ForkchoiceUpdatedResponse is the response kind received by the // engine_forkchoiceUpdatedV1 endpoint. @@ -107,7 +122,8 @@ type Reconstructor interface { ReconstructFullBellatrixBlockBatch( ctx context.Context, blindedBlocks []interfaces.ReadOnlySignedBeaconBlock, ) ([]interfaces.SignedBeaconBlock, error) - ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, hi func(uint64) bool) ([]blocks.VerifiedROBlob, error) + ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [fieldparams.RootLength]byte, hi func(uint64) bool) ([]blocks.VerifiedROBlob, error) + ReconstructDataColumnSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [fieldparams.RootLength]byte) ([]blocks.VerifiedRODataColumn, error) } // EngineCaller defines a client that can interact with an Ethereum @@ -256,14 +272,17 @@ func (s *Service) ForkchoiceUpdated( } func getPayloadMethodAndMessage(slot primitives.Slot) (string, proto.Message) { - pe := slots.ToEpoch(slot) - if pe >= params.BeaconConfig().ElectraForkEpoch { + epoch := slots.ToEpoch(slot) + if epoch >= params.BeaconConfig().FuluForkEpoch { + return GetPayloadMethodV5, &pb.ExecutionBundleFulu{} + } + if epoch >= params.BeaconConfig().ElectraForkEpoch { return GetPayloadMethodV4, &pb.ExecutionBundleElectra{} } - if pe >= params.BeaconConfig().DenebForkEpoch { + if epoch >= params.BeaconConfig().DenebForkEpoch { return GetPayloadMethodV3, &pb.ExecutionPayloadDenebWithValueAndBlobsBundle{} } - if pe >= params.BeaconConfig().CapellaForkEpoch { + if epoch >= params.BeaconConfig().CapellaForkEpoch { return GetPayloadMethodV2, &pb.ExecutionPayloadCapellaWithValue{} } return GetPayloadMethod, &pb.ExecutionPayload{} @@ -289,7 +308,7 @@ func (s *Service) GetPayload(ctx context.Context, payloadId [8]byte, slot primit } res, err := blocks.NewGetPayloadResponse(result) if err != nil { - return nil, err + return nil, errors.Wrap(err, "new get payload response") } return res, nil } @@ -298,33 +317,36 @@ func (s *Service) ExchangeCapabilities(ctx context.Context) ([]string, error) { ctx, span := trace.StartSpan(ctx, "powchain.engine-api-client.ExchangeCapabilities") defer span.End() - // Only check for electra related engine methods if it has been activated. if params.ElectraEnabled() { supportedEngineEndpoints = append(supportedEngineEndpoints, electraEngineEndpoints...) } - var result []string - err := s.rpcClient.CallContext(ctx, &result, ExchangeCapabilities, supportedEngineEndpoints) - if err != nil { + + if params.FuluEnabled() { + supportedEngineEndpoints = append(supportedEngineEndpoints, fuluEngineEndpoints...) + } + + elSupportedEndpointsSlice := make([]string, len(supportedEngineEndpoints)) + if err := s.rpcClient.CallContext(ctx, &elSupportedEndpointsSlice, ExchangeCapabilities, supportedEngineEndpoints); err != nil { return nil, handleRPCError(err) } - var unsupported []string - for _, s1 := range supportedEngineEndpoints { - supported := false - for _, s2 := range result { - if s1 == s2 { - supported = true - break - } - } - if !supported { - unsupported = append(unsupported, s1) + elSupportedEndpoints := make(map[string]bool, len(elSupportedEndpointsSlice)) + for _, method := range elSupportedEndpointsSlice { + elSupportedEndpoints[method] = true + } + + unsupported := make([]string, 0) + for _, method := range supportedEngineEndpoints { + if !elSupportedEndpoints[method] { + unsupported = append(unsupported, method) } } + if len(unsupported) != 0 { - log.Warnf("Please update client, detected the following unsupported engine methods: %s", unsupported) + log.WithField("methods", unsupported).Warning("Connected execution client does not support some requested engine methods") } - return result, handleRPCError(err) + + return elSupportedEndpointsSlice, nil } // GetTerminalBlockHash returns the valid terminal block hash based on total difficulty. @@ -495,9 +517,10 @@ func (s *Service) HeaderByNumber(ctx context.Context, number *big.Int) (*types.H func (s *Service) GetBlobs(ctx context.Context, versionedHashes []common.Hash) ([]*pb.BlobAndProof, error) { ctx, span := trace.StartSpan(ctx, "powchain.engine-api-client.GetBlobs") defer span.End() + // If the execution engine does not support `GetBlobsV1`, return early to prevent encountering an error later. if !s.capabilityCache.has(GetBlobsV1) { - return nil, nil + return nil, errors.New(fmt.Sprintf("%s is not supported", GetBlobsV1)) } result := make([]*pb.BlobAndProof, len(versionedHashes)) @@ -505,6 +528,19 @@ func (s *Service) GetBlobs(ctx context.Context, versionedHashes []common.Hash) ( return result, handleRPCError(err) } +func (s *Service) GetBlobsV2(ctx context.Context, versionedHashes []common.Hash) ([]*pb.BlobAndProofV2, error) { + ctx, span := trace.StartSpan(ctx, "powchain.engine-api-client.GetBlobsV2") + defer span.End() + + if !s.capabilityCache.has(GetBlobsV2) { + return nil, errors.New(fmt.Sprintf("%s is not supported", GetBlobsV2)) + } + + result := make([]*pb.BlobAndProofV2, len(versionedHashes)) + err := s.rpcClient.CallContext(ctx, &result, GetBlobsV2, versionedHashes) + return result, handleRPCError(err) +} + // ReconstructFullBlock takes in a blinded beacon block and reconstructs // a beacon block with a full execution payload via the engine API. func (s *Service) ReconstructFullBlock( @@ -615,6 +651,75 @@ func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces. return verifiedBlobs, nil } +// ReconstructDataColumnSidecars reconstructs the verified data column sidecars for a given beacon block. +// It retrieves the KZG commitments from the block body, fetches the associated blobs and cell proofs from the EL, +// and constructs the corresponding verified read-only data column sidecars. +func (s *Service) ReconstructDataColumnSidecars(ctx context.Context, signedROBlock interfaces.ReadOnlySignedBeaconBlock, blockRoot [fieldparams.RootLength]byte) ([]blocks.VerifiedRODataColumn, error) { + block := signedROBlock.Block() + + log := log.WithFields(logrus.Fields{ + "root": fmt.Sprintf("%#x", blockRoot), + "slot": block.Slot(), + }) + + kzgCommitments, err := block.Body().BlobKzgCommitments() + if err != nil { + return nil, wrapWithBlockRoot(err, blockRoot, "blob KZG commitments") + } + + // Collect KZG hashes for all blobs. + versionedHashes := make([]common.Hash, 0, len(kzgCommitments)) + for _, commitment := range kzgCommitments { + versionedHash := primitives.ConvertKzgCommitmentToVersionedHash(commitment) + versionedHashes = append(versionedHashes, versionedHash) + } + + // Fetch all blobsAndCellsProofs from the execution client. + blobAndProofV2s, err := s.GetBlobsV2(ctx, versionedHashes) + if err != nil { + return nil, wrapWithBlockRoot(err, blockRoot, "get blobs V2") + } + + // Return early if nothing is returned from the EL. + if len(blobAndProofV2s) == 0 { + log.Debug("No blobs returned from EL") + return nil, nil + } + + // Extract the blobs and proofs from the blobAndProofV2s. + blobs, cellProofs := make([][]byte, 0, len(blobAndProofV2s)), make([][]byte, 0, len(blobAndProofV2s)) + for _, blobsAndProofs := range blobAndProofV2s { + if blobsAndProofs == nil { + return nil, wrapWithBlockRoot(errMissingBlobsAndProofsFromEL, blockRoot, "") + } + + blobs, cellProofs = append(blobs, blobsAndProofs.Blob), append(cellProofs, blobsAndProofs.KzgProofs...) + } + + // Construct the data column sidcars from the blobs and cell proofs provided by the execution client. + dataColumnSidecars, err := peerdas.ConstructDataColumnSidecars(signedROBlock, blobs, cellProofs) + if err != nil { + return nil, wrapWithBlockRoot(err, blockRoot, "construct data column sidecars") + } + + // Finally, construct verified RO data column sidecars. + // We trust the execution layer we are connected to, so we can upgrade the read only data column sidecar into a verified one. + verifiedRODataColumns := make([]blocks.VerifiedRODataColumn, 0, len(dataColumnSidecars)) + for _, dataColumnSidecar := range dataColumnSidecars { + roDataColumn, err := blocks.NewRODataColumnWithRoot(dataColumnSidecar, blockRoot) + if err != nil { + return nil, wrapWithBlockRoot(err, blockRoot, "new read-only data column with root") + } + + verifiedRODataColumn := blocks.NewVerifiedRODataColumn(roDataColumn) + verifiedRODataColumns = append(verifiedRODataColumns, verifiedRODataColumn) + } + + log.Debug("Data columns successfully reconstructed from the execution client.") + + return verifiedRODataColumns, nil +} + func fullPayloadFromPayloadBody( header interfaces.ExecutionData, body *pb.ExecutionPayloadBody, bVersion int, ) (interfaces.ExecutionData, error) { @@ -902,3 +1007,8 @@ func toBlockNumArg(number *big.Int) string { } return hexutil.EncodeBig(number) } + +// wrapWithBlockRoot returns a new error with the given block root. +func wrapWithBlockRoot(err error, blockRoot [32]byte, message string) error { + return errors.Wrap(err, fmt.Sprintf("%s for block %#x", message, blockRoot)) +} diff --git a/beacon-chain/execution/engine_client_test.go b/beacon-chain/execution/engine_client_test.go index ee2e6a5b0c..e6bbbf0460 100644 --- a/beacon-chain/execution/engine_client_test.go +++ b/beacon-chain/execution/engine_client_test.go @@ -13,6 +13,7 @@ import ( "strings" "testing" + "github.com/OffchainLabs/prysm/v6/beacon-chain/blockchain/kzg" "github.com/OffchainLabs/prysm/v6/beacon-chain/db/filesystem" mocks "github.com/OffchainLabs/prysm/v6/beacon-chain/execution/testing" "github.com/OffchainLabs/prysm/v6/beacon-chain/verification" @@ -167,6 +168,7 @@ func TestClient_HTTP(t *testing.T) { cfg.CapellaForkEpoch = 1 cfg.DenebForkEpoch = 2 cfg.ElectraForkEpoch = 3 + cfg.FuluForkEpoch = 4 params.OverrideBeaconConfig(cfg) t.Run(GetPayloadMethod, func(t *testing.T) { @@ -317,11 +319,11 @@ func TestClient_HTTP(t *testing.T) { require.DeepEqual(t, uint64(2), g) commitments := [][]byte{bytesutil.PadTo([]byte("commitment1"), fieldparams.BLSPubkeyLength), bytesutil.PadTo([]byte("commitment2"), fieldparams.BLSPubkeyLength)} - require.DeepEqual(t, commitments, resp.BlobsBundle.KzgCommitments) + require.DeepEqual(t, commitments, resp.BlobsBundler.GetKzgCommitments()) proofs := [][]byte{bytesutil.PadTo([]byte("proof1"), fieldparams.BLSPubkeyLength), bytesutil.PadTo([]byte("proof2"), fieldparams.BLSPubkeyLength)} - require.DeepEqual(t, proofs, resp.BlobsBundle.Proofs) + require.DeepEqual(t, proofs, resp.BlobsBundler.GetProofs()) blobs := [][]byte{bytesutil.PadTo([]byte("a"), fieldparams.BlobLength), bytesutil.PadTo([]byte("b"), fieldparams.BlobLength)} - require.DeepEqual(t, blobs, resp.BlobsBundle.Blobs) + require.DeepEqual(t, blobs, resp.BlobsBundler.GetBlobs()) }) t.Run(GetPayloadMethodV4, func(t *testing.T) { payloadId := [8]byte{1} @@ -372,11 +374,11 @@ func TestClient_HTTP(t *testing.T) { require.DeepEqual(t, uint64(2), g) commitments := [][]byte{bytesutil.PadTo([]byte("commitment1"), fieldparams.BLSPubkeyLength), bytesutil.PadTo([]byte("commitment2"), fieldparams.BLSPubkeyLength)} - require.DeepEqual(t, commitments, resp.BlobsBundle.KzgCommitments) + require.DeepEqual(t, commitments, resp.BlobsBundler.GetKzgCommitments()) proofs := [][]byte{bytesutil.PadTo([]byte("proof1"), fieldparams.BLSPubkeyLength), bytesutil.PadTo([]byte("proof2"), fieldparams.BLSPubkeyLength)} - require.DeepEqual(t, proofs, resp.BlobsBundle.Proofs) + require.DeepEqual(t, proofs, resp.BlobsBundler.GetProofs()) blobs := [][]byte{bytesutil.PadTo([]byte("a"), fieldparams.BlobLength), bytesutil.PadTo([]byte("b"), fieldparams.BlobLength)} - require.DeepEqual(t, blobs, resp.BlobsBundle.Blobs) + require.DeepEqual(t, blobs, resp.BlobsBundler.GetBlobs()) requests := &pb.ExecutionRequests{ Deposits: []*pb.DepositRequest{ { @@ -405,7 +407,52 @@ func TestClient_HTTP(t *testing.T) { require.DeepEqual(t, requests, resp.ExecutionRequests) }) + t.Run(GetPayloadMethodV5, func(t *testing.T) { + payloadId := [8]byte{1} + want, ok := fix["ExecutionBundleFulu"].(*pb.GetPayloadV5ResponseJson) + require.Equal(t, true, ok) + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + defer func() { + require.NoError(t, r.Body.Close()) + }() + enc, err := io.ReadAll(r.Body) + require.NoError(t, err) + jsonRequestString := string(enc) + reqArg, err := json.Marshal(pb.PayloadIDBytes(payloadId)) + require.NoError(t, err) + + // We expect the JSON string RPC request contains the right arguments. + require.Equal(t, true, strings.Contains( + jsonRequestString, string(reqArg), + )) + resp := map[string]interface{}{ + "jsonrpc": "2.0", + "id": 1, + "result": want, + } + err = json.NewEncoder(w).Encode(resp) + require.NoError(t, err) + })) + defer srv.Close() + + rpcClient, err := rpc.DialHTTP(srv.URL) + require.NoError(t, err) + defer rpcClient.Close() + + client := &Service{} + client.rpcClient = rpcClient + + // We call the RPC method via HTTP and expect a proper result. + resp, err := client.GetPayload(ctx, payloadId, 4*params.BeaconConfig().SlotsPerEpoch) + require.NoError(t, err) + _, ok = resp.BlobsBundler.(*pb.BlobsBundleV2) + if !ok { + t.Logf("resp.BlobsBundler has unexpected type: %T", resp.BlobsBundler) + } + require.Equal(t, ok, true) + }) t.Run(ForkchoiceUpdatedMethod+" VALID status", func(t *testing.T) { forkChoiceState := &pb.ForkchoiceState{ HeadBlockHash: []byte("head"), @@ -1539,6 +1586,7 @@ func fixtures() map[string]interface{} { "ExecutionPayloadCapellaWithValue": s.ExecutionPayloadWithValueCapella, "ExecutionPayloadDenebWithValue": s.ExecutionPayloadWithValueDeneb, "ExecutionBundleElectra": s.ExecutionBundleElectra, + "ExecutionBundleFulu": s.ExecutionBundleFulu, "ValidPayloadStatus": s.ValidPayloadStatus, "InvalidBlockHashStatus": s.InvalidBlockHashStatus, "AcceptedStatus": s.AcceptedStatus, @@ -1774,6 +1822,36 @@ func fixturesStruct() *payloadFixtures { append([]byte{pb.WithdrawalRequestType}, withdrawalRequestBytes...), append([]byte{pb.ConsolidationRequestType}, consolidationRequestBytes...)}, } + executionBundleFixtureFulu := &pb.GetPayloadV5ResponseJson{ + ShouldOverrideBuilder: true, + ExecutionPayload: &pb.ExecutionPayloadDenebJSON{ + ParentHash: &common.Hash{'a'}, + FeeRecipient: &common.Address{'b'}, + StateRoot: &common.Hash{'c'}, + ReceiptsRoot: &common.Hash{'d'}, + LogsBloom: &hexutil.Bytes{'e'}, + PrevRandao: &common.Hash{'f'}, + BaseFeePerGas: "0x123", + BlockHash: &common.Hash{'g'}, + Transactions: []hexutil.Bytes{{'h'}}, + Withdrawals: []*pb.Withdrawal{}, + BlockNumber: &hexUint, + GasLimit: &hexUint, + GasUsed: &hexUint, + Timestamp: &hexUint, + BlobGasUsed: &bgu, + ExcessBlobGas: &ebg, + }, + BlockValue: "0x11fffffffff", + BlobsBundle: &pb.BlobBundleV2JSON{ + Commitments: []hexutil.Bytes{[]byte("commitment1"), []byte("commitment2")}, + Proofs: []hexutil.Bytes{[]byte("proof1"), []byte("proof2")}, + Blobs: []hexutil.Bytes{{'a'}, {'b'}}, + }, + ExecutionRequests: []hexutil.Bytes{append([]byte{pb.DepositRequestType}, depositRequestBytes...), + append([]byte{pb.WithdrawalRequestType}, withdrawalRequestBytes...), + append([]byte{pb.ConsolidationRequestType}, consolidationRequestBytes...)}, + } parent := bytesutil.PadTo([]byte("parentHash"), fieldparams.RootLength) sha3Uncles := bytesutil.PadTo([]byte("sha3Uncles"), fieldparams.RootLength) miner := bytesutil.PadTo([]byte("miner"), fieldparams.FeeRecipientLength) @@ -1868,6 +1946,7 @@ func fixturesStruct() *payloadFixtures { ExecutionPayloadWithValueCapella: executionPayloadWithValueFixtureCapella, ExecutionPayloadWithValueDeneb: executionPayloadWithValueFixtureDeneb, ExecutionBundleElectra: executionBundleFixtureElectra, + ExecutionBundleFulu: executionBundleFixtureFulu, ValidPayloadStatus: validStatus, InvalidBlockHashStatus: inValidBlockHashStatus, AcceptedStatus: acceptedStatus, @@ -1892,6 +1971,7 @@ type payloadFixtures struct { ExecutionPayloadWithValueCapella *pb.GetPayloadV2ResponseJson ExecutionPayloadWithValueDeneb *pb.GetPayloadV3ResponseJson ExecutionBundleElectra *pb.GetPayloadV4ResponseJson + ExecutionBundleFulu *pb.GetPayloadV5ResponseJson ValidPayloadStatus *pb.PayloadStatus InvalidBlockHashStatus *pb.PayloadStatus AcceptedStatus *pb.PayloadStatus @@ -2361,7 +2441,7 @@ func Test_ExchangeCapabilities(t *testing.T) { for _, item := range results { require.NotNil(t, item) } - assert.LogsContain(t, logHook, "Please update client, detected the following unsupported engine methods:") + assert.LogsContain(t, logHook, "Connected execution client does not support some requested engine methods") }) t.Run("list of items", func(t *testing.T) { srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -2424,7 +2504,7 @@ func TestReconstructBlobSidecars(t *testing.T) { t.Run("get-blobs end point is not supported", func(t *testing.T) { hi := mockSummary(t, []bool{true, true, true, true, true, false}) verifiedBlobs, err := client.ReconstructBlobSidecars(ctx, sb, r, hi) - require.NoError(t, err) + require.ErrorContains(t, "engine_getBlobsV1 is not supported", err) require.Equal(t, 0, len(verifiedBlobs)) }) @@ -2476,6 +2556,76 @@ func TestReconstructBlobSidecars(t *testing.T) { }) } +func TestReconstructDataColumnSidecars(t *testing.T) { + // Start the trusted setup. + err := kzg.Start() + require.NoError(t, err) + + // Setup right fork epoch + params.SetupTestConfigCleanup(t) + cfg := params.BeaconConfig().Copy() + cfg.CapellaForkEpoch = 1 + cfg.DenebForkEpoch = 2 + cfg.ElectraForkEpoch = 3 + cfg.FuluForkEpoch = 4 + params.OverrideBeaconConfig(cfg) + + client := &Service{capabilityCache: &capabilityCache{}} + b := util.NewBeaconBlockFulu() + b.Block.Slot = 4 * params.BeaconConfig().SlotsPerEpoch + kzgCommitments := createRandomKzgCommitments(t, 6) + b.Block.Body.BlobKzgCommitments = kzgCommitments + r, err := b.Block.HashTreeRoot() + require.NoError(t, err) + sb, err := blocks.NewSignedBeaconBlock(b) + require.NoError(t, err) + + ctx := context.Background() + + t.Run("GetBlobsV2 is not supported", func(t *testing.T) { + _, err := client.ReconstructDataColumnSidecars(ctx, sb, r) + require.ErrorContains(t, "get blobs V2 for block", err) + }) + + t.Run("nothing received", func(t *testing.T) { + srv := createBlobServerV2(t, 0, []bool{}) + defer srv.Close() + + rpcClient, client := setupRpcClientV2(t, srv.URL, client) + defer rpcClient.Close() + + dataColumns, err := client.ReconstructDataColumnSidecars(ctx, sb, r) + require.NoError(t, err) + require.Equal(t, 0, len(dataColumns)) + }) + + t.Run("receiving all blobs", func(t *testing.T) { + blobMasks := []bool{true, true, true, true, true, true} + srv := createBlobServerV2(t, 6, blobMasks) + defer srv.Close() + + rpcClient, client := setupRpcClientV2(t, srv.URL, client) + defer rpcClient.Close() + + dataColumns, err := client.ReconstructDataColumnSidecars(ctx, sb, r) + require.NoError(t, err) + require.Equal(t, 128, len(dataColumns)) + }) + + t.Run("missing some blobs", func(t *testing.T) { + blobMasks := []bool{false, true, true, true, true, true} + srv := createBlobServerV2(t, 6, blobMasks) + defer srv.Close() + + rpcClient, client := setupRpcClientV2(t, srv.URL, client) + defer rpcClient.Close() + + dataColumns, err := client.ReconstructDataColumnSidecars(ctx, sb, r) + require.ErrorContains(t, errMissingBlobsAndProofsFromEL.Error(), err) + require.Equal(t, 0, len(dataColumns)) + }) +} + func createRandomKzgCommitments(t *testing.T, num int) [][]byte { kzgCommitments := make([][]byte, num) for i := range kzgCommitments { @@ -2511,6 +2661,42 @@ func createBlobServer(t *testing.T, numBlobs int, callbackFuncs ...func()) *http })) } +func createBlobServerV2(t *testing.T, numBlobs int, blobMasks []bool) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + defer func() { + require.NoError(t, r.Body.Close()) + }() + + require.Equal(t, len(blobMasks), numBlobs) + + blobAndCellProofs := make([]*pb.BlobAndProofV2Json, numBlobs) + for i := range blobAndCellProofs { + if !blobMasks[i] { + continue + } + + blobAndCellProofs[i] = &pb.BlobAndProofV2Json{ + Blob: []byte("0xblob"), + KzgProofs: []hexutil.Bytes{}, + } + for j := 0; j < int(params.BeaconConfig().NumberOfColumns); j++ { + cellProof := make([]byte, 48) + blobAndCellProofs[i].KzgProofs = append(blobAndCellProofs[i].KzgProofs, cellProof) + } + } + + respJSON := map[string]interface{}{ + "jsonrpc": "2.0", + "id": 1, + "result": blobAndCellProofs, + } + + err := json.NewEncoder(w).Encode(respJSON) + require.NoError(t, err) + })) +} + func setupRpcClient(t *testing.T, url string, client *Service) (*rpc.Client, *Service) { rpcClient, err := rpc.DialHTTP(url) require.NoError(t, err) @@ -2522,6 +2708,12 @@ func setupRpcClient(t *testing.T, url string, client *Service) (*rpc.Client, *Se return rpcClient, client } +func setupRpcClientV2(t *testing.T, url string, client *Service) (*rpc.Client, *Service) { + rpcClient, client := setupRpcClient(t, url, client) + client.capabilityCache = &capabilityCache{capabilities: map[string]interface{}{GetBlobsV2: nil}} + return rpcClient, client +} + func testNewBlobVerifier() verification.NewBlobVerifier { return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier { return &verification.MockBlobVerifier{ diff --git a/beacon-chain/execution/testing/BUILD.bazel b/beacon-chain/execution/testing/BUILD.bazel index 459478e805..68604f10aa 100644 --- a/beacon-chain/execution/testing/BUILD.bazel +++ b/beacon-chain/execution/testing/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//beacon-chain/execution/types:go_default_library", "//beacon-chain/state:go_default_library", "//beacon-chain/state/state-native:go_default_library", + "//config/fieldparams:go_default_library", "//config/params:go_default_library", "//consensus-types/blocks:go_default_library", "//consensus-types/interfaces:go_default_library", diff --git a/beacon-chain/execution/testing/mock_engine_client.go b/beacon-chain/execution/testing/mock_engine_client.go index 0354ba79bc..cc716e4713 100644 --- a/beacon-chain/execution/testing/mock_engine_client.go +++ b/beacon-chain/execution/testing/mock_engine_client.go @@ -4,6 +4,7 @@ import ( "context" "math/big" + fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" "github.com/OffchainLabs/prysm/v6/config/params" "github.com/OffchainLabs/prysm/v6/consensus-types/blocks" "github.com/OffchainLabs/prysm/v6/consensus-types/interfaces" @@ -38,6 +39,8 @@ type EngineClient struct { ErrGetPayload error BlobSidecars []blocks.VerifiedROBlob ErrorBlobSidecars error + DataColumnSidecars []blocks.VerifiedRODataColumn + ErrorDataColumnSidecars error } // NewPayload -- @@ -109,10 +112,14 @@ func (e *EngineClient) ReconstructFullBellatrixBlockBatch( } // ReconstructBlobSidecars is a mock implementation of the ReconstructBlobSidecars method. -func (e *EngineClient) ReconstructBlobSidecars(context.Context, interfaces.ReadOnlySignedBeaconBlock, [32]byte, func(uint64) bool) ([]blocks.VerifiedROBlob, error) { +func (e *EngineClient) ReconstructBlobSidecars(context.Context, interfaces.ReadOnlySignedBeaconBlock, [fieldparams.RootLength]byte, func(uint64) bool) ([]blocks.VerifiedROBlob, error) { return e.BlobSidecars, e.ErrorBlobSidecars } +func (e *EngineClient) ReconstructDataColumnSidecars(context.Context, interfaces.ReadOnlySignedBeaconBlock, [fieldparams.RootLength]byte) ([]blocks.VerifiedRODataColumn, error) { + return e.DataColumnSidecars, e.ErrorDataColumnSidecars +} + // GetTerminalBlockHash -- func (e *EngineClient) GetTerminalBlockHash(ctx context.Context, transitionTime uint64) ([]byte, bool, error) { ttd := new(big.Int) diff --git a/beacon-chain/p2p/peers/status.go b/beacon-chain/p2p/peers/status.go index 0f6be0f410..59ee1fb370 100644 --- a/beacon-chain/p2p/peers/status.go +++ b/beacon-chain/p2p/peers/status.go @@ -705,31 +705,46 @@ func (p *Status) deprecatedPrune() { p.tallyIPTracker() } -// BestFinalized returns the highest finalized epoch equal to or higher than ours that is agreed -// upon by the majority of peers. This method may not return the absolute highest finalized, but -// the finalized epoch in which most peers can serve blocks (plurality voting). -// Ideally, all peers would be reporting the same finalized epoch but some may be behind due to their -// own latency, or because of their finalized epoch at the time we queried them. -// Returns epoch number and list of peers that are at or beyond that epoch. +// BestFinalized returns the highest finalized epoch equal to or higher than `ourFinalizedEpoch` +// that is agreed upon by the majority of peers, and the peers agreeing on this finalized epoch. +// This method may not return the absolute highest finalized epoch, but the finalized epoch in which +// most peers can serve blocks (plurality voting). Ideally, all peers would be reporting the same +// finalized epoch but some may be behind due to their own latency, or because of their finalized +// epoch at the time we queried them. func (p *Status) BestFinalized(maxPeers int, ourFinalizedEpoch primitives.Epoch) (primitives.Epoch, []peer.ID) { + // Retrieve all connected peers. connected := p.Connected() + + // key: finalized epoch, value: number of peers that support this finalized epoch. finalizedEpochVotes := make(map[primitives.Epoch]uint64) + + // key: peer ID, value: finalized epoch of the peer. pidEpoch := make(map[peer.ID]primitives.Epoch, len(connected)) + + // key: peer ID, value: head slot of the peer. pidHead := make(map[peer.ID]primitives.Slot, len(connected)) + potentialPIDs := make([]peer.ID, 0, len(connected)) for _, pid := range connected { peerChainState, err := p.ChainState(pid) - if err == nil && peerChainState != nil && peerChainState.FinalizedEpoch >= ourFinalizedEpoch { - finalizedEpochVotes[peerChainState.FinalizedEpoch]++ - pidEpoch[pid] = peerChainState.FinalizedEpoch - potentialPIDs = append(potentialPIDs, pid) - pidHead[pid] = peerChainState.HeadSlot + + // Skip if the peer's finalized epoch is not defined, or if the peer's finalized epoch is + // lower than ours. + if err != nil || peerChainState == nil || peerChainState.FinalizedEpoch < ourFinalizedEpoch { + continue } + + finalizedEpochVotes[peerChainState.FinalizedEpoch]++ + + pidEpoch[pid] = peerChainState.FinalizedEpoch + pidHead[pid] = peerChainState.HeadSlot + + potentialPIDs = append(potentialPIDs, pid) } // Select the target epoch, which is the epoch most peers agree upon. - var targetEpoch primitives.Epoch - var mostVotes uint64 + // If there is a tie, select the highest epoch. + targetEpoch, mostVotes := primitives.Epoch(0), uint64(0) for epoch, count := range finalizedEpochVotes { if count > mostVotes || (count == mostVotes && epoch > targetEpoch) { mostVotes = count @@ -737,11 +752,12 @@ func (p *Status) BestFinalized(maxPeers int, ourFinalizedEpoch primitives.Epoch) } } - // Sort PIDs by finalized epoch, in decreasing order. + // Sort PIDs by finalized (epoch, head), in decreasing order. sort.Slice(potentialPIDs, func(i, j int) bool { if pidEpoch[potentialPIDs[i]] == pidEpoch[potentialPIDs[j]] { return pidHead[potentialPIDs[i]] > pidHead[potentialPIDs[j]] } + return pidEpoch[potentialPIDs[i]] > pidEpoch[potentialPIDs[j]] }) @@ -765,25 +781,36 @@ func (p *Status) BestFinalized(maxPeers int, ourFinalizedEpoch primitives.Epoch) // and is shared by at least minPeers. func (p *Status) BestNonFinalized(minPeers int, ourHeadEpoch primitives.Epoch) (primitives.Epoch, []peer.ID) { connected := p.Connected() - epochVotes := make(map[primitives.Epoch]uint64) - pidEpoch := make(map[peer.ID]primitives.Epoch, len(connected)) - pidHead := make(map[peer.ID]primitives.Slot, len(connected)) - potentialPIDs := make([]peer.ID, 0, len(connected)) + slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch + ourHeadSlot := slotsPerEpoch.Mul(uint64(ourHeadEpoch)) - ourHeadSlot := params.BeaconConfig().SlotsPerEpoch.Mul(uint64(ourHeadEpoch)) + // key: head epoch, value: number of peers that support this epoch. + epochVotes := make(map[primitives.Epoch]uint64) + + // key: peer ID, value: head epoch of the peer. + pidEpoch := make(map[peer.ID]primitives.Epoch, len(connected)) + + // key: peer ID, value: head slot of the peer. + pidHead := make(map[peer.ID]primitives.Slot, len(connected)) + + potentialPIDs := make([]peer.ID, 0, len(connected)) for _, pid := range connected { peerChainState, err := p.ChainState(pid) - if err == nil && peerChainState != nil && peerChainState.HeadSlot > ourHeadSlot { - epoch := slots.ToEpoch(peerChainState.HeadSlot) - epochVotes[epoch]++ - pidEpoch[pid] = epoch - pidHead[pid] = peerChainState.HeadSlot - potentialPIDs = append(potentialPIDs, pid) + // Skip if the peer's head epoch is not defined, or if the peer's head slot is + // lower or equal than ours. + if err != nil || peerChainState == nil || peerChainState.HeadSlot <= ourHeadSlot { + continue } + + epoch := slots.ToEpoch(peerChainState.HeadSlot) + epochVotes[epoch]++ + pidEpoch[pid] = epoch + pidHead[pid] = peerChainState.HeadSlot + potentialPIDs = append(potentialPIDs, pid) } // Select the target epoch, which has enough peers' votes (>= minPeers). - var targetEpoch primitives.Epoch + targetEpoch := primitives.Epoch(0) for epoch, votes := range epochVotes { if votes >= uint64(minPeers) && targetEpoch < epoch { targetEpoch = epoch @@ -1019,7 +1046,10 @@ func (p *Status) isfromBadIP(pid peer.ID) error { if val, ok := p.ipTracker[ip.String()]; ok { if val > CollocationLimit { - return errors.Errorf("collocation limit exceeded: got %d - limit %d", val, CollocationLimit) + return errors.Errorf( + "colocation limit exceeded: got %d - limit %d for peer %v with IP %v", + val, CollocationLimit, pid, ip.String(), + ) } } diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/construct_generic_block.go b/beacon-chain/rpc/prysm/v1alpha1/validator/construct_generic_block.go index 049be1b91d..138949fe66 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/construct_generic_block.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/construct_generic_block.go @@ -8,13 +8,18 @@ import ( enginev1 "github.com/OffchainLabs/prysm/v6/proto/engine/v1" ethpb "github.com/OffchainLabs/prysm/v6/proto/prysm/v1alpha1" "github.com/OffchainLabs/prysm/v6/runtime/version" + "github.com/pkg/errors" "google.golang.org/protobuf/proto" ) // constructGenericBeaconBlock constructs a `GenericBeaconBlock` based on the block version and other parameters. -func (vs *Server) constructGenericBeaconBlock(sBlk interfaces.SignedBeaconBlock, blobsBundle *enginev1.BlobsBundle, winningBid primitives.Wei) (*ethpb.GenericBeaconBlock, error) { +func (vs *Server) constructGenericBeaconBlock( + sBlk interfaces.SignedBeaconBlock, + blobsBundler enginev1.BlobsBundler, + winningBid primitives.Wei, +) (*ethpb.GenericBeaconBlock, error) { if sBlk == nil || sBlk.Block() == nil { - return nil, fmt.Errorf("block cannot be nil") + return nil, errors.New("block cannot be nil") } blockProto, err := sBlk.Block().Proto() @@ -35,11 +40,23 @@ func (vs *Server) constructGenericBeaconBlock(sBlk interfaces.SignedBeaconBlock, case version.Capella: return vs.constructCapellaBlock(blockProto, isBlinded, bidStr), nil case version.Deneb: - return vs.constructDenebBlock(blockProto, isBlinded, bidStr, blobsBundle), nil + bundle, ok := blobsBundler.(*enginev1.BlobsBundle) + if blobsBundler != nil && !ok { + return nil, fmt.Errorf("expected *BlobsBundler, got %T", blobsBundler) + } + return vs.constructDenebBlock(blockProto, isBlinded, bidStr, bundle), nil case version.Electra: - return vs.constructElectraBlock(blockProto, isBlinded, bidStr, blobsBundle), nil + bundle, ok := blobsBundler.(*enginev1.BlobsBundle) + if blobsBundler != nil && !ok { + return nil, fmt.Errorf("expected *BlobsBundler, got %T", blobsBundler) + } + return vs.constructElectraBlock(blockProto, isBlinded, bidStr, bundle), nil case version.Fulu: - return vs.constructFuluBlock(blockProto, isBlinded, bidStr, blobsBundle), nil + bundle, ok := blobsBundler.(*enginev1.BlobsBundleV2) + if blobsBundler != nil && !ok { + return nil, fmt.Errorf("expected *BlobsBundleV2, got %T", blobsBundler) + } + return vs.constructFuluBlock(blockProto, isBlinded, bidStr, bundle), nil default: return nil, fmt.Errorf("unknown block version: %d", sBlk.Version()) } @@ -92,7 +109,7 @@ func (vs *Server) constructElectraBlock(blockProto proto.Message, isBlinded bool return ðpb.GenericBeaconBlock{Block: ðpb.GenericBeaconBlock_Electra{Electra: electraContents}, IsBlinded: false, PayloadValue: payloadValue} } -func (vs *Server) constructFuluBlock(blockProto proto.Message, isBlinded bool, payloadValue string, bundle *enginev1.BlobsBundle) *ethpb.GenericBeaconBlock { +func (vs *Server) constructFuluBlock(blockProto proto.Message, isBlinded bool, payloadValue string, bundle *enginev1.BlobsBundleV2) *ethpb.GenericBeaconBlock { if isBlinded { return ðpb.GenericBeaconBlock{Block: ðpb.GenericBeaconBlock_BlindedFulu{BlindedFulu: blockProto.(*ethpb.BlindedBeaconBlockFulu)}, IsBlinded: true, PayloadValue: payloadValue} } diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go index 1206077ca1..d7c76ab214 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go @@ -232,7 +232,7 @@ func (vs *Server) BuildBlockParallel(ctx context.Context, sBlk interfaces.Signed }() winningBid := primitives.ZeroWei() - var bundle *enginev1.BlobsBundle + var bundle enginev1.BlobsBundler if sBlk.Version() >= version.Bellatrix { local, err := vs.getLocalPayload(ctx, sBlk.Block(), head) if err != nil { diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_bellatrix.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_bellatrix.go index 8b56f6af3c..e8ed8467d7 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_bellatrix.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_bellatrix.go @@ -54,7 +54,7 @@ const blockBuilderTimeout = 1 * time.Second const gasLimitAdjustmentFactor = 1024 // Sets the execution data for the block. Execution data can come from local EL client or remote builder depends on validator registration and circuit breaker conditions. -func setExecutionData(ctx context.Context, blk interfaces.SignedBeaconBlock, local *blocks.GetPayloadResponse, bid builder.Bid, builderBoostFactor primitives.Gwei) (primitives.Wei, *enginev1.BlobsBundle, error) { +func setExecutionData(ctx context.Context, blk interfaces.SignedBeaconBlock, local *blocks.GetPayloadResponse, bid builder.Bid, builderBoostFactor primitives.Gwei) (primitives.Wei, enginev1.BlobsBundler, error) { _, span := trace.StartSpan(ctx, "ProposerServer.setExecutionData") defer span.End() @@ -69,13 +69,13 @@ func setExecutionData(ctx context.Context, blk interfaces.SignedBeaconBlock, loc // Use local payload if builder payload is nil. if bid == nil { - return local.Bid, local.BlobsBundle, setLocalExecution(blk, local) + return local.Bid, local.BlobsBundler, setLocalExecution(blk, local) } builderPayload, err := bid.Header() if err != nil { log.WithError(err).Warn("Proposer: failed to retrieve header from BuilderBid") - return local.Bid, local.BlobsBundle, setLocalExecution(blk, local) + return local.Bid, local.BlobsBundler, setLocalExecution(blk, local) } switch { @@ -84,7 +84,7 @@ func setExecutionData(ctx context.Context, blk interfaces.SignedBeaconBlock, loc if err != nil { tracing.AnnotateError(span, err) log.WithError(err).Warn("Proposer: failed to match withdrawals root") - return local.Bid, local.BlobsBundle, setLocalExecution(blk, local) + return local.Bid, local.BlobsBundler, setLocalExecution(blk, local) } // Compare payload values between local and builder. Default to the local value if it is higher. @@ -97,7 +97,7 @@ func setExecutionData(ctx context.Context, blk interfaces.SignedBeaconBlock, loc "minBuilderBid": minBid, "builderGweiValue": builderValueGwei, }).Warn("Proposer: using local execution payload because min bid not attained") - return local.Bid, local.BlobsBundle, setLocalExecution(blk, local) + return local.Bid, local.BlobsBundler, setLocalExecution(blk, local) } // Use local block if min difference is not attained @@ -108,7 +108,7 @@ func setExecutionData(ctx context.Context, blk interfaces.SignedBeaconBlock, loc "minBidDiff": minDiff, "builderGweiValue": builderValueGwei, }).Warn("Proposer: using local execution payload because min difference with local value was not attained") - return local.Bid, local.BlobsBundle, setLocalExecution(blk, local) + return local.Bid, local.BlobsBundler, setLocalExecution(blk, local) } // Use builder payload if the following in true: @@ -133,7 +133,7 @@ func setExecutionData(ctx context.Context, blk interfaces.SignedBeaconBlock, loc bidDeneb, ok := bid.(builder.BidDeneb) if !ok { log.Warnf("bid type %T does not implement builder.BidDeneb", bid) - return local.Bid, local.BlobsBundle, setLocalExecution(blk, local) + return local.Bid, local.BlobsBundler, setLocalExecution(blk, local) } else { builderKzgCommitments = bidDeneb.BlobKzgCommitments() } @@ -144,14 +144,14 @@ func setExecutionData(ctx context.Context, blk interfaces.SignedBeaconBlock, loc bidElectra, ok := bid.(builder.BidElectra) if !ok { log.Warnf("bid type %T does not implement builder.BidElectra", bid) - return local.Bid, local.BlobsBundle, setLocalExecution(blk, local) + return local.Bid, local.BlobsBundler, setLocalExecution(blk, local) } else { executionRequests = bidElectra.ExecutionRequests() } } if err := setBuilderExecution(blk, builderPayload, builderKzgCommitments, executionRequests); err != nil { log.WithError(err).Warn("Proposer: failed to set builder payload") - return local.Bid, local.BlobsBundle, setLocalExecution(blk, local) + return local.Bid, local.BlobsBundler, setLocalExecution(blk, local) } else { return bid.Value(), nil, nil } @@ -171,11 +171,11 @@ func setExecutionData(ctx context.Context, blk interfaces.SignedBeaconBlock, loc trace.Int64Attribute("builderGweiValue", int64(builderValueGwei)), // lint:ignore uintcast -- This is OK for tracing. trace.Int64Attribute("builderBoostFactor", int64(builderBoostFactor)), // lint:ignore uintcast -- This is OK for tracing. ) - return local.Bid, local.BlobsBundle, setLocalExecution(blk, local) + return local.Bid, local.BlobsBundler, setLocalExecution(blk, local) default: // Bellatrix case. if err := setBuilderExecution(blk, builderPayload, nil, nil); err != nil { log.WithError(err).Warn("Proposer: failed to set builder payload") - return local.Bid, local.BlobsBundle, setLocalExecution(blk, local) + return local.Bid, local.BlobsBundler, setLocalExecution(blk, local) } else { return bid.Value(), nil, nil } @@ -375,8 +375,8 @@ func matchingWithdrawalsRoot(local, builder interfaces.ExecutionData) (bool, err // It delegates to setExecution for the actual work. func setLocalExecution(blk interfaces.SignedBeaconBlock, local *blocks.GetPayloadResponse) error { var kzgCommitments [][]byte - if local.BlobsBundle != nil { - kzgCommitments = local.BlobsBundle.KzgCommitments + if local.BlobsBundler != nil { + kzgCommitments = local.BlobsBundler.GetKzgCommitments() } if local.ExecutionRequests != nil { if err := blk.SetExecutionRequests(local.ExecutionRequests); err != nil { diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_bellatrix_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_bellatrix_test.go index 52559233d5..89a04e2071 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_bellatrix_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_bellatrix_test.go @@ -519,7 +519,7 @@ func TestServer_setExecutionData(t *testing.T) { PayloadIDBytes: id, GetPayloadResponse: &blocks.GetPayloadResponse{ ExecutionData: ed, - BlobsBundle: blobsBundle, + BlobsBundler: blobsBundle, Bid: primitives.ZeroWei(), }, } @@ -527,7 +527,7 @@ func TestServer_setExecutionData(t *testing.T) { res, err := vs.getLocalPayload(ctx, blk.Block(), capellaTransitionState) require.NoError(t, err) require.Equal(t, uint64(4), res.ExecutionData.BlockNumber()) - require.DeepEqual(t, res.BlobsBundle, blobsBundle) + require.DeepEqual(t, res.BlobsBundler, blobsBundle) }) t.Run("Can get builder payload and blobs in Deneb", func(t *testing.T) { cfg := params.BeaconConfig().Copy() diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go index aa9b0ca1de..4a183c151d 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_test.go @@ -529,7 +529,7 @@ func TestServer_GetBeaconBlock_Deneb(t *testing.T) { PayloadIDBytes: &enginev1.PayloadIDBytes{1}, GetPayloadResponse: &blocks.GetPayloadResponse{ ExecutionData: ed, - BlobsBundle: bundle, + BlobsBundler: bundle, }, } diff --git a/changelog/manu-peerdas-get-blobs-V2.md b/changelog/manu-peerdas-get-blobs-V2.md new file mode 100644 index 0000000000..6fb1d55383 --- /dev/null +++ b/changelog/manu-peerdas-get-blobs-V2.md @@ -0,0 +1,3 @@ +### Added +- Implement engine method `GetBlobsV2` +- Implement execution `ReconstructDataColumnSidecars`, which reconstruct data column sidecars from data fetched from the execution layer. diff --git a/consensus-types/blocks/execution.go b/consensus-types/blocks/execution.go index 4eb8eadf88..7e4156d386 100644 --- a/consensus-types/blocks/execution.go +++ b/consensus-types/blocks/execution.go @@ -2,7 +2,6 @@ package blocks import ( "bytes" - "errors" fieldparams "github.com/OffchainLabs/prysm/v6/config/fieldparams" consensus_types "github.com/OffchainLabs/prysm/v6/consensus-types" @@ -10,6 +9,7 @@ import ( "github.com/OffchainLabs/prysm/v6/encoding/bytesutil" "github.com/OffchainLabs/prysm/v6/encoding/ssz" enginev1 "github.com/OffchainLabs/prysm/v6/proto/engine/v1" + "github.com/pkg/errors" fastssz "github.com/prysmaticlabs/fastssz" "google.golang.org/protobuf/proto" ) @@ -40,8 +40,10 @@ func NewWrappedExecutionData(v proto.Message) (interfaces.ExecutionData, error) case *enginev1.ExecutionBundleElectra: // note: no payload changes in electra so using deneb return WrappedExecutionPayloadDeneb(pbStruct.Payload) + case *enginev1.ExecutionBundleFulu: + return WrappedExecutionPayloadDeneb(pbStruct.Payload) default: - return nil, ErrUnsupportedVersion + return nil, errors.Wrapf(ErrUnsupportedVersion, "type %T", pbStruct) } } diff --git a/consensus-types/blocks/get_payload.go b/consensus-types/blocks/get_payload.go index f6a749860f..5853f090a6 100644 --- a/consensus-types/blocks/get_payload.go +++ b/consensus-types/blocks/get_payload.go @@ -5,6 +5,7 @@ import ( "github.com/OffchainLabs/prysm/v6/consensus-types/interfaces" "github.com/OffchainLabs/prysm/v6/consensus-types/primitives" pb "github.com/OffchainLabs/prysm/v6/proto/engine/v1" + "github.com/pkg/errors" "google.golang.org/protobuf/proto" ) @@ -12,7 +13,7 @@ import ( // GetPayloadResponseV(1|2|3|4) value. type GetPayloadResponse struct { ExecutionData interfaces.ExecutionData - BlobsBundle *pb.BlobsBundle + BlobsBundler pb.BlobsBundler OverrideBuilder bool // todo: should we convert this to Gwei up front? Bid primitives.Wei @@ -24,6 +25,10 @@ type bundleGetter interface { GetBlobsBundle() *pb.BlobsBundle } +type bundleV2Getter interface { + GetBlobsBundle() *pb.BlobsBundleV2 +} + // bidValueGetter is an interface satisfied by get payload responses that have a bid value. type bidValueGetter interface { GetValue() []byte @@ -41,10 +46,13 @@ func NewGetPayloadResponse(msg proto.Message) (*GetPayloadResponse, error) { r := &GetPayloadResponse{} bundleGetter, hasBundle := msg.(bundleGetter) if hasBundle { - r.BlobsBundle = bundleGetter.GetBlobsBundle() + r.BlobsBundler = bundleGetter.GetBlobsBundle() + } + bundleV2Getter, hasBundle := msg.(bundleV2Getter) + if hasBundle { + r.BlobsBundler = bundleV2Getter.GetBlobsBundle() } bidValueGetter, hasBid := msg.(bidValueGetter) - executionRequestsGetter, hasExecutionRequests := msg.(executionRequestsGetter) wei := primitives.ZeroWei() if hasBid { // The protobuf types that engine api responses unmarshal into store their values in little endian form. @@ -60,13 +68,15 @@ func NewGetPayloadResponse(msg proto.Message) (*GetPayloadResponse, error) { } ed, err := NewWrappedExecutionData(msg) if err != nil { - return nil, err + return nil, errors.Wrap(err, "new wrapped execution data") } r.ExecutionData = ed + + executionRequestsGetter, hasExecutionRequests := msg.(executionRequestsGetter) if hasExecutionRequests { requests, err := executionRequestsGetter.GetDecodedExecutionRequests(params.BeaconConfig().ExecutionRequestLimits()) if err != nil { - return nil, err + return nil, errors.Wrap(err, "get decoded execution requests") } r.ExecutionRequests = requests }