Engine API Proxy Utility for Merge Testing (#10533)

* execution client package renaming

* define interceptors and options

* further clean

* further dedup

* further simplify

* rev

* rem

* more modifications

* further clean

* defines tests

* pass first test

* proper tests

* all tests in

* gaz

* lint

* wait start

* assign on ports

* gaz

Co-authored-by: Nishant Das <nishdas93@gmail.com>
This commit is contained in:
Raul Jordan
2022-04-25 05:03:05 +00:00
committed by GitHub
parent 966de59478
commit 2e056b38da
4 changed files with 637 additions and 0 deletions

View File

@@ -0,0 +1,30 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"options.go",
"proxy.go",
],
importpath = "github.com/prysmaticlabs/prysm/testing/middleware/engine-api-proxy",
visibility = ["//visibility:public"],
deps = [
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["proxy_test.go"],
embed = [":go_default_library"],
deps = [
"//crypto/rand:go_default_library",
"//proto/engine/v1:go_default_library",
"//testing/require:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//rpc:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)

View File

@@ -0,0 +1,56 @@
package proxy
import (
"net/url"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type config struct {
proxyPort int
proxyHost string
destinationUrl *url.URL
logger *logrus.Logger
}
type Option func(p *Proxy) error
// WithHost sets the proxy server host.
func WithHost(host string) Option {
return func(p *Proxy) error {
p.cfg.proxyHost = host
return nil
}
}
// WithPort sets the proxy server port.
func WithPort(port int) Option {
return func(p *Proxy) error {
p.cfg.proxyPort = port
return nil
}
}
// WithDestinationAddress sets the forwarding address requests will be proxied to.
func WithDestinationAddress(addr string) Option {
return func(p *Proxy) error {
if addr == "" {
return errors.New("must provide a destination address for proxy")
}
u, err := url.Parse(addr)
if err != nil {
return errors.Wrapf(err, "could not parse URL for destination address: %s", addr)
}
p.cfg.destinationUrl = u
return nil
}
}
// WithLogger sets a custom logger for the proxy.
func WithLogger(l *logrus.Logger) Option {
return func(p *Proxy) error {
p.cfg.logger = l
return nil
}
}

View File

@@ -0,0 +1,237 @@
// Package proxy provides a proxy middleware for engine API requests between Ethereum
// consensus clients and execution clients accordingly. Allows for customizing
// in-flight requests or responses using custom triggers. Useful for end-to-end testing.
package proxy
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"strings"
"sync"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
var (
defaultProxyHost = "127.0.0.1"
defaultProxyPort = 8545
)
type jsonRPCObject struct {
Jsonrpc string `json:"jsonrpc"`
Method string `json:"method"`
Params []interface{} `json:"params"`
ID uint64 `json:"id"`
Result interface{} `json:"result"`
}
type interceptorConfig struct {
response interface{}
trigger func() bool
}
// Proxy server that sits as a middleware between an Ethereum consensus client and an execution client,
// allowing us to modify in-flight requests and responses for testing purposes.
type Proxy struct {
cfg *config
address string
srv *http.Server
lock sync.RWMutex
interceptors map[string]*interceptorConfig
backedUpRequests []*http.Request
}
// New creates a proxy server forwarding requests from a consensus client to an execution client.
func New(opts ...Option) (*Proxy, error) {
p := &Proxy{
cfg: &config{
proxyHost: defaultProxyHost,
proxyPort: defaultProxyPort,
logger: logrus.New(),
},
interceptors: make(map[string]*interceptorConfig),
}
for _, o := range opts {
if err := o(p); err != nil {
return nil, err
}
}
if p.cfg.destinationUrl == nil {
return nil, errors.New("must provide a destination address for request proxying")
}
mux := http.NewServeMux()
mux.Handle("/", p)
addr := fmt.Sprintf("%s:%d", p.cfg.proxyHost, p.cfg.proxyPort)
srv := &http.Server{
Handler: mux,
Addr: addr,
}
p.address = addr
p.srv = srv
return p, nil
}
// Address for the proxy server.
func (p *Proxy) Address() string {
return p.address
}
// Start a proxy server.
func (p *Proxy) Start(ctx context.Context) error {
p.srv.BaseContext = func(listener net.Listener) context.Context {
return ctx
}
p.cfg.logger.WithFields(logrus.Fields{
"forwardingAddress": p.cfg.destinationUrl.String(),
}).Infof("Engine proxy now listening on address %s", p.address)
go func() {
if err := p.srv.ListenAndServe(); err != nil {
p.cfg.logger.Error(err)
}
}()
for {
<-ctx.Done()
return p.srv.Shutdown(context.Background())
}
}
// ServeHTTP requests from a consensus client to an execution client, modifying in-flight requests
// and/or responses as desired. It also processes any backed-up requests.
func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
requestBytes, err := parseRequestBytes(r)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not parse request")
return
}
// Check if we need to intercept the request with a custom response.
hasIntercepted, err := p.interceptIfNeeded(requestBytes, w)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not intercept request")
return
}
if hasIntercepted {
return
}
// If we are not intercepting the request, we proxy as normal.
p.proxyRequest(requestBytes, w, r)
}
// AddRequestInterceptor for a desired json-rpc method by specifying a custom response
// and a function that checks if the interceptor should be triggered.
func (p *Proxy) AddRequestInterceptor(rpcMethodName string, response interface{}, trigger func() bool) {
p.lock.Lock()
defer p.lock.Unlock()
p.interceptors[rpcMethodName] = &interceptorConfig{
response,
trigger,
}
}
// Checks if there is a custom interceptor hook on the request, check if it can be
// triggered, and then write the custom response to the writer.
func (p *Proxy) interceptIfNeeded(requestBytes []byte, w http.ResponseWriter) (hasIntercepted bool, err error) {
if !isEngineAPICall(requestBytes) {
return
}
var jreq *jsonRPCObject
jreq, err = unmarshalRPCObject(requestBytes)
if err != nil {
return
}
p.lock.RLock()
defer p.lock.RUnlock()
interceptor, shouldIntercept := p.interceptors[jreq.Method]
if !shouldIntercept {
return
}
if !interceptor.trigger() {
return
}
jResp := &jsonRPCObject{
Method: jreq.Method,
ID: jreq.ID,
Result: interceptor.response,
}
if err = json.NewEncoder(w).Encode(jResp); err != nil {
return
}
hasIntercepted = true
return
}
// Create a new proxy request to the execution client.
func (p *Proxy) proxyRequest(requestBytes []byte, w http.ResponseWriter, r *http.Request) {
proxyReq, err := http.NewRequest(r.Method, p.cfg.destinationUrl.String(), r.Body)
if err != nil {
p.cfg.logger.WithError(err).Error("Could create new request")
return
}
// Set the modified request as the proxy request body.
proxyReq.Body = ioutil.NopCloser(bytes.NewBuffer(requestBytes))
// Required proxy headers for forwarding JSON-RPC requests to the execution client.
proxyReq.Header.Set("Host", r.Host)
proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
proxyReq.Header.Set("Content-Type", "application/json")
client := &http.Client{}
proxyRes, err := client.Do(proxyReq)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not forward request to destination server")
return
}
defer func() {
if err = proxyRes.Body.Close(); err != nil {
p.cfg.logger.WithError(err).Error("Could not do close proxy response body")
}
}()
// Pipe the proxy response to the original caller.
if _, err = io.Copy(w, proxyRes.Body); err != nil {
p.cfg.logger.WithError(err).Error("Could not copy proxy request body")
return
}
}
// Peek into the bytes of an HTTP request's body.
func parseRequestBytes(req *http.Request) ([]byte, error) {
requestBytes, err := ioutil.ReadAll(req.Body)
if err != nil {
return nil, err
}
if err = req.Body.Close(); err != nil {
return nil, err
}
req.Body = ioutil.NopCloser(bytes.NewBuffer(requestBytes))
return requestBytes, nil
}
// Checks whether the JSON-RPC request is for the Ethereum engine API.
func isEngineAPICall(reqBytes []byte) bool {
jsonRequest, err := unmarshalRPCObject(reqBytes)
if err != nil {
switch {
case strings.Contains(err.Error(), "cannot unmarshal array"):
return false
default:
return false
}
}
return strings.Contains(jsonRequest.Method, "engine_")
}
func unmarshalRPCObject(b []byte) (*jsonRPCObject, error) {
r := &jsonRPCObject{}
if err := json.Unmarshal(b, r); err != nil {
return nil, err
}
return r, nil
}

View File

@@ -0,0 +1,314 @@
package proxy
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rpc"
"github.com/prysmaticlabs/prysm/crypto/rand"
pb "github.com/prysmaticlabs/prysm/proto/engine/v1"
"github.com/prysmaticlabs/prysm/testing/require"
"github.com/sirupsen/logrus"
logTest "github.com/sirupsen/logrus/hooks/test"
)
func TestProxy(t *testing.T) {
t.Run("fails to proxy if destination is down", func(t *testing.T) {
logger := logrus.New()
hook := logTest.NewLocal(logger)
ctx := context.Background()
r := rand.NewGenerator()
proxy, err := New(
WithPort(r.Intn(50000)),
WithDestinationAddress("http://localhost:43239"), // Nothing running at destination server.
WithLogger(logger),
)
require.NoError(t, err)
go func() {
if err := proxy.Start(ctx); err != nil {
t.Log(err)
}
}()
time.Sleep(time.Millisecond * 100)
rpcClient, err := rpc.DialHTTP("http://" + proxy.Address())
require.NoError(t, err)
err = rpcClient.CallContext(ctx, nil, "someEngineMethod")
require.ErrorContains(t, "EOF", err)
// Expect issues when reaching destination server.
require.LogsContain(t, hook, "Could not forward request to destination server")
})
t.Run("properly proxies request/response", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wantDestinationResponse := &pb.ForkchoiceState{
HeadBlockHash: []byte("foo"),
SafeBlockHash: []byte("bar"),
FinalizedBlockHash: []byte("baz"),
}
srv := destinationServerSetup(t, wantDestinationResponse)
defer srv.Close()
// Destination address server responds to JSON-RPC requests.
r := rand.NewGenerator()
proxy, err := New(
WithPort(r.Intn(50000)),
WithDestinationAddress(srv.URL),
)
require.NoError(t, err)
go func() {
if err := proxy.Start(ctx); err != nil {
t.Log(err)
}
}()
time.Sleep(time.Millisecond * 100)
// Dials the proxy.
rpcClient, err := rpc.DialHTTP("http://" + proxy.Address())
require.NoError(t, err)
// Expect the result from the proxy is the same as that one returned
// by the destination address.
proxyResult := &pb.ForkchoiceState{}
err = rpcClient.CallContext(ctx, proxyResult, "someEngineMethod")
require.NoError(t, err)
require.DeepEqual(t, wantDestinationResponse, proxyResult)
})
}
func TestProxy_CustomInterceptors(t *testing.T) {
t.Run("only intercepts engine API methods", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
type syncingResponse struct {
Syncing bool `json:"syncing"`
}
wantDestinationResponse := &syncingResponse{Syncing: true}
srv := destinationServerSetup(t, wantDestinationResponse)
defer srv.Close()
// Destination address server responds to JSON-RPC requests.
r := rand.NewGenerator()
proxy, err := New(
WithPort(r.Intn(50000)),
WithDestinationAddress(srv.URL),
)
require.NoError(t, err)
go func() {
if err := proxy.Start(ctx); err != nil {
t.Log(err)
}
}()
time.Sleep(time.Millisecond * 100)
method := "eth_syncing"
// RPC method to intercept.
proxy.AddRequestInterceptor(
method,
&syncingResponse{Syncing: false}, // Custom response.
func() bool {
return true // Always intercept with a custom response.
},
)
// Dials the proxy.
rpcClient, err := rpc.DialHTTP("http://" + proxy.Address())
require.NoError(t, err)
// Expect the result from the proxy is the same as that one returned
// by the destination address.
proxyResult := &syncingResponse{}
err = rpcClient.CallContext(ctx, proxyResult, method)
require.NoError(t, err)
// The interception SHOULD NOT work because we should only intercept `engine` namespace methods.
require.DeepEqual(t, wantDestinationResponse, proxyResult)
})
t.Run("only intercepts if trigger function returns true", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
type engineResponse struct {
BlockHash common.Hash `json:"blockHash"`
}
destinationResponse := &engineResponse{BlockHash: common.BytesToHash([]byte("foo"))}
srv := destinationServerSetup(t, destinationResponse)
defer srv.Close()
// Destination address server responds to JSON-RPC requests.
r := rand.NewGenerator()
proxy, err := New(
WithPort(r.Intn(50000)),
WithDestinationAddress(srv.URL),
)
require.NoError(t, err)
go func() {
if err := proxy.Start(ctx); err != nil {
t.Log(err)
}
}()
time.Sleep(time.Millisecond * 100)
method := "engine_newPayloadV1"
// RPC method to intercept.
wantInterceptedResponse := &engineResponse{BlockHash: common.BytesToHash([]byte("bar"))}
conditional := false
proxy.AddRequestInterceptor(
method,
wantInterceptedResponse,
func() bool {
return conditional // Conditional trigger.
},
)
// Dials the proxy.
rpcClient, err := rpc.DialHTTP("http://" + proxy.Address())
require.NoError(t, err)
proxyResult := &engineResponse{}
err = rpcClient.CallContext(ctx, proxyResult, method)
require.NoError(t, err)
// The interception SHOULD NOT work because we should only intercept if trigger conditional is true.
require.DeepEqual(t, destinationResponse, proxyResult)
conditional = true
proxy.AddRequestInterceptor(
method,
wantInterceptedResponse,
func() bool {
return conditional // Conditional trigger.
},
)
// The interception should work and we should not be getting the destination
// response but rather a custom response from the interceptor config now that the trigger is true.
proxyResult = &engineResponse{}
err = rpcClient.CallContext(ctx, proxyResult, method)
require.NoError(t, err)
require.DeepEqual(t, wantInterceptedResponse, proxyResult)
})
t.Run("triggers interceptor response correctly", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
type engineResponse struct {
BlockHash common.Hash `json:"blockHash"`
}
destinationResponse := &engineResponse{BlockHash: common.BytesToHash([]byte("foo"))}
srv := destinationServerSetup(t, destinationResponse)
defer srv.Close()
// Destination address server responds to JSON-RPC requests.
r := rand.NewGenerator()
proxy, err := New(
WithPort(r.Intn(50000)),
WithDestinationAddress(srv.URL),
)
require.NoError(t, err)
go func() {
if err := proxy.Start(ctx); err != nil {
t.Log(err)
}
}()
time.Sleep(time.Millisecond * 100)
method := "engine_newPayloadV1"
// RPC method to intercept.
wantInterceptedResponse := &engineResponse{BlockHash: common.BytesToHash([]byte("bar"))}
proxy.AddRequestInterceptor(
method,
wantInterceptedResponse,
func() bool {
return true // Always intercept with a custom response.
},
)
// Dials the proxy.
rpcClient, err := rpc.DialHTTP("http://" + proxy.Address())
require.NoError(t, err)
proxyResult := &engineResponse{}
err = rpcClient.CallContext(ctx, proxyResult, method)
require.NoError(t, err)
// The interception should work and we should not be getting the destination
// response but rather a custom response from the interceptor config.
require.DeepEqual(t, wantInterceptedResponse, proxyResult)
})
}
func Test_isEngineAPICall(t *testing.T) {
type args struct {
reqBytes []byte
}
tests := []struct {
name string
args *jsonRPCObject
want bool
}{
{
name: "nil data",
args: nil,
want: false,
},
{
name: "engine method",
args: &jsonRPCObject{
Method: "engine_newPayloadV1",
ID: 1,
Result: 5,
},
want: true,
},
{
name: "non-engine method",
args: &jsonRPCObject{
Method: "eth_syncing",
ID: 1,
Result: false,
},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
enc, err := json.Marshal(tt.args)
require.NoError(t, err)
if got := isEngineAPICall(enc); got != tt.want {
t.Errorf("isEngineAPICall() = %v, want %v", got, tt.want)
}
})
}
}
func destinationServerSetup(t *testing.T, response interface{}) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
defer func() {
require.NoError(t, r.Body.Close())
}()
resp := map[string]interface{}{
"jsonrpc": "2.0",
"id": 1,
"result": response,
}
err := json.NewEncoder(w).Encode(resp)
require.NoError(t, err)
}))
}