Add RPC Changes to Sync For Block Methods (#9497)

* add changes

* radek's review

* kasey's review

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Nishant Das
2021-09-04 00:00:56 +08:00
committed by GitHub
parent e875fe3dfc
commit cc790ceb2e
17 changed files with 325 additions and 86 deletions

View File

@@ -91,6 +91,7 @@ go_library(
"//shared/sszutil:go_default_library",
"//shared/timeutils:go_default_library",
"//shared/traceutil:go_default_library",
"//shared/version:go_default_library",
"@com_github_hashicorp_golang_lru//:go_default_library",
"@com_github_kevinms_leakybucket_go//:go_default_library",
"@com_github_libp2p_go_libp2p_core//:go_default_library",
@@ -123,6 +124,7 @@ go_test(
"rate_limiter_test.go",
"rpc_beacon_blocks_by_range_test.go",
"rpc_beacon_blocks_by_root_test.go",
"rpc_chunked_response_test.go",
"rpc_goodbye_test.go",
"rpc_metadata_test.go",
"rpc_ping_test.go",
@@ -148,6 +150,7 @@ go_test(
embed = [":go_default_library"],
shard_count = 4,
deps = [
"//beacon-chain/blockchain:go_default_library",
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/cache:go_default_library",
"//beacon-chain/core/altair:go_default_library",

View File

@@ -1,15 +1,21 @@
package sync
import (
"errors"
"github.com/libp2p/go-libp2p-core/network"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
)
// Specifies the fixed size context length.
const forkDigestLength = 4
// writes peer's current context for the expected payload to the stream.
func writeContextToStream(stream network.Stream, chain blockchain.ChainInfoFetcher) error {
func writeContextToStream(objCtx []byte, stream network.Stream, chain blockchain.ChainInfoFetcher) error {
// The rpc context for our v2 methods is the fork-digest of
// the relevant payload. We write the associated fork-digest(context)
// into the stream for the payload.
rpcCtx, err := rpcContext(stream, chain)
if err != nil {
return err
@@ -18,6 +24,10 @@ func writeContextToStream(stream network.Stream, chain blockchain.ChainInfoFetch
if len(rpcCtx) == 0 {
return nil
}
// Always choose the object's context when writing to the stream.
if objCtx != nil {
rpcCtx = objCtx
}
_, err = stream.Write(rpcCtx)
return err
}
@@ -32,7 +42,7 @@ func readContextFromStream(stream network.Stream, chain blockchain.ChainInfoFetc
return []byte{}, nil
}
// Read context (fork-digest) from stream
b := make([]byte, 4)
b := make([]byte, forkDigestLength)
if _, err := stream.Read(b); err != nil {
return nil, err
}
@@ -49,6 +59,14 @@ func rpcContext(stream network.Stream, chain blockchain.ChainInfoFetcher) ([]byt
case p2p.SchemaVersionV1:
// Return empty context for a v1 method.
return []byte{}, nil
case p2p.SchemaVersionV2:
currFork := chain.CurrentFork()
genRoot := chain.GenesisValidatorRoot()
digest, err := helpers.ComputeForkDigest(currFork.CurrentVersion, genRoot[:])
if err != nil {
return nil, err
}
return digest[:], nil
default:
return nil, errors.New("invalid version of %s registered for topic: %s")
}

View File

@@ -30,7 +30,7 @@ func TestContextWrite_NoWrites(t *testing.T) {
assert.NoError(t, err)
// Nothing will be written to the stream
assert.NoError(t, writeContextToStream(strm, nil))
assert.NoError(t, writeContextToStream(nil, strm, nil))
if testutil.WaitTimeout(wg, 1*time.Second) {
t.Fatal("Did not receive stream within 1 sec")
}

View File

@@ -281,6 +281,8 @@ func TestBlocksFetcher_RoundRobin(t *testing.T) {
FinalizedCheckPoint: &eth.Checkpoint{
Epoch: 0,
},
Genesis: time.Now(),
ValidatorsRoot: [32]byte{},
}
ctx, cancel := context.WithCancel(context.Background())
@@ -414,7 +416,8 @@ func TestBlocksFetcher_handleRequest(t *testing.T) {
}
mc, p2p, _ := initializeTestServices(t, chainConfig.expectedBlockSlots, chainConfig.peers)
mc.ValidatorsRoot = [32]byte{}
mc.Genesis = time.Now()
t.Run("context cancellation", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{
@@ -547,7 +550,7 @@ func TestBlocksFetcher_RequestBlocksRateLimitingLocks(t *testing.T) {
defer cancel()
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{p2p: p1})
fetcher.rateLimiter = leakybucket.NewCollector(float64(req.Count), int64(req.Count*burstFactor), false)
fetcher.chain = &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
hook := logTest.NewGlobal()
wg := new(sync.WaitGroup)
wg.Add(1)
@@ -610,7 +613,8 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T)
for i := req.StartSlot; i < req.StartSlot.Add(req.Count*req.Step); i += types.Slot(req.Step) {
blk := testutil.NewBeaconBlock()
blk.Block.Slot = i
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
mchain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
assert.NoError(t, beaconsync.WriteBlockChunk(stream, mchain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk)))
}
assert.NoError(t, stream.Close())
}
@@ -631,7 +635,8 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T)
for i := req.StartSlot; i < req.StartSlot.Add(req.Count*req.Step+1); i += types.Slot(req.Step) {
blk := testutil.NewBeaconBlock()
blk.Block.Slot = i
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk)))
}
assert.NoError(t, stream.Close())
}
@@ -652,11 +657,12 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T)
return func(stream network.Stream) {
blk := testutil.NewBeaconBlock()
blk.Block.Slot = 163
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk)))
blk = testutil.NewBeaconBlock()
blk.Block.Slot = 162
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk)))
assert.NoError(t, stream.Close())
}
},
@@ -676,11 +682,13 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T)
return func(stream network.Stream) {
blk := testutil.NewBeaconBlock()
blk.Block.Slot = 160
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk)))
blk = testutil.NewBeaconBlock()
blk.Block.Slot = 160
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk)))
assert.NoError(t, stream.Close())
}
},
@@ -703,14 +711,15 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T)
}()
for i := req.StartSlot; i < req.StartSlot.Add(req.Count*req.Step); i += types.Slot(req.Step) {
blk := testutil.NewBeaconBlock()
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
// Patch mid block, with invalid slot number.
if i == req.StartSlot.Add(req.Count*req.Step/2) {
blk.Block.Slot = req.StartSlot - 1
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk)))
break
} else {
blk.Block.Slot = i
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk)))
}
}
}
@@ -734,14 +743,15 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T)
}()
for i := req.StartSlot; i < req.StartSlot.Add(req.Count*req.Step); i += types.Slot(req.Step) {
blk := testutil.NewBeaconBlock()
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
// Patch mid block, with invalid slot number.
if i == req.StartSlot.Add(req.Count*req.Step/2) {
blk.Block.Slot = req.StartSlot.Add(req.Count * req.Step)
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk)))
break
} else {
blk.Block.Slot = i
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk)))
}
}
}
@@ -762,11 +772,12 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T)
return func(stream network.Stream) {
blk := testutil.NewBeaconBlock()
blk.Block.Slot = 100
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk)))
blk = testutil.NewBeaconBlock()
blk.Block.Slot = 105
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk)))
assert.NoError(t, stream.Close())
}
},
@@ -785,11 +796,12 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T)
return func(stream network.Stream) {
blk := testutil.NewBeaconBlock()
blk.Block.Slot = 100
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk)))
blk = testutil.NewBeaconBlock()
blk.Block.Slot = 103
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p1.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blk)))
assert.NoError(t, stream.Close())
}
},
@@ -805,7 +817,7 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{p2p: p1})
fetcher := newBlocksFetcher(ctx, &blocksFetcherConfig{p2p: p1, chain: &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}})
fetcher.rateLimiter = leakybucket.NewCollector(0.000001, 640, false)
for _, tt := range tests {

View File

@@ -178,6 +178,8 @@ func TestBlocksFetcher_findFork(t *testing.T) {
Epoch: finalizedEpoch,
Root: []byte(fmt.Sprintf("finalized_root %d", finalizedEpoch)),
},
Genesis: time.Now(),
ValidatorsRoot: [32]byte{},
}
ctx, cancel := context.WithCancel(context.Background())
@@ -321,9 +323,11 @@ func TestBlocksFetcher_findForkWithPeer(t *testing.T) {
st, err := testutil.NewBeaconState()
require.NoError(t, err)
mc := &mock.ChainService{
State: st,
Root: genesisRoot[:],
DB: beaconDB,
State: st,
Root: genesisRoot[:],
DB: beaconDB,
Genesis: time.Now(),
ValidatorsRoot: [32]byte{},
}
ctx, cancel := context.WithCancel(context.Background())
@@ -438,6 +442,8 @@ func TestBlocksFetcher_findAncestor(t *testing.T) {
Epoch: finalizedEpoch,
Root: []byte(fmt.Sprintf("finalized_root %d", finalizedEpoch)),
},
Genesis: time.Now(),
ValidatorsRoot: [32]byte{},
}
ctx, cancel := context.WithCancel(context.Background())

View File

@@ -1052,6 +1052,8 @@ func TestBlocksQueue_stuckInUnfavourableFork(t *testing.T) {
Epoch: finalizedEpoch,
Root: []byte(fmt.Sprintf("finalized_root %d", finalizedEpoch)),
},
Genesis: time.Now(),
ValidatorsRoot: [32]byte{},
}
ctx, cancel := context.WithCancel(context.Background())
@@ -1251,6 +1253,8 @@ func TestBlocksQueue_stuckWhenHeadIsSetToOrphanedBlock(t *testing.T) {
Epoch: finalizedEpoch,
Root: []byte(fmt.Sprintf("finalized_root %d", finalizedEpoch)),
},
Genesis: time.Now(),
ValidatorsRoot: [32]byte{},
}
// Populate database with blocks with part of the chain, orphaned block will be added on top.

View File

@@ -96,6 +96,8 @@ func initializeTestServices(t *testing.T, slots []types.Slot, peers []*peerData)
FinalizedCheckPoint: &eth.Checkpoint{
Epoch: 0,
},
Genesis: time.Now(),
ValidatorsRoot: [32]byte{},
}, p, beaconDB
}
@@ -216,8 +218,9 @@ func connectPeer(t *testing.T, host *p2pt.TestP2P, datum *peerData, peerStatus *
ret = ret[:req.Count]
}
mChain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
for i := 0; i < len(ret); i++ {
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p.Encoding(), ret[i]))
assert.NoError(t, beaconsync.WriteBlockChunk(stream, mChain, p.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(ret[i])))
}
})
@@ -286,7 +289,8 @@ func connectPeerHavingBlocks(
if uint64(i) >= uint64(len(blocks)) {
break
}
require.NoError(t, beaconsync.WriteChunk(stream, nil, p.Encoding(), blocks[i]))
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
require.NoError(t, beaconsync.WriteBlockChunk(stream, chain, p.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(blocks[i])))
}
})

View File

@@ -295,6 +295,8 @@ func TestService_roundRobinSync(t *testing.T) {
FinalizedCheckPoint: &eth.Checkpoint{
Epoch: 0,
},
Genesis: time.Now(),
ValidatorsRoot: [32]byte{},
} // no-op mock
s := &Service{
ctx: context.Background(),
@@ -338,6 +340,8 @@ func TestService_processBlock(t *testing.T) {
FinalizedCheckPoint: &eth.Checkpoint{
Epoch: 0,
},
Genesis: time.Now(),
ValidatorsRoot: [32]byte{},
},
StateNotifier: &mock.MockStateNotifier{},
})
@@ -527,6 +531,8 @@ func TestService_blockProviderScoring(t *testing.T) {
Epoch: 0,
Root: make([]byte, 32),
},
Genesis: time.Now(),
ValidatorsRoot: [32]byte{},
} // no-op mock
s := &Service{
ctx: context.Background(),
@@ -590,6 +596,8 @@ func TestService_syncToFinalizedEpoch(t *testing.T) {
Epoch: 0,
Root: make([]byte, 32),
},
Genesis: time.Now(),
ValidatorsRoot: [32]byte{},
}
s := &Service{
ctx: context.Background(),

View File

@@ -58,6 +58,8 @@ func TestService_InitStartStop(t *testing.T) {
FinalizedCheckPoint: &eth.Checkpoint{
Epoch: 0,
},
Genesis: time.Unix(4113849600, 0),
ValidatorsRoot: [32]byte{},
}
},
methodRuns: func(fd *event.Feed) {
@@ -86,6 +88,8 @@ func TestService_InitStartStop(t *testing.T) {
FinalizedCheckPoint: &eth.Checkpoint{
Epoch: 0,
},
Genesis: time.Now().Add(-5 * time.Minute),
ValidatorsRoot: [32]byte{},
}
},
methodRuns: func(fd *event.Feed) {
@@ -117,6 +121,8 @@ func TestService_InitStartStop(t *testing.T) {
FinalizedCheckPoint: &eth.Checkpoint{
Epoch: helpers.SlotToEpoch(futureSlot),
},
Genesis: makeGenesisTime(futureSlot),
ValidatorsRoot: [32]byte{},
}
},
methodRuns: func(fd *event.Feed) {
@@ -150,7 +156,7 @@ func TestService_InitStartStop(t *testing.T) {
defer hook.Reset()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mc := &mock.ChainService{}
mc := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
// Allow overriding with customized chain service.
if tt.chainService != nil {
mc = tt.chainService()
@@ -211,7 +217,7 @@ func TestService_waitForStateInitialization(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := newService(ctx, &mock.ChainService{})
s := newService(ctx, &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}})
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
@@ -238,7 +244,7 @@ func TestService_waitForStateInitialization(t *testing.T) {
defer hook.Reset()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := newService(ctx, &mock.ChainService{})
s := newService(ctx, &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}})
expectedGenesisTime := time.Unix(358544700, 0)
var receivedGenesisTime time.Time
@@ -282,7 +288,7 @@ func TestService_waitForStateInitialization(t *testing.T) {
defer hook.Reset()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := newService(ctx, &mock.ChainService{})
s := newService(ctx, &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}})
// Initialize mock feed
_ = s.cfg.StateNotifier.StateFeed()
@@ -320,7 +326,7 @@ func TestService_waitForStateInitialization(t *testing.T) {
}
func TestService_markSynced(t *testing.T) {
mc := &mock.ChainService{}
mc := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
s := NewService(ctx, &Config{
@@ -403,6 +409,8 @@ func TestService_Resync(t *testing.T) {
FinalizedCheckPoint: &eth.Checkpoint{
Epoch: helpers.SlotToEpoch(futureSlot),
},
Genesis: time.Now(),
ValidatorsRoot: [32]byte{},
}
},
assert: func(s *Service) {

View File

@@ -162,7 +162,7 @@ func (s *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlo
if b == nil || b.IsNil() || b.Block().IsNil() {
continue
}
if chunkErr := s.chunkWriter(stream, b.Proto()); chunkErr != nil {
if chunkErr := s.chunkBlockWriter(stream, b); chunkErr != nil {
log.WithError(chunkErr).Debug("Could not send a chunked response")
s.writeErrorResponseToStream(responseCodeServerError, p2ptypes.ErrGeneric.Error(), stream)
traceutil.AnnotateError(span, chunkErr)

View File

@@ -22,7 +22,6 @@ import (
"github.com/prysmaticlabs/prysm/cmd/beacon-chain/flags"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
statepb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
@@ -635,7 +634,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) {
require.NoError(t, d.SaveBlock(context.Background(), wrapper.WrappedPhase0SignedBeaconBlock(blocks[j])))
j++
}
stateSummaries := make([]*statepb.StateSummary, len(blocks))
stateSummaries := make([]*ethpb.StateSummary, len(blocks))
if finalized {
if chain.CanonicalRoots == nil {
@@ -644,7 +643,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) {
for i, b := range blocks {
bRoot, err := b.Block.HashTreeRoot()
require.NoError(t, err)
stateSummaries[i] = &statepb.StateSummary{
stateSummaries[i] = &ethpb.StateSummary{
Slot: b.Block.Slot,
Root: bRoot[:],
}
@@ -685,7 +684,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) {
require.NoError(t, d.SaveBlock(context.Background(), wrapper.WrappedPhase0SignedBeaconBlock(blocks[j])))
j++
}
stateSummaries := make([]*statepb.StateSummary, len(blocks))
stateSummaries := make([]*ethpb.StateSummary, len(blocks))
if finalized {
if chain.CanonicalRoots == nil {
chain.CanonicalRoots = map[[32]byte]bool{}
@@ -693,7 +692,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) {
for i, b := range blocks {
bRoot, err := b.Block.HashTreeRoot()
require.NoError(t, err)
stateSummaries[i] = &statepb.StateSummary{
stateSummaries[i] = &ethpb.StateSummary{
Slot: b.Block.Slot,
Root: bRoot[:],
}

View File

@@ -71,7 +71,7 @@ func (s *Service) beaconBlocksRootRPCHandler(ctx context.Context, msg interface{
if blk == nil || blk.IsNil() {
continue
}
if err := s.chunkWriter(stream, blk.Proto()); err != nil {
if err := s.chunkBlockWriter(stream, blk); err != nil {
return err
}
}

View File

@@ -44,6 +44,7 @@ func TestRecentBeaconBlocksRPCHandler_ReturnsBlocks(t *testing.T) {
}
r := &Service{cfg: &Config{P2P: p1, DB: d}, rateLimiter: newRateLimiter(p1)}
r.cfg.Chain = &mock.ChainService{ValidatorsRoot: [32]byte{}}
pcl := protocol.ID(p2p.RPCBlocksByRootTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, false)
@@ -104,6 +105,8 @@ func TestRecentBeaconBlocks_RPCRequestSent(t *testing.T) {
State: genesisState,
FinalizedCheckPoint: finalizedCheckpt,
Root: blockARoot[:],
Genesis: time.Now(),
ValidatorsRoot: [32]byte{},
},
},
slotToPendingBlocks: gcache.New(time.Second, 2*time.Second),

View File

@@ -1,35 +1,56 @@
package sync
import (
"errors"
libp2pcore "github.com/libp2p/go-libp2p-core"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
eth "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/p2putils"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/version"
)
// chunkWriter writes the given message as a chunked response to the given network
// chunkBlockWriter writes the given message as a chunked response to the given network
// stream.
// response_chunk ::= <result> | <context-bytes> | <encoding-dependent-header> | <encoded-payload>
func (s *Service) chunkWriter(stream libp2pcore.Stream, msg interface{}) error {
func (s *Service) chunkBlockWriter(stream libp2pcore.Stream, blk block.SignedBeaconBlock) error {
SetStreamWriteDeadline(stream, defaultWriteDuration)
return WriteChunk(stream, s.cfg.Chain, s.cfg.P2P.Encoding(), msg)
return WriteBlockChunk(stream, s.cfg.Chain, s.cfg.P2P.Encoding(), blk)
}
// WriteChunk object to stream.
// response_chunk ::= <result> | <context-bytes> | <encoding-dependent-header> | <encoded-payload>
func WriteChunk(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, encoding encoder.NetworkEncoding, msg interface{}) error {
func WriteBlockChunk(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, encoding encoder.NetworkEncoding, blk block.SignedBeaconBlock) error {
if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
return err
}
if err := writeContextToStream(stream, chain); err != nil {
obtainedCtx := []byte{}
switch blk.Version() {
case version.Phase0:
valRoot := chain.GenesisValidatorRoot()
digest, err := p2putils.ForkDigestFromEpoch(params.BeaconConfig().GenesisEpoch, valRoot[:])
if err != nil {
return err
}
obtainedCtx = digest[:]
case version.Altair:
valRoot := chain.GenesisValidatorRoot()
digest, err := p2putils.ForkDigestFromEpoch(params.BeaconConfig().AltairForkEpoch, valRoot[:])
if err != nil {
return err
}
obtainedCtx = digest[:]
}
if err := writeContextToStream(obtainedCtx, stream, chain); err != nil {
return err
}
_, err := encoding.EncodeWithMaxLength(stream, msg)
_, err := encoding.EncodeWithMaxLength(stream, blk)
return err
}
@@ -40,17 +61,13 @@ func ReadChunkedBlock(stream libp2pcore.Stream, chain blockchain.ChainInfoFetche
if isFirstChunk {
return readFirstChunkedBlock(stream, chain, p2p)
}
blk := &eth.SignedBeaconBlock{}
if err := readResponseChunk(stream, chain, p2p, blk); err != nil {
return nil, err
}
return wrapper.WrappedPhase0SignedBeaconBlock(blk), nil
return readResponseChunk(stream, chain, p2p)
}
// readFirstChunkedBlock reads the first chunked block and applies the appropriate deadlines to
// it.
func readFirstChunkedBlock(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, p2p p2p.P2P) (block.SignedBeaconBlock, error) {
blk := &eth.SignedBeaconBlock{}
code, errMsg, err := ReadStatusCode(stream, p2p.Encoding())
if err != nil {
return nil, err
@@ -58,30 +75,62 @@ func readFirstChunkedBlock(stream libp2pcore.Stream, chain blockchain.ChainInfoF
if code != 0 {
return nil, errors.New(errMsg)
}
// No-op for now with the rpc context.
_, err = readContextFromStream(stream, chain)
rpcCtx, err := readContextFromStream(stream, chain)
if err != nil {
return nil, err
}
blk, err := extractBlockDataType(rpcCtx, chain)
if err != nil {
return nil, err
}
err = p2p.Encoding().DecodeWithMaxLength(stream, blk)
return wrapper.WrappedPhase0SignedBeaconBlock(blk), err
return blk, err
}
// readResponseChunk reads the response from the stream and decodes it into the
// provided message type.
func readResponseChunk(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, p2p p2p.P2P, to interface{}) error {
func readResponseChunk(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, p2p p2p.P2P) (block.SignedBeaconBlock, error) {
SetStreamReadDeadline(stream, respTimeout)
code, errMsg, err := readStatusCodeNoDeadline(stream, p2p.Encoding())
if err != nil {
return err
return nil, err
}
if code != 0 {
return errors.New(errMsg)
return nil, errors.New(errMsg)
}
// No-op for now with the rpc context.
_, err = readContextFromStream(stream, chain)
rpcCtx, err := readContextFromStream(stream, chain)
if err != nil {
return err
return nil, err
}
return p2p.Encoding().DecodeWithMaxLength(stream, to)
blk, err := extractBlockDataType(rpcCtx, chain)
if err != nil {
return nil, err
}
err = p2p.Encoding().DecodeWithMaxLength(stream, blk)
return blk, err
}
func extractBlockDataType(digest []byte, chain blockchain.ChainInfoFetcher) (block.SignedBeaconBlock, error) {
if len(digest) == 0 {
bFunc, ok := types.BlockMap[bytesutil.ToBytes4(params.BeaconConfig().GenesisForkVersion)]
if !ok {
return nil, errors.New("no block type exists for the genesis fork version.")
}
return bFunc()
}
if len(digest) != forkDigestLength {
return nil, errors.Errorf("invalid digest returned, wanted a length of %d but received %d", forkDigestLength, len(digest))
}
vRoot := chain.GenesisValidatorRoot()
for k, blkFunc := range types.BlockMap {
rDigest, err := helpers.ComputeForkDigest(k[:], vRoot[:])
if err != nil {
return nil, err
}
if rDigest == bytesutil.ToBytes4(digest) {
return blkFunc()
}
}
return nil, errors.New("no valid digest matched")
}

View File

@@ -0,0 +1,96 @@
package sync
import (
"reflect"
"testing"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
ethpb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/block"
"github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1/wrapper"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
func TestExtractBlockDataType(t *testing.T) {
// Precompute digests
genDigest, err := helpers.ComputeForkDigest(params.BeaconConfig().GenesisForkVersion, params.BeaconConfig().ZeroHash[:])
require.NoError(t, err)
altairDigest, err := helpers.ComputeForkDigest(params.BeaconConfig().AltairForkVersion, params.BeaconConfig().ZeroHash[:])
require.NoError(t, err)
type args struct {
digest []byte
chain blockchain.ChainInfoFetcher
}
tests := []struct {
name string
args args
want block.SignedBeaconBlock
wantErr bool
}{
{
name: "no digest",
args: args{
digest: []byte{},
chain: &mock.ChainService{ValidatorsRoot: [32]byte{}},
},
want: wrapper.WrappedPhase0SignedBeaconBlock(&ethpb.SignedBeaconBlock{}),
wantErr: false,
},
{
name: "invalid digest",
args: args{
digest: []byte{0x00, 0x01},
chain: &mock.ChainService{ValidatorsRoot: [32]byte{}},
},
want: nil,
wantErr: true,
},
{
name: "non existent digest",
args: args{
digest: []byte{0x00, 0x01, 0x02, 0x03},
chain: &mock.ChainService{ValidatorsRoot: [32]byte{}},
},
want: nil,
wantErr: true,
},
{
name: "genesis fork version",
args: args{
digest: genDigest[:],
chain: &mock.ChainService{ValidatorsRoot: [32]byte{}},
},
want: wrapper.WrappedPhase0SignedBeaconBlock(&ethpb.SignedBeaconBlock{}),
wantErr: false,
},
{
name: "altair fork version",
args: args{
digest: altairDigest[:],
chain: &mock.ChainService{ValidatorsRoot: [32]byte{}},
},
want: func() block.SignedBeaconBlock {
wsb, err := wrapper.WrappedAltairSignedBeaconBlock(&ethpb.SignedBeaconBlockAltair{Block: &ethpb.BeaconBlockAltair{}})
require.NoError(t, err)
return wsb
}(),
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := extractBlockDataType(tt.args.digest, tt.args.chain)
if (err != nil) != tt.wantErr {
t.Errorf("extractBlockDataType() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("extractBlockDataType() got = %v, want %v", got, tt.want)
}
})
}
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"
types "github.com/prysmaticlabs/eth2-types"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
pb "github.com/prysmaticlabs/prysm/proto/prysm/v1alpha1"
@@ -27,7 +28,11 @@ func SendBeaconBlocksByRangeRequest(
ctx context.Context, chain blockchain.ChainInfoFetcher, p2pProvider p2p.P2P, pid peer.ID,
req *pb.BeaconBlocksByRangeRequest, blockProcessor BeaconBlockProcessor,
) ([]block.SignedBeaconBlock, error) {
stream, err := p2pProvider.Send(ctx, req, p2p.RPCBlocksByRangeTopicV1, pid)
topic, err := p2p.TopicFromMessage(p2p.BeaconBlocksByRangeMessageName, helpers.SlotToEpoch(chain.CurrentSlot()))
if err != nil {
return nil, err
}
stream, err := p2pProvider.Send(ctx, req, topic, pid)
if err != nil {
return nil, err
}
@@ -85,7 +90,11 @@ func SendBeaconBlocksByRootRequest(
ctx context.Context, chain blockchain.ChainInfoFetcher, p2pProvider p2p.P2P, pid peer.ID,
req *p2ptypes.BeaconBlockByRootsReq, blockProcessor BeaconBlockProcessor,
) ([]block.SignedBeaconBlock, error) {
stream, err := p2pProvider.Send(ctx, req, p2p.RPCBlocksByRootTopicV1, pid)
topic, err := p2p.TopicFromMessage(p2p.BeaconBlocksByRootsMessageName, helpers.SlotToEpoch(chain.CurrentSlot()))
if err != nil {
return nil, err
}
stream, err := p2pProvider.Send(ctx, req, topic, pid)
if err != nil {
return nil, err
}

View File

@@ -6,10 +6,12 @@ import (
"fmt"
"io"
"testing"
"time"
"github.com/libp2p/go-libp2p-core/mux"
"github.com/libp2p/go-libp2p-core/network"
types "github.com/prysmaticlabs/eth2-types"
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
p2pTypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
@@ -35,7 +37,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
p1.Connect(bogusPeer)
req := &pb.BeaconBlocksByRangeRequest{}
_, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, bogusPeer.PeerID(), req, nil)
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
_, err := SendBeaconBlocksByRangeRequest(ctx, chain, p1, bogusPeer.PeerID(), req, nil)
assert.ErrorContains(t, "protocol not supported", err)
})
@@ -80,7 +83,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
if uint64(i) >= uint64(len(knownBlocks)) {
break
}
err = WriteChunk(stream, nil, p2pProvider.Encoding(), knownBlocks[i])
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
err = WriteBlockChunk(stream, chain, p2pProvider.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(knownBlocks[i]))
if err != nil && err.Error() != mux.ErrReset.Error() {
require.NoError(t, err)
}
@@ -99,7 +103,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
Count: 128,
Step: 1,
}
blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, nil)
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
blocks, err := SendBeaconBlocksByRangeRequest(ctx, chain, p1, p2.PeerID(), req, nil)
assert.NoError(t, err)
assert.Equal(t, 128, len(blocks))
})
@@ -117,7 +122,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
Step: 1,
}
blocksFromProcessor := make([]block.SignedBeaconBlock, 0)
blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error {
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
blocks, err := SendBeaconBlocksByRangeRequest(ctx, chain, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error {
blocksFromProcessor = append(blocksFromProcessor, block)
return nil
})
@@ -139,7 +145,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
Step: 1,
}
errFromProcessor := errors.New("processor error")
_, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error {
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
_, err := SendBeaconBlocksByRangeRequest(ctx, chain, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error {
return errFromProcessor
})
assert.ErrorContains(t, errFromProcessor.Error(), err)
@@ -157,7 +164,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
Count: 128,
Step: 1,
}
blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, nil)
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
blocks, err := SendBeaconBlocksByRangeRequest(ctx, chain, p1, p2.PeerID(), req, nil)
assert.NoError(t, err)
assert.Equal(t, 128, len(blocks))
@@ -168,7 +176,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
cfg.MaxRequestBlocks = maxRequestBlocks
params.OverrideBeaconNetworkConfig(cfg)
}()
blocks, err = SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error {
blocks, err = SendBeaconBlocksByRangeRequest(ctx, chain, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error {
// Since ssz checks the boundaries, and doesn't normally allow to send requests bigger than
// the max request size, we are updating max request size dynamically. Even when updated dynamically,
// no more than max request size of blocks is expected on return.
@@ -199,7 +207,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
Count: 128,
Step: 1,
}
blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, nil)
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
blocks, err := SendBeaconBlocksByRangeRequest(ctx, chain, p1, p2.PeerID(), req, nil)
assert.ErrorContains(t, expectedErr.Error(), err)
assert.Equal(t, 0, len(blocks))
})
@@ -227,7 +236,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
if uint64(i) >= uint64(len(knownBlocks)) {
break
}
err = WriteChunk(stream, nil, p2.Encoding(), knownBlocks[i])
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
err = WriteBlockChunk(stream, chain, p2.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(knownBlocks[i]))
if err != nil && err.Error() != mux.ErrReset.Error() {
require.NoError(t, err)
}
@@ -239,7 +249,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
Count: 128,
Step: 1,
}
blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, nil)
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
blocks, err := SendBeaconBlocksByRangeRequest(ctx, chain, p1, p2.PeerID(), req, nil)
assert.ErrorContains(t, ErrInvalidFetchedData.Error(), err)
assert.Equal(t, 0, len(blocks))
@@ -268,7 +279,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
if uint64(i) >= uint64(len(knownBlocks)) {
break
}
err = WriteChunk(stream, nil, p2.Encoding(), knownBlocks[i])
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
err = WriteBlockChunk(stream, chain, p2.Encoding(), wrapper.WrappedPhase0SignedBeaconBlock(knownBlocks[i]))
if err != nil && err.Error() != mux.ErrReset.Error() {
require.NoError(t, err)
}
@@ -280,7 +292,8 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
Count: 128,
Step: 10,
}
blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, nil)
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
blocks, err := SendBeaconBlocksByRangeRequest(ctx, chain, p1, p2.PeerID(), req, nil)
assert.ErrorContains(t, ErrInvalidFetchedData.Error(), err)
assert.Equal(t, 0, len(blocks))
@@ -309,7 +322,8 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) {
p1.Connect(bogusPeer)
req := &p2pTypes.BeaconBlockByRootsReq{}
_, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, bogusPeer.PeerID(), req, nil)
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
_, err := SendBeaconBlocksByRootRequest(ctx, chain, p1, bogusPeer.PeerID(), req, nil)
assert.ErrorContains(t, "protocol not supported", err)
})
@@ -356,7 +370,8 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) {
p2.SetStreamHandler(pcl, knownBlocksProvider(p2, nil))
req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1]}
blocks, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, nil)
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
blocks, err := SendBeaconBlocksByRootRequest(ctx, chain, p1, p2.PeerID(), req, nil)
assert.NoError(t, err)
assert.Equal(t, 2, len(blocks))
})
@@ -370,7 +385,8 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) {
// No error from block processor.
req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1]}
blocksFromProcessor := make([]block.SignedBeaconBlock, 0)
blocks, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error {
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
blocks, err := SendBeaconBlocksByRootRequest(ctx, chain, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error {
blocksFromProcessor = append(blocksFromProcessor, block)
return nil
})
@@ -388,7 +404,8 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) {
// Send error from block processor.
req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1]}
errFromProcessor := errors.New("processor error")
_, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error {
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
_, err := SendBeaconBlocksByRootRequest(ctx, chain, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error {
return errFromProcessor
})
assert.ErrorContains(t, errFromProcessor.Error(), err)
@@ -402,7 +419,8 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) {
// No cap on max roots.
req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1], knownRoots[2], knownRoots[3]}
blocks, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, nil)
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
blocks, err := SendBeaconBlocksByRootRequest(ctx, chain, p1, p2.PeerID(), req, nil)
assert.NoError(t, err)
assert.Equal(t, 4, len(blocks))
@@ -413,7 +431,7 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) {
cfg.MaxRequestBlocks = maxRequestBlocks
params.OverrideBeaconNetworkConfig(cfg)
}()
blocks, err = SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error {
blocks, err = SendBeaconBlocksByRootRequest(ctx, chain, p1, p2.PeerID(), req, func(block block.SignedBeaconBlock) error {
// Since ssz checks the boundaries, and doesn't normally allow to send requests bigger than
// the max request size, we are updating max request size dynamically. Even when updated dynamically,
// no more than max request size of blocks is expected on return.
@@ -440,7 +458,8 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) {
}))
req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1], knownRoots[2], knownRoots[3]}
blocks, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, nil)
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
blocks, err := SendBeaconBlocksByRootRequest(ctx, chain, p1, p2.PeerID(), req, nil)
assert.ErrorContains(t, expectedErr.Error(), err)
assert.Equal(t, 0, len(blocks))
})
@@ -460,7 +479,8 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) {
}))
req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1], knownRoots[2], knownRoots[3]}
blocks, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, nil)
chain := &mock.ChainService{Genesis: time.Now(), ValidatorsRoot: [32]byte{}}
blocks, err := SendBeaconBlocksByRootRequest(ctx, chain, p1, p2.PeerID(), req, nil)
assert.NoError(t, err)
assert.Equal(t, 3, len(blocks))
})