From 10fffa6e7c654f285323e5d62b5cfbdab9696a1e Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Wed, 13 Jul 2022 17:18:30 +0000 Subject: [PATCH] [Feature] - Store Only Blinded Beacon Blocks Post-Merge (#11010) * add in flags * add sync and db items * bring over all other changes * enable on * use feature flag * powchain * builds * fix up tests * pass iface * gaz * enable bellatrix blind in unmarshal only behind flag * poolside * pass rpc tests * rebuilds * naming * cleaner func * check needs resync * idiomatic * gaz * rem * build * nicer * build * cleaner * surface error * wrapping * unmarshal logs * fix up * cleaner * log * builds * Update beacon-chain/blockchain/execution_engine.go Co-authored-by: terencechain * terence feedback * test added for resync * nil check * fmt Co-authored-by: terencechain --- beacon-chain/blockchain/pow_block.go | 2 +- beacon-chain/db/kv/BUILD.bazel | 1 + beacon-chain/db/kv/blocks.go | 61 +++++++-- beacon-chain/db/kv/blocks_test.go | 80 +++++++++-- beacon-chain/db/kv/init_test.go | 4 + beacon-chain/db/kv/kv.go | 28 +++- beacon-chain/db/kv/kv_test.go | 16 +++ beacon-chain/db/kv/migration.go | 1 + .../db/kv/migration_blinded_beacon_blocks.go | 27 ++++ beacon-chain/node/node.go | 88 +++++++------ beacon-chain/powchain/engine_client.go | 11 +- beacon-chain/powchain/engine_client_test.go | 5 +- beacon-chain/powchain/testing/BUILD.bazel | 1 + .../powchain/testing/mock_engine_client.go | 53 +++++--- beacon-chain/rpc/eth/beacon/BUILD.bazel | 3 + beacon-chain/rpc/eth/beacon/blocks.go | 36 ++++- beacon-chain/rpc/eth/beacon/blocks_test.go | 5 + beacon-chain/rpc/eth/beacon/server.go | 38 +++--- .../rpc/prysm/v1alpha1/beacon/blocks.go | 6 + .../rpc/prysm/v1alpha1/beacon/blocks_test.go | 11 +- beacon-chain/rpc/service.go | 98 +++++++------- beacon-chain/sync/BUILD.bazel | 6 + beacon-chain/sync/metrics.go | 8 +- beacon-chain/sync/options.go | 8 ++ .../sync/rpc_beacon_blocks_by_range.go | 20 ++- .../sync/rpc_beacon_blocks_by_range_test.go | 124 ++++++++++++++++++ .../sync/rpc_beacon_blocks_by_root.go | 14 +- .../sync/rpc_beacon_blocks_by_root_test.go | 107 +++++++++++++++ beacon-chain/sync/rpc_chunked_response.go | 3 +- beacon-chain/sync/service.go | 32 ++--- config/features/config.go | 5 + config/features/flags.go | 7 +- consensus-types/interfaces/beacon_block.go | 1 + consensus-types/mock/block.go | 4 + consensus-types/wrapper/beacon_block.go | 51 +------ .../wrapper/beacon_block_altair.go | 4 + .../wrapper/beacon_block_bellatrix.go | 38 ++++++ .../wrapper/beacon_block_phase0.go | 4 + consensus-types/wrapper/beacon_block_test.go | 2 +- .../wrapper/blinded_beacon_block_bellatrix.go | 4 + .../shared/common/forkchoice/service.go | 4 +- 41 files changed, 782 insertions(+), 239 deletions(-) create mode 100644 beacon-chain/db/kv/migration_blinded_beacon_blocks.go diff --git a/beacon-chain/blockchain/pow_block.go b/beacon-chain/blockchain/pow_block.go index 6691f71ca8..11ea927600 100644 --- a/beacon-chain/blockchain/pow_block.go +++ b/beacon-chain/blockchain/pow_block.go @@ -84,7 +84,7 @@ func (s *Service) validateMergeBlock(ctx context.Context, b interfaces.SignedBea // getBlkParentHashAndTD retrieves the parent hash and total difficulty of the given block. func (s *Service) getBlkParentHashAndTD(ctx context.Context, blkHash []byte) ([]byte, *uint256.Int, error) { - blk, err := s.cfg.ExecutionEngineCaller.ExecutionBlockByHash(ctx, common.BytesToHash(blkHash)) + blk, err := s.cfg.ExecutionEngineCaller.ExecutionBlockByHash(ctx, common.BytesToHash(blkHash), false /* no txs */) if err != nil { return nil, nil, errors.Wrap(err, "could not get pow block") } diff --git a/beacon-chain/db/kv/BUILD.bazel b/beacon-chain/db/kv/BUILD.bazel index dd828fe086..f388cbe143 100644 --- a/beacon-chain/db/kv/BUILD.bazel +++ b/beacon-chain/db/kv/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "log.go", "migration.go", "migration_archived_index.go", + "migration_blinded_beacon_blocks.go", "migration_block_slot_index.go", "migration_state_validators.go", "powchain.go", diff --git a/beacon-chain/db/kv/blocks.go b/beacon-chain/db/kv/blocks.go index b36868c3e3..473d9e0cad 100644 --- a/beacon-chain/db/kv/blocks.go +++ b/beacon-chain/db/kv/blocks.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" ssz "github.com/prysmaticlabs/fastssz" "github.com/prysmaticlabs/prysm/beacon-chain/db/filters" + "github.com/prysmaticlabs/prysm/config/features" "github.com/prysmaticlabs/prysm/config/params" "github.com/prysmaticlabs/prysm/consensus-types/interfaces" types "github.com/prysmaticlabs/prysm/consensus-types/primitives" @@ -130,7 +131,7 @@ func (s *Store) Blocks(ctx context.Context, f *filters.QueryFilter) ([]interface encoded := bkt.Get(keys[i]) blk, err := unmarshalBlock(ctx, encoded) if err != nil { - return err + return errors.Wrapf(err, "could not unmarshal block with key %#x", keys[i]) } blocks = append(blocks, blk) blockRoots = append(blockRoots, bytesutil.ToBytes32(keys[i])) @@ -304,6 +305,16 @@ func (s *Store) SaveBlocks(ctx context.Context, blocks []interfaces.SignedBeacon if err := updateValueForIndices(ctx, indicesForBlocks[i], blockRoots[i], tx); err != nil { return errors.Wrap(err, "could not update DB indices") } + if features.Get().EnableOnlyBlindedBeaconBlocks { + blindedBlock, err := blk.ToBlinded() + if err != nil { + if !errors.Is(err, wrapper.ErrUnsupportedVersion) { + return err + } + } else { + blk = blindedBlock + } + } s.blockCache.Set(string(blockRoots[i]), blk, int64(len(encodedBlocks[i]))) if err := bkt.Put(blockRoots[i], encodedBlocks[i]); err != nil { return err @@ -758,7 +769,7 @@ func unmarshalBlock(_ context.Context, enc []byte) (interfaces.SignedBeaconBlock var err error enc, err = snappy.Decode(nil, enc) if err != nil { - return nil, err + return nil, errors.Wrap(err, "could not snappy decode block") } var rawBlock ssz.Unmarshaler switch { @@ -766,23 +777,23 @@ func unmarshalBlock(_ context.Context, enc []byte) (interfaces.SignedBeaconBlock // Marshal block bytes to altair beacon block. rawBlock = ðpb.SignedBeaconBlockAltair{} if err := rawBlock.UnmarshalSSZ(enc[len(altairKey):]); err != nil { - return nil, err + return nil, errors.Wrap(err, "could not unmarshal Altair block") } case hasBellatrixKey(enc): rawBlock = ðpb.SignedBeaconBlockBellatrix{} if err := rawBlock.UnmarshalSSZ(enc[len(bellatrixKey):]); err != nil { - return nil, err + return nil, errors.Wrap(err, "could not unmarshal Bellatrix block") } case hasBellatrixBlindKey(enc): rawBlock = ðpb.SignedBlindedBeaconBlockBellatrix{} if err := rawBlock.UnmarshalSSZ(enc[len(bellatrixBlindKey):]); err != nil { - return nil, err + return nil, errors.Wrap(err, "could not unmarshal blinded Bellatrix block") } default: // Marshal block bytes to phase 0 beacon block. rawBlock = ðpb.SignedBeaconBlock{} if err := rawBlock.UnmarshalSSZ(enc); err != nil { - return nil, err + return nil, errors.Wrap(err, "could not unmarshal Phase0 block") } } return wrapper.WrappedSignedBeaconBlock(rawBlock) @@ -790,19 +801,41 @@ func unmarshalBlock(_ context.Context, enc []byte) (interfaces.SignedBeaconBlock // marshal versioned beacon block from struct type down to bytes. func marshalBlock(_ context.Context, blk interfaces.SignedBeaconBlock) ([]byte, error) { - obj, err := blk.MarshalSSZ() - if err != nil { - return nil, err + var encodedBlock []byte + var err error + blockToSave := blk + if features.Get().EnableOnlyBlindedBeaconBlocks { + blindedBlock, err := blk.ToBlinded() + switch { + case errors.Is(err, wrapper.ErrUnsupportedVersion): + encodedBlock, err = blk.MarshalSSZ() + if err != nil { + return nil, errors.Wrap(err, "could not marshal non-blinded block") + } + case err != nil: + return nil, errors.Wrap(err, "could not convert block to blinded format") + default: + encodedBlock, err = blindedBlock.MarshalSSZ() + if err != nil { + return nil, errors.Wrap(err, "could not marshal blinded block") + } + blockToSave = blindedBlock + } + } else { + encodedBlock, err = blk.MarshalSSZ() + if err != nil { + return nil, err + } } - switch blk.Version() { + switch blockToSave.Version() { case version.BellatrixBlind: - return snappy.Encode(nil, append(bellatrixBlindKey, obj...)), nil + return snappy.Encode(nil, append(bellatrixBlindKey, encodedBlock...)), nil case version.Bellatrix: - return snappy.Encode(nil, append(bellatrixKey, obj...)), nil + return snappy.Encode(nil, append(bellatrixKey, encodedBlock...)), nil case version.Altair: - return snappy.Encode(nil, append(altairKey, obj...)), nil + return snappy.Encode(nil, append(altairKey, encodedBlock...)), nil case version.Phase0: - return snappy.Encode(nil, obj), nil + return snappy.Encode(nil, encodedBlock), nil default: return nil, errors.New("Unknown block version") } diff --git a/beacon-chain/db/kv/blocks_test.go b/beacon-chain/db/kv/blocks_test.go index 5785609eeb..e75f9be2cc 100644 --- a/beacon-chain/db/kv/blocks_test.go +++ b/beacon-chain/db/kv/blocks_test.go @@ -134,11 +134,17 @@ func TestStore_BlocksCRUD(t *testing.T) { retrievedBlock, err := db.Block(ctx, blockRoot) require.NoError(t, err) assert.DeepEqual(t, nil, retrievedBlock, "Expected nil block") + require.NoError(t, db.SaveBlock(ctx, blk)) assert.Equal(t, true, db.HasBlock(ctx, blockRoot), "Expected block to exist in the db") retrievedBlock, err = db.Block(ctx, blockRoot) require.NoError(t, err) - assert.Equal(t, true, proto.Equal(blk.Proto(), retrievedBlock.Proto()), "Wanted: %v, received: %v", blk, retrievedBlock) + wanted := retrievedBlock + if _, err := retrievedBlock.PbBellatrixBlock(); err == nil { + wanted, err = retrievedBlock.ToBlinded() + require.NoError(t, err) + } + assert.Equal(t, true, proto.Equal(wanted.Proto(), retrievedBlock.Proto()), "Wanted: %v, received: %v", wanted, retrievedBlock) }) } } @@ -314,7 +320,13 @@ func TestStore_BlocksCRUD_NoCache(t *testing.T) { assert.Equal(t, true, db.HasBlock(ctx, blockRoot), "Expected block to exist in the db") retrievedBlock, err = db.Block(ctx, blockRoot) require.NoError(t, err) - assert.Equal(t, true, proto.Equal(blk.Proto(), retrievedBlock.Proto()), "Wanted: %v, received: %v", blk, retrievedBlock) + + wanted := blk + if _, err := blk.PbBellatrixBlock(); err == nil { + wanted, err = blk.ToBlinded() + require.NoError(t, err) + } + assert.Equal(t, true, proto.Equal(wanted.Proto(), retrievedBlock.Proto()), "Wanted: %v, received: %v", wanted, retrievedBlock) }) } } @@ -524,7 +536,12 @@ func TestStore_SaveBlock_CanGetHighestAt(t *testing.T) { root := roots[0] b, err := db.Block(ctx, root) require.NoError(t, err) - assert.Equal(t, true, proto.Equal(block1.Proto(), b.Proto()), "Wanted: %v, received: %v", block1, b) + wanted := block1 + if _, err := block1.PbBellatrixBlock(); err == nil { + wanted, err = wanted.ToBlinded() + require.NoError(t, err) + } + assert.Equal(t, true, proto.Equal(wanted.Proto(), b.Proto()), "Wanted: %v, received: %v", wanted, b) _, roots, err = db.HighestRootsBelowSlot(ctx, 11) require.NoError(t, err) @@ -533,7 +550,12 @@ func TestStore_SaveBlock_CanGetHighestAt(t *testing.T) { root = roots[0] b, err = db.Block(ctx, root) require.NoError(t, err) - assert.Equal(t, true, proto.Equal(block2.Proto(), b.Proto()), "Wanted: %v, received: %v", block2, b) + wanted2 := block2 + if _, err := block2.PbBellatrixBlock(); err == nil { + wanted2, err = block2.ToBlinded() + require.NoError(t, err) + } + assert.Equal(t, true, proto.Equal(wanted2.Proto(), b.Proto()), "Wanted: %v, received: %v", wanted2, b) _, roots, err = db.HighestRootsBelowSlot(ctx, 101) require.NoError(t, err) @@ -542,7 +564,12 @@ func TestStore_SaveBlock_CanGetHighestAt(t *testing.T) { root = roots[0] b, err = db.Block(ctx, root) require.NoError(t, err) - assert.Equal(t, true, proto.Equal(block3.Proto(), b.Proto()), "Wanted: %v, received: %v", block3, b) + wanted = block3 + if _, err := block3.PbBellatrixBlock(); err == nil { + wanted, err = wanted.ToBlinded() + require.NoError(t, err) + } + assert.Equal(t, true, proto.Equal(wanted.Proto(), b.Proto()), "Wanted: %v, received: %v", wanted, b) }) } } @@ -569,7 +596,12 @@ func TestStore_GenesisBlock_CanGetHighestAt(t *testing.T) { root := roots[0] b, err := db.Block(ctx, root) require.NoError(t, err) - assert.Equal(t, true, proto.Equal(block1.Proto(), b.Proto()), "Wanted: %v, received: %v", block1, b) + wanted := block1 + if _, err := block1.PbBellatrixBlock(); err == nil { + wanted, err = block1.ToBlinded() + require.NoError(t, err) + } + assert.Equal(t, true, proto.Equal(wanted.Proto(), b.Proto()), "Wanted: %v, received: %v", wanted, b) _, roots, err = db.HighestRootsBelowSlot(ctx, 1) require.NoError(t, err) @@ -577,7 +609,12 @@ func TestStore_GenesisBlock_CanGetHighestAt(t *testing.T) { root = roots[0] b, err = db.Block(ctx, root) require.NoError(t, err) - assert.Equal(t, true, proto.Equal(genesisBlock.Proto(), b.Proto()), "Wanted: %v, received: %v", genesisBlock, b) + wanted = genesisBlock + if _, err := genesisBlock.PbBellatrixBlock(); err == nil { + wanted, err = genesisBlock.ToBlinded() + require.NoError(t, err) + } + assert.Equal(t, true, proto.Equal(wanted.Proto(), b.Proto()), "Wanted: %v, received: %v", wanted, b) _, roots, err = db.HighestRootsBelowSlot(ctx, 0) require.NoError(t, err) @@ -585,7 +622,12 @@ func TestStore_GenesisBlock_CanGetHighestAt(t *testing.T) { root = roots[0] b, err = db.Block(ctx, root) require.NoError(t, err) - assert.Equal(t, true, proto.Equal(genesisBlock.Proto(), b.Proto()), "Wanted: %v, received: %v", genesisBlock, b) + wanted = genesisBlock + if _, err := genesisBlock.PbBellatrixBlock(); err == nil { + wanted, err = genesisBlock.ToBlinded() + require.NoError(t, err) + } + assert.Equal(t, true, proto.Equal(wanted.Proto(), b.Proto()), "Wanted: %v, received: %v", wanted, b) }) } } @@ -671,15 +713,31 @@ func TestStore_BlocksBySlot_BlockRootsBySlot(t *testing.T) { assert.Equal(t, 0, len(retrievedBlocks), "Unexpected number of blocks received, expected none") retrievedBlocks, err = db.BlocksBySlot(ctx, 20) require.NoError(t, err) - assert.Equal(t, true, proto.Equal(b1.Proto(), retrievedBlocks[0].Proto()), "Wanted: %v, received: %v", b1, retrievedBlocks[0]) + + wanted := b1 + if _, err := b1.PbBellatrixBlock(); err == nil { + wanted, err = b1.ToBlinded() + require.NoError(t, err) + } + assert.Equal(t, true, proto.Equal(retrievedBlocks[0].Proto(), wanted.Proto()), "Wanted: %v, received: %v", retrievedBlocks[0], wanted) assert.Equal(t, true, len(retrievedBlocks) > 0, "Expected to have blocks") retrievedBlocks, err = db.BlocksBySlot(ctx, 100) require.NoError(t, err) if len(retrievedBlocks) != 2 { t.Fatalf("Expected 2 blocks, received %d blocks", len(retrievedBlocks)) } - assert.Equal(t, true, proto.Equal(b2.Proto(), retrievedBlocks[0].Proto()), "Wanted: %v, received: %v", b2, retrievedBlocks[0]) - assert.Equal(t, true, proto.Equal(b3.Proto(), retrievedBlocks[1].Proto()), "Wanted: %v, received: %v", b3, retrievedBlocks[1]) + wanted = b2 + if _, err := b2.PbBellatrixBlock(); err == nil { + wanted, err = b2.ToBlinded() + require.NoError(t, err) + } + assert.Equal(t, true, proto.Equal(wanted.Proto(), retrievedBlocks[0].Proto()), "Wanted: %v, received: %v", retrievedBlocks[0], wanted) + wanted = b3 + if _, err := b3.PbBellatrixBlock(); err == nil { + wanted, err = b3.ToBlinded() + require.NoError(t, err) + } + assert.Equal(t, true, proto.Equal(retrievedBlocks[1].Proto(), wanted.Proto()), "Wanted: %v, received: %v", retrievedBlocks[1], wanted) assert.Equal(t, true, len(retrievedBlocks) > 0, "Expected to have blocks") hasBlockRoots, retrievedBlockRoots, err := db.BlockRootsBySlot(ctx, 1) diff --git a/beacon-chain/db/kv/init_test.go b/beacon-chain/db/kv/init_test.go index c01769ee5d..c78dd6e2ed 100644 --- a/beacon-chain/db/kv/init_test.go +++ b/beacon-chain/db/kv/init_test.go @@ -1,6 +1,7 @@ package kv import ( + "github.com/prysmaticlabs/prysm/config/features" "github.com/prysmaticlabs/prysm/config/params" ) @@ -9,4 +10,7 @@ func init() { if err := params.SetActive(params.MainnetTestConfig()); err != nil { panic(err) } + features.Init(&features.Flags{ + EnableOnlyBlindedBeaconBlocks: true, + }) } diff --git a/beacon-chain/db/kv/kv.go b/beacon-chain/db/kv/kv.go index a34fe0ddaa..7d73264f56 100644 --- a/beacon-chain/db/kv/kv.go +++ b/beacon-chain/db/kv/kv.go @@ -4,6 +4,7 @@ package kv import ( "context" + "fmt" "os" "path" "time" @@ -14,6 +15,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" prombolt "github.com/prysmaticlabs/prombbolt" "github.com/prysmaticlabs/prysm/beacon-chain/db/iface" + "github.com/prysmaticlabs/prysm/config/features" "github.com/prysmaticlabs/prysm/config/params" "github.com/prysmaticlabs/prysm/io/file" bolt "go.etcd.io/bbolt" @@ -183,8 +185,13 @@ func NewKVStore(ctx context.Context, dirPath string, config *Config) (*Store, er }); err != nil { return nil, err } - err = prometheus.Register(createBoltCollector(kv.db)) - return kv, err + if err = prometheus.Register(createBoltCollector(kv.db)); err != nil { + return nil, err + } + if err = kv.checkNeedsResync(); err != nil { + return nil, err + } + return kv, nil } // ClearDB removes the previously stored database in the data directory. @@ -216,6 +223,23 @@ func (s *Store) DatabasePath() string { return s.databasePath } +func (s *Store) checkNeedsResync() error { + return s.db.View(func(tx *bolt.Tx) error { + bkt := tx.Bucket(migrationsBucket) + hasDisabledFeature := !features.Get().EnableOnlyBlindedBeaconBlocks + if hasDisabledFeature && bkt.Get(migrationBlindedBeaconBlocksKey) != nil { + return fmt.Errorf( + "you have disabled the flag %s, and your node must resync to ensure your "+ + "database is compatible. If you do not want to resync, please re-enable the %s flag", + features.EnableOnlyBlindedBeaconBlocks.Name, + features.EnableOnlyBlindedBeaconBlocks.Name, + ) + } + return nil + }) + +} + func createBuckets(tx *bolt.Tx, buckets ...[]byte) error { for _, bucket := range buckets { if _, err := tx.CreateBucketIfNotExists(bucket); err != nil { diff --git a/beacon-chain/db/kv/kv_test.go b/beacon-chain/db/kv/kv_test.go index 768718ce83..d64e93d331 100644 --- a/beacon-chain/db/kv/kv_test.go +++ b/beacon-chain/db/kv/kv_test.go @@ -4,7 +4,9 @@ import ( "context" "testing" + "github.com/prysmaticlabs/prysm/config/features" "github.com/prysmaticlabs/prysm/testing/require" + bolt "go.etcd.io/bbolt" ) // setupDB instantiates and returns a Store instance. @@ -16,3 +18,17 @@ func setupDB(t testing.TB) *Store { }) return db } + +func Test_checkNeedsResync(t *testing.T) { + store := setupDB(t) + resetFn := features.InitWithReset(&features.Flags{ + EnableOnlyBlindedBeaconBlocks: false, + }) + defer resetFn() + require.NoError(t, store.db.Update(func(tx *bolt.Tx) error { + bkt := tx.Bucket(migrationsBucket) + return bkt.Put(migrationBlindedBeaconBlocksKey, migrationCompleted) + })) + err := store.checkNeedsResync() + require.ErrorContains(t, "your node must resync", err) +} diff --git a/beacon-chain/db/kv/migration.go b/beacon-chain/db/kv/migration.go index 70c485d2c9..7803fd0daf 100644 --- a/beacon-chain/db/kv/migration.go +++ b/beacon-chain/db/kv/migration.go @@ -14,6 +14,7 @@ var migrations = []migration{ migrateArchivedIndex, migrateBlockSlotIndex, migrateStateValidators, + migrateBlindedBeaconBlocksEnabled, } // RunMigrations defined in the migrations array. diff --git a/beacon-chain/db/kv/migration_blinded_beacon_blocks.go b/beacon-chain/db/kv/migration_blinded_beacon_blocks.go new file mode 100644 index 0000000000..26b880018d --- /dev/null +++ b/beacon-chain/db/kv/migration_blinded_beacon_blocks.go @@ -0,0 +1,27 @@ +package kv + +import ( + "bytes" + "context" + + "github.com/prysmaticlabs/prysm/config/features" + bolt "go.etcd.io/bbolt" +) + +var migrationBlindedBeaconBlocksKey = []byte("blinded-beacon-blocks-enabled") + +func migrateBlindedBeaconBlocksEnabled(ctx context.Context, db *bolt.DB) error { + if !features.Get().EnableOnlyBlindedBeaconBlocks { + return nil // Only write to the migrations bucket if the feature is enabled. + } + if updateErr := db.Update(func(tx *bolt.Tx) error { + mb := tx.Bucket(migrationsBucket) + if b := mb.Get(migrationBlindedBeaconBlocksKey); bytes.Equal(b, migrationCompleted) { + return nil // Migration already completed. + } + return mb.Put(migrationBlindedBeaconBlocksKey, migrationCompleted) + }); updateErr != nil { + return updateErr + } + return nil +} diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 11addedeb4..1783111778 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -694,6 +694,7 @@ func (b *BeaconNode) registerSyncService() error { regularsync.WithStateGen(b.stateGen), regularsync.WithSlasherAttestationsFeed(b.slasherAttestationsFeed), regularsync.WithSlasherBlockHeadersFeed(b.slasherBlockHeadersFeed), + regularsync.WithExecutionPayloadReconstructor(web3Service), ) return b.services.RegisterService(rs) } @@ -799,49 +800,50 @@ func (b *BeaconNode) registerRPCService() error { p2pService := b.fetchP2P() rpcService := rpc.NewService(b.ctx, &rpc.Config{ - Host: host, - Port: port, - BeaconMonitoringHost: beaconMonitoringHost, - BeaconMonitoringPort: beaconMonitoringPort, - CertFlag: cert, - KeyFlag: key, - BeaconDB: b.db, - Broadcaster: p2pService, - PeersFetcher: p2pService, - PeerManager: p2pService, - MetadataProvider: p2pService, - ChainInfoFetcher: chainService, - HeadUpdater: chainService, - HeadFetcher: chainService, - CanonicalFetcher: chainService, - ForkFetcher: chainService, - FinalizationFetcher: chainService, - BlockReceiver: chainService, - AttestationReceiver: chainService, - GenesisTimeFetcher: chainService, - GenesisFetcher: chainService, - OptimisticModeFetcher: chainService, - AttestationsPool: b.attestationPool, - ExitPool: b.exitPool, - SlashingsPool: b.slashingsPool, - SlashingChecker: slasherService, - SyncCommitteeObjectPool: b.syncCommitteePool, - POWChainService: web3Service, - POWChainInfoFetcher: web3Service, - ChainStartFetcher: chainStartFetcher, - MockEth1Votes: mockEth1DataVotes, - SyncService: syncService, - DepositFetcher: depositFetcher, - PendingDepositFetcher: b.depositCache, - BlockNotifier: b, - StateNotifier: b, - OperationNotifier: b, - StateGen: b.stateGen, - EnableDebugRPCEndpoints: enableDebugRPCEndpoints, - MaxMsgSize: maxMsgSize, - ProposerIdsCache: b.proposerIdsCache, - ExecutionEngineCaller: web3Service, - BlockBuilder: b.fetchBuilderService(), + ExecutionEngineCaller: web3Service, + ExecutionPayloadReconstructor: web3Service, + Host: host, + Port: port, + BeaconMonitoringHost: beaconMonitoringHost, + BeaconMonitoringPort: beaconMonitoringPort, + CertFlag: cert, + KeyFlag: key, + BeaconDB: b.db, + Broadcaster: p2pService, + PeersFetcher: p2pService, + PeerManager: p2pService, + MetadataProvider: p2pService, + ChainInfoFetcher: chainService, + HeadUpdater: chainService, + HeadFetcher: chainService, + CanonicalFetcher: chainService, + ForkFetcher: chainService, + FinalizationFetcher: chainService, + BlockReceiver: chainService, + AttestationReceiver: chainService, + GenesisTimeFetcher: chainService, + GenesisFetcher: chainService, + OptimisticModeFetcher: chainService, + AttestationsPool: b.attestationPool, + ExitPool: b.exitPool, + SlashingsPool: b.slashingsPool, + SlashingChecker: slasherService, + SyncCommitteeObjectPool: b.syncCommitteePool, + POWChainService: web3Service, + POWChainInfoFetcher: web3Service, + ChainStartFetcher: chainStartFetcher, + MockEth1Votes: mockEth1DataVotes, + SyncService: syncService, + DepositFetcher: depositFetcher, + PendingDepositFetcher: b.depositCache, + BlockNotifier: b, + StateNotifier: b, + OperationNotifier: b, + StateGen: b.stateGen, + EnableDebugRPCEndpoints: enableDebugRPCEndpoints, + MaxMsgSize: maxMsgSize, + ProposerIdsCache: b.proposerIdsCache, + BlockBuilder: b.fetchBuilderService(), }) return b.services.RegisterService(rpcService) diff --git a/beacon-chain/powchain/engine_client.go b/beacon-chain/powchain/engine_client.go index 45ebc023c4..0618959c48 100644 --- a/beacon-chain/powchain/engine_client.go +++ b/beacon-chain/powchain/engine_client.go @@ -68,7 +68,7 @@ type EngineCaller interface { ExchangeTransitionConfiguration( ctx context.Context, cfg *pb.TransitionConfiguration, ) error - ExecutionBlockByHash(ctx context.Context, hash common.Hash) (*pb.ExecutionBlock, error) + ExecutionBlockByHash(ctx context.Context, hash common.Hash, withTxs bool) (*pb.ExecutionBlock, error) GetTerminalBlockHash(ctx context.Context) ([]byte, bool, error) } @@ -247,7 +247,7 @@ func (s *Service) GetTerminalBlockHash(ctx context.Context) ([]byte, bool, error if parentHash == params.BeaconConfig().ZeroHash { return nil, false, nil } - parentBlk, err := s.ExecutionBlockByHash(ctx, parentHash) + parentBlk, err := s.ExecutionBlockByHash(ctx, parentHash, false /* no txs */) if err != nil { return nil, false, errors.Wrap(err, "could not get parent execution block") } @@ -296,12 +296,11 @@ func (s *Service) LatestExecutionBlock(ctx context.Context) (*pb.ExecutionBlock, // ExecutionBlockByHash fetches an execution engine block by hash by calling // eth_blockByHash via JSON-RPC. -func (s *Service) ExecutionBlockByHash(ctx context.Context, hash common.Hash) (*pb.ExecutionBlock, error) { +func (s *Service) ExecutionBlockByHash(ctx context.Context, hash common.Hash, withTxs bool) (*pb.ExecutionBlock, error) { ctx, span := trace.StartSpan(ctx, "powchain.engine-api-client.ExecutionBlockByHash") defer span.End() - result := &pb.ExecutionBlock{} - err := s.rpcClient.CallContext(ctx, result, ExecutionBlockByHashMethod, hash, false /* no full transaction objects */) + err := s.rpcClient.CallContext(ctx, result, ExecutionBlockByHashMethod, hash, withTxs) return result, handleRPCError(err) } @@ -321,7 +320,7 @@ func (s *Service) ReconstructFullBellatrixBlock( return nil, err } executionBlockHash := common.BytesToHash(header.BlockHash()) - executionBlock, err := s.ExecutionBlockByHash(ctx, executionBlockHash) + executionBlock, err := s.ExecutionBlockByHash(ctx, executionBlockHash, true /* with txs */) if err != nil { return nil, fmt.Errorf("could not fetch execution block with txs by hash %#x: %v", executionBlockHash, err) } diff --git a/beacon-chain/powchain/engine_client_test.go b/beacon-chain/powchain/engine_client_test.go index 8627e1b8b7..7de46b952b 100644 --- a/beacon-chain/powchain/engine_client_test.go +++ b/beacon-chain/powchain/engine_client_test.go @@ -31,6 +31,7 @@ import ( var ( _ = ExecutionPayloadReconstructor(&Service{}) _ = EngineCaller(&Service{}) + _ = ExecutionPayloadReconstructor(&Service{}) _ = EngineCaller(&mocks.EngineClient{}) ) @@ -88,7 +89,7 @@ func TestClient_IPC(t *testing.T) { want, ok := fix["ExecutionBlock"].(*pb.ExecutionBlock) require.Equal(t, true, ok) arg := common.BytesToHash([]byte("foo")) - resp, err := srv.ExecutionBlockByHash(ctx, arg) + resp, err := srv.ExecutionBlockByHash(ctx, arg, true /* with txs */) require.NoError(t, err) require.DeepEqual(t, want, resp) }) @@ -396,7 +397,7 @@ func TestClient_HTTP(t *testing.T) { service.rpcClient = rpcClient // We call the RPC method via HTTP and expect a proper result. - resp, err := service.ExecutionBlockByHash(ctx, arg) + resp, err := service.ExecutionBlockByHash(ctx, arg, true /* with txs */) require.NoError(t, err) require.DeepEqual(t, want, resp) }) diff --git a/beacon-chain/powchain/testing/BUILD.bazel b/beacon-chain/powchain/testing/BUILD.bazel index 0118a25fba..64771eb21a 100644 --- a/beacon-chain/powchain/testing/BUILD.bazel +++ b/beacon-chain/powchain/testing/BUILD.bazel @@ -19,6 +19,7 @@ go_library( "//beacon-chain/state/v1:go_default_library", "//config/params:go_default_library", "//consensus-types/interfaces:go_default_library", + "//consensus-types/wrapper:go_default_library", "//encoding/bytesutil:go_default_library", "//proto/engine/v1:go_default_library", "//proto/prysm/v1alpha1:go_default_library", diff --git a/beacon-chain/powchain/testing/mock_engine_client.go b/beacon-chain/powchain/testing/mock_engine_client.go index 2653b4ea6a..5714ec74b1 100644 --- a/beacon-chain/powchain/testing/mock_engine_client.go +++ b/beacon-chain/powchain/testing/mock_engine_client.go @@ -10,26 +10,29 @@ import ( "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/config/params" "github.com/prysmaticlabs/prysm/consensus-types/interfaces" + "github.com/prysmaticlabs/prysm/consensus-types/wrapper" "github.com/prysmaticlabs/prysm/encoding/bytesutil" pb "github.com/prysmaticlabs/prysm/proto/engine/v1" ) // EngineClient -- type EngineClient struct { - NewPayloadResp []byte - PayloadIDBytes *pb.PayloadIDBytes - ForkChoiceUpdatedResp []byte - ExecutionPayload *pb.ExecutionPayload - ExecutionBlock *pb.ExecutionBlock - Err error - ErrLatestExecBlock error - ErrExecBlockByHash error - ErrForkchoiceUpdated error - ErrNewPayload error - BlockByHashMap map[[32]byte]*pb.ExecutionBlock - TerminalBlockHash []byte - TerminalBlockHashExists bool - OverrideValidHash [32]byte + NewPayloadResp []byte + PayloadIDBytes *pb.PayloadIDBytes + ForkChoiceUpdatedResp []byte + ExecutionPayload *pb.ExecutionPayload + ExecutionBlock *pb.ExecutionBlock + Err error + ErrLatestExecBlock error + ErrExecBlockByHash error + ErrForkchoiceUpdated error + ErrNewPayload error + ExecutionPayloadByBlockHash map[[32]byte]*pb.ExecutionPayload + BlockByHashMap map[[32]byte]*pb.ExecutionBlock + NumReconstructedPayloads uint64 + TerminalBlockHash []byte + TerminalBlockHashExists bool + OverrideValidHash [32]byte } // NewPayload -- @@ -63,7 +66,7 @@ func (e *EngineClient) LatestExecutionBlock(_ context.Context) (*pb.ExecutionBlo } // ExecutionBlockByHash -- -func (e *EngineClient) ExecutionBlockByHash(_ context.Context, h common.Hash) (*pb.ExecutionBlock, error) { +func (e *EngineClient) ExecutionBlockByHash(_ context.Context, h common.Hash, _ bool) (*pb.ExecutionBlock, error) { b, ok := e.BlockByHashMap[h] if !ok { return nil, errors.New("block not found") @@ -71,6 +74,24 @@ func (e *EngineClient) ExecutionBlockByHash(_ context.Context, h common.Hash) (* return b, e.ErrExecBlockByHash } +func (e *EngineClient) ReconstructFullBellatrixBlock( + _ context.Context, blindedBlock interfaces.SignedBeaconBlock, +) (interfaces.SignedBeaconBlock, error) { + if !blindedBlock.Block().IsBlinded() { + return nil, errors.New("block must be blinded") + } + header, err := blindedBlock.Block().Body().Execution() + if err != nil { + return nil, err + } + payload, ok := e.ExecutionPayloadByBlockHash[bytesutil.ToBytes32(header.BlockHash())] + if !ok { + return nil, errors.New("block not found") + } + e.NumReconstructedPayloads++ + return wrapper.BuildSignedBeaconBlockFromExecutionPayload(blindedBlock, payload) +} + // GetTerminalBlockHash -- func (e *EngineClient) GetTerminalBlockHash(ctx context.Context) ([]byte, bool, error) { ttd := new(big.Int) @@ -99,7 +120,7 @@ func (e *EngineClient) GetTerminalBlockHash(ctx context.Context) ([]byte, bool, if parentHash == params.BeaconConfig().ZeroHash { return nil, false, nil } - parentBlk, err := e.ExecutionBlockByHash(ctx, parentHash) + parentBlk, err := e.ExecutionBlockByHash(ctx, parentHash, false /* with txs */) if err != nil { return nil, false, errors.Wrap(err, "could not get parent execution block") } diff --git a/beacon-chain/rpc/eth/beacon/BUILD.bazel b/beacon-chain/rpc/eth/beacon/BUILD.bazel index e527709cc0..195576fc9c 100644 --- a/beacon-chain/rpc/eth/beacon/BUILD.bazel +++ b/beacon-chain/rpc/eth/beacon/BUILD.bazel @@ -29,6 +29,7 @@ go_library( "//beacon-chain/operations/slashings:go_default_library", "//beacon-chain/operations/voluntaryexits:go_default_library", "//beacon-chain/p2p:go_default_library", + "//beacon-chain/powchain:go_default_library", "//beacon-chain/rpc/eth/helpers:go_default_library", "//beacon-chain/rpc/prysm/v1alpha1/validator:go_default_library", "//beacon-chain/rpc/statefetcher:go_default_library", @@ -88,6 +89,7 @@ go_test( "//beacon-chain/operations/synccommittee:go_default_library", "//beacon-chain/operations/voluntaryexits/mock:go_default_library", "//beacon-chain/p2p/testing:go_default_library", + "//beacon-chain/powchain/testing:go_default_library", "//beacon-chain/rpc/eth/helpers:go_default_library", "//beacon-chain/rpc/prysm/v1alpha1/validator:go_default_library", "//beacon-chain/rpc/statefetcher:go_default_library", @@ -101,6 +103,7 @@ go_test( "//crypto/bls:go_default_library", "//encoding/bytesutil:go_default_library", "//encoding/ssz:go_default_library", + "//proto/engine/v1:go_default_library", "//proto/eth/service:go_default_library", "//proto/eth/v1:go_default_library", "//proto/eth/v2:go_default_library", diff --git a/beacon-chain/rpc/eth/beacon/blocks.go b/beacon-chain/rpc/eth/beacon/blocks.go index f97cc86fce..f7257972d8 100644 --- a/beacon-chain/rpc/eth/beacon/blocks.go +++ b/beacon-chain/rpc/eth/beacon/blocks.go @@ -474,8 +474,42 @@ func (bs *Server) GetBlockV2(ctx context.Context, req *ethpbv2.BlockRequestV2) ( ExecutionOptimistic: isOptimistic, }, nil } + if _, err := blk.PbBlindedBellatrixBlock(); err == nil { + signedFullBlock, err := bs.ExecutionPayloadReconstructor.ReconstructFullBellatrixBlock(ctx, blk) + if err != nil { + return nil, status.Errorf( + codes.Internal, + "Could not reconstruct full execution payload to create signed beacon block: %v", + err, + ) + } + bellatrixBlk, err = signedFullBlock.PbBellatrixBlock() + if err != nil { + return nil, status.Errorf(codes.Internal, "Could not get signed beacon block: %v", err) + } + v2Blk, err := migration.V1Alpha1BeaconBlockBellatrixToV2(bellatrixBlk.Block) + if err != nil { + return nil, status.Errorf(codes.Internal, "Could not get signed beacon block: %v", err) + } + root, err := blk.Block().HashTreeRoot() + if err != nil { + return nil, status.Errorf(codes.Internal, "Could not get block root: %v", err) + } + isOptimistic, err := bs.OptimisticModeFetcher.IsOptimisticForRoot(ctx, root) + if err != nil { + return nil, status.Errorf(codes.Internal, "Could not check if block is optimistic: %v", err) + } + return ðpbv2.BlockResponseV2{ + Version: ethpbv2.Version_BELLATRIX, + Data: ðpbv2.SignedBeaconBlockContainerV2{ + Message: ðpbv2.SignedBeaconBlockContainerV2_BellatrixBlock{BellatrixBlock: v2Blk}, + Signature: blk.Signature(), + }, + ExecutionOptimistic: isOptimistic, + }, nil + } // ErrUnsupportedBellatrixBlock means that we have another block type - if !errors.Is(err, wrapper.ErrUnsupportedBellatrixBlock) { + if !errors.Is(err, wrapper.ErrUnsupportedBlindedBellatrixBlock) { return nil, status.Errorf(codes.Internal, "Could not get signed beacon block: %v", err) } diff --git a/beacon-chain/rpc/eth/beacon/blocks_test.go b/beacon-chain/rpc/eth/beacon/blocks_test.go index 2627266c91..e8ed53b51f 100644 --- a/beacon-chain/rpc/eth/beacon/blocks_test.go +++ b/beacon-chain/rpc/eth/beacon/blocks_test.go @@ -9,12 +9,14 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/db" dbTest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" mockp2p "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" + powchaintesting "github.com/prysmaticlabs/prysm/beacon-chain/powchain/testing" "github.com/prysmaticlabs/prysm/config/params" "github.com/prysmaticlabs/prysm/consensus-types/interfaces" types "github.com/prysmaticlabs/prysm/consensus-types/primitives" "github.com/prysmaticlabs/prysm/consensus-types/wrapper" "github.com/prysmaticlabs/prysm/encoding/bytesutil" "github.com/prysmaticlabs/prysm/encoding/ssz" + enginev1 "github.com/prysmaticlabs/prysm/proto/engine/v1" ethpbv1 "github.com/prysmaticlabs/prysm/proto/eth/v1" ethpbv2 "github.com/prysmaticlabs/prysm/proto/eth/v2" "github.com/prysmaticlabs/prysm/proto/migration" @@ -1290,6 +1292,9 @@ func TestServer_GetBlockV2(t *testing.T) { ChainInfoFetcher: mockChainService, HeadFetcher: mockChainService, OptimisticModeFetcher: mockChainService, + ExecutionPayloadReconstructor: &powchaintesting.EngineClient{ + ExecutionPayloadByBlockHash: map[[32]byte]*enginev1.ExecutionPayload{}, + }, } genBlk, blkContainers := fillDBTestBlocksBellatrix(ctx, t, beaconDB) diff --git a/beacon-chain/rpc/eth/beacon/server.go b/beacon-chain/rpc/eth/beacon/server.go index 388b4fdfb0..6033aca117 100644 --- a/beacon-chain/rpc/eth/beacon/server.go +++ b/beacon-chain/rpc/eth/beacon/server.go @@ -12,6 +12,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/operations/slashings" "github.com/prysmaticlabs/prysm/beacon-chain/operations/voluntaryexits" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/beacon-chain/powchain" v1alpha1validator "github.com/prysmaticlabs/prysm/beacon-chain/rpc/prysm/v1alpha1/validator" "github.com/prysmaticlabs/prysm/beacon-chain/rpc/statefetcher" "github.com/prysmaticlabs/prysm/beacon-chain/state/stategen" @@ -21,22 +22,23 @@ import ( // Server defines a server implementation of the gRPC Beacon Chain service, // providing RPC endpoints to access data relevant to the Ethereum Beacon Chain. type Server struct { - BeaconDB db.ReadOnlyDatabase - ChainInfoFetcher blockchain.ChainInfoFetcher - GenesisTimeFetcher blockchain.TimeFetcher - BlockReceiver blockchain.BlockReceiver - BlockNotifier blockfeed.Notifier - OperationNotifier operation.Notifier - Broadcaster p2p.Broadcaster - AttestationsPool attestations.Pool - SlashingsPool slashings.PoolManager - VoluntaryExitsPool voluntaryexits.PoolManager - StateGenService stategen.StateManager - StateFetcher statefetcher.Fetcher - HeadFetcher blockchain.HeadFetcher - OptimisticModeFetcher blockchain.OptimisticModeFetcher - V1Alpha1ValidatorServer *v1alpha1validator.Server - SyncChecker sync.Checker - CanonicalHistory *stategen.CanonicalHistory - HeadUpdater blockchain.HeadUpdater + BeaconDB db.ReadOnlyDatabase + ChainInfoFetcher blockchain.ChainInfoFetcher + GenesisTimeFetcher blockchain.TimeFetcher + BlockReceiver blockchain.BlockReceiver + BlockNotifier blockfeed.Notifier + OperationNotifier operation.Notifier + Broadcaster p2p.Broadcaster + AttestationsPool attestations.Pool + SlashingsPool slashings.PoolManager + VoluntaryExitsPool voluntaryexits.PoolManager + StateGenService stategen.StateManager + StateFetcher statefetcher.Fetcher + HeadFetcher blockchain.HeadFetcher + OptimisticModeFetcher blockchain.OptimisticModeFetcher + V1Alpha1ValidatorServer *v1alpha1validator.Server + SyncChecker sync.Checker + CanonicalHistory *stategen.CanonicalHistory + HeadUpdater blockchain.HeadUpdater + ExecutionPayloadReconstructor powchain.ExecutionPayloadReconstructor } diff --git a/beacon-chain/rpc/prysm/v1alpha1/beacon/blocks.go b/beacon-chain/rpc/prysm/v1alpha1/beacon/blocks.go index 1ec60d246d..ff4fc009dc 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/beacon/blocks.go +++ b/beacon-chain/rpc/prysm/v1alpha1/beacon/blocks.go @@ -139,6 +139,12 @@ func convertToBlockContainer(blk interfaces.SignedBeaconBlock, root [32]byte, is return nil, err } ctr.Block = ðpb.BeaconBlockContainer_BellatrixBlock{BellatrixBlock: rBlk} + case version.BellatrixBlind: + rBlk, err := blk.PbBlindedBellatrixBlock() + if err != nil { + return nil, err + } + ctr.Block = ðpb.BeaconBlockContainer_BlindedBellatrixBlock{BlindedBellatrixBlock: rBlk} default: return nil, errors.Errorf("block type is not recognized: %d", blk.Version()) } diff --git a/beacon-chain/rpc/prysm/v1alpha1/beacon/blocks_test.go b/beacon-chain/rpc/prysm/v1alpha1/beacon/blocks_test.go index ef9743442c..8e3ac92b80 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/beacon/blocks_test.go +++ b/beacon-chain/rpc/prysm/v1alpha1/beacon/blocks_test.go @@ -14,6 +14,7 @@ import ( dbTest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" v1 "github.com/prysmaticlabs/prysm/beacon-chain/state/v1" "github.com/prysmaticlabs/prysm/cmd" + "github.com/prysmaticlabs/prysm/config/features" fieldparams "github.com/prysmaticlabs/prysm/config/fieldparams" "github.com/prysmaticlabs/prysm/config/params" "github.com/prysmaticlabs/prysm/consensus-types/interfaces" @@ -871,13 +872,21 @@ func TestServer_ListBeaconBlocks_Genesis(t *testing.T) { blk.Block.ParentRoot = parentRoot[:] wrapped, err := wrapper.WrappedSignedBeaconBlock(blk) assert.NoError(t, err) + blinded, err := wrapped.ToBlinded() + assert.NoError(t, err) + blindedProto, err := blinded.PbBlindedBellatrixBlock() + assert.NoError(t, err) blkContainer := ðpb.BeaconBlockContainer{ - Block: ðpb.BeaconBlockContainer_BellatrixBlock{BellatrixBlock: blk}} + Block: ðpb.BeaconBlockContainer_BlindedBellatrixBlock{BlindedBellatrixBlock: blindedProto}} runListBlocksGenesis(t, wrapped, blkContainer) }) } func runListBlocksGenesis(t *testing.T, blk interfaces.SignedBeaconBlock, blkContainer *ethpb.BeaconBlockContainer) { + resetFn := features.InitWithReset(&features.Flags{ + EnableOnlyBlindedBeaconBlocks: true, + }) + defer resetFn() db := dbTest.SetupDB(t) ctx := context.Background() diff --git a/beacon-chain/rpc/service.go b/beacon-chain/rpc/service.go index ded86b7963..526eab3764 100644 --- a/beacon-chain/rpc/service.go +++ b/beacon-chain/rpc/service.go @@ -71,49 +71,50 @@ type Service struct { // Config options for the beacon node RPC server. type Config struct { - Host string - Port string - CertFlag string - KeyFlag string - BeaconMonitoringHost string - BeaconMonitoringPort int - BeaconDB db.HeadAccessDatabase - ChainInfoFetcher blockchain.ChainInfoFetcher - HeadUpdater blockchain.HeadUpdater - HeadFetcher blockchain.HeadFetcher - CanonicalFetcher blockchain.CanonicalFetcher - ForkFetcher blockchain.ForkFetcher - FinalizationFetcher blockchain.FinalizationFetcher - AttestationReceiver blockchain.AttestationReceiver - BlockReceiver blockchain.BlockReceiver - POWChainService powchain.Chain - ChainStartFetcher powchain.ChainStartFetcher - POWChainInfoFetcher powchain.ChainInfoFetcher - GenesisTimeFetcher blockchain.TimeFetcher - GenesisFetcher blockchain.GenesisFetcher - EnableDebugRPCEndpoints bool - MockEth1Votes bool - AttestationsPool attestations.Pool - ExitPool voluntaryexits.PoolManager - SlashingsPool slashings.PoolManager - SlashingChecker slasherservice.SlashingChecker - SyncCommitteeObjectPool synccommittee.Pool - SyncService chainSync.Checker - Broadcaster p2p.Broadcaster - PeersFetcher p2p.PeersProvider - PeerManager p2p.PeerManager - MetadataProvider p2p.MetadataProvider - DepositFetcher depositcache.DepositFetcher - PendingDepositFetcher depositcache.PendingDepositsFetcher - StateNotifier statefeed.Notifier - BlockNotifier blockfeed.Notifier - OperationNotifier opfeed.Notifier - StateGen *stategen.State - MaxMsgSize int - ExecutionEngineCaller powchain.EngineCaller - ProposerIdsCache *cache.ProposerPayloadIDsCache - OptimisticModeFetcher blockchain.OptimisticModeFetcher - BlockBuilder builder.BlockBuilder + ExecutionPayloadReconstructor powchain.ExecutionPayloadReconstructor + Host string + Port string + CertFlag string + KeyFlag string + BeaconMonitoringHost string + BeaconMonitoringPort int + BeaconDB db.HeadAccessDatabase + ChainInfoFetcher blockchain.ChainInfoFetcher + HeadUpdater blockchain.HeadUpdater + HeadFetcher blockchain.HeadFetcher + CanonicalFetcher blockchain.CanonicalFetcher + ForkFetcher blockchain.ForkFetcher + FinalizationFetcher blockchain.FinalizationFetcher + AttestationReceiver blockchain.AttestationReceiver + BlockReceiver blockchain.BlockReceiver + POWChainService powchain.Chain + ChainStartFetcher powchain.ChainStartFetcher + POWChainInfoFetcher powchain.ChainInfoFetcher + GenesisTimeFetcher blockchain.TimeFetcher + GenesisFetcher blockchain.GenesisFetcher + EnableDebugRPCEndpoints bool + MockEth1Votes bool + AttestationsPool attestations.Pool + ExitPool voluntaryexits.PoolManager + SlashingsPool slashings.PoolManager + SlashingChecker slasherservice.SlashingChecker + SyncCommitteeObjectPool synccommittee.Pool + SyncService chainSync.Checker + Broadcaster p2p.Broadcaster + PeersFetcher p2p.PeersProvider + PeerManager p2p.PeerManager + MetadataProvider p2p.MetadataProvider + DepositFetcher depositcache.DepositFetcher + PendingDepositFetcher depositcache.PendingDepositsFetcher + StateNotifier statefeed.Notifier + BlockNotifier blockfeed.Notifier + OperationNotifier opfeed.Notifier + StateGen *stategen.State + MaxMsgSize int + ExecutionEngineCaller powchain.EngineCaller + ProposerIdsCache *cache.ProposerPayloadIDsCache + OptimisticModeFetcher blockchain.OptimisticModeFetcher + BlockBuilder builder.BlockBuilder } // NewService instantiates a new RPC service instance that will @@ -309,11 +310,12 @@ func (s *Service) Start() { StateGenService: s.cfg.StateGen, ReplayerBuilder: ch, }, - OptimisticModeFetcher: s.cfg.OptimisticModeFetcher, - HeadFetcher: s.cfg.HeadFetcher, - VoluntaryExitsPool: s.cfg.ExitPool, - V1Alpha1ValidatorServer: validatorServer, - SyncChecker: s.cfg.SyncService, + OptimisticModeFetcher: s.cfg.OptimisticModeFetcher, + HeadFetcher: s.cfg.HeadFetcher, + VoluntaryExitsPool: s.cfg.ExitPool, + V1Alpha1ValidatorServer: validatorServer, + SyncChecker: s.cfg.SyncService, + ExecutionPayloadReconstructor: s.cfg.ExecutionPayloadReconstructor, } ethpbv1alpha1.RegisterNodeServer(s.grpcServer, nodeServer) ethpbservice.RegisterBeaconNodeServer(s.grpcServer, nodeServerV1) diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index f8098c7601..fb5b62ca9a 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -76,6 +76,7 @@ go_library( "//beacon-chain/p2p/encoder:go_default_library", "//beacon-chain/p2p/peers:go_default_library", "//beacon-chain/p2p/types:go_default_library", + "//beacon-chain/powchain:go_default_library", "//beacon-chain/state:go_default_library", "//beacon-chain/state/stategen:go_default_library", "//cache/lru:go_default_library", @@ -185,12 +186,14 @@ go_test( "//beacon-chain/p2p/testing:go_default_library", "//beacon-chain/p2p/types:go_default_library", "//beacon-chain/powchain:go_default_library", + "//beacon-chain/powchain/testing:go_default_library", "//beacon-chain/state:go_default_library", "//beacon-chain/state/stategen:go_default_library", "//beacon-chain/state/v1:go_default_library", "//beacon-chain/sync/initial-sync/testing:go_default_library", "//cache/lru:go_default_library", "//cmd/beacon-chain/flags:go_default_library", + "//config/features:go_default_library", "//config/fieldparams:go_default_library", "//config/params:go_default_library", "//consensus-types/interfaces:go_default_library", @@ -201,6 +204,7 @@ go_test( "//encoding/bytesutil:go_default_library", "//encoding/ssz/equality:go_default_library", "//network/forks:go_default_library", + "//proto/engine/v1:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "//proto/prysm/v1alpha1/attestation:go_default_library", "//proto/prysm/v1alpha1/metadata:go_default_library", @@ -210,6 +214,8 @@ go_test( "//time:go_default_library", "//time/slots:go_default_library", "@com_github_d4l3k_messagediff//:go_default_library", + "@com_github_ethereum_go_ethereum//common:go_default_library", + "@com_github_ethereum_go_ethereum//core/types:go_default_library", "@com_github_ethereum_go_ethereum//p2p/enr:go_default_library", "@com_github_golang_snappy//:go_default_library", "@com_github_kevinms_leakybucket_go//:go_default_library", diff --git a/beacon-chain/sync/metrics.go b/beacon-chain/sync/metrics.go index 70642625a8..16094e5181 100644 --- a/beacon-chain/sync/metrics.go +++ b/beacon-chain/sync/metrics.go @@ -75,7 +75,13 @@ var ( Buckets: []float64{10, 50, 100, 200, 400, 800, 1600, 3200}, }, ) - + rpcBlocksByRangeResponseLatency = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "rpc_blocks_by_range_response_latency_milliseconds", + Help: "Captures total time to respond to rpc blocks by range requests in a milliseconds distribution", + Buckets: []float64{250, 500, 1000, 1500, 2000, 3000, 4000, 10000}, + }, + ) arrivalBlockPropagationHistogram = promauto.NewHistogram( prometheus.HistogramOpts{ Name: "block_arrival_latency_milliseconds", diff --git a/beacon-chain/sync/options.go b/beacon-chain/sync/options.go index b05a905e71..850e09e199 100644 --- a/beacon-chain/sync/options.go +++ b/beacon-chain/sync/options.go @@ -11,6 +11,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/operations/synccommittee" "github.com/prysmaticlabs/prysm/beacon-chain/operations/voluntaryexits" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/beacon-chain/powchain" "github.com/prysmaticlabs/prysm/beacon-chain/state/stategen" ) @@ -120,3 +121,10 @@ func WithSlasherBlockHeadersFeed(slasherBlockHeadersFeed *event.Feed) Option { return nil } } + +func WithExecutionPayloadReconstructor(r powchain.ExecutionPayloadReconstructor) Option { + return func(s *Service) error { + s.cfg.executionPayloadReconstructor = r + return nil + } +} diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range.go b/beacon-chain/sync/rpc_beacon_blocks_by_range.go index d41d976d9c..99eab20aa2 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range.go @@ -12,6 +12,7 @@ import ( "github.com/prysmaticlabs/prysm/config/params" "github.com/prysmaticlabs/prysm/consensus-types/interfaces" types "github.com/prysmaticlabs/prysm/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/consensus-types/wrapper" "github.com/prysmaticlabs/prysm/encoding/bytesutil" "github.com/prysmaticlabs/prysm/monitoring/tracing" pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" @@ -161,18 +162,29 @@ func (s *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlo tracing.AnnotateError(span, err) return err } + start := time.Now() for _, b := range blks { - if b == nil || b.IsNil() || b.Block().IsNil() { + if err := wrapper.BeaconBlockIsNil(b); err != nil { continue } - if chunkErr := s.chunkBlockWriter(stream, b); chunkErr != nil { - log.WithError(chunkErr).Debug("Could not send a chunked response") + blockToWrite := b + if blockToWrite.Block().IsBlinded() { + fullBlock, err := s.cfg.executionPayloadReconstructor.ReconstructFullBellatrixBlock(ctx, blockToWrite) + if err != nil { + log.WithError(err).Error("Could not get reconstruct full bellatrix block from blinded body") + s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) + return err + } + blockToWrite = fullBlock + } + if chunkErr := s.chunkBlockWriter(stream, blockToWrite); chunkErr != nil { + log.WithError(chunkErr).Error("Could not send a chunked response") s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) tracing.AnnotateError(span, chunkErr) return chunkErr } - } + rpcBlocksByRangeResponseLatency.Observe(float64(time.Since(start).Milliseconds())) // Return error in the event we have an invalid parent. return err } diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go index d041eea37f..53b80e60c9 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go @@ -3,10 +3,13 @@ package sync import ( "context" "io" + "math/big" "sync" "testing" "time" + "github.com/ethereum/go-ethereum/common" + gethTypes "github.com/ethereum/go-ethereum/core/types" "github.com/kevinms/leakybucket-go" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/protocol" @@ -17,13 +20,16 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder" p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types" + mockPOW "github.com/prysmaticlabs/prysm/beacon-chain/powchain/testing" "github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags" + "github.com/prysmaticlabs/prysm/config/features" fieldparams "github.com/prysmaticlabs/prysm/config/fieldparams" "github.com/prysmaticlabs/prysm/config/params" "github.com/prysmaticlabs/prysm/consensus-types/interfaces" types "github.com/prysmaticlabs/prysm/consensus-types/primitives" "github.com/prysmaticlabs/prysm/consensus-types/wrapper" "github.com/prysmaticlabs/prysm/encoding/bytesutil" + enginev1 "github.com/prysmaticlabs/prysm/proto/engine/v1" ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/testing/assert" "github.com/prysmaticlabs/prysm/testing/require" @@ -157,6 +163,124 @@ func TestRPCBeaconBlocksByRange_ReturnCorrectNumberBack(t *testing.T) { } } +func TestRPCBeaconBlocksByRange_ReconstructsPayloads(t *testing.T) { + features.Init(&features.Flags{EnableOnlyBlindedBeaconBlocks: true}) + p1 := p2ptest.NewTestP2P(t) + p2 := p2ptest.NewTestP2P(t) + p1.Connect(p2) + assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") + d := db.SetupDB(t) + + req := ðpb.BeaconBlocksByRangeRequest{ + StartSlot: 0, + Step: 1, + Count: 200, + } + + parent := bytesutil.PadTo([]byte("parentHash"), fieldparams.RootLength) + stateRoot := bytesutil.PadTo([]byte("stateRoot"), fieldparams.RootLength) + receiptsRoot := bytesutil.PadTo([]byte("receiptsRoot"), fieldparams.RootLength) + logsBloom := bytesutil.PadTo([]byte("logs"), fieldparams.LogsBloomLength) + tx := gethTypes.NewTransaction( + 0, + common.HexToAddress("095e7baea6a6c7c4c2dfeb977efac326af552d87"), + big.NewInt(0), 0, big.NewInt(0), + nil, + ) + txs := []*gethTypes.Transaction{tx} + encodedBinaryTxs := make([][]byte, 1) + var err error + encodedBinaryTxs[0], err = txs[0].MarshalBinary() + require.NoError(t, err) + blockHash := bytesutil.ToBytes32([]byte("foo")) + payload := &enginev1.ExecutionPayload{ + ParentHash: parent, + FeeRecipient: make([]byte, fieldparams.FeeRecipientLength), + StateRoot: stateRoot, + ReceiptsRoot: receiptsRoot, + LogsBloom: logsBloom, + PrevRandao: blockHash[:], + BlockNumber: 0, + GasLimit: 0, + GasUsed: 0, + Timestamp: 0, + ExtraData: make([]byte, 0), + BlockHash: blockHash[:], + BaseFeePerGas: bytesutil.PadTo([]byte("baseFeePerGas"), fieldparams.RootLength), + Transactions: encodedBinaryTxs, + } + mockEngine := &mockPOW.EngineClient{ + ExecutionPayloadByBlockHash: map[[32]byte]*enginev1.ExecutionPayload{ + blockHash: payload, + }, + } + wrappedPayload, err := wrapper.WrappedExecutionPayload(payload) + require.NoError(t, err) + header, err := wrapper.PayloadToHeader(wrappedPayload) + require.NoError(t, err) + + genRoot := [32]byte{} + // Populate the database with blocks that would match the request. + for i := req.StartSlot; i < req.StartSlot.Add(req.Step*req.Count); i += types.Slot(req.Step) { + blk := util.NewBlindedBeaconBlockBellatrix() + blk.Block.Slot = i + blk.Block.Body.ExecutionPayloadHeader = header + if i == 0 { + rt, err := blk.Block.HashTreeRoot() + require.NoError(t, err) + genRoot = rt + } + util.SaveBlock(t, context.Background(), d, blk) + } + require.NoError(t, d.SaveGenesisBlockRoot(context.Background(), genRoot)) + + // Start service with 160 as allowed blocks capacity (and almost zero capacity recovery). + r := &Service{ + cfg: &config{ + p2p: p1, + beaconDB: d, + chain: &chainMock.ChainService{}, + executionPayloadReconstructor: mockEngine, + }, + rateLimiter: newRateLimiter(p1), + } + pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) + topic := string(pcl) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), false) + var wg sync.WaitGroup + wg.Add(1) + + // Use a new request to test this out + newReq := ðpb.BeaconBlocksByRangeRequest{StartSlot: 0, Step: 1, Count: 1} + + p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { + defer wg.Done() + for i := newReq.StartSlot; i < newReq.StartSlot.Add(newReq.Count*newReq.Step); i += types.Slot(newReq.Step) { + expectSuccess(t, stream) + res := util.NewBeaconBlockBellatrix() + assert.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, res)) + if res.Block.Slot.SubSlot(newReq.StartSlot).Mod(newReq.Step) != 0 { + t.Errorf("Received unexpected block slot %d", res.Block.Slot) + } + // Expect EOF + b := make([]byte, 1) + _, err := stream.Read(b) + require.ErrorContains(t, io.EOF.Error(), err) + } + require.Equal(t, uint64(1), mockEngine.NumReconstructedPayloads) + }) + + stream1, err := p1.BHost.NewStream(context.Background(), p2.BHost.ID(), pcl) + require.NoError(t, err) + + err = r.beaconBlocksByRangeRPCHandler(context.Background(), newReq, stream1) + require.NoError(t, err) + + if util.WaitTimeout(&wg, 1*time.Second) { + t.Fatal("Did not receive stream within 1 sec") + } +} + func TestRPCBeaconBlocksByRange_RPCHandlerReturnsSortedBlocks(t *testing.T) { p1 := p2ptest.NewTestP2P(t) p2 := p2ptest.NewTestP2P(t) diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root.go b/beacon-chain/sync/rpc_beacon_blocks_by_root.go index 1cefcdabf0..5948ff780a 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root.go @@ -9,6 +9,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types" "github.com/prysmaticlabs/prysm/config/params" "github.com/prysmaticlabs/prysm/consensus-types/interfaces" + "github.com/prysmaticlabs/prysm/consensus-types/wrapper" ) // sendRecentBeaconBlocksRequest sends a recent beacon blocks request to a peer to get @@ -68,13 +69,24 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{ s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) return err } - if blk == nil || blk.IsNil() { + if err := wrapper.BeaconBlockIsNil(blk); err != nil { continue } + + if blk.Block().IsBlinded() { + blk, err = s.cfg.executionPayloadReconstructor.ReconstructFullBellatrixBlock(ctx, blk) + if err != nil { + log.WithError(err).Error("Could not get reconstruct full bellatrix block from blinded body") + s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream) + return err + } + } + if err := s.chunkBlockWriter(stream, blk); err != nil { return err } } + closeStream(stream, log) return nil } diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go index a92942742b..6a2994d057 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_root_test.go @@ -2,10 +2,13 @@ package sync import ( "context" + "math/big" "sync" "testing" "time" + "github.com/ethereum/go-ethereum/common" + gethTypes "github.com/ethereum/go-ethereum/core/types" "github.com/kevinms/leakybucket-go" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/protocol" @@ -16,8 +19,13 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/p2p" p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing" p2pTypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types" + mockPOW "github.com/prysmaticlabs/prysm/beacon-chain/powchain/testing" + fieldparams "github.com/prysmaticlabs/prysm/config/fieldparams" "github.com/prysmaticlabs/prysm/config/params" types "github.com/prysmaticlabs/prysm/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/consensus-types/wrapper" + "github.com/prysmaticlabs/prysm/encoding/bytesutil" + enginev1 "github.com/prysmaticlabs/prysm/proto/engine/v1" ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/testing/assert" "github.com/prysmaticlabs/prysm/testing/require" @@ -72,6 +80,105 @@ func TestRecentBeaconBlocksRPCHandler_ReturnsBlocks(t *testing.T) { } } +func TestRecentBeaconBlocksRPCHandler_ReturnsBlocks_ReconstructsPayload(t *testing.T) { + p1 := p2ptest.NewTestP2P(t) + p2 := p2ptest.NewTestP2P(t) + p1.Connect(p2) + assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") + d := db.SetupDB(t) + + // Start service with 160 as allowed blocks capacity (and almost zero capacity recovery). + parent := bytesutil.PadTo([]byte("parentHash"), fieldparams.RootLength) + stateRoot := bytesutil.PadTo([]byte("stateRoot"), fieldparams.RootLength) + receiptsRoot := bytesutil.PadTo([]byte("receiptsRoot"), fieldparams.RootLength) + logsBloom := bytesutil.PadTo([]byte("logs"), fieldparams.LogsBloomLength) + tx := gethTypes.NewTransaction( + 0, + common.HexToAddress("095e7baea6a6c7c4c2dfeb977efac326af552d87"), + big.NewInt(0), 0, big.NewInt(0), + nil, + ) + txs := []*gethTypes.Transaction{tx} + encodedBinaryTxs := make([][]byte, 1) + var err error + encodedBinaryTxs[0], err = txs[0].MarshalBinary() + require.NoError(t, err) + blockHash := bytesutil.ToBytes32([]byte("foo")) + payload := &enginev1.ExecutionPayload{ + ParentHash: parent, + FeeRecipient: make([]byte, fieldparams.FeeRecipientLength), + StateRoot: stateRoot, + ReceiptsRoot: receiptsRoot, + LogsBloom: logsBloom, + PrevRandao: blockHash[:], + BlockNumber: 0, + GasLimit: 0, + GasUsed: 0, + Timestamp: 0, + ExtraData: make([]byte, 0), + BlockHash: blockHash[:], + BaseFeePerGas: bytesutil.PadTo([]byte("baseFeePerGas"), fieldparams.RootLength), + Transactions: encodedBinaryTxs, + } + wrappedPayload, err := wrapper.WrappedExecutionPayload(payload) + require.NoError(t, err) + header, err := wrapper.PayloadToHeader(wrappedPayload) + require.NoError(t, err) + + var blkRoots p2pTypes.BeaconBlockByRootsReq + // Populate the database with blocks that would match the request. + for i := types.Slot(1); i < 11; i++ { + blk := util.NewBlindedBeaconBlockBellatrix() + blk.Block.Body.ExecutionPayloadHeader = header + blk.Block.Slot = i + root, err := blk.Block.HashTreeRoot() + require.NoError(t, err) + wsb, err := wrapper.WrappedSignedBeaconBlock(blk) + require.NoError(t, err) + require.NoError(t, d.SaveBlock(context.Background(), wsb)) + blkRoots = append(blkRoots, root) + } + + mockEngine := &mockPOW.EngineClient{ + ExecutionPayloadByBlockHash: map[[32]byte]*enginev1.ExecutionPayload{ + blockHash: payload, + }, + } + r := &Service{cfg: &config{ + p2p: p1, + beaconDB: d, + executionPayloadReconstructor: mockEngine, + }, rateLimiter: newRateLimiter(p1)} + r.cfg.chain = &mock.ChainService{ValidatorsRoot: [32]byte{}} + pcl := protocol.ID(p2p.RPCBlocksByRootTopicV1) + topic := string(pcl) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, false) + + var wg sync.WaitGroup + wg.Add(1) + p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { + defer wg.Done() + for i := range blkRoots { + expectSuccess(t, stream) + res := util.NewBeaconBlockBellatrix() + assert.NoError(t, r.cfg.p2p.Encoding().DecodeWithMaxLength(stream, res)) + if uint64(res.Block.Slot) != uint64(i+1) { + t.Errorf("Received unexpected block slot %d but wanted %d", res.Block.Slot, i+1) + } + } + require.Equal(t, uint64(10), mockEngine.NumReconstructedPayloads) + }) + + stream1, err := p1.BHost.NewStream(context.Background(), p2.BHost.ID(), pcl) + require.NoError(t, err) + err = r.beaconBlocksRootRPCHandler(context.Background(), &blkRoots, stream1) + assert.NoError(t, err) + + if util.WaitTimeout(&wg, 1*time.Second) { + t.Fatal("Did not receive stream within 1 sec") + } +} + func TestRecentBeaconBlocks_RPCRequestSent(t *testing.T) { p1 := p2ptest.NewTestP2P(t) p2 := p2ptest.NewTestP2P(t) diff --git a/beacon-chain/sync/rpc_chunked_response.go b/beacon-chain/sync/rpc_chunked_response.go index f5ad7b9448..ef93b44650 100644 --- a/beacon-chain/sync/rpc_chunked_response.go +++ b/beacon-chain/sync/rpc_chunked_response.go @@ -30,6 +30,7 @@ func WriteBlockChunk(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher return err } var obtainedCtx []byte + switch blk.Version() { case version.Phase0: valRoot := chain.GenesisValidatorsRoot() @@ -45,7 +46,7 @@ func WriteBlockChunk(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher return err } obtainedCtx = digest[:] - case version.Bellatrix: + case version.Bellatrix, version.BellatrixBlind: valRoot := chain.GenesisValidatorsRoot() digest, err := forks.ForkDigestFromEpoch(params.BeaconConfig().BellatrixForkEpoch, valRoot[:]) if err != nil { diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index 4e04acbf50..9703ea5f8a 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -29,6 +29,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/operations/synccommittee" "github.com/prysmaticlabs/prysm/beacon-chain/operations/voluntaryexits" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/beacon-chain/powchain" "github.com/prysmaticlabs/prysm/beacon-chain/state/stategen" lruwrpr "github.com/prysmaticlabs/prysm/cache/lru" "github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags" @@ -68,21 +69,22 @@ type validationFn func(ctx context.Context) (pubsub.ValidationResult, error) // config to hold dependencies for the sync service. type config struct { - attestationNotifier operation.Notifier - p2p p2p.P2P - beaconDB db.NoHeadAccessDatabase - attPool attestations.Pool - exitPool voluntaryexits.PoolManager - slashingPool slashings.PoolManager - syncCommsPool synccommittee.Pool - chain blockchainService - initialSync Checker - stateNotifier statefeed.Notifier - blockNotifier blockfeed.Notifier - operationNotifier operation.Notifier - stateGen *stategen.State - slasherAttestationsFeed *event.Feed - slasherBlockHeadersFeed *event.Feed + attestationNotifier operation.Notifier + p2p p2p.P2P + beaconDB db.NoHeadAccessDatabase + attPool attestations.Pool + exitPool voluntaryexits.PoolManager + slashingPool slashings.PoolManager + syncCommsPool synccommittee.Pool + chain blockchainService + initialSync Checker + stateNotifier statefeed.Notifier + blockNotifier blockfeed.Notifier + operationNotifier operation.Notifier + executionPayloadReconstructor powchain.ExecutionPayloadReconstructor + stateGen *stategen.State + slasherAttestationsFeed *event.Feed + slasherBlockHeadersFeed *event.Feed } // This defines the interface for interacting with block chain service diff --git a/config/features/config.go b/config/features/config.go index 97ec8b3bff..5070e8bbda 100644 --- a/config/features/config.go +++ b/config/features/config.go @@ -65,6 +65,7 @@ type Flags struct { EnableVectorizedHTR bool // EnableVectorizedHTR specifies whether the beacon state will use the optimized sha256 routines. EnableForkChoiceDoublyLinkedTree bool // EnableForkChoiceDoublyLinkedTree specifies whether fork choice store will use a doubly linked tree. EnableBatchGossipAggregation bool // EnableBatchGossipAggregation specifies whether to further aggregate our gossip batches before verifying them. + EnableOnlyBlindedBeaconBlocks bool // EnableOnlyBlindedBeaconBlocks enables only storing blinded beacon blocks in the DB post-Bellatrix fork. // KeystoreImportDebounceInterval specifies the time duration the validator waits to reload new keys if they have // changed on disk. This feature is for advanced use cases only. @@ -228,6 +229,10 @@ func ConfigureBeaconChain(ctx *cli.Context) error { logEnabled(enableGossipBatchAggregation) cfg.EnableBatchGossipAggregation = true } + if ctx.Bool(EnableOnlyBlindedBeaconBlocks.Name) { + logEnabled(EnableOnlyBlindedBeaconBlocks) + cfg.EnableOnlyBlindedBeaconBlocks = true + } Init(cfg) return nil } diff --git a/config/features/flags.go b/config/features/flags.go index 29bbd0382e..adb6a8bf7c 100644 --- a/config/features/flags.go +++ b/config/features/flags.go @@ -105,12 +105,10 @@ var ( Name: "disable-native-state", Usage: "Disables representing the beacon state as a pure Go struct.", } - enablePullTips = &cli.BoolFlag{ Name: "experimental-disable-boundary-checks", Usage: "Experimental disable of boundary checks, useful for debugging, may cause bad votes.", } - enableVecHTR = &cli.BoolFlag{ Name: "enable-vectorized-htr", Usage: "Enables new go sha256 library which utilizes optimized routines for merkle trees", @@ -123,6 +121,10 @@ var ( Name: "enable-gossip-batch-aggregation", Usage: "Enables new methods to further aggregate our gossip batches before verifying them.", } + EnableOnlyBlindedBeaconBlocks = &cli.BoolFlag{ + Name: "enable-only-blinded-beacon-blocks", + Usage: "Enables storing only blinded beacon blocks in the database without full execution layer transactions", + } ) // devModeFlags holds list of flags that are set when development mode is on. @@ -173,6 +175,7 @@ var BeaconChainFlags = append(deprecatedFlags, []cli.Flag{ enableVecHTR, enableForkChoiceDoublyLinkedTree, enableGossipBatchAggregation, + EnableOnlyBlindedBeaconBlocks, }...) // E2EBeaconChainFlags contains a list of the beacon chain feature flags to be tested in E2E. diff --git a/consensus-types/interfaces/beacon_block.go b/consensus-types/interfaces/beacon_block.go index da66945b84..da34475385 100644 --- a/consensus-types/interfaces/beacon_block.go +++ b/consensus-types/interfaces/beacon_block.go @@ -19,6 +19,7 @@ type SignedBeaconBlock interface { PbGenericBlock() (*ethpb.GenericSignedBeaconBlock, error) PbPhase0Block() (*ethpb.SignedBeaconBlock, error) PbAltairBlock() (*ethpb.SignedBeaconBlockAltair, error) + ToBlinded() (SignedBeaconBlock, error) PbBellatrixBlock() (*ethpb.SignedBeaconBlockBellatrix, error) PbBlindedBellatrixBlock() (*ethpb.SignedBlindedBeaconBlockBellatrix, error) ssz.Marshaler diff --git a/consensus-types/mock/block.go b/consensus-types/mock/block.go index 7f70606828..ccd4ec6483 100644 --- a/consensus-types/mock/block.go +++ b/consensus-types/mock/block.go @@ -73,6 +73,10 @@ func (SignedBeaconBlock) Version() int { panic("implement me") } +func (SignedBeaconBlock) ToBlinded() (interfaces.SignedBeaconBlock, error) { + panic("implement me") +} + func (SignedBeaconBlock) Header() (*eth.SignedBeaconBlockHeader, error) { panic("implement me") } diff --git a/consensus-types/wrapper/beacon_block.go b/consensus-types/wrapper/beacon_block.go index dca8374e6d..9f07aecca9 100644 --- a/consensus-types/wrapper/beacon_block.go +++ b/consensus-types/wrapper/beacon_block.go @@ -14,6 +14,8 @@ var ( // This allows us to create a generic beacon block interface that is implemented by different // fork versions of beacon blocks. ErrUnsupportedField = errors.New("unsupported field for block type") + // ErrUnsupportedVersion for beacon block methods. + ErrUnsupportedVersion = errors.New("unsupported beacon block version") // ErrUnsupportedSignedBeaconBlock is returned when the struct type is not a supported signed // beacon block type. ErrUnsupportedSignedBeaconBlock = errors.New("unsupported signed beacon block") @@ -208,55 +210,6 @@ func BuildSignedBeaconBlockFromExecutionPayload( return wrappedBellatrixSignedBeaconBlock(bellatrixFullBlock) } -// WrapSignedBlindedBeaconBlock converts a signed beacon block into a blinded format. -func WrapSignedBlindedBeaconBlock(blk interfaces.SignedBeaconBlock) (interfaces.SignedBeaconBlock, error) { - if err := BeaconBlockIsNil(blk); err != nil { - return nil, err - } - if blk.Block().IsBlinded() { - return blk, nil - } - b := blk.Block() - payload, err := b.Body().Execution() - switch { - case errors.Is(err, ErrUnsupportedField): - return nil, ErrUnsupportedSignedBeaconBlock - case err != nil: - return nil, errors.Wrap(err, "could not get execution payload") - default: - } - syncAgg, err := b.Body().SyncAggregate() - if err != nil { - return nil, err - } - header, err := PayloadToHeader(payload) - if err != nil { - return nil, err - } - blindedBlock := ð.SignedBlindedBeaconBlockBellatrix{ - Block: ð.BlindedBeaconBlockBellatrix{ - Slot: b.Slot(), - ProposerIndex: b.ProposerIndex(), - ParentRoot: b.ParentRoot(), - StateRoot: b.StateRoot(), - Body: ð.BlindedBeaconBlockBodyBellatrix{ - RandaoReveal: b.Body().RandaoReveal(), - Eth1Data: b.Body().Eth1Data(), - Graffiti: b.Body().Graffiti(), - ProposerSlashings: b.Body().ProposerSlashings(), - AttesterSlashings: b.Body().AttesterSlashings(), - Attestations: b.Body().Attestations(), - Deposits: b.Body().Deposits(), - VoluntaryExits: b.Body().VoluntaryExits(), - SyncAggregate: syncAgg, - ExecutionPayloadHeader: header, - }, - }, - Signature: blk.Signature(), - } - return wrappedBellatrixSignedBlindedBeaconBlock(blindedBlock) -} - func UnwrapGenericSignedBeaconBlock(gb *eth.GenericSignedBeaconBlock) (interfaces.SignedBeaconBlock, error) { if gb == nil { return nil, ErrNilObjectWrapped diff --git a/consensus-types/wrapper/beacon_block_altair.go b/consensus-types/wrapper/beacon_block_altair.go index 4c1abe475c..62ebea1d79 100644 --- a/consensus-types/wrapper/beacon_block_altair.go +++ b/consensus-types/wrapper/beacon_block_altair.go @@ -112,6 +112,10 @@ func (altairSignedBeaconBlock) PbBlindedBellatrixBlock() (*eth.SignedBlindedBeac return nil, ErrUnsupportedBlindedBellatrixBlock } +func (altairSignedBeaconBlock) ToBlinded() (interfaces.SignedBeaconBlock, error) { + return nil, ErrUnsupportedVersion +} + // Version of the underlying protobuf object. func (altairSignedBeaconBlock) Version() int { return version.Altair diff --git a/consensus-types/wrapper/beacon_block_bellatrix.go b/consensus-types/wrapper/beacon_block_bellatrix.go index c3374d3312..df8daeaa49 100644 --- a/consensus-types/wrapper/beacon_block_bellatrix.go +++ b/consensus-types/wrapper/beacon_block_bellatrix.go @@ -107,6 +107,44 @@ func (bellatrixSignedBeaconBlock) PbAltairBlock() (*eth.SignedBeaconBlockAltair, return nil, ErrUnsupportedAltairBlock } +func (w bellatrixSignedBeaconBlock) ToBlinded() (interfaces.SignedBeaconBlock, error) { + if w.Block().IsNil() { + return nil, errors.New("cannot convert nil block to blinded format") + } + payload := w.b.Block.Body.ExecutionPayload + wrappedPayload, err := WrappedExecutionPayload(payload) + if err != nil { + return nil, err + } + header, err := PayloadToHeader(wrappedPayload) + if err != nil { + return nil, err + } + return signedBlindedBeaconBlockBellatrix{ + b: ð.SignedBlindedBeaconBlockBellatrix{ + Block: ð.BlindedBeaconBlockBellatrix{ + Slot: w.b.Block.Slot, + ProposerIndex: w.b.Block.ProposerIndex, + ParentRoot: w.b.Block.ParentRoot, + StateRoot: w.b.Block.StateRoot, + Body: ð.BlindedBeaconBlockBodyBellatrix{ + RandaoReveal: w.b.Block.Body.RandaoReveal, + Eth1Data: w.b.Block.Body.Eth1Data, + Graffiti: w.b.Block.Body.Graffiti, + ProposerSlashings: w.b.Block.Body.ProposerSlashings, + AttesterSlashings: w.b.Block.Body.AttesterSlashings, + Attestations: w.b.Block.Body.Attestations, + Deposits: w.b.Block.Body.Deposits, + VoluntaryExits: w.b.Block.Body.VoluntaryExits, + SyncAggregate: w.b.Block.Body.SyncAggregate, + ExecutionPayloadHeader: header, + }, + }, + Signature: w.b.Signature, + }, + }, nil +} + // Version of the underlying protobuf object. func (bellatrixSignedBeaconBlock) Version() int { return version.Bellatrix diff --git a/consensus-types/wrapper/beacon_block_phase0.go b/consensus-types/wrapper/beacon_block_phase0.go index 6b225d8721..bce71c2b51 100644 --- a/consensus-types/wrapper/beacon_block_phase0.go +++ b/consensus-types/wrapper/beacon_block_phase0.go @@ -108,6 +108,10 @@ func (Phase0SignedBeaconBlock) PbBlindedBellatrixBlock() (*eth.SignedBlindedBeac return nil, ErrUnsupportedBlindedBellatrixBlock } +func (Phase0SignedBeaconBlock) ToBlinded() (interfaces.SignedBeaconBlock, error) { + return nil, ErrUnsupportedVersion +} + // Version of the underlying protobuf object. func (Phase0SignedBeaconBlock) Version() int { return version.Phase0 diff --git a/consensus-types/wrapper/beacon_block_test.go b/consensus-types/wrapper/beacon_block_test.go index 225277fbc1..60e634c0bc 100644 --- a/consensus-types/wrapper/beacon_block_test.go +++ b/consensus-types/wrapper/beacon_block_test.go @@ -114,7 +114,7 @@ func TestWrapSignedBlindedBeaconBlock(t *testing.T) { blk, err := wrapper.WrappedSignedBeaconBlock(bellatrixBlk) require.NoError(t, err) - builtBlock, err := wrapper.WrapSignedBlindedBeaconBlock(blk) + builtBlock, err := blk.ToBlinded() require.NoError(t, err) got, err := builtBlock.Block().Body().Execution() diff --git a/consensus-types/wrapper/blinded_beacon_block_bellatrix.go b/consensus-types/wrapper/blinded_beacon_block_bellatrix.go index c770d4e14b..9ba54b641f 100644 --- a/consensus-types/wrapper/blinded_beacon_block_bellatrix.go +++ b/consensus-types/wrapper/blinded_beacon_block_bellatrix.go @@ -107,6 +107,10 @@ func (signedBlindedBeaconBlockBellatrix) PbAltairBlock() (*eth.SignedBeaconBlock return nil, ErrUnsupportedAltairBlock } +func (signedBlindedBeaconBlockBellatrix) ToBlinded() (interfaces.SignedBeaconBlock, error) { + return nil, ErrUnsupportedVersion +} + // Version of the underlying protobuf object. func (signedBlindedBeaconBlockBellatrix) Version() int { return version.BellatrixBlind diff --git a/testing/spectest/shared/common/forkchoice/service.go b/testing/spectest/shared/common/forkchoice/service.go index b7a637e16b..5bc5ce0878 100644 --- a/testing/spectest/shared/common/forkchoice/service.go +++ b/testing/spectest/shared/common/forkchoice/service.go @@ -80,7 +80,7 @@ func (m *engineMock) NewPayload(context.Context, interfaces.ExecutionData) ([]by return nil, nil } -func (m *engineMock) LatestExecutionBlock() (*pb.ExecutionBlock, error) { +func (m *engineMock) LatestExecutionBlock(context.Context) (*pb.ExecutionBlock, error) { return nil, nil } @@ -88,7 +88,7 @@ func (m *engineMock) ExchangeTransitionConfiguration(context.Context, *pb.Transi return nil } -func (m *engineMock) ExecutionBlockByHash(_ context.Context, hash common.Hash) (*pb.ExecutionBlock, error) { +func (m *engineMock) ExecutionBlockByHash(_ context.Context, hash common.Hash, _ bool) (*pb.ExecutionBlock, error) { b, ok := m.powBlocks[bytesutil.ToBytes32(hash.Bytes())] if !ok { return nil, nil