From c11e3392d49c154120ae655bf94da1b4f46ea4a0 Mon Sep 17 00:00:00 2001 From: kasey <489222+kasey@users.noreply.github.com> Date: Fri, 4 Oct 2024 16:18:17 -0500 Subject: [PATCH] SSE implementation that sheds stuck clients (#14413) * sse implementation that sheds stuck clients * Radek and James feedback * Refactor event streamer code for readability * less-flaky test signaling * test case where queue fills; fixes * add changelog entry * james and preston feedback * swap our Subscription interface with an alias * event.Data can be nil for the payload attr event * deepsource --------- Co-authored-by: Kasey Kirkham --- CHANGELOG.md | 1 + api/headers.go | 8 + api/server/structs/conversions.go | 35 + async/event/BUILD.bazel | 1 + async/event/feed.go | 1 + async/event/interface.go | 8 + async/event/subscription.go | 19 - beacon-chain/blockchain/setup_test.go | 2 +- beacon-chain/blockchain/testing/mock.go | 59 +- beacon-chain/core/feed/operation/notifier.go | 2 +- beacon-chain/core/feed/state/notifier.go | 2 +- beacon-chain/execution/service_test.go | 2 +- beacon-chain/node/node.go | 4 +- beacon-chain/rpc/eth/events/BUILD.bazel | 8 +- beacon-chain/rpc/eth/events/events.go | 765 +++++++++++-------- beacon-chain/rpc/eth/events/events_test.go | 458 +++++------ beacon-chain/rpc/eth/events/http_test.go | 75 ++ beacon-chain/rpc/eth/events/server.go | 4 + deps.bzl | 12 + go.mod | 2 + go.sum | 5 + 21 files changed, 908 insertions(+), 565 deletions(-) create mode 100644 async/event/interface.go create mode 100644 beacon-chain/rpc/eth/events/http_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index e9b42f0f3a..eed6e3e46b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve - Tests to ensure sepolia config matches the official upstream yaml - HTTP endpoint for PublishBlobs - GetBlockV2, GetBlindedBlock, ProduceBlockV2, ProduceBlockV3: add Electra case. +- SSE implementation that sheds stuck clients. [pr](https://github.com/prysmaticlabs/prysm/pull/14413) ### Changed diff --git a/api/headers.go b/api/headers.go index b3f7a29ab4..69e279416c 100644 --- a/api/headers.go +++ b/api/headers.go @@ -1,5 +1,7 @@ package api +import "net/http" + const ( VersionHeader = "Eth-Consensus-Version" ExecutionPayloadBlindedHeader = "Eth-Execution-Payload-Blinded" @@ -10,3 +12,9 @@ const ( EventStreamMediaType = "text/event-stream" KeepAlive = "keep-alive" ) + +// SetSSEHeaders sets the headers needed for a server-sent event response. +func SetSSEHeaders(w http.ResponseWriter) { + w.Header().Set("Content-Type", EventStreamMediaType) + w.Header().Set("Connection", KeepAlive) +} diff --git a/api/server/structs/conversions.go b/api/server/structs/conversions.go index b8bc72f136..d735910929 100644 --- a/api/server/structs/conversions.go +++ b/api/server/structs/conversions.go @@ -15,6 +15,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v5/math" enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1" + ethv1 "github.com/prysmaticlabs/prysm/v5/proto/eth/v1" eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" ) @@ -1508,3 +1509,37 @@ func PendingConsolidationsFromConsensus(cs []*eth.PendingConsolidation) []*Pendi } return consolidations } + +func HeadEventFromV1(event *ethv1.EventHead) *HeadEvent { + return &HeadEvent{ + Slot: fmt.Sprintf("%d", event.Slot), + Block: hexutil.Encode(event.Block), + State: hexutil.Encode(event.State), + EpochTransition: event.EpochTransition, + ExecutionOptimistic: event.ExecutionOptimistic, + PreviousDutyDependentRoot: hexutil.Encode(event.PreviousDutyDependentRoot), + CurrentDutyDependentRoot: hexutil.Encode(event.CurrentDutyDependentRoot), + } +} + +func FinalizedCheckpointEventFromV1(event *ethv1.EventFinalizedCheckpoint) *FinalizedCheckpointEvent { + return &FinalizedCheckpointEvent{ + Block: hexutil.Encode(event.Block), + State: hexutil.Encode(event.State), + Epoch: fmt.Sprintf("%d", event.Epoch), + ExecutionOptimistic: event.ExecutionOptimistic, + } +} + +func EventChainReorgFromV1(event *ethv1.EventChainReorg) *ChainReorgEvent { + return &ChainReorgEvent{ + Slot: fmt.Sprintf("%d", event.Slot), + Depth: fmt.Sprintf("%d", event.Depth), + OldHeadBlock: hexutil.Encode(event.OldHeadBlock), + NewHeadBlock: hexutil.Encode(event.NewHeadBlock), + OldHeadState: hexutil.Encode(event.OldHeadState), + NewHeadState: hexutil.Encode(event.NewHeadState), + Epoch: fmt.Sprintf("%d", event.Epoch), + ExecutionOptimistic: event.ExecutionOptimistic, + } +} diff --git a/async/event/BUILD.bazel b/async/event/BUILD.bazel index 6d32b727ad..02ebe83ca9 100644 --- a/async/event/BUILD.bazel +++ b/async/event/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "go_default_library", srcs = [ "feed.go", + "interface.go", "subscription.go", ], importpath = "github.com/prysmaticlabs/prysm/v5/async/event", diff --git a/async/event/feed.go b/async/event/feed.go index 1ebb581c14..0d44c76857 100644 --- a/async/event/feed.go +++ b/async/event/feed.go @@ -22,3 +22,4 @@ import ( // Feed is a re-export of the go-ethereum event feed. type Feed = geth_event.Feed +type Subscription = geth_event.Subscription diff --git a/async/event/interface.go b/async/event/interface.go new file mode 100644 index 0000000000..d54f9fd321 --- /dev/null +++ b/async/event/interface.go @@ -0,0 +1,8 @@ +package event + +// SubscriberSender is an abstract representation of an *event.Feed +// to use in describing types that accept or return an *event.Feed. +type SubscriberSender interface { + Subscribe(channel interface{}) Subscription + Send(value interface{}) (nsent int) +} diff --git a/async/event/subscription.go b/async/event/subscription.go index 087810b25d..9ed0dfe1e8 100644 --- a/async/event/subscription.go +++ b/async/event/subscription.go @@ -28,25 +28,6 @@ import ( // request backoff time. const waitQuotient = 10 -// Subscription represents a stream of events. The carrier of the events is typically a -// channel, but isn't part of the interface. -// -// Subscriptions can fail while established. Failures are reported through an error -// channel. It receives a value if there is an issue with the subscription (e.g. the -// network connection delivering the events has been closed). Only one value will ever be -// sent. -// -// The error channel is closed when the subscription ends successfully (i.e. when the -// source of events is closed). It is also closed when Unsubscribe is called. -// -// The Unsubscribe method cancels the sending of events. You must call Unsubscribe in all -// cases to ensure that resources related to the subscription are released. It can be -// called any number of times. -type Subscription interface { - Err() <-chan error // returns the error channel - Unsubscribe() // cancels sending of events, closing the error channel -} - // NewSubscription runs a producer function as a subscription in a new goroutine. The // channel given to the producer is closed when Unsubscribe is called. If fn returns an // error, it is sent on the subscription's error channel. diff --git a/beacon-chain/blockchain/setup_test.go b/beacon-chain/blockchain/setup_test.go index 22acd22147..f21ddc6915 100644 --- a/beacon-chain/blockchain/setup_test.go +++ b/beacon-chain/blockchain/setup_test.go @@ -32,7 +32,7 @@ type mockBeaconNode struct { } // StateFeed mocks the same method in the beacon node. -func (mbn *mockBeaconNode) StateFeed() *event.Feed { +func (mbn *mockBeaconNode) StateFeed() event.SubscriberSender { mbn.mu.Lock() defer mbn.mu.Unlock() if mbn.stateFeed == nil { diff --git a/beacon-chain/blockchain/testing/mock.go b/beacon-chain/blockchain/testing/mock.go index d0da4f0cd0..0ba1be0765 100644 --- a/beacon-chain/blockchain/testing/mock.go +++ b/beacon-chain/blockchain/testing/mock.go @@ -98,6 +98,44 @@ func (s *ChainService) BlockNotifier() blockfeed.Notifier { return s.blockNotifier } +type EventFeedWrapper struct { + feed *event.Feed + subscribed chan struct{} // this channel is closed once a subscription is made +} + +func (w *EventFeedWrapper) Subscribe(channel interface{}) event.Subscription { + select { + case <-w.subscribed: + break // already closed + default: + close(w.subscribed) + } + return w.feed.Subscribe(channel) +} + +func (w *EventFeedWrapper) Send(value interface{}) int { + return w.feed.Send(value) +} + +// WaitForSubscription allows test to wait for the feed to have a subscription before beginning to send events. +func (w *EventFeedWrapper) WaitForSubscription(ctx context.Context) error { + select { + case <-w.subscribed: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +var _ event.SubscriberSender = &EventFeedWrapper{} + +func NewEventFeedWrapper() *EventFeedWrapper { + return &EventFeedWrapper{ + feed: new(event.Feed), + subscribed: make(chan struct{}), + } +} + // MockBlockNotifier mocks the block notifier. type MockBlockNotifier struct { feed *event.Feed @@ -131,7 +169,7 @@ func (msn *MockStateNotifier) ReceivedEvents() []*feed.Event { } // StateFeed returns a state feed. -func (msn *MockStateNotifier) StateFeed() *event.Feed { +func (msn *MockStateNotifier) StateFeed() event.SubscriberSender { msn.feedLock.Lock() defer msn.feedLock.Unlock() @@ -159,6 +197,23 @@ func (msn *MockStateNotifier) StateFeed() *event.Feed { return msn.feed } +// NewSimpleStateNotifier makes a state feed without the custom mock feed machinery. +func NewSimpleStateNotifier() *MockStateNotifier { + return &MockStateNotifier{feed: new(event.Feed)} +} + +type SimpleNotifier struct { + Feed event.SubscriberSender +} + +func (n *SimpleNotifier) StateFeed() event.SubscriberSender { + return n.Feed +} + +func (n *SimpleNotifier) OperationFeed() event.SubscriberSender { + return n.Feed +} + // OperationNotifier mocks the same method in the chain service. func (s *ChainService) OperationNotifier() opfeed.Notifier { if s.opNotifier == nil { @@ -173,7 +228,7 @@ type MockOperationNotifier struct { } // OperationFeed returns an operation feed. -func (mon *MockOperationNotifier) OperationFeed() *event.Feed { +func (mon *MockOperationNotifier) OperationFeed() event.SubscriberSender { if mon.feed == nil { mon.feed = new(event.Feed) } diff --git a/beacon-chain/core/feed/operation/notifier.go b/beacon-chain/core/feed/operation/notifier.go index 798519ee84..acfd5cf68c 100644 --- a/beacon-chain/core/feed/operation/notifier.go +++ b/beacon-chain/core/feed/operation/notifier.go @@ -4,5 +4,5 @@ import "github.com/prysmaticlabs/prysm/v5/async/event" // Notifier interface defines the methods of the service that provides beacon block operation updates to consumers. type Notifier interface { - OperationFeed() *event.Feed + OperationFeed() event.SubscriberSender } diff --git a/beacon-chain/core/feed/state/notifier.go b/beacon-chain/core/feed/state/notifier.go index 6ba795e73a..f3487279ab 100644 --- a/beacon-chain/core/feed/state/notifier.go +++ b/beacon-chain/core/feed/state/notifier.go @@ -4,5 +4,5 @@ import "github.com/prysmaticlabs/prysm/v5/async/event" // Notifier interface defines the methods of the service that provides state updates to consumers. type Notifier interface { - StateFeed() *event.Feed + StateFeed() event.SubscriberSender } diff --git a/beacon-chain/execution/service_test.go b/beacon-chain/execution/service_test.go index 32e8bc56df..f32cdc5444 100644 --- a/beacon-chain/execution/service_test.go +++ b/beacon-chain/execution/service_test.go @@ -73,7 +73,7 @@ type goodNotifier struct { MockStateFeed *event.Feed } -func (g *goodNotifier) StateFeed() *event.Feed { +func (g *goodNotifier) StateFeed() event.SubscriberSender { if g.MockStateFeed == nil { g.MockStateFeed = new(event.Feed) } diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index c6e6f510ce..b5c735ba89 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -398,7 +398,7 @@ func initSyncWaiter(ctx context.Context, complete chan struct{}) func() error { } // StateFeed implements statefeed.Notifier. -func (b *BeaconNode) StateFeed() *event.Feed { +func (b *BeaconNode) StateFeed() event.SubscriberSender { return b.stateFeed } @@ -408,7 +408,7 @@ func (b *BeaconNode) BlockFeed() *event.Feed { } // OperationFeed implements opfeed.Notifier. -func (b *BeaconNode) OperationFeed() *event.Feed { +func (b *BeaconNode) OperationFeed() event.SubscriberSender { return b.opFeed } diff --git a/beacon-chain/rpc/eth/events/BUILD.bazel b/beacon-chain/rpc/eth/events/BUILD.bazel index 640017b6b2..8a549d05f8 100644 --- a/beacon-chain/rpc/eth/events/BUILD.bazel +++ b/beacon-chain/rpc/eth/events/BUILD.bazel @@ -29,12 +29,16 @@ go_library( "//time/slots:go_default_library", "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", "@com_github_pkg_errors//:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", ], ) go_test( name = "go_default_test", - srcs = ["events_test.go"], + srcs = [ + "events_test.go", + "http_test.go", + ], embed = [":go_default_library"], deps = [ "//beacon-chain/blockchain/testing:go_default_library", @@ -49,9 +53,9 @@ go_test( "//consensus-types/primitives:go_default_library", "//proto/eth/v1:go_default_library", "//proto/prysm/v1alpha1:go_default_library", - "//testing/assert:go_default_library", "//testing/require:go_default_library", "//testing/util:go_default_library", "@com_github_ethereum_go_ethereum//common:go_default_library", + "@com_github_r3labs_sse_v2//:go_default_library", ], ) diff --git a/beacon-chain/rpc/eth/events/events.go b/beacon-chain/rpc/eth/events/events.go index 1c726be459..fe753d8714 100644 --- a/beacon-chain/rpc/eth/events/events.go +++ b/beacon-chain/rpc/eth/events/events.go @@ -1,11 +1,13 @@ package events import ( + "bytes" "context" "encoding/json" "fmt" + "io" "net/http" - time2 "time" + "time" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/pkg/errors" @@ -16,7 +18,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation" statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers" - "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time" + chaintime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace" @@ -26,9 +28,13 @@ import ( eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/runtime/version" "github.com/prysmaticlabs/prysm/v5/time/slots" + log "github.com/sirupsen/logrus" ) +const DefaultEventFeedDepth = 1000 + const ( + InvalidTopic = "__invalid__" // HeadTopic represents a new chain head event topic. HeadTopic = "head" // BlockTopic represents a new produced block event topic. @@ -59,25 +65,83 @@ const ( LightClientOptimisticUpdateTopic = "light_client_optimistic_update" ) -const topicDataMismatch = "Event data type %T does not correspond to event topic %s" +var ( + errInvalidTopicName = errors.New("invalid topic name") + errNoValidTopics = errors.New("no valid topics specified") + errSlowReader = errors.New("client failed to read fast enough to keep outgoing buffer below threshold") + errNotRequested = errors.New("event not requested by client") + errUnhandledEventData = errors.New("unable to represent event data in the event stream") +) -const chanBuffer = 1000 +// StreamingResponseWriter defines a type that can be used by the eventStreamer. +// This must be an http.ResponseWriter that supports flushing and hijacking. +type StreamingResponseWriter interface { + http.ResponseWriter + http.Flusher +} -var casesHandled = map[string]bool{ - HeadTopic: true, - BlockTopic: true, - AttestationTopic: true, - VoluntaryExitTopic: true, - FinalizedCheckpointTopic: true, - ChainReorgTopic: true, - SyncCommitteeContributionTopic: true, - BLSToExecutionChangeTopic: true, - PayloadAttributesTopic: true, - BlobSidecarTopic: true, - ProposerSlashingTopic: true, - AttesterSlashingTopic: true, - LightClientFinalityUpdateTopic: true, - LightClientOptimisticUpdateTopic: true, +// The eventStreamer uses lazyReaders to defer serialization until the moment the value is ready to be written to the client. +type lazyReader func() io.Reader + +var opsFeedEventTopics = map[feed.EventType]string{ + operation.AggregatedAttReceived: AttestationTopic, + operation.UnaggregatedAttReceived: AttestationTopic, + operation.ExitReceived: VoluntaryExitTopic, + operation.SyncCommitteeContributionReceived: SyncCommitteeContributionTopic, + operation.BLSToExecutionChangeReceived: BLSToExecutionChangeTopic, + operation.BlobSidecarReceived: BlobSidecarTopic, + operation.AttesterSlashingReceived: AttesterSlashingTopic, + operation.ProposerSlashingReceived: ProposerSlashingTopic, +} + +var stateFeedEventTopics = map[feed.EventType]string{ + statefeed.NewHead: HeadTopic, + statefeed.MissedSlot: PayloadAttributesTopic, + statefeed.FinalizedCheckpoint: FinalizedCheckpointTopic, + statefeed.LightClientFinalityUpdate: LightClientFinalityUpdateTopic, + statefeed.LightClientOptimisticUpdate: LightClientOptimisticUpdateTopic, + statefeed.Reorg: ChainReorgTopic, + statefeed.BlockProcessed: BlockTopic, +} + +var topicsForStateFeed = topicsForFeed(stateFeedEventTopics) +var topicsForOpsFeed = topicsForFeed(opsFeedEventTopics) + +func topicsForFeed(em map[feed.EventType]string) map[string]bool { + topics := make(map[string]bool, len(em)) + for _, topic := range em { + topics[topic] = true + } + return topics +} + +type topicRequest struct { + topics map[string]bool + needStateFeed bool + needOpsFeed bool +} + +func (req *topicRequest) requested(topic string) bool { + return req.topics[topic] +} + +func newTopicRequest(topics []string) (*topicRequest, error) { + req := &topicRequest{topics: make(map[string]bool)} + for _, name := range topics { + if topicsForStateFeed[name] { + req.needStateFeed = true + } else if topicsForOpsFeed[name] { + req.needOpsFeed = true + } else { + return nil, errors.Wrapf(errInvalidTopicName, name) + } + req.topics[name] = true + } + if len(req.topics) == 0 || (!req.needStateFeed && !req.needOpsFeed) { + return nil, errNoValidTopics + } + + return req, nil } // StreamEvents provides an endpoint to subscribe to the beacon node Server-Sent-Events stream. @@ -88,326 +152,412 @@ func (s *Server) StreamEvents(w http.ResponseWriter, r *http.Request) { ctx, span := trace.StartSpan(r.Context(), "events.StreamEvents") defer span.End() - flusher, ok := w.(http.Flusher) + topics, err := newTopicRequest(r.URL.Query()["topics"]) + if err != nil { + httputil.HandleError(w, err.Error(), http.StatusBadRequest) + return + } + + sw, ok := w.(StreamingResponseWriter) if !ok { - httputil.HandleError(w, "Streaming unsupported!", http.StatusInternalServerError) + msg := "beacon node misconfiguration: http stack may not support required response handling features, like flushing" + httputil.HandleError(w, msg, http.StatusInternalServerError) return } - - topics := r.URL.Query()["topics"] - if len(topics) == 0 { - httputil.HandleError(w, "No topics specified to subscribe to", http.StatusBadRequest) - return + depth := s.EventFeedDepth + if depth == 0 { + depth = DefaultEventFeedDepth } - topicsMap := make(map[string]bool) - for _, topic := range topics { - if _, ok := casesHandled[topic]; !ok { - httputil.HandleError(w, fmt.Sprintf("Invalid topic: %s", topic), http.StatusBadRequest) - return - } - topicsMap[topic] = true - } - - // Subscribe to event feeds from information received in the beacon node runtime. - opsChan := make(chan *feed.Event, chanBuffer) - opsSub := s.OperationNotifier.OperationFeed().Subscribe(opsChan) - stateChan := make(chan *feed.Event, chanBuffer) - stateSub := s.StateNotifier.StateFeed().Subscribe(stateChan) - defer opsSub.Unsubscribe() - defer stateSub.Unsubscribe() - - // Set up SSE response headers - w.Header().Set("Content-Type", api.EventStreamMediaType) - w.Header().Set("Connection", api.KeepAlive) - - // Handle each event received and context cancellation. - // We send a keepalive dummy message immediately to prevent clients - // stalling while waiting for the first response chunk. - // After that we send a keepalive dummy message every SECONDS_PER_SLOT - // to prevent anyone (e.g. proxy servers) from closing connections. - if err := sendKeepalive(w, flusher); err != nil { + es, err := newEventStreamer(depth, s.KeepAliveInterval) + if err != nil { httputil.HandleError(w, err.Error(), http.StatusInternalServerError) return } - keepaliveTicker := time2.NewTicker(time2.Duration(params.BeaconConfig().SecondsPerSlot) * time2.Second) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + api.SetSSEHeaders(sw) + go es.outboxWriteLoop(ctx, cancel, sw) + if err := es.recvEventLoop(ctx, cancel, topics, s); err != nil { + log.WithError(err).Debug("Shutting down StreamEvents handler.") + } +} + +func newEventStreamer(buffSize int, ka time.Duration) (*eventStreamer, error) { + if ka == 0 { + ka = time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second + } + return &eventStreamer{ + outbox: make(chan lazyReader, buffSize), + keepAlive: ka, + }, nil +} + +type eventStreamer struct { + outbox chan lazyReader + keepAlive time.Duration +} + +func (es *eventStreamer) recvEventLoop(ctx context.Context, cancel context.CancelFunc, req *topicRequest, s *Server) error { + eventsChan := make(chan *feed.Event, len(es.outbox)) + if req.needOpsFeed { + opsSub := s.OperationNotifier.OperationFeed().Subscribe(eventsChan) + defer opsSub.Unsubscribe() + } + if req.needStateFeed { + stateSub := s.StateNotifier.StateFeed().Subscribe(eventsChan) + defer stateSub.Unsubscribe() + } for { select { - case event := <-opsChan: - if err := handleBlockOperationEvents(w, flusher, topicsMap, event); err != nil { - httputil.HandleError(w, err.Error(), http.StatusInternalServerError) - return - } - case event := <-stateChan: - if err := s.handleStateEvents(ctx, w, flusher, topicsMap, event); err != nil { - httputil.HandleError(w, err.Error(), http.StatusInternalServerError) - return - } - case <-keepaliveTicker.C: - if err := sendKeepalive(w, flusher); err != nil { - httputil.HandleError(w, err.Error(), http.StatusInternalServerError) - return - } case <-ctx.Done(): + return ctx.Err() + case event := <-eventsChan: + lr, err := s.lazyReaderForEvent(ctx, event, req) + if err != nil { + if !errors.Is(err, errNotRequested) { + log.WithField("event_type", fmt.Sprintf("%v", event.Data)).WithError(err).Error("StreamEvents API endpoint received an event it was unable to handle.") + } + continue + } + // If the client can't keep up, the outbox will eventually completely fill, at which + // safeWrite will error, and we'll hit the below return statement, at which point the deferred + // Unsuscribe calls will be made and the event feed will stop writing to this channel. + // Since the outbox and event stream channels are separately buffered, the event subscription + // channel should stay relatively empty, which gives this loop time to unsubscribe + // and cleanup before the event stream channel fills and disrupts other readers. + if err := es.safeWrite(ctx, lr); err != nil { + cancel() + // note: we could hijack the connection and close it here. Does that cause issues? What are the benefits? + // A benefit of hijack and close is that it may force an error on the remote end, however just closing the context of the + // http handler may be sufficient to cause the remote http response reader to close. + if errors.Is(err, errSlowReader) { + log.WithError(err).Warn("Client is unable to keep up with event stream, shutting down.") + } + return err + } + } + } +} + +func (es *eventStreamer) safeWrite(ctx context.Context, rf func() io.Reader) error { + if rf == nil { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case es.outbox <- rf: + return nil + default: + // If this is the case, the select case to write to the outbox could not proceed, meaning the outbox is full. + // If a reader can't keep up with the stream, we shut them down. + return errSlowReader + } +} + +// newlineReader is used to write keep-alives to the client. +// keep-alives in the sse protocol are a single ':' colon followed by 2 newlines. +func newlineReader() io.Reader { + return bytes.NewBufferString(":\n\n") +} + +// outboxWriteLoop runs in a separate goroutine. Its job is to write the values in the outbox to +// the client as fast as the client can read them. +func (es *eventStreamer) outboxWriteLoop(ctx context.Context, cancel context.CancelFunc, w StreamingResponseWriter) { + var err error + defer func() { + if err != nil { + log.WithError(err).Debug("Event streamer shutting down due to error.") + } + }() + defer cancel() + // Write a keepalive at the start to test the connection and simplify test setup. + if err = es.writeOutbox(ctx, w, nil); err != nil { + return + } + + kaT := time.NewTimer(es.keepAlive) + // Ensure the keepalive timer is stopped and drained if it has fired. + defer func() { + if !kaT.Stop() { + <-kaT.C + } + }() + for { + select { + case <-ctx.Done(): + err = ctx.Err() return + case <-kaT.C: + if err = es.writeOutbox(ctx, w, nil); err != nil { + return + } + // In this case the timer doesn't need to be Stopped before the Reset call after the select statement, + // because the timer has already fired. + case lr := <-es.outbox: + if err = es.writeOutbox(ctx, w, lr); err != nil { + return + } + // We don't know if the timer fired concurrently to this case being ready, so we need to check the return + // of Stop and drain the timer channel if it fired. We won't need to do this in go 1.23. + if !kaT.Stop() { + <-kaT.C + } + } + kaT.Reset(es.keepAlive) + } +} + +func (es *eventStreamer) writeOutbox(ctx context.Context, w StreamingResponseWriter, first lazyReader) error { + needKeepAlive := true + if first != nil { + if _, err := io.Copy(w, first()); err != nil { + return err + } + needKeepAlive = false + } + // While the first event was being read by the client, further events may be queued in the outbox. + // We can drain them right away rather than go back out to the outer select statement, where the keepAlive timer + // may have fired, triggering an unnecessary extra keep-alive write and flush. + for { + select { + case <-ctx.Done(): + return ctx.Err() + case rf := <-es.outbox: + if _, err := io.Copy(w, rf()); err != nil { + return err + } + needKeepAlive = false + default: + if needKeepAlive { + if _, err := io.Copy(w, newlineReader()); err != nil { + return err + } + } + w.Flush() + return nil } } } -func handleBlockOperationEvents(w http.ResponseWriter, flusher http.Flusher, requestedTopics map[string]bool, event *feed.Event) error { - switch event.Type { - case operation.AggregatedAttReceived: - if _, ok := requestedTopics[AttestationTopic]; !ok { - return nil - } - attData, ok := event.Data.(*operation.AggregatedAttReceivedData) - if !ok { - return write(w, flusher, topicDataMismatch, event.Data, AttestationTopic) - } - att := structs.AttFromConsensus(attData.Attestation.Aggregate) - return send(w, flusher, AttestationTopic, att) - case operation.UnaggregatedAttReceived: - if _, ok := requestedTopics[AttestationTopic]; !ok { - return nil - } - attData, ok := event.Data.(*operation.UnAggregatedAttReceivedData) - if !ok { - return write(w, flusher, topicDataMismatch, event.Data, AttestationTopic) - } - a, ok := attData.Attestation.(*eth.Attestation) - if !ok { - return write(w, flusher, topicDataMismatch, event.Data, AttestationTopic) - } - att := structs.AttFromConsensus(a) - return send(w, flusher, AttestationTopic, att) - case operation.ExitReceived: - if _, ok := requestedTopics[VoluntaryExitTopic]; !ok { - return nil - } - exitData, ok := event.Data.(*operation.ExitReceivedData) - if !ok { - return write(w, flusher, topicDataMismatch, event.Data, VoluntaryExitTopic) - } - exit := structs.SignedExitFromConsensus(exitData.Exit) - return send(w, flusher, VoluntaryExitTopic, exit) - case operation.SyncCommitteeContributionReceived: - if _, ok := requestedTopics[SyncCommitteeContributionTopic]; !ok { - return nil - } - contributionData, ok := event.Data.(*operation.SyncCommitteeContributionReceivedData) - if !ok { - return write(w, flusher, topicDataMismatch, event.Data, SyncCommitteeContributionTopic) - } - contribution := structs.SignedContributionAndProofFromConsensus(contributionData.Contribution) - return send(w, flusher, SyncCommitteeContributionTopic, contribution) - case operation.BLSToExecutionChangeReceived: - if _, ok := requestedTopics[BLSToExecutionChangeTopic]; !ok { - return nil - } - changeData, ok := event.Data.(*operation.BLSToExecutionChangeReceivedData) - if !ok { - return write(w, flusher, topicDataMismatch, event.Data, BLSToExecutionChangeTopic) - } - return send(w, flusher, BLSToExecutionChangeTopic, structs.SignedBLSChangeFromConsensus(changeData.Change)) - case operation.BlobSidecarReceived: - if _, ok := requestedTopics[BlobSidecarTopic]; !ok { - return nil - } - blobData, ok := event.Data.(*operation.BlobSidecarReceivedData) - if !ok { - return write(w, flusher, topicDataMismatch, event.Data, BlobSidecarTopic) - } - versionedHash := blockchain.ConvertKzgCommitmentToVersionedHash(blobData.Blob.KzgCommitment) - blobEvent := &structs.BlobSidecarEvent{ - BlockRoot: hexutil.Encode(blobData.Blob.BlockRootSlice()), - Index: fmt.Sprintf("%d", blobData.Blob.Index), - Slot: fmt.Sprintf("%d", blobData.Blob.Slot()), - VersionedHash: versionedHash.String(), - KzgCommitment: hexutil.Encode(blobData.Blob.KzgCommitment), - } - return send(w, flusher, BlobSidecarTopic, blobEvent) - case operation.AttesterSlashingReceived: - if _, ok := requestedTopics[AttesterSlashingTopic]; !ok { - return nil - } - attesterSlashingData, ok := event.Data.(*operation.AttesterSlashingReceivedData) - if !ok { - return write(w, flusher, topicDataMismatch, event.Data, AttesterSlashingTopic) - } - slashing, ok := attesterSlashingData.AttesterSlashing.(*eth.AttesterSlashing) - if ok { - return send(w, flusher, AttesterSlashingTopic, structs.AttesterSlashingFromConsensus(slashing)) - } - // TODO: extend to Electra - case operation.ProposerSlashingReceived: - if _, ok := requestedTopics[ProposerSlashingTopic]; !ok { - return nil - } - proposerSlashingData, ok := event.Data.(*operation.ProposerSlashingReceivedData) - if !ok { - return write(w, flusher, topicDataMismatch, event.Data, ProposerSlashingTopic) - } - return send(w, flusher, ProposerSlashingTopic, structs.ProposerSlashingFromConsensus(proposerSlashingData.ProposerSlashing)) +func jsonMarshalReader(name string, v any) io.Reader { + d, err := json.Marshal(v) + if err != nil { + log.WithError(err).WithField("type_name", fmt.Sprintf("%T", v)).Error("Could not marshal event data.") + return nil } - return nil + return bytes.NewBufferString("event: " + name + "\ndata: " + string(d) + "\n\n") } -func (s *Server) handleStateEvents(ctx context.Context, w http.ResponseWriter, flusher http.Flusher, requestedTopics map[string]bool, event *feed.Event) error { - switch event.Type { - case statefeed.NewHead: - if _, ok := requestedTopics[HeadTopic]; ok { - headData, ok := event.Data.(*ethpb.EventHead) - if !ok { - return write(w, flusher, topicDataMismatch, event.Data, HeadTopic) - } - head := &structs.HeadEvent{ - Slot: fmt.Sprintf("%d", headData.Slot), - Block: hexutil.Encode(headData.Block), - State: hexutil.Encode(headData.State), - EpochTransition: headData.EpochTransition, - ExecutionOptimistic: headData.ExecutionOptimistic, - PreviousDutyDependentRoot: hexutil.Encode(headData.PreviousDutyDependentRoot), - CurrentDutyDependentRoot: hexutil.Encode(headData.CurrentDutyDependentRoot), - } - return send(w, flusher, HeadTopic, head) +func topicForEvent(event *feed.Event) string { + switch event.Data.(type) { + case *operation.AggregatedAttReceivedData: + return AttestationTopic + case *operation.UnAggregatedAttReceivedData: + return AttestationTopic + case *operation.ExitReceivedData: + return VoluntaryExitTopic + case *operation.SyncCommitteeContributionReceivedData: + return SyncCommitteeContributionTopic + case *operation.BLSToExecutionChangeReceivedData: + return BLSToExecutionChangeTopic + case *operation.BlobSidecarReceivedData: + return BlobSidecarTopic + case *operation.AttesterSlashingReceivedData: + return AttesterSlashingTopic + case *operation.ProposerSlashingReceivedData: + return ProposerSlashingTopic + case *ethpb.EventHead: + return HeadTopic + case *ethpb.EventFinalizedCheckpoint: + return FinalizedCheckpointTopic + case *ethpbv2.LightClientFinalityUpdateWithVersion: + return LightClientFinalityUpdateTopic + case *ethpbv2.LightClientOptimisticUpdateWithVersion: + return LightClientOptimisticUpdateTopic + case *ethpb.EventChainReorg: + return ChainReorgTopic + case *statefeed.BlockProcessedData: + return BlockTopic + default: + if event.Type == statefeed.MissedSlot { + return PayloadAttributesTopic } - if _, ok := requestedTopics[PayloadAttributesTopic]; ok { - return s.sendPayloadAttributes(ctx, w, flusher) - } - case statefeed.MissedSlot: - if _, ok := requestedTopics[PayloadAttributesTopic]; ok { - return s.sendPayloadAttributes(ctx, w, flusher) - } - case statefeed.FinalizedCheckpoint: - if _, ok := requestedTopics[FinalizedCheckpointTopic]; !ok { - return nil - } - checkpointData, ok := event.Data.(*ethpb.EventFinalizedCheckpoint) - if !ok { - return write(w, flusher, topicDataMismatch, event.Data, FinalizedCheckpointTopic) - } - checkpoint := &structs.FinalizedCheckpointEvent{ - Block: hexutil.Encode(checkpointData.Block), - State: hexutil.Encode(checkpointData.State), - Epoch: fmt.Sprintf("%d", checkpointData.Epoch), - ExecutionOptimistic: checkpointData.ExecutionOptimistic, - } - return send(w, flusher, FinalizedCheckpointTopic, checkpoint) - case statefeed.LightClientFinalityUpdate: - if _, ok := requestedTopics[LightClientFinalityUpdateTopic]; !ok { - return nil - } - updateData, ok := event.Data.(*ethpbv2.LightClientFinalityUpdateWithVersion) - if !ok { - return write(w, flusher, topicDataMismatch, event.Data, LightClientFinalityUpdateTopic) - } - update, err := structs.LightClientFinalityUpdateFromConsensus(updateData.Data) - if err != nil { - return err - } - updateEvent := &structs.LightClientFinalityUpdateEvent{ - Version: version.String(int(updateData.Version)), - Data: update, - } - return send(w, flusher, LightClientFinalityUpdateTopic, updateEvent) - case statefeed.LightClientOptimisticUpdate: - if _, ok := requestedTopics[LightClientOptimisticUpdateTopic]; !ok { - return nil - } - updateData, ok := event.Data.(*ethpbv2.LightClientOptimisticUpdateWithVersion) - if !ok { - return write(w, flusher, topicDataMismatch, event.Data, LightClientOptimisticUpdateTopic) - } - update, err := structs.LightClientOptimisticUpdateFromConsensus(updateData.Data) - if err != nil { - return err - } - updateEvent := &structs.LightClientOptimisticUpdateEvent{ - Version: version.String(int(updateData.Version)), - Data: update, - } - return send(w, flusher, LightClientOptimisticUpdateTopic, updateEvent) - case statefeed.Reorg: - if _, ok := requestedTopics[ChainReorgTopic]; !ok { - return nil - } - reorgData, ok := event.Data.(*ethpb.EventChainReorg) - if !ok { - return write(w, flusher, topicDataMismatch, event.Data, ChainReorgTopic) - } - reorg := &structs.ChainReorgEvent{ - Slot: fmt.Sprintf("%d", reorgData.Slot), - Depth: fmt.Sprintf("%d", reorgData.Depth), - OldHeadBlock: hexutil.Encode(reorgData.OldHeadBlock), - NewHeadBlock: hexutil.Encode(reorgData.NewHeadBlock), - OldHeadState: hexutil.Encode(reorgData.OldHeadState), - NewHeadState: hexutil.Encode(reorgData.NewHeadState), - Epoch: fmt.Sprintf("%d", reorgData.Epoch), - ExecutionOptimistic: reorgData.ExecutionOptimistic, - } - return send(w, flusher, ChainReorgTopic, reorg) - case statefeed.BlockProcessed: - if _, ok := requestedTopics[BlockTopic]; !ok { - return nil - } - blkData, ok := event.Data.(*statefeed.BlockProcessedData) - if !ok { - return write(w, flusher, topicDataMismatch, event.Data, BlockTopic) - } - blockRoot, err := blkData.SignedBlock.Block().HashTreeRoot() - if err != nil { - return write(w, flusher, "Could not get block root: "+err.Error()) - } - blk := &structs.BlockEvent{ - Slot: fmt.Sprintf("%d", blkData.Slot), - Block: hexutil.Encode(blockRoot[:]), - ExecutionOptimistic: blkData.Optimistic, - } - return send(w, flusher, BlockTopic, blk) + return InvalidTopic + } +} + +func (s *Server) lazyReaderForEvent(ctx context.Context, event *feed.Event, topics *topicRequest) (lazyReader, error) { + eventName := topicForEvent(event) + if !topics.requested(eventName) { + return nil, errNotRequested + } + if eventName == PayloadAttributesTopic { + return s.currentPayloadAttributes(ctx) + } + if event == nil || event.Data == nil { + return nil, errors.New("event or event data is nil") + } + switch v := event.Data.(type) { + case *ethpb.EventHead: + // The head event is a special case because, if the client requested the payload attributes topic, + // we send two event messages in reaction; the head event and the payload attributes. + headReader := func() io.Reader { + return jsonMarshalReader(eventName, structs.HeadEventFromV1(v)) + } + // Don't do the expensive attr lookup unless the client requested it. + if !topics.requested(PayloadAttributesTopic) { + return headReader, nil + } + // Since payload attributes could change before the outbox is written, we need to do a blocking operation to + // get the current payload attributes right here. + attrReader, err := s.currentPayloadAttributes(ctx) + if err != nil { + return nil, errors.Wrap(err, "could not get payload attributes for head event") + } + return func() io.Reader { + return io.MultiReader(headReader(), attrReader()) + }, nil + case *operation.AggregatedAttReceivedData: + return func() io.Reader { + att := structs.AttFromConsensus(v.Attestation.Aggregate) + return jsonMarshalReader(eventName, att) + }, nil + case *operation.UnAggregatedAttReceivedData: + att, ok := v.Attestation.(*eth.Attestation) + if !ok { + return nil, errors.Wrapf(errUnhandledEventData, "Unexpected type %T for the .Attestation field of UnAggregatedAttReceivedData", v.Attestation) + } + return func() io.Reader { + att := structs.AttFromConsensus(att) + return jsonMarshalReader(eventName, att) + }, nil + case *operation.ExitReceivedData: + return func() io.Reader { + return jsonMarshalReader(eventName, structs.SignedExitFromConsensus(v.Exit)) + }, nil + case *operation.SyncCommitteeContributionReceivedData: + return func() io.Reader { + return jsonMarshalReader(eventName, structs.SignedContributionAndProofFromConsensus(v.Contribution)) + }, nil + case *operation.BLSToExecutionChangeReceivedData: + return func() io.Reader { + return jsonMarshalReader(eventName, structs.SignedBLSChangeFromConsensus(v.Change)) + }, nil + case *operation.BlobSidecarReceivedData: + return func() io.Reader { + versionedHash := blockchain.ConvertKzgCommitmentToVersionedHash(v.Blob.KzgCommitment) + return jsonMarshalReader(eventName, &structs.BlobSidecarEvent{ + BlockRoot: hexutil.Encode(v.Blob.BlockRootSlice()), + Index: fmt.Sprintf("%d", v.Blob.Index), + Slot: fmt.Sprintf("%d", v.Blob.Slot()), + VersionedHash: versionedHash.String(), + KzgCommitment: hexutil.Encode(v.Blob.KzgCommitment), + }) + }, nil + case *operation.AttesterSlashingReceivedData: + slashing, ok := v.AttesterSlashing.(*eth.AttesterSlashing) + if !ok { + return nil, errors.Wrapf(errUnhandledEventData, "Unexpected type %T for the .AttesterSlashing field of AttesterSlashingReceivedData", v.AttesterSlashing) + } + return func() io.Reader { + return jsonMarshalReader(eventName, structs.AttesterSlashingFromConsensus(slashing)) + }, nil + case *operation.ProposerSlashingReceivedData: + return func() io.Reader { + return jsonMarshalReader(eventName, structs.ProposerSlashingFromConsensus(v.ProposerSlashing)) + }, nil + case *ethpb.EventFinalizedCheckpoint: + return func() io.Reader { + return jsonMarshalReader(eventName, structs.FinalizedCheckpointEventFromV1(v)) + }, nil + case *ethpbv2.LightClientFinalityUpdateWithVersion: + cv, err := structs.LightClientFinalityUpdateFromConsensus(v.Data) + if err != nil { + return nil, errors.Wrap(err, "LightClientFinalityUpdateWithVersion event conversion failure") + } + ev := &structs.LightClientFinalityUpdateEvent{ + Version: version.String(int(v.Version)), + Data: cv, + } + return func() io.Reader { + return jsonMarshalReader(eventName, ev) + }, nil + case *ethpbv2.LightClientOptimisticUpdateWithVersion: + cv, err := structs.LightClientOptimisticUpdateFromConsensus(v.Data) + if err != nil { + return nil, errors.Wrap(err, "LightClientOptimisticUpdateWithVersion event conversion failure") + } + ev := &structs.LightClientOptimisticUpdateEvent{ + Version: version.String(int(v.Version)), + Data: cv, + } + return func() io.Reader { + return jsonMarshalReader(eventName, ev) + }, nil + case *ethpb.EventChainReorg: + return func() io.Reader { + return jsonMarshalReader(eventName, structs.EventChainReorgFromV1(v)) + }, nil + case *statefeed.BlockProcessedData: + blockRoot, err := v.SignedBlock.Block().HashTreeRoot() + if err != nil { + return nil, errors.Wrap(err, "could not compute block root for BlockProcessedData state feed event") + } + return func() io.Reader { + blk := &structs.BlockEvent{ + Slot: fmt.Sprintf("%d", v.Slot), + Block: hexutil.Encode(blockRoot[:]), + ExecutionOptimistic: v.Optimistic, + } + return jsonMarshalReader(eventName, blk) + }, nil + default: + return nil, errors.Wrapf(errUnhandledEventData, "event data type %T unsupported", v) } - return nil } // This event stream is intended to be used by builders and relays. // Parent fields are based on state at N_{current_slot}, while the rest of fields are based on state of N_{current_slot + 1} -func (s *Server) sendPayloadAttributes(ctx context.Context, w http.ResponseWriter, flusher http.Flusher) error { +func (s *Server) currentPayloadAttributes(ctx context.Context) (lazyReader, error) { headRoot, err := s.HeadFetcher.HeadRoot(ctx) if err != nil { - return write(w, flusher, "Could not get head root: "+err.Error()) + return nil, errors.Wrap(err, "could not get head root") } st, err := s.HeadFetcher.HeadState(ctx) if err != nil { - return write(w, flusher, "Could not get head state: "+err.Error()) + return nil, errors.Wrap(err, "could not get head state") } // advance the head state headState, err := transition.ProcessSlotsIfPossible(ctx, st, s.ChainInfoFetcher.CurrentSlot()+1) if err != nil { - return write(w, flusher, "Could not advance head state: "+err.Error()) + return nil, errors.Wrap(err, "could not advance head state") } headBlock, err := s.HeadFetcher.HeadBlock(ctx) if err != nil { - return write(w, flusher, "Could not get head block: "+err.Error()) + return nil, errors.Wrap(err, "could not get head block") } headPayload, err := headBlock.Block().Body().Execution() if err != nil { - return write(w, flusher, "Could not get execution payload: "+err.Error()) + return nil, errors.Wrap(err, "could not get execution payload") } t, err := slots.ToTime(headState.GenesisTime(), headState.Slot()) if err != nil { - return write(w, flusher, "Could not get head state slot time: "+err.Error()) + return nil, errors.Wrap(err, "could not get head state slot time") } - prevRando, err := helpers.RandaoMix(headState, time.CurrentEpoch(headState)) + prevRando, err := helpers.RandaoMix(headState, chaintime.CurrentEpoch(headState)) if err != nil { - return write(w, flusher, "Could not get head state randao mix: "+err.Error()) + return nil, errors.Wrap(err, "could not get head state randao mix") } proposerIndex, err := helpers.BeaconProposerIndex(ctx, headState) if err != nil { - return write(w, flusher, "Could not get head state proposer index: "+err.Error()) + return nil, errors.Wrap(err, "could not get head state proposer index") } feeRecipient := params.BeaconConfig().DefaultFeeRecipient.Bytes() tValidator, exists := s.TrackedValidatorsCache.Validator(proposerIndex) @@ -425,7 +575,7 @@ func (s *Server) sendPayloadAttributes(ctx context.Context, w http.ResponseWrite case version.Capella: withdrawals, _, err := headState.ExpectedWithdrawals() if err != nil { - return write(w, flusher, "Could not get head state expected withdrawals: "+err.Error()) + return nil, errors.Wrap(err, "could not get head state expected withdrawals") } attributes = &structs.PayloadAttributesV2{ Timestamp: fmt.Sprintf("%d", t.Unix()), @@ -436,11 +586,11 @@ func (s *Server) sendPayloadAttributes(ctx context.Context, w http.ResponseWrite case version.Deneb, version.Electra: withdrawals, _, err := headState.ExpectedWithdrawals() if err != nil { - return write(w, flusher, "Could not get head state expected withdrawals: "+err.Error()) + return nil, errors.Wrap(err, "could not get head state expected withdrawals") } parentRoot, err := headBlock.Block().HashTreeRoot() if err != nil { - return write(w, flusher, "Could not get head block root: "+err.Error()) + return nil, errors.Wrap(err, "could not get head block root") } attributes = &structs.PayloadAttributesV3{ Timestamp: fmt.Sprintf("%d", t.Unix()), @@ -450,12 +600,12 @@ func (s *Server) sendPayloadAttributes(ctx context.Context, w http.ResponseWrite ParentBeaconBlockRoot: hexutil.Encode(parentRoot[:]), } default: - return write(w, flusher, "Payload version %s is not supported", version.String(headState.Version())) + return nil, errors.Wrapf(err, "Payload version %s is not supported", version.String(headState.Version())) } attributesBytes, err := json.Marshal(attributes) if err != nil { - return write(w, flusher, err.Error()) + return nil, errors.Wrap(err, "errors marshaling payload attributes to json") } eventData := structs.PayloadAttributesEventData{ ProposerIndex: fmt.Sprintf("%d", proposerIndex), @@ -467,31 +617,12 @@ func (s *Server) sendPayloadAttributes(ctx context.Context, w http.ResponseWrite } eventDataBytes, err := json.Marshal(eventData) if err != nil { - return write(w, flusher, err.Error()) + return nil, errors.Wrap(err, "errors marshaling payload attributes event data to json") } - return send(w, flusher, PayloadAttributesTopic, &structs.PayloadAttributesEvent{ - Version: version.String(headState.Version()), - Data: eventDataBytes, - }) -} - -func send(w http.ResponseWriter, flusher http.Flusher, name string, data interface{}) error { - j, err := json.Marshal(data) - if err != nil { - return write(w, flusher, "Could not marshal event to JSON: "+err.Error()) - } - return write(w, flusher, "event: %s\ndata: %s\n\n", name, string(j)) -} - -func sendKeepalive(w http.ResponseWriter, flusher http.Flusher) error { - return write(w, flusher, ":\n\n") -} - -func write(w http.ResponseWriter, flusher http.Flusher, format string, a ...any) error { - _, err := fmt.Fprintf(w, format, a...) - if err != nil { - return errors.Wrap(err, "could not write to response writer") - } - flusher.Flush() - return nil + return func() io.Reader { + return jsonMarshalReader(PayloadAttributesTopic, &structs.PayloadAttributesEvent{ + Version: version.String(headState.Version()), + Data: eventDataBytes, + }) + }, nil } diff --git a/beacon-chain/rpc/eth/events/events_test.go b/beacon-chain/rpc/eth/events/events_test.go index 94b08b176d..420f34fdf9 100644 --- a/beacon-chain/rpc/eth/events/events_test.go +++ b/beacon-chain/rpc/eth/events/events_test.go @@ -1,6 +1,7 @@ package events import ( + "context" "fmt" "io" "math" @@ -23,56 +24,99 @@ import ( "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" ethpb "github.com/prysmaticlabs/prysm/v5/proto/eth/v1" eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" - "github.com/prysmaticlabs/prysm/v5/testing/assert" "github.com/prysmaticlabs/prysm/v5/testing/require" "github.com/prysmaticlabs/prysm/v5/testing/util" + sse "github.com/r3labs/sse/v2" ) -type flushableResponseRecorder struct { - *httptest.ResponseRecorder - flushed bool +func requireAllEventsReceived(t *testing.T, stn, opn *mockChain.EventFeedWrapper, events []*feed.Event, req *topicRequest, s *Server, w *StreamingResponseWriterRecorder) { + // maxBufferSize param copied from sse lib client code + sseR := sse.NewEventStreamReader(w.Body(), 1<<24) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + + expected := make(map[string]bool) + for i := range events { + ev := events[i] + // serialize the event the same way the server will so that we can compare expectation to results. + top := topicForEvent(ev) + eb, err := s.lazyReaderForEvent(context.Background(), ev, req) + require.NoError(t, err) + exb, err := io.ReadAll(eb()) + require.NoError(t, err) + exs := string(exb[0 : len(exb)-2]) // remove trailing double newline + + if topicsForOpsFeed[top] { + if err := opn.WaitForSubscription(ctx); err != nil { + t.Fatal(err) + } + // Send the event on the feed. + s.OperationNotifier.OperationFeed().Send(ev) + } else { + if err := stn.WaitForSubscription(ctx); err != nil { + t.Fatal(err) + } + // Send the event on the feed. + s.StateNotifier.StateFeed().Send(ev) + } + expected[exs] = true + } + done := make(chan struct{}) + go func() { + defer close(done) + for { + ev, err := sseR.ReadEvent() + if err == io.EOF { + return + } + require.NoError(t, err) + str := string(ev) + delete(expected, str) + if len(expected) == 0 { + return + } + } + }() + select { + case <-done: + break + case <-ctx.Done(): + t.Fatalf("context canceled / timed out waiting for events, err=%v", ctx.Err()) + } + require.Equal(t, 0, len(expected), "expected events not seen") } -func (f *flushableResponseRecorder) Flush() { - f.flushed = true +func (tr *topicRequest) testHttpRequest(_ *testing.T) *http.Request { + tq := make([]string, 0, len(tr.topics)) + for topic := range tr.topics { + tq = append(tq, "topics="+topic) + } + return httptest.NewRequest(http.MethodGet, fmt.Sprintf("http://example.com/eth/v1/events?%s", strings.Join(tq, "&")), nil) } -func TestStreamEvents_OperationsEvents(t *testing.T) { - t.Run("operations", func(t *testing.T) { - s := &Server{ - StateNotifier: &mockChain.MockStateNotifier{}, - OperationNotifier: &mockChain.MockOperationNotifier{}, - } +func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) { + topics, err := newTopicRequest([]string{ + AttestationTopic, + VoluntaryExitTopic, + SyncCommitteeContributionTopic, + BLSToExecutionChangeTopic, + BlobSidecarTopic, + AttesterSlashingTopic, + ProposerSlashingTopic, + }) + require.NoError(t, err) + ro, err := blocks.NewROBlob(util.HydrateBlobSidecar(ð.BlobSidecar{})) + require.NoError(t, err) + vblob := blocks.NewVerifiedROBlob(ro) - topics := []string{ - AttestationTopic, - VoluntaryExitTopic, - SyncCommitteeContributionTopic, - BLSToExecutionChangeTopic, - BlobSidecarTopic, - AttesterSlashingTopic, - ProposerSlashingTopic, - } - for i, topic := range topics { - topics[i] = "topics=" + topic - } - request := httptest.NewRequest(http.MethodGet, fmt.Sprintf("http://example.com/eth/v1/events?%s", strings.Join(topics, "&")), nil) - w := &flushableResponseRecorder{ - ResponseRecorder: httptest.NewRecorder(), - } - - go func() { - s.StreamEvents(w, request) - }() - // wait for initiation of StreamEvents - time.Sleep(100 * time.Millisecond) - s.OperationNotifier.OperationFeed().Send(&feed.Event{ + return topics, []*feed.Event{ + &feed.Event{ Type: operation.UnaggregatedAttReceived, Data: &operation.UnAggregatedAttReceivedData{ Attestation: util.HydrateAttestation(ð.Attestation{}), }, - }) - s.OperationNotifier.OperationFeed().Send(&feed.Event{ + }, + &feed.Event{ Type: operation.AggregatedAttReceived, Data: &operation.AggregatedAttReceivedData{ Attestation: ð.AggregateAttestationAndProof{ @@ -81,8 +125,8 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { SelectionProof: make([]byte, 96), }, }, - }) - s.OperationNotifier.OperationFeed().Send(&feed.Event{ + }, + &feed.Event{ Type: operation.ExitReceived, Data: &operation.ExitReceivedData{ Exit: ð.SignedVoluntaryExit{ @@ -93,8 +137,8 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { Signature: make([]byte, 96), }, }, - }) - s.OperationNotifier.OperationFeed().Send(&feed.Event{ + }, + &feed.Event{ Type: operation.SyncCommitteeContributionReceived, Data: &operation.SyncCommitteeContributionReceivedData{ Contribution: ð.SignedContributionAndProof{ @@ -112,8 +156,8 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { Signature: make([]byte, 96), }, }, - }) - s.OperationNotifier.OperationFeed().Send(&feed.Event{ + }, + &feed.Event{ Type: operation.BLSToExecutionChangeReceived, Data: &operation.BLSToExecutionChangeReceivedData{ Change: ð.SignedBLSToExecutionChange{ @@ -125,18 +169,14 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { Signature: make([]byte, 96), }, }, - }) - ro, err := blocks.NewROBlob(util.HydrateBlobSidecar(ð.BlobSidecar{})) - require.NoError(t, err) - vblob := blocks.NewVerifiedROBlob(ro) - s.OperationNotifier.OperationFeed().Send(&feed.Event{ + }, + &feed.Event{ Type: operation.BlobSidecarReceived, Data: &operation.BlobSidecarReceivedData{ Blob: &vblob, }, - }) - - s.OperationNotifier.OperationFeed().Send(&feed.Event{ + }, + &feed.Event{ Type: operation.AttesterSlashingReceived, Data: &operation.AttesterSlashingReceivedData{ AttesterSlashing: ð.AttesterSlashing{ @@ -168,9 +208,8 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { }, }, }, - }) - - s.OperationNotifier.OperationFeed().Send(&feed.Event{ + }, + &feed.Event{ Type: operation.ProposerSlashingReceived, Data: &operation.ProposerSlashingReceivedData{ ProposerSlashing: ð.ProposerSlashing{ @@ -192,100 +231,107 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { }, }, }, - }) + }, + } +} - time.Sleep(1 * time.Second) - request.Context().Done() +func TestStreamEvents_OperationsEvents(t *testing.T) { + t.Run("operations", func(t *testing.T) { + stn := mockChain.NewEventFeedWrapper() + opn := mockChain.NewEventFeedWrapper() + s := &Server{ + StateNotifier: &mockChain.SimpleNotifier{Feed: stn}, + OperationNotifier: &mockChain.SimpleNotifier{Feed: opn}, + } - resp := w.Result() - body, err := io.ReadAll(resp.Body) - require.NoError(t, err) - require.NotNil(t, body) - assert.Equal(t, operationsResult, string(body)) + topics, events := operationEventsFixtures(t) + request := topics.testHttpRequest(t) + w := NewStreamingResponseWriterRecorder() + + go func() { + s.StreamEvents(w, request) + }() + + requireAllEventsReceived(t, stn, opn, events, topics, s, w) }) t.Run("state", func(t *testing.T) { + stn := mockChain.NewEventFeedWrapper() + opn := mockChain.NewEventFeedWrapper() s := &Server{ - StateNotifier: &mockChain.MockStateNotifier{}, - OperationNotifier: &mockChain.MockOperationNotifier{}, + StateNotifier: &mockChain.SimpleNotifier{Feed: stn}, + OperationNotifier: &mockChain.SimpleNotifier{Feed: opn}, } - topics := []string{HeadTopic, FinalizedCheckpointTopic, ChainReorgTopic, BlockTopic} - for i, topic := range topics { - topics[i] = "topics=" + topic - } - request := httptest.NewRequest(http.MethodGet, fmt.Sprintf("http://example.com/eth/v1/events?%s", strings.Join(topics, "&")), nil) - w := &flushableResponseRecorder{ - ResponseRecorder: httptest.NewRecorder(), + topics, err := newTopicRequest([]string{ + HeadTopic, + FinalizedCheckpointTopic, + ChainReorgTopic, + BlockTopic, + }) + require.NoError(t, err) + request := topics.testHttpRequest(t) + w := NewStreamingResponseWriterRecorder() + + b, err := blocks.NewSignedBeaconBlock(util.HydrateSignedBeaconBlock(ð.SignedBeaconBlock{})) + require.NoError(t, err) + events := []*feed.Event{ + &feed.Event{ + Type: statefeed.BlockProcessed, + Data: &statefeed.BlockProcessedData{ + Slot: 0, + BlockRoot: [32]byte{}, + SignedBlock: b, + Verified: true, + Optimistic: false, + }, + }, + &feed.Event{ + Type: statefeed.NewHead, + Data: ðpb.EventHead{ + Slot: 0, + Block: make([]byte, 32), + State: make([]byte, 32), + EpochTransition: true, + PreviousDutyDependentRoot: make([]byte, 32), + CurrentDutyDependentRoot: make([]byte, 32), + ExecutionOptimistic: false, + }, + }, + &feed.Event{ + Type: statefeed.Reorg, + Data: ðpb.EventChainReorg{ + Slot: 0, + Depth: 0, + OldHeadBlock: make([]byte, 32), + NewHeadBlock: make([]byte, 32), + OldHeadState: make([]byte, 32), + NewHeadState: make([]byte, 32), + Epoch: 0, + ExecutionOptimistic: false, + }, + }, + &feed.Event{ + Type: statefeed.FinalizedCheckpoint, + Data: ðpb.EventFinalizedCheckpoint{ + Block: make([]byte, 32), + State: make([]byte, 32), + Epoch: 0, + ExecutionOptimistic: false, + }, + }, } go func() { s.StreamEvents(w, request) }() - // wait for initiation of StreamEvents - time.Sleep(100 * time.Millisecond) - s.StateNotifier.StateFeed().Send(&feed.Event{ - Type: statefeed.NewHead, - Data: ðpb.EventHead{ - Slot: 0, - Block: make([]byte, 32), - State: make([]byte, 32), - EpochTransition: true, - PreviousDutyDependentRoot: make([]byte, 32), - CurrentDutyDependentRoot: make([]byte, 32), - ExecutionOptimistic: false, - }, - }) - s.StateNotifier.StateFeed().Send(&feed.Event{ - Type: statefeed.FinalizedCheckpoint, - Data: ðpb.EventFinalizedCheckpoint{ - Block: make([]byte, 32), - State: make([]byte, 32), - Epoch: 0, - ExecutionOptimistic: false, - }, - }) - s.StateNotifier.StateFeed().Send(&feed.Event{ - Type: statefeed.Reorg, - Data: ðpb.EventChainReorg{ - Slot: 0, - Depth: 0, - OldHeadBlock: make([]byte, 32), - NewHeadBlock: make([]byte, 32), - OldHeadState: make([]byte, 32), - NewHeadState: make([]byte, 32), - Epoch: 0, - ExecutionOptimistic: false, - }, - }) - b, err := blocks.NewSignedBeaconBlock(util.HydrateSignedBeaconBlock(ð.SignedBeaconBlock{})) - require.NoError(t, err) - s.StateNotifier.StateFeed().Send(&feed.Event{ - Type: statefeed.BlockProcessed, - Data: &statefeed.BlockProcessedData{ - Slot: 0, - BlockRoot: [32]byte{}, - SignedBlock: b, - Verified: true, - Optimistic: false, - }, - }) - // wait for feed - time.Sleep(1 * time.Second) - request.Context().Done() - - resp := w.Result() - body, err := io.ReadAll(resp.Body) - require.NoError(t, err) - require.NotNil(t, body) - assert.Equal(t, stateResult, string(body)) + requireAllEventsReceived(t, stn, opn, events, topics, s, w) }) t.Run("payload attributes", func(t *testing.T) { type testCase struct { name string getState func() state.BeaconState getBlock func() interfaces.SignedBeaconBlock - expected string SetTrackedValidatorsCache func(*cache.TrackedValidatorsCache) } testCases := []testCase{ @@ -301,7 +347,6 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { require.NoError(t, err) return b }, - expected: payloadAttributesBellatrixResult, }, { name: "capella", @@ -315,7 +360,6 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { require.NoError(t, err) return b }, - expected: payloadAttributesCapellaResult, }, { name: "deneb", @@ -329,7 +373,6 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { require.NoError(t, err) return b }, - expected: payloadAttributesDenebResult, }, { name: "electra", @@ -343,7 +386,6 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { require.NoError(t, err) return b }, - expected: payloadAttributesElectraResultWithTVC, SetTrackedValidatorsCache: func(c *cache.TrackedValidatorsCache) { c.Set(cache.TrackedValidator{ Active: true, @@ -368,9 +410,11 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { Slot: ¤tSlot, } + stn := mockChain.NewEventFeedWrapper() + opn := mockChain.NewEventFeedWrapper() s := &Server{ - StateNotifier: &mockChain.MockStateNotifier{}, - OperationNotifier: &mockChain.MockOperationNotifier{}, + StateNotifier: &mockChain.SimpleNotifier{Feed: stn}, + OperationNotifier: &mockChain.SimpleNotifier{Feed: opn}, HeadFetcher: mockChainService, ChainInfoFetcher: mockChainService, TrackedValidatorsCache: cache.NewTrackedValidatorsCache(), @@ -378,100 +422,76 @@ func TestStreamEvents_OperationsEvents(t *testing.T) { if tc.SetTrackedValidatorsCache != nil { tc.SetTrackedValidatorsCache(s.TrackedValidatorsCache) } - - request := httptest.NewRequest(http.MethodGet, fmt.Sprintf("http://example.com/eth/v1/events?topics=%s", PayloadAttributesTopic), nil) - w := &flushableResponseRecorder{ - ResponseRecorder: httptest.NewRecorder(), - } + topics, err := newTopicRequest([]string{PayloadAttributesTopic}) + require.NoError(t, err) + request := topics.testHttpRequest(t) + w := NewStreamingResponseWriterRecorder() + events := []*feed.Event{&feed.Event{Type: statefeed.MissedSlot}} go func() { s.StreamEvents(w, request) }() - // wait for initiation of StreamEvents - time.Sleep(100 * time.Millisecond) - s.StateNotifier.StateFeed().Send(&feed.Event{Type: statefeed.MissedSlot}) - - // wait for feed - time.Sleep(1 * time.Second) - request.Context().Done() - - resp := w.Result() - body, err := io.ReadAll(resp.Body) - require.NoError(t, err) - require.NotNil(t, body) - assert.Equal(t, tc.expected, string(body), "wrong result for "+tc.name) + requireAllEventsReceived(t, stn, opn, events, topics, s, w) } }) } -const operationsResult = `: +func TestStuckReader(t *testing.T) { + topics, events := operationEventsFixtures(t) + require.Equal(t, 8, len(events)) + // set eventFeedDepth to a number lower than the events we intend to send to force the server to drop the reader. + stn := mockChain.NewEventFeedWrapper() + opn := mockChain.NewEventFeedWrapper() + s := &Server{ + StateNotifier: &mockChain.SimpleNotifier{Feed: stn}, + OperationNotifier: &mockChain.SimpleNotifier{Feed: opn}, + EventFeedDepth: len(events) - 1, + } -event: attestation -data: {"aggregation_bits":"0x00","data":{"slot":"0","index":"0","beacon_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","source":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"},"target":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"}},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"} + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + eventsWritten := make(chan struct{}) + go func() { + for i := range events { + ev := events[i] + top := topicForEvent(ev) + if topicsForOpsFeed[top] { + err := opn.WaitForSubscription(ctx) + require.NoError(t, err) + s.OperationNotifier.OperationFeed().Send(ev) + } else { + err := stn.WaitForSubscription(ctx) + require.NoError(t, err) + s.StateNotifier.StateFeed().Send(ev) + } + } + close(eventsWritten) + }() -event: attestation -data: {"aggregation_bits":"0x00","data":{"slot":"0","index":"0","beacon_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","source":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"},"target":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"}},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"} + request := topics.testHttpRequest(t) + w := NewStreamingResponseWriterRecorder() -event: voluntary_exit -data: {"message":{"epoch":"0","validator_index":"0"},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"} + handlerFinished := make(chan struct{}) + go func() { + s.StreamEvents(w, request) + close(handlerFinished) + }() -event: contribution_and_proof -data: {"message":{"aggregator_index":"0","contribution":{"slot":"0","beacon_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","subcommittee_index":"0","aggregation_bits":"0x00000000000000000000000000000000","signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"},"selection_proof":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"} + // Make sure that the stream writer shut down when the reader failed to clear the write buffer. + select { + case <-handlerFinished: + // We expect the stream handler to max out the queue buffer and exit gracefully. + return + case <-ctx.Done(): + t.Fatalf("context canceled / timed out waiting for handler completion, err=%v", ctx.Err()) + } -event: bls_to_execution_change -data: {"message":{"validator_index":"0","from_bls_pubkey":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","to_execution_address":"0x0000000000000000000000000000000000000000"},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"} - -event: blob_sidecar -data: {"block_root":"0xc78009fdf07fc56a11f122370658a353aaa542ed63e44c4bc15ff4cd105ab33c","index":"0","slot":"0","kzg_commitment":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","versioned_hash":"0x01b0761f87b081d5cf10757ccc89f12be355c70e2e29df288b65b30710dcbcd1"} - -event: attester_slashing -data: {"attestation_1":{"attesting_indices":["0","1"],"data":{"slot":"0","index":"0","beacon_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","source":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"},"target":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"}},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"},"attestation_2":{"attesting_indices":["0","1"],"data":{"slot":"0","index":"0","beacon_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","source":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"},"target":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"}},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"}} - -event: proposer_slashing -data: {"signed_header_1":{"message":{"slot":"0","proposer_index":"0","parent_root":"0x0000000000000000000000000000000000000000000000000000000000000000","state_root":"0x0000000000000000000000000000000000000000000000000000000000000000","body_root":"0x0000000000000000000000000000000000000000000000000000000000000000"},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"},"signed_header_2":{"message":{"slot":"0","proposer_index":"0","parent_root":"0x0000000000000000000000000000000000000000000000000000000000000000","state_root":"0x0000000000000000000000000000000000000000000000000000000000000000","body_root":"0x0000000000000000000000000000000000000000000000000000000000000000"},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"}} - -` - -const stateResult = `: - -event: head -data: {"slot":"0","block":"0x0000000000000000000000000000000000000000000000000000000000000000","state":"0x0000000000000000000000000000000000000000000000000000000000000000","epoch_transition":true,"execution_optimistic":false,"previous_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000","current_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000"} - -event: finalized_checkpoint -data: {"block":"0x0000000000000000000000000000000000000000000000000000000000000000","state":"0x0000000000000000000000000000000000000000000000000000000000000000","epoch":"0","execution_optimistic":false} - -event: chain_reorg -data: {"slot":"0","depth":"0","old_head_block":"0x0000000000000000000000000000000000000000000000000000000000000000","old_head_state":"0x0000000000000000000000000000000000000000000000000000000000000000","new_head_block":"0x0000000000000000000000000000000000000000000000000000000000000000","new_head_state":"0x0000000000000000000000000000000000000000000000000000000000000000","epoch":"0","execution_optimistic":false} - -event: block -data: {"slot":"0","block":"0xeade62f0457b2fdf48e7d3fc4b60736688286be7c7a3ac4c9a16a5e0600bd9e4","execution_optimistic":false} - -` - -const payloadAttributesBellatrixResult = `: - -event: payload_attributes -data: {"version":"bellatrix","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000"}}} - -` - -const payloadAttributesCapellaResult = `: - -event: payload_attributes -data: {"version":"capella","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000","withdrawals":[]}}} - -` - -const payloadAttributesDenebResult = `: - -event: payload_attributes -data: {"version":"deneb","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000","withdrawals":[],"parent_beacon_block_root":"0xbef96cb938fd48b2403d3e662664325abb0102ed12737cbb80d717520e50cf4a"}}} - -` - -const payloadAttributesElectraResultWithTVC = `: - -event: payload_attributes -data: {"version":"electra","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0xd2dbd02e4efe087d7d195de828b9dd25f19a89c9","withdrawals":[],"parent_beacon_block_root":"0xf2110e448638f41cb34514ecdbb49c055536cd5f715f1cb259d1287bb900853e"}}} - -` + // Also make sure all the events were written. + select { + case <-eventsWritten: + // We expect the stream handler to max out the queue buffer and exit gracefully. + return + case <-ctx.Done(): + t.Fatalf("context canceled / timed out waiting to write all events, err=%v", ctx.Err()) + } +} diff --git a/beacon-chain/rpc/eth/events/http_test.go b/beacon-chain/rpc/eth/events/http_test.go new file mode 100644 index 0000000000..1bfaaa873d --- /dev/null +++ b/beacon-chain/rpc/eth/events/http_test.go @@ -0,0 +1,75 @@ +package events + +import ( + "io" + "net/http" + "net/http/httptest" + "testing" + + "github.com/prysmaticlabs/prysm/v5/testing/require" +) + +type StreamingResponseWriterRecorder struct { + http.ResponseWriter + r io.Reader + w io.Writer + statusWritten *int + status chan int + bodyRecording []byte + flushed bool +} + +func (w *StreamingResponseWriterRecorder) StatusChan() chan int { + return w.status +} + +func NewStreamingResponseWriterRecorder() *StreamingResponseWriterRecorder { + r, w := io.Pipe() + return &StreamingResponseWriterRecorder{ + ResponseWriter: httptest.NewRecorder(), + r: r, + w: w, + status: make(chan int, 1), + } +} + +// Write implements http.ResponseWriter. +func (w *StreamingResponseWriterRecorder) Write(data []byte) (int, error) { + w.WriteHeader(http.StatusOK) + n, err := w.w.Write(data) + if err != nil { + return n, err + } + return w.ResponseWriter.Write(data) +} + +// WriteHeader implements http.ResponseWriter. +func (w *StreamingResponseWriterRecorder) WriteHeader(statusCode int) { + if w.statusWritten != nil { + return + } + w.statusWritten = &statusCode + w.status <- statusCode + w.ResponseWriter.WriteHeader(statusCode) +} + +func (w *StreamingResponseWriterRecorder) Body() io.Reader { + return w.r +} + +func (w *StreamingResponseWriterRecorder) RequireStatus(t *testing.T, status int) { + if w.statusWritten == nil { + t.Fatal("WriteHeader was not called") + } + require.Equal(t, status, *w.statusWritten) +} + +func (w *StreamingResponseWriterRecorder) Flush() { + fw, ok := w.ResponseWriter.(http.Flusher) + if ok { + fw.Flush() + } + w.flushed = true +} + +var _ http.ResponseWriter = &StreamingResponseWriterRecorder{} diff --git a/beacon-chain/rpc/eth/events/server.go b/beacon-chain/rpc/eth/events/server.go index 58cd671e9c..26e83454e5 100644 --- a/beacon-chain/rpc/eth/events/server.go +++ b/beacon-chain/rpc/eth/events/server.go @@ -4,6 +4,8 @@ package events import ( + "time" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/v5/beacon-chain/cache" opfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation" @@ -18,4 +20,6 @@ type Server struct { HeadFetcher blockchain.HeadFetcher ChainInfoFetcher blockchain.ChainInfoFetcher TrackedValidatorsCache *cache.TrackedValidatorsCache + KeepAliveInterval time.Duration + EventFeedDepth int } diff --git a/deps.bzl b/deps.bzl index 95addcea05..0d971d392c 100644 --- a/deps.bzl +++ b/deps.bzl @@ -2833,6 +2833,12 @@ def prysm_deps(): sum = "h1:HxSrwun11U+LlmwpgM1kEqIqH90IT4N8auv/cD7QFJg=", version = "v0.8.0", ) + go_repository( + name = "com_github_r3labs_sse_v2", + importpath = "github.com/r3labs/sse/v2", + sum = "h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0=", + version = "v2.10.0", + ) go_repository( name = "com_github_raulk_go_watchdog", importpath = "github.com/raulk/go-watchdog", @@ -4304,6 +4310,12 @@ def prysm_deps(): sum = "h1:stTHdEoWg1pQ8riaP5ROrjS6zy6wewH/Q2iwnLCQUXY=", version = "v1.0.0-20160220154919-db14e161995a", ) + go_repository( + name = "in_gopkg_cenkalti_backoff_v1", + importpath = "gopkg.in/cenkalti/backoff.v1", + sum = "h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=", + version = "v1.1.0", + ) go_repository( name = "in_gopkg_check_v1", importpath = "gopkg.in/check.v1", diff --git a/go.mod b/go.mod index c433d99846..138f1084a5 100644 --- a/go.mod +++ b/go.mod @@ -66,6 +66,7 @@ require ( github.com/prysmaticlabs/go-bitfield v0.0.0-20240328144219-a1caa50c3a1e github.com/prysmaticlabs/prombbolt v0.0.0-20210126082820-9b7adba6db7c github.com/prysmaticlabs/protoc-gen-go-cast v0.0.0-20230228205207-28762a7b9294 + github.com/r3labs/sse/v2 v2.10.0 github.com/rs/cors v1.7.0 github.com/schollz/progressbar/v3 v3.3.4 github.com/sirupsen/logrus v1.9.0 @@ -259,6 +260,7 @@ require ( golang.org/x/term v0.23.0 // indirect golang.org/x/text v0.17.0 // indirect golang.org/x/time v0.5.0 // indirect + gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect diff --git a/go.sum b/go.sum index bf21aa4e08..b53de69d27 100644 --- a/go.sum +++ b/go.sum @@ -914,6 +914,8 @@ github.com/quic-go/quic-go v0.46.0 h1:uuwLClEEyk1DNvchH8uCByQVjo3yKL9opKulExNDs7 github.com/quic-go/quic-go v0.46.0/go.mod h1:1dLehS7TIR64+vxGR70GDcatWTOtMX2PUtnKsjbTurI= github.com/quic-go/webtransport-go v0.8.0 h1:HxSrwun11U+LlmwpgM1kEqIqH90IT4N8auv/cD7QFJg= github.com/quic-go/webtransport-go v0.8.0/go.mod h1:N99tjprW432Ut5ONql/aUhSLT0YVSlwHohQsuac9WaM= +github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0= +github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I= github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk= github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtDqv66NfsMU= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= @@ -1223,6 +1225,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -1607,6 +1610,8 @@ google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6h google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/bsm/ratelimit.v1 v1.0.0-20160220154919-db14e161995a/go.mod h1:KF9sEfUPAXdG8Oev9e99iLGnl2uJMjc5B+4y3O7x610= +gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y= +gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=