mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-09 15:37:56 -05:00
replace receive slot with event stream (#13563)
* WIP * event stream wip * returning nil * temp removing some tests * wip health checks * fixing conficts * updating fields based on linting * fixing more errors * fixing mocks * fixing more mocks * fixing more linting * removing white space for lint * fixing log format * gaz * reverting changes on grpc * fixing unit tests * adding in tests for health tracker and event stream * adding more tests for streaming slot * gaz * Update api/client/event/event_stream.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * review comments * Update validator/client/runner.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update validator/client/validator.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update validator/client/validator.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update validator/client/validator.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update validator/client/validator.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update validator/client/beacon-api/beacon_api_validator_client.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update validator/client/validator.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * Update validator/client/validator.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * addressing radek comments * Update validator/client/validator.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * addressing review feedback * moving things to below next slot ticker * fixing tests * update naming * adding TODO comment * Update api/client/beacon/health.go Co-authored-by: Radosław Kapka <rkapka@wp.pl> * addressing comments * fixing broken linting * fixing more import issues * fixing more import issues * linting * updating based on radek's comments * addressing more comments * fixing nogo error * fixing duplicate import * gaz * adding radek's review suggestion * Update proto/prysm/v1alpha1/node.proto Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com> * preston review comments * Update api/client/event/event_stream.go Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com> * Update validator/client/validator.go Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com> * addressing some more preston review items * fixing tests for linting * fixing missed linting * updating based on feedback to simplify * adding interface check at the top * reverting some comments * cleaning up intatiations * reworking the health tracker * fixing linting * fixing more linting to adhear to interface * adding interface check at the the top of the file * fixing unit tests * attempting to fix dependency cycle * addressing radek's comment * Update validator/client/beacon-api/beacon_api_validator_client.go Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com> * adding more tests and feedback items * fixing TODO comment --------- Co-authored-by: Radosław Kapka <rkapka@wp.pl> Co-authored-by: Preston Van Loon <pvanloon@offchainlabs.com>
This commit is contained in:
@@ -87,10 +87,7 @@ func (acm *CLIManager) prepareBeaconClients(ctx context.Context) (*iface.Validat
|
||||
acm.beaconApiTimeout,
|
||||
)
|
||||
|
||||
restHandler := &beaconApi.BeaconApiJsonRestHandler{
|
||||
HttpClient: http.Client{Timeout: acm.beaconApiTimeout},
|
||||
Host: acm.beaconApiEndpoint,
|
||||
}
|
||||
restHandler := beaconApi.NewBeaconApiJsonRestHandler(http.Client{Timeout: acm.beaconApiTimeout}, acm.beaconApiEndpoint)
|
||||
validatorClient := validatorClientFactory.NewValidatorClient(conn, restHandler)
|
||||
nodeClient := nodeClientFactory.NewNodeClient(conn, restHandler)
|
||||
|
||||
|
||||
@@ -10,6 +10,8 @@ go_library(
|
||||
"//validator:__subpackages__",
|
||||
],
|
||||
deps = [
|
||||
"//api/client/beacon:go_default_library",
|
||||
"//api/client/event:go_default_library",
|
||||
"//config/proposer:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
|
||||
@@ -7,6 +7,8 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v5/api/client/beacon"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/client/event"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/proposer"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
@@ -213,14 +215,18 @@ func (m *Validator) SetProposerSettings(_ context.Context, settings *proposer.Se
|
||||
return nil
|
||||
}
|
||||
|
||||
func (_ *Validator) StartEventStream(_ context.Context) error {
|
||||
func (*Validator) StartEventStream(_ context.Context, _ []string, _ chan<- *event.Event) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (_ *Validator) EventStreamIsRunning() bool {
|
||||
func (*Validator) ProcessEvent(event *event.Event) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (_ *Validator) NodeIsHealthy(ctx context.Context) bool {
|
||||
func (*Validator) EventStreamIsRunning() bool {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (*Validator) HealthTracker() *beacon.NodeHealthTracker {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
@@ -23,7 +23,11 @@ go_library(
|
||||
"//validator:__subpackages__",
|
||||
],
|
||||
deps = [
|
||||
"//api/client:go_default_library",
|
||||
"//api/client/beacon:go_default_library",
|
||||
"//api/client/event:go_default_library",
|
||||
"//api/grpc:go_default_library",
|
||||
"//api/server/structs:go_default_library",
|
||||
"//async:go_default_library",
|
||||
"//async/event:go_default_library",
|
||||
"//beacon-chain/builder:go_default_library",
|
||||
@@ -115,6 +119,8 @@ go_test(
|
||||
],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//api/client/beacon:go_default_library",
|
||||
"//api/client/beacon/testing:go_default_library",
|
||||
"//async/event:go_default_library",
|
||||
"//beacon-chain/core/signing:go_default_library",
|
||||
"//cache/lru:go_default_library",
|
||||
|
||||
@@ -16,7 +16,6 @@ go_library(
|
||||
"domain_data.go",
|
||||
"doppelganger.go",
|
||||
"duties.go",
|
||||
"event_handler.go",
|
||||
"genesis.go",
|
||||
"get_beacon_block.go",
|
||||
"index.go",
|
||||
@@ -43,10 +42,11 @@ go_library(
|
||||
visibility = ["//validator:__subpackages__"],
|
||||
deps = [
|
||||
"//api:go_default_library",
|
||||
"//api/client/beacon:go_default_library",
|
||||
"//api/client/event:go_default_library",
|
||||
"//api/server/structs:go_default_library",
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/core/signing:go_default_library",
|
||||
"//beacon-chain/rpc/eth/events:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"//consensus-types/validator:go_default_library",
|
||||
@@ -86,7 +86,6 @@ go_test(
|
||||
"domain_data_test.go",
|
||||
"doppelganger_test.go",
|
||||
"duties_test.go",
|
||||
"event_handler_test.go",
|
||||
"genesis_test.go",
|
||||
"get_beacon_block_test.go",
|
||||
"index_test.go",
|
||||
@@ -138,7 +137,6 @@ go_test(
|
||||
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
|
||||
"@com_github_golang_protobuf//ptypes/empty",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
|
||||
"@org_golang_google_protobuf//types/known/emptypb:go_default_library",
|
||||
"@org_golang_google_protobuf//types/known/timestamppb:go_default_library",
|
||||
"@org_uber_go_mock//gomock:go_default_library",
|
||||
|
||||
@@ -7,16 +7,22 @@ import (
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/golang/protobuf/ptypes/empty"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/client/beacon"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/server/structs"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/validator/client/iface"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
)
|
||||
|
||||
var (
|
||||
_ = iface.NodeClient(&beaconApiNodeClient{})
|
||||
)
|
||||
|
||||
type beaconApiNodeClient struct {
|
||||
fallbackClient iface.NodeClient
|
||||
jsonRestHandler JsonRestHandler
|
||||
genesisProvider GenesisProvider
|
||||
healthTracker *beacon.NodeHealthTracker
|
||||
}
|
||||
|
||||
func (c *beaconApiNodeClient) GetSyncStatus(ctx context.Context, _ *empty.Empty) (*ethpb.SyncStatus, error) {
|
||||
@@ -101,10 +107,16 @@ func (c *beaconApiNodeClient) IsHealthy(ctx context.Context) bool {
|
||||
return c.jsonRestHandler.Get(ctx, "/eth/v1/node/health", nil) == nil
|
||||
}
|
||||
|
||||
func (c *beaconApiNodeClient) HealthTracker() *beacon.NodeHealthTracker {
|
||||
return c.healthTracker
|
||||
}
|
||||
|
||||
func NewNodeClientWithFallback(jsonRestHandler JsonRestHandler, fallbackClient iface.NodeClient) iface.NodeClient {
|
||||
return &beaconApiNodeClient{
|
||||
b := &beaconApiNodeClient{
|
||||
jsonRestHandler: jsonRestHandler,
|
||||
fallbackClient: fallbackClient,
|
||||
genesisProvider: beaconApiGenesisProvider{jsonRestHandler: jsonRestHandler},
|
||||
}
|
||||
b.healthTracker = beacon.NewNodeHealthTracker(b)
|
||||
return b
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/golang/protobuf/ptypes/empty"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/client/event"
|
||||
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/validator/client/iface"
|
||||
@@ -14,20 +15,14 @@ import (
|
||||
|
||||
type ValidatorClientOpt func(*beaconApiValidatorClient)
|
||||
|
||||
func WithEventHandler(h *EventHandler) ValidatorClientOpt {
|
||||
return func(c *beaconApiValidatorClient) {
|
||||
c.eventHandler = h
|
||||
}
|
||||
}
|
||||
|
||||
type beaconApiValidatorClient struct {
|
||||
genesisProvider GenesisProvider
|
||||
dutiesProvider dutiesProvider
|
||||
stateValidatorsProvider StateValidatorsProvider
|
||||
jsonRestHandler JsonRestHandler
|
||||
eventHandler *EventHandler
|
||||
beaconBlockConverter BeaconBlockConverter
|
||||
prysmBeaconChainCLient iface.PrysmBeaconChainClient
|
||||
isEventStreamRunning bool
|
||||
}
|
||||
|
||||
func NewBeaconApiValidatorClient(jsonRestHandler JsonRestHandler, opts ...ValidatorClientOpt) iface.ValidatorClient {
|
||||
@@ -41,6 +36,7 @@ func NewBeaconApiValidatorClient(jsonRestHandler JsonRestHandler, opts ...Valida
|
||||
nodeClient: &beaconApiNodeClient{jsonRestHandler: jsonRestHandler},
|
||||
jsonRestHandler: jsonRestHandler,
|
||||
},
|
||||
isEventStreamRunning: false,
|
||||
}
|
||||
for _, o := range opts {
|
||||
o(c)
|
||||
@@ -135,10 +131,6 @@ func (c *beaconApiValidatorClient) ProposeExit(ctx context.Context, in *ethpb.Si
|
||||
})
|
||||
}
|
||||
|
||||
func (c *beaconApiValidatorClient) StreamSlots(ctx context.Context, in *ethpb.StreamSlotsRequest) (ethpb.BeaconNodeValidator_StreamSlotsClient, error) {
|
||||
return c.streamSlots(ctx, in, time.Second), nil
|
||||
}
|
||||
|
||||
func (c *beaconApiValidatorClient) StreamBlocksAltair(ctx context.Context, in *ethpb.StreamBlocksRequest) (ethpb.BeaconNodeValidator_StreamBlocksAltairClient, error) {
|
||||
return c.streamBlocks(ctx, in, time.Second), nil
|
||||
}
|
||||
@@ -198,17 +190,22 @@ func (c *beaconApiValidatorClient) WaitForChainStart(ctx context.Context, _ *emp
|
||||
return c.waitForChainStart(ctx)
|
||||
}
|
||||
|
||||
func (c *beaconApiValidatorClient) StartEventStream(ctx context.Context) error {
|
||||
if c.eventHandler != nil {
|
||||
if err := c.eventHandler.get(ctx, []string{"head"}); err != nil {
|
||||
return errors.Wrapf(err, "could not invoke event handler")
|
||||
func (c *beaconApiValidatorClient) StartEventStream(ctx context.Context, topics []string, eventsChannel chan<- *event.Event) {
|
||||
eventStream, err := event.NewEventStream(ctx, c.jsonRestHandler.HttpClient(), c.jsonRestHandler.Host(), topics)
|
||||
if err != nil {
|
||||
eventsChannel <- &event.Event{
|
||||
EventType: event.EventError,
|
||||
Data: []byte(errors.Wrap(err, "failed to start event stream").Error()),
|
||||
}
|
||||
return
|
||||
}
|
||||
return nil
|
||||
c.isEventStreamRunning = true
|
||||
eventStream.Subscribe(eventsChannel)
|
||||
c.isEventStreamRunning = false
|
||||
}
|
||||
|
||||
func (c *beaconApiValidatorClient) EventStreamIsRunning() bool {
|
||||
return c.eventHandler.running
|
||||
return c.isEventStreamRunning
|
||||
}
|
||||
|
||||
func (c *beaconApiValidatorClient) GetAggregatedSelections(ctx context.Context, selections []iface.BeaconCommitteeSelection) ([]iface.BeaconCommitteeSelection, error) {
|
||||
|
||||
@@ -1,134 +0,0 @@
|
||||
package beacon_api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v5/api"
|
||||
)
|
||||
|
||||
// Currently set to the first power of 2 bigger than the size of the `head` event
|
||||
// which is 446 bytes
|
||||
const eventByteLimit = 512
|
||||
|
||||
// EventHandler is responsible for subscribing to the Beacon API events endpoint
|
||||
// and dispatching received events to subscribers.
|
||||
type EventHandler struct {
|
||||
httpClient *http.Client
|
||||
host string
|
||||
running bool
|
||||
subs []eventSub
|
||||
sync.Mutex
|
||||
}
|
||||
|
||||
type eventSub struct {
|
||||
name string
|
||||
ch chan<- event
|
||||
}
|
||||
|
||||
type event struct {
|
||||
eventType string
|
||||
data string
|
||||
}
|
||||
|
||||
// NewEventHandler returns a new handler.
|
||||
func NewEventHandler(httpClient *http.Client, host string) *EventHandler {
|
||||
return &EventHandler{
|
||||
httpClient: httpClient,
|
||||
host: host,
|
||||
running: false,
|
||||
subs: make([]eventSub, 0),
|
||||
}
|
||||
}
|
||||
|
||||
func (h *EventHandler) subscribe(sub eventSub) {
|
||||
h.Lock()
|
||||
h.subs = append(h.subs, sub)
|
||||
h.Unlock()
|
||||
}
|
||||
|
||||
func (h *EventHandler) get(ctx context.Context, topics []string) error {
|
||||
if len(topics) == 0 {
|
||||
return errors.New("no topics provided")
|
||||
}
|
||||
if h.running {
|
||||
log.Warn("Event listener is already running, ignoring function call")
|
||||
}
|
||||
|
||||
go func() {
|
||||
h.running = true
|
||||
defer func() { h.running = false }()
|
||||
|
||||
allTopics := strings.Join(topics, ",")
|
||||
log.Info("Starting listening to Beacon API events on topics: " + allTopics)
|
||||
url := h.host + "/eth/v1/events?topics=" + allTopics
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to create HTTP request")
|
||||
return
|
||||
}
|
||||
req.Header.Set("Accept", api.EventStreamMediaType)
|
||||
req.Header.Set("Connection", api.KeepAlive)
|
||||
resp, err := h.httpClient.Do(req)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to perform HTTP request")
|
||||
return
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if closeErr := resp.Body.Close(); closeErr != nil {
|
||||
log.WithError(closeErr).Error("Failed to close events response body")
|
||||
}
|
||||
}()
|
||||
|
||||
// We signal an EOF error in a special way. When we get this error while reading the response body,
|
||||
// there might still be an event received in the body that we should handle.
|
||||
eof := false
|
||||
for {
|
||||
if ctx.Err() != nil {
|
||||
log.WithError(ctx.Err()).Error("Stopping listening to Beacon API events")
|
||||
return
|
||||
}
|
||||
|
||||
rawData := make([]byte, eventByteLimit)
|
||||
_, err = resp.Body.Read(rawData)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "EOF") {
|
||||
log.Error("Received EOF while reading events response body. Stopping listening to Beacon API events")
|
||||
eof = true
|
||||
} else {
|
||||
log.WithError(err).Error("Stopping listening to Beacon API events")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
e := strings.Split(string(rawData), "\n")
|
||||
// We expect that the event format will contain event type and data separated with a newline
|
||||
if len(e) < 2 {
|
||||
// We reached EOF and there is no event to send
|
||||
if eof {
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
for _, sub := range h.subs {
|
||||
select {
|
||||
case sub.ch <- event{eventType: e[0], data: e[1]}:
|
||||
// Event sent successfully.
|
||||
default:
|
||||
log.Warn("Subscriber '" + sub.name + "' not ready to receive events")
|
||||
}
|
||||
}
|
||||
// We reached EOF and sent the last event
|
||||
if eof {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -1,55 +0,0 @@
|
||||
package beacon_api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/assert"
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/require"
|
||||
logtest "github.com/sirupsen/logrus/hooks/test"
|
||||
)
|
||||
|
||||
func TestEventHandler(t *testing.T) {
|
||||
logHook := logtest.NewGlobal()
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/eth/v1/events", func(w http.ResponseWriter, r *http.Request) {
|
||||
flusher, ok := w.(http.Flusher)
|
||||
require.Equal(t, true, ok)
|
||||
_, err := fmt.Fprint(w, "head\ndata\n\n")
|
||||
require.NoError(t, err)
|
||||
flusher.Flush()
|
||||
})
|
||||
server := httptest.NewServer(mux)
|
||||
defer server.Close()
|
||||
|
||||
handler := NewEventHandler(http.DefaultClient, server.URL)
|
||||
ch1 := make(chan event, 1)
|
||||
sub1 := eventSub{ch: ch1}
|
||||
ch2 := make(chan event, 1)
|
||||
sub2 := eventSub{ch: ch2}
|
||||
ch3 := make(chan event, 1)
|
||||
sub3 := eventSub{name: "sub3", ch: ch3}
|
||||
// fill up the channel so that it can't receive more events
|
||||
ch3 <- event{}
|
||||
handler.subscribe(sub1)
|
||||
handler.subscribe(sub2)
|
||||
handler.subscribe(sub3)
|
||||
|
||||
require.NoError(t, handler.get(context.Background(), []string{"head"}))
|
||||
// make sure the goroutine inside handler.get is invoked
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
e := <-ch1
|
||||
assert.Equal(t, "head", e.eventType)
|
||||
assert.Equal(t, "data", e.data)
|
||||
e = <-ch2
|
||||
assert.Equal(t, "head", e.eventType)
|
||||
assert.Equal(t, "data", e.data)
|
||||
|
||||
assert.LogsContain(t, logHook, "Subscriber 'sub3' not ready to receive events")
|
||||
}
|
||||
@@ -16,23 +16,43 @@ import (
|
||||
type JsonRestHandler interface {
|
||||
Get(ctx context.Context, endpoint string, resp interface{}) error
|
||||
Post(ctx context.Context, endpoint string, headers map[string]string, data *bytes.Buffer, resp interface{}) error
|
||||
HttpClient() *http.Client
|
||||
Host() string
|
||||
}
|
||||
|
||||
type BeaconApiJsonRestHandler struct {
|
||||
HttpClient http.Client
|
||||
Host string
|
||||
client http.Client
|
||||
host string
|
||||
}
|
||||
|
||||
// NewBeaconApiJsonRestHandler returns a JsonRestHandler
|
||||
func NewBeaconApiJsonRestHandler(client http.Client, host string) JsonRestHandler {
|
||||
return &BeaconApiJsonRestHandler{
|
||||
client: client,
|
||||
host: host,
|
||||
}
|
||||
}
|
||||
|
||||
// GetHttpClient returns the underlying HTTP client of the handler
|
||||
func (c BeaconApiJsonRestHandler) HttpClient() *http.Client {
|
||||
return &c.client
|
||||
}
|
||||
|
||||
// GetHost returns the underlying HTTP host
|
||||
func (c BeaconApiJsonRestHandler) Host() string {
|
||||
return c.host
|
||||
}
|
||||
|
||||
// Get sends a GET request and decodes the response body as a JSON object into the passed in object.
|
||||
// If an HTTP error is returned, the body is decoded as a DefaultJsonError JSON object and returned as the first return value.
|
||||
func (c BeaconApiJsonRestHandler) Get(ctx context.Context, endpoint string, resp interface{}) error {
|
||||
url := c.Host + endpoint
|
||||
url := c.host + endpoint
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to create request for endpoint %s", url)
|
||||
}
|
||||
|
||||
httpResp, err := c.HttpClient.Do(req)
|
||||
httpResp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to perform request for endpoint %s", url)
|
||||
}
|
||||
@@ -58,7 +78,7 @@ func (c BeaconApiJsonRestHandler) Post(
|
||||
return errors.New("data is nil")
|
||||
}
|
||||
|
||||
url := c.Host + apiEndpoint
|
||||
url := c.host + apiEndpoint
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, data)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to create request for endpoint %s", url)
|
||||
@@ -69,7 +89,7 @@ func (c BeaconApiJsonRestHandler) Post(
|
||||
}
|
||||
req.Header.Set("Content-Type", api.JsonMediaType)
|
||||
|
||||
httpResp, err := c.HttpClient.Do(req)
|
||||
httpResp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "failed to perform request for endpoint %s", url)
|
||||
}
|
||||
|
||||
@@ -41,8 +41,8 @@ func TestGet(t *testing.T) {
|
||||
defer server.Close()
|
||||
|
||||
jsonRestHandler := BeaconApiJsonRestHandler{
|
||||
HttpClient: http.Client{Timeout: time.Second * 5},
|
||||
Host: server.URL,
|
||||
client: http.Client{Timeout: time.Second * 5},
|
||||
host: server.URL,
|
||||
}
|
||||
resp := &structs.GetGenesisResponse{}
|
||||
require.NoError(t, jsonRestHandler.Get(ctx, endpoint+"?arg1=abc&arg2=def", resp))
|
||||
@@ -87,8 +87,8 @@ func TestPost(t *testing.T) {
|
||||
defer server.Close()
|
||||
|
||||
jsonRestHandler := BeaconApiJsonRestHandler{
|
||||
HttpClient: http.Client{Timeout: time.Second * 5},
|
||||
Host: server.URL,
|
||||
client: http.Client{Timeout: time.Second * 5},
|
||||
host: server.URL,
|
||||
}
|
||||
resp := &structs.GetGenesisResponse{}
|
||||
require.NoError(t, jsonRestHandler.Post(ctx, endpoint, headers, bytes.NewBuffer(dataBytes), resp))
|
||||
|
||||
@@ -12,6 +12,7 @@ package mock
|
||||
import (
|
||||
bytes "bytes"
|
||||
context "context"
|
||||
"net/http"
|
||||
reflect "reflect"
|
||||
|
||||
gomock "go.uber.org/mock/gomock"
|
||||
@@ -35,6 +36,14 @@ func NewMockJsonRestHandler(ctrl *gomock.Controller) *MockJsonRestHandler {
|
||||
return mock
|
||||
}
|
||||
|
||||
func (mr *MockJsonRestHandler) HttpClient() *http.Client {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mr *MockJsonRestHandler) Host() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||
func (m *MockJsonRestHandler) EXPECT() *MockJsonRestHandlerMockRecorder {
|
||||
return m.recorder
|
||||
@@ -67,3 +76,4 @@ func (mr *MockJsonRestHandlerMockRecorder) Post(ctx, endpoint, headers, data, re
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Post", reflect.TypeOf((*MockJsonRestHandler)(nil).Post), ctx, endpoint, headers, data, resp)
|
||||
}
|
||||
|
||||
|
||||
@@ -4,13 +4,11 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/server/structs"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/rpc/eth/events"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"google.golang.org/grpc"
|
||||
@@ -23,15 +21,6 @@ type abstractSignedBlockResponseJson struct {
|
||||
Data json.RawMessage `json:"data"`
|
||||
}
|
||||
|
||||
type streamSlotsClient struct {
|
||||
grpc.ClientStream
|
||||
ctx context.Context
|
||||
beaconApiClient beaconApiValidatorClient
|
||||
streamSlotsRequest *ethpb.StreamSlotsRequest
|
||||
pingDelay time.Duration
|
||||
ch chan event
|
||||
}
|
||||
|
||||
type streamBlocksAltairClient struct {
|
||||
grpc.ClientStream
|
||||
ctx context.Context
|
||||
@@ -47,18 +36,6 @@ type headSignedBeaconBlockResult struct {
|
||||
slot primitives.Slot
|
||||
}
|
||||
|
||||
func (c beaconApiValidatorClient) streamSlots(ctx context.Context, in *ethpb.StreamSlotsRequest, pingDelay time.Duration) ethpb.BeaconNodeValidator_StreamSlotsClient {
|
||||
ch := make(chan event, 1)
|
||||
c.eventHandler.subscribe(eventSub{name: "stream slots", ch: ch})
|
||||
return &streamSlotsClient{
|
||||
ctx: ctx,
|
||||
beaconApiClient: c,
|
||||
streamSlotsRequest: in,
|
||||
pingDelay: pingDelay,
|
||||
ch: ch,
|
||||
}
|
||||
}
|
||||
|
||||
func (c beaconApiValidatorClient) streamBlocks(ctx context.Context, in *ethpb.StreamBlocksRequest, pingDelay time.Duration) ethpb.BeaconNodeValidator_StreamBlocksAltairClient {
|
||||
return &streamBlocksAltairClient{
|
||||
ctx: ctx,
|
||||
@@ -68,30 +45,6 @@ func (c beaconApiValidatorClient) streamBlocks(ctx context.Context, in *ethpb.St
|
||||
}
|
||||
}
|
||||
|
||||
func (c *streamSlotsClient) Recv() (*ethpb.StreamSlotsResponse, error) {
|
||||
for {
|
||||
select {
|
||||
case rawEvent := <-c.ch:
|
||||
if rawEvent.eventType != events.HeadTopic {
|
||||
continue
|
||||
}
|
||||
e := &structs.HeadEvent{}
|
||||
if err := json.Unmarshal([]byte(rawEvent.data), e); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to unmarshal head event into JSON")
|
||||
}
|
||||
uintSlot, err := strconv.ParseUint(e.Slot, 10, 64)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to parse slot")
|
||||
}
|
||||
return ðpb.StreamSlotsResponse{
|
||||
Slot: primitives.Slot(uintSlot),
|
||||
}, nil
|
||||
case <-c.ctx.Done():
|
||||
return nil, errors.New("context canceled")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *streamBlocksAltairClient) Recv() (*ethpb.StreamBlocksResponse, error) {
|
||||
result, err := c.beaconApiClient.getHeadSignedBeaconBlock(c.ctx)
|
||||
if err != nil {
|
||||
|
||||
@@ -11,6 +11,10 @@ go_library(
|
||||
importpath = "github.com/prysmaticlabs/prysm/v5/validator/client/grpc-api",
|
||||
visibility = ["//validator:__subpackages__"],
|
||||
deps = [
|
||||
"//api/client:go_default_library",
|
||||
"//api/client/beacon:go_default_library",
|
||||
"//api/client/event:go_default_library",
|
||||
"//api/server/structs:go_default_library",
|
||||
"//beacon-chain/rpc/eth/helpers:go_default_library",
|
||||
"//beacon-chain/state/state-native:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
@@ -20,6 +24,8 @@ go_library(
|
||||
"//validator/client/iface:go_default_library",
|
||||
"@com_github_golang_protobuf//ptypes/empty",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@com_github_sirupsen_logrus//:go_default_library",
|
||||
"@io_opencensus_go//trace:go_default_library",
|
||||
"@org_golang_google_grpc//:go_default_library",
|
||||
],
|
||||
)
|
||||
@@ -33,6 +39,8 @@ go_test(
|
||||
],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//api/client/event:go_default_library",
|
||||
"//api/server/structs:go_default_library",
|
||||
"//config/params:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"//consensus-types/validator:go_default_library",
|
||||
@@ -43,6 +51,7 @@ go_test(
|
||||
"//testing/util:go_default_library",
|
||||
"//testing/validator-mock:go_default_library",
|
||||
"//validator/client/iface:go_default_library",
|
||||
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
|
||||
"@org_golang_google_protobuf//types/known/emptypb:go_default_library",
|
||||
"@org_uber_go_mock//gomock:go_default_library",
|
||||
],
|
||||
|
||||
@@ -4,13 +4,20 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/golang/protobuf/ptypes/empty"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/client/beacon"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/validator/client/iface"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
var (
|
||||
_ = iface.NodeClient(&grpcNodeClient{})
|
||||
)
|
||||
|
||||
type grpcNodeClient struct {
|
||||
nodeClient ethpb.NodeClient
|
||||
nodeClient ethpb.NodeClient
|
||||
healthTracker *beacon.NodeHealthTracker
|
||||
}
|
||||
|
||||
func (c *grpcNodeClient) GetSyncStatus(ctx context.Context, in *empty.Empty) (*ethpb.SyncStatus, error) {
|
||||
@@ -29,10 +36,21 @@ func (c *grpcNodeClient) ListPeers(ctx context.Context, in *empty.Empty) (*ethpb
|
||||
return c.nodeClient.ListPeers(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcNodeClient) IsHealthy(context.Context) bool {
|
||||
panic("function not supported for gRPC client")
|
||||
func (c *grpcNodeClient) IsHealthy(ctx context.Context) bool {
|
||||
_, err := c.nodeClient.GetHealth(ctx, ðpb.HealthRequest{})
|
||||
if err != nil {
|
||||
log.WithError(err).Debug("failed to get health of node")
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *grpcNodeClient) HealthTracker() *beacon.NodeHealthTracker {
|
||||
return c.healthTracker
|
||||
}
|
||||
|
||||
func NewNodeClient(cc grpc.ClientConnInterface) iface.NodeClient {
|
||||
return &grpcNodeClient{ethpb.NewNodeClient(cc)}
|
||||
g := &grpcNodeClient{nodeClient: ethpb.NewNodeClient(cc)}
|
||||
g.healthTracker = beacon.NewNodeHealthTracker(g)
|
||||
return g
|
||||
}
|
||||
|
||||
@@ -2,16 +2,24 @@ package grpc_api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"strconv"
|
||||
|
||||
"github.com/golang/protobuf/ptypes/empty"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/client"
|
||||
eventClient "github.com/prysmaticlabs/prysm/v5/api/client/event"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/server/structs"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/validator/client/iface"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"go.opencensus.io/trace"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type grpcValidatorClient struct {
|
||||
beaconNodeValidatorClient ethpb.BeaconNodeValidatorClient
|
||||
isEventStreamRunning bool
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) GetDuties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.DutiesResponse, error) {
|
||||
@@ -70,10 +78,6 @@ func (c *grpcValidatorClient) ProposeExit(ctx context.Context, in *ethpb.SignedV
|
||||
return c.beaconNodeValidatorClient.ProposeExit(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) StreamSlots(ctx context.Context, in *ethpb.StreamSlotsRequest) (ethpb.BeaconNodeValidator_StreamSlotsClient, error) {
|
||||
return c.beaconNodeValidatorClient.StreamSlots(ctx, in)
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) StreamBlocksAltair(ctx context.Context, in *ethpb.StreamBlocksRequest) (ethpb.BeaconNodeValidator_StreamBlocksAltairClient, error) {
|
||||
return c.beaconNodeValidatorClient.StreamBlocksAltair(ctx, in)
|
||||
}
|
||||
@@ -119,7 +123,7 @@ func (c *grpcValidatorClient) WaitForChainStart(ctx context.Context, in *empty.E
|
||||
stream, err := c.beaconNodeValidatorClient.WaitForChainStart(ctx, in)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(
|
||||
iface.ErrConnectionIssue,
|
||||
client.ErrConnectionIssue,
|
||||
errors.Wrap(err, "could not setup beacon chain ChainStart streaming client").Error(),
|
||||
)
|
||||
}
|
||||
@@ -146,13 +150,97 @@ func (grpcValidatorClient) GetAggregatedSyncSelections(context.Context, []iface.
|
||||
}
|
||||
|
||||
func NewGrpcValidatorClient(cc grpc.ClientConnInterface) iface.ValidatorClient {
|
||||
return &grpcValidatorClient{ethpb.NewBeaconNodeValidatorClient(cc)}
|
||||
return &grpcValidatorClient{ethpb.NewBeaconNodeValidatorClient(cc), false}
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) StartEventStream(context.Context) error {
|
||||
panic("function not supported for gRPC client")
|
||||
func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []string, eventsChannel chan<- *eventClient.Event) {
|
||||
ctx, span := trace.StartSpan(ctx, "validator.gRPCClient.StartEventStream")
|
||||
defer span.End()
|
||||
if len(topics) == 0 {
|
||||
eventsChannel <- &eventClient.Event{
|
||||
EventType: eventClient.EventError,
|
||||
Data: []byte(errors.New("no topics were added").Error()),
|
||||
}
|
||||
return
|
||||
}
|
||||
// TODO(13563): ONLY WORKS WITH HEAD TOPIC RIGHT NOW/ONLY PROVIDES THE SLOT
|
||||
containsHead := false
|
||||
for i := range topics {
|
||||
if topics[i] == eventClient.EventHead {
|
||||
containsHead = true
|
||||
}
|
||||
}
|
||||
if !containsHead {
|
||||
eventsChannel <- &eventClient.Event{
|
||||
EventType: eventClient.EventConnectionError,
|
||||
Data: []byte(errors.Wrap(client.ErrConnectionIssue, "gRPC only supports the head topic, and head topic was not passed").Error()),
|
||||
}
|
||||
}
|
||||
if containsHead && len(topics) > 1 {
|
||||
log.Warn("gRPC only supports the head topic, other topics will be ignored")
|
||||
}
|
||||
|
||||
stream, err := c.beaconNodeValidatorClient.StreamSlots(ctx, ðpb.StreamSlotsRequest{VerifiedOnly: true})
|
||||
if err != nil {
|
||||
eventsChannel <- &eventClient.Event{
|
||||
EventType: eventClient.EventConnectionError,
|
||||
Data: []byte(errors.Wrap(client.ErrConnectionIssue, err.Error()).Error()),
|
||||
}
|
||||
return
|
||||
}
|
||||
c.isEventStreamRunning = true
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Info("Context canceled, stopping event stream")
|
||||
close(eventsChannel)
|
||||
c.isEventStreamRunning = false
|
||||
return
|
||||
default:
|
||||
if ctx.Err() != nil {
|
||||
c.isEventStreamRunning = false
|
||||
if errors.Is(ctx.Err(), context.Canceled) {
|
||||
eventsChannel <- &eventClient.Event{
|
||||
EventType: eventClient.EventConnectionError,
|
||||
Data: []byte(errors.Wrap(client.ErrConnectionIssue, ctx.Err().Error()).Error()),
|
||||
}
|
||||
return
|
||||
}
|
||||
eventsChannel <- &eventClient.Event{
|
||||
EventType: eventClient.EventError,
|
||||
Data: []byte(ctx.Err().Error()),
|
||||
}
|
||||
return
|
||||
}
|
||||
res, err := stream.Recv()
|
||||
if err != nil {
|
||||
c.isEventStreamRunning = false
|
||||
eventsChannel <- &eventClient.Event{
|
||||
EventType: eventClient.EventConnectionError,
|
||||
Data: []byte(errors.Wrap(client.ErrConnectionIssue, err.Error()).Error()),
|
||||
}
|
||||
return
|
||||
}
|
||||
if res == nil {
|
||||
continue
|
||||
}
|
||||
b, err := json.Marshal(structs.HeadEvent{
|
||||
Slot: strconv.FormatUint(uint64(res.Slot), 10),
|
||||
})
|
||||
if err != nil {
|
||||
eventsChannel <- &eventClient.Event{
|
||||
EventType: eventClient.EventError,
|
||||
Data: []byte(errors.Wrap(err, "failed to marshal Head Event").Error()),
|
||||
}
|
||||
}
|
||||
eventsChannel <- &eventClient.Event{
|
||||
EventType: eventClient.EventHead,
|
||||
Data: b,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *grpcValidatorClient) EventStreamIsRunning() bool {
|
||||
panic("function not supported for gRPC client")
|
||||
return c.isEventStreamRunning
|
||||
}
|
||||
|
||||
@@ -2,11 +2,18 @@ package grpc_api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
eventClient "github.com/prysmaticlabs/prysm/v5/api/client/event"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/server/structs"
|
||||
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/assert"
|
||||
mock2 "github.com/prysmaticlabs/prysm/v5/testing/mock"
|
||||
"github.com/prysmaticlabs/prysm/v5/testing/require"
|
||||
logTest "github.com/sirupsen/logrus/hooks/test"
|
||||
"go.uber.org/mock/gomock"
|
||||
"google.golang.org/protobuf/types/known/emptypb"
|
||||
)
|
||||
@@ -21,8 +28,105 @@ func TestWaitForChainStart_StreamSetupFails(t *testing.T) {
|
||||
gomock.Any(),
|
||||
).Return(nil, errors.New("failed stream"))
|
||||
|
||||
validatorClient := &grpcValidatorClient{beaconNodeValidatorClient}
|
||||
validatorClient := &grpcValidatorClient{beaconNodeValidatorClient, true}
|
||||
_, err := validatorClient.WaitForChainStart(context.Background(), &emptypb.Empty{})
|
||||
want := "could not setup beacon chain ChainStart streaming client"
|
||||
assert.ErrorContains(t, want, err)
|
||||
}
|
||||
|
||||
func TestStartEventStream(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
beaconNodeValidatorClient := mock2.NewMockBeaconNodeValidatorClient(ctrl)
|
||||
grpcClient := &grpcValidatorClient{beaconNodeValidatorClient, true}
|
||||
tests := []struct {
|
||||
name string
|
||||
topics []string
|
||||
prepare func()
|
||||
verify func(t *testing.T, event *eventClient.Event)
|
||||
}{
|
||||
{
|
||||
name: "Happy path Head topic",
|
||||
topics: []string{"head"},
|
||||
prepare: func() {
|
||||
stream := mock2.NewMockBeaconNodeValidator_StreamSlotsClient(ctrl)
|
||||
beaconNodeValidatorClient.EXPECT().StreamSlots(gomock.Any(),
|
||||
ð.StreamSlotsRequest{VerifiedOnly: true}).Return(stream, nil)
|
||||
stream.EXPECT().Context().Return(ctx).AnyTimes()
|
||||
stream.EXPECT().Recv().Return(
|
||||
ð.StreamSlotsResponse{Slot: 123},
|
||||
nil,
|
||||
).AnyTimes()
|
||||
},
|
||||
verify: func(t *testing.T, event *eventClient.Event) {
|
||||
require.Equal(t, event.EventType, eventClient.EventHead)
|
||||
head := structs.HeadEvent{}
|
||||
require.NoError(t, json.Unmarshal(event.Data, &head))
|
||||
require.Equal(t, head.Slot, "123")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no head produces error",
|
||||
topics: []string{"unsupportedTopic"},
|
||||
prepare: func() {
|
||||
stream := mock2.NewMockBeaconNodeValidator_StreamSlotsClient(ctrl)
|
||||
beaconNodeValidatorClient.EXPECT().StreamSlots(gomock.Any(),
|
||||
ð.StreamSlotsRequest{VerifiedOnly: true}).Return(stream, nil)
|
||||
stream.EXPECT().Context().Return(ctx).AnyTimes()
|
||||
stream.EXPECT().Recv().Return(
|
||||
ð.StreamSlotsResponse{Slot: 123},
|
||||
nil,
|
||||
).AnyTimes()
|
||||
},
|
||||
verify: func(t *testing.T, event *eventClient.Event) {
|
||||
require.Equal(t, event.EventType, eventClient.EventConnectionError)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "Unsupported topics warning",
|
||||
topics: []string{"head", "unsupportedTopic"},
|
||||
prepare: func() {
|
||||
stream := mock2.NewMockBeaconNodeValidator_StreamSlotsClient(ctrl)
|
||||
beaconNodeValidatorClient.EXPECT().StreamSlots(gomock.Any(),
|
||||
ð.StreamSlotsRequest{VerifiedOnly: true}).Return(stream, nil)
|
||||
stream.EXPECT().Context().Return(ctx).AnyTimes()
|
||||
stream.EXPECT().Recv().Return(
|
||||
ð.StreamSlotsResponse{Slot: 123},
|
||||
nil,
|
||||
).AnyTimes()
|
||||
},
|
||||
verify: func(t *testing.T, event *eventClient.Event) {
|
||||
require.Equal(t, event.EventType, eventClient.EventHead)
|
||||
head := structs.HeadEvent{}
|
||||
require.NoError(t, json.Unmarshal(event.Data, &head))
|
||||
require.Equal(t, head.Slot, "123")
|
||||
assert.LogsContain(t, hook, "gRPC only supports the head topic")
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "No topics error",
|
||||
topics: []string{},
|
||||
prepare: func() {},
|
||||
verify: func(t *testing.T, event *eventClient.Event) {
|
||||
require.Equal(t, event.EventType, eventClient.EventError)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
eventsChannel := make(chan *eventClient.Event, 1) // Buffer to prevent blocking
|
||||
tc.prepare() // Setup mock expectations
|
||||
|
||||
go grpcClient.StartEventStream(ctx, tc.topics, eventsChannel)
|
||||
|
||||
event := <-eventsChannel
|
||||
// Depending on what you're testing, you may need a timeout or a specific number of events to read
|
||||
time.AfterFunc(1*time.Second, cancel) // Prevents hanging forever
|
||||
tc.verify(t, event)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,6 +12,8 @@ go_library(
|
||||
importpath = "github.com/prysmaticlabs/prysm/v5/validator/client/iface",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//api/client/beacon:go_default_library",
|
||||
"//api/client/event:go_default_library",
|
||||
"//config/fieldparams:go_default_library",
|
||||
"//config/proposer:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
|
||||
"github.com/golang/protobuf/ptypes/empty"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/client/beacon"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
)
|
||||
|
||||
@@ -12,5 +13,5 @@ type NodeClient interface {
|
||||
GetGenesis(ctx context.Context, in *empty.Empty) (*ethpb.Genesis, error)
|
||||
GetVersion(ctx context.Context, in *empty.Empty) (*ethpb.Version, error)
|
||||
ListPeers(ctx context.Context, in *empty.Empty) (*ethpb.Peers, error)
|
||||
IsHealthy(ctx context.Context) bool
|
||||
HealthTracker() *beacon.NodeHealthTracker
|
||||
}
|
||||
|
||||
@@ -2,9 +2,10 @@ package iface
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"github.com/prysmaticlabs/prysm/v5/api/client/beacon"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/client/event"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/proposer"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
@@ -14,9 +15,6 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v5/validator/keymanager"
|
||||
)
|
||||
|
||||
// ErrConnectionIssue represents a connection problem.
|
||||
var ErrConnectionIssue = errors.New("could not connect")
|
||||
|
||||
// ValidatorRole defines the validator role.
|
||||
type ValidatorRole int8
|
||||
|
||||
@@ -57,16 +55,16 @@ type Validator interface {
|
||||
UpdateDomainDataCaches(ctx context.Context, slot primitives.Slot)
|
||||
WaitForKeymanagerInitialization(ctx context.Context) error
|
||||
Keymanager() (keymanager.IKeymanager, error)
|
||||
ReceiveSlots(ctx context.Context, connectionErrorChannel chan<- error)
|
||||
HandleKeyReload(ctx context.Context, currentKeys [][fieldparams.BLSPubkeyLength]byte) (bool, error)
|
||||
CheckDoppelGanger(ctx context.Context) error
|
||||
PushProposerSettings(ctx context.Context, km keymanager.IKeymanager, slot primitives.Slot, deadline time.Time) error
|
||||
SignValidatorRegistrationRequest(ctx context.Context, signer SigningFunc, newValidatorRegistration *ethpb.ValidatorRegistrationV1) (*ethpb.SignedValidatorRegistrationV1, error)
|
||||
StartEventStream(ctx context.Context, topics []string, eventsChan chan<- *event.Event)
|
||||
ProcessEvent(event *event.Event)
|
||||
ProposerSettings() *proposer.Settings
|
||||
SetProposerSettings(context.Context, *proposer.Settings) error
|
||||
StartEventStream(ctx context.Context) error
|
||||
EventStreamIsRunning() bool
|
||||
NodeIsHealthy(ctx context.Context) bool
|
||||
HealthTracker() *beacon.NodeHealthTracker
|
||||
}
|
||||
|
||||
// SigningFunc interface defines a type for the a function that signs a message
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/golang/protobuf/ptypes/empty"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/client/event"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
|
||||
)
|
||||
@@ -144,9 +145,8 @@ type ValidatorClient interface {
|
||||
GetSyncSubcommitteeIndex(ctx context.Context, in *ethpb.SyncSubcommitteeIndexRequest) (*ethpb.SyncSubcommitteeIndexResponse, error)
|
||||
GetSyncCommitteeContribution(ctx context.Context, in *ethpb.SyncCommitteeContributionRequest) (*ethpb.SyncCommitteeContribution, error)
|
||||
SubmitSignedContributionAndProof(ctx context.Context, in *ethpb.SignedContributionAndProof) (*empty.Empty, error)
|
||||
StreamSlots(ctx context.Context, in *ethpb.StreamSlotsRequest) (ethpb.BeaconNodeValidator_StreamSlotsClient, error)
|
||||
SubmitValidatorRegistrations(ctx context.Context, in *ethpb.SignedValidatorRegistrationsV1) (*empty.Empty, error)
|
||||
StartEventStream(ctx context.Context) error
|
||||
StartEventStream(ctx context.Context, topics []string, eventsChannel chan<- *event.Event)
|
||||
EventStreamIsRunning() bool
|
||||
GetAggregatedSelections(ctx context.Context, selections []BeaconCommitteeSelection) ([]BeaconCommitteeSelection, error)
|
||||
GetAggregatedSyncSelections(ctx context.Context, selections []SyncCommitteeSelection) ([]SyncCommitteeSelection, error)
|
||||
|
||||
@@ -7,7 +7,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/features"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/client"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/client/event"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
@@ -40,12 +41,12 @@ func run(ctx context.Context, v iface.Validator) {
|
||||
if err != nil {
|
||||
return // Exit if context is canceled.
|
||||
}
|
||||
|
||||
connectionErrorChannel := make(chan error, 1)
|
||||
go v.ReceiveSlots(ctx, connectionErrorChannel)
|
||||
if err := v.UpdateDuties(ctx, headSlot); err != nil {
|
||||
handleAssignmentError(err, headSlot)
|
||||
}
|
||||
eventsChan := make(chan *event.Event, 1)
|
||||
healthTracker := v.HealthTracker()
|
||||
runHealthCheckRoutine(ctx, v, eventsChan)
|
||||
|
||||
accountsChangedChan := make(chan [][fieldparams.BLSPubkeyLength]byte, 1)
|
||||
km, err := v.Keymanager()
|
||||
@@ -76,15 +77,10 @@ func run(ctx context.Context, v iface.Validator) {
|
||||
sub.Unsubscribe()
|
||||
close(accountsChangedChan)
|
||||
return // Exit if context is canceled.
|
||||
case slotsError := <-connectionErrorChannel:
|
||||
if slotsError != nil {
|
||||
log.WithError(slotsError).Warn("slots stream interrupted")
|
||||
go v.ReceiveSlots(ctx, connectionErrorChannel)
|
||||
case slot := <-v.NextSlot():
|
||||
if !healthTracker.IsHealthy() {
|
||||
continue
|
||||
}
|
||||
case currentKeys := <-accountsChangedChan:
|
||||
onAccountsChanged(ctx, v, currentKeys, accountsChangedChan)
|
||||
case slot := <-v.NextSlot():
|
||||
span.AddAttributes(trace.Int64Attribute("slot", int64(slot))) // lint:ignore uintcast -- This conversion is OK for tracing.
|
||||
|
||||
deadline := v.SlotDeadline(slot)
|
||||
@@ -128,6 +124,22 @@ func run(ctx context.Context, v iface.Validator) {
|
||||
continue
|
||||
}
|
||||
performRoles(slotCtx, allRoles, v, slot, &wg, span)
|
||||
case isHealthyAgain := <-healthTracker.HealthUpdates():
|
||||
if isHealthyAgain {
|
||||
headSlot, err = initializeValidatorAndGetHeadSlot(ctx, v)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to re initialize validator and get head slot")
|
||||
continue
|
||||
}
|
||||
if err := v.UpdateDuties(ctx, headSlot); err != nil {
|
||||
handleAssignmentError(err, headSlot)
|
||||
continue
|
||||
}
|
||||
}
|
||||
case e := <-eventsChan:
|
||||
v.ProcessEvent(e)
|
||||
case currentKeys := <-accountsChangedChan: // should be less of a priority than next slot
|
||||
onAccountsChanged(ctx, v, currentKeys, accountsChangedChan)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -196,13 +208,6 @@ func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) (
|
||||
log.WithError(err).Fatal("Could not wait for validator activation")
|
||||
}
|
||||
|
||||
if features.Get().EnableBeaconRESTApi {
|
||||
if err = v.StartEventStream(ctx); err != nil {
|
||||
log.WithError(err).Fatal("Could not start API event stream")
|
||||
}
|
||||
runHealthCheckRoutine(ctx, v)
|
||||
}
|
||||
|
||||
headSlot, err = v.CanonicalHeadSlot(ctx)
|
||||
if isConnectionError(err) {
|
||||
log.WithError(err).Warn("Could not get current canonical head slot")
|
||||
@@ -273,7 +278,7 @@ func performRoles(slotCtx context.Context, allRoles map[[48]byte][]iface.Validat
|
||||
}
|
||||
|
||||
func isConnectionError(err error) bool {
|
||||
return err != nil && errors.Is(err, iface.ErrConnectionIssue)
|
||||
return err != nil && errors.Is(err, client.ErrConnectionIssue)
|
||||
}
|
||||
|
||||
func handleAssignmentError(err error, slot primitives.Slot) {
|
||||
@@ -288,24 +293,23 @@ func handleAssignmentError(err error, slot primitives.Slot) {
|
||||
}
|
||||
}
|
||||
|
||||
func runHealthCheckRoutine(ctx context.Context, v iface.Validator) {
|
||||
func runHealthCheckRoutine(ctx context.Context, v iface.Validator, eventsChan chan<- *event.Event) {
|
||||
log.Info("Starting health check routine for beacon node apis")
|
||||
healthCheckTicker := time.NewTicker(time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second)
|
||||
tracker := v.HealthTracker()
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-healthCheckTicker.C:
|
||||
if v.NodeIsHealthy(ctx) && !v.EventStreamIsRunning() {
|
||||
if err := v.StartEventStream(ctx); err != nil {
|
||||
log.WithError(err).Error("Could not start API event stream")
|
||||
}
|
||||
}
|
||||
case <-ctx.Done():
|
||||
if ctx.Err() != nil {
|
||||
log.WithError(ctx.Err()).Error("Context cancelled")
|
||||
}
|
||||
log.Error("Context cancelled")
|
||||
// trigger the healthcheck immediately the first time
|
||||
for ; true; <-healthCheckTicker.C {
|
||||
if ctx.Err() != nil {
|
||||
log.WithError(ctx.Err()).Error("Context cancelled")
|
||||
return
|
||||
}
|
||||
isHealthy := tracker.CheckHealth(ctx)
|
||||
// in case of node returning healthy but event stream died
|
||||
if isHealthy && !v.EventStreamIsRunning() {
|
||||
log.Info("Event stream reconnecting...")
|
||||
go v.StartEventStream(ctx, event.DefaultEventTopics, eventsChan)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -8,6 +8,8 @@ import (
|
||||
|
||||
"github.com/ethereum/go-ethereum/common"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/client/beacon"
|
||||
healthTesting "github.com/prysmaticlabs/prysm/v5/api/client/beacon/testing"
|
||||
"github.com/prysmaticlabs/prysm/v5/async/event"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/params"
|
||||
@@ -18,6 +20,7 @@ import (
|
||||
"github.com/prysmaticlabs/prysm/v5/validator/client/iface"
|
||||
"github.com/prysmaticlabs/prysm/v5/validator/client/testutil"
|
||||
logTest "github.com/sirupsen/logrus/hooks/test"
|
||||
"go.uber.org/mock/gomock"
|
||||
)
|
||||
|
||||
func cancelledContext() context.Context {
|
||||
@@ -27,21 +30,41 @@ func cancelledContext() context.Context {
|
||||
}
|
||||
|
||||
func TestCancelledContext_CleansUpValidator(t *testing.T) {
|
||||
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
node := healthTesting.NewMockHealthClient(ctrl)
|
||||
tracker := beacon.NewNodeHealthTracker(node)
|
||||
v := &testutil.FakeValidator{
|
||||
Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}},
|
||||
Tracker: tracker,
|
||||
}
|
||||
run(cancelledContext(), v)
|
||||
assert.Equal(t, true, v.DoneCalled, "Expected Done() to be called")
|
||||
}
|
||||
|
||||
func TestCancelledContext_WaitsForChainStart(t *testing.T) {
|
||||
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
node := healthTesting.NewMockHealthClient(ctrl)
|
||||
tracker := beacon.NewNodeHealthTracker(node)
|
||||
v := &testutil.FakeValidator{
|
||||
Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}},
|
||||
Tracker: tracker,
|
||||
}
|
||||
run(cancelledContext(), v)
|
||||
assert.Equal(t, 1, v.WaitForChainStartCalled, "Expected WaitForChainStart() to be called")
|
||||
}
|
||||
|
||||
func TestRetry_On_ConnectionError(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
node := healthTesting.NewMockHealthClient(ctrl)
|
||||
tracker := beacon.NewNodeHealthTracker(node)
|
||||
retry := 10
|
||||
node.EXPECT().IsHealthy(gomock.Any()).Return(true)
|
||||
v := &testutil.FakeValidator{
|
||||
Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}},
|
||||
Tracker: tracker,
|
||||
RetryTillSuccess: retry,
|
||||
}
|
||||
backOffPeriod = 10 * time.Millisecond
|
||||
@@ -55,18 +78,31 @@ func TestRetry_On_ConnectionError(t *testing.T) {
|
||||
assert.Equal(t, retry*3, v.WaitForChainStartCalled, "Expected WaitForChainStart() to be called")
|
||||
assert.Equal(t, retry*2, v.WaitForSyncCalled, "Expected WaitForSync() to be called")
|
||||
assert.Equal(t, retry, v.WaitForActivationCalled, "Expected WaitForActivation() to be called")
|
||||
assert.Equal(t, retry, v.CanonicalHeadSlotCalled, "Expected WaitForActivation() to be called")
|
||||
assert.Equal(t, retry, v.ReceiveBlocksCalled, "Expected WaitForActivation() to be called")
|
||||
assert.Equal(t, retry, v.CanonicalHeadSlotCalled, "Expected CanonicalHeadSlotCalled() to be called")
|
||||
}
|
||||
|
||||
func TestCancelledContext_WaitsForActivation(t *testing.T) {
|
||||
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
node := healthTesting.NewMockHealthClient(ctrl)
|
||||
tracker := beacon.NewNodeHealthTracker(node)
|
||||
v := &testutil.FakeValidator{
|
||||
Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}},
|
||||
Tracker: tracker,
|
||||
}
|
||||
run(cancelledContext(), v)
|
||||
assert.Equal(t, 1, v.WaitForActivationCalled, "Expected WaitForActivation() to be called")
|
||||
}
|
||||
|
||||
func TestUpdateDuties_NextSlot(t *testing.T) {
|
||||
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
node := healthTesting.NewMockHealthClient(ctrl)
|
||||
tracker := beacon.NewNodeHealthTracker(node)
|
||||
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
|
||||
// avoid race condition between the cancellation of the context in the go stream from slot and the setting of IsHealthy
|
||||
_ = tracker.CheckHealth(context.Background())
|
||||
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, Tracker: tracker}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
slot := primitives.Slot(55)
|
||||
@@ -86,7 +122,14 @@ func TestUpdateDuties_NextSlot(t *testing.T) {
|
||||
|
||||
func TestUpdateDuties_HandlesError(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
node := healthTesting.NewMockHealthClient(ctrl)
|
||||
tracker := beacon.NewNodeHealthTracker(node)
|
||||
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
|
||||
// avoid race condition between the cancellation of the context in the go stream from slot and the setting of IsHealthy
|
||||
_ = tracker.CheckHealth(context.Background())
|
||||
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, Tracker: tracker}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
slot := primitives.Slot(55)
|
||||
@@ -105,7 +148,14 @@ func TestUpdateDuties_HandlesError(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRoleAt_NextSlot(t *testing.T) {
|
||||
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
node := healthTesting.NewMockHealthClient(ctrl)
|
||||
tracker := beacon.NewNodeHealthTracker(node)
|
||||
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
|
||||
// avoid race condition between the cancellation of the context in the go stream from slot and the setting of IsHealthy
|
||||
_ = tracker.CheckHealth(context.Background())
|
||||
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, Tracker: tracker}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
slot := primitives.Slot(55)
|
||||
@@ -124,7 +174,14 @@ func TestRoleAt_NextSlot(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAttests_NextSlot(t *testing.T) {
|
||||
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
node := healthTesting.NewMockHealthClient(ctrl)
|
||||
tracker := beacon.NewNodeHealthTracker(node)
|
||||
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
|
||||
// avoid race condition between the cancellation of the context in the go stream from slot and the setting of IsHealthy
|
||||
_ = tracker.CheckHealth(context.Background())
|
||||
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, Tracker: tracker}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
slot := primitives.Slot(55)
|
||||
@@ -144,7 +201,14 @@ func TestAttests_NextSlot(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestProposes_NextSlot(t *testing.T) {
|
||||
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
node := healthTesting.NewMockHealthClient(ctrl)
|
||||
tracker := beacon.NewNodeHealthTracker(node)
|
||||
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
|
||||
// avoid race condition between the cancellation of the context in the go stream from slot and the setting of IsHealthy
|
||||
_ = tracker.CheckHealth(context.Background())
|
||||
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, Tracker: tracker}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
slot := primitives.Slot(55)
|
||||
@@ -164,7 +228,14 @@ func TestProposes_NextSlot(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestBothProposesAndAttests_NextSlot(t *testing.T) {
|
||||
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
node := healthTesting.NewMockHealthClient(ctrl)
|
||||
tracker := beacon.NewNodeHealthTracker(node)
|
||||
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
|
||||
// avoid race condition between the cancellation of the context in the go stream from slot and the setting of IsHealthy
|
||||
_ = tracker.CheckHealth(context.Background())
|
||||
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, Tracker: tracker}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
slot := primitives.Slot(55)
|
||||
@@ -188,7 +259,12 @@ func TestBothProposesAndAttests_NextSlot(t *testing.T) {
|
||||
func TestKeyReload_ActiveKey(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
km := &mockKeymanager{}
|
||||
v := &testutil.FakeValidator{Km: km}
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
node := healthTesting.NewMockHealthClient(ctrl)
|
||||
tracker := beacon.NewNodeHealthTracker(node)
|
||||
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
|
||||
v := &testutil.FakeValidator{Km: km, Tracker: tracker}
|
||||
ac := make(chan [][fieldparams.BLSPubkeyLength]byte)
|
||||
current := [][fieldparams.BLSPubkeyLength]byte{testutil.ActiveKey}
|
||||
onAccountsChanged(ctx, v, current, ac)
|
||||
@@ -202,7 +278,12 @@ func TestKeyReload_NoActiveKey(t *testing.T) {
|
||||
na := notActive(t)
|
||||
ctx := context.Background()
|
||||
km := &mockKeymanager{}
|
||||
v := &testutil.FakeValidator{Km: km}
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
node := healthTesting.NewMockHealthClient(ctrl)
|
||||
tracker := beacon.NewNodeHealthTracker(node)
|
||||
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
|
||||
v := &testutil.FakeValidator{Km: km, Tracker: tracker}
|
||||
ac := make(chan [][fieldparams.BLSPubkeyLength]byte)
|
||||
current := [][fieldparams.BLSPubkeyLength]byte{na}
|
||||
onAccountsChanged(ctx, v, current, ac)
|
||||
@@ -224,7 +305,12 @@ func notActive(t *testing.T) [fieldparams.BLSPubkeyLength]byte {
|
||||
}
|
||||
|
||||
func TestUpdateProposerSettingsAt_EpochStart(t *testing.T) {
|
||||
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}}
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
node := healthTesting.NewMockHealthClient(ctrl)
|
||||
tracker := beacon.NewNodeHealthTracker(node)
|
||||
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
|
||||
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, Tracker: tracker}
|
||||
err := v.SetProposerSettings(context.Background(), &proposer.Settings{
|
||||
DefaultConfig: &proposer.Option{
|
||||
FeeRecipientConfig: &proposer.FeeRecipientConfig{
|
||||
@@ -249,7 +335,16 @@ func TestUpdateProposerSettingsAt_EpochStart(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestUpdateProposerSettingsAt_EpochEndOk(t *testing.T) {
|
||||
v := &testutil.FakeValidator{Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}}, ProposerSettingWait: time.Duration(params.BeaconConfig().SecondsPerSlot-1) * time.Second}
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
node := healthTesting.NewMockHealthClient(ctrl)
|
||||
tracker := beacon.NewNodeHealthTracker(node)
|
||||
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
|
||||
v := &testutil.FakeValidator{
|
||||
Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}},
|
||||
ProposerSettingWait: time.Duration(params.BeaconConfig().SecondsPerSlot-1) * time.Second,
|
||||
Tracker: tracker,
|
||||
}
|
||||
err := v.SetProposerSettings(context.Background(), &proposer.Settings{
|
||||
DefaultConfig: &proposer.Option{
|
||||
FeeRecipientConfig: &proposer.FeeRecipientConfig{
|
||||
@@ -275,9 +370,15 @@ func TestUpdateProposerSettingsAt_EpochEndOk(t *testing.T) {
|
||||
|
||||
func TestUpdateProposerSettings_ContinuesAfterValidatorRegistrationFails(t *testing.T) {
|
||||
errSomeotherError := errors.New("some internal error")
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
node := healthTesting.NewMockHealthClient(ctrl)
|
||||
tracker := beacon.NewNodeHealthTracker(node)
|
||||
node.EXPECT().IsHealthy(gomock.Any()).Return(true).AnyTimes()
|
||||
v := &testutil.FakeValidator{
|
||||
ProposerSettingsErr: errors.Wrap(ErrBuilderValidatorRegistration, errSomeotherError.Error()),
|
||||
Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}},
|
||||
Tracker: tracker,
|
||||
}
|
||||
err := v.SetProposerSettings(context.Background(), &proposer.Settings{
|
||||
DefaultConfig: &proposer.Option{
|
||||
|
||||
@@ -194,14 +194,12 @@ func (v *ValidatorService) Start() {
|
||||
return
|
||||
}
|
||||
|
||||
restHandler := &beaconApi.BeaconApiJsonRestHandler{
|
||||
HttpClient: http.Client{Timeout: v.conn.GetBeaconApiTimeout()},
|
||||
Host: v.conn.GetBeaconApiUrl(),
|
||||
}
|
||||
restHandler := beaconApi.NewBeaconApiJsonRestHandler(
|
||||
http.Client{Timeout: v.conn.GetBeaconApiTimeout()},
|
||||
v.conn.GetBeaconApiUrl(),
|
||||
)
|
||||
|
||||
evHandler := beaconApi.NewEventHandler(http.DefaultClient, v.conn.GetBeaconApiUrl())
|
||||
opts := []beaconApi.ValidatorClientOpt{beaconApi.WithEventHandler(evHandler)}
|
||||
validatorClient := validatorClientFactory.NewValidatorClient(v.conn, restHandler, opts...)
|
||||
validatorClient := validatorClientFactory.NewValidatorClient(v.conn, restHandler)
|
||||
|
||||
valStruct := &validator{
|
||||
validatorClient: validatorClient,
|
||||
|
||||
@@ -10,6 +10,9 @@ go_library(
|
||||
importpath = "github.com/prysmaticlabs/prysm/v5/validator/client/testutil",
|
||||
visibility = ["//validator:__subpackages__"],
|
||||
deps = [
|
||||
"//api/client:go_default_library",
|
||||
"//api/client/beacon:go_default_library",
|
||||
"//api/client/event:go_default_library",
|
||||
"//config/fieldparams:go_default_library",
|
||||
"//config/proposer:go_default_library",
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
|
||||
@@ -5,6 +5,9 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
api "github.com/prysmaticlabs/prysm/v5/api/client"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/client/beacon"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/client/event"
|
||||
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
|
||||
"github.com/prysmaticlabs/prysm/v5/config/proposer"
|
||||
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
|
||||
@@ -55,6 +58,7 @@ type FakeValidator struct {
|
||||
proposerSettings *proposer.Settings
|
||||
ProposerSettingWait time.Duration
|
||||
Km keymanager.IKeymanager
|
||||
Tracker *beacon.NodeHealthTracker
|
||||
}
|
||||
|
||||
// Done for mocking.
|
||||
@@ -75,7 +79,7 @@ func (fv *FakeValidator) LogSubmittedSyncCommitteeMessages() {}
|
||||
func (fv *FakeValidator) WaitForChainStart(_ context.Context) error {
|
||||
fv.WaitForChainStartCalled++
|
||||
if fv.RetryTillSuccess >= fv.WaitForChainStartCalled {
|
||||
return iface.ErrConnectionIssue
|
||||
return api.ErrConnectionIssue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -87,7 +91,7 @@ func (fv *FakeValidator) WaitForActivation(_ context.Context, accountChan chan [
|
||||
return nil
|
||||
}
|
||||
if fv.RetryTillSuccess >= fv.WaitForActivationCalled {
|
||||
return iface.ErrConnectionIssue
|
||||
return api.ErrConnectionIssue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -96,7 +100,7 @@ func (fv *FakeValidator) WaitForActivation(_ context.Context, accountChan chan [
|
||||
func (fv *FakeValidator) WaitForSync(_ context.Context) error {
|
||||
fv.WaitForSyncCalled++
|
||||
if fv.RetryTillSuccess >= fv.WaitForSyncCalled {
|
||||
return iface.ErrConnectionIssue
|
||||
return api.ErrConnectionIssue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -111,7 +115,7 @@ func (fv *FakeValidator) SlasherReady(_ context.Context) error {
|
||||
func (fv *FakeValidator) CanonicalHeadSlot(_ context.Context) (primitives.Slot, error) {
|
||||
fv.CanonicalHeadSlotCalled++
|
||||
if fv.RetryTillSuccess > fv.CanonicalHeadSlotCalled {
|
||||
return 0, iface.ErrConnectionIssue
|
||||
return 0, api.ErrConnectionIssue
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
@@ -217,14 +221,6 @@ func (*FakeValidator) CheckDoppelGanger(_ context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReceiveSlots for mocking
|
||||
func (fv *FakeValidator) ReceiveSlots(_ context.Context, connectionErrorChannel chan<- error) {
|
||||
fv.ReceiveBlocksCalled++
|
||||
if fv.RetryTillSuccess > fv.ReceiveBlocksCalled {
|
||||
connectionErrorChannel <- iface.ErrConnectionIssue
|
||||
}
|
||||
}
|
||||
|
||||
// HandleKeyReload for mocking
|
||||
func (fv *FakeValidator) HandleKeyReload(_ context.Context, newKeys [][fieldparams.BLSPubkeyLength]byte) (anyActive bool, err error) {
|
||||
fv.HandleKeyReloadCalled = true
|
||||
@@ -286,14 +282,15 @@ func (fv *FakeValidator) SetProposerSettings(_ context.Context, settings *propos
|
||||
return nil
|
||||
}
|
||||
|
||||
func (fv *FakeValidator) StartEventStream(_ context.Context) error {
|
||||
return nil
|
||||
func (*FakeValidator) StartEventStream(_ context.Context, _ []string, _ chan<- *event.Event) {
|
||||
}
|
||||
|
||||
func (fv *FakeValidator) EventStreamIsRunning() bool {
|
||||
func (*FakeValidator) ProcessEvent(_ *event.Event) {}
|
||||
|
||||
func (*FakeValidator) EventStreamIsRunning() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (fv *FakeValidator) NodeIsHealthy(context.Context) bool {
|
||||
return true
|
||||
func (fv *FakeValidator) HealthTracker() *beacon.NodeHealthTracker {
|
||||
return fv.Tracker
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
@@ -20,6 +21,10 @@ import (
|
||||
"github.com/ethereum/go-ethereum/common/hexutil"
|
||||
lru "github.com/hashicorp/golang-lru"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/client"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/client/beacon"
|
||||
eventClient "github.com/prysmaticlabs/prysm/v5/api/client/event"
|
||||
"github.com/prysmaticlabs/prysm/v5/api/server/structs"
|
||||
"github.com/prysmaticlabs/prysm/v5/async/event"
|
||||
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/altair"
|
||||
"github.com/prysmaticlabs/prysm/v5/cmd"
|
||||
@@ -248,7 +253,7 @@ func (v *validator) WaitForChainStart(ctx context.Context) error {
|
||||
|
||||
chainStartRes, err := v.validatorClient.WaitForChainStart(ctx, &emptypb.Empty{})
|
||||
if err == io.EOF {
|
||||
return iface.ErrConnectionIssue
|
||||
return client.ErrConnectionIssue
|
||||
}
|
||||
|
||||
if ctx.Err() == context.Canceled {
|
||||
@@ -257,7 +262,7 @@ func (v *validator) WaitForChainStart(ctx context.Context) error {
|
||||
|
||||
if err != nil {
|
||||
return errors.Wrap(
|
||||
iface.ErrConnectionIssue,
|
||||
client.ErrConnectionIssue,
|
||||
errors.Wrap(err, "could not receive ChainStart from stream").Error(),
|
||||
)
|
||||
}
|
||||
@@ -310,7 +315,7 @@ func (v *validator) WaitForSync(ctx context.Context) error {
|
||||
|
||||
s, err := v.nodeClient.GetSyncStatus(ctx, &emptypb.Empty{})
|
||||
if err != nil {
|
||||
return errors.Wrap(iface.ErrConnectionIssue, errors.Wrap(err, "could not get sync status").Error())
|
||||
return errors.Wrap(client.ErrConnectionIssue, errors.Wrap(err, "could not get sync status").Error())
|
||||
}
|
||||
if !s.Syncing {
|
||||
return nil
|
||||
@@ -322,7 +327,7 @@ func (v *validator) WaitForSync(ctx context.Context) error {
|
||||
case <-time.After(slots.DivideSlotBy(2 /* twice per slot */)):
|
||||
s, err := v.nodeClient.GetSyncStatus(ctx, &emptypb.Empty{})
|
||||
if err != nil {
|
||||
return errors.Wrap(iface.ErrConnectionIssue, errors.Wrap(err, "could not get sync status").Error())
|
||||
return errors.Wrap(client.ErrConnectionIssue, errors.Wrap(err, "could not get sync status").Error())
|
||||
}
|
||||
if !s.Syncing {
|
||||
return nil
|
||||
@@ -334,35 +339,6 @@ func (v *validator) WaitForSync(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
// ReceiveSlots starts a stream listener to obtain
|
||||
// slots from the beacon node when it imports a block. Upon receiving a slot, the service
|
||||
// broadcasts it to a feed for other usages to subscribe to.
|
||||
func (v *validator) ReceiveSlots(ctx context.Context, connectionErrorChannel chan<- error) {
|
||||
stream, err := v.validatorClient.StreamSlots(ctx, ðpb.StreamSlotsRequest{VerifiedOnly: true})
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to retrieve slots stream, " + iface.ErrConnectionIssue.Error())
|
||||
connectionErrorChannel <- errors.Wrap(iface.ErrConnectionIssue, err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
if ctx.Err() == context.Canceled {
|
||||
log.WithError(ctx.Err()).Error("Context canceled - shutting down slots receiver")
|
||||
return
|
||||
}
|
||||
res, err := stream.Recv()
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Could not receive slots from beacon node: " + iface.ErrConnectionIssue.Error())
|
||||
connectionErrorChannel <- errors.Wrap(iface.ErrConnectionIssue, err.Error())
|
||||
return
|
||||
}
|
||||
if res == nil {
|
||||
continue
|
||||
}
|
||||
v.setHighestSlot(res.Slot)
|
||||
}
|
||||
}
|
||||
|
||||
func (v *validator) checkAndLogValidatorStatus(statuses []*validatorStatus, activeValCount int64) bool {
|
||||
nonexistentIndex := primitives.ValidatorIndex(^uint64(0))
|
||||
var validatorActivated bool
|
||||
@@ -429,7 +405,7 @@ func (v *validator) CanonicalHeadSlot(ctx context.Context) (primitives.Slot, err
|
||||
defer span.End()
|
||||
head, err := v.beaconClient.GetChainHead(ctx, &emptypb.Empty{})
|
||||
if err != nil {
|
||||
return 0, errors.Wrap(iface.ErrConnectionIssue, err.Error())
|
||||
return 0, errors.Wrap(client.ErrConnectionIssue, err.Error())
|
||||
}
|
||||
return head.HeadSlot, nil
|
||||
}
|
||||
@@ -1092,16 +1068,43 @@ func (v *validator) PushProposerSettings(ctx context.Context, km keymanager.IKey
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *validator) StartEventStream(ctx context.Context) error {
|
||||
return v.validatorClient.StartEventStream(ctx)
|
||||
func (v *validator) StartEventStream(ctx context.Context, topics []string, eventsChannel chan<- *eventClient.Event) {
|
||||
log.WithField("topics", topics).Info("Starting event stream")
|
||||
v.validatorClient.StartEventStream(ctx, topics, eventsChannel)
|
||||
}
|
||||
|
||||
func (v *validator) ProcessEvent(event *eventClient.Event) {
|
||||
if event == nil || event.Data == nil {
|
||||
log.Warn("Received empty event")
|
||||
}
|
||||
switch event.EventType {
|
||||
case eventClient.EventError:
|
||||
log.Error(string(event.Data))
|
||||
case eventClient.EventConnectionError:
|
||||
log.WithError(errors.New(string(event.Data))).Error("Event stream interrupted")
|
||||
case eventClient.EventHead:
|
||||
log.Debug("Received head event")
|
||||
head := &structs.HeadEvent{}
|
||||
if err := json.Unmarshal(event.Data, head); err != nil {
|
||||
log.WithError(err).Error("Failed to unmarshal head Event into JSON")
|
||||
}
|
||||
uintSlot, err := strconv.ParseUint(head.Slot, 10, 64)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to parse slot")
|
||||
}
|
||||
v.setHighestSlot(primitives.Slot(uintSlot))
|
||||
default:
|
||||
// just keep going and log the error
|
||||
log.WithField("type", event.EventType).WithField("data", string(event.Data)).Warn("Received an unknown event")
|
||||
}
|
||||
}
|
||||
|
||||
func (v *validator) EventStreamIsRunning() bool {
|
||||
return v.validatorClient.EventStreamIsRunning()
|
||||
}
|
||||
|
||||
func (v *validator) NodeIsHealthy(ctx context.Context) bool {
|
||||
return v.nodeClient.IsHealthy(ctx)
|
||||
func (v *validator) HealthTracker() *beacon.NodeHealthTracker {
|
||||
return v.nodeClient.HealthTracker()
|
||||
}
|
||||
|
||||
func (v *validator) filterAndCacheActiveKeys(ctx context.Context, pubkeys [][fieldparams.BLSPubkeyLength]byte, slot primitives.Slot) ([][fieldparams.BLSPubkeyLength]byte, error) {
|
||||
|
||||
@@ -943,33 +943,6 @@ func TestCheckAndLogValidatorStatus_OK(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestService_ReceiveSlots_SetHighest(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
client := validatormock.NewMockValidatorClient(ctrl)
|
||||
|
||||
v := validator{
|
||||
validatorClient: client,
|
||||
slotFeed: new(event.Feed),
|
||||
}
|
||||
stream := mock2.NewMockBeaconNodeValidator_StreamSlotsClient(ctrl)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
client.EXPECT().StreamSlots(
|
||||
gomock.Any(),
|
||||
ðpb.StreamSlotsRequest{VerifiedOnly: true},
|
||||
).Return(stream, nil)
|
||||
stream.EXPECT().Context().Return(ctx).AnyTimes()
|
||||
stream.EXPECT().Recv().Return(
|
||||
ðpb.StreamSlotsResponse{Slot: 123},
|
||||
nil,
|
||||
).Do(func() {
|
||||
cancel()
|
||||
})
|
||||
connectionErrorChannel := make(chan error)
|
||||
v.ReceiveSlots(ctx, connectionErrorChannel)
|
||||
require.Equal(t, primitives.Slot(123), v.highestValidSlot)
|
||||
}
|
||||
|
||||
type doppelGangerRequestMatcher struct {
|
||||
req *ethpb.DoppelGangerRequest
|
||||
}
|
||||
|
||||
@@ -54,10 +54,8 @@ func (s *Server) registerBeaconClient() error {
|
||||
s.beaconApiTimeout,
|
||||
)
|
||||
|
||||
restHandler := &beaconApi.BeaconApiJsonRestHandler{
|
||||
HttpClient: http.Client{Timeout: s.beaconApiTimeout},
|
||||
Host: s.beaconApiEndpoint,
|
||||
}
|
||||
restHandler := beaconApi.NewBeaconApiJsonRestHandler(http.Client{Timeout: s.beaconApiTimeout}, s.beaconApiEndpoint)
|
||||
|
||||
s.beaconChainClient = beaconChainClientFactory.NewBeaconChainClient(conn, restHandler)
|
||||
s.beaconNodeClient = nodeClientFactory.NewNodeClient(conn, restHandler)
|
||||
s.beaconNodeValidatorClient = validatorClientFactory.NewValidatorClient(conn, restHandler)
|
||||
|
||||
Reference in New Issue
Block a user