mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
Revert initsync revert (#12431)
* Revert "Revert "BeaconBlocksByRange and BlobSidecarsByRange consistency (#123… (#12426)"
This reverts commit ddc1e48e05.
* fix metrics bug, add batch.next tests
---------
Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
This commit is contained in:
@@ -4,6 +4,7 @@ go_library(
|
|||||||
name = "go_default_library",
|
name = "go_default_library",
|
||||||
srcs = [
|
srcs = [
|
||||||
"batch_verifier.go",
|
"batch_verifier.go",
|
||||||
|
"block_batcher.go",
|
||||||
"broadcast_bls_changes.go",
|
"broadcast_bls_changes.go",
|
||||||
"context.go",
|
"context.go",
|
||||||
"deadlines.go",
|
"deadlines.go",
|
||||||
@@ -37,7 +38,6 @@ go_library(
|
|||||||
"subscriber_sync_committee_message.go",
|
"subscriber_sync_committee_message.go",
|
||||||
"subscriber_sync_contribution_proof.go",
|
"subscriber_sync_contribution_proof.go",
|
||||||
"subscription_topic_handler.go",
|
"subscription_topic_handler.go",
|
||||||
"utils.go",
|
|
||||||
"validate_aggregate_proof.go",
|
"validate_aggregate_proof.go",
|
||||||
"validate_attester_slashing.go",
|
"validate_attester_slashing.go",
|
||||||
"validate_beacon_attestation.go",
|
"validate_beacon_attestation.go",
|
||||||
@@ -134,6 +134,7 @@ go_test(
|
|||||||
size = "small",
|
size = "small",
|
||||||
srcs = [
|
srcs = [
|
||||||
"batch_verifier_test.go",
|
"batch_verifier_test.go",
|
||||||
|
"block_batcher_test.go",
|
||||||
"broadcast_bls_changes_test.go",
|
"broadcast_bls_changes_test.go",
|
||||||
"context_test.go",
|
"context_test.go",
|
||||||
"decode_pubsub_test.go",
|
"decode_pubsub_test.go",
|
||||||
@@ -146,6 +147,7 @@ go_test(
|
|||||||
"rpc_beacon_blocks_by_root_test.go",
|
"rpc_beacon_blocks_by_root_test.go",
|
||||||
"rpc_chunked_response_test.go",
|
"rpc_chunked_response_test.go",
|
||||||
"rpc_goodbye_test.go",
|
"rpc_goodbye_test.go",
|
||||||
|
"rpc_handler_test.go",
|
||||||
"rpc_metadata_test.go",
|
"rpc_metadata_test.go",
|
||||||
"rpc_ping_test.go",
|
"rpc_ping_test.go",
|
||||||
"rpc_send_request_test.go",
|
"rpc_send_request_test.go",
|
||||||
@@ -158,7 +160,6 @@ go_test(
|
|||||||
"subscription_topic_handler_test.go",
|
"subscription_topic_handler_test.go",
|
||||||
"sync_fuzz_test.go",
|
"sync_fuzz_test.go",
|
||||||
"sync_test.go",
|
"sync_test.go",
|
||||||
"utils_test.go",
|
|
||||||
"validate_aggregate_proof_test.go",
|
"validate_aggregate_proof_test.go",
|
||||||
"validate_attester_slashing_test.go",
|
"validate_attester_slashing_test.go",
|
||||||
"validate_beacon_attestation_test.go",
|
"validate_beacon_attestation_test.go",
|
||||||
|
|||||||
231
beacon-chain/sync/block_batcher.go
Normal file
231
beacon-chain/sync/block_batcher.go
Normal file
@@ -0,0 +1,231 @@
|
|||||||
|
package sync
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sort"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
libp2pcore "github.com/libp2p/go-libp2p/core"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
|
||||||
|
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filters"
|
||||||
|
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
|
||||||
|
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||||
|
)
|
||||||
|
|
||||||
|
// blockRangeBatcher encapsulates the logic for splitting up a block range request into fixed-size batches of
|
||||||
|
// blocks that are retrieved from the database, ensured to be canonical, sequential and unique.
|
||||||
|
// If a non-nil value for ticker is set, it will be used to pause between batches lookups, as a rate-limiter.
|
||||||
|
type blockRangeBatcher struct {
|
||||||
|
start primitives.Slot
|
||||||
|
end primitives.Slot
|
||||||
|
size uint64
|
||||||
|
db db.NoHeadAccessDatabase
|
||||||
|
limiter *limiter
|
||||||
|
ticker *time.Ticker
|
||||||
|
|
||||||
|
cf *canonicalFilter
|
||||||
|
current *blockBatch
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBlockRangeBatcher(rp rangeParams, bdb db.NoHeadAccessDatabase, limiter *limiter, canonical canonicalChecker, ticker *time.Ticker) (*blockRangeBatcher, error) {
|
||||||
|
if bdb == nil {
|
||||||
|
return nil, errors.New("nil db param, unable to initialize blockRangeBatcher")
|
||||||
|
}
|
||||||
|
if limiter == nil {
|
||||||
|
return nil, errors.New("nil limiter param, unable to initialize blockRangeBatcher")
|
||||||
|
}
|
||||||
|
if canonical == nil {
|
||||||
|
return nil, errors.New("nil canonicalChecker param, unable to initialize blockRangeBatcher")
|
||||||
|
}
|
||||||
|
if ticker == nil {
|
||||||
|
return nil, errors.New("nil ticker param, unable to initialize blockRangeBatcher")
|
||||||
|
}
|
||||||
|
if rp.size == 0 {
|
||||||
|
return nil, fmt.Errorf("invalid batch size of %d", rp.size)
|
||||||
|
}
|
||||||
|
if rp.end < rp.start {
|
||||||
|
return nil, fmt.Errorf("batch end slot %d is lower than batch start %d", rp.end, rp.start)
|
||||||
|
}
|
||||||
|
cf := &canonicalFilter{canonical: canonical}
|
||||||
|
return &blockRangeBatcher{
|
||||||
|
start: rp.start,
|
||||||
|
end: rp.end,
|
||||||
|
size: rp.size,
|
||||||
|
db: bdb,
|
||||||
|
limiter: limiter,
|
||||||
|
ticker: ticker,
|
||||||
|
cf: cf,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bb *blockRangeBatcher) next(ctx context.Context, stream libp2pcore.Stream) (blockBatch, bool) {
|
||||||
|
var nb blockBatch
|
||||||
|
var more bool
|
||||||
|
// The result of each call to next() is saved in the `current` field.
|
||||||
|
// If current is not nil, current.next figures out the next batch based on the previous one.
|
||||||
|
// If current is nil, newBlockBatch is used to generate the first batch.
|
||||||
|
if bb.current != nil {
|
||||||
|
current := *bb.current
|
||||||
|
nb, more = current.next(bb.end, bb.size)
|
||||||
|
} else {
|
||||||
|
nb, more = newBlockBatch(bb.start, bb.end, bb.size)
|
||||||
|
}
|
||||||
|
// newBlockBatch and next() both return a boolean to indicate whether calling .next() will yield another batch
|
||||||
|
// (based on the whether we've gotten to the end slot yet). blockRangeBatcher.next does the same,
|
||||||
|
// and returns (zero value, false), to signal the end of the iteration.
|
||||||
|
if !more {
|
||||||
|
return blockBatch{}, false
|
||||||
|
}
|
||||||
|
if err := bb.limiter.validateRequest(stream, bb.size); err != nil {
|
||||||
|
return blockBatch{err: errors.Wrap(err, "throttled by rate limiter")}, false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for the ticker before doing anything expensive, unless this is the first batch.
|
||||||
|
if bb.ticker != nil && bb.current != nil {
|
||||||
|
<-bb.ticker.C
|
||||||
|
}
|
||||||
|
filter := filters.NewFilter().SetStartSlot(nb.start).SetEndSlot(nb.end)
|
||||||
|
blks, roots, err := bb.db.Blocks(ctx, filter)
|
||||||
|
if err != nil {
|
||||||
|
return blockBatch{err: errors.Wrap(err, "Could not retrieve blocks")}, false
|
||||||
|
}
|
||||||
|
|
||||||
|
rob := make([]blocks.ROBlock, 0)
|
||||||
|
if nb.start == 0 {
|
||||||
|
gb, err := bb.genesisBlock(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return blockBatch{err: errors.Wrap(err, "could not retrieve genesis block")}, false
|
||||||
|
}
|
||||||
|
rob = append(rob, gb)
|
||||||
|
}
|
||||||
|
for i := 0; i < len(blks); i++ {
|
||||||
|
rb, err := blocks.NewROBlockWithRoot(blks[i], roots[i])
|
||||||
|
if err != nil {
|
||||||
|
return blockBatch{err: errors.Wrap(err, "Could not initialize ROBlock")}, false
|
||||||
|
}
|
||||||
|
rob = append(rob, rb)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Filter and sort our retrieved blocks, so that we only return valid sets of blocks.
|
||||||
|
nb.lin, nb.nonlin, nb.err = bb.cf.filter(ctx, rob)
|
||||||
|
|
||||||
|
// Decrease allowed blocks capacity by the number of streamed blocks.
|
||||||
|
bb.limiter.add(stream, int64(1+nb.end.SubSlot(nb.start)))
|
||||||
|
bb.current = &nb
|
||||||
|
return *bb.current, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bb *blockRangeBatcher) genesisBlock(ctx context.Context) (blocks.ROBlock, error) {
|
||||||
|
b, err := bb.db.GenesisBlock(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return blocks.ROBlock{}, err
|
||||||
|
}
|
||||||
|
htr, err := b.Block().HashTreeRoot()
|
||||||
|
if err != nil {
|
||||||
|
return blocks.ROBlock{}, err
|
||||||
|
}
|
||||||
|
return blocks.NewROBlockWithRoot(b, htr)
|
||||||
|
}
|
||||||
|
|
||||||
|
type blockBatch struct {
|
||||||
|
start primitives.Slot
|
||||||
|
end primitives.Slot
|
||||||
|
lin []blocks.ROBlock // lin is a linear chain of blocks connected through parent_root. broken tails go in nonlin.
|
||||||
|
nonlin []blocks.ROBlock // if there is a break in the chain of parent->child relationships, the tail is stored here.
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBlockBatch(start, reqEnd primitives.Slot, size uint64) (blockBatch, bool) {
|
||||||
|
if start > reqEnd {
|
||||||
|
return blockBatch{}, false
|
||||||
|
}
|
||||||
|
nb := blockBatch{start: start, end: start.Add(size - 1)}
|
||||||
|
if nb.end > reqEnd {
|
||||||
|
nb.end = reqEnd
|
||||||
|
}
|
||||||
|
return nb, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bat blockBatch) next(reqEnd primitives.Slot, size uint64) (blockBatch, bool) {
|
||||||
|
if bat.error() != nil {
|
||||||
|
return bat, false
|
||||||
|
}
|
||||||
|
if bat.nonLinear() {
|
||||||
|
return blockBatch{}, false
|
||||||
|
}
|
||||||
|
return newBlockBatch(bat.end.Add(1), reqEnd, size)
|
||||||
|
}
|
||||||
|
|
||||||
|
// blocks returns the list of linear, canonical blocks read from the db.
|
||||||
|
func (bb blockBatch) canonical() []blocks.ROBlock {
|
||||||
|
return bb.lin
|
||||||
|
}
|
||||||
|
|
||||||
|
// nonLinear is used to determine if there was a break in the chain of canonical blocks as read from the db.
|
||||||
|
// If true, code using the blockBatch should stop serving additional batches of blocks.
|
||||||
|
func (bb blockBatch) nonLinear() bool {
|
||||||
|
return len(bb.nonlin) > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bb blockBatch) error() error {
|
||||||
|
return bb.err
|
||||||
|
}
|
||||||
|
|
||||||
|
type canonicalChecker func(context.Context, [32]byte) (bool, error)
|
||||||
|
|
||||||
|
type canonicalFilter struct {
|
||||||
|
prevRoot [32]byte
|
||||||
|
canonical canonicalChecker
|
||||||
|
}
|
||||||
|
|
||||||
|
// filters all the provided blocks to ensure they are canonical and strictly linear.
|
||||||
|
func (cf *canonicalFilter) filter(ctx context.Context, blks []blocks.ROBlock) ([]blocks.ROBlock, []blocks.ROBlock, error) {
|
||||||
|
blks = sortedUniqueBlocks(blks)
|
||||||
|
seq := make([]blocks.ROBlock, 0, len(blks))
|
||||||
|
nseq := make([]blocks.ROBlock, 0)
|
||||||
|
for i, b := range blks {
|
||||||
|
cb, err := cf.canonical(ctx, b.Root())
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
if !cb {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// prevRoot will be the zero value until we find the first canonical block in the stream seen by an instance
|
||||||
|
// of canonicalFilter. filter is called in batches; prevRoot can be the last root from the previous batch.
|
||||||
|
first := cf.prevRoot == [32]byte{}
|
||||||
|
// We assume blocks are processed in order, so the previous canonical root should be the parent of the next.
|
||||||
|
if !first && cf.prevRoot != b.Block().ParentRoot() {
|
||||||
|
// If the current block isn't descended from the last, something is wrong. Append everything remaining
|
||||||
|
// to the list of non-linear blocks, and stop building the canonical list.
|
||||||
|
nseq = append(nseq, blks[i:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
seq = append(seq, blks[i])
|
||||||
|
// Set the previous root as the
|
||||||
|
// newly added block's root
|
||||||
|
cf.prevRoot = b.Root()
|
||||||
|
}
|
||||||
|
return seq, nseq, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// returns a copy of the []ROBlock list in sorted order with duplicates removed
|
||||||
|
func sortedUniqueBlocks(blks []blocks.ROBlock) []blocks.ROBlock {
|
||||||
|
// Remove duplicate blocks received
|
||||||
|
sort.Sort(blocks.ROBlockSlice(blks))
|
||||||
|
if len(blks) < 2 {
|
||||||
|
return blks
|
||||||
|
}
|
||||||
|
u := 0
|
||||||
|
for i := 1; i < len(blks); i++ {
|
||||||
|
if blks[i].Root() != blks[u].Root() {
|
||||||
|
u += 1
|
||||||
|
if u != i {
|
||||||
|
blks[u] = blks[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return blks[:u+1]
|
||||||
|
}
|
||||||
134
beacon-chain/sync/block_batcher_test.go
Normal file
134
beacon-chain/sync/block_batcher_test.go
Normal file
@@ -0,0 +1,134 @@
|
|||||||
|
package sync
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math/rand"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
|
||||||
|
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||||
|
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
|
||||||
|
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||||
|
"github.com/prysmaticlabs/prysm/v4/testing/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSortedObj_SortBlocksRoots(t *testing.T) {
|
||||||
|
source := rand.NewSource(33)
|
||||||
|
randGen := rand.New(source)
|
||||||
|
randFunc := func() int64 {
|
||||||
|
return randGen.Int63n(50)
|
||||||
|
}
|
||||||
|
|
||||||
|
var blks []blocks.ROBlock
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
slot := primitives.Slot(randFunc())
|
||||||
|
newBlk, err := blocks.NewSignedBeaconBlock(ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: slot, Body: ðpb.BeaconBlockBody{}}})
|
||||||
|
require.NoError(t, err)
|
||||||
|
root := bytesutil.ToBytes32(bytesutil.Bytes32(uint64(slot)))
|
||||||
|
b, err := blocks.NewROBlockWithRoot(newBlk, root)
|
||||||
|
require.NoError(t, err)
|
||||||
|
blks = append(blks, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
newBlks := sortedUniqueBlocks(blks)
|
||||||
|
previousSlot := primitives.Slot(0)
|
||||||
|
for _, b := range newBlks {
|
||||||
|
if b.Block().Slot() < previousSlot {
|
||||||
|
t.Errorf("Block list is not sorted as %d is smaller than previousSlot %d", b.Block().Slot(), previousSlot)
|
||||||
|
}
|
||||||
|
previousSlot = b.Block().Slot()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSortedObj_NoDuplicates(t *testing.T) {
|
||||||
|
source := rand.NewSource(33)
|
||||||
|
randGen := rand.New(source)
|
||||||
|
var blks []blocks.ROBlock
|
||||||
|
randFunc := func() int64 {
|
||||||
|
return randGen.Int63n(50)
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
slot := primitives.Slot(randFunc())
|
||||||
|
newBlk := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: slot, Body: ðpb.BeaconBlockBody{}}}
|
||||||
|
// append twice
|
||||||
|
wsb, err := blocks.NewSignedBeaconBlock(newBlk)
|
||||||
|
require.NoError(t, err)
|
||||||
|
wsbCopy, err := wsb.Copy()
|
||||||
|
require.NoError(t, err)
|
||||||
|
root := bytesutil.ToBytes32(bytesutil.Bytes32(uint64(slot)))
|
||||||
|
b, err := blocks.NewROBlockWithRoot(wsb, root)
|
||||||
|
require.NoError(t, err)
|
||||||
|
b2, err := blocks.NewROBlockWithRoot(wsbCopy, root)
|
||||||
|
require.NoError(t, err)
|
||||||
|
blks = append(blks, b, b2)
|
||||||
|
}
|
||||||
|
|
||||||
|
dedup := sortedUniqueBlocks(blks)
|
||||||
|
roots := make(map[[32]byte]int)
|
||||||
|
for i, b := range dedup {
|
||||||
|
if di, dup := roots[b.Root()]; dup {
|
||||||
|
t.Errorf("Duplicated root %#x at index %d and %d", b.Root(), di, i)
|
||||||
|
}
|
||||||
|
roots[b.Root()] = i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBlockBatchNext(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
batch blockBatch
|
||||||
|
start primitives.Slot
|
||||||
|
reqEnd primitives.Slot
|
||||||
|
size uint64
|
||||||
|
next []blockBatch
|
||||||
|
more []bool
|
||||||
|
err error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "end aligned",
|
||||||
|
batch: blockBatch{start: 0, end: 20},
|
||||||
|
start: 0,
|
||||||
|
reqEnd: 40,
|
||||||
|
size: 20,
|
||||||
|
next: []blockBatch{
|
||||||
|
{start: 0, end: 19},
|
||||||
|
{start: 20, end: 39},
|
||||||
|
{start: 40, end: 40},
|
||||||
|
{},
|
||||||
|
},
|
||||||
|
more: []bool{true, true, true, false},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "batches with more",
|
||||||
|
batch: blockBatch{start: 0, end: 22},
|
||||||
|
start: 0,
|
||||||
|
reqEnd: 40,
|
||||||
|
size: 23,
|
||||||
|
next: []blockBatch{
|
||||||
|
{start: 0, end: 22},
|
||||||
|
{start: 23, end: 40},
|
||||||
|
{},
|
||||||
|
},
|
||||||
|
more: []bool{true, true, false},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, c := range cases {
|
||||||
|
t.Run(c.name, func(t *testing.T) {
|
||||||
|
var next blockBatch
|
||||||
|
var more bool
|
||||||
|
i := 0
|
||||||
|
for next, more = newBlockBatch(c.start, c.reqEnd, c.size); more; next, more = next.next(c.reqEnd, c.size) {
|
||||||
|
exp := c.next[i]
|
||||||
|
require.Equal(t, c.more[i], more)
|
||||||
|
require.Equal(t, exp.start, next.start)
|
||||||
|
require.Equal(t, exp.end, next.end)
|
||||||
|
if exp.err != nil {
|
||||||
|
require.ErrorIs(t, next.err, exp.err)
|
||||||
|
} else {
|
||||||
|
require.NoError(t, next.err)
|
||||||
|
}
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -248,7 +248,7 @@ func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, ra
|
|||||||
if len(bestPeers) == 0 {
|
if len(bestPeers) == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
roots = s.dedupRoots(roots)
|
roots = dedupRoots(roots)
|
||||||
// Randomly choose a peer to query from our best peers. If that peer cannot return
|
// Randomly choose a peer to query from our best peers. If that peer cannot return
|
||||||
// all the requested blocks, we randomly select another peer.
|
// all the requested blocks, we randomly select another peer.
|
||||||
pid := bestPeers[randGen.Int()%len(bestPeers)]
|
pid := bestPeers[randGen.Int()%len(bestPeers)]
|
||||||
@@ -456,3 +456,16 @@ func slotToCacheKey(s primitives.Slot) string {
|
|||||||
b := bytesutil.SlotToBytesBigEndian(s)
|
b := bytesutil.SlotToBytesBigEndian(s)
|
||||||
return string(b)
|
return string(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func dedupRoots(roots [][32]byte) [][32]byte {
|
||||||
|
newRoots := make([][32]byte, 0, len(roots))
|
||||||
|
rootMap := make(map[[32]byte]bool, len(roots))
|
||||||
|
for i, r := range roots {
|
||||||
|
if rootMap[r] {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
rootMap[r] = true
|
||||||
|
newRoots = append(newRoots, roots[i])
|
||||||
|
}
|
||||||
|
return newRoots
|
||||||
|
}
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import (
|
|||||||
|
|
||||||
libp2pcore "github.com/libp2p/go-libp2p/core"
|
libp2pcore "github.com/libp2p/go-libp2p/core"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filters"
|
|
||||||
p2ptypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types"
|
p2ptypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/types"
|
||||||
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags"
|
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags"
|
||||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||||
@@ -26,36 +25,18 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
SetRPCStreamDeadlines(stream)
|
SetRPCStreamDeadlines(stream)
|
||||||
|
|
||||||
// Ticker to stagger out large requests.
|
|
||||||
ticker := time.NewTicker(time.Second)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
m, ok := msg.(*pb.BeaconBlocksByRangeRequest)
|
m, ok := msg.(*pb.BeaconBlocksByRangeRequest)
|
||||||
if !ok {
|
if !ok {
|
||||||
return errors.New("message is not type *pb.BeaconBlockByRangeRequest")
|
return errors.New("message is not type *pb.BeaconBlockByRangeRequest")
|
||||||
}
|
}
|
||||||
if err := s.validateRangeRequest(m); err != nil {
|
log.WithField("start-slot", m.StartSlot).WithField("count", m.Count).Debug("BeaconBlocksByRangeRequest")
|
||||||
|
rp, err := validateRangeRequest(m, s.cfg.clock.CurrentSlot())
|
||||||
|
if err != nil {
|
||||||
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
|
s.writeErrorResponseToStream(responseCodeInvalidRequest, err.Error(), stream)
|
||||||
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
|
s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
|
||||||
tracing.AnnotateError(span, err)
|
tracing.AnnotateError(span, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Only have range requests with a step of 1 being processed.
|
|
||||||
if m.Step > 1 {
|
|
||||||
m.Step = 1
|
|
||||||
}
|
|
||||||
// The initial count for the first batch to be returned back.
|
|
||||||
count := m.Count
|
|
||||||
allowedBlocksPerSecond := uint64(flags.Get().BlockBatchLimit)
|
|
||||||
if count > allowedBlocksPerSecond {
|
|
||||||
count = allowedBlocksPerSecond
|
|
||||||
}
|
|
||||||
// initial batch start and end slots to be returned to remote peer.
|
|
||||||
startSlot := m.StartSlot
|
|
||||||
endSlot := startSlot.Add(m.Step * (count - 1))
|
|
||||||
|
|
||||||
// The final requested slot from remote peer.
|
|
||||||
endReqSlot := startSlot.Add(m.Step * (m.Count - 1))
|
|
||||||
|
|
||||||
blockLimiter, err := s.rateLimiter.topicCollector(string(stream.Protocol()))
|
blockLimiter, err := s.rateLimiter.topicCollector(string(stream.Protocol()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -63,129 +44,115 @@ func (s *Service) beaconBlocksByRangeRPCHandler(ctx context.Context, msg interfa
|
|||||||
}
|
}
|
||||||
remainingBucketCapacity := blockLimiter.Remaining(stream.Conn().RemotePeer().String())
|
remainingBucketCapacity := blockLimiter.Remaining(stream.Conn().RemotePeer().String())
|
||||||
span.AddAttributes(
|
span.AddAttributes(
|
||||||
trace.Int64Attribute("start", int64(startSlot)), // lint:ignore uintcast -- This conversion is OK for tracing.
|
trace.Int64Attribute("start", int64(rp.start)), // lint:ignore uintcast -- This conversion is OK for tracing.
|
||||||
trace.Int64Attribute("end", int64(endReqSlot)), // lint:ignore uintcast -- This conversion is OK for tracing.
|
trace.Int64Attribute("end", int64(rp.end)), // lint:ignore uintcast -- This conversion is OK for tracing.
|
||||||
trace.Int64Attribute("step", int64(m.Step)),
|
|
||||||
trace.Int64Attribute("count", int64(m.Count)),
|
trace.Int64Attribute("count", int64(m.Count)),
|
||||||
trace.StringAttribute("peer", stream.Conn().RemotePeer().Pretty()),
|
trace.StringAttribute("peer", stream.Conn().RemotePeer().Pretty()),
|
||||||
trace.Int64Attribute("remaining_capacity", remainingBucketCapacity),
|
trace.Int64Attribute("remaining_capacity", remainingBucketCapacity),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Ticker to stagger out large requests.
|
||||||
|
ticker := time.NewTicker(time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
batcher, err := newBlockRangeBatcher(rp, s.cfg.beaconDB, s.rateLimiter, s.cfg.chain.IsCanonical, ticker)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Info("error in BlocksByRange batch")
|
||||||
|
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
|
||||||
|
tracing.AnnotateError(span, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// prevRoot is used to ensure that returned chains are strictly linear for singular steps
|
// prevRoot is used to ensure that returned chains are strictly linear for singular steps
|
||||||
// by comparing the previous root of the block in the list with the current block's parent.
|
// by comparing the previous root of the block in the list with the current block's parent.
|
||||||
var prevRoot [32]byte
|
var batch blockBatch
|
||||||
for startSlot <= endReqSlot {
|
var more bool
|
||||||
if err := s.rateLimiter.validateRequest(stream, allowedBlocksPerSecond); err != nil {
|
for batch, more = batcher.next(ctx, stream); more; batch, more = batcher.next(ctx, stream) {
|
||||||
tracing.AnnotateError(span, err)
|
batchStart := time.Now()
|
||||||
|
if err := s.writeBlockBatchToStream(ctx, batch, stream); err != nil {
|
||||||
|
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
rpcBlocksByRangeResponseLatency.Observe(float64(time.Since(batchStart).Milliseconds()))
|
||||||
if endSlot-startSlot > rangeLimit {
|
}
|
||||||
s.writeErrorResponseToStream(responseCodeInvalidRequest, p2ptypes.ErrInvalidRequest.Error(), stream)
|
if err := batch.error(); err != nil {
|
||||||
err := p2ptypes.ErrInvalidRequest
|
log.WithError(err).Info("error in BlocksByRange batch")
|
||||||
tracing.AnnotateError(span, err)
|
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
|
||||||
return err
|
tracing.AnnotateError(span, err)
|
||||||
}
|
return err
|
||||||
|
|
||||||
err := s.writeBlockRangeToStream(ctx, startSlot, endSlot, m.Step, &prevRoot, stream)
|
|
||||||
if err != nil && !errors.Is(err, p2ptypes.ErrInvalidParent) {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// Reduce capacity of peer in the rate limiter first.
|
|
||||||
// Decrease allowed blocks capacity by the number of streamed blocks.
|
|
||||||
if startSlot <= endSlot {
|
|
||||||
s.rateLimiter.add(stream, int64(1+endSlot.SubSlot(startSlot).Div(m.Step)))
|
|
||||||
}
|
|
||||||
// Exit in the event we have a disjoint chain to
|
|
||||||
// return.
|
|
||||||
if errors.Is(err, p2ptypes.ErrInvalidParent) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// Recalculate start and end slots for the next batch to be returned to the remote peer.
|
|
||||||
startSlot = endSlot.Add(m.Step)
|
|
||||||
endSlot = startSlot.Add(m.Step * (allowedBlocksPerSecond - 1))
|
|
||||||
if endSlot > endReqSlot {
|
|
||||||
endSlot = endReqSlot
|
|
||||||
}
|
|
||||||
|
|
||||||
// do not wait if all blocks have already been sent.
|
|
||||||
if startSlot > endReqSlot {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// wait for ticker before resuming streaming blocks to remote peer.
|
|
||||||
<-ticker.C
|
|
||||||
}
|
}
|
||||||
closeStream(stream, log)
|
closeStream(stream, log)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlot primitives.Slot, step uint64,
|
type rangeParams struct {
|
||||||
prevRoot *[32]byte, stream libp2pcore.Stream) error {
|
start primitives.Slot
|
||||||
|
end primitives.Slot
|
||||||
|
size uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateRangeRequest(r *pb.BeaconBlocksByRangeRequest, current primitives.Slot) (rangeParams, error) {
|
||||||
|
rp := rangeParams{
|
||||||
|
start: r.StartSlot,
|
||||||
|
size: r.Count,
|
||||||
|
}
|
||||||
|
maxRequest := params.BeaconNetworkConfig().MaxRequestBlocks
|
||||||
|
// Ensure all request params are within appropriate bounds
|
||||||
|
if rp.size == 0 || rp.size > maxRequest {
|
||||||
|
return rangeParams{}, p2ptypes.ErrInvalidRequest
|
||||||
|
}
|
||||||
|
// Allow some wiggle room, up to double the MaxRequestBlocks past the current slot,
|
||||||
|
// to give nodes syncing close to the head of the chain some margin for error.
|
||||||
|
maxStart, err := current.SafeAdd(maxRequest * 2)
|
||||||
|
if err != nil {
|
||||||
|
return rangeParams{}, p2ptypes.ErrInvalidRequest
|
||||||
|
}
|
||||||
|
if rp.start > maxStart {
|
||||||
|
return rangeParams{}, p2ptypes.ErrInvalidRequest
|
||||||
|
}
|
||||||
|
rp.end, err = rp.start.SafeAdd((rp.size - 1))
|
||||||
|
if err != nil {
|
||||||
|
return rangeParams{}, p2ptypes.ErrInvalidRequest
|
||||||
|
}
|
||||||
|
|
||||||
|
limit := uint64(flags.Get().BlockBatchLimit)
|
||||||
|
if limit > maxRequest {
|
||||||
|
limit = maxRequest
|
||||||
|
}
|
||||||
|
if rp.size > limit {
|
||||||
|
rp.size = limit
|
||||||
|
}
|
||||||
|
|
||||||
|
return rp, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Service) writeBlockBatchToStream(ctx context.Context, batch blockBatch, stream libp2pcore.Stream) error {
|
||||||
ctx, span := trace.StartSpan(ctx, "sync.WriteBlockRangeToStream")
|
ctx, span := trace.StartSpan(ctx, "sync.WriteBlockRangeToStream")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
filter := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot).SetSlotStep(step)
|
blinded := make([]interfaces.ReadOnlySignedBeaconBlock, 0)
|
||||||
blks, roots, err := s.cfg.beaconDB.Blocks(ctx, filter)
|
for _, b := range batch.canonical() {
|
||||||
if err != nil {
|
if err := blocks.BeaconBlockIsNil(b); err != nil {
|
||||||
log.WithError(err).Debug("Could not retrieve blocks")
|
continue
|
||||||
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
|
|
||||||
tracing.AnnotateError(span, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// handle genesis case
|
|
||||||
if startSlot == 0 {
|
|
||||||
genBlock, genRoot, err := s.retrieveGenesisBlock(ctx)
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Debug("Could not retrieve genesis block")
|
|
||||||
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
|
|
||||||
tracing.AnnotateError(span, err)
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
blks = append([]interfaces.ReadOnlySignedBeaconBlock{genBlock}, blks...)
|
|
||||||
roots = append([][32]byte{genRoot}, roots...)
|
|
||||||
}
|
|
||||||
// Filter and sort our retrieved blocks, so that
|
|
||||||
// we only return valid sets of blocks.
|
|
||||||
blks, roots, err = s.dedupBlocksAndRoots(blks, roots)
|
|
||||||
if err != nil {
|
|
||||||
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
|
|
||||||
tracing.AnnotateError(span, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
blks, roots = s.sortBlocksAndRoots(blks, roots)
|
|
||||||
|
|
||||||
blks, err = s.filterBlocks(ctx, blks, roots, prevRoot, step, startSlot)
|
|
||||||
if err != nil && err != p2ptypes.ErrInvalidParent {
|
|
||||||
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
|
|
||||||
tracing.AnnotateError(span, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
start := time.Now()
|
|
||||||
// If the blocks are blinded, we reconstruct the full block via the execution client.
|
|
||||||
blindedExists := false
|
|
||||||
blindedIndex := 0
|
|
||||||
for i, b := range blks {
|
|
||||||
// Since the blocks are sorted in ascending order, we assume that the following
|
|
||||||
// blocks from the first blinded block are also ascending.
|
|
||||||
if b.IsBlinded() {
|
if b.IsBlinded() {
|
||||||
blindedExists = true
|
blinded = append(blinded, b.ReadOnlySignedBeaconBlock)
|
||||||
blindedIndex = i
|
continue
|
||||||
break
|
}
|
||||||
|
if chunkErr := s.chunkBlockWriter(stream, b); chunkErr != nil {
|
||||||
|
log.WithError(chunkErr).Error("Could not send a chunked response")
|
||||||
|
return chunkErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if len(blinded) == 0 {
|
||||||
var reconstructedBlock []interfaces.SignedBeaconBlock
|
return nil
|
||||||
if blindedExists {
|
|
||||||
reconstructedBlock, err = s.cfg.executionPayloadReconstructor.ReconstructFullBellatrixBlockBatch(ctx, blks[blindedIndex:])
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Error("Could not reconstruct full bellatrix block batch from blinded bodies")
|
|
||||||
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, b := range blks {
|
reconstructed, err := s.cfg.executionPayloadReconstructor.ReconstructFullBellatrixBlockBatch(ctx, blinded)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Error("Could not reconstruct full bellatrix block batch from blinded bodies")
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, b := range reconstructed {
|
||||||
if err := blocks.BeaconBlockIsNil(b); err != nil {
|
if err := blocks.BeaconBlockIsNil(b); err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -194,115 +161,9 @@ func (s *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlo
|
|||||||
}
|
}
|
||||||
if chunkErr := s.chunkBlockWriter(stream, b); chunkErr != nil {
|
if chunkErr := s.chunkBlockWriter(stream, b); chunkErr != nil {
|
||||||
log.WithError(chunkErr).Debug("Could not send a chunked response")
|
log.WithError(chunkErr).Debug("Could not send a chunked response")
|
||||||
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
|
|
||||||
tracing.AnnotateError(span, chunkErr)
|
|
||||||
return chunkErr
|
return chunkErr
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, b := range reconstructedBlock {
|
|
||||||
if err := blocks.BeaconBlockIsNil(b); err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if b.IsBlinded() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if chunkErr := s.chunkBlockWriter(stream, b); chunkErr != nil {
|
|
||||||
log.WithError(chunkErr).Debug("Could not send a chunked response")
|
|
||||||
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
|
|
||||||
tracing.AnnotateError(span, chunkErr)
|
|
||||||
return chunkErr
|
|
||||||
}
|
|
||||||
}
|
|
||||||
rpcBlocksByRangeResponseLatency.Observe(float64(time.Since(start).Milliseconds()))
|
|
||||||
// Return error in the event we have an invalid parent.
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) validateRangeRequest(r *pb.BeaconBlocksByRangeRequest) error {
|
|
||||||
startSlot := r.StartSlot
|
|
||||||
count := r.Count
|
|
||||||
step := r.Step
|
|
||||||
|
|
||||||
maxRequestBlocks := params.BeaconNetworkConfig().MaxRequestBlocks
|
|
||||||
// Add a buffer for possible large range requests from nodes syncing close to the
|
|
||||||
// head of the chain.
|
|
||||||
buffer := rangeLimit * 2
|
|
||||||
highestExpectedSlot := s.cfg.clock.CurrentSlot().Add(uint64(buffer))
|
|
||||||
|
|
||||||
// Ensure all request params are within appropriate bounds
|
|
||||||
if count == 0 || count > maxRequestBlocks {
|
|
||||||
return p2ptypes.ErrInvalidRequest
|
|
||||||
}
|
|
||||||
|
|
||||||
if step == 0 || step > rangeLimit {
|
|
||||||
return p2ptypes.ErrInvalidRequest
|
|
||||||
}
|
|
||||||
|
|
||||||
if startSlot > highestExpectedSlot {
|
|
||||||
return p2ptypes.ErrInvalidRequest
|
|
||||||
}
|
|
||||||
|
|
||||||
endSlot := startSlot.Add(step * (count - 1))
|
|
||||||
if endSlot-startSlot > rangeLimit {
|
|
||||||
return p2ptypes.ErrInvalidRequest
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// filters all the provided blocks to ensure they are canonical
|
|
||||||
// and are strictly linear.
|
|
||||||
func (s *Service) filterBlocks(ctx context.Context, blks []interfaces.ReadOnlySignedBeaconBlock, roots [][32]byte, prevRoot *[32]byte,
|
|
||||||
step uint64, startSlot primitives.Slot) ([]interfaces.ReadOnlySignedBeaconBlock, error) {
|
|
||||||
if len(blks) != len(roots) {
|
|
||||||
return nil, errors.New("input blks and roots are diff lengths")
|
|
||||||
}
|
|
||||||
|
|
||||||
newBlks := make([]interfaces.ReadOnlySignedBeaconBlock, 0, len(blks))
|
|
||||||
for i, b := range blks {
|
|
||||||
isCanonical, err := s.cfg.chain.IsCanonical(ctx, roots[i])
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
parentValid := *prevRoot != [32]byte{}
|
|
||||||
isLinear := *prevRoot == b.Block().ParentRoot()
|
|
||||||
isSingular := step == 1
|
|
||||||
slotDiff, err := b.Block().Slot().SafeSubSlot(startSlot)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
slotDiff, err = slotDiff.SafeMod(step)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
isRequestedSlotStep := slotDiff == 0
|
|
||||||
if isRequestedSlotStep && isCanonical {
|
|
||||||
// Exit early if our valid block is non linear.
|
|
||||||
if parentValid && isSingular && !isLinear {
|
|
||||||
return newBlks, p2ptypes.ErrInvalidParent
|
|
||||||
}
|
|
||||||
newBlks = append(newBlks, blks[i])
|
|
||||||
// Set the previous root as the
|
|
||||||
// newly added block's root
|
|
||||||
currRoot := roots[i]
|
|
||||||
*prevRoot = currRoot
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return newBlks, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) writeErrorResponseToStream(responseCode byte, reason string, stream libp2pcore.Stream) {
|
|
||||||
writeErrorResponseToStream(responseCode, reason, stream, s.cfg.p2p)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) retrieveGenesisBlock(ctx context.Context) (interfaces.ReadOnlySignedBeaconBlock, [32]byte, error) {
|
|
||||||
genBlock, err := s.cfg.beaconDB.GenesisBlock(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, [32]byte{}, err
|
|
||||||
}
|
|
||||||
genRoot, err := genBlock.Block().HashTreeRoot()
|
|
||||||
if err != nil {
|
|
||||||
return nil, [32]byte{}, err
|
|
||||||
}
|
|
||||||
return genBlock, genRoot, nil
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -25,7 +25,6 @@ import (
|
|||||||
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
|
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
|
||||||
"github.com/prysmaticlabs/prysm/v4/config/params"
|
"github.com/prysmaticlabs/prysm/v4/config/params"
|
||||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
|
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
|
||||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
|
|
||||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||||
leakybucket "github.com/prysmaticlabs/prysm/v4/container/leaky-bucket"
|
leakybucket "github.com/prysmaticlabs/prysm/v4/container/leaky-bucket"
|
||||||
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
|
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
|
||||||
@@ -63,14 +62,9 @@ func TestRPCBeaconBlocksByRange_RPCHandlerReturnsBlocks(t *testing.T) {
|
|||||||
util.SaveBlock(t, context.Background(), d, blk)
|
util.SaveBlock(t, context.Background(), d, blk)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
|
||||||
// Start service with 160 as allowed blocks capacity (and almost zero capacity recovery).
|
// Start service with 160 as allowed blocks capacity (and almost zero capacity recovery).
|
||||||
r := &Service{
|
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
|
||||||
cfg: &config{
|
|
||||||
p2p: p1, beaconDB: d, chain: &chainMock.ChainService{},
|
|
||||||
clock: startup.NewClock(time.Unix(0, 0), [32]byte{}),
|
|
||||||
},
|
|
||||||
rateLimiter: newRateLimiter(p1),
|
|
||||||
}
|
|
||||||
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
|
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
|
||||||
topic := string(pcl)
|
topic := string(pcl)
|
||||||
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), time.Second, false)
|
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), time.Second, false)
|
||||||
@@ -131,8 +125,8 @@ func TestRPCBeaconBlocksByRange_ReturnCorrectNumberBack(t *testing.T) {
|
|||||||
}
|
}
|
||||||
require.NoError(t, d.SaveGenesisBlockRoot(context.Background(), genRoot))
|
require.NoError(t, d.SaveGenesisBlockRoot(context.Background(), genRoot))
|
||||||
|
|
||||||
// Start service with 160 as allowed blocks capacity (and almost zero capacity recovery).
|
|
||||||
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
|
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
|
||||||
|
// Start service with 160 as allowed blocks capacity (and almost zero capacity recovery).
|
||||||
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)}
|
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)}
|
||||||
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
|
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
|
||||||
topic := string(pcl)
|
topic := string(pcl)
|
||||||
@@ -240,14 +234,15 @@ func TestRPCBeaconBlocksByRange_ReconstructsPayloads(t *testing.T) {
|
|||||||
}
|
}
|
||||||
require.NoError(t, d.SaveGenesisBlockRoot(context.Background(), genRoot))
|
require.NoError(t, d.SaveGenesisBlockRoot(context.Background(), genRoot))
|
||||||
|
|
||||||
|
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
|
||||||
// Start service with 160 as allowed blocks capacity (and almost zero capacity recovery).
|
// Start service with 160 as allowed blocks capacity (and almost zero capacity recovery).
|
||||||
r := &Service{
|
r := &Service{
|
||||||
cfg: &config{
|
cfg: &config{
|
||||||
p2p: p1,
|
p2p: p1,
|
||||||
beaconDB: d,
|
beaconDB: d,
|
||||||
chain: &chainMock.ChainService{},
|
chain: &chainMock.ChainService{},
|
||||||
|
clock: clock,
|
||||||
executionPayloadReconstructor: mockEngine,
|
executionPayloadReconstructor: mockEngine,
|
||||||
clock: startup.NewClock(time.Unix(0, 0), [32]byte{}),
|
|
||||||
},
|
},
|
||||||
rateLimiter: newRateLimiter(p1),
|
rateLimiter: newRateLimiter(p1),
|
||||||
}
|
}
|
||||||
@@ -317,9 +312,9 @@ func TestRPCBeaconBlocksByRange_RPCHandlerReturnsSortedBlocks(t *testing.T) {
|
|||||||
j++
|
j++
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start service with 160 as allowed blocks capacity (and almost zero capacity recovery).
|
|
||||||
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
|
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
|
||||||
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)}
|
// Start service with 160 as allowed blocks capacity (and almost zero capacity recovery).
|
||||||
|
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
|
||||||
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
|
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
|
||||||
topic := string(pcl)
|
topic := string(pcl)
|
||||||
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), time.Second, false)
|
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), time.Second, false)
|
||||||
@@ -384,7 +379,8 @@ func TestRPCBeaconBlocksByRange_ReturnsGenesisBlock(t *testing.T) {
|
|||||||
prevRoot = rt
|
prevRoot = rt
|
||||||
}
|
}
|
||||||
|
|
||||||
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: startup.NewClock(time.Unix(0, 0), [32]byte{})}, rateLimiter: newRateLimiter(p1)}
|
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
|
||||||
|
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
|
||||||
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
|
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
|
||||||
topic := string(pcl)
|
topic := string(pcl)
|
||||||
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, time.Second, false)
|
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, time.Second, false)
|
||||||
@@ -503,7 +499,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {
|
|||||||
|
|
||||||
capacity := int64(flags.Get().BlockBatchLimit * 3)
|
capacity := int64(flags.Get().BlockBatchLimit * 3)
|
||||||
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
|
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
|
||||||
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)}
|
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
|
||||||
|
|
||||||
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
|
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
|
||||||
topic := string(pcl)
|
topic := string(pcl)
|
||||||
@@ -534,7 +530,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {
|
|||||||
|
|
||||||
capacity := int64(flags.Get().BlockBatchLimit * flags.Get().BlockBatchLimitBurstFactor)
|
capacity := int64(flags.Get().BlockBatchLimit * flags.Get().BlockBatchLimitBurstFactor)
|
||||||
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
|
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
|
||||||
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)}
|
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
|
||||||
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
|
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
|
||||||
topic := string(pcl)
|
topic := string(pcl)
|
||||||
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, time.Second, false)
|
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, time.Second, false)
|
||||||
@@ -565,15 +561,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {
|
|||||||
func TestRPCBeaconBlocksByRange_validateRangeRequest(t *testing.T) {
|
func TestRPCBeaconBlocksByRange_validateRangeRequest(t *testing.T) {
|
||||||
slotsSinceGenesis := primitives.Slot(1000)
|
slotsSinceGenesis := primitives.Slot(1000)
|
||||||
offset := int64(slotsSinceGenesis.Mul(params.BeaconConfig().SecondsPerSlot))
|
offset := int64(slotsSinceGenesis.Mul(params.BeaconConfig().SecondsPerSlot))
|
||||||
chain := &chainMock.ChainService{
|
clock := startup.NewClock(time.Now().Add(time.Second*time.Duration(-1*offset)), [32]byte{})
|
||||||
Genesis: time.Now().Add(time.Second * time.Duration(-1*offset)),
|
|
||||||
}
|
|
||||||
r := &Service{
|
|
||||||
cfg: &config{
|
|
||||||
chain: chain,
|
|
||||||
clock: startup.NewClock(chain.Genesis, chain.ValidatorsRoot),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
@@ -613,8 +601,7 @@ func TestRPCBeaconBlocksByRange_validateRangeRequest(t *testing.T) {
|
|||||||
Step: 0,
|
Step: 0,
|
||||||
Count: 1,
|
Count: 1,
|
||||||
},
|
},
|
||||||
expectedError: p2ptypes.ErrInvalidRequest,
|
expectedError: nil, // The Step param is ignored in v2 RPC
|
||||||
errorToLog: "validation did not fail with bad step",
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Over limit Step",
|
name: "Over limit Step",
|
||||||
@@ -622,8 +609,7 @@ func TestRPCBeaconBlocksByRange_validateRangeRequest(t *testing.T) {
|
|||||||
Step: rangeLimit + 1,
|
Step: rangeLimit + 1,
|
||||||
Count: 1,
|
Count: 1,
|
||||||
},
|
},
|
||||||
expectedError: p2ptypes.ErrInvalidRequest,
|
expectedError: nil, // The Step param is ignored in v2 RPC
|
||||||
errorToLog: "validation did not fail with bad step",
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Correct Step",
|
name: "Correct Step",
|
||||||
@@ -658,8 +644,7 @@ func TestRPCBeaconBlocksByRange_validateRangeRequest(t *testing.T) {
|
|||||||
Step: 3,
|
Step: 3,
|
||||||
Count: uint64(slotsSinceGenesis / 2),
|
Count: uint64(slotsSinceGenesis / 2),
|
||||||
},
|
},
|
||||||
expectedError: p2ptypes.ErrInvalidRequest,
|
expectedError: nil, // this is fine with the deprecation of Step
|
||||||
errorToLog: "validation did not fail with bad range",
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "Valid Request",
|
name: "Valid Request",
|
||||||
@@ -674,10 +659,11 @@ func TestRPCBeaconBlocksByRange_validateRangeRequest(t *testing.T) {
|
|||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
_, err := validateRangeRequest(tt.req, clock.CurrentSlot())
|
||||||
if tt.expectedError != nil {
|
if tt.expectedError != nil {
|
||||||
assert.ErrorContains(t, tt.expectedError.Error(), r.validateRangeRequest(tt.req), tt.errorToLog)
|
assert.ErrorContains(t, tt.expectedError.Error(), err, tt.errorToLog)
|
||||||
} else {
|
} else {
|
||||||
assert.NoError(t, r.validateRangeRequest(tt.req), tt.errorToLog)
|
assert.NoError(t, err, tt.errorToLog)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@@ -905,7 +891,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) {
|
|||||||
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
|
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
|
||||||
|
|
||||||
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
|
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
|
||||||
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)}
|
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
|
||||||
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false)
|
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false)
|
||||||
req := ðpb.BeaconBlocksByRangeRequest{
|
req := ðpb.BeaconBlocksByRangeRequest{
|
||||||
StartSlot: 1,
|
StartSlot: 1,
|
||||||
@@ -937,7 +923,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) {
|
|||||||
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
|
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
|
||||||
|
|
||||||
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
|
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
|
||||||
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)}
|
r := &Service{cfg: &config{p2p: p1, beaconDB: d, clock: clock, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
|
||||||
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false)
|
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false)
|
||||||
req := ðpb.BeaconBlocksByRangeRequest{
|
req := ðpb.BeaconBlocksByRangeRequest{
|
||||||
StartSlot: 1,
|
StartSlot: 1,
|
||||||
@@ -971,7 +957,6 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) {
|
|||||||
|
|
||||||
p1.Connect(p2)
|
p1.Connect(p2)
|
||||||
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
|
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
|
||||||
|
|
||||||
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
|
clock := startup.NewClock(time.Unix(0, 0), [32]byte{})
|
||||||
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)}
|
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}, clock: clock}, rateLimiter: newRateLimiter(p1)}
|
||||||
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false)
|
r.rateLimiter.limiterMap[string(pcl)] = leakybucket.NewCollector(0.000001, 640, time.Second, false)
|
||||||
@@ -1087,12 +1072,6 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestRPCBeaconBlocksByRange_FilterBlocks_PreviousRoot(t *testing.T) {
|
func TestRPCBeaconBlocksByRange_FilterBlocks_PreviousRoot(t *testing.T) {
|
||||||
p1 := p2ptest.NewTestP2P(t)
|
|
||||||
p2 := p2ptest.NewTestP2P(t)
|
|
||||||
p1.Connect(p2)
|
|
||||||
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
|
|
||||||
d := db.SetupDB(t)
|
|
||||||
|
|
||||||
req := ðpb.BeaconBlocksByRangeRequest{
|
req := ðpb.BeaconBlocksByRangeRequest{
|
||||||
StartSlot: 100,
|
StartSlot: 100,
|
||||||
Step: 1,
|
Step: 1,
|
||||||
@@ -1102,8 +1081,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks_PreviousRoot(t *testing.T) {
|
|||||||
// Populate the database with blocks that would match the request.
|
// Populate the database with blocks that would match the request.
|
||||||
var prevRoot [32]byte
|
var prevRoot [32]byte
|
||||||
var err error
|
var err error
|
||||||
blks := []interfaces.ReadOnlySignedBeaconBlock{}
|
var blks []blocks.ROBlock
|
||||||
var roots [][32]byte
|
|
||||||
for i := req.StartSlot; i < req.StartSlot.Add(req.Count); i += primitives.Slot(1) {
|
for i := req.StartSlot; i < req.StartSlot.Add(req.Count); i += primitives.Slot(1) {
|
||||||
blk := util.NewBeaconBlock()
|
blk := util.NewBeaconBlock()
|
||||||
blk.Block.Slot = i
|
blk.Block.Slot = i
|
||||||
@@ -1112,21 +1090,19 @@ func TestRPCBeaconBlocksByRange_FilterBlocks_PreviousRoot(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
wsb, err := blocks.NewSignedBeaconBlock(blk)
|
wsb, err := blocks.NewSignedBeaconBlock(blk)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
blks = append(blks, wsb)
|
|
||||||
copiedRt := prevRoot
|
copiedRt := prevRoot
|
||||||
roots = append(roots, copiedRt)
|
b, err := blocks.NewROBlockWithRoot(wsb, copiedRt)
|
||||||
|
require.NoError(t, err)
|
||||||
|
blks = append(blks, b)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start service with 160 as allowed blocks capacity (and almost zero capacity recovery).
|
chain := &chainMock.ChainService{}
|
||||||
r := &Service{cfg: &config{p2p: p1, beaconDB: d, chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
|
cf := canonicalFilter{canonical: chain.IsCanonical}
|
||||||
|
seq, nseq, err := cf.filter(context.Background(), blks)
|
||||||
var initialRoot [32]byte
|
|
||||||
ptrRt := &initialRoot
|
|
||||||
newBlks, err := r.filterBlocks(context.Background(), blks, roots, ptrRt, req.Step, req.StartSlot)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, len(blks), len(newBlks))
|
require.Equal(t, len(blks), len(seq))
|
||||||
|
require.Equal(t, 0, len(nseq))
|
||||||
|
|
||||||
// pointer should reference a new root.
|
// pointer should reference a new root.
|
||||||
require.NotEqual(t, *ptrRt, [32]byte{})
|
require.NotEqual(t, cf.prevRoot, [32]byte{})
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
56
beacon-chain/sync/rpc_handler_test.go
Normal file
56
beacon-chain/sync/rpc_handler_test.go
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
package sync
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-libp2p/core/network"
|
||||||
|
"github.com/libp2p/go-libp2p/core/protocol"
|
||||||
|
p2ptest "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/testing"
|
||||||
|
"github.com/prysmaticlabs/prysm/v4/testing/require"
|
||||||
|
"github.com/prysmaticlabs/prysm/v4/testing/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
type rpcHandlerTest struct {
|
||||||
|
t *testing.T
|
||||||
|
topic protocol.ID
|
||||||
|
timeout time.Duration
|
||||||
|
err error
|
||||||
|
s *Service
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rt *rpcHandlerTest) testHandler(nh network.StreamHandler, rh rpcHandler, rhi interface{}) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), rt.timeout)
|
||||||
|
defer func() {
|
||||||
|
cancel()
|
||||||
|
}()
|
||||||
|
|
||||||
|
w := util.NewWaiter()
|
||||||
|
server := p2ptest.NewTestP2P(rt.t)
|
||||||
|
|
||||||
|
client, ok := rt.s.cfg.p2p.(*p2ptest.TestP2P)
|
||||||
|
require.Equal(rt.t, true, ok)
|
||||||
|
|
||||||
|
client.Connect(server)
|
||||||
|
defer func() {
|
||||||
|
require.NoError(rt.t, client.Disconnect(server.PeerID()))
|
||||||
|
}()
|
||||||
|
require.Equal(rt.t, 1, len(client.BHost.Network().Peers()), "Expected peers to be connected")
|
||||||
|
h := func(stream network.Stream) {
|
||||||
|
defer w.Done()
|
||||||
|
nh(stream)
|
||||||
|
}
|
||||||
|
server.BHost.SetStreamHandler(protocol.ID(rt.topic), h)
|
||||||
|
stream, err := client.BHost.NewStream(ctx, server.BHost.ID(), protocol.ID(rt.topic))
|
||||||
|
require.NoError(rt.t, err)
|
||||||
|
|
||||||
|
err = rh(ctx, rhi, stream)
|
||||||
|
if rt.err == nil {
|
||||||
|
require.NoError(rt.t, err)
|
||||||
|
} else {
|
||||||
|
require.ErrorIs(rt.t, err, rt.err)
|
||||||
|
}
|
||||||
|
|
||||||
|
w.RequireDoneBeforeCancel(ctx, rt.t)
|
||||||
|
}
|
||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
|
|
||||||
lru "github.com/hashicorp/golang-lru"
|
lru "github.com/hashicorp/golang-lru"
|
||||||
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
||||||
|
libp2pcore "github.com/libp2p/go-libp2p/core"
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/libp2p/go-libp2p/core/protocol"
|
"github.com/libp2p/go-libp2p/core/protocol"
|
||||||
gcache "github.com/patrickmn/go-cache"
|
gcache "github.com/patrickmn/go-cache"
|
||||||
@@ -41,7 +42,7 @@ import (
|
|||||||
|
|
||||||
var _ runtime.Service = (*Service)(nil)
|
var _ runtime.Service = (*Service)(nil)
|
||||||
|
|
||||||
const rangeLimit = 1024
|
const rangeLimit uint64 = 1024
|
||||||
const seenBlockSize = 1000
|
const seenBlockSize = 1000
|
||||||
const seenUnaggregatedAttSize = 20000
|
const seenUnaggregatedAttSize = 20000
|
||||||
const seenAggregatedAttSize = 1024
|
const seenAggregatedAttSize = 1024
|
||||||
@@ -272,6 +273,10 @@ func (s *Service) registerHandlers() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Service) writeErrorResponseToStream(responseCode byte, reason string, stream libp2pcore.Stream) {
|
||||||
|
writeErrorResponseToStream(responseCode, reason, stream, s.cfg.p2p)
|
||||||
|
}
|
||||||
|
|
||||||
// marks the chain as having started.
|
// marks the chain as having started.
|
||||||
func (s *Service) markForChainStart() {
|
func (s *Service) markForChainStart() {
|
||||||
s.chainStarted.Set()
|
s.chainStarted.Set()
|
||||||
|
|||||||
@@ -1,77 +0,0 @@
|
|||||||
package sync
|
|
||||||
|
|
||||||
import (
|
|
||||||
"errors"
|
|
||||||
"sort"
|
|
||||||
|
|
||||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
|
|
||||||
)
|
|
||||||
|
|
||||||
// A type to represent beacon blocks and roots which have methods
|
|
||||||
// which satisfy the Interface in `Sort` so that this type can
|
|
||||||
// be sorted in ascending order.
|
|
||||||
type sortedObj struct {
|
|
||||||
blks []interfaces.ReadOnlySignedBeaconBlock
|
|
||||||
roots [][32]byte
|
|
||||||
}
|
|
||||||
|
|
||||||
// Less reports whether the element with index i must sort before the element with index j.
|
|
||||||
func (s sortedObj) Less(i, j int) bool {
|
|
||||||
return s.blks[i].Block().Slot() < s.blks[j].Block().Slot()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Swap swaps the elements with indexes i and j.
|
|
||||||
func (s sortedObj) Swap(i, j int) {
|
|
||||||
s.blks[i], s.blks[j] = s.blks[j], s.blks[i]
|
|
||||||
s.roots[i], s.roots[j] = s.roots[j], s.roots[i]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Len is the number of elements in the collection.
|
|
||||||
func (s sortedObj) Len() int {
|
|
||||||
return len(s.blks)
|
|
||||||
}
|
|
||||||
|
|
||||||
// removes duplicates from provided blocks and roots.
|
|
||||||
func (_ *Service) dedupBlocksAndRoots(blks []interfaces.ReadOnlySignedBeaconBlock, roots [][32]byte) ([]interfaces.ReadOnlySignedBeaconBlock, [][32]byte, error) {
|
|
||||||
if len(blks) != len(roots) {
|
|
||||||
return nil, nil, errors.New("input blks and roots are diff lengths")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove duplicate blocks received
|
|
||||||
rootMap := make(map[[32]byte]bool, len(blks))
|
|
||||||
newBlks := make([]interfaces.ReadOnlySignedBeaconBlock, 0, len(blks))
|
|
||||||
newRoots := make([][32]byte, 0, len(roots))
|
|
||||||
for i, r := range roots {
|
|
||||||
if rootMap[r] {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
rootMap[r] = true
|
|
||||||
newRoots = append(newRoots, roots[i])
|
|
||||||
newBlks = append(newBlks, blks[i])
|
|
||||||
}
|
|
||||||
return newBlks, newRoots, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (_ *Service) dedupRoots(roots [][32]byte) [][32]byte {
|
|
||||||
newRoots := make([][32]byte, 0, len(roots))
|
|
||||||
rootMap := make(map[[32]byte]bool, len(roots))
|
|
||||||
for i, r := range roots {
|
|
||||||
if rootMap[r] {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
rootMap[r] = true
|
|
||||||
newRoots = append(newRoots, roots[i])
|
|
||||||
}
|
|
||||||
return newRoots
|
|
||||||
}
|
|
||||||
|
|
||||||
// sort the provided blocks and roots in ascending order. This method assumes that the size of
|
|
||||||
// block slice and root slice is equal.
|
|
||||||
func (_ *Service) sortBlocksAndRoots(blks []interfaces.ReadOnlySignedBeaconBlock, roots [][32]byte) ([]interfaces.ReadOnlySignedBeaconBlock, [][32]byte) {
|
|
||||||
obj := sortedObj{
|
|
||||||
blks: blks,
|
|
||||||
roots: roots,
|
|
||||||
}
|
|
||||||
sort.Sort(obj)
|
|
||||||
return obj.blks, obj.roots
|
|
||||||
}
|
|
||||||
@@ -1,85 +0,0 @@
|
|||||||
package sync
|
|
||||||
|
|
||||||
import (
|
|
||||||
"math/rand"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
|
|
||||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
|
|
||||||
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
|
||||||
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
|
|
||||||
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
|
||||||
"github.com/prysmaticlabs/prysm/v4/testing/require"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestSortedObj_SortBlocksRoots(t *testing.T) {
|
|
||||||
source := rand.NewSource(33)
|
|
||||||
randGen := rand.New(source)
|
|
||||||
var blks []interfaces.ReadOnlySignedBeaconBlock
|
|
||||||
var roots [][32]byte
|
|
||||||
randFunc := func() int64 {
|
|
||||||
return randGen.Int63n(50)
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
slot := primitives.Slot(randFunc())
|
|
||||||
newBlk, err := blocks.NewSignedBeaconBlock(ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: slot, Body: ðpb.BeaconBlockBody{}}})
|
|
||||||
require.NoError(t, err)
|
|
||||||
blks = append(blks, newBlk)
|
|
||||||
root := bytesutil.ToBytes32(bytesutil.Bytes32(uint64(slot)))
|
|
||||||
roots = append(roots, root)
|
|
||||||
}
|
|
||||||
|
|
||||||
r := &Service{}
|
|
||||||
|
|
||||||
newBlks, newRoots := r.sortBlocksAndRoots(blks, roots)
|
|
||||||
|
|
||||||
previousSlot := primitives.Slot(0)
|
|
||||||
for i, b := range newBlks {
|
|
||||||
if b.Block().Slot() < previousSlot {
|
|
||||||
t.Errorf("Block list is not sorted as %d is smaller than previousSlot %d", b.Block().Slot(), previousSlot)
|
|
||||||
}
|
|
||||||
if bytesutil.FromBytes8(newRoots[i][:]) != uint64(b.Block().Slot()) {
|
|
||||||
t.Errorf("root doesn't match stored slot in block: wanted %d but got %d", b.Block().Slot(), bytesutil.FromBytes8(newRoots[i][:]))
|
|
||||||
}
|
|
||||||
previousSlot = b.Block().Slot()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestSortedObj_NoDuplicates(t *testing.T) {
|
|
||||||
source := rand.NewSource(33)
|
|
||||||
randGen := rand.New(source)
|
|
||||||
var blks []interfaces.ReadOnlySignedBeaconBlock
|
|
||||||
var roots [][32]byte
|
|
||||||
randFunc := func() int64 {
|
|
||||||
return randGen.Int63n(50)
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < 10; i++ {
|
|
||||||
slot := primitives.Slot(randFunc())
|
|
||||||
newBlk := ðpb.SignedBeaconBlock{Block: ðpb.BeaconBlock{Slot: slot, Body: ðpb.BeaconBlockBody{}}}
|
|
||||||
// append twice
|
|
||||||
wsb, err := blocks.NewSignedBeaconBlock(newBlk)
|
|
||||||
require.NoError(t, err)
|
|
||||||
wsbCopy, err := wsb.Copy()
|
|
||||||
require.NoError(t, err)
|
|
||||||
blks = append(blks, wsb, wsbCopy)
|
|
||||||
|
|
||||||
// append twice
|
|
||||||
root := bytesutil.ToBytes32(bytesutil.Bytes32(uint64(slot)))
|
|
||||||
roots = append(roots, root, root)
|
|
||||||
}
|
|
||||||
|
|
||||||
r := &Service{}
|
|
||||||
|
|
||||||
newBlks, newRoots, err := r.dedupBlocksAndRoots(blks, roots)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
rootMap := make(map[[32]byte]bool)
|
|
||||||
for i, b := range newBlks {
|
|
||||||
if rootMap[newRoots[i]] {
|
|
||||||
t.Errorf("Duplicated root exists %#x with block %v", newRoots[i], b)
|
|
||||||
}
|
|
||||||
rootMap[newRoots[i]] = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -7,6 +7,7 @@ go_library(
|
|||||||
"factory.go",
|
"factory.go",
|
||||||
"getters.go",
|
"getters.go",
|
||||||
"proto.go",
|
"proto.go",
|
||||||
|
"roblock.go",
|
||||||
"setters.go",
|
"setters.go",
|
||||||
"types.go",
|
"types.go",
|
||||||
],
|
],
|
||||||
@@ -36,6 +37,7 @@ go_test(
|
|||||||
"factory_test.go",
|
"factory_test.go",
|
||||||
"getters_test.go",
|
"getters_test.go",
|
||||||
"proto_test.go",
|
"proto_test.go",
|
||||||
|
"roblock_test.go",
|
||||||
],
|
],
|
||||||
embed = [":go_default_library"],
|
embed = [":go_default_library"],
|
||||||
deps = [
|
deps = [
|
||||||
|
|||||||
75
consensus-types/blocks/roblock.go
Normal file
75
consensus-types/blocks/roblock.go
Normal file
@@ -0,0 +1,75 @@
|
|||||||
|
package blocks
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"sort"
|
||||||
|
|
||||||
|
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ROBlock is a value that embeds a ReadOnlySignedBeaconBlock along with its block root ([32]byte).
|
||||||
|
// This allows the block root to be cached within a value that satisfies the ReadOnlySignedBeaconBlock interface.
|
||||||
|
// Since the root and slot for each ROBlock is known, slices can be efficiently sorted using ROBlockSlice.
|
||||||
|
type ROBlock struct {
|
||||||
|
interfaces.ReadOnlySignedBeaconBlock
|
||||||
|
root [32]byte
|
||||||
|
}
|
||||||
|
|
||||||
|
// Root returns the block hash_tree_root for the embedded ReadOnlySignedBeaconBlock.Block().
|
||||||
|
func (b ROBlock) Root() [32]byte {
|
||||||
|
return b.root
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewROBlockWithRoot creates an ROBlock embedding the given block with its root. It accepts the root as parameter rather than
|
||||||
|
// computing it internally, because in some cases a block is retrieved by its root and recomputing it is a waste.
|
||||||
|
func NewROBlockWithRoot(b interfaces.ReadOnlySignedBeaconBlock, root [32]byte) (ROBlock, error) {
|
||||||
|
if err := BeaconBlockIsNil(b); err != nil {
|
||||||
|
return ROBlock{}, err
|
||||||
|
}
|
||||||
|
return ROBlock{ReadOnlySignedBeaconBlock: b, root: root}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewROBlock creates a ROBlock from a ReadOnlySignedBeaconBlock. It uses the HashTreeRoot method of the given
|
||||||
|
// ReadOnlySignedBeaconBlock.Block to compute the cached root.
|
||||||
|
func NewROBlock(b interfaces.ReadOnlySignedBeaconBlock) (ROBlock, error) {
|
||||||
|
if err := BeaconBlockIsNil(b); err != nil {
|
||||||
|
return ROBlock{}, err
|
||||||
|
}
|
||||||
|
root, err := b.Block().HashTreeRoot()
|
||||||
|
if err != nil {
|
||||||
|
return ROBlock{}, err
|
||||||
|
}
|
||||||
|
return ROBlock{ReadOnlySignedBeaconBlock: b, root: root}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ROBlockSlice implements sort.Interface so that slices of ROBlocks can be easily sorted.
|
||||||
|
// A slice of ROBlock is sorted first by slot, with ties broken by cached block roots.
|
||||||
|
type ROBlockSlice []ROBlock
|
||||||
|
|
||||||
|
var _ sort.Interface = ROBlockSlice{}
|
||||||
|
|
||||||
|
// Less reports whether the element with index i must sort before the element with index j.
|
||||||
|
// ROBlocks are ordered first by their slot,
|
||||||
|
// with a lexicographic sort of roots breaking ties for slots with duplicate blocks.
|
||||||
|
func (s ROBlockSlice) Less(i, j int) bool {
|
||||||
|
si, sj := s[i].Block().Slot(), s[j].Block().Slot()
|
||||||
|
|
||||||
|
// lower slot wins
|
||||||
|
if si != sj {
|
||||||
|
return si < sj
|
||||||
|
}
|
||||||
|
|
||||||
|
// break slot tie lexicographically comparing roots byte for byte
|
||||||
|
ri, rj := s[i].Root(), s[j].Root()
|
||||||
|
return bytes.Compare(ri[:], rj[:]) < 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// Swap swaps the elements with indexes i and j.
|
||||||
|
func (s ROBlockSlice) Swap(i, j int) {
|
||||||
|
s[i], s[j] = s[j], s[i]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Len is the number of elements in the collection.
|
||||||
|
func (s ROBlockSlice) Len() int {
|
||||||
|
return len(s)
|
||||||
|
}
|
||||||
90
consensus-types/blocks/roblock_test.go
Normal file
90
consensus-types/blocks/roblock_test.go
Normal file
@@ -0,0 +1,90 @@
|
|||||||
|
package blocks
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sort"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
|
||||||
|
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
|
||||||
|
eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
|
||||||
|
"github.com/prysmaticlabs/prysm/v4/testing/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestROBlockSorting(t *testing.T) {
|
||||||
|
one := bytesutil.ToBytes32(bytesutil.PadTo([]byte{1}, 32))
|
||||||
|
two := bytesutil.ToBytes32(bytesutil.PadTo([]byte{2}, 32))
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
ros []ROBlock
|
||||||
|
sorted []ROBlock
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "1 item",
|
||||||
|
ros: []ROBlock{testROBlock(t, 1, [32]byte{})},
|
||||||
|
sorted: []ROBlock{testROBlock(t, 1, [32]byte{})},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "2 items, sorted",
|
||||||
|
ros: []ROBlock{testROBlock(t, 1, [32]byte{}), testROBlock(t, 2, [32]byte{})},
|
||||||
|
sorted: []ROBlock{testROBlock(t, 1, [32]byte{}), testROBlock(t, 2, [32]byte{})},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "2 items, reversed",
|
||||||
|
ros: []ROBlock{testROBlock(t, 2, [32]byte{}), testROBlock(t, 1, [32]byte{})},
|
||||||
|
sorted: []ROBlock{testROBlock(t, 1, [32]byte{}), testROBlock(t, 2, [32]byte{})},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "3 items, reversed, with tie breaker",
|
||||||
|
ros: []ROBlock{
|
||||||
|
testROBlock(t, 2, two),
|
||||||
|
testROBlock(t, 2, one),
|
||||||
|
testROBlock(t, 1, [32]byte{}),
|
||||||
|
},
|
||||||
|
sorted: []ROBlock{
|
||||||
|
testROBlock(t, 1, [32]byte{}),
|
||||||
|
testROBlock(t, 2, one),
|
||||||
|
testROBlock(t, 2, two),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "5 items, reversed, with double root tie",
|
||||||
|
ros: []ROBlock{
|
||||||
|
testROBlock(t, 0, one),
|
||||||
|
testROBlock(t, 2, two),
|
||||||
|
testROBlock(t, 2, one),
|
||||||
|
testROBlock(t, 2, two),
|
||||||
|
testROBlock(t, 2, one),
|
||||||
|
testROBlock(t, 1, [32]byte{}),
|
||||||
|
},
|
||||||
|
sorted: []ROBlock{
|
||||||
|
testROBlock(t, 0, one),
|
||||||
|
testROBlock(t, 1, [32]byte{}),
|
||||||
|
testROBlock(t, 2, one),
|
||||||
|
testROBlock(t, 2, one),
|
||||||
|
testROBlock(t, 2, two),
|
||||||
|
testROBlock(t, 2, two),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, c := range cases {
|
||||||
|
t.Run(c.name, func(t *testing.T) {
|
||||||
|
sort.Sort(ROBlockSlice(c.ros))
|
||||||
|
for i := 0; i < len(c.sorted); i++ {
|
||||||
|
require.Equal(t, c.sorted[i].Block().Slot(), c.ros[i].Block().Slot())
|
||||||
|
require.Equal(t, c.sorted[i].Root(), c.ros[i].Root())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testROBlock(t *testing.T, slot primitives.Slot, root [32]byte) ROBlock {
|
||||||
|
b, err := NewSignedBeaconBlock(ð.SignedBeaconBlock{Block: ð.BeaconBlock{
|
||||||
|
Body: ð.BeaconBlockBody{},
|
||||||
|
Slot: slot,
|
||||||
|
}})
|
||||||
|
require.NoError(t, err)
|
||||||
|
return ROBlock{
|
||||||
|
ReadOnlySignedBeaconBlock: b,
|
||||||
|
root: root,
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,7 +1,9 @@
|
|||||||
package util
|
package util
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -20,3 +22,41 @@ func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Waiter offers an alternate ux for building tests that want to ensure contexts are used in certain ways.
|
||||||
|
type Waiter struct {
|
||||||
|
c chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewWaiter internally create the chan that Waiter relies on.
|
||||||
|
func NewWaiter() *Waiter {
|
||||||
|
return &Waiter{
|
||||||
|
c: make(chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Done is used with RequireDoneAfter and RequireDoneBefore to make assertions
|
||||||
|
// that certain test code is reached before a timeout or context cancelation.
|
||||||
|
func (w *Waiter) Done() {
|
||||||
|
close(w.c)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RequireDoneAfter forces the test to fail if the timeout is reached before Done is called.
|
||||||
|
func (w *Waiter) RequireDoneAfter(t *testing.T, timeout time.Duration) {
|
||||||
|
select {
|
||||||
|
case <-w.c:
|
||||||
|
return
|
||||||
|
case <-time.After(timeout):
|
||||||
|
t.Fatalf("timeout after %s", timeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RequireDoneBeforeCancel forces the test to fail if the context is cancelled before Done is called.
|
||||||
|
func (w *Waiter) RequireDoneBeforeCancel(ctx context.Context, t *testing.T) {
|
||||||
|
select {
|
||||||
|
case <-w.c:
|
||||||
|
return
|
||||||
|
case <-ctx.Done():
|
||||||
|
t.Fatalf("context canceled before Done with error=%s", ctx.Err())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user