Use engine api get-blobs for block subscriber (#14513)

* Use engine api get-blobs for block subscriber

Debug

changelog

add proto marshal and unmarshal

Kasey's feedback

* Feedback

* Preston's feedback

* Exist argument should not be hardcoded with kzg count
This commit is contained in:
terence
2024-10-24 14:30:14 -07:00
committed by GitHub
parent 52cf3a155d
commit 7ac522d8ff
31 changed files with 799 additions and 213 deletions

View File

@@ -37,6 +37,7 @@ go_library(
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/state-native:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
@@ -105,8 +106,11 @@ go_test(
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/execution/testing:go_default_library",
"//beacon-chain/execution/types:go_default_library",
"//beacon-chain/forkchoice:go_default_library",
"//beacon-chain/forkchoice/doubly-linked-tree:go_default_library",
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state/stategen:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",

View File

@@ -14,6 +14,7 @@ import (
"github.com/holiman/uint256"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/execution/types"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
@@ -23,6 +24,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
pb "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"github.com/sirupsen/logrus"
@@ -79,6 +81,8 @@ const (
GetPayloadBodiesByRangeV1 = "engine_getPayloadBodiesByRangeV1"
// ExchangeCapabilities request string for JSON-RPC.
ExchangeCapabilities = "engine_exchangeCapabilities"
// GetBlobsV1 request string for JSON-RPC.
GetBlobsV1 = "engine_getBlobsV1"
// Defines the seconds before timing out engine endpoints with non-block execution semantics.
defaultEngineTimeout = time.Second
)
@@ -93,16 +97,15 @@ type ForkchoiceUpdatedResponse struct {
ValidationError string `json:"validationError"`
}
// PayloadReconstructor defines a service that can reconstruct a full beacon
// block with an execution payload from a signed beacon block and a connection
// to an execution client's engine API.
type PayloadReconstructor interface {
// Reconstructor defines a service responsible for reconstructing full beacon chain objects by utilizing the execution API and making requests through the execution client.
type Reconstructor interface {
ReconstructFullBlock(
ctx context.Context, blindedBlock interfaces.ReadOnlySignedBeaconBlock,
) (interfaces.SignedBeaconBlock, error)
ReconstructFullBellatrixBlockBatch(
ctx context.Context, blindedBlocks []interfaces.ReadOnlySignedBeaconBlock,
) ([]interfaces.SignedBeaconBlock, error)
ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, indices []bool) ([]blocks.VerifiedROBlob, error)
}
// EngineCaller defines a client that can interact with an Ethereum
@@ -494,6 +497,20 @@ func (s *Service) HeaderByNumber(ctx context.Context, number *big.Int) (*types.H
return hdr, err
}
// GetBlobs returns the blob and proof from the execution engine for the given versioned hashes.
func (s *Service) GetBlobs(ctx context.Context, versionedHashes []common.Hash) ([]*pb.BlobAndProof, error) {
ctx, span := trace.StartSpan(ctx, "powchain.engine-api-client.GetBlobs")
defer span.End()
// If the execution engine does not support `GetBlobsV1`, return early to prevent encountering an error later.
if !s.capabilityCache.has(GetBlobsV1) {
return nil, nil
}
result := make([]*pb.BlobAndProof, len(versionedHashes))
err := s.rpcClient.CallContext(ctx, &result, GetBlobsV1, versionedHashes)
return result, handleRPCError(err)
}
// ReconstructFullBlock takes in a blinded beacon block and reconstructs
// a beacon block with a full execution payload via the engine API.
func (s *Service) ReconstructFullBlock(
@@ -522,6 +539,109 @@ func (s *Service) ReconstructFullBellatrixBlockBatch(
return unb, nil
}
// ReconstructBlobSidecars reconstructs the verified blob sidecars for a given beacon block.
// It retrieves the KZG commitments from the block body, fetches the associated blobs and proofs,
// and constructs the corresponding verified read-only blob sidecars.
//
// The 'exists' argument is a boolean list (must be the same length as body.BlobKzgCommitments), where each element corresponds to whether a
// particular blob sidecar already exists. If exists[i] is true, the blob for the i-th KZG commitment
// has already been retrieved and does not need to be fetched again from the execution layer (EL).
//
// For example:
// - len(block.Body().BlobKzgCommitments()) == 6
// - If exists = [true, false, true, false, true, false], the function will fetch the blobs
// associated with indices 1, 3, and 5 (since those are marked as non-existent).
// - If exists = [false ... x 6], the function will attempt to fetch all blobs.
//
// Only the blobs that do not already exist (where exists[i] is false) are fetched using the KZG commitments from block body.
func (s *Service) ReconstructBlobSidecars(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte, exists []bool) ([]blocks.VerifiedROBlob, error) {
blockBody := block.Block().Body()
kzgCommitments, err := blockBody.BlobKzgCommitments()
if err != nil {
return nil, errors.Wrap(err, "could not get blob KZG commitments")
}
if len(kzgCommitments) != len(exists) {
return nil, fmt.Errorf("mismatched lengths: KZG commitments %d, exists %d", len(kzgCommitments), len(exists))
}
// Collect KZG hashes for non-existing blobs
var kzgHashes []common.Hash
for i, commitment := range kzgCommitments {
if !exists[i] {
kzgHashes = append(kzgHashes, primitives.ConvertKzgCommitmentToVersionedHash(commitment))
}
}
if len(kzgHashes) == 0 {
return nil, nil
}
// Fetch blobs from EL
blobs, err := s.GetBlobs(ctx, kzgHashes)
if err != nil {
return nil, errors.Wrap(err, "could not get blobs")
}
if len(blobs) == 0 {
return nil, nil
}
header, err := block.Header()
if err != nil {
return nil, errors.Wrap(err, "could not get header")
}
// Reconstruct verified blob sidecars
var verifiedBlobs []blocks.VerifiedROBlob
for i, blobIndex := 0, 0; i < len(kzgCommitments); i++ {
if exists[i] {
continue
}
if blobIndex >= len(blobs) || blobs[blobIndex] == nil {
blobIndex++
continue
}
blob := blobs[blobIndex]
blobIndex++
proof, err := blocks.MerkleProofKZGCommitment(blockBody, i)
if err != nil {
log.WithError(err).WithField("index", i).Error("failed to get Merkle proof for KZG commitment")
continue
}
sidecar := &ethpb.BlobSidecar{
Index: uint64(i),
Blob: blob.Blob,
KzgCommitment: kzgCommitments[i],
KzgProof: blob.KzgProof,
SignedBlockHeader: header,
CommitmentInclusionProof: proof,
}
roBlob, err := blocks.NewROBlobWithRoot(sidecar, blockRoot)
if err != nil {
log.WithError(err).WithField("index", i).Error("failed to create RO blob with root")
continue
}
// Verify the sidecar KZG proof
v := s.blobVerifier(roBlob, verification.ELMemPoolRequirements)
if err := v.SidecarKzgProofVerified(); err != nil {
log.WithError(err).WithField("index", i).Error("failed to verify KZG proof for sidecar")
continue
}
verifiedBlob, err := v.VerifiedROBlob()
if err != nil {
log.WithError(err).WithField("index", i).Error("failed to verify RO blob")
continue
}
verifiedBlobs = append(verifiedBlobs, verifiedBlob)
}
return verifiedBlobs, nil
}
func fullPayloadFromPayloadBody(
header interfaces.ExecutionData, body *pb.ExecutionPayloadBody, bVersion int,
) (interfaces.ExecutionData, error) {

View File

@@ -2,6 +2,7 @@ package execution
import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"io"
@@ -20,6 +21,7 @@ import (
"github.com/holiman/uint256"
"github.com/pkg/errors"
mocks "github.com/prysmaticlabs/prysm/v5/beacon-chain/execution/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
@@ -37,9 +39,9 @@ import (
)
var (
_ = PayloadReconstructor(&Service{})
_ = Reconstructor(&Service{})
_ = EngineCaller(&Service{})
_ = PayloadReconstructor(&Service{})
_ = Reconstructor(&Service{})
_ = EngineCaller(&mocks.EngineClient{})
)
@@ -2390,3 +2392,110 @@ func Test_ExchangeCapabilities(t *testing.T) {
}
})
}
func TestReconstructBlobSidecars(t *testing.T) {
client := &Service{capabilityCache: &capabilityCache{}}
b := util.NewBeaconBlockDeneb()
kzgCommitments := createRandomKzgCommitments(t, 6)
b.Block.Body.BlobKzgCommitments = kzgCommitments
r, err := b.Block.HashTreeRoot()
require.NoError(t, err)
sb, err := blocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
ctx := context.Background()
t.Run("all seen", func(t *testing.T) {
exists := []bool{true, true, true, true, true, true}
verifiedBlobs, err := client.ReconstructBlobSidecars(ctx, sb, r, exists)
require.NoError(t, err)
require.Equal(t, 0, len(verifiedBlobs))
})
t.Run("get-blobs end point is not supported", func(t *testing.T) {
exists := []bool{true, true, true, true, true, false}
verifiedBlobs, err := client.ReconstructBlobSidecars(ctx, sb, r, exists)
require.NoError(t, err)
require.Equal(t, 0, len(verifiedBlobs))
})
client.capabilityCache = &capabilityCache{capabilities: map[string]interface{}{GetBlobsV1: nil}}
t.Run("recovered 6 missing blobs", func(t *testing.T) {
srv := createBlobServer(t, 6)
defer srv.Close()
rpcClient, client := setupRpcClient(t, srv.URL, client)
defer rpcClient.Close()
exists := [6]bool{}
verifiedBlobs, err := client.ReconstructBlobSidecars(ctx, sb, r, exists[:])
require.NoError(t, err)
require.Equal(t, 6, len(verifiedBlobs))
})
t.Run("recovered 3 missing blobs", func(t *testing.T) {
srv := createBlobServer(t, 3)
defer srv.Close()
rpcClient, client := setupRpcClient(t, srv.URL, client)
defer rpcClient.Close()
exists := []bool{true, false, true, false, true, false}
verifiedBlobs, err := client.ReconstructBlobSidecars(ctx, sb, r, exists)
require.NoError(t, err)
require.Equal(t, 3, len(verifiedBlobs))
})
}
func createRandomKzgCommitments(t *testing.T, num int) [][]byte {
kzgCommitments := make([][]byte, num)
for i := range kzgCommitments {
kzgCommitments[i] = make([]byte, 48)
_, err := rand.Read(kzgCommitments[i])
require.NoError(t, err)
}
return kzgCommitments
}
func createBlobServer(t *testing.T, numBlobs int) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
defer func() {
require.NoError(t, r.Body.Close())
}()
blobs := make([]pb.BlobAndProofJson, numBlobs)
for i := range blobs {
blobs[i] = pb.BlobAndProofJson{Blob: []byte(fmt.Sprintf("blob%d", i+1)), KzgProof: []byte(fmt.Sprintf("proof%d", i+1))}
}
respJSON := map[string]interface{}{
"jsonrpc": "2.0",
"id": 1,
"result": blobs,
}
require.NoError(t, json.NewEncoder(w).Encode(respJSON))
}))
}
func setupRpcClient(t *testing.T, url string, client *Service) (*rpc.Client, *Service) {
rpcClient, err := rpc.DialHTTP(url)
require.NoError(t, err)
client.rpcClient = rpcClient
client.capabilityCache = &capabilityCache{capabilities: map[string]interface{}{GetBlobsV1: nil}}
client.blobVerifier = testNewBlobVerifier()
return rpcClient, client
}
func testNewBlobVerifier() verification.NewBlobVerifier {
return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier {
return &verification.MockBlobVerifier{
CbVerifiedROBlob: func() (blocks.VerifiedROBlob, error) {
return blocks.VerifiedROBlob{}, nil
},
}
}
}

View File

@@ -7,6 +7,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/network"
"github.com/prysmaticlabs/prysm/v5/network/authorization"
)
@@ -115,3 +116,11 @@ func WithJwtId(jwtId string) Option {
return nil
}
}
// WithVerifierWaiter gives the sync package direct access to the verifier waiter.
func WithVerifierWaiter(v *verification.InitializerWaiter) Option {
return func(s *Service) error {
s.verifierWaiter = v
return nil
}
}

View File

@@ -78,6 +78,13 @@ func (s *Service) pollConnectionStatus(ctx context.Context) {
currClient.Close()
}
log.WithField("endpoint", logs.MaskCredentialsLogging(s.cfg.currHttpEndpoint.Url)).Info("Connected to new endpoint")
c, err := s.ExchangeCapabilities(ctx)
if err != nil {
errorLogger(err, "Could not exchange capabilities with execution client")
}
s.capabilityCache.save(c)
return
case <-s.ctx.Done():
log.Debug("Received cancelled context,closing existing powchain service")

View File

@@ -29,7 +29,9 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state"
native "github.com/prysmaticlabs/prysm/v5/beacon-chain/state/state-native"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/container/trie"
contracts "github.com/prysmaticlabs/prysm/v5/contracts/deposit"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
@@ -155,6 +157,9 @@ type Service struct {
lastReceivedMerkleIndex int64 // Keeps track of the last received index to prevent log spam.
runError error
preGenesisState state.BeaconState
verifierWaiter *verification.InitializerWaiter
blobVerifier verification.NewBlobVerifier
capabilityCache *capabilityCache
}
// NewService sets up a new instance with an ethclient when given a web3 endpoint as a string in the config.
@@ -192,6 +197,7 @@ func NewService(ctx context.Context, opts ...Option) (*Service, error) {
lastReceivedMerkleIndex: -1,
preGenesisState: genState,
eth1HeadTicker: time.NewTicker(time.Duration(params.BeaconConfig().SecondsPerETH1Block) * time.Second),
capabilityCache: &capabilityCache{},
}
for _, opt := range opts {
@@ -229,6 +235,13 @@ func (s *Service) Start() {
}
}
v, err := s.verifierWaiter.WaitForInitializer(s.ctx)
if err != nil {
log.WithError(err).Error("Could not get verification initializer")
return
}
s.blobVerifier = newBlobVerifierFromInitializer(v)
s.isRunning = true
// Poll the execution client connection and fallback if errors occur.
@@ -886,3 +899,39 @@ func (s *Service) migrateOldDepositTree(eth1DataInDB *ethpb.ETH1ChainData) error
func (s *Service) removeStartupState() {
s.cfg.finalizedStateAtStartup = nil
}
func newBlobVerifierFromInitializer(ini *verification.Initializer) verification.NewBlobVerifier {
return func(b blocks.ROBlob, reqs []verification.Requirement) verification.BlobVerifier {
return ini.NewBlobVerifier(b, reqs)
}
}
type capabilityCache struct {
capabilities map[string]interface{}
capabilitiesLock sync.RWMutex
}
func (c *capabilityCache) save(cs []string) {
c.capabilitiesLock.Lock()
defer c.capabilitiesLock.Unlock()
if c.capabilities == nil {
c.capabilities = make(map[string]interface{})
}
for _, capability := range cs {
c.capabilities[capability] = struct{}{}
}
}
func (c *capabilityCache) has(capability string) bool {
c.capabilitiesLock.RLock()
defer c.capabilitiesLock.RUnlock()
if c.capabilities == nil {
return false
}
_, ok := c.capabilities[capability]
return ok
}

View File

@@ -19,8 +19,11 @@ import (
dbutil "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/testing"
mockExecution "github.com/prysmaticlabs/prysm/v5/beacon-chain/execution/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/execution/types"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice"
doublylinkedtree "github.com/prysmaticlabs/prysm/v5/beacon-chain/forkchoice/doubly-linked-tree"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/container/trie"
contracts "github.com/prysmaticlabs/prysm/v5/contracts/deposit"
@@ -92,10 +95,16 @@ func TestStart_OK(t *testing.T) {
t.Cleanup(func() {
server.Stop()
})
c := startup.NewClockSynchronizer()
require.NoError(t, c.SetClock(startup.NewClock(time.Unix(0, 0), [32]byte{})))
waiter := verification.NewInitializerWaiter(
c, forkchoice.NewROForkChoice(nil), nil)
web3Service, err := NewService(context.Background(),
WithHttpEndpoint(endpoint),
WithDepositContractAddress(testAcc.ContractAddr),
WithDatabase(beaconDB),
WithVerifierWaiter(waiter),
)
require.NoError(t, err, "unable to setup execution service")
web3Service = setDefaultMocks(web3Service)

View File

@@ -36,6 +36,8 @@ type EngineClient struct {
OverrideValidHash [32]byte
GetPayloadResponse *blocks.GetPayloadResponse
ErrGetPayload error
BlobSidecars []blocks.VerifiedROBlob
ErrorBlobSidecars error
}
// NewPayload --
@@ -106,6 +108,11 @@ func (e *EngineClient) ReconstructFullBellatrixBlockBatch(
return fullBlocks, nil
}
// ReconstructBlobSidecars is a mock implementation of the ReconstructBlobSidecars method.
func (e *EngineClient) ReconstructBlobSidecars(context.Context, interfaces.ReadOnlySignedBeaconBlock, [32]byte, []bool) ([]blocks.VerifiedROBlob, error) {
return e.BlobSidecars, e.ErrorBlobSidecars
}
// GetTerminalBlockHash --
func (e *EngineClient) GetTerminalBlockHash(ctx context.Context, transitionTime uint64) ([]byte, bool, error) {
ttd := new(big.Int)