mirror of https://github.com/OffchainLabs/prysm.git
synced 2026-01-10 13:58:09 -05:00

Compare commits: initial-sy ... e2e-blockr (1 commit)

Commit: 2b0ba6ba97
@@ -42,6 +42,7 @@ go_library(
         "subscriber_sync_committee_message.go",
         "subscriber_sync_contribution_proof.go",
         "subscription_topic_handler.go",
+        "utils.go",
         "validate_aggregate_proof.go",
         "validate_attester_slashing.go",
         "validate_beacon_attestation.go",
@@ -6,6 +6,7 @@ import (

 	libp2pcore "github.com/libp2p/go-libp2p/core"
 	"github.com/pkg/errors"
+	"github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filters"
 	p2ptypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types"
 	"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags"
 	"github.com/prysmaticlabs/prysm/v4/config/params"
@@ -25,17 +26,36 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
 	defer cancel()
 	SetRPCStreamDeadlines(stream)

+	// Ticker to stagger out large requests.
+	ticker := time.NewTicker(time.Second)
+	defer ticker.Stop()
+
 	m, ok := msg.(*pb.BeaconBlocksByRangeRequest)
 	if !ok {
 		return errors.New("message is not type *pb.BeaconBlockByRangeRequest")
 	}
-	start, end, size, err := validateRangeRequest(m, s.cfg.chain.CurrentSlot())
-	if err != nil {
+	if err := s.validateRangeRequest(m); err != nil {
 		s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
 		s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
 		tracing.AnnotateError(span, err)
 		return err
 	}
+	// Only have range requests with a step of 1 being processed.
+	if m.Step > 1 {
+		m.Step = 1
+	}
+	// The initial count for the first batch to be returned back.
+	count := m.Count
+	allowedBlocksPerSecond := uint64(flags.Get().BlockBatchLimit)
+	if count > allowedBlocksPerSecond {
+		count = allowedBlocksPerSecond
+	}
+	// initial batch start and end slots to be returned to remote peer.
+	startSlot := m.StartSlot
+	endSlot := startSlot.Add(m.Step * (count - 1))
+
+	// The final requested slot from remote peer.
+	endReqSlot := startSlot.Add(m.Step * (m.Count - 1))
+
 	blockLimiter, err := s.rateLimiter.topicCollector(string(stream.Protocol()))
 	if err != nil {
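
The hunk above precomputes three slot values before any block is streamed: the end of the first batch and the final requested slot. A minimal sketch of that window arithmetic, using plain uint64 in place of Prysm's overflow-checked primitives.Slot type and an assumed BlockBatchLimit of 64 (the real value comes from flags.Get().BlockBatchLimit):

```go
package main

import "fmt"

func main() {
	// Assumed flag value for illustration only.
	const blockBatchLimit = uint64(64)

	startSlot, count, step := uint64(100), uint64(400), uint64(1)

	// Clamp the first batch to the per-second allowance.
	batch := count
	if batch > blockBatchLimit {
		batch = blockBatchLimit
	}
	endSlot := startSlot + step*(batch-1)    // last slot of the first batch
	endReqSlot := startSlot + step*(count-1) // last slot of the whole request

	fmt.Println(startSlot, endSlot, endReqSlot) // 100 163 499
}
```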
@@ -43,87 +63,105 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
 	}
 	remainingBucketCapacity := blockLimiter.Remaining(stream.Conn().RemotePeer().String())
 	span.AddAttributes(
-		trace.Int64Attribute("start", int64(start)), // lint:ignore uintcast -- This conversion is OK for tracing.
-		trace.Int64Attribute("end", int64(end)), // lint:ignore uintcast -- This conversion is OK for tracing.
+		trace.Int64Attribute("start", int64(startSlot)), // lint:ignore uintcast -- This conversion is OK for tracing.
+		trace.Int64Attribute("end", int64(endReqSlot)), // lint:ignore uintcast -- This conversion is OK for tracing.
 		trace.Int64Attribute("step", int64(m.Step)),
 		trace.Int64Attribute("count", int64(m.Count)),
 		trace.StringAttribute("peer", stream.Conn().RemotePeer().Pretty()),
 		trace.Int64Attribute("remaining_capacity", remainingBucketCapacity),
 	)

-	// Ticker to stagger out large requests.
-	ticker := time.NewTicker(time.Second)
-	defer ticker.Stop()
-
-	batcher := &blockRangeBatcher{
-		start:       start,
-		end:         end,
-		size:        size,
-		db:          s.cfg.beaconDB,
-		limiter:     s.rateLimiter,
-		isCanonical: s.cfg.chain.IsCanonical,
-		ticker:      ticker,
-	}
-
-	var batch blockBatch
-	for batch, ok = batcher.Next(ctx, stream); ok; batch, ok = batcher.Next(ctx, stream) {
-		batchStart := time.Now()
-		rpcBlocksByRangeResponseLatency.Observe(float64(time.Since(batchStart).Milliseconds()))
-		if err := s.writeBlockBatchToStream(ctx, batch, stream); err != nil {
-			s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
+	// prevRoot is used to ensure that returned chains are strictly linear for singular steps
+	// by comparing the previous root of the block in the list with the current block's parent.
+	var prevRoot [32]byte
+	for startSlot <= endReqSlot {
+		if err := s.rateLimiter.validateRequest(stream, allowedBlocksPerSecond); err != nil {
 			tracing.AnnotateError(span, err)
 			return err
 		}
-	}
-	if err := batch.Err(); err != nil {
-		log.WithError(err).Debug("error in BlocksByRange batch")
-		s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
-		tracing.AnnotateError(span, err)
-		return err
+
+		if endSlot-startSlot > primitives.Slot(rangeLimit) {
+			s.writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrInvalidRequest.Error(), stream)
+			err := p2ptypes.ErrInvalidRequest
+			tracing.AnnotateError(span, err)
+			return err
+		}
+
+		err := s.writeBlockRangeToStream(ctx, startSlot, endSlot, m.Step, &prevRoot, stream)
+		if err != nil && !errors.Is(err, p2ptypes.ErrInvalidParent) {
+			return err
+		}
+		// Reduce capacity of peer in the rate limiter first.
+		// Decrease allowed blocks capacity by the number of streamed blocks.
+		if startSlot <= endSlot {
+			s.rateLimiter.add(stream, int64(1+endSlot.SubSlot(startSlot).Div(m.Step)))
+		}
+		// Exit in the event we have a disjoint chain to
+		// return.
+		if errors.Is(err, p2ptypes.ErrInvalidParent) {
+			break
+		}
+
+		// Recalculate start and end slots for the next batch to be returned to the remote peer.
+		startSlot = endSlot.Add(m.Step)
+		endSlot = startSlot.Add(m.Step * (allowedBlocksPerSecond - 1))
+		if endSlot > endReqSlot {
+			endSlot = endReqSlot
+		}
+
+		// do not wait if all blocks have already been sent.
+		if startSlot > endReqSlot {
+			break
+		}
+
+		// wait for ticker before resuming streaming blocks to remote peer.
+		<-ticker.C
 	}
 	closeStream(stream, log)
 	return nil
 }

-func validateRangeRequest(r *pb.BeaconBlocksByRangeRequest, current primitives.Slot) (primitives.Slot, primitives.Slot, uint64, error) {
-	start := r.StartSlot
-	count := r.Count
-	maxRequest := params.BeaconNetworkConfig().MaxRequestBlocks
-	// Ensure all request params are within appropriate bounds
-	if count == 0 || count > maxRequest {
-		return 0, 0, 0, p2ptypes.ErrInvalidRequest
-	}
-	// Allow some wiggle room, up to double the MaxRequestBlocks past the current slot,
-	// to give nodes syncing close to the head of the chain some margin for error.
-	maxStart, err := current.SafeAdd(maxRequest * 2)
-	if err != nil {
-		return 0, 0, 0, p2ptypes.ErrInvalidRequest
-	}
-	if start > maxStart {
-		return 0, 0, 0, p2ptypes.ErrInvalidRequest
-	}
-	end, err := start.SafeAdd((count - 1))
-	if err != nil {
-		return 0, 0, 0, p2ptypes.ErrInvalidRequest
-	}
-
-	limit := uint64(flags.Get().BlockBatchLimit)
-	if limit > maxRequest {
-		limit = maxRequest
-	}
-	batchSize := count
-	if batchSize > limit {
-		batchSize = limit
-	}
-
-	return start, end, batchSize, nil
-}
-
-func (s *Service) writeBlockBatchToStream(ctx context.Context, batch blockBatch, stream libp2pcore.Stream) error {
+func (s *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlot primitives.Slot, step uint64,
+	prevRoot *[32]byte, stream libp2pcore.Stream) error {
 	ctx, span := trace.StartSpan(ctx, "sync.WriteBlockRangeToStream")
 	defer span.End()

-	blks := batch.Sequence()
+	filter := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot).SetSlotStep(step)
+	blks, roots, err := s.cfg.beaconDB.Blocks(ctx, filter)
+	if err != nil {
+		log.WithError(err).Debug("Could not retrieve blocks")
+		s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
+		tracing.AnnotateError(span, err)
+		return err
+	}
+	// handle genesis case
+	if startSlot == 0 {
+		genBlock, genRoot, err := s.retrieveGenesisBlock(ctx)
+		if err != nil {
+			log.WithError(err).Debug("Could not retrieve genesis block")
+			s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
+			tracing.AnnotateError(span, err)
+			return err
+		}
+		blks = append([]interfaces.ReadOnlySignedBeaconBlock{genBlock}, blks...)
+		roots = append([][32]byte{genRoot}, roots...)
+	}
+	// Filter and sort our retrieved blocks, so that
+	// we only return valid sets of blocks.
+	blks, roots, err = s.dedupBlocksAndRoots(blks, roots)
+	if err != nil {
+		s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
+		tracing.AnnotateError(span, err)
+		return err
+	}
+	blks, roots = s.sortBlocksAndRoots(blks, roots)
+
+	blks, err = s.filterBlocks(ctx, blks, roots, prevRoot, step, startSlot)
+	if err != nil && err != p2ptypes.ErrInvalidParent {
+		s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
+		tracing.AnnotateError(span, err)
+		return err
+	}
+	start := time.Now()
 	// If the blocks are blinded, we reconstruct the full block via the execution client.
 	blindedExists := false
 	blindedIndex := 0
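
The loop restored in this hunk serves at most one batch per tick, so a large request is staggered instead of answered in one burst. A stripped-down sketch of the same pacing pattern, with sendBatch standing in for writeBlockRangeToStream and the rate limiter omitted:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	start, end, batchSize := 0, 10, 4
	sendBatch := func(lo, hi int) { fmt.Printf("sent slots [%d, %d]\n", lo, hi) }

	for start <= end {
		hi := start + batchSize - 1
		if hi > end {
			hi = end
		}
		sendBatch(start, hi)

		start = hi + 1
		// Do not wait on the ticker once everything has been sent.
		if start > end {
			break
		}
		// Stagger the next batch: at most one batch per second.
		<-ticker.C
	}
}
```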
@@ -137,6 +175,16 @@ func (s *Service) writeBlockBatchToStream(ctx context.Context, batch blockBatch,
 		}
 	}
+
+	var reconstructedBlock []interfaces.SignedBeaconBlock
+	if blindedExists {
+		reconstructedBlock, err = s.cfg.executionPayloadReconstructor.ReconstructFullBellatrixBlockBatch(ctx, blks[blindedIndex:])
+		if err != nil {
+			log.WithError(err).Error("Could not reconstruct full bellatrix block batch from blinded bodies")
+			s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
+			return err
+		}
+	}

 	for _, b := range blks {
 		if err := blocks.BeaconBlockIsNil(b); err != nil {
 			continue
@@ -146,24 +194,12 @@ func (s *Service) writeBlockBatchToStream(ctx context.Context, batch blockBatch,
 		}
 		if chunkErr := s.chunkBlockWriter(stream, b); chunkErr != nil {
 			log.WithError(chunkErr).Debug("Could not send a chunked response")
 			s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
 			tracing.AnnotateError(span, chunkErr)
 			return chunkErr
 		}
 	}

-	var err error
-	var reconstructedBlock []interfaces.SignedBeaconBlock
-	if blindedExists {
-		blinded := blks[blindedIndex:]
-		unwrapped := make([]interfaces.ReadOnlySignedBeaconBlock, len(blinded))
-		for i := range blinded {
-			unwrapped[i] = blks[i].ReadOnlySignedBeaconBlock
-		}
-		reconstructedBlock, err = s.cfg.executionPayloadReconstructor.ReconstructFullBellatrixBlockBatch(ctx, unwrapped)
-		if err != nil {
-			log.WithError(err).Error("Could not reconstruct full bellatrix block batch from blinded bodies")
-			return err
-		}
-	}
 	for _, b := range reconstructedBlock {
 		if err := blocks.BeaconBlockIsNil(b); err != nil {
 			continue
@@ -173,9 +209,96 @@ func (s *Service) writeBlockBatchToStream(ctx context.Context, batch blockBatch,
 		}
 		if chunkErr := s.chunkBlockWriter(stream, b); chunkErr != nil {
 			log.WithError(chunkErr).Debug("Could not send a chunked response")
 			s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
 			tracing.AnnotateError(span, chunkErr)
 			return chunkErr
 		}
 	}
+	rpcBlocksByRangeResponseLatency.Observe(float64(time.Since(start).Milliseconds()))
+	// Return error in the event we have an invalid parent.
+	return err
 }
+
+func (s *Service) validateRangeRequest(r *pb.BeaconBlocksByRangeRequest) error {
+	startSlot := r.StartSlot
+	count := r.Count
+	step := r.Step
+
+	maxRequestBlocks := params.BeaconNetworkConfig().MaxRequestBlocks
+	// Add a buffer for possible large range requests from nodes syncing close to the
+	// head of the chain.
+	buffer := rangeLimit * 2
+	highestExpectedSlot := s.cfg.chain.CurrentSlot().Add(uint64(buffer))
+
+	// Ensure all request params are within appropriate bounds
+	if count == 0 || count > maxRequestBlocks {
+		return p2ptypes.ErrInvalidRequest
+	}
+
+	if step == 0 || step > rangeLimit {
+		return p2ptypes.ErrInvalidRequest
+	}
+
+	if startSlot > highestExpectedSlot {
+		return p2ptypes.ErrInvalidRequest
+	}
+
+	endSlot := startSlot.Add(step * (count - 1))
+	if endSlot-startSlot > primitives.Slot(rangeLimit) {
+		return p2ptypes.ErrInvalidRequest
+	}
+	return nil
+}
+
+// filters all the provided blocks to ensure they are canonical
+// and are strictly linear.
+func (s *Service) filterBlocks(ctx context.Context, blks []interfaces.ReadOnlySignedBeaconBlock, roots [][32]byte, prevRoot *[32]byte,
+	step uint64, startSlot primitives.Slot) ([]interfaces.ReadOnlySignedBeaconBlock, error) {
+	if len(blks) != len(roots) {
+		return nil, errors.New("input blks and roots are diff lengths")
+	}
+
+	newBlks := make([]interfaces.ReadOnlySignedBeaconBlock, 0, len(blks))
+	for i, b := range blks {
+		isCanonical, err := s.cfg.chain.IsCanonical(ctx, roots[i])
+		if err != nil {
+			return nil, err
+		}
+		parentValid := *prevRoot != [32]byte{}
+		isLinear := *prevRoot == b.Block().ParentRoot()
+		isSingular := step == 1
+		slotDiff, err := b.Block().Slot().SafeSubSlot(startSlot)
+		if err != nil {
+			return nil, err
+		}
+		slotDiff, err = slotDiff.SafeMod(step)
+		if err != nil {
+			return nil, err
+		}
+		isRequestedSlotStep := slotDiff == 0
+		if isRequestedSlotStep && isCanonical {
+			// Exit early if our valid block is non linear.
+			if parentValid && isSingular && !isLinear {
+				return newBlks, p2ptypes.ErrInvalidParent
+			}
+			newBlks = append(newBlks, blks[i])
+			// Set the previous root as the
+			// newly added block's root
+			currRoot := roots[i]
+			*prevRoot = currRoot
+		}
+	}
+	return newBlks, nil
+}
+
+func (s *Service) retrieveGenesisBlock(ctx context.Context) (interfaces.ReadOnlySignedBeaconBlock, [32]byte, error) {
+	genBlock, err := s.cfg.beaconDB.GenesisBlock(ctx)
+	if err != nil {
+		return nil, [32]byte{}, err
+	}
+	genRoot, err := genBlock.Block().HashTreeRoot()
+	if err != nil {
+		return nil, [32]byte{}, err
+	}
+	return genBlock, genRoot, nil
+}
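
filterBlocks above enforces strict linearity for step-1 requests by threading prevRoot through successive calls: a kept block's parent root must equal the root of the previously kept block, with a zero prevRoot meaning no parent has been seen yet. A self-contained sketch of that check, using a hypothetical fakeBlock type rather than Prysm's interfaces.ReadOnlySignedBeaconBlock and omitting the canonicality and slot-step conditions:

```go
package main

import (
	"errors"
	"fmt"
)

type fakeBlock struct {
	root, parent [32]byte
}

var errInvalidParent = errors.New("invalid parent")

func filterLinear(blks []fakeBlock, prevRoot *[32]byte) ([]fakeBlock, error) {
	out := make([]fakeBlock, 0, len(blks))
	for _, b := range blks {
		parentKnown := *prevRoot != [32]byte{}
		if parentKnown && b.parent != *prevRoot {
			// Disjoint chain: return what was kept plus a sentinel error.
			return out, errInvalidParent
		}
		out = append(out, b)
		// The kept block's root becomes the expected parent of the next one.
		*prevRoot = b.root
	}
	return out, nil
}

func main() {
	a := fakeBlock{root: [32]byte{1}}
	b := fakeBlock{root: [32]byte{2}, parent: [32]byte{1}}
	c := fakeBlock{root: [32]byte{3}, parent: [32]byte{9}} // wrong parent

	var prev [32]byte
	kept, err := filterLinear([]fakeBlock{a, b, c}, &prev)
	fmt.Println(len(kept), err) // 2 invalid parent
}
```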
@@ -24,6 +24,7 @@ import (
 	fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
 	"github.com/prysmaticlabs/prysm/v4/config/params"
 	"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
+	"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
 	"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
 	leakybucket "github.com/prysmaticlabs/prysm/v4/container/leaky-bucket"
 	"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
@@ -597,7 +598,8 @@ func TestRPCBeaconBlocksByRange_validateRangeRequest(t *testing.T) {
 				Step:  0,
 				Count: 1,
 			},
-			expectedError: nil, // The Step param is ignored in v2 RPC
+			expectedError: p2ptypes.ErrInvalidRequest,
+			errorToLog:    "validation did not fail with bad step",
 		},
 		{
 			name: "Over limit Step",
@@ -605,7 +607,8 @@ func TestRPCBeaconBlocksByRange_validateRangeRequest(t *testing.T) {
 				Step:  rangeLimit + 1,
 				Count: 1,
 			},
-			expectedError: nil, // The Step param is ignored in v2 RPC
+			expectedError: p2ptypes.ErrInvalidRequest,
+			errorToLog:    "validation did not fail with bad step",
 		},
 		{
 			name: "Correct Step",
@@ -640,7 +643,8 @@ func TestRPCBeaconBlocksByRange_validateRangeRequest(t *testing.T) {
 				Step:  3,
 				Count: uint64(slotsSinceGenesis / 2),
 			},
-			expectedError: nil, // this is fine with the deprecation of Step
+			expectedError: p2ptypes.ErrInvalidRequest,
+			errorToLog:    "validation did not fail with bad range",
 		},
 		{
 			name: "Valid Request",
@@ -655,11 +659,10 @@ func TestRPCBeaconBlocksByRange_validateRangeRequest(t *testing.T) {

 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			_, _, _, err := validateRangeRequest(tt.req, r.cfg.chain.CurrentSlot())
 			if tt.expectedError != nil {
-				assert.ErrorContains(t, tt.expectedError.Error(), err, tt.errorToLog)
+				assert.ErrorContains(t, tt.expectedError.Error(), r.validateRangeRequest(tt.req), tt.errorToLog)
 			} else {
-				assert.NoError(t, err, tt.errorToLog)
+				assert.NoError(t, r.validateRangeRequest(tt.req), tt.errorToLog)
 			}
 		})
 	}
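
The runner above walks a table of request cases, each naming the request, the error it should produce, and a message to log on failure. A compact, self-contained sketch of the same table-driven shape, with validate standing in for (*Service).validateRangeRequest and a simplified request type:

```go
package main

import (
	"errors"
	"fmt"
)

// req is an illustrative stand-in, not the real pb.BeaconBlocksByRangeRequest.
type req struct{ step, count uint64 }

var errInvalidRequest = errors.New("invalid range, step or count")

// validate stands in for the method under test.
func validate(r req) error {
	if r.count == 0 || r.step == 0 {
		return errInvalidRequest
	}
	return nil
}

func main() {
	tests := []struct {
		name          string
		req           req
		expectedError error
		errorToLog    string
	}{
		{name: "Zero Step", req: req{step: 0, count: 1}, expectedError: errInvalidRequest, errorToLog: "validation did not fail with bad step"},
		{name: "Valid Request", req: req{step: 1, count: 64}},
	}
	for _, tt := range tests {
		if got := validate(tt.req); !errors.Is(got, tt.expectedError) {
			fmt.Printf("%s: got %v, want %v (%s)\n", tt.name, got, tt.expectedError, tt.errorToLog)
		}
	}
}
```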
@@ -1067,6 +1070,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks_PreviousRoot(t *testing.T) {
 	p2 := p2ptest.NewTestP2P(t)
 	p1.Connect(p2)
 	assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
+	d := db.SetupDB(t)

 	req := &ethpb.BeaconBlocksByRangeRequest{
 		StartSlot: 100,
@@ -1077,7 +1081,8 @@
 	// Populate the database with blocks that would match the request.
 	var prevRoot [32]byte
 	var err error
-	var blks []blocks.ROBlock
+	blks := []interfaces.ReadOnlySignedBeaconBlock{}
+	var roots [][32]byte
 	for i := req.StartSlot; i < req.StartSlot.Add(req.Count); i += primitives.Slot(1) {
 		blk := util.NewBeaconBlock()
 		blk.Block.Slot = i
@@ -1086,18 +1091,21 @@
 		require.NoError(t, err)
 		wsb, err := blocks.NewSignedBeaconBlock(blk)
 		require.NoError(t, err)
+		blks = append(blks, wsb)
 		copiedRt := prevRoot
-		blks = append(blks, blocks.NewROBlock(wsb, copiedRt))
+		roots = append(roots, copiedRt)
 	}

-	chain := &chainMock.ChainService{}
 	// Start service with 160 as allowed blocks capacity (and almost zero capacity recovery).
+	r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}

 	var initialRoot [32]byte
 	ptrRt := &initialRoot
-	seq, nseq, err := filterCanonical(context.Background(), blks, ptrRt, chain.IsCanonical)
+	newBlks, err := r.filterBlocks(context.Background(), blks, roots, ptrRt, req.Step, req.StartSlot)
 	require.NoError(t, err)
-	require.Equal(t, len(blks), len(seq))
-	require.Equal(t, 0, len(nseq))
+	require.Equal(t, len(blks), len(newBlks))

 	// pointer should reference a new root.
 	require.NotEqual(t, *ptrRt, [32]byte{})

 }
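
The test above populates the database with a strictly linear range: each synthetic block carries the previous block's root as its parent, so filterBlocks should keep all of them. A sketch of that chain-building idea, with sha256 standing in for HashTreeRoot and an illustrative testBlock type rather than Prysm's generated types:

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

type testBlock struct {
	slot       uint64
	parentRoot [32]byte
}

func main() {
	var prevRoot [32]byte
	for slot := uint64(100); slot < 104; slot++ {
		blk := testBlock{slot: slot, parentRoot: prevRoot}
		// This block's root becomes the next block's parent root.
		prevRoot = sha256.Sum256([]byte(fmt.Sprintf("%d-%x", blk.slot, blk.parentRoot)))
		fmt.Printf("slot %d parent %x...\n", blk.slot, blk.parentRoot[:4])
	}
}
```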
beacon-chain/sync/utils.go (new file, 64 lines)

@@ -0,0 +1,64 @@
+package sync
+
+import (
+	"errors"
+	"sort"
+
+	"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
+)
+
+// A type to represent beacon blocks and roots which have methods
+// which satisfy the Interface in `Sort` so that this type can
+// be sorted in ascending order.
+type sortedObj struct {
+	blks  []interfaces.ReadOnlySignedBeaconBlock
+	roots [][32]byte
+}
+
+// Less reports whether the element with index i must sort before the element with index j.
+func (s sortedObj) Less(i, j int) bool {
+	return s.blks[i].Block().Slot() < s.blks[j].Block().Slot()
+}
+
+// Swap swaps the elements with indexes i and j.
+func (s sortedObj) Swap(i, j int) {
+	s.blks[i], s.blks[j] = s.blks[j], s.blks[i]
+	s.roots[i], s.roots[j] = s.roots[j], s.roots[i]
+}
+
+// Len is the number of elements in the collection.
+func (s sortedObj) Len() int {
+	return len(s.blks)
+}
+
+// removes duplicates from provided blocks and roots.
+func (_ *Service) dedupBlocksAndRoots(blks []interfaces.ReadOnlySignedBeaconBlock, roots [][32]byte) ([]interfaces.ReadOnlySignedBeaconBlock, [][32]byte, error) {
+	if len(blks) != len(roots) {
+		return nil, nil, errors.New("input blks and roots are diff lengths")
+	}
+
+	// Remove duplicate blocks received
+	rootMap := make(map[[32]byte]bool, len(blks))
+	newBlks := make([]interfaces.ReadOnlySignedBeaconBlock, 0, len(blks))
+	newRoots := make([][32]byte, 0, len(roots))
+	for i, r := range roots {
+		if rootMap[r] {
+			continue
+		}
+		rootMap[r] = true
+		newRoots = append(newRoots, roots[i])
+		newBlks = append(newBlks, blks[i])
+	}
+	return newBlks, newRoots, nil
+}
+
+// sort the provided blocks and roots in ascending order. This method assumes that the size of
+// block slice and root slice is equal.
+func (_ *Service) sortBlocksAndRoots(blks []interfaces.ReadOnlySignedBeaconBlock, roots [][32]byte) ([]interfaces.ReadOnlySignedBeaconBlock, [][32]byte) {
+	obj := sortedObj{
+		blks:  blks,
+		roots: roots,
+	}
+	sort.Sort(obj)
+	return obj.blks, obj.roots
+}
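
sortBlocksAndRoots keeps the two slices aligned because sortedObj.Swap swaps both at once. A runnable sketch of the paired-sort-plus-dedup idea, with bare slots standing in for full beacon blocks:

```go
package main

import (
	"fmt"
	"sort"
)

type pair struct {
	slots []uint64
	roots [][32]byte
}

func (p pair) Len() int           { return len(p.slots) }
func (p pair) Less(i, j int) bool { return p.slots[i] < p.slots[j] }
func (p pair) Swap(i, j int) {
	// Swapping both slices together is what keeps them aligned.
	p.slots[i], p.slots[j] = p.slots[j], p.slots[i]
	p.roots[i], p.roots[j] = p.roots[j], p.roots[i]
}

func main() {
	p := pair{
		slots: []uint64{3, 1, 2, 1},
		roots: [][32]byte{{3}, {1}, {2}, {1}},
	}
	sort.Sort(p)

	// Dedup by root after sorting, mirroring dedupBlocksAndRoots.
	seen := map[[32]byte]bool{}
	var slots []uint64
	for i, r := range p.roots {
		if seen[r] {
			continue
		}
		seen[r] = true
		slots = append(slots, p.slots[i])
	}
	fmt.Println(slots) // [1 2 3]
}
```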