Compare commits

..

7 Commits

Author SHA1 Message Date
satushh
18024098e0 Merge branch 'develop' into close-test-host 2026-02-02 20:41:12 +05:30
satushh
04d7e29611 Merge branch 'develop' into close-test-host 2026-02-02 20:27:51 +05:30
satushh
b743cd72e2 changelog 2026-02-02 20:27:14 +05:30
james-prysm
641d90990d grpc fallback improvements (#16215)
<!-- Thanks for sending a PR! Before submitting:

1. If this is your first PR, check out our contribution guide here
https://docs.prylabs.network/docs/contribute/contribution-guidelines
You will then need to sign our Contributor License Agreement (CLA),
which will show up as a comment from a bot in this pull request after
you open it. We cannot review code without a signed CLA.
2. Please file an associated tracking issue if this pull request is
non-trivial and requires context for our team to understand. All
features and most bug fixes should have
an associated issue with a design discussed and decided upon. Small bug
   fixes and documentation improvements don't need issues.
3. New features and bug fixes must have tests. Documentation may need to
be updated. If you're unsure what to update, send the PR, and we'll
discuss
   in review.
4. Note that PRs updating dependencies and new Go versions are not
accepted.
   Please file an issue instead.
5. A changelog entry is required for user facing issues.
-->

**What type of PR is this?**

## Summary

This PR implements gRPC fallback support for the validator client,
allowing it to automatically switch between multiple beacon node
endpoints when the primary node becomes unavailable or unhealthy.

## Changes

- Added `grpcConnectionProvider` to manage multiple gRPC connections
with circular failover
- Validator automatically detects unhealthy beacon nodes and switches to
the next available endpoint
- Health checks verify both node responsiveness AND sync status before
accepting a node
- Improved logging to only show "Found fully synced beacon node" when an
actual switch occurs (reduces log noise)


I removed the old middleware that uses gRPC's built in load balancer
because:

- gRPC's pick_first load balancer doesn't provide sync-status-aware
failover
- The validator needs to ensure it connects to a fully synced node, not
just a reachable one

## Test Scenario

### Setup
Deployed a 4-node Kurtosis testnet with local validator connecting to 2
beacon nodes:

```yaml
# kurtosis-grpc-fallback-test.yaml
participants:
  - el_type: nethermind
    cl_type: prysm
    validator_count: 128  # Keeps chain advancing
  - el_type: nethermind
    cl_type: prysm
    validator_count: 64
  - el_type: nethermind
    cl_type: prysm
    validator_count: 64   # Keeps chain advancing
  - el_type: nethermind
    cl_type: prysm
    validator_count: 64   # Keeps chain advancing

network_params:
  fulu_fork_epoch: 0
  seconds_per_slot: 6
```

Local validator started with:
```bash
./validator --beacon-rpc-provider=127.0.0.1:33005,127.0.0.1:33012 ...
```

### Test 1: Primary Failover (cl-1 → cl-2)

1. Stopped cl-1 beacon node
2. Validator detected failure and switched to cl-2

**Logs:**
```
WARN  Beacon node is not responding, switching host currentHost=127.0.0.1:33005 nextHost=127.0.0.1:33012
DEBUG Trying gRPC endpoint newHost=127.0.0.1:33012 previousHost=127.0.0.1:33005
INFO  Failover succeeded: connected to healthy beacon node failedAttempts=[127.0.0.1:33005] newHost=127.0.0.1:33012 previousHost=127.0.0.1:33005
```

**Result:**  PASSED - Validator continued submitting attestations on
cl-2

### Test 2: Circular Failover (cl-2 → cl-1)

1. Restarted cl-1, stopped cl-2
2. Validator detected failure and switched back to cl-1

**Logs:**
```
WARN  Beacon node is not responding, switching host currentHost=127.0.0.1:33012 nextHost=127.0.0.1:33005
DEBUG Trying gRPC endpoint newHost=127.0.0.1:33005 previousHost=127.0.0.1:33012
INFO  Failover succeeded: connected to healthy beacon node failedAttempts=[127.0.0.1:33012] newHost=127.0.0.1:33005 previousHost=127.0.0.1:33012
```

**Result:**  PASSED - Circular fallback works correctly

## Key Log Messages

| Log Level | Message | Source |
|-----------|---------|--------|
| WARN | "Beacon node is not responding, switching host" |
`changeHost()` in validator.go |
| INFO | "Switched gRPC endpoint" | `SetHost()` in
grpc_connection_provider.go |
| INFO | "Found fully synced beacon node" | `FindHealthyHost()` in
validator.go (only on actual switch) |

## Test Plan

- [x] Verify primary failover (cl-1 → cl-2)
- [x] Verify circular failover (cl-2 → cl-1)
- [x] Verify validator continues producing attestations after switch
- [x] Verify "Found fully synced beacon node" only logs on actual switch
(not every health check)

**What does this PR do? Why is it needed?**

**Which issues(s) does this PR fix?**

Fixes # https://github.com/OffchainLabs/prysm/pull/7133


**Other notes for review**

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [x] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).

---------

Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com>
Co-authored-by: Radosław Kapka <rkapka@wp.pl>
Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
2026-02-02 14:51:56 +00:00
satushh
df45e4da57 Close libp2p host 2026-02-02 20:21:52 +05:30
terence
d2fc250f34 Run go fmt (#16311)
Co-authored-by: Manu NALEPA <enalepa@offchainlabs.com>
2026-02-02 14:19:15 +00:00
Jun Song
571c6f39aa Add docs for SSZ Query package (#16299)
**What type of PR is this?**

Documentation

**What does this PR do? Why is it needed?**

Although godoc and comments are well-written in `encoding/ssz/query`
package, we (@rkapka, @fernantho, @syjn99)
[agreed](https://discord.com/channels/476244492043812875/1387734369527136297/1466075406523174944)
that it would be great to have human-readable documentation.

**Which issues(s) does this PR fix?**

Part of  #15587 & #15598 

**Other notes for review**

This documentation is first drafted by Claude Code, and then has a few
rounds of self-review.

**Acknowledgements**

- [x] I have read
[CONTRIBUTING.md](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md).
- [x] I have included a uniquely named [changelog fragment
file](https://github.com/prysmaticlabs/prysm/blob/develop/CONTRIBUTING.md#maintaining-changelogmd).
- [x] I have added a description with sufficient context for reviewers
to understand this PR.
- [ ] I have tested that my changes work as expected and I added a
testing plan to the PR description (if applicable).

---------

Co-authored-by: fernantho <fernantho1@gmail.com>
Co-authored-by: Radosław Kapka <radoslaw.kapka@gmail.com>
2026-02-01 03:39:53 +00:00
90 changed files with 1909 additions and 1743 deletions

View File

@@ -3,13 +3,16 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"grpc_connection_provider.go",
"grpcutils.go",
"log.go",
"mock_grpc_provider.go",
"parameters.go",
],
importpath = "github.com/OffchainLabs/prysm/v7/api/grpc",
visibility = ["//visibility:public"],
deps = [
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//metadata:go_default_library",
@@ -18,12 +21,17 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["grpcutils_test.go"],
srcs = [
"grpc_connection_provider_test.go",
"grpcutils_test.go",
],
embed = [":go_default_library"],
deps = [
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//credentials/insecure:go_default_library",
"@org_golang_google_grpc//metadata:go_default_library",
],
)

View File

@@ -0,0 +1,173 @@
package grpc
import (
"context"
"strings"
"sync"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
// GrpcConnectionProvider manages gRPC connections for failover support.
// It allows switching between different beacon node endpoints when the current one becomes unavailable.
// Only one connection is maintained at a time - when switching hosts, the old connection is closed.
type GrpcConnectionProvider interface {
// CurrentConn returns the currently active gRPC connection.
// The connection is created lazily on first call.
// Returns nil if the provider has been closed.
CurrentConn() *grpc.ClientConn
// CurrentHost returns the address of the currently active endpoint.
CurrentHost() string
// Hosts returns all configured endpoint addresses.
Hosts() []string
// SwitchHost switches to the endpoint at the given index.
// The new connection is created lazily on next CurrentConn() call.
SwitchHost(index int) error
// Close closes the current connection.
Close()
}
type grpcConnectionProvider struct {
// Immutable after construction - no lock needed for reads
endpoints []string
ctx context.Context
dialOpts []grpc.DialOption
// Current connection state (protected by mutex)
currentIndex uint64
conn *grpc.ClientConn
mu sync.Mutex
closed bool
}
// NewGrpcConnectionProvider creates a new connection provider that manages gRPC connections.
// The endpoint parameter can be a comma-separated list of addresses (e.g., "host1:4000,host2:4000").
// Only one connection is maintained at a time, created lazily on first use.
func NewGrpcConnectionProvider(
ctx context.Context,
endpoint string,
dialOpts []grpc.DialOption,
) (GrpcConnectionProvider, error) {
endpoints := parseEndpoints(endpoint)
if len(endpoints) == 0 {
return nil, errors.New("no gRPC endpoints provided")
}
log.WithFields(logrus.Fields{
"endpoints": endpoints,
"count": len(endpoints),
}).Info("Initialized gRPC connection provider")
return &grpcConnectionProvider{
endpoints: endpoints,
ctx: ctx,
dialOpts: dialOpts,
}, nil
}
// parseEndpoints splits a comma-separated endpoint string into individual endpoints.
func parseEndpoints(endpoint string) []string {
if endpoint == "" {
return nil
}
endpoints := make([]string, 0, 1)
for p := range strings.SplitSeq(endpoint, ",") {
if p = strings.TrimSpace(p); p != "" {
endpoints = append(endpoints, p)
}
}
return endpoints
}
func (p *grpcConnectionProvider) CurrentConn() *grpc.ClientConn {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return nil
}
// Return existing connection if available
if p.conn != nil {
return p.conn
}
// Create connection lazily
ep := p.endpoints[p.currentIndex]
conn, err := grpc.DialContext(p.ctx, ep, p.dialOpts...)
if err != nil {
log.WithError(err).WithField("endpoint", ep).Error("Failed to create gRPC connection")
return nil
}
p.conn = conn
log.WithField("endpoint", ep).Debug("Created gRPC connection")
return conn
}
func (p *grpcConnectionProvider) CurrentHost() string {
p.mu.Lock()
defer p.mu.Unlock()
return p.endpoints[p.currentIndex]
}
func (p *grpcConnectionProvider) Hosts() []string {
// Return a copy to maintain immutability
hosts := make([]string, len(p.endpoints))
copy(hosts, p.endpoints)
return hosts
}
func (p *grpcConnectionProvider) SwitchHost(index int) error {
if index < 0 || index >= len(p.endpoints) {
return errors.Errorf("invalid host index %d, must be between 0 and %d", index, len(p.endpoints)-1)
}
p.mu.Lock()
defer p.mu.Unlock()
if uint64(index) == p.currentIndex {
return nil // Already on this host
}
oldHost := p.endpoints[p.currentIndex]
oldConn := p.conn
p.conn = nil // Clear immediately - new connection created lazily
p.currentIndex = uint64(index)
// Close old connection asynchronously to avoid blocking the caller
if oldConn != nil {
go func() {
if err := oldConn.Close(); err != nil {
log.WithError(err).WithField("endpoint", oldHost).Debug("Failed to close previous connection")
}
}()
}
log.WithFields(logrus.Fields{
"previousHost": oldHost,
"newHost": p.endpoints[index],
}).Debug("Switched gRPC endpoint")
return nil
}
func (p *grpcConnectionProvider) Close() {
p.mu.Lock()
defer p.mu.Unlock()
if p.closed {
return
}
p.closed = true
if p.conn != nil {
if err := p.conn.Close(); err != nil {
log.WithError(err).WithField("endpoint", p.endpoints[p.currentIndex]).Debug("Failed to close gRPC connection")
}
p.conn = nil
}
}

View File

@@ -0,0 +1,207 @@
package grpc
import (
"context"
"net"
"reflect"
"strings"
"testing"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func TestParseEndpoints(t *testing.T) {
tests := []struct {
name string
input string
expected []string
}{
{"single endpoint", "localhost:4000", []string{"localhost:4000"}},
{"multiple endpoints", "host1:4000,host2:4000,host3:4000", []string{"host1:4000", "host2:4000", "host3:4000"}},
{"endpoints with spaces", "host1:4000, host2:4000 , host3:4000", []string{"host1:4000", "host2:4000", "host3:4000"}},
{"empty string", "", nil},
{"only commas", ",,,", []string{}},
{"trailing comma", "host1:4000,host2:4000,", []string{"host1:4000", "host2:4000"}},
{"leading comma", ",host1:4000,host2:4000", []string{"host1:4000", "host2:4000"}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := parseEndpoints(tt.input)
if !reflect.DeepEqual(tt.expected, got) {
t.Errorf("parseEndpoints(%q) = %v, want %v", tt.input, got, tt.expected)
}
})
}
}
func TestNewGrpcConnectionProvider_Errors(t *testing.T) {
t.Run("no endpoints", func(t *testing.T) {
dialOpts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
_, err := NewGrpcConnectionProvider(context.Background(), "", dialOpts)
require.ErrorContains(t, "no gRPC endpoints provided", err)
})
}
func TestGrpcConnectionProvider_LazyConnection(t *testing.T) {
// Start only one server but configure provider with two endpoints
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
server := grpc.NewServer()
go func() { _ = server.Serve(lis) }()
defer server.Stop()
validAddr := lis.Addr().String()
invalidAddr := "127.0.0.1:1" // Port 1 is unlikely to be listening
// Provider should succeed even though second endpoint is invalid (lazy connections)
endpoint := validAddr + "," + invalidAddr
ctx := context.Background()
dialOpts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
provider, err := NewGrpcConnectionProvider(ctx, endpoint, dialOpts)
require.NoError(t, err, "Provider creation should succeed with lazy connections")
defer func() { provider.Close() }()
// First endpoint should work
conn := provider.CurrentConn()
assert.NotNil(t, conn, "First connection should be created lazily")
}
func TestGrpcConnectionProvider_SingleConnectionModel(t *testing.T) {
// Create provider with 3 endpoints
var addrs []string
var servers []*grpc.Server
for range 3 {
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
server := grpc.NewServer()
go func() { _ = server.Serve(lis) }()
addrs = append(addrs, lis.Addr().String())
servers = append(servers, server)
}
defer func() {
for _, s := range servers {
s.Stop()
}
}()
endpoint := strings.Join(addrs, ",")
ctx := context.Background()
dialOpts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
provider, err := NewGrpcConnectionProvider(ctx, endpoint, dialOpts)
require.NoError(t, err)
defer func() { provider.Close() }()
// Access the internal state to verify single connection behavior
p := provider.(*grpcConnectionProvider)
// Initially no connection
p.mu.Lock()
assert.Equal(t, (*grpc.ClientConn)(nil), p.conn, "Connection should be nil before access")
p.mu.Unlock()
// Access connection - should create one
conn0 := provider.CurrentConn()
assert.NotNil(t, conn0)
p.mu.Lock()
assert.NotNil(t, p.conn, "Connection should be created after CurrentConn()")
firstConn := p.conn
p.mu.Unlock()
// Call CurrentConn again - should return same connection
conn0Again := provider.CurrentConn()
assert.Equal(t, conn0, conn0Again, "Should return same connection")
// Switch to different host - old connection should be closed, new one created lazily
require.NoError(t, provider.SwitchHost(1))
p.mu.Lock()
assert.Equal(t, (*grpc.ClientConn)(nil), p.conn, "Connection should be nil after SwitchHost (lazy)")
p.mu.Unlock()
// Get new connection
conn1 := provider.CurrentConn()
assert.NotNil(t, conn1)
assert.NotEqual(t, firstConn, conn1, "Should be a different connection after switching hosts")
}
// testProvider creates a provider with n test servers and returns cleanup function.
func testProvider(t *testing.T, n int) (GrpcConnectionProvider, []string, func()) {
var addrs []string
var cleanups []func()
for range n {
lis, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
server := grpc.NewServer()
go func() { _ = server.Serve(lis) }()
addrs = append(addrs, lis.Addr().String())
cleanups = append(cleanups, server.Stop)
}
endpoint := strings.Join(addrs, ",")
ctx := context.Background()
dialOpts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())}
provider, err := NewGrpcConnectionProvider(ctx, endpoint, dialOpts)
require.NoError(t, err)
cleanup := func() {
provider.Close()
for _, c := range cleanups {
c()
}
}
return provider, addrs, cleanup
}
func TestGrpcConnectionProvider(t *testing.T) {
provider, addrs, cleanup := testProvider(t, 3)
defer cleanup()
t.Run("initial state", func(t *testing.T) {
assert.Equal(t, 3, len(provider.Hosts()))
assert.Equal(t, addrs[0], provider.CurrentHost())
assert.NotNil(t, provider.CurrentConn())
})
t.Run("SwitchHost", func(t *testing.T) {
require.NoError(t, provider.SwitchHost(1))
assert.Equal(t, addrs[1], provider.CurrentHost())
assert.NotNil(t, provider.CurrentConn()) // New connection created lazily
require.NoError(t, provider.SwitchHost(0))
assert.Equal(t, addrs[0], provider.CurrentHost())
require.ErrorContains(t, "invalid host index", provider.SwitchHost(-1))
require.ErrorContains(t, "invalid host index", provider.SwitchHost(3))
})
t.Run("SwitchHost circular", func(t *testing.T) {
// Test round-robin style switching using SwitchHost with manual index
indices := []int{1, 2, 0, 1} // Simulate circular switching
for i, idx := range indices {
require.NoError(t, provider.SwitchHost(idx))
assert.Equal(t, addrs[idx], provider.CurrentHost(), "iteration %d", i)
}
})
t.Run("Hosts returns copy", func(t *testing.T) {
hosts := provider.Hosts()
original := hosts[0]
hosts[0] = "modified"
assert.Equal(t, original, provider.Hosts()[0])
})
}
func TestGrpcConnectionProvider_Close(t *testing.T) {
provider, _, cleanup := testProvider(t, 1)
defer cleanup()
assert.NotNil(t, provider.CurrentConn())
provider.Close()
assert.Equal(t, (*grpc.ClientConn)(nil), provider.CurrentConn())
provider.Close() // Double close is safe
}

View File

@@ -0,0 +1,20 @@
package grpc
import "google.golang.org/grpc"
// MockGrpcProvider implements GrpcConnectionProvider for testing.
type MockGrpcProvider struct {
MockConn *grpc.ClientConn
MockHosts []string
}
func (m *MockGrpcProvider) CurrentConn() *grpc.ClientConn { return m.MockConn }
func (m *MockGrpcProvider) CurrentHost() string {
if len(m.MockHosts) > 0 {
return m.MockHosts[0]
}
return ""
}
func (m *MockGrpcProvider) Hosts() []string { return m.MockHosts }
func (m *MockGrpcProvider) SwitchHost(int) error { return nil }
func (m *MockGrpcProvider) Close() {}

34
api/rest/BUILD.bazel Normal file
View File

@@ -0,0 +1,34 @@
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"log.go",
"mock_rest_provider.go",
"rest_connection_provider.go",
"rest_handler.go",
],
importpath = "github.com/OffchainLabs/prysm/v7/api/rest",
visibility = ["//visibility:public"],
deps = [
"//api:go_default_library",
"//api/apiutil:go_default_library",
"//api/client:go_default_library",
"//config/params:go_default_library",
"//network/httputil:go_default_library",
"//runtime/version:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opentelemetry_go_contrib_instrumentation_net_http_otelhttp//:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["rest_connection_provider_test.go"],
embed = [":go_default_library"],
deps = [
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
],
)

9
api/rest/log.go Normal file
View File

@@ -0,0 +1,9 @@
// Code generated by hack/gen-logs.sh; DO NOT EDIT.
// This file is created and regenerated automatically. Anything added here might get removed.
package rest
import "github.com/sirupsen/logrus"
// The prefix for logs from this package will be the text after the last slash in the package path.
// If you wish to change this, you should add your desired name in the runtime/logging/logrus-prefixed-formatter/prefix-replacement.go file.
var log = logrus.WithField("package", "api/rest")

View File

@@ -0,0 +1,49 @@
package rest
import (
"bytes"
"context"
"net/http"
)
// MockRestProvider implements RestConnectionProvider for testing.
type MockRestProvider struct {
MockClient *http.Client
MockHandler RestHandler
MockHosts []string
HostIndex int
}
func (m *MockRestProvider) HttpClient() *http.Client { return m.MockClient }
func (m *MockRestProvider) RestHandler() RestHandler { return m.MockHandler }
func (m *MockRestProvider) CurrentHost() string {
if len(m.MockHosts) > 0 {
return m.MockHosts[m.HostIndex%len(m.MockHosts)]
}
return ""
}
func (m *MockRestProvider) Hosts() []string { return m.MockHosts }
func (m *MockRestProvider) SwitchHost(index int) error { m.HostIndex = index; return nil }
// MockRestHandler implements RestHandler for testing.
type MockRestHandler struct {
MockHost string
MockClient *http.Client
}
func (m *MockRestHandler) Get(_ context.Context, _ string, _ any) error { return nil }
func (m *MockRestHandler) GetStatusCode(_ context.Context, _ string) (int, error) {
return http.StatusOK, nil
}
func (m *MockRestHandler) GetSSZ(_ context.Context, _ string) ([]byte, http.Header, error) {
return nil, nil, nil
}
func (m *MockRestHandler) Post(_ context.Context, _ string, _ map[string]string, _ *bytes.Buffer, _ any) error {
return nil
}
func (m *MockRestHandler) PostSSZ(_ context.Context, _ string, _ map[string]string, _ *bytes.Buffer) ([]byte, http.Header, error) {
return nil, nil, nil
}
func (m *MockRestHandler) HttpClient() *http.Client { return m.MockClient }
func (m *MockRestHandler) Host() string { return m.MockHost }
func (m *MockRestHandler) SwitchHost(host string) { m.MockHost = host }

View File

@@ -0,0 +1,158 @@
package rest
import (
"net/http"
"strings"
"sync/atomic"
"time"
"github.com/OffchainLabs/prysm/v7/api/client"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
)
// RestConnectionProvider manages HTTP client configuration for REST API with failover support.
// It allows switching between different beacon node REST endpoints when the current one becomes unavailable.
type RestConnectionProvider interface {
// HttpClient returns the configured HTTP client with headers, timeout, and optional tracing.
HttpClient() *http.Client
// RestHandler returns the REST handler for making API requests.
RestHandler() RestHandler
// CurrentHost returns the current REST API endpoint URL.
CurrentHost() string
// Hosts returns all configured REST API endpoint URLs.
Hosts() []string
// SwitchHost switches to the endpoint at the given index.
SwitchHost(index int) error
}
// RestConnectionProviderOption is a functional option for configuring the REST connection provider.
type RestConnectionProviderOption func(*restConnectionProvider)
// WithHttpTimeout sets the HTTP client timeout.
func WithHttpTimeout(timeout time.Duration) RestConnectionProviderOption {
return func(p *restConnectionProvider) {
p.timeout = timeout
}
}
// WithHttpHeaders sets custom HTTP headers to include in all requests.
func WithHttpHeaders(headers map[string][]string) RestConnectionProviderOption {
return func(p *restConnectionProvider) {
p.headers = headers
}
}
// WithTracing enables OpenTelemetry tracing for HTTP requests.
func WithTracing() RestConnectionProviderOption {
return func(p *restConnectionProvider) {
p.enableTracing = true
}
}
type restConnectionProvider struct {
endpoints []string
httpClient *http.Client
restHandler RestHandler
currentIndex atomic.Uint64
timeout time.Duration
headers map[string][]string
enableTracing bool
}
// NewRestConnectionProvider creates a new REST connection provider that manages HTTP client configuration.
// The endpoint parameter can be a comma-separated list of URLs (e.g., "http://host1:3500,http://host2:3500").
func NewRestConnectionProvider(endpoint string, opts ...RestConnectionProviderOption) (RestConnectionProvider, error) {
endpoints := parseEndpoints(endpoint)
if len(endpoints) == 0 {
return nil, errors.New("no REST API endpoints provided")
}
p := &restConnectionProvider{
endpoints: endpoints,
}
for _, opt := range opts {
opt(p)
}
// Build the HTTP transport chain
var transport http.RoundTripper = http.DefaultTransport
// Add custom headers if configured
if len(p.headers) > 0 {
transport = client.NewCustomHeadersTransport(transport, p.headers)
}
// Add tracing if enabled
if p.enableTracing {
transport = otelhttp.NewTransport(transport)
}
p.httpClient = &http.Client{
Timeout: p.timeout,
Transport: transport,
}
// Create the REST handler with the HTTP client and initial host
p.restHandler = newRestHandler(*p.httpClient, endpoints[0])
log.WithFields(logrus.Fields{
"endpoints": endpoints,
"count": len(endpoints),
}).Info("Initialized REST connection provider")
return p, nil
}
// parseEndpoints splits a comma-separated endpoint string into individual endpoints.
func parseEndpoints(endpoint string) []string {
if endpoint == "" {
return nil
}
endpoints := make([]string, 0, 1)
for p := range strings.SplitSeq(endpoint, ",") {
if p = strings.TrimSpace(p); p != "" {
endpoints = append(endpoints, p)
}
}
return endpoints
}
func (p *restConnectionProvider) HttpClient() *http.Client {
return p.httpClient
}
func (p *restConnectionProvider) RestHandler() RestHandler {
return p.restHandler
}
func (p *restConnectionProvider) CurrentHost() string {
return p.endpoints[p.currentIndex.Load()]
}
func (p *restConnectionProvider) Hosts() []string {
// Return a copy to maintain immutability
hosts := make([]string, len(p.endpoints))
copy(hosts, p.endpoints)
return hosts
}
func (p *restConnectionProvider) SwitchHost(index int) error {
if index < 0 || index >= len(p.endpoints) {
return errors.Errorf("invalid host index %d, must be between 0 and %d", index, len(p.endpoints)-1)
}
oldIdx := p.currentIndex.Load()
p.currentIndex.Store(uint64(index))
// Update the rest handler's host
p.restHandler.SwitchHost(p.endpoints[index])
log.WithFields(logrus.Fields{
"previousHost": p.endpoints[oldIdx],
"newHost": p.endpoints[index],
}).Debug("Switched REST endpoint")
return nil
}

View File

@@ -0,0 +1,80 @@
package rest
import (
"reflect"
"testing"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
)
func TestParseEndpoints(t *testing.T) {
tests := []struct {
name string
input string
expected []string
}{
{"single endpoint", "http://localhost:3500", []string{"http://localhost:3500"}},
{"multiple endpoints", "http://host1:3500,http://host2:3500,http://host3:3500", []string{"http://host1:3500", "http://host2:3500", "http://host3:3500"}},
{"endpoints with spaces", "http://host1:3500, http://host2:3500 , http://host3:3500", []string{"http://host1:3500", "http://host2:3500", "http://host3:3500"}},
{"empty string", "", nil},
{"only commas", ",,,", []string{}},
{"trailing comma", "http://host1:3500,http://host2:3500,", []string{"http://host1:3500", "http://host2:3500"}},
{"leading comma", ",http://host1:3500,http://host2:3500", []string{"http://host1:3500", "http://host2:3500"}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := parseEndpoints(tt.input)
if !reflect.DeepEqual(tt.expected, got) {
t.Errorf("parseEndpoints(%q) = %v, want %v", tt.input, got, tt.expected)
}
})
}
}
func TestNewRestConnectionProvider_Errors(t *testing.T) {
t.Run("no endpoints", func(t *testing.T) {
_, err := NewRestConnectionProvider("")
require.ErrorContains(t, "no REST API endpoints provided", err)
})
}
func TestRestConnectionProvider(t *testing.T) {
provider, err := NewRestConnectionProvider("http://host1:3500,http://host2:3500,http://host3:3500")
require.NoError(t, err)
t.Run("initial state", func(t *testing.T) {
assert.Equal(t, 3, len(provider.Hosts()))
assert.Equal(t, "http://host1:3500", provider.CurrentHost())
assert.NotNil(t, provider.HttpClient())
})
t.Run("SwitchHost", func(t *testing.T) {
require.NoError(t, provider.SwitchHost(1))
assert.Equal(t, "http://host2:3500", provider.CurrentHost())
require.NoError(t, provider.SwitchHost(0))
assert.Equal(t, "http://host1:3500", provider.CurrentHost())
require.ErrorContains(t, "invalid host index", provider.SwitchHost(-1))
require.ErrorContains(t, "invalid host index", provider.SwitchHost(3))
})
t.Run("Hosts returns copy", func(t *testing.T) {
hosts := provider.Hosts()
original := hosts[0]
hosts[0] = "modified"
assert.Equal(t, original, provider.Hosts()[0])
})
}
func TestRestConnectionProvider_WithOptions(t *testing.T) {
headers := map[string][]string{"Authorization": {"Bearer token"}}
provider, err := NewRestConnectionProvider(
"http://localhost:3500",
WithHttpHeaders(headers),
WithHttpTimeout(30000000000), // 30 seconds in nanoseconds
WithTracing(),
)
require.NoError(t, err)
assert.NotNil(t, provider.HttpClient())
assert.Equal(t, "http://localhost:3500", provider.CurrentHost())
}

View File

@@ -1,4 +1,4 @@
package beacon_api
package rest
import (
"bytes"
@@ -21,6 +21,7 @@ import (
type reqOption func(*http.Request)
// RestHandler defines the interface for making REST API requests.
type RestHandler interface {
Get(ctx context.Context, endpoint string, resp any) error
GetStatusCode(ctx context.Context, endpoint string) (int, error)
@@ -29,29 +30,34 @@ type RestHandler interface {
PostSSZ(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer) ([]byte, http.Header, error)
HttpClient() *http.Client
Host() string
SetHost(host string)
SwitchHost(host string)
}
type BeaconApiRestHandler struct {
type restHandler struct {
client http.Client
host string
reqOverrides []reqOption
}
// NewBeaconApiRestHandler returns a RestHandler
func NewBeaconApiRestHandler(client http.Client, host string) RestHandler {
brh := &BeaconApiRestHandler{
// newRestHandler returns a RestHandler (internal use)
func newRestHandler(client http.Client, host string) RestHandler {
return NewRestHandler(client, host)
}
// NewRestHandler returns a RestHandler
func NewRestHandler(client http.Client, host string) RestHandler {
rh := &restHandler{
client: client,
host: host,
}
brh.appendAcceptOverride()
return brh
rh.appendAcceptOverride()
return rh
}
// appendAcceptOverride enables the Accept header to be customized at runtime via an environment variable.
// This is specified as an env var because it is a niche option that prysm may use for performance testing or debugging
// bug which users are unlikely to need. Using an env var keeps the set of user-facing flags cleaner.
func (c *BeaconApiRestHandler) appendAcceptOverride() {
func (c *restHandler) appendAcceptOverride() {
if accept := os.Getenv(params.EnvNameOverrideAccept); accept != "" {
c.reqOverrides = append(c.reqOverrides, func(req *http.Request) {
req.Header.Set("Accept", accept)
@@ -60,18 +66,18 @@ func (c *BeaconApiRestHandler) appendAcceptOverride() {
}
// HttpClient returns the underlying HTTP client of the handler
func (c *BeaconApiRestHandler) HttpClient() *http.Client {
func (c *restHandler) HttpClient() *http.Client {
return &c.client
}
// Host returns the underlying HTTP host
func (c *BeaconApiRestHandler) Host() string {
func (c *restHandler) Host() string {
return c.host
}
// Get sends a GET request and decodes the response body as a JSON object into the passed in object.
// If an HTTP error is returned, the body is decoded as a DefaultJsonError JSON object and returned as the first return value.
func (c *BeaconApiRestHandler) Get(ctx context.Context, endpoint string, resp any) error {
func (c *restHandler) Get(ctx context.Context, endpoint string, resp any) error {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
@@ -94,7 +100,7 @@ func (c *BeaconApiRestHandler) Get(ctx context.Context, endpoint string, resp an
// GetStatusCode sends a GET request and returns only the HTTP status code.
// This is useful for endpoints like /eth/v1/node/health that communicate status via HTTP codes
// (200 = ready, 206 = syncing, 503 = unavailable) rather than response bodies.
func (c *BeaconApiRestHandler) GetStatusCode(ctx context.Context, endpoint string) (int, error) {
func (c *restHandler) GetStatusCode(ctx context.Context, endpoint string) (int, error) {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
@@ -113,7 +119,7 @@ func (c *BeaconApiRestHandler) GetStatusCode(ctx context.Context, endpoint strin
return httpResp.StatusCode, nil
}
func (c *BeaconApiRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error) {
func (c *restHandler) GetSSZ(ctx context.Context, endpoint string) ([]byte, http.Header, error) {
url := c.host + endpoint
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
@@ -168,7 +174,7 @@ func (c *BeaconApiRestHandler) GetSSZ(ctx context.Context, endpoint string) ([]b
// Post sends a POST request and decodes the response body as a JSON object into the passed in object.
// If an HTTP error is returned, the body is decoded as a DefaultJsonError JSON object and returned as the first return value.
func (c *BeaconApiRestHandler) Post(
func (c *restHandler) Post(
ctx context.Context,
apiEndpoint string,
headers map[string]string,
@@ -204,7 +210,7 @@ func (c *BeaconApiRestHandler) Post(
}
// PostSSZ sends a POST request and prefers an SSZ (application/octet-stream) response body.
func (c *BeaconApiRestHandler) PostSSZ(
func (c *restHandler) PostSSZ(
ctx context.Context,
apiEndpoint string,
headers map[string]string,
@@ -305,6 +311,6 @@ func decodeResp(httpResp *http.Response, resp any) error {
return nil
}
func (c *BeaconApiRestHandler) SetHost(host string) {
func (c *restHandler) SwitchHost(host string) {
c.host = host
}

View File

@@ -7,7 +7,6 @@ go_library(
"payload_attestation.go",
"pending_payment.go",
"proposer_slashing.go",
"withdrawal.go",
],
importpath = "github.com/OffchainLabs/prysm/v7/beacon-chain/core/gloas",
visibility = ["//visibility:public"],

View File

@@ -1,107 +0,0 @@
package gloas
import (
"fmt"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/pkg/errors"
)
// ProcessWithdrawals applies withdrawals to the state for Gloas.
//
// Spec v1.7.0-alpha.1 (pseudocode):
//
// def process_withdrawals(
//
// state: BeaconState,
// # [Modified in Gloas:EIP7732]
// # Removed `payload`
//
// ) -> None:
//
// # [New in Gloas:EIP7732]
// # Return early if the parent block is empty
// if not is_parent_block_full(state):
// return
//
// # Get expected withdrawals
// expected = get_expected_withdrawals(state)
//
// # Apply expected withdrawals
// apply_withdrawals(state, expected.withdrawals)
//
// # Update withdrawals fields in the state
// update_next_withdrawal_index(state, expected.withdrawals)
// # [New in Gloas:EIP7732]
// update_payload_expected_withdrawals(state, expected.withdrawals)
// # [New in Gloas:EIP7732]
// update_builder_pending_withdrawals(state, expected.processed_builder_withdrawals_count)
// update_pending_partial_withdrawals(state, expected.processed_partial_withdrawals_count)
// # [New in Gloas:EIP7732]
// update_next_withdrawal_builder_index(state, expected.processed_builders_sweep_count)
// update_next_withdrawal_validator_index(state, expected.withdrawals)
func ProcessWithdrawals(st state.BeaconState) error {
full, err := st.IsParentBlockFull()
if err != nil {
return errors.Wrap(err, "could not get parent block full status")
}
if !full {
return nil
}
expected, err := st.ExpectedWithdrawalsGloas()
if err != nil {
return errors.Wrap(err, "could not get expected withdrawals")
}
if err := st.DecreaseWithdrawalBalances(expected.Withdrawals); err != nil {
return errors.Wrap(err, "could not decrease withdrawal balances")
}
if len(expected.Withdrawals) > 0 {
if err := st.SetNextWithdrawalIndex(expected.Withdrawals[len(expected.Withdrawals)-1].Index + 1); err != nil {
return errors.Wrap(err, "could not set next withdrawal index")
}
}
err = st.SetPayloadExpectedWithdrawals(expected.Withdrawals)
if err != nil {
return errors.Wrap(err, "could not set payload expected withdrawals")
}
err = st.DequeueBuilderPendingWithdrawals(expected.ProcessedBuilderWithdrawalsCount)
if err != nil {
return fmt.Errorf("unable to dequeue builder pending withdrawals from state: %w", err)
}
if err := st.DequeuePendingPartialWithdrawals(expected.ProcessedPartialWithdrawalsCount); err != nil {
return fmt.Errorf("unable to dequeue partial withdrawals from state: %w", err)
}
err = st.SetNextWithdrawalBuilderIndex(expected.NextWithdrawalBuilderIndex)
if err != nil {
return errors.Wrap(err, "could not set next withdrawal builder index")
}
var nextValidatorIndex primitives.ValidatorIndex
if uint64(len(expected.Withdrawals)) < params.BeaconConfig().MaxWithdrawalsPerPayload {
nextValidatorIndex, err = st.NextWithdrawalValidatorIndex()
if err != nil {
return errors.Wrap(err, "could not get next withdrawal validator index")
}
nextValidatorIndex += primitives.ValidatorIndex(params.BeaconConfig().MaxValidatorsPerWithdrawalsSweep)
nextValidatorIndex = nextValidatorIndex % primitives.ValidatorIndex(st.NumValidators())
} else {
nextValidatorIndex = expected.Withdrawals[len(expected.Withdrawals)-1].ValidatorIndex + 1
if nextValidatorIndex == primitives.ValidatorIndex(st.NumValidators()) {
nextValidatorIndex = 0
}
}
if err := st.SetNextWithdrawalValidatorIndex(nextValidatorIndex); err != nil {
return errors.Wrap(err, "could not set next withdrawal validator index")
}
return nil
}

View File

@@ -67,7 +67,6 @@ func getSubscriptionStatusFromDB(t *testing.T, db *Store) bool {
return subscribed
}
func TestUpdateCustodyInfo(t *testing.T) {
ctx := t.Context()

View File

@@ -138,6 +138,9 @@ func connect(a, b host.Host) error {
func (p *TestP2P) ReceiveRPC(topic string, msg proto.Message) {
h, err := libp2p.New(libp2p.ResourceManager(&network.NullResourceManager{}))
require.NoError(p.t, err)
defer func() {
require.NoError(p.t, h.Close())
}()
if err := connect(h, p.BHost); err != nil {
p.t.Fatalf("Failed to connect two peers for RPC: %v", err)
}
@@ -169,6 +172,9 @@ func (p *TestP2P) ReceiveRPC(topic string, msg proto.Message) {
func (p *TestP2P) ReceivePubSub(topic string, msg proto.Message) {
h, err := libp2p.New(libp2p.ResourceManager(&network.NullResourceManager{}))
require.NoError(p.t, err)
defer func() {
require.NoError(p.t, h.Close())
}()
ps, err := pubsub.NewFloodSub(context.Background(), h,
pubsub.WithMessageSigning(false),
pubsub.WithStrictSignatureVerification(false),

View File

@@ -26,8 +26,8 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/voluntaryexits/mock"
p2pMock "github.com/OffchainLabs/prysm/v7/beacon-chain/p2p/testing"
"github.com/OffchainLabs/prysm/v7/beacon-chain/rpc/core"
mockSync "github.com/OffchainLabs/prysm/v7/beacon-chain/sync/initial-sync/testing"
state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
mockSync "github.com/OffchainLabs/prysm/v7/beacon-chain/sync/initial-sync/testing"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/crypto/bls"

View File

@@ -84,8 +84,6 @@ func TestGetSpec(t *testing.T) {
config.FuluForkVersion = []byte("FuluForkVersion")
config.FuluForkEpoch = 109
config.GloasForkEpoch = 110
config.BuilderIndexFlag = 111
config.MaxBuildersPerWithdrawalsSweep = 112
config.BLSWithdrawalPrefixByte = byte('b')
config.ETH1AddressWithdrawalPrefixByte = byte('c')
config.GenesisDelay = 24
@@ -222,7 +220,7 @@ func TestGetSpec(t *testing.T) {
require.NoError(t, json.Unmarshal(writer.Body.Bytes(), &resp))
data, ok := resp.Data.(map[string]any)
require.Equal(t, true, ok)
assert.Equal(t, 194, len(data))
assert.Equal(t, 192, len(data))
for k, v := range data {
t.Run(k, func(t *testing.T) {
switch k {
@@ -304,10 +302,6 @@ func TestGetSpec(t *testing.T) {
assert.Equal(t, "109", v)
case "GLOAS_FORK_EPOCH":
assert.Equal(t, "110", v)
case "BUILDER_INDEX_FLAG":
assert.Equal(t, "111", v)
case "MAX_BUILDERS_PER_WITHDRAWALS_SWEEP":
assert.Equal(t, "112", v)
case "MIN_ANCHOR_POW_BLOCK_DIFFICULTY":
assert.Equal(t, "1000", v)
case "BLS_WITHDRAWAL_PREFIX":

View File

@@ -3,7 +3,6 @@ package state
import (
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
enginev1 "github.com/OffchainLabs/prysm/v7/proto/engine/v1"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
)
@@ -14,10 +13,6 @@ type writeOnlyGloasFields interface {
RotateBuilderPendingPayments() error
AppendBuilderPendingWithdrawals([]*ethpb.BuilderPendingWithdrawal) error
UpdateExecutionPayloadAvailabilityAtIndex(idx uint64, val byte) error
SetPayloadExpectedWithdrawals(withdrawals []*enginev1.Withdrawal) error
DecreaseWithdrawalBalances(withdrawals []*enginev1.Withdrawal) error
DequeueBuilderPendingWithdrawals(num uint64) error
SetNextWithdrawalBuilderIndex(idx primitives.BuilderIndex) error
}
type readOnlyGloasFields interface {
@@ -26,15 +21,4 @@ type readOnlyGloasFields interface {
CanBuilderCoverBid(primitives.BuilderIndex, primitives.Gwei) (bool, error)
LatestBlockHash() ([32]byte, error)
BuilderPendingPayments() ([]*ethpb.BuilderPendingPayment, error)
IsParentBlockFull() (bool, error)
ExpectedWithdrawalsGloas() (ExpectedWithdrawalsGloasResult, error)
}
// ExpectedWithdrawalsGloasResult bundles the expected withdrawals and related counters
// for the Gloas fork to avoid positional return mistakes.
type ExpectedWithdrawalsGloasResult struct {
Withdrawals []*enginev1.Withdrawal
ProcessedBuilderWithdrawalsCount uint64
ProcessedPartialWithdrawalsCount uint64
NextWithdrawalBuilderIndex primitives.BuilderIndex
}

View File

@@ -1,17 +1,13 @@
package state_native
import (
"bytes"
"fmt"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
enginev1 "github.com/OffchainLabs/prysm/v7/proto/engine/v1"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/OffchainLabs/prysm/v7/time/slots"
)
// LatestBlockHash returns the hash of the latest execution block.
@@ -151,223 +147,3 @@ func (b *BeaconState) BuilderPendingPayments() ([]*ethpb.BuilderPendingPayment,
return b.builderPendingPaymentsVal(), nil
}
// IsParentBlockFull returns true if the last committed payload bid was fulfilled with a payload,
// which can only happen when both beacon block and payload were present.
// This function must be called on a beacon state before processing the execution payload bid in the block.
// Spec v1.7.0-alpha.2 (pseudocode):
// def is_parent_block_full(state: BeaconState) -> bool:
//
// return state.latest_execution_payload_bid.block_hash == state.latest_block_hash
func (b *BeaconState) IsParentBlockFull() (bool, error) {
if b.version < version.Gloas {
return false, errNotSupported("IsParentBlockFull", b.version)
}
b.lock.RLock()
defer b.lock.RUnlock()
if b.latestExecutionPayloadBid == nil {
return false, nil
}
return bytes.Equal(b.latestExecutionPayloadBid.BlockHash, b.latestBlockHash), nil
}
// ExpectedWithdrawalsGloas returns the withdrawals that a proposer will need to pack in the next block
// applied to the current state. It is also used by validators to check that the execution payload carried
// the right number of withdrawals.
//
// Spec v1.7.0-alpha.1:
//
// def get_expected_withdrawals(state: BeaconState) -> ExpectedWithdrawals:
// withdrawal_index = state.next_withdrawal_index
// withdrawals: List[Withdrawal] = []
//
// # [New in Gloas:EIP7732]
// # Get builder withdrawals
// builder_withdrawals, withdrawal_index, processed_builder_withdrawals_count = (
// get_builder_withdrawals(state, withdrawal_index, withdrawals)
// )
// withdrawals.extend(builder_withdrawals)
//
// # Get partial withdrawals
// partial_withdrawals, withdrawal_index, processed_partial_withdrawals_count = (
// get_pending_partial_withdrawals(state, withdrawal_index, withdrawals)
// )
// withdrawals.extend(partial_withdrawals)
//
// # [New in Gloas:EIP7732]
// # Get builders sweep withdrawals
// builders_sweep_withdrawals, withdrawal_index, processed_builders_sweep_count = (
// get_builders_sweep_withdrawals(state, withdrawal_index, withdrawals)
// )
// withdrawals.extend(builders_sweep_withdrawals)
//
// # Get validators sweep withdrawals
// validators_sweep_withdrawals, withdrawal_index, processed_validators_sweep_count = (
// get_validators_sweep_withdrawals(state, withdrawal_index, withdrawals)
// )
// withdrawals.extend(validators_sweep_withdrawals)
//
// return ExpectedWithdrawals(
// withdrawals,
// # [New in Gloas:EIP7732]
// processed_builder_withdrawals_count,
// processed_partial_withdrawals_count,
// # [New in Gloas:EIP7732]
// processed_builders_sweep_count,
// processed_validators_sweep_count,
// )
func (b *BeaconState) ExpectedWithdrawalsGloas() (state.ExpectedWithdrawalsGloasResult, error) {
if b.version < version.Gloas {
return state.ExpectedWithdrawalsGloasResult{}, errNotSupported("ExpectedWithdrawalsGloas", b.version)
}
b.lock.RLock()
defer b.lock.RUnlock()
cfg := params.BeaconConfig()
withdrawals := make([]*enginev1.Withdrawal, 0, cfg.MaxWithdrawalsPerPayload)
withdrawalIndex := b.nextWithdrawalIndex
withdrawalIndex, processedBuilderWithdrawalsCount, err := b.appendBuilderWithdrawals(withdrawalIndex, &withdrawals)
if err != nil {
return state.ExpectedWithdrawalsGloasResult{}, err
}
withdrawalIndex, processedPartialWithdrawalsCount, err := b.appendPendingPartialWithdrawals(withdrawalIndex, &withdrawals)
if err != nil {
return state.ExpectedWithdrawalsGloasResult{}, err
}
withdrawalIndex, nextBuilderIndex, err := b.appendBuildersSweepWithdrawals(withdrawalIndex, &withdrawals)
if err != nil {
return state.ExpectedWithdrawalsGloasResult{}, err
}
err = b.appendValidatorsSweepWithdrawals(withdrawalIndex, &withdrawals)
if err != nil {
return state.ExpectedWithdrawalsGloasResult{}, err
}
return state.ExpectedWithdrawalsGloasResult{
Withdrawals: withdrawals,
ProcessedBuilderWithdrawalsCount: processedBuilderWithdrawalsCount,
ProcessedPartialWithdrawalsCount: processedPartialWithdrawalsCount,
NextWithdrawalBuilderIndex: nextBuilderIndex,
}, nil
}
// appendBuilderWithdrawals returns builder pending withdrawals, the updated withdrawal index,
// and the processed count, following spec v1.7.0-alpha.2:
//
// def get_builder_withdrawals(state, withdrawal_index, prior_withdrawals):
// withdrawals_limit = MAX_WITHDRAWALS_PER_PAYLOAD - 1
// assert len(prior_withdrawals) <= withdrawals_limit
// processed_count = 0
// withdrawals = []
// for withdrawal in state.builder_pending_withdrawals:
// if len(prior_withdrawals + withdrawals) >= withdrawals_limit:
// break
// withdrawals.append(Withdrawal(
// index=withdrawal_index,
// validator_index=convert_builder_index_to_validator_index(withdrawal.builder_index),
// address=withdrawal.fee_recipient,
// amount=withdrawal.amount,
// ))
// withdrawal_index += 1
// processed_count += 1
// return withdrawals, withdrawal_index, processed_count
func (b *BeaconState) appendBuilderWithdrawals(withdrawalIndex uint64, withdrawals *[]*enginev1.Withdrawal) (uint64, uint64, error) {
cfg := params.BeaconConfig()
withdrawalsLimit := int(cfg.MaxWithdrawalsPerPayload - 1)
ws := *withdrawals
if len(ws) > withdrawalsLimit {
return withdrawalIndex, 0, fmt.Errorf("prior withdrawals length %d exceeds limit %d", len(ws), withdrawalsLimit)
}
var processedCount uint64
for _, w := range b.builderPendingWithdrawals {
if len(ws) >= withdrawalsLimit {
break
}
ws = append(ws, &enginev1.Withdrawal{
Index: withdrawalIndex,
ValidatorIndex: w.BuilderIndex.ToValidatorIndex(),
Address: w.FeeRecipient,
Amount: uint64(w.Amount),
})
withdrawalIndex++
processedCount++
}
*withdrawals = ws
return withdrawalIndex, processedCount, nil
}
// appendBuildersSweepWithdrawals returns builder sweep withdrawals, the updated withdrawal index,
// and the processed count, following spec v1.7.0-alpha.2:
//
// def get_builders_sweep_withdrawals(state, withdrawal_index, prior_withdrawals):
// epoch = get_current_epoch(state)
// builders_limit = min(len(state.builders), MAX_BUILDERS_PER_WITHDRAWALS_SWEEP)
// withdrawals_limit = MAX_WITHDRAWALS_PER_PAYLOAD - 1
// assert len(prior_withdrawals) <= withdrawals_limit
// processed_count = 0
// withdrawals = []
// builder_index = state.next_withdrawal_builder_index
// for _ in range(builders_limit):
// if len(prior_withdrawals + withdrawals) >= withdrawals_limit:
// break
// builder = state.builders[builder_index]
// if builder.withdrawable_epoch <= epoch and builder.balance > 0:
// withdrawals.append(Withdrawal(
// index=withdrawal_index,
// validator_index=convert_builder_index_to_validator_index(builder_index),
// address=builder.execution_address,
// amount=builder.balance,
// ))
// withdrawal_index += 1
// builder_index = BuilderIndex((builder_index + 1) % len(state.builders))
// processed_count += 1
// return withdrawals, withdrawal_index, processed_count
func (b *BeaconState) appendBuildersSweepWithdrawals(withdrawalIndex uint64, withdrawals *[]*enginev1.Withdrawal) (uint64, primitives.BuilderIndex, error) {
cfg := params.BeaconConfig()
withdrawalsLimit := int(cfg.MaxWithdrawalsPerPayload - 1)
if len(*withdrawals) > withdrawalsLimit {
return withdrawalIndex, 0, fmt.Errorf("prior withdrawals length %d exceeds limit %d", len(*withdrawals), withdrawalsLimit)
}
ws := *withdrawals
buildersLimit := len(b.builders)
if maxBuilders := int(cfg.MaxBuildersPerWithdrawalsSweep); buildersLimit > maxBuilders {
buildersLimit = maxBuilders
}
builderIndex := b.nextWithdrawalBuilderIndex
epoch := slots.ToEpoch(b.slot)
for i := 0; i < buildersLimit; i++ {
if len(ws) >= withdrawalsLimit {
break
}
builder := b.builders[builderIndex]
if builder != nil && builder.WithdrawableEpoch <= epoch && builder.Balance > 0 {
ws = append(ws, &enginev1.Withdrawal{
Index: withdrawalIndex,
ValidatorIndex: builderIndex.ToValidatorIndex(),
Address: builder.ExecutionAddress,
Amount: uint64(builder.Balance),
})
withdrawalIndex++
}
builderIndex = primitives.BuilderIndex((uint64(builderIndex) + 1) % uint64(len(b.builders)))
}
*withdrawals = ws
return withdrawalIndex, builderIndex, nil
}

View File

@@ -1,27 +1,26 @@
package state_native
package state_native_test
import (
"bytes"
"testing"
state_native "github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
enginev1 "github.com/OffchainLabs/prysm/v7/proto/engine/v1"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/OffchainLabs/prysm/v7/testing/require"
"github.com/OffchainLabs/prysm/v7/time/slots"
"github.com/OffchainLabs/prysm/v7/testing/util"
)
func TestLatestBlockHash(t *testing.T) {
t.Run("returns error before gloas", func(t *testing.T) {
st := &BeaconState{version: version.Fulu}
st, _ := util.DeterministicGenesisState(t, 1)
_, err := st.LatestBlockHash()
require.ErrorContains(t, "is not supported", err)
})
t.Run("returns zero hash when unset", func(t *testing.T) {
st, err := InitializeFromProtoGloas(&ethpb.BeaconStateGloas{})
st, err := state_native.InitializeFromProtoGloas(&ethpb.BeaconStateGloas{})
require.NoError(t, err)
got, err := st.LatestBlockHash()
@@ -34,7 +33,7 @@ func TestLatestBlockHash(t *testing.T) {
var want [32]byte
copy(want[:], hashBytes)
st, err := InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
st, err := state_native.InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
LatestBlockHash: hashBytes,
})
require.NoError(t, err)
@@ -47,14 +46,17 @@ func TestLatestBlockHash(t *testing.T) {
func TestBuilderPubkey(t *testing.T) {
t.Run("returns error before gloas", func(t *testing.T) {
st := &BeaconState{version: version.Fulu}
_, err := st.BuilderPubkey(0)
stIface, _ := util.DeterministicGenesisState(t, 1)
native, ok := stIface.(*state_native.BeaconState)
require.Equal(t, true, ok)
_, err := native.BuilderPubkey(0)
require.ErrorContains(t, "is not supported", err)
})
t.Run("returns pubkey copy", func(t *testing.T) {
pubkey := bytes.Repeat([]byte{0xAA}, 48)
stIface, err := InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
stIface, err := state_native.InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
Builders: []*ethpb.Builder{
{
Pubkey: pubkey,
@@ -78,12 +80,12 @@ func TestBuilderPubkey(t *testing.T) {
})
t.Run("out of range returns error", func(t *testing.T) {
stIface, err := InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
stIface, err := state_native.InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
Builders: []*ethpb.Builder{},
})
require.NoError(t, err)
st := stIface.(*BeaconState)
st := stIface.(*state_native.BeaconState)
_, err = st.BuilderPubkey(1)
require.ErrorContains(t, "out of range", err)
})
@@ -91,7 +93,7 @@ func TestBuilderPubkey(t *testing.T) {
func TestBuilderHelpers(t *testing.T) {
t.Run("is active builder", func(t *testing.T) {
st, err := InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
st, err := state_native.InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
Builders: []*ethpb.Builder{
{
Balance: 10,
@@ -118,7 +120,7 @@ func TestBuilderHelpers(t *testing.T) {
},
FinalizedCheckpoint: &ethpb.Checkpoint{Epoch: 2},
}
stInactive, err := InitializeFromProtoGloas(stProto)
stInactive, err := state_native.InitializeFromProtoGloas(stProto)
require.NoError(t, err)
active, err = stInactive.IsActiveBuilder(0)
@@ -127,7 +129,7 @@ func TestBuilderHelpers(t *testing.T) {
})
t.Run("can builder cover bid", func(t *testing.T) {
stIface, err := InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
stIface, err := state_native.InitializeFromProtoGloas(&ethpb.BeaconStateGloas{
Builders: []*ethpb.Builder{
{
Balance: primitives.Gwei(params.BeaconConfig().MinDepositAmount + 50),
@@ -145,7 +147,7 @@ func TestBuilderHelpers(t *testing.T) {
})
require.NoError(t, err)
st := stIface.(*BeaconState)
st := stIface.(*state_native.BeaconState)
ok, err := st.CanBuilderCoverBid(0, 20)
require.NoError(t, err)
require.Equal(t, true, ok)
@@ -157,245 +159,10 @@ func TestBuilderHelpers(t *testing.T) {
}
func TestBuilderPendingPayments_UnsupportedVersion(t *testing.T) {
stIface, err := InitializeFromProtoElectra(&ethpb.BeaconStateElectra{})
stIface, err := state_native.InitializeFromProtoElectra(&ethpb.BeaconStateElectra{})
require.NoError(t, err)
st := stIface.(*BeaconState)
st := stIface.(*state_native.BeaconState)
_, err = st.BuilderPendingPayments()
require.ErrorContains(t, "BuilderPendingPayments", err)
}
func TestIsParentBlockFull(t *testing.T) {
t.Run("returns error before gloas", func(t *testing.T) {
st := &BeaconState{version: version.Fulu}
_, err := st.IsParentBlockFull()
require.ErrorContains(t, "is not supported", err)
})
t.Run("returns false when bid is nil", func(t *testing.T) {
st := &BeaconState{version: version.Gloas}
got, err := st.IsParentBlockFull()
require.NoError(t, err)
require.Equal(t, false, got)
})
t.Run("returns true when hashes match", func(t *testing.T) {
hash := bytes.Repeat([]byte{0xAB}, 32)
st := &BeaconState{
version: version.Gloas,
latestExecutionPayloadBid: &ethpb.ExecutionPayloadBid{
BlockHash: hash,
},
latestBlockHash: hash,
}
got, err := st.IsParentBlockFull()
require.NoError(t, err)
require.Equal(t, true, got)
})
t.Run("returns false when hashes differ", func(t *testing.T) {
hash := bytes.Repeat([]byte{0xAB}, 32)
other := bytes.Repeat([]byte{0xCD}, 32)
st := &BeaconState{
version: version.Gloas,
latestExecutionPayloadBid: &ethpb.ExecutionPayloadBid{
BlockHash: hash,
},
latestBlockHash: other,
}
got, err := st.IsParentBlockFull()
require.NoError(t, err)
require.Equal(t, false, got)
})
}
func TestAppendBuilderWithdrawals(t *testing.T) {
t.Run("errors when prior withdrawals exceed limit", func(t *testing.T) {
st := &BeaconState{}
limit := params.BeaconConfig().MaxWithdrawalsPerPayload - 1
withdrawals := make([]*enginev1.Withdrawal, limit+1)
nextIndex, processed, err := st.appendBuilderWithdrawals(5, &withdrawals)
require.ErrorContains(t, "exceeds limit", err)
require.Equal(t, uint64(5), nextIndex)
require.Equal(t, uint64(0), processed)
require.Equal(t, int(limit+1), len(withdrawals))
})
t.Run("appends builder withdrawals and increments index", func(t *testing.T) {
st := &BeaconState{
builderPendingWithdrawals: []*ethpb.BuilderPendingWithdrawal{
{BuilderIndex: 1, FeeRecipient: []byte{0x01}, Amount: 11},
{BuilderIndex: 2, FeeRecipient: []byte{0x02}, Amount: 22},
{BuilderIndex: 3, FeeRecipient: []byte{0x03}, Amount: 33},
},
}
withdrawals := []*enginev1.Withdrawal{
{Index: 7, ValidatorIndex: 9, Address: []byte{0xAA}, Amount: 99},
}
nextIndex, processed, err := st.appendBuilderWithdrawals(10, &withdrawals)
require.NoError(t, err)
require.Equal(t, uint64(13), nextIndex)
require.Equal(t, uint64(3), processed)
require.Equal(t, 4, len(withdrawals))
require.DeepEqual(t, &enginev1.Withdrawal{
Index: 10,
ValidatorIndex: primitives.BuilderIndex(1).ToValidatorIndex(),
Address: []byte{0x01},
Amount: 11,
}, withdrawals[1])
require.DeepEqual(t, &enginev1.Withdrawal{
Index: 11,
ValidatorIndex: primitives.BuilderIndex(2).ToValidatorIndex(),
Address: []byte{0x02},
Amount: 22,
}, withdrawals[2])
require.DeepEqual(t, &enginev1.Withdrawal{
Index: 12,
ValidatorIndex: primitives.BuilderIndex(3).ToValidatorIndex(),
Address: []byte{0x03},
Amount: 33,
}, withdrawals[3])
})
t.Run("respects per-payload limit", func(t *testing.T) {
limit := params.BeaconConfig().MaxWithdrawalsPerPayload - 1
st := &BeaconState{
builderPendingWithdrawals: []*ethpb.BuilderPendingWithdrawal{
{BuilderIndex: 4, FeeRecipient: []byte{0x04}, Amount: 44},
{BuilderIndex: 5, FeeRecipient: []byte{0x05}, Amount: 55},
},
}
withdrawals := make([]*enginev1.Withdrawal, limit-1)
nextIndex, processed, err := st.appendBuilderWithdrawals(20, &withdrawals)
require.NoError(t, err)
require.Equal(t, uint64(21), nextIndex)
require.Equal(t, uint64(1), processed)
require.Equal(t, int(limit), len(withdrawals))
require.DeepEqual(t, &enginev1.Withdrawal{
Index: 20,
ValidatorIndex: primitives.BuilderIndex(4).ToValidatorIndex(),
Address: []byte{0x04},
Amount: 44,
}, withdrawals[len(withdrawals)-1])
})
t.Run("does not append when already at limit", func(t *testing.T) {
limit := params.BeaconConfig().MaxWithdrawalsPerPayload - 1
if limit == 0 {
t.Skip("withdrawals limit too small")
}
st := &BeaconState{
builderPendingWithdrawals: []*ethpb.BuilderPendingWithdrawal{
{BuilderIndex: 6, FeeRecipient: []byte{0x06}, Amount: 66},
},
}
withdrawals := make([]*enginev1.Withdrawal, limit)
nextIndex, processed, err := st.appendBuilderWithdrawals(30, &withdrawals)
require.NoError(t, err)
require.Equal(t, uint64(30), nextIndex)
require.Equal(t, uint64(0), processed)
require.Equal(t, int(limit), len(withdrawals))
})
}
func TestAppendBuildersSweepWithdrawals(t *testing.T) {
t.Run("errors when prior withdrawals exceed limit", func(t *testing.T) {
st := &BeaconState{}
limit := params.BeaconConfig().MaxWithdrawalsPerPayload - 1
withdrawals := make([]*enginev1.Withdrawal, limit+1)
nextIndex, nextBuilderIndex, err := st.appendBuildersSweepWithdrawals(5, &withdrawals)
require.ErrorContains(t, "exceeds limit", err)
require.Equal(t, uint64(5), nextIndex)
require.Equal(t, primitives.BuilderIndex(0), nextBuilderIndex)
require.Equal(t, int(limit+1), len(withdrawals))
})
t.Run("appends eligible builders, skips ineligible", func(t *testing.T) {
epoch := primitives.Epoch(3)
st := &BeaconState{
slot: slots.UnsafeEpochStart(epoch),
nextWithdrawalBuilderIndex: 2,
builders: []*ethpb.Builder{
{ExecutionAddress: []byte{0x01}, Balance: 0, WithdrawableEpoch: epoch},
{ExecutionAddress: []byte{0x02}, Balance: 10, WithdrawableEpoch: epoch + 1},
{ExecutionAddress: []byte{0x03}, Balance: 20, WithdrawableEpoch: epoch},
},
}
withdrawals := []*enginev1.Withdrawal{}
nextIndex, nextBuilderIndex, err := st.appendBuildersSweepWithdrawals(100, &withdrawals)
require.NoError(t, err)
require.Equal(t, uint64(101), nextIndex)
require.Equal(t, primitives.BuilderIndex(2), nextBuilderIndex)
require.Equal(t, 1, len(withdrawals))
require.DeepEqual(t, &enginev1.Withdrawal{
Index: 100,
ValidatorIndex: primitives.BuilderIndex(2).ToValidatorIndex(),
Address: []byte{0x03},
Amount: 20,
}, withdrawals[0])
})
t.Run("respects max builders per sweep", func(t *testing.T) {
cfg := params.BeaconConfig()
max := int(cfg.MaxBuildersPerWithdrawalsSweep)
epoch := primitives.Epoch(1)
builders := make([]*ethpb.Builder, max+2)
for i := range builders {
builders[i] = &ethpb.Builder{
ExecutionAddress: []byte{byte(i + 1)},
Balance: 1,
WithdrawableEpoch: epoch,
}
}
start := len(builders) - 1
st := &BeaconState{
slot: slots.UnsafeEpochStart(epoch),
nextWithdrawalBuilderIndex: primitives.BuilderIndex(start),
builders: builders,
}
withdrawals := []*enginev1.Withdrawal{}
nextIndex, nextBuilderIndex, err := st.appendBuildersSweepWithdrawals(7, &withdrawals)
require.NoError(t, err)
limit := int(cfg.MaxWithdrawalsPerPayload - 1)
expectedCount := min(max, limit)
require.Equal(t, uint64(7)+uint64(expectedCount), nextIndex)
require.Equal(t, expectedCount, len(withdrawals))
expectedNext := primitives.BuilderIndex((uint64(start) + uint64(expectedCount)) % uint64(len(builders)))
require.Equal(t, expectedNext, nextBuilderIndex)
})
t.Run("stops when payload limit reached", func(t *testing.T) {
cfg := params.BeaconConfig()
limit := cfg.MaxWithdrawalsPerPayload - 1
if limit < 1 {
t.Skip("withdrawals limit too small")
}
epoch := primitives.Epoch(2)
builders := []*ethpb.Builder{
{ExecutionAddress: []byte{0x0A}, Balance: 3, WithdrawableEpoch: epoch},
{ExecutionAddress: []byte{0x0B}, Balance: 4, WithdrawableEpoch: epoch},
}
st := &BeaconState{
slot: slots.UnsafeEpochStart(epoch),
nextWithdrawalBuilderIndex: 0,
builders: builders,
}
withdrawals := make([]*enginev1.Withdrawal, limit)
nextIndex, nextBuilderIndex, err := st.appendBuildersSweepWithdrawals(20, &withdrawals)
require.NoError(t, err)
require.Equal(t, uint64(20), nextIndex)
require.Equal(t, int(limit), len(withdrawals))
require.Equal(t, primitives.BuilderIndex(0), nextBuilderIndex)
})
}

View File

@@ -133,22 +133,11 @@ func (b *BeaconState) appendPendingPartialWithdrawals(withdrawalIndex uint64, wi
return withdrawalIndex, 0, nil
}
cfg := params.BeaconConfig()
withdrawalsLimit := min(
len(*withdrawals)+int(cfg.MaxPendingPartialsPerWithdrawalsSweep),
int(cfg.MaxWithdrawalsPerPayload-1),
)
if len(*withdrawals) > withdrawalsLimit {
return withdrawalIndex, 0, fmt.Errorf("prior withdrawals length %d exceeds limit %d", len(*withdrawals), withdrawalsLimit)
}
ws := *withdrawals
epoch := slots.ToEpoch(b.slot)
var processedPartialWithdrawalsCount uint64
for _, w := range b.pendingPartialWithdrawals {
isWithdrawable := w.WithdrawableEpoch <= epoch
hasReachedLimit := len(ws) >= withdrawalsLimit
if !isWithdrawable || hasReachedLimit {
if w.WithdrawableEpoch > epoch || len(ws) >= int(params.BeaconConfig().MaxPendingPartialsPerWithdrawalsSweep) {
break
}
@@ -160,7 +149,7 @@ func (b *BeaconState) appendPendingPartialWithdrawals(withdrawalIndex uint64, wi
if err != nil {
return withdrawalIndex, 0, fmt.Errorf("could not retrieve balance at index %d: %w", w.Index, err)
}
hasSufficientEffectiveBalance := v.EffectiveBalance() >= cfg.MinActivationBalance
hasSufficientEffectiveBalance := v.EffectiveBalance() >= params.BeaconConfig().MinActivationBalance
var totalWithdrawn uint64
for _, wi := range ws {
if wi.ValidatorIndex == w.Index {
@@ -171,9 +160,9 @@ func (b *BeaconState) appendPendingPartialWithdrawals(withdrawalIndex uint64, wi
if err != nil {
return withdrawalIndex, 0, errors.Wrapf(err, "failed to subtract balance %d with total withdrawn %d", vBal, totalWithdrawn)
}
hasExcessBalance := balance > cfg.MinActivationBalance
if v.ExitEpoch() == cfg.FarFutureEpoch && hasSufficientEffectiveBalance && hasExcessBalance {
amount := min(balance-cfg.MinActivationBalance, w.Amount)
hasExcessBalance := balance > params.BeaconConfig().MinActivationBalance
if v.ExitEpoch() == params.BeaconConfig().FarFutureEpoch && hasSufficientEffectiveBalance && hasExcessBalance {
amount := min(balance-params.BeaconConfig().MinActivationBalance, w.Amount)
ws = append(ws, &enginev1.Withdrawal{
Index: withdrawalIndex,
ValidatorIndex: w.Index,
@@ -194,7 +183,7 @@ func (b *BeaconState) appendValidatorsSweepWithdrawals(withdrawalIndex uint64, w
validatorIndex := b.nextWithdrawalValidatorIndex
validatorsLen := b.validatorsLen()
epoch := slots.ToEpoch(b.slot)
bound := min(validatorsLen, int(params.BeaconConfig().MaxValidatorsPerWithdrawalsSweep))
bound := min(uint64(validatorsLen), params.BeaconConfig().MaxValidatorsPerWithdrawalsSweep)
for range bound {
val, err := b.validatorAtIndexReadOnly(validatorIndex)
if err != nil {
@@ -233,7 +222,7 @@ func (b *BeaconState) appendValidatorsSweepWithdrawals(withdrawalIndex uint64, w
})
withdrawalIndex++
}
if len(ws) == int(params.BeaconConfig().MaxWithdrawalsPerPayload) {
if uint64(len(ws)) == params.BeaconConfig().MaxWithdrawalsPerPayload {
break
}
validatorIndex += 1

View File

@@ -1,7 +1,6 @@
package state_native
import (
"errors"
"fmt"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/state-native/types"
@@ -9,7 +8,6 @@ import (
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
enginev1 "github.com/OffchainLabs/prysm/v7/proto/engine/v1"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
)
@@ -163,154 +161,3 @@ func (b *BeaconState) UpdateExecutionPayloadAvailabilityAtIndex(idx uint64, val
b.markFieldAsDirty(types.ExecutionPayloadAvailability)
return nil
}
// SetPayloadExpectedWithdrawals stores the expected withdrawals for the next payload.
func (b *BeaconState) SetPayloadExpectedWithdrawals(withdrawals []*enginev1.Withdrawal) error {
if b.version < version.Gloas {
return errNotSupported("SetPayloadExpectedWithdrawals", b.version)
}
b.lock.Lock()
defer b.lock.Unlock()
b.payloadExpectedWithdrawals = withdrawals
b.markFieldAsDirty(types.PayloadExpectedWithdrawals)
return nil
}
// DequeueBuilderPendingWithdrawals removes processed builder withdrawals from the front of the queue.
func (b *BeaconState) DequeueBuilderPendingWithdrawals(n uint64) error {
if b.version < version.Gloas {
return errNotSupported("DequeueBuilderPendingWithdrawals", b.version)
}
if n > uint64(len(b.builderPendingWithdrawals)) {
return errors.New("cannot dequeue more builder withdrawals than are in the queue")
}
if n == 0 {
return nil
}
b.lock.Lock()
defer b.lock.Unlock()
if b.sharedFieldReferences[types.BuilderPendingWithdrawals].Refs() > 1 {
withdrawals := make([]*ethpb.BuilderPendingWithdrawal, len(b.builderPendingWithdrawals))
copy(withdrawals, b.builderPendingWithdrawals)
b.builderPendingWithdrawals = withdrawals
b.sharedFieldReferences[types.BuilderPendingWithdrawals].MinusRef()
b.sharedFieldReferences[types.BuilderPendingWithdrawals] = stateutil.NewRef(1)
}
b.builderPendingWithdrawals = b.builderPendingWithdrawals[n:]
b.markFieldAsDirty(types.BuilderPendingWithdrawals)
b.rebuildTrie[types.BuilderPendingWithdrawals] = true
return nil
}
// SetNextWithdrawalBuilderIndex sets the next builder index for the withdrawals sweep.
func (b *BeaconState) SetNextWithdrawalBuilderIndex(index primitives.BuilderIndex) error {
if b.version < version.Gloas {
return errNotSupported("SetNextWithdrawalBuilderIndex", b.version)
}
b.lock.Lock()
defer b.lock.Unlock()
b.nextWithdrawalBuilderIndex = index
b.markFieldAsDirty(types.NextWithdrawalBuilderIndex)
return nil
}
// DecreaseWithdrawalBalances applies withdrawal balance decreases for validators and builders.
// This method holds the state lock for the full batch to avoid lock churn.
func (b *BeaconState) DecreaseWithdrawalBalances(withdrawals []*enginev1.Withdrawal) error {
if b.version < version.Gloas {
return errNotSupported("DecreaseWithdrawalBalances", b.version)
}
if len(withdrawals) == 0 {
return nil
}
b.lock.Lock()
defer b.lock.Unlock()
var (
balanceIndices []uint64
builderIndices []uint64
)
for _, withdrawal := range withdrawals {
if withdrawal == nil {
return errors.New("withdrawal is nil")
}
if withdrawal.Amount == 0 {
continue
}
if withdrawal.ValidatorIndex.IsBuilderIndex() {
builderIndex := withdrawal.ValidatorIndex.ToBuilderIndex()
if err := b.decreaseBuilderBalanceLockFree(builderIndex, withdrawal.Amount); err != nil {
return err
}
builderIndices = append(builderIndices, uint64(builderIndex))
continue
}
balAtIdx, err := b.balanceAtIndex(withdrawal.ValidatorIndex)
if err != nil {
return err
}
newBal := decreaseBalanceWithVal(balAtIdx, withdrawal.Amount)
if err := b.balancesMultiValue.UpdateAt(b, uint64(withdrawal.ValidatorIndex), newBal); err != nil {
return fmt.Errorf("could not update balances: %w", err)
}
balanceIndices = append(balanceIndices, uint64(withdrawal.ValidatorIndex))
}
if len(balanceIndices) > 0 {
b.markFieldAsDirty(types.Balances)
b.addDirtyIndices(types.Balances, balanceIndices)
}
if len(builderIndices) > 0 {
b.markFieldAsDirty(types.Builders)
b.addDirtyIndices(types.Builders, builderIndices)
}
return nil
}
func (b *BeaconState) decreaseBuilderBalanceLockFree(builderIndex primitives.BuilderIndex, amount uint64) error {
idx := uint64(builderIndex)
if idx >= uint64(len(b.builders)) {
return fmt.Errorf("builder index %d out of range (len=%d)", builderIndex, len(b.builders))
}
if b.sharedFieldReferences[types.Builders].Refs() > 1 {
builders := make([]*ethpb.Builder, len(b.builders))
copy(builders, b.builders)
b.builders = builders
b.sharedFieldReferences[types.Builders].MinusRef()
b.sharedFieldReferences[types.Builders] = stateutil.NewRef(1)
}
builder := b.builders[idx]
bal := uint64(builder.Balance)
if amount >= bal {
builder.Balance = 0
} else {
builder.Balance = primitives.Gwei(bal - amount)
}
return nil
}
func decreaseBalanceWithVal(currBalance, delta uint64) uint64 {
if delta > currBalance {
return 0
}
return currBalance - delta
}

View File

@@ -8,7 +8,6 @@ import (
"github.com/OffchainLabs/prysm/v7/beacon-chain/state/stateutil"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
enginev1 "github.com/OffchainLabs/prysm/v7/proto/engine/v1"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/OffchainLabs/prysm/v7/testing/require"
@@ -230,235 +229,6 @@ func TestRotateBuilderPendingPayments_UnsupportedVersion(t *testing.T) {
require.ErrorContains(t, "RotateBuilderPendingPayments", err)
}
func TestSetPayloadExpectedWithdrawals(t *testing.T) {
t.Run("previous fork returns expected error", func(t *testing.T) {
st := &BeaconState{version: version.Fulu}
err := st.SetPayloadExpectedWithdrawals([]*enginev1.Withdrawal{})
require.ErrorContains(t, "SetPayloadExpectedWithdrawals", err)
})
t.Run("allows nil input and marks dirty", func(t *testing.T) {
st := &BeaconState{
version: version.Gloas,
dirtyFields: make(map[types.FieldIndex]bool),
}
require.NoError(t, st.SetPayloadExpectedWithdrawals(nil))
require.Equal(t, true, st.payloadExpectedWithdrawals == nil)
require.Equal(t, true, st.dirtyFields[types.PayloadExpectedWithdrawals])
})
t.Run("sets and marks dirty", func(t *testing.T) {
st := &BeaconState{
version: version.Gloas,
dirtyFields: make(map[types.FieldIndex]bool),
payloadExpectedWithdrawals: []*enginev1.Withdrawal{{Index: 1}, {Index: 2}},
}
withdrawals := []*enginev1.Withdrawal{{Index: 3}}
require.NoError(t, st.SetPayloadExpectedWithdrawals(withdrawals))
require.DeepEqual(t, withdrawals, st.payloadExpectedWithdrawals)
require.Equal(t, true, st.dirtyFields[types.PayloadExpectedWithdrawals])
})
}
func TestDecreaseWithdrawalBalances(t *testing.T) {
t.Run("previous fork returns expected error", func(t *testing.T) {
st := &BeaconState{version: version.Fulu}
err := st.DecreaseWithdrawalBalances([]*enginev1.Withdrawal{{}})
require.ErrorContains(t, "DecreaseWithdrawalBalances", err)
})
t.Run("rejects nil withdrawal", func(t *testing.T) {
st := &BeaconState{version: version.Gloas}
err := st.DecreaseWithdrawalBalances([]*enginev1.Withdrawal{nil})
require.ErrorContains(t, "withdrawal is nil", err)
})
t.Run("no-op on empty input", func(t *testing.T) {
st := &BeaconState{
version: version.Gloas,
dirtyFields: make(map[types.FieldIndex]bool),
dirtyIndices: make(map[types.FieldIndex][]uint64),
rebuildTrie: make(map[types.FieldIndex]bool),
}
require.NoError(t, st.DecreaseWithdrawalBalances(nil))
require.Equal(t, 0, len(st.dirtyFields))
require.Equal(t, 0, len(st.dirtyIndices))
})
t.Run("updates validator and builder balances and tracks dirty indices", func(t *testing.T) {
st := &BeaconState{
version: version.Gloas,
dirtyFields: make(map[types.FieldIndex]bool),
dirtyIndices: make(map[types.FieldIndex][]uint64),
rebuildTrie: make(map[types.FieldIndex]bool),
sharedFieldReferences: map[types.FieldIndex]*stateutil.Reference{
types.Builders: stateutil.NewRef(1),
},
balancesMultiValue: NewMultiValueBalances([]uint64{100, 200, 300}),
builders: []*ethpb.Builder{
{Balance: 1000},
{Balance: 50},
},
}
withdrawals := []*enginev1.Withdrawal{
{ValidatorIndex: primitives.ValidatorIndex(1), Amount: 20},
{ValidatorIndex: primitives.BuilderIndex(1).ToValidatorIndex(), Amount: 30},
{ValidatorIndex: primitives.ValidatorIndex(2), Amount: 400},
{ValidatorIndex: primitives.BuilderIndex(0).ToValidatorIndex(), Amount: 2000},
{ValidatorIndex: primitives.ValidatorIndex(0), Amount: 0},
}
require.NoError(t, st.DecreaseWithdrawalBalances(withdrawals))
require.DeepEqual(t, []uint64{100, 180, 0}, st.Balances())
require.Equal(t, primitives.Gwei(0), st.builders[0].Balance)
require.Equal(t, primitives.Gwei(20), st.builders[1].Balance)
require.Equal(t, true, st.dirtyFields[types.Balances])
require.Equal(t, true, st.dirtyFields[types.Builders])
require.DeepEqual(t, []uint64{1, 2}, st.dirtyIndices[types.Balances])
require.DeepEqual(t, []uint64{1, 0}, st.dirtyIndices[types.Builders])
})
t.Run("returns error on builder index out of range", func(t *testing.T) {
st := &BeaconState{
version: version.Gloas,
dirtyFields: make(map[types.FieldIndex]bool),
dirtyIndices: make(map[types.FieldIndex][]uint64),
rebuildTrie: make(map[types.FieldIndex]bool),
sharedFieldReferences: map[types.FieldIndex]*stateutil.Reference{
types.Builders: stateutil.NewRef(1),
},
builders: []*ethpb.Builder{{Balance: 5}},
}
err := st.DecreaseWithdrawalBalances([]*enginev1.Withdrawal{
{ValidatorIndex: primitives.BuilderIndex(2).ToValidatorIndex(), Amount: 1},
})
require.ErrorContains(t, "out of range", err)
require.Equal(t, false, st.dirtyFields[types.Builders])
require.Equal(t, 0, len(st.dirtyIndices[types.Builders]))
})
}
func TestDequeueBuilderPendingWithdrawals(t *testing.T) {
t.Run("previous fork returns expected error", func(t *testing.T) {
st := &BeaconState{version: version.Fulu}
err := st.DequeueBuilderPendingWithdrawals(1)
require.ErrorContains(t, "DequeueBuilderPendingWithdrawals", err)
})
t.Run("returns error when dequeueing more than length", func(t *testing.T) {
st := &BeaconState{
version: version.Gloas,
dirtyFields: make(map[types.FieldIndex]bool),
sharedFieldReferences: map[types.FieldIndex]*stateutil.Reference{
types.BuilderPendingWithdrawals: stateutil.NewRef(1),
},
builderPendingWithdrawals: []*ethpb.BuilderPendingWithdrawal{{Amount: 1}},
}
err := st.DequeueBuilderPendingWithdrawals(2)
require.ErrorContains(t, "cannot dequeue more builder withdrawals", err)
require.Equal(t, 1, len(st.builderPendingWithdrawals))
require.Equal(t, false, st.dirtyFields[types.BuilderPendingWithdrawals])
})
t.Run("no-op on zero", func(t *testing.T) {
st := &BeaconState{
version: version.Gloas,
dirtyFields: make(map[types.FieldIndex]bool),
sharedFieldReferences: map[types.FieldIndex]*stateutil.Reference{
types.BuilderPendingWithdrawals: stateutil.NewRef(1),
},
builderPendingWithdrawals: []*ethpb.BuilderPendingWithdrawal{{Amount: 1}},
}
require.NoError(t, st.DequeueBuilderPendingWithdrawals(0))
require.Equal(t, 1, len(st.builderPendingWithdrawals))
require.Equal(t, false, st.dirtyFields[types.BuilderPendingWithdrawals])
require.Equal(t, false, st.rebuildTrie[types.BuilderPendingWithdrawals])
})
t.Run("dequeues and marks dirty", func(t *testing.T) {
st := &BeaconState{
version: version.Gloas,
dirtyFields: make(map[types.FieldIndex]bool),
sharedFieldReferences: map[types.FieldIndex]*stateutil.Reference{
types.BuilderPendingWithdrawals: stateutil.NewRef(1),
},
builderPendingWithdrawals: []*ethpb.BuilderPendingWithdrawal{
{Amount: 1},
{Amount: 2},
{Amount: 3},
},
rebuildTrie: make(map[types.FieldIndex]bool),
}
require.NoError(t, st.DequeueBuilderPendingWithdrawals(2))
require.Equal(t, 1, len(st.builderPendingWithdrawals))
require.Equal(t, primitives.Gwei(3), st.builderPendingWithdrawals[0].Amount)
require.Equal(t, true, st.dirtyFields[types.BuilderPendingWithdrawals])
require.Equal(t, true, st.rebuildTrie[types.BuilderPendingWithdrawals])
})
t.Run("copy-on-write preserves shared state", func(t *testing.T) {
sharedRef := stateutil.NewRef(2)
sharedWithdrawals := []*ethpb.BuilderPendingWithdrawal{
{Amount: 1},
{Amount: 2},
{Amount: 3},
}
st1 := &BeaconState{
version: version.Gloas,
dirtyFields: make(map[types.FieldIndex]bool),
sharedFieldReferences: map[types.FieldIndex]*stateutil.Reference{
types.BuilderPendingWithdrawals: sharedRef,
},
builderPendingWithdrawals: sharedWithdrawals,
rebuildTrie: make(map[types.FieldIndex]bool),
}
st2 := &BeaconState{
sharedFieldReferences: map[types.FieldIndex]*stateutil.Reference{
types.BuilderPendingWithdrawals: sharedRef,
},
builderPendingWithdrawals: sharedWithdrawals,
}
require.NoError(t, st1.DequeueBuilderPendingWithdrawals(2))
require.Equal(t, primitives.Gwei(3), st1.builderPendingWithdrawals[0].Amount)
require.Equal(t, 3, len(st2.builderPendingWithdrawals))
require.Equal(t, primitives.Gwei(1), st2.builderPendingWithdrawals[0].Amount)
require.Equal(t, uint(1), st1.sharedFieldReferences[types.BuilderPendingWithdrawals].Refs())
require.Equal(t, uint(1), st2.sharedFieldReferences[types.BuilderPendingWithdrawals].Refs())
})
}
func TestSetNextWithdrawalBuilderIndex(t *testing.T) {
t.Run("previous fork returns expected error", func(t *testing.T) {
st := &BeaconState{version: version.Fulu}
err := st.SetNextWithdrawalBuilderIndex(1)
require.ErrorContains(t, "SetNextWithdrawalBuilderIndex", err)
})
t.Run("sets and marks dirty", func(t *testing.T) {
st := &BeaconState{
version: version.Gloas,
dirtyFields: make(map[types.FieldIndex]bool),
}
require.NoError(t, st.SetNextWithdrawalBuilderIndex(7))
require.Equal(t, primitives.BuilderIndex(7), st.nextWithdrawalBuilderIndex)
require.Equal(t, true, st.dirtyFields[types.NextWithdrawalBuilderIndex])
})
}
func TestAppendBuilderPendingWithdrawal_CopyOnWrite(t *testing.T) {
wd := &ethpb.BuilderPendingWithdrawal{
FeeRecipient: make([]byte, 20),

View File

@@ -1027,10 +1027,10 @@ func TestGetVerifyingStateEdgeCases(t *testing.T) {
sc: signatureCache,
sr: &mockStateByRooter{sbr: sbrErrorIfCalled(t)}, // Should not be called
hsp: &mockHeadStateProvider{
headRoot: parentRoot[:], // Same as parent
headSlot: 32, // Epoch 1
headState: fuluState.Copy(), // HeadState (not ReadOnly) for ProcessSlots
headStateReadOnly: nil, // Should not use ReadOnly path
headRoot: parentRoot[:], // Same as parent
headSlot: 32, // Epoch 1
headState: fuluState.Copy(), // HeadState (not ReadOnly) for ProcessSlots
headStateReadOnly: nil, // Should not use ReadOnly path
},
fc: &mockForkchoicer{
// Return same root for both to simulate same chain
@@ -1045,8 +1045,8 @@ func TestGetVerifyingStateEdgeCases(t *testing.T) {
// Wrap to detect HeadState call
originalHsp := initializer.shared.hsp.(*mockHeadStateProvider)
wrappedHsp := &mockHeadStateProvider{
headRoot: originalHsp.headRoot,
headSlot: originalHsp.headSlot,
headRoot: originalHsp.headRoot,
headSlot: originalHsp.headSlot,
headState: originalHsp.headState,
}
initializer.shared.hsp = &headStateCallTracker{

View File

@@ -0,0 +1,7 @@
### Changed
- gRPC fallback now matches rest api implementation and will also check and connect to only synced nodes.
### Removed
- gRPC resolver for load balancing, the new implementation matches rest api's so we should remove the resolver so it's handled the same way for consistency.

View File

@@ -0,0 +1,2 @@
### Added
- Close opened host in test helpers

View File

@@ -0,0 +1,3 @@
### Ignored
- Add handy documentation for SSZ Query package (`encoding/ssz/query`).

View File

@@ -1,2 +0,0 @@
### Added
- add modified process withdrawals for gloas

View File

@@ -0,0 +1,2 @@
### Ignored
- Run go fmt

View File

@@ -47,7 +47,6 @@ type BeaconChainConfig struct {
HysteresisQuotient uint64 `yaml:"HYSTERESIS_QUOTIENT" spec:"true"` // HysteresisQuotient defines the hysteresis quotient for effective balance calculations.
HysteresisDownwardMultiplier uint64 `yaml:"HYSTERESIS_DOWNWARD_MULTIPLIER" spec:"true"` // HysteresisDownwardMultiplier defines the hysteresis downward multiplier for effective balance calculations.
HysteresisUpwardMultiplier uint64 `yaml:"HYSTERESIS_UPWARD_MULTIPLIER" spec:"true"` // HysteresisUpwardMultiplier defines the hysteresis upward multiplier for effective balance calculations.
BuilderIndexFlag uint64 `yaml:"BUILDER_INDEX_FLAG" spec:"true"` // BuilderIndexFlag marks a ValidatorIndex as a BuilderIndex when the bit is set.
// Gwei value constants.
MinDepositAmount uint64 `yaml:"MIN_DEPOSIT_AMOUNT" spec:"true"` // MinDepositAmount is the minimum amount of Gwei a validator can send to the deposit contract at once (lower amounts will be reverted).
@@ -131,7 +130,6 @@ type BeaconChainConfig struct {
MaxWithdrawalsPerPayload uint64 `yaml:"MAX_WITHDRAWALS_PER_PAYLOAD" spec:"true"` // MaxWithdrawalsPerPayload defines the maximum number of withdrawals in a block.
MaxBlsToExecutionChanges uint64 `yaml:"MAX_BLS_TO_EXECUTION_CHANGES" spec:"true"` // MaxBlsToExecutionChanges defines the maximum number of BLS-to-execution-change objects in a block.
MaxValidatorsPerWithdrawalsSweep uint64 `yaml:"MAX_VALIDATORS_PER_WITHDRAWALS_SWEEP" spec:"true"` // MaxValidatorsPerWithdrawalsSweep bounds the size of the sweep searching for withdrawals per slot.
MaxBuildersPerWithdrawalsSweep uint64 `yaml:"MAX_BUILDERS_PER_WITHDRAWALS_SWEEP" spec:"true"` // MaxBuildersPerWithdrawalsSweep bounds the size of the builder withdrawals sweep per slot.
// BLS domain values.
DomainBeaconProposer [4]byte `yaml:"DOMAIN_BEACON_PROPOSER" spec:"true"` // DomainBeaconProposer defines the BLS signature domain for beacon proposal verification.

View File

@@ -194,8 +194,6 @@ func ConfigToYaml(cfg *BeaconChainConfig) []byte {
fmt.Sprintf("SHARD_COMMITTEE_PERIOD: %d", cfg.ShardCommitteePeriod),
fmt.Sprintf("MIN_VALIDATOR_WITHDRAWABILITY_DELAY: %d", cfg.MinValidatorWithdrawabilityDelay),
fmt.Sprintf("MAX_VALIDATORS_PER_WITHDRAWALS_SWEEP: %d", cfg.MaxValidatorsPerWithdrawalsSweep),
fmt.Sprintf("MAX_BUILDERS_PER_WITHDRAWALS_SWEEP: %d", cfg.MaxBuildersPerWithdrawalsSweep),
fmt.Sprintf("BUILDER_INDEX_FLAG: %d", cfg.BuilderIndexFlag),
fmt.Sprintf("MAX_SEED_LOOKAHEAD: %d", cfg.MaxSeedLookahead),
fmt.Sprintf("EJECTION_BALANCE: %d", cfg.EjectionBalance),
fmt.Sprintf("MIN_PER_EPOCH_CHURN_LIMIT: %d", cfg.MinPerEpochChurnLimit),

View File

@@ -85,7 +85,6 @@ var mainnetBeaconConfig = &BeaconChainConfig{
HysteresisQuotient: 4,
HysteresisDownwardMultiplier: 1,
HysteresisUpwardMultiplier: 5,
BuilderIndexFlag: 1099511627776,
// Gwei value constants.
MinDepositAmount: 1 * 1e9,
@@ -176,7 +175,6 @@ var mainnetBeaconConfig = &BeaconChainConfig{
MaxWithdrawalsPerPayload: 16,
MaxBlsToExecutionChanges: 16,
MaxValidatorsPerWithdrawalsSweep: 16384,
MaxBuildersPerWithdrawalsSweep: 16384,
// BLS domain values.
DomainBeaconProposer: bytesutil.Uint32ToBytes4(0x00000000),

View File

@@ -49,7 +49,6 @@ func compareConfigs(t *testing.T, expected, actual *params.BeaconChainConfig) {
require.DeepEqual(t, expected.HysteresisQuotient, actual.HysteresisQuotient)
require.DeepEqual(t, expected.HysteresisDownwardMultiplier, actual.HysteresisDownwardMultiplier)
require.DeepEqual(t, expected.HysteresisUpwardMultiplier, actual.HysteresisUpwardMultiplier)
require.DeepEqual(t, expected.BuilderIndexFlag, actual.BuilderIndexFlag)
require.DeepEqual(t, expected.MinDepositAmount, actual.MinDepositAmount)
require.DeepEqual(t, expected.MaxEffectiveBalance, actual.MaxEffectiveBalance)
require.DeepEqual(t, expected.EjectionBalance, actual.EjectionBalance)
@@ -95,7 +94,6 @@ func compareConfigs(t *testing.T, expected, actual *params.BeaconChainConfig) {
require.DeepEqual(t, expected.MaxDeposits, actual.MaxDeposits)
require.DeepEqual(t, expected.MaxVoluntaryExits, actual.MaxVoluntaryExits)
require.DeepEqual(t, expected.MaxWithdrawalsPerPayload, actual.MaxWithdrawalsPerPayload)
require.DeepEqual(t, expected.MaxBuildersPerWithdrawalsSweep, actual.MaxBuildersPerWithdrawalsSweep)
require.DeepEqual(t, expected.DomainBeaconProposer, actual.DomainBeaconProposer)
require.DeepEqual(t, expected.DomainRandao, actual.DomainRandao)
require.DeepEqual(t, expected.DomainBeaconAttester, actual.DomainBeaconAttester)

View File

@@ -13,16 +13,6 @@ var _ fssz.Unmarshaler = (*BuilderIndex)(nil)
// BuilderIndex is an index into the builder registry.
type BuilderIndex uint64
// ToValidatorIndex sets the builder flag on a builder index.
//
// Spec v1.6.1 (pseudocode):
// def convert_builder_index_to_validator_index(builder_index: BuilderIndex) -> ValidatorIndex:
//
// return ValidatorIndex(builder_index | BUILDER_INDEX_FLAG)
func (b BuilderIndex) ToValidatorIndex() ValidatorIndex {
return ValidatorIndex(uint64(b) | BuilderIndexFlag)
}
// HashTreeRoot returns the SSZ hash tree root of the index.
func (b BuilderIndex) HashTreeRoot() ([32]byte, error) {
return fssz.HashWithDefaultHasher(b)

View File

@@ -10,34 +10,9 @@ var _ fssz.HashRoot = (ValidatorIndex)(0)
var _ fssz.Marshaler = (*ValidatorIndex)(nil)
var _ fssz.Unmarshaler = (*ValidatorIndex)(nil)
// BuilderIndexFlag marks a ValidatorIndex as a BuilderIndex when the bit is set.
//
// Spec v1.6.1: BUILDER_INDEX_FLAG.
const BuilderIndexFlag uint64 = 1 << 40
// ValidatorIndex in eth2.
type ValidatorIndex uint64
// IsBuilderIndex returns true when the BuilderIndex flag is set on the validator index.
//
// Spec v1.6.1 (pseudocode):
// def is_builder_index(validator_index: ValidatorIndex) -> bool:
//
// return (validator_index & BUILDER_INDEX_FLAG) != 0
func (v ValidatorIndex) IsBuilderIndex() bool {
return uint64(v)&BuilderIndexFlag != 0
}
// ToBuilderIndex strips the builder flag from a validator index.
//
// Spec v1.6.1 (pseudocode):
// def convert_validator_index_to_builder_index(validator_index: ValidatorIndex) -> BuilderIndex:
//
// return BuilderIndex(validator_index & ~BUILDER_INDEX_FLAG)
func (v ValidatorIndex) ToBuilderIndex() BuilderIndex {
return BuilderIndex(uint64(v) & ^BuilderIndexFlag)
}
// Div divides validator index by x.
// This method panics if dividing by zero!
func (v ValidatorIndex) Div(x uint64) ValidatorIndex {

View File

@@ -33,32 +33,3 @@ func TestValidatorIndex_Casting(t *testing.T) {
}
})
}
func TestValidatorIndex_BuilderIndexFlagConversions(t *testing.T) {
base := uint64(42)
unflagged := ValidatorIndex(base)
if unflagged.IsBuilderIndex() {
t.Fatalf("expected unflagged validator index to not be a builder index")
}
if got, want := unflagged.ToBuilderIndex(), BuilderIndex(base); got != want {
t.Fatalf("unexpected builder index: got %d want %d", got, want)
}
flagged := ValidatorIndex(base | BuilderIndexFlag)
if !flagged.IsBuilderIndex() {
t.Fatalf("expected flagged validator index to be a builder index")
}
if got, want := flagged.ToBuilderIndex(), BuilderIndex(base); got != want {
t.Fatalf("unexpected builder index: got %d want %d", got, want)
}
builder := BuilderIndex(base)
roundTrip := builder.ToValidatorIndex()
if !roundTrip.IsBuilderIndex() {
t.Fatalf("expected round-tripped validator index to be a builder index")
}
if got, want := roundTrip.ToBuilderIndex(), builder; got != want {
t.Fatalf("unexpected round-trip builder index: got %d want %d", got, want)
}
}

190
encoding/ssz/query/doc.md Normal file
View File

@@ -0,0 +1,190 @@
# SSZ Query Package
The `encoding/ssz/query` package provides a system for analyzing and querying SSZ ([Simple Serialize](https://github.com/ethereum/consensus-specs/blob/master/ssz/simple-serialize.md)) data structures, as well as generating Merkle proofs from them. It enables runtime analysis of SSZ-serialized Go objects with reflection, path-based queries through nested structures, generalized index calculation, and Merkle proof generation.
This package is designed to be generic. It operates on arbitrary SSZ-serialized Go values at runtime, so the same query/proof machinery applies equally to any SSZ type, including the BeaconState/BeaconBlock.
## Usage Example
```go
// 1. Analyze an SSZ object
block := &ethpb.BeaconBlock{...}
info, err := query.AnalyzeObject(block)
// 2. Parse a path
path, err := query.ParsePath(".body.attestations[0].data.slot")
// 3. Get the generalized index
gindex, err := query.GetGeneralizedIndexFromPath(info, path)
// 4. Generate a Merkle proof
proof, err := info.Prove(gindex)
// 5. Get offset and length to slice the SSZ-encoded bytes
sszBytes, _ := block.MarshalSSZ()
_, offset, length, err := query.CalculateOffsetAndLength(info, path)
// slotBytes contains the SSZ-encoded value at the queried path
slotBytes := sszBytes[offset : offset+length]
```
## Exported API
The main exported API consists of:
```go
// AnalyzeObject analyzes an SSZ object and returns its structural information
func AnalyzeObject(obj SSZObject) (*SszInfo, error)
// ParsePath parses a path string like ".field1.field2[0].field3"
func ParsePath(rawPath string) (Path, error)
// CalculateOffsetAndLength computes byte offset and length for a path within an SSZ object
func CalculateOffsetAndLength(sszInfo *SszInfo, path Path) (*SszInfo, uint64, uint64, error)
// GetGeneralizedIndexFromPath calculates the generalized index for a given path
func GetGeneralizedIndexFromPath(info *SszInfo, path Path) (uint64, error)
// Prove generates a Merkle proof for a target generalized index
func (s *SszInfo) Prove(gindex uint64) (*fastssz.Proof, error)
```
## Type System
### SSZ Types
The package now supports [all standard SSZ types](https://github.com/ethereum/consensus-specs/blob/master/ssz/simple-serialize.md#typing) except `ProgressiveList`, `ProgressiveContainer`, `ProgressiveBitlist`, `Union`, and `CompatibleUnion`.
### Core Data Structures
#### `SszInfo`
The `SszInfo` structure contains complete structural metadata for an SSZ type:
```go
type SszInfo struct {
sszType SszType // SSZ Type classification
typ reflect.Type // Go reflect.Type
source SSZObject // Original SSZObject reference. Mostly used for reusing SSZ methods like `HashTreeRoot`.
isVariable bool // True if contains variable-size fields
// Composite types have corresponding metadata. Other fields would be nil except for the current type.
containerInfo *containerInfo
listInfo *listInfo
vectorInfo *vectorInfo
bitlistInfo *bitlistInfo
bitvectorInfo *bitvectorInfo
}
```
#### `Path`
The `Path` structure represents navigation paths through SSZ structures. It supports accessing a field by field name, accessing an element by index (list/vector type), and finding the length of homogenous collection types. The `ParsePath` function parses a raw string into a `Path` instance, which is commonly used in other APIs like `CalculateOffsetAndLength` and `GetGeneralizedIndexFromPath`.
```go
type Path struct {
Length bool // Flag for length queries (e.g., len(.field))
Elements []PathElement // Sequence of field accesses and indices
}
type PathElement struct {
Name string // Field name
Index *uint64 // list/vector index (nil if not an index access)
}
```
## Implementation Details
### Type Analysis (`analyzer.go`)
The `AnalyzeObject` function performs recursive type introspection using Go reflection:
1. **Type Inspection** - Examines Go `reflect.Value` to determine SSZ type
- Basic types (`uint8`, `uint16`, `uint32`, `uint64`, `bool`): `SSZType` constants
- Slices: Determined from struct tags (`ssz-size` for vectors, `ssz-max` for lists). There is a related [write-up](https://hackmd.io/@junsong/H101DKnwxl) regarding struct tags.
- Structs: Analyzed as Containers with field ordering from JSON tags
- Pointers: Dereferenced automatically
2. **Variable-Length Population** - Determines actual sizes at runtime
- For lists: Iterates elements, caches sizes for variable-element lists
- For containers: Recursively populates variable fields, adjusts offsets
- For bitlists: Decodes bit length from bitvector
3. **Offset Calculation** - Computes byte positions within serialized data
- Fixed-size fields: Offset = sum of preceding field sizes
- Variable-size fields: Offset stored as 4-byte pointer entries
### Path Parsing (`path.go`)
The `ParsePath` function parses path strings with the following rules:
- **Dot notation**: `.field1.field2` for field access
- **Array indexing**: `[0]`, `[42]` for element access
- **Length queries**: `len(.field)` for list/vector lengths
- **Character set**: Only `[A-Za-z0-9._\[\]\(\)]` allowed
Example:
```go
path, _ := ParsePath(".nested.array_field[5].inner_field")
// Returns: Path{
// Elements: [
// PathElement{Name: "nested"},
// PathElement{Name: "array_field", Index: <Pointer to uint64(5)>},
// PathElement{Name: "inner_field"}
// ]
// }
```
### Generalized Index Calculation (`generalized_index.go`)
The generalized index is a tree position identifier. This package follows the [Ethereum consensus-specs](https://github.com/ethereum/consensus-specs/blob/master/ssz/merkle-proofs.md#generalized-merkle-tree-index) to calculate the generalized index.
### Merkle Proof Generation (`merkle_proof.go`, `proof_collector.go`)
The `Prove` method generates Merkle proofs using a single-sweep merkleization algorithm:
#### Algorithm Overview
**Key Terms:**
- **Target gindex** (generalized index): The position of the SSZ element you want to prove, expressed as a generalized Merkle tree index. Stored in `Proof.Index`.
- Note: The generalized index for root is 1.
- **Registered gindices**: The set of tree positions whose node hashes must be captured during merkleization in order to later assemble the proof.
- **Sibling node**: The node that shares the same parent as another node.
- **Leaf value**: The 32-byte hash of the target node (the node being proven). Stored in `Proof.Leaf`.
**Phases:**
1. **Registration Phase** (`addTarget`)
> Goal: determine exactly which sibling hashes are needed for the proof.
- Record the target gindex as the proof target.
- Starting from the target node, walk the Merkle tree from the leaf (target gindex) to the root (gindex = 1).
- At each step:
- Compute and register the sibling gindex (`i XOR 1`) as “must collect”.
- Move to the parent (`i = i/2`).
- This produces the full set of registered gindices (the sibling nodes on the target-to-root path).
2. **Merkleization Phase** (`merkleize`)
> Goal: recursively merkleize the tree and capture the needed hashes.
- Recursively traverse the SSZ structure and compute Merkle tree node hashes from leaves to root.
- Whenever the traversal computes a node whose gindex is in registered gindices, store that nodes hash for later proof construction.
3. **Proof Assembly Phase** (`toProof`)
> Goal: create the final `fastssz.Proof` object in the correct format and order.
```go
// Proof represents a merkle proof against a general index.
type Proof struct {
Index int
Leaf []byte
Hashes [][]byte
}
```
- Set `Proof.Index` to the target gindex.
- Set `Proof.Leaf` to the 32-byte hash of the target node.
- Build `Proof.Hashes` by walking from the target node up to (but not including) the root:
- At node `i`, append the stored hash for the sibling (`i XOR 1`).
- Move to the parent (`i = i/2`).
- The resulting `Proof.Hashes` is ordered from the target level upward, containing one sibling hash per tree level on the path to the root.

View File

@@ -26,21 +26,21 @@ func TestLifecycle(t *testing.T) {
port := 1000 + rand.Intn(1000)
prometheusService := NewService(t.Context(), fmt.Sprintf(":%d", port), nil)
prometheusService.Start()
// Actively wait until the service responds on /metrics (faster and less flaky than a fixed sleep)
deadline := time.Now().Add(3 * time.Second)
for {
if time.Now().After(deadline) {
t.Fatalf("metrics endpoint not ready within timeout")
}
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))
if err == nil {
_ = resp.Body.Close()
if resp.StatusCode == http.StatusOK {
break
}
}
time.Sleep(50 * time.Millisecond)
}
// Actively wait until the service responds on /metrics (faster and less flaky than a fixed sleep)
deadline := time.Now().Add(3 * time.Second)
for {
if time.Now().After(deadline) {
t.Fatalf("metrics endpoint not ready within timeout")
}
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))
if err == nil {
_ = resp.Body.Close()
if resp.StatusCode == http.StatusOK {
break
}
}
time.Sleep(50 * time.Millisecond)
}
// Query the service to ensure it really started.
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))
@@ -49,18 +49,18 @@ func TestLifecycle(t *testing.T) {
err = prometheusService.Stop()
require.NoError(t, err)
// Actively wait until the service stops responding on /metrics
deadline = time.Now().Add(3 * time.Second)
for {
if time.Now().After(deadline) {
t.Fatalf("metrics endpoint still reachable after timeout")
}
_, err = http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))
if err != nil {
break
}
time.Sleep(50 * time.Millisecond)
}
// Actively wait until the service stops responding on /metrics
deadline = time.Now().Add(3 * time.Second)
for {
if time.Now().After(deadline) {
t.Fatalf("metrics endpoint still reachable after timeout")
}
_, err = http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))
if err != nil {
break
}
time.Sleep(50 * time.Millisecond)
}
// Query the service to ensure it really stopped.
_, err = http.Get(fmt.Sprintf("http://localhost:%d/metrics", port))

View File

@@ -225,9 +225,9 @@ func (r *testRunner) testDepositsAndTx(ctx context.Context, g *errgroup.Group,
if err := helpers.ComponentsStarted(ctx, []e2etypes.ComponentRunner{r.depositor}); err != nil {
return errors.Wrap(err, "testDepositsAndTx unable to run, depositor did not Start")
}
go func() {
if r.config.TestDeposits {
log.Info("Running deposit tests")
go func() {
if r.config.TestDeposits {
log.Info("Running deposit tests")
// The validators with an index < minGenesisActiveCount all have deposits already from the chain start.
// Skip all of those chain start validators by seeking to minGenesisActiveCount in the validator list
// for further deposit testing.
@@ -238,12 +238,12 @@ func (r *testRunner) testDepositsAndTx(ctx context.Context, g *errgroup.Group,
r.t.Error(errors.Wrap(err, "depositor.SendAndMine failed"))
}
}
}
// Only generate background transactions when relevant for the test.
if r.config.TestDeposits || r.config.TestFeature || r.config.UseBuilder {
r.testTxGeneration(ctx, g, keystorePath, []e2etypes.ComponentRunner{})
}
}()
}
// Only generate background transactions when relevant for the test.
if r.config.TestDeposits || r.config.TestFeature || r.config.UseBuilder {
r.testTxGeneration(ctx, g, keystorePath, []e2etypes.ComponentRunner{})
}
}()
if r.config.TestDeposits {
return depositCheckValidator.Start(ctx)
}

View File

@@ -38,8 +38,8 @@ func TestEndToEnd_MinimalConfig(t *testing.T) {
r := e2eMinimal(t, cfg,
types.WithCheckpointSync(),
types.WithEpochs(10),
types.WithExitEpoch(4), // Minimum due to ShardCommitteePeriod=4
types.WithLargeBlobs(), // Use large blob transactions for BPO testing
types.WithExitEpoch(4), // Minimum due to ShardCommitteePeriod=4
types.WithLargeBlobs(), // Use large blob transactions for BPO testing
)
r.run()
}
}

View File

@@ -204,7 +204,6 @@ go_test(
"gloas__operations__execution_payload_header_test.go",
"gloas__operations__payload_attestation_test.go",
"gloas__operations__proposer_slashing_test.go",
"gloas__operations__withdrawals_test.go",
"gloas__sanity__slots_test.go",
"gloas__ssz_static__ssz_static_test.go",
"phase0__epoch_processing__effective_balance_updates_test.go",

View File

@@ -1,11 +0,0 @@
package mainnet
import (
"testing"
"github.com/OffchainLabs/prysm/v7/testing/spectest/shared/gloas/operations"
)
func TestMainnet_Gloas_Operations_Withdrawals(t *testing.T) {
operations.RunWithdrawalsTest(t, "mainnet")
}

View File

@@ -210,7 +210,6 @@ go_test(
"gloas__operations__execution_payload_bid_test.go",
"gloas__operations__payload_attestation_test.go",
"gloas__operations__proposer_slashing_test.go",
"gloas__operations__withdrawals_test.go",
"gloas__sanity__slots_test.go",
"gloas__ssz_static__ssz_static_test.go",
"phase0__epoch_processing__effective_balance_updates_test.go",

View File

@@ -1,11 +0,0 @@
package minimal
import (
"testing"
"github.com/OffchainLabs/prysm/v7/testing/spectest/shared/gloas/operations"
)
func TestMinimal_Gloas_Operations_Withdrawals(t *testing.T) {
operations.RunWithdrawalsTest(t, "minimal")
}

View File

@@ -8,20 +8,16 @@ go_library(
"helpers.go",
"payload_attestation.go",
"proposer_slashing.go",
"withdrawals.go",
],
importpath = "github.com/OffchainLabs/prysm/v7/testing/spectest/shared/gloas/operations",
visibility = ["//visibility:public"],
deps = [
"//beacon-chain/core/gloas:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/state/state-native:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime/version:go_default_library",
"//testing/require:go_default_library",
"//testing/spectest/shared/common/operations:go_default_library",
"//testing/spectest/utils:go_default_library",
],
)

View File

@@ -1,48 +0,0 @@
package operations
import (
"context"
"path"
"testing"
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/gloas"
"github.com/OffchainLabs/prysm/v7/beacon-chain/state"
"github.com/OffchainLabs/prysm/v7/consensus-types/blocks"
"github.com/OffchainLabs/prysm/v7/consensus-types/interfaces"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/OffchainLabs/prysm/v7/testing/require"
common "github.com/OffchainLabs/prysm/v7/testing/spectest/shared/common/operations"
"github.com/OffchainLabs/prysm/v7/testing/spectest/utils"
)
func emptyBlockGloas() (interfaces.SignedBeaconBlock, error) {
b := &ethpb.SignedBeaconBlockGloas{
Block: &ethpb.BeaconBlockGloas{
Body: &ethpb.BeaconBlockBodyGloas{},
},
}
return blocks.NewSignedBeaconBlock(b)
}
func RunWithdrawalsTest(t *testing.T, config string) {
require.NoError(t, utils.SetConfig(t, config))
testFolders, testsFolderPath := utils.TestFolders(t, config, version.String(version.Gloas), "operations/withdrawals/pyspec_tests")
if len(testFolders) == 0 {
t.Fatalf("No test folders found for %s/%s/%s", config, version.String(version.Gloas), "operations/withdrawals/pyspec_tests")
}
for _, folder := range testFolders {
t.Run(folder.Name(), func(t *testing.T) {
folderPath := path.Join(testsFolderPath, folder.Name())
blk, err := emptyBlockGloas()
require.NoError(t, err)
common.RunBlockOperationTest(t, folderPath, blk, sszToState, func(_ context.Context, s state.BeaconState, _ interfaces.ReadOnlySignedBeaconBlock) (state.BeaconState, error) {
if err := gloas.ProcessWithdrawals(s); err != nil {
return nil, err
}
return s, nil
})
})
}
}

View File

@@ -283,16 +283,16 @@ func (mr *MockValidatorClientMockRecorder) ProposeExit(ctx, in any) *gomock.Call
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProposeExit", reflect.TypeOf((*MockValidatorClient)(nil).ProposeExit), ctx, in)
}
// SetHost mocks base method.
func (m *MockValidatorClient) SetHost(host string) {
// SwitchHost mocks base method.
func (m *MockValidatorClient) SwitchHost(host string) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SetHost", host)
m.ctrl.Call(m, "SwitchHost", host)
}
// SetHost indicates an expected call of SetHost.
func (mr *MockValidatorClientMockRecorder) SetHost(host any) *gomock.Call {
// SwitchHost indicates an expected call of SwitchHost.
func (mr *MockValidatorClientMockRecorder) SwitchHost(host any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHost", reflect.TypeOf((*MockValidatorClient)(nil).SetHost), host)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SwitchHost", reflect.TypeOf((*MockValidatorClient)(nil).SwitchHost), host)
}
// StartEventStream mocks base method.

View File

@@ -25,6 +25,7 @@ go_library(
],
deps = [
"//api/grpc:go_default_library",
"//api/rest:go_default_library",
"//beacon-chain/core/blocks:go_default_library",
"//cmd/validator/flags:go_default_library",
"//config/fieldparams:go_default_library",

View File

@@ -3,14 +3,13 @@ package accounts
import (
"context"
"io"
"net/http"
"os"
"time"
grpcutil "github.com/OffchainLabs/prysm/v7/api/grpc"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/crypto/bls"
"github.com/OffchainLabs/prysm/v7/validator/accounts/wallet"
beaconApi "github.com/OffchainLabs/prysm/v7/validator/client/beacon-api"
iface "github.com/OffchainLabs/prysm/v7/validator/client/iface"
nodeClientFactory "github.com/OffchainLabs/prysm/v7/validator/client/node-client-factory"
validatorClientFactory "github.com/OffchainLabs/prysm/v7/validator/client/validator-client-factory"
@@ -77,22 +76,17 @@ func (acm *CLIManager) prepareBeaconClients(ctx context.Context) (*iface.Validat
}
ctx = grpcutil.AppendHeaders(ctx, acm.grpcHeaders)
grpcConn, err := grpc.DialContext(ctx, acm.beaconRPCProvider, acm.dialOpts...)
if err != nil {
return nil, nil, errors.Wrapf(err, "could not dial endpoint %s", acm.beaconRPCProvider)
}
conn := validatorHelpers.NewNodeConnection(
grpcConn,
acm.beaconApiEndpoint,
validatorHelpers.WithBeaconApiTimeout(acm.beaconApiTimeout),
)
restHandler := beaconApi.NewBeaconApiRestHandler(
http.Client{Timeout: acm.beaconApiTimeout},
acm.beaconApiEndpoint,
conn, err := validatorHelpers.NewNodeConnection(
validatorHelpers.WithGRPC(ctx, acm.beaconRPCProvider, acm.dialOpts),
validatorHelpers.WithREST(acm.beaconApiEndpoint, rest.WithHttpTimeout(acm.beaconApiTimeout)),
)
validatorClient := validatorClientFactory.NewValidatorClient(conn, restHandler)
nodeClient := nodeClientFactory.NewNodeClient(conn, restHandler)
if err != nil {
return nil, nil, err
}
validatorClient := validatorClientFactory.NewValidatorClient(conn)
nodeClient := nodeClientFactory.NewNodeClient(conn)
return &validatorClient, &nodeClient, nil
}

View File

@@ -10,7 +10,6 @@ go_library(
"log.go",
"log_helpers.go",
"metrics.go",
"multiple_endpoints_grpc_resolver.go",
"propose.go",
"registration.go",
"runner.go",
@@ -29,6 +28,7 @@ go_library(
"//api/client:go_default_library",
"//api/client/event:go_default_library",
"//api/grpc:go_default_library",
"//api/rest:go_default_library",
"//api/server/structs:go_default_library",
"//async:go_default_library",
"//async/event:go_default_library",
@@ -58,7 +58,6 @@ go_library(
"//time/slots:go_default_library",
"//validator/accounts/iface:go_default_library",
"//validator/accounts/wallet:go_default_library",
"//validator/client/beacon-api:go_default_library",
"//validator/client/beacon-chain-client-factory:go_default_library",
"//validator/client/iface:go_default_library",
"//validator/client/node-client-factory:go_default_library",
@@ -86,13 +85,11 @@ go_library(
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opentelemetry_go_contrib_instrumentation_google_golang_org_grpc_otelgrpc//:go_default_library",
"@io_opentelemetry_go_contrib_instrumentation_net_http_otelhttp//:go_default_library",
"@io_opentelemetry_go_otel_trace//:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//credentials:go_default_library",
"@org_golang_google_grpc//metadata:go_default_library",
"@org_golang_google_grpc//resolver:go_default_library",
"@org_golang_google_grpc//status:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
"@org_golang_google_protobuf//types/known/emptypb:go_default_library",
@@ -124,6 +121,8 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//api/grpc:go_default_library",
"//api/rest:go_default_library",
"//api/server/structs:go_default_library",
"//async/event:go_default_library",
"//beacon-chain/core/signing:go_default_library",

View File

@@ -26,7 +26,6 @@ go_library(
"propose_exit.go",
"prysm_beacon_chain_client.go",
"registration.go",
"rest_handler_client.go",
"state_validators.go",
"status.go",
"stream_blocks.go",
@@ -43,6 +42,7 @@ go_library(
"//api:go_default_library",
"//api/apiutil:go_default_library",
"//api/client/event:go_default_library",
"//api/rest:go_default_library",
"//api/server/structs:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/signing:go_default_library",
@@ -111,6 +111,7 @@ go_test(
deps = [
"//api:go_default_library",
"//api/apiutil:go_default_library",
"//api/rest:go_default_library",
"//api/server/structs:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/rpc/eth/shared/testing:go_default_library",

View File

@@ -5,6 +5,7 @@ import (
"reflect"
"strconv"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
@@ -17,7 +18,7 @@ import (
type beaconApiChainClient struct {
fallbackClient iface.ChainClient
jsonRestHandler RestHandler
jsonRestHandler rest.RestHandler
stateValidatorsProvider StateValidatorsProvider
}
@@ -327,7 +328,7 @@ func (c beaconApiChainClient) ValidatorParticipation(ctx context.Context, in *et
return nil, errors.New("beaconApiChainClient.ValidatorParticipation is not implemented. To use a fallback client, pass a fallback client as the last argument of NewBeaconApiChainClientWithFallback.")
}
func NewBeaconApiChainClientWithFallback(jsonRestHandler RestHandler, fallbackClient iface.ChainClient) iface.ChainClient {
func NewBeaconApiChainClientWithFallback(jsonRestHandler rest.RestHandler, fallbackClient iface.ChainClient) iface.ChainClient {
return &beaconApiChainClient{
jsonRestHandler: jsonRestHandler,
fallbackClient: fallbackClient,

View File

@@ -5,6 +5,7 @@ import (
"net/http"
"strconv"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
@@ -20,7 +21,7 @@ var (
type beaconApiNodeClient struct {
fallbackClient iface.NodeClient
jsonRestHandler RestHandler
jsonRestHandler rest.RestHandler
genesisProvider GenesisProvider
}
@@ -115,7 +116,7 @@ func (c *beaconApiNodeClient) IsReady(ctx context.Context) bool {
return statusCode == http.StatusOK
}
func NewNodeClientWithFallback(jsonRestHandler RestHandler, fallbackClient iface.NodeClient) iface.NodeClient {
func NewNodeClientWithFallback(jsonRestHandler rest.RestHandler, fallbackClient iface.NodeClient) iface.NodeClient {
b := &beaconApiNodeClient{
jsonRestHandler: jsonRestHandler,
fallbackClient: fallbackClient,

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/OffchainLabs/prysm/v7/api/client/event"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
@@ -22,13 +23,13 @@ type beaconApiValidatorClient struct {
genesisProvider GenesisProvider
dutiesProvider dutiesProvider
stateValidatorsProvider StateValidatorsProvider
jsonRestHandler RestHandler
jsonRestHandler rest.RestHandler
beaconBlockConverter BeaconBlockConverter
prysmChainClient iface.PrysmChainClient
isEventStreamRunning bool
}
func NewBeaconApiValidatorClient(jsonRestHandler RestHandler, opts ...ValidatorClientOpt) iface.ValidatorClient {
func NewBeaconApiValidatorClient(jsonRestHandler rest.RestHandler, opts ...ValidatorClientOpt) iface.ValidatorClient {
c := &beaconApiValidatorClient{
genesisProvider: &beaconApiGenesisProvider{jsonRestHandler: jsonRestHandler},
dutiesProvider: beaconApiDutiesProvider{jsonRestHandler: jsonRestHandler},
@@ -331,6 +332,6 @@ func (c *beaconApiValidatorClient) Host() string {
return c.jsonRestHandler.Host()
}
func (c *beaconApiValidatorClient) SetHost(host string) {
c.jsonRestHandler.SetHost(host)
func (c *beaconApiValidatorClient) SwitchHost(host string) {
c.jsonRestHandler.SwitchHost(host)
}

View File

@@ -549,7 +549,7 @@ func TestBeaconApiValidatorClient_Host(t *testing.T) {
hosts := []string{"http://localhost:8080", "http://localhost:8081"}
jsonRestHandler := mock.NewMockJsonRestHandler(ctrl)
jsonRestHandler.EXPECT().SetHost(
jsonRestHandler.EXPECT().SwitchHost(
hosts[0],
).Times(1)
jsonRestHandler.EXPECT().Host().Return(
@@ -557,17 +557,17 @@ func TestBeaconApiValidatorClient_Host(t *testing.T) {
).Times(1)
validatorClient := beaconApiValidatorClient{jsonRestHandler: jsonRestHandler}
validatorClient.SetHost(hosts[0])
validatorClient.SwitchHost(hosts[0])
host := validatorClient.Host()
require.Equal(t, hosts[0], host)
jsonRestHandler.EXPECT().SetHost(
jsonRestHandler.EXPECT().SwitchHost(
hosts[1],
).Times(1)
jsonRestHandler.EXPECT().Host().Return(
hosts[1],
).Times(1)
validatorClient.SetHost(hosts[1])
validatorClient.SwitchHost(hosts[1])
host = validatorClient.Host()
require.Equal(t, hosts[1], host)
}

View File

@@ -9,6 +9,7 @@ import (
"strconv"
"github.com/OffchainLabs/prysm/v7/api/apiutil"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
@@ -27,7 +28,7 @@ type dutiesProvider interface {
}
type beaconApiDutiesProvider struct {
jsonRestHandler RestHandler
jsonRestHandler rest.RestHandler
}
type attesterDuty struct {

View File

@@ -7,6 +7,7 @@ import (
"sync"
"time"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/encoding/bytesutil"
@@ -20,7 +21,7 @@ type GenesisProvider interface {
}
type beaconApiGenesisProvider struct {
jsonRestHandler RestHandler
jsonRestHandler rest.RestHandler
genesis *structs.Genesis
once sync.Once
}

View File

@@ -150,14 +150,14 @@ func (mr *MockRestHandlerMockRecorder) PostSSZ(ctx, endpoint, headers, data any)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PostSSZ", reflect.TypeOf((*MockRestHandler)(nil).PostSSZ), ctx, endpoint, headers, data)
}
// SetHost mocks base method.
func (m *MockRestHandler) SetHost(host string) {
// SwitchHost mocks base method.
func (m *MockRestHandler) SwitchHost(host string) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SetHost", host)
m.ctrl.Call(m, "SwitchHost", host)
}
// SetHost indicates an expected call of SetHost.
func (mr *MockRestHandlerMockRecorder) SetHost(host any) *gomock.Call {
// SwitchHost indicates an expected call of SwitchHost.
func (mr *MockRestHandlerMockRecorder) SwitchHost(host any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetHost", reflect.TypeOf((*MockRestHandler)(nil).SetHost), host)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SwitchHost", reflect.TypeOf((*MockRestHandler)(nil).SwitchHost), host)
}

View File

@@ -10,6 +10,7 @@ import (
"strings"
"github.com/OffchainLabs/prysm/v7/api/apiutil"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
validator2 "github.com/OffchainLabs/prysm/v7/consensus-types/validator"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
@@ -18,7 +19,7 @@ import (
)
// NewPrysmChainClient returns implementation of iface.PrysmChainClient.
func NewPrysmChainClient(jsonRestHandler RestHandler, nodeClient iface.NodeClient) iface.PrysmChainClient {
func NewPrysmChainClient(jsonRestHandler rest.RestHandler, nodeClient iface.NodeClient) iface.PrysmChainClient {
return prysmChainClient{
jsonRestHandler: jsonRestHandler,
nodeClient: nodeClient,
@@ -26,7 +27,7 @@ func NewPrysmChainClient(jsonRestHandler RestHandler, nodeClient iface.NodeClien
}
type prysmChainClient struct {
jsonRestHandler RestHandler
jsonRestHandler rest.RestHandler
nodeClient iface.NodeClient
}

View File

@@ -12,13 +12,12 @@ import (
"time"
"github.com/OffchainLabs/prysm/v7/api"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
"github.com/OffchainLabs/prysm/v7/config/params"
"github.com/OffchainLabs/prysm/v7/network/httputil"
"github.com/OffchainLabs/prysm/v7/runtime/version"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
)
@@ -45,10 +44,7 @@ func TestGet(t *testing.T) {
server := httptest.NewServer(mux)
defer server.Close()
jsonRestHandler := BeaconApiRestHandler{
client: http.Client{Timeout: time.Second * 5},
host: server.URL,
}
jsonRestHandler := rest.NewRestHandler(http.Client{Timeout: time.Second * 5}, server.URL)
resp := &structs.GetGenesisResponse{}
require.NoError(t, jsonRestHandler.Get(ctx, endpoint+"?arg1=abc&arg2=def", resp))
assert.DeepEqual(t, genesisJson, resp)
@@ -79,10 +75,7 @@ func TestGetSSZ(t *testing.T) {
server := httptest.NewServer(mux)
defer server.Close()
jsonRestHandler := BeaconApiRestHandler{
client: http.Client{Timeout: time.Second * 5},
host: server.URL,
}
jsonRestHandler := rest.NewRestHandler(http.Client{Timeout: time.Second * 5}, server.URL)
body, header, err := jsonRestHandler.GetSSZ(ctx, endpoint)
require.NoError(t, err)
@@ -108,10 +101,7 @@ func TestGetSSZ(t *testing.T) {
server := httptest.NewServer(mux)
defer server.Close()
jsonRestHandler := BeaconApiRestHandler{
client: http.Client{Timeout: time.Second * 5},
host: server.URL,
}
jsonRestHandler := rest.NewRestHandler(http.Client{Timeout: time.Second * 5}, server.URL)
body, header, err := jsonRestHandler.GetSSZ(ctx, endpoint)
require.NoError(t, err)
@@ -136,10 +126,7 @@ func TestGetSSZ(t *testing.T) {
server := httptest.NewServer(mux)
defer server.Close()
jsonRestHandler := BeaconApiRestHandler{
client: http.Client{Timeout: time.Second * 5},
host: server.URL,
}
jsonRestHandler := rest.NewRestHandler(http.Client{Timeout: time.Second * 5}, server.URL)
_, _, err := jsonRestHandler.GetSSZ(ctx, endpoint)
require.NoError(t, err)
@@ -161,7 +148,7 @@ func TestAcceptOverrideSSZ(t *testing.T) {
require.NoError(t, err)
}))
defer srv.Close()
c := NewBeaconApiRestHandler(http.Client{Timeout: time.Second * 5}, srv.URL)
c := rest.NewRestHandler(http.Client{Timeout: time.Second * 5}, srv.URL)
_, _, err := c.GetSSZ(t.Context(), "/test")
require.NoError(t, err)
}
@@ -204,162 +191,12 @@ func TestPost(t *testing.T) {
server := httptest.NewServer(mux)
defer server.Close()
jsonRestHandler := BeaconApiRestHandler{
client: http.Client{Timeout: time.Second * 5},
host: server.URL,
}
jsonRestHandler := rest.NewRestHandler(http.Client{Timeout: time.Second * 5}, server.URL)
resp := &structs.GetGenesisResponse{}
require.NoError(t, jsonRestHandler.Post(ctx, endpoint, headers, bytes.NewBuffer(dataBytes), resp))
assert.DeepEqual(t, genesisJson, resp)
}
func Test_decodeResp(t *testing.T) {
type j struct {
Foo string `json:"foo"`
}
t.Run("200 JSON with charset", func(t *testing.T) {
body := bytes.Buffer{}
r := &http.Response{
Status: "200",
StatusCode: http.StatusOK,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {"application/json; charset=utf-8"}},
}
require.NoError(t, decodeResp(r, nil))
})
t.Run("200 non-JSON", func(t *testing.T) {
body := bytes.Buffer{}
r := &http.Response{
Status: "200",
StatusCode: http.StatusOK,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {api.OctetStreamMediaType}},
}
require.NoError(t, decodeResp(r, nil))
})
t.Run("204 non-JSON", func(t *testing.T) {
body := bytes.Buffer{}
r := &http.Response{
Status: "204",
StatusCode: http.StatusNoContent,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {api.OctetStreamMediaType}},
}
require.NoError(t, decodeResp(r, nil))
})
t.Run("500 non-JSON", func(t *testing.T) {
body := bytes.Buffer{}
_, err := body.WriteString("foo")
require.NoError(t, err)
r := &http.Response{
Status: "500",
StatusCode: http.StatusInternalServerError,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {api.OctetStreamMediaType}},
}
err = decodeResp(r, nil)
errJson := &httputil.DefaultJsonError{}
require.Equal(t, true, errors.As(err, &errJson))
assert.Equal(t, http.StatusInternalServerError, errJson.Code)
assert.Equal(t, "foo", errJson.Message)
})
t.Run("200 JSON with resp", func(t *testing.T) {
body := bytes.Buffer{}
b, err := json.Marshal(&j{Foo: "foo"})
require.NoError(t, err)
body.Write(b)
r := &http.Response{
Status: "200",
StatusCode: http.StatusOK,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {api.JsonMediaType}},
}
resp := &j{}
require.NoError(t, decodeResp(r, resp))
assert.Equal(t, "foo", resp.Foo)
})
t.Run("200 JSON without resp", func(t *testing.T) {
body := bytes.Buffer{}
r := &http.Response{
Status: "200",
StatusCode: http.StatusOK,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {api.JsonMediaType}},
}
require.NoError(t, decodeResp(r, nil))
})
t.Run("204 JSON", func(t *testing.T) {
body := bytes.Buffer{}
r := &http.Response{
Status: "204",
StatusCode: http.StatusNoContent,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {api.JsonMediaType}},
}
require.NoError(t, decodeResp(r, nil))
})
t.Run("500 JSON", func(t *testing.T) {
body := bytes.Buffer{}
b, err := json.Marshal(&httputil.DefaultJsonError{Code: http.StatusInternalServerError, Message: "error"})
require.NoError(t, err)
body.Write(b)
r := &http.Response{
Status: "500",
StatusCode: http.StatusInternalServerError,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {api.JsonMediaType}},
}
err = decodeResp(r, nil)
errJson := &httputil.DefaultJsonError{}
require.Equal(t, true, errors.As(err, &errJson))
assert.Equal(t, http.StatusInternalServerError, errJson.Code)
assert.Equal(t, "error", errJson.Message)
})
t.Run("200 JSON cannot decode", func(t *testing.T) {
body := bytes.Buffer{}
_, err := body.WriteString("foo")
require.NoError(t, err)
r := &http.Response{
Status: "200",
StatusCode: http.StatusOK,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {api.JsonMediaType}},
Request: &http.Request{},
}
resp := &j{}
err = decodeResp(r, resp)
assert.ErrorContains(t, "failed to decode response body into json", err)
})
t.Run("500 JSON cannot decode", func(t *testing.T) {
body := bytes.Buffer{}
_, err := body.WriteString("foo")
require.NoError(t, err)
r := &http.Response{
Status: "500",
StatusCode: http.StatusInternalServerError,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {api.JsonMediaType}},
Request: &http.Request{},
}
err = decodeResp(r, nil)
assert.ErrorContains(t, "failed to decode response body into error json", err)
})
t.Run("500 not JSON", func(t *testing.T) {
body := bytes.Buffer{}
_, err := body.WriteString("foo")
require.NoError(t, err)
r := &http.Response{
Status: "500",
StatusCode: http.StatusInternalServerError,
Body: io.NopCloser(&body),
Header: map[string][]string{"Content-Type": {"text/plain"}},
Request: &http.Request{},
}
err = decodeResp(r, nil)
assert.ErrorContains(t, "HTTP request unsuccessful (500: foo)", err)
})
}
func TestGetStatusCode(t *testing.T) {
ctx := t.Context()
const endpoint = "/eth/v1/node/health"
@@ -401,10 +238,7 @@ func TestGetStatusCode(t *testing.T) {
server := httptest.NewServer(mux)
defer server.Close()
jsonRestHandler := BeaconApiRestHandler{
client: http.Client{Timeout: time.Second * 5},
host: server.URL,
}
jsonRestHandler := rest.NewRestHandler(http.Client{Timeout: time.Second * 5}, server.URL)
statusCode, err := jsonRestHandler.GetStatusCode(ctx, endpoint)
require.NoError(t, err)
@@ -413,10 +247,7 @@ func TestGetStatusCode(t *testing.T) {
}
t.Run("returns error on connection failure", func(t *testing.T) {
jsonRestHandler := BeaconApiRestHandler{
client: http.Client{Timeout: time.Millisecond * 100},
host: "http://localhost:99999", // Invalid port
}
jsonRestHandler := rest.NewRestHandler(http.Client{Timeout: time.Millisecond * 100}, "http://localhost:99999")
_, err := jsonRestHandler.GetStatusCode(ctx, endpoint)
require.ErrorContains(t, "failed to perform request", err)

View File

@@ -9,6 +9,7 @@ import (
"strconv"
"github.com/OffchainLabs/prysm/v7/api/apiutil"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
"github.com/pkg/errors"
@@ -21,7 +22,7 @@ type StateValidatorsProvider interface {
}
type beaconApiStateValidatorsProvider struct {
jsonRestHandler RestHandler
jsonRestHandler rest.RestHandler
}
func (c beaconApiStateValidatorsProvider) StateValidators(

View File

@@ -9,19 +9,17 @@ import (
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
)
func NewChainClient(validatorConn validatorHelpers.NodeConnection, jsonRestHandler beaconApi.RestHandler) iface.ChainClient {
grpcClient := grpcApi.NewGrpcChainClient(validatorConn.GetGrpcClientConn())
func NewChainClient(validatorConn validatorHelpers.NodeConnection) iface.ChainClient {
grpcClient := grpcApi.NewGrpcChainClient(validatorConn)
if features.Get().EnableBeaconRESTApi {
return beaconApi.NewBeaconApiChainClientWithFallback(jsonRestHandler, grpcClient)
} else {
return grpcClient
return beaconApi.NewBeaconApiChainClientWithFallback(validatorConn.GetRestHandler(), grpcClient)
}
return grpcClient
}
func NewPrysmChainClient(validatorConn validatorHelpers.NodeConnection, jsonRestHandler beaconApi.RestHandler) iface.PrysmChainClient {
func NewPrysmChainClient(validatorConn validatorHelpers.NodeConnection) iface.PrysmChainClient {
if features.Get().EnableBeaconRESTApi {
return beaconApi.NewPrysmChainClient(jsonRestHandler, nodeClientFactory.NewNodeClient(validatorConn, jsonRestHandler))
} else {
return grpcApi.NewGrpcPrysmChainClient(validatorConn.GetGrpcClientConn())
return beaconApi.NewPrysmChainClient(validatorConn.GetRestHandler(), nodeClientFactory.NewNodeClient(validatorConn))
}
return grpcApi.NewGrpcPrysmChainClient(validatorConn)
}

View File

@@ -4,6 +4,7 @@ go_library(
name = "go_default_library",
srcs = [
"grpc_beacon_chain_client.go",
"grpc_client_manager.go",
"grpc_node_client.go",
"grpc_prysm_beacon_chain_client.go",
"grpc_validator_client.go",
@@ -25,6 +26,7 @@ go_library(
"//proto/eth/v1:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//validator/client/iface:go_default_library",
"//validator/helpers:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_golang_protobuf//ptypes/empty",
"@com_github_pkg_errors//:go_default_library",
@@ -39,6 +41,8 @@ go_test(
name = "go_default_test",
size = "small",
srcs = [
"grpc_client_manager_test.go",
"grpc_node_client_test.go",
"grpc_prysm_beacon_chain_client_test.go",
"grpc_validator_client_test.go",
],
@@ -56,7 +60,11 @@ go_test(
"//testing/util:go_default_library",
"//testing/validator-mock:go_default_library",
"//validator/client/iface:go_default_library",
"//validator/helpers:go_default_library",
"//validator/testing:go_default_library",
"@com_github_golang_protobuf//ptypes/empty",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_protobuf//types/known/emptypb:go_default_library",
"@org_uber_go_mock//gomock:go_default_library",
],

View File

@@ -5,38 +5,42 @@ import (
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"
)
type grpcChainClient struct {
beaconChainClient ethpb.BeaconChainClient
*grpcClientManager[ethpb.BeaconChainClient]
}
func (c *grpcChainClient) ChainHead(ctx context.Context, in *empty.Empty) (*ethpb.ChainHead, error) {
return c.beaconChainClient.GetChainHead(ctx, in)
return c.getClient().GetChainHead(ctx, in)
}
func (c *grpcChainClient) ValidatorBalances(ctx context.Context, in *ethpb.ListValidatorBalancesRequest) (*ethpb.ValidatorBalances, error) {
return c.beaconChainClient.ListValidatorBalances(ctx, in)
return c.getClient().ListValidatorBalances(ctx, in)
}
func (c *grpcChainClient) Validators(ctx context.Context, in *ethpb.ListValidatorsRequest) (*ethpb.Validators, error) {
return c.beaconChainClient.ListValidators(ctx, in)
return c.getClient().ListValidators(ctx, in)
}
func (c *grpcChainClient) ValidatorQueue(ctx context.Context, in *empty.Empty) (*ethpb.ValidatorQueue, error) {
return c.beaconChainClient.GetValidatorQueue(ctx, in)
return c.getClient().GetValidatorQueue(ctx, in)
}
func (c *grpcChainClient) ValidatorPerformance(ctx context.Context, in *ethpb.ValidatorPerformanceRequest) (*ethpb.ValidatorPerformanceResponse, error) {
return c.beaconChainClient.GetValidatorPerformance(ctx, in)
return c.getClient().GetValidatorPerformance(ctx, in)
}
func (c *grpcChainClient) ValidatorParticipation(ctx context.Context, in *ethpb.GetValidatorParticipationRequest) (*ethpb.ValidatorParticipationResponse, error) {
return c.beaconChainClient.GetValidatorParticipation(ctx, in)
return c.getClient().GetValidatorParticipation(ctx, in)
}
func NewGrpcChainClient(cc grpc.ClientConnInterface) iface.ChainClient {
return &grpcChainClient{ethpb.NewBeaconChainClient(cc)}
// NewGrpcChainClient creates a new gRPC chain client that supports
// dynamic connection switching via the NodeConnection's GrpcConnectionProvider.
func NewGrpcChainClient(conn validatorHelpers.NodeConnection) iface.ChainClient {
return &grpcChainClient{
grpcClientManager: newGrpcClientManager(conn, ethpb.NewBeaconChainClient),
}
}

View File

@@ -0,0 +1,44 @@
package grpc_api
import (
"sync"
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
"google.golang.org/grpc"
)
// grpcClientManager handles dynamic gRPC client recreation when the connection changes.
// It uses generics to work with any gRPC client type.
type grpcClientManager[T any] struct {
mu sync.Mutex
conn validatorHelpers.NodeConnection
client T
lastHost string
newClient func(grpc.ClientConnInterface) T
}
// newGrpcClientManager creates a new client manager with the given connection and client constructor.
func newGrpcClientManager[T any](
conn validatorHelpers.NodeConnection,
newClient func(grpc.ClientConnInterface) T,
) *grpcClientManager[T] {
return &grpcClientManager[T]{
conn: conn,
newClient: newClient,
client: newClient(conn.GetGrpcClientConn()),
lastHost: conn.GetGrpcConnectionProvider().CurrentHost(),
}
}
// getClient returns the current client, recreating it if the connection has changed.
func (m *grpcClientManager[T]) getClient() T {
m.mu.Lock()
defer m.mu.Unlock()
currentHost := m.conn.GetGrpcConnectionProvider().CurrentHost()
if m.lastHost != currentHost {
m.client = m.newClient(m.conn.GetGrpcClientConn())
m.lastHost = currentHost
}
return m.client
}

View File

@@ -0,0 +1,168 @@
package grpc_api
import (
"sync"
"testing"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
"google.golang.org/grpc"
)
// mockProvider implements grpcutil.GrpcConnectionProvider for testing.
type mockProvider struct {
hosts []string
currentIndex int
mu sync.Mutex
}
func (m *mockProvider) CurrentConn() *grpc.ClientConn { return nil }
func (m *mockProvider) Hosts() []string { return m.hosts }
func (m *mockProvider) Close() {}
func (m *mockProvider) CurrentHost() string {
m.mu.Lock()
defer m.mu.Unlock()
return m.hosts[m.currentIndex]
}
func (m *mockProvider) SwitchHost(index int) error {
m.mu.Lock()
defer m.mu.Unlock()
m.currentIndex = index
return nil
}
// nextHost is a test helper for round-robin simulation (not part of the interface).
func (m *mockProvider) nextHost() {
m.mu.Lock()
defer m.mu.Unlock()
m.currentIndex = (m.currentIndex + 1) % len(m.hosts)
}
// testClient is a simple type for testing the generic client manager.
type testClient struct{ id int }
// testManager creates a manager with client creation counting.
func testManager(t *testing.T, provider *mockProvider) (*grpcClientManager[*testClient], *int) {
conn, err := validatorHelpers.NewNodeConnection(validatorHelpers.WithGRPCProvider(provider))
require.NoError(t, err)
clientCount := new(int)
newClient := func(grpc.ClientConnInterface) *testClient {
*clientCount++
return &testClient{id: *clientCount}
}
manager := newGrpcClientManager(conn, newClient)
require.NotNil(t, manager)
return manager, clientCount
}
func TestGrpcClientManager(t *testing.T) {
t.Run("tracks host", func(t *testing.T) {
provider := &mockProvider{hosts: []string{"host1:4000", "host2:4000"}}
manager, count := testManager(t, provider)
assert.Equal(t, 1, *count)
assert.Equal(t, "host1:4000", manager.lastHost)
})
t.Run("same host returns same client", func(t *testing.T) {
provider := &mockProvider{hosts: []string{"host1:4000", "host2:4000"}}
manager, count := testManager(t, provider)
c1, c2, c3 := manager.getClient(), manager.getClient(), manager.getClient()
assert.Equal(t, 1, *count)
assert.Equal(t, c1, c2)
assert.Equal(t, c2, c3)
})
t.Run("host change recreates client", func(t *testing.T) {
provider := &mockProvider{hosts: []string{"host1:4000", "host2:4000"}}
manager, count := testManager(t, provider)
c1 := manager.getClient()
assert.Equal(t, 1, c1.id)
provider.nextHost()
c2 := manager.getClient()
assert.Equal(t, 2, *count)
assert.Equal(t, 2, c2.id)
// Same host again - no recreation
c3 := manager.getClient()
assert.Equal(t, 2, *count)
assert.Equal(t, c2, c3)
})
t.Run("multiple host switches", func(t *testing.T) {
provider := &mockProvider{hosts: []string{"host1:4000", "host2:4000", "host3:4000"}}
manager, count := testManager(t, provider)
assert.Equal(t, 1, *count)
for expected := 2; expected <= 4; expected++ {
provider.nextHost()
_ = manager.getClient()
assert.Equal(t, expected, *count)
}
})
}
func TestGrpcClientManager_Concurrent(t *testing.T) {
t.Run("concurrent access same host", func(t *testing.T) {
provider := &mockProvider{hosts: []string{"host1:4000", "host2:4000"}}
manager, _ := testManager(t, provider)
var clientCount int
var countMu sync.Mutex
// Override with thread-safe counter
manager.newClient = func(grpc.ClientConnInterface) *testClient {
countMu.Lock()
clientCount++
id := clientCount
countMu.Unlock()
return &testClient{id: id}
}
manager.client = manager.newClient(nil)
clientCount = 1
var wg sync.WaitGroup
for range 100 {
wg.Go(func() { _ = manager.getClient() })
}
wg.Wait()
countMu.Lock()
assert.Equal(t, 1, clientCount)
countMu.Unlock()
})
t.Run("concurrent with host changes", func(t *testing.T) {
provider := &mockProvider{hosts: []string{"host1:4000", "host2:4000"}}
manager, _ := testManager(t, provider)
var clientCount int
var countMu sync.Mutex
manager.newClient = func(grpc.ClientConnInterface) *testClient {
countMu.Lock()
clientCount++
id := clientCount
countMu.Unlock()
return &testClient{id: id}
}
manager.client = manager.newClient(nil)
clientCount = 1
var wg sync.WaitGroup
for range 50 {
wg.Go(func() { _ = manager.getClient() })
wg.Go(func() { provider.nextHost() })
}
wg.Wait()
countMu.Lock()
assert.NotEqual(t, 0, clientCount, "Should have created at least one client")
countMu.Unlock()
})
}

View File

@@ -5,8 +5,8 @@ import (
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
"github.com/golang/protobuf/ptypes/empty"
"google.golang.org/grpc"
)
var (
@@ -14,35 +14,40 @@ var (
)
type grpcNodeClient struct {
nodeClient ethpb.NodeClient
*grpcClientManager[ethpb.NodeClient]
}
func (c *grpcNodeClient) SyncStatus(ctx context.Context, in *empty.Empty) (*ethpb.SyncStatus, error) {
return c.nodeClient.GetSyncStatus(ctx, in)
return c.getClient().GetSyncStatus(ctx, in)
}
func (c *grpcNodeClient) Genesis(ctx context.Context, in *empty.Empty) (*ethpb.Genesis, error) {
return c.nodeClient.GetGenesis(ctx, in)
return c.getClient().GetGenesis(ctx, in)
}
func (c *grpcNodeClient) Version(ctx context.Context, in *empty.Empty) (*ethpb.Version, error) {
return c.nodeClient.GetVersion(ctx, in)
return c.getClient().GetVersion(ctx, in)
}
func (c *grpcNodeClient) Peers(ctx context.Context, in *empty.Empty) (*ethpb.Peers, error) {
return c.nodeClient.ListPeers(ctx, in)
return c.getClient().ListPeers(ctx, in)
}
func (c *grpcNodeClient) IsReady(ctx context.Context) bool {
_, err := c.nodeClient.GetHealth(ctx, &ethpb.HealthRequest{})
// GetHealth returns 200 OK only if node is synced and not optimistic.
// otherwise it will throw an error
_, err := c.getClient().GetHealth(ctx, &ethpb.HealthRequest{})
if err != nil {
log.WithError(err).Error("Failed to get health of node")
log.WithError(err).Debug("Node is not ready")
return false
}
return true
}
func NewNodeClient(cc grpc.ClientConnInterface) iface.NodeClient {
g := &grpcNodeClient{nodeClient: ethpb.NewNodeClient(cc)}
return g
// NewNodeClient creates a new gRPC node client that supports
// dynamic connection switching via the NodeConnection's GrpcConnectionProvider.
func NewNodeClient(conn validatorHelpers.NodeConnection) iface.NodeClient {
return &grpcNodeClient{
grpcClientManager: newGrpcClientManager(conn, ethpb.NewNodeClient),
}
}

View File

@@ -0,0 +1,72 @@
package grpc_api
import (
"errors"
"testing"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/mock"
"github.com/OffchainLabs/prysm/v7/testing/require"
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
"github.com/golang/protobuf/ptypes/empty"
"go.uber.org/mock/gomock"
"google.golang.org/grpc"
)
func TestGrpcNodeClient_IsReady(t *testing.T) {
// The IsReady function now relies on GetHealth which returns:
// - 200 OK (nil error) only if node is synced AND not optimistic
// - 206 Partial Content (error) if syncing or optimistic
// - 503 Unavailable (error) if unavailable
testCases := []struct {
name string
healthErr error
expectedResult bool
}{
{
name: "returns true when health check succeeds (synced and not optimistic)",
healthErr: nil,
expectedResult: true,
},
{
name: "returns false when health check fails (syncing, optimistic, or unavailable)",
healthErr: errors.New("node not ready"),
expectedResult: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
ctx := t.Context()
mockNodeClient := mock.NewMockNodeClient(ctrl)
// Set up health check expectation
mockNodeClient.EXPECT().GetHealth(
gomock.Any(),
gomock.Any(),
).Return(&empty.Empty{}, tc.healthErr)
// Create a mock provider
provider := &mockProvider{hosts: []string{"host1:4000"}}
conn, err := validatorHelpers.NewNodeConnection(validatorHelpers.WithGRPCProvider(provider))
require.NoError(t, err)
// Create client with injected mock
client := &grpcNodeClient{
grpcClientManager: &grpcClientManager[ethpb.NodeClient]{
conn: conn,
client: mockNodeClient,
lastHost: "host1:4000",
newClient: func(grpc.ClientConnInterface) ethpb.NodeClient { return mockNodeClient },
},
}
result := client.IsReady(ctx)
assert.Equal(t, tc.expectedResult, result)
})
}
}

View File

@@ -12,9 +12,9 @@ import (
eth "github.com/OffchainLabs/prysm/v7/proto/eth/v1"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
"github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
type grpcPrysmChainClient struct {
@@ -95,6 +95,8 @@ func (c *grpcPrysmChainClient) ValidatorPerformance(ctx context.Context, in *eth
return c.chainClient.ValidatorPerformance(ctx, in)
}
func NewGrpcPrysmChainClient(cc grpc.ClientConnInterface) iface.PrysmChainClient {
return &grpcPrysmChainClient{chainClient: &grpcChainClient{ethpb.NewBeaconChainClient(cc)}}
// NewGrpcPrysmChainClient creates a new gRPC Prysm chain client that supports
// dynamic connection switching via the NodeConnection's GrpcConnectionProvider.
func NewGrpcPrysmChainClient(conn validatorHelpers.NodeConnection) iface.PrysmChainClient {
return &grpcPrysmChainClient{chainClient: NewGrpcChainClient(conn)}
}

View File

@@ -14,24 +14,24 @@ import (
"github.com/OffchainLabs/prysm/v7/monitoring/tracing/trace"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/golang/protobuf/ptypes/empty"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type grpcValidatorClient struct {
beaconNodeValidatorClient ethpb.BeaconNodeValidatorClient
isEventStreamRunning bool
*grpcClientManager[ethpb.BeaconNodeValidatorClient]
isEventStreamRunning bool
}
func (c *grpcValidatorClient) Duties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.ValidatorDutiesContainer, error) {
if features.Get().DisableDutiesV2 {
return c.getDuties(ctx, in)
}
dutiesResponse, err := c.beaconNodeValidatorClient.GetDutiesV2(ctx, in)
dutiesResponse, err := c.getClient().GetDutiesV2(ctx, in)
if err != nil {
if status.Code(err) == codes.Unimplemented {
log.Warn("GetDutiesV2 returned status code unavailable, falling back to GetDuties")
@@ -47,7 +47,7 @@ func (c *grpcValidatorClient) Duties(ctx context.Context, in *ethpb.DutiesReques
// getDuties is calling the v1 of get duties
func (c *grpcValidatorClient) getDuties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.ValidatorDutiesContainer, error) {
dutiesResponse, err := c.beaconNodeValidatorClient.GetDuties(ctx, in)
dutiesResponse, err := c.getClient().GetDuties(ctx, in)
if err != nil {
return nil, errors.Wrap(
client.ErrConnectionIssue,
@@ -147,108 +147,108 @@ func toValidatorDutyV2(duty *ethpb.DutiesV2Response_Duty) (*ethpb.ValidatorDuty,
}
func (c *grpcValidatorClient) CheckDoppelGanger(ctx context.Context, in *ethpb.DoppelGangerRequest) (*ethpb.DoppelGangerResponse, error) {
return c.beaconNodeValidatorClient.CheckDoppelGanger(ctx, in)
return c.getClient().CheckDoppelGanger(ctx, in)
}
func (c *grpcValidatorClient) DomainData(ctx context.Context, in *ethpb.DomainRequest) (*ethpb.DomainResponse, error) {
return c.beaconNodeValidatorClient.DomainData(ctx, in)
return c.getClient().DomainData(ctx, in)
}
func (c *grpcValidatorClient) AttestationData(ctx context.Context, in *ethpb.AttestationDataRequest) (*ethpb.AttestationData, error) {
return c.beaconNodeValidatorClient.GetAttestationData(ctx, in)
return c.getClient().GetAttestationData(ctx, in)
}
func (c *grpcValidatorClient) BeaconBlock(ctx context.Context, in *ethpb.BlockRequest) (*ethpb.GenericBeaconBlock, error) {
return c.beaconNodeValidatorClient.GetBeaconBlock(ctx, in)
return c.getClient().GetBeaconBlock(ctx, in)
}
func (c *grpcValidatorClient) FeeRecipientByPubKey(ctx context.Context, in *ethpb.FeeRecipientByPubKeyRequest) (*ethpb.FeeRecipientByPubKeyResponse, error) {
return c.beaconNodeValidatorClient.GetFeeRecipientByPubKey(ctx, in)
return c.getClient().GetFeeRecipientByPubKey(ctx, in)
}
func (c *grpcValidatorClient) SyncCommitteeContribution(ctx context.Context, in *ethpb.SyncCommitteeContributionRequest) (*ethpb.SyncCommitteeContribution, error) {
return c.beaconNodeValidatorClient.GetSyncCommitteeContribution(ctx, in)
return c.getClient().GetSyncCommitteeContribution(ctx, in)
}
func (c *grpcValidatorClient) SyncMessageBlockRoot(ctx context.Context, in *empty.Empty) (*ethpb.SyncMessageBlockRootResponse, error) {
return c.beaconNodeValidatorClient.GetSyncMessageBlockRoot(ctx, in)
return c.getClient().GetSyncMessageBlockRoot(ctx, in)
}
func (c *grpcValidatorClient) SyncSubcommitteeIndex(ctx context.Context, in *ethpb.SyncSubcommitteeIndexRequest) (*ethpb.SyncSubcommitteeIndexResponse, error) {
return c.beaconNodeValidatorClient.GetSyncSubcommitteeIndex(ctx, in)
return c.getClient().GetSyncSubcommitteeIndex(ctx, in)
}
func (c *grpcValidatorClient) MultipleValidatorStatus(ctx context.Context, in *ethpb.MultipleValidatorStatusRequest) (*ethpb.MultipleValidatorStatusResponse, error) {
return c.beaconNodeValidatorClient.MultipleValidatorStatus(ctx, in)
return c.getClient().MultipleValidatorStatus(ctx, in)
}
func (c *grpcValidatorClient) PrepareBeaconProposer(ctx context.Context, in *ethpb.PrepareBeaconProposerRequest) (*empty.Empty, error) {
return c.beaconNodeValidatorClient.PrepareBeaconProposer(ctx, in)
return c.getClient().PrepareBeaconProposer(ctx, in)
}
func (c *grpcValidatorClient) ProposeAttestation(ctx context.Context, in *ethpb.Attestation) (*ethpb.AttestResponse, error) {
return c.beaconNodeValidatorClient.ProposeAttestation(ctx, in)
return c.getClient().ProposeAttestation(ctx, in)
}
func (c *grpcValidatorClient) ProposeAttestationElectra(ctx context.Context, in *ethpb.SingleAttestation) (*ethpb.AttestResponse, error) {
return c.beaconNodeValidatorClient.ProposeAttestationElectra(ctx, in)
return c.getClient().ProposeAttestationElectra(ctx, in)
}
func (c *grpcValidatorClient) ProposeBeaconBlock(ctx context.Context, in *ethpb.GenericSignedBeaconBlock) (*ethpb.ProposeResponse, error) {
return c.beaconNodeValidatorClient.ProposeBeaconBlock(ctx, in)
return c.getClient().ProposeBeaconBlock(ctx, in)
}
func (c *grpcValidatorClient) ProposeExit(ctx context.Context, in *ethpb.SignedVoluntaryExit) (*ethpb.ProposeExitResponse, error) {
return c.beaconNodeValidatorClient.ProposeExit(ctx, in)
return c.getClient().ProposeExit(ctx, in)
}
func (c *grpcValidatorClient) StreamBlocksAltair(ctx context.Context, in *ethpb.StreamBlocksRequest) (ethpb.BeaconNodeValidator_StreamBlocksAltairClient, error) {
return c.beaconNodeValidatorClient.StreamBlocksAltair(ctx, in)
return c.getClient().StreamBlocksAltair(ctx, in)
}
func (c *grpcValidatorClient) SubmitAggregateSelectionProof(ctx context.Context, in *ethpb.AggregateSelectionRequest, _ primitives.ValidatorIndex, _ uint64) (*ethpb.AggregateSelectionResponse, error) {
return c.beaconNodeValidatorClient.SubmitAggregateSelectionProof(ctx, in)
return c.getClient().SubmitAggregateSelectionProof(ctx, in)
}
func (c *grpcValidatorClient) SubmitAggregateSelectionProofElectra(ctx context.Context, in *ethpb.AggregateSelectionRequest, _ primitives.ValidatorIndex, _ uint64) (*ethpb.AggregateSelectionElectraResponse, error) {
return c.beaconNodeValidatorClient.SubmitAggregateSelectionProofElectra(ctx, in)
return c.getClient().SubmitAggregateSelectionProofElectra(ctx, in)
}
func (c *grpcValidatorClient) SubmitSignedAggregateSelectionProof(ctx context.Context, in *ethpb.SignedAggregateSubmitRequest) (*ethpb.SignedAggregateSubmitResponse, error) {
return c.beaconNodeValidatorClient.SubmitSignedAggregateSelectionProof(ctx, in)
return c.getClient().SubmitSignedAggregateSelectionProof(ctx, in)
}
func (c *grpcValidatorClient) SubmitSignedAggregateSelectionProofElectra(ctx context.Context, in *ethpb.SignedAggregateSubmitElectraRequest) (*ethpb.SignedAggregateSubmitResponse, error) {
return c.beaconNodeValidatorClient.SubmitSignedAggregateSelectionProofElectra(ctx, in)
return c.getClient().SubmitSignedAggregateSelectionProofElectra(ctx, in)
}
func (c *grpcValidatorClient) SubmitSignedContributionAndProof(ctx context.Context, in *ethpb.SignedContributionAndProof) (*empty.Empty, error) {
return c.beaconNodeValidatorClient.SubmitSignedContributionAndProof(ctx, in)
return c.getClient().SubmitSignedContributionAndProof(ctx, in)
}
func (c *grpcValidatorClient) SubmitSyncMessage(ctx context.Context, in *ethpb.SyncCommitteeMessage) (*empty.Empty, error) {
return c.beaconNodeValidatorClient.SubmitSyncMessage(ctx, in)
return c.getClient().SubmitSyncMessage(ctx, in)
}
func (c *grpcValidatorClient) SubmitValidatorRegistrations(ctx context.Context, in *ethpb.SignedValidatorRegistrationsV1) (*empty.Empty, error) {
return c.beaconNodeValidatorClient.SubmitValidatorRegistrations(ctx, in)
return c.getClient().SubmitValidatorRegistrations(ctx, in)
}
func (c *grpcValidatorClient) SubscribeCommitteeSubnets(ctx context.Context, in *ethpb.CommitteeSubnetsSubscribeRequest, _ []*ethpb.ValidatorDuty) (*empty.Empty, error) {
return c.beaconNodeValidatorClient.SubscribeCommitteeSubnets(ctx, in)
return c.getClient().SubscribeCommitteeSubnets(ctx, in)
}
func (c *grpcValidatorClient) ValidatorIndex(ctx context.Context, in *ethpb.ValidatorIndexRequest) (*ethpb.ValidatorIndexResponse, error) {
return c.beaconNodeValidatorClient.ValidatorIndex(ctx, in)
return c.getClient().ValidatorIndex(ctx, in)
}
func (c *grpcValidatorClient) ValidatorStatus(ctx context.Context, in *ethpb.ValidatorStatusRequest) (*ethpb.ValidatorStatusResponse, error) {
return c.beaconNodeValidatorClient.ValidatorStatus(ctx, in)
return c.getClient().ValidatorStatus(ctx, in)
}
// Deprecated: Do not use.
func (c *grpcValidatorClient) WaitForChainStart(ctx context.Context, in *empty.Empty) (*ethpb.ChainStartResponse, error) {
stream, err := c.beaconNodeValidatorClient.WaitForChainStart(ctx, in)
stream, err := c.getClient().WaitForChainStart(ctx, in)
if err != nil {
return nil, errors.Wrap(
client.ErrConnectionIssue,
@@ -260,13 +260,13 @@ func (c *grpcValidatorClient) WaitForChainStart(ctx context.Context, in *empty.E
}
func (c *grpcValidatorClient) AssignValidatorToSubnet(ctx context.Context, in *ethpb.AssignValidatorToSubnetRequest) (*empty.Empty, error) {
return c.beaconNodeValidatorClient.AssignValidatorToSubnet(ctx, in)
return c.getClient().AssignValidatorToSubnet(ctx, in)
}
func (c *grpcValidatorClient) AggregatedSigAndAggregationBits(
ctx context.Context,
in *ethpb.AggregatedSigAndAggregationBitsRequest,
) (*ethpb.AggregatedSigAndAggregationBitsResponse, error) {
return c.beaconNodeValidatorClient.AggregatedSigAndAggregationBits(ctx, in)
return c.getClient().AggregatedSigAndAggregationBits(ctx, in)
}
func (*grpcValidatorClient) AggregatedSelections(context.Context, []iface.BeaconCommitteeSelection) ([]iface.BeaconCommitteeSelection, error) {
@@ -277,8 +277,12 @@ func (*grpcValidatorClient) AggregatedSyncSelections(context.Context, []iface.Sy
return nil, iface.ErrNotSupported
}
func NewGrpcValidatorClient(cc grpc.ClientConnInterface) iface.ValidatorClient {
return &grpcValidatorClient{ethpb.NewBeaconNodeValidatorClient(cc), false}
// NewGrpcValidatorClient creates a new gRPC validator client that supports
// dynamic connection switching via the NodeConnection's GrpcConnectionProvider.
func NewGrpcValidatorClient(conn validatorHelpers.NodeConnection) iface.ValidatorClient {
return &grpcValidatorClient{
grpcClientManager: newGrpcClientManager(conn, ethpb.NewBeaconNodeValidatorClient),
}
}
func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []string, eventsChannel chan<- *eventClient.Event) {
@@ -308,7 +312,7 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
log.Warn("gRPC only supports the head topic, other topics will be ignored")
}
stream, err := c.beaconNodeValidatorClient.StreamSlots(ctx, &ethpb.StreamSlotsRequest{VerifiedOnly: true})
stream, err := c.getClient().StreamSlots(ctx, &ethpb.StreamSlotsRequest{VerifiedOnly: true})
if err != nil {
eventsChannel <- &eventClient.Event{
EventType: eventClient.EventConnectionError,
@@ -374,11 +378,20 @@ func (c *grpcValidatorClient) EventStreamIsRunning() bool {
return c.isEventStreamRunning
}
func (*grpcValidatorClient) Host() string {
log.Warn(iface.ErrNotSupported)
return ""
func (c *grpcValidatorClient) Host() string {
return c.grpcClientManager.conn.GetGrpcConnectionProvider().CurrentHost()
}
func (*grpcValidatorClient) SetHost(_ string) {
log.Warn(iface.ErrNotSupported)
func (c *grpcValidatorClient) SwitchHost(host string) {
provider := c.grpcClientManager.conn.GetGrpcConnectionProvider()
// Find the index of the requested host and switch to it
for i, h := range provider.Hosts() {
if h == host {
if err := provider.SwitchHost(i); err != nil {
log.WithError(err).WithField("host", host).Error("Failed to set gRPC host")
}
return
}
}
log.WithField("host", host).Warn("Requested gRPC host not found in configured endpoints")
}

View File

@@ -14,8 +14,10 @@ import (
"github.com/OffchainLabs/prysm/v7/testing/assert"
mock2 "github.com/OffchainLabs/prysm/v7/testing/mock"
"github.com/OffchainLabs/prysm/v7/testing/require"
validatorTesting "github.com/OffchainLabs/prysm/v7/validator/testing"
logTest "github.com/sirupsen/logrus/hooks/test"
"go.uber.org/mock/gomock"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
)
@@ -133,7 +135,15 @@ func TestWaitForChainStart_StreamSetupFails(t *testing.T) {
gomock.Any(),
).Return(nil, errors.New("failed stream"))
validatorClient := &grpcValidatorClient{beaconNodeValidatorClient, true}
validatorClient := &grpcValidatorClient{
grpcClientManager: newGrpcClientManager(
validatorTesting.MockNodeConnection(),
func(_ grpc.ClientConnInterface) eth.BeaconNodeValidatorClient {
return beaconNodeValidatorClient
},
),
isEventStreamRunning: true,
}
_, err := validatorClient.WaitForChainStart(t.Context(), &emptypb.Empty{})
want := "could not setup beacon chain ChainStart streaming client"
assert.ErrorContains(t, want, err)
@@ -146,7 +156,15 @@ func TestStartEventStream(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
beaconNodeValidatorClient := mock2.NewMockBeaconNodeValidatorClient(ctrl)
grpcClient := &grpcValidatorClient{beaconNodeValidatorClient, true}
grpcClient := &grpcValidatorClient{
grpcClientManager: newGrpcClientManager(
validatorTesting.MockNodeConnection(),
func(_ grpc.ClientConnInterface) eth.BeaconNodeValidatorClient {
return beaconNodeValidatorClient
},
),
isEventStreamRunning: true,
}
tests := []struct {
name string
topics []string

View File

@@ -152,5 +152,5 @@ type ValidatorClient interface {
AggregatedSelections(ctx context.Context, selections []BeaconCommitteeSelection) ([]BeaconCommitteeSelection, error)
AggregatedSyncSelections(ctx context.Context, selections []SyncCommitteeSelection) ([]SyncCommitteeSelection, error)
Host() string
SetHost(host string)
SwitchHost(host string)
}

View File

@@ -1,53 +0,0 @@
package client
import (
"strings"
"google.golang.org/grpc/resolver"
)
// Modification of a default grpc passthrough resolver (google.golang.org/grpc/resolver/passthrough) allowing to use multiple addresses
// in grpc endpoint. Example:
// conn, err := grpc.DialContext(ctx, "127.0.0.1:4000,127.0.0.1:4001", grpc.WithInsecure(), grpc.WithResolvers(&multipleEndpointsGrpcResolverBuilder{}))
// It can be used with any grpc load balancer (pick_first, round_robin). Default is pick_first.
// Round robin can be used by adding the following option:
// grpc.WithDefaultServiceConfig("{\"loadBalancingConfig\":[{\"round_robin\":{}}]}")
type multipleEndpointsGrpcResolverBuilder struct{}
// Build creates and starts multiple endpoints resolver.
func (*multipleEndpointsGrpcResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (resolver.Resolver, error) {
r := &multipleEndpointsGrpcResolver{
target: target,
cc: cc,
}
r.start()
return r, nil
}
// Scheme returns default scheme.
func (*multipleEndpointsGrpcResolverBuilder) Scheme() string {
return resolver.GetDefaultScheme()
}
type multipleEndpointsGrpcResolver struct {
target resolver.Target
cc resolver.ClientConn
}
func (r *multipleEndpointsGrpcResolver) start() {
ep := r.target.Endpoint()
endpoints := strings.Split(ep, ",")
var addrs []resolver.Address
for _, endpoint := range endpoints {
addrs = append(addrs, resolver.Address{Addr: endpoint, ServerName: endpoint})
}
if err := r.cc.UpdateState(resolver.State{Addresses: addrs}); err != nil {
log.WithError(err).Error("Failed to update grpc connection state")
}
}
// ResolveNow --
func (*multipleEndpointsGrpcResolver) ResolveNow(_ resolver.ResolveNowOptions) {}
// Close --
func (*multipleEndpointsGrpcResolver) Close() {}

View File

@@ -8,11 +8,10 @@ import (
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
)
func NewNodeClient(validatorConn validatorHelpers.NodeConnection, jsonRestHandler beaconApi.RestHandler) iface.NodeClient {
grpcClient := grpcApi.NewNodeClient(validatorConn.GetGrpcClientConn())
func NewNodeClient(validatorConn validatorHelpers.NodeConnection) iface.NodeClient {
grpcClient := grpcApi.NewNodeClient(validatorConn)
if features.Get().EnableBeaconRESTApi {
return beaconApi.NewNodeClientWithFallback(jsonRestHandler, grpcClient)
} else {
return grpcClient
return beaconApi.NewNodeClientWithFallback(validatorConn.GetRestHandler(), grpcClient)
}
return grpcClient
}

View File

@@ -2,13 +2,11 @@ package client
import (
"context"
"net/http"
"strings"
"time"
api "github.com/OffchainLabs/prysm/v7/api/client"
eventClient "github.com/OffchainLabs/prysm/v7/api/client/event"
grpcutil "github.com/OffchainLabs/prysm/v7/api/grpc"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/async/event"
lruwrpr "github.com/OffchainLabs/prysm/v7/cache/lru"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
@@ -17,7 +15,6 @@ import (
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/validator/accounts/wallet"
beaconApi "github.com/OffchainLabs/prysm/v7/validator/client/beacon-api"
beaconChainClientFactory "github.com/OffchainLabs/prysm/v7/validator/client/beacon-chain-client-factory"
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
nodeclientfactory "github.com/OffchainLabs/prysm/v7/validator/client/node-client-factory"
@@ -35,7 +32,6 @@ import (
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/protobuf/proto"
@@ -72,6 +68,7 @@ type Config struct {
DB db.Database
Wallet *wallet.Wallet
WalletInitializedFeed *event.Feed
Conn validatorHelpers.NodeConnection // Optional: pre-built connection (if nil, built from endpoint configs)
MaxHealthChecks int
GRPCMaxCallRecvMsgSize int
GRPCRetries uint
@@ -122,6 +119,12 @@ func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, e
maxHealthChecks: cfg.MaxHealthChecks,
}
// Use pre-built connection if provided
if cfg.Conn != nil {
s.conn = cfg.Conn
return s, nil
}
dialOpts := ConstructDialOptions(
cfg.GRPCMaxCallRecvMsgSize,
cfg.BeaconNodeCert,
@@ -134,19 +137,21 @@ func NewValidatorService(ctx context.Context, cfg *Config) (*ValidatorService, e
s.ctx = grpcutil.AppendHeaders(ctx, cfg.GRPCHeaders)
grpcConn, err := grpc.DialContext(ctx, cfg.BeaconNodeGRPCEndpoint, dialOpts...)
conn, err := validatorHelpers.NewNodeConnection(
validatorHelpers.WithGRPC(s.ctx, cfg.BeaconNodeGRPCEndpoint, dialOpts),
validatorHelpers.WithREST(cfg.BeaconApiEndpoint,
rest.WithHttpHeaders(cfg.BeaconApiHeaders),
rest.WithHttpTimeout(cfg.BeaconApiTimeout),
rest.WithTracing(),
),
)
if err != nil {
return s, err
}
if cfg.BeaconNodeCert != "" {
if cfg.BeaconNodeCert != "" && cfg.BeaconNodeGRPCEndpoint != "" {
log.Info("Established secure gRPC connection")
}
s.conn = validatorHelpers.NewNodeConnection(
grpcConn,
cfg.BeaconApiEndpoint,
validatorHelpers.WithBeaconApiHeaders(cfg.BeaconApiHeaders),
validatorHelpers.WithBeaconApiTimeout(cfg.BeaconApiTimeout),
)
s.conn = conn
return s, nil
}
@@ -181,20 +186,13 @@ func (v *ValidatorService) Start() {
return
}
u := strings.ReplaceAll(v.conn.GetBeaconApiUrl(), " ", "")
hosts := strings.Split(u, ",")
if len(hosts) == 0 {
log.WithError(err).Error("No API hosts provided")
restProvider := v.conn.GetRestConnectionProvider()
if restProvider == nil || len(restProvider.Hosts()) == 0 {
log.Error("No REST API hosts provided")
return
}
headersTransport := api.NewCustomHeadersTransport(http.DefaultTransport, v.conn.GetBeaconApiHeaders())
restHandler := beaconApi.NewBeaconApiRestHandler(
http.Client{Timeout: v.conn.GetBeaconApiTimeout(), Transport: otelhttp.NewTransport(headersTransport)},
hosts[0],
)
validatorClient := validatorclientfactory.NewValidatorClient(v.conn, restHandler)
validatorClient := validatorclientfactory.NewValidatorClient(v.conn)
v.validator = &validator{
slotFeed: new(event.Feed),
@@ -208,12 +206,12 @@ func (v *ValidatorService) Start() {
graffiti: v.graffiti,
graffitiStruct: v.graffitiStruct,
graffitiOrderedIndex: graffitiOrderedIndex,
beaconNodeHosts: hosts,
conn: v.conn,
currentHostIndex: 0,
validatorClient: validatorClient,
chainClient: beaconChainClientFactory.NewChainClient(v.conn, restHandler),
nodeClient: nodeclientfactory.NewNodeClient(v.conn, restHandler),
prysmChainClient: beaconChainClientFactory.NewPrysmChainClient(v.conn, restHandler),
chainClient: beaconChainClientFactory.NewChainClient(v.conn),
nodeClient: nodeclientfactory.NewNodeClient(v.conn),
prysmChainClient: beaconChainClientFactory.NewPrysmChainClient(v.conn),
db: v.db,
km: nil,
web3SignerConfig: v.web3SignerConfig,
@@ -369,7 +367,6 @@ func ConstructDialOptions(
grpcprometheus.StreamClientInterceptor,
grpcretry.StreamClientInterceptor(),
),
grpc.WithResolvers(&multipleEndpointsGrpcResolverBuilder{}),
}
dialOpts = append(dialOpts, extraOpts...)

View File

@@ -33,7 +33,10 @@ func TestStop_CancelsContext(t *testing.T) {
func TestNew_Insecure(t *testing.T) {
hook := logTest.NewGlobal()
_, err := NewValidatorService(t.Context(), &Config{})
_, err := NewValidatorService(t.Context(), &Config{
BeaconNodeGRPCEndpoint: "localhost:4000",
BeaconApiEndpoint: "http://localhost:3500",
})
require.NoError(t, err)
require.LogsContain(t, hook, "You are using an insecure gRPC connection")
}
@@ -58,7 +61,11 @@ func TestStart_GrpcHeaders(t *testing.T) {
"Authorization", "this is a valid value",
},
} {
cfg := &Config{GRPCHeaders: strings.Split(input, ",")}
cfg := &Config{
BeaconNodeGRPCEndpoint: "localhost:4000",
BeaconApiEndpoint: "http://localhost:3500",
GRPCHeaders: strings.Split(input, ","),
}
validatorService, err := NewValidatorService(ctx, cfg)
require.NoError(t, err)
md, _ := metadata.FromOutgoingContext(validatorService.ctx)

View File

@@ -10,12 +10,10 @@ import (
func NewValidatorClient(
validatorConn validatorHelpers.NodeConnection,
jsonRestHandler beaconApi.RestHandler,
opt ...beaconApi.ValidatorClientOpt,
) iface.ValidatorClient {
if features.Get().EnableBeaconRESTApi {
return beaconApi.NewBeaconApiValidatorClient(jsonRestHandler, opt...)
} else {
return grpcApi.NewGrpcValidatorClient(validatorConn.GetGrpcClientConn())
return beaconApi.NewBeaconApiValidatorClient(validatorConn.GetRestHandler(), opt...)
}
return grpcApi.NewGrpcValidatorClient(validatorConn)
}

View File

@@ -38,6 +38,7 @@ import (
"github.com/OffchainLabs/prysm/v7/validator/db"
dbCommon "github.com/OffchainLabs/prysm/v7/validator/db/common"
"github.com/OffchainLabs/prysm/v7/validator/graffiti"
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
"github.com/OffchainLabs/prysm/v7/validator/keymanager"
"github.com/OffchainLabs/prysm/v7/validator/keymanager/local"
remoteweb3signer "github.com/OffchainLabs/prysm/v7/validator/keymanager/remote-web3signer"
@@ -101,9 +102,9 @@ type validator struct {
pubkeyToStatus map[[fieldparams.BLSPubkeyLength]byte]*validatorStatus
wallet *wallet.Wallet
walletInitializedChan chan *wallet.Wallet
currentHostIndex uint64
walletInitializedFeed *event.Feed
graffitiOrderedIndex uint64
conn validatorHelpers.NodeConnection
submittedAtts map[submittedAttKey]*submittedAtt
validatorsRegBatchSize int
validatorClient iface.ValidatorClient
@@ -114,7 +115,7 @@ type validator struct {
km keymanager.IKeymanager
accountChangedSub event.Subscription
ticker slots.Ticker
beaconNodeHosts []string
currentHostIndex uint64
genesisTime time.Time
graffiti []byte
voteStats voteStats
@@ -1311,34 +1312,64 @@ func (v *validator) Host() string {
}
func (v *validator) changeHost() {
next := (v.currentHostIndex + 1) % uint64(len(v.beaconNodeHosts))
hosts := v.hosts()
if len(hosts) <= 1 {
return
}
next := (v.currentHostIndex + 1) % uint64(len(hosts))
log.WithFields(logrus.Fields{
"currentHost": v.beaconNodeHosts[v.currentHostIndex],
"nextHost": v.beaconNodeHosts[next],
"currentHost": hosts[v.currentHostIndex],
"nextHost": hosts[next],
}).Warn("Beacon node is not responding, switching host")
v.validatorClient.SetHost(v.beaconNodeHosts[next])
v.validatorClient.SwitchHost(hosts[next])
v.currentHostIndex = next
}
// hosts returns the list of configured beacon node hosts.
func (v *validator) hosts() []string {
if features.Get().EnableBeaconRESTApi {
return v.conn.GetRestConnectionProvider().Hosts()
}
return v.conn.GetGrpcConnectionProvider().Hosts()
}
// numHosts returns the number of configured beacon node hosts.
func (v *validator) numHosts() int {
return len(v.hosts())
}
func (v *validator) FindHealthyHost(ctx context.Context) bool {
// Tail-recursive closure keeps retry count private.
var check func(remaining int) bool
check = func(remaining int) bool {
if v.nodeClient.IsReady(ctx) { // ready → done
numHosts := v.numHosts()
startingHost := v.Host()
attemptedHosts := []string{}
// Check all hosts for a fully synced node
for i := range numHosts {
if v.nodeClient.IsReady(ctx) {
if len(attemptedHosts) > 0 {
log.WithFields(logrus.Fields{
"previousHost": startingHost,
"newHost": v.Host(),
"failedAttempts": attemptedHosts,
}).Info("Failover succeeded: connected to healthy beacon node")
}
return true
}
if len(v.beaconNodeHosts) == 1 && features.Get().EnableBeaconRESTApi {
log.WithField("host", v.Host()).Warn("Beacon node is not responding, no backup node configured")
return false
log.WithField("host", v.Host()).Debug("Beacon node not fully synced")
attemptedHosts = append(attemptedHosts, v.Host())
// Try next host if not the last iteration
if i < numHosts-1 {
v.changeHost()
}
if remaining == 0 || !features.Get().EnableBeaconRESTApi {
return false // exhausted or REST disabled
}
v.changeHost()
return check(remaining - 1) // recurse
}
return check(len(v.beaconNodeHosts))
if numHosts == 1 {
log.WithField("host", v.Host()).Warn("Beacon node is not fully synced, no backup node configured")
} else {
log.Warn("No fully synced beacon node found")
}
return false
}
func (v *validator) filterAndCacheActiveKeys(ctx context.Context, pubkeys [][fieldparams.BLSPubkeyLength]byte, slot primitives.Slot) ([][fieldparams.BLSPubkeyLength]byte, error) {

View File

@@ -16,6 +16,8 @@ import (
"testing"
"time"
grpcutil "github.com/OffchainLabs/prysm/v7/api/grpc"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
"github.com/OffchainLabs/prysm/v7/async/event"
"github.com/OffchainLabs/prysm/v7/cmd/validator/flags"
@@ -37,6 +39,7 @@ import (
"github.com/OffchainLabs/prysm/v7/validator/accounts/wallet"
"github.com/OffchainLabs/prysm/v7/validator/client/iface"
dbTest "github.com/OffchainLabs/prysm/v7/validator/db/testing"
validatorHelpers "github.com/OffchainLabs/prysm/v7/validator/helpers"
"github.com/OffchainLabs/prysm/v7/validator/keymanager"
"github.com/OffchainLabs/prysm/v7/validator/keymanager/local"
remoteweb3signer "github.com/OffchainLabs/prysm/v7/validator/keymanager/remote-web3signer"
@@ -2792,18 +2795,27 @@ func TestValidator_Host(t *testing.T) {
}
func TestValidator_ChangeHost(t *testing.T) {
// Enable REST API mode for this test since changeHost only calls SwitchHost in REST API mode
resetCfg := features.InitWithReset(&features.Flags{EnableBeaconRESTApi: true})
defer resetCfg()
ctrl := gomock.NewController(t)
defer ctrl.Finish()
hosts := []string{"http://localhost:8080", "http://localhost:8081"}
restProvider := &rest.MockRestProvider{MockHosts: hosts}
conn, err := validatorHelpers.NewNodeConnection(validatorHelpers.WithRestProvider(restProvider))
require.NoError(t, err)
client := validatormock.NewMockValidatorClient(ctrl)
v := validator{
validatorClient: client,
beaconNodeHosts: []string{"http://localhost:8080", "http://localhost:8081"},
conn: conn,
currentHostIndex: 0,
}
client.EXPECT().SetHost(v.beaconNodeHosts[1])
client.EXPECT().SetHost(v.beaconNodeHosts[0])
client.EXPECT().SwitchHost(hosts[1])
client.EXPECT().SwitchHost(hosts[0])
v.changeHost()
assert.Equal(t, uint64(1), v.currentHostIndex)
v.changeHost()
@@ -2838,12 +2850,16 @@ func TestUpdateValidatorStatusCache(t *testing.T) {
gomock.Any(),
gomock.Any()).Return(mockResponse, nil)
mockProvider := &grpcutil.MockGrpcProvider{MockHosts: []string{"localhost:4000", "localhost:4001"}}
conn, err := validatorHelpers.NewNodeConnection(validatorHelpers.WithGRPCProvider(mockProvider))
require.NoError(t, err)
v := &validator{
validatorClient: client,
beaconNodeHosts: []string{"http://localhost:8080", "http://localhost:8081"},
conn: conn,
currentHostIndex: 0,
pubkeyToStatus: map[[fieldparams.BLSPubkeyLength]byte]*validatorStatus{
[fieldparams.BLSPubkeyLength]byte{0x03}: &validatorStatus{ // add non existent key and status to cache, should be fully removed on update
[fieldparams.BLSPubkeyLength]byte{0x03}: { // add non existent key and status to cache, should be fully removed on update
publicKey: []byte{0x03},
status: &ethpb.ValidatorStatusResponse{
Status: ethpb.ValidatorStatus_ACTIVE,
@@ -2853,7 +2869,7 @@ func TestUpdateValidatorStatusCache(t *testing.T) {
},
}
err := v.updateValidatorStatusCache(ctx, pubkeys)
err = v.updateValidatorStatusCache(ctx, pubkeys)
assert.NoError(t, err)
// make sure the nonexistent key is fully removed

View File

@@ -10,6 +10,8 @@ go_library(
importpath = "github.com/OffchainLabs/prysm/v7/validator/helpers",
visibility = ["//visibility:public"],
deps = [
"//api/grpc:go_default_library",
"//api/rest:go_default_library",
"//config/fieldparams:go_default_library",
"//consensus-types/primitives:go_default_library",
"//validator/db/iface:go_default_library",
@@ -24,18 +26,23 @@ go_test(
srcs = [
"converts_test.go",
"metadata_test.go",
"node_connection_test.go",
],
embed = [":go_default_library"],
deps = [
"//api/grpc:go_default_library",
"//api/rest:go_default_library",
"//config/fieldparams:go_default_library",
"//config/proposer:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//validator/db/common:go_default_library",
"//validator/db/iface:go_default_library",
"//validator/slashing-protection-history/format:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@org_golang_google_grpc//:go_default_library",
],
)

View File

@@ -1,78 +1,120 @@
package helpers
import (
"time"
"context"
grpcutil "github.com/OffchainLabs/prysm/v7/api/grpc"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
// Use an interface with a private dummy function to force all other packages to call NewNodeConnection
// NodeConnection provides access to both gRPC and REST API connections to a beacon node.
type NodeConnection interface {
// GetGrpcClientConn returns the current gRPC client connection.
// Returns nil if no gRPC provider is configured.
GetGrpcClientConn() *grpc.ClientConn
GetBeaconApiUrl() string
GetBeaconApiHeaders() map[string][]string
setBeaconApiHeaders(map[string][]string)
GetBeaconApiTimeout() time.Duration
setBeaconApiTimeout(time.Duration)
dummy()
// GetGrpcConnectionProvider returns the gRPC connection provider.
GetGrpcConnectionProvider() grpcutil.GrpcConnectionProvider
// GetRestConnectionProvider returns the REST connection provider.
GetRestConnectionProvider() rest.RestConnectionProvider
// GetRestHandler returns the REST handler for making API requests.
// Returns nil if no REST provider is configured.
GetRestHandler() rest.RestHandler
}
type nodeConnection struct {
grpcClientConn *grpc.ClientConn
beaconApiUrl string
beaconApiHeaders map[string][]string
beaconApiTimeout time.Duration
}
// NodeConnectionOption is a functional option for configuring the node connection.
type NodeConnectionOption func(nc NodeConnection)
// WithBeaconApiHeaders sets the HTTP headers that should be sent to the server along with each request.
func WithBeaconApiHeaders(headers map[string][]string) NodeConnectionOption {
return func(nc NodeConnection) {
nc.setBeaconApiHeaders(headers)
}
}
// WithBeaconApiTimeout sets the HTTP request timeout.
func WithBeaconApiTimeout(timeout time.Duration) NodeConnectionOption {
return func(nc NodeConnection) {
nc.setBeaconApiTimeout(timeout)
}
grpcConnectionProvider grpcutil.GrpcConnectionProvider
restConnectionProvider rest.RestConnectionProvider
}
func (c *nodeConnection) GetGrpcClientConn() *grpc.ClientConn {
return c.grpcClientConn
}
func (c *nodeConnection) GetBeaconApiUrl() string {
return c.beaconApiUrl
}
func (c *nodeConnection) GetBeaconApiHeaders() map[string][]string {
return c.beaconApiHeaders
}
func (c *nodeConnection) setBeaconApiHeaders(headers map[string][]string) {
c.beaconApiHeaders = headers
}
func (c *nodeConnection) GetBeaconApiTimeout() time.Duration {
return c.beaconApiTimeout
}
func (c *nodeConnection) setBeaconApiTimeout(timeout time.Duration) {
c.beaconApiTimeout = timeout
}
func (*nodeConnection) dummy() {}
func NewNodeConnection(grpcConn *grpc.ClientConn, beaconApiUrl string, opts ...NodeConnectionOption) NodeConnection {
conn := &nodeConnection{}
conn.grpcClientConn = grpcConn
conn.beaconApiUrl = beaconApiUrl
for _, opt := range opts {
opt(conn)
if c.grpcConnectionProvider == nil {
return nil
}
return conn
return c.grpcConnectionProvider.CurrentConn()
}
func (c *nodeConnection) GetGrpcConnectionProvider() grpcutil.GrpcConnectionProvider {
return c.grpcConnectionProvider
}
func (c *nodeConnection) GetRestConnectionProvider() rest.RestConnectionProvider {
return c.restConnectionProvider
}
func (c *nodeConnection) GetRestHandler() rest.RestHandler {
if c.restConnectionProvider == nil {
return nil
}
return c.restConnectionProvider.RestHandler()
}
// NodeConnectionOption is a functional option for configuring a NodeConnection.
type NodeConnectionOption func(*nodeConnection) error
// WithGRPC configures a gRPC connection provider for the NodeConnection.
// If endpoint is empty, this option is a no-op.
func WithGRPC(ctx context.Context, endpoint string, dialOpts []grpc.DialOption) NodeConnectionOption {
return func(c *nodeConnection) error {
if endpoint == "" {
return nil
}
provider, err := grpcutil.NewGrpcConnectionProvider(ctx, endpoint, dialOpts)
if err != nil {
return errors.Wrap(err, "failed to create gRPC connection provider")
}
c.grpcConnectionProvider = provider
return nil
}
}
// WithREST configures a REST connection provider for the NodeConnection.
// If endpoint is empty, this option is a no-op.
func WithREST(endpoint string, opts ...rest.RestConnectionProviderOption) NodeConnectionOption {
return func(c *nodeConnection) error {
if endpoint == "" {
return nil
}
provider, err := rest.NewRestConnectionProvider(endpoint, opts...)
if err != nil {
return errors.Wrap(err, "failed to create REST connection provider")
}
c.restConnectionProvider = provider
return nil
}
}
// WithGRPCProvider sets a pre-built gRPC connection provider.
func WithGRPCProvider(provider grpcutil.GrpcConnectionProvider) NodeConnectionOption {
return func(c *nodeConnection) error {
c.grpcConnectionProvider = provider
return nil
}
}
// WithRestProvider sets a pre-built REST connection provider.
func WithRestProvider(provider rest.RestConnectionProvider) NodeConnectionOption {
return func(c *nodeConnection) error {
c.restConnectionProvider = provider
return nil
}
}
// NewNodeConnection creates a new NodeConnection with the given options.
// At least one provider (gRPC or REST) must be configured via options.
// Returns an error if no providers are configured.
func NewNodeConnection(opts ...NodeConnectionOption) (NodeConnection, error) {
c := &nodeConnection{}
for _, opt := range opts {
if err := opt(c); err != nil {
return nil, err
}
}
if c.grpcConnectionProvider == nil && c.restConnectionProvider == nil {
return nil, errors.New("at least one beacon node endpoint must be provided (--beacon-rpc-provider or --beacon-rest-api-provider)")
}
return c, nil
}

View File

@@ -0,0 +1,101 @@
package helpers
import (
"context"
"testing"
grpcutil "github.com/OffchainLabs/prysm/v7/api/grpc"
"github.com/OffchainLabs/prysm/v7/api/rest"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
"google.golang.org/grpc"
)
func TestNewNodeConnection(t *testing.T) {
t.Run("with both providers", func(t *testing.T) {
grpcProvider := &grpcutil.MockGrpcProvider{MockHosts: []string{"localhost:4000"}}
restProvider := &rest.MockRestProvider{MockHosts: []string{"http://localhost:3500"}}
conn, err := NewNodeConnection(
WithGRPCProvider(grpcProvider),
WithRestProvider(restProvider),
)
require.NoError(t, err)
assert.Equal(t, grpcProvider, conn.GetGrpcConnectionProvider())
assert.Equal(t, restProvider, conn.GetRestConnectionProvider())
})
t.Run("with only rest provider", func(t *testing.T) {
restProvider := &rest.MockRestProvider{MockHosts: []string{"http://localhost:3500"}}
conn, err := NewNodeConnection(WithRestProvider(restProvider))
require.NoError(t, err)
assert.Equal(t, (grpcutil.GrpcConnectionProvider)(nil), conn.GetGrpcConnectionProvider())
assert.Equal(t, (*grpc.ClientConn)(nil), conn.GetGrpcClientConn())
assert.Equal(t, restProvider, conn.GetRestConnectionProvider())
})
t.Run("with only grpc provider", func(t *testing.T) {
grpcProvider := &grpcutil.MockGrpcProvider{MockHosts: []string{"localhost:4000"}}
conn, err := NewNodeConnection(WithGRPCProvider(grpcProvider))
require.NoError(t, err)
assert.Equal(t, grpcProvider, conn.GetGrpcConnectionProvider())
assert.Equal(t, (rest.RestConnectionProvider)(nil), conn.GetRestConnectionProvider())
assert.Equal(t, (rest.RestHandler)(nil), conn.GetRestHandler())
})
t.Run("with no providers returns error", func(t *testing.T) {
conn, err := NewNodeConnection()
require.ErrorContains(t, "at least one beacon node endpoint must be provided", err)
assert.Equal(t, (NodeConnection)(nil), conn)
})
t.Run("with empty endpoints is no-op", func(t *testing.T) {
// Empty endpoints should be skipped, resulting in no providers
conn, err := NewNodeConnection(
WithGRPC(context.Background(), "", nil),
WithREST(""),
)
require.ErrorContains(t, "at least one beacon node endpoint must be provided", err)
assert.Equal(t, (NodeConnection)(nil), conn)
})
}
func TestNodeConnection_GetGrpcClientConn(t *testing.T) {
t.Run("delegates to provider", func(t *testing.T) {
// We can't easily create a real grpc.ClientConn in tests,
// but we can verify the delegation works with nil
grpcProvider := &grpcutil.MockGrpcProvider{MockConn: nil, MockHosts: []string{"localhost:4000"}}
conn, err := NewNodeConnection(WithGRPCProvider(grpcProvider))
require.NoError(t, err)
// Should delegate to provider.CurrentConn()
assert.Equal(t, grpcProvider.CurrentConn(), conn.GetGrpcClientConn())
})
t.Run("returns nil when provider is nil", func(t *testing.T) {
restProvider := &rest.MockRestProvider{MockHosts: []string{"http://localhost:3500"}}
conn, err := NewNodeConnection(WithRestProvider(restProvider))
require.NoError(t, err)
assert.Equal(t, (*grpc.ClientConn)(nil), conn.GetGrpcClientConn())
})
}
func TestNodeConnection_GetRestHandler(t *testing.T) {
t.Run("delegates to provider", func(t *testing.T) {
mockHandler := &rest.MockRestHandler{}
restProvider := &rest.MockRestProvider{MockHandler: mockHandler, MockHosts: []string{"http://localhost:3500"}}
conn, err := NewNodeConnection(WithRestProvider(restProvider))
require.NoError(t, err)
assert.Equal(t, mockHandler, conn.GetRestHandler())
})
t.Run("returns nil when provider is nil", func(t *testing.T) {
grpcProvider := &grpcutil.MockGrpcProvider{MockHosts: []string{"localhost:4000"}}
conn, err := NewNodeConnection(WithGRPCProvider(grpcProvider))
require.NoError(t, err)
assert.Equal(t, (rest.RestHandler)(nil), conn.GetRestHandler())
})
}

View File

@@ -41,6 +41,8 @@ func TestNode_Builds(t *testing.T) {
set.String("wallet-password-file", passwordFile, "path to wallet password")
set.String("keymanager-kind", "imported", "keymanager kind")
set.String("verbosity", "debug", "log verbosity")
set.String("beacon-rpc-provider", "localhost:4000", "beacon node RPC endpoint")
set.String("beacon-rest-api-provider", "http://localhost:3500", "beacon node REST API endpoint")
require.NoError(t, set.Set(flags.WalletPasswordFileFlag.Name, passwordFile))
ctx := cli.NewContext(&app, set, nil)
opts := []accounts.Option{

View File

@@ -23,9 +23,9 @@ go_library(
],
deps = [
"//api:go_default_library",
"//api/client:go_default_library",
"//api/grpc:go_default_library",
"//api/pagination:go_default_library",
"//api/rest:go_default_library",
"//api/server:go_default_library",
"//api/server/httprest:go_default_library",
"//api/server/middleware:go_default_library",
@@ -55,7 +55,6 @@ go_library(
"//validator/accounts/petnames:go_default_library",
"//validator/accounts/wallet:go_default_library",
"//validator/client:go_default_library",
"//validator/client/beacon-api:go_default_library",
"//validator/client/beacon-chain-client-factory:go_default_library",
"//validator/client/iface:go_default_library",
"//validator/client/node-client-factory:go_default_library",
@@ -79,7 +78,6 @@ go_library(
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_wealdtech_go_eth2_wallet_encryptor_keystorev4//:go_default_library",
"@io_opentelemetry_go_contrib_instrumentation_net_http_otelhttp//:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//metadata:go_default_library",
@@ -106,6 +104,7 @@ go_test(
embed = [":go_default_library"],
deps = [
"//api:go_default_library",
"//api/grpc:go_default_library",
"//async/event:go_default_library",
"//cmd/validator/flags:go_default_library",
"//config/features:go_default_library",

View File

@@ -1,13 +1,10 @@
package rpc
import (
"net/http"
api "github.com/OffchainLabs/prysm/v7/api/client"
grpcutil "github.com/OffchainLabs/prysm/v7/api/grpc"
"github.com/OffchainLabs/prysm/v7/api/rest"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/validator/client"
beaconApi "github.com/OffchainLabs/prysm/v7/validator/client/beacon-api"
beaconChainClientFactory "github.com/OffchainLabs/prysm/v7/validator/client/beacon-chain-client-factory"
nodeClientFactory "github.com/OffchainLabs/prysm/v7/validator/client/node-client-factory"
validatorClientFactory "github.com/OffchainLabs/prysm/v7/validator/client/validator-client-factory"
@@ -17,7 +14,6 @@ import (
grpcopentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"google.golang.org/grpc"
)
@@ -41,30 +37,26 @@ func (s *Server) registerBeaconClient() error {
s.ctx = grpcutil.AppendHeaders(s.ctx, s.grpcHeaders)
grpcConn, err := grpc.DialContext(s.ctx, s.beaconNodeEndpoint, dialOpts...)
conn, err := validatorHelpers.NewNodeConnection(
validatorHelpers.WithGRPC(s.ctx, s.beaconNodeEndpoint, dialOpts),
validatorHelpers.WithREST(s.beaconApiEndpoint,
rest.WithHttpHeaders(s.beaconApiHeaders),
rest.WithHttpTimeout(s.beaconApiTimeout),
rest.WithTracing(),
),
)
if err != nil {
return errors.Wrapf(err, "could not dial endpoint: %s", s.beaconNodeEndpoint)
return err
}
if s.beaconNodeCert != "" {
if s.beaconNodeCert != "" && s.beaconNodeEndpoint != "" {
log.Info("Established secure gRPC connection")
}
s.healthClient = ethpb.NewHealthClient(grpcConn)
if grpcConn := conn.GetGrpcClientConn(); grpcConn != nil {
s.healthClient = ethpb.NewHealthClient(grpcConn)
}
conn := validatorHelpers.NewNodeConnection(
grpcConn,
s.beaconApiEndpoint,
validatorHelpers.WithBeaconApiHeaders(s.beaconApiHeaders),
validatorHelpers.WithBeaconApiTimeout(s.beaconApiTimeout),
)
headersTransport := api.NewCustomHeadersTransport(http.DefaultTransport, conn.GetBeaconApiHeaders())
restHandler := beaconApi.NewBeaconApiRestHandler(
http.Client{Timeout: s.beaconApiTimeout, Transport: otelhttp.NewTransport(headersTransport)},
s.beaconApiEndpoint,
)
s.chainClient = beaconChainClientFactory.NewChainClient(conn, restHandler)
s.nodeClient = nodeClientFactory.NewNodeClient(conn, restHandler)
s.beaconNodeValidatorClient = validatorClientFactory.NewValidatorClient(conn, restHandler)
s.chainClient = beaconChainClientFactory.NewChainClient(conn)
s.nodeClient = nodeClientFactory.NewNodeClient(conn)
s.beaconNodeValidatorClient = validatorClientFactory.NewValidatorClient(conn)
return nil
}

View File

@@ -3,19 +3,17 @@ package rpc
import (
"testing"
grpcutil "github.com/OffchainLabs/prysm/v7/api/grpc"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
"google.golang.org/grpc/metadata"
)
func TestGrpcHeaders(t *testing.T) {
s := &Server{
ctx: t.Context(),
grpcHeaders: []string{"first=value1", "second=value2"},
}
err := s.registerBeaconClient()
require.NoError(t, err)
md, _ := metadata.FromOutgoingContext(s.ctx)
ctx := t.Context()
grpcHeaders := []string{"first=value1", "second=value2"}
ctx = grpcutil.AppendHeaders(ctx, grpcHeaders)
md, _ := metadata.FromOutgoingContext(ctx)
require.Equal(t, 2, md.Len(), "MetadataV0 contains wrong number of values")
assert.Equal(t, "value1", md.Get("first")[0])
assert.Equal(t, "value2", md.Get("second")[0])

View File

@@ -22,6 +22,7 @@ import (
"github.com/OffchainLabs/prysm/v7/validator/client"
"github.com/OffchainLabs/prysm/v7/validator/client/testutil"
"github.com/OffchainLabs/prysm/v7/validator/keymanager"
validatorTesting "github.com/OffchainLabs/prysm/v7/validator/testing"
"github.com/google/uuid"
"github.com/tyler-smith/go-bip39"
keystorev4 "github.com/wealdtech/go-eth2-wallet-encryptor-keystorev4"
@@ -46,6 +47,7 @@ func TestServer_CreateWallet_Local(t *testing.T) {
km, err := w.InitializeKeymanager(ctx, iface.InitKeymanagerConfig{ListenForChanges: false})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: validatorTesting.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -443,6 +445,7 @@ func TestServer_WalletConfig(t *testing.T) {
require.NoError(t, err)
s.wallet = w
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: validatorTesting.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,

View File

@@ -53,6 +53,7 @@ func TestServer_ListAccounts(t *testing.T) {
km, err := w.InitializeKeymanager(ctx, iface.InitKeymanagerConfig{ListenForChanges: false})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: constant.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -158,6 +159,7 @@ func TestServer_BackupAccounts(t *testing.T) {
km, err := w.InitializeKeymanager(ctx, iface.InitKeymanagerConfig{ListenForChanges: false})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: constant.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -282,6 +284,7 @@ func TestServer_VoluntaryExit(t *testing.T) {
require.NoError(t, err)
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: constant.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,

View File

@@ -52,6 +52,7 @@ func TestServer_ListKeystores(t *testing.T) {
t.Run("wallet not ready", func(t *testing.T) {
m := &testutil.FakeValidator{}
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Validator: m,
})
require.NoError(t, err)
@@ -81,6 +82,7 @@ func TestServer_ListKeystores(t *testing.T) {
km, err := w.InitializeKeymanager(ctx, iface.InitKeymanagerConfig{ListenForChanges: false})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -147,6 +149,7 @@ func TestServer_ImportKeystores(t *testing.T) {
km, err := w.InitializeKeymanager(ctx, iface.InitKeymanagerConfig{ListenForChanges: false})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -368,6 +371,7 @@ func TestServer_ImportKeystores_WrongKeymanagerKind(t *testing.T) {
}})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -652,6 +656,7 @@ func TestServer_DeleteKeystores_WrongKeymanagerKind(t *testing.T) {
}})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -695,6 +700,7 @@ func setupServerWithWallet(t testing.TB) *Server {
km, err := w.InitializeKeymanager(ctx, iface.InitKeymanagerConfig{ListenForChanges: false})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -730,6 +736,7 @@ func TestServer_SetVoluntaryExit(t *testing.T) {
m := &testutil.FakeValidator{Km: km}
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Validator: m,
})
require.NoError(t, err)
@@ -953,6 +960,7 @@ func TestServer_GetGasLimit(t *testing.T) {
err := m.SetProposerSettings(ctx, tt.args)
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Validator: m,
})
require.NoError(t, err)
@@ -1111,6 +1119,7 @@ func TestServer_SetGasLimit(t *testing.T) {
require.NoError(t, err)
validatorDB := dbtest.SetupDB(t, t.TempDir(), [][fieldparams.BLSPubkeyLength]byte{}, isSlashingProtectionMinimal)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Validator: m,
DB: validatorDB,
})
@@ -1300,6 +1309,7 @@ func TestServer_DeleteGasLimit(t *testing.T) {
require.NoError(t, err)
validatorDB := dbtest.SetupDB(t, t.TempDir(), [][fieldparams.BLSPubkeyLength]byte{}, isSlashingProtectionMinimal)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Validator: m,
DB: validatorDB,
})
@@ -1348,6 +1358,7 @@ func TestServer_ListRemoteKeys(t *testing.T) {
km, err := w.InitializeKeymanager(ctx, iface.InitKeymanagerConfig{ListenForChanges: false, Web3SignerConfig: config})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -1404,6 +1415,7 @@ func TestServer_ImportRemoteKeys(t *testing.T) {
km, err := w.InitializeKeymanager(ctx, iface.InitKeymanagerConfig{ListenForChanges: false, Web3SignerConfig: config})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -1466,6 +1478,7 @@ func TestServer_DeleteRemoteKeys(t *testing.T) {
km, err := w.InitializeKeymanager(ctx, iface.InitKeymanagerConfig{ListenForChanges: false, Web3SignerConfig: config})
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Wallet: w,
Validator: &testutil.FakeValidator{
Km: km,
@@ -1567,6 +1580,7 @@ func TestServer_ListFeeRecipientByPubkey(t *testing.T) {
require.NoError(t, err)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Validator: m,
})
require.NoError(t, err)
@@ -1591,6 +1605,7 @@ func TestServer_ListFeeRecipientByPubKey_NoFeeRecipientSet(t *testing.T) {
ctx := t.Context()
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Validator: &testutil.FakeValidator{},
})
require.NoError(t, err)
@@ -1780,6 +1795,7 @@ func TestServer_FeeRecipientByPubkey(t *testing.T) {
// save a default here
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Validator: m,
DB: validatorDB,
})
@@ -1890,6 +1906,7 @@ func TestServer_DeleteFeeRecipientByPubkey(t *testing.T) {
require.NoError(t, err)
validatorDB := dbtest.SetupDB(t, t.TempDir(), [][fieldparams.BLSPubkeyLength]byte{}, isSlashingProtectionMinimal)
vs, err := client.NewValidatorService(ctx, &client.Config{
Conn: mocks.MockNodeConnection(),
Validator: m,
DB: validatorDB,
})
@@ -1940,6 +1957,7 @@ func TestServer_Graffiti(t *testing.T) {
graffiti := "graffiti"
m := &testutil.FakeValidator{}
vs, err := client.NewValidatorService(t.Context(), &client.Config{
Conn: mocks.MockNodeConnection(),
Validator: m,
})
require.NoError(t, err)

View File

@@ -5,6 +5,7 @@ go_library(
testonly = True,
srcs = [
"constants.go",
"mock_node_connection.go",
"mock_protector.go",
"protection_history.go",
],
@@ -14,6 +15,7 @@ go_library(
"//validator:__subpackages__",
],
deps = [
"//api/grpc:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
@@ -22,6 +24,7 @@ go_library(
"//encoding/bytesutil:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//validator/db/common:go_default_library",
"//validator/helpers:go_default_library",
"//validator/slashing-protection-history/format:go_default_library",
],
)

View File

@@ -0,0 +1,16 @@
package testing
import (
grpcutil "github.com/OffchainLabs/prysm/v7/api/grpc"
"github.com/OffchainLabs/prysm/v7/validator/helpers"
)
// MockNodeConnection creates a minimal NodeConnection for testing.
func MockNodeConnection() helpers.NodeConnection {
conn, _ := helpers.NewNodeConnection(
helpers.WithGRPCProvider(&grpcutil.MockGrpcProvider{
MockHosts: []string{"mock:4000"},
}),
)
return conn
}