SSE implementation that sheds stuck clients (#14413)

* sse implementation that sheds stuck clients

* Radek and James feedback

* Refactor event streamer code for readability

* less-flaky test signaling

* test case where queue fills; fixes

* add changelog entry

* james and preston feedback

* swap our Subscription interface with an alias

* event.Data can be nil for the payload attr event

* deepsource

---------

Co-authored-by: Kasey Kirkham <kasey@users.noreply.github.com>
This commit is contained in:
kasey
2024-10-04 16:18:17 -05:00
committed by GitHub
parent f498463843
commit c11e3392d4
21 changed files with 908 additions and 565 deletions

View File

@@ -19,6 +19,7 @@ The format is based on Keep a Changelog, and this project adheres to Semantic Ve
- Tests to ensure sepolia config matches the official upstream yaml - Tests to ensure sepolia config matches the official upstream yaml
- HTTP endpoint for PublishBlobs - HTTP endpoint for PublishBlobs
- GetBlockV2, GetBlindedBlock, ProduceBlockV2, ProduceBlockV3: add Electra case. - GetBlockV2, GetBlindedBlock, ProduceBlockV2, ProduceBlockV3: add Electra case.
- SSE implementation that sheds stuck clients. [pr](https://github.com/prysmaticlabs/prysm/pull/14413)
### Changed ### Changed

View File

@@ -1,5 +1,7 @@
package api package api
import "net/http"
const ( const (
VersionHeader = "Eth-Consensus-Version" VersionHeader = "Eth-Consensus-Version"
ExecutionPayloadBlindedHeader = "Eth-Execution-Payload-Blinded" ExecutionPayloadBlindedHeader = "Eth-Execution-Payload-Blinded"
@@ -10,3 +12,9 @@ const (
EventStreamMediaType = "text/event-stream" EventStreamMediaType = "text/event-stream"
KeepAlive = "keep-alive" KeepAlive = "keep-alive"
) )
// SetSSEHeaders sets the headers needed for a server-sent event response.
func SetSSEHeaders(w http.ResponseWriter) {
w.Header().Set("Content-Type", EventStreamMediaType)
w.Header().Set("Connection", KeepAlive)
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
"github.com/prysmaticlabs/prysm/v5/math" "github.com/prysmaticlabs/prysm/v5/math"
enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1" enginev1 "github.com/prysmaticlabs/prysm/v5/proto/engine/v1"
ethv1 "github.com/prysmaticlabs/prysm/v5/proto/eth/v1"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
) )
@@ -1508,3 +1509,37 @@ func PendingConsolidationsFromConsensus(cs []*eth.PendingConsolidation) []*Pendi
} }
return consolidations return consolidations
} }
func HeadEventFromV1(event *ethv1.EventHead) *HeadEvent {
return &HeadEvent{
Slot: fmt.Sprintf("%d", event.Slot),
Block: hexutil.Encode(event.Block),
State: hexutil.Encode(event.State),
EpochTransition: event.EpochTransition,
ExecutionOptimistic: event.ExecutionOptimistic,
PreviousDutyDependentRoot: hexutil.Encode(event.PreviousDutyDependentRoot),
CurrentDutyDependentRoot: hexutil.Encode(event.CurrentDutyDependentRoot),
}
}
func FinalizedCheckpointEventFromV1(event *ethv1.EventFinalizedCheckpoint) *FinalizedCheckpointEvent {
return &FinalizedCheckpointEvent{
Block: hexutil.Encode(event.Block),
State: hexutil.Encode(event.State),
Epoch: fmt.Sprintf("%d", event.Epoch),
ExecutionOptimistic: event.ExecutionOptimistic,
}
}
func EventChainReorgFromV1(event *ethv1.EventChainReorg) *ChainReorgEvent {
return &ChainReorgEvent{
Slot: fmt.Sprintf("%d", event.Slot),
Depth: fmt.Sprintf("%d", event.Depth),
OldHeadBlock: hexutil.Encode(event.OldHeadBlock),
NewHeadBlock: hexutil.Encode(event.NewHeadBlock),
OldHeadState: hexutil.Encode(event.OldHeadState),
NewHeadState: hexutil.Encode(event.NewHeadState),
Epoch: fmt.Sprintf("%d", event.Epoch),
ExecutionOptimistic: event.ExecutionOptimistic,
}
}

View File

@@ -4,6 +4,7 @@ go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"feed.go", "feed.go",
"interface.go",
"subscription.go", "subscription.go",
], ],
importpath = "github.com/prysmaticlabs/prysm/v5/async/event", importpath = "github.com/prysmaticlabs/prysm/v5/async/event",

View File

@@ -22,3 +22,4 @@ import (
// Feed is a re-export of the go-ethereum event feed. // Feed is a re-export of the go-ethereum event feed.
type Feed = geth_event.Feed type Feed = geth_event.Feed
type Subscription = geth_event.Subscription

8
async/event/interface.go Normal file
View File

@@ -0,0 +1,8 @@
package event
// SubscriberSender is an abstract representation of an *event.Feed
// to use in describing types that accept or return an *event.Feed.
type SubscriberSender interface {
Subscribe(channel interface{}) Subscription
Send(value interface{}) (nsent int)
}

View File

@@ -28,25 +28,6 @@ import (
// request backoff time. // request backoff time.
const waitQuotient = 10 const waitQuotient = 10
// Subscription represents a stream of events. The carrier of the events is typically a
// channel, but isn't part of the interface.
//
// Subscriptions can fail while established. Failures are reported through an error
// channel. It receives a value if there is an issue with the subscription (e.g. the
// network connection delivering the events has been closed). Only one value will ever be
// sent.
//
// The error channel is closed when the subscription ends successfully (i.e. when the
// source of events is closed). It is also closed when Unsubscribe is called.
//
// The Unsubscribe method cancels the sending of events. You must call Unsubscribe in all
// cases to ensure that resources related to the subscription are released. It can be
// called any number of times.
type Subscription interface {
Err() <-chan error // returns the error channel
Unsubscribe() // cancels sending of events, closing the error channel
}
// NewSubscription runs a producer function as a subscription in a new goroutine. The // NewSubscription runs a producer function as a subscription in a new goroutine. The
// channel given to the producer is closed when Unsubscribe is called. If fn returns an // channel given to the producer is closed when Unsubscribe is called. If fn returns an
// error, it is sent on the subscription's error channel. // error, it is sent on the subscription's error channel.

View File

@@ -32,7 +32,7 @@ type mockBeaconNode struct {
} }
// StateFeed mocks the same method in the beacon node. // StateFeed mocks the same method in the beacon node.
func (mbn *mockBeaconNode) StateFeed() *event.Feed { func (mbn *mockBeaconNode) StateFeed() event.SubscriberSender {
mbn.mu.Lock() mbn.mu.Lock()
defer mbn.mu.Unlock() defer mbn.mu.Unlock()
if mbn.stateFeed == nil { if mbn.stateFeed == nil {

View File

@@ -98,6 +98,44 @@ func (s *ChainService) BlockNotifier() blockfeed.Notifier {
return s.blockNotifier return s.blockNotifier
} }
type EventFeedWrapper struct {
feed *event.Feed
subscribed chan struct{} // this channel is closed once a subscription is made
}
func (w *EventFeedWrapper) Subscribe(channel interface{}) event.Subscription {
select {
case <-w.subscribed:
break // already closed
default:
close(w.subscribed)
}
return w.feed.Subscribe(channel)
}
func (w *EventFeedWrapper) Send(value interface{}) int {
return w.feed.Send(value)
}
// WaitForSubscription allows test to wait for the feed to have a subscription before beginning to send events.
func (w *EventFeedWrapper) WaitForSubscription(ctx context.Context) error {
select {
case <-w.subscribed:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
var _ event.SubscriberSender = &EventFeedWrapper{}
func NewEventFeedWrapper() *EventFeedWrapper {
return &EventFeedWrapper{
feed: new(event.Feed),
subscribed: make(chan struct{}),
}
}
// MockBlockNotifier mocks the block notifier. // MockBlockNotifier mocks the block notifier.
type MockBlockNotifier struct { type MockBlockNotifier struct {
feed *event.Feed feed *event.Feed
@@ -131,7 +169,7 @@ func (msn *MockStateNotifier) ReceivedEvents() []*feed.Event {
} }
// StateFeed returns a state feed. // StateFeed returns a state feed.
func (msn *MockStateNotifier) StateFeed() *event.Feed { func (msn *MockStateNotifier) StateFeed() event.SubscriberSender {
msn.feedLock.Lock() msn.feedLock.Lock()
defer msn.feedLock.Unlock() defer msn.feedLock.Unlock()
@@ -159,6 +197,23 @@ func (msn *MockStateNotifier) StateFeed() *event.Feed {
return msn.feed return msn.feed
} }
// NewSimpleStateNotifier makes a state feed without the custom mock feed machinery.
func NewSimpleStateNotifier() *MockStateNotifier {
return &MockStateNotifier{feed: new(event.Feed)}
}
type SimpleNotifier struct {
Feed event.SubscriberSender
}
func (n *SimpleNotifier) StateFeed() event.SubscriberSender {
return n.Feed
}
func (n *SimpleNotifier) OperationFeed() event.SubscriberSender {
return n.Feed
}
// OperationNotifier mocks the same method in the chain service. // OperationNotifier mocks the same method in the chain service.
func (s *ChainService) OperationNotifier() opfeed.Notifier { func (s *ChainService) OperationNotifier() opfeed.Notifier {
if s.opNotifier == nil { if s.opNotifier == nil {
@@ -173,7 +228,7 @@ type MockOperationNotifier struct {
} }
// OperationFeed returns an operation feed. // OperationFeed returns an operation feed.
func (mon *MockOperationNotifier) OperationFeed() *event.Feed { func (mon *MockOperationNotifier) OperationFeed() event.SubscriberSender {
if mon.feed == nil { if mon.feed == nil {
mon.feed = new(event.Feed) mon.feed = new(event.Feed)
} }

View File

@@ -4,5 +4,5 @@ import "github.com/prysmaticlabs/prysm/v5/async/event"
// Notifier interface defines the methods of the service that provides beacon block operation updates to consumers. // Notifier interface defines the methods of the service that provides beacon block operation updates to consumers.
type Notifier interface { type Notifier interface {
OperationFeed() *event.Feed OperationFeed() event.SubscriberSender
} }

View File

@@ -4,5 +4,5 @@ import "github.com/prysmaticlabs/prysm/v5/async/event"
// Notifier interface defines the methods of the service that provides state updates to consumers. // Notifier interface defines the methods of the service that provides state updates to consumers.
type Notifier interface { type Notifier interface {
StateFeed() *event.Feed StateFeed() event.SubscriberSender
} }

View File

@@ -73,7 +73,7 @@ type goodNotifier struct {
MockStateFeed *event.Feed MockStateFeed *event.Feed
} }
func (g *goodNotifier) StateFeed() *event.Feed { func (g *goodNotifier) StateFeed() event.SubscriberSender {
if g.MockStateFeed == nil { if g.MockStateFeed == nil {
g.MockStateFeed = new(event.Feed) g.MockStateFeed = new(event.Feed)
} }

View File

@@ -398,7 +398,7 @@ func initSyncWaiter(ctx context.Context, complete chan struct{}) func() error {
} }
// StateFeed implements statefeed.Notifier. // StateFeed implements statefeed.Notifier.
func (b *BeaconNode) StateFeed() *event.Feed { func (b *BeaconNode) StateFeed() event.SubscriberSender {
return b.stateFeed return b.stateFeed
} }
@@ -408,7 +408,7 @@ func (b *BeaconNode) BlockFeed() *event.Feed {
} }
// OperationFeed implements opfeed.Notifier. // OperationFeed implements opfeed.Notifier.
func (b *BeaconNode) OperationFeed() *event.Feed { func (b *BeaconNode) OperationFeed() event.SubscriberSender {
return b.opFeed return b.opFeed
} }

View File

@@ -29,12 +29,16 @@ go_library(
"//time/slots:go_default_library", "//time/slots:go_default_library",
"@com_github_ethereum_go_ethereum//common/hexutil:go_default_library", "@com_github_ethereum_go_ethereum//common/hexutil:go_default_library",
"@com_github_pkg_errors//:go_default_library", "@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
], ],
) )
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = ["events_test.go"], srcs = [
"events_test.go",
"http_test.go",
],
embed = [":go_default_library"], embed = [":go_default_library"],
deps = [ deps = [
"//beacon-chain/blockchain/testing:go_default_library", "//beacon-chain/blockchain/testing:go_default_library",
@@ -49,9 +53,9 @@ go_test(
"//consensus-types/primitives:go_default_library", "//consensus-types/primitives:go_default_library",
"//proto/eth/v1:go_default_library", "//proto/eth/v1:go_default_library",
"//proto/prysm/v1alpha1:go_default_library", "//proto/prysm/v1alpha1:go_default_library",
"//testing/assert:go_default_library",
"//testing/require:go_default_library", "//testing/require:go_default_library",
"//testing/util:go_default_library", "//testing/util:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library", "@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_r3labs_sse_v2//:go_default_library",
], ],
) )

View File

@@ -1,11 +1,13 @@
package events package events
import ( import (
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"net/http" "net/http"
time2 "time" "time"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/pkg/errors" "github.com/pkg/errors"
@@ -16,7 +18,7 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation"
statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state" statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time" chaintime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/transition"
"github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace" "github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
@@ -26,9 +28,13 @@ import (
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/runtime/version" "github.com/prysmaticlabs/prysm/v5/runtime/version"
"github.com/prysmaticlabs/prysm/v5/time/slots" "github.com/prysmaticlabs/prysm/v5/time/slots"
log "github.com/sirupsen/logrus"
) )
const DefaultEventFeedDepth = 1000
const ( const (
InvalidTopic = "__invalid__"
// HeadTopic represents a new chain head event topic. // HeadTopic represents a new chain head event topic.
HeadTopic = "head" HeadTopic = "head"
// BlockTopic represents a new produced block event topic. // BlockTopic represents a new produced block event topic.
@@ -59,25 +65,83 @@ const (
LightClientOptimisticUpdateTopic = "light_client_optimistic_update" LightClientOptimisticUpdateTopic = "light_client_optimistic_update"
) )
const topicDataMismatch = "Event data type %T does not correspond to event topic %s" var (
errInvalidTopicName = errors.New("invalid topic name")
errNoValidTopics = errors.New("no valid topics specified")
errSlowReader = errors.New("client failed to read fast enough to keep outgoing buffer below threshold")
errNotRequested = errors.New("event not requested by client")
errUnhandledEventData = errors.New("unable to represent event data in the event stream")
)
const chanBuffer = 1000 // StreamingResponseWriter defines a type that can be used by the eventStreamer.
// This must be an http.ResponseWriter that supports flushing and hijacking.
type StreamingResponseWriter interface {
http.ResponseWriter
http.Flusher
}
var casesHandled = map[string]bool{ // The eventStreamer uses lazyReaders to defer serialization until the moment the value is ready to be written to the client.
HeadTopic: true, type lazyReader func() io.Reader
BlockTopic: true,
AttestationTopic: true, var opsFeedEventTopics = map[feed.EventType]string{
VoluntaryExitTopic: true, operation.AggregatedAttReceived: AttestationTopic,
FinalizedCheckpointTopic: true, operation.UnaggregatedAttReceived: AttestationTopic,
ChainReorgTopic: true, operation.ExitReceived: VoluntaryExitTopic,
SyncCommitteeContributionTopic: true, operation.SyncCommitteeContributionReceived: SyncCommitteeContributionTopic,
BLSToExecutionChangeTopic: true, operation.BLSToExecutionChangeReceived: BLSToExecutionChangeTopic,
PayloadAttributesTopic: true, operation.BlobSidecarReceived: BlobSidecarTopic,
BlobSidecarTopic: true, operation.AttesterSlashingReceived: AttesterSlashingTopic,
ProposerSlashingTopic: true, operation.ProposerSlashingReceived: ProposerSlashingTopic,
AttesterSlashingTopic: true, }
LightClientFinalityUpdateTopic: true,
LightClientOptimisticUpdateTopic: true, var stateFeedEventTopics = map[feed.EventType]string{
statefeed.NewHead: HeadTopic,
statefeed.MissedSlot: PayloadAttributesTopic,
statefeed.FinalizedCheckpoint: FinalizedCheckpointTopic,
statefeed.LightClientFinalityUpdate: LightClientFinalityUpdateTopic,
statefeed.LightClientOptimisticUpdate: LightClientOptimisticUpdateTopic,
statefeed.Reorg: ChainReorgTopic,
statefeed.BlockProcessed: BlockTopic,
}
var topicsForStateFeed = topicsForFeed(stateFeedEventTopics)
var topicsForOpsFeed = topicsForFeed(opsFeedEventTopics)
func topicsForFeed(em map[feed.EventType]string) map[string]bool {
topics := make(map[string]bool, len(em))
for _, topic := range em {
topics[topic] = true
}
return topics
}
type topicRequest struct {
topics map[string]bool
needStateFeed bool
needOpsFeed bool
}
func (req *topicRequest) requested(topic string) bool {
return req.topics[topic]
}
func newTopicRequest(topics []string) (*topicRequest, error) {
req := &topicRequest{topics: make(map[string]bool)}
for _, name := range topics {
if topicsForStateFeed[name] {
req.needStateFeed = true
} else if topicsForOpsFeed[name] {
req.needOpsFeed = true
} else {
return nil, errors.Wrapf(errInvalidTopicName, name)
}
req.topics[name] = true
}
if len(req.topics) == 0 || (!req.needStateFeed && !req.needOpsFeed) {
return nil, errNoValidTopics
}
return req, nil
} }
// StreamEvents provides an endpoint to subscribe to the beacon node Server-Sent-Events stream. // StreamEvents provides an endpoint to subscribe to the beacon node Server-Sent-Events stream.
@@ -88,326 +152,412 @@ func (s *Server) StreamEvents(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "events.StreamEvents") ctx, span := trace.StartSpan(r.Context(), "events.StreamEvents")
defer span.End() defer span.End()
flusher, ok := w.(http.Flusher) topics, err := newTopicRequest(r.URL.Query()["topics"])
if err != nil {
httputil.HandleError(w, err.Error(), http.StatusBadRequest)
return
}
sw, ok := w.(StreamingResponseWriter)
if !ok { if !ok {
httputil.HandleError(w, "Streaming unsupported!", http.StatusInternalServerError) msg := "beacon node misconfiguration: http stack may not support required response handling features, like flushing"
httputil.HandleError(w, msg, http.StatusInternalServerError)
return return
} }
depth := s.EventFeedDepth
topics := r.URL.Query()["topics"] if depth == 0 {
if len(topics) == 0 { depth = DefaultEventFeedDepth
httputil.HandleError(w, "No topics specified to subscribe to", http.StatusBadRequest)
return
} }
topicsMap := make(map[string]bool) es, err := newEventStreamer(depth, s.KeepAliveInterval)
for _, topic := range topics { if err != nil {
if _, ok := casesHandled[topic]; !ok {
httputil.HandleError(w, fmt.Sprintf("Invalid topic: %s", topic), http.StatusBadRequest)
return
}
topicsMap[topic] = true
}
// Subscribe to event feeds from information received in the beacon node runtime.
opsChan := make(chan *feed.Event, chanBuffer)
opsSub := s.OperationNotifier.OperationFeed().Subscribe(opsChan)
stateChan := make(chan *feed.Event, chanBuffer)
stateSub := s.StateNotifier.StateFeed().Subscribe(stateChan)
defer opsSub.Unsubscribe()
defer stateSub.Unsubscribe()
// Set up SSE response headers
w.Header().Set("Content-Type", api.EventStreamMediaType)
w.Header().Set("Connection", api.KeepAlive)
// Handle each event received and context cancellation.
// We send a keepalive dummy message immediately to prevent clients
// stalling while waiting for the first response chunk.
// After that we send a keepalive dummy message every SECONDS_PER_SLOT
// to prevent anyone (e.g. proxy servers) from closing connections.
if err := sendKeepalive(w, flusher); err != nil {
httputil.HandleError(w, err.Error(), http.StatusInternalServerError) httputil.HandleError(w, err.Error(), http.StatusInternalServerError)
return return
} }
keepaliveTicker := time2.NewTicker(time2.Duration(params.BeaconConfig().SecondsPerSlot) * time2.Second)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
api.SetSSEHeaders(sw)
go es.outboxWriteLoop(ctx, cancel, sw)
if err := es.recvEventLoop(ctx, cancel, topics, s); err != nil {
log.WithError(err).Debug("Shutting down StreamEvents handler.")
}
}
func newEventStreamer(buffSize int, ka time.Duration) (*eventStreamer, error) {
if ka == 0 {
ka = time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second
}
return &eventStreamer{
outbox: make(chan lazyReader, buffSize),
keepAlive: ka,
}, nil
}
type eventStreamer struct {
outbox chan lazyReader
keepAlive time.Duration
}
func (es *eventStreamer) recvEventLoop(ctx context.Context, cancel context.CancelFunc, req *topicRequest, s *Server) error {
eventsChan := make(chan *feed.Event, len(es.outbox))
if req.needOpsFeed {
opsSub := s.OperationNotifier.OperationFeed().Subscribe(eventsChan)
defer opsSub.Unsubscribe()
}
if req.needStateFeed {
stateSub := s.StateNotifier.StateFeed().Subscribe(eventsChan)
defer stateSub.Unsubscribe()
}
for { for {
select { select {
case event := <-opsChan:
if err := handleBlockOperationEvents(w, flusher, topicsMap, event); err != nil {
httputil.HandleError(w, err.Error(), http.StatusInternalServerError)
return
}
case event := <-stateChan:
if err := s.handleStateEvents(ctx, w, flusher, topicsMap, event); err != nil {
httputil.HandleError(w, err.Error(), http.StatusInternalServerError)
return
}
case <-keepaliveTicker.C:
if err := sendKeepalive(w, flusher); err != nil {
httputil.HandleError(w, err.Error(), http.StatusInternalServerError)
return
}
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err()
case event := <-eventsChan:
lr, err := s.lazyReaderForEvent(ctx, event, req)
if err != nil {
if !errors.Is(err, errNotRequested) {
log.WithField("event_type", fmt.Sprintf("%v", event.Data)).WithError(err).Error("StreamEvents API endpoint received an event it was unable to handle.")
}
continue
}
// If the client can't keep up, the outbox will eventually completely fill, at which
// safeWrite will error, and we'll hit the below return statement, at which point the deferred
// Unsuscribe calls will be made and the event feed will stop writing to this channel.
// Since the outbox and event stream channels are separately buffered, the event subscription
// channel should stay relatively empty, which gives this loop time to unsubscribe
// and cleanup before the event stream channel fills and disrupts other readers.
if err := es.safeWrite(ctx, lr); err != nil {
cancel()
// note: we could hijack the connection and close it here. Does that cause issues? What are the benefits?
// A benefit of hijack and close is that it may force an error on the remote end, however just closing the context of the
// http handler may be sufficient to cause the remote http response reader to close.
if errors.Is(err, errSlowReader) {
log.WithError(err).Warn("Client is unable to keep up with event stream, shutting down.")
}
return err
}
}
}
}
func (es *eventStreamer) safeWrite(ctx context.Context, rf func() io.Reader) error {
if rf == nil {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case es.outbox <- rf:
return nil
default:
// If this is the case, the select case to write to the outbox could not proceed, meaning the outbox is full.
// If a reader can't keep up with the stream, we shut them down.
return errSlowReader
}
}
// newlineReader is used to write keep-alives to the client.
// keep-alives in the sse protocol are a single ':' colon followed by 2 newlines.
func newlineReader() io.Reader {
return bytes.NewBufferString(":\n\n")
}
// outboxWriteLoop runs in a separate goroutine. Its job is to write the values in the outbox to
// the client as fast as the client can read them.
func (es *eventStreamer) outboxWriteLoop(ctx context.Context, cancel context.CancelFunc, w StreamingResponseWriter) {
var err error
defer func() {
if err != nil {
log.WithError(err).Debug("Event streamer shutting down due to error.")
}
}()
defer cancel()
// Write a keepalive at the start to test the connection and simplify test setup.
if err = es.writeOutbox(ctx, w, nil); err != nil {
return
}
kaT := time.NewTimer(es.keepAlive)
// Ensure the keepalive timer is stopped and drained if it has fired.
defer func() {
if !kaT.Stop() {
<-kaT.C
}
}()
for {
select {
case <-ctx.Done():
err = ctx.Err()
return return
case <-kaT.C:
if err = es.writeOutbox(ctx, w, nil); err != nil {
return
}
// In this case the timer doesn't need to be Stopped before the Reset call after the select statement,
// because the timer has already fired.
case lr := <-es.outbox:
if err = es.writeOutbox(ctx, w, lr); err != nil {
return
}
// We don't know if the timer fired concurrently to this case being ready, so we need to check the return
// of Stop and drain the timer channel if it fired. We won't need to do this in go 1.23.
if !kaT.Stop() {
<-kaT.C
}
}
kaT.Reset(es.keepAlive)
}
}
func (es *eventStreamer) writeOutbox(ctx context.Context, w StreamingResponseWriter, first lazyReader) error {
needKeepAlive := true
if first != nil {
if _, err := io.Copy(w, first()); err != nil {
return err
}
needKeepAlive = false
}
// While the first event was being read by the client, further events may be queued in the outbox.
// We can drain them right away rather than go back out to the outer select statement, where the keepAlive timer
// may have fired, triggering an unnecessary extra keep-alive write and flush.
for {
select {
case <-ctx.Done():
return ctx.Err()
case rf := <-es.outbox:
if _, err := io.Copy(w, rf()); err != nil {
return err
}
needKeepAlive = false
default:
if needKeepAlive {
if _, err := io.Copy(w, newlineReader()); err != nil {
return err
}
}
w.Flush()
return nil
} }
} }
} }
func handleBlockOperationEvents(w http.ResponseWriter, flusher http.Flusher, requestedTopics map[string]bool, event *feed.Event) error { func jsonMarshalReader(name string, v any) io.Reader {
switch event.Type { d, err := json.Marshal(v)
case operation.AggregatedAttReceived: if err != nil {
if _, ok := requestedTopics[AttestationTopic]; !ok { log.WithError(err).WithField("type_name", fmt.Sprintf("%T", v)).Error("Could not marshal event data.")
return nil return nil
}
attData, ok := event.Data.(*operation.AggregatedAttReceivedData)
if !ok {
return write(w, flusher, topicDataMismatch, event.Data, AttestationTopic)
}
att := structs.AttFromConsensus(attData.Attestation.Aggregate)
return send(w, flusher, AttestationTopic, att)
case operation.UnaggregatedAttReceived:
if _, ok := requestedTopics[AttestationTopic]; !ok {
return nil
}
attData, ok := event.Data.(*operation.UnAggregatedAttReceivedData)
if !ok {
return write(w, flusher, topicDataMismatch, event.Data, AttestationTopic)
}
a, ok := attData.Attestation.(*eth.Attestation)
if !ok {
return write(w, flusher, topicDataMismatch, event.Data, AttestationTopic)
}
att := structs.AttFromConsensus(a)
return send(w, flusher, AttestationTopic, att)
case operation.ExitReceived:
if _, ok := requestedTopics[VoluntaryExitTopic]; !ok {
return nil
}
exitData, ok := event.Data.(*operation.ExitReceivedData)
if !ok {
return write(w, flusher, topicDataMismatch, event.Data, VoluntaryExitTopic)
}
exit := structs.SignedExitFromConsensus(exitData.Exit)
return send(w, flusher, VoluntaryExitTopic, exit)
case operation.SyncCommitteeContributionReceived:
if _, ok := requestedTopics[SyncCommitteeContributionTopic]; !ok {
return nil
}
contributionData, ok := event.Data.(*operation.SyncCommitteeContributionReceivedData)
if !ok {
return write(w, flusher, topicDataMismatch, event.Data, SyncCommitteeContributionTopic)
}
contribution := structs.SignedContributionAndProofFromConsensus(contributionData.Contribution)
return send(w, flusher, SyncCommitteeContributionTopic, contribution)
case operation.BLSToExecutionChangeReceived:
if _, ok := requestedTopics[BLSToExecutionChangeTopic]; !ok {
return nil
}
changeData, ok := event.Data.(*operation.BLSToExecutionChangeReceivedData)
if !ok {
return write(w, flusher, topicDataMismatch, event.Data, BLSToExecutionChangeTopic)
}
return send(w, flusher, BLSToExecutionChangeTopic, structs.SignedBLSChangeFromConsensus(changeData.Change))
case operation.BlobSidecarReceived:
if _, ok := requestedTopics[BlobSidecarTopic]; !ok {
return nil
}
blobData, ok := event.Data.(*operation.BlobSidecarReceivedData)
if !ok {
return write(w, flusher, topicDataMismatch, event.Data, BlobSidecarTopic)
}
versionedHash := blockchain.ConvertKzgCommitmentToVersionedHash(blobData.Blob.KzgCommitment)
blobEvent := &structs.BlobSidecarEvent{
BlockRoot: hexutil.Encode(blobData.Blob.BlockRootSlice()),
Index: fmt.Sprintf("%d", blobData.Blob.Index),
Slot: fmt.Sprintf("%d", blobData.Blob.Slot()),
VersionedHash: versionedHash.String(),
KzgCommitment: hexutil.Encode(blobData.Blob.KzgCommitment),
}
return send(w, flusher, BlobSidecarTopic, blobEvent)
case operation.AttesterSlashingReceived:
if _, ok := requestedTopics[AttesterSlashingTopic]; !ok {
return nil
}
attesterSlashingData, ok := event.Data.(*operation.AttesterSlashingReceivedData)
if !ok {
return write(w, flusher, topicDataMismatch, event.Data, AttesterSlashingTopic)
}
slashing, ok := attesterSlashingData.AttesterSlashing.(*eth.AttesterSlashing)
if ok {
return send(w, flusher, AttesterSlashingTopic, structs.AttesterSlashingFromConsensus(slashing))
}
// TODO: extend to Electra
case operation.ProposerSlashingReceived:
if _, ok := requestedTopics[ProposerSlashingTopic]; !ok {
return nil
}
proposerSlashingData, ok := event.Data.(*operation.ProposerSlashingReceivedData)
if !ok {
return write(w, flusher, topicDataMismatch, event.Data, ProposerSlashingTopic)
}
return send(w, flusher, ProposerSlashingTopic, structs.ProposerSlashingFromConsensus(proposerSlashingData.ProposerSlashing))
} }
return nil return bytes.NewBufferString("event: " + name + "\ndata: " + string(d) + "\n\n")
} }
func (s *Server) handleStateEvents(ctx context.Context, w http.ResponseWriter, flusher http.Flusher, requestedTopics map[string]bool, event *feed.Event) error { func topicForEvent(event *feed.Event) string {
switch event.Type { switch event.Data.(type) {
case statefeed.NewHead: case *operation.AggregatedAttReceivedData:
if _, ok := requestedTopics[HeadTopic]; ok { return AttestationTopic
headData, ok := event.Data.(*ethpb.EventHead) case *operation.UnAggregatedAttReceivedData:
if !ok { return AttestationTopic
return write(w, flusher, topicDataMismatch, event.Data, HeadTopic) case *operation.ExitReceivedData:
} return VoluntaryExitTopic
head := &structs.HeadEvent{ case *operation.SyncCommitteeContributionReceivedData:
Slot: fmt.Sprintf("%d", headData.Slot), return SyncCommitteeContributionTopic
Block: hexutil.Encode(headData.Block), case *operation.BLSToExecutionChangeReceivedData:
State: hexutil.Encode(headData.State), return BLSToExecutionChangeTopic
EpochTransition: headData.EpochTransition, case *operation.BlobSidecarReceivedData:
ExecutionOptimistic: headData.ExecutionOptimistic, return BlobSidecarTopic
PreviousDutyDependentRoot: hexutil.Encode(headData.PreviousDutyDependentRoot), case *operation.AttesterSlashingReceivedData:
CurrentDutyDependentRoot: hexutil.Encode(headData.CurrentDutyDependentRoot), return AttesterSlashingTopic
} case *operation.ProposerSlashingReceivedData:
return send(w, flusher, HeadTopic, head) return ProposerSlashingTopic
case *ethpb.EventHead:
return HeadTopic
case *ethpb.EventFinalizedCheckpoint:
return FinalizedCheckpointTopic
case *ethpbv2.LightClientFinalityUpdateWithVersion:
return LightClientFinalityUpdateTopic
case *ethpbv2.LightClientOptimisticUpdateWithVersion:
return LightClientOptimisticUpdateTopic
case *ethpb.EventChainReorg:
return ChainReorgTopic
case *statefeed.BlockProcessedData:
return BlockTopic
default:
if event.Type == statefeed.MissedSlot {
return PayloadAttributesTopic
} }
if _, ok := requestedTopics[PayloadAttributesTopic]; ok { return InvalidTopic
return s.sendPayloadAttributes(ctx, w, flusher) }
} }
case statefeed.MissedSlot:
if _, ok := requestedTopics[PayloadAttributesTopic]; ok { func (s *Server) lazyReaderForEvent(ctx context.Context, event *feed.Event, topics *topicRequest) (lazyReader, error) {
return s.sendPayloadAttributes(ctx, w, flusher) eventName := topicForEvent(event)
} if !topics.requested(eventName) {
case statefeed.FinalizedCheckpoint: return nil, errNotRequested
if _, ok := requestedTopics[FinalizedCheckpointTopic]; !ok { }
return nil if eventName == PayloadAttributesTopic {
} return s.currentPayloadAttributes(ctx)
checkpointData, ok := event.Data.(*ethpb.EventFinalizedCheckpoint) }
if !ok { if event == nil || event.Data == nil {
return write(w, flusher, topicDataMismatch, event.Data, FinalizedCheckpointTopic) return nil, errors.New("event or event data is nil")
} }
checkpoint := &structs.FinalizedCheckpointEvent{ switch v := event.Data.(type) {
Block: hexutil.Encode(checkpointData.Block), case *ethpb.EventHead:
State: hexutil.Encode(checkpointData.State), // The head event is a special case because, if the client requested the payload attributes topic,
Epoch: fmt.Sprintf("%d", checkpointData.Epoch), // we send two event messages in reaction; the head event and the payload attributes.
ExecutionOptimistic: checkpointData.ExecutionOptimistic, headReader := func() io.Reader {
} return jsonMarshalReader(eventName, structs.HeadEventFromV1(v))
return send(w, flusher, FinalizedCheckpointTopic, checkpoint) }
case statefeed.LightClientFinalityUpdate: // Don't do the expensive attr lookup unless the client requested it.
if _, ok := requestedTopics[LightClientFinalityUpdateTopic]; !ok { if !topics.requested(PayloadAttributesTopic) {
return nil return headReader, nil
} }
updateData, ok := event.Data.(*ethpbv2.LightClientFinalityUpdateWithVersion) // Since payload attributes could change before the outbox is written, we need to do a blocking operation to
if !ok { // get the current payload attributes right here.
return write(w, flusher, topicDataMismatch, event.Data, LightClientFinalityUpdateTopic) attrReader, err := s.currentPayloadAttributes(ctx)
} if err != nil {
update, err := structs.LightClientFinalityUpdateFromConsensus(updateData.Data) return nil, errors.Wrap(err, "could not get payload attributes for head event")
if err != nil { }
return err return func() io.Reader {
} return io.MultiReader(headReader(), attrReader())
updateEvent := &structs.LightClientFinalityUpdateEvent{ }, nil
Version: version.String(int(updateData.Version)), case *operation.AggregatedAttReceivedData:
Data: update, return func() io.Reader {
} att := structs.AttFromConsensus(v.Attestation.Aggregate)
return send(w, flusher, LightClientFinalityUpdateTopic, updateEvent) return jsonMarshalReader(eventName, att)
case statefeed.LightClientOptimisticUpdate: }, nil
if _, ok := requestedTopics[LightClientOptimisticUpdateTopic]; !ok { case *operation.UnAggregatedAttReceivedData:
return nil att, ok := v.Attestation.(*eth.Attestation)
} if !ok {
updateData, ok := event.Data.(*ethpbv2.LightClientOptimisticUpdateWithVersion) return nil, errors.Wrapf(errUnhandledEventData, "Unexpected type %T for the .Attestation field of UnAggregatedAttReceivedData", v.Attestation)
if !ok { }
return write(w, flusher, topicDataMismatch, event.Data, LightClientOptimisticUpdateTopic) return func() io.Reader {
} att := structs.AttFromConsensus(att)
update, err := structs.LightClientOptimisticUpdateFromConsensus(updateData.Data) return jsonMarshalReader(eventName, att)
if err != nil { }, nil
return err case *operation.ExitReceivedData:
} return func() io.Reader {
updateEvent := &structs.LightClientOptimisticUpdateEvent{ return jsonMarshalReader(eventName, structs.SignedExitFromConsensus(v.Exit))
Version: version.String(int(updateData.Version)), }, nil
Data: update, case *operation.SyncCommitteeContributionReceivedData:
} return func() io.Reader {
return send(w, flusher, LightClientOptimisticUpdateTopic, updateEvent) return jsonMarshalReader(eventName, structs.SignedContributionAndProofFromConsensus(v.Contribution))
case statefeed.Reorg: }, nil
if _, ok := requestedTopics[ChainReorgTopic]; !ok { case *operation.BLSToExecutionChangeReceivedData:
return nil return func() io.Reader {
} return jsonMarshalReader(eventName, structs.SignedBLSChangeFromConsensus(v.Change))
reorgData, ok := event.Data.(*ethpb.EventChainReorg) }, nil
if !ok { case *operation.BlobSidecarReceivedData:
return write(w, flusher, topicDataMismatch, event.Data, ChainReorgTopic) return func() io.Reader {
} versionedHash := blockchain.ConvertKzgCommitmentToVersionedHash(v.Blob.KzgCommitment)
reorg := &structs.ChainReorgEvent{ return jsonMarshalReader(eventName, &structs.BlobSidecarEvent{
Slot: fmt.Sprintf("%d", reorgData.Slot), BlockRoot: hexutil.Encode(v.Blob.BlockRootSlice()),
Depth: fmt.Sprintf("%d", reorgData.Depth), Index: fmt.Sprintf("%d", v.Blob.Index),
OldHeadBlock: hexutil.Encode(reorgData.OldHeadBlock), Slot: fmt.Sprintf("%d", v.Blob.Slot()),
NewHeadBlock: hexutil.Encode(reorgData.NewHeadBlock), VersionedHash: versionedHash.String(),
OldHeadState: hexutil.Encode(reorgData.OldHeadState), KzgCommitment: hexutil.Encode(v.Blob.KzgCommitment),
NewHeadState: hexutil.Encode(reorgData.NewHeadState), })
Epoch: fmt.Sprintf("%d", reorgData.Epoch), }, nil
ExecutionOptimistic: reorgData.ExecutionOptimistic, case *operation.AttesterSlashingReceivedData:
} slashing, ok := v.AttesterSlashing.(*eth.AttesterSlashing)
return send(w, flusher, ChainReorgTopic, reorg) if !ok {
case statefeed.BlockProcessed: return nil, errors.Wrapf(errUnhandledEventData, "Unexpected type %T for the .AttesterSlashing field of AttesterSlashingReceivedData", v.AttesterSlashing)
if _, ok := requestedTopics[BlockTopic]; !ok { }
return nil return func() io.Reader {
} return jsonMarshalReader(eventName, structs.AttesterSlashingFromConsensus(slashing))
blkData, ok := event.Data.(*statefeed.BlockProcessedData) }, nil
if !ok { case *operation.ProposerSlashingReceivedData:
return write(w, flusher, topicDataMismatch, event.Data, BlockTopic) return func() io.Reader {
} return jsonMarshalReader(eventName, structs.ProposerSlashingFromConsensus(v.ProposerSlashing))
blockRoot, err := blkData.SignedBlock.Block().HashTreeRoot() }, nil
if err != nil { case *ethpb.EventFinalizedCheckpoint:
return write(w, flusher, "Could not get block root: "+err.Error()) return func() io.Reader {
} return jsonMarshalReader(eventName, structs.FinalizedCheckpointEventFromV1(v))
blk := &structs.BlockEvent{ }, nil
Slot: fmt.Sprintf("%d", blkData.Slot), case *ethpbv2.LightClientFinalityUpdateWithVersion:
Block: hexutil.Encode(blockRoot[:]), cv, err := structs.LightClientFinalityUpdateFromConsensus(v.Data)
ExecutionOptimistic: blkData.Optimistic, if err != nil {
} return nil, errors.Wrap(err, "LightClientFinalityUpdateWithVersion event conversion failure")
return send(w, flusher, BlockTopic, blk) }
ev := &structs.LightClientFinalityUpdateEvent{
Version: version.String(int(v.Version)),
Data: cv,
}
return func() io.Reader {
return jsonMarshalReader(eventName, ev)
}, nil
case *ethpbv2.LightClientOptimisticUpdateWithVersion:
cv, err := structs.LightClientOptimisticUpdateFromConsensus(v.Data)
if err != nil {
return nil, errors.Wrap(err, "LightClientOptimisticUpdateWithVersion event conversion failure")
}
ev := &structs.LightClientOptimisticUpdateEvent{
Version: version.String(int(v.Version)),
Data: cv,
}
return func() io.Reader {
return jsonMarshalReader(eventName, ev)
}, nil
case *ethpb.EventChainReorg:
return func() io.Reader {
return jsonMarshalReader(eventName, structs.EventChainReorgFromV1(v))
}, nil
case *statefeed.BlockProcessedData:
blockRoot, err := v.SignedBlock.Block().HashTreeRoot()
if err != nil {
return nil, errors.Wrap(err, "could not compute block root for BlockProcessedData state feed event")
}
return func() io.Reader {
blk := &structs.BlockEvent{
Slot: fmt.Sprintf("%d", v.Slot),
Block: hexutil.Encode(blockRoot[:]),
ExecutionOptimistic: v.Optimistic,
}
return jsonMarshalReader(eventName, blk)
}, nil
default:
return nil, errors.Wrapf(errUnhandledEventData, "event data type %T unsupported", v)
} }
return nil
} }
// This event stream is intended to be used by builders and relays. // This event stream is intended to be used by builders and relays.
// Parent fields are based on state at N_{current_slot}, while the rest of fields are based on state of N_{current_slot + 1} // Parent fields are based on state at N_{current_slot}, while the rest of fields are based on state of N_{current_slot + 1}
func (s *Server) sendPayloadAttributes(ctx context.Context, w http.ResponseWriter, flusher http.Flusher) error { func (s *Server) currentPayloadAttributes(ctx context.Context) (lazyReader, error) {
headRoot, err := s.HeadFetcher.HeadRoot(ctx) headRoot, err := s.HeadFetcher.HeadRoot(ctx)
if err != nil { if err != nil {
return write(w, flusher, "Could not get head root: "+err.Error()) return nil, errors.Wrap(err, "could not get head root")
} }
st, err := s.HeadFetcher.HeadState(ctx) st, err := s.HeadFetcher.HeadState(ctx)
if err != nil { if err != nil {
return write(w, flusher, "Could not get head state: "+err.Error()) return nil, errors.Wrap(err, "could not get head state")
} }
// advance the head state // advance the head state
headState, err := transition.ProcessSlotsIfPossible(ctx, st, s.ChainInfoFetcher.CurrentSlot()+1) headState, err := transition.ProcessSlotsIfPossible(ctx, st, s.ChainInfoFetcher.CurrentSlot()+1)
if err != nil { if err != nil {
return write(w, flusher, "Could not advance head state: "+err.Error()) return nil, errors.Wrap(err, "could not advance head state")
} }
headBlock, err := s.HeadFetcher.HeadBlock(ctx) headBlock, err := s.HeadFetcher.HeadBlock(ctx)
if err != nil { if err != nil {
return write(w, flusher, "Could not get head block: "+err.Error()) return nil, errors.Wrap(err, "could not get head block")
} }
headPayload, err := headBlock.Block().Body().Execution() headPayload, err := headBlock.Block().Body().Execution()
if err != nil { if err != nil {
return write(w, flusher, "Could not get execution payload: "+err.Error()) return nil, errors.Wrap(err, "could not get execution payload")
} }
t, err := slots.ToTime(headState.GenesisTime(), headState.Slot()) t, err := slots.ToTime(headState.GenesisTime(), headState.Slot())
if err != nil { if err != nil {
return write(w, flusher, "Could not get head state slot time: "+err.Error()) return nil, errors.Wrap(err, "could not get head state slot time")
} }
prevRando, err := helpers.RandaoMix(headState, time.CurrentEpoch(headState)) prevRando, err := helpers.RandaoMix(headState, chaintime.CurrentEpoch(headState))
if err != nil { if err != nil {
return write(w, flusher, "Could not get head state randao mix: "+err.Error()) return nil, errors.Wrap(err, "could not get head state randao mix")
} }
proposerIndex, err := helpers.BeaconProposerIndex(ctx, headState) proposerIndex, err := helpers.BeaconProposerIndex(ctx, headState)
if err != nil { if err != nil {
return write(w, flusher, "Could not get head state proposer index: "+err.Error()) return nil, errors.Wrap(err, "could not get head state proposer index")
} }
feeRecipient := params.BeaconConfig().DefaultFeeRecipient.Bytes() feeRecipient := params.BeaconConfig().DefaultFeeRecipient.Bytes()
tValidator, exists := s.TrackedValidatorsCache.Validator(proposerIndex) tValidator, exists := s.TrackedValidatorsCache.Validator(proposerIndex)
@@ -425,7 +575,7 @@ func (s *Server) sendPayloadAttributes(ctx context.Context, w http.ResponseWrite
case version.Capella: case version.Capella:
withdrawals, _, err := headState.ExpectedWithdrawals() withdrawals, _, err := headState.ExpectedWithdrawals()
if err != nil { if err != nil {
return write(w, flusher, "Could not get head state expected withdrawals: "+err.Error()) return nil, errors.Wrap(err, "could not get head state expected withdrawals")
} }
attributes = &structs.PayloadAttributesV2{ attributes = &structs.PayloadAttributesV2{
Timestamp: fmt.Sprintf("%d", t.Unix()), Timestamp: fmt.Sprintf("%d", t.Unix()),
@@ -436,11 +586,11 @@ func (s *Server) sendPayloadAttributes(ctx context.Context, w http.ResponseWrite
case version.Deneb, version.Electra: case version.Deneb, version.Electra:
withdrawals, _, err := headState.ExpectedWithdrawals() withdrawals, _, err := headState.ExpectedWithdrawals()
if err != nil { if err != nil {
return write(w, flusher, "Could not get head state expected withdrawals: "+err.Error()) return nil, errors.Wrap(err, "could not get head state expected withdrawals")
} }
parentRoot, err := headBlock.Block().HashTreeRoot() parentRoot, err := headBlock.Block().HashTreeRoot()
if err != nil { if err != nil {
return write(w, flusher, "Could not get head block root: "+err.Error()) return nil, errors.Wrap(err, "could not get head block root")
} }
attributes = &structs.PayloadAttributesV3{ attributes = &structs.PayloadAttributesV3{
Timestamp: fmt.Sprintf("%d", t.Unix()), Timestamp: fmt.Sprintf("%d", t.Unix()),
@@ -450,12 +600,12 @@ func (s *Server) sendPayloadAttributes(ctx context.Context, w http.ResponseWrite
ParentBeaconBlockRoot: hexutil.Encode(parentRoot[:]), ParentBeaconBlockRoot: hexutil.Encode(parentRoot[:]),
} }
default: default:
return write(w, flusher, "Payload version %s is not supported", version.String(headState.Version())) return nil, errors.Wrapf(err, "Payload version %s is not supported", version.String(headState.Version()))
} }
attributesBytes, err := json.Marshal(attributes) attributesBytes, err := json.Marshal(attributes)
if err != nil { if err != nil {
return write(w, flusher, err.Error()) return nil, errors.Wrap(err, "errors marshaling payload attributes to json")
} }
eventData := structs.PayloadAttributesEventData{ eventData := structs.PayloadAttributesEventData{
ProposerIndex: fmt.Sprintf("%d", proposerIndex), ProposerIndex: fmt.Sprintf("%d", proposerIndex),
@@ -467,31 +617,12 @@ func (s *Server) sendPayloadAttributes(ctx context.Context, w http.ResponseWrite
} }
eventDataBytes, err := json.Marshal(eventData) eventDataBytes, err := json.Marshal(eventData)
if err != nil { if err != nil {
return write(w, flusher, err.Error()) return nil, errors.Wrap(err, "errors marshaling payload attributes event data to json")
} }
return send(w, flusher, PayloadAttributesTopic, &structs.PayloadAttributesEvent{ return func() io.Reader {
Version: version.String(headState.Version()), return jsonMarshalReader(PayloadAttributesTopic, &structs.PayloadAttributesEvent{
Data: eventDataBytes, Version: version.String(headState.Version()),
}) Data: eventDataBytes,
} })
}, nil
func send(w http.ResponseWriter, flusher http.Flusher, name string, data interface{}) error {
j, err := json.Marshal(data)
if err != nil {
return write(w, flusher, "Could not marshal event to JSON: "+err.Error())
}
return write(w, flusher, "event: %s\ndata: %s\n\n", name, string(j))
}
func sendKeepalive(w http.ResponseWriter, flusher http.Flusher) error {
return write(w, flusher, ":\n\n")
}
func write(w http.ResponseWriter, flusher http.Flusher, format string, a ...any) error {
_, err := fmt.Fprintf(w, format, a...)
if err != nil {
return errors.Wrap(err, "could not write to response writer")
}
flusher.Flush()
return nil
} }

View File

@@ -1,6 +1,7 @@
package events package events
import ( import (
"context"
"fmt" "fmt"
"io" "io"
"math" "math"
@@ -23,56 +24,99 @@ import (
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/eth/v1" ethpb "github.com/prysmaticlabs/prysm/v5/proto/eth/v1"
eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" eth "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/testing/assert"
"github.com/prysmaticlabs/prysm/v5/testing/require" "github.com/prysmaticlabs/prysm/v5/testing/require"
"github.com/prysmaticlabs/prysm/v5/testing/util" "github.com/prysmaticlabs/prysm/v5/testing/util"
sse "github.com/r3labs/sse/v2"
) )
type flushableResponseRecorder struct { func requireAllEventsReceived(t *testing.T, stn, opn *mockChain.EventFeedWrapper, events []*feed.Event, req *topicRequest, s *Server, w *StreamingResponseWriterRecorder) {
*httptest.ResponseRecorder // maxBufferSize param copied from sse lib client code
flushed bool sseR := sse.NewEventStreamReader(w.Body(), 1<<24)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
expected := make(map[string]bool)
for i := range events {
ev := events[i]
// serialize the event the same way the server will so that we can compare expectation to results.
top := topicForEvent(ev)
eb, err := s.lazyReaderForEvent(context.Background(), ev, req)
require.NoError(t, err)
exb, err := io.ReadAll(eb())
require.NoError(t, err)
exs := string(exb[0 : len(exb)-2]) // remove trailing double newline
if topicsForOpsFeed[top] {
if err := opn.WaitForSubscription(ctx); err != nil {
t.Fatal(err)
}
// Send the event on the feed.
s.OperationNotifier.OperationFeed().Send(ev)
} else {
if err := stn.WaitForSubscription(ctx); err != nil {
t.Fatal(err)
}
// Send the event on the feed.
s.StateNotifier.StateFeed().Send(ev)
}
expected[exs] = true
}
done := make(chan struct{})
go func() {
defer close(done)
for {
ev, err := sseR.ReadEvent()
if err == io.EOF {
return
}
require.NoError(t, err)
str := string(ev)
delete(expected, str)
if len(expected) == 0 {
return
}
}
}()
select {
case <-done:
break
case <-ctx.Done():
t.Fatalf("context canceled / timed out waiting for events, err=%v", ctx.Err())
}
require.Equal(t, 0, len(expected), "expected events not seen")
} }
func (f *flushableResponseRecorder) Flush() { func (tr *topicRequest) testHttpRequest(_ *testing.T) *http.Request {
f.flushed = true tq := make([]string, 0, len(tr.topics))
for topic := range tr.topics {
tq = append(tq, "topics="+topic)
}
return httptest.NewRequest(http.MethodGet, fmt.Sprintf("http://example.com/eth/v1/events?%s", strings.Join(tq, "&")), nil)
} }
func TestStreamEvents_OperationsEvents(t *testing.T) { func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
t.Run("operations", func(t *testing.T) { topics, err := newTopicRequest([]string{
s := &Server{ AttestationTopic,
StateNotifier: &mockChain.MockStateNotifier{}, VoluntaryExitTopic,
OperationNotifier: &mockChain.MockOperationNotifier{}, SyncCommitteeContributionTopic,
} BLSToExecutionChangeTopic,
BlobSidecarTopic,
AttesterSlashingTopic,
ProposerSlashingTopic,
})
require.NoError(t, err)
ro, err := blocks.NewROBlob(util.HydrateBlobSidecar(&eth.BlobSidecar{}))
require.NoError(t, err)
vblob := blocks.NewVerifiedROBlob(ro)
topics := []string{ return topics, []*feed.Event{
AttestationTopic, &feed.Event{
VoluntaryExitTopic,
SyncCommitteeContributionTopic,
BLSToExecutionChangeTopic,
BlobSidecarTopic,
AttesterSlashingTopic,
ProposerSlashingTopic,
}
for i, topic := range topics {
topics[i] = "topics=" + topic
}
request := httptest.NewRequest(http.MethodGet, fmt.Sprintf("http://example.com/eth/v1/events?%s", strings.Join(topics, "&")), nil)
w := &flushableResponseRecorder{
ResponseRecorder: httptest.NewRecorder(),
}
go func() {
s.StreamEvents(w, request)
}()
// wait for initiation of StreamEvents
time.Sleep(100 * time.Millisecond)
s.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.UnaggregatedAttReceived, Type: operation.UnaggregatedAttReceived,
Data: &operation.UnAggregatedAttReceivedData{ Data: &operation.UnAggregatedAttReceivedData{
Attestation: util.HydrateAttestation(&eth.Attestation{}), Attestation: util.HydrateAttestation(&eth.Attestation{}),
}, },
}) },
s.OperationNotifier.OperationFeed().Send(&feed.Event{ &feed.Event{
Type: operation.AggregatedAttReceived, Type: operation.AggregatedAttReceived,
Data: &operation.AggregatedAttReceivedData{ Data: &operation.AggregatedAttReceivedData{
Attestation: &eth.AggregateAttestationAndProof{ Attestation: &eth.AggregateAttestationAndProof{
@@ -81,8 +125,8 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
SelectionProof: make([]byte, 96), SelectionProof: make([]byte, 96),
}, },
}, },
}) },
s.OperationNotifier.OperationFeed().Send(&feed.Event{ &feed.Event{
Type: operation.ExitReceived, Type: operation.ExitReceived,
Data: &operation.ExitReceivedData{ Data: &operation.ExitReceivedData{
Exit: &eth.SignedVoluntaryExit{ Exit: &eth.SignedVoluntaryExit{
@@ -93,8 +137,8 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
Signature: make([]byte, 96), Signature: make([]byte, 96),
}, },
}, },
}) },
s.OperationNotifier.OperationFeed().Send(&feed.Event{ &feed.Event{
Type: operation.SyncCommitteeContributionReceived, Type: operation.SyncCommitteeContributionReceived,
Data: &operation.SyncCommitteeContributionReceivedData{ Data: &operation.SyncCommitteeContributionReceivedData{
Contribution: &eth.SignedContributionAndProof{ Contribution: &eth.SignedContributionAndProof{
@@ -112,8 +156,8 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
Signature: make([]byte, 96), Signature: make([]byte, 96),
}, },
}, },
}) },
s.OperationNotifier.OperationFeed().Send(&feed.Event{ &feed.Event{
Type: operation.BLSToExecutionChangeReceived, Type: operation.BLSToExecutionChangeReceived,
Data: &operation.BLSToExecutionChangeReceivedData{ Data: &operation.BLSToExecutionChangeReceivedData{
Change: &eth.SignedBLSToExecutionChange{ Change: &eth.SignedBLSToExecutionChange{
@@ -125,18 +169,14 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
Signature: make([]byte, 96), Signature: make([]byte, 96),
}, },
}, },
}) },
ro, err := blocks.NewROBlob(util.HydrateBlobSidecar(&eth.BlobSidecar{})) &feed.Event{
require.NoError(t, err)
vblob := blocks.NewVerifiedROBlob(ro)
s.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.BlobSidecarReceived, Type: operation.BlobSidecarReceived,
Data: &operation.BlobSidecarReceivedData{ Data: &operation.BlobSidecarReceivedData{
Blob: &vblob, Blob: &vblob,
}, },
}) },
&feed.Event{
s.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.AttesterSlashingReceived, Type: operation.AttesterSlashingReceived,
Data: &operation.AttesterSlashingReceivedData{ Data: &operation.AttesterSlashingReceivedData{
AttesterSlashing: &eth.AttesterSlashing{ AttesterSlashing: &eth.AttesterSlashing{
@@ -168,9 +208,8 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
}, },
}, },
}, },
}) },
&feed.Event{
s.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.ProposerSlashingReceived, Type: operation.ProposerSlashingReceived,
Data: &operation.ProposerSlashingReceivedData{ Data: &operation.ProposerSlashingReceivedData{
ProposerSlashing: &eth.ProposerSlashing{ ProposerSlashing: &eth.ProposerSlashing{
@@ -192,100 +231,107 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
}, },
}, },
}, },
}) },
}
}
time.Sleep(1 * time.Second) func TestStreamEvents_OperationsEvents(t *testing.T) {
request.Context().Done() t.Run("operations", func(t *testing.T) {
stn := mockChain.NewEventFeedWrapper()
opn := mockChain.NewEventFeedWrapper()
s := &Server{
StateNotifier: &mockChain.SimpleNotifier{Feed: stn},
OperationNotifier: &mockChain.SimpleNotifier{Feed: opn},
}
resp := w.Result() topics, events := operationEventsFixtures(t)
body, err := io.ReadAll(resp.Body) request := topics.testHttpRequest(t)
require.NoError(t, err) w := NewStreamingResponseWriterRecorder()
require.NotNil(t, body)
assert.Equal(t, operationsResult, string(body)) go func() {
s.StreamEvents(w, request)
}()
requireAllEventsReceived(t, stn, opn, events, topics, s, w)
}) })
t.Run("state", func(t *testing.T) { t.Run("state", func(t *testing.T) {
stn := mockChain.NewEventFeedWrapper()
opn := mockChain.NewEventFeedWrapper()
s := &Server{ s := &Server{
StateNotifier: &mockChain.MockStateNotifier{}, StateNotifier: &mockChain.SimpleNotifier{Feed: stn},
OperationNotifier: &mockChain.MockOperationNotifier{}, OperationNotifier: &mockChain.SimpleNotifier{Feed: opn},
} }
topics := []string{HeadTopic, FinalizedCheckpointTopic, ChainReorgTopic, BlockTopic} topics, err := newTopicRequest([]string{
for i, topic := range topics { HeadTopic,
topics[i] = "topics=" + topic FinalizedCheckpointTopic,
} ChainReorgTopic,
request := httptest.NewRequest(http.MethodGet, fmt.Sprintf("http://example.com/eth/v1/events?%s", strings.Join(topics, "&")), nil) BlockTopic,
w := &flushableResponseRecorder{ })
ResponseRecorder: httptest.NewRecorder(), require.NoError(t, err)
request := topics.testHttpRequest(t)
w := NewStreamingResponseWriterRecorder()
b, err := blocks.NewSignedBeaconBlock(util.HydrateSignedBeaconBlock(&eth.SignedBeaconBlock{}))
require.NoError(t, err)
events := []*feed.Event{
&feed.Event{
Type: statefeed.BlockProcessed,
Data: &statefeed.BlockProcessedData{
Slot: 0,
BlockRoot: [32]byte{},
SignedBlock: b,
Verified: true,
Optimistic: false,
},
},
&feed.Event{
Type: statefeed.NewHead,
Data: &ethpb.EventHead{
Slot: 0,
Block: make([]byte, 32),
State: make([]byte, 32),
EpochTransition: true,
PreviousDutyDependentRoot: make([]byte, 32),
CurrentDutyDependentRoot: make([]byte, 32),
ExecutionOptimistic: false,
},
},
&feed.Event{
Type: statefeed.Reorg,
Data: &ethpb.EventChainReorg{
Slot: 0,
Depth: 0,
OldHeadBlock: make([]byte, 32),
NewHeadBlock: make([]byte, 32),
OldHeadState: make([]byte, 32),
NewHeadState: make([]byte, 32),
Epoch: 0,
ExecutionOptimistic: false,
},
},
&feed.Event{
Type: statefeed.FinalizedCheckpoint,
Data: &ethpb.EventFinalizedCheckpoint{
Block: make([]byte, 32),
State: make([]byte, 32),
Epoch: 0,
ExecutionOptimistic: false,
},
},
} }
go func() { go func() {
s.StreamEvents(w, request) s.StreamEvents(w, request)
}() }()
// wait for initiation of StreamEvents
time.Sleep(100 * time.Millisecond)
s.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.NewHead,
Data: &ethpb.EventHead{
Slot: 0,
Block: make([]byte, 32),
State: make([]byte, 32),
EpochTransition: true,
PreviousDutyDependentRoot: make([]byte, 32),
CurrentDutyDependentRoot: make([]byte, 32),
ExecutionOptimistic: false,
},
})
s.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.FinalizedCheckpoint,
Data: &ethpb.EventFinalizedCheckpoint{
Block: make([]byte, 32),
State: make([]byte, 32),
Epoch: 0,
ExecutionOptimistic: false,
},
})
s.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.Reorg,
Data: &ethpb.EventChainReorg{
Slot: 0,
Depth: 0,
OldHeadBlock: make([]byte, 32),
NewHeadBlock: make([]byte, 32),
OldHeadState: make([]byte, 32),
NewHeadState: make([]byte, 32),
Epoch: 0,
ExecutionOptimistic: false,
},
})
b, err := blocks.NewSignedBeaconBlock(util.HydrateSignedBeaconBlock(&eth.SignedBeaconBlock{}))
require.NoError(t, err)
s.StateNotifier.StateFeed().Send(&feed.Event{
Type: statefeed.BlockProcessed,
Data: &statefeed.BlockProcessedData{
Slot: 0,
BlockRoot: [32]byte{},
SignedBlock: b,
Verified: true,
Optimistic: false,
},
})
// wait for feed requireAllEventsReceived(t, stn, opn, events, topics, s, w)
time.Sleep(1 * time.Second)
request.Context().Done()
resp := w.Result()
body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.NotNil(t, body)
assert.Equal(t, stateResult, string(body))
}) })
t.Run("payload attributes", func(t *testing.T) { t.Run("payload attributes", func(t *testing.T) {
type testCase struct { type testCase struct {
name string name string
getState func() state.BeaconState getState func() state.BeaconState
getBlock func() interfaces.SignedBeaconBlock getBlock func() interfaces.SignedBeaconBlock
expected string
SetTrackedValidatorsCache func(*cache.TrackedValidatorsCache) SetTrackedValidatorsCache func(*cache.TrackedValidatorsCache)
} }
testCases := []testCase{ testCases := []testCase{
@@ -301,7 +347,6 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
return b return b
}, },
expected: payloadAttributesBellatrixResult,
}, },
{ {
name: "capella", name: "capella",
@@ -315,7 +360,6 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
return b return b
}, },
expected: payloadAttributesCapellaResult,
}, },
{ {
name: "deneb", name: "deneb",
@@ -329,7 +373,6 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
return b return b
}, },
expected: payloadAttributesDenebResult,
}, },
{ {
name: "electra", name: "electra",
@@ -343,7 +386,6 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
return b return b
}, },
expected: payloadAttributesElectraResultWithTVC,
SetTrackedValidatorsCache: func(c *cache.TrackedValidatorsCache) { SetTrackedValidatorsCache: func(c *cache.TrackedValidatorsCache) {
c.Set(cache.TrackedValidator{ c.Set(cache.TrackedValidator{
Active: true, Active: true,
@@ -368,9 +410,11 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
Slot: &currentSlot, Slot: &currentSlot,
} }
stn := mockChain.NewEventFeedWrapper()
opn := mockChain.NewEventFeedWrapper()
s := &Server{ s := &Server{
StateNotifier: &mockChain.MockStateNotifier{}, StateNotifier: &mockChain.SimpleNotifier{Feed: stn},
OperationNotifier: &mockChain.MockOperationNotifier{}, OperationNotifier: &mockChain.SimpleNotifier{Feed: opn},
HeadFetcher: mockChainService, HeadFetcher: mockChainService,
ChainInfoFetcher: mockChainService, ChainInfoFetcher: mockChainService,
TrackedValidatorsCache: cache.NewTrackedValidatorsCache(), TrackedValidatorsCache: cache.NewTrackedValidatorsCache(),
@@ -378,100 +422,76 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
if tc.SetTrackedValidatorsCache != nil { if tc.SetTrackedValidatorsCache != nil {
tc.SetTrackedValidatorsCache(s.TrackedValidatorsCache) tc.SetTrackedValidatorsCache(s.TrackedValidatorsCache)
} }
topics, err := newTopicRequest([]string{PayloadAttributesTopic})
request := httptest.NewRequest(http.MethodGet, fmt.Sprintf("http://example.com/eth/v1/events?topics=%s", PayloadAttributesTopic), nil) require.NoError(t, err)
w := &flushableResponseRecorder{ request := topics.testHttpRequest(t)
ResponseRecorder: httptest.NewRecorder(), w := NewStreamingResponseWriterRecorder()
} events := []*feed.Event{&feed.Event{Type: statefeed.MissedSlot}}
go func() { go func() {
s.StreamEvents(w, request) s.StreamEvents(w, request)
}() }()
// wait for initiation of StreamEvents requireAllEventsReceived(t, stn, opn, events, topics, s, w)
time.Sleep(100 * time.Millisecond)
s.StateNotifier.StateFeed().Send(&feed.Event{Type: statefeed.MissedSlot})
// wait for feed
time.Sleep(1 * time.Second)
request.Context().Done()
resp := w.Result()
body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.NotNil(t, body)
assert.Equal(t, tc.expected, string(body), "wrong result for "+tc.name)
} }
}) })
} }
const operationsResult = `: func TestStuckReader(t *testing.T) {
topics, events := operationEventsFixtures(t)
require.Equal(t, 8, len(events))
// set eventFeedDepth to a number lower than the events we intend to send to force the server to drop the reader.
stn := mockChain.NewEventFeedWrapper()
opn := mockChain.NewEventFeedWrapper()
s := &Server{
StateNotifier: &mockChain.SimpleNotifier{Feed: stn},
OperationNotifier: &mockChain.SimpleNotifier{Feed: opn},
EventFeedDepth: len(events) - 1,
}
event: attestation ctx, cancel := context.WithTimeout(context.Background(), time.Second)
data: {"aggregation_bits":"0x00","data":{"slot":"0","index":"0","beacon_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","source":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"},"target":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"}},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"} defer cancel()
eventsWritten := make(chan struct{})
go func() {
for i := range events {
ev := events[i]
top := topicForEvent(ev)
if topicsForOpsFeed[top] {
err := opn.WaitForSubscription(ctx)
require.NoError(t, err)
s.OperationNotifier.OperationFeed().Send(ev)
} else {
err := stn.WaitForSubscription(ctx)
require.NoError(t, err)
s.StateNotifier.StateFeed().Send(ev)
}
}
close(eventsWritten)
}()
event: attestation request := topics.testHttpRequest(t)
data: {"aggregation_bits":"0x00","data":{"slot":"0","index":"0","beacon_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","source":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"},"target":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"}},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"} w := NewStreamingResponseWriterRecorder()
event: voluntary_exit handlerFinished := make(chan struct{})
data: {"message":{"epoch":"0","validator_index":"0"},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"} go func() {
s.StreamEvents(w, request)
close(handlerFinished)
}()
event: contribution_and_proof // Make sure that the stream writer shut down when the reader failed to clear the write buffer.
data: {"message":{"aggregator_index":"0","contribution":{"slot":"0","beacon_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","subcommittee_index":"0","aggregation_bits":"0x00000000000000000000000000000000","signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"},"selection_proof":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"} select {
case <-handlerFinished:
// We expect the stream handler to max out the queue buffer and exit gracefully.
return
case <-ctx.Done():
t.Fatalf("context canceled / timed out waiting for handler completion, err=%v", ctx.Err())
}
event: bls_to_execution_change // Also make sure all the events were written.
data: {"message":{"validator_index":"0","from_bls_pubkey":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","to_execution_address":"0x0000000000000000000000000000000000000000"},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"} select {
case <-eventsWritten:
event: blob_sidecar // We expect the stream handler to max out the queue buffer and exit gracefully.
data: {"block_root":"0xc78009fdf07fc56a11f122370658a353aaa542ed63e44c4bc15ff4cd105ab33c","index":"0","slot":"0","kzg_commitment":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","versioned_hash":"0x01b0761f87b081d5cf10757ccc89f12be355c70e2e29df288b65b30710dcbcd1"} return
case <-ctx.Done():
event: attester_slashing t.Fatalf("context canceled / timed out waiting to write all events, err=%v", ctx.Err())
data: {"attestation_1":{"attesting_indices":["0","1"],"data":{"slot":"0","index":"0","beacon_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","source":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"},"target":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"}},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"},"attestation_2":{"attesting_indices":["0","1"],"data":{"slot":"0","index":"0","beacon_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","source":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"},"target":{"epoch":"0","root":"0x0000000000000000000000000000000000000000000000000000000000000000"}},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"}} }
}
event: proposer_slashing
data: {"signed_header_1":{"message":{"slot":"0","proposer_index":"0","parent_root":"0x0000000000000000000000000000000000000000000000000000000000000000","state_root":"0x0000000000000000000000000000000000000000000000000000000000000000","body_root":"0x0000000000000000000000000000000000000000000000000000000000000000"},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"},"signed_header_2":{"message":{"slot":"0","proposer_index":"0","parent_root":"0x0000000000000000000000000000000000000000000000000000000000000000","state_root":"0x0000000000000000000000000000000000000000000000000000000000000000","body_root":"0x0000000000000000000000000000000000000000000000000000000000000000"},"signature":"0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000"}}
`
const stateResult = `:
event: head
data: {"slot":"0","block":"0x0000000000000000000000000000000000000000000000000000000000000000","state":"0x0000000000000000000000000000000000000000000000000000000000000000","epoch_transition":true,"execution_optimistic":false,"previous_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000","current_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000"}
event: finalized_checkpoint
data: {"block":"0x0000000000000000000000000000000000000000000000000000000000000000","state":"0x0000000000000000000000000000000000000000000000000000000000000000","epoch":"0","execution_optimistic":false}
event: chain_reorg
data: {"slot":"0","depth":"0","old_head_block":"0x0000000000000000000000000000000000000000000000000000000000000000","old_head_state":"0x0000000000000000000000000000000000000000000000000000000000000000","new_head_block":"0x0000000000000000000000000000000000000000000000000000000000000000","new_head_state":"0x0000000000000000000000000000000000000000000000000000000000000000","epoch":"0","execution_optimistic":false}
event: block
data: {"slot":"0","block":"0xeade62f0457b2fdf48e7d3fc4b60736688286be7c7a3ac4c9a16a5e0600bd9e4","execution_optimistic":false}
`
const payloadAttributesBellatrixResult = `:
event: payload_attributes
data: {"version":"bellatrix","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000"}}}
`
const payloadAttributesCapellaResult = `:
event: payload_attributes
data: {"version":"capella","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000","withdrawals":[]}}}
`
const payloadAttributesDenebResult = `:
event: payload_attributes
data: {"version":"deneb","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0x0000000000000000000000000000000000000000","withdrawals":[],"parent_beacon_block_root":"0xbef96cb938fd48b2403d3e662664325abb0102ed12737cbb80d717520e50cf4a"}}}
`
const payloadAttributesElectraResultWithTVC = `:
event: payload_attributes
data: {"version":"electra","data":{"proposer_index":"0","proposal_slot":"1","parent_block_number":"0","parent_block_root":"0x0000000000000000000000000000000000000000000000000000000000000000","parent_block_hash":"0x0000000000000000000000000000000000000000000000000000000000000000","payload_attributes":{"timestamp":"12","prev_randao":"0x0000000000000000000000000000000000000000000000000000000000000000","suggested_fee_recipient":"0xd2dbd02e4efe087d7d195de828b9dd25f19a89c9","withdrawals":[],"parent_beacon_block_root":"0xf2110e448638f41cb34514ecdbb49c055536cd5f715f1cb259d1287bb900853e"}}}
`

View File

@@ -0,0 +1,75 @@
package events
import (
"io"
"net/http"
"net/http/httptest"
"testing"
"github.com/prysmaticlabs/prysm/v5/testing/require"
)
type StreamingResponseWriterRecorder struct {
http.ResponseWriter
r io.Reader
w io.Writer
statusWritten *int
status chan int
bodyRecording []byte
flushed bool
}
func (w *StreamingResponseWriterRecorder) StatusChan() chan int {
return w.status
}
func NewStreamingResponseWriterRecorder() *StreamingResponseWriterRecorder {
r, w := io.Pipe()
return &StreamingResponseWriterRecorder{
ResponseWriter: httptest.NewRecorder(),
r: r,
w: w,
status: make(chan int, 1),
}
}
// Write implements http.ResponseWriter.
func (w *StreamingResponseWriterRecorder) Write(data []byte) (int, error) {
w.WriteHeader(http.StatusOK)
n, err := w.w.Write(data)
if err != nil {
return n, err
}
return w.ResponseWriter.Write(data)
}
// WriteHeader implements http.ResponseWriter.
func (w *StreamingResponseWriterRecorder) WriteHeader(statusCode int) {
if w.statusWritten != nil {
return
}
w.statusWritten = &statusCode
w.status <- statusCode
w.ResponseWriter.WriteHeader(statusCode)
}
func (w *StreamingResponseWriterRecorder) Body() io.Reader {
return w.r
}
func (w *StreamingResponseWriterRecorder) RequireStatus(t *testing.T, status int) {
if w.statusWritten == nil {
t.Fatal("WriteHeader was not called")
}
require.Equal(t, status, *w.statusWritten)
}
func (w *StreamingResponseWriterRecorder) Flush() {
fw, ok := w.ResponseWriter.(http.Flusher)
if ok {
fw.Flush()
}
w.flushed = true
}
var _ http.ResponseWriter = &StreamingResponseWriterRecorder{}

View File

@@ -4,6 +4,8 @@
package events package events
import ( import (
"time"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain" "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/cache" "github.com/prysmaticlabs/prysm/v5/beacon-chain/cache"
opfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation" opfeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/operation"
@@ -18,4 +20,6 @@ type Server struct {
HeadFetcher blockchain.HeadFetcher HeadFetcher blockchain.HeadFetcher
ChainInfoFetcher blockchain.ChainInfoFetcher ChainInfoFetcher blockchain.ChainInfoFetcher
TrackedValidatorsCache *cache.TrackedValidatorsCache TrackedValidatorsCache *cache.TrackedValidatorsCache
KeepAliveInterval time.Duration
EventFeedDepth int
} }

View File

@@ -2833,6 +2833,12 @@ def prysm_deps():
sum = "h1:HxSrwun11U+LlmwpgM1kEqIqH90IT4N8auv/cD7QFJg=", sum = "h1:HxSrwun11U+LlmwpgM1kEqIqH90IT4N8auv/cD7QFJg=",
version = "v0.8.0", version = "v0.8.0",
) )
go_repository(
name = "com_github_r3labs_sse_v2",
importpath = "github.com/r3labs/sse/v2",
sum = "h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0=",
version = "v2.10.0",
)
go_repository( go_repository(
name = "com_github_raulk_go_watchdog", name = "com_github_raulk_go_watchdog",
importpath = "github.com/raulk/go-watchdog", importpath = "github.com/raulk/go-watchdog",
@@ -4304,6 +4310,12 @@ def prysm_deps():
sum = "h1:stTHdEoWg1pQ8riaP5ROrjS6zy6wewH/Q2iwnLCQUXY=", sum = "h1:stTHdEoWg1pQ8riaP5ROrjS6zy6wewH/Q2iwnLCQUXY=",
version = "v1.0.0-20160220154919-db14e161995a", version = "v1.0.0-20160220154919-db14e161995a",
) )
go_repository(
name = "in_gopkg_cenkalti_backoff_v1",
importpath = "gopkg.in/cenkalti/backoff.v1",
sum = "h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=",
version = "v1.1.0",
)
go_repository( go_repository(
name = "in_gopkg_check_v1", name = "in_gopkg_check_v1",
importpath = "gopkg.in/check.v1", importpath = "gopkg.in/check.v1",

2
go.mod
View File

@@ -66,6 +66,7 @@ require (
github.com/prysmaticlabs/go-bitfield v0.0.0-20240328144219-a1caa50c3a1e github.com/prysmaticlabs/go-bitfield v0.0.0-20240328144219-a1caa50c3a1e
github.com/prysmaticlabs/prombbolt v0.0.0-20210126082820-9b7adba6db7c github.com/prysmaticlabs/prombbolt v0.0.0-20210126082820-9b7adba6db7c
github.com/prysmaticlabs/protoc-gen-go-cast v0.0.0-20230228205207-28762a7b9294 github.com/prysmaticlabs/protoc-gen-go-cast v0.0.0-20230228205207-28762a7b9294
github.com/r3labs/sse/v2 v2.10.0
github.com/rs/cors v1.7.0 github.com/rs/cors v1.7.0
github.com/schollz/progressbar/v3 v3.3.4 github.com/schollz/progressbar/v3 v3.3.4
github.com/sirupsen/logrus v1.9.0 github.com/sirupsen/logrus v1.9.0
@@ -259,6 +260,7 @@ require (
golang.org/x/term v0.23.0 // indirect golang.org/x/term v0.23.0 // indirect
golang.org/x/text v0.17.0 // indirect golang.org/x/text v0.17.0 // indirect
golang.org/x/time v0.5.0 // indirect golang.org/x/time v0.5.0 // indirect
gopkg.in/cenkalti/backoff.v1 v1.1.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect

5
go.sum
View File

@@ -914,6 +914,8 @@ github.com/quic-go/quic-go v0.46.0 h1:uuwLClEEyk1DNvchH8uCByQVjo3yKL9opKulExNDs7
github.com/quic-go/quic-go v0.46.0/go.mod h1:1dLehS7TIR64+vxGR70GDcatWTOtMX2PUtnKsjbTurI= github.com/quic-go/quic-go v0.46.0/go.mod h1:1dLehS7TIR64+vxGR70GDcatWTOtMX2PUtnKsjbTurI=
github.com/quic-go/webtransport-go v0.8.0 h1:HxSrwun11U+LlmwpgM1kEqIqH90IT4N8auv/cD7QFJg= github.com/quic-go/webtransport-go v0.8.0 h1:HxSrwun11U+LlmwpgM1kEqIqH90IT4N8auv/cD7QFJg=
github.com/quic-go/webtransport-go v0.8.0/go.mod h1:N99tjprW432Ut5ONql/aUhSLT0YVSlwHohQsuac9WaM= github.com/quic-go/webtransport-go v0.8.0/go.mod h1:N99tjprW432Ut5ONql/aUhSLT0YVSlwHohQsuac9WaM=
github.com/r3labs/sse/v2 v2.10.0 h1:hFEkLLFY4LDifoHdiCN/LlGBAdVJYsANaLqNYa1l/v0=
github.com/r3labs/sse/v2 v2.10.0/go.mod h1:Igau6Whc+F17QUgML1fYe1VPZzTV6EMCnYktEmkNJ7I=
github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk= github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk=
github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtDqv66NfsMU= github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtDqv66NfsMU=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
@@ -1223,6 +1225,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191116160921-f9c825593386/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -1607,6 +1610,8 @@ google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6h
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/bsm/ratelimit.v1 v1.0.0-20160220154919-db14e161995a/go.mod h1:KF9sEfUPAXdG8Oev9e99iLGnl2uJMjc5B+4y3O7x610= gopkg.in/bsm/ratelimit.v1 v1.0.0-20160220154919-db14e161995a/go.mod h1:KF9sEfUPAXdG8Oev9e99iLGnl2uJMjc5B+4y3O7x610=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=