diff --git a/api/headers.go b/api/headers.go index 245cdbf59a..5973b81408 100644 --- a/api/headers.go +++ b/api/headers.go @@ -7,5 +7,4 @@ const ( ConsensusBlockValueHeader = "Eth-Consensus-Block-Value" JsonMediaType = "application/json" OctetStreamMediaType = "application/octet-stream" - EventStreamMediaType = "text/event-stream" ) diff --git a/beacon-chain/gateway/BUILD.bazel b/beacon-chain/gateway/BUILD.bazel index f0bb36f59b..5ea81d383e 100644 --- a/beacon-chain/gateway/BUILD.bazel +++ b/beacon-chain/gateway/BUILD.bazel @@ -6,7 +6,6 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/gateway", visibility = ["//beacon-chain:__subpackages__"], deps = [ - "//api:go_default_library", "//api/gateway:go_default_library", "//cmd/beacon-chain/flags:go_default_library", "//proto/prysm/v1alpha1:go_default_library", diff --git a/beacon-chain/gateway/helpers.go b/beacon-chain/gateway/helpers.go index d8abfe1dfd..b51777b03c 100644 --- a/beacon-chain/gateway/helpers.go +++ b/beacon-chain/gateway/helpers.go @@ -2,7 +2,6 @@ package gateway import ( gwruntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" - "github.com/prysmaticlabs/prysm/v4/api" "github.com/prysmaticlabs/prysm/v4/api/gateway" "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags" ethpbalpha "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" @@ -41,7 +40,7 @@ func DefaultConfig(enableDebugRPCEndpoints bool, httpModules string) MuxConfig { }, }), gwruntime.WithMarshalerOption( - api.EventStreamMediaType, &gwruntime.EventSourceJSONPb{}, + "text/event-stream", &gwruntime.EventSourceJSONPb{}, ), ) v1AlphaPbHandler = &gateway.PbMux{ diff --git a/beacon-chain/rpc/eth/events/BUILD.bazel b/beacon-chain/rpc/eth/events/BUILD.bazel index 179eae40e6..dc7be32fc5 100644 --- a/beacon-chain/rpc/eth/events/BUILD.bazel +++ b/beacon-chain/rpc/eth/events/BUILD.bazel @@ -8,9 +8,8 @@ go_library( "structs.go", ], importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/events", - visibility = ["//visibility:public"], + visibility = ["//beacon-chain:__subpackages__"], deps = [ - "//api:go_default_library", "//beacon-chain/blockchain:go_default_library", "//beacon-chain/core/feed:go_default_library", "//beacon-chain/core/feed/operation:go_default_library", @@ -19,7 +18,6 @@ go_library( "//beacon-chain/core/time:go_default_library", "//beacon-chain/core/transition:go_default_library", "//beacon-chain/rpc/eth/shared:go_default_library", - "//config/params:go_default_library", "//network/httputil:go_default_library", "//proto/eth/v1:go_default_library", "//runtime/version:go_default_library", diff --git a/beacon-chain/rpc/eth/events/events.go b/beacon-chain/rpc/eth/events/events.go index b7d153886b..07b466f5b5 100644 --- a/beacon-chain/rpc/eth/events/events.go +++ b/beacon-chain/rpc/eth/events/events.go @@ -5,10 +5,8 @@ import ( "encoding/json" "fmt" "net/http" - time2 "time" "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/prysmaticlabs/prysm/v4/api" "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/operation" @@ -17,7 +15,6 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition" "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/shared" - "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/network/httputil" ethpb "github.com/prysmaticlabs/prysm/v4/proto/eth/v1" "github.com/prysmaticlabs/prysm/v4/runtime/version" @@ -109,24 +106,16 @@ func (s *Server) StreamEvents(w http.ResponseWriter, r *http.Request) { defer stateSub.Unsubscribe() // Set up SSE response headers - w.Header().Set("Content-Type", api.EventStreamMediaType) + w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Connection", "keep-alive") // Handle each event received and context cancellation. - // We send a keepalive dummy message immediately to prevent clients - // stalling while waiting for the first response chunk. - // After that we send a keepalive dummy message every SECONDS_PER_SLOT - // to prevent anyone (e.g. proxy servers) from closing connections. - sendKeepalive(w, flusher) - keepaliveTicker := time2.NewTicker(time2.Duration(params.BeaconConfig().SecondsPerSlot) * time2.Second) for { select { case event := <-opsChan: handleBlockOperationEvents(w, flusher, topicsMap, event) case event := <-stateChan: s.handleStateEvents(ctx, w, flusher, topicsMap, event) - case <-keepaliveTicker.C: - sendKeepalive(w, flusher) case <-ctx.Done(): return } @@ -442,10 +431,6 @@ func send(w http.ResponseWriter, flusher http.Flusher, name string, data interfa write(w, flusher, "event: %s\ndata: %s\n\n", name, string(j)) } -func sendKeepalive(w http.ResponseWriter, flusher http.Flusher) { - write(w, flusher, ":\n\n") -} - func write(w http.ResponseWriter, flusher http.Flusher, format string, a ...any) { _, err := fmt.Fprintf(w, format, a...) if err != nil { diff --git a/beacon-chain/rpc/eth/events/events_test.go b/beacon-chain/rpc/eth/events/events_test.go index 236c3da257..1acb924456 100644 --- a/beacon-chain/rpc/eth/events/events_test.go +++ b/beacon-chain/rpc/eth/events/events_test.go @@ -375,9 +375,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { }) } -const operationsResult = `: - -event: attestation +const operationsResult = `event: attestation data: {"aggregation_bits":"0x00","data":{"slot":"0","index":"0","beacon_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","source":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"},"target":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"}},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"} event: attestation @@ -403,9 +401,7 @@ data: {"signed_header_1":{"message":{"slot":"0","proposer_index":"0","parent_roo ` -const stateResult = `: - -event: head +const stateResult = `event: head data: {"slot":"0","block":"0x0000000000000000000000000000000000000000000000000000000000000000","state":"0x0000000000000000000000000000000000000000000000000000000000000000","epoch_transition":true,"execution_optimistic":false,"previous_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000","current_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000"} event: finalized_checkpoint @@ -419,23 +415,17 @@ data: {"slot":"0","block":"0xeade62f0457b2fdf48e7d3fc4b60736688286be7c7a3ac4c9a1 ` -const payloadAttributesBellatrixResult = `: - -event: payload_attributes +const payloadAttributesBellatrixResult = `event: payload_attributes data: {"version":"bellatrix","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000"}}} ` -const payloadAttributesCapellaResult = `: - -event: payload_attributes +const payloadAttributesCapellaResult = `event: payload_attributes data: {"version":"capella","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000","withdrawals":[]}}} ` -const payloadAttributesDenebResult = `: - -event: payload_attributes +const payloadAttributesDenebResult = `event: payload_attributes data: {"version":"deneb","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000","withdrawals":[],"parent_beacon_block_root":"0xbef96cb938fd48b2403d3e662664325abb0102ed12737cbb80d717520e50cf4a"}}} ` diff --git a/testing/validator-mock/validator_client_mock.go b/testing/validator-mock/validator_client_mock.go index 82afc94b2d..071b039d11 100644 --- a/testing/validator-mock/validator_client_mock.go +++ b/testing/validator-mock/validator_client_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/prysmaticlabs/prysm/v4/validator/client/iface (interfaces: ValidatorClient) -// Package mock is a generated GoMock package. +// Package validator_mock is a generated GoMock package. package validator_mock import ( @@ -247,20 +247,6 @@ func (mr *MockValidatorClientMockRecorder) ProposeExit(arg0, arg1 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProposeExit", reflect.TypeOf((*MockValidatorClient)(nil).ProposeExit), arg0, arg1) } -// StartEventStream mocks base method. -func (m *MockValidatorClient) StartEventStream(arg0 context.Context) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "StartEventStream", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// StartEventStream indicates an expected call of StartEventStream. -func (mr *MockValidatorClientMockRecorder) StartEventStream(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartEventStream", reflect.TypeOf((*MockValidatorClient)(nil).StartEventStream), arg0) -} - // StreamSlots mocks base method. func (m *MockValidatorClient) StreamSlots(arg0 context.Context, arg1 *eth.StreamSlotsRequest) (eth.BeaconNodeValidator_StreamSlotsClient, error) { m.ctrl.T.Helper() diff --git a/validator/accounts/testing/mock.go b/validator/accounts/testing/mock.go index ba024e5c68..cb903c65ff 100644 --- a/validator/accounts/testing/mock.go +++ b/validator/accounts/testing/mock.go @@ -212,7 +212,3 @@ func (m *Validator) SetProposerSettings(_ context.Context, settings *validatorse m.proposerSettings = settings return nil } - -func (_ *Validator) StartEventStream(_ context.Context) error { - panic("implement me") -} diff --git a/validator/client/beacon-api/BUILD.bazel b/validator/client/beacon-api/BUILD.bazel index fad6d0ea3b..07746d098e 100644 --- a/validator/client/beacon-api/BUILD.bazel +++ b/validator/client/beacon-api/BUILD.bazel @@ -15,7 +15,6 @@ go_library( "domain_data.go", "doppelganger.go", "duties.go", - "event_handler.go", "genesis.go", "get_beacon_block.go", "index.go", @@ -44,7 +43,6 @@ go_library( "//beacon-chain/core/signing:go_default_library", "//beacon-chain/rpc/eth/beacon:go_default_library", "//beacon-chain/rpc/eth/config:go_default_library", - "//beacon-chain/rpc/eth/events:go_default_library", "//beacon-chain/rpc/eth/node:go_default_library", "//beacon-chain/rpc/eth/shared:go_default_library", "//beacon-chain/rpc/eth/validator:go_default_library", @@ -85,7 +83,6 @@ go_test( "domain_data_test.go", "doppelganger_test.go", "duties_test.go", - "event_handler_test.go", "genesis_test.go", "get_beacon_block_test.go", "index_test.go", @@ -142,7 +139,6 @@ go_test( "@com_github_golang_mock//gomock:go_default_library", "@com_github_golang_protobuf//ptypes/empty", "@com_github_pkg_errors//:go_default_library", - "@com_github_sirupsen_logrus//hooks/test:go_default_library", "@org_golang_google_protobuf//types/known/emptypb:go_default_library", "@org_golang_google_protobuf//types/known/timestamppb:go_default_library", ], diff --git a/validator/client/beacon-api/beacon_api_validator_client.go b/validator/client/beacon-api/beacon_api_validator_client.go index d0c761866b..67da2d6ebe 100644 --- a/validator/client/beacon-api/beacon_api_validator_client.go +++ b/validator/client/beacon-api/beacon_api_validator_client.go @@ -14,38 +14,22 @@ import ( "github.com/prysmaticlabs/prysm/v4/validator/client/iface" ) -type ValidatorClientOpt func(*beaconApiValidatorClient) - -func WithEventHandler(h *EventHandler) ValidatorClientOpt { - return func(c *beaconApiValidatorClient) { - c.eventHandler = h - } -} - -func WithEventErrorChannel(ch chan error) ValidatorClientOpt { - return func(c *beaconApiValidatorClient) { - c.eventErrCh = ch - } -} - type beaconApiValidatorClient struct { genesisProvider GenesisProvider dutiesProvider dutiesProvider stateValidatorsProvider StateValidatorsProvider jsonRestHandler JsonRestHandler - eventHandler *EventHandler - eventErrCh chan error beaconBlockConverter BeaconBlockConverter prysmBeaconChainCLient iface.PrysmBeaconChainClient } -func NewBeaconApiValidatorClient(host string, timeout time.Duration, opts ...ValidatorClientOpt) iface.ValidatorClient { +func NewBeaconApiValidatorClient(host string, timeout time.Duration) iface.ValidatorClient { jsonRestHandler := beaconApiJsonRestHandler{ httpClient: http.Client{Timeout: timeout}, host: host, } - c := &beaconApiValidatorClient{ + return &beaconApiValidatorClient{ genesisProvider: beaconApiGenesisProvider{jsonRestHandler: jsonRestHandler}, dutiesProvider: beaconApiDutiesProvider{jsonRestHandler: jsonRestHandler}, stateValidatorsProvider: beaconApiStateValidatorsProvider{jsonRestHandler: jsonRestHandler}, @@ -56,10 +40,6 @@ func NewBeaconApiValidatorClient(host string, timeout time.Duration, opts ...Val jsonRestHandler: jsonRestHandler, }, } - for _, o := range opts { - o(c) - } - return c } func (c *beaconApiValidatorClient) GetDuties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.DutiesResponse, error) { @@ -175,15 +155,3 @@ func (c *beaconApiValidatorClient) WaitForActivation(ctx context.Context, in *et func (c *beaconApiValidatorClient) WaitForChainStart(ctx context.Context, _ *empty.Empty) (*ethpb.ChainStartResponse, error) { return c.waitForChainStart(ctx) } - -func (c *beaconApiValidatorClient) StartEventStream(ctx context.Context) error { - if c.eventHandler != nil { - if c.eventErrCh == nil { - return errors.New("event handler cannot be initialized without an event error channel") - } - if err := c.eventHandler.get(ctx, []string{"head"}, c.eventErrCh); err != nil { - return errors.Wrapf(err, "event handler stopped working") - } - } - return nil -} diff --git a/validator/client/beacon-api/event_handler.go b/validator/client/beacon-api/event_handler.go deleted file mode 100644 index 85efa12865..0000000000 --- a/validator/client/beacon-api/event_handler.go +++ /dev/null @@ -1,126 +0,0 @@ -package beacon_api - -import ( - "context" - "net/http" - "strings" - "sync" - - "github.com/pkg/errors" - "github.com/prysmaticlabs/prysm/v4/api" -) - -// Currently set to the first power of 2 bigger than the size of the `head` event -// which is 446 bytes -const eventByteLimit = 512 - -// EventHandler is responsible for subscribing to the Beacon API events endpoint -// and dispatching received events to subscribers. -type EventHandler struct { - httpClient *http.Client - host string - subs []eventSub - sync.Mutex -} - -type eventSub struct { - name string - ch chan<- event -} - -type event struct { - eventType string - data string -} - -// NewEventHandler returns a new handler. -func NewEventHandler(httpClient *http.Client, host string) *EventHandler { - return &EventHandler{ - httpClient: httpClient, - host: host, - subs: make([]eventSub, 0), - } -} - -func (h *EventHandler) subscribe(sub eventSub) { - h.Lock() - h.subs = append(h.subs, sub) - h.Unlock() -} - -func (h *EventHandler) get(ctx context.Context, topics []string, eventErrCh chan<- error) error { - if len(topics) == 0 { - return errors.New("no topics provided") - } - - allTopics := strings.Join(topics, ",") - log.Info("Starting listening to Beacon API events on topics " + allTopics) - url := h.host + "/eth/v1/events?topics=" + allTopics - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) - if err != nil { - return errors.Wrap(err, "failed to create HTTP request") - } - - req.Header.Set("Accept", api.EventStreamMediaType) - req.Header.Set("Connection", "keep-alive") - - resp, err := h.httpClient.Do(req) - if err != nil { - return errors.Wrap(err, "failed to perform HTTP request") - } - - go func() { - defer func() { - if closeErr := resp.Body.Close(); closeErr != nil { - log.WithError(closeErr).Error("Failed to close events response body") - } - }() - - // We signal an EOF error in a special way. When we get this error while reading the response body, - // there might still be an event received in the body that we should handle. - eof := false - for { - if ctx.Err() != nil { - eventErrCh <- ctx.Err() - return - } - - rawData := make([]byte, eventByteLimit) - _, err = resp.Body.Read(rawData) - if err != nil { - if strings.Contains(err.Error(), "EOF") { - log.Error("Received EOF while reading events response body") - eof = true - } else { - eventErrCh <- err - return - } - } - - e := strings.Split(string(rawData), "\n") - // We expect that the event format will contain event type and data separated with a newline - if len(e) < 2 { - // We reached EOF and there is no event to send - if eof { - return - } - continue - } - - for _, sub := range h.subs { - select { - case sub.ch <- event{eventType: e[0], data: e[1]}: - // Event sent successfully. - default: - log.Warn("Subscriber '" + sub.name + "' not ready to receive events") - } - } - // We reached EOF and sent the last event - if eof { - return - } - } - }() - - return nil -} diff --git a/validator/client/beacon-api/event_handler_test.go b/validator/client/beacon-api/event_handler_test.go deleted file mode 100644 index 3de171cd4d..0000000000 --- a/validator/client/beacon-api/event_handler_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package beacon_api - -import ( - "context" - "fmt" - "net/http" - "net/http/httptest" - "testing" - "time" - - "github.com/prysmaticlabs/prysm/v4/testing/assert" - "github.com/prysmaticlabs/prysm/v4/testing/require" - logtest "github.com/sirupsen/logrus/hooks/test" -) - -func TestEventHandler(t *testing.T) { - logHook := logtest.NewGlobal() - - mux := http.NewServeMux() - mux.HandleFunc("/eth/v1/events", func(w http.ResponseWriter, r *http.Request) { - flusher, ok := w.(http.Flusher) - require.Equal(t, true, ok) - _, err := fmt.Fprint(w, "head\ndata\n\n") - require.NoError(t, err) - flusher.Flush() - }) - server := httptest.NewServer(mux) - defer server.Close() - - handler := NewEventHandler(http.DefaultClient, server.URL) - ch1 := make(chan event, 1) - sub1 := eventSub{ch: ch1} - ch2 := make(chan event, 1) - sub2 := eventSub{ch: ch2} - ch3 := make(chan event, 1) - sub3 := eventSub{name: "sub3", ch: ch3} - // fill up the channel so that it can't receive more events - ch3 <- event{} - handler.subscribe(sub1) - handler.subscribe(sub2) - handler.subscribe(sub3) - - require.NoError(t, handler.get(context.Background(), []string{"head"}, make(chan error))) - // make sure the goroutine inside handler.get is invoked - time.Sleep(500 * time.Millisecond) - - e := <-ch1 - assert.Equal(t, "head", e.eventType) - assert.Equal(t, "data", e.data) - e = <-ch2 - assert.Equal(t, "head", e.eventType) - assert.Equal(t, "data", e.data) - - assert.LogsContain(t, logHook, "Subscriber 'sub3' not ready to receive events") -} diff --git a/validator/client/beacon-api/stream_blocks.go b/validator/client/beacon-api/stream_blocks.go index 2d6d453aec..d68b7e030a 100644 --- a/validator/client/beacon-api/stream_blocks.go +++ b/validator/client/beacon-api/stream_blocks.go @@ -4,12 +4,10 @@ import ( "bytes" "context" "encoding/json" - "strconv" "time" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/pkg/errors" - "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/events" "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/shared" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" @@ -28,8 +26,8 @@ type streamSlotsClient struct { ctx context.Context beaconApiClient beaconApiValidatorClient streamSlotsRequest *ethpb.StreamSlotsRequest + prevBlockSlot primitives.Slot pingDelay time.Duration - ch chan event } type streamBlocksAltairClient struct { @@ -48,14 +46,11 @@ type headSignedBeaconBlockResult struct { } func (c beaconApiValidatorClient) streamSlots(ctx context.Context, in *ethpb.StreamSlotsRequest, pingDelay time.Duration) ethpb.BeaconNodeValidator_StreamSlotsClient { - ch := make(chan event, 1) - c.eventHandler.subscribe(eventSub{name: "stream slots", ch: ch}) return &streamSlotsClient{ ctx: ctx, beaconApiClient: c, streamSlotsRequest: in, pingDelay: pingDelay, - ch: ch, } } @@ -69,27 +64,28 @@ func (c beaconApiValidatorClient) streamBlocks(ctx context.Context, in *ethpb.St } func (c *streamSlotsClient) Recv() (*ethpb.StreamSlotsResponse, error) { - for { + result, err := c.beaconApiClient.getHeadSignedBeaconBlock(c.ctx) + if err != nil { + return nil, errors.Wrap(err, "failed to get latest signed block") + } + + // We keep querying the beacon chain for the latest block until we receive a new slot + for (c.streamSlotsRequest.VerifiedOnly && result.executionOptimistic) || c.prevBlockSlot == result.slot { select { - case rawEvent := <-c.ch: - if rawEvent.eventType != events.HeadTopic { - continue - } - e := &events.HeadEvent{} - if err := json.Unmarshal([]byte(rawEvent.data), e); err != nil { - return nil, errors.Wrap(err, "failed to unmarshal head event into JSON") - } - uintSlot, err := strconv.ParseUint(e.Slot, 10, 64) + case <-time.After(c.pingDelay): + result, err = c.beaconApiClient.getHeadSignedBeaconBlock(c.ctx) if err != nil { - return nil, errors.Wrap(err, "failed to parse slot") + return nil, errors.Wrap(err, "failed to get latest signed block") } - return ðpb.StreamSlotsResponse{ - Slot: primitives.Slot(uintSlot), - }, nil case <-c.ctx.Done(): return nil, errors.New("context canceled") } } + + c.prevBlockSlot = result.slot + return ðpb.StreamSlotsResponse{ + Slot: result.slot, + }, nil } func (c *streamBlocksAltairClient) Recv() (*ethpb.StreamBlocksResponse, error) { diff --git a/validator/client/grpc-api/grpc_validator_client.go b/validator/client/grpc-api/grpc_validator_client.go index cfbe34455c..aa1c079130 100644 --- a/validator/client/grpc-api/grpc_validator_client.go +++ b/validator/client/grpc-api/grpc_validator_client.go @@ -141,8 +141,3 @@ func (c *grpcValidatorClient) AggregatedSigAndAggregationBits( func NewGrpcValidatorClient(cc grpc.ClientConnInterface) iface.ValidatorClient { return &grpcValidatorClient{ethpb.NewBeaconNodeValidatorClient(cc)} } - -// StartEventStream doesn't do anything in gRPC client -func (c *grpcValidatorClient) StartEventStream(context.Context) error { - return nil -} diff --git a/validator/client/iface/validator.go b/validator/client/iface/validator.go index 589086f5e5..a43fb97f66 100644 --- a/validator/client/iface/validator.go +++ b/validator/client/iface/validator.go @@ -64,7 +64,6 @@ type Validator interface { SignValidatorRegistrationRequest(ctx context.Context, signer SigningFunc, newValidatorRegistration *ethpb.ValidatorRegistrationV1) (*ethpb.SignedValidatorRegistrationV1, error) ProposerSettings() *validatorserviceconfig.ProposerSettings SetProposerSettings(context.Context, *validatorserviceconfig.ProposerSettings) error - StartEventStream(ctx context.Context) error } // SigningFunc interface defines a type for the a function that signs a message diff --git a/validator/client/iface/validator_client.go b/validator/client/iface/validator_client.go index c29f47d158..e83e5d1faa 100644 --- a/validator/client/iface/validator_client.go +++ b/validator/client/iface/validator_client.go @@ -34,5 +34,4 @@ type ValidatorClient interface { SubmitSignedContributionAndProof(ctx context.Context, in *ethpb.SignedContributionAndProof) (*empty.Empty, error) StreamSlots(ctx context.Context, in *ethpb.StreamSlotsRequest) (ethpb.BeaconNodeValidator_StreamSlotsClient, error) SubmitValidatorRegistrations(ctx context.Context, in *ethpb.SignedValidatorRegistrationsV1) (*empty.Empty, error) - StartEventStream(ctx context.Context) error } diff --git a/validator/client/runner.go b/validator/client/runner.go index a85ff92ab5..080c17f70e 100644 --- a/validator/client/runner.go +++ b/validator/client/runner.go @@ -189,9 +189,6 @@ func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) ( if err != nil { log.WithError(err).Fatal("Could not wait for validator activation") } - if err = v.StartEventStream(ctx); err != nil { - log.WithError(err).Fatal("Could not start API event stream") - } headSlot, err = v.CanonicalHeadSlot(ctx) if isConnectionError(err) { diff --git a/validator/client/service.go b/validator/client/service.go index 56de846314..566ed05153 100644 --- a/validator/client/service.go +++ b/validator/client/service.go @@ -2,7 +2,6 @@ package client import ( "context" - "net/http" "strings" "time" @@ -21,7 +20,6 @@ import ( "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v4/validator/accounts/wallet" - beaconApi "github.com/prysmaticlabs/prysm/v4/validator/client/beacon-api" beaconChainClientFactory "github.com/prysmaticlabs/prysm/v4/validator/client/beacon-chain-client-factory" "github.com/prysmaticlabs/prysm/v4/validator/client/iface" nodeClientFactory "github.com/prysmaticlabs/prysm/v4/validator/client/node-client-factory" @@ -191,16 +189,7 @@ func (v *ValidatorService) Start() { return } - evHandler := beaconApi.NewEventHandler(http.DefaultClient, v.conn.GetBeaconApiUrl()) - evErrCh := make(chan error) - opts := []beaconApi.ValidatorClientOpt{beaconApi.WithEventHandler(evHandler), beaconApi.WithEventErrorChannel(evErrCh)} - validatorClient := validatorClientFactory.NewValidatorClient(v.conn, opts...) - go func() { - e := <-evErrCh - log.WithError(e).Error("Event streaming failed") - v.cancel() - }() - + validatorClient := validatorClientFactory.NewValidatorClient(v.conn) beaconClient := beaconChainClientFactory.NewBeaconChainClient(v.conn) prysmBeaconClient := beaconChainClientFactory.NewPrysmBeaconClient(v.conn) diff --git a/validator/client/testutil/mock_validator.go b/validator/client/testutil/mock_validator.go index 72bb9272aa..b7956d04ea 100644 --- a/validator/client/testutil/mock_validator.go +++ b/validator/client/testutil/mock_validator.go @@ -174,18 +174,18 @@ func (fv *FakeValidator) ProposeBlock(_ context.Context, slot primitives.Slot, _ } // SubmitAggregateAndProof for mocking. -func (*FakeValidator) SubmitAggregateAndProof(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) { +func (_ *FakeValidator) SubmitAggregateAndProof(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) { } // SubmitSyncCommitteeMessage for mocking. -func (*FakeValidator) SubmitSyncCommitteeMessage(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) { +func (_ *FakeValidator) SubmitSyncCommitteeMessage(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) { } // LogAttestationsSubmitted for mocking. -func (*FakeValidator) LogAttestationsSubmitted() {} +func (_ *FakeValidator) LogAttestationsSubmitted() {} // UpdateDomainDataCaches for mocking. -func (*FakeValidator) UpdateDomainDataCaches(context.Context, primitives.Slot) {} +func (_ *FakeValidator) UpdateDomainDataCaches(context.Context, primitives.Slot) {} // BalancesByPubkeys for mocking. func (fv *FakeValidator) BalancesByPubkeys(_ context.Context) map[[fieldparams.BLSPubkeyLength]byte]uint64 { @@ -213,7 +213,7 @@ func (fv *FakeValidator) Keymanager() (keymanager.IKeymanager, error) { } // CheckDoppelGanger for mocking -func (*FakeValidator) CheckDoppelGanger(_ context.Context) error { +func (_ *FakeValidator) CheckDoppelGanger(_ context.Context) error { return nil } @@ -237,7 +237,7 @@ func (fv *FakeValidator) HandleKeyReload(_ context.Context, newKeys [][fieldpara } // SubmitSignedContributionAndProof for mocking -func (*FakeValidator) SubmitSignedContributionAndProof(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) { +func (_ *FakeValidator) SubmitSignedContributionAndProof(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) { } // HasProposerSettings for mocking @@ -266,26 +266,22 @@ func (fv *FakeValidator) PushProposerSettings(ctx context.Context, km keymanager } // SetPubKeyToValidatorIndexMap for mocking -func (*FakeValidator) SetPubKeyToValidatorIndexMap(_ context.Context, _ keymanager.IKeymanager) error { +func (_ *FakeValidator) SetPubKeyToValidatorIndexMap(_ context.Context, _ keymanager.IKeymanager) error { return nil } // SignValidatorRegistrationRequest for mocking -func (*FakeValidator) SignValidatorRegistrationRequest(_ context.Context, _ iface.SigningFunc, _ *ethpb.ValidatorRegistrationV1) (*ethpb.SignedValidatorRegistrationV1, error) { +func (_ *FakeValidator) SignValidatorRegistrationRequest(_ context.Context, _ iface.SigningFunc, _ *ethpb.ValidatorRegistrationV1) (*ethpb.SignedValidatorRegistrationV1, error) { return nil, nil } // ProposerSettings for mocking -func (fv *FakeValidator) ProposerSettings() *validatorserviceconfig.ProposerSettings { - return fv.proposerSettings +func (f *FakeValidator) ProposerSettings() *validatorserviceconfig.ProposerSettings { + return f.proposerSettings } // SetProposerSettings for mocking -func (fv *FakeValidator) SetProposerSettings(_ context.Context, settings *validatorserviceconfig.ProposerSettings) error { - fv.proposerSettings = settings - return nil -} - -func (fv *FakeValidator) StartEventStream(_ context.Context) error { +func (f *FakeValidator) SetProposerSettings(_ context.Context, settings *validatorserviceconfig.ProposerSettings) error { + f.proposerSettings = settings return nil } diff --git a/validator/client/validator-client-factory/validator_client_factory.go b/validator/client/validator-client-factory/validator_client_factory.go index edbf51ee09..cb0a9f72b9 100644 --- a/validator/client/validator-client-factory/validator_client_factory.go +++ b/validator/client/validator-client-factory/validator_client_factory.go @@ -8,11 +8,11 @@ import ( validatorHelpers "github.com/prysmaticlabs/prysm/v4/validator/helpers" ) -func NewValidatorClient(validatorConn validatorHelpers.NodeConnection, opt ...beaconApi.ValidatorClientOpt) iface.ValidatorClient { +func NewValidatorClient(validatorConn validatorHelpers.NodeConnection) iface.ValidatorClient { featureFlags := features.Get() if featureFlags.EnableBeaconRESTApi { - return beaconApi.NewBeaconApiValidatorClient(validatorConn.GetBeaconApiUrl(), validatorConn.GetBeaconApiTimeout(), opt...) + return beaconApi.NewBeaconApiValidatorClient(validatorConn.GetBeaconApiUrl(), validatorConn.GetBeaconApiTimeout()) } else { return grpcApi.NewGrpcValidatorClient(validatorConn.GetGrpcClientConn()) } diff --git a/validator/client/validator.go b/validator/client/validator.go index cd8f9e5fe5..9678992320 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -333,7 +333,7 @@ func (v *validator) ReceiveSlots(ctx context.Context, connectionErrorChannel cha } res, err := stream.Recv() if err != nil { - log.WithError(err).Error("Could not receive slots from beacon node: " + iface.ErrConnectionIssue.Error()) + log.WithError(err).Error("Could not receive slots from beacon node, " + iface.ErrConnectionIssue.Error()) connectionErrorChannel <- errors.Wrap(iface.ErrConnectionIssue, err.Error()) return } @@ -1035,10 +1035,6 @@ func (v *validator) PushProposerSettings(ctx context.Context, km keymanager.IKey return nil } -func (v *validator) StartEventStream(ctx context.Context) error { - return v.validatorClient.StartEventStream(ctx) -} - func (v *validator) filterAndCacheActiveKeys(ctx context.Context, pubkeys [][fieldparams.BLSPubkeyLength]byte, slot primitives.Slot) ([][fieldparams.BLSPubkeyLength]byte, error) { filteredKeys := make([][fieldparams.BLSPubkeyLength]byte, 0) statusRequestKeys := make([][]byte, 0) diff --git a/validator/node/BUILD.bazel b/validator/node/BUILD.bazel index 5c4bd43994..9774123d4b 100644 --- a/validator/node/BUILD.bazel +++ b/validator/node/BUILD.bazel @@ -39,7 +39,6 @@ go_library( "//validator:__subpackages__", ], deps = [ - "//api:go_default_library", "//api/gateway:go_default_library", "//api/server:go_default_library", "//async/event:go_default_library", diff --git a/validator/node/node.go b/validator/node/node.go index a12c9ae44a..aa21ae5fcd 100644 --- a/validator/node/node.go +++ b/validator/node/node.go @@ -26,7 +26,6 @@ import ( gwruntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" "github.com/pkg/errors" fastssz "github.com/prysmaticlabs/fastssz" - "github.com/prysmaticlabs/prysm/v4/api" "github.com/prysmaticlabs/prysm/v4/api/gateway" "github.com/prysmaticlabs/prysm/v4/api/server" "github.com/prysmaticlabs/prysm/v4/async/event" @@ -852,7 +851,7 @@ func (c *ValidatorClient) registerRPCGatewayService(router *mux.Router) error { }, }), gwruntime.WithMarshalerOption( - api.EventStreamMediaType, &gwruntime.EventSourceJSONPb{}, // TODO: remove this + "text/event-stream", &gwruntime.EventSourceJSONPb{}, // TODO: remove this ), gwruntime.WithForwardResponseOption(gateway.HttpResponseModifier), ) diff --git a/validator/rpc/handlers_health.go b/validator/rpc/handlers_health.go index d845b6de3a..4ab87bc87d 100644 --- a/validator/rpc/handlers_health.go +++ b/validator/rpc/handlers_health.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" - "github.com/prysmaticlabs/prysm/v4/api" "github.com/prysmaticlabs/prysm/v4/network/httputil" pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v4/runtime/version" @@ -40,7 +39,7 @@ func (s *Server) StreamBeaconLogs(w http.ResponseWriter, r *http.Request) { ctx, span := trace.StartSpan(r.Context(), "validator.web.health.StreamBeaconLogs") defer span.End() // Set up SSE response headers - w.Header().Set("Content-Type", api.EventStreamMediaType) + w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") @@ -109,7 +108,7 @@ func (s *Server) StreamValidatorLogs(w http.ResponseWriter, r *http.Request) { close(ch) }() // Set up SSE response headers - w.Header().Set("Content-Type", api.EventStreamMediaType) + w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") diff --git a/validator/rpc/handlers_health_test.go b/validator/rpc/handlers_health_test.go index f3dc20b4bd..7be1938bc5 100644 --- a/validator/rpc/handlers_health_test.go +++ b/validator/rpc/handlers_health_test.go @@ -11,7 +11,6 @@ import ( "github.com/golang/mock/gomock" "github.com/golang/protobuf/ptypes/empty" - "github.com/prysmaticlabs/prysm/v4/api" "github.com/prysmaticlabs/prysm/v4/io/logs/mock" eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" @@ -93,7 +92,7 @@ func TestStreamBeaconLogs(t *testing.T) { } ct, ok := resp.Header["Content-Type"] require.Equal(t, ok, true) - require.Equal(t, ct[0], api.EventStreamMediaType) + require.Equal(t, ct[0], "text/event-stream") cn, ok := resp.Header["Connection"] require.Equal(t, ok, true) require.Equal(t, cn[0], "keep-alive") @@ -144,7 +143,7 @@ func TestStreamValidatorLogs(t *testing.T) { } ct, ok := resp.Header["Content-Type"] require.Equal(t, ok, true) - require.Equal(t, ct[0], api.EventStreamMediaType) + require.Equal(t, ct[0], "text/event-stream") cn, ok := resp.Header["Connection"] require.Equal(t, ok, true) require.Equal(t, cn[0], "keep-alive")