Compare commits

..

1 Commits

Author SHA1 Message Date
terence tsao
f75c3082d5 gloas: implement modified process withdrawals 2026-02-01 19:41:59 -08:00
96 changed files with 1746 additions and 1910 deletions

View File

@@ -2,7 +2,7 @@ name: Go
on:
push:
branches: [ master, develop ]
branches: [ master ]
pull_request:
branches: [ '*' ]
merge_group:

View File

@@ -33,8 +33,9 @@ formatters:
generated: lax
paths:
- validator/web/site_data.go
- .*_test.go
- proto
- tools/analyzers
- third_party$
- builtin$
- examples$
- examples$

View File

@@ -3,16 +3,13 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"grpc_connection_provider.go",
"grpcutils.go",
"log.go",
"mock_grpc_provider.go",
"parameters.go",
],
importpath = "github.com/OffchainLabs/prysm/v7/api/grpc",
visibility = ["//visibility:public"],
deps = [
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//metadata:go_default_library",
@@ -21,17 +18,12 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"grpc_connection_provider_test.go",
"grpcutils_test.go",
],
srcs = ["grpcutils_test.go"],
embed = [":go_default_library"],
deps = [
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//credentials/insecure:go_default_library",
"@org_golang_google_grpc//metadata:go_default_library",
],
)

View File

@@ -1,173 +0,0 @@
package grpc
import (
"context"
"strings"
"sync"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
// GrpcConnectionProvider manages gRPC connections for failover support.
// It allows switching between different beacon node endpoints when the current one becomes unavailable.
// Only one connection is maintained at a time - when switching hosts, the old connection is closed.
type GrpcConnectionProvider interface {
// CurrentConn returns the currently active gRPC connection.
// The connection is created lazily on first call.
// Returns nil if the provider has been closed.
CurrentConn() *grpc.ClientConn
// CurrentHost returns the address of the currently active endpoint.
CurrentHost() string
// Hosts returns all configured endpoint addresses.
Hosts() []string
// SwitchHost switches to the endpoint at the given index.
// The new connection is created lazily on next CurrentConn() call.
SwitchHost(index int) error
// Close closes the current connection.
Close()
}
type grpcConnectionProvider struct {
// Immutable after construction - no lock needed for reads
endpoints []string
ctx context.Context
dialOpts []grpc.DialOption
// Current connection state (protected by mutex)
currentIndex uint64
conn *grpc.ClientConn
mu sync.Mutex
closed bool
}
// NewGrpcConnectionProvider creates a new connection provider that manages gRPC connections.
// The endpoint parameter can be a comma-separated list of addresses (e.g., "host1:4000,host2:4000").
// Only one connection is maintained at a time, created lazily on first use.
func NewGrpcConnectionProvider(
ctx context.Context,
endpoint string,
dialOpts []grpc.DialOption,
) (GrpcConnectionProvider, error) {
endpoints := parseEndpoints(endpoint)
if len(endpoints) == 0 {
return nil, errors.New("no gRPC endpoints provided")
}
log.WithFields(logrus.Fields{
"endpoints": endpoints,
"count": len(endpoints),
}).Info("Initialized gRPC connection provider")
return &grpcConnectionProvider{
endpoints: endpoints,
ctx: ctx,
dialOpts: dialOpts,
}, nil
}
// parseEndpoints splits a comma-separated endpoint string into individual endpoints.
func parseEndpoints(endpoint string) []string {
if endpoint == "" {
return nil
}
endpoints := make([]string, 0, 1)
for p := range strings.SplitSeq(endpoint, ",") {
if p = strings.TrimSpace(p); p != "" {
endpoints = append(endpoints, p)
}
}
return endpoints
}
func (p *grpcConnectionProvider) CurrentConn() *grpc.ClientConn {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return nil
}
// Return existing connection if available
if p.conn != nil {
return p.conn
}
// Create connection lazily
ep := p.endpoints[p.currentIndex]
conn, err := grpc.DialContext(p.ctx, ep, p.dialOpts...)
if err != nil {
log.WithError(err).WithField("endpoint", ep).Error("Failed to create gRPC connection")
return nil
}
p.conn = conn
log.WithField("endpoint", ep).Debug("Created gRPC connection")
return conn
}
func (p *grpcConnectionProvider) CurrentHost() string {
p.mu.Lock()
defer p.mu.Unlock()
return p.endpoints[p.currentIndex]
}
func (p *grpcConnectionProvider) Hosts() []string {
// Return a copy to maintain immutability
hosts := make([]string, len(p.endpoints))
copy(hosts, p.endpoints)
return hosts
}
func (p *grpcConnectionProvider) SwitchHost(index int) error {
if index < 0 || index >= len(p.endpoints) {
return errors.Errorf("invalid host index %d, must be between 0 and %d", index, len(p.endpoints)-1)
}
p.mu.Lock()
defer p.mu.Unlock()
if uint64(index) == p.currentIndex {
return nil // Already on this host
}
oldHost := p.endpoints[p.currentIndex]
oldConn := p.conn
p.conn = nil // Clear immediately - new connection created lazily
p.currentIndex = uint64(index)
// Close old connection asynchronously to avoid blocking the caller
if oldConn != nil {
go func() {
if err := oldConn.Close(); err != nil {
log.WithError(err).WithField("endpoint", oldHost).Debug("Failed to close previous connection")
}
}()
}
log.WithFields(logrus.Fields{
"previousHost": oldHost,
"newHost": p.endpoints[index],
}).Debug("Switched gRPC endpoint")
return nil
}
func (p *grpcConnectionProvider) Close() {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return
}
p.closed = true
if p.conn != nil {
if err := p.conn.Close(); err != nil {
log.WithError(err).WithField("endpoint", p.endpoints[p.currentIndex]).Debug("Failed to close gRPC connection")
}
p.conn = nil
}
}

View File

@@ -1,207 +0,0 @@
package grpc
import (
"context"
"net"
"reflect"
"strings"
"testing"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func TestParseEndpoints(t *testing.T) {
tests := []struct {
name string
input string
expected []string
}{
{"single endpoint", "localhost:4000", []string{"localhost:4000"}},
{"multiple endpoints", "host1:4000,host2:4000,host3:4000", []string{"host1:4000", "host2:4000", "host3:4000"}},
{"endpoints with spaces", "host1:4000, host2:4000 , host3:4000", []string{"host1:4000", "host2:4000", "host3:4000"}},
{"empty string", "", nil},
{"only commas", ",,,", []string{}},
{"trailing comma", "host1:4000,host2:4000,", []string{"host1:4000", "host2:4000"}},
{"leading comma", ",host1:4000,host2:4000", []string{"host1:4000", "host2:4000"}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := parseEndpoints(tt.input)
if !reflect.DeepEqual(tt.expected, got) {
t.Errorf("parseEndpoints(%q) = %v, want %v", tt.input, got, tt.expected)
}
})
}
}
func TestNewGrpcConnectionProvider_Errors(t *testing.T) {
t.Run("no endpoints", func(t *testing.T) {
dialOpts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
_, err := NewGrpcConnectionProvider(context.Background(), "", dialOpts)
require.ErrorContains(t, "no gRPC endpoints provided", err)
})
}
func TestGrpcConnectionProvider_LazyConnection(t *testing.T) {
// Start only one server but configure provider with two endpoints
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
server := grpc.NewServer()
go func() { _ = server.Serve(lis) }()
defer server.Stop()
validAddr := lis.Addr().String()
invalidAddr := "127.0.0.1:1" // Port 1 is unlikely to be listening
// Provider should succeed even though second endpoint is invalid (lazy connections)
endpoint := validAddr + "," + invalidAddr
ctx := context.Background()
dialOpts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
provider, err := NewGrpcConnectionProvider(ctx, endpoint, dialOpts)
require.NoError(t, err, "Provider creation should succeed with lazy connections")
defer func() { provider.Close() }()
// First endpoint should work
conn := provider.CurrentConn()
assert.NotNil(t, conn, "First connection should be created lazily")
}
func TestGrpcConnectionProvider_SingleConnectionModel(t *testing.T) {
// Create provider with 3 endpoints
var addrs []string
var servers []*grpc.Server
for range 3 {
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
server := grpc.NewServer()
go func() { _ = server.Serve(lis) }()
addrs = append(addrs, lis.Addr().String())
servers = append(servers, server)
}
defer func() {
for _, s := range servers {
s.Stop()
}
}()
endpoint := strings.Join(addrs, ",")
ctx := context.Background()
dialOpts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
provider, err := NewGrpcConnectionProvider(ctx, endpoint, dialOpts)
require.NoError(t, err)
defer func() { provider.Close() }()
// Access the internal state to verify single connection behavior
p := provider.(*grpcConnectionProvider)
// Initially no connection
p.mu.Lock()
assert.Equal(t, (*grpc.ClientConn)(nil), p.conn, "Connection should be nil before access")
p.mu.Unlock()
// Access connection - should create one
conn0 := provider.CurrentConn()
assert.NotNil(t, conn0)
p.mu.Lock()
assert.NotNil(t, p.conn, "Connection should be created after CurrentConn()")
firstConn := p.conn
p.mu.Unlock()
// Call CurrentConn again - should return same connection
conn0Again := provider.CurrentConn()
assert.Equal(t, conn0, conn0Again, "Should return same connection")
// Switch to different host - old connection should be closed, new one created lazily
require.NoError(t, provider.SwitchHost(1))
p.mu.Lock()
assert.Equal(t, (*grpc.ClientConn)(nil), p.conn, "Connection should be nil after SwitchHost (lazy)")
p.mu.Unlock()
// Get new connection
conn1 := provider.CurrentConn()
assert.NotNil(t, conn1)
assert.NotEqual(t, firstConn, conn1, "Should be a different connection after switching hosts")
}
// testProvider creates a provider with n test servers and returns cleanup function.
func testProvider(t *testing.T, n int) (GrpcConnectionProvider, []string, func()) {
var addrs []string
var cleanups []func()
for range n {
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
server := grpc.NewServer()
go func() { _ = server.Serve(lis) }()
addrs = append(addrs, lis.Addr().String())
cleanups = append(cleanups, server.Stop)
}
endpoint := strings.Join(addrs, ",")
ctx := context.Background()
dialOpts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
provider, err := NewGrpcConnectionProvider(ctx, endpoint, dialOpts)
require.NoError(t, err)
cleanup := func() {
provider.Close()
for _, c := range cleanups {
c()
}
}
return provider, addrs, cleanup
}
func TestGrpcConnectionProvider(t *testing.T) {
provider, addrs, cleanup := testProvider(t, 3)
defer cleanup()
t.Run("initial state", func(t *testing.T) {
assert.Equal(t, 3, len(provider.Hosts()))
assert.Equal(t, addrs[0], provider.CurrentHost())
assert.NotNil(t, provider.CurrentConn())
})
t.Run("SwitchHost", func(t *testing.T) {
require.NoError(t, provider.SwitchHost(1))
assert.Equal(t, addrs[1], provider.CurrentHost())
assert.NotNil(t, provider.CurrentConn()) // New connection created lazily
require.NoError(t, provider.SwitchHost(0))
assert.Equal(t, addrs[0], provider.CurrentHost())
require.ErrorContains(t, "invalid host index", provider.SwitchHost(-1))
require.ErrorContains(t, "invalid host index", provider.SwitchHost(3))
})
t.Run("SwitchHost circular", func(t *testing.T) {
// Test round-robin style switching using SwitchHost with manual index
indices := []int{1, 2, 0, 1} // Simulate circular switching
for i, idx := range indices {
require.NoError(t, provider.SwitchHost(idx))
assert.Equal(t, addrs[idx], provider.CurrentHost(), "iteration %d", i)
}
})
t.Run("Hosts returns copy", func(t *testing.T) {
hosts := provider.Hosts()
original := hosts[0]
hosts[0] = "modified"
assert.Equal(t, original, provider.Hosts()[0])
})
}
func TestGrpcConnectionProvider_Close(t *testing.T) {
provider, _, cleanup := testProvider(t, 1)
defer cleanup()
assert.NotNil(t, provider.CurrentConn())
provider.Close()
assert.Equal(t, (*grpc.ClientConn)(nil), provider.CurrentConn())
provider.Close() // Double close is safe
}

View File

@@ -1,20 +0,0 @@
package grpc
import "google.golang.org/grpc"
// MockGrpcProvider implements GrpcConnectionProvider for testing.
type MockGrpcProvider struct {
MockConn *grpc.ClientConn
MockHosts []string
}
func (m *MockGrpcProvider) CurrentConn() *grpc.ClientConn { return m.MockConn }
func (m *MockGrpcProvider) CurrentHost() string {
if len(m.MockHosts) > 0 {
return m.MockHosts[0]
}
return ""
}
func (m *MockGrpcProvider) Hosts() []string { return m.MockHosts }
func (m *MockGrpcProvider) SwitchHost(int) error { return nil }
func (m *MockGrpcProvider) Close() {}

View File

@@ -1,34 +0,0 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"log.go",
"mock_rest_provider.go",
"rest_connection_provider.go",
"rest_handler.go",
],
importpath = "github.com/OffchainLabs/prysm/v7/api/rest",
visibility = ["//visibility:public"],
deps = [
"//api:go_default_library",
"//api/apiutil:go_default_library",
"//api/client:go_default_library",
"//config/params:go_default_library",
"//network/httputil:go_default_library",
"//runtime/version:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opentelemetry_go_contrib_instrumentation_net_http_otelhttp//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["rest_connection_provider_test.go"],
embed = [":go_default_library"],
deps = [
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
],
)

View File

@@ -1,9 +0,0 @@
// Code generated by hack/gen-logs.sh; DO NOT EDIT.
// This file is created and regenerated automatically. Anything added here might get removed.
package rest
import "github.com/sirupsen/logrus"
// The prefix for logs from this package will be the text after the last slash in the package path.
// If you wish to change this, you should add your desired name in the runtime/logging/logrus-prefixed-formatter/prefix-replacement.go file.
var log = logrus.WithField("package", "api/rest")

View File

@@ -1,49 +0,0 @@
package rest
import (
"bytes"
"context"
"net/http"
)
// MockRestProvider implements RestConnectionProvider for testing.
type MockRestProvider struct {
MockClient *http.Client
MockHandler RestHandler
MockHosts []string
HostIndex int
}
func (m *MockRestProvider) HttpClient() *http.Client { return m.MockClient }
func (m *MockRestProvider) RestHandler() RestHandler { return m.MockHandler }
func (m *MockRestProvider) CurrentHost() string {
if len(m.MockHosts) > 0 {
return m.MockHosts[m.HostIndex%len(m.MockHosts)]
}
return ""
}
func (m *MockRestProvider) Hosts() []string { return m.MockHosts }
func (m *MockRestProvider) SwitchHost(index int) error { m.HostIndex = index; return nil }
// MockRestHandler implements RestHandler for testing.
type MockRestHandler struct {
MockHost string
MockClient *http.Client
}
func (m *MockRestHandler) Get(_ context.Context, _ string, _ any) error { return nil }
func (m *MockRestHandler) GetStatusCode(_ context.Context, _ string) (int, error) {
return http.StatusOK, nil
}
func (m *MockRestHandler) GetSSZ(_ context.Context, _ string) ([]byte, http.Header, error) {
return nil, nil, nil
}
func (m *MockRestHandler) Post(_ context.Context, _ string, _ map[string]string, _ *bytes.Buffer, _ any) error {
return nil
}
func (m *MockRestHandler) PostSSZ(_ context.Context, _ string, _ map[string]string, _ *bytes.Buffer) ([]byte, http.Header, error) {
return nil, nil, nil
}
func (m *MockRestHandler) HttpClient() *http.Client { return m.MockClient }
func (m *MockRestHandler) Host() string { return m.MockHost }
func (m *MockRestHandler) SwitchHost(host string) { m.MockHost = host }

View File

@@ -1,158 +0,0 @@
package rest
import (
"net/http"
"strings"
"sync/atomic"
"time"
"github.com/OffchainLabs/prysm/v7/api/client"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)
// RestConnectionProvider manages HTTP client configuration for REST API with failover support.
// It allows switching between different beacon node REST endpoints when the current one becomes unavailable.
type RestConnectionProvider interface {
// HttpClient returns the configured HTTP client with headers, timeout, and optional tracing.
HttpClient() *http.Client
// RestHandler returns the REST handler for making API requests.
RestHandler() RestHandler
// CurrentHost returns the current REST API endpoint URL.
CurrentHost() string
// Hosts returns all configured REST API endpoint URLs.
Hosts() []string
// SwitchHost switches to the endpoint at the given index.
SwitchHost(index int) error
}
// RestConnectionProviderOption is a functional option for configuring the REST connection provider.
type RestConnectionProviderOption func(*restConnectionProvider)
// WithHttpTimeout sets the HTTP client timeout.
func WithHttpTimeout(timeout time.Duration) RestConnectionProviderOption {
return func(p *restConnectionProvider) {
p.timeout = timeout
}
}
// WithHttpHeaders sets custom HTTP headers to include in all requests.
func WithHttpHeaders(headers map[string][]string) RestConnectionProviderOption {
return func(p *restConnectionProvider) {
p.headers = headers
}
}
// WithTracing enables OpenTelemetry tracing for HTTP requests.
func WithTracing() RestConnectionProviderOption {
return func(p *restConnectionProvider) {
p.enableTracing = true
}
}
type restConnectionProvider struct {
endpoints []string
httpClient *http.Client
restHandler RestHandler
currentIndex atomic.Uint64
timeout time.Duration
headers map[string][]string
enableTracing bool
}
// NewRestConnectionProvider creates a new REST connection provider that manages HTTP client configuration.
// The endpoint parameter can be a comma-separated list of URLs (e.g., "http://host1:3500,http://host2:3500").
func NewRestConnectionProvider(endpoint string, opts ...RestConnectionProviderOption) (RestConnectionProvider, error) {
endpoints := parseEndpoints(endpoint)
if len(endpoints) == 0 {
return nil, errors.New("no REST API endpoints provided")
}
p := &restConnectionProvider{
endpoints: endpoints,
}
for _, opt := range opts {
opt(p)
}
// Build the HTTP transport chain
var transport http.RoundTripper = http.DefaultTransport
// Add custom headers if configured
if len(p.headers) > 0 {
transport = client.NewCustomHeadersTransport(transport, p.headers)
}
// Add tracing if enabled
if p.enableTracing {
transport = otelhttp.NewTransport(transport)
}
p.httpClient = &http.Client{
Timeout: p.timeout,
Transport: transport,
}
// Create the REST handler with the HTTP client and initial host
p.restHandler = newRestHandler(*p.httpClient, endpoints[0])
log.WithFields(logrus.Fields{
"endpoints": endpoints,
"count": len(endpoints),
}).Info("Initialized REST connection provider")
return p, nil
}
// parseEndpoints splits a comma-separated endpoint string into individual endpoints.
func parseEndpoints(endpoint string) []string {
if endpoint == "" {
return nil
}
endpoints := make([]string, 0, 1)
for p := range strings.SplitSeq(endpoint, ",") {
if p = strings.TrimSpace(p); p != "" {
endpoints = append(endpoints, p)
}
}
return endpoints
}
func (p *restConnectionProvider) HttpClient() *http.Client {
return p.httpClient
}
func (p *restConnectionProvider) RestHandler() RestHandler {
return p.restHandler
}
func (p *restConnectionProvider) CurrentHost() string {
return p.endpoints[p.currentIndex.Load()]
}
func (p *restConnectionProvider) Hosts() []string {
// Return a copy to maintain immutability
hosts := make([]string, len(p.endpoints))
copy(hosts, p.endpoints)
return hosts
}
func (p *restConnectionProvider) SwitchHost(index int) error {
if index < 0 || index >= len(p.endpoints) {
return errors.Errorf("invalid host index %d, must be between 0 and %d", index, len(p.endpoints)-1)
}
oldIdx := p.currentIndex.Load()
p.currentIndex.Store(uint64(index))
// Update the rest handler's host
p.restHandler.SwitchHost(p.endpoints[index])
log.WithFields(logrus.Fields{
"previousHost": p.endpoints[oldIdx],
"newHost": p.endpoints[index],
}).Debug("Switched REST endpoint")
return nil
}

View File

@@ -1,80 +0,0 @@
package rest
import (
"reflect"
"testing"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
)
func TestParseEndpoints(t *testing.T) {
tests := []struct {
name string
input string
expected []string
}{
{"single endpoint", "http://localhost:3500", []string{"http://localhost:3500"}},
{"multiple endpoints", "http://host1:3500,http://host2:3500,http://host3:3500", []string{"http://host1:3500", "http://host2:3500", "http://host3:3500"}},
{"endpoints with spaces", "http://host1:3500, http://host2:3500 , http://host3:3500", []string{"http://host1:3500", "http://host2:3500", "http://host3:3500"}},
{"empty string", "", nil},
{"only commas", ",,,", []string{}},
{"trailing comma", "http://host1:3500,http://host2:3500,", []string{"http://host1:3500", "http://host2:3500"}},
{"leading comma", ",http://host1:3500,http://host2:3500", []string{"http://host1:3500", "http://host2:3500"}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := parseEndpoints(tt.input)
if !reflect.DeepEqual(tt.expected, got) {
t.Errorf("parseEndpoints(%q) = %v, want %v", tt.input, got, tt.expected)
}
})
}
}
func TestNewRestConnectionProvider_Errors(t *testing.T) {
t.Run("no endpoints", func(t *testing.T) {
_, err := NewRestConnectionProvider("")
require.ErrorContains(t, "no REST API endpoints provided", err)
})
}
func TestRestConnectionProvider(t *testing.T) {
provider, err := NewRestConnectionProvider("http://host1:3500,http://host2:3500,http://host3:3500")
require.NoError(t, err)
t.Run("initial state", func(t *testing.T) {
assert.Equal(t, 3, len(provider.Hosts()))
assert.Equal(t, "http://host1:3500", provider.CurrentHost())
assert.NotNil(t, provider.HttpClient())
})
t.Run("SwitchHost", func(t *testing.T) {
require.NoError(t, provider.SwitchHost(1))
assert.Equal(t, "http://host2:3500", provider.CurrentHost())
require.NoError(t, provider.SwitchHost(0))
assert.Equal(t, "http://host1:3500", provider.CurrentHost())
require.ErrorContains(t, "invalid host index", provider.SwitchHost(-1))
require.ErrorContains(t, "invalid host index", provider.SwitchHost(3))
})
t.Run("Hosts returns copy", func(t *testing.T) {
hosts := provider.Hosts()
original := hosts[0]
hosts[0] = "modified"
assert.Equal(t, original, provider.Hosts()[0])
})
}
func TestRestConnectionProvider_WithOptions(t *testing.T) {
headers := map[string][]string{"Authorization": {"Bearer token"}}
provider, err := NewRestConnectionProvider(
"http://localhost:3500",
WithHttpHeaders(headers),
WithHttpTimeout(30000000000), // 30 seconds in nanoseconds
WithTracing(),
)
require.NoError(t, err)
assert.NotNil(t, provider.HttpClient())
assert.Equal(t, "http://localhost:3500", provider.CurrentHost())
}

View File

@@ -7,6 +7,7 @@ go_library(
"payload_attestation.go",
"pending_payment.go",
"proposer_slashing.go",
"withdrawal.go",
],
importpath = "github.com/OffchainLabs/prysm/v7/beacon-chain/core/gloas",
visibility = ["//visibility:public"],

View File

@@ -0,0 +1,107 @@
package gloas
import (
"fmt"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/pkg/errors"
)
// ProcessWithdrawals applies withdrawals to the state for Gloas.
//
// Spec v1.7.0-alpha.1 (pseudocode):
//
// def process_withdrawals(
//
// state: BeaconState,
// # [Modified in Gloas:EIP7732]
// # Removed `payload`
//
// ) -> None:
//
// # [New in Gloas:EIP7732]
// # Return early if the parent block is empty
// if not is_parent_block_full(state):
// return
//
// # Get expected withdrawals
// expected = get_expected_withdrawals(state)
//
// # Apply expected withdrawals
// apply_withdrawals(state, expected.withdrawals)
//
// # Update withdrawals fields in the state
// update_next_withdrawal_index(state, expected.withdrawals)
// # [New in Gloas:EIP7732]
// update_payload_expected_withdrawals(state, expected.withdrawals)
// # [New in Gloas:EIP7732]
// update_builder_pending_withdrawals(state, expected.processed_builder_withdrawals_count)
// update_pending_partial_withdrawals(state, expected.processed_partial_withdrawals_count)
// # [New in Gloas:EIP7732]
// update_next_withdrawal_builder_index(state, expected.processed_builders_sweep_count)
// update_next_withdrawal_validator_index(state, expected.withdrawals)
func ProcessWithdrawals(st state.BeaconState) error {
full, err := st.IsParentBlockFull()
if err != nil {
return errors.Wrap(err, "could not get parent block full status")
}
if !full {
return nil
}
expected, err := st.ExpectedWithdrawalsGloas()
if err != nil {
return errors.Wrap(err, "could not get expected withdrawals")
}
if err := st.DecreaseWithdrawalBalances(expected.Withdrawals); err != nil {
return errors.Wrap(err, "could not decrease withdrawal balances")
}
if len(expected.Withdrawals) > 0 {
if err := st.SetNextWithdrawalIndex(expected.Withdrawals[len(expected.Withdrawals)-1].Index + 1); err != nil {
return errors.Wrap(err, "could not set next withdrawal index")
}
}
err = st.SetPayloadExpectedWithdrawals(expected.Withdrawals)
if err != nil {
return errors.Wrap(err, "could not set payload expected withdrawals")
}
err = st.DequeueBuilderPendingWithdrawals(expected.ProcessedBuilderWithdrawalsCount)
if err != nil {
return fmt.Errorf("unable to dequeue builder pending withdrawals from state: %w", err)
}
if err := st.DequeuePendingPartialWithdrawals(expected.ProcessedPartialWithdrawalsCount); err != nil {
return fmt.Errorf("unable to dequeue partial withdrawals from state: %w", err)
}
err = st.SetNextWithdrawalBuilderIndex(expected.NextWithdrawalBuilderIndex)
if err != nil {
return errors.Wrap(err, "could not set next withdrawal builder index")
}
var nextValidatorIndex primitives.ValidatorIndex
if uint64(len(expected.Withdrawals)) < params.BeaconConfig().MaxWithdrawalsPerPayload {
nextValidatorIndex, err = st.NextWithdrawalValidatorIndex()
if err != nil {
return errors.Wrap(err, "could not get next withdrawal validator index")
}
nextValidatorIndex += primitives.ValidatorIndex(params.BeaconConfig().MaxValidatorsPerWithdrawalsSweep)
nextValidatorIndex = nextValidatorIndex % primitives.ValidatorIndex(st.NumValidators())
} else {
nextValidatorIndex = expected.Withdrawals[len(expected.Withdrawals)-1].ValidatorIndex + 1
if nextValidatorIndex == primitives.ValidatorIndex(st.NumValidators()) {
nextValidatorIndex = 0
}
}
if err := st.SetNextWithdrawalValidatorIndex(nextValidatorIndex); err != nil {
return errors.Wrap(err, "could not set next withdrawal validator index")
}
return nil
}

View File

@@ -610,7 +610,6 @@ func (dcs *DataColumnStorage) Clear() error {
// prune clean the cache, the filesystem and mutexes.
func (dcs *DataColumnStorage) prune() {
log.WithField("highestStoredEpoch", dcs.cache.HighestEpoch()).WithField("retentionEpochs", dcs.retentionEpochs).Debug("Pruning data column storage")
startTime := time.Now()
defer func() {
dataColumnPruneLatency.Observe(float64(time.Since(startTime).Milliseconds()))

View File

@@ -67,6 +67,7 @@ func getSubscriptionStatusFromDB(t *testing.T, db *Store) bool {
return subscribed
}
func TestUpdateCustodyInfo(t *testing.T) {
ctx := t.Context()

View File

@@ -134,19 +134,10 @@ type BeaconNode struct {
// New creates a new node instance, sets up configuration options, and registers
// every required service to the node.
func New(cliCtx *cli.Context, cancel context.CancelFunc, optFuncs []func(*cli.Context) ([]Option, error), opts ...Option) (*BeaconNode, error) {
func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*BeaconNode, error) {
if err := configureBeacon(cliCtx); err != nil {
return nil, errors.Wrap(err, "could not set beacon configuration options")
}
for _, of := range optFuncs {
ofo, err := of(cliCtx)
if err != nil {
return nil, err
}
if ofo != nil {
opts = append(opts, ofo...)
}
}
ctx := cliCtx.Context
beacon := &BeaconNode{

View File

@@ -59,7 +59,7 @@ func TestNodeClose_OK(t *testing.T) {
WithDataColumnStorage(filesystem.NewEphemeralDataColumnStorage(t)),
}
node, err := New(ctx, cancel, nil, options...)
node, err := New(ctx, cancel, options...)
require.NoError(t, err)
node.Close()
@@ -87,7 +87,7 @@ func TestNodeStart_Ok(t *testing.T) {
WithDataColumnStorage(filesystem.NewEphemeralDataColumnStorage(t)),
}
node, err := New(ctx, cancel, nil, options...)
node, err := New(ctx, cancel, options...)
require.NoError(t, err)
require.NotNil(t, node.lcStore)
node.services = &runtime.ServiceRegistry{}
@@ -116,7 +116,7 @@ func TestNodeStart_SyncChecker(t *testing.T) {
WithDataColumnStorage(filesystem.NewEphemeralDataColumnStorage(t)),
}
node, err := New(ctx, cancel, nil, options...)
node, err := New(ctx, cancel, options...)
require.NoError(t, err)
go func() {
node.Start()
@@ -151,7 +151,7 @@ func TestClearDB(t *testing.T) {
WithDataColumnStorage(filesystem.NewEphemeralDataColumnStorage(t)),
}
_, err = New(context, cancel, nil, options...)
_, err = New(context, cancel, options...)
require.NoError(t, err)
require.LogsContain(t, hook, "Removing database")
}

View File

@@ -26,8 +26,8 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/voluntaryexits/mock"
p2pMock "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v7/beacon-chain/rpc/core"
state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
mockSync "github.com/OffchainLabs/prysm/v7/beacon-chain/sync/initial-sync/testing"
state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/crypto/bls"

View File

@@ -84,6 +84,8 @@ func TestGetSpec(t *testing.T) {
config.FuluForkVersion = []byte("FuluForkVersion")
config.FuluForkEpoch = 109
config.GloasForkEpoch = 110
config.BuilderIndexFlag = 111
config.MaxBuildersPerWithdrawalsSweep = 112
config.BLSWithdrawalPrefixByte = byte('b')
config.ETH1AddressWithdrawalPrefixByte = byte('c')
config.GenesisDelay = 24
@@ -220,7 +222,7 @@ func TestGetSpec(t *testing.T) {
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), &resp))
data, ok := resp.Data.(map[string]any)
require.Equal(t, true, ok)
assert.Equal(t, 192, len(data))
assert.Equal(t, 194, len(data))
for k, v := range data {
t.Run(k, func(t *testing.T) {
switch k {
@@ -302,6 +304,10 @@ func TestGetSpec(t *testing.T) {
assert.Equal(t, "109", v)
case "GLOAS_FORK_EPOCH":
assert.Equal(t, "110", v)
case "BUILDER_INDEX_FLAG":
assert.Equal(t, "111", v)
case "MAX_BUILDERS_PER_WITHDRAWALS_SWEEP":
assert.Equal(t, "112", v)
case "MIN_ANCHOR_POW_BLOCK_DIFFICULTY":
assert.Equal(t, "1000", v)
case "BLS_WITHDRAWAL_PREFIX":

View File

@@ -3,6 +3,7 @@ package state
import (
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
enginev1 "github.com/OffchainLabs/prysm/v7/proto/engine/v1"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
)
@@ -13,6 +14,10 @@ type writeOnlyGloasFields interface {
RotateBuilderPendingPayments() error
AppendBuilderPendingWithdrawals([]*ethpb.BuilderPendingWithdrawal) error
UpdateExecutionPayloadAvailabilityAtIndex(idx uint64, val byte) error
SetPayloadExpectedWithdrawals(withdrawals []*enginev1.Withdrawal) error
DecreaseWithdrawalBalances(withdrawals []*enginev1.Withdrawal) error
DequeueBuilderPendingWithdrawals(num uint64) error
SetNextWithdrawalBuilderIndex(idx primitives.BuilderIndex) error
}
type readOnlyGloasFields interface {
@@ -21,4 +26,15 @@ type readOnlyGloasFields interface {
CanBuilderCoverBid(primitives.BuilderIndex, primitives.Gwei) (bool, error)
LatestBlockHash() ([32]byte, error)
BuilderPendingPayments() ([]*ethpb.BuilderPendingPayment, error)
IsParentBlockFull() (bool, error)
ExpectedWithdrawalsGloas() (ExpectedWithdrawalsGloasResult, error)
}
// ExpectedWithdrawalsGloasResult bundles the expected withdrawals and related counters
// for the Gloas fork to avoid positional return mistakes.
type ExpectedWithdrawalsGloasResult struct {
Withdrawals []*enginev1.Withdrawal
ProcessedBuilderWithdrawalsCount uint64
ProcessedPartialWithdrawalsCount uint64
NextWithdrawalBuilderIndex primitives.BuilderIndex
}

View File

@@ -1,13 +1,17 @@
package state_native
import (
"bytes"
"fmt"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
enginev1 "github.com/OffchainLabs/prysm/v7/proto/engine/v1"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/OffchainLabs/prysm/v7/time/slots"
)
// LatestBlockHash returns the hash of the latest execution block.
@@ -147,3 +151,223 @@ func (b *BeaconState) BuilderPendingPayments() ([]*ethpb.BuilderPendingPayment,
return b.builderPendingPaymentsVal(), nil
}
// IsParentBlockFull returns true if the last committed payload bid was fulfilled with a payload,
// which can only happen when both beacon block and payload were present.
// This function must be called on a beacon state before processing the execution payload bid in the block.
// Spec v1.7.0-alpha.2 (pseudocode):
// def is_parent_block_full(state: BeaconState) -> bool:
//
// return state.latest_execution_payload_bid.block_hash == state.latest_block_hash
func (b *BeaconState) IsParentBlockFull() (bool, error) {
if b.version < version.Gloas {
return false, errNotSupported("IsParentBlockFull", b.version)
}
b.lock.RLock()
defer b.lock.RUnlock()
if b.latestExecutionPayloadBid == nil {
return false, nil
}
return bytes.Equal(b.latestExecutionPayloadBid.BlockHash, b.latestBlockHash), nil
}
// ExpectedWithdrawalsGloas returns the withdrawals that a proposer will need to pack in the next block
// applied to the current state. It is also used by validators to check that the execution payload carried
// the right number of withdrawals.
//
// Spec v1.7.0-alpha.1:
//
// def get_expected_withdrawals(state: BeaconState) -> ExpectedWithdrawals:
// withdrawal_index = state.next_withdrawal_index
// withdrawals: List[Withdrawal] = []
//
// # [New in Gloas:EIP7732]
// # Get builder withdrawals
// builder_withdrawals, withdrawal_index, processed_builder_withdrawals_count = (
// get_builder_withdrawals(state, withdrawal_index, withdrawals)
// )
// withdrawals.extend(builder_withdrawals)
//
// # Get partial withdrawals
// partial_withdrawals, withdrawal_index, processed_partial_withdrawals_count = (
// get_pending_partial_withdrawals(state, withdrawal_index, withdrawals)
// )
// withdrawals.extend(partial_withdrawals)
//
// # [New in Gloas:EIP7732]
// # Get builders sweep withdrawals
// builders_sweep_withdrawals, withdrawal_index, processed_builders_sweep_count = (
// get_builders_sweep_withdrawals(state, withdrawal_index, withdrawals)
// )
// withdrawals.extend(builders_sweep_withdrawals)
//
// # Get validators sweep withdrawals
// validators_sweep_withdrawals, withdrawal_index, processed_validators_sweep_count = (
// get_validators_sweep_withdrawals(state, withdrawal_index, withdrawals)
// )
// withdrawals.extend(validators_sweep_withdrawals)
//
// return ExpectedWithdrawals(
// withdrawals,
// # [New in Gloas:EIP7732]
// processed_builder_withdrawals_count,
// processed_partial_withdrawals_count,
// # [New in Gloas:EIP7732]
// processed_builders_sweep_count,
// processed_validators_sweep_count,
// )
func (b *BeaconState) ExpectedWithdrawalsGloas() (state.ExpectedWithdrawalsGloasResult, error) {
if b.version < version.Gloas {
return state.ExpectedWithdrawalsGloasResult{}, errNotSupported("ExpectedWithdrawalsGloas", b.version)
}
b.lock.RLock()
defer b.lock.RUnlock()
cfg := params.BeaconConfig()
withdrawals := make([]*enginev1.Withdrawal, 0, cfg.MaxWithdrawalsPerPayload)
withdrawalIndex := b.nextWithdrawalIndex
withdrawalIndex, processedBuilderWithdrawalsCount, err := b.appendBuilderWithdrawals(withdrawalIndex, &withdrawals)
if err != nil {
return state.ExpectedWithdrawalsGloasResult{}, err
}
withdrawalIndex, processedPartialWithdrawalsCount, err := b.appendPendingPartialWithdrawals(withdrawalIndex, &withdrawals)
if err != nil {
return state.ExpectedWithdrawalsGloasResult{}, err
}
withdrawalIndex, nextBuilderIndex, err := b.appendBuildersSweepWithdrawals(withdrawalIndex, &withdrawals)
if err != nil {
return state.ExpectedWithdrawalsGloasResult{}, err
}
err = b.appendValidatorsSweepWithdrawals(withdrawalIndex, &withdrawals)
if err != nil {
return state.ExpectedWithdrawalsGloasResult{}, err
}
return state.ExpectedWithdrawalsGloasResult{
Withdrawals: withdrawals,
ProcessedBuilderWithdrawalsCount: processedBuilderWithdrawalsCount,
ProcessedPartialWithdrawalsCount: processedPartialWithdrawalsCount,
NextWithdrawalBuilderIndex: nextBuilderIndex,
}, nil
}
// appendBuilderWithdrawals returns builder pending withdrawals, the updated withdrawal index,
// and the processed count, following spec v1.7.0-alpha.2:
//
// def get_builder_withdrawals(state, withdrawal_index, prior_withdrawals):
// withdrawals_limit = MAX_WITHDRAWALS_PER_PAYLOAD - 1
// assert len(prior_withdrawals) <= withdrawals_limit
// processed_count = 0
// withdrawals = []
// for withdrawal in state.builder_pending_withdrawals:
// if len(prior_withdrawals + withdrawals) >= withdrawals_limit:
// break
// withdrawals.append(Withdrawal(
// index=withdrawal_index,
// validator_index=convert_builder_index_to_validator_index(withdrawal.builder_index),
// address=withdrawal.fee_recipient,
// amount=withdrawal.amount,
// ))
// withdrawal_index += 1
// processed_count += 1
// return withdrawals, withdrawal_index, processed_count
func (b *BeaconState) appendBuilderWithdrawals(withdrawalIndex uint64, withdrawals *[]*enginev1.Withdrawal) (uint64, uint64, error) {
cfg := params.BeaconConfig()
withdrawalsLimit := int(cfg.MaxWithdrawalsPerPayload - 1)
ws := *withdrawals
if len(ws) > withdrawalsLimit {
return withdrawalIndex, 0, fmt.Errorf("prior withdrawals length %d exceeds limit %d", len(ws), withdrawalsLimit)
}
var processedCount uint64
for _, w := range b.builderPendingWithdrawals {
if len(ws) >= withdrawalsLimit {
break
}
ws = append(ws, &enginev1.Withdrawal{
Index: withdrawalIndex,
ValidatorIndex: w.BuilderIndex.ToValidatorIndex(),
Address: w.FeeRecipient,
Amount: uint64(w.Amount),
})
withdrawalIndex++
processedCount++
}
*withdrawals = ws
return withdrawalIndex, processedCount, nil
}
// appendBuildersSweepWithdrawals returns builder sweep withdrawals, the updated withdrawal index,
// and the processed count, following spec v1.7.0-alpha.2:
//
// def get_builders_sweep_withdrawals(state, withdrawal_index, prior_withdrawals):
// epoch = get_current_epoch(state)
// builders_limit = min(len(state.builders), MAX_BUILDERS_PER_WITHDRAWALS_SWEEP)
// withdrawals_limit = MAX_WITHDRAWALS_PER_PAYLOAD - 1
// assert len(prior_withdrawals) <= withdrawals_limit
// processed_count = 0
// withdrawals = []
// builder_index = state.next_withdrawal_builder_index
// for _ in range(builders_limit):
// if len(prior_withdrawals + withdrawals) >= withdrawals_limit:
// break
// builder = state.builders[builder_index]
// if builder.withdrawable_epoch <= epoch and builder.balance > 0:
// withdrawals.append(Withdrawal(
// index=withdrawal_index,
// validator_index=convert_builder_index_to_validator_index(builder_index),
// address=builder.execution_address,
// amount=builder.balance,
// ))
// withdrawal_index += 1
// builder_index = BuilderIndex((builder_index + 1) % len(state.builders))
// processed_count += 1
// return withdrawals, withdrawal_index, processed_count
func (b *BeaconState) appendBuildersSweepWithdrawals(withdrawalIndex uint64, withdrawals *[]*enginev1.Withdrawal) (uint64, primitives.BuilderIndex, error) {
cfg := params.BeaconConfig()
withdrawalsLimit := int(cfg.MaxWithdrawalsPerPayload - 1)
if len(*withdrawals) > withdrawalsLimit {
return withdrawalIndex, 0, fmt.Errorf("prior withdrawals length %d exceeds limit %d", len(*withdrawals), withdrawalsLimit)
}
ws := *withdrawals
buildersLimit := len(b.builders)
if maxBuilders := int(cfg.MaxBuildersPerWithdrawalsSweep); buildersLimit > maxBuilders {
buildersLimit = maxBuilders
}
builderIndex := b.nextWithdrawalBuilderIndex
epoch := slots.ToEpoch(b.slot)
for i := 0; i < buildersLimit; i++ {
if len(ws) >= withdrawalsLimit {
break
}
builder := b.builders[builderIndex]
if builder != nil && builder.WithdrawableEpoch <= epoch && builder.Balance > 0 {
ws = append(ws, &enginev1.Withdrawal{
Index: withdrawalIndex,
ValidatorIndex: builderIndex.ToValidatorIndex(),
Address: builder.ExecutionAddress,
Amount: uint64(builder.Balance),
})
withdrawalIndex++
}
builderIndex = primitives.BuilderIndex((uint64(builderIndex) + 1) % uint64(len(b.builders)))
}
*withdrawals = ws
return withdrawalIndex, builderIndex, nil
}

View File

@@ -1,26 +1,27 @@
package state_native_test
package state_native
import (
"bytes"
"testing"
state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
enginev1 "github.com/OffchainLabs/prysm/v7/proto/engine/v1"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/OffchainLabs/prysm/v7/testing/require"
"github.com/OffchainLabs/prysm/v7/testing/util"
"github.com/OffchainLabs/prysm/v7/time/slots"
)
func TestLatestBlockHash(t *testing.T) {
t.Run("returns error before gloas", func(t *testing.T) {
st, _ := util.DeterministicGenesisState(t, 1)
st := &BeaconState{version: version.Fulu}
_, err := st.LatestBlockHash()
require.ErrorContains(t, "is not supported", err)
})
t.Run("returns zero hash when unset", func(t *testing.T) {
st, err := state_native.InitializeFromProtoGloas(&ethpb.BeaconStateGloas{})
st, err := InitializeFromProtoGloas(&ethpb.BeaconStateGloas{})
require.NoError(t, err)
got, err := st.LatestBlockHash()
@@ -33,7 +34,7 @@ func TestLatestBlockHash(t *testing.T) {
var want [32]byte
copy(want[:], hashBytes)
st, err := state_native.InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
st, err := InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
LatestBlockHash: hashBytes,
})
require.NoError(t, err)
@@ -46,17 +47,14 @@ func TestLatestBlockHash(t *testing.T) {
func TestBuilderPubkey(t *testing.T) {
t.Run("returns error before gloas", func(t *testing.T) {
stIface, _ := util.DeterministicGenesisState(t, 1)
native, ok := stIface.(*state_native.BeaconState)
require.Equal(t, true, ok)
_, err := native.BuilderPubkey(0)
st := &BeaconState{version: version.Fulu}
_, err := st.BuilderPubkey(0)
require.ErrorContains(t, "is not supported", err)
})
t.Run("returns pubkey copy", func(t *testing.T) {
pubkey := bytes.Repeat([]byte{0xAA}, 48)
stIface, err := state_native.InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
stIface, err := InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
Builders: []*ethpb.Builder{
{
Pubkey: pubkey,
@@ -80,12 +78,12 @@ func TestBuilderPubkey(t *testing.T) {
})
t.Run("out of range returns error", func(t *testing.T) {
stIface, err := state_native.InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
stIface, err := InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
Builders: []*ethpb.Builder{},
})
require.NoError(t, err)
st := stIface.(*state_native.BeaconState)
st := stIface.(*BeaconState)
_, err = st.BuilderPubkey(1)
require.ErrorContains(t, "out of range", err)
})
@@ -93,7 +91,7 @@ func TestBuilderPubkey(t *testing.T) {
func TestBuilderHelpers(t *testing.T) {
t.Run("is active builder", func(t *testing.T) {
st, err := state_native.InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
st, err := InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
Builders: []*ethpb.Builder{
{
Balance: 10,
@@ -120,7 +118,7 @@ func TestBuilderHelpers(t *testing.T) {
},
FinalizedCheckpoint: &ethpb.Checkpoint{Epoch: 2},
}
stInactive, err := state_native.InitializeFromProtoGloas(stProto)
stInactive, err := InitializeFromProtoGloas(stProto)
require.NoError(t, err)
active, err = stInactive.IsActiveBuilder(0)
@@ -129,7 +127,7 @@ func TestBuilderHelpers(t *testing.T) {
})
t.Run("can builder cover bid", func(t *testing.T) {
stIface, err := state_native.InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
stIface, err := InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
Builders: []*ethpb.Builder{
{
Balance: primitives.Gwei(params.BeaconConfig().MinDepositAmount + 50),
@@ -147,7 +145,7 @@ func TestBuilderHelpers(t *testing.T) {
})
require.NoError(t, err)
st := stIface.(*state_native.BeaconState)
st := stIface.(*BeaconState)
ok, err := st.CanBuilderCoverBid(0, 20)
require.NoError(t, err)
require.Equal(t, true, ok)
@@ -159,10 +157,245 @@ func TestBuilderHelpers(t *testing.T) {
}
func TestBuilderPendingPayments_UnsupportedVersion(t *testing.T) {
stIface, err := state_native.InitializeFromProtoElectra(&ethpb.BeaconStateElectra{})
stIface, err := InitializeFromProtoElectra(&ethpb.BeaconStateElectra{})
require.NoError(t, err)
st := stIface.(*state_native.BeaconState)
st := stIface.(*BeaconState)
_, err = st.BuilderPendingPayments()
require.ErrorContains(t, "BuilderPendingPayments", err)
}
func TestIsParentBlockFull(t *testing.T) {
t.Run("returns error before gloas", func(t *testing.T) {
st := &BeaconState{version: version.Fulu}
_, err := st.IsParentBlockFull()
require.ErrorContains(t, "is not supported", err)
})
t.Run("returns false when bid is nil", func(t *testing.T) {
st := &BeaconState{version: version.Gloas}
got, err := st.IsParentBlockFull()
require.NoError(t, err)
require.Equal(t, false, got)
})
t.Run("returns true when hashes match", func(t *testing.T) {
hash := bytes.Repeat([]byte{0xAB}, 32)
st := &BeaconState{
version: version.Gloas,
latestExecutionPayloadBid: &ethpb.ExecutionPayloadBid{
BlockHash: hash,
},
latestBlockHash: hash,
}
got, err := st.IsParentBlockFull()
require.NoError(t, err)
require.Equal(t, true, got)
})
t.Run("returns false when hashes differ", func(t *testing.T) {
hash := bytes.Repeat([]byte{0xAB}, 32)
other := bytes.Repeat([]byte{0xCD}, 32)
st := &BeaconState{
version: version.Gloas,
latestExecutionPayloadBid: &ethpb.ExecutionPayloadBid{
BlockHash: hash,
},
latestBlockHash: other,
}
got, err := st.IsParentBlockFull()
require.NoError(t, err)
require.Equal(t, false, got)
})
}
func TestAppendBuilderWithdrawals(t *testing.T) {
t.Run("errors when prior withdrawals exceed limit", func(t *testing.T) {
st := &BeaconState{}
limit := params.BeaconConfig().MaxWithdrawalsPerPayload - 1
withdrawals := make([]*enginev1.Withdrawal, limit+1)
nextIndex, processed, err := st.appendBuilderWithdrawals(5, &withdrawals)
require.ErrorContains(t, "exceeds limit", err)
require.Equal(t, uint64(5), nextIndex)
require.Equal(t, uint64(0), processed)
require.Equal(t, int(limit+1), len(withdrawals))
})
t.Run("appends builder withdrawals and increments index", func(t *testing.T) {
st := &BeaconState{
builderPendingWithdrawals: []*ethpb.BuilderPendingWithdrawal{
{BuilderIndex: 1, FeeRecipient: []byte{0x01}, Amount: 11},
{BuilderIndex: 2, FeeRecipient: []byte{0x02}, Amount: 22},
{BuilderIndex: 3, FeeRecipient: []byte{0x03}, Amount: 33},
},
}
withdrawals := []*enginev1.Withdrawal{
{Index: 7, ValidatorIndex: 9, Address: []byte{0xAA}, Amount: 99},
}
nextIndex, processed, err := st.appendBuilderWithdrawals(10, &withdrawals)
require.NoError(t, err)
require.Equal(t, uint64(13), nextIndex)
require.Equal(t, uint64(3), processed)
require.Equal(t, 4, len(withdrawals))
require.DeepEqual(t, &enginev1.Withdrawal{
Index: 10,
ValidatorIndex: primitives.BuilderIndex(1).ToValidatorIndex(),
Address: []byte{0x01},
Amount: 11,
}, withdrawals[1])
require.DeepEqual(t, &enginev1.Withdrawal{
Index: 11,
ValidatorIndex: primitives.BuilderIndex(2).ToValidatorIndex(),
Address: []byte{0x02},
Amount: 22,
}, withdrawals[2])
require.DeepEqual(t, &enginev1.Withdrawal{
Index: 12,
ValidatorIndex: primitives.BuilderIndex(3).ToValidatorIndex(),
Address: []byte{0x03},
Amount: 33,
}, withdrawals[3])
})
t.Run("respects per-payload limit", func(t *testing.T) {
limit := params.BeaconConfig().MaxWithdrawalsPerPayload - 1
st := &BeaconState{
builderPendingWithdrawals: []*ethpb.BuilderPendingWithdrawal{
{BuilderIndex: 4, FeeRecipient: []byte{0x04}, Amount: 44},
{BuilderIndex: 5, FeeRecipient: []byte{0x05}, Amount: 55},
},
}
withdrawals := make([]*enginev1.Withdrawal, limit-1)
nextIndex, processed, err := st.appendBuilderWithdrawals(20, &withdrawals)
require.NoError(t, err)
require.Equal(t, uint64(21), nextIndex)
require.Equal(t, uint64(1), processed)
require.Equal(t, int(limit), len(withdrawals))
require.DeepEqual(t, &enginev1.Withdrawal{
Index: 20,
ValidatorIndex: primitives.BuilderIndex(4).ToValidatorIndex(),
Address: []byte{0x04},
Amount: 44,
}, withdrawals[len(withdrawals)-1])
})
t.Run("does not append when already at limit", func(t *testing.T) {
limit := params.BeaconConfig().MaxWithdrawalsPerPayload - 1
if limit == 0 {
t.Skip("withdrawals limit too small")
}
st := &BeaconState{
builderPendingWithdrawals: []*ethpb.BuilderPendingWithdrawal{
{BuilderIndex: 6, FeeRecipient: []byte{0x06}, Amount: 66},
},
}
withdrawals := make([]*enginev1.Withdrawal, limit)
nextIndex, processed, err := st.appendBuilderWithdrawals(30, &withdrawals)
require.NoError(t, err)
require.Equal(t, uint64(30), nextIndex)
require.Equal(t, uint64(0), processed)
require.Equal(t, int(limit), len(withdrawals))
})
}
func TestAppendBuildersSweepWithdrawals(t *testing.T) {
t.Run("errors when prior withdrawals exceed limit", func(t *testing.T) {
st := &BeaconState{}
limit := params.BeaconConfig().MaxWithdrawalsPerPayload - 1
withdrawals := make([]*enginev1.Withdrawal, limit+1)
nextIndex, nextBuilderIndex, err := st.appendBuildersSweepWithdrawals(5, &withdrawals)
require.ErrorContains(t, "exceeds limit", err)
require.Equal(t, uint64(5), nextIndex)
require.Equal(t, primitives.BuilderIndex(0), nextBuilderIndex)
require.Equal(t, int(limit+1), len(withdrawals))
})
t.Run("appends eligible builders, skips ineligible", func(t *testing.T) {
epoch := primitives.Epoch(3)
st := &BeaconState{
slot: slots.UnsafeEpochStart(epoch),
nextWithdrawalBuilderIndex: 2,
builders: []*ethpb.Builder{
{ExecutionAddress: []byte{0x01}, Balance: 0, WithdrawableEpoch: epoch},
{ExecutionAddress: []byte{0x02}, Balance: 10, WithdrawableEpoch: epoch + 1},
{ExecutionAddress: []byte{0x03}, Balance: 20, WithdrawableEpoch: epoch},
},
}
withdrawals := []*enginev1.Withdrawal{}
nextIndex, nextBuilderIndex, err := st.appendBuildersSweepWithdrawals(100, &withdrawals)
require.NoError(t, err)
require.Equal(t, uint64(101), nextIndex)
require.Equal(t, primitives.BuilderIndex(2), nextBuilderIndex)
require.Equal(t, 1, len(withdrawals))
require.DeepEqual(t, &enginev1.Withdrawal{
Index: 100,
ValidatorIndex: primitives.BuilderIndex(2).ToValidatorIndex(),
Address: []byte{0x03},
Amount: 20,
}, withdrawals[0])
})
t.Run("respects max builders per sweep", func(t *testing.T) {
cfg := params.BeaconConfig()
max := int(cfg.MaxBuildersPerWithdrawalsSweep)
epoch := primitives.Epoch(1)
builders := make([]*ethpb.Builder, max+2)
for i := range builders {
builders[i] = &ethpb.Builder{
ExecutionAddress: []byte{byte(i + 1)},
Balance: 1,
WithdrawableEpoch: epoch,
}
}
start := len(builders) - 1
st := &BeaconState{
slot: slots.UnsafeEpochStart(epoch),
nextWithdrawalBuilderIndex: primitives.BuilderIndex(start),
builders: builders,
}
withdrawals := []*enginev1.Withdrawal{}
nextIndex, nextBuilderIndex, err := st.appendBuildersSweepWithdrawals(7, &withdrawals)
require.NoError(t, err)
limit := int(cfg.MaxWithdrawalsPerPayload - 1)
expectedCount := min(max, limit)
require.Equal(t, uint64(7)+uint64(expectedCount), nextIndex)
require.Equal(t, expectedCount, len(withdrawals))
expectedNext := primitives.BuilderIndex((uint64(start) + uint64(expectedCount)) % uint64(len(builders)))
require.Equal(t, expectedNext, nextBuilderIndex)
})
t.Run("stops when payload limit reached", func(t *testing.T) {
cfg := params.BeaconConfig()
limit := cfg.MaxWithdrawalsPerPayload - 1
if limit < 1 {
t.Skip("withdrawals limit too small")
}
epoch := primitives.Epoch(2)
builders := []*ethpb.Builder{
{ExecutionAddress: []byte{0x0A}, Balance: 3, WithdrawableEpoch: epoch},
{ExecutionAddress: []byte{0x0B}, Balance: 4, WithdrawableEpoch: epoch},
}
st := &BeaconState{
slot: slots.UnsafeEpochStart(epoch),
nextWithdrawalBuilderIndex: 0,
builders: builders,
}
withdrawals := make([]*enginev1.Withdrawal, limit)
nextIndex, nextBuilderIndex, err := st.appendBuildersSweepWithdrawals(20, &withdrawals)
require.NoError(t, err)
require.Equal(t, uint64(20), nextIndex)
require.Equal(t, int(limit), len(withdrawals))
require.Equal(t, primitives.BuilderIndex(0), nextBuilderIndex)
})
}

View File

@@ -133,11 +133,22 @@ func (b *BeaconState) appendPendingPartialWithdrawals(withdrawalIndex uint64, wi
return withdrawalIndex, 0, nil
}
cfg := params.BeaconConfig()
withdrawalsLimit := min(
len(*withdrawals)+int(cfg.MaxPendingPartialsPerWithdrawalsSweep),
int(cfg.MaxWithdrawalsPerPayload-1),
)
if len(*withdrawals) > withdrawalsLimit {
return withdrawalIndex, 0, fmt.Errorf("prior withdrawals length %d exceeds limit %d", len(*withdrawals), withdrawalsLimit)
}
ws := *withdrawals
epoch := slots.ToEpoch(b.slot)
var processedPartialWithdrawalsCount uint64
for _, w := range b.pendingPartialWithdrawals {
if w.WithdrawableEpoch > epoch || len(ws) >= int(params.BeaconConfig().MaxPendingPartialsPerWithdrawalsSweep) {
isWithdrawable := w.WithdrawableEpoch <= epoch
hasReachedLimit := len(ws) >= withdrawalsLimit
if !isWithdrawable || hasReachedLimit {
break
}
@@ -149,7 +160,7 @@ func (b *BeaconState) appendPendingPartialWithdrawals(withdrawalIndex uint64, wi
if err != nil {
return withdrawalIndex, 0, fmt.Errorf("could not retrieve balance at index %d: %w", w.Index, err)
}
hasSufficientEffectiveBalance := v.EffectiveBalance() >= params.BeaconConfig().MinActivationBalance
hasSufficientEffectiveBalance := v.EffectiveBalance() >= cfg.MinActivationBalance
var totalWithdrawn uint64
for _, wi := range ws {
if wi.ValidatorIndex == w.Index {
@@ -160,9 +171,9 @@ func (b *BeaconState) appendPendingPartialWithdrawals(withdrawalIndex uint64, wi
if err != nil {
return withdrawalIndex, 0, errors.Wrapf(err, "failed to subtract balance %d with total withdrawn %d", vBal, totalWithdrawn)
}
hasExcessBalance := balance > params.BeaconConfig().MinActivationBalance
if v.ExitEpoch() == params.BeaconConfig().FarFutureEpoch && hasSufficientEffectiveBalance && hasExcessBalance {
amount := min(balance-params.BeaconConfig().MinActivationBalance, w.Amount)
hasExcessBalance := balance > cfg.MinActivationBalance
if v.ExitEpoch() == cfg.FarFutureEpoch && hasSufficientEffectiveBalance && hasExcessBalance {
amount := min(balance-cfg.MinActivationBalance, w.Amount)
ws = append(ws, &enginev1.Withdrawal{
Index: withdrawalIndex,
ValidatorIndex: w.Index,
@@ -183,7 +194,7 @@ func (b *BeaconState) appendValidatorsSweepWithdrawals(withdrawalIndex uint64, w
validatorIndex := b.nextWithdrawalValidatorIndex
validatorsLen := b.validatorsLen()
epoch := slots.ToEpoch(b.slot)
bound := min(uint64(validatorsLen), params.BeaconConfig().MaxValidatorsPerWithdrawalsSweep)
bound := min(validatorsLen, int(params.BeaconConfig().MaxValidatorsPerWithdrawalsSweep))
for range bound {
val, err := b.validatorAtIndexReadOnly(validatorIndex)
if err != nil {
@@ -222,7 +233,7 @@ func (b *BeaconState) appendValidatorsSweepWithdrawals(withdrawalIndex uint64, w
})
withdrawalIndex++
}
if uint64(len(ws)) == params.BeaconConfig().MaxWithdrawalsPerPayload {
if len(ws) == int(params.BeaconConfig().MaxWithdrawalsPerPayload) {
break
}
validatorIndex += 1

View File

@@ -1,6 +1,7 @@
package state_native
import (
"errors"
"fmt"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native/types"
@@ -8,6 +9,7 @@ import (
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
enginev1 "github.com/OffchainLabs/prysm/v7/proto/engine/v1"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
)
@@ -161,3 +163,154 @@ func (b *BeaconState) UpdateExecutionPayloadAvailabilityAtIndex(idx uint64, val
b.markFieldAsDirty(types.ExecutionPayloadAvailability)
return nil
}
// SetPayloadExpectedWithdrawals stores the expected withdrawals for the next payload.
func (b *BeaconState) SetPayloadExpectedWithdrawals(withdrawals []*enginev1.Withdrawal) error {
if b.version < version.Gloas {
return errNotSupported("SetPayloadExpectedWithdrawals", b.version)
}
b.lock.Lock()
defer b.lock.Unlock()
b.payloadExpectedWithdrawals = withdrawals
b.markFieldAsDirty(types.PayloadExpectedWithdrawals)
return nil
}
// DequeueBuilderPendingWithdrawals removes processed builder withdrawals from the front of the queue.
func (b *BeaconState) DequeueBuilderPendingWithdrawals(n uint64) error {
if b.version < version.Gloas {
return errNotSupported("DequeueBuilderPendingWithdrawals", b.version)
}
if n > uint64(len(b.builderPendingWithdrawals)) {
return errors.New("cannot dequeue more builder withdrawals than are in the queue")
}
if n == 0 {
return nil
}
b.lock.Lock()
defer b.lock.Unlock()
if b.sharedFieldReferences[types.BuilderPendingWithdrawals].Refs() > 1 {
withdrawals := make([]*ethpb.BuilderPendingWithdrawal, len(b.builderPendingWithdrawals))
copy(withdrawals, b.builderPendingWithdrawals)
b.builderPendingWithdrawals = withdrawals
b.sharedFieldReferences[types.BuilderPendingWithdrawals].MinusRef()
b.sharedFieldReferences[types.BuilderPendingWithdrawals] = stateutil.NewRef(1)
}
b.builderPendingWithdrawals = b.builderPendingWithdrawals[n:]
b.markFieldAsDirty(types.BuilderPendingWithdrawals)
b.rebuildTrie[types.BuilderPendingWithdrawals] = true
return nil
}
// SetNextWithdrawalBuilderIndex sets the next builder index for the withdrawals sweep.
func (b *BeaconState) SetNextWithdrawalBuilderIndex(index primitives.BuilderIndex) error {
if b.version < version.Gloas {
return errNotSupported("SetNextWithdrawalBuilderIndex", b.version)
}
b.lock.Lock()
defer b.lock.Unlock()
b.nextWithdrawalBuilderIndex = index
b.markFieldAsDirty(types.NextWithdrawalBuilderIndex)
return nil
}
// DecreaseWithdrawalBalances applies withdrawal balance decreases for validators and builders.
// This method holds the state lock for the full batch to avoid lock churn.
func (b *BeaconState) DecreaseWithdrawalBalances(withdrawals []*enginev1.Withdrawal) error {
if b.version < version.Gloas {
return errNotSupported("DecreaseWithdrawalBalances", b.version)
}
if len(withdrawals) == 0 {
return nil
}
b.lock.Lock()
defer b.lock.Unlock()
var (
balanceIndices []uint64
builderIndices []uint64
)
for _, withdrawal := range withdrawals {
if withdrawal == nil {
return errors.New("withdrawal is nil")
}
if withdrawal.Amount == 0 {
continue
}
if withdrawal.ValidatorIndex.IsBuilderIndex() {
builderIndex := withdrawal.ValidatorIndex.ToBuilderIndex()
if err := b.decreaseBuilderBalanceLockFree(builderIndex, withdrawal.Amount); err != nil {
return err
}
builderIndices = append(builderIndices, uint64(builderIndex))
continue
}
balAtIdx, err := b.balanceAtIndex(withdrawal.ValidatorIndex)
if err != nil {
return err
}
newBal := decreaseBalanceWithVal(balAtIdx, withdrawal.Amount)
if err := b.balancesMultiValue.UpdateAt(b, uint64(withdrawal.ValidatorIndex), newBal); err != nil {
return fmt.Errorf("could not update balances: %w", err)
}
balanceIndices = append(balanceIndices, uint64(withdrawal.ValidatorIndex))
}
if len(balanceIndices) > 0 {
b.markFieldAsDirty(types.Balances)
b.addDirtyIndices(types.Balances, balanceIndices)
}
if len(builderIndices) > 0 {
b.markFieldAsDirty(types.Builders)
b.addDirtyIndices(types.Builders, builderIndices)
}
return nil
}
func (b *BeaconState) decreaseBuilderBalanceLockFree(builderIndex primitives.BuilderIndex, amount uint64) error {
idx := uint64(builderIndex)
if idx >= uint64(len(b.builders)) {
return fmt.Errorf("builder index %d out of range (len=%d)", builderIndex, len(b.builders))
}
if b.sharedFieldReferences[types.Builders].Refs() > 1 {
builders := make([]*ethpb.Builder, len(b.builders))
copy(builders, b.builders)
b.builders = builders
b.sharedFieldReferences[types.Builders].MinusRef()
b.sharedFieldReferences[types.Builders] = stateutil.NewRef(1)
}
builder := b.builders[idx]
bal := uint64(builder.Balance)
if amount >= bal {
builder.Balance = 0
} else {
builder.Balance = primitives.Gwei(bal - amount)
}
return nil
}
func decreaseBalanceWithVal(currBalance, delta uint64) uint64 {
if delta > currBalance {
return 0
}
return currBalance - delta
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stateutil"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
enginev1 "github.com/OffchainLabs/prysm/v7/proto/engine/v1"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/OffchainLabs/prysm/v7/testing/require"
@@ -229,6 +230,235 @@ func TestRotateBuilderPendingPayments_UnsupportedVersion(t *testing.T) {
require.ErrorContains(t, "RotateBuilderPendingPayments", err)
}
func TestSetPayloadExpectedWithdrawals(t *testing.T) {
t.Run("previous fork returns expected error", func(t *testing.T) {
st := &BeaconState{version: version.Fulu}
err := st.SetPayloadExpectedWithdrawals([]*enginev1.Withdrawal{})
require.ErrorContains(t, "SetPayloadExpectedWithdrawals", err)
})
t.Run("allows nil input and marks dirty", func(t *testing.T) {
st := &BeaconState{
version: version.Gloas,
dirtyFields: make(map[types.FieldIndex]bool),
}
require.NoError(t, st.SetPayloadExpectedWithdrawals(nil))
require.Equal(t, true, st.payloadExpectedWithdrawals == nil)
require.Equal(t, true, st.dirtyFields[types.PayloadExpectedWithdrawals])
})
t.Run("sets and marks dirty", func(t *testing.T) {
st := &BeaconState{
version: version.Gloas,
dirtyFields: make(map[types.FieldIndex]bool),
payloadExpectedWithdrawals: []*enginev1.Withdrawal{{Index: 1}, {Index: 2}},
}
withdrawals := []*enginev1.Withdrawal{{Index: 3}}
require.NoError(t, st.SetPayloadExpectedWithdrawals(withdrawals))
require.DeepEqual(t, withdrawals, st.payloadExpectedWithdrawals)
require.Equal(t, true, st.dirtyFields[types.PayloadExpectedWithdrawals])
})
}
func TestDecreaseWithdrawalBalances(t *testing.T) {
t.Run("previous fork returns expected error", func(t *testing.T) {
st := &BeaconState{version: version.Fulu}
err := st.DecreaseWithdrawalBalances([]*enginev1.Withdrawal{{}})
require.ErrorContains(t, "DecreaseWithdrawalBalances", err)
})
t.Run("rejects nil withdrawal", func(t *testing.T) {
st := &BeaconState{version: version.Gloas}
err := st.DecreaseWithdrawalBalances([]*enginev1.Withdrawal{nil})
require.ErrorContains(t, "withdrawal is nil", err)
})
t.Run("no-op on empty input", func(t *testing.T) {
st := &BeaconState{
version: version.Gloas,
dirtyFields: make(map[types.FieldIndex]bool),
dirtyIndices: make(map[types.FieldIndex][]uint64),
rebuildTrie: make(map[types.FieldIndex]bool),
}
require.NoError(t, st.DecreaseWithdrawalBalances(nil))
require.Equal(t, 0, len(st.dirtyFields))
require.Equal(t, 0, len(st.dirtyIndices))
})
t.Run("updates validator and builder balances and tracks dirty indices", func(t *testing.T) {
st := &BeaconState{
version: version.Gloas,
dirtyFields: make(map[types.FieldIndex]bool),
dirtyIndices: make(map[types.FieldIndex][]uint64),
rebuildTrie: make(map[types.FieldIndex]bool),
sharedFieldReferences: map[types.FieldIndex]*stateutil.Reference{
types.Builders: stateutil.NewRef(1),
},
balancesMultiValue: NewMultiValueBalances([]uint64{100, 200, 300}),
builders: []*ethpb.Builder{
{Balance: 1000},
{Balance: 50},
},
}
withdrawals := []*enginev1.Withdrawal{
{ValidatorIndex: primitives.ValidatorIndex(1), Amount: 20},
{ValidatorIndex: primitives.BuilderIndex(1).ToValidatorIndex(), Amount: 30},
{ValidatorIndex: primitives.ValidatorIndex(2), Amount: 400},
{ValidatorIndex: primitives.BuilderIndex(0).ToValidatorIndex(), Amount: 2000},
{ValidatorIndex: primitives.ValidatorIndex(0), Amount: 0},
}
require.NoError(t, st.DecreaseWithdrawalBalances(withdrawals))
require.DeepEqual(t, []uint64{100, 180, 0}, st.Balances())
require.Equal(t, primitives.Gwei(0), st.builders[0].Balance)
require.Equal(t, primitives.Gwei(20), st.builders[1].Balance)
require.Equal(t, true, st.dirtyFields[types.Balances])
require.Equal(t, true, st.dirtyFields[types.Builders])
require.DeepEqual(t, []uint64{1, 2}, st.dirtyIndices[types.Balances])
require.DeepEqual(t, []uint64{1, 0}, st.dirtyIndices[types.Builders])
})
t.Run("returns error on builder index out of range", func(t *testing.T) {
st := &BeaconState{
version: version.Gloas,
dirtyFields: make(map[types.FieldIndex]bool),
dirtyIndices: make(map[types.FieldIndex][]uint64),
rebuildTrie: make(map[types.FieldIndex]bool),
sharedFieldReferences: map[types.FieldIndex]*stateutil.Reference{
types.Builders: stateutil.NewRef(1),
},
builders: []*ethpb.Builder{{Balance: 5}},
}
err := st.DecreaseWithdrawalBalances([]*enginev1.Withdrawal{
{ValidatorIndex: primitives.BuilderIndex(2).ToValidatorIndex(), Amount: 1},
})
require.ErrorContains(t, "out of range", err)
require.Equal(t, false, st.dirtyFields[types.Builders])
require.Equal(t, 0, len(st.dirtyIndices[types.Builders]))
})
}
func TestDequeueBuilderPendingWithdrawals(t *testing.T) {
t.Run("previous fork returns expected error", func(t *testing.T) {
st := &BeaconState{version: version.Fulu}
err := st.DequeueBuilderPendingWithdrawals(1)
require.ErrorContains(t, "DequeueBuilderPendingWithdrawals", err)
})
t.Run("returns error when dequeueing more than length", func(t *testing.T) {
st := &BeaconState{
version: version.Gloas,
dirtyFields: make(map[types.FieldIndex]bool),
sharedFieldReferences: map[types.FieldIndex]*stateutil.Reference{
types.BuilderPendingWithdrawals: stateutil.NewRef(1),
},
builderPendingWithdrawals: []*ethpb.BuilderPendingWithdrawal{{Amount: 1}},
}
err := st.DequeueBuilderPendingWithdrawals(2)
require.ErrorContains(t, "cannot dequeue more builder withdrawals", err)
require.Equal(t, 1, len(st.builderPendingWithdrawals))
require.Equal(t, false, st.dirtyFields[types.BuilderPendingWithdrawals])
})
t.Run("no-op on zero", func(t *testing.T) {
st := &BeaconState{
version: version.Gloas,
dirtyFields: make(map[types.FieldIndex]bool),
sharedFieldReferences: map[types.FieldIndex]*stateutil.Reference{
types.BuilderPendingWithdrawals: stateutil.NewRef(1),
},
builderPendingWithdrawals: []*ethpb.BuilderPendingWithdrawal{{Amount: 1}},
}
require.NoError(t, st.DequeueBuilderPendingWithdrawals(0))
require.Equal(t, 1, len(st.builderPendingWithdrawals))
require.Equal(t, false, st.dirtyFields[types.BuilderPendingWithdrawals])
require.Equal(t, false, st.rebuildTrie[types.BuilderPendingWithdrawals])
})
t.Run("dequeues and marks dirty", func(t *testing.T) {
st := &BeaconState{
version: version.Gloas,
dirtyFields: make(map[types.FieldIndex]bool),
sharedFieldReferences: map[types.FieldIndex]*stateutil.Reference{
types.BuilderPendingWithdrawals: stateutil.NewRef(1),
},
builderPendingWithdrawals: []*ethpb.BuilderPendingWithdrawal{
{Amount: 1},
{Amount: 2},
{Amount: 3},
},
rebuildTrie: make(map[types.FieldIndex]bool),
}
require.NoError(t, st.DequeueBuilderPendingWithdrawals(2))
require.Equal(t, 1, len(st.builderPendingWithdrawals))
require.Equal(t, primitives.Gwei(3), st.builderPendingWithdrawals[0].Amount)
require.Equal(t, true, st.dirtyFields[types.BuilderPendingWithdrawals])
require.Equal(t, true, st.rebuildTrie[types.BuilderPendingWithdrawals])
})
t.Run("copy-on-write preserves shared state", func(t *testing.T) {
sharedRef := stateutil.NewRef(2)
sharedWithdrawals := []*ethpb.BuilderPendingWithdrawal{
{Amount: 1},
{Amount: 2},
{Amount: 3},
}
st1 := &BeaconState{
version: version.Gloas,
dirtyFields: make(map[types.FieldIndex]bool),
sharedFieldReferences: map[types.FieldIndex]*stateutil.Reference{
types.BuilderPendingWithdrawals: sharedRef,
},
builderPendingWithdrawals: sharedWithdrawals,
rebuildTrie: make(map[types.FieldIndex]bool),
}
st2 := &BeaconState{
sharedFieldReferences: map[types.FieldIndex]*stateutil.Reference{
types.BuilderPendingWithdrawals: sharedRef,
},
builderPendingWithdrawals: sharedWithdrawals,
}
require.NoError(t, st1.DequeueBuilderPendingWithdrawals(2))
require.Equal(t, primitives.Gwei(3), st1.builderPendingWithdrawals[0].Amount)
require.Equal(t, 3, len(st2.builderPendingWithdrawals))
require.Equal(t, primitives.Gwei(1), st2.builderPendingWithdrawals[0].Amount)
require.Equal(t, uint(1), st1.sharedFieldReferences[types.BuilderPendingWithdrawals].Refs())
require.Equal(t, uint(1), st2.sharedFieldReferences[types.BuilderPendingWithdrawals].Refs())
})
}
func TestSetNextWithdrawalBuilderIndex(t *testing.T) {
t.Run("previous fork returns expected error", func(t *testing.T) {
st := &BeaconState{version: version.Fulu}
err := st.SetNextWithdrawalBuilderIndex(1)
require.ErrorContains(t, "SetNextWithdrawalBuilderIndex", err)
})
t.Run("sets and marks dirty", func(t *testing.T) {
st := &BeaconState{
version: version.Gloas,
dirtyFields: make(map[types.FieldIndex]bool),
}
require.NoError(t, st.SetNextWithdrawalBuilderIndex(7))
require.Equal(t, primitives.BuilderIndex(7), st.nextWithdrawalBuilderIndex)
require.Equal(t, true, st.dirtyFields[types.NextWithdrawalBuilderIndex])
})
}
func TestAppendBuilderPendingWithdrawal_CopyOnWrite(t *testing.T) {
wd := &ethpb.BuilderPendingWithdrawal{
FeeRecipient: make([]byte, 20),

View File

@@ -1027,10 +1027,10 @@ func TestGetVerifyingStateEdgeCases(t *testing.T) {
sc: signatureCache,
sr: &mockStateByRooter{sbr: sbrErrorIfCalled(t)}, // Should not be called
hsp: &mockHeadStateProvider{
headRoot: parentRoot[:], // Same as parent
headSlot: 32, // Epoch 1
headState: fuluState.Copy(), // HeadState (not ReadOnly) for ProcessSlots
headStateReadOnly: nil, // Should not use ReadOnly path
headRoot: parentRoot[:], // Same as parent
headSlot: 32, // Epoch 1
headState: fuluState.Copy(), // HeadState (not ReadOnly) for ProcessSlots
headStateReadOnly: nil, // Should not use ReadOnly path
},
fc: &mockForkchoicer{
// Return same root for both to simulate same chain
@@ -1045,8 +1045,8 @@ func TestGetVerifyingStateEdgeCases(t *testing.T) {
// Wrap to detect HeadState call
originalHsp := initializer.shared.hsp.(*mockHeadStateProvider)
wrappedHsp := &mockHeadStateProvider{
headRoot: originalHsp.headRoot,
headSlot: originalHsp.headSlot,
headRoot: originalHsp.headRoot,
headSlot: originalHsp.headSlot,
headState: originalHsp.headState,
}
initializer.shared.hsp = &headStateCallTracker{

View File

@@ -1,3 +0,0 @@
### Added
- Set beacon node options after reading the config file.

View File

@@ -1,7 +0,0 @@
### Changed
- gRPC fallback now matches rest api implementation and will also check and connect to only synced nodes.
### Removed
- gRPC resolver for load balancing, the new implementation matches rest api's so we should remove the resolver so it's handled the same way for consistency.

View File

@@ -1,3 +0,0 @@
### Ignored
- Updated golangci to run lint on tests too.

View File

@@ -1,3 +0,0 @@
### Ignored
- Add handy documentation for SSZ Query package (`encoding/ssz/query`).

View File

@@ -0,0 +1,2 @@
### Added
- add modified process withdrawals for gloas

View File

@@ -1,2 +0,0 @@
### Ignored
- Run go fmt

View File

@@ -367,8 +367,17 @@ func startNode(ctx *cli.Context, cancel context.CancelFunc) error {
backfill.BeaconNodeOptions,
das.BeaconNodeOptions,
}
for _, of := range optFuncs {
ofo, err := of(ctx)
if err != nil {
return err
}
if ofo != nil {
opts = append(opts, ofo...)
}
}
beacon, err := node.New(ctx, cancel, optFuncs, opts...)
beacon, err := node.New(ctx, cancel, opts...)
if err != nil {
return fmt.Errorf("unable to start beacon node: %w", err)
}

View File

@@ -47,6 +47,7 @@ type BeaconChainConfig struct {
HysteresisQuotient uint64 `yaml:"HYSTERESIS_QUOTIENT" spec:"true"` // HysteresisQuotient defines the hysteresis quotient for effective balance calculations.
HysteresisDownwardMultiplier uint64 `yaml:"HYSTERESIS_DOWNWARD_MULTIPLIER" spec:"true"` // HysteresisDownwardMultiplier defines the hysteresis downward multiplier for effective balance calculations.
HysteresisUpwardMultiplier uint64 `yaml:"HYSTERESIS_UPWARD_MULTIPLIER" spec:"true"` // HysteresisUpwardMultiplier defines the hysteresis upward multiplier for effective balance calculations.
BuilderIndexFlag uint64 `yaml:"BUILDER_INDEX_FLAG" spec:"true"` // BuilderIndexFlag marks a ValidatorIndex as a BuilderIndex when the bit is set.
// Gwei value constants.
MinDepositAmount uint64 `yaml:"MIN_DEPOSIT_AMOUNT" spec:"true"` // MinDepositAmount is the minimum amount of Gwei a validator can send to the deposit contract at once (lower amounts will be reverted).
@@ -130,6 +131,7 @@ type BeaconChainConfig struct {
MaxWithdrawalsPerPayload uint64 `yaml:"MAX_WITHDRAWALS_PER_PAYLOAD" spec:"true"` // MaxWithdrawalsPerPayload defines the maximum number of withdrawals in a block.
MaxBlsToExecutionChanges uint64 `yaml:"MAX_BLS_TO_EXECUTION_CHANGES" spec:"true"` // MaxBlsToExecutionChanges defines the maximum number of BLS-to-execution-change objects in a block.
MaxValidatorsPerWithdrawalsSweep uint64 `yaml:"MAX_VALIDATORS_PER_WITHDRAWALS_SWEEP" spec:"true"` // MaxValidatorsPerWithdrawalsSweep bounds the size of the sweep searching for withdrawals per slot.
MaxBuildersPerWithdrawalsSweep uint64 `yaml:"MAX_BUILDERS_PER_WITHDRAWALS_SWEEP" spec:"true"` // MaxBuildersPerWithdrawalsSweep bounds the size of the builder withdrawals sweep per slot.
// BLS domain values.
DomainBeaconProposer [4]byte `yaml:"DOMAIN_BEACON_PROPOSER" spec:"true"` // DomainBeaconProposer defines the BLS signature domain for beacon proposal verification.

View File

@@ -194,6 +194,8 @@ func ConfigToYaml(cfg *BeaconChainConfig) []byte {
fmt.Sprintf("SHARD_COMMITTEE_PERIOD: %d", cfg.ShardCommitteePeriod),
fmt.Sprintf("MIN_VALIDATOR_WITHDRAWABILITY_DELAY: %d", cfg.MinValidatorWithdrawabilityDelay),
fmt.Sprintf("MAX_VALIDATORS_PER_WITHDRAWALS_SWEEP: %d", cfg.MaxValidatorsPerWithdrawalsSweep),
fmt.Sprintf("MAX_BUILDERS_PER_WITHDRAWALS_SWEEP: %d", cfg.MaxBuildersPerWithdrawalsSweep),
fmt.Sprintf("BUILDER_INDEX_FLAG: %d", cfg.BuilderIndexFlag),
fmt.Sprintf("MAX_SEED_LOOKAHEAD: %d", cfg.MaxSeedLookahead),
fmt.Sprintf("EJECTION_BALANCE: %d", cfg.EjectionBalance),
fmt.Sprintf("MIN_PER_EPOCH_CHURN_LIMIT: %d", cfg.MinPerEpochChurnLimit),

View File

@@ -85,6 +85,7 @@ var mainnetBeaconConfig = &BeaconChainConfig{
HysteresisQuotient: 4,
HysteresisDownwardMultiplier: 1,
HysteresisUpwardMultiplier: 5,
BuilderIndexFlag: 1099511627776,
// Gwei value constants.
MinDepositAmount: 1 * 1e9,
@@ -175,6 +176,7 @@ var mainnetBeaconConfig = &BeaconChainConfig{
MaxWithdrawalsPerPayload: 16,
MaxBlsToExecutionChanges: 16,
MaxValidatorsPerWithdrawalsSweep: 16384,
MaxBuildersPerWithdrawalsSweep: 16384,
// BLS domain values.
DomainBeaconProposer: bytesutil.Uint32ToBytes4(0x00000000),

View File

@@ -49,6 +49,7 @@ func compareConfigs(t *testing.T, expected, actual *params.BeaconChainConfig) {
require.DeepEqual(t, expected.HysteresisQuotient, actual.HysteresisQuotient)
require.DeepEqual(t, expected.HysteresisDownwardMultiplier, actual.HysteresisDownwardMultiplier)
require.DeepEqual(t, expected.HysteresisUpwardMultiplier, actual.HysteresisUpwardMultiplier)
require.DeepEqual(t, expected.BuilderIndexFlag, actual.BuilderIndexFlag)
require.DeepEqual(t, expected.MinDepositAmount, actual.MinDepositAmount)
require.DeepEqual(t, expected.MaxEffectiveBalance, actual.MaxEffectiveBalance)
require.DeepEqual(t, expected.EjectionBalance, actual.EjectionBalance)
@@ -94,6 +95,7 @@ func compareConfigs(t *testing.T, expected, actual *params.BeaconChainConfig) {
require.DeepEqual(t, expected.MaxDeposits, actual.MaxDeposits)
require.DeepEqual(t, expected.MaxVoluntaryExits, actual.MaxVoluntaryExits)
require.DeepEqual(t, expected.MaxWithdrawalsPerPayload, actual.MaxWithdrawalsPerPayload)
require.DeepEqual(t, expected.MaxBuildersPerWithdrawalsSweep, actual.MaxBuildersPerWithdrawalsSweep)
require.DeepEqual(t, expected.DomainBeaconProposer, actual.DomainBeaconProposer)
require.DeepEqual(t, expected.DomainRandao, actual.DomainRandao)
require.DeepEqual(t, expected.DomainBeaconAttester, actual.DomainBeaconAttester)

View File

@@ -13,6 +13,16 @@ var _ fssz.Unmarshaler = (*BuilderIndex)(nil)
// BuilderIndex is an index into the builder registry.
type BuilderIndex uint64
// ToValidatorIndex sets the builder flag on a builder index.
//
// Spec v1.6.1 (pseudocode):
// def convert_builder_index_to_validator_index(builder_index: BuilderIndex) -> ValidatorIndex:
//
// return ValidatorIndex(builder_index | BUILDER_INDEX_FLAG)
func (b BuilderIndex) ToValidatorIndex() ValidatorIndex {
return ValidatorIndex(uint64(b) | BuilderIndexFlag)
}
// HashTreeRoot returns the SSZ hash tree root of the index.
func (b BuilderIndex) HashTreeRoot() ([32]byte, error) {
return fssz.HashWithDefaultHasher(b)

View File

@@ -10,9 +10,34 @@ var _ fssz.HashRoot = (ValidatorIndex)(0)
var _ fssz.Marshaler = (*ValidatorIndex)(nil)
var _ fssz.Unmarshaler = (*ValidatorIndex)(nil)
// BuilderIndexFlag marks a ValidatorIndex as a BuilderIndex when the bit is set.
//
// Spec v1.6.1: BUILDER_INDEX_FLAG.
const BuilderIndexFlag uint64 = 1 << 40
// ValidatorIndex in eth2.
type ValidatorIndex uint64
// IsBuilderIndex returns true when the BuilderIndex flag is set on the validator index.
//
// Spec v1.6.1 (pseudocode):
// def is_builder_index(validator_index: ValidatorIndex) -> bool:
//
// return (validator_index & BUILDER_INDEX_FLAG) != 0
func (v ValidatorIndex) IsBuilderIndex() bool {
return uint64(v)&BuilderIndexFlag != 0
}
// ToBuilderIndex strips the builder flag from a validator index.
//
// Spec v1.6.1 (pseudocode):
// def convert_validator_index_to_builder_index(validator_index: ValidatorIndex) -> BuilderIndex:
//
// return BuilderIndex(validator_index & ~BUILDER_INDEX_FLAG)
func (v ValidatorIndex) ToBuilderIndex() BuilderIndex {
return BuilderIndex(uint64(v) & ^BuilderIndexFlag)
}
// Div divides validator index by x.
// This method panics if dividing by zero!
func (v ValidatorIndex) Div(x uint64) ValidatorIndex {

View File

@@ -33,3 +33,32 @@ func TestValidatorIndex_Casting(t *testing.T) {
}
})
}
func TestValidatorIndex_BuilderIndexFlagConversions(t *testing.T) {
base := uint64(42)
unflagged := ValidatorIndex(base)
if unflagged.IsBuilderIndex() {
t.Fatalf("expected unflagged validator index to not be a builder index")
}
if got, want := unflagged.ToBuilderIndex(), BuilderIndex(base); got != want {
t.Fatalf("unexpected builder index: got %d want %d", got, want)
}
flagged := ValidatorIndex(base | BuilderIndexFlag)
if !flagged.IsBuilderIndex() {
t.Fatalf("expected flagged validator index to be a builder index")
}
if got, want := flagged.ToBuilderIndex(), BuilderIndex(base); got != want {
t.Fatalf("unexpected builder index: got %d want %d", got, want)
}
builder := BuilderIndex(base)
roundTrip := builder.ToValidatorIndex()
if !roundTrip.IsBuilderIndex() {
t.Fatalf("expected round-tripped validator index to be a builder index")
}
if got, want := roundTrip.ToBuilderIndex(), builder; got != want {
t.Fatalf("unexpected round-trip builder index: got %d want %d", got, want)
}
}

View File

@@ -1,190 +0,0 @@
# SSZ Query Package
The `encoding/ssz/query` package provides a system for analyzing and querying SSZ ([Simple Serialize](https://github.com/ethereum/consensus-specs/blob/master/ssz/simple-serialize.md)) data structures, as well as generating Merkle proofs from them. It enables runtime analysis of SSZ-serialized Go objects with reflection, path-based queries through nested structures, generalized index calculation, and Merkle proof generation.
This package is designed to be generic. It operates on arbitrary SSZ-serialized Go values at runtime, so the same query/proof machinery applies equally to any SSZ type, including the BeaconState/BeaconBlock.
## Usage Example
```go
// 1. Analyze an SSZ object
block := &ethpb.BeaconBlock{...}
info, err := query.AnalyzeObject(block)
// 2. Parse a path
path, err := query.ParsePath(".body.attestations[0].data.slot")
// 3. Get the generalized index
gindex, err := query.GetGeneralizedIndexFromPath(info, path)
// 4. Generate a Merkle proof
proof, err := info.Prove(gindex)
// 5. Get offset and length to slice the SSZ-encoded bytes
sszBytes, _ := block.MarshalSSZ()
_, offset, length, err := query.CalculateOffsetAndLength(info, path)
// slotBytes contains the SSZ-encoded value at the queried path
slotBytes := sszBytes[offset : offset+length]
```
## Exported API
The main exported API consists of:
```go
// AnalyzeObject analyzes an SSZ object and returns its structural information
func AnalyzeObject(obj SSZObject) (*SszInfo, error)
// ParsePath parses a path string like ".field1.field2[0].field3"
func ParsePath(rawPath string) (Path, error)
// CalculateOffsetAndLength computes byte offset and length for a path within an SSZ object
func CalculateOffsetAndLength(sszInfo *SszInfo, path Path) (*SszInfo, uint64, uint64, error)
// GetGeneralizedIndexFromPath calculates the generalized index for a given path
func GetGeneralizedIndexFromPath(info *SszInfo, path Path) (uint64, error)
// Prove generates a Merkle proof for a target generalized index
func (s *SszInfo) Prove(gindex uint64) (*fastssz.Proof, error)
```
## Type System
### SSZ Types
The package now supports [all standard SSZ types](https://github.com/ethereum/consensus-specs/blob/master/ssz/simple-serialize.md#typing) except `ProgressiveList`, `ProgressiveContainer`, `ProgressiveBitlist`, `Union`, and `CompatibleUnion`.
### Core Data Structures
#### `SszInfo`
The `SszInfo` structure contains complete structural metadata for an SSZ type:
```go
type SszInfo struct {
sszType SszType // SSZ Type classification
typ reflect.Type // Go reflect.Type
source SSZObject // Original SSZObject reference. Mostly used for reusing SSZ methods like `HashTreeRoot`.
isVariable bool // True if contains variable-size fields
// Composite types have corresponding metadata. Other fields would be nil except for the current type.
containerInfo *containerInfo
listInfo *listInfo
vectorInfo *vectorInfo
bitlistInfo *bitlistInfo
bitvectorInfo *bitvectorInfo
}
```
#### `Path`
The `Path` structure represents navigation paths through SSZ structures. It supports accessing a field by field name, accessing an element by index (list/vector type), and finding the length of homogenous collection types. The `ParsePath` function parses a raw string into a `Path` instance, which is commonly used in other APIs like `CalculateOffsetAndLength` and `GetGeneralizedIndexFromPath`.
```go
type Path struct {
Length bool // Flag for length queries (e.g., len(.field))
Elements []PathElement // Sequence of field accesses and indices
}
type PathElement struct {
Name string // Field name
Index *uint64 // list/vector index (nil if not an index access)
}
```
## Implementation Details
### Type Analysis (`analyzer.go`)
The `AnalyzeObject` function performs recursive type introspection using Go reflection:
1. **Type Inspection** - Examines Go `reflect.Value` to determine SSZ type
- Basic types (`uint8`, `uint16`, `uint32`, `uint64`, `bool`): `SSZType` constants
- Slices: Determined from struct tags (`ssz-size` for vectors, `ssz-max` for lists). There is a related [write-up](https://hackmd.io/@junsong/H101DKnwxl) regarding struct tags.
- Structs: Analyzed as Containers with field ordering from JSON tags
- Pointers: Dereferenced automatically
2. **Variable-Length Population** - Determines actual sizes at runtime
- For lists: Iterates elements, caches sizes for variable-element lists
- For containers: Recursively populates variable fields, adjusts offsets
- For bitlists: Decodes bit length from bitvector
3. **Offset Calculation** - Computes byte positions within serialized data
- Fixed-size fields: Offset = sum of preceding field sizes
- Variable-size fields: Offset stored as 4-byte pointer entries
### Path Parsing (`path.go`)
The `ParsePath` function parses path strings with the following rules:
- **Dot notation**: `.field1.field2` for field access
- **Array indexing**: `[0]`, `[42]` for element access
- **Length queries**: `len(.field)` for list/vector lengths
- **Character set**: Only `[A-Za-z0-9._\[\]\(\)]` allowed
Example:
```go
path, _ := ParsePath(".nested.array_field[5].inner_field")
// Returns: Path{
// Elements: [
// PathElement{Name: "nested"},
// PathElement{Name: "array_field", Index: <Pointer to uint64(5)>},
// PathElement{Name: "inner_field"}
// ]
// }
```
### Generalized Index Calculation (`generalized_index.go`)
The generalized index is a tree position identifier. This package follows the [Ethereum consensus-specs](https://github.com/ethereum/consensus-specs/blob/master/ssz/merkle-proofs.md#generalized-merkle-tree-index) to calculate the generalized index.
### Merkle Proof Generation (`merkle_proof.go`, `proof_collector.go`)
The `Prove` method generates Merkle proofs using a single-sweep merkleization algorithm:
#### Algorithm Overview
**Key Terms:**
- **Target gindex** (generalized index): The position of the SSZ element you want to prove, expressed as a generalized Merkle tree index. Stored in `Proof.Index`.
- Note: The generalized index for root is 1.
- **Registered gindices**: The set of tree positions whose node hashes must be captured during merkleization in order to later assemble the proof.
- **Sibling node**: The node that shares the same parent as another node.
- **Leaf value**: The 32-byte hash of the target node (the node being proven). Stored in `Proof.Leaf`.
**Phases:**
1. **Registration Phase** (`addTarget`)
> Goal: determine exactly which sibling hashes are needed for the proof.
- Record the target gindex as the proof target.
- Starting from the target node, walk the Merkle tree from the leaf (target gindex) to the root (gindex = 1).
- At each step:
- Compute and register the sibling gindex (`i XOR 1`) as “must collect”.
- Move to the parent (`i = i/2`).
- This produces the full set of registered gindices (the sibling nodes on the target-to-root path).
2. **Merkleization Phase** (`merkleize`)
> Goal: recursively merkleize the tree and capture the needed hashes.
- Recursively traverse the SSZ structure and compute Merkle tree node hashes from leaves to root.
- Whenever the traversal computes a node whose gindex is in registered gindices, store that nodes hash for later proof construction.
3. **Proof Assembly Phase** (`toProof`)
> Goal: create the final `fastssz.Proof` object in the correct format and order.
```go
// Proof represents a merkle proof against a general index.
type Proof struct {
Index int
Leaf []byte
Hashes [][]byte
}
```
- Set `Proof.Index` to the target gindex.
- Set `Proof.Leaf` to the 32-byte hash of the target node.
- Build `Proof.Hashes` by walking from the target node up to (but not including) the root:
- At node `i`, append the stored hash for the sibling (`i XOR 1`).
- Move to the parent (`i = i/2`).
- The resulting `Proof.Hashes` is ordered from the target level upward, containing one sibling hash per tree level on the path to the root.

View File

@@ -26,21 +26,21 @@ func TestLifecycle(t *testing.T) {
port := 1000 + rand.Intn(1000)
prometheusService := NewService(t.Context(), fmt.Sprintf(":%d", port), nil)
prometheusService.Start()
// Actively wait until the service responds on /metrics (faster and less flaky than a fixed sleep)
deadline := time.Now().Add(3 * time.Second)
for {
if time.Now().After(deadline) {
t.Fatalf("metrics endpoint not ready within timeout")
}
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))
if err == nil {
_ = resp.Body.Close()
if resp.StatusCode == http.StatusOK {
break
}
}
time.Sleep(50 * time.Millisecond)
}
// Actively wait until the service responds on /metrics (faster and less flaky than a fixed sleep)
deadline := time.Now().Add(3 * time.Second)
for {
if time.Now().After(deadline) {
t.Fatalf("metrics endpoint not ready within timeout")
}
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))
if err == nil {
_ = resp.Body.Close()
if resp.StatusCode == http.StatusOK {
break
}
}
time.Sleep(50 * time.Millisecond)
}
// Query the service to ensure it really started.
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))
@@ -49,18 +49,18 @@ func TestLifecycle(t *testing.T) {
err = prometheusService.Stop()
require.NoError(t, err)
// Actively wait until the service stops responding on /metrics
deadline = time.Now().Add(3 * time.Second)
for {
if time.Now().After(deadline) {
t.Fatalf("metrics endpoint still reachable after timeout")
}
_, err = http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))
if err != nil {
break
}
time.Sleep(50 * time.Millisecond)
}
// Actively wait until the service stops responding on /metrics
deadline = time.Now().Add(3 * time.Second)
for {
if time.Now().After(deadline) {
t.Fatalf("metrics endpoint still reachable after timeout")
}
_, err = http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))
if err != nil {
break
}
time.Sleep(50 * time.Millisecond)
}
// Query the service to ensure it really stopped.
_, err = http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))

View File

@@ -225,9 +225,9 @@ func (r *testRunner) testDepositsAndTx(ctx context.Context, g *errgroup.Group,
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{r.depositor}); err != nil {
return errors.Wrap(err, "testDepositsAndTx unable to run, depositor did not Start")
}
go func() {
if r.config.TestDeposits {
log.Info("Running deposit tests")
go func() {
if r.config.TestDeposits {
log.Info("Running deposit tests")
// The validators with an index < minGenesisActiveCount all have deposits already from the chain start.
// Skip all of those chain start validators by seeking to minGenesisActiveCount in the validator list
// for further deposit testing.
@@ -238,12 +238,12 @@ func (r *testRunner) testDepositsAndTx(ctx context.Context, g *errgroup.Group,
r.t.Error(errors.Wrap(err, "depositor.SendAndMine failed"))
}
}
}
// Only generate background transactions when relevant for the test.
if r.config.TestDeposits || r.config.TestFeature || r.config.UseBuilder {
r.testTxGeneration(ctx, g, keystorePath, []e2etypes.ComponentRunner{})
}
}()
}
// Only generate background transactions when relevant for the test.
if r.config.TestDeposits || r.config.TestFeature || r.config.UseBuilder {
r.testTxGeneration(ctx, g, keystorePath, []e2etypes.ComponentRunner{})
}
}()
if r.config.TestDeposits {
return depositCheckValidator.Start(ctx)
}

View File

@@ -38,8 +38,8 @@ func TestEndToEnd_MinimalConfig(t *testing.T) {
r := e2eMinimal(t, cfg,
types.WithCheckpointSync(),
types.WithEpochs(10),
types.WithExitEpoch(4), // Minimum due to ShardCommitteePeriod=4
types.WithLargeBlobs(), // Use large blob transactions for BPO testing
types.WithExitEpoch(4), // Minimum due to ShardCommitteePeriod=4
types.WithLargeBlobs(), // Use large blob transactions for BPO testing
)
r.run()
}
}

View File

@@ -204,6 +204,7 @@ go_test(
"gloas__operations__execution_payload_header_test.go",
"gloas__operations__payload_attestation_test.go",
"gloas__operations__proposer_slashing_test.go",
"gloas__operations__withdrawals_test.go",
"gloas__sanity__slots_test.go",
"gloas__ssz_static__ssz_static_test.go",
"phase0__epoch_processing__effective_balance_updates_test.go",

View File

@@ -0,0 +1,11 @@
package mainnet
import (
"testing"
"github.com/OffchainLabs/prysm/v7/testing/spectest/shared/gloas/operations"
)
func TestMainnet_Gloas_Operations_Withdrawals(t *testing.T) {
operations.RunWithdrawalsTest(t, "mainnet")
}

View File

@@ -210,6 +210,7 @@ go_test(
"gloas__operations__execution_payload_bid_test.go",
"gloas__operations__payload_attestation_test.go",
"gloas__operations__proposer_slashing_test.go",
"gloas__operations__withdrawals_test.go",
"gloas__sanity__slots_test.go",
"gloas__ssz_static__ssz_static_test.go",
"phase0__epoch_processing__effective_balance_updates_test.go",

View File

@@ -0,0 +1,11 @@
package minimal
import (
"testing"
"github.com/OffchainLabs/prysm/v7/testing/spectest/shared/gloas/operations"
)
func TestMinimal_Gloas_Operations_Withdrawals(t *testing.T) {
operations.RunWithdrawalsTest(t, "minimal")
}

View File

@@ -8,16 +8,20 @@ go_library(
"helpers.go",
"payload_attestation.go",
"proposer_slashing.go",
"withdrawals.go",
],
importpath = "github.com/OffchainLabs/prysm/v7/testing/spectest/shared/gloas/operations",
visibility = ["//visibility:public"],
deps = [
"//beacon-chain/core/gloas:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/state-native:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime/version:go_default_library",
"//testing/require:go_default_library",
"//testing/spectest/shared/common/operations:go_default_library",
"//testing/spectest/utils:go_default_library",
],
)

View File

@@ -0,0 +1,48 @@
package operations
import (
"context"
"path"
"testing"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/gloas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/OffchainLabs/prysm/v7/testing/require"
common "github.com/OffchainLabs/prysm/v7/testing/spectest/shared/common/operations"
"github.com/OffchainLabs/prysm/v7/testing/spectest/utils"
)
func emptyBlockGloas() (interfaces.SignedBeaconBlock, error) {
b := &ethpb.SignedBeaconBlockGloas{
Block: &ethpb.BeaconBlockGloas{
Body: &ethpb.BeaconBlockBodyGloas{},
},
}
return blocks.NewSignedBeaconBlock(b)
}
func RunWithdrawalsTest(t *testing.T, config string) {
require.NoError(t, utils.SetConfig(t, config))
testFolders, testsFolderPath := utils.TestFolders(t, config, version.String(version.Gloas), "operations/withdrawals/pyspec_tests")
if len(testFolders) == 0 {
t.Fatalf("No test folders found for %s/%s/%s", config, version.String(version.Gloas), "operations/withdrawals/pyspec_tests")
}
for _, folder := range testFolders {
t.Run(folder.Name(), func(t *testing.T) {
folderPath := path.Join(testsFolderPath, folder.Name())
blk, err := emptyBlockGloas()
require.NoError(t, err)
common.RunBlockOperationTest(t, folderPath, blk, sszToState, func(_ context.Context, s state.BeaconState, _ interfaces.ReadOnlySignedBeaconBlock) (state.BeaconState, error) {
if err := gloas.ProcessWithdrawals(s); err != nil {
return nil, err
}
return s, nil
})
})
}
}

View File

@@ -283,16 +283,16 @@ func (mr *MockValidatorClientMockRecorder) ProposeExit(ctx, in any) *gomock.Call
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProposeExit", reflect.TypeOf((*MockValidatorClient)(nil).ProposeExit), ctx, in)
}
// SwitchHost mocks base method.
func (m *MockValidatorClient) SwitchHost(host string) {
// SetHost mocks base method.
func (m *MockValidatorClient) SetHost(host string) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SwitchHost", host)
m.ctrl.Call(m, "SetHost", host)
}
// SwitchHost indicates an expected call of SwitchHost.
func (mr *MockValidatorClientMockRecorder) SwitchHost(host any) *gomock.Call {
// SetHost indicates an expected call of SetHost.
func (mr *MockValidatorClientMockRecorder) SetHost(host any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SwitchHost", reflect.TypeOf((*MockValidatorClient)(nil).SwitchHost), host)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHost", reflect.TypeOf((*MockValidatorClient)(nil).SetHost), host)
}
// StartEventStream mocks base method.

View File

@@ -25,7 +25,6 @@ go_library(
],
deps = [
"//api/grpc:go_default_library",
"//api/rest:go_default_library",
"//beacon-chain/core/blocks:go_default_library",
"//cmd/validator/flags:go_default_library",
"//config/fieldparams:go_default_library",

View File

@@ -3,13 +3,14 @@ package accounts
import (
"context"
"io"
"net/http"
"os"
"time"
grpcutil "github.com/OffchainLabs/prysm/v7/api/grpc"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/crypto/bls"
"github.com/OffchainLabs/prysm/v7/validator/accounts/wallet"
beaconApi "github.com/OffchainLabs/prysm/v7/validator/client/beacon-api"
iface "github.com/OffchainLabs/prysm/v7/validator/client/iface"
nodeClientFactory "github.com/OffchainLabs/prysm/v7/validator/client/node-client-factory"
validatorClientFactory "github.com/OffchainLabs/prysm/v7/validator/client/validator-client-factory"
@@ -76,17 +77,22 @@ func (acm *CLIManager) prepareBeaconClients(ctx context.Context) (*iface.Validat
}
ctx = grpcutil.AppendHeaders(ctx, acm.grpcHeaders)
conn, err := validatorHelpers.NewNodeConnection(
validatorHelpers.WithGRPC(ctx, acm.beaconRPCProvider, acm.dialOpts),
validatorHelpers.WithREST(acm.beaconApiEndpoint, rest.WithHttpTimeout(acm.beaconApiTimeout)),
)
grpcConn, err := grpc.DialContext(ctx, acm.beaconRPCProvider, acm.dialOpts...)
if err != nil {
return nil, nil, err
return nil, nil, errors.Wrapf(err, "could not dial endpoint %s", acm.beaconRPCProvider)
}
conn := validatorHelpers.NewNodeConnection(
grpcConn,
acm.beaconApiEndpoint,
validatorHelpers.WithBeaconApiTimeout(acm.beaconApiTimeout),
)
validatorClient := validatorClientFactory.NewValidatorClient(conn)
nodeClient := nodeClientFactory.NewNodeClient(conn)
restHandler := beaconApi.NewBeaconApiRestHandler(
http.Client{Timeout: acm.beaconApiTimeout},
acm.beaconApiEndpoint,
)
validatorClient := validatorClientFactory.NewValidatorClient(conn, restHandler)
nodeClient := nodeClientFactory.NewNodeClient(conn, restHandler)
return &validatorClient, &nodeClient, nil
}

View File

@@ -10,6 +10,7 @@ go_library(
"log.go",
"log_helpers.go",
"metrics.go",
"multiple_endpoints_grpc_resolver.go",
"propose.go",
"registration.go",
"runner.go",
@@ -28,7 +29,6 @@ go_library(
"//api/client:go_default_library",
"//api/client/event:go_default_library",
"//api/grpc:go_default_library",
"//api/rest:go_default_library",
"//api/server/structs:go_default_library",
"//async:go_default_library",
"//async/event:go_default_library",
@@ -58,6 +58,7 @@ go_library(
"//time/slots:go_default_library",
"//validator/accounts/iface:go_default_library",
"//validator/accounts/wallet:go_default_library",
"//validator/client/beacon-api:go_default_library",
"//validator/client/beacon-chain-client-factory:go_default_library",
"//validator/client/iface:go_default_library",
"//validator/client/node-client-factory:go_default_library",
@@ -85,11 +86,13 @@ go_library(
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opentelemetry_go_contrib_instrumentation_google_golang_org_grpc_otelgrpc//:go_default_library",
"@io_opentelemetry_go_contrib_instrumentation_net_http_otelhttp//:go_default_library",
"@io_opentelemetry_go_otel_trace//:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//credentials:go_default_library",
"@org_golang_google_grpc//metadata:go_default_library",
"@org_golang_google_grpc//resolver:go_default_library",
"@org_golang_google_grpc//status:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
"@org_golang_google_protobuf//types/known/emptypb:go_default_library",
@@ -121,8 +124,6 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//api/grpc:go_default_library",
"//api/rest:go_default_library",
"//api/server/structs:go_default_library",
"//async/event:go_default_library",
"//beacon-chain/core/signing:go_default_library",

View File

@@ -26,6 +26,7 @@ go_library(
"propose_exit.go",
"prysm_beacon_chain_client.go",
"registration.go",
"rest_handler_client.go",
"state_validators.go",
"status.go",
"stream_blocks.go",
@@ -42,7 +43,6 @@ go_library(
"//api:go_default_library",
"//api/apiutil:go_default_library",
"//api/client/event:go_default_library",
"//api/rest:go_default_library",
"//api/server/structs:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/signing:go_default_library",
@@ -111,7 +111,6 @@ go_test(
deps = [
"//api:go_default_library",
"//api/apiutil:go_default_library",
"//api/rest:go_default_library",
"//api/server/structs:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/rpc/eth/shared/testing:go_default_library",

View File

@@ -5,7 +5,6 @@ import (
"reflect"
"strconv"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
@@ -18,7 +17,7 @@ import (
type beaconApiChainClient struct {
fallbackClient iface.ChainClient
jsonRestHandler rest.RestHandler
jsonRestHandler RestHandler
stateValidatorsProvider StateValidatorsProvider
}
@@ -328,7 +327,7 @@ func (c beaconApiChainClient) ValidatorParticipation(ctx context.Context, in *et
return nil, errors.New("beaconApiChainClient.ValidatorParticipation is not implemented. To use a fallback client, pass a fallback client as the last argument of NewBeaconApiChainClientWithFallback.")
}
func NewBeaconApiChainClientWithFallback(jsonRestHandler rest.RestHandler, fallbackClient iface.ChainClient) iface.ChainClient {
func NewBeaconApiChainClientWithFallback(jsonRestHandler RestHandler, fallbackClient iface.ChainClient) iface.ChainClient {
return &beaconApiChainClient{
jsonRestHandler: jsonRestHandler,
fallbackClient: fallbackClient,

View File

@@ -5,7 +5,6 @@ import (
"net/http"
"strconv"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
@@ -21,7 +20,7 @@ var (
type beaconApiNodeClient struct {
fallbackClient iface.NodeClient
jsonRestHandler rest.RestHandler
jsonRestHandler RestHandler
genesisProvider GenesisProvider
}
@@ -116,7 +115,7 @@ func (c *beaconApiNodeClient) IsReady(ctx context.Context) bool {
return statusCode == http.StatusOK
}
func NewNodeClientWithFallback(jsonRestHandler rest.RestHandler, fallbackClient iface.NodeClient) iface.NodeClient {
func NewNodeClientWithFallback(jsonRestHandler RestHandler, fallbackClient iface.NodeClient) iface.NodeClient {
b := &beaconApiNodeClient{
jsonRestHandler: jsonRestHandler,
fallbackClient: fallbackClient,

View File

@@ -6,7 +6,6 @@ import (
"time"
"github.com/OffchainLabs/prysm/v7/api/client/event"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
@@ -23,13 +22,13 @@ type beaconApiValidatorClient struct {
genesisProvider GenesisProvider
dutiesProvider dutiesProvider
stateValidatorsProvider StateValidatorsProvider
jsonRestHandler rest.RestHandler
jsonRestHandler RestHandler
beaconBlockConverter BeaconBlockConverter
prysmChainClient iface.PrysmChainClient
isEventStreamRunning bool
}
func NewBeaconApiValidatorClient(jsonRestHandler rest.RestHandler, opts ...ValidatorClientOpt) iface.ValidatorClient {
func NewBeaconApiValidatorClient(jsonRestHandler RestHandler, opts ...ValidatorClientOpt) iface.ValidatorClient {
c := &beaconApiValidatorClient{
genesisProvider: &beaconApiGenesisProvider{jsonRestHandler: jsonRestHandler},
dutiesProvider: beaconApiDutiesProvider{jsonRestHandler: jsonRestHandler},
@@ -332,6 +331,6 @@ func (c *beaconApiValidatorClient) Host() string {
return c.jsonRestHandler.Host()
}
func (c *beaconApiValidatorClient) SwitchHost(host string) {
c.jsonRestHandler.SwitchHost(host)
func (c *beaconApiValidatorClient) SetHost(host string) {
c.jsonRestHandler.SetHost(host)
}

View File

@@ -549,7 +549,7 @@ func TestBeaconApiValidatorClient_Host(t *testing.T) {
hosts := []string{"http://localhost:8080", "http://localhost:8081"}
jsonRestHandler := mock.NewMockJsonRestHandler(ctrl)
jsonRestHandler.EXPECT().SwitchHost(
jsonRestHandler.EXPECT().SetHost(
hosts[0],
).Times(1)
jsonRestHandler.EXPECT().Host().Return(
@@ -557,17 +557,17 @@ func TestBeaconApiValidatorClient_Host(t *testing.T) {
).Times(1)
validatorClient := beaconApiValidatorClient{jsonRestHandler: jsonRestHandler}
validatorClient.SwitchHost(hosts[0])
validatorClient.SetHost(hosts[0])
host := validatorClient.Host()
require.Equal(t, hosts[0], host)
jsonRestHandler.EXPECT().SwitchHost(
jsonRestHandler.EXPECT().SetHost(
hosts[1],
).Times(1)
jsonRestHandler.EXPECT().Host().Return(
hosts[1],
).Times(1)
validatorClient.SwitchHost(hosts[1])
validatorClient.SetHost(hosts[1])
host = validatorClient.Host()
require.Equal(t, hosts[1], host)
}

View File

@@ -9,7 +9,6 @@ import (
"strconv"
"github.com/OffchainLabs/prysm/v7/api/apiutil"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
@@ -28,7 +27,7 @@ type dutiesProvider interface {
}
type beaconApiDutiesProvider struct {
jsonRestHandler rest.RestHandler
jsonRestHandler RestHandler
}
type attesterDuty struct {

View File

@@ -7,7 +7,6 @@ import (
"sync"
"time"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
@@ -21,7 +20,7 @@ type GenesisProvider interface {
}
type beaconApiGenesisProvider struct {
jsonRestHandler rest.RestHandler
jsonRestHandler RestHandler
genesis *structs.Genesis
once sync.Once
}

View File

@@ -150,14 +150,14 @@ func (mr *MockRestHandlerMockRecorder) PostSSZ(ctx, endpoint, headers, data any)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PostSSZ", reflect.TypeOf((*MockRestHandler)(nil).PostSSZ), ctx, endpoint, headers, data)
}
// SwitchHost mocks base method.
func (m *MockRestHandler) SwitchHost(host string) {
// SetHost mocks base method.
func (m *MockRestHandler) SetHost(host string) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SwitchHost", host)
m.ctrl.Call(m, "SetHost", host)
}
// SwitchHost indicates an expected call of SwitchHost.
func (mr *MockRestHandlerMockRecorder) SwitchHost(host any) *gomock.Call {
// SetHost indicates an expected call of SetHost.
func (mr *MockRestHandlerMockRecorder) SetHost(host any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SwitchHost", reflect.TypeOf((*MockRestHandler)(nil).SwitchHost), host)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHost", reflect.TypeOf((*MockRestHandler)(nil).SetHost), host)
}

View File

@@ -10,7 +10,6 @@ import (
"strings"
"github.com/OffchainLabs/prysm/v7/api/apiutil"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
validator2 "github.com/OffchainLabs/prysm/v7/consensus-types/validator"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
@@ -19,7 +18,7 @@ import (
)
// NewPrysmChainClient returns implementation of iface.PrysmChainClient.
func NewPrysmChainClient(jsonRestHandler rest.RestHandler, nodeClient iface.NodeClient) iface.PrysmChainClient {
func NewPrysmChainClient(jsonRestHandler RestHandler, nodeClient iface.NodeClient) iface.PrysmChainClient {
return prysmChainClient{
jsonRestHandler: jsonRestHandler,
nodeClient: nodeClient,
@@ -27,7 +26,7 @@ func NewPrysmChainClient(jsonRestHandler rest.RestHandler, nodeClient iface.Node
}
type prysmChainClient struct {
jsonRestHandler rest.RestHandler
jsonRestHandler RestHandler
nodeClient iface.NodeClient
}

View File

@@ -1,4 +1,4 @@
package rest
package beacon_api
import (
"bytes"
@@ -21,7 +21,6 @@ import (
type reqOption func(*http.Request)
// RestHandler defines the interface for making REST API requests.
type RestHandler interface {
Get(ctx context.Context, endpoint string, resp any) error
GetStatusCode(ctx context.Context, endpoint string) (int, error)
@@ -30,34 +29,29 @@ type RestHandler interface {
PostSSZ(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer) ([]byte, http.Header, error)
HttpClient() *http.Client
Host() string
SwitchHost(host string)
SetHost(host string)
}
type restHandler struct {
type BeaconApiRestHandler struct {
client http.Client
host string
reqOverrides []reqOption
}
// newRestHandler returns a RestHandler (internal use)
func newRestHandler(client http.Client, host string) RestHandler {
return NewRestHandler(client, host)
}
// NewRestHandler returns a RestHandler
func NewRestHandler(client http.Client, host string) RestHandler {
rh := &restHandler{
// NewBeaconApiRestHandler returns a RestHandler
func NewBeaconApiRestHandler(client http.Client, host string) RestHandler {
brh := &BeaconApiRestHandler{
client: client,
host: host,
}
rh.appendAcceptOverride()
return rh
brh.appendAcceptOverride()
return brh
}
// appendAcceptOverride enables the Accept header to be customized at runtime via an environment variable.
// This is specified as an env var because it is a niche option that prysm may use for performance testing or debugging
// bug which users are unlikely to need. Using an env var keeps the set of user-facing flags cleaner.
func (c *restHandler) appendAcceptOverride() {
func (c *BeaconApiRestHandler) appendAcceptOverride() {
if accept := os.Getenv(params.EnvNameOverrideAccept); accept != "" {
c.reqOverrides = append(c.reqOverrides, func(req *http.Request) {
req.Header.Set("Accept", accept)
@@ -66,18 +60,18 @@ func (c *restHandler) appendAcceptOverride() {
}
// HttpClient returns the underlying HTTP client of the handler
func (c *restHandler) HttpClient() *http.Client {
func (c *BeaconApiRestHandler) HttpClient() *http.Client {
return &c.client
}
// Host returns the underlying HTTP host
func (c *restHandler) Host() string {
func (c *BeaconApiRestHandler) Host() string {
return c.host
}
// Get sends a GET request and decodes the response body as a JSON object into the passed in object.
// If an HTTP error is returned, the body is decoded as a DefaultJsonError JSON object and returned as the first return value.
func (c *restHandler) Get(ctx context.Context, endpoint string, resp any) error {
func (c *BeaconApiRestHandler) Get(ctx context.Context, endpoint string, resp any) error {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
@@ -100,7 +94,7 @@ func (c *restHandler) Get(ctx context.Context, endpoint string, resp any) error
// GetStatusCode sends a GET request and returns only the HTTP status code.
// This is useful for endpoints like /eth/v1/node/health that communicate status via HTTP codes
// (200 = ready, 206 = syncing, 503 = unavailable) rather than response bodies.
func (c *restHandler) GetStatusCode(ctx context.Context, endpoint string) (int, error) {
func (c *BeaconApiRestHandler) GetStatusCode(ctx context.Context, endpoint string) (int, error) {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
@@ -119,7 +113,7 @@ func (c *restHandler) GetStatusCode(ctx context.Context, endpoint string) (int,
return httpResp.StatusCode, nil
}
func (c *restHandler) GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error) {
func (c *BeaconApiRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error) {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
@@ -174,7 +168,7 @@ func (c *restHandler) GetSSZ(ctx context.Context, endpoint string) ([]byte, http
// Post sends a POST request and decodes the response body as a JSON object into the passed in object.
// If an HTTP error is returned, the body is decoded as a DefaultJsonError JSON object and returned as the first return value.
func (c *restHandler) Post(
func (c *BeaconApiRestHandler) Post(
ctx context.Context,
apiEndpoint string,
headers map[string]string,
@@ -210,7 +204,7 @@ func (c *restHandler) Post(
}
// PostSSZ sends a POST request and prefers an SSZ (application/octet-stream) response body.
func (c *restHandler) PostSSZ(
func (c *BeaconApiRestHandler) PostSSZ(
ctx context.Context,
apiEndpoint string,
headers map[string]string,
@@ -311,6 +305,6 @@ func decodeResp(httpResp *http.Response, resp any) error {
return nil
}
func (c *restHandler) SwitchHost(host string) {
func (c *BeaconApiRestHandler) SetHost(host string) {
c.host = host
}

View File

@@ -12,12 +12,13 @@ import (
"time"
"github.com/OffchainLabs/prysm/v7/api"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/network/httputil"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
)
@@ -44,7 +45,10 @@ func TestGet(t *testing.T) {
server := httptest.NewServer(mux)
defer server.Close()
jsonRestHandler := rest.NewRestHandler(http.Client{Timeout: time.Second * 5}, server.URL)
jsonRestHandler := BeaconApiRestHandler{
client: http.Client{Timeout: time.Second * 5},
host: server.URL,
}
resp := &structs.GetGenesisResponse{}
require.NoError(t, jsonRestHandler.Get(ctx, endpoint+"?arg1=abc&arg2=def", resp))
assert.DeepEqual(t, genesisJson, resp)
@@ -75,7 +79,10 @@ func TestGetSSZ(t *testing.T) {
server := httptest.NewServer(mux)
defer server.Close()
jsonRestHandler := rest.NewRestHandler(http.Client{Timeout: time.Second * 5}, server.URL)
jsonRestHandler := BeaconApiRestHandler{
client: http.Client{Timeout: time.Second * 5},
host: server.URL,
}
body, header, err := jsonRestHandler.GetSSZ(ctx, endpoint)
require.NoError(t, err)
@@ -101,7 +108,10 @@ func TestGetSSZ(t *testing.T) {
server := httptest.NewServer(mux)
defer server.Close()
jsonRestHandler := rest.NewRestHandler(http.Client{Timeout: time.Second * 5}, server.URL)
jsonRestHandler := BeaconApiRestHandler{
client: http.Client{Timeout: time.Second * 5},
host: server.URL,
}
body, header, err := jsonRestHandler.GetSSZ(ctx, endpoint)
require.NoError(t, err)
@@ -126,7 +136,10 @@ func TestGetSSZ(t *testing.T) {
server := httptest.NewServer(mux)
defer server.Close()
jsonRestHandler := rest.NewRestHandler(http.Client{Timeout: time.Second * 5}, server.URL)
jsonRestHandler := BeaconApiRestHandler{
client: http.Client{Timeout: time.Second * 5},
host: server.URL,
}
_, _, err := jsonRestHandler.GetSSZ(ctx, endpoint)
require.NoError(t, err)
@@ -148,7 +161,7 @@ func TestAcceptOverrideSSZ(t *testing.T) {
require.NoError(t, err)
}))
defer srv.Close()
c := rest.NewRestHandler(http.Client{Timeout: time.Second * 5}, srv.URL)
c := NewBeaconApiRestHandler(http.Client{Timeout: time.Second * 5}, srv.URL)
_, _, err := c.GetSSZ(t.Context(), "/test")
require.NoError(t, err)
}
@@ -191,12 +204,162 @@ func TestPost(t *testing.T) {
server := httptest.NewServer(mux)
defer server.Close()
jsonRestHandler := rest.NewRestHandler(http.Client{Timeout: time.Second * 5}, server.URL)
jsonRestHandler := BeaconApiRestHandler{
client: http.Client{Timeout: time.Second * 5},
host: server.URL,
}
resp := &structs.GetGenesisResponse{}
require.NoError(t, jsonRestHandler.Post(ctx, endpoint, headers, bytes.NewBuffer(dataBytes), resp))
assert.DeepEqual(t, genesisJson, resp)
}
func Test_decodeResp(t *testing.T) {
type j struct {
Foo string `json:"foo"`
}
t.Run("200 JSON with charset", func(t *testing.T) {
body := bytes.Buffer{}
r := &http.Response{
Status: "200",
StatusCode: http.StatusOK,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {"application/json; charset=utf-8"}},
}
require.NoError(t, decodeResp(r, nil))
})
t.Run("200 non-JSON", func(t *testing.T) {
body := bytes.Buffer{}
r := &http.Response{
Status: "200",
StatusCode: http.StatusOK,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {api.OctetStreamMediaType}},
}
require.NoError(t, decodeResp(r, nil))
})
t.Run("204 non-JSON", func(t *testing.T) {
body := bytes.Buffer{}
r := &http.Response{
Status: "204",
StatusCode: http.StatusNoContent,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {api.OctetStreamMediaType}},
}
require.NoError(t, decodeResp(r, nil))
})
t.Run("500 non-JSON", func(t *testing.T) {
body := bytes.Buffer{}
_, err := body.WriteString("foo")
require.NoError(t, err)
r := &http.Response{
Status: "500",
StatusCode: http.StatusInternalServerError,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {api.OctetStreamMediaType}},
}
err = decodeResp(r, nil)
errJson := &httputil.DefaultJsonError{}
require.Equal(t, true, errors.As(err, &errJson))
assert.Equal(t, http.StatusInternalServerError, errJson.Code)
assert.Equal(t, "foo", errJson.Message)
})
t.Run("200 JSON with resp", func(t *testing.T) {
body := bytes.Buffer{}
b, err := json.Marshal(&j{Foo: "foo"})
require.NoError(t, err)
body.Write(b)
r := &http.Response{
Status: "200",
StatusCode: http.StatusOK,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {api.JsonMediaType}},
}
resp := &j{}
require.NoError(t, decodeResp(r, resp))
assert.Equal(t, "foo", resp.Foo)
})
t.Run("200 JSON without resp", func(t *testing.T) {
body := bytes.Buffer{}
r := &http.Response{
Status: "200",
StatusCode: http.StatusOK,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {api.JsonMediaType}},
}
require.NoError(t, decodeResp(r, nil))
})
t.Run("204 JSON", func(t *testing.T) {
body := bytes.Buffer{}
r := &http.Response{
Status: "204",
StatusCode: http.StatusNoContent,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {api.JsonMediaType}},
}
require.NoError(t, decodeResp(r, nil))
})
t.Run("500 JSON", func(t *testing.T) {
body := bytes.Buffer{}
b, err := json.Marshal(&httputil.DefaultJsonError{Code: http.StatusInternalServerError, Message: "error"})
require.NoError(t, err)
body.Write(b)
r := &http.Response{
Status: "500",
StatusCode: http.StatusInternalServerError,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {api.JsonMediaType}},
}
err = decodeResp(r, nil)
errJson := &httputil.DefaultJsonError{}
require.Equal(t, true, errors.As(err, &errJson))
assert.Equal(t, http.StatusInternalServerError, errJson.Code)
assert.Equal(t, "error", errJson.Message)
})
t.Run("200 JSON cannot decode", func(t *testing.T) {
body := bytes.Buffer{}
_, err := body.WriteString("foo")
require.NoError(t, err)
r := &http.Response{
Status: "200",
StatusCode: http.StatusOK,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {api.JsonMediaType}},
Request: &http.Request{},
}
resp := &j{}
err = decodeResp(r, resp)
assert.ErrorContains(t, "failed to decode response body into json", err)
})
t.Run("500 JSON cannot decode", func(t *testing.T) {
body := bytes.Buffer{}
_, err := body.WriteString("foo")
require.NoError(t, err)
r := &http.Response{
Status: "500",
StatusCode: http.StatusInternalServerError,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {api.JsonMediaType}},
Request: &http.Request{},
}
err = decodeResp(r, nil)
assert.ErrorContains(t, "failed to decode response body into error json", err)
})
t.Run("500 not JSON", func(t *testing.T) {
body := bytes.Buffer{}
_, err := body.WriteString("foo")
require.NoError(t, err)
r := &http.Response{
Status: "500",
StatusCode: http.StatusInternalServerError,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {"text/plain"}},
Request: &http.Request{},
}
err = decodeResp(r, nil)
assert.ErrorContains(t, "HTTP request unsuccessful (500: foo)", err)
})
}
func TestGetStatusCode(t *testing.T) {
ctx := t.Context()
const endpoint = "/eth/v1/node/health"
@@ -238,7 +401,10 @@ func TestGetStatusCode(t *testing.T) {
server := httptest.NewServer(mux)
defer server.Close()
jsonRestHandler := rest.NewRestHandler(http.Client{Timeout: time.Second * 5}, server.URL)
jsonRestHandler := BeaconApiRestHandler{
client: http.Client{Timeout: time.Second * 5},
host: server.URL,
}
statusCode, err := jsonRestHandler.GetStatusCode(ctx, endpoint)
require.NoError(t, err)
@@ -247,7 +413,10 @@ func TestGetStatusCode(t *testing.T) {
}
t.Run("returns error on connection failure", func(t *testing.T) {
jsonRestHandler := rest.NewRestHandler(http.Client{Timeout: time.Millisecond * 100}, "http://localhost:99999")
jsonRestHandler := BeaconApiRestHandler{
client: http.Client{Timeout: time.Millisecond * 100},
host: "http://localhost:99999", // Invalid port
}
_, err := jsonRestHandler.GetStatusCode(ctx, endpoint)
require.ErrorContains(t, "failed to perform request", err)

View File

@@ -9,7 +9,6 @@ import (
"strconv"
"github.com/OffchainLabs/prysm/v7/api/apiutil"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/pkg/errors"
@@ -22,7 +21,7 @@ type StateValidatorsProvider interface {
}
type beaconApiStateValidatorsProvider struct {
jsonRestHandler rest.RestHandler
jsonRestHandler RestHandler
}
func (c beaconApiStateValidatorsProvider) StateValidators(

View File

@@ -9,17 +9,19 @@ import (
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
)
func NewChainClient(validatorConn validatorHelpers.NodeConnection) iface.ChainClient {
grpcClient := grpcApi.NewGrpcChainClient(validatorConn)
func NewChainClient(validatorConn validatorHelpers.NodeConnection, jsonRestHandler beaconApi.RestHandler) iface.ChainClient {
grpcClient := grpcApi.NewGrpcChainClient(validatorConn.GetGrpcClientConn())
if features.Get().EnableBeaconRESTApi {
return beaconApi.NewBeaconApiChainClientWithFallback(validatorConn.GetRestHandler(), grpcClient)
return beaconApi.NewBeaconApiChainClientWithFallback(jsonRestHandler, grpcClient)
} else {
return grpcClient
}
return grpcClient
}
func NewPrysmChainClient(validatorConn validatorHelpers.NodeConnection) iface.PrysmChainClient {
func NewPrysmChainClient(validatorConn validatorHelpers.NodeConnection, jsonRestHandler beaconApi.RestHandler) iface.PrysmChainClient {
if features.Get().EnableBeaconRESTApi {
return beaconApi.NewPrysmChainClient(validatorConn.GetRestHandler(), nodeClientFactory.NewNodeClient(validatorConn))
return beaconApi.NewPrysmChainClient(jsonRestHandler, nodeClientFactory.NewNodeClient(validatorConn, jsonRestHandler))
} else {
return grpcApi.NewGrpcPrysmChainClient(validatorConn.GetGrpcClientConn())
}
return grpcApi.NewGrpcPrysmChainClient(validatorConn)
}

View File

@@ -4,7 +4,6 @@ go_library(
name = "go_default_library",
srcs = [
"grpc_beacon_chain_client.go",
"grpc_client_manager.go",
"grpc_node_client.go",
"grpc_prysm_beacon_chain_client.go",
"grpc_validator_client.go",
@@ -26,7 +25,6 @@ go_library(
"//proto/eth/v1:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//validator/client/iface:go_default_library",
"//validator/helpers:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_golang_protobuf//ptypes/empty",
"@com_github_pkg_errors//:go_default_library",
@@ -41,8 +39,6 @@ go_test(
name = "go_default_test",
size = "small",
srcs = [
"grpc_client_manager_test.go",
"grpc_node_client_test.go",
"grpc_prysm_beacon_chain_client_test.go",
"grpc_validator_client_test.go",
],
@@ -60,11 +56,7 @@ go_test(
"//testing/util:go_default_library",
"//testing/validator-mock:go_default_library",
"//validator/client/iface:go_default_library",
"//validator/helpers:go_default_library",
"//validator/testing:go_default_library",
"@com_github_golang_protobuf//ptypes/empty",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_protobuf//types/known/emptypb:go_default_library",
"@org_uber_go_mock//gomock:go_default_library",
],

View File

@@ -5,42 +5,38 @@ import (
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"
)
type grpcChainClient struct {
*grpcClientManager[ethpb.BeaconChainClient]
beaconChainClient ethpb.BeaconChainClient
}
func (c *grpcChainClient) ChainHead(ctx context.Context, in *empty.Empty) (*ethpb.ChainHead, error) {
return c.getClient().GetChainHead(ctx, in)
return c.beaconChainClient.GetChainHead(ctx, in)
}
func (c *grpcChainClient) ValidatorBalances(ctx context.Context, in *ethpb.ListValidatorBalancesRequest) (*ethpb.ValidatorBalances, error) {
return c.getClient().ListValidatorBalances(ctx, in)
return c.beaconChainClient.ListValidatorBalances(ctx, in)
}
func (c *grpcChainClient) Validators(ctx context.Context, in *ethpb.ListValidatorsRequest) (*ethpb.Validators, error) {
return c.getClient().ListValidators(ctx, in)
return c.beaconChainClient.ListValidators(ctx, in)
}
func (c *grpcChainClient) ValidatorQueue(ctx context.Context, in *empty.Empty) (*ethpb.ValidatorQueue, error) {
return c.getClient().GetValidatorQueue(ctx, in)
return c.beaconChainClient.GetValidatorQueue(ctx, in)
}
func (c *grpcChainClient) ValidatorPerformance(ctx context.Context, in *ethpb.ValidatorPerformanceRequest) (*ethpb.ValidatorPerformanceResponse, error) {
return c.getClient().GetValidatorPerformance(ctx, in)
return c.beaconChainClient.GetValidatorPerformance(ctx, in)
}
func (c *grpcChainClient) ValidatorParticipation(ctx context.Context, in *ethpb.GetValidatorParticipationRequest) (*ethpb.ValidatorParticipationResponse, error) {
return c.getClient().GetValidatorParticipation(ctx, in)
return c.beaconChainClient.GetValidatorParticipation(ctx, in)
}
// NewGrpcChainClient creates a new gRPC chain client that supports
// dynamic connection switching via the NodeConnection's GrpcConnectionProvider.
func NewGrpcChainClient(conn validatorHelpers.NodeConnection) iface.ChainClient {
return &grpcChainClient{
grpcClientManager: newGrpcClientManager(conn, ethpb.NewBeaconChainClient),
}
func NewGrpcChainClient(cc grpc.ClientConnInterface) iface.ChainClient {
return &grpcChainClient{ethpb.NewBeaconChainClient(cc)}
}

View File

@@ -1,44 +0,0 @@
package grpc_api
import (
"sync"
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
"google.golang.org/grpc"
)
// grpcClientManager handles dynamic gRPC client recreation when the connection changes.
// It uses generics to work with any gRPC client type.
type grpcClientManager[T any] struct {
mu sync.Mutex
conn validatorHelpers.NodeConnection
client T
lastHost string
newClient func(grpc.ClientConnInterface) T
}
// newGrpcClientManager creates a new client manager with the given connection and client constructor.
func newGrpcClientManager[T any](
conn validatorHelpers.NodeConnection,
newClient func(grpc.ClientConnInterface) T,
) *grpcClientManager[T] {
return &grpcClientManager[T]{
conn: conn,
newClient: newClient,
client: newClient(conn.GetGrpcClientConn()),
lastHost: conn.GetGrpcConnectionProvider().CurrentHost(),
}
}
// getClient returns the current client, recreating it if the connection has changed.
func (m *grpcClientManager[T]) getClient() T {
m.mu.Lock()
defer m.mu.Unlock()
currentHost := m.conn.GetGrpcConnectionProvider().CurrentHost()
if m.lastHost != currentHost {
m.client = m.newClient(m.conn.GetGrpcClientConn())
m.lastHost = currentHost
}
return m.client
}

View File

@@ -1,168 +0,0 @@
package grpc_api
import (
"sync"
"testing"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
"google.golang.org/grpc"
)
// mockProvider implements grpcutil.GrpcConnectionProvider for testing.
type mockProvider struct {
hosts []string
currentIndex int
mu sync.Mutex
}
func (m *mockProvider) CurrentConn() *grpc.ClientConn { return nil }
func (m *mockProvider) Hosts() []string { return m.hosts }
func (m *mockProvider) Close() {}
func (m *mockProvider) CurrentHost() string {
m.mu.Lock()
defer m.mu.Unlock()
return m.hosts[m.currentIndex]
}
func (m *mockProvider) SwitchHost(index int) error {
m.mu.Lock()
defer m.mu.Unlock()
m.currentIndex = index
return nil
}
// nextHost is a test helper for round-robin simulation (not part of the interface).
func (m *mockProvider) nextHost() {
m.mu.Lock()
defer m.mu.Unlock()
m.currentIndex = (m.currentIndex + 1) % len(m.hosts)
}
// testClient is a simple type for testing the generic client manager.
type testClient struct{ id int }
// testManager creates a manager with client creation counting.
func testManager(t *testing.T, provider *mockProvider) (*grpcClientManager[*testClient], *int) {
conn, err := validatorHelpers.NewNodeConnection(validatorHelpers.WithGRPCProvider(provider))
require.NoError(t, err)
clientCount := new(int)
newClient := func(grpc.ClientConnInterface) *testClient {
*clientCount++
return &testClient{id: *clientCount}
}
manager := newGrpcClientManager(conn, newClient)
require.NotNil(t, manager)
return manager, clientCount
}
func TestGrpcClientManager(t *testing.T) {
t.Run("tracks host", func(t *testing.T) {
provider := &mockProvider{hosts: []string{"host1:4000", "host2:4000"}}
manager, count := testManager(t, provider)
assert.Equal(t, 1, *count)
assert.Equal(t, "host1:4000", manager.lastHost)
})
t.Run("same host returns same client", func(t *testing.T) {
provider := &mockProvider{hosts: []string{"host1:4000", "host2:4000"}}
manager, count := testManager(t, provider)
c1, c2, c3 := manager.getClient(), manager.getClient(), manager.getClient()
assert.Equal(t, 1, *count)
assert.Equal(t, c1, c2)
assert.Equal(t, c2, c3)
})
t.Run("host change recreates client", func(t *testing.T) {
provider := &mockProvider{hosts: []string{"host1:4000", "host2:4000"}}
manager, count := testManager(t, provider)
c1 := manager.getClient()
assert.Equal(t, 1, c1.id)
provider.nextHost()
c2 := manager.getClient()
assert.Equal(t, 2, *count)
assert.Equal(t, 2, c2.id)
// Same host again - no recreation
c3 := manager.getClient()
assert.Equal(t, 2, *count)
assert.Equal(t, c2, c3)
})
t.Run("multiple host switches", func(t *testing.T) {
provider := &mockProvider{hosts: []string{"host1:4000", "host2:4000", "host3:4000"}}
manager, count := testManager(t, provider)
assert.Equal(t, 1, *count)
for expected := 2; expected <= 4; expected++ {
provider.nextHost()
_ = manager.getClient()
assert.Equal(t, expected, *count)
}
})
}
func TestGrpcClientManager_Concurrent(t *testing.T) {
t.Run("concurrent access same host", func(t *testing.T) {
provider := &mockProvider{hosts: []string{"host1:4000", "host2:4000"}}
manager, _ := testManager(t, provider)
var clientCount int
var countMu sync.Mutex
// Override with thread-safe counter
manager.newClient = func(grpc.ClientConnInterface) *testClient {
countMu.Lock()
clientCount++
id := clientCount
countMu.Unlock()
return &testClient{id: id}
}
manager.client = manager.newClient(nil)
clientCount = 1
var wg sync.WaitGroup
for range 100 {
wg.Go(func() { _ = manager.getClient() })
}
wg.Wait()
countMu.Lock()
assert.Equal(t, 1, clientCount)
countMu.Unlock()
})
t.Run("concurrent with host changes", func(t *testing.T) {
provider := &mockProvider{hosts: []string{"host1:4000", "host2:4000"}}
manager, _ := testManager(t, provider)
var clientCount int
var countMu sync.Mutex
manager.newClient = func(grpc.ClientConnInterface) *testClient {
countMu.Lock()
clientCount++
id := clientCount
countMu.Unlock()
return &testClient{id: id}
}
manager.client = manager.newClient(nil)
clientCount = 1
var wg sync.WaitGroup
for range 50 {
wg.Go(func() { _ = manager.getClient() })
wg.Go(func() { provider.nextHost() })
}
wg.Wait()
countMu.Lock()
assert.NotEqual(t, 0, clientCount, "Should have created at least one client")
countMu.Unlock()
})
}

View File

@@ -5,8 +5,8 @@ import (
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"
)
var (
@@ -14,40 +14,35 @@ var (
)
type grpcNodeClient struct {
*grpcClientManager[ethpb.NodeClient]
nodeClient ethpb.NodeClient
}
func (c *grpcNodeClient) SyncStatus(ctx context.Context, in *empty.Empty) (*ethpb.SyncStatus, error) {
return c.getClient().GetSyncStatus(ctx, in)
return c.nodeClient.GetSyncStatus(ctx, in)
}
func (c *grpcNodeClient) Genesis(ctx context.Context, in *empty.Empty) (*ethpb.Genesis, error) {
return c.getClient().GetGenesis(ctx, in)
return c.nodeClient.GetGenesis(ctx, in)
}
func (c *grpcNodeClient) Version(ctx context.Context, in *empty.Empty) (*ethpb.Version, error) {
return c.getClient().GetVersion(ctx, in)
return c.nodeClient.GetVersion(ctx, in)
}
func (c *grpcNodeClient) Peers(ctx context.Context, in *empty.Empty) (*ethpb.Peers, error) {
return c.getClient().ListPeers(ctx, in)
return c.nodeClient.ListPeers(ctx, in)
}
func (c *grpcNodeClient) IsReady(ctx context.Context) bool {
// GetHealth returns 200 OK only if node is synced and not optimistic.
// otherwise it will throw an error
_, err := c.getClient().GetHealth(ctx, &ethpb.HealthRequest{})
_, err := c.nodeClient.GetHealth(ctx, &ethpb.HealthRequest{})
if err != nil {
log.WithError(err).Debug("Node is not ready")
log.WithError(err).Error("Failed to get health of node")
return false
}
return true
}
// NewNodeClient creates a new gRPC node client that supports
// dynamic connection switching via the NodeConnection's GrpcConnectionProvider.
func NewNodeClient(conn validatorHelpers.NodeConnection) iface.NodeClient {
return &grpcNodeClient{
grpcClientManager: newGrpcClientManager(conn, ethpb.NewNodeClient),
}
func NewNodeClient(cc grpc.ClientConnInterface) iface.NodeClient {
g := &grpcNodeClient{nodeClient: ethpb.NewNodeClient(cc)}
return g
}

View File

@@ -1,72 +0,0 @@
package grpc_api
import (
"errors"
"testing"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/mock"
"github.com/OffchainLabs/prysm/v7/testing/require"
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
"github.com/golang/protobuf/ptypes/empty"
"go.uber.org/mock/gomock"
"google.golang.org/grpc"
)
func TestGrpcNodeClient_IsReady(t *testing.T) {
// The IsReady function now relies on GetHealth which returns:
// - 200 OK (nil error) only if node is synced AND not optimistic
// - 206 Partial Content (error) if syncing or optimistic
// - 503 Unavailable (error) if unavailable
testCases := []struct {
name string
healthErr error
expectedResult bool
}{
{
name: "returns true when health check succeeds (synced and not optimistic)",
healthErr: nil,
expectedResult: true,
},
{
name: "returns false when health check fails (syncing, optimistic, or unavailable)",
healthErr: errors.New("node not ready"),
expectedResult: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
ctx := t.Context()
mockNodeClient := mock.NewMockNodeClient(ctrl)
// Set up health check expectation
mockNodeClient.EXPECT().GetHealth(
gomock.Any(),
gomock.Any(),
).Return(&empty.Empty{}, tc.healthErr)
// Create a mock provider
provider := &mockProvider{hosts: []string{"host1:4000"}}
conn, err := validatorHelpers.NewNodeConnection(validatorHelpers.WithGRPCProvider(provider))
require.NoError(t, err)
// Create client with injected mock
client := &grpcNodeClient{
grpcClientManager: &grpcClientManager[ethpb.NodeClient]{
conn: conn,
client: mockNodeClient,
lastHost: "host1:4000",
newClient: func(grpc.ClientConnInterface) ethpb.NodeClient { return mockNodeClient },
},
}
result := client.IsReady(ctx)
assert.Equal(t, tc.expectedResult, result)
})
}
}

View File

@@ -12,9 +12,9 @@ import (
eth "github.com/OffchainLabs/prysm/v7/proto/eth/v1"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
"github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
type grpcPrysmChainClient struct {
@@ -95,8 +95,6 @@ func (c *grpcPrysmChainClient) ValidatorPerformance(ctx context.Context, in *eth
return c.chainClient.ValidatorPerformance(ctx, in)
}
// NewGrpcPrysmChainClient creates a new gRPC Prysm chain client that supports
// dynamic connection switching via the NodeConnection's GrpcConnectionProvider.
func NewGrpcPrysmChainClient(conn validatorHelpers.NodeConnection) iface.PrysmChainClient {
return &grpcPrysmChainClient{chainClient: NewGrpcChainClient(conn)}
func NewGrpcPrysmChainClient(cc grpc.ClientConnInterface) iface.PrysmChainClient {
return &grpcPrysmChainClient{chainClient: &grpcChainClient{ethpb.NewBeaconChainClient(cc)}}
}

View File

@@ -14,24 +14,24 @@ import (
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type grpcValidatorClient struct {
*grpcClientManager[ethpb.BeaconNodeValidatorClient]
isEventStreamRunning bool
beaconNodeValidatorClient ethpb.BeaconNodeValidatorClient
isEventStreamRunning bool
}
func (c *grpcValidatorClient) Duties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.ValidatorDutiesContainer, error) {
if features.Get().DisableDutiesV2 {
return c.getDuties(ctx, in)
}
dutiesResponse, err := c.getClient().GetDutiesV2(ctx, in)
dutiesResponse, err := c.beaconNodeValidatorClient.GetDutiesV2(ctx, in)
if err != nil {
if status.Code(err) == codes.Unimplemented {
log.Warn("GetDutiesV2 returned status code unavailable, falling back to GetDuties")
@@ -47,7 +47,7 @@ func (c *grpcValidatorClient) Duties(ctx context.Context, in *ethpb.DutiesReques
// getDuties is calling the v1 of get duties
func (c *grpcValidatorClient) getDuties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.ValidatorDutiesContainer, error) {
dutiesResponse, err := c.getClient().GetDuties(ctx, in)
dutiesResponse, err := c.beaconNodeValidatorClient.GetDuties(ctx, in)
if err != nil {
return nil, errors.Wrap(
client.ErrConnectionIssue,
@@ -147,108 +147,108 @@ func toValidatorDutyV2(duty *ethpb.DutiesV2Response_Duty) (*ethpb.ValidatorDuty,
}
func (c *grpcValidatorClient) CheckDoppelGanger(ctx context.Context, in *ethpb.DoppelGangerRequest) (*ethpb.DoppelGangerResponse, error) {
return c.getClient().CheckDoppelGanger(ctx, in)
return c.beaconNodeValidatorClient.CheckDoppelGanger(ctx, in)
}
func (c *grpcValidatorClient) DomainData(ctx context.Context, in *ethpb.DomainRequest) (*ethpb.DomainResponse, error) {
return c.getClient().DomainData(ctx, in)
return c.beaconNodeValidatorClient.DomainData(ctx, in)
}
func (c *grpcValidatorClient) AttestationData(ctx context.Context, in *ethpb.AttestationDataRequest) (*ethpb.AttestationData, error) {
return c.getClient().GetAttestationData(ctx, in)
return c.beaconNodeValidatorClient.GetAttestationData(ctx, in)
}
func (c *grpcValidatorClient) BeaconBlock(ctx context.Context, in *ethpb.BlockRequest) (*ethpb.GenericBeaconBlock, error) {
return c.getClient().GetBeaconBlock(ctx, in)
return c.beaconNodeValidatorClient.GetBeaconBlock(ctx, in)
}
func (c *grpcValidatorClient) FeeRecipientByPubKey(ctx context.Context, in *ethpb.FeeRecipientByPubKeyRequest) (*ethpb.FeeRecipientByPubKeyResponse, error) {
return c.getClient().GetFeeRecipientByPubKey(ctx, in)
return c.beaconNodeValidatorClient.GetFeeRecipientByPubKey(ctx, in)
}
func (c *grpcValidatorClient) SyncCommitteeContribution(ctx context.Context, in *ethpb.SyncCommitteeContributionRequest) (*ethpb.SyncCommitteeContribution, error) {
return c.getClient().GetSyncCommitteeContribution(ctx, in)
return c.beaconNodeValidatorClient.GetSyncCommitteeContribution(ctx, in)
}
func (c *grpcValidatorClient) SyncMessageBlockRoot(ctx context.Context, in *empty.Empty) (*ethpb.SyncMessageBlockRootResponse, error) {
return c.getClient().GetSyncMessageBlockRoot(ctx, in)
return c.beaconNodeValidatorClient.GetSyncMessageBlockRoot(ctx, in)
}
func (c *grpcValidatorClient) SyncSubcommitteeIndex(ctx context.Context, in *ethpb.SyncSubcommitteeIndexRequest) (*ethpb.SyncSubcommitteeIndexResponse, error) {
return c.getClient().GetSyncSubcommitteeIndex(ctx, in)
return c.beaconNodeValidatorClient.GetSyncSubcommitteeIndex(ctx, in)
}
func (c *grpcValidatorClient) MultipleValidatorStatus(ctx context.Context, in *ethpb.MultipleValidatorStatusRequest) (*ethpb.MultipleValidatorStatusResponse, error) {
return c.getClient().MultipleValidatorStatus(ctx, in)
return c.beaconNodeValidatorClient.MultipleValidatorStatus(ctx, in)
}
func (c *grpcValidatorClient) PrepareBeaconProposer(ctx context.Context, in *ethpb.PrepareBeaconProposerRequest) (*empty.Empty, error) {
return c.getClient().PrepareBeaconProposer(ctx, in)
return c.beaconNodeValidatorClient.PrepareBeaconProposer(ctx, in)
}
func (c *grpcValidatorClient) ProposeAttestation(ctx context.Context, in *ethpb.Attestation) (*ethpb.AttestResponse, error) {
return c.getClient().ProposeAttestation(ctx, in)
return c.beaconNodeValidatorClient.ProposeAttestation(ctx, in)
}
func (c *grpcValidatorClient) ProposeAttestationElectra(ctx context.Context, in *ethpb.SingleAttestation) (*ethpb.AttestResponse, error) {
return c.getClient().ProposeAttestationElectra(ctx, in)
return c.beaconNodeValidatorClient.ProposeAttestationElectra(ctx, in)
}
func (c *grpcValidatorClient) ProposeBeaconBlock(ctx context.Context, in *ethpb.GenericSignedBeaconBlock) (*ethpb.ProposeResponse, error) {
return c.getClient().ProposeBeaconBlock(ctx, in)
return c.beaconNodeValidatorClient.ProposeBeaconBlock(ctx, in)
}
func (c *grpcValidatorClient) ProposeExit(ctx context.Context, in *ethpb.SignedVoluntaryExit) (*ethpb.ProposeExitResponse, error) {
return c.getClient().ProposeExit(ctx, in)
return c.beaconNodeValidatorClient.ProposeExit(ctx, in)
}
func (c *grpcValidatorClient) StreamBlocksAltair(ctx context.Context, in *ethpb.StreamBlocksRequest) (ethpb.BeaconNodeValidator_StreamBlocksAltairClient, error) {
return c.getClient().StreamBlocksAltair(ctx, in)
return c.beaconNodeValidatorClient.StreamBlocksAltair(ctx, in)
}
func (c *grpcValidatorClient) SubmitAggregateSelectionProof(ctx context.Context, in *ethpb.AggregateSelectionRequest, _ primitives.ValidatorIndex, _ uint64) (*ethpb.AggregateSelectionResponse, error) {
return c.getClient().SubmitAggregateSelectionProof(ctx, in)
return c.beaconNodeValidatorClient.SubmitAggregateSelectionProof(ctx, in)
}
func (c *grpcValidatorClient) SubmitAggregateSelectionProofElectra(ctx context.Context, in *ethpb.AggregateSelectionRequest, _ primitives.ValidatorIndex, _ uint64) (*ethpb.AggregateSelectionElectraResponse, error) {
return c.getClient().SubmitAggregateSelectionProofElectra(ctx, in)
return c.beaconNodeValidatorClient.SubmitAggregateSelectionProofElectra(ctx, in)
}
func (c *grpcValidatorClient) SubmitSignedAggregateSelectionProof(ctx context.Context, in *ethpb.SignedAggregateSubmitRequest) (*ethpb.SignedAggregateSubmitResponse, error) {
return c.getClient().SubmitSignedAggregateSelectionProof(ctx, in)
return c.beaconNodeValidatorClient.SubmitSignedAggregateSelectionProof(ctx, in)
}
func (c *grpcValidatorClient) SubmitSignedAggregateSelectionProofElectra(ctx context.Context, in *ethpb.SignedAggregateSubmitElectraRequest) (*ethpb.SignedAggregateSubmitResponse, error) {
return c.getClient().SubmitSignedAggregateSelectionProofElectra(ctx, in)
return c.beaconNodeValidatorClient.SubmitSignedAggregateSelectionProofElectra(ctx, in)
}
func (c *grpcValidatorClient) SubmitSignedContributionAndProof(ctx context.Context, in *ethpb.SignedContributionAndProof) (*empty.Empty, error) {
return c.getClient().SubmitSignedContributionAndProof(ctx, in)
return c.beaconNodeValidatorClient.SubmitSignedContributionAndProof(ctx, in)
}
func (c *grpcValidatorClient) SubmitSyncMessage(ctx context.Context, in *ethpb.SyncCommitteeMessage) (*empty.Empty, error) {
return c.getClient().SubmitSyncMessage(ctx, in)
return c.beaconNodeValidatorClient.SubmitSyncMessage(ctx, in)
}
func (c *grpcValidatorClient) SubmitValidatorRegistrations(ctx context.Context, in *ethpb.SignedValidatorRegistrationsV1) (*empty.Empty, error) {
return c.getClient().SubmitValidatorRegistrations(ctx, in)
return c.beaconNodeValidatorClient.SubmitValidatorRegistrations(ctx, in)
}
func (c *grpcValidatorClient) SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, _ []*ethpb.ValidatorDuty) (*empty.Empty, error) {
return c.getClient().SubscribeCommitteeSubnets(ctx, in)
return c.beaconNodeValidatorClient.SubscribeCommitteeSubnets(ctx, in)
}
func (c *grpcValidatorClient) ValidatorIndex(ctx context.Context, in *ethpb.ValidatorIndexRequest) (*ethpb.ValidatorIndexResponse, error) {
return c.getClient().ValidatorIndex(ctx, in)
return c.beaconNodeValidatorClient.ValidatorIndex(ctx, in)
}
func (c *grpcValidatorClient) ValidatorStatus(ctx context.Context, in *ethpb.ValidatorStatusRequest) (*ethpb.ValidatorStatusResponse, error) {
return c.getClient().ValidatorStatus(ctx, in)
return c.beaconNodeValidatorClient.ValidatorStatus(ctx, in)
}
// Deprecated: Do not use.
func (c *grpcValidatorClient) WaitForChainStart(ctx context.Context, in *empty.Empty) (*ethpb.ChainStartResponse, error) {
stream, err := c.getClient().WaitForChainStart(ctx, in)
stream, err := c.beaconNodeValidatorClient.WaitForChainStart(ctx, in)
if err != nil {
return nil, errors.Wrap(
client.ErrConnectionIssue,
@@ -260,13 +260,13 @@ func (c *grpcValidatorClient) WaitForChainStart(ctx context.Context, in *empty.E
}
func (c *grpcValidatorClient) AssignValidatorToSubnet(ctx context.Context, in *ethpb.AssignValidatorToSubnetRequest) (*empty.Empty, error) {
return c.getClient().AssignValidatorToSubnet(ctx, in)
return c.beaconNodeValidatorClient.AssignValidatorToSubnet(ctx, in)
}
func (c *grpcValidatorClient) AggregatedSigAndAggregationBits(
ctx context.Context,
in *ethpb.AggregatedSigAndAggregationBitsRequest,
) (*ethpb.AggregatedSigAndAggregationBitsResponse, error) {
return c.getClient().AggregatedSigAndAggregationBits(ctx, in)
return c.beaconNodeValidatorClient.AggregatedSigAndAggregationBits(ctx, in)
}
func (*grpcValidatorClient) AggregatedSelections(context.Context, []iface.BeaconCommitteeSelection) ([]iface.BeaconCommitteeSelection, error) {
@@ -277,12 +277,8 @@ func (*grpcValidatorClient) AggregatedSyncSelections(context.Context, []iface.Sy
return nil, iface.ErrNotSupported
}
// NewGrpcValidatorClient creates a new gRPC validator client that supports
// dynamic connection switching via the NodeConnection's GrpcConnectionProvider.
func NewGrpcValidatorClient(conn validatorHelpers.NodeConnection) iface.ValidatorClient {
return &grpcValidatorClient{
grpcClientManager: newGrpcClientManager(conn, ethpb.NewBeaconNodeValidatorClient),
}
func NewGrpcValidatorClient(cc grpc.ClientConnInterface) iface.ValidatorClient {
return &grpcValidatorClient{ethpb.NewBeaconNodeValidatorClient(cc), false}
}
func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []string, eventsChannel chan<- *eventClient.Event) {
@@ -312,7 +308,7 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
log.Warn("gRPC only supports the head topic, other topics will be ignored")
}
stream, err := c.getClient().StreamSlots(ctx, &ethpb.StreamSlotsRequest{VerifiedOnly: true})
stream, err := c.beaconNodeValidatorClient.StreamSlots(ctx, &ethpb.StreamSlotsRequest{VerifiedOnly: true})
if err != nil {
eventsChannel <- &eventClient.Event{
EventType: eventClient.EventConnectionError,
@@ -378,20 +374,11 @@ func (c *grpcValidatorClient) EventStreamIsRunning() bool {
return c.isEventStreamRunning
}
func (c *grpcValidatorClient) Host() string {
return c.grpcClientManager.conn.GetGrpcConnectionProvider().CurrentHost()
func (*grpcValidatorClient) Host() string {
log.Warn(iface.ErrNotSupported)
return ""
}
func (c *grpcValidatorClient) SwitchHost(host string) {
provider := c.grpcClientManager.conn.GetGrpcConnectionProvider()
// Find the index of the requested host and switch to it
for i, h := range provider.Hosts() {
if h == host {
if err := provider.SwitchHost(i); err != nil {
log.WithError(err).WithField("host", host).Error("Failed to set gRPC host")
}
return
}
}
log.WithField("host", host).Warn("Requested gRPC host not found in configured endpoints")
func (*grpcValidatorClient) SetHost(_ string) {
log.Warn(iface.ErrNotSupported)
}

View File

@@ -14,10 +14,8 @@ import (
"github.com/OffchainLabs/prysm/v7/testing/assert"
mock2 "github.com/OffchainLabs/prysm/v7/testing/mock"
"github.com/OffchainLabs/prysm/v7/testing/require"
validatorTesting "github.com/OffchainLabs/prysm/v7/validator/testing"
logTest "github.com/sirupsen/logrus/hooks/test"
"go.uber.org/mock/gomock"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
)
@@ -135,15 +133,7 @@ func TestWaitForChainStart_StreamSetupFails(t *testing.T) {
gomock.Any(),
).Return(nil, errors.New("failed stream"))
validatorClient := &grpcValidatorClient{
grpcClientManager: newGrpcClientManager(
validatorTesting.MockNodeConnection(),
func(_ grpc.ClientConnInterface) eth.BeaconNodeValidatorClient {
return beaconNodeValidatorClient
},
),
isEventStreamRunning: true,
}
validatorClient := &grpcValidatorClient{beaconNodeValidatorClient, true}
_, err := validatorClient.WaitForChainStart(t.Context(), &emptypb.Empty{})
want := "could not setup beacon chain ChainStart streaming client"
assert.ErrorContains(t, want, err)
@@ -156,15 +146,7 @@ func TestStartEventStream(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
beaconNodeValidatorClient := mock2.NewMockBeaconNodeValidatorClient(ctrl)
grpcClient := &grpcValidatorClient{
grpcClientManager: newGrpcClientManager(
validatorTesting.MockNodeConnection(),
func(_ grpc.ClientConnInterface) eth.BeaconNodeValidatorClient {
return beaconNodeValidatorClient
},
),
isEventStreamRunning: true,
}
grpcClient := &grpcValidatorClient{beaconNodeValidatorClient, true}
tests := []struct {
name string
topics []string

View File

@@ -152,5 +152,5 @@ type ValidatorClient interface {
AggregatedSelections(ctx context.Context, selections []BeaconCommitteeSelection) ([]BeaconCommitteeSelection, error)
AggregatedSyncSelections(ctx context.Context, selections []SyncCommitteeSelection) ([]SyncCommitteeSelection, error)
Host() string
SwitchHost(host string)
SetHost(host string)
}

View File

@@ -0,0 +1,53 @@
package client
import (
"strings"
"google.golang.org/grpc/resolver"
)
// Modification of a default grpc passthrough resolver (google.golang.org/grpc/resolver/passthrough) allowing to use multiple addresses
// in grpc endpoint. Example:
// conn, err := grpc.DialContext(ctx, "127.0.0.1:4000,127.0.0.1:4001", grpc.WithInsecure(), grpc.WithResolvers(&multipleEndpointsGrpcResolverBuilder{}))
// It can be used with any grpc load balancer (pick_first, round_robin). Default is pick_first.
// Round robin can be used by adding the following option:
// grpc.WithDefaultServiceConfig("{\"loadBalancingConfig\":[{\"round_robin\":{}}]}")
type multipleEndpointsGrpcResolverBuilder struct{}
// Build creates and starts multiple endpoints resolver.
func (*multipleEndpointsGrpcResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
r := &multipleEndpointsGrpcResolver{
target: target,
cc: cc,
}
r.start()
return r, nil
}
// Scheme returns default scheme.
func (*multipleEndpointsGrpcResolverBuilder) Scheme() string {
return resolver.GetDefaultScheme()
}
type multipleEndpointsGrpcResolver struct {
target resolver.Target
cc resolver.ClientConn
}
func (r *multipleEndpointsGrpcResolver) start() {
ep := r.target.Endpoint()
endpoints := strings.Split(ep, ",")
var addrs []resolver.Address
for _, endpoint := range endpoints {
addrs = append(addrs, resolver.Address{Addr: endpoint, ServerName: endpoint})
}
if err := r.cc.UpdateState(resolver.State{Addresses: addrs}); err != nil {
log.WithError(err).Error("Failed to update grpc connection state")
}
}
// ResolveNow --
func (*multipleEndpointsGrpcResolver) ResolveNow(_ resolver.ResolveNowOptions) {}
// Close --
func (*multipleEndpointsGrpcResolver) Close() {}

View File

@@ -8,10 +8,11 @@ import (
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
)
func NewNodeClient(validatorConn validatorHelpers.NodeConnection) iface.NodeClient {
grpcClient := grpcApi.NewNodeClient(validatorConn)
func NewNodeClient(validatorConn validatorHelpers.NodeConnection, jsonRestHandler beaconApi.RestHandler) iface.NodeClient {
grpcClient := grpcApi.NewNodeClient(validatorConn.GetGrpcClientConn())
if features.Get().EnableBeaconRESTApi {
return beaconApi.NewNodeClientWithFallback(validatorConn.GetRestHandler(), grpcClient)
return beaconApi.NewNodeClientWithFallback(jsonRestHandler, grpcClient)
} else {
return grpcClient
}
return grpcClient
}

View File

@@ -2,11 +2,13 @@ package client
import (
"context"
"net/http"
"strings"
"time"
api "github.com/OffchainLabs/prysm/v7/api/client"
eventClient "github.com/OffchainLabs/prysm/v7/api/client/event"
grpcutil "github.com/OffchainLabs/prysm/v7/api/grpc"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/async/event"
lruwrpr "github.com/OffchainLabs/prysm/v7/cache/lru"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
@@ -15,6 +17,7 @@ import (
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/validator/accounts/wallet"
beaconApi "github.com/OffchainLabs/prysm/v7/validator/client/beacon-api"
beaconChainClientFactory "github.com/OffchainLabs/prysm/v7/validator/client/beacon-chain-client-factory"
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
nodeclientfactory "github.com/OffchainLabs/prysm/v7/validator/client/node-client-factory"
@@ -32,6 +35,7 @@ import (
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/protobuf/proto"
@@ -68,7 +72,6 @@ type Config struct {
DB db.Database
Wallet *wallet.Wallet
WalletInitializedFeed *event.Feed
Conn validatorHelpers.NodeConnection // Optional: pre-built connection (if nil, built from endpoint configs)
MaxHealthChecks int
GRPCMaxCallRecvMsgSize int
GRPCRetries uint
@@ -119,12 +122,6 @@ func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, e
maxHealthChecks: cfg.MaxHealthChecks,
}
// Use pre-built connection if provided
if cfg.Conn != nil {
s.conn = cfg.Conn
return s, nil
}
dialOpts := ConstructDialOptions(
cfg.GRPCMaxCallRecvMsgSize,
cfg.BeaconNodeCert,
@@ -137,21 +134,19 @@ func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, e
s.ctx = grpcutil.AppendHeaders(ctx, cfg.GRPCHeaders)
conn, err := validatorHelpers.NewNodeConnection(
validatorHelpers.WithGRPC(s.ctx, cfg.BeaconNodeGRPCEndpoint, dialOpts),
validatorHelpers.WithREST(cfg.BeaconApiEndpoint,
rest.WithHttpHeaders(cfg.BeaconApiHeaders),
rest.WithHttpTimeout(cfg.BeaconApiTimeout),
rest.WithTracing(),
),
)
grpcConn, err := grpc.DialContext(ctx, cfg.BeaconNodeGRPCEndpoint, dialOpts...)
if err != nil {
return s, err
}
if cfg.BeaconNodeCert != "" && cfg.BeaconNodeGRPCEndpoint != "" {
if cfg.BeaconNodeCert != "" {
log.Info("Established secure gRPC connection")
}
s.conn = conn
s.conn = validatorHelpers.NewNodeConnection(
grpcConn,
cfg.BeaconApiEndpoint,
validatorHelpers.WithBeaconApiHeaders(cfg.BeaconApiHeaders),
validatorHelpers.WithBeaconApiTimeout(cfg.BeaconApiTimeout),
)
return s, nil
}
@@ -186,13 +181,20 @@ func (v *ValidatorService) Start() {
return
}
restProvider := v.conn.GetRestConnectionProvider()
if restProvider == nil || len(restProvider.Hosts()) == 0 {
log.Error("No REST API hosts provided")
u := strings.ReplaceAll(v.conn.GetBeaconApiUrl(), " ", "")
hosts := strings.Split(u, ",")
if len(hosts) == 0 {
log.WithError(err).Error("No API hosts provided")
return
}
validatorClient := validatorclientfactory.NewValidatorClient(v.conn)
headersTransport := api.NewCustomHeadersTransport(http.DefaultTransport, v.conn.GetBeaconApiHeaders())
restHandler := beaconApi.NewBeaconApiRestHandler(
http.Client{Timeout: v.conn.GetBeaconApiTimeout(), Transport: otelhttp.NewTransport(headersTransport)},
hosts[0],
)
validatorClient := validatorclientfactory.NewValidatorClient(v.conn, restHandler)
v.validator = &validator{
slotFeed: new(event.Feed),
@@ -206,12 +208,12 @@ func (v *ValidatorService) Start() {
graffiti: v.graffiti,
graffitiStruct: v.graffitiStruct,
graffitiOrderedIndex: graffitiOrderedIndex,
conn: v.conn,
beaconNodeHosts: hosts,
currentHostIndex: 0,
validatorClient: validatorClient,
chainClient: beaconChainClientFactory.NewChainClient(v.conn),
nodeClient: nodeclientfactory.NewNodeClient(v.conn),
prysmChainClient: beaconChainClientFactory.NewPrysmChainClient(v.conn),
chainClient: beaconChainClientFactory.NewChainClient(v.conn, restHandler),
nodeClient: nodeclientfactory.NewNodeClient(v.conn, restHandler),
prysmChainClient: beaconChainClientFactory.NewPrysmChainClient(v.conn, restHandler),
db: v.db,
km: nil,
web3SignerConfig: v.web3SignerConfig,
@@ -367,6 +369,7 @@ func ConstructDialOptions(
grpcprometheus.StreamClientInterceptor,
grpcretry.StreamClientInterceptor(),
),
grpc.WithResolvers(&multipleEndpointsGrpcResolverBuilder{}),
}
dialOpts = append(dialOpts, extraOpts...)

View File

@@ -33,10 +33,7 @@ func TestStop_CancelsContext(t *testing.T) {
func TestNew_Insecure(t *testing.T) {
hook := logTest.NewGlobal()
_, err := NewValidatorService(t.Context(), &Config{
BeaconNodeGRPCEndpoint: "localhost:4000",
BeaconApiEndpoint: "http://localhost:3500",
})
_, err := NewValidatorService(t.Context(), &Config{})
require.NoError(t, err)
require.LogsContain(t, hook, "You are using an insecure gRPC connection")
}
@@ -61,11 +58,7 @@ func TestStart_GrpcHeaders(t *testing.T) {
"Authorization", "this is a valid value",
},
} {
cfg := &Config{
BeaconNodeGRPCEndpoint: "localhost:4000",
BeaconApiEndpoint: "http://localhost:3500",
GRPCHeaders: strings.Split(input, ","),
}
cfg := &Config{GRPCHeaders: strings.Split(input, ",")}
validatorService, err := NewValidatorService(ctx, cfg)
require.NoError(t, err)
md, _ := metadata.FromOutgoingContext(validatorService.ctx)

View File

@@ -10,10 +10,12 @@ import (
func NewValidatorClient(
validatorConn validatorHelpers.NodeConnection,
jsonRestHandler beaconApi.RestHandler,
opt ...beaconApi.ValidatorClientOpt,
) iface.ValidatorClient {
if features.Get().EnableBeaconRESTApi {
return beaconApi.NewBeaconApiValidatorClient(validatorConn.GetRestHandler(), opt...)
return beaconApi.NewBeaconApiValidatorClient(jsonRestHandler, opt...)
} else {
return grpcApi.NewGrpcValidatorClient(validatorConn.GetGrpcClientConn())
}
return grpcApi.NewGrpcValidatorClient(validatorConn)
}

View File

@@ -38,7 +38,6 @@ import (
"github.com/OffchainLabs/prysm/v7/validator/db"
dbCommon "github.com/OffchainLabs/prysm/v7/validator/db/common"
"github.com/OffchainLabs/prysm/v7/validator/graffiti"
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
"github.com/OffchainLabs/prysm/v7/validator/keymanager"
"github.com/OffchainLabs/prysm/v7/validator/keymanager/local"
remoteweb3signer "github.com/OffchainLabs/prysm/v7/validator/keymanager/remote-web3signer"
@@ -102,9 +101,9 @@ type validator struct {
pubkeyToStatus map[[fieldparams.BLSPubkeyLength]byte]*validatorStatus
wallet *wallet.Wallet
walletInitializedChan chan *wallet.Wallet
currentHostIndex uint64
walletInitializedFeed *event.Feed
graffitiOrderedIndex uint64
conn validatorHelpers.NodeConnection
submittedAtts map[submittedAttKey]*submittedAtt
validatorsRegBatchSize int
validatorClient iface.ValidatorClient
@@ -115,7 +114,7 @@ type validator struct {
km keymanager.IKeymanager
accountChangedSub event.Subscription
ticker slots.Ticker
currentHostIndex uint64
beaconNodeHosts []string
genesisTime time.Time
graffiti []byte
voteStats voteStats
@@ -1312,64 +1311,34 @@ func (v *validator) Host() string {
}
func (v *validator) changeHost() {
hosts := v.hosts()
if len(hosts) <= 1 {
return
}
next := (v.currentHostIndex + 1) % uint64(len(hosts))
next := (v.currentHostIndex + 1) % uint64(len(v.beaconNodeHosts))
log.WithFields(logrus.Fields{
"currentHost": hosts[v.currentHostIndex],
"nextHost": hosts[next],
"currentHost": v.beaconNodeHosts[v.currentHostIndex],
"nextHost": v.beaconNodeHosts[next],
}).Warn("Beacon node is not responding, switching host")
v.validatorClient.SwitchHost(hosts[next])
v.validatorClient.SetHost(v.beaconNodeHosts[next])
v.currentHostIndex = next
}
// hosts returns the list of configured beacon node hosts.
func (v *validator) hosts() []string {
if features.Get().EnableBeaconRESTApi {
return v.conn.GetRestConnectionProvider().Hosts()
}
return v.conn.GetGrpcConnectionProvider().Hosts()
}
// numHosts returns the number of configured beacon node hosts.
func (v *validator) numHosts() int {
return len(v.hosts())
}
func (v *validator) FindHealthyHost(ctx context.Context) bool {
numHosts := v.numHosts()
startingHost := v.Host()
attemptedHosts := []string{}
// Check all hosts for a fully synced node
for i := range numHosts {
if v.nodeClient.IsReady(ctx) {
if len(attemptedHosts) > 0 {
log.WithFields(logrus.Fields{
"previousHost": startingHost,
"newHost": v.Host(),
"failedAttempts": attemptedHosts,
}).Info("Failover succeeded: connected to healthy beacon node")
}
// Tail-recursive closure keeps retry count private.
var check func(remaining int) bool
check = func(remaining int) bool {
if v.nodeClient.IsReady(ctx) { // ready → done
return true
}
log.WithField("host", v.Host()).Debug("Beacon node not fully synced")
attemptedHosts = append(attemptedHosts, v.Host())
// Try next host if not the last iteration
if i < numHosts-1 {
v.changeHost()
if len(v.beaconNodeHosts) == 1 && features.Get().EnableBeaconRESTApi {
log.WithField("host", v.Host()).Warn("Beacon node is not responding, no backup node configured")
return false
}
if remaining == 0 || !features.Get().EnableBeaconRESTApi {
return false // exhausted or REST disabled
}
v.changeHost()
return check(remaining - 1) // recurse
}
if numHosts == 1 {
log.WithField("host", v.Host()).Warn("Beacon node is not fully synced, no backup node configured")
} else {
log.Warn("No fully synced beacon node found")
}
return false
return check(len(v.beaconNodeHosts))
}
func (v *validator) filterAndCacheActiveKeys(ctx context.Context, pubkeys [][fieldparams.BLSPubkeyLength]byte, slot primitives.Slot) ([][fieldparams.BLSPubkeyLength]byte, error) {

View File

@@ -16,8 +16,6 @@ import (
"testing"
"time"
grpcutil "github.com/OffchainLabs/prysm/v7/api/grpc"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
"github.com/OffchainLabs/prysm/v7/async/event"
"github.com/OffchainLabs/prysm/v7/cmd/validator/flags"
@@ -39,7 +37,6 @@ import (
"github.com/OffchainLabs/prysm/v7/validator/accounts/wallet"
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
dbTest "github.com/OffchainLabs/prysm/v7/validator/db/testing"
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
"github.com/OffchainLabs/prysm/v7/validator/keymanager"
"github.com/OffchainLabs/prysm/v7/validator/keymanager/local"
remoteweb3signer "github.com/OffchainLabs/prysm/v7/validator/keymanager/remote-web3signer"
@@ -2795,27 +2792,18 @@ func TestValidator_Host(t *testing.T) {
}
func TestValidator_ChangeHost(t *testing.T) {
// Enable REST API mode for this test since changeHost only calls SwitchHost in REST API mode
resetCfg := features.InitWithReset(&features.Flags{EnableBeaconRESTApi: true})
defer resetCfg()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
hosts := []string{"http://localhost:8080", "http://localhost:8081"}
restProvider := &rest.MockRestProvider{MockHosts: hosts}
conn, err := validatorHelpers.NewNodeConnection(validatorHelpers.WithRestProvider(restProvider))
require.NoError(t, err)
client := validatormock.NewMockValidatorClient(ctrl)
v := validator{
validatorClient: client,
conn: conn,
beaconNodeHosts: []string{"http://localhost:8080", "http://localhost:8081"},
currentHostIndex: 0,
}
client.EXPECT().SwitchHost(hosts[1])
client.EXPECT().SwitchHost(hosts[0])
client.EXPECT().SetHost(v.beaconNodeHosts[1])
client.EXPECT().SetHost(v.beaconNodeHosts[0])
v.changeHost()
assert.Equal(t, uint64(1), v.currentHostIndex)
v.changeHost()
@@ -2850,16 +2838,12 @@ func TestUpdateValidatorStatusCache(t *testing.T) {
gomock.Any(),
gomock.Any()).Return(mockResponse, nil)
mockProvider := &grpcutil.MockGrpcProvider{MockHosts: []string{"localhost:4000", "localhost:4001"}}
conn, err := validatorHelpers.NewNodeConnection(validatorHelpers.WithGRPCProvider(mockProvider))
require.NoError(t, err)
v := &validator{
validatorClient: client,
conn: conn,
beaconNodeHosts: []string{"http://localhost:8080", "http://localhost:8081"},
currentHostIndex: 0,
pubkeyToStatus: map[[fieldparams.BLSPubkeyLength]byte]*validatorStatus{
[fieldparams.BLSPubkeyLength]byte{0x03}: { // add non existent key and status to cache, should be fully removed on update
[fieldparams.BLSPubkeyLength]byte{0x03}: &validatorStatus{ // add non existent key and status to cache, should be fully removed on update
publicKey: []byte{0x03},
status: &ethpb.ValidatorStatusResponse{
Status: ethpb.ValidatorStatus_ACTIVE,
@@ -2869,7 +2853,7 @@ func TestUpdateValidatorStatusCache(t *testing.T) {
},
}
err = v.updateValidatorStatusCache(ctx, pubkeys)
err := v.updateValidatorStatusCache(ctx, pubkeys)
assert.NoError(t, err)
// make sure the nonexistent key is fully removed

View File

@@ -10,8 +10,6 @@ go_library(
importpath = "github.com/OffchainLabs/prysm/v7/validator/helpers",
visibility = ["//visibility:public"],
deps = [
"//api/grpc:go_default_library",
"//api/rest:go_default_library",
"//config/fieldparams:go_default_library",
"//consensus-types/primitives:go_default_library",
"//validator/db/iface:go_default_library",
@@ -26,23 +24,18 @@ go_test(
srcs = [
"converts_test.go",
"metadata_test.go",
"node_connection_test.go",
],
embed = [":go_default_library"],
deps = [
"//api/grpc:go_default_library",
"//api/rest:go_default_library",
"//config/fieldparams:go_default_library",
"//config/proposer:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//validator/db/common:go_default_library",
"//validator/db/iface:go_default_library",
"//validator/slashing-protection-history/format:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@org_golang_google_grpc//:go_default_library",
],
)

View File

@@ -1,120 +1,78 @@
package helpers
import (
"context"
"time"
grpcutil "github.com/OffchainLabs/prysm/v7/api/grpc"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
// NodeConnection provides access to both gRPC and REST API connections to a beacon node.
// Use an interface with a private dummy function to force all other packages to call NewNodeConnection
type NodeConnection interface {
// GetGrpcClientConn returns the current gRPC client connection.
// Returns nil if no gRPC provider is configured.
GetGrpcClientConn() *grpc.ClientConn
// GetGrpcConnectionProvider returns the gRPC connection provider.
GetGrpcConnectionProvider() grpcutil.GrpcConnectionProvider
// GetRestConnectionProvider returns the REST connection provider.
GetRestConnectionProvider() rest.RestConnectionProvider
// GetRestHandler returns the REST handler for making API requests.
// Returns nil if no REST provider is configured.
GetRestHandler() rest.RestHandler
GetBeaconApiUrl() string
GetBeaconApiHeaders() map[string][]string
setBeaconApiHeaders(map[string][]string)
GetBeaconApiTimeout() time.Duration
setBeaconApiTimeout(time.Duration)
dummy()
}
type nodeConnection struct {
grpcConnectionProvider grpcutil.GrpcConnectionProvider
restConnectionProvider rest.RestConnectionProvider
grpcClientConn *grpc.ClientConn
beaconApiUrl string
beaconApiHeaders map[string][]string
beaconApiTimeout time.Duration
}
// NodeConnectionOption is a functional option for configuring the node connection.
type NodeConnectionOption func(nc NodeConnection)
// WithBeaconApiHeaders sets the HTTP headers that should be sent to the server along with each request.
func WithBeaconApiHeaders(headers map[string][]string) NodeConnectionOption {
return func(nc NodeConnection) {
nc.setBeaconApiHeaders(headers)
}
}
// WithBeaconApiTimeout sets the HTTP request timeout.
func WithBeaconApiTimeout(timeout time.Duration) NodeConnectionOption {
return func(nc NodeConnection) {
nc.setBeaconApiTimeout(timeout)
}
}
func (c *nodeConnection) GetGrpcClientConn() *grpc.ClientConn {
if c.grpcConnectionProvider == nil {
return nil
}
return c.grpcConnectionProvider.CurrentConn()
return c.grpcClientConn
}
func (c *nodeConnection) GetGrpcConnectionProvider() grpcutil.GrpcConnectionProvider {
return c.grpcConnectionProvider
func (c *nodeConnection) GetBeaconApiUrl() string {
return c.beaconApiUrl
}
func (c *nodeConnection) GetRestConnectionProvider() rest.RestConnectionProvider {
return c.restConnectionProvider
func (c *nodeConnection) GetBeaconApiHeaders() map[string][]string {
return c.beaconApiHeaders
}
func (c *nodeConnection) GetRestHandler() rest.RestHandler {
if c.restConnectionProvider == nil {
return nil
}
return c.restConnectionProvider.RestHandler()
func (c *nodeConnection) setBeaconApiHeaders(headers map[string][]string) {
c.beaconApiHeaders = headers
}
// NodeConnectionOption is a functional option for configuring a NodeConnection.
type NodeConnectionOption func(*nodeConnection) error
// WithGRPC configures a gRPC connection provider for the NodeConnection.
// If endpoint is empty, this option is a no-op.
func WithGRPC(ctx context.Context, endpoint string, dialOpts []grpc.DialOption) NodeConnectionOption {
return func(c *nodeConnection) error {
if endpoint == "" {
return nil
}
provider, err := grpcutil.NewGrpcConnectionProvider(ctx, endpoint, dialOpts)
if err != nil {
return errors.Wrap(err, "failed to create gRPC connection provider")
}
c.grpcConnectionProvider = provider
return nil
}
func (c *nodeConnection) GetBeaconApiTimeout() time.Duration {
return c.beaconApiTimeout
}
// WithREST configures a REST connection provider for the NodeConnection.
// If endpoint is empty, this option is a no-op.
func WithREST(endpoint string, opts ...rest.RestConnectionProviderOption) NodeConnectionOption {
return func(c *nodeConnection) error {
if endpoint == "" {
return nil
}
provider, err := rest.NewRestConnectionProvider(endpoint, opts...)
if err != nil {
return errors.Wrap(err, "failed to create REST connection provider")
}
c.restConnectionProvider = provider
return nil
}
func (c *nodeConnection) setBeaconApiTimeout(timeout time.Duration) {
c.beaconApiTimeout = timeout
}
// WithGRPCProvider sets a pre-built gRPC connection provider.
func WithGRPCProvider(provider grpcutil.GrpcConnectionProvider) NodeConnectionOption {
return func(c *nodeConnection) error {
c.grpcConnectionProvider = provider
return nil
}
}
func (*nodeConnection) dummy() {}
// WithRestProvider sets a pre-built REST connection provider.
func WithRestProvider(provider rest.RestConnectionProvider) NodeConnectionOption {
return func(c *nodeConnection) error {
c.restConnectionProvider = provider
return nil
}
}
// NewNodeConnection creates a new NodeConnection with the given options.
// At least one provider (gRPC or REST) must be configured via options.
// Returns an error if no providers are configured.
func NewNodeConnection(opts ...NodeConnectionOption) (NodeConnection, error) {
c := &nodeConnection{}
func NewNodeConnection(grpcConn *grpc.ClientConn, beaconApiUrl string, opts ...NodeConnectionOption) NodeConnection {
conn := &nodeConnection{}
conn.grpcClientConn = grpcConn
conn.beaconApiUrl = beaconApiUrl
for _, opt := range opts {
if err := opt(c); err != nil {
return nil, err
}
opt(conn)
}
if c.grpcConnectionProvider == nil && c.restConnectionProvider == nil {
return nil, errors.New("at least one beacon node endpoint must be provided (--beacon-rpc-provider or --beacon-rest-api-provider)")
}
return c, nil
return conn
}

View File

@@ -1,101 +0,0 @@
package helpers
import (
"context"
"testing"
grpcutil "github.com/OffchainLabs/prysm/v7/api/grpc"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
"google.golang.org/grpc"
)
func TestNewNodeConnection(t *testing.T) {
t.Run("with both providers", func(t *testing.T) {
grpcProvider := &grpcutil.MockGrpcProvider{MockHosts: []string{"localhost:4000"}}
restProvider := &rest.MockRestProvider{MockHosts: []string{"http://localhost:3500"}}
conn, err := NewNodeConnection(
WithGRPCProvider(grpcProvider),
WithRestProvider(restProvider),
)
require.NoError(t, err)
assert.Equal(t, grpcProvider, conn.GetGrpcConnectionProvider())
assert.Equal(t, restProvider, conn.GetRestConnectionProvider())
})
t.Run("with only rest provider", func(t *testing.T) {
restProvider := &rest.MockRestProvider{MockHosts: []string{"http://localhost:3500"}}
conn, err := NewNodeConnection(WithRestProvider(restProvider))
require.NoError(t, err)
assert.Equal(t, (grpcutil.GrpcConnectionProvider)(nil), conn.GetGrpcConnectionProvider())
assert.Equal(t, (*grpc.ClientConn)(nil), conn.GetGrpcClientConn())
assert.Equal(t, restProvider, conn.GetRestConnectionProvider())
})
t.Run("with only grpc provider", func(t *testing.T) {
grpcProvider := &grpcutil.MockGrpcProvider{MockHosts: []string{"localhost:4000"}}
conn, err := NewNodeConnection(WithGRPCProvider(grpcProvider))
require.NoError(t, err)
assert.Equal(t, grpcProvider, conn.GetGrpcConnectionProvider())
assert.Equal(t, (rest.RestConnectionProvider)(nil), conn.GetRestConnectionProvider())
assert.Equal(t, (rest.RestHandler)(nil), conn.GetRestHandler())
})
t.Run("with no providers returns error", func(t *testing.T) {
conn, err := NewNodeConnection()
require.ErrorContains(t, "at least one beacon node endpoint must be provided", err)
assert.Equal(t, (NodeConnection)(nil), conn)
})
t.Run("with empty endpoints is no-op", func(t *testing.T) {
// Empty endpoints should be skipped, resulting in no providers
conn, err := NewNodeConnection(
WithGRPC(context.Background(), "", nil),
WithREST(""),
)
require.ErrorContains(t, "at least one beacon node endpoint must be provided", err)
assert.Equal(t, (NodeConnection)(nil), conn)
})
}
func TestNodeConnection_GetGrpcClientConn(t *testing.T) {
t.Run("delegates to provider", func(t *testing.T) {
// We can't easily create a real grpc.ClientConn in tests,
// but we can verify the delegation works with nil
grpcProvider := &grpcutil.MockGrpcProvider{MockConn: nil, MockHosts: []string{"localhost:4000"}}
conn, err := NewNodeConnection(WithGRPCProvider(grpcProvider))
require.NoError(t, err)
// Should delegate to provider.CurrentConn()
assert.Equal(t, grpcProvider.CurrentConn(), conn.GetGrpcClientConn())
})
t.Run("returns nil when provider is nil", func(t *testing.T) {
restProvider := &rest.MockRestProvider{MockHosts: []string{"http://localhost:3500"}}
conn, err := NewNodeConnection(WithRestProvider(restProvider))
require.NoError(t, err)
assert.Equal(t, (*grpc.ClientConn)(nil), conn.GetGrpcClientConn())
})
}
func TestNodeConnection_GetRestHandler(t *testing.T) {
t.Run("delegates to provider", func(t *testing.T) {
mockHandler := &rest.MockRestHandler{}
restProvider := &rest.MockRestProvider{MockHandler: mockHandler, MockHosts: []string{"http://localhost:3500"}}
conn, err := NewNodeConnection(WithRestProvider(restProvider))
require.NoError(t, err)
assert.Equal(t, mockHandler, conn.GetRestHandler())
})
t.Run("returns nil when provider is nil", func(t *testing.T) {
grpcProvider := &grpcutil.MockGrpcProvider{MockHosts: []string{"localhost:4000"}}
conn, err := NewNodeConnection(WithGRPCProvider(grpcProvider))
require.NoError(t, err)
assert.Equal(t, (rest.RestHandler)(nil), conn.GetRestHandler())
})
}

View File

@@ -41,8 +41,6 @@ func TestNode_Builds(t *testing.T) {
set.String("wallet-password-file", passwordFile, "path to wallet password")
set.String("keymanager-kind", "imported", "keymanager kind")
set.String("verbosity", "debug", "log verbosity")
set.String("beacon-rpc-provider", "localhost:4000", "beacon node RPC endpoint")
set.String("beacon-rest-api-provider", "http://localhost:3500", "beacon node REST API endpoint")
require.NoError(t, set.Set(flags.WalletPasswordFileFlag.Name, passwordFile))
ctx := cli.NewContext(&app, set, nil)
opts := []accounts.Option{

View File

@@ -23,9 +23,9 @@ go_library(
],
deps = [
"//api:go_default_library",
"//api/client:go_default_library",
"//api/grpc:go_default_library",
"//api/pagination:go_default_library",
"//api/rest:go_default_library",
"//api/server:go_default_library",
"//api/server/httprest:go_default_library",
"//api/server/middleware:go_default_library",
@@ -55,6 +55,7 @@ go_library(
"//validator/accounts/petnames:go_default_library",
"//validator/accounts/wallet:go_default_library",
"//validator/client:go_default_library",
"//validator/client/beacon-api:go_default_library",
"//validator/client/beacon-chain-client-factory:go_default_library",
"//validator/client/iface:go_default_library",
"//validator/client/node-client-factory:go_default_library",
@@ -78,6 +79,7 @@ go_library(
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_wealdtech_go_eth2_wallet_encryptor_keystorev4//:go_default_library",
"@io_opentelemetry_go_contrib_instrumentation_net_http_otelhttp//:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//metadata:go_default_library",
@@ -104,7 +106,6 @@ go_test(
embed = [":go_default_library"],
deps = [
"//api:go_default_library",
"//api/grpc:go_default_library",
"//async/event:go_default_library",
"//cmd/validator/flags:go_default_library",
"//config/features:go_default_library",

View File

@@ -1,10 +1,13 @@
package rpc
import (
"net/http"
api "github.com/OffchainLabs/prysm/v7/api/client"
grpcutil "github.com/OffchainLabs/prysm/v7/api/grpc"
"github.com/OffchainLabs/prysm/v7/api/rest"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/validator/client"
beaconApi "github.com/OffchainLabs/prysm/v7/validator/client/beacon-api"
beaconChainClientFactory "github.com/OffchainLabs/prysm/v7/validator/client/beacon-chain-client-factory"
nodeClientFactory "github.com/OffchainLabs/prysm/v7/validator/client/node-client-factory"
validatorClientFactory "github.com/OffchainLabs/prysm/v7/validator/client/validator-client-factory"
@@ -14,6 +17,7 @@ import (
grpcopentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"google.golang.org/grpc"
)
@@ -37,26 +41,30 @@ func (s *Server) registerBeaconClient() error {
s.ctx = grpcutil.AppendHeaders(s.ctx, s.grpcHeaders)
conn, err := validatorHelpers.NewNodeConnection(
validatorHelpers.WithGRPC(s.ctx, s.beaconNodeEndpoint, dialOpts),
validatorHelpers.WithREST(s.beaconApiEndpoint,
rest.WithHttpHeaders(s.beaconApiHeaders),
rest.WithHttpTimeout(s.beaconApiTimeout),
rest.WithTracing(),
),
)
grpcConn, err := grpc.DialContext(s.ctx, s.beaconNodeEndpoint, dialOpts...)
if err != nil {
return err
return errors.Wrapf(err, "could not dial endpoint: %s", s.beaconNodeEndpoint)
}
if s.beaconNodeCert != "" && s.beaconNodeEndpoint != "" {
if s.beaconNodeCert != "" {
log.Info("Established secure gRPC connection")
}
if grpcConn := conn.GetGrpcClientConn(); grpcConn != nil {
s.healthClient = ethpb.NewHealthClient(grpcConn)
}
s.healthClient = ethpb.NewHealthClient(grpcConn)
s.chainClient = beaconChainClientFactory.NewChainClient(conn)
s.nodeClient = nodeClientFactory.NewNodeClient(conn)
s.beaconNodeValidatorClient = validatorClientFactory.NewValidatorClient(conn)
conn := validatorHelpers.NewNodeConnection(
grpcConn,
s.beaconApiEndpoint,
validatorHelpers.WithBeaconApiHeaders(s.beaconApiHeaders),
validatorHelpers.WithBeaconApiTimeout(s.beaconApiTimeout),
)
headersTransport := api.NewCustomHeadersTransport(http.DefaultTransport, conn.GetBeaconApiHeaders())
restHandler := beaconApi.NewBeaconApiRestHandler(
http.Client{Timeout: s.beaconApiTimeout, Transport: otelhttp.NewTransport(headersTransport)},
s.beaconApiEndpoint,
)
s.chainClient = beaconChainClientFactory.NewChainClient(conn, restHandler)
s.nodeClient = nodeClientFactory.NewNodeClient(conn, restHandler)
s.beaconNodeValidatorClient = validatorClientFactory.NewValidatorClient(conn, restHandler)
return nil
}

View File

@@ -3,17 +3,19 @@ package rpc
import (
"testing"
grpcutil "github.com/OffchainLabs/prysm/v7/api/grpc"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
"google.golang.org/grpc/metadata"
)
func TestGrpcHeaders(t *testing.T) {
ctx := t.Context()
grpcHeaders := []string{"first=value1", "second=value2"}
ctx = grpcutil.AppendHeaders(ctx, grpcHeaders)
md, _ := metadata.FromOutgoingContext(ctx)
s := &Server{
ctx: t.Context(),
grpcHeaders: []string{"first=value1", "second=value2"},
}
err := s.registerBeaconClient()
require.NoError(t, err)
md, _ := metadata.FromOutgoingContext(s.ctx)
require.Equal(t, 2, md.Len(), "MetadataV0 contains wrong number of values")
assert.Equal(t, "value1", md.Get("first")[0])
assert.Equal(t, "value2", md.Get("second")[0])

View File

@@ -22,7 +22,6 @@ import (
"github.com/OffchainLabs/prysm/v7/validator/client"
"github.com/OffchainLabs/prysm/v7/validator/client/testutil"
"github.com/OffchainLabs/prysm/v7/validator/keymanager"
validatorTesting "github.com/OffchainLabs/prysm/v7/validator/testing"
"github.com/google/uuid"
"github.com/tyler-smith/go-bip39"
keystorev4 "github.com/wealdtech/go-eth2-wallet-encryptor-keystorev4"
@@ -47,7 +46,6 @@ func TestServer_CreateWallet_Local(t *testing.T) {
km, err := w.InitializeKeymanager(ctx, iface.InitKeymanagerConfig{ListenForChanges: false})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: validatorTesting.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -445,7 +443,6 @@ func TestServer_WalletConfig(t *testing.T) {
require.NoError(t, err)
s.wallet = w
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: validatorTesting.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,

View File

@@ -53,7 +53,6 @@ func TestServer_ListAccounts(t *testing.T) {
km, err := w.InitializeKeymanager(ctx, iface.InitKeymanagerConfig{ListenForChanges: false})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: constant.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -159,7 +158,6 @@ func TestServer_BackupAccounts(t *testing.T) {
km, err := w.InitializeKeymanager(ctx, iface.InitKeymanagerConfig{ListenForChanges: false})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: constant.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -284,7 +282,6 @@ func TestServer_VoluntaryExit(t *testing.T) {
require.NoError(t, err)
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: constant.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,

View File

@@ -52,7 +52,6 @@ func TestServer_ListKeystores(t *testing.T) {
t.Run("wallet not ready", func(t *testing.T) {
m := &testutil.FakeValidator{}
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Validator: m,
})
require.NoError(t, err)
@@ -82,7 +81,6 @@ func TestServer_ListKeystores(t *testing.T) {
km, err := w.InitializeKeymanager(ctx, iface.InitKeymanagerConfig{ListenForChanges: false})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -149,7 +147,6 @@ func TestServer_ImportKeystores(t *testing.T) {
km, err := w.InitializeKeymanager(ctx, iface.InitKeymanagerConfig{ListenForChanges: false})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -371,7 +368,6 @@ func TestServer_ImportKeystores_WrongKeymanagerKind(t *testing.T) {
}})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -656,7 +652,6 @@ func TestServer_DeleteKeystores_WrongKeymanagerKind(t *testing.T) {
}})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -700,7 +695,6 @@ func setupServerWithWallet(t testing.TB) *Server {
km, err := w.InitializeKeymanager(ctx, iface.InitKeymanagerConfig{ListenForChanges: false})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -736,7 +730,6 @@ func TestServer_SetVoluntaryExit(t *testing.T) {
m := &testutil.FakeValidator{Km: km}
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Validator: m,
})
require.NoError(t, err)
@@ -960,7 +953,6 @@ func TestServer_GetGasLimit(t *testing.T) {
err := m.SetProposerSettings(ctx, tt.args)
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Validator: m,
})
require.NoError(t, err)
@@ -1119,7 +1111,6 @@ func TestServer_SetGasLimit(t *testing.T) {
require.NoError(t, err)
validatorDB := dbtest.SetupDB(t, t.TempDir(), [][fieldparams.BLSPubkeyLength]byte{}, isSlashingProtectionMinimal)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Validator: m,
DB: validatorDB,
})
@@ -1309,7 +1300,6 @@ func TestServer_DeleteGasLimit(t *testing.T) {
require.NoError(t, err)
validatorDB := dbtest.SetupDB(t, t.TempDir(), [][fieldparams.BLSPubkeyLength]byte{}, isSlashingProtectionMinimal)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Validator: m,
DB: validatorDB,
})
@@ -1358,7 +1348,6 @@ func TestServer_ListRemoteKeys(t *testing.T) {
km, err := w.InitializeKeymanager(ctx, iface.InitKeymanagerConfig{ListenForChanges: false, Web3SignerConfig: config})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -1415,7 +1404,6 @@ func TestServer_ImportRemoteKeys(t *testing.T) {
km, err := w.InitializeKeymanager(ctx, iface.InitKeymanagerConfig{ListenForChanges: false, Web3SignerConfig: config})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -1478,7 +1466,6 @@ func TestServer_DeleteRemoteKeys(t *testing.T) {
km, err := w.InitializeKeymanager(ctx, iface.InitKeymanagerConfig{ListenForChanges: false, Web3SignerConfig: config})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -1580,7 +1567,6 @@ func TestServer_ListFeeRecipientByPubkey(t *testing.T) {
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Validator: m,
})
require.NoError(t, err)
@@ -1605,7 +1591,6 @@ func TestServer_ListFeeRecipientByPubKey_NoFeeRecipientSet(t *testing.T) {
ctx := t.Context()
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Validator: &testutil.FakeValidator{},
})
require.NoError(t, err)
@@ -1795,7 +1780,6 @@ func TestServer_FeeRecipientByPubkey(t *testing.T) {
// save a default here
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Validator: m,
DB: validatorDB,
})
@@ -1906,7 +1890,6 @@ func TestServer_DeleteFeeRecipientByPubkey(t *testing.T) {
require.NoError(t, err)
validatorDB := dbtest.SetupDB(t, t.TempDir(), [][fieldparams.BLSPubkeyLength]byte{}, isSlashingProtectionMinimal)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Validator: m,
DB: validatorDB,
})
@@ -1957,7 +1940,6 @@ func TestServer_Graffiti(t *testing.T) {
graffiti := "graffiti"
m := &testutil.FakeValidator{}
vs, err := client.NewValidatorService(t.Context(), &client.Config{
Conn: mocks.MockNodeConnection(),
Validator: m,
})
require.NoError(t, err)

View File

@@ -5,7 +5,6 @@ go_library(
testonly = True,
srcs = [
"constants.go",
"mock_node_connection.go",
"mock_protector.go",
"protection_history.go",
],
@@ -15,7 +14,6 @@ go_library(
"//validator:__subpackages__",
],
deps = [
"//api/grpc:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
@@ -24,7 +22,6 @@ go_library(
"//encoding/bytesutil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//validator/db/common:go_default_library",
"//validator/helpers:go_default_library",
"//validator/slashing-protection-history/format:go_default_library",
],
)

View File

@@ -1,16 +0,0 @@
package testing
import (
grpcutil "github.com/OffchainLabs/prysm/v7/api/grpc"
"github.com/OffchainLabs/prysm/v7/validator/helpers"
)
// MockNodeConnection creates a minimal NodeConnection for testing.
func MockNodeConnection() helpers.NodeConnection {
conn, _ := helpers.NewNodeConnection(
helpers.WithGRPCProvider(&grpcutil.MockGrpcProvider{
MockHosts: []string{"mock:4000"},
}),
)
return conn
}