mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-02-04 18:15:12 -05:00
Compare commits
2 Commits
fix-genlog
...
improve-ev
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d256410582 | ||
|
|
ba0b57209b |
@@ -34,6 +34,17 @@ type Event struct {
|
||||
Data []byte
|
||||
}
|
||||
|
||||
// Send sends an event to the channel, respecting context cancellation.
|
||||
// Returns true if the event was sent, false if the context was cancelled.
|
||||
func Send(ctx context.Context, ch chan<- *Event, e *Event) bool {
|
||||
select {
|
||||
case ch <- e:
|
||||
return true
|
||||
case <-ctx.Done():
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// EventStream is responsible for subscribing to the Beacon API events endpoint
|
||||
// and dispatching received events to subscribers.
|
||||
type EventStream struct {
|
||||
@@ -67,19 +78,20 @@ func (h *EventStream) Subscribe(eventsChannel chan<- *Event) {
|
||||
fullUrl := h.host + "/eth/v1/events?topics=" + allTopics
|
||||
req, err := http.NewRequestWithContext(h.ctx, http.MethodGet, fullUrl, nil)
|
||||
if err != nil {
|
||||
eventsChannel <- &Event{
|
||||
Send(h.ctx, eventsChannel, &Event{
|
||||
EventType: EventConnectionError,
|
||||
Data: []byte(errors.Wrap(err, "failed to create HTTP request").Error()),
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
req.Header.Set("Accept", api.EventStreamMediaType)
|
||||
req.Header.Set("Connection", api.KeepAlive)
|
||||
resp, err := h.httpClient.Do(req)
|
||||
if err != nil {
|
||||
eventsChannel <- &Event{
|
||||
Send(h.ctx, eventsChannel, &Event{
|
||||
EventType: EventConnectionError,
|
||||
Data: []byte(errors.Wrap(err, client.ErrConnectionIssue.Error()).Error()),
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
@@ -97,42 +109,31 @@ func (h *EventStream) Subscribe(eventsChannel chan<- *Event) {
|
||||
|
||||
// Iterate over lines of the event stream
|
||||
for scanner.Scan() {
|
||||
select {
|
||||
case <-h.ctx.Done():
|
||||
log.Info("Context canceled, stopping event stream")
|
||||
close(eventsChannel)
|
||||
return
|
||||
default:
|
||||
line := scanner.Text()
|
||||
// Handle the event based on your specific format
|
||||
if line == "" {
|
||||
// Empty line indicates the end of an event
|
||||
if eventType != "" && data != "" {
|
||||
// Process the event when both eventType and data are set
|
||||
eventsChannel <- &Event{EventType: eventType, Data: []byte(data)}
|
||||
line := scanner.Text()
|
||||
if line == "" {
|
||||
// Empty line indicates the end of an event
|
||||
if eventType != "" && data != "" {
|
||||
if !Send(h.ctx, eventsChannel, &Event{EventType: eventType, Data: []byte(data)}) {
|
||||
return
|
||||
}
|
||||
|
||||
// Reset eventType and data for the next event
|
||||
eventType, data = "", ""
|
||||
continue
|
||||
}
|
||||
et, ok := strings.CutPrefix(line, "event: ")
|
||||
if ok {
|
||||
// Extract event type from the "event" field
|
||||
eventType = et
|
||||
}
|
||||
d, ok := strings.CutPrefix(line, "data: ")
|
||||
if ok {
|
||||
// Extract data from the "data" field
|
||||
data = d
|
||||
}
|
||||
eventType, data = "", ""
|
||||
continue
|
||||
}
|
||||
et, ok := strings.CutPrefix(line, "event: ")
|
||||
if ok {
|
||||
eventType = et
|
||||
}
|
||||
d, ok := strings.CutPrefix(line, "data: ")
|
||||
if ok {
|
||||
data = d
|
||||
}
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
eventsChannel <- &Event{
|
||||
Send(h.ctx, eventsChannel, &Event{
|
||||
EventType: EventConnectionError,
|
||||
Data: []byte(errors.Wrap(err, errors.Wrap(client.ErrConnectionIssue, "scanner failed").Error()).Error()),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package event
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
@@ -94,5 +95,86 @@ func TestEventStreamRequestError(t *testing.T) {
|
||||
if event.EventType != EventConnectionError {
|
||||
t.Errorf("Expected event type %q, got %q", EventConnectionError, event.EventType)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestEventStream_ContextCancelDuringBlockedSend(t *testing.T) {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/eth/v1/events", func(w http.ResponseWriter, r *http.Request) {
|
||||
flusher, ok := w.(http.Flusher)
|
||||
require.Equal(t, true, ok)
|
||||
// Send events continuously until the client disconnects.
|
||||
for i := 0; ; i++ {
|
||||
_, err := fmt.Fprintf(w, "event: head\ndata: data%d\n\n", i)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
flusher.Flush()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
})
|
||||
server := httptest.NewServer(mux)
|
||||
defer server.Close()
|
||||
|
||||
// Use an unbuffered channel so sends will block.
|
||||
eventsChannel := make(chan *Event)
|
||||
ctx, cancel := context.WithCancel(t.Context())
|
||||
stream, err := NewEventStream(ctx, http.DefaultClient, server.URL, []string{"head"})
|
||||
require.NoError(t, err)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
stream.Subscribe(eventsChannel)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// Cancel the context while the goroutine is trying to send on the blocked channel.
|
||||
cancel()
|
||||
|
||||
// The goroutine should exit promptly.
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("Subscribe goroutine did not exit after context cancel")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventStream_DoesNotCloseChannel(t *testing.T) {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/eth/v1/events", func(w http.ResponseWriter, r *http.Request) {
|
||||
flusher, ok := w.(http.Flusher)
|
||||
require.Equal(t, true, ok)
|
||||
_, err := fmt.Fprintf(w, "event: head\ndata: data1\n\n")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
flusher.Flush()
|
||||
// Close the connection after one event to end the scanner loop.
|
||||
})
|
||||
server := httptest.NewServer(mux)
|
||||
defer server.Close()
|
||||
|
||||
eventsChannel := make(chan *Event, 10)
|
||||
stream, err := NewEventStream(t.Context(), http.DefaultClient, server.URL, []string{"head"})
|
||||
require.NoError(t, err)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
stream.Subscribe(eventsChannel)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// Wait for Subscribe to finish.
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("Subscribe goroutine did not exit")
|
||||
}
|
||||
|
||||
// Channel should still be open (not closed). Verify by sending to it.
|
||||
select {
|
||||
case eventsChannel <- &Event{EventType: "test"}:
|
||||
// Successfully sent, channel is open.
|
||||
default:
|
||||
t.Fatal("Channel appears to be closed or blocked unexpectedly")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -121,6 +121,7 @@ go_test(
|
||||
],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//api/client/event:go_default_library",
|
||||
"//api/grpc:go_default_library",
|
||||
"//api/server/structs:go_default_library",
|
||||
"//async/event:go_default_library",
|
||||
|
||||
@@ -290,10 +290,10 @@ func (c *beaconApiValidatorClient) StartEventStream(ctx context.Context, topics
|
||||
client := &http.Client{} // event stream should not be subject to the same settings as other api calls
|
||||
eventStream, err := event.NewEventStream(ctx, client, c.handler.Host(), topics)
|
||||
if err != nil {
|
||||
eventsChannel <- &event.Event{
|
||||
event.Send(ctx, eventsChannel, &event.Event{
|
||||
EventType: event.EventError,
|
||||
Data: []byte(errors.Wrap(err, "failed to start event stream").Error()),
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
c.isEventStreamRunning = true
|
||||
|
||||
@@ -294,10 +294,10 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
|
||||
ctx, span := trace.StartSpan(ctx, "validator.gRPCClient.StartEventStream")
|
||||
defer span.End()
|
||||
if len(topics) == 0 {
|
||||
eventsChannel <- &eventClient.Event{
|
||||
eventClient.Send(ctx, eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventError,
|
||||
Data: []byte(errors.New("no topics were added").Error()),
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
// TODO(13563): ONLY WORKS WITH HEAD TOPIC.
|
||||
@@ -308,10 +308,10 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
|
||||
}
|
||||
}
|
||||
if !containsHead {
|
||||
eventsChannel <- &eventClient.Event{
|
||||
eventClient.Send(ctx, eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventConnectionError,
|
||||
Data: []byte(errors.Wrap(client.ErrConnectionIssue, "gRPC only supports the head topic, and head topic was not passed").Error()),
|
||||
}
|
||||
})
|
||||
}
|
||||
if containsHead && len(topics) > 1 {
|
||||
log.Warn("gRPC only supports the head topic, other topics will be ignored")
|
||||
@@ -319,62 +319,44 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
|
||||
|
||||
stream, err := c.getClient().StreamSlots(ctx, ðpb.StreamSlotsRequest{VerifiedOnly: true})
|
||||
if err != nil {
|
||||
eventsChannel <- &eventClient.Event{
|
||||
eventClient.Send(ctx, eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventConnectionError,
|
||||
Data: []byte(errors.Wrap(client.ErrConnectionIssue, err.Error()).Error()),
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
c.isEventStreamRunning = true
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Info("Context canceled, stopping event stream")
|
||||
res, err := stream.Recv()
|
||||
if err != nil {
|
||||
c.isEventStreamRunning = false
|
||||
eventClient.Send(ctx, eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventConnectionError,
|
||||
Data: []byte(errors.Wrap(client.ErrConnectionIssue, err.Error()).Error()),
|
||||
})
|
||||
return
|
||||
}
|
||||
if res == nil {
|
||||
continue
|
||||
}
|
||||
b, err := json.Marshal(structs.HeadEvent{
|
||||
Slot: strconv.FormatUint(uint64(res.Slot), 10),
|
||||
PreviousDutyDependentRoot: hexutil.Encode(res.PreviousDutyDependentRoot),
|
||||
CurrentDutyDependentRoot: hexutil.Encode(res.CurrentDutyDependentRoot),
|
||||
})
|
||||
if err != nil {
|
||||
eventClient.Send(ctx, eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventError,
|
||||
Data: []byte(errors.Wrap(err, "failed to marshal Head Event").Error()),
|
||||
})
|
||||
continue
|
||||
}
|
||||
if !eventClient.Send(ctx, eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventHead,
|
||||
Data: b,
|
||||
}) {
|
||||
c.isEventStreamRunning = false
|
||||
return
|
||||
default:
|
||||
if ctx.Err() != nil {
|
||||
c.isEventStreamRunning = false
|
||||
if errors.Is(ctx.Err(), context.Canceled) {
|
||||
eventsChannel <- &eventClient.Event{
|
||||
EventType: eventClient.EventConnectionError,
|
||||
Data: []byte(errors.Wrap(client.ErrConnectionIssue, ctx.Err().Error()).Error()),
|
||||
}
|
||||
return
|
||||
}
|
||||
eventsChannel <- &eventClient.Event{
|
||||
EventType: eventClient.EventError,
|
||||
Data: []byte(ctx.Err().Error()),
|
||||
}
|
||||
return
|
||||
}
|
||||
res, err := stream.Recv()
|
||||
if err != nil {
|
||||
c.isEventStreamRunning = false
|
||||
eventsChannel <- &eventClient.Event{
|
||||
EventType: eventClient.EventConnectionError,
|
||||
Data: []byte(errors.Wrap(client.ErrConnectionIssue, err.Error()).Error()),
|
||||
}
|
||||
return
|
||||
}
|
||||
if res == nil {
|
||||
continue
|
||||
}
|
||||
b, err := json.Marshal(structs.HeadEvent{
|
||||
Slot: strconv.FormatUint(uint64(res.Slot), 10),
|
||||
PreviousDutyDependentRoot: hexutil.Encode(res.PreviousDutyDependentRoot),
|
||||
CurrentDutyDependentRoot: hexutil.Encode(res.CurrentDutyDependentRoot),
|
||||
})
|
||||
if err != nil {
|
||||
eventsChannel <- &eventClient.Event{
|
||||
EventType: eventClient.EventError,
|
||||
Data: []byte(errors.Wrap(err, "failed to marshal Head Event").Error()),
|
||||
}
|
||||
}
|
||||
eventsChannel <- &eventClient.Event{
|
||||
EventType: eventClient.EventHead,
|
||||
Data: b,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -86,10 +86,17 @@ func (r *runner) run(ctx context.Context) {
|
||||
cleanup := v.Done
|
||||
defer cleanup()
|
||||
v.SetTicker()
|
||||
|
||||
var wgEvents sync.WaitGroup
|
||||
wgEvents.Go(func() {
|
||||
r.processEvents(ctx)
|
||||
})
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Info("Context canceled, stopping validator")
|
||||
wgEvents.Wait()
|
||||
//nolint:govet
|
||||
return // Exit if context is canceled.
|
||||
case slot := <-v.NextSlot():
|
||||
@@ -148,14 +155,24 @@ func (r *runner) run(ctx context.Context) {
|
||||
// performRoles calls span.End()
|
||||
rolesCtx, _ := context.WithDeadline(ctx, deadline) //nolint:govet
|
||||
performRoles(rolesCtx, allRoles, v, slot, &wg, span)
|
||||
case e := <-v.EventsChan():
|
||||
v.ProcessEvent(ctx, e)
|
||||
case currentKeys := <-v.AccountsChangedChan(): // should be less of a priority than next slot
|
||||
case currentKeys := <-v.AccountsChangedChan():
|
||||
onAccountsChanged(ctx, v, currentKeys)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processEvents handles events in a dedicated goroutine, decoupled from slot processing.
|
||||
func (r *runner) processEvents(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case e := <-r.validator.EventsChan():
|
||||
r.validator.ProcessEvent(ctx, e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func onAccountsChanged(ctx context.Context, v iface.Validator, current [][48]byte) {
|
||||
ctx, span := prysmTrace.StartSpan(ctx, "validator.accountsChanged")
|
||||
defer span.End()
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/OffchainLabs/go-bitfield"
|
||||
eventClient "github.com/OffchainLabs/prysm/v7/api/client/event"
|
||||
"github.com/OffchainLabs/prysm/v7/async/event"
|
||||
"github.com/OffchainLabs/prysm/v7/cache/lru"
|
||||
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
|
||||
@@ -561,3 +562,77 @@ func TestRunnerPushesProposerSettings_ValidContext(t *testing.T) {
|
||||
|
||||
runTest(t, timedCtx, v)
|
||||
}
|
||||
|
||||
func TestEventProcessingDoesNotBlockSlotProcessing(t *testing.T) {
|
||||
slotChan := make(chan primitives.Slot, 1)
|
||||
v := &testutil.FakeValidator{
|
||||
Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}},
|
||||
NextSlotRet: slotChan,
|
||||
RolesAtRet: []iface.ValidatorRole{iface.RoleUnknown},
|
||||
EventsChannel: make(chan *eventClient.Event, 64),
|
||||
IsRegularDeadline: true,
|
||||
}
|
||||
require.NoError(t, v.SetProposerSettings(t.Context(), &proposer.Settings{}))
|
||||
|
||||
ctx, cancel := context.WithCancel(t.Context())
|
||||
defer cancel()
|
||||
|
||||
// Send an event before starting the runner.
|
||||
v.EventsChannel <- &eventClient.Event{
|
||||
EventType: eventClient.EventHead,
|
||||
Data: []byte(`{"slot":"1","previous_duty_dependent_root":"0x00","current_duty_dependent_root":"0x00"}`),
|
||||
}
|
||||
|
||||
// Start the runner in a goroutine.
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
runTest(t, ctx, v)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// Give the runner time to start and spawn its event goroutine.
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Now send a slot - it should be processed without being blocked by event processing.
|
||||
slotChan <- 1
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
cancel()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("Runner did not exit after context cancel")
|
||||
}
|
||||
|
||||
require.Equal(t, true, v.RoleAtCalled, "Expected RolesAt to be called when slot arrives")
|
||||
}
|
||||
|
||||
func TestEventGoroutine_ExitsOnContextCancel(t *testing.T) {
|
||||
v := &testutil.FakeValidator{
|
||||
Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}},
|
||||
NextSlotRet: make(chan primitives.Slot),
|
||||
EventsChannel: make(chan *eventClient.Event, 1),
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(t.Context())
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
runTest(t, ctx, v)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
// Give the runner time to start.
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
cancel()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
// Runner and event goroutine exited cleanly.
|
||||
case <-time.After(5 * time.Second):
|
||||
t.Fatal("Runner did not exit within timeout after context cancel")
|
||||
}
|
||||
|
||||
require.Equal(t, true, v.DoneCalled, "Expected Done() to be called")
|
||||
}
|
||||
|
||||
@@ -231,7 +231,7 @@ func (v *ValidatorService) Start() {
|
||||
distributed: v.distributed,
|
||||
disableDutiesPolling: v.disableDutiesPolling,
|
||||
accountsChangedChannel: make(chan [][fieldparams.BLSPubkeyLength]byte, 1),
|
||||
eventsChannel: make(chan *eventClient.Event, 1),
|
||||
eventsChannel: make(chan *eventClient.Event, 64),
|
||||
}
|
||||
|
||||
hm := newHealthMonitor(v.ctx, v.cancel, v.maxHealthChecks, v.validator)
|
||||
|
||||
@@ -60,6 +60,7 @@ type FakeValidator struct {
|
||||
proposerSettings *proposer.Settings
|
||||
Balances map[[48]byte]uint64
|
||||
EventsChannel chan *event.Event
|
||||
ProcessEventCalled chan *event.Event
|
||||
ProposerSettingsErr error
|
||||
Km keymanager.IKeymanager
|
||||
graffiti string
|
||||
@@ -320,7 +321,11 @@ func (*FakeValidator) StartEventStream(_ context.Context, _ []string) {
|
||||
|
||||
}
|
||||
|
||||
func (*FakeValidator) ProcessEvent(_ context.Context, _ *event.Event) {}
|
||||
func (fv *FakeValidator) ProcessEvent(_ context.Context, e *event.Event) {
|
||||
if fv.ProcessEventCalled != nil {
|
||||
fv.ProcessEventCalled <- e
|
||||
}
|
||||
}
|
||||
|
||||
func (*FakeValidator) EventStreamIsRunning() bool {
|
||||
return true
|
||||
|
||||
@@ -1273,6 +1273,7 @@ func (v *validator) checkDependentRoots(ctx context.Context, head *structs.HeadE
|
||||
func (v *validator) ProcessEvent(ctx context.Context, event *eventClient.Event) {
|
||||
if event == nil || event.Data == nil {
|
||||
log.Warn("Received empty event")
|
||||
return
|
||||
}
|
||||
switch event.EventType {
|
||||
case eventClient.EventError:
|
||||
@@ -1280,28 +1281,73 @@ func (v *validator) ProcessEvent(ctx context.Context, event *eventClient.Event)
|
||||
case eventClient.EventConnectionError:
|
||||
log.WithError(errors.New(string(event.Data))).Error("Event stream interrupted")
|
||||
case eventClient.EventHead:
|
||||
log.Debug("Received head event")
|
||||
head := &structs.HeadEvent{}
|
||||
if err := json.Unmarshal(event.Data, head); err != nil {
|
||||
log.WithError(err).Error("Failed to unmarshal head Event into JSON")
|
||||
}
|
||||
uintSlot, err := strconv.ParseUint(head.Slot, 10, 64)
|
||||
latest := v.drainHeadEvents(ctx, event)
|
||||
head, slot, err := v.parseHeadEvent(latest)
|
||||
if err != nil {
|
||||
log.WithError(err).Error("Failed to parse slot")
|
||||
log.WithError(err).Error("Failed to parse head event")
|
||||
return
|
||||
}
|
||||
v.setHighestSlot(primitives.Slot(uintSlot))
|
||||
v.setHighestSlot(slot)
|
||||
if !v.disableDutiesPolling {
|
||||
if err := v.checkDependentRoots(ctx, head); err != nil {
|
||||
log.WithError(err).Error("Failed to check dependent roots")
|
||||
}
|
||||
}
|
||||
default:
|
||||
// just keep going and log the error
|
||||
log.WithField("type", event.EventType).WithField("data", string(event.Data)).Warn("Received an unknown event")
|
||||
}
|
||||
}
|
||||
|
||||
// parseHeadEvent unmarshals a head event and extracts the slot.
|
||||
func (v *validator) parseHeadEvent(event *eventClient.Event) (*structs.HeadEvent, primitives.Slot, error) {
|
||||
head := &structs.HeadEvent{}
|
||||
if err := json.Unmarshal(event.Data, head); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
uintSlot, err := strconv.ParseUint(head.Slot, 10, 64)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
return head, primitives.Slot(uintSlot), nil
|
||||
}
|
||||
|
||||
// drainHeadEvents reads any queued events from the channel, processing
|
||||
// non-head events immediately and returning the latest head event.
|
||||
// It also calls setHighestSlot for each intermediate head event.
|
||||
func (v *validator) drainHeadEvents(ctx context.Context, first *eventClient.Event) *eventClient.Event {
|
||||
latest := first
|
||||
drained := 0
|
||||
for {
|
||||
select {
|
||||
case next := <-v.eventsChannel:
|
||||
if next == nil || next.Data == nil {
|
||||
continue
|
||||
}
|
||||
switch next.EventType {
|
||||
case eventClient.EventHead:
|
||||
if _, slot, err := v.parseHeadEvent(next); err == nil {
|
||||
v.setHighestSlot(slot)
|
||||
}
|
||||
latest = next
|
||||
drained++
|
||||
case eventClient.EventError:
|
||||
log.Error(string(next.Data))
|
||||
case eventClient.EventConnectionError:
|
||||
log.WithError(errors.New(string(next.Data))).Error("Event stream interrupted")
|
||||
default:
|
||||
log.WithField("type", next.EventType).WithField("data", string(next.Data)).Warn("Received an unknown event")
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return latest
|
||||
default:
|
||||
if drained > 0 {
|
||||
log.WithField("drained", drained).Info("Drained stale head events during reorg")
|
||||
}
|
||||
return latest
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (v *validator) EventStreamIsRunning() bool {
|
||||
return v.validatorClient.EventStreamIsRunning()
|
||||
}
|
||||
|
||||
@@ -3,11 +3,14 @@ package client
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
@@ -16,6 +19,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
eventClient "github.com/OffchainLabs/prysm/v7/api/client/event"
|
||||
grpcutil "github.com/OffchainLabs/prysm/v7/api/grpc"
|
||||
"github.com/OffchainLabs/prysm/v7/api/server/structs"
|
||||
"github.com/OffchainLabs/prysm/v7/async/event"
|
||||
@@ -3167,3 +3171,241 @@ func TestGetAttestationData_PostElectraConcurrentAccess(t *testing.T) {
|
||||
require.DeepEqual(t, expectedData, results[i])
|
||||
}
|
||||
}
|
||||
|
||||
func makeHeadEvent(t *testing.T, slot uint64, prevRoot, currRoot string) *eventClient.Event {
|
||||
t.Helper()
|
||||
data, err := json.Marshal(structs.HeadEvent{
|
||||
Slot: fmt.Sprintf("%d", slot),
|
||||
PreviousDutyDependentRoot: prevRoot,
|
||||
CurrentDutyDependentRoot: currRoot,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
return &eventClient.Event{
|
||||
EventType: eventClient.EventHead,
|
||||
Data: data,
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessEvent_NilEvent(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
v := &validator{
|
||||
eventsChannel: make(chan *eventClient.Event, 64),
|
||||
slotFeed: new(event.Feed),
|
||||
disableDutiesPolling: true,
|
||||
}
|
||||
|
||||
// Should not panic on nil event.
|
||||
v.ProcessEvent(t.Context(), nil)
|
||||
assert.LogsContain(t, hook, "Received empty event")
|
||||
|
||||
// Should not panic on event with nil data.
|
||||
v.ProcessEvent(t.Context(), &eventClient.Event{EventType: eventClient.EventHead})
|
||||
assert.LogsContain(t, hook, "Received empty event")
|
||||
}
|
||||
|
||||
func TestProcessEvent_DrainsStaleHeadEvents(t *testing.T) {
|
||||
zeroRoot := hexutil.Encode(params.BeaconConfig().ZeroHash[:])
|
||||
eventsChannel := make(chan *eventClient.Event, 64)
|
||||
v := &validator{
|
||||
eventsChannel: eventsChannel,
|
||||
slotFeed: new(event.Feed),
|
||||
disableDutiesPolling: true,
|
||||
}
|
||||
|
||||
// Queue 5 head events in the channel with increasing slots.
|
||||
for i := 1; i <= 5; i++ {
|
||||
eventsChannel <- makeHeadEvent(t, uint64(i), zeroRoot, zeroRoot)
|
||||
}
|
||||
|
||||
// Process the first event; drainHeadEvents should consume the rest.
|
||||
first := makeHeadEvent(t, 0, zeroRoot, zeroRoot)
|
||||
v.ProcessEvent(t.Context(), first)
|
||||
|
||||
// The channel should now be empty.
|
||||
select {
|
||||
case e := <-eventsChannel:
|
||||
t.Fatalf("Expected channel to be drained, but got event: %v", e)
|
||||
default:
|
||||
}
|
||||
|
||||
// Highest slot should be 5 (the last drained event).
|
||||
v.highestValidSlotLock.Lock()
|
||||
got := v.highestValidSlot
|
||||
v.highestValidSlotLock.Unlock()
|
||||
require.Equal(t, primitives.Slot(5), got)
|
||||
}
|
||||
|
||||
func TestProcessEvent_MixedEventsDuringDrain(t *testing.T) {
|
||||
hook := logTest.NewGlobal()
|
||||
zeroRoot := hexutil.Encode(params.BeaconConfig().ZeroHash[:])
|
||||
eventsChannel := make(chan *eventClient.Event, 64)
|
||||
v := &validator{
|
||||
eventsChannel: eventsChannel,
|
||||
slotFeed: new(event.Feed),
|
||||
disableDutiesPolling: true,
|
||||
}
|
||||
|
||||
// Queue: head(1), error, head(3)
|
||||
eventsChannel <- makeHeadEvent(t, 1, zeroRoot, zeroRoot)
|
||||
eventsChannel <- &eventClient.Event{
|
||||
EventType: eventClient.EventError,
|
||||
Data: []byte("test error message"),
|
||||
}
|
||||
eventsChannel <- makeHeadEvent(t, 3, zeroRoot, zeroRoot)
|
||||
|
||||
first := makeHeadEvent(t, 0, zeroRoot, zeroRoot)
|
||||
v.ProcessEvent(t.Context(), first)
|
||||
|
||||
// Error event should have been logged.
|
||||
assert.LogsContain(t, hook, "test error message")
|
||||
|
||||
// Highest slot should be 3 (latest head event).
|
||||
v.highestValidSlotLock.Lock()
|
||||
got := v.highestValidSlot
|
||||
v.highestValidSlotLock.Unlock()
|
||||
require.Equal(t, primitives.Slot(3), got)
|
||||
}
|
||||
|
||||
func TestEventFlood_NoDutyDelay(t *testing.T) {
|
||||
zeroRoot := hexutil.Encode(params.BeaconConfig().ZeroHash[:])
|
||||
eventsChannel := make(chan *eventClient.Event, 128)
|
||||
v := &validator{
|
||||
eventsChannel: eventsChannel,
|
||||
slotFeed: new(event.Feed),
|
||||
disableDutiesPolling: true,
|
||||
}
|
||||
|
||||
// Flood the channel with 50 head events with increasing slots.
|
||||
for i := 1; i <= 50; i++ {
|
||||
eventsChannel <- makeHeadEvent(t, uint64(i), zeroRoot, zeroRoot)
|
||||
}
|
||||
|
||||
// Process one event: the drain should consume all 50 queued events.
|
||||
first := makeHeadEvent(t, 0, zeroRoot, zeroRoot)
|
||||
v.ProcessEvent(t.Context(), first)
|
||||
|
||||
// Channel should be empty.
|
||||
select {
|
||||
case e := <-eventsChannel:
|
||||
t.Fatalf("Expected channel to be drained, but got event: %v", e)
|
||||
default:
|
||||
}
|
||||
|
||||
// Highest slot should be 50.
|
||||
v.highestValidSlotLock.Lock()
|
||||
got := v.highestValidSlot
|
||||
v.highestValidSlotLock.Unlock()
|
||||
require.Equal(t, primitives.Slot(50), got)
|
||||
}
|
||||
|
||||
func TestReorgBurst_DedupEffective(t *testing.T) {
|
||||
eventsChannel := make(chan *eventClient.Event, 128)
|
||||
v := &validator{
|
||||
eventsChannel: eventsChannel,
|
||||
slotFeed: new(event.Feed),
|
||||
disableDutiesPolling: true,
|
||||
}
|
||||
|
||||
// Simulate reorg burst: alternating dependent roots queued up.
|
||||
rootA := hexutil.Encode(bytesutil.PadTo([]byte{0xAA}, 32))
|
||||
rootB := hexutil.Encode(bytesutil.PadTo([]byte{0xBB}, 32))
|
||||
for i := 1; i <= 10; i++ {
|
||||
root := rootA
|
||||
if i%2 == 0 {
|
||||
root = rootB
|
||||
}
|
||||
eventsChannel <- makeHeadEvent(t, uint64(i), root, root)
|
||||
}
|
||||
|
||||
first := makeHeadEvent(t, 0, rootA, rootA)
|
||||
v.ProcessEvent(t.Context(), first)
|
||||
|
||||
// All events should be drained.
|
||||
select {
|
||||
case e := <-eventsChannel:
|
||||
t.Fatalf("Expected channel to be drained, but got event: %v", e)
|
||||
default:
|
||||
}
|
||||
|
||||
// Highest slot should be 10 (latest event from the reorg burst).
|
||||
v.highestValidSlotLock.Lock()
|
||||
got := v.highestValidSlot
|
||||
v.highestValidSlotLock.Unlock()
|
||||
require.Equal(t, primitives.Slot(10), got)
|
||||
}
|
||||
|
||||
func TestFullPipeline_CleanShutdown(t *testing.T) {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/eth/v1/events", func(w http.ResponseWriter, r *http.Request) {
|
||||
flusher, ok := w.(http.Flusher)
|
||||
require.Equal(t, true, ok)
|
||||
for i := 0; ; i++ {
|
||||
data := fmt.Sprintf(`{"slot":"%d","previous_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000","current_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000"}`, i)
|
||||
_, err := fmt.Fprintf(w, "event: head\ndata: %s\n\n", data)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
flusher.Flush()
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
}
|
||||
})
|
||||
server := httptest.NewServer(mux)
|
||||
defer server.Close()
|
||||
|
||||
eventsChannel := make(chan *eventClient.Event, 64)
|
||||
ctx, cancel := context.WithCancel(t.Context())
|
||||
|
||||
// Start SSE subscriber goroutine.
|
||||
stream, err := eventClient.NewEventStream(ctx, http.DefaultClient, server.URL, []string{"head"})
|
||||
require.NoError(t, err)
|
||||
|
||||
subscribeDone := make(chan struct{})
|
||||
go func() {
|
||||
stream.Subscribe(eventsChannel)
|
||||
close(subscribeDone)
|
||||
}()
|
||||
|
||||
// Start event processor goroutine.
|
||||
v := &validator{
|
||||
eventsChannel: eventsChannel,
|
||||
slotFeed: new(event.Feed),
|
||||
disableDutiesPolling: true,
|
||||
}
|
||||
|
||||
processDone := make(chan struct{})
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
close(processDone)
|
||||
return
|
||||
case e := <-eventsChannel:
|
||||
v.ProcessEvent(ctx, e)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Let the pipeline run for a bit to process some events.
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
|
||||
// Cancel context and verify all goroutines exit cleanly.
|
||||
cancel()
|
||||
|
||||
select {
|
||||
case <-subscribeDone:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Subscribe goroutine did not exit within timeout")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-processDone:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("Process goroutine did not exit within timeout")
|
||||
}
|
||||
|
||||
// Verify some events were actually processed.
|
||||
v.highestValidSlotLock.Lock()
|
||||
got := v.highestValidSlot
|
||||
v.highestValidSlotLock.Unlock()
|
||||
require.NotEqual(t, primitives.Slot(0), got, "Expected some events to be processed")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user