Compare commits

1 Commit

Author: Bastin
SHA1: ecde60c67c
Message: fix gen-logs.sh bug
Date: 2026-02-04 22:22:54 +01:00
14 changed files with 118 additions and 558 deletions

View File

@@ -34,17 +34,6 @@ type Event struct {
Data []byte
}
// Send sends an event to the channel, respecting context cancellation.
// Returns true if the event was sent, false if the context was cancelled.
func Send(ctx context.Context, ch chan<- *Event, e *Event) bool {
select {
case ch <- e:
return true
case <-ctx.Done():
return false
}
}
// EventStream is responsible for subscribing to the Beacon API events endpoint
// and dispatching received events to subscribers.
type EventStream struct {
@@ -78,20 +67,19 @@ func (h *EventStream) Subscribe(eventsChannel chan<- *Event) {
fullUrl := h.host + "/eth/v1/events?topics=" + allTopics
req, err := http.NewRequestWithContext(h.ctx, http.MethodGet, fullUrl, nil)
if err != nil {
Send(h.ctx, eventsChannel, &Event{
eventsChannel <- &Event{
EventType: EventConnectionError,
Data: []byte(errors.Wrap(err, "failed to create HTTP request").Error()),
})
return
}
}
req.Header.Set("Accept", api.EventStreamMediaType)
req.Header.Set("Connection", api.KeepAlive)
resp, err := h.httpClient.Do(req)
if err != nil {
Send(h.ctx, eventsChannel, &Event{
eventsChannel <- &Event{
EventType: EventConnectionError,
Data: []byte(errors.Wrap(err, client.ErrConnectionIssue.Error()).Error()),
})
}
return
}
@@ -109,31 +97,42 @@ func (h *EventStream) Subscribe(eventsChannel chan<- *Event) {
// Iterate over lines of the event stream
for scanner.Scan() {
line := scanner.Text()
if line == "" {
// Empty line indicates the end of an event
if eventType != "" && data != "" {
if !Send(h.ctx, eventsChannel, &Event{EventType: eventType, Data: []byte(data)}) {
return
select {
case <-h.ctx.Done():
log.Info("Context canceled, stopping event stream")
close(eventsChannel)
return
default:
line := scanner.Text()
// Handle the event based on your specific format
if line == "" {
// Empty line indicates the end of an event
if eventType != "" && data != "" {
// Process the event when both eventType and data are set
eventsChannel <- &Event{EventType: eventType, Data: []byte(data)}
}
// Reset eventType and data for the next event
eventType, data = "", ""
continue
}
et, ok := strings.CutPrefix(line, "event: ")
if ok {
// Extract event type from the "event" field
eventType = et
}
d, ok := strings.CutPrefix(line, "data: ")
if ok {
// Extract data from the "data" field
data = d
}
eventType, data = "", ""
continue
}
et, ok := strings.CutPrefix(line, "event: ")
if ok {
eventType = et
}
d, ok := strings.CutPrefix(line, "data: ")
if ok {
data = d
}
}
if err := scanner.Err(); err != nil {
Send(h.ctx, eventsChannel, &Event{
eventsChannel <- &Event{
EventType: EventConnectionError,
Data: []byte(errors.Wrap(err, errors.Wrap(client.ErrConnectionIssue, "scanner failed").Error()).Error()),
})
}
}
}
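
The `Send` helper appearing in this diff is the standard context-aware channel send in Go: it blocks until the event is delivered or the context ends, so a producer never hangs on an abandoned subscriber. A minimal standalone sketch of the pattern, with illustrative types rather than the project's API:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Event is a stand-in for the stream's event type.
type Event struct{ EventType, Data string }

// send mirrors the select-based pattern above: it blocks on the channel send
// until either the receiver takes the event or the context is done.
func send(ctx context.Context, ch chan<- *Event, e *Event) bool {
	select {
	case ch <- e:
		return true
	case <-ctx.Done():
		return false
	}
}

func main() {
	ch := make(chan *Event) // unbuffered and with no receiver: a bare `ch <- e` would block forever
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	ok := send(ctx, ch, &Event{EventType: "head", Data: "{}"})
	fmt.Println("delivered:", ok) // prints "delivered: false" once the context times out
}
```

The unbuffered channel with no receiver makes the difference visible: a plain send would leak the goroutine, while the select returns false as soon as the context is cancelled.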

View File

@@ -1,7 +1,6 @@
package event
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
@@ -95,86 +94,5 @@ func TestEventStreamRequestError(t *testing.T) {
if event.EventType != EventConnectionError {
t.Errorf("Expected event type %q, got %q", EventConnectionError, event.EventType)
}
}
func TestEventStream_ContextCancelDuringBlockedSend(t *testing.T) {
mux := http.NewServeMux()
mux.HandleFunc("/eth/v1/events", func(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
require.Equal(t, true, ok)
// Send events continuously until the client disconnects.
for i := 0; ; i++ {
_, err := fmt.Fprintf(w, "event: head\ndata: data%d\n\n", i)
if err != nil {
return
}
flusher.Flush()
time.Sleep(10 * time.Millisecond)
}
})
server := httptest.NewServer(mux)
defer server.Close()
// Use an unbuffered channel so sends will block.
eventsChannel := make(chan *Event)
ctx, cancel := context.WithCancel(t.Context())
stream, err := NewEventStream(ctx, http.DefaultClient, server.URL, []string{"head"})
require.NoError(t, err)
done := make(chan struct{})
go func() {
stream.Subscribe(eventsChannel)
close(done)
}()
// Cancel the context while the goroutine is trying to send on the blocked channel.
cancel()
// The goroutine should exit promptly.
select {
case <-done:
case <-time.After(5 * time.Second):
t.Fatal("Subscribe goroutine did not exit after context cancel")
}
}
func TestEventStream_DoesNotCloseChannel(t *testing.T) {
mux := http.NewServeMux()
mux.HandleFunc("/eth/v1/events", func(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
require.Equal(t, true, ok)
_, err := fmt.Fprintf(w, "event: head\ndata: data1\n\n")
if err != nil {
return
}
flusher.Flush()
// Close the connection after one event to end the scanner loop.
})
server := httptest.NewServer(mux)
defer server.Close()
eventsChannel := make(chan *Event, 10)
stream, err := NewEventStream(t.Context(), http.DefaultClient, server.URL, []string{"head"})
require.NoError(t, err)
done := make(chan struct{})
go func() {
stream.Subscribe(eventsChannel)
close(done)
}()
// Wait for Subscribe to finish.
select {
case <-done:
case <-time.After(5 * time.Second):
t.Fatal("Subscribe goroutine did not exit")
}
// Channel should still be open (not closed). Verify by sending to it.
select {
case eventsChannel <- &Event{EventType: "test"}:
// Successfully sent, channel is open.
default:
t.Fatal("Channel appears to be closed or blocked unexpectedly")
}
}

View File

@@ -0,0 +1,3 @@
### Fixed
- Fixed a bug where `cmd/beacon-chain/execution` was being ignored by `hack/gen-logs.sh` due to a `.gitignore` rule.
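
The fix hinges on ripgrep's default ignore handling: paths matched by `.gitignore` are skipped unless VCS ignores are disabled. A minimal shell sketch of that behaviour (assuming ripgrep is installed; an illustration, not output captured from this repository):

```sh
# With default settings, ripgrep honours .gitignore, so a git-ignored package
# such as cmd/beacon-chain/execution is silently skipped by any scan.
rg --files cmd/beacon-chain/execution

# --no-ignore-vcs disables .gitignore (and other VCS ignore files) for this
# invocation, so the package is visible again and gen-logs.sh can scan it.
rg --files --no-ignore-vcs cmd/beacon-chain/execution
```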

View File

@@ -1,5 +0,0 @@
package execution
import "github.com/sirupsen/logrus"
var log = logrus.WithField("prefix", "execution")

View File

@@ -31,6 +31,11 @@ EXCLUDED_PATH_PREFIXES=(
".vscode"
)
# Gitignore overrides: paths that should still be scanned even if ignored by VCS.
GITIGNORE_OVERRIDES=(
"cmd/beacon-chain/execution"
)
# The logrus import path
LOGRUS_IMPORT="github.com/sirupsen/logrus"
# ----------------------------
@@ -70,6 +75,14 @@ rg_args=(
-0 # NUL-delimited output
)
if [[ ${#GITIGNORE_OVERRIDES[@]} -gt 0 ]]; then
# Disable VCS ignores so overrides are honored.
rg_args+=( --no-ignore-vcs )
for ov in "${GITIGNORE_OVERRIDES[@]}"; do
rg_args+=( --glob "$ov/**" )
done
fi
for ex in "${EXCLUDED_PATH_PREFIXES[@]}"; do
rg_args+=( --glob "!$ex/**" )
done

View File

@@ -121,7 +121,6 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//api/client/event:go_default_library",
"//api/grpc:go_default_library",
"//api/server/structs:go_default_library",
"//async/event:go_default_library",

View File

@@ -290,10 +290,10 @@ func (c *beaconApiValidatorClient) StartEventStream(ctx context.Context, topics
client := &http.Client{} // event stream should not be subject to the same settings as other api calls
eventStream, err := event.NewEventStream(ctx, client, c.handler.Host(), topics)
if err != nil {
event.Send(ctx, eventsChannel, &event.Event{
eventsChannel <- &event.Event{
EventType: event.EventError,
Data: []byte(errors.Wrap(err, "failed to start event stream").Error()),
})
}
return
}
c.isEventStreamRunning = true

View File

@@ -294,10 +294,10 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
ctx, span := trace.StartSpan(ctx, "validator.gRPCClient.StartEventStream")
defer span.End()
if len(topics) == 0 {
eventClient.Send(ctx, eventsChannel, &eventClient.Event{
eventsChannel <- &eventClient.Event{
EventType: eventClient.EventError,
Data: []byte(errors.New("no topics were added").Error()),
})
}
return
}
// TODO(13563): ONLY WORKS WITH HEAD TOPIC.
@@ -308,10 +308,10 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
}
}
if !containsHead {
eventClient.Send(ctx, eventsChannel, &eventClient.Event{
eventsChannel <- &eventClient.Event{
EventType: eventClient.EventConnectionError,
Data: []byte(errors.Wrap(client.ErrConnectionIssue, "gRPC only supports the head topic, and head topic was not passed").Error()),
})
}
}
if containsHead && len(topics) > 1 {
log.Warn("gRPC only supports the head topic, other topics will be ignored")
@@ -319,44 +319,62 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
stream, err := c.getClient().StreamSlots(ctx, &ethpb.StreamSlotsRequest{VerifiedOnly: true})
if err != nil {
eventClient.Send(ctx, eventsChannel, &eventClient.Event{
eventsChannel <- &eventClient.Event{
EventType: eventClient.EventConnectionError,
Data: []byte(errors.Wrap(client.ErrConnectionIssue, err.Error()).Error()),
})
}
return
}
c.isEventStreamRunning = true
for {
res, err := stream.Recv()
if err != nil {
c.isEventStreamRunning = false
eventClient.Send(ctx, eventsChannel, &eventClient.Event{
EventType: eventClient.EventConnectionError,
Data: []byte(errors.Wrap(client.ErrConnectionIssue, err.Error()).Error()),
})
return
}
if res == nil {
continue
}
b, err := json.Marshal(structs.HeadEvent{
Slot: strconv.FormatUint(uint64(res.Slot), 10),
PreviousDutyDependentRoot: hexutil.Encode(res.PreviousDutyDependentRoot),
CurrentDutyDependentRoot: hexutil.Encode(res.CurrentDutyDependentRoot),
})
if err != nil {
eventClient.Send(ctx, eventsChannel, &eventClient.Event{
EventType: eventClient.EventError,
Data: []byte(errors.Wrap(err, "failed to marshal Head Event").Error()),
})
continue
}
if !eventClient.Send(ctx, eventsChannel, &eventClient.Event{
EventType: eventClient.EventHead,
Data: b,
}) {
select {
case <-ctx.Done():
log.Info("Context canceled, stopping event stream")
c.isEventStreamRunning = false
return
default:
if ctx.Err() != nil {
c.isEventStreamRunning = false
if errors.Is(ctx.Err(), context.Canceled) {
eventsChannel <- &eventClient.Event{
EventType: eventClient.EventConnectionError,
Data: []byte(errors.Wrap(client.ErrConnectionIssue, ctx.Err().Error()).Error()),
}
return
}
eventsChannel <- &eventClient.Event{
EventType: eventClient.EventError,
Data: []byte(ctx.Err().Error()),
}
return
}
res, err := stream.Recv()
if err != nil {
c.isEventStreamRunning = false
eventsChannel <- &eventClient.Event{
EventType: eventClient.EventConnectionError,
Data: []byte(errors.Wrap(client.ErrConnectionIssue, err.Error()).Error()),
}
return
}
if res == nil {
continue
}
b, err := json.Marshal(structs.HeadEvent{
Slot: strconv.FormatUint(uint64(res.Slot), 10),
PreviousDutyDependentRoot: hexutil.Encode(res.PreviousDutyDependentRoot),
CurrentDutyDependentRoot: hexutil.Encode(res.CurrentDutyDependentRoot),
})
if err != nil {
eventsChannel <- &eventClient.Event{
EventType: eventClient.EventError,
Data: []byte(errors.Wrap(err, "failed to marshal Head Event").Error()),
}
}
eventsChannel <- &eventClient.Event{
EventType: eventClient.EventHead,
Data: b,
}
}
}
}

View File

@@ -86,17 +86,10 @@ func (r *runner) run(ctx context.Context) {
cleanup := v.Done
defer cleanup()
v.SetTicker()
var wgEvents sync.WaitGroup
wgEvents.Go(func() {
r.processEvents(ctx)
})
for {
select {
case <-ctx.Done():
log.Info("Context canceled, stopping validator")
wgEvents.Wait()
//nolint:govet
return // Exit if context is canceled.
case slot := <-v.NextSlot():
@@ -155,24 +148,14 @@ func (r *runner) run(ctx context.Context) {
// performRoles calls span.End()
rolesCtx, _ := context.WithDeadline(ctx, deadline) //nolint:govet
performRoles(rolesCtx, allRoles, v, slot, &wg, span)
case currentKeys := <-v.AccountsChangedChan():
case e := <-v.EventsChan():
v.ProcessEvent(ctx, e)
case currentKeys := <-v.AccountsChangedChan(): // should be less of a priority than next slot
onAccountsChanged(ctx, v, currentKeys)
}
}
}
// processEvents handles events in a dedicated goroutine, decoupled from slot processing.
func (r *runner) processEvents(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case e := <-r.validator.EventsChan():
r.validator.ProcessEvent(ctx, e)
}
}
}
func onAccountsChanged(ctx context.Context, v iface.Validator, current [][48]byte) {
ctx, span := prysmTrace.StartSpan(ctx, "validator.accountsChanged")
defer span.End()
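
One side of this diff runs event handling in a dedicated goroutine so that a slow `ProcessEvent` cannot delay slot processing, and waits for that goroutine before the runner returns. A minimal standalone sketch of that shape, with illustrative channel types rather than the project's interfaces:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// run handles slot ticks in the main loop while a separate goroutine consumes
// events, then waits for that goroutine to exit before returning.
func run(ctx context.Context, slots <-chan int, events <-chan string) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case e := <-events:
				fmt.Println("processed event:", e)
			}
		}
	}()
	for {
		select {
		case <-ctx.Done():
			wg.Wait() // ensure the event goroutine has observed cancellation
			return
		case s := <-slots:
			fmt.Println("processed slot:", s)
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	slots, events := make(chan int, 1), make(chan string, 1)
	slots <- 1
	events <- "head"
	go func() { time.Sleep(100 * time.Millisecond); cancel() }()
	run(ctx, slots, events)
}
```

The `wg.Wait()` before returning is the subtle part: the runner does not tear anything down until the event goroutine has actually stopped.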

View File

@@ -11,7 +11,6 @@ import (
"time"
"github.com/OffchainLabs/go-bitfield"
eventClient "github.com/OffchainLabs/prysm/v7/api/client/event"
"github.com/OffchainLabs/prysm/v7/async/event"
"github.com/OffchainLabs/prysm/v7/cache/lru"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
@@ -562,77 +561,3 @@ func TestRunnerPushesProposerSettings_ValidContext(t *testing.T) {
runTest(t, timedCtx, v)
}
func TestEventProcessingDoesNotBlockSlotProcessing(t *testing.T) {
slotChan := make(chan primitives.Slot, 1)
v := &testutil.FakeValidator{
Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}},
NextSlotRet: slotChan,
RolesAtRet: []iface.ValidatorRole{iface.RoleUnknown},
EventsChannel: make(chan *eventClient.Event, 64),
IsRegularDeadline: true,
}
require.NoError(t, v.SetProposerSettings(t.Context(), &proposer.Settings{}))
ctx, cancel := context.WithCancel(t.Context())
defer cancel()
// Send an event before starting the runner.
v.EventsChannel <- &eventClient.Event{
EventType: eventClient.EventHead,
Data: []byte(`{"slot":"1","previous_duty_dependent_root":"0x00","current_duty_dependent_root":"0x00"}`),
}
// Start the runner in a goroutine.
done := make(chan struct{})
go func() {
runTest(t, ctx, v)
close(done)
}()
// Give the runner time to start and spawn its event goroutine.
time.Sleep(100 * time.Millisecond)
// Now send a slot - it should be processed without being blocked by event processing.
slotChan <- 1
time.Sleep(200 * time.Millisecond)
cancel()
select {
case <-done:
case <-time.After(5 * time.Second):
t.Fatal("Runner did not exit after context cancel")
}
require.Equal(t, true, v.RoleAtCalled, "Expected RolesAt to be called when slot arrives")
}
func TestEventGoroutine_ExitsOnContextCancel(t *testing.T) {
v := &testutil.FakeValidator{
Km: &mockKeymanager{accountsChangedFeed: &event.Feed{}},
NextSlotRet: make(chan primitives.Slot),
EventsChannel: make(chan *eventClient.Event, 1),
}
ctx, cancel := context.WithCancel(t.Context())
done := make(chan struct{})
go func() {
runTest(t, ctx, v)
close(done)
}()
// Give the runner time to start.
time.Sleep(50 * time.Millisecond)
cancel()
select {
case <-done:
// Runner and event goroutine exited cleanly.
case <-time.After(5 * time.Second):
t.Fatal("Runner did not exit within timeout after context cancel")
}
require.Equal(t, true, v.DoneCalled, "Expected Done() to be called")
}

View File

@@ -231,7 +231,7 @@ func (v *ValidatorService) Start() {
distributed: v.distributed,
disableDutiesPolling: v.disableDutiesPolling,
accountsChangedChannel: make(chan [][fieldparams.BLSPubkeyLength]byte, 1),
eventsChannel: make(chan *eventClient.Event, 64),
eventsChannel: make(chan *eventClient.Event, 1),
}
hm := newHealthMonitor(v.ctx, v.cancel, v.maxHealthChecks, v.validator)

View File

@@ -60,7 +60,6 @@ type FakeValidator struct {
proposerSettings *proposer.Settings
Balances map[[48]byte]uint64
EventsChannel chan *event.Event
ProcessEventCalled chan *event.Event
ProposerSettingsErr error
Km keymanager.IKeymanager
graffiti string
@@ -321,11 +320,7 @@ func (*FakeValidator) StartEventStream(_ context.Context, _ []string) {
}
func (fv *FakeValidator) ProcessEvent(_ context.Context, e *event.Event) {
if fv.ProcessEventCalled != nil {
fv.ProcessEventCalled <- e
}
}
func (*FakeValidator) ProcessEvent(_ context.Context, _ *event.Event) {}
func (*FakeValidator) EventStreamIsRunning() bool {
return true

View File

@@ -1273,7 +1273,6 @@ func (v *validator) checkDependentRoots(ctx context.Context, head *structs.HeadE
func (v *validator) ProcessEvent(ctx context.Context, event *eventClient.Event) {
if event == nil || event.Data == nil {
log.Warn("Received empty event")
return
}
switch event.EventType {
case eventClient.EventError:
@@ -1281,73 +1280,28 @@ func (v *validator) ProcessEvent(ctx context.Context, event *eventClient.Event)
case eventClient.EventConnectionError:
log.WithError(errors.New(string(event.Data))).Error("Event stream interrupted")
case eventClient.EventHead:
latest := v.drainHeadEvents(ctx, event)
head, slot, err := v.parseHeadEvent(latest)
log.Debug("Received head event")
head := &structs.HeadEvent{}
if err := json.Unmarshal(event.Data, head); err != nil {
log.WithError(err).Error("Failed to unmarshal head Event into JSON")
}
uintSlot, err := strconv.ParseUint(head.Slot, 10, 64)
if err != nil {
log.WithError(err).Error("Failed to parse head event")
log.WithError(err).Error("Failed to parse slot")
return
}
v.setHighestSlot(slot)
v.setHighestSlot(primitives.Slot(uintSlot))
if !v.disableDutiesPolling {
if err := v.checkDependentRoots(ctx, head); err != nil {
log.WithError(err).Error("Failed to check dependent roots")
}
}
default:
// just keep going and log the error
log.WithField("type", event.EventType).WithField("data", string(event.Data)).Warn("Received an unknown event")
}
}
// parseHeadEvent unmarshals a head event and extracts the slot.
func (v *validator) parseHeadEvent(event *eventClient.Event) (*structs.HeadEvent, primitives.Slot, error) {
head := &structs.HeadEvent{}
if err := json.Unmarshal(event.Data, head); err != nil {
return nil, 0, err
}
uintSlot, err := strconv.ParseUint(head.Slot, 10, 64)
if err != nil {
return nil, 0, err
}
return head, primitives.Slot(uintSlot), nil
}
// drainHeadEvents reads any queued events from the channel, processing
// non-head events immediately and returning the latest head event.
// It also calls setHighestSlot for each intermediate head event.
func (v *validator) drainHeadEvents(ctx context.Context, first *eventClient.Event) *eventClient.Event {
latest := first
drained := 0
for {
select {
case next := <-v.eventsChannel:
if next == nil || next.Data == nil {
continue
}
switch next.EventType {
case eventClient.EventHead:
if _, slot, err := v.parseHeadEvent(next); err == nil {
v.setHighestSlot(slot)
}
latest = next
drained++
case eventClient.EventError:
log.Error(string(next.Data))
case eventClient.EventConnectionError:
log.WithError(errors.New(string(next.Data))).Error("Event stream interrupted")
default:
log.WithField("type", next.EventType).WithField("data", string(next.Data)).Warn("Received an unknown event")
}
case <-ctx.Done():
return latest
default:
if drained > 0 {
log.WithField("drained", drained).Info("Drained stale head events during reorg")
}
return latest
}
}
}
func (v *validator) EventStreamIsRunning() bool {
return v.validatorClient.EventStreamIsRunning()
}
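
The `drainHeadEvents` helper in this diff is a latest-wins channel drain: anything already queued is consumed without blocking, and only the newest head event is acted on. A generic standalone sketch of the idea (illustrative code, not the project's implementation):

```go
package main

import "fmt"

// drainLatest empties ch without blocking and returns the most recent value,
// starting from first. Stale queued values are discarded; only the newest wins.
func drainLatest[T any](ch <-chan T, first T) T {
	latest := first
	for {
		select {
		case v, ok := <-ch:
			if !ok {
				return latest // channel closed, stop draining
			}
			latest = v
		default:
			return latest // nothing queued, we are caught up
		}
	}
}

func main() {
	ch := make(chan int, 8)
	for i := 1; i <= 5; i++ {
		ch <- i
	}
	fmt.Println(drainLatest(ch, 0)) // prints 5; values 1-4 were stale and dropped
}
```

Used after receiving one head event, a drain like this collapses a burst of stale heads (for example during a reorg) into a single update.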

View File

@@ -3,14 +3,11 @@ package client
import (
"bytes"
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"math"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"sort"
@@ -19,7 +16,6 @@ import (
"testing"
"time"
eventClient "github.com/OffchainLabs/prysm/v7/api/client/event"
grpcutil "github.com/OffchainLabs/prysm/v7/api/grpc"
"github.com/OffchainLabs/prysm/v7/api/server/structs"
"github.com/OffchainLabs/prysm/v7/async/event"
@@ -3171,241 +3167,3 @@ func TestGetAttestationData_PostElectraConcurrentAccess(t *testing.T) {
require.DeepEqual(t, expectedData, results[i])
}
}
func makeHeadEvent(t *testing.T, slot uint64, prevRoot, currRoot string) *eventClient.Event {
t.Helper()
data, err := json.Marshal(structs.HeadEvent{
Slot: fmt.Sprintf("%d", slot),
PreviousDutyDependentRoot: prevRoot,
CurrentDutyDependentRoot: currRoot,
})
require.NoError(t, err)
return &eventClient.Event{
EventType: eventClient.EventHead,
Data: data,
}
}
func TestProcessEvent_NilEvent(t *testing.T) {
hook := logTest.NewGlobal()
v := &validator{
eventsChannel: make(chan *eventClient.Event, 64),
slotFeed: new(event.Feed),
disableDutiesPolling: true,
}
// Should not panic on nil event.
v.ProcessEvent(t.Context(), nil)
assert.LogsContain(t, hook, "Received empty event")
// Should not panic on event with nil data.
v.ProcessEvent(t.Context(), &eventClient.Event{EventType: eventClient.EventHead})
assert.LogsContain(t, hook, "Received empty event")
}
func TestProcessEvent_DrainsStaleHeadEvents(t *testing.T) {
zeroRoot := hexutil.Encode(params.BeaconConfig().ZeroHash[:])
eventsChannel := make(chan *eventClient.Event, 64)
v := &validator{
eventsChannel: eventsChannel,
slotFeed: new(event.Feed),
disableDutiesPolling: true,
}
// Queue 5 head events in the channel with increasing slots.
for i := 1; i <= 5; i++ {
eventsChannel <- makeHeadEvent(t, uint64(i), zeroRoot, zeroRoot)
}
// Process the first event; drainHeadEvents should consume the rest.
first := makeHeadEvent(t, 0, zeroRoot, zeroRoot)
v.ProcessEvent(t.Context(), first)
// The channel should now be empty.
select {
case e := <-eventsChannel:
t.Fatalf("Expected channel to be drained, but got event: %v", e)
default:
}
// Highest slot should be 5 (the last drained event).
v.highestValidSlotLock.Lock()
got := v.highestValidSlot
v.highestValidSlotLock.Unlock()
require.Equal(t, primitives.Slot(5), got)
}
func TestProcessEvent_MixedEventsDuringDrain(t *testing.T) {
hook := logTest.NewGlobal()
zeroRoot := hexutil.Encode(params.BeaconConfig().ZeroHash[:])
eventsChannel := make(chan *eventClient.Event, 64)
v := &validator{
eventsChannel: eventsChannel,
slotFeed: new(event.Feed),
disableDutiesPolling: true,
}
// Queue: head(1), error, head(3)
eventsChannel <- makeHeadEvent(t, 1, zeroRoot, zeroRoot)
eventsChannel <- &eventClient.Event{
EventType: eventClient.EventError,
Data: []byte("test error message"),
}
eventsChannel <- makeHeadEvent(t, 3, zeroRoot, zeroRoot)
first := makeHeadEvent(t, 0, zeroRoot, zeroRoot)
v.ProcessEvent(t.Context(), first)
// Error event should have been logged.
assert.LogsContain(t, hook, "test error message")
// Highest slot should be 3 (latest head event).
v.highestValidSlotLock.Lock()
got := v.highestValidSlot
v.highestValidSlotLock.Unlock()
require.Equal(t, primitives.Slot(3), got)
}
func TestEventFlood_NoDutyDelay(t *testing.T) {
zeroRoot := hexutil.Encode(params.BeaconConfig().ZeroHash[:])
eventsChannel := make(chan *eventClient.Event, 128)
v := &validator{
eventsChannel: eventsChannel,
slotFeed: new(event.Feed),
disableDutiesPolling: true,
}
// Flood the channel with 50 head events with increasing slots.
for i := 1; i <= 50; i++ {
eventsChannel <- makeHeadEvent(t, uint64(i), zeroRoot, zeroRoot)
}
// Process one event: the drain should consume all 50 queued events.
first := makeHeadEvent(t, 0, zeroRoot, zeroRoot)
v.ProcessEvent(t.Context(), first)
// Channel should be empty.
select {
case e := <-eventsChannel:
t.Fatalf("Expected channel to be drained, but got event: %v", e)
default:
}
// Highest slot should be 50.
v.highestValidSlotLock.Lock()
got := v.highestValidSlot
v.highestValidSlotLock.Unlock()
require.Equal(t, primitives.Slot(50), got)
}
func TestReorgBurst_DedupEffective(t *testing.T) {
eventsChannel := make(chan *eventClient.Event, 128)
v := &validator{
eventsChannel: eventsChannel,
slotFeed: new(event.Feed),
disableDutiesPolling: true,
}
// Simulate reorg burst: alternating dependent roots queued up.
rootA := hexutil.Encode(bytesutil.PadTo([]byte{0xAA}, 32))
rootB := hexutil.Encode(bytesutil.PadTo([]byte{0xBB}, 32))
for i := 1; i <= 10; i++ {
root := rootA
if i%2 == 0 {
root = rootB
}
eventsChannel <- makeHeadEvent(t, uint64(i), root, root)
}
first := makeHeadEvent(t, 0, rootA, rootA)
v.ProcessEvent(t.Context(), first)
// All events should be drained.
select {
case e := <-eventsChannel:
t.Fatalf("Expected channel to be drained, but got event: %v", e)
default:
}
// Highest slot should be 10 (latest event from the reorg burst).
v.highestValidSlotLock.Lock()
got := v.highestValidSlot
v.highestValidSlotLock.Unlock()
require.Equal(t, primitives.Slot(10), got)
}
func TestFullPipeline_CleanShutdown(t *testing.T) {
mux := http.NewServeMux()
mux.HandleFunc("/eth/v1/events", func(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
require.Equal(t, true, ok)
for i := 0; ; i++ {
data := fmt.Sprintf(`{"slot":"%d","previous_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000","current_duty_dependent_root":"0x0000000000000000000000000000000000000000000000000000000000000000"}`, i)
_, err := fmt.Fprintf(w, "event: head\ndata: %s\n\n", data)
if err != nil {
return
}
flusher.Flush()
time.Sleep(50 * time.Millisecond)
}
})
server := httptest.NewServer(mux)
defer server.Close()
eventsChannel := make(chan *eventClient.Event, 64)
ctx, cancel := context.WithCancel(t.Context())
// Start SSE subscriber goroutine.
stream, err := eventClient.NewEventStream(ctx, http.DefaultClient, server.URL, []string{"head"})
require.NoError(t, err)
subscribeDone := make(chan struct{})
go func() {
stream.Subscribe(eventsChannel)
close(subscribeDone)
}()
// Start event processor goroutine.
v := &validator{
eventsChannel: eventsChannel,
slotFeed: new(event.Feed),
disableDutiesPolling: true,
}
processDone := make(chan struct{})
go func() {
for {
select {
case <-ctx.Done():
close(processDone)
return
case e := <-eventsChannel:
v.ProcessEvent(ctx, e)
}
}
}()
// Let the pipeline run for a bit to process some events.
time.Sleep(200 * time.Millisecond)
// Cancel context and verify all goroutines exit cleanly.
cancel()
select {
case <-subscribeDone:
case <-time.After(2 * time.Second):
t.Fatal("Subscribe goroutine did not exit within timeout")
}
select {
case <-processDone:
case <-time.After(2 * time.Second):
t.Fatal("Process goroutine did not exit within timeout")
}
// Verify some events were actually processed.
v.highestValidSlotLock.Lock()
got := v.highestValidSlot
v.highestValidSlotLock.Unlock()
require.NotEqual(t, primitives.Slot(0), got, "Expected some events to be processed")
}