sync blobs in initial-sync (#12522)

This commit is contained in:
kasey
2023-07-25 16:57:01 -05:00
committed by Preston Van Loon
parent 48f1f69695
commit 3476112d63
32 changed files with 1163 additions and 368 deletions

View File

@@ -17,8 +17,6 @@ var (
errNilJustifiedCheckpoint = errors.New("nil justified checkpoint returned from state")
// errBlockDoesNotExist is returned when a block does not exist for a particular state summary.
errBlockDoesNotExist = errors.New("could not find block in DB")
// errWrongBlockCount is returned when the wrong number of blocks or block roots is used
errWrongBlockCount = errors.New("wrong number of blocks or block roots")
// errBlockNotFoundInCacheOrDB is returned when a block is not found in the cache or DB.
errBlockNotFoundInCacheOrDB = errors.New("block not found in cache or db")
// errWSBlockNotFound is returned when a block is not found in the WS cache or DB.

View File

@@ -73,7 +73,7 @@ func logBlockSyncStatus(block interfaces.ReadOnlyBeaconBlock, blockRoot [32]byte
level := log.Logger.GetLevel()
if level >= logrus.DebugLevel {
parentRoot := block.ParentRoot()
log.WithFields(logrus.Fields{
lf := logrus.Fields{
"slot": block.Slot(),
"slotInEpoch": block.Slot() % params.BeaconConfig().SlotsPerEpoch,
"block": fmt.Sprintf("0x%s...", hex.EncodeToString(blockRoot[:])[:8]),
@@ -87,7 +87,12 @@ func logBlockSyncStatus(block interfaces.ReadOnlyBeaconBlock, blockRoot [32]byte
"sinceSlotStartTime": prysmTime.Now().Sub(startTime),
"chainServiceProcessedTime": prysmTime.Now().Sub(receivedTime),
"deposits": len(block.Body().Deposits()),
}).Debug("Synced new block")
}
commits, err := block.Body().BlobKzgCommitments()
if err == nil {
lf["commitments"] = len(commits)
}
log.WithFields(lf).Debug("Synced new block")
} else {
log.WithFields(logrus.Fields{
"slot": block.Slot(),

View File

@@ -160,19 +160,14 @@ func getStateVersionAndPayload(st state.BeaconState) (int, interfaces.ExecutionD
return preStateVersion, preStateHeader, nil
}
func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.ReadOnlySignedBeaconBlock,
blockRoots [][32]byte) error {
func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlock) error {
ctx, span := trace.StartSpan(ctx, "blockChain.onBlockBatch")
defer span.End()
if len(blks) == 0 || len(blockRoots) == 0 {
if len(blks) == 0 {
return errors.New("no blocks provided")
}
if len(blks) != len(blockRoots) {
return errWrongBlockCount
}
if err := consensusblocks.BeaconBlockIsNil(blks[0]); err != nil {
return invalidBlock{error: err}
}
@@ -222,7 +217,7 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.ReadOnlySi
}
// Save potential boundary states.
if slots.IsEpochStart(preState.Slot()) {
boundaries[blockRoots[i]] = preState.Copy()
boundaries[b.Root()] = preState.Copy()
}
jCheckpoints[i] = preState.CurrentJustifiedCheckpoint()
fCheckpoints[i] = preState.FinalizedCheckpoint()
@@ -255,11 +250,12 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.ReadOnlySi
pendingNodes := make([]*forkchoicetypes.BlockAndCheckpoints, len(blks))
var isValidPayload bool
for i, b := range blks {
root := b.Root()
isValidPayload, err = s.notifyNewPayload(ctx,
postVersionAndHeaders[i].version,
postVersionAndHeaders[i].header, b)
if err != nil {
return s.handleInvalidExecutionError(ctx, err, blockRoots[i], b.Block().ParentRoot())
return s.handleInvalidExecutionError(ctx, err, root, b.Block().ParentRoot())
}
if isValidPayload {
if err := s.validateMergeTransitionBlock(ctx, preVersionAndHeaders[i].version,
@@ -271,13 +267,13 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.ReadOnlySi
JustifiedCheckpoint: jCheckpoints[i],
FinalizedCheckpoint: fCheckpoints[i]}
pendingNodes[len(blks)-i-1] = args
if err := s.saveInitSyncBlock(ctx, blockRoots[i], b); err != nil {
if err := s.saveInitSyncBlock(ctx, root, b); err != nil {
tracing.AnnotateError(span, err)
return err
}
if err := s.cfg.BeaconDB.SaveStateSummary(ctx, &ethpb.StateSummary{
Slot: b.Block().Slot(),
Root: blockRoots[i][:],
Root: root[:],
}); err != nil {
tracing.AnnotateError(span, err)
return err
@@ -301,8 +297,9 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.ReadOnlySi
return err
}
}
lastB := blks[len(blks)-1]
lastBR := lastB.Root()
// Also saves the last post state which to be used as pre state for the next batch.
lastBR := blockRoots[len(blks)-1]
if err := s.cfg.StateGen.SaveState(ctx, lastBR, preState); err != nil {
return err
}
@@ -320,7 +317,6 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []interfaces.ReadOnlySi
return errors.Wrap(err, "could not set optimistic block to valid")
}
}
lastB := blks[len(blks)-1]
arg := &notifyForkchoiceUpdateArg{
headState: preState,
headRoot: lastBR,

View File

@@ -49,8 +49,7 @@ func TestStore_OnBlockBatch(t *testing.T) {
require.NoError(t, service.saveGenesisData(ctx, st))
bState := st.Copy()
var blks []interfaces.ReadOnlySignedBeaconBlock
var blkRoots [][32]byte
var blks []consensusblocks.ROBlock
for i := 0; i < 97; i++ {
b, err := util.GenerateFullBlock(bState, keys, util.DefaultBlockGenConfig(), primitives.Slot(i))
require.NoError(t, err)
@@ -63,16 +62,15 @@ func TestStore_OnBlockBatch(t *testing.T) {
require.NoError(t, service.saveInitSyncBlock(ctx, root, wsb))
wsb, err = consensusblocks.NewSignedBeaconBlock(b)
require.NoError(t, err)
blks = append(blks, wsb)
blkRoots = append(blkRoots, root)
rwsb, err := consensusblocks.NewROBlock(wsb)
require.NoError(t, err)
blks = append(blks, rwsb)
}
err := service.onBlockBatch(ctx, blks, blkRoots[1:])
require.ErrorIs(t, errWrongBlockCount, err)
err = service.onBlockBatch(ctx, blks, blkRoots)
err := service.onBlockBatch(ctx, blks)
require.NoError(t, err)
jcp := service.CurrentJustifiedCheckpt()
jroot := bytesutil.ToBytes32(jcp.Root)
require.Equal(t, blkRoots[63], jroot)
require.Equal(t, blks[63].Root(), jroot)
require.Equal(t, primitives.Epoch(2), service.cfg.ForkChoiceStore.JustifiedCheckpoint().Epoch)
}
@@ -84,8 +82,7 @@ func TestStore_OnBlockBatch_NotifyNewPayload(t *testing.T) {
require.NoError(t, service.saveGenesisData(ctx, st))
bState := st.Copy()
var blks []interfaces.ReadOnlySignedBeaconBlock
var blkRoots [][32]byte
var blks []consensusblocks.ROBlock
blkCount := 4
for i := 0; i <= blkCount; i++ {
b, err := util.GenerateFullBlock(bState, keys, util.DefaultBlockGenConfig(), primitives.Slot(i))
@@ -94,13 +91,12 @@ func TestStore_OnBlockBatch_NotifyNewPayload(t *testing.T) {
require.NoError(t, err)
bState, err = transition.ExecuteStateTransition(ctx, bState, wsb)
require.NoError(t, err)
root, err := b.Block.HashTreeRoot()
rwsb, err := consensusblocks.NewROBlock(wsb)
require.NoError(t, err)
require.NoError(t, service.saveInitSyncBlock(ctx, root, wsb))
blks = append(blks, wsb)
blkRoots = append(blkRoots, root)
require.NoError(t, service.saveInitSyncBlock(ctx, rwsb.Root(), wsb))
blks = append(blks, rwsb)
}
require.NoError(t, service.onBlockBatch(ctx, blks, blkRoots))
require.NoError(t, service.onBlockBatch(ctx, blks))
}
func TestCachedPreState_CanGetFromStateSummary(t *testing.T) {
@@ -1932,8 +1928,10 @@ func TestNoViableHead_Reboot(t *testing.T) {
require.NoError(t, err)
root, err = b.Block.HashTreeRoot()
require.NoError(t, err)
rwsb, err := consensusblocks.NewROBlock(wsb)
require.NoError(t, err)
// We use onBlockBatch here because the valid chain is missing in forkchoice
require.NoError(t, service.onBlockBatch(ctx, []interfaces.ReadOnlySignedBeaconBlock{wsb}, [][32]byte{root}))
require.NoError(t, service.onBlockBatch(ctx, []consensusblocks.ROBlock{rwsb}))
// Check that the head is now VALID and the node is not optimistic
require.Equal(t, genesisRoot, service.ensureRootNotZeros(service.cfg.ForkChoiceStore.CachedHeadRoot()))
headRoot, err = service.HeadRoot(ctx)

View File

@@ -12,6 +12,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v4/config/features"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
@@ -32,7 +33,7 @@ var epochsSinceFinalitySaveHotStateDB = primitives.Epoch(100)
// BlockReceiver interface defines the methods of chain service for receiving and processing new blocks.
type BlockReceiver interface {
ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error
ReceiveBlockBatch(ctx context.Context, blocks []interfaces.ReadOnlySignedBeaconBlock, blkRoots [][32]byte) error
ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock) error
HasBlock(ctx context.Context, root [32]byte) bool
RecentBlockSlot(root [32]byte) (primitives.Slot, error)
}
@@ -172,7 +173,7 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
// ReceiveBlockBatch processes the whole block batch at once, assuming the block batch is linear ,transitioning
// the state, performing batch verification of all collected signatures and then performing the appropriate
// actions for a block post-transition.
func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []interfaces.ReadOnlySignedBeaconBlock, blkRoots [][32]byte) error {
func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []blocks.ROBlock) error {
ctx, span := trace.StartSpan(ctx, "blockChain.ReceiveBlockBatch")
defer span.End()
@@ -180,20 +181,21 @@ func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []interfaces.Rea
defer s.cfg.ForkChoiceStore.Unlock()
// Apply state transition on the incoming newly received block batches, one by one.
if err := s.onBlockBatch(ctx, blocks, blkRoots); err != nil {
if err := s.onBlockBatch(ctx, blocks); err != nil {
err := errors.Wrap(err, "could not process block in batch")
tracing.AnnotateError(span, err)
return err
}
lastBR := blkRoots[len(blkRoots)-1]
lastBR := blocks[len(blocks)-1].Root()
optimistic, err := s.cfg.ForkChoiceStore.IsOptimistic(lastBR)
if err != nil {
lastSlot := blocks[len(blocks)-1].Block().Slot()
log.WithError(err).Errorf("Could not check if block is optimistic, Root: %#x, Slot: %d", lastBR, lastSlot)
optimistic = true
}
for i, b := range blocks {
for _, b := range blocks {
blockCopy, err := b.Copy()
if err != nil {
return err
@@ -203,7 +205,7 @@ func (s *Service) ReceiveBlockBatch(ctx context.Context, blocks []interfaces.Rea
Type: statefeed.BlockProcessed,
Data: &statefeed.BlockProcessedData{
Slot: blockCopy.Block().Slot(),
BlockRoot: blkRoots[i],
BlockRoot: b.Root(),
SignedBlock: blockCopy,
Verified: true,
Optimistic: optimistic,

View File

@@ -10,7 +10,6 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/voluntaryexits"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
@@ -228,13 +227,11 @@ func TestService_ReceiveBlockBatch(t *testing.T) {
s, _ := minimalTestService(t, WithStateNotifier(&blockchainTesting.MockStateNotifier{RecordEvents: true}))
err := s.saveGenesisData(ctx, genesis)
require.NoError(t, err)
root, err := tt.args.block.Block.HashTreeRoot()
require.NoError(t, err)
wsb, err := blocks.NewSignedBeaconBlock(tt.args.block)
require.NoError(t, err)
blks := []interfaces.ReadOnlySignedBeaconBlock{wsb}
roots := [][32]byte{root}
err = s.ReceiveBlockBatch(ctx, blks, roots)
rwsb, err := blocks.NewROBlock(wsb)
require.NoError(t, err)
err = s.ReceiveBlockBatch(ctx, []blocks.ROBlock{rwsb})
if tt.wantedErr != "" {
assert.ErrorContains(t, tt.wantedErr, err)
} else {

View File

@@ -23,6 +23,7 @@ go_library(
"//beacon-chain/state/state-native:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//encoding/bytesutil:go_default_library",

View File

@@ -22,6 +22,7 @@ import (
state_native "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/state-native"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
@@ -204,7 +205,7 @@ func (s *ChainService) ReceiveBlockInitialSync(ctx context.Context, block interf
}
// ReceiveBlockBatch processes blocks in batches from initial-sync.
func (s *ChainService) ReceiveBlockBatch(ctx context.Context, blks []interfaces.ReadOnlySignedBeaconBlock, _ [][32]byte) error {
func (s *ChainService) ReceiveBlockBatch(ctx context.Context, blks []blocks.ROBlock) error {
if s.State == nil {
return ErrNilState
}

View File

@@ -71,6 +71,7 @@ go_library(
"@com_github_sirupsen_logrus//:go_default_library",
"@io_k8s_client_go//tools/cache:go_default_library",
"@io_opencensus_go//trace:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
],
)

View File

@@ -28,6 +28,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/time/slots"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"google.golang.org/protobuf/proto"
)
var (
@@ -582,7 +583,10 @@ func (s *Service) ReconstructFullBlock(
// If the payload header has a block hash of 0x0, it means we are pre-merge and should
// simply return the block with an empty execution payload.
if bytes.Equal(header.BlockHash(), params.BeaconConfig().ZeroHash[:]) {
payload := buildEmptyExecutionPayload()
payload, err := buildEmptyExecutionPayload(blindedBlock.Version())
if err != nil {
return nil, err
}
return blocks.BuildSignedBeaconBlockFromExecutionPayload(blindedBlock, payload)
}
@@ -641,7 +645,11 @@ func (s *Service) ReconstructFullBellatrixBlockBatch(
// For blocks that are pre-merge we simply reconstruct them via an empty
// execution payload.
for _, realIdx := range zeroExecPayloads {
payload := buildEmptyExecutionPayload()
bblock := blindedBlocks[realIdx]
payload, err := buildEmptyExecutionPayload(bblock.Version())
if err != nil {
return nil, err
}
fullBlock, err := blocks.BuildSignedBeaconBlockFromExecutionPayload(blindedBlocks[realIdx], payload)
if err != nil {
return nil, err
@@ -677,7 +685,7 @@ func (s *Service) retrievePayloadFromExecutionHash(ctx context.Context, executio
}
executionBlock.Version = version
return fullPayloadFromExecutionBlock(header, executionBlock)
return fullPayloadFromExecutionBlock(version, header, executionBlock)
}
func (s *Service) retrievePayloadsFromExecutionHashes(
@@ -705,6 +713,7 @@ func (s *Service) retrievePayloadsFromExecutionHashes(
// blinded block.
for sliceIdx, realIdx := range validExecPayloads {
var payload interfaces.ExecutionData
bblock := blindedBlocks[realIdx]
if features.Get().EnableOptionalEngineMethods {
b := payloadBodies[sliceIdx]
if b == nil {
@@ -727,7 +736,7 @@ func (s *Service) retrievePayloadsFromExecutionHashes(
if err != nil {
return nil, err
}
payload, err = fullPayloadFromExecutionBlock(header, b)
payload, err = fullPayloadFromExecutionBlock(bblock.Version(), header, b)
if err != nil {
return nil, err
}
@@ -742,7 +751,7 @@ func (s *Service) retrievePayloadsFromExecutionHashes(
}
func fullPayloadFromExecutionBlock(
header interfaces.ExecutionData, block *pb.ExecutionBlock,
blockVersion int, header interfaces.ExecutionData, block *pb.ExecutionBlock,
) (interfaces.ExecutionData, error) {
if header.IsNil() || block == nil {
return nil, errors.New("execution block and header cannot be nil")
@@ -765,7 +774,7 @@ func fullPayloadFromExecutionBlock(
txs[i] = txBin
}
switch block.Version {
switch blockVersion {
case version.Bellatrix:
return blocks.WrappedExecutionPayload(&pb.ExecutionPayload{
ParentHash: header.ParentHash(),
@@ -997,18 +1006,51 @@ func tDStringToUint256(td string) (*uint256.Int, error) {
return i, nil
}
func buildEmptyExecutionPayload() *pb.ExecutionPayload {
return &pb.ExecutionPayload{
ParentHash: make([]byte, fieldparams.RootLength),
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
StateRoot: make([]byte, fieldparams.RootLength),
ReceiptsRoot: make([]byte, fieldparams.RootLength),
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
PrevRandao: make([]byte, fieldparams.RootLength),
BaseFeePerGas: make([]byte, fieldparams.RootLength),
BlockHash: make([]byte, fieldparams.RootLength),
Transactions: make([][]byte, 0),
ExtraData: make([]byte, 0),
func buildEmptyExecutionPayload(v int) (proto.Message, error) {
switch v {
case version.Bellatrix:
return &pb.ExecutionPayload{
ParentHash: make([]byte, fieldparams.RootLength),
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
StateRoot: make([]byte, fieldparams.RootLength),
ReceiptsRoot: make([]byte, fieldparams.RootLength),
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
PrevRandao: make([]byte, fieldparams.RootLength),
BaseFeePerGas: make([]byte, fieldparams.RootLength),
BlockHash: make([]byte, fieldparams.RootLength),
Transactions: make([][]byte, 0),
ExtraData: make([]byte, 0),
}, nil
case version.Capella:
return &pb.ExecutionPayloadCapella{
ParentHash: make([]byte, fieldparams.RootLength),
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
StateRoot: make([]byte, fieldparams.RootLength),
ReceiptsRoot: make([]byte, fieldparams.RootLength),
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
PrevRandao: make([]byte, fieldparams.RootLength),
BaseFeePerGas: make([]byte, fieldparams.RootLength),
BlockHash: make([]byte, fieldparams.RootLength),
Transactions: make([][]byte, 0),
ExtraData: make([]byte, 0),
Withdrawals: make([]*pb.Withdrawal, 0),
}, nil
case version.Deneb:
return &pb.ExecutionPayloadDeneb{
ParentHash: make([]byte, fieldparams.RootLength),
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
StateRoot: make([]byte, fieldparams.RootLength),
ReceiptsRoot: make([]byte, fieldparams.RootLength),
LogsBloom: make([]byte, fieldparams.LogsBloomLength),
PrevRandao: make([]byte, fieldparams.RootLength),
BaseFeePerGas: make([]byte, fieldparams.RootLength),
BlockHash: make([]byte, fieldparams.RootLength),
Transactions: make([][]byte, 0),
ExtraData: make([]byte, 0),
Withdrawals: make([]*pb.Withdrawal, 0),
}, nil
default:
return nil, errors.Wrapf(ErrUnsupportedVersion, "version=%s", version.String(v))
}
}

View File

@@ -1604,8 +1604,9 @@ func fixtures() map[string]interface{} {
func Test_fullPayloadFromExecutionBlock(t *testing.T) {
type args struct {
header *pb.ExecutionPayloadHeader
block *pb.ExecutionBlock
header *pb.ExecutionPayloadHeader
block *pb.ExecutionBlock
version int
}
wantedHash := common.BytesToHash([]byte("foo"))
tests := []struct {
@@ -1621,9 +1622,9 @@ func Test_fullPayloadFromExecutionBlock(t *testing.T) {
BlockHash: []byte("foo"),
},
block: &pb.ExecutionBlock{
Version: version.Bellatrix,
Hash: common.BytesToHash([]byte("bar")),
Hash: common.BytesToHash([]byte("bar")),
},
version: version.Bellatrix,
},
err: "does not match execution block hash",
},
@@ -1634,9 +1635,9 @@ func Test_fullPayloadFromExecutionBlock(t *testing.T) {
BlockHash: wantedHash[:],
},
block: &pb.ExecutionBlock{
Version: version.Bellatrix,
Hash: wantedHash,
Hash: wantedHash,
},
version: version.Bellatrix,
},
want: func() interfaces.ExecutionData {
p, err := blocks.WrappedExecutionPayload(&pb.ExecutionPayload{
@@ -1652,7 +1653,7 @@ func Test_fullPayloadFromExecutionBlock(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
wrapped, err := blocks.WrappedExecutionPayloadHeader(tt.args.header)
require.NoError(t, err)
got, err := fullPayloadFromExecutionBlock(wrapped, tt.args.block)
got, err := fullPayloadFromExecutionBlock(tt.args.version, wrapped, tt.args.block)
if err != nil {
assert.ErrorContains(t, tt.err, err)
} else {
@@ -1664,8 +1665,9 @@ func Test_fullPayloadFromExecutionBlock(t *testing.T) {
func Test_fullPayloadFromExecutionBlockCapella(t *testing.T) {
type args struct {
header *pb.ExecutionPayloadHeaderCapella
block *pb.ExecutionBlock
header *pb.ExecutionPayloadHeaderCapella
block *pb.ExecutionBlock
version int
}
wantedHash := common.BytesToHash([]byte("foo"))
tests := []struct {
@@ -1681,9 +1683,9 @@ func Test_fullPayloadFromExecutionBlockCapella(t *testing.T) {
BlockHash: []byte("foo"),
},
block: &pb.ExecutionBlock{
Version: version.Capella,
Hash: common.BytesToHash([]byte("bar")),
Hash: common.BytesToHash([]byte("bar")),
},
version: version.Capella,
},
err: "does not match execution block hash",
},
@@ -1694,9 +1696,9 @@ func Test_fullPayloadFromExecutionBlockCapella(t *testing.T) {
BlockHash: wantedHash[:],
},
block: &pb.ExecutionBlock{
Version: version.Capella,
Hash: wantedHash,
Hash: wantedHash,
},
version: version.Capella,
},
want: func() interfaces.ExecutionData {
p, err := blocks.WrappedExecutionPayloadCapella(&pb.ExecutionPayloadCapella{
@@ -1712,7 +1714,7 @@ func Test_fullPayloadFromExecutionBlockCapella(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
wrapped, err := blocks.WrappedExecutionPayloadHeaderCapella(tt.args.header, 0)
require.NoError(t, err)
got, err := fullPayloadFromExecutionBlock(wrapped, tt.args.block)
got, err := fullPayloadFromExecutionBlock(tt.args.version, wrapped, tt.args.block)
if err != nil {
assert.ErrorContains(t, tt.err, err)
} else {
@@ -1724,8 +1726,9 @@ func Test_fullPayloadFromExecutionBlockCapella(t *testing.T) {
func Test_fullPayloadFromExecutionBlockDeneb(t *testing.T) {
type args struct {
header *pb.ExecutionPayloadHeaderDeneb
block *pb.ExecutionBlock
header *pb.ExecutionPayloadHeaderDeneb
block *pb.ExecutionBlock
version int
}
wantedHash := common.BytesToHash([]byte("foo"))
tests := []struct {
@@ -1741,9 +1744,9 @@ func Test_fullPayloadFromExecutionBlockDeneb(t *testing.T) {
BlockHash: []byte("foo"),
},
block: &pb.ExecutionBlock{
Version: version.Deneb,
Hash: common.BytesToHash([]byte("bar")),
Hash: common.BytesToHash([]byte("bar")),
},
version: version.Deneb,
},
err: "does not match execution block hash",
},
@@ -1754,9 +1757,9 @@ func Test_fullPayloadFromExecutionBlockDeneb(t *testing.T) {
BlockHash: wantedHash[:],
},
block: &pb.ExecutionBlock{
Version: version.Deneb,
Hash: wantedHash,
Hash: wantedHash,
},
version: version.Deneb,
},
want: func() interfaces.ExecutionData {
p, err := blocks.WrappedExecutionPayloadDeneb(&pb.ExecutionPayloadDeneb{
@@ -1772,7 +1775,7 @@ func Test_fullPayloadFromExecutionBlockDeneb(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
wrapped, err := blocks.WrappedExecutionPayloadHeaderDeneb(tt.args.header, 0)
require.NoError(t, err)
got, err := fullPayloadFromExecutionBlock(wrapped, tt.args.block)
got, err := fullPayloadFromExecutionBlock(tt.args.version, wrapped, tt.args.block)
if err != nil {
assert.ErrorContains(t, tt.err, err)
} else {

View File

@@ -36,4 +36,6 @@ var (
ErrNilResponse = errors.New("nil response")
// ErrRequestTooLarge when the request is too large
ErrRequestTooLarge = errors.New("request too large")
// ErrUnsupportedVersion represents a case where a payload is requested for a block type that doesn't have a known mapping.
ErrUnsupportedVersion = errors.New("unknown ExecutionPayload schema for block version")
)

View File

@@ -29,10 +29,13 @@ go_library(
"//beacon-chain/sync:go_default_library",
"//cmd/beacon-chain/flags:go_default_library",
"//config/params:go_default_library",
"//consensus-types:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//container/leaky-bucket:go_default_library",
"//crypto/rand:go_default_library",
"//encoding/bytesutil:go_default_library",
"//math:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime:go_default_library",
@@ -50,6 +53,7 @@ go_test(
name = "go_raceon_test",
srcs = [
"blocks_fetcher_test.go",
"blocks_fetcher_utils_test.go",
"blocks_queue_test.go",
"fsm_test.go",
"initial_sync_test.go",

View File

@@ -3,6 +3,8 @@ package initialsync
import (
"context"
"fmt"
"sort"
"strings"
"sync"
"time"
@@ -11,15 +13,20 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
p2pTypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
prysmsync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v4/config/params"
consensus_types "github.com/prysmaticlabs/prysm/v4/consensus-types"
blocks2 "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
leakybucket "github.com/prysmaticlabs/prysm/v4/container/leaky-bucket"
"github.com/prysmaticlabs/prysm/v4/crypto/rand"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/math"
p2ppb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
@@ -60,6 +67,8 @@ var blockLimiterPeriod = 30 * time.Second
// blocksFetcherConfig is a config to setup the block fetcher.
type blocksFetcherConfig struct {
clock *startup.Clock
ctxMap prysmsync.ContextByteVersions
chain blockchainService
p2p p2p.P2P
db db.ReadOnlyDatabase
@@ -76,6 +85,8 @@ type blocksFetcher struct {
cancel context.CancelFunc
rand *rand.Rand
chain blockchainService
clock *startup.Clock
ctxMap prysmsync.ContextByteVersions
p2p p2p.P2P
db db.ReadOnlyDatabase
blocksPerPeriod uint64
@@ -104,11 +115,11 @@ type fetchRequestParams struct {
// fetchRequestResponse is a combined type to hold results of both successful executions and errors.
// Valid usage pattern will be to check whether result's `err` is nil, before using `blocks`.
type fetchRequestResponse struct {
pid peer.ID
start primitives.Slot
count uint64
blocks []interfaces.ReadOnlySignedBeaconBlock
err error
pid peer.ID
start primitives.Slot
count uint64
bwb []blocks2.BlockWithVerifiedBlobs
err error
}
// newBlocksFetcher creates ready to use fetcher.
@@ -131,6 +142,8 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
cancel: cancel,
rand: rand.NewGenerator(),
chain: cfg.chain,
clock: cfg.clock,
ctxMap: cfg.ctxMap,
p2p: cfg.p2p,
db: cfg.db,
blocksPerPeriod: uint64(blocksPerPeriod),
@@ -246,10 +259,10 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start primitives.Slot
defer span.End()
response := &fetchRequestResponse{
start: start,
count: count,
blocks: []interfaces.ReadOnlySignedBeaconBlock{},
err: nil,
start: start,
count: count,
bwb: []blocks2.BlockWithVerifiedBlobs{},
err: nil,
}
if ctx.Err() != nil {
@@ -273,7 +286,14 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start primitives.Slot
}
}
response.blocks, response.pid, response.err = f.fetchBlocksFromPeer(ctx, start, count, peers)
response.bwb, response.pid, response.err = f.fetchBlocksFromPeer(ctx, start, count, peers)
if response.err == nil {
bwb, err := f.fetchBlobsFromPeer(ctx, response.bwb, response.pid)
if err != nil {
response.err = err
}
response.bwb = bwb
}
return response
}
@@ -282,7 +302,7 @@ func (f *blocksFetcher) fetchBlocksFromPeer(
ctx context.Context,
start primitives.Slot, count uint64,
peers []peer.ID,
) ([]interfaces.ReadOnlySignedBeaconBlock, peer.ID, error) {
) ([]blocks2.BlockWithVerifiedBlobs, peer.ID, error) {
ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlocksFromPeer")
defer span.End()
@@ -293,17 +313,183 @@ func (f *blocksFetcher) fetchBlocksFromPeer(
Step: 1,
}
for i := 0; i < len(peers); i++ {
blocks, err := f.requestBlocks(ctx, req, peers[i])
if err == nil {
f.p2p.Peers().Scorers().BlockProviderScorer().Touch(peers[i])
return blocks, peers[i], nil
} else {
log.WithError(err).Debug("Could not request blocks by range")
p := peers[i]
blocks, err := f.requestBlocks(ctx, req, p)
if err != nil {
log.WithField("peer", p).WithError(err).Debug("Could not request blocks by range from peer")
continue
}
f.p2p.Peers().Scorers().BlockProviderScorer().Touch(p)
robs, err := sortedBlockWithVerifiedBlobSlice(blocks)
if err != nil {
log.WithField("peer", p).WithError(err).Debug("invalid BeaconBlocksByRange response")
continue
}
return robs, p, err
}
return nil, "", errNoPeersAvailable
}
func sortedBlockWithVerifiedBlobSlice(blocks []interfaces.ReadOnlySignedBeaconBlock) ([]blocks2.BlockWithVerifiedBlobs, error) {
rb := make([]blocks2.BlockWithVerifiedBlobs, len(blocks))
for i, b := range blocks {
ro, err := blocks2.NewROBlock(b)
if err != nil {
return nil, err
}
rb[i] = blocks2.BlockWithVerifiedBlobs{Block: ro}
}
sort.Sort(blocks2.BlockWithVerifiedBlobsSlice(rb))
return rb, nil
}
func blobRequest(bwb []blocks2.BlockWithVerifiedBlobs, blobWindowStart primitives.Slot) *p2ppb.BlobSidecarsByRangeRequest {
if len(bwb) == 0 {
return nil
}
// Short-circuit if the highest block is before the deneb start epoch or retention period start.
// This assumes blocks are sorted by sortedBlockWithVerifiedBlobSlice.
highest := bwb[len(bwb)-1].Block.Block().Slot()
// bwb is sorted by slot, so if the last element is outside the retention window, no blobs are needed.
if highest < blobWindowStart {
return nil
}
lowest := lowestSlotNeedsBlob(blobWindowStart, bwb)
if lowest == nil {
return nil
}
return &p2ppb.BlobSidecarsByRangeRequest{
StartSlot: *lowest,
Count: uint64(highest.SubSlot(*lowest)) + 1,
}
}
func lowestSlotNeedsBlob(retentionStart primitives.Slot, bwb []blocks2.BlockWithVerifiedBlobs) *primitives.Slot {
i := sort.Search(len(bwb), func(i int) bool {
if bwb[i].Block.Block().Slot() < retentionStart {
return false
}
commits, err := bwb[i].Block.Block().Body().BlobKzgCommitments()
if err != nil || len(commits) == 0 {
return false
}
return true
})
if i >= len(bwb) {
return nil
}
s := bwb[i].Block.Block().Slot()
return &s
}
func sortBlobs(blobs []*p2ppb.BlobSidecar) []*p2ppb.BlobSidecar {
sort.Slice(blobs, func(i, j int) bool {
if blobs[i].Slot == blobs[j].Slot {
return blobs[i].Index < blobs[j].Index
}
return blobs[i].Slot < blobs[j].Slot
})
return blobs
}
var errBlobVerification = errors.New("peer unable to serve aligned BlobSidecarsByRange and BeaconBlockSidecarsByRange responses")
var errMissingBlobsForBlockCommitments = errors.Wrap(errBlobVerification, "blobs unavailable for processing block with kzg commitments")
var errMismatchedBlobBlockRoot = errors.Wrap(errBlobVerification, "BlockRoot in BlobSidecar does not match the expected root")
var errMissingBlobIndex = errors.Wrap(errBlobVerification, "missing expected blob index")
var errMismatchedBlobCommitments = errors.Wrap(errBlobVerification, "commitments at given slot, root and index do not match")
func verifyAndPopulateBlobs(bwb []blocks2.BlockWithVerifiedBlobs, blobs []*p2ppb.BlobSidecar, blobWindowStart primitives.Slot) ([]blocks2.BlockWithVerifiedBlobs, error) {
// Assumes bwb has already been sorted by sortedBlockWithVerifiedBlobSlice.
blobs = sortBlobs(blobs)
blobi := 0
// Loop over all blocks, and each time a commitment is observed, advance the index into the blob slice.
// The assumption is that the blob slice contains a value for every commitment in the blocks it is based on,
// correctly ordered by slot and blob index.
for i, bb := range bwb {
block := bb.Block.Block()
if block.Slot() < blobWindowStart {
continue
}
commits, err := block.Body().BlobKzgCommitments()
if err != nil {
if errors.Is(err, consensus_types.ErrUnsupportedField) {
log.
WithField("block_slot", block.Slot()).
WithField("retention_start", blobWindowStart).
Warn("block with slot within blob retention period has version which does not support commitments")
continue
}
return nil, err
}
bb.Blobs = make([]*p2ppb.BlobSidecar, len(commits))
for ci := range commits {
// There are more expected commitments in this block, but we've run out of blobs from the response
// (out-of-bound error guard).
if blobi == len(blobs) {
return nil, missingCommitError(bb.Block.Root(), commits[ci:])
}
bl := blobs[blobi]
if bl.Slot != block.Slot() {
return nil, missingCommitError(bb.Block.Root(), commits[ci:])
}
if bytesutil.ToBytes32(bl.BlockRoot) != bb.Block.Root() {
return nil, errors.Wrapf(errMismatchedBlobBlockRoot,
"block root %#x != BlobSidecar.BlockRoot %#x at slot %d", bb.Block.Root(), bl.BlockRoot, block.Slot())
}
if ci != int(bl.Index) {
return nil, errors.Wrapf(errMissingBlobIndex,
"did not receive blob index %d for block root %#x at slot %d", ci, bb.Block.Root(), block.Slot())
}
ec := bytesutil.ToBytes48(commits[ci])
ac := bytesutil.ToBytes48(bl.KzgCommitment)
if ec != ac {
return nil, errors.Wrapf(errMismatchedBlobCommitments,
"commitment %#x != block commitment %#x, at index %d for block root %#x at slot %d ",
ac, ec, bl.Index, bb.Block.Root(), block.Slot())
}
bb.Blobs[ci] = bl
blobi += 1
}
bwb[i] = bb
}
return bwb, nil
}
func missingCommitError(root [32]byte, missing [][]byte) error {
missStr := make([]string, len(missing))
for k := range missing {
missStr = append(missStr, fmt.Sprintf("%#x", k))
}
return errors.Wrapf(errMissingBlobsForBlockCommitments,
"block root %#x missing %d commitments %s", root, len(missing), strings.Join(missStr, ","))
}
// fetchBlobsFromPeer fetches blocks from a single randomly selected peer.
func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks2.BlockWithVerifiedBlobs, pid peer.ID) ([]blocks2.BlockWithVerifiedBlobs, error) {
ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlobsFromPeer")
defer span.End()
if slots.ToEpoch(f.clock.CurrentSlot()) < params.BeaconConfig().DenebForkEpoch {
return bwb, nil
}
blobWindowStart, err := prysmsync.BlobsByRangeMinStartSlot(f.clock.CurrentSlot())
if err != nil {
return nil, err
}
// Construct request message based on observed interval of blocks in need of blobs.
req := blobRequest(bwb, blobWindowStart)
if req == nil {
return bwb, nil
}
// Request blobs from the same peer that gave us the blob batch.
blobs, err := f.requestBlobs(ctx, req, pid)
if err != nil {
return nil, errors.Wrap(err, "could not request blobs by range")
}
f.p2p.Peers().Scorers().BlockProviderScorer().Touch(pid)
return verifyAndPopulateBlobs(bwb, blobs, blobWindowStart)
}
// requestBlocks is a wrapper for handling BeaconBlocksByRangeRequest requests/streams.
func (f *blocksFetcher) requestBlocks(
ctx context.Context,
@@ -334,6 +520,33 @@ func (f *blocksFetcher) requestBlocks(
return prysmsync.SendBeaconBlocksByRangeRequest(ctx, f.chain, f.p2p, pid, req, nil)
}
func (f *blocksFetcher) requestBlobs(ctx context.Context, req *p2ppb.BlobSidecarsByRangeRequest, pid peer.ID) ([]*p2ppb.BlobSidecar, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
l := f.peerLock(pid)
l.Lock()
log.WithFields(logrus.Fields{
"peer": pid,
"start": req.StartSlot,
"count": req.Count,
"capacity": f.rateLimiter.Remaining(pid.String()),
"score": f.p2p.Peers().Scorers().BlockProviderScorer().FormatScorePretty(pid),
}).Debug("Requesting blobs")
// We're intentionally abusing the block rate limit here, treating blob requests as if they were blob requests.
// Since blob requests take more bandwidth than blocks, we should improve how we account for the different kinds
// of requests, more in proportion to the cost of serving them.
if f.rateLimiter.Remaining(pid.String()) < int64(req.Count) {
if err := f.waitForBandwidth(pid, req.Count); err != nil {
l.Unlock()
return nil, err
}
}
f.rateLimiter.Add(pid.String(), int64(req.Count))
l.Unlock()
return prysmsync.SendBlobsByRangeRequest(ctx, f.clock, f.p2p, pid, f.ctxMap, req)
}
// requestBlocksByRoot is a wrapper for handling BeaconBlockByRootsReq requests/streams.
func (f *blocksFetcher) requestBlocksByRoot(
ctx context.Context,

View File

@@ -3,6 +3,7 @@ package initialsync
import (
"context"
"fmt"
"math/rand"
"sort"
"sync"
"testing"
@@ -23,6 +24,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
leakybucket "github.com/prysmaticlabs/prysm/v4/container/leaky-bucket"
"github.com/prysmaticlabs/prysm/v4/container/slice"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/testing/assert"
"github.com/prysmaticlabs/prysm/v4/testing/require"
@@ -273,6 +275,9 @@ func TestBlocksFetcher_RoundRobin(t *testing.T) {
st, err := util.NewBeaconState()
require.NoError(t, err)
gt := time.Now()
vr := [32]byte{}
clock := startup.NewClock(gt, vr)
mc := &mock.ChainService{
State: st,
Root: genesisRoot[:],
@@ -288,6 +293,7 @@ func TestBlocksFetcher_RoundRobin(t *testing.T) {
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
chain: mc,
p2p: p,
clock: clock,
})
require.NoError(t, fetcher.start())
@@ -299,9 +305,9 @@ func TestBlocksFetcher_RoundRobin(t *testing.T) {
fetcher.stop()
}()
processFetchedBlocks := func() ([]interfaces.ReadOnlySignedBeaconBlock, error) {
processFetchedBlocks := func() ([]blocks.BlockWithVerifiedBlobs, error) {
defer cancel()
var unionRespBlocks []interfaces.ReadOnlySignedBeaconBlock
var unionRespBlocks []blocks.BlockWithVerifiedBlobs
for {
select {
@@ -313,8 +319,8 @@ func TestBlocksFetcher_RoundRobin(t *testing.T) {
if resp.err != nil {
log.WithError(resp.err).Debug("Block fetcher returned error")
} else {
unionRespBlocks = append(unionRespBlocks, resp.blocks...)
if len(resp.blocks) == 0 {
unionRespBlocks = append(unionRespBlocks, resp.bwb...)
if len(resp.bwb) == 0 {
log.WithFields(logrus.Fields{
"start": resp.start,
"count": resp.count,
@@ -337,30 +343,27 @@ func TestBlocksFetcher_RoundRobin(t *testing.T) {
maxExpectedBlocks += requestParams.count
}
blocks, err := processFetchedBlocks()
bwb, err := processFetchedBlocks()
assert.NoError(t, err)
sort.Slice(blocks, func(i, j int) bool {
return blocks[i].Block().Slot() < blocks[j].Block().Slot()
})
ss := make([]primitives.Slot, len(blocks))
for i, block := range blocks {
ss[i] = block.Block().Slot()
sort.Sort(blocks.BlockWithVerifiedBlobsSlice(bwb))
ss := make([]primitives.Slot, len(bwb))
for i, b := range bwb {
ss[i] = b.Block.Block().Slot()
}
log.WithFields(logrus.Fields{
"blocksLen": len(blocks),
"blocksLen": len(bwb),
"slots": ss,
}).Debug("Finished block fetching")
if len(blocks) > int(maxExpectedBlocks) {
t.Errorf("Too many blocks returned. Wanted %d got %d", maxExpectedBlocks, len(blocks))
if len(bwb) > int(maxExpectedBlocks) {
t.Errorf("Too many blocks returned. Wanted %d got %d", maxExpectedBlocks, len(bwb))
}
assert.Equal(t, len(tt.expectedBlockSlots), len(blocks), "Processes wrong number of blocks")
assert.Equal(t, len(tt.expectedBlockSlots), len(bwb), "Processes wrong number of blocks")
var receivedBlockSlots []primitives.Slot
for _, blk := range blocks {
receivedBlockSlots = append(receivedBlockSlots, blk.Block().Slot())
for _, b := range bwb {
receivedBlockSlots = append(receivedBlockSlots, b.Block.Block().Slot())
}
missing := slice.NotSlot(slice.IntersectionSlot(tt.expectedBlockSlots, receivedBlockSlots), tt.expectedBlockSlots)
if len(missing) > 0 {
@@ -417,11 +420,13 @@ func TestBlocksFetcher_handleRequest(t *testing.T) {
mc, p2p, _ := initializeTestServices(t, chainConfig.expectedBlockSlots, chainConfig.peers)
mc.ValidatorsRoot = [32]byte{}
mc.Genesis = time.Now()
t.Run("context cancellation", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
chain: mc,
p2p: p2p,
clock: startup.NewClock(mc.Genesis, mc.ValidatorsRoot),
})
cancel()
@@ -435,6 +440,7 @@ func TestBlocksFetcher_handleRequest(t *testing.T) {
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
chain: mc,
p2p: p2p,
clock: startup.NewClock(mc.Genesis, mc.ValidatorsRoot),
})
requestCtx, reqCancel := context.WithTimeout(context.Background(), 2*time.Second)
@@ -447,7 +453,7 @@ func TestBlocksFetcher_handleRequest(t *testing.T) {
}
}()
var blocks []interfaces.ReadOnlySignedBeaconBlock
var bwb []blocks.BlockWithVerifiedBlobs
select {
case <-ctx.Done():
t.Error(ctx.Err())
@@ -455,16 +461,16 @@ func TestBlocksFetcher_handleRequest(t *testing.T) {
if resp.err != nil {
t.Error(resp.err)
} else {
blocks = resp.blocks
bwb = resp.bwb
}
}
if uint64(len(blocks)) != uint64(blockBatchLimit) {
t.Errorf("incorrect number of blocks returned, expected: %v, got: %v", blockBatchLimit, len(blocks))
if uint64(len(bwb)) != uint64(blockBatchLimit) {
t.Errorf("incorrect number of blocks returned, expected: %v, got: %v", blockBatchLimit, len(bwb))
}
var receivedBlockSlots []primitives.Slot
for _, blk := range blocks {
receivedBlockSlots = append(receivedBlockSlots, blk.Block().Slot())
for _, b := range bwb {
receivedBlockSlots = append(receivedBlockSlots, b.Block.Block().Slot())
}
missing := slice.NotSlot(slice.IntersectionSlot(chainConfig.expectedBlockSlots, receivedBlockSlots), chainConfig.expectedBlockSlots)
if len(missing) > 0 {
@@ -952,3 +958,171 @@ func TestTimeToWait(t *testing.T) {
})
}
}
func TestSortBlobs(t *testing.T) {
_, blobs := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, 10)
shuffled := make([]*ethpb.BlobSidecar, len(blobs))
for i := range blobs {
shuffled[i] = blobs[i]
}
rand.Shuffle(len(shuffled), func(i, j int) {
shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
})
sorted := sortBlobs(shuffled)
require.Equal(t, len(sorted), len(shuffled))
for i := range blobs {
expect := blobs[i]
actual := sorted[i]
require.Equal(t, expect.Slot, actual.Slot)
require.Equal(t, expect.Index, actual.Index)
require.Equal(t, bytesutil.ToBytes48(expect.KzgCommitment), bytesutil.ToBytes48(actual.KzgCommitment))
require.Equal(t, bytesutil.ToBytes32(expect.BlockRoot), bytesutil.ToBytes32(actual.BlockRoot))
}
}
func TestLowestSlotNeedsBlob(t *testing.T) {
blks, _ := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, 10)
sbbs := make([]interfaces.ReadOnlySignedBeaconBlock, len(blks))
for i := range blks {
sbbs[i] = blks[i]
}
retentionStart := primitives.Slot(5)
bwb, err := sortedBlockWithVerifiedBlobSlice(sbbs)
require.NoError(t, err)
lowest := lowestSlotNeedsBlob(retentionStart, bwb)
require.Equal(t, retentionStart, *lowest)
higher := primitives.Slot(len(blks) + 1)
lowest = lowestSlotNeedsBlob(higher, bwb)
var nilSlot *primitives.Slot
require.Equal(t, nilSlot, lowest)
blks, _ = util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, 10)
sbbs = make([]interfaces.ReadOnlySignedBeaconBlock, len(blks))
for i := range blks {
sbbs[i] = blks[i]
}
bwb, err = sortedBlockWithVerifiedBlobSlice(sbbs)
require.NoError(t, err)
retentionStart = bwb[5].Block.Block().Slot()
next := bwb[6].Block.Block().Slot()
skip := bwb[5].Block.Block()
bwb[5].Block, _ = util.GenerateTestDenebBlockWithSidecar(t, skip.ParentRoot(), skip.Slot(), 0)
lowest = lowestSlotNeedsBlob(retentionStart, bwb)
require.Equal(t, next, *lowest)
}
func TestBlobRequest(t *testing.T) {
var nilReq *ethpb.BlobSidecarsByRangeRequest
// no blocks
req := blobRequest([]blocks.BlockWithVerifiedBlobs{}, 0)
require.Equal(t, nilReq, req)
blks, _ := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, 10)
sbbs := make([]interfaces.ReadOnlySignedBeaconBlock, len(blks))
for i := range blks {
sbbs[i] = blks[i]
}
bwb, err := sortedBlockWithVerifiedBlobSlice(sbbs)
require.NoError(t, err)
maxBlkSlot := primitives.Slot(len(blks) - 1)
tooHigh := primitives.Slot(len(blks) + 1)
req = blobRequest(bwb, tooHigh)
require.Equal(t, nilReq, req)
req = blobRequest(bwb, maxBlkSlot)
require.Equal(t, uint64(1), req.Count)
require.Equal(t, maxBlkSlot, req.StartSlot)
halfway := primitives.Slot(5)
req = blobRequest(bwb, halfway)
require.Equal(t, halfway, req.StartSlot)
// adding 1 to include the halfway slot itself
require.Equal(t, uint64(1+maxBlkSlot-halfway), req.Count)
before := bwb[0].Block.Block().Slot()
allAfter := bwb[1:]
req = blobRequest(allAfter, before)
require.Equal(t, allAfter[0].Block.Block().Slot(), req.StartSlot)
require.Equal(t, len(allAfter), int(req.Count))
}
func testSequenceBlockWithBlob(t *testing.T, nblocks int) ([]blocks.BlockWithVerifiedBlobs, []*ethpb.BlobSidecar) {
blks, blobs := util.ExtendBlocksPlusBlobs(t, []blocks.ROBlock{}, nblocks)
sbbs := make([]interfaces.ReadOnlySignedBeaconBlock, len(blks))
for i := range blks {
sbbs[i] = blks[i]
}
bwb, err := sortedBlockWithVerifiedBlobSlice(sbbs)
require.NoError(t, err)
return bwb, blobs
}
func TestVerifyAndPopulateBlobs(t *testing.T) {
bwb, blobs := testSequenceBlockWithBlob(t, 10)
lastBlobIdx := len(blobs) - 1
// Blocks are all before the retention window, blobs argument is ignored.
windowAfter := bwb[len(bwb)-1].Block.Block().Slot() + 1
_, err := verifyAndPopulateBlobs(bwb, nil, windowAfter)
require.NoError(t, err)
firstBlockSlot := bwb[0].Block.Block().Slot()
// slice off blobs for the last block so we hit the out of bounds / blob exhaustion check.
_, err = verifyAndPopulateBlobs(bwb, blobs[0:len(blobs)-6], firstBlockSlot)
require.ErrorIs(t, err, errMissingBlobsForBlockCommitments)
bwb, blobs = testSequenceBlockWithBlob(t, 10)
// Misalign the slots of the blobs for the first block to simulate them being missing from the response.
offByOne := blobs[0].Slot
for i := range blobs {
if blobs[i].Slot == offByOne {
blobs[i].Slot = offByOne + 1
}
}
_, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot)
require.ErrorIs(t, err, errMissingBlobsForBlockCommitments)
bwb, blobs = testSequenceBlockWithBlob(t, 10)
blobs[lastBlobIdx].BlockRoot = blobs[0].BlockRoot
_, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot)
require.ErrorIs(t, err, errMismatchedBlobBlockRoot)
bwb, blobs = testSequenceBlockWithBlob(t, 10)
blobs[lastBlobIdx].Index = 100
_, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot)
require.ErrorIs(t, err, errMissingBlobIndex)
var emptyKzg [48]byte
bwb, blobs = testSequenceBlockWithBlob(t, 10)
blobs[lastBlobIdx].KzgCommitment = emptyKzg[:]
_, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot)
require.ErrorIs(t, err, errMismatchedBlobCommitments)
// happy path
bwb, blobs = testSequenceBlockWithBlob(t, 10)
expectedCommits := make(map[[48]byte]bool)
for _, bl := range blobs {
expectedCommits[bytesutil.ToBytes48(bl.KzgCommitment)] = true
}
// The assertions using this map expect all commitments to be unique, so make sure that stays true.
require.Equal(t, len(blobs), len(expectedCommits))
bwb, err = verifyAndPopulateBlobs(bwb, blobs, firstBlockSlot)
require.NoError(t, err)
for _, bw := range bwb {
commits, err := bw.Block.Block().Body().BlobKzgCommitments()
require.NoError(t, err)
require.Equal(t, len(commits), len(bw.Blobs))
for i := range commits {
bc := bytesutil.ToBytes48(commits[i])
require.Equal(t, bc, bytesutil.ToBytes48(bw.Blobs[i].KzgCommitment))
// Since we delete entries we've seen, duplicates will cause an error here.
_, ok := expectedCommits[bc]
// Make sure this was an expected delete, then delete it from the map so we can make sure we saw all of them.
require.Equal(t, true, ok)
delete(expectedCommits, bc)
}
}
// We delete each entry we've seen, so if we see all expected commits, the map should be empty at the end.
require.Equal(t, 0, len(expectedCommits))
}

View File

@@ -3,13 +3,13 @@ package initialsync
import (
"context"
"fmt"
"sort"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
p2pTypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
p2ppb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
@@ -22,8 +22,8 @@ import (
// Blocks are stored in an ascending slot order. The first block is guaranteed to have parent
// either in DB or initial sync cache.
type forkData struct {
peer peer.ID
blocks []interfaces.ReadOnlySignedBeaconBlock
peer peer.ID
bwb []blocks.BlockWithVerifiedBlobs
}
// nonSkippedSlotAfter checks slots after the given one in an attempt to find a non-empty future slot.
@@ -200,13 +200,20 @@ func (f *blocksFetcher) findFork(ctx context.Context, slot primitives.Slot) (*fo
return nil, errNoPeersWithAltBlocks
}
var errNoAlternateBlocks = errors.New("no alternative blocks exist within scanned range")
func findForkReqRangeSize() uint64 {
return uint64(params.BeaconConfig().SlotsPerEpoch.Mul(2))
}
// findForkWithPeer loads some blocks from a peer in an attempt to find alternative blocks.
func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot primitives.Slot) (*forkData, error) {
reqCount := findForkReqRangeSize()
// Safe-guard, since previous epoch is used when calculating.
slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch
if slot < slotsPerEpoch*2 {
return nil, fmt.Errorf("slot is too low to backtrack, min. expected %d", slotsPerEpoch*2)
if uint64(slot) < reqCount {
return nil, fmt.Errorf("slot is too low to backtrack, min. expected %d", reqCount)
}
slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch
// Locate non-skipped slot, supported by a given peer (can survive long periods of empty slots).
// When searching for non-empty slot, start an epoch earlier - for those blocks we
@@ -226,37 +233,62 @@ func (f *blocksFetcher) findForkWithPeer(ctx context.Context, pid peer.ID, slot
// Request blocks starting from the first non-empty slot.
req := &p2ppb.BeaconBlocksByRangeRequest{
StartSlot: nonSkippedSlot,
Count: uint64(slotsPerEpoch.Mul(2)),
Count: reqCount,
Step: 1,
}
blocks, err := f.requestBlocks(ctx, req, pid)
if err != nil {
return nil, fmt.Errorf("cannot fetch blocks: %w", err)
}
if len(blocks) == 0 {
return nil, errNoAlternateBlocks
}
// If the first block is not connected to the current canonical chain, we'll stop processing this batch.
// Instead, we'll work backwards from the first block until we find a common ancestor,
// and then begin processing from there.
first := blocks[0]
if !f.chain.HasBlock(ctx, first.Block().ParentRoot()) {
// Backtrack on a root, to find a common ancestor from which we can resume syncing.
fork, err := f.findAncestor(ctx, pid, first)
if err != nil {
return nil, fmt.Errorf("failed to find common ancestor: %w", err)
}
return fork, nil
}
// Traverse blocks, and if we've got one that doesn't have parent in DB, backtrack on it.
for i, block := range blocks {
// Note that we start from the second element in the array, because we know that the first element is in the db,
// otherwise we would have gone into the findAncestor early return path above.
for i := 1; i < len(blocks); i++ {
block := blocks[i]
parentRoot := block.Block().ParentRoot()
if !f.chain.HasBlock(ctx, parentRoot) {
log.WithFields(logrus.Fields{
"peer": pid,
"slot": block.Block().Slot(),
"root": fmt.Sprintf("%#x", parentRoot),
}).Debug("Block with unknown parent root has been found")
// Backtrack only if the first block is diverging,
// otherwise we already know the common ancestor slot.
if i == 0 {
// Backtrack on a root, to find a common ancestor from which we can resume syncing.
fork, err := f.findAncestor(ctx, pid, block)
if err != nil {
return nil, fmt.Errorf("failed to find common ancestor: %w", err)
}
return fork, nil
}
return &forkData{peer: pid, blocks: blocks}, nil
// Step through blocks until we find one that is not in the chain. The goal is to find the point where the
// chain observed in the peer diverges from the locally known chain, and then collect up the remainder of the
// observed chain chunk to start initial-sync processing from the fork point.
if f.chain.HasBlock(ctx, parentRoot) {
continue
}
log.WithFields(logrus.Fields{
"peer": pid,
"slot": block.Block().Slot(),
"root": fmt.Sprintf("%#x", parentRoot),
}).Debug("Block with unknown parent root has been found")
altBlocks, err := sortedBlockWithVerifiedBlobSlice(blocks[i-1:])
if err != nil {
return nil, errors.Wrap(err, "invalid blocks received in findForkWithPeer")
}
// We need to fetch the blobs for the given alt-chain if any exist, so that we can try to verify and import
// the blocks.
bwb, err := f.fetchBlobsFromPeer(ctx, altBlocks, pid)
if err != nil {
return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findForkWithPeer")
}
// The caller will use the BlocksWith VerifiedBlobs in bwb as the starting point for
// round-robin syncing the alternate chain.
return &forkData{peer: pid, bwb: bwb}, nil
}
return nil, errors.New("no alternative blocks exist within scanned range")
return nil, errNoAlternateBlocks
}
// findAncestor tries to figure out common ancestor slot that connects a given root to known block.
@@ -266,12 +298,17 @@ func (f *blocksFetcher) findAncestor(ctx context.Context, pid peer.ID, b interfa
parentRoot := outBlocks[len(outBlocks)-1].Block().ParentRoot()
if f.chain.HasBlock(ctx, parentRoot) {
// Common ancestor found, forward blocks back to processor.
sort.Slice(outBlocks, func(i, j int) bool {
return outBlocks[i].Block().Slot() < outBlocks[j].Block().Slot()
})
bwb, err := sortedBlockWithVerifiedBlobSlice(outBlocks)
if err != nil {
return nil, errors.Wrap(err, "received invalid blocks in findAncestor")
}
bwb, err = f.fetchBlobsFromPeer(ctx, bwb, pid)
if err != nil {
return nil, errors.Wrap(err, "unable to retrieve blobs for blocks found in findAncestor")
}
return &forkData{
peer: pid,
blocks: outBlocks,
peer: pid,
bwb: bwb,
}, nil
}
// Request block's parent.

View File

@@ -13,6 +13,8 @@ import (
dbtest "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing"
p2pm "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
p2pt "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
p2pTypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
@@ -187,6 +189,7 @@ func TestBlocksFetcher_findFork(t *testing.T) {
ctx,
&blocksFetcherConfig{
chain: mc,
clock: startup.NewClock(mc.Genesis, mc.ValidatorsRoot),
p2p: p2p,
db: beaconDB,
},
@@ -252,10 +255,14 @@ func TestBlocksFetcher_findFork(t *testing.T) {
// is smart enough to link back to common ancestor, w/o discriminating between forks. This is
// by design: fork exploration is undertaken when FSMs are stuck, so any progress is good.
chain1b := extendBlockSequence(t, chain1, 64)
forkSlot1b := primitives.Slot(len(chain1))
curForkMoreBlocksPeer := connectPeerHavingBlocks(t, p2p, chain1b, finalizedSlot, p2p.Peers())
fork, err = fetcher.findFork(ctx, 251)
require.NoError(t, err)
require.Equal(t, 64, len(fork.blocks))
reqEnd := testForkStartSlot(t, 251) + primitives.Slot(findForkReqRangeSize())
require.Equal(t, primitives.Slot(len(chain1)), fork.bwb[0].Block.Block().Slot())
require.Equal(t, int(reqEnd-forkSlot1b), len(fork.bwb))
require.Equal(t, curForkMoreBlocksPeer, fork.peer)
// Save all chain1b blocks (so that they do not interfere with alternative fork)
for _, blk := range chain1b {
@@ -277,20 +284,20 @@ func TestBlocksFetcher_findFork(t *testing.T) {
fork, err = fetcher.findFork(ctx, 251)
require.NoError(t, err)
assert.Equal(t, alternativePeer, fork.peer)
assert.Equal(t, 65, len(fork.blocks))
assert.Equal(t, 65, len(fork.bwb))
ind := forkSlot
for _, blk := range fork.blocks {
require.Equal(t, blk.Block().Slot(), chain2[ind].Block.Slot)
for _, blk := range fork.bwb {
require.Equal(t, blk.Block.Block().Slot(), chain2[ind].Block.Slot)
ind++
}
// Process returned blocks and then attempt to extend chain (ensuring that parent block exists).
for _, blk := range fork.blocks {
require.NoError(t, beaconDB.SaveBlock(ctx, blk))
require.NoError(t, st.SetSlot(blk.Block().Slot()))
for _, blk := range fork.bwb {
require.NoError(t, beaconDB.SaveBlock(ctx, blk.Block))
require.NoError(t, st.SetSlot(blk.Block.Block().Slot()))
}
assert.Equal(t, forkSlot.Add(uint64(len(fork.blocks)-1)), mc.HeadSlot())
for i := forkSlot.Add(uint64(len(fork.blocks))); i < primitives.Slot(len(chain2)); i++ {
assert.Equal(t, forkSlot.Add(uint64(len(fork.bwb)-1)), mc.HeadSlot())
for i := forkSlot.Add(uint64(len(fork.bwb))); i < primitives.Slot(len(chain2)); i++ {
blk := chain2[i]
require.Equal(t, blk.Block.Slot, i, "incorrect block selected for slot %d", i)
// Only save is parent block exists.
@@ -309,6 +316,21 @@ func TestBlocksFetcher_findFork(t *testing.T) {
}
}
func testForkStartSlot(t *testing.T, slot primitives.Slot) primitives.Slot {
// When we call find fork, the way we find the first common ancestor is:
// - start at the first slot of the given epoch (epochStart below)
// - look back an additional epoch, which findForkWithPeer comments say is done to ensure
// there will be more overlap, since this routine kicks off when we're losing sight of the canonical chain
// - add one: this happens in nonSkippedSlotAfterWithPeersTarget and isn't really explained why this method takes
// the slot before the beginning of the search range and not the actual search range.
// Anyway, since findFork returns the last common ancestor between the known chain and the forked chain,
// we need to figure out how much the two overlap so we can ignore the common blocks when looking at the
// size of the response.
epochStart, err := slots.EpochStart(slots.ToEpoch(slot))
require.NoError(t, err)
return 1 + (epochStart - params.BeaconConfig().SlotsPerEpoch)
}
func TestBlocksFetcher_findForkWithPeer(t *testing.T) {
beaconDB := dbtest.SetupDB(t)
p1 := p2pt.NewTestP2P(t)
@@ -335,6 +357,7 @@ func TestBlocksFetcher_findForkWithPeer(t *testing.T) {
ctx,
&blocksFetcherConfig{
chain: mc,
clock: startup.NewClock(mc.Genesis, mc.ValidatorsRoot),
p2p: p1,
db: beaconDB,
},
@@ -392,8 +415,8 @@ func TestBlocksFetcher_findForkWithPeer(t *testing.T) {
}()
fork, err := fetcher.findForkWithPeer(ctx, p2, 64)
require.NoError(t, err)
require.Equal(t, 10, len(fork.blocks))
assert.Equal(t, forkedSlot, fork.blocks[0].Block().Slot(), "Expected slot %d to be ancestor", forkedSlot)
require.Equal(t, 10, len(fork.bwb))
assert.Equal(t, forkedSlot, fork.bwb[0].Block.Block().Slot(), "Expected slot %d to be ancestor", forkedSlot)
})
t.Run("first block is diverging - no common ancestor", func(t *testing.T) {
@@ -417,11 +440,32 @@ func TestBlocksFetcher_findForkWithPeer(t *testing.T) {
}()
fork, err := fetcher.findForkWithPeer(ctx, p2, 64)
require.NoError(t, err)
require.Equal(t, 64, len(fork.blocks))
assert.Equal(t, primitives.Slot(33), fork.blocks[0].Block().Slot())
reqEnd := testForkStartSlot(t, 64) + primitives.Slot(findForkReqRangeSize())
expectedLen := reqEnd - forkedSlot
// there are 4 blocks that are different before the beginning
require.Equal(t, int(expectedLen), len(fork.bwb))
assert.Equal(t, primitives.Slot(60), fork.bwb[0].Block.Block().Slot())
})
}
func TestTestForkStartSlot(t *testing.T) {
require.Equal(t, primitives.Slot(33), testForkStartSlot(t, 64))
require.Equal(t, primitives.Slot(193), testForkStartSlot(t, 251))
}
func consumeBlockRootRequest(t *testing.T, p *p2pt.TestP2P) func(network.Stream) {
return func(stream network.Stream) {
defer func() {
_err := stream.Close()
_ = _err
}()
req := new(p2pTypes.BeaconBlockByRootsReq)
assert.NoError(t, p.Encoding().DecodeWithMaxLength(stream, req))
}
}
func TestBlocksFetcher_findAncestor(t *testing.T) {
beaconDB := dbtest.SetupDB(t)
p2p := p2pt.NewTestP2P(t)
@@ -476,9 +520,7 @@ func TestBlocksFetcher_findAncestor(t *testing.T) {
p2 := p2pt.NewTestP2P(t)
p2p.Connect(p2)
p2.SetStreamHandler(pcl, func(stream network.Stream) {
assert.NoError(t, stream.Close())
})
p2.SetStreamHandler(pcl, consumeBlockRootRequest(t, p2))
wsb, err := blocks.NewSignedBeaconBlock(knownBlocks[4])
require.NoError(t, err)

View File

@@ -8,8 +8,9 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
beaconsync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"github.com/sirupsen/logrus"
@@ -62,6 +63,8 @@ type syncMode uint8
type blocksQueueConfig struct {
blocksFetcher *blocksFetcher
chain blockchainService
clock *startup.Clock
ctxMap beaconsync.ContextByteVersions
highestExpectedSlot primitives.Slot
p2p p2p.P2P
db db.ReadOnlyDatabase
@@ -88,8 +91,8 @@ type blocksQueue struct {
// blocksQueueFetchedData is a data container that is returned from a queue on each step.
type blocksQueueFetchedData struct {
pid peer.ID
blocks []interfaces.ReadOnlySignedBeaconBlock
pid peer.ID
bwb []blocks.BlockWithVerifiedBlobs
}
// newBlocksQueue creates initialized priority queue.
@@ -99,9 +102,11 @@ func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue {
blocksFetcher := cfg.blocksFetcher
if blocksFetcher == nil {
blocksFetcher = newBlocksFetcher(ctx, &blocksFetcherConfig{
chain: cfg.chain,
p2p: cfg.p2p,
db: cfg.db,
ctxMap: cfg.ctxMap,
chain: cfg.chain,
p2p: cfg.p2p,
db: cfg.db,
clock: cfg.clock,
})
}
highestExpectedSlot := cfg.highestExpectedSlot
@@ -316,15 +321,15 @@ func (q *blocksQueue) onDataReceivedEvent(ctx context.Context) eventHandlerFn {
return m.state, errInputNotFetchRequestParams
}
if response.err != nil {
switch response.err {
case errSlotIsTooHigh:
if errors.Is(response.err, errSlotIsTooHigh) {
// Current window is already too big, re-request previous epochs.
for _, fsm := range q.smm.machines {
if fsm.start < response.start && fsm.state == stateSkipped {
fsm.setState(stateNew)
}
}
case beaconsync.ErrInvalidFetchedData:
}
if errors.Is(response.err, beaconsync.ErrInvalidFetchedData) {
// Peer returned invalid data, penalize.
q.blocksFetcher.p2p.Peers().Scorers().BadResponsesScorer().Increment(m.pid)
log.WithField("pid", response.pid).Debug("Peer is penalized for invalid blocks")
@@ -332,7 +337,7 @@ func (q *blocksQueue) onDataReceivedEvent(ctx context.Context) eventHandlerFn {
return m.state, response.err
}
m.pid = response.pid
m.blocks = response.blocks
m.bwb = response.bwb
return stateDataParsed, nil
}
}
@@ -347,14 +352,14 @@ func (q *blocksQueue) onReadyToSendEvent(ctx context.Context) eventHandlerFn {
return m.state, errInvalidInitialState
}
if len(m.blocks) == 0 {
if len(m.bwb) == 0 {
return stateSkipped, nil
}
send := func() (stateID, error) {
data := &blocksQueueFetchedData{
pid: m.pid,
blocks: m.blocks,
pid: m.pid,
bwb: m.bwb,
}
select {
case <-ctx.Done():

View File

@@ -11,6 +11,7 @@ import (
dbtest "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/peers"
p2pt "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
beaconsync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
@@ -37,6 +38,7 @@ func TestBlocksQueue_InitStartStop(t *testing.T) {
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
chain: mc,
p2p: p2p,
clock: startup.NewClock(mc.Genesis, mc.ValidatorsRoot),
})
t.Run("stop without start", func(t *testing.T) {
@@ -126,6 +128,7 @@ func TestBlocksQueue_InitStartStop(t *testing.T) {
blocksFetcher: fetcher,
chain: mc,
highestExpectedSlot: primitives.Slot(blockBatchLimit),
clock: startup.NewClock(mc.Genesis, mc.ValidatorsRoot),
})
assert.NoError(t, queue.start())
cancel()
@@ -252,6 +255,7 @@ func TestBlocksQueue_Loop(t *testing.T) {
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
chain: mc,
p2p: p2p,
clock: startup.NewClock(mc.Genesis, mc.ValidatorsRoot),
})
queue := newBlocksQueue(ctx, &blocksQueueConfig{
blocksFetcher: fetcher,
@@ -259,7 +263,8 @@ func TestBlocksQueue_Loop(t *testing.T) {
highestExpectedSlot: tt.highestExpectedSlot,
})
assert.NoError(t, queue.start())
processBlock := func(block interfaces.ReadOnlySignedBeaconBlock) error {
processBlock := func(b blocks.BlockWithVerifiedBlobs) error {
block := b.Block
if !beaconDB.HasBlock(ctx, block.Block().ParentRoot()) {
return fmt.Errorf("%w: %#x", errParentDoesNotExist, block.Block().ParentRoot())
}
@@ -270,13 +275,13 @@ func TestBlocksQueue_Loop(t *testing.T) {
return mc.ReceiveBlock(ctx, block, root)
}
var blocks []interfaces.ReadOnlySignedBeaconBlock
var blocks []blocks.BlockWithVerifiedBlobs
for data := range queue.fetchedData {
for _, block := range data.blocks {
if err := processBlock(block); err != nil {
for _, b := range data.bwb {
if err := processBlock(b); err != nil {
continue
}
blocks = append(blocks, block)
blocks = append(blocks, b)
}
}
@@ -288,8 +293,8 @@ func TestBlocksQueue_Loop(t *testing.T) {
}
assert.Equal(t, len(tt.expectedBlockSlots), len(blocks), "Processes wrong number of blocks")
var receivedBlockSlots []primitives.Slot
for _, blk := range blocks {
receivedBlockSlots = append(receivedBlockSlots, blk.Block().Slot())
for _, b := range blocks {
receivedBlockSlots = append(receivedBlockSlots, b.Block.Block().Slot())
}
missing := slice.NotSlot(slice.IntersectionSlot(tt.expectedBlockSlots, receivedBlockSlots), tt.expectedBlockSlots)
if len(missing) > 0 {
@@ -533,21 +538,21 @@ func TestBlocksQueue_onDataReceivedEvent(t *testing.T) {
require.NoError(t, err)
response := &fetchRequestResponse{
pid: "abc",
blocks: []interfaces.ReadOnlySignedBeaconBlock{
wsb,
wsbCopy,
bwb: []blocks.BlockWithVerifiedBlobs{
{Block: blocks.ROBlock{ReadOnlySignedBeaconBlock: wsb}},
{Block: blocks.ROBlock{ReadOnlySignedBeaconBlock: wsbCopy}},
},
}
fsm := &stateMachine{
state: stateScheduled,
}
assert.Equal(t, peer.ID(""), fsm.pid)
assert.DeepSSZEqual(t, []interfaces.ReadOnlySignedBeaconBlock(nil), fsm.blocks)
assert.Equal(t, 0, len(fsm.bwb))
updatedState, err := handlerFn(fsm, response)
assert.NoError(t, err)
assert.Equal(t, stateDataParsed, updatedState)
assert.Equal(t, response.pid, fsm.pid)
assert.DeepSSZEqual(t, response.blocks, fsm.blocks)
assert.DeepSSZEqual(t, response.bwb, fsm.bwb)
})
}
@@ -631,8 +636,10 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) {
queue.smm.addStateMachine(320)
queue.smm.machines[256].state = stateDataParsed
queue.smm.machines[256].pid = pidDataParsed
queue.smm.machines[256].blocks = []interfaces.ReadOnlySignedBeaconBlock{
wsb,
rwsb, err := blocks.NewROBlock(wsb)
require.NoError(t, err)
queue.smm.machines[256].bwb = []blocks.BlockWithVerifiedBlobs{
{Block: rwsb},
}
handlerFn := queue.onReadyToSendEvent(ctx)
@@ -663,8 +670,10 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) {
queue.smm.addStateMachine(320)
queue.smm.machines[320].state = stateDataParsed
queue.smm.machines[320].pid = pidDataParsed
queue.smm.machines[320].blocks = []interfaces.ReadOnlySignedBeaconBlock{
wsb,
rwsb, err := blocks.NewROBlock(wsb)
require.NoError(t, err)
queue.smm.machines[320].bwb = []blocks.BlockWithVerifiedBlobs{
{Block: rwsb},
}
handlerFn := queue.onReadyToSendEvent(ctx)
@@ -692,8 +701,10 @@ func TestBlocksQueue_onReadyToSendEvent(t *testing.T) {
queue.smm.addStateMachine(320)
queue.smm.machines[320].state = stateDataParsed
queue.smm.machines[320].pid = pidDataParsed
queue.smm.machines[320].blocks = []interfaces.ReadOnlySignedBeaconBlock{
wsb,
rwsb, err := blocks.NewROBlock(wsb)
require.NoError(t, err)
queue.smm.machines[320].bwb = []blocks.BlockWithVerifiedBlobs{
{Block: rwsb},
}
handlerFn := queue.onReadyToSendEvent(ctx)
@@ -1078,6 +1089,7 @@ func TestBlocksQueue_stuckInUnfavourableFork(t *testing.T) {
chain: mc,
p2p: p2p,
db: beaconDB,
clock: startup.NewClock(mc.Genesis, mc.ValidatorsRoot),
},
)
fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, 1*time.Second, false)
@@ -1212,27 +1224,28 @@ func TestBlocksQueue_stuckInUnfavourableFork(t *testing.T) {
// required forked data, including data on and after slot 201.
forkedEpochStartSlot, err := slots.EpochStart(slots.ToEpoch(forkedSlot))
require.NoError(t, err)
firstFSM, ok := queue.smm.findStateMachine(forkedEpochStartSlot + 1)
firstFSM, ok := queue.smm.findStateMachine(forkedSlot)
require.Equal(t, true, ok)
require.Equal(t, stateDataParsed, firstFSM.state)
require.Equal(t, forkedPeer, firstFSM.pid)
require.Equal(t, 64, len(firstFSM.blocks))
require.Equal(t, forkedEpochStartSlot+1, firstFSM.blocks[0].Block().Slot())
reqEnd := testForkStartSlot(t, 251) + primitives.Slot(findForkReqRangeSize())
require.Equal(t, int(reqEnd-forkedSlot), len(firstFSM.bwb))
require.Equal(t, forkedSlot, firstFSM.bwb[0].Block.Block().Slot())
// Assert that forked data from chain2 is available (within 64 fetched blocks).
for i, blk := range chain2[forkedEpochStartSlot+1:] {
if i >= len(firstFSM.blocks) {
for i, blk := range chain2[forkedSlot:] {
if i >= len(firstFSM.bwb) {
break
}
rootFromFSM, err := firstFSM.blocks[i].Block().HashTreeRoot()
require.NoError(t, err)
rootFromFSM := firstFSM.bwb[i].Block.Root()
blkRoot, err := blk.Block.HashTreeRoot()
require.NoError(t, err)
assert.Equal(t, blkRoot, rootFromFSM)
}
// Assert that machines are in the expected state.
startSlot = forkedEpochStartSlot.Add(1 + uint64(len(firstFSM.blocks)))
startSlot = forkedEpochStartSlot.Add(1 + blocksPerRequest)
require.Equal(t, int(blocksPerRequest)-int(forkedSlot-(forkedEpochStartSlot+1)), len(firstFSM.bwb))
for i := startSlot; i < startSlot.Add(blocksPerRequest*(lookaheadSteps-1)); i += primitives.Slot(blocksPerRequest) {
fsm, ok := queue.smm.findStateMachine(i)
require.Equal(t, true, ok)
@@ -1298,6 +1311,7 @@ func TestBlocksQueue_stuckWhenHeadIsSetToOrphanedBlock(t *testing.T) {
chain: mc,
p2p: p2p,
db: beaconDB,
clock: startup.NewClock(mc.Genesis, mc.ValidatorsRoot),
},
)
fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, 1*time.Second, false)
@@ -1333,7 +1347,8 @@ func TestBlocksQueue_stuckWhenHeadIsSetToOrphanedBlock(t *testing.T) {
case <-time.After(3 * time.Second):
t.Fatal("test takes too long to complete")
case data := <-queue.fetchedData:
for _, blk := range data.blocks {
for _, b := range data.bwb {
blk := b.Block
blkRoot, err := blk.Block().HashTreeRoot()
require.NoError(t, err)
if isProcessedBlock(ctx, blk, blkRoot) {

View File

@@ -14,13 +14,10 @@ func (q *blocksQueue) resetFromFork(fork *forkData) error {
if fork == nil {
return errors.New("nil fork data")
}
if len(fork.blocks) == 0 {
if len(fork.bwb) == 0 {
return errors.New("no blocks to reset from")
}
firstBlock := fork.blocks[0].Block()
if firstBlock == nil || firstBlock.IsNil() {
return errors.New("invalid first block in fork data")
}
firstBlock := fork.bwb[0].Block.Block()
blocksPerRequest := q.blocksFetcher.blocksPerPeriod
if err := q.smm.removeAllStateMachines(); err != nil {
@@ -28,11 +25,11 @@ func (q *blocksQueue) resetFromFork(fork *forkData) error {
}
fsm := q.smm.addStateMachine(firstBlock.Slot())
fsm.pid = fork.peer
fsm.blocks = fork.blocks
fsm.bwb = fork.bwb
fsm.state = stateDataParsed
// The rest of machines are in skipped state.
startSlot := firstBlock.Slot().Add(uint64(len(fork.blocks)))
startSlot := firstBlock.Slot().Add(uint64(len(fork.bwb)))
for i := startSlot; i < startSlot.Add(blocksPerRequest*(lookaheadSteps-1)); i += primitives.Slot(blocksPerRequest) {
fsm := q.smm.addStateMachine(i)
fsm.state = stateSkipped

View File

@@ -7,7 +7,7 @@ import (
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
prysmTime "github.com/prysmaticlabs/prysm/v4/time"
"github.com/prysmaticlabs/prysm/v4/time/slots"
@@ -46,7 +46,7 @@ type stateMachine struct {
start primitives.Slot
state stateID
pid peer.ID
blocks []interfaces.ReadOnlySignedBeaconBlock
bwb []blocks.BlockWithVerifiedBlobs
updated time.Time
}
@@ -78,7 +78,7 @@ func (smm *stateMachineManager) addStateMachine(startSlot primitives.Slot) *stat
smm: smm,
start: startSlot,
state: stateNew,
blocks: []interfaces.ReadOnlySignedBeaconBlock{},
bwb: []blocks.BlockWithVerifiedBlobs{},
updated: prysmTime.Now(),
}
smm.recalculateMachineAttribs()
@@ -90,7 +90,7 @@ func (smm *stateMachineManager) removeStateMachine(startSlot primitives.Slot) er
if _, ok := smm.machines[startSlot]; !ok {
return fmt.Errorf("state for machine %v is not found", startSlot)
}
smm.machines[startSlot].blocks = nil
smm.machines[startSlot].bwb = nil
delete(smm.machines, startSlot)
smm.recalculateMachineAttribs()
return nil

View File

@@ -3,13 +3,15 @@ package initialsync
import (
"context"
"encoding/hex"
"errors"
"fmt"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/paulbellamy/ratecounter"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/time/slots"
@@ -25,7 +27,7 @@ const (
type blockReceiverFn func(ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error
// batchBlockReceiverFn defines batch receiving function.
type batchBlockReceiverFn func(ctx context.Context, blks []interfaces.ReadOnlySignedBeaconBlock, roots [][32]byte) error
type batchBlockReceiverFn func(ctx context.Context, blks []blocks.ROBlock) error
// Round Robin sync looks at the latest peer statuses and syncs up to the highest known epoch.
//
@@ -69,10 +71,18 @@ func (s *Service) syncToFinalizedEpoch(ctx context.Context, genesis time.Time) e
log.Debug("Already synced to finalized epoch")
return nil
}
vr := s.clock.GenesisValidatorsRoot()
ctxMap, err := sync.ContextByteVersionsForValRoot(vr)
if err != nil {
return errors.Wrapf(err, "unable to initialize context version map using genesis validator root = %#x", vr)
}
queue := newBlocksQueue(ctx, &blocksQueueConfig{
p2p: s.cfg.P2P,
db: s.cfg.DB,
chain: s.cfg.Chain,
clock: s.clock,
ctxMap: ctxMap,
highestExpectedSlot: highestFinalizedSlot,
mode: modeStopOnFinalizedEpoch,
})
@@ -81,6 +91,9 @@ func (s *Service) syncToFinalizedEpoch(ctx context.Context, genesis time.Time) e
}
for data := range queue.fetchedData {
// If blobs are available. Verify blobs and blocks are consistence.
// We can't import a block if there's no associated blob within DA bound.
// The blob has to pass aggregated proof check.
s.processFetchedData(ctx, genesis, s.cfg.Chain.HeadSlot(), data)
}
@@ -98,10 +111,17 @@ func (s *Service) syncToFinalizedEpoch(ctx context.Context, genesis time.Time) e
// syncToNonFinalizedEpoch sync from head to best known non-finalized epoch supported by majority
// of peers (no less than MinimumSyncPeers*2 peers).
func (s *Service) syncToNonFinalizedEpoch(ctx context.Context, genesis time.Time) error {
vr := s.clock.GenesisValidatorsRoot()
ctxMap, err := sync.ContextByteVersionsForValRoot(vr)
if err != nil {
return errors.Wrapf(err, "unable to initialize context version map using genesis validator root = %#x", vr)
}
queue := newBlocksQueue(ctx, &blocksQueueConfig{
p2p: s.cfg.P2P,
db: s.cfg.DB,
chain: s.cfg.Chain,
clock: s.clock,
ctxMap: ctxMap,
highestExpectedSlot: slots.Since(genesis),
mode: modeNonConstrained,
})
@@ -128,7 +148,7 @@ func (s *Service) processFetchedData(
defer s.updatePeerScorerStats(data.pid, startSlot)
// Use Batch Block Verify to process and verify batches directly.
if err := s.processBatchedBlocks(ctx, genesis, data.blocks, s.cfg.Chain.ReceiveBlockBatch); err != nil {
if err := s.processBatchedBlocks(ctx, genesis, data.bwb, s.cfg.Chain.ReceiveBlockBatch); err != nil {
log.WithError(err).Warn("Skip processing batched blocks")
}
}
@@ -141,8 +161,8 @@ func (s *Service) processFetchedDataRegSync(
blockReceiver := s.cfg.Chain.ReceiveBlock
invalidBlocks := 0
blksWithoutParentCount := 0
for _, blk := range data.blocks {
if err := s.processBlock(ctx, genesis, blk, blockReceiver); err != nil {
for _, b := range data.bwb {
if err := s.processBlock(ctx, genesis, b, blockReceiver); err != nil {
switch {
case errors.Is(err, errBlockAlreadyProcessed):
log.WithError(err).Debug("Block is not processed")
@@ -158,13 +178,13 @@ func (s *Service) processFetchedDataRegSync(
}
if blksWithoutParentCount > 0 {
log.WithFields(logrus.Fields{
"missingParent": fmt.Sprintf("%#x", data.blocks[0].Block().ParentRoot()),
"firstSlot": data.blocks[0].Block().Slot(),
"lastSlot": data.blocks[blksWithoutParentCount-1].Block().Slot(),
"missingParent": fmt.Sprintf("%#x", data.bwb[0].Block.Block().ParentRoot()),
"firstSlot": data.bwb[0].Block.Block().Slot(),
"lastSlot": data.bwb[blksWithoutParentCount-1].Block.Block().Slot(),
}).Debug("Could not process batch blocks due to missing parent")
}
// Add more visible logging if all blocks cannot be processed.
if len(data.blocks) == invalidBlocks {
if len(data.bwb) == invalidBlocks {
log.WithField("error", "Range had no valid blocks to process").Warn("Range is not processed")
}
}
@@ -204,20 +224,20 @@ func (s *Service) logSyncStatus(genesis time.Time, blk interfaces.ReadOnlyBeacon
}
// logBatchSyncStatus and increments the block processing counter.
func (s *Service) logBatchSyncStatus(genesis time.Time, blks []interfaces.ReadOnlySignedBeaconBlock, blkRoot [32]byte) {
s.counter.Incr(int64(len(blks)))
func (s *Service) logBatchSyncStatus(genesis time.Time, firstBlk blocks.ROBlock, nBlocks int) {
s.counter.Incr(int64(nBlocks))
rate := float64(s.counter.Rate()) / counterSeconds
if rate == 0 {
rate = 1
}
firstBlk := blks[0]
firstRoot := firstBlk.Root()
timeRemaining := time.Duration(float64(slots.Since(genesis)-firstBlk.Block().Slot())/rate) * time.Second
log.WithFields(logrus.Fields{
"peers": len(s.cfg.P2P.Peers().Connected()),
"blocksPerSecond": fmt.Sprintf("%.1f", rate),
}).Infof(
"Processing block batch of size %d starting from %s %d/%d - estimated time remaining %s",
len(blks), fmt.Sprintf("0x%s...", hex.EncodeToString(blkRoot[:])[:8]),
nBlocks, fmt.Sprintf("0x%s...", hex.EncodeToString(firstRoot[:])[:8]),
firstBlk.Block().Slot(), slots.Since(genesis), timeRemaining,
)
}
@@ -226,14 +246,12 @@ func (s *Service) logBatchSyncStatus(genesis time.Time, blks []interfaces.ReadOn
func (s *Service) processBlock(
ctx context.Context,
genesis time.Time,
blk interfaces.ReadOnlySignedBeaconBlock,
bwb blocks.BlockWithVerifiedBlobs,
blockReceiver blockReceiverFn,
) error {
blkRoot, err := blk.Block().HashTreeRoot()
if err != nil {
return err
}
if s.isProcessedBlock(ctx, blk, blkRoot) {
blk := bwb.Block
blkRoot := blk.Root()
if s.isProcessedBlock(ctx, blk) {
return fmt.Errorf("slot: %d , root %#x: %w", blk.Block().Slot(), blkRoot, errBlockAlreadyProcessed)
}
@@ -244,48 +262,63 @@ func (s *Service) processBlock(
return blockReceiver(ctx, blk, blkRoot)
}
type processedChecker func(context.Context, blocks.ROBlock) bool
func validUnprocessed(ctx context.Context, bwb []blocks.BlockWithVerifiedBlobs, headSlot primitives.Slot, isProc processedChecker) ([]blocks.BlockWithVerifiedBlobs, error) {
// use a pointer to avoid confusing the zero-value with the case where the first element is processed.
var processed *int
for i := range bwb {
b := bwb[i].Block
if headSlot >= b.Block().Slot() && isProc(ctx, b) {
processed = &i
continue
}
if i > 0 {
parent := bwb[i-1].Block
if parent.Root() != b.Block().ParentRoot() {
return nil, fmt.Errorf("expected linear block list with parent root of %#x (slot %d) but received %#x (slot %d)",
parent, parent.Block().Slot(), b.Block().ParentRoot(), b.Block().Slot())
}
}
}
if processed == nil {
return bwb, nil
}
if *processed+1 == len(bwb) {
maxIncoming := bwb[len(bwb)-1].Block
maxRoot := maxIncoming.Root()
return nil, fmt.Errorf("headSlot:%d, blockSlot:%d , root %#x:%w", headSlot, maxIncoming.Block().Slot(), maxRoot, errBlockAlreadyProcessed)
}
return bwb[*processed:], nil
}
func (s *Service) processBatchedBlocks(ctx context.Context, genesis time.Time,
blks []interfaces.ReadOnlySignedBeaconBlock, bFunc batchBlockReceiverFn) error {
if len(blks) == 0 {
bwb []blocks.BlockWithVerifiedBlobs, bFunc batchBlockReceiverFn) error {
if len(bwb) == 0 {
return errors.New("0 blocks provided into method")
}
firstBlock := blks[0]
blkRoot, err := firstBlock.Block().HashTreeRoot()
headSlot := s.cfg.Chain.HeadSlot()
var err error
bwb, err = validUnprocessed(ctx, bwb, headSlot, s.isProcessedBlock)
if err != nil {
return err
}
headSlot := s.cfg.Chain.HeadSlot()
for headSlot >= firstBlock.Block().Slot() && s.isProcessedBlock(ctx, firstBlock, blkRoot) {
if len(blks) == 1 {
return fmt.Errorf("headSlot:%d, blockSlot:%d , root %#x:%w", headSlot, firstBlock.Block().Slot(), blkRoot, errBlockAlreadyProcessed)
first := bwb[0].Block
if !s.cfg.Chain.HasBlock(ctx, first.Block().ParentRoot()) {
return fmt.Errorf("%w: %#x (in processBatchedBlocks, slot=%d)",
errParentDoesNotExist, first.Block().ParentRoot(), first.Block().Slot())
}
s.logBatchSyncStatus(genesis, first, len(bwb))
for _, bb := range bwb {
if len(bb.Blobs) == 0 {
continue
}
blks = blks[1:]
firstBlock = blks[0]
blkRoot, err = firstBlock.Block().HashTreeRoot()
if err != nil {
return err
if err := s.cfg.DB.SaveBlobSidecar(ctx, bb.Blobs); err != nil {
return errors.Wrapf(err, "failed to save blobs for block %#x", bb.Block.Root())
}
}
s.logBatchSyncStatus(genesis, blks, blkRoot)
parentRoot := firstBlock.Block().ParentRoot()
if !s.cfg.Chain.HasBlock(ctx, parentRoot) {
return fmt.Errorf("%w: %#x (in processBatchedBlocks, slot=%d)", errParentDoesNotExist, firstBlock.Block().ParentRoot(), firstBlock.Block().Slot())
}
blockRoots := make([][32]byte, len(blks))
blockRoots[0] = blkRoot
for i := 1; i < len(blks); i++ {
b := blks[i]
if b.Block().ParentRoot() != blockRoots[i-1] {
return fmt.Errorf("expected linear block list with parent root of %#x but received %#x",
blockRoots[i-1][:], b.Block().ParentRoot())
}
blkRoot, err := b.Block().HashTreeRoot()
if err != nil {
return err
}
blockRoots[i] = blkRoot
}
return bFunc(ctx, blks, blockRoots)
return bFunc(ctx, blocks.BlockWithVerifiedBlobsSlice(bwb).ROBlocks())
}
// updatePeerScorerStats adjusts monitored metrics for a peer.
@@ -304,7 +337,7 @@ func (s *Service) updatePeerScorerStats(pid peer.ID, startSlot primitives.Slot)
}
// isProcessedBlock checks DB and local cache for presence of a given block, to avoid duplicates.
func (s *Service) isProcessedBlock(ctx context.Context, blk interfaces.ReadOnlySignedBeaconBlock, blkRoot [32]byte) bool {
func (s *Service) isProcessedBlock(ctx context.Context, blk blocks.ROBlock) bool {
cp := s.cfg.Chain.FinalizedCheckpt()
finalizedSlot, err := slots.EpochStart(cp.Epoch)
if err != nil {
@@ -317,7 +350,7 @@ func (s *Service) isProcessedBlock(ctx context.Context, blk interfaces.ReadOnlyS
}
// If block exists in our db and is before or equal to our current head
// we ignore it.
if s.cfg.Chain.HeadSlot() >= blk.Block().Slot() && s.cfg.Chain.HasBlock(ctx, blkRoot) {
if s.cfg.Chain.HeadSlot() >= blk.Block().Slot() && s.cfg.Chain.HasBlock(ctx, blk.Root()) {
return true
}
return false

View File

@@ -10,6 +10,7 @@ import (
mock "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain/testing"
dbtest "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/testing"
p2pt "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
@@ -292,6 +293,8 @@ func TestService_roundRobinSync(t *testing.T) {
st, err := util.NewBeaconState()
require.NoError(t, err)
gt := time.Now()
vr := [32]byte{}
mc := &mock.ChainService{
State: st,
Root: genesisRoot[:],
@@ -299,14 +302,16 @@ func TestService_roundRobinSync(t *testing.T) {
FinalizedCheckPoint: &eth.Checkpoint{
Epoch: 0,
},
Genesis: time.Now(),
ValidatorsRoot: [32]byte{},
Genesis: gt,
ValidatorsRoot: vr,
} // no-op mock
clock := startup.NewClock(gt, vr)
s := &Service{
ctx: context.Background(),
cfg: &Config{Chain: mc, P2P: p, DB: beaconDB},
synced: abool.New(),
chainStarted: abool.NewBool(true),
clock: clock,
}
assert.NoError(t, s.roundRobinSync(makeGenesisTime(tt.currentSlot)))
if s.cfg.Chain.HeadSlot() < tt.currentSlot {
@@ -364,7 +369,9 @@ func TestService_processBlock(t *testing.T) {
// Process block normally.
wsb, err := blocks.NewSignedBeaconBlock(blk1)
require.NoError(t, err)
err = s.processBlock(ctx, genesis, wsb, func(
rowsb, err := blocks.NewROBlock(wsb)
require.NoError(t, err)
err = s.processBlock(ctx, genesis, blocks.BlockWithVerifiedBlobs{Block: rowsb}, func(
ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error {
assert.NoError(t, s.cfg.Chain.ReceiveBlock(ctx, block, blockRoot))
return nil
@@ -374,7 +381,9 @@ func TestService_processBlock(t *testing.T) {
// Duplicate processing should trigger error.
wsb, err = blocks.NewSignedBeaconBlock(blk1)
require.NoError(t, err)
err = s.processBlock(ctx, genesis, wsb, func(
rowsb, err = blocks.NewROBlock(wsb)
require.NoError(t, err)
err = s.processBlock(ctx, genesis, blocks.BlockWithVerifiedBlobs{Block: rowsb}, func(
ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error {
return nil
})
@@ -383,7 +392,9 @@ func TestService_processBlock(t *testing.T) {
// Continue normal processing, should proceed w/o errors.
wsb, err = blocks.NewSignedBeaconBlock(blk2)
require.NoError(t, err)
err = s.processBlock(ctx, genesis, wsb, func(
rowsb, err = blocks.NewROBlock(wsb)
require.NoError(t, err)
err = s.processBlock(ctx, genesis, blocks.BlockWithVerifiedBlobs{Block: rowsb}, func(
ctx context.Context, block interfaces.ReadOnlySignedBeaconBlock, blockRoot [32]byte) error {
assert.NoError(t, s.cfg.Chain.ReceiveBlock(ctx, block, blockRoot))
return nil
@@ -418,7 +429,7 @@ func TestService_processBlockBatch(t *testing.T) {
genesis := makeGenesisTime(32)
t.Run("process non-linear batch", func(t *testing.T) {
var batch []interfaces.ReadOnlySignedBeaconBlock
var batch []blocks.BlockWithVerifiedBlobs
currBlockRoot := genesisBlkRoot
for i := primitives.Slot(1); i < 10; i++ {
parentRoot := currBlockRoot
@@ -430,11 +441,13 @@ func TestService_processBlockBatch(t *testing.T) {
util.SaveBlock(t, context.Background(), beaconDB, blk1)
wsb, err := blocks.NewSignedBeaconBlock(blk1)
require.NoError(t, err)
batch = append(batch, wsb)
rowsb, err := blocks.NewROBlock(wsb)
require.NoError(t, err)
batch = append(batch, blocks.BlockWithVerifiedBlobs{Block: rowsb})
currBlockRoot = blk1Root
}
var batch2 []interfaces.ReadOnlySignedBeaconBlock
var batch2 []blocks.BlockWithVerifiedBlobs
for i := primitives.Slot(10); i < 20; i++ {
parentRoot := currBlockRoot
blk1 := util.NewBeaconBlock()
@@ -445,26 +458,29 @@ func TestService_processBlockBatch(t *testing.T) {
util.SaveBlock(t, context.Background(), beaconDB, blk1)
wsb, err := blocks.NewSignedBeaconBlock(blk1)
require.NoError(t, err)
batch2 = append(batch2, wsb)
rowsb, err := blocks.NewROBlock(wsb)
require.NoError(t, err)
batch2 = append(batch2, blocks.BlockWithVerifiedBlobs{Block: rowsb})
currBlockRoot = blk1Root
}
// Process block normally.
err = s.processBatchedBlocks(ctx, genesis, batch, func(
ctx context.Context, blks []interfaces.ReadOnlySignedBeaconBlock, blockRoots [][32]byte) error {
assert.NoError(t, s.cfg.Chain.ReceiveBlockBatch(ctx, blks, blockRoots))
cbnormal := func(ctx context.Context, blks []blocks.ROBlock) error {
assert.NoError(t, s.cfg.Chain.ReceiveBlockBatch(ctx, blks))
return nil
})
}
// Process block normally.
err = s.processBatchedBlocks(ctx, genesis, batch, cbnormal)
assert.NoError(t, err)
// Duplicate processing should trigger error.
err = s.processBatchedBlocks(ctx, genesis, batch, func(
ctx context.Context, blocks []interfaces.ReadOnlySignedBeaconBlock, blockRoots [][32]byte) error {
cbnil := func(ctx context.Context, blocks []blocks.ROBlock) error {
return nil
})
}
// Duplicate processing should trigger error.
err = s.processBatchedBlocks(ctx, genesis, batch, cbnil)
assert.ErrorContains(t, "block is already processed", err)
var badBatch2 []interfaces.ReadOnlySignedBeaconBlock
var badBatch2 []blocks.BlockWithVerifiedBlobs
for i, b := range batch2 {
// create a non-linear batch
if i%3 == 0 && i != 0 {
@@ -474,19 +490,12 @@ func TestService_processBlockBatch(t *testing.T) {
}
// Bad batch should fail because it is non linear
err = s.processBatchedBlocks(ctx, genesis, badBatch2, func(
ctx context.Context, blks []interfaces.ReadOnlySignedBeaconBlock, blockRoots [][32]byte) error {
return nil
})
err = s.processBatchedBlocks(ctx, genesis, badBatch2, cbnil)
expectedSubErr := "expected linear block list"
assert.ErrorContains(t, expectedSubErr, err)
// Continue normal processing, should proceed w/o errors.
err = s.processBatchedBlocks(ctx, genesis, batch2, func(
ctx context.Context, blks []interfaces.ReadOnlySignedBeaconBlock, blockRoots [][32]byte) error {
assert.NoError(t, s.cfg.Chain.ReceiveBlockBatch(ctx, blks, blockRoots))
return nil
})
err = s.processBatchedBlocks(ctx, genesis, batch2, cbnormal)
assert.NoError(t, err)
assert.Equal(t, primitives.Slot(19), s.cfg.Chain.HeadSlot(), "Unexpected head slot")
})
@@ -537,6 +546,8 @@ func TestService_blockProviderScoring(t *testing.T) {
st, err := util.NewBeaconState()
require.NoError(t, err)
require.NoError(t, err)
gt := time.Now()
vr := [32]byte{}
mc := &mock.ChainService{
State: st,
Root: genesisRoot[:],
@@ -545,14 +556,16 @@ func TestService_blockProviderScoring(t *testing.T) {
Epoch: 0,
Root: make([]byte, 32),
},
Genesis: time.Now(),
ValidatorsRoot: [32]byte{},
Genesis: gt,
ValidatorsRoot: vr,
} // no-op mock
clock := startup.NewClock(gt, vr)
s := &Service{
ctx: context.Background(),
cfg: &Config{Chain: mc, P2P: p, DB: beaconDB},
synced: abool.New(),
chainStarted: abool.NewBool(true),
clock: clock,
}
scorer := s.cfg.P2P.Peers().Scorers().BlockProviderScorer()
expectedBlockSlots := makeSequence(1, 160)
@@ -601,6 +614,9 @@ func TestService_syncToFinalizedEpoch(t *testing.T) {
st, err := util.NewBeaconState()
require.NoError(t, err)
gt := time.Now()
vr := [32]byte{}
clock := startup.NewClock(gt, vr)
mc := &mock.ChainService{
State: st,
Root: genesisRoot[:],
@@ -609,8 +625,8 @@ func TestService_syncToFinalizedEpoch(t *testing.T) {
Epoch: 0,
Root: make([]byte, 32),
},
Genesis: time.Now(),
ValidatorsRoot: [32]byte{},
Genesis: gt,
ValidatorsRoot: vr,
}
s := &Service{
ctx: context.Background(),
@@ -618,6 +634,7 @@ func TestService_syncToFinalizedEpoch(t *testing.T) {
synced: abool.New(),
chainStarted: abool.NewBool(true),
counter: ratecounter.NewRateCounter(counterSeconds * time.Second),
clock: clock,
}
expectedBlockSlots := makeSequence(1, 191)
currentSlot := primitives.Slot(191)

View File

@@ -35,7 +35,7 @@ type blockchainService interface {
// Config to set up the initial sync service.
type Config struct {
P2P p2p.P2P
DB db.ReadOnlyDatabase
DB db.NoHeadAccessDatabase
Chain blockchainService
StateNotifier statefeed.Notifier
BlockNotifier blockfeed.Notifier

View File

@@ -2,6 +2,7 @@ package sync
import (
"context"
"math"
"time"
libp2pcore "github.com/libp2p/go-libp2p/core"
@@ -110,7 +111,14 @@ func (s *Service) blobSidecarsByRangeRPCHandler(ctx context.Context, msg interfa
return nil
}
func blobsByRangeMinStartSlot(current primitives.Slot) (primitives.Slot, error) {
// BlobsByRangeMinStartSlot returns the lowest slot that we should expect peers to respect as the
// start slot in a BlobSidecarsByRange request. This can be used to validate incoming requests and
// to avoid pestering peers with requests for blobs that are outside the retention window.
func BlobsByRangeMinStartSlot(current primitives.Slot) (primitives.Slot, error) {
// Avoid overflow if we're running on a config where deneb is set to far future epoch.
if params.BeaconConfig().DenebForkEpoch == math.MaxUint64 {
return primitives.Slot(math.MaxUint64), nil
}
minReqEpochs := params.BeaconNetworkConfig().MinEpochsForBlobsSidecarsRequest
currEpoch := slots.ToEpoch(current)
minStart := params.BeaconConfig().DenebForkEpoch
@@ -156,9 +164,9 @@ func validateBlobsByRange(r *pb.BlobSidecarsByRangeRequest, current primitives.S
// [max(current_epoch - MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, DENEB_FORK_EPOCH), current_epoch]
// where current_epoch is defined by the current wall-clock time,
// and clients MUST support serving requests of blobs on this range.
minStartSlot, err := blobsByRangeMinStartSlot(current)
minStartSlot, err := BlobsByRangeMinStartSlot(current)
if err != nil {
return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "blobsByRangeMinStartSlot error")
return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "BlobsByRangeMinStartSlot error")
}
if rp.start > maxStart {
return rangeParams{}, errors.Wrap(p2ptypes.ErrInvalidRequest, "start > maxStart")

View File

@@ -177,7 +177,7 @@ func TestBlobsByRangeValidation(t *testing.T) {
and clients MUST support serving requests of blobs on this range.
*/
defaultCurrent := denebSlot + 100 + minReqSlots
defaultMinStart, err := blobsByRangeMinStartSlot(defaultCurrent)
defaultMinStart, err := BlobsByRangeMinStartSlot(defaultCurrent)
require.NoError(t, err)
cases := []struct {
name string

View File

@@ -130,6 +130,11 @@ func TestReadChunkEncodedBlobs(t *testing.T) {
nblocks: 1,
streamReader: readChunkEncodedBlobsAsStreamReader,
},
{
name: "test peer sending excess blobs",
nblocks: 1,
streamReader: readChunkEncodedBlobsLowMax,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
@@ -138,6 +143,20 @@ func TestReadChunkEncodedBlobs(t *testing.T) {
}
}
// Specifies a max expected chunk parameter of 1, so that a response with one or more blobs will give ErrInvalidFetchedData.
func readChunkEncodedBlobsLowMax(t *testing.T, s *Service, expect []*expectedBlobChunk) func(network.Stream) {
encoding := s.cfg.p2p.Encoding()
ctxMap, err := ContextByteVersionsForValRoot(s.cfg.clock.GenesisValidatorsRoot())
require.NoError(t, err)
vf := func(sidecar *ethpb.BlobSidecar) error {
return nil
}
return func(stream network.Stream) {
_, err := readChunkEncodedBlobs(stream, encoding, ctxMap, vf, 1)
require.ErrorIs(t, err, ErrInvalidFetchedData)
}
}
func readChunkEncodedBlobsAsStreamReader(t *testing.T, s *Service, expect []*expectedBlobChunk) func(network.Stream) {
encoding := s.cfg.p2p.Encoding()
ctxMap, err := ContextByteVersionsForValRoot(s.cfg.clock.GenesisValidatorsRoot())
@@ -146,7 +165,7 @@ func readChunkEncodedBlobsAsStreamReader(t *testing.T, s *Service, expect []*exp
return nil
}
return func(stream network.Stream) {
scs, err := readChunkEncodedBlobs(stream, encoding, ctxMap, vf)
scs, err := readChunkEncodedBlobs(stream, encoding, ctxMap, vf, params.BeaconNetworkConfig().MaxRequestBlobSidecars)
require.NoError(t, err)
require.Equal(t, len(expect), len(scs))
for i, sc := range scs {

View File

@@ -12,6 +12,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/encoder"
p2ptypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
@@ -21,9 +22,15 @@ import (
"github.com/prysmaticlabs/prysm/v4/time/slots"
)
// ErrInvalidFetchedData is thrown if stream fails to provide requested blocks.
// ErrInvalidFetchedData is used to signal that an error occurred which should result in peer downscoring.
var ErrInvalidFetchedData = errors.New("invalid data returned from peer")
var errMaxRequestBlobSidecarsExceeded = errors.Wrap(ErrInvalidFetchedData, "peer exceeded req blob chunk tx limit")
var errBlobChunkedReadFailure = errors.New("failed to read stream of chunk-encoded blobs")
var errBlobUnmarshal = errors.New("Could not unmarshal chunk-encoded blob")
var errUnrequestedRoot = errors.New("Received BlobSidecar in response that was not requested")
var errBlobResponseOutOfBounds = errors.New("received BlobSidecar with slot outside BlobSidecarsByRangeRequest bounds")
// BeaconBlockProcessor defines a block processing function, which allows to start utilizing
// blocks even before all blocks are ready.
type BeaconBlockProcessor func(block interfaces.ReadOnlySignedBeaconBlock) error
@@ -135,8 +142,8 @@ func SendBeaconBlocksByRootRequest(
return blocks, nil
}
func SendBlobsByRangeRequest(ctx context.Context, ci blockchain.ForkFetcher, p2pApi p2p.SenderEncoder, pid peer.ID, ctxMap ContextByteVersions, req *pb.BlobSidecarsByRangeRequest) ([]*pb.BlobSidecar, error) {
topic, err := p2p.TopicFromMessage(p2p.BlobSidecarsByRangeName, slots.ToEpoch(ci.CurrentSlot()))
func SendBlobsByRangeRequest(ctx context.Context, tor blockchain.TemporalOracle, p2pApi p2p.SenderEncoder, pid peer.ID, ctxMap ContextByteVersions, req *pb.BlobSidecarsByRangeRequest) ([]*pb.BlobSidecar, error) {
topic, err := p2p.TopicFromMessage(p2p.BlobSidecarsByRangeName, slots.ToEpoch(tor.CurrentSlot()))
if err != nil {
return nil, err
}
@@ -147,7 +154,11 @@ func SendBlobsByRangeRequest(ctx context.Context, ci blockchain.ForkFetcher, p2p
}
defer closeStream(stream, log)
return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRangeReq(req))
max := params.BeaconNetworkConfig().MaxRequestBlobSidecars
if max > req.Count*fieldparams.MaxBlobsPerBlock {
max = req.Count * fieldparams.MaxBlobsPerBlock
}
return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRangeReq(req), max)
}
func SendBlobSidecarByRoot(
@@ -169,14 +180,13 @@ func SendBlobSidecarByRoot(
}
defer closeStream(stream, log)
return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRootReq(req))
max := params.BeaconNetworkConfig().MaxRequestBlobSidecars
if max > uint64(len(*req))*fieldparams.MaxBlobsPerBlock {
max = uint64(len(*req)) * fieldparams.MaxBlobsPerBlock
}
return readChunkEncodedBlobs(stream, p2pApi.Encoding(), ctxMap, blobValidatorFromRootReq(req), max)
}
var ErrBlobChunkedReadFailure = errors.New("failed to read stream of chunk-encoded blobs")
var ErrBlobUnmarshal = errors.New("Could not unmarshal chunk-encoded blob")
var ErrUnrequestedRoot = errors.New("Received BlobSidecar in response that was not requested")
var ErrBlobResponseOutOfBounds = errors.New("received BlobSidecar with slot outside BlobSidecarsByRangeRequest bounds")
type blobResponseValidation func(*pb.BlobSidecar) error
func blobValidatorFromRootReq(req *p2ptypes.BlobSidecarsByRootReq) blobResponseValidation {
@@ -186,7 +196,7 @@ func blobValidatorFromRootReq(req *p2ptypes.BlobSidecarsByRootReq) blobResponseV
}
return func(sc *pb.BlobSidecar) error {
if requested := roots[bytesutil.ToBytes32(sc.BlockRoot)]; !requested {
return errors.Wrapf(ErrUnrequestedRoot, "root=%#x", sc.BlockRoot)
return errors.Wrapf(errUnrequestedRoot, "root=%#x", sc.BlockRoot)
}
return nil
}
@@ -196,52 +206,69 @@ func blobValidatorFromRangeReq(req *pb.BlobSidecarsByRangeRequest) blobResponseV
end := req.StartSlot + primitives.Slot(req.Count)
return func(sc *pb.BlobSidecar) error {
if sc.Slot < req.StartSlot || sc.Slot >= end {
return errors.Wrapf(ErrBlobResponseOutOfBounds, "req start,end:%d,%d, resp:%d", req.StartSlot, end, sc.Slot)
return errors.Wrapf(errBlobResponseOutOfBounds, "req start,end:%d,%d, resp:%d", req.StartSlot, end, sc.Slot)
}
return nil
}
}
func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf blobResponseValidation) ([]*pb.BlobSidecar, error) {
decode := encoding.DecodeWithMaxLength
max := int(params.BeaconNetworkConfig().MaxRequestBlobSidecars)
var (
code uint8
msg string
err error
)
func readChunkEncodedBlobs(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf blobResponseValidation, max uint64) ([]*pb.BlobSidecar, error) {
sidecars := make([]*pb.BlobSidecar, 0)
for i := 0; i < max; i++ {
code, msg, err = ReadStatusCode(stream, encoding)
// Attempt an extra read beyond max to check if the peer is violating the spec by
// sending more than MAX_REQUEST_BLOB_SIDECARS, or more blobs than requested.
for i := uint64(0); i < max+1; i++ {
sc, err := readChunkedBlobSidecar(stream, encoding, ctxMap, vf)
if err != nil {
break
if errors.Is(err, io.EOF) {
break
}
return nil, err
}
if code != 0 {
return nil, errors.Wrap(ErrBlobChunkedReadFailure, msg)
}
ctxb, err := readContextFromStream(stream)
if err != nil {
return nil, errors.Wrap(err, "error reading chunk context bytes from stream")
}
v, found := ctxMap[bytesutil.ToBytes4(ctxb)]
if !found {
return nil, errors.Wrapf(ErrBlobUnmarshal, fmt.Sprintf("unrecognized fork digest %#x", ctxb))
}
if v != version.Deneb {
return nil, fmt.Errorf("unexpected context bytes for deneb BlobSidecar, ctx=%#x, v=%s", ctxb, version.String(v))
}
sc := &pb.BlobSidecar{}
if err := decode(stream, sc); err != nil {
return nil, errors.Wrap(err, "failed to decode the protobuf-encoded BlobSidecar message from RPC chunk stream")
}
if err := vf(sc); err != nil {
return nil, errors.Wrap(err, "validation failure decoding blob RPC response")
if i == max {
// We have read an extra sidecar beyond what the spec allows. Since this is a spec violation, we return
// an error that wraps ErrInvalidFetchedData. The part of the state machine that handles rpc peer scoring
// will downscore the peer if the request ends in an error that wraps that one.
return nil, errMaxRequestBlobSidecarsExceeded
}
sidecars = append(sidecars, sc)
}
if !errors.Is(err, io.EOF) {
return nil, err
}
return sidecars, nil
}
func readChunkedBlobSidecar(stream network.Stream, encoding encoder.NetworkEncoding, ctxMap ContextByteVersions, vf blobResponseValidation) (*pb.BlobSidecar, error) {
decode := encoding.DecodeWithMaxLength
var (
code uint8
msg string
)
code, msg, err := ReadStatusCode(stream, encoding)
if err != nil {
return nil, err
}
if code != 0 {
return nil, errors.Wrap(errBlobChunkedReadFailure, msg)
}
ctxb, err := readContextFromStream(stream)
if err != nil {
return nil, errors.Wrap(err, "error reading chunk context bytes from stream")
}
v, found := ctxMap[bytesutil.ToBytes4(ctxb)]
if !found {
return nil, errors.Wrapf(errBlobUnmarshal, fmt.Sprintf("unrecognized fork digest %#x", ctxb))
}
// Only deneb is supported at this time, because we lack a fork-spanning interface/union type for blobs.
if v != version.Deneb {
return nil, fmt.Errorf("unexpected context bytes for deneb BlobSidecar, ctx=%#x, v=%s", ctxb, version.String(v))
}
sc := &pb.BlobSidecar{}
if err := decode(stream, sc); err != nil {
return nil, errors.Wrap(err, "failed to decode the protobuf-encoded BlobSidecar message from RPC chunk stream")
}
if err := vf(sc); err != nil {
return nil, errors.Wrap(err, "validation failure decoding blob RPC response")
}
return sc, nil
}

View File

@@ -495,7 +495,7 @@ func TestBlobValidatorFromRootReq(t *testing.T) {
name: "invalid",
ids: []*ethpb.BlobIdentifier{{BlockRoot: validRoot}},
response: []*ethpb.BlobSidecar{{BlockRoot: invalidRoot}},
err: ErrUnrequestedRoot,
err: errUnrequestedRoot,
},
}
for _, c := range cases {
@@ -544,7 +544,7 @@ func TestBlobValidatorFromRangeReq(t *testing.T) {
Count: 1,
},
response: []*ethpb.BlobSidecar{{Slot: 9}},
err: ErrBlobResponseOutOfBounds,
err: errBlobResponseOutOfBounds,
},
{
name: "invalid - after, count 1",
@@ -553,7 +553,7 @@ func TestBlobValidatorFromRangeReq(t *testing.T) {
Count: 1,
},
response: []*ethpb.BlobSidecar{{Slot: 11}},
err: ErrBlobResponseOutOfBounds,
err: errBlobResponseOutOfBounds,
},
{
name: "invalid - after, multi",
@@ -562,7 +562,7 @@ func TestBlobValidatorFromRangeReq(t *testing.T) {
Count: 10,
},
response: []*ethpb.BlobSidecar{{Slot: 23}},
err: ErrBlobResponseOutOfBounds,
err: errBlobResponseOutOfBounds,
},
{
name: "invalid - after, at boundary, multi",
@@ -571,7 +571,7 @@ func TestBlobValidatorFromRangeReq(t *testing.T) {
Count: 10,
},
response: []*ethpb.BlobSidecar{{Slot: 20}},
err: ErrBlobResponseOutOfBounds,
err: errBlobResponseOutOfBounds,
},
}
for _, c := range cases {

View File

@@ -5,6 +5,7 @@ import (
"sort"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
)
// ROBlock is a value that embeds a ReadOnlySignedBeaconBlock along with its block root ([32]byte).
@@ -73,3 +74,44 @@ func (s ROBlockSlice) Swap(i, j int) {
func (s ROBlockSlice) Len() int {
return len(s)
}
type BlockWithVerifiedBlobs struct {
Block ROBlock
Blobs []*eth.BlobSidecar
}
type BlockWithVerifiedBlobsSlice []BlockWithVerifiedBlobs
func (s BlockWithVerifiedBlobsSlice) ROBlocks() []ROBlock {
r := make([]ROBlock, len(s))
for i := range s {
r[i] = s[i].Block
}
return r
}
// Less reports whether the element with index i must sort before the element with index j.
// ROBlocks are ordered first by their slot,
// with a lexicographic sort of roots breaking ties for slots with duplicate blocks.
func (s BlockWithVerifiedBlobsSlice) Less(i, j int) bool {
si, sj := s[i].Block.Block().Slot(), s[j].Block.Block().Slot()
// lower slot wins
if si != sj {
return si < sj
}
// break slot tie lexicographically comparing roots byte for byte
ri, rj := s[i].Block.Root(), s[j].Block.Root()
return bytes.Compare(ri[:], rj[:]) < 0
}
// Swap swaps the elements with indexes i and j.
func (s BlockWithVerifiedBlobsSlice) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
// Len is the number of elements in the collection.
func (s BlockWithVerifiedBlobsSlice) Len() int {
return len(s)
}

View File

@@ -13,6 +13,7 @@ go_library(
"block.go",
"capella_block.go",
"capella_state.go",
"deneb.go",
"deneb_state.go",
"deposits.go",
"helpers.go",
@@ -55,7 +56,9 @@ go_library(
"//testing/assertions:go_default_library",
"//testing/require:go_default_library",
"//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",

113
testing/util/deneb.go Normal file
View File

@@ -0,0 +1,113 @@
package util
import (
"encoding/binary"
"math/big"
"testing"
"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
enginev1 "github.com/prysmaticlabs/prysm/v4/proto/engine/v1"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/testing/require"
)
func GenerateTestDenebBlockWithSidecar(t *testing.T, parent [32]byte, slot primitives.Slot, nblobs int) (blocks.ROBlock, []*ethpb.BlobSidecar) {
// Start service with 160 as allowed blocks capacity (and almost zero capacity recovery).
stateRoot := bytesutil.PadTo([]byte("stateRoot"), fieldparams.RootLength)
receiptsRoot := bytesutil.PadTo([]byte("receiptsRoot"), fieldparams.RootLength)
logsBloom := bytesutil.PadTo([]byte("logs"), fieldparams.LogsBloomLength)
parentHash := bytesutil.PadTo([]byte("parentHash"), fieldparams.RootLength)
tx := gethTypes.NewTransaction(
0,
common.HexToAddress("095e7baea6a6c7c4c2dfeb977efac326af552d87"),
big.NewInt(0), 0, big.NewInt(0),
nil,
)
txs := []*gethTypes.Transaction{tx}
encodedBinaryTxs := make([][]byte, 1)
var err error
encodedBinaryTxs[0], err = txs[0].MarshalBinary()
require.NoError(t, err)
blockHash := bytesutil.ToBytes32([]byte("foo"))
payload := &enginev1.ExecutionPayloadDeneb{
ParentHash: parentHash,
FeeRecipient: make([]byte, fieldparams.FeeRecipientLength),
StateRoot: stateRoot,
ReceiptsRoot: receiptsRoot,
LogsBloom: logsBloom,
PrevRandao: blockHash[:],
BlockNumber: 0,
GasLimit: 0,
GasUsed: 0,
Timestamp: 0,
ExtraData: make([]byte, 0),
BaseFeePerGas: bytesutil.PadTo([]byte("baseFeePerGas"), fieldparams.RootLength),
ExcessDataGas: 0,
BlockHash: blockHash[:],
Transactions: encodedBinaryTxs,
}
block := NewBeaconBlockDeneb()
block.Block.Body.ExecutionPayload = payload
block.Block.Slot = slot
block.Block.ParentRoot = parent[:]
commitments := make([][48]byte, nblobs)
block.Block.Body.BlobKzgCommitments = make([][]byte, nblobs)
for i := range commitments {
binary.LittleEndian.PutUint16(commitments[i][0:16], uint16(i))
binary.LittleEndian.PutUint16(commitments[i][16:32], uint16(slot))
block.Block.Body.BlobKzgCommitments[i] = commitments[i][:]
}
root, err := block.Block.HashTreeRoot()
require.NoError(t, err)
sidecars := make([]*ethpb.BlobSidecar, len(commitments))
for i, c := range block.Block.Body.BlobKzgCommitments {
sidecars[i] = GenerateTestDenebBlobSidecar(root, block, i, c)
}
sbb, err := blocks.NewSignedBeaconBlock(block)
require.NoError(t, err)
rob, err := blocks.NewROBlock(sbb)
require.NoError(t, err)
return rob, sidecars
}
func GenerateTestDenebBlobSidecar(root [32]byte, block *ethpb.SignedBeaconBlockDeneb, index int, commitment []byte) *ethpb.BlobSidecar {
blob := make([]byte, fieldparams.BlobSize)
binary.LittleEndian.PutUint64(blob, uint64(index))
sc := &ethpb.BlobSidecar{
BlockRoot: root[:],
Index: uint64(index),
Slot: block.Block.Slot,
BlockParentRoot: block.Block.ParentRoot,
ProposerIndex: block.Block.ProposerIndex,
Blob: blob,
KzgCommitment: commitment,
KzgProof: commitment,
}
return sc
}
func ExtendBlocksPlusBlobs(t *testing.T, blks []blocks.ROBlock, size int) ([]blocks.ROBlock, []*ethpb.BlobSidecar) {
blobs := make([]*ethpb.BlobSidecar, 0)
if len(blks) == 0 {
blk, blb := GenerateTestDenebBlockWithSidecar(t, [32]byte{}, 0, 6)
blobs = append(blobs, blb...)
blks = append(blks, blk)
}
for i := 0; i < size; i++ {
prev := blks[len(blks)-1]
blk, blb := GenerateTestDenebBlockWithSidecar(t, prev.Root(), prev.Block().Slot()+1, 6)
blobs = append(blobs, blb...)
blks = append(blks, blk)
}
return blks, blobs
}