Refactor RPC Topic Construction (#8892)

* check in changes

* goimports

* export it

* add new rpc type
This commit is contained in:
Nishant Das
2021-05-15 08:58:56 +08:00
committed by GitHub
parent bc27a73600
commit e6a1d5b1b9
15 changed files with 243 additions and 54 deletions

View File

@@ -9,32 +9,74 @@ import (
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
)
// Current schema version for our rpc protocol ID.
const schemaVersionV1 = "/1"
// SchemaVersionV1 specifies the schema version for our rpc protocol ID.
const SchemaVersionV1 = "/1"
// Specifies the protocol prefix for all our Req/Resp topics.
const protocolPrefix = "/eth2/beacon_chain/req"
// Specifies the name for the status message topic.
const statusMessageName = "/status"
// Specifies the name for the goodbye message topic.
const goodbyeMessageName = "/goodbye"
// Specifies the name for the beacon blocks by range message topic.
const beaconBlocksByRangeMessageName = "/beacon_blocks_by_range"
// Specifies the name for the beacon blocks by root message topic.
const beaconBlocksByRootsMessageName = "/beacon_blocks_by_root"
// Specifies the name for the ping message topic.
const pingMessageName = "/ping"
// Specifies the name for the metadata message topic.
const metadataMessageName = "/metadata"
const (
// RPCStatusTopic defines the topic for the status rpc method.
RPCStatusTopic = "/eth2/beacon_chain/req/status" + schemaVersionV1
// RPCGoodByeTopic defines the topic for the goodbye rpc method.
RPCGoodByeTopic = "/eth2/beacon_chain/req/goodbye" + schemaVersionV1
// RPCBlocksByRangeTopic defines the topic for the blocks by range rpc method.
RPCBlocksByRangeTopic = "/eth2/beacon_chain/req/beacon_blocks_by_range" + schemaVersionV1
// RPCBlocksByRootTopic defines the topic for the blocks by root rpc method.
RPCBlocksByRootTopic = "/eth2/beacon_chain/req/beacon_blocks_by_root" + schemaVersionV1
// RPCPingTopic defines the topic for the ping rpc method.
RPCPingTopic = "/eth2/beacon_chain/req/ping" + schemaVersionV1
// RPCMetaDataTopic defines the topic for the metadata rpc method.
RPCMetaDataTopic = "/eth2/beacon_chain/req/metadata" + schemaVersionV1
// V1 RPC Topics
// RPCStatusTopicV1 defines the v1 topic for the status rpc method.
RPCStatusTopicV1 = protocolPrefix + statusMessageName + SchemaVersionV1
// RPCGoodByeTopicV1 defines the v1 topic for the goodbye rpc method.
RPCGoodByeTopicV1 = protocolPrefix + goodbyeMessageName + SchemaVersionV1
// RPCBlocksByRangeTopicV1 defines v1 the topic for the blocks by range rpc method.
RPCBlocksByRangeTopicV1 = protocolPrefix + beaconBlocksByRangeMessageName + SchemaVersionV1
// RPCBlocksByRootTopicV1 defines the v1 topic for the blocks by root rpc method.
RPCBlocksByRootTopicV1 = protocolPrefix + beaconBlocksByRootsMessageName + SchemaVersionV1
// RPCPingTopicV1 defines the v1 topic for the ping rpc method.
RPCPingTopicV1 = protocolPrefix + pingMessageName + SchemaVersionV1
// RPCMetaDataTopicV1 defines the v1 topic for the metadata rpc method.
RPCMetaDataTopicV1 = protocolPrefix + metadataMessageName + SchemaVersionV1
)
// RPCTopicMappings map the base message type to the rpc request.
var RPCTopicMappings = map[string]interface{}{
RPCStatusTopic: new(pb.Status),
RPCGoodByeTopic: new(types.SSZUint64),
RPCBlocksByRangeTopic: new(pb.BeaconBlocksByRangeRequest),
RPCBlocksByRootTopic: new(p2ptypes.BeaconBlockByRootsReq),
RPCPingTopic: new(types.SSZUint64),
RPCMetaDataTopic: new(interface{}),
RPCStatusTopicV1: new(pb.Status),
RPCGoodByeTopicV1: new(types.SSZUint64),
RPCBlocksByRangeTopicV1: new(pb.BeaconBlocksByRangeRequest),
RPCBlocksByRootTopicV1: new(p2ptypes.BeaconBlockByRootsReq),
RPCPingTopicV1: new(types.SSZUint64),
RPCMetaDataTopicV1: new(interface{}),
}
// Maps all registered protocol prefixes.
var protocolMapping = map[string]bool{
protocolPrefix: true,
}
// Maps all the protocol message names for the different rpc
// topics.
var messageMapping = map[string]bool{
statusMessageName: true,
goodbyeMessageName: true,
beaconBlocksByRangeMessageName: true,
beaconBlocksByRootsMessageName: true,
pingMessageName: true,
metadataMessageName: true,
}
var versionMapping = map[string]bool{
SchemaVersionV1: true,
}
// VerifyTopicMapping verifies that the topic and its accompanying
@@ -54,3 +96,94 @@ func VerifyTopicMapping(topic string, msg interface{}) error {
}
return nil
}
// TopicDeconstructor splits the provided topic to its logical sub-sections.
// It is assumed all input topics will follow the specific schema:
// /protocol-prefix/message-name/schema-version/...
// For the purposes of deconstruction, only the first 3 components are
// relevant.
func TopicDeconstructor(topic string) (string, string, string, error) {
origTopic := topic
protPrefix := ""
message := ""
version := ""
// Iterate through all the relevant mappings to find the relevant prefixes,messages
// and version for this topic.
for k := range protocolMapping {
keyLen := len(k)
if keyLen > len(topic) {
continue
}
if topic[:keyLen] == k {
protPrefix = k
topic = topic[keyLen:]
}
}
if protPrefix == "" {
return "", "", "", errors.Errorf("unable to find a valid protocol prefix for %s", origTopic)
}
for k := range messageMapping {
keyLen := len(k)
if keyLen > len(topic) {
continue
}
if topic[:keyLen] == k {
message = k
topic = topic[keyLen:]
}
}
if message == "" {
return "", "", "", errors.Errorf("unable to find a valid message for %s", origTopic)
}
for k := range versionMapping {
keyLen := len(k)
if keyLen > len(topic) {
continue
}
if topic[:keyLen] == k {
version = k
topic = topic[keyLen:]
}
}
if version == "" {
return "", "", "", errors.Errorf("unable to find a valid schema version for %s", origTopic)
}
return protPrefix, message, version, nil
}
// RPCTopic is a type used to denote and represent a req/resp topic.
type RPCTopic string
// ProtocolPrefix returns the protocol prefix of the rpc topic.
func (r RPCTopic) ProtocolPrefix() string {
prefix, _, _, err := TopicDeconstructor(string(r))
if err != nil {
return ""
}
return prefix
}
// MessageType returns the message type of the rpc topic.
func (r RPCTopic) MessageType() string {
_, message, _, err := TopicDeconstructor(string(r))
if err != nil {
return ""
}
return message
}
// Version returns the schema version of the rpc topic.
func (r RPCTopic) Version() string {
_, _, version, err := TopicDeconstructor(string(r))
if err != nil {
return ""
}
return version
}

View File

@@ -6,14 +6,70 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/p2p/types"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
func TestVerifyRPCMappings(t *testing.T) {
assert.NoError(t, VerifyTopicMapping(RPCStatusTopic, &pb.Status{}), "Failed to verify status rpc topic")
assert.NotNil(t, VerifyTopicMapping(RPCStatusTopic, new([]byte)), "Incorrect message type verified for status rpc topic")
assert.NoError(t, VerifyTopicMapping(RPCStatusTopicV1, &pb.Status{}), "Failed to verify status rpc topic")
assert.NotNil(t, VerifyTopicMapping(RPCStatusTopicV1, new([]byte)), "Incorrect message type verified for status rpc topic")
assert.NoError(t, VerifyTopicMapping(RPCMetaDataTopic, new(interface{})), "Failed to verify metadata rpc topic")
assert.NotNil(t, VerifyTopicMapping(RPCStatusTopic, new([]byte)), "Incorrect message type verified for metadata rpc topic")
assert.NoError(t, VerifyTopicMapping(RPCMetaDataTopicV1, new(interface{})), "Failed to verify metadata rpc topic")
assert.NotNil(t, VerifyTopicMapping(RPCStatusTopicV1, new([]byte)), "Incorrect message type verified for metadata rpc topic")
assert.NoError(t, VerifyTopicMapping(RPCBlocksByRootTopic, new(types.BeaconBlockByRootsReq)), "Failed to verify blocks by root rpc topic")
assert.NoError(t, VerifyTopicMapping(RPCBlocksByRootTopicV1, new(types.BeaconBlockByRootsReq)), "Failed to verify blocks by root rpc topic")
}
func TestTopicDeconstructor(t *testing.T) {
tt := []struct {
name string
topic string
expectedError string
output []string
}{
{
name: "invalid topic",
topic: "/sjdksfks/dusidsdsd/ssz",
expectedError: "unable to find a valid protocol prefix for /sjdksfks/dusidsdsd/ssz",
output: []string{"", "", ""},
},
{
name: "valid status topic",
topic: protocolPrefix + statusMessageName + SchemaVersionV1,
expectedError: "",
output: []string{protocolPrefix, statusMessageName, SchemaVersionV1},
},
{
name: "malformed status topic",
topic: protocolPrefix + "/statis" + SchemaVersionV1,
expectedError: "unable to find a valid message for /eth2/beacon_chain/req/statis/1",
output: []string{""},
},
{
name: "valid beacon block by range topic",
topic: protocolPrefix + beaconBlocksByRangeMessageName + SchemaVersionV1 + "/ssz_snappy",
expectedError: "",
output: []string{protocolPrefix, beaconBlocksByRangeMessageName, SchemaVersionV1},
},
{
name: "beacon block by range topic with malformed version",
topic: protocolPrefix + beaconBlocksByRangeMessageName + "/v" + "/ssz_snappy",
expectedError: "unable to find a valid schema version for /eth2/beacon_chain/req/beacon_blocks_by_range/v/ssz_snappy",
output: []string{""},
},
}
for _, test := range tt {
t.Run(test.name, func(t *testing.T) {
protocolPref, message, version, err := TopicDeconstructor(test.topic)
if test.expectedError != "" {
require.NotNil(t, err)
assert.Equal(t, test.expectedError, err.Error())
} else {
require.NoError(t, err)
assert.Equal(t, test.output[0], protocolPref)
assert.Equal(t, test.output[1], message)
assert.Equal(t, test.output[2], version)
}
})
}
}

View File

@@ -33,7 +33,7 @@ func (s *Service) Send(ctx context.Context, message interface{}, baseTopic strin
return nil, err
}
// do not encode anything if we are sending a metadata request
if baseTopic != RPCMetaDataTopic {
if baseTopic != RPCMetaDataTopicV1 {
if _, err := s.Encoding().EncodeWithMaxLength(stream, message); err != nil {
traceutil.AnnotateError(span, err)
_err := stream.Reset()

View File

@@ -531,7 +531,7 @@ func TestBlocksFetcher_RequestBlocksRateLimitingLocks(t *testing.T) {
Count: 64,
}
topic := p2pm.RPCBlocksByRangeTopic
topic := p2pm.RPCBlocksByRangeTopicV1
protocol := core.ProtocolID(topic + p2.Encoding().ProtocolSuffix())
streamHandlerFn := func(stream network.Stream) {
assert.NoError(t, stream.Close())
@@ -798,7 +798,7 @@ func TestBlocksFetcher_requestBlocksFromPeerReturningInvalidBlocks(t *testing.T)
},
}
topic := p2pm.RPCBlocksByRangeTopic
topic := p2pm.RPCBlocksByRangeTopicV1
protocol := core.ProtocolID(topic + p1.Encoding().ProtocolSuffix())
ctx, cancel := context.WithCancel(context.Background())

View File

@@ -450,7 +450,7 @@ func TestBlocksFetcher_findAncestor(t *testing.T) {
},
)
fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, false)
pcl := fmt.Sprintf("%s/ssz_snappy", p2pm.RPCBlocksByRootTopic)
pcl := fmt.Sprintf("%s/ssz_snappy", p2pm.RPCBlocksByRootTopicV1)
t.Run("error on request", func(t *testing.T) {
p2 := p2pt.NewTestP2P(t)

View File

@@ -39,22 +39,22 @@ func newRateLimiter(p2pProvider p2p.P2P) *limiter {
// Set topic map for all rpc topics.
topicMap := make(map[string]*leakybucket.Collector, len(p2p.RPCTopicMappings))
// Goodbye Message
topicMap[addEncoding(p2p.RPCGoodByeTopic)] = leakybucket.NewCollector(1, 1, false /* deleteEmptyBuckets */)
topicMap[addEncoding(p2p.RPCGoodByeTopicV1)] = leakybucket.NewCollector(1, 1, false /* deleteEmptyBuckets */)
// Metadata Message
topicMap[addEncoding(p2p.RPCMetaDataTopic)] = leakybucket.NewCollector(1, defaultBurstLimit, false /* deleteEmptyBuckets */)
topicMap[addEncoding(p2p.RPCMetaDataTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, false /* deleteEmptyBuckets */)
// Ping Message
topicMap[addEncoding(p2p.RPCPingTopic)] = leakybucket.NewCollector(1, defaultBurstLimit, false /* deleteEmptyBuckets */)
topicMap[addEncoding(p2p.RPCPingTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, false /* deleteEmptyBuckets */)
// Status Message
topicMap[addEncoding(p2p.RPCStatusTopic)] = leakybucket.NewCollector(1, defaultBurstLimit, false /* deleteEmptyBuckets */)
topicMap[addEncoding(p2p.RPCStatusTopicV1)] = leakybucket.NewCollector(1, defaultBurstLimit, false /* deleteEmptyBuckets */)
// Use a single collector for block requests
blockCollector := leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */)
// BlocksByRoots requests
topicMap[addEncoding(p2p.RPCBlocksByRootTopic)] = blockCollector
topicMap[addEncoding(p2p.RPCBlocksByRootTopicV1)] = blockCollector
// BlockByRange requests
topicMap[addEncoding(p2p.RPCBlocksByRangeTopic)] = blockCollector
topicMap[addEncoding(p2p.RPCBlocksByRangeTopicV1)] = blockCollector
// General topic for all rpc requests.
topicMap[rpcLimiterTopic] = leakybucket.NewCollector(5, defaultBurstLimit*2, false /* deleteEmptyBuckets */)

View File

@@ -35,7 +35,7 @@ func TestRateLimiter_ExceedCapacity(t *testing.T) {
rlimiter := newRateLimiter(p1)
// BlockByRange
topic := p2p.RPCBlocksByRangeTopic + p1.Encoding().ProtocolSuffix()
topic := p2p.RPCBlocksByRangeTopicV1 + p1.Encoding().ProtocolSuffix()
wg := sync.WaitGroup{}
p2.BHost.SetStreamHandler(protocol.ID(topic), func(stream network.Stream) {
@@ -72,7 +72,7 @@ func TestRateLimiter_ExceedRawCapacity(t *testing.T) {
rlimiter := newRateLimiter(p1)
// BlockByRange
topic := p2p.RPCBlocksByRangeTopic + p1.Encoding().ProtocolSuffix()
topic := p2p.RPCBlocksByRangeTopicV1 + p1.Encoding().ProtocolSuffix()
wg := sync.WaitGroup{}
p2.BHost.SetStreamHandler(protocol.ID(topic), func(stream network.Stream) {

View File

@@ -31,27 +31,27 @@ type rpcHandler func(context.Context, interface{}, libp2pcore.Stream) error
// registerRPCHandlers for p2p RPC.
func (s *Service) registerRPCHandlers() {
s.registerRPC(
p2p.RPCStatusTopic,
p2p.RPCStatusTopicV1,
s.statusRPCHandler,
)
s.registerRPC(
p2p.RPCGoodByeTopic,
p2p.RPCGoodByeTopicV1,
s.goodbyeRPCHandler,
)
s.registerRPC(
p2p.RPCBlocksByRangeTopic,
p2p.RPCBlocksByRangeTopicV1,
s.beaconBlocksByRangeRPCHandler,
)
s.registerRPC(
p2p.RPCBlocksByRootTopic,
p2p.RPCBlocksByRootTopicV1,
s.beaconBlocksRootRPCHandler,
)
s.registerRPC(
p2p.RPCPingTopic,
p2p.RPCPingTopicV1,
s.pingHandler,
)
s.registerRPC(
p2p.RPCMetaDataTopic,
p2p.RPCMetaDataTopicV1,
s.metaDataHandler,
)
}
@@ -113,7 +113,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
// since metadata requests do not have any data in the payload, we
// do not decode anything.
if baseTopic == p2p.RPCMetaDataTopic {
if baseTopic == p2p.RPCMetaDataTopicV1 {
if err := handle(ctx, base, stream); err != nil {
messageFailedProcessingCounter.WithLabelValues(topic).Inc()
if err != p2ptypes.ErrWrongForkDigestVersion {
@@ -131,7 +131,7 @@ func (s *Service) registerRPC(baseTopic string, handle rpcHandler) {
msg := reflect.New(t.Elem())
if err := s.cfg.P2P.Encoding().DecodeWithMaxLength(stream, msg.Interface()); err != nil {
// Debug logs for goodbye/status errors
if strings.Contains(topic, p2p.RPCGoodByeTopic) || strings.Contains(topic, p2p.RPCStatusTopic) {
if strings.Contains(topic, p2p.RPCGoodByeTopicV1) || strings.Contains(topic, p2p.RPCStatusTopicV1) {
log.WithError(err).Debug("Could not decode goodbye stream message")
traceutil.AnnotateError(span, err)
return

View File

@@ -91,7 +91,7 @@ func (s *Service) sendGoodByeMessage(ctx context.Context, code p2ptypes.RPCGoodb
ctx, cancel := context.WithTimeout(ctx, respTimeout)
defer cancel()
stream, err := s.cfg.P2P.Send(ctx, &code, p2p.RPCGoodByeTopic, id)
stream, err := s.cfg.P2P.Send(ctx, &code, p2p.RPCGoodByeTopicV1, id)
if err != nil {
return err
}

View File

@@ -34,7 +34,7 @@ func (s *Service) sendMetaDataRequest(ctx context.Context, id peer.ID) (*pb.Meta
ctx, cancel := context.WithTimeout(ctx, respTimeout)
defer cancel()
stream, err := s.cfg.P2P.Send(ctx, new(interface{}), p2p.RPCMetaDataTopic, id)
stream, err := s.cfg.P2P.Send(ctx, new(interface{}), p2p.RPCMetaDataTopicV1, id)
if err != nil {
return nil, err
}

View File

@@ -99,7 +99,7 @@ func TestMetadataRPCHandler_SendsMetadata(t *testing.T) {
}
// Setup streams
pcl := protocol.ID(p2p.RPCMetaDataTopic + r.cfg.P2P.Encoding().ProtocolSuffix())
pcl := protocol.ID(p2p.RPCMetaDataTopicV1 + r.cfg.P2P.Encoding().ProtocolSuffix())
topic := string(pcl)
r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)
r2.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(1, 1, false)

View File

@@ -77,7 +77,7 @@ func (s *Service) sendPingRequest(ctx context.Context, id peer.ID) error {
defer cancel()
metadataSeq := types.SSZUint64(s.cfg.P2P.MetadataSeq())
stream, err := s.cfg.P2P.Send(ctx, &metadataSeq, p2p.RPCPingTopic, id)
stream, err := s.cfg.P2P.Send(ctx, &metadataSeq, p2p.RPCPingTopicV1, id)
if err != nil {
return err
}

View File

@@ -26,7 +26,7 @@ func SendBeaconBlocksByRangeRequest(
ctx context.Context, p2pProvider p2p.P2P, pid peer.ID,
req *pb.BeaconBlocksByRangeRequest, blockProcessor BeaconBlockProcessor,
) ([]*ethpb.SignedBeaconBlock, error) {
stream, err := p2pProvider.Send(ctx, req, p2p.RPCBlocksByRangeTopic, pid)
stream, err := p2pProvider.Send(ctx, req, p2p.RPCBlocksByRangeTopicV1, pid)
if err != nil {
return nil, err
}
@@ -84,7 +84,7 @@ func SendBeaconBlocksByRootRequest(
ctx context.Context, p2pProvider p2p.P2P, pid peer.ID,
req *p2ptypes.BeaconBlockByRootsReq, blockProcessor BeaconBlockProcessor,
) ([]*ethpb.SignedBeaconBlock, error) {
stream, err := p2pProvider.Send(ctx, req, p2p.RPCBlocksByRootTopic, pid)
stream, err := p2pProvider.Send(ctx, req, p2p.RPCBlocksByRootTopicV1, pid)
if err != nil {
return nil, err
}

View File

@@ -24,7 +24,7 @@ import (
func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pcl := fmt.Sprintf("%s/ssz_snappy", p2p.RPCBlocksByRangeTopic)
pcl := fmt.Sprintf("%s/ssz_snappy", p2p.RPCBlocksByRangeTopicV1)
t.Run("stream error", func(t *testing.T) {
p1 := p2ptest.NewTestP2P(t)
@@ -288,7 +288,7 @@ func TestSendRequest_SendBeaconBlocksByRangeRequest(t *testing.T) {
func TestSendRequest_SendBeaconBlocksByRootRequest(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
pcl := fmt.Sprintf("%s/ssz_snappy", p2p.RPCBlocksByRootTopic)
pcl := fmt.Sprintf("%s/ssz_snappy", p2p.RPCBlocksByRootTopicV1)
knownBlocks := make(map[[32]byte]*eth.SignedBeaconBlock)
knownRoots := make([][32]byte, 0)

View File

@@ -139,7 +139,7 @@ func (s *Service) sendRPCStatusRequest(ctx context.Context, id peer.ID) error {
HeadRoot: headRoot,
HeadSlot: s.cfg.Chain.HeadSlot(),
}
stream, err := s.cfg.P2P.Send(ctx, resp, p2p.RPCStatusTopic, id)
stream, err := s.cfg.P2P.Send(ctx, resp, p2p.RPCStatusTopicV1, id)
if err != nil {
return err
}