Integrate Engine Proxy into E2E (#10808)

* add it in

* support jwt secret

* fix it

* fix

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
This commit is contained in:
Nishant Das
2022-06-07 07:35:54 +08:00
committed by GitHub
parent 5b12f5a27d
commit 6c39301f33
13 changed files with 302 additions and 14 deletions

View File

@@ -31,14 +31,7 @@ func (e Endpoint) HttpClient() *http.Client {
if e.Auth.Method != authorization.Bearer {
return http.DefaultClient
}
authTransport := &jwtTransport{
underlyingTransport: http.DefaultTransport,
jwtSecret: []byte(e.Auth.Value),
}
return &http.Client{
Timeout: DefaultRPCHTTPTimeout,
Transport: authTransport,
}
return NewHttpClientWithSecret(e.Auth.Value)
}
// Equals compares two authorization data objects for equality.
@@ -70,3 +63,16 @@ func Method(auth string) authorization.AuthorizationMethod {
}
return authorization.None
}
// NewHttpClientWithSecret returns a http client that utilizes
// jwt authentication.
func NewHttpClientWithSecret(secret string) *http.Client {
authTransport := &jwtTransport{
underlyingTransport: http.DefaultTransport,
jwtSecret: []byte(secret),
}
return &http.Client{
Timeout: DefaultRPCHTTPTimeout,
Transport: authTransport,
}
}

View File

@@ -26,6 +26,7 @@ type componentHandler struct {
web3Signer e2etypes.ComponentRunner
bootnode e2etypes.ComponentRunner
eth1Miner e2etypes.ComponentRunner
eth1Proxy e2etypes.ComponentRunner
eth1Nodes e2etypes.MultipleComponentRunners
beaconNodes e2etypes.MultipleComponentRunners
validatorNodes e2etypes.MultipleComponentRunners
@@ -132,10 +133,23 @@ func (c *componentHandler) setup() {
if config.TestCheckpointSync {
appendDebugEndpoints(config)
}
// Proxies
proxies := eth1.NewProxySet()
g.Go(func() error {
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{eth1Nodes}); err != nil {
return errors.Wrap(err, "beacon nodes require ETH1 and boot node to run")
}
if err := proxies.Start(ctx); err != nil {
return errors.Wrap(err, "failed to start proxies")
}
return nil
})
c.eth1Proxy = proxies
// Beacon nodes.
beaconNodes := components.NewBeaconNodes(config)
g.Go(func() error {
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{eth1Nodes, bootNode}); err != nil {
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{eth1Nodes, proxies, bootNode}); err != nil {
return errors.Wrap(err, "beacon nodes require ETH1 and boot node to run")
}
beaconNodes.SetENR(bootNode.ENR())
@@ -149,7 +163,7 @@ func (c *componentHandler) setup() {
if multiClientActive {
lighthouseNodes = components.NewLighthouseBeaconNodes(config)
g.Go(func() error {
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{eth1Nodes, bootNode, beaconNodes}); err != nil {
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{eth1Nodes, proxies, bootNode, beaconNodes}); err != nil {
return errors.Wrap(err, "lighthouse beacon nodes require ETH1 and boot node to run")
}
lighthouseNodes.SetENR(bootNode.ENR())
@@ -197,7 +211,7 @@ func (c *componentHandler) setup() {
func (c *componentHandler) required() []e2etypes.ComponentRunner {
multiClientActive := e2e.TestParams.LighthouseBeaconNodeCount > 0
requiredComponents := []e2etypes.ComponentRunner{
c.tracingSink, c.eth1Nodes, c.bootnode, c.beaconNodes, c.validatorNodes,
c.tracingSink, c.eth1Nodes, c.bootnode, c.beaconNodes, c.validatorNodes, c.eth1Proxy,
}
if multiClientActive {
requiredComponents = append(requiredComponents, []e2etypes.ComponentRunner{c.keygen, c.lighthouseBeaconNodes, c.lighthouseValidatorNodes}...)

View File

@@ -188,7 +188,7 @@ func (node *BeaconNode) Start(ctx context.Context) error {
fmt.Sprintf("--%s=%s", cmdshared.LogFileName.Name, stdOutFile.Name()),
fmt.Sprintf("--%s=%s", flags.DepositContractFlag.Name, e2e.TestParams.ContractAddress.Hex()),
fmt.Sprintf("--%s=%d", flags.RPCPort.Name, e2e.TestParams.Ports.PrysmBeaconNodeRPCPort+index),
fmt.Sprintf("--%s=http://127.0.0.1:%d", flags.HTTPWeb3ProviderFlag.Name, e2e.TestParams.Ports.Eth1AuthRPCPort+index),
fmt.Sprintf("--%s=http://127.0.0.1:%d", flags.HTTPWeb3ProviderFlag.Name, e2e.TestParams.Ports.Eth1ProxyPort+index),
fmt.Sprintf("--%s=%s", flags.ExecutionJWTSecretFlag.Name, jwtPath),
fmt.Sprintf("--%s=%d", flags.MinSyncPeers.Name, 1),
fmt.Sprintf("--%s=%d", cmdshared.P2PUDPPort.Name, e2e.TestParams.Ports.PrysmBeaconNodeUDPPort+index),

View File

@@ -8,6 +8,7 @@ go_library(
"miner.go",
"node.go",
"node_set.go",
"proxy.go",
"transactions.go",
],
importpath = "github.com/prysmaticlabs/prysm/testing/endtoend/components/eth1",
@@ -20,6 +21,7 @@ go_library(
"//testing/endtoend/helpers:go_default_library",
"//testing/endtoend/params:go_default_library",
"//testing/endtoend/types:go_default_library",
"//testing/middleware/engine-api-proxy:go_default_library",
"@com_github_ethereum_go_ethereum//accounts/abi/bind:go_default_library",
"@com_github_ethereum_go_ethereum//accounts/keystore:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",

View File

@@ -25,8 +25,10 @@ const timeGapPerMiningTX = 250 * time.Millisecond
var _ e2etypes.ComponentRunner = (*NodeSet)(nil)
var _ e2etypes.MultipleComponentRunners = (*NodeSet)(nil)
var _ e2etypes.MultipleComponentRunners = (*ProxySet)(nil)
var _ e2etypes.ComponentRunner = (*Miner)(nil)
var _ e2etypes.ComponentRunner = (*Node)(nil)
var _ e2etypes.ComponentRunner = (*Proxy)(nil)
// WaitForBlocks waits for a certain amount of blocks to be mined by the ETH1 chain before returning.
func WaitForBlocks(web3 *ethclient.Client, keystore *keystore.Key, blocksToWait uint64) error {

View File

@@ -0,0 +1,205 @@
package eth1
import (
"context"
"encoding/hex"
"fmt"
"os"
"path"
"strconv"
"strings"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/io/file"
"github.com/prysmaticlabs/prysm/testing/endtoend/helpers"
e2e "github.com/prysmaticlabs/prysm/testing/endtoend/params"
e2etypes "github.com/prysmaticlabs/prysm/testing/endtoend/types"
proxy "github.com/prysmaticlabs/prysm/testing/middleware/engine-api-proxy"
log "github.com/sirupsen/logrus"
)
// ProxySet represents a set of proxies for the engine-api.
type ProxySet struct {
e2etypes.ComponentRunner
started chan struct{}
proxies []e2etypes.ComponentRunner
}
// NewProxySet creates and returns a set of engine-api proxies.
func NewProxySet() *ProxySet {
return &ProxySet{
started: make(chan struct{}, 1),
}
}
// Start starts all the proxies in set.
func (s *ProxySet) Start(ctx context.Context) error {
totalNodeCount := e2e.TestParams.BeaconNodeCount + e2e.TestParams.LighthouseBeaconNodeCount
nodes := make([]e2etypes.ComponentRunner, totalNodeCount)
for i := 0; i < totalNodeCount; i++ {
// We start indexing nodes from 1 because the miner has an implicit 0 index.
nodes[i] = NewProxy(i)
}
s.proxies = nodes
// Wait for all nodes to finish their job (blocking).
// Once nodes are ready passed in handler function will be called.
return helpers.WaitOnNodes(ctx, nodes, func() {
// All nodes started, close channel, so that all services waiting on a set, can proceed.
close(s.started)
})
}
// Started checks whether proxy set is started and all proxies are ready to be queried.
func (s *ProxySet) Started() <-chan struct{} {
return s.started
}
// Pause pauses the component and its underlying process.
func (s *ProxySet) Pause() error {
for _, n := range s.proxies {
if err := n.Pause(); err != nil {
return err
}
}
return nil
}
// Resume resumes the component and its underlying process.
func (s *ProxySet) Resume() error {
for _, n := range s.proxies {
if err := n.Resume(); err != nil {
return err
}
}
return nil
}
// Stop stops the component and its underlying process.
func (s *ProxySet) Stop() error {
for _, n := range s.proxies {
if err := n.Stop(); err != nil {
return err
}
}
return nil
}
// PauseAtIndex pauses the component and its underlying process at the desired index.
func (s *ProxySet) PauseAtIndex(i int) error {
if i >= len(s.proxies) {
return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.proxies))
}
return s.proxies[i].Pause()
}
// ResumeAtIndex resumes the component and its underlying process at the desired index.
func (s *ProxySet) ResumeAtIndex(i int) error {
if i >= len(s.proxies) {
return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.proxies))
}
return s.proxies[i].Resume()
}
// StopAtIndex stops the component and its underlying process at the desired index.
func (s *ProxySet) StopAtIndex(i int) error {
if i >= len(s.proxies) {
return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.proxies))
}
return s.proxies[i].Stop()
}
// Proxy represents an engine-api proxy.
type Proxy struct {
e2etypes.ComponentRunner
started chan struct{}
index int
cancel func()
}
// NewProxy creates and returns an engine-api proxy.
func NewProxy(index int) *Proxy {
return &Proxy{
started: make(chan struct{}, 1),
index: index,
}
}
// Start runs a proxy.
func (node *Proxy) Start(ctx context.Context) error {
file, err := os.Create(path.Join(e2e.TestParams.LogPath, "eth1_proxy_"+strconv.Itoa(node.index)+".log"))
if err != nil {
return err
}
jwtPath := path.Join(e2e.TestParams.TestPath, "eth1data/"+strconv.Itoa(node.index)+"/")
if node.index == 0 {
jwtPath = path.Join(e2e.TestParams.TestPath, "eth1data/miner/")
}
jwtPath = path.Join(jwtPath, "geth/jwtsecret")
secret, err := parseJWTSecretFromFile(jwtPath)
if err != nil {
return err
}
opts := []proxy.Option{
proxy.WithDestinationAddress(fmt.Sprintf("http://127.0.0.1:%d", e2e.TestParams.Ports.Eth1AuthRPCPort+node.index)),
proxy.WithPort(e2e.TestParams.Ports.Eth1ProxyPort + node.index),
proxy.WithLogger(log.New()),
proxy.WithLogFile(file),
proxy.WithJwtSecret(string(secret)),
}
nProxy, err := proxy.New(opts...)
if err != nil {
return err
}
log.Infof("Starting eth1 proxy %d with port: %d and file %s", node.index, e2e.TestParams.Ports.Eth1ProxyPort+node.index, file.Name())
// Set cancel into context.
ctx, cancel := context.WithCancel(ctx)
node.cancel = cancel
// Mark node as ready.
close(node.started)
return nProxy.Start(ctx)
}
// Started checks whether the eth1 proxy is started and ready to be queried.
func (node *Proxy) Started() <-chan struct{} {
return node.started
}
// Pause pauses the component and its underlying process.
func (node *Proxy) Pause() error {
// no-op
return nil
}
// Resume resumes the component and its underlying process.
func (node *Proxy) Resume() error {
// no-op
return nil
}
// Stop kills the component and its underlying process.
func (node *Proxy) Stop() error {
node.cancel()
return nil
}
func parseJWTSecretFromFile(jwtSecretFile string) ([]byte, error) {
enc, err := file.ReadFileAsBytes(jwtSecretFile)
if err != nil {
return nil, err
}
strData := strings.TrimSpace(string(enc))
if len(strData) == 0 {
return nil, fmt.Errorf("provided JWT secret in file %s cannot be empty", jwtSecretFile)
}
secret, err := hex.DecodeString(strings.TrimPrefix(strData, "0x"))
if err != nil {
return nil, err
}
if len(secret) < 32 {
return nil, errors.New("provided JWT secret should be a hex string of at least 32 bytes")
}
return secret, nil
}

View File

@@ -176,7 +176,7 @@ func (node *LighthouseBeaconNode) Start(ctx context.Context) error {
fmt.Sprintf("--http-port=%d", e2e.TestParams.Ports.LighthouseBeaconNodeHTTPPort+index),
fmt.Sprintf("--target-peers=%d", 10),
fmt.Sprintf("--eth1-endpoints=http://127.0.0.1:%d", e2e.TestParams.Ports.Eth1RPCPort+prysmNodeCount+index),
fmt.Sprintf("--execution-endpoints=http://127.0.0.1:%d", e2e.TestParams.Ports.Eth1AuthRPCPort+prysmNodeCount+index),
fmt.Sprintf("--execution-endpoints=http://127.0.0.1:%d", e2e.TestParams.Ports.Eth1ProxyPort+prysmNodeCount+index),
fmt.Sprintf("--jwt-secrets=%s", jwtPath),
fmt.Sprintf("--boot-nodes=%s", node.enr),
fmt.Sprintf("--metrics-port=%d", e2e.TestParams.Ports.LighthouseBeaconNodeMetricsPort+index),

View File

@@ -263,6 +263,13 @@ func (r *testRunner) testCheckpointSync(ctx context.Context, g *errgroup.Group,
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{ethNode}); err != nil {
return fmt.Errorf("sync beacon node not ready: %w", err)
}
proxyNode := eth1.NewProxy(i)
g.Go(func() error {
return proxyNode.Start(ctx)
})
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{proxyNode}); err != nil {
return fmt.Errorf("sync beacon node not ready: %w", err)
}
client, err := beacon.NewClient(bnAPI)
if err != nil {
@@ -323,6 +330,13 @@ func (r *testRunner) testBeaconChainSync(ctx context.Context, g *errgroup.Group,
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{ethNode}); err != nil {
return fmt.Errorf("sync beacon node not ready: %w", err)
}
proxyNode := eth1.NewProxy(index)
g.Go(func() error {
return proxyNode.Start(ctx)
})
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{proxyNode}); err != nil {
return fmt.Errorf("sync beacon node not ready: %w", err)
}
syncBeaconNode := components.NewBeaconNode(config, index, bootnodeEnr)
g.Go(func() error {
return syncBeaconNode.Start(ctx)

View File

@@ -34,6 +34,7 @@ type ports struct {
Eth1RPCPort int
Eth1AuthRPCPort int
Eth1WSPort int
Eth1ProxyPort int
PrysmBeaconNodeRPCPort int
PrysmBeaconNodeUDPPort int
PrysmBeaconNodeTCPPort int
@@ -89,6 +90,7 @@ const (
Eth1RPCPort = Eth1Port + portSpan
Eth1WSPort = Eth1Port + 2*portSpan
Eth1AuthRPCPort = Eth1Port + 3*portSpan
Eth1ProxyPort = Eth1Port + 4*portSpan
PrysmBeaconNodeRPCPort = 4150
PrysmBeaconNodeUDPPort = PrysmBeaconNodeRPCPort + portSpan
@@ -244,6 +246,10 @@ func initializeStandardPorts(shardCount, shardIndex int, ports *ports, existingR
if err != nil {
return err
}
eth1ProxyPort, err := port(Eth1ProxyPort, shardCount, shardIndex, existingRegistrations)
if err != nil {
return err
}
beaconNodeRPCPort, err := port(PrysmBeaconNodeRPCPort, shardCount, shardIndex, existingRegistrations)
if err != nil {
return err
@@ -286,6 +292,7 @@ func initializeStandardPorts(shardCount, shardIndex int, ports *ports, existingR
ports.Eth1RPCPort = eth1RPCPort
ports.Eth1AuthRPCPort = eth1AuthPort
ports.Eth1WSPort = eth1WSPort
ports.Eth1ProxyPort = eth1ProxyPort
ports.PrysmBeaconNodeRPCPort = beaconNodeRPCPort
ports.PrysmBeaconNodeUDPPort = beaconNodeUDPPort
ports.PrysmBeaconNodeTCPPort = beaconNodeTCPPort

View File

@@ -30,7 +30,7 @@ func TestStandardPorts(t *testing.T) {
var existingRegistrations []int
testPorts := &ports{}
assert.NoError(t, initializeStandardPorts(2, 0, testPorts, &existingRegistrations))
assert.Equal(t, 15, len(existingRegistrations))
assert.Equal(t, 16, len(existingRegistrations))
assert.NotEqual(t, 0, testPorts.PrysmBeaconNodeGatewayPort)
assert.NotEqual(t, 0, testPorts.PrysmBeaconNodeTCPPort)
assert.NotEqual(t, 0, testPorts.JaegerTracingPort)

View File

@@ -9,6 +9,7 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/testing/middleware/engine-api-proxy",
visibility = ["//visibility:public"],
deps = [
"//network:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],

View File

@@ -2,6 +2,7 @@ package proxy
import (
"net/url"
"os"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -12,6 +13,7 @@ type config struct {
proxyHost string
destinationUrl *url.URL
logger *logrus.Logger
secret string
}
type Option func(p *Proxy) error
@@ -54,3 +56,24 @@ func WithLogger(l *logrus.Logger) Option {
return nil
}
}
// WithLogFile specifies a log file to write
// the proxies output to.
func WithLogFile(f *os.File) Option {
return func(p *Proxy) error {
if p.cfg.logger == nil {
return errors.New("nil logger provided")
}
p.cfg.logger.SetOutput(f)
return nil
}
}
// WithJwtSecret adds in support for jwt authenticated
// connections for our proxy.
func WithJwtSecret(secret string) Option {
return func(p *Proxy) error {
p.cfg.secret = secret
return nil
}
}

View File

@@ -16,6 +16,7 @@ import (
"sync"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/network"
"github.com/sirupsen/logrus"
)
@@ -128,6 +129,7 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func (p *Proxy) AddRequestInterceptor(rpcMethodName string, response interface{}, trigger func() bool) {
p.lock.Lock()
defer p.lock.Unlock()
p.cfg.logger.Infof("Adding in interceptor for method %s", rpcMethodName)
p.interceptors[rpcMethodName] = &interceptorConfig{
response,
trigger,
@@ -168,6 +170,13 @@ func (p *Proxy) interceptIfNeeded(requestBytes []byte, w http.ResponseWriter) (h
// Create a new proxy request to the execution client.
func (p *Proxy) proxyRequest(requestBytes []byte, w http.ResponseWriter, r *http.Request) {
jreq, err := unmarshalRPCObject(requestBytes)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not unmarshal request")
// Continue and mark it as unknown.
jreq = &jsonRPCObject{Method: "unknown"}
}
p.cfg.logger.Infof("Forwarding %s request for method %s to %s", r.Method, jreq.Method, p.cfg.destinationUrl.String())
proxyReq, err := http.NewRequest(r.Method, p.cfg.destinationUrl.String(), r.Body)
if err != nil {
p.cfg.logger.WithError(err).Error("Could create new request")
@@ -183,11 +192,16 @@ func (p *Proxy) proxyRequest(requestBytes []byte, w http.ResponseWriter, r *http
proxyReq.Header.Set("Content-Type", "application/json")
client := &http.Client{}
if p.cfg.secret != "" {
client = network.NewHttpClientWithSecret(p.cfg.secret)
}
proxyRes, err := client.Do(proxyReq)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not forward request to destination server")
return
}
p.cfg.logger.Infof("Received response for %s request with method %s from %s", r.Method, jreq.Method, p.cfg.destinationUrl.String())
defer func() {
if err = proxyRes.Body.Close(); err != nil {
p.cfg.logger.WithError(err).Error("Could not do close proxy response body")