mirror of
https://github.com/OffchainLabs/prysm.git
synced 2026-01-30 23:58:23 -05:00
Compare commits
1 Commits
gloas-ptc-
...
event-stre
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cde4f3b009 |
@@ -34,6 +34,18 @@ type Event struct {
|
||||
Data []byte
|
||||
}
|
||||
|
||||
// PublishEvent enqueues an event without blocking the producer. If the channel is full,
|
||||
// the event is dropped since only the most recent heads are relevant.
|
||||
func PublishEvent(eventsChannel chan<- *Event, event *Event) {
|
||||
if eventsChannel == nil || event == nil {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case eventsChannel <- event:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// EventStream is responsible for subscribing to the Beacon API events endpoint
|
||||
// and dispatching received events to subscribers.
|
||||
type EventStream struct {
|
||||
@@ -67,19 +79,20 @@ func (h *EventStream) Subscribe(eventsChannel chan<- *Event) {
|
||||
fullUrl := h.host + "/eth/v1/events?topics=" + allTopics
|
||||
req, err := http.NewRequestWithContext(h.ctx, http.MethodGet, fullUrl, nil)
|
||||
if err != nil {
|
||||
eventsChannel <- &Event{
|
||||
PublishEvent(eventsChannel, &Event{
|
||||
EventType: EventConnectionError,
|
||||
Data: []byte(errors.Wrap(err, "failed to create HTTP request").Error()),
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
req.Header.Set("Accept", api.EventStreamMediaType)
|
||||
req.Header.Set("Connection", api.KeepAlive)
|
||||
resp, err := h.httpClient.Do(req)
|
||||
if err != nil {
|
||||
eventsChannel <- &Event{
|
||||
PublishEvent(eventsChannel, &Event{
|
||||
EventType: EventConnectionError,
|
||||
Data: []byte(errors.Wrap(err, client.ErrConnectionIssue.Error()).Error()),
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
@@ -100,7 +113,6 @@ func (h *EventStream) Subscribe(eventsChannel chan<- *Event) {
|
||||
select {
|
||||
case <-h.ctx.Done():
|
||||
log.Info("Context canceled, stopping event stream")
|
||||
close(eventsChannel)
|
||||
return
|
||||
default:
|
||||
line := scanner.Text()
|
||||
@@ -109,7 +121,7 @@ func (h *EventStream) Subscribe(eventsChannel chan<- *Event) {
|
||||
// Empty line indicates the end of an event
|
||||
if eventType != "" && data != "" {
|
||||
// Process the event when both eventType and data are set
|
||||
eventsChannel <- &Event{EventType: eventType, Data: []byte(data)}
|
||||
PublishEvent(eventsChannel, &Event{EventType: eventType, Data: []byte(data)})
|
||||
}
|
||||
|
||||
// Reset eventType and data for the next event
|
||||
@@ -130,9 +142,9 @@ func (h *EventStream) Subscribe(eventsChannel chan<- *Event) {
|
||||
}
|
||||
|
||||
if err := scanner.Err(); err != nil {
|
||||
eventsChannel <- &Event{
|
||||
PublishEvent(eventsChannel, &Event{
|
||||
EventType: EventConnectionError,
|
||||
Data: []byte(errors.Wrap(err, errors.Wrap(client.ErrConnectionIssue, "scanner failed").Error()).Error()),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -53,7 +53,7 @@ func TestEventStream(t *testing.T) {
|
||||
defer server.Close()
|
||||
|
||||
topics := []string{"head"}
|
||||
eventsChannel := make(chan *Event, 1)
|
||||
eventsChannel := make(chan *Event, 4)
|
||||
stream, err := NewEventStream(t.Context(), http.DefaultClient, server.URL, topics)
|
||||
require.NoError(t, err)
|
||||
go stream.Subscribe(eventsChannel)
|
||||
@@ -80,7 +80,7 @@ func TestEventStream(t *testing.T) {
|
||||
|
||||
func TestEventStreamRequestError(t *testing.T) {
|
||||
topics := []string{"head"}
|
||||
eventsChannel := make(chan *Event, 1)
|
||||
eventsChannel := make(chan *Event, 4)
|
||||
ctx := t.Context()
|
||||
|
||||
// use valid url that will result in failed request with nil body
|
||||
|
||||
@@ -540,12 +540,6 @@ type PayloadAttestation struct {
|
||||
Signature string `json:"signature"`
|
||||
}
|
||||
|
||||
type PayloadAttestationMessage struct {
|
||||
ValidatorIndex string `json:"validator_index"`
|
||||
Data *PayloadAttestationData `json:"data"`
|
||||
Signature string `json:"signature"`
|
||||
}
|
||||
|
||||
type BeaconBlockBodyGloas struct {
|
||||
RandaoReveal string `json:"randao_reveal"`
|
||||
Eth1Data *Eth1Data `json:"eth1_data"`
|
||||
|
||||
@@ -3263,34 +3263,3 @@ func (d *PayloadAttestationData) ToConsensus() (*eth.PayloadAttestationData, err
|
||||
BlobDataAvailable: d.BlobDataAvailable,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func PayloadAttestationMessageFromConsensus(msg *eth.PayloadAttestationMessage) *PayloadAttestationMessage {
|
||||
return &PayloadAttestationMessage{
|
||||
ValidatorIndex: fmt.Sprintf("%d", msg.ValidatorIndex),
|
||||
Data: PayloadAttestationDataFromConsensus(msg.Data),
|
||||
Signature: hexutil.Encode(msg.Signature),
|
||||
}
|
||||
}
|
||||
|
||||
func (p *PayloadAttestationMessage) ToConsensus() (*eth.PayloadAttestationMessage, error) {
|
||||
if p == nil {
|
||||
return nil, errNilValue
|
||||
}
|
||||
validatorIndex, err := strconv.ParseUint(p.ValidatorIndex, 10, 64)
|
||||
if err != nil {
|
||||
return nil, server.NewDecodeError(err, "ValidatorIndex")
|
||||
}
|
||||
data, err := p.Data.ToConsensus()
|
||||
if err != nil {
|
||||
return nil, server.NewDecodeError(err, "Data")
|
||||
}
|
||||
sig, err := bytesutil.DecodeHexWithLength(p.Signature, fieldparams.BLSSignatureLength)
|
||||
if err != nil {
|
||||
return nil, server.NewDecodeError(err, "Signature")
|
||||
}
|
||||
return ð.PayloadAttestationMessage{
|
||||
ValidatorIndex: primitives.ValidatorIndex(validatorIndex),
|
||||
Data: data,
|
||||
Signature: sig,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -188,11 +188,6 @@ type BLSToExecutionChangesPoolResponse struct {
|
||||
Data []*SignedBLSToExecutionChange `json:"data"`
|
||||
}
|
||||
|
||||
type GetPoolPayloadAttestationsResponse struct {
|
||||
Version string `json:"version"`
|
||||
Data []*PayloadAttestation `json:"data"`
|
||||
}
|
||||
|
||||
type GetAttesterSlashingsResponse struct {
|
||||
Version string `json:"version,omitempty"`
|
||||
Data json.RawMessage `json:"data"` // Accepts both `[]*AttesterSlashing` and `[]*AttesterSlashingElectra` types
|
||||
|
||||
@@ -31,11 +31,6 @@ type GetAttestationDataResponse struct {
|
||||
Data *AttestationData `json:"data"`
|
||||
}
|
||||
|
||||
type GetPayloadAttestationDataResponse struct {
|
||||
Version string `json:"version"`
|
||||
Data *PayloadAttestationData `json:"data"`
|
||||
}
|
||||
|
||||
type ProduceSyncCommitteeContributionResponse struct {
|
||||
Data *SyncCommitteeContribution `json:"data"`
|
||||
}
|
||||
|
||||
@@ -37,7 +37,6 @@ go_library(
|
||||
"//beacon-chain/node/registration:go_default_library",
|
||||
"//beacon-chain/operations/attestations:go_default_library",
|
||||
"//beacon-chain/operations/blstoexec:go_default_library",
|
||||
"//beacon-chain/operations/payloadattestation:go_default_library",
|
||||
"//beacon-chain/operations/slashings:go_default_library",
|
||||
"//beacon-chain/operations/synccommittee:go_default_library",
|
||||
"//beacon-chain/operations/voluntaryexits:go_default_library",
|
||||
|
||||
@@ -40,7 +40,6 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/node/registration"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/attestations"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/blstoexec"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/payloadattestation"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/slashings"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/synccommittee"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/voluntaryexits"
|
||||
@@ -102,7 +101,6 @@ type BeaconNode struct {
|
||||
slashingsPool slashings.PoolManager
|
||||
syncCommitteePool synccommittee.Pool
|
||||
blsToExecPool blstoexec.PoolManager
|
||||
payloadAttestationPool payloadattestation.PoolManager
|
||||
depositCache cache.DepositCache
|
||||
trackedValidatorsCache *cache.TrackedValidatorsCache
|
||||
payloadIDCache *cache.PayloadIDCache
|
||||
@@ -157,7 +155,6 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
|
||||
slashingsPool: slashings.NewPool(),
|
||||
syncCommitteePool: synccommittee.NewPool(),
|
||||
blsToExecPool: blstoexec.NewPool(),
|
||||
payloadAttestationPool: payloadattestation.NewPool(),
|
||||
trackedValidatorsCache: cache.NewTrackedValidatorsCache(),
|
||||
payloadIDCache: cache.NewPayloadIDCache(),
|
||||
slasherBlockHeadersFeed: new(event.Feed),
|
||||
@@ -975,7 +972,6 @@ func (b *BeaconNode) registerRPCService(router *http.ServeMux) error {
|
||||
SlashingsPool: b.slashingsPool,
|
||||
BLSChangesPool: b.blsToExecPool,
|
||||
SyncCommitteeObjectPool: b.syncCommitteePool,
|
||||
PayloadAttestationPool: b.payloadAttestationPool,
|
||||
ExecutionChainService: web3Service,
|
||||
ExecutionChainInfoFetcher: web3Service,
|
||||
ChainStartFetcher: chainStartFetcher,
|
||||
|
||||
@@ -1,27 +0,0 @@
|
||||
load("@prysm//tools/go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["pool.go"],
|
||||
importpath = "github.com/OffchainLabs/prysm/v7/beacon-chain/operations/payloadattestation",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"//crypto/hash:go_default_library",
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"@com_github_pkg_errors//:go_default_library",
|
||||
"@org_golang_google_protobuf//proto:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["pool_test.go"],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//consensus-types/primitives:go_default_library",
|
||||
"//proto/prysm/v1alpha1:go_default_library",
|
||||
"//testing/assert:go_default_library",
|
||||
"//testing/require:go_default_library",
|
||||
],
|
||||
)
|
||||
@@ -1,108 +0,0 @@
|
||||
package payloadattestation
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
|
||||
"github.com/OffchainLabs/prysm/v7/crypto/hash"
|
||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||
"github.com/pkg/errors"
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
// PoolManager maintains pending payload attestations.
|
||||
// This pool is used by proposers to insert payload attestations into new blocks.
|
||||
type PoolManager interface {
|
||||
PendingPayloadAttestations(slot ...primitives.Slot) []*ethpb.PayloadAttestation
|
||||
InsertPayloadAttestation(msg *ethpb.PayloadAttestationMessage) error
|
||||
MarkIncluded(att *ethpb.PayloadAttestation)
|
||||
}
|
||||
|
||||
// Pool is a concrete implementation of PoolManager.
|
||||
type Pool struct {
|
||||
lock sync.RWMutex
|
||||
pending map[[32]byte]*ethpb.PayloadAttestation
|
||||
}
|
||||
|
||||
// NewPool returns an initialized pool.
|
||||
func NewPool() *Pool {
|
||||
return &Pool{
|
||||
pending: make(map[[32]byte]*ethpb.PayloadAttestation),
|
||||
}
|
||||
}
|
||||
|
||||
// PendingPayloadAttestations returns all pending payload attestations from the pool.
|
||||
// If a slot is provided, only attestations for that slot are returned.
|
||||
func (p *Pool) PendingPayloadAttestations(slot ...primitives.Slot) []*ethpb.PayloadAttestation {
|
||||
p.lock.RLock()
|
||||
defer p.lock.RUnlock()
|
||||
|
||||
result := make([]*ethpb.PayloadAttestation, 0, len(p.pending))
|
||||
for _, att := range p.pending {
|
||||
if len(slot) > 0 && att.Data.Slot != slot[0] {
|
||||
continue
|
||||
}
|
||||
result = append(result, att)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// InsertPayloadAttestation inserts a payload attestation message into the pool.
|
||||
// The message is converted to an aggregated PayloadAttestation and merged with
|
||||
// any existing attestation that has matching PayloadAttestationData.
|
||||
func (p *Pool) InsertPayloadAttestation(msg *ethpb.PayloadAttestationMessage) error {
|
||||
if msg == nil || msg.Data == nil {
|
||||
return errors.New("nil payload attestation message")
|
||||
}
|
||||
|
||||
key, err := dataKey(msg.Data)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not compute data key")
|
||||
}
|
||||
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
existing, ok := p.pending[key]
|
||||
if !ok {
|
||||
// Create a new aggregated PayloadAttestation from this message.
|
||||
p.pending[key] = ðpb.PayloadAttestation{
|
||||
AggregationBits: []byte{},
|
||||
Data: proto.Clone(msg.Data).(*ethpb.PayloadAttestationData),
|
||||
Signature: msg.Signature,
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Merge: for now just replace the signature since proper BLS aggregation
|
||||
// requires knowing the PTC committee position. Full aggregation is a TODO.
|
||||
_ = existing
|
||||
return nil
|
||||
}
|
||||
|
||||
// MarkIncluded removes the attestation with matching data from the pool.
|
||||
func (p *Pool) MarkIncluded(att *ethpb.PayloadAttestation) {
|
||||
if att == nil || att.Data == nil {
|
||||
return
|
||||
}
|
||||
|
||||
key, err := dataKey(att.Data)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
|
||||
delete(p.pending, key)
|
||||
}
|
||||
|
||||
// dataKey computes a deterministic key for PayloadAttestationData
|
||||
// by hashing its serialized form.
|
||||
func dataKey(data *ethpb.PayloadAttestationData) ([32]byte, error) {
|
||||
enc, err := proto.Marshal(data)
|
||||
if err != nil {
|
||||
return [32]byte{}, err
|
||||
}
|
||||
return hash.Hash(enc), nil
|
||||
}
|
||||
@@ -1,201 +0,0 @@
|
||||
package payloadattestation
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/OffchainLabs/prysm/v7/consensus-types/primitives"
|
||||
ethpb "github.com/OffchainLabs/prysm/v7/proto/prysm/v1alpha1"
|
||||
"github.com/OffchainLabs/prysm/v7/testing/assert"
|
||||
"github.com/OffchainLabs/prysm/v7/testing/require"
|
||||
)
|
||||
|
||||
func TestPool_PendingPayloadAttestations(t *testing.T) {
|
||||
t.Run("empty pool", func(t *testing.T) {
|
||||
pool := NewPool()
|
||||
atts := pool.PendingPayloadAttestations()
|
||||
assert.Equal(t, 0, len(atts))
|
||||
})
|
||||
|
||||
t.Run("non-empty pool", func(t *testing.T) {
|
||||
pool := NewPool()
|
||||
msg1 := ðpb.PayloadAttestationMessage{
|
||||
ValidatorIndex: 0,
|
||||
Data: ðpb.PayloadAttestationData{
|
||||
BeaconBlockRoot: make([]byte, 32),
|
||||
Slot: 1,
|
||||
PayloadPresent: true,
|
||||
BlobDataAvailable: false,
|
||||
},
|
||||
Signature: make([]byte, 96),
|
||||
}
|
||||
msg2 := ðpb.PayloadAttestationMessage{
|
||||
ValidatorIndex: 1,
|
||||
Data: ðpb.PayloadAttestationData{
|
||||
BeaconBlockRoot: make([]byte, 32),
|
||||
Slot: 2,
|
||||
PayloadPresent: false,
|
||||
BlobDataAvailable: true,
|
||||
},
|
||||
Signature: make([]byte, 96),
|
||||
}
|
||||
require.NoError(t, pool.InsertPayloadAttestation(msg1))
|
||||
require.NoError(t, pool.InsertPayloadAttestation(msg2))
|
||||
atts := pool.PendingPayloadAttestations()
|
||||
assert.Equal(t, 2, len(atts))
|
||||
})
|
||||
|
||||
t.Run("filter by slot", func(t *testing.T) {
|
||||
pool := NewPool()
|
||||
msg1 := ðpb.PayloadAttestationMessage{
|
||||
ValidatorIndex: 0,
|
||||
Data: ðpb.PayloadAttestationData{
|
||||
BeaconBlockRoot: make([]byte, 32),
|
||||
Slot: 1,
|
||||
PayloadPresent: true,
|
||||
BlobDataAvailable: false,
|
||||
},
|
||||
Signature: make([]byte, 96),
|
||||
}
|
||||
msg2 := ðpb.PayloadAttestationMessage{
|
||||
ValidatorIndex: 1,
|
||||
Data: ðpb.PayloadAttestationData{
|
||||
BeaconBlockRoot: make([]byte, 32),
|
||||
Slot: 2,
|
||||
PayloadPresent: false,
|
||||
BlobDataAvailable: true,
|
||||
},
|
||||
Signature: make([]byte, 96),
|
||||
}
|
||||
require.NoError(t, pool.InsertPayloadAttestation(msg1))
|
||||
require.NoError(t, pool.InsertPayloadAttestation(msg2))
|
||||
|
||||
atts := pool.PendingPayloadAttestations(primitives.Slot(1))
|
||||
assert.Equal(t, 1, len(atts))
|
||||
assert.Equal(t, primitives.Slot(1), atts[0].Data.Slot)
|
||||
|
||||
atts = pool.PendingPayloadAttestations(primitives.Slot(2))
|
||||
assert.Equal(t, 1, len(atts))
|
||||
assert.Equal(t, primitives.Slot(2), atts[0].Data.Slot)
|
||||
|
||||
atts = pool.PendingPayloadAttestations(primitives.Slot(99))
|
||||
assert.Equal(t, 0, len(atts))
|
||||
})
|
||||
}
|
||||
|
||||
func TestPool_InsertPayloadAttestation(t *testing.T) {
|
||||
t.Run("nil message", func(t *testing.T) {
|
||||
pool := NewPool()
|
||||
err := pool.InsertPayloadAttestation(nil)
|
||||
require.ErrorContains(t, "nil payload attestation message", err)
|
||||
})
|
||||
|
||||
t.Run("nil data", func(t *testing.T) {
|
||||
pool := NewPool()
|
||||
err := pool.InsertPayloadAttestation(ðpb.PayloadAttestationMessage{})
|
||||
require.ErrorContains(t, "nil payload attestation message", err)
|
||||
})
|
||||
|
||||
t.Run("insert creates new entry", func(t *testing.T) {
|
||||
pool := NewPool()
|
||||
msg := ðpb.PayloadAttestationMessage{
|
||||
ValidatorIndex: 0,
|
||||
Data: ðpb.PayloadAttestationData{
|
||||
BeaconBlockRoot: make([]byte, 32),
|
||||
Slot: 1,
|
||||
PayloadPresent: true,
|
||||
BlobDataAvailable: false,
|
||||
},
|
||||
Signature: make([]byte, 96),
|
||||
}
|
||||
require.NoError(t, pool.InsertPayloadAttestation(msg))
|
||||
atts := pool.PendingPayloadAttestations()
|
||||
assert.Equal(t, 1, len(atts))
|
||||
})
|
||||
|
||||
t.Run("duplicate data does not create second entry", func(t *testing.T) {
|
||||
pool := NewPool()
|
||||
msg1 := ðpb.PayloadAttestationMessage{
|
||||
ValidatorIndex: 0,
|
||||
Data: ðpb.PayloadAttestationData{
|
||||
BeaconBlockRoot: make([]byte, 32),
|
||||
Slot: 1,
|
||||
PayloadPresent: true,
|
||||
BlobDataAvailable: false,
|
||||
},
|
||||
Signature: make([]byte, 96),
|
||||
}
|
||||
msg2 := ðpb.PayloadAttestationMessage{
|
||||
ValidatorIndex: 1,
|
||||
Data: ðpb.PayloadAttestationData{
|
||||
BeaconBlockRoot: make([]byte, 32),
|
||||
Slot: 1,
|
||||
PayloadPresent: true,
|
||||
BlobDataAvailable: false,
|
||||
},
|
||||
Signature: make([]byte, 96),
|
||||
}
|
||||
require.NoError(t, pool.InsertPayloadAttestation(msg1))
|
||||
require.NoError(t, pool.InsertPayloadAttestation(msg2))
|
||||
atts := pool.PendingPayloadAttestations()
|
||||
assert.Equal(t, 1, len(atts))
|
||||
})
|
||||
}
|
||||
|
||||
func TestPool_MarkIncluded(t *testing.T) {
|
||||
t.Run("mark included removes from pool", func(t *testing.T) {
|
||||
pool := NewPool()
|
||||
msg := ðpb.PayloadAttestationMessage{
|
||||
ValidatorIndex: 0,
|
||||
Data: ðpb.PayloadAttestationData{
|
||||
BeaconBlockRoot: make([]byte, 32),
|
||||
Slot: 1,
|
||||
PayloadPresent: true,
|
||||
BlobDataAvailable: false,
|
||||
},
|
||||
Signature: make([]byte, 96),
|
||||
}
|
||||
require.NoError(t, pool.InsertPayloadAttestation(msg))
|
||||
assert.Equal(t, 1, len(pool.PendingPayloadAttestations()))
|
||||
|
||||
pool.MarkIncluded(ðpb.PayloadAttestation{
|
||||
Data: ðpb.PayloadAttestationData{
|
||||
BeaconBlockRoot: make([]byte, 32),
|
||||
Slot: 1,
|
||||
PayloadPresent: true,
|
||||
BlobDataAvailable: false,
|
||||
},
|
||||
})
|
||||
assert.Equal(t, 0, len(pool.PendingPayloadAttestations()))
|
||||
})
|
||||
|
||||
t.Run("mark included with non-matching data does nothing", func(t *testing.T) {
|
||||
pool := NewPool()
|
||||
msg := ðpb.PayloadAttestationMessage{
|
||||
ValidatorIndex: 0,
|
||||
Data: ðpb.PayloadAttestationData{
|
||||
BeaconBlockRoot: make([]byte, 32),
|
||||
Slot: 1,
|
||||
PayloadPresent: true,
|
||||
BlobDataAvailable: false,
|
||||
},
|
||||
Signature: make([]byte, 96),
|
||||
}
|
||||
require.NoError(t, pool.InsertPayloadAttestation(msg))
|
||||
|
||||
pool.MarkIncluded(ðpb.PayloadAttestation{
|
||||
Data: ðpb.PayloadAttestationData{
|
||||
BeaconBlockRoot: make([]byte, 32),
|
||||
Slot: 999,
|
||||
PayloadPresent: true,
|
||||
BlobDataAvailable: false,
|
||||
},
|
||||
})
|
||||
assert.Equal(t, 1, len(pool.PendingPayloadAttestations()))
|
||||
})
|
||||
|
||||
t.Run("mark included with nil is safe", func(t *testing.T) {
|
||||
pool := NewPool()
|
||||
pool.MarkIncluded(nil)
|
||||
pool.MarkIncluded(ðpb.PayloadAttestation{})
|
||||
})
|
||||
}
|
||||
@@ -26,7 +26,6 @@ go_library(
|
||||
"//beacon-chain/light-client:go_default_library",
|
||||
"//beacon-chain/operations/attestations:go_default_library",
|
||||
"//beacon-chain/operations/blstoexec:go_default_library",
|
||||
"//beacon-chain/operations/payloadattestation:go_default_library",
|
||||
"//beacon-chain/operations/slashings:go_default_library",
|
||||
"//beacon-chain/operations/synccommittee:go_default_library",
|
||||
"//beacon-chain/operations/voluntaryexits:go_default_library",
|
||||
|
||||
@@ -217,7 +217,6 @@ func (s *Service) validatorEndpoints(
|
||||
OperationNotifier: s.cfg.OperationNotifier,
|
||||
TrackedValidatorsCache: s.cfg.TrackedValidatorsCache,
|
||||
PayloadIDCache: s.cfg.PayloadIDCache,
|
||||
PayloadAttestationPool: s.cfg.PayloadAttestationPool,
|
||||
CoreService: coreService,
|
||||
BlockRewardFetcher: rewardFetcher,
|
||||
}
|
||||
@@ -391,16 +390,6 @@ func (s *Service) validatorEndpoints(
|
||||
handler: server.SyncCommitteeSelections,
|
||||
methods: []string{http.MethodPost},
|
||||
},
|
||||
{
|
||||
template: "/eth/v1/validator/payload_attestation_data/{slot}",
|
||||
name: namespace + ".GetPayloadAttestationData",
|
||||
middleware: []middleware.Middleware{
|
||||
middleware.AcceptHeaderHandler([]string{api.JsonMediaType, api.OctetStreamMediaType}),
|
||||
middleware.AcceptEncodingHeaderHandler(),
|
||||
},
|
||||
handler: server.GetPayloadAttestationData,
|
||||
methods: []string{http.MethodGet},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -523,7 +512,6 @@ func (s *Service) beaconEndpoints(
|
||||
SyncChecker: s.cfg.SyncService,
|
||||
ExecutionReconstructor: s.cfg.ExecutionReconstructor,
|
||||
BLSChangesPool: s.cfg.BLSChangesPool,
|
||||
PayloadAttestationPool: s.cfg.PayloadAttestationPool,
|
||||
FinalizationFetcher: s.cfg.FinalizationFetcher,
|
||||
ForkchoiceFetcher: s.cfg.ForkchoiceFetcher,
|
||||
CoreService: coreService,
|
||||
@@ -881,27 +869,6 @@ func (s *Service) beaconEndpoints(
|
||||
handler: server.GetProposerLookahead,
|
||||
methods: []string{http.MethodGet},
|
||||
},
|
||||
{
|
||||
template: "/eth/v1/beacon/pool/payload_attestations",
|
||||
name: namespace + ".ListPayloadAttestations",
|
||||
middleware: []middleware.Middleware{
|
||||
middleware.AcceptHeaderHandler([]string{api.JsonMediaType}),
|
||||
middleware.AcceptEncodingHeaderHandler(),
|
||||
},
|
||||
handler: server.ListPayloadAttestations,
|
||||
methods: []string{http.MethodGet},
|
||||
},
|
||||
{
|
||||
template: "/eth/v1/beacon/pool/payload_attestations",
|
||||
name: namespace + ".SubmitPayloadAttestations",
|
||||
middleware: []middleware.Middleware{
|
||||
middleware.ContentTypeHandler([]string{api.JsonMediaType}),
|
||||
middleware.AcceptHeaderHandler([]string{api.JsonMediaType}),
|
||||
middleware.AcceptEncodingHeaderHandler(),
|
||||
},
|
||||
handler: server.SubmitPayloadAttestations,
|
||||
methods: []string{http.MethodPost},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -48,7 +48,6 @@ func Test_endpoints(t *testing.T) {
|
||||
"/eth/v1/beacon/pool/sync_committees": {http.MethodPost},
|
||||
"/eth/v1/beacon/pool/voluntary_exits": {http.MethodGet, http.MethodPost},
|
||||
"/eth/v1/beacon/pool/bls_to_execution_changes": {http.MethodGet, http.MethodPost},
|
||||
"/eth/v1/beacon/pool/payload_attestations": {http.MethodGet, http.MethodPost},
|
||||
"/prysm/v1/beacon/individual_votes": {http.MethodPost},
|
||||
}
|
||||
|
||||
@@ -92,23 +91,22 @@ func Test_endpoints(t *testing.T) {
|
||||
}
|
||||
|
||||
validatorRoutes := map[string][]string{
|
||||
"/eth/v1/validator/duties/attester/{epoch}": {http.MethodPost},
|
||||
"/eth/v1/validator/duties/proposer/{epoch}": {http.MethodGet},
|
||||
"/eth/v1/validator/duties/sync/{epoch}": {http.MethodPost},
|
||||
"/eth/v3/validator/blocks/{slot}": {http.MethodGet},
|
||||
"/eth/v1/validator/attestation_data": {http.MethodGet},
|
||||
"/eth/v2/validator/aggregate_attestation": {http.MethodGet},
|
||||
"/eth/v2/validator/aggregate_and_proofs": {http.MethodPost},
|
||||
"/eth/v1/validator/beacon_committee_subscriptions": {http.MethodPost},
|
||||
"/eth/v1/validator/sync_committee_subscriptions": {http.MethodPost},
|
||||
"/eth/v1/validator/beacon_committee_selections": {http.MethodPost},
|
||||
"/eth/v1/validator/sync_committee_selections": {http.MethodPost},
|
||||
"/eth/v1/validator/sync_committee_contribution": {http.MethodGet},
|
||||
"/eth/v1/validator/contribution_and_proofs": {http.MethodPost},
|
||||
"/eth/v1/validator/prepare_beacon_proposer": {http.MethodPost},
|
||||
"/eth/v1/validator/register_validator": {http.MethodPost},
|
||||
"/eth/v1/validator/liveness/{epoch}": {http.MethodPost},
|
||||
"/eth/v1/validator/payload_attestation_data/{slot}": {http.MethodGet},
|
||||
"/eth/v1/validator/duties/attester/{epoch}": {http.MethodPost},
|
||||
"/eth/v1/validator/duties/proposer/{epoch}": {http.MethodGet},
|
||||
"/eth/v1/validator/duties/sync/{epoch}": {http.MethodPost},
|
||||
"/eth/v3/validator/blocks/{slot}": {http.MethodGet},
|
||||
"/eth/v1/validator/attestation_data": {http.MethodGet},
|
||||
"/eth/v2/validator/aggregate_attestation": {http.MethodGet},
|
||||
"/eth/v2/validator/aggregate_and_proofs": {http.MethodPost},
|
||||
"/eth/v1/validator/beacon_committee_subscriptions": {http.MethodPost},
|
||||
"/eth/v1/validator/sync_committee_subscriptions": {http.MethodPost},
|
||||
"/eth/v1/validator/beacon_committee_selections": {http.MethodPost},
|
||||
"/eth/v1/validator/sync_committee_selections": {http.MethodPost},
|
||||
"/eth/v1/validator/sync_committee_contribution": {http.MethodGet},
|
||||
"/eth/v1/validator/contribution_and_proofs": {http.MethodPost},
|
||||
"/eth/v1/validator/prepare_beacon_proposer": {http.MethodPost},
|
||||
"/eth/v1/validator/register_validator": {http.MethodPost},
|
||||
"/eth/v1/validator/liveness/{epoch}": {http.MethodPost},
|
||||
}
|
||||
|
||||
prysmBeaconRoutes := map[string][]string{
|
||||
|
||||
@@ -32,7 +32,6 @@ go_library(
|
||||
"//beacon-chain/execution:go_default_library",
|
||||
"//beacon-chain/operations/attestations:go_default_library",
|
||||
"//beacon-chain/operations/blstoexec:go_default_library",
|
||||
"//beacon-chain/operations/payloadattestation:go_default_library",
|
||||
"//beacon-chain/operations/slashings:go_default_library",
|
||||
"//beacon-chain/operations/voluntaryexits:go_default_library",
|
||||
"//beacon-chain/p2p:go_default_library",
|
||||
|
||||
@@ -893,89 +893,3 @@ func (s *Server) SubmitProposerSlashing(w http.ResponseWriter, r *http.Request)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// SubmitPayloadAttestations submits payload attestation messages to the node's pool.
|
||||
func (s *Server) SubmitPayloadAttestations(w http.ResponseWriter, r *http.Request) {
|
||||
ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitPayloadAttestations")
|
||||
defer span.End()
|
||||
|
||||
if shared.IsSyncing(ctx, w, s.SyncChecker, s.HeadFetcher, s.TimeFetcher, s.OptimisticModeFetcher) {
|
||||
return
|
||||
}
|
||||
|
||||
versionHeader := r.Header.Get(api.VersionHeader)
|
||||
if versionHeader == "" {
|
||||
httputil.HandleError(w, api.VersionHeader+" header is required", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var msgs []*structs.PayloadAttestationMessage
|
||||
if err := json.NewDecoder(r.Body).Decode(&msgs); err != nil {
|
||||
httputil.HandleError(w, "Could not decode request body: "+err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var failures []*server.IndexedError
|
||||
for i, msg := range msgs {
|
||||
consensusMsg, err := msg.ToConsensus()
|
||||
if err != nil {
|
||||
failures = append(failures, &server.IndexedError{
|
||||
Index: i,
|
||||
Message: "Could not convert message: " + err.Error(),
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO: Add full gossip validation (BLS signatures, PTC membership).
|
||||
if err := s.PayloadAttestationPool.InsertPayloadAttestation(consensusMsg); err != nil {
|
||||
failures = append(failures, &server.IndexedError{
|
||||
Index: i,
|
||||
Message: "Could not insert payload attestation: " + err.Error(),
|
||||
})
|
||||
continue
|
||||
}
|
||||
|
||||
if err := s.Broadcaster.Broadcast(ctx, consensusMsg); err != nil {
|
||||
log.WithError(err).Error("Could not broadcast payload attestation message")
|
||||
}
|
||||
}
|
||||
|
||||
if len(failures) > 0 {
|
||||
failuresErr := &server.IndexedErrorContainer{
|
||||
Code: http.StatusBadRequest,
|
||||
Message: server.ErrIndexedValidationFail,
|
||||
Failures: failures,
|
||||
}
|
||||
httputil.WriteError(w, failuresErr)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// ListPayloadAttestations retrieves payload attestations from the pool.
|
||||
func (s *Server) ListPayloadAttestations(w http.ResponseWriter, r *http.Request) {
|
||||
_, span := trace.StartSpan(r.Context(), "beacon.ListPayloadAttestations")
|
||||
defer span.End()
|
||||
|
||||
rawSlot, slot, ok := shared.UintFromQuery(w, r, "slot", false)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
var atts []*eth.PayloadAttestation
|
||||
if rawSlot != "" {
|
||||
atts = s.PayloadAttestationPool.PendingPayloadAttestations(primitives.Slot(slot))
|
||||
} else {
|
||||
atts = s.PayloadAttestationPool.PendingPayloadAttestations()
|
||||
}
|
||||
|
||||
data := make([]*structs.PayloadAttestation, len(atts))
|
||||
for i, att := range atts {
|
||||
data[i] = structs.PayloadAttestationFromConsensus(att)
|
||||
}
|
||||
|
||||
w.Header().Set(api.VersionHeader, version.String(version.Gloas))
|
||||
httputil.WriteJson(w, &structs.GetPoolPayloadAttestationsResponse{
|
||||
Version: version.String(version.Gloas),
|
||||
Data: data,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -12,7 +12,6 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/execution"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/attestations"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/blstoexec"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/payloadattestation"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/slashings"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/voluntaryexits"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
|
||||
@@ -49,7 +48,6 @@ type Server struct {
|
||||
ExecutionReconstructor execution.Reconstructor
|
||||
FinalizationFetcher blockchain.FinalizationFetcher
|
||||
BLSChangesPool blstoexec.PoolManager
|
||||
PayloadAttestationPool payloadattestation.PoolManager
|
||||
ForkchoiceFetcher blockchain.ForkchoiceFetcher
|
||||
CoreService *core.Service
|
||||
AttestationStateFetcher blockchain.AttestationStateFetcher
|
||||
|
||||
@@ -21,7 +21,6 @@ go_library(
|
||||
"//beacon-chain/core/helpers:go_default_library",
|
||||
"//beacon-chain/db:go_default_library",
|
||||
"//beacon-chain/operations/attestations:go_default_library",
|
||||
"//beacon-chain/operations/payloadattestation:go_default_library",
|
||||
"//beacon-chain/operations/synccommittee:go_default_library",
|
||||
"//beacon-chain/p2p:go_default_library",
|
||||
"//beacon-chain/rpc/core:go_default_library",
|
||||
|
||||
@@ -1427,54 +1427,3 @@ func sortProposerDuties(duties []*structs.ProposerDuty) error {
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// GetPayloadAttestationData produces payload attestation data for the requested slot.
|
||||
func (s *Server) GetPayloadAttestationData(w http.ResponseWriter, r *http.Request) {
|
||||
ctx, span := trace.StartSpan(r.Context(), "validator.GetPayloadAttestationData")
|
||||
defer span.End()
|
||||
|
||||
if shared.IsSyncing(ctx, w, s.SyncChecker, s.HeadFetcher, s.TimeFetcher, s.OptimisticModeFetcher) {
|
||||
return
|
||||
}
|
||||
|
||||
_, slot, ok := shared.UintFromRoute(w, r, "slot")
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
headRoot, err := s.HeadFetcher.HeadRoot(ctx)
|
||||
if err != nil {
|
||||
httputil.HandleError(w, "Could not get head root: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// TODO: Determine payload_present and blob_data_available from the SignedExecutionPayloadEnvelope
|
||||
// once that handling is implemented. For now, stub with false.
|
||||
payloadPresent := false
|
||||
blobDataAvailable := false
|
||||
|
||||
data := ðpbalpha.PayloadAttestationData{
|
||||
BeaconBlockRoot: headRoot,
|
||||
Slot: primitives.Slot(slot),
|
||||
PayloadPresent: payloadPresent,
|
||||
BlobDataAvailable: blobDataAvailable,
|
||||
}
|
||||
|
||||
if httputil.RespondWithSsz(r) {
|
||||
sszData, err := data.MarshalSSZ()
|
||||
if err != nil {
|
||||
httputil.HandleError(w, "Could not marshal payload attestation data: "+err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
w.Header().Set(api.VersionHeader, version.String(version.Gloas))
|
||||
httputil.WriteSsz(w, sszData)
|
||||
return
|
||||
}
|
||||
|
||||
response := &structs.GetPayloadAttestationDataResponse{
|
||||
Version: version.String(version.Gloas),
|
||||
Data: structs.PayloadAttestationDataFromConsensus(data),
|
||||
}
|
||||
w.Header().Set(api.VersionHeader, version.String(version.Gloas))
|
||||
httputil.WriteJson(w, response)
|
||||
}
|
||||
|
||||
@@ -7,7 +7,6 @@ import (
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/core/feed/operation"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/db"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/attestations"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/payloadattestation"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/synccommittee"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/p2p"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/rpc/core"
|
||||
@@ -39,5 +38,4 @@ type Server struct {
|
||||
BlockRewardFetcher rewards.BlockRewardsFetcher
|
||||
TrackedValidatorsCache *cache.TrackedValidatorsCache
|
||||
PayloadIDCache *cache.PayloadIDCache
|
||||
PayloadAttestationPool payloadattestation.PoolManager
|
||||
}
|
||||
|
||||
@@ -22,7 +22,6 @@ import (
|
||||
lightClient "github.com/OffchainLabs/prysm/v7/beacon-chain/light-client"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/attestations"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/blstoexec"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/payloadattestation"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/slashings"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/synccommittee"
|
||||
"github.com/OffchainLabs/prysm/v7/beacon-chain/operations/voluntaryexits"
|
||||
@@ -104,7 +103,6 @@ type Config struct {
|
||||
SlashingsPool slashings.PoolManager
|
||||
SyncCommitteeObjectPool synccommittee.Pool
|
||||
BLSChangesPool blstoexec.PoolManager
|
||||
PayloadAttestationPool payloadattestation.PoolManager
|
||||
SyncService chainSync.Checker
|
||||
Broadcaster p2p.Broadcaster
|
||||
PeersFetcher p2p.PeersProvider
|
||||
|
||||
@@ -285,10 +285,10 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
|
||||
ctx, span := trace.StartSpan(ctx, "validator.gRPCClient.StartEventStream")
|
||||
defer span.End()
|
||||
if len(topics) == 0 {
|
||||
eventsChannel <- &eventClient.Event{
|
||||
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventError,
|
||||
Data: []byte(errors.New("no topics were added").Error()),
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
// TODO(13563): ONLY WORKS WITH HEAD TOPIC.
|
||||
@@ -299,10 +299,10 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
|
||||
}
|
||||
}
|
||||
if !containsHead {
|
||||
eventsChannel <- &eventClient.Event{
|
||||
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventConnectionError,
|
||||
Data: []byte(errors.Wrap(client.ErrConnectionIssue, "gRPC only supports the head topic, and head topic was not passed").Error()),
|
||||
}
|
||||
})
|
||||
}
|
||||
if containsHead && len(topics) > 1 {
|
||||
log.Warn("gRPC only supports the head topic, other topics will be ignored")
|
||||
@@ -310,10 +310,10 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
|
||||
|
||||
stream, err := c.beaconNodeValidatorClient.StreamSlots(ctx, ðpb.StreamSlotsRequest{VerifiedOnly: true})
|
||||
if err != nil {
|
||||
eventsChannel <- &eventClient.Event{
|
||||
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventConnectionError,
|
||||
Data: []byte(errors.Wrap(client.ErrConnectionIssue, err.Error()).Error()),
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
c.isEventStreamRunning = true
|
||||
@@ -327,25 +327,25 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
|
||||
if ctx.Err() != nil {
|
||||
c.isEventStreamRunning = false
|
||||
if errors.Is(ctx.Err(), context.Canceled) {
|
||||
eventsChannel <- &eventClient.Event{
|
||||
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventConnectionError,
|
||||
Data: []byte(errors.Wrap(client.ErrConnectionIssue, ctx.Err().Error()).Error()),
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
eventsChannel <- &eventClient.Event{
|
||||
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventError,
|
||||
Data: []byte(ctx.Err().Error()),
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
res, err := stream.Recv()
|
||||
if err != nil {
|
||||
c.isEventStreamRunning = false
|
||||
eventsChannel <- &eventClient.Event{
|
||||
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventConnectionError,
|
||||
Data: []byte(errors.Wrap(client.ErrConnectionIssue, err.Error()).Error()),
|
||||
}
|
||||
})
|
||||
return
|
||||
}
|
||||
if res == nil {
|
||||
@@ -357,15 +357,15 @@ func (c *grpcValidatorClient) StartEventStream(ctx context.Context, topics []str
|
||||
CurrentDutyDependentRoot: hexutil.Encode(res.CurrentDutyDependentRoot),
|
||||
})
|
||||
if err != nil {
|
||||
eventsChannel <- &eventClient.Event{
|
||||
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventError,
|
||||
Data: []byte(errors.Wrap(err, "failed to marshal Head Event").Error()),
|
||||
}
|
||||
})
|
||||
}
|
||||
eventsChannel <- &eventClient.Event{
|
||||
eventClient.PublishEvent(eventsChannel, &eventClient.Event{
|
||||
EventType: eventClient.EventHead,
|
||||
Data: b,
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -223,7 +223,7 @@ func TestStartEventStream(t *testing.T) {
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
eventsChannel := make(chan *eventClient.Event, 1) // Buffer to prevent blocking
|
||||
eventsChannel := make(chan *eventClient.Event, 4) // Buffer to prevent blocking
|
||||
tc.prepare() // Setup mock expectations
|
||||
|
||||
go grpcClient.StartEventStream(ctx, tc.topics, eventsChannel)
|
||||
|
||||
@@ -441,7 +441,6 @@ func TestRunnerPushesProposerSettings_ValidContext(t *testing.T) {
|
||||
defer assertValidContext(t, timedCtx, ctx)
|
||||
delay(t)
|
||||
})
|
||||
vcm.EXPECT().EventStreamIsRunning().Return(true).AnyTimes().Do(func() { delay(t) })
|
||||
vcm.EXPECT().SubmitValidatorRegistrations(liveCtx, gomock.Any()).Do(func(ctx context.Context, _ any) {
|
||||
defer assertValidContext(t, timedCtx, ctx) // This is the specific regression test assertion for PR 15369.
|
||||
delay(t)
|
||||
|
||||
@@ -66,6 +66,8 @@ type ValidatorService struct {
|
||||
closeClientFunc func() // validator client stop function is used here
|
||||
}
|
||||
|
||||
const eventChannelBufferSize = 32
|
||||
|
||||
// Config for the validator service.
|
||||
type Config struct {
|
||||
Validator iface.Validator
|
||||
@@ -234,7 +236,7 @@ func (v *ValidatorService) Start() {
|
||||
distributed: v.distributed,
|
||||
disableDutiesPolling: v.disableDutiesPolling,
|
||||
accountsChangedChannel: make(chan [][fieldparams.BLSPubkeyLength]byte, 1),
|
||||
eventsChannel: make(chan *eventClient.Event, 1),
|
||||
eventsChannel: make(chan *eventClient.Event, eventChannelBufferSize),
|
||||
}
|
||||
|
||||
hm := newHealthMonitor(v.ctx, v.cancel, v.maxHealthChecks, v.validator)
|
||||
|
||||
@@ -64,6 +64,11 @@ var (
|
||||
msgNoKeysFetched = "No validating keys fetched. Waiting for keys..."
|
||||
)
|
||||
|
||||
const (
|
||||
eventStreamStopped uint32 = iota
|
||||
eventStreamRunning
|
||||
)
|
||||
|
||||
type validator struct {
|
||||
logValidatorPerformance bool
|
||||
distributed bool
|
||||
@@ -82,6 +87,7 @@ type validator struct {
|
||||
cachedAttestationData *ethpb.AttestationData
|
||||
accountsChangedChannel chan [][fieldparams.BLSPubkeyLength]byte
|
||||
eventsChannel chan *eventClient.Event
|
||||
eventStreamState atomic.Uint32
|
||||
highestValidSlot primitives.Slot
|
||||
submittedAggregates map[submittedAttKey]*submittedAtt
|
||||
graffitiStruct *graffiti.Graffiti
|
||||
@@ -1211,12 +1217,40 @@ func (v *validator) PushProposerSettings(ctx context.Context, slot primitives.Sl
|
||||
}
|
||||
|
||||
func (v *validator) StartEventStream(ctx context.Context, topics []string) {
|
||||
if v.EventStreamIsRunning() {
|
||||
if !v.eventStreamState.CompareAndSwap(eventStreamStopped, eventStreamRunning) {
|
||||
log.Debug("EventStream is already running")
|
||||
return
|
||||
}
|
||||
log.WithField("topics", topics).Info("Starting event stream")
|
||||
v.validatorClient.StartEventStream(ctx, topics, v.eventsChannel)
|
||||
go v.runEventStream(ctx, topics)
|
||||
}
|
||||
|
||||
func (v *validator) runEventStream(ctx context.Context, topics []string) {
|
||||
defer v.eventStreamState.Store(eventStreamStopped)
|
||||
backoff := time.Second
|
||||
const maxBackoff = 30 * time.Second
|
||||
|
||||
for {
|
||||
v.validatorClient.StartEventStream(ctx, topics, v.eventsChannel)
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
log.WithField("retryIn", backoff).Warn("Event stream ended unexpectedly, attempting to resubscribe")
|
||||
timer := time.NewTimer(backoff)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
timer.Stop()
|
||||
return
|
||||
case <-timer.C:
|
||||
}
|
||||
if backoff < maxBackoff {
|
||||
backoff *= 2
|
||||
if backoff > maxBackoff {
|
||||
backoff = maxBackoff
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (v *validator) checkDependentRoots(ctx context.Context, head *structs.HeadEvent) error {
|
||||
@@ -1303,7 +1337,7 @@ func (v *validator) ProcessEvent(ctx context.Context, event *eventClient.Event)
|
||||
}
|
||||
|
||||
func (v *validator) EventStreamIsRunning() bool {
|
||||
return v.validatorClient.EventStreamIsRunning()
|
||||
return v.eventStreamState.Load() == eventStreamRunning
|
||||
}
|
||||
|
||||
func (v *validator) Host() string {
|
||||
|
||||
Reference in New Issue
Block a user