Compare commits

...

3 Commits

Author SHA1 Message Date
nisdas
54ae2b32b2 Merge branch 'develop' of https://github.com/prysmaticlabs/geth-sharding into e2eProxy 2022-04-12 18:04:39 +08:00
nisdas
b84c1aa3ea hack it in 2022-04-10 16:31:15 +08:00
nisdas
621e149dce add all this stuff 2022-04-09 21:09:06 +08:00
7 changed files with 546 additions and 3 deletions

View File

@@ -48,6 +48,7 @@ go_test(
"//testing/require:go_default_library",
"//testing/slasher/simulator:go_default_library",
"//testing/util:go_default_library",
"//time/slots:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_prysmaticlabs_eth2_types//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",

View File

@@ -9,6 +9,7 @@ go_library(
"lighthouse_beacon.go",
"lighthouse_validator.go",
"log.go",
"proxy.go",
"tracing_sink.go",
"validator.go",
"web3remotesigner.go",
@@ -29,6 +30,7 @@ go_library(
"//crypto/bls:go_default_library",
"//encoding/bytesutil:go_default_library",
"//io/file:go_default_library",
"//proto/engine/v1:go_default_library",
"//runtime/interop:go_default_library",
"//testing/endtoend/components/eth1:go_default_library",
"//testing/endtoend/helpers:go_default_library",

View File

@@ -117,8 +117,10 @@ func (node *BeaconNode) Start(ctx context.Context) error {
expectedNumOfPeers += 1
}
jwtPath := path.Join(e2e.TestParams.TestPath, "eth1data/"+strconv.Itoa(node.index)+"/")
eth1Port := e2e.TestParams.Ports.Eth1AuthRPCPort
if index == 0 {
jwtPath = path.Join(e2e.TestParams.TestPath, "eth1data/miner/")
eth1Port = e2e.TestParams.Ports.Eth1ProxyPort
}
jwtPath = path.Join(jwtPath, "geth/jwtsecret")
args := []string{
@@ -126,7 +128,7 @@ func (node *BeaconNode) Start(ctx context.Context) error {
fmt.Sprintf("--%s=%s", cmdshared.LogFileName.Name, stdOutFile.Name()),
fmt.Sprintf("--%s=%s", flags.DepositContractFlag.Name, e2e.TestParams.ContractAddress.Hex()),
fmt.Sprintf("--%s=%d", flags.RPCPort.Name, e2e.TestParams.Ports.PrysmBeaconNodeRPCPort+index),
fmt.Sprintf("--%s=http://127.0.0.1:%d", flags.HTTPWeb3ProviderFlag.Name, e2e.TestParams.Ports.Eth1AuthRPCPort+index),
fmt.Sprintf("--%s=http://127.0.0.1:%d", flags.HTTPWeb3ProviderFlag.Name, eth1Port+index),
fmt.Sprintf("--%s=%s", flags.ExecutionJWTSecretFlag.Name, jwtPath),
fmt.Sprintf("--%s=%d", flags.MinSyncPeers.Name, 1),
fmt.Sprintf("--%s=%d", cmdshared.P2PUDPPort.Name, e2e.TestParams.Ports.PrysmBeaconNodeUDPPort+index),

View File

@@ -0,0 +1,511 @@
package components
// Package main provides a proxy middleware for engine API requests between Ethereum
// consensus clients and execution clients accordingly. Allows for configuration of various
// test cases using yaml files as detailed in the README.md of the document.
import (
"bytes"
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"flag"
"io"
"io/ioutil"
"math/big"
"net"
"net/http"
"strconv"
"strings"
"github.com/ethereum/go-ethereum/common/hexutil"
pb "github.com/prysmaticlabs/prysm/proto/engine/v1"
"github.com/prysmaticlabs/prysm/testing/endtoend/helpers"
e2e "github.com/prysmaticlabs/prysm/testing/endtoend/params"
"github.com/sirupsen/logrus"
)
var (
spoofingConfigFile = flag.String("spoofing-config", "tools/engine-proxy/spoofing_config.yaml", "")
executionEndpoint = flag.String("execution-endpoint", "http://localhost:8545", "")
fuzz = flag.Bool(
"fuzz",
false,
"fuzzes requests and responses, overrides -spoofing-config if -fuzz is set",
)
)
type spoofingConfig struct {
Requests []*spoof `yaml:"requests"`
Responses []*spoof `yaml:"responses"`
}
type spoof struct {
Method string
Fields map[string]interface{}
}
type jsonRPCObject struct {
Jsonrpc string `json:"jsonrpc"`
Method string `json:"method"`
Params []interface{} `json:"params"`
ID uint64 `json:"id"`
Result interface{} `json:"result"`
}
type forkchoiceUpdatedResponse struct {
Status *pb.PayloadStatus `json:"payloadStatus"`
PayloadId *pb.PayloadIDBytes `json:"payloadId"`
}
type InterceptorFunc func(reqBytes []byte, w http.ResponseWriter, r *http.Request) bool
type ProxyNode struct {
address string
srv *http.Server
destAddress string
logger *logrus.Logger
logEntry *logrus.Entry
interceptor InterceptorFunc
backedUpRequests []*http.Request
}
func NewProxyNode(proxyPort int, destAddress string) *ProxyNode {
pn := &ProxyNode{}
mux := http.NewServeMux()
mux.HandleFunc("/", pn.proxyHandler())
addr := "127.0.0.1:" + strconv.Itoa(proxyPort)
srv := &http.Server{
Handler: mux,
Addr: addr,
}
pn.address = addr
pn.srv = srv
pn.destAddress = destAddress
return pn
}
func (pn *ProxyNode) Start(ctx context.Context) error {
stdOutFile, err := helpers.DeleteAndCreateFile(e2e.TestParams.LogPath, "eth1-proxy.log")
if err != nil {
return err
}
pn.logger = logrus.New()
pn.logger.SetOutput(stdOutFile)
pn.logEntry = pn.logger.WithField("prefix", "engine-proxy")
pn.logEntry.Infof("Engine proxy now listening on address %s", pn.address)
pn.srv.BaseContext = func(listener net.Listener) context.Context {
return ctx
}
go func() {
if err := pn.srv.ListenAndServe(); err != nil {
pn.logEntry.Error(err)
}
}()
for {
select {
case <-ctx.Done():
return pn.srv.Shutdown(context.Background())
}
}
}
func (pn *ProxyNode) AddInterceptor(icptr InterceptorFunc) {
pn.interceptor = icptr
}
func (pn *ProxyNode) SyncingInterceptor() InterceptorFunc {
return func(reqBytes []byte, w http.ResponseWriter, r *http.Request) bool {
if !pn.checkIfValid(reqBytes) {
return false
}
pn.returnSyncingResponse(reqBytes, w, r)
return true
}
}
// Proxies requests from a consensus client to an execution client, spoofing requests
// and/or responses as desired. Acts as a middleware useful for testing different merge scenarios.
func (pn *ProxyNode) proxyHandler() func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
requestBytes, err := pn.parseRequestBytes(r)
if err != nil {
pn.logEntry.WithError(err).Error("Could not parse request")
return
}
if pn.interceptor != nil && pn.interceptor(requestBytes, w, r) {
return
}
for _, rq := range pn.backedUpRequests {
requestB, err := pn.parseRequestBytes(rq)
if err != nil {
pn.logEntry.WithError(err).Error("Could not parse request")
return
}
destAddr := "http://" + pn.destAddress
// Create a new proxy request to the execution client.
url := rq.URL
url.Host = destAddr
proxyReq, err := http.NewRequest(rq.Method, destAddr, rq.Body)
if err != nil {
pn.logEntry.WithError(err).Error("Could create new request")
return
}
// Set the modified request as the proxy request body.
proxyReq.Body = ioutil.NopCloser(bytes.NewBuffer(requestB))
// Required proxy headers for forwarding JSON-RPC requests to the execution client.
proxyReq.Header.Set("Host", rq.Host)
proxyReq.Header.Set("X-Forwarded-For", rq.RemoteAddr)
proxyReq.Header.Set("Content-Type", "application/json")
client := &http.Client{}
proxyRes, err := client.Do(proxyReq)
if err != nil {
pn.logEntry.WithError(err).Error("Could not do client proxy")
return
}
buf := bytes.NewBuffer([]byte{})
// Pipe the proxy response to the original caller.
if _, err = io.Copy(buf, proxyRes.Body); err != nil {
pn.logEntry.WithError(err).Error("Could not copy proxy request body")
return
}
if err = proxyRes.Body.Close(); err != nil {
pn.logEntry.WithError(err).Error("Could not do client proxy")
return
}
pn.logEntry.Infof("Queued Request Response: %s", buf.String())
}
pn.backedUpRequests = []*http.Request{}
/*
var modifiedReq []byte
if *fuzz {
modifiedReq, err = fuzzRequest(requestBytes)
} else {
// We optionally spoof the request as desired.
modifiedReq, err = spoofRequest(config, requestBytes)
}
if err != nil {
log.WithError(err).Error("Failed to spoof request")
return
}
*/
destAddr := "http://" + pn.destAddress
// Create a new proxy request to the execution client.
url := r.URL
url.Host = destAddr
proxyReq, err := http.NewRequest(r.Method, destAddr, r.Body)
if err != nil {
pn.logEntry.WithError(err).Error("Could create new request")
return
}
// Set the modified request as the proxy request body.
proxyReq.Body = ioutil.NopCloser(bytes.NewBuffer(requestBytes))
// Required proxy headers for forwarding JSON-RPC requests to the execution client.
proxyReq.Header.Set("Host", r.Host)
proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
proxyReq.Header.Set("Content-Type", "application/json")
client := &http.Client{}
proxyRes, err := client.Do(proxyReq)
if err != nil {
pn.logEntry.WithError(err).Error("Could not do client proxy")
return
}
/*
// We optionally spoof the response as desired.
var modifiedResp []byte
if *fuzz {
modifiedResp, err = fuzzResponse(proxyRes.Body)
} else {
modifiedResp, err = spoofResponse(config, requestBytes, proxyRes.Body)
}
if err != nil {
log.WithError(err).Error("Failed to spoof response")
return
}
*/
// Pipe the proxy response to the original caller.
if _, err = io.Copy(w, proxyRes.Body); err != nil {
pn.logEntry.WithError(err).Error("Could not copy proxy request body")
return
}
if err = proxyRes.Body.Close(); err != nil {
pn.logEntry.WithError(err).Error("Could not do client proxy")
return
}
}
}
func (pn *ProxyNode) parseRequestBytes(req *http.Request) ([]byte, error) {
requestBytes, err := ioutil.ReadAll(req.Body)
if err != nil {
return nil, err
}
if err = req.Body.Close(); err != nil {
return nil, err
}
pn.logEntry.Infof("%s", string(requestBytes))
req.Body = ioutil.NopCloser(bytes.NewBuffer(requestBytes))
return requestBytes, nil
}
func fuzzRequest(requestBytes []byte) ([]byte, error) {
// If the JSON request is not a JSON-RPC object, return the request as-is.
jsonRequest, err := unmarshalRPCObject(requestBytes)
if err != nil {
switch {
case strings.Contains(err.Error(), "cannot unmarshal array"):
return requestBytes, nil
default:
return nil, err
}
}
if len(jsonRequest.Params) == 0 {
return requestBytes, nil
}
params := make(map[string]interface{})
if err := extractObjectFromJSONRPC(jsonRequest.Params[0], &params); err != nil {
return requestBytes, nil
}
// 10% chance to fuzz a field.
fuzzProbability := uint64(5) // TODO: Do not hardcode.
for k, v := range params {
// Each field has a probability of of being fuzzed.
r, err := rand.Int(rand.Reader, big.NewInt(100))
if err != nil {
return nil, err
}
switch obj := v.(type) {
case string:
if strings.Contains(obj, "0x") && r.Uint64() <= fuzzProbability {
// Ignore hex strings of odd length.
if len(obj)%2 != 0 {
continue
}
// Generate some random hex string with same length and set it.
// TODO: Experiment with length < current field, or junk in general.
hexBytes, err := hexutil.Decode(obj)
if err != nil {
return nil, err
}
fuzzedStr, err := randomHex(len(hexBytes))
if err != nil {
return nil, err
}
log.WithField("method", jsonRequest.Method).Warnf(
"Fuzzing field %s, modifying value from %s to %s",
k,
obj,
fuzzedStr,
)
params[k] = fuzzedStr
}
}
}
jsonRequest.Params[0] = params
return json.Marshal(jsonRequest)
}
// Parses the request from thec consensus client and checks if user desires
// to spoof it based on the JSON-RPC method. If so, it returns the modified
// request bytes which will be proxied to the execution client.
func spoofRequest(config *spoofingConfig, requestBytes []byte) ([]byte, error) {
// If the JSON request is not a JSON-RPC object, return the request as-is.
jsonRequest, err := unmarshalRPCObject(requestBytes)
if err != nil {
switch {
case strings.Contains(err.Error(), "cannot unmarshal array"):
return requestBytes, nil
default:
return nil, err
}
}
if len(jsonRequest.Params) == 0 {
return requestBytes, nil
}
desiredMethodsToSpoof := make(map[string]*spoof)
for _, spoofReq := range config.Requests {
desiredMethodsToSpoof[spoofReq.Method] = spoofReq
}
// If we don't want to spoof the request, just return the request as-is.
spoofDetails, ok := desiredMethodsToSpoof[jsonRequest.Method]
if !ok {
return requestBytes, nil
}
// TODO: Support methods with multiple params.
params := make(map[string]interface{})
if err := extractObjectFromJSONRPC(jsonRequest.Params[0], &params); err != nil {
return nil, err
}
for fieldToModify, fieldValue := range spoofDetails.Fields {
if _, ok := params[fieldToModify]; !ok {
continue
}
params[fieldToModify] = fieldValue
}
log.WithField("method", jsonRequest.Method).Infof("Modified request %v", params)
jsonRequest.Params[0] = params
return json.Marshal(jsonRequest)
}
func (pn *ProxyNode) checkIfValid(reqBytes []byte) bool {
jsonRequest, err := unmarshalRPCObject(reqBytes)
if err != nil {
switch {
case strings.Contains(err.Error(), "cannot unmarshal array"):
return false
default:
return false
}
}
if strings.Contains(jsonRequest.Method, "engine_forkchoiceUpdatedV1") ||
strings.Contains(jsonRequest.Method, "engine_newPayloadV1") {
return true
}
return false
}
func (pn *ProxyNode) returnSyncingResponse(reqBytes []byte, w http.ResponseWriter, r *http.Request) {
jsonRequest, err := unmarshalRPCObject(reqBytes)
if err != nil {
return
}
switch {
case strings.Contains(jsonRequest.Method, "engine_forkchoiceUpdatedV1"):
resp := &forkchoiceUpdatedResponse{
Status: &pb.PayloadStatus{
Status: pb.PayloadStatus_SYNCING,
},
PayloadId: nil,
}
jResp := &jsonRPCObject{
Method: jsonRequest.Method,
ID: jsonRequest.ID,
Result: resp,
}
rawResp, err := json.Marshal(jResp)
if err != nil {
return
}
_, err = w.Write(rawResp)
_ = err
return
case strings.Contains(jsonRequest.Method, "engine_newPayloadV1"):
resp := &pb.PayloadStatus{
Status: pb.PayloadStatus_SYNCING,
}
jResp := &jsonRPCObject{
Method: jsonRequest.Method,
ID: jsonRequest.ID,
Result: resp,
}
rawResp, err := json.Marshal(jResp)
if err != nil {
return
}
_, err = w.Write(rawResp)
_ = err
pn.backedUpRequests = append(pn.backedUpRequests, r)
return
}
return
}
func fuzzResponse(responseBody io.Reader) ([]byte, error) {
responseBytes, err := ioutil.ReadAll(responseBody)
if err != nil {
return nil, err
}
return responseBytes, nil
}
// Parses the response body from the execution client and checks if user desires
// to spoof it based on the JSON-RPC method. If so, it returns the modified
// response bytes which will be proxied to the consensus client.
func spoofResponse(config *spoofingConfig, requestBytes []byte, responseBody io.Reader) ([]byte, error) {
responseBytes, err := ioutil.ReadAll(responseBody)
if err != nil {
return nil, err
}
// If the JSON request is not a JSON-RPC object, return the request as-is.
jsonRequest, err := unmarshalRPCObject(requestBytes)
if err != nil {
switch {
case strings.Contains(err.Error(), "cannot unmarshal array"):
return responseBytes, nil
default:
return nil, err
}
}
jsonResponse, err := unmarshalRPCObject(responseBytes)
if err != nil {
switch {
case strings.Contains(err.Error(), "cannot unmarshal array"):
return responseBytes, nil
default:
return nil, err
}
}
desiredMethodsToSpoof := make(map[string]*spoof)
for _, spoofReq := range config.Responses {
desiredMethodsToSpoof[spoofReq.Method] = spoofReq
}
// If we don't want to spoof the request, just return the request as-is.
spoofDetails, ok := desiredMethodsToSpoof[jsonRequest.Method]
if !ok {
return responseBytes, nil
}
// TODO: Support nested objects.
params := make(map[string]interface{})
if err := extractObjectFromJSONRPC(jsonResponse.Result, &params); err != nil {
return nil, err
}
for fieldToModify, fieldValue := range spoofDetails.Fields {
if _, ok := params[fieldToModify]; !ok {
continue
}
params[fieldToModify] = fieldValue
}
log.WithField("method", jsonRequest.Method).Infof("Modified response %v", params)
jsonResponse.Result = params
return json.Marshal(jsonResponse)
}
func unmarshalRPCObject(rawBytes []byte) (*jsonRPCObject, error) {
jsonObj := &jsonRPCObject{}
if err := json.Unmarshal(rawBytes, jsonObj); err != nil {
return nil, err
}
return jsonObj, nil
}
func extractObjectFromJSONRPC(src interface{}, dst interface{}) error {
rawResp, err := json.Marshal(src)
if err != nil {
return err
}
return json.Unmarshal(rawResp, dst)
}
func randomHex(n int) (string, error) {
b := make([]byte, n)
if _, err := rand.Read(b); err != nil {
return "", err
}
return "0x" + hex.EncodeToString(b), nil
}

View File

@@ -7,8 +7,10 @@ package endtoend
import (
"context"
"fmt"
"net/http"
"os"
"path"
"strconv"
"strings"
"sync"
"testing"
@@ -27,6 +29,7 @@ import (
e2e "github.com/prysmaticlabs/prysm/testing/endtoend/params"
e2etypes "github.com/prysmaticlabs/prysm/testing/endtoend/types"
"github.com/prysmaticlabs/prysm/testing/require"
"github.com/prysmaticlabs/prysm/time/slots"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
@@ -79,7 +82,10 @@ func (r *testRunner) run() {
g.Go(func() error {
return tracingSink.Start(ctx)
})
proxyNode := components.NewProxyNode(e2e.TestParams.Ports.Eth1ProxyPort, "127.0.0.1:"+strconv.Itoa(e2e.TestParams.Ports.Eth1RPCPort))
g.Go(func() error {
return proxyNode.Start(ctx)
})
if multiClientActive {
keyGen = components.NewKeystoreGenerator()
@@ -254,6 +260,17 @@ func (r *testRunner) run() {
require.NoError(t, err)
tickingStartTime := helpers.EpochTickerStartTime(genesis)
proxyNode.AddInterceptor(func(reqBytes []byte, w http.ResponseWriter, r *http.Request) bool {
currSlot := slots.CurrentSlot(uint64(genesis.GenesisTime.AsTime().Unix()))
if currSlot < params.BeaconConfig().SlotsPerEpoch.Mul(uint64(9)) {
return false
}
if currSlot >= params.BeaconConfig().SlotsPerEpoch.Mul(uint64(10)) {
return false
}
return proxyNode.SyncingInterceptor()(reqBytes, w, r)
})
// Run assigned evaluators.
if err := r.runEvaluators(conns, tickingStartTime); err != nil {
return errors.Wrap(err, "one or more evaluators failed")
@@ -299,6 +316,9 @@ func (r *testRunner) runEvaluators(conns []*grpc.ClientConn, tickingStartTime ti
secondsPerEpoch := uint64(params.BeaconConfig().SlotsPerEpoch.Mul(params.BeaconConfig().SecondsPerSlot))
ticker := helpers.NewEpochTicker(tickingStartTime, secondsPerEpoch)
for currentEpoch := range ticker.C() {
if currentEpoch == 9 || currentEpoch == 10 {
continue
}
wg := new(sync.WaitGroup)
for _, eval := range config.Evaluators {
// Fix reference to evaluator as it will be running

View File

@@ -19,7 +19,7 @@ type testArgs struct {
useWeb3RemoteSigner bool
}
func TestEndToEnd_MinimalConfig(t *testing.T) {
func TestEndToEnd_MinimalConfigV(t *testing.T) {
e2eMinimal(t, &testArgs{
usePrysmSh: false,
useWeb3RemoteSigner: false,

View File

@@ -31,6 +31,7 @@ type ports struct {
Eth1RPCPort int
Eth1AuthRPCPort int
Eth1WSPort int
Eth1ProxyPort int
PrysmBeaconNodeRPCPort int
PrysmBeaconNodeUDPPort int
PrysmBeaconNodeTCPPort int
@@ -86,6 +87,7 @@ const (
Eth1RPCPort = Eth1Port + portSpan
Eth1WSPort = Eth1Port + 2*portSpan
Eth1AuthRPCPort = Eth1Port + 3*portSpan
Eth1ProxyPort = Eth1Port + 4*portSpan
PrysmBeaconNodeRPCPort = 4150
PrysmBeaconNodeUDPPort = PrysmBeaconNodeRPCPort + portSpan
@@ -233,6 +235,10 @@ func initializeStandardPorts(shardCount, shardIndex int, ports *ports, existingR
if err != nil {
return err
}
eth1ProxyPort, err := port(Eth1ProxyPort, shardCount, shardIndex, existingRegistrations)
if err != nil {
return err
}
beaconNodeRPCPort, err := port(PrysmBeaconNodeRPCPort, shardCount, shardIndex, existingRegistrations)
if err != nil {
return err
@@ -275,6 +281,7 @@ func initializeStandardPorts(shardCount, shardIndex int, ports *ports, existingR
ports.Eth1RPCPort = eth1RPCPort
ports.Eth1AuthRPCPort = eth1AuthPort
ports.Eth1WSPort = eth1WSPort
ports.Eth1ProxyPort = eth1ProxyPort
ports.PrysmBeaconNodeRPCPort = beaconNodeRPCPort
ports.PrysmBeaconNodeUDPPort = beaconNodeUDPPort
ports.PrysmBeaconNodeTCPPort = beaconNodeTCPPort