Add In Support For Builder in E2E (#12343)

* fix it up

* add gaz

* add changes in

* finally runs

* fix it

* add progress

* add capella support

* save progress

* remove debug logs

* cleanup

* remove log

* fix flag

* remove unused lock

* gaz

* change

* fix

* lint

* james review

---------

Co-authored-by: james-prysm <90280386+james-prysm@users.noreply.github.com>
This commit is contained in:
Nishant Das
2023-05-12 00:10:29 +08:00
committed by GitHub
parent f6764fe62b
commit aef22bf54e
27 changed files with 1429 additions and 166 deletions

View File

@@ -346,6 +346,74 @@ func (p *ExecutionPayload) ToProto() (*v1.ExecutionPayload, error) {
}, nil
}
// FromProto converts a proto execution payload type to our builder
// compatible payload type.
func FromProto(payload *v1.ExecutionPayload) (ExecutionPayload, error) {
bFee, err := sszBytesToUint256(payload.BaseFeePerGas)
if err != nil {
return ExecutionPayload{}, err
}
txs := make([]hexutil.Bytes, len(payload.Transactions))
for i := range payload.Transactions {
txs[i] = payload.Transactions[i]
}
return ExecutionPayload{
ParentHash: payload.ParentHash,
FeeRecipient: payload.FeeRecipient,
StateRoot: payload.StateRoot,
ReceiptsRoot: payload.ReceiptsRoot,
LogsBloom: payload.LogsBloom,
PrevRandao: payload.PrevRandao,
BlockNumber: Uint64String(payload.BlockNumber),
GasLimit: Uint64String(payload.GasLimit),
GasUsed: Uint64String(payload.GasUsed),
Timestamp: Uint64String(payload.Timestamp),
ExtraData: payload.ExtraData,
BaseFeePerGas: bFee,
BlockHash: payload.BlockHash,
Transactions: txs,
}, nil
}
// FromProtoCapella converts a proto execution payload type for capella to our
// builder compatible payload type.
func FromProtoCapella(payload *v1.ExecutionPayloadCapella) (ExecutionPayloadCapella, error) {
bFee, err := sszBytesToUint256(payload.BaseFeePerGas)
if err != nil {
return ExecutionPayloadCapella{}, err
}
txs := make([]hexutil.Bytes, len(payload.Transactions))
for i := range payload.Transactions {
txs[i] = payload.Transactions[i]
}
withdrawals := make([]Withdrawal, len(payload.Withdrawals))
for i, w := range payload.Withdrawals {
withdrawals[i] = Withdrawal{
Index: Uint256{Int: big.NewInt(0).SetUint64(w.Index)},
ValidatorIndex: Uint256{Int: big.NewInt(0).SetUint64(uint64(w.ValidatorIndex))},
Address: w.Address,
Amount: Uint256{Int: big.NewInt(0).SetUint64(w.Amount)},
}
}
return ExecutionPayloadCapella{
ParentHash: payload.ParentHash,
FeeRecipient: payload.FeeRecipient,
StateRoot: payload.StateRoot,
ReceiptsRoot: payload.ReceiptsRoot,
LogsBloom: payload.LogsBloom,
PrevRandao: payload.PrevRandao,
BlockNumber: Uint64String(payload.BlockNumber),
GasLimit: Uint64String(payload.GasLimit),
GasUsed: Uint64String(payload.GasUsed),
Timestamp: Uint64String(payload.Timestamp),
ExtraData: payload.ExtraData,
BaseFeePerGas: bFee,
BlockHash: payload.BlockHash,
Transactions: txs,
Withdrawals: withdrawals,
}, nil
}
type ExecHeaderResponseCapella struct {
Data struct {
Signature hexutil.Bytes `json:"signature"`

View File

@@ -14,7 +14,6 @@ go_library(
"metrics.go",
"options.go",
"prometheus.go",
"provider.go",
"rpc_connection.go",
"service.go",
],
@@ -90,7 +89,6 @@ go_test(
"init_test.go",
"log_processing_test.go",
"prometheus_test.go",
"provider_test.go",
"service_test.go",
],
data = glob(["testdata/**"]),
@@ -122,7 +120,6 @@ go_test(
"//crypto/bls:go_default_library",
"//encoding/bytesutil:go_default_library",
"//monitoring/clientstats:go_default_library",
"//network/authorization:go_default_library",
"//proto/engine/v1:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime/version:go_default_library",

View File

@@ -7,6 +7,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
"github.com/prysmaticlabs/prysm/v4/network"
"github.com/prysmaticlabs/prysm/v4/network/authorization"
)
@@ -15,7 +16,7 @@ type Option func(s *Service) error
// WithHttpEndpoint parse http endpoint for the powchain service to use.
func WithHttpEndpoint(endpointString string) Option {
return func(s *Service) error {
s.cfg.currHttpEndpoint = HttpEndpoint(endpointString)
s.cfg.currHttpEndpoint = network.HttpEndpoint(endpointString)
return nil
}
}
@@ -27,7 +28,7 @@ func WithHttpEndpointAndJWTSecret(endpointString string, secret []byte) Option {
return nil
}
// Overwrite authorization type for all endpoints to be of a bearer type.
hEndpoint := HttpEndpoint(endpointString)
hEndpoint := network.HttpEndpoint(endpointString)
hEndpoint.Auth.Method = authorization.Bearer
hEndpoint.Auth.Value = string(secret)

View File

@@ -1,49 +0,0 @@
package execution
import (
"encoding/base64"
"strings"
"github.com/prysmaticlabs/prysm/v4/network"
"github.com/prysmaticlabs/prysm/v4/network/authorization"
)
// HttpEndpoint extracts an httputils.Endpoint from the provider parameter.
func HttpEndpoint(eth1Provider string) network.Endpoint {
endpoint := network.Endpoint{
Url: "",
Auth: network.AuthorizationData{
Method: authorization.None,
Value: "",
}}
authValues := strings.Split(eth1Provider, ",")
endpoint.Url = strings.TrimSpace(authValues[0])
if len(authValues) > 2 {
log.Errorf(
"ETH1 endpoint string can contain one comma for specifying the authorization header to access the provider."+
" String contains too many commas: %d. Skipping authorization.", len(authValues)-1)
} else if len(authValues) == 2 {
switch network.Method(strings.TrimSpace(authValues[1])) {
case authorization.Basic:
basicAuthValues := strings.Split(strings.TrimSpace(authValues[1]), " ")
if len(basicAuthValues) != 2 {
log.Errorf("Basic Authentication has incorrect format. Skipping authorization.")
} else {
endpoint.Auth.Method = authorization.Basic
endpoint.Auth.Value = base64.StdEncoding.EncodeToString([]byte(basicAuthValues[1]))
}
case authorization.Bearer:
bearerAuthValues := strings.Split(strings.TrimSpace(authValues[1]), " ")
if len(bearerAuthValues) != 2 {
log.Errorf("Bearer Authentication has incorrect format. Skipping authorization.")
} else {
endpoint.Auth.Method = authorization.Bearer
endpoint.Auth.Value = bearerAuthValues[1]
}
case authorization.None:
log.Errorf("Authorization has incorrect format or authorization type is not supported.")
}
}
return endpoint
}

View File

@@ -1,74 +0,0 @@
package execution
import (
"testing"
"github.com/prysmaticlabs/prysm/v4/network/authorization"
"github.com/prysmaticlabs/prysm/v4/testing/assert"
logTest "github.com/sirupsen/logrus/hooks/test"
)
func TestHttpEndpoint(t *testing.T) {
hook := logTest.NewGlobal()
url := "http://test"
t.Run("URL", func(t *testing.T) {
endpoint := HttpEndpoint(url)
assert.Equal(t, url, endpoint.Url)
assert.Equal(t, authorization.None, endpoint.Auth.Method)
})
t.Run("URL with separator", func(t *testing.T) {
endpoint := HttpEndpoint(url + ",")
assert.Equal(t, url, endpoint.Url)
assert.Equal(t, authorization.None, endpoint.Auth.Method)
})
t.Run("URL with whitespace", func(t *testing.T) {
endpoint := HttpEndpoint(" " + url + " ,")
assert.Equal(t, url, endpoint.Url)
assert.Equal(t, authorization.None, endpoint.Auth.Method)
})
t.Run("Basic auth", func(t *testing.T) {
endpoint := HttpEndpoint(url + ",Basic username:password")
assert.Equal(t, url, endpoint.Url)
assert.Equal(t, authorization.Basic, endpoint.Auth.Method)
assert.Equal(t, "dXNlcm5hbWU6cGFzc3dvcmQ=", endpoint.Auth.Value)
})
t.Run("Basic auth with whitespace", func(t *testing.T) {
endpoint := HttpEndpoint(url + ", Basic username:password ")
assert.Equal(t, url, endpoint.Url)
assert.Equal(t, authorization.Basic, endpoint.Auth.Method)
assert.Equal(t, "dXNlcm5hbWU6cGFzc3dvcmQ=", endpoint.Auth.Value)
})
t.Run("Basic auth with incorrect format", func(t *testing.T) {
hook.Reset()
endpoint := HttpEndpoint(url + ",Basic username:password foo")
assert.Equal(t, url, endpoint.Url)
assert.Equal(t, authorization.None, endpoint.Auth.Method)
assert.LogsContain(t, hook, "Skipping authorization")
})
t.Run("Bearer auth", func(t *testing.T) {
endpoint := HttpEndpoint(url + ",Bearer token")
assert.Equal(t, url, endpoint.Url)
assert.Equal(t, authorization.Bearer, endpoint.Auth.Method)
assert.Equal(t, "token", endpoint.Auth.Value)
})
t.Run("Bearer auth with whitespace", func(t *testing.T) {
endpoint := HttpEndpoint(url + ", Bearer token ")
assert.Equal(t, url, endpoint.Url)
assert.Equal(t, authorization.Bearer, endpoint.Auth.Method)
assert.Equal(t, "token", endpoint.Auth.Value)
})
t.Run("Bearer auth with incorrect format", func(t *testing.T) {
hook.Reset()
endpoint := HttpEndpoint(url + ",Bearer token foo")
assert.Equal(t, url, endpoint.Url)
assert.Equal(t, authorization.None, endpoint.Auth.Method)
assert.LogsContain(t, hook, "Skipping authorization")
})
t.Run("Too many separators", func(t *testing.T) {
endpoint := HttpEndpoint(url + ",Bearer token,foo")
assert.Equal(t, url, endpoint.Url)
assert.Equal(t, authorization.None, endpoint.Auth.Method)
assert.LogsContain(t, hook, "Skipping authorization")
})
}

View File

@@ -3,7 +3,6 @@ package execution
import (
"context"
"fmt"
"net/url"
"strings"
"time"
@@ -107,26 +106,10 @@ func (s *Service) retryExecutionClientConnection(ctx context.Context, err error)
// Initializes an RPC connection with authentication headers.
func (s *Service) newRPCClientWithAuth(ctx context.Context, endpoint network.Endpoint) (*gethRPC.Client, error) {
// Need to handle ipc and http
var client *gethRPC.Client
u, err := url.Parse(endpoint.Url)
client, err := network.NewExecutionRPCClient(ctx, endpoint)
if err != nil {
return nil, err
}
switch u.Scheme {
case "http", "https":
client, err = gethRPC.DialOptions(ctx, endpoint.Url, gethRPC.WithHTTPClient(endpoint.HttpClient()))
if err != nil {
return nil, err
}
case "", "ipc":
client, err = gethRPC.DialIPC(ctx, endpoint.Url)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme)
}
if endpoint.Auth.Method != authorization.None {
header, err := endpoint.Auth.ToHeaderValue()
if err != nil {

View File

@@ -127,7 +127,6 @@ func (vs *Server) setExecutionData(ctx context.Context, blk interfaces.SignedBea
}
}
}
executionData, err := vs.getExecutionPayload(ctx, slot, idx, blk.Block().ParentRoot(), headState)
if err != nil {
return errors.Wrap(err, "failed to get execution payload")

View File

@@ -12,6 +12,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//network/authorization:go_default_library",
"@com_github_ethereum_go_ethereum//rpc:go_default_library",
"@com_github_golang_jwt_jwt_v4//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
@@ -32,5 +33,6 @@ go_test(
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"@com_github_golang_jwt_jwt_v4//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)

View File

@@ -1,11 +1,17 @@
package network
import (
"context"
"encoding/base64"
"errors"
"fmt"
"net/http"
"net/url"
"strings"
gethRPC "github.com/ethereum/go-ethereum/rpc"
"github.com/prysmaticlabs/prysm/v4/network/authorization"
log "github.com/sirupsen/logrus"
)
// Endpoint is an endpoint with authorization data.
@@ -53,6 +59,46 @@ func (d *AuthorizationData) ToHeaderValue() (string, error) {
return "", errors.New("could not create HTTP header for unknown authorization method")
}
// HttpEndpoint extracts an httputils.Endpoint from the provider parameter.
func HttpEndpoint(eth1Provider string) Endpoint {
endpoint := Endpoint{
Url: "",
Auth: AuthorizationData{
Method: authorization.None,
Value: "",
}}
authValues := strings.Split(eth1Provider, ",")
endpoint.Url = strings.TrimSpace(authValues[0])
if len(authValues) > 2 {
log.Errorf(
"ETH1 endpoint string can contain one comma for specifying the authorization header to access the provider."+
" String contains too many commas: %d. Skipping authorization.", len(authValues)-1)
} else if len(authValues) == 2 {
switch Method(strings.TrimSpace(authValues[1])) {
case authorization.Basic:
basicAuthValues := strings.Split(strings.TrimSpace(authValues[1]), " ")
if len(basicAuthValues) != 2 {
log.Errorf("Basic Authentication has incorrect format. Skipping authorization.")
} else {
endpoint.Auth.Method = authorization.Basic
endpoint.Auth.Value = base64.StdEncoding.EncodeToString([]byte(basicAuthValues[1]))
}
case authorization.Bearer:
bearerAuthValues := strings.Split(strings.TrimSpace(authValues[1]), " ")
if len(bearerAuthValues) != 2 {
log.Errorf("Bearer Authentication has incorrect format. Skipping authorization.")
} else {
endpoint.Auth.Method = authorization.Bearer
endpoint.Auth.Value = bearerAuthValues[1]
}
case authorization.None:
log.Errorf("Authorization has incorrect format or authorization type is not supported.")
}
}
return endpoint
}
// Method returns the authorizationmethod.AuthorizationMethod corresponding with the parameter value.
func Method(auth string) authorization.AuthorizationMethod {
if strings.HasPrefix(strings.ToLower(auth), "basic") {
@@ -76,3 +122,27 @@ func NewHttpClientWithSecret(secret string) *http.Client {
Transport: authTransport,
}
}
func NewExecutionRPCClient(ctx context.Context, endpoint Endpoint) (*gethRPC.Client, error) {
// Need to handle ipc and http
var client *gethRPC.Client
u, err := url.Parse(endpoint.Url)
if err != nil {
return nil, err
}
switch u.Scheme {
case "http", "https":
client, err = gethRPC.DialOptions(ctx, endpoint.Url, gethRPC.WithHTTPClient(endpoint.HttpClient()))
if err != nil {
return nil, err
}
case "", "ipc":
client, err = gethRPC.DialIPC(ctx, endpoint.Url)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("no known transport for URL scheme %q", u.Scheme)
}
return client, nil
}

View File

@@ -6,6 +6,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/network/authorization"
"github.com/prysmaticlabs/prysm/v4/testing/assert"
"github.com/prysmaticlabs/prysm/v4/testing/require"
logTest "github.com/sirupsen/logrus/hooks/test"
)
func TestToHeaderValue(t *testing.T) {
@@ -140,3 +141,68 @@ func TestAuthorizationDataEquals(t *testing.T) {
assert.Equal(t, false, d.Equals(other))
})
}
func TestHttpEndpoint(t *testing.T) {
hook := logTest.NewGlobal()
url := "http://test"
t.Run("URL", func(t *testing.T) {
endpoint := HttpEndpoint(url)
assert.Equal(t, url, endpoint.Url)
assert.Equal(t, authorization.None, endpoint.Auth.Method)
})
t.Run("URL with separator", func(t *testing.T) {
endpoint := HttpEndpoint(url + ",")
assert.Equal(t, url, endpoint.Url)
assert.Equal(t, authorization.None, endpoint.Auth.Method)
})
t.Run("URL with whitespace", func(t *testing.T) {
endpoint := HttpEndpoint(" " + url + " ,")
assert.Equal(t, url, endpoint.Url)
assert.Equal(t, authorization.None, endpoint.Auth.Method)
})
t.Run("Basic auth", func(t *testing.T) {
endpoint := HttpEndpoint(url + ",Basic username:password")
assert.Equal(t, url, endpoint.Url)
assert.Equal(t, authorization.Basic, endpoint.Auth.Method)
assert.Equal(t, "dXNlcm5hbWU6cGFzc3dvcmQ=", endpoint.Auth.Value)
})
t.Run("Basic auth with whitespace", func(t *testing.T) {
endpoint := HttpEndpoint(url + ", Basic username:password ")
assert.Equal(t, url, endpoint.Url)
assert.Equal(t, authorization.Basic, endpoint.Auth.Method)
assert.Equal(t, "dXNlcm5hbWU6cGFzc3dvcmQ=", endpoint.Auth.Value)
})
t.Run("Basic auth with incorrect format", func(t *testing.T) {
hook.Reset()
endpoint := HttpEndpoint(url + ",Basic username:password foo")
assert.Equal(t, url, endpoint.Url)
assert.Equal(t, authorization.None, endpoint.Auth.Method)
assert.LogsContain(t, hook, "Skipping authorization")
})
t.Run("Bearer auth", func(t *testing.T) {
endpoint := HttpEndpoint(url + ",Bearer token")
assert.Equal(t, url, endpoint.Url)
assert.Equal(t, authorization.Bearer, endpoint.Auth.Method)
assert.Equal(t, "token", endpoint.Auth.Value)
})
t.Run("Bearer auth with whitespace", func(t *testing.T) {
endpoint := HttpEndpoint(url + ", Bearer token ")
assert.Equal(t, url, endpoint.Url)
assert.Equal(t, authorization.Bearer, endpoint.Auth.Method)
assert.Equal(t, "token", endpoint.Auth.Value)
})
t.Run("Bearer auth with incorrect format", func(t *testing.T) {
hook.Reset()
endpoint := HttpEndpoint(url + ",Bearer token foo")
assert.Equal(t, url, endpoint.Url)
assert.Equal(t, authorization.None, endpoint.Auth.Method)
assert.LogsContain(t, hook, "Skipping authorization")
})
t.Run("Too many separators", func(t *testing.T) {
endpoint := HttpEndpoint(url + ",Bearer token,foo")
assert.Equal(t, url, endpoint.Url)
assert.Equal(t, authorization.None, endpoint.Auth.Method)
assert.LogsContain(t, hook, "Skipping authorization")
})
}

View File

@@ -4,6 +4,7 @@ load("@prysm//tools/go:def.bzl", "go_test")
# gazelle:exclude mainnet_e2e_test.go
# gazelle:exclude mainnet_scenario_e2e_test.go
# gazelle:exclude minimal_scenario_e2e_test.go
# gazelle:exclude minimal_builder_e2e_test.go
# Presubmit tests represent the group of endtoend tests that are run on pull
# requests and must be passing before a pull request can merge.
@@ -25,6 +26,7 @@ test_suite(
"manual",
],
tests = [
":go_builder_test",
":go_mainnet_test",
],
)
@@ -117,6 +119,39 @@ go_test(
deps = common_deps,
)
# gazelle:ignore
go_test(
name = "go_builder_test",
size = "large",
testonly = True,
srcs = [
"component_handler_test.go",
"endtoend_setup_test.go",
"endtoend_test.go",
"minimal_builder_e2e_test.go",
],
args = ["-test.v"],
data = [
"//:prysm_sh",
"//cmd/beacon-chain",
"//cmd/validator",
"//config/params:custom_configs",
"//tools/bootnode",
"@com_github_ethereum_go_ethereum//cmd/geth",
"@web3signer",
],
eth_network = "minimal",
flaky = True,
shard_count = 2,
tags = [
"e2e",
"manual",
"minimal",
"requires-network",
],
deps = common_deps,
)
go_test(
name = "go_mainnet_test",
size = "large",

View File

@@ -28,6 +28,7 @@ type componentHandler struct {
web3Signer e2etypes.ComponentRunner
bootnode e2etypes.ComponentRunner
eth1Miner e2etypes.ComponentRunner
builders e2etypes.MultipleComponentRunners
eth1Proxy e2etypes.MultipleComponentRunners
eth1Nodes e2etypes.MultipleComponentRunners
beaconNodes e2etypes.MultipleComponentRunners
@@ -137,23 +138,47 @@ func (c *componentHandler) setup() {
if config.TestCheckpointSync {
appendDebugEndpoints(config)
}
// Proxies
proxies := eth1.NewProxySet()
g.Go(func() error {
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{eth1Nodes}); err != nil {
return errors.Wrap(err, "proxies require execution nodes to run")
}
if err := proxies.Start(ctx); err != nil {
return errors.Wrap(err, "failed to start proxies")
}
return nil
})
c.eth1Proxy = proxies
var builders *components.BuilderSet
var proxies *eth1.ProxySet
if config.UseBuilder {
// Builder
builders = components.NewBuilderSet()
g.Go(func() error {
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{eth1Nodes}); err != nil {
return errors.Wrap(err, "builders require execution nodes to run")
}
if err := builders.Start(ctx); err != nil {
return errors.Wrap(err, "failed to start builders")
}
return nil
})
c.builders = builders
} else {
// Proxies
proxies = eth1.NewProxySet()
g.Go(func() error {
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{eth1Nodes}); err != nil {
return errors.Wrap(err, "proxies require execution nodes to run")
}
if err := proxies.Start(ctx); err != nil {
return errors.Wrap(err, "failed to start proxies")
}
return nil
})
c.eth1Proxy = proxies
}
// Beacon nodes.
beaconNodes := components.NewBeaconNodes(config)
g.Go(func() error {
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{eth1Nodes, proxies, bootNode}); err != nil {
wantedComponents := []e2etypes.ComponentRunner{eth1Nodes, bootNode}
if config.UseBuilder {
wantedComponents = append(wantedComponents, builders)
} else {
wantedComponents = append(wantedComponents, proxies)
}
if err := helpers.ComponentsStarted(ctx, wantedComponents); err != nil {
return errors.Wrap(err, "beacon nodes require proxies, execution and boot node to run")
}
beaconNodes.SetENR(bootNode.ENR())
@@ -215,7 +240,12 @@ func (c *componentHandler) setup() {
func (c *componentHandler) required() []e2etypes.ComponentRunner {
multiClientActive := e2e.TestParams.LighthouseBeaconNodeCount > 0
requiredComponents := []e2etypes.ComponentRunner{
c.tracingSink, c.eth1Nodes, c.bootnode, c.beaconNodes, c.validatorNodes, c.eth1Proxy,
c.tracingSink, c.eth1Nodes, c.bootnode, c.beaconNodes, c.validatorNodes,
}
if c.cfg.UseBuilder {
requiredComponents = append(requiredComponents, c.builders)
} else {
requiredComponents = append(requiredComponents, c.eth1Proxy)
}
if multiClientActive {
requiredComponents = append(requiredComponents, []e2etypes.ComponentRunner{c.keygen, c.lighthouseBeaconNodes, c.lighthouseValidatorNodes}...)

View File

@@ -6,6 +6,7 @@ go_library(
srcs = [
"beacon_node.go",
"boot_node.go",
"builder.go",
"lighthouse_beacon.go",
"lighthouse_validator.go",
"log.go",
@@ -35,6 +36,7 @@ go_library(
"//testing/endtoend/helpers:go_default_library",
"//testing/endtoend/params:go_default_library",
"//testing/endtoend/types:go_default_library",
"//testing/middleware/builder:go_default_library",
"//validator/keymanager:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",

View File

@@ -282,6 +282,9 @@ func (node *BeaconNode) Start(ctx context.Context) error {
if !config.TestFeature || index%2 == 0 {
args = append(args, features.E2EBeaconChainFlags...)
}
if config.UseBuilder {
args = append(args, fmt.Sprintf("--%s=%s:%d", flags.MevRelayEndpoint.Name, "http://127.0.0.1", e2e.TestParams.Ports.Eth1ProxyPort+index))
}
args = append(args, config.BeaconFlags...)
cmd := exec.CommandContext(ctx, binaryPath, args...) // #nosec G204 -- Safe

View File

@@ -0,0 +1,213 @@
package components
import (
"context"
"encoding/hex"
"fmt"
"os"
"path"
"strconv"
"strings"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/io/file"
"github.com/prysmaticlabs/prysm/v4/testing/endtoend/helpers"
e2e "github.com/prysmaticlabs/prysm/v4/testing/endtoend/params"
e2etypes "github.com/prysmaticlabs/prysm/v4/testing/endtoend/types"
"github.com/prysmaticlabs/prysm/v4/testing/middleware/builder"
"github.com/sirupsen/logrus"
)
// BuilderSet represents a set of builders for the validators running via a relay.
type BuilderSet struct {
e2etypes.ComponentRunner
started chan struct{}
builders []e2etypes.ComponentRunner
}
// NewBuilderSet creates and returns a set of builders.
func NewBuilderSet() *BuilderSet {
return &BuilderSet{
started: make(chan struct{}, 1),
}
}
// Start starts all the builders in set.
func (s *BuilderSet) Start(ctx context.Context) error {
totalNodeCount := e2e.TestParams.BeaconNodeCount + e2e.TestParams.LighthouseBeaconNodeCount
nodes := make([]e2etypes.ComponentRunner, totalNodeCount)
for i := 0; i < totalNodeCount; i++ {
nodes[i] = NewBuilder(i)
}
s.builders = nodes
// Wait for all nodes to finish their job (blocking).
// Once nodes are ready passed in handler function will be called.
return helpers.WaitOnNodes(ctx, nodes, func() {
// All nodes started, close channel, so that all services waiting on a set, can proceed.
close(s.started)
})
}
// Started checks whether builder set is started and all builders are ready to be queried.
func (s *BuilderSet) Started() <-chan struct{} {
return s.started
}
// Pause pauses the component and its underlying process.
func (s *BuilderSet) Pause() error {
for _, n := range s.builders {
if err := n.Pause(); err != nil {
return err
}
}
return nil
}
// Resume resumes the component and its underlying process.
func (s *BuilderSet) Resume() error {
for _, n := range s.builders {
if err := n.Resume(); err != nil {
return err
}
}
return nil
}
// Stop stops the component and its underlying process.
func (s *BuilderSet) Stop() error {
for _, n := range s.builders {
if err := n.Stop(); err != nil {
return err
}
}
return nil
}
// PauseAtIndex pauses the component and its underlying process at the desired index.
func (s *BuilderSet) PauseAtIndex(i int) error {
if i >= len(s.builders) {
return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.builders))
}
return s.builders[i].Pause()
}
// ResumeAtIndex resumes the component and its underlying process at the desired index.
func (s *BuilderSet) ResumeAtIndex(i int) error {
if i >= len(s.builders) {
return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.builders))
}
return s.builders[i].Resume()
}
// StopAtIndex stops the component and its underlying process at the desired index.
func (s *BuilderSet) StopAtIndex(i int) error {
if i >= len(s.builders) {
return errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.builders))
}
return s.builders[i].Stop()
}
// ComponentAtIndex returns the component at the provided index.
func (s *BuilderSet) ComponentAtIndex(i int) (e2etypes.ComponentRunner, error) {
if i >= len(s.builders) {
return nil, errors.Errorf("provided index exceeds slice size: %d >= %d", i, len(s.builders))
}
return s.builders[i], nil
}
// Builder represents a block builder.
type Builder struct {
e2etypes.ComponentRunner
started chan struct{}
index int
builder *builder.Builder
cancel func()
}
// NewBuilder creates and returns a builder.
func NewBuilder(index int) *Builder {
return &Builder{
started: make(chan struct{}, 1),
index: index,
}
}
// Start runs a builder.
func (node *Builder) Start(ctx context.Context) error {
f, err := os.Create(path.Join(e2e.TestParams.LogPath, "builder_"+strconv.Itoa(node.index)+".log"))
if err != nil {
return err
}
jwtPath := path.Join(e2e.TestParams.TestPath, "eth1data/"+strconv.Itoa(node.index)+"/")
if node.index == 0 {
jwtPath = path.Join(e2e.TestParams.TestPath, "eth1data/miner/")
}
jwtPath = path.Join(jwtPath, "geth/jwtsecret")
secret, err := parseJWTSecretFromFile(jwtPath)
if err != nil {
return err
}
opts := []builder.Option{
builder.WithDestinationAddress(fmt.Sprintf("http://127.0.0.1:%d", e2e.TestParams.Ports.Eth1AuthRPCPort+node.index)),
builder.WithPort(e2e.TestParams.Ports.Eth1ProxyPort + node.index),
builder.WithLogger(logrus.New()),
builder.WithLogFile(f),
builder.WithJwtSecret(string(secret)),
}
bd, err := builder.New(opts...)
if err != nil {
return err
}
log.Infof("Starting builder %d with port: %d and file %s", node.index, e2e.TestParams.Ports.Eth1ProxyPort+node.index, f.Name())
// Set cancel into context.
ctx, cancel := context.WithCancel(ctx)
node.cancel = cancel
node.builder = bd
// Mark node as ready.
close(node.started)
return bd.Start(ctx)
}
// Started checks whether the builder is started and ready to be queried.
func (node *Builder) Started() <-chan struct{} {
return node.started
}
// Pause pauses the component and its underlying process.
func (node *Builder) Pause() error {
// no-op
return nil
}
// Resume resumes the component and its underlying process.
func (node *Builder) Resume() error {
// no-op
return nil
}
// Stop kills the component and its underlying process.
func (node *Builder) Stop() error {
node.cancel()
return nil
}
func parseJWTSecretFromFile(jwtSecretFile string) ([]byte, error) {
enc, err := file.ReadFileAsBytes(jwtSecretFile)
if err != nil {
return nil, err
}
strData := strings.TrimSpace(string(enc))
if strData == "" {
return nil, fmt.Errorf("provided JWT secret in file %s cannot be empty", jwtSecretFile)
}
secret, err := hex.DecodeString(strings.TrimPrefix(strData, "0x"))
if err != nil {
return nil, err
}
if len(secret) < 32 {
return nil, errors.New("provided JWT secret should be a hex string of at least 32 bytes")
}
return secret, nil
}

View File

@@ -46,7 +46,11 @@ func WaitForBlocks(web3 *ethclient.Client, key *keystore.Key, blocksToWait uint6
finishBlock := block.NumberU64() + blocksToWait
for block.NumberU64() <= finishBlock {
spamTX := types.NewTransaction(nonce, key.Address, big.NewInt(0), params.SpamTxGasLimit, big.NewInt(1e6), []byte{})
gasPrice, err := web3.SuggestGasPrice(context.Background())
if err != nil {
return err
}
spamTX := types.NewTransaction(nonce, key.Address, big.NewInt(0), params.SpamTxGasLimit, gasPrice, []byte{})
signed, err := types.SignTx(spamTX, types.NewEIP155Signer(chainID), key.PrivateKey)
if err != nil {
return err

View File

@@ -70,7 +70,7 @@ func (t *TransactionGenerator) Start(ctx context.Context) error {
}
f := filler.NewFiller(rnd)
// Broadcast Transactions every 3 blocks
txPeriod := 3 * time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
txPeriod := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
ticker := time.NewTicker(txPeriod)
gasPrice := big.NewInt(1e11)
for {
@@ -99,16 +99,22 @@ func SendTransaction(client *rpc.Client, key *ecdsa.PrivateKey, f *filler.Filler
if err != nil {
return err
}
nonce, err := backend.NonceAt(context.Background(), sender, big.NewInt(-1))
nonce, err := backend.PendingNonceAt(context.Background(), sender)
if err != nil {
return err
}
expectedPrice, err := backend.SuggestGasPrice(context.Background())
if err != nil {
return err
}
if expectedPrice.Cmp(gasPrice) > 0 {
gasPrice = expectedPrice
}
g, _ := errgroup.WithContext(context.Background())
for i := uint64(0); i < N; i++ {
index := i
g.Go(func() error {
tx, err := txfuzz.RandomValidTx(client, f, sender, nonce+index, gasPrice, nil, al)
tx, err := txfuzz.RandomValidTx(client, f, sender, nonce+index, expectedPrice, nil, al)
if err != nil {
// In the event the transaction constructed is not valid, we continue with the routine
// rather than complete stop it.

View File

@@ -264,6 +264,9 @@ func (v *ValidatorNode) Start(ctx context.Context) error {
fmt.Sprintf("--%s=%d", flags.InteropStartIndex.Name, offset),
)
}
if v.config.UseBuilder {
args = append(args, fmt.Sprintf("--%s", flags.EnableBuilderFlag.Name))
}
args = append(args, config.ValidatorFlags...)
if v.config.UsePrysmShValidator {

View File

@@ -86,6 +86,9 @@ func e2eMinimal(t *testing.T, v int, cfgo ...types.E2EConfigOpt) *testRunner {
for _, o := range cfgo {
o(testConfig)
}
if testConfig.UseBuilder {
testConfig.Evaluators = append(testConfig.Evaluators, ev.BuilderIsActive)
}
return newTestRunner(t, testConfig)
}
@@ -165,6 +168,9 @@ func e2eMainnet(t *testing.T, usePrysmSh, useMultiClient bool, cfg *params.Beaco
if testConfig.UseValidatorCrossClient {
testConfig.Evaluators = append(testConfig.Evaluators, beaconapi_evaluators.BeaconAPIMultiClientVerifyIntegrity)
}
if testConfig.UseBuilder {
testConfig.Evaluators = append(testConfig.Evaluators, ev.BuilderIsActive)
}
return newTestRunner(t, testConfig)
}

View File

@@ -6,6 +6,7 @@ go_library(
srcs = [
"api_gateway_v1alpha1.go",
"api_middleware.go",
"builder.go",
"data.go",
"execution_engine.go",
"fee_recipient.go",

View File

@@ -0,0 +1,103 @@
package evaluators
import (
"context"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/testing/endtoend/helpers"
"github.com/prysmaticlabs/prysm/v4/testing/endtoend/policies"
e2etypes "github.com/prysmaticlabs/prysm/v4/testing/endtoend/types"
"github.com/prysmaticlabs/prysm/v4/time/slots"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
)
// BuilderIsActive checks that the builder is indeed producing the respective payloads
var BuilderIsActive = e2etypes.Evaluator{
Name: "builder_is_active_at_epoch_%d",
Policy: policies.OnwardsNthEpoch(helpers.BellatrixE2EForkEpoch),
Evaluation: builderActive,
}
func builderActive(_ *e2etypes.EvaluationContext, conns ...*grpc.ClientConn) error {
conn := conns[0]
client := ethpb.NewNodeClient(conn)
beaconClient := ethpb.NewBeaconChainClient(conn)
genesis, err := client.GetGenesis(context.Background(), &emptypb.Empty{})
if err != nil {
return errors.Wrap(err, "failed to get genesis data")
}
currSlot := slots.CurrentSlot(uint64(genesis.GenesisTime.AsTime().Unix()))
currEpoch := slots.ToEpoch(currSlot)
lowestBound := primitives.Epoch(0)
if currEpoch >= 1 {
lowestBound = currEpoch - 1
}
if lowestBound < helpers.BellatrixE2EForkEpoch {
lowestBound = helpers.BellatrixE2EForkEpoch
}
blockCtrs, err := beaconClient.ListBeaconBlocks(context.Background(), &ethpb.ListBlocksRequest{QueryFilter: &ethpb.ListBlocksRequest_Epoch{Epoch: lowestBound}})
if err != nil {
return errors.Wrap(err, "failed to get beacon blocks")
}
for _, ctr := range blockCtrs.BlockContainers {
b, err := syncCompatibleBlockFromCtr(ctr)
if err != nil {
return errors.Wrapf(err, "block type doesn't exist for block at epoch %d", lowestBound)
}
if b.IsNil() {
return errors.New("nil block provided")
}
forkStartSlot, err := slots.EpochStart(helpers.BellatrixE2EForkEpoch)
if err != nil {
return err
}
if forkStartSlot == b.Block().Slot() || forkStartSlot+1 == b.Block().Slot() {
// Skip fork slot and the next one, as we don't send FCUs yet.
continue
}
execPayload, err := b.Block().Body().Execution()
if err != nil {
return err
}
if string(execPayload.ExtraData()) != "prysm-builder" {
return errors.Errorf("block with slot %d was not built by the builder. It has an extra data of %s", b.Block().Slot(), string(execPayload.ExtraData()))
}
}
if lowestBound == currEpoch {
return nil
}
blockCtrs, err = beaconClient.ListBeaconBlocks(context.Background(), &ethpb.ListBlocksRequest{QueryFilter: &ethpb.ListBlocksRequest_Epoch{Epoch: currEpoch}})
if err != nil {
return errors.Wrap(err, "failed to get validator participation")
}
for _, ctr := range blockCtrs.BlockContainers {
b, err := syncCompatibleBlockFromCtr(ctr)
if err != nil {
return errors.Wrapf(err, "block type doesn't exist for block at epoch %d", lowestBound)
}
if b.IsNil() {
return errors.New("nil block provided")
}
forkStartSlot, err := slots.EpochStart(helpers.BellatrixE2EForkEpoch)
if err != nil {
return err
}
if forkStartSlot == b.Block().Slot() || forkStartSlot+1 == b.Block().Slot() {
// Skip fork slot and the next one, as we don't send FCUs yet.
continue
}
execPayload, err := b.Block().Body().Execution()
if err != nil {
return err
}
if string(execPayload.ExtraData()) != "prysm-builder" {
return errors.Errorf("block with slot %d was not built by the builder. It has an extra data of %s", b.Block().Slot(), string(execPayload.ExtraData()))
}
}
return nil
}

View File

@@ -0,0 +1,13 @@
package endtoend
import (
"testing"
"github.com/prysmaticlabs/prysm/v4/runtime/version"
"github.com/prysmaticlabs/prysm/v4/testing/endtoend/types"
)
func TestEndToEnd_MinimalConfig_WithBuilder(t *testing.T) {
r := e2eMinimal(t, version.Phase0, types.WithCheckpointSync(), types.WithBuilder())
r.run()
}

View File

@@ -44,6 +44,12 @@ func WithValidatorRESTApi() E2EConfigOpt {
}
}
func WithBuilder() E2EConfigOpt {
return func(cfg *E2EConfig) {
cfg.UseBuilder = true
}
}
// E2EConfig defines the struct for all configurations needed for E2E testing.
type E2EConfig struct {
TestCheckpointSync bool
@@ -56,6 +62,7 @@ type E2EConfig struct {
UseFixedPeerIDs bool
UseValidatorCrossClient bool
UseBeaconRestApi bool
UseBuilder bool
EpochsToRun uint64
Seed int64
TracingSinkEndpoint string

View File

@@ -0,0 +1,35 @@
load("@prysm//tools/go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = [
"builder.go",
"options.go",
],
importpath = "github.com/prysmaticlabs/prysm/v4/testing/middleware/builder",
visibility = ["//visibility:public"],
deps = [
"//api/client/builder:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//crypto/bls:go_default_library",
"//encoding/bytesutil:go_default_library",
"//math:go_default_library",
"//network:go_default_library",
"//network/authorization:go_default_library",
"//proto/engine/v1:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"@com_github_ethereum_go_ethereum//beacon/engine:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
"@com_github_ethereum_go_ethereum//rpc:go_default_library",
"@com_github_ethereum_go_ethereum//trie:go_default_library",
"@com_github_gorilla_mux//:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
)

View File

@@ -0,0 +1,659 @@
package builder
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"math/big"
"net"
"net/http"
"strconv"
"strings"
"time"
"github.com/ethereum/go-ethereum/beacon/engine"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
gethTypes "github.com/ethereum/go-ethereum/core/types"
gethRPC "github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/trie"
gMux "github.com/gorilla/mux"
builderAPI "github.com/prysmaticlabs/prysm/v4/api/client/builder"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/signing"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
types "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/crypto/bls"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v4/math"
"github.com/prysmaticlabs/prysm/v4/network"
"github.com/prysmaticlabs/prysm/v4/network/authorization"
v1 "github.com/prysmaticlabs/prysm/v4/proto/engine/v1"
eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/sirupsen/logrus"
)
const (
statusPath = "/eth/v1/builder/status"
registerPath = "/eth/v1/builder/validators"
headerPath = "/eth/v1/builder/header/{slot:[0-9]+}/{parent_hash:0x[a-fA-F0-9]+}/{pubkey:0x[a-fA-F0-9]+}"
blindedPath = "/eth/v1/builder/blinded_blocks"
// ForkchoiceUpdatedMethod v1 request string for JSON-RPC.
ForkchoiceUpdatedMethod = "engine_forkchoiceUpdatedV1"
// ForkchoiceUpdatedMethodV2 v2 request string for JSON-RPC.
ForkchoiceUpdatedMethodV2 = "engine_forkchoiceUpdatedV2"
// GetPayloadMethod v1 request string for JSON-RPC.
GetPayloadMethod = "engine_getPayloadV1"
// GetPayloadMethodV2 v2 request string for JSON-RPC.
GetPayloadMethodV2 = "engine_getPayloadV2"
// ExchangeTransitionConfigurationMethod v1 request string for JSON-RPC.
)
var (
defaultBuilderHost = "127.0.0.1"
defaultBuilderPort = 8551
)
type jsonRPCObject struct {
Jsonrpc string `json:"jsonrpc"`
Method string `json:"method"`
Params []interface{} `json:"params"`
ID uint64 `json:"id"`
Result interface{} `json:"result"`
}
type ForkchoiceUpdatedResponse struct {
Jsonrpc string `json:"jsonrpc"`
Method string `json:"method"`
Params []interface{} `json:"params"`
ID uint64 `json:"id"`
Result struct {
Status *v1.PayloadStatus `json:"payloadStatus"`
PayloadId *v1.PayloadIDBytes `json:"payloadId"`
} `json:"result"`
}
type ExecPayloadResponse struct {
Version string `json:"version"`
Data *v1.ExecutionPayload `json:"data"`
}
type ExecHeaderResponseCapella struct {
Version string `json:"version"`
Data struct {
Signature hexutil.Bytes `json:"signature"`
Message *builderAPI.BuilderBidCapella `json:"message"`
} `json:"data"`
}
type Builder struct {
cfg *config
address string
execClient *gethRPC.Client
currId *v1.PayloadIDBytes
currPayload interfaces.ExecutionData
mux *gMux.Router
validatorMap map[string]*eth.ValidatorRegistrationV1
srv *http.Server
}
// New creates a proxy server forwarding requests from a consensus client to an execution client.
func New(opts ...Option) (*Builder, error) {
p := &Builder{
cfg: &config{
builderPort: defaultBuilderPort,
builderHost: defaultBuilderHost,
logger: logrus.New(),
},
}
for _, o := range opts {
if err := o(p); err != nil {
return nil, err
}
}
if p.cfg.destinationUrl == nil {
return nil, errors.New("must provide a destination address for request proxying")
}
endpoint := network.HttpEndpoint(p.cfg.destinationUrl.String())
endpoint.Auth.Method = authorization.Bearer
endpoint.Auth.Value = p.cfg.secret
execClient, err := network.NewExecutionRPCClient(context.Background(), endpoint)
if err != nil {
return nil, err
}
mux := http.NewServeMux()
mux.Handle("/", p)
router := gMux.NewRouter()
router.HandleFunc(statusPath, func(writer http.ResponseWriter, request *http.Request) {
writer.WriteHeader(http.StatusOK)
})
router.HandleFunc(registerPath, p.registerValidators)
router.HandleFunc(headerPath, p.handleHeaderRequest)
router.HandleFunc(blindedPath, p.handleBlindedBlock)
addr := fmt.Sprintf("%s:%d", p.cfg.builderHost, p.cfg.builderPort)
srv := &http.Server{
Handler: mux,
Addr: addr,
ReadHeaderTimeout: time.Second,
}
p.address = addr
p.srv = srv
p.execClient = execClient
p.validatorMap = map[string]*eth.ValidatorRegistrationV1{}
p.mux = router
return p, nil
}
// Address for the proxy server.
func (p *Builder) Address() string {
return p.address
}
// Start a proxy server.
func (p *Builder) Start(ctx context.Context) error {
p.srv.BaseContext = func(listener net.Listener) context.Context {
return ctx
}
p.cfg.logger.WithFields(logrus.Fields{
"executionAddress": p.cfg.destinationUrl.String(),
}).Infof("Builder now listening on address %s", p.address)
go func() {
if err := p.srv.ListenAndServe(); err != nil {
p.cfg.logger.Error(err)
}
}()
for {
<-ctx.Done()
return p.srv.Shutdown(context.Background())
}
}
// ServeHTTP requests from a consensus client to an execution client, modifying in-flight requests
// and/or responses as desired. It also processes any backed-up requests.
func (p *Builder) ServeHTTP(w http.ResponseWriter, r *http.Request) {
p.cfg.logger.Infof("Received %s request from beacon with url: %s", r.Method, r.URL.Path)
if p.isBuilderCall(r) {
p.mux.ServeHTTP(w, r)
return
}
requestBytes, err := parseRequestBytes(r)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not parse request")
return
}
execRes, err := p.sendHttpRequest(r, requestBytes)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not forward request")
return
}
p.cfg.logger.Infof("Received response for %s request with method %s from %s", r.Method, r.Method, p.cfg.destinationUrl.String())
defer func() {
if err = execRes.Body.Close(); err != nil {
p.cfg.logger.WithError(err).Error("Could not do close proxy responseGen body")
}
}()
buf := bytes.NewBuffer([]byte{})
if _, err = io.Copy(buf, execRes.Body); err != nil {
p.cfg.logger.WithError(err).Error("Could not copy proxy request body")
return
}
byteResp := bytesutil.SafeCopyBytes(buf.Bytes())
p.handleEngineCalls(requestBytes, byteResp)
// Pipe the proxy responseGen to the original caller.
if _, err = io.Copy(w, buf); err != nil {
p.cfg.logger.WithError(err).Error("Could not copy proxy request body")
return
}
}
func (p *Builder) handleEngineCalls(req, resp []byte) {
if !isEngineAPICall(req) {
return
}
rpcObj, err := unmarshalRPCObject(req)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not unmarshal rpc object")
return
}
p.cfg.logger.Infof("Received engine call %s", rpcObj.Method)
switch rpcObj.Method {
case ForkchoiceUpdatedMethod, ForkchoiceUpdatedMethodV2:
result := &ForkchoiceUpdatedResponse{}
err = json.Unmarshal(resp, result)
if err != nil {
p.cfg.logger.Errorf("Could not unmarshal fcu: %v", err)
return
}
p.currId = result.Result.PayloadId
p.cfg.logger.Infof("Received payload id of %#x", result.Result.PayloadId)
}
}
func (p *Builder) isBuilderCall(req *http.Request) bool {
return strings.Contains(req.URL.Path, "/eth/v1/builder/")
}
func (p *Builder) registerValidators(w http.ResponseWriter, req *http.Request) {
registrations := []builderAPI.SignedValidatorRegistration{}
if err := json.NewDecoder(req.Body).Decode(&registrations); err != nil {
http.Error(w, "invalid request", http.StatusBadRequest)
return
}
for _, r := range registrations {
msg := r.Message
p.validatorMap[string(r.Message.Pubkey)] = msg
}
// TODO: Verify Signatures from validators
w.WriteHeader(http.StatusOK)
}
func (p *Builder) handleHeaderRequest(w http.ResponseWriter, req *http.Request) {
urlParams := gMux.Vars(req)
pHash := urlParams["parent_hash"]
if pHash == "" {
http.Error(w, "no valid parent hash", http.StatusBadRequest)
return
}
reqSlot := urlParams["slot"]
if reqSlot == "" {
http.Error(w, "no valid slot provided", http.StatusBadRequest)
return
}
slot, err := strconv.Atoi(reqSlot)
if err != nil {
http.Error(w, "invalid slot provided", http.StatusBadRequest)
return
}
ax := types.Slot(slot)
currEpoch := types.Epoch(ax / params.BeaconConfig().SlotsPerEpoch)
if currEpoch >= params.BeaconConfig().CapellaForkEpoch {
p.handleHeadeRequestCapella(w)
return
}
b, err := p.retrievePendingBlock()
if err != nil {
p.cfg.logger.WithError(err).Error("Could not retrieve pending block")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
secKey, err := bls.RandKey()
if err != nil {
p.cfg.logger.WithError(err).Error("Could not retrieve secret key")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
wObj, err := blocks.WrappedExecutionPayload(b)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not wrap execution payload")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
hdr, err := blocks.PayloadToHeader(wObj)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not make payload into header")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
gEth := big.NewInt(int64(params.BeaconConfig().GweiPerEth))
weiEth := gEth.Mul(gEth, gEth)
val := builderAPI.Uint256{Int: weiEth}
wrappedHdr := &builderAPI.ExecutionPayloadHeader{ExecutionPayloadHeader: hdr}
bid := &builderAPI.BuilderBid{
Header: wrappedHdr,
Value: val,
Pubkey: secKey.PublicKey().Marshal(),
}
sszBid := &eth.BuilderBid{
Header: hdr,
Value: val.SSZBytes(),
Pubkey: secKey.PublicKey().Marshal(),
}
d, err := signing.ComputeDomain(params.BeaconConfig().DomainApplicationBuilder,
nil, /* fork version */
nil /* genesis val root */)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not compute the domain")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
rt, err := signing.ComputeSigningRoot(sszBid, d)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not compute the signing root")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
sig := secKey.Sign(rt[:])
hdrResp := &builderAPI.ExecHeaderResponse{
Version: "bellatrix",
Data: struct {
Signature hexutil.Bytes `json:"signature"`
Message *builderAPI.BuilderBid `json:"message"`
}{
Signature: sig.Marshal(),
Message: bid,
},
}
err = json.NewEncoder(w).Encode(hdrResp)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not encode response")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
p.currPayload = wObj
w.WriteHeader(http.StatusOK)
}
func (p *Builder) handleHeadeRequestCapella(w http.ResponseWriter) {
b, err := p.retrievePendingBlockCapella()
if err != nil {
p.cfg.logger.WithError(err).Error("Could not retrieve pending block")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
secKey, err := bls.RandKey()
if err != nil {
p.cfg.logger.WithError(err).Error("Could not retrieve secret key")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
v := big.NewInt(0).SetBytes(bytesutil.ReverseByteOrder(b.Value))
v = v.Mul(v, big.NewInt(2))
// Is used as the helper modifies the big.Int
weiVal := big.NewInt(0).SetBytes(bytesutil.ReverseByteOrder(b.Value))
weiVal = weiVal.Mul(weiVal, big.NewInt(2))
wObj, err := blocks.WrappedExecutionPayloadCapella(b.Payload, math.WeiToGwei(weiVal))
if err != nil {
p.cfg.logger.WithError(err).Error("Could not wrap execution payload")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
hdr, err := blocks.PayloadToHeaderCapella(wObj)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not make payload into header")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
val := builderAPI.Uint256{Int: v}
wrappedHdr := &builderAPI.ExecutionPayloadHeaderCapella{ExecutionPayloadHeaderCapella: hdr}
bid := &builderAPI.BuilderBidCapella{
Header: wrappedHdr,
Value: val,
Pubkey: secKey.PublicKey().Marshal(),
}
sszBid := &eth.BuilderBidCapella{
Header: hdr,
Value: val.SSZBytes(),
Pubkey: secKey.PublicKey().Marshal(),
}
d, err := signing.ComputeDomain(params.BeaconConfig().DomainApplicationBuilder,
nil, /* fork version */
nil /* genesis val root */)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not compute the domain")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
rt, err := signing.ComputeSigningRoot(sszBid, d)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not compute the signing root")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
sig := secKey.Sign(rt[:])
hdrResp := &ExecHeaderResponseCapella{
Version: "capella",
Data: struct {
Signature hexutil.Bytes `json:"signature"`
Message *builderAPI.BuilderBidCapella `json:"message"`
}{
Signature: sig.Marshal(),
Message: bid,
},
}
err = json.NewEncoder(w).Encode(hdrResp)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not encode response")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
p.currPayload = wObj
w.WriteHeader(http.StatusOK)
}
func (p *Builder) handleBlindedBlock(w http.ResponseWriter, req *http.Request) {
sb := &builderAPI.SignedBlindedBeaconBlockBellatrix{
SignedBlindedBeaconBlockBellatrix: &eth.SignedBlindedBeaconBlockBellatrix{},
}
err := json.NewDecoder(req.Body).Decode(sb)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not decode blinded block")
// TODO: Allow the method to unmarshal blinded blocks correctly
}
if p.currPayload == nil {
p.cfg.logger.Error("No payload is cached")
http.Error(w, "payload not found", http.StatusInternalServerError)
return
}
if payload, err := p.currPayload.PbCapella(); err == nil {
convertedPayload, err := builderAPI.FromProtoCapella(payload)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not convert the payload")
http.Error(w, "payload not found", http.StatusInternalServerError)
return
}
execResp := &builderAPI.ExecPayloadResponseCapella{
Version: "capella",
Data: convertedPayload,
}
err = json.NewEncoder(w).Encode(execResp)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not encode full payload response")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
return
}
bellPayload, err := p.currPayload.PbBellatrix()
if err != nil {
p.cfg.logger.WithError(err).Error("Could not retrieve the payload")
http.Error(w, "payload not found", http.StatusInternalServerError)
return
}
convertedPayload, err := builderAPI.FromProto(bellPayload)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not convert the payload")
http.Error(w, "payload not found", http.StatusInternalServerError)
return
}
execResp := &builderAPI.ExecPayloadResponse{
Version: "bellatrix",
Data: convertedPayload,
}
err = json.NewEncoder(w).Encode(execResp)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not encode full payload response")
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
func (p *Builder) retrievePendingBlock() (*v1.ExecutionPayload, error) {
result := &engine.ExecutableData{}
if p.currId == nil {
return nil, errors.New("no payload id is cached")
}
err := p.execClient.CallContext(context.Background(), result, GetPayloadMethod, *p.currId)
if err != nil {
return nil, err
}
payloadEnv, err := modifyExecutionPayload(*result, big.NewInt(0))
if err != nil {
return nil, err
}
marshalledOutput, err := payloadEnv.ExecutionPayload.MarshalJSON()
if err != nil {
return nil, err
}
bellatrixPayload := &v1.ExecutionPayload{}
if err = json.Unmarshal(marshalledOutput, bellatrixPayload); err != nil {
return nil, err
}
return bellatrixPayload, nil
}
func (p *Builder) retrievePendingBlockCapella() (*v1.ExecutionPayloadCapellaWithValue, error) {
result := &engine.ExecutionPayloadEnvelope{}
if p.currId == nil {
return nil, errors.New("no payload id is cached")
}
err := p.execClient.CallContext(context.Background(), result, GetPayloadMethodV2, *p.currId)
if err != nil {
return nil, err
}
payloadEnv, err := modifyExecutionPayload(*result.ExecutionPayload, result.BlockValue)
if err != nil {
return nil, err
}
marshalledOutput, err := payloadEnv.MarshalJSON()
if err != nil {
return nil, err
}
capellaPayload := &v1.ExecutionPayloadCapellaWithValue{}
if err = json.Unmarshal(marshalledOutput, capellaPayload); err != nil {
return nil, err
}
return capellaPayload, nil
}
func (p *Builder) sendHttpRequest(req *http.Request, requestBytes []byte) (*http.Response, error) {
proxyReq, err := http.NewRequest(req.Method, p.cfg.destinationUrl.String(), req.Body)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not create new request")
return nil, err
}
// Set the modified request as the proxy request body.
proxyReq.Body = ioutil.NopCloser(bytes.NewBuffer(requestBytes))
// Required proxy headers for forwarding JSON-RPC requests to the execution client.
proxyReq.Header.Set("Host", req.Host)
proxyReq.Header.Set("X-Forwarded-For", req.RemoteAddr)
proxyReq.Header.Set("Content-Type", "application/json")
client := &http.Client{}
if p.cfg.secret != "" {
client = network.NewHttpClientWithSecret(p.cfg.secret)
}
proxyRes, err := client.Do(proxyReq)
if err != nil {
p.cfg.logger.WithError(err).Error("Could not forward request to destination server")
return nil, err
}
return proxyRes, nil
}
// Peek into the bytes of an HTTP request's body.
func parseRequestBytes(req *http.Request) ([]byte, error) {
requestBytes, err := ioutil.ReadAll(req.Body)
if err != nil {
return nil, err
}
if err = req.Body.Close(); err != nil {
return nil, err
}
req.Body = ioutil.NopCloser(bytes.NewBuffer(requestBytes))
return requestBytes, nil
}
// Checks whether the JSON-RPC request is for the Ethereum engine API.
func isEngineAPICall(reqBytes []byte) bool {
jsonRequest, err := unmarshalRPCObject(reqBytes)
if err != nil {
switch {
case strings.Contains(err.Error(), "cannot unmarshal array"):
return false
default:
return false
}
}
return strings.Contains(jsonRequest.Method, "engine_")
}
func unmarshalRPCObject(b []byte) (*jsonRPCObject, error) {
r := &jsonRPCObject{}
if err := json.Unmarshal(b, r); err != nil {
return nil, err
}
return r, nil
}
func modifyExecutionPayload(execPayload engine.ExecutableData, fees *big.Int) (*engine.ExecutionPayloadEnvelope, error) {
modifiedBlock, err := executableDataToBlock(execPayload)
if err != nil {
return &engine.ExecutionPayloadEnvelope{}, err
}
return engine.BlockToExecutableData(modifiedBlock, fees), nil
}
// This modifies the provided payload to imprint the builder's extra data
func executableDataToBlock(params engine.ExecutableData) (*gethTypes.Block, error) {
txs, err := decodeTransactions(params.Transactions)
if err != nil {
return nil, err
}
// Only set withdrawalsRoot if it is non-nil. This allows CLs to use
// ExecutableData before withdrawals are enabled by marshaling
// Withdrawals as the json null value.
var withdrawalsRoot *common.Hash
if params.Withdrawals != nil {
h := gethTypes.DeriveSha(gethTypes.Withdrawals(params.Withdrawals), trie.NewStackTrie(nil))
withdrawalsRoot = &h
}
header := &gethTypes.Header{
ParentHash: params.ParentHash,
UncleHash: gethTypes.EmptyUncleHash,
Coinbase: params.FeeRecipient,
Root: params.StateRoot,
TxHash: gethTypes.DeriveSha(gethTypes.Transactions(txs), trie.NewStackTrie(nil)),
ReceiptHash: params.ReceiptsRoot,
Bloom: gethTypes.BytesToBloom(params.LogsBloom),
Difficulty: common.Big0,
Number: new(big.Int).SetUint64(params.Number),
GasLimit: params.GasLimit,
GasUsed: params.GasUsed,
Time: params.Timestamp,
BaseFee: params.BaseFeePerGas,
Extra: []byte("prysm-builder"), // add in extra data
MixDigest: params.Random,
WithdrawalsHash: withdrawalsRoot,
}
block := gethTypes.NewBlockWithHeader(header).WithBody(txs, nil /* uncles */).WithWithdrawals(params.Withdrawals)
return block, nil
}
func decodeTransactions(enc [][]byte) ([]*gethTypes.Transaction, error) {
var txs = make([]*gethTypes.Transaction, len(enc))
for i, encTx := range enc {
var tx gethTypes.Transaction
if err := tx.UnmarshalBinary(encTx); err != nil {
return nil, fmt.Errorf("invalid transaction %d: %v", i, err)
}
txs[i] = &tx
}
return txs, nil
}

View File

@@ -0,0 +1,79 @@
package builder
import (
"net/url"
"os"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
type config struct {
builderPort int
builderHost string
destinationUrl *url.URL
logger *logrus.Logger
secret string
}
type Option func(p *Builder) error
// WithHost sets the proxy server host.
func WithHost(host string) Option {
return func(p *Builder) error {
p.cfg.builderHost = host
return nil
}
}
// WithPort sets the proxy server port.
func WithPort(port int) Option {
return func(p *Builder) error {
p.cfg.builderPort = port
return nil
}
}
// WithDestinationAddress sets the forwarding address requests will be sent to.
func WithDestinationAddress(addr string) Option {
return func(p *Builder) error {
if addr == "" {
return errors.New("must provide a destination address for builder")
}
u, err := url.Parse(addr)
if err != nil {
return errors.Wrapf(err, "could not parse URL for destination address: %s", addr)
}
p.cfg.destinationUrl = u
return nil
}
}
// WithJwtSecret adds in support for jwt authenticated
// connections for our proxy.
func WithJwtSecret(secret string) Option {
return func(p *Builder) error {
p.cfg.secret = secret
return nil
}
}
// WithLogger sets a custom logger for the proxy.
func WithLogger(l *logrus.Logger) Option {
return func(p *Builder) error {
p.cfg.logger = l
return nil
}
}
// WithLogFile specifies a log file to write
// the proxies output to.
func WithLogFile(f *os.File) Option {
return func(p *Builder) error {
if p.cfg.logger == nil {
return errors.New("nil logger provided")
}
p.cfg.logger.SetOutput(f)
return nil
}
}

View File

@@ -1034,6 +1034,7 @@ func (v *validator) PushProposerSettings(ctx context.Context, km keymanager.IKey
return nil
}
func (v *validator) filterAndCacheActiveKeys(ctx context.Context, pubkeys [][fieldparams.BLSPubkeyLength]byte, slot primitives.Slot) ([][fieldparams.BLSPubkeyLength]byte, error) {
filteredKeys := make([][fieldparams.BLSPubkeyLength]byte, 0)
statusRequestKeys := make([][]byte, 0)