From 2e056b38dabb3b00b68e960bb69e4ab496d90464 Mon Sep 17 00:00:00 2001 From: Raul Jordan Date: Mon, 25 Apr 2022 05:03:05 +0000 Subject: [PATCH] Engine API Proxy Utility for Merge Testing (#10533) * execution client package renaming * define interceptors and options * further clean * further dedup * further simplify * rev * rem * more modifications * further clean * defines tests * pass first test * proper tests * all tests in * gaz * lint * wait start * assign on ports * gaz Co-authored-by: Nishant Das --- .../middleware/engine-api-proxy/BUILD.bazel | 30 ++ .../middleware/engine-api-proxy/options.go | 56 ++++ testing/middleware/engine-api-proxy/proxy.go | 237 +++++++++++++ .../middleware/engine-api-proxy/proxy_test.go | 314 ++++++++++++++++++ 4 files changed, 637 insertions(+) create mode 100644 testing/middleware/engine-api-proxy/BUILD.bazel create mode 100644 testing/middleware/engine-api-proxy/options.go create mode 100644 testing/middleware/engine-api-proxy/proxy.go create mode 100644 testing/middleware/engine-api-proxy/proxy_test.go diff --git a/testing/middleware/engine-api-proxy/BUILD.bazel b/testing/middleware/engine-api-proxy/BUILD.bazel new file mode 100644 index 0000000000..4021d20e10 --- /dev/null +++ b/testing/middleware/engine-api-proxy/BUILD.bazel @@ -0,0 +1,30 @@ +load("@prysm//tools/go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "options.go", + "proxy.go", + ], + importpath = "github.com/prysmaticlabs/prysm/testing/middleware/engine-api-proxy", + visibility = ["//visibility:public"], + deps = [ + "@com_github_pkg_errors//:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["proxy_test.go"], + embed = [":go_default_library"], + deps = [ + "//crypto/rand:go_default_library", + "//proto/engine/v1:go_default_library", + "//testing/require:go_default_library", + "@com_github_ethereum_go_ethereum//common:go_default_library", + "@com_github_ethereum_go_ethereum//rpc:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", + "@com_github_sirupsen_logrus//hooks/test:go_default_library", + ], +) diff --git a/testing/middleware/engine-api-proxy/options.go b/testing/middleware/engine-api-proxy/options.go new file mode 100644 index 0000000000..29c901e851 --- /dev/null +++ b/testing/middleware/engine-api-proxy/options.go @@ -0,0 +1,56 @@ +package proxy + +import ( + "net/url" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +type config struct { + proxyPort int + proxyHost string + destinationUrl *url.URL + logger *logrus.Logger +} + +type Option func(p *Proxy) error + +// WithHost sets the proxy server host. +func WithHost(host string) Option { + return func(p *Proxy) error { + p.cfg.proxyHost = host + return nil + } +} + +// WithPort sets the proxy server port. +func WithPort(port int) Option { + return func(p *Proxy) error { + p.cfg.proxyPort = port + return nil + } +} + +// WithDestinationAddress sets the forwarding address requests will be proxied to. +func WithDestinationAddress(addr string) Option { + return func(p *Proxy) error { + if addr == "" { + return errors.New("must provide a destination address for proxy") + } + u, err := url.Parse(addr) + if err != nil { + return errors.Wrapf(err, "could not parse URL for destination address: %s", addr) + } + p.cfg.destinationUrl = u + return nil + } +} + +// WithLogger sets a custom logger for the proxy. +func WithLogger(l *logrus.Logger) Option { + return func(p *Proxy) error { + p.cfg.logger = l + return nil + } +} diff --git a/testing/middleware/engine-api-proxy/proxy.go b/testing/middleware/engine-api-proxy/proxy.go new file mode 100644 index 0000000000..fb98f45948 --- /dev/null +++ b/testing/middleware/engine-api-proxy/proxy.go @@ -0,0 +1,237 @@ +// Package proxy provides a proxy middleware for engine API requests between Ethereum +// consensus clients and execution clients accordingly. Allows for customizing +// in-flight requests or responses using custom triggers. Useful for end-to-end testing. +package proxy + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "net" + "net/http" + "strings" + "sync" + + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +var ( + defaultProxyHost = "127.0.0.1" + defaultProxyPort = 8545 +) + +type jsonRPCObject struct { + Jsonrpc string `json:"jsonrpc"` + Method string `json:"method"` + Params []interface{} `json:"params"` + ID uint64 `json:"id"` + Result interface{} `json:"result"` +} + +type interceptorConfig struct { + response interface{} + trigger func() bool +} + +// Proxy server that sits as a middleware between an Ethereum consensus client and an execution client, +// allowing us to modify in-flight requests and responses for testing purposes. +type Proxy struct { + cfg *config + address string + srv *http.Server + lock sync.RWMutex + interceptors map[string]*interceptorConfig + backedUpRequests []*http.Request +} + +// New creates a proxy server forwarding requests from a consensus client to an execution client. +func New(opts ...Option) (*Proxy, error) { + p := &Proxy{ + cfg: &config{ + proxyHost: defaultProxyHost, + proxyPort: defaultProxyPort, + logger: logrus.New(), + }, + interceptors: make(map[string]*interceptorConfig), + } + for _, o := range opts { + if err := o(p); err != nil { + return nil, err + } + } + if p.cfg.destinationUrl == nil { + return nil, errors.New("must provide a destination address for request proxying") + } + mux := http.NewServeMux() + mux.Handle("/", p) + addr := fmt.Sprintf("%s:%d", p.cfg.proxyHost, p.cfg.proxyPort) + srv := &http.Server{ + Handler: mux, + Addr: addr, + } + p.address = addr + p.srv = srv + return p, nil +} + +// Address for the proxy server. +func (p *Proxy) Address() string { + return p.address +} + +// Start a proxy server. +func (p *Proxy) Start(ctx context.Context) error { + p.srv.BaseContext = func(listener net.Listener) context.Context { + return ctx + } + p.cfg.logger.WithFields(logrus.Fields{ + "forwardingAddress": p.cfg.destinationUrl.String(), + }).Infof("Engine proxy now listening on address %s", p.address) + go func() { + if err := p.srv.ListenAndServe(); err != nil { + p.cfg.logger.Error(err) + } + }() + for { + <-ctx.Done() + return p.srv.Shutdown(context.Background()) + } +} + +// ServeHTTP requests from a consensus client to an execution client, modifying in-flight requests +// and/or responses as desired. It also processes any backed-up requests. +func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { + requestBytes, err := parseRequestBytes(r) + if err != nil { + p.cfg.logger.WithError(err).Error("Could not parse request") + return + } + // Check if we need to intercept the request with a custom response. + hasIntercepted, err := p.interceptIfNeeded(requestBytes, w) + if err != nil { + p.cfg.logger.WithError(err).Error("Could not intercept request") + return + } + if hasIntercepted { + return + } + // If we are not intercepting the request, we proxy as normal. + p.proxyRequest(requestBytes, w, r) +} + +// AddRequestInterceptor for a desired json-rpc method by specifying a custom response +// and a function that checks if the interceptor should be triggered. +func (p *Proxy) AddRequestInterceptor(rpcMethodName string, response interface{}, trigger func() bool) { + p.lock.Lock() + defer p.lock.Unlock() + p.interceptors[rpcMethodName] = &interceptorConfig{ + response, + trigger, + } +} + +// Checks if there is a custom interceptor hook on the request, check if it can be +// triggered, and then write the custom response to the writer. +func (p *Proxy) interceptIfNeeded(requestBytes []byte, w http.ResponseWriter) (hasIntercepted bool, err error) { + if !isEngineAPICall(requestBytes) { + return + } + var jreq *jsonRPCObject + jreq, err = unmarshalRPCObject(requestBytes) + if err != nil { + return + } + p.lock.RLock() + defer p.lock.RUnlock() + interceptor, shouldIntercept := p.interceptors[jreq.Method] + if !shouldIntercept { + return + } + if !interceptor.trigger() { + return + } + jResp := &jsonRPCObject{ + Method: jreq.Method, + ID: jreq.ID, + Result: interceptor.response, + } + if err = json.NewEncoder(w).Encode(jResp); err != nil { + return + } + hasIntercepted = true + return +} + +// Create a new proxy request to the execution client. +func (p *Proxy) proxyRequest(requestBytes []byte, w http.ResponseWriter, r *http.Request) { + proxyReq, err := http.NewRequest(r.Method, p.cfg.destinationUrl.String(), r.Body) + if err != nil { + p.cfg.logger.WithError(err).Error("Could create new request") + return + } + + // Set the modified request as the proxy request body. + proxyReq.Body = ioutil.NopCloser(bytes.NewBuffer(requestBytes)) + + // Required proxy headers for forwarding JSON-RPC requests to the execution client. + proxyReq.Header.Set("Host", r.Host) + proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr) + proxyReq.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + proxyRes, err := client.Do(proxyReq) + if err != nil { + p.cfg.logger.WithError(err).Error("Could not forward request to destination server") + return + } + defer func() { + if err = proxyRes.Body.Close(); err != nil { + p.cfg.logger.WithError(err).Error("Could not do close proxy response body") + } + }() + + // Pipe the proxy response to the original caller. + if _, err = io.Copy(w, proxyRes.Body); err != nil { + p.cfg.logger.WithError(err).Error("Could not copy proxy request body") + return + } +} + +// Peek into the bytes of an HTTP request's body. +func parseRequestBytes(req *http.Request) ([]byte, error) { + requestBytes, err := ioutil.ReadAll(req.Body) + if err != nil { + return nil, err + } + if err = req.Body.Close(); err != nil { + return nil, err + } + req.Body = ioutil.NopCloser(bytes.NewBuffer(requestBytes)) + return requestBytes, nil +} + +// Checks whether the JSON-RPC request is for the Ethereum engine API. +func isEngineAPICall(reqBytes []byte) bool { + jsonRequest, err := unmarshalRPCObject(reqBytes) + if err != nil { + switch { + case strings.Contains(err.Error(), "cannot unmarshal array"): + return false + default: + return false + } + } + return strings.Contains(jsonRequest.Method, "engine_") +} + +func unmarshalRPCObject(b []byte) (*jsonRPCObject, error) { + r := &jsonRPCObject{} + if err := json.Unmarshal(b, r); err != nil { + return nil, err + } + return r, nil +} diff --git a/testing/middleware/engine-api-proxy/proxy_test.go b/testing/middleware/engine-api-proxy/proxy_test.go new file mode 100644 index 0000000000..90a2051f66 --- /dev/null +++ b/testing/middleware/engine-api-proxy/proxy_test.go @@ -0,0 +1,314 @@ +package proxy + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/rpc" + "github.com/prysmaticlabs/prysm/crypto/rand" + pb "github.com/prysmaticlabs/prysm/proto/engine/v1" + "github.com/prysmaticlabs/prysm/testing/require" + "github.com/sirupsen/logrus" + logTest "github.com/sirupsen/logrus/hooks/test" +) + +func TestProxy(t *testing.T) { + t.Run("fails to proxy if destination is down", func(t *testing.T) { + logger := logrus.New() + hook := logTest.NewLocal(logger) + ctx := context.Background() + r := rand.NewGenerator() + proxy, err := New( + WithPort(r.Intn(50000)), + WithDestinationAddress("http://localhost:43239"), // Nothing running at destination server. + WithLogger(logger), + ) + require.NoError(t, err) + go func() { + if err := proxy.Start(ctx); err != nil { + t.Log(err) + } + }() + time.Sleep(time.Millisecond * 100) + + rpcClient, err := rpc.DialHTTP("http://" + proxy.Address()) + require.NoError(t, err) + + err = rpcClient.CallContext(ctx, nil, "someEngineMethod") + require.ErrorContains(t, "EOF", err) + + // Expect issues when reaching destination server. + require.LogsContain(t, hook, "Could not forward request to destination server") + }) + t.Run("properly proxies request/response", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + wantDestinationResponse := &pb.ForkchoiceState{ + HeadBlockHash: []byte("foo"), + SafeBlockHash: []byte("bar"), + FinalizedBlockHash: []byte("baz"), + } + srv := destinationServerSetup(t, wantDestinationResponse) + defer srv.Close() + + // Destination address server responds to JSON-RPC requests. + r := rand.NewGenerator() + proxy, err := New( + WithPort(r.Intn(50000)), + WithDestinationAddress(srv.URL), + ) + require.NoError(t, err) + go func() { + if err := proxy.Start(ctx); err != nil { + t.Log(err) + } + }() + time.Sleep(time.Millisecond * 100) + + // Dials the proxy. + rpcClient, err := rpc.DialHTTP("http://" + proxy.Address()) + require.NoError(t, err) + + // Expect the result from the proxy is the same as that one returned + // by the destination address. + proxyResult := &pb.ForkchoiceState{} + err = rpcClient.CallContext(ctx, proxyResult, "someEngineMethod") + require.NoError(t, err) + require.DeepEqual(t, wantDestinationResponse, proxyResult) + }) +} + +func TestProxy_CustomInterceptors(t *testing.T) { + t.Run("only intercepts engine API methods", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + type syncingResponse struct { + Syncing bool `json:"syncing"` + } + + wantDestinationResponse := &syncingResponse{Syncing: true} + srv := destinationServerSetup(t, wantDestinationResponse) + defer srv.Close() + + // Destination address server responds to JSON-RPC requests. + r := rand.NewGenerator() + proxy, err := New( + WithPort(r.Intn(50000)), + WithDestinationAddress(srv.URL), + ) + require.NoError(t, err) + go func() { + if err := proxy.Start(ctx); err != nil { + t.Log(err) + } + }() + time.Sleep(time.Millisecond * 100) + + method := "eth_syncing" + + // RPC method to intercept. + proxy.AddRequestInterceptor( + method, + &syncingResponse{Syncing: false}, // Custom response. + func() bool { + return true // Always intercept with a custom response. + }, + ) + + // Dials the proxy. + rpcClient, err := rpc.DialHTTP("http://" + proxy.Address()) + require.NoError(t, err) + + // Expect the result from the proxy is the same as that one returned + // by the destination address. + proxyResult := &syncingResponse{} + err = rpcClient.CallContext(ctx, proxyResult, method) + require.NoError(t, err) + + // The interception SHOULD NOT work because we should only intercept `engine` namespace methods. + require.DeepEqual(t, wantDestinationResponse, proxyResult) + }) + t.Run("only intercepts if trigger function returns true", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + type engineResponse struct { + BlockHash common.Hash `json:"blockHash"` + } + + destinationResponse := &engineResponse{BlockHash: common.BytesToHash([]byte("foo"))} + srv := destinationServerSetup(t, destinationResponse) + defer srv.Close() + + // Destination address server responds to JSON-RPC requests. + r := rand.NewGenerator() + proxy, err := New( + WithPort(r.Intn(50000)), + WithDestinationAddress(srv.URL), + ) + require.NoError(t, err) + go func() { + if err := proxy.Start(ctx); err != nil { + t.Log(err) + } + }() + time.Sleep(time.Millisecond * 100) + + method := "engine_newPayloadV1" + + // RPC method to intercept. + wantInterceptedResponse := &engineResponse{BlockHash: common.BytesToHash([]byte("bar"))} + conditional := false + proxy.AddRequestInterceptor( + method, + wantInterceptedResponse, + func() bool { + return conditional // Conditional trigger. + }, + ) + + // Dials the proxy. + rpcClient, err := rpc.DialHTTP("http://" + proxy.Address()) + require.NoError(t, err) + + proxyResult := &engineResponse{} + err = rpcClient.CallContext(ctx, proxyResult, method) + require.NoError(t, err) + + // The interception SHOULD NOT work because we should only intercept if trigger conditional is true. + require.DeepEqual(t, destinationResponse, proxyResult) + + conditional = true + proxy.AddRequestInterceptor( + method, + wantInterceptedResponse, + func() bool { + return conditional // Conditional trigger. + }, + ) + + // The interception should work and we should not be getting the destination + // response but rather a custom response from the interceptor config now that the trigger is true. + proxyResult = &engineResponse{} + err = rpcClient.CallContext(ctx, proxyResult, method) + require.NoError(t, err) + require.DeepEqual(t, wantInterceptedResponse, proxyResult) + }) + t.Run("triggers interceptor response correctly", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + type engineResponse struct { + BlockHash common.Hash `json:"blockHash"` + } + + destinationResponse := &engineResponse{BlockHash: common.BytesToHash([]byte("foo"))} + srv := destinationServerSetup(t, destinationResponse) + defer srv.Close() + + // Destination address server responds to JSON-RPC requests. + r := rand.NewGenerator() + proxy, err := New( + WithPort(r.Intn(50000)), + WithDestinationAddress(srv.URL), + ) + require.NoError(t, err) + go func() { + if err := proxy.Start(ctx); err != nil { + t.Log(err) + } + }() + time.Sleep(time.Millisecond * 100) + + method := "engine_newPayloadV1" + + // RPC method to intercept. + wantInterceptedResponse := &engineResponse{BlockHash: common.BytesToHash([]byte("bar"))} + proxy.AddRequestInterceptor( + method, + wantInterceptedResponse, + func() bool { + return true // Always intercept with a custom response. + }, + ) + + // Dials the proxy. + rpcClient, err := rpc.DialHTTP("http://" + proxy.Address()) + require.NoError(t, err) + + proxyResult := &engineResponse{} + err = rpcClient.CallContext(ctx, proxyResult, method) + require.NoError(t, err) + + // The interception should work and we should not be getting the destination + // response but rather a custom response from the interceptor config. + require.DeepEqual(t, wantInterceptedResponse, proxyResult) + }) +} + +func Test_isEngineAPICall(t *testing.T) { + type args struct { + reqBytes []byte + } + tests := []struct { + name string + args *jsonRPCObject + want bool + }{ + { + name: "nil data", + args: nil, + want: false, + }, + { + name: "engine method", + args: &jsonRPCObject{ + Method: "engine_newPayloadV1", + ID: 1, + Result: 5, + }, + want: true, + }, + { + name: "non-engine method", + args: &jsonRPCObject{ + Method: "eth_syncing", + ID: 1, + Result: false, + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + enc, err := json.Marshal(tt.args) + require.NoError(t, err) + if got := isEngineAPICall(enc); got != tt.want { + t.Errorf("isEngineAPICall() = %v, want %v", got, tt.want) + } + }) + } +} + +func destinationServerSetup(t *testing.T, response interface{}) *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + defer func() { + require.NoError(t, r.Body.Close()) + }() + resp := map[string]interface{}{ + "jsonrpc": "2.0", + "id": 1, + "result": response, + } + err := json.NewEncoder(w).Encode(resp) + require.NoError(t, err) + })) +}