Revert "REST VC: Subscribe to Beacon API events (#13354)" (#13428)

This reverts commit e68b2821c1.
This commit is contained in:
Radosław Kapka
2024-01-06 22:36:42 +01:00
committed by GitHub
parent 073c4edc5f
commit 8d092a1113
25 changed files with 48 additions and 350 deletions

View File

@@ -7,5 +7,4 @@ const (
ConsensusBlockValueHeader = "Eth-Consensus-Block-Value" ConsensusBlockValueHeader = "Eth-Consensus-Block-Value"
JsonMediaType = "application/json" JsonMediaType = "application/json"
OctetStreamMediaType = "application/octet-stream" OctetStreamMediaType = "application/octet-stream"
EventStreamMediaType = "text/event-stream"
) )

View File

@@ -6,7 +6,6 @@ go_library(
importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/gateway", importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/gateway",
visibility = ["//beacon-chain:__subpackages__"], visibility = ["//beacon-chain:__subpackages__"],
deps = [ deps = [
"//api:go_default_library",
"//api/gateway:go_default_library", "//api/gateway:go_default_library",
"//cmd/beacon-chain/flags:go_default_library", "//cmd/beacon-chain/flags:go_default_library",
"//proto/prysm/v1alpha1:go_default_library", "//proto/prysm/v1alpha1:go_default_library",

View File

@@ -2,7 +2,6 @@ package gateway
import ( import (
gwruntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" gwruntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/prysmaticlabs/prysm/v4/api"
"github.com/prysmaticlabs/prysm/v4/api/gateway" "github.com/prysmaticlabs/prysm/v4/api/gateway"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags" "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags"
ethpbalpha "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" ethpbalpha "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
@@ -41,7 +40,7 @@ func DefaultConfig(enableDebugRPCEndpoints bool, httpModules string) MuxConfig {
}, },
}), }),
gwruntime.WithMarshalerOption( gwruntime.WithMarshalerOption(
api.EventStreamMediaType, &gwruntime.EventSourceJSONPb{}, "text/event-stream", &gwruntime.EventSourceJSONPb{},
), ),
) )
v1AlphaPbHandler = &gateway.PbMux{ v1AlphaPbHandler = &gateway.PbMux{

View File

@@ -8,9 +8,8 @@ go_library(
"structs.go", "structs.go",
], ],
importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/events", importpath = "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/events",
visibility = ["//visibility:public"], visibility = ["//beacon-chain:__subpackages__"],
deps = [ deps = [
"//api:go_default_library",
"//beacon-chain/blockchain:go_default_library", "//beacon-chain/blockchain:go_default_library",
"//beacon-chain/core/feed:go_default_library", "//beacon-chain/core/feed:go_default_library",
"//beacon-chain/core/feed/operation:go_default_library", "//beacon-chain/core/feed/operation:go_default_library",
@@ -19,7 +18,6 @@ go_library(
"//beacon-chain/core/time:go_default_library", "//beacon-chain/core/time:go_default_library",
"//beacon-chain/core/transition:go_default_library", "//beacon-chain/core/transition:go_default_library",
"//beacon-chain/rpc/eth/shared:go_default_library", "//beacon-chain/rpc/eth/shared:go_default_library",
"//config/params:go_default_library",
"//network/httputil:go_default_library", "//network/httputil:go_default_library",
"//proto/eth/v1:go_default_library", "//proto/eth/v1:go_default_library",
"//runtime/version:go_default_library", "//runtime/version:go_default_library",

View File

@@ -5,10 +5,8 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
time2 "time"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/prysmaticlabs/prysm/v4/api"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/v4/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/operation" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/feed/operation"
@@ -17,7 +15,6 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/shared" "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/shared"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/network/httputil" "github.com/prysmaticlabs/prysm/v4/network/httputil"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/eth/v1" ethpb "github.com/prysmaticlabs/prysm/v4/proto/eth/v1"
"github.com/prysmaticlabs/prysm/v4/runtime/version" "github.com/prysmaticlabs/prysm/v4/runtime/version"
@@ -109,24 +106,16 @@ func (s *Server) StreamEvents(w http.ResponseWriter, r *http.Request) {
defer stateSub.Unsubscribe() defer stateSub.Unsubscribe()
// Set up SSE response headers // Set up SSE response headers
w.Header().Set("Content-Type", api.EventStreamMediaType) w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Connection", "keep-alive") w.Header().Set("Connection", "keep-alive")
// Handle each event received and context cancellation. // Handle each event received and context cancellation.
// We send a keepalive dummy message immediately to prevent clients
// stalling while waiting for the first response chunk.
// After that we send a keepalive dummy message every SECONDS_PER_SLOT
// to prevent anyone (e.g. proxy servers) from closing connections.
sendKeepalive(w, flusher)
keepaliveTicker := time2.NewTicker(time2.Duration(params.BeaconConfig().SecondsPerSlot) * time2.Second)
for { for {
select { select {
case event := <-opsChan: case event := <-opsChan:
handleBlockOperationEvents(w, flusher, topicsMap, event) handleBlockOperationEvents(w, flusher, topicsMap, event)
case event := <-stateChan: case event := <-stateChan:
s.handleStateEvents(ctx, w, flusher, topicsMap, event) s.handleStateEvents(ctx, w, flusher, topicsMap, event)
case <-keepaliveTicker.C:
sendKeepalive(w, flusher)
case <-ctx.Done(): case <-ctx.Done():
return return
} }
@@ -442,10 +431,6 @@ func send(w http.ResponseWriter, flusher http.Flusher, name string, data interfa
write(w, flusher, "event: %s\ndata: %s\n\n", name, string(j)) write(w, flusher, "event: %s\ndata: %s\n\n", name, string(j))
} }
func sendKeepalive(w http.ResponseWriter, flusher http.Flusher) {
write(w, flusher, ":\n\n")
}
func write(w http.ResponseWriter, flusher http.Flusher, format string, a ...any) { func write(w http.ResponseWriter, flusher http.Flusher, format string, a ...any) {
_, err := fmt.Fprintf(w, format, a...) _, err := fmt.Fprintf(w, format, a...)
if err != nil { if err != nil {

View File

@@ -375,9 +375,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
}) })
} }
const operationsResult = `: const operationsResult = `event: attestation
event: attestation
data: {"aggregation_bits":"0x00","data":{"slot":"0","index":"0","beacon_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","source":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"},"target":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"}},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"} data: {"aggregation_bits":"0x00","data":{"slot":"0","index":"0","beacon_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","source":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"},"target":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"}},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"}
event: attestation event: attestation
@@ -403,9 +401,7 @@ data: {"signed_header_1":{"message":{"slot":"0","proposer_index":"0","parent_roo
` `
const stateResult = `: const stateResult = `event: head
event: head
data: {"slot":"0","block":"0x0000000000000000000000000000000000000000000000000000000000000000","state":"0x0000000000000000000000000000000000000000000000000000000000000000","epoch_transition":true,"execution_optimistic":false,"previous_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000","current_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000"} data: {"slot":"0","block":"0x0000000000000000000000000000000000000000000000000000000000000000","state":"0x0000000000000000000000000000000000000000000000000000000000000000","epoch_transition":true,"execution_optimistic":false,"previous_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000","current_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000"}
event: finalized_checkpoint event: finalized_checkpoint
@@ -419,23 +415,17 @@ data: {"slot":"0","block":"0xeade62f0457b2fdf48e7d3fc4b60736688286be7c7a3ac4c9a1
` `
const payloadAttributesBellatrixResult = `: const payloadAttributesBellatrixResult = `event: payload_attributes
event: payload_attributes
data: {"version":"bellatrix","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000"}}} data: {"version":"bellatrix","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000"}}}
` `
const payloadAttributesCapellaResult = `: const payloadAttributesCapellaResult = `event: payload_attributes
event: payload_attributes
data: {"version":"capella","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000","withdrawals":[]}}} data: {"version":"capella","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000","withdrawals":[]}}}
` `
const payloadAttributesDenebResult = `: const payloadAttributesDenebResult = `event: payload_attributes
event: payload_attributes
data: {"version":"deneb","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000","withdrawals":[],"parent_beacon_block_root":"0xbef96cb938fd48b2403d3e662664325abb0102ed12737cbb80d717520e50cf4a"}}} data: {"version":"deneb","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000","withdrawals":[],"parent_beacon_block_root":"0xbef96cb938fd48b2403d3e662664325abb0102ed12737cbb80d717520e50cf4a"}}}
` `

View File

@@ -1,7 +1,7 @@
// Code generated by MockGen. DO NOT EDIT. // Code generated by MockGen. DO NOT EDIT.
// Source: github.com/prysmaticlabs/prysm/v4/validator/client/iface (interfaces: ValidatorClient) // Source: github.com/prysmaticlabs/prysm/v4/validator/client/iface (interfaces: ValidatorClient)
// Package mock is a generated GoMock package. // Package validator_mock is a generated GoMock package.
package validator_mock package validator_mock
import ( import (
@@ -247,20 +247,6 @@ func (mr *MockValidatorClientMockRecorder) ProposeExit(arg0, arg1 interface{}) *
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProposeExit", reflect.TypeOf((*MockValidatorClient)(nil).ProposeExit), arg0, arg1) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProposeExit", reflect.TypeOf((*MockValidatorClient)(nil).ProposeExit), arg0, arg1)
} }
// StartEventStream mocks base method.
func (m *MockValidatorClient) StartEventStream(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StartEventStream", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// StartEventStream indicates an expected call of StartEventStream.
func (mr *MockValidatorClientMockRecorder) StartEventStream(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StartEventStream", reflect.TypeOf((*MockValidatorClient)(nil).StartEventStream), arg0)
}
// StreamSlots mocks base method. // StreamSlots mocks base method.
func (m *MockValidatorClient) StreamSlots(arg0 context.Context, arg1 *eth.StreamSlotsRequest) (eth.BeaconNodeValidator_StreamSlotsClient, error) { func (m *MockValidatorClient) StreamSlots(arg0 context.Context, arg1 *eth.StreamSlotsRequest) (eth.BeaconNodeValidator_StreamSlotsClient, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View File

@@ -212,7 +212,3 @@ func (m *Validator) SetProposerSettings(_ context.Context, settings *validatorse
m.proposerSettings = settings m.proposerSettings = settings
return nil return nil
} }
func (_ *Validator) StartEventStream(_ context.Context) error {
panic("implement me")
}

View File

@@ -15,7 +15,6 @@ go_library(
"domain_data.go", "domain_data.go",
"doppelganger.go", "doppelganger.go",
"duties.go", "duties.go",
"event_handler.go",
"genesis.go", "genesis.go",
"get_beacon_block.go", "get_beacon_block.go",
"index.go", "index.go",
@@ -44,7 +43,6 @@ go_library(
"//beacon-chain/core/signing:go_default_library", "//beacon-chain/core/signing:go_default_library",
"//beacon-chain/rpc/eth/beacon:go_default_library", "//beacon-chain/rpc/eth/beacon:go_default_library",
"//beacon-chain/rpc/eth/config:go_default_library", "//beacon-chain/rpc/eth/config:go_default_library",
"//beacon-chain/rpc/eth/events:go_default_library",
"//beacon-chain/rpc/eth/node:go_default_library", "//beacon-chain/rpc/eth/node:go_default_library",
"//beacon-chain/rpc/eth/shared:go_default_library", "//beacon-chain/rpc/eth/shared:go_default_library",
"//beacon-chain/rpc/eth/validator:go_default_library", "//beacon-chain/rpc/eth/validator:go_default_library",
@@ -85,7 +83,6 @@ go_test(
"domain_data_test.go", "domain_data_test.go",
"doppelganger_test.go", "doppelganger_test.go",
"duties_test.go", "duties_test.go",
"event_handler_test.go",
"genesis_test.go", "genesis_test.go",
"get_beacon_block_test.go", "get_beacon_block_test.go",
"index_test.go", "index_test.go",
@@ -142,7 +139,6 @@ go_test(
"@com_github_golang_mock//gomock:go_default_library", "@com_github_golang_mock//gomock:go_default_library",
"@com_github_golang_protobuf//ptypes/empty", "@com_github_golang_protobuf//ptypes/empty",
"@com_github_pkg_errors//:go_default_library", "@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
"@org_golang_google_protobuf//types/known/emptypb:go_default_library", "@org_golang_google_protobuf//types/known/emptypb:go_default_library",
"@org_golang_google_protobuf//types/known/timestamppb:go_default_library", "@org_golang_google_protobuf//types/known/timestamppb:go_default_library",
], ],

View File

@@ -14,38 +14,22 @@ import (
"github.com/prysmaticlabs/prysm/v4/validator/client/iface" "github.com/prysmaticlabs/prysm/v4/validator/client/iface"
) )
type ValidatorClientOpt func(*beaconApiValidatorClient)
func WithEventHandler(h *EventHandler) ValidatorClientOpt {
return func(c *beaconApiValidatorClient) {
c.eventHandler = h
}
}
func WithEventErrorChannel(ch chan error) ValidatorClientOpt {
return func(c *beaconApiValidatorClient) {
c.eventErrCh = ch
}
}
type beaconApiValidatorClient struct { type beaconApiValidatorClient struct {
genesisProvider GenesisProvider genesisProvider GenesisProvider
dutiesProvider dutiesProvider dutiesProvider dutiesProvider
stateValidatorsProvider StateValidatorsProvider stateValidatorsProvider StateValidatorsProvider
jsonRestHandler JsonRestHandler jsonRestHandler JsonRestHandler
eventHandler *EventHandler
eventErrCh chan error
beaconBlockConverter BeaconBlockConverter beaconBlockConverter BeaconBlockConverter
prysmBeaconChainCLient iface.PrysmBeaconChainClient prysmBeaconChainCLient iface.PrysmBeaconChainClient
} }
func NewBeaconApiValidatorClient(host string, timeout time.Duration, opts ...ValidatorClientOpt) iface.ValidatorClient { func NewBeaconApiValidatorClient(host string, timeout time.Duration) iface.ValidatorClient {
jsonRestHandler := beaconApiJsonRestHandler{ jsonRestHandler := beaconApiJsonRestHandler{
httpClient: http.Client{Timeout: timeout}, httpClient: http.Client{Timeout: timeout},
host: host, host: host,
} }
c := &beaconApiValidatorClient{ return &beaconApiValidatorClient{
genesisProvider: beaconApiGenesisProvider{jsonRestHandler: jsonRestHandler}, genesisProvider: beaconApiGenesisProvider{jsonRestHandler: jsonRestHandler},
dutiesProvider: beaconApiDutiesProvider{jsonRestHandler: jsonRestHandler}, dutiesProvider: beaconApiDutiesProvider{jsonRestHandler: jsonRestHandler},
stateValidatorsProvider: beaconApiStateValidatorsProvider{jsonRestHandler: jsonRestHandler}, stateValidatorsProvider: beaconApiStateValidatorsProvider{jsonRestHandler: jsonRestHandler},
@@ -56,10 +40,6 @@ func NewBeaconApiValidatorClient(host string, timeout time.Duration, opts ...Val
jsonRestHandler: jsonRestHandler, jsonRestHandler: jsonRestHandler,
}, },
} }
for _, o := range opts {
o(c)
}
return c
} }
func (c *beaconApiValidatorClient) GetDuties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.DutiesResponse, error) { func (c *beaconApiValidatorClient) GetDuties(ctx context.Context, in *ethpb.DutiesRequest) (*ethpb.DutiesResponse, error) {
@@ -175,15 +155,3 @@ func (c *beaconApiValidatorClient) WaitForActivation(ctx context.Context, in *et
func (c *beaconApiValidatorClient) WaitForChainStart(ctx context.Context, _ *empty.Empty) (*ethpb.ChainStartResponse, error) { func (c *beaconApiValidatorClient) WaitForChainStart(ctx context.Context, _ *empty.Empty) (*ethpb.ChainStartResponse, error) {
return c.waitForChainStart(ctx) return c.waitForChainStart(ctx)
} }
func (c *beaconApiValidatorClient) StartEventStream(ctx context.Context) error {
if c.eventHandler != nil {
if c.eventErrCh == nil {
return errors.New("event handler cannot be initialized without an event error channel")
}
if err := c.eventHandler.get(ctx, []string{"head"}, c.eventErrCh); err != nil {
return errors.Wrapf(err, "event handler stopped working")
}
}
return nil
}

View File

@@ -1,126 +0,0 @@
package beacon_api
import (
"context"
"net/http"
"strings"
"sync"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/api"
)
// Currently set to the first power of 2 bigger than the size of the `head` event
// which is 446 bytes
const eventByteLimit = 512
// EventHandler is responsible for subscribing to the Beacon API events endpoint
// and dispatching received events to subscribers.
type EventHandler struct {
httpClient *http.Client
host string
subs []eventSub
sync.Mutex
}
type eventSub struct {
name string
ch chan<- event
}
type event struct {
eventType string
data string
}
// NewEventHandler returns a new handler.
func NewEventHandler(httpClient *http.Client, host string) *EventHandler {
return &EventHandler{
httpClient: httpClient,
host: host,
subs: make([]eventSub, 0),
}
}
func (h *EventHandler) subscribe(sub eventSub) {
h.Lock()
h.subs = append(h.subs, sub)
h.Unlock()
}
func (h *EventHandler) get(ctx context.Context, topics []string, eventErrCh chan<- error) error {
if len(topics) == 0 {
return errors.New("no topics provided")
}
allTopics := strings.Join(topics, ",")
log.Info("Starting listening to Beacon API events on topics " + allTopics)
url := h.host + "/eth/v1/events?topics=" + allTopics
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return errors.Wrap(err, "failed to create HTTP request")
}
req.Header.Set("Accept", api.EventStreamMediaType)
req.Header.Set("Connection", "keep-alive")
resp, err := h.httpClient.Do(req)
if err != nil {
return errors.Wrap(err, "failed to perform HTTP request")
}
go func() {
defer func() {
if closeErr := resp.Body.Close(); closeErr != nil {
log.WithError(closeErr).Error("Failed to close events response body")
}
}()
// We signal an EOF error in a special way. When we get this error while reading the response body,
// there might still be an event received in the body that we should handle.
eof := false
for {
if ctx.Err() != nil {
eventErrCh <- ctx.Err()
return
}
rawData := make([]byte, eventByteLimit)
_, err = resp.Body.Read(rawData)
if err != nil {
if strings.Contains(err.Error(), "EOF") {
log.Error("Received EOF while reading events response body")
eof = true
} else {
eventErrCh <- err
return
}
}
e := strings.Split(string(rawData), "\n")
// We expect that the event format will contain event type and data separated with a newline
if len(e) < 2 {
// We reached EOF and there is no event to send
if eof {
return
}
continue
}
for _, sub := range h.subs {
select {
case sub.ch <- event{eventType: e[0], data: e[1]}:
// Event sent successfully.
default:
log.Warn("Subscriber '" + sub.name + "' not ready to receive events")
}
}
// We reached EOF and sent the last event
if eof {
return
}
}
}()
return nil
}

View File

@@ -1,55 +0,0 @@
package beacon_api
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/prysmaticlabs/prysm/v4/testing/assert"
"github.com/prysmaticlabs/prysm/v4/testing/require"
logtest "github.com/sirupsen/logrus/hooks/test"
)
func TestEventHandler(t *testing.T) {
logHook := logtest.NewGlobal()
mux := http.NewServeMux()
mux.HandleFunc("/eth/v1/events", func(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
require.Equal(t, true, ok)
_, err := fmt.Fprint(w, "head\ndata\n\n")
require.NoError(t, err)
flusher.Flush()
})
server := httptest.NewServer(mux)
defer server.Close()
handler := NewEventHandler(http.DefaultClient, server.URL)
ch1 := make(chan event, 1)
sub1 := eventSub{ch: ch1}
ch2 := make(chan event, 1)
sub2 := eventSub{ch: ch2}
ch3 := make(chan event, 1)
sub3 := eventSub{name: "sub3", ch: ch3}
// fill up the channel so that it can't receive more events
ch3 <- event{}
handler.subscribe(sub1)
handler.subscribe(sub2)
handler.subscribe(sub3)
require.NoError(t, handler.get(context.Background(), []string{"head"}, make(chan error)))
// make sure the goroutine inside handler.get is invoked
time.Sleep(500 * time.Millisecond)
e := <-ch1
assert.Equal(t, "head", e.eventType)
assert.Equal(t, "data", e.data)
e = <-ch2
assert.Equal(t, "head", e.eventType)
assert.Equal(t, "data", e.data)
assert.LogsContain(t, logHook, "Subscriber 'sub3' not ready to receive events")
}

View File

@@ -4,12 +4,10 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
"strconv"
"time" "time"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/events"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/shared" "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/eth/shared"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
@@ -28,8 +26,8 @@ type streamSlotsClient struct {
ctx context.Context ctx context.Context
beaconApiClient beaconApiValidatorClient beaconApiClient beaconApiValidatorClient
streamSlotsRequest *ethpb.StreamSlotsRequest streamSlotsRequest *ethpb.StreamSlotsRequest
prevBlockSlot primitives.Slot
pingDelay time.Duration pingDelay time.Duration
ch chan event
} }
type streamBlocksAltairClient struct { type streamBlocksAltairClient struct {
@@ -48,14 +46,11 @@ type headSignedBeaconBlockResult struct {
} }
func (c beaconApiValidatorClient) streamSlots(ctx context.Context, in *ethpb.StreamSlotsRequest, pingDelay time.Duration) ethpb.BeaconNodeValidator_StreamSlotsClient { func (c beaconApiValidatorClient) streamSlots(ctx context.Context, in *ethpb.StreamSlotsRequest, pingDelay time.Duration) ethpb.BeaconNodeValidator_StreamSlotsClient {
ch := make(chan event, 1)
c.eventHandler.subscribe(eventSub{name: "stream slots", ch: ch})
return &streamSlotsClient{ return &streamSlotsClient{
ctx: ctx, ctx: ctx,
beaconApiClient: c, beaconApiClient: c,
streamSlotsRequest: in, streamSlotsRequest: in,
pingDelay: pingDelay, pingDelay: pingDelay,
ch: ch,
} }
} }
@@ -69,27 +64,28 @@ func (c beaconApiValidatorClient) streamBlocks(ctx context.Context, in *ethpb.St
} }
func (c *streamSlotsClient) Recv() (*ethpb.StreamSlotsResponse, error) { func (c *streamSlotsClient) Recv() (*ethpb.StreamSlotsResponse, error) {
for { result, err := c.beaconApiClient.getHeadSignedBeaconBlock(c.ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to get latest signed block")
}
// We keep querying the beacon chain for the latest block until we receive a new slot
for (c.streamSlotsRequest.VerifiedOnly && result.executionOptimistic) || c.prevBlockSlot == result.slot {
select { select {
case rawEvent := <-c.ch: case <-time.After(c.pingDelay):
if rawEvent.eventType != events.HeadTopic { result, err = c.beaconApiClient.getHeadSignedBeaconBlock(c.ctx)
continue
}
e := &events.HeadEvent{}
if err := json.Unmarshal([]byte(rawEvent.data), e); err != nil {
return nil, errors.Wrap(err, "failed to unmarshal head event into JSON")
}
uintSlot, err := strconv.ParseUint(e.Slot, 10, 64)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to parse slot") return nil, errors.Wrap(err, "failed to get latest signed block")
} }
return &ethpb.StreamSlotsResponse{
Slot: primitives.Slot(uintSlot),
}, nil
case <-c.ctx.Done(): case <-c.ctx.Done():
return nil, errors.New("context canceled") return nil, errors.New("context canceled")
} }
} }
c.prevBlockSlot = result.slot
return &ethpb.StreamSlotsResponse{
Slot: result.slot,
}, nil
} }
func (c *streamBlocksAltairClient) Recv() (*ethpb.StreamBlocksResponse, error) { func (c *streamBlocksAltairClient) Recv() (*ethpb.StreamBlocksResponse, error) {

View File

@@ -141,8 +141,3 @@ func (c *grpcValidatorClient) AggregatedSigAndAggregationBits(
func NewGrpcValidatorClient(cc grpc.ClientConnInterface) iface.ValidatorClient { func NewGrpcValidatorClient(cc grpc.ClientConnInterface) iface.ValidatorClient {
return &grpcValidatorClient{ethpb.NewBeaconNodeValidatorClient(cc)} return &grpcValidatorClient{ethpb.NewBeaconNodeValidatorClient(cc)}
} }
// StartEventStream doesn't do anything in gRPC client
func (c *grpcValidatorClient) StartEventStream(context.Context) error {
return nil
}

View File

@@ -64,7 +64,6 @@ type Validator interface {
SignValidatorRegistrationRequest(ctx context.Context, signer SigningFunc, newValidatorRegistration *ethpb.ValidatorRegistrationV1) (*ethpb.SignedValidatorRegistrationV1, error) SignValidatorRegistrationRequest(ctx context.Context, signer SigningFunc, newValidatorRegistration *ethpb.ValidatorRegistrationV1) (*ethpb.SignedValidatorRegistrationV1, error)
ProposerSettings() *validatorserviceconfig.ProposerSettings ProposerSettings() *validatorserviceconfig.ProposerSettings
SetProposerSettings(context.Context, *validatorserviceconfig.ProposerSettings) error SetProposerSettings(context.Context, *validatorserviceconfig.ProposerSettings) error
StartEventStream(ctx context.Context) error
} }
// SigningFunc interface defines a type for the a function that signs a message // SigningFunc interface defines a type for the a function that signs a message

View File

@@ -34,5 +34,4 @@ type ValidatorClient interface {
SubmitSignedContributionAndProof(ctx context.Context, in *ethpb.SignedContributionAndProof) (*empty.Empty, error) SubmitSignedContributionAndProof(ctx context.Context, in *ethpb.SignedContributionAndProof) (*empty.Empty, error)
StreamSlots(ctx context.Context, in *ethpb.StreamSlotsRequest) (ethpb.BeaconNodeValidator_StreamSlotsClient, error) StreamSlots(ctx context.Context, in *ethpb.StreamSlotsRequest) (ethpb.BeaconNodeValidator_StreamSlotsClient, error)
SubmitValidatorRegistrations(ctx context.Context, in *ethpb.SignedValidatorRegistrationsV1) (*empty.Empty, error) SubmitValidatorRegistrations(ctx context.Context, in *ethpb.SignedValidatorRegistrationsV1) (*empty.Empty, error)
StartEventStream(ctx context.Context) error
} }

View File

@@ -189,9 +189,6 @@ func initializeValidatorAndGetHeadSlot(ctx context.Context, v iface.Validator) (
if err != nil { if err != nil {
log.WithError(err).Fatal("Could not wait for validator activation") log.WithError(err).Fatal("Could not wait for validator activation")
} }
if err = v.StartEventStream(ctx); err != nil {
log.WithError(err).Fatal("Could not start API event stream")
}
headSlot, err = v.CanonicalHeadSlot(ctx) headSlot, err = v.CanonicalHeadSlot(ctx)
if isConnectionError(err) { if isConnectionError(err) {

View File

@@ -2,7 +2,6 @@ package client
import ( import (
"context" "context"
"net/http"
"strings" "strings"
"time" "time"
@@ -21,7 +20,6 @@ import (
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/validator/accounts/wallet" "github.com/prysmaticlabs/prysm/v4/validator/accounts/wallet"
beaconApi "github.com/prysmaticlabs/prysm/v4/validator/client/beacon-api"
beaconChainClientFactory "github.com/prysmaticlabs/prysm/v4/validator/client/beacon-chain-client-factory" beaconChainClientFactory "github.com/prysmaticlabs/prysm/v4/validator/client/beacon-chain-client-factory"
"github.com/prysmaticlabs/prysm/v4/validator/client/iface" "github.com/prysmaticlabs/prysm/v4/validator/client/iface"
nodeClientFactory "github.com/prysmaticlabs/prysm/v4/validator/client/node-client-factory" nodeClientFactory "github.com/prysmaticlabs/prysm/v4/validator/client/node-client-factory"
@@ -191,16 +189,7 @@ func (v *ValidatorService) Start() {
return return
} }
evHandler := beaconApi.NewEventHandler(http.DefaultClient, v.conn.GetBeaconApiUrl()) validatorClient := validatorClientFactory.NewValidatorClient(v.conn)
evErrCh := make(chan error)
opts := []beaconApi.ValidatorClientOpt{beaconApi.WithEventHandler(evHandler), beaconApi.WithEventErrorChannel(evErrCh)}
validatorClient := validatorClientFactory.NewValidatorClient(v.conn, opts...)
go func() {
e := <-evErrCh
log.WithError(e).Error("Event streaming failed")
v.cancel()
}()
beaconClient := beaconChainClientFactory.NewBeaconChainClient(v.conn) beaconClient := beaconChainClientFactory.NewBeaconChainClient(v.conn)
prysmBeaconClient := beaconChainClientFactory.NewPrysmBeaconClient(v.conn) prysmBeaconClient := beaconChainClientFactory.NewPrysmBeaconClient(v.conn)

View File

@@ -174,18 +174,18 @@ func (fv *FakeValidator) ProposeBlock(_ context.Context, slot primitives.Slot, _
} }
// SubmitAggregateAndProof for mocking. // SubmitAggregateAndProof for mocking.
func (*FakeValidator) SubmitAggregateAndProof(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) { func (_ *FakeValidator) SubmitAggregateAndProof(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) {
} }
// SubmitSyncCommitteeMessage for mocking. // SubmitSyncCommitteeMessage for mocking.
func (*FakeValidator) SubmitSyncCommitteeMessage(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) { func (_ *FakeValidator) SubmitSyncCommitteeMessage(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) {
} }
// LogAttestationsSubmitted for mocking. // LogAttestationsSubmitted for mocking.
func (*FakeValidator) LogAttestationsSubmitted() {} func (_ *FakeValidator) LogAttestationsSubmitted() {}
// UpdateDomainDataCaches for mocking. // UpdateDomainDataCaches for mocking.
func (*FakeValidator) UpdateDomainDataCaches(context.Context, primitives.Slot) {} func (_ *FakeValidator) UpdateDomainDataCaches(context.Context, primitives.Slot) {}
// BalancesByPubkeys for mocking. // BalancesByPubkeys for mocking.
func (fv *FakeValidator) BalancesByPubkeys(_ context.Context) map[[fieldparams.BLSPubkeyLength]byte]uint64 { func (fv *FakeValidator) BalancesByPubkeys(_ context.Context) map[[fieldparams.BLSPubkeyLength]byte]uint64 {
@@ -213,7 +213,7 @@ func (fv *FakeValidator) Keymanager() (keymanager.IKeymanager, error) {
} }
// CheckDoppelGanger for mocking // CheckDoppelGanger for mocking
func (*FakeValidator) CheckDoppelGanger(_ context.Context) error { func (_ *FakeValidator) CheckDoppelGanger(_ context.Context) error {
return nil return nil
} }
@@ -237,7 +237,7 @@ func (fv *FakeValidator) HandleKeyReload(_ context.Context, newKeys [][fieldpara
} }
// SubmitSignedContributionAndProof for mocking // SubmitSignedContributionAndProof for mocking
func (*FakeValidator) SubmitSignedContributionAndProof(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) { func (_ *FakeValidator) SubmitSignedContributionAndProof(_ context.Context, _ primitives.Slot, _ [fieldparams.BLSPubkeyLength]byte) {
} }
// HasProposerSettings for mocking // HasProposerSettings for mocking
@@ -266,26 +266,22 @@ func (fv *FakeValidator) PushProposerSettings(ctx context.Context, km keymanager
} }
// SetPubKeyToValidatorIndexMap for mocking // SetPubKeyToValidatorIndexMap for mocking
func (*FakeValidator) SetPubKeyToValidatorIndexMap(_ context.Context, _ keymanager.IKeymanager) error { func (_ *FakeValidator) SetPubKeyToValidatorIndexMap(_ context.Context, _ keymanager.IKeymanager) error {
return nil return nil
} }
// SignValidatorRegistrationRequest for mocking // SignValidatorRegistrationRequest for mocking
func (*FakeValidator) SignValidatorRegistrationRequest(_ context.Context, _ iface.SigningFunc, _ *ethpb.ValidatorRegistrationV1) (*ethpb.SignedValidatorRegistrationV1, error) { func (_ *FakeValidator) SignValidatorRegistrationRequest(_ context.Context, _ iface.SigningFunc, _ *ethpb.ValidatorRegistrationV1) (*ethpb.SignedValidatorRegistrationV1, error) {
return nil, nil return nil, nil
} }
// ProposerSettings for mocking // ProposerSettings for mocking
func (fv *FakeValidator) ProposerSettings() *validatorserviceconfig.ProposerSettings { func (f *FakeValidator) ProposerSettings() *validatorserviceconfig.ProposerSettings {
return fv.proposerSettings return f.proposerSettings
} }
// SetProposerSettings for mocking // SetProposerSettings for mocking
func (fv *FakeValidator) SetProposerSettings(_ context.Context, settings *validatorserviceconfig.ProposerSettings) error { func (f *FakeValidator) SetProposerSettings(_ context.Context, settings *validatorserviceconfig.ProposerSettings) error {
fv.proposerSettings = settings f.proposerSettings = settings
return nil
}
func (fv *FakeValidator) StartEventStream(_ context.Context) error {
return nil return nil
} }

View File

@@ -8,11 +8,11 @@ import (
validatorHelpers "github.com/prysmaticlabs/prysm/v4/validator/helpers" validatorHelpers "github.com/prysmaticlabs/prysm/v4/validator/helpers"
) )
func NewValidatorClient(validatorConn validatorHelpers.NodeConnection, opt ...beaconApi.ValidatorClientOpt) iface.ValidatorClient { func NewValidatorClient(validatorConn validatorHelpers.NodeConnection) iface.ValidatorClient {
featureFlags := features.Get() featureFlags := features.Get()
if featureFlags.EnableBeaconRESTApi { if featureFlags.EnableBeaconRESTApi {
return beaconApi.NewBeaconApiValidatorClient(validatorConn.GetBeaconApiUrl(), validatorConn.GetBeaconApiTimeout(), opt...) return beaconApi.NewBeaconApiValidatorClient(validatorConn.GetBeaconApiUrl(), validatorConn.GetBeaconApiTimeout())
} else { } else {
return grpcApi.NewGrpcValidatorClient(validatorConn.GetGrpcClientConn()) return grpcApi.NewGrpcValidatorClient(validatorConn.GetGrpcClientConn())
} }

View File

@@ -333,7 +333,7 @@ func (v *validator) ReceiveSlots(ctx context.Context, connectionErrorChannel cha
} }
res, err := stream.Recv() res, err := stream.Recv()
if err != nil { if err != nil {
log.WithError(err).Error("Could not receive slots from beacon node: " + iface.ErrConnectionIssue.Error()) log.WithError(err).Error("Could not receive slots from beacon node, " + iface.ErrConnectionIssue.Error())
connectionErrorChannel <- errors.Wrap(iface.ErrConnectionIssue, err.Error()) connectionErrorChannel <- errors.Wrap(iface.ErrConnectionIssue, err.Error())
return return
} }
@@ -1035,10 +1035,6 @@ func (v *validator) PushProposerSettings(ctx context.Context, km keymanager.IKey
return nil return nil
} }
func (v *validator) StartEventStream(ctx context.Context) error {
return v.validatorClient.StartEventStream(ctx)
}
func (v *validator) filterAndCacheActiveKeys(ctx context.Context, pubkeys [][fieldparams.BLSPubkeyLength]byte, slot primitives.Slot) ([][fieldparams.BLSPubkeyLength]byte, error) { func (v *validator) filterAndCacheActiveKeys(ctx context.Context, pubkeys [][fieldparams.BLSPubkeyLength]byte, slot primitives.Slot) ([][fieldparams.BLSPubkeyLength]byte, error) {
filteredKeys := make([][fieldparams.BLSPubkeyLength]byte, 0) filteredKeys := make([][fieldparams.BLSPubkeyLength]byte, 0)
statusRequestKeys := make([][]byte, 0) statusRequestKeys := make([][]byte, 0)

View File

@@ -39,7 +39,6 @@ go_library(
"//validator:__subpackages__", "//validator:__subpackages__",
], ],
deps = [ deps = [
"//api:go_default_library",
"//api/gateway:go_default_library", "//api/gateway:go_default_library",
"//api/server:go_default_library", "//api/server:go_default_library",
"//async/event:go_default_library", "//async/event:go_default_library",

View File

@@ -26,7 +26,6 @@ import (
gwruntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime" gwruntime "github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/pkg/errors" "github.com/pkg/errors"
fastssz "github.com/prysmaticlabs/fastssz" fastssz "github.com/prysmaticlabs/fastssz"
"github.com/prysmaticlabs/prysm/v4/api"
"github.com/prysmaticlabs/prysm/v4/api/gateway" "github.com/prysmaticlabs/prysm/v4/api/gateway"
"github.com/prysmaticlabs/prysm/v4/api/server" "github.com/prysmaticlabs/prysm/v4/api/server"
"github.com/prysmaticlabs/prysm/v4/async/event" "github.com/prysmaticlabs/prysm/v4/async/event"
@@ -852,7 +851,7 @@ func (c *ValidatorClient) registerRPCGatewayService(router *mux.Router) error {
}, },
}), }),
gwruntime.WithMarshalerOption( gwruntime.WithMarshalerOption(
api.EventStreamMediaType, &gwruntime.EventSourceJSONPb{}, // TODO: remove this "text/event-stream", &gwruntime.EventSourceJSONPb{}, // TODO: remove this
), ),
gwruntime.WithForwardResponseOption(gateway.HttpResponseModifier), gwruntime.WithForwardResponseOption(gateway.HttpResponseModifier),
) )

View File

@@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"github.com/prysmaticlabs/prysm/v4/api"
"github.com/prysmaticlabs/prysm/v4/network/httputil" "github.com/prysmaticlabs/prysm/v4/network/httputil"
pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v4/runtime/version" "github.com/prysmaticlabs/prysm/v4/runtime/version"
@@ -40,7 +39,7 @@ func (s *Server) StreamBeaconLogs(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "validator.web.health.StreamBeaconLogs") ctx, span := trace.StartSpan(r.Context(), "validator.web.health.StreamBeaconLogs")
defer span.End() defer span.End()
// Set up SSE response headers // Set up SSE response headers
w.Header().Set("Content-Type", api.EventStreamMediaType) w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive") w.Header().Set("Connection", "keep-alive")
@@ -109,7 +108,7 @@ func (s *Server) StreamValidatorLogs(w http.ResponseWriter, r *http.Request) {
close(ch) close(ch)
}() }()
// Set up SSE response headers // Set up SSE response headers
w.Header().Set("Content-Type", api.EventStreamMediaType) w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive") w.Header().Set("Connection", "keep-alive")

View File

@@ -11,7 +11,6 @@ import (
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
"github.com/golang/protobuf/ptypes/empty" "github.com/golang/protobuf/ptypes/empty"
"github.com/prysmaticlabs/prysm/v4/api"
"github.com/prysmaticlabs/prysm/v4/io/logs/mock" "github.com/prysmaticlabs/prysm/v4/io/logs/mock"
eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" pb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
@@ -93,7 +92,7 @@ func TestStreamBeaconLogs(t *testing.T) {
} }
ct, ok := resp.Header["Content-Type"] ct, ok := resp.Header["Content-Type"]
require.Equal(t, ok, true) require.Equal(t, ok, true)
require.Equal(t, ct[0], api.EventStreamMediaType) require.Equal(t, ct[0], "text/event-stream")
cn, ok := resp.Header["Connection"] cn, ok := resp.Header["Connection"]
require.Equal(t, ok, true) require.Equal(t, ok, true)
require.Equal(t, cn[0], "keep-alive") require.Equal(t, cn[0], "keep-alive")
@@ -144,7 +143,7 @@ func TestStreamValidatorLogs(t *testing.T) {
} }
ct, ok := resp.Header["Content-Type"] ct, ok := resp.Header["Content-Type"]
require.Equal(t, ok, true) require.Equal(t, ok, true)
require.Equal(t, ct[0], api.EventStreamMediaType) require.Equal(t, ct[0], "text/event-stream")
cn, ok := resp.Header["Connection"] cn, ok := resp.Header["Connection"]
require.Equal(t, ok, true) require.Equal(t, ok, true)
require.Equal(t, cn[0], "keep-alive") require.Equal(t, cn[0], "keep-alive")