Compare commits

..

1 Commits

Author SHA1 Message Date
james-prysm
cde4f3b009 wip 2026-01-30 16:58:23 -06:00
16 changed files with 113 additions and 130 deletions

View File

@@ -34,6 +34,18 @@ type Event struct {
Data []byte
}
// PublishEvent enqueues an event without blocking the producer. If the channel is full,
// the event is dropped since only the most recent heads are relevant.
func PublishEvent(eventsChannel chan<- *Event, event *Event) {
if eventsChannel == nil || event == nil {
return
}
select {
case eventsChannel <- event:
default:
}
}
// EventStream is responsible for subscribing to the Beacon API events endpoint
// and dispatching received events to subscribers.
type EventStream struct {
@@ -67,19 +79,20 @@ func (h *EventStream) Subscribe(eventsChannel chan<- *Event) {
fullUrl := h.host + "/eth/v1/events?topics=" + allTopics
req, err := http.NewRequestWithContext(h.ctx, http.MethodGet, fullUrl, nil)
if err != nil {
eventsChannel <- &Event{
PublishEvent(eventsChannel, &Event{
EventType: EventConnectionError,
Data: []byte(errors.Wrap(err, "failed to create HTTP request").Error()),
}
})
return
}
req.Header.Set("Accept", api.EventStreamMediaType)
req.Header.Set("Connection", api.KeepAlive)
resp, err := h.httpClient.Do(req)
if err != nil {
eventsChannel <- &Event{
PublishEvent(eventsChannel, &Event{
EventType: EventConnectionError,
Data: []byte(errors.Wrap(err, client.ErrConnectionIssue.Error()).Error()),
}
})
return
}
@@ -100,7 +113,6 @@ func (h *EventStream) Subscribe(eventsChannel chan<- *Event) {
select {
case <-h.ctx.Done():
log.Info("Context canceled, stopping event stream")
close(eventsChannel)
return
default:
line := scanner.Text()
@@ -109,7 +121,7 @@ func (h *EventStream) Subscribe(eventsChannel chan<- *Event) {
// Empty line indicates the end of an event
if eventType != "" && data != "" {
// Process the event when both eventType and data are set
eventsChannel <- &Event{EventType: eventType, Data: []byte(data)}
PublishEvent(eventsChannel, &Event{EventType: eventType, Data: []byte(data)})
}
// Reset eventType and data for the next event
@@ -130,9 +142,9 @@ func (h *EventStream) Subscribe(eventsChannel chan<- *Event) {
}
if err := scanner.Err(); err != nil {
eventsChannel <- &Event{
PublishEvent(eventsChannel, &Event{
EventType: EventConnectionError,
Data: []byte(errors.Wrap(err, errors.Wrap(client.ErrConnectionIssue, "scanner failed").Error()).Error()),
}
})
}
}

View File

@@ -53,7 +53,7 @@ func TestEventStream(t *testing.T) {
defer server.Close()
topics := []string{"head"}
eventsChannel := make(chan *Event, 1)
eventsChannel := make(chan *Event, 4)
stream, err := NewEventStream(t.Context(), http.DefaultClient, server.URL, topics)
require.NoError(t, err)
go stream.Subscribe(eventsChannel)
@@ -80,7 +80,7 @@ func TestEventStream(t *testing.T) {
func TestEventStreamRequestError(t *testing.T) {
topics := []string{"head"}
eventsChannel := make(chan *Event, 1)
eventsChannel := make(chan *Event, 4)
ctx := t.Context()
// use valid url that will result in failed request with nil body

View File

@@ -46,7 +46,6 @@ go_library(
"//proto/prysm/v1alpha1:go_default_library",
"@com_github_pkg_errors//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)
@@ -79,7 +78,6 @@ go_test(
"//testing/assert:go_default_library",
"//testing/require:go_default_library",
"//testing/util:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
)

View File

@@ -2,16 +2,16 @@ package stateutil
import (
"bytes"
"context"
"encoding/binary"
"runtime"
"sync"
fieldparams "github.com/OffchainLabs/prysm/v7/config/fieldparams"
"github.com/OffchainLabs/prysm/v7/crypto/hash/htr"
"github.com/OffchainLabs/prysm/v7/encoding/ssz"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"github.com/sirupsen/logrus"
)
const (
@@ -54,23 +54,17 @@ func validatorRegistryRoot(validators []*ethpb.Validator) ([32]byte, error) {
return res, nil
}
func hashValidatorHelper(ctx context.Context, validators []*ethpb.Validator, roots [][32]byte, j int, groupSize int) func() error {
return func() error {
for i := range groupSize {
select {
case <-ctx.Done():
return ctx.Err()
default:
fRoots, err := ValidatorFieldRoots(validators[j*groupSize+i])
if err != nil {
return errors.Wrap(err, "could not get validator field roots")
}
for k, root := range fRoots {
roots[(j*groupSize+i)*validatorFieldRoots+k] = root
}
}
func hashValidatorHelper(validators []*ethpb.Validator, roots [][32]byte, j int, groupSize int, wg *sync.WaitGroup) {
defer wg.Done()
for i := range groupSize {
fRoots, err := ValidatorFieldRoots(validators[j*groupSize+i])
if err != nil {
logrus.WithError(err).Error("Could not get validator field roots")
return
}
for k, root := range fRoots {
roots[(j*groupSize+i)*validatorFieldRoots+k] = root
}
return nil
}
}
@@ -81,13 +75,14 @@ func OptimizedValidatorRoots(validators []*ethpb.Validator) ([][32]byte, error)
if len(validators) == 0 {
return [][32]byte{}, nil
}
g, ctx := errgroup.WithContext(context.Background())
wg := sync.WaitGroup{}
n := runtime.GOMAXPROCS(0)
rootsSize := len(validators) * validatorFieldRoots
groupSize := len(validators) / n
roots := make([][32]byte, rootsSize)
wg.Add(n - 1)
for j := 0; j < n-1; j++ {
g.Go(hashValidatorHelper(ctx, validators, roots, j, groupSize))
go hashValidatorHelper(validators, roots, j, groupSize, &wg)
}
for i := (n - 1) * groupSize; i < len(validators); i++ {
fRoots, err := ValidatorFieldRoots(validators[i])
@@ -98,9 +93,7 @@ func OptimizedValidatorRoots(validators []*ethpb.Validator) ([][32]byte, error)
roots[i*validatorFieldRoots+k] = root
}
}
if err := g.Wait(); err != nil {
return [][32]byte{}, err
}
wg.Wait()
// A validator's tree can represented with a depth of 3. As log2(8) = 3
// Using this property we can lay out all the individual fields of a

View File

@@ -3,13 +3,13 @@ package stateutil
import (
"reflect"
"strings"
"sync"
"testing"
mathutil "github.com/OffchainLabs/prysm/v7/math"
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
"github.com/OffchainLabs/prysm/v7/testing/assert"
"github.com/OffchainLabs/prysm/v7/testing/require"
"golang.org/x/sync/errgroup"
)
func TestValidatorConstants(t *testing.T) {
@@ -34,15 +34,15 @@ func TestValidatorConstants(t *testing.T) {
}
func TestHashValidatorHelper(t *testing.T) {
g, ctx := errgroup.WithContext(t.Context())
wg := sync.WaitGroup{}
wg.Add(1)
v := &ethpb.Validator{}
valList := make([]*ethpb.Validator, 10*validatorFieldRoots)
for i := range valList {
valList[i] = v
}
roots := make([][32]byte, len(valList))
g.Go(hashValidatorHelper(ctx, valList, roots, 2, 2))
require.NoError(t, g.Wait())
hashValidatorHelper(valList, roots, 2, 2, &wg)
for i := range 4 * validatorFieldRoots {
require.Equal(t, [32]byte{}, roots[i])
}

View File

@@ -1,3 +0,0 @@
### Added
- Added README for maintaining specrefs.

View File

@@ -1,3 +0,0 @@
### Added
- The ability to download the nightly reference tests from a specific day.

View File

@@ -1,3 +0,0 @@
### Changed
- Use `errgroup` in `OptimizedValidatorRoots`.

View File

@@ -1,35 +0,0 @@
# Specification References
This directory contains specification reference tracking files managed by
[ethspecify](https://github.com/jtraglia/ethspecify).
## Installation
Install `ethspecify` with the following command:
```bash
pipx install ethspecify
```
> [!NOTE]
> You can run `ethspecify <cmd>` in the `specrefs` directory or
> `ethspecify <cmd> --path=specrefs` from the project's root directory.
## Maintenance
When adding support for a new specification version, follow these steps:
0. Change directory into the `specrefs` directory.
1. Update the version in `.ethspecify.yml` configuration.
2. Run `ethspecify process` to update/populate specrefs.
3. Run `ethspecify check` to check specrefs.
4. If there are errors, use the error message as a guide to fix the issue. If
there are new specrefs with empty sources, implement/locate each item and
update each specref source list. If you choose not to implement an item,
add an exception to the appropriate section the the `.ethspecify.yml`
configuration.
5. Repeat steps 3 and 4 until `ethspecify check` passes.
6. Run `git diff` to view updated specrefs. If an object/function/etc has
changed, make the necessary updates to the implementation.
7. Lastly, in the project's root directory, run `act -j check-specrefs` to
ensure everything is correct.

View File

@@ -21,14 +21,10 @@ There are tests for mainnet and minimal config, so for each config we will add a
## Running nightly spectests
Since [PR 15312](https://github.com/OffchainLabs/prysm/pull/15312), Prysm has support to download "nightly" spectests from github via a starlark rule configuration by environment variable.
Set `--repo_env=CONSENSUS_SPEC_TESTS_VERSION=nightly` or `--repo_env=CONSENSUS_SPEC_TESTS_VERSION=nightly-<run_id>` when running spectest to download the "nightly" spectests.
Note: A GITHUB_TOKEN environment variable is required to be set. The github token does not need to be associated with your main account; it can be from a "burner account". And the token does not need to be a fine-grained token; it can be a classic token.
Since [PR 15312](https://github.com/OffchainLabs/prysm/pull/15312), Prysm has support to download "nightly" spectests from github via a starlark rule configuration by environment variable.
Set `--repo_env=CONSENSUS_SPEC_TESTS_VERSION=nightly` when running spectest to download the "nightly" spectests.
Note: A GITHUB_TOKEN environment variable is required to be set. The github token must be a [fine grained token](https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens#creating-a-fine-grained-personal-access-token).
```
bazel test //... --test_tag_filters=spectest --repo_env=CONSENSUS_SPEC_TESTS_VERSION=nightly
```
```
bazel test //... --test_tag_filters=spectest --repo_env=CONSENSUS_SPEC_TESTS_VERSION=nightly-21422848633
```

View File

@@ -1,6 +1,5 @@
# bazel build @consensus_spec_tests//:test_data
# bazel build @consensus_spec_tests//:test_data --repo_env=CONSENSUS_SPEC_TESTS_VERSION=nightly
# bazel build @consensus_spec_tests//:test_data --repo_env=CONSENSUS_SPEC_TESTS_VERSION=nightly-<run_id>
def _get_redirected_url(repository_ctx, url, headers):
if not repository_ctx.which("curl"):
@@ -25,7 +24,7 @@ def _impl(repository_ctx):
version = repository_ctx.getenv("CONSENSUS_SPEC_TESTS_VERSION") or repository_ctx.attr.version
token = repository_ctx.getenv("GITHUB_TOKEN") or ""
if version == "nightly" or version.startswith("nightly-"):
if version == "nightly":
print("Downloading nightly tests")
if not token:
fail("Error GITHUB_TOKEN is not set")
@@ -35,22 +34,16 @@ def _impl(repository_ctx):
"Accept": "application/vnd.github+json",
}
if version.startswith("nightly-"):
run_id = version.split("nightly-", 1)[1]
if not run_id:
fail("Error invalid run id")
else:
repository_ctx.download(
"https://api.github.com/repos/%s/actions/workflows/%s/runs?branch=%s&status=success&per_page=1"
% (repository_ctx.attr.repo, repository_ctx.attr.workflow, repository_ctx.attr.branch),
headers = headers,
output = "runs.json"
)
repository_ctx.download(
"https://api.github.com/repos/%s/actions/workflows/%s/runs?branch=%s&status=success&per_page=1"
% (repository_ctx.attr.repo, repository_ctx.attr.workflow, repository_ctx.attr.branch),
headers = headers,
output = "runs.json"
)
run_id = json.decode(repository_ctx.read("runs.json"))["workflow_runs"][0]["id"]
repository_ctx.delete("runs.json")
run_id = json.decode(repository_ctx.read("runs.json"))["workflow_runs"][0]["id"]
repository_ctx.delete("runs.json")
print("Run id:", run_id)
repository_ctx.download(
"https://api.github.com/repos/%s/actions/runs/%s/artifacts"
% (repository_ctx.attr.repo, run_id),
@@ -115,8 +108,8 @@ consensus_spec_tests = repository_rule(
"version": attr.string(mandatory = True),
"flavors": attr.string_dict(mandatory = True),
"repo": attr.string(default = "ethereum/consensus-specs"),
"workflow": attr.string(default = "nightly-reftests.yml"),
"branch": attr.string(default = "master"),
"workflow": attr.string(default = "generate_vectors.yml"),
"branch": attr.string(default = "dev"),
"release_url_template": attr.string(default = "https://github.com/ethereum/consensus-specs/releases/download/%s"),
},
)

View File

@@ -285,10 +285,10 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
ctx, span := trace.StartSpan(ctx, "validator.gRPCClient.StartEventStream")
defer span.End()
if len(topics) == 0 {
eventsChannel <- &eventClient.Event{
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
EventType: eventClient.EventError,
Data: []byte(errors.New("no topics were added").Error()),
}
})
return
}
// TODO(13563): ONLY WORKS WITH HEAD TOPIC.
@@ -299,10 +299,10 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
}
}
if !containsHead {
eventsChannel <- &eventClient.Event{
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
EventType: eventClient.EventConnectionError,
Data: []byte(errors.Wrap(client.ErrConnectionIssue, "gRPC only supports the head topic, and head topic was not passed").Error()),
}
})
}
if containsHead && len(topics) > 1 {
log.Warn("gRPC only supports the head topic, other topics will be ignored")
@@ -310,10 +310,10 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
stream, err := c.beaconNodeValidatorClient.StreamSlots(ctx, &ethpb.StreamSlotsRequest{VerifiedOnly: true})
if err != nil {
eventsChannel <- &eventClient.Event{
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
EventType: eventClient.EventConnectionError,
Data: []byte(errors.Wrap(client.ErrConnectionIssue, err.Error()).Error()),
}
})
return
}
c.isEventStreamRunning = true
@@ -327,25 +327,25 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
if ctx.Err() != nil {
c.isEventStreamRunning = false
if errors.Is(ctx.Err(), context.Canceled) {
eventsChannel <- &eventClient.Event{
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
EventType: eventClient.EventConnectionError,
Data: []byte(errors.Wrap(client.ErrConnectionIssue, ctx.Err().Error()).Error()),
}
})
return
}
eventsChannel <- &eventClient.Event{
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
EventType: eventClient.EventError,
Data: []byte(ctx.Err().Error()),
}
})
return
}
res, err := stream.Recv()
if err != nil {
c.isEventStreamRunning = false
eventsChannel <- &eventClient.Event{
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
EventType: eventClient.EventConnectionError,
Data: []byte(errors.Wrap(client.ErrConnectionIssue, err.Error()).Error()),
}
})
return
}
if res == nil {
@@ -357,15 +357,15 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
CurrentDutyDependentRoot: hexutil.Encode(res.CurrentDutyDependentRoot),
})
if err != nil {
eventsChannel <- &eventClient.Event{
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
EventType: eventClient.EventError,
Data: []byte(errors.Wrap(err, "failed to marshal Head Event").Error()),
}
})
}
eventsChannel <- &eventClient.Event{
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
EventType: eventClient.EventHead,
Data: b,
}
})
}
}
}

View File

@@ -223,7 +223,7 @@ func TestStartEventStream(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
eventsChannel := make(chan *eventClient.Event, 1) // Buffer to prevent blocking
eventsChannel := make(chan *eventClient.Event, 4) // Buffer to prevent blocking
tc.prepare() // Setup mock expectations
go grpcClient.StartEventStream(ctx, tc.topics, eventsChannel)

View File

@@ -441,7 +441,6 @@ func TestRunnerPushesProposerSettings_ValidContext(t *testing.T) {
defer assertValidContext(t, timedCtx, ctx)
delay(t)
})
vcm.EXPECT().EventStreamIsRunning().Return(true).AnyTimes().Do(func() { delay(t) })
vcm.EXPECT().SubmitValidatorRegistrations(liveCtx, gomock.Any()).Do(func(ctx context.Context, _ any) {
defer assertValidContext(t, timedCtx, ctx) // This is the specific regression test assertion for PR 15369.
delay(t)

View File

@@ -66,6 +66,8 @@ type ValidatorService struct {
closeClientFunc func() // validator client stop function is used here
}
const eventChannelBufferSize = 32
// Config for the validator service.
type Config struct {
Validator iface.Validator
@@ -234,7 +236,7 @@ func (v *ValidatorService) Start() {
distributed: v.distributed,
disableDutiesPolling: v.disableDutiesPolling,
accountsChangedChannel: make(chan [][fieldparams.BLSPubkeyLength]byte, 1),
eventsChannel: make(chan *eventClient.Event, 1),
eventsChannel: make(chan *eventClient.Event, eventChannelBufferSize),
}
hm := newHealthMonitor(v.ctx, v.cancel, v.maxHealthChecks, v.validator)

View File

@@ -64,6 +64,11 @@ var (
msgNoKeysFetched = "No validating keys fetched. Waiting for keys..."
)
const (
eventStreamStopped uint32 = iota
eventStreamRunning
)
type validator struct {
logValidatorPerformance bool
distributed bool
@@ -82,6 +87,7 @@ type validator struct {
cachedAttestationData *ethpb.AttestationData
accountsChangedChannel chan [][fieldparams.BLSPubkeyLength]byte
eventsChannel chan *eventClient.Event
eventStreamState atomic.Uint32
highestValidSlot primitives.Slot
submittedAggregates map[submittedAttKey]*submittedAtt
graffitiStruct *graffiti.Graffiti
@@ -1211,12 +1217,40 @@ func (v *validator) PushProposerSettings(ctx context.Context, slot primitives.Sl
}
func (v *validator) StartEventStream(ctx context.Context, topics []string) {
if v.EventStreamIsRunning() {
if !v.eventStreamState.CompareAndSwap(eventStreamStopped, eventStreamRunning) {
log.Debug("EventStream is already running")
return
}
log.WithField("topics", topics).Info("Starting event stream")
v.validatorClient.StartEventStream(ctx, topics, v.eventsChannel)
go v.runEventStream(ctx, topics)
}
func (v *validator) runEventStream(ctx context.Context, topics []string) {
defer v.eventStreamState.Store(eventStreamStopped)
backoff := time.Second
const maxBackoff = 30 * time.Second
for {
v.validatorClient.StartEventStream(ctx, topics, v.eventsChannel)
if ctx.Err() != nil {
return
}
log.WithField("retryIn", backoff).Warn("Event stream ended unexpectedly, attempting to resubscribe")
timer := time.NewTimer(backoff)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
}
if backoff < maxBackoff {
backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
}
}
}
func (v *validator) checkDependentRoots(ctx context.Context, head *structs.HeadEvent) error {
@@ -1303,7 +1337,7 @@ func (v *validator) ProcessEvent(ctx context.Context, event *eventClient.Event)
}
func (v *validator) EventStreamIsRunning() bool {
return v.validatorClient.EventStreamIsRunning()
return v.eventStreamState.Load() == eventStreamRunning
}
func (v *validator) Host() string {