diff --git a/beacon-chain/blockchain/error.go b/beacon-chain/blockchain/error.go index ae689fb8bb..cb6e20c4c9 100644 --- a/beacon-chain/blockchain/error.go +++ b/beacon-chain/blockchain/error.go @@ -17,8 +17,6 @@ var ( errNilJustifiedCheckpoint = errors.New("nil justified checkpoint returned from state") // errBlockDoesNotExist is returned when a block does not exist for a particular state summary. errBlockDoesNotExist = errors.New("could not find block in DB") - // errWrongBlockCount is returned when the wrong number of blocks or block roots is used - errWrongBlockCount = errors.New("wrong number of blocks or block roots") // errBlockNotFoundInCacheOrDB is returned when a block is not found in the cache or DB. errBlockNotFoundInCacheOrDB = errors.New("block not found in cache or db") // errWSBlockNotFound is returned when a block is not found in the WS cache or DB. diff --git a/beacon-chain/blockchain/log.go b/beacon-chain/blockchain/log.go index f74019fd33..2c9b60e3e7 100644 --- a/beacon-chain/blockchain/log.go +++ b/beacon-chain/blockchain/log.go @@ -73,7 +73,7 @@ func logBlockSyncStatus(block interfaces.ReadOnlyBeaconBlock, blockRoot [32]byte level := log.Logger.GetLevel() if level >= logrus.DebugLevel { parentRoot := block.ParentRoot() - log.WithFields(logrus.Fields{ + lf := logrus.Fields{ "slot": block.Slot(), "slotInEpoch": block.Slot() % params.BeaconConfig().SlotsPerEpoch, "block": fmt.Sprintf("0x%s...", hex.EncodeToString(blockRoot[:])[:8]), @@ -87,7 +87,12 @@ func logBlockSyncStatus(block interfaces.ReadOnlyBeaconBlock, blockRoot [32]byte "sinceSlotStartTime": prysmTime.Now().Sub(startTime), "chainServiceProcessedTime": prysmTime.Now().Sub(receivedTime), "deposits": len(block.Body().Deposits()), - }).Debug("Synced new block") + } + commits, err := block.Body().BlobKzgCommitments() + if err == nil { + lf["commitments"] = len(commits) + } + log.WithFields(lf).Debug("Synced new block") } else { log.WithFields(logrus.Fields{ "slot": block.Slot(), diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index b5da2308d8..1290e73905 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -160,19 +160,14 @@ func getStateVersionAndPayload(st state.BeaconState) (int, interfaces.ExecutionD return preStateVersion, preStateHeader, nil } -func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.ReadOnlySignedBeaconBlock, - blockRoots [][32]byte) error { +func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlock) error { ctx, span := trace.StartSpan(ctx, "blockChain.onBlockBatch") defer span.End() - if len(blks) == 0 || len(blockRoots) == 0 { + if len(blks) == 0 { return errors.New("no blocks provided") } - if len(blks) != len(blockRoots) { - return errWrongBlockCount - } - if err := consensusblocks.BeaconBlockIsNil(blks[0]); err != nil { return invalidBlock{error: err} } @@ -222,7 +217,7 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.ReadOnlySi } // Save potential boundary states. if slots.IsEpochStart(preState.Slot()) { - boundaries[blockRoots[i]] = preState.Copy() + boundaries[b.Root()] = preState.Copy() } jCheckpoints[i] = preState.CurrentJustifiedCheckpoint() fCheckpoints[i] = preState.FinalizedCheckpoint() @@ -255,11 +250,12 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.ReadOnlySi pendingNodes := make([]*forkchoicetypes.BlockAndCheckpoints, len(blks)) var isValidPayload bool for i, b := range blks { + root := b.Root() isValidPayload, err = s.notifyNewPayload(ctx, postVersionAndHeaders[i].version, postVersionAndHeaders[i].header, b) if err != nil { - return s.handleInvalidExecutionError(ctx, err, blockRoots[i], b.Block().ParentRoot()) + return s.handleInvalidExecutionError(ctx, err, root, b.Block().ParentRoot()) } if isValidPayload { if err := s.validateMergeTransitionBlock(ctx, preVersionAndHeaders[i].version, @@ -271,13 +267,13 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.ReadOnlySi JustifiedCheckpoint: jCheckpoints[i], FinalizedCheckpoint: fCheckpoints[i]} pendingNodes[len(blks)-i-1] = args - if err := s.saveInitSyncBlock(ctx, blockRoots[i], b); err != nil { + if err := s.saveInitSyncBlock(ctx, root, b); err != nil { tracing.AnnotateError(span, err) return err } if err := s.cfg.BeaconDB.SaveStateSummary(ctx, ðpb.StateSummary{ Slot: b.Block().Slot(), - Root: blockRoots[i][:], + Root: root[:], }); err != nil { tracing.AnnotateError(span, err) return err @@ -301,8 +297,9 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.ReadOnlySi return err } } + lastB := blks[len(blks)-1] + lastBR := lastB.Root() // Also saves the last post state which to be used as pre state for the next batch. - lastBR := blockRoots[len(blks)-1] if err := s.cfg.StateGen.SaveState(ctx, lastBR, preState); err != nil { return err } @@ -320,7 +317,6 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.ReadOnlySi return errors.Wrap(err, "could not set optimistic block to valid") } } - lastB := blks[len(blks)-1] arg := ¬ifyForkchoiceUpdateArg{ headState: preState, headRoot: lastBR, diff --git a/beacon-chain/blockchain/process_block_test.go b/beacon-chain/blockchain/process_block_test.go index ac83ecb1cc..824d965306 100644 --- a/beacon-chain/blockchain/process_block_test.go +++ b/beacon-chain/blockchain/process_block_test.go @@ -49,8 +49,7 @@ func TestStore_OnBlockBatch(t *testing.T) { require.NoError(t, service.saveGenesisData(ctx, st)) bState := st.Copy() - var blks []interfaces.ReadOnlySignedBeaconBlock - var blkRoots [][32]byte + var blks []consensusblocks.ROBlock for i := 0; i < 97; i++ { b, err := util.GenerateFullBlock(bState, keys, util.DefaultBlockGenConfig(), primitives.Slot(i)) require.NoError(t, err) @@ -63,16 +62,15 @@ func TestStore_OnBlockBatch(t *testing.T) { require.NoError(t, service.saveInitSyncBlock(ctx, root, wsb)) wsb, err = consensusblocks.NewSignedBeaconBlock(b) require.NoError(t, err) - blks = append(blks, wsb) - blkRoots = append(blkRoots, root) + rwsb, err := consensusblocks.NewROBlock(wsb) + require.NoError(t, err) + blks = append(blks, rwsb) } - err := service.onBlockBatch(ctx, blks, blkRoots[1:]) - require.ErrorIs(t, errWrongBlockCount, err) - err = service.onBlockBatch(ctx, blks, blkRoots) + err := service.onBlockBatch(ctx, blks) require.NoError(t, err) jcp := service.CurrentJustifiedCheckpt() jroot := bytesutil.ToBytes32(jcp.Root) - require.Equal(t, blkRoots[63], jroot) + require.Equal(t, blks[63].Root(), jroot) require.Equal(t, primitives.Epoch(2), service.cfg.ForkChoiceStore.JustifiedCheckpoint().Epoch) } @@ -84,8 +82,7 @@ func TestStore_OnBlockBatch_NotifyNewPayload(t *testing.T) { require.NoError(t, service.saveGenesisData(ctx, st)) bState := st.Copy() - var blks []interfaces.ReadOnlySignedBeaconBlock - var blkRoots [][32]byte + var blks []consensusblocks.ROBlock blkCount := 4 for i := 0; i <= blkCount; i++ { b, err := util.GenerateFullBlock(bState, keys, util.DefaultBlockGenConfig(), primitives.Slot(i)) @@ -94,13 +91,12 @@ func TestStore_OnBlockBatch_NotifyNewPayload(t *testing.T) { require.NoError(t, err) bState, err = transition.ExecuteStateTransition(ctx, bState, wsb) require.NoError(t, err) - root, err := b.Block.HashTreeRoot() + rwsb, err := consensusblocks.NewROBlock(wsb) require.NoError(t, err) - require.NoError(t, service.saveInitSyncBlock(ctx, root, wsb)) - blks = append(blks, wsb) - blkRoots = append(blkRoots, root) + require.NoError(t, service.saveInitSyncBlock(ctx, rwsb.Root(), wsb)) + blks = append(blks, rwsb) } - require.NoError(t, service.onBlockBatch(ctx, blks, blkRoots)) + require.NoError(t, service.onBlockBatch(ctx, blks)) } func TestCachedPreState_CanGetFromStateSummary(t *testing.T) { @@ -1932,8 +1928,10 @@ func TestNoViableHead_Reboot(t *testing.T) { require.NoError(t, err) root, err = b.Block.HashTreeRoot() require.NoError(t, err) + rwsb, err := consensusblocks.NewROBlock(wsb) + require.NoError(t, err) // We use onBlockBatch here because the valid chain is missing in forkchoice - require.NoError(t, service.onBlockBatch(ctx, []interfaces.ReadOnlySignedBeaconBlock{wsb}, [][32]byte{root})) + require.NoError(t, service.onBlockBatch(ctx, []consensusblocks.ROBlock{rwsb})) // Check that the head is now VALID and the node is not optimistic require.Equal(t, genesisRoot, service.ensureRootNotZeros(service.cfg.ForkChoiceStore.CachedHeadRoot())) headRoot, err = service.HeadRoot(ctx) diff --git a/beacon-chain/blockchain/receive_block.go b/beacon-chain/blockchain/receive_block.go index 40fbfd26e1..9265a121e0 100644 --- a/beacon-chain/blockchain/receive_block.go +++ b/beacon-chain/blockchain/receive_block.go @@ -12,6 +12,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state" "github.com/prysmaticlabs/prysm/v4/config/features" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" @@ -32,7 +33,7 @@ var epochsSinceFinalitySaveHotStateDB = primitives.Epoch(100) // BlockReceiver interface defines the methods of chain service for receiving and processing new blocks. type BlockReceiver interface { ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error - ReceiveBlockBatch(ctx context.Context, blocks []interfaces.ReadOnlySignedBeaconBlock, blkRoots [][32]byte) error + ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock) error HasBlock(ctx context.Context, root [32]byte) bool RecentBlockSlot(root [32]byte) (primitives.Slot, error) } @@ -172,7 +173,7 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig // ReceiveBlockBatch processes the whole block batch at once, assuming the block batch is linear ,transitioning // the state, performing batch verification of all collected signatures and then performing the appropriate // actions for a block post-transition. -func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []interfaces.ReadOnlySignedBeaconBlock, blkRoots [][32]byte) error { +func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock) error { ctx, span := trace.StartSpan(ctx, "blockChain.ReceiveBlockBatch") defer span.End() @@ -180,20 +181,21 @@ func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []interfaces.Rea defer s.cfg.ForkChoiceStore.Unlock() // Apply state transition on the incoming newly received block batches, one by one. - if err := s.onBlockBatch(ctx, blocks, blkRoots); err != nil { + if err := s.onBlockBatch(ctx, blocks); err != nil { err := errors.Wrap(err, "could not process block in batch") tracing.AnnotateError(span, err) return err } - lastBR := blkRoots[len(blkRoots)-1] + lastBR := blocks[len(blocks)-1].Root() optimistic, err := s.cfg.ForkChoiceStore.IsOptimistic(lastBR) if err != nil { lastSlot := blocks[len(blocks)-1].Block().Slot() log.WithError(err).Errorf("Could not check if block is optimistic, Root: %#x, Slot: %d", lastBR, lastSlot) optimistic = true } - for i, b := range blocks { + + for _, b := range blocks { blockCopy, err := b.Copy() if err != nil { return err @@ -203,7 +205,7 @@ func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []interfaces.Rea Type: statefeed.BlockProcessed, Data: &statefeed.BlockProcessedData{ Slot: blockCopy.Block().Slot(), - BlockRoot: blkRoots[i], + BlockRoot: b.Root(), SignedBlock: blockCopy, Verified: true, Optimistic: optimistic, diff --git a/beacon-chain/blockchain/receive_block_test.go b/beacon-chain/blockchain/receive_block_test.go index 39f4e9301a..19bd3e78c0 100644 --- a/beacon-chain/blockchain/receive_block_test.go +++ b/beacon-chain/blockchain/receive_block_test.go @@ -10,7 +10,6 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/voluntaryexits" "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" - "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" @@ -228,13 +227,11 @@ func TestService_ReceiveBlockBatch(t *testing.T) { s, _ := minimalTestService(t, WithStateNotifier(&blockchainTesting.MockStateNotifier{RecordEvents: true})) err := s.saveGenesisData(ctx, genesis) require.NoError(t, err) - root, err := tt.args.block.Block.HashTreeRoot() - require.NoError(t, err) wsb, err := blocks.NewSignedBeaconBlock(tt.args.block) require.NoError(t, err) - blks := []interfaces.ReadOnlySignedBeaconBlock{wsb} - roots := [][32]byte{root} - err = s.ReceiveBlockBatch(ctx, blks, roots) + rwsb, err := blocks.NewROBlock(wsb) + require.NoError(t, err) + err = s.ReceiveBlockBatch(ctx, []blocks.ROBlock{rwsb}) if tt.wantedErr != "" { assert.ErrorContains(t, tt.wantedErr, err) } else { diff --git a/beacon-chain/blockchain/testing/BUILD.bazel b/beacon-chain/blockchain/testing/BUILD.bazel index 7c76446f0c..4a80b90820 100644 --- a/beacon-chain/blockchain/testing/BUILD.bazel +++ b/beacon-chain/blockchain/testing/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "//beacon-chain/state/state-native:go_default_library", "//config/fieldparams:go_default_library", "//config/params:go_default_library", + "//consensus-types/blocks:go_default_library", "//consensus-types/interfaces:go_default_library", "//consensus-types/primitives:go_default_library", "//encoding/bytesutil:go_default_library", diff --git a/beacon-chain/blockchain/testing/mock.go b/beacon-chain/blockchain/testing/mock.go index 8c6fbd6f67..a6d5f2abf1 100644 --- a/beacon-chain/blockchain/testing/mock.go +++ b/beacon-chain/blockchain/testing/mock.go @@ -22,6 +22,7 @@ import ( state_native "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/state-native" fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" "github.com/prysmaticlabs/prysm/v4/config/params" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" @@ -204,7 +205,7 @@ func (s *ChainService) ReceiveBlockInitialSync(ctx context.Context, block interf } // ReceiveBlockBatch processes blocks in batches from initial-sync. -func (s *ChainService) ReceiveBlockBatch(ctx context.Context, blks []interfaces.ReadOnlySignedBeaconBlock, _ [][32]byte) error { +func (s *ChainService) ReceiveBlockBatch(ctx context.Context, blks []blocks.ROBlock) error { if s.State == nil { return ErrNilState } diff --git a/beacon-chain/execution/BUILD.bazel b/beacon-chain/execution/BUILD.bazel index e91895898d..68f4d5d9a7 100644 --- a/beacon-chain/execution/BUILD.bazel +++ b/beacon-chain/execution/BUILD.bazel @@ -71,6 +71,7 @@ go_library( "@com_github_sirupsen_logrus//:go_default_library", "@io_k8s_client_go//tools/cache:go_default_library", "@io_opencensus_go//trace:go_default_library", + "@org_golang_google_protobuf//proto:go_default_library", ], ) diff --git a/beacon-chain/execution/engine_client.go b/beacon-chain/execution/engine_client.go index 3e9c607ed2..4e98c9da29 100644 --- a/beacon-chain/execution/engine_client.go +++ b/beacon-chain/execution/engine_client.go @@ -28,6 +28,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/time/slots" "github.com/sirupsen/logrus" "go.opencensus.io/trace" + "google.golang.org/protobuf/proto" ) var ( @@ -582,7 +583,10 @@ func (s *Service) ReconstructFullBlock( // If the payload header has a block hash of 0x0, it means we are pre-merge and should // simply return the block with an empty execution payload. if bytes.Equal(header.BlockHash(), params.BeaconConfig().ZeroHash[:]) { - payload := buildEmptyExecutionPayload() + payload, err := buildEmptyExecutionPayload(blindedBlock.Version()) + if err != nil { + return nil, err + } return blocks.BuildSignedBeaconBlockFromExecutionPayload(blindedBlock, payload) } @@ -641,7 +645,11 @@ func (s *Service) ReconstructFullBellatrixBlockBatch( // For blocks that are pre-merge we simply reconstruct them via an empty // execution payload. for _, realIdx := range zeroExecPayloads { - payload := buildEmptyExecutionPayload() + bblock := blindedBlocks[realIdx] + payload, err := buildEmptyExecutionPayload(bblock.Version()) + if err != nil { + return nil, err + } fullBlock, err := blocks.BuildSignedBeaconBlockFromExecutionPayload(blindedBlocks[realIdx], payload) if err != nil { return nil, err @@ -677,7 +685,7 @@ func (s *Service) retrievePayloadFromExecutionHash(ctx context.Context, executio } executionBlock.Version = version - return fullPayloadFromExecutionBlock(header, executionBlock) + return fullPayloadFromExecutionBlock(version, header, executionBlock) } func (s *Service) retrievePayloadsFromExecutionHashes( @@ -705,6 +713,7 @@ func (s *Service) retrievePayloadsFromExecutionHashes( // blinded block. for sliceIdx, realIdx := range validExecPayloads { var payload interfaces.ExecutionData + bblock := blindedBlocks[realIdx] if features.Get().EnableOptionalEngineMethods { b := payloadBodies[sliceIdx] if b == nil { @@ -727,7 +736,7 @@ func (s *Service) retrievePayloadsFromExecutionHashes( if err != nil { return nil, err } - payload, err = fullPayloadFromExecutionBlock(header, b) + payload, err = fullPayloadFromExecutionBlock(bblock.Version(), header, b) if err != nil { return nil, err } @@ -742,7 +751,7 @@ func (s *Service) retrievePayloadsFromExecutionHashes( } func fullPayloadFromExecutionBlock( - header interfaces.ExecutionData, block *pb.ExecutionBlock, + blockVersion int, header interfaces.ExecutionData, block *pb.ExecutionBlock, ) (interfaces.ExecutionData, error) { if header.IsNil() || block == nil { return nil, errors.New("execution block and header cannot be nil") @@ -765,7 +774,7 @@ func fullPayloadFromExecutionBlock( txs[i] = txBin } - switch block.Version { + switch blockVersion { case version.Bellatrix: return blocks.WrappedExecutionPayload(&pb.ExecutionPayload{ ParentHash: header.ParentHash(), @@ -997,18 +1006,51 @@ func tDStringToUint256(td string) (*uint256.Int, error) { return i, nil } -func buildEmptyExecutionPayload() *pb.ExecutionPayload { - return &pb.ExecutionPayload{ - ParentHash: make([]byte, fieldparams.RootLength), - FeeRecipient: make([]byte, fieldparams.FeeRecipientLength), - StateRoot: make([]byte, fieldparams.RootLength), - ReceiptsRoot: make([]byte, fieldparams.RootLength), - LogsBloom: make([]byte, fieldparams.LogsBloomLength), - PrevRandao: make([]byte, fieldparams.RootLength), - BaseFeePerGas: make([]byte, fieldparams.RootLength), - BlockHash: make([]byte, fieldparams.RootLength), - Transactions: make([][]byte, 0), - ExtraData: make([]byte, 0), +func buildEmptyExecutionPayload(v int) (proto.Message, error) { + switch v { + case version.Bellatrix: + return &pb.ExecutionPayload{ + ParentHash: make([]byte, fieldparams.RootLength), + FeeRecipient: make([]byte, fieldparams.FeeRecipientLength), + StateRoot: make([]byte, fieldparams.RootLength), + ReceiptsRoot: make([]byte, fieldparams.RootLength), + LogsBloom: make([]byte, fieldparams.LogsBloomLength), + PrevRandao: make([]byte, fieldparams.RootLength), + BaseFeePerGas: make([]byte, fieldparams.RootLength), + BlockHash: make([]byte, fieldparams.RootLength), + Transactions: make([][]byte, 0), + ExtraData: make([]byte, 0), + }, nil + case version.Capella: + return &pb.ExecutionPayloadCapella{ + ParentHash: make([]byte, fieldparams.RootLength), + FeeRecipient: make([]byte, fieldparams.FeeRecipientLength), + StateRoot: make([]byte, fieldparams.RootLength), + ReceiptsRoot: make([]byte, fieldparams.RootLength), + LogsBloom: make([]byte, fieldparams.LogsBloomLength), + PrevRandao: make([]byte, fieldparams.RootLength), + BaseFeePerGas: make([]byte, fieldparams.RootLength), + BlockHash: make([]byte, fieldparams.RootLength), + Transactions: make([][]byte, 0), + ExtraData: make([]byte, 0), + Withdrawals: make([]*pb.Withdrawal, 0), + }, nil + case version.Deneb: + return &pb.ExecutionPayloadDeneb{ + ParentHash: make([]byte, fieldparams.RootLength), + FeeRecipient: make([]byte, fieldparams.FeeRecipientLength), + StateRoot: make([]byte, fieldparams.RootLength), + ReceiptsRoot: make([]byte, fieldparams.RootLength), + LogsBloom: make([]byte, fieldparams.LogsBloomLength), + PrevRandao: make([]byte, fieldparams.RootLength), + BaseFeePerGas: make([]byte, fieldparams.RootLength), + BlockHash: make([]byte, fieldparams.RootLength), + Transactions: make([][]byte, 0), + ExtraData: make([]byte, 0), + Withdrawals: make([]*pb.Withdrawal, 0), + }, nil + default: + return nil, errors.Wrapf(ErrUnsupportedVersion, "version=%s", version.String(v)) } } diff --git a/beacon-chain/execution/engine_client_test.go b/beacon-chain/execution/engine_client_test.go index c971888004..06af6b3f35 100644 --- a/beacon-chain/execution/engine_client_test.go +++ b/beacon-chain/execution/engine_client_test.go @@ -1604,8 +1604,9 @@ func fixtures() map[string]interface{} { func Test_fullPayloadFromExecutionBlock(t *testing.T) { type args struct { - header *pb.ExecutionPayloadHeader - block *pb.ExecutionBlock + header *pb.ExecutionPayloadHeader + block *pb.ExecutionBlock + version int } wantedHash := common.BytesToHash([]byte("foo")) tests := []struct { @@ -1621,9 +1622,9 @@ func Test_fullPayloadFromExecutionBlock(t *testing.T) { BlockHash: []byte("foo"), }, block: &pb.ExecutionBlock{ - Version: version.Bellatrix, - Hash: common.BytesToHash([]byte("bar")), + Hash: common.BytesToHash([]byte("bar")), }, + version: version.Bellatrix, }, err: "does not match execution block hash", }, @@ -1634,9 +1635,9 @@ func Test_fullPayloadFromExecutionBlock(t *testing.T) { BlockHash: wantedHash[:], }, block: &pb.ExecutionBlock{ - Version: version.Bellatrix, - Hash: wantedHash, + Hash: wantedHash, }, + version: version.Bellatrix, }, want: func() interfaces.ExecutionData { p, err := blocks.WrappedExecutionPayload(&pb.ExecutionPayload{ @@ -1652,7 +1653,7 @@ func Test_fullPayloadFromExecutionBlock(t *testing.T) { t.Run(tt.name, func(t *testing.T) { wrapped, err := blocks.WrappedExecutionPayloadHeader(tt.args.header) require.NoError(t, err) - got, err := fullPayloadFromExecutionBlock(wrapped, tt.args.block) + got, err := fullPayloadFromExecutionBlock(tt.args.version, wrapped, tt.args.block) if err != nil { assert.ErrorContains(t, tt.err, err) } else { @@ -1664,8 +1665,9 @@ func Test_fullPayloadFromExecutionBlock(t *testing.T) { func Test_fullPayloadFromExecutionBlockCapella(t *testing.T) { type args struct { - header *pb.ExecutionPayloadHeaderCapella - block *pb.ExecutionBlock + header *pb.ExecutionPayloadHeaderCapella + block *pb.ExecutionBlock + version int } wantedHash := common.BytesToHash([]byte("foo")) tests := []struct { @@ -1681,9 +1683,9 @@ func Test_fullPayloadFromExecutionBlockCapella(t *testing.T) { BlockHash: []byte("foo"), }, block: &pb.ExecutionBlock{ - Version: version.Capella, - Hash: common.BytesToHash([]byte("bar")), + Hash: common.BytesToHash([]byte("bar")), }, + version: version.Capella, }, err: "does not match execution block hash", }, @@ -1694,9 +1696,9 @@ func Test_fullPayloadFromExecutionBlockCapella(t *testing.T) { BlockHash: wantedHash[:], }, block: &pb.ExecutionBlock{ - Version: version.Capella, - Hash: wantedHash, + Hash: wantedHash, }, + version: version.Capella, }, want: func() interfaces.ExecutionData { p, err := blocks.WrappedExecutionPayloadCapella(&pb.ExecutionPayloadCapella{ @@ -1712,7 +1714,7 @@ func Test_fullPayloadFromExecutionBlockCapella(t *testing.T) { t.Run(tt.name, func(t *testing.T) { wrapped, err := blocks.WrappedExecutionPayloadHeaderCapella(tt.args.header, 0) require.NoError(t, err) - got, err := fullPayloadFromExecutionBlock(wrapped, tt.args.block) + got, err := fullPayloadFromExecutionBlock(tt.args.version, wrapped, tt.args.block) if err != nil { assert.ErrorContains(t, tt.err, err) } else { @@ -1724,8 +1726,9 @@ func Test_fullPayloadFromExecutionBlockCapella(t *testing.T) { func Test_fullPayloadFromExecutionBlockDeneb(t *testing.T) { type args struct { - header *pb.ExecutionPayloadHeaderDeneb - block *pb.ExecutionBlock + header *pb.ExecutionPayloadHeaderDeneb + block *pb.ExecutionBlock + version int } wantedHash := common.BytesToHash([]byte("foo")) tests := []struct { @@ -1741,9 +1744,9 @@ func Test_fullPayloadFromExecutionBlockDeneb(t *testing.T) { BlockHash: []byte("foo"), }, block: &pb.ExecutionBlock{ - Version: version.Deneb, - Hash: common.BytesToHash([]byte("bar")), + Hash: common.BytesToHash([]byte("bar")), }, + version: version.Deneb, }, err: "does not match execution block hash", }, @@ -1754,9 +1757,9 @@ func Test_fullPayloadFromExecutionBlockDeneb(t *testing.T) { BlockHash: wantedHash[:], }, block: &pb.ExecutionBlock{ - Version: version.Deneb, - Hash: wantedHash, + Hash: wantedHash, }, + version: version.Deneb, }, want: func() interfaces.ExecutionData { p, err := blocks.WrappedExecutionPayloadDeneb(&pb.ExecutionPayloadDeneb{ @@ -1772,7 +1775,7 @@ func Test_fullPayloadFromExecutionBlockDeneb(t *testing.T) { t.Run(tt.name, func(t *testing.T) { wrapped, err := blocks.WrappedExecutionPayloadHeaderDeneb(tt.args.header, 0) require.NoError(t, err) - got, err := fullPayloadFromExecutionBlock(wrapped, tt.args.block) + got, err := fullPayloadFromExecutionBlock(tt.args.version, wrapped, tt.args.block) if err != nil { assert.ErrorContains(t, tt.err, err) } else { diff --git a/beacon-chain/execution/errors.go b/beacon-chain/execution/errors.go index c27329c101..d6197c986a 100644 --- a/beacon-chain/execution/errors.go +++ b/beacon-chain/execution/errors.go @@ -36,4 +36,6 @@ var ( ErrNilResponse = errors.New("nil response") // ErrRequestTooLarge when the request is too large ErrRequestTooLarge = errors.New("request too large") + // ErrUnsupportedVersion represents a case where a payload is requested for a block type that doesn't have a known mapping. + ErrUnsupportedVersion = errors.New("unknown ExecutionPayload schema for block version") ) diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index bbb7929e09..ba321bad4f 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -29,10 +29,13 @@ go_library( "//beacon-chain/sync:go_default_library", "//cmd/beacon-chain/flags:go_default_library", "//config/params:go_default_library", + "//consensus-types:go_default_library", + "//consensus-types/blocks:go_default_library", "//consensus-types/interfaces:go_default_library", "//consensus-types/primitives:go_default_library", "//container/leaky-bucket:go_default_library", "//crypto/rand:go_default_library", + "//encoding/bytesutil:go_default_library", "//math:go_default_library", "//proto/prysm/v1alpha1:go_default_library", "//runtime:go_default_library", @@ -50,6 +53,7 @@ go_test( name = "go_raceon_test", srcs = [ "blocks_fetcher_test.go", + "blocks_fetcher_utils_test.go", "blocks_queue_test.go", "fsm_test.go", "initial_sync_test.go", diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 6b1297c523..0a9bdb45c8 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -3,6 +3,8 @@ package initialsync import ( "context" "fmt" + "sort" + "strings" "sync" "time" @@ -11,15 +13,20 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/db" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" p2pTypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" prysmsync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags" "github.com/prysmaticlabs/prysm/v4/config/params" + consensus_types "github.com/prysmaticlabs/prysm/v4/consensus-types" + blocks2 "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" leakybucket "github.com/prysmaticlabs/prysm/v4/container/leaky-bucket" "github.com/prysmaticlabs/prysm/v4/crypto/rand" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v4/math" p2ppb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v4/time/slots" "github.com/sirupsen/logrus" "go.opencensus.io/trace" ) @@ -60,6 +67,8 @@ var blockLimiterPeriod = 30 * time.Second // blocksFetcherConfig is a config to setup the block fetcher. type blocksFetcherConfig struct { + clock *startup.Clock + ctxMap prysmsync.ContextByteVersions chain blockchainService p2p p2p.P2P db db.ReadOnlyDatabase @@ -76,6 +85,8 @@ type blocksFetcher struct { cancel context.CancelFunc rand *rand.Rand chain blockchainService + clock *startup.Clock + ctxMap prysmsync.ContextByteVersions p2p p2p.P2P db db.ReadOnlyDatabase blocksPerPeriod uint64 @@ -104,11 +115,11 @@ type fetchRequestParams struct { // fetchRequestResponse is a combined type to hold results of both successful executions and errors. // Valid usage pattern will be to check whether result's `err` is nil, before using `blocks`. type fetchRequestResponse struct { - pid peer.ID - start primitives.Slot - count uint64 - blocks []interfaces.ReadOnlySignedBeaconBlock - err error + pid peer.ID + start primitives.Slot + count uint64 + bwb []blocks2.BlockWithVerifiedBlobs + err error } // newBlocksFetcher creates ready to use fetcher. @@ -131,6 +142,8 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc cancel: cancel, rand: rand.NewGenerator(), chain: cfg.chain, + clock: cfg.clock, + ctxMap: cfg.ctxMap, p2p: cfg.p2p, db: cfg.db, blocksPerPeriod: uint64(blocksPerPeriod), @@ -246,10 +259,10 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start primitives.Slot defer span.End() response := &fetchRequestResponse{ - start: start, - count: count, - blocks: []interfaces.ReadOnlySignedBeaconBlock{}, - err: nil, + start: start, + count: count, + bwb: []blocks2.BlockWithVerifiedBlobs{}, + err: nil, } if ctx.Err() != nil { @@ -273,7 +286,14 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start primitives.Slot } } - response.blocks, response.pid, response.err = f.fetchBlocksFromPeer(ctx, start, count, peers) + response.bwb, response.pid, response.err = f.fetchBlocksFromPeer(ctx, start, count, peers) + if response.err == nil { + bwb, err := f.fetchBlobsFromPeer(ctx, response.bwb, response.pid) + if err != nil { + response.err = err + } + response.bwb = bwb + } return response } @@ -282,7 +302,7 @@ func (f *blocksFetcher) fetchBlocksFromPeer( ctx context.Context, start primitives.Slot, count uint64, peers []peer.ID, -) ([]interfaces.ReadOnlySignedBeaconBlock, peer.ID, error) { +) ([]blocks2.BlockWithVerifiedBlobs, peer.ID, error) { ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlocksFromPeer") defer span.End() @@ -293,17 +313,183 @@ func (f *blocksFetcher) fetchBlocksFromPeer( Step: 1, } for i := 0; i < len(peers); i++ { - blocks, err := f.requestBlocks(ctx, req, peers[i]) - if err == nil { - f.p2p.Peers().Scorers().BlockProviderScorer().Touch(peers[i]) - return blocks, peers[i], nil - } else { - log.WithError(err).Debug("Could not request blocks by range") + p := peers[i] + blocks, err := f.requestBlocks(ctx, req, p) + if err != nil { + log.WithField("peer", p).WithError(err).Debug("Could not request blocks by range from peer") + continue } + f.p2p.Peers().Scorers().BlockProviderScorer().Touch(p) + robs, err := sortedBlockWithVerifiedBlobSlice(blocks) + if err != nil { + log.WithField("peer", p).WithError(err).Debug("invalid BeaconBlocksByRange response") + continue + } + return robs, p, err } return nil, "", errNoPeersAvailable } +func sortedBlockWithVerifiedBlobSlice(blocks []interfaces.ReadOnlySignedBeaconBlock) ([]blocks2.BlockWithVerifiedBlobs, error) { + rb := make([]blocks2.BlockWithVerifiedBlobs, len(blocks)) + for i, b := range blocks { + ro, err := blocks2.NewROBlock(b) + if err != nil { + return nil, err + } + rb[i] = blocks2.BlockWithVerifiedBlobs{Block: ro} + } + sort.Sort(blocks2.BlockWithVerifiedBlobsSlice(rb)) + return rb, nil +} + +func blobRequest(bwb []blocks2.BlockWithVerifiedBlobs, blobWindowStart primitives.Slot) *p2ppb.BlobSidecarsByRangeRequest { + if len(bwb) == 0 { + return nil + } + // Short-circuit if the highest block is before the deneb start epoch or retention period start. + // This assumes blocks are sorted by sortedBlockWithVerifiedBlobSlice. + highest := bwb[len(bwb)-1].Block.Block().Slot() + // bwb is sorted by slot, so if the last element is outside the retention window, no blobs are needed. + if highest < blobWindowStart { + return nil + } + lowest := lowestSlotNeedsBlob(blobWindowStart, bwb) + if lowest == nil { + return nil + } + return &p2ppb.BlobSidecarsByRangeRequest{ + StartSlot: *lowest, + Count: uint64(highest.SubSlot(*lowest)) + 1, + } +} + +func lowestSlotNeedsBlob(retentionStart primitives.Slot, bwb []blocks2.BlockWithVerifiedBlobs) *primitives.Slot { + i := sort.Search(len(bwb), func(i int) bool { + if bwb[i].Block.Block().Slot() < retentionStart { + return false + } + commits, err := bwb[i].Block.Block().Body().BlobKzgCommitments() + if err != nil || len(commits) == 0 { + return false + } + return true + }) + if i >= len(bwb) { + return nil + } + s := bwb[i].Block.Block().Slot() + return &s +} + +func sortBlobs(blobs []*p2ppb.BlobSidecar) []*p2ppb.BlobSidecar { + sort.Slice(blobs, func(i, j int) bool { + if blobs[i].Slot == blobs[j].Slot { + return blobs[i].Index < blobs[j].Index + } + return blobs[i].Slot < blobs[j].Slot + }) + + return blobs +} + +var errBlobVerification = errors.New("peer unable to serve aligned BlobSidecarsByRange and BeaconBlockSidecarsByRange responses") +var errMissingBlobsForBlockCommitments = errors.Wrap(errBlobVerification, "blobs unavailable for processing block with kzg commitments") +var errMismatchedBlobBlockRoot = errors.Wrap(errBlobVerification, "BlockRoot in BlobSidecar does not match the expected root") +var errMissingBlobIndex = errors.Wrap(errBlobVerification, "missing expected blob index") +var errMismatchedBlobCommitments = errors.Wrap(errBlobVerification, "commitments at given slot, root and index do not match") + +func verifyAndPopulateBlobs(bwb []blocks2.BlockWithVerifiedBlobs, blobs []*p2ppb.BlobSidecar, blobWindowStart primitives.Slot) ([]blocks2.BlockWithVerifiedBlobs, error) { + // Assumes bwb has already been sorted by sortedBlockWithVerifiedBlobSlice. + blobs = sortBlobs(blobs) + blobi := 0 + // Loop over all blocks, and each time a commitment is observed, advance the index into the blob slice. + // The assumption is that the blob slice contains a value for every commitment in the blocks it is based on, + // correctly ordered by slot and blob index. + for i, bb := range bwb { + block := bb.Block.Block() + if block.Slot() < blobWindowStart { + continue + } + commits, err := block.Body().BlobKzgCommitments() + if err != nil { + if errors.Is(err, consensus_types.ErrUnsupportedField) { + log. + WithField("block_slot", block.Slot()). + WithField("retention_start", blobWindowStart). + Warn("block with slot within blob retention period has version which does not support commitments") + continue + } + return nil, err + } + bb.Blobs = make([]*p2ppb.BlobSidecar, len(commits)) + for ci := range commits { + // There are more expected commitments in this block, but we've run out of blobs from the response + // (out-of-bound error guard). + if blobi == len(blobs) { + return nil, missingCommitError(bb.Block.Root(), commits[ci:]) + } + bl := blobs[blobi] + if bl.Slot != block.Slot() { + return nil, missingCommitError(bb.Block.Root(), commits[ci:]) + } + if bytesutil.ToBytes32(bl.BlockRoot) != bb.Block.Root() { + return nil, errors.Wrapf(errMismatchedBlobBlockRoot, + "block root %#x != BlobSidecar.BlockRoot %#x at slot %d", bb.Block.Root(), bl.BlockRoot, block.Slot()) + } + if ci != int(bl.Index) { + return nil, errors.Wrapf(errMissingBlobIndex, + "did not receive blob index %d for block root %#x at slot %d", ci, bb.Block.Root(), block.Slot()) + } + ec := bytesutil.ToBytes48(commits[ci]) + ac := bytesutil.ToBytes48(bl.KzgCommitment) + if ec != ac { + return nil, errors.Wrapf(errMismatchedBlobCommitments, + "commitment %#x != block commitment %#x, at index %d for block root %#x at slot %d ", + ac, ec, bl.Index, bb.Block.Root(), block.Slot()) + } + bb.Blobs[ci] = bl + blobi += 1 + } + bwb[i] = bb + } + return bwb, nil +} + +func missingCommitError(root [32]byte, missing [][]byte) error { + missStr := make([]string, len(missing)) + for k := range missing { + missStr = append(missStr, fmt.Sprintf("%#x", k)) + } + return errors.Wrapf(errMissingBlobsForBlockCommitments, + "block root %#x missing %d commitments %s", root, len(missing), strings.Join(missStr, ",")) +} + +// fetchBlobsFromPeer fetches blocks from a single randomly selected peer. +func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.BlockWithVerifiedBlobs, pid peer.ID) ([]blocks2.BlockWithVerifiedBlobs, error) { + ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlobsFromPeer") + defer span.End() + if slots.ToEpoch(f.clock.CurrentSlot()) < params.BeaconConfig().DenebForkEpoch { + return bwb, nil + } + blobWindowStart, err := prysmsync.BlobsByRangeMinStartSlot(f.clock.CurrentSlot()) + if err != nil { + return nil, err + } + // Construct request message based on observed interval of blocks in need of blobs. + req := blobRequest(bwb, blobWindowStart) + if req == nil { + return bwb, nil + } + // Request blobs from the same peer that gave us the blob batch. + blobs, err := f.requestBlobs(ctx, req, pid) + if err != nil { + return nil, errors.Wrap(err, "could not request blobs by range") + } + f.p2p.Peers().Scorers().BlockProviderScorer().Touch(pid) + return verifyAndPopulateBlobs(bwb, blobs, blobWindowStart) +} + // requestBlocks is a wrapper for handling BeaconBlocksByRangeRequest requests/streams. func (f *blocksFetcher) requestBlocks( ctx context.Context, @@ -334,6 +520,33 @@ func (f *blocksFetcher) requestBlocks( return prysmsync.SendBeaconBlocksByRangeRequest(ctx, f.chain, f.p2p, pid, req, nil) } +func (f *blocksFetcher) requestBlobs(ctx context.Context, req *p2ppb.BlobSidecarsByRangeRequest, pid peer.ID) ([]*p2ppb.BlobSidecar, error) { + if ctx.Err() != nil { + return nil, ctx.Err() + } + l := f.peerLock(pid) + l.Lock() + log.WithFields(logrus.Fields{ + "peer": pid, + "start": req.StartSlot, + "count": req.Count, + "capacity": f.rateLimiter.Remaining(pid.String()), + "score": f.p2p.Peers().Scorers().BlockProviderScorer().FormatScorePretty(pid), + }).Debug("Requesting blobs") + // We're intentionally abusing the block rate limit here, treating blob requests as if they were blob requests. + // Since blob requests take more bandwidth than blocks, we should improve how we account for the different kinds + // of requests, more in proportion to the cost of serving them. + if f.rateLimiter.Remaining(pid.String()) < int64(req.Count) { + if err := f.waitForBandwidth(pid, req.Count); err != nil { + l.Unlock() + return nil, err + } + } + f.rateLimiter.Add(pid.String(), int64(req.Count)) + l.Unlock() + return prysmsync.SendBlobsByRangeRequest(ctx, f.clock, f.p2p, pid, f.ctxMap, req) +} + // requestBlocksByRoot is a wrapper for handling BeaconBlockByRootsReq requests/streams. func (f *blocksFetcher) requestBlocksByRoot( ctx context.Context, diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go index 04fa4d2c05..d836642a8b 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go @@ -3,6 +3,7 @@ package initialsync import ( "context" "fmt" + "math/rand" "sort" "sync" "testing" @@ -23,6 +24,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" leakybucket "github.com/prysmaticlabs/prysm/v4/container/leaky-bucket" "github.com/prysmaticlabs/prysm/v4/container/slice" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v4/testing/assert" "github.com/prysmaticlabs/prysm/v4/testing/require" @@ -273,6 +275,9 @@ func TestBlocksFetcher_RoundRobin(t *testing.T) { st, err := util.NewBeaconState() require.NoError(t, err) + gt := time.Now() + vr := [32]byte{} + clock := startup.NewClock(gt, vr) mc := &mock.ChainService{ State: st, Root: genesisRoot[:], @@ -288,6 +293,7 @@ func TestBlocksFetcher_RoundRobin(t *testing.T) { fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ chain: mc, p2p: p, + clock: clock, }) require.NoError(t, fetcher.start()) @@ -299,9 +305,9 @@ func TestBlocksFetcher_RoundRobin(t *testing.T) { fetcher.stop() }() - processFetchedBlocks := func() ([]interfaces.ReadOnlySignedBeaconBlock, error) { + processFetchedBlocks := func() ([]blocks.BlockWithVerifiedBlobs, error) { defer cancel() - var unionRespBlocks []interfaces.ReadOnlySignedBeaconBlock + var unionRespBlocks []blocks.BlockWithVerifiedBlobs for { select { @@ -313,8 +319,8 @@ func TestBlocksFetcher_RoundRobin(t *testing.T) { if resp.err != nil { log.WithError(resp.err).Debug("Block fetcher returned error") } else { - unionRespBlocks = append(unionRespBlocks, resp.blocks...) - if len(resp.blocks) == 0 { + unionRespBlocks = append(unionRespBlocks, resp.bwb...) + if len(resp.bwb) == 0 { log.WithFields(logrus.Fields{ "start": resp.start, "count": resp.count, @@ -337,30 +343,27 @@ func TestBlocksFetcher_RoundRobin(t *testing.T) { maxExpectedBlocks += requestParams.count } - blocks, err := processFetchedBlocks() + bwb, err := processFetchedBlocks() assert.NoError(t, err) - sort.Slice(blocks, func(i, j int) bool { - return blocks[i].Block().Slot() < blocks[j].Block().Slot() - }) - - ss := make([]primitives.Slot, len(blocks)) - for i, block := range blocks { - ss[i] = block.Block().Slot() + sort.Sort(blocks.BlockWithVerifiedBlobsSlice(bwb)) + ss := make([]primitives.Slot, len(bwb)) + for i, b := range bwb { + ss[i] = b.Block.Block().Slot() } log.WithFields(logrus.Fields{ - "blocksLen": len(blocks), + "blocksLen": len(bwb), "slots": ss, }).Debug("Finished block fetching") - if len(blocks) > int(maxExpectedBlocks) { - t.Errorf("Too many blocks returned. Wanted %d got %d", maxExpectedBlocks, len(blocks)) + if len(bwb) > int(maxExpectedBlocks) { + t.Errorf("Too many blocks returned. Wanted %d got %d", maxExpectedBlocks, len(bwb)) } - assert.Equal(t, len(tt.expectedBlockSlots), len(blocks), "Processes wrong number of blocks") + assert.Equal(t, len(tt.expectedBlockSlots), len(bwb), "Processes wrong number of blocks") var receivedBlockSlots []primitives.Slot - for _, blk := range blocks { - receivedBlockSlots = append(receivedBlockSlots, blk.Block().Slot()) + for _, b := range bwb { + receivedBlockSlots = append(receivedBlockSlots, b.Block.Block().Slot()) } missing := slice.NotSlot(slice.IntersectionSlot(tt.expectedBlockSlots, receivedBlockSlots), tt.expectedBlockSlots) if len(missing) > 0 { @@ -417,11 +420,13 @@ func TestBlocksFetcher_handleRequest(t *testing.T) { mc, p2p, _ := initializeTestServices(t, chainConfig.expectedBlockSlots, chainConfig.peers) mc.ValidatorsRoot = [32]byte{} mc.Genesis = time.Now() + t.Run("context cancellation", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ chain: mc, p2p: p2p, + clock: startup.NewClock(mc.Genesis, mc.ValidatorsRoot), }) cancel() @@ -435,6 +440,7 @@ func TestBlocksFetcher_handleRequest(t *testing.T) { fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ chain: mc, p2p: p2p, + clock: startup.NewClock(mc.Genesis, mc.ValidatorsRoot), }) requestCtx, reqCancel := context.WithTimeout(context.Background(), 2*time.Second) @@ -447,7 +453,7 @@ func TestBlocksFetcher_handleRequest(t *testing.T) { } }() - var blocks []interfaces.ReadOnlySignedBeaconBlock + var bwb []blocks.BlockWithVerifiedBlobs select { case <-ctx.Done(): t.Error(ctx.Err()) @@ -455,16 +461,16 @@ func TestBlocksFetcher_handleRequest(t *testing.T) { if resp.err != nil { t.Error(resp.err) } else { - blocks = resp.blocks + bwb = resp.bwb } } - if uint64(len(blocks)) != uint64(blockBatchLimit) { - t.Errorf("incorrect number of blocks returned, expected: %v, got: %v", blockBatchLimit, len(blocks)) + if uint64(len(bwb)) != uint64(blockBatchLimit) { + t.Errorf("incorrect number of blocks returned, expected: %v, got: %v", blockBatchLimit, len(bwb)) } var receivedBlockSlots []primitives.Slot - for _, blk := range blocks { - receivedBlockSlots = append(receivedBlockSlots, blk.Block().Slot()) + for _, b := range bwb { + receivedBlockSlots = append(receivedBlockSlots, b.Block.Block().Slot()) } missing := slice.NotSlot(slice.IntersectionSlot(chainConfig.expectedBlockSlots, receivedBlockSlots), chainConfig.expectedBlockSlots) if len(missing) > 0 { @@ -952,3 +958,171 @@ func TestTimeToWait(t *testing.T) { }) } } + +func TestSortBlobs(t *testing.T) { + _, blobs := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, 10) + shuffled := make([]*ethpb.BlobSidecar, len(blobs)) + for i := range blobs { + shuffled[i] = blobs[i] + } + rand.Shuffle(len(shuffled), func(i, j int) { + shuffled[i], shuffled[j] = shuffled[j], shuffled[i] + }) + sorted := sortBlobs(shuffled) + require.Equal(t, len(sorted), len(shuffled)) + for i := range blobs { + expect := blobs[i] + actual := sorted[i] + require.Equal(t, expect.Slot, actual.Slot) + require.Equal(t, expect.Index, actual.Index) + require.Equal(t, bytesutil.ToBytes48(expect.KzgCommitment), bytesutil.ToBytes48(actual.KzgCommitment)) + require.Equal(t, bytesutil.ToBytes32(expect.BlockRoot), bytesutil.ToBytes32(actual.BlockRoot)) + } +} + +func TestLowestSlotNeedsBlob(t *testing.T) { + blks, _ := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, 10) + sbbs := make([]interfaces.ReadOnlySignedBeaconBlock, len(blks)) + for i := range blks { + sbbs[i] = blks[i] + } + retentionStart := primitives.Slot(5) + bwb, err := sortedBlockWithVerifiedBlobSlice(sbbs) + require.NoError(t, err) + lowest := lowestSlotNeedsBlob(retentionStart, bwb) + require.Equal(t, retentionStart, *lowest) + higher := primitives.Slot(len(blks) + 1) + lowest = lowestSlotNeedsBlob(higher, bwb) + var nilSlot *primitives.Slot + require.Equal(t, nilSlot, lowest) + + blks, _ = util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, 10) + sbbs = make([]interfaces.ReadOnlySignedBeaconBlock, len(blks)) + for i := range blks { + sbbs[i] = blks[i] + } + bwb, err = sortedBlockWithVerifiedBlobSlice(sbbs) + require.NoError(t, err) + retentionStart = bwb[5].Block.Block().Slot() + next := bwb[6].Block.Block().Slot() + skip := bwb[5].Block.Block() + bwb[5].Block, _ = util.GenerateTestDenebBlockWithSidecar(t, skip.ParentRoot(), skip.Slot(), 0) + lowest = lowestSlotNeedsBlob(retentionStart, bwb) + require.Equal(t, next, *lowest) +} + +func TestBlobRequest(t *testing.T) { + var nilReq *ethpb.BlobSidecarsByRangeRequest + // no blocks + req := blobRequest([]blocks.BlockWithVerifiedBlobs{}, 0) + require.Equal(t, nilReq, req) + blks, _ := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, 10) + sbbs := make([]interfaces.ReadOnlySignedBeaconBlock, len(blks)) + for i := range blks { + sbbs[i] = blks[i] + } + bwb, err := sortedBlockWithVerifiedBlobSlice(sbbs) + require.NoError(t, err) + maxBlkSlot := primitives.Slot(len(blks) - 1) + + tooHigh := primitives.Slot(len(blks) + 1) + req = blobRequest(bwb, tooHigh) + require.Equal(t, nilReq, req) + + req = blobRequest(bwb, maxBlkSlot) + require.Equal(t, uint64(1), req.Count) + require.Equal(t, maxBlkSlot, req.StartSlot) + + halfway := primitives.Slot(5) + req = blobRequest(bwb, halfway) + require.Equal(t, halfway, req.StartSlot) + // adding 1 to include the halfway slot itself + require.Equal(t, uint64(1+maxBlkSlot-halfway), req.Count) + + before := bwb[0].Block.Block().Slot() + allAfter := bwb[1:] + req = blobRequest(allAfter, before) + require.Equal(t, allAfter[0].Block.Block().Slot(), req.StartSlot) + require.Equal(t, len(allAfter), int(req.Count)) +} + +func testSequenceBlockWithBlob(t *testing.T, nblocks int) ([]blocks.BlockWithVerifiedBlobs, []*ethpb.BlobSidecar) { + blks, blobs := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, nblocks) + sbbs := make([]interfaces.ReadOnlySignedBeaconBlock, len(blks)) + for i := range blks { + sbbs[i] = blks[i] + } + bwb, err := sortedBlockWithVerifiedBlobSlice(sbbs) + require.NoError(t, err) + return bwb, blobs +} + +func TestVerifyAndPopulateBlobs(t *testing.T) { + bwb, blobs := testSequenceBlockWithBlob(t, 10) + lastBlobIdx := len(blobs) - 1 + // Blocks are all before the retention window, blobs argument is ignored. + windowAfter := bwb[len(bwb)-1].Block.Block().Slot() + 1 + _, err := verifyAndPopulateBlobs(bwb, nil, windowAfter) + require.NoError(t, err) + + firstBlockSlot := bwb[0].Block.Block().Slot() + // slice off blobs for the last block so we hit the out of bounds / blob exhaustion check. + _, err = verifyAndPopulateBlobs(bwb, blobs[0:len(blobs)-6], firstBlockSlot) + require.ErrorIs(t, err, errMissingBlobsForBlockCommitments) + + bwb, blobs = testSequenceBlockWithBlob(t, 10) + // Misalign the slots of the blobs for the first block to simulate them being missing from the response. + offByOne := blobs[0].Slot + for i := range blobs { + if blobs[i].Slot == offByOne { + blobs[i].Slot = offByOne + 1 + } + } + _, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot) + require.ErrorIs(t, err, errMissingBlobsForBlockCommitments) + + bwb, blobs = testSequenceBlockWithBlob(t, 10) + blobs[lastBlobIdx].BlockRoot = blobs[0].BlockRoot + _, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot) + require.ErrorIs(t, err, errMismatchedBlobBlockRoot) + + bwb, blobs = testSequenceBlockWithBlob(t, 10) + blobs[lastBlobIdx].Index = 100 + _, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot) + require.ErrorIs(t, err, errMissingBlobIndex) + + var emptyKzg [48]byte + bwb, blobs = testSequenceBlockWithBlob(t, 10) + blobs[lastBlobIdx].KzgCommitment = emptyKzg[:] + _, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot) + require.ErrorIs(t, err, errMismatchedBlobCommitments) + + // happy path + bwb, blobs = testSequenceBlockWithBlob(t, 10) + + expectedCommits := make(map[[48]byte]bool) + for _, bl := range blobs { + expectedCommits[bytesutil.ToBytes48(bl.KzgCommitment)] = true + } + // The assertions using this map expect all commitments to be unique, so make sure that stays true. + require.Equal(t, len(blobs), len(expectedCommits)) + + bwb, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot) + require.NoError(t, err) + for _, bw := range bwb { + commits, err := bw.Block.Block().Body().BlobKzgCommitments() + require.NoError(t, err) + require.Equal(t, len(commits), len(bw.Blobs)) + for i := range commits { + bc := bytesutil.ToBytes48(commits[i]) + require.Equal(t, bc, bytesutil.ToBytes48(bw.Blobs[i].KzgCommitment)) + // Since we delete entries we've seen, duplicates will cause an error here. + _, ok := expectedCommits[bc] + // Make sure this was an expected delete, then delete it from the map so we can make sure we saw all of them. + require.Equal(t, true, ok) + delete(expectedCommits, bc) + } + } + // We delete each entry we've seen, so if we see all expected commits, the map should be empty at the end. + require.Equal(t, 0, len(expectedCommits)) +} diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go index 93d3e360df..b80c546350 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go @@ -3,13 +3,13 @@ package initialsync import ( "context" "fmt" - "sort" "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" p2pTypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types" "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags" "github.com/prysmaticlabs/prysm/v4/config/params" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" p2ppb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" @@ -22,8 +22,8 @@ import ( // Blocks are stored in an ascending slot order. The first block is guaranteed to have parent // either in DB or initial sync cache. type forkData struct { - peer peer.ID - blocks []interfaces.ReadOnlySignedBeaconBlock + peer peer.ID + bwb []blocks.BlockWithVerifiedBlobs } // nonSkippedSlotAfter checks slots after the given one in an attempt to find a non-empty future slot. @@ -200,13 +200,20 @@ func (f *blocksFetcher) findFork(ctx context.Context, slot primitives.Slot) (*fo return nil, errNoPeersWithAltBlocks } +var errNoAlternateBlocks = errors.New("no alternative blocks exist within scanned range") + +func findForkReqRangeSize() uint64 { + return uint64(params.BeaconConfig().SlotsPerEpoch.Mul(2)) +} + // findForkWithPeer loads some blocks from a peer in an attempt to find alternative blocks. func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot primitives.Slot) (*forkData, error) { + reqCount := findForkReqRangeSize() // Safe-guard, since previous epoch is used when calculating. - slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch - if slot < slotsPerEpoch*2 { - return nil, fmt.Errorf("slot is too low to backtrack, min. expected %d", slotsPerEpoch*2) + if uint64(slot) < reqCount { + return nil, fmt.Errorf("slot is too low to backtrack, min. expected %d", reqCount) } + slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch // Locate non-skipped slot, supported by a given peer (can survive long periods of empty slots). // When searching for non-empty slot, start an epoch earlier - for those blocks we @@ -226,37 +233,62 @@ func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot // Request blocks starting from the first non-empty slot. req := &p2ppb.BeaconBlocksByRangeRequest{ StartSlot: nonSkippedSlot, - Count: uint64(slotsPerEpoch.Mul(2)), + Count: reqCount, Step: 1, } blocks, err := f.requestBlocks(ctx, req, pid) if err != nil { return nil, fmt.Errorf("cannot fetch blocks: %w", err) } + if len(blocks) == 0 { + return nil, errNoAlternateBlocks + } + + // If the first block is not connected to the current canonical chain, we'll stop processing this batch. + // Instead, we'll work backwards from the first block until we find a common ancestor, + // and then begin processing from there. + first := blocks[0] + if !f.chain.HasBlock(ctx, first.Block().ParentRoot()) { + // Backtrack on a root, to find a common ancestor from which we can resume syncing. + fork, err := f.findAncestor(ctx, pid, first) + if err != nil { + return nil, fmt.Errorf("failed to find common ancestor: %w", err) + } + return fork, nil + } // Traverse blocks, and if we've got one that doesn't have parent in DB, backtrack on it. - for i, block := range blocks { + // Note that we start from the second element in the array, because we know that the first element is in the db, + // otherwise we would have gone into the findAncestor early return path above. + for i := 1; i < len(blocks); i++ { + block := blocks[i] parentRoot := block.Block().ParentRoot() - if !f.chain.HasBlock(ctx, parentRoot) { - log.WithFields(logrus.Fields{ - "peer": pid, - "slot": block.Block().Slot(), - "root": fmt.Sprintf("%#x", parentRoot), - }).Debug("Block with unknown parent root has been found") - // Backtrack only if the first block is diverging, - // otherwise we already know the common ancestor slot. - if i == 0 { - // Backtrack on a root, to find a common ancestor from which we can resume syncing. - fork, err := f.findAncestor(ctx, pid, block) - if err != nil { - return nil, fmt.Errorf("failed to find common ancestor: %w", err) - } - return fork, nil - } - return &forkData{peer: pid, blocks: blocks}, nil + // Step through blocks until we find one that is not in the chain. The goal is to find the point where the + // chain observed in the peer diverges from the locally known chain, and then collect up the remainder of the + // observed chain chunk to start initial-sync processing from the fork point. + if f.chain.HasBlock(ctx, parentRoot) { + continue } + log.WithFields(logrus.Fields{ + "peer": pid, + "slot": block.Block().Slot(), + "root": fmt.Sprintf("%#x", parentRoot), + }).Debug("Block with unknown parent root has been found") + altBlocks, err := sortedBlockWithVerifiedBlobSlice(blocks[i-1:]) + if err != nil { + return nil, errors.Wrap(err, "invalid blocks received in findForkWithPeer") + } + // We need to fetch the blobs for the given alt-chain if any exist, so that we can try to verify and import + // the blocks. + bwb, err := f.fetchBlobsFromPeer(ctx, altBlocks, pid) + if err != nil { + return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findForkWithPeer") + } + // The caller will use the BlocksWith VerifiedBlobs in bwb as the starting point for + // round-robin syncing the alternate chain. + return &forkData{peer: pid, bwb: bwb}, nil } - return nil, errors.New("no alternative blocks exist within scanned range") + return nil, errNoAlternateBlocks } // findAncestor tries to figure out common ancestor slot that connects a given root to known block. @@ -266,12 +298,17 @@ func (f *blocksFetcher) findAncestor(ctx context.Context, pid peer.ID, b interfa parentRoot := outBlocks[len(outBlocks)-1].Block().ParentRoot() if f.chain.HasBlock(ctx, parentRoot) { // Common ancestor found, forward blocks back to processor. - sort.Slice(outBlocks, func(i, j int) bool { - return outBlocks[i].Block().Slot() < outBlocks[j].Block().Slot() - }) + bwb, err := sortedBlockWithVerifiedBlobSlice(outBlocks) + if err != nil { + return nil, errors.Wrap(err, "received invalid blocks in findAncestor") + } + bwb, err = f.fetchBlobsFromPeer(ctx, bwb, pid) + if err != nil { + return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findAncestor") + } return &forkData{ - peer: pid, - blocks: outBlocks, + peer: pid, + bwb: bwb, }, nil } // Request block's parent. diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go index b58fb07339..9f225fb4e7 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_utils_test.go @@ -13,6 +13,8 @@ import ( dbtest "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing" p2pm "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" p2pt "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing" + p2pTypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags" "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" @@ -187,6 +189,7 @@ func TestBlocksFetcher_findFork(t *testing.T) { ctx, &blocksFetcherConfig{ chain: mc, + clock: startup.NewClock(mc.Genesis, mc.ValidatorsRoot), p2p: p2p, db: beaconDB, }, @@ -252,10 +255,14 @@ func TestBlocksFetcher_findFork(t *testing.T) { // is smart enough to link back to common ancestor, w/o discriminating between forks. This is // by design: fork exploration is undertaken when FSMs are stuck, so any progress is good. chain1b := extendBlockSequence(t, chain1, 64) + forkSlot1b := primitives.Slot(len(chain1)) curForkMoreBlocksPeer := connectPeerHavingBlocks(t, p2p, chain1b, finalizedSlot, p2p.Peers()) fork, err = fetcher.findFork(ctx, 251) require.NoError(t, err) - require.Equal(t, 64, len(fork.blocks)) + + reqEnd := testForkStartSlot(t, 251) + primitives.Slot(findForkReqRangeSize()) + require.Equal(t, primitives.Slot(len(chain1)), fork.bwb[0].Block.Block().Slot()) + require.Equal(t, int(reqEnd-forkSlot1b), len(fork.bwb)) require.Equal(t, curForkMoreBlocksPeer, fork.peer) // Save all chain1b blocks (so that they do not interfere with alternative fork) for _, blk := range chain1b { @@ -277,20 +284,20 @@ func TestBlocksFetcher_findFork(t *testing.T) { fork, err = fetcher.findFork(ctx, 251) require.NoError(t, err) assert.Equal(t, alternativePeer, fork.peer) - assert.Equal(t, 65, len(fork.blocks)) + assert.Equal(t, 65, len(fork.bwb)) ind := forkSlot - for _, blk := range fork.blocks { - require.Equal(t, blk.Block().Slot(), chain2[ind].Block.Slot) + for _, blk := range fork.bwb { + require.Equal(t, blk.Block.Block().Slot(), chain2[ind].Block.Slot) ind++ } // Process returned blocks and then attempt to extend chain (ensuring that parent block exists). - for _, blk := range fork.blocks { - require.NoError(t, beaconDB.SaveBlock(ctx, blk)) - require.NoError(t, st.SetSlot(blk.Block().Slot())) + for _, blk := range fork.bwb { + require.NoError(t, beaconDB.SaveBlock(ctx, blk.Block)) + require.NoError(t, st.SetSlot(blk.Block.Block().Slot())) } - assert.Equal(t, forkSlot.Add(uint64(len(fork.blocks)-1)), mc.HeadSlot()) - for i := forkSlot.Add(uint64(len(fork.blocks))); i < primitives.Slot(len(chain2)); i++ { + assert.Equal(t, forkSlot.Add(uint64(len(fork.bwb)-1)), mc.HeadSlot()) + for i := forkSlot.Add(uint64(len(fork.bwb))); i < primitives.Slot(len(chain2)); i++ { blk := chain2[i] require.Equal(t, blk.Block.Slot, i, "incorrect block selected for slot %d", i) // Only save is parent block exists. @@ -309,6 +316,21 @@ func TestBlocksFetcher_findFork(t *testing.T) { } } +func testForkStartSlot(t *testing.T, slot primitives.Slot) primitives.Slot { + // When we call find fork, the way we find the first common ancestor is: + // - start at the first slot of the given epoch (epochStart below) + // - look back an additional epoch, which findForkWithPeer comments say is done to ensure + // there will be more overlap, since this routine kicks off when we're losing sight of the canonical chain + // - add one: this happens in nonSkippedSlotAfterWithPeersTarget and isn't really explained why this method takes + // the slot before the beginning of the search range and not the actual search range. + // Anyway, since findFork returns the last common ancestor between the known chain and the forked chain, + // we need to figure out how much the two overlap so we can ignore the common blocks when looking at the + // size of the response. + epochStart, err := slots.EpochStart(slots.ToEpoch(slot)) + require.NoError(t, err) + return 1 + (epochStart - params.BeaconConfig().SlotsPerEpoch) +} + func TestBlocksFetcher_findForkWithPeer(t *testing.T) { beaconDB := dbtest.SetupDB(t) p1 := p2pt.NewTestP2P(t) @@ -335,6 +357,7 @@ func TestBlocksFetcher_findForkWithPeer(t *testing.T) { ctx, &blocksFetcherConfig{ chain: mc, + clock: startup.NewClock(mc.Genesis, mc.ValidatorsRoot), p2p: p1, db: beaconDB, }, @@ -392,8 +415,8 @@ func TestBlocksFetcher_findForkWithPeer(t *testing.T) { }() fork, err := fetcher.findForkWithPeer(ctx, p2, 64) require.NoError(t, err) - require.Equal(t, 10, len(fork.blocks)) - assert.Equal(t, forkedSlot, fork.blocks[0].Block().Slot(), "Expected slot %d to be ancestor", forkedSlot) + require.Equal(t, 10, len(fork.bwb)) + assert.Equal(t, forkedSlot, fork.bwb[0].Block.Block().Slot(), "Expected slot %d to be ancestor", forkedSlot) }) t.Run("first block is diverging - no common ancestor", func(t *testing.T) { @@ -417,11 +440,32 @@ func TestBlocksFetcher_findForkWithPeer(t *testing.T) { }() fork, err := fetcher.findForkWithPeer(ctx, p2, 64) require.NoError(t, err) - require.Equal(t, 64, len(fork.blocks)) - assert.Equal(t, primitives.Slot(33), fork.blocks[0].Block().Slot()) + + reqEnd := testForkStartSlot(t, 64) + primitives.Slot(findForkReqRangeSize()) + expectedLen := reqEnd - forkedSlot + // there are 4 blocks that are different before the beginning + require.Equal(t, int(expectedLen), len(fork.bwb)) + assert.Equal(t, primitives.Slot(60), fork.bwb[0].Block.Block().Slot()) }) } +func TestTestForkStartSlot(t *testing.T) { + require.Equal(t, primitives.Slot(33), testForkStartSlot(t, 64)) + require.Equal(t, primitives.Slot(193), testForkStartSlot(t, 251)) +} + +func consumeBlockRootRequest(t *testing.T, p *p2pt.TestP2P) func(network.Stream) { + return func(stream network.Stream) { + defer func() { + _err := stream.Close() + _ = _err + }() + + req := new(p2pTypes.BeaconBlockByRootsReq) + assert.NoError(t, p.Encoding().DecodeWithMaxLength(stream, req)) + } +} + func TestBlocksFetcher_findAncestor(t *testing.T) { beaconDB := dbtest.SetupDB(t) p2p := p2pt.NewTestP2P(t) @@ -476,9 +520,7 @@ func TestBlocksFetcher_findAncestor(t *testing.T) { p2 := p2pt.NewTestP2P(t) p2p.Connect(p2) - p2.SetStreamHandler(pcl, func(stream network.Stream) { - assert.NoError(t, stream.Close()) - }) + p2.SetStreamHandler(pcl, consumeBlockRootRequest(t, p2)) wsb, err := blocks.NewSignedBeaconBlock(knownBlocks[4]) require.NoError(t, err) diff --git a/beacon-chain/sync/initial-sync/blocks_queue.go b/beacon-chain/sync/initial-sync/blocks_queue.go index 68e5d3286e..628be0eb17 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue.go +++ b/beacon-chain/sync/initial-sync/blocks_queue.go @@ -8,8 +8,9 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/prysmaticlabs/prysm/v4/beacon-chain/db" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" beaconsync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" - "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/time/slots" "github.com/sirupsen/logrus" @@ -62,6 +63,8 @@ type syncMode uint8 type blocksQueueConfig struct { blocksFetcher *blocksFetcher chain blockchainService + clock *startup.Clock + ctxMap beaconsync.ContextByteVersions highestExpectedSlot primitives.Slot p2p p2p.P2P db db.ReadOnlyDatabase @@ -88,8 +91,8 @@ type blocksQueue struct { // blocksQueueFetchedData is a data container that is returned from a queue on each step. type blocksQueueFetchedData struct { - pid peer.ID - blocks []interfaces.ReadOnlySignedBeaconBlock + pid peer.ID + bwb []blocks.BlockWithVerifiedBlobs } // newBlocksQueue creates initialized priority queue. @@ -99,9 +102,11 @@ func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue { blocksFetcher := cfg.blocksFetcher if blocksFetcher == nil { blocksFetcher = newBlocksFetcher(ctx, &blocksFetcherConfig{ - chain: cfg.chain, - p2p: cfg.p2p, - db: cfg.db, + ctxMap: cfg.ctxMap, + chain: cfg.chain, + p2p: cfg.p2p, + db: cfg.db, + clock: cfg.clock, }) } highestExpectedSlot := cfg.highestExpectedSlot @@ -316,15 +321,15 @@ func (q *blocksQueue) onDataReceivedEvent(ctx context.Context) eventHandlerFn { return m.state, errInputNotFetchRequestParams } if response.err != nil { - switch response.err { - case errSlotIsTooHigh: + if errors.Is(response.err, errSlotIsTooHigh) { // Current window is already too big, re-request previous epochs. for _, fsm := range q.smm.machines { if fsm.start < response.start && fsm.state == stateSkipped { fsm.setState(stateNew) } } - case beaconsync.ErrInvalidFetchedData: + } + if errors.Is(response.err, beaconsync.ErrInvalidFetchedData) { // Peer returned invalid data, penalize. q.blocksFetcher.p2p.Peers().Scorers().BadResponsesScorer().Increment(m.pid) log.WithField("pid", response.pid).Debug("Peer is penalized for invalid blocks") @@ -332,7 +337,7 @@ func (q *blocksQueue) onDataReceivedEvent(ctx context.Context) eventHandlerFn { return m.state, response.err } m.pid = response.pid - m.blocks = response.blocks + m.bwb = response.bwb return stateDataParsed, nil } } @@ -347,14 +352,14 @@ func (q *blocksQueue) onReadyToSendEvent(ctx context.Context) eventHandlerFn { return m.state, errInvalidInitialState } - if len(m.blocks) == 0 { + if len(m.bwb) == 0 { return stateSkipped, nil } send := func() (stateID, error) { data := &blocksQueueFetchedData{ - pid: m.pid, - blocks: m.blocks, + pid: m.pid, + bwb: m.bwb, } select { case <-ctx.Done(): diff --git a/beacon-chain/sync/initial-sync/blocks_queue_test.go b/beacon-chain/sync/initial-sync/blocks_queue_test.go index 65e5cb86e2..87610368bf 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue_test.go +++ b/beacon-chain/sync/initial-sync/blocks_queue_test.go @@ -11,6 +11,7 @@ import ( dbtest "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/peers" p2pt "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" beaconsync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags" "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" @@ -37,6 +38,7 @@ func TestBlocksQueue_InitStartStop(t *testing.T) { fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ chain: mc, p2p: p2p, + clock: startup.NewClock(mc.Genesis, mc.ValidatorsRoot), }) t.Run("stop without start", func(t *testing.T) { @@ -126,6 +128,7 @@ func TestBlocksQueue_InitStartStop(t *testing.T) { blocksFetcher: fetcher, chain: mc, highestExpectedSlot: primitives.Slot(blockBatchLimit), + clock: startup.NewClock(mc.Genesis, mc.ValidatorsRoot), }) assert.NoError(t, queue.start()) cancel() @@ -252,6 +255,7 @@ func TestBlocksQueue_Loop(t *testing.T) { fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{ chain: mc, p2p: p2p, + clock: startup.NewClock(mc.Genesis, mc.ValidatorsRoot), }) queue := newBlocksQueue(ctx, &blocksQueueConfig{ blocksFetcher: fetcher, @@ -259,7 +263,8 @@ func TestBlocksQueue_Loop(t *testing.T) { highestExpectedSlot: tt.highestExpectedSlot, }) assert.NoError(t, queue.start()) - processBlock := func(block interfaces.ReadOnlySignedBeaconBlock) error { + processBlock := func(b blocks.BlockWithVerifiedBlobs) error { + block := b.Block if !beaconDB.HasBlock(ctx, block.Block().ParentRoot()) { return fmt.Errorf("%w: %#x", errParentDoesNotExist, block.Block().ParentRoot()) } @@ -270,13 +275,13 @@ func TestBlocksQueue_Loop(t *testing.T) { return mc.ReceiveBlock(ctx, block, root) } - var blocks []interfaces.ReadOnlySignedBeaconBlock + var blocks []blocks.BlockWithVerifiedBlobs for data := range queue.fetchedData { - for _, block := range data.blocks { - if err := processBlock(block); err != nil { + for _, b := range data.bwb { + if err := processBlock(b); err != nil { continue } - blocks = append(blocks, block) + blocks = append(blocks, b) } } @@ -288,8 +293,8 @@ func TestBlocksQueue_Loop(t *testing.T) { } assert.Equal(t, len(tt.expectedBlockSlots), len(blocks), "Processes wrong number of blocks") var receivedBlockSlots []primitives.Slot - for _, blk := range blocks { - receivedBlockSlots = append(receivedBlockSlots, blk.Block().Slot()) + for _, b := range blocks { + receivedBlockSlots = append(receivedBlockSlots, b.Block.Block().Slot()) } missing := slice.NotSlot(slice.IntersectionSlot(tt.expectedBlockSlots, receivedBlockSlots), tt.expectedBlockSlots) if len(missing) > 0 { @@ -533,21 +538,21 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) { require.NoError(t, err) response := &fetchRequestResponse{ pid: "abc", - blocks: []interfaces.ReadOnlySignedBeaconBlock{ - wsb, - wsbCopy, + bwb: []blocks.BlockWithVerifiedBlobs{ + {Block: blocks.ROBlock{ReadOnlySignedBeaconBlock: wsb}}, + {Block: blocks.ROBlock{ReadOnlySignedBeaconBlock: wsbCopy}}, }, } fsm := &stateMachine{ state: stateScheduled, } assert.Equal(t, peer.ID(""), fsm.pid) - assert.DeepSSZEqual(t, []interfaces.ReadOnlySignedBeaconBlock(nil), fsm.blocks) + assert.Equal(t, 0, len(fsm.bwb)) updatedState, err := handlerFn(fsm, response) assert.NoError(t, err) assert.Equal(t, stateDataParsed, updatedState) assert.Equal(t, response.pid, fsm.pid) - assert.DeepSSZEqual(t, response.blocks, fsm.blocks) + assert.DeepSSZEqual(t, response.bwb, fsm.bwb) }) } @@ -631,8 +636,10 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) { queue.smm.addStateMachine(320) queue.smm.machines[256].state = stateDataParsed queue.smm.machines[256].pid = pidDataParsed - queue.smm.machines[256].blocks = []interfaces.ReadOnlySignedBeaconBlock{ - wsb, + rwsb, err := blocks.NewROBlock(wsb) + require.NoError(t, err) + queue.smm.machines[256].bwb = []blocks.BlockWithVerifiedBlobs{ + {Block: rwsb}, } handlerFn := queue.onReadyToSendEvent(ctx) @@ -663,8 +670,10 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) { queue.smm.addStateMachine(320) queue.smm.machines[320].state = stateDataParsed queue.smm.machines[320].pid = pidDataParsed - queue.smm.machines[320].blocks = []interfaces.ReadOnlySignedBeaconBlock{ - wsb, + rwsb, err := blocks.NewROBlock(wsb) + require.NoError(t, err) + queue.smm.machines[320].bwb = []blocks.BlockWithVerifiedBlobs{ + {Block: rwsb}, } handlerFn := queue.onReadyToSendEvent(ctx) @@ -692,8 +701,10 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) { queue.smm.addStateMachine(320) queue.smm.machines[320].state = stateDataParsed queue.smm.machines[320].pid = pidDataParsed - queue.smm.machines[320].blocks = []interfaces.ReadOnlySignedBeaconBlock{ - wsb, + rwsb, err := blocks.NewROBlock(wsb) + require.NoError(t, err) + queue.smm.machines[320].bwb = []blocks.BlockWithVerifiedBlobs{ + {Block: rwsb}, } handlerFn := queue.onReadyToSendEvent(ctx) @@ -1078,6 +1089,7 @@ func TestBlocksQueue_stuckInUnfavourableFork(t *testing.T) { chain: mc, p2p: p2p, db: beaconDB, + clock: startup.NewClock(mc.Genesis, mc.ValidatorsRoot), }, ) fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, 1*time.Second, false) @@ -1212,27 +1224,28 @@ func TestBlocksQueue_stuckInUnfavourableFork(t *testing.T) { // required forked data, including data on and after slot 201. forkedEpochStartSlot, err := slots.EpochStart(slots.ToEpoch(forkedSlot)) require.NoError(t, err) - firstFSM, ok := queue.smm.findStateMachine(forkedEpochStartSlot + 1) + firstFSM, ok := queue.smm.findStateMachine(forkedSlot) require.Equal(t, true, ok) require.Equal(t, stateDataParsed, firstFSM.state) require.Equal(t, forkedPeer, firstFSM.pid) - require.Equal(t, 64, len(firstFSM.blocks)) - require.Equal(t, forkedEpochStartSlot+1, firstFSM.blocks[0].Block().Slot()) + reqEnd := testForkStartSlot(t, 251) + primitives.Slot(findForkReqRangeSize()) + require.Equal(t, int(reqEnd-forkedSlot), len(firstFSM.bwb)) + require.Equal(t, forkedSlot, firstFSM.bwb[0].Block.Block().Slot()) // Assert that forked data from chain2 is available (within 64 fetched blocks). - for i, blk := range chain2[forkedEpochStartSlot+1:] { - if i >= len(firstFSM.blocks) { + for i, blk := range chain2[forkedSlot:] { + if i >= len(firstFSM.bwb) { break } - rootFromFSM, err := firstFSM.blocks[i].Block().HashTreeRoot() - require.NoError(t, err) + rootFromFSM := firstFSM.bwb[i].Block.Root() blkRoot, err := blk.Block.HashTreeRoot() require.NoError(t, err) assert.Equal(t, blkRoot, rootFromFSM) } // Assert that machines are in the expected state. - startSlot = forkedEpochStartSlot.Add(1 + uint64(len(firstFSM.blocks))) + startSlot = forkedEpochStartSlot.Add(1 + blocksPerRequest) + require.Equal(t, int(blocksPerRequest)-int(forkedSlot-(forkedEpochStartSlot+1)), len(firstFSM.bwb)) for i := startSlot; i < startSlot.Add(blocksPerRequest*(lookaheadSteps-1)); i += primitives.Slot(blocksPerRequest) { fsm, ok := queue.smm.findStateMachine(i) require.Equal(t, true, ok) @@ -1298,6 +1311,7 @@ func TestBlocksQueue_stuckWhenHeadIsSetToOrphanedBlock(t *testing.T) { chain: mc, p2p: p2p, db: beaconDB, + clock: startup.NewClock(mc.Genesis, mc.ValidatorsRoot), }, ) fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, 1*time.Second, false) @@ -1333,7 +1347,8 @@ func TestBlocksQueue_stuckWhenHeadIsSetToOrphanedBlock(t *testing.T) { case <-time.After(3 * time.Second): t.Fatal("test takes too long to complete") case data := <-queue.fetchedData: - for _, blk := range data.blocks { + for _, b := range data.bwb { + blk := b.Block blkRoot, err := blk.Block().HashTreeRoot() require.NoError(t, err) if isProcessedBlock(ctx, blk, blkRoot) { diff --git a/beacon-chain/sync/initial-sync/blocks_queue_utils.go b/beacon-chain/sync/initial-sync/blocks_queue_utils.go index 9d5ce7ca18..ceb9e2e5f5 100644 --- a/beacon-chain/sync/initial-sync/blocks_queue_utils.go +++ b/beacon-chain/sync/initial-sync/blocks_queue_utils.go @@ -14,13 +14,10 @@ func (q *blocksQueue) resetFromFork(fork *forkData) error { if fork == nil { return errors.New("nil fork data") } - if len(fork.blocks) == 0 { + if len(fork.bwb) == 0 { return errors.New("no blocks to reset from") } - firstBlock := fork.blocks[0].Block() - if firstBlock == nil || firstBlock.IsNil() { - return errors.New("invalid first block in fork data") - } + firstBlock := fork.bwb[0].Block.Block() blocksPerRequest := q.blocksFetcher.blocksPerPeriod if err := q.smm.removeAllStateMachines(); err != nil { @@ -28,11 +25,11 @@ func (q *blocksQueue) resetFromFork(fork *forkData) error { } fsm := q.smm.addStateMachine(firstBlock.Slot()) fsm.pid = fork.peer - fsm.blocks = fork.blocks + fsm.bwb = fork.bwb fsm.state = stateDataParsed // The rest of machines are in skipped state. - startSlot := firstBlock.Slot().Add(uint64(len(fork.blocks))) + startSlot := firstBlock.Slot().Add(uint64(len(fork.bwb))) for i := startSlot; i < startSlot.Add(blocksPerRequest*(lookaheadSteps-1)); i += primitives.Slot(blocksPerRequest) { fsm := q.smm.addStateMachine(i) fsm.state = stateSkipped diff --git a/beacon-chain/sync/initial-sync/fsm.go b/beacon-chain/sync/initial-sync/fsm.go index 1a2441a0f7..9f3be7f561 100644 --- a/beacon-chain/sync/initial-sync/fsm.go +++ b/beacon-chain/sync/initial-sync/fsm.go @@ -7,7 +7,7 @@ import ( "time" "github.com/libp2p/go-libp2p/core/peer" - "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" prysmTime "github.com/prysmaticlabs/prysm/v4/time" "github.com/prysmaticlabs/prysm/v4/time/slots" @@ -46,7 +46,7 @@ type stateMachine struct { start primitives.Slot state stateID pid peer.ID - blocks []interfaces.ReadOnlySignedBeaconBlock + bwb []blocks.BlockWithVerifiedBlobs updated time.Time } @@ -78,7 +78,7 @@ func (smm *stateMachineManager) addStateMachine(startSlot primitives.Slot) *stat smm: smm, start: startSlot, state: stateNew, - blocks: []interfaces.ReadOnlySignedBeaconBlock{}, + bwb: []blocks.BlockWithVerifiedBlobs{}, updated: prysmTime.Now(), } smm.recalculateMachineAttribs() @@ -90,7 +90,7 @@ func (smm *stateMachineManager) removeStateMachine(startSlot primitives.Slot) er if _, ok := smm.machines[startSlot]; !ok { return fmt.Errorf("state for machine %v is not found", startSlot) } - smm.machines[startSlot].blocks = nil + smm.machines[startSlot].bwb = nil delete(smm.machines, startSlot) smm.recalculateMachineAttribs() return nil diff --git a/beacon-chain/sync/initial-sync/round_robin.go b/beacon-chain/sync/initial-sync/round_robin.go index 71fb6160ce..b9c0b32c2f 100644 --- a/beacon-chain/sync/initial-sync/round_robin.go +++ b/beacon-chain/sync/initial-sync/round_robin.go @@ -3,13 +3,15 @@ package initialsync import ( "context" "encoding/hex" - "errors" "fmt" "time" "github.com/libp2p/go-libp2p/core/peer" "github.com/paulbellamy/ratecounter" + "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/time/slots" @@ -25,7 +27,7 @@ const ( type blockReceiverFn func(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error // batchBlockReceiverFn defines batch receiving function. -type batchBlockReceiverFn func(ctx context.Context, blks []interfaces.ReadOnlySignedBeaconBlock, roots [][32]byte) error +type batchBlockReceiverFn func(ctx context.Context, blks []blocks.ROBlock) error // Round Robin sync looks at the latest peer statuses and syncs up to the highest known epoch. // @@ -69,10 +71,18 @@ func (s *Service) syncToFinalizedEpoch(ctx context.Context, genesis time.Time) e log.Debug("Already synced to finalized epoch") return nil } + + vr := s.clock.GenesisValidatorsRoot() + ctxMap, err := sync.ContextByteVersionsForValRoot(vr) + if err != nil { + return errors.Wrapf(err, "unable to initialize context version map using genesis validator root = %#x", vr) + } queue := newBlocksQueue(ctx, &blocksQueueConfig{ p2p: s.cfg.P2P, db: s.cfg.DB, chain: s.cfg.Chain, + clock: s.clock, + ctxMap: ctxMap, highestExpectedSlot: highestFinalizedSlot, mode: modeStopOnFinalizedEpoch, }) @@ -81,6 +91,9 @@ func (s *Service) syncToFinalizedEpoch(ctx context.Context, genesis time.Time) e } for data := range queue.fetchedData { + // If blobs are available. Verify blobs and blocks are consistence. + // We can't import a block if there's no associated blob within DA bound. + // The blob has to pass aggregated proof check. s.processFetchedData(ctx, genesis, s.cfg.Chain.HeadSlot(), data) } @@ -98,10 +111,17 @@ func (s *Service) syncToFinalizedEpoch(ctx context.Context, genesis time.Time) e // syncToNonFinalizedEpoch sync from head to best known non-finalized epoch supported by majority // of peers (no less than MinimumSyncPeers*2 peers). func (s *Service) syncToNonFinalizedEpoch(ctx context.Context, genesis time.Time) error { + vr := s.clock.GenesisValidatorsRoot() + ctxMap, err := sync.ContextByteVersionsForValRoot(vr) + if err != nil { + return errors.Wrapf(err, "unable to initialize context version map using genesis validator root = %#x", vr) + } queue := newBlocksQueue(ctx, &blocksQueueConfig{ p2p: s.cfg.P2P, db: s.cfg.DB, chain: s.cfg.Chain, + clock: s.clock, + ctxMap: ctxMap, highestExpectedSlot: slots.Since(genesis), mode: modeNonConstrained, }) @@ -128,7 +148,7 @@ func (s *Service) processFetchedData( defer s.updatePeerScorerStats(data.pid, startSlot) // Use Batch Block Verify to process and verify batches directly. - if err := s.processBatchedBlocks(ctx, genesis, data.blocks, s.cfg.Chain.ReceiveBlockBatch); err != nil { + if err := s.processBatchedBlocks(ctx, genesis, data.bwb, s.cfg.Chain.ReceiveBlockBatch); err != nil { log.WithError(err).Warn("Skip processing batched blocks") } } @@ -141,8 +161,8 @@ func (s *Service) processFetchedDataRegSync( blockReceiver := s.cfg.Chain.ReceiveBlock invalidBlocks := 0 blksWithoutParentCount := 0 - for _, blk := range data.blocks { - if err := s.processBlock(ctx, genesis, blk, blockReceiver); err != nil { + for _, b := range data.bwb { + if err := s.processBlock(ctx, genesis, b, blockReceiver); err != nil { switch { case errors.Is(err, errBlockAlreadyProcessed): log.WithError(err).Debug("Block is not processed") @@ -158,13 +178,13 @@ func (s *Service) processFetchedDataRegSync( } if blksWithoutParentCount > 0 { log.WithFields(logrus.Fields{ - "missingParent": fmt.Sprintf("%#x", data.blocks[0].Block().ParentRoot()), - "firstSlot": data.blocks[0].Block().Slot(), - "lastSlot": data.blocks[blksWithoutParentCount-1].Block().Slot(), + "missingParent": fmt.Sprintf("%#x", data.bwb[0].Block.Block().ParentRoot()), + "firstSlot": data.bwb[0].Block.Block().Slot(), + "lastSlot": data.bwb[blksWithoutParentCount-1].Block.Block().Slot(), }).Debug("Could not process batch blocks due to missing parent") } // Add more visible logging if all blocks cannot be processed. - if len(data.blocks) == invalidBlocks { + if len(data.bwb) == invalidBlocks { log.WithField("error", "Range had no valid blocks to process").Warn("Range is not processed") } } @@ -204,20 +224,20 @@ func (s *Service) logSyncStatus(genesis time.Time, blk interfaces.ReadOnlyBeacon } // logBatchSyncStatus and increments the block processing counter. -func (s *Service) logBatchSyncStatus(genesis time.Time, blks []interfaces.ReadOnlySignedBeaconBlock, blkRoot [32]byte) { - s.counter.Incr(int64(len(blks))) +func (s *Service) logBatchSyncStatus(genesis time.Time, firstBlk blocks.ROBlock, nBlocks int) { + s.counter.Incr(int64(nBlocks)) rate := float64(s.counter.Rate()) / counterSeconds if rate == 0 { rate = 1 } - firstBlk := blks[0] + firstRoot := firstBlk.Root() timeRemaining := time.Duration(float64(slots.Since(genesis)-firstBlk.Block().Slot())/rate) * time.Second log.WithFields(logrus.Fields{ "peers": len(s.cfg.P2P.Peers().Connected()), "blocksPerSecond": fmt.Sprintf("%.1f", rate), }).Infof( "Processing block batch of size %d starting from %s %d/%d - estimated time remaining %s", - len(blks), fmt.Sprintf("0x%s...", hex.EncodeToString(blkRoot[:])[:8]), + nBlocks, fmt.Sprintf("0x%s...", hex.EncodeToString(firstRoot[:])[:8]), firstBlk.Block().Slot(), slots.Since(genesis), timeRemaining, ) } @@ -226,14 +246,12 @@ func (s *Service) logBatchSyncStatus(genesis time.Time, blks []interfaces.ReadOn func (s *Service) processBlock( ctx context.Context, genesis time.Time, - blk interfaces.ReadOnlySignedBeaconBlock, + bwb blocks.BlockWithVerifiedBlobs, blockReceiver blockReceiverFn, ) error { - blkRoot, err := blk.Block().HashTreeRoot() - if err != nil { - return err - } - if s.isProcessedBlock(ctx, blk, blkRoot) { + blk := bwb.Block + blkRoot := blk.Root() + if s.isProcessedBlock(ctx, blk) { return fmt.Errorf("slot: %d , root %#x: %w", blk.Block().Slot(), blkRoot, errBlockAlreadyProcessed) } @@ -244,48 +262,63 @@ func (s *Service) processBlock( return blockReceiver(ctx, blk, blkRoot) } +type processedChecker func(context.Context, blocks.ROBlock) bool + +func validUnprocessed(ctx context.Context, bwb []blocks.BlockWithVerifiedBlobs, headSlot primitives.Slot, isProc processedChecker) ([]blocks.BlockWithVerifiedBlobs, error) { + // use a pointer to avoid confusing the zero-value with the case where the first element is processed. + var processed *int + for i := range bwb { + b := bwb[i].Block + if headSlot >= b.Block().Slot() && isProc(ctx, b) { + processed = &i + continue + } + if i > 0 { + parent := bwb[i-1].Block + if parent.Root() != b.Block().ParentRoot() { + return nil, fmt.Errorf("expected linear block list with parent root of %#x (slot %d) but received %#x (slot %d)", + parent, parent.Block().Slot(), b.Block().ParentRoot(), b.Block().Slot()) + } + } + } + if processed == nil { + return bwb, nil + } + if *processed+1 == len(bwb) { + maxIncoming := bwb[len(bwb)-1].Block + maxRoot := maxIncoming.Root() + return nil, fmt.Errorf("headSlot:%d, blockSlot:%d , root %#x:%w", headSlot, maxIncoming.Block().Slot(), maxRoot, errBlockAlreadyProcessed) + } + return bwb[*processed:], nil +} + func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time, - blks []interfaces.ReadOnlySignedBeaconBlock, bFunc batchBlockReceiverFn) error { - if len(blks) == 0 { + bwb []blocks.BlockWithVerifiedBlobs, bFunc batchBlockReceiverFn) error { + if len(bwb) == 0 { return errors.New("0 blocks provided into method") } - firstBlock := blks[0] - blkRoot, err := firstBlock.Block().HashTreeRoot() + headSlot := s.cfg.Chain.HeadSlot() + var err error + bwb, err = validUnprocessed(ctx, bwb, headSlot, s.isProcessedBlock) if err != nil { return err } - headSlot := s.cfg.Chain.HeadSlot() - for headSlot >= firstBlock.Block().Slot() && s.isProcessedBlock(ctx, firstBlock, blkRoot) { - if len(blks) == 1 { - return fmt.Errorf("headSlot:%d, blockSlot:%d , root %#x:%w", headSlot, firstBlock.Block().Slot(), blkRoot, errBlockAlreadyProcessed) + + first := bwb[0].Block + if !s.cfg.Chain.HasBlock(ctx, first.Block().ParentRoot()) { + return fmt.Errorf("%w: %#x (in processBatchedBlocks, slot=%d)", + errParentDoesNotExist, first.Block().ParentRoot(), first.Block().Slot()) + } + s.logBatchSyncStatus(genesis, first, len(bwb)) + for _, bb := range bwb { + if len(bb.Blobs) == 0 { + continue } - blks = blks[1:] - firstBlock = blks[0] - blkRoot, err = firstBlock.Block().HashTreeRoot() - if err != nil { - return err + if err := s.cfg.DB.SaveBlobSidecar(ctx, bb.Blobs); err != nil { + return errors.Wrapf(err, "failed to save blobs for block %#x", bb.Block.Root()) } } - s.logBatchSyncStatus(genesis, blks, blkRoot) - parentRoot := firstBlock.Block().ParentRoot() - if !s.cfg.Chain.HasBlock(ctx, parentRoot) { - return fmt.Errorf("%w: %#x (in processBatchedBlocks, slot=%d)", errParentDoesNotExist, firstBlock.Block().ParentRoot(), firstBlock.Block().Slot()) - } - blockRoots := make([][32]byte, len(blks)) - blockRoots[0] = blkRoot - for i := 1; i < len(blks); i++ { - b := blks[i] - if b.Block().ParentRoot() != blockRoots[i-1] { - return fmt.Errorf("expected linear block list with parent root of %#x but received %#x", - blockRoots[i-1][:], b.Block().ParentRoot()) - } - blkRoot, err := b.Block().HashTreeRoot() - if err != nil { - return err - } - blockRoots[i] = blkRoot - } - return bFunc(ctx, blks, blockRoots) + return bFunc(ctx, blocks.BlockWithVerifiedBlobsSlice(bwb).ROBlocks()) } // updatePeerScorerStats adjusts monitored metrics for a peer. @@ -304,7 +337,7 @@ func (s *Service) updatePeerScorerStats(pid peer.ID, startSlot primitives.Slot) } // isProcessedBlock checks DB and local cache for presence of a given block, to avoid duplicates. -func (s *Service) isProcessedBlock(ctx context.Context, blk interfaces.ReadOnlySignedBeaconBlock, blkRoot [32]byte) bool { +func (s *Service) isProcessedBlock(ctx context.Context, blk blocks.ROBlock) bool { cp := s.cfg.Chain.FinalizedCheckpt() finalizedSlot, err := slots.EpochStart(cp.Epoch) if err != nil { @@ -317,7 +350,7 @@ func (s *Service) isProcessedBlock(ctx context.Context, blk interfaces.ReadOnlyS } // If block exists in our db and is before or equal to our current head // we ignore it. - if s.cfg.Chain.HeadSlot() >= blk.Block().Slot() && s.cfg.Chain.HasBlock(ctx, blkRoot) { + if s.cfg.Chain.HeadSlot() >= blk.Block().Slot() && s.cfg.Chain.HasBlock(ctx, blk.Root()) { return true } return false diff --git a/beacon-chain/sync/initial-sync/round_robin_test.go b/beacon-chain/sync/initial-sync/round_robin_test.go index 102a5ad50b..f5c7a04548 100644 --- a/beacon-chain/sync/initial-sync/round_robin_test.go +++ b/beacon-chain/sync/initial-sync/round_robin_test.go @@ -10,6 +10,7 @@ import ( mock "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing" dbtest "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing" p2pt "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" @@ -292,6 +293,8 @@ func TestService_roundRobinSync(t *testing.T) { st, err := util.NewBeaconState() require.NoError(t, err) + gt := time.Now() + vr := [32]byte{} mc := &mock.ChainService{ State: st, Root: genesisRoot[:], @@ -299,14 +302,16 @@ func TestService_roundRobinSync(t *testing.T) { FinalizedCheckPoint: ð.Checkpoint{ Epoch: 0, }, - Genesis: time.Now(), - ValidatorsRoot: [32]byte{}, + Genesis: gt, + ValidatorsRoot: vr, } // no-op mock + clock := startup.NewClock(gt, vr) s := &Service{ ctx: context.Background(), cfg: &Config{Chain: mc, P2P: p, DB: beaconDB}, synced: abool.New(), chainStarted: abool.NewBool(true), + clock: clock, } assert.NoError(t, s.roundRobinSync(makeGenesisTime(tt.currentSlot))) if s.cfg.Chain.HeadSlot() < tt.currentSlot { @@ -364,7 +369,9 @@ func TestService_processBlock(t *testing.T) { // Process block normally. wsb, err := blocks.NewSignedBeaconBlock(blk1) require.NoError(t, err) - err = s.processBlock(ctx, genesis, wsb, func( + rowsb, err := blocks.NewROBlock(wsb) + require.NoError(t, err) + err = s.processBlock(ctx, genesis, blocks.BlockWithVerifiedBlobs{Block: rowsb}, func( ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error { assert.NoError(t, s.cfg.Chain.ReceiveBlock(ctx, block, blockRoot)) return nil @@ -374,7 +381,9 @@ func TestService_processBlock(t *testing.T) { // Duplicate processing should trigger error. wsb, err = blocks.NewSignedBeaconBlock(blk1) require.NoError(t, err) - err = s.processBlock(ctx, genesis, wsb, func( + rowsb, err = blocks.NewROBlock(wsb) + require.NoError(t, err) + err = s.processBlock(ctx, genesis, blocks.BlockWithVerifiedBlobs{Block: rowsb}, func( ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error { return nil }) @@ -383,7 +392,9 @@ func TestService_processBlock(t *testing.T) { // Continue normal processing, should proceed w/o errors. wsb, err = blocks.NewSignedBeaconBlock(blk2) require.NoError(t, err) - err = s.processBlock(ctx, genesis, wsb, func( + rowsb, err = blocks.NewROBlock(wsb) + require.NoError(t, err) + err = s.processBlock(ctx, genesis, blocks.BlockWithVerifiedBlobs{Block: rowsb}, func( ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error { assert.NoError(t, s.cfg.Chain.ReceiveBlock(ctx, block, blockRoot)) return nil @@ -418,7 +429,7 @@ func TestService_processBlockBatch(t *testing.T) { genesis := makeGenesisTime(32) t.Run("process non-linear batch", func(t *testing.T) { - var batch []interfaces.ReadOnlySignedBeaconBlock + var batch []blocks.BlockWithVerifiedBlobs currBlockRoot := genesisBlkRoot for i := primitives.Slot(1); i < 10; i++ { parentRoot := currBlockRoot @@ -430,11 +441,13 @@ func TestService_processBlockBatch(t *testing.T) { util.SaveBlock(t, context.Background(), beaconDB, blk1) wsb, err := blocks.NewSignedBeaconBlock(blk1) require.NoError(t, err) - batch = append(batch, wsb) + rowsb, err := blocks.NewROBlock(wsb) + require.NoError(t, err) + batch = append(batch, blocks.BlockWithVerifiedBlobs{Block: rowsb}) currBlockRoot = blk1Root } - var batch2 []interfaces.ReadOnlySignedBeaconBlock + var batch2 []blocks.BlockWithVerifiedBlobs for i := primitives.Slot(10); i < 20; i++ { parentRoot := currBlockRoot blk1 := util.NewBeaconBlock() @@ -445,26 +458,29 @@ func TestService_processBlockBatch(t *testing.T) { util.SaveBlock(t, context.Background(), beaconDB, blk1) wsb, err := blocks.NewSignedBeaconBlock(blk1) require.NoError(t, err) - batch2 = append(batch2, wsb) + rowsb, err := blocks.NewROBlock(wsb) + require.NoError(t, err) + batch2 = append(batch2, blocks.BlockWithVerifiedBlobs{Block: rowsb}) currBlockRoot = blk1Root } - // Process block normally. - err = s.processBatchedBlocks(ctx, genesis, batch, func( - ctx context.Context, blks []interfaces.ReadOnlySignedBeaconBlock, blockRoots [][32]byte) error { - assert.NoError(t, s.cfg.Chain.ReceiveBlockBatch(ctx, blks, blockRoots)) + cbnormal := func(ctx context.Context, blks []blocks.ROBlock) error { + assert.NoError(t, s.cfg.Chain.ReceiveBlockBatch(ctx, blks)) return nil - }) + } + // Process block normally. + err = s.processBatchedBlocks(ctx, genesis, batch, cbnormal) assert.NoError(t, err) - // Duplicate processing should trigger error. - err = s.processBatchedBlocks(ctx, genesis, batch, func( - ctx context.Context, blocks []interfaces.ReadOnlySignedBeaconBlock, blockRoots [][32]byte) error { + cbnil := func(ctx context.Context, blocks []blocks.ROBlock) error { return nil - }) + } + + // Duplicate processing should trigger error. + err = s.processBatchedBlocks(ctx, genesis, batch, cbnil) assert.ErrorContains(t, "block is already processed", err) - var badBatch2 []interfaces.ReadOnlySignedBeaconBlock + var badBatch2 []blocks.BlockWithVerifiedBlobs for i, b := range batch2 { // create a non-linear batch if i%3 == 0 && i != 0 { @@ -474,19 +490,12 @@ func TestService_processBlockBatch(t *testing.T) { } // Bad batch should fail because it is non linear - err = s.processBatchedBlocks(ctx, genesis, badBatch2, func( - ctx context.Context, blks []interfaces.ReadOnlySignedBeaconBlock, blockRoots [][32]byte) error { - return nil - }) + err = s.processBatchedBlocks(ctx, genesis, badBatch2, cbnil) expectedSubErr := "expected linear block list" assert.ErrorContains(t, expectedSubErr, err) // Continue normal processing, should proceed w/o errors. - err = s.processBatchedBlocks(ctx, genesis, batch2, func( - ctx context.Context, blks []interfaces.ReadOnlySignedBeaconBlock, blockRoots [][32]byte) error { - assert.NoError(t, s.cfg.Chain.ReceiveBlockBatch(ctx, blks, blockRoots)) - return nil - }) + err = s.processBatchedBlocks(ctx, genesis, batch2, cbnormal) assert.NoError(t, err) assert.Equal(t, primitives.Slot(19), s.cfg.Chain.HeadSlot(), "Unexpected head slot") }) @@ -537,6 +546,8 @@ func TestService_blockProviderScoring(t *testing.T) { st, err := util.NewBeaconState() require.NoError(t, err) require.NoError(t, err) + gt := time.Now() + vr := [32]byte{} mc := &mock.ChainService{ State: st, Root: genesisRoot[:], @@ -545,14 +556,16 @@ func TestService_blockProviderScoring(t *testing.T) { Epoch: 0, Root: make([]byte, 32), }, - Genesis: time.Now(), - ValidatorsRoot: [32]byte{}, + Genesis: gt, + ValidatorsRoot: vr, } // no-op mock + clock := startup.NewClock(gt, vr) s := &Service{ ctx: context.Background(), cfg: &Config{Chain: mc, P2P: p, DB: beaconDB}, synced: abool.New(), chainStarted: abool.NewBool(true), + clock: clock, } scorer := s.cfg.P2P.Peers().Scorers().BlockProviderScorer() expectedBlockSlots := makeSequence(1, 160) @@ -601,6 +614,9 @@ func TestService_syncToFinalizedEpoch(t *testing.T) { st, err := util.NewBeaconState() require.NoError(t, err) + gt := time.Now() + vr := [32]byte{} + clock := startup.NewClock(gt, vr) mc := &mock.ChainService{ State: st, Root: genesisRoot[:], @@ -609,8 +625,8 @@ func TestService_syncToFinalizedEpoch(t *testing.T) { Epoch: 0, Root: make([]byte, 32), }, - Genesis: time.Now(), - ValidatorsRoot: [32]byte{}, + Genesis: gt, + ValidatorsRoot: vr, } s := &Service{ ctx: context.Background(), @@ -618,6 +634,7 @@ func TestService_syncToFinalizedEpoch(t *testing.T) { synced: abool.New(), chainStarted: abool.NewBool(true), counter: ratecounter.NewRateCounter(counterSeconds * time.Second), + clock: clock, } expectedBlockSlots := makeSequence(1, 191) currentSlot := primitives.Slot(191) diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 0cc94edf64..e9d101ab3c 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -35,7 +35,7 @@ type blockchainService interface { // Config to set up the initial sync service. type Config struct { P2P p2p.P2P - DB db.ReadOnlyDatabase + DB db.NoHeadAccessDatabase Chain blockchainService StateNotifier statefeed.Notifier BlockNotifier blockfeed.Notifier diff --git a/beacon-chain/sync/rpc_blob_sidecars_by_range.go b/beacon-chain/sync/rpc_blob_sidecars_by_range.go index 6c98967a7c..f8475bb572 100644 --- a/beacon-chain/sync/rpc_blob_sidecars_by_range.go +++ b/beacon-chain/sync/rpc_blob_sidecars_by_range.go @@ -2,6 +2,7 @@ package sync import ( "context" + "math" "time" libp2pcore "github.com/libp2p/go-libp2p/core" @@ -110,7 +111,14 @@ func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interfa return nil } -func blobsByRangeMinStartSlot(current primitives.Slot) (primitives.Slot, error) { +// BlobsByRangeMinStartSlot returns the lowest slot that we should expect peers to respect as the +// start slot in a BlobSidecarsByRange request. This can be used to validate incoming requests and +// to avoid pestering peers with requests for blobs that are outside the retention window. +func BlobsByRangeMinStartSlot(current primitives.Slot) (primitives.Slot, error) { + // Avoid overflow if we're running on a config where deneb is set to far future epoch. + if params.BeaconConfig().DenebForkEpoch == math.MaxUint64 { + return primitives.Slot(math.MaxUint64), nil + } minReqEpochs := params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest currEpoch := slots.ToEpoch(current) minStart := params.BeaconConfig().DenebForkEpoch @@ -156,9 +164,9 @@ func validateBlobsByRange(r *pb.BlobSidecarsByRangeRequest, current primitives.S // [max(current_epoch - MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, DENEB_FORK_EPOCH), current_epoch] // where current_epoch is defined by the current wall-clock time, // and clients MUST support serving requests of blobs on this range. - minStartSlot, err := blobsByRangeMinStartSlot(current) + minStartSlot, err := BlobsByRangeMinStartSlot(current) if err != nil { - return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "blobsByRangeMinStartSlot error") + return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "BlobsByRangeMinStartSlot error") } if rp.start > maxStart { return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "start > maxStart") diff --git a/beacon-chain/sync/rpc_blob_sidecars_by_range_test.go b/beacon-chain/sync/rpc_blob_sidecars_by_range_test.go index 8a4f4386dd..5db6033c92 100644 --- a/beacon-chain/sync/rpc_blob_sidecars_by_range_test.go +++ b/beacon-chain/sync/rpc_blob_sidecars_by_range_test.go @@ -177,7 +177,7 @@ func TestBlobsByRangeValidation(t *testing.T) { and clients MUST support serving requests of blobs on this range. */ defaultCurrent := denebSlot + 100 + minReqSlots - defaultMinStart, err := blobsByRangeMinStartSlot(defaultCurrent) + defaultMinStart, err := BlobsByRangeMinStartSlot(defaultCurrent) require.NoError(t, err) cases := []struct { name string diff --git a/beacon-chain/sync/rpc_blob_sidecars_by_root_test.go b/beacon-chain/sync/rpc_blob_sidecars_by_root_test.go index 93b9d5854e..edef14c9f4 100644 --- a/beacon-chain/sync/rpc_blob_sidecars_by_root_test.go +++ b/beacon-chain/sync/rpc_blob_sidecars_by_root_test.go @@ -130,6 +130,11 @@ func TestReadChunkEncodedBlobs(t *testing.T) { nblocks: 1, streamReader: readChunkEncodedBlobsAsStreamReader, }, + { + name: "test peer sending excess blobs", + nblocks: 1, + streamReader: readChunkEncodedBlobsLowMax, + }, } for _, c := range cases { t.Run(c.name, func(t *testing.T) { @@ -138,6 +143,20 @@ func TestReadChunkEncodedBlobs(t *testing.T) { } } +// Specifies a max expected chunk parameter of 1, so that a response with one or more blobs will give ErrInvalidFetchedData. +func readChunkEncodedBlobsLowMax(t *testing.T, s *Service, expect []*expectedBlobChunk) func(network.Stream) { + encoding := s.cfg.p2p.Encoding() + ctxMap, err := ContextByteVersionsForValRoot(s.cfg.clock.GenesisValidatorsRoot()) + require.NoError(t, err) + vf := func(sidecar *ethpb.BlobSidecar) error { + return nil + } + return func(stream network.Stream) { + _, err := readChunkEncodedBlobs(stream, encoding, ctxMap, vf, 1) + require.ErrorIs(t, err, ErrInvalidFetchedData) + } +} + func readChunkEncodedBlobsAsStreamReader(t *testing.T, s *Service, expect []*expectedBlobChunk) func(network.Stream) { encoding := s.cfg.p2p.Encoding() ctxMap, err := ContextByteVersionsForValRoot(s.cfg.clock.GenesisValidatorsRoot()) @@ -146,7 +165,7 @@ func readChunkEncodedBlobsAsStreamReader(t *testing.T, s *Service, expect []*exp return nil } return func(stream network.Stream) { - scs, err := readChunkEncodedBlobs(stream, encoding, ctxMap, vf) + scs, err := readChunkEncodedBlobs(stream, encoding, ctxMap, vf, params.BeaconNetworkConfig().MaxRequestBlobSidecars) require.NoError(t, err) require.Equal(t, len(expect), len(scs)) for i, sc := range scs { diff --git a/beacon-chain/sync/rpc_send_request.go b/beacon-chain/sync/rpc_send_request.go index b56c184bbc..2b7ab2b248 100644 --- a/beacon-chain/sync/rpc_send_request.go +++ b/beacon-chain/sync/rpc_send_request.go @@ -12,6 +12,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/encoder" p2ptypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types" + fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" @@ -21,9 +22,15 @@ import ( "github.com/prysmaticlabs/prysm/v4/time/slots" ) -// ErrInvalidFetchedData is thrown if stream fails to provide requested blocks. +// ErrInvalidFetchedData is used to signal that an error occurred which should result in peer downscoring. var ErrInvalidFetchedData = errors.New("invalid data returned from peer") +var errMaxRequestBlobSidecarsExceeded = errors.Wrap(ErrInvalidFetchedData, "peer exceeded req blob chunk tx limit") +var errBlobChunkedReadFailure = errors.New("failed to read stream of chunk-encoded blobs") +var errBlobUnmarshal = errors.New("Could not unmarshal chunk-encoded blob") +var errUnrequestedRoot = errors.New("Received BlobSidecar in response that was not requested") +var errBlobResponseOutOfBounds = errors.New("received BlobSidecar with slot outside BlobSidecarsByRangeRequest bounds") + // BeaconBlockProcessor defines a block processing function, which allows to start utilizing // blocks even before all blocks are ready. type BeaconBlockProcessor func(block interfaces.ReadOnlySignedBeaconBlock) error @@ -135,8 +142,8 @@ func SendBeaconBlocksByRootRequest( return blocks, nil } -func SendBlobsByRangeRequest(ctx context.Context, ci blockchain.ForkFetcher, p2pApi p2p.SenderEncoder, pid peer.ID, ctxMap ContextByteVersions, req *pb.BlobSidecarsByRangeRequest) ([]*pb.BlobSidecar, error) { - topic, err := p2p.TopicFromMessage(p2p.BlobSidecarsByRangeName, slots.ToEpoch(ci.CurrentSlot())) +func SendBlobsByRangeRequest(ctx context.Context, tor blockchain.TemporalOracle, p2pApi p2p.SenderEncoder, pid peer.ID, ctxMap ContextByteVersions, req *pb.BlobSidecarsByRangeRequest) ([]*pb.BlobSidecar, error) { + topic, err := p2p.TopicFromMessage(p2p.BlobSidecarsByRangeName, slots.ToEpoch(tor.CurrentSlot())) if err != nil { return nil, err } @@ -147,7 +154,11 @@ func SendBlobsByRangeRequest(ctx context.Context, ci blockchain.ForkFetcher, p2p } defer closeStream(stream, log) - return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRangeReq(req)) + max := params.BeaconNetworkConfig().MaxRequestBlobSidecars + if max > req.Count*fieldparams.MaxBlobsPerBlock { + max = req.Count * fieldparams.MaxBlobsPerBlock + } + return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRangeReq(req), max) } func SendBlobSidecarByRoot( @@ -169,14 +180,13 @@ func SendBlobSidecarByRoot( } defer closeStream(stream, log) - return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRootReq(req)) + max := params.BeaconNetworkConfig().MaxRequestBlobSidecars + if max > uint64(len(*req))*fieldparams.MaxBlobsPerBlock { + max = uint64(len(*req)) * fieldparams.MaxBlobsPerBlock + } + return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRootReq(req), max) } -var ErrBlobChunkedReadFailure = errors.New("failed to read stream of chunk-encoded blobs") -var ErrBlobUnmarshal = errors.New("Could not unmarshal chunk-encoded blob") -var ErrUnrequestedRoot = errors.New("Received BlobSidecar in response that was not requested") -var ErrBlobResponseOutOfBounds = errors.New("received BlobSidecar with slot outside BlobSidecarsByRangeRequest bounds") - type blobResponseValidation func(*pb.BlobSidecar) error func blobValidatorFromRootReq(req *p2ptypes.BlobSidecarsByRootReq) blobResponseValidation { @@ -186,7 +196,7 @@ func blobValidatorFromRootReq(req *p2ptypes.BlobSidecarsByRootReq) blobResponseV } return func(sc *pb.BlobSidecar) error { if requested := roots[bytesutil.ToBytes32(sc.BlockRoot)]; !requested { - return errors.Wrapf(ErrUnrequestedRoot, "root=%#x", sc.BlockRoot) + return errors.Wrapf(errUnrequestedRoot, "root=%#x", sc.BlockRoot) } return nil } @@ -196,52 +206,69 @@ func blobValidatorFromRangeReq(req *pb.BlobSidecarsByRangeRequest) blobResponseV end := req.StartSlot + primitives.Slot(req.Count) return func(sc *pb.BlobSidecar) error { if sc.Slot < req.StartSlot || sc.Slot >= end { - return errors.Wrapf(ErrBlobResponseOutOfBounds, "req start,end:%d,%d, resp:%d", req.StartSlot, end, sc.Slot) + return errors.Wrapf(errBlobResponseOutOfBounds, "req start,end:%d,%d, resp:%d", req.StartSlot, end, sc.Slot) } return nil } } -func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf blobResponseValidation) ([]*pb.BlobSidecar, error) { - decode := encoding.DecodeWithMaxLength - max := int(params.BeaconNetworkConfig().MaxRequestBlobSidecars) - var ( - code uint8 - msg string - err error - ) +func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf blobResponseValidation, max uint64) ([]*pb.BlobSidecar, error) { sidecars := make([]*pb.BlobSidecar, 0) - for i := 0; i < max; i++ { - code, msg, err = ReadStatusCode(stream, encoding) + // Attempt an extra read beyond max to check if the peer is violating the spec by + // sending more than MAX_REQUEST_BLOB_SIDECARS, or more blobs than requested. + for i := uint64(0); i < max+1; i++ { + sc, err := readChunkedBlobSidecar(stream, encoding, ctxMap, vf) if err != nil { - break + if errors.Is(err, io.EOF) { + break + } + return nil, err } - if code != 0 { - return nil, errors.Wrap(ErrBlobChunkedReadFailure, msg) - } - ctxb, err := readContextFromStream(stream) - if err != nil { - return nil, errors.Wrap(err, "error reading chunk context bytes from stream") - } - - v, found := ctxMap[bytesutil.ToBytes4(ctxb)] - if !found { - return nil, errors.Wrapf(ErrBlobUnmarshal, fmt.Sprintf("unrecognized fork digest %#x", ctxb)) - } - if v != version.Deneb { - return nil, fmt.Errorf("unexpected context bytes for deneb BlobSidecar, ctx=%#x, v=%s", ctxb, version.String(v)) - } - sc := &pb.BlobSidecar{} - if err := decode(stream, sc); err != nil { - return nil, errors.Wrap(err, "failed to decode the protobuf-encoded BlobSidecar message from RPC chunk stream") - } - if err := vf(sc); err != nil { - return nil, errors.Wrap(err, "validation failure decoding blob RPC response") + if i == max { + // We have read an extra sidecar beyond what the spec allows. Since this is a spec violation, we return + // an error that wraps ErrInvalidFetchedData. The part of the state machine that handles rpc peer scoring + // will downscore the peer if the request ends in an error that wraps that one. + return nil, errMaxRequestBlobSidecarsExceeded } sidecars = append(sidecars, sc) } - if !errors.Is(err, io.EOF) { - return nil, err - } + return sidecars, nil } + +func readChunkedBlobSidecar(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf blobResponseValidation) (*pb.BlobSidecar, error) { + decode := encoding.DecodeWithMaxLength + var ( + code uint8 + msg string + ) + code, msg, err := ReadStatusCode(stream, encoding) + if err != nil { + return nil, err + } + if code != 0 { + return nil, errors.Wrap(errBlobChunkedReadFailure, msg) + } + ctxb, err := readContextFromStream(stream) + if err != nil { + return nil, errors.Wrap(err, "error reading chunk context bytes from stream") + } + + v, found := ctxMap[bytesutil.ToBytes4(ctxb)] + if !found { + return nil, errors.Wrapf(errBlobUnmarshal, fmt.Sprintf("unrecognized fork digest %#x", ctxb)) + } + // Only deneb is supported at this time, because we lack a fork-spanning interface/union type for blobs. + if v != version.Deneb { + return nil, fmt.Errorf("unexpected context bytes for deneb BlobSidecar, ctx=%#x, v=%s", ctxb, version.String(v)) + } + sc := &pb.BlobSidecar{} + if err := decode(stream, sc); err != nil { + return nil, errors.Wrap(err, "failed to decode the protobuf-encoded BlobSidecar message from RPC chunk stream") + } + if err := vf(sc); err != nil { + return nil, errors.Wrap(err, "validation failure decoding blob RPC response") + } + + return sc, nil +} diff --git a/beacon-chain/sync/rpc_send_request_test.go b/beacon-chain/sync/rpc_send_request_test.go index ca4aadb6f3..36d9fb35ab 100644 --- a/beacon-chain/sync/rpc_send_request_test.go +++ b/beacon-chain/sync/rpc_send_request_test.go @@ -495,7 +495,7 @@ func TestBlobValidatorFromRootReq(t *testing.T) { name: "invalid", ids: []*ethpb.BlobIdentifier{{BlockRoot: validRoot}}, response: []*ethpb.BlobSidecar{{BlockRoot: invalidRoot}}, - err: ErrUnrequestedRoot, + err: errUnrequestedRoot, }, } for _, c := range cases { @@ -544,7 +544,7 @@ func TestBlobValidatorFromRangeReq(t *testing.T) { Count: 1, }, response: []*ethpb.BlobSidecar{{Slot: 9}}, - err: ErrBlobResponseOutOfBounds, + err: errBlobResponseOutOfBounds, }, { name: "invalid - after, count 1", @@ -553,7 +553,7 @@ func TestBlobValidatorFromRangeReq(t *testing.T) { Count: 1, }, response: []*ethpb.BlobSidecar{{Slot: 11}}, - err: ErrBlobResponseOutOfBounds, + err: errBlobResponseOutOfBounds, }, { name: "invalid - after, multi", @@ -562,7 +562,7 @@ func TestBlobValidatorFromRangeReq(t *testing.T) { Count: 10, }, response: []*ethpb.BlobSidecar{{Slot: 23}}, - err: ErrBlobResponseOutOfBounds, + err: errBlobResponseOutOfBounds, }, { name: "invalid - after, at boundary, multi", @@ -571,7 +571,7 @@ func TestBlobValidatorFromRangeReq(t *testing.T) { Count: 10, }, response: []*ethpb.BlobSidecar{{Slot: 20}}, - err: ErrBlobResponseOutOfBounds, + err: errBlobResponseOutOfBounds, }, } for _, c := range cases { diff --git a/consensus-types/blocks/roblock.go b/consensus-types/blocks/roblock.go index d63d948631..11f5292761 100644 --- a/consensus-types/blocks/roblock.go +++ b/consensus-types/blocks/roblock.go @@ -5,6 +5,7 @@ import ( "sort" "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" + eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" ) // ROBlock is a value that embeds a ReadOnlySignedBeaconBlock along with its block root ([32]byte). @@ -73,3 +74,44 @@ func (s ROBlockSlice) Swap(i, j int) { func (s ROBlockSlice) Len() int { return len(s) } + +type BlockWithVerifiedBlobs struct { + Block ROBlock + Blobs []*eth.BlobSidecar +} + +type BlockWithVerifiedBlobsSlice []BlockWithVerifiedBlobs + +func (s BlockWithVerifiedBlobsSlice) ROBlocks() []ROBlock { + r := make([]ROBlock, len(s)) + for i := range s { + r[i] = s[i].Block + } + return r +} + +// Less reports whether the element with index i must sort before the element with index j. +// ROBlocks are ordered first by their slot, +// with a lexicographic sort of roots breaking ties for slots with duplicate blocks. +func (s BlockWithVerifiedBlobsSlice) Less(i, j int) bool { + si, sj := s[i].Block.Block().Slot(), s[j].Block.Block().Slot() + + // lower slot wins + if si != sj { + return si < sj + } + + // break slot tie lexicographically comparing roots byte for byte + ri, rj := s[i].Block.Root(), s[j].Block.Root() + return bytes.Compare(ri[:], rj[:]) < 0 +} + +// Swap swaps the elements with indexes i and j. +func (s BlockWithVerifiedBlobsSlice) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +// Len is the number of elements in the collection. +func (s BlockWithVerifiedBlobsSlice) Len() int { + return len(s) +} diff --git a/testing/util/BUILD.bazel b/testing/util/BUILD.bazel index 553926ffc7..d5bd7b1406 100644 --- a/testing/util/BUILD.bazel +++ b/testing/util/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "block.go", "capella_block.go", "capella_state.go", + "deneb.go", "deneb_state.go", "deposits.go", "helpers.go", @@ -55,7 +56,9 @@ go_library( "//testing/assertions:go_default_library", "//testing/require:go_default_library", "//time/slots:go_default_library", + "@com_github_ethereum_go_ethereum//common:go_default_library", "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", + "@com_github_ethereum_go_ethereum//core/types:go_default_library", "@com_github_pkg_errors//:go_default_library", "@com_github_prysmaticlabs_go_bitfield//:go_default_library", "@com_github_sirupsen_logrus//:go_default_library", diff --git a/testing/util/deneb.go b/testing/util/deneb.go new file mode 100644 index 0000000000..b9a23c1db6 --- /dev/null +++ b/testing/util/deneb.go @@ -0,0 +1,113 @@ +package util + +import ( + "encoding/binary" + "math/big" + "testing" + + "github.com/ethereum/go-ethereum/common" + gethTypes "github.com/ethereum/go-ethereum/core/types" + fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" + enginev1 "github.com/prysmaticlabs/prysm/v4/proto/engine/v1" + ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v4/testing/require" +) + +func GenerateTestDenebBlockWithSidecar(t *testing.T, parent [32]byte, slot primitives.Slot, nblobs int) (blocks.ROBlock, []*ethpb.BlobSidecar) { + // Start service with 160 as allowed blocks capacity (and almost zero capacity recovery). + stateRoot := bytesutil.PadTo([]byte("stateRoot"), fieldparams.RootLength) + receiptsRoot := bytesutil.PadTo([]byte("receiptsRoot"), fieldparams.RootLength) + logsBloom := bytesutil.PadTo([]byte("logs"), fieldparams.LogsBloomLength) + parentHash := bytesutil.PadTo([]byte("parentHash"), fieldparams.RootLength) + tx := gethTypes.NewTransaction( + 0, + common.HexToAddress("095e7baea6a6c7c4c2dfeb977efac326af552d87"), + big.NewInt(0), 0, big.NewInt(0), + nil, + ) + txs := []*gethTypes.Transaction{tx} + encodedBinaryTxs := make([][]byte, 1) + var err error + encodedBinaryTxs[0], err = txs[0].MarshalBinary() + require.NoError(t, err) + blockHash := bytesutil.ToBytes32([]byte("foo")) + payload := &enginev1.ExecutionPayloadDeneb{ + ParentHash: parentHash, + FeeRecipient: make([]byte, fieldparams.FeeRecipientLength), + StateRoot: stateRoot, + ReceiptsRoot: receiptsRoot, + LogsBloom: logsBloom, + PrevRandao: blockHash[:], + BlockNumber: 0, + GasLimit: 0, + GasUsed: 0, + Timestamp: 0, + ExtraData: make([]byte, 0), + BaseFeePerGas: bytesutil.PadTo([]byte("baseFeePerGas"), fieldparams.RootLength), + ExcessDataGas: 0, + BlockHash: blockHash[:], + Transactions: encodedBinaryTxs, + } + block := NewBeaconBlockDeneb() + block.Block.Body.ExecutionPayload = payload + block.Block.Slot = slot + block.Block.ParentRoot = parent[:] + commitments := make([][48]byte, nblobs) + block.Block.Body.BlobKzgCommitments = make([][]byte, nblobs) + for i := range commitments { + binary.LittleEndian.PutUint16(commitments[i][0:16], uint16(i)) + binary.LittleEndian.PutUint16(commitments[i][16:32], uint16(slot)) + block.Block.Body.BlobKzgCommitments[i] = commitments[i][:] + } + + root, err := block.Block.HashTreeRoot() + require.NoError(t, err) + + sidecars := make([]*ethpb.BlobSidecar, len(commitments)) + for i, c := range block.Block.Body.BlobKzgCommitments { + sidecars[i] = GenerateTestDenebBlobSidecar(root, block, i, c) + } + + sbb, err := blocks.NewSignedBeaconBlock(block) + require.NoError(t, err) + rob, err := blocks.NewROBlock(sbb) + require.NoError(t, err) + return rob, sidecars +} + +func GenerateTestDenebBlobSidecar(root [32]byte, block *ethpb.SignedBeaconBlockDeneb, index int, commitment []byte) *ethpb.BlobSidecar { + blob := make([]byte, fieldparams.BlobSize) + binary.LittleEndian.PutUint64(blob, uint64(index)) + sc := ðpb.BlobSidecar{ + BlockRoot: root[:], + Index: uint64(index), + Slot: block.Block.Slot, + BlockParentRoot: block.Block.ParentRoot, + ProposerIndex: block.Block.ProposerIndex, + Blob: blob, + KzgCommitment: commitment, + KzgProof: commitment, + } + return sc +} + +func ExtendBlocksPlusBlobs(t *testing.T, blks []blocks.ROBlock, size int) ([]blocks.ROBlock, []*ethpb.BlobSidecar) { + blobs := make([]*ethpb.BlobSidecar, 0) + if len(blks) == 0 { + blk, blb := GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 6) + blobs = append(blobs, blb...) + blks = append(blks, blk) + } + + for i := 0; i < size; i++ { + prev := blks[len(blks)-1] + blk, blb := GenerateTestDenebBlockWithSidecar(t, prev.Root(), prev.Block().Slot()+1, 6) + blobs = append(blobs, blb...) + blks = append(blks, blk) + } + + return blks, blobs +}