Add in Context For Req/Resp (#8895)

* Add in context

* use better function signatures

* add in chainservice

* fix rpc requests

* add in test

* fix tests

* add in comments

* Update beacon-chain/sync/rpc_chunked_response.go

Co-authored-by: terence tsao <terence@prysmaticlabs.com>

* Update beacon-chain/sync/rpc_send_request.go

Co-authored-by: terence tsao <terence@prysmaticlabs.com>

* rename files

* fmt

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
Co-authored-by: terence tsao <terence@prysmaticlabs.com>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Nishant Das
2021-05-18 03:25:59 +08:00
committed by GitHub
parent 519e1e4f4a
commit 35dc8c31fa
16 changed files with 226 additions and 69 deletions

View File

@@ -4,6 +4,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test")
go_library(
name = "go_default_library",
srcs = [
"context.go",
"deadlines.go",
"decode_pubsub.go",
"doc.go",
@@ -106,6 +107,7 @@ go_test(
name = "go_default_test",
size = "small",
srcs = [
"context_test.go",
"decode_pubsub_test.go",
"error_test.go",
"pending_attestations_queue_test.go",

View File

@@ -0,0 +1,55 @@
package sync
import (
"errors"
"github.com/libp2p/go-libp2p-core/network"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
)
// writes peer's current context for the expected payload to the stream.
func writeContextToStream(stream network.Stream, chain blockchain.ChainInfoFetcher) error {
rpcCtx, err := rpcContext(stream, chain)
if err != nil {
return err
}
// Exit early if there is an empty context.
if len(rpcCtx) == 0 {
return nil
}
_, err = stream.Write(rpcCtx)
return err
}
// reads any attached context-bytes to the payload.
func readContextFromStream(stream network.Stream, chain blockchain.ChainInfoFetcher) ([]byte, error) {
rpcCtx, err := rpcContext(stream, chain)
if err != nil {
return nil, err
}
if len(rpcCtx) == 0 {
return []byte{}, nil
}
// Read context (fork-digest) from stream
b := make([]byte, 4)
if _, err := stream.Read(b); err != nil {
return nil, err
}
return b, nil
}
// retrieve expected context depending on rpc topic schema version.
func rpcContext(stream network.Stream, chain blockchain.ChainInfoFetcher) ([]byte, error) {
_, _, version, err := p2p.TopicDeconstructor(string(stream.Protocol()))
if err != nil {
return nil, err
}
switch version {
case p2p.SchemaVersionV1:
// Return empty context for a v1 method.
return []byte{}, nil
default:
return nil, errors.New("invalid version of %s registered for topic: %s")
}
}

View File

@@ -0,0 +1,72 @@
package sync
import (
"context"
"sync"
"testing"
"time"
core "github.com/libp2p/go-libp2p-core"
"github.com/libp2p/go-libp2p-core/network"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
)
func TestContextWrite_NoWrites(t *testing.T) {
p1 := p2ptest.NewTestP2P(t)
nPeer := p2ptest.NewTestP2P(t)
p1.Connect(nPeer)
wg := new(sync.WaitGroup)
prID := p2p.RPCPingTopicV1
wg.Add(1)
nPeer.BHost.SetStreamHandler(core.ProtocolID(prID), func(stream network.Stream) {
wg.Done()
// no-op
})
strm, err := p1.BHost.NewStream(context.Background(), nPeer.PeerID(), p2p.RPCPingTopicV1)
assert.NoError(t, err)
// Nothing will be written to the stream
assert.NoError(t, writeContextToStream(strm, nil))
if testutil.WaitTimeout(wg, 1*time.Second) {
t.Fatal("Did not receive stream within 1 sec")
}
}
func TestContextRead_NoReads(t *testing.T) {
p1 := p2ptest.NewTestP2P(t)
nPeer := p2ptest.NewTestP2P(t)
p1.Connect(nPeer)
wg := new(sync.WaitGroup)
prID := p2p.RPCPingTopicV1
wg.Add(1)
wantedData := []byte{'A', 'B', 'C', 'D'}
nPeer.BHost.SetStreamHandler(core.ProtocolID(prID), func(stream network.Stream) {
// No Context will be read from it
dt, err := readContextFromStream(stream, nil)
assert.NoError(t, err)
assert.Equal(t, 0, len(dt))
// Ensure sent over data hasn't been modified.
buf := make([]byte, len(wantedData))
n, err := stream.Read(buf)
assert.NoError(t, err)
assert.Equal(t, len(wantedData), n)
assert.DeepEqual(t, wantedData, buf)
wg.Done()
})
strm, err := p1.BHost.NewStream(context.Background(), nPeer.PeerID(), p2p.RPCPingTopicV1)
assert.NoError(t, err)
n, err := strm.Write(wantedData)
assert.NoError(t, err)
assert.Equal(t, len(wantedData), n)
if testutil.WaitTimeout(wg, 1*time.Second) {
t.Fatal("Did not receive stream within 1 sec")
}
}

View File

@@ -323,8 +323,7 @@ func (f *blocksFetcher) requestBlocks(
}
f.rateLimiter.Add(pid.String(), int64(req.Count))
l.Unlock()
return prysmsync.SendBeaconBlocksByRangeRequest(ctx, f.p2p, pid, req, nil)
return prysmsync.SendBeaconBlocksByRangeRequest(ctx, f.chain, f.p2p, pid, req, nil)
}
// requestBlocksByRoot is a wrapper for handling BeaconBlockByRootsReq requests/streams.
@@ -352,7 +351,7 @@ func (f *blocksFetcher) requestBlocksByRoot(
f.rateLimiter.Add(pid.String(), int64(len(*req)))
l.Unlock()
return prysmsync.SendBeaconBlocksByRootRequest(ctx, f.p2p, pid, req, nil)
return prysmsync.SendBeaconBlocksByRootRequest(ctx, f.chain, f.p2p, pid, req, nil)
}
// waitForBandwidth blocks up until peer's bandwidth is restored.

View File

@@ -608,7 +608,7 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T)
for i := req.StartSlot; i < req.StartSlot.Add(req.Count*req.Step); i += types.Slot(req.Step) {
blk := testutil.NewBeaconBlock()
blk.Block.Slot = i
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
}
assert.NoError(t, stream.Close())
}
@@ -629,7 +629,7 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T)
for i := req.StartSlot; i < req.StartSlot.Add(req.Count*req.Step+1); i += types.Slot(req.Step) {
blk := testutil.NewBeaconBlock()
blk.Block.Slot = i
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
}
assert.NoError(t, stream.Close())
}
@@ -650,11 +650,11 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T)
return func(stream network.Stream) {
blk := testutil.NewBeaconBlock()
blk.Block.Slot = 163
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
blk = testutil.NewBeaconBlock()
blk.Block.Slot = 162
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
assert.NoError(t, stream.Close())
}
},
@@ -674,11 +674,11 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T)
return func(stream network.Stream) {
blk := testutil.NewBeaconBlock()
blk.Block.Slot = 160
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
blk = testutil.NewBeaconBlock()
blk.Block.Slot = 160
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
assert.NoError(t, stream.Close())
}
},
@@ -704,11 +704,11 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T)
// Patch mid block, with invalid slot number.
if i == req.StartSlot.Add(req.Count*req.Step/2) {
blk.Block.Slot = req.StartSlot - 1
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
break
} else {
blk.Block.Slot = i
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
}
}
}
@@ -735,11 +735,11 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T)
// Patch mid block, with invalid slot number.
if i == req.StartSlot.Add(req.Count*req.Step/2) {
blk.Block.Slot = req.StartSlot.Add(req.Count * req.Step)
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
break
} else {
blk.Block.Slot = i
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
}
}
}
@@ -760,11 +760,11 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T)
return func(stream network.Stream) {
blk := testutil.NewBeaconBlock()
blk.Block.Slot = 100
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
blk = testutil.NewBeaconBlock()
blk.Block.Slot = 105
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
assert.NoError(t, stream.Close())
}
},
@@ -783,11 +783,11 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T)
return func(stream network.Stream) {
blk := testutil.NewBeaconBlock()
blk.Block.Slot = 100
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
blk = testutil.NewBeaconBlock()
blk.Block.Slot = 103
assert.NoError(t, beaconsync.WriteChunk(stream, p1.Encoding(), blk))
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p1.Encoding(), blk))
assert.NoError(t, stream.Close())
}
},

View File

@@ -216,7 +216,7 @@ func connectPeer(t *testing.T, host *p2pt.TestP2P, datum *peerData, peerStatus *
}
for i := 0; i < len(ret); i++ {
assert.NoError(t, beaconsync.WriteChunk(stream, p.Encoding(), ret[i]))
assert.NoError(t, beaconsync.WriteChunk(stream, nil, p.Encoding(), ret[i]))
}
})
@@ -285,7 +285,7 @@ func connectPeerHavingBlocks(
if uint64(i) >= uint64(len(blocks)) {
break
}
require.NoError(t, beaconsync.WriteChunk(stream, p.Encoding(), blocks[i]))
require.NoError(t, beaconsync.WriteChunk(stream, nil, p.Encoding(), blocks[i]))
}
})

View File

@@ -29,8 +29,7 @@ var _ shared.Service = (*Service)(nil)
// blockchainService defines the interface for interaction with block chain service.
type blockchainService interface {
blockchain.BlockReceiver
blockchain.HeadFetcher
blockchain.FinalizationFetcher
blockchain.ChainInfoFetcher
}
// Config to set up the initial sync service.

View File

@@ -16,6 +16,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
db2 "github.com/prysmaticlabs/prysm/beacon-chain/db"
db "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
@@ -51,7 +52,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerReturnsBlocks(t *testing.T) {
// Start service with 160 as allowed blocks capacity (and almost zero capacity recovery).
r := &Service{cfg: &Config{P2P: p1, DB: d, Chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID("/testing")
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), false)
var wg sync.WaitGroup
@@ -113,7 +114,7 @@ func TestRPCBeaconBlocksByRange_ReturnCorrectNumberBack(t *testing.T) {
// Start service with 160 as allowed blocks capacity (and almost zero capacity recovery).
r := &Service{cfg: &Config{P2P: p1, DB: d, Chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID("/testing")
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), false)
var wg sync.WaitGroup
@@ -177,7 +178,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerReturnsSortedBlocks(t *testing.T) {
// Start service with 160 as allowed blocks capacity (and almost zero capacity recovery).
r := &Service{cfg: &Config{P2P: p1, DB: d, Chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID("/testing")
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), false)
@@ -242,7 +243,7 @@ func TestRPCBeaconBlocksByRange_ReturnsGenesisBlock(t *testing.T) {
}
r := &Service{cfg: &Config{P2P: p1, DB: d, Chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID("/testing")
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, false)
@@ -292,7 +293,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {
req *pb.BeaconBlocksByRangeRequest, validateBlocks bool, success bool) error {
var wg sync.WaitGroup
wg.Add(1)
pcl := protocol.ID("/testing")
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
defer wg.Done()
if !validateBlocks {
@@ -330,7 +331,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {
capacity := int64(flags.Get().BlockBatchLimit * 3)
r := &Service{cfg: &Config{P2P: p1, DB: d, Chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID("/testing")
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, false)
req := &pb.BeaconBlocksByRangeRequest{
@@ -356,7 +357,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {
capacity := int64(flags.Get().BlockBatchLimit * 3)
r := &Service{cfg: &Config{P2P: p1, DB: d, Chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID("/testing")
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, false)
@@ -385,7 +386,7 @@ func TestRPCBeaconBlocksByRange_RPCHandlerRateLimitOverflow(t *testing.T) {
capacity := int64(flags.Get().BlockBatchLimit * flags.Get().BlockBatchLimitBurstFactor)
r := &Service{cfg: &Config{P2P: p1, DB: d, Chain: &chainMock.ChainService{}}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID("/testing")
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, capacity, false)
@@ -547,7 +548,7 @@ func TestRPCBeaconBlocksByRange_EnforceResponseInvariants(t *testing.T) {
parentRoot = rt
}
}
pcl := protocol.ID("/testing")
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
sendRequest := func(p1, p2 *p2ptest.TestP2P, r *Service,
req *pb.BeaconBlocksByRangeRequest, processBlocks func([]*ethpb.SignedBeaconBlock)) error {
var wg sync.WaitGroup
@@ -703,7 +704,7 @@ func TestRPCBeaconBlocksByRange_FilterBlocks(t *testing.T) {
}))
}
}
pcl := protocol.ID("/testing")
pcl := protocol.ID(p2p.RPCBlocksByRangeTopicV1)
sendRequest := func(p1, p2 *p2ptest.TestP2P, r *Service,
req *pb.BeaconBlocksByRangeRequest, processBlocks func([]*ethpb.SignedBeaconBlock)) error {
var wg sync.WaitGroup

View File

@@ -17,7 +17,7 @@ func (s *Service) sendRecentBeaconBlocksRequest(ctx context.Context, blockRoots
ctx, cancel := context.WithTimeout(ctx, respTimeout)
defer cancel()
_, err := SendBeaconBlocksByRootRequest(ctx, s.cfg.P2P, id, blockRoots, func(blk *ethpb.SignedBeaconBlock) error {
_, err := SendBeaconBlocksByRootRequest(ctx, s.cfg.Chain, s.cfg.P2P, id, blockRoots, func(blk *ethpb.SignedBeaconBlock) error {
blkRoot, err := blk.Block.HashTreeRoot()
if err != nil {
return err

View File

@@ -15,6 +15,7 @@ import (
mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/core/state"
db "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
p2pTypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
"github.com/prysmaticlabs/prysm/shared/params"
@@ -42,7 +43,7 @@ func TestRecentBeaconBlocksRPCHandler_ReturnsBlocks(t *testing.T) {
}
r := &Service{cfg: &Config{P2P: p1, DB: d}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID("/testing")
pcl := protocol.ID(p2p.RPCBlocksByRootTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(10000, 10000, false)
@@ -148,7 +149,7 @@ func TestRecentBeaconBlocksRPCHandler_HandleZeroBlocks(t *testing.T) {
d := db.SetupDB(t)
r := &Service{cfg: &Config{P2P: p1, DB: d}, rateLimiter: newRateLimiter(p1)}
pcl := protocol.ID("/testing")
pcl := protocol.ID(p2p.RPCBlocksByRootTopicV1)
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)

View File

@@ -5,37 +5,41 @@ import (
libp2pcore "github.com/libp2p/go-libp2p-core"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/encoder"
)
// chunkWriter writes the given message as a chunked response to the given network
// stream.
// response_chunk ::= | <result> | <encoding-dependent-header> | <encoded-payload>
// response_chunk ::= <result> | <context-bytes> | <encoding-dependent-header> | <encoded-payload>
func (s *Service) chunkWriter(stream libp2pcore.Stream, msg interface{}) error {
SetStreamWriteDeadline(stream, defaultWriteDuration)
return WriteChunk(stream, s.cfg.P2P.Encoding(), msg)
return WriteChunk(stream, s.cfg.Chain, s.cfg.P2P.Encoding(), msg)
}
// WriteChunk object to stream.
// response_chunk ::= | <result> | <encoding-dependent-header> | <encoded-payload>
func WriteChunk(stream libp2pcore.Stream, encoding encoder.NetworkEncoding, msg interface{}) error {
// response_chunk ::= <result> | <context-bytes> | <encoding-dependent-header> | <encoded-payload>
func WriteChunk(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, encoding encoder.NetworkEncoding, msg interface{}) error {
if _, err := stream.Write([]byte{responseCodeSuccess}); err != nil {
return err
}
if err := writeContextToStream(stream, chain); err != nil {
return err
}
_, err := encoding.EncodeWithMaxLength(stream, msg)
return err
}
// ReadChunkedBlock handles each response chunk that is sent by the
// peer and converts it into a beacon block.
func ReadChunkedBlock(stream libp2pcore.Stream, p2p p2p.P2P, isFirstChunk bool) (*eth.SignedBeaconBlock, error) {
func ReadChunkedBlock(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, p2p p2p.P2P, isFirstChunk bool) (*eth.SignedBeaconBlock, error) {
// Handle deadlines differently for first chunk
if isFirstChunk {
return readFirstChunkedBlock(stream, p2p)
return readFirstChunkedBlock(stream, chain, p2p)
}
blk := &eth.SignedBeaconBlock{}
if err := readResponseChunk(stream, p2p, blk); err != nil {
if err := readResponseChunk(stream, chain, p2p, blk); err != nil {
return nil, err
}
return blk, nil
@@ -43,7 +47,7 @@ func ReadChunkedBlock(stream libp2pcore.Stream, p2p p2p.P2P, isFirstChunk bool)
// readFirstChunkedBlock reads the first chunked block and applies the appropriate deadlines to
// it.
func readFirstChunkedBlock(stream libp2pcore.Stream, p2p p2p.P2P) (*eth.SignedBeaconBlock, error) {
func readFirstChunkedBlock(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, p2p p2p.P2P) (*eth.SignedBeaconBlock, error) {
blk := &eth.SignedBeaconBlock{}
code, errMsg, err := ReadStatusCode(stream, p2p.Encoding())
if err != nil {
@@ -52,21 +56,30 @@ func readFirstChunkedBlock(stream libp2pcore.Stream, p2p p2p.P2P) (*eth.SignedBe
if code != 0 {
return nil, errors.New(errMsg)
}
// No-op for now with the rpc context.
_, err = readContextFromStream(stream, chain)
if err != nil {
return nil, err
}
err = p2p.Encoding().DecodeWithMaxLength(stream, blk)
return blk, err
}
// readResponseChunk reads the response from the stream and decodes it into the
// provided message type.
func readResponseChunk(stream libp2pcore.Stream, p2p p2p.P2P, to interface{}) error {
func readResponseChunk(stream libp2pcore.Stream, chain blockchain.ChainInfoFetcher, p2p p2p.P2P, to interface{}) error {
SetStreamReadDeadline(stream, respTimeout)
code, errMsg, err := readStatusCodeNoDeadline(stream, p2p.Encoding())
if err != nil {
return err
}
if code != 0 {
return errors.New(errMsg)
}
// No-op for now with the rpc context.
_, err = readContextFromStream(stream, chain)
if err != nil {
return err
}
return p2p.Encoding().DecodeWithMaxLength(stream, to)
}

View File

@@ -47,6 +47,11 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (*pb.Meta
s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
return nil, errors.New(errMsg)
}
// No-op for now with the rpc context.
_, err = readContextFromStream(stream, s.cfg.Chain)
if err != nil {
return nil, err
}
msg := new(pb.MetaData)
if err := s.cfg.P2P.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
return nil, err

View File

@@ -95,6 +95,11 @@ func (s *Service) sendPingRequest(ctx context.Context, id peer.ID) error {
s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
return errors.New(errMsg)
}
// No-op for now with the rpc context.
_, err = readContextFromStream(stream, s.cfg.Chain)
if err != nil {
return err
}
msg := new(types.SSZUint64)
if err := s.cfg.P2P.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
return err

View File

@@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"
types "github.com/prysmaticlabs/eth2-types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
p2ptypes "github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
@@ -23,7 +24,7 @@ type BeaconBlockProcessor func(block *ethpb.SignedBeaconBlock) error
// SendBeaconBlocksByRangeRequest sends BeaconBlocksByRange and returns fetched blocks, if any.
func SendBeaconBlocksByRangeRequest(
ctx context.Context, p2pProvider p2p.P2P, pid peer.ID,
ctx context.Context, chain blockchain.ChainInfoFetcher, p2pProvider p2p.P2P, pid peer.ID,
req *pb.BeaconBlocksByRangeRequest, blockProcessor BeaconBlockProcessor,
) ([]*ethpb.SignedBeaconBlock, error) {
stream, err := p2pProvider.Send(ctx, req, p2p.RPCBlocksByRangeTopicV1, pid)
@@ -44,7 +45,7 @@ func SendBeaconBlocksByRangeRequest(
var prevSlot types.Slot
for i := uint64(0); ; i++ {
isFirstChunk := i == 0
blk, err := ReadChunkedBlock(stream, p2pProvider, isFirstChunk)
blk, err := ReadChunkedBlock(stream, chain, p2pProvider, isFirstChunk)
if errors.Is(err, io.EOF) {
break
}
@@ -81,7 +82,7 @@ func SendBeaconBlocksByRangeRequest(
// SendBeaconBlocksByRootRequest sends BeaconBlocksByRoot and returns fetched blocks, if any.
func SendBeaconBlocksByRootRequest(
ctx context.Context, p2pProvider p2p.P2P, pid peer.ID,
ctx context.Context, chain blockchain.ChainInfoFetcher, p2pProvider p2p.P2P, pid peer.ID,
req *p2ptypes.BeaconBlockByRootsReq, blockProcessor BeaconBlockProcessor,
) ([]*ethpb.SignedBeaconBlock, error) {
stream, err := p2pProvider.Send(ctx, req, p2p.RPCBlocksByRootTopicV1, pid)
@@ -105,7 +106,7 @@ func SendBeaconBlocksByRootRequest(
break
}
isFirstChunk := i == 0
blk, err := ReadChunkedBlock(stream, p2pProvider, isFirstChunk)
blk, err := ReadChunkedBlock(stream, chain, p2pProvider, isFirstChunk)
if errors.Is(err, io.EOF) {
break
}

View File

@@ -33,7 +33,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
p1.Connect(bogusPeer)
req := &pb.BeaconBlocksByRangeRequest{}
_, err := SendBeaconBlocksByRangeRequest(ctx, p1, bogusPeer.PeerID(), req, nil)
_, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, bogusPeer.PeerID(), req, nil)
assert.ErrorContains(t, "protocol not supported", err)
})
@@ -78,7 +78,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
if uint64(i) >= uint64(len(knownBlocks)) {
break
}
err = WriteChunk(stream, p2pProvider.Encoding(), knownBlocks[i])
err = WriteChunk(stream, nil, p2pProvider.Encoding(), knownBlocks[i])
if err != nil && err.Error() != mux.ErrReset.Error() {
require.NoError(t, err)
}
@@ -97,7 +97,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
Count: 128,
Step: 1,
}
blocks, err := SendBeaconBlocksByRangeRequest(ctx, p1, p2.PeerID(), req, nil)
blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, nil)
assert.NoError(t, err)
assert.Equal(t, 128, len(blocks))
})
@@ -115,7 +115,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
Step: 1,
}
blocksFromProcessor := make([]*eth.SignedBeaconBlock, 0)
blocks, err := SendBeaconBlocksByRangeRequest(ctx, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error {
blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error {
blocksFromProcessor = append(blocksFromProcessor, block)
return nil
})
@@ -137,7 +137,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
Step: 1,
}
errFromProcessor := errors.New("processor error")
_, err := SendBeaconBlocksByRangeRequest(ctx, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error {
_, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error {
return errFromProcessor
})
assert.ErrorContains(t, errFromProcessor.Error(), err)
@@ -155,7 +155,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
Count: 128,
Step: 1,
}
blocks, err := SendBeaconBlocksByRangeRequest(ctx, p1, p2.PeerID(), req, nil)
blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, nil)
assert.NoError(t, err)
assert.Equal(t, 128, len(blocks))
@@ -166,7 +166,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
cfg.MaxRequestBlocks = maxRequestBlocks
params.OverrideBeaconNetworkConfig(cfg)
}()
blocks, err = SendBeaconBlocksByRangeRequest(ctx, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error {
blocks, err = SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error {
// Since ssz checks the boundaries, and doesn't normally allow to send requests bigger than
// the max request size, we are updating max request size dynamically. Even when updated dynamically,
// no more than max request size of blocks is expected on return.
@@ -197,7 +197,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
Count: 128,
Step: 1,
}
blocks, err := SendBeaconBlocksByRangeRequest(ctx, p1, p2.PeerID(), req, nil)
blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, nil)
assert.ErrorContains(t, expectedErr.Error(), err)
assert.Equal(t, 0, len(blocks))
})
@@ -225,7 +225,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
if uint64(i) >= uint64(len(knownBlocks)) {
break
}
err = WriteChunk(stream, p2.Encoding(), knownBlocks[i])
err = WriteChunk(stream, nil, p2.Encoding(), knownBlocks[i])
if err != nil && err.Error() != mux.ErrReset.Error() {
require.NoError(t, err)
}
@@ -237,7 +237,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
Count: 128,
Step: 1,
}
blocks, err := SendBeaconBlocksByRangeRequest(ctx, p1, p2.PeerID(), req, nil)
blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, nil)
assert.ErrorContains(t, ErrInvalidFetchedData.Error(), err)
assert.Equal(t, 0, len(blocks))
@@ -266,7 +266,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
if uint64(i) >= uint64(len(knownBlocks)) {
break
}
err = WriteChunk(stream, p2.Encoding(), knownBlocks[i])
err = WriteChunk(stream, nil, p2.Encoding(), knownBlocks[i])
if err != nil && err.Error() != mux.ErrReset.Error() {
require.NoError(t, err)
}
@@ -278,7 +278,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
Count: 128,
Step: 10,
}
blocks, err := SendBeaconBlocksByRangeRequest(ctx, p1, p2.PeerID(), req, nil)
blocks, err := SendBeaconBlocksByRangeRequest(ctx, nil, p1, p2.PeerID(), req, nil)
assert.ErrorContains(t, ErrInvalidFetchedData.Error(), err)
assert.Equal(t, 0, len(blocks))
@@ -307,7 +307,7 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) {
p1.Connect(bogusPeer)
req := &p2pTypes.BeaconBlockByRootsReq{}
_, err := SendBeaconBlocksByRootRequest(ctx, p1, bogusPeer.PeerID(), req, nil)
_, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, bogusPeer.PeerID(), req, nil)
assert.ErrorContains(t, "protocol not supported", err)
})
@@ -354,7 +354,7 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) {
p2.SetStreamHandler(pcl, knownBlocksProvider(p2, nil))
req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1]}
blocks, err := SendBeaconBlocksByRootRequest(ctx, p1, p2.PeerID(), req, nil)
blocks, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, nil)
assert.NoError(t, err)
assert.Equal(t, 2, len(blocks))
})
@@ -368,7 +368,7 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) {
// No error from block processor.
req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1]}
blocksFromProcessor := make([]*eth.SignedBeaconBlock, 0)
blocks, err := SendBeaconBlocksByRootRequest(ctx, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error {
blocks, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error {
blocksFromProcessor = append(blocksFromProcessor, block)
return nil
})
@@ -386,7 +386,7 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) {
// Send error from block processor.
req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1]}
errFromProcessor := errors.New("processor error")
_, err := SendBeaconBlocksByRootRequest(ctx, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error {
_, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error {
return errFromProcessor
})
assert.ErrorContains(t, errFromProcessor.Error(), err)
@@ -400,7 +400,7 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) {
// No cap on max roots.
req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1], knownRoots[2], knownRoots[3]}
blocks, err := SendBeaconBlocksByRootRequest(ctx, p1, p2.PeerID(), req, nil)
blocks, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, nil)
assert.NoError(t, err)
assert.Equal(t, 4, len(blocks))
@@ -411,7 +411,7 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) {
cfg.MaxRequestBlocks = maxRequestBlocks
params.OverrideBeaconNetworkConfig(cfg)
}()
blocks, err = SendBeaconBlocksByRootRequest(ctx, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error {
blocks, err = SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, func(block *eth.SignedBeaconBlock) error {
// Since ssz checks the boundaries, and doesn't normally allow to send requests bigger than
// the max request size, we are updating max request size dynamically. Even when updated dynamically,
// no more than max request size of blocks is expected on return.
@@ -438,7 +438,7 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) {
}))
req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1], knownRoots[2], knownRoots[3]}
blocks, err := SendBeaconBlocksByRootRequest(ctx, p1, p2.PeerID(), req, nil)
blocks, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, nil)
assert.ErrorContains(t, expectedErr.Error(), err)
assert.Equal(t, 0, len(blocks))
})
@@ -458,7 +458,7 @@ func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) {
}))
req := &p2pTypes.BeaconBlockByRootsReq{knownRoots[0], knownRoots[1], knownRoots[2], knownRoots[3]}
blocks, err := SendBeaconBlocksByRootRequest(ctx, p1, p2.PeerID(), req, nil)
blocks, err := SendBeaconBlocksByRootRequest(ctx, nil, p1, p2.PeerID(), req, nil)
assert.NoError(t, err)
assert.Equal(t, 3, len(blocks))
})

View File

@@ -154,7 +154,11 @@ func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
s.cfg.P2P.Peers().Scorers().BadResponsesScorer().Increment(id)
return errors.New(errMsg)
}
// No-op for now with the rpc context.
_, err = readContextFromStream(stream, s.cfg.Chain)
if err != nil {
return err
}
msg := &pb.Status{}
if err := s.cfg.P2P.Encoding().DecodeWithMaxLength(stream, msg); err != nil {
return err