mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-30 23:58:23 -05:00
Compare commits
2 Commits
gRPC-fallb
...
event-stre
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cde4f3b009 | ||
|
|
a7fdd11777 |
@@ -34,6 +34,18 @@ type Event struct {
|
||||
Data []byte
|
||||
}
|
||||
|
||||
// PublishEvent enqueues an event without blocking the producer. If the channel is full,
|
||||
// the event is dropped since only the most recent heads are relevant.
|
||||
func PublishEvent(eventsChannel chan<- *Event, event *Event) {
|
||||
if eventsChannel == nil || event == nil {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case eventsChannel <- event:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// EventStream is responsible for subscribing to the Beacon API events endpoint
|
||||
// and dispatching received events to subscribers.
|
||||
type EventStream struct {
|
||||
@@ -67,19 +79,20 @@ func (h *EventStream) Subscribe(eventsChannel chan<- *Event) {
|
||||
fullUrl := h.host + "/eth/v1/events?topics=" + allTopics
|
||||
req, err := http.NewRequestWithContext(h.ctx, http.MethodGet, fullUrl, nil)
|
||||
if err != nil {
|
||||
eventsChannel <- &Event{
|
||||
PublishEvent(eventsChannel, &Event{
|
||||
EventType: EventConnectionError,
|
||||
Data: []byte(errors.Wrap(err, "failed to create HTTP request").Error()),
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
req.Header.Set("Accept", api.EventStreamMediaType)
|
||||
req.Header.Set("Connection", api.KeepAlive)
|
||||
resp, err := h.httpClient.Do(req)
|
||||
if err != nil {
|
||||
eventsChannel <- &Event{
|
||||
PublishEvent(eventsChannel, &Event{
|
||||
EventType: EventConnectionError,
|
||||
Data: []byte(errors.Wrap(err, client.ErrConnectionIssue.Error()).Error()),
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
@@ -100,7 +113,6 @@ func (h *EventStream) Subscribe(eventsChannel chan<- *Event) {
|
||||
select {
|
||||
case <-h.ctx.Done():
|
||||
log.Info("Context canceled, stopping event stream")
|
||||
close(eventsChannel)
|
||||
return
|
||||
default:
|
||||
line := scanner.Text()
|
||||
@@ -109,7 +121,7 @@ func (h *EventStream) Subscribe(eventsChannel chan<- *Event) {
|
||||
// Empty line indicates the end of an event
|
||||
if eventType != "" && data != "" {
|
||||
// Process the event when both eventType and data are set
|
||||
eventsChannel <- &Event{EventType: eventType, Data: []byte(data)}
|
||||
PublishEvent(eventsChannel, &Event{EventType: eventType, Data: []byte(data)})
|
||||
}
|
||||
|
||||
// Reset eventType and data for the next event
|
||||
@@ -130,9 +142,9 @@ func (h *EventStream) Subscribe(eventsChannel chan<- *Event) {
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
eventsChannel <- &Event{
|
||||
PublishEvent(eventsChannel, &Event{
|
||||
EventType: EventConnectionError,
|
||||
Data: []byte(errors.Wrap(err, errors.Wrap(client.ErrConnectionIssue, "scanner failed").Error()).Error()),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ func TestEventStream(t *testing.T) {
|
||||
defer server.Close()
|
||||
|
||||
topics := []string{"head"}
|
||||
eventsChannel := make(chan *Event, 1)
|
||||
eventsChannel := make(chan *Event, 4)
|
||||
stream, err := NewEventStream(t.Context(), http.DefaultClient, server.URL, topics)
|
||||
require.NoError(t, err)
|
||||
go stream.Subscribe(eventsChannel)
|
||||
@@ -80,7 +80,7 @@ func TestEventStream(t *testing.T) {
|
||||
|
||||
func TestEventStreamRequestError(t *testing.T) {
|
||||
topics := []string{"head"}
|
||||
eventsChannel := make(chan *Event, 1)
|
||||
eventsChannel := make(chan *Event, 4)
|
||||
ctx := t.Context()
|
||||
|
||||
// use valid url that will result in failed request with nil body
|
||||
|
||||
@@ -114,17 +114,32 @@ func payloadCommittee(ctx context.Context, st state.ReadOnlyBeaconState, slot pr
|
||||
}
|
||||
|
||||
committeesPerSlot := helpers.SlotCommitteeCount(activeCount)
|
||||
out := make([]primitives.ValidatorIndex, 0, activeCount/uint64(params.BeaconConfig().SlotsPerEpoch))
|
||||
|
||||
for i := primitives.CommitteeIndex(0); i < primitives.CommitteeIndex(committeesPerSlot); i++ {
|
||||
committee, err := helpers.BeaconCommitteeFromState(ctx, st, slot, i)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to get beacon committee %d", i)
|
||||
selected := make([]primitives.ValidatorIndex, 0, fieldparams.PTCSize)
|
||||
var i uint64
|
||||
for uint64(len(selected)) < fieldparams.PTCSize {
|
||||
if ctx.Err() != nil {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
for committeeIndex := primitives.CommitteeIndex(0); committeeIndex < primitives.CommitteeIndex(committeesPerSlot); committeeIndex++ {
|
||||
if uint64(len(selected)) >= fieldparams.PTCSize {
|
||||
break
|
||||
}
|
||||
|
||||
committee, err := helpers.BeaconCommitteeFromState(ctx, st, slot, committeeIndex)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to get beacon committee %d", committeeIndex)
|
||||
}
|
||||
|
||||
selected, i, err = selectByBalanceFill(ctx, st, committee, seed, selected, i)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to sample beacon committee %d", committeeIndex)
|
||||
}
|
||||
}
|
||||
out = append(out, committee...)
|
||||
}
|
||||
|
||||
return selectByBalance(ctx, st, out, seed, fieldparams.PTCSize)
|
||||
return selected, nil
|
||||
}
|
||||
|
||||
// ptcSeed computes the seed for the payload timeliness committee.
|
||||
@@ -148,33 +163,39 @@ func ptcSeed(st state.ReadOnlyBeaconState, epoch primitives.Epoch, slot primitiv
|
||||
// if compute_balance_weighted_acceptance(state, indices[next], seed, i):
|
||||
// selected.append(indices[next])
|
||||
// i += 1
|
||||
func selectByBalance(ctx context.Context, st state.ReadOnlyBeaconState, candidates []primitives.ValidatorIndex, seed [32]byte, count uint64) ([]primitives.ValidatorIndex, error) {
|
||||
if len(candidates) == 0 {
|
||||
return nil, errors.New("no candidates for balance weighted selection")
|
||||
}
|
||||
|
||||
func selectByBalanceFill(
|
||||
ctx context.Context,
|
||||
st state.ReadOnlyBeaconState,
|
||||
candidates []primitives.ValidatorIndex,
|
||||
seed [32]byte,
|
||||
selected []primitives.ValidatorIndex,
|
||||
i uint64,
|
||||
) ([]primitives.ValidatorIndex, uint64, error) {
|
||||
hashFunc := hash.CustomSHA256Hasher()
|
||||
// Pre-allocate buffer for hash input: seed (32 bytes) + round counter (8 bytes).
|
||||
var buf [40]byte
|
||||
copy(buf[:], seed[:])
|
||||
maxBalance := params.BeaconConfig().MaxEffectiveBalanceElectra
|
||||
|
||||
selected := make([]primitives.ValidatorIndex, 0, count)
|
||||
total := uint64(len(candidates))
|
||||
for i := uint64(0); uint64(len(selected)) < count; i++ {
|
||||
for _, idx := range candidates {
|
||||
if ctx.Err() != nil {
|
||||
return nil, ctx.Err()
|
||||
return nil, i, ctx.Err()
|
||||
}
|
||||
idx := candidates[i%total]
|
||||
|
||||
ok, err := acceptByBalance(st, idx, buf[:], hashFunc, maxBalance, i)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, i, err
|
||||
}
|
||||
if ok {
|
||||
selected = append(selected, idx)
|
||||
}
|
||||
if uint64(len(selected)) == fieldparams.PTCSize {
|
||||
break
|
||||
}
|
||||
i++
|
||||
}
|
||||
return selected, nil
|
||||
|
||||
return selected, i, nil
|
||||
}
|
||||
|
||||
// acceptByBalance determines if a validator is accepted based on its effective balance.
|
||||
|
||||
2
changelog/terencechain_gloas-ptc-sampling.md
Normal file
2
changelog/terencechain_gloas-ptc-sampling.md
Normal file
@@ -0,0 +1,2 @@
|
||||
### Changed
|
||||
- Sample PTC per committee to reduce allocations.
|
||||
@@ -285,10 +285,10 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
|
||||
ctx, span := trace.StartSpan(ctx, "validator.gRPCClient.StartEventStream")
|
||||
defer span.End()
|
||||
if len(topics) == 0 {
|
||||
eventsChannel <- &eventClient.Event{
|
||||
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventError,
|
||||
Data: []byte(errors.New("no topics were added").Error()),
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
// TODO(13563): ONLY WORKS WITH HEAD TOPIC.
|
||||
@@ -299,10 +299,10 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
|
||||
}
|
||||
}
|
||||
if !containsHead {
|
||||
eventsChannel <- &eventClient.Event{
|
||||
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventConnectionError,
|
||||
Data: []byte(errors.Wrap(client.ErrConnectionIssue, "gRPC only supports the head topic, and head topic was not passed").Error()),
|
||||
}
|
||||
})
|
||||
}
|
||||
if containsHead && len(topics) > 1 {
|
||||
log.Warn("gRPC only supports the head topic, other topics will be ignored")
|
||||
@@ -310,10 +310,10 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
|
||||
|
||||
stream, err := c.beaconNodeValidatorClient.StreamSlots(ctx, ðpb.StreamSlotsRequest{VerifiedOnly: true})
|
||||
if err != nil {
|
||||
eventsChannel <- &eventClient.Event{
|
||||
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventConnectionError,
|
||||
Data: []byte(errors.Wrap(client.ErrConnectionIssue, err.Error()).Error()),
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
c.isEventStreamRunning = true
|
||||
@@ -327,25 +327,25 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
|
||||
if ctx.Err() != nil {
|
||||
c.isEventStreamRunning = false
|
||||
if errors.Is(ctx.Err(), context.Canceled) {
|
||||
eventsChannel <- &eventClient.Event{
|
||||
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventConnectionError,
|
||||
Data: []byte(errors.Wrap(client.ErrConnectionIssue, ctx.Err().Error()).Error()),
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
eventsChannel <- &eventClient.Event{
|
||||
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventError,
|
||||
Data: []byte(ctx.Err().Error()),
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
res, err := stream.Recv()
|
||||
if err != nil {
|
||||
c.isEventStreamRunning = false
|
||||
eventsChannel <- &eventClient.Event{
|
||||
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventConnectionError,
|
||||
Data: []byte(errors.Wrap(client.ErrConnectionIssue, err.Error()).Error()),
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
if res == nil {
|
||||
@@ -357,15 +357,15 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
|
||||
CurrentDutyDependentRoot: hexutil.Encode(res.CurrentDutyDependentRoot),
|
||||
})
|
||||
if err != nil {
|
||||
eventsChannel <- &eventClient.Event{
|
||||
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventError,
|
||||
Data: []byte(errors.Wrap(err, "failed to marshal Head Event").Error()),
|
||||
}
|
||||
})
|
||||
}
|
||||
eventsChannel <- &eventClient.Event{
|
||||
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventHead,
|
||||
Data: b,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -223,7 +223,7 @@ func TestStartEventStream(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
eventsChannel := make(chan *eventClient.Event, 1) // Buffer to prevent blocking
|
||||
eventsChannel := make(chan *eventClient.Event, 4) // Buffer to prevent blocking
|
||||
tc.prepare() // Setup mock expectations
|
||||
|
||||
go grpcClient.StartEventStream(ctx, tc.topics, eventsChannel)
|
||||
|
||||
@@ -441,7 +441,6 @@ func TestRunnerPushesProposerSettings_ValidContext(t *testing.T) {
|
||||
defer assertValidContext(t, timedCtx, ctx)
|
||||
delay(t)
|
||||
})
|
||||
vcm.EXPECT().EventStreamIsRunning().Return(true).AnyTimes().Do(func() { delay(t) })
|
||||
vcm.EXPECT().SubmitValidatorRegistrations(liveCtx, gomock.Any()).Do(func(ctx context.Context, _ any) {
|
||||
defer assertValidContext(t, timedCtx, ctx) // This is the specific regression test assertion for PR 15369.
|
||||
delay(t)
|
||||
|
||||
@@ -66,6 +66,8 @@ type ValidatorService struct {
|
||||
closeClientFunc func() // validator client stop function is used here
|
||||
}
|
||||
|
||||
const eventChannelBufferSize = 32
|
||||
|
||||
// Config for the validator service.
|
||||
type Config struct {
|
||||
Validator iface.Validator
|
||||
@@ -234,7 +236,7 @@ func (v *ValidatorService) Start() {
|
||||
distributed: v.distributed,
|
||||
disableDutiesPolling: v.disableDutiesPolling,
|
||||
accountsChangedChannel: make(chan [][fieldparams.BLSPubkeyLength]byte, 1),
|
||||
eventsChannel: make(chan *eventClient.Event, 1),
|
||||
eventsChannel: make(chan *eventClient.Event, eventChannelBufferSize),
|
||||
}
|
||||
|
||||
hm := newHealthMonitor(v.ctx, v.cancel, v.maxHealthChecks, v.validator)
|
||||
|
||||
@@ -64,6 +64,11 @@ var (
|
||||
msgNoKeysFetched = "No validating keys fetched. Waiting for keys..."
|
||||
)
|
||||
|
||||
const (
|
||||
eventStreamStopped uint32 = iota
|
||||
eventStreamRunning
|
||||
)
|
||||
|
||||
type validator struct {
|
||||
logValidatorPerformance bool
|
||||
distributed bool
|
||||
@@ -82,6 +87,7 @@ type validator struct {
|
||||
cachedAttestationData *ethpb.AttestationData
|
||||
accountsChangedChannel chan [][fieldparams.BLSPubkeyLength]byte
|
||||
eventsChannel chan *eventClient.Event
|
||||
eventStreamState atomic.Uint32
|
||||
highestValidSlot primitives.Slot
|
||||
submittedAggregates map[submittedAttKey]*submittedAtt
|
||||
graffitiStruct *graffiti.Graffiti
|
||||
@@ -1211,12 +1217,40 @@ func (v *validator) PushProposerSettings(ctx context.Context, slot primitives.Sl
|
||||
}
|
||||
|
||||
func (v *validator) StartEventStream(ctx context.Context, topics []string) {
|
||||
if v.EventStreamIsRunning() {
|
||||
if !v.eventStreamState.CompareAndSwap(eventStreamStopped, eventStreamRunning) {
|
||||
log.Debug("EventStream is already running")
|
||||
return
|
||||
}
|
||||
log.WithField("topics", topics).Info("Starting event stream")
|
||||
v.validatorClient.StartEventStream(ctx, topics, v.eventsChannel)
|
||||
go v.runEventStream(ctx, topics)
|
||||
}
|
||||
|
||||
func (v *validator) runEventStream(ctx context.Context, topics []string) {
|
||||
defer v.eventStreamState.Store(eventStreamStopped)
|
||||
backoff := time.Second
|
||||
const maxBackoff = 30 * time.Second
|
||||
|
||||
for {
|
||||
v.validatorClient.StartEventStream(ctx, topics, v.eventsChannel)
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
log.WithField("retryIn", backoff).Warn("Event stream ended unexpectedly, attempting to resubscribe")
|
||||
timer := time.NewTimer(backoff)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return
|
||||
case <-timer.C:
|
||||
}
|
||||
if backoff < maxBackoff {
|
||||
backoff *= 2
|
||||
if backoff > maxBackoff {
|
||||
backoff = maxBackoff
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (v *validator) checkDependentRoots(ctx context.Context, head *structs.HeadEvent) error {
|
||||
@@ -1303,7 +1337,7 @@ func (v *validator) ProcessEvent(ctx context.Context, event *eventClient.Event)
|
||||
}
|
||||
|
||||
func (v *validator) EventStreamIsRunning() bool {
|
||||
return v.validatorClient.EventStreamIsRunning()
|
||||
return v.eventStreamState.Load() == eventStreamRunning
|
||||
}
|
||||
|
||||
func (v *validator) Host() string {
|
||||
|
||||
Reference in New Issue
Block a user