From 385a317902043656fb9437db36e033681b65d1d0 Mon Sep 17 00:00:00 2001 From: kasey <489222+kasey@users.noreply.github.com> Date: Fri, 19 May 2023 11:59:13 -0500 Subject: [PATCH] Revert initsync revert (#12431) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Revert "Revert "BeaconBlocksByRange and BlobSidecarsByRange consistency (#123… (#12426)" This reverts commit ddc1e48e0505db46616e937d0e3fc54e465a7c68. * fix metrics bug, add batch.next tests --------- Co-authored-by: Kasey Kirkham --- beacon-chain/sync/BUILD.bazel | 5 +- beacon-chain/sync/block_batcher.go | 231 +++++++++++++ beacon-chain/sync/block_batcher_test.go | 134 ++++++++ beacon-chain/sync/pending_blocks_queue.go | 15 +- .../sync/rpc_beacon_blocks_by_range.go | 319 +++++------------- .../sync/rpc_beacon_blocks_by_range_test.go | 84 ++--- beacon-chain/sync/rpc_handler_test.go | 56 +++ beacon-chain/sync/service.go | 7 +- beacon-chain/sync/utils.go | 77 ----- beacon-chain/sync/utils_test.go | 85 ----- consensus-types/blocks/BUILD.bazel | 2 + consensus-types/blocks/roblock.go | 75 ++++ consensus-types/blocks/roblock_test.go | 90 +++++ testing/util/wait_timeout.go | 40 +++ 14 files changed, 771 insertions(+), 449 deletions(-) create mode 100644 beacon-chain/sync/block_batcher.go create mode 100644 beacon-chain/sync/block_batcher_test.go create mode 100644 beacon-chain/sync/rpc_handler_test.go delete mode 100644 beacon-chain/sync/utils.go delete mode 100644 beacon-chain/sync/utils_test.go create mode 100644 consensus-types/blocks/roblock.go create mode 100644 consensus-types/blocks/roblock_test.go diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 734acce1d1..17c1c503d3 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "go_default_library", srcs = [ "batch_verifier.go", + "block_batcher.go", "broadcast_bls_changes.go", "context.go", "deadlines.go", @@ -37,7 +38,6 @@ go_library( "subscriber_sync_committee_message.go", "subscriber_sync_contribution_proof.go", "subscription_topic_handler.go", - "utils.go", "validate_aggregate_proof.go", "validate_attester_slashing.go", "validate_beacon_attestation.go", @@ -134,6 +134,7 @@ go_test( size = "small", srcs = [ "batch_verifier_test.go", + "block_batcher_test.go", "broadcast_bls_changes_test.go", "context_test.go", "decode_pubsub_test.go", @@ -146,6 +147,7 @@ go_test( "rpc_beacon_blocks_by_root_test.go", "rpc_chunked_response_test.go", "rpc_goodbye_test.go", + "rpc_handler_test.go", "rpc_metadata_test.go", "rpc_ping_test.go", "rpc_send_request_test.go", @@ -158,7 +160,6 @@ go_test( "subscription_topic_handler_test.go", "sync_fuzz_test.go", "sync_test.go", - "utils_test.go", "validate_aggregate_proof_test.go", "validate_attester_slashing_test.go", "validate_beacon_attestation_test.go", diff --git a/beacon-chain/sync/block_batcher.go b/beacon-chain/sync/block_batcher.go new file mode 100644 index 0000000000..040dcd71e4 --- /dev/null +++ b/beacon-chain/sync/block_batcher.go @@ -0,0 +1,231 @@ +package sync + +import ( + "context" + "fmt" + "sort" + "time" + + libp2pcore "github.com/libp2p/go-libp2p/core" + "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filters" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" +) + +// blockRangeBatcher encapsulates the logic for splitting up a block range request into fixed-size batches of +// blocks that are retrieved from the database, ensured to be canonical, sequential and unique. +// If a non-nil value for ticker is set, it will be used to pause between batches lookups, as a rate-limiter. +type blockRangeBatcher struct { + start primitives.Slot + end primitives.Slot + size uint64 + db db.NoHeadAccessDatabase + limiter *limiter + ticker *time.Ticker + + cf *canonicalFilter + current *blockBatch +} + +func newBlockRangeBatcher(rp rangeParams, bdb db.NoHeadAccessDatabase, limiter *limiter, canonical canonicalChecker, ticker *time.Ticker) (*blockRangeBatcher, error) { + if bdb == nil { + return nil, errors.New("nil db param, unable to initialize blockRangeBatcher") + } + if limiter == nil { + return nil, errors.New("nil limiter param, unable to initialize blockRangeBatcher") + } + if canonical == nil { + return nil, errors.New("nil canonicalChecker param, unable to initialize blockRangeBatcher") + } + if ticker == nil { + return nil, errors.New("nil ticker param, unable to initialize blockRangeBatcher") + } + if rp.size == 0 { + return nil, fmt.Errorf("invalid batch size of %d", rp.size) + } + if rp.end < rp.start { + return nil, fmt.Errorf("batch end slot %d is lower than batch start %d", rp.end, rp.start) + } + cf := &canonicalFilter{canonical: canonical} + return &blockRangeBatcher{ + start: rp.start, + end: rp.end, + size: rp.size, + db: bdb, + limiter: limiter, + ticker: ticker, + cf: cf, + }, nil +} + +func (bb *blockRangeBatcher) next(ctx context.Context, stream libp2pcore.Stream) (blockBatch, bool) { + var nb blockBatch + var more bool + // The result of each call to next() is saved in the `current` field. + // If current is not nil, current.next figures out the next batch based on the previous one. + // If current is nil, newBlockBatch is used to generate the first batch. + if bb.current != nil { + current := *bb.current + nb, more = current.next(bb.end, bb.size) + } else { + nb, more = newBlockBatch(bb.start, bb.end, bb.size) + } + // newBlockBatch and next() both return a boolean to indicate whether calling .next() will yield another batch + // (based on the whether we've gotten to the end slot yet). blockRangeBatcher.next does the same, + // and returns (zero value, false), to signal the end of the iteration. + if !more { + return blockBatch{}, false + } + if err := bb.limiter.validateRequest(stream, bb.size); err != nil { + return blockBatch{err: errors.Wrap(err, "throttled by rate limiter")}, false + } + + // Wait for the ticker before doing anything expensive, unless this is the first batch. + if bb.ticker != nil && bb.current != nil { + <-bb.ticker.C + } + filter := filters.NewFilter().SetStartSlot(nb.start).SetEndSlot(nb.end) + blks, roots, err := bb.db.Blocks(ctx, filter) + if err != nil { + return blockBatch{err: errors.Wrap(err, "Could not retrieve blocks")}, false + } + + rob := make([]blocks.ROBlock, 0) + if nb.start == 0 { + gb, err := bb.genesisBlock(ctx) + if err != nil { + return blockBatch{err: errors.Wrap(err, "could not retrieve genesis block")}, false + } + rob = append(rob, gb) + } + for i := 0; i < len(blks); i++ { + rb, err := blocks.NewROBlockWithRoot(blks[i], roots[i]) + if err != nil { + return blockBatch{err: errors.Wrap(err, "Could not initialize ROBlock")}, false + } + rob = append(rob, rb) + } + + // Filter and sort our retrieved blocks, so that we only return valid sets of blocks. + nb.lin, nb.nonlin, nb.err = bb.cf.filter(ctx, rob) + + // Decrease allowed blocks capacity by the number of streamed blocks. + bb.limiter.add(stream, int64(1+nb.end.SubSlot(nb.start))) + bb.current = &nb + return *bb.current, true +} + +func (bb *blockRangeBatcher) genesisBlock(ctx context.Context) (blocks.ROBlock, error) { + b, err := bb.db.GenesisBlock(ctx) + if err != nil { + return blocks.ROBlock{}, err + } + htr, err := b.Block().HashTreeRoot() + if err != nil { + return blocks.ROBlock{}, err + } + return blocks.NewROBlockWithRoot(b, htr) +} + +type blockBatch struct { + start primitives.Slot + end primitives.Slot + lin []blocks.ROBlock // lin is a linear chain of blocks connected through parent_root. broken tails go in nonlin. + nonlin []blocks.ROBlock // if there is a break in the chain of parent->child relationships, the tail is stored here. + err error +} + +func newBlockBatch(start, reqEnd primitives.Slot, size uint64) (blockBatch, bool) { + if start > reqEnd { + return blockBatch{}, false + } + nb := blockBatch{start: start, end: start.Add(size - 1)} + if nb.end > reqEnd { + nb.end = reqEnd + } + return nb, true +} + +func (bat blockBatch) next(reqEnd primitives.Slot, size uint64) (blockBatch, bool) { + if bat.error() != nil { + return bat, false + } + if bat.nonLinear() { + return blockBatch{}, false + } + return newBlockBatch(bat.end.Add(1), reqEnd, size) +} + +// blocks returns the list of linear, canonical blocks read from the db. +func (bb blockBatch) canonical() []blocks.ROBlock { + return bb.lin +} + +// nonLinear is used to determine if there was a break in the chain of canonical blocks as read from the db. +// If true, code using the blockBatch should stop serving additional batches of blocks. +func (bb blockBatch) nonLinear() bool { + return len(bb.nonlin) > 0 +} + +func (bb blockBatch) error() error { + return bb.err +} + +type canonicalChecker func(context.Context, [32]byte) (bool, error) + +type canonicalFilter struct { + prevRoot [32]byte + canonical canonicalChecker +} + +// filters all the provided blocks to ensure they are canonical and strictly linear. +func (cf *canonicalFilter) filter(ctx context.Context, blks []blocks.ROBlock) ([]blocks.ROBlock, []blocks.ROBlock, error) { + blks = sortedUniqueBlocks(blks) + seq := make([]blocks.ROBlock, 0, len(blks)) + nseq := make([]blocks.ROBlock, 0) + for i, b := range blks { + cb, err := cf.canonical(ctx, b.Root()) + if err != nil { + return nil, nil, err + } + if !cb { + continue + } + // prevRoot will be the zero value until we find the first canonical block in the stream seen by an instance + // of canonicalFilter. filter is called in batches; prevRoot can be the last root from the previous batch. + first := cf.prevRoot == [32]byte{} + // We assume blocks are processed in order, so the previous canonical root should be the parent of the next. + if !first && cf.prevRoot != b.Block().ParentRoot() { + // If the current block isn't descended from the last, something is wrong. Append everything remaining + // to the list of non-linear blocks, and stop building the canonical list. + nseq = append(nseq, blks[i:]...) + break + } + seq = append(seq, blks[i]) + // Set the previous root as the + // newly added block's root + cf.prevRoot = b.Root() + } + return seq, nseq, nil +} + +// returns a copy of the []ROBlock list in sorted order with duplicates removed +func sortedUniqueBlocks(blks []blocks.ROBlock) []blocks.ROBlock { + // Remove duplicate blocks received + sort.Sort(blocks.ROBlockSlice(blks)) + if len(blks) < 2 { + return blks + } + u := 0 + for i := 1; i < len(blks); i++ { + if blks[i].Root() != blks[u].Root() { + u += 1 + if u != i { + blks[u] = blks[i] + } + } + } + return blks[:u+1] +} diff --git a/beacon-chain/sync/block_batcher_test.go b/beacon-chain/sync/block_batcher_test.go new file mode 100644 index 0000000000..c185ebd967 --- /dev/null +++ b/beacon-chain/sync/block_batcher_test.go @@ -0,0 +1,134 @@ +package sync + +import ( + "math/rand" + "testing" + + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" + ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v4/testing/require" +) + +func TestSortedObj_SortBlocksRoots(t *testing.T) { + source := rand.NewSource(33) + randGen := rand.New(source) + randFunc := func() int64 { + return randGen.Int63n(50) + } + + var blks []blocks.ROBlock + for i := 0; i < 10; i++ { + slot := primitives.Slot(randFunc()) + newBlk, err := blocks.NewSignedBeaconBlock(ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: slot, Body: ðpb.BeaconBlockBody{}}}) + require.NoError(t, err) + root := bytesutil.ToBytes32(bytesutil.Bytes32(uint64(slot))) + b, err := blocks.NewROBlockWithRoot(newBlk, root) + require.NoError(t, err) + blks = append(blks, b) + } + + newBlks := sortedUniqueBlocks(blks) + previousSlot := primitives.Slot(0) + for _, b := range newBlks { + if b.Block().Slot() < previousSlot { + t.Errorf("Block list is not sorted as %d is smaller than previousSlot %d", b.Block().Slot(), previousSlot) + } + previousSlot = b.Block().Slot() + } +} + +func TestSortedObj_NoDuplicates(t *testing.T) { + source := rand.NewSource(33) + randGen := rand.New(source) + var blks []blocks.ROBlock + randFunc := func() int64 { + return randGen.Int63n(50) + } + + for i := 0; i < 10; i++ { + slot := primitives.Slot(randFunc()) + newBlk := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: slot, Body: ðpb.BeaconBlockBody{}}} + // append twice + wsb, err := blocks.NewSignedBeaconBlock(newBlk) + require.NoError(t, err) + wsbCopy, err := wsb.Copy() + require.NoError(t, err) + root := bytesutil.ToBytes32(bytesutil.Bytes32(uint64(slot))) + b, err := blocks.NewROBlockWithRoot(wsb, root) + require.NoError(t, err) + b2, err := blocks.NewROBlockWithRoot(wsbCopy, root) + require.NoError(t, err) + blks = append(blks, b, b2) + } + + dedup := sortedUniqueBlocks(blks) + roots := make(map[[32]byte]int) + for i, b := range dedup { + if di, dup := roots[b.Root()]; dup { + t.Errorf("Duplicated root %#x at index %d and %d", b.Root(), di, i) + } + roots[b.Root()] = i + } +} + +func TestBlockBatchNext(t *testing.T) { + cases := []struct { + name string + batch blockBatch + start primitives.Slot + reqEnd primitives.Slot + size uint64 + next []blockBatch + more []bool + err error + }{ + { + name: "end aligned", + batch: blockBatch{start: 0, end: 20}, + start: 0, + reqEnd: 40, + size: 20, + next: []blockBatch{ + {start: 0, end: 19}, + {start: 20, end: 39}, + {start: 40, end: 40}, + {}, + }, + more: []bool{true, true, true, false}, + }, + { + name: "batches with more", + batch: blockBatch{start: 0, end: 22}, + start: 0, + reqEnd: 40, + size: 23, + next: []blockBatch{ + {start: 0, end: 22}, + {start: 23, end: 40}, + {}, + }, + more: []bool{true, true, false}, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + var next blockBatch + var more bool + i := 0 + for next, more = newBlockBatch(c.start, c.reqEnd, c.size); more; next, more = next.next(c.reqEnd, c.size) { + exp := c.next[i] + require.Equal(t, c.more[i], more) + require.Equal(t, exp.start, next.start) + require.Equal(t, exp.end, next.end) + if exp.err != nil { + require.ErrorIs(t, next.err, exp.err) + } else { + require.NoError(t, next.err) + } + i++ + } + }) + } +} diff --git a/beacon-chain/sync/pending_blocks_queue.go b/beacon-chain/sync/pending_blocks_queue.go index b07fc6ee7b..ba99277357 100644 --- a/beacon-chain/sync/pending_blocks_queue.go +++ b/beacon-chain/sync/pending_blocks_queue.go @@ -248,7 +248,7 @@ func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, ra if len(bestPeers) == 0 { return nil } - roots = s.dedupRoots(roots) + roots = dedupRoots(roots) // Randomly choose a peer to query from our best peers. If that peer cannot return // all the requested blocks, we randomly select another peer. pid := bestPeers[randGen.Int()%len(bestPeers)] @@ -456,3 +456,16 @@ func slotToCacheKey(s primitives.Slot) string { b := bytesutil.SlotToBytesBigEndian(s) return string(b) } + +func dedupRoots(roots [][32]byte) [][32]byte { + newRoots := make([][32]byte, 0, len(roots)) + rootMap := make(map[[32]byte]bool, len(roots)) + for i, r := range roots { + if rootMap[r] { + continue + } + rootMap[r] = true + newRoots = append(newRoots, roots[i]) + } + return newRoots +} diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range.go b/beacon-chain/sync/rpc_beacon_blocks_by_range.go index a81a077214..5bc047df8a 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range.go @@ -6,7 +6,6 @@ import ( libp2pcore "github.com/libp2p/go-libp2p/core" "github.com/pkg/errors" - "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filters" p2ptypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types" "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags" "github.com/prysmaticlabs/prysm/v4/config/params" @@ -26,36 +25,18 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa defer cancel() SetRPCStreamDeadlines(stream) - // Ticker to stagger out large requests. - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - m, ok := msg.(*pb.BeaconBlocksByRangeRequest) if !ok { return errors.New("message is not type *pb.BeaconBlockByRangeRequest") } - if err := s.validateRangeRequest(m); err != nil { + log.WithField("start-slot", m.StartSlot).WithField("count", m.Count).Debug("BeaconBlocksByRangeRequest") + rp, err := validateRangeRequest(m, s.cfg.clock.CurrentSlot()) + if err != nil { s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream) s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer()) tracing.AnnotateError(span, err) return err } - // Only have range requests with a step of 1 being processed. - if m.Step > 1 { - m.Step = 1 - } - // The initial count for the first batch to be returned back. - count := m.Count - allowedBlocksPerSecond := uint64(flags.Get().BlockBatchLimit) - if count > allowedBlocksPerSecond { - count = allowedBlocksPerSecond - } - // initial batch start and end slots to be returned to remote peer. - startSlot := m.StartSlot - endSlot := startSlot.Add(m.Step * (count - 1)) - - // The final requested slot from remote peer. - endReqSlot := startSlot.Add(m.Step * (m.Count - 1)) blockLimiter, err := s.rateLimiter.topicCollector(string(stream.Protocol())) if err != nil { @@ -63,129 +44,115 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa } remainingBucketCapacity := blockLimiter.Remaining(stream.Conn().RemotePeer().String()) span.AddAttributes( - trace.Int64Attribute("start", int64(startSlot)), // lint:ignore uintcast -- This conversion is OK for tracing. - trace.Int64Attribute("end", int64(endReqSlot)), // lint:ignore uintcast -- This conversion is OK for tracing. - trace.Int64Attribute("step", int64(m.Step)), + trace.Int64Attribute("start", int64(rp.start)), // lint:ignore uintcast -- This conversion is OK for tracing. + trace.Int64Attribute("end", int64(rp.end)), // lint:ignore uintcast -- This conversion is OK for tracing. trace.Int64Attribute("count", int64(m.Count)), trace.StringAttribute("peer", stream.Conn().RemotePeer().Pretty()), trace.Int64Attribute("remaining_capacity", remainingBucketCapacity), ) + + // Ticker to stagger out large requests. + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + batcher, err := newBlockRangeBatcher(rp, s.cfg.beaconDB, s.rateLimiter, s.cfg.chain.IsCanonical, ticker) + if err != nil { + log.WithError(err).Info("error in BlocksByRange batch") + s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) + tracing.AnnotateError(span, err) + return err + } + // prevRoot is used to ensure that returned chains are strictly linear for singular steps // by comparing the previous root of the block in the list with the current block's parent. - var prevRoot [32]byte - for startSlot <= endReqSlot { - if err := s.rateLimiter.validateRequest(stream, allowedBlocksPerSecond); err != nil { - tracing.AnnotateError(span, err) + var batch blockBatch + var more bool + for batch, more = batcher.next(ctx, stream); more; batch, more = batcher.next(ctx, stream) { + batchStart := time.Now() + if err := s.writeBlockBatchToStream(ctx, batch, stream); err != nil { + s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) return err } - - if endSlot-startSlot > rangeLimit { - s.writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrInvalidRequest.Error(), stream) - err := p2ptypes.ErrInvalidRequest - tracing.AnnotateError(span, err) - return err - } - - err := s.writeBlockRangeToStream(ctx, startSlot, endSlot, m.Step, &prevRoot, stream) - if err != nil && !errors.Is(err, p2ptypes.ErrInvalidParent) { - return err - } - // Reduce capacity of peer in the rate limiter first. - // Decrease allowed blocks capacity by the number of streamed blocks. - if startSlot <= endSlot { - s.rateLimiter.add(stream, int64(1+endSlot.SubSlot(startSlot).Div(m.Step))) - } - // Exit in the event we have a disjoint chain to - // return. - if errors.Is(err, p2ptypes.ErrInvalidParent) { - break - } - - // Recalculate start and end slots for the next batch to be returned to the remote peer. - startSlot = endSlot.Add(m.Step) - endSlot = startSlot.Add(m.Step * (allowedBlocksPerSecond - 1)) - if endSlot > endReqSlot { - endSlot = endReqSlot - } - - // do not wait if all blocks have already been sent. - if startSlot > endReqSlot { - break - } - - // wait for ticker before resuming streaming blocks to remote peer. - <-ticker.C + rpcBlocksByRangeResponseLatency.Observe(float64(time.Since(batchStart).Milliseconds())) + } + if err := batch.error(); err != nil { + log.WithError(err).Info("error in BlocksByRange batch") + s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) + tracing.AnnotateError(span, err) + return err } closeStream(stream, log) return nil } -func (s *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlot primitives.Slot, step uint64, - prevRoot *[32]byte, stream libp2pcore.Stream) error { +type rangeParams struct { + start primitives.Slot + end primitives.Slot + size uint64 +} + +func validateRangeRequest(r *pb.BeaconBlocksByRangeRequest, current primitives.Slot) (rangeParams, error) { + rp := rangeParams{ + start: r.StartSlot, + size: r.Count, + } + maxRequest := params.BeaconNetworkConfig().MaxRequestBlocks + // Ensure all request params are within appropriate bounds + if rp.size == 0 || rp.size > maxRequest { + return rangeParams{}, p2ptypes.ErrInvalidRequest + } + // Allow some wiggle room, up to double the MaxRequestBlocks past the current slot, + // to give nodes syncing close to the head of the chain some margin for error. + maxStart, err := current.SafeAdd(maxRequest * 2) + if err != nil { + return rangeParams{}, p2ptypes.ErrInvalidRequest + } + if rp.start > maxStart { + return rangeParams{}, p2ptypes.ErrInvalidRequest + } + rp.end, err = rp.start.SafeAdd((rp.size - 1)) + if err != nil { + return rangeParams{}, p2ptypes.ErrInvalidRequest + } + + limit := uint64(flags.Get().BlockBatchLimit) + if limit > maxRequest { + limit = maxRequest + } + if rp.size > limit { + rp.size = limit + } + + return rp, nil +} + +func (s *Service) writeBlockBatchToStream(ctx context.Context, batch blockBatch, stream libp2pcore.Stream) error { ctx, span := trace.StartSpan(ctx, "sync.WriteBlockRangeToStream") defer span.End() - filter := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot).SetSlotStep(step) - blks, roots, err := s.cfg.beaconDB.Blocks(ctx, filter) - if err != nil { - log.WithError(err).Debug("Could not retrieve blocks") - s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) - tracing.AnnotateError(span, err) - return err - } - // handle genesis case - if startSlot == 0 { - genBlock, genRoot, err := s.retrieveGenesisBlock(ctx) - if err != nil { - log.WithError(err).Debug("Could not retrieve genesis block") - s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) - tracing.AnnotateError(span, err) - return err + blinded := make([]interfaces.ReadOnlySignedBeaconBlock, 0) + for _, b := range batch.canonical() { + if err := blocks.BeaconBlockIsNil(b); err != nil { + continue } - blks = append([]interfaces.ReadOnlySignedBeaconBlock{genBlock}, blks...) - roots = append([][32]byte{genRoot}, roots...) - } - // Filter and sort our retrieved blocks, so that - // we only return valid sets of blocks. - blks, roots, err = s.dedupBlocksAndRoots(blks, roots) - if err != nil { - s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) - tracing.AnnotateError(span, err) - return err - } - blks, roots = s.sortBlocksAndRoots(blks, roots) - - blks, err = s.filterBlocks(ctx, blks, roots, prevRoot, step, startSlot) - if err != nil && err != p2ptypes.ErrInvalidParent { - s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) - tracing.AnnotateError(span, err) - return err - } - start := time.Now() - // If the blocks are blinded, we reconstruct the full block via the execution client. - blindedExists := false - blindedIndex := 0 - for i, b := range blks { - // Since the blocks are sorted in ascending order, we assume that the following - // blocks from the first blinded block are also ascending. if b.IsBlinded() { - blindedExists = true - blindedIndex = i - break + blinded = append(blinded, b.ReadOnlySignedBeaconBlock) + continue + } + if chunkErr := s.chunkBlockWriter(stream, b); chunkErr != nil { + log.WithError(chunkErr).Error("Could not send a chunked response") + return chunkErr } } - - var reconstructedBlock []interfaces.SignedBeaconBlock - if blindedExists { - reconstructedBlock, err = s.cfg.executionPayloadReconstructor.ReconstructFullBellatrixBlockBatch(ctx, blks[blindedIndex:]) - if err != nil { - log.WithError(err).Error("Could not reconstruct full bellatrix block batch from blinded bodies") - s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) - return err - } + if len(blinded) == 0 { + return nil } - for _, b := range blks { + reconstructed, err := s.cfg.executionPayloadReconstructor.ReconstructFullBellatrixBlockBatch(ctx, blinded) + if err != nil { + log.WithError(err).Error("Could not reconstruct full bellatrix block batch from blinded bodies") + return err + } + for _, b := range reconstructed { if err := blocks.BeaconBlockIsNil(b); err != nil { continue } @@ -194,115 +161,9 @@ func (s *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlo } if chunkErr := s.chunkBlockWriter(stream, b); chunkErr != nil { log.WithError(chunkErr).Debug("Could not send a chunked response") - s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) - tracing.AnnotateError(span, chunkErr) return chunkErr } } - for _, b := range reconstructedBlock { - if err := blocks.BeaconBlockIsNil(b); err != nil { - continue - } - if b.IsBlinded() { - continue - } - if chunkErr := s.chunkBlockWriter(stream, b); chunkErr != nil { - log.WithError(chunkErr).Debug("Could not send a chunked response") - s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream) - tracing.AnnotateError(span, chunkErr) - return chunkErr - } - } - rpcBlocksByRangeResponseLatency.Observe(float64(time.Since(start).Milliseconds())) - // Return error in the event we have an invalid parent. - return err -} - -func (s *Service) validateRangeRequest(r *pb.BeaconBlocksByRangeRequest) error { - startSlot := r.StartSlot - count := r.Count - step := r.Step - - maxRequestBlocks := params.BeaconNetworkConfig().MaxRequestBlocks - // Add a buffer for possible large range requests from nodes syncing close to the - // head of the chain. - buffer := rangeLimit * 2 - highestExpectedSlot := s.cfg.clock.CurrentSlot().Add(uint64(buffer)) - - // Ensure all request params are within appropriate bounds - if count == 0 || count > maxRequestBlocks { - return p2ptypes.ErrInvalidRequest - } - - if step == 0 || step > rangeLimit { - return p2ptypes.ErrInvalidRequest - } - - if startSlot > highestExpectedSlot { - return p2ptypes.ErrInvalidRequest - } - - endSlot := startSlot.Add(step * (count - 1)) - if endSlot-startSlot > rangeLimit { - return p2ptypes.ErrInvalidRequest - } return nil } - -// filters all the provided blocks to ensure they are canonical -// and are strictly linear. -func (s *Service) filterBlocks(ctx context.Context, blks []interfaces.ReadOnlySignedBeaconBlock, roots [][32]byte, prevRoot *[32]byte, - step uint64, startSlot primitives.Slot) ([]interfaces.ReadOnlySignedBeaconBlock, error) { - if len(blks) != len(roots) { - return nil, errors.New("input blks and roots are diff lengths") - } - - newBlks := make([]interfaces.ReadOnlySignedBeaconBlock, 0, len(blks)) - for i, b := range blks { - isCanonical, err := s.cfg.chain.IsCanonical(ctx, roots[i]) - if err != nil { - return nil, err - } - parentValid := *prevRoot != [32]byte{} - isLinear := *prevRoot == b.Block().ParentRoot() - isSingular := step == 1 - slotDiff, err := b.Block().Slot().SafeSubSlot(startSlot) - if err != nil { - return nil, err - } - slotDiff, err = slotDiff.SafeMod(step) - if err != nil { - return nil, err - } - isRequestedSlotStep := slotDiff == 0 - if isRequestedSlotStep && isCanonical { - // Exit early if our valid block is non linear. - if parentValid && isSingular && !isLinear { - return newBlks, p2ptypes.ErrInvalidParent - } - newBlks = append(newBlks, blks[i]) - // Set the previous root as the - // newly added block's root - currRoot := roots[i] - *prevRoot = currRoot - } - } - return newBlks, nil -} - -func (s *Service) writeErrorResponseToStream(responseCode byte, reason string, stream libp2pcore.Stream) { - writeErrorResponseToStream(responseCode, reason, stream, s.cfg.p2p) -} - -func (s *Service) retrieveGenesisBlock(ctx context.Context) (interfaces.ReadOnlySignedBeaconBlock, [32]byte, error) { - genBlock, err := s.cfg.beaconDB.GenesisBlock(ctx) - if err != nil { - return nil, [32]byte{}, err - } - genRoot, err := genBlock.Block().HashTreeRoot() - if err != nil { - return nil, [32]byte{}, err - } - return genBlock, genRoot, nil -} diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go index 2472308398..1234462bac 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go @@ -25,7 +25,6 @@ import ( fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" - "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" leakybucket "github.com/prysmaticlabs/prysm/v4/container/leaky-bucket" "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" @@ -63,14 +62,9 @@ func TestRPCBeaconBlocksByRange_RPCHandlerReturnsBlocks(t *testing.T) { util.SaveBlock(t, context.Background(), d, blk) } + clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) // Start service with 160 as allowed blocks capacity (and almost zero capacity recovery). - r := &Service{ - cfg: &config{ - p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, - clock: startup.NewClock(time.Unix(0, 0), [32]byte{}), - }, - rateLimiter: newRateLimiter(p1), - } + r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), time.Second, false) @@ -131,8 +125,8 @@ func TestRPCBeaconBlocksByRange_ReturnCorrectNumberBack(t *testing.T) { } require.NoError(t, d.SaveGenesisBlockRoot(context.Background(), genRoot)) - // Start service with 160 as allowed blocks capacity (and almost zero capacity recovery). clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) + // Start service with 160 as allowed blocks capacity (and almost zero capacity recovery). r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)} pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) @@ -240,14 +234,15 @@ func TestRPCBeaconBlocksByRange_ReconstructsPayloads(t *testing.T) { } require.NoError(t, d.SaveGenesisBlockRoot(context.Background(), genRoot)) + clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) // Start service with 160 as allowed blocks capacity (and almost zero capacity recovery). r := &Service{ cfg: &config{ p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, + clock: clock, executionPayloadReconstructor: mockEngine, - clock: startup.NewClock(time.Unix(0, 0), [32]byte{}), }, rateLimiter: newRateLimiter(p1), } @@ -317,9 +312,9 @@ func TestRPCBeaconBlocksByRange_RPCHandlerReturnsSortedBlocks(t *testing.T) { j++ } - // Start service with 160 as allowed blocks capacity (and almost zero capacity recovery). clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) - r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)} + // Start service with 160 as allowed blocks capacity (and almost zero capacity recovery). + r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), time.Second, false) @@ -384,7 +379,8 @@ func TestRPCBeaconBlocksByRange_ReturnsGenesisBlock(t *testing.T) { prevRoot = rt } - r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: startup.NewClock(time.Unix(0, 0), [32]byte{})}, rateLimiter: newRateLimiter(p1)} + clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) + r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, time.Second, false) @@ -503,7 +499,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) { capacity := int64(flags.Get().BlockBatchLimit * 3) clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) - r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)} + r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) @@ -534,7 +530,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) { capacity := int64(flags.Get().BlockBatchLimit * flags.Get().BlockBatchLimitBurstFactor) clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) - r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)} + r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1) topic := string(pcl) r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, time.Second, false) @@ -565,15 +561,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) { func TestRPCBeaconBlocksByRange_validateRangeRequest(t *testing.T) { slotsSinceGenesis := primitives.Slot(1000) offset := int64(slotsSinceGenesis.Mul(params.BeaconConfig().SecondsPerSlot)) - chain := &chainMock.ChainService{ - Genesis: time.Now().Add(time.Second * time.Duration(-1*offset)), - } - r := &Service{ - cfg: &config{ - chain: chain, - clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot), - }, - } + clock := startup.NewClock(time.Now().Add(time.Second*time.Duration(-1*offset)), [32]byte{}) tests := []struct { name string @@ -613,8 +601,7 @@ func TestRPCBeaconBlocksByRange_validateRangeRequest(t *testing.T) { Step: 0, Count: 1, }, - expectedError: p2ptypes.ErrInvalidRequest, - errorToLog: "validation did not fail with bad step", + expectedError: nil, // The Step param is ignored in v2 RPC }, { name: "Over limit Step", @@ -622,8 +609,7 @@ func TestRPCBeaconBlocksByRange_validateRangeRequest(t *testing.T) { Step: rangeLimit + 1, Count: 1, }, - expectedError: p2ptypes.ErrInvalidRequest, - errorToLog: "validation did not fail with bad step", + expectedError: nil, // The Step param is ignored in v2 RPC }, { name: "Correct Step", @@ -658,8 +644,7 @@ func TestRPCBeaconBlocksByRange_validateRangeRequest(t *testing.T) { Step: 3, Count: uint64(slotsSinceGenesis / 2), }, - expectedError: p2ptypes.ErrInvalidRequest, - errorToLog: "validation did not fail with bad range", + expectedError: nil, // this is fine with the deprecation of Step }, { name: "Valid Request", @@ -674,10 +659,11 @@ func TestRPCBeaconBlocksByRange_validateRangeRequest(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + _, err := validateRangeRequest(tt.req, clock.CurrentSlot()) if tt.expectedError != nil { - assert.ErrorContains(t, tt.expectedError.Error(), r.validateRangeRequest(tt.req), tt.errorToLog) + assert.ErrorContains(t, tt.expectedError.Error(), err, tt.errorToLog) } else { - assert.NoError(t, r.validateRangeRequest(tt.req), tt.errorToLog) + assert.NoError(t, err, tt.errorToLog) } }) } @@ -905,7 +891,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) { assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) - r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)} + r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false) req := ðpb.BeaconBlocksByRangeRequest{ StartSlot: 1, @@ -937,7 +923,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) { assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) - r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)} + r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false) req := ðpb.BeaconBlocksByRangeRequest{ StartSlot: 1, @@ -971,7 +957,6 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) { p1.Connect(p2) assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") - clock := startup.NewClock(time.Unix(0, 0), [32]byte{}) r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)} r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false) @@ -1087,12 +1072,6 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) { } func TestRPCBeaconBlocksByRange_FilterBlocks_PreviousRoot(t *testing.T) { - p1 := p2ptest.NewTestP2P(t) - p2 := p2ptest.NewTestP2P(t) - p1.Connect(p2) - assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") - d := db.SetupDB(t) - req := ðpb.BeaconBlocksByRangeRequest{ StartSlot: 100, Step: 1, @@ -1102,8 +1081,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks_PreviousRoot(t *testing.T) { // Populate the database with blocks that would match the request. var prevRoot [32]byte var err error - blks := []interfaces.ReadOnlySignedBeaconBlock{} - var roots [][32]byte + var blks []blocks.ROBlock for i := req.StartSlot; i < req.StartSlot.Add(req.Count); i += primitives.Slot(1) { blk := util.NewBeaconBlock() blk.Block.Slot = i @@ -1112,21 +1090,19 @@ func TestRPCBeaconBlocksByRange_FilterBlocks_PreviousRoot(t *testing.T) { require.NoError(t, err) wsb, err := blocks.NewSignedBeaconBlock(blk) require.NoError(t, err) - blks = append(blks, wsb) copiedRt := prevRoot - roots = append(roots, copiedRt) + b, err := blocks.NewROBlockWithRoot(wsb, copiedRt) + require.NoError(t, err) + blks = append(blks, b) } - // Start service with 160 as allowed blocks capacity (and almost zero capacity recovery). - r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)} - - var initialRoot [32]byte - ptrRt := &initialRoot - newBlks, err := r.filterBlocks(context.Background(), blks, roots, ptrRt, req.Step, req.StartSlot) + chain := &chainMock.ChainService{} + cf := canonicalFilter{canonical: chain.IsCanonical} + seq, nseq, err := cf.filter(context.Background(), blks) require.NoError(t, err) - require.Equal(t, len(blks), len(newBlks)) + require.Equal(t, len(blks), len(seq)) + require.Equal(t, 0, len(nseq)) // pointer should reference a new root. - require.NotEqual(t, *ptrRt, [32]byte{}) - + require.NotEqual(t, cf.prevRoot, [32]byte{}) } diff --git a/beacon-chain/sync/rpc_handler_test.go b/beacon-chain/sync/rpc_handler_test.go new file mode 100644 index 0000000000..2ce2715c7d --- /dev/null +++ b/beacon-chain/sync/rpc_handler_test.go @@ -0,0 +1,56 @@ +package sync + +import ( + "context" + "testing" + "time" + + "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/protocol" + p2ptest "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing" + "github.com/prysmaticlabs/prysm/v4/testing/require" + "github.com/prysmaticlabs/prysm/v4/testing/util" +) + +type rpcHandlerTest struct { + t *testing.T + topic protocol.ID + timeout time.Duration + err error + s *Service +} + +func (rt *rpcHandlerTest) testHandler(nh network.StreamHandler, rh rpcHandler, rhi interface{}) { + ctx, cancel := context.WithTimeout(context.Background(), rt.timeout) + defer func() { + cancel() + }() + + w := util.NewWaiter() + server := p2ptest.NewTestP2P(rt.t) + + client, ok := rt.s.cfg.p2p.(*p2ptest.TestP2P) + require.Equal(rt.t, true, ok) + + client.Connect(server) + defer func() { + require.NoError(rt.t, client.Disconnect(server.PeerID())) + }() + require.Equal(rt.t, 1, len(client.BHost.Network().Peers()), "Expected peers to be connected") + h := func(stream network.Stream) { + defer w.Done() + nh(stream) + } + server.BHost.SetStreamHandler(protocol.ID(rt.topic), h) + stream, err := client.BHost.NewStream(ctx, server.BHost.ID(), protocol.ID(rt.topic)) + require.NoError(rt.t, err) + + err = rh(ctx, rhi, stream) + if rt.err == nil { + require.NoError(rt.t, err) + } else { + require.ErrorIs(rt.t, err, rt.err) + } + + w.RequireDoneBeforeCancel(ctx, rt.t) +} diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index a247e624a1..5ed48b5cdb 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -11,6 +11,7 @@ import ( lru "github.com/hashicorp/golang-lru" pubsub "github.com/libp2p/go-libp2p-pubsub" + libp2pcore "github.com/libp2p/go-libp2p/core" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" gcache "github.com/patrickmn/go-cache" @@ -41,7 +42,7 @@ import ( var _ runtime.Service = (*Service)(nil) -const rangeLimit = 1024 +const rangeLimit uint64 = 1024 const seenBlockSize = 1000 const seenUnaggregatedAttSize = 20000 const seenAggregatedAttSize = 1024 @@ -272,6 +273,10 @@ func (s *Service) registerHandlers() { } } +func (s *Service) writeErrorResponseToStream(responseCode byte, reason string, stream libp2pcore.Stream) { + writeErrorResponseToStream(responseCode, reason, stream, s.cfg.p2p) +} + // marks the chain as having started. func (s *Service) markForChainStart() { s.chainStarted.Set() diff --git a/beacon-chain/sync/utils.go b/beacon-chain/sync/utils.go deleted file mode 100644 index dcf77f2526..0000000000 --- a/beacon-chain/sync/utils.go +++ /dev/null @@ -1,77 +0,0 @@ -package sync - -import ( - "errors" - "sort" - - "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" -) - -// A type to represent beacon blocks and roots which have methods -// which satisfy the Interface in `Sort` so that this type can -// be sorted in ascending order. -type sortedObj struct { - blks []interfaces.ReadOnlySignedBeaconBlock - roots [][32]byte -} - -// Less reports whether the element with index i must sort before the element with index j. -func (s sortedObj) Less(i, j int) bool { - return s.blks[i].Block().Slot() < s.blks[j].Block().Slot() -} - -// Swap swaps the elements with indexes i and j. -func (s sortedObj) Swap(i, j int) { - s.blks[i], s.blks[j] = s.blks[j], s.blks[i] - s.roots[i], s.roots[j] = s.roots[j], s.roots[i] -} - -// Len is the number of elements in the collection. -func (s sortedObj) Len() int { - return len(s.blks) -} - -// removes duplicates from provided blocks and roots. -func (_ *Service) dedupBlocksAndRoots(blks []interfaces.ReadOnlySignedBeaconBlock, roots [][32]byte) ([]interfaces.ReadOnlySignedBeaconBlock, [][32]byte, error) { - if len(blks) != len(roots) { - return nil, nil, errors.New("input blks and roots are diff lengths") - } - - // Remove duplicate blocks received - rootMap := make(map[[32]byte]bool, len(blks)) - newBlks := make([]interfaces.ReadOnlySignedBeaconBlock, 0, len(blks)) - newRoots := make([][32]byte, 0, len(roots)) - for i, r := range roots { - if rootMap[r] { - continue - } - rootMap[r] = true - newRoots = append(newRoots, roots[i]) - newBlks = append(newBlks, blks[i]) - } - return newBlks, newRoots, nil -} - -func (_ *Service) dedupRoots(roots [][32]byte) [][32]byte { - newRoots := make([][32]byte, 0, len(roots)) - rootMap := make(map[[32]byte]bool, len(roots)) - for i, r := range roots { - if rootMap[r] { - continue - } - rootMap[r] = true - newRoots = append(newRoots, roots[i]) - } - return newRoots -} - -// sort the provided blocks and roots in ascending order. This method assumes that the size of -// block slice and root slice is equal. -func (_ *Service) sortBlocksAndRoots(blks []interfaces.ReadOnlySignedBeaconBlock, roots [][32]byte) ([]interfaces.ReadOnlySignedBeaconBlock, [][32]byte) { - obj := sortedObj{ - blks: blks, - roots: roots, - } - sort.Sort(obj) - return obj.blks, obj.roots -} diff --git a/beacon-chain/sync/utils_test.go b/beacon-chain/sync/utils_test.go deleted file mode 100644 index 919c0f9d20..0000000000 --- a/beacon-chain/sync/utils_test.go +++ /dev/null @@ -1,85 +0,0 @@ -package sync - -import ( - "math/rand" - "testing" - - "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" - "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" - "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" - "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" - ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" - "github.com/prysmaticlabs/prysm/v4/testing/require" -) - -func TestSortedObj_SortBlocksRoots(t *testing.T) { - source := rand.NewSource(33) - randGen := rand.New(source) - var blks []interfaces.ReadOnlySignedBeaconBlock - var roots [][32]byte - randFunc := func() int64 { - return randGen.Int63n(50) - } - - for i := 0; i < 10; i++ { - slot := primitives.Slot(randFunc()) - newBlk, err := blocks.NewSignedBeaconBlock(ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: slot, Body: ðpb.BeaconBlockBody{}}}) - require.NoError(t, err) - blks = append(blks, newBlk) - root := bytesutil.ToBytes32(bytesutil.Bytes32(uint64(slot))) - roots = append(roots, root) - } - - r := &Service{} - - newBlks, newRoots := r.sortBlocksAndRoots(blks, roots) - - previousSlot := primitives.Slot(0) - for i, b := range newBlks { - if b.Block().Slot() < previousSlot { - t.Errorf("Block list is not sorted as %d is smaller than previousSlot %d", b.Block().Slot(), previousSlot) - } - if bytesutil.FromBytes8(newRoots[i][:]) != uint64(b.Block().Slot()) { - t.Errorf("root doesn't match stored slot in block: wanted %d but got %d", b.Block().Slot(), bytesutil.FromBytes8(newRoots[i][:])) - } - previousSlot = b.Block().Slot() - } -} - -func TestSortedObj_NoDuplicates(t *testing.T) { - source := rand.NewSource(33) - randGen := rand.New(source) - var blks []interfaces.ReadOnlySignedBeaconBlock - var roots [][32]byte - randFunc := func() int64 { - return randGen.Int63n(50) - } - - for i := 0; i < 10; i++ { - slot := primitives.Slot(randFunc()) - newBlk := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: slot, Body: ðpb.BeaconBlockBody{}}} - // append twice - wsb, err := blocks.NewSignedBeaconBlock(newBlk) - require.NoError(t, err) - wsbCopy, err := wsb.Copy() - require.NoError(t, err) - blks = append(blks, wsb, wsbCopy) - - // append twice - root := bytesutil.ToBytes32(bytesutil.Bytes32(uint64(slot))) - roots = append(roots, root, root) - } - - r := &Service{} - - newBlks, newRoots, err := r.dedupBlocksAndRoots(blks, roots) - require.NoError(t, err) - - rootMap := make(map[[32]byte]bool) - for i, b := range newBlks { - if rootMap[newRoots[i]] { - t.Errorf("Duplicated root exists %#x with block %v", newRoots[i], b) - } - rootMap[newRoots[i]] = true - } -} diff --git a/consensus-types/blocks/BUILD.bazel b/consensus-types/blocks/BUILD.bazel index 8d507f9a66..f89aa31115 100644 --- a/consensus-types/blocks/BUILD.bazel +++ b/consensus-types/blocks/BUILD.bazel @@ -7,6 +7,7 @@ go_library( "factory.go", "getters.go", "proto.go", + "roblock.go", "setters.go", "types.go", ], @@ -36,6 +37,7 @@ go_test( "factory_test.go", "getters_test.go", "proto_test.go", + "roblock_test.go", ], embed = [":go_default_library"], deps = [ diff --git a/consensus-types/blocks/roblock.go b/consensus-types/blocks/roblock.go new file mode 100644 index 0000000000..d63d948631 --- /dev/null +++ b/consensus-types/blocks/roblock.go @@ -0,0 +1,75 @@ +package blocks + +import ( + "bytes" + "sort" + + "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" +) + +// ROBlock is a value that embeds a ReadOnlySignedBeaconBlock along with its block root ([32]byte). +// This allows the block root to be cached within a value that satisfies the ReadOnlySignedBeaconBlock interface. +// Since the root and slot for each ROBlock is known, slices can be efficiently sorted using ROBlockSlice. +type ROBlock struct { + interfaces.ReadOnlySignedBeaconBlock + root [32]byte +} + +// Root returns the block hash_tree_root for the embedded ReadOnlySignedBeaconBlock.Block(). +func (b ROBlock) Root() [32]byte { + return b.root +} + +// NewROBlockWithRoot creates an ROBlock embedding the given block with its root. It accepts the root as parameter rather than +// computing it internally, because in some cases a block is retrieved by its root and recomputing it is a waste. +func NewROBlockWithRoot(b interfaces.ReadOnlySignedBeaconBlock, root [32]byte) (ROBlock, error) { + if err := BeaconBlockIsNil(b); err != nil { + return ROBlock{}, err + } + return ROBlock{ReadOnlySignedBeaconBlock: b, root: root}, nil +} + +// NewROBlock creates a ROBlock from a ReadOnlySignedBeaconBlock. It uses the HashTreeRoot method of the given +// ReadOnlySignedBeaconBlock.Block to compute the cached root. +func NewROBlock(b interfaces.ReadOnlySignedBeaconBlock) (ROBlock, error) { + if err := BeaconBlockIsNil(b); err != nil { + return ROBlock{}, err + } + root, err := b.Block().HashTreeRoot() + if err != nil { + return ROBlock{}, err + } + return ROBlock{ReadOnlySignedBeaconBlock: b, root: root}, nil +} + +// ROBlockSlice implements sort.Interface so that slices of ROBlocks can be easily sorted. +// A slice of ROBlock is sorted first by slot, with ties broken by cached block roots. +type ROBlockSlice []ROBlock + +var _ sort.Interface = ROBlockSlice{} + +// Less reports whether the element with index i must sort before the element with index j. +// ROBlocks are ordered first by their slot, +// with a lexicographic sort of roots breaking ties for slots with duplicate blocks. +func (s ROBlockSlice) Less(i, j int) bool { + si, sj := s[i].Block().Slot(), s[j].Block().Slot() + + // lower slot wins + if si != sj { + return si < sj + } + + // break slot tie lexicographically comparing roots byte for byte + ri, rj := s[i].Root(), s[j].Root() + return bytes.Compare(ri[:], rj[:]) < 0 +} + +// Swap swaps the elements with indexes i and j. +func (s ROBlockSlice) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +// Len is the number of elements in the collection. +func (s ROBlockSlice) Len() int { + return len(s) +} diff --git a/consensus-types/blocks/roblock_test.go b/consensus-types/blocks/roblock_test.go new file mode 100644 index 0000000000..2ffde9624c --- /dev/null +++ b/consensus-types/blocks/roblock_test.go @@ -0,0 +1,90 @@ +package blocks + +import ( + "sort" + "testing" + + "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" + eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v4/testing/require" +) + +func TestROBlockSorting(t *testing.T) { + one := bytesutil.ToBytes32(bytesutil.PadTo([]byte{1}, 32)) + two := bytesutil.ToBytes32(bytesutil.PadTo([]byte{2}, 32)) + cases := []struct { + name string + ros []ROBlock + sorted []ROBlock + }{ + { + name: "1 item", + ros: []ROBlock{testROBlock(t, 1, [32]byte{})}, + sorted: []ROBlock{testROBlock(t, 1, [32]byte{})}, + }, + { + name: "2 items, sorted", + ros: []ROBlock{testROBlock(t, 1, [32]byte{}), testROBlock(t, 2, [32]byte{})}, + sorted: []ROBlock{testROBlock(t, 1, [32]byte{}), testROBlock(t, 2, [32]byte{})}, + }, + { + name: "2 items, reversed", + ros: []ROBlock{testROBlock(t, 2, [32]byte{}), testROBlock(t, 1, [32]byte{})}, + sorted: []ROBlock{testROBlock(t, 1, [32]byte{}), testROBlock(t, 2, [32]byte{})}, + }, + { + name: "3 items, reversed, with tie breaker", + ros: []ROBlock{ + testROBlock(t, 2, two), + testROBlock(t, 2, one), + testROBlock(t, 1, [32]byte{}), + }, + sorted: []ROBlock{ + testROBlock(t, 1, [32]byte{}), + testROBlock(t, 2, one), + testROBlock(t, 2, two), + }, + }, + { + name: "5 items, reversed, with double root tie", + ros: []ROBlock{ + testROBlock(t, 0, one), + testROBlock(t, 2, two), + testROBlock(t, 2, one), + testROBlock(t, 2, two), + testROBlock(t, 2, one), + testROBlock(t, 1, [32]byte{}), + }, + sorted: []ROBlock{ + testROBlock(t, 0, one), + testROBlock(t, 1, [32]byte{}), + testROBlock(t, 2, one), + testROBlock(t, 2, one), + testROBlock(t, 2, two), + testROBlock(t, 2, two), + }, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + sort.Sort(ROBlockSlice(c.ros)) + for i := 0; i < len(c.sorted); i++ { + require.Equal(t, c.sorted[i].Block().Slot(), c.ros[i].Block().Slot()) + require.Equal(t, c.sorted[i].Root(), c.ros[i].Root()) + } + }) + } +} + +func testROBlock(t *testing.T, slot primitives.Slot, root [32]byte) ROBlock { + b, err := NewSignedBeaconBlock(ð.SignedBeaconBlock{Block: ð.BeaconBlock{ + Body: ð.BeaconBlockBody{}, + Slot: slot, + }}) + require.NoError(t, err) + return ROBlock{ + ReadOnlySignedBeaconBlock: b, + root: root, + } +} diff --git a/testing/util/wait_timeout.go b/testing/util/wait_timeout.go index d7abaaefee..0482e4f229 100644 --- a/testing/util/wait_timeout.go +++ b/testing/util/wait_timeout.go @@ -1,7 +1,9 @@ package util import ( + "context" "sync" + "testing" "time" ) @@ -20,3 +22,41 @@ func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { return true } } + +// Waiter offers an alternate ux for building tests that want to ensure contexts are used in certain ways. +type Waiter struct { + c chan struct{} +} + +// NewWaiter internally create the chan that Waiter relies on. +func NewWaiter() *Waiter { + return &Waiter{ + c: make(chan struct{}), + } +} + +// Done is used with RequireDoneAfter and RequireDoneBefore to make assertions +// that certain test code is reached before a timeout or context cancelation. +func (w *Waiter) Done() { + close(w.c) +} + +// RequireDoneAfter forces the test to fail if the timeout is reached before Done is called. +func (w *Waiter) RequireDoneAfter(t *testing.T, timeout time.Duration) { + select { + case <-w.c: + return + case <-time.After(timeout): + t.Fatalf("timeout after %s", timeout) + } +} + +// RequireDoneBeforeCancel forces the test to fail if the context is cancelled before Done is called. +func (w *Waiter) RequireDoneBeforeCancel(ctx context.Context, t *testing.T) { + select { + case <-w.c: + return + case <-ctx.Done(): + t.Fatalf("context canceled before Done with error=%s", ctx.Err()) + } +}